diff --git a/be/app/api/v1/endpoints/items.py b/be/app/api/v1/endpoints/items.py index eae8b2f..ec554da 100644 --- a/be/app/api/v1/endpoints/items.py +++ b/be/app/api/v1/endpoints/items.py @@ -243,4 +243,41 @@ async def unclaim_item( } await broadcast_event(f"list_{updated_item.list_id}", event) - return updated_item \ No newline at end of file + return updated_item + + +@router.put( + "/lists/{list_id}/items/reorder", + status_code=status.HTTP_204_NO_CONTENT, + summary="Reorder Items in List", + tags=["Items"] +) +async def reorder_list_items( + list_id: int, + ordered_ids: PyList[int], + db: AsyncSession = Depends(get_transactional_session), + current_user: UserModel = Depends(current_active_user), +): + """Reorders items in a list based on the provided ordered list of item IDs.""" + user_email = current_user.email + logger.info(f"User {user_email} reordering items in list {list_id}: {ordered_ids}") + + try: + await crud_list.check_list_permission(db=db, list_id=list_id, user_id=current_user.id) + except ListPermissionError as e: + raise ListPermissionError(list_id, "reorder items in this list") + + await crud_item.reorder_items(db=db, list_id=list_id, ordered_ids=ordered_ids) + + # Broadcast the reorder event + event = { + "type": "item_reordered", + "payload": { + "list_id": list_id, + "ordered_ids": ordered_ids + } + } + await broadcast_event(f"list_{list_id}", event) + + logger.info(f"Items in list {list_id} reordered successfully by user {user_email}.") + return Response(status_code=status.HTTP_204_NO_CONTENT) \ No newline at end of file diff --git a/be/app/api/v1/endpoints/websocket.py b/be/app/api/v1/endpoints/websocket.py index d2a31cd..98cc4be 100644 --- a/be/app/api/v1/endpoints/websocket.py +++ b/be/app/api/v1/endpoints/websocket.py @@ -1,79 +1,280 @@ from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query, status, Depends from sqlalchemy.ext.asyncio import AsyncSession -from app.auth import get_jwt_strategy, get_user_from_token +from app.auth import get_user_from_token from app.database import get_transactional_session -from app.crud import list as crud_list -from app.core.redis import subscribe_to_channel, unsubscribe_from_channel +from app.crud import list as crud_list, group as crud_group +from app.core.websocket import websocket_manager, WebSocketEvent from app.models import User import asyncio +import json +import logging router = APIRouter() +logger = logging.getLogger(__name__) + +@router.websocket("/ws/household/{household_id}") +async def household_websocket_endpoint( + websocket: WebSocket, + household_id: int, + token: str = Query(...), + db: AsyncSession = Depends(get_transactional_session), +): + """ + Main WebSocket endpoint for household-wide real-time features + Handles: chores, expenses, general household activity, member presence + """ + user = await get_user_from_token(token, db) + if not user: + await websocket.close(code=status.WS_1008_POLICY_VIOLATION) + return -async def _verify_jwt(token: str): - """Return user_id from JWT or None if invalid/expired.""" - strategy = get_jwt_strategy() try: - # FastAPI Users' JWTStrategy.read_token returns the user ID encoded in the token - user_id = await strategy.read_token(token) - return user_id - except Exception: # pragma: no cover – any decoding/expiration error - return None + # Verify user is member of this household + household = await crud_group.get_group_with_user_check(db, household_id, user.id) + if not household: + await websocket.close(code=status.WS_1003_UNSUPPORTED_DATA) + return + except Exception as e: + logger.error(f"Household permission check failed: {e}") + await websocket.close(code=status.WS_1003_UNSUPPORTED_DATA) + return + # Connect to WebSocket manager + connected = await websocket_manager.connect(websocket, user.id, user.name or f"User {user.id}") + if not connected: + return -@router.websocket("/ws/lists/{list_id}") + # Subscribe to household room + household_room = f"household:{household_id}" + websocket_manager.join_room(user.id, household_room) + + # Notify other members that user came online + member_activity_event = WebSocketEvent( + event="group:member_activity", + payload={ + "user_id": user.id, + "last_seen": user.created_at.isoformat(), # Use created_at as placeholder + "is_active": True, + "action": "connected" + }, + timestamp=websocket_manager.connections[user.id].connected_at.isoformat(), + user_id=user.id + ) + await websocket_manager.send_to_room(household_room, member_activity_event, exclude_user=user.id) + + try: + # Send welcome message + welcome_event = WebSocketEvent( + event="connection:established", + payload={ + "message": f"Connected to household {household.name}", + "household_id": household_id, + "rooms": [household_room] + }, + timestamp=websocket_manager.connections[user.id].connected_at.isoformat() + ) + await websocket_manager.send_to_user(user.id, welcome_event) + + # Keep connection alive and handle incoming messages + while True: + try: + # Wait for incoming messages + message = await websocket.receive_text() + data = json.loads(message) + + # Handle different message types + await handle_websocket_message(user.id, household_id, data, db) + + except WebSocketDisconnect: + break + except json.JSONDecodeError: + logger.warning(f"Invalid JSON from user {user.id}") + except Exception as e: + logger.error(f"Error handling WebSocket message from user {user.id}: {e}") + + except WebSocketDisconnect: + pass + except Exception as e: + logger.error(f"WebSocket error for user {user.id} in household {household_id}: {e}") + finally: + # Notify other members that user went offline + offline_event = WebSocketEvent( + event="group:member_activity", + payload={ + "user_id": user.id, + "last_seen": websocket_manager.connections[user.id].last_activity.isoformat() if user.id in websocket_manager.connections else None, + "is_active": False, + "action": "disconnected" + }, + timestamp=websocket_manager.connections[user.id].last_activity.isoformat() if user.id in websocket_manager.connections else None, + user_id=user.id + ) + await websocket_manager.send_to_room(household_room, offline_event, exclude_user=user.id) + + # Disconnect from WebSocket manager + await websocket_manager.disconnect(user.id) + +@router.websocket("/ws/list/{list_id}") async def list_websocket_endpoint( websocket: WebSocket, list_id: int, token: str = Query(...), db: AsyncSession = Depends(get_transactional_session), ): - """Authenticated WebSocket endpoint for a specific list.""" + """ + WebSocket endpoint for list-specific real-time features + Handles: item claiming, real-time collaboration, purchase confirmations + """ user = await get_user_from_token(token, db) if not user: await websocket.close(code=status.WS_1008_POLICY_VIOLATION) return try: - # Verify the user has access to this list's group + # Verify user has access to this list await crud_list.check_list_permission(db, list_id, user.id) - except Exception: + except Exception as e: + logger.error(f"List permission check failed: {e}") await websocket.close(code=status.WS_1003_UNSUPPORTED_DATA) return - await websocket.accept() - - # Temporary: Test connection without Redis - try: - print(f"WebSocket connected for list {list_id}, user {user.id}") - # Send a test message - await websocket.send_text('{"event": "connected", "payload": {"message": "WebSocket connected successfully"}}') - - # Keep connection alive - while True: - await asyncio.sleep(10) - # Send periodic ping to keep connection alive - await websocket.send_text('{"event": "ping", "payload": {}}') - except WebSocketDisconnect: - print(f"WebSocket disconnected for list {list_id}, user {user.id}") - pass - except Exception as e: - print(f"WebSocket error for list {list_id}, user {user.id}: {e}") - pass + # Connect to WebSocket manager (reuse existing connection if available) + if user.id not in websocket_manager.connections: + connected = await websocket_manager.connect(websocket, user.id, user.name or f"User {user.id}") + if not connected: + return + else: + # Use existing connection but accept this websocket for this specific list + await websocket.accept() -@router.websocket("/ws/{household_id}") -async def household_websocket_endpoint( - websocket: WebSocket, - household_id: int, - token: str = Query(...), - db: AsyncSession = Depends(get_transactional_session) -): - """Authenticated WebSocket endpoint for a household.""" - user = await get_user_from_token(token, db) - if not user: - await websocket.close(code=status.WS_1008_POLICY_VIOLATION) - return - - # TODO: Add permission check for household - - await websocket.accept() - # ... rest of household logic \ No newline at end of file + # Subscribe to list-specific room + list_room = f"list:{list_id}" + websocket_manager.join_room(user.id, list_room) + + try: + # Send list connection confirmation + list_welcome = WebSocketEvent( + event="list:connected", + payload={ + "message": f"Connected to list {list_id}", + "list_id": list_id, + "active_users": websocket_manager.get_room_members(list_room) + }, + timestamp=websocket_manager.connections[user.id].last_activity.isoformat() + ) + await websocket_manager.send_to_user(user.id, list_welcome) + + # Keep connection alive for list-specific events + while True: + try: + # For list-specific endpoint, we primarily listen for events + # Most interactions will be handled through the main household WebSocket + await asyncio.sleep(30) # Periodic check + + # Send periodic ping to keep list connection active + ping_event = WebSocketEvent( + event="list:ping", + payload={"list_id": list_id}, + timestamp=websocket_manager.connections[user_id].last_activity.isoformat() if user.id in websocket_manager.connections else None + ) + await websocket_manager.send_to_user(user.id, ping_event) + + except WebSocketDisconnect: + break + except Exception as e: + logger.error(f"List WebSocket error for user {user.id}: {e}") + break + + except WebSocketDisconnect: + pass + finally: + # Leave list room but don't disconnect main connection + websocket_manager.leave_room(user.id, list_room) + +async def handle_websocket_message(user_id: int, household_id: int, data: dict, db: AsyncSession): + """ + Handle incoming WebSocket messages from clients + Supports editing indicators, presence updates, etc. + """ + message_type = data.get("type") + payload = data.get("payload", {}) + + if message_type == "editing_started": + # User started editing an entity + entity_type = payload.get("entity_type") + entity_id = payload.get("entity_id") + field = payload.get("field") + + if entity_type and entity_id: + event = WebSocketEvent( + event=f"{entity_type}:editing_started", + payload={ + f"{entity_type}_id": entity_id, + "user_id": user_id, + "user_name": websocket_manager.connections[user_id].user_name, + "field": field + }, + timestamp=websocket_manager.connections[user_id].last_activity.isoformat(), + user_id=user_id + ) + + # Send to appropriate room + if entity_type == "expense": + await websocket_manager.broadcast_to_expense(entity_id, event, exclude_user=user_id) + elif entity_type == "chore": + await websocket_manager.broadcast_to_household(household_id, event, exclude_user=user_id) + elif entity_type == "list": + await websocket_manager.broadcast_to_list(entity_id, event, exclude_user=user_id) + + elif message_type == "editing_stopped": + # User stopped editing an entity + entity_type = payload.get("entity_type") + entity_id = payload.get("entity_id") + + if entity_type and entity_id: + event = WebSocketEvent( + event=f"{entity_type}:editing_stopped", + payload={f"{entity_type}_id": entity_id}, + timestamp=websocket_manager.connections[user_id].last_activity.isoformat(), + user_id=user_id + ) + + # Send to appropriate room + if entity_type == "expense": + await websocket_manager.broadcast_to_expense(entity_id, event, exclude_user=user_id) + elif entity_type == "chore": + await websocket_manager.broadcast_to_household(household_id, event, exclude_user=user_id) + elif entity_type == "list": + await websocket_manager.broadcast_to_list(entity_id, event, exclude_user=user_id) + + elif message_type == "presence_update": + # User presence/activity update + action = payload.get("action", "active") + + event = WebSocketEvent( + event="group:member_activity", + payload={ + "user_id": user_id, + "last_seen": websocket_manager.connections[user_id].last_activity.isoformat(), + "is_active": True, + "action": action + }, + timestamp=websocket_manager.connections[user_id].last_activity.isoformat(), + user_id=user_id + ) + + await websocket_manager.broadcast_to_household(household_id, event, exclude_user=user_id) + + else: + logger.warning(f"Unknown WebSocket message type: {message_type} from user {user_id}") + +# Utility endpoint to get current WebSocket status +@router.get("/ws/status") +async def websocket_status(): + """Get current WebSocket connection status""" + return { + "connected_users": websocket_manager.get_connected_users(), + "total_connections": len(websocket_manager.connections), + "total_rooms": len(websocket_manager.rooms), + "rooms": {room: len(users) for room, users in websocket_manager.rooms.items()} + } \ No newline at end of file diff --git a/be/app/auth.py b/be/app/auth.py index 31ad6c4..7497e3d 100644 --- a/be/app/auth.py +++ b/be/app/auth.py @@ -183,18 +183,28 @@ async def get_user_from_token(token: str, db: AsyncSession) -> Optional[User]: strategy = get_jwt_strategy() + print(f"Attempting to decode token: {token[:50]}...") try: - user_id = await strategy.read_token(token) + user_id_raw = await strategy.read_token(token) + print(f"Raw user_id from token: {user_id_raw} (type: {type(user_id_raw)})") + # FastAPI Users may return the user ID as a string – convert to int for DB lookup + user_id = int(user_id_raw) + print(f"Converted user_id: {user_id}") except Exception: - # Any decoding/parsing/expiry error – treat as invalid token + # Any decoding/parsing/expiry error or malformed data – treat as invalid token + print("Token decode failed") return None # Fetch the user from the database. We avoid failing hard – return ``None`` # if the user does not exist or is inactive/deleted. + print(f"Querying database for user_id: {user_id}") result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() + print(f"Database query result: {user}") if user is None or getattr(user, "is_deleted", False) or not user.is_active: + print(f"User validation failed - user: {user}, is_deleted: {getattr(user, 'is_deleted', False) if user else 'N/A'}, is_active: {user.is_active if user else 'N/A'}") return None + print(f"Successfully authenticated user: {user.id}") return user \ No newline at end of file diff --git a/be/app/core/websocket.py b/be/app/core/websocket.py new file mode 100644 index 0000000..a257c3c --- /dev/null +++ b/be/app/core/websocket.py @@ -0,0 +1,263 @@ +# be/app/core/websocket.py +# Real-time WebSocket Manager - Unified event broadcasting system + +import json +import asyncio +from typing import Dict, List, Set, Any, Optional, Union +from datetime import datetime +from fastapi import WebSocket, WebSocketDisconnect +from dataclasses import dataclass, asdict +import logging + +logger = logging.getLogger(__name__) + +@dataclass +class WebSocketEvent: + """Unified WebSocket event structure""" + event: str + payload: Any + timestamp: str + user_id: Optional[int] = None + room: Optional[str] = None + + def to_json(self) -> str: + return json.dumps(asdict(self)) + +@dataclass +class ConnectedClient: + """Represents a connected WebSocket client""" + websocket: WebSocket + user_id: int + user_name: str + rooms: Set[str] + connected_at: datetime + last_activity: datetime + +class WebSocketManager: + """ + Manages WebSocket connections, rooms, and event broadcasting + Supports household-wide and entity-specific room subscriptions + """ + + def __init__(self): + # Active connections by user_id + self.connections: Dict[int, ConnectedClient] = {} + + # Room subscriptions: room_name -> set of user_ids + self.rooms: Dict[str, Set[int]] = {} + + # Event queue for offline users + self.offline_events: Dict[int, List[WebSocketEvent]] = {} + + async def connect(self, websocket: WebSocket, user_id: int, user_name: str) -> bool: + """Connect a user and store their WebSocket connection""" + try: + await websocket.accept() + + # Remove existing connection if any + if user_id in self.connections: + await self.disconnect(user_id) + + client = ConnectedClient( + websocket=websocket, + user_id=user_id, + user_name=user_name, + rooms=set(), + connected_at=datetime.utcnow(), + last_activity=datetime.utcnow() + ) + + self.connections[user_id] = client + + # Send any queued offline events + await self._send_offline_events(user_id) + + logger.info(f"WebSocket connected: user {user_id} ({user_name})") + return True + + except Exception as e: + logger.error(f"WebSocket connection failed for user {user_id}: {e}") + return False + + async def disconnect(self, user_id: int): + """Disconnect a user and clean up their subscriptions""" + if user_id not in self.connections: + return + + client = self.connections[user_id] + + # Remove from all rooms + for room in list(client.rooms): + self.leave_room(user_id, room) + + # Close WebSocket connection + try: + await client.websocket.close() + except Exception: + pass + + # Remove from connections + del self.connections[user_id] + + logger.info(f"WebSocket disconnected: user {user_id}") + + def join_room(self, user_id: int, room: str): + """Subscribe user to a room for targeted events""" + if user_id not in self.connections: + return False + + # Add to room + if room not in self.rooms: + self.rooms[room] = set() + self.rooms[room].add(user_id) + + # Update client rooms + self.connections[user_id].rooms.add(room) + + logger.debug(f"User {user_id} joined room: {room}") + return True + + def leave_room(self, user_id: int, room: str): + """Unsubscribe user from a room""" + if room in self.rooms: + self.rooms[room].discard(user_id) + + # Clean up empty rooms + if not self.rooms[room]: + del self.rooms[room] + + if user_id in self.connections: + self.connections[user_id].rooms.discard(room) + + logger.debug(f"User {user_id} left room: {room}") + + async def send_to_user(self, user_id: int, event: WebSocketEvent): + """Send event to a specific user""" + if user_id in self.connections: + client = self.connections[user_id] + try: + await client.websocket.send_text(event.to_json()) + client.last_activity = datetime.utcnow() + logger.debug(f"Sent event {event.event} to user {user_id}") + except Exception as e: + logger.error(f"Failed to send event to user {user_id}: {e}") + await self.disconnect(user_id) + else: + # Queue for offline delivery + if user_id not in self.offline_events: + self.offline_events[user_id] = [] + self.offline_events[user_id].append(event) + logger.debug(f"Queued event {event.event} for offline user {user_id}") + + async def send_to_room(self, room: str, event: WebSocketEvent, exclude_user: Optional[int] = None): + """Send event to all users in a room""" + if room not in self.rooms: + return + + event.room = room + tasks = [] + + for user_id in self.rooms[room].copy(): # Copy to avoid modification during iteration + if exclude_user and user_id == exclude_user: + continue + + tasks.append(self.send_to_user(user_id, event)) + + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + logger.debug(f"Sent event {event.event} to room {room} ({len(tasks)} users)") + + async def broadcast_to_household(self, household_id: int, event: WebSocketEvent, exclude_user: Optional[int] = None): + """Send event to all members of a household""" + room = f"household:{household_id}" + await self.send_to_room(room, event, exclude_user) + + async def broadcast_to_list(self, list_id: int, event: WebSocketEvent, exclude_user: Optional[int] = None): + """Send event to all users watching a specific list""" + room = f"list:{list_id}" + await self.send_to_room(room, event, exclude_user) + + async def broadcast_to_expense(self, expense_id: int, event: WebSocketEvent, exclude_user: Optional[int] = None): + """Send event to all users involved in an expense""" + room = f"expense:{expense_id}" + await self.send_to_room(room, event, exclude_user) + + async def _send_offline_events(self, user_id: int): + """Send queued events to a user who just connected""" + if user_id not in self.offline_events: + return + + events = self.offline_events[user_id] + for event in events: + await self.send_to_user(user_id, event) + + # Clear the queue + del self.offline_events[user_id] + logger.info(f"Sent {len(events)} offline events to user {user_id}") + + def get_room_members(self, room: str) -> List[int]: + """Get list of user IDs in a room""" + return list(self.rooms.get(room, set())) + + def get_connected_users(self) -> List[int]: + """Get list of all connected user IDs""" + return list(self.connections.keys()) + + def is_user_online(self, user_id: int) -> bool: + """Check if a user is currently connected""" + return user_id in self.connections + + async def ping_all(self): + """Send ping to all connected users to keep connections alive""" + ping_event = WebSocketEvent( + event="ping", + payload={"timestamp": datetime.utcnow().isoformat()}, + timestamp=datetime.utcnow().isoformat() + ) + + tasks = [] + for user_id in list(self.connections.keys()): + tasks.append(self.send_to_user(user_id, ping_event)) + + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + +# Global WebSocket manager instance +websocket_manager = WebSocketManager() + +# Utility functions for creating common events +def create_expense_event(event_type: str, payload: Dict[str, Any], user_id: Optional[int] = None) -> WebSocketEvent: + """Create a standardized expense-related WebSocket event""" + return WebSocketEvent( + event=event_type, + payload=payload, + timestamp=datetime.utcnow().isoformat(), + user_id=user_id + ) + +def create_chore_event(event_type: str, payload: Dict[str, Any], user_id: Optional[int] = None) -> WebSocketEvent: + """Create a standardized chore-related WebSocket event""" + return WebSocketEvent( + event=event_type, + payload=payload, + timestamp=datetime.utcnow().isoformat(), + user_id=user_id + ) + +def create_group_event(event_type: str, payload: Dict[str, Any], user_id: Optional[int] = None) -> WebSocketEvent: + """Create a standardized group-related WebSocket event""" + return WebSocketEvent( + event=event_type, + payload=payload, + timestamp=datetime.utcnow().isoformat(), + user_id=user_id + ) + +def create_list_event(event_type: str, payload: Dict[str, Any], user_id: Optional[int] = None) -> WebSocketEvent: + """Create a standardized list-related WebSocket event""" + return WebSocketEvent( + event=event_type, + payload=payload, + timestamp=datetime.utcnow().isoformat(), + user_id=user_id + ) \ No newline at end of file diff --git a/be/app/crud/chore.py b/be/app/crud/chore.py index f260b45..bd989c7 100644 --- a/be/app/crud/chore.py +++ b/be/app/crud/chore.py @@ -12,6 +12,7 @@ from app.core.chore_utils import calculate_next_due_date from app.crud.group import get_group_by_id, is_user_member, get_user_role_in_group from app.crud.history import create_chore_history_entry, create_assignment_history_entry from app.core.exceptions import ChoreNotFoundError, GroupNotFoundError, PermissionDeniedError, DatabaseIntegrityError +from app.core.websocket import websocket_manager, create_chore_event logger = logging.getLogger(__name__) @@ -157,7 +158,17 @@ async def create_chore( selectinload(Chore.child_chores) ) ) - return result.scalar_one() + loaded_chore = result.scalar_one() + + # Broadcast chore creation event via WebSocket + await _broadcast_chore_event( + "chore:created", + loaded_chore, + user_id, + exclude_user=user_id + ) + + return loaded_chore except Exception as e: logger.error(f"Error creating chore: {e}", exc_info=True) raise DatabaseIntegrityError(f"Could not create chore. Error: {str(e)}") @@ -326,7 +337,17 @@ async def update_chore( .where(Chore.id == db_chore.id) .options(*get_chore_loader_options()) ) - return result.scalar_one() + updated_chore = result.scalar_one() + + # Broadcast chore update event via WebSocket + await _broadcast_chore_event( + "chore:updated", + updated_chore, + user_id, + exclude_user=user_id + ) + + return updated_chore except Exception as e: logger.error(f"Error updating chore {chore_id}: {e}", exc_info=True) raise DatabaseIntegrityError(f"Could not update chore {chore_id}. Error: {str(e)}") @@ -352,6 +373,14 @@ async def delete_chore( event_data={"chore_name": db_chore.name} ) + # Broadcast chore deletion event via WebSocket (before actual deletion) + await _broadcast_chore_event( + "chore:deleted", + db_chore, + user_id, + exclude_user=user_id + ) + if db_chore.type == ChoreTypeEnum.group: if not group_id: raise ValueError("group_id is required for group chores") @@ -413,7 +442,18 @@ async def create_chore_assignment( .where(ChoreAssignment.id == db_assignment.id) .options(*get_assignment_loader_options()) ) - return result.scalar_one() + loaded_assignment = result.scalar_one() + + # Broadcast chore assignment event via WebSocket + await _broadcast_chore_assignment_event( + "chore:assigned", + loaded_assignment, + chore, + user_id, + exclude_user=user_id + ) + + return loaded_assignment except Exception as e: logger.error(f"Error creating chore assignment: {e}", exc_info=True) raise DatabaseIntegrityError(f"Could not create chore assignment. Error: {str(e)}") @@ -533,7 +573,19 @@ async def update_chore_assignment( .where(ChoreAssignment.id == assignment_id) .options(*get_assignment_loader_options()) ) - return result.scalar_one() + updated_assignment = result.scalar_one() + + # Broadcast assignment update event via WebSocket + event_type = "chore:completed" if updated_assignment.is_complete and not original_status else "chore:assignment_updated" + await _broadcast_chore_assignment_event( + event_type, + updated_assignment, + updated_assignment.chore, + user_id, + exclude_user=user_id + ) + + return updated_assignment except Exception as e: logger.error(f"Error updating assignment: {e}", exc_info=True) await db.rollback() @@ -598,3 +650,91 @@ def get_assignment_loader_options(): selectinload(ChoreAssignment.history).selectinload(ChoreAssignmentHistory.changed_by_user), selectinload(ChoreAssignment.chore).options(*get_chore_loader_options()) ] + + +# WebSocket Broadcasting Helper Functions + +async def _broadcast_chore_event( + event_type: str, + chore: Chore, + user_id: int, + exclude_user: Optional[int] = None +): + """ + Broadcast chore-related events to relevant rooms + Sends to household rooms for group chores, or user-specific events for personal chores + """ + try: + # Prepare event payload + event_payload = { + "chore_id": chore.id, + "name": chore.name, + "description": chore.description, + "type": chore.type.value if chore.type else None, + "frequency": chore.frequency.value if chore.frequency else None, + "next_due_date": chore.next_due_date.isoformat() if chore.next_due_date else None, + "created_by_id": chore.created_by_id, + "group_id": chore.group_id, + "created_at": chore.created_at.isoformat() if chore.created_at else None, + "updated_at": chore.updated_at.isoformat() if chore.updated_at else None, + } + + # Create the WebSocket event + event = create_chore_event(event_type, event_payload, user_id) + + # Broadcast to household if chore belongs to a group + if chore.group_id: + await websocket_manager.broadcast_to_household( + chore.group_id, + event, + exclude_user=exclude_user + ) + else: + # For personal chores, send only to the creator + await websocket_manager.send_to_user(chore.created_by_id, event) + + logger.debug(f"Broadcasted {event_type} event for chore {chore.id}") + + except Exception as e: + # Don't fail the CRUD operation if WebSocket broadcast fails + logger.error(f"Failed to broadcast chore event {event_type} for chore {chore.id}: {e}") + + +async def _broadcast_chore_assignment_event( + event_type: str, + assignment: ChoreAssignment, + chore: Chore, + user_id: int, + exclude_user: Optional[int] = None +): + """ + Broadcast chore assignment-related events for task management + """ + try: + event_payload = { + "assignment_id": assignment.id, + "chore_id": assignment.chore_id, + "chore_name": chore.name, + "assigned_to_user_id": assignment.assigned_to_user_id, + "due_date": assignment.due_date.isoformat() if assignment.due_date else None, + "is_complete": assignment.is_complete, + "completed_at": assignment.completed_at.isoformat() if assignment.completed_at else None, + "group_id": chore.group_id, + } + + event = create_chore_event(event_type, event_payload, user_id) + + # Broadcast to household if chore belongs to a group + if chore.group_id: + await websocket_manager.broadcast_to_household(chore.group_id, event, exclude_user) + else: + # For personal chores, send to creator and assigned user + users_to_notify = {chore.created_by_id, assignment.assigned_to_user_id} + for notify_user_id in users_to_notify: + if exclude_user is None or notify_user_id != exclude_user: + await websocket_manager.send_to_user(notify_user_id, event) + + logger.debug(f"Broadcasted {event_type} event for assignment {assignment.id}") + + except Exception as e: + logger.error(f"Failed to broadcast assignment event {event_type}: {e}") diff --git a/be/app/crud/expense.py b/be/app/crud/expense.py index 1cf8c1a..435c953 100644 --- a/be/app/crud/expense.py +++ b/be/app/crud/expense.py @@ -37,6 +37,8 @@ from app.core.exceptions import ( ) from app.models import RecurrencePattern from app.crud.audit import create_financial_audit_log +# Add WebSocket imports +from app.core.websocket import websocket_manager, create_expense_event # Placeholder for InvalidOperationError if not defined in app.core.exceptions # This should be a proper HTTPException subclass if used in API layer @@ -238,6 +240,14 @@ async def create_expense(db: AsyncSession, expense_in: ExpenseCreate, current_us entity=loaded_expense, ) + # Broadcast expense creation event via WebSocket + await _broadcast_expense_event( + "expense:created", + loaded_expense, + current_user_id, + exclude_user=current_user_id + ) + # await transaction.commit() # Explicit commit removed, context manager handles it. return loaded_expense @@ -718,6 +728,14 @@ async def update_expense(db: AsyncSession, expense_db: ExpenseModel, expense_in: details={"before": before_state, "after": after_state} ) + # Broadcast expense update event via WebSocket + await _broadcast_expense_event( + "expense:updated", + expense_db, + current_user_id, + exclude_user=current_user_id + ) + await db.refresh(expense_db) return expense_db except IntegrityError as e: @@ -757,6 +775,14 @@ async def delete_expense(db: AsyncSession, expense_db: ExpenseModel, current_use details=details ) + # Broadcast expense deletion event via WebSocket (before actual deletion) + await _broadcast_expense_event( + "expense:deleted", + expense_db, + current_user_id, + exclude_user=current_user_id + ) + # Manually delete related records in correct order to avoid foreign key constraint issues from app.models import ExpenseSplit as ExpenseSplitModel, SettlementActivity as SettlementActivityModel @@ -788,8 +814,95 @@ async def delete_expense(db: AsyncSession, expense_db: ExpenseModel, current_use except SQLAlchemyError as e: logger.error(f"Database transaction error during expense deletion for ID {expense_id_for_log}: {str(e)}", exc_info=True) raise DatabaseTransactionError(f"Failed to delete expense ID {expense_id_for_log} due to a database transaction error.") from e - return None -# Note: The InvalidOperationError is a simple ValueError placeholder. -# For API endpoints, these should be translated to appropriate HTTPExceptions. -# Ensure app.core.exceptions has proper HTTP error classes if needed. \ No newline at end of file + +# WebSocket Broadcasting Helper Functions + +async def _broadcast_expense_event( + event_type: str, + expense: ExpenseModel, + user_id: int, + exclude_user: Optional[int] = None +): + """ + Broadcast expense-related events to relevant rooms + Sends to household, list, and expense-specific rooms as appropriate + """ + try: + # Prepare event payload + event_payload = { + "expense_id": expense.id, + "description": expense.description, + "total_amount": str(expense.total_amount), + "currency": expense.currency, + "paid_by_user_id": expense.paid_by_user_id, + "version": expense.version, + "updated_at": expense.updated_at.isoformat() if expense.updated_at else None, + } + + # Create the WebSocket event + event = create_expense_event(event_type, event_payload, user_id) + + # Broadcast to household if expense is linked to a group + if expense.group_id: + await websocket_manager.broadcast_to_household( + expense.group_id, + event, + exclude_user=exclude_user + ) + + # Broadcast to list if expense is linked to a specific list + if expense.list_id: + await websocket_manager.broadcast_to_list( + expense.list_id, + event, + exclude_user=exclude_user + ) + + # Broadcast to expense-specific room for splits and settlements + await websocket_manager.broadcast_to_expense( + expense.id, + event, + exclude_user=exclude_user + ) + + logger.debug(f"Broadcasted {event_type} event for expense {expense.id}") + + except Exception as e: + # Don't fail the CRUD operation if WebSocket broadcast fails + logger.error(f"Failed to broadcast expense event {event_type} for expense {expense.id}: {e}") + + +async def _broadcast_settlement_event( + event_type: str, + settlement_activity: Any, # SettlementActivity model + expense: ExpenseModel, + user_id: int, + exclude_user: Optional[int] = None +): + """ + Broadcast settlement-related events for expense splits + """ + try: + event_payload = { + "settlement_id": settlement_activity.id, + "expense_id": expense.id, + "expense_split_id": settlement_activity.expense_split_id, + "settled_amount": str(settlement_activity.amount_paid), + "settled_by_user_id": settlement_activity.settled_by_user_id, + "settlement_date": settlement_activity.settlement_date.isoformat(), + } + + event = create_expense_event(event_type, event_payload, user_id) + + # Broadcast to all relevant rooms + if expense.group_id: + await websocket_manager.broadcast_to_household(expense.group_id, event, exclude_user) + if expense.list_id: + await websocket_manager.broadcast_to_list(expense.list_id, event, exclude_user) + await websocket_manager.broadcast_to_expense(expense.id, event, exclude_user) + + logger.debug(f"Broadcasted {event_type} event for settlement {settlement_activity.id}") + + except Exception as e: + logger.error(f"Failed to broadcast settlement event {event_type}: {e}") \ No newline at end of file diff --git a/be/app/crud/group.py b/be/app/crud/group.py index 08848c3..3b914d8 100644 --- a/be/app/crud/group.py +++ b/be/app/crud/group.py @@ -23,6 +23,8 @@ from app.core.exceptions import ( InvalidOperationError ) from app.core.cache import cache +# Add WebSocket imports +from app.core.websocket import websocket_manager, create_group_event logger = logging.getLogger(__name__) # Initialize logger @@ -61,6 +63,14 @@ async def create_group(db: AsyncSession, group_in: GroupCreate, creator_id: int) # This should not happen if we just created it, but as a safeguard raise GroupOperationError("Failed to load group after creation.") + # Broadcast group creation event via WebSocket (to creator only initially) + await _broadcast_group_event( + "group:created", + loaded_group, + creator_id, + exclude_user=None # Send to creator + ) + return loaded_group except IntegrityError as e: logger.error(f"Database integrity error during group creation: {str(e)}", exc_info=True) @@ -169,6 +179,15 @@ async def add_user_to_group(db: AsyncSession, group_id: int, user_id: int, role: if loaded_user_group is None: raise GroupOperationError(f"Failed to load user group association after adding user {user_id} to group {group_id}.") + # Broadcast member joined event via WebSocket + await _broadcast_member_event( + "group:member_joined", + loaded_user_group.group, + loaded_user_group.user, + user_id, + exclude_user=user_id # Don't send to the joining user + ) + return loaded_user_group except IntegrityError as e: logger.error(f"Database integrity error while adding user to group: {str(e)}", exc_info=True) @@ -184,6 +203,15 @@ async def remove_user_from_group(db: AsyncSession, group_id: int, user_id: int) """Removes a user from a group.""" try: async with db.begin_nested() if db.in_transaction() else db.begin(): + # Get user info before deletion for WebSocket broadcast + user_to_remove = None + if user_id: + user_result = await db.execute( + select(UserModel) + .where(UserModel.id == user_id) + ) + user_to_remove = user_result.scalar_one_or_none() + result = await db.execute( delete(UserGroupModel) .where(UserGroupModel.group_id == group_id, UserGroupModel.user_id == user_id) @@ -198,6 +226,16 @@ async def remove_user_from_group(db: AsyncSession, group_id: int, user_id: int) .values(version=GroupModel.version + 1) ) + # Broadcast member left event via WebSocket + if user_to_remove: + await _broadcast_member_left_event( + "group:member_left", + group_id, + user_to_remove, + user_id, + exclude_user=user_id # Don't send to the leaving user + ) + return deleted except OperationalError as e: logger.error(f"Database connection error while removing user from group: {str(e)}", exc_info=True) @@ -309,6 +347,14 @@ async def delete_group(db: AsyncSession, group_id: int, *, expected_version: int f"Version mismatch for group {group_id}. Current version is {group.version}, expected {expected_version}." ) + # Broadcast group deletion event via WebSocket (before actual deletion) + await _broadcast_group_event( + "group:deleted", + group, + group.created_by_id, + exclude_user=None # Send to all members + ) + # Delete the group - cascading delete will handle related records await db.delete(group) await db.flush() @@ -319,4 +365,105 @@ async def delete_group(db: AsyncSession, group_id: int, *, expected_version: int raise DatabaseConnectionError(f"Database connection error: {str(e)}") except SQLAlchemyError as e: logger.error(f"Unexpected SQLAlchemy error while deleting group {group_id}: {str(e)}", exc_info=True) - raise DatabaseTransactionError(f"Failed to delete group: {str(e)}") \ No newline at end of file + raise DatabaseTransactionError(f"Failed to delete group: {str(e)}") + + +# WebSocket Broadcasting Helper Functions + +async def _broadcast_group_event( + event_type: str, + group: GroupModel, + user_id: int, + exclude_user: Optional[int] = None +): + """ + Broadcast group-related events to household members + """ + try: + # Prepare event payload + event_payload = { + "group_id": group.id, + "name": group.name, + "created_by_id": group.created_by_id, + "version": group.version, + "created_at": group.created_at.isoformat() if group.created_at else None, + "updated_at": group.updated_at.isoformat() if group.updated_at else None, + } + + # Create the WebSocket event + event = create_group_event(event_type, event_payload, user_id) + + # Broadcast to all household members + await websocket_manager.broadcast_to_household( + group.id, + event, + exclude_user=exclude_user + ) + + logger.debug(f"Broadcasted {event_type} event for group {group.id}") + + except Exception as e: + # Don't fail the CRUD operation if WebSocket broadcast fails + logger.error(f"Failed to broadcast group event {event_type} for group {group.id}: {e}") + + +async def _broadcast_member_event( + event_type: str, + group: GroupModel, + user: UserModel, + acting_user_id: int, + exclude_user: Optional[int] = None +): + """ + Broadcast member-related events to household + """ + try: + event_payload = { + "group_id": group.id, + "user": { + "id": user.id, + "email": user.email, + "username": user.username, + "is_active": user.is_active, + }, + "group_name": group.name, + } + + event = create_group_event(event_type, event_payload, acting_user_id) + + # Broadcast to all household members + await websocket_manager.broadcast_to_household(group.id, event, exclude_user) + + logger.debug(f"Broadcasted {event_type} event for user {user.id} in group {group.id}") + + except Exception as e: + logger.error(f"Failed to broadcast member event {event_type}: {e}") + + +async def _broadcast_member_left_event( + event_type: str, + group_id: int, + user: UserModel, + acting_user_id: int, + exclude_user: Optional[int] = None +): + """ + Broadcast member left events to household + """ + try: + event_payload = { + "group_id": group_id, + "user_id": user.id, + "user_email": user.email, + "user_username": user.username, + } + + event = create_group_event(event_type, event_payload, acting_user_id) + + # Broadcast to remaining household members + await websocket_manager.broadcast_to_household(group_id, event, exclude_user) + + logger.debug(f"Broadcasted {event_type} event for user {user.id} leaving group {group_id}") + + except Exception as e: + logger.error(f"Failed to broadcast member left event {event_type}: {e}") \ No newline at end of file diff --git a/be/app/crud/invite.py b/be/app/crud/invite.py index b933cc4..084ba3d 100644 --- a/be/app/crud/invite.py +++ b/be/app/crud/invite.py @@ -16,6 +16,7 @@ from app.core.exceptions import ( DatabaseTransactionError, InviteOperationError ) +from app.core.websocket import websocket_manager, create_group_event logger = logging.getLogger(__name__) @@ -95,6 +96,14 @@ async def create_invite(db: AsyncSession, group_id: int, creator_id: int, expire if loaded_invite is None: raise InviteOperationError("Failed to load invite after creation and flush.") + # Broadcast invite creation event via WebSocket + await _broadcast_invite_event( + "group:invite_created", + loaded_invite, + creator_id, + exclude_user=creator_id + ) + return loaded_invite except InviteOperationError: raise @@ -177,6 +186,14 @@ async def deactivate_invite(db: AsyncSession, invite: InviteModel) -> InviteMode if updated_invite is None: raise InviteOperationError("Failed to load invite after deactivation.") + # Broadcast invite used event via WebSocket + await _broadcast_invite_event( + "group:invite_used", + updated_invite, + updated_invite.created_by_id, + exclude_user=None # Notify all household members + ) + return updated_invite except OperationalError as e: logger.error(f"Database connection error deactivating invite: {str(e)}", exc_info=True) @@ -185,3 +202,42 @@ async def deactivate_invite(db: AsyncSession, invite: InviteModel) -> InviteMode logger.error(f"Unexpected SQLAlchemy error deactivating invite: {str(e)}", exc_info=True) raise DatabaseTransactionError(f"DB transaction error deactivating invite: {str(e)}") + +# WebSocket Broadcasting Helper Functions + +async def _broadcast_invite_event( + event_type: str, + invite: InviteModel, + user_id: int, + exclude_user: Optional[int] = None +): + """ + Broadcast invite-related events to household members + """ + try: + # Prepare event payload + event_payload = { + "invite_code": invite.code, + "group_id": invite.group_id, + "created_by_id": invite.created_by_id, + "expires_at": invite.expires_at.isoformat() if invite.expires_at else None, + "is_active": invite.is_active, + "group_name": invite.group.name if invite.group else None, + } + + # Create the WebSocket event + event = create_group_event(event_type, event_payload, user_id) + + # Broadcast to all household members + await websocket_manager.broadcast_to_household( + invite.group_id, + event, + exclude_user=exclude_user + ) + + logger.debug(f"Broadcasted {event_type} event for invite {invite.code}") + + except Exception as e: + # Don't fail the CRUD operation if WebSocket broadcast fails + logger.error(f"Failed to broadcast invite event {event_type}: {e}") + diff --git a/be/app/crud/item.py b/be/app/crud/item.py index 2e4d866..362325b 100644 --- a/be/app/crud/item.py +++ b/be/app/crud/item.py @@ -19,6 +19,7 @@ from app.core.exceptions import ( ConflictError, ItemOperationError ) +from app.core.websocket import websocket_manager, create_list_event logger = logging.getLogger(__name__) @@ -56,6 +57,14 @@ async def create_item(db: AsyncSession, item_in: ItemCreate, list_id: int, user_ if loaded_item is None: raise ItemOperationError("Failed to load item after creation.") + # Broadcast item creation event via WebSocket + await _broadcast_item_event( + "item:created", + loaded_item, + user_id, + exclude_user=user_id + ) + return loaded_item except IntegrityError as e: logger.error(f"Database integrity error during item creation: {str(e)}", exc_info=True) @@ -166,6 +175,14 @@ async def update_item(db: AsyncSession, item_db: ItemModel, item_in: ItemUpdate, if updated_item is None: raise ItemOperationError("Failed to load item after update.") + # Broadcast item update event via WebSocket + await _broadcast_item_event( + "item:updated", + updated_item, + user_id, + exclude_user=user_id + ) + return updated_item except IntegrityError as e: logger.error(f"Database integrity error during item update: {str(e)}", exc_info=True) @@ -183,6 +200,14 @@ async def delete_item(db: AsyncSession, item_db: ItemModel) -> None: """Deletes an item record. Version check should be done by the caller (API endpoint).""" try: async with db.begin_nested() if db.in_transaction() else db.begin() as transaction: + # Broadcast item deletion event via WebSocket (before actual deletion) + await _broadcast_item_event( + "item:deleted", + item_db, + item_db.added_by_id, # Use item creator as originator + exclude_user=None + ) + await db.delete(item_db) except OperationalError as e: logger.error(f"Database connection error while deleting item: {str(e)}", exc_info=True) @@ -204,6 +229,15 @@ async def claim_item(db: AsyncSession, item: ItemModel, user_id: int) -> ItemMod db.add(item) await db.flush() await db.refresh(item, attribute_names=['claimed_by_user', 'version', 'claimed_at']) + + # Broadcast item claim event via WebSocket + await _broadcast_item_event( + "item:claimed", + item, + user_id, + exclude_user=user_id + ) + return item async def unclaim_item(db: AsyncSession, item: ItemModel) -> ItemModel: @@ -212,4 +246,82 @@ async def unclaim_item(db: AsyncSession, item: ItemModel) -> ItemModel: item.claimed_at = None item.version += 1 db.add(item) - await db.flush() \ No newline at end of file + await db.flush() + + # Broadcast item unclaim event via WebSocket + await _broadcast_item_event( + "item:unclaimed", + item, + item.added_by_id, # Use item creator as originator for unclaim + exclude_user=None + ) + + +# WebSocket Broadcasting Helper Functions + +async def _broadcast_item_event( + event_type: str, + item: ItemModel, + user_id: int, + exclude_user: Optional[int] = None +): + """ + Broadcast item-related events to relevant rooms + Sends to household and list-specific rooms as appropriate + """ + try: + # Prepare event payload + event_payload = { + "item_id": item.id, + "list_id": item.list_id, + "name": item.name, + "quantity": item.quantity, + "is_complete": item.is_complete, + "claimed_by_user_id": item.claimed_by_user_id, + "claimed_at": item.claimed_at.isoformat() if item.claimed_at else None, + "added_by_id": item.added_by_id, + "position": item.position, + "version": item.version, + "updated_at": item.updated_at.isoformat() if item.updated_at else None, + } + + # Create the WebSocket event + event = create_list_event(event_type, event_payload, user_id) + + # Broadcast to list-specific room for real-time collaboration + await websocket_manager.broadcast_to_list( + item.list_id, + event, + exclude_user=exclude_user + ) + + logger.debug(f"Broadcasted {event_type} event for item {item.id} in list {item.list_id}") + + except Exception as e: + # Don't fail the CRUD operation if WebSocket broadcast fails + logger.error(f"Failed to broadcast item event {event_type} for item {item.id}: {e}") + +async def reorder_items(db: AsyncSession, list_id: int, ordered_ids: PyList[int]) -> None: + """Reorders items in a list based on the provided ordered list of item IDs.""" + try: + async with db.begin_nested() if db.in_transaction() else db.begin() as transaction: + # Get all items in the list + stmt = select(ItemModel).where(ItemModel.list_id == list_id) + result = await db.execute(stmt) + items = {item.id: item for item in result.scalars().all()} + + # Update positions based on the ordered IDs + for position, item_id in enumerate(ordered_ids, 1): + if item_id in items: + items[item_id].position = position + db.add(items[item_id]) + + await db.flush() + logger.info(f"Reordered {len(ordered_ids)} items in list {list_id}") + + except OperationalError as e: + logger.error(f"Database connection error while reordering items: {str(e)}", exc_info=True) + raise DatabaseConnectionError(f"Database connection error while reordering items: {str(e)}") + except SQLAlchemyError as e: + logger.error(f"Unexpected SQLAlchemy error while reordering items: {str(e)}", exc_info=True) + raise DatabaseTransactionError(f"Failed to reorder items: {str(e)}") \ No newline at end of file diff --git a/be/app/crud/list.py b/be/app/crud/list.py index 7ecd7b8..a5679d3 100644 --- a/be/app/crud/list.py +++ b/be/app/crud/list.py @@ -21,6 +21,8 @@ from app.core.exceptions import ( ConflictError, ListOperationError ) +# Add WebSocket imports +from app.core.websocket import websocket_manager, create_list_event logger = logging.getLogger(__name__) @@ -52,6 +54,14 @@ async def create_list(db: AsyncSession, list_in: ListCreate, creator_id: int) -> if loaded_list is None: raise ListOperationError("Failed to load list after creation.") + # Broadcast list creation event via WebSocket + await _broadcast_list_event( + "list:created", + loaded_list, + creator_id, + exclude_user=creator_id + ) + return loaded_list except IntegrityError as e: logger.error(f"Database integrity error during list creation: {str(e)}", exc_info=True) @@ -157,6 +167,14 @@ async def update_list(db: AsyncSession, list_db: ListModel, list_in: ListUpdate) if updated_list is None: raise ListOperationError("Failed to load list after update.") + # Broadcast list update event via WebSocket + await _broadcast_list_event( + "list:updated", + updated_list, + updated_list.created_by_id, # Use list creator as event originator + exclude_user=None # Don't exclude anyone for list updates + ) + return updated_list except IntegrityError as e: logger.error(f"Database integrity error during list update: {str(e)}", exc_info=True) @@ -230,6 +248,56 @@ async def check_list_permission(db: AsyncSession, list_id: int, user_id: int, re except SQLAlchemyError as e: raise DatabaseQueryError(f"Failed to check list permissions: {str(e)}") + +# WebSocket Broadcasting Helper Functions + +async def _broadcast_list_event( + event_type: str, + list_obj: ListModel, + user_id: int, + exclude_user: Optional[int] = None +): + """ + Broadcast list-related events to relevant rooms + Sends to household and list-specific rooms as appropriate + """ + try: + # Prepare event payload + event_payload = { + "list_id": list_obj.id, + "name": list_obj.name, + "description": list_obj.description, + "is_complete": list_obj.is_complete, + "created_by_id": list_obj.created_by_id, + "group_id": list_obj.group_id, + "version": list_obj.version, + "updated_at": list_obj.updated_at.isoformat() if list_obj.updated_at else None, + } + + # Create the WebSocket event + event = create_list_event(event_type, event_payload, user_id) + + # Broadcast to household if list belongs to a group + if list_obj.group_id: + await websocket_manager.broadcast_to_household( + list_obj.group_id, + event, + exclude_user=exclude_user + ) + + # Broadcast to list-specific room for real-time collaboration + await websocket_manager.broadcast_to_list( + list_obj.id, + event, + exclude_user=exclude_user + ) + + logger.debug(f"Broadcasted {event_type} event for list {list_obj.id}") + + except Exception as e: + # Don't fail the CRUD operation if WebSocket broadcast fails + logger.error(f"Failed to broadcast list event {event_type} for list {list_obj.id}: {e}") + async def get_list_status(db: AsyncSession, list_id: int) -> ListStatus: """Gets the update timestamps and item count for a list.""" try: diff --git a/be/app/schemas/expense.py b/be/app/schemas/expense.py index 4383f16..157abd9 100644 --- a/be/app/schemas/expense.py +++ b/be/app/schemas/expense.py @@ -2,52 +2,75 @@ from pydantic import BaseModel, ConfigDict, validator, Field from typing import List, Optional from decimal import Decimal from datetime import datetime -from app.models import SplitTypeEnum, ExpenseSplitStatusEnum, ExpenseOverallStatusEnum +from app.models import SplitTypeEnum, ExpenseSplitStatusEnum, ExpenseOverallStatusEnum, RecurrenceTypeEnum from app.schemas.user import UserPublic -from app.schemas.settlement_activity import SettlementActivityPublic -from app.schemas.recurrence import RecurrencePatternCreate, RecurrencePatternPublic + +# Align with unified types - SettlementActivity first to avoid circular imports +class SettlementActivityBase(BaseModel): + expense_split_id: int + paid_by_user_id: int + amount_paid: Decimal + paid_at: Optional[datetime] = None + +class SettlementActivityCreate(SettlementActivityBase): + pass + +class SettlementActivityPublic(SettlementActivityBase): + id: int + created_by_user_id: int + created_at: datetime + updated_at: datetime + payer: Optional[UserPublic] = None + creator: Optional[UserPublic] = None + model_config = ConfigDict(from_attributes=True) class ExpenseSplitBase(BaseModel): user_id: int owed_amount: Optional[Decimal] = None share_percentage: Optional[Decimal] = None share_units: Optional[int] = None - # Note: Status is handled by the backend, not in create/update payloads class ExpenseSplitCreate(ExpenseSplitBase): pass +class ExpenseSplitUpdate(BaseModel): + owed_amount: Optional[Decimal] = None + share_percentage: Optional[Decimal] = None + share_units: Optional[int] = None + status: Optional[ExpenseSplitStatusEnum] = None + class ExpenseSplitPublic(ExpenseSplitBase): id: int expense_id: int status: ExpenseSplitStatusEnum - user: Optional[UserPublic] = None + paid_at: Optional[datetime] = None created_at: datetime updated_at: datetime - paid_at: Optional[datetime] = None + user: Optional[UserPublic] = None settlement_activities: List[SettlementActivityPublic] = [] model_config = ConfigDict(from_attributes=True) class RecurrencePatternBase(BaseModel): - type: str = Field(..., description="Type of recurrence: daily, weekly, monthly, yearly") - interval: int = Field(..., description="Interval of recurrence (e.g., every X days/weeks/months/years)") - days_of_week: Optional[List[int]] = Field(None, description="Days of week for weekly recurrence (0-6, Sunday-Saturday)") - end_date: Optional[datetime] = Field(None, description="Optional end date for the recurrence") - max_occurrences: Optional[int] = Field(None, description="Optional maximum number of occurrences") + type: RecurrenceTypeEnum + interval: int = Field(default=1, ge=1) + days_of_week: Optional[str] = Field(None, description="JSON string of array for weekly recurrence") + end_date: Optional[datetime] = None + max_occurrences: Optional[int] = Field(None, ge=1) class RecurrencePatternCreate(RecurrencePatternBase): pass class RecurrencePatternUpdate(RecurrencePatternBase): - pass + type: Optional[RecurrenceTypeEnum] = None + interval: Optional[int] = None + version: int -class RecurrencePatternInDB(RecurrencePatternBase): +class RecurrencePatternPublic(RecurrencePatternBase): id: int created_at: datetime updated_at: datetime - - class Config: - from_attributes = True + version: int + model_config = ConfigDict(from_attributes=True) class ExpenseBase(BaseModel): description: str @@ -60,10 +83,10 @@ class ExpenseBase(BaseModel): item_id: Optional[int] = None paid_by_user_id: int is_recurring: bool = Field(False, description="Whether this is a recurring expense") - recurrence_pattern: Optional[RecurrencePatternCreate] = Field(None, description="Recurrence pattern for recurring expenses") class ExpenseCreate(ExpenseBase): - splits_in: Optional[List[ExpenseSplitCreate]] = None + splits_in: Optional[List[ExpenseSplitCreate]] = None + recurrence_pattern: Optional[RecurrencePatternCreate] = None @validator('total_amount') def total_amount_must_be_positive(cls, v): @@ -93,27 +116,36 @@ class ExpenseUpdate(BaseModel): split_type: Optional[SplitTypeEnum] = None list_id: Optional[int] = None group_id: Optional[int] = None - item_id: Optional[int] = None - version: int + item_id: Optional[int] = None is_recurring: Optional[bool] = None recurrence_pattern: Optional[RecurrencePatternUpdate] = None next_occurrence: Optional[datetime] = None + version: int + + @validator('total_amount') + def total_amount_must_be_positive(cls, v): + if v is not None and v <= Decimal('0'): + raise ValueError('Total amount must be positive') + return v class ExpensePublic(ExpenseBase): id: int + created_by_user_id: int + overall_settlement_status: ExpenseOverallStatusEnum + next_occurrence: Optional[datetime] = None + last_occurrence: Optional[datetime] = None + parent_expense_id: Optional[int] = None created_at: datetime updated_at: datetime version: int - created_by_user_id: int - splits: List[ExpenseSplitPublic] = [] + + # Relationships paid_by_user: Optional[UserPublic] = None - overall_settlement_status: ExpenseOverallStatusEnum - is_recurring: bool - next_occurrence: Optional[datetime] - last_occurrence: Optional[datetime] - recurrence_pattern: Optional[RecurrencePatternInDB] - parent_expense_id: Optional[int] + created_by_user: Optional[UserPublic] = None + splits: List[ExpenseSplitPublic] = [] + recurrence_pattern: Optional[RecurrencePatternPublic] = None generated_expenses: List['ExpensePublic'] = [] + model_config = ConfigDict(from_attributes=True) class SettlementBase(BaseModel): @@ -143,10 +175,33 @@ class SettlementUpdate(BaseModel): description: Optional[str] = None version: int + @validator('amount') + def amount_must_be_positive(cls, v): + if v is not None and v <= Decimal('0'): + raise ValueError('Settlement amount must be positive') + return v + class SettlementPublic(SettlementBase): id: int + created_by_user_id: int created_at: datetime updated_at: datetime version: int - created_by_user_id: int - model_config = ConfigDict(from_attributes=True) \ No newline at end of file + + # Relationships + payer: Optional[UserPublic] = None + payee: Optional[UserPublic] = None + created_by_user: Optional[UserPublic] = None + + model_config = ConfigDict(from_attributes=True) + +# WebSocket event schemas for type safety +class ExpenseWebSocketEvent(BaseModel): + event: str + payload: dict + timestamp: datetime + user_id: Optional[int] = None + room: Optional[str] = None + +# Update forward references +ExpensePublic.model_rebuild() \ No newline at end of file diff --git a/docs/websockets.md b/docs/websockets.md new file mode 100644 index 0000000..0fc4605 --- /dev/null +++ b/docs/websockets.md @@ -0,0 +1,183 @@ +# 🛰️ Real-Time Communication Guide + +A comprehensive, end-to-end reference for the Household Management App's WebSocket infrastructure. + +--- + +## 1. High-Level Overview + +| Layer | Purpose | +|-------|---------| +| **Backend** | Broadcasts domain events (chores, groups, items, expenses…) to targeted rooms with optional user exclusion. | +| **Frontend** | Maintains a single authenticated WebSocket connection per household, routes incoming events to Pinia stores, updates UI & triggers notifications. | + +All events share a **type-safe JSON contract**: + +```jsonc +{ + "event": "chore:updated", // namespaced event key (domain:action) + "payload": { /* entity-specific */ }, + "timestamp": "2025-06-29T12:34:56Z", + "user_id": 42, // originating user (optional) + "room": "household:7" // broadcast room (optional) +} +``` + +--- + +## 2. Backend Architecture + +### 2.1 Entry Point + +```mermaid +sequenceDiagram + participant Client + participant FastAPI as FastAPI /api/v1/ws/household/{id} + participant Manager as WebSocketManager (core/websocket.py) + Client->>FastAPI: WebSocket handshake (token query param) + FastAPI->>Manager: register(socket, household_id, user) + Manager-->>Client: connection:established +``` + +* **Endpoint**: `be/app/api/v1/endpoints/websocket.py` +* **Authentication**: JWT access token retrieved from query string and validated via FastAPI dependency. +* **Manager (`core/websocket.py`)** + * Keeps `Dict[str, Set[WebSocket]]` mapping of **rooms** ➜ sockets. + * Offers helpers:`broadcast(room, event, payload, *, exclude_user_id=None)`. + * Auto-removes closed connections & handles ping/pong. + +### 2.2 Room Conventions + +| Room | Used For | Example | +|------|----------|---------| +| `household:{id}` | Global events within a household | `household:7` | +| `list:{id}` | Shopping list specific events | `list:12` | +| `expense:{id}` | Expense settlement events | `expense:88` | + +### 2.3 Event Producers + +| Module | Function(s) | Emitted Events | +|--------|-------------|---------------| +| `crud/chore.py` | `create_chore`, `update_chore`, `delete_chore`, … | `chore:created`, `chore:updated`, `chore:deleted`, `chore:assigned`, `chore:completed`, `chore:assignment_updated` | +| `crud/group.py` | `create_group`, `add_user_to_group`, `remove_user_from_group`, `delete_group` | `group:created`, `group:member_joined`, `group:member_left`, `group:deleted` | +| `crud/invite.py` | `create_invite`, `deactivate_invite` | `group:invite_created`, `group:invite_used` | +| `crud/item.py` | `create_item`, `update_item`, `delete_item`, `claim_item`, `unclaim_item` | `item:created`, `item:updated`, `item:deleted`, `item:claimed`, `item:unclaimed` | +| `crud/expense.py` | `create_expense`, `update_expense`, `delete_expense` | `expense:created`, `expense:updated`, `expense:deleted` | + +> **Smart User Exclusion**: Every helper call passes the acting `user_id`; the manager omits that socket to prevent "echo" effects on the originator. + +### 2.4 Reliability & Performance + +* **Graceful Degradation** – Exceptions in broadcasting are caught; CRUD logic always completes. +* **Ping / Pong** – Server sends `ping` every **25 s**; client replies with `pong`. +* **Scalability** – Manager is stateless apart from room maps and can be swapped for Redis-backed pub/sub later. + +--- + +## 3. Frontend Architecture + +### 3.1 useSocket Composable + +Located at `fe/src/composables/useSocket.ts`. + +* **Single Source of Truth** – Global reactive state (`socket`, `listeners`, `isConnected`…). +* **Connection Logic** + * URL: `ws(s)://{host}/api/v1/ws/household/{householdId}?token=JWT`. + * Exponential reconnect (1 s → 2 s → 4 s → …, max 5 attempts). + * Heartbeat every **30 s** (`presence_update: heartbeat`). +* **Event Routing** + * `listeners: Map>` – any store/component registers via `on(event, cb)`. + * Raw server `ping` handled automatically. +* **Outgoing Convenience** + * `emit(event, payload)` – client-side custom events (rarely used). + * `startEditing/stopEditing`, `updatePresence` helpers. + +### 3.2 Pinia Store Integration + +| Store | Event Handlers | Highlights | +|-------|----------------|-----------| +| **choreStore** | `chore:*`, `timer:*` | Updates local arrays & triggers assignment/completion notifications. | +| **groupStore** | `group:*`, `invite:*` | Live member joins/leaves, group CRUD & smart redirection. | +| **itemStore** | `item:*` | Real-time shopping list sync with claim/unclaim notifications. | +| **listDetailStore** | `list:updated`, `expense:*` | Keeps list view & expenses fresh during collaboration. | + +All stores follow a **canonical pattern**: + +```ts +function setupWebSocketListeners() { + on('entity:created', handleCreated) + on('entity:updated', handleUpdated) + // ... +} + +function cleanupWebSocketListeners() { + off('entity:created', handleCreated) + // ... +} +``` + +### 3.3 Notification Strategy + +`fe/src/stores/notifications.ts` provides a lightweight queue. Stores push human-friendly messages: + +* Self-actions filtered (`user_id !== currentUser.id`). +* Importance-based durations (info 3 s, success 5 s, errors manual close). +* Display component: `components/global/NotificationDisplay.vue`. + +### 3.4 Presence & Editing (Future) + +API scaffolding exists (`startEditing`, `presence_update`). UI indicators will be added in the next milestone. + +--- + +## 4. Event Reference (Cheat-Sheet) + +| Domain | Event | Payload Shape | +|--------|-------|---------------| +| **Chores** | `chore:created` | `{ chore: Chore }` | +| | `chore:updated` | `{ chore: Chore }` | +| | `chore:deleted` | `{ choreId: number, groupId? }` | +| | `chore:assigned` / `chore:assignment_updated` | `{ assignment: ChoreAssignment }` | +| | `chore:completed` | `{ assignment: ChoreAssignment, points: number }` | +| **Groups** | `group:created` | `{ group: GroupPublic }` | +| | `group:member_joined` / `member_left` | `{ groupId: number, member / userId }` | +| | `group:deleted` | `{ groupId: number }` | +| **Invites** | `group:invite_created` / `invite_used` | `{ groupId: number, invite… }` | +| **Items** | `item:*` | Item entity specific | +| **Lists** | `list:updated` | `{ list: List }` | +| **Expenses** | `expense:*` | Expense entity specific | + +--- + +## 5. Adding a New Real-Time Feature + +1. **Backend** – Identify CRUD mutation ➜ call `broadcast_event()` with new `event` key. +2. **Frontend** – In relevant Pinia store: + * import `useSocket`, + * register `on('new:event', handler)` in `setupWebSocketListeners()`. +3. **UI / Notifications** – Optionally surface via component or toast. +4. **Docs** – Append to the table above. + +--- + +## 6. Troubleshooting & Tips + +| Symptom | Likely Cause | Fix | +|---------|--------------|-----| +| No connection | Invalid JWT / expired | Refresh token & reconnect. | +| Echo events | Backend `exclude_user_id` missing | Pass `current_user.id` to broadcast helper. | +| Stale UI after reconnect | Stores lost listeners on HMR | Call `setupWebSocketListeners()` in Pinia definition (runs once). | +| High CPU on server | Broadcast loops | Ensure room targeting is specific (e.g., list-level not household-level for item events). | + +--- + +## 7. Roadmap + +* Presence indicators per list/chore (avatars + typing)… +* Redis-based pub/sub for horizontal scaling. +* WebSocket analytics dashboard (latency, event throughput). + +--- + +> **Last updated**: 2025-06-29 | +> **Author**: Engineering Team diff --git a/fe/.prettierrc.json b/fe/.prettierrc.json index 18b1821..c7242a6 100644 --- a/fe/.prettierrc.json +++ b/fe/.prettierrc.json @@ -2,5 +2,5 @@ "$schema": "https://json.schemastore.org/prettierrc", "semi": false, "singleQuote": true, - "printWidth": 150 + "printWidth": 250 } \ No newline at end of file diff --git a/fe/src/components/SmartShoppingList.vue b/fe/src/components/SmartShoppingList.vue index 69b7ad2..af6db2b 100644 --- a/fe/src/components/SmartShoppingList.vue +++ b/fe/src/components/SmartShoppingList.vue @@ -631,7 +631,7 @@ const createExpenseFromShopping = () => { onMounted(() => { // Connect to WebSocket for real-time updates if online if (props.isOnline && props.list.id) { - listsStore.connectWebSocket(props.list.id, authStore.token) + listsStore.connectWebSocket(props.list.id, authStore.accessToken || '') } }) @@ -643,7 +643,7 @@ onUnmounted(() => { // Watch for online status changes watch(() => props.isOnline, (isOnline) => { if (isOnline && props.list.id) { - listsStore.connectWebSocket(props.list.id, authStore.token) + listsStore.connectWebSocket(props.list.id, authStore.accessToken || '') } else { listsStore.disconnectWebSocket() } diff --git a/fe/src/components/UniversalFAB.vue b/fe/src/components/UniversalFAB.vue new file mode 100644 index 0000000..12cd33c --- /dev/null +++ b/fe/src/components/UniversalFAB.vue @@ -0,0 +1,1000 @@ + + + + + \ No newline at end of file diff --git a/fe/src/components/dashboard/UniversalFAB.vue b/fe/src/components/dashboard/UniversalFAB.vue deleted file mode 100644 index ae5fa50..0000000 --- a/fe/src/components/dashboard/UniversalFAB.vue +++ /dev/null @@ -1,428 +0,0 @@ - - - - - \ No newline at end of file diff --git a/fe/src/components/expenses/SettlementFlow.vue b/fe/src/components/expenses/SettlementFlow.vue index 4e183b4..14db0de 100644 --- a/fe/src/components/expenses/SettlementFlow.vue +++ b/fe/src/components/expenses/SettlementFlow.vue @@ -40,14 +40,14 @@

Settlement Summary

-
+
- {{ debt.fromUser.name }} + {{ payerName }} - {{ debt.toUser.name }} + {{ payeeName }}
-
- {{ formatCurrency(debt.amount) }} +
+ {{ formatCurrency(totalAmount) }}
@@ -58,16 +58,17 @@
- -
+ +

What this covers:

-
+
- {{ item.name }} - {{ formatDate(item.date) }} + {{ expense.description }} + {{ formatDate(expense.expense_date || expense.created_at) + }}
-
{{ formatCurrency(item.amount) }}
+
{{ formatCurrency(parseFloat(expense.total_amount)) }}
@@ -222,9 +223,11 @@ import { ref, reactive, computed, onMounted, watch } from 'vue' import { Dialog, Heading, Button, Card, Input, Textarea } from '@/components/ui' import BaseIcon from '@/components/BaseIcon.vue' -import type { ExpenseSplit, Expense } from '@/types/expense' +import type { ExpenseSplit, Expense, SettlementActivityCreate } from '@/types/expense' import { useExpenses } from '@/composables/useExpenses' import { useNotificationStore } from '@/stores/notifications' +import { useAuthStore } from '@/stores/auth' +import { settlementService } from '@/services/settlementService' import { format } from 'date-fns' interface Props { @@ -236,6 +239,7 @@ const props = defineProps() const open = defineModel('modelValue', { default: false }) const notifications = useNotificationStore() +const authStore = useAuthStore() const { settleExpenseSplit } = useExpenses() // Step management @@ -246,27 +250,27 @@ const steps = [ { id: 'verification', label: 'Verify & Confirm' } ] -// Settlement data -const totalAmount = ref(125.50) // This would come from props -const debts = ref([ - { - id: 1, - fromUser: { name: 'You' }, - toUser: { name: 'Alice' }, - amount: 75.25 - }, - { - id: 2, - fromUser: { name: 'You' }, - toUser: { name: 'Bob' }, - amount: 50.25 - } -]) +// Real settlement data from props +const totalAmount = computed(() => { + if (!props.split) return 0 + const owed = parseFloat(props.split.owed_amount) + const paid = props.split.settlement_activities.reduce((sum, activity) => { + return sum + parseFloat(activity.amount_paid) + }, 0) + return owed - paid +}) -const settlementItems = ref([ - { id: 1, name: 'Grocery Shopping - Whole Foods', date: new Date('2024-01-15'), amount: 89.50 }, - { id: 2, name: 'Dinner at Italian Place', date: new Date('2024-01-18'), amount: 36.00 } -]) +const currentUserOwes = computed(() => { + return props.split?.user_id === authStore.user?.id +}) + +const payerName = computed(() => { + return props.split?.user?.name || props.split?.user?.email || 'Unknown User' +}) + +const payeeName = computed(() => { + return props.expense?.paid_by_user?.name || props.expense?.paid_by_user?.email || 'Unknown User' +}) // Payment method selection const selectedPaymentMethod = ref('') @@ -300,7 +304,8 @@ const canProceed = computed(() => { const canConfirm = computed(() => { return selectedPaymentMethod.value.length > 0 && - (selectedPaymentMethod.value !== 'custom' || customPaymentMethod.value.trim().length > 0) + (selectedPaymentMethod.value !== 'custom' || customPaymentMethod.value.trim().length > 0) && + totalAmount.value > 0 }) // Methods @@ -327,8 +332,9 @@ function formatCurrency(amount: number) { }).format(amount) } -function formatDate(date: Date) { - return format(date, 'MMM d, yyyy') +function formatDate(date: Date | string) { + const dateObj = typeof date === 'string' ? new Date(date) : date + return format(dateObj, 'MMM d, yyyy') } function getSelectedPaymentMethodName() { @@ -351,27 +357,29 @@ function handleReceiptUpload(event: Event) { } async function confirmSettlement() { - if (!props.split || !canConfirm.value) return + if (!props.split || !canConfirm.value || !authStore.user) return processing.value = true try { - await settleExpenseSplit(props.split.id, { + const activityData: SettlementActivityCreate = { expense_split_id: props.split.id, paid_by_user_id: props.split.user_id, amount_paid: totalAmount.value.toString() - }) + } + + await settlementService.recordSettlementActivity(props.split.id, activityData) notifications.addNotification({ type: 'success', - message: 'Settlement confirmed successfully!' + message: 'Settlement recorded successfully!' }) open.value = false resetForm() - } catch (error) { + } catch (error: any) { notifications.addNotification({ type: 'error', - message: 'Failed to confirm settlement. Please try again.' + message: error.response?.data?.detail || 'Failed to record settlement. Please try again.' }) } finally { processing.value = false @@ -387,10 +395,10 @@ async function submitDispute() { submittingDispute.value = true try { - // API call to submit dispute + // TODO: Add dispute API call when backend supports it notifications.addNotification({ type: 'success', - message: 'Dispute submitted. Both parties will be notified.' + message: 'Dispute submitted. The expense creator will be notified.' }) showDispute.value = false diff --git a/fe/src/components/global/NotificationDisplay.vue b/fe/src/components/global/NotificationDisplay.vue index cc7f51a..316313b 100644 --- a/fe/src/components/global/NotificationDisplay.vue +++ b/fe/src/components/global/NotificationDisplay.vue @@ -1,143 +1,768 @@ - \ No newline at end of file diff --git a/fe/src/components/list-detail/ItemsList.vue b/fe/src/components/list-detail/ItemsList.vue index 1e0da62..d11f15b 100644 --- a/fe/src/components/list-detail/ItemsList.vue +++ b/fe/src/components/list-detail/ItemsList.vue @@ -4,9 +4,8 @@ :class="{ 'highlight': supermarktMode && group.items.some(i => i.is_complete) }">

{{ group.categoryName }}

+ :disabled="!isOnline || supermarktMode" class="space-y-1" ghost-class="opacity-50" + drag-class="shadow-lg"> \ No newline at end of file diff --git a/fe/src/components/settlements/SettlementCard.vue b/fe/src/components/settlements/SettlementCard.vue new file mode 100644 index 0000000..5827818 --- /dev/null +++ b/fe/src/components/settlements/SettlementCard.vue @@ -0,0 +1,135 @@ + + + + + \ No newline at end of file diff --git a/fe/src/components/settlements/SettlementForm.vue b/fe/src/components/settlements/SettlementForm.vue new file mode 100644 index 0000000..461745a --- /dev/null +++ b/fe/src/components/settlements/SettlementForm.vue @@ -0,0 +1,250 @@ +