You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
be-optimumoh/src/overhaul_scope/service.py

224 lines
8.4 KiB
Python

from typing import Optional
from sqlalchemy import Delete, Select, func
from sqlalchemy.orm import selectinload
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import search_filter_sort_paginate
from src.overhaul_activity.model import OverhaulActivity
from src.utils import time_now, update_model
from src.standard_scope.model import MasterEquipment, StandardScope, EquipmentOHHistory
from src.workscope_group.model import MasterActivity
from src.workscope_group_maintenance_type.model import WorkscopeOHType
from src.equipment_workscope_group.model import EquipmentWorkscopeGroup
from .model import OverhaulScope, MaintenanceType
from .schema import ScopeCreate, ScopeUpdate
from .utils import get_material_cost, get_service_cost
from datetime import datetime
from uuid import UUID
async def get(
    *, db_session: DbSession, overhaul_session_id: UUID
) -> Optional[OverhaulScope]:
    """Fetch a single overhaul scope by its id.

    The related ``maintenance_type`` is eagerly loaded with ``selectinload``.
    Returns ``None`` when no row matches ``overhaul_session_id``.
    """
    stmt = (
        Select(OverhaulScope)
        .filter(OverhaulScope.id == overhaul_session_id)
        .options(selectinload(OverhaulScope.maintenance_type))
    )
    rows = await db_session.execute(stmt)
    return rows.scalars().one_or_none()
async def get_prev_oh(*, db_session: DbSession, overhaul_session: OverhaulScope) -> Optional[OverhaulScope]:
    """Return the overhaul scope that ended most recently before the given one.

    Candidates are rows whose ``end_date`` is strictly earlier than
    ``overhaul_session.end_date``; the candidate with the latest ``end_date``
    is returned, or ``None`` when there is no such row.
    """
    ended_earlier = OverhaulScope.end_date < overhaul_session.end_date
    stmt = (
        Select(OverhaulScope)
        .where(ended_earlier)
        .order_by(OverhaulScope.end_date.desc())
        .limit(1)
    )
    rows = await db_session.execute(stmt)
    return rows.scalars().one_or_none()
async def get_all(*, common, scope_name: Optional[str] = None):
    """Return overhaul scopes, optionally restricted to one maintenance type.

    Args:
        common: Keyword arguments forwarded unchanged to
            ``search_filter_sort_paginate`` (presumably the shared
            pagination/filter parameters -- confirm against the caller).
        scope_name: When truthy, only scopes whose related maintenance type
            has this name are included.

    Returns:
        Whatever ``search_filter_sort_paginate`` returns for the built query.
    """
    query = Select(OverhaulScope)
    if scope_name:
        # ``maintenance_type`` is a relationship attribute, so the related
        # column cannot be reached with ``.name`` on the class; the filter
        # has to go through the relationship with ``has()``.
        query = query.filter(
            OverhaulScope.maintenance_type.has(MaintenanceType.name == scope_name)
        )
    return await search_filter_sort_paginate(model=query, **common)
def _parse_scope_datetime(value):
    """Turn a date string into a ``datetime``; return non-string values unchanged.

    Strings are first tried as ISO-8601 (a trailing ``Z`` is rewritten to
    ``+00:00``) and then as ``YYYY-MM-DD HH:MM:SS``. A string matching neither
    format raises ``ValueError``.
    """
    if not isinstance(value, str):
        return value
    try:
        return datetime.fromisoformat(value.replace('Z', '+00:00'))
    except ValueError:
        return datetime.strptime(value, "%Y-%m-%d %H:%M:%S")


async def create(*, db_session: DbSession, scope_in: ScopeCreate):
    """Create an overhaul scope together with one activity per matching equipment.

    Steps:
      1. Normalise ``scope_in.start_date`` / ``scope_in.end_date`` to
         ``datetime`` objects and derive the duration in whole days.
      2. Look up the ``MaintenanceType`` named ``scope_in.type``, creating it
         when it does not exist yet.
      3. Persist the ``OverhaulScope`` and flush so its id is available.
      4. Add one ``OverhaulActivity`` per equipment returned for the scope
         name; every activity receives the same material and service cost.
      5. Commit.

    Args:
        db_session: Session used for all queries and the final commit.
        scope_in: Payload carrying ``start_date``, ``end_date``, ``type``,
            ``crew_number`` and ``status``.

    Returns:
        The newly created ``OverhaulScope``.

    Raises:
        ValueError: If a date string matches neither supported format.
    """
    start_date = _parse_scope_datetime(scope_in.start_date)
    # A missing/empty end date stays None rather than being parsed.
    end_date = _parse_scope_datetime(scope_in.end_date) if scope_in.end_date else None

    # Duration is only defined when both ends of the window are known.
    duration_days = None
    if start_date and end_date:
        duration_days = (end_date - start_date).days

    scope_name = scope_in.type

    type_query = Select(MaintenanceType).where(MaintenanceType.name == scope_name)
    type_result = await db_session.execute(type_query)
    maintenance_type = type_result.scalars().one_or_none()
    if maintenance_type is None:
        maintenance_type = MaintenanceType(name=scope_name)
        db_session.add(maintenance_type)
        await db_session.flush()

    # Store the normalised datetimes (not the raw payload values) so the
    # persisted dates agree with the duration computed above.
    overhaul_session = OverhaulScope(
        start_date=start_date,
        end_date=end_date,
        maintenance_type=maintenance_type,
        duration_oh=duration_days,
        crew_number=scope_in.crew_number,
        status=scope_in.status,
    )
    db_session.add(overhaul_session)
    # Flush so overhaul_session.id is populated for the activities below.
    await db_session.flush()

    # NOTE(review): get_by_scope_name is neither defined nor imported in the
    # visible part of this module -- confirm it is in scope at runtime.
    equipments = await get_by_scope_name(db_session=db_session, scope_name=scope_name)
    total_equipment = len(equipments)

    # The scope name is passed directly: the OverhaulScope above is built
    # with ``maintenance_type`` and never has a ``type`` value assigned.
    material_cost = get_material_cost(scope=scope_name, total_equipment=total_equipment)
    service_cost = get_service_cost(scope=scope_name, total_equipment=total_equipment)

    scope_equipments = [
        OverhaulActivity(
            assetnum=equipment.assetnum,
            overhaul_scope_id=overhaul_session.id,
            material_cost=material_cost,
            service_cost=service_cost,
        )
        for equipment in equipments
    ]
    if scope_equipments:
        db_session.add_all(scope_equipments)

    await db_session.commit()
    return overhaul_session
async def update(*, db_session: DbSession, scope: OverhaulScope, scope_in: ScopeUpdate):
    """Apply the non-default fields of ``scope_in`` to ``scope`` and commit.

    ``model_dump(exclude_defaults=True)`` drops every field whose value equals
    its schema default, so such fields are never handed to ``update_model``.

    Args:
        db_session: Session used to commit the change.
        scope: The existing overhaul scope to modify.
        scope_in: Payload holding the new values.

    Returns:
        The same ``scope`` object that was passed in.
    """
    changes = scope_in.model_dump(exclude_defaults=True)
    update_model(scope, changes)
    await db_session.commit()
    return scope
async def delete(*, db_session: DbSession, scope_id: str):
    """Delete the overhaul scope whose id equals ``scope_id`` and commit."""
    matches_id = OverhaulScope.id == scope_id
    await db_session.execute(Delete(OverhaulScope).where(matches_id))
    await db_session.commit()
def _days_until(target, today):
    """Return whole days from ``today`` until ``target``, or ``None`` if ``target`` is unset.

    NOTE(review): the column types of ``start_date`` / ``end_date`` are not
    visible here. A ``datetime`` is reduced to its date first, because
    ``datetime - date`` raises ``TypeError``.
    """
    if target is None:
        return None
    if isinstance(target, datetime):
        target = target.date()
    return (target - today).days


async def get_overview_overhaul(*, db_session: DbSession):
    """Summarise the ongoing overhaul, or the next scheduled one if none is ongoing.

    An overhaul is ongoing when today lies between its ``start_date`` and
    ``end_date`` (inclusive). Otherwise the overhaul with the earliest
    ``start_date`` after today is used.

    Returns:
        A dict with ``"status"`` and ``"overhaul"`` keys. ``"overhaul"`` holds
        the id, maintenance type name, dates, duration, crew number,
        remaining days and the number of equipment rows matched for the
        overhaul's maintenance type. When there is neither an ongoing nor an
        upcoming overhaul, both values are ``None``.
    """
    current_date = time_now().date()

    # 1. Check for an ongoing overhaul.
    ongoing_query = (
        Select(OverhaulScope)
        .where(
            OverhaulScope.start_date <= current_date,
            OverhaulScope.end_date >= current_date,
        )
        .options(selectinload(OverhaulScope.maintenance_type))
        # Overlapping overhauls would make scalar_one_or_none() raise
        # MultipleResultsFound; take the earliest-starting one instead.
        .order_by(OverhaulScope.start_date.asc())
        .limit(1)
    )
    ongoing_result = await db_session.execute(ongoing_query)
    selected_overhaul = ongoing_result.scalar_one_or_none()

    if selected_overhaul is not None:
        print(f"Ongoing overhaul found: {selected_overhaul.start_date} to {selected_overhaul.end_date}")
    else:
        # 2. No ongoing overhaul: fall back to the closest future one.
        next_overhaul_query = (
            Select(OverhaulScope)
            .where(OverhaulScope.start_date > current_date)
            .options(selectinload(OverhaulScope.maintenance_type))
            .order_by(OverhaulScope.start_date.asc())
            .limit(1)
        )
        next_result = await db_session.execute(next_overhaul_query)
        selected_overhaul = next_result.scalar_one_or_none()

        if selected_overhaul is None:
            print("No upcoming overhauls scheduled")
            # Nothing to summarise; everything below dereferences the
            # selected overhaul, so stop here with an empty overview.
            return {"status": None, "overhaul": None}

        print(f"Next scheduled overhaul starts on: {selected_overhaul.start_date}")

    # Equipment whose workscope group is tied to this overhaul's maintenance
    # type. Alternating equipment is only included when it has no history or
    # its last overhaul was of a different type.
    equipments = (
        Select(StandardScope)
        .outerjoin(StandardScope.oh_history)  # outer join keeps rows without history
        .join(StandardScope.workscope_groups)
        .join(EquipmentWorkscopeGroup.workscope_group)
        .join(MasterActivity.oh_types)
        .join(MasterEquipment, StandardScope.location_tag == MasterEquipment.location_tag)
        .filter(WorkscopeOHType.maintenance_type_id == selected_overhaul.maintenance_type_id)
        .filter(
            (StandardScope.is_alternating_oh == False)  # noqa: E712 - SQL expression
            | (StandardScope.oh_history == None)  # noqa: E711 - SQL expression
            | (StandardScope.oh_history.has(EquipmentOHHistory.last_oh_type != selected_overhaul.maintenance_type.name))
        )
        .distinct()
    )
    results = await db_session.execute(equipments)

    # Upcoming overhauls count down to their start, all others to their end.
    if selected_overhaul.status == "Upcoming":
        remaining_days = _days_until(selected_overhaul.start_date, current_date)
    else:
        remaining_days = _days_until(selected_overhaul.end_date, current_date)

    return {
        "status": selected_overhaul.status,
        "overhaul": {
            "id": selected_overhaul.id,
            "type": selected_overhaul.maintenance_type.name,
            "start_date": selected_overhaul.start_date,
            "end_date": selected_overhaul.end_date,
            "duration_oh": selected_overhaul.duration_oh,
            "crew_number": selected_overhaul.crew_number,
            "remaining_days": remaining_days,
            "equipment_count": len(results.scalars().all()),
        },
    }
async def get_all_oh_with_history_service(*, db_session: DbSession):
    """Return every overhaul scope whose ``wo_parent`` is not null.

    The related ``maintenance_type`` is eagerly loaded for each row.
    """
    has_parent_wo = OverhaulScope.wo_parent.isnot(None)
    stmt = (
        Select(OverhaulScope)
        .options(selectinload(OverhaulScope.maintenance_type))
        .where(has_parent_wo)
    )
    fetched = await db_session.execute(stmt)
    return fetched.scalars().all()