PySpark Lab
PySpark Interview Lab — Free

Master PySpark for Data Engineering

Ace your Data Engineering interview. The complete free toolkit — no sign-up, no paywall, just practice.

60
Questions
9
Concepts
Live
Editor
Fresher Q01–Q20
Intermediate Q21–Q40
Experienced Q41–Q55
Coding Q56–Q60

📅 2006 – 2013 · The Hadoop Era
The Origin Story

What Existed Before PySpark?

Understanding the pain Spark solved is what separates a good engineer from a great one — and it's exactly what interviewers probe.

"Submitted the job at 9am. Still running at 4pm. Guess I'll grab another coffee." — A data engineer, circa 2010
The Theory

How MapReduce Actually Worked

Every intermediate result hit disk. A pipeline of 3 chained operations meant roughly 9 disk reads and writes, because MapReduce had no way to hand intermediate data to the next step in memory.

📂
Input
HDFS blocks · 128 MB chunks
1
🗺️
Map
Parallel tasks per data chunk
2
💾 Write local disk
🔀
Shuffle
Sort + send over network
3
💾 Read + Write disk
📉
Reduce
Aggregate grouped data
4
💾 Write HDFS
Output
Final result back on HDFS
5
💾 Disk ops per job
MapReduce
⚡ Disk ops per job
Spark (in-memory)
Spark replaced all those disk hits with in-memory DAG execution — data stays in RAM across the entire pipeline, with lineage tracked as a graph for fault tolerance.
45 min
Per simple job
"Submit before lunch, eat, come back, pray it finished."
200+
Lines of Java boilerplate
"More config XML than actual business logic."
Disk reads/writes per job
"RAM existed. Hadoop just refused to use it."
Same task — WordCount on 1TB of logs
☕ Java · Hadoop MapReduce 2006 – 2013
// WordCount, simplified. Real jobs: 200+ lines.
public class WordCount {
  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {
    private final IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context ctx)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        ctx.write(word, one); // shuffle to disk
      }
    }
  }
  // ... 80 more lines (Reducer, Driver, XML config)
}
⏱ 45 min per job 📝 200+ lines 💾 Disk I/O every stage
VS 100× FASTER
🐍 Python · PySpark 2014 – Present
# Same WordCount. In-memory. 5 lines.
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col

spark = SparkSession.builder.getOrCreate()

result = (
    spark.read.text("logs/*.txt")
    .select(explode(split("value", " ")).alias("word"))
    .groupBy("word")
    .count()
    .orderBy(col("count").desc())
)
result.show(10)  # Done. In memory. ✓
⚡ 23 sec same data 📝 5 lines 🧠 In-memory DAG
Never say "Python"

When asked why Spark is faster, the answer is in-memory DAG execution, not the language. MapReduce flushed every intermediate result to disk. Spark plans the whole pipeline as one DAG and keeps intermediate data in memory where it can, touching disk only for shuffles or when memory runs out.

Common mistake
🔢 The number to remember

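A 3-stage MapReduce job does ~9 disk read/writes. Spark can run the same chain with close to none beyond the initial read and final write (shuffle files still touch local disk). That gap explains the 10–100× speedup, especially for iterative ML jobs that loop over data many times.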

Know the stat
🤔 The trap follow-up

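"Is Spark always better?" No. Spark can spill to disk, but it is memory-hungry and needs tuning; for simple one-pass jobs on data far larger than cluster RAM, MapReduce's disk-based model is predictable and its fault recovery is simpler. Strong candidates mention this tradeoff without being asked.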

Interviewer trap
2004
Google MapReduce paper
2006
Hadoop open-sourced
2009
Spark born @ UC Berkeley
2014
Spark 1.0 — Apache official
2016
Spark 2.0 — DataFrames unified
Now
Spark 3.5 + Delta Lake
Fun Fact Spark sorted 100TB in 23 minutes (2014 world record). That single benchmark ended the Hadoop MapReduce era.
Why Python? Before PySpark, you needed Java or Scala to use Spark. PySpark opened the door for 8M+ Python data scientists.
Interview Gold "Why is Spark faster than MapReduce?" — Always comes up. Answer: in-memory DAG execution vs disk-based job chaining.
Why PySpark

The Engine Behind Big Data

Understand the fundamentals interviewers test — from the "why" to the architecture that makes Spark fast.

01
What is PySpark?
PySpark is the Python API for Apache Spark — the dominant open-source engine for distributed data processing. While pandas is limited to one machine's RAM, PySpark splits work across a cluster, letting you process terabytes or petabytes without changing your Python code style.
🌐
Distributed by default — splits data across 100s of machines and runs tasks in parallel, all managed for you.
💾
In-memory execution — data stays in RAM between steps; no disk write/read loop like MapReduce.
🐍
Python-native API — DataFrame syntax feels like pandas; Spark's Catalyst Optimizer handles the rest.
Lazy evaluation — transforms build a DAG; execution only triggers on an Action, letting Spark optimise the full pipeline first.
100×
faster than MapReduce on typical workloads
PB+
proven production scale at Netflix, Uber
🐼
pandas Single machine · RAM bound · ~50 GB practical limit
↓ SCALE UP
PySpark N machines · petabyte scale · fault-tolerant · auto-retry
used in production at
Netflix Databricks Uber LinkedIn Amazon Apple Google
02
Spark Architecture
Spark follows a master-worker model. Your Python code runs in the Driver — it builds a DAG, optimises it with Catalyst, and breaks it into Tasks dispatched to Executors. A Cluster Manager handles resource allocation. Executors keep hot data in memory for fast re-use.
📄  analysis.py  ·  SparkSession.builder.getOrCreate()
↓  submits job
⚙  DRIVER  —  master node  ·  one JVM per application
SparkSession / SparkContext entry point · application config · connection to cluster
DAG Scheduler logical plan → Catalyst Optimizer → physical plan → stages at shuffle boundaries
Task Scheduler assigns Tasks to Executors · monitors heartbeats · retries failures · data-locality aware
↕  resource negotiation       ↓  task dispatch & shuffle data
🗂  Cluster Manager
YARN
Kubernetes
Mesos
Standalone
allocates JVM
processes on
worker nodes
Worker Node 1
Executor · 4 cores
Task 1 · Task 2 · Task 3 · Task 4
Execution mem
Storage/Cache
Worker Node 2
Executor · 4 cores
Task 5 · Task 6 · Task 7 · Task 8
Execution mem
Storage/Cache
Worker Node N
Executor · N cores
Task … · Task …
Execution mem
Storage/Cache
⇄  shuffle  —  network data exchange triggered by groupBy · join · orderBy · repartition
Your code
Driver (1 per app)
Cluster Manager
Executors (1+ per worker)
03
RDD → DataFrame → Dataset
RDD (Resilient Distributed Dataset): Low-level, untyped, no optimizer. Fault-tolerant via lineage.

DataFrame: Structured, named columns, schema-aware. Uses the Catalyst Optimizer — much faster than RDD in practice.

Dataset: Typed version of DataFrame. Available in Scala/Java; in PySpark, DataFrame IS the Dataset API.

Rule: Use DataFrame/Spark SQL by default. Drop to RDD only when you need custom low-level transformations.
API Evolution Timeline
RDD (2012) Low-level · Untyped · No optimizer · Fault-tolerant via lineage Baseline
Catalyst Optimizer added ↓
DataFrame (2015) Columnar · Schema-aware · SQL support · Auto-optimized 5–20× faster
Type-safety layer ↓
Dataset (JVM only) Typed · Compile-time checks · In PySpark: DataFrame IS the Dataset API Same as DF
PySpark tip: DataFrame is Dataset[Row]. The schema is validated when Spark analyses the query (at runtime), not at compile time as with typed Scala Datasets.
04
Lazy Evaluation
When you call .filter(), .select(), or .groupBy(), Spark does nothing yet. It builds a logical plan (DAG). Execution only happens when you call an Action like .show(), .collect(), or .write.save().

Why? Spark can optimise the entire chain at once — reordering filters, eliminating unnecessary columns, merging stages — before touching any data. This is the Catalyst Optimizer at work.
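To make the idea concrete, here is a minimal plain-Python sketch of lazy evaluation. It is not how Spark is implemented; `LazyFrame` and `older_than_25` are hypothetical names invented for illustration. Transformations only record steps, and the recorded plan runs when the action `collect()` is called.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: records steps, runs them only on an action."""

    def __init__(self, rows, plan=()):
        self._rows = rows    # source data (a list of dicts)
        self._plan = plan    # tuple of recorded (kind, function) steps

    def filter(self, predicate):
        # Transformation: return a new frame with one more step; no rows touched.
        return LazyFrame(self._rows, self._plan + (("filter", predicate),))

    def select(self, *names):
        # Transformation: record a projection onto the requested columns.
        project = lambda row: {n: row[n] for n in names}
        return LazyFrame(self._rows, self._plan + (("map", project),))

    def collect(self):
        # Action: only now is the recorded plan applied to the data.
        rows = self._rows
        for kind, fn in self._plan:
            if kind == "filter":
                rows = [r for r in rows if fn(r)]
            else:
                rows = [fn(r) for r in rows]
        return rows


people = [
    {"name": "Ana", "age": 31, "region": "EU"},
    {"name": "Ben", "age": 22, "region": "US"},
]
calls = []

def older_than_25(row):
    calls.append(row["name"])  # side effect so we can see when work happens
    return row["age"] > 25

plan = LazyFrame(people).filter(older_than_25).select("name", "region")
calls_before_action = list(calls)  # still empty: nothing has run yet
result = plan.collect()            # the action triggers the whole chain
```

Because the full chain is known before anything runs, an optimizer has the chance to rearrange it first, which is the opening Catalyst uses.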
Transformations (Lazy)
.filter() · no data moved
.select() · no data moved
.groupBy() · builds plan
.join() · builds plan
.withColumn() · no data moved
Actions (Trigger Run)
.show() · executes DAG
.collect() · executes DAG
.count() · executes DAG
.write.save() · executes DAG
.take(n) · executes DAG
DAG Execution Flow
.filter(age > 25) · LAZY
.select("name", "region") · LAZY
.groupBy("region") · LAZY + SHUFFLE
.write() ← Action triggers! · EXECUTES
Catalyst optimises entire chain, then runs · DONE
Every action re-runs the full lineage. Use .cache() to keep expensive intermediate results instead of recomputing them.
05
Transformations vs Actions
Transformations (lazy — build DAG):
filter, select, groupBy, join, withColumn, drop, orderBy, distinct, union, repartition

Actions (trigger execution — return results):
show, collect, count, take, first, write, save, foreach, reduce

Key insight: every time you call an action, Spark re-executes the entire lineage from scratch — unless you .cache() or .persist() intermediate results.
Narrow vs Wide Transform
Narrow · .filter()
Each partition is independent
P0───▶P0'
P1───▶P1'
P2───▶P2'
✓ No shuffle · No network I/O
Wide · .groupBy()
Data crosses partition lines
⚠ Shuffle stage · Network + disk write
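The link between wide transformations and stage boundaries can be sketched in a few lines of plain Python. This is a deliberate simplification (Spark's real scheduler works on a physical plan, and `split_into_stages` is a hypothetical helper), but it shows why a chain of narrow operations shares one stage while a wide one opens a new stage.

```python
# Operations the text lists as shuffle-producing (wide) transformations.
WIDE = {"groupBy", "join", "distinct", "orderBy", "repartition"}

def split_into_stages(ops):
    """Start a new stage whenever a wide (shuffle-producing) operation appears."""
    stages, current = [], []
    for op in ops:
        if op in WIDE and current:
            stages.append(current)  # the shuffle closes the current stage
            current = []
        current.append(op)
    if current:
        stages.append(current)
    return stages

stages = split_into_stages(["filter", "select", "groupBy", "write"])
# stages == [["filter", "select"], ["groupBy", "write"]]
```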
06
Catalyst + Tungsten
Catalyst Optimizer (4 stages):
1. Analysis — resolve column/table references
2. Logical Optimization — rule-based rewrites (filter pushdown, constant folding)
3. Physical Planning — choose execution strategy (broadcast vs sort-merge join)
4. Code Generation — generate optimised JVM bytecode

Tungsten Engine: Memory management (off-heap binary storage), whole-stage code generation, cache-aware computation. Together they make DataFrame 5–20× faster than equivalent RDD code.
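As a rough illustration of a rule-based rewrite, the sketch below performs constant folding on a tiny expression tree in plain Python. The tuple format `(op, left, right)` and the `fold` function are assumptions made for this example; Catalyst's real rules operate on its own tree classes.

```python
def fold(expr):
    """Fold constant sub-expressions in a tiny ('op', left, right) expression tree."""
    if not isinstance(expr, tuple):
        return expr  # a literal number or a column name
    op, left, right = expr[0], fold(expr[1]), fold(expr[2])
    if isinstance(left, (int, float)) and isinstance(right, (int, float)):
        # Both sides are literals, so compute the value once at planning time.
        return left + right if op == "+" else left * right
    return (op, left, right)

# amount * (60 + 40): the literal part is computed before any row is read.
optimized = fold(("*", "amount", ("+", 60, 40)))
# optimized == ("*", "amount", 100)
```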
Catalyst Pipeline
1
Analysis Resolve names, types, references
2
Logical Optimization Filter pushdown · Constant folding · Column pruning
3
Physical Planning Broadcast join vs sort-merge · Partition strategy
4
Code Generation Whole-stage JVM bytecode — no interpreter overhead
⚡ Tungsten: off-heap memory · cache-aware layout · 5–20× vs RDD
07
PySpark vs pandas
pandas: single machine, eager execution, in-memory, <50GB practical limit, rich ecosystem.

PySpark: distributed cluster, lazy execution, petabyte-scale, requires Spark setup.

Best of both worlds: pandas_udf (vectorized UDFs via Apache Arrow) + df.toPandas() for small result sets. Spark 3.2+ also supports pandas API on Spark (pyspark.pandas) for pandas-compatible syntax on distributed data.
pandas
Single machine
Eager execution
RAM-limited (~50GB)
Rich ecosystem
PySpark
Distributed cluster
Lazy execution
Petabyte-scale
Cloud-native
Key Rule Narrow transforms (filter, select, withColumn) have no shuffle — data stays on the same partition. Wide transforms (groupBy, join, distinct, orderBy) trigger a shuffle — data crosses the network. This distinction drives 80% of PySpark performance tuning questions.
Interview Edge
Always distinguish narrow vs wide transforms. Interviewers probe this in every Spark round — most candidates can't explain why groupBy is slow.


How It Works

PySpark Data Flow

Step through each phase of a real PySpark job — from raw data on disk to the final aggregated result.

job.py — sales_report.py
 1  from pyspark.sql import SparkSession
 2  from pyspark.sql.functions import col, sum, avg
 3
 4  spark = SparkSession.builder.getOrCreate()
 5
 6  df = spark.read.parquet("s3://data/sales/")
 7  # 3 million rows, 3 partitions
 8
 9  df2 = df.filter(col("year") == 2024)
10  df3 = df2.select("region", "amount")
11  df4 = df3.withColumn("amt_usd", col("amount") * 1.1)
12  # Nothing runs yet: the DAG is building
13
14  agg = (df4.groupBy("region")
15         .agg(sum("amt_usd").alias("total"),
16              avg("amt_usd").alias("avg_sale")))
17
18  result = agg.orderBy("total", ascending=False)
19  result.write.parquet("s3://out/report/")
20  # Job complete: 5 rows written
📂 Read — Spark connects to source & divides data into partitions LAZY
📂 S3 Parquet 3M rows · 3 files on disk
schema read → partitions created
P0
1M rows
P1
1M rows
P2
1M rows
💡 No data moves yet. Spark reads only the Parquet schema (column names + types) and records the source path in the DAG. Executor cores are completely idle at this stage.
🔧 Transform — each operation adds a node to the plan, nothing executes ALL LAZY
📖 READ — parquet("s3://data/sales/") LAZY
🔍 FILTER — year = 2024 LAZY
📋 SELECT — region, amount LAZY
✏️ PROJECT — amt_usd = amount × 1.1 CATALYST
💡 Zero executors used. filter(), select(), withColumn() just add nodes to the logical plan. Catalyst Optimizer will merge and reorder these before any real work begins.
🔀 Shuffle — groupBy moves rows with the same key to the same executor WIDE TRANSFORM
Before shuffle
(rows mixed by region)
After shuffle
(one region per partition)
P0
region A,B,C
P1
region A,B,D
P2
region B,C,D
R-A
all region A
R-B
all region B
R-C/D
regions C,D
Expensive network I/O. Rows from every executor travel across the network to their destination executor. This creates a Stage boundary — Stage 1 must fully finish before Stage 2 can begin.
Action — write() is the trigger. Only now does Spark start executing. ACTION
1 · DAG Submitted: SparkContext sends the full plan to the Scheduler
2 · Catalyst Optimises: Logical plan → physical plan (predicate pushdown, col pruning)
3 · Stage Split: Shuffle boundary creates 2 Stages; tasks can't cross it
💡 Lines 6–18 produced zero output. They were all lazy; only the write on line 19 forced Spark to compile and run the entire job.
⚙️ Execute — Tasks run in parallel per stage across all executors RUNNING
Stage 1 — filter · select · project (narrow transforms, no shuffle needed)
Executor 1
Task 1 — Partition P0 (1M rows)
2 cores · 4 GB
Executor 2
Task 2 — Partition P1 (1M rows)
2 cores · 4 GB
Executor 3
Task 3 — Partition P2 (1M rows)
2 cores · 4 GB
Stage 2 — groupBy aggregate (starts only after Stage 1 shuffle completes)
Executor 1
Task 4 — All region A rows
Executor 2
Task 5 — All region B rows
Executor 3
Task 6 — Regions C & D rows
💡 All 3 executors run in parallel within each stage. The shuffle barrier between Stage 1 and Stage 2 is the bottleneck — not the compute itself.
Output — Aggregated result written to S3 as Parquet DONE
3,000,000 input rows (raw sales data)
5 output rows (aggregated)
region          total ($)    avg_sale ($)
North America   4,821,540    312.40
Europe          3,109,220    287.15
Asia Pacific    2,650,800    241.30
Latin America     980,440    198.75
Middle East       430,120    176.50
Job complete in ~4.2 s. Result written to s3://out/report/. MapReduce doing the same job would take minutes — Spark keeps data in-memory across stages instead of writing to disk between each step.

Deep Dive

Core Spark Concepts

These are the concepts interviewers probe deepest.

Most Asked: Catalyst optimizer, shuffle partitions & join strategies (80% of senior interviews).
Depth Matters: Interviewers love "why" answers. Know the internal mechanics, not just the API.
Partitioning & Shuffling
Analogy

Think of your DataFrame as a giant book. Spark splits it into chapters (partitions) and hands each chapter to a different reader (executor). All readers work simultaneously, so the job finishes in the time it takes to read one chapter, not the whole book.

What is a Partition?

A partition is a logical chunk of your DataFrame stored on one executor node. Spark processes each partition as one Task. More partitions = more parallelism (up to a point).

DataFrame split across 4 executors in parallel
DataFrame — 4 million rows
splits into 4 partitions
Partition 0
1M
rows
Partition 1
1M
rows
Partition 2
1M
rows
Partition 3
1M
rows
each assigned to one executor
Executor 1
Task 1
Executor 2
Task 2
Executor 3
Task 3
Executor 4
Task 4
All 4 tasks run simultaneously

Default Partitions

spark.sql.shuffle.partitions defaults to 200 after every shuffle — a number tuned for large clusters and often wrong for yours.

Rule of thumb: aim for 128–256 MB per partition and 2–4× your total executor cores. A 10 GB dataset on 20 cores needs ~50 partitions, not 200. Over-partitioning causes scheduler overhead; under-partitioning wastes cores.
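The arithmetic behind that rule of thumb can be written down as a small helper. This is a sketch under the stated assumptions only (128–256 MB per partition, 2–4 partitions per core); `suggest_partitions` is a hypothetical function, not part of any Spark API.

```python
import math

def suggest_partitions(data_gb, total_cores, target_mb=(128, 256), per_core=(2, 4)):
    """Return a (low, high) partition range that satisfies both rules of thumb."""
    data_mb = data_gb * 1024
    # Size rule: bigger target partitions give fewer partitions, and vice versa.
    by_size = (math.ceil(data_mb / target_mb[1]), math.ceil(data_mb / target_mb[0]))
    # Core rule: a multiple of the total executor cores.
    by_cores = (per_core[0] * total_cores, per_core[1] * total_cores)
    low = max(by_size[0], by_cores[0])
    high = min(by_size[1], by_cores[1])
    # If the two ranges do not overlap, fall back to the size-based range.
    return (low, high) if low <= high else by_size

low, high = suggest_partitions(data_gb=10, total_cores=20)
# (low, high) == (40, 80), so ~50 partitions sits inside the range and 200 does not
```

A value picked from the range would then be applied through the spark.sql.shuffle.partitions setting mentioned above.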

repartition() vs coalesce()

repartition(n)
  • Full shuffle — data crosses the network
  • Can increase or decrease partition count
  • Produces equal-sized partitions
  • Use to rebalance skewed data or increase partitions
coalesce(n)
  • Minimal shuffle — merges local partitions only
  • Can only decrease partition count
  • May produce uneven partitions
  • Use before writing to reduce output file count
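The difference in resulting partition sizes can be mimicked with plain Python lists. This is only a toy model, assuming coalesce merges neighbouring partitions whole and repartition redistributes rows round-robin; the function names mirror the Spark methods but are local helpers.

```python
def coalesce(partitions, n):
    """Merge whole existing partitions into n groups; rows are never split up."""
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i * n // len(partitions)].extend(part)
    return merged

def repartition(partitions, n):
    """Redistribute every row round-robin, which evens out partition sizes."""
    out = [[] for _ in range(n)]
    rows = [row for part in partitions for row in part]
    for i, row in enumerate(rows):
        out[i % n].append(row)
    return out

# Skewed input: one large partition and three tiny ones.
parts = [list(range(90)), list(range(5)), list(range(3)), list(range(2))]
coalesced_sizes = [len(p) for p in coalesce(parts, 2)]        # [95, 5]
repartitioned_sizes = [len(p) for p in repartition(parts, 2)]  # [50, 50]
```

The cheap merge keeps the skew, while the full redistribution removes it at the cost of moving every row.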

What Causes a Shuffle?

A shuffle moves data across the network between executors — the most expensive operation in Spark. It involves serialization, disk I/O, and network transfer.

  • Wide transformations trigger shuffles: groupBy, join, distinct, orderBy, repartition
  • Narrow transformations need no shuffle: filter, select, withColumn. Each partition works independently, with no network traffic

Reduce shuffles: filter and select early (before joins), broadcast small tables (broadcast(df)), and pre-partition datasets on join keys to avoid repeated reshuffling.
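What a shuffle does to the data can be sketched with hash partitioning in plain Python. This is an illustrative model, not Spark's shuffle code: `shuffle_by_key` is a hypothetical helper, and `zlib.crc32` stands in for whatever hash function the engine uses.

```python
import zlib
from collections import defaultdict

def shuffle_by_key(partitions, num_out):
    """Route every (key, value) row to the output partition chosen by hashing its key."""
    out = defaultdict(list)
    moved = 0
    for src_idx, rows in enumerate(partitions):
        for key, value in rows:
            dest = zlib.crc32(key.encode()) % num_out
            out[dest].append((key, value))
            if dest != src_idx:
                moved += 1  # this row would cross the network
    return dict(out), moved

# Regions are mixed across partitions before the shuffle.
before = [
    [("A", 1), ("B", 2), ("C", 3)],
    [("A", 4), ("B", 5), ("D", 6)],
    [("B", 7), ("C", 8), ("D", 9)],
]
after, rows_moved = shuffle_by_key(before, num_out=3)

# After the shuffle each key lives in exactly one partition.
homes = defaultdict(set)
for idx, rows in after.items():
    for key, _ in rows:
        homes[key].add(idx)
```

Every row whose destination differs from its source partition is network traffic, which is why filtering rows and columns before the shuffle pays off.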

Caching & Persistence
Analogy

Without caching, asking Spark the same question twice is like a chef cooking the entire meal from scratch both times you ask. With cache, the first batch is kept warm — the second serving is instant.

Why Cache?

Spark's lazy evaluation re-runs the full transformation chain for every Action. Calling .count() and then .show() on the same DataFrame reads the source file and reruns every transform — twice. Cache breaks this cycle.

Without cache — 2× work
S3 read → filter → join → .count()
S3 read → filter → join → .show()

Source read + transforms run twice

With cache — 1× work
S3 read → filter → join → [cached]
[cached] → .count()
[cached] → .show()

Source read once, reused for both actions
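The effect can be reproduced in plain Python by counting how often a source is read. This is a toy model under simple assumptions (`Source` and `pipeline` are hypothetical names); in Spark the cached data is materialised by the first action after cache() is called.

```python
class Source:
    """Counts how many times the (pretend) S3 read happens."""

    def __init__(self, rows):
        self.rows = rows
        self.reads = 0

    def read(self):
        self.reads += 1
        return list(self.rows)


def pipeline(source):
    # Stand-in for read -> filter; recomputed from scratch on every call.
    return [x for x in source.read() if x % 2 == 0]


# Without cache: each "action" re-runs the pipeline, so the source is read twice.
uncached = Source(range(10))
count_1 = len(pipeline(uncached))
first_3 = pipeline(uncached)[:3]
reads_without_cache = uncached.reads  # 2

# With cache: materialise once, then reuse the result for both "actions".
cached_src = Source(range(10))
cached = pipeline(cached_src)
count_2 = len(cached)
first_3_cached = cached[:3]
reads_with_cache = cached_src.reads   # 1
```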

cache() vs persist()

df.cache() is shorthand for df.persist(StorageLevel.MEMORY_AND_DISK). Use persist() when you need a specific storage level.

Storage Levels

MEMORY_ONLY
Fastest · partitions that don't fit are recomputed
MEMORY_AND_DISK
Safe default
MEMORY_ONLY_SER (Scala/Java only)
Smaller footprint
DISK_ONLY
Slow · saves RAM
OFF_HEAP
No GC pressure

Checkpointing vs Caching

Cache
  • Stored on executor memory / disk
  • Lineage is preserved
  • Lost if executor dies
  • Fast — no remote write
Checkpoint
  • Saved to HDFS / S3
  • Lineage is truncated — Spark forgets how it got there
  • Survives executor failure
  • Use in long iterative jobs and Streaming
Joins & Broadcast Strategy
Analogy

Broadcast join = photocopying a small menu and handing a copy to each chef. Each chef has everything they need and never leaves their station. Zero network traffic for the big table.

Sort-merge join = sending all chefs to a central filing cabinet, sorted alphabetically, to find their matching order. Works for any size, but there's a lot of walking (network shuffle).

Join Types

  • inner — only rows that match on both sides
  • left / right — all rows from one side; nulls where no match on the other
  • full — all rows from both sides, nulls wherever no match exists
  • left_semi — left rows where a match exists in right (equivalent to WHERE EXISTS)
  • left_anti — left rows where no match in right (equivalent to WHERE NOT EXISTS)
  • cross — Cartesian product: every left row × every right row — expensive, use deliberately

Join Strategies

Broadcast Hash Join
  • Small table (<10 MB default) copied to every executor
  • Large table has zero shuffle
  • Fastest strategy; best for dimension tables
  • Force it: broadcast(df) hint
  • Threshold: spark.sql.autoBroadcastJoinThreshold
Sort-Merge Join
  • Both sides shuffled by join key, then sorted
  • Default for large ↔ large joins
  • Predictable and memory-efficient
  • Pre-bucketing on join key eliminates the shuffle entirely

Shuffle Hash Join

Shuffles both sides on the join key, then builds a hash map on the smaller side. Faster than sort-merge when one side is clearly smaller but too big to broadcast. Needs more memory — can OOM on very large datasets.

Broadcast Variable vs Broadcast Join

A broadcast variable (sc.broadcast(value)) ships any read-only Python object — a dict, lookup map — to all executors once. A broadcast join is the DataFrame-level application of this: ship a small DataFrame so Spark never needs to shuffle the large one.
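The mechanics of a broadcast hash join can be sketched in plain Python: build a lookup table from the small side once, then probe it from each partition of the large side without moving any large-side rows. The data and the `broadcast_hash_join` helper are hypothetical, and the sketch assumes the small table has unique join keys.

```python
def broadcast_hash_join(large_partitions, small_rows, key):
    """Inner-join each large partition against a full copy of the small table."""
    lookup = {row[key]: row for row in small_rows}  # built once, "broadcast" to all
    joined = []
    for partition in large_partitions:              # each partition is joined in place
        part_out = []
        for row in partition:
            match = lookup.get(row[key])
            if match is not None:                   # inner-join semantics
                part_out.append({**row, **match})
        joined.append(part_out)
    return joined

orders = [
    [{"order_id": 1, "country": "DE"}, {"order_id": 2, "country": "FR"}],
    [{"order_id": 3, "country": "DE"}, {"order_id": 4, "country": "XX"}],
]
countries = [{"country": "DE", "name": "Germany"}, {"country": "FR", "name": "France"}]
result = broadcast_hash_join(orders, countries, key="country")
```

The output keeps the same partition layout as the large input, which is the sense in which the large table has zero shuffle.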

Data Skew & Salting
Analogy

Skew is like 4 checkout lanes at a supermarket — but one cashier has 200 people in line and the others have 5 each. The store closes when the last customer is served, so the busiest lane sets your total wait time. The other 3 cashiers are wasted.

What is Data Skew?

Skew occurs when some partition keys carry vastly more rows than others. During groupBy or join, one executor processes 80% of the data while others sit idle. Your job's duration = the slowest task's duration.

Skewed partitions — one task takes 100× longer
P0 · 7.8M rows
P1 · 80K
P2 · 60K
P3 · 70K

Executors 2–4 finish in seconds and sit idle. Job waits on P0.

Detecting Skew

df.groupBy("join_key").count().orderBy("count", ascending=False).show(10)

Also check Spark UI → Stages → Task metrics. If the 75th-percentile task duration is 2s but the max is 4 minutes, you have a hot key.

Salting — How to Fix It

Append a random integer ("salt") to the hot key to spread it across N sub-partitions, run the partial aggregation, then strip the salt and do the final rollup.

import pyspark.sql.functions as F

SALT = 10  # spread hot key across 10 sub-partitions

# 1: salt the large (skewed) DataFrame
df_salted = df.withColumn("salt", (F.rand() * SALT).cast("int")) \
              .withColumn("salted_key", F.concat(F.col("join_key"), F.lit("_"), F.col("salt")))

# 2: explode the small (lookup) DataFrame to match every salt value
lookup_exploded = lookup.withColumn("salt", F.explode(F.array([F.lit(i) for i in range(SALT)]))) \
                        .withColumn("salted_key", F.concat(F.col("join_key"), F.lit("_"), F.col("salt"))) \
                        .drop("join_key", "salt")  # avoid duplicate columns after the join

# 3: join on salted key, then aggregate as normal
result = df_salted.join(lookup_exploded, "salted_key").drop("salt", "salted_key")
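To see why salting helps, the plain-Python sketch below counts rows per partition before and after appending a random salt to each key. The dataset, the partition count of 8, and `partition_loads` are all assumptions for illustration; `zlib.crc32` stands in for the engine's hash function.

```python
import random
import zlib
from collections import Counter

def partition_loads(keys, num_partitions):
    """Count rows per partition when rows are routed by a hash of their key."""
    loads = Counter(zlib.crc32(k.encode()) % num_partitions for k in keys)
    return [loads.get(i, 0) for i in range(num_partitions)]

rng = random.Random(0)  # fixed seed so the run is repeatable
SALT = 10
keys = ["hot"] * 9_000 + [f"k{i}" for i in range(1_000)]  # one key dominates

plain = partition_loads(keys, num_partitions=8)

# Same idea as the salted_key column: append a random bucket number to every key.
salted_keys = [f"{k}_{rng.randrange(SALT)}" for k in keys]
salted = partition_loads(salted_keys, num_partitions=8)
```

Without salt, all 9,000 "hot" rows land in one partition; with salt they are split over up to 10 distinct keys, so the largest partition shrinks and the slowest task gets shorter.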

Other Solutions

  • Broadcast join — if the smaller side fits in memory, broadcast it; eliminates the shuffle entirely
  • AQE Skew Handling (Spark 3+) — Spark automatically splits skewed partitions at runtime (spark.sql.adaptive.skewJoin.enabled=true); handles it without code changes
  • Isolate hot keys — filter the skewed key out, process it separately, then union the results
Window Functions
Analogy

groupBy collapses your team of 10 into a single summary row. Window functions keep all 10 rows intact while giving each employee a new column showing how they rank within their own department — without collapsing anyone.

What are Window Functions?

Window functions compute values across a sliding window of rows related to the current row — without reducing the row count. Unlike groupBy + agg, every input row produces exactly one output row with new computed columns.

Defining a Window

from pyspark.sql.window import Window
from pyspark.sql.functions import col, rank, dense_rank, row_number, lag, lead, sum, avg

# Partition by dept, order by salary — highest first
w = Window.partitionBy("dept").orderBy(col("salary").desc())

df.withColumn("rank",         rank().over(w))       \
  .withColumn("dense_rank",   dense_rank().over(w)) \
  .withColumn("row_num",      row_number().over(w)) \
  .withColumn("prev_salary",  lag("salary", 1).over(w)) \
  .withColumn("running_sum",  sum("salary").over(
      w.rowsBetween(Window.unboundedPreceding, Window.currentRow)))

rank() vs dense_rank() vs row_number()

These three look similar but behave differently when rows tie. The table below shows the same dataset through each lens:

Name    Salary   rank()   dense_rank()   row_number()
Alice   90,000   1        1              1
Bob     90,000   1        1              2
Carol   75,000   3        2              3
Dave    60,000   4        3              4
rank(): gap after tie (skips 2) · dense_rank(): no gap ever · row_number(): always unique

Interview trap: "Get the top-N per group" questions — use row_number() for strictly unique results, rank() to include all tied entries at the cutoff position.
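The three numbering rules are easy to verify by hand. The plain-Python sketch below (with a hypothetical `rank_all` helper) applies them to the four salaries from the example and assumes the input is already sorted in descending order, as a window's orderBy would guarantee.

```python
def rank_all(salaries):
    """Return (rank, dense_rank, row_number) per row for values sorted descending."""
    out = []
    rank = dense = 0
    prev = object()  # sentinel that equals nothing
    for row_number, value in enumerate(salaries, start=1):
        if value != prev:
            rank = row_number  # jumps past any tied rows
            dense += 1         # always advances by exactly one
            prev = value
        out.append((rank, dense, row_number))
    return out

rows = rank_all([90_000, 90_000, 75_000, 60_000])  # Alice, Bob, Carol, Dave
# rows == [(1, 1, 1), (1, 1, 2), (3, 2, 3), (4, 3, 4)]
```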

Frame Specification

rowsBetween(start, end) counts physical rows (position-based). rangeBetween(start, end) counts by value distance. Use Window.unboundedPreceding / Window.unboundedFollowing to span the entire partition.
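The difference only shows up when the ordering column has ties. The plain-Python sketch below models a running sum over an ascending order with both frame types; the two helper functions are hypothetical stand-ins for a frame from unboundedPreceding to the current row.

```python
def running_sum_rows(values):
    """rowsBetween-style frame: sum by physical position up to the current row."""
    return [sum(values[: i + 1]) for i in range(len(values))]

def running_sum_range(values):
    """rangeBetween-style frame: rows with an equal ordering value are included too."""
    return [sum(v for v in values if v <= current) for current in values]

salaries = [60, 75, 90, 90]  # already ordered ascending, with a tie at 90
by_rows = running_sum_rows(salaries)    # [60, 135, 225, 315]
by_range = running_sum_range(salaries)  # [60, 135, 315, 315]
```

With the row-based frame the two tied rows get different totals; with the range-based frame they are peers and share the same total.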

Delta Lake Basics
Analogy

Delta Lake is like Git for your data files. Every write adds a commit to a transaction log. You can check out any past version, roll back a bad write, and multiple readers/writers work safely at the same time without corrupting each other.

What is Delta Lake?

Delta Lake is an open-source storage layer that adds ACID transactions to data lakes. It sits on top of Parquet files and adds a _delta_log/ JSON transaction log that records every change — inserts, updates, deletes, schema changes.

Delta Lake storage structure
Data Files
part-0001.parquet
part-0002.parquet
part-0003.parquet
_delta_log/
00000.json — v0 schema
00001.json — insert
00002.json — update
Checkpoints
00010.checkpoint.parquet
state snapshot
(every 10 commits)
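A rough way to picture the transaction log is as a list of commits that is replayed to find the current set of data files. The sketch below is a simplified plain-Python model: the three-commit log is hypothetical, and real log entries are JSON documents with many more fields than an action name and a path.

```python
def snapshot(log, version):
    """Replay commits 0..version and return the set of live data files."""
    live = set()
    for commit in log[: version + 1]:
        for action, path in commit:
            if action == "add":
                live.add(path)
            elif action == "remove":
                live.discard(path)
    return live

# Hypothetical three-commit log.
log = [
    [("add", "part-0001.parquet")],                                   # v0
    [("add", "part-0002.parquet")],                                   # v1: insert
    [("remove", "part-0001.parquet"), ("add", "part-0003.parquet")],  # v2: update rewrites a file
]
latest = snapshot(log, version=2)
as_of_v1 = snapshot(log, version=1)
```

Replaying only part of the log is the idea behind reading an older version: the old Parquet files are still on disk until they are cleaned up.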

ACID in Delta Lake

  • Atomicity: writes are all-or-nothing — a failed write is rolled back, no partial data lands
  • Consistency: schema enforcement blocks bad data at write time
  • Isolation: snapshot isolation — readers always see a consistent view even during concurrent writes
  • Durability: committed transactions persist in the log on S3 / ADLS / HDFS

Time Travel

# Read a past version
df = spark.read.format("delta").option("versionAsOf", 5).load("/data/table")

# Or by timestamp
df = spark.read.format("delta").option("timestampAsOf", "2025-01-01").load("/data/table")

# Rollback to version 3 (DeltaTable ships with the delta-spark package)
from delta.tables import DeltaTable
DeltaTable.forPath(spark, "/data/table").restoreToVersion(3)

Z-Ordering (Clustering)

Z-ordering co-locates related data into the same Parquet files. Queries with filters on a Z-ordered column skip entire files — reading far less data.

delta_table.optimize().executeZOrderBy("user_id", "event_date")

Run OPTIMIZE with Z-ORDER after large bulk loads. Run VACUUM to delete old Parquet files that are no longer referenced and reclaim storage (default retention: 7 days).

Structured Streaming
Analogy

Think of a Kafka topic as a conveyor belt that never stops. Structured Streaming reads the belt in short bursts (micro-batches), processes each burst exactly like a batch query, and remembers where it stopped so it can pick up where it left off if the job crashes.

The Unbounded Table Model

Structured Streaming treats a live stream as an unbounded table that grows as new events arrive. You write a normal batch query; Spark executes it incrementally in micro-batches under the hood.

Micro-batch execution model
SOURCE
Kafka / S3
MICRO-BATCH
trigger every N sec
QUERY
filter / agg / join
SINK
Delta / Console

Trigger Modes

  • Trigger.ProcessingTime("10 seconds"), in PySpark .trigger(processingTime="10 seconds"): micro-batch every 10 seconds; standard choice
  • Trigger.Once(), in PySpark .trigger(once=True): process all pending data then stop; batch + streaming hybrid pattern
  • Trigger.AvailableNow(), in PySpark .trigger(availableNow=True): like Once but splits into multiple micro-batches (Spark 3.3+)
  • Trigger.Continuous("1 second"), in PySpark .trigger(continuous="1 second"): millisecond latency, experimental; uses continuous processing engine

Watermarking — Handling Late Data

Streaming aggregations must bound the state they keep in memory. Watermarking tells Spark: "discard events that arrive more than X late", capping memory growth.

from pyspark.sql.functions import col, count, window

df.withWatermark("event_time", "10 minutes") \
  .groupBy(window(col("event_time"), "5 minutes"), col("user_id")) \
  .agg(count("*").alias("event_count"))
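The rule a watermark applies can be approximated in plain Python: track the largest event time seen so far and treat anything older than that minus the allowed delay as too late. This is a simplified per-event model with a hypothetical `apply_watermark` helper; Spark advances its watermark once per micro-batch and applies it to window state rather than to single events.

```python
def apply_watermark(event_times, delay):
    """Return (kept, dropped) for event times given in arrival order."""
    max_seen = float("-inf")
    kept, dropped = [], []
    for t in event_times:
        watermark = max_seen - delay  # threshold based on events seen so far
        if t < watermark:
            dropped.append(t)         # too late: its state may already be gone
        else:
            kept.append(t)
        max_seen = max(max_seen, t)
    return kept, dropped

# Event times in minutes, in arrival order; the delay mirrors "10 minutes".
kept, dropped = apply_watermark([100, 105, 97, 120, 108, 111], delay=10)
# kept == [100, 105, 97, 120, 111], dropped == [108]
```

The event at 97 arrives late but is still accepted, while 108 arrives after 120 has pushed the threshold to 110.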

Checkpointing for Fault Tolerance

Structured Streaming checkpoints progress (Kafka offsets, state store) to durable storage. A crash + restart picks up exactly where the job left off — guaranteeing exactly-once semantics end-to-end when combined with an idempotent sink.

Always set a checkpoint location in production: .option("checkpointLocation", "s3://bucket/checkpoints/job1"). Without it, a restart replays from the beginning and duplicates data at the sink.
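The interplay of a checkpointed offset and an idempotent sink can be sketched in plain Python. Everything here is hypothetical (the `run` function, the dict used as a sink, the simulated crash); it only illustrates why replaying one record after a restart does not create duplicates when writes are keyed.

```python
def run(source, sink, checkpoint, crash_after=None):
    """Process records from the checkpointed offset; the sink upserts by offset."""
    start = checkpoint.get("offset", 0)
    for offset in range(start, len(source)):
        sink[offset] = source[offset].upper()  # idempotent: rewriting is harmless
        if crash_after is not None and offset == crash_after:
            return False                       # crash before the offset is committed
        checkpoint["offset"] = offset + 1      # commit progress
    return True

source = ["a", "b", "c", "d"]
sink, checkpoint = {}, {}

finished_first = run(source, sink, checkpoint, crash_after=1)  # dies while handling offset 1
offset_after_crash = checkpoint["offset"]                      # 1: offset 1 was never committed
finished_second = run(source, sink, checkpoint)                # restart resumes from offset 1
```

Offset 1 is processed twice, yet the sink ends up with exactly one entry per record because the second write overwrites the first.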


PySpark Cheat Sheet

Most-used APIs, patterns, and interview tricks — all in one place.

Copy-Paste Ready: Every snippet here is interview-tested. Adapt, don't memorize.
Hot Combo: NULL handling + Window functions appear together in 60% of take-home tasks.
Watch Out: Always specify partitionBy in window specs. Interviewers check this.
NULL Handling
dropna(): Drop rows with any/all NULLs. subset=["col"] for specific cols
fillna(val): Replace NULLs with a value or dict of column→value
coalesce(c1,c2): Returns first non-null value across columns (SQL COALESCE)
isNull(): filter(col("x").isNull()) to find NULL rows
isNotNull(): filter(col("x").isNotNull()) to drop NULL rows
na.replace(): Replace specific values: df.na.replace(0, None)
Aggregations
count(): count("*") = all rows · count("col") = non-null
sum / avg: sum("salary"), avg("salary"), mean("salary")
max / min: max("col"), min("col"); both ignore NULLs
countDistinct: countDistinct("col") counts unique non-null values
collect_list: Group values into array (duplicates kept)
collect_set: Group unique values into array (like a set)
Window Functions
rank(): Tied rows get same rank; next rank skips (1,1,3)
dense_rank(): Tied rows get same rank; no gap (1,1,2)
row_number(): Always unique; ties broken arbitrarily (1,2,3)
lag(col, n): Previous row's value (n rows back)
lead(col, n): Next row's value (n rows ahead)
sum().over(w): Running total: rowsBetween(unboundedPreceding, 0)
Join Types
inner: Only matching rows from both sides
left / right: All rows from left/right + matched from other
full / outer: All rows from both sides, NULLs where no match
left_semi: Rows in left that EXIST in right (no right cols)
left_anti: Rows in left that DON'T EXIST in right
cross: Cartesian product, every row × every row (CAUTION: massive output)
Partitioning
repartition(n): Full shuffle → can increase or decrease. Use to balance skew.
coalesce(n): No full shuffle → can ONLY decrease. Use before write.
cache(): = persist(MEMORY_AND_DISK). Not replicated; lost partitions are recomputed from lineage.
persist(level): MEMORY_ONLY · DISK_ONLY · MEMORY_AND_DISK · OFF_HEAP
unpersist(): Call when done to free executor memory
getNumPartitions: df.rdd.getNumPartitions() checks current partition count
Read / Write
read.csv: header=True, inferSchema=True, sep=","
read.parquet: Columnar, splittable; use for prod. Reads schema auto.
read.json: multiLine=True for multi-line JSON docs
write modes: overwrite · append · ignore · error (default)
partitionBy: .write.partitionBy("year","month").parquet(...)
format("delta"): Delta Lake: ACID, time travel, schema enforcement
Window Function Template · Pattern
from pyspark.sql.functions import col, rank, dense_rank, row_number, lag, sum
from pyspark.sql.window import Window

# Define window: partition by dept, order by salary desc
w = Window.partitionBy("dept").orderBy(col("salary").desc())

# Parentheses let the chained calls span several lines
ranked = (
    df.withColumn("rnk",      rank().over(w))        # 1,1,3
      .withColumn("dense",    dense_rank().over(w))  # 1,1,2
      .withColumn("rn",       row_number().over(w))  # 1,2,3
      .withColumn("prev_sal", lag("salary", 1).over(w))
)

# Running total (cumulative sum)
w_cum = Window.partitionBy("dept")\
              .orderBy("salary")\
              .rowsBetween(Window.unboundedPreceding, 0)
df.withColumn("running_total", sum("salary").over(w_cum))
Deduplication Pattern · Pattern
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Keep latest record per user_id
w = Window.partitionBy("user_id")\
          .orderBy(col("updated_at").desc())

deduped = df\
  .withColumn("rn", row_number().over(w))\
  .filter(col("rn") == 1)\
  .drop("rn")

# Alternative: dropDuplicates (picks arbitrary row)
df.dropDuplicates(["user_id"])  # ← no control over WHICH row kept
Broadcast Join · Performance
from pyspark.sql.functions import broadcast

# Option 1: explicit broadcast hint (recommended)
result = large_df.join(broadcast(small_df), "id")

# Option 2: SQL hint
result = large_df.join(small_df.hint("broadcast"), "id")

# Option 3: set threshold (default 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50m")

# Verify in plan (look for BroadcastHashJoin)
result.explain()  # physical plan shows join strategy
Salting (Skew Fix) · Performance
from pyspark.sql.functions import col, concat, lit, rand, floor, array, explode

N = 10  # salt buckets

# Step 1: salt large table
big = big_df.withColumn("salt_key",
  concat(col("dept"), lit("_"), floor(rand()*N).cast("int")))

# Step 2: explode small table across all buckets
small = small_df\
  .withColumn("salt_arr", array([lit(i) for i in range(N)]))\
  .withColumn("s", explode("salt_arr"))\
  .withColumn("salt_key", concat(col("dept"), lit("_"), col("s")))

# Step 3: join on salt_key (drop helper and duplicate columns from the small side first)
big.join(small.drop("dept", "salt_arr", "s"), "salt_key").drop("salt_key")
Interview Tricks to Remember
Narrow vs Wide
Narrow = each input partition → one output partition. No shuffle. (filter, select, map)
Wide = multiple input partitions → one output partition. Causes shuffle. (groupBy, join, distinct)
rank vs dense_rank
Two people tie for 1st:
rank() → 1, 1, 3 (skips 2)
dense_rank() → 1, 1, 2 (no gap)
row_number() → 1, 2, 3 (always unique)
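To see where the gap comes from, here is a small plain-Python sketch (not Spark code) that computes all three numberings over an already-sorted list. The function name rank_all and the salary values are illustrative only.

```python
def rank_all(values_desc):
    """Return (rank, dense_rank, row_number) lists for values sorted descending."""
    ranks, dense, rows = [], [], []
    for i, v in enumerate(values_desc):
        if i > 0 and v == values_desc[i - 1]:
            ranks.append(ranks[-1])    # tie: reuse the previous rank
            dense.append(dense[-1])    # tie: reuse the previous dense rank
        else:
            ranks.append(i + 1)        # position-based, so gaps appear after ties
            dense.append(dense[-1] + 1 if dense else 1)  # next consecutive integer
        rows.append(i + 1)             # always unique
    return ranks, dense, rows

salaries = [100, 100, 90, 80]  # two people tie for 1st
ranks, dense, rows = rank_all(salaries)
print(ranks)  # [1, 1, 3, 4]
print(dense)  # [1, 1, 2, 3]
print(rows)   # [1, 2, 3, 4]
```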
repartition vs coalesce
repartition(n) = full shuffle, can go up or down. Use to fix skew or increase parallelism.
coalesce(n) = no shuffle, only goes DOWN. Use before writing to reduce file count.
cache vs persist
cache() = persist(MEMORY_AND_DISK) for DataFrames (RDD.cache() defaults to MEMORY_ONLY). Simple and good default.
persist(level) = you choose: MEMORY_ONLY (fastest, OOM risk), DISK_ONLY (slow, safe), OFF_HEAP (no GC).
Catalyst 4 Stages
Remember: A-L-P-C
Analysis → Logical Optimization → Physical Planning → Code Generation (Tungsten JVM bytecode)
Shuffle Partitions
Default is 200 — usually too high for small data, too low for big data.
Rule of thumb: 2-4 partitions per CPU core. Check with df.rdd.getNumPartitions()
AQE in 3 bullets
1. Dynamic coalescing — merges small shuffle partitions at runtime
2. Skew join handling — splits big skewed partitions
3. Join strategy switch: can convert SMJ → BHJ if the runtime size fits
When to use broadcast
Table < 10 MB (default autoBroadcastJoinThreshold) → Spark auto-broadcasts.
Rule: if one side ≪ other side (lookup tables, dimension tables) → always hint broadcast explicitly.
left_semi vs inner join
inner join = returns all columns from both tables.
left_semi = returns only left-side rows that match, with only left-side columns. Equivalent to EXISTS in SQL. No duplicates from right side.
OOM Checklist
1. Too few partitions (each one too large) → increase spark.sql.shuffle.partitions
2. Skew → salt or broadcast
3. Huge collect() → use write() instead
4. Python UDF → switch to built-in or pandas_udf
5. Cache explosion → call unpersist()
Transformations → Actions
Transformations build the DAG (lazy). Actions trigger execution.
Common actions: show(), collect(), count(), take(n), write.*(), toPandas(), foreach()
Each action re-runs the full lineage unless cached.
UDF vs pandas_udf
Regular UDF: row-by-row, Python pickle serialization → slow.
pandas_udf: vectorized via Apache Arrow, batch processing → 10-100× faster.
Use @pandas_udf(returnType) decorator.

New · PySpark World

Learn PySpark,
the human way

Not a dry tutorial. Each concept taught with story, visual, code, quiz — then a live practice problem. One chapter at a time.

✓ Ch.01: Lazy Evaluation 🔒 Ch.02: Partitions 🔒 Ch.03: DataFrames 🔒 Ch.04: Joins 🔒 Ch.05: Windows 🔒 Ch.06: Performance

Free · No signup required · More chapters releasing soon

Chapter 01 · Available Now

Lazy Evaluation & the DAG

Why Spark refuses to work until you make it — and why that makes it fast.

1. Story: Restaurant analogy, why lazy beats eager
2. Visual: Excalidraw DAG diagram with Catalyst optimizations
3. Code: Annotated code + real explain() output
4. Quiz: 5-question quiz with instant answer reveal
5. Practice: Live playground practice problem

Interview Questions

PySpark Question Bank

60 questions from Amazon, Databricks, TCS, Uber and more — Fresher to Senior level.

Real Interviews: Questions sourced from actual Amazon, Databricks & Uber interview rounds.
Strategy: Start with Easy → Medium. Hard questions build on these foundations.
Run Code: Click "Try the Code" and the editor opens in place, so you never lose your spot.
01
Fresher · Easy · TCS · Infosys
What is PySpark and why is it used?
Python API for Apache Spark — distributed big data processing that scales beyond a single machine's memory.

Answer

PySpark is the Python API for Apache Spark — a distributed computing engine that processes data across many machines simultaneously.

  • Scale: handles terabytes and petabytes — far beyond what pandas can load into a single machine's RAM
  • Speed: in-memory computation is up to 100× faster than Hadoop MapReduce for iterative workloads
  • Fault tolerance: if a machine fails mid-job, Spark automatically recomputes lost data using lineage (the DAG)
  • Unified engine: batch processing, streaming, ML (MLlib), SQL, and graph (GraphX) all in one framework
Quick Start
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MyApp").master("local[*]").getOrCreate()
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.filter(df.salary > 50000).groupBy("dept").count().show()
Interview Edge
When asked "why PySpark over pandas?" say: "pandas is limited to one machine's RAM — PySpark distributes across a cluster. For datasets above ~10 GB, PySpark is the right choice. For smaller datasets, pandas is actually faster because there is no serialization overhead." This shows you know the tradeoff, not just the buzzword.
02
Fresher · Easy · Amazon · TCS
Explain Apache Spark's architecture
Driver → Cluster Manager → Worker Nodes with Executors → Tasks. Execution: Application → Jobs → Stages → Tasks.

Answer

Driver: Master process — runs your code, creates SparkSession, builds the DAG, coordinates execution, collects results.
Cluster Manager: Manages cluster resources (YARN, Kubernetes, Standalone, Mesos).
Executors: JVM processes on worker nodes — run Tasks, store cached data.
Hierarchy: Application → Jobs (one per Action) → Stages (split by shuffles) → Tasks (one per partition)

03
Fresher · Easy · Infosys · Wipro
What is an RDD?
Resilient Distributed Dataset — immutable, fault-tolerant distributed collection. The foundational abstraction of Spark.

Answer

An RDD is an immutable, fault-tolerant distributed collection of objects. Fault tolerance comes from lineage: if a partition is lost, Spark recomputes it from the original source. RDDs carry no schema and bypass the Catalyst optimizer, so Spark cannot optimize them for you. Prefer DataFrames; use RDDs only for custom low-level operations or unstructured data (text, binary).

RDD Example
rdd = spark.sparkContext.parallelize([1,2,3,4,5], numSlices=3)
result = rdd.map(lambda x: x*2).filter(lambda x: x%2==0)
print(result.collect())  # [2, 4, 6, 8, 10]
04
Fresher · Easy · Amazon · Accenture
What is a DataFrame? How does it differ from an RDD?
Distributed table with named columns + schema + Catalyst optimization. Usually faster than the equivalent RDD code for structured data.

Answer

A DataFrame is a distributed collection with named, typed columns — like a relational table. Uses Catalyst + Tungsten optimizers → significantly faster than RDD. Has SQL compatibility. Immutable — transformations return new DataFrames. Use DataFrames by default for all structured/semi-structured data.

05
Fresher · Easy · TCS · Google
What is SparkSession? How do you create one?
Unified entry point for all Spark since 2.0. Replaces SparkContext + SQLContext. Use .getOrCreate() to avoid duplicates.
SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("MyApp") \
    .master("yarn") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()  # returns existing or creates new
print(spark.version)  # e.g. 3.5.0, depends on your installation
06
Fresher · Easy · Amazon · Databricks
What is lazy evaluation? What is the DAG?
Transformations build a plan (DAG) but don't execute. An Action triggers Catalyst to optimize and execute the full plan.
Start here — the restaurant analogy

When you order food at a restaurant, the kitchen does not start cooking the second you speak. The waiter writes your order down. Only when the waiter takes the slip to the kitchen — "serve it now" — does any cooking happen.

PySpark works exactly the same way. Calling filter(), select(), or groupBy() is writing down your order. Nothing touches the actual data. The DAG is the written order slip. An Action (show(), count(), collect()) is handing that slip to the kitchen — that is when PySpark reads data, runs calculations, and returns a result.

This design lets PySpark look at your entire chain of steps before executing any of them, allowing the Catalyst optimizer to rewrite the plan in ways a human would rarely think of.

Execution flow — Transformations build the DAG, Actions fire it
filter() [lazy] → select() [lazy] → groupBy() [lazy] (zero bytes read so far) → DAG: Catalyst optimizes and rewrites the plan → show() [ACTION] fires execution

Formal Definition

Lazy Evaluation — PySpark does not execute transformations when they are called. Each call (filter, select, groupBy…) adds a node to the DAG but touches zero data.

DAG (Directed Acyclic Graph) — the dependency graph of all pending transformations. "Directed" means data flows in one direction (no cycles). Spark's Catalyst optimizer reads this entire graph and rewrites it before anything runs — pushing filters earlier, dropping unused columns, merging stages.

  • Global optimization: Catalyst can reorder steps automatically — e.g., apply a filter before a join to reduce data volume, even if you wrote it the other way
  • Avoid wasted work: If you write 5 transformations then never call an action, PySpark does exactly zero work
  • Fault recovery: Lost partitions are rebuilt by replaying the DAG lineage from the original source — no data duplication needed
  • Actions that trigger execution: show(), count(), collect(), take(n), write.*(), toPandas()
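The record-now, run-later behaviour can be modelled in a few lines of plain Python. This is a toy sketch, not how Spark is implemented: the class name LazyFrame and the read_source helper are invented for illustration, and there is no optimizer here, only the deferral.

```python
class LazyFrame:
    """Toy model of lazy evaluation: transformations are recorded, actions run them."""

    def __init__(self, source, steps=()):
        self._source = source        # callable that "reads" the data
        self._steps = tuple(steps)   # recorded transformations (the plan)

    def filter(self, predicate):
        # Transformation: returns a new plan, touches no data.
        return LazyFrame(self._source, self._steps + (("filter", predicate),))

    def select(self, *cols):
        # Transformation: also only recorded.
        return LazyFrame(self._source, self._steps + (("select", cols),))

    def collect(self):
        # Action: only now is the source read and every step applied.
        rows = self._source()
        for kind, arg in self._steps:
            if kind == "filter":
                rows = [r for r in rows if arg(r)]
            else:
                rows = [{c: r[c] for c in arg} for r in rows]
        return rows

reads = []  # one entry per actual read of the source

def read_source():
    reads.append(1)
    return [{"name": "Alice", "age": 35}, {"name": "Bob", "age": 28}]

plan = LazyFrame(read_source).filter(lambda r: r["age"] > 30).select("name")
reads_before_action = len(reads)   # 0: nothing has executed yet
result = plan.collect()            # first action reads the source
plan.collect()                     # second action re-runs the full lineage
print(reads_before_action, result, len(reads))  # 0 [{'name': 'Alice'}] 2
```

The second collect() reading the source again mirrors why repeated actions on an uncached DataFrame recompute everything.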
Step-by-step example — 10 million employee rows

You have a CSV with 10 million employee records and 50 columns. You write these 3 lines:

  1. df.filter(df.age > 30) — PySpark records "keep rows where age > 30". Nothing is read from disk.
  2. .select("name","dept","salary") — records "keep only 3 columns". Still nothing runs.
  3. .groupBy("dept").count() — records "group by dept and count". Still nothing runs.

Catalyst then inspects the entire plan. It notices: "The count only needs age and dept, so I can skip the other 48 columns, and I can filter rows before grouping so that less data reaches the shuffle." Then you call .show(). Only now does Spark read data, using the Catalyst-optimized plan. On 10M rows this can cut execution time substantially, most of all with a columnar source such as Parquet.

Inspect the Execution Plan
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LazyDemo").getOrCreate()

# Create sample data — imagine this is a 10M row file
data = [("Alice",35,"Eng",90000),("Bob",28,"HR",60000),
        ("Carol",42,"Eng",110000),("Dave",25,"Eng",70000)]
df = spark.createDataFrame(data, ["name","age","dept","salary"])

# These 3 lines build the DAG — NOTHING executes yet
plan = df.filter(df.age > 30) \
         .select("name","dept","salary") \
         .groupBy("dept").count()

# See what Catalyst will actually run (optimized plan)
plan.explain()         # physical plan only
plan.explain(extended=True)  # logical + optimized + physical

# ← THIS is the Action that triggers execution
plan.show()            # data is read and processed here
Interview Edge
What separates a good answer from a great one: Most candidates say "transformations are lazy." The follow-up that impresses interviewers is explaining why this matters: "Catalyst sees the full plan before executing, so it can apply optimizations a developer would never code manually — like reading only 3 of 50 columns from a Parquet file, or pushing a filter before a shuffle. That's the real performance win."
07
Fresher · Easy · Infosys · Meta
Transformations vs Actions — 5 examples of each
Transformations: lazy → new DataFrame. Actions: trigger execution → result to driver or storage.

Answer

Transformations (lazy): filter, select, groupBy, join, withColumn, orderBy, distinct, union, repartition, map, flatMap

Actions (trigger execution): show(), collect(), count(), take(n), first(), write.*(), toPandas(), foreach(), reduce()

Narrow: filter, select, map — no shuffle, each partition maps to one output partition.
Wide: groupBy, join, distinct, orderBy — cause shuffle across executors, much more expensive.

08
Fresher · Easy · TCS · Wipro
How do you read and write data in PySpark?
spark.read.format().load() and df.write.format().mode().save(). Know the 4 write modes: overwrite, append, ignore, error.
Read & Write
# Read
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df = spark.read.parquet("s3://bucket/data/")
df = spark.read.format("delta").load("s3://bucket/delta/")
# Write (modes: overwrite, append, ignore, error)
df.write.mode("overwrite").parquet("output/")
df.write.mode("append").partitionBy("year","month").parquet("output/")
09
Fresher · Easy · TCS · Accenture
How do you handle NULL values in PySpark?
dropna(), fillna(), isNull()/isNotNull(), coalesce() — four different tools for different null scenarios.
NULL Handling
from pyspark.sql.functions import col, when, coalesce, lit, count
df.dropna()                               # drop rows with ANY null
df.dropna(subset=["name","salary"])     # drop if specific cols null
df.fillna({"salary":0, "dept":"Unknown"}) # fill per column
df.withColumn("pay", coalesce(col("salary"), col("base_pay"), lit(0)))
# Count nulls per column
df.select([count(when(col(c).isNull(),c)).alias(c) for c in df.columns]).show()
10
Fresher · Easy · Amazon · Google
What is a UDF (User-Defined Function)?
Custom Python function for DataFrame columns. Flexible but slow — bypasses Catalyst, JVM↔Python serialization per row.

Answer

A UDF applies a custom Python function to DataFrame columns. Downside: row-at-a-time processing + JVM↔Python serialization per row → 10–100× slower than built-in functions. Always prefer Spark's built-in pyspark.sql.functions. For performance, use pandas_udf (vectorized, Arrow-based).

UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def salary_band(s):
    if s is None:          # Python UDFs receive SQL NULL as None
        return None
    return "High" if s >= 100000 else "Mid" if s >= 50000 else "Low"
band_udf = udf(salary_band, StringType())
df.withColumn("band", band_udf("salary")).show()
11
Fresher · Easy · Databricks · Infosys
PySpark vs pandas — key differences
Distributed vs single-machine. Lazy vs eager. No row index in PySpark. Bridge: toPandas() and createDataFrame().

Answer

  • Execution: pandas eager; PySpark lazy (DAG)
  • Scale: pandas single machine RAM; PySpark petabyte-scale across cluster
  • Mutability: pandas mutable; PySpark DataFrames immutable
  • Row index: pandas has index; PySpark has no inherent row order
  • Bridge: df.toPandas() (small results only), spark.createDataFrame(pdf), Spark 3.2+ pyspark.pandas API
12
Fresher · Easy · Amazon · TCS
What is the difference between cache() and persist()?
cache() = persist(MEMORY_AND_DISK). persist() lets you choose storage level. Both avoid recomputation on repeated actions.
Cache vs Persist
from pyspark import StorageLevel
df_hot = df.join(other,"id").groupBy("dept").agg(...)
df_hot.cache()           # MEMORY_AND_DISK shorthand
df_hot.count()           # action triggers caching
df_hot.show()            # uses cache — fast!
df_hot.unpersist()       # release when done
df.persist(StorageLevel.MEMORY_ONLY)      # no disk spill; PySpark 3.x has no MEMORY_ONLY_SER
df.persist(StorageLevel.OFF_HEAP)         # avoid GC pressure
13
Fresher · Easy · Wipro · Accenture
What file formats does PySpark support? Which is preferred?
Parquet for analytics, Delta for data lakes with ACID, Avro for Kafka/streaming, CSV/JSON only for ingestion.

Answer

  • Parquet (recommended): columnar, compressed, schema-embedded, predicate pushdown, column pruning.
  • Delta Lake: Parquet + ACID + time travel + schema enforcement. Best for data lakes.
  • Avro: row-based, excellent for streaming/Kafka, schema evolution.
  • ORC: similar to Parquet, popular with Hive.
  • CSV/JSON: use only at ingestion — slow, no schema, no compression.
14
Fresher · Easy · TCS · Infosys
How do you create a DataFrame from a list, CSV, or pandas?
spark.createDataFrame(), spark.read.csv(), rdd.toDF(), spark.range() — multiple entry points.
DataFrame Creation
# From list
df = spark.createDataFrame([("Alice",30),("Bob",25)], ["name","age"])
# From pandas
import pandas as pd
df = spark.createDataFrame(pd.DataFrame({"name":["Alice"],"age":[30]}))
# From RDD
df = spark.sparkContext.parallelize([("Alice",30)]).toDF(["name","age"])
# Quick range (1M rows)
df = spark.range(1000000)
df.printSchema(); df.show(5)
15
Fresher · Easy · TCS · Infosys
Core DataFrame operations — select, filter, groupBy, withColumn
The building blocks of every PySpark pipeline. when() / otherwise() for conditional logic.
Core Operations
from pyspark.sql.functions import col, avg, count, when, desc
df.select("name", (col("salary")*1.1).alias("new_salary"))
df.filter((col("dept")=="Eng") & (col("age")>25))
df.withColumn("grade", when(col("score")>=90,"A").when(col("score")>=70,"B").otherwise("C"))
df.groupBy("dept").agg(count("*").alias("n"),avg("salary").alias("avg_sal")) \
  .orderBy(desc("avg_sal")).show()
df.dropDuplicates(["employee_id"])
df.withColumnRenamed("salary","annual_pay")
16
Intermediate · Medium · Amazon · Databricks
Explain the Catalyst Optimizer and its 4 stages
Analysis → Logical Optimization → Physical Planning → Code Generation. This is why DataFrames beat RDDs in performance.

Answer

Catalyst is Spark's query optimizer — it automatically rewrites and optimizes DataFrame queries before execution:

  1. Analysis: Resolves column names, table references, and data types against the catalog. Catches errors early.
  2. Logical Optimization: Rule-based rewrites — predicate pushdown (filter early), constant folding, null propagation, column pruning (drop unused columns).
  3. Physical Planning: Chooses the execution strategy — which join type (broadcast vs sort-merge), partition strategy. Generates multiple physical plans and picks the cheapest one using Cost-Based Optimization (CBO).
  4. Code Generation: Tungsten's whole-stage code generation compiles the physical plan to optimized JVM bytecode — eliminates virtual function calls, uses CPU registers efficiently.

This is why identical logic runs 5–20× faster as a DataFrame than as an RDD — RDDs bypass all four stages.
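As a feel for what a rule-based rewrite in the Logical Optimization stage looks like, here is a toy constant-folding pass in plain Python. It is only a sketch: the tuple-based expression format and the name fold_constants are invented here and have nothing to do with Catalyst's actual internal classes.

```python
def fold_constants(expr):
    """Rewrite ("add" | "mul", left, right) trees, folding literal-only subtrees."""
    if not isinstance(expr, tuple):
        return expr                      # a literal number or a column name
    op, left, right = expr
    left, right = fold_constants(left), fold_constants(right)
    if isinstance(left, (int, float)) and isinstance(right, (int, float)):
        return left + right if op == "add" else left * right
    return (op, left, right)

# salary * (12 * (1 + 1))  becomes  salary * 24
expr = ("mul", "salary", ("mul", 12, ("add", 1, 1)))
optimized = fold_constants(expr)
print(optimized)  # ('mul', 'salary', 24)
```

The arithmetic on literals is done once at planning time instead of once per row, which is the idea behind constant folding.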

17
Intermediate · Medium · Amazon · Uber
What is Broadcasting and when should you use it?
Send a small DataFrame to every executor — eliminates shuffle for the large table in a join. Threshold: 10MB default (autoBroadcastJoinThreshold).

Answer

Broadcasting sends a small, read-only table to every executor node so the large table never needs to shuffle across the network. Use when one side of a join is small enough to fit in executor memory (<10MB by default, configurable via spark.sql.autoBroadcastJoinThreshold). Typical speedup: 3–8× on fact-dimension joins.

Broadcast Join
from pyspark.sql.functions import broadcast
# large_df: 10GB fact table; dim_df: 5MB dimension table
result = large_df.join(broadcast(dim_df), "dept_id", "inner")
# dim_df sent to each executor once — no shuffle on large_df

# Tune threshold (default 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50m")
# Disable auto-broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
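Why the large table does not need to move can be sketched in plain Python. This is a simplified model of a broadcast hash join, not Spark's implementation: the function name broadcast_hash_join and the sample rows are illustrative, and each inner list stands in for one partition of the large table.

```python
def broadcast_hash_join(large_partitions, small_rows, key):
    """Inner-join each partition of the large side against a full copy of the small side."""
    lookup = {}
    for row in small_rows:                 # built once; this is what gets "broadcast"
        lookup.setdefault(row[key], []).append(row)
    joined_partitions = []
    for partition in large_partitions:     # each partition is joined where it already lives
        out = []
        for row in partition:
            for match in lookup.get(row[key], []):
                out.append({**row, **match})
        joined_partitions.append(out)
    return joined_partitions

dims = [{"dept_id": 1, "dept": "Eng"}, {"dept_id": 2, "dept": "HR"}]
facts = [
    [{"emp": "Alice", "dept_id": 1}, {"emp": "Bob", "dept_id": 2}],  # partition 0
    [{"emp": "Carol", "dept_id": 1}, {"emp": "Dan", "dept_id": 9}],  # partition 1
]
joined = broadcast_hash_join(facts, dims, "dept_id")
print([[r["emp"] + ":" + r["dept"] for r in p] for p in joined])
# [['Alice:Eng', 'Bob:HR'], ['Carol:Eng']]
```

Rows never change partition; only the small lookup table is copied, which is why the approach stops paying off once the small side no longer fits in memory.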
18
Intermediate · Medium · Amazon · Netflix
Window Functions in PySpark — concept, syntax, and examples
Calculations across a row window without collapsing rows. rank() vs dense_rank() vs row_number() — know the difference.

Answer

Window functions compute values across a set of rows related to the current row — without collapsing them into one row (unlike groupBy+agg). Each row keeps its identity while gaining computed columns. Use cases: ranking, running totals, moving averages, lag/lead comparisons.

rank() vs dense_rank() vs row_number():
rank(): gaps after ties: 1,1,3; dense_rank(): no gaps: 1,1,2; row_number(): always unique: 1,2,3

Window Functions
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, row_number, lag, sum, col

w = Window.partitionBy("dept").orderBy(col("salary").desc())
df.withColumn("rank",        rank().over(w)) \
  .withColumn("dense_rank",  dense_rank().over(w)) \
  .withColumn("row_num",     row_number().over(w)) \
  .withColumn("prev_salary", lag("salary",1).over(w)) \
  .withColumn("running_sum", sum("salary").over(
      w.rowsBetween(Window.unboundedPreceding, Window.currentRow))) \
  .show()
19
Intermediate · Medium · TCS · Amazon
repartition() vs coalesce() — deep comparison
repartition(): full shuffle, can increase or decrease. coalesce(): minimal shuffle, only decreases. Use coalesce() before write.

Answer

  • repartition(n): full shuffle — data crosses network; can increase OR decrease partition count; produces equal-sized partitions. Use to rebalance skewed data or increase parallelism.
  • coalesce(n): merges local partitions with minimal shuffle; can only DECREASE partition count; may produce uneven partitions. Use before writing to reduce output file count.
  • Default shuffle partitions: spark.sql.shuffle.partitions = 200. Tune to 2–4× executor cores for your cluster size.
Repartition vs Coalesce
df.repartition(200)              # full shuffle, 200 equal partitions
df.repartition(50, "dept")      # shuffle + partition by column
df.coalesce(10)                  # merge locally, minimal shuffle
# Before writing: reduce small files
df.coalesce(1).write.mode("overwrite").parquet("output/")
# Check current partition count
print(df.rdd.getNumPartitions())
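The "equal-sized vs uneven" difference can be modelled with plain Python lists, where each inner list is a partition. This is a simplified sketch under stated assumptions: real coalesce() groups partitions by locality rather than by index, and both function bodies are invented for illustration.

```python
def coalesce(partitions, n):
    """Merge whole partitions into n groups; individual rows are never redistributed."""
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i % n].extend(part)   # the whole partition moves together
    return merged

def repartition(partitions, n):
    """Redistribute individual rows round-robin across n partitions (a full shuffle)."""
    out = [[] for _ in range(n)]
    rows = [row for part in partitions for row in part]
    for i, row in enumerate(rows):
        out[i % n].append(row)
    return out

skewed = [list(range(90)), list(range(5)), list(range(3)), list(range(2))]
print([len(p) for p in coalesce(skewed, 2)])     # [93, 7]
print([len(p) for p in repartition(skewed, 2)])  # [50, 50]
```

Coalescing keeps the existing imbalance, which is fine for cutting file counts but does nothing for skew; repartitioning pays for a shuffle to even things out.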
20
Intermediate · Medium · Amazon · LinkedIn
What is Data Skew and how do you handle it?
Uneven data distribution causing hot partitions. Solutions: salting, broadcast joins, AQE skew handling, separate processing of hot keys.

Answer

Data skew: some partition keys have vastly more rows than others. During groupBy/join, one executor processes 80% of the data while others are idle. Job time = slowest task.
Detect: df.groupBy("key").count().orderBy("count",ascending=False).show(). Check Spark UI → Stages → Task metrics for outlier task durations.

Solutions:
1. Salting: append a random salt to keys, partial agg, remove salt, final agg
2. Broadcast join: if small side fits in memory, broadcast it
3. AQE (Spark 3.0+): spark.sql.adaptive.skewJoin.enabled=true auto-splits skewed partitions
4. Separate hot keys: filter hot keys out, process separately, union results
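A quick way to build intuition for a hot partition is to count rows per partition yourself. The sketch below is plain Python with made-up data; it assigns integer keys by key modulo partition count as a stand-in for Spark's hash partitioning, which is an assumption of the example.

```python
from collections import Counter

def partition_sizes(keys, num_partitions):
    """Count rows per partition when each key goes to (key mod num_partitions)."""
    sizes = Counter(k % num_partitions for k in keys)
    return [sizes.get(p, 0) for p in range(num_partitions)]

# Illustrative data: key 7 is a hot key holding 80 of 100 rows.
keys = [7] * 80 + list(range(20))
sizes = partition_sizes(keys, 4)
skew_ratio = max(sizes) / (sum(sizes) / len(sizes))  # largest partition vs the average
print(sizes)       # [5, 5, 5, 85]
print(skew_ratio)  # 3.4
```

A ratio well above 1 means one task will run much longer than the rest, which is the same signal the Spark UI gives through outlier task durations.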

21
Intermediate · Medium · Uber · LinkedIn
What is the Salting technique for skewed data?
Append a random integer salt to skewed keys → partial aggregation → remove salt → final aggregation. Spreads hot partitions evenly.
Salting for Skewed groupBy
import pyspark.sql.functions as F
SALT = 10
# Step 1: append a random salt (0–9) to the key
df_s = df.withColumn("salted_key",
    F.concat(F.col("join_key"), F.lit("_"), (F.rand()*SALT).cast("int")))
# Step 2: partial aggregation on salted key
partial = df_s.groupBy("salted_key").agg(F.sum("amount").alias("part_sum"))
# Step 3: strip salt, final aggregation (assumes join_key itself contains no "_")
result = partial \
    .withColumn("orig_key", F.split(F.col("salted_key"),"_")[0]) \
    .groupBy("orig_key").agg(F.sum("part_sum").alias("total"))
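The two-stage aggregation gives the same totals as a direct groupBy, which can be verified in plain Python. This is a minimal sketch with invented data; it keeps the salt as a separate tuple element instead of a string suffix, so no splitting is needed.

```python
import random
from collections import defaultdict

random.seed(0)  # reproducible salts for the example
SALT = 10
rows = [("hot", 1)] * 1000 + [("cold", 1)] * 10  # illustrative (key, amount) pairs

# Stage 1: partial sums per (key, salt); the hot key is spread over up to SALT groups.
partial = defaultdict(int)
for key, amount in rows:
    partial[(key, random.randrange(SALT))] += amount

# Stage 2: drop the salt and combine the partial sums.
total = defaultdict(int)
for (key, _salt), part_sum in partial.items():
    total[key] += part_sum

hot_groups = sum(1 for key, _ in partial if key == "hot")  # how many groups share the hot key
print(dict(total))  # {'hot': 1000, 'cold': 10}
```

In stage 1 no single group holds all 1000 hot rows, so the expensive work is shared; stage 2 only has to combine at most SALT small partial results per key.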
22
Intermediate · Medium · Infosys · Amazon
Broadcast Variables vs Accumulators
Broadcast: read-only shared lookup sent to all executors once. Accumulator: write-only counter/sum aggregated back to driver.

Answer

Broadcast Variable: read-only data structure (dict, list) sent to every executor once and cached in memory. Workers can read it efficiently. Use for lookup tables, config maps, ML model weights.

Accumulator: workers can only add to it (write-only); the driver reads the final accumulated value. Use for counters and sums across tasks. Note: updates made inside transformations may be applied more than once when a task is retried or a stage is re-executed. Spark guarantees exactly-once updates only inside actions such as foreach() or foreachPartition().

Broadcast & Accumulator
# Broadcast variable
lookup = {"NYC":"New York", "LA":"Los Angeles"}
bc = spark.sparkContext.broadcast(lookup)
df.rdd.map(lambda r: bc.value.get(r.city, "Unknown")).collect()

# Accumulator
null_counter = spark.sparkContext.accumulator(0)
def count_nulls(row):
    if row.salary is None: null_counter.add(1)
df.rdd.foreach(count_nulls)
print(f"Null salaries: {null_counter.value}")
23
Intermediate · Medium · Amazon · Flipkart
All join types in PySpark — with use cases
inner, left, right, full, left_semi, left_anti, cross — know what each returns and when to use it.

Answer

  • inner: only rows matching in both — standard join
  • left/right: all from one side, nulls for non-matching other side
  • full/outer: all rows from both sides, nulls where no match
  • left_semi: rows from left WHERE match exists in right (like WHERE EXISTS) — does NOT include right columns
  • left_anti: rows from left WHERE NO match in right (like WHERE NOT EXISTS) — great for finding missing records
  • cross: Cartesian product, every left row × every right row — use with extreme care
Join Types
df_a.join(df_b, "id", "inner")
df_a.join(df_b, "id", "left")
df_a.join(df_b, "id", "full")
df_a.join(df_b, "id", "left_semi")   # a's rows where id exists in b
df_a.join(df_b, "id", "left_anti")   # a's rows where id NOT in b
# Multi-column join
df_a.join(df_b, ["dept","year"], "inner")
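The row-count difference between inner, left_semi and left_anti is easy to miss, so here is a plain-Python model of the three on tiny made-up tables (no Spark involved; the list comprehensions only mimic the join semantics).

```python
left = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}, {"id": 3, "name": "Carol"}]
right = [{"id": 1, "order": "A"}, {"id": 1, "order": "B"}, {"id": 2, "order": "C"}]

right_ids = {b["id"] for b in right}

inner = [{**a, **b} for a in left for b in right if a["id"] == b["id"]]
left_semi = [a for a in left if a["id"] in right_ids]      # left columns only, no duplication
left_anti = [a for a in left if a["id"] not in right_ids]  # left rows with no match

print(len(inner))                      # 3 (Alice appears twice, once per order)
print([a["name"] for a in left_semi])  # ['Alice', 'Bob']
print([a["name"] for a in left_anti])  # ['Carol']
```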
24
Intermediate · Medium · TCS · Google
Schema inference vs explicit schema — when to use each?
Schema inference reads data to determine types (slow, error-prone). Explicit StructType is faster, safer, and recommended for production.

Answer

inferSchema=True: Spark makes an extra pass over the data to guess column types (for CSV it reads the whole input unless you lower samplingRatio). Convenient for exploration, but slow on large files, and it may guess wrong types (e.g., "001" read as an integer instead of a string).

Explicit schema: Define with StructType/StructField. Faster (no inference pass), predictable, catches schema mismatches early. Use in production always.

Explicit Schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
schema = StructType([
    StructField("emp_id",   StringType(),    nullable=False),
    StructField("name",     StringType(),    nullable=True),
    StructField("salary",   DoubleType(),    nullable=True),
    StructField("dept",     StringType(),    nullable=True),
    StructField("hire_date",TimestampType(), nullable=True),
])
df = spark.read.schema(schema).csv("employees.csv", header=True)
df.printSchema()
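The "001 read as integer" pitfall can be reproduced in plain Python. The inference rule below is a deliberately naive stand-in written for this example, not Spark's actual inference logic, and the ZIP codes are made up.

```python
def infer_type(values):
    """Naive inference: call a column integer if every value parses as an int."""
    try:
        for v in values:
            int(v)
        return int
    except ValueError:
        return str

zip_codes = ["00123", "02134", "10001"]     # identifiers, not numbers
inferred = infer_type(zip_codes)
as_inferred = [inferred(v) for v in zip_codes]
as_declared = [str(v) for v in zip_codes]   # explicit schema: keep as string

print(as_inferred)  # [123, 2134, 10001]
print(as_declared)  # ['00123', '02134', '10001']
```

Once the leading zeros are gone they cannot be recovered downstream, which is why identifier-like columns are usually declared as strings in an explicit schema.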
25
Intermediate · Medium · Amazon · Databricks
How does fault tolerance work in PySpark?
RDD lineage + recomputation. Checkpointing truncates lineage. DataFrames also have lineage stored in the physical plan.

Answer

RDD Lineage: Spark records every transformation that produced each RDD. If an executor dies and a partition is lost, Spark uses the lineage to recompute only that partition from the last stable data — no need to restart the entire job.

Checkpointing: Saves an RDD/DataFrame to HDFS/S3 and truncates the lineage. Required for very long lineages (iterative ML algorithms) or Structured Streaming (to recover stream state). Unlike cache, checkpointing survives executor failure permanently.

Replication: Each Spark block can be replicated across executors (default: no replication for performance). Delta Lake adds further durability with ACID transaction log.
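Lineage-based recovery can be sketched in plain Python: keep the source partitions and the ordered list of transformations, and replay them only for the partition that went missing. Everything here (the lists, the two lambdas, the compute helper) is illustrative and far simpler than Spark's scheduler.

```python
source = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]      # three input partitions
lineage = [lambda x: x * 2, lambda x: x + 1]     # recorded transformations, in order

def compute(partition):
    """Replay the full lineage over one source partition."""
    for fn in lineage:
        partition = [fn(x) for x in partition]
    return partition

computed = [compute(p) for p in source]

computed[1] = None                               # simulate losing partition 1
recomputed = [i for i, p in enumerate(computed) if p is None]
for i in recomputed:
    computed[i] = compute(source[i])             # rebuild only what was lost

print(recomputed)    # [1]
print(computed[1])   # [9, 11, 13]
```

The longer the lineage list, the more work each recovery replays, which is the motivation for checkpointing in iterative jobs.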

26
Intermediate · Medium · Google · Databricks
What is Predicate Pushdown and Column Pruning?
Filter pushed to storage layer to reduce data read. Column pruning reads only needed columns. Both are automatic in Catalyst for Parquet/Delta.

Answer

Predicate Pushdown: Catalyst pushes filter conditions down to the data source (Parquet, Delta, JDBC). The storage layer only returns rows that pass the filter — never reading filtered rows at all. For Parquet: uses min/max statistics to skip entire row groups. For Delta: also uses partition pruning and Z-order data skipping.

Column Pruning: Only reads columns referenced in the query. If your table has 100 columns but you select("name","salary"), Catalyst tells the reader to skip the other 98. For columnar formats (Parquet, ORC), this is a massive I/O reduction; row-based files such as CSV must still be scanned line by line, so the saving there is parsing work only.

Both happen automatically — no code change needed. Avoid df.rdd and Python UDFs which break these optimizations.
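Row-group skipping with min/max statistics can be modelled in a few lines of plain Python. This is a sketch of the idea only: the dictionaries stand in for Parquet row groups and their footer statistics, and the function name is invented for the example.

```python
row_groups = [
    {"min": 20, "max": 29, "ages": [21, 25, 29]},
    {"min": 30, "max": 45, "ages": [30, 38, 45]},
    {"min": 18, "max": 35, "ages": [18, 27, 35]},
]

def scan_age_greater_than(row_groups, threshold):
    """Return matching values and how many row groups were actually read."""
    matches, groups_read = [], 0
    for group in row_groups:
        if group["max"] <= threshold:
            continue                      # stats prove no row can match: skip the read
        groups_read += 1
        matches.extend(a for a in group["ages"] if a > threshold)
    return matches, groups_read

matches, groups_read = scan_age_greater_than(row_groups, 30)
print(matches)      # [38, 45, 35]
print(groups_read)  # 2
```

The first group is never opened because its maximum already rules out a match; the filter still has to be applied inside the groups that are read.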

27
Intermediate · Medium · Databricks · Amazon
What are pandas_udf (vectorized UDFs)? Why are they faster?
Apache Arrow-based batched UDFs — process column chunks at once instead of row-at-a-time. 10–100× faster than regular Python UDFs.

Answer

Regular Python UDFs: JVM serializes each row → Python processes one row → deserializes result back → repeat for every row. Massive overhead.

pandas_udf: Uses Apache Arrow to transfer entire column chunks between JVM and Python in one step (zero-copy). Python processes a pandas Series/DataFrame at once using vectorized operations. Result: 10–100× faster, uses Python's full numerical ecosystem.

pandas_udf
from pyspark.sql.functions import pandas_udf
import pandas as pd
from pyspark.sql.types import DoubleType

# Scalar pandas_udf — operates on pandas Series
@pandas_udf(DoubleType())
def apply_tax(salary: pd.Series) -> pd.Series:
    return salary * 0.7  # vectorized — no loop!

df.withColumn("take_home", apply_tax("salary")).show()

# Grouped map: group → pandas DataFrame → pandas DataFrame
# applyInPandas takes a plain Python function, not a pandas_udf (assumes salary is DoubleType)
def normalize_dept(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf["salary"] = (pdf["salary"] - pdf["salary"].mean()) / pdf["salary"].std()
    return pdf
df.groupBy("dept").applyInPandas(normalize_dept, schema=df.schema).show()
28
Intermediate · Medium · Amazon · TCS
PySpark job optimization — 10-point checklist
Broadcast joins, DataFrame over RDD, caching, partition tuning, avoid Python UDFs, filter early, explain() to inspect plans.

Answer

  1. Broadcast small tables — eliminate shuffle for small-large joins
  2. Use DataFrames over RDDs — get Catalyst + Tungsten for free
  3. Filter early — reduce data volume before expensive joins/aggregations
  4. Cache reused DataFrames — avoid recomputing expensive intermediate results
  5. Tune shuffle partitions — spark.sql.shuffle.partitions should be 2–4× executor cores
  6. Use Parquet/Delta — enables predicate pushdown and column pruning
  7. Avoid Python UDFs — use built-in functions or pandas_udf instead
  8. Partition on write — partitionBy("year","month") for better read performance
  9. Use explain() — inspect the physical plan to find missing filters or expensive joins
  10. Enable AQE — spark.sql.adaptive.enabled=true for automatic runtime optimization
29
Intermediate · Medium · Amazon · Netflix
Complex data types — ArrayType, MapType, StructType with explode()
Nested data is common in JSON ingestion. explode() flattens arrays to rows. Dot notation for nested structs.
Complex Types
from pyspark.sql.functions import explode, col, map_keys, map_values

# Schema with nested types
json_data = """{"user":"Alice","tags":["sql","spark"],"scores":{"math":95,"eng":88}}"""
df = spark.read.json(spark.sparkContext.parallelize([json_data]))
df.printSchema()
# root: user: string, tags: array[string], scores: struct{eng,math}

# Explode array → one row per element
df.select("user", explode("tags").alias("tag")).show()
# +-----+-----+
# | user|  tag|
# +-----+-----+
# |Alice|  sql|
# |Alice|spark|
# +-----+-----+

# Access nested struct fields
df.select("user", col("scores.math"), col("scores.eng")).show()
30
Intermediate · Medium · Amazon · Walmart
Pivot: convert rows to columns
df.groupBy().pivot().agg() — turns category values into column headers. Handle nulls with fillna after pivot.
Pivot Example
from pyspark.sql.functions import sum

# Source: user | product | amount
# Goal: user | Electronics | Clothing | Food
# Listing the pivot values up front avoids a full scan to find the distinct values
pivoted = df.groupBy("user") \
    .pivot("product", ["Electronics","Clothing","Food"]) \
    .agg(sum("amount")) \
    .fillna(0)
pivoted.show()
31
Experienced · Hard · Databricks · Google
What is Adaptive Query Execution (AQE)?
Runtime re-optimization in Spark 3.0+. Adjusts join strategies, coalesces partitions, handles skew — based on actual runtime statistics, not estimates.

Answer

AQE (Spark 3.0+, default ON in 3.2+) re-optimizes query plans at runtime using actual data statistics collected during execution — not static estimates.

Three key features:

  1. Dynamic Coalescing: automatically coalesces small shuffle partitions after each shuffle stage, reducing the 200 default partitions to the actual optimal number
  2. Dynamic Skew Handling: detects skewed partitions at runtime and automatically splits them into smaller sub-tasks (spark.sql.adaptive.skewJoin.enabled=true)
  3. Dynamic Join Strategy Switching: can switch from sort-merge to broadcast join mid-execution if one side turns out to be small enough after filtering
Enable AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# AQE is ON by default in Spark 3.2+ and in Databricks Runtime 7.3 LTS+
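The dynamic coalescing idea can be sketched in plain Python. This is a simplified model, not Spark's implementation: `coalesce_shuffle_partitions` is a hypothetical helper, the partition sizes are made up, and the 64 MB target mirrors the default of spark.sql.adaptive.advisoryPartitionSizeInBytes.

```python
def coalesce_shuffle_partitions(sizes_mb, target_mb=64):
    """Greedily merge adjacent shuffle partitions until each group nears target_mb."""
    groups, current, current_size = [], [], 0
    for idx, size in enumerate(sizes_mb):
        # Close the current group if adding this partition would overshoot the target.
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(idx)
        current_size += size
    if current:
        groups.append(current)
    return groups

# Eight post-shuffle partitions, sizes in MB (made-up numbers)
sizes = [5, 10, 40, 30, 70, 2, 3, 4]
groups = coalesce_shuffle_partitions(sizes)
print(groups)  # [[0, 1, 2], [3], [4], [5, 6, 7]]
```

Eight shuffle partitions become four tasks. Only adjacent partitions are merged, and the oversized 70 MB partition is left alone here; splitting it would be the job of skew handling.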
32
Experienced · Hard · Databricks · Amazon
Delta Lake ACID properties — how is each implemented?
Transaction log (_delta_log) is the foundation. Snapshot isolation for readers. Optimistic concurrency control for writers.

Answer

  • Atomicity: every write operation is recorded in the transaction log (_delta_log) as a single JSON commit. If a write fails mid-way, the partial data is ignored — the commit entry never appears in the log.
  • Consistency: schema enforcement rejects writes whose columns or types do not match the table schema. Schema evolution is opt-in (for example via the mergeSchema write option). Delta validates data types on every write.
  • Isolation: Snapshot Isolation — readers always see a consistent snapshot of the table as of when their query started, even during concurrent writes. Uses optimistic concurrency control for write-write conflicts.
  • Durability: commits are persisted in durable storage (S3, ADLS, HDFS) before acknowledgement. The log is the source of truth: data files are immutable Parquet files, and every change is recorded by appending a new commit that adds or removes file references.
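A toy model of the commit protocol, assuming a local directory stands in for _delta_log. The helper names `try_commit` and `commit_with_retry` are hypothetical, and real Delta Lake also checks whether the winning commit actually conflicts before retrying; this sketch only shows the "one writer wins each version" idea behind atomicity and optimistic concurrency.

```python
import json
import os
import tempfile

def try_commit(log_dir, version, actions):
    """Try to create commit file <version>.json; return False if it already exists."""
    path = os.path.join(log_dir, f"{version:020d}.json")
    try:
        # O_EXCL makes creation exclusive: exactly one writer can create this file.
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    with os.fdopen(fd, "w") as f:
        json.dump(actions, f)
    return True

def commit_with_retry(log_dir, actions, max_attempts=5):
    """Optimistic concurrency: pick the next version, retry if another writer took it."""
    for _ in range(max_attempts):
        version = len([n for n in os.listdir(log_dir) if n.endswith(".json")])
        if try_commit(log_dir, version, actions):
            return version
    raise RuntimeError("too many concurrent commits")

log_dir = tempfile.mkdtemp()
v0 = commit_with_retry(log_dir, [{"add": "part-0.parquet"}])
v1 = commit_with_retry(log_dir, [{"add": "part-1.parquet"}])
# A second writer that also believed version 1 was free loses the race.
lost_race = try_commit(log_dir, 1, [{"add": "part-2.parquet"}])
print(v0, v1, lost_race)  # 0 1 False
```

Data files written by the losing writer are simply never referenced by the log, which is why readers cannot see partial writes.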
33
Experienced · Hard · Databricks · Netflix
Time travel in Delta Lake — mechanism and use cases
VERSION AS OF / TIMESTAMP AS OF. Stored in _delta_log. Use for auditing, rollback, and ML reproducibility.

Answer

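Delta Lake's transaction log records every change with a version number and timestamp. Time travel lets you query the table as it existed at an earlier version by replaying the log up to that version. It only reaches back as far as the retained log entries and data files: once VACUUM deletes the files a version refers to, that version can no longer be read.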

Use cases: audit trails (who changed what, when), rollback after bad writes, reproduce ML experiments with same training data snapshot, compare data before/after pipeline run.

Time Travel
# By version number
df = spark.read.format("delta").option("versionAsOf", 5).load("/data/table")
# By timestamp
df = spark.read.format("delta").option("timestampAsOf", "2025-01-01").load("/data/table")
# SQL syntax
spark.sql("SELECT * FROM my_table VERSION AS OF 5")
spark.sql("SELECT * FROM my_table TIMESTAMP AS OF '2025-01-01'")
# Rollback (restore previous version)
from delta.tables import DeltaTable
DeltaTable.forPath(spark, "/data/table").restoreToVersion(5)
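How a past version is reconstructed can be modeled without Spark. In this sketch each commit is a list of (action, file) pairs, which is a simplification of the real JSON log entries; `snapshot_at` is a hypothetical helper, not a Delta API.

```python
def snapshot_at(log, version):
    """Replay add/remove actions from commit 0..version to get the live file set."""
    files = set()
    for commit in log[: version + 1]:
        for action, path in commit:
            if action == "add":
                files.add(path)
            elif action == "remove":
                files.discard(path)
    return files

log = [
    [("add", "a.parquet")],                            # version 0: initial write
    [("add", "b.parquet")],                            # version 1: append
    [("remove", "a.parquet"), ("add", "a2.parquet")],  # version 2: rewrite of a
]
print(sorted(snapshot_at(log, 1)))  # ['a.parquet', 'b.parquet']
print(sorted(snapshot_at(log, 2)))  # ['a2.parquet', 'b.parquet']
```

Version 1 stays readable only while a.parquet physically exists, which is why VACUUM limits how far back time travel can go.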
34
Experienced · Hard · Uber · LinkedIn
Structured Streaming architecture — unbounded table model
Treats a stream as an unbounded, append-only table. Write batch-like queries; Spark handles incremental execution. Trigger modes control latency.

Answer

Structured Streaming models a live data stream as an unbounded table — new data arrives as new rows. You write a static batch query; Spark handles incremental execution automatically in micro-batches.

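Trigger modes (PySpark keyword arguments to .trigger()): processingTime="10 seconds" starts a micro-batch every 10s; once=True processes all available data in a single batch, then stops; availableNow=True does the same but may split the work across several batches (Spark 3.3+); continuous="1 second" enables the experimental continuous mode with millisecond-level latency, where the interval is the checkpoint interval.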

Output modes: Append (only new rows), Complete (all rows rewritten), Update (only changed rows)

Streaming Example
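from pyspark.sql.functions import window, col, count, from_json
# Read from Kafka (key and value arrive as binary columns)
raw_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events").load()
# Parse the payload; assumes JSON messages with user_id and event_time fields
stream_df = raw_df.select(
    from_json(col("value").cast("string"), "user_id STRING, event_time TIMESTAMP").alias("e")
).select("e.*")
# Aggregate: events per 5-minute window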
query = stream_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(window(col("event_time"),"5 minutes"), "user_id") \
    .agg(count("*").alias("events")) \
    .writeStream.outputMode("append") \
    .format("delta").option("checkpointLocation","/checkpoints/q") \
    .trigger(processingTime="10 seconds").start()
query.awaitTermination()
35
Experienced · Hard · Uber · Amazon
Watermarking in Structured Streaming — when and why?
Threshold for late data tolerance. Bounds state size. Without watermarking, Spark must keep state forever — memory grows unbounded.

Answer

Watermarking defines how long Spark waits for late-arriving data: events older than MAX(event_time) - watermark_threshold are considered too late and may be dropped.

Without watermarking, stateful operations (window aggregations, stream-stream joins) must keep state for all time windows indefinitely → unbounded memory growth → OOM crash.

Trade-off: larger watermark = more state kept = more late data accepted = higher memory use. Smaller watermark = less state = lower latency = some late data dropped.

Requirement: in append output mode, an aggregation only works if withWatermark() is called on the event-time column before the aggregation (Spark rejects the query otherwise).
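The eviction rule can be simulated in a few lines of plain Python. This is a simplified model: it updates the watermark after every event, whereas Spark updates it between micro-batches, and `run_windowed_count` plus the event times (in minutes) are made up for illustration.

```python
from collections import defaultdict

def run_windowed_count(events, window=5, delay=10):
    """events: event times in arrival order. Returns (finalized, open_state, dropped)."""
    state = defaultdict(int)           # window_start -> running count
    finalized, dropped = {}, []
    max_event_time = 0
    for t in events:
        watermark = max_event_time - delay
        start = (t // window) * window
        if start + window <= watermark:
            dropped.append(t)          # its window was already closed: too late
        else:
            state[start] += 1
        max_event_time = max(max_event_time, t)
        watermark = max_event_time - delay
        # Evict every window that ends at or before the watermark.
        for s in [s for s in state if s + window <= watermark]:
            finalized[s] = state.pop(s)
    return finalized, dict(state), dropped

finalized, open_state, dropped = run_windowed_count([1, 3, 7, 12, 2, 21, 4])
print(finalized)   # {0: 3, 5: 1}
print(open_state)  # {10: 1, 20: 1}
print(dropped)     # [4]
```

The event at minute 2 arrives late but inside the 10-minute allowance, so it is counted. The event at minute 4 arrives after the watermark has passed its window, so it is dropped. State never holds more than the windows still above the watermark.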

36
Experienced · Hard · Databricks · Google
Dynamic Partition Pruning (DPP)
Eliminates partitions not needed based on join condition filters, a massive speedup for star schema queries. Available in Spark 3.0+ and enabled by default; AQE is not required.

Answer

DPP (Spark 3.0+) eliminates partitions from the probe side of a join based on the result of filtering the build side. Example: if you join a 10TB fact table with a dimension table and filter WHERE dim.category = 'Electronics', DPP uses that filter to skip non-Electronics partitions in the fact table before reading them — potentially scanning <1% of the data instead of 100%.

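Requires: the large (fact) side is partitioned on the join key; the query has a selective filter on the other side; and spark.sql.optimizer.dynamicPartitionPruning.enabled is true (the default). AQE is not required.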
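A plain-Python sketch of the pruning step, assuming the fact table is stored as one partition per category_id. The function name and the sample data are hypothetical; Spark performs the equivalent work inside the physical plan.

```python
def dynamic_partition_pruning(fact_partitions, dim_rows, dim_predicate, key):
    """Return (matching_keys, partitions_to_scan) for a partitioned fact table."""
    # Step 1: filter the small dimension side and collect its join keys.
    keys = {row[key] for row in dim_rows if dim_predicate(row)}
    # Step 2: use those keys as a runtime filter on the fact partitions.
    to_scan = {k: rows for k, rows in fact_partitions.items() if k in keys}
    return keys, to_scan

dim = [
    {"category_id": 1, "category": "Electronics"},
    {"category_id": 2, "category": "Clothing"},
    {"category_id": 3, "category": "Food"},
]
# Fact table partitioned by category_id: partition value -> rows (amounts)
fact = {1: [100, 250], 2: [40], 3: [5, 7, 9]}

keys, scanned = dynamic_partition_pruning(
    fact, dim, lambda r: r["category"] == "Electronics", "category_id")
print(keys, scanned)  # {1} {1: [100, 250]}
```

Only one of the three fact partitions is read. The filter was written against the dimension table, yet it prunes the fact table because the surviving join keys are known before the fact scan starts.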

37
Experienced · Hard · Amazon · Netflix
Broadcast vs Sort-Merge vs Shuffle Hash Join — when does Spark choose each?
Broadcast: small table (<10MB) — fastest, no shuffle. Sort-Merge: large-large — stable, scalable. Shuffle Hash: medium tables — needs memory.

Answer

  • Broadcast Hash Join: one side ≤ autoBroadcastJoinThreshold (10MB). Small table broadcast to all executors. No shuffle on large side. Fastest. Force with broadcast() hint.
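  • Shuffle Hash Join: both sides shuffled by join key, then the smaller side builds an in-memory hash map per partition. Avoids the sort of a sort-merge join but requires each hash table to fit in memory. Because spark.sql.join.preferSortMergeJoin defaults to true, Spark picks it mainly when that setting is disabled and one side is much smaller, or when you use the shuffle_hash hint.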
  • Sort-Merge Join: both sides shuffled and sorted by join key, then merged. Memory-efficient (streams rather than builds hash). Default for large-large joins. Can pre-sort/bucket to eliminate shuffle.
  • Broadcast Nested Loop Join: fallback for non-equi joins (e.g., range conditions). Very slow — O(n×m).

Force a strategy: df.hint("broadcast"), df.hint("merge"), df.hint("shuffle_hash")
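The two core algorithms can be compared on small Python lists. This is a single-machine sketch with made-up data: it leaves out the shuffle and broadcast steps and only shows the per-partition join logic.

```python
def hash_join(small, large):
    """Build a hash map on the small side, then probe it while streaming the large side."""
    table = {}
    for key, value in small:
        table.setdefault(key, []).append(value)
    return [(key, lv, sv) for key, lv in large for sv in table.get(key, [])]

def sort_merge_join(left, right):
    """Sort both sides by key, then advance two cursors in lockstep."""
    left, right = sorted(left), sorted(right)
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Emit the cross product of the two runs that share this key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            while i < len(left) and left[i][0] == lk:
                out.extend((lk, left[i][1], r[1]) for r in right[j:j_end])
                i += 1
            j = j_end
    return out

depts = [(10, "Eng"), (20, "Ops")]
emps = [(20, "Bo"), (10, "Al"), (30, "Cy"), (10, "Di")]
a = hash_join(depts, emps)
b = sort_merge_join(emps, depts)
print(sorted(a) == sorted(b))  # True
```

Both produce the same rows. The hash join needs the whole small side in memory, while the sort-merge join only needs sorted input and two cursors, which is why it scales to large-large joins.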

38
Experienced · Hard · Databricks · Amazon
Python UDF vs pandas_udf — architecture and performance comparison
Python UDF: row-at-a-time via Pickle. pandas_udf: batch via Arrow. 10–100× speed difference. Both bypass Catalyst.

Answer

Python UDF pipeline (slow): JVM executor → pickle serialize one row → Python worker → deserialize → apply function → serialize result → JVM. 2 serializations per row, Python process overhead, row-at-a-time.

pandas_udf pipeline (fast): JVM executor → batch of rows encoded in Apache Arrow columnar format → Python worker receives pandas Series → vectorized pandas/NumPy operations → Arrow batch back to the JVM. One transfer covers thousands of rows, and the function runs as vectorized native code instead of a Python-level loop.

When to use each: Built-in functions > pandas_udf > Python UDF. Scala UDFs are even faster than pandas_udf (no JVM→Python boundary) but require Scala knowledge.

Both bypass Catalyst — the optimizer cannot see inside a UDF function body, so it can't push predicates into it or optimize across UDF boundaries.
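The transport difference can be illustrated with the standard library alone. In this rough model pickle stands in for both wire formats (real pandas_udf batches use Arrow, not pickle), and the helper names are hypothetical. The point is only the number of serialization round trips.

```python
import pickle

def row_at_a_time(rows, fn):
    """Serialize every row separately, as a plain Python UDF's transport does."""
    payloads = [pickle.dumps(r) for r in rows]           # one payload per row
    results = [fn(pickle.loads(p)) for p in payloads]
    return results, len(payloads)

def batched(rows, fn, batch_size=1000):
    """Serialize rows in chunks. fn still runs per element here; a real
    pandas_udf would instead make one vectorized call per batch."""
    results, payloads = [], 0
    for i in range(0, len(rows), batch_size):
        payload = pickle.dumps(rows[i:i + batch_size])   # one payload per batch
        payloads += 1
        results.extend(fn(x) for x in pickle.loads(payload))
    return results, payloads

rows = list(range(10_000))
r1, n1 = row_at_a_time(rows, lambda x: x * 2)
r2, n2 = batched(rows, lambda x: x * 2)
print(n1, n2)  # 10000 10
```

Same results, but 10,000 payloads versus 10. The fixed per-payload overhead is what the batch transfer amortizes.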

39
Experienced · Hard · Databricks · Amazon
Z-ordering in Delta Lake — what it does and when to use it
Co-locates related data in the same Parquet files for data skipping. Reduces files scanned for point lookups. Best for high-cardinality columns.

Answer

Z-ordering is a data clustering technique that co-locates rows with similar values of specified columns in the same Parquet files. Delta Lake then records the min/max values of those columns per file in the transaction log. When you query with a filter on a Z-ordered column, Delta uses data skipping to eliminate entire files without reading them.

When to use: high-cardinality columns you frequently filter on (user_id, product_id, order_id). Columns already used for partitioning don't benefit further. Works best when the table is regularly OPTIMIZEd.

Limitation: Z-ordering on multiple columns is a trade-off — effectiveness decreases as more columns are added. Typically <= 4 columns.

Z-Order Optimization
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "/data/events")
# Compact small files + Z-order by user_id
dt.optimize().executeZOrderBy("user_id", "event_date")
# SQL equivalent
spark.sql("OPTIMIZE events ZORDER BY (user_id, event_date)")
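Why Z-ordering helps with filters on more than one column can be shown with a toy 16×16 grid. This sketch uses textbook bit interleaving (a Morton code); the helper names are hypothetical and Delta's actual clustering implementation may differ in detail.

```python
def z_value(x, y, bits=8):
    """Interleave the bits of x and y into a single Morton (Z-order) code."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)        # x bits go to even positions
        z |= ((y >> i) & 1) << (2 * i + 1)    # y bits go to odd positions
    return z

def write_files(points, rows_per_file, sort_key):
    """Sort rows, cut them into files, and record per-file min/max stats."""
    ordered = sorted(points, key=sort_key)
    files = [ordered[i:i + rows_per_file] for i in range(0, len(ordered), rows_per_file)]
    return [
        {"x": (min(p[0] for p in f), max(p[0] for p in f)),
         "y": (min(p[1] for p in f), max(p[1] for p in f))}
        for f in files
    ]

def files_to_scan(stats, col, value):
    """Data skipping: keep only files whose [min, max] range can contain value."""
    return sum(1 for s in stats if s[col][0] <= value <= s[col][1])

points = [(x, y) for x in range(16) for y in range(16)]        # 256 rows
by_x = write_files(points, 16, sort_key=lambda p: p)            # linear sort: x, then y
by_z = write_files(points, 16, sort_key=lambda p: z_value(*p))  # Z-order on (x, y)

print(files_to_scan(by_x, "x", 5), files_to_scan(by_x, "y", 5))  # 1 16
print(files_to_scan(by_z, "x", 5), files_to_scan(by_z, "y", 5))  # 4 4
```

A linear sort is ideal for the first column and useless for the second. Z-ordering gives up some skipping on x to gain skipping on y, which is the multi-column trade-off described in the limitation above.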
40
Experienced · Hard · Uber · LinkedIn
Exactly-once semantics in Structured Streaming
Idempotent sink + checkpointing + offset tracking. Delta Lake as sink gives exactly-once out of the box.

Answer

Exactly-once = each input message produces exactly one output effect, even if the system crashes and restarts. Structured Streaming achieves this through:
1. Checkpointing: saves the offset range of every micro-batch (e.g., Kafka offsets) to durable storage. On restart, Spark resumes from the last committed batch and replays only the batch that was in flight, using the same offset range.
2. Idempotent sinks: if a micro-batch is re-run due to failure, the sink must be safe to write twice. Delta Lake (ACID) and HDFS/S3 file sinks support this natively.
3. End-to-end exactly-once: replayable source such as Kafka (offsets tracked in the checkpoint, not committed back to Kafka) + checkpointing + Delta Lake sink = full exactly-once guarantee.
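The interplay of the three pieces can be simulated in plain Python. Everything here is a hypothetical model: the checkpoint is a dict, the source is a list, and the sink deduplicates by batch id, which is one common way to make a sink idempotent.

```python
class IdempotentSink:
    """Stores output keyed by batch id, so rewriting a batch replaces it."""
    def __init__(self):
        self.batches = {}
    def write(self, batch_id, rows):
        self.batches[batch_id] = list(rows)
    def all_rows(self):
        return [r for b in sorted(self.batches) for r in self.batches[b]]

def run(source, sink, checkpoint, batch_size, crash_after_write_of=None):
    """Process micro-batches until the source is exhausted (or a simulated crash)."""
    while True:
        batch_id = checkpoint["committed"] + 1
        # Reuse the planned offset range if this batch was started before a crash.
        if batch_id not in checkpoint["planned"]:
            start = checkpoint["planned"].get(batch_id - 1, (0, 0))[1]
            if start >= len(source):
                return
            end = min(start + batch_size, len(source))
            checkpoint["planned"][batch_id] = (start, end)
        start, end = checkpoint["planned"][batch_id]
        sink.write(batch_id, source[start:end])
        if crash_after_write_of == batch_id:
            raise RuntimeError("crash before commit")
        checkpoint["committed"] = batch_id

source = list("abcdefg")
sink = IdempotentSink()
checkpoint = {"planned": {}, "committed": -1}
try:
    run(source, sink, checkpoint, 3, crash_after_write_of=1)
except RuntimeError:
    pass  # simulated failure after batch 1 was written but not committed
run(source, sink, checkpoint, 3)  # restart: batch 1 is replayed with the same offsets
print(sink.all_rows())  # ['a', 'b', 'c', 'd', 'e', 'f', 'g']
```

Batch 1 is written twice, but because the replay uses the same offset range and the sink overwrites by batch id, the output contains each record once. With a plain append-only sink the rows d, e, f would appear twice.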

41
Coding · Medium · Amazon · Flipkart
Remove duplicate rows — keep the one with the latest timestamp
Window + ROW_NUMBER partitioned by ID, ordered by timestamp DESC, then filter WHERE rn = 1.
Deduplication
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, desc

w = Window.partitionBy("employee_id").orderBy(desc("updated_at"))

deduped = df.withColumn("rn", row_number().over(w)) \
            .filter(col("rn") == 1) \
            .drop("rn")

deduped.show()
# For simple dedup (no timestamp preference):
df.dropDuplicates(["employee_id"])
42
Coding · Easy · Amazon · Uber
Broadcast join — implement and verify it's used
from pyspark.sql.functions import broadcast — wrap the small DataFrame and use explain() to confirm BroadcastHashJoin in the plan.
Broadcast Join
from pyspark.sql.functions import broadcast

# Large fact table: millions of rows
fact_df = spark.read.parquet("s3://data/transactions/")
# Small dimension table: <10MB
dim_df = spark.read.parquet("s3://data/departments/")

result = fact_df.join(broadcast(dim_df), "dept_id", "inner")

# Verify BroadcastHashJoin appears in physical plan
result.explain()
# Physical Plan should show: BroadcastHashJoin, BuildRight
result.show()
43
Coding · Medium · Amazon · Netflix
Top-N records per group using ROW_NUMBER
Window.partitionBy("dept").orderBy(desc("salary")), filter rn <= N, drop the row_number column.
Top-N Per Group
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, desc

N = 3
w = Window.partitionBy("dept").orderBy(desc("salary"))

top_n = df.withColumn("rn", row_number().over(w)) \
          .filter(col("rn") <= N) \
          .drop("rn")

top_n.orderBy("dept", desc("salary")).show()
# Returns top 3 earners per department
44
Coding · Easy · TCS · Infosys
Conditional column with when() / otherwise()
Equivalent to SQL CASE WHEN. Chain multiple when() calls before the final otherwise(). Nest inside withColumn().
Conditional Logic
from pyspark.sql.functions import when, col

df.withColumn("grade",
    when(col("score") >= 90, "A")
    .when(col("score") >= 80, "B")
    .when(col("score") >= 70, "C")
    .otherwise("F")) \
.withColumn("salary_band",
    when(col("salary") >= 100000, "High")
    .when(col("salary") >= 50000,  "Mid")
    .otherwise("Low")).show()
45
Coding · Easy · TCS · Accenture
Define explicit schema with StructType and read JSON
Always define schema explicitly in production. StructType([StructField(...)]) — no inferSchema needed.
Explicit Schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

schema = StructType([
    StructField("emp_id",    StringType(),    nullable=False),
    StructField("name",      StringType(),    nullable=True),
    StructField("salary",    DoubleType(),    nullable=True),
    StructField("dept",      StringType(),    nullable=True),
    StructField("hire_date", TimestampType(), nullable=True),
])
df = spark.read.schema(schema).json("s3://data/employees.json")
df.printSchema()
df.show(5)
46
Coding · Medium · Amazon · Databricks
Flatten nested JSON (explode arrays + dot notation for structs)
explode() one row per array element. col("struct.field") for nested struct access. explode_outer() preserves rows with empty arrays.
Flatten Nested JSON
from pyspark.sql.functions import explode, explode_outer, col

# JSON: {"user":"Alice", "orders":[{"id":1,"amt":50},{"id":2,"amt":80}], "addr":{"city":"NY"}}
df = spark.read.json("orders.json")

# Explode array: one row per order
df_exploded = df.select(
    col("user"),
    explode("orders").alias("order"),      # drops rows with empty arrays
    col("addr.city").alias("city")          # dot notation for struct
)

# Flatten nested struct fields
df_flat = df_exploded.select(
    "user", "city",
    col("order.id").alias("order_id"),
    col("order.amt").alias("amount")
)
df_flat.show()
47
Coding · Medium · Amazon · Walmart
Pivot: convert transaction rows to wide-format columns
groupBy().pivot().agg() — always specify pivot values explicitly for performance, then fillna(0).
Pivot
from pyspark.sql.functions import sum as spark_sum  # avoid shadowing Python's built-in sum

# Input: user | product_category | amount
# Output: user | Electronics | Clothing | Food

pivoted = df.groupBy("user") \
    .pivot("product_category", ["Electronics","Clothing","Food"]) \
    .agg(spark_sum("amount")) \
    .fillna(0)  # replace nulls (categories user never bought)

pivoted.show()
# Specifying values avoids a full scan to find distinct category values
48
Coding · Hard · Uber · LinkedIn
Implement salting for a skewed groupBy
Add a random salt in 0..N-1 to the key → partial agg → strip salt → final agg. Spreads a hot key across up to N partitions instead of one.
Salting for Skewed GroupBy
import pyspark.sql.functions as F

SALT_BUCKETS = 20  # spread hot key across 20 partitions

# Step 1: add random salt to the skewed column
df_salted = df.withColumn("salt", (F.rand() * SALT_BUCKETS).cast("int")) \
              .withColumn("salted_key",
                  F.concat(F.col("customer_id").cast("string"), F.lit("_"),
                           F.col("salt").cast("string")))

# Step 2: partial aggregation on salted key
partial = df_salted.groupBy("salted_key", "product") \
    .agg(F.sum("amount").alias("partial_sum"))

# Step 3: strip salt suffix, final aggregation
# (regexp_replace removes only the trailing "_<salt>", so IDs containing "_" stay intact)
result = partial \
    .withColumn("customer_id", F.regexp_replace(F.col("salted_key"), "_\\d+$", "")) \
    .groupBy("customer_id", "product") \
    .agg(F.sum("partial_sum").alias("total_amount"))
result.show()
49
Coding · Medium · TCS · Infosys
Data validation framework — null checks, row counts, schema validation
Reusable quality checks: row count, null percentage per column, duplicate detection, schema match.
Data Validation
from pyspark.sql.functions import col, count, when

def validate_df(df, expected_schema=None, pk_cols=None):
    results = {}
    total = df.count()
    results["row_count"] = total

    # Null percentage per column
    null_pct = df.select([
        (count(when(col(c).isNull(), c)) / total * 100).alias(c)
        for c in df.columns
    ]).collect()[0].asDict()
    results["null_pct"] = null_pct

    # Duplicate check
    if pk_cols:
        dup_count = total - df.dropDuplicates(pk_cols).count()
        results["duplicates"] = dup_count

    # Schema validation
    if expected_schema:
        schema_ok = df.schema == expected_schema
        results["schema_match"] = schema_ok

    return results

report = validate_df(df, pk_cols=["emp_id"])
print(report)
50
Coding · Easy · Wipro · TCS
Reduce small files with coalesce before writing
coalesce(n) merges locally — no full shuffle. Use before write to control output file count and size.
Coalesce Small Files
# Problem: job creates 1000 tiny files (small file problem)
print(df.rdd.getNumPartitions())  # 1000

# Solution: coalesce before write (no full shuffle)
df.coalesce(10).write.mode("overwrite").parquet("output/")
# Creates 10 output files instead of 1000

# For a single-file output (use with care on large data)
# Note: Spark writes a directory named report.csv containing one part-*.csv file
df.coalesce(1).write.mode("overwrite").csv("report.csv", header=True)

# For Delta tables, compact small files with the OPTIMIZE command
spark.sql("OPTIMIZE my_delta_table")
51
MCQ · Easy
Which of the following causes a data shuffle?
Only wide transformations shuffle data across executor boundaries.
A. filter()
B. select()
C. groupBy().agg()
D. withColumn()
Correct: C — groupBy().agg()
groupBy is a wide transformation — it must collect all rows with the same key from across all partitions into the same partition, which requires shuffling data over the network. filter, select, and withColumn are narrow transformations — each output partition depends on only one input partition, no network transfer needed.
52
MCQ · Easy
For a DataFrame, cache() is equivalent to persist() with which storage level?
Think about the default behavior: memory first, then disk.
A. MEMORY_ONLY
B. MEMORY_AND_DISK
C. DISK_ONLY
D. OFF_HEAP
Correct: B — MEMORY_AND_DISK
DataFrame.cache() stores data in memory and spills to disk when memory is full. This is the MEMORY_AND_DISK storage level (PySpark 3.x reports it as MEMORY_AND_DISK_DESER). Note: this applies to DataFrames; RDD.cache() uses MEMORY_ONLY.
53
MCQ · Easy
Which of the following is NOT an Action in PySpark?
Actions trigger execution and return results. Transformations build the DAG.
A. collect()
B. show()
C. filter()
D. count()
Correct: C — filter()
filter() is a transformation — it returns a new DataFrame lazily without executing anything. collect(), show(), and count() are all Actions that trigger execution of the full DAG and return results to the driver.
54
MCQ · Medium
What technology does pandas_udf use for fast data transfer between the JVM and Python?
A columnar format designed for in-memory analytics.
A. Apache Arrow
B. Apache Parquet
C. Python Pickle
D. Protocol Buffers
Correct: A — Apache Arrow
Apache Arrow is a cross-language in-memory columnar format. pandas_udf uses Arrow to transfer entire batches of data between the JVM and the Python process with very low serialization cost, unlike Python UDFs, which use Pickle for row-by-row serialization.
55
MCQ · Medium
Which method can ONLY decrease the number of partitions?
One causes a full shuffle, the other merges locally.
A. repartition()
B. coalesce()
C. partitionBy()
D. sortWithinPartitions()
Correct: B — coalesce()
coalesce(n) can only decrease partitions. It merges partitions with minimal data movement (combining local partitions on the same executor). repartition() can both increase and decrease partitions but always does a full shuffle.
56
MCQ · Medium
What does AQE stand for, and in which Spark version did it become the default?
It re-optimizes at runtime using actual data statistics.
A. Adaptive Query Execution — default ON in Spark 3.2
B. Adaptive Queue Engine — default ON in Spark 3.0
C. Automatic Query Execution — default ON in Spark 2.4
D. Advanced Query Estimation — default ON in Spark 3.1
Correct: A — Adaptive Query Execution, default ON in Spark 3.2
AQE was introduced in Spark 3.0 but was opt-in. It became ON by default in Spark 3.2. AQE re-optimizes the physical plan at runtime based on actual statistics collected during shuffle stages — enabling dynamic partition coalescing, skew join handling, and join strategy switching.
57
MCQ · Hard
What isolation level does Delta Lake use for concurrent readers and writers?
Readers see a point-in-time consistent view.
A. Read Committed
B. Snapshot Isolation
C. Serializable
D. Repeatable Read
Correct: B — Snapshot Isolation
Delta Lake gives readers Snapshot Isolation: each reader sees a consistent snapshot of the table as it existed when its query started, even if writers are simultaneously committing new data. Concurrent writers are coordinated separately through optimistic concurrency control. Snapshot isolation is weaker than Serializable (it does not rule out anomalies such as write skew) but is sufficient for most data lake read workloads and much cheaper to provide.
58
MCQ · Medium
Which join type is chosen when one table is smaller than autoBroadcastJoinThreshold?
No shuffle required on the larger table.
A. Broadcast Hash Join
B. Sort-Merge Join
C. Shuffle Hash Join
D. Nested Loop Join
Correct: A — Broadcast Hash Join
When one side of a join is ≤ spark.sql.autoBroadcastJoinThreshold (default 10MB), Catalyst automatically chooses Broadcast Hash Join. The small table is sent to every executor and held in memory. The large table is scanned locally on each executor — no shuffle needed. This is the fastest join strategy and can be forced with the broadcast() hint even if the threshold isn't met.
59
MCQ · Hard
In Structured Streaming, what happens if you don't set a watermark for a windowed aggregation?
Think about what Spark must keep track of for all possible late data.
A. Spark automatically limits state to 1 hour
B. State is dropped after each micro-batch
C. State grows unbounded — potential OOM
D. Only complete output mode is available
Correct: C — State grows unbounded
Without a watermark, Spark must keep state for every window it has ever seen, in case late data arrives for that window at any future point. This causes the state store to grow without bound, eventually leading to out-of-memory errors. Watermarking tells Spark it's safe to evict state for windows older than MAX(event_time) - threshold.
60
MCQ · Medium
Which optimizer stage in Catalyst generates optimized JVM bytecode?
The final stage — where Tungsten's whole-stage code generation happens.
A. Analysis
B. Logical Optimization
C. Physical Planning
D. Code Generation
Correct: D — Code Generation
The fourth and final stage of the Catalyst optimizer is Code Generation (via Tungsten's whole-stage code gen). It compiles the physical plan into optimized JVM bytecode at runtime — eliminating virtual function calls, using CPU registers efficiently, and enabling SIMD-friendly data layout. This is a primary reason why DataFrame operations are so much faster than equivalent RDD code.
PySpark Playground

Select a snippet, explore the code, and run it to see simulated output — no setup required.

Code Snippets
Run Locally?

PySpark needs a JVM to execute. Open in Google Colab or run with pip install pyspark.
