PROJECT 6 Python

The document outlines a project to create an end-to-end data pipeline for tracking policy status transitions using Azure, Databricks, and Snowflake. It details the architecture, data transformation processes across Bronze, Silver, and Gold layers, and the final storage of processed data in Snowflake. The pipeline aims to maintain a comprehensive view of policy lifecycles, ensuring data quality and facilitating strategic insights.


Policy Lifecycle Tracking Project | End-to-End Azure Pipeline with Databricks & Snowflake


Project Summary

ㆍ Objective: Build a data pipeline for tracking policy status transitions over time by ingesting, transforming, and storing policy data from raw sources to a refined analytical state.

ㆍ Source: Daily policy data stored in Azure Blob Storage (raw folder).

ㆍ Destination: A Snowflake data warehouse, with intermediate transformation and data quality layers handled in Azure Data Lake Storage Gen2 and Databricks.

ㆍ Purpose: Maintain a full lifecycle view of each policy, track changes in status, and compute the duration spent in each status phase.

Architecture diagram:

The end-to-end pipeline with dynamic parameters

Pipeline debug result for the day-1 file
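
The dynamic parameterization itself lives in the Data Factory pipeline shown above. Purely as an illustration (and an assumption, since the exact mechanism is only visible in the screenshot), if the pipeline passes the run date to the Databricks notebooks as a base parameter, it surfaces as a widget and could be used to build the day's file path:

# Illustrative only: the widget name "process_date" and the path pattern are
# assumptions about how the dynamic parameter could be consumed.
dbutils.widgets.text("process_date", "")            # populated by the ADF pipeline run
process_date = dbutils.widgets.get("process_date")  # e.g. "2024-01-01"
bronze_path = "/mnt/bronze/policy_snapshot_" + process_date + ".csv"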


Data Transformation & Workflow

1. Bronze Layer (Raw Ingestion)

ㆍ Ingest daily policy data from Azure Blob Storage into the Bronze folder in ADLS Gen2 (a minimal copy sketch follows this list).

ㆍ Raw data is stored as-is, preserving the original structure for audit and traceability.
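
In the project this copy is performed by the Data Factory pipeline shown in the architecture diagram. Purely as an illustration of the file movement, and assuming both locations are mounted in Databricks under hypothetical mount points /mnt/raw and /mnt/bronze, a notebook-side equivalent could look like this:

# Illustrative only: copy the day's raw snapshot as-is from the mounted raw
# container to the Bronze folder. Mount points and the file-name pattern are
# assumptions, not taken from the project.
for f in dbutils.fs.ls("/mnt/raw/"):
    if f.name.startswith("policy_snapshot_") and f.name.endswith(".csv"):
        # Keep the original file name and structure for audit and traceability
        dbutils.fs.cp(f.path, "/mnt/bronze/" + f.name)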

2. Silver Layer (Cleansing & Standardization)

ㆍ Use Databricks to process Bronze layer data.

ㆍ Apply cleansing (null handling, schema enforcement) and standardization (field renaming,
type casting).

ㆍ Store cleaned and standardized data in the Silver folder in ADLS Gen2.

The Silver layer transformation logic is as follows:

# Import the required functions and types
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Read the daily policy snapshot files from the Bronze folder
policy_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/mnt/bronze/policy_snapshot_*.csv")

# Data cleansing: trim and standardize status values, cast dates,
# and default missing agent IDs
policy_clean_df = policy_df \
    .withColumn("policy_status", trim(col("policy_status"))) \
    .withColumn("policy_status",
        when(col("policy_status").isin("submited", "Submited"), "Submitted")
        .when(col("policy_status").isin("Actve", "actve"), "Active")
        .when(col("policy_status").isin("Canceld", "canceld"), "Cancelled")
        .when(col("policy_status").isin("Mature", "mature"), "Mature")
        .otherwise(initcap(col("policy_status")))) \
    .withColumn("submission_date", col("submission_date").cast(DateType())) \
    .withColumn("status_update_date", to_date(col("status_update_date"))) \
    .withColumn("agent_id", trim(col("agent_id"))) \
    .withColumn("agent_id",
        when(col("agent_id").isNull(), lit("UNKNOWN")).otherwise(col("agent_id"))) \
    .fillna("UNKNOWN", subset=["policy_status"]) \
    .dropna(subset=["submission_date", "status_update_date"])

# Read the region reference file (file name redacted in the source)
region_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/mnt/region/[Link]")

# Join policy and region data, then drop the duplicate key column
policy_region_df = policy_clean_df.join(
    region_df, policy_clean_df.region == region_df.region_id, "inner"
).drop("region_id")

# Validation rule: keep only rows with a recognized policy status
valid_statuses = ["Submitted", "Active", "Cancelled", "Mature"]
policy_validated_df = policy_region_df.filter(col("policy_status").isin(valid_statuses))

# Add audit information
df_silver = policy_validated_df.withColumn("ingesttime", current_timestamp())

# Write to the Silver layer as a Delta table
df_silver.write.mode("overwrite").format("delta").save("/mnt/silver/policy")

The transformed file is stored in the Silver folder, as shown below.

3. Gold Layer (Business Transformations)

ㆍ Read Silver data into Databricks.

ㆍ Apply business logic to generate the Policy Dimension (policy_dim) table:

• Append daily data to maintain historical versions of policy records.


• Capture all phase changes for each policy.

ㆍ Store the policy_dim table in Snowflake as a dimension table.


4. Policy Status Tracking

ㆍ Read policy_dim table from Snowflake.

ㆍ Compute previous status, current status, and date difference between transitions using
window functions or lag operations.

ㆍ Store the output as the policy_track_status table in Snowflake.

The Gold layer transformation logic is as follows:

# Import the required functions, types, and Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Read the Silver policy data
policy_df = spark.read.format("delta").load("/mnt/silver/policy")

# Snowflake connection options
sfOptions = {
    "sfURL": "[Link]",  # Snowflake account URL (redacted in the source)
    "sfDatabase": "POLICY_DATA_DB",
    "sfSchema": "GOLD_LAYER",
    "sfWarehouse": "POLICY_WH",
    "sfRole": "ACCOUNTADMIN",
    "sfUser": "AAAAAAAAAAAA",
    "sfPassword": "XXXXXXXXXXXX",
}

# Append the day's data to the policy_dim dimension table
policy_df.write \
    .format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "policy_dim") \
    .mode("append") \
    .save()

# Read the latest policy_dim table back from Snowflake
policy_dim_current = spark.read \
    .format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "policy_dim") \
    .load()

# Lifecycle status tracking: get the previous status and its update date
windowfunc = Window.partitionBy("policy_id").orderBy("status_update_date")

policy_status_df = policy_dim_current \
    .withColumn("prev_status", lag("policy_status").over(windowfunc)) \
    .withColumn("prev_status_update_date", lag("status_update_date").over(windowfunc))

# Number of days spent in the previous status
policy_diff_df = policy_status_df.withColumn(
    "No_of_day_in_status",
    datediff(col("status_update_date"), col("prev_status_update_date"))
)

policy_track_status_df = policy_diff_df \
    .select("Policy_ID", "Prev_status", "policy_status", "status_update_date", "No_of_day_in_status")

# Write the policy_track_status table to Snowflake
policy_track_status_df.write \
    .format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "policy_track_status") \
    .mode("overwrite") \
    .save()

The transformed tables are in Snowflake, as shown in the screenshots below.

Post-Processing Logic:

• After processing each day's data:
○ The files are deleted from the raw folder in Blob Storage.
○ Metadata/logs of the processed files are stored in a log folder.
○ Bronze-level files are moved to an archive folder for backup and auditing (this clean-up step is sketched below).
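
A minimal sketch of this clean-up step, assuming the raw, archive, and log locations are mounted in Databricks and using a hypothetical file name (in the actual pipeline these steps may equally be implemented as Data Factory activities):

from datetime import datetime

# Illustrative only: mount points, the log format, and the file name are assumptions.
processed_name = "policy_snapshot_day1.csv"  # hypothetical file name

# Move the Bronze copy to the archive folder for backup and auditing
dbutils.fs.mv("/mnt/bronze/" + processed_name, "/mnt/archive/" + processed_name)

# Record the clean-up in the log folder
log_entry = datetime.now().isoformat() + "," + processed_name + ",deleted_from_raw,archived\n"
dbutils.fs.put("/mnt/logs/" + processed_name + ".log", log_entry, overwrite=True)

# Delete the processed file from the raw folder in Blob Storage
dbutils.fs.rm("/mnt/raw/" + processed_name)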
The deletion of the file from the raw folder is recorded in the log file, as shown below.

The file was moved to the archive folder.


Processing the day-2 file: the day-2 file was processed, and the debug result is shown below.

Day-2 data is appended to the Snowflake policy_dim table.


The table below shows the previous status and the number of days each policy spent in that status.

Conclusion

ㆍ End-to-end pipeline ensures clean, historical, and traceable policy data.

ㆍ Intelligent logging, archival, and deletion mechanisms ensure efficient storage management
and audit readiness.

ㆍ Gold layer delivers value-added insights into policy lifecycle stages, enabling strategic
decision-making.
