Scenario-Based Questions and Answers (200+)

Prepare for your Data Engineering interviews with 200+ scenario-based questions
covering key concepts in PySpark and Databricks:
PySpark Topics:
➢ Creating DataFrames & Schema Addition: Build and define schemas for structured
data.
➢ Loading Activities: Loading data from CSV and JSON sources.
➢ Column Functions: Master operations like select, selectExpr, filter, withColumn,
withColumnRenamed, and more.
➢ Handling Data: Learn how to handle distinct values, drop duplicates, orderBy,
groupBy, and use fillna for null handling.
➢ Sorting & Limiting: Use orderBy and limit effectively.
➢ String Functions: Handle text data with functions like concat, split, trim, and other
string manipulation techniques.
➢ Joins & Window Functions: Understand joins (inner, outer, etc.) and window
functions for advanced transformations.
➢ Cast Functions & Union: Master type casting, union, and unionAll operations.
➢ Repartition & Coalesce: Efficient partition management for optimized performance.
➢ Broadcast Variables: Improve performance with broadcasting in large datasets.
➢ Spark Architecture: Focus on partitioning, bucketing, join optimizations, and overall
performance improvements.
Databricks Topics:
➢ Delta Tables: Deep dive into Delta Lake features like ACID transactions, schema
evolution, and time travel.
➢ Structured Streaming: Stream real-time data with ease.
➢ Auto Loader: Automate data ingestion for real-time updates.
➢ Unity Catalog: Manage your data governance and security efficiently with Unity
Catalog.
➢ Delta Live Tables (DLT): Capture data lineage and maintain transparency in your
pipelines.
How to Use the Material:
1. Start from the Basics: Begin with easy questions, then progress to more advanced
scenarios as you gain confidence.
2. PySpark is a Must: Make sure you cover all PySpark-related topics thoroughly, as they
form the foundation of data engineering roles.
3. Structured Streaming & DLT: Optional for coding practice but a great bonus if you
want to specialize further in real-time data processing.

Check the material below on Topmate for notes and referrals:


1. Python 200+ interview questions and answers - https://topmate.io/shivakiran_kotur/1337666
2. Spark quick notes and PySpark basics - https://topmate.io/shivakiran_kotur/1244562
3. ADF basic and important scenario-based Q&A -
https://topmate.io/shivakiran_kotur/1244608
4. Azure Data Engineer must-do questions before the interview -
https://topmate.io/shivakiran_kotur/1244480
5. DP-203 quick notes and dump question screenshots -
https://topmate.io/shivakiran_kotur/1214127
6. Have any doubts? Connect with me 1:1 on Topmate:
https://topmate.io/shivakiran_kotur/1208377

I kindly request you to share your feedback as you go through the questions. Your insights
will help me tremendously in improving the material. Also, feel free to ask for additional
resources or clarification wherever needed. Your journey to becoming a Data Engineering
expert is just a few steps away!
1. How would you read data from a CSV file in batch mode in PySpark on Databricks?
Answer: To read data from a CSV file in batch mode, you can use the spark.read.format()
API. Here’s an example:
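A minimal sketch (the file path and options are illustrative):
df = (spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("/mnt/data/sales.csv"))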

2. How can you read multiple JSON files in batch mode?


Answer: You can use a directory path or wildcard pattern to read multiple JSON files.
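For example, pass a wildcard pattern or an explicit list of paths (paths are illustrative):
df = spark.read.format("json").load("/mnt/data/json/*.json")
# or an explicit list of files
df = spark.read.json(["/mnt/data/json/day1.json", "/mnt/data/json/day2.json"])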

3. How do you handle multi-line JSON or nested JSON records while reading in PySpark?
Answer: For JSON files with multi-line records, set the multiLine option to true.
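A short sketch, assuming an illustrative file path:
df = (spark.read.format("json")
      .option("multiLine", "true")
      .load("/mnt/data/nested.json"))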

4. How do you read Parquet files with schema merging in batch mode?
Answer: Enable mergeSchema to handle schema evolution when reading Parquet files.
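For example (the directory path is illustrative):
df = (spark.read.format("parquet")
      .option("mergeSchema", "true")
      .load("/mnt/data/parquet_dir/"))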
5. How can you read Avro files in batch mode in Databricks?
Answer: Ensure that the avro package is available, and use the format avro to read the files.

6. How do you read ORC files in PySpark in batch mode?


Answer: Use the format orc to read ORC files.

7. How do you read a Delta table in Databricks in batch mode?


Answer: Delta tables can be read using the format delta.
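A minimal sketch, assuming an illustrative table path and table name:
df = spark.read.format("delta").load("/mnt/delta/sales")
# or, if the table is registered in the metastore
df = spark.read.table("sales_delta")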

8. How do you read XML files in batch mode in Databricks?


Answer: Use the spark-xml package to read XML files.
9. How do you read Excel files in batch mode?
Answer: Use the com.crealytics.spark.excel package to read Excel files.

10. How do you read binary files in batch mode?


Answer: Use the binaryFile format to read binary files, such as images.

11. How do you handle files with missing values while reading in batch mode?
Answer: You can set nullValue and nanValue options to handle missing data.
12. How do you read multiple file types (e.g., CSV, JSON) from a single directory?
Answer: PySpark doesn’t support mixed file types directly. You can filter files by extension
using Python and then read them.

13. How would you handle reading data from multiple files in batch processing?
Answer: To read data from multiple files, you can provide a path with wildcards or a list of
file paths. For example, to read all CSV files in a directory:
*.csv: Reads all CSV files in the specified directory.

14. How can you read data from an external database (e.g., SQL) in batch mode?
Answer: To read data from an external database (like MySQL or PostgreSQL) in batch mode,
use the jdbc() method:
• option("url", jdbc_url): Specifies the JDBC URL to connect to the database.
• option("dbtable", "table_name"): Specifies the table to read from.

15. How can you handle data partitioning when reading large datasets in batch mode?
Answer: To optimize the reading of large datasets, you can use the partitionBy option or
specify how data should be split into partitions using the spark.read.option() API:

repartition(10): Creates 10 partitions for the data. You can use coalesce() to reduce
partitions when writing data.
16. How do you read data from a CSV file where the schema is already known in batch
mode?
Answer: When you know the schema beforehand, you can define the schema using
StructType and StructField, and pass it to the spark.read method.
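A sketch with an assumed three-column schema:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_name", StringType(), True),
    StructField("amount", DoubleType(), True),
])

df = (spark.read.format("csv")
      .option("header", "true")
      .schema(schema)
      .load("/mnt/data/orders.csv"))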
schema(schema): Passes the schema to avoid inferring schema, which can be slow for large
datasets.
17. How would you read a large CSV file with multiple delimiters in batch mode?
Answer: To read a CSV file with multiple delimiters, you can use a custom delimiter or
regular expressions to parse the file properly.

option("delimiter", "|"): Specifies the delimiter used in the CSV file (could be a comma,
tab, etc.).
18. How would you read data from a CSV file in Databricks and infer the schema
automatically?
Answer: To read data from a CSV file with automatic schema inference, you can use the
option("inferSchema", "true") in Databricks.
• option("inferSchema", "true"): Enables automatic schema inference.
• option("header", "true"): Treats the first row as column names.
Note: Using inferSchema in PySpark can be inefficient for large datasets because it requires
scanning the entire dataset (or a significant portion) to determine the schema, which adds
overhead. It's better to define the schema explicitly for better performance and to ensure
consistency.
19. What are the different read modes in PySpark, and how do they handle bad data?
Answer: PySpark provides the following read modes to handle corrupt or missing data when
reading files:
• PERMISSIVE (default): Loads all records and puts corrupt records in a separate
column (e.g., _corrupt_record for JSON).
• DROPMALFORMED: Discards bad records and loads only valid records.
• FAILFAST: Fails the job if any corrupt records are encountered.
20. How do you use FAILFAST mode to ensure data integrity while reading a CSV file?
Answer: The FAILFAST mode ensures that the job fails if any row contains bad or corrupt
data, preserving data quality.
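For example (the path is illustrative):
df = (spark.read.format("csv")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .load("/mnt/data/sales.csv"))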

21. In what scenarios would you use DROPMALFORMED mode?


Answer: Use DROPMALFORMED mode when you want to skip rows with corrupt data
without interrupting the job execution.
Example with a CSV file:
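A minimal sketch with an illustrative path:
df = (spark.read.format("csv")
      .option("header", "true")
      .option("mode", "DROPMALFORMED")
      .load("/mnt/data/sales.csv"))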

22. How can you handle corrupt JSON records when reading a file in PySpark?
Answer: You can use the default PERMISSIVE mode to handle corrupt JSON records. It
places problematic records in a special column called _corrupt_record.

23. What happens if you don’t specify a read mode in PySpark?


Answer: If no read mode is specified, PySpark defaults to PERMISSIVE mode. It loads all
data and puts corrupt records in a special column like _corrupt_record.
Example:
24. How do you handle malformed records in a Parquet file?
Answer: Since Parquet is a columnar format, it doesn't allow corrupt rows. However, you
can specify a schema to ensure that any unexpected data types cause a failure.

25. How do you handle empty files while reading in PySpark?


Answer: You can use the ignoreCorruptFiles configuration to skip empty or corrupted files.

26. What is the purpose of badRecordsPath in Databricks, and how do you use it?
Answer: The badRecordsPath option in Databricks (or Spark) is used to handle malformed or
corrupt records during reading data. When enabled, any records that fail to meet the
expected schema or format (e.g., improperly formatted JSON or CSV rows) are stored in a
separate directory specified by badRecordsPath. This helps in isolating problematic data for
debugging or further inspection without interrupting the processing of valid data.
Example Usage:
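A sketch using the file and bad-records directory mentioned below (the directory prefix for sample.csv is illustrative):
df = (spark.read.format("csv")
      .option("header", "true")
      .option("badRecordsPath", "/mnt/bad_records")
      .load("/mnt/data/sample.csv"))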
If any rows in the sample.csv file are malformed or fail to parse correctly, they will be saved
to /mnt/bad_records for further review. This allows you to identify and resolve issues with
the bad records without affecting the overall data processing pipeline.

27. How would you write a DataFrame to a Delta table in batch processing?
Answer: To write data to a Delta table, you use the write.format("delta") method. Here’s
how you would do it:
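A minimal sketch (the output path is illustrative):
(df.write.format("delta")
   .mode("overwrite")   # or "append" to add to the existing table
   .save("/mnt/delta/sales"))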

mode("overwrite"): Overwrites the existing Delta table. You can also use append to add
data to the existing Delta table.
28. How would you write batch data to a file system (e.g., HDFS or S3)?
Answer: To write batch data to a file system like HDFS or S3, you can specify the output path
with the respective file format:

29. How do you write data from a batch process to a database in Databricks?
Answer: You can use the jdbc format to write data to a database. Here’s an example of
writing data to MySQL:
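A sketch of the JDBC write; the connection details and target table name are placeholders:
(df.write.format("jdbc")
   .option("url", "jdbc:mysql://dbhost:3306/salesdb")
   .option("dbtable", "sales_summary")
   .option("user", "db_user")
   .option("password", "db_password")
   .option("driver", "com.mysql.cj.jdbc.Driver")
   .mode("append")
   .save())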
30. How would you read data from an external database (PostgreSQL) in batch mode and
write the data to a Delta table?
Answer: To read data from an external database like PostgreSQL, use the jdbc() method,
and then write the data to a Delta table:

option("dbtable", "table_name"): Specifies the table to read from the PostgreSQL


database.
format("delta"): Writes the data to Delta format.
31. How would you read and write data from a Kafka topic in Databricks in batch mode?
Answer: To read from a Kafka topic in batch mode, you use the read.format("kafka") API,
and to write data to Kafka, you use the write.format("kafka") API.
option("kafka.bootstrap.servers", "localhost:9092"): Specifies Kafka brokers.
option("subscribe", "topic_name"): Subscribes to the Kafka topic.

32. How do you explode a nested JSON file?

➢ Flatten Arrays: Use explode() to convert each element of an array into individual
rows.
➢ Flatten Structs: Use dot notation (e.g., struct.field) to extract fields from nested
structures.
➢ Iterate and Combine: Repeat these steps iteratively for all nested levels until the
DataFrame is fully flattened.
Step 1: Read the nested JSON file
Step 2: Flatten the nested array field (i.e., the array column order)

Step 3: Further flatten the struct field
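A sketch of these three steps, assuming a file with a customer_id column and a nested order array holding order_id and amount fields (names and path are illustrative):
from pyspark.sql.functions import explode, col

# Step 1: read the nested JSON file
df = spark.read.option("multiLine", "true").json("/mnt/data/orders.json")

# Step 2: explode the array column into one row per element
df_exploded = df.withColumn("order", explode(col("order")))

# Step 3: flatten the struct fields with dot notation
df_flat = df_exploded.select(
    col("customer_id"),
    col("order.order_id").alias("order_id"),
    col("order.amount").alias("amount"),
)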

33. How do you handle null values in a specific column while reading data from a CSV file
in Databricks?
Scenario: You are reading a CSV file where some rows have null values in a critical column.
You want to filter out these rows during the read process.
Answer: You can use a filter operation after reading the data to exclude rows with null
values.
34. How do you replace null values with a default value during a batch write in
Databricks?
Scenario: While writing data to a Parquet file, some columns have null values. You need to
replace these null values with default values before writing.
Answer: You can use the fillna method to replace null values.
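For example, assuming hypothetical column names and defaults:
df_clean = df.fillna({"quantity": 0, "category": "unknown"})
df_clean.write.mode("overwrite").parquet("/mnt/data/output/")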

35. How do you identify and handle rows with all null values in Databricks?
Scenario: Your dataset contains rows where all columns are null. You want to filter out such
rows.
Answer: You can use the dropna method with how="all" to remove rows where all values
are null.

This approach ensures that only meaningful data is retained in your DataFrame.
36. How do you handle null values dynamically in Databricks if you don't know the
column names in advance?
Scenario: You are working with a dataset where column names are dynamic, and you need
to replace all null values with defaults without hardcoding column names.
Answer: You can iterate over all column names and use the fillna method dynamically.

This approach provides a dynamic way to handle null values based on column data types.
37. How do you convert a string column to a date column while handling invalid date
formats?
Scenario: You have a column with date values stored as strings. Some rows have invalid
formats, and you need to convert valid ones to a date column while handling invalid entries.
Answer: Use the to_date function with a specific date format and filter out invalid dates.
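A sketch, assuming a string column order_date_str in the yyyy-MM-dd format:
from pyspark.sql.functions import to_date, col

df_parsed = df.withColumn("order_date", to_date(col("order_date_str"), "yyyy-MM-dd"))
valid_df = df_parsed.filter(col("order_date").isNotNull())
invalid_df = df_parsed.filter(col("order_date").isNull())  # strings that failed to parse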

Invalid date formats will result in nulls, which can then be handled appropriately.
38. How do you handle time zone conversions in a timestamp column in Databricks?
Scenario: Your dataset contains timestamps in UTC, but you need to convert them to a
specific time zone (e.g., PST).
Answer: Use the from_utc_timestamp function for time zone conversion.
This ensures that the timestamp is correctly converted to the desired time zone.
39. How do you calculate the difference between two date columns in Databricks?
Scenario: You have two date columns, and you need to calculate the difference in days
between them.
Answer: Use the datediff function to compute the difference in days.

This provides an easy way to calculate the difference between two dates.
40. How do you add or subtract days from a date column in Databricks?
Scenario: You need to add 30 days to a date column for a future projection calculation.
Answer: Use the date_add or date_sub function to modify dates.

41. How do you extract specific components (year, month, day) from a timestamp
column?
Scenario: You have a timestamp column, and you need to extract the year, month, and day
as separate columns for reporting purposes.
Answer: Use the year, month, and dayofmonth functions to extract these components.

42. How do you identify duplicate records in a DataFrame and remove them?
Scenario: You are working with a dataset that contains duplicate records, and you need to
remove the duplicates based on specific columns.
Answer: Use the dropDuplicates method to remove duplicate records based on specific
columns.
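For example:
df_unique = df.dropDuplicates(["column1", "column2"])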

This ensures that only unique records based on column1 and column2 are retained.
43. How do you ensure that a column has no null values before proceeding with further
transformations?
Scenario: You are processing a dataset where a critical column must not have null values. If
null values are present, the processing should stop.
Answer: Use a validation step with a count of null values and raise an exception if any are
found.
This prevents processing incomplete or invalid data.
44. How do you validate that all values in a column fall within a specific range?
Scenario: You have a column with numeric data, and all values must lie within the range [0,
100]. Records outside this range should be logged for review.
Answer: Filter out records that fall outside the range and log them separately.

45. How do you check for unique values in a primary key column?
Scenario: You need to ensure that a primary key column has unique values and identify any
duplicates.
Answer: Use a groupBy operation to count occurrences and filter out records with counts
greater than 1.

This helps identify records that violate the uniqueness constraint.


46. How do you handle invalid categorical values in a DataFrame?
Scenario: You have a column with predefined categories, and some records contain invalid
values. You need to remove or replace these invalid values.
Answer: Filter out invalid values using a predefined list of valid categories.
47. How do you check for and enforce schema consistency across multiple files in
Databricks?
Scenario: You are reading multiple JSON files into a single DataFrame, and the files may
have inconsistent schemas. You need to enforce a consistent schema.
Answer: Define a schema explicitly and use it while reading the files

48. How do you enforce data type validation for a column?
Scenario: You have a column that must contain only integers, but some records have invalid data types (e.g., strings).
Answer:
1. Try to cast the column to the desired data type (in this case, IntegerType).
2. Use a custom validation function to check if the value can be cast successfully.
3. Filter out invalid data based on whether the cast operation was successful or not.
49. How do you perform a join to match records only when a key exists in both
DataFrames?
Scenario: You have two DataFrames: one with customer details and another with
transaction details. You need to find only those transactions where the customer exists in
the customer DataFrame.
Answer: Perform an inner join to keep only matching records.
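A minimal sketch using customer_id as the join key:
matched_df = transactions_df.join(customers_df, on="customer_id", how="inner")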

This keeps only records with matching customer_id in both DataFrames.


50. How do you handle situations where records from one DataFrame should always be
retained, even if there’s no match?
Scenario: You need a report of all customers, including those who haven’t made any
transactions.
Answer: Perform a left join to retain all records from the left DataFrame.
This retains all customers, with nulls for missing transactions.
51. How do you join two DataFrames when the column names for the keys differ?
Scenario: You have two DataFrames: orders_df with a column order_id and shipments_df
with a column shipment_order_id. You need to join them on these columns.
Answer: Use the on clause to specify the column mappings.

This matches records where order_id and shipment_order_id are equal.


52. How do you identify unmatched records in both DataFrames after a join?
Scenario: You want to find customers who don’t have transactions and transactions without
matching customers.
Answer: Perform a full outer join and filter rows where the key column is null in either
DataFrame.

53. How do you join DataFrames with complex conditions?


Scenario: You need to join two DataFrames on multiple conditions, such as matching both
customer_id and a date range.
Answer: Use a composite condition in the on clause.

54. How do you optimize a join when one DataFrame is much smaller than the other?
Scenario: You are joining a small DataFrame of country codes with a large DataFrame of
customer addresses.
Answer: Use a broadcast join to improve performance.
Broadcasting the smaller DataFrame ensures efficient execution by sending it to all nodes.
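A sketch, assuming country_code is the shared join key:
from pyspark.sql.functions import broadcast

joined_df = addresses_df.join(broadcast(country_codes_df), on="country_code", how="left")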

55. How do you handle null values in join keys?


Scenario: Your DataFrames have null values in the join key column, and you need to ensure
nulls don’t cause mismatches.
Answer: Filter out null values before performing the join.
56. How do you join two DataFrames when the key column has case sensitivity issues?
Scenario: You need to join two DataFrames, but the key columns have mismatched case
(e.g., Customer_Id vs. customer_id).
Answer: Normalize the case of the key column in both DataFrames before the join.

57. How do you avoid duplicate columns in the result of a join?


Scenario: You are joining two DataFrames, and both have columns with the same name,
causing duplicates in the result.
Answer: Use the select method to explicitly define the columns in the result.
This avoids duplicate columns and provides clarity in the output.
58. How do you handle skewed data in join operations?
Scenario: You are joining two DataFrames, and one of the join keys is heavily skewed,
causing performance issues.
Answer: Use salting to distribute the skewed key more evenly.
Salting helps in handling skewed data by spreading it across partitions.
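A sketch of one common salting pattern; large_df, small_df, join_key, and the salt factor are hypothetical names:
from pyspark.sql.functions import col, rand, floor, lit, concat_ws, explode, array

num_salts = 10  # assumed salt factor

# Skewed (large) side: append a random salt suffix to the join key
large_salted = large_df.withColumn(
    "salted_key",
    concat_ws("_", col("join_key"), floor(rand() * num_salts).cast("string")),
)

# Small side: replicate each row once per possible salt value
small_salted = (small_df
    .withColumn("salt", explode(array([lit(i) for i in range(num_salts)])))
    .withColumn("salted_key", concat_ws("_", col("join_key"), col("salt").cast("string")))
    .drop("salt"))

# Join on the salted key; both sides still carry join_key, so drop or rename one afterwards
joined_df = large_salted.join(small_salted, on="salted_key", how="inner")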

59. What is a Left Semi Join, and when would you use it?
Scenario: You have two DataFrames: orders_df and customers_df. You need to identify all
orders where the customer exists in the customers_df but you do not need to retrieve
customer details.
Answer: A Left Semi Join returns only the rows from the left DataFrame where there is a
matching row in the right DataFrame. This is useful when you only care about the existence
of matching records but don't need the columns from the right DataFrame.

In this case, it returns orders that have a matching customer, without including the
customer details in the output.
60. How do you use a Left Anti Join to find records in one DataFrame but not in another?
Scenario: You have a DataFrame of transactions_df and another of customers_df. You need
to identify transactions that don’t have a matching customer.
Answer: A Left Anti Join returns rows from the left DataFrame that do not have a matching
row in the right DataFrame.
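For example, using customer_id as the key:
unmatched_txns = transactions_df.join(customers_df, on="customer_id", how="left_anti")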

This will return transactions that have no corresponding customer in the customers_df.

61. How do you perform a self-join in PySpark?


Scenario: You have a employees_df DataFrame where each employee has a manager_id.
You need to find the names of employees along with their manager names by performing a
self-join.
Answer: A self-join occurs when a DataFrame is joined with itself. You should alias the
DataFrame to avoid column name conflicts.

This returns the list of employees and their respective managers.


62. How do you handle duplicate rows when joining DataFrames?
Scenario: You are joining two DataFrames: orders_df and products_df, and both contain
duplicate keys. You need to ensure there are no duplicate rows in the result.
Answer: To handle duplicate rows, you can use the distinct() function to remove duplicates
after the join.

63. How do you join two DataFrames where one DataFrame has a partitioned column?
Scenario: You have a sales_df partitioned by region_id and a region_details_df with regional
information. You need to join these DataFrames efficiently by leveraging the partitioned
column.
Answer: When dealing with partitioned DataFrames, it's efficient to filter by the partition
key to reduce unnecessary shuffling.

Filtering the partition key before the join reduces the amount of data shuffled across the
network, improving performance.
64. How do you rename columns in a PySpark DataFrame?
Scenario: You have a DataFrame employee_df with columns: emp_id, emp_name, and
salary. You need to rename the emp_id column to employee_id and the emp_name column
to employee_name.
Answer: You can use the withColumnRenamed() method to rename columns.
This will rename the columns emp_id and emp_name to employee_id and employee_name,
respectively.
65. How do you drop one or more columns from a PySpark DataFrame?
Scenario: You have a DataFrame sales_df with columns order_id, customer_id, product_id,
price, and discount. You need to remove the discount and price columns.
Answer: You can use the drop() method to remove columns.

This will remove the price and discount columns from the DataFrame.
66. How do you add a new column based on a condition in PySpark?
Scenario: You have a DataFrame transactions_df with a column amount. You need to add a
new column status based on the value of amount. If amount is greater than 1000, status
should be 'high', otherwise 'low'.
Answer: You can use withColumn() along with when() and otherwise() to create a new
column based on conditions.

This will add the status column with values 'high' or 'low' based on the condition applied to
the amount column.
67. How do you handle null values in a specific column in PySpark?
Scenario: You have a DataFrame user_df with a column age. Some rows in the age column
have null values. You need to replace these null values with a default value of 0.
Answer: You can use the fillna() method to replace null values with a default value.

This will replace all null values in the age column with 0.
68. How do you extract the year from a date column in PySpark?
Scenario: You have a DataFrame orders_df with a column order_date of type Date. You
need to create a new column order_year that contains the year extracted from the
order_date column.
Answer: You can use the year() function to extract the year from a date column.

This will add a new column order_year that contains the year extracted from the order_date
column.
69. How do you create a column with the concatenation of two or more columns in
PySpark?
Scenario: You have a DataFrame customer_df with columns first_name and last_name. You
need to create a new column full_name by concatenating first_name and last_name.
Answer: You can use the concat() function to concatenate columns.
This will add a new column full_name which contains the concatenation of first_name and
last_name.
70. How do you rename multiple columns at once in PySpark?
Scenario: You have a DataFrame employee_df with columns emp_id, emp_name, and
emp_salary. You need to rename all the columns to employee_id, employee_name, and
salary.
Answer: You can use the toDF() method to rename multiple columns at once.

This will rename all the columns in the DataFrame at once.


71. How do you perform arithmetic operations on columns in PySpark?
Scenario: You have a DataFrame product_sales_df with columns product_price and
quantity_sold. You need to calculate the total_sales by multiplying product_price and
quantity_sold.
Answer: You can perform arithmetic operations directly on columns in PySpark.
This will add a new column total_sales with the result of the multiplication between
product_price and quantity_sold.
72. How do you extract the first name and last name from a full name column in PySpark?
Scenario: You have a DataFrame users_df with a column full_name containing values like
'John Doe', 'Jane Smith'. You need to split this full_name into two separate columns:
first_name and last_name.
Answer: You can use the split() function to split the full name by space and extract the first
and last names.

73. How do you check if a string column contains a specific substring in PySpark?
Scenario: You have a DataFrame order_df with a column order_description. Some of the
descriptions contain the word "urgent". You need to create a new column is_urgent that is
True if the order_description contains the word "urgent", and False otherwise.
Answer: You can use the contains() function to check if a substring exists in a string column.

This will add a new column is_urgent with True or False depending on whether the
order_description contains the word "urgent".
74. How do you extract the domain name from an email address in PySpark?
Scenario: You have a DataFrame employee_df with a column email containing email
addresses such as name@example.com. You need to extract the domain name (e.g.,
example.com) from the email addresses.
Answer: You can use the split() function to split the email string at the @ character and
extract the domain part.

This will create a new column domain containing the domain name from the email
addresses.
75. How do you remove leading and trailing spaces from a string column in PySpark?
Scenario: You have a DataFrame product_df with a column product_name. Some rows have
extra spaces before or after the product name, and you need to trim these spaces.
Answer: You can use the trim() function to remove leading and trailing spaces from a string
column.

This will remove any leading or trailing spaces from the product_name column.
76. How do you convert a string column to uppercase or lowercase in PySpark?
Scenario: You have a DataFrame customer_df with a column customer_name. You need to
convert the customer_name column to uppercase.
Answer: You can use the upper() function to convert a string column to uppercase.
77. How do you split a string into multiple columns based on a delimiter in PySpark?
Scenario: You have a DataFrame employee_df with a column full_address containing
addresses like '123 Main St, Springfield, IL'. You need to split this address into three
columns: street, city, and state.
Answer: You can use the split() function to split the string based on a delimiter (in this case,
the comma ,) and extract the individual components into separate columns.
This will split the full_address column into three new columns: street, city, and state.
78. How do you replace specific characters in a string column in PySpark?
Scenario: You have a DataFrame product_df with a column product_code containing
product codes like 'AB-123', 'CD-456', etc. You need to remove the hyphen - from these
codes.
Answer: You can use the regexp_replace() function to replace specific characters in a string
column.
This will remove the hyphen - from the product_code column.
79. How do you handle empty strings in a DataFrame column in PySpark?
Scenario: You have a DataFrame user_df with a column phone_number. Some rows have
empty strings ("") in the phone_number column, and you need to replace those empty
strings with NULL.
Answer: You can use the when() and otherwise() functions to replace empty strings with
NULL.

This will replace any empty string values in the phone_number column with NULL.
80. How do you extract a substring from a string column in PySpark?
Scenario: You have a DataFrame transaction_df with a column transaction_id. You need to
extract the first 5 characters of each transaction_id.
Answer: You can use the substr() function to extract a substring from a string column.
This will create a new column short_transaction_id containing the first 5 characters of the
transaction_id.
81. What is the difference between union() and unionByName() in PySpark?
Scenario: You have two DataFrames, df1 and df2, with the same schema but different
column names. You need to combine the two DataFrames into one. How do you handle this
situation?
Answer: The difference between union() and unionByName() lies in how they handle
column order and names:
• union(): Combines two DataFrames by column position, so both must have the same
number of columns in matching order. If the column counts don't match, it raises an error.

• unionByName(): Combines two DataFrames by matching column names. It can work


even if the column order is different, and it allows for columns that don't exist in one
of the DataFrames (you can set allowMissingColumns=True to handle missing
columns).

If the column names differ and you want to keep all columns, you can also use
allowMissingColumns=True:
82. How do you perform a set difference operation (similar to EXCEPT) in PySpark?
Scenario: You have two DataFrames, df1 and df2, and you want to find the rows that are in
df1 but not in df2.
Answer: You can use the exceptAll() method to perform a set difference between two
DataFrames.

The exceptAll() function returns the rows present in df1 but not in df2. It will include
duplicates if they exist.
83. How do you perform an intersection operation in PySpark?
Scenario: You have two DataFrames, df1 and df2, and you want to find the rows that exist in
both DataFrames.
Answer: You can use the intersect() function to get the common rows between two
DataFrames.
This will return the rows that are common in both df1 and df2.

84. How can you handle duplicate rows when using union() in PySpark?
Scenario: You have two DataFrames, df1 and df2, and both contain duplicate rows. You
need to combine them and eliminate any duplicates in the resulting DataFrame.
Answer: To eliminate duplicates after performing a union() operation, you can use the
distinct() function.
This will combine the rows from both DataFrames and remove any duplicates in the result.
85. How do you calculate the running total of a column using window functions?
Scenario: You have a DataFrame containing sales data for different stores, and you want to
calculate a running total of sales for each store.
Answer: You can use the sum() window function over an ordered window frame (unbounded
preceding to the current row) to calculate running totals; row_number() can help define the ordering if needed.
Note: Unbounded Preceding is helpful when you need to compute values based on the
entire history of data within a partition, including the first row, up to the current row.
Key Points:
1. Unbounded Preceding: The window starts at the first row in the partition.
2. Rows Between: The frame includes all rows from the beginning of the partition to the
current row (Window.unboundedPreceding, Window.currentRow).
3. Use Case: Commonly used in cumulative calculations like running totals, averages, or
historical data tracking within a partition.
86. How do you get the rank of each row within each group based on a specific column in
PySpark?
Scenario: You have a DataFrame with employee performance scores, and you need to rank
employees within each department based on their score.
Answer: You can use rank() or dense_rank() to assign ranks based on a column.
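A sketch, assuming department and score columns on an employees_df DataFrame:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, col

dept_window = Window.partitionBy("department").orderBy(col("score").desc())
ranked_df = (employees_df
    .withColumn("rank", rank().over(dept_window))
    .withColumn("dense_rank", dense_rank().over(dept_window)))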

This will rank employees within each department based on their score.
87. How do you calculate the difference between a row value and the previous row's
value using window functions?
Scenario: You have a DataFrame with daily temperature readings, and you want to calculate
the difference between each day's temperature and the previous day's temperature.
Answer: You can use lag() or lead() functions to calculate the difference between the
current row and the previous row.
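A sketch with assumed reading_date and temperature column names; a partition column (e.g., city) can be added to the window if one exists:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col

day_window = Window.orderBy("reading_date")
diff_df = (readings_df
    .withColumn("prev_temperature", lag("temperature", 1).over(day_window))
    .withColumn("temperature_diff", col("temperature") - col("prev_temperature")))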

This will calculate the difference in temperature between the current day and the previous
day.
88. How do you calculate the average salary within each department for employees using
window functions?
Scenario: You have a DataFrame with employee details, including department and salary.
You need to calculate the average salary within each department.
Answer: You can use the avg() window function to calculate the average salary within each
department.
This will calculate the average salary within each department and add it as a new column.
89. How do you calculate the cumulative sum of a column based on a certain order?
Scenario: You have a DataFrame of sales transactions, and you want to calculate the
cumulative sum of sales ordered by transaction date.
Answer: You can use sum() over a window specification to calculate the cumulative sum.

This will calculate the cumulative sum of sales based on the transaction_date.
90. How do you handle ties in ranking using window functions?
Scenario: You have a DataFrame with employee scores, and you want to rank employees
within each department. Some employees have the same score. How do you handle ranking
with ties?
Answer: You can use dense_rank() to assign the same rank to tied rows without gaps, or use
rank() to leave gaps in the ranking after ties.
The dense_rank() function will assign the same rank to employees with the same score
without leaving gaps in the ranking.
91. How do you calculate the row number within each group?
Scenario: You have a DataFrame with sales data for multiple regions, and you want to assign
a unique row number to each sale within each region, ordered by sales amount.
Answer: You can use the row_number() window function to assign a unique row number
within each group.

This will assign a unique row number to each sale within each region, ordered by sales in
descending order.
92. How do you calculate the first value and last value in each group using window
functions?
Scenario: You have a DataFrame with timestamps for each product's sale, and you need to
get the first and last sale for each product.
Answer: You can use first() and last() window functions to get the first and last values in
each group.

93. Partitioning Data


Scenario: You have a large dataset of customer transactions, and you are performing a join
operation between two large datasets on the customer_id column. The operation is slow,
and there seems to be an imbalance in partition sizes. How would you address this issue?
Answer: In this scenario, we can repartition the datasets on the customer_id column to
ensure that each partition contains a roughly equal number of records. This can help
balance the data distribution and improve performance.

This way, the join operation will be more efficient as data will be more evenly distributed
across partitions.
94. Data Skew
Scenario: During a join between two large datasets, you notice that some partitions contain
a significantly larger amount of data than others, leading to an imbalance in processing
time. How would you handle data skew in Spark?
Answer: Data skew occurs when certain keys or partitions contain disproportionately large
data. To handle this, we can apply techniques like:
➢ Salting: Add a random prefix to the skewed key to distribute the data more evenly
across partitions.

➢ Broadcast Join: If one of the datasets is much smaller, broadcasting the smaller
dataset can significantly reduce the shuffle cost and help in resolving data skew.

95. Caching
Scenario: You are working on an ETL pipeline that involves multiple transformations on a
dataset, and you notice that each transformation is taking a long time to run. What would
be your approach to improve the performance of repeated transformations?
Answer: In this case, caching or persisting the dataset in memory can help avoid
recomputing the transformations multiple times. This is particularly useful when the dataset
is used multiple times across different stages.

By caching the dataset, Spark will store it in memory, so subsequent transformations on the
same dataset will be faster.
96. Persisting Data
Scenario: You are processing a large dataset and need to store intermediate results between
stages to avoid recomputing data. What’s the difference between cache and persist, and
how do you decide which one to use?
Answer: The key difference between cache and persist is that cache is a shorthand for
persisting data in memory, while persist gives you more flexibility in choosing the storage
level (e.g., memory and disk, just disk, etc.).
➢ cache(): By default, it stores data in memory, and it’s ideal when you are working with
datasets that can fit in memory and are reused multiple times.

➢ persist(): Provides more flexibility by allowing you to specify a storage level. For
example, if the dataset is too large to fit in memory, you might choose to persist it to
disk as well.

You should use persist when you need more control over storage and want to persist data in
multiple storage levels, while cache is suitable for simpler use cases where data fits entirely
in memory.
97. Handling Multiple Partitions with coalesce()
Scenario: You have a DataFrame with a large number of partitions, but you need to write
the output to a file system, and you want to reduce the number of output files. How would
you optimize the number of partitions before writing the data?
Answer: In this case, we can use coalesce() to reduce the number of partitions without
triggering a full shuffle (which would be more expensive). It is especially useful when you
want fewer partitions for output purposes, like writing to files.
coalesce() is preferable when decreasing the number of partitions because it is more
efficient than repartition(), which involves a full shuffle.
98. Partitioning Data Before Writing to Avoid Skew
Scenario: You are writing data to a Parquet file, and you notice that the output directory
contains a large number of small files. How would you address this problem?
Answer: In this case, we can repartition the DataFrame before writing it to Parquet. By
setting an appropriate number of partitions, we can avoid generating too many small files.
The number of partitions should align with the desired number of output files.

This way, we can control the number of output files by adjusting the number of partitions.

99. Aggregating Sales Data by Region


Q: You have a dataset with sales transactions, which includes the region, product_id, and
sales_amount. You need to find the total sales per region. How would you approach this
using group by in PySpark?
A: To aggregate sales by region, you can use the groupBy function and then apply an
aggregation function like sum to calculate the total sales per region.

This will give you the total sales amount for each region.
100. Calculating Average Revenue per Product
Q: You have a dataset containing the product_id, sales_amount, and quantity_sold columns.
You want to calculate the average revenue per product. How do you perform this
aggregation?
A: You can use the groupBy function on product_id and then compute the average revenue
for each product using the avg function.

This will give you the average revenue per product.


101. Finding the Top 3 Products by Sales
Q: You need to find the top 3 products based on the total sales amount from a dataset
containing product_id, sales_amount, and quantity_sold. How do you write the query in
PySpark?
A: To find the top 3 products, you can first aggregate the data by product_id using sum and
then use orderBy to sort the results in descending order and limit the output to the top 3.

This will give you the top 3 products by total sales amount.

102. Calculating Total Sales and Average Quantity Sold by Region


Q: You are working with a dataset that contains the columns region, product_id,
sales_amount, and quantity_sold. How would you calculate the total sales and average
quantity sold for each region?
A: You can use the groupBy function to group by region, and then use multiple aggregation
functions (sum for total sales and avg for average quantity sold) on the grouped data.
This will give you the total sales and average quantity sold for each region.

103. Grouping by Multiple Columns


Q: You have a dataset that includes store_id, product_id, sales_date, and sales_amount. You
need to find the total sales per product for each store, but only for sales made in the past
30 days. How would you write this query in PySpark?
A: You can use filter to filter for sales made in the past 30 days, then group by both store_id
and product_id, and apply an aggregation function like sum for the total sales.

This will give you the total sales per product for each store within the past 30 days.
104. Aggregating Data with Multiple Aggregations
Q: You have a dataset with employee_id, department, salary, and bonus columns. You want
to calculate the total salary and average bonus for each department. How do you achieve
this in PySpark?
A: You can use groupBy on the department column and apply multiple aggregations like
sum for salary and avg for bonus.
This will give you the total salary and average bonus per department.
105. Calculating Monthly Sales Growth
Q: You have a dataset with sales_date, sales_amount, and product_id. You want to calculate
the sales growth for each product from one month to the next. How would you approach
this?
A: First, you can extract the month and year from the sales_date column. Then, group by
product_id and month_year, calculate total sales per month, and compute the
month-over-month growth.

This would calculate the sales growth from one month to the next for each product.
106. Calculate the Maximum and Minimum Salary by Department
Q: You have an employee dataset containing columns employee_id, department, salary, and
years_of_experience. You want to find the maximum and minimum salary for each
department. How would you approach this?
A: You can use groupBy to group by the department column and then apply aggregation
functions max and min to get the maximum and minimum salaries.
This will give you the maximum and minimum salary for each department.
107. Calculate the Total Sales and Count of Transactions by Month
Q: You have a sales dataset with transaction_date, sales_amount, and product_id. How
would you calculate the total sales and count of transactions for each product by month?
A: You can use month and year from the transaction_date to extract the month, group by
product_id and the month, and then use sum for total sales and count for the number of
transactions.

This will give you the total sales and count of transactions for each product per month.
108. Average Age per Department for Employees
Q: You have a dataset of employees with columns employee_id, age, and department. You
need to find the average age of employees in each department. How would you write the
query?
A: You can group the dataset by department and apply the avg aggregation function to
calculate the average age for each department.
This will give you the average age of employees for each department.
109. Total Revenue by Product and Region
Q: You have a dataset with columns product_id, region, quantity_sold, and price_per_unit.
How would you calculate the total revenue by product_id and region?
A: You can calculate revenue by multiplying quantity_sold with price_per_unit, then use
groupBy to group by product_id and region, followed by the sum function to get the total
revenue.

This will provide the total revenue by product and region.


110. Grouping Data by Multiple Columns with Complex Aggregations
Q: You have a dataset with columns customer_id, purchase_date, purchase_amount, and
category. You want to find the total purchase amount and average purchase amount for
each customer_id per category. How would you do it?
A: You can group by both customer_id and category and apply sum for total purchase and
avg for average purchase amount.

This will give the total and average purchase amounts for each customer per category.
111. Find Products Sold More Than 1000 Units
Q: You have a sales dataset with product_id, units_sold, and sales_amount. How would you
find products that have sold more than 1000 units in total?
A: You can group by product_id and aggregate sum(units_sold) and then filter products with
total units sold greater than 1000.
This will return the products that have sold more than 1000 units.

112. Calculate Monthly Sales Growth for Each Product


Q: You have a dataset with sales_date, sales_amount, and product_id. You want to calculate
the month-over-month sales growth for each product.
A: First, extract the month and year from sales_date, group by product_id and month_year,
then calculate the monthly growth by comparing the sales for the current month with the
previous month.

This will give you the month-over-month sales growth for each product.
113. How do you manage schema evolution in Delta Tables?
Q: You are working with a Delta table that receives data from multiple sources, and you
need to handle schema changes dynamically (e.g., new columns added). How can you
manage this scenario?
A: Delta tables support schema evolution by automatically adjusting the schema when new
columns are added to the incoming data. To enable this feature, you can use the
mergeSchema option while writing data to the Delta table.

114. How do you update records in a Delta table based on a condition?


Q: You have a Delta table with a column status, and you need to update all records where
status = 'pending' to status = 'processed'. How can you accomplish this?
A: You can use MERGE operations in Delta tables for such updates.

115. How do you perform time travel in a Delta table?


Q: You need to retrieve the state of the Delta table as it was on a specific date or version.
How can you perform time travel?
A: You can query a Delta table at a specific version or timestamp using the versionAsOf or
timestampAsOf read options.
116. How do you delete data from a Delta table?
Q: You have a Delta table, and you need to delete all records older than a certain date. How
can you delete data from the table?
A: You can use the DELETE command in Delta to remove records based on a condition.
Alternatively, you can run the equivalent SQL DELETE statement if the Delta table is registered in the metastore.

117. How do you optimize a Delta table for better performance?


Q: You have a large Delta table, and you need to optimize the performance of queries on
the table. How would you do that?
A: Use the OPTIMIZE command with Delta tables to optimize the layout of data files and
improve query performance.
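For example (the table path and Z-order column are illustrative):
spark.sql("OPTIMIZE delta.`/mnt/delta/sales`")
# Optionally co-locate related data to speed up filtered queries
spark.sql("OPTIMIZE delta.`/mnt/delta/sales` ZORDER BY (sale_date)")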

118. How do you perform CDC (Change Data Capture) in Delta tables?
Q: You need to perform Change Data Capture (CDC) on a Delta table, capturing inserts,
updates, and deletes. How can you implement this?
A: You can use MERGE operations in Delta for CDC by matching records based on primary
keys and applying insert, update, or delete actions.
In the provided code, CDC is implemented using the MERGE statement, which performs the
following actions:
1. whenMatchedUpdate: Updates the records in the target table (deltaTable) where a
matching record exists in the source DataFrame (df) based on the id column. In this
case, it updates the value column in the target table with the value from the source.
2. whenNotMatchedInsert: If a record in the source DataFrame (df) doesn't exist in the
target Delta table (deltaTable), it inserts a new record with the id and value from the
source DataFrame.
3. whenNotMatchedDelete: If a record in the target Delta table (deltaTable) doesn't
exist in the source DataFrame (df), it deletes the record from the Delta table based on
the condition specified.
This approach ensures that only the changes (new, updated, or deleted records) are
captured and processed, enabling efficient and incremental data loading.

119. How do you handle missing or corrupt data while reading from a Delta table?
Q: You are reading from a Delta table, and some data records are corrupt or missing. How
would you handle this situation?
A: Use badRecordsPath to log and store any corrupted or missing records for further
investigation.
120. How do you remove old files in a Delta table?
Q: Delta tables store old versions of data files, which may accumulate over time. How do
you remove old files to free up space?
A: You can use the VACUUM command to clean up old files that are no longer in use.
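For example, keeping the last 7 days of history (the path and retention period are illustrative):
spark.sql("VACUUM delta.`/mnt/delta/sales` RETAIN 168 HOURS")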

121. How do you track changes and monitor Delta table operations?
Q: You need to track changes and monitor operations (like inserts and updates) on a Delta
table. How can you achieve this?
A: You can use Delta logs to track changes in the table. Additionally, Delta supports
transaction logs which provide a detailed history of actions performed on the table. You can
query the transaction logs for auditing.

122. How do you manage large batch writes to Delta tables efficiently?
Q: You need to write large volumes of data to a Delta table. How can you efficiently manage
large batch writes?
A: Use partitioning and optimize the Delta table for large batch writes. Writing data in
partitions improves performance.

123. Scenario-Based Question on Full Load vs. Incremental Load in Data Processing:
Question:
You are working with a retail business that collects daily sales data in a large file format
(CSV) and stores it in a data lake on Azure. The data includes the following fields: sale_id,
product_id, sale_amount, sale_date, store_id, and customer_id. The business has two
primary requirements for the data pipeline:
1. Load the data into a data warehouse daily (i.e., perform a Full Load).
2. Periodically, only capture Incremental Loads to update the data warehouse with
changes (e.g., new sales, updates to sale details).
How would you implement both Full Load and Incremental Load in Databricks using
PySpark and Delta Lake?
Answer:
1. Full Load:
In a Full Load scenario, the entire dataset is replaced or reloaded into the target system
every time. This is suitable when the dataset is small or when the data source has no
reliable mechanism to identify changes.
Steps for Full Load:
• Read the entire dataset from the source (e.g., CSV or Parquet file).
• Write the data to the target data warehouse or Delta table, effectively overwriting
the previous dataset.

In this case, every time the pipeline runs, the data will be reloaded and replace the previous
data.

2. Incremental Load:
An Incremental Load is used to load only the newly added or modified data since the last
load. This can be done by comparing the sale_date or a unique identifier (such as sale_id) to
detect new or modified records.
Steps for Incremental Load:
• Track changes by identifying the records with a timestamp (e.g., sale_date) or by
using a watermark to capture new or updated data.
• Use Delta Lake to read only the new data or records that have changed (based on the
timestamp or unique identifier).
• Merge the new data with the existing Delta table to ensure only new and updated
records are added.
Key Steps in Incremental Load:
• Reading New Data: You only read the data that has been added or modified since the
last load, typically by using a timestamp or a unique identifier like sale_id.
• Merging with Delta Table: Using the MERGE operation, you update existing records
and insert new records where necessary, ensuring that only changes are processed.
• Efficiency: This approach improves performance by processing only the necessary
data, rather than reloading the entire dataset.
124. Scenario-Based Question on Slowly Changing Dimension (SCD) Type 1 in Data
Processing:
Question:
You are working with a customer data table in a retail business that tracks customer
information, including customer_id, name, email, and address. The data is continuously
updated as new information is available. For instance, if a customer changes their address
or email, the new value should overwrite the old one. This is an example of Slowly
Changing Dimension Type 1 (SCD1).
The table is stored as a Delta table, and you need to design a process to handle this update
scenario in a data pipeline running in Databricks.
How would you implement the SCD Type 1 logic in PySpark using Delta Lake to ensure that
the latest information always overwrites the previous records for the same customer_id?
Answer:
In SCD Type 1, when there is a change in the attribute (e.g., address, email), the old value is
simply overwritten with the new value. This approach does not keep track of historical
values and ensures that only the latest data is stored for each record.
Steps to Implement SCD Type 1:
1. Read the existing data from the Delta table.
2. Read the incoming updated data (e.g., from a CSV or streaming source).
3. Perform a merge operation using Delta Lake's MERGE function to update the existing
records where there is a matching customer_id. If the data is updated, the address or
email fields will be overwritten.
Code Example:
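A sketch of the merge described below, assuming the Delta table lives at an illustrative path and the incoming data is in a DataFrame named df_incoming:
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/mnt/delta/customers")

(delta_table.alias("t")
    .merge(df_incoming.alias("s"), "t.customer_id = s.customer_id")
    .whenMatchedUpdate(set={
        "name": "s.name",
        "email": "s.email",
        "address": "s.address",
    })
    .whenNotMatchedInsert(values={
        "customer_id": "s.customer_id",
        "name": "s.name",
        "email": "s.email",
        "address": "s.address",
    })
    .execute())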
Explanation:
• DeltaTable.forPath: This loads the existing Delta table where customer information is
stored.
• merge(): The merge operation is used to compare the incoming data (df_incoming)
with the existing data in the Delta table (delta_table).
o whenMatchedUpdate: If there is a match on customer_id, the old values of
name, email, and address are overwritten with the new values from
df_incoming.
o whenNotMatchedInsert: If the customer_id does not exist in the Delta table,
the new record is inserted.
• This ensures that for each customer_id, the latest address, email, and name are
stored, and older records are updated with the new information.
SCD Type 1 Characteristics:
• Data Overwriting: The old values are overwritten by the new ones.
• No Historical Data: Only the latest information is stored for each customer_id.
• Simple Logic: SCD Type 1 is simple to implement and does not track historical
changes in the data.
Use Case:
This approach works well in scenarios where the business is interested only in the current
state of data and does not need to track the history of changes. For example, updating a
customer's address or contact details where only the most recent information is relevant for
reporting or analytics.

125. Scenario-Based Question on Slowly Changing Dimension (SCD) Type 2 in Data Processing:
Question:
You are working with a sales database for an e-commerce company that tracks customer
information, including customer_id, name, email, and address. The database is stored in a
Delta table in Databricks, and the company needs to maintain a history of customer
information changes over time.
When a customer's details (e.g., address, email) change, you need to preserve the old
record, mark it as inactive, and insert a new record with the updated details. This is an
example of Slowly Changing Dimension Type 2 (SCD2), where historical changes are tracked
by adding new records and keeping the old records.
How would you implement the SCD Type 2 logic in PySpark using Delta Lake to track
changes while maintaining the history of customer information?
Answer:
In SCD Type 2, historical changes are tracked by creating a new record for each change,
while marking the old record as expired (or inactive). This allows the system to maintain the
history of all changes.
Steps to Implement SCD Type 2:
1. Read the existing data from the Delta table.
2. Read the incoming updated data (e.g., from a CSV or streaming source).
3. Perform a merge operation to:
o Mark the existing record as inactive (set an end_date or is_active flag) when
the customer details have changed.
o Insert a new record for the updated customer with an active status and a new
start_date.
Code Example:
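The original code example is missing here; below is a simplified sketch of the merge step described above, assuming the table carries is_active, start_date and end_date columns and lives at an illustrative path.

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/mnt/delta/customers_scd2")  # illustrative path

(delta_table.alias("target")
    .merge(
        df_incoming.alias("source"),
        "target.customer_id = source.customer_id AND target.is_active = true")
    .whenMatchedUpdate(
        condition="target.address <> source.address OR target.email <> source.email",
        set={"is_active": "false", "end_date": "current_date()"})
    .whenNotMatchedInsert(values={
        "customer_id": "source.customer_id",
        "name": "source.name",
        "email": "source.email",
        "address": "source.address",
        "is_active": "true",
        "start_date": "current_date()",
        "end_date": "null"})
    .execute())

# Note: a complete SCD2 flow also appends the new, active row for each customer
# whose details changed (the merge above only expires the old row); that second
# append step is omitted here for brevity.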
Explanation:
1. DeltaTable.forPath: Loads the existing Delta table where customer data is stored.
2. merge(): The merge operation is used to compare the incoming data (df_incoming)
with the existing data in the Delta table (delta_table).
o whenMatchedUpdate: If a matching customer_id exists and any of the tracked
fields (e.g., address, email) have changed, the old record is marked as inactive
by setting is_active = false and end_date to the current date.
o whenNotMatchedInsert: If the customer_id does not exist in the table (i.e., it's
a new record), a new record is inserted with is_active = true, and start_date is
set to the current date.
SCD Type 2 Characteristics:
• Historical Data Preservation: Old records are preserved with an inactive status,
allowing you to track historical changes.
• New Records for Changes: For each change in a customer's details, a new record is
inserted with the updated values and an active status.
• Date Tracking: Each record has a start_date and end_date (or an is_active flag) to
track the validity period of the record.
Use Case:
SCD Type 2 is useful in scenarios where it is important to preserve the history of changes.
For example, in a customer relationship management (CRM) system, if a customer changes
their address or email over time, you may want to keep track of all previous addresses and
emails in the system.
This approach ensures that the most recent and all previous versions of customer
information are available for reporting and analytics.

126. Scenario: Optimizing Delta Table Performance


Question:
You have a Delta table that is being updated frequently. The table has grown significantly,
and query performance has started to degrade. What steps would you take to optimize the
Delta table for better performance in Databricks?
Answer:
To optimize the performance of a Delta table, you can take the following steps:
1. Optimize with Z-Ordering: This will improve the performance of queries that filter by
specific columns.

2. Vacuum the Table: Vacuum the Delta table to remove old versions of data and free
up storage.

3. Partition the Data: If not already done, partition the Delta table by a column
frequently used in queries (e.g., device_id or timestamp).
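Illustrative commands for the three steps above (the table and column names are assumptions):

# 1. Compact files and co-locate rows that are frequently filtered together
spark.sql("OPTIMIZE my_delta_table ZORDER BY (device_id)")

# 2. Remove data files no longer referenced by the table (default retention is 7 days)
spark.sql("VACUUM my_delta_table RETAIN 168 HOURS")

# 3. Rewrite the table partitioned by a column that queries commonly filter on
(spark.read.table("my_delta_table")
    .write.format("delta")
    .mode("overwrite")
    .partitionBy("event_date")
    .saveAsTable("my_delta_table_partitioned"))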

127. Optimizing File Reads and Writes


Question: You have a batch processing pipeline in Databricks that reads and writes large
Parquet files. The job runs slowly due to the number of small file writes. What steps would
you take to optimize the reading and writing process?
Answer: To optimize file reads and writes in batch processing, consider the following
techniques:
1. Increase Partition Size: Ensure that your data is partitioned properly to avoid writing
too many small files. You can repartition the data before writing it.

2. Coalesce for Writes: Use coalesce to reduce the number of output files when writing
data, especially if you have a small number of partitions.

3. Optimize Read Performance: When reading Parquet files, specify the schema
explicitly to avoid schema inference overhead.
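A brief sketch of these three techniques (the DataFrame, paths, schema and partition counts are illustrative):

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# 1. Repartition before writing so each output file is a reasonable size
df.repartition(200).write.mode("overwrite").parquet("/mnt/output/sales")

# 2. Coalesce when only a handful of output files are needed
df.coalesce(10).write.mode("overwrite").parquet("/mnt/output/sales_small")

# 3. Provide the schema explicitly to skip schema inference on read
schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("amount", DoubleType(), True),
])
df_read = spark.read.schema(schema).parquet("/mnt/input/sales")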

128. Caching for Repeated Computations


Question: You are processing a large dataset in a batch job and performing multiple
transformations on the same data. The transformations are slow due to repeated
computations. How can you optimize the performance of these transformations?
Answer: To optimize performance when applying repeated transformations, you can cache
the intermediate results:
1. Use Caching: Cache the DataFrame after the first transformation to avoid
recomputing it multiple times.
2. Persist with Storage Level: If the data is too large to fit in memory, use persist() with a
storage level to store it on disk.

3. Unpersist when Done: Always unpersist the DataFrame when you are done with it to
free up resources.
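A small sketch of this caching pattern (the DataFrame names are illustrative):

from pyspark import StorageLevel
from pyspark.sql import functions as F

# 1. Cache after the first expensive transformation and materialize it once
base_df = raw_df.filter(F.col("amount") > 0).cache()
base_df.count()

# 2. Alternatively, persist with a level that spills to disk if memory is tight
# base_df = raw_df.filter(F.col("amount") > 0).persist(StorageLevel.MEMORY_AND_DISK)

# ... run the repeated transformations against base_df ...

# 3. Release the resources once the DataFrame is no longer needed
base_df.unpersist()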

129. Handling Large Output Data


Question: You are writing the output of a batch processing job to a Delta table. The output
is large, and the job performance is degrading due to the write operation. How can you
optimize the writing process?
Answer: To optimize large output data writes:
1. Optimize with Z-Ordering: Use Z-Ordering on frequently queried columns to optimize
query performance.

2. Use Partitioning: Partition the data when writing to a Delta table to speed up query
performance and reduce file sizes.
3. Use Merge for Incremental Loads: If you're performing incremental loads, use the
MERGE statement to efficiently update only the changed data.
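Hedged examples of the three techniques (table names, paths and keys are assumptions):

# 1. Z-Order the Delta table on a frequently queried column
spark.sql("OPTIMIZE sales_delta ZORDER BY (customer_id)")

# 2. Partition the output by a commonly filtered column when writing
(result_df.write.format("delta")
    .mode("append")
    .partitionBy("sale_date")
    .save("/mnt/delta/sales"))

# 3. For incremental loads, MERGE only the changed rows instead of rewriting everything
incoming_df.createOrReplaceTempView("updates")
spark.sql("""
    MERGE INTO sales_delta AS t
    USING updates AS s
    ON t.sale_id = s.sale_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")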

130. Scenario-Based Question: Optimizing the Reading of Multiple Small Files in Databricks
Question: You have a batch processing job in Databricks where multiple small files are being
read from a cloud storage system (e.g., AWS S3, Azure Blob Storage). The job is taking a long
time due to the overhead of processing numerous small files. How would you optimize this
process for better performance?
Answer: When dealing with multiple small files in Databricks, the overhead of processing
each file individually can significantly degrade performance. Here are several strategies to
optimize reading these files:
1. Coalesce or Repartition Before Reading: The first step is to reduce the number of
partitions before reading the data. If the files are small, coalescing or repartitioning
can help reduce the number of output partitions.

Alternatively, use coalesce if you want to combine small files into fewer partitions after
reading:
This reduces the overhead of many small file reads, optimizing the processing.
2. Increase Parallelism with File Splitting: Spark can automatically parallelize the
reading of files, but for some file formats (like Parquet or ORC), you can control the
spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes to influence how
files are split and read in parallel.

This will help in splitting larger files into smaller, manageable chunks for parallel processing.
3. Use Delta Lake for Managing Small Files: If the small files are being generated over
time or come from multiple streams, you can store them in a Delta table, which
automatically manages file sizes and optimizes reads.

Delta tables handle small file consolidation under the hood when running operations like
OPTIMIZE and VACUUM. Delta tables are optimized for large-scale processing and ensure
that small files are compacted automatically.
4. File Format Optimization: Consider using more efficient file formats like Parquet or
Delta for storage, as these formats are optimized for performance, particularly when
reading large datasets. Additionally, using partitioning and compression within these
formats can further improve performance.

5. Avoid Schema Inference: Schema inference can add unnecessary overhead when
reading multiple small files. Instead, explicitly define the schema to speed up the
reading process.
6. Batching Small Files: If the small files are frequently updated or written in batches,
consider batching them into larger files during the write operation, which can help in
reducing the number of files read during the next job execution.

7. Cluster Configuration Adjustments: Ensure that your cluster has enough executors to
handle the read load effectively. You can also adjust the number of executors and
cores depending on the size of your dataset.

Summary: When handling multiple small files in Databricks, the key to optimization is
reducing the overhead of reading individual small files. Strategies like coalescing or
repartitioning data, using Delta Lake, increasing parallelism through file splitting, and
avoiding schema inference can significantly improve performance. Additionally, optimizing
cluster configurations and using efficient file formats like Parquet can help in further
optimizing the read process.

131. Scenario-Based Question: Reading a 1 TB CSV File in Databricks


Question: You are tasked with reading a 1 TB CSV file in Databricks. How would you
approach this in a way that ensures efficient reading and processing while minimizing
performance overhead?
Answer: Reading a 1 TB CSV file in Databricks requires a few optimizations due to the nature
of CSV files being row-based and often lacking schema information. Below are the steps to
efficiently read and process such a large file in CSV format:

Steps to Optimize the Reading of a Large CSV File:


1. Choose Appropriate File Format (Optional Conversion to Parquet or Delta):
o While reading a CSV file is necessary for the task, it's not the most efficient way
to process large data due to its row-based nature. After processing, consider
converting the CSV data into Parquet or Delta format for better performance in
future operations.

2. Increase Parallelism by Partitioning the Data:


o Repartition the Data: Use the repartition method to increase the number of
partitions. This helps distribute the file reading operation across more
executors, improving performance.

Choosing the Right Partition Count: For large files, the number of partitions should be chosen to avoid memory overflow or an excess of tiny partitions. Typically, partition sizes should fall between 100MB and 1GB, so roughly 1,000 to 4,000 partitions for 1 TB of data would be appropriate, depending on the executor memory.
3. Define Schema Explicitly to Avoid Schema Inference Overhead:
o Avoid schema inference: Inferring the schema for a large CSV file can cause
performance overhead. If you know the schema of the file, define it explicitly.
4. Adjust CSV Options for Optimized Reading:
o Multi-Line CSV Files: If the CSV file is multi-line or contains embedded quotes,
you can configure Spark's CSV reader to handle these cases.

5. Increase File Read Throughput (Cluster Settings and Parallelism):


o Increase Parallelism for Large Files: You can increase the file read throughput
by adjusting Spark’s configuration to handle large-scale data better.

6. Leverage Broadcast Variables for Small Lookups:


o Broadcast Small Tables: If you need to join the large CSV file with a smaller
reference dataset, you can use broadcast variables to minimize shuffling.
7. Use cache() or persist() for Repeated Operations:
o Cache for Repeated Access: If you need to perform multiple transformations or
actions on the CSV data, consider caching or persisting the data to memory.

8. Optimize Spark Settings (Shuffle Partitions and Executors):


o Tune Spark Settings: You can adjust the number of shuffle partitions, the
number of executors, and other relevant settings to improve the job's
performance.

9. Monitor and Tune Job Performance:


o Use Databricks’ Cluster Metrics to monitor job progress. Ensure the resources
(executors and cores) are efficiently utilized. Look for stages with high task
times or task failures that may indicate inefficiencies or resource constraints.

Summary:
To read a large 1 TB CSV file efficiently in Databricks:
1. Consider converting the file to a more optimized format like Parquet or Delta.
2. Repartition the file to ensure parallelism and distribute work evenly across the
cluster.
3. Avoid schema inference by defining the schema explicitly.
4. Tune CSV read options based on the file's structure (e.g., multi-line, quotes).
5. Increase parallelism and memory usage through appropriate configurations for
better throughput.
6. Leverage broadcast variables for small reference data joins to avoid unnecessary
shuffling.
7. Cache or persist data if you need to perform multiple transformations.
By following these strategies, you can efficiently read and process a 1 TB CSV file, ensuring
minimal performance overhead and utilizing cluster resources optimally.
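As a condensed illustration of this read path (the path, schema and partition count are assumptions, not taken from the original example):

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Explicit schema avoids a full inference pass over 1 TB of text
schema = StructType([
    StructField("sale_id", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("sale_ts", TimestampType(), True),
])

df = (spark.read
      .schema(schema)
      .option("header", "true")
      .csv("/mnt/raw/big_sales.csv"))

# Roughly 256 MB per partition for 1 TB of input
df = df.repartition(4000)

# Convert once to a columnar format so later jobs avoid re-reading the CSV
df.write.format("delta").mode("overwrite").save("/mnt/delta/big_sales")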

132. Scenario-Based Question: Reading a 1 TB Text File in Databricks


Question: You are tasked with reading a 1 TB text file in Databricks. How would you
approach this in a way that ensures efficient reading and processing while minimizing
performance overhead?
Answer: Reading a 1 TB text file in Databricks requires careful consideration of file format,
partitioning, parallelism, and cluster resources to optimize performance. Below are the
steps to efficiently read and process a large text file like this:

Steps to Optimize the Reading of a Large Text File:


1. Choose the Right File Format (Optional Conversion to Parquet or Delta):
o Text File: While reading a 1 TB text file directly is possible, it's not the most
efficient way to process such a large file. Consider converting the text file to a
more optimized file format like Parquet or Delta for better performance in the
future.
o Parquet or Delta: These formats are columnar, and reading them will generally
be faster, especially when dealing with large datasets. If possible, convert the
text file to Parquet or Delta format before processing.

2. Parallelize the Reading of the File Using Partitioning:


o Repartition the Data: Use repartition or coalesce to increase the number of
partitions and distribute the workload across the cluster more efficiently.

o Choosing the Right Partition Count: The ideal number of partitions depends on
the file size, the number of available executors, and the cluster’s capacity.
Typically, aim for a partition size between 100MB and 1GB to prevent small files
from causing overhead.
3. Increase Parallelism and Optimize Spark Settings:
o Increase Number of Executors and Cores: Ensure your cluster is appropriately
sized for the task. Increase the number of cores and executors for parallel
processing.
o Adjust Shuffle Partitions: Tuning shuffle partitions (spark.sql.shuffle.partitions)
helps to optimize the performance of operations like joins or aggregations that
occur after reading the data.

o Configure File Partitioning: If you are reading a large file from distributed
storage like AWS S3 or Azure Blob Storage, ensure that
spark.hadoop.fs.s3a.maxConnections and similar configurations are optimized
for high-throughput file systems.
4. Avoid Schema Inference (Explicit Schema Definition):
o Define the Schema: If possible, avoid inferring the schema from the text file as
this adds overhead. Define the schema explicitly before reading the data to
optimize the reading process.
5. Optimize File Reading with wholeTextFiles (for Smaller Files or Files in Multiple
Parts):
o If the file is split into smaller parts (e.g., multiple .txt files in a directory), you
can use wholeTextFiles to load all the files into a single RDD, which may help
reduce overhead if the data is not too complex.

o However, for a single large file (1 TB), this method is generally not
recommended, as it will still lead to inefficient reading.
6. Consider Data Skew and Caching (If Necessary):
o Caching: If you're performing iterative operations on the data, consider using
cache() or persist() to store intermediate results in memory to speed up
subsequent queries.

7. Monitor Job Progress and Resource Utilization:


o Use Databricks' Cluster Metrics to monitor the progress of the job. Ensure that
there are no bottlenecks in the reading stage (e.g., too few executors or under-
utilized resources).
8. Consider Using Delta Tables for Future Incremental Loads:
o After processing the data, you may want to write the processed data to a Delta
table, which will provide better management for future incremental loads. This
allows for version control, time travel, and optimized querying.
Summary:
To read a large 1 TB text file efficiently in Databricks:
1. Convert to a more efficient format like Parquet or Delta (if possible).
2. Increase parallelism by repartitioning the data.
3. Adjust Spark settings for optimal file reading performance (e.g., shuffle partitions,
executor count).
4. Avoid schema inference by defining the schema explicitly.
5. Monitor job progress to ensure efficient resource utilization.

133. Scenario: Joining Two Large DataFrames That Cannot Be Broadcasted


Question:
You have two large DataFrames in Databricks that cannot fit into memory for broadcasting.
How will you efficiently join these DataFrames to avoid performance bottlenecks?
Answer:
To efficiently join these DataFrames:
1. Repartition Both DataFrames on the Join Key:
Repartition both DataFrames on the column used for the join (e.g., customer_id) to
colocate data with the same key on the same partition.

2. Select Only Necessary Columns Before the Join:


Instead of joining the entire DataFrames, reduce their size by selecting only the
columns required for the join and further processing.
3. Filter Unnecessary Rows Before the Join:
Apply filters to reduce the data size before the join, if applicable.

4. Use Bucketing for Pre-Optimized Joins:


If the join is frequent and predictable, use bucketing during DataFrame writing to
group data by the join key. This avoids shuffling during runtime.

5. Avoid Skewed Data: If one DataFrame has skewed data, apply a "salting" technique to distribute keys more evenly.
6. Adjust Spark Configurations: For large-scale joins, ensure the Spark session is tuned with adequate resources.

7. Cache Intermediate Results: If the joined DataFrame will be reused multiple times, cache or persist it to avoid re-computation.
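A sketch combining several of these steps (DataFrame names, columns and partition counts are illustrative):

from pyspark.sql import functions as F

# Give shuffles a sensible number of partitions for the data volume
spark.conf.set("spark.sql.shuffle.partitions", "400")

# Keep only the rows and columns each side actually needs
orders_slim = (orders_df
    .filter(F.col("order_date") >= "2024-01-01")
    .select("customer_id", "order_id", "amount"))
customers_slim = customers_df.select("customer_id", "segment")

# Repartition both sides on the join key so matching keys are co-located
orders_rep = orders_slim.repartition(400, "customer_id")
customers_rep = customers_slim.repartition(400, "customer_id")

joined = orders_rep.join(customers_rep, "customer_id", "inner")

# Cache only if the joined result is reused several times
joined.cache()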

134. How Bucketing Helps in Joins


Bucketing is a technique that pre-sorts and groups data into fixed buckets based on the
values of a specific column (e.g., the join key). It helps in optimizing joins, especially with
large datasets, by minimizing shuffle and reducing computational overhead.

Key Benefits of Bucketing:


1. Avoids Shuffle During Joins:
o When two bucketed tables are joined on the same column and have the same
number of buckets, Spark knows that matching records will reside in the same
bucket across the tables. This eliminates the need for expensive shuffling
during the join.
2. Improved Query Performance:
o Since bucketed tables are pre-partitioned and sorted, queries like joins or
aggregations can operate faster as Spark doesn’t need to repartition or shuffle
data dynamically.
3. Better Resource Utilization:
o By avoiding large shuffles, you reduce memory and disk I/O usage, making the
job more resource-efficient.
Example Scenario:
You frequently join two large DataFrames, orders and customers, on the customer_id
column.
Without Bucketing (Dynamic Shuffle):
Every time you join the two DataFrames, Spark repartitions both DataFrames on
customer_id, which involves:
• Shuffling data across nodes.
• Sorting data dynamically during the join.
• Significant overhead for large datasets.
With Bucketing:
You bucket both DataFrames by customer_id at write time:
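The original snippet is not shown here; a sketch of bucketing both sides at write time (the bucket count and table names are assumptions):

# Bucketed data must be saved as tables rather than plain files
(orders_df.write
    .bucketBy(16, "customer_id")
    .sortBy("customer_id")
    .mode("overwrite")
    .saveAsTable("orders_bucketed"))

(customers_df.write
    .bucketBy(16, "customer_id")
    .sortBy("customer_id")
    .mode("overwrite")
    .saveAsTable("customers_bucketed"))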

Now, when reading and joining:
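Continuing the sketch above with the same assumed table names:

orders_b = spark.table("orders_bucketed")
customers_b = spark.table("customers_bucketed")

# With matching bucket counts on the join key, Spark can skip the shuffle
joined = orders_b.join(customers_b, "customer_id")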

• Spark skips the shuffle step because it knows the data is already distributed and
sorted by customer_id.
• The join is faster and more efficient.

When to Use Bucketing:


• Joins or aggregations are performed frequently on the same key.
• The datasets are static or infrequently updated, as bucketing needs to be done during
the data writing process.
• The number of buckets chosen matches the parallelism of the cluster for optimal
performance.
By pre-organizing the data, bucketing drastically improves the performance of joins and
other key-based operations.
Basic to Advanced Scenario-Based Questions on API Reading in Databricks

Basic Scenarios
135.Scenario 1: Reading Simple API Data
Question:
How do you read data from a REST API that provides a single JSON response?
Answer:
Use Python’s requests library to fetch the API response and convert it into a Spark
DataFrame.
Code Example:
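The code example is missing in this extract; below is a minimal sketch using the requests library (the endpoint URL is hypothetical).

import requests

# Hypothetical endpoint returning a JSON array of records
response = requests.get("https://api.example.com/v1/products")
response.raise_for_status()
records = response.json()

# Convert the parsed JSON (a list of dicts) into a Spark DataFrame
df = spark.createDataFrame(records)
df.show()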

136.Scenario 2: Reading API Data with Authentication


Question:
How do you handle an API that requires a Bearer token for authentication?
Answer:
Pass the token in the Authorization header.
Code Example:
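A hedged sketch; the endpoint and the Databricks secret scope/key names are assumptions.

import requests

# Fetch the token from a secret store rather than hard-coding it
token = dbutils.secrets.get(scope="api", key="bearer_token")
headers = {"Authorization": f"Bearer {token}"}

response = requests.get("https://api.example.com/v1/orders", headers=headers)
response.raise_for_status()

df = spark.createDataFrame(response.json())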
Intermediate Scenarios
137.Scenario 3: Handling Paginated API Responses
Question:
How would you handle paginated API data where each page contains 1000 records?
Answer:
Use a loop to fetch each page and merge the results into a single Spark DataFrame.
Code Example:
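A sketch of the pagination loop (the endpoint and its page/per_page parameters are assumptions):

import requests

base_url = "https://api.example.com/v1/orders"
all_records = []
page = 1

while True:
    resp = requests.get(base_url, params={"page": page, "per_page": 1000})
    resp.raise_for_status()
    batch = resp.json()
    if not batch:          # an empty page means there is no more data
        break
    all_records.extend(batch)
    page += 1

df = spark.createDataFrame(all_records)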

138.Scenario 4: Handling API Rate Limits


Question:
How do you handle APIs that enforce rate limits (e.g., 100 requests per minute)?
Answer:
Use the time.sleep function to introduce a delay between API calls.
Code Example:
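A sketch of throttled API calls (the endpoint, parameters and limits are assumptions):

import time
import requests

results = []
for page in range(1, 101):
    while True:
        resp = requests.get("https://api.example.com/v1/events", params={"page": page})
        if resp.status_code == 429:    # rate limited: wait and retry the same page
            time.sleep(60)
            continue
        resp.raise_for_status()
        results.extend(resp.json())
        break
    time.sleep(0.6)                    # stay under ~100 requests per minute

df = spark.createDataFrame(results)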
Advanced Scenarios
139.Scenario 5: Real-Time API Integration
Question:
How would you integrate a real-time API stream into Databricks for continuous data
ingestion?
Answer:
Use Structured Streaming to write incoming API data to a Delta table.
Code Example:
140.Scenario 6: Combining Data from Multiple APIs
Question:
How would you combine data from multiple APIs in Databricks for batch processing?
Answer:
Fetch data from each API, convert it to Spark DataFrames, and perform a union.
Code Example:

141.Scenario 7: Writing API Data to Delta Table


Question:
How do you write API data to a Delta table with an upsert (merge) operation?
Answer:
Use the Delta Lake merge functionality to update existing records and insert new ones.
Code Example:

142.Scenario 8: Managing Large API Responses


Question:
What would you do if the API returns a large dataset that cannot fit into memory?
Answer:
Process the API response in chunks and write each chunk directly to storage or a Delta
table.
Code Example:

143. Creating Dynamic Columns with Variable Size of Columns in PySpark DataFrame
Question:
How can you create dynamic columns in a PySpark DataFrame when the number of columns
varies across rows?
Answer:
To handle dynamic column creation in a PySpark DataFrame with a variable number of
columns, follow these steps:
1. Read the text data as a single column.
2. Split each row based on a delimiter and create a column for each value in the row.
3. Adjust the column count dynamically by creating new columns only when necessary.
Code Example:
144. Scenario: How to add partitionId in a DataFrame in PySpark?
Answer:
You can add the partitionId to a DataFrame in PySpark by using the spark_partition_id()
function. This function returns the partition ID of each row in the DataFrame. You can use
this function with withColumn to create a new column containing the partition IDs.
Here’s an example of how to do this:
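A small sketch (the sample data is illustrative):

from pyspark.sql.functions import spark_partition_id

data = [("Alice", 34), ("Bob", 45), ("Charlie", 29)]
df = spark.createDataFrame(data, ["name", "age"])

# Add a column holding the ID of the partition each row lives in
df_with_pid = df.withColumn("partitionId", spark_partition_id())
df_with_pid.show()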

Why are you getting different partition IDs?


• Partitioning: When a DataFrame is loaded or transformed in Spark, it is divided into
multiple partitions. Each partition is assigned an ID. The values you're seeing as
partition IDs (e.g., 2, 5, 7) correspond to the partition IDs that Spark assigns based on
its internal distribution of data.
• Data Distribution: The actual partition IDs depend on how Spark distributes the data
across its available executors and cores during the execution of your job.
If you're seeing partition IDs like 2, 5, and 7, it means that your DataFrame has been
distributed into at least 8 partitions (partition IDs range from 0 to N-1, where N is the total
number of partitions).
Explanation of Output:
Let's break it down:
• partitionId column: This column indicates which partition each row is located in.
o Row Alice (partitionId = 2) is in partition 2.
o Row Bob (partitionId = 5) is in partition 5.
o Row Charlie (partitionId = 7) is in partition 7.
This could happen due to Spark’s internal partitioning mechanism when performing
transformations. If you want more control over the partitioning, you can use operations like
repartition or coalesce to specify the number of partitions, which would influence the
partition IDs.
Code Example with Repartitioning:
You can control the partitioning by specifying the number of partitions explicitly before
applying the spark_partition_id() function.
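For example, continuing the sketch above:

df_repart = df.repartition(3).withColumn("partitionId", spark_partition_id())
df_repart.show()   # partition IDs are now limited to 0, 1 and 2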

Output after Repartitioning:


If you repartition the DataFrame into 3 partitions, the partition IDs will be limited to 0, 1,
and 2:
145. Data Skew and Partitioning Optimization
Question:
You have a large sales dataset, and you are performing a join operation with another
dataset containing customer information. However, the join operation is taking longer than
expected, and you notice that the partition IDs are highly skewed, with one partition
holding most of the data. How would you optimize this operation to avoid skew and
improve the performance of the join?
Answer:
In this case, the key problem is data skew, where one partition holds more data than others,
leading to performance bottlenecks. To optimize this:
1. Repartition Data: Use the repartition() function to ensure an even distribution of data
across partitions.
2. Salting: If the data skew is caused by certain keys (e.g., certain customer IDs), apply a
salting technique to randomly distribute data across partitions to prevent hot spots.
3. Broadcasting Smaller Dataset: If one of the datasets is much smaller than the other,
consider broadcasting the smaller dataset to avoid shuffling it during the join.
Here is how you can approach the optimization:
Impact on Performance:
• Repartitioning: The repartitioning ensures a more even distribution of data, thus
avoiding skewed partitions and reducing the likelihood of any one partition becoming
a bottleneck.
• Broadcasting: Broadcasting the smaller DataFrame reduces shuffling of the smaller
dataset and makes the join operation faster by sending the smaller DataFrame to all
executors.
• Optimized Join: By ensuring even partitioning and reducing shuffle through
broadcasting, the join operation becomes more efficient.
146. Generating Surrogate Key Using row_number() (Sequential Key)
Scenario:
You have a dataset of customer orders, and each order has a CustomerID, OrderID, and
OrderDate. You need to generate a surrogate key for each order based on the order date
(i.e., assigning sequential surrogate keys starting from 1, ordered by OrderDate).
Answer:
You can use the row_number() function along with a window specification to generate
sequential surrogate keys.
147. Generating Surrogate Key Using monotonically_increasing_id()
Scenario:
You have a dataset of employee records, and you need to generate a unique surrogate key
for each employee without any specific order (i.e., just a unique key per row).
Answer:
The monotonically_increasing_id() function generates a unique 64-bit integer for each row
in the DataFrame, which can be used as a surrogate key.
148. Generating Surrogate Key Using Hashing (MD5, SHA-2)
Scenario:
You need to create a surrogate key for a customer dataset based on CustomerID, FirstName,
and LastName. The key should be generated by hashing the combination of CustomerID and
Name fields to ensure consistency and uniqueness.
Answer:
You can use hash functions such as md5() or sha2() to create a surrogate key based on the
concatenation of specific fields.

149. Generating Surrogate Key Using UUID


Scenario:
You need to generate a unique surrogate key for each record in your dataset, but the
surrogate key should be globally unique across all records (i.e., it should be generated
randomly).
Answer:
You can use the uuid() function to generate a globally unique identifier for each row, which
can act as a surrogate key.

150. Generating Surrogate Key with Partitioning


Scenario:
You have a large dataset of products with their details. The dataset is partitioned by the
Category column. You need to generate a unique surrogate key for each product, and the
surrogate key should be partitioned by Category.
Answer:
You can use row_number() in combination with Window.partitionBy() to create a
partitioned surrogate key.
151. Scenario:
You are working with a dataset in Databricks that contains customer data, and you want to
find the count of null values for each column. How can you accomplish this?
Answer:
You can use the isNull() function to check for nulls and then use agg() to calculate the count
of null values for each column. Here's how to do it:
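A sketch for a DataFrame df, matching the explanation below:

from pyspark.sql.functions import col, sum as spark_sum

null_counts = df.agg(*[
    spark_sum(col(c).isNull().cast("int")).alias(c + "_null_count")
    for c in df.columns
])
null_counts.show()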
Explanation:
1. col(c).isNull(): This checks whether each value in the column c is null. It returns a
True/False value for each row.
2. cast("int"): Converts the boolean result (True/False) to integers (1/0).
3. sum(): Aggregates the values across the entire column, summing the 1s which
represents the count of null values.
4. alias(c + "_null_count"): Renames the result of each sum operation to reflect the
column name and add _null_count.
152. Scenario: Flatten the below complex JSON
153. Scenario: In the marks column, the values are stored as pipe (|) separated strings.
How can you split the marks column into individual columns, so that each mark has its
own column (e.g., marks_1, marks_2, marks_3, etc.) while retaining the id, name, and loc
for each row?
154. Question:
You have a DataFrame with multiple records for the same entity (e.g., customer or
product), and there is an updated_date column that tracks when the record was last
updated. Your task is to remove duplicates from the DataFrame, keeping only the most
recent record (based on updated_date) for each entity. How would you achieve this?
Answer:
To remove duplicate rows based on the updated_date column, we can use the Window
function to partition the data by the entity identifier (e.g., id) and order by updated_date in
descending order. Then, we can filter to keep only the latest record for each entity.
155. Question: You are working with a large dataset of employee information, which
includes fields such as id, name, and job. You need to analyze the dataset and find out how
many times each job appears within the dataset. You are required to return the result in the
form of a dictionary, where the key is the job title and the value is the number of
occurrences of that job in the dataset.

Requirements:
1. Group the dataset by the job column.
2. Count the occurrences of each job.
3. Return the result in the form of a dictionary where the key is the job and the value is
the count of occurrences for each job.
Answer:
To solve this, we can use PySpark to group the data by the job column and then count the
occurrences. Finally, we will collect the results and convert them into a dictionary.
Explanation:
• The result is a dictionary where the key is the job title (e.g., "Engineer") and the value
is the count of occurrences (e.g., 3).
• Engineer appears 3 times in the dataset.
• Manager appears 2 times.
• Analyst appears 2 times.

156. Question: You have a customer dataset and an order dataset. You are required to
identify all the customers who have not placed any orders. These customers may be present
in the customer table but not in the order table. You need to return a list of customers who
do not have any corresponding order records.
Datasets:
1. Customer Table (customers):

2. Order Table (orders):


Requirements:
1. Find all the customers who do not have any orders.
2. Return the customer name and customer ID for those who have no order records in
the orders table.
Answer:
To solve this, we will use a left anti join, which allows us to return all the records from the
customers table where there is no matching record in the orders table based on the
customer_id column.
Here’s the solution using PySpark:
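A sketch of the left anti join (the DataFrame names are assumptions):

customers_without_orders = (customers_df
    .join(orders_df, on="customer_id", how="left_anti")
    .select("customer_id", "name"))

customers_without_orders.show()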

Explanation:
• Left Anti Join: The left_anti join returns all records from the left DataFrame
(customers_df) where there is no matching record in the right DataFrame (orders_df)
based on the customer_id.
• In the given example, customers David and Eve do not have any corresponding
records in the orders table, so they are returned in the result.
157. Question: You have a sales dataset with the product sales data for multiple years. Your
task is to find the products whose sales have increased every year. Additionally, you need to
identify the specific years when the sales increased.
Datasets:
1. Sales Table (sales):

Requirements:
1. Identify products where total sales have increased year by year.
2. Also Give the year in which it increased?
Answer:
To solve this, we need to:
1. Group the data by product_id and sort it by year.
2. Compare the total sales of each product across years and check if they have
consistently increased.
3. Filter the products where sales have consistently increased.
Solution 1: find products which increased year by year
Solution 2: Year which it is increased
Note: the approach above is a bit complex, so try the alternative solution below. It is a second approach that detects a decrease in sales; you can adapt the same logic to detect an increase.
158. Scenario: You have a sales dataset containing the total sales for different products over
multiple years. Your task is to find products where the sales decreased in a specific year
compared to the previous year. Specifically, you need to identify the years when each
product experienced a decrease in sales and output those products along with the years
they experienced the decrease.
Solution:
We will use the lag() function in PySpark to compute the previous year's sales for each
product. By taking the difference between the current year's sales and the previous year's
sales, we can identify the years when the sales decreased. The years with a negative
difference will indicate a decrease in sales.
159. You are provided with a dataset of movies containing details such as ID, name,
description, and rating. Your task is to:
1. Add more movie data to the dataset.
2. Identify movies with odd IDs that do not have "boring" in their description.
3. Return the result sorted by rating in descending order.
160. Scenario-Based Question:
You have a dataset with details about people buying fruits, the types of fruits, and their weights. Your task is to generate a report showing:
1. The total weight of fruits bought by each person.
2. Their fruit names and corresponding weights, stored as lists.
Solution 2: Fruits and Weights as Lists

161. Scenario: You are given an employee dataset with details of their names and job titles.
Your task is to find the total headcount of employees for each job title. Give output in form
of dict.
Explanation
1. headcount_df.collect():
o Collects the grouped DataFrame as a list of rows.
2. Dictionary Comprehension:
o Loops through each row to create a dictionary with job as the key and
headcount as the value.

162. Scenario: You are provided with employee login data containing the employee ID
(emp_id), the date of login (log_date), and a flag (flag) indicating whether the employee
logged in on that day ('Y' for yes, 'N' for no).
The task is to identify employees who logged in on at least two consecutive days and
return their IDs along with the dates of consecutive logins.
Required output
Explanation
1. Lag Function:
o lag("flag"): Retrieves the flag from the previous day for the same employee.
o lag("log_date"): Retrieves the previous date for the same employee.
2. Filter Condition:
o Check if the current day's flag and the previous day's flag are both 'Y'.
3. Result:
o Extracted emp_id, log_date, and prev_log_date for consecutive login days.
163. Scenario: You are provided with employee salary data that includes the department
(dept_id), employee name (emp_name), and salary (salary).
The task is to find the highest and lowest salaried employee for each department and return
the results.
If asked to show as one output:

164. Scenario: You are given a dataset with product sales data for three consecutive
months: September, October, and November. The data is structured as follows:

Your task is to transform this dataset so that each month's sales are represented as a
separate row with an additional column indicating the month. The output should look like
this:
Answer:

In the stack function, the column names are listed twice because it requires both the labels (the names for the new rows) and the corresponding value columns. Here's the breakdown:
Syntax of stack function
stack(N, label1, value1, label2, value2, ..., labelN, valueN)
• N: Number of columns to transform into rows.
• label: A string that represents the new row name (e.g., 'Sep_Sale', 'Oct_Sale',
'Nov_Sale').
• value: The actual column containing the corresponding values.
• 'Sep_Sale': Label for the new row indicating the month.
• Sep_Sale: Column name containing the sales value for September.
Why It's Done Twice
• The first occurrence ('Sep_Sale') becomes the value for the new Month column.
• The second occurrence (Sep_Sale) provides the data for the new Sales column.
• Dynamic Updates: If the number of months increases, update the stack function with
the new columns.
2nd approach using unpivot:

165. Finding Duplicate Emails in a Customer Database


You are working as a data engineer for an e-commerce company. The company is
maintaining a customer database with each customer's email address. Due to multiple
signups, some customers may have entered the same email address more than once. Your
task is to identify all the duplicate email addresses from the customer records, so that they
can be cleaned up in the database.
You are given the following dataset with customer IDs and their email addresses.
166. Find Customers Who Bought All Products
You are working as a data engineer for an online retail company. The company wants to
identify customers who have bought every product in their inventory. The database contains
information about customer purchases, and you need to create a list of customers who have
bought all the products available in the catalog.
Given the following datasets:
• Customers Purchases: Contains customer IDs and the products they have purchased.
• Products: Contains a list of all available products.
Your task is to write a query that identifies customers who have purchased every single
product listed in the products table.
167. Validate Data Between Source and Target Tables
You are tasked with validating the data between a source table and a target table. The goal
is to ensure that the data in the source and target tables match, including checking for any
missing or mismatched records. This validation process will help verify the integrity of data
migration or data replication processes.
You have the following tables:
• Source Table: Contains the original data.
• Target Table: Contains the migrated or replicated data but with some discrepancies
(missing records, extra records, and mismatched data).

Task:
1. Missing Records: Find any records in the source table that are missing in the target
table.
2. Mismatched Records: Identify records where the data in the source and target tables
don't match (e.g., salary mismatch).
3. Extra Records: Find records in the target table that do not exist in the source table.

Explanation of the Code:


1. Identify Missing Records: We use a left_anti join to identify records from the source
table that do not exist in the target table. These are the missing records in the target.
2. Identify Mismatched Records: An inner join is performed between the source and
target tables to find records that exist in both tables. We then filter on columns to
check if the values differ (e.g., the salary field).
3. Identify Extra Records: We again use a left_anti join, but this time on the target table
to find records that are present in the target but not in the source.
168. Scenario:
• A user is considered "active" if they made a purchase within 7 days of their previous
purchase. If they don't make a purchase within the next 7 days of today's purchase,
they are not active.
• You will need to look at the purchase date and user id, and calculate the difference
between consecutive purchases to determine whether the user is active.
169. You are working with a dataset containing product prices from different stores. The
goal is to determine whether Walmart is offering a competitive price, a good deal, or not
based on the prices of the same product in other stores. A good deal is defined as Walmart
offering the lowest price compared to other stores for the same product.
Explanation of Results:
• Iphone: Walmart offers the price of 90, which is lower than both BestBuy's (100) and
Amazon's (95), so it is marked as "Yes" for being competitive.
• Samsung: Walmart's price of 85 is higher than Amazon's price of 80, so it is marked as
"No" for being competitive.
170. Scenario:
We are working with CSV files that contain data related to employee information. The
predefined schema is based on the expected structure of the file. Our goal is to:
1. Check the schema of the CSV file dynamically.
2. If the schema matches the predefined schema, write directly to the Delta table.
3. If the schema does not match, read the file using the defined schema and then write
the data to the Delta table.
Steps:
1. Load the CSV file using Spark and inspect its schema.
2. Compare the schema of the loaded CSV file with the predefined schema.
3. Write to Delta table if the schemas match; otherwise, use the predefined schema for
reading and writing the data.
171. Can you explain how a Structured Streaming query works under the hood?
Answer:
Structured Streaming queries continuously read from a source, apply transformations, and
produce results. The query processes data in micro-batches (unless continuous processing is
used), and each micro-batch processes a portion of the incoming data as it arrives. Spark
uses checkpointing and write-ahead logs for fault tolerance, and state management is used
for operations like aggregations and windowing.
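The code block the explanation below refers to is not reproduced here; the following is a sketch consistent with it (the schema and column names are assumptions).

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
])

stream_df = (spark.readStream
    .option("header", "true")
    .schema(schema)
    .csv("/path/to/input_folder"))

upper_df = stream_df.selectExpr("id", "upper(name) AS name")

query = (upper_df.writeStream
    .format("csv")
    .option("path", "/path/to/output_folder")
    .option("checkpointLocation", "/path/to/checkpoint")
    .start())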
Explanation:
1. Source (Read):
o Reads CSV files from /path/to/input_folder as they arrive.
o Assumes CSV files have a header.
2. Transform:
o Converts the name column to uppercase using selectExpr.
3. Sink (Write):
o Writes the transformed data to /path/to/output_folder in CSV format.
o Uses a checkpointLocation to ensure fault tolerance and query recovery.
172. Compare Batch vs Streaming Code
173. How do you read data from a Kafka topic in Structured Streaming and output it to a
console?
Answer: To read data from a Kafka topic, you can use the readStream method with the
format("kafka") option. This allows you to access the topic, and then you can output the
processed stream to the console.
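A sketch (the broker address and topic name are assumptions):

kafka_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")
    .option("subscribe", "my_topic")
    .option("startingOffsets", "latest")
    .load())

# Kafka keys and values arrive as binary, so cast them to strings first
messages = kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

query = (messages.writeStream
    .format("console")
    .outputMode("append")
    .start())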

174. How do you write the output of a Structured Streaming query to a file (e.g., CSV, Parquet)?
Answer: You can write the result of a structured streaming query to a file in CSV, Parquet, or
other supported formats using the writeStream method.

This writes the data to Parquet files as new data arrives in the streaming DataFrame.
175. How would you read data from a file (e.g., CSV) in Structured Streaming and store it
in Delta format?
Answer: You can use readStream to read data from a file source, then write it to Delta
format using writeStream with the format delta.

176. How can you set up a structured streaming job to read data from an S3 bucket and
write it to a Delta table?
Answer: You can configure the readStream to read from an S3 bucket and output it to a
Delta table by using the S3 path as the source.
This job will read data continuously from the S3 bucket and write the processed data to a
Delta table.

177.How can you handle schema evolution while reading and writing data in Structured
Streaming?
Answer: When reading data from a source with evolving schemas (e.g., a Kafka topic or a
JSON file), you can enable schema inference or define a schema explicitly to handle schema
evolution. In the case of writing, Delta format inherently supports schema evolution.
Writing Data to Delta with Schema Evolution
To enable schema evolution when writing to Delta format, use the mergeSchema option.
When a new column is added to the input data, Delta will automatically update the schema
to reflect the change.
178. How can you write streaming data to a relational database (e.g., Azure SQL
Database)?
Answer: To write streaming data to a relational database, you can use the writeStream API
with the JDBC connector to the target database. For example, writing data to Azure SQL
Database.

This will append data to the specified SQL database table.


179. How can you write the results of a streaming query to a Kafka topic?
Answer: To write the results of a streaming query back to Kafka, you can use writeStream
with the format("kafka") option and define the Kafka producer options.
This will write the output of the streaming query back to a Kafka topic for further
consumption.
180. How would you read from a socket source in Structured Streaming and perform
some basic transformations?
Answer: To read data from a socket, you can use the readStream method with the
format("socket") option. You can then apply basic transformations to the incoming data.

This example reads text data from a socket, splits it into words, and outputs it to the
console.
181: How can you handle schema inference when reading from a dynamic JSON stream in
Structured Streaming?
Answer: You can enable schema inference when reading from a dynamic JSON source by
using .option("inferSchema", "true"). However, it’s often better to define a schema explicitly
to ensure consistency.
This ensures that each JSON record is read according to the specified schema, avoiding
potential issues with dynamic or changing JSON structures.
182. How can you check the status of a streaming query in Structured Streaming?
Answer: You can use the status, lastProgress, and isActive methods to check the status of a
streaming query:
1. status: Returns the current status of the query.
2. lastProgress: Returns the progress details, such as the number of records processed
and the time taken for each micro-batch.
3. isActive: Returns a boolean value indicating whether the query is still active.
These methods help in monitoring and debugging the query in real-time.
183. What is the purpose of the awaitTermination() method in Structured Streaming?
Answer: The awaitTermination() method is used to block the execution of the current
thread and wait for the completion or termination of a streaming query in Structured
Streaming. It ensures that the query continues to run until it is either manually stopped or
finishes processing.

• Without Timeout: It blocks indefinitely, waiting for the query to terminate (either
through a manual stop or a failure).
• With Timeout: You can also specify a timeout in milliseconds. If the query doesn't
terminate within the given period, it will return False.
Use Case: This method is helpful when running a streaming query in a standalone script or
Databricks notebook, where you want to ensure that the query keeps running until
manually stopped or completed.
183. How does watermarking work in Structured Streaming in Databricks?
Answer: Watermarking works by specifying a maximum allowed delay in event time for late
data. When you define a watermark, Spark uses it to decide when to allow late data in
streaming queries and when to process the results based on the event time.
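A sketch of a 5-minute watermark on an assumed event_time column:

from pyspark.sql import functions as F

# events_df is a streaming DataFrame with event_time and device_id columns (assumed)
windowed_counts = (events_df
    .withWatermark("event_time", "5 minutes")
    .groupBy(F.window("event_time", "10 minutes"), "device_id")
    .count())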
This ensures that only data that is within 5 minutes of the latest event time is included, and
any data beyond this is considered late and ignored for aggregation.

184. Scenario: Load data incrementally into a Delta table


Question:
You receive new CSV files in an Azure Data Lake Storage (ADLS) Gen2 container daily. The
file names follow the pattern data_YYYYMMDD.csv. How can you incrementally load these
files into a Delta table using the COPY INTO command?
Answer:
You can use the COPY INTO command with a wildcard pattern in the file path to match only
the new files. Here's an example:
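A sketch of the command (the target table name and ADLS path are assumptions):

spark.sql("""
    COPY INTO sales_delta
    FROM 'abfss://container@storageaccount.dfs.core.windows.net/landing/'
    FILEFORMAT = CSV
    PATTERN = 'data_*.csv'
    FORMAT_OPTIONS ('header' = 'true')
""")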

Key points:
• Replace data_20250102.csv with the specific file or use a dynamic pattern like
data_*.csv to load multiple files.
• Ensure that the Delta table schema matches the incoming file schema.
185. Scenario: Handling schema evolution
Question:
A new column is added to the incoming CSV files, but your Delta table does not yet include
this column. How do you handle this schema evolution using COPY INTO?
Answer:
You can enable schema evolution by adding the mergeSchema option to the Delta table.
Here’s an example:

Key points:
o Enable schema evolution before running the COPY INTO command.
o Ensure the new column's data types are compatible with Delta.
186. Scenario: Data deduplication during the load
Question:
The incoming files may have duplicate rows. How can you ensure only unique rows are
loaded into the Delta table using COPY INTO?
Answer:
You can use a staging table to load data first, remove duplicates, and then upsert into the
main Delta table:
• Key points:
o Use a staging Delta table for temporary storage.
o Perform a MERGE operation with deduplication logic.
187.Scenario: Error handling during file ingestion
Question:
Some files have missing columns or incorrect data types. How do you skip problematic files
while using COPY INTO?
Answer:
You can enable error handling by setting the badRecordsPath option in the COPY INTO
command:

• Key points:
o Problematic files or records are moved to the specified badRecordsPath.
o This allows you to inspect and fix errors without failing the entire load.
188. Scenario: Partitioning the Delta table
Question:
How can you optimize data insertion by partitioning the Delta table while using COPY INTO?
Answer:
Partition the Delta table by a specific column, such as region or date, to improve query
performance:

• Key points:
o Ensure the Delta table is created with the PARTITIONED BY clause.
o Partitioning helps with efficient data querying and storage.
189. Scenario: Ingesting files from a cloud storage location
Question:
You want to continuously ingest new JSON files from an Azure Data Lake Storage (ADLS)
Gen2 container into a Delta table. How can you set up Auto Loader to handle this?
Answer:
You can use the Auto Loader with a cloudFiles source to automatically process new files.
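A sketch of an Auto Loader stream (the storage and checkpoint paths are assumptions):

df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/mnt/checkpoints/events_schema")
    .load("abfss://container@storageaccount.dfs.core.windows.net/incoming/"))

query = (df.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/events")
    .outputMode("append")
    .start("/mnt/delta/events"))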
• Key points:
o Auto Loader automatically detects new files in the input path.
o The checkpoint location ensures fault tolerance and tracks progress.
o Schema inference is supported, but it’s recommended to define the schema
explicitly for performance.
190.Scenario: Schema evolution
Question:
A new column is added to the incoming JSON files. How do you ensure that the new schema
is handled seamlessly?
Answer:
Enable schema evolution by setting the cloudFiles.schemaLocation option and
mergeSchema option in the Delta table.
• Key points:
o Auto Loader tracks schema changes in the cloudFiles.schemaLocation.
o The mergeSchema option updates the Delta table schema automatically.
191.Scenario: Handling duplicate files
Question:
Your cloud storage might occasionally receive duplicate files. How can you ensure that Auto
Loader processes each file only once?
Answer:
Auto Loader tracks file processing using file metadata, preventing duplicate ingestion.

• Key points:
o The cloudFiles.includeExistingFiles option ensures only new files are processed.
o Use checkpointing to track and avoid reprocessing.
192. Scenario: Triggering batch intervals
Question:
You want to process files in micro-batches instead of continuously streaming. How do you
configure Auto Loader for this?
Answer:
Use the trigger option in the writeStream method.
• Key points:
o Files are processed every 10 minutes with the trigger setting.
o Use this for scenarios where latency can be tolerated to optimize resource
usage.
193.Scenario: Data deduplication
Question:
How can you ensure duplicate rows are not inserted into the Delta table during ingestion?
Answer:
Use the dropDuplicates function with a unique identifier column.

• Key points:
o Deduplication ensures no duplicate records are written to the Delta table.
o Choose appropriate columns for identifying unique rows.
194.Scenario: Error handling
Question:
Some incoming files contain corrupt or invalid records. How can you skip such records and
still process valid ones?
Answer:
Use the badRecordsPath option in Auto Loader.

Key points:
o Invalid records are moved to the specified badRecordsPath for later inspection.
o This ensures valid data is ingested without interruptions.
195. Scenario: Schema Evolution
Question:
New files ingested into your data pipeline contain additional columns not present in your
Delta table schema. How can you ensure these new columns are automatically added
during ingestion?
Answer:
Use the merge schema mode to allow schema evolution. Configure the
cloudFiles.schemaEvolutionMode to addNewColumns.

• Key points:
o addNewColumns automatically adds new columns while retaining existing data.
o Use cloudFiles.schemaLocation to store and manage schema updates.
196.Scenario: Schema Validation
Question:
Your team wants to ensure that incoming files strictly follow the existing Delta table schema
and fail ingestion if there are discrepancies. How can you enforce schema validation?
Answer:
Use the failOnNewColumns schema mode to enforce schema validation.
Key points:
• failOnNewColumns prevents ingestion if new columns are detected.
• This is useful when schema consistency is critical.

197. Scenario: Schema Drift Detection


Question:
You suspect schema drift (frequent changes in schema) in the incoming data. How can you
monitor and track these schema changes effectively?
Answer:
Enable schema drift detection by using the cloudFiles.schemaLocation option.
Key points:
• Changes to the schema are logged and stored at the schema location.
• Review these logs periodically to understand schema evolution trends.
198. Scenario: Managing Access to Data Across Teams
Question:
Your organization has two teams: Finance and Marketing. Each team should only access its
respective Delta tables, but both teams are using the same Databricks workspace. How can
you ensure access control using Unity Catalog?
Answer:
Use Table ACLs (Access Control Lists) in Unity Catalog to define granular permissions for
each team.
1. Create a catalog and schema for each team

2. Grant permissions to the respective teams:


3. Deny cross-team access (default behavior ensures no access unless granted).

199. Scenario: Managing Personally Identifiable Information (PII)

Question:
You have a table with PII data, and only authorized users should be able to access sensitive
columns like SSN and Phone Number. How can you enforce this in Unity Catalog?

Answer:
Use column-level permissions to restrict access to sensitive data.

1. Grant permissions to non-sensitive columns only:

GRANT SELECT (name, email) ON TABLE my_catalog.hr.employees TO `general_users`;

2. Restrict access to sensitive columns:

GRANT SELECT (SSN, phone_number) ON TABLE my_catalog.hr.employees TO `hr_admins`;

3. Ensure access is logged:


All access to columns is logged for auditing purposes.

200. Scenario: Enforcing Data Lineage Tracking

Question:
Your organization needs to trace the lineage of data to identify how it flows across tables
and queries for debugging and governance purposes. How can Unity Catalog help?

Answer:
Unity Catalog automatically tracks data lineage across tables, queries, and workflows.

1. Enable lineage tracking in Unity Catalog:


Lineage is automatically enabled once Unity Catalog is set up.
2. View lineage in Databricks:
Use the Data Explorer to see:
o Upstream and downstream dependencies for tables.
o Lineage at the column level.
3. Scenario example:
If a query fails due to unexpected data, you can trace back to the source table or ETL
job that introduced the issue.

201. Scenario 7: Masking Sensitive Data for Non-Privileged Users

Question:
Some users in your organization need access to a table, but sensitive information like salary
should be masked for them. How can Unity Catalog help?

Answer:
Use dynamic data masking in Unity Catalog: create a dynamic view that checks group membership (for example with is_account_group_member) and returns the real salary only to privileged users while masking it for everyone else, then grant users SELECT on the view instead of the underlying table.
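A hedged sketch using a dynamic view (catalog, table, column, and group names are placeholders):

spark.sql("""
    CREATE OR REPLACE VIEW my_catalog.hr.employees_masked AS
    SELECT
        employee_id,
        name,
        department,
        CASE
            WHEN is_account_group_member('hr_admins') THEN salary   -- privileged users see the real value
            ELSE NULL                                               -- everyone else gets a masked value
        END AS salary
    FROM my_catalog.hr.employees
""")

# Users query the view instead of the underlying table
spark.sql("GRANT SELECT ON my_catalog.hr.employees_masked TO `general_users`")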

202. Scenario: Combining RLS and CLS for Sensitive Data

Question:
You manage a financial dataset with sensitive columns like credit_score and loan_amount.
Regional managers should only see data for their region, and only authorized users can
access sensitive columns. How can you enforce both RLS and CLS?
Answer:
Combine Dynamic Views (for RLS) with Column Permissions (for CLS).

1. Create a dynamic view for RLS that filters rows to the manager's own region.
2. Grant SELECT on the view to the regional manager groups.
3. Grant access to the sensitive columns only to authorized groups, so that only users in sensitive_data_group see the credit_score column (see the sketch below).
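A hedged sketch of the combination (catalog, table, and group names are placeholders; the manager-to-region mapping table is an assumption):

# 1. Dynamic view enforcing row-level security by region
spark.sql("""
    CREATE OR REPLACE VIEW fin_catalog.loans.loans_rls AS
    SELECT l.*
    FROM fin_catalog.loans.loan_data AS l
    JOIN fin_catalog.loans.manager_regions AS m      -- assumed mapping: manager email -> region
      ON m.region = l.region
    WHERE m.manager_email = current_user()
""")

# 2. Grant SELECT on the view to the regional manager group
spark.sql("GRANT SELECT ON fin_catalog.loans.loans_rls TO `regional_managers`")

# 3. Column-level security on top: expose sensitive columns only to the authorized group
spark.sql("""
    CREATE OR REPLACE VIEW fin_catalog.loans.loans_rls_cls AS
    SELECT
        loan_id,
        region,
        CASE WHEN is_account_group_member('sensitive_data_group') THEN credit_score ELSE NULL END AS credit_score,
        CASE WHEN is_account_group_member('sensitive_data_group') THEN loan_amount  ELSE NULL END AS loan_amount
    FROM fin_catalog.loans.loans_rls
""")
spark.sql("GRANT SELECT ON fin_catalog.loans.loans_rls_cls TO `regional_managers`")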

203. Scenario-Based Question:

Scenario: You are the Unity Catalog administrator for your company's Azure Databricks
workspace. A new team, data-science-team, needs access to various objects within the
catalog prod_catalog. They should be able to:

1. Use the prod_catalog catalog.


2. Use the default schema in prod_catalog.
3. Create tables within the default schema.
4. Execute a specific registered model called prod.ml_team.prediction_model.

You need to assign these privileges to the data-science-team group. Additionally, ensure
that all the privileges are properly granted and that you can verify them. Lastly, if the group
data-science-team no longer needs the CREATE TABLE privilege on prod_catalog.default,
you must revoke that privilege.

Write a PySpark code block that:


1. Grants the appropriate privileges to data-science-team.
2. Verifies the privileges granted to the group.
3. Revokes the CREATE TABLE privilege on the prod_catalog.default schema.
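A hedged sketch of such a code block, using the object names given in the scenario (the privilege for the registered model is granted here with the FUNCTION securable, matching the wording in the explanation below):

# 1. Grant the required privileges to the data-science-team group
spark.sql("GRANT USE CATALOG ON CATALOG prod_catalog TO `data-science-team`")
spark.sql("GRANT USE SCHEMA ON SCHEMA prod_catalog.default TO `data-science-team`")
spark.sql("GRANT CREATE TABLE ON SCHEMA prod_catalog.default TO `data-science-team`")
spark.sql("GRANT EXECUTE ON FUNCTION prod.ml_team.prediction_model TO `data-science-team`")

# 2. Verify the privileges granted to the group
display(spark.sql("SHOW GRANTS ON CATALOG prod_catalog"))
display(spark.sql("SHOW GRANTS ON SCHEMA prod_catalog.default"))
display(spark.sql("SHOW GRANTS ON FUNCTION prod.ml_team.prediction_model"))

# 3. Revoke CREATE TABLE when it is no longer needed
spark.sql("REVOKE CREATE TABLE ON SCHEMA prod_catalog.default FROM `data-science-team`")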

Explanation:

1. Grant Privileges:
o The first four SQL commands grant the required privileges to the data-science-
team group on the prod_catalog catalog, prod_catalog.default schema, and
the registered function prod.ml_team.prediction_model.
2. Verify Privileges:
o The SHOW GRANTS SQL commands are used to verify the privileges granted to
the group on the catalog, schema, and function.
3. Revoke Privilege:
o The REVOKE CREATE TABLE SQL command revokes the privilege to create
tables on the prod_catalog.default schema from the data-science-team group.

This code demonstrates how to manage privileges on Unity Catalog objects within
Databricks using SQL commands in a PySpark environment.

204. Scenario:

You have a schema old_schema in the Hive Metastore that contains both managed and
external tables. You need to:
1. Perform a dry run to check if all the tables in old_schema can be migrated to Unity
Catalog under the new schema new_schema in the main catalog.
2. Perform the actual sync of all eligible external tables from old_schema to
new_schema in Unity Catalog.
3. Change the owner of the newly synced tables to [email protected].
4. Ensure that any Hive managed tables stored outside Databricks workspace storage
are synced as external tables.
5. Finally, replace any tables in new_schema that already exist with the corresponding
tables from old_schema (if necessary).

Solution Code:
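A hedged sketch assembled from the command forms listed in the concept notes below (the owner principal is the one named in the scenario; exact clause placement can vary by Databricks Runtime version):

# 1. Dry run: check which tables in old_schema are eligible for migration
display(spark.sql("SYNC SCHEMA main.new_schema FROM hive_metastore.old_schema DRY RUN"))

# 2. Sync the eligible tables; AS EXTERNAL brings Hive managed tables stored outside
#    workspace storage across as external tables, and SET OWNER assigns the new owner
display(spark.sql("""
    SYNC SCHEMA main.new_schema AS EXTERNAL
    FROM hive_metastore.old_schema
    SET OWNER `[email protected]`
"""))

# 3. Re-run SYNC at table level to replace a table that already exists in new_schema
display(spark.sql("SYNC TABLE main.new_schema.my_table FROM hive_metastore.old_schema.my_table"))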
Explanation of Concepts Covered:

1. Dry Run (DRY RUN):


o The dry run helps check the eligibility of tables for syncing without actually
modifying any tables. The results will indicate if the tables are ready for
migration.
o Example: SYNC SCHEMA main.new_schema FROM hive_metastore.old_schema
DRY RUN;
2. Syncing Tables:
o The SYNC command can be executed at the schema level or the table level.
o For schema-level sync, all eligible tables from the source schema in Hive
Metastore are synced to Unity Catalog.
o Example: SYNC SCHEMA main.new_schema FROM
hive_metastore.old_schema;
3. Syncing External Tables (AS EXTERNAL):
o The AS EXTERNAL clause is used for syncing managed tables stored outside of
DBFS to Unity Catalog as external tables.
o Example: SYNC SCHEMA main.new_schema AS EXTERNAL FROM
hive_metastore.old_schema;
4. Setting Table Ownership (SET OWNER):
o This option allows the user to set a specific owner for the tables in Unity
Catalog. In this case, the owner is [email protected].
o Example: SET OWNER [email protected];
5. Replacing Existing Tables:
o If a table with the same name already exists in the target schema, the sync
operation will replace it with the source table.
o Example: SYNC TABLE main.new_schema.my_table FROM
hive_metastore.old_schema.my_table;
6. Complete Schema Migration:
o The full migration of both managed and external tables from the Hive
Metastore schema to Unity Catalog is handled with a single command.
o Example: SYNC SCHEMA main.new_schema FROM hive_metastore.old_schema
AS EXTERNAL;

205. Scenario:

You are working in a data engineering team for a large e-commerce company. You are
tasked with managing access control to sensitive customer data in the Unity Catalog.
Specifically, you need to:

1. Secure access to a table named customer_data stored in the schema sales of the
ecommerce_catalog catalog.
2. Create a dynamic view in a secondary schema that hides the email column for non-
admin groups, only allowing admins to see the full email address.
3. Create a dynamic view in the same schema that filters rows based on specific
conditions, allowing only managers to see data for customers with a total spend
greater than $1,000,000 and non-managers to only see data for customers with a
total spend less than or equal to $1,000,000.

You need to:

• Grant the appropriate permissions to the sales_team and admin_group groups.


• Ensure that both the table and views are only accessible by the required groups.

Explanation of Concepts Covered:

1. Granting Permissions on Catalog and Schema:


o GRANT USE CATALOG and GRANT USE SCHEMA are used to allow access to the
catalog and schema, respectively, for the specified groups (e.g., sales_team,
admin_group).
o Example: GRANT USE CATALOG ON CATALOG ecommerce_catalog TO
sales_team;
2. Granting Permissions on Tables:
o GRANT SELECT on the table allows the specified group to view the contents of
the table.
o Example: GRANT SELECT ON ecommerce_catalog.sales.customer_data TO
sales_team;
3. Dynamic View for Column-Level Security:
o The view customer_data_view masks the email column based on the group
membership, allowing full access to email only for admin_group while showing
'REDACTED' for non-admin users.
o Example: CASE WHEN is_account_group_member('admin_group') THEN email
ELSE 'REDACTED' END AS email
4. Dynamic View for Row-Level Security:
o The view customer_data_filtered filters rows based on the total column,
allowing managers to see all data, but limiting non-managers to rows where
total is less than or equal to $1,000,000.
o Example: CASE WHEN is_account_group_member('managers') THEN TRUE
ELSE total <= 1000000 END
5. Granting Permissions on Views:
o After creating the dynamic views, you grant the necessary SELECT permissions
on the views to the appropriate groups.
o Example: GRANT SELECT ON ecommerce_catalog.sales.customer_data_view
TO admin_group;
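A hedged consolidated sketch of these steps (non-sensitive column names such as customer_id, name, and total are assumptions):

# 1. Catalog and schema access for both groups
for grp in ("sales_team", "admin_group"):
    spark.sql(f"GRANT USE CATALOG ON CATALOG ecommerce_catalog TO `{grp}`")
    spark.sql(f"GRANT USE SCHEMA ON SCHEMA ecommerce_catalog.sales TO `{grp}`")

# 2. Base table access
spark.sql("GRANT SELECT ON ecommerce_catalog.sales.customer_data TO `sales_team`")
spark.sql("GRANT SELECT ON ecommerce_catalog.sales.customer_data TO `admin_group`")

# 3. Column-level security: mask email for non-admins
spark.sql("""
    CREATE OR REPLACE VIEW ecommerce_catalog.sales.customer_data_view AS
    SELECT
        customer_id,
        name,
        CASE WHEN is_account_group_member('admin_group') THEN email ELSE 'REDACTED' END AS email,
        total
    FROM ecommerce_catalog.sales.customer_data
""")

# 4. Row-level security: managers see all rows, everyone else only total <= 1,000,000
spark.sql("""
    CREATE OR REPLACE VIEW ecommerce_catalog.sales.customer_data_filtered AS
    SELECT *
    FROM ecommerce_catalog.sales.customer_data
    WHERE CASE WHEN is_account_group_member('managers') THEN TRUE ELSE total <= 1000000 END
""")

# 5. Grant the views to the groups that should query them
spark.sql("GRANT SELECT ON ecommerce_catalog.sales.customer_data_view TO `admin_group`")
spark.sql("GRANT SELECT ON ecommerce_catalog.sales.customer_data_filtered TO `sales_team`")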
206. Migrating a Hive Managed Table to Unity Catalog

As part of your company's cloud modernization efforts, you need to migrate a managed
Delta table from the Hive metastore to Unity Catalog. This will allow you to take advantage
of Unity Catalog's advanced data governance and security features.

Steps:

1. Clone the Table to Unity Catalog:

2. Grant Access to the New Table:

3. Add a Comment to the Original Hive Table:
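A hedged sketch of the three steps (catalog, schema, table, and group names are placeholders):

# 1. Clone the managed Hive table into Unity Catalog
spark.sql("""
    CREATE TABLE IF NOT EXISTS main.analytics.orders
    DEEP CLONE hive_metastore.analytics.orders
""")

# 2. Grant access to the new Unity Catalog table
spark.sql("GRANT SELECT ON main.analytics.orders TO `analysts`")

# 3. Mark the original Hive table as deprecated so consumers switch to the new location
spark.sql("""
    COMMENT ON TABLE hive_metastore.analytics.orders IS
    'Deprecated: migrated to main.analytics.orders in Unity Catalog.'
""")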


207. Ingesting Data with Delta Live Tables

Your company has various data sources, including cloud object storage and Kafka message
buses. You need to load this data into Delta Live Tables for further processing and analysis.
For cloud object storage, you'll use Auto Loader with Delta Live Tables to efficiently handle
file ingestion. For Kafka, you'll use streaming tables to ingest the real-time data.

Task 1: Load Data from Cloud Object Storage Using Auto Loader

You need to ingest customer and sales order data stored in CSV and JSON formats,
respectively, from cloud object storage into Delta Live Tables. This should be done
incrementally as new files arrive.

Steps:

1. Define a Delta Live Table for customers (CSV format):

2. Define a Delta Live Table for sales orders (JSON format):
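A hedged Delta Live Tables sketch for both source feeds (landing paths are placeholders):

import dlt

CUSTOMERS_PATH = "/mnt/landing/customers/"       # placeholder path
ORDERS_PATH    = "/mnt/landing/sales_orders/"    # placeholder path

@dlt.table(comment="Raw customer data ingested incrementally from CSV files")
def customers_raw():
    return (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("header", "true")
            .load(CUSTOMERS_PATH))

@dlt.table(comment="Raw sales orders ingested incrementally from JSON files")
def sales_orders_raw():
    return (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .load(ORDERS_PATH))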

Task 2: Load Data from Kafka Message Bus

Your company also uses a Kafka message bus for real-time data streaming. You need to
configure a Delta Live Table to ingest data from Kafka.
Steps:

1. Define a Delta Live Table for Kafka streaming data:

2. Perform downstream operations in SQL to transform the Kafka data:
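A hedged sketch for the Kafka feed (broker, topic, and payload schema are placeholders; the downstream transformation the text describes in SQL is sketched here in Python for consistency):

import dlt
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

KAFKA_BOOTSTRAP = "broker1:9092"     # placeholder broker
KAFKA_TOPIC     = "sales_events"     # placeholder topic

@dlt.table(comment="Raw events streamed from Kafka")
def kafka_raw():
    return (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
            .option("subscribe", KAFKA_TOPIC)
            .option("startingOffsets", "earliest")
            .load())

event_schema = StructType([
    StructField("sale_id", StringType()),
    StructField("amount", DoubleType()),
])

@dlt.table(comment="Kafka events parsed into typed columns")
def kafka_parsed():
    return (dlt.read_stream("kafka_raw")
            .select(from_json(col("value").cast("string"), event_schema).alias("e"))
            .select("e.*"))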

Task 3: Permissions and Best Practices

To ensure smooth operations, you should follow best practices:

• Use external locations if your Delta Live Tables pipeline uses Unity Catalog.
• Set up file notifications to ensure proper resource cleanup when using Auto Loader.
• For Kafka streaming, optimize your cluster with continuous execution and enhanced
autoscaling for better performance.

208. Scenario: Implementing Slowly Changing Dimensions (SCD) in Delta Live Tables

In this scenario, you will use Delta Live Tables to manage Slowly Changing Dimensions
(SCD) for a users dataset based on change data feed (CDF) source data. You'll process two
types of SCD updates:

• SCD Type 1: Overwrites old records with new ones.


• SCD Type 2: Tracks historical changes by maintaining previous versions of records.

The change events will include operations like INSERT, UPDATE, DELETE, and TRUNCATE.
The examples below show how to implement SCD Type 1 and Type 2 processing in Delta
Live Tables pipelines.

Step 1: Sample Dataset for Change Data Feed (CDF)

The source data contains the following columns:

• userId: Unique identifier for the user


• name: User’s name
• city: User’s city
• operation: The change event (e.g., INSERT, DELETE, UPDATE, TRUNCATE)
• sequenceNum: A sequence number to preserve event order

Example CDF data:

Step 2: Implementing SCD Type 1 (Overwriting Old Records)

SCD Type 1 simply overwrites old records with the most recent ones. In the case of the
UPDATE operation, the previous record is overwritten by the new one. DELETE and
TRUNCATE operations are handled by removing the records as per the change event.

Example for SCD Type 1 Update:


Explanation:

• The APPLY CHANGES INTO statement applies changes from the CDF source table
(cdc_data.users) into the target table.
• It uses DELETE and TRUNCATE operations based on the operation field.
• The STORED AS SCD TYPE 1 ensures that only the latest version of each record is
stored (overwrites any previous versions).
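The SQL APPLY CHANGES INTO statement described above has a Python equivalent, dlt.apply_changes; a hedged sketch for SCD Type 1, with the source wrapped in a view (the view name is an assumption):

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users_cdf():
    # Change feed source described above
    return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
    target = "target",
    source = "users_cdf",
    keys = ["userId"],
    sequence_by = col("sequenceNum"),                    # preserves event order
    apply_as_deletes = expr("operation = 'DELETE'"),
    apply_as_truncates = expr("operation = 'TRUNCATE'"),
    except_column_list = ["operation", "sequenceNum"],
    stored_as_scd_type = 1                               # keep only the latest version of each record
)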

After processing SCD Type 1, the target table will contain:

Step 3: Implementing SCD Type 2 (Tracking Historical Changes)


SCD Type 2 maintains historical changes by adding a new record for every update while
preserving old versions of the data. This approach tracks the changes by adding an effective
date or a flag to indicate active/inactive records.
Example for SCD Type 2 Update:
SQL Code:

Explanation:
• The APPLY CHANGES INTO statement applies changes from the CDF source table
(cdc_data.users) into the target table.
• It uses DELETE operations and processes the sequence by the sequenceNum to
handle out-of-order events.
• The STORED AS SCD TYPE 2 ensures that historical changes are tracked by adding a
new version of the record for every update, instead of overwriting the existing record.
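A matching hedged sketch of the Python form for SCD Type 2 (again wrapping cdc_data.users in a view):

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users_cdf_scd2():
    return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target_scd2")

dlt.apply_changes(
    target = "target_scd2",
    source = "users_cdf_scd2",
    keys = ["userId"],
    sequence_by = col("sequenceNum"),                  # handles out-of-order events
    apply_as_deletes = expr("operation = 'DELETE'"),
    except_column_list = ["operation", "sequenceNum"],
    stored_as_scd_type = 2                             # keep history: a new row per change
)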
After processing SCD Type 2, the target table will contain:

In the case of userId = 125, a new record is added with the updated city Guadalajara and
marked as current_flag = Y. The previous record with city Tijuana is kept in the table with
current_flag = N to track historical data.
209. Scenario: Periodic Snapshot Processing with SCD Type 2
In this scenario, periodic snapshots of a table stored in a Delta Lake are ingested. The goal is
to implement SCD Type 2 processing, where changes between snapshots are tracked by
maintaining historical versions of records. This example demonstrates how Delta Live Tables
processes and writes the results of the periodic snapshots into a target table.
Step 1: Define the Source Data
The source data (mycatalog.myschema.mytable) contains records that are periodically
updated. For each snapshot timestamp, new records may be added, and existing records
may be updated.
Snapshot at 2024-01-01 00:00:00:

Snapshot at 2024-01-01 12:00:00:

Step 2: Implementing SCD Type 2 Processing


The dlt.apply_changes_from_snapshot function is used to ingest periodic snapshots and
maintain SCD Type 2 records. For this process:
• Records are tracked over time by adding a __START_AT timestamp when they first
appear.
• When records are updated, a new version is inserted, and the __END_AT timestamp
is updated to track when the previous record was superseded.
The key steps in the process are:
1. Define the source: Load the data from the snapshot (mycatalog.myschema.mytable).
2. Create a target table: Create a target table to hold the processed results.
3. Apply changes from snapshot: Use dlt.apply_changes_from_snapshot to perform the
SCD Type 2 processing.
Python Code:
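A hedged sketch following the three steps above (the key column name Key is taken from the sample snapshots; everything else is as described):

import dlt

@dlt.view(name="source")
def source():
    # Each pipeline update reads the current snapshot of the source table
    return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
    target = "target",
    source = "source",
    keys = ["Key"],
    stored_as_scd_type = 2      # produces the __START_AT / __END_AT columns described below
)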

Step 3: Resulting Target Table After Processing


After applying the changes, the target table will store the following records. This table
includes:
• __START_AT: The timestamp when the record first appeared.
• __END_AT: The timestamp when the record was replaced (for updated records). If the
record is currently active, the __END_AT will be null.
Resulting Target Table:

Explanation:
1. Record for Key = 1: The value a1 is inserted at the first timestamp and remains
unchanged, so the __START_AT is set to 2024-01-01 00:00:00, and the __END_AT is
set to 2024-01-01 12:00:00 (the timestamp of the next snapshot).
2. Record for Key = 2: The value a2 appears in the first snapshot and is replaced by b2 in
the second snapshot. The first version of Key = 2 (a2) is recorded with __START_AT set
to 2024-01-01 00:00:00 and __END_AT set to 2024-01-01 12:00:00. The new record
for Key = 2 (b2) is added with __START_AT set to 2024-01-01 12:00:00 and __END_AT
set to null since it's the latest record.
3. Record for Key = 3: The value a3 appears in the second snapshot with no previous
history, so it is inserted with __START_AT set to 2024-01-01 12:00:00 and __END_AT
set to null.
210. Scenario: Real-Time Sales Analytics Pipeline
Business Requirement: The company needs to analyze real-time sales data from various
stores and provide insights into total sales by region and product category. The pipeline
should be designed to:
• Ingest raw sales data continuously (streaming).
• Join this data with static customer information to identify customer regions and
product categories.
• Perform real-time aggregation to compute total sales per region and category.
• Store final results for further analysis.

Solution: Using Streaming Tables, Materialized Views, and Temporary Tables


Step 1: Raw Data Ingestion (Bronze Table - Streaming Table)
The pipeline begins by continuously ingesting raw sales data (in JSON format) from an S3
bucket. This raw data contains fields like sale_id, customer_id, product_id, amount, and
timestamp.
• Streaming Table (Bronze Table): Ingests raw sales data continuously.
• This table is defined as a streaming table because new sales transactions are being
continuously added.

Step 2: Join with Static Customer Information (Silver Table - Streaming Table)
After ingesting the raw data, we join it with static customer information, which provides
customer_id, region, and category (from a static customers table). This enables us to enrich
the sales data with geographical and product category information.
• Streaming Table (Silver Table): Performs the join operation, enriching the sales data
with customer information.
• The static customers table is not changing over time, so it remains the same
throughout the processing.

Step 3: Perform Aggregation for Real-Time Insights (Gold Table - Materialized View)
Now, the pipeline computes total sales by region and product category. This aggregation is
the final result of the pipeline and is stored in a materialized view. The materialized view will
be periodically refreshed to reflect the latest aggregated results.
• Materialized View (Gold Table): Aggregates total sales by region and product
category, and stores the results for querying and further analysis.

Step 4: Temporary Table for Intermediate Calculations


If there are intermediate calculations (e.g., calculating discounts or tax), you can use
temporary tables. These tables store intermediate results but are not meant to be accessed
outside the pipeline.
For example, if you want to calculate the discount on each sale before applying it to the
final aggregation:

This table is used only within the pipeline to calculate discounts and is not published to the
final schema.
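A hedged end-to-end sketch of the pipeline (the landing path, dimension table name, column names, and the flat 5% discount are all assumptions):

import dlt
from pyspark.sql.functions import col, sum as _sum

SALES_PATH = "s3://company-landing/sales/"    # placeholder landing path

# Bronze: continuous ingestion of raw sales JSON with Auto Loader
@dlt.table(comment="Raw sales transactions (streaming)")
def bronze_sales():
    return (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .load(SALES_PATH))

# Temporary table: intermediate discount calculation, not published outside the pipeline
@dlt.table(temporary=True, comment="Sales with discount applied (intermediate)")
def sales_discounted():
    return (dlt.read_stream("bronze_sales")
            .withColumn("net_amount", col("amount") * 0.95))

# Silver: stream-static join with the customers dimension
@dlt.table(comment="Sales enriched with customer region and product category")
def silver_sales_enriched():
    customers = spark.read.table("main.reference.customers")   # static dimension (placeholder name)
    return (dlt.read_stream("sales_discounted")
            .join(customers, on="customer_id", how="left"))

# Gold: materialized view with total sales by region and product category
@dlt.table(comment="Total sales by region and product category")
def gold_sales_by_region_category():
    return (dlt.read("silver_sales_enriched")
            .groupBy("region", "category")
            .agg(_sum("net_amount").alias("total_sales")))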

Final Result:
The pipeline processes real-time sales data and aggregates it continuously to provide
insights like total sales by region and product category. Here's how the tables fit into the
pipeline:
1. Bronze Table (Streaming Table): Raw sales data ingestion.
2. Silver Table (Streaming Table): Enriched data with customer region and category.
3. Gold Table (Materialized View): Final aggregated results for total sales by region and
category.
4. Temporary Tables: For intermediate calculations (like discounts).
Benefits of This Approach:
• Streaming Tables allow real-time ingestion and processing of sales data.
• Materialized Views efficiently store and aggregate the final results, reducing
computation time for downstream queries.
• Temporary Tables enable complex transformations and intermediate steps without
affecting the final schema.
• By combining these elements, the pipeline can efficiently process and provide insights
into sales data with minimal re-processing, ensuring high throughput and low latency.
211. Scenario: Customer Order Data Validation
In this scenario, you are processing customer order data from multiple sources. The data
includes information about the customer, order date, product, quantity, and price. You need
to ensure that:
1. The order_date is not earlier than January 1, 2020.
2. The product_id and product_name are not NULL.
3. The quantity ordered is always greater than zero, and the price is also greater than
zero.
The pipeline processes customer orders, and you need to apply the following validation:
1. Drop records where the order_date is before January 1, 2020.
2. Drop records where product_id or product_name are missing.
3. Fail the pipeline if the quantity or price is less than or equal to zero.

Explanation of Actions:
1. Valid order_date: Checks that the order_date is not earlier than January 1, 2020. Records that fail this check are dropped.
2. Valid product_id and product_name: Ensures that both the product_id and
product_name fields are not NULL. If either is missing, the record will be dropped.
3. Valid quantity and price: Ensures that both quantity and price are greater than zero.
If this condition is violated, the pipeline will fail immediately, and no updates will be
applied until the issue is resolved.
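A hedged sketch using Delta Live Tables expectations (the upstream dataset name orders_raw is an assumption):

import dlt

@dlt.table(comment="Validated customer orders")
@dlt.expect_or_drop("valid_order_date", "order_date >= '2020-01-01'")              # drop pre-2020 orders
@dlt.expect_or_drop("valid_product", "product_id IS NOT NULL AND product_name IS NOT NULL")
@dlt.expect_or_fail("valid_quantity_and_price", "quantity > 0 AND price > 0")      # fail the update on violation
def orders_validated():
    return dlt.read_stream("orders_raw")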

Check the material below on Topmate for notes and referrals:


1. Python 200+ Interview Questions and Answers - https://topmate.io/shivakiran_kotur/1337666
2. Spark Quick Notes and PySpark Basics - https://topmate.io/shivakiran_kotur/1244562
3. ADF Basic and Important Scenario-Based Q&A - https://topmate.io/shivakiran_kotur/1244608
4. Azure Data Engineer Must-Do Questions Before the Interview - https://topmate.io/shivakiran_kotur/1244480
5. DP-203 Quick Notes and Dump Question Screenshots - https://topmate.io/shivakiran_kotur/1214127
6. Have any doubts? Connect with me 1:1 on Topmate: https://topmate.io/shivakiran_kotur/1208377

I kindly request you to share your feedback as you go through the questions. Your insights
will help me tremendously in improving the material. Also, feel free to ask for additional
resources or clarification wherever needed. Your journey to becoming a Data Engineering
expert is just a few steps away!
