You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
be-optimumoh/src/overhaul_activity/service.py

543 lines
20 KiB
Python

import asyncio
import datetime
from typing import List, Optional
from uuid import UUID, uuid4
from sqlalchemy import Delete, Select, and_, func, select
from sqlalchemy import update as sqlUpdate
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import joinedload, selectinload
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.overhaul_activity.utils import get_material_cost, get_service_cost
from src.overhaul_scope.model import OverhaulScope
from src.overhaul_scope.service import get as get_session, get_prev_oh
from src.standard_scope.model import MasterEquipment, StandardScope
from src.standard_scope.service import get_by_oh_session_id
from src.workscope_group.model import MasterActivity
from src.equipment_workscope_group.model import EquipmentWorkscopeGroup
from src.overhaul_scope.model import MaintenanceType
from src.workscope_group_maintenance_type.model import WorkscopeOHType
from src.overhaul_scope.service import get as get_overhaul
from src.standard_scope.model import EquipmentOHHistory
from .model import OverhaulActivity
from .schema import (OverhaulActivityCreate, OverhaulActivityRead,
OverhaulActivityUpdate)
import json
from src.database.core import CollectorDbSession
from src.maximo.service import get_cm_cost_summary, get_oh_cost_summary
async def get(
    *, db_session: DbSession, assetnum: str, overhaul_session_id: Optional[UUID] = None
) -> Optional[OverhaulActivityRead]:
    """Fetch a single overhaul activity by asset number.

    Args:
        db_session: Session the query is awaited on.
        assetnum: Value matched against ``OverhaulActivity.assetnum``.
        overhaul_session_id: When truthy, the match is additionally limited to
            rows whose ``overhaul_scope_id`` equals this value.

    Returns:
        The first scalar of the result (with ``equipment`` joined-loaded), or
        ``None`` when no row matches.
    """
    statement = Select(OverhaulActivity).where(OverhaulActivity.assetnum == assetnum)
    statement = statement.options(joinedload(OverhaulActivity.equipment))

    # The session filter is optional; without it any session's row may match.
    if overhaul_session_id:
        statement = statement.filter(
            OverhaulActivity.overhaul_scope_id == overhaul_session_id
        )

    rows = await db_session.execute(statement)
    return rows.scalar()
def get_cost_per_failute():
    """Load the failure-cost data from ``src/overhaul_activity/cost_failure.json``.

    The path is relative, so it is resolved against the current working
    directory of the process. The file is decoded explicitly as UTF-8 instead
    of relying on the platform's locale encoding.

    Returns:
        The value stored under the top-level ``"data"`` key of the JSON file.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If the document has no ``"data"`` key.
    """
    with open('src/overhaul_activity/cost_failure.json', 'r', encoding='utf-8') as f:
        data = json.load(f)
    return data['data']
async def get_all(
    *,
    common: CommonParameters,
    overhaul_session_id: UUID,
    location_tag: Optional[str] = None,
    scope_name: Optional[str] = None,
    all: bool = False,
    collector_db: CollectorDbSession
):
    """List the equipment in scope for an overhaul session with its cost figures.

    Selects every ``StandardScope`` row whose workscope groups map to the
    session's maintenance type and which is either not alternating, has no
    overhaul history, or whose last overhaul type differs from the current
    one. Rows without a ``master_equipment`` are skipped.

    Args:
        common: Common request parameters; only ``common['db_session']`` is read.
        overhaul_session_id: Identifier of the overhaul session.
        location_tag: Accepted for interface compatibility; currently unused.
        scope_name: Accepted for interface compatibility; currently unused.
        all: Accepted for interface compatibility; currently unused.
        collector_db: Session handed to the cost-summary helpers.

    Returns:
        ``{"items": [OverhaulActivityRead, ...], "total": <len(items)>}``.
        No pagination is applied.
    """
    db_session = common['db_session']
    overhaul = await get_overhaul(db_session=db_session, overhaul_session_id=overhaul_session_id)
    prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=overhaul)
    oh_type_name = overhaul.maintenance_type.name

    query = (
        Select(StandardScope)
        .outerjoin(StandardScope.oh_history)  # outer join keeps rows without history
        .join(StandardScope.workscope_groups)
        .join(EquipmentWorkscopeGroup.workscope_group)
        .join(MasterActivity.oh_types)
        .join(WorkscopeOHType.oh_type)
        .join(MasterEquipment, StandardScope.location_tag == MasterEquipment.location_tag)
        .filter(MaintenanceType.name == oh_type_name)
        .filter(
            # These are SQL expressions, so `==` must be used (not `is`).
            (StandardScope.is_alternating_oh == False)  # noqa: E712
            | (StandardScope.oh_history == None)  # noqa: E711
            | (StandardScope.oh_history.has(EquipmentOHHistory.last_oh_type != oh_type_name))
        )
        .distinct()
    )
    equipments = (await db_session.execute(query)).scalars().all()

    # NOTE(review): assumes get_prev_oh always returns a scope; if it can return
    # None the attribute access below raises AttributeError - confirm.
    material_cost = await get_cm_cost_summary(
        collector_db=collector_db,
        last_oh_date=prev_oh_scope.end_date,
        upcoming_oh_date=overhaul.start_date,
    )
    overhaul_cost = await get_oh_cost_summary(
        collector_db=collector_db,
        last_oh_date=prev_oh_scope.end_date,
        upcoming_oh_date=overhaul.start_date,
    )

    # Both cost summaries are looked up by location tag and default to 0.
    results = [
        OverhaulActivityRead(
            id=equipment.id,
            material_cost=float(material_cost.get(equipment.location_tag, 0)),
            service_cost=equipment.service_cost,
            overhaul_cost=float(overhaul_cost.get(equipment.location_tag, 0)),
            location_tag=equipment.location_tag,
            equipment_name=equipment.master_equipment.name,
            oh_scope=oh_type_name,
        )
        for equipment in equipments
        if equipment.master_equipment
    ]

    return {
        "items": results,
        "total": len(results),
    }
async def get_standard_scope_by_session_id(*, db_session: DbSession, overhaul_session_id: UUID, collector_db: CollectorDbSession):
    """Return the in-scope equipment of an overhaul session as read models.

    Selects every ``StandardScope`` row whose workscope groups map to the
    session's maintenance type and which is either not alternating, has no
    overhaul history, or whose last overhaul type differs from the current
    one. Rows without a ``master_equipment`` are kept and get
    ``equipment_name=None``.

    Args:
        db_session: Session used for the scope queries.
        overhaul_session_id: Identifier of the overhaul session.
        collector_db: Session handed to the cost-summary helpers.

    Returns:
        A list of ``OverhaulActivityRead`` objects, one per selected equipment.
    """
    overhaul = await get_session(db_session=db_session, overhaul_session_id=overhaul_session_id)
    prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=overhaul)
    oh_type_name = overhaul.maintenance_type.name

    query = (
        Select(StandardScope)
        .outerjoin(StandardScope.oh_history)  # outer join keeps rows without history
        .join(StandardScope.workscope_groups)
        .join(EquipmentWorkscopeGroup.workscope_group)
        .join(MasterActivity.oh_types)
        .join(WorkscopeOHType.oh_type)
        .join(
            MasterEquipment,
            StandardScope.location_tag == MasterEquipment.location_tag,
        )
        .filter(MaintenanceType.name == oh_type_name)
        .filter(
            (StandardScope.is_alternating_oh == False)  # noqa: E712
            # Must be `== None`: `is None` is evaluated by Python (always
            # False here), which silently drops the "no history" condition
            # from the generated SQL.
            | (StandardScope.oh_history == None)  # noqa: E711
            | (
                StandardScope.oh_history.has(
                    EquipmentOHHistory.last_oh_type != oh_type_name
                )
            )
        )
        .distinct()
    )
    data = await db_session.execute(query)
    eqs = data.scalars().all()

    # NOTE(review): assumes get_prev_oh always returns a scope; if it can return
    # None the attribute access below raises AttributeError - confirm.
    material_cost = await get_cm_cost_summary(
        collector_db=collector_db,
        last_oh_date=prev_oh_scope.end_date,
        upcoming_oh_date=overhaul.start_date,
    )
    overhaul_cost = await get_oh_cost_summary(
        collector_db=collector_db,
        last_oh_date=prev_oh_scope.end_date,
        upcoming_oh_date=overhaul.start_date,
    )

    # Both cost summaries are looked up by location tag and default to 0.
    results = []
    for equipment in eqs:
        cost = material_cost.get(equipment.location_tag, 0)
        oh_cost = overhaul_cost.get(equipment.location_tag, 0)
        results.append(
            OverhaulActivityRead(
                id=equipment.id,
                material_cost=float(cost),
                service_cost=equipment.service_cost,
                overhaul_cost=float(oh_cost),
                location_tag=equipment.location_tag,
                equipment_name=equipment.master_equipment.name if equipment.master_equipment else None,
                oh_scope=oh_type_name,
            )
        )
    return results
async def add_equipment_to_session(
    *,
    db_session: DbSession,
    collector_db: CollectorDbSession,
    overhaul_session_id: UUID,
    location_tag: str
) -> Optional[StandardScope]:
    """Include one equipment in an overhaul session via a dummy workscope group.

    Nothing is added when the equipment is unknown to ``MasterEquipment`` or
    when one of its workscope groups already maps to the session's
    maintenance type. Otherwise a dummy workscope group is tagged with the
    equipment's location tag, and a ``StandardScope`` row is created if none
    exists yet.

    Args:
        db_session: Database session.
        collector_db: Collector database session (not used by this function).
        overhaul_session_id: The UUID of the overhaul session.
        location_tag: The location tag of the equipment to add.

    Returns:
        The existing or newly created ``StandardScope`` record, or ``None``
        when the equipment was skipped or any exception occurred (the
        exception is printed and the session rolled back).
    """
    try:
        overhaul = await get_session(
            db_session=db_session, overhaul_session_id=overhaul_session_id
        )

        # The equipment has to be known to the master list.
        master_rows = await db_session.execute(
            select(MasterEquipment).filter(MasterEquipment.location_tag == location_tag)
        )
        if not master_rows.scalar_one_or_none():
            print("Equipment not found in master")
            return None

        # Look up a StandardScope row for this tag, if there is one.
        scope_rows = await db_session.execute(
            select(StandardScope).filter(StandardScope.location_tag == location_tag)
        )
        existing_equipment = scope_rows.scalar_one_or_none()

        # Workscope groups of this equipment that are linked to some OH type.
        mapping_stmt = (
            select(EquipmentWorkscopeGroup)
            .join(EquipmentWorkscopeGroup.workscope_group)
            .join(
                WorkscopeOHType,
                WorkscopeOHType.workscope_group_id == MasterActivity.id,
            )
            .join(
                MaintenanceType,
                MaintenanceType.id == WorkscopeOHType.maintenance_type_id,
            )
            .filter(EquipmentWorkscopeGroup.location_tag == location_tag)
        )
        mapped_groups = (await db_session.execute(mapping_stmt)).scalars().all()

        # Skip when any of those groups already targets this overhaul's type.
        for mapping in mapped_groups:
            group = mapping.workscope_group
            if not group:
                continue
            for link in group.oh_types:
                if link.oh_type.name == overhaul.maintenance_type.name:
                    return None

        # Both remaining paths need a dummy workscope group for this OH type.
        dummy_workscope = await get_or_create_dummy_workscope(
            db_session=db_session,
            maintenance_type_name=overhaul.maintenance_type.name,
        )

        if existing_equipment:
            # Tag the dummy group with this equipment unless already attached.
            if dummy_workscope not in existing_equipment.workscope_groups:
                dummy_workscope.location_tag = location_tag
                await db_session.commit()
                await db_session.refresh(existing_equipment)
            return existing_equipment

        # No StandardScope row yet: create one alongside the dummy group.
        new_equipment = StandardScope(
            location_tag=location_tag,
            is_alternating_oh=True,
            assigned_date=datetime.date.today(),
        )
        dummy_workscope.location_tag = location_tag
        db_session.add(new_equipment)
        await db_session.commit()
        await db_session.refresh(new_equipment)
        return new_equipment
    except Exception as e:
        print(f"Error adding equipment {location_tag}: {str(e)}")
        await db_session.rollback()
        return None
async def get_or_create_dummy_workscope(
    *,
    db_session: DbSession,
    maintenance_type_name: str,
) -> EquipmentWorkscopeGroup:
    """Create a dummy equipment workscope group for a maintenance type.

    The dummy ``MasterActivity`` (named
    ``"Included Equipment Workscope - <maintenance_type_name>"``) and its
    ``WorkscopeOHType`` link are reused when present and created otherwise.
    A new ``EquipmentWorkscopeGroup`` pointing at that activity is created on
    every call. Each creation step is committed immediately.

    Args:
        db_session: Database session.
        maintenance_type_name: Name of the maintenance type (e.g., "A", "B").

    Returns:
        EquipmentWorkscopeGroup: The newly created dummy workscope group.

    Raises:
        sqlalchemy.exc.NoResultFound: If no maintenance type has that name.
        sqlalchemy.exc.MultipleResultsFound: If a lookup matches several rows.
    """
    dummy_name = f"Included Equipment Workscope - {maintenance_type_name}"

    # --- Step 1: Resolve the maintenance type before writing anything ---
    # scalar_one() raises for an unknown name; doing this first means a bad
    # name can no longer leave a committed, orphaned dummy MasterActivity.
    mt_query = select(MaintenanceType).filter(MaintenanceType.name == maintenance_type_name)
    mt_result = await db_session.execute(mt_query)
    maintenance_type = mt_result.scalar_one()

    # --- Step 2: Get or create the dummy MasterActivity ---
    query = select(MasterActivity).filter(MasterActivity.workscope == dummy_name)
    result = await db_session.execute(query)
    master_activity = result.scalar_one_or_none()
    if not master_activity:
        master_activity = MasterActivity(workscope=dummy_name)
        db_session.add(master_activity)
        await db_session.commit()
        await db_session.refresh(master_activity)

    # --- Step 3: Ensure the WorkscopeOHType link exists ---
    wo_oh_type_query = select(WorkscopeOHType).filter(
        and_(
            WorkscopeOHType.workscope_group_id == master_activity.id,
            WorkscopeOHType.maintenance_type_id == maintenance_type.id,
        )
    )
    workscope_oh_type = (await db_session.execute(wo_oh_type_query)).scalar_one_or_none()
    if not workscope_oh_type:
        workscope_oh_type = WorkscopeOHType(
            id=uuid4(),
            workscope_group=master_activity,
            oh_type=maintenance_type,
        )
        db_session.add(workscope_oh_type)
        await db_session.commit()
        await db_session.refresh(workscope_oh_type)

    # --- Step 4: Always create a new EquipmentWorkscopeGroup ---
    equipment_workscope_group = EquipmentWorkscopeGroup(
        workscope_group=master_activity,
    )
    db_session.add(equipment_workscope_group)
    await db_session.commit()
    await db_session.refresh(equipment_workscope_group)
    return equipment_workscope_group
async def add_multiple_equipment_to_session(
    *,
    db_session: DbSession,
    collector_db: CollectorDbSession,
    overhaul_session_id: UUID,
    location_tags: List[str]
) -> List[StandardScope]:
    """Add several equipment to an overhaul session, one after another.

    Each tag is passed to ``add_equipment_to_session`` in the given order;
    falsy results are left out of the returned list.

    Args:
        db_session: Database session.
        collector_db: Collector database session.
        overhaul_session_id: The UUID of the overhaul session.
        location_tags: Location tags of the equipment to add.

    Returns:
        List[StandardScope]: The truthy results of the per-tag calls, in
        input order.
    """
    # Awaited one at a time inside the comprehension, so calls stay sequential.
    outcomes = [
        await add_equipment_to_session(
            db_session=db_session,
            collector_db=collector_db,
            overhaul_session_id=overhaul_session_id,
            location_tag=tag,
        )
        for tag in location_tags
    ]
    return [record for record in outcomes if record]
async def get_all_by_session_id(*, db_session: DbSession, overhaul_session_id):
    """Return every overhaul activity belonging to one overhaul session.

    Each activity is loaded with its equipment, the equipment's parent and
    that parent's parent (joined loads), plus its overhaul scope
    (select-in load).

    Args:
        db_session: Session the query is awaited on.
        overhaul_session_id: Value matched against
            ``OverhaulActivity.overhaul_scope_id``.

    Returns:
        A list of ``OverhaulActivity`` rows.
    """
    # Loader options are built innermost-first: grandparent -> parent -> equipment.
    grandparent_load = joinedload(MasterEquipment.parent)
    parent_load = joinedload(MasterEquipment.parent).options(grandparent_load)
    equipment_load = joinedload(OverhaulActivity.equipment).options(parent_load)
    scope_load = selectinload(OverhaulActivity.overhaul_scope)

    statement = (
        Select(OverhaulActivity)
        .where(OverhaulActivity.overhaul_scope_id == overhaul_session_id)
        .options(equipment_load, scope_load)
    )
    rows = await db_session.execute(statement)
    return rows.scalars().all()
async def update(
    *,
    db_session: DbSession,
    activity: OverhaulActivity,
    overhaul_activity_in: OverhaulActivityUpdate
):
    """Apply the non-default fields of an update payload to an activity.

    Args:
        db_session: Session that is committed after the attributes are set.
        activity: Object whose attributes are overwritten in place.
        overhaul_activity_in: Payload; only fields present in
            ``model_dump(exclude_defaults=True)`` are copied.

    Returns:
        The same ``activity`` object, after the commit.
    """
    all_fields = overhaul_activity_in.model_dump()
    changed = overhaul_activity_in.model_dump(exclude_defaults=True)

    # Walk the full field list so attributes are assigned in field order.
    for name in (candidate for candidate in all_fields if candidate in changed):
        setattr(activity, name, changed[name])

    await db_session.commit()
    return activity
async def remove_equipment_from_session(
    *,
    db_session: DbSession,
    overhaul_session_id: UUID,
    location_tag: str
) -> bool:
    """Take an equipment out of an overhaul session by dropping its dummy group.

    Only the dummy workscope group for the session's maintenance type is
    detached. Equipment that maps to that type through a non-dummy workscope
    group is left untouched. If no workscope groups remain afterwards, the
    ``StandardScope`` row itself is deleted.

    Args:
        db_session: Database session.
        overhaul_session_id: The UUID of the overhaul session.
        location_tag: The location tag of the equipment to remove.

    Returns:
        bool: True when the dummy group was removed; False when the equipment
        was not found, is naturally included, has no dummy group, or an
        exception occurred (it is printed and the session rolled back).
    """
    try:
        overhaul = await get_session(
            db_session=db_session, overhaul_session_id=overhaul_session_id
        )

        # Locate the StandardScope row for this tag.
        scope_rows = await db_session.execute(
            select(StandardScope).filter(StandardScope.location_tag == location_tag)
        )
        equipment = scope_rows.scalar_one_or_none()
        if not equipment:
            print(f"Equipment {location_tag} not found in StandardScope")
            return False

        # Non-dummy workscope groups of this equipment linked to some OH type.
        natural_stmt = (
            select(EquipmentWorkscopeGroup)
            .join(EquipmentWorkscopeGroup.workscope_group)
            .join(
                WorkscopeOHType,
                WorkscopeOHType.workscope_group_id == MasterActivity.id,
            )
            .join(
                MaintenanceType,
                MaintenanceType.id == WorkscopeOHType.maintenance_type_id,
            )
            .filter(EquipmentWorkscopeGroup.location_tag == location_tag)
            # Dummy groups are recognised by their workscope name prefix.
            .filter(~MasterActivity.workscope.like("Included Equipment Workscope%"))
        )
        natural_groups = (await db_session.execute(natural_stmt)).scalars().all()

        # Refuse removal when a natural group already targets this OH type.
        for mapping in natural_groups:
            group = mapping.workscope_group
            if not group:
                continue
            for link in group.oh_types:
                if link.oh_type.name == overhaul.maintenance_type.name:
                    print(f"Equipment {location_tag} is naturally part of OH {overhaul.maintenance_type.name}, skip removal")
                    return False

        # Find the dummy group that was attached for this overhaul type.
        dummy_name = f"Included Equipment Workscope - {overhaul.maintenance_type.name}"
        dummy_group = None
        for candidate in equipment.workscope_groups:
            if candidate.workscope_group.workscope == dummy_name:
                dummy_group = candidate
                break

        if not dummy_group:
            print(f"No dummy workscope group found for {location_tag} in OH {overhaul.maintenance_type.name}")
            return False

        equipment.workscope_groups.remove(dummy_group)
        await db_session.commit()
        await db_session.refresh(equipment)

        # Drop the StandardScope row as well once it has no groups left.
        if not equipment.workscope_groups:
            await db_session.delete(equipment)
            await db_session.commit()
            print(f"Equipment {location_tag} completely removed from StandardScope")
        return True
    except Exception as e:
        print(f"Error removing equipment {location_tag}: {str(e)}")
        await db_session.rollback()
        return False