Spark lineage issue and how to handle it with Hive Warehouse Connector

Preamble

One of my teammates submitted an interesting issue to me. In a Spark script he was reading a table partition, doing some operations on the resulting DataFrame, and then trying to write the modified DataFrame back into the same partition in overwrite mode.

Obviously this was hanging, hence this blog post… Internally this hanging is not a bug but a consequence of a feature called Spark lineage. It avoids, for example, losing or corrupting your data if your process crashes halfway through.

We are using HDP 3.1.4 and so Spark 2.3.2.3.1.4.0-315. The scripts below therefore use the Hive Warehouse Connector (HWC).

Test case and problem

The creation script of my small test table is the following:

DROP TABLE yannick.test01 PURGE;
CREATE TABLE yannick.test01 (val INT, descr STRING) PARTITIONED BY (fab STRING, lot_partition STRING) STORED AS ORC;
 
INSERT INTO yannick.test01 PARTITION(fab='GVA', lot_partition='TEST') VALUES(1,'One');
INSERT INTO yannick.test01 PARTITION(fab='GVA', lot_partition='TEST') VALUES(2,'Two');
INSERT INTO yannick.test01 PARTITION(fab='GVA', lot_partition='TEST') VALUES(3,'Three');

In Beeline it gives:

0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> SELECT * FROM yannick.test01;
+-------------+---------------+-------------+-----------------------+
| test01.val  | test01.descr  | test01.fab  | test01.lot_partition  |
+-------------+---------------+-------------+-----------------------+
| 1           | One           | GVA         | TEST                  |
| 2           | Two           | GVA         | TEST                  |
| 3           | Three         | GVA         | TEST                  |
+-------------+---------------+-------------+-----------------------+
3 rows selected (0.317 seconds)

To simulate a modification of the current partition in a DataFrame and the write back, I wrote the PySpark script below:

>>> from pyspark_llap import HiveWarehouseSession
>>> from pyspark.sql.functions import *
>>> hive = HiveWarehouseSession.session(spark).build()
>>> df01=hive.executeQuery("select * from yannick.test01");
>>> df02=df01.withColumn('val',col('val')+1)
>>> df02.show()
+---+-----+---+-------------+
|val|descr|fab|lot_partition|
+---+-----+---+-------------+
|  4|Three|GVA|         TEST|
|  3|  Two|GVA|         TEST|
|  2|  One|GVA|         TEST|
+---+-----+---+-------------+

I have added 1 to the val column values. But when I try to write it back, this is the command that hangs:

df02.write.mode('overwrite').format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("partition", "fab,lot_partition").option('table','yannick.test01').save()

Whereas if you append the figures instead, it works well:

df02.write.mode('append').format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("partition", "fab,lot_partition").option('table','yannick.test01').save()

My teammate had implemented a workaround: write to a temporary table (through HWC) and then launch another process (because doing the two operations in a single script hangs) that selects from this temporary table and does an INSERT OVERWRITE back into the final table. It works, but it is not sexy.
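
For reference, here is a minimal sketch of that two-script workaround. The temporary table name yannick.test01_tmp and the use of executeUpdate() for the final copy are my own illustration, and dynamic partition settings may need adjusting on your cluster:

# Script 1: read the partition through HWC, transform it and write it to a
# temporary Hive table (yannick.test01_tmp is a made-up name for this sketch)
from pyspark_llap import HiveWarehouseSession
from pyspark.sql.functions import col

hive = HiveWarehouseSession.session(spark).build()
df = hive.executeQuery("select * from yannick.test01")
df = df.withColumn('val', col('val') + 1)
df.write.mode('overwrite') \
    .format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR) \
    .option('partition', 'fab,lot_partition') \
    .option('table', 'yannick.test01_tmp') \
    .save()

# Script 2, launched afterwards as a separate process: copy the temporary
# table back into the final one, then drop it
from pyspark_llap import HiveWarehouseSession

hive = HiveWarehouseSession.session(spark).build()
hive.executeUpdate("INSERT OVERWRITE TABLE yannick.test01 PARTITION (fab, lot_partition) "
                   "SELECT * FROM yannick.test01_tmp")
hive.executeUpdate("DROP TABLE yannick.test01_tmp PURGE")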

Spark lineage solution

In fact, as we will see, there is no magic solution, and overall this Spark lineage is a good thing. I have simply found a way to do it all in one single script, which at least simplifies our scheduling.

I write the intermediate DataFrame to a Spark ORC table (as opposed to a Hive table accessible through HWC):

>>> df02.write.format('orc').mode('overwrite').saveAsTable('temporary_table')
>>> df03=spark.sql('select * from temporary_table')
>>> df03.show()
+---+-----+---+-------------+
|val|descr|fab|lot_partition|
+---+-----+---+-------------+
|  4|Three|GVA|         TEST|
|  3|  Two|GVA|         TEST|
|  2|  One|GVA|         TEST|
+---+-----+---+-------------+

Later you can use things like:

spark.sql('select * from temporary_table').show()
spark.sql('show tables').show()
spark.sql('drop table temporary_table purge')

I was also wondering where those tables go, because I did not see them in the default database of the traditional Hive managed tables directory:

[hdfs@client_node ~]$ hdfs dfs -ls  /warehouse/tablespace/managed/
Found 1 items
drwxrwxrwx+  - hive hadoop          0 2020-03-09 16:33 /warehouse/tablespace/managed/hive

The destination is set by the Spark warehouse directory parameter in Ambari (spark.sql.warehouse.dir, which points to /apps/spark/warehouse on our HDP 3 installation):

[Ambari screenshot: spark_lineage01]
[hdfs@server ~]$ hdfs dfs -ls /apps/spark/warehouse
Found 1 items
drwxr-xr-x   - mfgdl_ingestion hdfs          0 2020-03-20 14:21 /apps/spark/warehouse/temporary_table
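
If you do not have Ambari at hand, you can also read the configured location directly from the PySpark shell (spark.conf.get() is standard Spark; the value shown is simply what our HDP defaults give):

# Spark's own warehouse directory, where saveAsTable() puts Spark-managed tables
print(spark.conf.get('spark.sql.warehouse.dir'))
# on our cluster this returns /apps/spark/warehouse, matching the HDFS listing above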

Finally I read this temporary table back and write it into the final Hive table through HWC:

spark.sql('select * from temporary_table').write.mode('overwrite').format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("partition", "fab,lot_partition").option('table','yannick.test01').save()

It also clarifies (really?) a bit this story of Spark Metastore vs Hive Metastore…
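
Putting the pieces together, the complete single-script flow looks roughly like this (a sketch assembled from the snippets above, to be adapted to your own table and database names):

from pyspark_llap import HiveWarehouseSession
from pyspark.sql.functions import col

hive = HiveWarehouseSession.session(spark).build()

# 1. Read the source partition through HWC and apply the transformation
df01 = hive.executeQuery("select * from yannick.test01")
df02 = df01.withColumn('val', col('val') + 1)

# 2. Materialize the result in a Spark-managed ORC table to break the lineage
df02.write.format('orc').mode('overwrite').saveAsTable('temporary_table')

# 3. Read the materialized copy back and overwrite the original Hive table through HWC
spark.sql('select * from temporary_table').write.mode('overwrite') \
    .format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR) \
    .option('partition', 'fab,lot_partition') \
    .option('table', 'yannick.test01').save()

# 4. Clean up the Spark-side temporary table
spark.sql('drop table temporary_table purge')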

Spark lineage second problem and partial solution

The previous solution went well until my teammate told me that it was still hanging and shared his code with me. I noticed he was using, for performance reasons, the persist() function when reading the source table into a DataFrame:

scala> val df01=hive.executeQuery("select * from yannick.test01").persist()

I found a mention of this in the Close HiveWarehouseSession operations section of the HWC documentation:

Spark can invoke operations, such as cache(), persist(), and rdd(), on a DataFrame you obtain from running a HiveWarehouseSession executeQuery() or table(). The Spark operations can lock Hive resources. You can release any locks and resources by calling the HiveWarehouseSession close().

So I tried the below Spark Scala code:

scala> import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession
 
scala> import com.hortonworks.hwc.HiveWarehouseSession._
import com.hortonworks.hwc.HiveWarehouseSession._
 
scala> val HIVE_WAREHOUSE_CONNECTOR="com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector"
HIVE_WAREHOUSE_CONNECTOR: String = com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector
 
scala> val hive = HiveWarehouseSession.session(spark).build()
hive: com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl = com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl@25f3207e
 
scala> val df01=hive.executeQuery("select * from yannick.test01").persist()
df01: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [val: int, descr: string ... 2 more fields]
 
scala> df01.show()
20/03/24 18:29:51 WARN TaskSetManager: Stage 0 contains a task of very large size (439 KB). The maximum recommended task size is 100 KB.
+---+-----+---+-------------+
|val|descr|fab|lot_partition|
+---+-----+---+-------------+
|  3|Three|GVA|         TEST|
|  1|  One|GVA|         TEST|
|  2|  Two|GVA|         TEST|
+---+-----+---+-------------+
 
 
scala> val df02=df01.withColumn("val",$"val" + 1)
df02: org.apache.spark.sql.DataFrame = [val: int, descr: string ... 2 more fields]
 
scala> df02.show()
20/03/24 18:30:16 WARN TaskSetManager: Stage 1 contains a task of very large size (439 KB). The maximum recommended task size is 100 KB.
+---+-----+---+-------------+
|val|descr|fab|lot_partition|
+---+-----+---+-------------+
|  4|Three|GVA|         TEST|
|  2|  One|GVA|         TEST|
|  3|  Two|GVA|         TEST|
+---+-----+---+-------------+
 
 
scala> hive.close()
 
scala> val hive = HiveWarehouseSession.session(spark).build()
hive: com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl = com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl@1c9f274d
 
scala> df02.write.mode("overwrite").format(HIVE_WAREHOUSE_CONNECTOR).option("partition", "fab,lot_partition").option("table","yannick.test01").save()
20/03/24 18:33:44 WARN TaskSetManager: Stage 2 contains a task of very large size (439 KB). The maximum recommended task size is 100 KB.

And voilà, my table has been correctly written back without any hanging. It looked like a marvelous solution until I received feedback from my teammate, who is using PySpark:

>>> hive.close()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'HiveWarehouseSessionImpl' object has no attribute 'close'

I have had a look at the HWC source code and apparently the close() function has not been exposed for use in PySpark… Definitely, since we moved to HDP 3, this HWC implementation does not look very mature and we have already identified many issues with it…
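
For PySpark users, a partial workaround, under the assumption that the lock only comes from calling persist()/cache() on the DataFrame returned by executeQuery(), is to skip persist() on the HWC DataFrame and cache the Spark-side copy instead (a sketch only, not a verified fix):

from pyspark_llap import HiveWarehouseSession
from pyspark.sql.functions import col

hive = HiveWarehouseSession.session(spark).build()

# No persist() on the HWC DataFrame, to avoid holding Hive resources
df01 = hive.executeQuery("select * from yannick.test01")
df02 = df01.withColumn('val', col('val') + 1)
df02.write.format('orc').mode('overwrite').saveAsTable('temporary_table')

# Cache the Spark-side copy instead, if it is reused several times afterwards
df03 = spark.sql('select * from temporary_table').persist()
df03.show()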

If someone has found something interesting, please share, and I might come back to this article if we find a sexier solution…
