import asyncio
import json
import logging
from typing import List, Optional
from uuid import UUID

from sqlalchemy import Delete, Select, func, select
from sqlalchemy import update as sqlUpdate
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import joinedload, selectinload

# NOTE: project imports keep their original relative order on purpose; model
# import order can matter for mapper configuration and import cycles.
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.overhaul_activity.utils import get_material_cost, get_service_cost
from src.overhaul_scope.model import OverhaulScope
from src.overhaul_scope.service import get as get_session, get_prev_oh
from src.standard_scope.model import MasterEquipment, StandardScope
from src.standard_scope.service import get_by_oh_session_id
from src.workscope_group.model import MasterActivity
from src.equipment_workscope_group.model import EquipmentWorkscopeGroup
from src.overhaul_scope.model import MaintenanceType
from src.workscope_group_maintenance_type.model import WorkscopeOHType
from src.overhaul_scope.service import get as get_overhaul
from src.standard_scope.model import EquipmentOHHistory

from .model import OverhaulActivity
from .schema import (OverhaulActivityCreate, OverhaulActivityRead,
                     OverhaulActivityUpdate)

from src.database.core import CollectorDbSession
from src.maximo.service import get_cm_cost_summary, get_oh_cost_summary

logger = logging.getLogger(__name__)


def _eligible_scope_query(maintenance_type_name: str) -> Select:
    """Build the base StandardScope query for one maintenance type.

    Selects StandardScope rows joined through their workscope groups to a
    MaintenanceType with the given name, keeping a row when any of these hold:
    it is not flagged ``is_alternating_oh``, it has no ``oh_history``, or its
    history's ``last_oh_type`` differs from ``maintenance_type_name``.

    Callers append any extra joins, filters or ``distinct()`` they need.
    """
    return (
        select(StandardScope)
        # Outer join so equipment without any history row is not dropped.
        .outerjoin(StandardScope.oh_history)
        .join(StandardScope.workscope_groups)
        .join(EquipmentWorkscopeGroup.workscope_group)
        .join(MasterActivity.oh_types)
        .join(WorkscopeOHType.oh_type)
        .filter(MaintenanceType.name == maintenance_type_name)
        .filter(
            (StandardScope.is_alternating_oh == False)  # noqa: E712 - SQL expression
            # Must be `== None`, not `is None`: the latter is evaluated by
            # Python to a constant False and never reaches the database.
            | (StandardScope.oh_history == None)  # noqa: E711 - SQL expression
            | StandardScope.oh_history.has(
                EquipmentOHHistory.last_oh_type != maintenance_type_name
            )
        )
    )


async def _fetch_cost_summaries(*, collector_db: CollectorDbSession, prev_oh_scope, overhaul):
    """Return ``(material_cost, overhaul_cost)`` for the window between two overhauls.

    The window runs from ``prev_oh_scope.end_date`` to ``overhaul.start_date``.
    Both results are later read with ``.get(location_tag, 0)``.
    """
    window = {
        "last_oh_date": prev_oh_scope.end_date,
        "upcoming_oh_date": overhaul.start_date,
    }
    # Awaited one after the other: both calls share the same collector session.
    material_cost = await get_cm_cost_summary(collector_db=collector_db, **window)
    overhaul_cost = await get_oh_cost_summary(collector_db=collector_db, **window)
    return material_cost, overhaul_cost


def _build_activity_read(equipment, material_cost, overhaul_cost, oh_scope) -> OverhaulActivityRead:
    """Convert one StandardScope row into an OverhaulActivityRead.

    Costs are looked up by ``equipment.location_tag`` and default to 0 when the
    tag is absent from a summary.
    """
    tag = equipment.location_tag
    master = equipment.master_equipment
    return OverhaulActivityRead(
        id=equipment.id,
        material_cost=float(material_cost.get(tag, 0)),
        service_cost=equipment.service_cost,
        overhaul_cost=float(overhaul_cost.get(tag, 0)),
        location_tag=tag,
        equipment_name=master.name if master else None,
        oh_scope=oh_scope,
    )


async def get(
    *, db_session: DbSession, assetnum: str, overhaul_session_id: Optional[UUID] = None
) -> Optional[OverhaulActivity]:
    """Return the first overhaul activity matching an asset number.

    Args:
        db_session: Database session
        assetnum: Asset number to look up
        overhaul_session_id: When given, also restrict to this overhaul session

    Returns:
        The matching OverhaulActivity with its equipment eagerly loaded, or
        None when nothing matches.
    """
    query = (
        select(OverhaulActivity)
        .where(OverhaulActivity.assetnum == assetnum)
        .options(joinedload(OverhaulActivity.equipment))
    )

    if overhaul_session_id:
        query = query.filter(OverhaulActivity.overhaul_scope_id == overhaul_session_id)

    result = await db_session.execute(query)
    return result.scalar()


def get_cost_per_failute():
    """Load ``src/overhaul_activity/cost_failure.json`` and return its ``"data"`` entry.

    The path is relative to the process working directory.
    """
    with open('src/overhaul_activity/cost_failure.json', 'r', encoding='utf-8') as f:
        data = json.load(f)

    return data['data']


async def get_all(
    *,
    common: CommonParameters,
    overhaul_session_id: UUID,
    location_tag: Optional[str] = None,
    scope_name: Optional[str] = None,
    all: bool = False,
    collector_db: CollectorDbSession
):
    """Return one page of equipment in scope for an overhaul session, with costs.

    Args:
        common: Shared query parameters; ``common['db_session']`` is used for
            lookups and the whole mapping is forwarded to
            ``search_filter_sort_paginate``
        overhaul_session_id: The UUID of the overhaul session
        location_tag: Currently unused
        scope_name: Currently unused
        all: Currently unused
        collector_db: Collector database session used for the cost summaries

    Returns:
        The pagination result from ``search_filter_sort_paginate`` with its
        ``'items'`` replaced by OverhaulActivityRead objects, sorted by
        material cost in descending order. Rows without a master equipment
        are left out.
    """
    db_session = common['db_session']
    overhaul = await get_overhaul(db_session=db_session, overhaul_session_id=overhaul_session_id)
    prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=overhaul)
    oh_scope = overhaul.maintenance_type.name

    query = (
        _eligible_scope_query(oh_scope)
        .join(MasterEquipment, StandardScope.location_tag == MasterEquipment.location_tag)
        .distinct()
    )

    material_cost, overhaul_cost = await _fetch_cost_summaries(
        collector_db=collector_db, prev_oh_scope=prev_oh_scope, overhaul=overhaul
    )

    equipments = await search_filter_sort_paginate(model=query, **common)

    results = [
        _build_activity_read(equipment, material_cost, overhaul_cost, oh_scope)
        for equipment in equipments['items']
        if equipment.master_equipment
    ]
    # Sorting happens after pagination, so it orders the current page only.
    results.sort(key=lambda x: x.material_cost, reverse=True)

    equipments['items'] = results
    return equipments


async def get_standard_scope_by_session_id(*, db_session: DbSession, overhaul_session_id: UUID, collector_db: CollectorDbSession):
    """Return every equipment in scope for an overhaul session, with costs.

    Unlike ``get_all`` this is not paginated, not sorted, and keeps rows that
    have no master equipment (their ``equipment_name`` is None).

    Args:
        db_session: Database session
        overhaul_session_id: The UUID of the overhaul session
        collector_db: Collector database session used for the cost summaries

    Returns:
        List of OverhaulActivityRead, one per matching StandardScope row.
    """
    overhaul = await get_session(db_session=db_session, overhaul_session_id=overhaul_session_id)
    prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=overhaul)
    oh_scope = overhaul.maintenance_type.name

    query = (
        _eligible_scope_query(oh_scope)
        .join(MasterEquipment, StandardScope.location_tag == MasterEquipment.location_tag)
        .distinct()
    )
    eqs = (await db_session.execute(query)).scalars().all()

    material_cost, overhaul_cost = await _fetch_cost_summaries(
        collector_db=collector_db, prev_oh_scope=prev_oh_scope, overhaul=overhaul
    )

    return [
        _build_activity_read(equipment, material_cost, overhaul_cost, oh_scope)
        for equipment in eqs
    ]


async def add_equipment_to_session(
    *,
    db_session: DbSession,
    collector_db: CollectorDbSession,
    overhaul_session_id: UUID,
    location_tag: str
) -> Optional[StandardScope]:
    """
    Add a new equipment to an existing overhaul session.
    Creates a dummy workscope group if needed.
    Skips equipment that is unknown to MasterEquipment or already in the session.

    Any exception is logged, the transaction is rolled back and None is
    returned; nothing is raised to the caller.

    Args:
        db_session: Database session
        collector_db: Collector database session (currently unused)
        overhaul_session_id: The UUID of the overhaul session
        location_tag: The location tag of the equipment to add

    Returns:
        StandardScope: The created or updated StandardScope record, or None if
        the equipment was skipped or an error occurred
    """
    try:
        overhaul = await get_session(db_session=db_session, overhaul_session_id=overhaul_session_id)
        oh_type_name = overhaul.maintenance_type.name

        # The equipment must exist in the master list.
        master_eq_result = await db_session.execute(
            select(MasterEquipment).filter(MasterEquipment.location_tag == location_tag)
        )
        if not master_eq_result.scalar_one_or_none():
            logger.warning("equipment not found in master")
            return None

        existing_result = await db_session.execute(
            select(StandardScope).filter(StandardScope.location_tag == location_tag)
        )
        existing_equipment = existing_result.scalar_one_or_none()

        if existing_equipment:
            # Already a StandardScope row: see whether it already satisfies
            # this session's maintenance-type criteria.
            check_query = _eligible_scope_query(oh_type_name).filter(
                StandardScope.location_tag == location_tag
            )
            check_result = await db_session.execute(check_query)
            # The joins can yield the same equipment once per matching
            # workscope row, so take the first row instead of requiring one.
            if check_result.scalars().first():
                return None

            dummy_workscope = await get_or_create_dummy_workscope(
                db_session=db_session,
                maintenance_type_name=oh_type_name
            )

            # NOTE(review): reading `workscope_groups` here presumably relies on
            # an eager loading strategy on the relationship - confirm.
            if dummy_workscope not in existing_equipment.workscope_groups:
                existing_equipment.workscope_groups.append(dummy_workscope)

            await db_session.commit()
            await db_session.refresh(existing_equipment)
            return existing_equipment

        # No StandardScope row yet: create one tied to the dummy workscope.
        dummy_workscope = await get_or_create_dummy_workscope(
            db_session=db_session,
            maintenance_type_name=oh_type_name
        )

        new_equipment = StandardScope(
            location_tag=location_tag,
            is_alternating_oh=False,
            # TODO: add other required fields based on the model
        )
        new_equipment.workscope_groups = [dummy_workscope]

        db_session.add(new_equipment)
        await db_session.commit()
        await db_session.refresh(new_equipment)
        return new_equipment

    except Exception as e:
        # Deliberate best-effort: report the failure but do not raise it.
        logger.exception("Error adding equipment %s: %s", location_tag, e)
        await db_session.rollback()
        return None


async def get_or_create_dummy_workscope(
    *,
    db_session: DbSession,
    maintenance_type_name: str
) -> EquipmentWorkscopeGroup:
    """
    Get or create a dummy workscope group for included equipment.

    Args:
        db_session: Database session
        maintenance_type_name: Name of the maintenance type (e.g., "A", "B")

    Returns:
        EquipmentWorkscopeGroup: The dummy workscope group

    Raises:
        sqlalchemy.exc.NoResultFound: If no MaintenanceType has the given name
    """
    dummy_name = f"Included Equipment Workscope - {maintenance_type_name}"

    # NOTE(review): the lookup matches on `MasterActivity.name` while creation
    # below sets `workscope=`; if these are different columns the lookup never
    # finds what was created - confirm against the MasterActivity model.
    query = (
        select(EquipmentWorkscopeGroup)
        .join(EquipmentWorkscopeGroup.workscope_group)
        .join(MasterActivity.oh_types)
        .join(WorkscopeOHType.oh_type)
        .filter(MasterActivity.name == dummy_name)
        .filter(MaintenanceType.name == maintenance_type_name)
    )
    existing = (await db_session.execute(query)).scalar_one_or_none()
    if existing:
        return existing

    # Nothing found: build the activity, its maintenance-type link and the
    # equipment workscope group.
    mt_result = await db_session.execute(
        select(MaintenanceType).filter(MaintenanceType.name == maintenance_type_name)
    )
    maintenance_type = mt_result.scalar_one()

    master_activity = MasterActivity(
        workscope=dummy_name,
    )

    workscope_oh_type = WorkscopeOHType(
        workscope_group=master_activity,
        oh_type=maintenance_type
    )

    equipment_workscope_group = EquipmentWorkscopeGroup(
        workscope_group=master_activity,
        # TODO: add other required fields
    )

    # Add the link row explicitly so it is persisted even if it is not
    # reachable by cascade from the equipment workscope group.
    db_session.add_all([equipment_workscope_group, workscope_oh_type])
    await db_session.commit()
    await db_session.refresh(equipment_workscope_group)

    return equipment_workscope_group


async def add_multiple_equipment_to_session(
    *,
    db_session: DbSession,
    collector_db: CollectorDbSession,
    overhaul_session_id: UUID,
    location_tags: List[str]
) -> List[StandardScope]:
    """
    Add multiple equipment to an existing overhaul session.
    Silently skips equipment that already exist or have errors.

    Args:
        db_session: Database session
        collector_db: Collector database session
        overhaul_session_id: The UUID of the overhaul session
        location_tags: List of location tags to add

    Returns:
        List[StandardScope]: List of newly created/updated StandardScope records
    """
    results = []

    # Processed one at a time: every call commits on the shared session.
    for location_tag in location_tags:
        equipment = await add_equipment_to_session(
            db_session=db_session,
            collector_db=collector_db,
            overhaul_session_id=overhaul_session_id,
            location_tag=location_tag
        )
        if equipment:
            results.append(equipment)

    return results


async def get_all_by_session_id(*, db_session: DbSession, overhaul_session_id):
    """Return all OverhaulActivity rows of an overhaul session.

    Each row is loaded with its equipment, two levels of the equipment's
    parent chain, and its overhaul scope.
    """
    query = (
        select(OverhaulActivity)
        .where(OverhaulActivity.overhaul_scope_id == overhaul_session_id)
        .options(
            joinedload(OverhaulActivity.equipment).options(
                joinedload(MasterEquipment.parent).options(
                    joinedload(MasterEquipment.parent)
                )
            )
        )
        .options(selectinload(OverhaulActivity.overhaul_scope))
    )

    results = await db_session.execute(query)
    return results.scalars().all()


async def create(
    *,
    db_session: DbSession,
    overhaul_activty_in: OverhaulActivityCreate,
    overhaul_session_id: UUID
):
    """Insert overhaul activities for the given asset numbers.

    Rows that already exist for the same ``(assetnum, overhaul_scope_id)`` are
    left in place. Afterwards the material and service cost of every activity
    in the session is overwritten with the newly calculated values.

    Args:
        db_session: Database session
        overhaul_activty_in: Payload carrying the ``assetnums`` to add
        overhaul_session_id: The UUID of the overhaul session

    Returns:
        The requested asset numbers, or an empty list when none were given.
    """
    assetnums = overhaul_activty_in.assetnums
    if not assetnums:
        return []

    session = await get_session(
        db_session=db_session, overhaul_session_id=overhaul_session_id
    )
    equipment_count = await db_session.scalar(
        select(func.count())
        .select_from(OverhaulActivity)
        .where(OverhaulActivity.overhaul_scope_id == overhaul_session_id)
    )

    # Costs are calculated from the existing count plus the requested assets.
    total_equipment = equipment_count + len(assetnums)
    material_cost = get_material_cost(
        scope=session.type, total_equipment=total_equipment
    )
    service_cost = get_service_cost(scope=session.type, total_equipment=total_equipment)

    stmt = insert(OverhaulActivity).values(
        [
            {
                "assetnum": assetnum,
                "overhaul_scope_id": overhaul_session_id,
                "material_cost": material_cost,
                "service_cost": service_cost,
            }
            for assetnum in assetnums
        ]
    )
    # ON CONFLICT DO NOTHING: existing (assetnum, session) pairs are skipped.
    stmt = stmt.on_conflict_do_nothing(index_elements=["assetnum", "overhaul_scope_id"])
    await db_session.execute(stmt)

    # Apply the recalculated costs to every activity in the session.
    await db_session.execute(
        sqlUpdate(OverhaulActivity)
        .where(OverhaulActivity.overhaul_scope_id == overhaul_session_id)
        .values(material_cost=material_cost, service_cost=service_cost)
    )

    await db_session.commit()
    return assetnums


async def update(
    *,
    db_session: DbSession,
    activity: OverhaulActivity,
    overhaul_activity_in: OverhaulActivityUpdate
):
    """Apply the non-default fields of the payload to an activity and commit.

    Args:
        db_session: Database session
        activity: The OverhaulActivity to modify
        overhaul_activity_in: Payload; fields equal to their default are ignored

    Returns:
        The same ``activity`` object after the commit.
    """
    update_data = overhaul_activity_in.model_dump(exclude_defaults=True)

    for field, value in update_data.items():
        setattr(activity, field, value)

    await db_session.commit()
    return activity


async def delete(*, db_session: DbSession, overhaul_activity_id: str):
    """Delete an overhaul activity by primary key; do nothing if it is absent."""
    activity = await db_session.get(OverhaulActivity, overhaul_activity_id)
    if activity is None:
        # Nothing to delete; passing None to Session.delete() would raise.
        return

    await db_session.delete(activity)
    await db_session.commit()