Building Delta Lakes with Apache Spark

Why Are Data Lakes Required?

The Importance of an Optimal Storage Solution:
* Scalability and performance
* Transaction support
* Support for diverse data formats
* Support for diverse workloads
* Openness

Different storage solutions available:
* Databases
* Data lakes
* Lakehouses

Databases

  • Designed to store structured data as tables
  • Adherence to strict schema
  • Strong transactional ACID guarantees
  • SQL workloads
    • OLTP
    • OLAP (Supported by Spark)

Limitations of Databases

Trends in the Industry
* Growth in data sizes
* Growth in diversity of analytics

Limitations of Databases
* Databases are extremely expensive to scale out
* Databases do not support non-SQL analytics well

These developments led to the growth of data lakes.

Data Lakes

Data lakes decouple storage and compute. A data lake is built by choosing the following (a minimal sketch follows this list):
* Storage system - HDFS or a cloud object store
* File format - Parquet, ORC, JSON
* Processing engine - Spark, Presto, Flink

Data lakes provide a cheaper solution than databases.
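
As a minimal sketch of that pattern (the bucket and paths below are placeholders, and it assumes an existing SparkSession named spark with the appropriate storage connector configured), a data lake of Parquet files on a cloud object store processed with Spark looks like this:

# Hypothetical layout: Parquet files in a cloud object store, Spark as the engine
events = spark.read.format("parquet").load("s3a://my-data-lake/raw/events/")

(events
 .filter("event_type = 'click'")
 .write.format("parquet")
 .mode("overwrite")
 .save("s3a://my-data-lake/curated/clicks/"))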

Why use Apache Spark for Building Data Lakes

  • Support for diverse workloads
    • Batch processing
    • Stream processing
    • ETL
    • SQL workloads
    • ML
  • Support for diverse file formats
  • Support for diverse file systems
    • Read from and write to different storage systems (see the sketch below)
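
A brief sketch of that flexibility (hypothetical paths; assumes an existing SparkSession named spark and connectors configured for each storage system):

# Read different file formats from different storage systems
json_df = spark.read.format("json").load("hdfs://namenode:8020/raw/events/")
orc_df = spark.read.format("orc").load("s3a://my-bucket/warehouse/orders/")
csv_df = (spark.read.format("csv")
          .option("header", "true")
          .load("/data/local/customers.csv"))

# Write back out in another format to another storage system
json_df.write.format("parquet").mode("overwrite").save("s3a://my-bucket/curated/events/")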

Limitations of Data Lakes

  • Fail to provide ACID guarantees
    • No mechanism to roll back files already written
    • No isolation when concurrent workloads modify the data
    • Inconsistent view of data due to failed writes
    • No protection against writing files whose format and schema are inconsistent with the existing data

Lakehouses

  • A lakehouse combines the best elements of databases and data lakes
  • ACID guarantees for transaction support
  • Schema enforcement
  • Support for diverse datatypes
  • Support for diverse workloads
  • Support for upserts and deletes
  • Data governance

Lakehouse systems available:
* Apache Hudi
* Apache Iceberg
* Delta Lake
  * Strong integration with Apache Spark
  * Developed by the creators of Spark

They all do the following (a sketch of inspecting a transaction log follows this list):
* Store large volumes of data in structured file formats
* Run on scalable filesystems
* Maintain a transaction log that records a timeline of atomic changes to the data
* Use the log to define versions of the table data
* Provide isolation guarantees between readers and writers
* Support reading and writing with Apache Spark
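
As an illustration of the transaction-log idea (this shows Delta Lake's layout; Hudi and Iceberg keep their metadata in their own formats, and the table path below is a placeholder), each committed version of a Delta table shows up as a JSON file under its _delta_log directory:

import os

# Placeholder path to a Delta table on a local filesystem
table_path = "/data/loans_delta"
log_path = os.path.join(table_path, "_delta_log")

# Each 0...0N.json file records one atomic commit, i.e. one version of the table
print(sorted(f for f in os.listdir(log_path) if f.endswith(".json")))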

Delta Lake

Delta Lake provides:
* An open data storage format
* Transactional guarantees
* Schema enforcement
* Schema evolution
* Support for Structured Streaming
* Update, delete, and merge operations in Java, Scala, and Python
* Time travel
* Rollback to previous versions
* Isolation between multiple concurrent writers

Building a Delta Lake

Install Delta Lake in the Python environment:

pip install delta-spark

and configure the SparkSession with configure_spark_with_delta_pip():

from delta import *
from pyspark.sql import SparkSession

# Configure the session with the Delta Lake SQL extension and catalog
builder = (SparkSession.builder.appName("Myapp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"))

# configure_spark_with_delta_pip() adds the Delta Lake JARs to the session
spark = configure_spark_with_delta_pip(builder).getOrCreate()
On the first run, Spark resolves the Delta Lake packages from Maven Central before starting the session; in this environment it downloaded io.delta:delta-core_2.12:2.3.0, io.delta:delta-storage:2.3.0, and org.antlr:antlr4-runtime:4.8.

Loading Data into a Delta Lake Table

sourcePath = '/home/thulasiram/personal/data_engineering/data/loan-risks.snappy.parquet'
# Configure Delta Lake path
deltaPath = '/home/thulasiram/personal/data_engineering/data/loans_delta'
# Create the Delta lake table with the loans data downloaded
(spark.read.format('parquet')
 .load(sourcePath)
 .write.format("delta")
 .save(deltaPath))
                                                                                
# Create a view on the data
(spark.read.format("delta")
 .load(deltaPath)
 .createOrReplaceTempView("loans_delta")
)
# Explore the data
spark.sql("select count(*) from loans_delta").show()
+--------+
|count(1)|
+--------+
|   14705|
+--------+
spark.sql("select * from loans_delta limit 5").show()
+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   182.22|        CA|
|      1|       1000|   361.19|        WA|
|      2|       1000|   176.26|        TX|
|      3|       1000|   1000.0|        OK|
|      4|       1000|   249.98|        PA|
+-------+-----------+---------+----------+

Enforcing Schema on Write to Prevent Data Corruption

  • A common problem when managing data with Spark in formats like JSON, Parquet, and ORC is accidental corruption caused by writing data with an incorrect format or schema
  • These formats define the layout of individual files, not of an entire table
  • There are no consistency guarantees across a table made up of many Parquet files
  • The Delta Lake format records the schema as table-level metadata
  • A Delta Lake table can therefore verify that the data being written has a schema compatible with that of the table
  • If the schema is not compatible, Spark throws an error
# The 'closed' column does not exist in the table
# Appending this data will fail with a schema mismatch error
from pyspark.sql.functions import *
cols = ['loan_id','funded_amnt','paid_amnt',
        'addr_state','closed']
items = [
   (1111111, 1000, 1000.0, 'TX', True), 
   (2222222, 2000, 0.0, 'CA', False)
]

loanUpdates = (spark.createDataFrame(items, cols)
  .withColumn("funded_amnt", col("funded_amnt").cast("int")))
loanUpdates.write.format("delta").mode("append").save(deltaPath)
AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 00d13bf8-8fb3-49b5-b56a-887a2b772dde).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- loan_id: long (nullable = true)
-- funded_amnt: integer (nullable = true)
-- paid_amnt: double (nullable = true)
-- addr_state: string (nullable = true)


Data schema:
root
-- loan_id: long (nullable = true)
-- funded_amnt: integer (nullable = true)
-- paid_amnt: double (nullable = true)
-- addr_state: string (nullable = true)
-- closed: boolean (nullable = true)


Schema Evolution

The schema mismatch above can be resolved by opting in to schema evolution: setting the mergeSchema option to true evolves the table schema to include the new column as part of the write.

(loanUpdates.write.format("delta").mode("append")
  .option("mergeSchema", "true")
  .save(deltaPath))
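
To confirm the schema was evolved, reloading the table should now show the closed column (rows written before the column was added read back with closed set to null):

# The table schema now includes the 'closed' column added by mergeSchema
spark.read.format("delta").load(deltaPath).printSchema()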

Transforming Existing Data

A common use case when managing data is fixing errors in the data. Suppose, upon reviewing the data, we realized that all of the loans assigned to addr_state = ‘OR’ should have been assigned to addr_state = ‘WA’. If the loan table were a Parquet table, then to do such an update we would need to:

  • Copy all of the rows that are not affected into a new table.
  • Copy all of the rows that are affected into a DataFrame, then perform the data modification.
  • Insert the previously noted DataFrame’s rows into the new table.
  • Remove the old table and rename the new table to the old table name.

With Delta Lake, the same fix is a single update on the table:

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, deltaPath)

# Reassign all loans with addr_state = 'OR' to 'WA';
# the new value is given as a SQL expression string
deltaTable.update("addr_state = 'OR'", {"addr_state": "'WA'"})
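
A quick check that the update took effect (the exact counts depend on the data):

# After the update there should be no rows left with addr_state = 'OR'
(spark.read.format("delta").load(deltaPath)
 .groupBy("addr_state")
 .count()
 .filter("addr_state in ('OR', 'WA')")
 .show())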

Taking care of GDPR

# Deleting the data on all loans that have been paid
deltaTable = DeltaTable.forPath(spark,deltaPath)
deltaTable.delete("funded_amnt >= paid_amnt")
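
One caveat for GDPR-style deletion: the DELETE removes the rows from the latest table version, but the underlying data files remain reachable through time travel until they are vacuumed. A minimal sketch using the same deltaTable:

# Physically remove files that are no longer referenced by the table and are
# older than the retention threshold (7 days by default)
deltaTable.vacuum()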

Upserts

  • Handles change data capture
  • Replicates changes from OLTP systems into OLAP tables
  • To continue with the loan data example, say we have another table of new loan information, some of which are new loans and others of which are updates to existing loans. Suppose this changes table has the same schema as the loans_delta table. You can upsert these changes into the table using the DeltaTable.merge() operation, which is based on the MERGE SQL command.
# Upsert: update existing loans on matching loan_id, insert new ones
(deltaTable
  .alias("t")
  .merge(loanUpdates.alias("s"), "t.loan_id = s.loan_id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)
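
A quick sanity check after the merge, reading the path directly so we see the latest version:

# Both upserted loan_ids should now be present in the table
(spark.read.format("delta").load(deltaPath)
 .filter("loan_id in (1111111, 2222222)")
 .show())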

Auditing Data Changes with Operation History

deltaTable.history().show()
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      4|2023-05-07 11:08:...|  null|    null|    MERGE|{predicate -> (t....|null|    null|     null|          3|  Serializable|        false|{numTargetRowsCop...|        null|Apache-Spark/3.3....|
|      3|2023-05-07 11:00:...|  null|    null|   DELETE|{predicate -> ["(...|null|    null|     null|          2|  Serializable|        false|{numRemovedFiles ...|        null|Apache-Spark/3.3....|
|      2|2023-05-07 10:55:...|  null|    null|   UPDATE|{predicate -> (ad...|null|    null|     null|          1|  Serializable|        false|{numRemovedFiles ...|        null|Apache-Spark/3.3....|
|      1|2023-05-07 10:50:...|  null|    null|    WRITE|{mode -> Append, ...|null|    null|     null|          0|  Serializable|         true|{numFiles -> 3, n...|        null|Apache-Spark/3.3....|
|      0|2023-05-07 10:28:...|  null|    null|    WRITE|{mode -> ErrorIfE...|null|    null|     null|       null|  Serializable|         true|{numFiles -> 1, n...|        null|Apache-Spark/3.3....|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
# Printing key columns 
(deltaTable
 .history(3)
 .select("version","timestamp","operation")
 .show(truncate=False) 
 )
+-------+-----------------------+---------+
|version|timestamp              |operation|
+-------+-----------------------+---------+
|4      |2023-05-07 11:08:35.63 |MERGE    |
|3      |2023-05-07 11:00:01.792|DELETE   |
|2      |2023-05-07 10:55:50.402|UPDATE   |
+-------+-----------------------+---------+

Time Travel

(spark.read
 .format("delta")
 .option("timestampAsOf", "2023-05-07 11:08:35.63")  # exact timestamp of the latest commit from the history above
 .load(deltaPath)).show()
+-------+-----------+---------+----------+------+
|loan_id|funded_amnt|paid_amnt|addr_state|closed|
+-------+-----------+---------+----------+------+
|1111111|       1000|   1000.0|        TX|  true|
|2222222|       2000|      0.0|        CA| false|
+-------+-----------+---------+----------+------+
(spark.read.format("delta")
 .option("versionAsOf","4")
 .load(deltaPath)
).show()
+-------+-----------+---------+----------+------+
|loan_id|funded_amnt|paid_amnt|addr_state|closed|
+-------+-----------+---------+----------+------+
|1111111|       1000|   1000.0|        TX|  true|
|2222222|       2000|      0.0|        CA| false|
+-------+-----------+---------+----------+------+
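
The rollback capability listed earlier can be exercised with DeltaTable.restoreToVersion(), available in recent Delta Lake releases including the 2.3.0 resolved earlier. A minimal sketch, reusing deltaTable from above and picking an example version from the history output:

# Roll the table back to an earlier version; the restore itself is recorded
# as a new entry in the table history
deltaTable.restoreToVersion(1)
deltaTable.history(1).select("version", "operation").show(truncate=False)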

Summary

  • Databases fail to fulfill modern use cases
  • Data lakes are designed to overcome database limitations
  • Data lakes lack the ACID guarantees provided by databases
  • Lakehouses are the next generation of data solutions - they provide the best features of databases and data lakes