Prep chatgpt
26 October 2024 12:56
Why do corrupted records occur?
• While reading data (like JSON, CSV, etc.), sometimes records don’t match the expected schema.
• Example: a malformed JSON string, missing columns, or incorrect data types.
How does Spark handle corrupted records?
Spark provides different modes and options to handle them:
1. Using the mode option (CSV, JSON)
• PERMISSIVE (default): Keeps corrupt record, fills missing values with null.
• DROPMALFORMED: Discards corrupted records.
• FAILFAST: Fails immediately if a corrupt record is found.
✅ Example:
df = spark.read \
.option("mode", "PERMISSIVE") \
.json("/path/data.json")
2. Using columnNameOfCorruptRecord
• For JSON/CSV, Spark can capture bad records in a special column.
• Default column name = _corrupt_record.
• You can rename it using spark.sql.columnNameOfCorruptRecord.
✅ Example:
spark.conf.set("spark.sql.columnNameOfCorruptRecord", "bad_data")
df = spark.read \
    .option("mode", "PERMISSIVE") \
    .json("/path/data.json")
df.cache()  # required: Spark disallows querying only the corrupt-record column from raw JSON/CSV
df.select("bad_data").where("bad_data IS NOT NULL").show()
This way, you can analyze corrupted rows separately.
3. Using badRecordsPath (Databricks)
• A Databricks-specific option: instead of keeping corrupted data in _corrupt_record, Spark writes bad records into a separate path (as JSON files).
✅ Example:
df = spark.read \
.option("badRecordsPath", "/path/to/badRecords") \
.json("/path/input/")
Corrupted rows will be saved for debugging, and valid records will load into the DataFrame.
4. Schema Evolution & Casting
• Sometimes records look corrupted because schema inference fails.
• Best practice: define explicit schema instead of relying on inference.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True)
])
df = spark.read.schema(schema).json("/path/data.json")
Interview Tip Answer (short & professional)
“In Spark, corrupted records are handled using different modes: PERMISSIVE (default), DROPMALFORMED, and FAILFAST. For JSON and CSV, we can capture bad records in a special column (_corrupt_record) or, on Databricks, redirect them to a separate path using badRecordsPath. Best practice is to define an explicit schema to minimize schema mismatches.”
JSON file reading
Q1. How do you read and flatten a nested JSON file in Spark?
Answer:
When we read nested JSON in Spark, the schema often contains struct or array types. To flatten it, we use explode() for arrays and dot notation (col("field.subfield")) for structs.
✅ Example:
from pyspark.sql.functions import col, explode
df = spark.read.json("/path/nested.json")
# Flatten struct fields with dot notation
flat_df = df.select(
    col("id"),
    col("user.name").alias("user_name"),
    col("user.address.city").alias("city"),
    col("user.orders").alias("orders")   # keep the array so it can be exploded next
)
# Explode the orders array so each order becomes its own row
exploded_df = flat_df.withColumn("order", explode(col("orders")))
This gives a tabular structure from nested JSON, which is easier for downstream use.
Q2. JSON file has an array of structs inside it. How do you handle it in Spark?
Answer:
If JSON contains an array of structs, I use explode() to convert the array into multiple rows, and then select inner fields with dot notation.
✅ Example:
df = spark.read.json("/path/orders.json")
df2 = df.withColumn("order", explode("orders")) \
.select("id", "order.order_id", "order.amount")
This way, each order becomes a separate row.
Q3. What’s the difference between reading multiline JSON and line-delimited JSON with nested structures?
Answer:
• For line-delimited JSON, each line is a JSON object, so Spark reads it directly.
• For multiline JSON (where objects are spread across multiple lines), we must set .option("multiline", "true").
df = spark.read.option("multiline", "true").json("/path/nested.json")
Without multiline=true, Spark may throw errors or mark records as _corrupt_record.
Q4. How do you handle dynamic or deeply nested JSON where schema changes?
Answer:
For dynamic nested JSON, I prefer these approaches:
1. Use schema_of_json + from_json to derive and apply the schema dynamically (see the sketch after this list).
2. Use get_json_object or from_json with a schema for selective parsing.
3. If schema evolves frequently, I ingest JSON in raw format (Bronze layer), then apply transformations in Silver/Gold layers.
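✅ Example (a minimal sketch of approach 1, assuming the raw JSON arrives in a string column, here named payload, and that the records share one structure):
from pyspark.sql.functions import col, from_json, lit, schema_of_json
# Read raw lines; "payload" is an illustrative column name
raw_df = spark.read.text("/path/raw_json/").withColumnRenamed("value", "payload")
# Derive a DDL schema string from one sample record
sample = raw_df.select("payload").first()[0]
ddl_schema = raw_df.select(schema_of_json(lit(sample))).first()[0]
# Parse every record with the derived schema and expand the struct into columns
parsed_df = raw_df.withColumn("data", from_json(col("payload"), ddl_schema)).select("data.*")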
Q5. Suppose your JSON has multiple nested arrays (e.g., user -> orders -> items). How would you flatten it?
Answer:
I apply multiple levels of explode() until all arrays are flattened.
✅ Example:
df = spark.read.json("/path/nested.json")
df2 = df.withColumn("order", explode("user.orders")) \
.withColumn("item", explode("order.items")) \
.select("user.id", "order.order_id", "item.item_id", "item.price")
This gives a fully flattened dataset with user, order, and item details.
✅ Interview Tip:
When answering nested JSON questions, always mention:
• Use of explode for arrays.
• Dot notation for struct fields.
• multiline=true for multiline JSON.
• Best practice: Define schema explicitly instead of relying on inference.
Q1. What is Catalyst Optimizer / Spark SQL Engine?
The Catalyst Optimizer is the query optimization engine in Spark SQL. It transforms logical plans into optimized logical plans using rule-based and cost-based optimization. Finally, it generates a physical plan, which Spark executes as RDD operations.
Q2. Why do we get AnalysisException error?
An AnalysisException occurs when Spark fails to resolve references during query analysis — common reasons include missing columns, invalid table names, schema mismatches, or
accessing unregistered temporary views. It happens in the analysis phase before execution.
Q3. What is Catalog?
A Catalog is Spark’s metadata store that keeps information about databases, tables, views, and functions. It is used by the analyzer to resolve references in queries. In Databricks with
Unity Catalog, it also manages permissions and data governance.
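✅ Example (a small sketch of inspecting the Catalog through the PySpark catalog API; the database name is illustrative):
spark.sql("CREATE DATABASE IF NOT EXISTS demo_db")
print(spark.catalog.currentDatabase())      # database the analyzer resolves unqualified names against
print(spark.catalog.listDatabases())        # databases registered in the catalog
print(spark.catalog.listTables("demo_db"))  # tables and views in a given database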
Q4. What is Physical Planning / Spark Plan?
Physical Planning is the process where Spark converts an optimized logical plan into one or more physical plans (execution strategies). Spark chooses the best plan using cost-based
optimization, and the final selected plan is called the SparkPlan, which executes on the cluster.
Q5. Is Spark SQL Engine a Compiler?
Yes, Spark SQL Engine works like a compiler. It parses SQL queries, generates logical and physical plans, applies optimizations, and finally compiles them into Java bytecode that runs on the JVM over Spark’s execution engine.
Q6. How many phases are involved in Spark SQL engine to convert code into Java bytecode?
There are four main phases:
1. Parsing → SQL/DSL is converted into an unresolved logical plan.
2. Analysis → References are resolved using the Catalog (resolved logical plan).
3. Optimization → Catalyst Optimizer applies rules to create an optimized logical plan.
4. Physical Planning → Generates the best execution plan and compiles it into Java bytecode for execution.
✅ Interview Tip:
Answer in a structured way — Logical Plan → Optimized Plan → Physical Plan → Execution. It shows deep understanding and practical experience.
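✅ Example (a quick sketch of viewing these phases on any DataFrame; explain(mode=...) requires Spark 3.0+):
df = spark.range(10).selectExpr("id", "id * 2 AS doubled").filter("doubled > 5")
# "extended" prints the parsed, analyzed, and optimized logical plans plus the physical plan
df.explain(mode="extended")
# "formatted" prints a cleaner physical plan with per-operator details
df.explain(mode="formatted")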
1. What is Parquet file format?
Parquet is a columnar storage format designed for big data processing. Unlike row-based formats (CSV, JSON), it stores data column-wise, which improves
compression and speeds up analytical queries by reading only the required columns.
2. Why do we need Parquet?
We need Parquet because it:
• Reduces storage cost with efficient compression and encoding.
• Improves performance with column pruning and predicate pushdown.
• Is highly compatible with big data tools like Spark, Hive, ADF, and Databricks.
3. How to read Parquet file?
• In Spark (PySpark): df = spark.read.parquet("path")
• In Pandas: pd.read_parquet("path")
• In SQL engines: Register Parquet tables and query directly.
4. What makes Parquet default choice?
• Columnar storage → faster analytical queries.
• Efficient compression & encoding → smaller storage footprint.
• Schema evolution support → handles changing data structures.
• Compatibility with Hadoop ecosystem & cloud platforms → widely adopted standard.
5. What encoding is done on data?
Parquet uses multiple encodings to save space and improve performance, such as:
• Dictionary Encoding – replaces repeated values with integer keys.
• Run-Length Encoding (RLE) – compresses repeated sequences.
• Delta Encoding – stores differences instead of raw values.
6. What compression techniques are used?
Parquet supports compression codecs like:
• Snappy (default) → fast, balanced.
• Gzip → higher compression, slower.
• LZO, Brotli, ZSTD → trade-offs between speed and compression.
7. How to optimize the Parquet file?
• Use appropriate partitioning and bucketing in data lake.
• Apply ZORDER BY or clustering (Delta Lake/Databricks) for better data skipping.
• Use column pruning by selecting only required columns.
• Use an efficient compression codec depending on the workload (see the write sketch below).
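✅ Example (a minimal sketch of partitioning plus codec choice when writing Parquet from some DataFrame df; the path and column names are illustrative):
(df.write
   .mode("overwrite")
   .partitionBy("event_date")          # enables partition pruning on a common filter column
   .option("compression", "zstd")      # pick a codec to match the workload
   .parquet("/path/output/events"))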
8. What is row group, column, and page?
• Row Group: Logical chunk of rows; all column data for these rows is stored together.
• Column Chunk: Data of a single column within a row group.
• Page: Smallest storage unit (usually 8KB–1MB), storing actual encoded values.
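✅ Example (a sketch of inspecting row groups and column chunks with PyArrow rather than Spark; the file path is illustrative):
import pyarrow.parquet as pq
meta = pq.ParquetFile("/path/output/part-00000.parquet").metadata
print(meta.num_row_groups, meta.num_rows, meta.num_columns)
rg = meta.row_group(0)       # first row group
col_chunk = rg.column(0)     # first column chunk in that row group
print(col_chunk.path_in_schema, col_chunk.compression, col_chunk.total_compressed_size)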
9. How do projection pruning and predicate pushdown work?
• Projection pruning → Reads only the selected columns instead of full dataset. Example: SELECT name FROM table will not read other columns.
• Predicate pushdown → Filters data at storage level before reading into memory. Example: WHERE age > 30 will skip irrelevant row groups.
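✅ Example (a sketch with an illustrative path and columns; both optimizations show up in the FileScan node of the plan):
df = (spark.read.parquet("/path/people")
      .filter("age > 30")     # predicate pushdown: row groups failing the filter can be skipped
      .select("name"))        # projection pruning: only the needed column chunks are read
df.explain()                  # look for ReadSchema and PushedFilters in the scan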
Join Strategies in Spark
1. Shuffle Sort-Merge Join (SMJ)
• Spark shuffles both datasets on join keys, sorts them, and then merges.
• Best for: Large datasets when both sides are big and sorted joins are efficient.
• Downside: Expensive shuffle & sort → can be slow.
2. Shuffle Hash Join
• Spark shuffles both datasets on join keys, then builds a hash table on one side and probes with the other.
• Best for: Medium-sized tables when one side can fit in memory after shuffle.
• Faster than SMJ if data distribution is good.
3. Broadcast Hash Join (BHJ)
• Spark broadcasts the smaller dataset to all executors, builds a hash table in memory, and probes with the larger dataset.
• Best for: Joining a very small dataset with a large dataset.
• Very fast (avoids shuffle), but the small dataset must fit in memory.
4. Cartesian Join
• Produces cross join (every row from left × every row from right).
• Best for: Rare cases like generating combinations.
• Downside: Extremely expensive for large datasets (row explosion).
5. Broadcast Nested Loop Join (BNLJ)
• Spark broadcasts the smaller dataset and does a nested loop scan with the larger dataset.
• Best for: When there is no join condition (e.g., cross join with filters).
• Downside: Very costly if data is large.
✅ Summary for Interviews:
• Broadcast Hash Join → fastest when one dataset is small.
• Shuffle Sort-Merge Join → default for large joins.
• Shuffle Hash Join → good alternative when memory is available.
• Cartesian / Nested Loop Joins → only for special cases, usually avoided.
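✅ Example (a sketch using hypothetical fact_df/dim_df DataFrames; the hint names come from Spark 3.0+ join strategy hints):
from pyspark.sql.functions import broadcast
# Force a Broadcast Hash Join when the dimension table is known to be small
fact_df.join(broadcast(dim_df), "customer_id").explain()
# Strategy hints let you prefer a specific join without resizing data
fact_df.join(dim_df.hint("merge"), "customer_id")          # prefer Shuffle Sort-Merge Join
fact_df.join(dim_df.hint("shuffle_hash"), "customer_id")   # prefer Shuffle Hash Join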
What is OOM in Spark?
OOM stands for Out Of Memory. It happens when Spark’s driver or executor runs out of allocated memory and cannot store more objects
(RDD/DataFrame, shuffle data, broadcast variables, etc.). This usually causes the job to fail with an OutOfMemoryError.
Why do we get driver OOM?
Driver OOM happens when too much data is collected or stored in the driver.
Examples:
• Using .collect() on huge datasets.
• Large result sets being sent back to the driver.
• Storing large broadcast variables or metadata in the driver’s memory.
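✅ Example (a sketch with a hypothetical big_df showing the anti-pattern and safer alternatives):
# Anti-pattern: pulls every row into the driver and can OOM it
all_rows = big_df.collect()
# Safer: keep the data distributed, or bound what reaches the driver
big_df.write.mode("overwrite").parquet("/path/output/")     # write out instead of collecting
preview = big_df.limit(20).collect()                        # bounded preview only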
What is driver overhead memory?
Driver (and executor) overhead memory is extra memory reserved outside the JVM heap to handle things like:
• Garbage collection (GC)
• Spark internal metadata
• Off-heap allocations and user data structures
If this overhead memory is insufficient, Spark may still throw OOM even if heap looks free.
Common reasons to get a driver OOM
• Running collect(), take(), show() on very large DataFrames.
• Improper caching of large datasets in the driver.
• Storing huge logs, metrics, or accumulators in memory.
• Broadcasting very large variables to executors.
How to handle OOM
• Avoid collect() — use actions like take(), limit() or write to storage.
• Increase driver/executor memory using configs (--driver-memory, --executor-memory).
• Use persist() / cache() wisely (only when reused).
• Use broadcast joins only when small enough to fit memory.
• Optimize partitions to balance shuffle and avoid skew.
• Tune spark.memory.fraction and spark.executor.memoryOverhead for workloads.
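✅ Example (a sketch of the settings above; values are placeholders to tune, and driver memory itself normally has to be set via spark-submit --driver-memory before the driver JVM starts):
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("oom-tuning-sketch")
         .config("spark.executor.memory", "8g")
         .config("spark.executor.memoryOverhead", "2g")   # off-heap / overhead headroom
         .config("spark.memory.fraction", "0.6")          # share of heap for execution + storage
         .getOrCreate())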