From 367aeb3e431720a91e68dd441e2b5a811c1b2552 Mon Sep 17 00:00:00 2001 From: Cizz22 Date: Thu, 17 Jul 2025 14:31:13 +0700 Subject: [PATCH] fix rbd project name --- .env | 2 +- src/aeros_equipment/model.py | 9 +- src/aeros_equipment/schema.py | 22 ++--- src/aeros_equipment/service.py | 167 ++++++++++++++++++++++++++++++-- src/aeros_project/service.py | 11 ++- src/aeros_simulation/router.py | 10 +- src/aeros_simulation/service.py | 12 ++- 7 files changed, 208 insertions(+), 25 deletions(-) diff --git a/.env b/.env index b9af272..067acce 100644 --- a/.env +++ b/.env @@ -16,4 +16,4 @@ COLLECTOR_CREDENTIAL_PASSWORD=postgres COLLECTOR_NAME=digital_twin -WINDOWS_AEROS_BASE_URL=http://192.168.1.87:8800 \ No newline at end of file +WINDOWS_AEROS_BASE_URL=http://192.168.1.87:8800 diff --git a/src/aeros_equipment/model.py b/src/aeros_equipment/model.py index b363514..a56d4a4 100644 --- a/src/aeros_equipment/model.py +++ b/src/aeros_equipment/model.py @@ -66,11 +66,18 @@ class AerosEquipment(Base, DefaultMixin): master_equipment = relationship( "MasterEquipment", - lazy="joined", + lazy="raise", primaryjoin="and_(AerosEquipment.location_tag == foreign(MasterEquipment.location_tag))", uselist=False, # Add this if it's a one-to-one relationship ) + aeros_node = relationship( + "AerosNode", + lazy="joined", + primaryjoin="and_(AerosEquipment.node_name == foreign(AerosNode.node_name))", + uselist=False, + ) + class AerosEquipmentDetail(Base, DefaultMixin): diff --git a/src/aeros_equipment/schema.py b/src/aeros_equipment/schema.py index 0e3d6b3..baffd63 100644 --- a/src/aeros_equipment/schema.py +++ b/src/aeros_equipment/schema.py @@ -60,12 +60,12 @@ class FlowrateUnit(str, Enum): PER_MINUTE = "PerMinute" -class DistributionType(str, Enum): - LOGNORMAL = "Lognormal" - NORMAL = "Normal" - FIXED = "Fixed" - UNIFORM = "Uniform" - EXPONENTIAL = "Exponential" +# class DistributionType(str, Enum): +# LOGNORMAL = "Lognormal" +# NORMAL = "Normal" +# FIXED = "Fixed" +# UNIFORM = "Uniform" +# EXPONENTIAL = "Exponential" class UnitCode(str, Enum): @@ -93,7 +93,7 @@ class EquipmentConfiguration(EquipmentBase): ) # Reliability Distribution Parameters - rel_dis_type: DistributionType = Field( + rel_dis_type: str = Field( ..., alias="relDisType", description="Reliability distribution type" ) rel_dis_p1: float = Field( @@ -110,7 +110,7 @@ class EquipmentConfiguration(EquipmentBase): ) # Corrective Maintenance Distribution Parameters - cm_dis_type: DistributionType = Field( + cm_dis_type: str = Field( ..., alias="cmDisType", description="Corrective maintenance distribution type" ) cm_dis_p1: float = Field( @@ -135,7 +135,7 @@ class EquipmentConfiguration(EquipmentBase): ) # Inspection Distribution Parameters - ip_dis_type: DistributionType = Field( + ip_dis_type: str = Field( ..., alias="ipDisType", description="Inspection distribution type" ) ip_dis_p1: float = Field( @@ -152,7 +152,7 @@ class EquipmentConfiguration(EquipmentBase): ) # Preventive Maintenance Distribution Parameters - pm_dis_type: DistributionType = Field( + pm_dis_type: str = Field( ..., alias="pmDisType", description="Preventive maintenance distribution type" ) pm_dis_p1: float = Field( @@ -177,7 +177,7 @@ class EquipmentConfiguration(EquipmentBase): ) # Overhaul Distribution Parameters - oh_dis_type: DistributionType = Field( + oh_dis_type: str = Field( ..., alias="ohDisType", description="Overhaul distribution type" ) oh_dis_p1: float = Field( diff --git a/src/aeros_equipment/service.py b/src/aeros_equipment/service.py index 3b2f1d3..e6ca8c4 100644 --- a/src/aeros_equipment/service.py +++ b/src/aeros_equipment/service.py @@ -1,21 +1,30 @@ -from typing import List, Optional +from typing import defaultdict, List, Optional, Union from uuid import UUID - +import logging import httpx from fastapi import HTTPException, status -from sqlalchemy import Delete, Select, func +from sqlalchemy import Delete, Select, func, desc from sqlalchemy.orm import selectinload from src.auth.service import CurrentUser from src.config import AEROS_BASE_URL, DEFAULT_PROJECT_NAME from src.database.core import DbSession from src.database.service import search_filter_sort_paginate - from .model import AerosEquipment, AerosEquipmentDetail, MasterEquipment from .schema import EquipmentConfiguration +from src.aeros_project.model import AerosProject +from src.aeros_simulation.model import AerosNode +import asyncio client = httpx.AsyncClient(timeout=300.0) +log = logging.getLogger() + +async def get_project(*, db_session: DbSession): + stmt = Select(AerosProject).order_by(desc(AerosProject.updated_at)).limit(1) + result = await db_session.execute(stmt) + found_record = result.scalar_one_or_none() + return found_record async def get_all(*, common): """Returns all documents.""" @@ -28,7 +37,9 @@ async def get_all(*, common): node.node_name: node for node in results["items"] } - updateNodeReq = {"projectName": DEFAULT_PROJECT_NAME, "equipmentNames": reg_nodes} + project = await get_project(db_session=common["db_session"]) + + updateNodeReq = {"projectName": project.project_name , "equipmentNames": reg_nodes} try: response = await client.post( @@ -95,11 +106,43 @@ async def get_by_id(*, db_session: DbSession, id: UUID): return aerosEquipmentResult, aerosEquipmentData +async def get_aeros_equipment_by_location_tag(*, location_tag: Union[str, List[str]], project_name): + if isinstance(location_tag, str): + location_tag = [location_tag] + else: + location_tag = location_tag + + aerosNodeReq = { + "projectName": project_name, + "equipmentNames": location_tag, + } + + try: + response = await client.post( + f"{AEROS_BASE_URL}/api/UpdateDisParams/GetUpdatedNodeDistributions", + json=aerosNodeReq, + headers={"Content-Type": "application/json"}, + ) + response.raise_for_status() + aerosEquipmentData = response.json() + + if not aerosEquipmentData: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="AerosEquipment not found" + ) + + return aerosEquipmentData + + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) + ) + async def update_node( - *, db_session: DbSession, equipment_nodes: List[EquipmentConfiguration] + *, db_session: DbSession, equipment_nodes: List[dict], project_name: str ): - updateNodeReq = {"projectName": "ParallelNode", "regNodeInputs": equipment_nodes} + updateNodeReq = {"projectName": project_name, "regNodeInputs": equipment_nodes} try: response = await client.post( @@ -145,6 +188,8 @@ async def save_default_equipment(*, db_session: DbSession, project_name: str): nodes = [] + # raise Exception(results) + # save to db for equipment in results: node = AerosEquipment( @@ -162,3 +207,111 @@ async def save_default_equipment(*, db_session: DbSession, project_name: str): raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) + + +async def fetch_equipment_data_concurrent(equipment_item, client: httpx.AsyncClient): + """ + Fetch both MTTR and reliability data for a single equipment concurrently + """ + location_tag = equipment_item["equipmentName"] + + # Run both API calls concurrently for this equipment + mttr_task = get_equipment_mttr(location_tag=location_tag, client=client) + reliability_task = get_equipment_reliability_parameter(location_tag=location_tag, client=client) + + # Wait for both to complete + mttr, reliability_data = await asyncio.gather(mttr_task, reliability_task, return_exceptions=True) + + + # Handle exceptions + if isinstance(mttr, Exception): + log.error(f"MTTR fetch failed for {location_tag}: {mttr}") + mttr = 0 + + if isinstance(reliability_data, Exception): + log.error(f"Reliability fetch failed for {location_tag}: {reliability_data}") + reliability_data = {"distribution": "", "beta": 0, "eta": 0} + + # Update the equipment item + equipment_item["cmDisP1"] = mttr + equipment_item["relDisType"] = "NHPPTTFF" + equipment_item["relDisP1"] = reliability_data["beta"] + equipment_item["relDisP2"] = reliability_data["eta"] + + return equipment_item + + +async def update_equipment_for_simulation(*, db_session: DbSession, project_name: str, schematic_name: str): + log.info("Updating equipment for simulation") + + equipments = Select(AerosEquipment).where( + AerosEquipment.location_tag.isnot(None) + ).join(AerosEquipment.aeros_node).filter(AerosNode.schematic_name == schematic_name) + + equipment_nodes = await db_session.execute(equipments) + reg_nodes = [node.location_tag for node in equipment_nodes.scalars().all()] + + + nodes_data = await get_aeros_equipment_by_location_tag( + location_tag=reg_nodes, project_name=project_name + ) + + reqNodeInputs = [] + + async with httpx.AsyncClient(timeout=300.0) as client: + for eq in nodes_data: + try: + updated_eq = await fetch_equipment_data_concurrent(eq, client) + reqNodeInputs.append(updated_eq) + except Exception as e: + print(f"Error fetching data for {eq['equipmentName']}: {e}") + # Add equipment with default values + eq["cmDisP1"] = 0 + eq["relDisType"] = "" + eq["relDisP1"] = 0 + eq["relDisP2"] = 0 + reqNodeInputs.append(eq) + + await update_node(db_session=db_session, equipment_nodes=reqNodeInputs, project_name=project_name) + log.info("Updated equipment for simulation") + + +# Optimized individual fetch functions +async def get_equipment_mttr(*, location_tag: str, client: httpx.AsyncClient) -> float: + """ + Get MTTR for a single equipment using provided client + """ + mttr_url = f"http://192.168.1.82:8000/reliability/asset/mttr/{location_tag}" + try: + response = await client.get(mttr_url) + if response.status_code == 200: + mttr_data = response.json() + return mttr_data.get("data", {}).get("hours", 0) + else: + log.warning(f"MTTR API returned status {response.status_code} for {location_tag}") + return 0 + except Exception as e: + log.error(f"Error getting MTTR for {location_tag}: {str(e)}") + return 0 + +async def get_equipment_reliability_parameter(*, location_tag: str, client: httpx.AsyncClient): + """ + Get reliability parameters for a single equipment using provided client + """ + reliability_url = f"http://192.168.1.82:8000/reliability/reliability/{location_tag}/current" + try: + response = await client.get(reliability_url) + if response.status_code == 200: + reliability_data = response.json() + data = reliability_data.get("data", {}) + return { + "distribution": data.get("distribution", ""), + "beta": data.get("parameters", 0).get("beta", 0), + "eta": data.get("parameters", 0).get("eta", 0) + } + else: + log.warning(f"Reliability API returned status {response.status_code} for {location_tag}") + return {"distribution": "", "beta": 0, "eta": 0} + except Exception as e: + log.error(f"Error getting reliability parameters for {location_tag}: {str(e)}") + return {"distribution": "", "beta": 0, "eta": 0} diff --git a/src/aeros_project/service.py b/src/aeros_project/service.py index f5c9cea..e459d60 100644 --- a/src/aeros_project/service.py +++ b/src/aeros_project/service.py @@ -64,7 +64,7 @@ async def import_aro_project(*, db_session: DbSession, aeros_project_in: AerosPr # Prepare file for upload files = { - "file": (file.filename.replace(" ", "_"), content, file.content_type or "application/octet-stream") + "file": (file.filename, content, file.content_type or "application/octet-stream") } response = await client.post( @@ -78,6 +78,7 @@ async def import_aro_project(*, db_session: DbSession, aeros_project_in: AerosPr aro_path = upload_result.get("full_path") filename = upload_result.get("stored_filename").replace(".aro", "") + if not aro_path: raise HTTPException( status_code=500, @@ -107,7 +108,7 @@ async def import_aro_project(*, db_session: DbSession, aeros_project_in: AerosPr # If aeros record found, then update it if latest_project: - latest_project.project_name = project_name + latest_project.project_name = filename latest_project.aro_file_path = aro_path else: # else create new aeros record db_session.add(aeros_project) @@ -140,6 +141,12 @@ async def fetch_aro_record(*, db_session: DbSession): return found_record +async def get_project(*, db_session: DbSession): + stmt = select(AerosProject).order_by(desc(AerosProject.updated_at)).limit(1) + result = await db_session.execute(stmt) + found_record = result.scalar_one_or_none() + + return found_record async def _initialize_default_project_data( *, diff --git a/src/aeros_simulation/router.py b/src/aeros_simulation/router.py index 8ea7a22..4e150ef 100644 --- a/src/aeros_simulation/router.py +++ b/src/aeros_simulation/router.py @@ -8,7 +8,8 @@ from src.auth.service import CurrentUser from src.database.core import DbSession from src.database.service import CommonParameters from src.models import StandardResponse - +from src.aeros_equipment.service import update_equipment_for_simulation +from src.aeros_project.service import get_project from .schema import ( SimulationCalcResult, SimulationInput, @@ -55,12 +56,19 @@ async def run_simulations( ) simulation_id = simulation.id + project = await get_project(db_session=db_session) + try: sim_data = simulation_in.model_dump(exclude={"SimulationName"}) sim_data["HubCnnId"] = str(simulation_id) + ##background_tasks.add_task(execute_simulation, db_session=db_session ,simulation_id=simulation_id, sim_data=sim_data) + await update_equipment_for_simulation( + db_session=db_session, project_name=project.project_name, schematic_name=simulation_in.SchematicName + ) + await execute_simulation( db_session=db_session, simulation_id=simulation_id, sim_data=sim_data, is_saved=True ) diff --git a/src/aeros_simulation/service.py b/src/aeros_simulation/service.py index 3f503d6..f10611e 100644 --- a/src/aeros_simulation/service.py +++ b/src/aeros_simulation/service.py @@ -1,7 +1,7 @@ from datetime import datetime from typing import Optional from uuid import UUID - +import logging import httpx from fastapi import HTTPException, status from sqlalchemy import delete, select @@ -11,6 +11,8 @@ from src.config import AEROS_BASE_URL, DEFAULT_PROJECT_NAME from src.database.core import DbSession from src.database.service import CommonParameters, search_filter_sort_paginate +log = logging.getLogger(__name__) + from .model import ( AerosNode, AerosSimulation, @@ -102,6 +104,8 @@ async def execute_simulation( is_saved: bool = False, ): """Execute the actual simulation call""" + log.info("Executing simulation with id: %s", simulation_id) + try: response = await client.post( f"{AEROS_BASE_URL}/api/Simulation/RunSimulation", @@ -122,6 +126,7 @@ async def execute_simulation( db_session=db_session, simulation_id=simulation_id, result=result ) + log.info("Simulation completed with id: %s", simulation_id) return result except Exception as e: @@ -132,6 +137,8 @@ async def execute_simulation( simulation.error = str(e) await db_session.commit() + log.error("Simulation failed with error: %s", str(e)) + raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @@ -260,13 +267,14 @@ async def save_default_simulation_node( ): sim_data = { "projectName": project_name, - "SchematicName": "Boiler", + "SchematicName": "- BTG_PC -", "SimSeed": 1, "SimDuration": 3, "DurationUnit": "UYear", "SimNumRun": 1, } + results = await execute_simulation(db_session=db_session, sim_data=sim_data) nodes = []