Snowflake
Build and optimize data pipelines on Snowflake using SQL, stages, streams, tasks, and Snowpipe.
You are an expert in Snowflake data warehouse engineering, skilled at designing efficient SQL transformations, continuous data pipelines with streams and tasks, and cost-optimized warehouse configurations.
Core Philosophy
Separation of Storage and Compute
Snowflake decouples storage from compute. Data is stored once in cloud object storage; multiple virtual warehouses can query it independently. Size warehouses for the workload, not the data volume.
Zero-Copy Cloning and Time Travel
Cloning databases, schemas, or tables is instantaneous and free (no data duplication). Use clones for development environments and Time Travel for recovering from mistakes.
Pay Per Query, Not Per Cluster
Virtual warehouses auto-suspend and auto-resume. Design workloads around warehouse sizing (XS to 6XL) and concurrency, not around keeping clusters warm.
Setup
Configure Snowflake connection and basic objects:

```sql
-- Create warehouse with auto-suspend
CREATE WAREHOUSE IF NOT EXISTS etl_wh
  WAREHOUSE_SIZE = 'MEDIUM'
  AUTO_SUSPEND = 120
  AUTO_RESUME = TRUE
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 3
  SCALING_POLICY = 'STANDARD'
  INITIALLY_SUSPENDED = TRUE;

-- Create database and schemas
CREATE DATABASE IF NOT EXISTS analytics;
CREATE SCHEMA IF NOT EXISTS analytics.raw;
CREATE SCHEMA IF NOT EXISTS analytics.staging;
CREATE SCHEMA IF NOT EXISTS analytics.marts;
```
Python connector setup:

```python
import os

import snowflake.connector

conn = snowflake.connector.connect(
    account="myorg-myaccount",
    user="ETL_USER",
    password=os.environ["SNOWFLAKE_PASSWORD"],
    warehouse="ETL_WH",
    database="ANALYTICS",
    schema="RAW",
    role="ETL_ROLE",
)
cursor = conn.cursor()
cursor.execute("SELECT CURRENT_VERSION()")
print(cursor.fetchone()[0])
```
Key Patterns
Do: Use COPY INTO with stages for bulk loading

```sql
-- Create external stage pointing to S3
-- (prefer a STORAGE INTEGRATION over inline credentials in production)
CREATE OR REPLACE STAGE raw.s3_stage
  URL = 's3://data-lake/incoming/'
  CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
  FILE_FORMAT = (TYPE = 'PARQUET');

-- Bulk load with COPY INTO
COPY INTO raw.events
FROM @raw.s3_stage/events/
FILE_FORMAT = (TYPE = 'PARQUET')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
ON_ERROR = 'CONTINUE'
PURGE = FALSE;
```
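Before committing to a full load, COPY INTO can also dry-run the staged files. A minimal sketch, assuming the stage and table above (note that VALIDATION_MODE applies only to simple loads, not loads that transform data):

```sql
-- Dry-run: parse staged files and report errors without loading any rows
COPY INTO raw.events
FROM @raw.s3_stage/events/
FILE_FORMAT = (TYPE = 'PARQUET')
VALIDATION_MODE = 'RETURN_ERRORS';
```

Running this first surfaces malformed files up front, instead of discovering them via skipped rows after an `ON_ERROR = 'CONTINUE'` load.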
Do Not: Use INSERT INTO ... SELECT for large data movement without clustering

```sql
-- BAD - no clustering, scans entire source
INSERT INTO marts.daily_revenue
SELECT date, SUM(amount) FROM raw.orders GROUP BY date;

-- GOOD - use CTAS or cluster the target table
CREATE OR REPLACE TABLE marts.daily_revenue
CLUSTER BY (date)
AS
SELECT
  date_trunc('day', order_timestamp) AS date,
  SUM(amount) AS total_revenue,
  COUNT(DISTINCT customer_id) AS unique_customers
FROM staging.orders
GROUP BY 1;
```
Do: Use streams and tasks for continuous ELT

```sql
-- Create a stream to track changes on raw table
CREATE OR REPLACE STREAM raw.orders_stream
  ON TABLE raw.orders
  APPEND_ONLY = FALSE;

-- Create a task to process stream data every 5 minutes
CREATE OR REPLACE TASK staging.process_orders
  WAREHOUSE = etl_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('raw.orders_stream')
AS
MERGE INTO staging.orders AS target
USING (
  SELECT
    order_id,
    customer_id,
    amount,
    status,
    METADATA$ACTION AS action,
    METADATA$ISUPDATE AS is_update
  FROM raw.orders_stream
) AS source
ON target.order_id = source.order_id
WHEN MATCHED AND source.action = 'INSERT' AND source.is_update THEN
  UPDATE SET
    status = source.status,
    amount = source.amount,
    updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED AND source.action = 'INSERT' THEN
  INSERT (order_id, customer_id, amount, status, created_at)
  VALUES (source.order_id, source.customer_id, source.amount, source.status, CURRENT_TIMESTAMP());

ALTER TASK staging.process_orders RESUME;
```
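While developing a task, it helps to trigger it once by hand and then check its run history. A sketch using the task defined above:

```sql
-- Trigger a single run manually while debugging (requires the EXECUTE TASK privilege)
EXECUTE TASK staging.process_orders;

-- Review recent runs and any failures
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE name = 'PROCESS_ORDERS'
ORDER BY scheduled_time DESC
LIMIT 10;
```

A task run that finds an empty stream is skipped by the `WHEN SYSTEM$STREAM_HAS_DATA(...)` condition without consuming warehouse credits.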
Common Patterns
Snowpipe for continuous auto-ingestion

```sql
-- Create pipe for automatic S3 ingestion via SQS notifications
CREATE OR REPLACE PIPE raw.events_pipe
  AUTO_INGEST = TRUE
AS
COPY INTO raw.events
FROM @raw.s3_stage/events/
FILE_FORMAT = (TYPE = 'JSON')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

-- Check pipe status
SELECT SYSTEM$PIPE_STATUS('raw.events_pipe');
```
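To audit what the pipe has actually loaded, and to pick up files that landed before event notifications were configured, something like this works (table and pipe names match the example above):

```sql
-- Files loaded into raw.events in the last 24 hours
SELECT file_name, row_count, status, last_load_time
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'raw.events',
  START_TIME => DATEADD('hour', -24, CURRENT_TIMESTAMP())
));

-- Backfill: queue already-staged files that never triggered a notification
ALTER PIPE raw.events_pipe REFRESH;
```

`REFRESH` only considers files staged within the last 7 days, so it is a backfill aid, not a full reload mechanism.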
Query semi-structured JSON data

```sql
-- Load JSON into VARIANT column
CREATE TABLE raw.api_responses (
  loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
  payload VARIANT
);

-- Query nested JSON with dot notation and FLATTEN
SELECT
  payload:user_id::INT AS user_id,
  payload:event_type::STRING AS event_type,
  payload:timestamp::TIMESTAMP AS event_ts,
  f.value:item_id::STRING AS item_id,
  f.value:quantity::INT AS quantity
FROM raw.api_responses,
LATERAL FLATTEN(input => payload:items) f
WHERE payload:event_type::STRING = 'purchase';
```
Dynamic data masking for PII

```sql
CREATE OR REPLACE MASKING POLICY pii_email_mask AS
  (val STRING) RETURNS STRING ->
  CASE
    WHEN CURRENT_ROLE() IN ('ANALYST_FULL_ACCESS') THEN val
    ELSE REGEXP_REPLACE(val, '.+@', '***@')
  END;

ALTER TABLE staging.customers
  MODIFY COLUMN email SET MASKING POLICY pii_email_mask;
```
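A quick way to verify the policy is to run the same query under two roles; `analyst_restricted` here is a hypothetical role outside the policy's allow-list:

```sql
-- Restricted role sees masked values, e.g. '***@example.com'
USE ROLE analyst_restricted;
SELECT email FROM staging.customers LIMIT 5;

-- Allow-listed role sees the clear value
USE ROLE analyst_full_access;
SELECT email FROM staging.customers LIMIT 5;
```

Because masking is applied at query time, the underlying stored data is never modified; changing the policy changes what every existing query returns.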
Cost monitoring queries

```sql
-- Credit usage by warehouse over last 30 days
SELECT
  warehouse_name,
  SUM(credits_used) AS total_credits,
  SUM(credits_used) * 3.00 AS estimated_cost_usd  -- adjust $/credit to your contract rate
FROM snowflake.account_usage.warehouse_metering_history
WHERE start_time >= DATEADD('day', -30, CURRENT_TIMESTAMP())
GROUP BY warehouse_name
ORDER BY total_credits DESC;

-- Expensive queries in the last 24 hours
SELECT
  query_id,
  user_name,
  warehouse_name,
  total_elapsed_time / 1000 AS duration_sec,
  bytes_scanned / POWER(1024, 3) AS gb_scanned
FROM snowflake.account_usage.query_history
WHERE start_time >= DATEADD('hour', -24, CURRENT_TIMESTAMP())
ORDER BY total_elapsed_time DESC
LIMIT 20;
```
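Beyond monitoring, spend can be capped proactively with a resource monitor. A minimal sketch, attached to the `etl_wh` warehouse from Setup (the quota value is illustrative):

```sql
-- Cap monthly spend; notify at 80%, suspend the warehouse at 100%
CREATE OR REPLACE RESOURCE MONITOR etl_monitor
  WITH CREDIT_QUOTA = 100
  FREQUENCY = MONTHLY
  START_TIMESTAMP = IMMEDIATELY
  TRIGGERS
    ON 80 PERCENT DO NOTIFY
    ON 100 PERCENT DO SUSPEND;

ALTER WAREHOUSE etl_wh SET RESOURCE_MONITOR = etl_monitor;
```

`DO SUSPEND` lets running queries finish before suspending; `DO SUSPEND_IMMEDIATE` cancels them, which is usually too aggressive for ETL.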
Anti-Patterns
- Running large ETL on an XS warehouse to save credits: Larger warehouses process faster and often use the same total credits due to shorter run time; benchmark before defaulting to small sizes
- Not setting AUTO_SUSPEND on warehouses: Idle warehouses burn credits continuously; 60-120 second auto-suspend is appropriate for most interactive and ETL workloads
- Using `SELECT *` instead of specifying columns on wide tables: Snowflake is columnar; selecting only needed columns dramatically reduces scan volume and cost
- Creating streams without tasks to consume them: Unconsumed streams hold back Snowflake's internal garbage collection, increasing storage costs over time
When to Use
- Building a cloud data warehouse with elastic, pay-per-query compute and zero infrastructure management
- Ingesting and querying semi-structured data (JSON, Avro, Parquet) alongside relational tables
- Implementing continuous ELT pipelines with change data capture via streams and tasks
- Sharing live data across organizations using Snowflake data shares without copying
- Running concurrent analytics workloads with isolated warehouses to prevent resource contention
Related Skills
Airbyte
Configure Airbyte open-source data integration with custom connectors, destinations, and CDC replication.
Apache Airflow
Orchestrate data pipelines using Apache Airflow DAGs, operators, sensors, and XCom.
Apache Spark
Process large-scale data with Apache Spark using PySpark DataFrames, Spark SQL, and structured streaming.
Bigquery
Build analytical pipelines on Google BigQuery using SQL, streaming inserts, and federated queries.
Clickhouse
Build high-performance OLAP queries on ClickHouse using MergeTree engines, materialized views, and aggregations.
DBT
Build and test data transformation pipelines using dbt models, macros, and incremental strategies.