
Pipedream

Build serverless event-driven workflows with Pipedream using triggers, Node.js/Python steps, and data stores.


Pipedream Serverless Workflows

You are an expert in Pipedream, building event-driven serverless workflows with triggers, code steps, pre-built actions, and data stores. You design workflows that leverage Pipedream's managed auth, concurrency controls, and TypeScript-native step architecture.

Core Philosophy

Steps as Pure Functions

Each Pipedream step should be a focused function: receive input via the steps object and the $ context, process data, and export results. Avoid side effects between steps. Use $.export() to pass data downstream explicitly.

Managed Auth Over DIY Tokens

Pipedream manages OAuth tokens, API keys, and refresh cycles for 2,000+ apps. Always use this.appName.$auth for credentials instead of managing tokens in environment variables. This eliminates token expiry bugs.

Event-First Architecture

Every Pipedream workflow starts with an event source. Design around the event shape, not the action you want to take. Decouple ingestion (trigger) from processing (steps) to enable reuse and debugging via event inspection.

Setup

# Install Pipedream CLI
npm install -g @pipedream/cli

# Authenticate
pd login

# Deploy a workflow from code
pd deploy my-workflow.yaml

# List running sources
pd list sources

// Pipedream component structure
import { defineComponent } from "@pipedream/types";

export default defineComponent({
  props: {
    slack: { type: "app", app: "slack" },
  },
  async run({ steps, $ }) {
    // Access managed auth
    const token = this.slack.$auth.oauth_access_token;
    // Process and export
    $.export("result", { status: "ok" });
  },
});

Key Patterns

Export Data Between Steps

// Do: Use $.export() to pass data to downstream steps
export default defineComponent({
  async run({ steps, $ }) {
    const response = await fetch("https://api.example.com/users");
    const users = await response.json();
    $.export("users", users);
    $.export("count", users.length);
    // Next step accesses: steps.this_step_name.users
  },
});

// Don't: Use global variables or external storage to pass data between steps

Use Data Stores for Persistent State

// Do: Use Pipedream data stores for deduplication and state
import { defineComponent } from "@pipedream/types";

export default defineComponent({
  props: {
    dataStore: { type: "data_store" },
  },
  async run({ steps, $ }) {
    const eventId = steps.trigger.event.id;

    // Check if already processed
    const processed = await this.dataStore.get(eventId);
    if (processed) {
      return $.flow.exit("Already processed");
    }

    // Process event...
    await this.dataStore.set(eventId, { processedAt: new Date().toISOString() });
  },
});

// Don't: Rely on workflow memory between executions (it resets)

Validate Incoming Webhooks

// Do: Validate webhook signatures in the first step
import crypto from "crypto";

export default defineComponent({
  async run({ steps, $ }) {
    const signature = steps.trigger.event.headers["x-webhook-signature"];
    const payload = JSON.stringify(steps.trigger.event.body);
    const expected = crypto
      .createHmac("sha256", process.env.WEBHOOK_SECRET)
      .update(payload)
      .digest("hex");

    if (signature !== expected) {
      return $.flow.exit("Invalid signature");
    }
    $.export("payload", steps.trigger.event.body);
  },
});
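A plain `!==` on HMAC digests can leak timing information to an attacker probing byte by byte. Node's crypto.timingSafeEqual compares in constant time; a minimal helper (a generic sketch, not a Pipedream API, assuming hex-encoded SHA-256 signatures):

```typescript
import crypto from "crypto";

// Constant-time check of a received signature against the expected HMAC.
// Adjust the algorithm/encoding to match what your webhook provider sends.
function verifySignature(payload: string, secret: string, signature: string): boolean {
  const expected = crypto.createHmac("sha256", secret).update(payload).digest("hex");
  const a = Buffer.from(expected, "hex");
  const b = Buffer.from(signature, "hex");
  // timingSafeEqual throws on length mismatch (e.g. a malformed signature),
  // so compare lengths first and treat a mismatch as invalid.
  return a.length === b.length && crypto.timingSafeEqual(a, b);
}
```

The length check doubles as a guard against non-hex input, which Buffer.from truncates silently.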

Common Patterns

HTTP Webhook to Slack Notification

// Step 1: HTTP trigger (automatic)
// Endpoint: https://your-endpoint.m.pipedream.net

// Step 2: Transform and validate
export default defineComponent({
  async run({ steps, $ }) {
    const { event_type, user, message } = steps.trigger.event.body;
    if (!event_type || !message) {
      return $.flow.exit("Missing required fields");
    }
    $.export("notification", {
      text: `*${event_type}*: ${message}`,
      user: user ?? "system",
    });
  },
});

// Step 3: Post to Slack (using built-in Slack action)
// Channel: #alerts
// Text: {{ steps.transform.notification.text }}

Cron Job with Pagination and Data Store

// Trigger: Cron schedule - every 30 minutes
// Step: Fetch and deduplicate new records

import { defineComponent } from "@pipedream/types";

export default defineComponent({
  props: {
    dataStore: { type: "data_store" },
  },
  async run({ steps, $ }) {
    const lastCursor = (await this.dataStore.get("cursor")) ?? "";
    let cursor = lastCursor;
    let latestCursor = lastCursor;
    const newRecords: any[] = [];

    do {
      const resp = await fetch(
        `https://api.example.com/events?after=${cursor}&limit=100`
      );
      const data = await resp.json();
      newRecords.push(...data.events);
      cursor = data.nextCursor ?? "";
      // Track the last non-empty cursor: the loop exits when cursor is "",
      // so persisting `cursor` itself would never advance between runs.
      if (cursor) latestCursor = cursor;
    } while (cursor);

    if (newRecords.length > 0) {
      await this.dataStore.set("cursor", latestCursor);
    }
    }

    $.export("records", newRecords);
    $.export("count", newRecords.length);
  },
});
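The pagination loop above can be factored into a reusable, independently testable helper. A sketch, where fetchPage is a stand-in for your API call and the record type is left loose:

```typescript
type Page = { events: any[]; nextCursor?: string };

// Drain all pages starting from `cursor`, returning every record plus the
// last cursor seen so it can be persisted (e.g. in a data store) for the next run.
async function drainPages(
  cursor: string,
  fetchPage: (cursor: string) => Promise<Page>
): Promise<{ records: any[]; lastCursor: string }> {
  const records: any[] = [];
  let lastCursor = cursor;
  let next = cursor;
  do {
    const page = await fetchPage(next);
    records.push(...page.events);
    next = page.nextCursor ?? "";
    if (next) lastCursor = next;
  } while (next);
  return { records, lastCursor };
}
```

Keeping the loop pure makes the cursor-advancement logic easy to unit test without mocking Pipedream's data store.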

Custom Event Source (Trigger)

// sources/new-order.ts - Custom polling source
import { defineSource } from "@pipedream/types";

export default defineSource({
  name: "New Order",
  key: "new-order",
  version: "0.1.0",
  type: "polling",
  dedupe: "unique",
  props: {
    db: { type: "$.service.db" },
    timer: { type: "$.interface.timer", default: { intervalSeconds: 300 } },
    apiKey: { type: "string", label: "API Key", secret: true },
  },
  methods: {
    getLastTimestamp() {
      return this.db.get("lastTimestamp") ?? "2024-01-01T00:00:00Z";
    },
    setLastTimestamp(ts: string) {
      this.db.set("lastTimestamp", ts);
    },
  },
  async run() {
    const since = this.getLastTimestamp();
    const resp = await fetch(
      `https://api.example.com/orders?since=${since}`,
      { headers: { Authorization: `Bearer ${this.apiKey}` } }
    );
    const orders = await resp.json();

    for (const order of orders) {
      this.$emit(order, {
        id: order.id,
        summary: `Order ${order.id}: $${order.total}`,
        ts: new Date(order.createdAt).getTime(),
      });
    }

    if (orders.length > 0) {
      this.setLastTimestamp(orders[orders.length - 1].createdAt);
    }
  },
});
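To see what dedupe: "unique" buys you, here is an illustration of the semantics (not Pipedream's actual implementation): an event is dropped whenever its id has already been emitted, regardless of order.

```typescript
// Illustration of "unique" dedupe semantics: filter out events whose id
// has been seen before, and remember the ids of the ones that pass through.
function dedupeUnique<T extends { id: string }>(seen: Set<string>, events: T[]): T[] {
  const fresh = events.filter((e) => !seen.has(e.id));
  for (const e of fresh) seen.add(e.id);
  return fresh;
}
```

By contrast, "greatest" keeps only events with an id greater than the max seen so far, and "last" compares against the id of the most recently emitted event, which is why "unique" is the safest default for unordered polling results.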

Workflow with Concurrency and Error Handling

// Step: Process batch with concurrency control
export default defineComponent({
  async run({ steps, $ }) {
    const records = steps.fetch_records.records;
    const batchSize = 5;
    const results = [];

    for (let i = 0; i < records.length; i += batchSize) {
      const batch = records.slice(i, i + batchSize);
      const batchResults = await Promise.allSettled(
        batch.map(async (record: any) => {
          const resp = await fetch("https://api.example.com/process", {
            method: "POST",
            headers: { "Content-Type": "application/json" },
            body: JSON.stringify(record),
          });
          if (!resp.ok) throw new Error(`Failed: ${record.id}`);
          return resp.json();
        })
      );
      results.push(...batchResults);
    }

    const succeeded = results.filter((r) => r.status === "fulfilled").length;
    const failed = results.filter((r) => r.status === "rejected").length;
    $.export("summary", { succeeded, failed, total: records.length });
  },
});
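The fixed-batch loop above idles whenever one slow request holds up its batch. An alternative is a small worker-pool limiter (a hypothetical helper, not a Pipedream API) that keeps `limit` requests in flight at all times and preserves input order in the results:

```typescript
// Run `fn` over `items` with at most `limit` tasks in flight at once,
// returning Promise.allSettled-shaped results in the original input order.
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>
): Promise<PromiseSettledResult<R>[]> {
  const results: PromiseSettledResult<R>[] = new Array(items.length);
  let next = 0;
  async function worker() {
    // `next++` is safe here: it runs synchronously between awaits,
    // and JavaScript workers share one thread.
    while (next < items.length) {
      const i = next++;
      try {
        results[i] = { status: "fulfilled", value: await fn(items[i]) };
      } catch (reason) {
        results[i] = { status: "rejected", reason };
      }
    }
  }
  await Promise.all(Array.from({ length: Math.min(limit, items.length) }, worker));
  return results;
}
```

Capturing failures as settled results, rather than letting one rejection abort the run, mirrors the Promise.allSettled behavior in the step above.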

Anti-Patterns

  1. Storing secrets in step code or environment variables - Use Pipedream's connected accounts and this.app.$auth for credentials. They handle OAuth refresh automatically.
  2. Ignoring $.flow.exit() for early termination - Without it, downstream steps run on invalid or empty data, wasting compute and causing misleading errors.
  3. Building monolithic single-step workflows - Split logic into discrete steps. Each step gets its own logs, timing, and retry behavior in the execution inspector.
  4. Not using the dedupe property in event sources - Without deduplication strategy (unique, greatest, last), polling sources may emit duplicate events.

When to Use

  • Building webhook-driven integrations that need managed OAuth for third-party APIs
  • Rapid prototyping of event pipelines where each step is a testable TypeScript function
  • Connecting SaaS tools with custom transformation logic beyond no-code platforms
  • Creating custom event sources that poll APIs and emit deduplicated events
  • Serverless workflows where you want per-step observability without infrastructure management
