"""Service functions for Aeros equipment.

Reads and updates equipment distribution parameters through the AEROS HTTP
API, the reliability service and the application database sessions.
"""

import asyncio
import json
import logging
import re
from collections import defaultdict
from typing import List, Optional, Union
from uuid import UUID

import httpx
import pandas as pd
import requests
from fastapi import HTTPException, status
from sqlalchemy import Delete, Select, func, desc, and_, text
from sqlalchemy.orm import selectinload

from src.auth.service import CurrentUser
from src.config import AEROS_BASE_URL, DEFAULT_PROJECT_NAME, RELIABILITY_SERVICE_API
from src.database.core import CollectorDbSession, DbSession
from src.database.service import search_filter_sort_paginate
from .model import (
    AerosEquipment,
    AerosEquipmentDetail,
    MasterEquipment,
    AerosEquipmentGroup,
    ReliabilityPredictNonRepairable,
)
from .schema import EquipmentConfiguration
from src.aeros_project.model import AerosProject
from src.aeros_simulation.model import AerosNode
from src.utils import save_to_pastebin
from src.aeros_simulation.service import get_aeros_schematic_by_name

client = httpx.AsyncClient(timeout=300.0)

log = logging.getLogger()

# AEROS endpoints used by this module (paths are appended to AEROS_BASE_URL).
_NODE_DISTRIBUTIONS_ENDPOINT = "/api/UpdateDisParams/GetUpdatedNodeDistributions"
_UPDATE_DISTRIBUTIONS_ENDPOINT = "/api/UpdateDisParams/UpdateEquipmentDistributions"

# Reliability-service distribution name -> (AEROS distribution type,
# key of first parameter, key of second parameter).
_DISTRIBUTION_MAP = {
    "Weibull-2P": ("Weibull2", "beta", "alpha"),
    "Weibull-3P": ("Weibull3", "beta", "alpha"),
    "Exponential-2P": ("Exponential2", "Lambda", "gamma"),
    "NHPP": ("NHPPTTFF", "beta", "eta"),
    "Lognormal": ("Lognormal", "mu", "sigma"),
}


async def _post_aeros_json(endpoint: str, payload: dict):
    """POST ``payload`` as JSON to an AEROS endpoint and return the decoded body.

    Raises whatever ``httpx`` raises on transport errors or on a non-2xx
    status (via ``raise_for_status``); callers translate that into an
    ``HTTPException``.
    """
    response = await client.post(
        f"{AEROS_BASE_URL}{endpoint}",
        json=payload,
        headers={"Content-Type": "application/json"},
    )
    response.raise_for_status()
    return response.json()


def _load_json_file(filename: str):
    """Load a JSON file and return its content.

    Raises:
        FileNotFoundError: if ``filename`` does not exist.
        ValueError: if the file does not contain valid JSON.
    """
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        raise FileNotFoundError(f"JSON file not found: {filename}") from None
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in file {filename}: {e}") from e


async def get_project(*, db_session: DbSession):
    """Return the most recently updated ``AerosProject`` row, or ``None``."""
    stmt = Select(AerosProject).order_by(desc(AerosProject.updated_at)).limit(1)
    result = await db_session.execute(stmt)
    return result.scalar_one_or_none()


async def get_all(*, common):
    """Return the paginated equipment list joined with its AEROS node data.

    ``common`` is forwarded to ``search_filter_sort_paginate`` and must also
    carry the ``db_session`` used to look up the current project.

    Returns:
        The pagination result, with ``items`` replaced by a list of
        ``{"AerosData": ..., "MasterData": ...}`` dicts (one per node
        returned by AEROS).

    Raises:
        HTTPException: 404 when no project exists, 500 when the AEROS call
            fails.
    """
    query = Select(AerosEquipment).options(
        selectinload(AerosEquipment.master_equipment)
    )
    results = await search_filter_sort_paginate(model=query, **common)

    reg_nodes = [node.node_name for node in results["items"]]
    equipment_data = {node.node_name: node for node in results["items"]}

    project = await get_project(db_session=common["db_session"])
    if project is None:
        # Without this guard the attribute access below fails with an
        # opaque AttributeError.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="AerosProject not found"
        )

    payload = {"projectName": project.project_name, "equipmentNames": reg_nodes}
    try:
        res = await _post_aeros_json(_NODE_DISTRIBUTIONS_ENDPOINT, payload)
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e

    results["items"] = [
        {"AerosData": data, "MasterData": equipment_data.get(data["equipmentName"])}
        for data in res
    ]
    return results


async def get_equipment_by_location_tag(*, db_session: DbSession, location_tag: str):
    """Return the first ``AerosEquipment`` with ``location_tag``, or ``None``.

    The equipment's ``aeros_equipment_details`` relationship is eagerly
    loaded.
    """
    query = (
        Select(AerosEquipment)
        .where(AerosEquipment.location_tag == location_tag)
        .options(selectinload(AerosEquipment.aeros_equipment_details))
    )
    # The query used to be built and then discarded; execute it and hand the
    # row back, mirroring get_by_id.
    result = await db_session.execute(query)
    return result.scalar()


async def get_by_id(*, db_session: DbSession, id: UUID):
    """Return ``(equipment, aeros_data)`` for the equipment with primary key ``id``.

    Raises:
        HTTPException: 404 when the equipment does not exist, 500 when the
            AEROS call fails.
    """
    query = (
        Select(AerosEquipment)
        .where(AerosEquipment.id == id)
        .options(selectinload(AerosEquipment.aeros_equipment_details))
    )
    result = await db_session.execute(query)
    equipment = result.scalar()

    if not equipment:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="AerosEquipment not found"
        )

    payload = {
        # NOTE(review): the project name is hard-coded here while every other
        # caller passes the real project name - confirm this is intended.
        "projectName": "ParallelNode",
        # Same key as the other callers of this endpoint ("equipmentNames").
        "equipmentNames": [equipment.node_name],
    }
    try:
        aeros_data = await _post_aeros_json(_NODE_DISTRIBUTIONS_ENDPOINT, payload)
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e

    return equipment, aeros_data


async def get_aeros_equipment_by_location_tag(
    *, location_tag: Union[str, List[str]], project_name
):
    """Fetch AEROS node distributions for one or several location tags.

    Args:
        location_tag: a single tag or a list of tags.
        project_name: AEROS project to query.

    Returns:
        A list of de-duplicated node records (dicts).

    Raises:
        HTTPException: 404 when AEROS returns no data, 500 when the request
            or the de-duplication fails.
    """
    if isinstance(location_tag, str):
        location_tag = [location_tag]

    payload = {"projectName": project_name, "equipmentNames": location_tag}
    try:
        aeros_data = await _post_aeros_json(_NODE_DISTRIBUTIONS_ENDPOINT, payload)
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e

    # Raised outside the try blocks so the 404 is not converted into a 500.
    if not aeros_data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="AerosEquipment not found"
        )

    try:
        return pd.DataFrame(aeros_data).drop_duplicates().to_dict('records')
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e


async def update_node(
    *, db_session: DbSession, equipment_nodes: List[dict], project_name: str
):
    """Push ``equipment_nodes`` to AEROS as the new distributions of ``project_name``.

    ``db_session`` is accepted for interface compatibility and is not used.

    Returns:
        The decoded JSON body of the AEROS response.

    Raises:
        HTTPException: 500 when the request fails or AEROS answers with an
            error status.
    """
    payload = {"projectName": project_name, "regNodeInputs": equipment_nodes}
    try:
        return await _post_aeros_json(_UPDATE_DISTRIBUTIONS_ENDPOINT, payload)
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e


async def save_default_equipment(*, db_session: DbSession, project_name: str):
    """Rebuild the ``AerosEquipment`` table from the nodes AEROS knows about.

    Location tags of all ``MasterEquipment`` rows are sent to AEROS; every
    node returned replaces the previous table content.

    Returns:
        The list of node records returned by AEROS.

    Raises:
        HTTPException: 500 when the AEROS call or the database update fails.
    """
    stmt = Select(MasterEquipment).where(MasterEquipment.location_tag.isnot(None))
    master_rows = await db_session.execute(stmt)
    reg_nodes = [node.location_tag for node in master_rows.scalars().all()]

    payload = {"projectName": project_name, "equipmentNames": reg_nodes}

    # Fetch first: if AEROS is unreachable the existing rows stay untouched.
    try:
        results = await _post_aeros_json(_NODE_DISTRIBUTIONS_ENDPOINT, payload)
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e

    try:
        # Delete old data and insert the new rows in one transaction.
        await db_session.execute(Delete(AerosEquipment))
        db_session.add_all(
            [
                AerosEquipment(
                    node_name=equipment["equipmentName"],
                    location_tag=equipment["equipmentName"],
                )
                for equipment in results
            ]
        )
        await db_session.commit()
    except Exception as e:
        await db_session.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e

    return results


def get_asset_batch(
    location_tags: List[str],
    nr_location_tags: List[str],
    base_url: str = RELIABILITY_SERVICE_API,
    timeout: int = 30,
) -> dict:
    """Get asset batch data using a GET request with a JSON body.

    Args:
        location_tags: List of location tag strings.
        nr_location_tags: Currently unused; kept for interface compatibility.
        base_url: Base URL of the reliability service.
        timeout: Request timeout in seconds.

    Returns:
        A mapping ``location_tag -> {"cmDisType", "cmDisP1", "cmDisP2",
        "relDisType", "relDisP1", "relDisP2", "parameters"}``. Items without
        a location tag are skipped.

    Raises:
        HTTPException: 500 when the request fails or an item cannot be
            processed.
    """
    url = f"{base_url}/asset/batch"
    results = defaultdict(dict)
    payload = {"location_tags": location_tags}
    headers = {'Content-Type': 'application/json'}

    try:
        response = requests.get(url, json=payload, headers=headers, timeout=timeout)
        response.raise_for_status()  # Raises an HTTPError for bad responses
        data = response.json()

        for item in data.get("data", []):
            location_tag = item.get("location_tag", None)
            if not location_tag:
                log.warning("Error processing item %s", item)
                continue

            try:
                distribution, rel_p1, rel_p2 = get_distribution(item)
            except Exception as e:
                raise Exception(
                    f"Error processing item {location_tag}: {e}"
                ) from e

            entry = results[location_tag]
            # Corrective-maintenance distribution is a fixed Normal(6, 3).
            entry["cmDisType"] = "Normal"
            entry["cmDisP1"] = 6
            entry["cmDisP2"] = 3
            entry["relDisType"] = distribution
            entry["relDisP1"] = rel_p1
            entry["relDisP2"] = rel_p2
            entry["parameters"] = item.get("parameters", {})

        return results
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to fetch asset batch data " + str(e),
        ) from e


def get_distribution(item):
    """Map a reliability-service item to ``(aeros_type, param1, param2)``.

    ``item["distribution"]`` selects the mapping; parameter values are read
    from ``item["parameters"]`` and default to 0 when absent. Unknown
    distribution names map to ``("NHPPTTFF", 1, 100000)``.

    Raises:
        KeyError: if ``item`` has no ``"distribution"`` key.
    """
    name = item["distribution"]
    params = item.get("parameters") or {}

    if name == "Normal":
        # NOTE(review): the second parameter is a constant 1000, not the
        # item's sigma. Preserved from the original - confirm it is intended.
        return "Normal", params.get("mu", 0), 1000

    mapping = _DISTRIBUTION_MAP.get(name)
    if mapping is None:
        return "NHPPTTFF", 1, 100000

    aeros_type, first_key, second_key = mapping
    return aeros_type, params.get(first_key, 0), params.get(second_key, 0)


async def update_oh_interval_offset(
    *, aeros_db_session, overhaul_offset, overhaul_interval, project_name
):
    """Set ``OHInterval``/``OHOffset`` on every regular node of ``project_name``.

    Runs a single parameterized UPDATE on the AEROS database and commits it.
    """
    query = text("""
        UPDATE public."RegularNodes" rn
        SET "OHInterval" = :new_oh_interval,
            "OHOffset" = :new_oh_offset
        FROM public."Schematics" s
        JOIN public."Projects" p ON s."ProjectId" = p."ProjectId"
        WHERE rn."SchematicId" = s."SchematicId"
          AND p."ProjectName" = :project_name
    """)

    await aeros_db_session.execute(
        query,
        {
            "new_oh_interval": overhaul_interval,
            "new_oh_offset": overhaul_offset,
            "project_name": project_name,
        },
    )
    await aeros_db_session.commit()


async def update_equipment_for_simulation(
    *,
    db_session: DbSession,
    aeros_db_session: CollectorDbSession,
    project_name: str,
    simulation_duration: int,
    overhaul_duration,
    overhaul_interval,
    offset,
    schematic_name: str,
    custom_input: Optional[dict] = None,
):
    """Prepare every node of a schematic for a simulation run.

    Collects the regular nodes of ``schematic_name`` plus all equipment
    groups, fetches their current AEROS records and reliability data, applies
    the overhaul interval/offset, overrides the distribution parameters (from
    ``custom_input`` when an entry exists for the node, otherwise from the
    reliability service) and pushes the result to AEROS.

    Args:
        db_session: application database session.
        aeros_db_session: AEROS database session used for the overhaul update.
        project_name: AEROS project to update.
        simulation_duration: currently unused.
        overhaul_duration: value written to ``ohDisP1`` of every node.
        overhaul_interval: new ``OHInterval`` for the project's nodes.
        offset: new ``OHOffset`` for the project's nodes.
        schematic_name: schematic whose regular nodes are updated.
        custom_input: optional ``{equipment_name: {"mttr", "failure_rate"}}``
            overrides. Entries with a falsy ``mttr`` or ``failure_rate``
            cause the node to be skipped entirely.

    Returns:
        ``{equipment_name: {"mttr", "distribution", "beta", "eta",
        "parameters", "oh_duration"}}`` for every node that was updated.
    """
    log.info("Updating equipment for simulation")

    aeros_schematic = await get_aeros_schematic_by_name(
        db_session=db_session, schematic_name=schematic_name
    )

    equipments = (
        Select(AerosEquipment)
        .where(AerosEquipment.location_tag.isnot(None))
        .join(AerosEquipment.aeros_node)
        .filter(
            and_(
                AerosNode.aeros_schematic_id == aeros_schematic.id,
                AerosNode.node_type == "RegularNode",
            )
        )
    )
    rbd_group = Select(AerosEquipmentGroup)

    log.info("Getting equipment nodes")
    equipment_nodes = await db_session.execute(equipments)
    rbd_group_nodes = await db_session.execute(rbd_group)

    group_nodes = [group.group_name for group in rbd_group_nodes.scalars().all()]
    reg_nodes = [node.location_tag for node in equipment_nodes.scalars().all()]
    reg_nodes.extend(group_nodes)

    nodes_data = await get_aeros_equipment_by_location_tag(
        location_tag=reg_nodes, project_name=project_name
    )

    non_repairable = await db_session.execute(Select(ReliabilityPredictNonRepairable))
    non_repairable_location_tags = [
        tag.location_tag for tag in non_repairable.scalars().all()
    ]

    log.info("Getting reliability data")
    # get_asset_batch uses the blocking `requests` library; run it in the
    # default executor so the event loop is not stalled for up to `timeout`.
    reliability_data = await asyncio.get_running_loop().run_in_executor(
        None,
        get_asset_batch,
        reg_nodes,
        non_repairable_location_tags,
        RELIABILITY_SERVICE_API,
    )

    req_node_inputs = []
    results = {}

    # TODO(review): the original comment asked to use the reliability
    # parameters for equipment with "TRIP" in its work orders and
    # MTBF = duration otherwise. Only the first half was ever implemented (as
    # a branch that re-assigned identical values), so the list is loaded -
    # keeping the fail-fast on a missing file - but not consulted yet.
    trip_eq = _load_json_file("src/aeros_equipment/trip_eq.json")  # noqa: F841

    log.info("Updating Overhaul Offset & Overhaul Interval")
    await update_oh_interval_offset(
        aeros_db_session=aeros_db_session,
        project_name=project_name,
        overhaul_interval=overhaul_interval,
        overhaul_offset=offset,
    )

    for eq in nodes_data:
        try:
            eq_name = eq["equipmentName"]

            # Skip names that were already processed.
            if eq_name in results:
                continue

            reliability = reliability_data.get(eq_name, {})

            if custom_input and eq_name in custom_input:
                custom_param = custom_input[eq_name]

                if not custom_param["mttr"]:
                    continue
                if not custom_param['failure_rate']:
                    continue

                eq["cmDisP1"] = custom_param["mttr"]
                eq["relDisType"] = "Fixed"
                eq["relDisP1"] = float(custom_param["failure_rate"])
                eq["relDisP2"] = 0
                eq["ohDisP1"] = overhaul_duration
                eq["ohDisUnitCode"] = "UHour"

                req_node_inputs.append(eq)
                results[eq_name] = {
                    "mttr": eq["cmDisP1"],
                    "distribution": eq["relDisType"],
                    "beta": eq["relDisP1"],
                    "eta": 0,
                    "parameters": {},
                    "oh_duration": overhaul_duration,
                }
                continue

            eq["cmDisP1"] = reliability.get("cmDisP1", 0)
            eq["relDisType"] = reliability.get("relDisType", "Fixed")
            eq["relDisP1"] = reliability.get("relDisP1", 0)
            eq["relDisP2"] = reliability.get("relDisP2", 0)
            eq["ohDisP1"] = overhaul_duration
            eq["ohDisUnitCode"] = "UHour"

            req_node_inputs.append(eq)
            results[eq_name] = {
                "mttr": eq["cmDisP1"],
                "distribution": eq["relDisType"],
                "beta": eq["relDisP1"],
                "eta": eq["relDisP2"],
                # NOTE(review): read from the AEROS record, not from the
                # reliability data - confirm which source is intended.
                "parameters": eq.get("parameters", {}),
                "oh_duration": overhaul_duration,
            }
        except Exception as e:
            # `.get` so that a record without a name cannot make the handler
            # itself raise.
            log.exception(
                "Error fetching data for %s: %s", eq.get("equipmentName"), e
            )
            # Still send the equipment, with whatever values it carries.
            req_node_inputs.append(eq)

    log.info("Updating equipment for simulation")
    await update_node(
        db_session=db_session,
        equipment_nodes=req_node_inputs,
        project_name=project_name,
    )
    log.info("Updated equipment for simulation")

    return results


# Optimized individual fetch functions
async def get_equipment_mttr(*, location_tag: str, client: httpx.AsyncClient) -> float:
    """Get the MTTR (hours) of a single equipment using the provided client.

    Returns 0 when the service does not answer with status 200, when the
    response carries no hours value, or when the request fails.
    """
    mttr_url = f"{RELIABILITY_SERVICE_API}/asset/mttr/{location_tag}"

    try:
        response = await client.get(mttr_url)
        if response.status_code != 200:
            log.warning(
                f"MTTR API returned status {response.status_code} for {location_tag}"
            )
            return 0

        mttr_data = response.json()
        return (mttr_data.get("data") or {}).get("hours", 0)
    except Exception as e:
        log.error(f"Error getting MTTR for {location_tag}: {str(e)}")
        return 0


async def get_equipment_reliability_parameter(
    *, location_tag: str, client: httpx.AsyncClient
):
    """Get the reliability parameters of a single equipment using the provided client.

    Returns:
        ``{"distribution": str, "beta": ..., "eta": ...}``; missing values
        fall back to ``""`` / ``0``, and the all-default dict is returned
        when the service does not answer with status 200 or the request
        fails.
    """
    fallback = {"distribution": "", "beta": 0, "eta": 0}
    reliability_url = f"{RELIABILITY_SERVICE_API}/reliability/{location_tag}/current"

    try:
        response = await client.get(reliability_url)
        if response.status_code != 200:
            log.warning(
                f"Reliability API returned status {response.status_code} for {location_tag}"
            )
            return fallback

        data = response.json().get("data") or {}
        # `or {}`: a missing/null "parameters" used to raise AttributeError
        # (the default was the int 0) and discard the distribution as well.
        parameters = data.get("parameters") or {}
        return {
            "distribution": data.get("distribution", ""),
            "beta": parameters.get("beta", 0),
            "eta": parameters.get("eta", 0),
        }
    except Exception as e:
        log.error(f"Error getting reliability parameters for {location_tag}: {str(e)}")
        return fallback