
Overcoming Lakehouse Limitations: Implementing Upserts with PySpark

February 14, 2025

Lakehouses combine the best features of data lakes and data warehouses, offering scalable and cost-effective solutions for storing and processing large datasets. However, they come with a notable limitation: insert, update, and delete operations on tables are not natively supported. This poses a challenge for use cases requiring data synchronization or incremental updates. 

To overcome this limitation, I created a PySpark notebook that demonstrates how to perform an upsert operation—a combination of insert and update—on a Delta Lake table. In this blog, I will discuss the limitation in detail and walk you through the notebook code. 

The Challenge

Traditional data lakes are designed for immutable data storage, which makes operations like updates and deletes cumbersome. Without a mechanism to handle these operations, keeping datasets synchronized becomes difficult. For instance: 

  • Incremental Updates: Appending new data is easy, but modifying existing records typically means rewriting whole files or even the entire dataset.
  • Real-Time Data Sync: Data pipelines need efficient mechanisms to manage changing records.
  • Data Quality: Without updates or deletes, correcting errors in data becomes a challenge. 

The Solution: Upserts with PySpark and Delta Lake

Data engineering often requires keeping datasets synchronized. This can mean updating existing records or inserting new ones—an operation known as upsert (update + insert). 
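To make the semantics concrete before moving to Spark, here is a minimal plain-Python sketch of an upsert over lists of dictionaries. The function name and the sample rows are hypothetical and only illustrate the idea; the column names (id, name, date_of_birth) are the ones used later in the notebook.

```python
def upsert(target_rows, source_rows, key="id"):
    """Return a new list: target rows updated from the source, plus new source rows."""
    merged = {row[key]: dict(row) for row in target_rows}
    for row in source_rows:
        if row[key] in merged:
            merged[row[key]].update(row)  # matched: overwrite with source values
        else:
            merged[row[key]] = dict(row)  # not matched: insert as a new row
    return list(merged.values())


# Hypothetical sample rows, for illustration only.
target = [
    {"id": 1, "name": "Ana", "date_of_birth": "1990-01-01"},
    {"id": 2, "name": "Ben", "date_of_birth": "1985-06-15"},
]
source = [
    {"id": 2, "name": "Benjamin", "date_of_birth": "1985-06-15"},  # existing id
    {"id": 3, "name": "Cara", "date_of_birth": "2000-03-09"},      # new id
]

result = upsert(target, source)
for row in result:
    print(row)
```

The row with id 2 is updated in place, the row with id 3 is appended, and the row with id 1 is left untouched. Delta Lake's merge applies the same matched / not matched logic, but at table scale and inside a transaction.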

Delta Lake extends the capabilities of a Lakehouse by adding support for ACID transactions and scalable metadata handling. By leveraging Delta Lake’s merge API, we can efficiently perform upserts on large datasets.  

Below, I’ll explain the code from the notebook I created to implement this solution. 

Notebook Walkthrough: Upserting Data in Lakehouse 

Step 1: Load Source and Target Tables

First, we load the source (new data) and target (existing data) tables into PySpark DataFrames: 

new_df = spark.sql("SELECT * FROM Upsert.mock_data2")
df = spark.sql("SELECT * FROM Upsert.mock_data")

  • mock_data2: The source table containing new or updated records. 
  • mock_data: The target table where the upsert operation will be performed. 

Step 2: Ensure the Target Table is a Delta Table

To perform upserts, the target table must be in Delta format. If it isn’t, we convert it: 

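from delta.tables import DeltaTable

# Define the Delta table path
target_table_path = "abfss://<storage-account>@<adls-path>/Tables/mock_data"

# If the path does not hold a Delta table yet, write the existing data there in Delta format
if not DeltaTable.isDeltaTable(spark, target_table_path):
    df.write.format("delta").mode("overwrite").save(target_table_path)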

  • DeltaTable.isDeltaTable: Checks whether the given path already contains a Delta table.
  • If not, the existing data in df is written to that path in Delta format, overwriting whatever was stored there.

Step 3: Perform the Upsert Using Merge

The upsert operation is implemented using Delta Lake’s merge API: 

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, target_table_path)

(
    delta_table.alias("target")
    .merge(
        new_df.alias("source"),
        "target.id = source.id"  # Merge condition based on the 'id' column
    )
    .whenMatchedUpdate(set={
        "name": "source.name",  # Update 'name'
        "date_of_birth": "source.date_of_birth"  # Update 'date_of_birth'
    })
    .whenNotMatchedInsert(values={
        "id": "source.id",  # Insert new 'id'
        "name": "source.name",  # Insert new 'name'
        "date_of_birth": "source.date_of_birth"  # Insert new 'date_of_birth'
    })
    .execute()
)

  • Merge Condition: Matches records in the target (target.id) with the source (source.id). 
  • When Matched: Updates the name and date_of_birth columns in the target for matching records. 
  • When Not Matched: Inserts new rows from the source into the target.
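When a table has more than a handful of columns, writing the condition and the column mappings by hand gets tedious. One possible approach is to generate them from column lists. The helper below is a hypothetical sketch (build_merge_spec is not part of Delta Lake); it only builds the strings and dictionaries that the merge call above expects.

```python
def build_merge_spec(key_cols, update_cols, target="target", source="source"):
    """Build the merge condition and column mappings from plain column lists."""
    # e.g. "target.id = source.id", joined with AND for composite keys
    condition = " AND ".join(f"{target}.{c} = {source}.{c}" for c in key_cols)
    # Columns to overwrite when a row matches
    update_set = {c: f"{source}.{c}" for c in update_cols}
    # Columns to populate when a row is new: keys first, then the rest
    insert_values = {c: f"{source}.{c}" for c in [*key_cols, *update_cols]}
    return condition, update_set, insert_values


condition, update_set, insert_values = build_merge_spec(
    key_cols=["id"],
    update_cols=["name", "date_of_birth"],
)
print(condition)
print(update_set)
print(insert_values)
```

The three values can then be passed to merge(new_df.alias("source"), condition), whenMatchedUpdate(set=update_set), and whenNotMatchedInsert(values=insert_values). If the source and target share exactly the same columns, Delta Lake's whenMatchedUpdateAll() and whenNotMatchedInsertAll() may be a simpler option.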

Step 4: Verify the Upsert Results

Finally, the updated target table is read back to confirm the changes: 

updated_df = spark.read.format("delta").load(target_table_path)
display(updated_df)

This displays the merged dataset, showing both updated and newly inserted records. 
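Beyond eyeballing the output, a row-count check can catch obvious problems. The sketch below is an assumption-laden example rather than part of the original notebook: the helper name is hypothetical, and it assumes id is unique in both the source and the target. It takes the df and new_df DataFrames from Step 1 and should be evaluated before the merge runs, so there is no ambiguity about which table version the count reflects.

```python
def expected_row_count(target_df, source_df, key="id"):
    """Rows the target should hold after the upsert, assuming unique keys."""
    # Source rows whose key has no match in the target are the ones merge will insert.
    new_rows = source_df.join(target_df.select(key), on=key, how="left_anti")
    # Matched rows are updated in place, so they do not change the count.
    return target_df.count() + new_rows.count()


# Usage in the notebook (hypothetical):
#   expected = expected_row_count(df, new_df)   # before Step 3
#   ... run the merge ...
#   assert updated_df.count() == expected       # after Step 4
```

A left anti join keeps only the source rows without a matching id in the target, which is exactly the set that whenNotMatchedInsert adds.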

Key Benefits of Delta Lake

Using Delta Lake for upserts addresses the limitations of traditional Lakehouses by enabling: 

  • ACID Transactions: Ensures data consistency during updates and inserts. 
  • Efficient Incremental Processing: Rewrites only the data files that contain affected rows rather than the entire dataset.
  • Schema Enforcement: Maintains data integrity. 
  • Historical Data Storage: Keeps earlier versions of the table, which can be queried for audits and rollbacks.
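As an illustration of the last point, an earlier version of the table can be read back with Delta Lake's versionAsOf read option. This is a minimal sketch: the helper name is hypothetical, and it assumes the version you ask for still exists in the table's history.

```python
def read_table_version(spark, table_path, version):
    """Load a Delta table as it was at the given version number."""
    return (
        spark.read.format("delta")
        .option("versionAsOf", version)  # Delta Lake time-travel read option
        .load(table_path)
    )


# Usage in the notebook (hypothetical): the table as it was at version 0
#   previous_df = read_table_version(spark, target_table_path, 0)
```

To see which versions are available, DeltaTable.forPath(spark, target_table_path).history() returns a DataFrame with one row per commit.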

Conclusion

The inability to perform insert, update, and delete operations in a Lakehouse can hinder its usability for dynamic data workflows. By implementing upserts with PySpark and Delta Lake, you can overcome these limitations and build robust, scalable data pipelines. 

Try adapting this approach in your projects to unlock the full potential of your Lakehouse!
