mitlist/be/app/crud/item.py
2025-05-07 20:16:16 +02:00

108 lines
4.6 KiB
Python

# app/crud/item.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import delete as sql_delete, update as sql_update # Use aliases
from sqlalchemy.exc import SQLAlchemyError, IntegrityError, OperationalError
from typing import Optional, List as PyList
from datetime import datetime, timezone
from app.models import Item as ItemModel
from app.schemas.item import ItemCreate, ItemUpdate
from app.core.exceptions import (
ItemNotFoundError,
DatabaseConnectionError,
DatabaseIntegrityError,
DatabaseQueryError,
DatabaseTransactionError
)
async def create_item(db: AsyncSession, item_in: ItemCreate, list_id: int, user_id: int) -> ItemModel:
"""Creates a new item record for a specific list."""
try:
async with db.begin():
db_item = ItemModel(
name=item_in.name,
quantity=item_in.quantity,
list_id=list_id,
added_by_id=user_id,
is_complete=False # Default on creation
)
db.add(db_item)
await db.flush()
await db.refresh(db_item)
return db_item
except IntegrityError as e:
raise DatabaseIntegrityError(f"Failed to create item: {str(e)}")
except OperationalError as e:
raise DatabaseConnectionError(f"Database connection error: {str(e)}")
except SQLAlchemyError as e:
raise DatabaseTransactionError(f"Failed to create item: {str(e)}")
async def get_items_by_list_id(db: AsyncSession, list_id: int) -> PyList[ItemModel]:
"""Gets all items belonging to a specific list, ordered by creation time."""
try:
async with db.begin():
result = await db.execute(
select(ItemModel)
.where(ItemModel.list_id == list_id)
.order_by(ItemModel.created_at.asc()) # Or desc() if preferred
)
return result.scalars().all()
except OperationalError as e:
raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}")
except SQLAlchemyError as e:
raise DatabaseQueryError(f"Failed to query items: {str(e)}")
async def get_item_by_id(db: AsyncSession, item_id: int) -> Optional[ItemModel]:
"""Gets a single item by its ID."""
try:
async with db.begin():
result = await db.execute(select(ItemModel).where(ItemModel.id == item_id))
return result.scalars().first()
except OperationalError as e:
raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}")
except SQLAlchemyError as e:
raise DatabaseQueryError(f"Failed to query item: {str(e)}")
async def update_item(db: AsyncSession, item_db: ItemModel, item_in: ItemUpdate, user_id: int) -> ItemModel:
"""Updates an existing item record."""
try:
async with db.begin():
update_data = item_in.model_dump(exclude_unset=True) # Get only provided fields
# Special handling for is_complete
if 'is_complete' in update_data:
if update_data['is_complete'] is True:
# Mark as complete: set completed_by_id if not already set
if item_db.completed_by_id is None:
update_data['completed_by_id'] = user_id
else:
# Mark as incomplete: clear completed_by_id
update_data['completed_by_id'] = None
# Ensure updated_at is refreshed (handled by onupdate in model, but explicit is fine too)
# update_data['updated_at'] = datetime.now(timezone.utc)
for key, value in update_data.items():
setattr(item_db, key, value)
db.add(item_db) # Add to session to track changes
await db.flush()
await db.refresh(item_db)
return item_db
except IntegrityError as e:
raise DatabaseIntegrityError(f"Failed to update item: {str(e)}")
except OperationalError as e:
raise DatabaseConnectionError(f"Database connection error: {str(e)}")
except SQLAlchemyError as e:
raise DatabaseTransactionError(f"Failed to update item: {str(e)}")
async def delete_item(db: AsyncSession, item_db: ItemModel) -> None:
"""Deletes an item record."""
try:
async with db.begin():
await db.delete(item_db)
return None # Or return True/False
except OperationalError as e:
raise DatabaseConnectionError(f"Database connection error: {str(e)}")
except SQLAlchemyError as e:
raise DatabaseTransactionError(f"Failed to delete item: {str(e)}")