You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

155 lines
5.7 KiB
Python

from datetime import datetime, timedelta
from typing import Any, Dict
import httpx
from fastapi import HTTPException
from starlette.config import Config
from src.config import MAXIMO_API_KEY, MAXIMO_BASE_URL
class MaximoDataMapper:
    """
    Helper class to map MAXIMO API response to our data structure.
    Update these mappings according to actual MAXIMO API documentation.

    Every field name read here ("scheduleStart", "scheduleEnd",
    "workOrderId", "status", "totalCost", "location") is a placeholder
    until the real MAXIMO response structure is confirmed.
    """

    def __init__(self, maximo_data: Dict[Any, Any]):
        """Store the raw MAXIMO payload that the getters read from."""
        self.data = maximo_data

    def _parse_date(self, field: str, label: str) -> datetime:
        """
        Read ``field`` from the payload and parse it as an ISO 8601 datetime.

        ``label`` is the human-readable name used in the error message.
        Raises ValueError when the field is missing or empty.
        """
        raw = self.data.get(field)
        if not raw:
            raise ValueError(f"{label} not found in MAXIMO data")
        # datetime.fromisoformat() only accepts a trailing "Z" from Python
        # 3.11 on; normalise it to an explicit UTC offset so older
        # interpreters parse the same timestamps.
        if isinstance(raw, str) and raw.endswith("Z"):
            raw = raw[:-1] + "+00:00"
        return datetime.fromisoformat(raw)

    def get_start_date(self) -> datetime:
        """
        Extract start date from MAXIMO data.
        TODO: Update this based on actual MAXIMO API response structure
        Example: might be data['startDate'] or data['SCHEDSTART'] etc.

        Raises ValueError if the start date is missing or empty.
        """
        # This is a placeholder - update with actual MAXIMO field name
        return self._parse_date("scheduleStart", "Start date")

    def get_end_date(self) -> datetime:
        """
        Extract end date from MAXIMO data.
        TODO: Update this based on actual MAXIMO API response structure

        Raises ValueError if the end date is missing or empty.
        """
        # This is a placeholder - update with actual MAXIMO field name
        return self._parse_date("scheduleEnd", "End date")

    def get_maximo_id(self) -> str:
        """
        Extract MAXIMO ID from response.
        TODO: Update this based on actual MAXIMO API response structure

        Returns the ID converted to ``str``; raises ValueError if it is
        missing or falsy.
        """
        # This is a placeholder - update with actual MAXIMO field name
        maximo_id = self.data.get("workOrderId")
        if not maximo_id:
            raise ValueError("MAXIMO ID not found in response")
        return str(maximo_id)

    def get_status(self) -> str:
        """
        Extract status from MAXIMO data.
        TODO: Update this based on actual MAXIMO API response structure

        Returns the status upper-cased, or an empty string when the field
        is absent or explicitly null.
        """
        # This is a placeholder - update with actual MAXIMO status field and values
        status = self.data.get("status")
        if status is None:
            # A key present with a null value bypasses dict.get()'s default,
            # so None has to be handled explicitly before calling .upper().
            return ""
        return str(status).upper()

    def get_total_cost(self) -> float:
        """
        Extract total cost from MAXIMO data.
        TODO: Update this based on actual MAXIMO API response structure

        Returns 0.0 when the field is absent or explicitly null. A value
        that cannot be converted by ``float()`` still raises.
        """
        # This is a placeholder - update with actual MAXIMO field name
        cost = self.data.get("totalCost")
        if cost is None:
            return 0.0
        return float(cost)

    def get_scope_name(self) -> str:
        """
        Extract the scope name from the "location" field.

        Falls back to "A" when the key is absent from the payload.
        """
        scope_name = self.data.get("location", "A")
        return scope_name
class MaximoService:
    """
    Access point for overhaul data coming from MAXIMO.

    The real HTTP integration is not wired in yet: ``get_recent_overhaul``
    serves a hard-coded placeholder payload, while the draft API call lives
    in ``_fetch_recent_overhaul`` until the endpoint details are known.
    """

    def __init__(self):
        # TODO: Update these settings based on actual MAXIMO API configuration
        self.base_url = MAXIMO_BASE_URL
        self.api_key = MAXIMO_API_KEY

    async def get_recent_overhaul(self) -> dict:
        """
        Fetch most recent overhaul from MAXIMO.
        TODO: Update this method based on actual MAXIMO API endpoints and parameters

        Currently returns a placeholder work order built from the current
        time; no network request is made. Once the MAXIMO endpoint is
        confirmed, switch this to ``await self._fetch_recent_overhaul()``.
        """
        return self._build_placeholder_overhaul()

    @staticmethod
    def _build_placeholder_overhaul() -> dict:
        """Build the hard-coded sample work order used until the API is wired in."""
        current_date = datetime.now()
        schedule_start = current_date + timedelta(days=30)  # Starting in 30 days
        schedule_end = schedule_start + timedelta(days=90)  # 90 day overhaul period
        return {
            "scheduleStart": schedule_start.isoformat(),
            "scheduleEnd": schedule_end.isoformat(),
            "workOrderId": "WO-2024-12345",
            "status": "PLAN",  # Common Maximo statuses: SCHEDULED, INPRG, COMP, CLOSE
            "totalCost": 10000000.00,
            "description": "Annual Turbine Overhaul",
            "priority": 1,
            "location": "A",
            "assetDetails": [
                {
                    "assetnum": "ASSET001",
                    "description": "Gas Turbine",
                    "status": "OPERATING",
                },
                {
                    "assetnum": "ASSET002",
                    "description": "Steam Turbine",
                    "status": "OPERATING",
                },
            ],
            "workType": "OH",  # OH for Overhaul
            "createdBy": "MAXADMIN",
            "createdDate": (current_date - timedelta(days=10)).isoformat(),
            "lastModifiedBy": "MAXADMIN",
            "lastModifiedDate": current_date.isoformat(),
        }

    async def _fetch_recent_overhaul(self) -> dict:
        """
        Request the most recent overhaul from the MAXIMO HTTP API.

        Not called yet; kept separate so it is no longer unreachable code
        behind the placeholder return.

        Raises:
            HTTPException: 503 when the request cannot be sent, the upstream
                status code when MAXIMO answers with anything but 200, 502
                when the body is not valid JSON, and 404 when the body is
                empty.
        """
        async with httpx.AsyncClient() as client:
            try:
                # TODO: Update endpoint and parameters based on actual MAXIMO API
                response = await client.get(
                    f"{self.base_url}/your-endpoint-here",
                    headers={
                        "Authorization": f"Bearer {self.api_key}",
                        # Add any other required headers
                    },
                    params={
                        # Update these parameters based on actual MAXIMO API
                        "orderBy": "-scheduleEnd",  # Example parameter
                        "limit": 1,
                    },
                )
            except httpx.RequestError as e:
                raise HTTPException(
                    status_code=503, detail=f"Failed to connect to MAXIMO: {str(e)}"
                ) from e

            if response.status_code != 200:
                raise HTTPException(
                    status_code=response.status_code,
                    detail=f"MAXIMO API error: {response.text}",
                )

            try:
                data = response.json()
            except ValueError as e:
                # A 200 response with a non-JSON body is an upstream fault,
                # not a bug in our own handler.
                raise HTTPException(
                    status_code=502, detail="MAXIMO API returned invalid JSON"
                ) from e

            if not data:
                raise HTTPException(
                    status_code=404, detail="No recent overhaul found"
                )

            # TODO: Update this based on actual MAXIMO response structure
            return data[0] if isinstance(data, list) else data