SA coding assessment: Data Engineering, Airlines
Version 2022.01
What you'll do:
We provide the dataset. You will load it into dataframes, and perform
some data cleansing and transformation tasks.
You will answer a series of questions to show insights from the data.
There are also some written-answer questions.
We care about the process, not the result. I.e., we're looking for proper
use of data engineering techniques and understanding of the code you've
written.
This Data Engineering section is scored out of 55 points.
# This folder is for you to write any data as needed. Write access is restricted elsewhere. You can always read from dbfs.
aws_role_id = "AROAUQVMTFU2DCVUR57M2"
user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
userhome = f"s3a://e2-interview-user-data/home/{aws_role_id}:{user}"
print(userhome)
The following questions use the airlines dataset located at dbfs:/interview-datasets/sa/airlines. All airlines questions expect answers that use the DataFrames API (Scala or Python). SQL-only answers are accepted but may receive reduced points. We will not accept answers that use the RDD API.
Airlines Question 1
Write code that uses the DataFrame API to read in the entire airlines data
set with clearly named columns.
# Please provide your code answer for Question 1 here.
# Reading schema - when loading the full /airlines directory, the header is not detected reliably
# by the inferSchema and header options alone, so read the schema from part-00000 first.
header = spark.read.csv("/interview-datasets/sa/airlines/part-00000", header="true")
airline_sch = header.schema

# Load the whole dataset with the explicit schema; header="true" drops the header record
airlineDf = spark.read.csv("/interview-datasets/sa/airlines/", inferSchema="false", header="true",
                           schema=airline_sch)
airlineDf.cache()
display(airlineDf)
Written Answer
Explain your answer to Question 1 here.
I used the Spark DataFrame API to read the airlines CSV files. The part-00000
file contains the header record, but when the files are loaded in parallel the
header is not always the first record read, so I added a separate step to read
the schema from part-00000 and then used that schema to load the full dataset
with named columns, with the header switch set to true so the header record in
part-00000 is dropped.
I cached the resulting DataFrame (airlineDf) because it is reused frequently in
the questions below; this is a best practice and improves performance.
Airlines Question 2 [5 Points]
How many unique airlines are present in this dataset?
# Please provide your code answer for Question 2 here
from pyspark.sql import functions as F
# Get distinct count of UniqueCarrier
unqCar = airlineDf.select(F.countDistinct("UniqueCarrier"))
display(unqCar)
Written Answer
I am using the DataFrame select method with the countDistinct function to find
the number of unique UniqueCarrier values in the cached dataset, and I store
the output in a new DataFrame, unqCar.
Airlines Question 3 [10 Points]
Which airline is delayed on departure most often? Show a bar graph of the
top five most delayed airlines.
# Please provide your code answer for Question 3
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Cast DepDelay to integer so it can be compared and counted
airdf = airlineDf.withColumn("iDepDelay", F.col("DepDelay").cast(T.IntegerType()))

# Count departure delays per carrier and keep the top five
delayedAirline = (airdf.filter(F.col("iDepDelay") > 0)
                       .groupBy("UniqueCarrier")
                       .agg(F.count("iDepDelay").alias("NumDelays"))
                       .sort(F.desc("NumDelays"))
                       .limit(5))
display(delayedAirline)
Written Answer
I followed the steps below to derive the result:
Filtered for flights with a departure delay by checking that the DepDelay field is greater than 0.
Aggregated the number of times each airline was delayed, grouping on the UniqueCarrier field.
Sorted NumDelays in descending order and kept the first 5 rows in the result DataFrame, delayedAirline.
Displayed the result as a bar chart of the 5 carriers and their respective delay counts.
Airlines Question 4 [15 Points]
Part a: What was the average arrival delay per airline?
Part b: Also, for each airline, on average did flights arrive early or late?
Calculate the average arrival delay per airline. Then, add a column to this
new dataframe (containing the grouped averages) that contains the string
"Late" if the average arrival for that airline arrive >15 minutes late,
"Early" if the average arrival is <0 minutes late, and "On-time" if the
average arrival is between 0 and 15 minutes late.
To add the additional column, use a Spark UDF. Additionally, make sure to
filter out or fill in null values in your dataframe (if there are any) prior to
applying the UDF.
# Please provide your code answer for Question 4
# Part a: cast ArrDelay to integer and average it per carrier
arrDelaydf = airlineDf.withColumn("iArrDelay", F.col("ArrDelay").cast(T.IntegerType()))
avgArrival = arrDelaydf.groupby("UniqueCarrier").agg(F.avg("iArrDelay").alias("AvgArrDelay"))
display(avgArrival)

# Part b: classify each carrier's average arrival delay with a UDF
from pyspark.sql.functions import udf

@udf("string")
def groupedAvg_udf(a):
    if a > 15:
        return 'Late'
    elif 0 <= a <= 15:
        return 'On-time'
    else:
        return 'Early'

# Drop any null averages before applying the UDF, as required by the question
groupedArr = avgArrival.na.drop(subset=["AvgArrDelay"]) \
                       .withColumn("groupedAvgArrival", groupedAvg_udf("AvgArrDelay"))
display(groupedArr)
Written Answer
Part A:
Convert the string field ArrDelay to an integer.
Calculate the average arrival delay per UniqueCarrier.
Part B:
Declare a UDF that takes the average arrival delay as input and applies the
logic to label it Late, On-time, or Early.
Drop null averages, then create a new DataFrame with an additional column that
uses the UDF to return the grouped average delay label.
Airlines Question 5 [15 Points]
What file format is airlines data stored in, and was this the most optimal
format for the questions asked above?
What format would you store this data in if you frequently queried only
the UniqueCarr and CancellationCode columns?
What if you frequently read entire rows of the dataset?
Note: Cite any sources used. You do not need a code answer for this
question.
Written Answer
The file format is CSV. Looking at the data and the operations performed above,
it would be better to store the data in Parquet format.
If I frequently query only a few columns, such as the carrier and cancellation
code columns, I would also prefer Parquet, since columnar formats perform well
for column-level operations, especially when the data volume is large.
If I frequently read entire rows, I would choose the ORC format, since it gives
good read performance at the row level: data is stored and processed at the
stripe level, with the option to skip rows based on the statistics stored in
the footer of each stripe.
I have answered based on my experience, but I usually refer to the Spark
documentation and Databricks learning materials when researching better ways
of designing solutions.
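As an illustration only (not part of the original submission), the minimal sketch below shows how the cached airlineDf from Question 1 could be persisted as Parquet and then queried on just two columns; the output path under userhome is an assumed location.
# Minimal sketch (assumption): write the airlines data out as Parquet under the writable userhome path
parquet_path = userhome + "/airlines_parquet"  # hypothetical output location
airlineDf.write.mode("overwrite").parquet(parquet_path)

# A columnar format lets Spark read only the columns a query touches,
# e.g. UniqueCarrier and CancellationCode
carrierCancellations = spark.read.parquet(parquet_path).select("UniqueCarrier", "CancellationCode")
display(carrierCancellations)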
Airlines Question 6 [5 Points]
If you needed to keep multiple versions of this dataset, why might you use
the Delta format to do this efficiently?
Written Answer
The Delta Lake format provides versioning (time travel) by version number or by
timestamp, which removes much of the complexity of managing multiple versions
of a dataset. It also provides reliable ACID transactions and better metadata
management.
For this use case I would keep a single Delta table, restoring older versions
or merging new data into it as needed. Delta also provides better caching for
frequently used queries, and converting from Parquet to Delta is available out
of the box on Databricks, which simplifies functional dependencies and works
the same across languages.
Most of the questions above touched only one or a few columns, so I would store
and process the data in Delta or Parquet format rather than in CSV.
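As an illustrative sketch only (assumed path, not part of the original answer), the airlines data could be written as a Delta table and an earlier version read back via time travel:
# Minimal sketch (assumption): store the airlines data as a Delta table under the writable userhome path
delta_path = userhome + "/airlines_delta"  # hypothetical output location
airlineDf.write.format("delta").mode("overwrite").save(delta_path)

# After later overwrites or merges, previous versions remain queryable
firstVersion = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
display(firstVersion)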
SA coding assessment: Data Engineering, Baby Names
Version 2022.01
What you'll do:
We provide the dataset. You will load it into dataframes, and perform
some data cleansing and transformation tasks.
You will answer a series of questions to show insights from the data.
There are also some written-answer questions.
We care about the process, not the result. I.e., we're looking for proper
use of data engineering techniques and understanding of the code you've
written.
This Data Engineering section is scored out of 50 points.
Setup Env
# This folder is for you to write any data as needed. Write access is restricted elsewhere. You can always read from dbfs.
aws_role_id = "AROAUQVMTFU2DCVUR57M2"
user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
userhome = f"s3a://e2-interview-user-data/home/{aws_role_id}:{user}"
print(userhome)
s3a://e2-interview-user-data/home/AROAUQVMTFU2DCVUR57M2:[email protected]
Using Baby Names Data Set
This dataset comes from a website referenced by Data.gov. It lists baby
names used in the state of NY from 2007 to 2018.
Run the following two cells to copy this file to a usable location.
%scala
import java.net.URL
import java.io.File
import org.apache.commons.io.FileUtils

val tmpFile = new File("/tmp/rows.json")
FileUtils.copyURLToFile(new URL("https://health.data.ny.gov/api/views/jxy9-yhdk/rows.json?accessType=DOWNLOAD"), tmpFile)

import java.net.URL import java.io.File import org.apache.commons.io.FileUtils tmpFile: java.io.File = /tmp/rows.json
# https://docs.python.org/3/library/hashlib.html#blake2
from hashlib import blake2b

user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
h = blake2b(digest_size=4)
h.update(user.encode("utf-8"))
display_name = "user_" + h.hexdigest()
print("Display Name: " + display_name)

dbutils.fs.cp('file:/tmp/rows.json', userhome + '/rows.json')
dbutils.fs.cp(userhome + '/rows.json', f"dbfs:/tmp/{display_name}/rows.json")
baby_names_path = f"dbfs:/tmp/{display_name}/rows.json"
print("Baby Names Path: " + baby_names_path)
dbutils.fs.head(baby_names_path)

# Ensure you use baby_names_path to answer the questions. A bug in Spark 2.X will cause your read to fail if you read the file from userhome.
# Please note that dbfs:/tmp is cleaned up daily at 6AM pacific
Baby Names Question 1 - Nested Data [15 Points]
Use Spark SQL's native JSON support to read the baby names file into a
dataframe. Use this dataframe to create a temporary table containing all
the nested data columns ("sid", "id", "position", "created_at",
"created_meta", "updated_at", "updated_meta", "meta", "year",
"first_name", "county", "sex", "count") so that they can be queried using
SQL.
Hint: you can use dbutils.fs.head(baby_names_path) to take a look at the
dataset before reading it in.
Suggested Steps:
1. Read in the JSON data
2. Pull all columns in the nested data column to top level, following the schema
specified above. There are built-in Spark SQL functions that will accomplish
this.
3. Create a temp table from this expanded dataframe using
createOrReplaceTempView()
# Please provide your code answer for Question 1 here
# dbutils.fs.head(baby_names_path)

# Load the JSON file into a DataFrame and explode the nested "data" array
from pyspark.sql.functions import explode

spark.read.json(baby_names_path, multiLine=True) \
     .select(explode("data").alias("BN")) \
     .createOrReplaceTempView("NameView")

# Promote each element of the nested array to a named top-level column
sqlContext.sql("""SELECT BN[0] AS sid, BN[1] AS id, BN[2] AS position,
    BN[3] AS created_at, BN[4] AS created_meta, BN[5] AS updated_at,
    BN[6] AS updated_meta, BN[7] AS meta, BN[8] AS year, BN[9] AS first_name,
    BN[10] AS county, BN[11] AS sex, BN[12] AS count
    FROM NameView""").createOrReplaceTempView("babyNames")

display(sqlContext.sql("SELECT * FROM babyNames LIMIT 10"))
Written Answer
1. I used Spark's native JSON read method, then used the explode function to
extract the nested "data" array and created a temp view, NameView.
2. Using NameView, I pulled each nested element out into a named column and
registered the result as the babyNames temp view using createOrReplaceTempView().
3. Finally, I display the first 10 records for review.
Baby Names Question 2 - Multiple Languages [10 Points]
Using the temp table you created in the question above, write a SQL
query that gives the most popular baby name for each year in the
dataset. Then, write the same query using either the Scala or Python
dataframe APIs.
Code Answer
# Please provide your code answer for Question 2 here. You will need separate cells for your SQL answer and your Python or Scala answer.

# SQL Answer
# Total count per (year, first_name)
sumBabyNames = spark.sql("SELECT year, first_name, SUM(count) AS sum_name FROM babyNames GROUP BY year, first_name ORDER BY first_name")
display(sumBabyNames)

sumBabyNames.createOrReplaceTempView("bNames")

# Rank names within each year and keep the most popular one
popNames = spark.sql("SELECT year, first_name, sum_name FROM (SELECT *, DENSE_RANK() OVER (PARTITION BY year ORDER BY sum_name DESC) AS rank FROM bNames) WHERE rank = 1 ORDER BY year")
display(popNames)
# Python code
from pyspark.sql.window import Window
from pyspark.sql.functions import col, dense_rank, desc

# Total count per (year, first_name)
nameDf = spark.sql("select year, first_name, sum(count) as scount from babyNames group by year, first_name")

# Keep the most popular name per year
popularName = nameDf.withColumn("rank", dense_rank().over(Window.partitionBy("year").orderBy(desc("scount")))) \
                    .where(col("rank") == 1) \
                    .select("year", "first_name", "scount") \
                    .orderBy("year")
popularName.show()
Written Answer
Please provide your brief, written description of your code here.
Baby Names Question 3 - Performance [10 Points]
Are there any performance considerations when choosing a language API
(SQL vs Python vs Scala) in the context of Spark?
Are there any performance considerations when using different data
representations (RDD, Dataframe) in Spark? Please explain, and provide
references if possible. No code answer is required.
visitors_path = "/interview-datasets/sa/births/births-with-visitor-data.json"

## Hint: the code below will read in the downloaded JSON files. However, the xml column needs to be given structure. Consider using a UDF.
# Read the births-with-visitor-data.json file into a dataframe and parse the nested XML fields into columns and print the total record count
df = spark.read.option("inferSchema", True).json(visitors_path)

import xml.etree.ElementTree as ET
import pyspark.sql.functions as F

# UDF: parse the visitors XML string into an array of (id, age, sex) structs
@F.udf('array<struct<id:string, age:string, sex:string>>')
def parse_visitors(s):
    root = ET.fromstring(s)
    return list(map(lambda x: x.attrib, root.findall('visitor')))

# Explode to one row per visitor and register a temp view
df.select("year", "county", "first_name", F.explode(parse_visitors('visitors')).alias('visitors')) \
  .select('year', 'county', 'first_name', 'visitors.*') \
  .createOrReplaceTempView("babyVisitors")

display(spark.sql("select count(*) from babyVisitors"))
bVisitors = spark.sql("select * from babyVisitors")
bVisitors.show(1)
bVisitors.printSchema()
## Hint: check for inconsistently capitalized field values. It will make your answer incorrect.
# Find the county with the highest average number of visitors across all births in that county
# Count visitors per birth record from the parsed XML, normalize county capitalization, then average per county
birthVisits = df.select(F.upper("county").alias("county"), F.size(parse_visitors("visitors")).alias("numVisitors"))
highAvgVisitors = birthVisits.groupBy("county").agg(F.avg("numVisitors").alias("avgVis")) \
                             .orderBy(F.desc("avgVis")).limit(1)
display(highAvgVisitors)
## Hint: check for inconsistently capitalized field values. It will make your answer incorrect.
# Find the average visitor age for a birth in the county of KINGS
# Normalize the county capitalization before filtering so all KINGS rows are included
bVisitors.filter(F.upper(col("county")) == 'KINGS') \
         .groupBy(F.upper("county").alias("county")) \
         .agg(F.avg("age").alias("avgAge")).show()
## Hint: check for inconsistently capitalized field values. It will make your answer incorrect.
# Find the most common birth visitor age in the county of KINGS

# When ignoring the inconsistently capitalized county field
mostComAge = spark.sql("SELECT county, age, COUNT(*) AS comAge FROM babyVisitors WHERE county = 'KINGS' GROUP BY county, age ORDER BY comAge DESC LIMIT 1")
display(mostComAge)

# When fixing the inconsistently capitalized county field by converting to UPPER case as the standard
mostComAge = spark.sql("SELECT UPPER(county) AS county, age, COUNT(*) AS comAge FROM babyVisitors WHERE UPPER(county) = 'KINGS' GROUP BY UPPER(county), age ORDER BY comAge DESC LIMIT 1")
display(mostComAge)
#4 - Written Answer
For this baby visitor dataset I apply the following logic in Spark SQL to find the most common visitor age in the county "KINGS":
a. Count the occurrences of each age, grouping by county, and apply the UPPER function so that inconsistently capitalized county values are not split across groups.
b. Order the result set in descending order of the count and pick the first row.
c. Display the final result from the dataframe.