Change Data Feed to track changes at row level

Preamble

Change Data Feed (CDF) is a feature that tracks row-level changes of a Delta table. It is the perfect feature to build incremental processes between the bronze and the silver or gold tables of the medallion architecture. The bronze table, holding the raw unfiltered figures, is more of a pillar managed by Auto Loader or Spark Structured Streaming.

Implementing the feature is quite trivial, but I then had trouble understanding how to use it in a clever way to build the other pillars from this information…

CDF is available with Databricks Runtime (DBR) 8.4 and above. I have decided to use Databricks Runtime 11.3 LTS on Azure for the availability of the spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled parameter, but it should work with any runtime greater than or equal to 11.1. To be able to use DBR 11.3 LTS you might be obliged to update Databricks Connect 10.4.18 and/or Python on your laptop; I have made the questionable choice to simply do export DEBUG_IGNORE_VERSION_MISMATCH=1.

The typical use cases are very well summarized in this graphic from the Databricks YouTube video:

cdf01 – typical Change Data Feed use cases (from the Databricks YouTube video)

Change Data Feed testing

First things first, identify the current time of your Databricks cluster with the notebook command below. This is required in the testing below to get records of a particular time window:

%sql
SELECT CURRENT_TIMESTAMP(),CURRENT_DATE()

On my test Databricks cluster the time is one hour behind (UTC) my current time zone (UTC+1 in winter). This is also why, for convenience, I am using config("spark.sql.session.timeZone", "UTC") when creating my Spark session, to match what I see in a notebook where the Spark session is already created.
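
To double-check, the session time zone can also be read back directly (a quick sketch, using either the notebook session or the one created below):

# Read the session time zone; on Databricks clusters the default is usually Etc/UTC
spark.conf.get("spark.sql.session.timeZone")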

I start with my test bronze table:

DROP TABLE IF EXISTS source01;
 
CREATE TABLE IF NOT EXISTS source01 (
id INT,
descr string
)
USING delta
tblproperties ("quality" = "bronze", delta.enableChangeDataFeed = TRUE);
 
INSERT INTO source01 VALUES(1,'One');
INSERT INTO source01 VALUES(2,'Two');
INSERT INTO source01 VALUES(3,'Three');
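
CDF can also be switched on for an already existing table, or by default for all new tables, with the properties below (a small sketch in Python, property names as documented by Databricks):

# Enable Change Data Feed on an existing Delta table
spark.sql("ALTER TABLE source01 SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
# Or enable it by default for all new tables created in this session
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")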

Just to be able to recover the operations afterwards, I issued the above commands just before 4PM my local time, and after 4PM, still my local time, I executed the quick update script below:

INSERT INTO source01 VALUES(4,'Four');
INSERT INTO source01 VALUES(5,'Five');
UPDATE source01 SET descr='The One' WHERE id=1;
DELETE FROM source01 WHERE id=5;

After this second batch of commands you should be able to see the _change_data directory. As written in the official documentation, some operations do not generate files in the _change_data directory because the CDF information can be extracted directly from _delta_log:

>>> from pyspark.dbutils import DBUtils
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("dbutils").config("spark.sql.session.timeZone", "UTC").getOrCreate()
>>> dbutils = DBUtils(spark)
>>> dbutils.fs.ls("/user/hive/warehouse/source01")
[FileInfo(path='dbfs:/user/hive/warehouse/source01/_change_data/', name='_change_data/', size=0, modificationTime=1673363153000),
FileInfo(path='dbfs:/user/hive/warehouse/source01/_delta_log/', name='_delta_log/', size=0, modificationTime=1673363155000),
FileInfo(path='dbfs:/user/hive/warehouse/source01/part-00000-1d11a2cf-144e-4fdc-9b28-9bdebc4b27a2-c000.snappy.parquet', name='part-00000-1d11a2cf-144e-4fdc-9b28-9bdebc4b27a2-c000.snappy.parquet', size=819, modificationTime=1673362371000),
FileInfo(path='dbfs:/user/hive/warehouse/source01/part-00000-25e3b64e-6559-4e33-9fa9-894973c133a4.c000.snappy.parquet', name='part-00000-25e3b64e-6559-4e33-9fa9-894973c133a4.c000.snappy.parquet', size=1049, modificationTime=1673363150000),
FileInfo(path='dbfs:/user/hive/warehouse/source01/part-00000-32f5f4de-e619-42df-a32b-77d5fd928396-c000.snappy.parquet', name='part-00000-32f5f4de-e619-42df-a32b-77d5fd928396-c000.snappy.parquet', size=812, modificationTime=1673363147000),
FileInfo(path='dbfs:/user/hive/warehouse/source01/part-00000-4c63bf09-8a99-4ebe-9701-8d857ee1ce2c-c000.snappy.parquet', name='part-00000-4c63bf09-8a99-4ebe-9701-8d857ee1ce2c-c000.snappy.parquet', size=805, modificationTime=1673362366000),
FileInfo(path='dbfs:/user/hive/warehouse/source01/part-00000-84f156e4-a695-4a05-8e27-b4840ae5bd19-c000.snappy.parquet', name='part-00000-84f156e4-a695-4a05-8e27-b4840ae5bd19-c000.snappy.parquet', size=811, modificationTime=1673363145000),
FileInfo(path='dbfs:/user/hive/warehouse/source01/part-00000-f6638646-8f68-421a-8273-e89223778850-c000.snappy.parquet', name='part-00000-f6638646-8f68-421a-8273-e89223778850-c000.snappy.parquet', size=805, modificationTime=1673362369000)]
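
As a side note, to relate these files and versions to the operations performed, DESCRIBE HISTORY returns the commit list of the table:

# Show the commit history of the table: version, timestamp and operation
spark.sql("DESCRIBE HISTORY source01").select("version", "timestamp", "operation").show(truncate=False)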

Let’s perform a few CDF queries with the table_changes keyword. If your timestamp is after the latest one available in your table, you get an error message like:

com.databricks.sql.transaction.tahoe.DeltaErrors$TemporallyUnstableInputException: The provided timestamp: 2023-01-10 16:00:00.0 is after the latest commit timestamp of 2023-01-10 15:05:54.0. If you wish to query this version of the table, please either provide the version with "VERSION AS OF 7" or use the exact timestamp of the last commit: "TIMESTAMP AS OF '2023-01-10 15:05:54'".

To avoid it, the documentation says you can use:

SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = TRUE

Or

spark.conf.set("spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled", True)

Check the current value with (Python or SQL respectively):

spark.conf.get("spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled")
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled;

Remark:
This works only with Databricks Runtime 11.1 and above! Before DBR 11.1 I would say you will have plenty of boring errors to handle and the CDF feature is not easily usable… For me this parameter is mandatory to be able to specify an end of window without receiving any error. Imagine a process running on day D that selects what occurred on day D-1, from midnight to midnight: if by bad luck nothing occurred on the table during that window, you cannot accept to receive an error message. This part is not clearly explained in the official documentation and I have lost plenty of time on it…
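
As an illustration of that daily pattern, here is a small sketch of my own (the window boundaries and the table name are assumptions, not taken from any official example):

from datetime import date, timedelta
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DailyCDF").config("spark.sql.session.timeZone", "UTC").getOrCreate()
# Without this parameter the read errors out when the timestamps are beyond the latest commit
spark.conf.set("spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled", True)

# Day D-1, midnight to midnight (UTC)
start = (date.today() - timedelta(days=1)).strftime("%Y-%m-%d 00:00:00")
end = date.today().strftime("%Y-%m-%d 00:00:00")

daily_changes = spark.read.format("delta")\
.option("readChangeFeed", "true")\
.option("startingTimestamp", start)\
.option("endingTimestamp", end)\
.table("source01")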

To get this change information I have decided to execute this SQL command in a notebook:

SELECT * FROM table_changes("source01", '2023-01-10 14:00:00', '2023-01-10 15:30:00') ORDER BY _commit_timestamp;

Or from a PySpark session through Databricks Connect (table_changes cannot be used in Spark SQL there):

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("TestCDF").config("spark.sql.session.timeZone", "UTC").getOrCreate()
>>> spark.conf.set("spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled", True)
>>> spark.read.format("delta").option("readChangeFeed", "true").option("startingTimestamp", "2023-01-10 14:00:00")\
.option("endingTimestamp", "2023-01-10 15:30:00")\
.table("source01").orderBy("_commit_timestamp").show()
 
+---+-------+----------------+---------------+-------------------+
| id|  descr|    _change_type|_commit_version|  _commit_timestamp|
+---+-------+----------------+---------------+-------------------+
|  1|    One|          insert|              1|2023-01-10 14:52:47|
|  2|    Two|          insert|              2|2023-01-10 14:52:49|
|  3|  Three|          insert|              3|2023-01-10 14:52:51|
|  4|   Four|          insert|              4|2023-01-10 15:05:45|
|  5|   Five|          insert|              5|2023-01-10 15:05:48|
|  1|The One|update_postimage|              6|2023-01-10 15:05:51|
|  1|    One| update_preimage|              6|2023-01-10 15:05:51|
|  5|   Five|          delete|              7|2023-01-10 15:05:54|
+---+-------+----------------+---------------+-------------------+

The _change_type column contains the operation that has been performed on the row: insert, delete, and update_preimage/update_postimage for an update, with the before and after image.
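
For example, to quickly see how many changes of each type were captured in the window (same timestamps as above, just as an illustration):

# Count the captured changes per operation type
spark.read.format("delta")\
.option("readChangeFeed", "true")\
.option("startingTimestamp", "2023-01-10 14:00:00")\
.option("endingTimestamp", "2023-01-10 15:30:00")\
.table("source01")\
.groupBy("_change_type").count().show()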

Then what to do with this information? This part is not well explained in the official documentation, except maybe in the provided demo notebook. That notebook, at least, does not take into account the delete I did on id equal to 5. What is also not clearly written is that you must have a unique key on your source table, as well as on the destination table you plan to feed with these changes, to be able to issue DML on this unique key.

To explore it I have created this fake destination table:

DROP TABLE IF EXISTS destination01;
 
CREATE TABLE IF NOT EXISTS destination01 (
id INT,
descr string
)
USING delta
tblproperties ("quality" = "silver");

As suggested in the example notebook, the (supposed) last version of my source table is:

CREATE OR REPLACE TEMPORARY VIEW source01_latest_version AS
SELECT * 
    FROM 
         (SELECT *, rank() OVER (partition BY id ORDER BY _commit_version DESC) AS rank
          FROM table_changes('source01', '2023-01-10 13:00:00')
          WHERE _change_type !='update_preimage')
    WHERE rank=1
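
For reference, the same "latest change per key" logic can be sketched with the DataFrame API (my own equivalent, not from the demo notebook):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Keep, per id, only the most recent change and drop the update pre-images
w = Window.partitionBy("id").orderBy(F.col("_commit_version").desc())
source01_latest_version_df = spark.read.format("delta")\
.option("readChangeFeed", "true")\
.option("startingTimestamp", "2023-01-10 13:00:00")\
.table("source01")\
.filter("_change_type != 'update_preimage'")\
.withColumn("rank", F.rank().over(w))\
.filter("rank = 1")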

And I merge this temporary view into my destination table with:

MERGE INTO destination01 d USING source01_latest_version s ON s.id = d.id
WHEN matched AND s._change_type='update_postimage' THEN UPDATE SET d.descr = s.descr
WHEN matched AND s._change_type='delete' THEN DELETE
WHEN NOT matched AND s._change_type='insert' THEN INSERT (id, descr) VALUES (s.id, s.descr);

We can see that this is far from what we expected. If, within the same window, I insert and then update a row, this row is not taken into account at all (id = 1): its latest change is an update_postimage, but as the row does not yet exist in the destination the WHEN MATCHED clauses never fire, and the WHEN NOT MATCHED clause only handles inserts:

>>> spark.sql("select * from destination01 order by id").show()
 
+---+-----+
| id|descr|
+---+-----+
|  2|  Two|
|  3|Three|
|  4| Four|
+---+-----+
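
A possible adjustment, as a sketch of my own (not what the demo notebook proposes), is to also insert the row when an update_postimage arrives for a key that is not yet in the destination:

# Sketch: treat an update_postimage on a missing key as an insert
spark.sql("""
MERGE INTO destination01 d USING source01_latest_version s ON s.id = d.id
WHEN MATCHED AND s._change_type = 'update_postimage' THEN UPDATE SET d.descr = s.descr
WHEN MATCHED AND s._change_type = 'delete' THEN DELETE
WHEN NOT MATCHED AND s._change_type IN ('insert', 'update_postimage') THEN INSERT (id, descr) VALUES (s.id, s.descr)
""")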

Alternatively, what can we do in plain Python? I have written this Python code (the truncate command is only here for testing purposes):

spark.conf.set("spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled", True)
# Truncate is only here to restart the test from an empty destination table
spark.sql("truncate table destination01")
# Read the changes of the window, ordered by commit timestamp to replay them in sequence
source01_latest_updates = spark.read.format("delta")\
.option("readChangeFeed", "true")\
.option("startingTimestamp", '2023-01-10 13:00:00')\
.option("endingTimestamp", "2023-01-10 15:30:00")\
.table("source01").orderBy("_commit_timestamp")
# Replay each change, one by one, against the destination table
for row in source01_latest_updates.collect():
  if row['_change_type']=="insert":
    spark.sql("insert into destination01(id, descr) values("+str(row['id'])+",'"+row['descr']+"')")
  if row['_change_type']=="update_postimage":
    spark.sql("update destination01 set descr='"+row['descr']+"' where id="+str(row['id']))
  if row['_change_type']=="delete":
    spark.sql("delete from destination01 where id="+str(row['id']))

Of course we have much more flexibility in Python: for each row I am able to fine-tune what I would like to do. In the update_postimage step we could even imagine checking that what is inside the destination table matches what we have in the update_preimage row. I finally get a not so bad destination table:

>>> spark.sql("select * from destination01 order by id").show()
+---+-------+
| id|  descr|
+---+-------+
|  1|The One|
|  2|    Two|
|  3|  Three|
|  4|   Four|
+---+-------+

Change Data Feed testing with Spark Structured Streaming

One of the examples Databricks provides is to get the latest snapshot of the table by not specifying any starting or ending timestamp or version, like:

source01_cdf_stream=spark.readStream.format("delta")\
.option("readChangeFeed", "true")\
.table("source01")
display(source01_cdf_stream)

But frankly I do not see the difference with selecting directly from the source table… Most probably I am missing something…

One limitation I hit with a streaming DataFrame is that you cannot sort it, so it is difficult to apply the DML statements in the right order (order by _commit_timestamp):

AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;
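
A workaround, anticipating the foreachBatch approach shown later, is to sort inside the batch function, where each micro-batch is a plain static DataFrame (hypothetical handler just to illustrate the idea):

# Each micro-batch passed to foreachBatch is a static DataFrame and can be sorted
def ordered_batch(df, batch_id):
  for row in df.orderBy("_commit_timestamp").collect():
    print(row['_change_type'], row['id'])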

Similarly, if you specify a starting time you get a real-time view of the changes occurring on your source table, but they come in no guaranteed order:

source01_cdf_stream=spark.readStream.format("delta")\
.option("readChangeFeed", "true")\
.option("startingTimestamp",'2023-01-10 13:00:00')\
.table("source01")
display(source01_cdf_stream)
cdf02 – real-time view of the changes coming from the source table

Same comment as above: as the DML commands come in no particular order, I failed to see how to use them properly. I was about to conclude that CDF and Spark Structured Streaming were not really usable together until I saw the Databricks YouTube video. That video demonstrates how to use both features hand in hand. They use foreachBatch, which we have seen in the Spark Structured Streaming blog post (link). Their proposal is the most elegant one I have seen and it would be great to see it directly in the official documentation:

from pyspark.sql import functions as F
from delta.tables import DeltaTable
 
def foreach_batch_function(df, batch_id):
  # Merge each micro-batch of changes into the destination Delta table
  destination01 = DeltaTable.forName(spark, "destination01")
  destination01.alias("destination").merge(source=df.alias("source"), condition="source.id = destination.id")\
  .whenMatchedUpdate(condition="source._change_type = 'update_postimage'",
    set =
    {
      "descr": "source.descr"
    })\
  .whenMatchedDelete(condition="source._change_type = 'delete'")\
  .whenNotMatchedInsert(condition="source._change_type = 'insert'",
    values =
    {
      "id": "source.id",
      "descr": "source.descr"
    }
  ).execute()
 
# Stream the change feed from the latest version onward, drop the update pre-images
# and apply each micro-batch with the merge above every 60 seconds
source01_cdf_stream = spark.readStream.format("delta")\
.option("readChangeFeed", "true")\
.option("startingVersion", "latest")\
.table("source01")\
.filter("_change_type != 'update_preimage'")\
.writeStream\
.format("delta")\
.foreachBatch(foreach_batch_function)\
.outputMode("append")\
.trigger(processingTime='60 seconds')\
.start()
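
One note from my side: for anything beyond a quick test you would normally also pass a checkpoint directory to writeStream, for example .option("checkpointLocation", "/tmp/source01_cdf_checkpoint") (the path is just an example), so that the stream can restart from where it stopped.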

The filter on _change_type to remove the pre-image rows is highly important, otherwise you get an error like:

com.databricks.sql.transaction.tahoe.DeltaUnsupportedOperationException: Cannot perform Merge as multiple source rows matched and attempted to modify the same target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge, when multiple source rows match on the same target row, the result may be ambiguous as it is unclear which source row should be used to update or delete the matching target row. You can preprocess the source table to eliminate the possibility of multiple matches. Please refer to https://docs.microsoft.com/azure/databricks/delta/delta-update#upsert-into-a-table-using-merge
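
In my test each change is issued and replicated separately, but if several commits on the same key landed in the same micro-batch (for example an insert immediately followed by an update within the trigger interval), the merge would hit the same error even with the pre-image filter. In that case the batch would need to be deduplicated first inside foreach_batch_function, something like this sketch of mine:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Keep only the most recent change per id within the micro-batch before merging
def dedup_latest_per_key(df):
  w = Window.partitionBy("id").orderBy(F.col("_commit_version").desc())
  return df.withColumn("rn", F.row_number().over(w)).filter("rn = 1").drop("rn")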

I start with a source01 and destination01 table both equal to:

>>> spark.sql("select * from destination01 order by id").show()
+---+-------+
| id|  descr|
+---+-------+
|  1|The One|
|  2|    Two|
|  3|  Three|
|  4|   Four|
+---+-------+

Then I insert a new row in source01 that gets replicated in destination01 (insert into source01 values(5, 'Five');):

>>> spark.sql("select * from destination01 order by id").show()
+---+-------+
| id|  descr|
+---+-------+
|  1|The One|
|  2|    Two|
|  3|  Three|
|  4|   Four|
|  5|   Five|
+---+-------+

And I update this last inserted row (update source01 set descr='Really Five' where id=5;):

>>> spark.sql("select * from destination01 order by id").show()
+---+-----------+
| id|      descr|
+---+-----------+
|  1|    The One|
|  2|        Two|
|  3|      Three|
|  4|       Four|
|  5|Really Five|
+---+-----------+

I finally delete this row (delete from source01 where id=5;):

>>> spark.sql("select * from destination01 order by id").show()
+---+-------+
| id|  descr|
+---+-------+
|  1|The One|
|  2|    Two|
|  3|  Three|
|  4|   Four|
+---+-------+

The screenshot of the Spark Structured Streaming query in the Databricks notebook:

cdf03 – the Spark Structured Streaming query running in the Databricks notebook
