import redis.asyncio as redis from app.config import settings import json from typing import Any, Dict redis_pool = redis.from_url(settings.REDIS_URL, encoding="utf-8", decode_responses=True) async def get_redis(): return redis_pool # --------------------------------------------------------------------------- # Helper functions for Pub/Sub messaging # --------------------------------------------------------------------------- async def broadcast_event(channel: str, event: Dict[str, Any]) -> None: """Serialize the given event as JSON and publish it on the specified Redis channel. This is primarily used by API endpoints to notify WebSocket consumers of real-time changes (e.g. an item being claimed). Consumers subscribe to the corresponding channel (see ``subscribe_to_channel``) and relay the JSON payload to connected clients. """ redis_conn = await get_redis() # NOTE: ``json.dumps`` ensures we only send textual data over the wire # which plays nicely with WebSocket ``send_text`` in the frontend. await redis_conn.publish(channel, json.dumps(event)) async def subscribe_to_channel(channel: str): """Return a PubSub instance already subscribed to *channel*. The caller is responsible for reading messages from the returned ``pubsub`` object and for eventually invoking :func:`unsubscribe_from_channel` to clean up resources. """ redis_conn = await get_redis() pubsub = redis_conn.pubsub() await pubsub.subscribe(channel) return pubsub async def unsubscribe_from_channel(channel: str, pubsub) -> None: """Unsubscribe from *channel* and properly close the PubSub connection.""" try: await pubsub.unsubscribe(channel) finally: # Close the pubsub object to release the underlying connection back to # the pool. We ignore any exceptions here because we are often in a # ``finally`` block during WebSocket shutdown. await pubsub.close()