Preamble
For its open source positioning we chose to install a Hortonworks HDP 2.6 Hadoop cluster. In the initial phase of our Hadoop project, ORC was chosen as the default storage format for our very first Hive tables.
Query performance is, obviously, a key factor for us. This is why we started to look at Live Long And Process (LLAP) and realized it was not so easy to handle in our small initial cluster. Then the merger between Hortonworks and Cloudera happened and we decided to move all our tables to the Parquet storage format, with the clear objective of using Cloudera Impala.
But at some point we started to study disk space usage (again, a concern on our small initial cluster) and realized that the Parquet tables were much bigger than their ORC counterparts. All our Hive tables rely heavily on partitioning, both for performance and to ease cleaning by simply dropping old partitions…
There are plenty of articles comparing the Parquet and ORC (and other) storage formats, but if you read them carefully to the end there will most probably be a disclaimer stating that the comparison is tightly linked to the nature of the data. In other words, your data model and volumes are unique, so you really have no other option than testing by yourself, and this blog post provides a few tricks to achieve this…
Our cluster is running HDP-2.6.4.0 with Ambari version 2.6.1.0.
ORC versus Parquet compression
On one partition of one table we observed:
- Parquet = 33.9 G
- ORC = 2.4 G
Digging further we saw that ORC compression can easily be configured in Ambari, and we have set it to ZLIB. The default Parquet compression, on the other hand, is apparently uncompressed, which is obviously not good from a disk usage perspective.
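If you want to see which default your cluster currently pushes for ORC, one small sketch (the JDBC URL and user are placeholders, and my assumption is that the table-level orc.compress property, when set, takes precedence over this default) is simply to display the corresponding Hive setting from beeline:

```bash
# Hypothetical check: display the cluster-wide default ORC codec from beeline
# (JDBC URL and user to be adapted to your own HiveServer2)
beeline -u "jdbc:hive2://hiveserver2:10000/default" -n hive \
  -e "set hive.exec.orc.default.compress;"
```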
Digging through multiple (contradictory) blog posts, the official documentation and personal testing, I was able to draw up the table below:
Hive SQL property | Default | Values |
---|---|---|
orc.compress | ZLIB | NONE, ZLIB or SNAPPY |
parquet.compression | UNCOMPRESSED | UNCOMPRESSED, GZIP or SNAPPY |
Remark:
I have seen many blog posts suggesting to use parquet.compress to set the Parquet compression algorithm, but in my opinion this property simply does not work…
To change the compression algorithm when creating a table, use the TBLPROPERTIES keyword like:
STORED AS PARQUET TBLPROPERTIES ("parquet.compression"="GZIP");
STORED AS ORC TBLPROPERTIES ("orc.compress"="SNAPPY");
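For an existing table you can also switch the property afterwards with ALTER TABLE; a minimal sketch, assuming a table called default.test and a placeholder JDBC URL, and keeping in mind that, as far as I understand, only files written after the change use the new codec (existing files keep their original compression):

```bash
# Hypothetical sketch: change the compression property of an existing table and display it
# (JDBC URL, user and table name are placeholders)
URL="jdbc:hive2://hiveserver2:10000/default"

# Only data files written after this change should use the new codec
beeline -u "$URL" -n hive -e "ALTER TABLE default.test SET TBLPROPERTIES ('parquet.compression'='GZIP');"

# Confirm the property is now attached to the table
beeline -u "$URL" -n hive -e "SHOW TBLPROPERTIES default.test;"
```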
So, as an example, the test table I built for my testing looks something like:
DROP TABLE default.test PURGE;

CREATE TABLE default.test(
  column04 array<string>)
PARTITIONED BY (column01 string, column02 string, column03 int)
STORED AS PARQUET
TBLPROPERTIES ("parquet.compression"="SNAPPY");
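To fill such a partitioned test table in one shot from a source table (instead of partition by partition as the script further down does), dynamic partitioning can be used; a minimal sketch, assuming a hypothetical source table default.test_source with the same columns and a placeholder JDBC URL:

```bash
# Hypothetical sketch: populate the partitioned test table with Hive dynamic partitioning
# (JDBC URL, user and source table name are placeholders)
URL="jdbc:hive2://hiveserver2:10000/default"

beeline -u "$URL" -n hive -e "
  set hive.exec.dynamic.partition=true;
  set hive.exec.dynamic.partition.mode=nonstrict;
  -- partition columns must come last in the select list
  insert overwrite table default.test partition (column01, column02, column03)
  select column04, column01, column02, column03 from default.test_source;"
```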
To get the size of your test table (replace database_name and table_name with real values), just use something like the below (check the value of hive.metastore.warehouse.dir if your warehouse is not under /apps/hive/warehouse):
[hdfs@server01 ~]$ hdfs dfs -du -s -h /apps/hive/warehouse/database_name/table_name
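If you are not sure where the warehouse directory is, or if you want the detail per partition, something like the below should do it (the path, JDBC URL and user are placeholders):

```bash
# Hypothetical sketch: display the configured warehouse directory and per-partition sizes
URL="jdbc:hive2://hiveserver2:10000/default"

# Show the warehouse root used to build the HDFS path of managed tables
beeline -u "$URL" -n hive -e "set hive.metastore.warehouse.dir;"

# Without -s you get one line per partition directory
hdfs dfs -du -h /apps/hive/warehouse/database_name/table_name
```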
Then I copied my source Parquet table to this test table using the six combinations of storage format and compression algorithm, and the result is:
Storage format | Compression property | Value | Size |
---|---|---|---|
ORC | orc.compress | NONE | 12.9 G |
ORC | orc.compress | ZLIB | 2.4 G |
ORC | orc.compress | SNAPPY | 3.2 G |
Parquet | parquet.compression | UNCOMPRESSED | 33.9 G |
Parquet | parquet.compression | GZIP | 7.3 G |
Parquet | parquet.compression | SNAPPY | 11.5 G |
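Given the confusion around the Parquet property name, it can be reassuring to check which codec really ended up in the data files. For ORC the dump utility shipped with Hive prints it directly; as a sketch (the HDFS path is a placeholder, and for Parquet you would need something like the parquet-tools jar, which is not necessarily installed on the cluster):

```bash
# Hypothetical check of the codec actually written in an ORC data file
# (replace the HDFS path by one of your own partition files)
hive --orcfiledump /apps/hive/warehouse/database_name/table_name/column01=value/000000_0 \
  | grep -i compression
```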
To do the copy, I have written a shell script that dynamically copies each partition of the source table to a destination table that has the same layout:
#!/bin/bash
#
# Y.JAQUIER 06/02/2019 Creation
#
# -----------------------------------------------------------------------------
#
# This job copies one table to another one
# Useful when migrating from Parquet to ORC for example
#
# Destination table must have been created before using this script
#
# -----------------------------------------------------------------------------
#
###############################################################################
# Function to execute a query on hive
# Param 1 : Query string
###############################################################################
function execute_query {
  #echo "Execute query: $1"
  MYSTART=$(date)
  beeline -u "jdbc:hive2://${HIVE_CONNEXION}?tez.queue.name=${HIVE_QUEUE}" -n ${HIVE_USER} --incremental=true --silent=true --fastConnect=true -e "$1"
  status=$?
  MYEND=$(date)
  MYDURATION=$(expr $(date -d "$MYEND" +%s) - $(date -d "$MYSTART" +%s))
  echo "Executed in $MYDURATION second(s)"
  #echo "MEASURE|$FAB|${INGESTION_DATE}|$1|$(date -d "$MYSTART" +%Y%m%d%H%M%S)|$(date -d "$MYEND" +%Y%m%d%H%M%S)|$MYDURATION"
  if [[ $status != "0" ]]
  then
    echo " !!!!!!!! Error Execution Query $1 !!!!!!!! "
    exit 1
  fi
}

###############################################################################
# Function to execute a query on hive in CSV output file
# Param 1 : Query string
# Param 2 : output file
###############################################################################
function execute_query_to_csv {
  #echo "Execute query to csv: $1 => $2"
  MYSTART=$(date)
  beeline -u "jdbc:hive2://${HIVE_CONNEXION}?tez.queue.name=${HIVE_QUEUE}" -n ${HIVE_USER} --outputformat=csv2 --silent=true --verbose=false \
  --showHeader=false --fastConnect=true -e "$1" > $2
  status=$?
  MYEND=$(date)
  MYDURATION=$(expr $(date -d "$MYEND" +%s) - $(date -d "$MYSTART" +%s))
  #echo "MEASURE|$FAB|${INGESTION_DATE}|$1|$(date -d "$MYSTART" +%Y%m%d%H%M%S)|$(date -d "$MYEND" +%Y%m%d%H%M%S)|$MYDURATION"
  if [[ $status != "0" ]]
  then
    echo " !!!!!!!! Error Execution Query to csv : $1 => $2 !!!!!!!! "
    exit 1
  fi
}

###############################################################################
# Print the help
###############################################################################
function print_help {
  echo "syntax:"
  echo "$0 source_database.source_table destination_database.destination_table partition_filter_pattern (optional)"
  echo "Source and destination tables must exist"
  echo "Destination table data will be overwritten !!"
}

###############################################################################
# Main
###############################################################################
HIVE_CONNEXION="..."
HIVE_QUEUE=...
HIVE_USER=...
TABLE_SOURCE=$1
TABLE_DESTINATION=$2
PARTITION_FILTER=$3

if [[ $# -lt 2 ]]
then
  print_help
  exit 0
fi

echo "This will overwrite $TABLE_DESTINATION table with $TABLE_SOURCE table data !"
echo "The destination table MUST be created first !"
read -p "Do you wish to continue [Y | N] ? " answer
case $answer in
  [Yy]* ) ;;
  [Nn]* ) exit 0;;
  * ) echo "Please answer yes or no."; exit 0;;
esac

# Generate partitions list
execute_query_to_csv "show partitions $1;" partition_list.$$.csv

# Filter partition list based on the regular expression given
if [[ $PARTITION_FILTER != "" ]]
then
  grep "$PARTITION_FILTER" partition_list.$$.csv > partition_list1.$$.csv
  mv partition_list1.$$.csv partition_list.$$.csv
fi
partition_number=$(cat partition_list.$$.csv | wc -l)

# Generate column list (with partition columns which must be removed)
execute_query_to_csv "show columns from $1;" column_list.$$.csv

# First partition column
while read line
do
  first_partition_column=$(echo $line | awk -F "=" '{print $1}')
  break
done < partition_list.$$.csv

# Columns list without partition columns
columns_list_without_partitions=""
while read line
do
  if [[ $line = $first_partition_column ]]
  then
    break
  fi
  columns_list_without_partitions+="$line,"
done < column_list.$$.csv
# Remove trailing comma
columns_length=${#columns_list_without_partitions}
columns_list_without_partitions=${columns_list_without_partitions:0:$(($columns_length-1))}

echo "The source table has $partition_number partition(s)"

# Generate and execute one insert overwrite per partition
i=1
while read line
do
  #echo $line
  echo "Partition ${i}:"
  j=1
  query1="insert overwrite table $TABLE_DESTINATION partition ("
  query2=""
  query3=""
  IFS="/" read -r -a partition_list <<< "$line"
  # We fetch all partition columns
  for partition_column_list in "${partition_list[@]}"
  do
    IFS="=" read -r -a partition_columns <<< "$partition_column_list"
    # First predicate starts with WHERE and partition values must be enclosed in double quotes
    if [[ $j -eq 1 ]]
    then
      query3+="where ${partition_columns[0]}=\"${partition_columns[1]}\" "
    else
      query3+="and ${partition_columns[0]}=\"${partition_columns[1]}\" "
    fi
    query2+="${partition_columns[0]}=\"${partition_columns[1]}\","
    j=$((j+1))
  done
  IFS=""
  i=$((i+1))
  query2_length=${#query2}
  query2_length=$((query2_length-1))
  query2=${query2:0:$query2_length}
  final_query=$query1$query2") select "$columns_list_without_partitions" from $TABLE_SOURCE "$query3
  # Echo the generated query; comment out the execute_query call to check it before really running
  echo $final_query
  execute_query "$final_query"
done < partition_list.$$.csv

rm partition_list.$$.csv
rm column_list.$$.csv
So clearly, for the nature of our data, the ORC storage format cannot be beaten when it comes to disk usage…
I took additional figures when we migrated the live tables of our Spotfire data model:
[hdfs@server01 ~]$ hdfs dfs -du -s -h /apps/hive/warehouse/database.db/table01*
564.2 G  /apps/hive/warehouse/database.db/table01_orc
3.6 T    /apps/hive/warehouse/database.db/table01_pqt

[hdfs@server01 ~]$ hdfs dfs -du -s -h /apps/hive/warehouse/database.db/table02*
121.3 M  /apps/hive/warehouse/database.db/table02_pqt
5.6 M    /apps/hive/warehouse/database.db/table02_orc
ORC versus Parquet response time
But what about response time? To check this I extracted a typical query used by Spotfire and executed it on the Parquet (UNCOMPRESSED) and on the ORC (ZLIB) tables (a sketch of the replay loop follows the results table):
Iteration | Parquet (s) | ORC (s) |
---|---|---|
Run 1 | 6.553 | 0.077 |
Run 2 | 5.291 | 0.066 |
Run 3 | 1.915 | 0.065 |
Run 4 | 2.987 | 0.074 |
Run 5 | 1.825 | 0.070 |
Run 6 | 2.720 | 0.092 |
Run 7 | 3.989 | 0.062 |
Run 8 | 4.526 | 0.079 |
Run 9 | 3.385 | 0.082 |
Run 10 | 3.588 | 0.176 |
Average | 3.6779 | 0.0843 |
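The per-run figures can simply be taken from the elapsed time that beeline prints after each statement; a minimal replay loop (the JDBC URL, user and query are placeholders, not the actual Spotfire query) could look like:

```bash
#!/bin/bash
# Hypothetical sketch: replay the same query ten times and rely on the elapsed
# time reported by beeline at the end of each statement
URL="jdbc:hive2://hiveserver2:10000/default"
QUERY="select count(*) from database01.table01_orc where column01='value'"

for i in $(seq 1 10)
do
  echo "Run $i:"
  beeline -u "$URL" -n hive --fastConnect=true -e "$QUERY"
done
```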
So on average over my ten runs ORC comes out a factor of 43-44 times faster…
I would explain this by the fact that there is much less data to read from disk for the ORC tables and, again, this is linked to our data node hardware, where we have many more CPU threads than disk spindles (the ratio of one thread per physical disk is not followed at all). If you are low on CPU and have plenty of disks (which is also not a good practice for a Hadoop cluster) you might experience different results…
References
- ORCFile in HDP 2: Better Compression, Better Performance
- File Format Benchmark – Avro, JSON, ORC & Parquet
- Hive Configuration