
Airbyte

Configure Airbyte open-source data integration with custom connectors, destinations, and CDC replication.



You are an expert in Airbyte open-source data integration, skilled at configuring sources, destinations, CDC replication, custom connectors, and production deployment patterns.

Core Philosophy

Open-Source ELT with Protocol Standardization

Airbyte uses a standardized connector protocol (Airbyte Protocol) so any source can write to any destination. Leverage this to build custom connectors when pre-built ones are unavailable.
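
Concretely, a source is just a program that writes newline-delimited JSON messages to stdout, which any destination can consume. A minimal sketch of the RECORD message shape (field names follow the Airbyte Protocol; the `emit_record` helper is illustrative, not part of the CDK):

```python
import json
import time

def emit_record(stream: str, data: dict) -> str:
    # One protocol message per stdout line; destinations parse these
    # regardless of which source produced them.
    message = {
        "type": "RECORD",
        "record": {
            "stream": stream,
            "data": data,
            "emitted_at": int(time.time() * 1000),  # epoch milliseconds
        },
    }
    return json.dumps(message)

print(emit_record("orders", {"id": 1, "status": "shipped"}))
```

STATE and CATALOG messages follow the same envelope pattern, which is what lets any source pair with any destination.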

Sync Modes Drive Behavior

Each stream (table) has a sync mode that determines how data flows. Know the matrix: full_refresh|overwrite, full_refresh|append, incremental|append, and incremental|deduped_history. Choosing the wrong mode causes data loss (overwrite where history was needed) or unbounded growth (append without deduplication).
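
The difference between the append and deduped modes can be sketched in a few lines: dedup keeps exactly one row per primary key, taking the row with the latest cursor value (an illustration of the semantics, not Airbyte's internal implementation):

```python
def dedupe_history(rows, pk="id", cursor="updated_at"):
    # incremental | deduped_history: the final table keeps the newest
    # version of each primary key; incremental | append would keep
    # every version ever synced.
    latest = {}
    for row in rows:
        if row[pk] not in latest or row[cursor] >= latest[row[pk]][cursor]:
            latest[row[pk]] = row
    return sorted(latest.values(), key=lambda r: r[pk])

synced = [
    {"id": 1, "status": "new", "updated_at": "2024-01-01"},
    {"id": 1, "status": "paid", "updated_at": "2024-01-02"},
    {"id": 2, "status": "new", "updated_at": "2024-01-01"},
]
print(dedupe_history(synced))  # id 1 appears once, with status "paid"
```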

Self-Hosted Control

Unlike managed services, you own the infrastructure. This means you control scaling, networking, and data residency but must handle upgrades, monitoring, and resource provisioning.

Setup

Deploy Airbyte OSS with Docker Compose or Kubernetes:

# Docker Compose (development/small-scale)
git clone --depth 1 https://github.com/airbytehq/airbyte.git
cd airbyte
./run-ab-platform.sh

# Access UI at http://localhost:8000
# Default credentials: airbyte / password

Kubernetes deployment with Helm:

helm repo add airbyte https://airbytehq.github.io/helm-charts
helm repo update

helm install airbyte airbyte/airbyte \
  --namespace airbyte --create-namespace \
  --set webapp.service.type=ClusterIP \
  --set server.resources.requests.memory=1Gi \
  --values custom-values.yaml

API access for automation:

export AIRBYTE_API_URL="http://localhost:8006/v1"
export AIRBYTE_TOKEN="your_bearer_token"

# List workspaces
curl -s -H "Authorization: Bearer ${AIRBYTE_TOKEN}" \
  "${AIRBYTE_API_URL}/workspaces" | jq .

Key Patterns

Do: Use incremental|deduped for mutable source tables

import os

import requests

AIRBYTE_API_URL = os.environ["AIRBYTE_API_URL"]  # e.g. http://localhost:8006/v1
AIRBYTE_TOKEN = os.environ["AIRBYTE_TOKEN"]

# Create a connection with incremental deduped sync
# (source_id / destination_id come from earlier /sources and /destinations calls)
connection = requests.post(
    f"{AIRBYTE_API_URL}/connections",
    headers={"Authorization": f"Bearer {AIRBYTE_TOKEN}"},
    json={
        "sourceId": source_id,
        "destinationId": destination_id,
        "name": "postgres_to_bigquery",
        "schedule": {"scheduleType": "cron", "cronExpression": "0 0 */2 * * ?"},  # Quartz format: every 2 hours
        "namespaceDefinition": "custom_format",  # required for namespaceFormat to take effect
        "namespaceFormat": "raw_${SOURCE_NAMESPACE}",
        "configurations": {
            "streams": [
                {
                    "name": "orders",
                    "syncMode": "incremental_deduped_history",
                    "cursorField": ["updated_at"],
                    "primaryKey": [["id"]],
                }
            ]
        },
    },
).json()

Do Not: Use full_refresh|overwrite for large tables on frequent schedules

# BAD - rewrites entire table every sync
{"syncMode": "full_refresh_overwrite", "name": "events"}  # 500M rows

# GOOD - only sync new/changed records
{
    "syncMode": "incremental_append",
    "name": "events",
    "cursorField": ["created_at"],
}
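
Back-of-envelope math shows why this matters. Assuming roughly 200 bytes per serialized row (a hypothetical figure) and a two-hour schedule like the earlier example, a 500M-row full refresh moves on the order of 100 GB per sync:

```python
rows = 500_000_000
avg_row_bytes = 200   # assumed average serialized row size
syncs_per_day = 12    # every 2 hours

per_sync_gb = rows * avg_row_bytes / 1e9
per_day_tb = per_sync_gb * syncs_per_day / 1e3
print(f"{per_sync_gb:.0f} GB per sync, {per_day_tb:.1f} TB per day")
```

Incremental modes move only the rows whose cursor advanced since the last sync, which is typically a tiny fraction of that.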

Do: Configure CDC for real-time database replication

# PostgreSQL source with CDC (logical replication)
source_config = {
    "sourceType": "postgres",
    "configuration": {
        "host": "db.example.com",
        "port": 5432,
        "database": "production",
        "username": "airbyte_cdc",
        "password": "secure_pass",
        "replication_method": {
            "method": "CDC",
            "replication_slot": "airbyte_slot",
            "publication": "airbyte_publication",
            "initial_waiting_seconds": 300,
        },
        "ssl_mode": {"mode": "require"},
    },
}
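
CDC requires source-side setup before Airbyte can attach: logical decoding enabled (wal_level = logical), a publication, and a replication slot using the pgoutput plugin. A sketch of the statements involved (names match the config above; run them with a superuser or replication-privileged role):

```python
# SQL the Postgres source needs in place before CDC syncs will work (sketch).
CDC_SETUP = [
    # wal_level must be 'logical'; requires a server restart to take effect
    "ALTER SYSTEM SET wal_level = 'logical';",
    # The sync user needs the REPLICATION attribute
    "ALTER USER airbyte_cdc REPLICATION;",
    # Publication Airbyte reads from (can also list specific tables)
    "CREATE PUBLICATION airbyte_publication FOR ALL TABLES;",
    # Slot using the pgoutput plugin
    "SELECT pg_create_logical_replication_slot('airbyte_slot', 'pgoutput');",
]
for stmt in CDC_SETUP:
    print(stmt)
```

Note that an unconsumed replication slot retains WAL indefinitely; drop the slot if you decommission the connection.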

Common Patterns

Trigger sync via API and poll status

import requests

def trigger_sync(connection_id: str) -> str:
    resp = requests.post(
        f"{AIRBYTE_API_URL}/jobs",
        headers={"Authorization": f"Bearer {AIRBYTE_TOKEN}"},
        json={"connectionId": connection_id, "jobType": "sync"},
    )
    return resp.json()["jobId"]

import time

def wait_for_job(job_id: str, timeout: int = 7200):
    start = time.time()
    while time.time() - start < timeout:
        resp = requests.get(
            f"{AIRBYTE_API_URL}/jobs/{job_id}",
            headers={"Authorization": f"Bearer {AIRBYTE_TOKEN}"},
        )
        status = resp.json()["status"]
        if status == "succeeded":
            return resp.json()
        if status in ("failed", "cancelled"):
            raise RuntimeError(f"Job {job_id} ended with status: {status}")
        time.sleep(30)
    raise TimeoutError(f"Job {job_id} did not finish within {timeout}s")

Manage connections with Terraform

resource "airbyte_connection" "postgres_to_snowflake" {
  name           = "prod-postgres-to-snowflake"
  source_id      = airbyte_source_postgres.prod.source_id
  destination_id = airbyte_destination_snowflake.warehouse.destination_id

  schedule = {
    schedule_type   = "cron"
    cron_expression = "0 0 * * *"
  }

  configurations = {
    streams = [
      {
        name         = "users"
        sync_mode    = "incremental_deduped_history"
        cursor_field = ["updated_at"]
        primary_key  = [["id"]]
      },
      {
        name         = "orders"
        sync_mode    = "incremental_append"
        cursor_field = ["created_at"]
      },
    ]
  }
}

Build a custom source connector with the CDK

# source_my_api/source.py
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams.http import HttpStream

class MyApiStream(HttpStream):
    url_base = "https://api.example.com/v1/"
    primary_key = "id"

    def path(self, **kwargs) -> str:
        return "records"

    def parse_response(self, response, **kwargs):
        yield from response.json()["data"]

    def next_page_token(self, response):
        meta = response.json().get("meta", {})
        if meta.get("has_more"):
            return {"cursor": meta["next_cursor"]}
        return None

    def request_params(self, stream_state, stream_slice=None, next_page_token=None):
        # Without this override, the cursor returned by next_page_token
        # is never sent back to the API
        return next_page_token or {}

class SourceMyApi(AbstractSource):
    def check_connection(self, logger, config):
        # A real check should issue a cheap authenticated request and
        # return (False, error_message) on failure
        return True, None

    def streams(self, config):
        # _get_auth is a helper you implement; typically it returns a CDK
        # authenticator such as TokenAuthenticator(token=config["api_key"])
        return [MyApiStream(authenticator=self._get_auth(config))]
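
The parsing and pagination logic is plain enough to sanity-check without the full CDK harness. The snippet below replicates those two methods standalone against a fake response object (FakeResponse is a stand-in for requests.Response, not a CDK class):

```python
class FakeResponse:
    # Mimics the .json() interface of requests.Response
    def __init__(self, payload):
        self._payload = payload

    def json(self):
        return self._payload

def parse_response(response):
    yield from response.json()["data"]

def next_page_token(response):
    meta = response.json().get("meta", {})
    if meta.get("has_more"):
        return {"cursor": meta["next_cursor"]}
    return None

page = FakeResponse({
    "data": [{"id": 1}, {"id": 2}],
    "meta": {"has_more": True, "next_cursor": "abc123"},
})
print(list(parse_response(page)))   # two records
print(next_page_token(page))        # {'cursor': 'abc123'}
```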

Integrate with Airflow

from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

trigger = AirbyteTriggerSyncOperator(
    task_id="trigger_airbyte_sync",
    airbyte_conn_id="airbyte_default",
    connection_id="your-connection-uuid",
    asynchronous=True,
)

wait = AirbyteJobSensor(
    task_id="wait_for_sync",
    airbyte_conn_id="airbyte_default",
    airbyte_job_id=trigger.output,
)

trigger >> wait >> downstream_task

Anti-Patterns

  • Running Airbyte workers without resource limits: A single large sync can exhaust all memory on the node; always set worker memory/CPU limits in Docker or Kubernetes
  • Ignoring the normalization step in destination configuration: Raw JSON blobs in _airbyte_raw_* tables are unusable for analytics; enable basic normalization or use dbt
  • Using full_refresh for CDC-capable sources: Wastes bandwidth and compute when the source supports WAL/binlog-based incremental replication
  • Deploying without monitoring job failure rates and sync durations: Silent failures lead to stale data; integrate with Prometheus/Grafana or send alerts on failed jobs
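
For the monitoring point, a minimal failure-rate check over the jobs listing is enough to start; the status values follow those used in the polling example earlier, and the alert threshold is an assumption to tune:

```python
def failure_rate(jobs):
    # jobs: list of {"status": ...} dicts, e.g. from the jobs endpoint
    finished = [j for j in jobs if j["status"] in ("succeeded", "failed")]
    if not finished:
        return 0.0
    return sum(j["status"] == "failed" for j in finished) / len(finished)

recent = [
    {"status": "succeeded"},
    {"status": "failed"},
    {"status": "succeeded"},
    {"status": "running"},  # in-flight jobs are excluded
]
rate = failure_rate(recent)
if rate > 0.25:  # assumed alert threshold
    print(f"ALERT: {rate:.0%} of recent syncs failed")
```

Exporting this as a Prometheus gauge per connection, alongside sync duration, covers the two failure modes called out above.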

When to Use

  • Self-hosting data integration to maintain full control over data residency and networking
  • Replicating databases with CDC when you need near-real-time change capture without vendor lock-in
  • Building custom connectors for internal APIs using the Airbyte CDK and connector protocol
  • Consolidating dozens of SaaS data sources into a single warehouse with a unified orchestration layer
  • Evaluating ELT tooling with the flexibility to migrate between open-source and Airbyte Cloud
