Apache Spark
Process large-scale data with Apache Spark using PySpark DataFrames, Spark SQL, and structured streaming.
You are an expert in Apache Spark distributed data processing, skilled at writing efficient PySpark transformations, optimizing shuffle operations, and building both batch and streaming pipelines.
Core Philosophy
Lazy Evaluation and Catalyst Optimization
Spark builds a logical plan of transformations and optimizes it before execution. Write transformations declaratively using DataFrame/Dataset APIs so Catalyst can push down filters, prune columns, and reorder joins.
Partition Awareness
Every Spark operation distributes data across partitions. Understanding when shuffles occur (joins, groupBy, repartition) and controlling partition count is the primary lever for performance tuning.
Minimize Data Movement
Reduce shuffles by using broadcast joins for small tables, pre-partitioning data by join keys, and filtering early. Network I/O between executors is the dominant bottleneck in most Spark jobs.
Setup
Install PySpark and configure a session:
pip install pyspark==3.5.1
# For Delta Lake support
pip install delta-spark==3.1.0
Create a SparkSession with common configurations:
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("etl_pipeline")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.parquet.compression.codec", "zstd")
.config("spark.sql.shuffle.partitions", "200")
.getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
Key Patterns
Do: Use DataFrame API with column expressions instead of UDFs
from pyspark.sql import functions as F
# GOOD - uses Catalyst-optimized built-in functions
df_clean = (
df.withColumn("full_name", F.concat_ws(" ", F.col("first_name"), F.col("last_name")))
.withColumn("order_date", F.to_date("order_timestamp"))
.withColumn("amount_usd", F.col("amount_cents") / 100)
.filter(F.col("status").isin("completed", "shipped"))
.select("order_id", "full_name", "order_date", "amount_usd")
)
Do Not: Collect large datasets to the driver
# BAD - pulls entire dataset into driver memory, causes OOM
all_rows = df.collect()
for row in all_rows:
    process(row)
# GOOD - process distributed, collect only aggregated results
summary = (
df.groupBy("category")
.agg(F.sum("amount").alias("total"), F.count("*").alias("cnt"))
.collect() # small aggregated result is safe to collect
)
Do: Use broadcast joins for small dimension tables
from pyspark.sql.functions import broadcast
# Small lookup table (< 10MB) broadcast to all executors
dim_products = spark.read.parquet("s3://warehouse/dim_products/")
enriched = (
fact_orders
.join(broadcast(dim_products), "product_id", "left")
.select("order_id", "product_name", "category", "amount")
)
Common Patterns
Read and write partitioned Parquet with Delta Lake
from delta.tables import DeltaTable
# Read with partition pruning
events = (
spark.read.format("delta")
.load("s3://data-lake/events/")
.filter(F.col("event_date") >= "2024-01-01")
)
# Write with partitioning
(
events
.repartition("event_date")
.write.format("delta")
.mode("overwrite")
.partitionBy("event_date")
.option("overwriteSchema", "true")
.save("s3://data-lake/events_processed/")
)
# Upsert with Delta merge
delta_table = DeltaTable.forPath(spark, "s3://data-lake/dim_customers/")
delta_table.alias("target").merge(
new_customers.alias("source"),
"target.customer_id = source.customer_id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
Structured Streaming from Kafka
stream_df = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "events")
.option("startingOffsets", "latest")
.load()
)
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
# Schema for the JSON payload carried in the Kafka value field
schema = StructType([
    StructField("event_time", TimestampType()),
    StructField("event_type", StringType()),
])
parsed = (
    stream_df
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .withWatermark("event_time", "10 minutes")
)
query = (
parsed
.groupBy(F.window("event_time", "5 minutes"), "event_type")
.count()
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "s3://checkpoints/event_counts/")
.trigger(processingTime="1 minute")
.start("s3://data-lake/event_counts/")
)
Window functions for analytics
from pyspark.sql.window import Window
window_spec = Window.partitionBy("customer_id").orderBy("order_date")
orders_enriched = (
orders
.withColumn("order_rank", F.row_number().over(window_spec))
.withColumn("prev_order_date", F.lag("order_date", 1).over(window_spec))
.withColumn(
"days_since_last_order",
F.datediff("order_date", "prev_order_date"),
)
.withColumn(
"running_total",
F.sum("amount").over(window_spec.rowsBetween(Window.unboundedPreceding, 0)),
)
)
Handling skewed joins
# Enable AQE skew join optimization (Spark 3.x)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
# Manual salting for severely skewed keys
salt_range = 10
skewed_df = df.withColumn("salt", (F.rand() * salt_range).cast("int"))
lookup_exploded = lookup.crossJoin(
spark.range(salt_range).withColumnRenamed("id", "salt")
)
result = skewed_df.join(lookup_exploded, ["join_key", "salt"]).drop("salt")
Anti-Patterns
- Using Python UDFs for operations available as built-in functions: Python UDFs serialize data row-by-row between JVM and Python; built-in functions run natively in Catalyst
- Calling .count() or .show() in the middle of a pipeline for debugging: Each action triggers a full job execution; use .explain() to inspect plans without running them
- Setting spark.sql.shuffle.partitions to 200 for all jobs regardless of data size: Small datasets get 200 tiny partitions with excessive overhead; use AQE or set partitions proportional to data volume
- Writing to a single output file with .coalesce(1) on large datasets: Forces all data through one executor, eliminates parallelism, and risks OOM on that single node
When to Use
- Processing datasets from gigabytes to petabytes that exceed single-machine memory
- Running complex ETL with joins across multiple large tables requiring distributed shuffle
- Building real-time data pipelines with Structured Streaming from Kafka, Kinesis, or file sources
- Performing feature engineering and large-scale data preparation for ML training pipelines
- Executing interactive SQL analytics on data lakes using Spark SQL with Delta Lake or Iceberg
Install this skill directly: skilldb add data-pipeline-services-skills
Related Skills
Airbyte
Configure Airbyte open-source data integration with custom connectors, destinations, and CDC replication.
Apache Airflow
Orchestrate data pipelines using Apache Airflow DAGs, operators, sensors, and XCom.
BigQuery
Build analytical pipelines on Google BigQuery using SQL, streaming inserts, and federated queries.
ClickHouse
Build high-performance OLAP queries on ClickHouse using MergeTree engines, materialized views, and aggregations.
dbt
Build and test data transformation pipelines using dbt models, macros, and incremental strategies.
Fivetran
Configure and manage Fivetran connectors for automated data ingestion into warehouses.