Financial Data Lakehouse on Azure with
Medallion Architecture
Objective:
To build a scalable, maintainable, and analytics-ready financial data platform based on the Medallion architecture (Bronze → Silver → Gold) on Azure Data Lake Storage Gen2, using Apache Spark (PySpark), with support for SCD Type 2, data quality validation, and dimensional modeling.
Architecture
Pipeline
1. Bronze Layer – Raw Ingestion
Purpose: Store raw, unprocessed CSV files from source systems.
Sources Ingested:
• raw_customer_data_consistent.csv
• raw_stock_data.csv
• raw_transaction_data_consistent.csv
Raw to Bronze source and sink screenshots:
Sink dataset:
Files in the bronze folder (ADLS Gen2):
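For reference, a minimal PySpark sketch of the raw-to-bronze copy is shown below. In this project the copy is performed by a copy activity with the source and sink datasets shown in the screenshots; the "raw" landing container name here is an assumption, and a Spark write produces a folder of part files rather than the single CSV files the downstream Silver code reads, which is why a copy activity is the better fit for this step.
# Illustrative raw-to-bronze ingestion sketch (assumption: source CSVs land in a
# "raw" container of the same storage account; the project itself uses a copy
# activity for this step).
raw_path = "abfss://[email protected]/"
bronze_path = "abfss://[email protected]/"

for file_name in ["raw_customer_data_consistent.csv",
                  "raw_stock_data.csv",
                  "raw_transaction_data_consistent.csv"]:
    # Read the raw CSV as-is (header only, no schema inference) so the bronze
    # copy stays a faithful record of the source file.
    df_raw = spark.read.format("csv").option("header", "true").load(raw_path + file_name)
    # Write unchanged to the bronze container, one folder per source file.
    # Note: this produces a folder of part files, not a single CSV file.
    df_raw.write.mode("overwrite").format("csv").option("header", "true") \
        .save(bronze_path + file_name.replace(".csv", ""))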
2. Silver Layer – Data Cleaning & Standardization
Purpose: Apply data cleansing, transformation, and standard formatting for analytics
readiness.
Transformations Done:
Customer Data:
• Remove duplicates based on Customer_ID.
• Trim and clean Name, Email, and Phone.
• Normalize date format for DOB.
• Lowercase and title-case where appropriate.
• Validate presence of critical fields.
Transaction Data:
• Filter transactions with valid Quantity, Price.
• Parse Transaction_Date.
• Fill missing values and remove nulls in critical fields.
• Deduplicate on Transaction_ID.
Stock Data:
• Deduplicate on Date + Stock_Symbol.
• Normalize Date field format.
Output Format:
• Saved as Parquet files to ADLS Gen2 silver container.
Bronze to Silver transformation in PySpark:
# Import libraries
from pyspark.sql.functions import *
from pyspark.sql.window import *
from delta.tables import *

# Read raw CSV files from the bronze container in ADLS Gen2
df_br_cust = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("abfss://[email protected]/raw_customer_data_consistent.csv")
df_br_stock = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("abfss://[email protected]/raw_stock_data.csv")
df_br_trans = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("abfss://[email protected]/raw_transaction_data_consistent.csv")
# transactions clean up
df_tran_clean = df_br_trans \
.withColumn("Transaction_Date", to_date(col("Transaction_Date"), "yyyy-MM-dd")) \
.filter(col("Quantity") > 0) \
.filter(col("Price") > 0) \
.fillna("Unknown",subset=["Customer_ID","Product"]) \
.dropna(subset=["Transaction_Type"]) \
.dropDuplicates(["Transaction_ID"])
# Clean up stocks
df_stock_clean = df_br_stock \
    .withColumn("Date", to_date(col("Date"), "yyyy-MM-dd")) \
    .dropDuplicates(["Date", "Stock_Symbol"])
#cleaning customer data
df_cust_clean = df_br_cust.dropDuplicates(["Customer_ID"]) \
.withColumn("Name", trim("Name")) \
.withColumn("Email", lower(trim("Email"))) \
.withColumn("Phone", trim("Phone")) \
.withColumn("DOB", to_date("DOB", "yyyy-MM-dd")) \
.filter(col("DOB").isNotNull()) \
.dropna(subset=["Customer_ID", "Name", "DOB", "Email"]) \
.withColumn("Name", initcap("Name"))
# Write cleaned files to the ADLS Gen2 silver container
df_tran_clean.write.mode("overwrite").format("parquet").save("abfss://[email protected]/transactions")
df_stock_clean.write.mode("overwrite").format("parquet").save("abfss://[email protected]/stocks")
df_cust_clean.write.mode("overwrite").format("parquet").save("abfss://[email protected]/customers")
Files in the Silver layer (ADLS Gen2):
Customer file in the Silver layer:
Stocks file in the Silver layer:
Transaction file in the Silver layer:
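The objective also calls for data quality validation. Below is a minimal sketch of a post-write check on the Silver outputs; the critical-column lists mirror the clean-up rules above, but the check itself and its failure behaviour are illustrative assumptions, not part of the pipeline.
# Minimal data-quality check on the Silver outputs (illustrative sketch).
silver_path = "abfss://[email protected]/"

checks = {
    "customers":    ["Customer_ID", "Name", "DOB", "Email"],
    "transactions": ["Transaction_ID", "Customer_ID", "Quantity", "Price"],
    "stocks":       ["Date", "Stock_Symbol"],
}

for table, critical_cols in checks.items():
    df_check = spark.read.format("parquet").load(silver_path + table)
    total = df_check.count()
    # Critical columns must be fully populated after the Silver clean-up.
    for c in critical_cols:
        nulls = df_check.filter(col(c).isNull()).count()
        if nulls > 0:
            raise ValueError(f"{table}.{c}: {nulls} null rows out of {total}")
    print(f"{table}: {total} rows passed null checks on {critical_cols}")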
3. Gold Layer – Dimensional Modeling & Business Logic
Purpose: Create analytics-ready dimensional models with Slowly Changing Dimensions (SCD)
support.
Dimensions:
Customer Dimension (dim_customer):
• Implements SCD Type 2 using Delta Lake.
• Uses SHA-256 record_hash to detect changes.
• Tracks is_current, start_date, end_date.
Stock Dimension (dim_stock):
• Tracks only the latest available metadata (Latest_Metadata_Date) using a window
function.
Fact Table:
• Transaction Fact (fact_transaction):
• Enriched with customer and stock dimension lookups.
• Adds derived columns: Year, Month, Day from Transaction_Date.
• Supports time-based analytics and joins with dimensions.
Output Format:
• Saved as Delta Lake format to ADLS Gen2 gold container.
Silver to Gold transformations:
# Read files from the silver container
silver_path = "abfss://[email protected]/"
df_g_cust = spark.read.format("parquet").load(f"{silver_path}customers")
df_g_transactions = spark.read.format("parquet").load(f"{silver_path}transactions")
df_g_stocks = spark.read.format("parquet").load(f"{silver_path}stocks")

# Add SCD Type 2 metadata columns to incoming data
df_cust_transformed = df_g_cust.withColumn("record_hash", sha2(concat_ws("||", *df_g_cust.columns), 256)) \
    .withColumn("is_current", lit(True)) \
    .withColumn("start_date", current_timestamp()) \
    .withColumn("end_date", lit(None).cast("timestamp"))
# Define target table path for Gold dim_customer
gold_cust_path = "abfss://[email protected]/dim_customer/"

# Check if the Gold table already exists
if DeltaTable.isDeltaTable(spark, gold_cust_path):
    delta_gold = DeltaTable.forPath(spark, gold_cust_path)
    df_existing = delta_gold.toDF().filter("is_current = true")

    # Join on the business key (Customer_ID) and keep rows whose hash changed
    join_cond = [df_existing["Customer_ID"] == df_cust_transformed["Customer_ID"]]
    df_changes = df_existing.join(df_cust_transformed, join_cond, "inner") \
        .filter(df_existing["record_hash"] != df_cust_transformed["record_hash"]) \
        .drop(df_existing["Customer_ID"],
              df_existing["Name"],
              df_existing["DOB"],
              df_existing["Email"],
              df_existing["Phone"],
              df_existing["record_hash"],
              df_existing["is_current"],
              df_existing["start_date"],
              df_existing["end_date"])

    if df_changes.count() > 0:
        # Expire old records
        delta_gold.alias("tgt").merge(
            df_changes.alias("src"),
            "tgt.Customer_ID = src.Customer_ID AND tgt.is_current = true"
        ).whenMatchedUpdate(set={
            "is_current": lit(False),
            "end_date": current_timestamp()
        }).execute()

    # Insert brand-new customers plus the new versions of changed customers
    df_cust_transformed.alias("new_data") \
        .join(df_existing.select("Customer_ID"), "Customer_ID", "left_anti") \
        .unionByName(df_changes) \
        .write.format("delta").mode("append").save(gold_cust_path)
else:
    # First-time load
    df_cust_transformed.write.format("delta").mode("overwrite").save(gold_cust_path)
# 1. Get latest metadata per Stock_Symbol based on Date
window_spec = Window.partitionBy("Stock_Symbol").orderBy(col("Date").desc())
df_dim_stock = df_g_stocks.withColumn("row_num", row_number().over(window_spec)) \
.filter("row_num = 1") \
.select("Stock_Symbol", "Date") \
.withColumnRenamed("Date", "Latest_Metadata_Date")
# 2. Save to Gold layer
df_dim_stock.write.format("delta").mode("overwrite") \
.save("abfss://[email protected]/dim_stock/")
# Read Gold-layer customer and stock dimensions (latest SCD state)
df_dim_customer = spark.read.format("delta") \
    .load("abfss://[email protected]/dim_customer/") \
    .filter("is_current = true")
df_dim_stock = spark.read.format("delta") \
    .load("abfss://[email protected]/dim_stock/")
# Join with dimension tables using business keys
df_fact = df_g_transactions \
.join(df_dim_customer.select("Customer_ID"), on="Customer_ID", how="inner") \
.join(df_dim_stock.select("Stock_Symbol"), on="Stock_Symbol", how="inner") \
.withColumn("Year", year("Transaction_Date")) \
.withColumn("Month", month("Transaction_Date")) \
.withColumn("Day", dayofmonth("Transaction_Date"))
# Select fact table schema
df_fact_selected = df_fact.select(
    "Transaction_ID",
    "Customer_ID",
    "Stock_Symbol",
    "Transaction_Date",
    "Transaction_Type",
    "Quantity",
    "Price",
    "Product",
    "Year",
    "Month",
    "Day"
)

# Write fact table to Gold layer in Delta format
df_fact_selected.write.format("delta").mode("overwrite") \
    .save("abfss://[email protected]/fact_transaction/")
Files in the Gold layer:
Customer file in the Gold layer:
Stock file in the Gold layer:
Transactions file in the Gold layer:
Next day: new customer load
• Some existing records were updated.
• New records were added.
For existing records that changed, the previous version was expired: is_current was set to
False and end_date was set to the processing date.
Brand-new records were inserted with start_date set to the ingestion date, is_current set to True,
and end_date left null.
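A quick way to verify this behaviour is to inspect the history of one customer in dim_customer; the sketch below is illustrative, and the sample Customer_ID value is hypothetical.
# Inspect the SCD Type 2 history for one customer after the next-day load.
# "C001" is a hypothetical Customer_ID used only for illustration.
df_hist = spark.read.format("delta") \
    .load("abfss://[email protected]/dim_customer/") \
    .filter(col("Customer_ID") == "C001") \
    .select("Customer_ID", "Name", "Email", "is_current", "start_date", "end_date") \
    .orderBy("start_date")

# Expected: the superseded version shows is_current = false with a populated
# end_date; the latest version shows is_current = true with end_date = null.
df_hist.show(truncate=False)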
Key Features
• Clean separation of layers using Medallion Architecture.
• Robust SCD Type 2 handling in Customer dimension.
• Transformations ready for daily incremental loads (idempotent, with hash-based change detection).
• Modular and extensible ETL logic.
• Stored in Delta & Parquet formats optimized for analytical workloads.