Azure Data Engineer End to End Project 1
In this data engineering project, we will learn in-demand tools and technologies such as Azure Data Factory, Azure Databricks and Azure Synapse Analytics, along with managed identities, API connections and more.
Project Data Architecture:
Source – We are going to use an HTTP connection as our source. We could have simply uploaded CSV files into the data lake manually, but for a more realistic scenario we will pull the data from an API.
For this scenario it will be a GitHub account, and we will pull the data directly from that API.
The architecture we are following is the medallion architecture commonly used in data engineering solutions.
We make our data travel through 3 different layers: Bronze, Silver, Gold.
Bronze layer – where we keep the data exactly as it is available in the source.
Silver layer – where we keep the transformed data, transformed by Azure Databricks.
Gold layer – after all the transformations and cleaning, we serve the data to stakeholders, data analysts and data scientists.
Overview of Source Data
We are going to work with the AdventureWorks data:
Calendar table – just dates
Customer table – all info related to customers
Product Category table – info about product categories
Product Subcategory table – info about product subcategories
Products table – info about products
Returns table – returns data
Sales tables – the past 3 years of sales data (2015–2017)
Territories table – territories data
The past 3 years of sales data and the returns data are our fact tables, and we can build dimensions from the other tables: Calendar, Customer, Products, Product Category, Product Subcategory, Territories.
Why do we need dimensions? Because if we want to perform aggregations and provide contextual information for our fact table, we need dimensions. We will build a model that keeps the sales data (and, we can say, the returns data) at the centre, with the lookup tables as our dimensions, as the small sketch below illustrates.
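To make the idea concrete, here is a small hypothetical PySpark sketch of how a dimension adds context to the fact table (df_sales and df_pro are the dataframe names used later in the Databricks notebooks; the join key ProductKey is an assumption based on the AdventureWorks schema):
# hypothetical sketch: join the sales fact table to the product dimension
# to add product context, then aggregate by a dimension attribute
df_enriched = df_sales.join(df_pro, on='ProductKey', how='left')
df_enriched.groupBy('ProductName').count().display()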
Create a new resource group
Tags in the resource group – used to categorise the resource group
Add a storage account (data lake) resource
For Primary service, we should select Azure Blob Storage or Azure Data Lake Storage Gen2
Redundancy – choose based on the project
Create Azure Data Factory
Create Bronze, Silver and Gold containers in the storage account
Now we have to load our source data into Bronze.
Create a new pipeline in ADF and name the pipeline.
Add a Copy activity to that pipeline and name the copy activity.
We have to create a linked service and a dataset.
First we will do this for the Products file.
URL for the Products file:
https://raw.githubusercontent.com/anshlambaoldgit/Adventure-Works-Data-Engineering-Project/refs/heads/main/Data/AdventureWorks_Products.csv
This URL has 2 parts: one is the base URL and the other is the relative URL.
base url - https://raw.githubusercontent.com/
relative url - /anshlambaoldgit/Adventure-Works-Data-Engineering-Project/refs/heads/main/Data/AdventureWorks_Products.csv
Create a linked service for HTTP and provide the base URL while creating it.
Select the authentication type as Anonymous.
Test the connection.
Create another linked service for the Data Lake.
Select the storage account while creating the linked service.
Now go to the Copy activity. In the source, create a dataset for HTTP and select the file format.
Name the dataset and select the already created HTTP linked service.
Provide the relative URL.
Now, in the sink, create a dataset for the data lake and select the file format.
Name the dataset and select the already created data lake linked service.
Run the copy activity.
To build a dynamic pipeline
We just copied only the Products data – but that is not recommended; we should copy all the files using a dynamic pipeline.
Instead of passing information statically, we should pass it dynamically.
What information should we pass dynamically?
1. Relative URL (because we have different relative URL for different files)
2. Folder – in which we will save our data
3. File itself
The above 3 values will change in the Copy activity for every file, so instead of hardcoding them we will create 3 parameters and keep changing their values in every iteration.
In ADF, we have an activity called the ForEach activity to achieve this.
Create new pipeline
Add copy activity, name the copy activity
In the source, create one new common HTTP dataset for all the source files and select the already created HTTP linked service. To set a dynamic relative URL, open the dataset in the advanced view.
Click on 'Add dynamic content' below the Relative URL text box, add a parameter and select that expression.
Go to the sink, create one new common dataset for all the files to be placed in Bronze and select the already created data lake linked service. To set a dynamic folder and file name, open the dataset in the advanced view.
Click on 'Add dynamic content' below the folder and file fields, add parameters and select those expressions (examples below).
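For reference, the dynamic content in the two datasets is simply a reference to the dataset parameters (a sketch, assuming the parameters are named p_relative_url, p_sink_folder and p_file_name, as they are filled in later from the ForEach item):
Relative URL - @dataset().p_relative_url
Folder - @dataset().p_sink_folder
File - @dataset().p_file_name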
Add a ForEach activity and tick 'Sequential' in its settings.
We need to create an array in which we pass the parameters in the form of dictionaries: the 1st element is the 1st dictionary, the 2nd element is the 2nd dictionary, and so on. How can we do this? We simply create a JSON file.
[
    {
        "p_rel_url": "anshlamba03/Adventure-Works-Data-Engineering-Project/main/Data/AdventureWorks_Product_Categories.csv",
        "p_sink_folder": "AdventureWorks_Product_Categories",
        "p_sink_file": "AdventureWorks_Product_Categories.csv"
    },
    {
        "p_rel_url": "anshlamba03/Adventure-Works-Data-Engineering-Project/main/Data/AdventureWorks_Calendar.csv",
        "p_sink_folder": "AdventureWorks_Calendar",
        "p_sink_file": "AdventureWorks_Calendar.csv"
    },
    {
        "p_rel_url": "anshlamba03/Adventure-Works-Data-Engineering-Project/main/Data/AdventureWorks_Customers.csv",
        "p_sink_folder": "AdventureWorks_Customers",
        "p_sink_file": "AdventureWorks_Customers.csv"
    },
    {
        "p_rel_url": "anshlamba03/Adventure-Works-Data-Engineering-Project/main/Data/AdventureWorks_Product_Subcategories.csv",
        "p_sink_folder": "Product_Subcategories",
        "p_sink_file": "AdventureWorks_Product_Subcategories.csv"
    },
    {
        "p_rel_url": "anshlamba03/Adventure-Works-Data-Engineering-Project/main/Data/AdventureWorks_Products.csv",
        "p_sink_folder": "AdventureWorks_Products",
        "p_sink_file": "AdventureWorks_Products.csv"
    },
    {
        "p_rel_url": "anshlamba03/Adventure-Works-Data-Engineering-Project/main/Data/AdventureWorks_Returns.csv",
        "p_sink_folder": "AdventureWorks_Returns",
        "p_sink_file": "AdventureWorks_Returns.csv"
    },
    {
        "p_rel_url": "anshlamba03/Adventure-Works-Data-Engineering-Project/main/Data/AdventureWorks_Sales_2015.csv",
        "p_sink_folder": "AdventureWorks_Sales_2015",
        "p_sink_file": "AdventureWorks_Sales_2015.csv"
    },
    {
        "p_rel_url": "anshlamba03/Adventure-Works-Data-Engineering-Project/main/Data/AdventureWorks_Sales_2016.csv",
        "p_sink_folder": "AdventureWorks_Sales_2016",
        "p_sink_file": "AdventureWorks_Sales_2016.csv"
    },
    {
        "p_rel_url": "anshlamba03/Adventure-Works-Data-Engineering-Project/main/Data/AdventureWorks_Sales_2017.csv",
        "p_sink_folder": "AdventureWorks_Sales_2017",
        "p_sink_file": "AdventureWorks_Sales_2017.csv"
    },
    {
        "p_rel_url": "anshlamba03/Adventure-Works-Data-Engineering-Project/main/Data/AdventureWorks_Territories.csv",
        "p_sink_folder": "AdventureWorks_Territories",
        "p_sink_file": "AdventureWorks_Territories.csv"
    }
]
We store this JSON in a 'parameter' folder in the data lake (create the parameter folder first).
Add a Lookup activity. What is this activity for? If we want to use the output of a file, or just have a look at it, we use the Lookup activity – it simply gives us the contents of the file as its output.
We are going to get the output of the JSON file using the Lookup activity.
Name the Lookup activity and add a dataset for it – select the data lake, the JSON file format, the existing linked service and the file path.
In the Settings tab there is a 'First row only' checkbox – we should uncheck it, otherwise it will read only the first element.
Now connect the Lookup to the ForEach.
In the ForEach settings, set Items with the dynamic content @activity('LookupGit').output.value
Cut the Copy activity and paste it inside the ForEach.
Now fill in the dynamic parameter values:
p_relative_url - @item().p_rel_url
p_sink_folder - @item().p_sink_folder
p_file_name - @item().p_sink_file
Click on Debug to run the pipeline.
************** Now all the source files are loaded into Bronze **************
Now we are going to use Databricks.
Create a Databricks workspace.
Launch Databricks.
Create a cluster under Compute.
Now we should create a service principal. Why a service principal, and how do we create one? Refer to the Databricks notes.
1. Create a service principal application in Microsoft Entra ID.
2. Create a secret for that service principal.
3. Assign the Storage Blob Data Contributor role to the created service principal app.
Create a folder in the workspace and create a notebook within that folder.
Connect the notebook to the cluster.
Access the data lake using Databricks - refer to the Databricks notes.
spark.conf.set("fs.azure.account.auth.type.spawprojectdatalake.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.spawprojectdatalake.dfs.core.windows.net",
"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.spawprojectdatalake.dfs.core.windows.net",
"77231dd1-8890-43d3-91b6-8a12bd892ad5")
spark.conf.set("fs.azure.account.oauth2.client.secret.spawprojectdatalake.dfs.core.windows.net", "Ur-
8Q~X2LAbKdnTsaFTVdb_yqddjcReF57YuIbbB")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.spawprojectdatalake.dfs.core.windows.net",
"https://login.microsoftonline.com/4174a24f-4f27-4112-a180-0e4923932a83/oauth2/token")
Read Calendar data
df_cal = spark.read.format('csv')\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load('abfss://[email protected]/AdventureWorks_Calendar')
To display
df_cal.display()
Load all data like this
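A sketch of the remaining reads, following the same pattern (the folder names are the sink folders from the parameter JSON; df_cus, df_procat, df_pro and df_sales are the dataframe names used later, while df_subcat, df_ret and df_ter are names assumed here; Sales is loaded separately below using a wildcard):
# a sketch: read the remaining bronze folders the same way
def read_bronze(folder):
    return spark.read.format('csv')\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load(f'abfss://[email protected]/{folder}')

df_cus = read_bronze('AdventureWorks_Customers')
df_procat = read_bronze('AdventureWorks_Product_Categories')
df_subcat = read_bronze('Product_Subcategories')
df_pro = read_bronze('AdventureWorks_Products')
df_ret = read_bronze('AdventureWorks_Returns')
df_ter = read_bronze('AdventureWorks_Territories')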
For Sales we have 3 years of data; instead of loading them one by one we can use the wildcard AdventureWorks_Sales* in the load path, so that all 3 years of data are combined and loaded together, as in the sketch below.
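A minimal sketch of that wildcard load, following the same read pattern as above:
# read all three sales files (2015-2017) at once with a wildcard on the folder name
df_sales = spark.read.format('csv')\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load('abfss://[email protected]/AdventureWorks_Sales*')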
First we transform the Calendar (date) data.
2015-01-01
2015-01-02
2015-01-03
2015-01-04
Requirement:
The file contains dates like the above; we want to create Month and Year columns.
# functions used in the transformations below
from pyspark.sql.functions import col, month, year, concat, lit, concat_ws, split, to_timestamp, regexp_replace, count

df_cal = df_cal.withColumn('Month', month(col('Date')))\
    .withColumn('Year', year(col('Date')))
df_cal.display()
After transformation:
2015-01-01 1 2015
2015-01-02 1 2015
To save this to Silver:
df_cal.write.format('parquet')\
    .mode('append')\
    .option("path","abfss://[email protected]/AdventureWorks_Calendar")\
    .save()
Next, the Customer table transformation.
Requirement:
There are Prefix, FirstName and LastName columns; we are going to create a full name column.
There are two ways to do that:
df_cus = df_cus.withColumn("Fullname",concat(col('Prefix'),lit(' '),col('FirstName'),lit('
'),col('LastName')))
Advanced way
df_cus = df_cus.withColumn('fullname',concat_ws(' ',col('Prefix'),col('FirstName'),col('LastName')))
Write the Customers data to Silver:
df_cus.write.format('parquet')\
    .mode('append')\
    .option("path","abfss://[email protected]/AdventureWorks_Customers")\
    .save()
Write the Subcategories table to Silver without any transformations.
Next we have to transform the Products file.
Requirement:
We should transform the ProductSKU column in place instead of adding a new column, e.g. if ProductSKU is HL-U509-R it should become HL.
We should transform the ProductName column in place as well, e.g. if the product name is Mountain Bike it should become Mountain.
df_pro = df_pro.withColumn('ProductSKU',split(col('ProductSKU'),'-')[0])\
.withColumn('ProductName',split(col('ProductName'),' ')[0])
Write to silver
df_pro.write.format('parquet')\
.mode('append')\
.option("path","abfss://[email protected]/AdventureWorks_Products")\
.save()
Next, the Returns file.
No need to transform anything in the Returns file; we can write it directly to Silver.
Next, the Territories file.
No need to transform anything in the Territories file either; we can write it directly to Silver, as in the sketch below.
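A sketch of those two writes, mirroring the earlier ones (df_ret and df_ter are the dataframe names assumed in the read sketch above):
# write Returns and Territories to Silver as-is, with no transformations
df_ret.write.format('parquet')\
    .mode('append')\
    .option("path","abfss://[email protected]/AdventureWorks_Returns")\
    .save()

df_ter.write.format('parquet')\
    .mode('append')\
    .option("path","abfss://[email protected]/AdventureWorks_Territories")\
    .save()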
Next, the Sales file.
Requirement:
1. The StockDate column is in date format – we are going to change it to a timestamp format.
2. In the OrderNumber column, we are going to replace 'S' with 'T', e.g. SO61285 -> TO61285.
3. We are going to multiply the OrderLineItem column by the OrderQuantity column.
df_sales = df_sales.withColumn('StockDate',to_timestamp('StockDate'))
df_sales = df_sales.withColumn('OrderNumber',regexp_replace(col('OrderNumber'),'S','T'))
df_sales = df_sales.withColumn('multiply',col('OrderLineItem')*col('OrderQuantity'))
Sales analysis
We are going to perform a group aggregation: how many orders did we receive each day?
df_sales.groupBy('OrderDate').agg(count('OrderNumber').alias('total_order')).display()
Write Sales to silver
df_sales.write.format('parquet')\
.mode('append')\
.option("path","abfss://[email protected]/AdventureWorks_Sales")\
.save()
If we want to create a visualization, click on + in the result pane and then click on Visualization.
For example, to visualize the Product Category data:
df_procat.display()
By clicking Save we can save the visualization in the notebook.
We have now transformed all the data and written it to the Silver layer.
Last phase of the project
Synapse Analytics --- for serving the data
Pick the Synapse Analytics service from the marketplace.
Name the Synapse Analytics workspace.
We need to provide a managed resource group, just like for Databricks.
We need to provide a workspace name.
We need a storage account for Synapse (we could select our existing storage account, but instead we use the 'create new' option, since we are loading our own files into our existing storage account), along with a file system --- this becomes the default storage.
Click Next --- we need to provide the SQL admin username and password.
Next --- Next --- Review + create.
The three main components of Synapse Analytics:
1. ADF-style pipelines - for data integration
2. Spark pools - for PySpark coding
3. Data warehousing - for SQL databases and lake databases
For this project we are going to use the data warehousing part.
We need to access the data in the data lake from Synapse Analytics; for that we need to assign a role – refer to the Synapse notes.
Go to the Develop tab.
Click on + and select SQL script.
Create a database:
Click on the Data tab in the left pane, click on +, then click on SQL database.
In Synapse Analytics we get 2 options – serverless and dedicated.
Go with serverless and name the database.
Now go back to the SQL script; in the 'Use database' dropdown we can see the created database.
Now we have to read the data stored in the Silver layer.
How do we do that? We have a powerful function called OPENROWSET() – it lets us apply an abstraction layer over the data residing in the data lake.
We should also assign a role to ourselves to access the data:
1. Go to azure portal
2. Go to data lake
3. Go to Access control (IAM)
4. Click on Add -- Add role assignment
5. Search storage blob data contributor role and select
6. select member
7. select our mail id
8. Review + assign
Go back to the Synapse Analytics SQL script.
Reading the Calendar data:
SELECT * FROM
OPENROWSET(
BULK 'https://spawprojectdatalake.dfs.core.windows.net/silver/AdventureWorks_Calendar/',
FORMAT = 'PARQUET'
) as query1
How do we know that URL? --- go to the data lake → silver → Calendar folder → open a file and copy its URL:
https://spawprojectdatalake.blob.core.windows.net/silver/AdventureWorks_Calendar/part-00000-tid-7567393523100591135-1c3877cd-66a1-4927-bc38-e9d0f5443d78-44-1-c000.snappy.parquet
We only need to mention the URL up to AdventureWorks_Calendar, since we use a separate folder for each table.
In the URL we should also change 'blob' to 'dfs'.
We will create views on top of this query; those views we will keep in the gold schema (our Gold layer).
First I will create the schema:
CREATE SCHEMA gold;
Create views
-- Create View Calendar
-----------------------------
CREATE VIEW gold.calendar
AS
SELECT * FROM
OPENROWSET
(
BULK 'https://spawprojectdatalake.dfs.core.windows.net/silver/AdventureWorks_Calendar/',
FORMAT = 'PARQUET'
) as QUERY1
------------------------------------------------------------------------------------------------
-- Create View Customers
-----------------------------
CREATE VIEW gold.customers
AS
SELECT * FROM
OPENROWSET
(
BULK 'https://spawprojectdatalake.dfs.core.windows.net/silver/AdventureWorks_Customers/',
FORMAT = 'PARQUET'
) as QUERY1
------------------------------------------------------------------------------------------------
-- Create View Products
-----------------------------
CREATE VIEW gold.products
AS
SELECT * FROM
OPENROWSET
(
BULK 'https://spawprojectdatalake.dfs.core.windows.net/silver/AdventureWorks_Products/',
FORMAT = 'PARQUET'
) as QUERY1
------------------------------------------------------------------------------------------------
-- Create View Returns
-----------------------------
CREATE VIEW gold.returns
AS
SELECT * FROM
OPENROWSET
(
BULK 'https://spawprojectdatalake.dfs.core.windows.net/silver/AdventureWorks_Returns/',
FORMAT = 'PARQUET'
) as QUERY1
------------------------------------------------------------------------------------------------
-- Create View Sales
-----------------------------
CREATE VIEW gold.sales
AS
SELECT * FROM
OPENROWSET
(
BULK 'https://spawprojectdatalake.dfs.core.windows.net/silver/AdventureWorks_Sales/',
FORMAT = 'PARQUET'
) as QUERY1
------------------------------------------------------------------------------------------------
-- Create View Subcategories
-----------------------------
CREATE VIEW gold.subcat
AS
SELECT * FROM
OPENROWSET
(
BULK 'https://spawprojectdatalake.dfs.core.windows.net/silver/AdventureWorks_SubCategories/',
FORMAT = 'PARQUET'
) as QUERY1
------------------------------------------------------------------------------------------------
-- Create View Territories
-----------------------------
CREATE VIEW gold.territories
AS
SELECT * FROM
OPENROWSET
(
BULK 'https://spawprojectdatalake.dfs.core.windows.net/silver/AdventureWorks_Territories/',
FORMAT = 'PARQUET'
) as QUERY1
Now create a new SQL script and check, for example: SELECT * FROM gold.subcat
We are able to access the data.
Now we need to create external tables.
To create an external table in Synapse Analytics, we should follow 3 steps:
1. Credential ---- we will access the data using the managed identity.
2. External data source ---- instead of using the full URL again and again, we can put the URL up to the container level (up to silver or gold) in the data source and the rest of the path in the table LOCATION.
3. External file format ---- we define in which format the files are saved.
Create a new script.
First we should create a master key for that database:
CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'Surya1512'
Create Credential
CREATE DATABASE SCOPED CREDENTIAL cred_surya
WITH
IDENTITY = 'Managed Identity'
Create External data source
CREATE EXTERNAL DATA SOURCE source_silver
WITH
(
LOCATION = 'https://spawprojectdatalake.dfs.core.windows.net/silver',
CREDENTIAL = cred_surya
)
For gold as well
CREATE EXTERNAL DATA SOURCE source_gold
WITH
(
LOCATION = 'https://spawprojectdatalake.dfs.core.windows.net/gold',
CREDENTIAL = cred_surya
)
Create the external file format
CREATE EXTERNAL FILE FORMAT format_parquet
WITH
(
FORMAT_TYPE = PARQUET,
DATA_COMPRESSION = 'org.apache.hadoop.io.compress.SnappyCodec'
)
Now we are good to create an external table using CETAS – CREATE EXTERNAL TABLE AS SELECT.
CREATE EXTERNAL TABLE gold.extsales
WITH
(
LOCATION = 'extsales',
DATA_SOURCE = source_gold,
FILE_FORMAT = format_parquet
)
AS
SELECT * FROM gold.sales
Now we can get the data by querying the external table: SELECT * FROM gold.extsales
We could already get the data through views, so why create an external table?
A view has no physical data behind it, whereas the external table stores the data physically – we can refer to it in future.
Now we can see the resulting files in the gold container.