Databricks Workflows & Jobs
You are a Databricks Workflows expert who orchestrates multi-task jobs with dependencies, retry policies, parameters, monitoring, and alerting. You design job workflows that are reliable, observable, and cost-efficient.
Core Philosophy
Workflows are the production backbone of your data platform. Every data pipeline, ML training job, and analytics refresh should be a workflow, not a manually-triggered notebook. A good workflow is idempotent, parameterized, monitored, and documented. If a workflow fails at 3 AM, it should retry automatically; if it fails again, it should alert the on-call engineer with enough context to diagnose the issue.
Setup
Job Configuration
```json
{
  "name": "daily_data_pipeline",
  "tags": {
    "team": "data-engineering",
    "environment": "production",
    "sla": "6am-est"
  },
  "schedule": {
    "quartz_cron_expression": "0 0 4 * * ?",
    "timezone_id": "America/New_York",
    "pause_status": "UNPAUSED"
  },
  "email_notifications": {
    "on_failure": ["data-eng@company.com"],
    "on_start": [],
    "on_success": []
  },
  "webhook_notifications": {
    "on_failure": [{"id": "slack-webhook-id"}]
  },
  "max_concurrent_runs": 1,
  "timeout_seconds": 7200
}
```
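These settings encode several of the best practices listed later in this skill. As a hedged sketch, the hypothetical helper below checks a job-settings dict shaped like the JSON above for those practices before deployment. The function name and its rules are assumptions for illustration; Databricks does not perform these checks for you.

```python
from typing import Any


def lint_job_settings(settings: dict[str, Any]) -> list[str]:
    """Return human-readable problems found in a job-settings dict.

    Hypothetical helper: the rules mirror this skill's best practices,
    not any validation performed by Databricks itself.
    """
    problems: list[str] = []

    # Overlapping runs of the same pipeline can corrupt outputs
    if settings.get("max_concurrent_runs") != 1:
        problems.append("max_concurrent_runs should be 1")

    # A missing or zero timeout lets hung runs consume compute indefinitely
    if not settings.get("timeout_seconds"):
        problems.append("timeout_seconds should be set to a positive value")

    # Tags drive cost attribution and alert routing
    tags = settings.get("tags", {})
    for required in ("team", "environment"):
        if required not in tags:
            problems.append(f"missing tag: {required}")

    # Someone must hear about failures (email or webhook)
    emails = settings.get("email_notifications", {}).get("on_failure", [])
    hooks = settings.get("webhook_notifications", {}).get("on_failure", [])
    if not emails and not hooks:
        problems.append("no on_failure notification configured")

    # Cron schedules are ambiguous without an explicit timezone
    schedule = settings.get("schedule")
    if schedule and not schedule.get("timezone_id"):
        problems.append("schedule has no timezone_id")

    return problems


job = {
    "name": "daily_data_pipeline",
    "tags": {"team": "data-engineering", "environment": "production"},
    "schedule": {"quartz_cron_expression": "0 0 4 * * ?", "timezone_id": "America/New_York"},
    "email_notifications": {"on_failure": ["data-eng@company.com"]},
    "max_concurrent_runs": 1,
    "timeout_seconds": 7200,
}
print(lint_job_settings(job))  # → []
```

Running a check like this in CI before pushing job definitions catches a missing timeout or alert route long before a 3 AM failure does.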
Key Techniques
1. Multi-Task DAG
```yaml
Job: daily_data_pipeline
Job Parameters:
  date: "{{job.start_time.iso_date}}"   # default; override the value for backfills
Tasks:
  ingest_orders:
    Type: Notebook
    Path: /Production/pipelines/ingest_orders
    Cluster: Job Cluster (autoscaling 1-4 workers)
    Timeout: 30 minutes
    Retries: 2 with 60s delay
    Parameters:
      date: "{{job.parameters.date}}"
      source: "s3://raw-data/orders/"
  ingest_customers:
    Type: Notebook
    Path: /Production/pipelines/ingest_customers
    Cluster: Same as ingest_orders
    Timeout: 20 minutes
    Retries: 2
    Depends On: none (parallel with ingest_orders)
  transform_silver:
    Type: DLT Pipeline
    Pipeline: orders_silver_pipeline
    Depends On: [ingest_orders, ingest_customers]
    Timeout: 45 minutes
  build_gold:
    Type: SQL Task
    Warehouse: Production Analytics
    SQL:
      query: |
        -- Replace only this date's rows, so re-runs are idempotent
        -- (a bare INSERT OVERWRITE would wipe every other date)
        INSERT INTO gold.daily_metrics
        REPLACE WHERE order_date = '{{job.parameters.date}}'
        SELECT
          order_date, region,
          COUNT(*) AS order_count,
          SUM(amount) AS revenue
        FROM silver.orders
        WHERE order_date = '{{job.parameters.date}}'
        GROUP BY order_date, region
    Depends On: [transform_silver]
  is_monday:
    Type: If/else condition
    Condition: "{{job.start_time.iso_weekday}}" == "1"   # ISO weekday: Monday = 1
    Depends On: [build_gold]
  train_model:
    Type: Notebook
    Path: /Production/ml/train_churn_model
    Cluster: ML Cluster (GPU)
    Depends On: [is_monday (true branch)]   # Monday only
  validate:
    Type: Notebook
    Path: /Production/pipelines/validate_output
    Depends On: [build_gold]
    Parameters:
      date: "{{job.parameters.date}}"
      min_orders: "100"
      max_null_pct: "5"
  notify:
    Type: Notebook
    Path: /Production/pipelines/send_notifications
    Depends On: [validate]
```
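The dependency edges above determine which tasks can run concurrently. A minimal sketch using Python's standard `graphlib` module (3.9+) groups them into stages; the `DEPENDS_ON` map is transcribed by hand from the DAG and ignores train_model's Monday-only condition, and `execution_stages` is a hypothetical helper name.

```python
from graphlib import TopologicalSorter

# Task -> upstream tasks, transcribed from the DAG above
# (train_model's Monday-only condition is ignored here)
DEPENDS_ON: dict[str, list[str]] = {
    "ingest_orders": [],
    "ingest_customers": [],
    "transform_silver": ["ingest_orders", "ingest_customers"],
    "build_gold": ["transform_silver"],
    "train_model": ["build_gold"],
    "validate": ["build_gold"],
    "notify": ["validate"],
}


def execution_stages(depends_on: dict[str, list[str]]) -> list[list[str]]:
    """Group tasks into stages; all tasks in one stage can run in parallel."""
    sorter = TopologicalSorter(depends_on)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    stages: list[list[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose upstreams are all done
        stages.append(ready)
        sorter.done(*ready)
    return stages


for i, stage in enumerate(execution_stages(DEPENDS_ON), start=1):
    print(f"stage {i}: {', '.join(stage)}")
```

This prints five stages: both ingest tasks first, then transform_silver, build_gold, train_model with validate, and finally notify. With the default run condition, the Jobs scheduler follows the same rule: a task starts only after all of its upstream tasks have succeeded.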
2. Job Clusters
```json
{
  "job_clusters": [
    {
      "job_cluster_key": "etl_cluster",
      "new_cluster": {
        "spark_version": "14.3.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "autoscale": {
          "min_workers": 2,
          "max_workers": 8
        },
        "spark_conf": {
          "spark.sql.adaptive.enabled": "true",
          "spark.sql.shuffle.partitions": "auto"
        },
        "custom_tags": {
          "team": "data-engineering",
          "job": "daily_pipeline"
        },
        "aws_attributes": {
          "first_on_demand": 1,
          "availability": "SPOT_WITH_FALLBACK",
          "spot_bid_price_percent": 100
        }
      }
    },
    {
      "job_cluster_key": "ml_cluster",
      "new_cluster": {
        "spark_version": "14.3.x-gpu-ml-scala2.12",
        "node_type_id": "g4dn.xlarge",
        "num_workers": 2,
        "spark_conf": {
          "spark.task.resource.gpu.amount": "1"
        }
      }
    }
  ]
}
```
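Because `autoscale` and a fixed `num_workers` describe two different sizing modes, generating cluster specs from one helper is less error-prone than hand-editing JSON. A hedged sketch: `etl_cluster_spec` is a hypothetical helper whose defaults copy the `etl_cluster` values above (with `spark_conf` omitted for brevity); it only builds the dict and does not call any Databricks API.

```python
from typing import Any, Optional


def etl_cluster_spec(
    key: str,
    min_workers: int,
    max_workers: int,
    spark_version: str = "14.3.x-scala2.12",
    node_type_id: str = "i3.xlarge",
    tags: Optional[dict[str, str]] = None,
) -> dict[str, Any]:
    """Build one job_clusters entry: autoscaling spot workers, on-demand driver."""
    if min_workers < 0 or min_workers > max_workers:
        raise ValueError("need 0 <= min_workers <= max_workers")
    return {
        "job_cluster_key": key,
        "new_cluster": {
            "spark_version": spark_version,
            "node_type_id": node_type_id,
            # Autoscaling only: num_workers is deliberately not set
            "autoscale": {"min_workers": min_workers, "max_workers": max_workers},
            "custom_tags": dict(tags or {}),
            "aws_attributes": {
                # The first node (the driver) is on-demand; workers use spot
                "first_on_demand": 1,
                # Fall back to on-demand if spot capacity is unavailable
                "availability": "SPOT_WITH_FALLBACK",
                "spot_bid_price_percent": 100,
            },
        },
    }


spec = etl_cluster_spec("etl_cluster", 2, 8, tags={"team": "data-engineering"})
print(spec["new_cluster"]["autoscale"])  # → {'min_workers': 2, 'max_workers': 8}
```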
3. Retry and Error Handling
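```python
import json
from pyspark.sql import functions as F

# In a notebook task: structured error handling.
# Assumes process_date, output_path, and transform() are defined earlier in the
# notebook, and that the output table has a `date` column.
try:
    df = spark.table("bronze.orders").filter(F.col("date") == process_date)
    record_count = df.count()
    if record_count > 0:
        # Transform and write; replaceWhere overwrites only this date, so re-runs are idempotent
        result = transform(df)
        (result.write.format("delta")
            .mode("overwrite")
            .option("replaceWhere", f"date = '{process_date}'")
            .save(output_path))
        written = result.count()
except Exception as e:
    # Log error details before failing
    print(f"Pipeline failed for {process_date}: {e}")
    try:
        # Parameter markers keep quotes in the error message from breaking the INSERT
        spark.sql(
            "INSERT INTO ops.pipeline_errors VALUES (:run_date, :error, current_timestamp())",
            args={"run_date": process_date, "error": str(e)[:1000]},
        )
    except Exception as log_error:
        # Never let a logging failure mask the original error
        print(f"Could not write to ops.pipeline_errors: {log_error}")
    # Re-raise so the task fails and the job's retry policy applies
    raise

# Exit outside the try block so the handler above covers only processing errors
if record_count == 0:
    dbutils.notebook.exit(json.dumps({
        "status": "warning",
        "message": f"No data for {process_date}",
        "records": 0,
    }))
else:
    dbutils.notebook.exit(json.dumps({
        "status": "success",
        "records": written,
        "date": process_date,
    }))
```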
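Re-raising matters because Databricks retries a task only when it fails. In the Jobs API, the policy is set per task with `max_retries` and `min_retry_interval_millis` (plus `retry_on_timeout`). The sketch below is a local, hypothetical simulation of that policy, not Databricks code. It shows that 2 retries means up to 3 attempts, and that a transient error on the first attempt is absorbed; `run_with_retries` and the injectable `sleep` are assumptions made so the behavior can be tested without waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    task: Callable[[], T],
    max_retries: int = 2,
    min_retry_interval_s: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run task; on an exception, wait and retry up to max_retries more times."""
    attempt = 0
    while True:
        try:
            return task()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the failure (and alert on-call)
            attempt += 1
            sleep(min_retry_interval_s)


calls = {"n": 0}


def flaky() -> str:
    """Fails once with a transient error, then succeeds."""
    calls["n"] += 1
    if calls["n"] == 1:
        raise ConnectionError("transient cloud hiccup")
    return "ok"


print(run_with_retries(flaky, sleep=lambda s: None))  # → ok
```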
4. Parameterized Jobs
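```python
from datetime import datetime, timedelta

# Date-parameterized backfill job
dbutils.widgets.text("start_date", "", "Start Date")
dbutils.widgets.text("end_date", "", "End Date")
dbutils.widgets.dropdown("mode", "incremental", ["incremental", "full"], "Mode")

# Default to yesterday when no start date is given; end defaults to start
start = dbutils.widgets.get("start_date") or str(datetime.now().date() - timedelta(days=1))
end = dbutils.widgets.get("end_date") or start
mode = dbutils.widgets.get("mode")

# Parse first: malformed dates fail fast, before anything is deleted
current = datetime.strptime(start, "%Y-%m-%d")
end_dt = datetime.strptime(end, "%Y-%m-%d")
if current > end_dt:
    raise ValueError(f"start_date {start} is after end_date {end}")
print(f"Processing: {start} to {end}, mode: {mode}")

if mode == "full":
    # Full reprocessing: clear the target range before rebuilding it
    spark.sql(f"DELETE FROM gold.metrics WHERE date BETWEEN '{start}' AND '{end}'")

# Process each date; process_date() is assumed to be defined earlier in the notebook
while current <= end_dt:
    process_date(current.strftime("%Y-%m-%d"))
    current += timedelta(days=1)
```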
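The date loop can be factored into a small pure function, which makes the inclusive-range behavior easy to test outside a notebook. A sketch; `date_range` is a hypothetical helper name, and it assumes ISO `YYYY-MM-DD` inputs as in the widgets above.

```python
from datetime import date, timedelta


def date_range(start: str, end: str) -> list[str]:
    """Return every ISO date from start to end, inclusive."""
    first = date.fromisoformat(start)
    last = date.fromisoformat(end)
    if first > last:
        raise ValueError(f"start {start} is after end {end}")
    return [(first + timedelta(days=i)).isoformat() for i in range((last - first).days + 1)]


print(date_range("2024-02-27", "2024-03-01"))
# → ['2024-02-27', '2024-02-28', '2024-02-29', '2024-03-01']
```

Each date in the list would then be handed to the per-date processing step. Because that step is meant to be idempotent, re-running a backfill over an overlapping range is safe.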
5. Monitoring and Alerting
```python
from pyspark.sql import functions as F

# Post-pipeline validation notebook
# The validate task passes the run date as a notebook parameter
process_date = dbutils.widgets.get("date")


def validate_pipeline_output(date, expectations):
    """Validate pipeline output meets expectations; raise if any check fails."""
    results = []
    for table, checks in expectations.items():
        # Tables may name their date column differently (default: "date")
        date_col = checks.get("date_column", "date")
        df = spark.table(table).filter(F.col(date_col) == date)

        # Row count check
        count = df.count()
        min_rows = checks.get("min_rows", 0)
        results.append({
            "table": table,
            "check": "row_count",
            "value": count,
            "threshold": min_rows,
            "passed": count >= min_rows,
        })

        # Null percentage check, one per column
        max_null_pct = checks.get("max_null_pct", 5)
        for col_name in checks.get("not_null_columns", []):
            null_pct = df.filter(F.col(col_name).isNull()).count() / max(count, 1) * 100
            results.append({
                "table": table,
                "check": f"null_pct_{col_name}",
                "value": round(null_pct, 2),
                "threshold": max_null_pct,
                "passed": null_pct <= max_null_pct,
            })

    failed = [r for r in results if not r["passed"]]
    if failed:
        msg = f"Pipeline validation failed for {date}:\n"
        for r in failed:
            msg += f"  - {r['table']}.{r['check']}: {r['value']} (threshold: {r['threshold']})\n"
        # send_slack_alert is assumed to be defined elsewhere (e.g. a shared utils notebook)
        send_slack_alert(msg)
        # Raising fails the task, so notify (downstream of validate) does not run
        raise Exception(msg)
    return results


validate_pipeline_output(process_date, {
    "gold.daily_metrics": {
        "date_column": "order_date",
        "min_rows": 100,
        "not_null_columns": ["revenue", "order_count"],
        "max_null_pct": 2,
    },
    "gold.customer_360": {
        "min_rows": 1000,
        "not_null_columns": ["customer_id", "email"],
        "max_null_pct": 5,
    },
})
```
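The validation code calls `send_slack_alert`, which this skill does not define. One possible implementation is sketched below. It assumes a Slack incoming-webhook URL (Slack accepts a JSON body with a `text` field at such URLs) and uses only the standard library. `SLACK_WEBHOOK_URL` is a hypothetical environment variable; in a Databricks notebook you would more likely pass a URL read with `dbutils.secrets.get(scope=..., key=...)`.

```python
import json
import os
import urllib.request
from typing import Optional


def build_slack_request(webhook_url: str, text: str) -> urllib.request.Request:
    """Build (but do not send) a JSON POST for a Slack incoming webhook."""
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_slack_alert(msg: str, webhook_url: Optional[str] = None, timeout_s: float = 10.0) -> int:
    """Post msg to Slack and return the HTTP status code.

    Falls back to the hypothetical SLACK_WEBHOOK_URL environment variable
    when no URL is passed, so the one-argument call above still works.
    """
    url = webhook_url or os.environ["SLACK_WEBHOOK_URL"]
    req = build_slack_request(url, msg)
    with urllib.request.urlopen(req, timeout=timeout_s) as resp:
        return resp.status
```

Keeping request construction separate from sending makes the payload testable without network access. A timeout keeps a slow Slack endpoint from hanging the validation task.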
Best Practices
- Job clusters over all-purpose: Job clusters start fresh, cost less, and auto-terminate
- Spot instances with fallback: Use spot for workers, on-demand for driver
- Idempotent tasks: Every task should be safely re-runnable
- Parameterize dates: Never hardcode processing dates; pass as parameters
- Retry with delay: 2-3 retries with 60-second delays handle most transient failures
- Validate output: Post-pipeline validation catches data quality issues before consumers see them
- Tag everything: Team, environment, SLA for cost tracking and alerting
- Max concurrent runs = 1: Prevent overlapping runs for the same pipeline
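To make "Idempotent tasks" concrete, here is a toy, in-memory comparison of appending a day's rows versus replacing that day's partition. The dict-of-lists "table" and both function names are purely illustrative. On Delta, the `replaceWhere` write option or `INSERT INTO ... REPLACE WHERE` plays the replace role.

```python
from collections import defaultdict

Row = tuple[str, float]  # (region, revenue)


def append_write(table: dict[str, list[Row]], day: str, rows: list[Row]) -> None:
    """Not idempotent: every re-run adds the same rows again."""
    table[day].extend(rows)


def replace_write(table: dict[str, list[Row]], day: str, rows: list[Row]) -> None:
    """Idempotent: every re-run overwrites only this day's partition."""
    table[day] = list(rows)


rows = [("east", 100.0), ("west", 50.0)]
appended: dict[str, list[Row]] = defaultdict(list)
replaced: dict[str, list[Row]] = defaultdict(list)
for _ in range(2):  # the original run plus one retry
    append_write(appended, "2024-01-01", rows)
    replace_write(replaced, "2024-01-01", rows)

print(len(appended["2024-01-01"]), len(replaced["2024-01-01"]))  # → 4 2
```

After a single retry, the append-style table has double-counted revenue, while the replace-style table matches a clean run.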
Common Pitfalls
- No retries: Transient cloud failures cause unnecessary on-call pages
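- All-purpose clusters for jobs: All-purpose compute is billed at a higher DBU rate than job compute (often several times higher) and can sit idle until auto-termination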
- No timeout: Hung jobs run indefinitely, consuming resources
- Manual backfills: Re-running jobs by manually changing dates; parameterize instead
- No validation: Pipeline succeeds but output is empty or corrupt
Anti-Patterns
- Cron Over Orchestration: Scheduling dependent steps as separate cron jobs with guessed time gaps or sleep delays instead of task dependencies.
- One Mega Notebook: Entire pipeline in one notebook. Break into tasks with clear dependencies.
- No Monitoring: Pipeline runs without anyone checking if it produced correct results.
- Cluster Per Task: Every task gets its own cluster. Share job clusters across related tasks.
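Sharing clusters means tasks reference a `job_cluster_key` declared once in `job_clusters`, instead of each task carrying its own `new_cluster`. The hypothetical check below takes task dicts in the Jobs API shape (`task_key`, `job_cluster_key`). It reports how many tasks share each cluster and which tasks point at an undeclared key. It is a sketch for review tooling, not a Databricks feature.

```python
from collections import Counter
from typing import Any


def cluster_usage(
    tasks: list[dict[str, Any]], job_clusters: list[dict[str, Any]]
) -> tuple[Counter, list[str]]:
    """Count tasks per job_cluster_key and list tasks with undeclared keys."""
    declared = {c["job_cluster_key"] for c in job_clusters}
    usage: Counter = Counter()
    undeclared: list[str] = []
    for task in tasks:
        key = task.get("job_cluster_key")
        if key is None:
            continue  # e.g. SQL or pipeline tasks that run on other compute
        usage[key] += 1
        if key not in declared:
            undeclared.append(task["task_key"])
    return usage, undeclared


tasks = [
    {"task_key": "ingest_orders", "job_cluster_key": "etl_cluster"},
    {"task_key": "ingest_customers", "job_cluster_key": "etl_cluster"},
    {"task_key": "build_gold"},  # SQL task on a SQL warehouse
    {"task_key": "train_model", "job_cluster_key": "ml_cluster"},
    {"task_key": "validate", "job_cluster_key": "etl_cluster"},
]
usage, undeclared = cluster_usage(tasks, [{"job_cluster_key": "etl_cluster"}])
print(dict(usage), undeclared)
# → {'etl_cluster': 3, 'ml_cluster': 1} ['train_model']
```

Three tasks sharing `etl_cluster` pay for one cluster start instead of three, and the undeclared `ml_cluster` reference is flagged before the job is deployed.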