dummy overhaul history

feature/reliability_stat
Cizz22 1 year ago
parent 06f881cef3
commit d6f0a4816b

@ -31,6 +31,7 @@ def get_env_tags(tag_list: List[str]) -> dict:
return tags
def get_config():
try:
# Try to load from .env file first
@ -41,6 +42,7 @@ def get_config():
return config
config = get_config()
@ -69,6 +71,8 @@ SQLALCHEMY_DATABASE_URI = f"postgresql+asyncpg://{_DATABASE_CREDENTIAL_USER}:{_Q
TIMEZONE = "Asia/Jakarta"
MAXIMO_BASE_URL = config("MAXIMO_BASE_URL", default="http://example.com")
MAXIMO_API_KEY = config("MAXIMO_API_KEY", default="keys")
AUTH_SERVICE_API = config(
"AUTH_SERVICE_API", default="http://192.168.1.82:8000/auth")

@ -1,11 +1,11 @@
from datetime import datetime
from datetime import datetime, timedelta
from typing import Any, Dict
from fastapi import HTTPException
import httpx
from starlette.config import Config
from src.config import config
from src.config import MAXIMO_API_KEY, MAXIMO_BASE_URL
class MaximoDataMapper:
@ -69,18 +69,56 @@ class MaximoDataMapper:
cost = self.data.get('totalCost', 0)
return float(cost)
def get_scope_name(self) -> str:
scope_name = self.data.get('location', "A")
return scope_name
class MaximoService:
def __init__(self):
# TODO: Update these settings based on actual MAXIMO API configuration
self.base_url = config.get("MAXIMO_BASE_URL")
self.api_key = config.get("MAXIMO_API_KEY")
self.base_url = MAXIMO_BASE_URL
self.api_key = MAXIMO_API_KEY
async def get_recent_overhaul(self) -> dict:
"""
Fetch most recent overhaul from MAXIMO.
TODO: Update this method based on actual MAXIMO API endpoints and parameters
"""
current_date = datetime.now()
schedule_start = current_date + \
timedelta(days=30) # Starting in 30 days
schedule_end = schedule_start + \
timedelta(days=90) # 90 day overhaul period
return {
"scheduleStart": schedule_start.isoformat(),
"scheduleEnd": schedule_end.isoformat(),
"workOrderId": "WO-2024-12345",
"status": "PLAN", # Common Maximo statuses: SCHEDULED, INPRG, COMP, CLOSE
"totalCost": 10000000.00,
"description": "Annual Turbine Overhaul",
"priority": 1,
"location": "A",
"assetDetails": [
{
"assetnum": "ASSET001",
"description": "Gas Turbine",
"status": "OPERATING"
},
{
"assetnum": "ASSET002",
"description": "Steam Turbine",
"status": "OPERATING"
}
],
"workType": "OH", # OH for Overhaul
"createdBy": "MAXADMIN",
"createdDate": (current_date - timedelta(days=10)).isoformat(),
"lastModifiedBy": "MAXADMIN",
"lastModifiedDate": current_date.isoformat()
}
async with httpx.AsyncClient() as client:
try:
# TODO: Update endpoint and parameters based on actual MAXIMO API

@ -38,13 +38,14 @@ async def get_history(db_session: DbSession, overhaul_history_id: str):
@router.post("", response_model=StandardResponse[OverhaulHistoryRead])
async def create_history(db_session: DbSession, scope_in: OverhaulHistoryRead):
async def create_history(db_session: DbSession, scope_in: OverhaulHistoryCreate):
try:
maximo_service = MaximoService()
maximo_data = await maximo_service.get_recent_overhaul()
overhaul = await start_overhaul(db_session=db_session, maximo_data=maximo_data)
except HTTPException as he:
raise he

@ -22,7 +22,7 @@ class OverhaulHistoryUpdate(OverhaulHistoryBase):
class OverhaulHistoryRead(OverhaulHistoryBase):
id: UUID
scope: ScopeRead
scope_id: UUID
schedule_start_date: datetime
schedule_end_date: Optional[datetime]
total_cost: Optional[float] = Field(0)

@ -34,21 +34,23 @@ async def start_overhaul(*, db_session: DbSession, maximo_data: dict):
maximo_id = mapper.get_maximo_id()
# Check for existing overhaul
existing_overhaul = db_session.query(OverhaulHistory).filter(
existing_overhaul = Select(OverhaulHistory).filter(
and_(
OverhaulHistory.maximo_id == maximo_id,
OverhaulHistory.status == OverhaulStatus.IN_PROGRESS
)
).first()
)
res = await db_session.execute(existing_overhaul)
if existing_overhaul:
if res.first():
raise HTTPException(
status_code=409,
detail=f"Overhaul with MAXIMO ID {maximo_id} already started"
)
status, status_reason = await determine_overhaul_status(maximo_data)
scope = await get_by_scope_name(db_session=db_session, scope_name="A")
scope = await get_by_scope_name(db_session=db_session, scope_name=mapper.get_scope_name())
overhaul = OverhaulHistory(
scope_id=scope.id,

Loading…
Cancel
Save