Preamble
Hive statistics: if, like me, you come from the RDBMS world, I will not offend you by re-explaining the importance of good statistics (table, columns, etc.) in helping the optimizer choose the best way to execute your SQL statements. In this post you will see a concrete example of why they are also super important in a Hadoop cluster!
We have the (bad) habit of incrementally loading figures into our partitioned Hive tables without gathering statistics each time. In short, this is clearly not a best practice!
Then, to create a more refined table, we join two tables, and the initial question was: do we need to put all predicates in the JOIN ... ON clause, or can they go in a traditional WHERE clause, the Hive optimizer being clever enough to do what is usually called predicate pushdown?
We run the Hortonworks Hadoop distribution HDP 2.6.4, so Hive 1.2.1000.
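As a reminder of what predicate pushdown buys, here is a minimal sketch in plain Python rather than Hive. It runs a small hash join on two invented row lists (hypothetical stand-ins for the two tables), once filtering after the join and once pushing the same filters down to each input first. For an inner join both orders return the same rows, but the pushed-down version feeds far fewer rows into the join.

```python
from collections import defaultdict

# Hypothetical toy rows, loosely modelled on the two tables:
#   params: (key, fab, value)      bins: (key, fab, pass_fail)
params = [(k, "C2WF" if k % 2 == 0 else "OTHER", k * 10) for k in range(12)]
bins = [(k, "C2WF" if k % 2 == 0 else "OTHER", "F" if k % 3 == 0 else "P") for k in range(12)]


def hash_join(left, right):
    """Inner hash join on the first field.

    Returns the joined rows (left fields followed by right fields)
    and the number of input rows that had to go through the join.
    """
    index = defaultdict(list)
    for rrow in right:
        index[rrow[0]].append(rrow)
    joined = [lrow + rrow for lrow in left for rrow in index.get(lrow[0], [])]
    return joined, len(left) + len(right)


# 1) Join everything first, then filter (what a WHERE clause means logically).
#    Joined layout: 0 key, 1 params.fab, 2 value, 3 key, 4 bins.fab, 5 pass_fail
joined, rows_in_late = hash_join(params, bins)
late = [row for row in joined if row[1] == "C2WF" and row[4] == "C2WF" and row[5] == "F"]

# 2) Push the same predicates down to each input, then join.
params_f = [p for p in params if p[1] == "C2WF"]
bins_f = [b for b in bins if b[1] == "C2WF" and b[2] == "F"]
early, rows_in_early = hash_join(params_f, bins_f)

print(sorted(late) == sorted(early), rows_in_late, rows_in_early)  # True 24 8
```

Hive's optimizer is expected to perform this rewrite by itself for inner joins, which is exactly what the explain plans in this post let us check.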
The problematic queries
We were trying to load a third table from two sources with (query01):
SELECT dpf.fab, dpf.start_t, dpf.finish_t, dpf.lot_id, dpf.wafer_id, dpf.flow_id,
       dpf.die_x, dpf.die_y, dpf.part_seq, dpf.parameters, dpf.ingestion_date, dpf.lot_partition
FROM prod_ews_refined.tbl_die_param_flow dpf
join prod_ews_refined.tbl_die_bin db
ON dpf.fab="C2WF"
AND dpf.lot_partition="Q926"
AND dpf.die_partition=9
AND db.pass_fail="F"
AND db.fab="C2WF"
AND db.lot_partition="Q926"
AND db.die_partition=9
AND dpf.fab=db.fab
AND dpf.lot_partition=db.lot_partition
AND dpf.start_t=db.start_t
AND dpf.finish_t=db.finish_t
AND dpf.lot_id=db.lot_id
AND dpf.wafer_id=db.wafer_id
AND dpf.flow_id=db.flow_id
AND dpf.die_x=db.die_x
AND dpf.die_y=db.die_y
AND dpf.part_seq=db.part_seq
AND dpf.ingestion_date<"201911291611"
AND db.ingestion_date<"201911291611";
And we compare it with its sister query, written more logically in my opinion but in fact not following best practices (query02):
SELECT dpf.fab, dpf.start_t, dpf.finish_t, dpf.lot_id, dpf.wafer_id, dpf.flow_id,
       dpf.die_x, dpf.die_y, dpf.part_seq, dpf.parameters, dpf.ingestion_date, dpf.lot_partition
FROM prod_ews_refined.tbl_die_param_flow dpf
join prod_ews_refined.tbl_die_bin db
ON dpf.fab=db.fab
AND dpf.lot_partition=db.lot_partition
AND dpf.die_partition=db.die_partition
AND dpf.start_t=db.start_t
AND dpf.finish_t=db.finish_t
AND dpf.lot_id=db.lot_id
AND dpf.wafer_id=db.wafer_id
AND dpf.flow_id=db.flow_id
AND dpf.die_x=db.die_x
AND dpf.die_y=db.die_y
AND dpf.part_seq=db.part_seq
WHERE dpf.fab="C2WF"
AND dpf.lot_partition="Q926"
AND dpf.die_partition="9"
AND db.fab="C2WF"
AND db.lot_partition="Q926"
AND db.pass_fail="F"
AND db.die_partition="9"
AND dpf.ingestion_date<"201911291611"
AND db.ingestion_date<"201911291611";
The explain plan of query01 is (use the Hive EXPLAIN command in Beeline to generate it):
Explain
--------------------------------------------------------------------------------
Plan not optimized by CBO.

Vertex dependency in root stage
Map 2 <- Map 1 (BROADCAST_EDGE)

Stage-0
   Fetch Operator
      limit:-1
      Stage-1
         Map 2
         File Output Operator [FS_3747670]
            compressed:false
            Statistics:Num rows: 10066 Data size: 541406213 Basic stats: COMPLETE Column stats: NONE
            table:{"input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"}
            Select Operator [SEL_3747669]
               outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11"]
               Statistics:Num rows: 10066 Data size: 541406213 Basic stats: COMPLETE Column stats: NONE
               Map Join Operator [MAPJOIN_3747675]
               |  condition map:[{"":"Inner Join 0 to 1"}]
               |  HybridGraceHashJoin:true
               |  keys:{"Map 2":"'C2WF' (type: string), 'Q926' (type: string), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)","Map 1":"'C2WF' (type: string), 'Q926' (type: string), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)"}
               |  outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"]
               |  Statistics:Num rows: 10066 Data size: 541406213 Basic stats: COMPLETE Column stats: NONE
               |<-Map 1 [BROADCAST_EDGE]
               |  Reduce Output Operator [RS_3747666]
               |     key expressions:'C2WF' (type: string), 'Q926' (type: string), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)
               |     Map-reduce partition columns:'C2WF' (type: string), 'Q926' (type: string), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)
               |     sort order:++++++++++
               |     Statistics:Num rows: 9151 Data size: 492187456 Basic stats: COMPLETE Column stats: NONE
               |     value expressions:parameters (type: array<string>), ingestion_date (type: string)
               |     Filter Operator [FIL_3747673]
               |        predicate:((ingestion_date < '201911291611') and start_t is not null and finish_t is not null and lot_id is not null and wafer_id is not null and flow_id is not null and die_x is not null and die_y is not null and part_seq is not null) (type: boolean)
               |        Statistics:Num rows: 9151 Data size: 492187456 Basic stats: COMPLETE Column stats: NONE
               |        TableScan [TS_3747655]
               |           alias:dpf
               |           Statistics:Num rows: 7027779 Data size: 377989801184 Basic stats: COMPLETE Column stats: NONE
               |<-Filter Operator [FIL_3747674]
                     predicate:((pass_fail = 'F') and (ingestion_date < '201911291611') and start_t is not null and finish_t is not null and lot_id is not null and wafer_id is not null and flow_id is not null and die_x is not null and die_y is not null and part_seq is not null) (type: boolean)
                     Statistics:Num rows: 4584 Data size: 5321735 Basic stats: COMPLETE Column stats: NONE
                     TableScan [TS_3747656]
                        alias:db
                        Statistics:Num rows: 7040091 Data size: 8173102308 Basic stats: COMPLETE Column stats: NONE

43 rows selected.
Notice the “Column stats: NONE” written everywhere in the explain plan…
Or graphically, using the Hive View in Ambari: copy/paste the query without the EXPLAIN keyword and push the Visual Explain button. It generates the plan without executing the query:
Which gives:
For query02 the explain plan is a bit longer, but we know from experience that the length of a plan has nothing to do with execution time:
Explain
--------------------------------------------------------------------------------
Plan not optimized by CBO.

Vertex dependency in root stage
Map 2 <- Map 1 (BROADCAST_EDGE)

Stage-0
   Fetch Operator
      limit:-1
      Stage-1
         Map 2
         File Output Operator [FS_3747687]
            compressed:false
            Statistics:Num rows: 20969516 Data size: 3690634816 Basic stats: COMPLETE Column stats: PARTIAL
            table:{"input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"}
            Select Operator [SEL_3747686]
               outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11"]
               Statistics:Num rows: 20969516 Data size: 3690634816 Basic stats: COMPLETE Column stats: PARTIAL
               Map Join Operator [MAPJOIN_3747710]
               |  condition map:[{"":"Inner Join 0 to 1"}]
               |  HybridGraceHashJoin:true
               |  keys:{"Map 2":"'C2WF' (type: string), 'Q926' (type: string), 9 (type: int), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)","Map 1":"'C2WF' (type: string), 'Q926' (type: string), 9 (type: int), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)"}
               |  outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"]
               |  Statistics:Num rows: 20969516 Data size: 53094814512 Basic stats: COMPLETE Column stats: PARTIAL
               |<-Map 1 [BROADCAST_EDGE]
               |  Reduce Output Operator [RS_3747681]
               |     key expressions:'C2WF' (type: string), 'Q926' (type: string), 9 (type: int), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)
               |     Map-reduce partition columns:'C2WF' (type: string), 'Q926' (type: string), 9 (type: int), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)
               |     sort order:+++++++++++
               |     Statistics:Num rows: 9151 Data size: 1647180 Basic stats: COMPLETE Column stats: PARTIAL
               |     value expressions:parameters (type: array<string>), ingestion_date (type: string)
               |     Filter Operator [FIL_3747690]
               |        predicate:(start_t is not null and finish_t is not null and lot_id is not null and wafer_id is not null and flow_id is not null and die_x is not null and die_y is not null and part_seq is not null and (ingestion_date < '201911291611')) (type: boolean)
               |        Statistics:Num rows: 9151 Data size: 1647180 Basic stats: COMPLETE Column stats: PARTIAL
               |        TableScan [TS_3747678]
               |           alias:dpf
               |           Statistics:Num rows: 7027779 Data size: 377989801184 Basic stats: COMPLETE Column stats: PARTIAL
               |  Dynamic Partitioning Event Operator [EVENT_3747703]
               |     Statistics:Num rows: 1477 Data size: 265860 Basic stats: COMPLETE Column stats: PARTIAL
               |     Group By Operator [GBY_3747702]
               |        keys:_col0 (type: string)
               |        outputColumnNames:["_col0"]
               |        Statistics:Num rows: 1477 Data size: 265860 Basic stats: COMPLETE Column stats: PARTIAL
               |        Select Operator [SEL_3747701]
               |           outputColumnNames:["_col0"]
               |           Statistics:Num rows: 9151 Data size: 1647180 Basic stats: COMPLETE Column stats: PARTIAL
               |            Please refer to the previous Filter Operator [FIL_3747690]
               |  Dynamic Partitioning Event Operator [EVENT_3747706]
               |     Statistics:Num rows: 1477 Data size: 265860 Basic stats: COMPLETE Column stats: PARTIAL
               |     Group By Operator [GBY_3747705]
               |        keys:_col0 (type: string)
               |        outputColumnNames:["_col0"]
               |        Statistics:Num rows: 1477 Data size: 265860 Basic stats: COMPLETE Column stats: PARTIAL
               |        Select Operator [SEL_3747704]
               |           outputColumnNames:["_col0"]
               |           Statistics:Num rows: 9151 Data size: 1647180 Basic stats: COMPLETE Column stats: PARTIAL
               |            Please refer to the previous Filter Operator [FIL_3747690]
               |  Dynamic Partitioning Event Operator [EVENT_3747709]
               |     Statistics:Num rows: 1477 Data size: 265860 Basic stats: COMPLETE Column stats: PARTIAL
               |     Group By Operator [GBY_3747708]
               |        keys:_col0 (type: int)
               |        outputColumnNames:["_col0"]
               |        Statistics:Num rows: 1477 Data size: 265860 Basic stats: COMPLETE Column stats: PARTIAL
               |        Select Operator [SEL_3747707]
               |           outputColumnNames:["_col0"]
               |           Statistics:Num rows: 9151 Data size: 1647180 Basic stats: COMPLETE Column stats: PARTIAL
               |            Please refer to the previous Filter Operator [FIL_3747690]
               |<-Filter Operator [FIL_3747691]
                     predicate:(start_t is not null and finish_t is not null and lot_id is not null and wafer_id is not null and flow_id is not null and die_x is not null and die_y is not null and part_seq is not null and (pass_fail = 'F') and (ingestion_date < '201911291611')) (type: boolean)
                     Statistics:Num rows: 4583 Data size: 824940 Basic stats: COMPLETE Column stats: PARTIAL
                     TableScan [TS_3747679]
                        alias:db
                        Statistics:Num rows: 7040091 Data size: 8173102308 Basic stats: COMPLETE Column stats: PARTIAL

73 rows selected.
Or graphically:
Even though we would expect the Hive optimizer to be clever and apply predicate pushdown, the explain plans differ, and the one with the WHERE clause looks less efficient on paper.
Overall, it is NOT AT ALL a best practice to put the filters in a separate WHERE clause of a JOIN: logically the WHERE clause is applied after the join, so unless the optimizer pushes the predicates down, more rows have to be joined! And in the particular case of an OUTER JOIN the two forms even return different results, as clearly explained in the official documentation.
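The OUTER JOIN pitfall can be reproduced in a few lines. A minimal sketch using Python's standard sqlite3 module: SQLite is not Hive, so this only illustrates standard SQL join semantics, and the two tiny tables are hypothetical stand-ins for dpf and db. With the filter on the right-hand table inside ON, every left row survives, with NULLs where nothing matches; moved to WHERE, the same filter discards those NULL-extended rows, which effectively turns the LEFT OUTER JOIN into an inner join.

```python
import sqlite3

# In-memory SQLite database standing in for Hive (assumption: only join semantics matter here).
con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE dpf (die_id INTEGER, fab TEXT);
    CREATE TABLE db  (die_id INTEGER, pass_fail TEXT);
    INSERT INTO dpf VALUES (1, 'C2WF'), (2, 'C2WF'), (3, 'C2WF');
    INSERT INTO db  VALUES (1, 'F'), (2, 'P');
""")

# Filter on the right-hand table inside ON: all dpf rows kept, unmatched ones get NULLs.
on_rows = con.execute("""
    SELECT dpf.die_id, db.pass_fail
    FROM dpf LEFT OUTER JOIN db
      ON dpf.die_id = db.die_id AND db.pass_fail = 'F'
    ORDER BY dpf.die_id
""").fetchall()

# Same filter in WHERE: evaluated after the join, it drops the NULL-extended rows.
where_rows = con.execute("""
    SELECT dpf.die_id, db.pass_fail
    FROM dpf LEFT OUTER JOIN db
      ON dpf.die_id = db.die_id
    WHERE db.pass_fail = 'F'
    ORDER BY dpf.die_id
""").fetchall()

print(on_rows)     # [(1, 'F'), (2, None), (3, None)]
print(where_rows)  # [(1, 'F')]
```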
I tried executing the two queries with the column list replaced by a COUNT(*), to avoid the network penalty, and to be honest I did not see any consistent difference. Of course the execution time depends on the load of the YARN queue and, as the cluster is heavily used, I was not alone on it:
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> select count(*)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> from prod_ews_refined.tbl_die_param_flow dpf join prod_ews_refined.tbl_die_bin db on
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> dpf.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.pass_fail="F"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.fab=db.fab
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition=db.lot_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.start_t=db.start_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.finish_t= db.finish_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_id=db.lot_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.wafer_id=db.wafer_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.flow_id=db.flow_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_x=db.die_x
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_y=db.die_y
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.part_seq=db.part_seq
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.ingestion_date<"201911291611"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.ingestion_date<"201911291611";
INFO : Tez session hasn't been created yet. Opening session
INFO : Dag name: select count(*) from ..._date<"201911291611"(Stage-1)
INFO : Setting tez.task.scale.memory.reserve-fraction to 0.30000001192092896
INFO : Status: Running (Executing on YARN cluster with App id application_1574697549080_17084)
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      6          6        0        0       0       0
Map 2 ..........   SUCCEEDED      6          6        0        0       0       0
Reducer 3 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 03/03  [==========================>>] 100%  ELAPSED TIME: 726.66 s
--------------------------------------------------------------------------------
+---------+--+
|   _c0   |
+---------+--+
| 322905  |
+---------+--+
1 row selected (735.944 seconds)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> select count(*)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> from prod_ews_refined.tbl_die_param_flow dpf join prod_ews_refined.tbl_die_bin db
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> on dpf.fab=db.fab
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition=db.lot_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition = db.die_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.start_t=db.start_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.finish_t= db.finish_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_id=db.lot_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.wafer_id=db.wafer_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.flow_id=db.flow_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_x=db.die_x
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_y=db.die_y
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.part_seq=db.part_seq
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> where dpf.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.pass_fail="F"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.ingestion_date<"201911291611"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.ingestion_date<"201911291611";
INFO : Session is already open
INFO : Dag name: select count(*) from ..._date<"201911291611"(Stage-1)
INFO : Setting tez.task.scale.memory.reserve-fraction to 0.30000001192092896
INFO : Status: Running (Executing on YARN cluster with App id application_1574697549080_17084)
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      6          6        0        0       0       0
Map 2 ..........   SUCCEEDED      6          6        0        0       0       0
Reducer 3 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 03/03  [==========================>>] 100%  ELAPSED TIME: 354.95 s
--------------------------------------------------------------------------------
+---------+--+
|   _c0   |
+---------+--+
| 322905  |
+---------+--+
1 row selected (355.482 seconds)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> select count(*)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> from prod_ews_refined.tbl_die_param_flow dpf join prod_ews_refined.tbl_die_bin db on
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> dpf.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.pass_fail="F"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.fab=db.fab
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition=db.lot_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.start_t=db.start_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.finish_t= db.finish_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_id=db.lot_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.wafer_id=db.wafer_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.flow_id=db.flow_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_x=db.die_x
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_y=db.die_y
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.part_seq=db.part_seq
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.ingestion_date<"201911291611"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.ingestion_date<"201911291611";
INFO : Session is already open
INFO : Dag name: select count(*) from ..._date<"201911291611"(Stage-1)
INFO : Setting tez.task.scale.memory.reserve-fraction to 0.30000001192092896
INFO : Tez session was closed. Reopening...
INFO : Session re-established.
INFO : Status: Running (Executing on YARN cluster with App id application_1574697549080_17169)
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      6          6        0        0       0       0
Map 2 ..........   SUCCEEDED      6          6        0        0       0       0
Reducer 3 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 03/03  [==========================>>] 100%  ELAPSED TIME: 1108.09 s
--------------------------------------------------------------------------------
+---------+--+
|   _c0   |
+---------+--+
| 322905  |
+---------+--+
1 row selected (1135.466 seconds)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> select count(*)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> from prod_ews_refined.tbl_die_param_flow dpf join prod_ews_refined.tbl_die_bin db
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> on dpf.fab=db.fab
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition=db.lot_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition = db.die_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.start_t=db.start_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.finish_t= db.finish_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_id=db.lot_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.wafer_id=db.wafer_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.flow_id=db.flow_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_x=db.die_x
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_y=db.die_y
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.part_seq=db.part_seq
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> where dpf.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.pass_fail="F"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.ingestion_date<"201911291611"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.ingestion_date<"201911291611";
INFO : Session is already open
INFO : Dag name: select count(*) from ..._date<"201911291611"(Stage-1)
INFO : Setting tez.task.scale.memory.reserve-fraction to 0.30000001192092896
INFO : Status: Running (Executing on YARN cluster with App id application_1574697549080_17169)
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      6          6        0        0       0       0
Map 2 ..........   SUCCEEDED      6          6        0        0       1       0
Reducer 3 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 03/03  [==========================>>] 100%  ELAPSED TIME: 1273.38 s
--------------------------------------------------------------------------------
+---------+--+
|   _c0   |
+---------+--+
| 322905  |
+---------+--+
1 row selected (1273.899 seconds)
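To put a number on how noisy these timings are, here is a small Python sketch that uses only the wall-clock times printed in the four runs above and compares, for each query, its slowest and fastest run.

```python
# Wall-clock times ("1 row selected (... seconds)") copied from the four runs above.
runs = {
    "query01": [735.944, 1135.466],  # all predicates in the JOIN ... ON clause
    "query02": [355.482, 1273.899],  # constant predicates in the WHERE clause
}

spreads = {}
for query, seconds in runs.items():
    # Ratio between the slowest and the fastest run of the same query.
    spreads[query] = max(seconds) / min(seconds)
    print(f"{query}: {min(seconds):.0f} s to {max(seconds):.0f} s, x{spreads[query]:.1f} between runs")
```

The same query varies by a factor of about 1.5 (query01) and 3.6 (query02) from one run to the next, which is at least as large as the gap between the two queries, so on a shared YARN queue these timings alone cannot separate the two forms.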
The problem is that for other partitions of the source tables the query did not run at all and failed with the classical Out Of Memory (OOM) error, leaving us with empty partitions in the final tables…
Problem gone with good Hive statistics
We noticed “Column stats: PARTIAL” and “Column stats: NONE” in the explain plans, so I decided to check column statistics with a command like:
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> describe formatted prod_ews_refined.tbl_die_param_flow ingestion_date partition(fab='C2WF',lot_partition='Q926',die_partition='9');
+-------------------------+-----------------------+-----------------------+-------+------------+-----------------+--------------+--------------+------------+-------------+----------+--+
| col_name | data_type | min | max | num_nulls | distinct_count | avg_col_len | max_col_len | num_trues | num_falses | comment |
+-------------------------+-----------------------+-----------------------+-------+------------+-----------------+--------------+--------------+------------+-------------+----------+--+
| # col_name | data_type | comment | | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
| | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
| ingestion_date | string | from deserializer | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
+-------------------------+-----------------------+-----------------------+-------+------------+-----------------+--------------+--------------+------------+-------------+----------+--+
As you can see, we had no column statistics at all, due to our obvious lack of an ANALYZE command after ingesting new figures…
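Since we only spotted the problem by reading the plans, the check can be scripted. A minimal sketch of a hypothetical Python helper (not part of Hive or Beeline) that counts the NONE, PARTIAL and COMPLETE "Column stats" levels in an EXPLAIN output saved as text; the two sample lines are copied from the query01 plan.

```python
import re
from collections import Counter

# Matches the "Column stats: <LEVEL>" fragment that Hive prints for each operator.
COLUMN_STATS = re.compile(r"Column stats:\s*(NONE|PARTIAL|COMPLETE)")


def column_stats_summary(explain_text: str) -> Counter:
    """Count how many operators report NONE, PARTIAL or COMPLETE column statistics."""
    return Counter(COLUMN_STATS.findall(explain_text))


# Two Statistics lines copied from the query01 plan.
plan = """
Statistics:Num rows: 9151 Data size: 492187456 Basic stats: COMPLETE Column stats: NONE
Statistics:Num rows: 7027779 Data size: 377989801184 Basic stats: COMPLETE Column stats: NONE
"""

summary = column_stats_summary(plan)
print(summary)  # Counter({'NONE': 2})
if summary["NONE"] or summary["PARTIAL"]:
    print("Some operators lack column statistics: time to run ANALYZE ... FOR COLUMNS")
```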
I gathered them with:
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> analyze table tbl_die_bin partition(fab="C2WF",lot_partition="Q926",die_partition=9) compute statistics for columns;
INFO : Tez session hasn't been created yet. Opening session
INFO : Dag name: analyze table tbl_die_bin partitio...columns(Stage-0)
INFO : Status: Running (Executing on YARN cluster with App id application_1574697549080_20216)
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      9          9        0        0       0       0
Reducer 2 ......   SUCCEEDED     31         31        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 64.75 s
--------------------------------------------------------------------------------
No rows affected (73.7 seconds)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> analyze table tbl_die_param_flow partition(fab="C2WF",lot_partition="Q926",die_partition=9) compute statistics for columns start_t,finish_t,lot_id,wafer_id,flow_id,die_x,die_y,part_seq,ingestion_date;
INFO : Tez session hasn't been created yet. Opening session
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     39         39        0        0       0       0
Reducer 2 ......   SUCCEEDED    253        253        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 198.48 s
--------------------------------------------------------------------------------
No rows affected (369.772 seconds)
There is the special case of the array<string> datatype, which you must explicitly remove from the column list to avoid this:
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> analyze table tbl_die_param_flow partition(fab="C2WF",lot_partition="Q926",die_partition=9) compute statistics for columns;
Error: Error while compiling statement: FAILED: UDFArgumentTypeException Only primitive type arguments are accepted but array<string> is passed. (state=42000,code=40000)
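When partitions are loaded incrementally, the statement can be generated rather than typed by hand. A minimal sketch of a hypothetical Python helper that builds the ANALYZE ... FOR COLUMNS statement for one partition and leaves out complex types (array, map, struct, uniontype), which is the kind of argument the error above rejects. The column names and types below are taken from the explain plans and are assumed to be complete; in practice the real schema should come from DESCRIBE.

```python
# Complex Hive types: the error above shows only primitive types are accepted.
COMPLEX_PREFIXES = ("array<", "map<", "struct<", "uniontype<")


def analyze_columns_stmt(table, partition, columns):
    """Build an ANALYZE ... COMPUTE STATISTICS FOR COLUMNS statement for one partition.

    table     -- table name, optionally qualified with its database
    partition -- ordered list of (partition column, value) pairs
    columns   -- ordered list of (column name, Hive type) pairs, partition columns excluded
    """
    spec = ",".join(
        f"{name}={value}" if isinstance(value, int) else f"{name}='{value}'"
        for name, value in partition
    )
    # Keep only primitive columns; ignore spaces such as "array< string>".
    primitive = [name for name, hive_type in columns
                 if not hive_type.replace(" ", "").lower().startswith(COMPLEX_PREFIXES)]
    return (f"ANALYZE TABLE {table} PARTITION({spec}) "
            f"COMPUTE STATISTICS FOR COLUMNS {','.join(primitive)};")


# Column names and types as they appear in the explain plans (assumed complete).
columns = [
    ("start_t", "string"), ("finish_t", "string"), ("lot_id", "string"),
    ("wafer_id", "string"), ("flow_id", "string"), ("die_x", "int"),
    ("die_y", "int"), ("part_seq", "int"), ("parameters", "array<string>"),
    ("ingestion_date", "string"),
]
stmt = analyze_columns_stmt(
    "tbl_die_param_flow",
    [("fab", "C2WF"), ("lot_partition", "Q926"), ("die_partition", 9)],
    columns,
)
print(stmt)
```

The generated column list is the same one used manually above, with parameters (array<string>) dropped.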
Now with accurate statistics:
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> describe formatted prod_ews_refined.tbl_die_param_flow ingestion_date partition(fab="C2WF",lot_partition="Q926",die_partition=9);
+-------------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+--+
| col_name | data_type | min | max | num_nulls | distinct_count | avg_col_len | max_col_len | num_trues | num_falses | comment |
+-------------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+--+
| # col_name | data_type | min | max | num_nulls | distinct_count | avg_col_len | max_col_len | num_trues | num_falses | comment |
| | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
| ingestion_date | string | | | 0 | 510 | 12.0 | 12 | | | from deserializer |
+-------------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+--+
3 rows selected (0.487 seconds)
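The before/after difference boils down to the num_nulls and distinct_count cells turning from NULL into numbers. A minimal sketch of a hypothetical Python helper that checks a single ingestion_date row of describe formatted output for that; it assumes Beeline's pipe-separated table layout and uses the two rows shown in this post.

```python
def has_column_stats(beeline_row: str) -> bool:
    """Return True if a 'describe formatted <table> <column>' row carries statistics.

    Expected layout (Beeline table output):
    | col_name | data_type | min | max | num_nulls | distinct_count | ... |
    """
    cells = [cell.strip() for cell in beeline_row.strip().strip("|").split("|")]
    num_nulls, distinct_count = cells[4], cells[5]
    # Without statistics both cells print as NULL; after ANALYZE they hold numbers.
    return num_nulls not in ("", "NULL") and distinct_count not in ("", "NULL")


before = "| ingestion_date | string | from deserializer | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |"
after = "| ingestion_date | string | | | 0 | 510 | 12.0 | 12 | | | from deserializer |"
print(has_column_stats(before), has_column_stats(after))  # False True
```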
The explain plans for query01 and query02 are now exactly the same:
Explain
--------------------------------------------------------------------------------
Plan not optimized by CBO.

Vertex dependency in root stage
Map 1 <- Map 2 (BROADCAST_EDGE)

Stage-0
   Fetch Operator
      limit:-1
      Stage-1
         Map 1
         File Output Operator [FS_3981081]
            compressed:false
            Statistics:Num rows: 100157 Data size: 76720262 Basic stats: COMPLETE Column stats: PARTIAL
            table:{"input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"}
            Select Operator [SEL_3981080]
               outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11"]
               Statistics:Num rows: 100157 Data size: 76720262 Basic stats: COMPLETE Column stats: PARTIAL
               Map Join Operator [MAPJOIN_3981086]
               |  condition map:[{"":"Inner Join 0 to 1"}]
               |  HybridGraceHashJoin:true
               |  keys:{"Map 2":"'C2WF' (type: string), 'Q926' (type: string), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)","Map 1":"'C2WF' (type: string), 'Q926' (type: string), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)"}
               |  outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"]
               |  Statistics:Num rows: 100157 Data size: 251394070 Basic stats: COMPLETE Column stats: PARTIAL
               |<-Map 2 [BROADCAST_EDGE] vectorized
               |  Reduce Output Operator [RS_3981089]
               |     key expressions:'C2WF' (type: string), 'Q926' (type: string), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)
               |     Map-reduce partition columns:'C2WF' (type: string), 'Q926' (type: string), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)
               |     sort order:++++++++++
               |     Statistics:Num rows: 1184068 Data size: 799245900 Basic stats: COMPLETE Column stats: COMPLETE
               |     Filter Operator [FIL_3981088]
               |        predicate:((pass_fail = 'F') and (ingestion_date < '201911291611') and start_t is not null and finish_t is not null and lot_id is not null and wafer_id is not null and flow_id is not null and die_x is not null and die_y is not null and part_seq is not null) (type: boolean)
               |        Statistics:Num rows: 1184068 Data size: 799245900 Basic stats: COMPLETE Column stats: COMPLETE
               |        TableScan [TS_3981067]
               |           alias:db
               |           Statistics:Num rows: 7104410 Data size: 8247850962 Basic stats: COMPLETE Column stats: COMPLETE
               |<-Filter Operator [FIL_3981084]
                     predicate:((ingestion_date < '201911291611') and start_t is not null and finish_t is not null and lot_id is not null and wafer_id is not null and flow_id is not null and die_x is not null and die_y is not null and part_seq is not null) (type: boolean)
                     Statistics:Num rows: 2364032 Data size: 1394778880 Basic stats: COMPLETE Column stats: PARTIAL
                     TableScan [TS_3981066]
                        alias:dpf
                        Statistics:Num rows: 7092098 Data size: 379282949029 Basic stats: COMPLETE Column stats: PARTIAL

42 rows selected.
Or graphically:
And the execution time is greatly reduced (by around a factor of 10):
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> select count(*)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> from prod_ews_refined.tbl_die_param_flow dpf join prod_ews_refined.tbl_die_bin db
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> on dpf.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.pass_fail="F"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.fab=db.fab
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition=db.lot_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.start_t=db.start_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.finish_t= db.finish_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_id=db.lot_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.wafer_id=db.wafer_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.flow_id=db.flow_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_x=db.die_x
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_y=db.die_y
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.part_seq=db.part_seq
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.ingestion_date<"201911291611"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.ingestion_date<"201911291611";
INFO : Session is already open
INFO : Dag name: select count(*) from ..._date<"201911291611"(Stage-1)
INFO : Setting tez.task.scale.memory.reserve-fraction to 0.30000001192092896
INFO : Tez session was closed. Reopening...
INFO : Session re-established.
INFO : Status: Running (Executing on YARN cluster with App id application_1574697549080_23102)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     38         38        0        0       0       0
Map 3 ..........   SUCCEEDED      6          6        0        0       0       0
Reducer 2 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 03/03  [==========================>>] 100%  ELAPSED TIME: 146.74 s
--------------------------------------------------------------------------------
+---------+--+
|   _c0   |
+---------+--+
| 322905  |
+---------+--+
1 row selected (153.287 seconds)

0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> select count(*)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> from prod_ews_refined.tbl_die_param_flow dpf join prod_ews_refined.tbl_die_bin db
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> on dpf.fab=db.fab
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition=db.lot_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition = db.die_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.start_t=db.start_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.finish_t= db.finish_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_id=db.lot_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.wafer_id=db.wafer_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.flow_id=db.flow_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_x=db.die_x
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_y=db.die_y
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.part_seq=db.part_seq
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> where dpf.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.pass_fail="F"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.ingestion_date<"201911291611"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.ingestion_date<"201911291611";
INFO : Session is already open
INFO : Dag name: select count(*) from ..._date<"201911291611"(Stage-1)
INFO : Setting tez.task.scale.memory.reserve-fraction to 0.30000001192092896
INFO : Status: Running (Executing on YARN cluster with App id application_1574697549080_23102)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     38         38        0        0       0       0
Map 3 ..........   SUCCEEDED      6          6        0        0       0       0
Reducer 2 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 03/03  [==========================>>] 100%  ELAPSED TIME: 126.12 s
--------------------------------------------------------------------------------
+---------+--+
|   _c0   |
+---------+--+
| 322905  |
+---------+--+
1 row selected (126.614 seconds)
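Both formulations return the same 322905 rows in comparable elapsed times (the first run also had to reopen its Tez session), which suggests the WHERE-clause filters were pushed down just like the ON-clause ones. That freedom is a property of inner joins: with an outer join, a predicate on the null-supplying (right-hand) table of a LEFT JOIN behaves differently in ON and in WHERE, so it cannot be moved freely. The sketch below illustrates this with Python's built-in sqlite3 module; it runs on SQLite, not Hive, and the two tables and their rows are invented stand-ins for tbl_die_param_flow and tbl_die_bin.

```python
import sqlite3

# Hypothetical toy data standing in for tbl_die_param_flow / tbl_die_bin.
SETUP = """
CREATE TABLE param_flow (die_id INTEGER, fab TEXT);
CREATE TABLE die_bin (die_id INTEGER, pass_fail TEXT);
INSERT INTO param_flow VALUES (1, 'C2WF'), (2, 'C2WF'), (3, 'OTHER');
INSERT INTO die_bin VALUES (1, 'F'), (2, 'P'), (3, 'F');
"""

QUERIES = {
    # Inner join, filters in the ON clause (query01 style).
    "inner_on": """
        SELECT pf.die_id FROM param_flow pf JOIN die_bin db
          ON pf.die_id = db.die_id AND pf.fab = 'C2WF' AND db.pass_fail = 'F'""",
    # Inner join, same filters in the WHERE clause (query02 style).
    "inner_where": """
        SELECT pf.die_id FROM param_flow pf JOIN die_bin db
          ON pf.die_id = db.die_id
        WHERE pf.fab = 'C2WF' AND db.pass_fail = 'F'""",
    # Left join: a right-table filter in ON keeps unmatched left rows (with NULLs)...
    "left_on": """
        SELECT pf.die_id, db.pass_fail FROM param_flow pf LEFT JOIN die_bin db
          ON pf.die_id = db.die_id AND db.pass_fail = 'F'""",
    # ...whereas the same filter in WHERE removes them after the join.
    "left_where": """
        SELECT pf.die_id, db.pass_fail FROM param_flow pf LEFT JOIN die_bin db
          ON pf.die_id = db.die_id
        WHERE db.pass_fail = 'F'""",
}


def run_all():
    """Run every query on a fresh in-memory database; return sorted rows per query."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.executescript(SETUP)
        return {name: sorted(conn.execute(sql).fetchall())
                for name, sql in QUERIES.items()}
    finally:
        conn.close()


if __name__ == "__main__":
    for name, rows in run_all().items():
        print(f"{name:12} {rows}")
```

Running it shows inner_on and inner_where both returning [(1,)], while left_on keeps die 2 with a NULL pass_fail and left_where drops it. For the inner joins used in this post, the placement of the filters is therefore a matter of readability rather than correctness.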