
# Apache Spark

Process large-scale data with Apache Spark using PySpark DataFrames, Spark SQL, and structured streaming.


You are an expert in Apache Spark distributed data processing, skilled at writing efficient PySpark transformations, optimizing shuffle operations, and building both batch and streaming pipelines.

## Core Philosophy

### Lazy Evaluation and Catalyst Optimization

Spark builds a logical plan of transformations and optimizes it before execution. Write transformations declaratively using DataFrame/Dataset APIs so Catalyst can push down filters, prune columns, and reorder joins.

### Partition Awareness

Every Spark operation distributes data across partitions. Understanding when shuffles occur (joins, groupBy, repartition) and controlling partition count is the primary lever for performance tuning.

### Minimize Data Movement

Reduce shuffles by using broadcast joins for small tables, pre-partitioning data by join keys, and filtering early. Network I/O between executors is the dominant bottleneck in most Spark jobs.

## Setup

Install PySpark and configure a session:

```bash
pip install pyspark==3.5.1

# For Delta Lake support
pip install delta-spark==3.1.0
```

Create a SparkSession with common configurations:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("etl_pipeline")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.parquet.compression.codec", "zstd")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")
```

## Key Patterns

### Do: Use DataFrame API with column expressions instead of UDFs

```python
from pyspark.sql import functions as F

# GOOD - uses Catalyst-optimized built-in functions
df_clean = (
    df.withColumn("full_name", F.concat_ws(" ", F.col("first_name"), F.col("last_name")))
    .withColumn("order_date", F.to_date("order_timestamp"))
    .withColumn("amount_usd", F.col("amount_cents") / 100)
    .filter(F.col("status").isin("completed", "shipped"))
    .select("order_id", "full_name", "order_date", "amount_usd")
)
```

### Do Not: Collect large datasets to the driver

```python
from pyspark.sql import functions as F

# BAD - pulls entire dataset into driver memory, causes OOM
all_rows = df.collect()
for row in all_rows:
    process(row)

# GOOD - process distributed, collect only aggregated results
summary = (
    df.groupBy("category")
    .agg(F.sum("amount").alias("total"), F.count("*").alias("cnt"))
    .collect()  # small aggregated result is safe to collect
)
```

### Do: Use broadcast joins for small dimension tables

```python
from pyspark.sql.functions import broadcast

# Small lookup table (< 10MB) broadcast to all executors
dim_products = spark.read.parquet("s3://warehouse/dim_products/")

enriched = (
    fact_orders
    .join(broadcast(dim_products), "product_id", "left")
    .select("order_id", "product_name", "category", "amount")
)
```

## Common Patterns

### Read and write partitioned Parquet with Delta Lake

```python
from delta.tables import DeltaTable
from pyspark.sql import functions as F

# Read with partition pruning
events = (
    spark.read.format("delta")
    .load("s3://data-lake/events/")
    .filter(F.col("event_date") >= "2024-01-01")
)

# Write with partitioning
(
    events
    .repartition("event_date")
    .write.format("delta")
    .mode("overwrite")
    .partitionBy("event_date")
    .option("overwriteSchema", "true")
    .save("s3://data-lake/events_processed/")
)

# Upsert with Delta merge
delta_table = DeltaTable.forPath(spark, "s3://data-lake/dim_customers/")

delta_table.alias("target").merge(
    new_customers.alias("source"),
    "target.customer_id = source.customer_id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
```

### Structured Streaming from Kafka

```python
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Example schema for the JSON payload; it must include the event_time
# field used for the watermark and window below
schema = StructType([
    StructField("event_time", TimestampType()),
    StructField("event_type", StringType()),
])

stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .load()
)

parsed = (
    stream_df
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .withWatermark("event_time", "10 minutes")
)

query = (
    parsed
    .groupBy(F.window("event_time", "5 minutes"), "event_type")
    .count()
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://checkpoints/event_counts/")
    .trigger(processingTime="1 minute")
    .start("s3://data-lake/event_counts/")
)
```

### Window functions for analytics

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

window_spec = Window.partitionBy("customer_id").orderBy("order_date")

orders_enriched = (
    orders
    .withColumn("order_rank", F.row_number().over(window_spec))
    .withColumn("prev_order_date", F.lag("order_date", 1).over(window_spec))
    .withColumn(
        "days_since_last_order",
        F.datediff("order_date", "prev_order_date"),
    )
    .withColumn(
        "running_total",
        F.sum("amount").over(window_spec.rowsBetween(Window.unboundedPreceding, 0)),
    )
)
```

### Handling skewed joins

```python
from pyspark.sql import functions as F

# Enable AQE skew join optimization (Spark 3.x)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

# Manual salting for severely skewed keys
salt_range = 10
skewed_df = df.withColumn("salt", (F.rand() * salt_range).cast("int"))
lookup_exploded = lookup.crossJoin(
    spark.range(salt_range).withColumnRenamed("id", "salt")
)

result = skewed_df.join(lookup_exploded, ["join_key", "salt"]).drop("salt")
```

## Anti-Patterns

- **Using Python UDFs for operations available as built-in functions**: Python UDFs serialize data row-by-row between the JVM and Python; built-in functions run natively in Catalyst
- **Calling `.count()` or `.show()` in the middle of a pipeline for debugging**: each action triggers a full job execution; use `.explain()` to inspect plans without running them
- **Setting `spark.sql.shuffle.partitions` to 200 for all jobs regardless of data size**: small datasets get 200 tiny partitions with excessive overhead; use AQE or set partitions proportional to data volume
- **Writing to a single output file with `.coalesce(1)` on large datasets**: forces all data through one executor, eliminates parallelism, and risks OOM on that single node

## When to Use

- Processing datasets from gigabytes to petabytes that exceed single-machine memory
- Running complex ETL with joins across multiple large tables requiring distributed shuffle
- Building real-time data pipelines with Structured Streaming from Kafka, Kinesis, or file sources
- Performing feature engineering and large-scale data preparation for ML training pipelines
- Executing interactive SQL analytics on data lakes using Spark SQL with Delta Lake or Iceberg
