
Workers Patterns

Production patterns for Cloudflare Workers including queue consumers, cron triggers, email workers, browser rendering, Hyperdrive database connection pooling, Vectorize vector search, and the analytics engine.

Quick Summary
You are an expert in advanced Cloudflare Workers production patterns, including queues, scheduled tasks, email handling, browser rendering, database connection pooling with Hyperdrive, vector search with Vectorize, and analytics.

## Key Points

- **Connection pooling**: Reuses database connections across Worker invocations.
- **Query caching**: Caches repeated read queries at the edge.
- **No cold-start penalty**: Connections are pre-established; no TCP/TLS handshake per request.
- **Works with any PostgreSQL-compatible database**: Neon, Supabase, AWS RDS, etc.
- **Queue messages not delivered** — Check that the consumer Worker is deployed and the queue name matches in both producer and consumer configs.
- **Cron not firing** — Verify the cron expression in `wrangler.toml`. Use `wrangler tail` to see if the scheduled handler is invoked.
- **Hyperdrive connection errors** — Ensure the database allows connections from Cloudflare's IP ranges. Check that the connection string is correct.
- **Vectorize query returns no results** — Confirm vectors have been upserted with the same dimensions as the index. Check that the embedding model matches.
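- **Analytics Engine writes lost**: `writeDataPoint()` returns immediately and the runtime sends the data in the background, so it needs no `await` or `ctx.waitUntil()`. Check that the binding and dataset names match `wrangler.toml`, and account for sampling when querying.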

## Quick Example

```toml
# wrangler.toml — no special config needed
# Configure email routing in the Cloudflare dashboard to point to your Worker
```

```toml
# wrangler.toml
browser = { binding = "BROWSER" }
```

Workers Patterns — Cloudflare Workers

Queues

Cloudflare Queues provide reliable, at-least-once message delivery between Workers.
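
At-least-once delivery means a consumer may see the same message more than once, so handlers should be idempotent. The sketch below shows one way to guard a handler with a dedupe store keyed by message id. `QueueMessageLike`, `SeenStore`, `MemorySeenStore`, and `handleOnce` are hypothetical names for illustration, and the sketch assumes the message id stays the same across redeliveries. In a real Worker the store would need to be durable (for example KV, D1, or a Durable Object) rather than in memory.

```typescript
// Minimal stand-in for the parts of a queue message this sketch uses (hypothetical type).
interface QueueMessageLike<T> {
  id: string;
  body: T;
  ack(): void;
  retry(): void;
}

// Hypothetical dedupe store; an in-memory version is used here for illustration only.
interface SeenStore {
  has(id: string): Promise<boolean>;
  add(id: string): Promise<void>;
}

class MemorySeenStore implements SeenStore {
  private ids = new Set<string>();
  async has(id: string): Promise<boolean> {
    return this.ids.has(id);
  }
  async add(id: string): Promise<void> {
    this.ids.add(id);
  }
}

// Runs the handler at most once per message id; repeat deliveries are acknowledged and skipped.
async function handleOnce<T>(
  message: QueueMessageLike<T>,
  seen: SeenStore,
  handler: (body: T) => Promise<void>,
): Promise<"processed" | "duplicate" | "retried"> {
  if (await seen.has(message.id)) {
    message.ack(); // already handled on an earlier delivery
    return "duplicate";
  }
  try {
    await handler(message.body);
    await seen.add(message.id);
    message.ack();
    return "processed";
  } catch {
    message.retry();
    return "retried";
  }
}
```

If the producer itself can enqueue the same work twice, the message id will differ, so a business key taken from the body (such as `imageId`) may be a better dedupe key.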

Setup

# wrangler.toml

# Producer: send messages to the queue
[[queues.producers]]
binding = "MY_QUEUE"
queue = "my-queue"

# Consumer: process messages from the queue
[[queues.consumers]]
queue = "my-queue"
max_batch_size = 10
max_batch_timeout = 30
max_retries = 3
dead_letter_queue = "my-dlq"

Producer — sending messages

export interface Env {
  MY_QUEUE: Queue;
}

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
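    // Type the parsed body so the field accesses below type-check
    const body = await request.json<{ imageId: string; userId: string }>();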

    // Send a single message
    await env.MY_QUEUE.send({
      type: "process-image",
      imageId: body.imageId,
      userId: body.userId,
    });

    // Send a batch of messages
    await env.MY_QUEUE.sendBatch([
      { body: { type: "send-email", to: "alice@example.com" } },
      { body: { type: "send-email", to: "bob@example.com" } },
    ]);

    // Send with a delay
    await env.MY_QUEUE.send(
      { type: "reminder", userId: "123" },
      { delaySeconds: 3600 } // Deliver after 1 hour
    );

    return Response.json({ queued: true });
  },
};

Consumer — processing messages

export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const message of batch.messages) {
      try {
        const { type } = message.body as { type: string };

        switch (type) {
          case "process-image":
            await processImage(message.body, env);
            break;
          case "send-email":
            await sendEmail(message.body, env);
            break;
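          default:
            // Unknown types are logged and acknowledged so they are not retried
            console.error(`Unknown message type: ${type}`);
        }

        // Acknowledge successful processing
        message.ack();
      } catch (err) {
        // Log the failure; the message is retried up to max_retries, then sent to the dead letter queue
        console.error(`Message ${message.id} failed:`, err);
        message.retry();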
      }
    }
  },
};
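
Retrying immediately can hammer a failing downstream service. A minimal sketch of an exponential backoff delay follows. `backoffSeconds` and its default values are hypothetical, and the usage comment assumes the Queues message exposes an `attempts` counter starting at 1 and that `retry()` accepts a `delaySeconds` option.

```typescript
// Delay in seconds before the next retry: doubles with each attempt, capped at maxSeconds.
function backoffSeconds(attempts: number, baseSeconds = 10, maxSeconds = 3600): number {
  const exponent = Math.max(0, attempts - 1);
  return Math.min(maxSeconds, baseSeconds * 2 ** exponent);
}

// Inside the consumer's catch block this could be used as:
//   message.retry({ delaySeconds: backoffSeconds(message.attempts) });
```

With the defaults shown, the first three retries would wait 10, 20, and 40 seconds.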

Dead letter queue consumer

export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    // These messages failed max_retries times
    for (const message of batch.messages) {
      console.error("Dead letter:", JSON.stringify(message.body));
      // Store in D1 for manual investigation
      await env.DB.prepare(
        "INSERT INTO dead_letters (payload, failed_at) VALUES (?, datetime('now'))"
      ).bind(JSON.stringify(message.body)).run();
      message.ack();
    }
  },
};

Cron Triggers (Scheduled Workers)

Run Workers on a schedule without any HTTP request.

Setup

# wrangler.toml
[triggers]
crons = [
  "0 * * * *",      # Every hour
  "0 0 * * *",      # Every day at midnight UTC
  "*/5 * * * *",    # Every 5 minutes
  "0 9 * * MON",    # Every Monday at 9 AM UTC
]

Handler

export default {
  async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext): Promise<void> {
    const { cron, scheduledTime } = event;

    switch (cron) {
      case "0 * * * *":
        // Hourly health check
        await performHealthCheck(env);
        break;

      case "0 0 * * *":
        // Daily cleanup
        ctx.waitUntil(cleanupExpiredSessions(env));
        ctx.waitUntil(aggregateDailyStats(env));
        break;

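      case "*/5 * * * *":
        // Poll external API every 5 minutes
        await syncExternalData(env);
        break;

      default:
        // Configured crons without a case (e.g. "0 9 * * MON") end up here
        console.warn(`No handler for cron "${cron}" (scheduled at ${scheduledTime})`);
    }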
  },

  // Also handles HTTP requests
  async fetch(request: Request, env: Env): Promise<Response> {
    return new Response("Worker is running");
  },
};

async function cleanupExpiredSessions(env: Env) {
  await env.DB.prepare(
    "DELETE FROM sessions WHERE expires_at < datetime('now')"
  ).run();
}

async function aggregateDailyStats(env: Env) {
  const yesterday = new Date(Date.now() - 86400000).toISOString().split("T")[0];
  await env.DB.prepare(
    `INSERT INTO daily_stats (date, total_requests, unique_users)
     SELECT ?, COUNT(*), COUNT(DISTINCT user_id)
     FROM request_log WHERE date(created_at) = ?`
  ).bind(yesterday, yesterday).run();
}
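
As the number of schedules grows, a lookup table can be easier to maintain than a `switch`. The sketch below maps each cron expression to a job and reports whether one was found. `Job` and `createCronDispatcher` are hypothetical names, not part of the Workers API.

```typescript
// A job receives the Worker's env and does its work.
type Job<E> = (env: E) => Promise<void>;

// Builds a dispatcher that runs the job registered for a cron expression.
// Resolves to false when no job is registered, so the caller can log it.
function createCronDispatcher<E>(jobs: Map<string, Job<E>>) {
  return async (cron: string, env: E): Promise<boolean> => {
    const job = jobs.get(cron);
    if (!job) return false;
    await job(env);
    return true;
  };
}

// Possible use inside the scheduled handler:
//   const handled = await dispatch(event.cron, env);
//   if (!handled) console.warn(`No job for cron "${event.cron}"`);
```

The keys must match the strings in `wrangler.toml` exactly, since `event.cron` carries the expression that fired.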

Email Workers

Process inbound emails directly with a Worker.

Setup

# wrangler.toml — no special config needed
# Configure email routing in the Cloudflare dashboard to point to your Worker

Handler

export default {
  async email(message: ForwardableEmailMessage, env: Env): Promise<void> {
    const { from, to } = message;
    const subject = message.headers.get("subject") || "(no subject)";

    console.log(`Email from ${from} to ${to}: ${subject}`);

    // Read the raw email body
    const rawEmail = await new Response(message.raw).text();

    // Store in D1
    await env.DB.prepare(
      "INSERT INTO emails (sender, recipient, subject, body, received_at) VALUES (?, ?, ?, ?, datetime('now'))"
    ).bind(from, to, subject, rawEmail).run();

    // Forward to another address
    await message.forward("admin@example.com");

    // Or reject the message instead of forwarding it (the sender receives a bounce)
    // message.setReject("Mailbox full");
  },
};
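
The choice between forwarding and rejecting can be kept in a small pure function that is easy to test. The sketch below decides by sender domain. `EmailAction`, `routeEmail`, and the allowlist parameter are hypothetical and only illustrate the idea.

```typescript
// The outcome the email handler should apply to the incoming message.
type EmailAction =
  | { kind: "forward"; to: string }
  | { kind: "reject"; reason: string };

// Forwards mail from allowed sender domains and rejects everything else.
function routeEmail(from: string, allowedDomains: string[], forwardTo: string): EmailAction {
  // Everything after the last "@" is treated as the domain.
  const domain = from.slice(from.lastIndexOf("@") + 1).toLowerCase();
  if (allowedDomains.includes(domain)) {
    return { kind: "forward", to: forwardTo };
  }
  return { kind: "reject", reason: `Sender domain ${domain} is not allowed` };
}

// Possible use inside the email handler:
//   const action = routeEmail(message.from, ["example.com"], "admin@example.com");
//   if (action.kind === "forward") await message.forward(action.to);
//   else message.setReject(action.reason);
```

Note that Email Routing only forwards to destination addresses that have been verified in the dashboard.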

Browser Rendering

Render web pages, generate PDFs, and take screenshots using a headless Chromium browser.

Setup

# wrangler.toml
browser = { binding = "BROWSER" }

Take a screenshot

import puppeteer from "@cloudflare/puppeteer";

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url).searchParams.get("url");
    if (!url) return new Response("Missing url param", { status: 400 });

    const browser = await puppeteer.launch(env.BROWSER);
    try {
      const page = await browser.newPage();
      await page.setViewport({ width: 1280, height: 720 });
      await page.goto(url, { waitUntil: "networkidle0" });

      const screenshot = await page.screenshot({ type: "png" });
      return new Response(screenshot, {
        headers: { "content-type": "image/png" },
      });
    } finally {
      // Always release the browser session, even if navigation fails
      await browser.close();
    }
  },
};
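
Passing a caller-supplied URL straight to the browser lets anyone render arbitrary targets. A minimal sketch of validating the `url` parameter first is shown below. `parseRenderTarget` and the host allowlist are hypothetical, and a real deployment may need stricter rules.

```typescript
// Returns a parsed URL only if it is HTTPS and its host is on the allowlist; otherwise null.
function parseRenderTarget(raw: string | null, allowedHosts: string[]): URL | null {
  if (!raw) return null;
  let target: URL;
  try {
    target = new URL(raw);
  } catch {
    return null; // not a parseable absolute URL
  }
  if (target.protocol !== "https:") return null;
  if (!allowedHosts.includes(target.hostname)) return null;
  return target;
}

// Possible use in the fetch handler before launching the browser:
//   const target = parseRenderTarget(searchParams.get("url"), ["example.com"]);
//   if (!target) return new Response("URL not allowed", { status: 400 });
```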

Generate a PDF

import puppeteer from "@cloudflare/puppeteer";

async function generatePDF(env: Env, html: string): Promise<Uint8Array> {
  const browser = await puppeteer.launch(env.BROWSER);
  try {
    const page = await browser.newPage();
    await page.setContent(html, { waitUntil: "networkidle0" });

    // page.pdf() resolves to the PDF bytes
    return await page.pdf({
      format: "A4",
      margin: { top: "1cm", right: "1cm", bottom: "1cm", left: "1cm" },
      printBackground: true,
    });
  } finally {
    // Always release the browser session, even if rendering fails
    await browser.close();
  }
}

Hyperdrive — Database Connection Pooling

Hyperdrive provides connection pooling and query caching for PostgreSQL and MySQL databases, so a Worker does not pay the full connection setup cost on every request.

Setup

# Create a Hyperdrive config
npx wrangler hyperdrive create my-database \
  --connection-string="postgres://user:password@host:5432/dbname"

# wrangler.toml
[[hyperdrive]]
binding = "HYPERDRIVE"
id = "xxxx-yyyy-zzzz"

Usage with pg driver

import { Client } from "pg";

export interface Env {
  HYPERDRIVE: Hyperdrive;
}

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    // Hyperdrive provides a connection string with pooling built in
    const client = new Client({ connectionString: env.HYPERDRIVE.connectionString });
    await client.connect();

    try {
      const result = await client.query("SELECT * FROM users WHERE active = $1 LIMIT 50", [true]);
      return Response.json(result.rows);
    } finally {
      await client.end();
    }
  },
};

Hyperdrive benefits

  • Connection pooling: Reuses database connections across Worker invocations.
  • Query caching: Caches repeated read queries at the edge.
  • No cold-start penalty: Connections are pre-established; no TCP/TLS handshake per request.
  • Works with any PostgreSQL-compatible database: Neon, Supabase, AWS RDS, etc.
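
Because pooling happens on Hyperdrive's side, each request can open a short-lived client and close it when done. The sketch below wraps that connect, work, end sequence so the client is closed even when the query throws. `ClientLike` and `withClient` are hypothetical names. The interface only assumes the `connect()` and `end()` methods used in the `pg` example above.

```typescript
// The subset of a database client this helper relies on.
interface ClientLike {
  connect(): Promise<void>;
  end(): Promise<void>;
}

// Connects, runs the work, and always closes the client afterwards.
async function withClient<C extends ClientLike, T>(
  client: C,
  work: (client: C) => Promise<T>,
): Promise<T> {
  await client.connect();
  try {
    return await work(client);
  } finally {
    await client.end();
  }
}

// Possible use with the pg driver:
//   const rows = await withClient(
//     new Client({ connectionString: env.HYPERDRIVE.connectionString }),
//     async (c) => (await c.query("SELECT 1")).rows,
//   );
```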

Vectorize — Vector Search

Vectorize is Cloudflare's native vector database for similarity search, purpose-built for RAG and recommendation systems.

Setup

# Create a Vectorize index
npx wrangler vectorize create my-index --dimensions=768 --metric=cosine

# wrangler.toml
[[vectorize]]
binding = "VECTORIZE"
index_name = "my-index"
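
The `--metric=cosine` option ranks vectors by the angle between them rather than their length. As a rough illustration of what that metric computes (not Vectorize's internal implementation), here is a plain cosine similarity function. `cosineSimilarity` is a hypothetical helper.

```typescript
// Cosine similarity: 1 for the same direction, 0 for orthogonal, -1 for opposite vectors.
function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length) {
    throw new Error(`Dimension mismatch: ${a.length} vs ${b.length}`);
  }
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  // A zero vector has no direction; treat it as unrelated.
  if (normA === 0 || normB === 0) return 0;
  return dot / Math.sqrt(normA * normB);
}
```

The dimension check mirrors a common failure mode: vectors must have the same number of dimensions as the index (768 here).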

Index documents

export interface Env {
  AI: Ai;
  VECTORIZE: VectorizeIndex;
  KV: KVNamespace;
}

async function indexDocument(env: Env, id: string, text: string, metadata: Record<string, string>) {
  // Generate embedding
  const embeddingResult = await env.AI.run("@cf/baai/bge-base-en-v1.5", {
    text: [text],
  });

  // Store the vector
  await env.VECTORIZE.upsert([
    {
      id,
      values: embeddingResult.data[0],
      metadata,
    },
  ]);

  // Store the full text in KV (Vectorize stores metadata, not full documents)
  await env.KV.put(`doc:${id}`, text);
}

Query for similar documents

async function search(env: Env, query: string, topK: number = 10) {
  // Embed the query
  const embeddingResult = await env.AI.run("@cf/baai/bge-base-en-v1.5", {
    text: [query],
  });

  // Search Vectorize
  const results = await env.VECTORIZE.query(embeddingResult.data[0], {
    topK,
    returnMetadata: "all",
  });

  // Retrieve full documents
  const documents = await Promise.all(
    results.matches.map(async (match) => ({
      id: match.id,
      score: match.score,
      metadata: match.metadata,
      text: await env.KV.get(`doc:${match.id}`),
    }))
  );

  return documents;
}
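
`topK` always returns the closest matches, even when none of them are actually relevant. One option is to drop weak matches before fetching documents, as sketched below. `ScoredMatch`, `keepRelevant`, and the threshold are hypothetical, and the sketch assumes a metric where a higher score means a closer match, as with cosine.

```typescript
// The fields of a match this sketch depends on.
interface ScoredMatch {
  id: string;
  score: number;
}

// Keeps matches at or above minScore, best first. Does not mutate the input array.
function keepRelevant<M extends ScoredMatch>(matches: M[], minScore: number): M[] {
  return matches
    .filter((match) => match.score >= minScore)
    .sort((x, y) => y.score - x.score);
}

// Possible use in search(): keepRelevant(results.matches, 0.7)
```

A suitable threshold depends on the embedding model and the data, so it is best tuned against real queries.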

Filtering

const results = await env.VECTORIZE.query(queryVector, {
  topK: 10,
  filter: {
    category: "technology",
    // Supports: $eq, $ne, $in, $nin for strings
    // Supports: $eq, $ne, $lt, $lte, $gt, $gte for numbers
  },
});

Analytics Engine

Write high-cardinality, time-series analytics data without worrying about scale.

Setup

# wrangler.toml
[[analytics_engine_datasets]]
binding = "ANALYTICS"
dataset = "my_analytics"

Write events

export default {
  async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
    const url = new URL(request.url);

    // Write an analytics event. writeDataPoint() returns immediately and the runtime
    // sends the data in the background, so no await or ctx.waitUntil() is needed.
    env.ANALYTICS.writeDataPoint({
      blobs: [
        request.method,                          // blob1
        url.pathname,                            // blob2
        request.headers.get("user-agent") || "", // blob3
        request.cf?.country || "unknown",        // blob4
      ],
      doubles: [
        Date.now(), // double1: request time in ms since epoch
      ],
      indexes: [
        url.pathname, // Sampling key; one index per data point
      ],
    });

    return handleRequest(request, env);
  },
};
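
The position of each value in `blobs` and `doubles` determines the column name used when querying (`blob1`, `blob2`, and so on), so it helps to build data points in one place. A minimal sketch follows. `RequestSummary`, `AnalyticsDataPoint`, `buildDataPoint`, and the `durationMs` field are hypothetical.

```typescript
// The request fields recorded per event.
interface RequestSummary {
  method: string;
  pathname: string;
  userAgent: string | null;
  country: string | null;
}

// Shape of the object passed to writeDataPoint() in the example above.
interface AnalyticsDataPoint {
  blobs: string[];
  doubles: number[];
  indexes: string[];
}

// Builds a data point with a fixed column order:
// blob1 = method, blob2 = path, blob3 = user agent, blob4 = country, double1 = duration.
function buildDataPoint(req: RequestSummary, durationMs: number): AnalyticsDataPoint {
  return {
    blobs: [req.method, req.pathname, req.userAgent ?? "", req.country ?? "unknown"],
    doubles: [durationMs],
    indexes: [req.pathname],
  };
}

// Possible use: env.ANALYTICS.writeDataPoint(buildDataPoint(summary, elapsedMs));
```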

Query with SQL API

Query analytics data via the Cloudflare API using SQL:

curl -X POST "https://api.cloudflare.com/client/v4/accounts/{account_id}/analytics_engine/sql" \
  -H "Authorization: Bearer {api_token}" \
  -d "SELECT
        blob1 AS method,
        blob2 AS path,
        blob4 AS country,
        COUNT() AS requests
      FROM my_analytics
      WHERE timestamp > NOW() - INTERVAL '24' HOUR
      GROUP BY method, path, country
      ORDER BY requests DESC
      LIMIT 100"
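
The same request can be issued from TypeScript. The sketch below only builds the URL and request options for the endpoint shown in the `curl` command. `SqlApiRequest` and `buildSqlRequest` are hypothetical names, and the account id and token are supplied by the caller.

```typescript
// URL and fetch options for one Analytics Engine SQL API call.
interface SqlApiRequest {
  url: string;
  init: {
    method: "POST";
    headers: Record<string, string>;
    body: string;
  };
}

// Builds the request for the SQL endpoint; the SQL text is sent as the raw body.
function buildSqlRequest(accountId: string, apiToken: string, sql: string): SqlApiRequest {
  const account = encodeURIComponent(accountId);
  return {
    url: `https://api.cloudflare.com/client/v4/accounts/${account}/analytics_engine/sql`,
    init: {
      method: "POST",
      headers: { Authorization: `Bearer ${apiToken}` },
      body: sql,
    },
  };
}

// Possible use:
//   const { url, init } = buildSqlRequest(accountId, apiToken, "SELECT COUNT() FROM my_analytics");
//   const response = await fetch(url, init);
```

Keeping the API token in a Worker secret or environment variable avoids hard-coding it in source.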

Combining Patterns — Full Production Example

export default {
  // HTTP requests
  async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
    // Track analytics
    ctx.waitUntil(trackRequest(env, request));

    // Route requests
    const url = new URL(request.url);
    if (url.pathname.startsWith("/api/")) {
      return handleAPI(request, env);
    }
    return new Response("Not Found", { status: 404 });
  },

  // Cron triggers
  async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext): Promise<void> {
    ctx.waitUntil(cleanupOldData(env));
  },

  // Queue consumers
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const msg of batch.messages) {
      try {
        await processMessage(msg.body, env);
        msg.ack();
      } catch {
        msg.retry();
      }
    }
  },

  // Email handler
  async email(message: ForwardableEmailMessage, env: Env): Promise<void> {
    await storeEmail(message, env);
  },
};

Troubleshooting

  • Queue messages not delivered — Check that the consumer Worker is deployed and the queue name matches in both producer and consumer configs.
  • Cron not firing — Verify the cron expression in wrangler.toml. Use wrangler tail to see if the scheduled handler is invoked.
  • Hyperdrive connection errors — Ensure the database allows connections from Cloudflare's IP ranges. Check that the connection string is correct.
  • Vectorize query returns no results — Confirm vectors have been upserted with the same dimensions as the index. Check that the embedding model matches.
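  • Analytics Engine writes lost: writeDataPoint() returns immediately and the runtime sends the data in the background, so it needs no await or ctx.waitUntil(). Check that the binding and dataset names match wrangler.toml, and account for sampling when querying.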
