Delta Lake Streaming Internals
In this article, we will look at how to use Delta Lake as a streaming source and as a sink, along with the internals of Delta streaming.
Introduction
Many organisations have started adopting the Lakehouse architecture in their data pipelines, with Delta as the default storage format.
Delta Lake is ACID compliant. It provides snapshot isolation, which supports concurrent read/write operations, and enables efficient inserts, updates, deletes, and rollbacks. Delta Lake also provides background file optimization through compaction and Z-ordering, which improves query performance, and it offers time travel and versioning.
Refer to https://docs.microsoft.com/en-us/azure/databricks/delta/ for more details on delta features.
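To make the time travel and versioning features concrete, here is a minimal PySpark sketch. The table path and version/timestamp values are placeholders (the path reuses the demo table shown later), and the exact options available depend on your Delta Lake version; on Databricks, spark is already defined in a notebook.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

table_path = "dbfs:/mnt/abhr/deliveries/"  # placeholder table path

# Read the current snapshot of the Delta table
current_df = spark.read.format("delta").load(table_path)

# Time travel: read the table as it was at an earlier version
v0_df = spark.read.format("delta").option("versionAsOf", 0).load(table_path)

# Time travel by timestamp is also supported
ts_df = (spark.read.format("delta")
         .option("timestampAsOf", "2021-07-07")
         .load(table_path))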
Components of Delta Streaming
The basic components of Delta streaming are:
- Source
- Sink
- Checkpoint directory with Query Progress Log
- Transaction Log (see the sketch after this list)
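Every Delta table keeps its transaction log in the _delta_log directory next to the data files, with one zero-padded JSON file per commit. The sketch below peeks at it; the table path is a placeholder, the actual log contents depend on the commits made so far, and dbutils/display are Databricks notebook utilities.

# Placeholder table path; _delta_log sits alongside the table's data files
log_path = "dbfs:/mnt/abhr/deliveries/_delta_log/"

# Every commit is recorded as a zero-padded JSON file (version 0, 1, 2, ...)
display(dbutils.fs.ls(log_path))

# Inspect the actions (commitInfo, add, remove, ...) recorded for version 0 of the table
log_df = spark.read.json(log_path + "00000000000000000000.json")
log_df.printSchema()
display(log_df)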
Query Progress Log (QPL)
The Query Progress Log is a JSON log emitted for every microbatch. It captures the metrics for each microbatch in the streaming job.
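In PySpark, the QPL can be inspected directly on a running query through the lastProgress and recentProgress properties of the StreamingQuery handle. A minimal sketch is shown below; the source path, checkpoint location, and sink path are placeholders.

# Start a Delta-to-Delta stream (placeholder paths) and keep the query handle
query = (spark.readStream.format("delta")
         .load("dbfs:/mnt/abhr/deliveries/")
         .writeStream.format("delta")
         .option("checkpointLocation", "/tmp/demo_checkpoint")
         .start("/delta/demo_ipl"))

# Latest QPL entry as a Python dict (None until the first microbatch completes)
print(query.lastProgress)

# The last few QPL entries, one dict per microbatch
for progress in query.recentProgress:
    print(progress["batchId"], progress["numInputRows"])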
Metrics in QPL
id : The unique id for each stream; it maps to the checkpoint directory. If the checkpoint directory is deleted or changed, the stream id also changes.
batchId : The id of each microbatch in the streaming job.
numInputRows : The number of rows ingested from the source table in each microbatch.
startOffset/endOffset : The offset information for the start and end points of each microbatch.
reservoirVersion : The version of the Delta source table that the microbatch is reading from.
index : The file index within each version/transaction (a 0-based index giving the position of the file within the transaction).
isStartingVersion : true if the stream starts from the beginning of the table (the first version).
inputRowsPerSecond : The rate at which data is arriving from the source table into the stream.
processedRowsPerSecond : The rate at which data from the source is being processed by Spark.
A sample QPL entry from a Delta streaming query is shown below:
{
  "id" : "3245a4dd-e3c8-4eae-9241-42885bf0e8a8",
  "runId" : "1c7e68de-93a2-481d-a606-4217d45c39c1",
  "name" : null,
  "timestamp" : "2021-07-07T16:23:00.390Z",
  "batchId" : 0,
  "numInputRows" : 1051220,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 41211.38466363494,
  "durationMs" : {
    "addBatch" : 9180,
    "getBatch" : 43,
    "latestOffset" : 15240,
    "queryPlanning" : 36,
    "triggerExecution" : 25501,
    "walCommit" : 474
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/user/hive/warehouse/iot_demo.db/turbine_gold]",
    "startOffset" : null,
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "09565bc0-91fb-46bf-9d2a-134f9a21ea58",
      "reservoirVersion" : 0,
      "index" : 52,
      "isStartingVersion" : true
    },
    "numInputRows" : 1051220,
    "inputRowsPerSecond" : 0.01,
    "processedRowsPerSecond" : 41211.38466363494,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[/delta/events]",
    "numOutputRows" : -1
  }
}
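The endOffset block above (reservoirVersion, index, isStartingVersion) is exactly the state the stream needs to resume from where it left off. A minimal sketch for pulling these fields out of a running query, assuming the query handle from the earlier sketch:

# Extract the Delta source offset fields from the latest QPL entry
progress = query.lastProgress
if progress and progress["sources"]:
    source = progress["sources"][0]
    end_offset = source["endOffset"]
    print("reservoirVersion  :", end_offset["reservoirVersion"])
    print("index             :", end_offset["index"])
    print("isStartingVersion :", end_offset["isStartingVersion"])
    print("numInputRows      :", source["numInputRows"])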
Demo
The query used for the demo is shown below (chkpoint holds the checkpoint directory path):

(spark.readStream.format("delta")
   .option("maxFilesPerTrigger", 10)
   .load("dbfs:/mnt/abhr/deliveries/")
   .writeStream.format("delta")
   .option("checkpointLocation", chkpoint)
   .trigger(processingTime="60 seconds")
   .start("/delta/demo_ipl"))
After the 1st microbatch:
After the 2nd microbatch:
The offset information from the QPL of each microbatch is persisted inside the checkpoint directory. Because these details are stored in the checkpoint directory, in case of an abrupt termination of the cluster or the stream, the stream can be restarted from the latest checkpoint and continue processing only the incremental records/files from the source.
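To see where this state lives, the checkpoint directory can be listed directly. A sketch assuming the chkpoint variable from the demo; the exact layout can vary across Spark versions, and dbutils/display are Databricks notebook utilities.

# The checkpoint directory holds (among others) offsets/ and commits/.
# offsets/<batchId> records the Delta source offset (reservoirVersion, index, isStartingVersion)
# planned for that microbatch; commits/<batchId> marks the batch as completed.
display(dbutils.fs.ls(chkpoint))
display(dbutils.fs.ls(chkpoint + "/offsets"))

# Restarting the same query with the same checkpointLocation resumes from the
# last committed offset instead of reprocessing the whole source table.
query = (spark.readStream.format("delta")
         .option("maxFilesPerTrigger", 10)
         .load("dbfs:/mnt/abhr/deliveries/")
         .writeStream.format("delta")
         .option("checkpointLocation", chkpoint)
         .trigger(processingTime="60 seconds")
         .start("/delta/demo_ipl"))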