
Some checks failed
Deploy to Production, build images and push to Gitea Registry / build_and_push (pull_request) Failing after 1m17s
This commit includes several key updates and new features: - Enhanced WebSocket functionality across various components, improving real-time communication and user experience. - Introduced new components for managing settlements, including `SettlementCard.vue`, `SettlementForm.vue`, and `SuggestedSettlementsCard.vue`, to streamline financial interactions. - Updated existing components and services to support the new settlement features, ensuring consistency and improved performance. - Added advanced performance optimizations to enhance loading times and responsiveness throughout the application. These changes aim to provide a more robust and user-friendly experience in managing financial settlements and real-time interactions.
263 lines
9.3 KiB
Python
263 lines
9.3 KiB
Python
# be/app/core/websocket.py
|
|
# Real-time WebSocket Manager - Unified event broadcasting system
|
|
|
|
import json
|
|
import asyncio
|
|
from typing import Dict, List, Set, Any, Optional, Union
|
|
from datetime import datetime
|
|
from fastapi import WebSocket, WebSocketDisconnect
|
|
from dataclasses import dataclass, asdict
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
|
|
class WebSocketEvent:
|
|
"""Unified WebSocket event structure"""
|
|
event: str
|
|
payload: Any
|
|
timestamp: str
|
|
user_id: Optional[int] = None
|
|
room: Optional[str] = None
|
|
|
|
def to_json(self) -> str:
|
|
return json.dumps(asdict(self))
|
|
|
|
@dataclass
|
|
class ConnectedClient:
|
|
"""Represents a connected WebSocket client"""
|
|
websocket: WebSocket
|
|
user_id: int
|
|
user_name: str
|
|
rooms: Set[str]
|
|
connected_at: datetime
|
|
last_activity: datetime
|
|
|
|
class WebSocketManager:
|
|
"""
|
|
Manages WebSocket connections, rooms, and event broadcasting
|
|
Supports household-wide and entity-specific room subscriptions
|
|
"""
|
|
|
|
def __init__(self):
|
|
# Active connections by user_id
|
|
self.connections: Dict[int, ConnectedClient] = {}
|
|
|
|
# Room subscriptions: room_name -> set of user_ids
|
|
self.rooms: Dict[str, Set[int]] = {}
|
|
|
|
# Event queue for offline users
|
|
self.offline_events: Dict[int, List[WebSocketEvent]] = {}
|
|
|
|
async def connect(self, websocket: WebSocket, user_id: int, user_name: str) -> bool:
|
|
"""Connect a user and store their WebSocket connection"""
|
|
try:
|
|
await websocket.accept()
|
|
|
|
# Remove existing connection if any
|
|
if user_id in self.connections:
|
|
await self.disconnect(user_id)
|
|
|
|
client = ConnectedClient(
|
|
websocket=websocket,
|
|
user_id=user_id,
|
|
user_name=user_name,
|
|
rooms=set(),
|
|
connected_at=datetime.utcnow(),
|
|
last_activity=datetime.utcnow()
|
|
)
|
|
|
|
self.connections[user_id] = client
|
|
|
|
# Send any queued offline events
|
|
await self._send_offline_events(user_id)
|
|
|
|
logger.info(f"WebSocket connected: user {user_id} ({user_name})")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"WebSocket connection failed for user {user_id}: {e}")
|
|
return False
|
|
|
|
async def disconnect(self, user_id: int):
|
|
"""Disconnect a user and clean up their subscriptions"""
|
|
if user_id not in self.connections:
|
|
return
|
|
|
|
client = self.connections[user_id]
|
|
|
|
# Remove from all rooms
|
|
for room in list(client.rooms):
|
|
self.leave_room(user_id, room)
|
|
|
|
# Close WebSocket connection
|
|
try:
|
|
await client.websocket.close()
|
|
except Exception:
|
|
pass
|
|
|
|
# Remove from connections
|
|
del self.connections[user_id]
|
|
|
|
logger.info(f"WebSocket disconnected: user {user_id}")
|
|
|
|
def join_room(self, user_id: int, room: str):
|
|
"""Subscribe user to a room for targeted events"""
|
|
if user_id not in self.connections:
|
|
return False
|
|
|
|
# Add to room
|
|
if room not in self.rooms:
|
|
self.rooms[room] = set()
|
|
self.rooms[room].add(user_id)
|
|
|
|
# Update client rooms
|
|
self.connections[user_id].rooms.add(room)
|
|
|
|
logger.debug(f"User {user_id} joined room: {room}")
|
|
return True
|
|
|
|
def leave_room(self, user_id: int, room: str):
|
|
"""Unsubscribe user from a room"""
|
|
if room in self.rooms:
|
|
self.rooms[room].discard(user_id)
|
|
|
|
# Clean up empty rooms
|
|
if not self.rooms[room]:
|
|
del self.rooms[room]
|
|
|
|
if user_id in self.connections:
|
|
self.connections[user_id].rooms.discard(room)
|
|
|
|
logger.debug(f"User {user_id} left room: {room}")
|
|
|
|
async def send_to_user(self, user_id: int, event: WebSocketEvent):
|
|
"""Send event to a specific user"""
|
|
if user_id in self.connections:
|
|
client = self.connections[user_id]
|
|
try:
|
|
await client.websocket.send_text(event.to_json())
|
|
client.last_activity = datetime.utcnow()
|
|
logger.debug(f"Sent event {event.event} to user {user_id}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to send event to user {user_id}: {e}")
|
|
await self.disconnect(user_id)
|
|
else:
|
|
# Queue for offline delivery
|
|
if user_id not in self.offline_events:
|
|
self.offline_events[user_id] = []
|
|
self.offline_events[user_id].append(event)
|
|
logger.debug(f"Queued event {event.event} for offline user {user_id}")
|
|
|
|
async def send_to_room(self, room: str, event: WebSocketEvent, exclude_user: Optional[int] = None):
|
|
"""Send event to all users in a room"""
|
|
if room not in self.rooms:
|
|
return
|
|
|
|
event.room = room
|
|
tasks = []
|
|
|
|
for user_id in self.rooms[room].copy(): # Copy to avoid modification during iteration
|
|
if exclude_user and user_id == exclude_user:
|
|
continue
|
|
|
|
tasks.append(self.send_to_user(user_id, event))
|
|
|
|
if tasks:
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|
logger.debug(f"Sent event {event.event} to room {room} ({len(tasks)} users)")
|
|
|
|
async def broadcast_to_household(self, household_id: int, event: WebSocketEvent, exclude_user: Optional[int] = None):
|
|
"""Send event to all members of a household"""
|
|
room = f"household:{household_id}"
|
|
await self.send_to_room(room, event, exclude_user)
|
|
|
|
async def broadcast_to_list(self, list_id: int, event: WebSocketEvent, exclude_user: Optional[int] = None):
|
|
"""Send event to all users watching a specific list"""
|
|
room = f"list:{list_id}"
|
|
await self.send_to_room(room, event, exclude_user)
|
|
|
|
async def broadcast_to_expense(self, expense_id: int, event: WebSocketEvent, exclude_user: Optional[int] = None):
|
|
"""Send event to all users involved in an expense"""
|
|
room = f"expense:{expense_id}"
|
|
await self.send_to_room(room, event, exclude_user)
|
|
|
|
async def _send_offline_events(self, user_id: int):
|
|
"""Send queued events to a user who just connected"""
|
|
if user_id not in self.offline_events:
|
|
return
|
|
|
|
events = self.offline_events[user_id]
|
|
for event in events:
|
|
await self.send_to_user(user_id, event)
|
|
|
|
# Clear the queue
|
|
del self.offline_events[user_id]
|
|
logger.info(f"Sent {len(events)} offline events to user {user_id}")
|
|
|
|
def get_room_members(self, room: str) -> List[int]:
|
|
"""Get list of user IDs in a room"""
|
|
return list(self.rooms.get(room, set()))
|
|
|
|
def get_connected_users(self) -> List[int]:
|
|
"""Get list of all connected user IDs"""
|
|
return list(self.connections.keys())
|
|
|
|
def is_user_online(self, user_id: int) -> bool:
|
|
"""Check if a user is currently connected"""
|
|
return user_id in self.connections
|
|
|
|
async def ping_all(self):
|
|
"""Send ping to all connected users to keep connections alive"""
|
|
ping_event = WebSocketEvent(
|
|
event="ping",
|
|
payload={"timestamp": datetime.utcnow().isoformat()},
|
|
timestamp=datetime.utcnow().isoformat()
|
|
)
|
|
|
|
tasks = []
|
|
for user_id in list(self.connections.keys()):
|
|
tasks.append(self.send_to_user(user_id, ping_event))
|
|
|
|
if tasks:
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
# Global WebSocket manager instance
|
|
websocket_manager = WebSocketManager()
|
|
|
|
# Utility functions for creating common events
|
|
def create_expense_event(event_type: str, payload: Dict[str, Any], user_id: Optional[int] = None) -> WebSocketEvent:
|
|
"""Create a standardized expense-related WebSocket event"""
|
|
return WebSocketEvent(
|
|
event=event_type,
|
|
payload=payload,
|
|
timestamp=datetime.utcnow().isoformat(),
|
|
user_id=user_id
|
|
)
|
|
|
|
def create_chore_event(event_type: str, payload: Dict[str, Any], user_id: Optional[int] = None) -> WebSocketEvent:
|
|
"""Create a standardized chore-related WebSocket event"""
|
|
return WebSocketEvent(
|
|
event=event_type,
|
|
payload=payload,
|
|
timestamp=datetime.utcnow().isoformat(),
|
|
user_id=user_id
|
|
)
|
|
|
|
def create_group_event(event_type: str, payload: Dict[str, Any], user_id: Optional[int] = None) -> WebSocketEvent:
|
|
"""Create a standardized group-related WebSocket event"""
|
|
return WebSocketEvent(
|
|
event=event_type,
|
|
payload=payload,
|
|
timestamp=datetime.utcnow().isoformat(),
|
|
user_id=user_id
|
|
)
|
|
|
|
def create_list_event(event_type: str, payload: Dict[str, Any], user_id: Optional[int] = None) -> WebSocketEvent:
|
|
"""Create a standardized list-related WebSocket event"""
|
|
return WebSocketEvent(
|
|
event=event_type,
|
|
payload=payload,
|
|
timestamp=datetime.utcnow().isoformat(),
|
|
user_id=user_id
|
|
) |