Data Pipeline
Patterns for building robust scraping data pipelines with validation, deduplication, storage, and monitoring
You are an expert in designing and building data pipelines for web scraping, covering extraction, transformation, validation, storage, and monitoring.
Core Philosophy
Overview
A scraping data pipeline is the end-to-end system that takes raw web data and turns it into clean, structured, reliable datasets. Beyond just fetching pages, a production pipeline handles scheduling, deduplication, validation, error recovery, storage, and monitoring. This skill covers architectural patterns and concrete implementations for each pipeline stage.
Setup & Configuration
Common stack for a Python-based pipeline:
```bash
pip install requests beautifulsoup4 lxml
pip install pydantic       # data validation
pip install sqlite-utils   # lightweight storage
pip install schedule       # simple job scheduling
pip install structlog      # structured logging
```
For a Node.js pipeline:
```bash
npm install cheerio axios zod better-sqlite3 p-queue winston
```
Core Patterns
Pipeline architecture
```
[Scheduler] -> [URL Queue] -> [Fetcher] -> [Parser] -> [Validator]
                                                            |
                                                     [Deduplicator]
                                                            |
                                              [Transformer] -> [Storage]
                                                                    |
                                                                [Monitor]
```
URL queue management
```python
import sqlite3
from datetime import datetime, timezone

class URLQueue:
    def __init__(self, db_path='queue.db'):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS urls (
                url TEXT PRIMARY KEY,
                status TEXT DEFAULT 'pending',
                attempts INTEGER DEFAULT 0,
                last_attempt TEXT,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            )
        ''')

    def add(self, urls):
        self.conn.executemany(
            'INSERT OR IGNORE INTO urls (url) VALUES (?)',
            [(u,) for u in urls],
        )
        self.conn.commit()

    def get_next(self, batch_size=10):
        # Select anything not yet done, including previously failed URLs,
        # so failures are retried up to 3 attempts
        cursor = self.conn.execute(
            "SELECT url FROM urls WHERE status != 'done' AND attempts < 3 "
            "ORDER BY created_at LIMIT ?",
            (batch_size,),
        )
        return [row[0] for row in cursor.fetchall()]

    def mark_done(self, url):
        self.conn.execute(
            "UPDATE urls SET status='done' WHERE url=?", (url,)
        )
        self.conn.commit()

    def mark_failed(self, url):
        self.conn.execute(
            "UPDATE urls SET status='failed', attempts=attempts+1, "
            "last_attempt=? WHERE url=?",
            (datetime.now(timezone.utc).isoformat(), url),
        )
        self.conn.commit()
```
Data validation with Pydantic
```python
from pydantic import BaseModel, Field, ValidationError, field_validator, HttpUrl
from typing import Optional
from datetime import datetime, timezone

class Product(BaseModel):
    name: str
    price: float
    currency: str = 'USD'
    url: HttpUrl
    # default_factory runs per instance; a plain `datetime.now(...)` default
    # would be evaluated once at class-definition time and frozen
    scraped_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    category: Optional[str] = None

    @field_validator('name')
    @classmethod
    def name_not_empty(cls, v):
        if not v.strip():
            raise ValueError('Product name cannot be empty')
        return v.strip()

    @field_validator('price')
    @classmethod
    def price_positive(cls, v):
        if v <= 0:
            raise ValueError('Price must be positive')
        return round(v, 2)

# Usage
try:
    product = Product(name='Widget', price=29.99, url='https://example.com/widget')
except ValidationError as e:
    print(f'Validation failed: {e}')
```
Deduplication
```python
import hashlib
import sqlite3
from datetime import datetime, timezone

class Deduplicator:
    def __init__(self, db_path='seen.db'):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            'CREATE TABLE IF NOT EXISTS seen (hash TEXT PRIMARY KEY, first_seen TEXT)'
        )

    def _hash(self, item: dict) -> str:
        # Create a stable hash from key fields, excluding the volatile timestamp
        key_fields = sorted(f'{k}={v}' for k, v in item.items() if k != 'scraped_at')
        content = '|'.join(key_fields)
        return hashlib.sha256(content.encode()).hexdigest()

    def is_new(self, item: dict) -> bool:
        h = self._hash(item)
        exists = self.conn.execute(
            'SELECT 1 FROM seen WHERE hash=?', (h,)
        ).fetchone()
        if exists:
            return False
        self.conn.execute(
            'INSERT INTO seen (hash, first_seen) VALUES (?, ?)',
            (h, datetime.now(timezone.utc).isoformat()),
        )
        self.conn.commit()
        return True
```
Data transformation and cleaning
```python
import re

def clean_price(raw: str) -> float:
    """Extract numeric price from strings like '$1,234.56' or '1.234,56 EUR'."""
    cleaned = re.sub(r'[^\d.,]', '', raw)
    # Handle European format (1.234,56)
    if ',' in cleaned and '.' in cleaned:
        if cleaned.rindex(',') > cleaned.rindex('.'):
            cleaned = cleaned.replace('.', '').replace(',', '.')
        else:
            cleaned = cleaned.replace(',', '')
    elif ',' in cleaned:
        # Ambiguous: a lone comma is treated as a decimal separator
        cleaned = cleaned.replace(',', '.')
    return float(cleaned)

def normalize_whitespace(text: str) -> str:
    return re.sub(r'\s+', ' ', text).strip()

def transform_product(raw: dict) -> dict:
    return {
        'name': normalize_whitespace(raw.get('name', '')),
        'price': clean_price(raw.get('price', '0')),
        'url': raw.get('url', ''),
    }
```
Storage layer
```python
import sqlite_utils
import json

class Storage:
    def __init__(self, db_path='scraped_data.db'):
        self.db = sqlite_utils.Database(db_path)

    def save_products(self, products: list[dict]):
        self.db['products'].insert_all(
            products,
            pk='url',
            alter=True,
            replace=True,
        )

    def export_jsonl(self, output_path: str):
        with open(output_path, 'w') as f:
            for row in self.db['products'].rows:
                f.write(json.dumps(row) + '\n')

    def export_csv(self, output_path: str):
        import csv
        rows = list(self.db['products'].rows)
        if not rows:
            return
        with open(output_path, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=rows[0].keys())
            writer.writeheader()
            writer.writerows(rows)
```
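Raw HTML is worth keeping alongside parsed records so pages can be re-parsed later without re-fetching. A minimal stdlib sketch (the `raw_pages` table name is illustrative):

```python
import sqlite3
from datetime import datetime, timezone

def save_raw_html(conn: sqlite3.Connection, url: str, html: str) -> None:
    """Store the fetched HTML keyed by URL, overwriting on re-scrape."""
    conn.execute(
        'CREATE TABLE IF NOT EXISTS raw_pages '
        '(url TEXT PRIMARY KEY, html TEXT, fetched_at TEXT)'
    )
    conn.execute(
        'INSERT OR REPLACE INTO raw_pages (url, html, fetched_at) VALUES (?, ?, ?)',
        (url, html, datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()
```

When a parser bug is discovered, re-parsing from this table is a pure local operation with no extra load on the target site.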
Node.js pipeline with Zod validation
```javascript
const { z } = require('zod');
const Database = require('better-sqlite3');
const PQueue = require('p-queue').default;

const ProductSchema = z.object({
  name: z.string().min(1).transform(s => s.trim()),
  price: z.number().positive(),
  url: z.string().url(),
  scrapedAt: z.string().datetime().default(() => new Date().toISOString()),
});

class Pipeline {
  constructor(dbPath = 'data.db') {
    this.db = new Database(dbPath);
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS products (
        url TEXT PRIMARY KEY, name TEXT, price REAL, scraped_at TEXT
      )
    `);
    this.insert = this.db.prepare(
      'INSERT OR REPLACE INTO products (url, name, price, scraped_at) VALUES (?, ?, ?, ?)'
    );
    this.queue = new PQueue({ concurrency: 5, interval: 1000, intervalCap: 2 });
  }

  async process(rawItem) {
    const validated = ProductSchema.parse(rawItem);
    this.insert.run(validated.url, validated.name, validated.price, validated.scrapedAt);
  }

  async addJob(fetchFn) {
    return this.queue.add(fetchFn);
  }
}
```
Monitoring and alerting
```python
import structlog
from datetime import datetime, timezone

log = structlog.get_logger()

class PipelineMonitor:
    def __init__(self):
        self.stats = {
            'fetched': 0,
            'parsed': 0,
            'validated': 0,
            'failed': 0,
            'duplicates': 0,
            'stored': 0,
            'start_time': datetime.now(timezone.utc),
        }

    def record(self, event: str, **kwargs):
        if event in self.stats:
            self.stats[event] += 1
        log.info('pipeline_event', event=event, **kwargs)

    def report(self):
        elapsed = (datetime.now(timezone.utc) - self.stats['start_time']).total_seconds()
        rate = self.stats['fetched'] / max(elapsed, 1)
        log.info(
            'pipeline_report',
            elapsed_seconds=round(elapsed, 1),
            requests_per_second=round(rate, 2),
            **self.stats,
        )

    def check_health(self):
        failure_rate = self.stats['failed'] / max(self.stats['fetched'], 1)
        if failure_rate > 0.3:
            log.warning('high_failure_rate', rate=round(failure_rate, 2))
            return False
        return True
```
Putting it all together
```python
import random
import time

import requests

def polite_delay(low: float, high: float):
    time.sleep(random.uniform(low, high))

def run_pipeline():
    # Synchronous, sequential loop; swap in asyncio/aiohttp if you need concurrency
    queue = URLQueue()
    dedup = Deduplicator()
    storage = Storage()
    monitor = PipelineMonitor()
    session = requests.Session()
    queue.add(['https://example.com/page/1', 'https://example.com/page/2'])
    while urls := queue.get_next(batch_size=5):
        for url in urls:
            try:
                resp = session.get(url, timeout=15)
                resp.raise_for_status()
                monitor.record('fetched', url=url)
                raw_items = parse_page(resp.text)  # your parsing logic
                for raw in raw_items:
                    monitor.record('parsed')
                    try:
                        item = transform_product(raw)
                        product = Product(**item)
                        monitor.record('validated')
                    except Exception:
                        monitor.record('failed')
                        continue
                    # JSON-safe dump so URLs/datetimes hash and store cleanly
                    record = product.model_dump(mode='json')
                    if dedup.is_new(record):
                        storage.save_products([record])
                        monitor.record('stored')
                    else:
                        monitor.record('duplicates')
                queue.mark_done(url)
            except Exception as e:
                queue.mark_failed(url)
                monitor.record('failed', error=str(e))
            polite_delay(1.0, 2.5)
        if not monitor.check_health():
            log.error('Pipeline health check failed, stopping')
            break
    monitor.report()
    storage.export_jsonl('output.jsonl')
```
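The setup section installs `schedule` for periodic runs; the same idea can be sketched dependency-free. The `run_every` helper below is hypothetical, and `job` would be the `run_pipeline` function above; `max_runs` bounds the loop so it can be tested:

```python
import time
from typing import Callable, Optional

def run_every(interval_seconds: float, job: Callable[[], None],
              max_runs: Optional[int] = None) -> int:
    """Run `job` repeatedly; one crashed run must not kill the loop."""
    runs = 0
    while max_runs is None or runs < max_runs:
        try:
            job()  # e.g. run_pipeline from the section above
        except Exception as e:
            print(f'pipeline run failed: {e}')  # use structlog in production
        runs += 1
        if max_runs is None or runs < max_runs:
            time.sleep(interval_seconds)
    return runs
```

For anything beyond a single box, a cron entry or the `schedule` library gives the same behavior with less code to maintain.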
Best Practices
- Separate concerns. Keep fetching, parsing, validation, and storage in distinct modules. This makes each component testable and replaceable.
- Validate early and strictly. Use Pydantic or Zod to catch bad data before it reaches storage. Log validation failures for debugging.
- Deduplicate at ingestion time. Use content hashing to avoid storing duplicate records, especially on incremental re-scrapes.
- Make pipelines resumable. Persist the URL queue and processing state to a database so that a crashed pipeline can restart from where it left off.
- Export in standard formats. JSONL (one JSON object per line) is ideal for streaming and downstream processing. CSV works for spreadsheet consumption.
- Monitor failure rates. A sudden spike in errors usually means the site changed its markup or is rate-limiting you.
- Version your scrapers. When site markup changes, having versioned parsers lets you roll back or maintain multiple strategies.
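The last point can be sketched as a registry of parse functions keyed by version, trying the newest first. The version tags and markup markers here are hypothetical:

```python
from typing import Callable, Optional

# Versioned parsers: register one parse function per markup version and
# fall through to older strategies until one yields a record.
PARSERS: dict[str, Callable[[str], Optional[dict]]] = {}

def parser(version: str):
    def register(fn):
        PARSERS[version] = fn
        return fn
    return register

@parser('2024-01')
def parse_v1(html: str) -> Optional[dict]:
    # hypothetical: old markup wrapped the name in <h1 class="title">
    return {'name': 'Widget'} if 'class="title"' in html else None

@parser('2024-06')
def parse_v2(html: str) -> Optional[dict]:
    # hypothetical: new markup uses a data-product-name attribute
    return {'name': 'Widget'} if 'data-product-name' in html else None

def parse_with_fallback(html: str) -> Optional[dict]:
    # Newest version first; date-style tags sort chronologically
    for version in sorted(PARSERS, reverse=True):
        result = PARSERS[version](html)
        if result is not None:
            return result
    return None
```

Logging which version handled each page also tells you when a site migration has started rolling out.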
Common Pitfalls
- No error handling in the fetch loop. A single exception can halt the entire pipeline. Wrap each URL's processing in try/except and record failures.
- In-memory queues. If the pipeline crashes, all pending URLs are lost. Persist the queue to SQLite or Redis.
- Schema drift. Websites change their HTML over time. Parsers that worked last month may silently return empty fields today. Add assertions and monitoring for expected data shapes.
- Discarding raw HTML after parsing. Always store the raw HTML alongside parsed records so you can re-parse later without re-fetching.
- Ignoring timezones in timestamps. Always store timestamps in UTC with timezone info. Use `datetime.now(timezone.utc)`; note that `datetime.utcnow()` returns a naive datetime and is deprecated as of Python 3.12.
- Monolithic pipeline scripts. Large single-file scrapers become unmaintainable. Organize into queue, fetcher, parser, validator, storage, and monitor modules.
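The schema-drift pitfall above can be caught with a cheap shape check after each parse run. The required-field set and fill-rate threshold below are illustrative:

```python
# Guard against schema drift: warn when parsed records stop carrying the
# fields the pipeline expects, instead of silently storing blanks.
EXPECTED_FIELDS = {'name', 'price', 'url'}  # hypothetical required fields

def check_shape(items: list[dict], min_fill_rate: float = 0.9) -> list[str]:
    """Return warnings for expected fields that are missing or mostly empty."""
    if not items:
        return ['parser returned zero items']
    warnings = []
    for field in sorted(EXPECTED_FIELDS):
        filled = sum(1 for item in items if item.get(field))
        if filled / len(items) < min_fill_rate:
            warnings.append(
                f'{field}: only {filled}/{len(items)} items have a value'
            )
    return warnings
```

A non-empty return value is a signal to alert or halt the run, not just a log line: a parser that suddenly fills 0% of `price` has almost certainly hit new markup.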
Anti-Patterns
Over-engineering for hypothetical scale. Building for millions of users when you have hundreds adds complexity without value. Solve today's problems first.
Ignoring the existing ecosystem. Reinventing functionality that mature libraries already provide well wastes time and introduces unnecessary risk.
Premature abstraction. Creating elaborate frameworks and utilities before you have enough concrete cases to know what the abstraction should look like produces the wrong abstraction.
Neglecting error handling at boundaries. Internal code can trust its inputs, but system boundaries (user input, APIs, file I/O) require defensive validation.
Skipping documentation for obvious code. What is obvious to you today will not be obvious to your colleague next month or to you next year.
Related Skills
Anti Detection
Ethical techniques for handling CAPTCHAs, rate limiting, and bot detection while scraping responsibly
Beautifulsoup
HTML and XML parsing with Beautiful Soup in Python for flexible data extraction
Cheerio
Fast server-side HTML parsing and data extraction with Cheerio using jQuery-like syntax
Playwright Scraping
Cross-browser web scraping with Playwright, supporting Chromium, Firefox, and WebKit
Puppeteer
Headless Chrome browser automation with Puppeteer for scraping dynamic, JavaScript-rendered pages
Scrapy
Production-grade web scraping framework in Python with built-in crawling, pipelines, and middleware