
If you’ve ever asked “Why is my PySpark job slow on EMR?” the honest answer is usually: it’s not one thing. It’s a handful of small decisions that compound—cluster sizing, file layout, shuffle tuning, join strategy, and the never-ending battle with small files on S3.

This post is my “battle tested” checklist of PySpark best practices on AWS EMR (YARN + Spark). It’s intentionally practical: a mix of mindset, cluster design, and a set of defaults that keep you out of trouble.

The 3 rules of Spark on EMR

  1. Design for the shuffle. Most expensive Spark workloads are shuffle-bound, not CPU-bound. Your job’s performance is often a function of “how much data is moved” not “how much data is computed”.
  2. S3 is not HDFS. Reads/writes are slower, list operations can be painful, and commit semantics matter. File sizes and partitioning strategy are first-class concerns.
  3. The driver is a single machine. If the driver dies (OOM, network, huge plan, giant collect), the job dies. Protect the driver.

Pick the right EMR shape (before touching Spark configs)

EMR performance problems are often infra problems wearing a Spark costume.

  • Use instance fleets (or flexible instance groups) to blend On-Demand + Spot, but keep at least some stable On-Demand capacity for executors if the job is critical.
  • Prefer fewer, larger executors over many tiny ones when you do a lot of shuffle (too many executors often means too many shuffle files and too much overhead).
  • Storage matters:
    • If your workload spills (sorts, wide aggregations, joins), ensure adequate local disk (instance store or EBS) and don’t starve the container for disk.
    • For heavy shuffle, instance store can be great; for steady-state reliability, EBS can be simpler to reason about.
  • Networking matters: large shuffles are network-bound. A “compute optimized” box with weak network can disappoint.

Executor sizing: stop guessing

On EMR (YARN), you are choosing how many containers you want and how fat they should be.

My default approach:

  • Target 4–6 cores per executor for most ETL/SQL workloads.
  • Give the executor enough memory to avoid constant spilling, but don’t turn it into a single huge container (GC pain, fewer tasks in parallel).

Example mental model:

  • Node: 16 vCPU, 128 GB RAM
  • Leave headroom for OS/YARN/system daemons.
  • If you choose 4 cores/executor and ~24 GB/executor memory, you get ~3 executors per node (12 cores used, headroom left), which is often a good starting point.
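
A quick sanity check of that arithmetic (a sketch only; the node shape and the 24 GB / 4 GB split are the assumptions from above):

node_vcpu, node_mem_gb = 16, 128          # assumed node shape from above
cores_per_executor = 4
executor_mem_gb, overhead_gb = 24, 4      # spark.executor.memory + memoryOverhead

executors_per_node = 3                    # 3 * 4 = 12 cores, leaving 4 vCPU for OS/YARN
used_mem_gb = executors_per_node * (executor_mem_gb + overhead_gb)
print(executors_per_node, used_mem_gb)    # 3 executors, 84 GB used, ample headroom under 128 GB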

Then validate with the Spark UI:

  • If you see lots of spilled bytes (memory + disk), increase executor memory or fix partitioning/shuffles.
  • If tasks are short and scheduling dominates, increase cores/executor (or reduce total executors).

A solid baseline spark-submit for EMR

Tune from a known good baseline instead of config roulette.

spark-submit \
  --deploy-mode cluster \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.coalescePartitions.enabled=true \
  --conf spark.sql.adaptive.skewJoin.enabled=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  --conf spark.sql.shuffle.partitions=800 \
  --conf spark.network.timeout=600s \
  --conf spark.sql.broadcastTimeout=1200 \
  --conf spark.driver.maxResultSize=2g \
  --conf spark.executor.cores=4 \
  --conf spark.executor.memory=24g \
  --conf spark.executor.memoryOverhead=4g \
  --conf spark.yarn.maxAppAttempts=1 \
  s3://YOUR_BUCKET/jobs/etl_main.py \
  --date 2026-01-20

Notes:

  • I generally enable AQE (spark.sql.adaptive.*) by default on Spark 3.x. It fixes a surprising amount of “oops, wrong shuffle partitions” pain.
  • I set spark.yarn.maxAppAttempts=1 for deterministic pipelines (retries can repeat side effects). If you want resiliency, use retries but write idempotently.
  • spark.sql.shuffle.partitions is workload dependent. If you don’t know, start higher than you think and let AQE coalesce.

DataFrames over RDDs (and avoid Python UDFs when possible)

For PySpark jobs on EMR, the fastest path is:

  • DataFrames + built-in functions (pyspark.sql.functions) over RDD transformations.
  • If you must do custom logic, prefer:
    • SQL expressions (often easiest)
    • pandas UDFs (vectorized) over standard Python UDFs

Standard Python UDFs are slow because every row crosses the Python/JVM boundary.
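
A minimal sketch of that preference order (column names like amount, fx_rate, and temp_f are hypothetical):

from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf
import pandas as pd

# Best: built-in functions never leave the JVM.
df = df.withColumn("amount_usd", F.col("amount") * F.col("fx_rate"))

# If you need custom Python, a vectorized pandas UDF works on whole batches
# instead of row-by-row serialization.
@pandas_udf("double")
def f_to_c(temp_f: pd.Series) -> pd.Series:
    return (temp_f - 32.0) * 5.0 / 9.0

df = df.withColumn("temp_c", f_to_c(F.col("temp_f")))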

Partitioning strategy: the difference between minutes and hours

There are two partitioning problems:

  • Spark partitions (in-memory parallelism)
  • Data partitions on S3 (directory layout like dt=2026-01-20/region=us-east-1/…)

They are related, but not the same.

Spark partitions (in-memory)

  • Repartition before big shuffles when it reduces skew or improves parallelism.
  • Coalesce before writing if you’re producing too many output files.

from pyspark.sql import functions as F

df = df.repartition(800, F.col("customer_id"))   # useful before a join/agg keyed by customer_id
out = df.coalesce(200)                           # useful before write to reduce file count

Rule of thumb: aim for tasks that are tens of seconds, not milliseconds (too many) and not tens of minutes (too few / skew).

S3 partitioning (layout)

  • Partition on columns you filter on frequently (e.g., dt, region, event_type), but don’t over-partition (partition explosion).
  • Avoid ultra-high-cardinality partition columns (e.g., user_id) unless you absolutely need it.
  • If you frequently query by date, start with dt (or year/month/day) as the outer partition.
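
For example, filtering on the partition columns lets Spark prune S3 prefixes instead of scanning the whole table (the path and column values below are placeholders):

from pyspark.sql import functions as F

events = (spark.read.parquet("s3://YOUR_BUCKET/warehouse/events/")
          .where(F.col("dt") == "2026-01-20")
          .where(F.col("region") == "us-east-1"))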

File format: Parquet (or Iceberg) and right-sized files

For most EMR ETL:

  • Parquet + Snappy is the default.
  • Iceberg/Hudi/Delta can be worth it when you need upserts, schema evolution, time travel, or compaction built-in.

The #1 S3 performance killer is small files.

Targets that generally behave well:

  • 128–512 MB Parquet files (compressed size varies; don’t obsess, just avoid 5 MB files)
  • A file count that is “reasonable” per partition (tens to low hundreds, not thousands)
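
One rough way to land in that range is to derive the partition count from an estimated output size before writing (a sketch; the ~200 GB estimate is a stand-in for whatever you actually know about your data):

target_file_bytes = 256 * 1024 * 1024            # aim for ~256 MB files
estimated_output_bytes = 200 * 1024 ** 3         # hypothetical ~200 GB of output
num_files = max(1, estimated_output_bytes // target_file_bytes)

df = df.repartition(num_files)                   # or coalesce() if you are only reducing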

Write example:

(df
 .write
 .mode("overwrite")
 .format("parquet")
 .option("compression", "snappy")
 .partitionBy("dt")
 .save("s3://YOUR_BUCKET/warehouse/events/"))

If you are overwriting a single partition, prefer dynamic partition overwrite instead of wiping the whole table path:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
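
With that set, an overwrite scoped by partitionBy replaces only the dt partitions present in the DataFrame instead of wiping the whole path (path is a placeholder):

(df
 .write
 .mode("overwrite")
 .partitionBy("dt")
 .parquet("s3://YOUR_BUCKET/warehouse/events/"))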

S3 committers: don’t ignore this

On EMR, writing to S3 efficiently and correctly depends on the committer and your Spark/Hadoop versions.

General guidance:

  • Use the EMR defaults unless you have a reason not to. EMR is opinionated here and usually does the right thing for that release line.
  • If you see slow “commit” phases or lots of rename/list operations, revisit your committer settings and table format.

If you do need to tune S3A commit behavior, do it intentionally (and test correctness on failures). A common starting point (varies by stack) is a directory committer:

--conf spark.hadoop.fs.s3a.committer.name=directory \
--conf spark.hadoop.fs.s3a.committer.staging.conflict-mode=append \
--conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2

Practical note: committers are one of those areas where “it works” and “it works under failure” are different things. If your pipeline is business-critical, test with executor loss and step retries.

Join strategy: broadcast with intent, not by accident

The fastest join is often a broadcast join—if the small side is truly small.

  • Use broadcast when the dimension table is small (MBs to low GBs depending on executor memory and cluster).
  • Avoid broadcasting “kinda small” tables that fluctuate and occasionally blow up.

from pyspark.sql import functions as F

dim = spark.read.parquet("s3://.../dim_customers/")
fact = spark.read.parquet("s3://.../fact_events/dt=2026-01-20/")

out = fact.join(F.broadcast(dim), on="customer_id", how="left")

Also watch for:

  • Skewed keys (a few keys dominate). Turn on AQE skew join and/or salt keys for extreme skew (a salting sketch follows this list).
  • Exploding joins (many-to-many) that multiply row counts unexpectedly.
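
For the extreme-skew case, salting looks roughly like this (a sketch reusing the hypothetical fact/dim tables above; the salt count of 16 is a knob you tune):

from pyspark.sql import functions as F

SALTS = 16

# Spread the skewed key across SALTS sub-keys on the big side...
fact_salted = fact.withColumn("salt", (F.rand() * SALTS).cast("int"))

# ...and replicate the small side once per salt value so every sub-key still matches.
dim_salted = dim.withColumn("salt", F.explode(F.array(*[F.lit(i) for i in range(SALTS)])))

out = (fact_salted
       .join(dim_salted, on=["customer_id", "salt"], how="left")
       .drop("salt"))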

Aggregations and shuffles: fix skew first, then tune knobs

Before you touch shuffle knobs, check for skew:

  • In the Spark UI, look for stages where a few tasks take 10–100x longer than others.
  • If skew exists, increasing partitions just makes more tiny partitions plus a few monsters.

Common fixes:

  • Filter early (push down predicates, prune columns).
  • Pre-aggregate before a join if possible (sketched after this list).
  • Repartition by the join key (and ensure keys are well-distributed).
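
Pre-aggregation, for example, can shrink the shuffle dramatically when you only need totals per key (a sketch reusing the hypothetical fact/dim tables from the join section; column names are assumptions):

from pyspark.sql import functions as F

daily_totals = (fact
                .groupBy("customer_id")
                .agg(F.sum("amount").alias("total_amount"),
                     F.count("*").alias("event_count")))

enriched = daily_totals.join(F.broadcast(dim), on="customer_id", how="left")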

Cache only when it actually saves work

Caching is not a performance cheat code. Cache if:

  • You reuse the same DataFrame multiple times, and
  • The recomputation cost is significant, and
  • You have memory headroom

Otherwise caching can just increase GC and eviction churn.

If you do cache, materialize intentionally:

df = df.persist()
_ = df.count()   # force evaluation once, then reuse
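
And when the reuse is over, release the memory instead of waiting for eviction:

df.unpersist()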

Avoid driver OOMs (the silent EMR job killer)

The most common driver mistakes:

  • collect() on large data
  • toPandas() on anything bigger than “tiny”
  • Huge groupBy().applyInPandas() results pulled back to the driver
  • Printing large DataFrames in logs

Safer patterns:

  • Use limit() before toPandas()
  • Use take(n) for small samples
  • Write outputs to S3, not back to the driver
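
A few hedged examples of those patterns (paths and row counts are placeholders):

sample_pdf = df.limit(1000).toPandas()     # bounded pull for local inspection
preview = df.take(20)                      # small sample as a list of Rows

# Full results go to S3, not through the driver.
df.write.mode("overwrite").parquet("s3://YOUR_BUCKET/warehouse/output/")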

Packaging dependencies on EMR (PySpark reality)

You have a few practical choices:

  • Ship a zipped Python package via --py-files (best for pure Python libs you control).
  • Use a bootstrap action to install dependencies on each node (works, but can drift unless pinned).
  • Use a packaged environment (e.g., a prebuilt virtualenv/conda archive) if your deps are heavy (numpy/pandas versions, etc.).

Simple pattern using --py-files:

spark-submit \
  --py-files s3://YOUR_BUCKET/jobs/deps/my_job_lib.zip \
  s3://YOUR_BUCKET/jobs/etl_main.py
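
For the packaged-environment route, the submit looks roughly like this (a sketch assuming a conda-pack/venv-pack archive uploaded to S3; the bucket, archive name, and environment paths are placeholders):

spark-submit \
  --deploy-mode cluster \
  --archives s3://YOUR_BUCKET/jobs/envs/pyspark_env.tar.gz#environment \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
  --conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python \
  s3://YOUR_BUCKET/jobs/etl_main.py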

Whatever you choose, make sure your job is reproducible. “It worked yesterday on that cluster” is not a strategy.

Observability: Spark UI + CloudWatch (minimum viable debugging)

When a job is slow, the answer is usually in:

  • Spark UI: stages, tasks, shuffle read/write, skew, spilled bytes, GC time
  • YARN logs: executor lost, container killed, disk full, preemption
  • CloudWatch (or Ganglia if you still run it): CPU, network, disk, memory pressure

If you only take one habit from this post: learn to read the Spark UI. It turns “Spark is slow” into a specific hypothesis you can test.

Production hygiene on EMR steps

Some practices that reduce “mystery failures”:

  • Idempotent writes: write to a temp location and atomically promote (or use table formats that handle commits).
  • Explicit schemas for critical tables (avoid schema inference on large data; see the sketch after this list).
  • Pin EMR release versions for pipelines; upgrade intentionally.
  • Pin library versions (PyPI + JVM jars). Random upgrades are performance regressions waiting to happen.
  • Small integration tests on a small cluster before scaling.
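
For the explicit-schema point, a minimal sketch (column names and paths are hypothetical; the win is biggest for CSV/JSON, where inference scans data):

from pyspark.sql import types as T

events_schema = T.StructType([
    T.StructField("customer_id", T.StringType(), False),
    T.StructField("amount", T.DoubleType(), True),
    T.StructField("event_ts", T.TimestampType(), True),
    T.StructField("dt", T.StringType(), True),
])

events = (spark.read
          .schema(events_schema)
          .option("header", "true")
          .csv("s3://YOUR_BUCKET/raw/events/"))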

A quick “do this every time” checklist

  • Cluster: enough disk for shuffle/spill, stable executors for critical jobs, right instance type for network-heavy shuffles
  • Spark: AQE enabled, executor sizing intentional, shuffle partitions sane
  • Data: Parquet (or Iceberg), avoid small files, partition by common filters, prune columns early
  • Code: DataFrames + built-ins, minimal Python UDFs, no accidental collect()
  • Ops: logs accessible, Spark UI saved, dependencies pinned, writes idempotent

That’s it for today. In my next post I’ll walk through a real Spark UI screenshot and show how to diagnose skew, spill, and “too many small tasks” in under 10 minutes.

Thank you for reading.

Cheers!
Jason
