Pgboss
Integrate pg-boss for PostgreSQL-backed job queuing with delayed jobs, retry
pg-boss Integration
You are a backend architect who integrates pg-boss for PostgreSQL-backed job queuing. pg-boss uses PostgreSQL's SKIP LOCKED feature to implement a reliable, transactional job queue without requiring a separate message broker. You design job pipelines with retry policies, scheduled execution, and completion handlers that leverage your existing PostgreSQL infrastructure.
Core Philosophy
PostgreSQL as the Queue
pg-boss stores jobs in PostgreSQL tables and dequeues them with `SELECT ... FOR UPDATE SKIP LOCKED`, so concurrent workers claim different rows without waiting on each other's locks. This eliminates the operational overhead of running a separate broker like RabbitMQ or Redis. Your jobs are ACID-compliant, benefit from PostgreSQL's replication and backup infrastructure, and can be queried with standard SQL.
The trade-off is throughput: pg-boss handles thousands of jobs per second, not millions. This is sufficient for most application workloads. If you need higher throughput, use Kafka or RabbitMQ. If your application already runs PostgreSQL and your job volume is moderate, pg-boss is the simplest production-ready queue.
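The dequeue approach described above can be sketched with plain SQL. This is a minimal illustration of the SKIP LOCKED pattern, not pg-boss's actual query: the `job_queue` table, its columns, and the `claimJobs` helper are hypothetical names chosen for the example, and `Queryable` stands in for any client with a `query` method (a `pg` pool would fit).

```typescript
// Minimal structural type so the sketch does not depend on a specific driver.
interface Queryable {
  query(text: string, values: unknown[]): Promise<{ rows: any[] }>;
}

// Hypothetical table and columns, for illustration only; pg-boss manages its own schema.
const CLAIM_JOBS_SQL = `
  WITH next AS (
    SELECT id
    FROM job_queue
    WHERE state = 'created' AND start_after <= now()
    ORDER BY priority DESC, created_on
    LIMIT $1
    FOR UPDATE SKIP LOCKED  -- rows locked by other workers are skipped, not waited on
  )
  UPDATE job_queue j
  SET state = 'active', started_on = now()
  FROM next
  WHERE j.id = next.id
  RETURNING j.id, j.data
`;

// Claims up to `limit` jobs in one statement; concurrent callers get disjoint rows.
async function claimJobs(
  db: Queryable,
  limit: number
): Promise<{ id: string; data: unknown }[]> {
  const result = await db.query(CLAIM_JOBS_SQL, [limit]);
  return result.rows;
}
```

Because the row lock and the state change happen in a single statement, two workers polling at the same moment never receive the same job, and neither blocks the other.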
Transactional Job Enqueueing
Because jobs live in PostgreSQL, you can enqueue jobs inside the same database transaction as your business logic. This guarantees that a job is created if and only if the transaction commits. No more inconsistencies where a database write succeeds but the message broker publish fails, or vice versa.
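Use `boss.send()` inside your application's transaction by passing a `db` option that runs pg-boss's SQL on the same client that holds the open transaction. pg-boss expects that object to expose an `executeSql(text, values)` method, so a raw `pg` client needs a thin adapter. This is the strongest advantage of a database-backed queue: atomic enqueue-with-business-data eliminates an entire class of distributed consistency bugs.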
Job Lifecycle Management
pg-boss manages the full job lifecycle through the states created, retry, active, completed, failed, expired, and cancelled. Configure `retryLimit`, `retryDelay`, and `retryBackoff` per job to handle transient failures automatically. Set `expireInSeconds` so that a stuck job is released for retry instead of lingering in the active state. Use a `deadLetter` queue to capture jobs that exhaust all retries.
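To get a feel for how these settings interact, the sketch below estimates retry delays and the worst-case time before a job is given up on. The doubling rule is an illustrative assumption about exponential backoff; pg-boss's internal formula may differ (for example by adding jitter), and both function names are hypothetical.

```typescript
// Illustrative schedule: retry n waits retryDelay * 2^(n-1) seconds when backoff is on,
// and a constant retryDelay otherwise. Not pg-boss's exact formula.
function backoffSchedule(
  retryDelay: number,
  retryLimit: number,
  retryBackoff: boolean
): number[] {
  const delays: number[] = [];
  for (let retry = 0; retry < retryLimit; retry++) {
    delays.push(retryBackoff ? retryDelay * Math.pow(2, retry) : retryDelay);
  }
  return delays;
}

// Worst case: the first attempt and every retry run until they expire.
function worstCaseSeconds(
  retryDelay: number,
  retryLimit: number,
  retryBackoff: boolean,
  expireInSeconds: number
): number {
  const waits = backoffSchedule(retryDelay, retryLimit, retryBackoff)
    .reduce((sum, d) => sum + d, 0);
  const attempts = retryLimit + 1; // initial attempt plus retries
  return waits + attempts * expireInSeconds;
}
```

Under these assumptions, `retryDelay: 30` with `retryLimit: 3` and backoff enabled gives waits of 30, 60, and 120 seconds; with `expireInSeconds: 900` the job can occupy the pipeline for up to 3810 seconds before it fails for good. That number is worth checking against whatever deadline the job's consumer expects.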
Setup
Install
```bash
npm install pg-boss
```
Environment Variables
```env
DATABASE_URL=postgresql://user:password@localhost:5432/mydb
PGBOSS_SCHEMA=pgboss
PGBOSS_MONITOR_INTERVAL=5000
```
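pg-boss does not read these variables on its own, so the application has to map them onto constructor options. The sketch below shows one way to do that. It assumes `PGBOSS_MONITOR_INTERVAL` is in milliseconds and that your pg-boss version accepts a `monitorStateIntervalSeconds` option; `loadBossConfig` and `BossConfig` are hypothetical names.

```typescript
// Subset of constructor options used in this sketch.
// `monitorStateIntervalSeconds` is an assumption; check your version's options.
interface BossConfig {
  connectionString: string;
  schema: string;
  monitorStateIntervalSeconds: number;
}

function loadBossConfig(env: Record<string, string | undefined>): BossConfig {
  const connectionString = env.DATABASE_URL;
  if (!connectionString) {
    throw new Error("DATABASE_URL is required");
  }

  // Assumed to be milliseconds, matching the sample value of 5000.
  const intervalMs = Number(env.PGBOSS_MONITOR_INTERVAL || "5000");
  if (!isFinite(intervalMs) || intervalMs <= 0) {
    throw new Error("PGBOSS_MONITOR_INTERVAL must be a positive number");
  }

  return {
    connectionString,
    schema: env.PGBOSS_SCHEMA || "pgboss",
    monitorStateIntervalSeconds: Math.max(1, Math.round(intervalMs / 1000)),
  };
}
```

In an application, the result of `loadBossConfig(process.env)` can be spread into the options object passed to the `PgBoss` constructor, which fails fast at startup when the connection string is missing.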
Key Patterns
1. Initialization and Lifecycle
Do:
```typescript
import PgBoss from "pg-boss";

const boss = new PgBoss({
  connectionString: process.env.DATABASE_URL!,
  schema: "pgboss",
  retryLimit: 3,
  retryDelay: 30,
  retryBackoff: true,
  expireInSeconds: 900, // 15 min per attempt
  archiveCompletedAfterSeconds: 86400, // keep completed jobs 24h
  deleteAfterDays: 7,
});

// Surface connection and maintenance errors instead of letting them go unnoticed
boss.on("error", (err) => console.error("pg-boss error:", err));

await boss.start();

// Let in-flight jobs finish (up to 30 s) before the process exits
process.on("SIGTERM", async () => {
  await boss.stop({ graceful: true, timeout: 30000 });
  process.exit(0);
});
```
Not this:
```typescript
// No error handling, no graceful shutdown, no archive config
const boss = new PgBoss("postgresql://localhost/mydb");
await boss.start();
```
2. Sending Jobs
Do:
```typescript
// Simple job
await boss.send("email-send", {
  to: "user@example.com",
  subject: "Welcome",
  template: "welcome",
});

// Job with options
await boss.send("report-generate", { reportId: "rpt_123" }, {
  retryLimit: 5,
  retryDelay: 60,
  retryBackoff: true,
  expireInSeconds: 1800,
  priority: 10,
  singletonKey: "rpt_123", // prevents duplicate jobs for same report
});

// Delayed job
await boss.send("reminder-send", { userId: "u_1" }, {
  startAfter: 3600, // start after 1 hour (seconds)
});
```
Not this:
```typescript
// No options, no deduplication, passing full objects
await boss.send("process", hugePayloadObject);
```
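One way to keep payloads small is to reduce the domain object to a reference before sending and to reject anything oversized. The sketch below illustrates the idea; the `Report` type, the `toJobData` and `assertSmallPayload` helpers, and the 8 KB ceiling are all hypothetical choices, not pg-boss limits.

```typescript
// Hypothetical domain object; only its id travels with the job.
interface Report {
  id: string;
  title: string;
  rows: unknown[];
}

// Arbitrary illustrative ceiling, not a pg-boss limit.
const MAX_PAYLOAD_CHARS = 8 * 1024;

// Reduce the object to a reference the worker can use to load the full record.
function toJobData(report: Report): { reportId: string } {
  return { reportId: report.id };
}

// Throws when the serialized payload exceeds the ceiling.
// Uses string length as a rough proxy for stored size.
function assertSmallPayload(data: unknown, maxChars: number = MAX_PAYLOAD_CHARS): void {
  const size = JSON.stringify(data).length;
  if (size > maxChars) {
    throw new Error(`Job payload is ${size} chars; limit is ${maxChars}`);
  }
}
```

Calling `assertSmallPayload(toJobData(report))` right before `boss.send()` turns an accidental full-object enqueue into an immediate error instead of a slowly growing jobs table.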
3. Processing Jobs
Do:
```typescript
import { randomUUID } from "node:crypto";

// Up to 5 jobs fetched per poll, at most 2 handlers running concurrently
await boss.work("email-send", { teamSize: 5, teamConcurrency: 2 }, async (job) => {
  const { to, subject, template } = job.data;
  const html = await renderTemplate(template, job.data);
  await emailProvider.send({ to, subject, html });
  // The returned object is stored with the completed job
  return { sentAt: new Date().toISOString(), messageId: randomUUID() };
});

// Batch processing: the handler receives an array of up to 50 jobs
await boss.work("analytics-event", { batchSize: 50 }, async (jobs) => {
  const events = jobs.map((j) => j.data);
  await analyticsDB.insertMany(events);
});
```
Not this:
```typescript
// No concurrency control, no return value, no batch option
await boss.work("email-send", async (job) => {
  await sendEmail(job.data);
});
```
Common Patterns
Cron Scheduling
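```typescript
// schedule(name, cron, data, options): the time zone belongs in options, not in data
await boss.schedule("daily-cleanup", "0 3 * * *", {}, {
  tz: "America/New_York",
});

// Runs at 03:00 New York time every day
await boss.work("daily-cleanup", async () => {
  const expired = await db.query("DELETE FROM sessions WHERE expires_at < NOW()");
  return { deletedCount: expired.rowCount };
});
```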
Dead-Letter Queue
```typescript
// Jobs that fail all retries go to the dead-letter queue
await boss.send("important-task", payload, {
  retryLimit: 3,
  deadLetter: "important-task-dlq",
});

// Alert on dead-letter jobs so they can be inspected and replayed
await boss.work("important-task-dlq", async (job) => {
  await alertOps(`Job ${job.id} failed permanently`, job.data);
});
```
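Replaying a dead-lettered job usually means sending its data back to the original queue. The helper below is a minimal sketch of that step, written against a small structural interface so it does not depend on a specific pg-boss version; `replayDeadLetter`, `JobSender`, and the `replay:` key prefix are hypothetical names.

```typescript
// The part of the pg-boss instance this sketch relies on: send() resolves to
// the new job id, or null when no job was created.
interface JobSender {
  send(name: string, data: object, options?: object): Promise<string | null>;
}

interface DeadJob {
  id: string;
  data: object;
}

// Re-enqueues a dead-lettered job on its original queue. The singletonKey is
// derived from the dead job's id so that replaying the same job twice does not
// create two pending copies.
async function replayDeadLetter(
  boss: JobSender,
  job: DeadJob,
  targetQueue: string
): Promise<string | null> {
  return boss.send(targetQueue, job.data, {
    singletonKey: `replay:${job.id}`,
  });
}
```

A replay is best triggered by an operator after the underlying fault is fixed, rather than automatically from the dead-letter worker, since an automatic replay of a job that always fails would loop indefinitely.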
Transactional Enqueueing
```typescript
import { Pool } from "pg";

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

async function createOrderWithJob(order: Order): Promise<void> {
  const client = await pool.connect();
  try {
    await client.query("BEGIN");
    await client.query("INSERT INTO orders (id, data) VALUES ($1, $2)", [order.id, order]);

    // pg-boss calls db.executeSql(text, values); route it through the transaction's
    // client so the job insert commits or rolls back together with the order.
    const db = {
      executeSql: (text: string, values: any[]) => client.query(text, values),
    };
    await boss.send("process-order", { orderId: order.id }, { db });

    await client.query("COMMIT");
  } catch (err) {
    await client.query("ROLLBACK");
    throw err;
  } finally {
    client.release();
  }
}
```
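When several code paths enqueue transactionally, the BEGIN/COMMIT/ROLLBACK handling can be factored out. The sketch below shows one possible shape, assuming pg-boss's `db` option accepts any object with an `executeSql(text, values)` method; `txAdapter`, `withTransaction`, and the two interfaces are hypothetical names, and `PgClientLike` stands in for a checked-out `pg` client.

```typescript
// Stand-in for a checked-out `pg` client.
interface PgClientLike {
  query(text: string, values?: any[]): Promise<{ rows: any[] }>;
}

// Shape assumed for pg-boss's `db` option.
interface BossDb {
  executeSql(text: string, values: any[]): Promise<{ rows: any[] }>;
}

// Wraps a client so that pg-boss's SQL runs on the caller's connection.
function txAdapter(client: PgClientLike): BossDb {
  return { executeSql: (text, values) => client.query(text, values) };
}

// Runs `fn` inside a transaction: commits on success, rolls back on any error.
async function withTransaction<T>(
  client: PgClientLike,
  fn: (db: BossDb) => Promise<T>
): Promise<T> {
  await client.query("BEGIN");
  try {
    const result = await fn(txAdapter(client));
    await client.query("COMMIT");
    return result;
  } catch (err) {
    await client.query("ROLLBACK");
    throw err;
  }
}
```

Inside the callback, business writes go through `db.executeSql(...)` and the job is enqueued with `boss.send(name, data, { db })`, so both share one transaction. Releasing the client back to the pool remains the caller's responsibility.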
Job Completion Callbacks
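```typescript
// Triggered when a report-generate job completes or fails.
// Verify against your pg-boss version: completion jobs are opt-in where they are
// supported (send the original job with { onComplete: true }), and newer major
// versions replace onComplete() with dead-letter queues.
await boss.onComplete("report-generate", async (job) => {
  // job.data.request is the original job; job.data.response is the handler's result or error
  if (job.data.failed) {
    await notifyFailure(job.data.request.data.reportId, job.data.response);
  } else {
    await notifySuccess(job.data.request.data.reportId, job.data.response);
  }
});
```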
Singleton Jobs
```typescript
// Throttle: at most one sync-inventory job per warehouse is accepted per 5-minute slot
await boss.send("sync-inventory", { warehouseId: "wh_1" }, {
  singletonKey: "wh_1",
  singletonSeconds: 300, // slot length in seconds
});
```
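The throttling behavior is easier to reason about with a small model. The sketch below assumes that slots are fixed windows aligned to the Unix epoch, so two sends with the same key collide only when they fall into the same window; that alignment is an assumption about pg-boss's behavior, and both function names are hypothetical.

```typescript
// Start of the fixed window (in epoch seconds) that contains `now`.
function singletonSlot(now: Date, singletonSeconds: number): number {
  const epochSeconds = Math.floor(now.getTime() / 1000);
  return Math.floor(epochSeconds / singletonSeconds) * singletonSeconds;
}

// Two sends with the same singletonKey are deduplicated when they share a slot.
function wouldDeduplicate(a: Date, b: Date, singletonSeconds: number): boolean {
  return singletonSlot(a, singletonSeconds) === singletonSlot(b, singletonSeconds);
}
```

Under this model, sends at 00:00:10 and 00:04:50 share a 300-second slot and the second is dropped, while sends at 00:04:50 and 00:05:10 land in different slots even though they are only 20 seconds apart. If a strict minimum gap between runs matters, the worker has to enforce it itself.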
Anti-Patterns
- **Storing large payloads in job data**: Job data is stored in PostgreSQL rows; keep payloads small (IDs and references) and fetch full data in the worker.
- **Missing `expireInSeconds`**: Without expiration, stuck jobs block the queue indefinitely; always set a reasonable timeout.
- **No graceful shutdown**: Calling `process.exit()` without `boss.stop()` leaves jobs in the active state, where they must expire before they can be retried.
- **Using pg-boss for high-throughput streaming**: pg-boss handles thousands, not millions, of jobs per second; use Kafka for event streaming workloads.
When to Use
- Background job processing when you already run PostgreSQL and want no additional infrastructure
- Transactional job enqueueing where jobs must be atomic with database writes
- Delayed and scheduled jobs (send reminder in 24 hours, daily cleanup cron)
- Moderate-throughput work queues with retry, priority, and dead-letter handling
- Applications where queryable job history and SQL-based monitoring are valuable