PySpark
What is Apache Spark?
Apache Spark is an open-source distributed data processing engine designed for big data analytics.
It allows you to process large datasets across multiple machines (clusters) quickly and efficiently.
Think of it as a supercharged, scalable version of Python’s Pandas or SQL that works on massive data distributed across many servers.
Spark itself is written in Scala and runs on the JVM.
What is PySpark?
PySpark is the Python API for Apache Spark. It allows you to write Spark applications in Python.
In other words:
- Apache Spark = the big data processing engine (written in Scala, runs on the JVM).
- PySpark = a way to use Spark with Python.
Key Features of Spark
Distributed Computing:
- Data is split into chunks and processed in parallel across a cluster.
- Can handle petabytes of data.
Fast Processing:
- Uses in-memory computation, which is faster than disk-based systems like Hadoop MapReduce.
- Optimized DAG (Directed Acyclic Graph) execution of tasks.
Multi-Language Support:
- Scala (native)
- Java
- Python (PySpark)
- R
- SQL
Unified Engine:
Spark can handle multiple workloads:
- Batch processing → big files (CSV, Parquet, JSON)
- Stream processing → real-time data (Kafka, sockets)
- Machine Learning → MLlib
- Graph Processing → GraphX
Fault-Tolerant:
- Uses RDDs (Resilient Distributed Datasets) that can recover lost data automatically.
=============================================
🔹 1. Monolithic System (Single-node processing / vertical scaling)
A monolithic system means everything runs on one machine (one CPU, one memory space).
All components (UI, database, API, etc.) are tightly connected and run as one application.
✅ Characteristics
- Runs on a single server (everything depends on that one machine)
- Tight coupling (all components together)
- Processes data sequentially or with limited parallelism
- Limited scalability: RAM can be increased, but only up to a hardware limit
📌 Example
- Traditional Python script using Pandas
- Single-node database queries
❌ Problems
- Cannot handle big data
- Memory becomes the bottleneck
- Slow for large-scale processing
🔹 2. Distributed Processing (Spark)
Apache Spark is a distributed system, meaning it runs across multiple machines (a cluster).
The application is split into many small components, each running independently on a different machine/server.
Distributed Computing is a computing model where large computational tasks are divided and executed across multiple machines (nodes) that work in parallel. Think of it as breaking a huge job into smaller parts and assigning each part to a different worker. Its key features include:
- Speed: Multiple nodes work simultaneously.
- Scalability: Add more nodes to handle more data.
- Fault tolerance: If one node fails, others continue.
✅ Characteristics
- Data is split across multiple nodes
- Parallel processing
- Fault-tolerant (if one node fails, others continue)
- Scales horizontally (add more machines)
- High availability (no single point of failure)
📌 Core Idea
Instead of:
👉 “1 machine doing all work”
Spark does:
👉 “Many machines doing parts of work together”
=============================================
Why Spark Replaced Hadoop MapReduce
What is MapReduce?
MapReduce is a distributed data processing model introduced by
Google, and widely used in Apache Hadoop.
👉 It processes large datasets in parallel across clusters.
🔹1. Core Idea
MapReduce works in 2 main phases:
1️⃣ Map Phase
- Takes input data
- Breaks it into smaller chunks
- Processes each chunk
- Outputs key-value pairs
Example:
Input:
Hello World Hello
Map Output:
(Hello, 1)
(World, 1)
(Hello, 1)
2️⃣ Reduce Phase
- Groups same keys
- Aggregates values
Output:
(Hello, 2)
(World, 1)
🔹 Simple Flow
Input Data → Map → Shuffle & Sort → Reduce → Final Output
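The flow above can be sketched in plain Python. This is a toy, single-machine simulation of the MapReduce model (not Hadoop itself), using the word-count example from the section:

```python
from collections import defaultdict

def map_phase(lines):
    # Map: emit a (word, 1) key-value pair for every word in the input
    for line in lines:
        for word in line.split():
            yield (word, 1)

def shuffle_and_sort(pairs):
    # Shuffle & Sort: group all values that belong to the same key
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    # Reduce: aggregate the grouped values per key
    return {key: sum(values) for key, values in groups.items()}

result = reduce_phase(shuffle_and_sort(map_phase(["Hello World Hello"])))
print(result)  # {'Hello': 2, 'World': 1}
```

On a real cluster, many mappers and reducers run these phases in parallel on different chunks of the data; the logic per chunk is the same.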
🏗️ 2. Internal Components
🧠 Master Node
- Job scheduling
- Resource allocation
In modern Hadoop:
- YARN handles resources
⚙️ Worker Nodes
- Execute map & reduce tasks
- Store data blocks
🧠 Important Step: Shuffle & Sort
- Groups same keys together
- Happens between Map and Reduce
- Most expensive step ⚠️
🔥 3. Key Concepts
✅ Data Locality
👉 “Move compute to data, not data to compute”
- The mapper runs where the data is stored
- Reduces network cost
✅ Fault Tolerance
If a node fails:
- The task is re-executed on another node
- Recovery is based on replication in HDFS
✅ Parallelism
- Multiple mappers + reducers run simultaneously
- Massive scalability
✅ Partitioning
- Decides which reducer gets which key
Default:
hash(key) % number_of_reducers
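The default partitioning rule can be illustrated with a small sketch. Python's built-in `hash` of strings is randomized per process, so a deterministic `zlib.crc32` stands in here for the framework's hash function (Hadoop and Spark each use their own):

```python
import zlib

def reducer_for(key: str, num_reducers: int) -> int:
    # Default-style partitioning: hash(key) % number_of_reducers
    # (crc32 is a stand-in for the framework's hash function)
    return zlib.crc32(key.encode()) % num_reducers

keys = ["Hello", "World", "Hello", "Spark"]
assignments = {k: reducer_for(k, 3) for k in keys}
print(assignments)

# The property that matters: the same key always lands on the same reducer,
# so all counts for one word end up on one machine.
assert reducer_for("Hello", 3) == reducer_for("Hello", 3)
```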
⚡ 4. Types of MapReduce Jobs
1. Map-only Job
- No reducer
- Used for filtering, transformations
2. Map + Reduce Job
- Full processing (aggregation, joins)
🚫 5. Limitations (Why Spark Replaced It)
Compared to Apache Spark:
❌ Disk I/O heavy
- Writes to disk after every step
❌ High latency
- Not real-time
❌ Not iterative-friendly
- Poor for ML, graph processing
🔹 Example Use Cases
- Word count (most famous)
- Log analysis
- ETL jobs
- Indexing (search engines)
🔹 MapReduce vs Spark
| Feature | MapReduce (Hadoop) | Spark |
|---|---|---|
| Processing | Disk-based | In-memory |
| Speed | Slow | Fast ⚡ |
| Iterations | Poor | Excellent |
| Complexity | More code | Easier APIs |
| Real-time | Not suitable | Supports real-time |
👉 That’s why modern systems prefer Apache Spark over MapReduce.
🔹 Limitations of MapReduce
- Writes intermediate data to disk (slow ❌)
- Not good for iterative algorithms (ML, graph)
- Complex coding (Java-heavy)
=============================================
Vertical vs Horizontal Scaling
🔥 1. Core Idea (Simple)
- Vertical Scaling → Increase power of one machine
- Horizontal Scaling → Increase number of machines
🧠 2. Vertical Scaling (Scale Up ⬆️)
👉 Upgrade existing system
📌 Example:
- Add more RAM
- Add more CPU
- Upgrade disk
🖥️ Visualization
1 Server → Bigger Server
✅ Advantages
- Simple to implement
- No code changes required
❌ Disadvantages
- Limited (hardware limit)
- Expensive
- Single point of failure
🧠 3. Horizontal Scaling (Scale Out ➡️)
👉 Add more machines to cluster
🖥️ Visualization
1 Server → 5 Servers → 100 Servers
✅ Advantages
- Highly scalable
- Fault tolerant
- Cost-effective
❌ Disadvantages
- Complex setup
- Requires distributed system design
📊 4. Comparison Table
| Feature | Vertical Scaling ⬆️ | Horizontal Scaling ➡️ |
|---|---|---|
| Approach | Increase machine power | Increase number of machines |
| Scalability | Limited | Unlimited |
| Cost | Expensive | Cost-effective |
| Fault Tolerance | ❌ Low | ✅ High |
| Complexity | Simple | Complex |
| Example | Upgrade server RAM | Add more nodes |
👉 Spark uses Horizontal Scaling
- Data distributed across nodes
- Tasks run in parallel
- Cluster grows as needed
=============================================
Spark is a distributed computing system, meaning it runs across a cluster of machines.
Its architecture is designed for speed, fault tolerance, and scalability.
Apache Spark architecture is a master–worker system where a central driver coordinates multiple worker nodes to process data in parallel.
🧠 1. High-Level Architecture
Driver Program
|
---------------------
| | |
Executor Executor Executor
(Node1) (Node2) (Node3)
🔹 2. Core Components
🧭 1. Driver Program (Master Brain)
👉 The Driver is the main controller / brain of a Spark application.
Responsibilities:
- Creates the SparkSession
- Reads your code
- Converts code → DAG (Directed Acyclic Graph)
- Requests resources from the cluster manager
- Schedules tasks across executors
- Tracks job progress and task status
- Communicates with executors
- Handles failures
👉 Think:
“Driver = brain of Spark”: it decides what is to be done and who will do it.
⚙️ 2. Worker Node (Executor + Tasks)
👉 Every worker node runs one or more executor processes.
Think of an executor as an employee; tasks are the to-do items assigned by the driver.
Executors do the actual work:
- Execute tasks
- Store data (in-memory or on disk)
- Store shuffle data
- Return results to the driver
👉 Each executor has:
- CPU cores
- Memory
📦 3. Cluster Manager
👉 Manages resources across the cluster
👉 Allocates CPU and memory to Spark jobs.
Types:
- YARN
- Kubernetes
- Standalone manager (Spark’s own)
Role:
- Starts executors on worker machines
- Allocates executors
- Manages resources
- Replaces crashed executors
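A typical job submission ties these pieces together. A sketch using standard `spark-submit` options (the executor counts, memory sizes, and script name are illustrative placeholders, not recommendations):

```shell
# Submit a PySpark app via the YARN cluster manager (all values illustrative).
# The cluster manager starts executors on worker nodes with the requested
# CPU and memory; the driver then schedules tasks onto them.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 4 \
  --executor-cores 2 \
  --executor-memory 4g \
  my_job.py
```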
🔄 3. Execution Flow (Step-by-Step)
How does a Spark job run internally?
↓
Lazy Evaluation
↓
DAG Creation
↓
Optimization (Catalyst)
↓
Action → Job
↓
Stages (split by shuffle)
↓
Tasks (per partition)
↓
Executors run tasks
↓
Result to Driver
Step 1: Code Submission
Spark Application: You write your program (Python, Scala, Java, R).
df.filter().groupBy().count()
👉 You define transformations + action
Step 2: DAG Creation / Driver Program Starts / Lazy evaluation
2.1. Lazy Evaluation
- Transformations are not executed immediately
- Spark builds a logical plan (DAG)
2.2. Driver converts logic → DAG
👉 DAG = sequence of transformations (logical plan)
👉 Driver = brain
2.3. Driver Program: SparkSession/SparkContext runs on the driver. It:
- Connects to the cluster manager (like YARN)
- Divides work into tasks
- Builds a DAG (Directed Acyclic Graph) of transformations
- Optimizes the DAG (removes unnecessary steps)
- Uses internal optimizations (Catalyst for DataFrames)
- Sends tasks to executors
2.4. Catalyst Optimizer (for DataFrames)
👉 Optimizes the plan:
- Predicate pushdown
- Column pruning
- Join optimization
Step 3: Job and Stage Creation
3.1. Job Creation from an action
👉 An action triggers execution, e.g. .show()
✅ Creates 1 Job
3.2. Stage Division (DAG → Stages)
- Split DAG into stages
- Whenever a shuffle happens → Spark creates a new stage
- Based on shuffle boundaries
- A stage is a set of transformations that can run without data movement
- Shuffle = data moves across partitions/nodes
Example:
- filter → narrow transformation → no shuffle → same stage
- groupBy → wide transformation → shuffle → new stage
Step 4: Stage → Tasks
4.1. Task Creation
👉 Each stage is divided into tasks
- 1 partition = 1 task
4 partitions → 4 tasks
4.2. Task Scheduling
👉 Components:
- Driver → coordinates execution and asks the cluster manager for executors
- Cluster manager (YARN / Standalone) →
1. Starts executors on worker nodes
2. Allocates executors, CPU, and memory
- Executors →
1. Run tasks in parallel
2. Store data in memory
👉 Parallel execution starts here ⚡
Step 5: Task Execution (on Executors)
- Tasks are sent to executors
Executors:
- Process data
- Perform transformations
Shuffle (if needed)
- Happens in wide transformations (groupBy, join)
- Data moves across nodes
👉 Most expensive step ⚠️
Execution happens only when an action is called:
- count()
- collect()
- show()
👉 Lazy evaluation ends here
Step 6: Result Return
Executors return results to the driver; the driver aggregates them and produces the final output.
- Executors → Driver
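The flow above implies a simple counting rule used later in this document: stages = shuffle operations + 1. A toy sketch of that rule (my own illustration of the bookkeeping, not a Spark API):

```python
# Toy model: count the stages of one job by splitting its plan at shuffle
# (wide-transformation) boundaries, the way Spark's DAG scheduler does.
WIDE = {"groupBy", "join", "reduceByKey"}   # wide transformations: shuffle
NARROW = {"filter", "map", "select"}        # narrow transformations: no shuffle

def count_stages(plan):
    # One initial stage, plus one new stage after every shuffle boundary
    return 1 + sum(1 for op in plan if op in WIDE)

plan = ["filter", "groupBy"]      # e.g. df.filter(...).groupBy(...).count()
print(count_stages(plan))         # 2 stages: filter | shuffle | groupBy
```

This matches the worked examples later: filter + join + groupBy gives 1 + 2 = 3 stages.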
🔥 4. Important Concepts
✅ DAG (Directed Acyclic Graph)
- Logical execution plan
- Optimizes execution
✅ Lazy Evaluation
👉 Spark does NOT execute immediately
- Builds plan first
- Executes only on action
Example:
df.filter() # no execution
df.show() # execution starts
✅ Transformations vs Actions
Transformations (lazy)
- map, filter, groupBy
Actions (trigger execution)
- show(), collect(), count()
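The lazy/eager split behaves much like Python generator expressions, which also defer work until a result is demanded. This is an analogy for the concept, not Spark internals:

```python
calls = []

def expensive(x):
    calls.append(x)          # record when the work actually happens
    return x * 2

nums = [1, 2, 3]
lazy = (expensive(x) for x in nums)   # like a transformation: nothing runs yet
assert calls == []                     # no work has been done so far

result = list(lazy)                    # like an action: forces execution
assert result == [2, 4, 6]
assert calls == [1, 2, 3]              # the work happened only now
```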
⚡ 5. Types of Transformations
🔹 Narrow Transformation
- No shuffle
- Fast
Example:
- map()
- filter()
🔹 Wide Transformation
- Requires shuffle
- Expensive
Example:
- groupBy()
- join()
🔀 6. Shuffle (Critical Concept)
👉 Data moves across nodes
- Happens in wide transformations
- Costly (network + disk I/O)
💾 7. Storage (Memory Management)
Spark uses:
- In-memory storage (fast)
- Disk (fallback)
👉 Supports:
- Cache / Persist
⚡ 8. Fault Tolerance
Spark uses lineage
👉 If data lost:
- Recompute from original transformations
(No need for replication like Hadoop)
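Lineage-based recovery amounts to "store the recipe, not the result." A toy illustration of the idea (not Spark's implementation):

```python
# Toy lineage: a partition remembers its source and the ordered chain of
# transformations used to build it, so it can be recomputed after a loss
# instead of being restored from a replica.
source = [1, 2, 3, 4]
lineage = [lambda x: x * 2, lambda x: x + 1]   # map ops, applied in order

def compute(source, lineage):
    data = list(source)
    for fn in lineage:
        data = [fn(x) for x in data]
    return data

partition = compute(source, lineage)
partition = None                       # simulate losing the computed partition
recovered = compute(source, lineage)   # recompute from lineage, no replica needed
print(recovered)  # [3, 5, 7, 9]
```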
=============================================
👉 Why is Apache Spark fast?
🔥 1. In-Memory Processing (Biggest Reason 🚀)
👉 Spark processes data in RAM instead of disk
Disk (Hadoop) ❌ → Slow
Memory (Spark) ✅ → Fast
✅ Benefit:
- Avoids repeated disk I/O
- Much faster computation
🔥 2. Lazy Evaluation + DAG Optimization
👉 Spark doesn’t execute immediately
👉 Builds a DAG (execution plan)
- Combines operations
- Removes unnecessary steps
✅ Example:
filter → select → groupBy
👉 Executed together (optimized)
🔥 3. Catalyst Optimizer (Smart Query Engine)
👉 Automatically optimizes queries:
- Predicate pushdown
- Column pruning
- Join reordering
✅ Result:
- Less data processed
- Faster execution
🔥 4. Tungsten Execution Engine
👉 Low-level optimization:
- Efficient memory management
- Binary processing (less serialization)
- Code generation
✅ Result:
- CPU-efficient execution
🔥 5. Parallel Processing
👉 Data split into partitions
100 partitions → 100 tasks → parallel execution
✅ Result:
- Massive speed-up
🔥 6. Reduced Disk I/O
👉 Compared to MapReduce:
- MapReduce → writes to disk after each step ❌
- Spark → keeps data in memory ✅
🔥 7. Efficient Shuffle Handling
👉 Optimized data movement:
- Combines data before shuffle
- Uses compression
🔥 8. Caching / Persistence
df.cache()
👉 Reuses data instead of recomputing
📊 Summary Table
| Reason | Impact |
|---|---|
| In-memory processing | Faster than disk |
| Lazy evaluation | Optimized execution |
| Catalyst optimizer | Smart query planning |
| Tungsten engine | CPU + memory efficiency |
| Parallel processing | Faster computation |
| Caching | Avoid recomputation |
=============================================
🔥 1. Core Idea (Simple)
- Driver → Brain of Spark application
- ApplicationMaster (AM) → Resource manager coordinator (in YARN)
👉 Driver = Executes logic
ApplicationMaster = Manages resources
👉 In YARN, the ApplicationsManager (a part of the ResourceManager) accepts, tracks, and manages applications (jobs) submitted to the cluster; a separate per-application ApplicationMaster then negotiates resources for that job.
📊 2. Comparison Table
| Feature | Driver Program | ApplicationMaster (YARN) |
|---|---|---|
| Role | Main controller of Spark app | Manages resources for the app |
| Responsibility | DAG creation, scheduling tasks | Requests containers from YARN |
| Runs user code? | ✅ Yes | ❌ No |
| Part of Spark? | ✅ Yes | ❌ No (part of YARN) |
| Location | Client / Cluster node | Always inside cluster |
| Controls executors? | ✅ Yes | Indirectly via YARN |
🧠 3. Driver Program (Spark)
👉 Core responsibilities:
- Creates DAG
- Splits into stages & tasks
- Sends tasks to executors
- Collects results
👉 Think:
“Driver = brain + scheduler”
⚙️ 4. ApplicationMaster (YARN)
👉 Exists only when using YARN
Responsibilities:
- Negotiates resources with YARN
- Requests containers
- Launches executors
- Monitors application
👉 Think:
“AM = resource manager for your app”
🔄 5. How They Work Together
Flow:
User submits Spark job
↓
ApplicationMaster starts (via YARN)
↓
AM allocates resources (containers)
↓
Driver runs (inside AM OR separately)
↓
Driver requests executors
↓
Executors launched on worker nodes
↓
Driver sends tasks → Executors
📍 6. Deployment Modes (VERY IMPORTANT)
🖥️ Client Mode
- Driver runs on client machine
- AM only manages resources
👉 If client dies → job fails ❌
☁️ Cluster Mode
- Driver runs inside ApplicationMaster
👉 More reliable ✅
=============================================
Py4J
🧠 Why Py4J is Needed in Spark?
👉 Because Apache Spark is written in Scala/Java, not Python.
So when you write:
df.show()
👉 Python cannot directly execute Spark code ❌
👉 It uses Py4J to communicate with the JVM ✅
Is PySpark slower than Scala Spark?
👉 Yes (slightly), due to Py4J overhead
📦 Example
df = spark.read.csv("file.csv")
df.show()
👉 Internally:
- Python → Py4J → JVM → Spark execution → Result → back to Python
🔥 Key Responsibilities of Py4J
- Enables Python ↔ Java communication
- Converts Python calls → JVM calls
- Transfers data between Python & Spark engine
👉 SparkContext vs SparkSession
🔥 1. Core Difference (Simple)
- SparkContext (SC) → Old entry point (low-level)
- SparkSession → New unified entry point (high-level)
👉SparkSession = wrapper over SparkContext + SQLContext + HiveContext
🧠 Why Do We Need It?
⚡ Important Points
✅ SparkSession creates a SparkContext internally
- You don’t need to create one manually
📊 2. Comparison Table
| Feature | SparkContext | SparkSession |
|---|---|---|
| Introduced in | Spark 1.x | Spark 2.x+ |
| Level | Low-level API | High-level unified API |
| Purpose | Connect to cluster | Entry point for all Spark features |
| Supports SQL | ❌ No | ✅ Yes |
| Supports DataFrame | ❌ No (needs SQLContext) | ✅ Yes |
| Ease of use | Complex | Simple |
| Usage today | Rare | Standard |
🧠 3. SparkContext (SC)
👉 Old way to start Spark
Responsibilities:
- Connect to cluster
- Create RDDs
- Manage execution
Example:
from pyspark import SparkContext
sc = SparkContext("local", "App")
rdd = sc.parallelize([1,2,3])
👉 Limitation:
- No direct SQL/DataFrame support
🚀 4. SparkSession (Modern Way)
👉 Introduced to simplify everything
Responsibilities:
Entry point for:
- RDD
- DataFrame
- SQL
- Hive
Example:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("App") \
.getOrCreate()
df = spark.read.csv("file.csv")
🔗 5. Relationship
Inside SparkSession:
SparkSession
├── SparkContext
├── SQLContext
└── HiveContext
👉 You can still access:
spark.sparkContext
🎯 6. When to Use What?
Use SparkSession ✅
- Always (modern Spark)
- DataFrame / SQL work
- Production pipelines
Use SparkContext ❗
- Rare cases
- Low-level RDD operations
- Legacy code
👉“SparkContext is the original low-level entry point for RDD-based processing, while SparkSession is the modern unified entry point that provides access to RDDs, DataFrames, SQL, and Hive functionalities.”
=============================================
1️⃣ What is Lazy Evaluation?
👉 Lazy evaluation means PySpark does NOT execute transformations immediately.
Instead, it builds an optimized logical plan (a DAG) and executes it only when an action is called.
In simple words:
Transformations are lazy, Actions are eager
2️⃣ Transformations vs Actions (Core idea)
🔹 Transformations (Lazy)
These do not trigger execution:
- select()
- filter()
- map()
- withColumn()
- join()
- groupBy()
They only build a logical execution plan (DAG).
👉 Spark is just planning the execution
3️⃣ What happens internally? (Spark flow)
When you write PySpark code:
Step 1: Build Logical Plan
Spark just creates a DAG (Directed Acyclic Graph):
❌ No execution yet
Step 2: Action triggers execution
Now Spark:
- Optimizes the plan (Catalyst Optimizer)
- Creates a physical execution plan
- Submits jobs to executors
- Reads data + performs transformations
4️⃣ Why does Spark use Lazy Evaluation?
✅ 1. Performance Optimization
Spark can:
- Reorder operations
- Push filters early (predicate pushdown)
- Remove unused columns
- Combine transformations
✅ 2. Avoids Unnecessary Computation
If no action is called, Spark does nothing.
✅ 3. Fault Tolerance
Spark tracks lineage (how each dataset was derived), not the data itself, so lost data can be recomputed.
If a partition fails, Spark recomputes it using the DAG.
5️⃣🔥 Example: Without Thinking vs Spark Optimization
❌ What You Might Think Happens
Step 1: Read data
Step 2: Apply filter (age > 25)
Step 3: Store result
Step 4: Apply filter (salary > 50000)
Step 5: Store result
Step 6: Select columns
Step 7: Show result
👉 Multiple steps, multiple intermediate outputs (slow ❌)
✅ What Spark Actually Does (Lazy Optimization)
Because of lazy evaluation, Spark builds a DAG and optimizes it:
Read → Combined Filter → Select → Show
👉 Both filters are merged into ONE operation 🚀
⚡ Optimized Execution
Instead of:
filter(age) → filter(salary)
Spark does:
filter(age AND salary)
🔥 Internal Optimization (Catalyst)
Spark’s optimizer:
- Combines filters ✅
- Pushes filters down to data source (Predicate Pushdown) ✅
- Removes unnecessary columns (Column Pruning) ✅
📊 Another Example (Even Better)
🧾 Code
df = spark.read.csv("data.csv")
df.select("name", "age") \
.filter(df.age > 30) \
.filter(df.name.startswith("A")) \
.show()
🚀 Spark Optimization
Read only required columns → Apply combined filter → Output
👉 Optimizations:
- Reads only name, age (not the full table)
- Combines filters
- Executes in one pass
6️⃣ DAG Visualization
You can see lazy evaluation in action using:
df.explain(True)
This shows:
- Parsed Logical Plan
- Optimized Logical Plan
- Physical Plan
7️⃣ Caching & Lazy Evaluation (Very Important)
⚠️ cache() itself is lazy
Data is cached only when an action runs
8️⃣ Real-world Data Engineering Example
In ETL pipelines:
- Spark waits
- Optimizes
- Executes everything once
- Writes the final output
🚀 This is why Spark scales so well.
🔥 What is a Partition?
👉 A partition in Spark is a logical chunk of distributed data; it is the unit of parallelism, since each partition is processed by a separate task, and proper partitioning is critical for performance.
- Spark splits data into partitions
- Each partition is processed in parallel
- Each partition is processed by one task
- Each task runs on one executor core
1 Core = 1 Task at a time
1 Executor with 4 cores → can run 4 tasks in parallel
🧠 Simple Idea
Big Data → Split → Partition1 | Partition2 | Partition3
👉 Each partition = one task
⚡ Why Partitions Matter?
- Enable parallel processing
- Improve performance
- Control resource utilization
👉 More partitions → more parallelism 🚀
🔥 1. What is Partition Size?
👉Partition size = amount of data inside one partition
🧠 Example
Total data = 100 MB
Partitions = 5
👉 Each partition ≈ 20 MB
⚡ Why Important?
- Too large → slow tasks ⚠️
- Too small → too many tasks ⚠️
✅ Ideal Size (Rule of Thumb)
👉 100 MB – 200 MB per partition
🔥 2. What is getNumPartitions()?
👉Returns number of partitions
📌 Example
rdd.getNumPartitions()
👉 Output: 5
🧠 Meaning
👉RDD is divided into 5 partitions → 5 tasks
Example Together
rdd = sc.parallelize(range(100), 4)
👉 Results:
- getNumPartitions() → 4
- Partition size → ~25 elements each
⚠️ Important Point
👉 Spark does NOT directly give partition size
You estimate it based on:
- Total data size
- Number of partitions
🔹 Example
rdd = spark.sparkContext.parallelize([1,2,3,4,5,6], 3)
👉 Output:
- Partition 1 → [1,2]
- Partition 2 → [3,4]
- Partition 3 → [5,6]
👉# Tasks = # Partitions
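The way parallelize splits a list into contiguous chunks can be mimicked in plain Python. This is a simplified model of the slicing, not Spark's exact implementation:

```python
def split_into_partitions(data, n):
    # Contiguous, near-equal chunks, similar in spirit to sc.parallelize(data, n)
    size, extra = divmod(len(data), n)
    partitions, start = [], 0
    for i in range(n):
        end = start + size + (1 if i < extra else 0)
        partitions.append(data[start:end])
        start = end
    return partitions

print(split_into_partitions([1, 2, 3, 4, 5, 6], 3))  # [[1, 2], [3, 4], [5, 6]]
```

On a real RDD you can inspect the actual partition contents with `rdd.glom().collect()`, which returns one list per partition.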
🔥 Types of Partitioning
1️⃣ Hash Partitioning (Default)
- Uses: hash(key) % number_of_partitions
👉 Used in:
- groupBy
- reduceByKey
2️⃣ Range Partitioning
- Data divided based on value ranges
👉 Useful for:
- Sorted data
- Range queries
🔄 Partition Operations
🔹 Repartition (Increase/Decrease)
👉 Used when:
- You want more partitions
- You want even data distribution
df.repartition(10)
[Data] → Shuffle → Equal partitions (balanced)
- Full shuffle ⚠️
- Expensive but balanced
🔹 Coalesce (Reduce Only)
👉 Used when:
- You want to reduce partitions
- Avoid heavy shuffle
df.coalesce(2)
[Data] → Merge partitions → Fewer partitions (not evenly balanced)
- No full shuffle
- Faster, but may cause imbalance ⚠️
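The trade-off between the two can be sketched with a toy model: merging existing partitions (coalesce-style) versus redistributing every row (repartition-style). Real coalesce merges by data locality; round-robin merging here just illustrates the skew-versus-shuffle trade-off:

```python
def coalesce_sim(partitions, n):
    # Merge whole partitions without a full shuffle: cheap, but the
    # result can stay skewed if the inputs were skewed.
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i % n].extend(part)
    return merged

def repartition_sim(partitions, n):
    # Full shuffle: every element moves, but the result is balanced.
    flat = [x for part in partitions for x in part]
    result = [[] for _ in range(n)]
    for i, x in enumerate(flat):
        result[i % n].append(x)
    return result

parts = [[1, 2, 3, 4, 5], [6], [7], [8]]        # skewed input
print(coalesce_sim(parts, 2))     # partitions merged whole → still skewed
print(repartition_sim(parts, 2))  # balanced, but every row was moved
```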
🚫 Common Mistakes
❌ Using repartition() unnecessarily → slow job
❌ Using coalesce() for increasing partitions → not possible
🔀 Partitioning & Shuffle
👉 Shuffle happens when:
- Data needs to move across partitions
Example:
df.groupBy("dept").count()
👉 Spark reshuffles data so same keys go to same partition
💾 Partition Storage
- In memory (fast)
- Disk (fallback)
🚫 Problems with Bad Partitioning
❌ Too Few Partitions
- Less parallelism
- Slow execution
❌ Too Many Partitions
- Overhead of task scheduling
❌ Data Skew ⚠️
- One partition has more data
- Causes slow jobs
🔥 What is Bucketing?
👉 Bucketing is a technique that divides data into a fixed number of files (buckets) using a hash function on a column.
👉 It helps optimize joins and aggregations by reducing shuffle.
🧠 Core Idea
bucket_number = hash(column) % number_of_buckets
👉 Same values → always go to same bucket
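Why that property matters for joins can be shown with a toy sketch. A deterministic `zlib.crc32` stands in for Spark's internal hash, and the table names and rows are invented for illustration:

```python
import zlib
from collections import defaultdict

def bucket_of(key, num_buckets):
    # Same rule as bucketing: hash(column) % number_of_buckets
    return zlib.crc32(str(key).encode()) % num_buckets

def write_bucketed(rows, key_index, num_buckets):
    # Like bucketBy: route each row to its bucket file by hashing the column
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_of(row[key_index], num_buckets)].append(row)
    return buckets

N = 4
orders = write_bucketed([(101, "book"), (202, "pen"), (101, "lamp")], 0, N)
users = write_bucketed([(101, "Asha"), (202, "Ravi")], 0, N)

# Both tables are bucketed the same way on user_id, so matching rows are
# guaranteed to sit in the same bucket: the join is bucket-local, no shuffle.
joined = []
for b in range(N):
    for uid, item in orders.get(b, []):
        for uid2, name in users.get(b, []):
            if uid == uid2:
                joined.append((uid, item, name))
print(sorted(joined))  # [(101, 'book', 'Asha'), (101, 'lamp', 'Asha'), (202, 'pen', 'Ravi')]
```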
⚙️ How It Works
Example:
df.write \
.bucketBy(4, "user_id") \
.saveAsTable("users_bucketed")
👉 Result:
- Data is split into 4 bucket files
- Based on user_id
📊 Visualization
Data → hash(user_id) → Bucket 0
Bucket 1
Bucket 2
Bucket 3
🔥 Why Bucketing is Important?
✅ 1. Faster Joins
If two tables:
- Use same bucket column
- Same number of buckets
👉 Spark can avoid shuffle 🚀
✅ 2. Better Aggregations
- groupBy becomes faster
- Data already distributed
✅ 3. Controlled File Count
- Unlike partitioning (dynamic)
- Bucketing = fixed number of files
⚡ Example (Join Optimization)
Without Bucketing ❌
Join → Shuffle → Slow
With Bucketing ✅
Same bucket → Direct join → Fast
🧾 SQL Example
CREATE TABLE users (
id INT,
name STRING
)
USING parquet
CLUSTERED BY (id) INTO 4 BUCKETS;
📊 Key Characteristics
| Feature | Bucketing |
|---|---|
| Based on | Hash function |
| Number of buckets | Fixed |
| Storage | Files (not folders) |
| Use case | Joins, aggregations |
| Shuffle reduction | Yes |
🔄 Bucketing vs Partitioning (Quick View)
🔥 Core Idea (1-liner)
- Partitioning → splits data into folders based on column values
- Bucketing → splits data into fixed number of files using hash
🧠 Partitioning
🔹 How it Works
Data → Split by column value → Stored in folders
Example:
df.write.partitionBy("country").save("data/")
👉 Storage:
data/country=India/
data/country=USA/
✅ Best For
- Filtering queries
- Large datasets with distinct values
❌ Problem
- Too many unique values → too many small files ⚠️
⚙️ Bucketing (Detail)
🔹 How it Works
bucket = hash(column) % num_buckets
Example:
df.write.bucketBy(10, "user_id").saveAsTable("table")
👉 Creates:
10 bucket files
✅ Best For
- Joins
- GroupBy operations
- Large tables
🚀 Advantage
👉 Reduces shuffle during joins
📊 Query:
SELECT * FROM orders WHERE country = 'India'
👉 Partitioning:
- Reads only the country=India folder ✅ (fast)
📊 Join:
SELECT * FROM orders o JOIN users u ON o.user_id = u.user_id
👉 Bucketing:
- Same bucket → no shuffle needed ✅
🧠When to Use What?
Use Partitioning:
- Low cardinality columns (country, date)
- Filtering queries
Use Bucketing:
- High cardinality columns (user_id)
- Frequent joins
🚫 Limitations
❌ Only works with tables
- Requires saveAsTable()
❌ Needs same config for joins
- Same column
- Same number of buckets
❌ Not always automatically used
- Spark may still shuffle if conditions not met
================================================
🔥 What is RDD?
👉RDD is the fundamental distributed data structure in Spark that represents an immutable collection of data processed in parallel with fault tolerance through lineage.
An RDD is a list of logical partitions that can be distributed among executors.
🧠 Key Features
✅ 1. Resilient (Fault-tolerant)
- Can recover lost data using lineage
✅ 2. Distributed
- Data is split into partitions across nodes
✅ 3. Immutable
- Cannot be changed after creation
- New RDD is created after each operation
✅ 4. Lazy Evaluation
- Transformations are not executed immediately
📊 Simple Example
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
rdd = sc.parallelize([1,2,3,4])
rdd2 = rdd.map(lambda x: x * 2)
print(rdd2.collect())
🔄 How RDD Works Internally
RDD → Transformations → DAG → Execution → Result
🔹 Types of RDD
1️⃣ Parallelized RDD(Collection RDD)
rdd = spark.sparkContext.parallelize([1, 2, 3, 4])
🧠 Key Points
- Data comes from driver memory
- Used for testing and small datasets
- Not used for big data processing ❌
2️⃣ External RDD(Hadoop RDD)
🧠 Data Sources
- HDFS
- S3
- Local file system
- Used in real-world big data
- Distributed across cluster
- Scalable
🧠 Lineage (VERY IMPORTANT)
👉 RDD keeps track of how it was created
Example:
rdd → map → filter → reduce
👉 If data lost → recomputed using lineage
⚡ Example of Fault Tolerance
If one partition fails:
👉 Spark recomputes only that partition
📊 RDD vs DataFrame
| Feature | RDD | DataFrame |
|---|---|---|
| Level | Low-level | High-level |
| Performance | Slower | Faster |
| Optimization | Manual | Automatic (Catalyst) |
| Type safety | Yes (compile-time, in Scala) | Runtime (schema-based) |
🔥 When to Use RDD?
✅ Use RDD:
- Low-level control needed
- Complex transformations
- Working with unstructured data
❌ Avoid RDD:
- Normal ETL → use DataFrame instead
🔥 What is a Transformation?
👉A transformation is an operation on an RDD/DataFrame that:
- Creates a new dataset
- Does NOT execute immediately (lazy)
🧠 Simple Idea
rdd2 = rdd.map(lambda x: x * 2)
👉 No execution yet ❌
👉 Just creating a new RDD ✅
⚙️ Key Characteristics
- ✅ Lazy execution
- ✅ Immutable (creates new dataset)
- ✅ Builds DAG (execution plan)
🔹 Types of Transformations
🧩 1. Narrow Transformations
👉 Data stays in the same partition
👉 No data movement (no shuffle)
👉 Fast ⚡
👉 A transformation in one partition does not need data from other partitions (each partition can produce its result independently)
👉 Example: filtering rows where id < 5, or where name LIKE '%kumar%'
Examples:
- map()
- filter()
- flatMap()
📌 Example
rdd.map(lambda x: x * 2)
🔀 2. Wide Transformations
👉 Data moves across partitions
👉 Requires shuffle
👉 Expensive ⚠️
👉 Result needs data from other partitions
Examples:
- groupByKey()
- reduceByKey()
- join()
reduceByKey() is almost always better than groupByKey()
👉reduceByKey() aggregates data BEFORE shuffle
👉groupByKey() shuffles ALL data first, then aggregates
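The difference can be sketched by counting how many records cross the shuffle. This is a toy model of one map-side partition (not Spark code): groupByKey ships every raw pair, while reduceByKey combines locally first:

```python
from collections import defaultdict

data = [("A", 1), ("B", 1), ("A", 2), ("B", 3), ("A", 5)]

# groupByKey-style: every (key, value) pair crosses the shuffle, then is summed
shuffled_group = len(data)        # all 5 raw pairs move over the network

# reduceByKey-style: combine locally per key first (map-side combine),
# then shuffle only one partial sum per key from this partition
local = defaultdict(int)
for k, v in data:
    local[k] += v                 # {"A": 8, "B": 4}
shuffled_reduce = len(local)      # only 2 combined records move

print(shuffled_group, shuffled_reduce)  # 5 2
assert shuffled_reduce < shuffled_group
```

With many partitions the gap grows: each partition ships at most one partial sum per key instead of all of its raw pairs.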
🔥 Why do we get 200 partitions in wide transformations?
👉 By default, Spark uses:
spark.sql.shuffle.partitions = 200
🧠 What it Means
👉 During a wide transformation (shuffle) like:
df.groupBy("dept").count()
df.join(df2, "id")
👉 Spark repartitions the data into 200 partitions
🔥 Why 200?
👉 Default value chosen by Spark for parallelism
- Enough tasks for distributed processing
- Works well for medium/large clusters
⚡ Impact
✅ Advantages
- More parallelism
- Better resource utilization
❌ Problems (Very Common ⚠️)
1. Too many small tasks
Small data → 200 partitions → tiny tasks → overhead
2. Performance degradation
- Task scheduling overhead
- Increased shuffle cost
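The default can be tuned per session. A minimal configuration sketch (the value 8 is illustrative, chosen for a small dataset; this assumes a running PySpark installation):

```python
from pyspark.sql import SparkSession  # requires PySpark installed

spark = SparkSession.builder.appName("tune-shuffle").getOrCreate()

# spark.sql.shuffle.partitions controls how many partitions wide
# transformations (groupBy, join) create; lower it for small data.
spark.conf.set("spark.sql.shuffle.partitions", "8")

# Spark 3.x can also coalesce shuffle partitions automatically via
# Adaptive Query Execution (AQE):
spark.conf.set("spark.sql.adaptive.enabled", "true")
```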
⚡ Example 1: join()
rdd1.join(rdd2)
🧠 What Happens
- Same keys from both RDDs must be together
- Data moves across nodes
👉 Heavy shuffle ⚠️
⚡ Example 2
🧾 Input RDD
rdd = sc.parallelize([("A",1), ("B",1), ("A",2), ("B",3)])
🔄 Operation
rdd.groupByKey()
🧠 What Happens
- All values of key "A" must go to the same partition
- All values of key "B" must go to the same partition
👉 Requires shuffle
📊 Output
("A", [1,2])
("B", [1,3])
📊 Narrow vs Wide
| Feature | Narrow Transformation | Wide Transformation |
|---|---|---|
| Shuffle | ❌ No | ✅ Yes |
| Speed | Fast | Slower |
| Partition | Same | Re-distributed |
| Example | map, filter | groupBy, join |
================================================
Job , Stage , Task , Shuffle
🔥 1. What is a Job?
👉A Job is created when you trigger an action
📌 Example
df.show()
df.count()
👉 Each action = 1 Job
👉 To count jobs, count the actions
👉 Every job always has at least 1 stage and 1 task
🔥 2. What is a Stage?
👉 A Stage is a group of tasks that can run without a shuffle
Stages are separated by wide transformations such as groupBy and join.
A new stage is created whenever Spark must shuffle data.
- Spark divides a job into stages
- The split happens at shuffle boundaries
👉 Stage count = number of shuffle operations + 1
Types:
- ShuffleMapStage (before shuffle)
- ResultStage (final stage)
🔥 3. What is a Task?
👉 A Task is the smallest unit of work
👉 Tasks per stage = number of partitions
👉 Each task processes one partition
👉 Tasks run in parallel on executor cores
- Each partition = 1 task
Example:
4 partitions → 4 tasks
🔥 4. What is Shuffle?
👉 Shuffle = data movement across partitions/nodes
👉 No shuffle = data stays in the same partition
Happens in:
- groupBy()
- join()
- reduceByKey()
🧠 What happens:
Node1 →\
→ Data exchange → New partitions
Node2 →/
👉 Expensive because:
- Network I/O
- Disk I/O
- Sorting
Stage 1 → filter
↓
--- SHUFFLE ---
↓
Stage 2 → groupBy
| Feature | Shuffle 🔀 | No Shuffle 🚀 |
|---|---|---|
| Data movement | Across nodes | Within same partition |
| Network I/O | High | None |
| Performance | Slow | Fast |
| Stage split | Yes | No |
| Transformation | Wide | Narrow |
⚡ Putting It All Together
Example 1:
df.filter().groupBy("dept").count().show()
Execution:
🧩 Step 1: Job
- Triggered by .show()
🔀 Step 2: Stages
- Stage 1 → filter (no shuffle)
- Stage 2 → groupBy (shuffle happens)
⚙️ Step 3: Tasks
- Each stage has tasks based on partitions
🔥 Full Flow
Action → Job → Stages → Tasks → Execution
🔥 Example 2
df1 = spark.read.csv("orders.csv")
df2 = spark.read.csv("customers.csv")
result = df1.filter(df1.amount > 100) \
.join(df2, "customer_id") \
.groupBy("country") \
.count()
result.show()
🧠 Step 1: Job Count
👉 Only one action:
result.show()
✅ Job Count = 1
🧠 Step 2: Identify Transformations
| Operation | Type | Shuffle? |
|---|---|---|
| filter() | Narrow | ❌ No |
| join() | Wide | ✅ Yes |
| groupBy() | Wide | ✅ Yes |
🧠 Step 3: Stage Breakdown
👉 Stages are split at shuffle boundaries
🔹 Stage 1 (Before 1st Shuffle)
- Read df1, df2
- Apply filter()
👉 No shuffle yet
🔹 Stage 2 (Join Shuffle)
- join() → shuffle required
🔹 Stage 3 (GroupBy Shuffle)
- groupBy() → another shuffle
✅ Stage Count = 3
🧠 Step 4: Task Count
👉 Depends on partitions
Assume:
spark.sql.shuffle.partitions = 4
🔹 Stage 1
- Input split into 4 partitions
👉 4 tasks
🔹 Stage 2 (Join)
- Shuffle → 4 partitions
👉 4 tasks
🔹 Stage 3 (GroupBy)
- Shuffle → 4 partitions
👉 4 tasks
✅ Total Tasks = 4 + 4 + 4 = 12
| Component | Count |
|---|---|
| Jobs | 1 |
| Stages | 3 |
| Tasks | 12 |
🔥 Example 3 (Multiple Actions + Multiple Shuffles)
df = spark.read.csv("sales.csv")
df1 = df.filter(df.amount > 100)
df2 = df1.groupBy("region").sum("amount")
df3 = df2.filter(df2["sum(amount)"] > 1000)
df3.show()
df3.count()
🧠 Step 1: Job Count
👉 Actions:
df3.show()
df3.count()
✅ Job Count = 2
(Each action = separate job)
🧠 Step 2: Identify Transformations
| Operation | Type | Shuffle? |
|---|---|---|
| filter() | Narrow | ❌ No |
| groupBy() | Wide | ✅ Yes |
| filter() | Narrow | ❌ No |
🧠 Step 3: Stage Breakdown (per job)
🔹 Stage 1
- Read data
- filter(amount > 100)
👉 No shuffle
🔹 Stage 2
- groupBy("region") → shuffle happens
🔹 Stage 3
- filter(sum > 1000)
👉 No shuffle (but runs after shuffle stage)
✅ Stage Count per Job = 3
🧠 Step 4: Task Count
Assume:
spark.sql.shuffle.partitions = 3
🔹 Stage 1
👉 3 partitions → 3 tasks
🔹 Stage 2 (Shuffle)
👉 3 partitions → 3 tasks
🔹 Stage 3
👉 3 partitions → 3 tasks
✅ Tasks per Job = 9
✅ Total Tasks (2 jobs) = 18
| Component | Count |
|---|---|
| Jobs | 2 |
| Stages per job | 3 |
| Total Stages | 6 |
| Tasks per job | 9 |
| Total Tasks | 18 |
🔥 What is a Join in Spark?
👉A join combines data from two datasets based on a common key
🧠 Simple Example
df1.join(df2, "id")
👉 Combines rows where id matches
📊 Types of Joins in Spark
| Join Type | Description |
|---|---|
| Inner Join | Only matching rows |
| Left Join | All left + matching right |
| Right Join | All right + matching left |
| Full Outer Join | All rows from both sides |
| Left Semi Join | Only matching rows from left (no right) |
| Left Anti Join | Non-matching rows from left |
| Cross Join | Cartesian product |
🔹 Examples
✅ Inner Join
df1.join(df2, "id", "inner")
✅ Left Join
df1.join(df2, "id", "left")
✅ Left Anti Join (VERY IMPORTANT 🔥)
df1.join(df2, "id", "left_anti")
👉 Returns rows in df1 not present in df2
🔥 Join Types Based on Execution
⚙️ 1. Shuffle Hash Join
👉 Used when one side's hash table fits in memory (Sort-Merge Join is Spark's default for large data)
- Data is shuffled across nodes
- Expensive
🧠 Idea:
- Shuffle both datasets
- Build hash table on one side
- A hash table is built for each partition
✅ Use When:
- Medium datasets
- Hash table for one side fits in memory per partition
⚡ 2. Broadcast hash Join (Optimized 🚀)
👉 Small table is sent to all nodes
from pyspark.sql.functions import broadcast
df1.join(broadcast(df2), "id")
✅ Benefits:
- No shuffle
- Very fast
⚙️ 3. Sort-Merge Join
👉 Used for large datasets
- Data sorted before join in partition
- Efficient for big data
✅ Use When:
- Both datasets are large
⚠️ Cost:
- Shuffle + sort (expensive)
🧠 Steps:
- Shuffle both datasets
- Sort by join key
- Merge
⚙️ 4. Bucket Join
👉 Uses bucketing
- Avoids shuffle if:
- Same bucket column
- Same number of buckets
📊 Join Strategy Comparison
| Join Type | Shuffle | Best For |
|---|---|---|
| Shuffle Join | Yes | Large datasets |
| Broadcast Join | No | Small + large table |
| Sort-Merge Join | Yes | Large sorted data |
| Bucket Join | No* | Pre-bucketed tables |
🔥 Performance Tips
✅ Use Broadcast Join
- When one table is small
✅ Avoid Skew
- Uneven keys → slow joins
✅ Use Proper Partitioning
- Reduces shuffle
✅ Use Bucketing
- For repeated joins
🔥1. How Spark Decides Join Strategy (Very Important)
Spark doesn’t always use the same join — it chooses based on:
📊 Factors:
- Table size
- Configurations
- Statistics (if available)
- Join type
🚀 2.Broadcast Hash Join (Deep)
👉 Fastest join
How it works:
Small table → broadcast to all executors
Large table → streamed
Example:
from pyspark.sql.functions import broadcast
df1.join(broadcast(df2), "id")
🔥 Key Points:
- No shuffle ❌
- Memory dependent ⚠️
- Controlled by:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
⚙️ 3. Sort-Merge Join (Default for Big Data)
👉 Most common for large datasets
Steps:
- Shuffle both datasets
- Sort on join key
- Merge rows
🔥 Why used?
- Scalable for big data
- Works when data doesn’t fit in memory
⚙️ 4. Shuffle Hash Join
👉 Less common
- One side hashed
- Both sides shuffled
⚠️ 5. Data Skew in Joins (VERY IMPORTANT 🔥)
👉 Problem:
- One key has too many records
Example:
id=1 → millions of rows
👉 Causes:
- One task becomes slow
- Job delay
🔧 Solutions:
✅ 1. Salting Technique
from pyspark.sql.functions import rand
df = df.withColumn("salt", rand())
✅ 2. Broadcast Small Table
- Avoid shuffle
✅ 3. Skew Join Optimization (Spark 3+)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
⚡ 6. Join Optimization Techniques
✅ Predicate Pushdown
- Filter before join
df1.filter(...).join(df2)
✅ Column Pruning
- Select only required columns
✅ Repartition Before Join
df.repartition("id")
👉 Ensures better distribution
🔄 7. Join Reordering (Catalyst Optimizer)
👉 Spark can reorder joins:
(A JOIN B) JOIN C
may become:
B JOIN C → JOIN A
👉 Chooses cheapest path
🔥 8. Broadcast Hint (Manual Control)
df1.join(df2.hint("broadcast"), "id")
👉 Forces broadcast join
⚠️ 9. Common Mistakes
❌ Not filtering before join
❌ Joining huge datasets without partitioning
❌ Ignoring skew
❌ Selecting unnecessary columns
🔍 10. How to Check Join Type
👉 Use:
df.explain(True)
Output shows:
- BroadcastHashJoin
- SortMergeJoin
- etc.
🔥 1. Use Broadcast Join (BIGGEST WIN 🚀)
👉 If one table is small → broadcast it
from pyspark.sql.functions import broadcast
df_large.join(broadcast(df_small), "id")
✅ Why?
- No shuffle ❌
- Very fast ⚡
🔥 2. Filter Data Before Join
👉 Reduce data size early
df1_filtered = df1.filter(df1.status == "ACTIVE")
df1_filtered.join(df2, "id")
✅ Benefit:
- Less data shuffled
- Faster execution
🔥 3. Select Only Required Columns (Column Pruning)
df1.select("id", "name").join(df2.select("id", "salary"), "id")
✅ Benefit:
- Less memory usage
- Faster shuffle
🔥 4. Handle Data Skew (VERY IMPORTANT ⚠️)
👉 Problem:
- One key has huge data → slow task
🔧 Solution 1: Salting
from pyspark.sql.functions import rand
df.withColumn("salt", (rand()*10).cast("int"))
🔧 Solution 2: Enable AQE
spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
🔥 5. Repartition Before Join
df1 = df1.repartition("id")
df2 = df2.repartition("id")
✅ Why?
- Ensures same key in same partition
- Reduces shuffle overhead
🔥 6. Use Bucketing (For Repeated Joins)
df.write.bucketBy(8, "id").saveAsTable("table")
✅ Benefit:
- Avoid shuffle in joins
🔥 7. Tune Shuffle Partitions
spark.conf.set("spark.sql.shuffle.partitions", 200)
Rule:
- Too high → overhead
- Too low → less parallelism
🔥 8. Use Correct Join Type
👉 Example:
df1.join(df2, "id", "left_semi")
✅ Benefit:
- Avoids unnecessary data transfer
🔥 9. Avoid Cross Join ❌
df1.crossJoin(df2)
👉 Very expensive (Cartesian product)
🔥 10. Check Execution Plan
df.explain(True)
👉 Look for:
- BroadcastHashJoin ✅
- SortMergeJoin ⚠️
📊 Summary Table
| Optimization | Benefit |
|---|---|
| Broadcast join | No shuffle, fastest |
| Filter early | Less data processed |
| Column pruning | Less memory usage |
| Repartition | Better distribution |
| Bucketing | Avoid shuffle |
| Handle skew | Avoid slow tasks |
| Tune partitions | Better parallelism |
| Feature | RDD 🧱 | DataFrame 📊 | Dataset 🧠 |
|---|---|---|---|
| Level | Low-level | High-level | High-level |
| Data Structure | Distributed objects | Table (rows & columns) | Typed objects |
| Schema | ❌ No | ✅ Yes | ✅ Yes |
| Type Safety | ✅ Yes | ❌ No | ✅ Yes |
| Performance | ❌ Slower | ✅ Faster | ✅ Faster |
| Optimization | ❌ Manual | ✅ Catalyst optimizer | ✅ Catalyst optimizer |
| API | Functional (map, filter) | SQL / DSL | Strongly typed API |
| Language Support | Python, Java, Scala | Python, Java, Scala | Scala, Java (not Python ⚠️) |
| Use Case | Complex logic | ETL, analytics | Type-safe transformations |
🔥 1. RDD (Resilient Distributed Dataset)
🧠 What it is
- Low-level distributed collection of objects
- Core abstraction of Spark
⚙️ How it Works
Data → RDD → Transformations → DAG → Execution
- No optimization layer
- Executes exactly what you write
📌 Example
rdd = sc.parallelize([1,2,3,4])
rdd.map(lambda x: x * 2).collect()
✅ Advantages
- Full control over execution
- Fine-grained transformations
- Works with unstructured data
❌ Disadvantages
- No optimization → slower
- No schema
- Harder to write & maintain
🧠 Internal Concept (Important 🔥)
- Uses lineage for fault tolerance
- No Catalyst optimizer
- More JVM object overhead
📍 When to Use
- Complex logic not supported in DataFrame
- Low-level transformations
📊 2. DataFrame
🧠 What it is
- Distributed table-like structure
- Similar to SQL table (rows + columns)
⚙️ How it Works
Data → Logical Plan → Catalyst Optimizer → Physical Plan → Execution
📌 Example
df.filter(df.age > 25).select("name").show()
🔥 Internal Architecture
1. Logical Plan
- Represents query
2. Catalyst Optimizer
- Optimizes the query:
- Predicate pushdown
- Column pruning
- Join optimization
3. Physical Plan
- Chooses execution strategy:
- Broadcast join
- Sort merge join
4. Tungsten Engine
- Memory optimization
- Code generation
✅ Advantages
- Very fast 🚀
- Automatic optimization
- Easy SQL support
❌ Disadvantages
- No compile-time type safety
- Less control than RDD
📍 When to Use
- ETL pipelines
- Data analytics
- Almost all production workloads
🧠 3. Dataset (Advanced)
🧠 What it is
- Combination of RDD + DataFrame
- Provides type safety + optimization
⚙️ How it Works
Dataset → Logical Plan → Catalyst → Physical Plan → Execution
👉 Same engine as DataFrame, but typed
📌 Example (Scala)
case class Person(name: String, age: Int)
val ds = spark.createDataset(Seq(Person("A", 25)))
🔥 Key Concept: Encoders
👉 Converts JVM objects ↔ Spark internal format
✅ Advantages
- Compile-time type safety
- Optimized like DataFrame
- Fewer runtime errors
❌ Disadvantages
- Not available in Python ⚠️
- Slight overhead vs DataFrame
📍 When to Use
- Scala/Java projects
- When type safety is critical
📊 Deep Comparison
| Feature | RDD 🧱 | DataFrame 📊 | Dataset 🧠 |
|---|---|---|---|
| Abstraction Level | Low | High | High |
| Schema | ❌ No | ✅ Yes | ✅ Yes |
| Type Safety | ✅ Yes | ❌ No | ✅ Yes |
| Optimization | ❌ No | ✅ Catalyst | ✅ Catalyst |
| Performance | ❌ Slow | ✅ Fast | ✅ Fast |
| API | Functional | SQL + DSL | Typed API |
| Language Support | All | All | Scala/Java only |
================================================
Exchange/Shuffle
🔥 1. What is “Exchange” in Spark?
👉Exchange = Shuffle boundary in execution plan
👉 It represents:
- Data redistribution across partitions
- Movement of data between stages
🧠 Simple
Exchange = Shuffle = Stage Break
🔥 2. Read Exchange & Write Exchange
🔹 Write Exchange
👉 Happens before shuffle
👉 Spark writing shuffled output to disk so that downstream tasks can read it.
What it does:
- Writes data from current stage
- Partitions data based on key
🔹 Read Exchange
👉 Happens after shuffle
👉Spark is reading data that was shuffled in the previous step
What it does:
- Reads shuffled data
- Feeds into next stage
🔄 Flow
Stage 1
↓
Write Exchange (shuffle write)
↓
--- Data moves across nodes ---
↓
Read Exchange (shuffle read)
↓
Stage 2
🔥 3. Example
df.groupBy("dept").count().explain(True)
🧠 Execution Plan (Simplified)
== Physical Plan ==
HashAggregate
Exchange hashpartitioning(dept, 200)
HashAggregate
👉 That Exchange means:
- Data is shuffled based on dept
- New stage starts
🔥 4. Real Meaning
| Term | Meaning |
|---|---|
| Write Exchange | Writing shuffle data |
| Read Exchange | Reading shuffled data |
| Exchange | Full shuffle operation |
⚡ 5. When Exchange Happens?
👉 During wide transformations
- groupBy
- join
- distinct
🔥 6. Why Important?
👉 Exchange is:
- Expensive ⚠️
- Network heavy
- Disk heavy
👉 More exchanges = slower job
================================================
AQE (Adaptive Query Execution)
🔥 1. What is AQE?
👉AQE = Adaptive Query Execution
👉 Spark dynamically optimizes query at runtime (not just at planning time)
👉Static Plan ❌ → AQE makes it Dynamic ✅
⚙️ Enable AQE
spark.conf.set("spark.sql.adaptive.enabled", True)
🔥 2. Why AQE is Needed?
👉 Without AQE:
- Spark fixes plan before execution
- Doesn’t know actual data size
👉 With AQE:
- Spark adjusts based on real data at runtime
🔥 3. How AQE Reduces Shuffle Read
👉 Main idea: reduce unnecessary data read during shuffle
🔹 1. Coalescing Shuffle Partitions (BIGGEST 🔥)
👉 Problem:
Default = 200 partitions → many small files
👉 AQE Solution:
Merge small partitions → fewer partitions
Example
Before AQE:
200 partitions → 200 shuffle reads ❌
After AQE:
200 → 20 partitions → 20 reads ✅
Config:
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
🔹 2. Skew Join Optimization
👉 Problem:
One partition = huge → slow task
👉 AQE Solution:
- Splits large partitions
- Balances workload
Result:
- Faster execution
- Less heavy shuffle read
Config:
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
🔹 3. Dynamic Join Strategy (Huge 🚀)
👉 AQE can change join type at runtime
Example:
Planned → SortMergeJoin ❌
Runtime → BroadcastJoin ✅
👉 Result:
- Avoids shuffle completely
- Reduces shuffle read to zero
🔹 4. Local Shuffle Read
👉 AQE tries to read data locally
Instead of remote fetch → local read
Benefit:
- Less network I/O
- Faster reads
📊 Summary Table
| AQE Feature | How it Reduces Shuffle Read |
|---|---|
| Coalesce partitions | Fewer partitions to read |
| Skew join optimization | Balanced partition sizes |
| Dynamic join selection | Avoid shuffle (broadcast) |
| Local shuffle read | Reduce network data transfer |
🔥 Before vs After AQE
Without AQE:
200 partitions → heavy shuffle → slow ❌
With AQE:
20 partitions → optimized → fast ✅
================================================
Driver Memory Management
🔥 1. What is Driver?
👉Driver = Main process of Spark application
- Runs your code
- Creates the SparkSession
- Builds logical/physical plans
- Creates the DAG
- Schedules jobs, stages, and tasks
- Tracks task status and retries
- Collects results
🔥 2. What is Driver Memory?
- Memory allocated to driver process
- Driver memory is not used for heavy data processing (executors do that)
--driver-memory 4G
🧠 What Uses Driver Memory?
🔹 1. Application Code
- Variables
- Objects
🔹 2. Metadata (Very Important 🔥)
- DAG and query plans
- Job/stage/task metadata
- Spark configs
- Scheduler state
🔹 3. Result Data
👉 When you do:
df.collect() / df.take() / df.head()
- Data comes to driver memory
- Accumulators
- Broadcast variable metadata
⚠️ BIG Reason of OOM
🔹 4. Broadcast Variables
👉 Stored on driver before sending to executors
🔹 5. Accumulators
👉 Maintained on driver
🔥 3. Memory Flow
Executors → process data
↓
Driver ← receives results (collect/show)
⚠️ 4. Common Driver OOM (VERY IMPORTANT)
❌ 1. Using collect() on large data
df.collect()
👉 Loads entire dataset into driver → crash ❌
❌ 2. Using toPandas()
df.toPandas()
👉 Converts full dataset → stored in driver memory
❌ 3. Large show()
df.show(1000000)
👉 Loads huge data into driver
❌ 4. Large Broadcast Variables
broadcast(large_df)
👉 Driver must hold it first
❌ 5. Too Many Partitions (Metadata overload)
Millions of tasks → driver stores metadata → memory issue
❌ 6. Collecting in loops
for x in df.collect():
👉 Repeated memory usage
🔥 5. How to Manage Driver Memory
✅ 1. Avoid collect()
Use:
df.show()
df.take(10)
✅ 2. Use write instead of collect
df.write.parquet("output/")
✅ 3. Limit data
df.limit(1000).collect()
✅ 4. Increase driver memory
--driver-memory 8G
✅ 5. Avoid large broadcast
👉 Only broadcast small tables
✅ 6. Reduce partitions metadata
df.coalesce(50)
✅ 7. Use streaming approach
👉 Process in chunks instead of full load
🔥 6. Driver vs Executor Memory
| Feature | Driver | Executor |
|---|---|---|
| Role | Control + coordination | Data processing |
| Stores | Metadata + results | Actual data |
| OOM risk | collect() | large partitions |
================================================
JVM Heap vs Overhead Memory
👉JVM Heap = Memory for Spark data & execution
👉Overhead Memory = Extra memory outside heap (system + native usage)
📊 2. Comparison Table
| Feature | JVM Heap 🧠 | Overhead Memory ⚙️ |
|---|---|---|
| Type | Inside JVM | Outside JVM |
| Used for | Data, execution, cache | Native memory, Python, JVM meta |
| Config | --executor-memory | spark.executor.memoryOverhead |
| Managed by | JVM | OS / Spark |
| OOM Type | Java heap OOM | Container killed (YARN/K8s) |
🔥 3. JVM Heap Memory
- This is the main memory used by Spark
- Managed by the Java garbage collector
📌 Used for:
- Spark execution and storage memory (for executors)
- Spark plans, metadata, objects
- RDD / DataFrame data
- Cached data
- Shuffle data
- Execution (joins, aggregations)
- For the driver: DAG, plans, scheduling metadata
⚙️ Config
--executor-memory 8G
spark.driver.memory=10g
🧠 Example
Executor Memory = 8GB
→ JVM Heap = ~8GB
❌ Heap OOM Error
java.lang.OutOfMemoryError: Java heap space
🔥 4. Memory Overhead (non heap / off heap)
👉Memory used outside JVM heap but inside spark container
Not managed by GC
📌 Used for:
- Python processes (PySpark)
- JVM metadata
- Native libraries
- Off-heap memory
- Shuffle buffers
- thread stacks
⚙️ Config
spark.executor.memoryOverhead=2g
🧠 Default
Usually:
max(10% of executor memory, 384MB)
❌ Overhead OOM Error
Container killed by YARN for exceeding memory limits
🔥 5. Total Memory
Total Container Memory =
JVM Heap + Overhead Memory
📌 Example
Executor Memory = 8GB
Overhead = 2GB
Total = 10GB
🔥 6. Why Overhead is Important (VERY IMPORTANT ⚠️)
👉 Especially in PySpark
- Python runs outside JVM
- Needs extra memory
⚠️ Problem
👉 If overhead too low:
Container killed ❌
🔥 7. Real Example
--executor-memory 8G
spark.executor.memoryOverhead=512m ❌ too low
👉 💥 Crash
✅ Fix
--executor-memory 8G
spark.executor.memoryOverhead=2g ✅
🔥 8. When to Increase What?
| Problem Type | Increase What? |
|---|---|
| Java heap OOM | Heap memory |
| Container killed | Overhead memory |
| PySpark job | Overhead memory |
| Heavy joins / caching | Heap memory |
================================================
On-Heap vs Off-Heap Memory
🔥 1. Core Difference (1-liner)
👉On-Heap = Memory inside JVM (GC managed)
👉Off-Heap = Memory outside JVM (no GC)
📊 2. Comparison Table
| Feature | On-Heap 🧠 | Off-Heap ⚙️ |
|---|---|---|
| Location | Inside JVM heap | Outside JVM (native memory) |
| Managed by | JVM Garbage Collector | Spark / OS |
| Performance | Slower (GC overhead) | Faster (no GC pauses) |
| Stability | Safer (auto managed) | Risky (manual handling) |
| Default | Enabled | Disabled |
| Config | --executor-memory | spark.memory.offHeap.* |
🔥 3. On-Heap Memory
👉 Default memory used by Spark
📌 Used for:
- RDD / DataFrame data
- Cached data
- Execution (joins, aggregations)
⚙️ Config
--executor-memory 8G
⚠️ Problem
Large data → frequent Garbage Collection → slow performance
🔥 4. Off-Heap Memory
👉 Memory allocated outside JVM
📌 Used for:
- Tungsten engine (optimized execution)
- Binary data processing
- Shuffle buffers
- Serialized data
⚙️ Enable
spark.conf.set("spark.memory.offHeap.enabled", True)
spark.conf.set("spark.memory.offHeap.size", "4g")
🚀 Benefit
- Less GC
- Faster execution
- Better memory efficiency
🔄 5. Memory Layout
Total Memory
├── On-Heap (JVM)
└── Off-Heap (Native)
⚠️ 6. Risks of Off-Heap
- Memory leaks possible
- Harder to debug
- Needs careful tuning
🔥 7. When to Use What?
✅ Use On-Heap (default)
- Most workloads
- Small/medium data
✅ Use Off-Heap
- Large datasets
- Heavy joins / aggregations
- GC becoming bottleneck
📊 8. Example
Executor Memory = 8GB (on-heap)
Off-Heap = 4GB
Total usable ≈ 12GB
================================================
🔥 1. What is Executor Memory?
👉Memory allocated to each executor process
--executor-memory 8G
The executor is responsible for:
- Running tasks
- Holding data partitions
- Performing shuffles
- Caching data
- Writing output
🔥 2. Total Executor Memory Layout
Executor Memory
├── JVM Heap (On-Heap)
│   ├── Execution Memory (join, sort, shuffle, aggregation)
│   ├── Storage Memory (RDD / DataFrame caching)
│   └── User Memory (UDFs, user objects, metadata)
└── Overhead Memory (Off-Heap + system)
🧠 3. Inside JVM Heap (MOST IMPORTANT 🔥)
Spark divides heap into:
🔹 1. Execution Memory ⚙️
👉 Used for:
- Joins
- Aggregations
- Shuffle operations
- Sorting
🔹 2. Storage Memory 📦 (caching)
- Spark can evict cached blocks under memory pressure
- Evicted blocks are recomputed later if needed
👉 Used for:
- Cached data (cache, persist)
- Broadcast variables
🔄 Unified Memory Model/Management
Execution ↔ Storage (can borrow from each other)
JVM Heap (Executor)
├── Reserved Memory (~300MB)
└── Unified Memory
    ├── Execution Memory ⚙️
    └── Storage Memory 📦
🔥 Important Rules
👉Execution can evict storage
👉Storage cannot evict execution
⚡ Example
- If no caching → execution uses more memory
- If heavy caching → storage uses more
👉 Solves:
- Memory wastage ❌
- Improves utilization ✅
- Better performance 🚀
Case 3: Storage Memory can borrow unused Execution Memory.
Case 4: Storage Memory cannot evict Execution Memory, so it evicts its own least-recently-used cached blocks instead.
🔥 4. Overhead Memory
👉 Outside JVM heap , not managed by GC
Overhead memory cannot spill to disk; if it is exceeded, the container gets killed
Used for:
- Python (PySpark)
- Native libraries
- JVM metadata
- Pandas UDFs
- Arrow buffers
⚙️ Config
spark.executor.memoryOverhead=2g
Default: memoryOverhead = max(384MB, 10% of executor heap)
🔥 5. Memory Flow Example
Task starts
↓
Execution memory used (join/groupBy)
↓
Data cached → storage memory
↓
Shuffle → uses execution memory
⚠️ 6. Common Executor OOM Issues
❌ 1. Large shuffle / join
👉 Not enough execution memory
❌ 2. Too much caching
df.cache()
👉 Storage memory full
❌ 3. Data skew
👉 One partition too large
❌ 4. Large partitions
👉 Task can't fit in memory
❌ 5. Low overhead memory (PySpark)
👉 Container killed
🔥 7. How to Fix (Production Tips 🚀)
✅ 1. Increase executor memory
--executor-memory 16G
✅ 2. Tune partitions
df.repartition(200)
👉 Smaller partitions → less memory per task
✅ 3. Avoid over-caching
👉 Cache only needed data
✅ 4. Use broadcast join
👉 Avoid heavy shuffle
✅ 5. Enable AQE
spark.conf.set("spark.sql.adaptive.enabled", True)
✅ 6. Increase overhead (PySpark)
spark.executor.memoryOverhead=2g
📊 8. Key Configs
| Config | Purpose |
|---|---|
| --executor-memory | JVM heap |
| spark.executor.memoryOverhead | Extra memory |
| spark.memory.fraction | Heap usage split |
| spark.sql.shuffle.partitions | Shuffle parallelism |
🔥 9. Real Example
Executor Memory = 8GB
→ 60% usable by Spark
→ Split between execution & storage
================================================
🔥 What is Data Spill?
Data spill = Memory overflow → Disk usage
When an executor is doing heavy operations (like joins, aggregations, sorts), it tries to keep data in memory.
If memory is insufficient, Spark spills data from RAM to disk.
🧠 Why Spill Happens
1. Not enough executor memory
- spark.executor.memory too low
- Large dataset or skewed data
2. Wide transformations
Operations like:
- groupBy
- reduceByKey
- join
- orderBy
👉 These require shuffle + buffering, which consumes a lot of memory
3. Large partitions
- Few partitions → each partition is huge
- One executor gets too much data
4. Data skew
- Some keys have huge data → one partition overloaded
5. Inefficient joins
- Shuffle join instead of broadcast join
- No partitioning strategy
⚙️ Types of Spill
1. Memory Spill (Shuffle Spill)
- Happens during shuffle operations
- Data is written to disk as temporary files
2. Disk Spill
- When memory completely fills
- Spark writes serialized data to disk
📊 Where It Happens (Internally)
Inside executor:
- Execution Memory (for shuffle, join, aggregation)
- If exceeded → spill triggered
👉 Comes under Unified Memory Management
⚠️ Impact of Spill
Spilling is not a failure, but it slows down jobs:
- Disk I/O is much slower than RAM
- Increased job time
- More GC overhead
- Possible disk space issues
🚀 How to Avoid / Reduce Spill
✅ 1. Increase memory
spark.executor.memory=8g
spark.executor.memoryOverhead=2g
✅ 2. Increase partitions
df.repartition(200)
✔ Smaller partitions = less memory per task
✅ 3. Use Broadcast Join
from pyspark.sql.functions import broadcast
df1.join(broadcast(df2), "id")
✔ Avoids shuffle → less memory pressure
✅ 4. Handle data skew
- Salting technique
- Skew join optimization (AQE)
✅ 5. Enable AQE (Adaptive Query Execution)
spark.sql.adaptive.enabled=true
✔ Automatically:
- Reduces shuffle partitions
- Handles skew
- Optimizes joins
✅ 6. Use efficient transformations
- Prefer reduceByKey over groupByKey
- Filter early
✅ 7. Persist wisely
df.persist()
✔ Avoid recomputation (but don’t overuse)
================================================
Skew can cause OOM: one partition is far larger than the rest, and Spark cannot spill everything to disk because the data being actively transformed must stay in executor memory.
1. What is Data Skew (Skewness)?
👉 Data skew = uneven data distribution across partitions
Instead of data being evenly spread:
Partition 1 → 10 rows
Partition 2 → 12 rows
Partition 3 → 1,000,000 rows ❌ (skewed)
🧠 Why Skew Happens
- Some keys appear very frequently
- Example:
customer_id:
A → 1M records
B → 10 records
C → 5 records
👉 During shuffle:
- One executor gets A → huge data
- Others finish fast → one task becomes bottleneck
⚠️ Problems Caused by Skew
- Slow job (one task runs forever)
- Executor memory issues
- Data spill to disk
- CPU underutilization
📍 Where You See Skew
- Joins
- groupBy / aggregations
- reduceByKey
- orderBy
🔍 How to Detect Skew
In Spark UI:
- One task takes much longer than others
- Uneven input sizes across tasks
🧂 2. What is Salting?
👉 Salting = technique to break skewed keys into smaller chunks
Instead of sending all "A" records to one partition → distribute them.
🧠 Idea
Original skewed key:
A → 1M rows
After salting:
A_1 → 200k
A_2 → 200k
A_3 → 200k
A_4 → 200k
A_5 → 200k
👉 Now data is evenly distributed across partitions
⚙️ How Salting Works (Step-by-Step)
Step 1: Add random salt to skewed key
from pyspark.sql.functions import col, concat_ws, floor, rand
df1 = df1.withColumn("salt", floor(rand() * 5).cast("int"))
df1 = df1.withColumn("new_key", concat_ws("_", col("key"), col("salt")))
Step 2: Expand smaller dataset (important!)
from pyspark.sql.functions import array, col, concat_ws, explode, lit
df2 = df2.withColumn("salt", explode(array(*[lit(i) for i in range(5)])))
df2 = df2.withColumn("new_key", concat_ws("_", col("key"), col("salt")))
Step 3: Join on salted key
df1.join(df2, "new_key")
📊 Before vs After Salting
| Without Salting | With Salting |
|---|---|
| One partition overloaded | Even distribution |
| Slow execution | Faster |
| Spill to disk | Reduced spill |
⚠️ When to Use Salting
✔ Use when:
- Severe skew (one key dominating)
- Large joins causing bottleneck
❌ Avoid when:
- Data is already balanced
- Small dataset (use broadcast instead)
🚀 Alternatives to Salting
✅ 1. Broadcast Join
broadcast(df_small)
✅ 2. AQE (Adaptive Query Execution)
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
👉 Spark automatically handles skew
✅ 3. Repartition by key
df.repartition("key")
================================================
🔥 What is Caching?
👉 Caching = storing data in memory to reuse it without recomputation
By default, Spark:
- Recomputes data every time (due to lazy evaluation)
Caching avoids this.
🧠 Why Caching is Needed
Without cache:
df = spark.read.csv("data.csv")
df.filter("age > 30").count()
df.filter("age > 30").show()
👉 Spark will:
- Read file twice ❌
- Apply filter twice ❌
✅ With Cache
df = spark.read.csv("data.csv")
df.cache()
df.filter("age > 30").count()
df.filter("age > 30").show()
👉 Spark will:
- Read once ✅
- Store in memory ✅
- Reuse data ✅
⚙️ How Caching Works Internally
- You call:
df.cache()
- Nothing happens immediately (lazy)
- On first action:
df.count()
👉 Spark:
- Computes DAG
- Stores partitions in executor memory
- Next action: uses cached data (skips recomputation)
📦 Where Data is Stored
- Executor memory (RAM)
- If memory full → spill to disk
🧱 Storage Levels
Default:
MEMORY_AND_DISK
Other options:
| Level | Meaning |
|---|---|
| MEMORY_ONLY | Fastest, may lose data if memory full |
| MEMORY_AND_DISK | Tries memory first, spills to disk if needed |
| DISK_ONLY | Stored only on disk; slow |
| MEMORY_ONLY_SER | Serialized (less memory, more CPU) |
Example:
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_ONLY)
🔁 Cache vs Persist
| Cache | Persist |
|---|---|
| Shortcut | Flexible |
| Default = MEMORY_AND_DISK | You choose storage level |
👉 cache() = persist(MEMORY_AND_DISK)
⚠️ When to Use Caching
✅ Use when:
- Same DataFrame used multiple times
- Iterative algorithms (ML)
- Expensive transformations
❌ Avoid when:
- Used only once
- Data is too large (won’t fit memory)
- Causes memory pressure → spill
🚨 Common Mistakes
❌ 1. Forgetting action
df.cache() # nothing happens yet
👉 Need action:
df.count()
❌ 2. Over-caching
- Caching too many DataFrames → OOM / spill
❌ 3. Not unpersisting
df.unpersist()
👉 Frees memory
🔍 How to Check Cache
In Spark UI:
- Storage tab → shows cached RDD/DataFrame
🚀 Cache vs Checkpoint
| Cache | Checkpoint |
|---|---|
| Stored in memory | Stored in HDFS/disk |
| Fast | Slower |
| Can be lost | Reliable |
📊 Real Use Case
df = spark.read.parquet("big_data")
df = df.filter("country = 'IN'").cache()
df.count()
df.groupBy("city").count().show()
df.select("name").show()
👉 Filter runs once → reused everywhere
================================================
🔥 What is persist()?
👉 persist() = explicitly store a DataFrame/RDD with a chosen storage level
It gives you control over how and where data is stored (memory, disk, serialized, etc.)
🧠 Why persist()?
Because cache() is limited:
- Always uses the default: MEMORY_AND_DISK
👉 persist() lets you choose:
- Memory only
- Disk only
- Serialized formats
- Combination of memory + disk
⚙️ Syntax
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_ONLY)
📦 Storage Levels Explained
1. MEMORY_ONLY
df.persist(StorageLevel.MEMORY_ONLY)
✔ Fastest
❌ If memory is insufficient → data lost → recomputed
2. MEMORY_AND_DISK (default)
df.persist(StorageLevel.MEMORY_AND_DISK)
✔ Safe (spills to disk if memory full)
✔ Balanced performance
3. DISK_ONLY
df.persist(StorageLevel.DISK_ONLY)
✔ Saves memory
❌ Slower (disk I/O)
4. MEMORY_ONLY_SER
df.persist(StorageLevel.MEMORY_ONLY_SER)
✔ Uses less memory (serialized)
❌ More CPU (serialization/deserialization)
5. MEMORY_AND_DISK_SER
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
✔ Memory efficient + safe
✔ Common in large datasets
🔁 Cache vs Persist
| Feature | cache() | persist() |
|---|---|---|
| Control | ❌ No | ✅ Yes |
| Default level | MEMORY_AND_DISK | User-defined |
| Ease | Simple | Flexible |
👉 cache() = persist(MEMORY_AND_DISK)
⚙️ Internal Working
- Call:
df.persist(...)
- Spark marks dataset for persistence (lazy)
- First action:
df.count()
👉 Spark:
- Executes DAG
- Stores partitions in executor memory/disk
- Next actions: reuse stored data (no recomputation)
⚠️ When to Use persist()
✅ When:
- Dataset reused multiple times
- You want control over storage
- Data is large → need serialization
🚨 When NOT to Use
❌ Single use dataset
❌ Very large data that doesn’t fit even with disk
❌ Memory already under pressure (can cause spill)
🧹 Important: Unpersist
df.unpersist()
👉 Always clean up:
- Frees executor memory
- Avoids OOM
🔍 Real Example
from pyspark import StorageLevel
df = spark.read.parquet("big_data")
df = df.filter("country = 'IN'")
df.persist(StorageLevel.MEMORY_AND_DISK)  # PySpark has no _SER levels; Python-side data is already serialized
df.count()                                # first action: computes and persists
df.groupBy("city").count().show()         # reuses persisted data
df.select("name").show()                  # reuses persisted data
👉 Filter runs once → reused efficiently
================================================
🔥 What is Partition Pruning?
👉 Partition pruning = reading only required partitions instead of full data
Instead of scanning entire dataset:
year=2022
year=2023
year=2024
If your query is:
WHERE year = 2024
👉 Spark reads only year=2024 partition ✅
🧠 Why It Matters
Without pruning:
- Full data scan ❌
- High I/O ❌
- Slow performance ❌
With pruning:
- Less data read ✅
- Faster queries ✅
- Lower cost ✅
📦 How Data is Stored (Partitioned)
Example:
/data/
year=2023/
year=2024/
year=2025/
👉 These folders are partitions
⚙️ How Partition Pruning Works
Spark:
- Reads metadata (folder structure)
- Applies filter on partition column
- Skips unnecessary partitions
✅ Example
df = spark.read.parquet("/data/sales")
df.filter("year = 2024").show()
👉 Spark:
- Skips other folders
- Reads only year=2024
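Conceptually, pruning is just a filter over folder names before any data file is opened. A pure-Python sketch (hypothetical folder list, not Spark's actual metadata handling):

```python
# Partition folders as Spark sees them in the file listing.
partitions = ["year=2022", "year=2023", "year=2024"]

def prune(partitions, column, value):
    """Keep only folders whose partition value matches the filter."""
    wanted = f"{column}={value}"
    return [p for p in partitions if p == wanted]

print(prune(partitions, "year", 2024))  # ['year=2024']: the other folders are never read
```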
🚨 Important Rule
👉 Filter must be on partition column
✔ Works:
df.filter("year = 2024")
❌ Won’t prune:
df.filter("year + 1 = 2025") # applying an expression to the partition column prevents pruning
🔍 Types of Partition Pruning
1. Static Partition Pruning
- Happens at query compile time
- Known filter value
WHERE year = 2024
2. Dynamic Partition Pruning (DPP)
👉 Happens during joins
SELECT *
FROM fact f
JOIN dim d
ON f.id = d.id
WHERE d.country = 'IN'
👉 Spark:
- Filters dim table
- Uses it to prune fact table partitions dynamically
⚡ Example (Join Optimization)
Without DPP:
- Full fact table scan ❌
With DPP:
- Only relevant partitions scanned ✅
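The DPP idea can be sketched in pure Python with toy data (illustrative only, not Spark internals): filter the small dim table first, collect the matching join keys, then scan only the fact partitions for those keys. Here the fact table is assumed to be partitioned by the join key id.

```python
# Toy fact table, physically partitioned by the join key "id".
fact_partitions = {
    "id=1": [{"id": 1, "amount": 100}],
    "id=2": [{"id": 2, "amount": 200}],
    "id=3": [{"id": 3, "amount": 300}],
}
dim = [
    {"id": 1, "country": "IN"},
    {"id": 2, "country": "US"},
    {"id": 3, "country": "UK"},
]

# Step 1: apply the dim filter first (WHERE d.country = 'IN').
keys = {f"id={d['id']}" for d in dim if d["country"] == "IN"}

# Step 2: scan only the fact partitions matching those keys.
scanned = {k: rows for k, rows in fact_partitions.items() if k in keys}
print(sorted(scanned))  # ['id=1']: the other partitions are never read
```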
🔧 Enable DPP
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")  # enabled by default since Spark 3.0
⚠️ When Partition Pruning Fails
❌ Using functions on partition column
❌ Data not partitioned properly
❌ Filtering on non-partition column
❌ Too many small partitions (over-partitioning)
================================================
🔥 What does “writing into partition” mean?
👉 Partitioning while writing = storing data physically in folders based on column values
Instead of one big file:
/data/sales/
part-0001.parquet
👉 With partitioning:
/data/sales/
country=IN/
country=US/
country=UK/
⚙️ How to Write Partitioned Data
✅ Basic Syntax
df.write \
.partitionBy("country") \
.mode("overwrite") \
.parquet("/data/sales")
📦 Output Structure
If data is:
country | year | amount
IN | 2024 | 100
US | 2024 | 200
IN | 2023 | 150
👉 Output:
/data/sales/
country=IN/
part-0001.parquet
country=US/
part-0002.parquet
🧠 Multiple Partition Columns
df.write \
.partitionBy("country", "year") \
.parquet("/data/sales")
👉 Structure:
country=IN/year=2024/
country=IN/year=2023/
country=US/year=2024/
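The folder layout above can be derived from the rows with a pure-Python sketch (illustrative only; Spark also writes the data files inside each folder):

```python
# Rows to be written, mimicking partitionBy("country", "year").
rows = [
    {"country": "IN", "year": 2024, "amount": 100},
    {"country": "US", "year": 2024, "amount": 200},
    {"country": "IN", "year": 2023, "amount": 150},
]

# One Hive-style path per distinct (country, year) combination.
paths = sorted({f"country={r['country']}/year={r['year']}" for r in rows})
for p in paths:
    print(p)
# country=IN/year=2023
# country=IN/year=2024
# country=US/year=2024
```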
🔁 Write Modes
| Mode | Meaning |
|---|---|
| overwrite | Replace existing data |
| append | Add new data |
| ignore | Skip if exists |
| error | Fail if exists |
⚠️ Important Concepts
1. Partition Columns Not Stored Inside File
👉 They are stored as folder names, not inside data
2. High Cardinality Problem
❌ Bad:
.partitionBy("user_id")
👉 Creates millions of folders → performance issue
✔ Good:
- Partition by low-cardinality columns (country, date)
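Why cardinality matters: the number of output folders equals the number of distinct values in the partition column(s). Counting distinct values over a sample (a toy sketch with made-up data) predicts the folder explosion:

```python
# 10,000 synthetic rows: unique user_id, only two countries.
rows = [{"user_id": i, "country": ["IN", "US"][i % 2]} for i in range(10_000)]

by_user = len({r["user_id"] for r in rows})     # folders if partitioned by user_id
by_country = len({r["country"] for r in rows})  # folders if partitioned by country

print(by_user, by_country)  # 10000 2: high cardinality means one folder per user
```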
3. Small File Problem
Too many partitions → too many small files ❌
👉 Fix:
df.repartition(10).write.partitionBy("country").parquet("/data/sales")
4. Repartition Before Writing
df.repartition("country") \
  .write.partitionBy("country") \
  .parquet("/data/sales")
✔ Ensures better distribution
✔ Reduces shuffle issues
🔍 Static vs Dynamic Partition Writing
Static Partition (Hive-style)
INSERT INTO table PARTITION (country='IN')
SELECT ...
👉 Partition value is fixed in the statement
Dynamic Partition
df.write.partitionBy("country")
-- or, in SQL:
INSERT INTO table PARTITION (country)
SELECT ...
👉 Spark derives the partition values from the data
🚀 Best Practices
✔ Partition on frequently filtered columns
✔ Avoid too many partitions
✔ Combine with partition pruning
✔ Use coalesce() to reduce small files
📊 Real Example
df = spark.read.csv("sales.csv", header=True, inferSchema=True)
df.write \
.mode("overwrite") \
.partitionBy("year", "month") \
.parquet("/data/sales")
👉 Query:
df.filter("year = 2024")
👉 Only year=2024 folder is read (partition pruning) 🚀