On the importance of having good Hive statistics on your tables

Preamble

Hive statistics: if, like me, you come from the RDBMS world, I will not insult you by re-explaining how important good statistics (tables, columns, ...) are in helping the optimizer choose the best way to execute your SQL statements. In this post you will see a concrete example of why they are just as important in a Hadoop cluster!

We have the (bad) habit of incrementally loading figures into our partitioned Hive tables without gathering statistics each time. In short, this is clearly not a best practice!

Then, to create a more refined table, we join two tables, and the initial question was: do we need to put all predicates in the JOIN ON clause, or can they go in a traditional WHERE clause, trusting the Hive optimizer to be clever enough to do what is usually called predicate pushdown?

We run the Hortonworks Hadoop distribution, HDP 2.6.4, and are therefore running Hive 1.2.1000.

The problematic queries

We were trying to load a third table from two sources with (query01):

SELECT dpf.fab,dpf.start_t,dpf.finish_t,dpf.lot_id,dpf.wafer_id,dpf.flow_id,dpf.die_x,dpf.die_y,dpf.part_seq,dpf.parameters,dpf.ingestion_date,dpf.lot_partition
FROM prod_ews_refined.tbl_die_param_flow dpf join prod_ews_refined.tbl_die_bin db
ON dpf.fab="C2WF"
AND dpf.lot_partition="Q926"
AND dpf.die_partition=9
AND db.pass_fail="F"
AND db.fab="C2WF"
AND db.lot_partition="Q926"
AND db.die_partition=9
AND dpf.fab=db.fab
AND dpf.lot_partition=db.lot_partition
AND dpf.start_t=db.start_t
AND dpf.finish_t= db.finish_t
AND dpf.lot_id=db.lot_id
AND dpf.wafer_id=db.wafer_id
AND dpf.flow_id=db.flow_id
AND dpf.die_x=db.die_x
AND dpf.die_y=db.die_y
AND dpf.part_seq=db.part_seq
AND dpf.ingestion_date<"201911291611"
AND db.ingestion_date<"201911291611";

We compared it with its sister query, written more logically I would say, but in fact not following best practices (query02):

SELECT dpf.fab,dpf.start_t,dpf.finish_t,dpf.lot_id,dpf.wafer_id,dpf.flow_id,dpf.die_x,dpf.die_y,dpf.part_seq,dpf.parameters,dpf.ingestion_date,dpf.lot_partition
FROM prod_ews_refined.tbl_die_param_flow dpf join prod_ews_refined.tbl_die_bin db
ON dpf.fab=db.fab
AND dpf.lot_partition=db.lot_partition
AND dpf.die_partition=db.die_partition
AND dpf.start_t=db.start_t
AND dpf.finish_t= db.finish_t
AND dpf.lot_id=db.lot_id
AND dpf.wafer_id=db.wafer_id
AND dpf.flow_id=db.flow_id
AND dpf.die_x=db.die_x
AND dpf.die_y=db.die_y
AND dpf.part_seq=db.part_seq
WHERE dpf.fab="C2WF"
AND dpf.lot_partition="Q926"
AND dpf.die_partition="9"
AND db.fab="C2WF"
AND db.lot_partition="Q926"
AND db.pass_fail="F"
AND db.die_partition="9"
AND dpf.ingestion_date<"201911291611"
AND db.ingestion_date<"201911291611";

The explain plan of query01 is (use the Hive EXPLAIN command to generate it in Beeline):

Explain
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Plan not optimized by CBO.
 
Vertex dependency in root stage
Map 2 <- Map 1 (BROADCAST_EDGE)
 
Stage-0
    Fetch Operator
      limit:-1
      Stage-1
          Map 2
          File Output Operator [FS_3747670]
            compressed:false
            Statistics:Num rows: 10066 Data size: 541406213 Basic stats: COMPLETE Column stats: NONE
            table:{"input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"}
            Select Operator [SEL_3747669]
                outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11"]
                Statistics:Num rows: 10066 Data size: 541406213 Basic stats: COMPLETE Column stats: NONE
                Map Join Operator [MAPJOIN_3747675]
                |  condition map:[{"":"Inner Join 0 to 1"}]
                |  HybridGraceHashJoin:true
                |  keys:{"Map 2":"'C2WF' (type: string), 'Q926' (type: string), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)","Map 1":"'C2WF' (type: string), 'Q926' (type: string), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)"}
                |  outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"]
                |  Statistics:Num rows: 10066 Data size: 541406213 Basic stats: COMPLETE Column stats: NONE
                |<-Map 1 [BROADCAST_EDGE]
                |  Reduce Output Operator [RS_3747666]
                |     key expressions:'C2WF' (type: string), 'Q926' (type: string), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)
                |     Map-reduce partition columns:'C2WF' (type: string), 'Q926' (type: string), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)
                |     sort order:++++++++++
                |     Statistics:Num rows: 9151 Data size: 492187456 Basic stats: COMPLETE Column stats: NONE
                |     value expressions:parameters (type: array<string>), ingestion_date (type: string)
                |     Filter Operator [FIL_3747673]
                |        predicate:((ingestion_date < '201911291611') and start_t is not null and finish_t is not null and lot_id is not null and wafer_id is not null and flow_id is not null and die_x is not null and die_y is not null and part_seq is not null) (type: boolean)
                |        Statistics:Num rows: 9151 Data size: 492187456 Basic stats: COMPLETE Column stats: NONE
                |        TableScan [TS_3747655]
                |           alias:dpf
                |           Statistics:Num rows: 7027779 Data size: 377989801184 Basic stats: COMPLETE Column stats: NONE
                |<-Filter Operator [FIL_3747674]
                      predicate:((pass_fail = 'F') and (ingestion_date < '201911291611') and start_t is not null and finish_t is not null and lot_id is not null and wafer_id is not null and flow_id is not null and die_x is not null and die_y is not null and part_seq is not null) (type: boolean)
                      Statistics:Num rows: 4584 Data size: 5321735 Basic stats: COMPLETE Column stats: NONE
                      TableScan [TS_3747656]
                        alias:db
                        Statistics:Num rows: 7040091 Data size: 8173102308 Basic stats: COMPLETE Column stats: NONE
 
 
43 rows selected.

Notice the “Column stats: NONE” written everywhere in the explain plan…
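As a minimal sketch of how such a plan is obtained in Beeline, the statement is simply prefixed with EXPLAIN. The query here is a shortened form of query01 that keeps only a few of the join keys for readability; it is an illustration, not the exact statement behind the plan above:

```sql
-- Sketch only: shortened form of query01, for illustration.
-- EXPLAIN compiles the statement and prints its plan without running the join.
EXPLAIN
SELECT dpf.lot_id, dpf.wafer_id, dpf.parameters
FROM prod_ews_refined.tbl_die_param_flow dpf
JOIN prod_ews_refined.tbl_die_bin db
  ON  dpf.fab = 'C2WF'
  AND dpf.lot_partition = 'Q926'
  AND dpf.die_partition = 9
  AND db.fab = 'C2WF'
  AND db.lot_partition = 'Q926'
  AND db.die_partition = 9
  AND dpf.lot_id = db.lot_id
  AND dpf.wafer_id = db.wafer_id;
```

In the output, the "Statistics:" line of each operator is the place to look for the "Basic stats" and "Column stats" status.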

Or graphically, using Hive View in Ambari. To do so, copy/paste the query without the EXPLAIN keyword and push the Visual Explain button. It will generate the plan without executing the query:

hive_statistics01

Which gives:

hive_statistics02

For query02 the explain plan is a bit longer, but we know from experience that the length of a plan has nothing to do with execution time:

Explain
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Plan not optimized by CBO.
 
Vertex dependency in root stage
Map 2 <- Map 1 (BROADCAST_EDGE)
 
Stage-0
    Fetch Operator
      limit:-1
      Stage-1
          Map 2
          File Output Operator [FS_3747687]
            compressed:false
            Statistics:Num rows: 20969516 Data size: 3690634816 Basic stats: COMPLETE Column stats: PARTIAL
            table:{"input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"}
            Select Operator [SEL_3747686]
                outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11"]
                Statistics:Num rows: 20969516 Data size: 3690634816 Basic stats: COMPLETE Column stats: PARTIAL
                Map Join Operator [MAPJOIN_3747710]
                |  condition map:[{"":"Inner Join 0 to 1"}]
                |  HybridGraceHashJoin:true
                |  keys:{"Map 2":"'C2WF' (type: string), 'Q926' (type: string), 9 (type: int), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)","Map 1":"'C2WF' (type: string), 'Q926' (type: string), 9 (type: int), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)"}
                |  outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"]
                |  Statistics:Num rows: 20969516 Data size: 53094814512 Basic stats: COMPLETE Column stats: PARTIAL
                |<-Map 1 [BROADCAST_EDGE]
                |  Reduce Output Operator [RS_3747681]
                |     key expressions:'C2WF' (type: string), 'Q926' (type: string), 9 (type: int), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)
                |     Map-reduce partition columns:'C2WF' (type: string), 'Q926' (type: string), 9 (type: int), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)
                |     sort order:+++++++++++
                |     Statistics:Num rows: 9151 Data size: 1647180 Basic stats: COMPLETE Column stats: PARTIAL
                |     value expressions:parameters (type: array<string>), ingestion_date (type: string)
                |     Filter Operator [FIL_3747690]
                |        predicate:(start_t is not null and finish_t is not null and lot_id is not null and wafer_id is not null and flow_id is not null and die_x is not null and die_y is not null and part_seq is not null and (ingestion_date < '201911291611')) (type: boolean)
                |        Statistics:Num rows: 9151 Data size: 1647180 Basic stats: COMPLETE Column stats: PARTIAL
                |        TableScan [TS_3747678]
                |           alias:dpf
                |           Statistics:Num rows: 7027779 Data size: 377989801184 Basic stats: COMPLETE Column stats: PARTIAL
                |  Dynamic Partitioning Event Operator [EVENT_3747703]
                |     Statistics:Num rows: 1477 Data size: 265860 Basic stats: COMPLETE Column stats: PARTIAL
                |     Group By Operator [GBY_3747702]
                |        keys:_col0 (type: string)
                |        outputColumnNames:["_col0"]
                |        Statistics:Num rows: 1477 Data size: 265860 Basic stats: COMPLETE Column stats: PARTIAL
                |        Select Operator [SEL_3747701]
                |           outputColumnNames:["_col0"]
                |           Statistics:Num rows: 9151 Data size: 1647180 Basic stats: COMPLETE Column stats: PARTIAL
                |            Please refer to the previous Filter Operator [FIL_3747690]
                |  Dynamic Partitioning Event Operator [EVENT_3747706]
                |     Statistics:Num rows: 1477 Data size: 265860 Basic stats: COMPLETE Column stats: PARTIAL
                |     Group By Operator [GBY_3747705]
                |        keys:_col0 (type: string)
                |        outputColumnNames:["_col0"]
                |        Statistics:Num rows: 1477 Data size: 265860 Basic stats: COMPLETE Column stats: PARTIAL
                |        Select Operator [SEL_3747704]
                |           outputColumnNames:["_col0"]
                |           Statistics:Num rows: 9151 Data size: 1647180 Basic stats: COMPLETE Column stats: PARTIAL
                |            Please refer to the previous Filter Operator [FIL_3747690]
                |  Dynamic Partitioning Event Operator [EVENT_3747709]
                |     Statistics:Num rows: 1477 Data size: 265860 Basic stats: COMPLETE Column stats: PARTIAL
                |     Group By Operator [GBY_3747708]
                |        keys:_col0 (type: int)
                |        outputColumnNames:["_col0"]
                |        Statistics:Num rows: 1477 Data size: 265860 Basic stats: COMPLETE Column stats: PARTIAL
                |        Select Operator [SEL_3747707]
                |           outputColumnNames:["_col0"]
                |           Statistics:Num rows: 9151 Data size: 1647180 Basic stats: COMPLETE Column stats: PARTIAL
                |            Please refer to the previous Filter Operator [FIL_3747690]
                |<-Filter Operator [FIL_3747691]
                      predicate:(start_t is not null and finish_t is not null and lot_id is not null and wafer_id is not null and flow_id is not null and die_x is not null and die_y is not null and part_seq is not null and (pass_fail = 'F') and (ingestion_date < '201911291611')) (type: boolean)
                      Statistics:Num rows: 4583 Data size: 824940 Basic stats: COMPLETE Column stats: PARTIAL
                      TableScan [TS_3747679]
                        alias:db
                        Statistics:Num rows: 7040091 Data size: 8173102308 Basic stats: COMPLETE Column stats: PARTIAL
 
 
73 rows selected.

Or graphically:

hive_statistics03

Even though we would expect the Hive optimizer to be clever and apply predicate pushdown, the explain plans are different, and the one with the WHERE clause looks less efficient on paper.

Overall, putting the filters in a separate WHERE clause of a JOIN is NOT AT ALL a best practice: the WHERE clause is logically evaluated after the join, so unless the optimizer pushes the predicates down, the volume of rows to be joined increases! With an OUTER JOIN it can even return results different from the ones you intended, as clearly explained in the official documentation:

hive_statistics04
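The outer-join case can be illustrated with a minimal sketch. The tables a and b and their columns key, val and ds are hypothetical and exist only for this example; the two statements look similar but do not return the same rows:

```sql
-- Hypothetical tables a(key, val, ds) and b(key, val, ds), for illustration only.

-- Filter on b in the WHERE clause: rows of a without a match get NULL for b.ds,
-- the test b.ds = '2009-07-07' is not true for them, and they are dropped.
-- The LEFT OUTER JOIN then returns the same rows as an inner join would.
SELECT a.val, b.val
FROM a LEFT OUTER JOIN b ON (a.key = b.key)
WHERE a.ds = '2009-07-07' AND b.ds = '2009-07-07';

-- Filter on b in the ON clause: b is restricted before the matching,
-- so rows of a without a match are kept, with NULL for b.val.
SELECT a.val, b.val
FROM a LEFT OUTER JOIN b
  ON (a.key = b.key AND b.ds = '2009-07-07')
WHERE a.ds = '2009-07-07';
```

For an inner join such as query01 and query02, both placements return the same rows, which is why only the plan and the execution time are at stake here.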

I executed the two queries, replacing the column list with COUNT(*) to avoid the network penalty, and to be honest I did not see any consistent difference (736 s and 1,135 s for query01 versus 355 s and 1,274 s for query02, over two runs each). Of course the execution time depends on the usage of the YARN queue, and as the cluster is already heavily used I was not alone on it:

0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> select count(*)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> from prod_ews_refined.tbl_die_param_flow dpf join prod_ews_refined.tbl_die_bin db  on
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> dpf.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.pass_fail="F"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.fab=db.fab
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition=db.lot_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.start_t=db.start_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.finish_t= db.finish_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_id=db.lot_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.wafer_id=db.wafer_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.flow_id=db.flow_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_x=db.die_x
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_y=db.die_y
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.part_seq=db.part_seq
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.ingestion_date<"201911291611"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.ingestion_date<"201911291611";
INFO  : Tez session hasn't been created yet. Opening session
INFO  : Dag name: select count(*)
from ..._date<"201911291611"(Stage-1)
INFO  : Setting tez.task.scale.memory.reserve-fraction to 0.30000001192092896
INFO  : Status: Running (Executing on YARN cluster with App id application_1574697549080_17084)
 
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      6          6        0        0       0       0
Map 2 ..........   SUCCEEDED      6          6        0        0       0       0
Reducer 3 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 03/03  [==========================>>] 100%  ELAPSED TIME: 726.66 s
--------------------------------------------------------------------------------
+---------+--+
|   _c0   |
+---------+--+
| 322905  |
+---------+--+
1 row selected (735.944 seconds)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> select count(*)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> from prod_ews_refined.tbl_die_param_flow dpf join prod_ews_refined.tbl_die_bin db
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> on dpf.fab=db.fab
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition=db.lot_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition = db.die_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.start_t=db.start_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.finish_t= db.finish_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_id=db.lot_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.wafer_id=db.wafer_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.flow_id=db.flow_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_x=db.die_x
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_y=db.die_y
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.part_seq=db.part_seq
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> where dpf.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.pass_fail="F"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.ingestion_date<"201911291611"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.ingestion_date<"201911291611";
INFO  : Session is already open
INFO  : Dag name: select count(*)
from ..._date<"201911291611"(Stage-1)
INFO  : Setting tez.task.scale.memory.reserve-fraction to 0.30000001192092896
INFO  : Status: Running (Executing on YARN cluster with App id application_1574697549080_17084)
 
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      6          6        0        0       0       0
Map 2 ..........   SUCCEEDED      6          6        0        0       0       0
Reducer 3 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 03/03  [==========================>>] 100%  ELAPSED TIME: 354.95 s
--------------------------------------------------------------------------------
+---------+--+
|   _c0   |
+---------+--+
| 322905  |
+---------+--+
1 row selected (355.482 seconds)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> select count(*)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> from prod_ews_refined.tbl_die_param_flow dpf join prod_ews_refined.tbl_die_bin db  on
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> dpf.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.pass_fail="F"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.fab=db.fab
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition=db.lot_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.start_t=db.start_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.finish_t= db.finish_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_id=db.lot_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.wafer_id=db.wafer_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.flow_id=db.flow_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_x=db.die_x
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_y=db.die_y
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.part_seq=db.part_seq
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.ingestion_date<"201911291611"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.ingestion_date<"201911291611";
INFO  : Session is already open
INFO  : Dag name: select count(*)
from ..._date<"201911291611"(Stage-1)
INFO  : Setting tez.task.scale.memory.reserve-fraction to 0.30000001192092896
INFO  : Tez session was closed. Reopening...
INFO  : Session re-established.
INFO  : Status: Running (Executing on YARN cluster with App id application_1574697549080_17169)
 
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      6          6        0        0       0       0
Map 2 ..........   SUCCEEDED      6          6        0        0       0       0
Reducer 3 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 03/03  [==========================>>] 100%  ELAPSED TIME: 1108.09 s
--------------------------------------------------------------------------------
+---------+--+
|   _c0   |
+---------+--+
| 322905  |
+---------+--+
1 row selected (1135.466 seconds)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> select count(*)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> from prod_ews_refined.tbl_die_param_flow dpf join prod_ews_refined.tbl_die_bin db
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> on dpf.fab=db.fab
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition=db.lot_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition = db.die_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.start_t=db.start_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.finish_t= db.finish_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_id=db.lot_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.wafer_id=db.wafer_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.flow_id=db.flow_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_x=db.die_x
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_y=db.die_y
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.part_seq=db.part_seq
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> where dpf.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.pass_fail="F"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.ingestion_date<"201911291611"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.ingestion_date<"201911291611";
INFO  : Session is already open
INFO  : Dag name: select count(*)
from ..._date<"201911291611"(Stage-1)
INFO  : Setting tez.task.scale.memory.reserve-fraction to 0.30000001192092896
INFO  : Status: Running (Executing on YARN cluster with App id application_1574697549080_17169)
 
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      6          6        0        0       0       0
Map 2 ..........   SUCCEEDED      6          6        0        0       1       0
Reducer 3 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 03/03  [==========================>>] 100%  ELAPSED TIME: 1273.38 s
--------------------------------------------------------------------------------
+---------+--+
|   _c0   |
+---------+--+
| 322905  |
+---------+--+
1 row selected (1273.899 seconds)

The problem is that, for other partitions of the source tables, the query did not complete at all and failed with the classic Out Of Memory (OOM) error, leaving us with empty partitions in the final tables…

The problem disappeared with good Hive statistics

Having noticed the Column stats: PARTIAL and Column stats: NONE in the explain plans, I decided to check the column statistics with a command like:

0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> describe formatted prod_ews_refined.tbl_die_param_flow ingestion_date partition(fab='C2WF',lot_partition='Q926',die_partition='9');
+-------------------------+-----------------------+-----------------------+-------+------------+-----------------+--------------+--------------+------------+-------------+----------+--+
|        col_name         |       data_type       |          min          |  max  | num_nulls  | distinct_count  | avg_col_len  | max_col_len  | num_trues  | num_falses  | comment  |
+-------------------------+-----------------------+-----------------------+-------+------------+-----------------+--------------+--------------+------------+-------------+----------+--+
| # col_name              | data_type             | comment               |       | NULL       | NULL            | NULL         | NULL         | NULL       | NULL        | NULL     |
|                         | NULL                  | NULL                  | NULL  | NULL       | NULL            | NULL         | NULL         | NULL       | NULL        | NULL     |
| ingestion_date          | string                | from deserializer     | NULL  | NULL       | NULL            | NULL         | NULL         | NULL       | NULL        | NULL     |
+-------------------------+-----------------------+-----------------------+-------+------------+-----------------+--------------+--------------+------------+-------------+----------+--+

As you can see, we had no column statistics, because we never ran an ANALYZE command after ingesting new figures…

I gathered them with:

0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> analyze table tbl_die_bin partition(fab="C2WF",lot_partition="Q926",die_partition=9) compute statistics for columns;
INFO  : Tez session hasn't been created yet. Opening session
INFO  : Dag name: analyze table tbl_die_bin partitio...columns(Stage-0)
INFO  : Status: Running (Executing on YARN cluster with App id application_1574697549080_20216)
 
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      9          9        0        0       0       0
Reducer 2 ......   SUCCEEDED     31         31        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 64.75 s
--------------------------------------------------------------------------------
No rows affected (73.7 seconds)
 
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> analyze table tbl_die_param_flow partition(fab="C2WF",lot_partition="Q926",die_partition=9) compute statistics for columns start_t,finish_t,lot_id,wafer_id,flow_id,die_x,die_y,part_seq,ingestion_date;
INFO  : Tez session hasn't been created yet. Opening session
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     39         39        0        0       0       0
Reducer 2 ......   SUCCEEDED    253        253        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 198.48 s
--------------------------------------------------------------------------------
No rows affected (369.772 seconds)

Columns of type array<string> are a special case: you must leave them out by listing the other columns explicitly, otherwise you get this error:

0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> analyze table tbl_die_param_flow partition(fab="C2WF",lot_partition="Q926",die_partition=9) compute statistics for columns;
Error: Error while compiling statement: FAILED: UDFArgumentTypeException Only primitive type arguments are accepted but array< string> is passed. (state=42000,code=40000)
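To get rid of the bad habit mentioned in the preamble, the two kinds of statistics can be gathered right after each incremental load. This is a minimal sketch under assumptions: the partition values are the ones used in this post and must be adapted to each load, and the column list simply repeats the primitive-typed columns used above:

```sql
-- Sketch of a post-ingestion step for one freshly loaded partition.
-- Partition values are the ones from this post; adapt them to each load.
USE prod_ews_refined;

-- Basic statistics (row count, data size) for the partition.
ANALYZE TABLE tbl_die_param_flow
  PARTITION (fab='C2WF', lot_partition='Q926', die_partition=9)
  COMPUTE STATISTICS;

-- Column statistics, listing only primitive-typed columns:
-- the array<string> column "parameters" is left out on purpose.
ANALYZE TABLE tbl_die_param_flow
  PARTITION (fab='C2WF', lot_partition='Q926', die_partition=9)
  COMPUTE STATISTICS FOR COLUMNS start_t, finish_t, lot_id, wafer_id, flow_id,
                                 die_x, die_y, part_seq, ingestion_date;
```

Restricting the statements to the partition just loaded keeps the cost of the step proportional to the new data rather than to the whole table.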

Column statistics are now populated:

0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> describe formatted prod_ews_refined.tbl_die_param_flow ingestion_date partition(fab="C2WF",lot_partition="Q926",die_partition=9);
+-------------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+--+
|        col_name         |       data_type       |          min          |          max          |       num_nulls       |    distinct_count     |      avg_col_len      |      max_col_len      |       num_trues       |      num_falses       |        comment        |
+-------------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+--+
| # col_name              | data_type             | min                   | max                   | num_nulls             | distinct_count        | avg_col_len           | max_col_len           | num_trues             | num_falses            | comment               |
|                         | NULL                  | NULL                  | NULL                  | NULL                  | NULL                  | NULL                  | NULL                  | NULL                  | NULL                  | NULL                  |
| ingestion_date          | string                |                       |                       | 0                     | 510                   | 12.0                  | 12                    |                       |                       | from deserializer     |
+-------------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+--+
3 rows selected (0.487 seconds)

The explain plan for query01 and query02 is exactly the same:

Explain
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Plan not optimized by CBO.
 
Vertex dependency in root stage
Map 1 <- Map 2 (BROADCAST_EDGE)
 
Stage-0
   Fetch Operator
      limit:-1
      Stage-1
         Map 1
         File Output Operator [FS_3981081]
            compressed:false
            Statistics:Num rows: 100157 Data size: 76720262 Basic stats: COMPLETE Column stats: PARTIAL
            table:{"input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"}
            Select Operator [SEL_3981080]
               outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11"]
               Statistics:Num rows: 100157 Data size: 76720262 Basic stats: COMPLETE Column stats: PARTIAL
               Map Join Operator [MAPJOIN_3981086]
               |  condition map:[{"":"Inner Join 0 to 1"}]
               |  HybridGraceHashJoin:true
               |  keys:{"Map 2":"'C2WF' (type: string), 'Q926' (type: string), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)","Map 1":"'C2WF' (type: string), 'Q926' (type: string), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)"}
               |  outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"]
               |  Statistics:Num rows: 100157 Data size: 251394070 Basic stats: COMPLETE Column stats: PARTIAL
               |<-Map 2 [BROADCAST_EDGE] vectorized
               |  Reduce Output Operator [RS_3981089]
               |     key expressions:'C2WF' (type: string), 'Q926' (type: string), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)
               |     Map-reduce partition columns:'C2WF' (type: string), 'Q926' (type: string), start_t (type: string), finish_t (type: string), lot_id (type: string), wafer_id (type: string), flow_id (type: string), die_x (type: int), die_y (type: int), part_seq (type: int)
               |     sort order:++++++++++
               |     Statistics:Num rows: 1184068 Data size: 799245900 Basic stats: COMPLETE Column stats: COMPLETE
               |     Filter Operator [FIL_3981088]
               |        predicate:((pass_fail = 'F') and (ingestion_date < '201911291611') and start_t is not null and finish_t is not null and lot_id is not null and wafer_id is not null and flow_id is not null and die_x is not null and die_y is not null and part_seq is not null) (type: boolean)
               |        Statistics:Num rows: 1184068 Data size: 799245900 Basic stats: COMPLETE Column stats: COMPLETE
               |        TableScan [TS_3981067]
               |           alias:db
               |           Statistics:Num rows: 7104410 Data size: 8247850962 Basic stats: COMPLETE Column stats: COMPLETE
               |<-Filter Operator [FIL_3981084]
                     predicate:((ingestion_date < '201911291611') and start_t is not null and finish_t is not null and lot_id is not null and wafer_id is not null and flow_id is not null and die_x is not null and die_y is not null and part_seq is not null) (type: boolean)
                     Statistics:Num rows: 2364032 Data size: 1394778880 Basic stats: COMPLETE Column stats: PARTIAL
                     TableScan [TS_3981066]
                        alias:dpf
                        Statistics:Num rows: 7092098 Data size: 379282949029 Basic stats: COMPLETE Column stats: PARTIAL
 
 
42 rows selected.

Or graphically:

hive_statistics05
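
The first line of the plan (Plan not optimized by CBO) and the Basic stats / Column stats markers depend partly on session settings. As a minimal sketch, assuming a beeline session: issuing set with a property name and no value prints the current value without changing it. These properties exist in Hive 1.2, but how they are set on our cluster is not shown in this post, so treat the list as a starting point for investigation rather than a diagnosis:

```sql
-- Print the current values (no value is given, so nothing is modified)
set hive.cbo.enable;
set hive.compute.query.using.stats;
set hive.stats.fetch.column.stats;
set hive.stats.fetch.partition.stats;

-- Display the plan of a statement without executing it
explain
select count(*)
from prod_ews_refined.tbl_die_bin db
where db.fab="C2WF"
and db.lot_partition="Q926"
and db.die_partition=9
and db.pass_fail="F";
```

In the plan output, the Statistics line of each TableScan shows whether basic and column statistics were found for that table.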

And the execution time is greatly reduced (by a factor of around 10), with both forms of the query returning the same count:

0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> select count(*)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> from prod_ews_refined.tbl_die_param_flow dpf join prod_ews_refined.tbl_die_bin db
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> on dpf.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.pass_fail="F"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.fab=db.fab
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition=db.lot_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.start_t=db.start_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.finish_t= db.finish_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_id=db.lot_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.wafer_id=db.wafer_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.flow_id=db.flow_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_x=db.die_x
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_y=db.die_y
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.part_seq=db.part_seq
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.ingestion_date<"201911291611"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.ingestion_date<"201911291611";
INFO  : Session is already open
INFO  : Dag name: select count(*)
from ..._date<"201911291611"(Stage-1)
INFO  : Setting tez.task.scale.memory.reserve-fraction to 0.30000001192092896
INFO  : Tez session was closed. Reopening...
INFO  : Session re-established.
INFO  : Status: Running (Executing on YARN cluster with App id application_1574697549080_23102)
 
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     38         38        0        0       0       0
Map 3 ..........   SUCCEEDED      6          6        0        0       0       0
Reducer 2 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 03/03  [==========================>>] 100%  ELAPSED TIME: 146.74 s
--------------------------------------------------------------------------------
+---------+--+
|   _c0   |
+---------+--+
| 322905  |
+---------+--+
1 row selected (153.287 seconds)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> select count(*)
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> from prod_ews_refined.tbl_die_param_flow dpf join prod_ews_refined.tbl_die_bin db
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> on dpf.fab=db.fab
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition=db.lot_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition = db.die_partition
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.start_t=db.start_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.finish_t= db.finish_t
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_id=db.lot_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.wafer_id=db.wafer_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.flow_id=db.flow_id
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_x=db.die_x
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_y=db.die_y
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.part_seq=db.part_seq
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> where dpf.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.fab="C2WF"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.lot_partition="Q926"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.pass_fail="F"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.die_partition="9"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and dpf.ingestion_date<"201911291611"
0: jdbc:hive2://zookeeper01.domain.com:2181,zoo> and db.ingestion_date<"201911291611";
INFO  : Session is already open
INFO  : Dag name: select count(*)
from ..._date<"201911291611"(Stage-1)
INFO  : Setting tez.task.scale.memory.reserve-fraction to 0.30000001192092896
INFO  : Status: Running (Executing on YARN cluster with App id application_1574697549080_23102)
 
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     38         38        0        0       0       0
Map 3 ..........   SUCCEEDED      6          6        0        0       0       0
Reducer 2 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 03/03  [==========================>>] 100%  ELAPSED TIME: 126.12 s
--------------------------------------------------------------------------------
+---------+--+
|   _c0   |
+---------+--+
| 322905  |
+---------+--+
1 row selected (126.614 seconds)

Yannick Jaquier