Auto Loader hands-on: processing a flow of raw files

Preamble

Auto Loader is used to ingest raw files into your lakehouse. It is based on Spark Structured Streaming. Once you start working with Databricks, ingesting, refining and duplicating data will take up most of your daily work. Of course Databricks can do much more, but step zero is to get some data in before doing more clever things with it.

If you implement what Databricks calls the medallion architecture, with its three layers/pillars called bronze, silver and gold, the bronze layer is roughly where you take the raw files and build your first bronze Delta tables.

The silver and gold Delta tables can also be created with Spark Structured Streaming, refining and cleaning the data up to the gold layer. The gold layer will ultimately end up in a SQL Server database or be the source for the real aim of your project (ML, …). We will hopefully see silver and gold layer techniques in further posts.

From the official documentation:

Auto Loader incrementally and efficiently processes new data files as they arrive in cloud storage without any additional setup.

I have written this blog post because I find the official documentation not so clear on what to do and where, and also sometimes a bit outdated. I have used Databricks Runtime 10.4 on Azure but it should work with any runtime. The screenshots may also have changed by the time you read this, as the Databricks web interface changes frequently…

Auto Loader test case

The test case I came up with is a bunch of CSV files (CSV is natively handled by Auto Loader) arriving in a fixed directory (/FileStore/yjaquier/autoloader/source01). Those files can be created on your client machine or directly in a Databricks notebook. They will be of the form below (I will then increment the id and the description each time I add a new CSV file):

id,descr
1,One

As suggested in the official documentation, I have created a notebook (“auto loader source01”) in Python with the code below. I have used the header option to handle the header of my CSV files. cloudFiles is a Spark Structured Streaming source:

import dlt

# Bronze table fed by Auto Loader (cloudFiles) reading the raw CSV files
@dlt.table(name="source01")
def source01():
  return (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")   # Auto Loader natively handles CSV
    .option("header", True)               # first line of each CSV file is a header
    .load("/FileStore/yjaquier/autoloader/source01")
  )
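
As an aside, since Auto Loader is just a Spark Structured Streaming source, the same cloudFiles reader can also be used outside Delta Live Tables with a classic writeStream. Below is only a minimal sketch; the schema location, checkpoint location and target table name are assumptions to adapt to your environment:

# Auto Loader outside DLT: plain Structured Streaming into a Delta table
df = (
  spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", True)
  # Outside DLT a schema location is required for schema inference (path is illustrative)
  .option("cloudFiles.schemaLocation", "/FileStore/yjaquier/autoloader/_schema/source01")
  .load("/FileStore/yjaquier/autoloader/source01")
)

(
  df.writeStream
  .option("checkpointLocation", "/FileStore/yjaquier/autoloader/_checkpoint/source01")
  .trigger(once=True)            # process the available files once, like a triggered pipeline
  .toTable("yjaquier.source01")  # target database.table is an assumption
)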

The DLT Python notebook above can also be written in SQL using a syntax like:

CREATE OR REFRESH STREAMING LIVE TABLE source01
COMMENT "Raw CSV files ingested from /FileStore/yjaquier/autoloader/source01 DBFS directory"
TBLPROPERTIES ("quality" = "bronze")
AS
SELECT * FROM cloud_files("/FileStore/yjaquier/autoloader/source01", "csv", map("header", "true"));

This script will create a table called source01… You cannot execute and test this script as-is in a regular notebook, because the dlt module is only available when the notebook runs inside a Delta Live Tables pipeline, so you will get the expected error:

ModuleNotFoundError: No module named 'dlt'

Go to Workflows and create a new pipeline in the Delta Live Tables tab (in the screenshot below I have created the “auto loader source01” pipeline):

auto_loader01

For my testing purposes I have chosen the smallest possible configuration:

auto_loader02

This is where you link your pipeline to the notebook we created just before:

auto_loader03

The most important configuration is the Target schema, which is the target database where the table will be created:

auto_loader04

Auto Loader testing

You can run the pipeline once (triggered mode) by pressing the Start button. But before you do, do not forget to put a few files in the source directory (/FileStore/yjaquier/autoloader/source01).

As said, you can create the files on your client machine and upload them with the Databricks graphical interface, or create them in a Databricks notebook using Python code like:

# List the files already present in the source directory
dbutils.fs.ls('/FileStore/yjaquier/autoloader/source01/')
# dbutils.fs.help()
# Create a new CSV file directly in DBFS and display its content
dbutils.fs.put('/FileStore/yjaquier/autoloader/source01/two.csv','id,descr\r\n2,Two')
dbutils.fs.head('/FileStore/yjaquier/autoloader/source01/two.csv')
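
To simulate a flow of arriving files you can also generate a few numbered CSV files in a loop; a small sketch where file names and contents are purely illustrative:

# Create a handful of incrementing CSV files in the source directory
for i, descr in [(2, 'Two'), (3, 'Three'), (4, 'Four')]:
  dbutils.fs.put(
    f'/FileStore/yjaquier/autoloader/source01/{descr.lower()}.csv',
    f'id,descr\r\n{i},{descr}',
    True  # overwrite the file if it already exists
  )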

I have created three CSV files and run the pipeline:

auto_loader05

Then, in a notebook, if you perform the query you should see the expected figures:

auto_loader06
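
For reference, the query can be as simple as the following sketch in Python, assuming the Target schema of the pipeline was set to yjaquier (adapt to your own value):

# Read the bronze table created by the DLT pipeline and display its rows
display(spark.table('yjaquier.source01').orderBy('id'))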

There is an additional _rescued_data column that is added automatically by Databricks:

When Auto Loader infers the schema, a rescued data column is automatically added to your schema as _rescued_data. You can rename the column or include it in cases where you provide a schema by setting the option rescuedDataColumn.

The rescued data column ensures that columns that don’t match with the schema are rescued instead of being dropped.
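
As a sketch, if you provide the schema yourself you can keep (or rename) that column by setting the rescuedDataColumn option mentioned in the quote above; the schema and column name below are only illustrative:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Explicit schema matching the sample CSV files (id, descr)
source01_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("descr", StringType(), True),
])

df = (
  spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", True)
  .option("rescuedDataColumn", "_rescued_data")  # keep the rescued column when a schema is supplied
  .schema(source01_schema)
  .load("/FileStore/yjaquier/autoloader/source01")
)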
