# Apache Airflow
Orchestrate data pipelines using Apache Airflow DAGs, operators, sensors, and XCom.
You are an expert in Apache Airflow pipeline orchestration, skilled at designing DAGs with proper dependency management, idempotent tasks, and robust error handling.
## Core Philosophy

### DAGs as Code

Airflow DAGs are Python files. Keep them declarative, avoid heavy computation at parse time, and let operators handle execution. DAG parsing happens frequently; slow imports degrade the scheduler.
### Idempotency and Retries

Every task must produce the same result when re-run for the same `execution_date`. Design tasks to overwrite rather than append, use date-partitioned targets, and configure retries with exponential backoff.
### Separation of Orchestration and Computation

Airflow orchestrates work; it should not perform heavy data processing itself. Use operators that delegate to external systems (Spark, BigQuery, dbt) and keep the Airflow worker lightweight.
## Setup

Install Airflow with constraints for reproducible environments:

```bash
export AIRFLOW_VERSION=2.9.1
export PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
pip install apache-airflow-providers-google apache-airflow-providers-amazon
```
Configure `airflow.cfg` or environment variables:

```bash
export AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:pass@db:5432/airflow
export AIRFLOW__CORE__LOAD_EXAMPLES=False
```
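The metadata database must be initialized before the scheduler and webserver can start. A typical local bootstrap (the admin credentials are placeholders; `db migrate` replaces the older `db init` on Airflow 2.7+, and the Celery worker is only needed with the CeleryExecutor):

```shell
airflow db migrate          # create or upgrade the metadata tables
airflow users create \
  --username admin --password admin \
  --firstname Admin --lastname User \
  --role Admin --email admin@example.com
airflow scheduler &         # start scheduling DAG runs
airflow celery worker &     # start a Celery worker (CeleryExecutor only)
airflow webserver &         # serve the UI on port 8080
```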
## Key Patterns

### Do: Use the TaskFlow API for Python tasks

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def etl_pipeline():
    @task
    def extract() -> dict:
        return {"records": 100, "source": "api"}

    @task
    def transform(data: dict) -> dict:
        data["transformed"] = True
        return data

    @task
    def load(data: dict):
        print(f"Loading {data['records']} records")

    raw = extract()
    cleaned = transform(raw)
    load(cleaned)


etl_pipeline()
```
### Do Not: Put heavy logic in DAG-file top-level scope

```python
# BAD - runs on every scheduler parse
import pandas as pd

df = pd.read_csv("huge_file.csv")  # blocks the scheduler


# GOOD - logic lives inside the operator/task only
@task
def process():
    import pandas as pd

    df = pd.read_csv("huge_file.csv")
    return df.shape[0]
```
### Do: Use Jinja templates for date partitioning

```python
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

query_job = BigQueryInsertJobOperator(
    task_id="run_query",
    configuration={
        "query": {
            "query": """
                SELECT * FROM source_table
                WHERE date_partition = '{{ ds }}'
            """,
            "useLegacySql": False,
            "destinationTable": {
                "projectId": "my-project",
                "datasetId": "warehouse",
                "tableId": "daily_{{ ds_nodash }}",
            },
            "writeDisposition": "WRITE_TRUNCATE",
        }
    },
)
```
## Common Patterns

### Sensor with timeout and poke interval

```python
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id="wait_for_upload",
    bucket_name="data-lake",
    bucket_key="incoming/{{ ds }}/data.parquet",
    aws_conn_id="aws_default",
    poke_interval=300,
    timeout=3600,
    mode="reschedule",  # frees the worker slot between pokes
)
```
### Dynamic task mapping (Airflow 2.3+)

```python
from airflow.decorators import task


@task
def get_partitions() -> list[str]:
    return ["us-east-1", "eu-west-1", "ap-south-1"]


@task
def process_partition(region: str):
    print(f"Processing {region}")


regions = get_partitions()
process_partition.expand(region=regions)
```
### Custom connection and hook usage

```python
from airflow.decorators import task
from airflow.hooks.base import BaseHook


@task
def call_api():
    import requests

    conn = BaseHook.get_connection("my_api")
    resp = requests.get(
        f"{conn.schema}://{conn.host}:{conn.port}/data",
        headers={"Authorization": f"Bearer {conn.password}"},
    )
    return resp.json()
```
### Task group for logical grouping

```python
from airflow.utils.task_group import TaskGroup

with TaskGroup("validation") as validate:
    check_schema = SQLColumnCheckOperator(...)
    check_rows = SQLTableCheckOperator(...)
    check_schema >> check_rows

extract >> validate >> load
```
## Anti-Patterns

- **Passing large data via XCom**: XCom is stored in the metadata DB; push references (S3 paths, table names) instead of actual datasets
- **Using `depends_on_past=True` without `wait_for_downstream`**: creates hidden deadlocks when upstream reruns are needed
- **Hardcoding dates instead of using `{{ ds }}` or `{{ data_interval_start }}`**: breaks idempotency and makes backfills impossible
- **Running DAGs with `catchup=True` unintentionally**: triggers massive backfills that overwhelm workers and external systems
## When to Use

- Scheduling and orchestrating batch ETL/ELT pipelines that run on time-based intervals
- Coordinating dependencies across multiple external systems (S3, BigQuery, Spark, dbt)
- Building data pipelines that require backfill capability over historical date ranges
- Monitoring and alerting on pipeline failures with SLA tracking and callbacks
- Running dynamic workflows where the number of tasks depends on upstream data
## Related Skills

- **Airbyte**: Configure Airbyte open-source data integration with custom connectors, destinations, and CDC replication.
- **Apache Spark**: Process large-scale data with Apache Spark using PySpark DataFrames, Spark SQL, and structured streaming.
- **BigQuery**: Build analytical pipelines on Google BigQuery using SQL, streaming inserts, and federated queries.
- **ClickHouse**: Build high-performance OLAP queries on ClickHouse using MergeTree engines, materialized views, and aggregations.
- **dbt**: Build and test data transformation pipelines using dbt models, macros, and incremental strategies.
- **Fivetran**: Configure and manage Fivetran connectors for automated data ingestion into warehouses.