Spark SQL Optimization

✅ Case Study 1: Optimize JOIN Queries in Spark SQL

🔷 Topic: Join Optimization


📌 Explanation:
Join operations are expensive in Spark due to shuffling data across nodes. You can
optimize joins by:

• Using broadcast joins for small tables.


• Controlling join types (broadcast, sort-merge, shuffle hash); see the hint sketch after this list.
• Ensuring partitioning and data skew are handled.
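
For completeness, Spark 3.0+ also exposes hints for the other join strategies. A minimal sketch of the SQL hint syntax, assuming a SparkSession named spark and the users/countries views registered in the use case below:

# Hint Spark toward a specific join strategy (BROADCAST, MERGE, SHUFFLE_HASH,
# and SHUFFLE_REPLICATE_NL are the documented hint names)
spark.sql("""
SELECT /*+ SHUFFLE_HASH(c) */
       u.user_id, c.country_name
FROM users u
JOIN countries c
ON u.country_code = c.country_code
""")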

📊 Use Case:
You have a users table with 10M records and a country_codes table with only 200
records. You want to join them to get the country name for each user.

❌ Unoptimized Spark SQL:


users_df.createOrReplaceTempView("users")
countries_df.createOrReplaceTempView("countries")
# Regular join without optimization
[Link]("""
SELECT u.user_id, [Link], c.country_name
FROM users u
JOIN countries c
ON u.country_code = c.country_code
""")

⚠️ Problem:
This causes a shuffle join (both datasets are shuffled), which is inefficient because
countries is small and doesn’t need to be shuffled.

✅ Optimized Spark SQL: Use Broadcast Join


# Broadcast the smaller table
from pyspark.sql.functions import broadcast

optimized_df = users_df.join(
    broadcast(countries_df),
    users_df.country_code == countries_df.country_code,
    "inner"
)

optimized_df.select("user_id", "name", "country_name").show()

✅ Alternate (SQL syntax):


[Link]("""
SELECT /*+ BROADCAST(c) */
u.user_id, [Link], c.country_name
FROM users u
JOIN countries c
ON u.country_code = c.country_code
""")

🔍 Why This Works:


• Broadcast join sends the small table (countries) to all executors.
• Avoids shuffle — faster performance, especially with large users dataset.
• Works best when one table is small (<10 MB by default) — configurable.
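
If the smaller table is a bit above the default limit, the automatic broadcast threshold can be raised (or broadcasting disabled entirely). A minimal sketch, assuming a SparkSession named spark:

# Raise the automatic broadcast threshold from the 10 MB default to 50 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))
# Setting it to -1 disables automatic broadcast joins altogether
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")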

📈 Performance Improvement:
Metric Unoptimized Optimized (Broadcast)
Shuffle Read Size High Low
Join Time ~12s ~3s
Stage Failures Possible Fewer


✅ Case Study 2: Predicate Pushdown in Spark SQL
🔷 Topic: Predicate Pushdown
📌 Explanation:
Predicate Pushdown means applying filters at the data source level, so that Spark reads
only the relevant data. This:

• Reduces I/O and network usage.


• Speeds up query execution.
• Works best with formats like Parquet, ORC, and JDBC sources.
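
Pushdown also applies to JDBC sources: filters on a JDBC-backed DataFrame are translated into the remote WHERE clause, so only matching rows leave the database. A minimal sketch with placeholder connection details (the URL, table, and credentials are illustrative):

# Read a table over JDBC; the later filter is pushed into the database query
orders = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://db-host:5432/shop")  # placeholder URL
    .option("dbtable", "public.orders")                    # placeholder table
    .option("user", "reader")
    .option("password", "secret")
    .load()
)
recent = orders.filter("order_date >= '2024-01-01'")
recent.explain()  # look for PushedFilters in the scan node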

📊 Use Case:
You're querying a Parquet dataset of 1 TB of sales data, but only need records from
January 2024.

❌ Unoptimized Spark SQL:


# Reading full file then filtering
df = spark.read.parquet("s3://data-lake/sales/")
filtered = df.filter("sale_date >= '2024-01-01' AND sale_date < '2024-02-01'")
filtered.select("sale_id", "amount").show()

⚠️ Problem:
If schema inference is triggered or data is cached early, Spark may read the entire 1 TB,
then filter in memory — wasting I/O and time.
✅ Optimized Spark SQL (Pushdown Enabled):
# Apply filter during read itself (pushdown)
df = spark.read \
    .option("basePath", "s3://data-lake/sales/") \
    .parquet("s3://data-lake/sales/year=2024/month=01/")

df.select("sale_id", "amount").show()

Or, using a path and partition filter:

df = spark.read.parquet("s3://data-lake/sales/")
filtered = df.filter("year = 2024 AND month = 1")
filtered.select("sale_id", "amount").show()

🔍 Why This Works:


• Pushdown filters before loading data into Spark.
• Works best when the data is partitioned by date fields.
• Spark reads only year=2024/month=01/, skipping the rest.
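
A quick way to confirm that pruning actually happened is to inspect the physical plan. A minimal sketch, assuming the partitioned sales layout from the snippet above:

df = spark.read.parquet("s3://data-lake/sales/")
filtered = df.filter("year = 2024 AND month = 1")

# The Parquet scan node should list PartitionFilters (and PushedFilters for
# non-partition columns) instead of scanning every file
filtered.explain(True)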

📈 Performance Improvement:
Metric Unoptimized Optimized (Pushdown)
Data Read 1 TB ~80 GB (Jan only)
Read Time ~90s ~12s
CPU Usage High Low
✅ Case Study 3: Caching and Persistence
🔷 Topic: Caching and Persistence
📌 Explanation:
If a DataFrame is reused multiple times in a pipeline or across queries, caching avoids
recomputation.
Use .cache() or .persist(StorageLevel) to store it in memory or disk.

📊 Use Case:
You run 5 analytics queries on a heavy transformation of a 100M row DataFrame.

❌ Unoptimized:
from pyspark.sql.functions import col

# Expensive transformation computed 5 times
transformed = df.withColumn("net_price", col("price") * (1 - col("discount")))
transformed.filter("category = 'electronics'").count()
transformed.groupBy("category").agg({"net_price": "avg"}).show()
# ... and 3 more actions

✅ Optimized:
from pyspark import StorageLevel
from pyspark.sql.functions import col

# Cache after first transformation
transformed = df.withColumn("net_price", col("price") * (1 - col("discount"))).cache()

# Run queries
transformed.filter("category = 'electronics'").count()
transformed.groupBy("category").agg({"net_price": "avg"}).show()
# ... other queries

Or use .persist(StorageLevel.MEMORY_AND_DISK) if memory is tight.
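
A minimal sketch of the persist variant, including the cleanup step once the queries have run (reusing the transformed DataFrame from above):

from pyspark import StorageLevel

# Spill to disk instead of dropping blocks when executor memory is tight
transformed = transformed.persist(StorageLevel.MEMORY_AND_DISK)
transformed.count()  # materialize the cache once

# ... run the analytics queries ...

transformed.unpersist()  # free memory/disk once the pipeline is done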

🔍 Why This Works:


• Without caching: Spark recomputes lineage for each action.
• With caching: Transformation is computed once, reused efficiently.

📈 Performance Improvement:
Metric Unoptimized Optimized (Cached)
Total Time (5 queries) ~180s ~60s
CPU Load High Lower
Memory Usage Low Higher (intentional)

✅ Case Study 4: Skew Join Optimization in Spark SQL
🔷 Topic: Skew Join Handling
📌 Explanation:
Data skew occurs when one or more keys in a join have disproportionately more rows
than others. This causes:

• One executor to do most of the work.


• Long-running stages and uneven load.
• Possible out-of-memory errors.

📊 Use Case:
You are joining a transactions table (2B records) with a merchants table (500K records).
But 60% of the transactions belong to a single merchant.

❌ Unoptimized Spark SQL:


# Skewed join on merchant_id
transactions_df.join(merchants_df, "merchant_id").select("txn_id", "merchant_name").show()

⚠️ Problem:
• Most merchant_ids are balanced.
• One merchant_id (say, M12345) appears 1.2B times.
• This causes a hot partition, poor performance, and executor OOM.
✅ Optimized Spark SQL:
Option 1: Salting the skewed key (manual skew fix)

from pyspark.sql.functions import array, col, concat_ws, explode, lit, rand, when

# Salt only the skewed key in transactions; normal keys keep merchant_id as-is
salted_txns = transactions_df.withColumn(
    "skewed_key",
    when(
        col("merchant_id") == "M12345",
        concat_ws("_", col("merchant_id"), (rand() * 10).cast("int").cast("string"))
    ).otherwise(col("merchant_id"))
)

# Duplicate the skewed merchant row 10 times, one per salt value (salt replication)
replicated_merchants = (
    merchants_df
    .filter(col("merchant_id") == "M12345")
    .withColumn("salt", explode(array([lit(str(i)) for i in range(10)])))
    .withColumn("skewed_key", concat_ws("_", col("merchant_id"), col("salt")))
    .drop("salt")
)

# Normal keys remain the same
normal_merchants = (
    merchants_df
    .filter(col("merchant_id") != "M12345")
    .withColumn("skewed_key", col("merchant_id"))
)

# Union replicated and normal merchants
final_merchants = replicated_merchants.unionByName(normal_merchants)

# Final join on the salted key
result = salted_txns.join(final_merchants, "skewed_key").select("txn_id", "merchant_name")
Option 2: Set Spark skew optimization configs (automatic in Spark 3.0+):

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

🔍 Why This Works:


• Salting splits the skewed key into multiple smaller keys, balancing load.
• Adaptive Skew Join (in Spark 3.0+) automatically detects and splits large partitions
at runtime.
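
Adaptive skew handling can also be tuned. A minimal sketch of the relevant Spark 3.x settings (the values shown mirror the usual defaults; adjust for your data):

# A partition is treated as skewed when it is both this many times larger than
# the median partition size...
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
# ...and larger than this absolute size threshold
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")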

📈 Performance Improvement:
Metric Unoptimized Optimized (Salting)
Runtime ~200s ~60s
Stage Failure Risk High Low
Executor Memory Load Imbalanced Balanced

✅ Case Study 5: File Format + Partition Pruning in Spark SQL

🔷 Topic: File Format and Partition Pruning


📌 Explanation:
Two key performance boosters:
• Columnar Formats: Use Parquet/ORC instead of CSV/JSON.
• Partition Pruning: Read only needed partitions based on query filters.

📊 Use Case:
Reading product catalog data partitioned by category and brand.

❌ Unoptimized Spark SQL:


# CSV read, no pruning
df = spark.read.csv("/mnt/products/", header=True)
df.filter("category = 'Electronics' AND brand = 'Samsung'").show()

⚠️ Problem:
• CSV is row-based — slow read, poor compression.
• No pruning — reads entire folder structure.

✅ Optimized Spark SQL:


# Use Parquet and filters
df = spark.read.parquet("/mnt/products/")
# Use partition pruning with filters
result = df.filter("category = 'Electronics' AND brand = 'Samsung'")
result.select("product_id", "price").show()

Or, directly specify paths:


# Read specific partitions (best for large data)
df = spark.read.parquet("/mnt/products/category=Electronics/brand=Samsung/")

🔍 Why This Works:


• Parquet is a compressed, columnar format — faster read and scan.
• Spark prunes directory partitions at read time using filter predicates.
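
Partition pruning only works if the data was written with a partitioned layout in the first place. A minimal sketch of producing that layout (raw_products_df is an illustrative source DataFrame; the path reuses the example above):

# Write the catalog as Parquet, partitioned by the columns most queries filter on.
# This creates /mnt/products/category=.../brand=... directories that Spark can prune.
(
    raw_products_df.write
    .partitionBy("category", "brand")
    .mode("overwrite")
    .parquet("/mnt/products/")
)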

📈 Performance Improvement:
Metric CSV + No Pruning Parquet + Pruning
Load Time ~70s ~6s
Disk Read Full Dataset Only Needed Parts
Compression Ratio Low High


✅ Case Study 6: Delta Lake Optimization in Spark SQL

🔷 Topic: Delta Lake Optimization (ZORDER, Data Skipping, Vacuum, Compaction)

📌 Explanation:
Delta Lake is a storage layer that brings ACID transactions and schema enforcement to
Spark. But to make queries faster and scalable, you must use:

• ZORDER: Optimizes data layout for faster filtering on specific columns.


• Data Skipping: Leverages statistics to avoid scanning unnecessary files.
• Vacuum: Cleans up stale files.
• Compaction: Merges many small files into large ones for performance.

📊 Use Case:
You manage a Delta table at /delta/events/ with 5 years of IoT event data. Most queries filter on device_id and event_date.

❌ Unoptimized Delta Lake Usage:


# Query without ZORDER or compaction
df = spark.read.format("delta").load("/delta/events/")
df.filter("device_id = 'D1002' AND event_date = '2023-08-01'").count()

⚠️ Problem:
• Query scans many small files (~millions).
• No data clustering → slow scans even if partitions exist.
✅ Optimized Delta Lake Usage:
Step 1: Compaction (Coalescing Files)

# Coalesce into fewer files


(
    spark.read.format("delta").load("/delta/events/")
    .repartition(10)  # Tune as needed
    .write
    .option("dataChange", "false")
    .format("delta")
    .mode("overwrite")
    .save("/delta/events/")
)

Step 2: Z-Ordering on Filter Columns

OPTIMIZE delta.`/delta/events/` ZORDER BY (device_id, event_date)

Note: OPTIMIZE and ZORDER BY are available on Databricks and in open-source Delta Lake 2.0+.
Step 3: Vacuum Old Files

VACUUM delta.`/delta/events/` RETAIN 168 HOURS

🔍 Why This Works:


• ZORDER clusters column values across files to reduce file scans.
• Data skipping uses min/max stats to skip irrelevant files.
• Vacuum deletes obsolete files — keeps storage clean.
• Compaction improves read performance and parallelism.
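
On Delta Lake 2.0+ (the delta-spark package) the same maintenance can be driven from Python instead of SQL. A minimal sketch, reusing the /delta/events/ path and assuming a SparkSession named spark:

from delta.tables import DeltaTable

events = DeltaTable.forPath(spark, "/delta/events/")

# Compact small files and cluster them on the common filter columns
events.optimize().executeZOrderBy("device_id", "event_date")

# Delete files no longer referenced by the table and older than 168 hours
events.vacuum(168)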
📈 Performance Improvement:
Metric Unoptimized Optimized (ZORDER + Compact)
Query Time ~120s ~8s
Files Scanned ~800K ~100
Disk IO High Minimal

✅ Case Study 7: Aggregation Optimization in Spark SQL

🔷 Topic: Aggregation Tuning


📌 Explanation:
Aggregations can be costly, especially on large datasets. You can optimize them via:

• Partial aggregation (map-side combine)


• Approximate aggregations (approx_count_distinct)
• Efficient group keys (avoid high-cardinality)

📊 Use Case:
You analyze user behavior on an e-commerce site with 5 billion click records and need to:

• Count distinct users per region


• Calculate total time spent per session

❌ Unoptimized:
# Heavy exact aggregation
from pyspark.sql.functions import countDistinct, sum

df.groupBy("region").agg(
    countDistinct("user_id").alias("unique_users"),
    sum("session_time").alias("total_time")
).show()

✅ Optimized:
Option 1: Use Approximate Aggregation for Large Data

from pyspark.sql.functions import approx_count_distinct, sum

df.groupBy("region").agg(
    approx_count_distinct("user_id").alias("unique_users"),
    sum("session_time").alias("total_time")
).show()

Option 2: Rely on Map-Side Combine (Partial Aggregation)

Spark SQL's hash aggregation already performs a partial (map-side) aggregation for built-in aggregate functions before the shuffle, so each map task sends only one pre-aggregated row per group key instead of every input row.

Option 3: Use Bucketed Tables if grouping by the same column repeatedly

CREATE TABLE user_logs_bucketed
USING PARQUET
CLUSTERED BY (region) INTO 32 BUCKETS
AS SELECT * FROM user_logs

🔍 Why This Works:


• Approximate methods use probabilistic data structures (like HyperLogLog++).
• Map-side combine reduces shuffle size by aggregating locally first.
• Bucketing helps Spark avoid full shuffle during grouping.
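
The accuracy/speed trade-off of the approximate count is tunable through the rsd (relative standard deviation) argument, which defaults to 0.05 (roughly 5% error). A minimal sketch:

from pyspark.sql.functions import approx_count_distinct, sum

# Tighter rsd means more accuracy but more memory per group
df.groupBy("region").agg(
    approx_count_distinct("user_id", rsd=0.01).alias("unique_users"),
    sum("session_time").alias("total_time")
).show()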

📈 Performance Improvement:
Metric Unoptimized Optimized
Aggregation Time ~90s ~12s
Memory Usage High Lower
Shuffle Data Large Reduced

✅ Summary of All Optimization Techniques So Far


Case Study Key Technique
Join Optimization Broadcast joins
Predicate Pushdown Filtering during read
Caching and Persistence Avoid recomputation
Skew Join Optimization Salting / Adaptive skew join
File Format & Partition Pruning Use Parquet + prune with filters
Delta Lake Optimization ZORDER, Vacuum, Compaction
Aggregation Optimization Map-side, approximate, bucketing
