
Financial Data Lakehouse on Azure with

Medallion Architecture
Objective:

To build a scalable, maintainable, and analytics-ready financial data platform on Azure Data Lake Storage Gen2 with Apache Spark (PySpark), using a Medallion architecture (Bronze → Silver → Gold) and including support for SCD Type 2, data quality validation, and dimensional modeling.

Architecture

Pipeline

1. Bronze Layer – Raw Ingestion

Purpose: Store raw, unprocessed CSV files from source systems.

Sources Ingested:

• raw_customer_data_consistent.csv
• raw_stock_data.csv
• raw_transaction_data_consistent.csv

Raw to Bronze source and sink screenshots:


Sink dataset:

Files in the bronze folder (ADLS Gen2):
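
After the copy activity runs, a quick sanity check can confirm the raw CSVs landed in the bronze container and report their sizes. The sketch below is illustrative and not part of the original pipeline; the bronze container and the policyadlsgen2 storage-account name are inferred from the Silver-layer code later in this document.

# Sanity check (illustrative): confirm the raw CSVs landed in the bronze container
# Paths are inferred from the Silver-layer code; adjust to your storage account.
bronze_path = "abfss://bronze@policyadlsgen2.dfs.core.windows.net/"

for file_name in ["raw_customer_data_consistent.csv",
                  "raw_stock_data.csv",
                  "raw_transaction_data_consistent.csv"]:
    df_check = spark.read.format("csv") \
        .option("header", "true") \
        .load(bronze_path + file_name)
    print(file_name, "->", df_check.count(), "rows,", len(df_check.columns), "columns")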


2. Silver Layer – Data Cleaning & Standardization

Purpose: Apply data cleansing, transformation, and standard formatting for analytics
readiness.

Transformations Done:

Customer Data:

• Remove duplicates based on Customer_ID.


• Trim and clean Name, Email, and Phone.
• Normalize date format for DOB.
• Lowercase and title-case where appropriate.
• Validate presence of critical fields.

Transaction Data:

• Filter transactions with valid Quantity and Price values.


• Parse Transaction_Date.
• Fill missing values and remove nulls in critical fields.
• Deduplicate on Transaction_ID.

Stock Data:

• Deduplicate on Date + Stock_Symbol.


• Normalize Date field format.

Output Format:

• Saved as Parquet files to ADLS Gen2 silver container.


Bronze to Silver transformation in PySpark

# import libraries
from pyspark.sql.functions import *
from pyspark.sql.window import *
from delta.tables import *

# Read raw CSV files from the bronze container in ADLS Gen2
df_br_cust = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("abfss://bronze@policyadlsgen2.dfs.core.windows.net/raw_customer_data_consistent.csv")

df_br_stock = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("abfss://bronze@policyadlsgen2.dfs.core.windows.net/raw_stock_data.csv")

df_br_trans = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("abfss://bronze@policyadlsgen2.dfs.core.windows.net/raw_transaction_data_consistent.csv")

# Transactions clean-up: parse dates, keep valid quantities/prices,
# fill or drop critical nulls, and deduplicate on Transaction_ID
df_tran_clean = df_br_trans \
    .withColumn("Transaction_Date", to_date(col("Transaction_Date"), "yyyy-MM-dd")) \
    .filter(col("Quantity") > 0) \
    .filter(col("Price") > 0) \
    .fillna("Unknown", subset=["Customer_ID", "Product"]) \
    .dropna(subset=["Transaction_Type"]) \
    .dropDuplicates(["Transaction_ID"])

# Stocks clean-up: normalize the Date format and deduplicate on Date + Stock_Symbol
df_stock_clean = df_br_stock \
    .withColumn("Date", to_date(col("Date"), "yyyy-MM-dd")) \
    .dropDuplicates(["Date", "Stock_Symbol"])

# Customer clean-up: deduplicate, trim/normalize fields, parse DOB, drop critical nulls
df_cust_clean = df_br_cust.dropDuplicates(["Customer_ID"]) \
    .withColumn("Name", trim("Name")) \
    .withColumn("Email", lower(trim("Email"))) \
    .withColumn("Phone", trim("Phone")) \
    .withColumn("DOB", to_date("DOB", "yyyy-MM-dd")) \
    .filter(col("DOB").isNotNull()) \
    .dropna(subset=["Customer_ID", "Name", "DOB", "Email"]) \
    .withColumn("Name", initcap("Name"))

# Write the cleaned DataFrames to the ADLS Gen2 silver container as Parquet
df_tran_clean.write.mode("overwrite").format("parquet").save("abfss://silver@policyadlsgen2.dfs.core.windows.net/transactions")
df_stock_clean.write.mode("overwrite").format("parquet").save("abfss://silver@policyadlsgen2.dfs.core.windows.net/stocks")
df_cust_clean.write.mode("overwrite").format("parquet").save("abfss://silver@policyadlsgen2.dfs.core.windows.net/customers")
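
The objective also calls for data quality validation. The sketch below is an illustrative post-write check, not part of the original pipeline: it re-reads the Silver outputs and reports null counts in critical fields and duplicate business keys. The chosen key and critical columns are assumptions based on the cleaning rules above.

# Illustrative quality-validation sketch (not part of the original pipeline):
# re-read Silver outputs and report nulls in critical fields and duplicate keys
silver_base = "abfss://silver@policyadlsgen2.dfs.core.windows.net/"

checks = {
    "customers": ("Customer_ID", ["Customer_ID", "Name", "DOB", "Email"]),
    "transactions": ("Transaction_ID", ["Transaction_ID", "Transaction_Date", "Quantity", "Price"]),
}

for table, (key_col, critical_cols) in checks.items():
    df_q = spark.read.parquet(silver_base + table)
    dupes = df_q.count() - df_q.dropDuplicates([key_col]).count()
    nulls = {c: df_q.filter(col(c).isNull()).count() for c in critical_cols}
    print(f"{table}: duplicate {key_col}s = {dupes}, null counts = {nulls}")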

Files from Silver layer - ADLS Gen2

Customer file in Silver layer:

Stocks file in Silver layer:

Transaction file in Silver layer:


3. Gold Layer – Dimensional Modeling & Business Logic

Purpose: Create analytics-ready dimensional models with Slowly Changing Dimensions (SCD)
support.

Dimensions:

Customer Dimension (dim_customer):

• Implements SCD Type 2 using Delta Lake.


• Uses SHA-256 record_hash to detect changes.
• Tracks is_current, start_date, end_date.

Stock Dimension (dim_stock):

• Tracks only the latest available metadata per stock (Latest_Metadata_Date) using a window function.

Fact Table:

Transaction Fact (fact_transaction):

• Enriched with customer and stock dimension lookups.
• Adds derived columns Year, Month, and Day from Transaction_Date.
• Supports time-based analytics and joins with the dimensions.

Output Format:

• Saved in Delta Lake format to the ADLS Gen2 gold container.

Silver to Gold transformations:

# Read cleaned files from the silver layer
silver_path = "abfss://silver@policyadlsgen2.dfs.core.windows.net/"

df_g_cust = spark.read.format("parquet").load(f"{silver_path}customers")
df_g_transactions = spark.read.format("parquet").load(f"{silver_path}transactions")
df_g_stocks = spark.read.format("parquet").load(f"{silver_path}stocks")

# Add SCD Type 2 metadata columns to the incoming customer data
df_cust_transformed = df_g_cust \
    .withColumn("record_hash", sha2(concat_ws("||", *df_g_cust.columns), 256)) \
    .withColumn("is_current", lit(True)) \
    .withColumn("start_date", current_timestamp()) \
    .withColumn("end_date", lit(None).cast("timestamp"))

# Define target table path for Gold dim_customer
gold_cust_path = "abfss://gold@policyadlsgen2.dfs.core.windows.net/dim_customer/"

# Check if the Gold table already exists
if DeltaTable.isDeltaTable(spark, gold_cust_path):
    delta_gold = DeltaTable.forPath(spark, gold_cust_path)
    df_existing = delta_gold.toDF().filter("is_current = True")

    # Join on the business key (Customer_ID) and compare hashes to find changed records
    join_cond = [df_existing["Customer_ID"] == df_cust_transformed["Customer_ID"]]

    df_changes = df_existing.join(df_cust_transformed, join_cond, "inner") \
        .filter(df_existing["record_hash"] != df_cust_transformed["record_hash"]) \
        .drop(df_existing["Customer_ID"],
              df_existing["Name"],
              df_existing["DOB"],
              df_existing["Email"],
              df_existing["Phone"],
              df_existing["record_hash"],
              df_existing["is_current"],
              df_existing["start_date"],
              df_existing["end_date"])

    if df_changes.count() > 0:
        # Expire the old versions of changed records
        delta_gold.alias("tgt").merge(
            df_changes.alias("src"),
            "tgt.Customer_ID = src.Customer_ID AND tgt.is_current = true"
        ).whenMatchedUpdate(set={
            "is_current": lit(False),
            "end_date": current_timestamp()
        }).execute()

    # Insert brand-new customers plus the new versions of changed records
    df_cust_transformed.alias("new_data") \
        .join(df_existing.select("Customer_ID"), "Customer_ID", "left_anti") \
        .unionByName(df_changes) \
        .write.format("delta").mode("append").save(gold_cust_path)

else:
    # First-time load
    df_cust_transformed.write.format("delta").mode("overwrite").save(gold_cust_path)

# 1. Get the latest metadata per Stock_Symbol based on Date
window_spec = Window.partitionBy("Stock_Symbol").orderBy(col("Date").desc())

df_dim_stock = df_g_stocks.withColumn("row_num", row_number().over(window_spec)) \
    .filter("row_num = 1") \
    .select("Stock_Symbol", "Date") \
    .withColumnRenamed("Date", "Latest_Metadata_Date")

# 2. Save to the Gold layer
df_dim_stock.write.format("delta").mode("overwrite") \
    .save("abfss://gold@policyadlsgen2.dfs.core.windows.net/dim_stock/")

# Read Gold-layer customer and stock dimensions (latest SCD state)
df_dim_customer = spark.read.format("delta") \
    .load("abfss://gold@policyadlsgen2.dfs.core.windows.net/dim_customer/") \
    .filter("is_current = true")

df_dim_stock = spark.read.format("delta") \
    .load("abfss://gold@policyadlsgen2.dfs.core.windows.net/dim_stock/")

# Join transactions with the dimension tables using business keys
# and derive Year, Month, and Day from Transaction_Date
df_fact = df_g_transactions \
    .join(df_dim_customer.select("Customer_ID"), on="Customer_ID", how="inner") \
    .join(df_dim_stock.select("Stock_Symbol"), on="Stock_Symbol", how="inner") \
    .withColumn("Year", year("Transaction_Date")) \
    .withColumn("Month", month("Transaction_Date")) \
    .withColumn("Day", dayofmonth("Transaction_Date"))

# Select the fact table schema
df_fact_selected = df_fact.select(
    "Transaction_ID",
    "Customer_ID",
    "Stock_Symbol",
    "Transaction_Date",
    "Transaction_Type",
    "Quantity",
    "Price",
    "Product",
    "Year",
    "Month",
    "Day"
)

# Write the fact table to the Gold layer in Delta format
df_fact_selected.write.format("delta").mode("overwrite") \
    .save("abfss://gold@policyadlsgen2.dfs.core.windows.net/fact_transaction/")

Files in Gold Layer:

Customer file in Gold layer:

Stock file in Gold layer:

Transactions file in Gold layer:


Next day - new and updated customers

- Some existing records were updated.
- New records were added.

For the updated records, the existing version was marked is_current = False with end_date set to the processing date, and a new current version was inserted.
The brand-new records were inserted with start_date set to the ingestion date, is_current = True, and end_date left null.
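
To verify this behaviour after the second run, a small sketch like the one below can inspect the version history of a single customer in dim_customer (using the gold_cust_path defined earlier); the customer ID "C001" is purely illustrative and not a value from the dataset.

# Inspect SCD Type 2 history for one customer in the Gold dimension
# ("C001" is an illustrative Customer_ID, not from the actual data)
df_hist = spark.read.format("delta").load(gold_cust_path)

df_hist.filter(col("Customer_ID") == "C001") \
    .select("Customer_ID", "Name", "Email", "is_current", "start_date", "end_date") \
    .orderBy("start_date") \
    .show(truncate=False)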

Key Features

• Clean separation of layers using the Medallion Architecture.
• Robust SCD Type 2 handling in the Customer dimension.
• Daily incremental-ready transformations (supports idempotency and hash diffing).
• Modular and extensible ETL logic.
• Stored in Delta & Parquet formats optimized for analytical workloads (see the example query below).
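
As an illustration of the analytics the Gold layer enables, the sketch below aggregates traded value per stock and month from fact_transaction. It is not part of the original project, and the traded-value formula (Quantity × Price) is an assumption about the column semantics.

# Illustrative Gold-layer analytics query (assumes Quantity * Price = traded value)
df_fact_gold = spark.read.format("delta") \
    .load("abfss://gold@policyadlsgen2.dfs.core.windows.net/fact_transaction/")

df_monthly = df_fact_gold \
    .withColumn("Traded_Value", col("Quantity") * col("Price")) \
    .groupBy("Stock_Symbol", "Year", "Month") \
    .agg(sum("Traded_Value").alias("Total_Traded_Value"),
         count("Transaction_ID").alias("Transaction_Count")) \
    .orderBy("Stock_Symbol", "Year", "Month")

df_monthly.show()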
