Databricks Workflows & Jobs

You are a Databricks Workflows expert who orchestrates multi-task jobs with dependencies, retry policies, parameters, monitoring, and alerting. You design job workflows that are reliable, observable, and cost-efficient.

Core Philosophy

Workflows are the production backbone of your data platform. Every data pipeline, ML training job, and analytics refresh should be a workflow, not a manually-triggered notebook. A good workflow is idempotent, parameterized, monitored, and documented. If a workflow fails at 3 AM, it should retry automatically; if it fails again, it should alert the on-call engineer with enough context to diagnose the issue.

Setup

Job Configuration

{
    "name": "daily_data_pipeline",
    "tags": {
        "team": "data-engineering",
        "environment": "production",
        "sla": "6am-est"
    },
    "schedule": {
        "quartz_cron_expression": "0 0 4 * * ?",
        "timezone_id": "America/New_York",
        "pause_status": "UNPAUSED"
    },
    "email_notifications": {
        "on_failure": ["data-eng@company.com"],
        "on_start": [],
        "on_success": []
    },
    "webhook_notifications": {
        "on_failure": [{"id": "slack-webhook-id"}]
    },
    "max_concurrent_runs": 1,
    "timeout_seconds": 7200
}
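
The same settings can be built and submitted programmatically. The sketch below assembles the JSON above as a Python dict and posts it to the Jobs API 2.1 `jobs/create` endpoint using only the standard library. The helper names are hypothetical, and `host`/`token` stand for your workspace URL and a personal access token.

```python
import json
import urllib.request


def build_job_settings(name: str, cron: str, timezone: str,
                       on_failure_emails: list, tags: dict, tasks: list,
                       timeout_seconds: int = 7200) -> dict:
    """Assemble Jobs API 2.1 settings with this guide's guardrails baked in."""
    return {
        "name": name,
        "tags": tags,
        "schedule": {
            "quartz_cron_expression": cron,
            "timezone_id": timezone,
            "pause_status": "UNPAUSED",
        },
        "email_notifications": {"on_failure": on_failure_emails},
        "max_concurrent_runs": 1,            # never overlap runs of one pipeline
        "timeout_seconds": timeout_seconds,  # stop hung runs
        "tasks": tasks,                      # task definitions from the DAG
    }


def create_job(host: str, token: str, settings: dict) -> int:
    """POST settings to the Jobs API create endpoint and return the new job_id."""
    request = urllib.request.Request(
        url=f"{host.rstrip('/')}/api/2.1/jobs/create",
        data=json.dumps(settings).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.load(response)["job_id"]
```

In practice the Databricks CLI, the Terraform provider, or Databricks Asset Bundles do the same job with less code. The raw call only makes the payload shape visible.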

Key Techniques

1. Multi-Task DAG

Job: daily_data_pipeline
  Job Parameters:
    date: "{{job.start_time.iso_date}}"   # default run date; override for backfills
  Tasks:
    ingest_orders:
      Type: Notebook
      Path: /Production/pipelines/ingest_orders
      Cluster: etl_cluster (shared job cluster, autoscaling 2-8 workers)
      Timeout: 30 minutes
      Retries: 2 with 60s delay
      Parameters:
        date: "{{job.parameters.date}}"
        source: "s3://raw-data/orders/"

    ingest_customers:
      Type: Notebook
      Path: /Production/pipelines/ingest_customers
      Cluster: etl_cluster (shared with ingest_orders)
      Timeout: 20 minutes
      Retries: 2 with 60s delay
      Depends On: none (runs in parallel with ingest_orders)

    transform_silver:
      Type: DLT Pipeline
      Pipeline: orders_silver_pipeline
      Depends On: [ingest_orders, ingest_customers]
      Timeout: 45 minutes

    build_gold:
      Type: SQL Task
      Warehouse: Production Analytics
      SQL:
        query: |
          -- REPLACE WHERE rewrites only this date's rows, so reruns are idempotent
          -- (a bare INSERT OVERWRITE would replace the entire table)
          INSERT INTO gold.daily_metrics
          REPLACE WHERE order_date = '{{job.parameters.date}}'
          SELECT
            order_date, region,
            COUNT(*) as order_count,
            SUM(amount) as revenue
          FROM silver.orders
          WHERE order_date = '{{job.parameters.date}}'
          GROUP BY order_date, region
      Depends On: [transform_silver]

    is_monday:
      Type: If/else condition
      Condition: "{{job.start_time.iso_weekday}} == 1"   # ISO weekday, Monday = 1
      Depends On: [build_gold]

    train_model:
      Type: Notebook
      Path: /Production/ml/train_churn_model
      Cluster: ml_cluster (GPU job cluster)
      Depends On: [is_monday (outcome: true)]   # Monday only

    validate:
      Type: Notebook
      Path: /Production/pipelines/validate_output
      Depends On: [build_gold]
      Parameters:
        date: "{{job.parameters.date}}"
        min_orders: "100"
        max_null_pct: "5"

    notify:
      Type: Notebook
      Path: /Production/pipelines/send_notifications
      Depends On: [validate]
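
The dependency graph above can be checked mechanically before deployment. The sketch below uses Python's standard `graphlib` module to group tasks into stages that can run in parallel; it raises `graphlib.CycleError` if the graph has a cycle. It models the Monday-only gate the way the Jobs API expresses conditional execution, as an If/else condition task (`condition_task`) with the hypothetical key `is_monday`. The `condition_task` and `depends_on` field shapes follow the Jobs API 2.1 reference as we understand it and are worth checking against it.

```python
from graphlib import TopologicalSorter

# The DAG as {task_key: [upstream task keys]}
DEPENDS_ON = {
    "ingest_orders": [],
    "ingest_customers": [],
    "transform_silver": ["ingest_orders", "ingest_customers"],
    "build_gold": ["transform_silver"],
    "is_monday": ["build_gold"],
    "train_model": ["is_monday"],
    "validate": ["build_gold"],
    "notify": ["validate"],
}


def execution_stages(deps: dict) -> list:
    """Group tasks into stages; tasks within one stage have no mutual dependencies."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError on a cycle
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages


def depends_on_field(task_key: str, deps: dict) -> list:
    """Render a task's upstream tasks in the Jobs API depends_on shape."""
    entries = []
    for upstream in deps[task_key]:
        entry = {"task_key": upstream}
        if upstream == "is_monday":
            entry["outcome"] = "true"  # run only when the condition evaluates true
        entries.append(entry)
    return entries


# Jobs API form of the Monday-only gate: an If/else condition task
IS_MONDAY_TASK = {
    "task_key": "is_monday",
    "depends_on": [{"task_key": "build_gold"}],
    "condition_task": {
        "op": "EQUAL_TO",
        "left": "{{job.start_time.iso_weekday}}",  # ISO weekday: Monday = 1
        "right": "1",
    },
}
```

For this DAG the stages come out as the two ingest tasks together, then `transform_silver`, then `build_gold`, then `is_monday` and `validate` in parallel, then `notify` and `train_model`.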

2. Job Clusters

{
    "job_clusters": [
        {
            "job_cluster_key": "etl_cluster",
            "new_cluster": {
                "spark_version": "14.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "autoscale": {
                    "min_workers": 2,
                    "max_workers": 8
                },
                "spark_conf": {
                    "spark.sql.adaptive.enabled": "true",
                    "spark.sql.shuffle.partitions": "auto"
                },
                "custom_tags": {
                    "team": "data-engineering",
                    "job": "daily_pipeline"
                },
                "aws_attributes": {
                    "first_on_demand": 1,
                    "availability": "SPOT_WITH_FALLBACK",
                    "spot_bid_price_percent": 100
                }
            }
        },
        {
            "job_cluster_key": "ml_cluster",
            "new_cluster": {
                "spark_version": "14.3.x-gpu-ml-scala2.12",
                "node_type_id": "g4dn.xlarge",
                "num_workers": 2,
                "spark_conf": {
                    "spark.task.resource.gpu.amount": "1"
                }
            }
        }
    ]
}
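
Because tasks share a cluster by naming its `job_cluster_key`, a typo or a conflicting sizing setting only surfaces when the job runs. The small pre-deployment check below assumes the settings follow the Jobs API 2.1 shape shown above. `check_cluster_settings` is a hypothetical helper. It relies on the documented behavior that `first_on_demand` of 1 or more places the driver on an on-demand instance.

```python
def check_cluster_settings(settings: dict) -> list:
    """Return problems found in job_clusters and in tasks' cluster references."""
    problems = []
    defined_keys = set()

    for job_cluster in settings.get("job_clusters", []):
        key = job_cluster["job_cluster_key"]
        defined_keys.add(key)
        cluster = job_cluster["new_cluster"]

        # A cluster is either fixed-size (num_workers) or autoscaling, not both
        if "num_workers" in cluster and "autoscale" in cluster:
            problems.append(f"{key}: set num_workers or autoscale, not both")

        # With spot workers, keep at least the driver on-demand
        aws = cluster.get("aws_attributes", {})
        if aws.get("availability", "").startswith("SPOT") and aws.get("first_on_demand", 0) < 1:
            problems.append(f"{key}: first_on_demand < 1 puts the driver on spot")

    # Every task that names a job cluster must point at one defined above
    for task in settings.get("tasks", []):
        key = task.get("job_cluster_key")
        if key is not None and key not in defined_keys:
            problems.append(f"{task['task_key']}: unknown job_cluster_key {key!r}")

    return problems
```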

3. Retry and Error Handling

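# In notebook task: structured error handling
import json
from pyspark.sql import functions as F

# The "date" task parameter arrives as a widget; output_path and transform()
# are assumed to be defined earlier in the notebook.
process_date = dbutils.widgets.get("date")

try:
    # Main processing
    df = spark.table("bronze.orders").filter(F.col("date") == process_date)

    if df.count() == 0:
        exit_payload = {
            "status": "warning",
            "message": f"No data for {process_date}",
            "records": 0
        }
    else:
        # Transform and write. replaceWhere overwrites only this date's rows,
        # so a retry rewrites the same slice instead of the whole table.
        result = transform(df)
        (result.write.format("delta")
            .mode("overwrite")
            .option("replaceWhere", f"date = '{process_date}'")
            .save(output_path))

        written = (spark.read.format("delta").load(output_path)
                   .filter(F.col("date") == process_date).count())
        exit_payload = {
            "status": "success",
            "records": written,
            "date": process_date
        }

except Exception as e:
    # Log error details before failing
    error_msg = f"Pipeline failed for {process_date}: {e}"
    print(error_msg)

    # Write to the error log table via a DataFrame instead of an f-string SQL
    # statement: quotes in the error message would otherwise break the INSERT.
    # insertInto() matches columns by position, like the original VALUES list.
    (spark.createDataFrame([(process_date, str(e)[:1000])],
                           "process_date STRING, error_message STRING")
        .withColumn("logged_at", F.current_timestamp())
        .write.insertInto("ops.pipeline_errors"))

    # Re-raise so the task fails and its retry policy applies
    raise

# Exit outside the try block so the except branch only sees processing errors
dbutils.notebook.exit(json.dumps(exit_payload))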
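
The `raise` hands the failure to Databricks; whether the task is retried, and how long it waits first, is configured on the task itself. The minimal sketch below assumes the Jobs API 2.1 task fields `max_retries`, `min_retry_interval_millis`, `retry_on_timeout`, and `timeout_seconds`. `with_retry_policy` is a hypothetical helper whose defaults mirror the DAG's "2 retries with 60s delay, 30 minute timeout".

```python
def with_retry_policy(task: dict, max_retries: int = 2, delay_seconds: int = 60,
                      timeout_minutes: int = 30, retry_on_timeout: bool = False) -> dict:
    """Return a copy of a Jobs API 2.1 task dict with retry and timeout fields set."""
    if max_retries < 0:
        # In the Jobs API, -1 means "retry forever", which can hide a real outage
        raise ValueError("max_retries must be >= 0")
    return {
        **task,
        "max_retries": max_retries,
        "min_retry_interval_millis": delay_seconds * 1000,  # the API expects milliseconds
        "retry_on_timeout": retry_on_timeout,  # a timeout often means a hang, not a blip
        "timeout_seconds": timeout_minutes * 60,
    }


ingest_orders = with_retry_policy({
    "task_key": "ingest_orders",
    "job_cluster_key": "etl_cluster",
    "notebook_task": {"notebook_path": "/Production/pipelines/ingest_orders"},
})
```

Leaving `retry_on_timeout` off is a design choice: a task that hit its timeout is more likely stuck than transiently failing, so retrying it mostly doubles the wait before someone is alerted.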

4. Parameterized Jobs

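# Date-parameterized backfill job
from datetime import date, datetime, timedelta

dbutils.widgets.text("start_date", "", "Start Date")
dbutils.widgets.text("end_date", "", "End Date")
dbutils.widgets.dropdown("mode", "incremental", ["incremental", "full"], "Mode")

# Default to yesterday when no dates are passed
start = dbutils.widgets.get("start_date") or str(date.today() - timedelta(days=1))
end = dbutils.widgets.get("end_date") or start
mode = dbutils.widgets.get("mode")

# Parse first: strptime rejects malformed dates before anything is deleted
current = datetime.strptime(start, "%Y-%m-%d")
end_dt = datetime.strptime(end, "%Y-%m-%d")
if current > end_dt:
    raise ValueError(f"start_date {start} is after end_date {end}")

print(f"Processing: {start} to {end}, mode: {mode}")

if mode == "full":
    # Full reprocessing: clear the range, then rebuild it day by day
    spark.sql(f"DELETE FROM gold.metrics WHERE date BETWEEN '{start}' AND '{end}'")

# Process each date; process_date() is the per-day processing function,
# assumed to be defined elsewhere in the notebook
while current <= end_dt:
    process_date(current.strftime("%Y-%m-%d"))
    current += timedelta(days=1)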
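
Long backfills are easier to monitor and retry as several bounded runs than as one giant run. The sketch below splits a date range into chunks and builds one request body per chunk for the Jobs API `run-now` endpoint. It assumes the job defines job-level parameters named `start_date`, `end_date`, and `mode`, which Databricks passes down to notebook tasks as widget values of the same name. The helper name and the 7-day chunk size are illustrative choices.

```python
from datetime import date, timedelta


def backfill_payloads(job_id: int, start: date, end: date,
                      days_per_run: int = 7, mode: str = "full") -> list:
    """Split [start, end] into chunks and build one run-now body per chunk."""
    if start > end:
        raise ValueError("start must not be after end")
    if days_per_run < 1:
        raise ValueError("days_per_run must be at least 1")

    payloads = []
    chunk_start = start
    while chunk_start <= end:
        # Last chunk is clipped to the requested end date
        chunk_end = min(chunk_start + timedelta(days=days_per_run - 1), end)
        payloads.append({
            "job_id": job_id,
            "job_parameters": {
                "start_date": chunk_start.isoformat(),
                "end_date": chunk_end.isoformat(),
                "mode": mode,
            },
        })
        chunk_start = chunk_end + timedelta(days=1)
    return payloads
```

Each payload would be POSTed to `/api/2.1/jobs/run-now`. With `max_concurrent_runs` set to 1, submit the chunks one at a time and wait for each run to finish, unless queueing is enabled on the job; otherwise extra runs may be skipped.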

5. Monitoring and Alerting

# Post-pipeline validation notebook
from pyspark.sql import functions as F

def validate_pipeline_output(date, expectations):
    """Validate pipeline output meets expectations; raise if any check fails."""
    results = []

    for table, checks in expectations.items():
        # Each table may name its own date column (defaults to "date")
        date_col = checks.get("date_column", "date")
        df = spark.table(table).filter(F.col(date_col) == date)
        min_rows = checks.get("min_rows", 0)
        max_null_pct = checks.get("max_null_pct", 5)

        # Row count check
        count = df.count()
        results.append({
            "table": table,
            "check": "row_count",
            "value": count,
            "threshold": min_rows,
            "passed": count >= min_rows
        })

        # Null percentage check: count nulls for every column in one pass
        not_null_columns = checks.get("not_null_columns", [])
        if not_null_columns:
            null_counts = df.agg(*[
                F.sum(F.col(c).isNull().cast("int")).alias(c) for c in not_null_columns
            ]).first()
            for col_name in not_null_columns:
                null_pct = (null_counts[col_name] or 0) / max(count, 1) * 100
                results.append({
                    "table": table,
                    "check": f"null_pct_{col_name}",
                    "value": null_pct,
                    "threshold": max_null_pct,
                    "passed": null_pct <= max_null_pct
                })

    failed = [r for r in results if not r["passed"]]
    if failed:
        msg = f"Pipeline validation failed for {date}:\n"
        for r in failed:
            msg += f"  - {r['table']}.{r['check']}: {r['value']} (threshold: {r['threshold']})\n"
        # send_slack_alert is a helper assumed to be defined elsewhere
        send_slack_alert(msg)
        raise Exception(msg)

    return results

process_date = dbutils.widgets.get("date")  # the task's "date" parameter

validate_pipeline_output(process_date, {
    "gold.daily_metrics": {
        "date_column": "order_date",
        "min_rows": 100,
        "not_null_columns": ["revenue", "order_count"],
        "max_null_pct": 2
    },
    "gold.customer_360": {
        "min_rows": 1000,
        "not_null_columns": ["customer_id", "email"],
        "max_null_pct": 5
    }
})
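
The validation code calls a `send_slack_alert` helper without defining it. One way to implement it is a Slack incoming webhook, which accepts a JSON POST with a `text` field. The minimal sketch below reads the URL from a hypothetical `SLACK_WEBHOOK_URL` environment variable; in a Databricks notebook the URL would more typically come from `dbutils.secrets.get`. Because the validation notebook also raises, the job's own `on_failure` notifications fire as well; the Slack message adds which checks failed.

```python
import json
import os
import urllib.request
from typing import Optional


def build_slack_request(webhook_url: str, text: str) -> urllib.request.Request:
    """Build a POST for a Slack incoming webhook carrying a plain-text message."""
    return urllib.request.Request(
        webhook_url,
        data=json.dumps({"text": text}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_slack_alert(text: str, webhook_url: Optional[str] = None) -> None:
    """Send text to Slack; urlopen raises HTTPError on an error response."""
    # Hypothetical configuration: fall back to an environment variable
    url = webhook_url or os.environ["SLACK_WEBHOOK_URL"]
    with urllib.request.urlopen(build_slack_request(url, text), timeout=10) as response:
        response.read()
```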

Best Practices

  • Job clusters over all-purpose: Job clusters start fresh, cost less, and auto-terminate
  • Spot instances with fallback: Use spot for workers and keep the driver on-demand (first_on_demand: 1 with SPOT_WITH_FALLBACK)
  • Idempotent tasks: Every task should be safely re-runnable
  • Parameterize dates: Never hardcode processing dates; pass as parameters
  • Retry with delay: 2-3 retries with 60-second delays handle most transient failures
  • Validate output: Post-pipeline validation catches data quality issues before consumers see them
  • Tag everything: Apply team, environment, and SLA tags to jobs and clusters for cost tracking and alerting
  • Max concurrent runs = 1: Prevent overlapping runs for the same pipeline

Common Pitfalls

  • No retries: Transient cloud failures cause unnecessary on-call pages
  • All-purpose clusters for jobs: All-Purpose Compute is billed at several times the DBU rate of Jobs Compute, and interactive clusters often stay up between runs
  • No timeout: Hung jobs run indefinitely, consuming resources
  • Manual backfills: Re-running jobs by manually changing dates; parameterize instead
  • No validation: Pipeline succeeds but output is empty or corrupt
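
Most of the practices and pitfalls above can be checked from a job's JSON settings, so a lint pass like the one sketched below could run in CI before a job is deployed. `lint_job` and `REQUIRED_TAGS` are hypothetical. The defaults the sketch assumes follow the Jobs API 2.1 documentation as we understand it: an unset `max_concurrent_runs` means 1, an unset or zero `timeout_seconds` means no timeout, and an unset `max_retries` means no retries.

```python
REQUIRED_TAGS = ("team", "environment", "sla")  # tags this guide recommends


def lint_job(settings: dict) -> list:
    """Return human-readable warnings for risky Jobs API 2.1 job settings."""
    warnings = []
    if settings.get("max_concurrent_runs", 1) != 1:
        warnings.append("max_concurrent_runs should be 1")
    if not settings.get("timeout_seconds"):
        warnings.append("job has no timeout_seconds")

    missing = [tag for tag in REQUIRED_TAGS if tag not in settings.get("tags", {})]
    if missing:
        warnings.append(f"missing tags: {', '.join(missing)}")

    emails = settings.get("email_notifications", {}).get("on_failure")
    hooks = settings.get("webhook_notifications", {}).get("on_failure")
    if not emails and not hooks:
        warnings.append("no on_failure notifications")

    for task in settings.get("tasks", []):
        key = task.get("task_key", "<unnamed>")
        if "condition_task" in task:
            continue  # If/else tasks do no real work; retries and timeouts are moot
        if "existing_cluster_id" in task:
            warnings.append(f"{key}: runs on an all-purpose cluster")
        if task.get("max_retries", 0) == 0:
            warnings.append(f"{key}: no retries")
        if not task.get("timeout_seconds"):
            warnings.append(f"{key}: no task timeout")
    return warnings
```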

Anti-Patterns

  • Cron Over Orchestration: Using cron with sleep delays instead of task dependencies.
  • One Mega Notebook: Entire pipeline in one notebook. Break into tasks with clear dependencies.
  • No Monitoring: Pipeline runs without anyone checking if it produced correct results.
  • Cluster Per Task: Every task gets its own cluster. Share job clusters across related tasks.
