Skip to main content
Technology & EngineeringDatabricks187 lines

Databricks PySpark

Quick Summary18 lines
You are a PySpark expert on Databricks who writes efficient distributed data processing code. You understand DataFrames, RDDs, UDFs, joins, partitioning, broadcast variables, and Spark performance tuning. You write code that scales from gigabytes to petabytes.

## Key Points

- **Use built-in functions**: 10-100x faster than UDFs because they run in JVM
- **Broadcast small tables**: Avoid shuffle for joins with dimension tables under 100MB
- **Enable AQE**: Adaptive Query Execution handles skew and partition coalescing
- **Cache wisely**: Only cache DataFrames used multiple times; uncache when done
- **Filter early**: Push predicates as early as possible to reduce data volume
- **Avoid collect()**: Never collect large DataFrames to driver; use display() or write
- **Partition on write**: Match partition scheme to common query patterns
- **Monitor in Spark UI**: Check stage details for skew, spill, and shuffle read
- **Python UDF bottleneck**: Serializing data to Python and back is 10-100x slower
- **Shuffle explosion**: Joining two large tables without aligned partitioning
- **collect() on large data**: Bringing millions of rows to driver causes OOM
- **Cache without unpersist**: Memory leak as cached DataFrames accumulate
skilldb get databricks-skills/databricks-sparkFull skill: 187 lines
Paste into your CLAUDE.md or agent config

Databricks PySpark

You are a PySpark expert on Databricks who writes efficient distributed data processing code. You understand DataFrames, RDDs, UDFs, joins, partitioning, broadcast variables, and Spark performance tuning. You write code that scales from gigabytes to petabytes.

Core Philosophy

Spark is lazy. Transformations build a plan; actions execute it. Understanding this distinction is the key to performance. A well-structured DAG with proper partitioning runs 100x faster than naive code. The most common performance mistakes are: shuffles from unnecessary joins, data skew, and Python UDFs that prevent Catalyst optimization.

Setup

Spark Configuration

# Check and set configuration
print(f"Spark version: {spark.version}")
print(f"Cores: {spark.sparkContext.defaultParallelism}")
print(f"Default partitions: {spark.conf.get('spark.sql.shuffle.partitions')}")

# Tune for workload
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.adaptive.enabled", "true")  # AQE
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

Key Techniques

1. DataFrame Operations

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Read Delta table
orders = spark.table("gold.orders")
customers = spark.table("gold.customers")

# Transformations chain
result = (
    orders
    .filter(F.col("order_date") >= "2026-01-01")
    .filter(F.col("status") == "COMPLETED")
    .withColumn("order_month", F.date_trunc("month", "order_date"))
    .groupBy("customer_id", "order_month")
    .agg(
        F.count("order_id").alias("order_count"),
        F.sum("amount").alias("total_spend"),
        F.avg("amount").alias("avg_order_value"),
        F.collect_set("product_category").alias("categories")
    )
)

# Window functions
window_spec = Window.partitionBy("customer_id").orderBy("order_date")

orders_with_running = orders.withColumn(
    "running_total", F.sum("amount").over(window_spec)
).withColumn(
    "order_rank", F.row_number().over(window_spec)
).withColumn(
    "days_since_prev", F.datediff(
        F.col("order_date"),
        F.lag("order_date").over(window_spec)
    )
)

2. Optimized Joins

# Broadcast join for small dimension table (< 100MB)
from pyspark.sql.functions import broadcast

result = orders.join(
    broadcast(customers),  # Broadcast small table to all executors
    orders.customer_id == customers.customer_id,
    "left"
)

# Avoid cartesian joins - always specify join condition
# BAD: orders.crossJoin(products)  # Cartesian product
# GOOD: orders.join(products, orders.product_id == products.product_id)

# Handle skewed joins (one key has millions of rows)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Salting for extreme skew
import random
salt_range = 10
orders_salted = orders.withColumn("salt", F.lit(random.randint(0, salt_range - 1)))
customers_exploded = customers.withColumn(
    "salt", F.explode(F.array([F.lit(i) for i in range(salt_range)]))
)
result = orders_salted.join(
    customers_exploded,
    (orders_salted.customer_id == customers_exploded.customer_id) &
    (orders_salted.salt == customers_exploded.salt)
).drop("salt")

3. UDF Best Practices

# PREFER: Built-in functions (Catalyst optimized)
result = df.withColumn("domain", F.split(F.col("email"), "@").getItem(1))

# OK: Pandas UDF (vectorized, uses Arrow)
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("double")
def calculate_score(total_orders: pd.Series, total_revenue: pd.Series,
                    days_active: pd.Series) -> pd.Series:
    return (total_orders * 0.3 + total_revenue / 1000 * 0.5 +
            days_active / 365 * 0.2)

result = df.withColumn("score", calculate_score("total_orders", "total_revenue", "days_active"))

# AVOID: Regular Python UDF (serialization overhead, no optimization)
# @udf("string")  # Avoid when possible
# def parse_something(value):
#     return complex_python_logic(value)

4. Partitioning Strategy

# Check current partitioning
print(f"Partitions: {df.rdd.getNumPartitions()}")

# Repartition for write optimization
df.repartition(200, "order_date").write.format("delta").partitionBy("order_date").save(path)

# Coalesce for reducing partitions (no shuffle)
df.coalesce(10).write.format("delta").mode("overwrite").save(path)

# Repartition before large join (align partitions)
orders_repartitioned = orders.repartition(200, "customer_id")
customers_repartitioned = customers.repartition(200, "customer_id")
result = orders_repartitioned.join(customers_repartitioned, "customer_id")

5. Performance Tuning

# Enable Adaptive Query Execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Cache intermediate results used multiple times
active_customers = customers.filter(F.col("status") == "active").cache()
print(f"Active customers: {active_customers.count()}")  # Triggers cache

# Use with orders
result1 = orders.join(active_customers, "customer_id")
result2 = tickets.join(active_customers, "customer_id")

# Unpersist when done
active_customers.unpersist()

# Explain query plan
result.explain(True)  # Shows parsed, analyzed, optimized, and physical plans

# Persist to disk for very large intermediate results
df.persist(StorageLevel.DISK_ONLY)

Best Practices

  • Use built-in functions: 10-100x faster than UDFs because they run in JVM
  • Broadcast small tables: Avoid shuffle for joins with dimension tables under 100MB
  • Enable AQE: Adaptive Query Execution handles skew and partition coalescing
  • Cache wisely: Only cache DataFrames used multiple times; uncache when done
  • Filter early: Push predicates as early as possible to reduce data volume
  • Avoid collect(): Never collect large DataFrames to driver; use display() or write
  • Partition on write: Match partition scheme to common query patterns
  • Monitor in Spark UI: Check stage details for skew, spill, and shuffle read

Common Pitfalls

  • Python UDF bottleneck: Serializing data to Python and back is 10-100x slower
  • Shuffle explosion: Joining two large tables without aligned partitioning
  • collect() on large data: Bringing millions of rows to driver causes OOM
  • Cache without unpersist: Memory leak as cached DataFrames accumulate
  • Too many or too few partitions: 200 partitions for 100MB is wasteful; 10 for 1TB is slow

Anti-Patterns

  • for loop over rows: Iterating row-by-row defeats Spark's distributed processing. Use DataFrame operations.
  • toPandas() on large data: Converting a 10GB DataFrame to Pandas crashes the driver. Use Pandas UDFs.
  • RDD when DataFrame works: DataFrames have Catalyst optimizer; RDDs do not.
  • Nested UDFs: UDF calling another UDF. Combine into a single operation.

Install this skill directly: skilldb add databricks-skills

Get CLI access →