Airbyte
Configure Airbyte open-source data integration with custom connectors, destinations, and CDC replication.
You are an expert in Airbyte open-source data integration, skilled at configuring sources, destinations, CDC replication, custom connectors, and production deployment patterns.
Core Philosophy
Open-Source ELT with Protocol Standardization
Airbyte uses a standardized connector protocol (Airbyte Protocol) so any source can write to any destination. Leverage this to build custom connectors when pre-built ones are unavailable.
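To make the protocol concrete, here is a minimal sketch of the RECORD message envelope every connector emits on stdout (the stream name and payload are made up for illustration):

```python
import json
import time

# A RECORD message as a source would emit it; every connector speaks this
# same envelope, which is what lets any source pair with any destination.
record_message = {
    "type": "RECORD",
    "record": {
        "stream": "orders",                       # hypothetical stream name
        "data": {"id": 42, "status": "shipped"},  # hypothetical row payload
        "emitted_at": int(time.time() * 1000),    # epoch milliseconds
    },
}
print(json.dumps(record_message))
```

Destinations read these messages from the source's stdout, which is why a custom source only needs to produce this format to work with every existing destination.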
Sync Modes Drive Behavior
Each stream (table) has a sync mode that determines how data flows. Know the matrix: full_refresh|overwrite, full_refresh|append, incremental|append, incremental|deduped_history. Choosing the wrong mode causes data loss or unbounded table growth.
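As a rule of thumb, the right mode follows from two stream properties; a hypothetical helper sketching that decision (the returned strings match the API values used in the examples below):

```python
def pick_sync_mode(mutable_rows: bool, has_cursor_field: bool) -> str:
    """Illustrative heuristic for choosing a stream's sync mode."""
    if not has_cursor_field:
        # No way to detect new/changed rows: re-pull everything each run
        return "full_refresh_overwrite"
    if mutable_rows:
        # Rows are updated in place: dedupe on primary key at the destination
        return "incremental_deduped_history"
    # Append-only data (events, logs): just sync new rows
    return "incremental_append"

print(pick_sync_mode(mutable_rows=True, has_cursor_field=True))
# incremental_deduped_history
```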
Self-Hosted Control
Unlike managed services, you own the infrastructure. This means you control scaling, networking, and data residency but must handle upgrades, monitoring, and resource provisioning.
Setup
Deploy Airbyte OSS with Docker Compose or Kubernetes:
# Docker Compose (development/small-scale)
git clone --depth 1 https://github.com/airbytehq/airbyte.git
cd airbyte
./run-ab-platform.sh
# Access UI at http://localhost:8000
# Default credentials: airbyte / password
Kubernetes deployment with Helm:
helm repo add airbyte https://airbytehq.github.io/helm-charts
helm repo update
helm install airbyte airbyte/airbyte \
--namespace airbyte --create-namespace \
--set webapp.service.type=ClusterIP \
--set server.resources.requests.memory=1Gi \
--values custom-values.yaml
API access for automation:
export AIRBYTE_API_URL="http://localhost:8006/v1"
export AIRBYTE_TOKEN="your_bearer_token"
# List workspaces
curl -s -H "Authorization: Bearer ${AIRBYTE_TOKEN}" \
"${AIRBYTE_API_URL}/workspaces" | jq .
Key Patterns
Do: Use incremental|deduped for mutable source tables
import requests
# Create a connection with incremental deduped sync
connection = requests.post(
f"{AIRBYTE_API_URL}/connections",
headers={"Authorization": f"Bearer {AIRBYTE_TOKEN}"},
json={
"sourceId": source_id,
"destinationId": destination_id,
"name": "postgres_to_bigquery",
"schedule": {"scheduleType": "cron", "cronExpression": "0 */2 * * *"},
"namespaceDefinition": "custom_format",
"namespaceFormat": "raw_${SOURCE_NAMESPACE}",
"configurations": {
"streams": [
{
"name": "orders",
"syncMode": "incremental_deduped_history",
"cursorField": ["updated_at"],
"primaryKey": [["id"]],
}
]
},
},
).json()
Do Not: Use full_refresh|overwrite for large tables on frequent schedules
# BAD - rewrites entire table every sync
{"syncMode": "full_refresh_overwrite", "name": "events"} # 500M rows
# GOOD - only sync new/changed records
{
"syncMode": "incremental_append",
"name": "events",
"cursorField": ["created_at"],
}
Do: Configure CDC for real-time database replication
# PostgreSQL source with CDC (logical replication)
source_config = {
"sourceType": "postgres",
"configuration": {
"host": "db.example.com",
"port": 5432,
"database": "production",
"username": "airbyte_cdc",
"password": "secure_pass",
"replication_method": {
"method": "CDC",
"replication_slot": "airbyte_slot",
"publication": "airbyte_publication",
"initial_waiting_seconds": 300,
},
"ssl_mode": {"mode": "require"},
},
}
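CDC also needs database-side setup before the source will connect; a sketch of the prerequisite statements (slot, publication, and user names match the config above; the server must already run with `wal_level=logical`):

```python
# Statements a DBA would run on the source database before the first sync.
# Rendered here as strings; execute them via psql or any Postgres client.
CDC_SETUP = [
    # Dedicated user with replication privileges (password is a placeholder)
    "CREATE USER airbyte_cdc WITH REPLICATION PASSWORD 'secure_pass';",
    "GRANT SELECT ON ALL TABLES IN SCHEMA public TO airbyte_cdc;",
    # Publication controls which tables emit change events
    "CREATE PUBLICATION airbyte_publication FOR ALL TABLES;",
    # Logical replication slot the connector reads the WAL from
    "SELECT pg_create_logical_replication_slot('airbyte_slot', 'pgoutput');",
]
for stmt in CDC_SETUP:
    print(stmt)
```

Note that an unconsumed replication slot retains WAL indefinitely, so drop the slot if you delete the connection.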
Common Patterns
Trigger sync via API and poll status
def trigger_sync(connection_id: str) -> str:
resp = requests.post(
f"{AIRBYTE_API_URL}/jobs",
headers={"Authorization": f"Bearer {AIRBYTE_TOKEN}"},
json={"connectionId": connection_id, "jobType": "sync"},
)
return resp.json()["jobId"]
import time

def wait_for_job(job_id: str, timeout: int = 7200):
start = time.time()
while time.time() - start < timeout:
resp = requests.get(
f"{AIRBYTE_API_URL}/jobs/{job_id}",
headers={"Authorization": f"Bearer {AIRBYTE_TOKEN}"},
)
status = resp.json()["status"]
if status == "succeeded":
return resp.json()
if status in ("failed", "cancelled"):
raise RuntimeError(f"Job {job_id} ended with status: {status}")
time.sleep(30)
raise TimeoutError(f"Job {job_id} did not finish within {timeout}s")
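The same polling pattern, factored so the status lookup is injectable, which makes it unit-testable and reusable beyond the Airbyte API (the function and parameter names here are illustrative):

```python
import time

def poll(get_status, *, timeout: float = 7200, interval: float = 30,
         ok=("succeeded",), bad=("failed", "cancelled")):
    """Call get_status() until it returns a terminal value or timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status in ok:
            return status
        if status in bad:
            raise RuntimeError(f"job ended with status: {status}")
        time.sleep(interval)
    raise TimeoutError(f"no terminal status within {timeout}s")

# Simulated status sequence standing in for the Airbyte jobs API
statuses = iter(["pending", "running", "succeeded"])
print(poll(lambda: next(statuses), interval=0))
# succeeded
```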
Manage connections with Terraform
resource "airbyte_connection" "postgres_to_snowflake" {
name = "prod-postgres-to-snowflake"
source_id = airbyte_source_postgres.prod.source_id
destination_id = airbyte_destination_snowflake.warehouse.destination_id
schedule = {
schedule_type = "cron"
cron_expression = "0 0 * * *"
}
configurations = {
streams = [
{
name = "users"
sync_mode = "incremental_deduped_history"
cursor_field = ["updated_at"]
primary_key = [["id"]]
},
{
name = "orders"
sync_mode = "incremental_append"
cursor_field = ["created_at"]
},
]
}
}
Build a custom source connector with the CDK
# source_my_api/source.py
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams.http import HttpStream
class MyApiStream(HttpStream):
url_base = "https://api.example.com/v1/"
primary_key = "id"
def path(self, **kwargs) -> str:
return "records"
def parse_response(self, response, **kwargs):
yield from response.json()["data"]
def next_page_token(self, response):
meta = response.json().get("meta", {})
if meta.get("has_more"):
return {"cursor": meta["next_cursor"]}
return None
def request_params(self, stream_state=None, next_page_token=None, **kwargs):
# Pass the pagination cursor back to the API on subsequent requests
return next_page_token or {}
class SourceMyApi(AbstractSource):
def check_connection(self, logger, config):
# In production, issue a lightweight request here to validate credentials
return True, None
def streams(self, config):
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
return [MyApiStream(authenticator=TokenAuthenticator(token=config["api_key"]))]
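The pagination logic above is plain request/response handling, so it can be checked without the CDK by stubbing the response object (the `FakeResponse` class and sample payloads are made up for the test):

```python
# Stand-alone copy of the stream's next_page_token logic, paired with a
# minimal response stub, so pagination can be verified without airbyte_cdk.
class FakeResponse:
    def __init__(self, payload):
        self._payload = payload

    def json(self):
        return self._payload

def next_page_token(response):
    meta = response.json().get("meta", {})
    if meta.get("has_more"):
        return {"cursor": meta["next_cursor"]}
    return None

page1 = FakeResponse({"data": [], "meta": {"has_more": True, "next_cursor": "abc"}})
page2 = FakeResponse({"data": [], "meta": {"has_more": False}})
print(next_page_token(page1))  # {'cursor': 'abc'}
print(next_page_token(page2))  # None
```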
Integrate with Airflow
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
trigger = AirbyteTriggerSyncOperator(
task_id="trigger_airbyte_sync",
airbyte_conn_id="airbyte_default",
connection_id="your-connection-uuid",
asynchronous=True,
)
wait = AirbyteJobSensor(
task_id="wait_for_sync",
airbyte_conn_id="airbyte_default",
airbyte_job_id=trigger.output,
)
trigger >> wait >> downstream_task
Anti-Patterns
- Running Airbyte workers without resource limits: A single large sync can exhaust all memory on the node; always set worker memory/CPU limits in Docker or Kubernetes
- Ignoring the normalization step in destination configuration: Raw JSON blobs in _airbyte_raw_* tables are unusable for analytics; enable basic normalization or use dbt
- Using full_refresh for CDC-capable sources: Wastes bandwidth and compute when the source supports WAL/binlog-based incremental replication
- Deploying without monitoring job failure rates and sync durations: Silent failures lead to stale data; integrate with Prometheus/Grafana or send alerts on failed jobs
When to Use
- Self-hosting data integration to maintain full control over data residency and networking
- Replicating databases with CDC when you need near-real-time change capture without vendor lock-in
- Building custom connectors for internal APIs using the Airbyte CDK and connector protocol
- Consolidating dozens of SaaS data sources into a single warehouse with a unified orchestration layer
- Evaluating ELT tooling with the flexibility to migrate between open-source and Airbyte Cloud
Install this skill directly: skilldb add data-pipeline-services-skills
Related Skills
Apache Airflow
Orchestrate data pipelines using Apache Airflow DAGs, operators, sensors, and XCom.
Apache Spark
Process large-scale data with Apache Spark using PySpark DataFrames, Spark SQL, and structured streaming.
BigQuery
Build analytical pipelines on Google BigQuery using SQL, streaming inserts, and federated queries.
ClickHouse
Build high-performance OLAP queries on ClickHouse using MergeTree engines, materialized views, and aggregations.
dbt
Build and test data transformation pipelines using dbt models, macros, and incremental strategies.
Fivetran
Configure and manage Fivetran connectors for automated data ingestion into warehouses.