In my last post, I showed how to quickly diagnose skew in the Spark UI. In this post, I want to walk through the real-world version: one skewed join key took a job that normally ran in ~12 minutes and pushed it past 2 hours.
The punchline: you don’t need to resize the cluster. You need to stop one partition from becoming “the entire dataset.”
The setup (what changed?)
This pipeline joined a large fact-like dataset (events) to a dimension-like dataset (accounts) on account_id.
Nothing about the cluster changed. Same instance types, same executor shape, same configs.
What changed was the data:
- A single account_id became “hot” (think: a customer onboarding a massive integration, a backfill, a product bug spamming events, etc.)
- That one key represented a huge fraction of rows for the day.
The symptom (what Spark UI showed)
The slowest stage was the join stage (a wide transformation). In Spark UI:
- Most tasks finished in a few minutes
- A tiny handful ran forever
- Those long tasks had:
- Massive Shuffle Read
- Massive Records Read
- Often spill (memory/disk) and high GC
This is the classic skew signature:
The cluster is “busy”, but progress is gated by 1–5 tasks that are doing 100x the work.
The root cause (why one key destroys runtime)
Spark partitions data during a join by hashing the join key. If one key has an extreme number of rows, then one partition gets an extreme number of rows.
So instead of “N partitions each processing ~1/N of the join”, you get:
- (N - 1) partitions: finish quickly
- (1) partition: does a huge join, spills repeatedly, and determines the wall-clock runtime
And importantly: more cluster resources don’t fix the fact that one partition is the bottleneck. You can add 10x executors and you’ll still be waiting on the same single slow task.
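If you want to see that imbalance directly rather than infer it from task times, here’s a rough sketch that counts rows per hash partition. It assumes the events data is loaded as a DataFrame named events, and the partition count of 200 is arbitrary:

from pyspark.sql import functions as F

# Hash-partition on the join key the way the shuffle would, then count rows
# per partition. One partition holding a huge share of the rows is the smoking gun.
rows_per_partition = (
    events
    .repartition(200, "account_id")
    .withColumn("partition_id", F.spark_partition_id())
    .groupBy("partition_id")
    .count()
    .orderBy(F.desc("count"))
)

rows_per_partition.show(10)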
Confirm it’s skew (one quick check)
If you have access to Spark SQL, do a quick distribution check on the join key for the slow day:
-- replace table/column names as needed
select
account_id,
count(*) as c
from events
where dt = '2026-01-22'
group by account_id
order by c desc
limit 20;
If the top key is orders of magnitude larger than the median, you’ve found your culprit.
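If you’d rather run the same check in PySpark (assuming an events DataFrame with dt and account_id columns), the equivalent looks like this:

from pyspark.sql import functions as F

# Same distribution check as the SQL above, written as a DataFrame query.
(
    events
    .filter(F.col("dt") == "2026-01-22")
    .groupBy("account_id")
    .count()
    .orderBy(F.desc("count"))
    .show(20, truncate=False)
)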
Fixes that work without resizing the cluster
There are three practical paths. Which one you choose depends on your Spark version and the shape of the data.
Fix #1: Turn on AQE + skew join handling (Spark 3.x)
If you’re on Spark 3.x, Adaptive Query Execution (AQE) can automatically detect skewed shuffle partitions and split them.
These are the first two configs I check:
--conf spark.sql.adaptive.enabled=true
--conf spark.sql.adaptive.skewJoin.enabled=true
If you’re using EMR, this is often the “fastest win” because it requires zero code changes.
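If you prefer to set these from code instead of submit flags, here’s a minimal sketch; the two threshold values are illustrative, not tuned recommendations:

# AQE + skew join handling, set on an existing SparkSession.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Optional knobs that control when AQE treats a shuffle partition as "skewed".
# The values here are examples; tune them for your data.
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")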
Important note: AQE helps a lot, but if one key truly “owns the world,” you may still need salting (next fix).
Fix #2: Broadcast the small side (when it’s actually small)
If accounts is small enough to broadcast reliably, you can avoid a shuffle-heavy join entirely.
In Spark SQL:
select /*+ BROADCAST(accounts) */
e.*,
a.account_tier
from events e
join accounts a
on e.account_id = a.account_id;
In PySpark:
from pyspark.sql.functions import broadcast
joined = events.join(broadcast(accounts), on="account_id", how="left")
This works great when:
- The dimension is stable and relatively small
- The broadcast won’t blow executor memory
If the “small side” isn’t small, broadcasting just moves the pain somewhere else.
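One related knob: Spark will broadcast automatically when it estimates the small side is under spark.sql.autoBroadcastJoinThreshold (10 MB by default); the hint forces the decision regardless of the estimate. If you’d rather raise the automatic threshold than hint every query, a minimal sketch (the 100 MB value is just an example):

# Raise the automatic broadcast threshold to ~100 MB (example value).
# Setting it to -1 disables automatic broadcast joins entirely.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))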
Fix #3: Salt the join key (the “extreme skew” fix)
When one key is catastrophically skewed, the best approach is to spread that one key across multiple partitions by adding a synthetic “salt” bucket.
Conceptually:
- Add salt = hash(row) % N to the big side
- Duplicate the small side across all salt values
- Join on (key, salt) instead of just key
Here’s a concrete PySpark pattern:
from pyspark.sql import functions as F

SALT_BUCKETS = 32  # tune based on skew severity and cluster size

# Big side: assign each row to a salt bucket
events_salted = events.withColumn(
    "salt",
    F.pmod(F.xxhash64("some_row_unique_id"), F.lit(SALT_BUCKETS)).cast("int"),
)

# Small side: replicate each account_id across all salt buckets
salt_df = spark.range(SALT_BUCKETS).select(F.col("id").cast("int").alias("salt"))
accounts_salted = accounts.crossJoin(salt_df)

joined = (
    events_salted
    .join(accounts_salted, on=["account_id", "salt"], how="left")
    .drop("salt")
)
Why this works:
- The hot key no longer lands in a single partition
- You convert “one impossible task” into “many reasonable tasks”
Tradeoff:
- You pay extra compute (replicating the small side), but you usually win big on wall-clock time.
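For picking SALT_BUCKETS, my rough rule of thumb (not a formula from the Spark docs) is to split the hot key into chunks roughly the size of a normal key, capped by the parallelism you actually have. The numbers below are placeholders:

# Rough heuristic for SALT_BUCKETS; all three inputs are example numbers.
hot_key_rows = 50_000_000      # row count of the hot key, from the distribution check
typical_key_rows = 200_000     # a "normal" key's row count (median-ish)
available_parallelism = 256    # e.g. total executor cores

SALT_BUCKETS = min(
    max(2, hot_key_rows // typical_key_rows),
    available_parallelism,
)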
Two additional “no new hardware” improvements that often stack
Pre-aggregate before the join
If the fact table has multiple rows per key but you only need aggregates, collapse it before the join. You get:
- Fewer rows shuffled
- Less skew impact
Example:
with events_agg as (
select
account_id,
count(*) as event_count
from events
where dt = '2026-01-22'
group by account_id
)
select
a.account_id,
a.account_tier,
e.event_count
from accounts a
left join events_agg e
on a.account_id = e.account_id;
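The same pre-aggregation in PySpark, assuming the same events and accounts DataFrames as earlier:

from pyspark.sql import functions as F

# Collapse events down to one row per account_id before the join.
events_agg = (
    events
    .filter(F.col("dt") == "2026-01-22")
    .groupBy("account_id")
    .agg(F.count(F.lit(1)).alias("event_count"))
)

joined = accounts.join(events_agg, on="account_id", how="left")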
Repartition intentionally (don’t let the default decide)
If you know the join key is the bottleneck, be explicit:
events = events.repartition("account_id")
accounts = accounts.repartition("account_id")
This doesn’t “solve” skew by itself, but it often improves stability and predictability, especially when upstream steps left you with a terrible partitioning.
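If you also want to control how many partitions the shuffle produces, repartition takes an explicit count (400 below is purely illustrative; size it to your data volume and core count):

# Explicit partition count plus partitioning column (400 is an example, not a recommendation).
events = events.repartition(400, "account_id")
accounts = accounts.repartition(400, "account_id")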
What I’d do first (fastest path to a 12-minute job again)
In order:
1. Enable AQE + skew join handling (Spark 3.x) and re-run.
2. If the dimension is small: broadcast it.
3. If one key is still wrecking you: salt the join (the reliable fix).
That’s it. One hot join key can ruin your day, but you don’t need to throw money at the cluster to fix it; you need to make the work divisible again.
Thank you for reading.
Cheers!
Jason