from datetime import datetime from typing import Optional from uuid import uuid4, uuid4, UUID import logging import httpx from fastapi import HTTPException, status from sqlalchemy import delete, select, update, and_ from sqlalchemy.orm import selectinload from src.config import AEROS_BASE_URL, DEFAULT_PROJECT_NAME from src.database.core import DbSession from src.database.service import CommonParameters, search_filter_sort_paginate from src.utils import save_to_pastebin import aiohttp import asyncio log = logging.getLogger(__name__) from .model import ( AerosNode, AerosSimulation, AerosSimulationCalcResult, AerosSimulationPlotResult, AerosSchematic ) from src.aeros_equipment.model import AerosEquipment, AerosEquipmentCustomParameterData from src.aeros_equipment.schema import EquipmentWithCustomParameters from .schema import SimulationInput, SimulationRankingParameters from .utils import calculate_eaf client = httpx.AsyncClient(timeout=300.0) active_simulations = {} async def get_all(common: CommonParameters): query = select(AerosSimulation).where(AerosSimulation.status == "completed") results = await search_filter_sort_paginate(model=query, **common) return results async def get_simulation_by_id( *, db_session: DbSession, simulation_id: Optional[UUID] = None, is_completed: bool = False, ): """Get a simulation by id.""" query = select(AerosSimulation) if is_completed: query = query.where(AerosSimulation.status == "completed") if simulation_id: query = query.where(AerosSimulation.id == simulation_id) else: query = query.order_by(AerosSimulation.completed_at.desc()).limit(1) results = await db_session.execute(query) return results.scalar() async def get_simulation_node_by(*, db_session: DbSession, **kwargs): """Get a simulation node by column.""" # Build WHERE conditions from kwargs conditions = [] for key, value in kwargs.items(): if hasattr(AerosNode, key): conditions.append(getattr(AerosNode, key) == value) if not conditions: raise ValueError("No valid column conditions provided") query = select(AerosNode).where(*conditions) result = await db_session.execute(query) return result.scalar() async def get_or_save_node(*, db_session: DbSession, node_data: dict, type: str = "calc"): """Get a simulation node by column.""" node = await get_simulation_node_by( db_session=db_session, node_name=node_data["nodeName"] ) raise Exception(node_data) if not node: print("Creating new node") if type == "calc": print("Creating calc node") node = AerosNode( node_name=node_data["nodeName"], node_type=node_data["nodeType"], node_id=convert_id_to_none_if_negative(node_data["nodeId"]), original_node_id=convert_id_to_none_if_negative(node_data["originalNodeId"]), structure_name=node_data["structureName"], schematic_name=node_data["schematicName"], schematic_id=convert_id_to_none_if_negative(node_data["schematicId"]), original_schematic_id=convert_id_to_none_if_negative( node_data["originalSchematicId"] ), ref_schematic_id=convert_id_to_none_if_negative(node_data["refSchematicId"]), orignal_ref_schematic_id=convert_id_to_none_if_negative( node_data["orinalRefSchematic"] ), ) else: print("Creating plot node") nodeId = convert_id_to_none_if_negative(node_data["nodeId"]) nodeName = node_data["nodeName"] node = AerosNode( node_name=nodeName, node_type=node_data["nodeType"], node_id=nodeId, original_node_id=convert_id_to_none_if_negative(node_data["originalNodeId"]), schematic_name=node_data["schematicName"], schematic_id=convert_id_to_none_if_negative(node_data["parentSchematicId"]), original_schematic_id=convert_id_to_none_if_negative( node_data["originalParentSchematicId"] ), ref_schematic_id=convert_id_to_none_if_negative(node_data["targetSchematicId"]), orignal_ref_schematic_id=convert_id_to_none_if_negative( node_data["originalTargetSchematicId"] ), ) db_session.add(node) await db_session.commit() return node async def execute_simulation(*, db_session: DbSession, simulation_id: Optional[UUID] = None, sim_data: dict, is_saved: bool = False, eq_update: dict = None): """Execute the actual simulation call""" if eq_update is None: eq_update = {} print("Executing simulation with id: %s", simulation_id, sim_data["SchematicName"]) try: response = await client.post( f"{AEROS_BASE_URL}/api/Simulation/RunSimulation", json=sim_data, headers={"Content-Type": "application/json"}, ) response.raise_for_status() result = response.json() if is_saved: simulation = await get_simulation_by_id( db_session=db_session, simulation_id=simulation_id ) simulation.status = "proccessing" simulation.result = result await db_session.commit() await save_simulation_result( db_session=db_session, simulation_id=simulation_id, result=result, schematic_name=sim_data["SchematicName"],eq_update=eq_update ) print("Simulation completed with id: %s", simulation_id) return result except Exception as e: simulation = await get_simulation_by_id( db_session=db_session, simulation_id=simulation_id ) simulation.status = "failed" simulation.error = str(e) await db_session.commit() log.error("Simulation failed with error: %s", str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) from e async def get_all_aeros_node(*, db_session: DbSession, schematic_name: Optional[str] = None): query = select(AerosNode) if schematic_name: aeros_schematic = await get_aeros_schematic_by_name(db_session=db_session, schematic_name=schematic_name) if not aeros_schematic: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Schematic not found") query = query.where(AerosNode.aeros_schematic_id == aeros_schematic.id) results = await db_session.execute(query) return results.scalars().all() async def save_simulation_result( *, db_session: DbSession, simulation_id: UUID, result: dict, schematic_name: str, eq_update: dict ): print("Saving simulation result") """Save the simulation result.""" calc_result = result["nodeResultOuts"] plot_result = result["plotNodeOuts"] """Save the simulation result""" avaiable_nodes = { f"{node.node_type}:{node.node_name}": node for node in await get_all_aeros_node(db_session=db_session, schematic_name=schematic_name) } calc_objects = [] plot_objects = [] try: for result in calc_result: node_type = "RegularNode" if result["nodeType"] == "RegularNode" else "SchematicNode" node = avaiable_nodes.get(f"{node_type}:{result['nodeName']}", None) eq_reliability = eq_update.get(result["nodeName"], { "eta": 0, "beta": 0, "mttr": 0, "parameters": {} }) if not node: if result["nodeType"] != "RegularNode" and result["nodeType"] != "Schematic": continue node = await get_or_save_node( db_session=db_session, node_data=result, type="calc" ) eaf, derating_hours = calculate_eaf( available_hours=result["totalUpTime"], period_hours=result["totalUpTime"] + result["totalDowntime"], actual_production=result["production"], ideal_production=result["idealProduction"] ) efor = (result["totalDowntime"] / (result["totalDowntime"] + result["totalUpTime"]))*100 if (result["totalDowntime"] + result["totalUpTime"]) > 0 else 0 calc_result = AerosSimulationCalcResult( aeros_simulation_id=simulation_id, aeros_node_id=node.id, total_downtime=result["totalDowntime"], total_uptime=result["totalUpTime"], num_events=result["numEvents"], production=result["production"], production_std=result["productionStd"], ideal_production=result["idealProduction"], availability=result["availability"], efficiency=result["efficiency"], effective_loss=result["effectiveLoss"], num_cm=result["numCM"], cm_waiting_time=result["cmWaitingTime"], total_cm_downtime=result["totalCMDowntime"], num_pm=result["numPM"], total_pm_downtime=result["totalPMDowntime"], num_ip=result["numIP"], total_ip_downtime=result["totalIPDowntime"], num_oh=result["numOH"], total_oh_downtime=result["totalOHDowntime"], t_wait_for_crew=result["tWaitForCrew"], t_wait_for_spare=result["tWaitForSpare"], duration_at_full=result["durationAtFull"], duration_above_hh=result["durationAboveHH"], duration_above_h=result["durationAboveH"], duration_below_l=result["durationBelowL"], duration_below_ll=result["durationBelowLL"], duration_at_empty=result["durationAtEmpty"], stg_input=result["stgInput"], stg_output=result["stgOutput"], average_level=result["averageLevel"], potential_production=result["potentialProduction"], eaf=eaf, efor=efor, derating_hours=derating_hours, beta=eq_reliability["beta"] if node_type == "RegularNode" else None, eta=eq_reliability["eta"] if node_type == "RegularNode" else None, mttr=eq_reliability["mttr"] if node_type == "RegularNode" else None, parameters=eq_reliability["parameters"] if node_type == "RegularNode" else None ) calc_objects.append(calc_result) for result in plot_result: node_type = "RegularNode" if result["nodeType"] == "RegularNode" else "SchematicNode" node = avaiable_nodes.get(f"{node_type}:{result['nodeName']}", None) if not node: if result["nodeType"] != "RegularNode" and result["nodeType"] != "Schematic": continue node = await get_or_save_node( db_session=db_session, node_data=result, type="plot" ) plot_result = AerosSimulationPlotResult( aeros_simulation_id=simulation_id, aeros_node_id=node.id, max_flow_rate=result["maxFlowrate"], storage_capacity=result["storageCapacity"], point_availabilities=result["pointAvailabilities"], point_flowrates=result["pointFlowrates"], timestamp_outs=result["timeStampOuts"], ) plot_objects.append(plot_result) except Exception as e: simulation = await get_simulation_by_id( db_session=db_session, simulation_id=simulation_id ) simulation.status = "failed" simulation.result = str(e) await db_session.commit() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) db_session.add_all(calc_objects) db_session.add_all(plot_objects) simulation = await get_simulation_by_id( db_session=db_session, simulation_id=simulation_id ) simulation.status = "completed" simulation.completed_at = datetime.now() await db_session.commit() return async def process_single_schematic(*, db_session: DbSession, sim_data: dict, schematic): """Process a single schematic simulation and return the nodes""" try: # Execute simulation for this schematic results = await execute_simulation(db_session=db_session, sim_data=sim_data) # Create main schematic node mainSchematicId = uuid4() mainSchematic = AerosNode( id=mainSchematicId, node_name=schematic.schematic_name, schematic_name=schematic.schematic_name, schematic_id=None, node_type="SchematicNode", aeros_schematic_id=schematic.id, structure_name=schematic.schematic_name ) # Process simulation results recursively nodes = await save_recusive_simulation_result_node( db_session=db_session, data=results, schematic_name=mainSchematic.node_name, schematic_id=mainSchematicId, aeros_schematic_id=schematic.id ) nodes.append(mainSchematic) return nodes except Exception as e: print(f"Error processing schematic {schematic.schematic_name}: {e}") raise # Re-raise to be caught by asyncio.gather async def save_recusive_simulation_result_node(*, db_session: DbSession, data, schematic_name: str, aeros_schematic_id ,schematic_id: Optional[UUID] = None): ## Get All schematic #doing multiple simulation with all schematic #1 Record schmatic ID from master schematic, ex - TJB - Unit 3 - = 1 #2 Get The highest parent from Plot data using nodeName == schematicName #3 save the highest parent, add master schematic ID, get highest parent_id, # continue looping through all plot data, check if it regular node and schemmaticName = highest parent schematic ID, save # If schematicName = Parent schematic name, but not regular node, that mean that node is schematic and should have children # search for children schematic and save them plotResult = data["plotNodeOuts"] structure_names = {result["nodeName"]:result["structureName"] for result in data["nodeResultOuts"]} results = [] for result in plotResult: if result["schematicName"] == schematic_name and result["nodeType"] == "RegularNode": node = AerosNode( node_name=result["nodeName"], schematic_id=schematic_id, node_type="RegularNode", schematic_name=schematic_name, aeros_schematic_id=aeros_schematic_id, structure_name=structure_names.get(result["nodeName"]) ) results.append(node) elif result["schematicName"] == schematic_name and result["nodeType"] == "SubSchematic": schematicId = uuid4() schematic = AerosNode( id=schematicId, node_name=result["nodeName"], schematic_name=schematic_name, schematic_id=schematic_id, node_type="SchematicNode", aeros_schematic_id=aeros_schematic_id, structure_name=structure_names.get(result["nodeName"]) ) results.append(schematic) res = await save_recusive_simulation_result_node(db_session=db_session, data=data, schematic_name=result["nodeName"], schematic_id=schematicId, aeros_schematic_id=aeros_schematic_id) results.extend(res) else: continue return results async def get_aeros_schematic_by_name(*, db_session: DbSession, schematic_name: str): query = select(AerosSchematic).where(AerosSchematic.schematic_name == schematic_name) results = await db_session.execute(query) return results.scalar_one_or_none() async def save_default_simulation_node( *, db_session: DbSession, project_name: str = "trialapi" ): tasks = [] all_results = [] # Get all schematic schematics = await get_all_schematic_aeros(db_session=db_session) for schematic in schematics: sim_data = { "projectName": project_name, "SchematicName": schematic.schematic_name, "SimSeed": 1, "SimDuration": 1, "DurationUnit": "UMinute", "SimNumRun": 1, } # Create a task for each simulation results = await process_single_schematic( db_session=db_session, sim_data=sim_data, schematic=schematic ) all_results.extend(results) # all_results_lists = await asyncio.gather(*tasks, return_exceptions=True) # for i, result in enumerate(all_results_lists): # if isinstance(result, Exception): # print(f"Simulation failed for schematic {schematics[i].schematic_name}: {result}") # # You might want to handle this differently based on your requirements # continue # all_results.extend(result) # # delete old data await db_session.execute(delete(AerosNode)) db_session.add_all(all_results) await db_session.commit() def convert_id_to_none_if_negative(value): """Convert ID to None if it's below 0, otherwise return the value.""" return None if value < 0 else value async def create_simulation(*, db_session: DbSession, simulation_in: SimulationInput): """Create a new simulation.""" input = simulation_in.model_dump(exclude={"SimulationName"}) active_simulations = { "status": "running", "started_at": datetime.now(), "simulation_name": simulation_in.SimulationName, "schematic_name": "- TJB - Unit 3 -", } simulation = AerosSimulation(**active_simulations) db_session.add(simulation) await db_session.commit() return simulation async def get_simulation_with_calc_result( *, db_session: DbSession, simulation_id: UUID, aeros_node_id: Optional[UUID] = None, schematic_name: Optional[str] = None, node_type: Optional[str] = None ): """Get a simulation by id.""" query = (select(AerosSimulationCalcResult).filter( AerosSimulationCalcResult.aeros_simulation_id == simulation_id)) if schematic_name: if schematic_name == "WTP": query = query.join( AerosNode, AerosNode.id == AerosSimulationCalcResult.aeros_node_id ).filter(AerosNode.structure_name.contains(schematic_name)) else: query = query.join( AerosNode, AerosNode.id == AerosSimulationCalcResult.aeros_node_id ).filter(AerosNode.structure_name.contains(schematic_name)) if node_type: query = query.join( AerosNode, AerosNode.id == AerosSimulationCalcResult.aeros_node_id ).filter(AerosNode.node_type == node_type) query = query.options( selectinload(AerosSimulationCalcResult.aeros_node).options( selectinload(AerosNode.equipment) )) simulation = await db_session.execute(query) return simulation.scalars().all() async def get_plant_calc_result( *, db_session, simulation_id: UUID ): query = (select(AerosSimulationCalcResult).filter( AerosSimulationCalcResult.aeros_simulation_id == simulation_id, ).join(AerosNode, AerosNode.id == AerosSimulationCalcResult.aeros_node_id) .filter(AerosNode.node_name == "- TJB - Unit 3 -")) query = query.options( selectinload(AerosSimulationCalcResult.aeros_node).options( selectinload(AerosNode.equipment) )) calc = await db_session.execute(query) return calc.scalar_one_or_none() async def get_result_ranking(*, db_session: DbSession, simulation_id: UUID): query = select(AerosEquipment, AerosSimulationCalcResult.eaf).join(AerosNode, AerosNode.node_name == AerosEquipment.node_name).join(AerosSimulationCalcResult, AerosSimulationCalcResult.aeros_node_id == AerosNode.id) query = query.filter( and_( AerosSimulationCalcResult.aeros_simulation_id == simulation_id, AerosNode.node_type == "RegularNode", AerosEquipment.custom_parameters.any() ) ) query = query.order_by(AerosSimulationCalcResult.eaf.desc()).limit(10) query = query.options( selectinload(AerosEquipment.custom_parameters)).options( selectinload(AerosEquipment.master_equipment) ) result = await db_session.execute(query) data = [ SimulationRankingParameters( location_tag=equipment.location_tag, master_equipment=equipment.master_equipment, custom_parameters=equipment.custom_parameters, eaf=eaf ) for equipment, eaf in result ] return data async def get_simulation_with_plot_result( *, db_session: DbSession, simulation_id: UUID, node_type: Optional[str] = None ): """Get a simulation by id.""" query = ( select(AerosSimulation) .where(AerosSimulation.id == simulation_id) .options( selectinload(AerosSimulation.plot_results).options( selectinload(AerosSimulationPlotResult.aeros_node) ) ) ) if node_type: query = query.join( AerosNode, AerosNode.id == AerosSimulation.plot_results.aeros_node_id ).filter(AerosNode.node_type == node_type) simulation = await db_session.execute(query) return simulation.scalar() async def get_calc_result_by( *, db_session: DbSession, simulation_id: UUID, node_name: Optional[str] = None ): """Get a simulation node by column.""" # Build WHERE conditions from kwargs query = select(AerosSimulationCalcResult).where( AerosSimulationCalcResult.aeros_simulation_id == simulation_id ) if node_name: query = query.join(AerosSimulationCalcResult.aeros_node).filter(AerosNode.node_name == node_name) result = await db_session.execute(query) return result.scalar() async def get_custom_parameters(*, db_session: DbSession, simulation_id: UUID): """Get a simulation node by column.""" # Build WHERE conditions from kwargs query = select(AerosSimulationCalcResult).where( AerosSimulationCalcResult.aeros_simulation_id == simulation_id ) query = query.join( AerosNode, AerosNode.id == AerosSimulationCalcResult.aeros_node_id ) query = query.where(AerosNode.node_type == "RegularNode") query = ( query.order_by(AerosSimulationCalcResult.eaf.desc()) .limit(20) .options(selectinload(AerosSimulationCalcResult.aeros_node)) ) result = await db_session.execute(query) return result.scalars().all() async def get_regular_nodes_by_schematic(*, db_session: DbSession, schematic_name: str) -> set[UUID]: """ Get all regular node IDs that are descendants of a given schematic (system or subsystem). Uses recursive CTE to traverse the hierarchy. """ # Using recursive CTE to find all descendants # First, find the root node(s) with the given schematic name root_cte = ( select(AerosNode.id, AerosNode.schematic_id, AerosNode.ref_schematic_id,AerosNode.node_type, AerosNode.node_name) .where(AerosNode.node_name == schematic_name) .cte(name="hierarchy", recursive=True) ) # Recursive part: find all children children_cte = ( select(AerosNode.id, AerosNode.schematic_id,AerosNode.ref_schematic_id ,AerosNode.node_type, AerosNode.node_name) .select_from( AerosNode.join(root_cte, AerosNode.schematic_id == root_cte.c.ref_schematic_id) ) ) # Union the base case and recursive case hierarchy_cte = root_cte.union_all(children_cte) # Final query to get only regular nodes from the hierarchy query = ( select(hierarchy_cte.c.id) .where(hierarchy_cte.c.node_type == "RegularNode") # Adjust this condition based on your node_type values ) result = await db_session.execute(query) return set(result.scalars().all()) async def get_all_schematic_aeros(*, db_session: DbSession): query = select(AerosSchematic) results = await db_session.execute(query) return results.scalars().all() async def update_simulation(*, db_session: DbSession, simulation_id: UUID, data: dict): query = update(AerosSimulation).where(AerosSimulation.id == simulation_id).values(**data) await db_session.execute(query) await db_session.commit()