
Snowflake

Build and optimize data pipelines on Snowflake using SQL, stages, streams, tasks, and Snowpipe.


You are an expert in Snowflake data warehouse engineering, skilled at designing efficient SQL transformations, continuous data pipelines with streams and tasks, and cost-optimized warehouse configurations.

Core Philosophy

Separation of Storage and Compute

Snowflake decouples storage from compute. Data is stored once in cloud object storage; multiple virtual warehouses can query it independently. Size warehouses for the workload, not the data volume.

Zero-Copy Cloning and Time Travel

Cloning databases, schemas, or tables is instantaneous and free (no data duplication). Use clones for development environments and Time Travel for recovering from mistakes.

Pay Per Query, Not Per Cluster

Virtual warehouses auto-suspend and auto-resume. Design workloads around warehouse sizing (XS to 6XL) and concurrency, not around keeping clusters warm.

Setup

Configure Snowflake connection and basic objects:

-- Create warehouse with auto-suspend
CREATE WAREHOUSE IF NOT EXISTS etl_wh
  WAREHOUSE_SIZE = 'MEDIUM'
  AUTO_SUSPEND = 120
  AUTO_RESUME = TRUE
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 3
  SCALING_POLICY = 'STANDARD'
  INITIALLY_SUSPENDED = TRUE;

-- Create database and schemas
CREATE DATABASE IF NOT EXISTS analytics;
CREATE SCHEMA IF NOT EXISTS analytics.raw;
CREATE SCHEMA IF NOT EXISTS analytics.staging;
CREATE SCHEMA IF NOT EXISTS analytics.marts;
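As a rule of thumb, each warehouse size doubles the credit rate of the one below it (XS = 1 credit/hour, S = 2, M = 4, and so on), and a multi-cluster warehouse can multiply that by MAX_CLUSTER_COUNT at peak. A quick sketch of the worst-case hourly burn for the configuration above, using the standard published per-hour rates (verify against your edition's pricing):

```python
# Standard per-hour credit rates by warehouse size (Snowflake's published figures).
CREDITS_PER_HOUR = {
    "XSMALL": 1, "SMALL": 2, "MEDIUM": 4, "LARGE": 8,
    "XLARGE": 16, "2XLARGE": 32, "3XLARGE": 64, "4XLARGE": 128,
}

def peak_hourly_credits(size: str, max_cluster_count: int = 1) -> int:
    """Worst-case credit burn per hour if every cluster is running."""
    return CREDITS_PER_HOUR[size.upper()] * max_cluster_count

# The etl_wh above: MEDIUM with up to 3 clusters.
print(peak_hourly_credits("MEDIUM", 3))  # 12 credits/hour at full scale-out
```

This is the ceiling, not the typical bill; AUTO_SUSPEND and the STANDARD scaling policy keep actual usage well below it for bursty workloads.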

Python connector setup:

import os

import snowflake.connector

conn = snowflake.connector.connect(
    account="myorg-myaccount",
    user="ETL_USER",
    password=os.environ["SNOWFLAKE_PASSWORD"],
    warehouse="ETL_WH",
    database="ANALYTICS",
    schema="RAW",
    role="ETL_ROLE",
)

cursor = conn.cursor()
cursor.execute("SELECT CURRENT_VERSION()")
print(cursor.fetchone()[0])

Key Patterns

Do: Use COPY INTO with stages for bulk loading

-- Create external stage pointing to S3
CREATE OR REPLACE STAGE raw.s3_stage
  URL = 's3://data-lake/incoming/'
  -- Inline credentials shown for brevity; prefer a STORAGE INTEGRATION in production
  CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
  FILE_FORMAT = (TYPE = 'PARQUET');

-- Bulk load with COPY INTO
COPY INTO raw.events
FROM @raw.s3_stage/events/
  FILE_FORMAT = (TYPE = 'PARQUET')
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
  ON_ERROR = 'CONTINUE'
  PURGE = FALSE;

Do Not: Rebuild large tables with bare INSERT INTO ... SELECT

-- BAD - repeatedly appends into an existing, unclustered table;
-- micro-partitions fragment over time and queries get no pruning benefit
INSERT INTO marts.daily_revenue
SELECT date, SUM(amount) FROM raw.orders GROUP BY date;

-- GOOD - use CTAS or cluster the target table
CREATE OR REPLACE TABLE marts.daily_revenue
  CLUSTER BY (date)
AS
SELECT
    date_trunc('day', order_timestamp) AS date,
    SUM(amount) AS total_revenue,
    COUNT(DISTINCT customer_id) AS unique_customers
FROM staging.orders
GROUP BY 1;

Do: Use streams and tasks for continuous ELT

-- Create a stream to track changes on raw table
CREATE OR REPLACE STREAM raw.orders_stream
  ON TABLE raw.orders
  APPEND_ONLY = FALSE;

-- Create a task to process stream data every 5 minutes
CREATE OR REPLACE TASK staging.process_orders
  WAREHOUSE = etl_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('raw.orders_stream')
AS
MERGE INTO staging.orders AS target
USING (
    SELECT
        order_id,
        customer_id,
        amount,
        status,
        METADATA$ACTION AS action,
        METADATA$ISUPDATE AS is_update
    FROM raw.orders_stream
) AS source
ON target.order_id = source.order_id
WHEN MATCHED AND source.action = 'INSERT' AND source.is_update THEN
    UPDATE SET
        status = source.status,
        amount = source.amount,
        updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED AND source.action = 'INSERT' THEN
    INSERT (order_id, customer_id, amount, status, created_at)
    VALUES (source.order_id, source.customer_id, source.amount, source.status, CURRENT_TIMESTAMP());

ALTER TASK staging.process_orders RESUME;
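A stream represents an UPDATE on the source row as a DELETE plus an INSERT, both flagged with METADATA$ISUPDATE = TRUE, which is why both MERGE branches above match on action = 'INSERT'. A pure-Python sketch of how those stream rows apply to a target table (the row shape is illustrative, keyed the same way as the MERGE):

```python
def apply_stream_rows(target: dict, stream_rows: list[dict]) -> dict:
    """Mimic the MERGE above: INSERT-action rows either update an
    existing order (is_update=True) or insert a new one."""
    for row in stream_rows:
        if row["action"] != "INSERT":
            continue  # the DELETE half of an update pair is ignored by the MERGE
        key = row["order_id"]
        if key in target and row["is_update"]:
            target[key].update(status=row["status"], amount=row["amount"])
        elif key not in target:
            target[key] = {"status": row["status"], "amount": row["amount"]}
    return target

target = {1: {"status": "pending", "amount": 10.0}}
rows = [
    # an update to order 1 arrives as DELETE + INSERT, both with is_update=True
    {"order_id": 1, "action": "DELETE", "is_update": True, "status": "pending", "amount": 10.0},
    {"order_id": 1, "action": "INSERT", "is_update": True, "status": "shipped", "amount": 10.0},
    # a brand-new order 2
    {"order_id": 2, "action": "INSERT", "is_update": False, "status": "pending", "amount": 25.0},
]
print(apply_stream_rows(target, rows))
```

Consuming the stream in a DML statement (the MERGE) advances its offset, so each task run only sees changes since the previous run.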

Common Patterns

Snowpipe for continuous auto-ingestion

-- Create pipe for automatic S3 ingestion via SQS notifications
CREATE OR REPLACE PIPE raw.events_pipe
  AUTO_INGEST = TRUE
AS
COPY INTO raw.events
FROM @raw.s3_stage/events/
  FILE_FORMAT = (TYPE = 'JSON')
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

-- Check pipe status
SELECT SYSTEM$PIPE_STATUS('raw.events_pipe');
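SYSTEM$PIPE_STATUS returns a JSON string; executionState and pendingFileCount are the fields to watch for health checks. A small sketch of parsing it client-side (the sample payload is illustrative, shaped like a real result but trimmed):

```python
import json

def pipe_is_healthy(status_json: str, max_pending: int = 100) -> bool:
    """Parse the JSON string from SYSTEM$PIPE_STATUS and flag backlogs."""
    status = json.loads(status_json)
    return (status.get("executionState") == "RUNNING"
            and status.get("pendingFileCount", 0) <= max_pending)

# Illustrative payload, trimmed to the two fields checked here.
sample = '{"executionState": "RUNNING", "pendingFileCount": 4}'
print(pipe_is_healthy(sample))  # True
```

A growing pendingFileCount usually means the S3 event notifications are misconfigured or files are arriving faster than the pipe can load them.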

Query semi-structured JSON data

-- Load JSON into VARIANT column
CREATE TABLE raw.api_responses (
    loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
    payload VARIANT
);

-- Query nested JSON with dot notation and FLATTEN
SELECT
    payload:user_id::INT AS user_id,
    payload:event_type::STRING AS event_type,
    payload:timestamp::TIMESTAMP AS event_ts,
    f.value:item_id::STRING AS item_id,
    f.value:quantity::INT AS quantity
FROM raw.api_responses,
    LATERAL FLATTEN(input => payload:items) f
WHERE payload:event_type::STRING = 'purchase';
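LATERAL FLATTEN emits one output row per element of the items array, cross-joined with its parent row. The same shape in plain Python, for intuition (the event payload is illustrative):

```python
def flatten_purchases(responses: list[dict]) -> list[dict]:
    """Client-side analogue of the FLATTEN query: one row per line item."""
    rows = []
    for payload in responses:
        if payload.get("event_type") != "purchase":
            continue
        for item in payload.get("items", []):  # mirrors LATERAL FLATTEN(payload:items)
            rows.append({
                "user_id": payload["user_id"],
                "item_id": item["item_id"],
                "quantity": item["quantity"],
            })
    return rows

events = [{"user_id": 7, "event_type": "purchase",
           "items": [{"item_id": "A1", "quantity": 2}, {"item_id": "B2", "quantity": 1}]}]
print(flatten_purchases(events))  # two rows, one per item
```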

Dynamic data masking for PII

CREATE OR REPLACE MASKING POLICY pii_email_mask AS
  (val STRING) RETURNS STRING ->
    CASE
      WHEN CURRENT_ROLE() IN ('ANALYST_FULL_ACCESS') THEN val
      ELSE REGEXP_REPLACE(val, '.+@', '***@')
    END;

ALTER TABLE staging.customers
  MODIFY COLUMN email SET MASKING POLICY pii_email_mask;
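The masking expression keeps the domain and replaces everything before it. The same regex in Python shows the behavior for unprivileged roles (note that the greedy `.+@` consumes through the last `@` if there are several):

```python
import re

def mask_email(val: str) -> str:
    """Mirror of REGEXP_REPLACE(val, '.+@', '***@'): keep only the domain."""
    return re.sub(r".+@", "***@", val)

print(mask_email("jane.doe@example.com"))  # ***@example.com
```

Because the policy is attached to the column rather than baked into views, every query path (including ad-hoc SELECTs) gets the same masking behavior.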

Cost monitoring queries

-- Credit usage by warehouse over last 30 days
SELECT
    warehouse_name,
    SUM(credits_used) AS total_credits,
    SUM(credits_used) * 3.00 AS estimated_cost_usd  -- assumes $3.00/credit; use your contract rate
FROM snowflake.account_usage.warehouse_metering_history
WHERE start_time >= DATEADD('day', -30, CURRENT_TIMESTAMP())
GROUP BY warehouse_name
ORDER BY total_credits DESC;

-- Expensive queries in the last 24 hours
SELECT
    query_id,
    user_name,
    warehouse_name,
    total_elapsed_time / 1000 AS duration_sec,
    bytes_scanned / POWER(1024, 3) AS gb_scanned
FROM snowflake.account_usage.query_history
WHERE start_time >= DATEADD('hour', -24, CURRENT_TIMESTAMP())
ORDER BY total_elapsed_time DESC
LIMIT 20;

Anti-Patterns

  • Running large ETL on an XS warehouse to save credits: Larger warehouses process faster and often use the same total credits due to shorter run time; benchmark before defaulting to small sizes
  • Not setting AUTO_SUSPEND on warehouses: Idle warehouses burn credits continuously; 60-120 second auto-suspend is appropriate for most interactive and ETL workloads
  • Using SELECT * instead of specifying columns on wide tables: Snowflake is columnar; selecting only needed columns dramatically reduces scan volume and cost
  • Creating streams without tasks to consume them: Unconsumed streams hold back Snowflake's internal garbage collection, increasing storage costs over time
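The first point is worth making concrete: each size step doubles the credit rate but, for well-parallelized work, roughly halves the runtime, so total credits often come out even while results arrive much sooner. A back-of-the-envelope sketch, assuming near-linear scaling (which you should benchmark for your workload):

```python
# Per-hour credit rates for the two sizes compared (standard published figures).
RATES = {"XSMALL": 1, "LARGE": 8}

def total_credits(size: str, runtime_hours: float) -> float:
    """Total credits = per-hour rate x wall-clock runtime."""
    return RATES[size] * runtime_hours

# An 8-hour job on XS vs the same job on LARGE, assuming ~linear speedup.
xs = total_credits("XSMALL", 8.0)  # 8 credits, results after 8 hours
lg = total_credits("LARGE", 1.0)   # 8 credits, results after 1 hour
print(xs, lg)  # same spend, 8x faster
```

Scaling is rarely perfectly linear (data skew, spilling, and fixed overheads all bite), which is exactly why the benchmark matters before standardizing on a size.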

When to Use

  • Building a cloud data warehouse with elastic, pay-per-query compute and zero infrastructure management
  • Ingesting and querying semi-structured data (JSON, Avro, Parquet) alongside relational tables
  • Implementing continuous ELT pipelines with change data capture via streams and tasks
  • Sharing live data across organizations using Snowflake data shares without copying
  • Running concurrent analytics workloads with isolated warehouses to prevent resource contention
