from collections import defaultdict
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from sqlalchemy.orm import selectinload
from fastapi import APIRouter, BackgroundTasks, HTTPException, background, status, Query
from sqlalchemy import select, text
from temporalio.client import Client
from src.aeros_contribution.service import update_contribution_bulk_mappings
from src.aeros_equipment.model import AerosEquipment
from src.aeros_simulation.model import AerosSimulationCalcResult, EafContribution, AerosNode
from src.auth.service import CurrentUser
from src.config import TEMPORAL_URL
from src.database.core import CollectorDbSession, DbSession
from src.database.service import CommonParameters
from src.models import StandardResponse
from src.aeros_equipment.service import update_equipment_for_simulation
from src.aeros_project.service import get_project
from temporal.workflow import SimulationWorkflow
from .schema import (
    SimulationCalcResult,
    SimulationInput,
    SimulationPagination,
    SimulationPlot,
    SimulationPlotResult,
    SimulationCalc,
    SimulationData,
    SimulationRankingParameters
)
from .service import (
    create_simulation,
    # execute_simulation,
    get_all,
    get_custom_parameters,
    get_default_simulation,
    get_simulation_by_id,
    get_simulation_with_calc_result,
    get_simulation_with_plot_result,
    update_simulation,
    get_result_ranking,
    get_plant_calc_result
)
from .simulation_save_service import calculate_plant_eaf, execute_simulation
from src.aeros_equipment.schema import EquipmentWithCustomParameters

# Router for the simulation endpoints defined in this module.
router = APIRouter()

# Module-level registry; nothing in the visible code reads or writes it.
active_simulations = {}


@router.get("", response_model=StandardResponse[SimulationPagination])
async def get_all_simulation(
    db_session: DbSession,
    current_user: CurrentUser,
    common: CommonParameters,
    status: Optional[str] = Query(None),
):
    """List simulations.

    The common parameters, the optional ``status`` query value and the
    current user are forwarded positionally to ``get_all``; its result is
    wrapped in the standard response envelope.

    ``db_session`` is declared as a dependency but not used directly here.
    """
    simulations = await get_all(common, status, current_user)

    payload = {}
    payload["data"] = simulations
    payload["status"] = "success"
    payload["message"] = "Simulations result retrieved successfully"
    return payload
async def _resolve_simulation_id(db_session: DbSession, simulation_id):
    """Translate the ``'default'`` alias into the default simulation's id.

    Any other value is returned unchanged.
    """
    if simulation_id == 'default':
        simulation = await get_default_simulation(db_session=db_session)
        return simulation.id
    return simulation_id


# NOTE: this route must be registered before "/{simulation_id}". Routes are
# matched in declaration order, and the untyped path parameter of
# "/{simulation_id}" would otherwise capture "custom_parameters".
@router.get("/custom_parameters", response_model=StandardResponse[list])
async def get_custom_parameters_controller(db_session: DbSession):
    """Return one ``{node_name: eaf}`` mapping per custom-parameter row.

    The simulation is looked up with ``simulation_id=None`` and
    ``is_completed=True`` (presumably the latest completed simulation --
    confirm against ``get_simulation_by_id``).
    """
    latest_simulation = await get_simulation_by_id(
        db_session=db_session, simulation_id=None, is_completed=True
    )
    custom_parameters = await get_custom_parameters(
        db_session=db_session, simulation_id=latest_simulation.id
    )

    results = [{node.aeros_node.node_name: node.eaf} for node in custom_parameters]

    return {
        "data": results,
        "status": "success",
        "message": "Simulation result retrieved successfully",
    }


@router.get("/{simulation_id}", response_model=StandardResponse[SimulationData])
async def get_simulation(db_session: DbSession, simulation_id):
    """Get a single simulation by id."""
    result = await get_simulation_by_id(db_session=db_session, simulation_id=simulation_id)

    return {
        "data": result,
        "status": "success",
        "message": "Simulation result retrieved successfully",
    }


@router.post("/run", response_model=StandardResponse[str])
async def run_simulations(
    db_session: DbSession,
    simulation_in: SimulationInput,
    current_user: CurrentUser
):
    """Create a simulation record and start its Temporal workflow.

    Returns the new simulation id as a string. The workflow is only started
    here; this handler does not wait for it to finish.
    """
    temporal_client = await Client.connect(TEMPORAL_URL)

    simulation = await create_simulation(
        db_session=db_session, simulation_in=simulation_in, current_user=current_user
    )
    simulation_id = simulation.id

    project = await get_project(db_session=db_session)

    # Workflow input: the request payload plus the simulation id and the
    # project name.
    sim_data = simulation_in.model_dump()
    sim_data["HubCnnId"] = str(simulation_id)
    sim_data["projectName"] = project.project_name

    # NOTE(review): an in-process background task used to run the equipment
    # update, simulation execution, plant EAF calculation and contribution
    # mapping update here. Presumably SimulationWorkflow covers those steps
    # now -- confirm.
    await temporal_client.start_workflow(
        SimulationWorkflow.run,
        sim_data,
        id=f"simulation-{simulation_id}",
        task_queue="simulation-task-queue",
    )

    return {
        "data": str(simulation_id),
        "status": "success",
        "message": "Simulation started successfully",
    }


@router.get(
    "/result/calc/{simulation_id}",
    response_model=StandardResponse[List[SimulationCalc]],
)
async def get_simulation_result(
    db_session: DbSession,
    simulation_id,
    schematic_name: Optional[str] = Query(None),
    node_type = Query(None, alias="nodetype")
):
    """Get the calculation results of a simulation.

    ``simulation_id`` may be ``'default'`` to address the default
    simulation. ``schematic_name`` and ``node_type`` (query parameter
    ``nodetype``) are forwarded to ``get_simulation_with_calc_result``.
    """
    simulation_id = await _resolve_simulation_id(db_session, simulation_id)

    simulation_result = await get_simulation_with_calc_result(
        db_session=db_session,
        simulation_id=simulation_id,
        schematic_name=schematic_name,
        node_type=node_type
    )

    return {
        "data": simulation_result,
        "status": "success",
        "message": "Simulation result retrieved successfully",
    }


@router.get(
    "/result/calc/{simulation_id}/plant",
    response_model=StandardResponse[SimulationCalc],
)
async def get_simulation_result_plant(db_session: DbSession, simulation_id):
    """Get the plant calculation result of a simulation.

    ``simulation_id`` may be ``'default'`` to address the default simulation.
    """
    simulation_id = await _resolve_simulation_id(db_session, simulation_id)

    simulation_result = await get_plant_calc_result(
        db_session=db_session, simulation_id=simulation_id
    )

    return {
        "data": simulation_result,
        "status": "success",
        "message": "Simulation result retrieved successfully",
    }


@router.get(
    "/result/plot/{simulation_id}",
    response_model=StandardResponse[List[SimulationPlot]],
)
async def get_simulation_result_plot(db_session: DbSession, simulation_id):
    """Get the plot results of a simulation.

    ``simulation_id`` may be ``'default'`` to address the default simulation.
    """
    simulation_id = await _resolve_simulation_id(db_session, simulation_id)

    simulation_result = await get_simulation_with_plot_result(
        db_session=db_session, simulation_id=simulation_id
    )

    return {
        "data": simulation_result,
        "status": "success",
        "message": "Simulation result retrieved successfully",
    }


@router.get(
    "/result/plot/{simulation_id}/{node_id}",
    response_model=StandardResponse[SimulationPlot],
)
async def get_simulation_result_plot_per_node(
    db_session: DbSession,
    simulation_id,
    node_id,
    use_location_tag: Optional[int] = Query(0)
):
    """Get the plot result of one node of a simulation.

    ``simulation_id`` may be ``'default'`` to address the default
    simulation. ``node_id`` and ``use_location_tag`` are forwarded to
    ``get_simulation_with_plot_result``.
    """
    simulation_id = await _resolve_simulation_id(db_session, simulation_id)

    simulation_result = await get_simulation_with_plot_result(
        db_session=db_session,
        simulation_id=simulation_id,
        node_id=node_id,
        use_location_tag=use_location_tag
    )

    return {
        "data": simulation_result,
        "status": "success",
        "message": "Simulation result retrieved successfully",
    }


@router.get(
    "/result/ranking/{simulation_id}",
    response_model=StandardResponse[List[SimulationRankingParameters]],
)
async def get_simulation_result_ranking(
    db_session: DbSession,
    simulation_id,
    limit: Optional[int] = Query(None)
):
    """Get the result ranking of a simulation.

    ``simulation_id`` may be ``'default'`` to address the default
    simulation. ``limit`` is optional and forwarded to ``get_result_ranking``.
    """
    simulation_id = await _resolve_simulation_id(db_session, simulation_id)

    simulation_result = await get_result_ranking(
        db_session=db_session, simulation_id=simulation_id, limit=limit
    )

    return {
        "data": simulation_result,
        "status": "success",
        "message": "Simulation result retrieved successfully",
    }


@router.get(
    "/result/critical/{simulation_id}",
    response_model=StandardResponse[List[SimulationCalc]],
)
async def get_critical_equipment(db_session: DbSession, simulation_id: str):
    """Return calc results of equipment that was out of service during plant failures.

    Step 1 collects the cumulative times at which the plant node
    ('- TJB - Unit 3 -') is out of service for an event other than 'ON_OH'.
    Step 2 finds, for each of those times, the 'RegularNode' nodes that have
    an out-of-service entry within 5 time units of it. The calc results of
    all matched nodes are returned with node and equipment eagerly loaded.

    ``simulation_id`` may be ``'default'`` to address the default simulation.
    """
    simulation_id = await _resolve_simulation_id(db_session, simulation_id)

    # Step 1: Get all failure events for this simulation
    failure_query = text("""
        SELECT DISTINCT
            (elem ->> 'currentEvent') AS jenis,
            (elem ->> 'cumulativeTime')::numeric AS cumulative_time
        FROM rbd_tr_aeros_simulation_plot_result AS a
        JOIN public.rbd_ms_aeros_node AS b
            ON a.aeros_node_id = b.id
        JOIN LATERAL jsonb_array_elements(a.timestamp_outs) AS elem ON TRUE
        WHERE a.aeros_simulation_id = :simulation_id
            AND b.node_name = '- TJB - Unit 3 -'
            AND (elem ->> 'currentEQStatus') = 'OoS'
            AND (elem ->> 'currentEvent') != 'ON_OH'
        ORDER BY cumulative_time;
    """)
    failures = (await db_session.execute(failure_query, {
        "simulation_id": simulation_id,
    })).fetchall()

    # The same time can appear once per event type; query each time only once.
    failure_times = {fail.cumulative_time for fail in failures}

    # Step 2: For each failure, find which equipment caused it.
    # The failure time is bound as :cumulative_time. Comparing against a
    # fixed literal would return the same equipment for every failure.
    equipment_query = text("""
        SELECT b.id
        FROM rbd_tr_aeros_simulation_plot_result AS a
        JOIN public.rbd_ms_aeros_node AS b
            ON a.aeros_node_id = b.id
        WHERE EXISTS (
            SELECT 1
            FROM jsonb_array_elements(a.timestamp_outs) AS elem
            WHERE (elem ->> 'currentEQStatus') = 'OoS'
                AND ABS((elem ->> 'cumulativeTime')::numeric - :cumulative_time) <= 5
        )
        AND a.aeros_simulation_id = :simulation_id
        AND b.node_type = 'RegularNode';
    """)

    node_ids = set()
    for cumulative_time in failure_times:
        equipment = (await db_session.execute(equipment_query, {
            "simulation_id": simulation_id,
            "cumulative_time": cumulative_time
        })).fetchall()
        node_ids.update(eq.id for eq in equipment)

    if not node_ids:
        # No matching equipment, so the final lookup would return nothing.
        return {
            "data": [],
            "status": "success",
            "message": "Success",
        }

    query = (
        select(AerosSimulationCalcResult)
        .filter(AerosSimulationCalcResult.aeros_simulation_id == simulation_id)
        .filter(AerosSimulationCalcResult.aeros_node_id.in_(node_ids))
        .options(
            selectinload(AerosSimulationCalcResult.aeros_node).options(
                selectinload(AerosNode.equipment)
            )
        )
    )

    data = await db_session.execute(query)
    equipments = data.scalars().all()

    return {
        "data": equipments,
        "status": "success",
        "message": "Success",
    }


# NOTE(review): this handler reuses the name of the "/custom_parameters"
# handler and rebinds it at module level. Both routes stay registered, but
# the duplicate name is confusing and produces identical generated operation
# names. The name is kept to avoid changing the module's public binding;
# consider renaming.
@router.get("/metrics/{simulation_id}", response_model=StandardResponse[list])
async def get_custom_parameters_controller(db_session: DbSession, simulation_id: UUID):
    """Return the output of ``calculate_plant_eaf`` for a simulation.

    NOTE(review): the call passes four extra positional values
    (0, 1200, 15000, 0), while ``calculate_contribution`` in this module
    calls the same function with only ``db_session`` and ``simulation_id``.
    Confirm these arguments against the current signature.
    """
    results = await calculate_plant_eaf(db_session, simulation_id, 0, 1200, 15000, 0)

    return {
        "data": results,
        "status": "success",
        "message": "Simulation result retrieved successfully",
    }


airflow_router = APIRouter()


@airflow_router.post("/calculate_eaf_contribution", response_model=StandardResponse[dict])
async def calculate_contribution(
    db_session: DbSession,
    simulation_in: SimulationInput,
    batch_num: int = Query(0, ge=0)
):
    """Compute the EAF contribution of one batch of equipment.

    Equipment is processed in batches of 20, selected by ``batch_num``. For
    every equipment in the batch a temporary simulation is created and run
    with that equipment's ``mttr`` and ``failure_rate`` overridden, and the
    plant EAF is calculated. The contribution is the relative drop in
    efficiency uptime against the hard-coded baseline. The temporary
    simulation is deleted afterwards, and one ``EafContribution`` row per
    equipment is persisted at the end.

    Returns the per-equipment results keyed by node name, or an empty
    mapping when ``batch_num`` is past the last batch.

    Raises:
        HTTPException: 500 with the error text for any unexpected failure.
            HTTP errors raised by the called services are passed through.
    """
    project = await get_project(db_session=db_session)

    # Hard-coded baseline values that the per-equipment runs are compared to.
    main_edh = 1.5000000000000966
    main_efficiency_uptime = 697.5303030303029 - main_edh

    try:
        contribution_results = {}
        eaf_contributions_data = []

        # NOTE(review): the query has no ORDER BY, so batch membership across
        # separate requests depends on the database's row order -- confirm
        # that this is stable enough for batching.
        eqs = (await db_session.execute(select(AerosEquipment))).scalars().all()

        batch_size = 20
        start_index = batch_num * batch_size

        if start_index >= len(eqs):
            return {
                "data": contribution_results,
                "status": "success",
                "message": "No more equipment to process",
            }

        # Slicing clamps the upper bound to the end of the list.
        eqs = eqs[start_index:start_index + batch_size]

        for eq in eqs:
            simulation = await create_simulation(
                db_session=db_session, simulation_in=simulation_in
            )

            sim_data = simulation_in.model_dump(exclude={"SimulationName"})
            sim_data["HubCnnId"] = str(simulation.id)
            sim_data["projectName"] = project.project_name

            # Override only the equipment under test.
            custom_input = {
                eq.node_name: {
                    "mttr": 721,
                    "failure_rate": 0.01,
                }
            }

            results = await update_equipment_for_simulation(
                db_session=db_session,
                project_name=project.project_name,
                schematic_name=simulation_in.SchematicName,
                custom_input=custom_input
            )

            await execute_simulation(
                db_session=db_session,
                simulation_id=simulation.id,
                sim_data=sim_data,
                is_saved=True,
                eq_update=results
            )

            eaf, edh, efficiency_uptime = await calculate_plant_eaf(
                db_session=db_session, simulation_id=simulation.id
            )

            eaf_contribution = (
                (main_efficiency_uptime - efficiency_uptime) / main_efficiency_uptime
                if main_efficiency_uptime else 0
            )

            contribution_results[eq.node_name] = {
                "eaf": eaf,
                "edh": edh,
                "efficiency_uptime": efficiency_uptime,
                "eaf_contribution": eaf_contribution
            }

            eaf_contributions_data.append(
                EafContribution(
                    location_tag=eq.node_name,
                    eaf_contribution=eaf_contribution,
                    efficiency_uptime=efficiency_uptime,
                    edh=edh,
                )
            )

            # The simulation only existed to measure this equipment.
            await db_session.delete(simulation)
            await db_session.commit()

        db_session.add_all(eaf_contributions_data)
        await db_session.commit()

        return {
            "data": contribution_results,
            "status": "success",
            "message": "Simulation created successfully",
        }

    except HTTPException:
        # Keep status codes chosen by the called services.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e