Understanding the pain Spark solved is what separates a good engineer from a great one — and it's exactly what interviewers probe.
"Submitted the job at 9am. Still running at 4pm. Guess I'll grab another coffee." — A data engineer, circa 2010
In Hadoop MapReduce, every intermediate result hit disk. A 3-step pipeline wrote each step's output to HDFS and read it back for the next step, and there was no way to keep working data in memory between jobs.
Understand the fundamentals interviewers test — from the "why" to the architecture that makes Spark fast.
When you call .filter(), .select(), or .groupBy(), Spark does nothing yet. It builds a logical plan (DAG). Execution only happens when you call an Action like .show(), .collect(), or .write().
Transformations (lazy): filter, select, groupBy, join, withColumn, drop, orderBy, distinct, union, repartition
Actions (trigger execution): show, collect, count, take, first, write, save, foreach, reduce
If several actions reuse the same result, .cache() or .persist() intermediate results.
Use pandas_udf (vectorized UDFs via Apache Arrow) for custom logic, and df.toPandas() only for small result sets, since it collects everything to the driver. Spark 3.2+ also supports the pandas API on Spark (pyspark.pandas) for pandas-compatible syntax on distributed data.
groupBy is slow.
Step through each phase of a real PySpark job, from raw data on disk to the final aggregated result.
| region | total ($) | avg_sale ($) |
|---|---|---|
| North America | 4,821,540 | 312.40 |
| Europe | 3,109,220 | 287.15 |
| Asia Pacific | 2,650,800 | 241.30 |
| Latin America | 980,440 | 198.75 |
| Middle East | 430,120 | 176.50 |
These are the concepts interviewers probe deepest.
Think of your DataFrame as a giant book. Spark photocopies it into chapters (partitions) and hands each chapter to a different reader (executor). All readers work simultaneously — the job finishes in the time it takes to read one chapter, not the whole book.
A partition is a logical chunk of your DataFrame stored on one executor node. Spark processes each partition as one Task. More partitions = more parallelism (up to a point).
spark.sql.shuffle.partitions sets how many partitions every shuffle produces and defaults to 200. That fixed number is often wrong for your data: too many for small jobs, too few for very large ones.
Rule of thumb: aim for 128–256 MB per partition and 2–4× your total executor cores. A 10 GB dataset on 20 cores needs ~50 partitions, not 200. Over-partitioning causes scheduler overhead; under-partitioning wastes cores.
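As a sketch, the rule of thumb can be turned into a small calculator. This is plain Python, not a Spark API. The function name and the 200 MB target (the middle of the 128–256 MB range) are assumptions. It simply takes the larger of the size-based count and 2× the core count.

```python
import math


def suggest_shuffle_partitions(data_bytes: int, total_cores: int,
                               target_partition_mb: int = 200,
                               min_cores_factor: int = 2) -> int:
    """Hypothetical helper applying the rule of thumb above.

    - by_size: enough partitions for each one to be ~target_partition_mb
    - by_cores: at least min_cores_factor x cores, so no core sits idle
    """
    target_bytes = target_partition_mb * 1024 * 1024
    by_size = math.ceil(data_bytes / target_bytes)
    by_cores = min_cores_factor * total_cores
    return max(by_size, by_cores)


print(suggest_shuffle_partitions(10 * 1024**3, 20))   # 10 GB on 20 cores -> 52
print(suggest_shuffle_partitions(500 * 1024**3, 20))  # 500 GB on 20 cores -> 2560
print(suggest_shuffle_partitions(100 * 1024**2, 20))  # 100 MB on 20 cores -> 40
```

The result would then be applied with spark.conf.set("spark.sql.shuffle.partitions", str(n)) before the shuffle-heavy part of the job.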
A shuffle moves data across the network between executors — the most expensive operation in Spark. It involves serialization, disk I/O, and network transfer.
Wide (shuffle): groupBy, join, distinct, orderBy, repartition
Narrow (no shuffle): filter, select, withColumn; each partition works independently, with no network traffic
Reduce shuffles: filter and select early (before joins), broadcast small tables (broadcast(df)), and pre-partition datasets on join keys to avoid repeated reshuffling.
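Here is a plain-Python model of what a shuffle does for groupBy, assuming hash partitioning. Every record is routed to reducer partition hash(key) % N, so all rows for one key meet in one place. zlib.crc32 stands in for Spark's Murmur3 hash, and the regions are made-up sample data.

```python
import zlib
from collections import defaultdict


def shuffle_by_key(map_partitions, num_reducers):
    """Route each (key, value) to reducer crc32(key) % num_reducers.

    The target depends only on the key, so every row for a given key
    lands in the same reducer partition, whichever map partition it came from.
    """
    reducers = defaultdict(list)
    for partition in map_partitions:
        for key, value in partition:
            target = zlib.crc32(key.encode("utf-8")) % num_reducers
            reducers[target].append((key, value))
    return dict(reducers)


def sum_per_key(reducers):
    """Reduce side: each key is complete inside one partition, so sums are final."""
    totals = {}
    for rows in reducers.values():
        for key, value in rows:
            totals[key] = totals.get(key, 0) + value
    return totals


map_side = [
    [("EU", 10), ("US", 5), ("APAC", 7)],  # map partition 0
    [("US", 3), ("EU", 1)],                # map partition 1
    [("APAC", 2), ("US", 4)],              # map partition 2
]
reducers = shuffle_by_key(map_side, num_reducers=2)
totals = sum_per_key(reducers)
print(totals)
```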
Without caching, asking Spark the same question twice is like a chef cooking the entire meal from scratch both times you ask. With cache, the first batch is kept warm — the second serving is instant.
Spark's lazy evaluation re-runs the full transformation chain for every Action. Calling .count() and then .show() on the same DataFrame reads the source file and reruns every transform — twice. Cache breaks this cycle.
Without cache: source read + transforms run twice
With cache: source read once, reused for both actions
df.cache() is shorthand for df.persist(StorageLevel.MEMORY_AND_DISK). Use persist() when you need a specific storage level.
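The difference between recomputing and caching can be reproduced with a toy model. ToyFrame is a hypothetical, plain-Python stand-in for a DataFrame, not Spark code:

- map() only records work.
- count() and collect() are actions that replay the chain from the source.
- cache() lazily marks the frame so the first action keeps its result. This mirrors how df.cache() only takes effect at the next action.

```python
class ToyFrame:
    """Hypothetical lazy frame: records transformations, replays them per action."""

    def __init__(self, source, ops=(), stats=None):
        self.source = source                      # callable that "reads the file"
        self.ops = list(ops)                      # recorded transformations
        self.stats = stats if stats is not None else {"source_reads": 0}
        self.cache_requested = False
        self.cached_rows = None

    def map(self, fn):
        # Transformation: record it and run nothing.
        return ToyFrame(self.source, self.ops + [fn], self.stats)

    def cache(self):
        # Like Spark, caching is lazy: this only marks the frame.
        self.cache_requested = True
        return self

    def _compute(self):
        if self.cached_rows is not None:
            return self.cached_rows               # served from cache, no source read
        self.stats["source_reads"] += 1
        rows = list(self.source())
        for fn in self.ops:
            rows = [fn(r) for r in rows]
        if self.cache_requested:
            self.cached_rows = rows               # first action materializes the cache
        return rows

    def count(self):
        return len(self._compute())

    def collect(self):
        return self._compute()


raw = lambda: iter([1, 2, 3])

plain = ToyFrame(raw).map(lambda x: x * 10)
plain.count()
plain.collect()
print(plain.stats["source_reads"])   # 2: every action re-reads the source

cached = ToyFrame(raw).map(lambda x: x * 10).cache()
cached.count()
cached.collect()
print(cached.stats["source_reads"])  # 1: the second action reuses the cache
```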
Broadcast join = photocopying a small menu and handing a copy to each chef. Each chef has everything they need and never leaves their station. Zero network traffic for the big table.
Sort-merge join = sending all chefs to a central filing cabinet, sorted alphabetically, to find their matching order. Works for any size, but there's a lot of walking (network shuffle).
inner: only rows that match on both sides
left / right: all rows from one side; nulls where there is no match on the other
full: all rows from both sides, nulls wherever no match exists
left_semi: left rows where a match exists in right (equivalent to WHERE EXISTS)
left_anti: left rows with no match in right (equivalent to WHERE NOT EXISTS)
cross: Cartesian product, every left row × every right row; expensive, use deliberately
Broadcast hash join: chosen via the broadcast(df) hint, or automatically when one side is below spark.sql.autoBroadcastJoinThreshold
Shuffle hash join: shuffles both sides on the join key, then builds a hash map on the smaller side. It is faster than sort-merge when one side is clearly smaller but too big to broadcast. It needs more memory and can OOM on very large datasets.
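The inner versus left_semi distinction is a common trap: a semi join never duplicates left rows and never adds right-side columns. The helpers below are a hypothetical plain-Python model of the semantics, not Spark's join implementation. They run on made-up customers and orders.

```python
def inner_join(left, right, key):
    """One output row per matching (left, right) pair; right columns are added."""
    index = {}
    for r in right:
        index.setdefault(r[key], []).append(r)
    return [{**l, **r} for l in left for r in index.get(l[key], [])]


def left_semi(left, right, key):
    """Left rows with at least one match: never duplicated, left columns only."""
    keys = {r[key] for r in right}
    return [l for l in left if l[key] in keys]


def left_anti(left, right, key):
    """Left rows with no match at all."""
    keys = {r[key] for r in right}
    return [l for l in left if l[key] not in keys]


customers = [{"id": 1, "name": "Ann"}, {"id": 2, "name": "Raj"}, {"id": 3, "name": "Li"}]
orders = [{"id": 1, "amount": 50}, {"id": 1, "amount": 20}, {"id": 3, "amount": 10}]

inner = inner_join(customers, orders, "id")
semi = left_semi(customers, orders, "id")
anti = left_anti(customers, orders, "id")
print(len(inner), [c["name"] for c in semi], [c["name"] for c in anti])
# 3 ['Ann', 'Li'] ['Raj']  (Ann appears twice in inner, once in semi)
```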
A broadcast variable (sc.broadcast(value)) ships a read-only Python object, such as a dict or lookup list, to every executor once. A broadcast join is the DataFrame-level application of the same idea: ship a small DataFrame so Spark never needs to shuffle the large one.
Skew is like 4 checkout lanes at a supermarket — but one cashier has 200 people in line and the others have 5 each. The store closes when the last customer is served, so the busiest lane sets your total wait time. The other 3 cashiers are wasted.
Skew occurs when some partition keys carry vastly more rows than others. During groupBy or join, one executor processes 80% of the data while others sit idle. Your job's duration = the slowest task's duration.
Executors 2–4 finish in seconds and sit idle. Job waits on P0.
df.groupBy("join_key").count().orderBy("count", ascending=False).show(10)
Also check Spark UI → Stages → Task metrics. If the 75th-percentile task duration is 2s but the max is 4 minutes, you have a hot key.
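The Spark UI check can be expressed as a max-to-median ratio. This plain-Python sketch takes per-task durations, as you might read them off the task metrics. It flags the stage when the slowest task dwarfs the median. The function name and the 5× threshold are assumptions, not Spark settings. AQE's skew-join detection uses a similar median-based factor on partition sizes (spark.sql.adaptive.skewJoin.skewedPartitionFactor).

```python
import statistics


def skew_report(task_seconds, threshold=5.0):
    """Hypothetical check: compare the slowest task against the median task."""
    median = statistics.median(task_seconds)
    slowest = max(task_seconds)
    ratio = slowest / median
    return {"median_s": median, "max_s": slowest, "ratio": ratio,
            "skewed": ratio >= threshold}


skewed_stage = skew_report([2, 2, 3, 2, 240])  # one task holds the hot key
balanced_stage = skew_report([2, 3, 2, 3])
print(skewed_stage["ratio"], skewed_stage["skewed"])      # 120.0 True
print(balanced_stage["ratio"], balanced_stage["skewed"])  # 1.2 False
```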
Append a random integer ("salt") to the hot key to spread it across N sub-partitions, run the partial aggregation, then strip the salt and do the final rollup.
import pyspark.sql.functions as F

SALT = 10  # spread each hot key across 10 sub-partitions

# 1: salt the large (skewed) DataFrame with a random bucket 0..SALT-1
df_salted = (
    df.withColumn("salt", (F.rand() * SALT).cast("int"))
      .withColumn("salted_key", F.concat(F.col("join_key").cast("string"), F.lit("_"), F.col("salt").cast("string")))
)

# 2: explode the small (lookup) DataFrame so every key exists once per salt value
lookup_exploded = (
    lookup.withColumn("salt", F.explode(F.array([F.lit(i) for i in range(SALT)])))
          .withColumn("salted_key", F.concat(F.col("join_key").cast("string"), F.lit("_"), F.col("salt").cast("string")))
          .drop("join_key", "salt")  # avoid duplicate column names after the join
)

# 3: join on the salted key, drop the helper columns, then aggregate as normal
result = df_salted.join(lookup_exploded, "salted_key").drop("salt", "salted_key")
Other fixes include AQE skew join (Spark 3.0+, spark.sql.adaptive.skewJoin.enabled=true), which handles skew without code changes. You can also process hot keys separately and union the results.
groupBy collapses your team of 10 into a single summary row. Window functions keep all 10 rows intact. They give each employee a new column showing how they rank within their own department, without collapsing anyone.
Window functions compute values across a window of rows related to the current row (the whole partition, or a sliding frame within it) without reducing the row count. Unlike groupBy + agg, every input row produces exactly one output row with new computed columns.
from pyspark.sql.window import Window
from pyspark.sql.functions import col, rank, dense_rank, row_number, lag, sum  # this sum shadows Python's built-in

# Partition by dept, order by salary: highest first
w = Window.partitionBy("dept").orderBy(col("salary").desc())

result = (
    df.withColumn("rank", rank().over(w))
      .withColumn("dense_rank", dense_rank().over(w))
      .withColumn("row_num", row_number().over(w))
      .withColumn("prev_salary", lag("salary", 1).over(w))
      .withColumn("running_sum", sum("salary").over(
          w.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
)
These three look similar but behave differently when rows tie. The table below shows the same dataset through each lens:
| Name | Salary | rank() | dense_rank() | row_number() |
|---|---|---|---|---|
| Alice | 90,000 | 1 | 1 | 1 |
| Bob | 90,000 | 1 | 1 | 2 |
| Carol | 75,000 | 3 | 2 | 3 |
| Dave | 60,000 | 4 | 3 | 4 |
| Behaviour | | Gap after tie (skips 2) | No gap ever | Always unique |
Interview trap: "Get the top-N per group" questions — use row_number() for strictly unique results, rank() to include all tied entries at the cutoff position.
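To make the tie behaviour concrete, here is a plain-Python model of the three functions over one window partition. The helper names are hypothetical, and the data is the same as in the table. Python's stable sort breaks the Alice/Bob tie by input order. In Spark, which tied row gets row_number() 1 is not guaranteed unless you add a tiebreaker column to orderBy.

```python
def rank_within_partition(rows, order_key):
    """Sort one window partition by order_key descending and attach
    rank, dense_rank and row_number the way Spark defines them."""
    ordered = sorted(rows, key=lambda r: r[order_key], reverse=True)
    out = []
    rank = dense = 0
    prev = object()  # sentinel that never equals a real value
    for position, row in enumerate(ordered, start=1):
        if row[order_key] != prev:
            rank = position   # rank jumps to the row's position: gaps after ties
            dense += 1        # dense_rank only steps by one: no gaps
            prev = row[order_key]
        out.append({**row, "rank": rank, "dense_rank": dense, "row_number": position})
    return out


def top_n(ranked, n, by):
    """Top-N per group: filter on whichever ranking column you choose."""
    return [r["name"] for r in ranked if r[by] <= n]


employees = [
    {"name": "Alice", "salary": 90000},
    {"name": "Bob", "salary": 90000},
    {"name": "Carol", "salary": 75000},
    {"name": "Dave", "salary": 60000},
]
ranked = rank_within_partition(employees, "salary")
print(top_n(ranked, 1, "row_number"))  # ['Alice']         strictly one row
print(top_n(ranked, 1, "rank"))        # ['Alice', 'Bob']  all ties at the cutoff
```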
rowsBetween(start, end) counts physical rows (position-based). rangeBetween(start, end) counts by value distance. Use Window.unboundedPreceding / Window.unboundedFollowing to span the entire partition.
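The difference is easiest to see with duplicate order values. This plain-Python model uses hypothetical helpers and assumes the ORDER BY column is already sorted ascending. It computes a "current row plus one before" sum both ways:

- rowsBetween(-1, 0) counts positions.
- rangeBetween(-1, 0) takes every row whose value is within 1 of the current row's value, including tied peers.

```python
def rows_frame_sum(values, preceding):
    """rowsBetween(-preceding, 0): current row plus `preceding` physical rows before it."""
    return [sum(values[max(0, i - preceding): i + 1]) for i in range(len(values))]


def range_frame_sum(order_vals, values, preceding):
    """rangeBetween(-preceding, 0): all rows whose ORDER BY value lies in
    [current - preceding, current], including peers that come after the current row."""
    return [
        sum(v for o, v in zip(order_vals, values) if cur - preceding <= o <= cur)
        for cur in order_vals
    ]


day = [1, 2, 2, 5]          # ORDER BY column (sorted ascending)
amount = [10, 20, 30, 40]

print(rows_frame_sum(amount, 1))        # [10, 30, 50, 70]
print(range_frame_sum(day, amount, 1))  # [10, 60, 60, 40]
```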
Delta Lake is like Git for your data files. Every write adds a commit to a transaction log. You can check out any past version, roll back a bad write, and multiple readers/writers work safely at the same time without corrupting each other.
Delta Lake is an open-source storage layer that adds ACID transactions to data lakes. It sits on top of Parquet files and adds a _delta_log/ JSON transaction log that records every change — inserts, updates, deletes, schema changes.
from delta.tables import DeltaTable

# Read a past version
df = spark.read.format("delta").option("versionAsOf", 5).load("/data/table")

# Or by timestamp
df = spark.read.format("delta").option("timestampAsOf", "2025-01-01").load("/data/table")

# Roll back to version 3
DeltaTable.forPath(spark, "/data/table").restoreToVersion(3)
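Time travel works because a version is just the result of replaying commits up to that point. This plain-Python model uses a made-up log format, where each commit is a list of add/remove actions on Parquet file names. Real Delta stores each commit as a numbered JSON file in _delta_log/ and periodically writes Parquet checkpoints to speed up replay.

```python
def table_files_at(log, version):
    """Replay commits 0..version: 'add' puts a data file in the table, 'remove' takes it out."""
    files = set()
    for commit in log[: version + 1]:
        for action, path in commit:
            if action == "add":
                files.add(path)
            elif action == "remove":
                files.discard(path)  # logically removed; the file stays on disk until VACUUM
    return files


log = [
    [("add", "part-000.parquet")],                                  # v0: initial write
    [("add", "part-001.parquet")],                                  # v1: append
    [("remove", "part-000.parquet"), ("add", "part-002.parquet")],  # v2: update rewrites a file
]

for v in range(len(log)):
    print(v, sorted(table_files_at(log, v)))
```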
Z-ordering co-locates related data into the same Parquet files. Queries with filters on a Z-ordered column skip entire files — reading far less data.
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/data/table")
delta_table.optimize().executeZOrderBy("user_id", "event_date")
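Run OPTIMIZE + Z-ORDER after large bulk loads. Run VACUUM to delete Parquet files that the table no longer references and that are older than the retention period (default: 7 days). This reclaims storage, but you can no longer time travel to versions that needed those files.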
Think of a Kafka topic as a conveyor belt that never stops. Structured Streaming reads the belt in short bursts (micro-batches), processes each burst exactly like a batch query, and remembers where it stopped so it can pick up where it left off if the job crashes.
Structured Streaming treats a live stream as an unbounded table that grows as new events arrive. You write a normal batch query; Spark executes it incrementally in micro-batches under the hood.
Trigger modes (PySpark equivalents in parentheses):
Trigger.ProcessingTime("10 seconds") (.trigger(processingTime="10 seconds")): micro-batch every 10 seconds; the standard choice
Trigger.Once() (.trigger(once=True)): process all pending data, then stop; a batch + streaming hybrid pattern
Trigger.AvailableNow() (.trigger(availableNow=True)): like Once but splits the work into multiple micro-batches (Spark 3.3+)
Trigger.Continuous("1 second") (.trigger(continuous="1 second")): experimental continuous processing engine with millisecond latency; "1 second" is the checkpoint interval
Streaming aggregations must bound the state they keep in memory. Watermarking tells Spark: "discard events that arrive more than X late", capping memory growth.
from pyspark.sql.functions import col, count, window

windowed_counts = (
    df.withWatermark("event_time", "10 minutes")
      .groupBy(window(col("event_time"), "5 minutes"), col("user_id"))
      .agg(count("*").alias("event_count"))
)
Structured Streaming checkpoints progress (Kafka offsets, state store) to durable storage, so a restarted job picks up exactly where it left off. Combined with a replayable source such as Kafka and an idempotent (or transactional) sink, this gives end-to-end exactly-once semantics.
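Always set a checkpoint location in production: .option("checkpointLocation", "s3://bucket/checkpoints/job1"). Without it, a restarted query has no record of its progress or state. It starts over from the source's starting offsets, which can reprocess (and duplicate) data at the sink, or skip data.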
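Here is a small model of why a checkpoint plus an idempotent sink gives exactly-once output. It is a plain-Python sketch with hypothetical names. Progress is committed only after the sink write, and the sink is keyed by batch id, so a re-run batch overwrites rather than appends. Real Spark splits this into an offset log written before the batch and a commit log written after it; the model collapses them into one step.

```python
def run_stream(source, checkpoint, sink, batch_size, crash_after_write_of=None):
    """checkpoint: dict that survives restarts ({'next_offset', 'next_batch'}).
    sink: dict batch_id -> rows, so rewriting a batch id replaces it (idempotent)."""
    while checkpoint["next_offset"] < len(source):
        batch_id = checkpoint["next_batch"]
        start = checkpoint["next_offset"]
        rows = source[start:start + batch_size]
        sink[batch_id] = rows                          # idempotent write keyed by batch id
        if batch_id == crash_after_write_of:
            raise RuntimeError("crashed before committing progress")
        checkpoint["next_offset"] = start + len(rows)  # commit progress only after the write
        checkpoint["next_batch"] = batch_id + 1


events = ["e0", "e1", "e2", "e3", "e4"]
checkpoint = {"next_offset": 0, "next_batch": 0}
sink = {}

try:
    run_stream(events, checkpoint, sink, batch_size=2, crash_after_write_of=1)
except RuntimeError:
    pass  # batch 1 was written but not committed

run_stream(events, checkpoint, sink, batch_size=2)  # restart with the same checkpoint
output = [e for batch_id in sorted(sink) for e in sink[batch_id]]
print(output)  # ['e0', 'e1', 'e2', 'e3', 'e4']: batch 1 was redone, not duplicated
```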
Most-used APIs, patterns, and interview tricks — all in one place.
Always include partitionBy in window specs. Without it, Spark moves every row into a single partition to compute the window. Interviewers check this.
Nulls: dropna(subset=["col"]) for specific cols · filter(col("x").isNull()) finds NULL rows · filter(col("x").isNotNull()) keeps only non-NULL rows · df.na.replace(0, None) turns 0 into NULL
Aggregations: count("*") = all rows · count("col") = non-null values only · sum("salary"), avg("salary"), mean("salary") · max("col"), min("col") ignore NULLs · countDistinct("col") counts unique non-null values
Windows: rowsBetween(unboundedPreceding, 0) for running totals
Partitions: df.rdd.getNumPartitions() checks the current partition count
Read options: header=True, inferSchema=True, sep="," · multiLine=True for multi-line JSON docs
Write modes: overwrite · append · ignore · error (default) · .write.partitionBy("year","month").parquet(...)

from pyspark.sql.functions import col, rank, dense_rank, row_number, lag, sum
from pyspark.sql.window import Window

# Define window: partition by dept, order by salary desc
w = Window.partitionBy("dept").orderBy(col("salary").desc())

ranked = (
    df.withColumn("rnk", rank().over(w))              # 1,1,3
      .withColumn("dense", dense_rank().over(w))      # 1,1,2
      .withColumn("rn", row_number().over(w))         # 1,2,3
      .withColumn("prev_sal", lag("salary", 1).over(w))
)

# Running total (cumulative sum)
w_cum = (Window.partitionBy("dept")
         .orderBy("salary")
         .rowsBetween(Window.unboundedPreceding, 0))
df.withColumn("running_total", sum("salary").over(w_cum))
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Keep the latest record per user_id
w = Window.partitionBy("user_id").orderBy(col("updated_at").desc())

deduped = (
    df.withColumn("rn", row_number().over(w))
      .filter(col("rn") == 1)
      .drop("rn")
)

# Alternative: dropDuplicates keeps one row per user_id, but you can't choose which
df.dropDuplicates(["user_id"])  # ← no control over WHICH row is kept
from pyspark.sql.functions import broadcast

# Option 1: explicit broadcast hint (recommended)
result = large_df.join(broadcast(small_df), "id")

# Option 2: hint API
result = large_df.join(small_df.hint("broadcast"), "id")

# Option 3: raise the auto-broadcast threshold (default 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50m")

# Verify in the physical plan (look for BroadcastHashJoin)
result.explain()
from pyspark.sql.functions import col, concat, lit, rand, floor, array, explode

N = 10  # salt buckets

# Step 1: salt the large table with a random bucket 0..N-1
big = big_df.withColumn(
    "salt_key", concat(col("dept"), lit("_"), floor(rand() * N).cast("int").cast("string")))

# Step 2: explode the small table across all buckets
small = (
    small_df
    .withColumn("salt_arr", array([lit(i) for i in range(N)]))
    .withColumn("s", explode("salt_arr"))
    .withColumn("salt_key", concat(col("dept"), lit("_"), col("s").cast("string")))
    .drop("salt_arr", "s", "dept")  # keep dept only from the big side
)

# Step 3: join on salt_key
big.join(small, "salt_key").drop("salt_key")
df.rdd.getNumPartitions() · shuffle.partitions · @pandas_udf(returnType) decorator.
60 questions from Amazon, Databricks, TCS, Uber and more, from Fresher to Senior level.
PySpark is the Python API for Apache Spark — a distributed computing engine that processes data across many machines simultaneously.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyApp").master("local[*]").getOrCreate()
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.filter(df.salary > 50000).groupBy("dept").count().show()
Driver: Master process — runs your code, creates SparkSession, builds the DAG, coordinates execution, collects results.
Cluster Manager: Manages cluster resources (YARN, Kubernetes, Standalone, or Mesos, which is deprecated since Spark 3.2).
Executors: JVM processes on worker nodes — run Tasks, store cached data.
Hierarchy: Application → Jobs (one per Action) → Stages (split by shuffles) → Tasks (one per partition)
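The hierarchy can be sketched as a tiny planner:

- Each wide transformation ends a stage.
- The first stage runs one task per input partition.
- Each post-shuffle stage runs one task per shuffle partition.

This is a simplified plain-Python model. The function is hypothetical, it handles linear pipelines only, and it ignores AQE coalescing and broadcast joins.

```python
def plan_stages(ops, input_partitions, shuffle_partitions=200):
    """ops: ordered list of (name, is_wide) transformations before one action.

    A wide op needs a shuffle, so it starts a new stage. Stage 0 runs one
    task per input partition; later stages run one task per shuffle partition.
    """
    stages = [{"ops": [], "tasks": input_partitions}]
    for name, is_wide in ops:
        if is_wide:
            stages.append({"ops": [], "tasks": shuffle_partitions})
        stages[-1]["ops"].append(name)
    return stages


job = [("read", False), ("filter", False), ("groupBy", True),
       ("withColumn", False), ("distinct", True)]
stages = plan_stages(job, input_partitions=8)
for i, stage in enumerate(stages):
    print(f"Stage {i}: {stage['ops']} -> {stage['tasks']} tasks")
```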
An RDD is an immutable, fault-tolerant distributed collection of objects. Fault-tolerant via lineage — if a partition is lost, Spark recomputes it from the original source. Type-unsafe, no Catalyst optimizer. Prefer DataFrames; use RDD only for custom low-level operations or unstructured data (text, binary).
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5], numSlices=3)
result = rdd.map(lambda x: x * 2).filter(lambda x: x % 2 == 0)
print(result.collect())  # [2, 4, 6, 8, 10]
A DataFrame is a distributed collection with named, typed columns, like a relational table. It uses the Catalyst optimizer and the Tungsten execution engine, so it is significantly faster than an RDD. It has SQL compatibility. DataFrames are immutable: transformations return new DataFrames. Use DataFrames by default for all structured and semi-structured data.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("MyApp")
    .master("yarn")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "2")
    .getOrCreate()  # returns the existing session or creates a new one
)
print(spark.version)  # e.g. 3.5.0
When you order food at a restaurant, the kitchen does not start cooking the second you speak. The waiter writes your order down. Only when the waiter takes the slip to the kitchen — "serve it now" — does any cooking happen.
PySpark works exactly the same way. Calling filter(), select(), or groupBy() is writing down your order. Nothing touches the actual data. The DAG is the written order slip. An Action (show(), count(), collect()) is handing that slip to the kitchen — that is when PySpark reads data, runs calculations, and returns a result.
This design lets PySpark look at your entire chain of steps before executing any of them, allowing the Catalyst optimizer to rewrite the plan in ways a human would rarely think of.
Lazy Evaluation — PySpark does not execute transformations when they are called. Each call (filter, select, groupBy…) adds a node to the DAG but touches zero data.
DAG (Directed Acyclic Graph): the dependency graph of all pending transformations. "Directed" means data flows in one direction, and "acyclic" means there are no cycles. Spark's Catalyst optimizer reads this entire graph and rewrites it before anything runs: it pushes filters earlier, drops unused columns, and combines adjacent operations.
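Python generators are lazy in a similar way, which makes for a quick analogy. This is not how Spark is implemented, since generators have no optimizer. Chaining them records the pipeline, and nothing is pulled from the source until something consumes it. In the same way, nothing is read until an Action runs. The names below are illustrative only.

```python
reads = {"rows": 0}


def read_source():
    """Stand-in for reading a file; counts rows actually pulled."""
    for row in [("Alice", 35), ("Bob", 28), ("Carol", 42)]:
        reads["rows"] += 1
        yield row


# "Transformations": chaining generators builds the pipeline, runs nothing
over_30 = (r for r in read_source() if r[1] > 30)
names = (name for name, _ in over_30)
before_action = reads["rows"]
print(before_action)          # 0: nothing executed yet

# "Action": consuming the pipeline pulls every row through every step
result = list(names)
print(result, reads["rows"])  # ['Alice', 'Carol'] 3
```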
For example, Catalyst can move a filter ahead of a join to reduce data volume, even if you wrote it the other way.
Actions: show(), count(), collect(), take(n), write.*(), toPandas()
You have a CSV with 10 million employee records and 50 columns. You write these 3 lines:
1. df.filter(df.age > 30): PySpark records "keep rows where age > 30". Nothing is read from disk.
2. .select("name","dept","salary"): records "keep only 3 columns". Still nothing runs.
3. .groupBy("dept").count(): records "group by dept and count". Still nothing runs.
Catalyst then inspects the entire plan. It notices that the final count only needs dept (to group) and age (to filter). Name and salary are selected but never used, so the scan is pruned to 2 of the 50 columns. The filter runs before the shuffle, so only matching rows cross the network. Then you call .show(). Only now does Spark read data, already following the optimized plan. On large inputs this can cut execution time dramatically, especially with a columnar format like Parquet, where pruned columns are never read from disk at all.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LazyDemo").getOrCreate()

# Create sample data: imagine this is a 10M-row file
data = [("Alice", 35, "Eng", 90000), ("Bob", 28, "HR", 60000),
        ("Carol", 42, "Eng", 110000), ("Dave", 25, "Eng", 70000)]
df = spark.createDataFrame(data, ["name", "age", "dept", "salary"])

# These 3 lines build the DAG: NOTHING executes yet
plan = (df.filter(df.age > 30)
          .select("name", "dept", "salary")
          .groupBy("dept").count())

# See what Catalyst will actually run (optimized plan)
plan.explain()               # physical plan only
plan.explain(extended=True)  # logical + optimized + physical

# ← THIS is the Action that triggers execution
plan.show()  # data is read and processed here
Transformations (lazy): filter, select, groupBy, join, withColumn, orderBy, distinct, union, repartition, map, flatMap
Actions (trigger execution): show(), collect(), count(), take(n), first(), write.*(), toPandas(), foreach(), reduce()
Narrow: filter, select, map — no shuffle, each partition maps to one output partition.
Wide: groupBy, join, distinct, orderBy — cause shuffle across executors, much more expensive.
# Read
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df = spark.read.parquet("s3://bucket/data/")
df = spark.read.format("delta").load("s3://bucket/delta/")

# Write (modes: overwrite, append, ignore, error)
df.write.mode("overwrite").parquet("output/")
df.write.mode("append").partitionBy("year", "month").parquet("output/")
from pyspark.sql.functions import col, when, coalesce, lit, count

df.dropna()                                  # drop rows with ANY null
df.dropna(subset=["name", "salary"])         # drop if specific cols are null
df.fillna({"salary": 0, "dept": "Unknown"})  # fill per column
df.withColumn("pay", coalesce(col("salary"), col("base_pay"), lit(0)))

# Count nulls per column
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()
A UDF applies a custom Python function to DataFrame columns. Downside: row-at-a-time processing + JVM↔Python serialization per row → 10–100× slower than built-in functions. Always prefer Spark's built-in pyspark.sql.functions. For performance, use pandas_udf (vectorized, Arrow-based).
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Python UDFs receive None for NULL values, so guard against it
band_udf = udf(
    lambda s: None if s is None else "High" if s >= 100000 else "Mid" if s >= 50000 else "Low",
    StringType())
df.withColumn("band", band_udf("salary")).show()
from pyspark import StorageLevel

df_hot = df.join(other, "id").groupBy("dept").agg(...)
df_hot.cache()      # MEMORY_AND_DISK shorthand
df_hot.count()      # action triggers caching
df_hot.show()       # uses cache: fast!
df_hot.unpersist()  # release when done

# Other storage levels (pick one; a DataFrame keeps the first level it is given)
df.persist(StorageLevel.MEMORY_ONLY)  # already serialized in PySpark (there is no MEMORY_ONLY_SER)
df.persist(StorageLevel.OFF_HEAP)     # avoid GC pressure; needs spark.memory.offHeap.enabled
# From list
df = spark.createDataFrame([("Alice", 30), ("Bob", 25)], ["name", "age"])

# From pandas
import pandas as pd
df = spark.createDataFrame(pd.DataFrame({"name": ["Alice"], "age": [30]}))

# From RDD
df = spark.sparkContext.parallelize([("Alice", 30)]).toDF(["name", "age"])

# Quick range (1M rows)
df = spark.range(1000000)
df.printSchema()
df.show(5)
from pyspark.sql.functions import col, avg, count, when, desc

df.select("name", (col("salary") * 1.1).alias("new_salary"))
df.filter((col("dept") == "Eng") & (col("age") > 25))
df.withColumn("grade", when(col("score") >= 90, "A").when(col("score") >= 70, "B").otherwise("C"))
(df.groupBy("dept").agg(count("*").alias("n"), avg("salary").alias("avg_sal"))
   .orderBy(desc("avg_sal")).show())
df.dropDuplicates(["employee_id"])
df.withColumnRenamed("salary", "annual_pay")
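Catalyst is Spark's query optimizer. It automatically rewrites and optimizes DataFrame queries before execution, in four stages:
1. Analysis: resolves column names and types against the catalog
2. Logical optimization: rule-based rewrites such as predicate pushdown, column pruning, and constant folding
3. Physical planning: picks execution strategies (for example, which join algorithm) using cost estimates
4. Code generation: whole-stage code generation compiles the plan into JVM bytecode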
This is why identical logic runs 5–20× faster as a DataFrame than as an RDD — RDDs bypass all four stages.
Broadcasting sends a small, read-only table to every executor node so the large table never needs to shuffle across the network. Use when one side of a join is small enough to fit in executor memory (<10MB by default, configurable via spark.sql.autoBroadcastJoinThreshold). Typical speedup: 3–8× on fact-dimension joins.
from pyspark.sql.functions import broadcast

# large_df: 10GB fact table; dim_df: 5MB dimension table
result = large_df.join(broadcast(dim_df), "dept_id", "inner")
# dim_df is sent to each executor once: no shuffle on large_df

# Tune threshold (default 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50m")

# Disable auto-broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
Window functions compute values across a set of rows related to the current row — without collapsing them into one row (unlike groupBy+agg). Each row keeps its identity while gaining computed columns. Use cases: ranking, running totals, moving averages, lag/lead comparisons.
rank() vs dense_rank() vs row_number():
rank(): gaps after ties: 1,1,3; dense_rank(): no gaps: 1,1,2; row_number(): always unique: 1,2,3
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, row_number, lag, sum, col

w = Window.partitionBy("dept").orderBy(col("salary").desc())

(df.withColumn("rank", rank().over(w))
   .withColumn("dense_rank", dense_rank().over(w))
   .withColumn("row_num", row_number().over(w))
   .withColumn("prev_salary", lag("salary", 1).over(w))
   .withColumn("running_sum", sum("salary").over(
       w.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
   .show())
df.repartition(200)         # full shuffle, 200 equal partitions
df.repartition(50, "dept")  # shuffle + partition by column
df.coalesce(10)             # merge existing partitions without a full shuffle

# Before writing: reduce small files (coalesce(1) writes one file from a single task)
df.coalesce(1).write.mode("overwrite").parquet("output/")

# Check current partition count
print(df.rdd.getNumPartitions())
Data skew: some partition keys have vastly more rows than others. During groupBy/join, one executor processes 80% of the data while others are idle. Job time = slowest task.
Detect: df.groupBy("key").count().orderBy("count",ascending=False).show(). Check Spark UI → Stages → Task metrics for outlier task durations.
Solutions:
1. Salting: add a random salt to the keys, run a partial aggregation, strip the salt, then run the final aggregation
2. Broadcast join: if small side fits in memory, broadcast it
3. AQE (Spark 3.0+): spark.sql.adaptive.skewJoin.enabled=true auto-splits skewed partitions
4. Separate hot keys: filter hot keys out, process separately, union results
import pyspark.sql.functions as F

SALT = 10

# Step 1: add a random salt (0-9) to the key
df_s = df.withColumn(
    "salted_key",
    F.concat(F.col("join_key").cast("string"), F.lit("_"), (F.rand() * SALT).cast("int").cast("string")))

# Step 2: partial aggregation on the salted key
partial = df_s.groupBy("salted_key").agg(F.sum("amount").alias("part_sum"))

# Step 3: strip the trailing "_<salt>" (safe even if keys contain "_"), then final aggregation
result = (
    partial
    .withColumn("orig_key", F.regexp_replace(F.col("salted_key"), "_[0-9]+$", ""))
    .groupBy("orig_key").agg(F.sum("part_sum").alias("total"))
)
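The two-phase idea can be checked in plain Python. This is a model, not Spark code, and the rows are hypothetical. One hot key holds almost all rows. After salting, it is split over SALT groups for the partial sums. Dropping the salt in the rollup gives the same totals as an unsalted aggregation.

```python
import random
from collections import defaultdict


def salted_sum(rows, salt_buckets, seed=0):
    """Two-phase aggregation: partial sums per (key, salt), then roll up per key."""
    rng = random.Random(seed)
    # Phase 1: the random salt spreads a hot key over salt_buckets groups (tasks)
    partial = defaultdict(int)
    for key, amount in rows:
        salt = rng.randrange(salt_buckets)
        partial[(key, salt)] += amount
    # Phase 2: strip the salt and combine the partial sums
    totals = defaultdict(int)
    for (key, _salt), part in partial.items():
        totals[key] += part
    return dict(totals), dict(partial)


rows = [("hot", 1)] * 1000 + [("cold_a", 1)] * 10 + [("cold_b", 1)] * 10
totals, partial = salted_sum(rows, salt_buckets=10)
hot_groups = [v for (k, _s), v in partial.items() if k == "hot"]
print(totals)                            # same totals as a plain groupBy
print(len(hot_groups), max(hot_groups))  # hot key now spread over many smaller groups
```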
Broadcast Variable: read-only data structure (dict, list) sent to every executor once and cached in memory. Workers can read it efficiently. Use for lookup tables, config maps, ML model weights.
Accumulator: workers can only add to it (write-only); the driver reads the final accumulated value. Use it for counters and sums across tasks. Note: updates made inside transformations may be counted more than once if a task or stage is re-executed. Update accumulators inside actions such as foreach or foreachPartition, where Spark applies each task's update only once.
# Broadcast variable
lookup = {"NYC": "New York", "LA": "Los Angeles"}
bc = spark.sparkContext.broadcast(lookup)
df.rdd.map(lambda r: bc.value.get(r.city, "Unknown")).collect()

# Accumulator
null_counter = spark.sparkContext.accumulator(0)

def count_nulls(row):
    if row.salary is None:
        null_counter.add(1)

df.rdd.foreach(count_nulls)
print(f"Null salaries: {null_counter.value}")
df_a.join(df_b, "id", "inner")
df_a.join(df_b, "id", "left")
df_a.join(df_b, "id", "full")
df_a.join(df_b, "id", "left_semi")  # a's rows where id exists in b
df_a.join(df_b, "id", "left_anti")  # a's rows where id NOT in b

# Multi-column join
df_a.join(df_b, ["dept", "year"], "inner")
inferSchema=True: Spark makes an extra pass over the data to guess column types. For CSV this covers the whole file by default, and samplingRatio can limit it. It is convenient for exploration, but slow on large files, and it may guess wrong types (e.g., "001" read as the integer 1 instead of a string).
Explicit schema: Define it with StructType/StructField. It is faster (no inference pass), predictable, and catches schema mismatches early. Always use it in production.
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

schema = StructType([
    StructField("emp_id", StringType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("salary", DoubleType(), nullable=True),
    StructField("dept", StringType(), nullable=True),
    StructField("hire_date", TimestampType(), nullable=True),
])

df = spark.read.schema(schema).csv("employees.csv", header=True)
df.printSchema()
RDD Lineage: Spark records every transformation that produced each RDD. If an executor dies and a partition is lost, Spark uses the lineage to recompute only that partition from the last stable data — no need to restart the entire job.
Checkpointing: Saves an RDD/DataFrame to reliable storage (HDFS/S3) and truncates the lineage. It is needed for very long lineages, such as iterative ML algorithms. Structured Streaming uses its own checkpoint location to recover offsets and state. Unlike cached data, checkpointed data survives executor loss without any recomputation.
Replication: Cached blocks can be replicated to a second executor by choosing a _2 storage level such as StorageLevel.MEMORY_AND_DISK_2. The default is no replication, for performance. Delta Lake adds further durability with its ACID transaction log.
Predicate Pushdown: Catalyst pushes filter conditions down to the data source (Parquet, Delta, JDBC), so the source can skip data that cannot match instead of handing every row to Spark. For Parquet, min/max statistics let the reader skip entire row groups. For JDBC, the filter runs as a WHERE clause in the database. For Delta, partition pruning and Z-order data skipping also eliminate whole files.
Column Pruning: Only reads columns referenced in the query. If your CSV has 100 columns but you select("name","salary"), Catalyst tells the reader to skip the other 98. For columnar formats (Parquet, ORC), this is a massive I/O reduction.
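Both happen automatically, with no code change needed. Avoid patterns that hide your logic from Catalyst: a filter on a Python UDF's output cannot be pushed down, and dropping to df.rdd ends Catalyst optimization for everything that follows.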
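Row-group skipping for predicate pushdown is easy to model: the reader checks each group's min/max before touching its rows. This plain-Python sketch uses a made-up layout of three row groups. Real Parquet footers store such statistics per column chunk. Rows in groups that survive the check are still filtered one by one.

```python
row_groups = [
    {"min": 1, "max": 100, "rows": list(range(1, 101))},
    {"min": 101, "max": 200, "rows": list(range(101, 201))},
    {"min": 201, "max": 300, "rows": list(range(201, 301))},
]


def scan_greater_than(groups, threshold):
    """Evaluate `value > threshold`, skipping groups whose max proves nothing matches."""
    scanned, result = 0, []
    for g in groups:
        if g["max"] <= threshold:
            continue  # statistics prove no row in this group can pass
        scanned += 1
        result.extend(v for v in g["rows"] if v > threshold)  # surviving group: row-level filter
    return result, scanned


matches, groups_read = scan_greater_than(row_groups, 250)
print(len(matches), groups_read)  # 50 1: two of three row groups never read
```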
Regular Python UDFs: JVM serializes each row → Python processes one row → deserializes result back → repeat for every row. Massive overhead.
pandas_udf: Uses Apache Arrow to transfer whole column batches between the JVM and Python in a columnar format, instead of pickling one row at a time. Python processes a pandas Series/DataFrame at once using vectorized operations. Result: often 10–100× faster, with access to Python's full numerical ecosystem.
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd

# Scalar pandas_udf: operates on a pandas Series per batch
@pandas_udf(DoubleType())
def apply_tax(salary: pd.Series) -> pd.Series:
    return salary * 0.7  # vectorized: no loop!

df.withColumn("take_home", apply_tax("salary")).show()

# Grouped map: applyInPandas takes a plain Python function
# (pandas DataFrame in, pandas DataFrame out), not a pandas_udf.
# Assumes salary is a double column so the output still matches df.schema.
def normalize_dept(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf["salary"] = (pdf["salary"] - pdf["salary"].mean()) / pdf["salary"].std()
    return pdf

df.groupBy("dept").applyInPandas(normalize_dept, schema=df.schema).show()
from pyspark.sql.functions import explode, col

# Schema with nested types
json_data = """{"user":"Alice","tags":["sql","spark"],"scores":{"math":95,"eng":88}}"""
df = spark.read.json(spark.sparkContext.parallelize([json_data]))
df.printSchema()
# fields: user (string), tags (array<string>), scores (struct<eng, math>)

# Explode array → one row per element
df.select("user", explode("tags").alias("tag")).show()
# +-----+-----+
# | user|  tag|
# +-----+-----+
# |Alice|  sql|
# |Alice|spark|
# +-----+-----+

# Access nested struct fields
df.select("user", col("scores.math"), col("scores.eng")).show()
from pyspark.sql.functions import sum

# Source: user | product | amount
# Goal:   user | Electronics | Clothing | Food
pivoted = (
    df.groupBy("user")
      # listing the values up front avoids a full scan to find distinct products
      .pivot("product", ["Electronics", "Clothing", "Food"])
      .agg(sum("amount"))
      .fillna(0)
)
pivoted.show()
AQE (Spark 3.0+, default ON in 3.2+) re-optimizes query plans at runtime using actual data statistics collected during execution — not static estimates.
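Three key features:
1. Coalescing post-shuffle partitions: merges many small shuffle partitions into fewer, right-sized ones
2. Switching join strategies: turns a sort-merge join into a broadcast join when runtime statistics show one side is small enough
3. Optimizing skew joins: splits oversized partitions into smaller tasks (spark.sql.adaptive.skewJoin.enabled=true)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# AQE is ON by default in Spark 3.2+ and Databricks Runtime 8.0+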
Delta Lake's transaction log records every change with a version number and timestamp. Time travel lets you query data as it existed at any past point by reading the transaction log to reconstruct the table state at that version.
Use cases: audit trails (who changed what, when), rollback after bad writes, reproduce ML experiments with same training data snapshot, compare data before/after pipeline run.
# By version number
df = spark.read.format("delta").option("versionAsOf", 5).load("/data/table")

# By timestamp
df = spark.read.format("delta").option("timestampAsOf", "2025-01-01").load("/data/table")

# SQL syntax
spark.sql("SELECT * FROM my_table VERSION AS OF 5")
spark.sql("SELECT * FROM my_table TIMESTAMP AS OF '2025-01-01'")

# Rollback (restore previous version)
from delta.tables import DeltaTable
DeltaTable.forPath(spark, "/data/table").restoreToVersion(5)
Structured Streaming models a live data stream as an unbounded table — new data arrives as new rows. You write a static batch query; Spark handles incremental execution automatically in micro-batches.
Trigger modes: ProcessingTime("10 seconds") — batch every 10s; Once() — process all available, then stop; AvailableNow() — like Once but multi-batch (Spark 3.3+); Continuous("1 second") — millisecond latency (experimental)
Output modes: Append (only new rows), Complete (all rows rewritten), Update (only changed rows)
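from pyspark.sql.functions import window, col, count, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Assumed JSON payload of each Kafka message: {"user_id": ..., "event_time": ...}
event_schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_time", TimestampType()),
])

# Read from Kafka (messages arrive as binary key/value columns)
raw = (spark.readStream.format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "events").load())

# Parse the value into typed columns
stream_df = (raw.select(from_json(col("value").cast("string"), event_schema).alias("e"))
                .select("e.*"))

# Aggregate: events per 5-minute window
query = (stream_df
         .withWatermark("event_time", "10 minutes")
         .groupBy(window(col("event_time"), "5 minutes"), "user_id")
         .agg(count("*").alias("events"))
         .writeStream.outputMode("append")
         .format("delta").option("checkpointLocation", "/checkpoints/q")
         .trigger(processingTime="10 seconds").start())
query.awaitTermination()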
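Watermarking defines how long Spark waits for late-arriving data. At the end of each micro-batch the watermark is set to MAX(event_time) - watermark_threshold. Data no older than the watermark is guaranteed to be processed; data older than it is considered too late and may be dropped (Spark does not guarantee that every such record is dropped).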
Without watermarking, stateful operations (window aggregations, stream-stream joins) must keep state for all time windows indefinitely → unbounded memory growth → OOM crash.
Trade-off: larger watermark = more state kept = more late data accepted = higher memory use. Smaller watermark = less state = lower latency = some late data dropped.
Requirement: watermark MUST be set before append output mode works with aggregations (Spark enforces this).
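The watermark rule can be modelled in a few lines of pure Python, assuming 5-minute tumbling windows, a 10-minute threshold, event times expressed as plain minutes, and a watermark that advances at the end of each micro-batch. A window is emitted (append mode) and its state freed once the watermark passes the window end, and events older than the watermark are dropped. Spark's exact boundary checks differ in detail, so treat this as a sketch.

```python
WINDOW = 5   # tumbling window length in minutes
DELAY = 10   # watermark threshold in minutes

def run_with_watermark(micro_batches):
    max_event_time = None
    watermark = float("-inf")
    state = {}                  # window start -> count: the state Spark must keep
    emitted, dropped = [], []
    for batch in micro_batches:
        for t in batch:
            if t < watermark:   # too late: older than the current watermark
                dropped.append(t)
                continue
            start = t - t % WINDOW
            state[start] = state.get(start, 0) + 1
            max_event_time = t if max_event_time is None else max(max_event_time, t)
        # The watermark advances at the end of each micro-batch
        if max_event_time is not None:
            watermark = max_event_time - DELAY
        # Append mode: emit and forget windows that can no longer change
        for start in sorted(s for s in state if s + WINDOW <= watermark):
            emitted.append((start, state.pop(start)))
    return emitted, dropped, state

batches = [[1, 3, 7], [12, 2], [26], [4, 27]]
emitted, dropped, state = run_with_watermark(batches)
print(emitted)  # [(0, 3), (5, 1), (10, 1)]
print(dropped)  # [4]
print(state)    # {25: 2}
```

The event at minute 2 arrives after the one at minute 12 but is still counted because it is within the threshold, while the event at minute 4 arrives after the watermark has moved to 16 and is discarded. Only window 25 is still held in state at the end, which is the bounded memory that watermarking buys.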
DPP (Spark 3.0+) eliminates partitions from the probe side of a join based on the result of filtering the build side. Example: if a 10TB fact table partitioned by category_id is joined with a dimension table filtered by WHERE dim.category = 'Electronics', DPP turns the category_ids that survive the filter into a runtime filter on the fact table and skips every other partition before reading it, potentially scanning <1% of the data instead of 100%.
Requires: the probe (fact) side is a table partitioned on the join key; the other side has a selective filter; spark.sql.optimizer.dynamicPartitionPruning.enabled=true (the default). AQE is not required.
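Conceptually, DPP evaluates the dimension-side filter first and uses the surviving join keys to decide which fact partitions to open. This pure-Python sketch uses a hypothetical fact table stored as a dict of partitions keyed by category_id and counts how many partitions are actually read.

```python
# Hypothetical fact table: partition value (category_id) -> rows of (product, amount)
fact_partitions = {
    1: [("tv", 500), ("laptop", 900)],
    2: [("shirt", 20)],
    3: [("apple", 1), ("bread", 3)],
    4: [("phone", 700)],
}
# Dimension table: category_id -> category
dim = {1: "Electronics", 2: "Clothing", 3: "Food", 4: "Electronics"}

def join_with_dpp(fact_partitions, dim, wanted):
    # Build side: filter the dimension table and collect the surviving join keys
    keys = {cid for cid, cat in dim.items() if cat == wanted}
    scanned = 0
    out = []
    for cid, rows in fact_partitions.items():
        if cid not in keys:
            continue            # pruned: this partition is never read
        scanned += 1
        out.extend((product, amount, dim[cid]) for product, amount in rows)
    return out, scanned

result, scanned = join_with_dpp(fact_partitions, dim, "Electronics")
print(scanned)  # 2 of 4 partitions read
print(result)   # [('tv', 500, 'Electronics'), ('laptop', 900, 'Electronics'), ('phone', 700, 'Electronics')]
```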
broadcast() hint. Force a strategy: df.hint("broadcast"), df.hint("merge"), df.hint("shuffle_hash")
Python UDF pipeline (slow): JVM executor → pickle rows (shipped in small batches) → Python worker → unpickle → call the function once per row → pickle results → JVM. Every value is converted to and from Python objects in both directions, and the function itself runs row-at-a-time in the Python interpreter.
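pandas_udf pipeline (fast): JVM executor → Apache Arrow columnar batches (up to 10,000 rows each by default, set by spark.sql.execution.arrow.maxRecordsPerBatch) → Python worker receives pandas Series → vectorized NumPy operations → Arrow batches back to the JVM. The conversion cost is paid once per batch instead of once per row, and the NumPy kernels are compiled code that can use SIMD CPU instructions.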
When to use each: Built-in functions > pandas_udf > Python UDF. Scala UDFs are even faster than pandas_udf (no JVM→Python boundary) but require Scala knowledge.
Both bypass Catalyst — the optimizer cannot see inside a UDF function body, so it can't push predicates into it or optimize across UDF boundaries.
Z-ordering is a data clustering technique that co-locates rows with similar values of the specified columns in the same Parquet files. Delta Lake records per-file min/max statistics in the transaction log (by default for the first 32 columns of the table), and Z-ordering makes those per-file ranges narrow. When you query with a filter on a Z-ordered column, Delta uses data skipping to eliminate entire files whose min/max range cannot match, without reading them.
When to use: high-cardinality columns you frequently filter on (user_id, product_id, order_id). Columns already used for partitioning don't benefit further. Works best when the table is regularly OPTIMIZEd.
Limitation: Z-ordering on multiple columns is a trade-off — effectiveness decreases as more columns are added. Typically <= 4 columns.
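To see why Z-ordering helps filters on more than one column, this pure-Python sketch lays out 64 rows of two integer columns into 8 "files" of 8 rows, keeps min/max per file the way data-skipping statistics do, and counts the files that must be read for a filter on each column. It uses a plain Morton code (bit interleaving) as the Z-order curve; Delta's own implementation differs in detail, and the table and file sizes are arbitrary assumptions.

```python
def interleave_bits(x: int, y: int, bits: int = 3) -> int:
    """Morton (Z-order) code: x bits go to even positions, y bits to odd positions."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z

def write_files(rows, sort_key, rows_per_file):
    """Sort rows, cut them into fixed-size 'files', and keep per-file min/max stats."""
    ordered = sorted(rows, key=sort_key)
    files = []
    for start in range(0, len(ordered), rows_per_file):
        chunk = ordered[start:start + rows_per_file]
        files.append({
            "x": (min(r[0] for r in chunk), max(r[0] for r in chunk)),
            "y": (min(r[1] for r in chunk), max(r[1] for r in chunk)),
        })
    return files

def files_to_read(files, column, value):
    """Data skipping: count files whose [min, max] range could contain value."""
    return sum(1 for f in files if f[column][0] <= value <= f[column][1])

rows = [(x, y) for x in range(8) for y in range(8)]  # 64 rows, columns x and y
by_y = write_files(rows, sort_key=lambda r: (r[1], r[0]), rows_per_file=8)
zorder = write_files(rows, sort_key=lambda r: interleave_bits(r[0], r[1]), rows_per_file=8)

for name, files in [("sorted by y", by_y), ("z-ordered", zorder)]:
    print(name, "| x == 3:", files_to_read(files, "x", 3),
          "| y == 3:", files_to_read(files, "y", 3))
# sorted by y | x == 3: 8 | y == 3: 1
# z-ordered | x == 3: 4 | y == 3: 2
```

Sorting by one column makes filters on that column very cheap (1 file) and filters on the other column useless (all 8 files). The Z-ordered layout reads 4 and 2 files: neither filter is perfect, but both skip data, which is also why each extra Z-order column dilutes the benefit for the others.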
```python
from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, "/data/events")

# Compact small files + Z-order by user_id and event_date
dt.optimize().executeZOrderBy("user_id", "event_date")

# SQL equivalent
spark.sql("OPTIMIZE events ZORDER BY (user_id, event_date)")
```
Exactly-once = each input message produces exactly one output effect, even if the system crashes and restarts. Structured Streaming achieves this through:
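1. Checkpointing: before processing a micro-batch, Spark writes its offset range (e.g., Kafka offsets) to a write-ahead log in the checkpoint directory on durable storage, and after the sink write succeeds it records the batch as committed. On restart, Spark resumes after the last committed batch, so at most the one in-flight batch is re-processed.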
2. Idempotent sinks: if a micro-batch is re-run due to failure, the sink must be safe to write twice. Delta Lake (ACID) and HDFS/S3 file sinks support this natively.
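3. End-to-end exactly-once: a replayable source (Kafka, re-read by offset; Spark tracks the offsets in its own checkpoint rather than in Kafka consumer-group commits) + checkpointing + an idempotent sink such as Delta Lake = full exactly-once guarantee.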
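The interplay of the offset log, the commit log, and an idempotent sink can be simulated in pure Python. The sketch below (all names hypothetical) crashes after the sink write but before the commit, restarts from the same checkpoint, and compares a sink that remembers batch ids with one that blindly appends. Skipping a batch id that was already written is roughly how the Delta sink avoids duplicates, but this is a model, not Spark's code.

```python
class IdempotentSink:
    """Sink that remembers which batch ids it has already written."""
    def __init__(self):
        self.rows = []
        self.written_batches = set()

    def write(self, batch_id, rows):
        if batch_id in self.written_batches:
            return                  # replayed batch: ignore it, no duplicates
        self.rows.extend(rows)
        self.written_batches.add(batch_id)

class AppendOnlySink(IdempotentSink):
    def write(self, batch_id, rows):
        self.rows.extend(rows)      # no batch-id check: a replay duplicates rows

def run(source, sink, checkpoint, batch_size=2, crash_after_write=None):
    """Process source in micro-batches using an offset log and a commit log."""
    while True:
        batch_id = len(checkpoint["commits"])
        if batch_id < len(checkpoint["offsets"]):
            start, end = checkpoint["offsets"][batch_id]        # replay a planned batch
        else:
            start = checkpoint["offsets"][-1][1] if checkpoint["offsets"] else 0
            end = min(start + batch_size, len(source))
            if start == end:
                return                                          # source exhausted
            checkpoint["offsets"].append((start, end))          # 1. log offsets first
        sink.write(batch_id, source[start:end])                 # 2. write to the sink
        if batch_id == crash_after_write:
            raise RuntimeError("simulated crash before commit")
        checkpoint["commits"].append(batch_id)                  # 3. mark batch committed

def crash_and_restart(sink):
    source = ["m1", "m2", "m3", "m4", "m5"]
    checkpoint = {"offsets": [], "commits": []}
    try:
        run(source, sink, checkpoint, crash_after_write=1)
    except RuntimeError:
        pass                                # process dies after writing batch 1
    run(source, sink, checkpoint)           # restart from the same checkpoint
    return sink.rows

print(crash_and_restart(IdempotentSink()))  # ['m1', 'm2', 'm3', 'm4', 'm5']
print(crash_and_restart(AppendOnlySink()))  # ['m1', 'm2', 'm3', 'm4', 'm3', 'm4', 'm5']
```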
```python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, desc

# Keep the most recent record per employee
w = Window.partitionBy("employee_id").orderBy(desc("updated_at"))

deduped = df.withColumn("rn", row_number().over(w)) \
    .filter(col("rn") == 1) \
    .drop("rn")
deduped.show()

# For simple dedup (no timestamp preference; keeps an arbitrary row per key):
df.dropDuplicates(["employee_id"])
```
```python
from pyspark.sql.functions import broadcast

# Large fact table: millions of rows
fact_df = spark.read.parquet("s3://data/transactions/")
# Small dimension table: <10MB
dim_df = spark.read.parquet("s3://data/departments/")

result = fact_df.join(broadcast(dim_df), "dept_id", "inner")

# Verify BroadcastHashJoin appears in physical plan
result.explain()  # Physical Plan should show: BroadcastHashJoin, BuildRight
result.show()
```
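What a broadcast hash join does on each executor can be sketched in pure Python: build a hash table from the small side once, then stream the large side past it with one lookup per row, so the large side is never shuffled. The row dicts and the function name are illustrative, not Spark internals.

```python
def broadcast_hash_join(fact_rows, dim_rows, key):
    """Inner join: hash the small side once, then stream the large side past it."""
    # Build phase: in Spark this hash table is shipped to every executor
    table = {}
    for d in dim_rows:
        table.setdefault(d[key], []).append(d)
    # Probe phase: one lookup per fact row, so the large side never moves
    for f in fact_rows:
        for d in table.get(f[key], []):
            yield {**f, **d}

facts = [{"txn": 1, "dept_id": 10, "amt": 5.0},
         {"txn": 2, "dept_id": 20, "amt": 7.5},
         {"txn": 3, "dept_id": 99, "amt": 1.0}]   # no matching department: dropped by inner join
depts = [{"dept_id": 10, "dept": "Sales"}, {"dept_id": 20, "dept": "IT"}]

for row in broadcast_hash_join(facts, depts, "dept_id"):
    print(row)
```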
```python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, desc

N = 3
w = Window.partitionBy("dept").orderBy(desc("salary"))

top_n = df.withColumn("rn", row_number().over(w)) \
    .filter(col("rn") <= N) \
    .drop("rn")
top_n.orderBy("dept", desc("salary")).show()
# Returns top 3 earners per department
```
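A common interview follow-up is what happens with ties. row_number() gives tied rows distinct numbers in an arbitrary order, so a top-N filter can silently drop someone, while rank() and dense_rank() (both in pyspark.sql.functions) keep ties together. The pure-Python sketch below, with made-up salaries, reproduces the three numbering rules for a descending order.

```python
def rank_within_groups(rows, group_key, order_key):
    """Assign row_number, rank and dense_rank per group, ordered by order_key descending."""
    groups = {}
    for r in rows:
        groups.setdefault(r[group_key], []).append(r)
    out = []
    for members in groups.values():
        members = sorted(members, key=lambda r: r[order_key], reverse=True)
        rank = dense = 0
        prev = object()  # sentinel that never equals a real value
        for i, r in enumerate(members, start=1):
            if r[order_key] != prev:
                rank, dense, prev = i, dense + 1, r[order_key]
            out.append({**r, "row_number": i, "rank": rank, "dense_rank": dense})
    return out

staff = [
    {"dept": "IT", "name": "Ann", "salary": 120},
    {"dept": "IT", "name": "Bob", "salary": 110},
    {"dept": "IT", "name": "Cid", "salary": 110},   # tie with Bob
    {"dept": "IT", "name": "Dee", "salary": 100},
]
ranked = rank_within_groups(staff, "dept", "salary")
print([r["name"] for r in ranked if r["row_number"] <= 2])  # ['Ann', 'Bob']
print([r["name"] for r in ranked if r["rank"] <= 2])        # ['Ann', 'Bob', 'Cid']
```

With N = 2, row_number keeps Ann and Bob and drops Cid despite the identical salary; rank and dense_rank keep all three. rank() then jumps to 4 for Dee, while dense_rank() gives her 3.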
```python
from pyspark.sql.functions import when, col

df.withColumn("grade",
              when(col("score") >= 90, "A")
              .when(col("score") >= 80, "B")
              .when(col("score") >= 70, "C")
              .otherwise("F")) \
  .withColumn("salary_band",
              when(col("salary") >= 100000, "High")
              .when(col("salary") >= 50000, "Mid")
              .otherwise("Low")) \
  .show()
```
```python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

schema = StructType([
    StructField("emp_id", StringType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("salary", DoubleType(), nullable=True),
    StructField("dept", StringType(), nullable=True),
    StructField("hire_date", TimestampType(), nullable=True),
])

# Explicit schema: no schema-inference pass over the JSON files.
# Note: file sources read every field as nullable, so nullable=False is not enforced here.
df = spark.read.schema(schema).json("s3://data/employees.json")
df.printSchema()
df.show(5)
```
```python
from pyspark.sql.functions import explode, explode_outer, col

# JSON: {"user":"Alice", "orders":[{"id":1,"amt":50},{"id":2,"amt":80}], "addr":{"city":"NY"}}
df = spark.read.json("orders.json")

# Explode array: one row per order
df_exploded = df.select(
    col("user"),
    explode("orders").alias("order"),  # drops rows whose array is empty or null; explode_outer keeps them
    col("addr.city").alias("city")     # dot notation for struct fields
)

# Flatten nested struct fields
df_flat = df_exploded.select(
    "user", "city",
    col("order.id").alias("order_id"),
    col("order.amt").alias("amount")
)
df_flat.show()
```
```python
from pyspark.sql.functions import sum as spark_sum  # avoid shadowing Python's built-in sum

# Input:  user | product_category | amount
# Output: user | Electronics | Clothing | Food
pivoted = df.groupBy("user") \
    .pivot("product_category", ["Electronics", "Clothing", "Food"]) \
    .agg(spark_sum("amount")) \
    .fillna(0)  # replace nulls (categories the user never bought)
pivoted.show()

# Specifying the values avoids an extra job that scans the data for distinct categories
```
```python
import pyspark.sql.functions as F

SALT_BUCKETS = 20  # spread the hot key across up to 20 groups

# Step 1: add a random salt (0..SALT_BUCKETS-1) to every row
df_salted = df.withColumn("salt", (F.rand() * SALT_BUCKETS).cast("int"))

# Step 2: partial aggregation on (key, salt), so the hot key's rows are split across tasks.
# Grouping by both columns is equivalent to grouping by a concatenated "customer_id_salt"
# key, without having to parse the key apart again afterwards.
partial = df_salted.groupBy("customer_id", "salt", "product") \
    .agg(F.sum("amount").alias("partial_sum"))

# Step 3: drop the salt and combine the partial sums per key
result = partial \
    .groupBy("customer_id", "product") \
    .agg(F.sum("partial_sum").alias("total_amount"))
result.show()
```
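The two-stage aggregation can be checked in pure Python: stage 1 sums per (key, salt), stage 2 sums the partial results per key. The data below is made up (one hot customer with 10,000 rows), and a seeded random.Random stands in for F.rand().

```python
import random
from collections import Counter, defaultdict

SALT_BUCKETS = 20

def two_stage_sum(rows, buckets=SALT_BUCKETS, seed=0):
    """rows: (customer_id, amount). Returns per-key totals and stage-1 group sizes."""
    rng = random.Random(seed)
    # Stage 1: partial sums per (key, salt); the hot key is spread over many groups
    partial = defaultdict(float)
    group_sizes = Counter()
    for cust, amount in rows:
        salt = rng.randrange(buckets)
        partial[(cust, salt)] += amount
        group_sizes[(cust, salt)] += 1
    # Stage 2: drop the salt and combine at most `buckets` partial sums per key
    totals = defaultdict(float)
    for (cust, _salt), s in partial.items():
        totals[cust] += s
    return dict(totals), group_sizes

rows = [("hot", 1.0)] * 10_000 + [("c1", 2.0), ("c2", 3.0)]
totals, sizes = two_stage_sum(rows)
print(totals)               # {'hot': 10000.0, 'c1': 2.0, 'c2': 3.0}
print(max(sizes.values()))  # largest stage-1 group: a few hundred rows, not 10,000
```

The totals match a plain group-by, while the largest stage-1 group holds only roughly 10,000 / 20 rows. In Spark that is the difference between one straggler task processing the whole hot key and twenty tasks sharing it.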
```python
from pyspark.sql.functions import col, count, when

def validate_df(df, expected_schema=None, pk_cols=None):
    results = {}
    total = df.count()
    results["row_count"] = total

    # Null percentage per column (skipped for an empty DataFrame to avoid dividing by zero)
    if total > 0:
        null_pct = df.select([
            (count(when(col(c).isNull(), c)) / total * 100).alias(c)
            for c in df.columns
        ]).collect()[0].asDict()
        results["null_pct"] = null_pct

    # Duplicate check: rows beyond the first one per primary key
    if pk_cols:
        dup_count = total - df.dropDuplicates(pk_cols).count()
        results["duplicates"] = dup_count

    # Schema validation
    if expected_schema:
        results["schema_match"] = df.schema == expected_schema

    return results

report = validate_df(df, pk_cols=["emp_id"])
print(report)
```
```python
# Problem: job creates 1000 tiny files (small file problem)
print(df.rdd.getNumPartitions())  # 1000

# Solution: coalesce before write (merges partitions without a full shuffle)
df.coalesce(10).write.mode("overwrite").parquet("output/")
# Creates 10 output files instead of 1000

# For a single-file output (use with care on large data: one task writes everything).
# Note: "report.csv" becomes a directory containing a single part-*.csv file.
df.coalesce(1).write.mode("overwrite").csv("report.csv", header=True)

# For Delta tables, OPTIMIZE compacts existing small files in place
spark.sql("OPTIMIZE my_delta_table")
```
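A common way to choose the argument to coalesce() or repartition() is to divide the expected output size by a target file size. The helper below is hypothetical, and the 128 MB target is an assumed rule of thumb rather than a Spark default; estimate total_bytes from a previous run or from the input size, since compression makes it approximate.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3

def num_output_files(total_bytes, target_file_bytes=128 * MB, max_files=None):
    """Pick n for df.coalesce(n) / df.repartition(n) so files land near the target size."""
    n = max(1, math.ceil(total_bytes / target_file_bytes))
    return min(n, max_files) if max_files else n

print(num_output_files(5 * GB))   # 40 files of ~128 MB
print(num_output_files(10 * MB))  # 1 file: the output is tiny
```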