Policy Lifecycle Tracking Project | End-to-End
Azure Pipeline with Databricks & Snowflake
Project Summary
ㆍ Objective: To build a data pipeline for tracking policy status transitions over time by
ingesting, transforming, and storing policy data from raw sources to a refined analytical state.
ㆍ Source: Daily policy data stored in Azure Blob Storage (raw folder).
ㆍ Destination: A Snowflake data warehouse, with intermediate transformations and data
quality layers handled in Azure Data Lake Gen2 and Databricks.
ㆍ Purpose: Maintain a full lifecycle view of each policy, track changes in status, and compute
the duration spent in each status phase.
Architecture diagram:
The end-to-end pipeline with dynamic parameters
Pipeline debug result for day-1 file
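The day-specific file is driven by a pipeline parameter that is passed down to the Databricks notebook on each run. A minimal sketch of reading such a parameter inside the notebook, assuming a widget named file_date (the widget name, default value, and path pattern are illustrative assumptions):
# Read the file-date parameter passed in by the pipeline run
# (widget name and default value are assumptions for illustration)
dbutils.widgets.text("file_date", "2024-01-01")
file_date = dbutils.widgets.get("file_date")

# Build the day-specific path for the incoming snapshot file
raw_file = f"/mnt/raw/policy_snapshot_{file_date}.csv"
print(f"Processing {raw_file}")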
Data Transformation & Workflow
1. Bronze Layer (Raw Ingestion)
ㆍ Ingest daily policy data from Azure Blob Storage into the ADLS Gen2 Bronze folder.
ㆍ Raw data is stored as-is, preserving the original structure for audit and traceability (a minimal sketch of this step follows this list).
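A minimal sketch of the ingestion step, assuming the raw Blob container and the Bronze folder are mounted at /mnt/raw and /mnt/bronze and using Databricks file utilities (paths and file name are assumptions):
# Copy the day's raw policy snapshot from the mounted raw container into the
# Bronze folder as-is, with no transformation (paths are assumptions)
raw_file = "/mnt/raw/policy_snapshot_day1.csv"
bronze_file = "/mnt/bronze/policy_snapshot_day1.csv"
dbutils.fs.cp(raw_file, bronze_file)

# Confirm the file landed in the Bronze folder
display(dbutils.fs.ls("/mnt/bronze/"))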
2. Silver Layer (Cleansing & Standardization)
ㆍ Use Databricks to process Bronze layer data.
ㆍ Apply cleansing (null handling, schema enforcement) and standardization (field renaming,
type casting).
ㆍ Store cleaned and standardized data in the Silver folder in ADLS Gen2.
The Silver-layer transformation logic is shown below:
# Import the required functions and types
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Read the policy file from the Bronze folder
policy_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/mnt/bronze/policy_snapshot_*.csv")

# Data cleansing: normalize status values, cast dates, and default missing agent IDs
policy_clean_df = policy_df.withColumn("policy_status", trim(col("policy_status"))) \
    .withColumn("policy_status",
        when(col("policy_status").isin("submited", "Submited"), "Submitted")
        .when(col("policy_status").isin("Actve", "actve"), "Active")
        .when(col("policy_status").isin("Canceld", "canceld"), "Cancelled")
        .when(col("policy_status").isin("Mature", "mature"), "Mature")
        .otherwise(initcap(col("policy_status")))) \
    .withColumn("submission_date", col("submission_date").cast(DateType())) \
    .withColumn("status_update_date", to_date(col("status_update_date"))) \
    .withColumn("agent_id", trim(col("agent_id"))) \
    .withColumn("agent_id", when(col("agent_id").isNull(), lit("UNKNOWN")).otherwise(col("agent_id"))) \
    .fillna("UNKNOWN", subset="policy_status") \
    .dropna(subset=["submission_date", "status_update_date"])
# Read the region file
region_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/mnt/region/region.csv")
# Join policy and region files
policy_region_df = policy_clean_df.join(region_df,
    policy_clean_df.region == region_df.region_id, "inner").drop("region_id")

# Validation rule: keep only recognized statuses
valid_statuses = ["Submitted", "Active", "Cancelled", "Mature"]
policy_validated_df = policy_region_df.filter(col("policy_status").isin(valid_statuses))

# Add audit information
df_silver = policy_validated_df.withColumn("ingesttime", current_timestamp())

# Write to the Silver Delta table
df_silver.write.mode("overwrite").format("delta").save("/mnt/silver/policy")
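A quick check of the Silver output can be done by reading the Delta table back (a sketch, assuming a Databricks notebook session):
# Read back the Silver Delta table and inspect the row count and a sample
silver_check_df = spark.read.format("delta").load("/mnt/silver/policy")
print(f"Silver row count: {silver_check_df.count()}")
silver_check_df.show(5, truncate=False)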
The transformed file is in the Silver folder, as shown below.
3. Gold Layer (Business Transformations)
ㆍ Read Silver data into Databricks.
ㆍ Apply business logic to generate the Policy Dimension (policy_dim) table:
• Append daily data to maintain historical versions of policy records.
• Capture all phase changes for each policy.
ㆍ Store the policy_dim table in Snowflake as a dimension table.
4. Policy Status Tracking
ㆍ Read policy_dim table from Snowflake.
ㆍ Compute the previous status, the current status, and the number of days between transitions using window functions (lag).
ㆍ Store the output as the policy_track_status table in Snowflake.
The Gold-layer transformation logic is shown below:
# Import the required functions and types
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Read the policy data from the Silver layer
policy_df = spark.read.format("delta").load("/mnt/silver/policy")

# Snowflake connection options (account URL and credentials are placeholders)
sfOptions = {
    "sfURL": "<account>.snowflakecomputing.com",
    "sfDatabase": "POLICY_DATA_DB",
    "sfSchema": "GOLD_LAYER",
    "sfWarehouse": "POLICY_WH",
    "sfRole": "ACCOUNTADMIN",
    "sfUser": "AAAAAAAAAAAA",
    "sfPassword": "XXXXXXXXXXXX"
}

# Write the policy_dim table (append each day's data to keep history)
policy_df.write \
.format("snowflake") \
.options(**sfOptions) \
.option("dbtable", "policy_dim") \
.mode("append") \
.save()
#Read the latest policy dim table
policy_dim_current = [Link] \
.format("snowflake") \
.options(**sfOptions) \
.option("dbtable", "policy_dim") \
.load()
# Lifecycle status tracking | get previous status and previous update date
windowfunc = Window.partitionBy("policy_id").orderBy("status_update_date")
policy_status_df = policy_dim_current \
    .withColumn("prev_status", lag("policy_status").over(windowfunc)) \
    .withColumn("prev_status_update_date", lag("status_update_date").over(windowfunc))

# Number of days spent in the previous status
policy_diff_df = policy_status_df.withColumn("No_of_day_in_status",
    datediff(col("status_update_date"), col("prev_status_update_date")))

policy_track_status_df = policy_diff_df \
    .select("policy_id", "prev_status", "policy_status", "status_update_date", "No_of_day_in_status")
# Write the policy_track_status table
policy_track_status_df.write \
.format("snowflake") \
.options(**sfOptions) \
.option("dbtable", "policy_track_status") \
.mode("overwrite") \
.save()
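The Snowflake credentials above are hard-coded for illustration only; in practice they would typically come from a Databricks secret scope. A minimal sketch, assuming a scope named snowflake-scope and keys sf-user and sf-password (all names are assumptions):
# Pull Snowflake credentials from a secret scope instead of hard-coding them
# (scope and key names are assumptions)
sfOptions["sfUser"] = dbutils.secrets.get(scope="snowflake-scope", key="sf-user")
sfOptions["sfPassword"] = dbutils.secrets.get(scope="snowflake-scope", key="sf-password")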
The transformed tables are in Snowflake, as shown in the screenshots below.
Post-Processing Logic:
ㆍ After processing each day's data (see the sketch after this list):
• Files are deleted from the raw folder in Blob Storage.
• Metadata/logs of processed files are stored in a log folder.
• Bronze-level files are moved to an archive folder for backup and auditing.
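A minimal sketch of these post-processing steps using Databricks file utilities (mount points, file names, and the log format are assumptions):
from datetime import datetime

# Paths for the day's processed file (all paths and names are assumptions)
raw_file = "/mnt/raw/policy_snapshot_day1.csv"
bronze_file = "/mnt/bronze/policy_snapshot_day1.csv"
archive_file = "/mnt/archive/policy_snapshot_day1.csv"
log_file = "/mnt/logs/policy_snapshot_day1.log"

# Move the Bronze-level file to the archive folder for backup and auditing
dbutils.fs.mv(bronze_file, archive_file)

# Delete the processed file from the raw folder in Blob Storage
dbutils.fs.rm(raw_file)

# Record a simple log entry about the processed, deleted, and archived file
log_entry = f"{datetime.now().isoformat()} | processed {raw_file} | deleted from raw | archived to {archive_file}\n"
dbutils.fs.put(log_file, log_entry, overwrite=True)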
Log entry in the log file recording the file deletion from the raw folder
The file was moved to the archive folder
Processing the day-2 file: the day-2 file was processed and the debug result is shown below
Day-2 data is appended to the Snowflake policy_dim table
The table below shows the previous status and the number of days each policy spent in that status
Conclusion
ㆍ The end-to-end pipeline delivers clean, historical, and traceable policy data.
ㆍ Automated logging, archival, and deletion mechanisms ensure efficient storage management and audit readiness.
ㆍ The Gold layer delivers value-added insights into policy lifecycle stages, enabling strategic decision-making.