Databricks Delta Live Tables (DLT)
You are a DLT pipeline architect who builds production-grade medallion architecture pipelines with data quality expectations, CDC processing, streaming tables, and materialized views. You design pipelines that are declarative, testable, and observable.
Core Philosophy
DLT is declarative data engineering. You define WHAT data should look like, not HOW to process it. The runtime handles scheduling, retries, scaling, and dependency resolution. This means you focus on data quality expectations and transformations, not infrastructure. If your DLT pipeline has complex orchestration logic, you are fighting the framework.
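The dependency resolution mentioned above can be pictured as a topological sort. Each table declares the datasets it reads, and the run order follows from those declarations rather than from hand-written orchestration. This is a conceptual model only, not how the DLT runtime is implemented. It uses Python's standard `graphlib` module and the table names from the medallion example in this skill.

```python
from graphlib import TopologicalSorter

# Each table maps to the datasets it reads, mirroring its dlt.read / dlt.read_stream calls.
# Conceptual model of dependency resolution, not DLT internals.
declared_reads = {
    "bronze_orders": set(),                 # reads files via Auto Loader, no upstream table
    "silver_orders": {"bronze_orders"},
    "gold_daily_metrics": {"silver_orders"},
}


def execution_order(reads: dict) -> list:
    """Return an order in which every table runs after all the tables it reads."""
    # TopologicalSorter expects {node: predecessors} and raises CycleError on cycles
    return list(TopologicalSorter(reads).static_order())


print(execution_order(declared_reads))
```

Declaring a new table that reads `silver_orders` adds one entry to the mapping, and its position in the run order follows without any scheduling code.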
Setup
Pipeline Configuration
With Unity Catalog, `catalog` names the catalog and `target` names the schema the pipeline publishes its tables to.

```json
{
  "name": "orders_pipeline",
  "catalog": "production",
  "target": "gold",
  "channel": "CURRENT",
  "continuous": false,
  "development": false,
  "configuration": {
    "env": "production",
    "source_schema": "production.bronze"
  },
  "clusters": [
    {
      "label": "default",
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ]
}
```
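Separate dev and prod pipelines are easier to maintain if the development settings are derived from the production JSON instead of being edited by hand. The following is a minimal sketch. It assumes a hypothetical `dev_settings` helper, a `_dev` name suffix and a two-worker cap. The cap is this sketch's own choice: development mode itself changes cluster reuse and retry behavior, not cluster size.

```python
import copy
import json

# Trimmed copy of the production settings above
PROD_SETTINGS = json.loads("""
{
  "name": "orders_pipeline",
  "development": false,
  "configuration": {"env": "production"},
  "clusters": [
    {"label": "default",
     "autoscale": {"min_workers": 1, "max_workers": 5, "mode": "ENHANCED"}}
  ]
}
""")


def dev_settings(prod: dict, max_workers: int = 2) -> dict:
    """Derive development pipeline settings from production settings (hypothetical helper)."""
    dev = copy.deepcopy(prod)              # never mutate the production settings
    dev["name"] = prod["name"] + "_dev"
    dev["development"] = True              # reuse the cluster between updates, no automatic retries
    dev["configuration"]["env"] = "dev"
    for cluster in dev.get("clusters", []):
        autoscale = cluster.get("autoscale")
        if autoscale:
            # Smaller cluster for dev: a choice made here, not a development-mode feature
            autoscale["max_workers"] = min(autoscale["max_workers"], max_workers)
            autoscale["min_workers"] = min(autoscale["min_workers"], autoscale["max_workers"])
    return dev


print(json.dumps(dev_settings(PROD_SETTINGS), indent=2))
```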
Key Techniques
1. Medallion Architecture
```python
import dlt
from pyspark.sql.functions import *  # wildcard import: shadows Python's sum, min, max, etc.

# Bronze: Raw ingestion
@dlt.table(
    comment="Raw orders from source system",
    table_properties={"quality": "bronze"}
)
def bronze_orders():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaLocation", "/mnt/schemas/orders")
        .load("/mnt/data/raw/orders/")
        .withColumn("_ingested_at", current_timestamp())
        # _metadata.file_path replaces input_file_name(), which Unity Catalog does not support
        .withColumn("_source_file", col("_metadata.file_path"))
    )

# Silver: Cleaned and validated
@dlt.table(
    comment="Cleaned orders with quality checks",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("valid_amount", "amount > 0")
@dlt.expect("valid_date", "order_date >= '2020-01-01'")
# Drop rather than fail in production (see Common Pitfalls)
@dlt.expect_or_drop("valid_customer", "customer_id IS NOT NULL")
def silver_orders():
    return (
        dlt.read_stream("bronze_orders")
        .withColumn("order_date", to_date("order_date"))
        .withColumn("amount", col("amount").cast("decimal(10,2)"))
        .withColumn("status", upper(trim(col("status"))))
        # Without a watermark, streaming dropDuplicates keeps every order_id in state indefinitely
        .dropDuplicates(["order_id"])
    )

# Gold: Business-ready aggregation (batch read, recomputed on each update)
@dlt.table(
    comment="Daily order metrics",
    table_properties={"quality": "gold"}
)
def gold_daily_metrics():
    return (
        dlt.read("silver_orders")
        .groupBy("order_date", "region")
        .agg(
            count("order_id").alias("order_count"),
            countDistinct("customer_id").alias("unique_customers"),
            sum("amount").alias("total_revenue"),  # pyspark sum via the wildcard import
            avg("amount").alias("avg_order_value")
        )
    )
```
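The layer contracts can be tried without a cluster. The sketch below replays the bronze, silver, and gold steps on a few invented in-memory records, using only the standard library.

- The silver step normalizes `status` and casts `amount` to two decimal places.
- It then keeps one row per `order_id` and applies the two drop rules (`order_id` not null, `amount > 0`).
- The gold step mirrors `gold_daily_metrics`.

It models the logic, not Spark's execution.

```python
from collections import defaultdict
from decimal import Decimal

# Invented bronze records, as they might land from raw JSON
bronze = [
    {"order_id": 1, "customer_id": "a", "region": "EU", "order_date": "2024-05-01", "amount": "10.50", "status": " shipped "},
    {"order_id": 1, "customer_id": "a", "region": "EU", "order_date": "2024-05-01", "amount": "10.50", "status": "shipped"},
    {"order_id": 2, "customer_id": "b", "region": "EU", "order_date": "2024-05-01", "amount": "-3", "status": "new"},
    {"order_id": None, "customer_id": "c", "region": "US", "order_date": "2024-05-01", "amount": "7", "status": "new"},
    {"order_id": 3, "customer_id": "b", "region": "EU", "order_date": "2024-05-01", "amount": "4.50", "status": "new"},
]


def to_silver(rows):
    """Transform, keep one row per order_id, then apply the drop rules."""
    seen, cleaned = set(), []
    for r in rows:
        if r["order_id"] in seen:  # like dropDuplicates(["order_id"])
            continue
        seen.add(r["order_id"])
        cleaned.append({
            **r,
            "amount": Decimal(r["amount"]).quantize(Decimal("0.01")),  # decimal(10,2)
            "status": r["status"].strip().upper(),                     # upper(trim(status))
        })
    # Expectations are checked against the query output, so they run last
    return [r for r in cleaned if r["order_id"] is not None and r["amount"] > 0]


def to_gold(rows):
    """Daily metrics per (order_date, region), like gold_daily_metrics."""
    groups = defaultdict(list)
    for r in rows:
        groups[(r["order_date"], r["region"])].append(r)
    metrics = {}
    for key, g in groups.items():
        revenue = sum(r["amount"] for r in g)
        metrics[key] = {
            "order_count": len(g),
            "unique_customers": len({r["customer_id"] for r in g}),
            "total_revenue": revenue,
            "avg_order_value": revenue / len(g),
        }
    return metrics


print(to_gold(to_silver(bronze)))
```

Only orders 1 and 3 reach silver, so gold never sees the duplicate, the negative amount, or the record without an id.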
2. Data Quality Expectations
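```python
import dlt

# Expectation decorators sit between @dlt.table and the function they validate.
# All types are stacked on one table here for reference; the table and source
# names (validated_records, bronze_records) are hypothetical.
@dlt.table(comment="Records validated with every expectation type")
# Expect: record the violation in pipeline metrics but keep the record
@dlt.expect("reasonable_amount", "amount < 1000000")
# Expect or Drop: remove invalid records
@dlt.expect_or_drop("has_email", "email IS NOT NULL AND email LIKE '%@%'")
# Expect or Fail: fail the pipeline update on any violation
@dlt.expect_or_fail("schema_version", "_schema_version = 2")
# Multiple expectations (violations recorded, records kept)
@dlt.expect_all({
    "valid_id": "id IS NOT NULL",
    "valid_name": "name IS NOT NULL AND LENGTH(name) > 0",
    "valid_email": "email RLIKE '^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$'"
})
# Expect all or drop: drop a record that violates any of these
@dlt.expect_all_or_drop({
    "positive_amount": "amount > 0",
    "valid_currency": "currency IN ('USD', 'EUR', 'GBP')",
    "recent_date": "created_at >= '2024-01-01'"
})
def validated_records():
    return dlt.read_stream("bronze_records")
```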
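The three enforcement modes differ only in what happens to a violating record: it is kept, dropped, or the update fails. The plain-Python model below makes that explicit. It also counts passed and failed records per expectation, roughly like the metrics DLT records. The `apply_expectations` helper and `ExpectationFailed` exception are hypothetical. The predicates are Python lambdas, not SQL strings.

```python
class ExpectationFailed(Exception):
    """Stands in for the failed pipeline update that expect_or_fail causes."""


def apply_expectations(rows, expectations):
    """expectations: list of (name, predicate, action), action in {"warn", "drop", "fail"}."""
    metrics = {name: {"passed": 0, "failed": 0} for name, _, _ in expectations}
    kept = []
    for row in rows:
        keep = True
        for name, predicate, action in expectations:
            if predicate(row):
                metrics[name]["passed"] += 1
                continue
            metrics[name]["failed"] += 1
            if action == "fail":
                raise ExpectationFailed(f"{name} violated by {row}")
            if action == "drop":
                keep = False  # record is excluded from the target table
            # "warn": record is kept, only the metric changes
        if keep:
            kept.append(row)
    return kept, metrics


rows = [{"amount": 5, "email": "a@x.io"}, {"amount": 2_000_000, "email": None}]
kept, metrics = apply_expectations(rows, [
    ("reasonable_amount", lambda r: r["amount"] < 1_000_000, "warn"),
    ("has_email", lambda r: r["email"] is not None and "@" in r["email"], "drop"),
])
print(kept, metrics)
```

The second row fails both checks. The warn-only check just counts it, and the drop check removes it.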
3. Change Data Capture (CDC)
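```python
import dlt
from pyspark.sql.functions import col, expr

# apply_changes is not returned from a @dlt.table function: declare the target
# streaming table first, then register the CDC flow at module level
dlt.create_streaming_table(
    name="silver_customers",
    comment="CDC-processed customers"
)

# Process CDC events from source
dlt.apply_changes(
    target="silver_customers",
    source="bronze_customer_cdc",
    keys=["customer_id"],
    sequence_by=col("event_timestamp"),  # orders events per key, even if they arrive late
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "event_timestamp"],
    stored_as_scd_type=2  # Slowly Changing Dimension Type 2
)
```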
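Under `stored_as_scd_type=2`, DLT keeps one row per version of each key and marks its validity with `__START_AT` and `__END_AT` columns. Both take their values from `sequence_by`. The pure-Python model below shows the bookkeeping, using a hypothetical `scd2_history` helper and invented events:

- Events are ordered by sequence within each key.
- An upsert closes the current version and opens a new one.
- A delete only closes the current version.

The model opens a new version on every upsert and ignores refinements such as `track_history_column_list`.

```python
from collections import defaultdict


def scd2_history(events, key="customer_id", seq="event_timestamp", op="operation"):
    """Build SCD Type 2 rows from CDC events (conceptual model of apply_changes)."""
    by_key = defaultdict(list)
    for e in events:
        by_key[e[key]].append(e)
    history = []
    for key_events in by_key.values():
        current = None
        for e in sorted(key_events, key=lambda ev: ev[seq]):  # sequence_by ordering
            if current is not None:
                current["__END_AT"] = e[seq]                 # close the open version
                current = None
            if e[op] != "DELETE":                            # apply_as_deletes
                data = {c: v for c, v in e.items() if c not in (op, seq)}  # except_column_list
                current = {**data, "__START_AT": e[seq], "__END_AT": None}
                history.append(current)
    return history


events = [
    {"customer_id": 1, "name": "Ana", "operation": "INSERT", "event_timestamp": 1},
    {"customer_id": 1, "name": "Ana B.", "operation": "UPDATE", "event_timestamp": 3},
    {"customer_id": 1, "name": None, "operation": "DELETE", "event_timestamp": 5},
    {"customer_id": 2, "name": "Raj", "operation": "INSERT", "event_timestamp": 4},
    {"customer_id": 1, "name": "Ana Silva", "operation": "UPDATE", "event_timestamp": 2},  # arrives late
]
for row in scd2_history(events):
    print(row)
```

The late event with sequence 2 still lands between versions 1 and 3, because ordering comes from `sequence_by` and not from arrival order.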
4. Streaming Tables
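```python
import dlt
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

# Hypothetical payload schema; replace with the real shape of user-events messages
event_schema = StructType([
    StructField("event_type", StringType()),
    StructField("user_id", StringType()),
    StructField("event_time", TimestampType()),
])

# Streaming table from Kafka
@dlt.table(
    comment="Real-time events from Kafka",
    table_properties={"pipelines.autoOptimize.zOrderCols": "event_type,user_id"}
)
def streaming_events():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("subscribe", "user-events")
        # "latest" skips messages already in the topic on the first run; "earliest" backfills
        .option("startingOffsets", "latest")
        .load()
        .select(
            col("key").cast("string").alias("event_key"),
            from_json(col("value").cast("string"), event_schema).alias("data"),
            col("timestamp").alias("kafka_timestamp")
        )
        .select("event_key", "data.*", "kafka_timestamp")
    )
```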
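Kafka delivers `key` and `value` as binary. That is why the query casts both to strings and parses `value` with `from_json` before flattening `data.*`. The standard-library sketch below performs the same decode, parse and flatten step on single messages, with `json.loads` standing in for `from_json`. The `decode_message` helper and the field list are hypothetical. `event_type` and `user_id` match the Z-order columns above.

```python
import json
from typing import Optional

# Hypothetical payload fields standing in for the Spark schema
EVENT_FIELDS = ("event_type", "user_id", "event_time")


def decode_message(key: Optional[bytes], value: Optional[bytes], timestamp) -> dict:
    """Model of the cast / from_json / data.* steps for one Kafka record."""
    event_key = key.decode("utf-8") if key is not None else None  # CAST(key AS STRING)
    try:
        data = json.loads(value.decode("utf-8")) if value is not None else None
    except (UnicodeDecodeError, json.JSONDecodeError):
        data = None  # unparseable payload: every flattened field ends up null
    if not isinstance(data, dict):
        data = {}
    # data.*: one column per schema field; absent fields become None (SQL NULL)
    # and payload keys outside the schema are ignored
    fields = {name: data.get(name) for name in EVENT_FIELDS}
    return {"event_key": event_key, **fields, "kafka_timestamp": timestamp}


print(decode_message(b"u1", b'{"event_type": "click", "user_id": "u1", "extra": 1}', 100))
print(decode_message(None, b"not json", 101))
```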
5. Materialized Views
```sql
-- SQL-based DLT with materialized views
CREATE OR REFRESH MATERIALIZED VIEW gold.customer_360
COMMENT 'Unified customer view'
AS SELECT
  c.customer_id,
  c.name,
  c.email,
  c.segment,
  COALESCE(o.total_orders, 0) AS total_orders,
  COALESCE(o.total_revenue, 0) AS total_revenue,
  COALESCE(o.last_order_date, c.created_at) AS last_activity,
  COALESCE(t.open_tickets, 0) AS open_tickets,
  COALESCE(s.mrr, 0) AS current_mrr
FROM silver.customers c
LEFT JOIN (
  SELECT customer_id,
         COUNT(*) AS total_orders,
         SUM(amount) AS total_revenue,
         MAX(order_date) AS last_order_date
  FROM silver.orders
  GROUP BY customer_id
) o ON c.customer_id = o.customer_id
LEFT JOIN (
  SELECT customer_id, COUNT(*) AS open_tickets
  FROM silver.tickets
  WHERE status = 'open'
  GROUP BY customer_id
) t ON c.customer_id = t.customer_id
LEFT JOIN silver.subscriptions s
  ON c.customer_id = s.customer_id AND s.status = 'active';
```
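The `COALESCE` calls are needed because a `LEFT JOIN` keeps every customer but fills in NULLs when no orders, tickets, or active subscription match. The standard-library sketch below reproduces just the orders branch on invented rows to show the defaulting. `customer_360` here is a hypothetical Python function; the tickets and subscriptions joins would follow the same pattern.

```python
from datetime import date

# Invented silver rows
customers = [
    {"customer_id": 1, "name": "Ana", "created_at": date(2023, 1, 5)},
    {"customer_id": 2, "name": "Raj", "created_at": date(2024, 2, 1)},
]
orders = [
    {"customer_id": 1, "amount": 30, "order_date": date(2024, 3, 1)},
    {"customer_id": 1, "amount": 20, "order_date": date(2024, 4, 2)},
]


def customer_360(customers, orders):
    """LEFT JOIN customers to per-customer order aggregates, defaulting like COALESCE."""
    agg = {}
    for o in orders:  # GROUP BY customer_id
        a = agg.setdefault(o["customer_id"],
                           {"total_orders": 0, "total_revenue": 0, "last_order_date": None})
        a["total_orders"] += 1
        a["total_revenue"] += o["amount"]
        if a["last_order_date"] is None or o["order_date"] > a["last_order_date"]:
            a["last_order_date"] = o["order_date"]
    rows = []
    for c in customers:
        a = agg.get(c["customer_id"])  # None plays the role of the LEFT JOIN's NULLs
        rows.append({
            "customer_id": c["customer_id"],
            "total_orders": a["total_orders"] if a else 0,
            "total_revenue": a["total_revenue"] if a else 0,
            "last_activity": a["last_order_date"] if a else c["created_at"],
        })
    return rows


for row in customer_360(customers, orders):
    print(row)
```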
Best Practices
- Medallion architecture: Bronze (raw), Silver (cleaned), Gold (business-ready)
- Expectations at Silver layer: Validate data quality before it reaches Gold
- Use streaming for incremental processing: `readStream` (or `dlt.read_stream`) instead of `read`
- SCD Type 2 for dimensions: Track history with `apply_changes` and `stored_as_scd_type=2`
- Materialized views for aggregations: The pipeline refreshes them on each update, with no hand-written refresh logic
- Auto Loader for file ingestion: The `cloudFiles` format discovers and ingests new files incrementally
- Separate dev and prod pipelines: Development mode reuses the cluster between updates and disables automatic retries
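Auto Loader's value is that each run processes only files it has not seen before, with the set of discovered files kept in the stream's checkpoint. The sketch below models that idea with a hypothetical `discover_new_files` helper, a local directory and an in-memory set. The real mechanism is more involved: directory listing or file notification mode, with RocksDB-backed state in the checkpoint.

```python
import tempfile
from pathlib import Path


def discover_new_files(directory: Path, seen: set, pattern: str = "*.json") -> list:
    """Return files not yet processed and record them as seen (conceptual Auto Loader model)."""
    new_files = sorted(p for p in directory.glob(pattern) if str(p) not in seen)
    seen.update(str(p) for p in new_files)  # the real state lives in the checkpoint
    return new_files


with tempfile.TemporaryDirectory() as tmp:
    landing = Path(tmp)
    seen: set = set()
    (landing / "orders_1.json").write_text('{"order_id": 1}')
    first = [p.name for p in discover_new_files(landing, seen)]   # only orders_1.json
    (landing / "orders_2.json").write_text('{"order_id": 2}')
    second = [p.name for p in discover_new_files(landing, seen)]  # only orders_2.json
    third = [p.name for p in discover_new_files(landing, seen)]   # nothing new
    print(first, second, third)
```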
Common Pitfalls
- expect_or_fail in production: Stops the entire pipeline for one bad record; use expect_or_drop
- No expectations at all: Bad data flows through silently to Gold tables
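- Streaming without checkpointing: DLT manages streaming-table checkpoints itself, so do not hand-manage them. A full refresh resets them, and any data the source no longer retains (such as expired Kafka messages) is lost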
- Over-complex single pipeline: 50 tables in one pipeline; break into domain pipelines
Anti-Patterns
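- Imperative Logic in DLT: Orchestrating steps with loops, conditionals, or side effects inside table functions. DLT is declarative: each table function should return a query definition. Generating several table definitions in a Python loop is a supported pattern.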
- Skipping Silver Layer: Raw data directly to Gold. Silver is where data quality is enforced.
- No Monitoring: Pipeline runs without alerting on data quality metric degradation.
- Monolith Pipeline: Every table in one pipeline. Domain-based pipelines are more maintainable.
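For the No Monitoring anti-pattern, the DLT event log's `flow_progress` events carry per-expectation counts (`name`, `dataset`, `passed_records`, `failed_records`). The sketch below is a minimal alert check over rows already extracted from the event log. The `failing_expectations` helper and the 99% pass-rate threshold are assumptions, and sending the alert is left out.

```python
from collections import defaultdict


def failing_expectations(expectation_rows, min_pass_rate=0.99):
    """Sum passed/failed counts per (dataset, expectation) and flag low pass rates."""
    totals = defaultdict(lambda: [0, 0])
    for r in expectation_rows:
        t = totals[(r["dataset"], r["name"])]
        t[0] += r["passed_records"]
        t[1] += r["failed_records"]
    alerts = []
    for (dataset, name), (passed, failed) in sorted(totals.items()):
        checked = passed + failed
        rate = passed / checked if checked else 1.0  # no records checked: nothing to flag
        if rate < min_pass_rate:
            alerts.append((dataset, name, round(rate, 4)))
    return alerts


# Rows shaped like the expectation entries in flow_progress events (invented counts)
rows = [
    {"dataset": "silver_orders", "name": "valid_amount", "passed_records": 990, "failed_records": 10},
    {"dataset": "silver_orders", "name": "valid_amount", "passed_records": 900, "failed_records": 100},
    {"dataset": "silver_orders", "name": "valid_order_id", "passed_records": 1000, "failed_records": 0},
]
print(failing_expectations(rows))
```

The check sums counts across updates before computing the rate. A single bad batch shows up as degradation, but it is diluted by earlier clean runs, so a per-update window may suit alerting better.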