feat: Integrate historical Maximo work order data for overhaul and standard scopes, and add an API endpoint to retrieve all calculation time constraints.

main
Cizz22 1 month ago
parent afcf5ab780
commit 08a2d2f843

@ -72,6 +72,7 @@ authenticated_api_router.include_router(
overhaul_router, prefix="/overhauls", tags=["overhaul"]
)
# authenticated_api_router.include_router(job_router, prefix="/jobs", tags=["job"])
# # # Overhaul session data

@ -18,9 +18,9 @@ from .schema import (CalculationResultsRead,
CalculationTimeConstrainsParametersCreate,
CalculationTimeConstrainsParametersRead,
CalculationTimeConstrainsParametersRetrive,
CalculationTimeConstrainsRead, CreateCalculationQuery, EquipmentResult)
CalculationTimeConstrainsRead, CreateCalculationQuery, EquipmentResult, CalculationTimeConstrainsReadNoResult)
from .service import (bulk_update_equipment, get_calculation_result,
get_calculation_result_by_day, get_calculation_by_assetnum)
get_calculation_result_by_day, get_calculation_by_assetnum, get_all_calculations)
from src.database.core import CollectorDbSession
router = APIRouter()
@ -65,6 +65,28 @@ async def create_calculation_time_constrains(
return StandardResponse(data=results, message="Data created successfully")
@router.get(
"", response_model=StandardResponse[List[CalculationTimeConstrainsReadNoResult]]
)
async def get_all_simulation_calculations(
db_session: DbSession,
token: Token,
current_user: CurrentUser,
):
"""Get all calculation time constrains Here"""
calculations = await get_all_calculations(
db_session=db_session,
)
return StandardResponse(
data=calculations,
message="Data retrieved successfully",
)
@router.get(
"/parameters",
response_model=StandardResponse[

@ -104,6 +104,17 @@ class AnalysisMetadata(CalculationTimeConstrainsBase):
total_equipment_analyzed: int
included_in_optimization: int
class CalculationTimeConstrainsReadNoResult(CalculationTimeConstrainsBase):
id: UUID
created_at: datetime
optimum_oh_day: Optional[int]
max_interval: Optional[int]
optimum_analysis: Optional[dict]
# optimum_oh_day: int
# max_interval: int
# optimal_analysis: dict
# analysis_metadata: dict
class CalculationTimeConstrainsRead(CalculationTimeConstrainsBase):
id: UUID
reference: UUID

@ -1118,6 +1118,22 @@ async def get_calculation_data_by_id(
return result.unique().scalar()
async def get_all_calculations(
db_session: DbSession,
) -> List[CalculationData]:
stmt = (
select(CalculationData)
.where(
CalculationData.optimum_oh_day.isnot(None),
CalculationData.max_interval.isnot(None),
CalculationData.optimum_analysis.isnot(None),
)
.order_by(CalculationData.created_at.desc())
)
result = await db_session.execute(stmt)
return result.scalars().all()
async def get_calculation_by_assetnum(
*, db_session: DbSession, assetnum: str, calculation_id: str
):

@ -8,7 +8,7 @@ class CommonParams(DefultBase):
# This ensures no extra query params are allowed
current_user: Optional[str] = Field(None, alias="currentUser")
page: int = Field(1, gt=0, lt=2147483647)
items_per_page: int = Field(5, gt=-2, lt=2147483647)
items_per_page: int = Field(5, gt=-2, lt=2147483647, alias="itemsPerPage")
query_str: Optional[str] = Field(None, alias="q")
filter_spec: Optional[str] = Field(None, alias="filter")
sort_by: List[str] = Field(default_factory=list, alias="sortBy[]")

@ -1,11 +1,12 @@
from datetime import datetime
from typing import Optional, Union
from sqlalchemy import select, func, cast, Numeric, text
from sqlalchemy.orm import Session
from sqlalchemy import and_
from sqlalchemy.sql import not_
from src.maximo.model import WorkOrderData # Assuming this is where your model is
from src.database.core import CollectorDbSession
from src.database.core import CollectorDbSession, DbSession
from src.overhaul_scope.model import OverhaulScope
async def get_cm_cost_summary(collector_db: CollectorDbSession, last_oh_date:datetime, upcoming_oh_date:datetime):
query = text("""WITH part_costs AS (
@ -251,3 +252,81 @@ async def get_oh_cost_summary(collector_db: CollectorDbSession, last_oh_date:dat
return {
item["location_tag"]: item["avg_cost"] for item in data
}
from uuid import UUID
async def get_history_oh_wo(*, db_session: DbSession, collector_db_session: CollectorDbSession, oh_session_id: UUID, parent_wo_num: Optional[Union[str, list]] = None):
## Get Parent wo num from oh session table
if not parent_wo_num:
query = select(OverhaulScope.wo_parent).where(OverhaulScope.id == oh_session_id)
result = await db_session.execute(query)
parent_wo_num = result.scalar()
if not parent_wo_num:
return []
# Ensure parent_wo_num is a list and removed duplicates if any
if isinstance(parent_wo_num, str):
parent_wo_num = [parent_wo_num]
else:
parent_wo_num = list(set(parent_wo_num))
sql_query = text("""
WITH target_wos AS (
SELECT
w.wonum,
w.assetnum,
COALESCE(w.actmatcost, 0) as actmatcost,
COALESCE(w.actservcost, 0) as actservcost
FROM public.wo_maximo w
WHERE w.xx_parent = ANY(:parent_wo_num)
),
wo_tasks AS (
SELECT
t.xx_parent AS parent_wonum,
JSON_AGG(t.description) AS task_list
FROM public.wo_maximo t
JOIN target_wos tw ON t.xx_parent = tw.wonum
GROUP BY t.xx_parent
)
SELECT
w.assetnum,
e.name AS equipment_name,
e.location_tag,
JSON_OBJECT_AGG(w.wonum, COALESCE(wt.task_list, '[]'::json)) AS wonum_list,
COUNT(w.wonum) AS total_wo_count,
COALESCE(SUM(w.actmatcost), 0) AS total_material_cost,
COALESCE(SUM(w.actservcost), 0) AS total_service_cost,
COALESCE(SUM(w.actmatcost + w.actservcost), 0) AS total_actual_cost
FROM target_wos w
INNER JOIN public.ms_equipment_master e
ON w.assetnum = e.assetnum
LEFT JOIN wo_tasks wt
ON w.wonum = wt.parent_wonum
GROUP BY
w.assetnum,
e.name,
e.location_tag
ORDER BY total_actual_cost DESC;
""")
results = await collector_db_session.execute(sql_query, {"parent_wo_num": parent_wo_num})
return [
{
"assetnum": row.assetnum,
"equipment_name": row.equipment_name,
"location_tag": row.location_tag,
"wonum_list": row.wonum_list,
"total_wo_count": row.total_wo_count,
"total_material_cost": float(row.total_material_cost),
"total_service_cost": float(row.total_service_cost),
"total_actual_cost": float(row.total_actual_cost)
}
for row in results
]

@ -1,3 +1,5 @@
from sqlalchemy import null
from sqlalchemy import JSON
from sqlalchemy import Column, DateTime, Float, Integer, String, ForeignKey, UUID
from sqlalchemy.orm import relationship
@ -14,6 +16,7 @@ class OverhaulScope(Base, DefaultMixin):
status = Column(String, nullable=False, default="Upcoming")
maintenance_type_id = Column(
UUID(as_uuid=True), ForeignKey("oh_ms_maintenance_type.id"), nullable=False)
wo_parent = Column(JSON, nullable=True)
maintenance_type = relationship("MaintenanceType", lazy="selectin", backref="overhaul_scopes")
# activity_equipments = relationship("OverhaulActivity", lazy="selectin")

@ -1,3 +1,4 @@
from typing import List
from typing import Optional
from fastapi import APIRouter, HTTPException, status
@ -9,7 +10,7 @@ from src.models import StandardResponse
from .model import OverhaulScope
from .schema import ScopeCreate, ScopePagination, ScopeRead, ScopeUpdate
from .service import create, delete, get, get_all, update
from .service import create, delete, get, get_all, update,get_all_oh_with_history_service
router = APIRouter()
@ -25,6 +26,9 @@ async def get_scopes(common: CommonParameters, scope_name: Optional[str] = None)
message="Data retrieved successfully",
)
@router.get("/history", response_model=StandardResponse[List[ScopeRead]])
async def get_history(db_session: DbSession):
return StandardResponse(data=await get_all_oh_with_history_service(db_session=db_session), message="Data retrieved successfully")
@router.get("/{overhaul_session_id}", response_model=StandardResponse[ScopeRead])
async def get_scope(db_session: DbSession, overhaul_session_id: str):
@ -79,3 +83,5 @@ async def delete_scope(db_session: DbSession, scope_id: str):
await delete(db_session=db_session, scope_id=scope_id)
return StandardResponse(message="Data deleted successfully", data=scope)

@ -220,53 +220,7 @@ async def get_overview_overhaul(*, db_session: DbSession):
},
}
# if ongoing_result:
# ongoing_overhaul, equipment_count = ongoing_result # Unpack the result tuple
# return {
# "status": "Ongoing",
# "overhaul": {
# "id": ongoing_overhaul.id,
# "type": ongoing_overhaul.maintenance_type.name,
# "start_date": ongoing_overhaul.start_date,
# "end_date": ongoing_overhaul.end_date,
# "duration_oh": ongoing_overhaul.duration_oh,
# "crew_number": ongoing_overhaul.crew_number,
# "remaining_days": (ongoing_overhaul.end_date - current_date).days,
# "equipment_count": equipment_count,
# },
# }
# # For upcoming overhaul with count
# upcoming_query = (
# Select(OverhaulScope, func.count(OverhaulActivity.id).label("equipment_count"))
# .outerjoin(OverhaulScope.activity_equipments)
# .where(
# OverhaulScope.start_date > current_date,
# )
# .group_by(OverhaulScope.id)
# .order_by(OverhaulScope.start_date)
# )
# upcoming_result = await db_session.execute(upcoming_query)
# upcoming_result = upcoming_result.first()
# if upcoming_result:
# upcoming_overhaul, equipment_count = upcoming_result # Unpack the result tuple
# days_until = (upcoming_overhaul.start_date - current_date).days
# return {
# "status": "Upcoming",
# "overhaul": {
# "id": upcoming_overhaul.id,
# "type": upcoming_overhaul.type,
# "start_date": upcoming_overhaul.start_date,
# "end_date": upcoming_overhaul.end_date,
# "duration_oh": upcoming_overhaul.duration_oh,
# "crew_number": upcoming_overhaul.crew_number,
# "remaining_days": days_until,
# "equipment_count": equipment_count,
# },
# }
# return {"status": "no_overhaul", "overhaul": None}
async def get_all_oh_with_history_service(*, db_session: DbSession):
query = Select(OverhaulScope).options(selectinload(OverhaulScope.maintenance_type)).where(OverhaulScope.wo_parent.isnot(None))
results = await db_session.execute(query)
return results.scalars().all()

@ -4,15 +4,15 @@ from fastapi import APIRouter, HTTPException, status
from fastapi.params import Query
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.core import DbSession, CollectorDbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.models import StandardResponse
from .schema import (MasterEquipmentPagination, ScopeEquipmentCreate,
ScopeEquipmentPagination, ScopeEquipmentRead,
ScopeEquipmentUpdate)
from .service import (create, delete, get_all, get_all_master_equipment, update)
from .service import (create, delete, get_all, get_all_master_equipment, update, get_history_standard_scope_wo_service)
from uuid import UUID
router = APIRouter()
@ -47,6 +47,13 @@ async def create_scope_equipment(
return StandardResponse(data=scope, message="Data created successfully")
@router.get("/history/{oh_session_id}", response_model=StandardResponse[List[dict]])
async def get_history_standard_scope_wo(
db_session: DbSession, collector_db_session:CollectorDbSession, oh_session_id:UUID):
results = await get_history_standard_scope_wo_service(db_session=db_session, collector_db_session=collector_db_session, oh_session_id=oh_session_id)
return StandardResponse(data=results, message="Data retrieved successfully")
# @router.put("/{assetnum}", response_model=StandardResponse[ScopeEquipmentRead])
# async def update_scope_equipment(
# db_session: DbSession, assetnum: str, scope__equipment_in: ScopeEquipmentUpdate

@ -7,7 +7,7 @@ from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import selectinload
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.core import DbSession, CollectorDbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.overhaul_scope.model import OverhaulScope
from src.standard_scope.enum import ScopeEquipmentType
@ -18,6 +18,7 @@ from src.workscope_group.model import MasterActivity
from src.workscope_group_maintenance_type.model import WorkscopeOHType
from src.overhaul_scope.model import MaintenanceType
from src.overhaul_scope.service import get as get_overhaul
from src.maximo.service import get_history_oh_wo
from .model import MasterEquipment, MasterEquipmentTree, StandardScope
from .schema import ScopeEquipmentCreate, ScopeEquipmentUpdate
from uuid import UUID
@ -220,3 +221,25 @@ async def get_equipment_level_by_no(*, db_session: DbSession, level: int):
result = await db_session.execute(query)
return result.scalars().all()
async def get_history_standard_scope_wo_service(*, db_session: DbSession, collector_db_session:CollectorDbSession, oh_session_id:UUID):
planning_oh_data = await get_by_oh_session_id(db_session=db_session, oh_session_id=oh_session_id)
planning_scopes = planning_oh_data[0]
overhaul = planning_oh_data[1]
results = await get_history_oh_wo(
db_session=db_session,
collector_db_session=collector_db_session,
oh_session_id=oh_session_id,
parent_wo_num=overhaul.wo_parent
)
scope_cost_map = {scope.location_tag: scope.service_cost for scope in planning_scopes}
for result in results:
result["planning_service_cost"] = scope_cost_map.get(result["location_tag"], 0)
return results
Loading…
Cancel
Save