Delta Lake Streaming Internals

Abhishek Raviprasad
Jul 9, 2021 · 3 min read


In this article, we will see how to use Delta Lake as a streaming source and as a sink, along with the internals of Delta streaming.

Introduction

Many organisations have started using the Lakehouse architecture in their data pipelines, with Delta as the default storage format.

Delta Lake is ACID compliant. It provides snapshot isolation, which allows concurrent read/write operations, and enables efficient inserts, updates, deletes, and rollbacks. Delta Lake also provides file optimization through compaction and Z-ordering for better query performance. It also offers time travel and versioning features.
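For example, an older version of a table can be read back directly with time travel (a minimal sketch; the table path and version are illustrative):

# Read the table as of an older version (time travel)
df_v0 = (spark.read.format("delta")
         .option("versionAsOf", 0)
         .load("/delta/events"))

# Or as of a point in time
df_ts = (spark.read.format("delta")
         .option("timestampAsOf", "2021-07-07 00:00:00")
         .load("/delta/events"))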

Refer to https://docs.microsoft.com/en-us/azure/databricks/delta/ for more details on delta features.

Components of Delta Streaming

The basic components of Delta streaming are:

  1. Source
  2. Sink
  3. Checkpoint directory with Query Progress Log
  4. Transaction Log

Basic Flow Diagram
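To make these pieces concrete, the sketch below shows roughly how the transaction log and the checkpoint directory are laid out on storage. The paths are illustrative and the exact file names vary; display and dbutils are Databricks-specific helpers.

# Delta table (source or sink): data files plus the transaction log
#   /delta/demo_ipl/part-00000-...snappy.parquet
#   /delta/demo_ipl/_delta_log/00000000000000000000.json   <- one JSON commit per transaction
#
# Streaming checkpoint directory:
#   <checkpointLocation>/metadata    <- holds the stream id
#   <checkpointLocation>/offsets/0   <- offsets written before microbatch 0 runs
#   <checkpointLocation>/commits/0   <- marker written after microbatch 0 commits
display(dbutils.fs.ls("/delta/demo_ipl/_delta_log"))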

Query Progress Log (QPL)

The QPL is a JSON log generated for every microbatch. It provides the metrics for each microbatch in the streaming job.
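The QPL can also be pulled programmatically from the running query handle. A minimal sketch, assuming query is the StreamingQuery object returned by writeStream.start():

# QPL of the most recent microbatch, as a Python dict
print(query.lastProgress)

# QPLs of the last few microbatches
for progress in query.recentProgress:
    print(progress["batchId"], progress["numInputRows"], progress["processedRowsPerSecond"])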


Metrics in QPL

id : The unique identifier of the stream; it maps to the checkpoint directory. If the checkpoint directory is deleted or changed, the stream id also changes.

batchId : The id of each microbatch in the streaming job.

numInputRows : The number of rows ingested from the source table in each microbatch.

startOffset/endOffset : The offset information for the start and end points of each microbatch.

reservoirVersion : The version of the Delta table that the microbatch reads from.

index : The file index within each version/transaction (0-based; the position of the file within the transaction).

isStartingVersion : true if the stream starts from the beginning (first version) of the table.

inputRowsPerSecond : The rate at which data arrives from the source table into the stream.

processedRowsPerSecond : The rate at which data from the source is being processed by Spark.
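These per-source offsets are what gets persisted in the checkpoint's offsets/ files. Below is a quick sketch for peeking at the offset written for batch 0; the checkpoint path is illustrative, and it assumes a Databricks setup where dbfs:/ paths are mounted under /dbfs.

import json

# Each offsets/<batchId> file has a few metadata lines followed by one offset
# line per source; for a single Delta source the last line is its offset JSON.
with open("/dbfs/mnt/abhr/chkpoint/offsets/0") as f:
    last_line = f.read().splitlines()[-1]

offset = json.loads(last_line)
print(offset["reservoirVersion"], offset["index"], offset["isStartingVersion"])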

{
  "id" : "3245a4dd-e3c8-4eae-9241-42885bf0e8a8",
  "runId" : "1c7e68de-93a2-481d-a606-4217d45c39c1",
  "name" : null,
  "timestamp" : "2021-07-07T16:23:00.390Z",
  "batchId" : 0,
  "numInputRows" : 1051220,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 41211.38466363494,
  "durationMs" : {
    "addBatch" : 9180,
    "getBatch" : 43,
    "latestOffset" : 15240,
    "queryPlanning" : 36,
    "triggerExecution" : 25501,
    "walCommit" : 474
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/user/hive/warehouse/iot_demo.db/turbine_gold]",
    "startOffset" : null,
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "09565bc0-91fb-46bf-9d2a-134f9a21ea58",
      "reservoirVersion" : 0,
      "index" : 52,
      "isStartingVersion" : true
    },
    "numInputRows" : 1051220,
    "inputRowsPerSecond" : 0.01,
    "processedRowsPerSecond" : 41211.38466363494,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[/delta/events]",
    "numOutputRows" : -1
  }
}
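As a rough sanity check on the numbers above (assuming processedRowsPerSecond is approximately numInputRows divided by the total trigger execution time):

num_input_rows = 1051220
trigger_execution_s = 25501 / 1000           # durationMs.triggerExecution

print(num_input_rows / trigger_execution_s)  # ~41222 rows/s vs the reported ~41211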

Demo

The query used for the demo is shown below:

(spark.readStream.format("delta")
  .option("maxFilesPerTrigger", 10)
  .load("dbfs:/mnt/abhr/deliveries/")
  .writeStream.format("delta")
  .option("checkpointLocation", chkpoint)
  .trigger(processingTime="60 seconds")
  .start("/delta/demo_ipl"))

State of the source table:

After the 1st microbatch:

After the 2nd microbatch:

The QPL for each microbatch gets stored inside the checkpoint directory. Because these details are persisted there, in case of an abrupt termination of the cluster or the stream, the stream can be restarted from the latest checkpoint and continue processing only the incremental records/files from the source.
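A minimal sketch of such a restart: rerunning the same query with the same checkpointLocation makes the stream resume from the last committed offsets, so only new files in the source table are processed.

# Restart after a failure: same source, sink and checkpoint location as before,
# so processing resumes from the offsets recorded in the checkpoint.
(spark.readStream.format("delta")
  .option("maxFilesPerTrigger", 10)
  .load("dbfs:/mnt/abhr/deliveries/")
  .writeStream.format("delta")
  .option("checkpointLocation", chkpoint)
  .trigger(processingTime="60 seconds")
  .start("/delta/demo_ipl"))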
