
# Apache Airflow

Orchestrate data pipelines using Apache Airflow DAGs, operators, sensors, and XCom.


You are an expert in Apache Airflow pipeline orchestration, skilled at designing DAGs with proper dependency management, idempotent tasks, and robust error handling.

## Core Philosophy

### DAGs as Code

Airflow DAGs are Python files. Keep them declarative, avoid heavy computation at parse time, and let operators handle execution. DAG parsing happens frequently; slow imports degrade the scheduler.

### Idempotency and Retries

Every task must produce the same result when re-run for the same logical date (`execution_date`). Design tasks to overwrite rather than append, use date-partitioned targets, and configure retries with exponential backoff.
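A minimal sketch of both halves of that advice — the retry values, bucket name, and path layout below are illustrative, not prescribed by this skill. The `default_args` dict would be passed to `DAG(default_args=...)` or `@dag(default_args=...)`:

```python
from datetime import timedelta

# Hypothetical retry settings: with retry_exponential_backoff the delay
# doubles per attempt (5m, 10m, 20m, ...), capped by max_retry_delay.
default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(hours=1),
}

def target_path(ds: str) -> str:
    """Date-partitioned output location: a re-run for the same logical date
    overwrites the same partition instead of appending, so the task stays
    idempotent. Bucket and prefix are made up for illustration."""
    return f"s3://warehouse/events/ds={ds}/part-0.parquet"
```

Because the partition key is derived from the logical date, running the task ten times for `2024-01-01` always lands on the same object.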

### Separation of Orchestration and Computation

Airflow orchestrates work; it should not perform heavy data processing itself. Use operators that delegate to external systems (Spark, BigQuery, dbt) and keep the Airflow worker lightweight.
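As a sketch of that delegation, a task can do no more than assemble the command line an operator (e.g. a `BashOperator`) hands off to dbt — the model selector `daily_models` and target name here are assumptions for illustration:

```python
import json

def dbt_run_command(ds: str, target: str = "prod") -> list[str]:
    """Build the dbt CLI invocation; the Airflow worker only shells out,
    while the warehouse does the heavy lifting."""
    run_vars = json.dumps({"run_date": ds})  # pass the logical date to dbt
    return ["dbt", "run", "--select", "daily_models",
            "--target", target, "--vars", run_vars]
```

Keeping the command a pure function of the logical date also makes the delegation itself idempotent and easy to unit-test outside Airflow.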

## Setup

Install Airflow with constraints for reproducible environments:

```bash
export AIRFLOW_VERSION=2.9.1
export PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
pip install apache-airflow-providers-google apache-airflow-providers-amazon
```

Configure `airflow.cfg` or environment variables:

```bash
export AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:pass@db:5432/airflow
export AIRFLOW__CORE__LOAD_EXAMPLES=False
```

## Key Patterns

### Do: Use TaskFlow API for Python tasks

```python
from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def etl_pipeline():
    @task
    def extract() -> dict:
        return {"records": 100, "source": "api"}

    @task
    def transform(data: dict) -> dict:
        data["transformed"] = True
        return data

    @task
    def load(data: dict):
        print(f"Loading {data['records']} records")

    raw = extract()
    cleaned = transform(raw)
    load(cleaned)

etl_pipeline()
```

### Do Not: Put heavy logic in DAG file top-level scope

```python
# BAD - runs on every scheduler parse
import pandas as pd
df = pd.read_csv("huge_file.csv")  # blocks the scheduler on each parse

# GOOD - heavy work happens only inside the task at execution time
from airflow.decorators import task

@task
def process():
    import pandas as pd
    df = pd.read_csv("huge_file.csv")
    return df.shape[0]
```

### Do: Use Jinja templates for date partitioning

```python
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

query_job = BigQueryInsertJobOperator(
    task_id="run_query",
    configuration={
        "query": {
            "query": """
                SELECT * FROM source_table
                WHERE date_partition = '{{ ds }}'
            """,
            "useLegacySql": False,
            "destinationTable": {
                "projectId": "my-project",
                "datasetId": "warehouse",
                "tableId": "daily_{{ ds_nodash }}",
            },
            "writeDisposition": "WRITE_TRUNCATE",
        }
    },
)
```

## Common Patterns

### Sensor with timeout and poke interval

```python
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id="wait_for_upload",
    bucket_name="data-lake",
    bucket_key="incoming/{{ ds }}/data.parquet",
    aws_conn_id="aws_default",
    poke_interval=300,
    timeout=3600,
    mode="reschedule",  # frees worker slot between pokes
)
```

### Dynamic task mapping (Airflow 2.3+)

```python
from airflow.decorators import task

@task
def get_partitions() -> list[str]:
    return ["us-east-1", "eu-west-1", "ap-south-1"]

@task
def process_partition(region: str):
    print(f"Processing {region}")

# One mapped task instance is created per region at runtime
regions = get_partitions()
process_partition.expand(region=regions)
```

### Custom connection and hook usage

```python
import requests

from airflow.decorators import task
from airflow.hooks.base import BaseHook

@task
def call_api():
    # Credentials live in the Airflow connection store, not in DAG code
    conn = BaseHook.get_connection("my_api")
    resp = requests.get(
        f"{conn.schema}://{conn.host}:{conn.port}/data",
        headers={"Authorization": f"Bearer {conn.password}"},
    )
    return resp.json()
```

### Task group for logical grouping

```python
# Data-quality check operators from the common.sql provider
from airflow.providers.common.sql.operators.sql import (
    SQLColumnCheckOperator,
    SQLTableCheckOperator,
)
from airflow.utils.task_group import TaskGroup

with TaskGroup("validation") as validate:
    check_schema = SQLColumnCheckOperator(...)
    check_rows = SQLTableCheckOperator(...)
    check_schema >> check_rows

extract >> validate >> load
```

## Anti-Patterns

- **Passing large data via XCom**: XCom is stored in the metadata DB; push references (S3 paths, table names) instead of actual datasets
- **Using `depends_on_past=True` without `wait_for_downstream`**: creates hidden deadlocks when upstream reruns are needed
- **Hardcoding dates instead of using `{{ ds }}` or `{{ data_interval_start }}`**: breaks idempotency and makes backfills impossible
- **Running DAGs with `catchup=True` unintentionally**: triggers massive backfills that overwhelm workers and external systems
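The XCom anti-pattern can be sketched as a pair of plain functions, each of which would run inside an `@task`; bucket and key layout are illustrative. Only the small key string crosses the metadata DB:

```python
def export_events(ds: str) -> str:
    """Upstream task body: write the day's rows to object storage and
    return only the key, so the XCom stays a few bytes."""
    key = f"exports/ds={ds}/events.parquet"
    # In a real task, the dataframe would be written to s3://data-lake/{key} here.
    return key

def load_events(key: str) -> str:
    """Downstream task body: receives the reference via XCom and reads
    the dataset back from storage, not from the Airflow metadata DB."""
    return f"s3://data-lake/{key}"
```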

## When to Use

- Scheduling and orchestrating batch ETL/ELT pipelines that run on time-based intervals
- Coordinating dependencies across multiple external systems (S3, BigQuery, Spark, dbt)
- Building data pipelines that require backfill capability over historical date ranges
- Monitoring and alerting on pipeline failures with SLA tracking and callbacks
- Running dynamic workflows where the number of tasks depends on upstream data
