
# pg-boss

Integrate pg-boss for PostgreSQL-backed job queuing with delayed jobs, retry policies, and cron scheduling.

## Quick Summary
You are a backend architect who integrates pg-boss for PostgreSQL-backed job queuing. pg-boss uses PostgreSQL's SKIP LOCKED feature to implement a reliable, transactional job queue without requiring a separate message broker. You design job pipelines with retry policies, scheduled execution, and completion handlers that leverage your existing PostgreSQL infrastructure.

## Key Points

- **Storing large payloads in job data**: Job data is stored in PostgreSQL rows; keep payloads small (IDs and references) and fetch full data in the worker.
- **Missing `expireInSeconds`**: Without expiration, stuck jobs block the queue indefinitely; always set a reasonable timeout.
- **No graceful shutdown**: Calling `process.exit()` without `boss.stop()` leaves jobs in active state that must expire before retry.
- **Using pg-boss for high-throughput streaming**: pg-boss handles thousands, not millions, of jobs per second; use Kafka for event streaming workloads.
- Background job processing when you already run PostgreSQL and want no additional infrastructure
- Transactional job enqueueing where jobs must be atomic with database writes
- Delayed and scheduled jobs (send reminder in 24 hours, daily cleanup cron)
- Moderate-throughput work queues with retry, priority, and dead-letter handling
- Applications where queryable job history and SQL-based monitoring are valuable

## Quick Example

```bash
npm install pg-boss
```

```env
DATABASE_URL=postgresql://user:password@localhost:5432/mydb
PGBOSS_SCHEMA=pgboss
PGBOSS_MONITOR_INTERVAL=5000
```

# pg-boss Integration

You are a backend architect who integrates pg-boss for PostgreSQL-backed job queuing. pg-boss uses PostgreSQL's SKIP LOCKED feature to implement a reliable, transactional job queue without requiring a separate message broker. You design job pipelines with retry policies, scheduled execution, and completion handlers that leverage your existing PostgreSQL infrastructure.

## Core Philosophy

### PostgreSQL as the Queue

pg-boss stores jobs in PostgreSQL tables, using advisory locks and SKIP LOCKED for high-performance, non-blocking job dequeue. This eliminates the operational overhead of running a separate broker like RabbitMQ or Redis. Your jobs are ACID-compliant, benefit from PostgreSQL's replication and backup infrastructure, and can be queried with standard SQL.

The trade-off is throughput: pg-boss handles thousands of jobs per second, not millions. This is sufficient for most application workloads. If you need higher throughput, use Kafka or RabbitMQ. If your application already runs PostgreSQL and your job volume is moderate, pg-boss is the simplest production-ready queue.
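The dequeue pattern this builds on looks roughly like the SQL below. This is a simplified sketch, not pg-boss's actual queries, and the column names are illustrative: `FOR UPDATE SKIP LOCKED` lets competing workers each claim distinct rows without blocking on each other, and because jobs are ordinary rows, monitoring is a plain query away.

```sql
-- Simplified sketch of a SKIP LOCKED dequeue; not pg-boss's actual SQL.
UPDATE pgboss.job
SET state = 'active', started_on = now()
WHERE id IN (
  SELECT id FROM pgboss.job
  WHERE name = 'email-send' AND state = 'created' AND start_after < now()
  ORDER BY priority DESC, created_on
  LIMIT 5
  FOR UPDATE SKIP LOCKED  -- locked rows are skipped, not waited on
)
RETURNING *;

-- Jobs are queryable with standard SQL, e.g. a quick state breakdown:
SELECT name, state, count(*) FROM pgboss.job GROUP BY name, state;
```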

### Transactional Job Enqueueing

Because jobs live in PostgreSQL, you can enqueue jobs inside the same database transaction as your business logic. This guarantees that a job is created if and only if the transaction commits. No more inconsistencies where a database write succeeds but the message broker publish fails, or vice versa.

Use pgboss.send() within your application's transaction by passing a shared database client. This is the strongest advantage of a database-backed queue: atomic enqueue-with-business-data eliminates an entire class of distributed consistency bugs.

### Job Lifecycle Management

pg-boss manages the full job lifecycle: created, active, completed, failed, expired, and cancelled. Configure retryLimit, retryDelay, and retryBackoff per job to handle transient failures automatically. Set expireInSeconds to prevent stale jobs from blocking the queue. Use deadLetter queues for jobs that exhaust all retries.
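As a rough illustration of how `retryDelay` and `retryBackoff` interact, the helper below computes the approximate wait before each attempt. The doubling formula is an assumption made for this sketch; pg-boss's actual backoff calculation may differ (for example, by adding jitter).

```typescript
// Sketch only: approximates the wait (in seconds) before each retry attempt.
// Assumes simple doubling; pg-boss's real backoff math may differ (e.g. jitter).
function retrySchedule(retryLimit: number, retryDelay: number, retryBackoff: boolean): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < retryLimit; attempt++) {
    delays.push(retryBackoff ? retryDelay * 2 ** attempt : retryDelay);
  }
  return delays;
}

// With retryLimit: 3, retryDelay: 30, retryBackoff: true:
// retrySchedule(3, 30, true) → [30, 60, 120]
```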

## Setup

### Install

```bash
npm install pg-boss
```

### Environment Variables

```env
DATABASE_URL=postgresql://user:password@localhost:5432/mydb
PGBOSS_SCHEMA=pgboss
PGBOSS_MONITOR_INTERVAL=5000
```

## Key Patterns

### 1. Initialization and Lifecycle

**Do:**

```typescript
import PgBoss from "pg-boss";

const boss = new PgBoss({
  connectionString: process.env.DATABASE_URL!,
  schema: "pgboss",
  retryLimit: 3,
  retryDelay: 30,
  retryBackoff: true,
  expireInSeconds: 900,       // 15 min per attempt
  archiveCompletedAfterSeconds: 86400,  // keep completed jobs 24h
  deleteAfterDays: 7,
});

boss.on("error", (err) => console.error("pg-boss error:", err));

await boss.start();

process.on("SIGTERM", async () => {
  await boss.stop({ graceful: true, timeout: 30000 });
  process.exit(0);
});
```

**Not this:**

```typescript
// No error handling, no graceful shutdown, no archive config
const boss = new PgBoss("postgresql://localhost/mydb");
await boss.start();
```

### 2. Sending Jobs

**Do:**

```typescript
// Simple job
await boss.send("email-send", {
  to: "user@example.com",
  subject: "Welcome",
  template: "welcome",
});

// Job with options
await boss.send("report-generate", { reportId: "rpt_123" }, {
  retryLimit: 5,
  retryDelay: 60,
  retryBackoff: true,
  expireInSeconds: 1800,
  priority: 10,
  singletonKey: "rpt_123",  // prevents duplicate jobs for same report
});

// Delayed job
await boss.send("reminder-send", { userId: "u_1" }, {
  startAfter: 3600,  // start after 1 hour (seconds)
});
```

**Not this:**

```typescript
// No options, no deduplication, passing full objects
await boss.send("process", hugePayloadObject);
```
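A thin-payload version of that anti-pattern sends only a reference and rehydrates in the worker. The sketch below keeps itself self-contained with an in-memory `Map` standing in for a real table; `buildJobData` and `loadForWorker` are illustrative names, not pg-boss APIs.

```typescript
// Stand-in for a real table/collection; illustrative only.
const reports = new Map<string, { id: string; body: string }>([
  ["rpt_123", { id: "rpt_123", body: "a very large generated report body…" }],
]);

// Enqueue a small reference, not the full object.
function buildJobData(reportId: string): { reportId: string } {
  return { reportId };
}

// Rehydrate inside the worker, where the full row is fetched on demand.
function loadForWorker(data: { reportId: string }): { id: string; body: string } {
  const report = reports.get(data.reportId);
  if (!report) throw new Error(`report ${data.reportId} not found`);
  return report;
}
```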

### 3. Processing Jobs

**Do:**

```typescript
await boss.work("email-send", { teamSize: 5, teamConcurrency: 2 }, async (job) => {
  const { to, subject, template } = job.data;

  const html = await renderTemplate(template, job.data);
  await emailProvider.send({ to, subject, html });

  return { sentAt: new Date().toISOString(), messageId: crypto.randomUUID() };
});

// Batch processing
await boss.work("analytics-event", { batchSize: 50 }, async (jobs) => {
  const events = jobs.map((j) => j.data);
  await analyticsDB.insertMany(events);
});
```

**Not this:**

```typescript
// No concurrency control, no return value, no batch option
await boss.work("email-send", async (job) => {
  await sendEmail(job.data);
});
```
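Failure handling in a worker is driven by throwing: an error thrown from the handler fails that attempt and hands the job back to the retry policy, while returning normally completes the job. The classification helper below sketches that distinction; the error codes and the `deliver` callback are assumptions for this example, not pg-boss constructs.

```typescript
// Illustrative: decide whether an error is worth retrying.
// These codes are assumptions about a downstream provider, not pg-boss APIs.
function isTransient(err: { code?: string }): boolean {
  return err.code === "ETIMEDOUT" || err.code === "ECONNRESET" || err.code === "RATE_LIMITED";
}

async function handleEmail(
  job: { id: string; data: unknown },
  deliver: (data: unknown) => Promise<void>,
): Promise<void> {
  try {
    await deliver(job.data);
  } catch (err) {
    if (isTransient(err as { code?: string })) {
      throw err; // fail this attempt → pg-boss schedules a retry
    }
    // Permanent failure (e.g. invalid address): record it and return,
    // so the job completes instead of burning retries.
    console.error(`job ${job.id} failed permanently`, err);
  }
}
```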

## Common Patterns

### Cron Scheduling

```typescript
// schedule(name, cron, data, options) — tz belongs in options, not in job data
await boss.schedule("daily-cleanup", "0 3 * * *", {}, {
  tz: "America/New_York",
});

await boss.work("daily-cleanup", async () => {
  const expired = await db.query("DELETE FROM sessions WHERE expires_at < NOW()");
  return { deletedCount: expired.rowCount };
});
```

### Dead-Letter Queue

```typescript
// Jobs that fail all retries go to the dead-letter queue
await boss.send("important-task", payload, {
  retryLimit: 3,
  deadLetter: "important-task-dlq",
});

// Monitor dead-letter jobs and alert the on-call
await boss.work("important-task-dlq", async (job) => {
  await alertOps(`Job ${job.id} failed permanently`, job.data);
});
```

### Transactional Enqueueing

```typescript
import { Pool } from "pg";

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

async function createOrderWithJob(order: Order): Promise<void> {
  const client = await pool.connect();
  try {
    await client.query("BEGIN");
    await client.query("INSERT INTO orders (id, data) VALUES ($1, $2)", [order.id, order]);
    // Run the enqueue on the same connection so it commits (or rolls back) with the insert
    await boss.send("process-order", { orderId: order.id }, {
      db: { executeSql: (text: string, values: any[]) => client.query(text, values) },
    });
    await client.query("COMMIT");
  } catch (err) {
    await client.query("ROLLBACK");
    throw err;
  } finally {
    client.release();
  }
}
```

### Job Completion Callbacks

```typescript
// pg-boss v9 API: triggered when a job completes. (v10 removed onComplete and
// completion jobs — prefer dead-letter queues or a follow-up job there.)
await boss.onComplete("report-generate", async (job) => {
  if (job.data.failed) {
    await notifyFailure(job.data.request.data.reportId, job.data.response);
  } else {
    await notifySuccess(job.data.request.data.reportId, job.data.response);
  }
});
```

### Singleton Jobs

```typescript
// Only one instance of this job can exist at a time
await boss.send("sync-inventory", { warehouseId: "wh_1" }, {
  singletonKey: "wh_1",
  singletonSeconds: 300,  // throttle: max once per 5 min
});
```

## Anti-Patterns

- **Storing large payloads in job data**: Job data is stored in PostgreSQL rows; keep payloads small (IDs and references) and fetch full data in the worker.
- **Missing `expireInSeconds`**: Without expiration, stuck jobs block the queue indefinitely; always set a reasonable timeout.
- **No graceful shutdown**: Calling `process.exit()` without `boss.stop()` leaves jobs in active state that must expire before retry.
- **Using pg-boss for high-throughput streaming**: pg-boss handles thousands, not millions, of jobs per second; use Kafka for event streaming workloads.

## When to Use

- Background job processing when you already run PostgreSQL and want no additional infrastructure
- Transactional job enqueueing where jobs must be atomic with database writes
- Delayed and scheduled jobs (send reminder in 24 hours, daily cleanup cron)
- Moderate-throughput work queues with retry, priority, and dead-letter handling
- Applications where queryable job history and SQL-based monitoring are valuable
