# app/crud/item.py
"""Async CRUD helpers for item records.

Each function takes an ``AsyncSession`` and translates SQLAlchemy errors
into the application's own exception types from ``app.core.exceptions``.
Write operations commit on success and roll back on failure.
"""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import delete as sql_delete, update as sql_update  # Use aliases
from sqlalchemy.exc import SQLAlchemyError, IntegrityError, OperationalError
from typing import Optional, List as PyList
from datetime import datetime, timezone

from app.models import Item as ItemModel
from app.schemas.item import ItemCreate, ItemUpdate
from app.core.exceptions import (
    ItemNotFoundError,
    DatabaseConnectionError,
    DatabaseIntegrityError,
    DatabaseQueryError,
    DatabaseTransactionError,
    ConflictError
)


async def create_item(db: AsyncSession, item_in: ItemCreate, list_id: int, user_id: int) -> ItemModel:
    """Create a new item record for a specific list and commit it.

    Args:
        db: Session used to persist the item.
        item_in: Payload supplying the item's ``name`` and ``quantity``.
        list_id: ID stored in the new item's ``list_id`` column.
        user_id: ID stored in the new item's ``added_by_id`` column.

    Returns:
        The persisted item, refreshed from the database after the commit.

    Raises:
        DatabaseIntegrityError: On ``IntegrityError``.
        DatabaseConnectionError: On ``OperationalError``.
        DatabaseTransactionError: On any other ``SQLAlchemyError``.
    """
    try:
        db_item = ItemModel(
            name=item_in.name,
            quantity=item_in.quantity,
            list_id=list_id,
            added_by_id=user_id,
            is_complete=False,  # New items always start incomplete.
            # `version` is not passed here; it is expected to come from the
            # model's column default (model not visible here -- confirm).
        )
        db.add(db_item)
        await db.commit()  # Commit flushes pending changes itself.
        # Refresh AFTER the commit: with the default expire_on_commit=True the
        # commit expires the instance, and touching an expired attribute later
        # would trigger implicit IO, which is not allowed under asyncio.
        await db.refresh(db_item)
        return db_item
    except IntegrityError as e:
        await db.rollback()
        raise DatabaseIntegrityError(f"Failed to create item: {str(e)}") from e
    except OperationalError as e:
        await db.rollback()
        raise DatabaseConnectionError(f"Database connection error: {str(e)}") from e
    except SQLAlchemyError as e:
        await db.rollback()
        raise DatabaseTransactionError(f"Failed to create item: {str(e)}") from e
    except Exception:
        # Leave the session clean for unexpected errors, then re-raise as-is.
        await db.rollback()
        raise


async def get_items_by_list_id(db: AsyncSession, list_id: int) -> PyList[ItemModel]:
    """Return all items whose ``list_id`` matches, ordered by ``created_at`` ascending.

    Raises:
        DatabaseConnectionError: On ``OperationalError``.
        DatabaseQueryError: On any other ``SQLAlchemyError``.
    """
    try:
        result = await db.execute(
            select(ItemModel)
            .where(ItemModel.list_id == list_id)
            .order_by(ItemModel.created_at.asc())  # Or desc() if preferred
        )
        return result.scalars().all()
    except OperationalError as e:
        raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}") from e
    except SQLAlchemyError as e:
        raise DatabaseQueryError(f"Failed to query items: {str(e)}") from e


async def get_item_by_id(db: AsyncSession, item_id: int) -> Optional[ItemModel]:
    """Return the item with the given ID, or ``None`` if no row matches.

    Raises:
        DatabaseConnectionError: On ``OperationalError``.
        DatabaseQueryError: On any other ``SQLAlchemyError``.
    """
    try:
        result = await db.execute(select(ItemModel).where(ItemModel.id == item_id))
        return result.scalars().first()
    except OperationalError as e:
        raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}") from e
    except SQLAlchemyError as e:
        raise DatabaseQueryError(f"Failed to query item: {str(e)}") from e


async def update_item(db: AsyncSession, item_db: ItemModel, item_in: ItemUpdate, user_id: int) -> ItemModel:
    """Update an existing item record, rejecting stale versions.

    Args:
        db: Session used to persist the change.
        item_db: The already-loaded item to modify.
        item_in: Update payload; only explicitly set fields are applied, and
            its ``version`` must equal ``item_db.version``.
        user_id: Recorded as ``completed_by_id`` when the item is marked
            complete and no completer is stored yet.

    Returns:
        The updated item, refreshed from the database after the commit.

    Raises:
        ConflictError: If ``item_in.version`` differs from ``item_db.version``.
        DatabaseIntegrityError: On ``IntegrityError``.
        DatabaseConnectionError: On ``OperationalError``.
        DatabaseTransactionError: On any other ``SQLAlchemyError``.
    """
    try:
        # Optimistic-locking check: the caller must have seen the current version.
        if item_db.version != item_in.version:
            raise ConflictError(
                f"Item '{item_db.name}' (ID: {item_db.id}) has been modified. "
                f"Your version is {item_in.version}, current version is {item_db.version}. Please refresh."
            )

        # `version` is managed below, never copied from the payload.
        update_data = item_in.model_dump(exclude_unset=True, exclude={'version'})

        # Keep completed_by_id in sync with is_complete.
        if 'is_complete' in update_data:
            if update_data['is_complete'] is True:
                # Preserve the original completer if one is already recorded.
                if item_db.completed_by_id is None:
                    update_data['completed_by_id'] = user_id
            else:
                # Anything other than True clears the completer.
                update_data['completed_by_id'] = None

        for key, value in update_data.items():
            setattr(item_db, key, value)

        item_db.version += 1
        db.add(item_db)
        await db.commit()  # Unconditional commit; it also flushes the changes.
        # Refresh AFTER the commit so the returned instance is not left
        # expired (implicit reloads are not allowed under asyncio).
        await db.refresh(item_db)
        return item_db
    except IntegrityError as e:
        await db.rollback()
        raise DatabaseIntegrityError(f"Failed to update item due to integrity constraint: {str(e)}") from e
    except OperationalError as e:
        await db.rollback()
        raise DatabaseConnectionError(f"Database connection error while updating item: {str(e)}") from e
    except ConflictError:
        # Propagate the conflict unchanged after resetting the session.
        await db.rollback()
        raise
    except SQLAlchemyError as e:
        await db.rollback()
        raise DatabaseTransactionError(f"Failed to update item: {str(e)}") from e
    except Exception:
        # Attributes may already be mutated at this point; roll back so the
        # session is not left dirty, matching create_item's behavior.
        await db.rollback()
        raise


async def delete_item(db: AsyncSession, item_db: ItemModel) -> None:
    """Delete an item record and commit.

    No version check is performed here; per the original author's note,
    that is the caller's (API endpoint's) responsibility.

    Raises:
        DatabaseConnectionError: On ``OperationalError``.
        DatabaseTransactionError: On any other ``SQLAlchemyError``.
    """
    try:
        await db.delete(item_db)
        await db.commit()
    except OperationalError as e:
        await db.rollback()
        raise DatabaseConnectionError(f"Database connection error while deleting item: {str(e)}") from e
    except SQLAlchemyError as e:
        await db.rollback()
        raise DatabaseTransactionError(f"Failed to delete item: {str(e)}") from e
    except Exception:
        # Keep the session clean on unexpected errors, matching create_item.
        await db.rollback()
        raise