from typing import List, Optional
from uuid import UUID

from sqlalchemy import Select, Delete
from sqlalchemy.orm import joinedload

from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.scope_equipment.model import ScopeEquipment

from .model import OverhaulActivity
from .schema import OverhaulActivityCreate, OverhaulActivityUpdate, OverhaulActivityRead


async def get(
    *,
    db_session: DbSession,
    assetnum: str,
    overhaul_session_id: Optional[UUID] = None,
) -> Optional[OverhaulActivity]:
    """Return the overhaul activity for ``assetnum``, or ``None`` if absent.

    Args:
        db_session: Async database session used to run the query.
        assetnum: Asset number the activity must match.
        overhaul_session_id: When given, additionally require the activity's
            ``overhaul_scope_id`` to equal this value.

    Returns:
        The first matching ``OverhaulActivity`` (with its ``equipment``
        relationship eagerly loaded), or ``None`` when nothing matches.
    """
    query = (
        Select(OverhaulActivity)
        .where(OverhaulActivity.assetnum == assetnum)
        .options(joinedload(OverhaulActivity.equipment))
    )

    if overhaul_session_id:
        query = query.filter(
            OverhaulActivity.overhaul_scope_id == overhaul_session_id
        )

    result = await db_session.execute(query)
    return result.scalar()


async def get_all(
    *,
    common: CommonParameters,
    overhaul_session_id: UUID,
    assetnum: Optional[str] = None,
    scope_name: Optional[str] = None,
):
    """List the overhaul activities that belong to one overhaul session.

    Args:
        common: Shared query parameters, unpacked as keyword arguments into
            ``search_filter_sort_paginate``.
        overhaul_session_id: Only activities whose ``overhaul_scope_id``
            equals this value are selected.
        assetnum: Optional exact-match filter on ``assetnum``.
        scope_name: Optional exact-match filter on ``scope_name``.

    Returns:
        Whatever ``search_filter_sort_paginate`` returns for the built query.
    """
    query = (
        Select(OverhaulActivity)
        .where(OverhaulActivity.overhaul_scope_id == overhaul_session_id)
        .options(joinedload(OverhaulActivity.equipment))
    )

    if assetnum:
        query = query.filter(OverhaulActivity.assetnum == assetnum)

    if scope_name:
        query = query.filter(OverhaulActivity.scope_name == scope_name)

    # The overhaul scope is only eager-loaded for filtered listings; add the
    # loader option once even when both filters are supplied.
    if assetnum or scope_name:
        query = query.options(joinedload(OverhaulActivity.overhaul_scope))

    return await search_filter_sort_paginate(model=query, **common)


async def create(
    *,
    db_session: DbSession,
    overhaul_activty_in: OverhaulActivityCreate,
    overhaul_session_id: UUID,
) -> OverhaulActivity:
    """Create an overhaul activity inside the given overhaul session.

    Args:
        db_session: Async database session used for the lookup and insert.
        overhaul_activty_in: Payload for the new activity. (The parameter
            name keeps its original spelling because callers pass it by
            keyword.)
        overhaul_session_id: Stored as the new row's ``overhaul_scope_id``.

    Returns:
        The newly created activity, re-queried so that its ``equipment``
        relationship is eagerly loaded.

    Raises:
        ValueError: If an activity with the same ``assetnum`` already exists
            for ``overhaul_session_id``.
    """
    # Reject duplicates of the (assetnum, overhaul_scope_id) combination.
    existing_activity_query = Select(OverhaulActivity).where(
        OverhaulActivity.assetnum == overhaul_activty_in.assetnum,
        OverhaulActivity.overhaul_scope_id == overhaul_session_id,
    )
    result = await db_session.execute(existing_activity_query)
    existing_activity = result.scalar_one_or_none()

    if existing_activity:
        raise ValueError("This assetnum already exist.")

    activity = OverhaulActivity(
        **overhaul_activty_in.model_dump(),
        overhaul_scope_id=overhaul_session_id,
    )
    db_session.add(activity)
    await db_session.commit()

    # Re-select the row so the equipment relationship is loaded up front
    # instead of being lazily loaded later.
    query = (
        Select(OverhaulActivity)
        .options(joinedload(OverhaulActivity.equipment))
        .where(OverhaulActivity.id == activity.id)
    )
    result = await db_session.execute(query)
    return result.scalar_one()


async def update(
    *,
    db_session: DbSession,
    activity: OverhaulActivity,
    overhaul_activity_in: OverhaulActivityUpdate,
) -> OverhaulActivity:
    """Apply the non-default fields of the payload to ``activity``.

    Args:
        db_session: Async database session used to commit the change.
        activity: The persistent activity to modify in place.
        overhaul_activity_in: Update payload.

    Returns:
        The same ``activity`` instance after the commit.
    """
    # exclude_defaults drops every field whose value equals its schema
    # default, so such fields are left untouched on the stored row.
    update_data = overhaul_activity_in.model_dump(exclude_defaults=True)

    for field, value in update_data.items():
        setattr(activity, field, value)

    await db_session.commit()

    return activity


async def delete(*, db_session: DbSession, overhaul_activity_id: str) -> None:
    """Delete the overhaul activity with the given primary key.

    Deleting an id that does not exist is a no-op.

    Args:
        db_session: Async database session used for the lookup and delete.
        overhaul_activity_id: Primary key of the activity to remove.
    """
    activity = await db_session.get(OverhaulActivity, overhaul_activity_id)

    # ``get`` yields None for an unknown key; handing None to
    # ``db_session.delete`` would raise an unrelated mapping error.
    if activity is None:
        return

    await db_session.delete(activity)
    await db_session.commit()