from typing import Optional, Sequence, Union

from fastapi import HTTPException, status
from sqlalchemy import Select, Delete, desc, func, not_, or_
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import selectinload

from src.workorder.model import MasterWorkOrder
from .model import MasterEquipmentTree, ScopeEquipment, MasterEquipment
from src.scope.service import get_by_scope_name as get_scope_by_name_service
from .schema import ScopeEquipmentCreate, ScopeEquipmentUpdate
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.database.core import DbSession
from src.auth.service import CurrentUser


async def get(*, db_session: DbSession, scope_equipment_id: str) -> Optional[ScopeEquipment]:
    """Return the scope equipment with the given id, or ``None`` if there is none."""
    query = Select(ScopeEquipment).filter(ScopeEquipment.id == scope_equipment_id)
    result = await db_session.execute(query)
    return result.scalars().one_or_none()


async def get_by_assetnum(*, db_session: DbSession, assetnum: str) -> Optional[ScopeEquipment]:
    """Return the scope equipment for ``assetnum`` with its master equipment loaded.

    Returns ``None`` when no row matches.
    """
    query = (
        Select(ScopeEquipment)
        .filter(ScopeEquipment.assetnum == assetnum)
        .options(selectinload(ScopeEquipment.master_equipment))
    )
    result = await db_session.execute(query)
    return result.unique().scalars().one_or_none()


async def get_all(
    *,
    db_session: DbSession,
    common,
    scope_name: Optional[str] = None,
    exclude: bool = False,
):
    """Return scope equipment, newest first, via ``search_filter_sort_paginate``.

    Args:
        db_session: Session used to resolve ``scope_name``.
        common: Keyword arguments forwarded to ``search_filter_sort_paginate``.
        scope_name: When given, restrict the listing relative to this scope.
        exclude: If true, keep rows whose current scope differs from
            ``scope_name``; otherwise keep rows whose current scope matches it.

    Raises:
        HTTPException: 404 when ``scope_name`` is given but no such scope exists.
    """
    query = (
        Select(ScopeEquipment)
        .options(
            selectinload(ScopeEquipment.parent_scope),
            selectinload(ScopeEquipment.current_scope),
            selectinload(ScopeEquipment.master_equipment),
        )
        .order_by(desc(ScopeEquipment.created_at))
    )

    if scope_name:
        scope = await get_scope_by_name_service(db_session=db_session, scope_name=scope_name)
        if not scope:
            # Previously this fell through to ``scope.id`` and crashed with
            # AttributeError; report the unknown scope the same way create() does.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="A scope with this name does not exist.",
            )
        if exclude:
            query = query.filter(ScopeEquipment.current_scope_id != scope.id)
        else:
            query = query.filter(ScopeEquipment.current_scope_id == scope.id)

    return await search_filter_sort_paginate(model=query, **common)


async def create(*, db_session: DbSession, scope_equipment_in: ScopeEquipmentCreate):
    """Assign every asset number in ``scope_equipment_in`` to the named scope.

    Each asset number is upserted: a new row is inserted, or, if a row with the
    same ``assetnum`` already exists, its ``current_scope_id`` is re-pointed to
    the requested scope. All upserts are committed together at the end.

    Returns:
        The list of asset numbers that were processed, in input order.

    Raises:
        HTTPException: 404 when the requested scope does not exist.
    """
    scope = await get_scope_by_name_service(
        db_session=db_session, scope_name=scope_equipment_in.scope_name
    )
    if not scope:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A scope with this name does not exist.",
        )

    results = []
    # One statement per asset number: a single multi-row upsert would fail in
    # PostgreSQL if the input contained the same assetnum twice.
    for assetnum in scope_equipment_in.assetnums:
        stmt = (
            insert(ScopeEquipment)
            .values(assetnum=assetnum, current_scope_id=scope.id)
            .on_conflict_do_update(
                index_elements=["assetnum"],
                set_={"current_scope_id": scope.id},
            )
        )
        await db_session.execute(stmt)
        results.append(assetnum)

    await db_session.commit()
    return results


async def update(
    *,
    db_session: DbSession,
    scope_equipment: ScopeEquipment,
    scope_equipment_in: ScopeEquipmentUpdate,
):
    """Apply the non-default fields of ``scope_equipment_in`` and commit.

    Only fields whose value differs from the schema default are copied onto
    ``scope_equipment``. Returns the same (mutated) ``scope_equipment`` object.
    """
    changes = scope_equipment_in.model_dump(exclude_defaults=True)
    for field, value in changes.items():
        setattr(scope_equipment, field, value)

    await db_session.commit()
    return scope_equipment


async def delete(*, db_session: DbSession, scope_equipment_id: str) -> None:
    """Remove a scope equipment row, or revert it to its original scope.

    The row is deleted when it has no ``scope_id`` or when its current scope
    already equals ``scope_id``. Otherwise the row is kept and
    ``current_scope_id`` is reset to ``scope_id``.

    Raises:
        HTTPException: 404 when no row has the given id.
    """
    query = Select(ScopeEquipment).filter(ScopeEquipment.id == scope_equipment_id)
    result = await db_session.execute(query)
    equipment: Optional[ScopeEquipment] = result.scalars().one_or_none()

    if not equipment:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    if not equipment.scope_id or equipment.current_scope_id == equipment.scope_id:
        await db_session.delete(equipment)
    else:
        equipment.current_scope_id = equipment.scope_id

    await db_session.commit()


async def get_by_scope_name(
    *, db_session: DbSession, scope_name: Union[str, list]
) -> Sequence[ScopeEquipment]:
    """Return all scope equipment currently assigned to the named scope.

    Master equipment is loaded with each row. When the scope lookup finds
    nothing, no filter is applied and every scope equipment row is returned.
    """
    scope = await get_scope_by_name_service(db_session=db_session, scope_name=scope_name)

    query = Select(ScopeEquipment).options(selectinload(ScopeEquipment.master_equipment))
    if scope:
        query = query.filter(ScopeEquipment.current_scope_id == scope.id)

    result = await db_session.execute(query)
    return result.scalars().all()


async def get_exculed_scope_name(
    *, db_session: DbSession, scope_name: Union[str, list]
) -> Sequence[ScopeEquipment]:
    """Return scope equipment whose current scope is not the named scope.

    When the scope lookup finds nothing, every row that has a current scope
    set is returned instead.
    """
    scope = await get_scope_by_name_service(db_session=db_session, scope_name=scope_name)

    query = Select(ScopeEquipment)
    if scope:
        query = query.filter(ScopeEquipment.current_scope_id != scope.id)
    else:
        query = query.filter(ScopeEquipment.current_scope_id.is_not(None))

    result = await db_session.execute(query)
    return result.scalars().all()


async def get_all_master_equipment(
    *, db_session: DbSession, exclude: Optional[str] = None, common: CommonParameters
):
    """Return master equipment (with an asset number) via ``search_filter_sort_paginate``.

    Args:
        db_session: Session used to resolve the ``exclude`` scope.
        exclude: Name of a scope; equipment currently assigned to it is left out.
        common: Keyword arguments forwarded to ``search_filter_sort_paginate``.
    """
    scope = await get_scope_by_name_service(db_session=db_session, scope_name=exclude)

    query = (
        Select(MasterEquipment)
        .filter(MasterEquipment.assetnum.is_not(None))
        .outerjoin(ScopeEquipment, MasterEquipment.assetnum == ScopeEquipment.assetnum)
    )

    if scope:
        # The outer join yields NULL current_scope_id for equipment that is in
        # no scope at all. A bare NOT (col = id) evaluates to NULL for those
        # rows and would drop them, so the NULL case is kept explicitly.
        query = query.filter(
            or_(
                ScopeEquipment.current_scope_id.is_(None),
                ScopeEquipment.current_scope_id != scope.id,
            )
        )

    return await search_filter_sort_paginate(model=query, **common)


async def get_equipment_level_by_no(*, db_session: DbSession, level: int):
    """Return the first ``MasterEquipmentTree`` with ``level_no == level``, or ``None``."""
    query = Select(MasterEquipmentTree).filter(MasterEquipmentTree.level_no == level)
    return await db_session.scalar(query)