# dbt (data build tool)
Build and test data transformation pipelines using dbt models, macros, and incremental strategies.
You are an expert in dbt for analytics engineering, skilled at designing modular SQL transformations with proper testing, documentation, and incremental materialization strategies.
## Core Philosophy

### Models as Select Statements

Every dbt model is a single SELECT statement stored in a `.sql` file. dbt generates the surrounding DDL and DML (CREATE, INSERT) automatically based on the materialization strategy. Keep each model focused on one logical transformation.

### Test-Driven Transformation

Define schema tests (`unique`, `not_null`, `accepted_values`, `relationships`) on every model. Data quality issues caught early prevent cascading failures downstream.

### Layered Architecture

Organize models into staging (1:1 source mirrors), intermediate (business-logic joins), and marts (final consumer-facing tables). Each layer has clear naming conventions and ownership.
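Under these conventions, a project tree might look like the following (file names taken from the examples later in this page):

```text
models/
├── staging/
│   ├── _sources.yml
│   ├── _stg_models.yml
│   └── stg_orders.sql
├── intermediate/
│   └── int_orders_with_payments.sql
└── marts/
    └── fct_events.sql
```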
## Setup

Initialize a dbt project and configure your warehouse connection:

```bash
pip install dbt-core dbt-bigquery  # or dbt-snowflake, dbt-postgres
dbt init my_project
```

Configure `profiles.yml` (typically at `~/.dbt/profiles.yml`):
```yaml
my_project:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: oauth
      project: my-gcp-project
      dataset: dbt_dev_{{ env_var('USER') }}
      threads: 4
      location: US
    prod:
      type: bigquery
      method: service-account
      project: my-gcp-project
      dataset: analytics
      threads: 8
      keyfile: /secrets/sa-key.json
```

Run `dbt debug` to verify the connection.
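One possible hardening, if you would rather not hardcode the key path: dbt renders `profiles.yml` with Jinja, so the prod target can read it from an environment variable. The variable name `DBT_SA_KEYFILE` is an illustrative assumption:

```yaml
prod:
  type: bigquery
  method: service-account
  project: my-gcp-project
  dataset: analytics
  threads: 8
  # env_var() fails fast if DBT_SA_KEYFILE is unset, rather than silently
  # falling back to a stale path
  keyfile: "{{ env_var('DBT_SA_KEYFILE') }}"
```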
Project structure in `dbt_project.yml`:

```yaml
name: my_project
version: '1.0.0'
profile: my_project

models:
  my_project:
    staging:
      +materialized: view
      +schema: staging
    marts:
      +materialized: table
      +schema: analytics
```
## Key Patterns

**Do:** Use staging models as a clean interface to raw sources.
```sql
-- models/staging/stg_orders.sql
with source as (
    select * from {{ source('raw', 'orders') }}
),

renamed as (
    select
        id as order_id,
        user_id as customer_id,
        cast(created_at as timestamp) as ordered_at,
        cast(amount_cents as numeric) / 100 as order_amount,
        status as order_status
    from source
    where _deleted is false
)

select * from renamed
```
**Do not:** Write complex multi-CTE logic without breaking it into separate models.
```sql
-- BAD: a 300-line model with 12 CTEs doing unrelated things
-- GOOD: split into focused intermediate models

-- models/intermediate/int_orders_with_payments.sql
select
    o.order_id,
    o.customer_id,
    o.ordered_at,
    p.total_paid,
    p.payment_method
from {{ ref('stg_orders') }} o
left join {{ ref('int_payments_aggregated') }} p
    on o.order_id = p.order_id
```
**Do:** Define tests and documentation in schema YAML.
```yaml
# models/staging/_stg_models.yml
version: 2

models:
  - name: stg_orders
    description: "Cleaned orders from source system, one row per order"
    columns:
      - name: order_id
        description: "Primary key"
        tests:
          - unique
          - not_null
      - name: order_status
        tests:
          - accepted_values:
              values: ['pending', 'completed', 'cancelled', 'refunded']
      - name: customer_id
        tests:
          - relationships:
              to: ref('stg_customers')
              field: customer_id
```
## Common Patterns

### Incremental model with merge strategy
```sql
-- models/marts/fct_events.sql
{{
    config(
        materialized='incremental',
        unique_key='event_id',
        incremental_strategy='merge',
        on_schema_change='append_new_columns'
    )
}}

select
    event_id,
    user_id,
    event_type,
    event_properties,
    occurred_at
from {{ ref('stg_events') }}

{% if is_incremental() %}
where occurred_at > (select max(occurred_at) from {{ this }})
{% endif %}
```
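A common variant, worth considering when events can arrive late: reprocess a trailing window instead of filtering on a strict high-water mark. The three-day window and the Snowflake-style `dateadd` syntax below are illustrative assumptions; adjust both to your warehouse and data latency:

```sql
{% if is_incremental() %}
-- reprocess a 3-day window to pick up late-arriving events;
-- the merge on unique_key deduplicates rows already loaded
where occurred_at > (
    select dateadd(day, -3, max(occurred_at)) from {{ this }}
)
{% endif %}
```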
### Reusable macro for common logic
```sql
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
    round(cast({{ column_name }} as numeric) / 100, 2)
{% endmacro %}
```

Usage in models:

```sql
select
    order_id,
    {{ cents_to_dollars('amount_cents') }} as amount_dollars
from {{ source('payments', 'charges') }}
```
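Macros expand at compile time; `dbt compile` writes the rendered SQL to `target/compiled/`, where the call above becomes:

```sql
round(cast(amount_cents as numeric) / 100, 2) as amount_dollars
```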
### Snapshot for slowly changing dimensions (SCD Type 2)
```sql
-- snapshots/snap_customers.sql
{% snapshot snap_customers %}

{{
    config(
        target_schema='snapshots',
        unique_key='customer_id',
        strategy='timestamp',
        updated_at='updated_at'
    )
}}

select * from {{ source('raw', 'customers') }}

{% endsnapshot %}
```
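dbt adds `dbt_valid_from` and `dbt_valid_to` columns (plus `dbt_scd_id` and `dbt_updated_at`) to every snapshot row, so the current version of each customer is the row whose validity is still open-ended:

```sql
select *
from {{ ref('snap_customers') }}
where dbt_valid_to is null
```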
### Custom generic test
```sql
-- tests/generic/test_positive_value.sql
{% test positive_value(model, column_name) %}

select {{ column_name }}
from {{ model }}
where {{ column_name }} < 0

{% endtest %}
```
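Once defined, the generic test is referenced by name in schema YAML like any built-in test; for example, applying it to `order_amount` from the staging model above:

```yaml
models:
  - name: stg_orders
    columns:
      - name: order_amount
        tests:
          - positive_value
```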
### Source freshness monitoring
```yaml
# models/staging/_sources.yml
version: 2

sources:
  - name: raw
    database: production
    freshness:
      warn_after: {count: 12, period: hour}
      error_after: {count: 24, period: hour}
    loaded_at_field: _etl_loaded_at
    tables:
      - name: orders
      - name: customers
```

Run the checks with `dbt source freshness`.
## Anti-Patterns

- **Using `{{ this }}` in non-incremental models**: The `{{ this }}` reference only makes sense in incremental models; elsewhere it causes confusing errors or circular references.
- **Skipping the staging layer and referencing sources directly in marts**: Creates tight coupling to source schemas and duplicates renaming/casting logic across models.
- **Hardcoding warehouse-specific SQL instead of using dbt utils/macros**: Breaks portability and prevents reuse; use `dbt_utils.surrogate_key()`, `dbt_utils.pivot()`, etc.
- **Not running `dbt test` in CI/CD**: Deploying untested models to production leads to silent data quality regressions that downstream consumers discover too late.
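To use `dbt_utils`, declare it in `packages.yml` and run `dbt deps`; note that `surrogate_key` was renamed to `generate_surrogate_key` in dbt_utils 1.0. The version range below is an assumption, not a requirement:

```yaml
# packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.0.0", "<2.0.0"]
```

In a model, `{{ dbt_utils.generate_surrogate_key(['order_id', 'ordered_at']) }}` then produces a warehouse-portable hashed key.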
## When to Use
- Transforming raw ingested data into clean, tested, documented analytical models
- Building a shared semantic layer with consistent business logic definitions
- Managing SQL-based transformation dependencies with automatic DAG resolution
- Implementing slowly changing dimensions with built-in snapshot support
- Enforcing data contracts and quality checks as part of the deployment pipeline
Install this skill directly: `skilldb add data-pipeline-services-skills`
## Related Skills

- **Airbyte**: Configure Airbyte open-source data integration with custom connectors, destinations, and CDC replication.
- **Apache Airflow**: Orchestrate data pipelines using Apache Airflow DAGs, operators, sensors, and XCom.
- **Apache Spark**: Process large-scale data with Apache Spark using PySpark DataFrames, Spark SQL, and structured streaming.
- **BigQuery**: Build analytical pipelines on Google BigQuery using SQL, streaming inserts, and federated queries.
- **ClickHouse**: Build high-performance OLAP queries on ClickHouse using MergeTree engines, materialized views, and aggregations.
- **Fivetran**: Configure and manage Fivetran connectors for automated data ingestion into warehouses.