import os
import re
import uuid

import pandas as pd
import requests
import tiktoken
import google.auth.transport.requests
from google.oauth2 import service_account
from openpyxl import Workbook

from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import Runnable, RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel

from pathlib import Path
from typing import Optional

app = FastAPI(title="Business Glossary Generator API", version="1.0.0")

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure this for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

SERVICE_URL = "https://medium-cf-593290627747.us-central1.run.app"
SERVICE_ACCOUNT_FILE = "shining-fiber-468008-c0-0c5ea6c8ff51.json"

# Pydantic models
class ProcessSchemaRequest(BaseModel):
    schema_description: Optional[str] = ""
    max_tokens: Optional[int] = 100


class ProcessStatusResponse(BaseModel):
    task_id: str
    status: str
    message: str
    progress: Optional[int] = 0


class GlossaryResponse(BaseModel):
    task_id: str
    status: str
    message: str
    download_url: Optional[str] = None
    total_rows: Optional[int] = 0

# In-memory storage for task status (use Redis in production)
task_storage = {}

def call_cloud_run_model(prompt: str, temperature: float = 0.7) -> dict:
    """Send a prompt to the Cloud Run model service using an ID-token-authenticated request."""
    try:
        credentials = service_account.IDTokenCredentials.from_service_account_file(
            SERVICE_ACCOUNT_FILE,
            target_audience="https://medium-cf-593290627747.us-central1.run.app"
        )
        auth_request = google.auth.transport.requests.Request()
        credentials.refresh(auth_request)

        headers = {
            "Authorization": f"Bearer {credentials.token}",
            "Content-Type": "application/json"
        }
        payload = {
            "prompt": prompt,
            "temperature": temperature
        }

        response = requests.post(SERVICE_URL, headers=headers, json=payload)
        response.raise_for_status()
        return response.json()

    except Exception as e:
        print(f"❌ Error in call_cloud_run_model: {str(e)}")
        return {"error": str(e)}

class CloudRunLLM(Runnable):
    """LangChain Runnable that forwards prompt text to the Cloud Run model service."""

    def invoke(self, input, config=None):
        try:
            # A PromptTemplate yields a PromptValue; convert it (or a dict/raw string) to plain text
            if hasattr(input, "to_string"):
                prompt_text = input.to_string()
            elif isinstance(input, dict):
                prompt_text = input.get('formatted_prompt') or str(input)
            else:
                prompt_text = str(input)

            print("🟢 Prompt sent to Cloud Run:")
            print(prompt_text)

            response = call_cloud_run_model(prompt_text)

            print("🟢 Response from Cloud Run:")
            print(response)

            if 'error' in response:
                return f"Error: {response['error']}"

            if isinstance(response, dict):
                return (
                    response.get("predictions", {})
                    .get("choices", [{}])[0]
                    .get("message", {})
                    .get("content", str(response))
                )
            else:
                return str(response)

        except Exception as e:
            print(f"Error in CloudRunLLM.invoke: {str(e)}")
            return f"Error processing request: {str(e)}"


llm = CloudRunLLM()
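
# Quick connectivity check (illustrative only): assuming SERVICE_ACCOUNT_FILE is present
# locally and the Cloud Run endpoint is reachable, the wrapper can be exercised directly
# before wiring it into a chain. Uncomment to run:
#
#     print(llm.invoke("Reply with the single word OK."))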

def read_schema(schema_file=None, schema_df=None, max_tokens=100):
    """Read the schema workbook and group tables into batches that fit within max_tokens."""
    encoding = tiktoken.get_encoding("cl100k_base")

    if schema_df is None:
        schema_df = pd.read_excel(schema_file, engine='openpyxl')

    # Drop duplicates and rows with missing or blank schema/table/column names
    schema_df = schema_df.drop_duplicates(subset=['TABLE_NAME', 'COLUMN_NAME'], keep='first')
    schema_df = schema_df[schema_df['TABLE_NAME'].notna() & schema_df['COLUMN_NAME'].notna()]
    schema_df = schema_df[schema_df['SCHEMA_NAME'].notna() & schema_df['TABLE_NAME'].notna()]
    schema_df = schema_df[schema_df['COLUMN_NAME'].str.strip() != '']
    schema_df = schema_df[schema_df['TABLE_NAME'].str.strip() != '']
    schema_df = schema_df[schema_df['SCHEMA_NAME'].str.strip() != '']

    schema_info_batches = []
    current_batch = []
    current_token_count = 0

    has_COLUMN_NAME = 'COLUMN_NAME' in schema_df.columns
    has_ddl = 'DDL' in schema_df.columns
    # DDL is emitted once per table below, so keep it out of the per-column extras
    base_fields = {'SCHEMA_NAME', 'TABLE_NAME', 'COLUMN_NAME', 'DDL'}
    additional_fields = [col for col in schema_df.columns if col not in base_fields]

    for schema in schema_df['SCHEMA_NAME'].unique():
        schema_tables = schema_df[schema_df['SCHEMA_NAME'] == schema]['TABLE_NAME'].unique()
        for table in schema_tables:
            chunk = f"\nSchema: {schema}\nTable: {table}\n"
            if has_ddl:
                ddl_rows = schema_df[(schema_df['SCHEMA_NAME'] == schema) &
                                     (schema_df['TABLE_NAME'] == table)]['DDL']
                ddl_val = ddl_rows.values[0] if len(ddl_rows) > 0 else ""
                chunk += f"DDL: {ddl_val}\n"
            if has_COLUMN_NAME:
                table_rows = schema_df[(schema_df['SCHEMA_NAME'] == schema) &
                                       (schema_df['TABLE_NAME'] == table)]
                columns = []
                for _, row in table_rows.iterrows():
                    col_name = str(row['COLUMN_NAME']).strip()
                    extras = []
                    for field in additional_fields:
                        val = str(row[field]).strip() if pd.notna(row[field]) else ""
                        if val:
                            extras.append(val)
                    if extras:
                        col_name += " (" + ", ".join(extras) + ")"
                    columns.append(col_name)
                if columns:
                    chunk += "Columns: " + ", ".join(columns) + "\n"

            chunk_token_length = len(encoding.encode(chunk))

            # Start a new batch when adding this table would exceed the token budget
            if current_token_count + chunk_token_length > max_tokens:
                if current_batch:
                    schema_info_batches.append(current_batch)
                current_batch = [chunk]
                current_token_count = chunk_token_length
            else:
                current_batch.append(chunk)
                current_token_count += chunk_token_length

    if current_batch:
        schema_info_batches.append(current_batch)

    return schema_df, schema_info_batches
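
# Note on input layout (illustrative): the uploaded workbook must carry SCHEMA_NAME,
# TABLE_NAME and COLUMN_NAME columns (the filters above assume them); an optional DDL
# column and any other columns (e.g. data types or comments) are folded into the
# per-column extras. A made-up example layout:
#
#     SCHEMA_NAME | TABLE_NAME | COLUMN_NAME | DATA_TYPE
#     SALES       | ORDERS     | ORDER_ID    | NUMBER
#     SALES       | ORDERS     | CUST_EMAIL  | VARCHAR2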

def ddl_description(chunk, schema_description):
    """Generate the initial business glossary for one batch of schema chunks."""
    db_schema_prompt = PromptTemplate.from_template(
        """
You are a business analyst and subject matter expert in multiple enterprise domains, including ESG, BFSI and retail.
You will create a comprehensive business glossary from the given database information by reasoning step-by-step
based on the following instructions:

1. Table Description: For each table, generate an informative description by drawing context from the schema description
as well as the names and metadata of the attributes. Be authoritative and concise, using your expertise in the domain to provide
a clear understanding of the table's purpose and relevance. Table descriptions should be succinct yet informative, capturing
the essence of the data it holds and its role in the broader business context. Do not use generic terms or broad and vague
descriptions; do not use words like "likely", "may", "appears to", "could", etc. Instead, use definitive language that
reflects your expertise and understanding of the domain. Do not mention the TABLE_NAME or SCHEMA_NAME in the description.

2. Column Descriptions: List all columns for each table with comprehensive and business-centric definitions. Industry-specific
terminologies and business concepts should be broken down and elaborated upon such that the reader finds the
glossary useful for data engineering as well as business analytics. Abbreviations should be expanded and explained, and
acronyms should be defined. Do not use generic terms or broad and vague descriptions. Ensure there is no redundancy in
the descriptions. Include formulas to explain business concepts better. Formulas should not contain square brackets.
Strictly do not consolidate or merge column descriptions, even if they are similar or sequential.

3. Subject Area: The column description should be accompanied by the subject area, under the business domain, that the
data within the column pertains to. The subject area name should be enclosed in square brackets. Use ontological terminologies to
avoid inconsistencies in subject area names due to synonyms or abbreviations.

4. PII: The column description should also indicate if the column contains PII (Personally Identifiable Information) data by
appending "[pii]" to the column description after the subject area.
You are analyzing column metadata to determine if any column contains Personally Identifiable Information (PII).

PII detection rules:
- Classify PII into 3 levels:
- Level 1: Indirect identifiers. Alone not sensitive (e.g., Region code, Zone, Gender etc). Only becomes PII when aggregated with other sensitive fields.
- Level 2: Public identifiers that can be used to identify a person (e.g., Name, Phone, Email, Staff ID, Branch Name etc). Always PII.
- Level 3: Highly sensitive financial/personal identifiers (e.g., Account Number, Customer ID, Aadhaar, PAN, Financial Limits etc). Always PII.

Apply the following rules:
- If a column contains Level 2 or Level 3 data → mark as "[Yes]".
- If a column contains Level 1 PII but *cannot* be aggregated with other sensitive data → mark as "[No]".
- If a column contains Level 1 PII *and* can be aggregated with others to reveal identity → mark as "[Yes]".
- If unsure or ambiguous, err on the side of caution and mark it as "[Yes]".

Additional instructions:
- Column names may be cryptic (e.g., "ACID", "FORACID", "CUST_ID"). Use description context to infer meaning.
- Do not rely only on column names; read the column definitions before tagging.

5. Ambiguous Attributes: Infer meanings for incomplete or ambiguous names based on context from the schema description and
metadata. However, a table may contain placeholder columns such as "col1", "col2", "EXTENSION_ATTRIBUTE_1", "JOB_INFORMATION_19",
etc. which should not be described unless meaningful labels are provided against them.

6. Only describe the tables and columns listed below. Do not invent or add any that are not present.

Schema Description:
{schema_description}

Table Info:
{schema_data}

Use this format throughout:

Schema: [SCHEMA_NAME]\n
Table: [TABLE_NAME]\n
Description: [Concise business-level table description]\n
- [Column 1 Name]: [Comprehensive and business-centric definition for the column] [Subject Area] [Yes/No]\n
- [Column 2 Name]: [Comprehensive and business-centric definition for the column] [Subject Area] [Yes/No]\n
...
- [Column n Name]: [Comprehensive and business-centric definition for the column] [Subject Area] [Yes/No]\n
------
Ensure every column has a "[Yes]" or "[No]" PII tag. Do not skip or omit any field.
Do not include introductory or closing messages. After describing all columns, end your response with ------.
""")

    runnable = RunnablePassthrough()
    output_parser = StrOutputParser()

    chain = runnable | db_schema_prompt | llm | output_parser
    glossary = chain.invoke({"schema_data": chunk, "schema_description": schema_description})
    return glossary

def refine_ddl_description(ddl_chunk: str, schema_description: str = "") -> str:
    """Re-evaluate the PII tags in a previously generated glossary chunk."""
    refine_db_schema_prompt = PromptTemplate(
        input_variables=["ddl_chunk"],
        template="""
You are a business analyst with subject matter expertise in enterprise data governance across ESG, BFSI, and Retail domains.

Below is a 'business glossary' where the 'PII' values are not accurate.

{ddl_chunk}

Your task is to re-evaluate the 'column names and descriptions' to correctly assign the 'PII' tag as either '[Yes]' or '[No]'.

Use this format:

Schema: [SCHEMA_NAME]
Table: [TABLE_NAME]
Description: [Concise table-level description]

- [Column 1 Name]: [Column description] [Subject Area] [Yes/No]
- [Column 2 Name]: [Column description] [Subject Area] [Yes/No]
...
------

Do NOT skip or omit any column. End only with `------`. Do not add explanations or intro messages.
"""
    )

    chain = (
        {"ddl_chunk": lambda _: ddl_chunk}
        | refine_db_schema_prompt
        | llm
        | StrOutputParser()
    )

    updated_glossary = chain.invoke({})
    return updated_glossary

def parse_glossary_output(text: str) -> list:
    """
    Parses the model's glossary response and extracts:
    (Table Name, Column Name, Description, Subject Area, PII Classification)
    """
    rows = []
    current_table = None
    lines = text.splitlines()
    for line in lines:
        line = line.strip()
        if line.lower().startswith("table:"):
            current_table = line.split(":", 1)[1].strip()
        elif line.startswith("- ") and ":" in line:
            try:
                column_part, rest = line[2:].split(":", 1)
                column_name = column_part.strip()
                # Capture "<description> [Subject Area] [Yes/No]" (both bracketed tags optional)
                matches = re.findall(r"(.*?)\s*(\[(.*?)\])?\s*(\[(Yes|No)\])?$", rest.strip())
                if matches:
                    description = matches[0][0].strip()
                    subject_area = matches[0][2] if matches[0][2] else ""
                    pii = matches[0][4] if matches[0][4] else ""
                    rows.append((current_table, column_name, description, subject_area, pii))
            except Exception:
                continue
    return rows
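
# Illustrative example of the glossary format this parser expects (values are made up):
#
#     Table: ORDERS
#     Description: Captures customer purchase transactions.
#     - ORDER_ID: Unique identifier assigned to each purchase order. [Sales] [No]
#     - CUST_EMAIL: Email address used to contact the customer. [Customer] [Yes]
#
# which would yield rows such as
#     ("ORDERS", "ORDER_ID", "Unique identifier assigned to each purchase order.", "Sales", "No")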

async def process_schema_async(task_id: str, schema_file_path: str,
                               schema_description: str = "", max_tokens: int = 100):
    """Background task to process schema file"""
    try:
        print(f"🔍 Debug: Starting background processing for task: {task_id}")
        print(f"🔍 Debug: Schema file path: {schema_file_path}")
        print(f"🔍 Debug: File exists: {os.path.exists(schema_file_path)}")

        task_storage[task_id]["status"] = "processing"
        task_storage[task_id]["message"] = "Reading schema file..."

        # Verify file exists and is readable
        if not os.path.exists(schema_file_path):
            raise Exception(f"Schema file not found: {schema_file_path}")

        # Read the schema file and create batches
        print("🔍 Debug: Reading schema file...")
        schema_df, schema_batches = read_schema(schema_file=schema_file_path, max_tokens=max_tokens)
        print(f"🔍 Debug: Schema read successfully. Batches: {len(schema_batches)}")

        task_storage[task_id]["message"] = f"Processing {len(schema_batches)} batches..."

        all_rows = []
        total_batches = len(schema_batches)

        for i, batch in enumerate(schema_batches):
            print(f"🔍 Debug: Processing batch {i + 1}/{total_batches}")

            task_storage[task_id]["message"] = f"Processing batch {i + 1} of {total_batches}..."
            task_storage[task_id]["progress"] = int((i / total_batches) * 50)  # First 50% for initial processing

            initial_glossary = ddl_description("\n".join(batch), schema_description)
            print(f"🔍 Debug: Initial glossary generated for batch {i + 1}")

            task_storage[task_id]["message"] = f"Refining batch {i + 1} of {total_batches}..."
            task_storage[task_id]["progress"] = int((i / total_batches) * 50) + 25  # Next 25% for refinement

            refined_glossary = refine_ddl_description(initial_glossary, schema_description)
            print(f"🔍 Debug: Refined glossary generated for batch {i + 1}")

            batch_rows = parse_glossary_output(refined_glossary)
            all_rows.extend(batch_rows)
            print(f"🔍 Debug: Batch {i + 1} parsed, total rows so far: {len(all_rows)}")

            task_storage[task_id]["progress"] = int((i / total_batches) * 75) + 25  # Final 25% for parsing

        # Save to Excel
        print("🔍 Debug: Generating Excel file...")
        task_storage[task_id]["message"] = "Generating Excel file..."
        task_storage[task_id]["progress"] = 95

        wb = Workbook()
        ws = wb.active
        ws.title = "Business Glossary"

        headers = ["Table Name", "Column Name", "Description", "Subject Area", "PII Classification"]
        ws.append(headers)

        for row in all_rows:
            ws.append(row)

        # Create output directory if it doesn't exist
        output_dir = Path("output")
        output_dir.mkdir(exist_ok=True)

        output_file = output_dir / f"business_glossary_{task_id}.xlsx"
        wb.save(str(output_file))
        print(f"✅ Excel file saved: {output_file}")

        task_storage[task_id]["status"] = "completed"
        task_storage[task_id]["message"] = "Processing completed successfully"
        task_storage[task_id]["progress"] = 100
        task_storage[task_id]["output_file"] = str(output_file)
        task_storage[task_id]["total_rows"] = len(all_rows)

        print(f"✅ Background processing completed for task: {task_id}")

        # Clean up the uploaded file
        if os.path.exists(schema_file_path):
            os.remove(schema_file_path)
            print(f"Cleaned up uploaded file: {schema_file_path}")

    except Exception as e:
        print(f"❌ Error in background task {task_id}: {str(e)}")
        print(f"❌ Error type: {type(e).__name__}")
        import traceback
        print(f"❌ Traceback: {traceback.format_exc()}")

        task_storage[task_id]["status"] = "failed"
        task_storage[task_id]["message"] = f"Error: {str(e)}"
        task_storage[task_id]["progress"] = 0

# FastAPI endpoints
@app.get("/")
async def root():
    return {"message": "Business Glossary Generator API", "version": "1.0.0"}


@app.post("/upload-schema", response_model=ProcessStatusResponse)
async def upload_schema(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    schema_description: str = "",
    max_tokens: int = 100
):
    """Upload schema Excel file and start processing"""
    print(f"🔍 Debug: Received file: {file.filename}")
    print(f"🔍 Debug: File content type: {file.content_type}")
    print(f"🔍 Debug: Schema description: {schema_description}")
    print(f"🔍 Debug: Max tokens: {max_tokens}")

    # Validate file type
    if not file.filename or not file.filename.endswith(('.xlsx', '.xls')):
        print(f"❌ Invalid file type: {file.filename}")
        raise HTTPException(status_code=400, detail="Only Excel files (.xlsx, .xls) are supported")

    # Generate unique task ID
    task_id = str(uuid.uuid4())
    print(f"🔍 Debug: Generated task ID: {task_id}")

    # Save uploaded file temporarily
    uploads_dir = Path("uploads")
    uploads_dir.mkdir(exist_ok=True)

    file_path = uploads_dir / f"schema_{task_id}_{file.filename}"

    try:
        print(f"🔍 Debug: Saving file to: {file_path}")
        # Read file content
        file_content = await file.read()
        print(f"🔍 Debug: File content length: {len(file_content)} bytes")

        # Save to disk
        with open(file_path, "wb") as buffer:
            buffer.write(file_content)

        print(f"✅ File saved successfully to: {file_path}")

    except Exception as e:
        print(f"❌ Error saving file: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to save uploaded file: {str(e)}")

    # Initialize task status
    task_storage[task_id] = {
        "status": "started",
        "message": "Task initiated",
        "progress": 0,
        "output_file": None,
        "total_rows": 0
    }

    print(f"🔍 Debug: Task storage initialized for: {task_id}")

    # Start background processing
    background_tasks.add_task(
        process_schema_async,
        task_id,
        str(file_path),
        schema_description,
        max_tokens
    )

    print(f"✅ Background task started for: {task_id}")

    response = ProcessStatusResponse(
        task_id=task_id,
        status="started",
        message="Schema processing started",
        progress=0
    )

    print(f"🔍 Debug: Returning response: {response}")
    return response

@app.get("/status/{task_id}", response_model=GlossaryResponse)
async def get_task_status(task_id: str):
"""Get the status of a processing task"""

if task_id not in task_storage:


raise HTTPException(status_code=404, detail="Task not found")

task = task_storage[task_id]

download_url = None
if task["status"] == "completed" and task.get("output_file"):
download_url = f"/download/{task_id}"

return GlossaryResponse(
task_id=task_id,
status=task["status"],
message=task["message"],
download_url=download_url,
total_rows=task.get("total_rows", 0)
)

@app.get("/download/{task_id}")
async def download_glossary(task_id: str):
"""Download the generated business glossary Excel file"""

if task_id not in task_storage:


raise HTTPException(status_code=404, detail="Task not found")
task = task_storage[task_id]

if task["status"] != "completed":
raise HTTPException(status_code=400, detail="Task not completed yet")

output_file = task.get("output_file")
if not output_file or not os.path.exists(output_file):
raise HTTPException(status_code=404, detail="Output file not found")

return FileResponse(
output_file,
media_type="application/vnd.openxmlformats-
officedocument.spreadsheetml.sheet",
filename=f"business_glossary_{task_id}.xlsx"
)

@app.delete("/cleanup/{task_id}")
async def cleanup_task(task_id: str):
    """Clean up task data and associated files"""
    if task_id not in task_storage:
        raise HTTPException(status_code=404, detail="Task not found")

    task = task_storage[task_id]

    # Remove output file if exists
    output_file = task.get("output_file")
    if output_file and os.path.exists(output_file):
        os.remove(output_file)

    # Remove task from storage
    del task_storage[task_id]

    return {"message": "Task cleaned up successfully"}

@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "timestamp": pd.Timestamp.now().isoformat()}

if __name__ == "__main__":
import uvicorn
uvicorn.run("fast_back_end:app", host="0.0.0.0", port=8000, reload=True)
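
# Rough end-to-end client sketch (illustrative only; assumes the API runs on localhost:8000,
# that "schema.xlsx" follows the layout noted above, and that schema_description/max_tokens
# are passed as query parameters, which matches how the endpoint declares them):
#
#     import time, requests
#
#     with open("schema.xlsx", "rb") as f:
#         started = requests.post(
#             "http://localhost:8000/upload-schema",
#             files={"file": ("schema.xlsx", f)},
#             params={"schema_description": "Retail sales mart", "max_tokens": 100},
#         ).json()
#     task_id = started["task_id"]
#
#     while True:
#         status = requests.get(f"http://localhost:8000/status/{task_id}").json()
#         if status["status"] in ("completed", "failed"):
#             break
#         time.sleep(5)
#
#     if status["status"] == "completed":
#         data = requests.get(f"http://localhost:8000/download/{task_id}").content
#         with open("business_glossary.xlsx", "wb") as out:
#             out.write(data)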
