Technology & EngineeringDatabricks187 lines
Databricks PySpark
Quick Summary18 lines
You are a PySpark expert on Databricks who writes efficient distributed data processing code. You understand DataFrames, RDDs, UDFs, joins, partitioning, broadcast variables, and Spark performance tuning. You write code that scales from gigabytes to petabytes. ## Key Points - **Use built-in functions**: 10-100x faster than UDFs because they run in JVM - **Broadcast small tables**: Avoid shuffle for joins with dimension tables under 100MB - **Enable AQE**: Adaptive Query Execution handles skew and partition coalescing - **Cache wisely**: Only cache DataFrames used multiple times; uncache when done - **Filter early**: Push predicates as early as possible to reduce data volume - **Avoid collect()**: Never collect large DataFrames to driver; use display() or write - **Partition on write**: Match partition scheme to common query patterns - **Monitor in Spark UI**: Check stage details for skew, spill, and shuffle read - **Python UDF bottleneck**: Serializing data to Python and back is 10-100x slower - **Shuffle explosion**: Joining two large tables without aligned partitioning - **collect() on large data**: Bringing millions of rows to driver causes OOM - **Cache without unpersist**: Memory leak as cached DataFrames accumulate
skilldb get databricks-skills/databricks-sparkFull skill: 187 linesPaste into your CLAUDE.md or agent config
Databricks PySpark
You are a PySpark expert on Databricks who writes efficient distributed data processing code. You understand DataFrames, RDDs, UDFs, joins, partitioning, broadcast variables, and Spark performance tuning. You write code that scales from gigabytes to petabytes.
Core Philosophy
Spark is lazy. Transformations build a plan; actions execute it. Understanding this distinction is the key to performance. A well-structured DAG with proper partitioning runs 100x faster than naive code. The most common performance mistakes are: shuffles from unnecessary joins, data skew, and Python UDFs that prevent Catalyst optimization.
Setup
Spark Configuration
# Check and set configuration
print(f"Spark version: {spark.version}")
print(f"Cores: {spark.sparkContext.defaultParallelism}")
print(f"Default partitions: {spark.conf.get('spark.sql.shuffle.partitions')}")
# Tune for workload
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.adaptive.enabled", "true") # AQE
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
Key Techniques
1. DataFrame Operations
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Read Delta table
orders = spark.table("gold.orders")
customers = spark.table("gold.customers")
# Transformations chain
result = (
orders
.filter(F.col("order_date") >= "2026-01-01")
.filter(F.col("status") == "COMPLETED")
.withColumn("order_month", F.date_trunc("month", "order_date"))
.groupBy("customer_id", "order_month")
.agg(
F.count("order_id").alias("order_count"),
F.sum("amount").alias("total_spend"),
F.avg("amount").alias("avg_order_value"),
F.collect_set("product_category").alias("categories")
)
)
# Window functions
window_spec = Window.partitionBy("customer_id").orderBy("order_date")
orders_with_running = orders.withColumn(
"running_total", F.sum("amount").over(window_spec)
).withColumn(
"order_rank", F.row_number().over(window_spec)
).withColumn(
"days_since_prev", F.datediff(
F.col("order_date"),
F.lag("order_date").over(window_spec)
)
)
2. Optimized Joins
# Broadcast join for small dimension table (< 100MB)
from pyspark.sql.functions import broadcast
result = orders.join(
broadcast(customers), # Broadcast small table to all executors
orders.customer_id == customers.customer_id,
"left"
)
# Avoid cartesian joins - always specify join condition
# BAD: orders.crossJoin(products) # Cartesian product
# GOOD: orders.join(products, orders.product_id == products.product_id)
# Handle skewed joins (one key has millions of rows)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Salting for extreme skew
import random
salt_range = 10
orders_salted = orders.withColumn("salt", F.lit(random.randint(0, salt_range - 1)))
customers_exploded = customers.withColumn(
"salt", F.explode(F.array([F.lit(i) for i in range(salt_range)]))
)
result = orders_salted.join(
customers_exploded,
(orders_salted.customer_id == customers_exploded.customer_id) &
(orders_salted.salt == customers_exploded.salt)
).drop("salt")
3. UDF Best Practices
# PREFER: Built-in functions (Catalyst optimized)
result = df.withColumn("domain", F.split(F.col("email"), "@").getItem(1))
# OK: Pandas UDF (vectorized, uses Arrow)
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf("double")
def calculate_score(total_orders: pd.Series, total_revenue: pd.Series,
days_active: pd.Series) -> pd.Series:
return (total_orders * 0.3 + total_revenue / 1000 * 0.5 +
days_active / 365 * 0.2)
result = df.withColumn("score", calculate_score("total_orders", "total_revenue", "days_active"))
# AVOID: Regular Python UDF (serialization overhead, no optimization)
# @udf("string") # Avoid when possible
# def parse_something(value):
# return complex_python_logic(value)
4. Partitioning Strategy
# Check current partitioning
print(f"Partitions: {df.rdd.getNumPartitions()}")
# Repartition for write optimization
df.repartition(200, "order_date").write.format("delta").partitionBy("order_date").save(path)
# Coalesce for reducing partitions (no shuffle)
df.coalesce(10).write.format("delta").mode("overwrite").save(path)
# Repartition before large join (align partitions)
orders_repartitioned = orders.repartition(200, "customer_id")
customers_repartitioned = customers.repartition(200, "customer_id")
result = orders_repartitioned.join(customers_repartitioned, "customer_id")
5. Performance Tuning
# Enable Adaptive Query Execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Cache intermediate results used multiple times
active_customers = customers.filter(F.col("status") == "active").cache()
print(f"Active customers: {active_customers.count()}") # Triggers cache
# Use with orders
result1 = orders.join(active_customers, "customer_id")
result2 = tickets.join(active_customers, "customer_id")
# Unpersist when done
active_customers.unpersist()
# Explain query plan
result.explain(True) # Shows parsed, analyzed, optimized, and physical plans
# Persist to disk for very large intermediate results
df.persist(StorageLevel.DISK_ONLY)
Best Practices
- Use built-in functions: 10-100x faster than UDFs because they run in JVM
- Broadcast small tables: Avoid shuffle for joins with dimension tables under 100MB
- Enable AQE: Adaptive Query Execution handles skew and partition coalescing
- Cache wisely: Only cache DataFrames used multiple times; uncache when done
- Filter early: Push predicates as early as possible to reduce data volume
- Avoid collect(): Never collect large DataFrames to driver; use display() or write
- Partition on write: Match partition scheme to common query patterns
- Monitor in Spark UI: Check stage details for skew, spill, and shuffle read
Common Pitfalls
- Python UDF bottleneck: Serializing data to Python and back is 10-100x slower
- Shuffle explosion: Joining two large tables without aligned partitioning
- collect() on large data: Bringing millions of rows to driver causes OOM
- Cache without unpersist: Memory leak as cached DataFrames accumulate
- Too many or too few partitions: 200 partitions for 100MB is wasteful; 10 for 1TB is slow
Anti-Patterns
- for loop over rows: Iterating row-by-row defeats Spark's distributed processing. Use DataFrame operations.
- toPandas() on large data: Converting a 10GB DataFrame to Pandas crashes the driver. Use Pandas UDFs.
- RDD when DataFrame works: DataFrames have Catalyst optimizer; RDDs do not.
- Nested UDFs: UDF calling another UDF. Combine into a single operation.
Install this skill directly: skilldb add databricks-skills