End-to-End Tutorial: Integrating SQLMesh with DLT
This tutorial guides you through setting up and using SQLMesh with DLT (Data Load Tool) to build a seamless, end-to-end data pipeline.
Introduction
DLT (Data Load Tool) is an open-source Python library that simplifies the data loading process, handling schema automation, normalization, and destination connectivity.
SQLMesh is a powerful data transformation framework that offers benefits like:
Automatic DAG generation
Column-level lineage tracking
Schema change detection (breaking vs. non-breaking changes)
Virtual data environments
Data versioning capabilities
When combined, these tools create a robust data platform that handles the entire ELT (Extract, Load, Transform) process with metadata awareness across the pipeline.
Prerequisites
Python 3.9+ installed
Basic understanding of Python and SQL
Knowledge of data warehouse concepts
pip or Poetry for package management
1. Setting Up Your Environment
First, let's set up a virtual environment and install the required packages:
# Create and activate a virtual environment
python -m venv sqlmesh-dlt-env
source sqlmesh-dlt-env/bin/activate # On Windows: sqlmesh-dlt-env\Scripts\activate
# Install the required packages
pip install dlt sqlmesh duckdb
2. Understanding Our Example DLT Pipeline
We'll be working with a sushi restaurant example pipeline that loads data into DuckDB. Let's examine the DLT pipeline structure:
import typing as t

import dlt


# Example sushi_types table
@dlt.resource(name="sushi_types", primary_key="id", write_disposition="merge")
def sushi_types() -> t.Iterator[t.Dict[str, t.Any]]:
    yield from [
        {"id": 0, "name": "Tobiko"},
        {"id": 1, "name": "Sashimi"},
        {"id": 2, "name": "Maki"},
        {"id": 3, "name": "Temaki"},
    ]


# Example waiters table
@dlt.resource(name="waiters", primary_key="id", write_disposition="merge")
def waiters() -> t.Iterator[t.Dict[str, t.Any]]:
    yield from [
        {"id": 0, "name": "Toby"},
        {"id": 1, "name": "Tyson"},
        {"id": 2, "name": "Ryan"},
        {"id": 3, "name": "George"},
        {"id": 4, "name": "Chris"},
        {"id": 5, "name": "Max"},
        {"id": 6, "name": "Vincent"},
        {"id": 7, "name": "Iaroslav"},
        {"id": 8, "name": "Emma"},
        {"id": 9, "name": "Maia"},
    ]


# Example sushi_menu table with one and two levels of nested data
@dlt.resource(name="sushi_menu", primary_key="id", write_disposition="merge")
def sushi_menu() -> t.Iterator[t.Dict[str, t.Any]]:
    yield from [
        {
            "id": 0,
            "name": "Tobiko",
            "fillings": ["Red Tobiko", "Black Tobiko", "Wasabi Tobiko", "Green Tobiko"],
            "details": {
                "preparation": "Raw",
                "ingredients": ["Seaweed", "Rice", "Tobiko"],
                "price": 12.99,
                "spicy": False,
            },
        },
        {
            "id": 1,
            "name": "Sashimi",
            "fillings": [
                "Tuna Sashimi",
                "Salmon Sashimi",
                "Yellowtail Sashimi",
                "Octopus Sashimi",
                "Scallop Sashimi",
            ],
            "details": {
                "preparation": "Raw",
                "ingredients": ["Fish", "Soy Sauce", "Wasabi"],
                "price": 19.99,
                "spicy": False,
            },
        },
        {
            "id": 2,
            "name": "Maki",
            "fillings": ["Cucumber", "Tuna", "Salmon", "Avocado", "Tempura Shrimp"],
            "details": {
                "preparation": "Rolled",
                "ingredients": ["Seaweed", "Rice", "Fish", "Vegetables"],
                "price": 14.99,
                "spicy": True,
            },
        },
        {
            "id": 3,
            "name": "Temaki",
            "fillings": ["Tuna Temaki", "Salmon Temaki", "Vegetable Temaki", "Ebi Temaki"],
            "details": {
                "preparation": "Hand Roll",
                "ingredients": ["Seaweed", "Rice", "Fish", "Vegetables"],
                "price": 10.99,
                "spicy": True,
            },
        },
    ]


# Run the pipeline: load all three resources into a local DuckDB database
p = dlt.pipeline(pipeline_name="sushi", destination="duckdb")
info = p.run([sushi_types(), waiters(), sushi_menu()])

# Print the load summary
print(info)
In this pipeline:
sushi_types defines a simple table of sushi types
waiters defines a table of restaurant staff
sushi_menu defines a more complex table with nested structures
All tables use the merge write disposition, which allows for incremental updates
Save this file as sushi_pipeline.py in your project directory.
3. Running the DLT Pipeline
First, we need to run our DLT pipeline to generate the data and metadata that SQLMesh will use:
python sushi_pipeline.py
This command executes the pipeline defined in the Python file, loading data into DuckDB and generating metadata about the schema and loading process.
You should see output similar to:
Pipeline sushi load step completed in X.XX seconds
Load package TIMESTAMP is LOADED and contains no failed jobs
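If you want to confirm what was loaded before moving on, you can inspect the DuckDB file directly. Here is a minimal check, assuming dlt's default behavior of writing to a local file named after the pipeline (sushi.duckdb) in the working directory:

# verify_load.py -- optional sanity check of the loaded tables
import duckdb

# dlt's DuckDB destination writes to <pipeline_name>.duckdb by default
con = duckdb.connect("sushi.duckdb")

# List every table dlt created, including its _dlt_* metadata tables
print(con.sql("SHOW ALL TABLES"))

You should see the sushi_types, waiters, and sushi_menu tables alongside dlt's metadata tables such as _dlt_loads.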
4. Initializing a SQLMesh Project with DLT Integration
Now that we have run our DLT pipeline, we can create a SQLMesh project that integrates with it:
# Initialize a new SQLMesh project using DLT template
sqlmesh init -t dlt --dlt-pipeline sushi
This command:
Creates a new SQLMesh project in the current directory
Uses the DLT template to automatically generate SQLMesh models based on your DLT pipeline
Configures SQLMesh to connect to your data warehouse (DuckDB in this case)
You should now have a SQLMesh project structure with files like:
config.yaml - Configuration for SQLMesh
models/ - Directory containing SQL models
macros/ - Directory for SQLMesh macros
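The generated config.yaml points SQLMesh at the same DuckDB database the pipeline loaded. The exact contents depend on your SQLMesh version and pipeline settings, but a minimal sketch looks roughly like this:

# config.yaml (sketch -- the init command generates the real values for you)
gateways:
  duckdb:
    connection:
      type: duckdb
      database: sushi.duckdb

default_gateway: duckdb

model_defaults:
  dialect: duckdb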
5. Exploring the Generated SQLMesh Models
Let's examine the models that SQLMesh has generated based on our DLT pipeline:
Incremental Load Tracking Model
SQLMesh creates a special model to track incremental loads:
/* models/incremental__dlt_loads.sql */
MODEL (
  name sushi_dataset_sqlmesh.incremental__dlt_loads,
  kind INCREMENTAL_BY_TIME_RANGE,
  start_date '2024-01-01',
  columns (
    load_id STRING,
    table_name STRING,
    loaded_at TIMESTAMP,
    normalized_at TIMESTAMP,
    first_load BOOLEAN
  )
);

SELECT
  _dlt_load_id AS load_id,
  _dlt_root_table AS table_name,
  _dlt_loaded_at AS loaded_at,
  _dlt_normalized_at AS normalized_at,
  _dlt_first_load AS first_load
FROM sushi._dlt_loads
WHERE CAST(_dlt_loaded_at AS TIMESTAMP) > @start_ts
  AND CAST(_dlt_loaded_at AS TIMESTAMP) <= @end_ts
Resource Models
For each resource in our DLT pipeline, SQLMesh creates an incremental model:
/* models/incremental_sushi_types.sql */
MODEL (
  name sushi_dataset_sqlmesh.incremental_sushi_types,
  kind INCREMENTAL_BY_TIME_RANGE,
  start_date '2024-01-01',
  columns (
    id INTEGER,
    name STRING
  ),
  depends_on (
    sushi_dataset_sqlmesh.incremental__dlt_loads
  )
);

WITH load_ids AS (
  SELECT load_id
  FROM sushi_dataset_sqlmesh.incremental__dlt_loads
  WHERE CAST(loaded_at AS TIMESTAMP) > @start_ts
    AND CAST(loaded_at AS TIMESTAMP) <= @end_ts
)
SELECT
  id,
  name
FROM sushi.sushi_types
WHERE _dlt_load_id IN (SELECT load_id FROM load_ids)
Similarly, models are generated for the waiters and sushi_menu tables.
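For instance, the generated waiters model mirrors the sushi_types model above; only the model name and source table differ. A sketch (the exact column list comes from the dlt schema):

/* models/incremental_waiters.sql -- sketch mirroring the sushi_types model */
MODEL (
  name sushi_dataset_sqlmesh.incremental_waiters,
  kind INCREMENTAL_BY_TIME_RANGE,
  start_date '2024-01-01',
  columns (
    id INTEGER,
    name STRING
  ),
  depends_on (
    sushi_dataset_sqlmesh.incremental__dlt_loads
  )
);

WITH load_ids AS (
  SELECT load_id
  FROM sushi_dataset_sqlmesh.incremental__dlt_loads
  WHERE CAST(loaded_at AS TIMESTAMP) > @start_ts
    AND CAST(loaded_at AS TIMESTAMP) <= @end_ts
)
SELECT
  id,
  name
FROM sushi.waiters
WHERE _dlt_load_id IN (SELECT load_id FROM load_ids)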
6. Planning and Applying SQLMesh Changes
Now that we have our SQLMesh models ready, let's plan and apply the changes:
# Plan the changes
sqlmesh plan
This command analyzes the data models and shows what changes need to be applied. You'll see output similar to:
environment `prod` will be initialized
Models:
└── Added:
├── sushi_dataset_sqlmesh.incremental__dlt_loads
├── sushi_dataset_sqlmesh.incremental_sushi_types
├── sushi_dataset_sqlmesh.incremental_waiters
└── sushi_dataset_sqlmesh.incremental_sushi_menu
Models needing backfill (missing dates):
├── sushi_dataset_sqlmesh.incremental__dlt_loads: 2024-05-01 - 2024-05-11
├── sushi_dataset_sqlmesh.incremental_sushi_types: 2024-05-01 - 2024-05-11
├── sushi_dataset_sqlmesh.incremental_waiters: 2024-05-01 - 2024-05-11
└── sushi_dataset_sqlmesh.incremental_sushi_menu: 2024-05-01 - 2024-05-11
Apply - Backfill Tables [y/n]:
Type y to apply the changes and backfill the tables.
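You can confirm the backfill by querying one of the models directly from the SQLMesh CLI:

sqlmesh fetchdf "SELECT * FROM sushi_dataset_sqlmesh.incremental_sushi_types"

This should return the four sushi types loaded by the pipeline.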
7. Creating Additional Transformation Models
Now that we have our source data loaded into SQLMesh models, we can create additional transformation models for analysis. Let's create a model that joins sushi_menu with sushi_types to create a combined view:
Create a new file models/combined_menu.sql:
MODEL (
  name sushi_dataset_sqlmesh.combined_menu,
  kind VIEW,
  depends_on (
    sushi_dataset_sqlmesh.incremental_sushi_menu,
    sushi_dataset_sqlmesh.incremental_sushi_types
  )
);

/* Assumes dlt's default normalization: the nested `details` object is
   flattened into details__* columns, and the `fillings` list is loaded
   into a separate child table rather than a column on this table. */
SELECT
  m.id,
  m.name,
  m.details__preparation,
  m.details__price,
  m.details__spicy,
  t.name AS type_name
FROM sushi_dataset_sqlmesh.incremental_sushi_menu AS m
JOIN sushi_dataset_sqlmesh.incremental_sushi_types AS t
  ON m.id = t.id
Let's also create an analytics model that calculates price statistics:
Create a new file models/price_analytics.sql:
MODEL (
  name sushi_dataset_sqlmesh.price_analytics,
  kind VIEW,
  depends_on (
    sushi_dataset_sqlmesh.incremental_sushi_menu
  )
);

/* Assumes dlt's default flattening of the nested `details` object into
   details__price and details__spicy columns. */
SELECT
  CASE WHEN details__spicy THEN 'Spicy' ELSE 'Not Spicy' END AS spice_level,
  COUNT(*) AS item_count,
  AVG(details__price) AS avg_price,
  MIN(details__price) AS min_price,
  MAX(details__price) AS max_price
FROM sushi_dataset_sqlmesh.incremental_sushi_menu
GROUP BY
  CASE WHEN details__spicy THEN 'Spicy' ELSE 'Not Spicy' END
8. Planning and Applying the New Models
Now let's apply our new transformation models:
sqlmesh plan
You'll see output showing the new models being added. Type y to apply the changes.
9. Updating the DLT Pipeline
Let's update our DLT pipeline to add a new sushi type and modify an existing one:
# Update sushi_pipeline.py
# In the sushi_types function, update the yield statement:
@dlt.resource(name="sushi_types", primary_key="id", write_disposition="merge")
def sushi_types() -> t.Iterator[t.Dict[str, t.Any]]:
yield from [
{"id": 0, "name": "Premium Tobiko"}, # Changed from "Tobiko"
{"id": 1, "name": "Sashimi"},
{"id": 2, "name": "Maki"},
{"id": 3, "name": "Temaki"},
{"id": 4, "name": "Uramaki"}, # New sushi type
]
Run the updated pipeline:
python sushi_pipeline.py
10. Incremental Updates with SQLMesh
Now let's update our SQLMesh models to reflect these changes:
# Refresh the SQLMesh models with the latest DLT data
sqlmesh dlt_refresh --dlt-pipeline sushi
# Plan and apply the changes
sqlmesh plan
You'll see that SQLMesh detects the changes in the data and updates only the incremental models that have changed.
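The dlt_refresh step regenerates model definitions from the pipeline's current schema, and the plan step classifies any model changes as breaking or non-breaking before applying them. For routine loads where only the data changes (no new columns or tables), you can usually skip the refresh and simply rerun the pipeline followed by a run of the models for the latest intervals:

# Load new data, then process the latest intervals
python sushi_pipeline.py
sqlmesh run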
11. Extending the Integration
Creating a Custom Schema Update Script
Let's create a script that automates the process of updating our SQLMesh models when the DLT pipeline changes:
# Save as update_schema.py
import os
import re
import subprocess
import sys


def update_pipeline_and_schema(pipeline_file):
    # Run the DLT pipeline
    print(f"Running DLT pipeline: {pipeline_file}")
    result = subprocess.run(["python", pipeline_file], capture_output=True, text=True)
    if result.returncode != 0:
        print(f"Error running DLT pipeline: {result.stderr}")
        sys.exit(1)
    print(result.stdout)

    # Simple parsing to extract the pipeline name from the file
    with open(pipeline_file, "r") as f:
        content = f.read()
    match = re.search(r'pipeline_name=[\'"](\w+)[\'"]', content)
    if not match:
        print("Could not find pipeline_name in the file.")
        sys.exit(1)
    pipeline_name = match.group(1)

    # Refresh SQLMesh models
    print(f"Refreshing SQLMesh models for pipeline: {pipeline_name}")
    result = subprocess.run(
        ["sqlmesh", "dlt_refresh", "--dlt-pipeline", pipeline_name],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        print(f"Error refreshing SQLMesh models: {result.stderr}")
        sys.exit(1)
    print(result.stdout)

    # Plan SQLMesh changes interactively (output streams to the terminal)
    print("Planning SQLMesh changes")
    subprocess.run(["sqlmesh", "plan", "prod"], text=True)


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python update_schema.py <pipeline_file.py>")
        sys.exit(1)

    pipeline_file = sys.argv[1]
    if not os.path.exists(pipeline_file):
        print(f"Pipeline file not found: {pipeline_file}")
        sys.exit(1)

    update_pipeline_and_schema(pipeline_file)
Run this script to update both the pipeline and schema:
python update_schema.py sushi_pipeline.py
12. Monitoring and Troubleshooting
Viewing SQLMesh Lineage
SQLMesh provides a way to visualize the data lineage. Let's see how our models are connected:
sqlmesh dag
This will generate a graph visualization showing how data flows through your models.
Checking Execution Status
To check the status of your SQLMesh models:
sqlmesh info
13. Best Practices for DLT and SQLMesh Integration
Pipeline Organization: keep the DLT pipeline script and the SQLMesh project in the same repository so that loading and transformation code evolve together.
Incremental Loading: prefer the merge write disposition in DLT and incremental model kinds in SQLMesh so that each run processes only new load packages.
Schema Evolution: rerun sqlmesh dlt_refresh whenever the pipeline's schema changes, and review the plan output to see which changes are breaking versus non-breaking.
Version Control: commit both the pipeline and the generated models, and review plan diffs before applying them to prod.
Testing: validate changes in a development environment (for example, sqlmesh plan dev) before promoting them, and add audits to critical models (see the sketch below).
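As a starting point for the testing practice above, SQLMesh audits let you assert data quality rules directly on a model. A minimal sketch that adds the built-in not_null audit to the combined_menu model from step 7 (only the MODEL block changes; the SELECT stays the same):

MODEL (
  name sushi_dataset_sqlmesh.combined_menu,
  kind VIEW,
  depends_on (
    sushi_dataset_sqlmesh.incremental_sushi_menu,
    sushi_dataset_sqlmesh.incremental_sushi_types
  ),
  audits (
    not_null(columns := (id, name))
  )
);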
Conclusion
In this tutorial, we've explored how to:
Set up a DLT pipeline for data ingestion
Initialize a SQLMesh project with DLT integration
Create transformation models in SQLMesh
Handle schema changes and incremental updates
Automate the process with custom scripts
Monitor and troubleshoot the pipeline
By combining DLT and SQLMesh, you've created a robust data pipeline with:
Automatic schema inference and management
Incremental data loading
Column-level lineage tracking
Version-controlled transformations
Safe testing through virtual environments
This integration provides a solid foundation for building scalable, maintainable data platforms that can evolve with your business needs.