
Data Pipeline

Patterns for building robust scraping data pipelines with validation, deduplication, storage, and monitoring

Quick Summary
You are an expert in designing and building data pipelines for web scraping, covering extraction, transformation, validation, storage, and monitoring.

## Key Points

- **Separate concerns.** Keep fetching, parsing, validation, and storage in distinct modules. This makes each component testable and replaceable.
- **Validate early and strictly.** Use Pydantic or Zod to catch bad data before it reaches storage. Log validation failures for debugging.
- **Deduplicate at ingestion time.** Use content hashing to avoid storing duplicate records, especially on incremental re-scrapes.
- **Make pipelines resumable.** Persist the URL queue and processing state to a database so that a crashed pipeline can restart from where it left off.
- **Export in standard formats.** JSONL (one JSON object per line) is ideal for streaming and downstream processing. CSV works for spreadsheet consumption.
- **Monitor failure rates.** A sudden spike in errors usually means the site changed its markup or is rate-limiting you.
- **Version your scrapers.** When site markup changes, having versioned parsers lets you roll back or maintain multiple strategies.

## Common Pitfalls

- **No error handling in the fetch loop.** A single exception can halt the entire pipeline. Wrap each URL's processing in try/except and record failures.
- **In-memory queues.** If the pipeline crashes, all pending URLs are lost. Persist the queue to SQLite or Redis.
- **Schema drift.** Websites change their HTML over time. Parsers that worked last month may silently return empty fields today. Add assertions and monitoring for expected data shapes.
- **Discarding raw HTML after parsing.** Always store the raw HTML alongside parsed records so you can re-parse later without re-fetching.
- **Ignoring timezones in timestamps.** Always store timestamps in UTC with timezone info. Use `datetime.now(timezone.utc)`; the naive `datetime.utcnow()` carries no timezone and is deprecated.

## Quick Example

```bash
pip install requests beautifulsoup4 lxml
pip install pydantic           # data validation
pip install sqlite-utils       # lightweight storage
pip install schedule           # simple job scheduling
pip install structlog          # structured logging
```

```bash
npm install cheerio axios zod better-sqlite3 p-queue winston
```

Data Pipeline — Web Scraping

You are an expert in designing and building data pipelines for web scraping, covering extraction, transformation, validation, storage, and monitoring.

Core Philosophy

Overview

A scraping data pipeline is the end-to-end system that takes raw web data and turns it into clean, structured, reliable datasets. Beyond just fetching pages, a production pipeline handles scheduling, deduplication, validation, error recovery, storage, and monitoring. This skill covers architectural patterns and concrete implementations for each pipeline stage.

Setup & Configuration

Common stack for a Python-based pipeline:

pip install requests beautifulsoup4 lxml
pip install pydantic           # data validation
pip install sqlite-utils       # lightweight storage
pip install schedule           # simple job scheduling
pip install structlog          # structured logging

For a Node.js pipeline:

npm install cheerio axios zod better-sqlite3 p-queue winston

Core Patterns

Pipeline architecture

[Scheduler] -> [URL Queue] -> [Fetcher] -> [Parser] -> [Validator]
                                                            |
                                                      [Deduplicator]
                                                            |
                                                      [Transformer]
                                                            |
                                                        [Storage]

[Monitor] observes every stage, tracking counts, failure rates, and throughput.

URL queue management

import sqlite3
from datetime import datetime, timezone

class URLQueue:
    def __init__(self, db_path='queue.db'):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS urls (
                url TEXT PRIMARY KEY,
                status TEXT DEFAULT 'pending',
                attempts INTEGER DEFAULT 0,
                last_attempt TEXT,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            )
        ''')

    def add(self, urls):
        self.conn.executemany(
            'INSERT OR IGNORE INTO urls (url) VALUES (?)',
            [(u,) for u in urls],
        )
        self.conn.commit()

    def get_next(self, batch_size=10):
        # Select failed URLs too, so they are retried until the attempt limit
        cursor = self.conn.execute(
            "SELECT url FROM urls WHERE status IN ('pending', 'failed') "
            "AND attempts < 3 ORDER BY created_at LIMIT ?",
            (batch_size,),
        )
        return [row[0] for row in cursor.fetchall()]

    def mark_done(self, url):
        self.conn.execute(
            "UPDATE urls SET status='done' WHERE url=?", (url,)
        )
        self.conn.commit()

    def mark_failed(self, url):
        self.conn.execute(
            "UPDATE urls SET status='failed', attempts=attempts+1, "
            "last_attempt=? WHERE url=?",
            (datetime.now(timezone.utc).isoformat(), url),
        )
        self.conn.commit()

Data validation with Pydantic

from pydantic import BaseModel, Field, ValidationError, field_validator, HttpUrl
from typing import Optional
from datetime import datetime, timezone

class Product(BaseModel):
    name: str
    price: float
    currency: str = 'USD'
    url: HttpUrl
    # default_factory runs per instance; a plain default would be frozen at import time
    scraped_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    category: Optional[str] = None

    @field_validator('name')
    @classmethod
    def name_not_empty(cls, v):
        if not v.strip():
            raise ValueError('Product name cannot be empty')
        return v.strip()

    @field_validator('price')
    @classmethod
    def price_positive(cls, v):
        if v <= 0:
            raise ValueError('Price must be positive')
        return round(v, 2)

# Usage
try:
    product = Product(name='Widget', price=29.99, url='https://example.com/widget')
except ValidationError as e:
    print(f'Validation failed: {e}')

Deduplication

import hashlib
import sqlite3
from datetime import datetime, timezone

class Deduplicator:
    def __init__(self, db_path='seen.db'):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            'CREATE TABLE IF NOT EXISTS seen (hash TEXT PRIMARY KEY, first_seen TEXT)'
        )

    def _hash(self, item: dict) -> str:
        # Create a stable hash from key fields
        key_fields = sorted(f'{k}={v}' for k, v in item.items() if k != 'scraped_at')
        content = '|'.join(key_fields)
        return hashlib.sha256(content.encode()).hexdigest()

    def is_new(self, item: dict) -> bool:
        h = self._hash(item)
        exists = self.conn.execute(
            'SELECT 1 FROM seen WHERE hash=?', (h,)
        ).fetchone()
        if exists:
            return False
        self.conn.execute(
            'INSERT INTO seen (hash, first_seen) VALUES (?, ?)',
            (h, datetime.now(timezone.utc).isoformat()),
        )
        self.conn.commit()
        return True

Data transformation and cleaning

import re

def clean_price(raw: str) -> float:
    """Extract numeric price from strings like '$1,234.56' or '1.234,56 EUR'."""
    cleaned = re.sub(r'[^\d.,]', '', raw)
    # Handle European format (1.234,56)
    if ',' in cleaned and '.' in cleaned:
        if cleaned.rindex(',') > cleaned.rindex('.'):
            cleaned = cleaned.replace('.', '').replace(',', '.')
        else:
            cleaned = cleaned.replace(',', '')
    elif ',' in cleaned:
        # Ambiguous ('9,99' vs '1,234'): treat a lone comma as a decimal separator
        cleaned = cleaned.replace(',', '.')
    return float(cleaned)

def normalize_whitespace(text: str) -> str:
    return re.sub(r'\s+', ' ', text).strip()

def transform_product(raw: dict) -> dict:
    return {
        'name': normalize_whitespace(raw.get('name', '')),
        'price': clean_price(raw.get('price', '0')),
        'url': raw.get('url', ''),
    }
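
A quick sanity check of clean_price against both formats (these asserts pass with the implementation above):

assert clean_price('$1,234.56') == 1234.56
assert clean_price('1.234,56 EUR') == 1234.56
assert clean_price('9,99') == 9.99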

Storage layer

import sqlite_utils
import json

class Storage:
    def __init__(self, db_path='scraped_data.db'):
        self.db = sqlite_utils.Database(db_path)

    def save_products(self, products: list[dict]):
        self.db['products'].insert_all(
            products,
            pk='url',
            alter=True,
            replace=True,
        )

    def export_jsonl(self, output_path: str):
        with open(output_path, 'w') as f:
            for row in self.db['products'].rows:
                f.write(json.dumps(row) + '\n')

    def export_csv(self, output_path: str):
        import csv
        rows = list(self.db['products'].rows)
        if not rows:
            return
        with open(output_path, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=rows[0].keys())
            writer.writeheader()
            writer.writerows(rows)
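
The pitfalls below recommend keeping raw HTML alongside parsed records so pages can be re-parsed after a parser fix without re-fetching. A minimal sketch — the raw_pages table name and function shape are assumptions, not part of sqlite-utils:

import sqlite_utils
from datetime import datetime, timezone

def save_raw_html(db: sqlite_utils.Database, url: str, html: str):
    # Keyed by URL so a re-fetch replaces the stored page instead of duplicating it
    db['raw_pages'].insert(
        {'url': url, 'html': html, 'fetched_at': datetime.now(timezone.utc).isoformat()},
        pk='url',
        replace=True,
    )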

Node.js pipeline with Zod validation

const { z } = require('zod');
const Database = require('better-sqlite3');
const PQueue = require('p-queue').default; // p-queue v6 (CommonJS); v7+ is ESM-only

const ProductSchema = z.object({
  name: z.string().min(1).transform(s => s.trim()),
  price: z.number().positive(),
  url: z.string().url(),
  scrapedAt: z.string().datetime().default(() => new Date().toISOString()),
});

class Pipeline {
  constructor(dbPath = 'data.db') {
    this.db = new Database(dbPath);
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS products (
        url TEXT PRIMARY KEY, name TEXT, price REAL, scraped_at TEXT
      )
    `);
    this.insert = this.db.prepare(
      'INSERT OR REPLACE INTO products (url, name, price, scraped_at) VALUES (?, ?, ?, ?)'
    );
    this.queue = new PQueue({ concurrency: 5, interval: 1000, intervalCap: 2 });
  }

  async process(rawItem) {
    const validated = ProductSchema.parse(rawItem);
    this.insert.run(validated.url, validated.name, validated.price, validated.scrapedAt);
  }

  async addJob(fetchFn) {
    return this.queue.add(fetchFn);
  }
}

Monitoring and alerting

import structlog
from datetime import datetime, timezone

log = structlog.get_logger()

class PipelineMonitor:
    def __init__(self):
        self.stats = {
            'fetched': 0,
            'parsed': 0,
            'validated': 0,
            'failed': 0,
            'duplicates': 0,
            'stored': 0,
            'start_time': datetime.now(timezone.utc),
        }

    def record(self, event: str, **kwargs):
        if event in self.stats:
            self.stats[event] += 1
        log.info('pipeline_event', event=event, **kwargs)

    def report(self):
        elapsed = (datetime.now(timezone.utc) - self.stats['start_time']).total_seconds()
        rate = self.stats['fetched'] / max(elapsed, 1)
        log.info(
            'pipeline_report',
            elapsed_seconds=round(elapsed, 1),
            requests_per_second=round(rate, 2),
            **self.stats,
        )

    def check_health(self):
        failure_rate = self.stats['failed'] / max(self.stats['fetched'], 1)
        if failure_rate > 0.3:
            log.warning('high_failure_rate', rate=round(failure_rate, 2))
            return False
        return True
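
check_health only logs; actually delivering an alert is left open. One hedged option is posting to a chat webhook — the URL below is a placeholder, and the function reuses the structlog logger from above:

import requests

def send_alert(message: str, webhook_url: str = 'https://hooks.example.com/scraper-alerts'):
    # Placeholder endpoint — substitute your Slack/Teams/PagerDuty webhook.
    # Delivery failures are logged, never raised: monitoring being down
    # should not kill the pipeline itself.
    try:
        requests.post(webhook_url, json={'text': message}, timeout=5)
    except requests.RequestException:
        log.warning('alert_delivery_failed')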

Putting it all together

import requests
import random
import time

def polite_delay(low: float, high: float):
    # Random delay between requests so traffic does not arrive in bursts
    time.sleep(random.uniform(low, high))

def run_pipeline():
    queue = URLQueue()
    dedup = Deduplicator()
    storage = Storage()
    monitor = PipelineMonitor()
    session = requests.Session()

    queue.add(['https://example.com/page/1', 'https://example.com/page/2'])

    while urls := queue.get_next(batch_size=5):
        for url in urls:
            try:
                resp = session.get(url, timeout=15)
                monitor.record('fetched', url=url)

                raw_items = parse_page(resp.text)  # your parsing logic
                for raw in raw_items:
                    monitor.record('parsed')
                    try:
                        item = transform_product(raw)
                        product = Product(**item)
                        monitor.record('validated')
                    except Exception:
                        monitor.record('failed')
                        continue

                    # mode='json' renders HttpUrl and datetime as plain strings,
                    # which both the hash and SQLite can handle
                    if dedup.is_new(product.model_dump(mode='json')):
                        storage.save_products([product.model_dump(mode='json')])
                        monitor.record('stored')
                    else:
                        monitor.record('duplicates')

                queue.mark_done(url)
            except Exception as e:
                queue.mark_failed(url)
                monitor.record('failed', error=str(e))

            polite_delay(1.0, 2.5)

        if not monitor.check_health():
            log.error('Pipeline health check failed, stopping')
            break

    monitor.report()
    storage.export_jsonl('output.jsonl')
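
The setup installs schedule but nothing above uses it; a minimal sketch of a recurring run (the 6-hour cadence is an arbitrary example):

import schedule
import time

# run_pending() only fires jobs whose interval has elapsed,
# so the loop can poll coarsely
schedule.every(6).hours.do(run_pipeline)

while True:
    schedule.run_pending()
    time.sleep(60)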

Best Practices

  • Separate concerns. Keep fetching, parsing, validation, and storage in distinct modules. This makes each component testable and replaceable.
  • Validate early and strictly. Use Pydantic or Zod to catch bad data before it reaches storage. Log validation failures for debugging.
  • Deduplicate at ingestion time. Use content hashing to avoid storing duplicate records, especially on incremental re-scrapes.
  • Make pipelines resumable. Persist the URL queue and processing state to a database so that a crashed pipeline can restart from where it left off.
  • Export in standard formats. JSONL (one JSON object per line) is ideal for streaming and downstream processing. CSV works for spreadsheet consumption.
  • Monitor failure rates. A sudden spike in errors usually means the site changed its markup or is rate-limiting you.
  • Version your scrapers. When site markup changes, having versioned parsers lets you roll back or maintain multiple strategies; a sketch follows this list.
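
A minimal sketch of parser versioning — the registry, decorator, and function names here are assumptions, not a standard API. Newer parsers are tried first; an empty result falls through to the previous version:

PARSERS = []  # (version, parse_fn) pairs, kept newest-first

def register_parser(version):
    # Decorator that records a parser under a version number
    def wrap(fn):
        PARSERS.append((version, fn))
        PARSERS.sort(key=lambda p: p[0], reverse=True)
        return fn
    return wrap

@register_parser(2)
def parse_product_v2(html):
    return []  # placeholder: selectors for the current markup

@register_parser(1)
def parse_product_v1(html):
    return []  # placeholder: selectors for the pre-redesign markup

def parse_with_fallback(html):
    for version, fn in PARSERS:
        items = fn(html)
        if items:  # a parser that returns nothing likely no longer matches the markup
            return version, items
    return None, []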

Common Pitfalls

  • No error handling in the fetch loop. A single exception can halt the entire pipeline. Wrap each URL's processing in try/except and record failures.
  • In-memory queues. If the pipeline crashes, all pending URLs are lost. Persist the queue to SQLite or Redis.
  • Schema drift. Websites change their HTML over time. Parsers that worked last month may silently return empty fields today. Add assertions and monitoring for expected data shapes; see the sketch after this list.
  • Discarding raw HTML after parsing. Always store the raw HTML alongside parsed records so you can re-parse later without re-fetching (a sketch appears in the Storage layer section above).
  • Ignoring timezones in timestamps. Always store timestamps in UTC with timezone info: use datetime.now(timezone.utc); the naive datetime.utcnow() carries no timezone and is deprecated since Python 3.12.
  • Monolithic pipeline scripts. Large single-file scrapers become unmaintainable. Organize into queue, fetcher, parser, validator, storage, and monitor modules.
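
A minimal schema-drift guard, assuming parsers return dicts — the field names and thresholds are illustrative:

def check_shape(items, required_fields=('name', 'price'), min_items=1):
    # Fail loudly when a parse looks wrong: too few items, or items missing
    # fields the parser used to populate — both classic signs of markup drift
    if len(items) < min_items:
        raise AssertionError(f'expected at least {min_items} items, got {len(items)}')
    for item in items:
        missing = [f for f in required_fields if not item.get(f)]
        if missing:
            raise AssertionError(f'item missing expected fields: {missing}')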

Anti-Patterns

Over-engineering for hypothetical scale. Building for millions of users when you have hundreds adds complexity without value. Solve today's problems first.

Ignoring the existing ecosystem. Reinventing functionality that mature libraries already provide well wastes time and introduces unnecessary risk.

Premature abstraction. Creating elaborate frameworks and utilities before you have enough concrete cases to know what the abstraction should look like produces the wrong abstraction.

Neglecting error handling at boundaries. Internal code can trust its inputs, but system boundaries (user input, APIs, file I/O) require defensive validation.

Skipping documentation for obvious code. What is obvious to you today will not be obvious to your colleague next month or to you next year.

Install this skill directly: skilldb add web-scraping-skills
