
# dbt (data build tool)

You are an expert in dbt for analytics engineering, skilled at designing modular SQL transformations with proper testing, documentation, and incremental materialization strategies.

## Core Philosophy

### Models as Select Statements

Every dbt model is a single `SELECT` statement stored in a `.sql` file. dbt generates the wrapping DDL/DML (`CREATE`, `INSERT`, `MERGE`) automatically based on the materialization strategy. Keep each model focused on one logical transformation.

### Test-Driven Transformation

Define schema tests (`unique`, `not_null`, `accepted_values`, `relationships`) on every model. Data quality issues caught early prevent cascading failures downstream.

### Layered Architecture

Organize models into staging (1:1 source mirrors), intermediate (business-logic joins), and marts (final consumer-facing tables). Each layer has clear naming conventions and ownership.
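
A project tree reflecting these layers might look like the following (file names beyond those used elsewhere in this skill are illustrative):

```text
models/
├── staging/
│   ├── _sources.yml
│   ├── _stg_models.yml
│   ├── stg_orders.sql
│   └── stg_customers.sql
├── intermediate/
│   └── int_orders_with_payments.sql
└── marts/
    └── fct_events.sql
```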

## Setup

Initialize a dbt project and configure your warehouse connection:

```bash
pip install dbt-core dbt-bigquery  # or dbt-snowflake, dbt-postgres
dbt init my_project
```

Configure `profiles.yml` (typically at `~/.dbt/profiles.yml`):

```yaml
my_project:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: oauth
      project: my-gcp-project
      dataset: dbt_dev_{{ env_var('USER') }}
      threads: 4
      location: US
    prod:
      type: bigquery
      method: service-account
      project: my-gcp-project
      dataset: analytics
      threads: 8
      keyfile: /secrets/sa-key.json
```

Project structure in `dbt_project.yml`:

```yaml
name: my_project
version: '1.0.0'
profile: my_project

models:
  my_project:
    staging:
      +materialized: view
      +schema: staging
    marts:
      +materialized: table
      +schema: analytics
```
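
With the profile and project files in place, a typical first run looks like:

```bash
dbt debug   # verify the profiles.yml connection works
dbt run     # build all models in DAG order
dbt test    # run schema and data tests
dbt build   # or: run models and tests together in DAG order
```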

## Key Patterns

**Do:** Use staging models as a clean interface to raw sources.

```sql
-- models/staging/stg_orders.sql
with source as (
    select * from {{ source('raw', 'orders') }}
),

renamed as (
    select
        id as order_id,
        user_id as customer_id,
        cast(created_at as timestamp) as ordered_at,
        cast(amount_cents as numeric) / 100 as order_amount,
        status as order_status
    from source
    where _deleted is false
)

select * from renamed
```

**Don't:** Write complex multi-CTE logic without breaking it into separate models.

```sql
-- BAD: 300-line model with 12 CTEs doing unrelated things
-- GOOD: split into focused intermediate models

-- models/intermediate/int_orders_with_payments.sql
select
    o.order_id,
    o.customer_id,
    o.ordered_at,
    p.total_paid,
    p.payment_method
from {{ ref('stg_orders') }} o
left join {{ ref('int_payments_aggregated') }} p
    on o.order_id = p.order_id
```

**Do:** Define tests and documentation in schema YAML.

```yaml
# models/staging/_stg_models.yml
version: 2

models:
  - name: stg_orders
    description: "Cleaned orders from source system, one row per order"
    columns:
      - name: order_id
        description: "Primary key"
        tests:
          - unique
          - not_null
      - name: order_status
        tests:
          - accepted_values:
              values: ['pending', 'completed', 'cancelled', 'refunded']
      - name: customer_id
        tests:
          - relationships:
              to: ref('stg_customers')
              field: customer_id
```
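
While iterating on a single model, test runs can be scoped with node selection, and failing rows can be persisted for inspection:

```bash
dbt test --select stg_orders                   # run only this model's tests
dbt test --select stg_orders --store-failures  # write failing rows to a table
```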

## Common Patterns

### Incremental model with merge strategy

```sql
-- models/marts/fct_events.sql
{{
  config(
    materialized='incremental',
    unique_key='event_id',
    incremental_strategy='merge',
    on_schema_change='append_new_columns'
  )
}}

select
    event_id,
    user_id,
    event_type,
    event_properties,
    occurred_at
from {{ ref('stg_events') }}

{% if is_incremental() %}
where occurred_at > (select max(occurred_at) from {{ this }})
{% endif %}
```
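
On the first run (or with `--full-refresh`) dbt builds the table from scratch; subsequent runs only process new rows. After changing the model's transformation logic, force a rebuild:

```bash
dbt run --select fct_events                 # incremental run: merge new rows only
dbt run --select fct_events --full-refresh  # drop and rebuild the whole table
```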

### Reusable macro for common logic

```sql
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
    round(cast({{ column_name }} as numeric) / 100, 2)
{% endmacro %}
```

Usage in models:

```sql
select
    order_id,
    {{ cents_to_dollars('amount_cents') }} as amount_dollars
from {{ source('payments', 'charges') }}
```

### Snapshot for slowly changing dimensions (SCD Type 2)

```sql
-- snapshots/snap_customers.sql
{% snapshot snap_customers %}
{{
  config(
    target_schema='snapshots',
    unique_key='customer_id',
    strategy='timestamp',
    updated_at='updated_at'
  )
}}

select * from {{ source('raw', 'customers') }}

{% endsnapshot %}
```
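
Run snapshots with `dbt snapshot`. dbt adds SCD Type 2 metadata columns (`dbt_valid_from`, `dbt_valid_to`, `dbt_scd_id`, `dbt_updated_at`), so the current version of each record is the row where `dbt_valid_to` is null:

```sql
-- current version of each customer from the snapshot
select *
from {{ ref('snap_customers') }}
where dbt_valid_to is null
```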

### Custom generic test

```sql
-- tests/generic/test_positive_value.sql
{% test positive_value(model, column_name) %}
select {{ column_name }}
from {{ model }}
where {{ column_name }} < 0
{% endtest %}
```
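
Once defined, the generic test is applied in schema YAML exactly like the built-in tests (the `order_amount` column here is illustrative):

```yaml
columns:
  - name: order_amount
    tests:
      - positive_value
```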

### Source freshness monitoring

```yaml
# models/staging/_sources.yml
version: 2

sources:
  - name: raw
    database: production
    freshness:
      warn_after: { count: 12, period: hour }
      error_after: { count: 24, period: hour }
    loaded_at_field: _etl_loaded_at
    tables:
      - name: orders
      - name: customers
```
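
Check freshness against these thresholds with:

```bash
dbt source freshness   # warns or errors per the warn_after/error_after config
```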

## Anti-Patterns

- **Using `{{ this }}` in non-incremental models**: the `{{ this }}` reference only makes sense in incremental models; elsewhere it causes confusing errors or circular references.
- **Skipping the staging layer and referencing sources directly in marts**: creates tight coupling to source schemas and duplicates renaming/casting logic across models.
- **Hardcoding warehouse-specific SQL instead of using dbt utils/macros**: breaks portability and prevents reuse; use `dbt_utils.generate_surrogate_key()`, `dbt_utils.pivot()`, etc.
- **Not running `dbt test` in CI/CD**: deploying untested models to production leads to silent data-quality regressions that downstream consumers discover too late.

## When to Use

- Transforming raw ingested data into clean, tested, documented analytical models
- Building a shared semantic layer with consistent business-logic definitions
- Managing SQL-based transformation dependencies with automatic DAG resolution
- Implementing slowly changing dimensions with built-in snapshot support
- Enforcing data contracts and quality checks as part of the deployment pipeline
