Prepare for your Data Engineering interviews with 200+ scenario-based questions
covering key concepts in PySpark and Databricks:
PySpark Topics:
➢ Creating DataFrames & Schema Addition: Build and define schemas for structured
data.
➢ Loading Data: Load data from CSV and JSON sources.
➢ Column Functions: Master operations like select, selectExpr, filter, withColumn,
withColumnRenamed, and more.
➢ Handling Data: Learn how to handle distinct values, drop duplicates, orderBy,
groupBy, and use fillna for null handling.
➢ Sorting & Limiting: Use orderBy and limit effectively.
➢ String Functions: Handle text data with functions like concat, split, trim, and other
string manipulation techniques.
➢ Joins & Window Functions: Understand joins (inner, outer, etc.) and window
functions for advanced transformations.
➢ Cast Functions & Union: Master type casting, union, and unionAll operations.
➢ Repartition & Coalesce: Efficient partition management for optimized performance.
➢ Broadcast Variables: Improve performance with broadcasting in large datasets.
➢ Spark Architecture: Focus on partitioning, bucketing, join optimizations, and overall
performance improvements.
Databricks Topics:
➢ Delta Tables: Deep dive into Delta Lake features like ACID transactions, schema
evolution, and time travel.
➢ Structured Streaming: Stream real-time data with ease.
➢ Auto Loader: Automate data ingestion for real-time updates.
➢ Unity Catalog: Manage your data governance and security efficiently with Unity
Catalog.
➢ Delta Live Tables (DLT) & Data Lineage: Capture data lineage and maintain transparency in your
pipelines.
How to Use the Material:
1. Start from the Basics: Begin with easy questions, then progress to more advanced
scenarios as you gain confidence.
2. PySpark is a Must: Make sure you cover all PySpark-related topics thoroughly, as they
form the foundation of data engineering roles.
3. Structured Streaming & DLT: Optional for coding practice but a great bonus if you
want to specialize further in real-time data processing.
Please share your feedback as you go through the questions; your insights will help me
improve the material tremendously. Also, feel free to ask for additional resources or
clarification wherever needed. Your journey to becoming a Data Engineering expert is just a
few steps away!
1. How would you read data from a CSV file in batch mode in PySpark on Databricks?
Answer: To read data from a CSV file in batch mode, you can use the spark.read.format()
API. Here’s an example:
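A minimal sketch (the file path and option values are placeholders):
df = (spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("/mnt/data/sample.csv"))
df.show(5)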
3. How do you handle multi-line JSON or nested JSON records while reading in PySpark?
Answer: For JSON files with multi-line records, set the multiLine option to true.
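For example, assuming a hypothetical /mnt/data/records.json:
df = (spark.read.format("json")
      .option("multiLine", "true")
      .load("/mnt/data/records.json"))
Nested struct fields can then be accessed with dot notation, e.g. df.select("customer.address.city").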
4. How do you read Parquet files with schema merging in batch mode?
Answer: Enable mergeSchema to handle schema evolution when reading Parquet files.
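For example (directory path assumed):
df = (spark.read.format("parquet")
      .option("mergeSchema", "true")
      .load("/mnt/data/parquet_dir/"))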
5. How can you read Avro files in batch mode in Databricks?
Answer: Ensure that the avro package is available, and use the format avro to read the files.
11. How do you handle files with missing values while reading in batch mode?
Answer: You can set nullValue and nanValue options to handle missing data.
12. How do you read multiple file types (e.g., CSV, JSON) from a single directory?
Answer: PySpark doesn’t support mixed file types directly. You can filter files by extension
using Python and then read them.
13. How would you handle reading data from multiple files in batch processing?
Answer: To read data from multiple files, you can provide a path with wildcards or a list of
file paths. For example, to read all CSV files in a directory:
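A sketch with assumed paths:
df = (spark.read.format("csv")
      .option("header", "true")
      .load("/mnt/data/input/*.csv"))
# Or pass an explicit list of file paths:
df = spark.read.option("header", "true").csv(["/mnt/data/file1.csv", "/mnt/data/file2.csv"])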
*.csv: Reads all CSV files in the specified directory.
14. How can you read data from an external database (e.g., SQL) in batch mode?
Answer: To read data from an external database (like MySQL or PostgreSQL) in batch mode,
use the jdbc() method:
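A minimal sketch; the connection details are placeholders, and a driver option may be required depending on the database:
jdbc_url = "jdbc:mysql://db-host:3306/sales_db"
df = (spark.read.format("jdbc")
      .option("url", jdbc_url)
      .option("dbtable", "table_name")
      .option("user", "db_user")
      .option("password", "db_password")
      .load())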
• option("url", jdbc_url): Specifies the JDBC URL to connect to the database.
• option("dbtable", "table_name"): Specifies the table to read from.
15. How can you handle data partitioning when reading large datasets in batch mode?
Answer: To optimize the reading of large datasets, you can use the partitionBy option or
specify how data should be split into partitions using the spark.read.option() API:
repartition(10): Creates 10 partitions for the data. You can use coalesce() to reduce
partitions when writing data.
16. How do you read data from a CSV file where the schema is already known in batch
mode?
Answer: When you know the schema beforehand, you can define the schema using
StructType and StructField, and pass it to the spark.read method.
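For example (column names and types are placeholders):
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
])
df = (spark.read.format("csv")
      .option("header", "true")
      .schema(schema)
      .load("/mnt/data/sample.csv"))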
schema(schema): Passes the schema to avoid inferring schema, which can be slow for large
datasets.
17. How would you read a large CSV file with multiple delimiters in batch mode?
Answer: Spark's CSV reader accepts a single custom delimiter via the delimiter option. If the
file genuinely mixes several delimiters, read it as plain text and parse each line with a
regular expression.
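For example (path and delimiter assumed):
df = (spark.read.format("csv")
      .option("header", "true")
      .option("delimiter", "|")
      .load("/mnt/data/pipe_delimited.csv"))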
option("delimiter", "|"): Specifies the delimiter used in the CSV file (could be a comma,
tab, etc.).
18. How would you read data from a CSV file in Databricks and infer the schema
automatically?
Answer: To read data from a CSV file with automatic schema inference, you can use the
option("inferSchema", "true") in Databricks.
• option("inferSchema", "true"): Enables automatic schema inference.
• option("header", "true"): Treats the first row as column names.
Note: Using inferSchema in PySpark can be inefficient for large datasets because it requires
scanning the entire dataset (or a significant portion) to determine the schema, which adds
overhead. It's better to define the schema explicitly for better performance and to ensure
consistency.
19. What are the different read modes in PySpark, and how do they handle bad data?
Answer: PySpark provides the following read modes to handle corrupt or missing data when
reading files:
• PERMISSIVE (default): Loads all records and puts corrupt records in a separate
column (e.g., _corrupt_record for JSON).
• DROPMALFORMED: Discards bad records and loads only valid records.
• FAILFAST: Fails the job if any corrupt records are encountered.
20. How do you use FAILFAST mode to ensure data integrity while reading a CSV file?
Answer: The FAILFAST mode ensures that the job fails if any row contains bad or corrupt
data, preserving data quality.
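For example (path assumed):
df = (spark.read.format("csv")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .load("/mnt/data/sample.csv"))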
22. How can you handle corrupt JSON records when reading a file in PySpark?
Answer: You can use the default PERMISSIVE mode to handle corrupt JSON records. It
places problematic records in a special column called _corrupt_record.
26. What is the purpose of badRecordsPath in Databricks, and how do you use it?
Answer: The badRecordsPath option in Databricks (or Spark) is used to handle malformed or
corrupt records during reading data. When enabled, any records that fail to meet the
expected schema or format (e.g., improperly formatted JSON or CSV rows) are stored in a
separate directory specified by badRecordsPath. This helps in isolating problematic data for
debugging or further inspection without interrupting the processing of valid data.
Example Usage:
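A sketch of that usage (the input path is assumed; /mnt/bad_records matches the description below):
df = (spark.read.format("csv")
      .option("header", "true")
      .option("badRecordsPath", "/mnt/bad_records")
      .load("/mnt/data/sample.csv"))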
If any rows in the sample.csv file are malformed or fail to parse correctly, they will be saved
to /mnt/bad_records for further review. This allows you to identify and resolve issues with
the bad records without affecting the overall data processing pipeline.
27. How would you write a DataFrame to a Delta table in batch processing?
Answer: To write data to a Delta table, you use the write.format("delta") method. Here’s
how you would do it:
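A minimal sketch (table path and name assumed):
df.write.format("delta").mode("overwrite").save("/mnt/delta/my_table")
# Or write to a managed Delta table by name:
df.write.format("delta").mode("overwrite").saveAsTable("my_delta_table")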
mode("overwrite"): Overwrites the existing Delta table. You can also use append to add
data to the existing Delta table.
28. How would you write batch data to a file system (e.g., HDFS or S3)?
Answer: To write batch data to a file system like HDFS or S3, you can specify the output path
with the respective file format:
29. How do you write data from a batch process to a database in Databricks?
Answer: You can use the jdbc format to write data to a database. Here’s an example of
writing data to MySQL:
30. How would you read data from an external database (PostgreSQL) in batch mode and
write the data to a Delta table?
Answer: To read data from an external database like PostgreSQL, use the jdbc() method,
and then write the data to a Delta table:
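A sketch with placeholder connection details and paths:
pg_url = "jdbc:postgresql://db-host:5432/shop_db"
orders_df = (spark.read.format("jdbc")
             .option("url", pg_url)
             .option("dbtable", "public.orders")
             .option("user", "db_user")
             .option("password", "db_password")
             .load())
orders_df.write.format("delta").mode("overwrite").save("/mnt/delta/orders")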
➢ Flatten Arrays: Use explode() to convert each element of an array into individual
rows.
➢ Flatten Structs: Use dot notation (e.g., struct.field) to extract fields from nested
structures.
➢ Iterate and Combine: Repeat these steps iteratively for all nested levels until the
DataFrame is fully flattened.
Step 1: Read the nested JSON file
Step 2: Flatten the nested fields, i.e., explode the array column (order) and extract its struct fields
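A small sketch of these two steps, assuming a top-level array column named order and placeholder field names:
from pyspark.sql.functions import explode, col

raw_df = spark.read.option("multiLine", "true").json("/mnt/data/nested.json")   # Step 1
exploded_df = raw_df.withColumn("order", explode(col("order")))                 # one row per array element
flat_df = exploded_df.select("customer_id",
                             col("order.order_id").alias("order_id"),
                             col("order.amount").alias("amount"))               # struct fields via dot notation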
33. How do you handle null values in a specific column while reading data from a CSV file
in Databricks?
Scenario: You are reading a CSV file where some rows have null values in a critical column.
You want to filter out these rows during the read process.
Answer: You can use a filter operation after reading the data to exclude rows with null
values.
34. How do you replace null values with a default value during a batch write in
Databricks?
Scenario: While writing data to a Parquet file, some columns have null values. You need to
replace these null values with default values before writing.
Answer: You can use the fillna method to replace null values.
35. How do you identify and handle rows with all null values in Databricks?
Scenario: Your dataset contains rows where all columns are null. You want to filter out such
rows.
Answer: You can use the dropna method with how="all" to remove rows where all values
are null.
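For example:
clean_df = df.dropna(how="all")   # drops rows where every column is null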
This approach ensures that only meaningful data is retained in your DataFrame.
36. How do you handle null values dynamically in Databricks if you don't know the
column names in advance?
Scenario: You are working with a dataset where column names are dynamic, and you need
to replace all null values with defaults without hardcoding column names.
Answer: You can iterate over all column names and use the fillna method dynamically.
This approach provides a dynamic way to handle null values based on column data types.
37. How do you convert a string column to a date column while handling invalid date
formats?
Scenario: You have a column with date values stored as strings. Some rows have invalid
formats, and you need to convert valid ones to a date column while handling invalid entries.
Answer: Use the to_date function with a specific date format and filter out invalid dates.
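A sketch, assuming the string column is named date_str and uses the yyyy-MM-dd pattern:
from pyspark.sql.functions import to_date, col

df = df.withColumn("parsed_date", to_date(col("date_str"), "yyyy-MM-dd"))
valid_df = df.filter(col("parsed_date").isNotNull())
invalid_df = df.filter(col("parsed_date").isNull())   # rows whose strings failed to parse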
Invalid date formats will result in nulls, which can then be handled appropriately.
38. How do you handle time zone conversions in a timestamp column in Databricks?
Scenario: Your dataset contains timestamps in UTC, but you need to convert them to a
specific time zone (e.g., PST).
Answer: Use the from_utc_timestamp function for time zone conversion.
This ensures that the timestamp is correctly converted to the desired time zone.
39. How do you calculate the difference between two date columns in Databricks?
Scenario: You have two date columns, and you need to calculate the difference in days
between them.
Answer: Use the datediff function to compute the difference in days.
This provides an easy way to calculate the difference between two dates.
40. How do you add or subtract days from a date column in Databricks?
Scenario: You need to add 30 days to a date column for a future projection calculation.
Answer: Use the date_add or date_sub function to modify dates.
41. How do you extract specific components (year, month, day) from a timestamp
column?
Scenario: You have a timestamp column, and you need to extract the year, month, and day
as separate columns for reporting purposes.
Answer: Use the year, month, and dayofmonth functions to extract these components.
42. How do you identify duplicate records in a DataFrame and remove them?
Scenario: You are working with a dataset that contains duplicate records, and you need to
remove the duplicates based on specific columns.
Answer: Use the dropDuplicates method to remove duplicate records based on specific
columns.
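For example:
dedup_df = df.dropDuplicates(["column1", "column2"])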
This ensures that only unique records based on column1 and column2 are retained.
43. How do you ensure that a column has no null values before proceeding with further
transformations?
Scenario: You are processing a dataset where a critical column must not have null values. If
null values are present, the processing should stop.
Answer: Use a validation step with a count of null values and raise an exception if any are
found.
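A sketch, assuming the critical column is named critical_column:
from pyspark.sql.functions import col

null_count = df.filter(col("critical_column").isNull()).count()
if null_count > 0:
    raise ValueError(f"Found {null_count} null values in critical_column; stopping the pipeline.")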
This prevents processing incomplete or invalid data.
44. How do you validate that all values in a column fall within a specific range?
Scenario: You have a column with numeric data, and all values must lie within the range [0,
100]. Records outside this range should be logged for review.
Answer: Filter out records that fall outside the range and log them separately.
45. How do you check for unique values in a primary key column?
Scenario: You need to ensure that a primary key column has unique values and identify any
duplicates.
Answer: Use a groupBy operation to count occurrences and filter out records with counts
greater than 1.
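For example, assuming the key column is named primary_key:
from pyspark.sql.functions import col

duplicates_df = (df.groupBy("primary_key")
                 .count()
                 .filter(col("count") > 1))
duplicates_df.show()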
48. How do you enforce data type validation for a column?
Scenario: You have a column that must contain only integers, but some records have invalid
data types (e.g., strings).
Answer:
1. Try to cast the column to the desired data type (in this case, IntegerType).
2. Use a custom validation function to check if the value can be cast successfully.
3. Filter out invalid data based on whether the cast operation was successful or not.
49. How do you perform a join to match records only when a key exists in both
DataFrames?
Scenario: You have two DataFrames: one with customer details and another with
transaction details. You need to find only those transactions where the customer exists in
the customer DataFrame.
Answer: Perform an inner join to keep only matching records.
54. How do you optimize a join when one DataFrame is much smaller than the other?
Scenario: You are joining a small DataFrame of country codes with a large DataFrame of
customer addresses.
Answer: Use a broadcast join to improve performance.
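A sketch with assumed DataFrame and column names:
from pyspark.sql.functions import broadcast

joined_df = addresses_df.join(broadcast(country_codes_df), on="country_code", how="inner")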
Broadcasting the smaller DataFrame ensures efficient execution by sending it to all nodes.
59. What is a Left Semi Join, and when would you use it?
Scenario: You have two DataFrames: orders_df and customers_df. You need to identify all
orders where the customer exists in the customers_df but you do not need to retrieve
customer details.
Answer: A Left Semi Join returns only the rows from the left DataFrame where there is a
matching row in the right DataFrame. This is useful when you only care about the existence
of matching records but don't need the columns from the right DataFrame.
In this case, it returns orders that have a matching customer, without including the
customer details in the output.
60. How do you use a Left Anti Join to find records in one DataFrame but not in another?
Scenario: You have a DataFrame of transactions_df and another of customers_df. You need
to identify transactions that don’t have a matching customer.
Answer: A Left Anti Join returns rows from the left DataFrame that do not have a matching
row in the right DataFrame.
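For example, assuming the join key is customer_id:
unmatched_df = transactions_df.join(customers_df, on="customer_id", how="left_anti")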
This will return transactions that have no corresponding customer in the customers_df.
63. How do you join two DataFrames where one DataFrame has a partitioned column?
Scenario: You have a sales_df partitioned by region_id and a region_details_df with regional
information. You need to join these DataFrames efficiently by leveraging the partitioned
column.
Answer: When dealing with partitioned DataFrames, it's efficient to filter by the partition
key to reduce unnecessary shuffling.
Filtering the partition key before the join reduces the amount of data shuffled across the
network, improving performance.
64. How do you rename columns in a PySpark DataFrame?
Scenario: You have a DataFrame employee_df with columns: emp_id, emp_name, and
salary. You need to rename the emp_id column to employee_id and the emp_name column
to employee_name.
Answer: You can use the withColumnRenamed() method to rename columns.
This will rename the columns emp_id and emp_name to employee_id and employee_name,
respectively.
65. How do you drop one or more columns from a PySpark DataFrame?
Scenario: You have a DataFrame sales_df with columns order_id, customer_id, product_id,
price, and discount. You need to remove the discount and price columns.
Answer: You can use the drop() method to remove columns.
This will remove the price and discount columns from the DataFrame.
66. How do you add a new column based on a condition in PySpark?
Scenario: You have a DataFrame transactions_df with a column amount. You need to add a
new column status based on the value of amount. If amount is greater than 1000, status
should be 'high', otherwise 'low'.
Answer: You can use withColumn() along with when() and otherwise() to create a new
column based on conditions.
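For example:
from pyspark.sql.functions import when, col

transactions_df = transactions_df.withColumn(
    "status",
    when(col("amount") > 1000, "high").otherwise("low"))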
This will add the status column with values 'high' or 'low' based on the condition applied to
the amount column.
67. How do you handle null values in a specific column in PySpark?
Scenario: You have a DataFrame user_df with a column age. Some rows in the age column
have null values. You need to replace these null values with a default value of 0.
Answer: You can use the fillna() method to replace null values with a default value.
This will replace all null values in the age column with 0.
68. How do you extract the year from a date column in PySpark?
Scenario: You have a DataFrame orders_df with a column order_date of type Date. You
need to create a new column order_year that contains the year extracted from the
order_date column.
Answer: You can use the year() function to extract the year from a date column.
This will add a new column order_year that contains the year extracted from the order_date
column.
69. How do you create a column with the concatenation of two or more columns in
PySpark?
Scenario: You have a DataFrame customer_df with columns first_name and last_name. You
need to create a new column full_name by concatenating first_name and last_name.
Answer: You can use the concat() function to concatenate columns.
This will add a new column full_name which contains the concatenation of first_name and
last_name.
70. How do you rename multiple columns at once in PySpark?
Scenario: You have a DataFrame employee_df with columns emp_id, emp_name, and
emp_salary. You need to rename all the columns to employee_id, employee_name, and
salary.
Answer: You can use the toDF() method to rename multiple columns at once.
73. How do you check if a string column contains a specific substring in PySpark?
Scenario: You have a DataFrame order_df with a column order_description. Some of the
descriptions contain the word "urgent". You need to create a new column is_urgent that is
True if the order_description contains the word "urgent", and False otherwise.
Answer: You can use the contains() function to check if a substring exists in a string column.
This will add a new column is_urgent with True or False depending on whether the
order_description contains the word "urgent".
74. How do you extract the domain name from an email address in PySpark?
Scenario: You have a DataFrame employee_df with a column email containing email
addresses like [email protected]. You need to extract the domain name (e.g.,
example.com) from the email addresses.
Answer: You can use the split() function to split the email string at the @ character and
extract the domain part.
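For example:
from pyspark.sql.functions import split, col

employee_df = employee_df.withColumn("domain", split(col("email"), "@").getItem(1))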
This will create a new column domain containing the domain name from the email
addresses.
75. How do you remove leading and trailing spaces from a string column in PySpark?
Scenario: You have a DataFrame product_df with a column product_name. Some rows have
extra spaces before or after the product name, and you need to trim these spaces.
Answer: You can use the trim() function to remove leading and trailing spaces from a string
column.
This will remove any leading or trailing spaces from the product_name column.
76. How do you convert a string column to uppercase or lowercase in PySpark?
Scenario: You have a DataFrame customer_df with a column customer_name. You need to
convert the customer_name column to uppercase.
Answer: You can use the upper() function to convert a string column to uppercase.
77. How do you split a string into multiple columns based on a delimiter in PySpark?
Scenario: You have a DataFrame employee_df with a column full_address containing
addresses like '123 Main St, Springfield, IL'. You need to split this address into three
columns: street, city, and state.
Answer: You can use the split() function to split the string based on a delimiter (in this case,
the comma ,) and extract the individual components into separate columns.
This will split the full_address column into three new columns: street, city, and state.
78. How do you replace specific characters in a string column in PySpark?
Scenario: You have a DataFrame product_df with a column product_code containing
product codes like 'AB-123', 'CD-456', etc. You need to remove the hyphen - from these
codes.
Answer: You can use the regexp_replace() function to replace specific characters in a string
column.
This will remove the hyphen - from the product_code column.
79. How do you handle empty strings in a DataFrame column in PySpark?
Scenario: You have a DataFrame user_df with a column phone_number. Some rows have
empty strings ("") in the phone_number column, and you need to replace those empty
strings with NULL.
Answer: You can use the when() and otherwise() functions to replace empty strings with
NULL.
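For example:
from pyspark.sql.functions import when, col, lit

user_df = user_df.withColumn(
    "phone_number",
    when(col("phone_number") == "", lit(None)).otherwise(col("phone_number")))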
This will replace any empty string values in the phone_number column with NULL.
80. How do you extract a substring from a string column in PySpark?
Scenario: You have a DataFrame transaction_df with a column transaction_id. You need to
extract the first 5 characters of each transaction_id.
Answer: You can use the substr() function to extract a substring from a string column.
This will create a new column short_transaction_id containing the first 5 characters of the
transaction_id.
81. What is the difference between union() and unionByName() in PySpark?
Scenario: You have two DataFrames, df1 and df2, that contain the same columns but possibly
in a different order. You need to combine the two DataFrames into one. How do you handle
this situation?
Answer: The difference between union() and unionByName() lies in how they handle
column order and names:
• union(): Combines two DataFrames by column position. Both DataFrames must have the
same number of columns in the same order; otherwise the operation fails or silently
misaligns data.
• unionByName(): Combines two DataFrames by matching column names, so the column
order does not matter.
If one DataFrame is missing some columns and you want to keep all of them, you can also
pass allowMissingColumns=True (Spark 3.1+):
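For example:
combined_df = df1.unionByName(df2)
# If one DataFrame is missing some columns (Spark 3.1+), fill them with nulls:
combined_df = df1.unionByName(df2, allowMissingColumns=True)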
82. How do you perform a set difference operation (similar to EXCEPT) in PySpark?
Scenario: You have two DataFrames, df1 and df2, and you want to find the rows that are in
df1 but not in df2.
Answer: You can use the exceptAll() method to perform a set difference between two
DataFrames.
The exceptAll() function returns the rows present in df1 but not in df2. It will include
duplicates if they exist.
83. How do you perform an intersection operation in PySpark?
Scenario: You have two DataFrames, df1 and df2, and you want to find the rows that exist in
both DataFrames.
Answer: You can use the intersect() function to get the common rows between two
DataFrames.
This will return the rows that are common in both df1 and df2.
84. How can you handle duplicate rows when using union() in PySpark?
Scenario: You have two DataFrames, df1 and df2, and both contain duplicate rows. You
need to combine them and eliminate any duplicates in the resulting DataFrame.
Answer: To eliminate duplicates after performing a union() operation, you can use the
distinct() function.
This will combine the rows from both DataFrames and remove any duplicates in the result.
85. How do you calculate the running total of a column using window functions?
Scenario: You have a DataFrame containing sales data for different stores, and you want to
calculate a running total of sales for each store.
Answer: You can use the sum() window function with a frame that runs from the start of the
partition to the current row to calculate running totals.
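A sketch with assumed column names (store_id, sale_date, sales):
from pyspark.sql import Window
from pyspark.sql.functions import sum as sum_

running_window = (Window.partitionBy("store_id")
                  .orderBy("sale_date")
                  .rowsBetween(Window.unboundedPreceding, Window.currentRow))
sales_df = sales_df.withColumn("running_total", sum_("sales").over(running_window))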
Note: Unbounded Preceding is helpful when you need to compute values based on the
entire history of data within a partition, including the first row, up to the current row.
Key Points:
1. Unbounded Preceding: The window starts at the first row in the partition.
2. Rows Between: The frame includes all rows from the beginning of the partition to the
current row (Window.unboundedPreceding, Window.currentRow).
3. Use Case: Commonly used in cumulative calculations like running totals, averages, or
historical data tracking within a partition.
86. How do you get the rank of each row within each group based on a specific column in
PySpark?
Scenario: You have a DataFrame with employee performance scores, and you need to rank
employees within each department based on their score.
Answer: You can use rank() or dense_rank() to assign ranks based on a column.
This will rank employees within each department based on their score.
87. How do you calculate the difference between a row value and the previous row's
value using window functions?
Scenario: You have a DataFrame with daily temperature readings, and you want to calculate
the difference between each day's temperature and the previous day's temperature.
Answer: You can use lag() or lead() functions to calculate the difference between the
current row and the previous row.
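A sketch with assumed column names (reading_date, temperature):
from pyspark.sql import Window
from pyspark.sql.functions import lag, col

daily_window = Window.orderBy("reading_date")
temps_df = (temps_df
            .withColumn("prev_temperature", lag("temperature").over(daily_window))
            .withColumn("temperature_diff", col("temperature") - col("prev_temperature")))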
This will calculate the difference in temperature between the current day and the previous
day.
88. How do you calculate the average salary within each department for employees using
window functions?
Scenario: You have a DataFrame with employee details, including department and salary.
You need to calculate the average salary within each department.
Answer: You can use the avg() window function to calculate the average salary within each
department.
This will calculate the average salary within each department and add it as a new column.
89. How do you calculate the cumulative sum of a column based on a certain order?
Scenario: You have a DataFrame of sales transactions, and you want to calculate the
cumulative sum of sales ordered by transaction date.
Answer: You can use sum() over a window specification to calculate the cumulative sum.
This will calculate the cumulative sum of sales based on the transaction_date.
90. How do you handle ties in ranking using window functions?
Scenario: You have a DataFrame with employee scores, and you want to rank employees
within each department. Some employees have the same score. How do you handle ranking
with ties?
Answer: You can use dense_rank() to assign the same rank to tied rows without gaps, or
rank() to leave gaps in the ranking after ties.
The dense_rank() function will assign the same rank to employees with the same score
without leaving gaps in the ranking.
91. How do you calculate the row number within each group?
Scenario: You have a DataFrame with sales data for multiple regions, and you want to assign
a unique row number to each sale within each region, ordered by sales amount.
Answer: You can use the row_number() window function to assign a unique row number
within each group.
This will assign a unique row number to each sale within each region, ordered by sales in
descending order.
92. How do you calculate the first value and last value in each group using window
functions?
Scenario: You have a DataFrame with timestamps for each product's sale, and you need to
get the first and last sale for each product.
Answer: You can use first() and last() window functions to get the first and last values in
each group.
This way, the join operation will be more efficient as data will be more evenly distributed
across partitions.
94. Data Skew
Scenario: During a join between two large datasets, you notice that some partitions contain
a significantly larger amount of data than others, leading to an imbalance in processing
time. How would you handle data skew in Spark?
Answer: Data skew occurs when certain keys or partitions contain disproportionately large
data. To handle this, we can apply techniques like:
➢ Salting: Add a random prefix to the skewed key to distribute the data more evenly
across partitions (see the sketch after this list).
➢ Broadcast Join: If one of the datasets is much smaller, broadcasting the smaller
dataset can significantly reduce the shuffle cost and help in resolving data skew.
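A minimal salting sketch (the join key, DataFrame names, and number of salt buckets are assumptions):
from pyspark.sql.functions import array, col, concat_ws, explode, floor, lit, rand

num_salts = 10
# Large side: append a random salt (0..num_salts-1) to the skewed key.
large_salted = large_df.withColumn(
    "salted_key",
    concat_ws("_", col("join_key").cast("string"), floor(rand() * num_salts).cast("string")))
# Small side: replicate each row once per salt value so every salted key finds a match.
small_salted = (small_df
                .withColumn("salt", explode(array(*[lit(str(i)) for i in range(num_salts)])))
                .withColumn("salted_key", concat_ws("_", col("join_key").cast("string"), col("salt"))))
joined_df = large_salted.join(small_salted, on="salted_key", how="inner")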
95. Caching
Scenario: You are working on an ETL pipeline that involves multiple transformations on a
dataset, and you notice that each transformation is taking a long time to run. What would
be your approach to improve the performance of repeated transformations?
Answer: In this case, caching or persisting the dataset in memory can help avoid
recomputing the transformations multiple times. This is particularly useful when the dataset
is used multiple times across different stages.
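A sketch with assumed DataFrame and column names:
from pyspark.sql.functions import col

cleaned_df = raw_df.filter(col("status") == "active").cache()   # or .persist()
cleaned_df.count()                                              # materializes the cache
report_a = cleaned_df.groupBy("category").count()
report_b = cleaned_df.filter(col("amount") > 100)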
By caching the dataset, Spark will store it in memory, so subsequent transformations on the
same dataset will be faster.
96. Persisting Data
Scenario: You are processing a large dataset and need to store intermediate results between
stages to avoid recomputing data. What’s the difference between cache and persist, and
how do you decide which one to use?
Answer: The key difference between cache and persist is that cache is a shorthand for
persisting data in memory, while persist gives you more flexibility in choosing the storage
level (e.g., memory and disk, just disk, etc.).
➢ cache(): By default, it stores data in memory, and it’s ideal when you are working with
datasets that can fit in memory and are reused multiple times.
➢ persist(): Provides more flexibility by allowing you to specify a storage level. For
example, if the dataset is too large to fit in memory, you might choose to persist it to
disk as well.
You should use persist when you need more control over storage and want to persist data in
multiple storage levels, while cache is suitable for simpler use cases where data fits entirely
in memory.
97. Handling Multiple Partitions with coalesce()
Scenario: You have a DataFrame with a large number of partitions, but you need to write
the output to a file system, and you want to reduce the number of output files. How would
you optimize the number of partitions before writing the data?
Answer: In this case, we can use coalesce() to reduce the number of partitions without
triggering a full shuffle (which would be more expensive). It is especially useful when you
want fewer partitions for output purposes, like writing to files.
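For example (output path assumed):
df.coalesce(10).write.mode("overwrite").parquet("/mnt/output/results")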
coalesce() is preferable when decreasing the number of partitions because it is more
efficient than repartition(), which involves a full shuffle.
98. Partitioning Data Before Writing to Avoid Skew
Scenario: You are writing data to a Parquet file, and you notice that the output directory
contains a large number of small files. How would you address this problem?
Answer: In this case, we can repartition the DataFrame before writing it to Parquet. By
setting an appropriate number of partitions, we can avoid generating too many small files.
The number of partitions should align with the desired number of output files.
This way, we can control the number of output files by adjusting the number of partitions.
This will give you the total sales amount for each region.
100. Calculating Average Revenue per Product
Q: You have a dataset containing the product_id, sales_amount, and quantity_sold columns.
You want to calculate the average revenue per product. How do you perform this
aggregation?
A: You can use the groupBy function on product_id and then compute the average revenue
for each product using the avg function.
This will give you the top 3 products by total sales amount.
This will give you the total sales per product for each store within the past 30 days.
104. Aggregating Data with Multiple Aggregations
Q: You have a dataset with employee_id, department, salary, and bonus columns. You want
to calculate the total salary and average bonus for each department. How do you achieve
this in PySpark?
A: You can use groupBy on the department column and apply multiple aggregations like
sum for salary and avg for bonus.
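For example:
from pyspark.sql.functions import sum as sum_, avg

dept_stats_df = (employees_df.groupBy("department")
                 .agg(sum_("salary").alias("total_salary"),
                      avg("bonus").alias("avg_bonus")))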
This will give you the total salary and average bonus per department.
105. Calculating Monthly Sales Growth
Q: You have a dataset with sales_date, sales_amount, and product_id. You want to calculate
the sales growth for each product from one month to the next. How would you approach
this?
A: First, you can extract the month and year from the sales_date column. Then, group by
product_id and month_year, calculate total sales per month, and compute the month-over-
month growth.
This would calculate the sales growth from one month to the next for each product.
106. Calculate the Maximum and Minimum Salary by Department
Q: You have an employee dataset containing columns employee_id, department, salary, and
years_of_experience. You want to find the maximum and minimum salary for each
department. How would you approach this?
A: You can use groupBy to group by the department column and then apply aggregation
functions max and min to get the maximum and minimum salaries.
This will give you the maximum and minimum salary for each department.
107. Calculate the Total Sales and Count of Transactions by Month
Q: You have a sales dataset with transaction_date, sales_amount, and product_id. How
would you calculate the total sales and count of transactions for each product by month?
A: You can use month and year from the transaction_date to extract the month, group by
product_id and the month, and then use sum for total sales and count for the number of
transactions.
This will give you the total sales and count of transactions for each product per month.
108. Average Age per Department for Employees
Q: You have a dataset of employees with columns employee_id, age, and department. You
need to find the average age of employees in each department. How would you write the
query?
A: You can group the dataset by department and apply the avg aggregation function to
calculate the average age for each department.
This will give you the average age of employees for each department.
109. Total Revenue by Product and Region
Q: You have a dataset with columns product_id, region, quantity_sold, and price_per_unit.
How would you calculate the total revenue by product_id and region?
A: You can calculate revenue by multiplying quantity_sold with price_per_unit, then use
groupBy to group by product_id and region, followed by the sum function to get the total
revenue.
This will give the total and average purchase amounts for each customer per category.
111. Find Products Sold More Than 1000 Units
Q: You have a sales dataset with product_id, units_sold, and sales_amount. How would you
find products that have sold more than 1000 units in total?
A: You can group by product_id and aggregate sum(units_sold) and then filter products with
total units sold greater than 1000.
This will return the products that have sold more than 1000 units.
This will give you the month-over-month sales growth for each product.
113. How do you manage schema evolution in Delta Tables?
Q: You are working with a Delta table that receives data from multiple sources, and you
need to handle schema changes dynamically (e.g., new columns added). How can you
manage this scenario?
A: Delta tables support schema evolution by automatically adjusting the schema when new
columns are added to the incoming data. To enable this feature, you can use the
mergeSchema option while writing data to the Delta table.
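For example (table path assumed):
(df.write.format("delta")
   .mode("append")
   .option("mergeSchema", "true")
   .save("/mnt/delta/events"))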
118. How do you perform CDC (Change Data Capture) in Delta tables?
Q: You need to perform Change Data Capture (CDC) on a Delta table, capturing inserts,
updates, and deletes. How can you implement this?
A: You can use MERGE operations in Delta for CDC by matching records based on primary
keys and applying insert, update, or delete actions.
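A sketch of such a MERGE, matching the description below (the table path is assumed; the delete clause uses whenNotMatchedBySourceDelete, available from Delta Lake 2.3):
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/mnt/delta/target_table")
(deltaTable.alias("t")
    .merge(df.alias("s"), "t.id = s.id")
    .whenMatchedUpdate(set={"value": "s.value"})
    .whenNotMatchedInsert(values={"id": "s.id", "value": "s.value"})
    .whenNotMatchedBySourceDelete()   # removes target rows that no longer exist in the source
    .execute())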
In the provided code, CDC is implemented using the MERGE statement, which performs the
following actions:
1. whenMatchedUpdate: Updates the records in the target table (deltaTable) where a
matching record exists in the source DataFrame (df) based on the id column. In this
case, it updates the value column in the target table with the value from the source.
2. whenNotMatchedInsert: If a record in the source DataFrame (df) doesn't exist in the
target Delta table (deltaTable), it inserts a new record with the id and value from the
source DataFrame.
3. whenNotMatchedBySourceDelete (Delta Lake 2.3+): If a record in the target Delta table
(deltaTable) doesn't exist in the source DataFrame (df), it deletes the record from the
Delta table based on the condition specified.
This approach ensures that only the changes (new, updated, or deleted records) are
captured and processed, enabling efficient and incremental data loading.
119. How do you handle missing or corrupt data while reading from a Delta table?
Q: You are reading from a Delta table, and some data records are corrupt or missing. How
would you handle this situation?
A: Use badRecordsPath to log and store any corrupted or missing records for further
investigation.
120. How do you remove old files in a Delta table?
Q: Delta tables store old versions of data files, which may accumulate over time. How do
you remove old files to free up space?
A: You can use the VACUUM command to clean up old files that are no longer in use.
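For example (table path assumed; the default retention is 7 days, i.e. 168 hours):
spark.sql("VACUUM delta.`/mnt/delta/events` RETAIN 168 HOURS")
# Or with the Python API:
from delta.tables import DeltaTable
DeltaTable.forPath(spark, "/mnt/delta/events").vacuum(168)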
121. How do you track changes and monitor Delta table operations?
Q: You need to track changes and monitor operations (like inserts and updates) on a Delta
table. How can you achieve this?
A: You can use Delta logs to track changes in the table. Additionally, Delta supports
transaction logs which provide a detailed history of actions performed on the table. You can
query the transaction logs for auditing.
122. How do you manage large batch writes to Delta tables efficiently?
Q: You need to write large volumes of data to a Delta table. How can you efficiently manage
large batch writes?
A: Use partitioning and optimize the Delta table for large batch writes. Writing data in
partitions improves performance.
123. Scenario-Based Question on Full Load vs. Incremental Load in Data Processing:
Question:
You are working with a retail business that collects daily sales data in a large file format
(CSV) and stores it in a data lake on Azure. The data includes the following fields: sale_id,
product_id, sale_amount, sale_date, store_id, and customer_id. The business has two
primary requirements for the data pipeline:
1. Load the data into a data warehouse daily (i.e., perform a Full Load).
2. Periodically, only capture Incremental Loads to update the data warehouse with
changes (e.g., new sales, updates to sale details).
How would you implement both Full Load and Incremental Load in Databricks using
PySpark and Delta Lake?
Answer:
1. Full Load:
In a Full Load scenario, the entire dataset is replaced or reloaded into the target system
every time. This is suitable when the dataset is small or when the data source has no
reliable mechanism to identify changes.
Steps for Full Load:
• Read the entire dataset from the source (e.g., CSV or Parquet file).
• Write the data to the target data warehouse or Delta table, effectively overwriting
the previous dataset.
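A sketch of the full load (paths are placeholders):
sales_df = (spark.read.format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .load("/mnt/datalake/sales/daily_sales.csv"))
(sales_df.write.format("delta")
    .mode("overwrite")
    .save("/mnt/warehouse/sales"))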
In this case, every time the pipeline runs, the data will be reloaded and replace the previous
data.
2. Incremental Load:
An Incremental Load is used to load only the newly added or modified data since the last
load. This can be done by comparing the sale_date or a unique identifier (such as sale_id) to
detect new or modified records.
Steps for Incremental Load:
• Track changes by identifying the records with a timestamp (e.g., sale_date) or by
using a watermark to capture new or updated data.
• Use Delta Lake to read only the new data or records that have changed (based on the
timestamp or unique identifier).
• Merge the new data with the existing Delta table to ensure only new and updated
records are added.
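A sketch of the incremental load (paths and the watermark value are placeholders):
from delta.tables import DeltaTable
from pyspark.sql.functions import col

last_load_date = "2024-01-01"   # watermark recorded by the previous run
new_sales_df = (spark.read.format("csv")
                .option("header", "true")
                .load("/mnt/datalake/sales/daily_sales.csv")
                .filter(col("sale_date") > last_load_date))

target = DeltaTable.forPath(spark, "/mnt/warehouse/sales")
(target.alias("t")
    .merge(new_sales_df.alias("s"), "t.sale_id = s.sale_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())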
Key Steps in Incremental Load:
• Reading New Data: You only read the data that has been added or modified since the
last load, typically by using a timestamp or a unique identifier like sale_id.
• Merging with Delta Table: Using the MERGE operation, you update existing records
and insert new records where necessary, ensuring that only changes are processed.
• Efficiency: This approach improves performance by processing only the necessary
data, rather than reloading the entire dataset.
124. Scenario-Based Question on Slowly Changing Dimension (SCD) Type 1 in Data
Processing:
Question:
You are working with a customer data table in a retail business that tracks customer
information, including customer_id, name, email, and address. The data is continuously
updated as new information is available. For instance, if a customer changes their address
or email, the new value should overwrite the old one. This is an example of Slowly
Changing Dimension Type 1 (SCD1).
The table is stored as a Delta table, and you need to design a process to handle this update
scenario in a data pipeline running in Databricks.
How would you implement the SCD Type 1 logic in PySpark using Delta Lake to ensure that
the latest information always overwrites the previous records for the same customer_id?
Answer:
In SCD Type 1, when there is a change in the attribute (e.g., address, email), the old value is
simply overwritten with the new value. This approach does not keep track of historical
values and ensures that only the latest data is stored for each record.
Steps to Implement SCD Type 1:
1. Read the existing data from the Delta table.
2. Read the incoming updated data (e.g., from a CSV or streaming source).
3. Perform a merge operation using Delta Lake's MERGE function to update the existing
records where there is a matching customer_id. If the data is updated, the address or
email fields will be overwritten.
Code Example:
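A sketch matching the explanation below (paths are placeholders):
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/mnt/delta/customers")
df_incoming = (spark.read.format("csv")
               .option("header", "true")
               .load("/mnt/incoming/customer_updates.csv"))

(delta_table.alias("t")
    .merge(df_incoming.alias("s"), "t.customer_id = s.customer_id")
    .whenMatchedUpdate(set={
        "name": "s.name",
        "email": "s.email",
        "address": "s.address"})
    .whenNotMatchedInsert(values={
        "customer_id": "s.customer_id",
        "name": "s.name",
        "email": "s.email",
        "address": "s.address"})
    .execute())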
Explanation:
• DeltaTable.forPath: This loads the existing Delta table where customer information is
stored.
• merge(): The merge operation is used to compare the incoming data (df_incoming)
with the existing data in the Delta table (delta_table).
o whenMatchedUpdate: If there is a match on customer_id, the old values of
name, email, and address are overwritten with the new values from
df_incoming.
o whenNotMatchedInsert: If the customer_id does not exist in the Delta table,
the new record is inserted.
• This ensures that for each customer_id, the latest address, email, and name are
stored, and older records are updated with the new information.
SCD Type 1 Characteristics:
• Data Overwriting: The old values are overwritten by the new ones.
• No Historical Data: Only the latest information is stored for each customer_id.
• Simple Logic: SCD Type 1 is simple to implement and does not track historical
changes in the data.
Use Case:
This approach works well in scenarios where the business is interested only in the current
state of data and does not need to track the history of changes. For example, updating a
customer's address or contact details where only the most recent information is relevant for
reporting or analytics.
2. Vacuum the Table: Vacuum the Delta table to remove old versions of data and free
up storage.
3. Partition the Data: If not already done, partition the Delta table by a column
frequently used in queries (e.g., device_id or timestamp).
2. Coalesce for Writes: Use coalesce to reduce the number of output files when writing
data, especially if you have a small number of partitions.
3. Optimize Read Performance: When reading Parquet files, specify the schema
explicitly to avoid schema inference overhead.
3. Unpersist when Done: Always unpersist the DataFrame when you are done with it to
free up resources.
2. Use Partitioning: Partition the data when writing to a Delta table to speed up query
performance and reduce file sizes.
3. Use Merge for Incremental Loads: If you're performing incremental loads, use the
MERGE statement to efficiently update only the changed data.
Alternatively, use coalesce if you want to combine small files into fewer partitions after
reading:
This reduces the overhead of many small file reads, optimizing the processing.
2. Increase Parallelism with File Splitting: Spark can automatically parallelize the
reading of files, but for some file formats (like Parquet or ORC), you can control the
spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes to influence how
files are split and read in parallel.
This will help in splitting larger files into smaller, manageable chunks for parallel processing.
3. Use Delta Lake for Managing Small Files: If the small files are being generated over
time or come from multiple streams, you can store them in a Delta table, which
automatically manages file sizes and optimizes reads.
Delta tables handle small file consolidation under the hood when running operations like
OPTIMIZE and VACUUM. Delta tables are optimized for large-scale processing and ensure
that small files are compacted automatically.
4. File Format Optimization: Consider using more efficient file formats like Parquet or
Delta for storage, as these formats are optimized for performance, particularly when
reading large datasets. Additionally, using partitioning and compression within these
formats can further improve performance.
5. Avoid Schema Inference: Schema inference can add unnecessary overhead when
reading multiple small files. Instead, explicitly define the schema to speed up the
reading process.
6. Batching Small Files: If the small files are frequently updated or written in batches,
consider batching them into larger files during the write operation, which can help in
reducing the number of files read during the next job execution.
7. Cluster Configuration Adjustments: Ensure that your cluster has enough executors to
handle the read load effectively. You can also adjust the number of executors and
cores depending on the size of your dataset.
Summary: When handling multiple small files in Databricks, the key to optimization is
reducing the overhead of reading individual small files. Strategies like coalescing or
repartitioning data, using Delta Lake, increasing parallelism through file splitting, and
avoiding schema inference can significantly improve performance. Additionally, optimizing
cluster configurations and using efficient file formats like Parquet can help in further
optimizing the read process.
Choosing the Right Partition Count: For large files, the number of partitions should be
chosen to avoid memory overflow or excessive small partitions. Typically, partition sizes
should be between 100MB and 1GB. In this case, 100 partitions for 1 TB of data might be
appropriate depending on the executor memory.
3. Define Schema Explicitly to Avoid Schema Inference Overhead:
o Avoid schema inference: Inferring the schema for a large CSV file can cause
performance overhead. If you know the schema of the file, define it explicitly.
4. Adjust CSV Options for Optimized Reading:
o Multi-Line CSV Files: If the CSV file is multi-line or contains embedded quotes,
you can configure Spark's CSV reader to handle these cases.
Summary:
To read a large 1 TB CSV file efficiently in Databricks:
1. Consider converting the file to a more optimized format like Parquet or Delta.
2. Repartition the file to ensure parallelism and distribute work evenly across the
cluster.
3. Avoid schema inference by defining the schema explicitly.
4. Tune CSV read options based on the file's structure (e.g., multi-line, quotes).
5. Increase parallelism and memory usage through appropriate configurations for
better throughput.
6. Leverage broadcast variables for small reference data joins to avoid unnecessary
shuffling.
7. Cache or persist data if you need to perform multiple transformations.
By following these strategies, you can efficiently read and process a 1 TB CSV file, ensuring
minimal performance overhead and utilizing cluster resources optimally.
o Choosing the Right Partition Count: The ideal number of partitions depends on
the file size, the number of available executors, and the cluster’s capacity.
Typically, aim for a partition size between 100MB and 1GB to prevent small files
from causing overhead.
3. Increase Parallelism and Optimize Spark Settings:
o Increase Number of Executors and Cores: Ensure your cluster is appropriately
sized for the task. Increase the number of cores and executors for parallel
processing.
o Adjust Shuffle Partitions: Tuning shuffle partitions (spark.sql.shuffle.partitions)
helps to optimize the performance of operations like joins or aggregations that
occur after reading the data.
o Configure File Partitioning: If you are reading a large file from distributed
storage like AWS S3 or Azure Blob Storage, ensure that
spark.hadoop.fs.s3a.maxConnections and similar configurations are optimized
for high-throughput file systems.
4. Avoid Schema Inference (Explicit Schema Definition):
o Define the Schema: If possible, avoid inferring the schema from the text file as
this adds overhead. Define the schema explicitly before reading the data to
optimize the reading process.
5. Optimize File Reading with wholeTextFiles (for Smaller Files or Files in Multiple
Parts):
o If the file is split into smaller parts (e.g., multiple .txt files in a directory), you
can use wholeTextFiles to load all the files into a single RDD, which may help
reduce overhead if the data is not too complex.
o However, for a single large file (1 TB), this method is generally not
recommended, as it will still lead to inefficient reading.
6. Consider Data Skew and Caching (If Necessary):
o Caching: If you're performing iterative operations on the data, consider using
cache() or persist() to store intermediate results in memory to speed up
subsequent queries.
5. Avoid Skewed Data: If one DataFrame has skewed data, you can add a "salting"
technique to distribute keys more evenly.
6. Adjust Spark Configurations: For large-scale joins, ensure the Spark session is tuned with
adequate resources.
6. Cache Intermediate Results: If the joined DataFrame will be reused multiple times,
cache or persist it to avoid re-computation.
• Spark skips the shuffle step because it knows the data is already distributed and
sorted by customer_id.
• The join is faster and more efficient.
Basic Scenarios
135. Scenario 1: Reading Simple API Data
Question:
How do you read data from a REST API that provides a single JSON response?
Answer:
Use Python’s requests library to fetch the API response and convert it into a Spark
DataFrame.
Code Example:
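A sketch, assuming a hypothetical endpoint that returns a JSON array of records:
import requests
from pyspark.sql import Row

response = requests.get("https://api.example.com/data")
records = response.json()                       # list of dicts

api_df = spark.createDataFrame([Row(**r) for r in records])
api_df.show()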
143. Creating Dynamic Columns with Variable Size of Columns in PySpark DataFrame
Question:
How can you create dynamic columns in a PySpark DataFrame when the number of columns
varies across rows?
Answer:
To handle dynamic column creation in a PySpark DataFrame with a variable number of
columns, follow these steps:
1. Read the text data as a single column.
2. Split each row based on a delimiter and create a column for each value in the row.
3. Adjust the column count dynamically by creating new columns only when necessary.
Code Example:
144. Scenario: How to add partitionId in a DataFrame in PySpark?
Answer:
You can add the partitionId to a DataFrame in PySpark by using the spark_partition_id()
function. This function returns the partition ID of each row in the DataFrame. You can use
this function with withColumn to create a new column containing the partition IDs.
Here’s an example of how to do this:
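For example:
from pyspark.sql.functions import spark_partition_id

df_with_partition = df.withColumn("partition_id", spark_partition_id())
df_with_partition.show()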
Requirements:
1. Group the dataset by the job column.
2. Count the occurrences of each job.
3. Return the result in the form of a dictionary where the key is the job and the value is
the count of occurrences for each job.
Answer:
To solve this, we can use PySpark to group the data by the job column and then count the
occurrences. Finally, we will collect the results and convert them into a dictionary.
Explanation:
• The result is a dictionary where the key is the job title (e.g., "Engineer") and the value
is the count of occurrences (e.g., 3).
• Engineer appears 3 times in the dataset.
• Manager appears 2 times.
• Analyst appears 2 times.
156. Question: You have a customer dataset and an order dataset. You are required to
identify all the customers who have not placed any orders. These customers may be present
in the customer table but not in the order table. You need to return a list of customers who
do not have any corresponding order records.
Datasets:
1. Customer Table (customers):
Explanation:
• Left Anti Join: The left_anti join returns all records from the left DataFrame
(customers_df) where there is no matching record in the right DataFrame (orders_df)
based on the customer_id.
• In the given example, customers David and Eve do not have any corresponding
records in the orders table, so they are returned in the result.
157. Question: You have a sales dataset with the product sales data for multiple years. Your
task is to find the products whose sales have increased every year. Additionally, you need to
identify the specific years when the sales increased.
Datasets:
1. Sales Table (sales):
Requirements:
1. Identify products where total sales have increased year by year.
2. Also identify the year(s) in which the sales increased.
Answer:
To solve this, we need to:
1. Group the data by product_id and sort it by year.
2. Compare the total sales of each product across years and check if they have
consistently increased.
3. Filter the products where sales have consistently increased.
Solution 1: find products which increased year by year
Solution 2: Year which it is increased
Note: The solution above is a bit complex, so also try the second approach below. It is
written to detect a decrease in sales; you can adapt the same logic to detect an increase.
158. Scenario: You have a sales dataset containing the total sales for different products over
multiple years. Your task is to find products where the sales decreased in a specific year
compared to the previous year. Specifically, you need to identify the years when each
product experienced a decrease in sales and output those products along with the years
they experienced the decrease.
Solution:
We will use the lag() function in PySpark to compute the previous year's sales for each
product. By taking the difference between the current year's sales and the previous year's
sales, we can identify the years when the sales decreased. The years with a negative
difference will indicate a decrease in sales.
159. You are provided with a dataset of movies containing details such as ID, name,
description, and rating. Your task is to:
1. Add more movie data to the dataset.
2. Identify movies with odd IDs that do not have "boring" in their description.
3. Return the result sorted by rating in descending order.
160. Scenario-Based Question
You have a dataset with details about people buying fruits, the types of fruits, and their
weights. Your task is to generate a report showing:
1. The total weight of fruits bought by each person.
2. The fruit names and their corresponding weights, stored as lists.
Solution 2: Fruits and Weights as Lists
161. Scenario: You are given an employee dataset with details of their names and job titles.
Your task is to find the total headcount of employees for each job title and return the
output as a dictionary.
Explanation
1. headcount_df.collect():
o Collects the grouped DataFrame as a list of rows.
2. Dictionary Comprehension:
o Loops through each row to create a dictionary with job as the key and
headcount as the value.
162. Scenario: You are provided with employee login data containing the employee ID
(emp_id), the date of login (log_date), and a flag (flag) indicating whether the employee
logged in on that day ('Y' for yes, 'N' for no).
The task is to identify employees who logged in on at least two consecutive days and
return their IDs along with the dates of consecutive logins.
Required output
Explanation
1. Lag Function:
o lag("flag"): Retrieves the flag from the previous day for the same employee.
o lag("log_date"): Retrieves the previous date for the same employee.
2. Filter Condition:
o Check if the current day's flag and the previous day's flag are both 'Y'.
3. Result:
o Extracted emp_id, log_date, and prev_log_date for consecutive login days.
163. Scenario: You are provided with employee salary data that includes the department
(dept_id), employee name (emp_name), and salary (salary).
The task is to find the highest and lowest salaried employee for each department and return
the results.
If asked to show as one output:
164. Scenario: You are given a dataset with product sales data for three consecutive
months: September, October, and November. The data is structured as follows:
Your task is to transform this dataset so that each month's sales are represented as a
separate row with an additional column indicating the month. The output should look like
this:
Answer:
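A sketch of the transformation, assuming a DataFrame sales_df with columns product, Sep_Sale, Oct_Sale, and Nov_Sale (names are illustrative):
unpivoted = sales_df.selectExpr(
    "product",
    "stack(3, 'Sep_Sale', Sep_Sale, 'Oct_Sale', Oct_Sale, 'Nov_Sale', Nov_Sale) as (Month, Sales)")
unpivoted.show()

# On Spark 3.4+ the unpivot API (the 2nd approach mentioned below) is an alternative:
# sales_df.unpivot("product", ["Sep_Sale", "Oct_Sale", "Nov_Sale"], "Month", "Sales")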
In the stack function, the column names are listed twice because it requires both the labels
(names for the new rows) and the corresponding values. Here's the breakdown:
Syntax of stack function
stack(N, label1, value1, label2, value2, ..., labelN, valueN)
• N: Number of columns to transform into rows.
• label: A string that represents the new row name (e.g., 'Sep_Sale', 'Oct_Sale',
'Nov_Sale').
• value: The actual column containing the corresponding values.
• 'Sep_Sale': Label for the new row indicating the month.
• Sep_Sale: Column name containing the sales value for September.
Why It's Done Twice
• The first occurrence ('Sep_Sale') becomes the value for the new Month column.
• The second occurrence (Sep_Sale) provides the data for the new Sales column.
• Dynamic Updates: If the number of months increases, update the stack function with
the new columns.
2nd approach using unpivot:
Task (see the sketch after this list):
1. Missing Records: Find any records in the source table that are missing in the target
table.
2. Mismatched Records: Identify records where the data in the source and target tables
don't match (e.g., salary mismatch).
3. Extra Records: Find records in the target table that do not exist in the source table.
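A hedged sketch of one approach, assuming source_df and target_df share a key column emp_id and a salary column (names are illustrative):
from pyspark.sql import functions as F

# 1. Missing in target: rows whose key exists only in the source
missing_in_target = source_df.join(target_df, on="emp_id", how="left_anti")

# 3. Extra in target: rows whose key exists only in the target
extra_in_target = target_df.join(source_df, on="emp_id", how="left_anti")

# 2. Mismatched: key exists in both, but the salary differs
mismatched = (source_df.alias("s")
              .join(target_df.alias("t"), on="emp_id", how="inner")
              .filter(F.col("s.salary") != F.col("t.salary"))
              .select("emp_id",
                      F.col("s.salary").alias("source_salary"),
                      F.col("t.salary").alias("target_salary")))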
174. How do you write the output of a Structured Streaming query to a file (e.g., CSV or
Parquet)?
Answer: You can write the result of a structured streaming query to a file in CSV, Parquet, or
other supported formats using the writeStream method.
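A minimal sketch, assuming a streaming DataFrame streaming_df; the paths are placeholders:
query = (streaming_df.writeStream
         .format("parquet")
         .option("path", "/mnt/output/sales_parquet")                  # output directory
         .option("checkpointLocation", "/mnt/checkpoints/sales_parquet")
         .outputMode("append")
         .start())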
This writes the data to Parquet files as new data arrives in the streaming DataFrame.
175. How would you read data from a file (e.g., CSV) in Structured Streaming and store it
in Delta format?
Answer: You can use readStream to read data from a file source, then write it to Delta
format using writeStream with the format delta.
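A hedged sketch; the schema, columns, and paths are illustrative:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([StructField("id", IntegerType()),
                     StructField("name", StringType())])

csv_stream = (spark.readStream
              .format("csv")
              .schema(schema)                    # streaming file sources need an explicit schema
              .option("header", "true")
              .load("/mnt/input/csv"))

(csv_stream.writeStream
 .format("delta")
 .option("checkpointLocation", "/mnt/checkpoints/csv_to_delta")
 .outputMode("append")
 .start("/mnt/delta/target_table"))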
176. How can you set up a structured streaming job to read data from an S3 bucket and
write it to a Delta table?
Answer: You can configure the readStream to read from an S3 bucket and output it to a
Delta table by using the S3 path as the source.
This job will read data continuously from the S3 bucket and write the processed data to a
Delta table.
177.How can you handle schema evolution while reading and writing data in Structured
Streaming?
Answer: When reading data from a source with evolving schemas (e.g., a Kafka topic or a
JSON file), you can enable schema inference or define a schema explicitly to handle schema
evolution. In the case of writing, Delta format inherently supports schema evolution.
Writing Data to Delta with Schema Evolution
To enable schema evolution when writing to Delta format, use the mergeSchema option.
When a new column is added to the input data, Delta will automatically update the schema
to reflect the change.
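A minimal sketch of a Delta streaming write with schema evolution enabled, assuming a streaming DataFrame streaming_df (paths are placeholders):
(streaming_df.writeStream
 .format("delta")
 .option("mergeSchema", "true")                  # allow new columns to be added to the table schema
 .option("checkpointLocation", "/mnt/checkpoints/evolving")
 .outputMode("append")
 .start("/mnt/delta/evolving_table"))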
178. How can you write streaming data to a relational database (e.g., Azure SQL
Database)?
Answer: To write streaming data to a relational database, you can use the writeStream API
with the JDBC connector to the target database. For example, writing data to Azure SQL
Database.
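A common pattern for JDBC sinks is foreachBatch, which writes each micro-batch with the batch JDBC writer; the connection details below are placeholders and streaming_df is an assumed streaming DataFrame:
def write_to_sql(batch_df, batch_id):
    (batch_df.write
     .format("jdbc")
     .option("url", "jdbc:sqlserver://<server>.database.windows.net:1433;database=<db>")
     .option("dbtable", "dbo.streaming_target")
     .option("user", "<user>")
     .option("password", "<password>")
     .mode("append")
     .save())

query = (streaming_df.writeStream
         .foreachBatch(write_to_sql)             # runs the function once per micro-batch
         .option("checkpointLocation", "/mnt/checkpoints/jdbc_sink")
         .start())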
A related introductory example reads text data from a socket, splits it into words, and outputs
the word counts to the console.
181: How can you handle schema inference when reading from a dynamic JSON stream in
Structured Streaming?
Answer: File-based streaming sources require a schema by default. You can enable schema
inference for a dynamic JSON source by setting the configuration
spark.sql.streaming.schemaInference to true, but it's usually better to define a schema explicitly
to ensure consistency.
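A sketch with an explicit schema (field names and path are illustrative):
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

json_schema = StructType([
    StructField("event_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("event_time", TimestampType())])

json_stream = (spark.readStream
               .format("json")
               .schema(json_schema)              # explicit schema keeps the stream stable
               .load("/mnt/input/json"))
# Alternatively: spark.conf.set("spark.sql.streaming.schemaInference", "true")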
This ensures that each JSON record is read according to the specified schema, avoiding
potential issues with dynamic or changing JSON structures.
182. How can you check the status of a streaming query in Structured Streaming?
Answer: You can use the status, lastProgress, and isActive methods to check the status of a
streaming query:
1. status: Returns the current status of the query.
2. lastProgress: Returns the progress details, such as the number of records processed
and the time taken for each micro-batch.
3. isActive: Returns a boolean value indicating whether the query is still active.
These methods help in monitoring and debugging the query in real-time.
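A quick sketch, assuming an active query started from a streaming DataFrame streaming_df:
query = (streaming_df.writeStream
         .format("console")
         .start())

print(query.status)          # current status, e.g. whether data is being processed
print(query.lastProgress)    # metrics for the most recent micro-batch
print(query.isActive)        # True while the query is still running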
183. What is the purpose of the awaitTermination() method in Structured Streaming?
Answer: The awaitTermination() method is used to block the execution of the current
thread and wait for the completion or termination of a streaming query in Structured
Streaming. It ensures that the query continues to run until it is either manually stopped or
finishes processing.
• Without Timeout: It blocks indefinitely, waiting for the query to terminate (either
through a manual stop or a failure).
• With Timeout: You can also specify a timeout in milliseconds. If the query doesn't
terminate within the given period, it will return False.
Use Case: This method is helpful when running a streaming query in a standalone script or
Databricks notebook, where you want to ensure that the query keeps running until
manually stopped or completed.
184. How does watermarking work in Structured Streaming in Databricks?
Answer: Watermarking works by specifying a maximum allowed delay in event time for late
data. When you define a watermark, Spark uses it to decide when to allow late data in
streaming queries and when to process the results based on the event time.
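A minimal sketch with a 5-minute watermark, assuming a streaming DataFrame events_df with event_time, product_id, and amount columns (the 10-minute window is illustrative):
from pyspark.sql import functions as F

agg = (events_df
       .withWatermark("event_time", "5 minutes")   # tolerate events up to 5 minutes late
       .groupBy(F.window("event_time", "10 minutes"), "product_id")
       .agg(F.sum("amount").alias("total_amount")))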
This ensures that only data that is within 5 minutes of the latest event time is included, and
any data beyond this is considered late and ignored for aggregation.
Key points (for loading a specific CSV file into a Delta table with COPY INTO, as in the scenarios
below):
• Replace data_20250102.csv with the specific file, or use a dynamic pattern like
data_*.csv to load multiple files.
• Ensure that the Delta table schema matches the incoming file schema.
185. Scenario: Handling schema evolution
Question:
A new column is added to the incoming CSV files, but your Delta table does not yet include
this column. How do you handle this schema evolution using COPY INTO?
Answer:
You can enable schema evolution by setting 'mergeSchema' = 'true' in COPY_OPTIONS so the Delta
table schema is updated with the new column. Here’s an example:
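A hedged sketch (table name and storage path are placeholders):
spark.sql("""
  COPY INTO main.sales.orders
  FROM 'abfss://raw@storageaccount.dfs.core.windows.net/orders/'
  FILEFORMAT = CSV
  FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true')           -- lets the new column evolve the table schema
""")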
Key points:
o Enable schema evolution before running the COPY INTO command.
o Ensure the new column's data types are compatible with Delta.
186. Scenario: Data deduplication during the load
Question:
The incoming files may have duplicate rows. How can you ensure only unique rows are
loaded into the Delta table using COPY INTO?
Answer:
You can use a staging table to load data first, remove duplicates, and then upsert into the
main Delta table:
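A hedged sketch, assuming order_id uniquely identifies a row after deduplication (all table names and paths are placeholders):
# 1. Load the raw files into a staging Delta table
spark.sql("""
  COPY INTO main.sales.orders_staging
  FROM 'abfss://raw@storageaccount.dfs.core.windows.net/orders/'
  FILEFORMAT = CSV
  FORMAT_OPTIONS ('header' = 'true')
""")

# 2. Deduplicate the staged rows and upsert into the main table
spark.sql("""
  MERGE INTO main.sales.orders AS t
  USING (SELECT DISTINCT * FROM main.sales.orders_staging) AS s
  ON t.order_id = s.order_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")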
• Key points:
o Use a staging Delta table for temporary storage.
o Perform a MERGE operation with deduplication logic.
187.Scenario: Error handling during file ingestion
Question:
Some files have missing columns or incorrect data types. How do you skip problematic files
while using COPY INTO?
Answer:
You can enable error handling by setting the badRecordsPath option in the COPY INTO
command:
• Key points:
o Problematic files or records are moved to the specified badRecordsPath.
o This allows you to inspect and fix errors without failing the entire load.
188. Scenario: Partitioning the Delta table
Question:
How can you optimize data insertion by partitioning the Delta table while using COPY INTO?
Answer:
Partition the Delta table by a specific column, such as region or date, to improve query
performance:
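A hedged sketch (schema, table name, and path are placeholders):
spark.sql("""
  CREATE TABLE IF NOT EXISTS main.sales.orders_partitioned (
    order_id STRING,
    region STRING,
    amount DOUBLE,
    order_date DATE)
  USING DELTA
  PARTITIONED BY (region)
""")

spark.sql("""
  COPY INTO main.sales.orders_partitioned
  FROM 'abfss://raw@storageaccount.dfs.core.windows.net/orders/'
  FILEFORMAT = CSV
  FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
""")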
• Key points:
o Ensure the Delta table is created with the PARTITIONED BY clause.
o Partitioning helps with efficient data querying and storage.
189. Scenario: Ingesting files from a cloud storage location
Question:
You want to continuously ingest new JSON files from an Azure Data Lake Storage (ADLS)
Gen2 container into a Delta table. How can you set up Auto Loader to handle this?
Answer:
You can use the Auto Loader with a cloudFiles source to automatically process new files.
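A minimal sketch (container path, schema location, checkpoint, and target table are placeholders):
stream = (spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "json")
          .option("cloudFiles.schemaLocation", "/mnt/schemas/orders")   # where the inferred schema is tracked
          .load("abfss://raw@storageaccount.dfs.core.windows.net/orders/"))

(stream.writeStream
 .format("delta")
 .option("checkpointLocation", "/mnt/checkpoints/orders")
 .outputMode("append")
 .toTable("main.sales.orders_bronze"))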
• Key points:
o Auto Loader automatically detects new files in the input path.
o The checkpoint location ensures fault tolerance and tracks progress.
o Schema inference is supported, but it’s recommended to define the schema
explicitly for performance.
190.Scenario: Schema evolution
Question:
A new column is added to the incoming JSON files. How do you ensure that the new schema
is handled seamlessly?
Answer:
Enable schema evolution by setting the cloudFiles.schemaLocation option and
mergeSchema option in the Delta table.
• Key points:
o Auto Loader tracks schema changes in the cloudFiles.schemaLocation.
o The mergeSchema option updates the Delta table schema automatically.
191.Scenario: Handling duplicate files
Question:
Your cloud storage might occasionally receive duplicate files. How can you ensure that Auto
Loader processes each file only once?
Answer:
Auto Loader tracks file processing using file metadata, preventing duplicate ingestion.
• Key points:
o Auto Loader's file-state tracking in the checkpoint ensures each file is ingested exactly once; cloudFiles.includeExistingFiles only controls whether files already present at stream start are processed.
o Use checkpointing to track and avoid reprocessing.
192. Scenario: Triggering batch intervals
Question:
You want to process files in micro-batches instead of continuously streaming. How do you
configure Auto Loader for this?
Answer:
Use the trigger option in the writeStream method.
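A minimal sketch, assuming a cloudFiles stream named stream (paths and table name are placeholders):
(stream.writeStream
 .format("delta")
 .option("checkpointLocation", "/mnt/checkpoints/orders")
 .trigger(processingTime="10 minutes")      # run a micro-batch every 10 minutes
 .toTable("main.sales.orders_bronze"))
# On recent runtimes, .trigger(availableNow=True) processes all pending files once and then stops.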
• Key points:
o Files are processed every 10 minutes with the trigger setting.
o Use this for scenarios where latency can be tolerated to optimize resource
usage.
193.Scenario: Data deduplication
Question:
How can you ensure duplicate rows are not inserted into the Delta table during ingestion?
Answer:
Use the dropDuplicates function with a unique identifier column.
• Key points:
o Deduplication ensures no duplicate records are written to the Delta table.
o Choose appropriate columns for identifying unique rows.
194.Scenario: Error handling
Question:
Some incoming files contain corrupt or invalid records. How can you skip such records and
still process valid ones?
Answer:
Use the badRecordsPath option in Auto Loader.
Key points:
o Invalid records are moved to the specified badRecordsPath for later inspection.
o This ensures valid data is ingested without interruptions.
195. Scenario: Schema Evolution
Question:
New files ingested into your data pipeline contain additional columns not present in your
Delta table schema. How can you ensure these new columns are automatically added
during ingestion?
Answer:
Use Auto Loader's schema evolution support by setting cloudFiles.schemaEvolutionMode to
addNewColumns.
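A hedged sketch (paths and table name are placeholders):
stream = (spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "json")
          .option("cloudFiles.schemaLocation", "/mnt/schemas/orders")
          .option("cloudFiles.schemaEvolutionMode", "addNewColumns")   # new columns are added automatically
          .load("abfss://raw@storageaccount.dfs.core.windows.net/orders/"))

(stream.writeStream
 .format("delta")
 .option("mergeSchema", "true")              # let the Delta table pick up the new columns
 .option("checkpointLocation", "/mnt/checkpoints/orders")
 .toTable("main.sales.orders_bronze"))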
• Key points:
o addNewColumns automatically adds new columns while retaining existing data.
o Use cloudFiles.schemaLocation to store and manage schema updates.
196.Scenario: Schema Validation
Question:
Your team wants to ensure that incoming files strictly follow the existing Delta table schema
and fail ingestion if there are discrepancies. How can you enforce schema validation?
Answer:
Use the failOnNewColumns schema mode to enforce schema validation.
Key points:
• failOnNewColumns prevents ingestion if new columns are detected.
• This is useful when schema consistency is critical.
Question:
You have a table with PII data, and only authorized users should be able to access sensitive
columns like SSN and Phone Number. How can you enforce this in Unity Catalog?
Answer:
Use column-level security, such as column masks or a dynamic view, to restrict access to the sensitive columns.
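One possible approach is a Unity Catalog column mask (requires a recent Databricks Runtime); the table main.hr.employees, the columns, and the pii_admins group below are assumptions:
spark.sql("""
  CREATE OR REPLACE FUNCTION main.hr.pii_mask(value STRING)
  RETURNS STRING
  RETURN CASE WHEN is_account_group_member('pii_admins') THEN value ELSE '***MASKED***' END
""")

spark.sql("ALTER TABLE main.hr.employees ALTER COLUMN ssn SET MASK main.hr.pii_mask")
spark.sql("ALTER TABLE main.hr.employees ALTER COLUMN phone_number SET MASK main.hr.pii_mask")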
Question:
Your organization needs to trace the lineage of data to identify how it flows across tables
and queries for debugging and governance purposes. How can Unity Catalog help?
Answer:
Unity Catalog automatically tracks data lineage across tables, queries, and workflows.
Question:
Some users in your organization need access to a table, but sensitive information like salary
should be masked for them. How can Unity Catalog help?
Answer:
Use dynamic data masking in Unity Catalog
Question:
You manage a financial dataset with sensitive columns like credit_score and loan_amount.
Regional managers should only see data for their region, and only authorized users can
access sensitive columns. How can you enforce both RLS and CLS?
Answer:
Combine Dynamic Views (for RLS) with Column Permissions (for CLS).
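A hedged sketch of a single dynamic view that applies both: the table, view, groups, and the region_manager_map mapping table are all hypothetical:
spark.sql("""
  CREATE OR REPLACE VIEW finance.secure.loans_v AS
  SELECT
    region,
    customer_id,
    CASE WHEN is_account_group_member('risk_admins') THEN credit_score ELSE NULL END AS credit_score,
    CASE WHEN is_account_group_member('risk_admins') THEN loan_amount  ELSE NULL END AS loan_amount
  FROM finance.secure.loans
  WHERE is_account_group_member('risk_admins')
     OR region IN (SELECT region
                   FROM finance.secure.region_manager_map      -- hypothetical manager-to-region mapping
                   WHERE manager_email = current_user())
""")
Users then query the view rather than the underlying table.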
Scenario: You are the Unity Catalog administrator for your company's Azure Databricks
workspace. A new team, data-science-team, needs access to various objects within the
catalog prod_catalog. They should be able to:
You need to assign these privileges to the data-science-team group. Additionally, ensure
that all the privileges are properly granted and that you can verify them. Lastly, if the group
data-science-team no longer needs the CREATE TABLE privilege on prod_catalog.default,
you must revoke that privilege.
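Because the exact privilege list is not shown, the four GRANT statements below are a hedged sketch of typical access (catalog usage, schema usage, read plus create-table on the schema, and execute on the function), using the object and group names from the scenario:
spark.sql("GRANT USE CATALOG ON CATALOG prod_catalog TO `data-science-team`")
spark.sql("GRANT USE SCHEMA ON SCHEMA prod_catalog.default TO `data-science-team`")
spark.sql("GRANT SELECT, CREATE TABLE ON SCHEMA prod_catalog.default TO `data-science-team`")
spark.sql("GRANT EXECUTE ON FUNCTION prod.ml_team.prediction_model TO `data-science-team`")

# Verify the grants
spark.sql("SHOW GRANTS ON CATALOG prod_catalog").show(truncate=False)
spark.sql("SHOW GRANTS ON SCHEMA prod_catalog.default").show(truncate=False)
spark.sql("SHOW GRANTS ON FUNCTION prod.ml_team.prediction_model").show(truncate=False)

# Revoke the privilege that is no longer needed
spark.sql("REVOKE CREATE TABLE ON SCHEMA prod_catalog.default FROM `data-science-team`")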
Explanation:
1. Grant Privileges:
o The first four SQL commands grant the required privileges to the data-science-
team group on the prod_catalog catalog, prod_catalog.default schema, and
the registered function prod.ml_team.prediction_model.
2. Verify Privileges:
o The SHOW GRANTS SQL commands are used to verify the privileges granted to
the group on the catalog, schema, and function.
3. Revoke Privilege:
o The REVOKE CREATE TABLE SQL command revokes the privilege to create
tables on the prod_catalog.default schema from the data-science-team group.
This code demonstrates how to manage privileges on Unity Catalog objects within
Databricks using SQL commands in a PySpark environment.
203. Scenario:
You have a schema old_schema in the Hive Metastore that contains both managed and
external tables. You need to:
1. Perform a dry run to check if all the tables in old_schema can be migrated to Unity
Catalog under the new schema new_schema in the main catalog.
2. Perform the actual sync of all eligible external tables from old_schema to
new_schema in Unity Catalog.
3. Change the owner of the newly synced tables to [email protected].
4. Ensure that any Hive managed tables stored outside Databricks workspace storage
are synced as external tables.
5. Finally, replace any tables in new_schema that already exist with the corresponding
tables from old_schema (if necessary).
Solution Code:
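A hedged sketch of the SYNC workflow (the target catalog main and the owner principal come from the scenario; check the SYNC documentation for the exact clause placement on your runtime):
# 1. Dry run: check which tables in old_schema can be upgraded to Unity Catalog
spark.sql("SYNC SCHEMA main.new_schema FROM hive_metastore.old_schema DRY RUN").show(truncate=False)

# 2. Actual sync of eligible external tables, assigning the new owner
spark.sql("""
  SYNC SCHEMA main.new_schema FROM hive_metastore.old_schema
  SET OWNER `[email protected]`
""").show(truncate=False)

# 3. Hive managed tables stored outside workspace storage can be upgraded as external
#    tables using the AS EXTERNAL clause of SYNC; re-running SYNC refreshes/replaces
#    previously synced tables in new_schema.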
Explanation of Concepts Covered:
204. Scenario:
You are working in a data engineering team for a large e-commerce company. You are
tasked with managing access control to sensitive customer data in the Unity Catalog.
Specifically, you need to:
1. Secure access to a table named customer_data stored in the schema sales of the
ecommerce_catalog catalog.
2. Create a dynamic view in a secondary schema that hides the email column for non-
admin groups, only allowing admins to see the full email address.
3. Create a dynamic view in the same schema that filters rows based on specific
conditions, allowing only managers to see data for customers with a total spend
greater than $1,000,000 and non-managers to only see data for customers with a
total spend less than or equal to $1,000,000.
As part of your company's cloud modernization efforts, you need to migrate a managed
Delta table from the Hive metastore to Unity Catalog. This will allow you to take advantage
of Unity Catalog's advanced data governance and security features.
Steps:
Your company has various data sources, including cloud object storage and Kafka message
buses. You need to load this data into Delta Live Tables for further processing and analysis.
For cloud object storage, you'll use Auto Loader with Delta Live Tables to efficiently handle
file ingestion. For Kafka, you'll use streaming tables to ingest the real-time data.
Task 1: Load Data from Cloud Object Storage Using Auto Loader
You need to ingest customer and sales order data stored in CSV and JSON formats,
respectively, from cloud object storage into Delta Live Tables. This should be done
incrementally as new files arrive.
Steps:
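A hedged Delta Live Tables sketch for the Auto Loader ingestion (storage paths and table names are placeholders):
import dlt

@dlt.table(name="customers_raw", comment="Incremental load of customer CSV files")
def customers_raw():
    return (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("header", "true")
            .load("abfss://raw@storageaccount.dfs.core.windows.net/customers/"))

@dlt.table(name="sales_orders_raw", comment="Incremental load of sales order JSON files")
def sales_orders_raw():
    return (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .load("abfss://raw@storageaccount.dfs.core.windows.net/sales_orders/"))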
Your company also uses a Kafka message bus for real-time data streaming. You need to
configure a Delta Live Table to ingest data from Kafka.
Steps:
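A hedged sketch of a streaming table fed from Kafka (broker address and topic name are placeholders):
import dlt
from pyspark.sql import functions as F

@dlt.table(name="kafka_events_raw", comment="Streaming ingestion from a Kafka topic")
def kafka_events_raw():
    return (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<broker-host>:9092")
            .option("subscribe", "sales_events")
            .option("startingOffsets", "latest")
            .load()
            .select(F.col("key").cast("string"),
                    F.col("value").cast("string"),
                    "topic", "timestamp"))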
• Use external locations if your Delta Live Tables pipeline uses Unity Catalog.
• Set up file notifications to ensure proper resource cleanup when using Auto Loader.
• For Kafka streaming, optimize your cluster with continuous execution and enhanced
autoscaling for better performance.
207. Scenario: Implementing Slowly Changing Dimensions (SCD) in Delta Live Tables
In this scenario, you will use Delta Live Tables to manage Slowly Changing Dimensions
(SCD) for a users dataset based on change data feed (CDF) source data. You'll process two
types of SCD updates: SCD Type 1 (overwrite) and SCD Type 2 (history tracking).
The change events will include operations like INSERT, UPDATE, DELETE, and TRUNCATE.
The examples below show how to implement SCD Type 1 and Type 2 processing in Delta
Live Tables pipelines.
SCD Type 1 simply overwrites old records with the most recent ones. In the case of the
UPDATE operation, the previous record is overwritten by the new one. DELETE and
TRUNCATE operations are handled by removing the records as per the change event.
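The bullets below describe the SQL APPLY CHANGES INTO statement; a roughly equivalent Python sketch using the DLT apply_changes API, assuming the CDF source carries operation and sequenceNum columns, might look like this (for SCD Type 2, set stored_as_scd_type = 2 and drop the truncate clause):
import dlt
from pyspark.sql.functions import col, expr

dlt.create_streaming_table("users_scd1")

dlt.apply_changes(
    target = "users_scd1",
    source = "cdc_data.users",                      # CDF source with operation and sequenceNum columns
    keys = ["userId"],
    sequence_by = col("sequenceNum"),
    apply_as_deletes = expr("operation = 'DELETE'"),
    apply_as_truncates = expr("operation = 'TRUNCATE'"),
    except_column_list = ["operation", "sequenceNum"],
    stored_as_scd_type = 1                          # keep only the latest version of each record
)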
• The APPLY CHANGES INTO statement applies changes from the CDF source table
(cdc_data.users) into the target table.
• It uses DELETE and TRUNCATE operations based on the operation field.
• The STORED AS SCD TYPE 1 ensures that only the latest version of each record is
stored (overwrites any previous versions).
Explanation:
• The APPLY CHANGES INTO statement applies changes from the CDF source table
(cdc_data.users) into the target table.
• It uses DELETE operations and processes the sequence by the sequenceNum to
handle out-of-order events.
• The STORED AS SCD TYPE 2 ensures that historical changes are tracked by adding a
new version of the record for every update, instead of overwriting the existing record.
After processing SCD Type 2, the target table will contain:
In the case of userId = 125, a new record is added with the updated city Guadalajara and
marked as current_flag = Y. The previous record with city Tijuana is kept in the table with
current_flag = N to track historical data.
208. Scenario: Periodic Snapshot Processing with SCD Type 2
In this scenario, periodic snapshots of a table stored in a Delta Lake are ingested. The goal is
to implement SCD Type 2 processing, where changes between snapshots are tracked by
maintaining historical versions of records. This example demonstrates how Delta Live Tables
processes and writes the results of the periodic snapshots into a target table.
Step 1: Define the Source Data
The source data (mycatalog.myschema.mytable) contains records that are periodically
updated. For each snapshot timestamp, new records may be added, and existing records
may be updated.
Snapshot at 2024-01-01 00:00:00:
Explanation:
1. Record for Key = 1: The value a1 is inserted at the first timestamp and remains
unchanged, so the __START_AT is set to 2024-01-01 00:00:00, and the __END_AT is
set to 2024-01-01 12:00:00 (the timestamp of the next snapshot).
2. Record for Key = 2: The value a2 appears in the first snapshot and is replaced by b2 in
the second snapshot. The first version of Key = 2 (a2) is recorded with __START_AT set
to 2024-01-01 00:00:00 and __END_AT set to 2024-01-01 12:00:00. The new record
for Key = 2 (b2) is added with __START_AT set to 2024-01-01 12:00:00 and __END_AT
set to null since it's the latest record.
3. Record for Key = 3: The value a3 appears in the second snapshot with no previous
history, so it is inserted with __START_AT set to 2024-01-01 12:00:00 and __END_AT
set to null.
209. Scenario: Real-Time Sales Analytics Pipeline
Business Requirement: The company needs to analyze real-time sales data from various
stores and provide insights into total sales by region and product category. The pipeline
should be designed to:
• Ingest raw sales data continuously (streaming).
• Join this data with static customer information to identify customer regions and
product categories.
• Perform real-time aggregation to compute total sales per region and category.
• Store final results for further analysis.
Step 2: Join with Static Customer Information (Silver Table - Streaming Table)
After ingesting the raw data, we join it with static customer information, which provides
customer_id, region, and category (from a static customers table). This enables us to enrich
the sales data with geographical and product category information.
• Streaming Table (Silver Table): Performs the join operation, enriching the sales data
with customer information.
• The static customers table is not changing over time, so it remains the same
throughout the processing.
Step 3: Perform Aggregation for Real-Time Insights (Gold Table - Materialized View)
Now, the pipeline computes total sales by region and product category. This aggregation is
the final result of the pipeline and is stored in a materialized view. The materialized view will
be periodically refreshed to reflect the latest aggregated results.
• Materialized View (Gold Table): Aggregates total sales by region and product
category, and stores the results for querying and further analysis.
This table is used only within the pipeline to calculate discounts and is not published to the
final schema.
Final Result:
The pipeline processes real-time sales data and aggregates it continuously to provide
insights like total sales by region and product category. Here's how the tables fit into the
pipeline:
1. Bronze Table (Streaming Table): Raw sales data ingestion.
2. Silver Table (Streaming Table): Enriched data with customer region and category.
3. Gold Table (Materialized View): Final aggregated results for total sales by region and
category.
4. Temporary Tables: For intermediate calculations (like discounts).
Benefits of This Approach:
• Streaming Tables allow real-time ingestion and processing of sales data.
• Materialized Views efficiently store and aggregate the final results, reducing
computation time for downstream queries.
• Temporary Tables enable complex transformations and intermediate steps without
affecting the final schema.
• By combining these elements, the pipeline can efficiently process and provide insights
into sales data with minimal re-processing, ensuring high throughput and low latency.
210. Scenario: Customer Order Data Validation
In this scenario, you are processing customer order data from multiple sources. The data
includes information about the customer, order date, product, quantity, and price. You need
to ensure that:
1. The order_date is not earlier than January 1, 2020.
2. The product_id and product_name are not NULL.
3. The quantity ordered is always greater than zero, and the price is also greater than
zero.
The pipeline processes customer orders, and you need to apply the following validation:
1. Drop records where the order_date is before January 1, 2020.
2. Drop records where product_id or product_name are missing.
3. Fail the pipeline if the quantity or price is less than or equal to zero.
Explanation of Actions:
1. Valid order_date: This checks that the order_date is not earlier than January 1, 2020.
Records that fail this check are dropped.
2. Valid product_id and product_name: Ensures that both the product_id and
product_name fields are not NULL. If either is missing, the record will be dropped.
3. Valid quantity and price: Ensures that both quantity and price are greater than zero.
If this condition is violated, the pipeline will fail immediately, and no updates will be
applied until the issue is resolved.
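A hedged Delta Live Tables sketch of these expectations; the upstream dataset raw_orders and the column names are assumptions:
import dlt

@dlt.table(name="validated_orders", comment="Customer orders with data quality checks applied")
@dlt.expect_or_drop("valid_order_date", "order_date >= '2020-01-01'")                 # drop old orders
@dlt.expect_or_drop("valid_product", "product_id IS NOT NULL AND product_name IS NOT NULL")
@dlt.expect_or_fail("valid_quantity_and_price", "quantity > 0 AND price > 0")         # fail the update
def validated_orders():
    return spark.readStream.table("raw_orders")     # hypothetical upstream table of customer orders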
I kindly request you to share your feedback as you go through the questions. Your insights
will help me tremendously in improving the material. Also, feel free to ask for additional
resources or clarification wherever needed. Your journey to becoming a Data Engineering
expert is just a few steps away!