from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends,
APIRouter, HTTPException
from app.services.chat_ws_manager import manager
from app.core.database import get_db
from app.repositories.chat_repo import *
from app.models.user import User, BusinessRole
from app.api.dependencies import get_current_user
from sqlalchemy.orm import Session
from app.models.chat import ChatSession, ChatSessionStatus
from app.models.business import Business
import json
from app.repositories.user_repo import get_user_by_id
router = APIRouter(prefix="/ws", tags=["Chat"])
@router.websocket("/chat/{user_id}")
async def user_chat_socket(websocket: WebSocket, user_id: str, db:
Session = Depends(get_db)):
await manager.connect_user(user_id, websocket)
try:
# Find or assign admin
admin_id = manager.get_session_admin(user_id)
if not admin_id:
admin = get_least_busy_admin(db)
if not admin:
await websocket.send_text("No admin available right now.
Try again later.")
await websocket.close()
return
admin_id = admin.id
manager.register_session(user_id, admin_id)
session = create_chat_session(db, user_id=user_id,
admin_id=admin_id)
else:
session = get_user_active_chat(db, user_id)
if not session:
await websocket.send_text("No active session found.")
await websocket.close()
return
while True:
data = await websocket.receive_json()
message = data.get("content", "")
if not message:
await websocket.send_json({"error": "Empty message"})
continue
saved_message = save_message(db, session_id=session.id,
sender_id=user_id, content=message)
await manager.send_to_admin(admin_id, json.dumps({
"session_id": session.id,
"user_id": user_id,
"content": message,
"timestamp": saved_message.timestamp.isoformat()
}))
await websocket.send_json({"message": f"You: {message}",
"timestamp": saved_message.timestamp.isoformat()})
mark_message_delivered(db, saved_message.id)
except WebSocketDisconnect:
manager.disconnect_user(user_id)
except Exception as e:
await websocket.send_json({"error": str(e)})
manager.disconnect_user(user_id)
@router.websocket("/admin/{admin_id}")
async def admin_chat_socket(websocket: WebSocket, admin_id: str, db:
Session = Depends(get_db)):
await manager.connect_admin(admin_id, websocket)
try:
while True:
data = await websocket.receive_json()
session_id = data.get("session_id")
content = data.get("content")
target_user = data.get("user_id")
if not all([session_id, content, target_user]):
await websocket.send_json({"error": "Missing session_id,
content, or user_id"})
continue
session = db.query(ChatSession).filter(
ChatSession.id == session_id,
ChatSession.user_id == target_user,
ChatSession.is_active == True
).first()
if not session:
await websocket.send_json({"error": "No active chat
session found"})
continue
saved_message = save_message(db, session_id=session.id,
sender_id=admin_id, content=content)
await manager.send_to_user(target_user, json.dumps({
"session_id": session.id,
"admin_id": admin_id,
"content": content,
"timestamp": saved_message.timestamp.isoformat()
}))
await websocket.send_json({
"message": f"You to {target_user}: {content}",
"timestamp": saved_message.timestamp.isoformat()
})
mark_message_delivered(db, saved_message.id)
except WebSocketDisconnect:
manager.disconnect_admin(admin_id)
except Exception as e:
await websocket.send_json({"error": str(e)})
manager.disconnect_admin(admin_id)
@router.websocket("/business/{business_id}/{user_id}")
async def business_chat_socket(websocket: WebSocket, business_id: str,
user_id: str, db: Session = Depends(get_db)):
# if current_user.business_id != business_id or
current_user.business_role not in [BusinessRole.owner.value,
BusinessRole.admin.value, BusinessRole.staff.value]:
# await websocket.send_json({"error": "Not authorized to access
this business chat"})
# await websocket.close()
# return
# await manager.connect_user(user_id, websocket)
business = db.query(Business).filter_by(id=business_id).first()
if not business:
await websocket.send_json({"error": "Business not found"})
await websocket.close()
return
session = get_active_business_session(db, user_id, business_id)
if not session:
# Get business owner instead of least busy admin
owner = db.query(User).filter(
User.business_id == business_id,
User.business_role == BusinessRole.owner.value
).first()
owner_id = owner.id if owner else None
session = create_chat_session(db, user_id=user_id,
admin_id=owner_id, business_id=business_id)
await manager.connect_user(user_id, websocket)
try:
while True:
data = await websocket.receive_json()
message = data.get("content", "")
if not message:
await websocket.send_json({"error": "Empty message"})
continue
saved_message = save_message(db, session_id=session.id,
sender_id=user_id, content=message)
await manager.send_to_business(business_id, json.dumps({
"session_id": session.id,
"user_id": user_id,
"content": message,
"business_name": business.name,
"timestamp": saved_message.timestamp.isoformat()
}), db)
await websocket.send_json({
"message": f"You to {business.name}: {message}",
"timestamp": saved_message.timestamp.isoformat()
})
mark_message_delivered(db, saved_message.id)
if session.admin_id:
await manager.send_to_admin(session.admin_id,
json.dumps({
"session_id": session.id,
"user_id": user_id,
"content": message,
"business_name": business.name,
"timestamp": saved_message.timestamp.isoformat()
}))
except WebSocketDisconnect:
manager.disconnect_user(user_id)
except Exception as e:
await websocket.send_json({"error": str(e)})
manager.disconnect_user(user_id)
@router.post("/redirect/{session_id}")
async def redirect_chat(
session_id: str,
new_recipient_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
session = db.query(ChatSession).filter_by(id=session_id).first()
if not session or not session.business_id:
raise HTTPException(status_code=404, detail="Session not found
or not a business chat")
business =
db.query(Business).filter_by(id=session.business_id).first()
if not business:
raise HTTPException(status_code=404, detail="Business not
found")
if current_user.business_id != session.business_id or
current_user.business_role not in [BusinessRole.owner,
BusinessRole.admin]:
raise HTTPException(status_code=403, detail="Not authorized to
redirect this session")
new_recipient = db.query(User).filter_by(id=new_recipient_id,
business_id=session.business_id).first()
if not new_recipient or new_recipient.business_role not in
[BusinessRole.admin, BusinessRole.staff]:
raise HTTPException(status_code=400, detail="New recipient must
be an admin or staff in the same business")
if not await manager.redirect_session(session_id, new_recipient_id,
db):
raise HTTPException(status_code=500, detail="Failed to redirect
session")
return {"message": f"Session {session_id} redirected to
{new_recipient_id}"}
@router.get("/business/{business_id}/staff")
async def get_business_staff(
business_id: str,
# current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
# if current_user.business_id != business_id or
current_user.business_role not in [BusinessRole.owner.value,
BusinessRole.admin.value]:
# print(f"Unauthorized access attempt by user
{current_user.business_id} for business {business_id}")
# raise HTTPException(status_code=403, detail="Not authorized
to view staff")
staff = db.query(User).filter(
User.business_id == business_id,
User.business_role.in_([BusinessRole.admin.value,
BusinessRole.staff.value])
).all()
return [
{"id": user.id, "role": user.business_role, "name":
user.full_name or user.username}
for user in staff
]
@router.get("/chat/history/{session_id}")
async def get_chat_history_endpoint(
session_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
session = db.query(ChatSession).filter_by(id=session_id).first()
if not session:
raise HTTPException(status_code=404, detail="Session not found")
if current_user.id not in [session.user_id, session.admin_id] and
current_user.business_id != session.business_id:
raise HTTPException(status_code=403, detail="Not authorized to
view this chat")
messages = get_chat_history(db, session_id)
return [
{
"id": msg.id,
"sender_id": msg.sender_id,
"content": msg.content,
"timestamp": msg.timestamp.isoformat(),
"is_delivered": msg.is_delivered
}
for msg in messages
]
@router.websocket("/business/{business_id}/owner/{user_id}")
async def business_owner_socket(websocket: WebSocket, business_id: str,
db: Session = Depends(get_db), user_id: str = None):
await websocket.accept()
try:
# Validate user with cookie-based session
# current_user = await get_current_user(cookie=cookie, db=db)
if cookie else None
current_user: User = get_user_by_id(db, user_id) if user_id
else None
if not current_user:
await websocket.send_json({"error": "Invalid or
unauthorized user"})
await websocket.close()
return
# Verify user is owner or admin of the business
if current_user.business_id != business_id or
current_user.business_role not in [BusinessRole.owner.value,
BusinessRole.admin.value]:
await websocket.send_json({"error": "Not authorized to
access this business chat"})
await websocket.close()
return
business = db.query(Business).filter_by(id=business_id).first()
if not business:
await websocket.send_json({"error": "Business not found"})
await websocket.close()
return
# Connect as admin (owner/admin uses admin connection for
messaging)
await manager.connect_admind(current_user.id, websocket)
print(f"Business owner/admin connected:
user_id={current_user.id}, business_id={business_id}")
try:
while True:
data = await websocket.receive_json()
session_id = data.get("session_id")
content = data.get("content")
target_user = data.get("user_id")
if not all([session_id, content, target_user]):
await websocket.send_json({"error": "Missing
session_id, content, or user_id"})
continue
session = db.query(ChatSession).filter(
ChatSession.id == session_id,
ChatSession.user_id == target_user,
ChatSession.business_id == business_id,
ChatSession.is_active == True
).first()
if not session:
await websocket.send_json({"error": "No active chat
session found"})
continue
saved_message = save_message(db, session_id=session.id,
sender_id=current_user.id, content=content)
await manager.send_to_user(target_user, json.dumps({
"session_id": session.id,
"admin_id": current_user.id,
"content": content,
"business_name": business.name,
"timestamp": saved_message.timestamp.isoformat()
}))
await websocket.send_json({
"message": f"You to {target_user}: {content}",
"timestamp": saved_message.timestamp.isoformat()
})
mark_message_delivered(db, saved_message.id)
except WebSocketDisconnect:
manager.disconnect_admin(current_user.id)
except Exception as e:
await websocket.send_json({"error": str(e)})
manager.disconnect_admin(current_user.id)
except HTTPException as e:
await websocket.send_json({"error": e.detail})
await websocket.close()
from fastapi import WebSocket
from typing import Dict, Optional, List
from sqlalchemy.orm import Session
from app.models.user import User, BusinessRole
from app.models.business import Business
from app.models.chat import ChatSession, ChatSessionStatus
from collections import defaultdict
class ConnectionManager:
def __init__(self):
self.user_connections: Dict[str, WebSocket] = {}
self.admin_connections: Dict[str, WebSocket] = {}
self.active_sessions: Dict[str, str] = {} # user_id ->
admin_id
self.message_queue: Dict[str, List[str]] = defaultdict(list) #
user_id -> [messages]
async def connect_user(self, user_id: str, websocket: WebSocket):
await websocket.accept()
self.user_connections[user_id] = websocket
# Send queued messages if any
if user_id in self.message_queue:
for message in self.message_queue[user_id]:
await websocket.send_text(message)
del self.message_queue[user_id]
print(f"[User Connected] {user_id}")
async def connect_admin(self, admin_id: str, websocket: WebSocket):
await websocket.accept()
self.admin_connections[admin_id] = websocket
# Send queued messages
if admin_id in self.message_queue:
for message in self.message_queue[admin_id]:
await websocket.send_text(message)
del self.message_queue[admin_id]
print(f"[Admin Connected] {admin_id}")
async def connect_admind(self, admin_id: str, websocket: WebSocket):
# await websocket.accept()
self.admin_connections[admin_id] = websocket
# Send queued messages
if admin_id in self.message_queue:
for message in self.message_queue[admin_id]:
await websocket.send_text(message)
del self.message_queue[admin_id]
print(f"[Admin Connected] {admin_id}")
def disconnect_user(self, user_id: str):
self.user_connections.pop(user_id, None)
self.active_sessions.pop(user_id, None)
print(f"[User Disconnected] {user_id}")
def disconnect_admin(self, admin_id: str):
self.admin_connections.pop(admin_id, None)
disconnected = [u for u, a in self.active_sessions.items() if a
== admin_id]
for u in disconnected:
self.active_sessions.pop(u, None)
print(f"[Admin Disconnected] {admin_id}")
def register_session(self, user_id: str, admin_id: str):
self.active_sessions[user_id] = admin_id
def get_session_admin(self, user_id: str) -> Optional[str]:
return self.active_sessions.get(user_id)
async def send_to_user(self, user_id: str, message: str,
queue_if_offline: bool = True):
ws = self.user_connections.get(user_id)
if ws:
await ws.send_text(message)
elif queue_if_offline:
self.message_queue[user_id].append(message)
async def send_to_admin(self, admin_id: str, message: str,
queue_if_offline: bool = True):
ws = self.admin_connections.get(admin_id)
if ws:
await ws.send_text(message)
elif queue_if_offline:
self.message_queue[admin_id].append(message)
async def send_to_business(self, business_id: str, message: str, db:
Session):
recipients = db.query(User).filter(
User.business_id == business_id,
User.business_role.in_([BusinessRole.owner.value,
BusinessRole.admin.value])
).all()
for recipient in recipients:
await self.send_to_admin(recipient.id, message)
async def redirect_session(self, session_id: str, new_recipient_id:
str, db: Session):
session = db.query(ChatSession).filter_by(id=session_id).first()
if not session:
return False
old_admin_id = session.admin_id
session.admin_id = new_recipient_id
session.status = ChatSessionStatus.ACTIVE
db.commit()
# Update active_sessions
if session.user_id in self.active_sessions:
self.active_sessions[session.user_id] = new_recipient_id
# Notify new recipient
await self.send_to_admin(new_recipient_id, f"Assigned to
session {session_id} with user {session.user_id}")
# Notify old admin
if old_admin_id:
await self.send_to_admin(old_admin_id, f"Session
{session_id} reassigned to {new_recipient_id}")
# Notify user
business =
db.query(Business).filter_by(id=session.business_id).first()
await self.send_to_user(session.user_id, f"Your chat with
{business.name if business else 'business'} has been reassigned.")
return True
manager = ConnectionManager()