from collections import defaultdict
from datetime import datetime
from typing import List, Optional
from uuid import UUID

from fastapi import APIRouter, BackgroundTasks, HTTPException, background, status, Query
from sqlalchemy import select, text
from sqlalchemy.orm import selectinload
from temporalio.client import Client

from src.aeros_contribution.service import update_contribution_bulk_mappings
from src.aeros_equipment.model import AerosEquipment
from src.aeros_equipment.schema import EquipmentWithCustomParameters
from src.aeros_equipment.service import update_equipment_for_simulation
from src.aeros_project.service import get_project
from src.aeros_simulation.model import AerosSimulationCalcResult, EafContribution, AerosNode
from src.aeros_simulation.utils import date_to_utc, hours_between, year_window_utc
from src.auth.service import CurrentUser
from src.config import TEMPORAL_URL
from src.database.core import CollectorDbSession, DbSession
from src.database.service import CommonParameters
from src.models import StandardResponse
from temporal.workflow import SimulationWorkflow

from .schema import (
    AhmMetricInput,
    SimulationCalcResult,
    SimulationInput,
    SimulationPagination,
    SimulationPlot,
    SimulationPlotResult,
    SimulationCalc,
    SimulationData,
    SimulationRankingParameters,
    YearlySimulationInput,
)
from .service import (
    create_simulation,
    get_all,
    get_custom_parameters,
    get_default_simulation,
    get_simulation_by_id,
    get_simulation_with_calc_result,
    get_simulation_with_plot_result,
    update_simulation,
    get_result_ranking,
    get_plant_calc_result,
)
# execute_simulation is taken from the save service, not from .service.
from .simulation_save_service import calculate_plant_eaf, execute_simulation

router = APIRouter()

active_simulations = {}

# Task queue the simulation workflow is submitted to.
_SIMULATION_TASK_QUEUE = "simulation-task-queue"

# Fallback overhaul schedule used by the yearly run when no overhaul rows exist.
_HOURS_PER_YEAR = 365 * 24
_DEFAULT_OVERHAUL_INTERVAL_HOURS = 2 * _HOURS_PER_YEAR
_DEFAULT_OVERHAUL_DURATION_HOURS = 1200

# Number of equipment rows handled per /calculate_eaf_contribution call.
_CONTRIBUTION_BATCH_SIZE = 20

# Baseline figures the per-equipment EAF contribution is measured against.
# NOTE(review): hard-coded values; presumably taken from a reference
# simulation run - confirm their origin before changing them.
_MAIN_EDH = 1.5000000000000966
_MAIN_EFFICIENCY_UPTIME = 697.5303030303029 - _MAIN_EDH


def _success(data, message: str) -> dict:
    """Build the standard success envelope returned by every endpoint."""
    return {
        "data": data,
        "status": "success",
        "message": message,
    }


async def _resolve_simulation_id(db_session, simulation_id):
    """Translate the literal id ``'default'`` into the default simulation's id.

    Any other value is returned unchanged.
    """
    if simulation_id == 'default':
        simulation = await get_default_simulation(db_session=db_session)
        return simulation.id
    return simulation_id


async def _start_simulation_workflow(temporal_client, simulation_id, sim_data: dict) -> None:
    """Submit ``SimulationWorkflow.run`` for ``simulation_id`` with ``sim_data``."""
    await temporal_client.start_workflow(
        SimulationWorkflow.run,
        sim_data,
        id=f"simulation-{simulation_id}",
        task_queue=_SIMULATION_TASK_QUEUE,
    )


def _overhaul_schedule(year: int, sim_start, last_overhaul, next_overhaul):
    """Derive ``(offset_hours, overhaul_interval, overhaul_duration)`` in hours.

    ``last_overhaul`` / ``next_overhaul`` are the row mappings fetched from
    ``public.oh_ms_overhaul`` (or ``None`` when no such row exists).
    """
    # Without a recorded overhaul, fall back to a half-year offset on even
    # years and one and a half years on odd years.
    half_year = _HOURS_PER_YEAR // 2
    offset_hours = half_year if year % 2 == 0 else half_year + _HOURS_PER_YEAR

    last_oh_end = None
    if last_overhaul:
        last_oh_end = date_to_utc(last_overhaul["end_date"])
        offset_hours = max(
            int((sim_start - last_oh_end).total_seconds() // 3600),
            0,
        )
        if offset_hours > _DEFAULT_OVERHAUL_INTERVAL_HOURS:
            offset_hours -= _DEFAULT_OVERHAUL_INTERVAL_HOURS

    overhaul_interval = _DEFAULT_OVERHAUL_INTERVAL_HOURS
    overhaul_duration = _DEFAULT_OVERHAUL_DURATION_HOURS
    if next_overhaul:
        # duration_oh is multiplied by 24 to obtain hours.
        overhaul_duration = next_overhaul["duration_oh"] * 24
        # The interval needs both ends; previously this branch read the last
        # overhaul date even when none existed and failed with an unbound
        # local. Keep the default interval in that case.
        if last_oh_end is not None:
            next_oh_start = date_to_utc(next_overhaul["start_date"])
            overhaul_interval = int(
                (next_oh_start - last_oh_end).total_seconds() // 3600
            )

    return offset_hours, overhaul_interval, overhaul_duration


@router.get("", response_model=StandardResponse[SimulationPagination])
async def get_all_simulation(
    db_session: DbSession,
    current_user: CurrentUser,
    common: CommonParameters,
    status: Optional[str] = Query(None),
):
    """Get all simulation.

    ``status`` is the optional query parameter forwarded to ``get_all``;
    inside this function it shadows the ``fastapi.status`` module.
    """
    results = await get_all(common, status, current_user)
    return _success(results, "Simulations result retrieved successfully")


# Registered before "/{simulation_id}" on purpose: routes are matched in
# declaration order, so the static path must come first or it is swallowed by
# the path parameter.
@router.get("/custom_parameters", response_model=StandardResponse[list])
async def get_custom_parameters_controller(db_session: DbSession):
    """Return ``{node_name: eaf}`` entries for the latest completed simulation.

    Raises:
        HTTPException: 404 when no completed simulation is returned.
    """
    latest_simulation = await get_simulation_by_id(
        db_session=db_session, simulation_id=None, is_completed=True
    )
    if latest_simulation is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No completed simulation found",
        )

    custom_parameters = await get_custom_parameters(
        db_session=db_session, simulation_id=latest_simulation.id
    )
    results = [{node.aeros_node.node_name: node.eaf} for node in custom_parameters]
    return _success(results, "Simulation result retrieved successfully")


@router.get("/{simulation_id}", response_model=StandardResponse[SimulationData])
async def get_simulation(db_session: DbSession, simulation_id):
    """Get a single simulation by id."""
    result = await get_simulation_by_id(db_session=db_session, simulation_id=simulation_id)
    return _success(result, "Simulation result retrieved successfully")


@router.post("/run", response_model=StandardResponse[str])
async def run_simulations(
    db_session: DbSession,
    simulation_in: SimulationInput,
    current_user: CurrentUser,
):
    """Create a simulation record and start its Temporal workflow.

    Returns the new simulation id as a string.
    """
    # Connect first so that a failed connection does not leave a simulation
    # row without a workflow.
    temporal_client = await Client.connect(TEMPORAL_URL)

    simulation = await create_simulation(
        db_session=db_session, simulation_in=simulation_in, current_user=current_user
    )
    simulation_id = simulation.id
    project = await get_project(db_session=db_session)

    sim_data = simulation_in.model_dump()
    sim_data["HubCnnId"] = str(simulation_id)
    sim_data["projectName"] = project.project_name

    await _start_simulation_workflow(temporal_client, simulation_id, sim_data)
    return _success(str(simulation_id), "Simulation started successfully")


@router.post("/run/yearly", response_model=StandardResponse[str])
async def run_yearly_simulation(
    db_session: DbSession,
    yearly_in: YearlySimulationInput,
    current_user: CurrentUser,
):
    """Start a simulation covering the window of ``yearly_in.year``.

    The overhaul offset, interval and duration are derived from the closest
    overhaul rows around the start of the window, falling back to fixed
    defaults when rows are missing.
    """
    year = yearly_in.year
    sim_start, sim_end = year_window_utc(year)
    sim_duration_hours = hours_between(sim_start, sim_end)

    # Latest overhaul that ended on or before the start of the window.
    last_oh_query = text("""
        SELECT start_date, end_date, duration_oh
        FROM public.oh_ms_overhaul
        WHERE end_date <= :sim_start
        ORDER BY end_date DESC
        LIMIT 1;
    """)
    # First overhaul that starts on or after the start of the window.
    next_oh_query = text("""
        SELECT start_date, end_date, duration_oh
        FROM public.oh_ms_overhaul
        WHERE start_date >= :sim_start
        ORDER BY start_date ASC
        LIMIT 1;
    """)
    query_params = {"sim_start": sim_start.date()}
    last_overhaul = (await db_session.execute(last_oh_query, query_params)).mappings().first()
    next_overhaul = (await db_session.execute(next_oh_query, query_params)).mappings().first()

    offset_hours, overhaul_interval, overhaul_duration = _overhaul_schedule(
        year, sim_start, last_overhaul, next_overhaul
    )

    simulation_input = SimulationInput(
        SimDuration=sim_duration_hours,
        DurationUnit="UHour",
        OffSet=offset_hours,
        OverhaulInterval=overhaul_interval,
        MaintenanceOutages=0,
        SimulationName=f"Simulation {year} LCCA",
        IsDefault=False,
        OverhaulDuration=overhaul_duration,
        AhmJobId=None,
    )

    temporal_client = await Client.connect(TEMPORAL_URL)
    simulation = await create_simulation(
        db_session=db_session, simulation_in=simulation_input, current_user=current_user
    )
    simulation_id = simulation.id
    project = await get_project(db_session=db_session)

    sim_data = simulation_input.model_dump()
    sim_data["HubCnnId"] = str(simulation_id)
    sim_data["SimulationStart"] = sim_start.isoformat()
    sim_data["SimulationEnd"] = sim_end.isoformat()
    sim_data["projectName"] = project.project_name

    await _start_simulation_workflow(temporal_client, simulation_id, sim_data)
    return _success(str(simulation_id), f"Yearly simulation {year} started (UHour mode)")


@router.get(
    "/result/calc/{simulation_id}",
    response_model=StandardResponse[List[SimulationCalc]],
)
async def get_simulation_result(
    db_session: DbSession,
    simulation_id,
    schematic_name: Optional[str] = Query(None),
    node_type=Query(None, alias="nodetype"),
):
    """Get the calc results of a simulation (``'default'`` is accepted as id).

    ``schematic_name`` and ``node_type`` (query name ``nodetype``) are passed
    through to the service.
    """
    simulation_id = await _resolve_simulation_id(db_session, simulation_id)
    simulation_result = await get_simulation_with_calc_result(
        db_session=db_session,
        simulation_id=simulation_id,
        schematic_name=schematic_name,
        node_type=node_type,
    )
    return _success(simulation_result, "Simulation result retrieved successfully")


@router.get(
    "/result/calc/{simulation_id}/plant",
    response_model=StandardResponse[SimulationCalc],
)
async def get_simulation_result_plant(db_session: DbSession, simulation_id):
    """Get the plant calc result of a simulation (``'default'`` is accepted as id)."""
    simulation_id = await _resolve_simulation_id(db_session, simulation_id)
    simulation_result = await get_plant_calc_result(
        db_session=db_session, simulation_id=simulation_id
    )
    return _success(simulation_result, "Simulation result retrieved successfully")


@router.get(
    "/result/plot/{simulation_id}",
    response_model=StandardResponse[List[SimulationPlot]],
)
async def get_simulation_result_plot(db_session: DbSession, simulation_id):
    """Get the plot results of a simulation (``'default'`` is accepted as id)."""
    simulation_id = await _resolve_simulation_id(db_session, simulation_id)
    simulation_result = await get_simulation_with_plot_result(
        db_session=db_session, simulation_id=simulation_id
    )
    return _success(simulation_result, "Simulation result retrieved successfully")


@router.get(
    "/result/plot/{simulation_id}/{node_id}",
    response_model=StandardResponse[SimulationPlot],
)
async def get_simulation_result_plot_per_node(
    db_session: DbSession,
    simulation_id,
    node_id,
    use_location_tag: Optional[int] = Query(0),
):
    """Get the plot result of one node (``'default'`` is accepted as id).

    ``node_id`` and ``use_location_tag`` are passed through to the service.
    """
    simulation_id = await _resolve_simulation_id(db_session, simulation_id)
    simulation_result = await get_simulation_with_plot_result(
        db_session=db_session,
        simulation_id=simulation_id,
        node_id=node_id,
        use_location_tag=use_location_tag,
    )
    return _success(simulation_result, "Simulation result retrieved successfully")


@router.get(
    "/result/ranking/{simulation_id}",
    response_model=StandardResponse[List[SimulationRankingParameters]],
)
async def get_simulation_result_ranking(
    db_session: DbSession,
    simulation_id,
    limit: Optional[int] = Query(None),
):
    """Get the result ranking of a simulation (``'default'`` is accepted as id).

    ``limit`` is optional and passed through to the service.
    """
    simulation_id = await _resolve_simulation_id(db_session, simulation_id)
    simulation_result = await get_result_ranking(
        db_session=db_session, simulation_id=simulation_id, limit=limit
    )
    return _success(simulation_result, "Simulation result retrieved successfully")


@router.get(
    "/result/critical/{simulation_id}",
    response_model=StandardResponse[List[SimulationCalc]],
)
async def get_critical_equipment(db_session: DbSession, simulation_id: str):
    """Return calc results of the regular nodes that were out of service at a
    plant failure time (``'default'`` is accepted as id).
    """
    simulation_id = await _resolve_simulation_id(db_session, simulation_id)

    # Step 1: Get all failure events of the plant node for this simulation.
    # NOTE(review): the plant node name is hard-coded in the query.
    failure_query = text("""
        SELECT DISTINCT
            (elem ->> 'currentEvent') AS jenis,
            (elem ->> 'cumulativeTime')::numeric AS cumulative_time
        FROM rbd_tr_aeros_simulation_plot_result AS a
        JOIN public.rbd_ms_aeros_node AS b ON a.aeros_node_id = b.id
        JOIN LATERAL jsonb_array_elements(a.timestamp_outs) AS elem ON TRUE
        WHERE a.aeros_simulation_id = :simulation_id
          AND b.node_name = '- TJB - Unit 3 -'
          AND (elem ->> 'currentEQStatus') = 'OoS'
          AND (elem ->> 'currentEvent') != 'ON_OH'
        ORDER BY cumulative_time;
    """)
    # Regular nodes that are out of service within 2 time units of a failure.
    equipment_query = text("""
        SELECT b.id
        FROM rbd_tr_aeros_simulation_plot_result AS a
        JOIN public.rbd_ms_aeros_node AS b ON a.aeros_node_id = b.id
        WHERE EXISTS (
            SELECT 1
            FROM jsonb_array_elements(a.timestamp_outs) AS elem
            WHERE (elem ->> 'currentEQStatus') = 'OoS'
              AND ABS((elem ->> 'cumulativeTime')::numeric - :cumulative_time) <= 2
        )
          AND a.aeros_simulation_id = :simulation_id
          AND b.node_type = 'RegularNode';
    """)

    failures = (
        await db_session.execute(failure_query, {"simulation_id": simulation_id})
    ).fetchall()

    # The same time can be reported for several event kinds; query it once.
    failure_times = sorted({row.cumulative_time for row in failures})

    # Step 2: For each failure time, find which equipment was out of service.
    node_ids = set()
    for cumulative_time in failure_times:
        rows = (
            await db_session.execute(
                equipment_query,
                {"simulation_id": simulation_id, "cumulative_time": cumulative_time},
            )
        ).fetchall()
        node_ids.update(row.id for row in rows)

    if not node_ids:
        # Nothing to look up; an empty IN () would yield no rows anyway.
        return _success([], "Success")

    calc_query = (
        select(AerosSimulationCalcResult)
        .filter(AerosSimulationCalcResult.aeros_simulation_id == simulation_id)
        .filter(AerosSimulationCalcResult.aeros_node_id.in_(list(node_ids)))
        .options(
            selectinload(AerosSimulationCalcResult.aeros_node).options(
                selectinload(AerosNode.equipment)
            )
        )
    )
    equipments = (await db_session.execute(calc_query)).scalars().all()
    return _success(equipments, "Success")


@router.get("/metrics/{simulation_id}", response_model=StandardResponse[list])
async def get_metric_controller(db_session: DbSession, simulation_id: UUID):
    """Return the output of ``calculate_plant_eaf`` for a simulation."""
    # NOTE(review): the four trailing positional arguments are not used by the
    # other call to calculate_plant_eaf in this module - confirm their meaning
    # against the function signature.
    results = await calculate_plant_eaf(db_session, simulation_id, 0, 1200, 15000, 0)
    return _success(results, "Simulation result retrieved successfully")


@router.post("/ahm_metrics", response_model=StandardResponse[dict])
async def get_ahm_metrics_controller(db_session: DbSession, metrics_in: AhmMetricInput):
    """Compare plant EAF/EFOR of a target simulation against a baseline.

    The baseline is ``metrics_in.baseline_simulation_id`` when given,
    otherwise the default simulation.
    """
    target = await get_plant_calc_result(
        db_session=db_session, simulation_id=metrics_in.target_simulation_id
    )

    baseline_id = metrics_in.baseline_simulation_id
    if not baseline_id:
        baseline_id = (await get_default_simulation(db_session=db_session)).id
    baseline = await get_plant_calc_result(
        db_session=db_session, simulation_id=baseline_id
    )

    result = {
        "eaf_before": baseline.eaf,
        "eaf_after": target.eaf,
        "efor_before": baseline.efor,
        "efor_after": target.efor,
        "eaf_delta": target.eaf - baseline.eaf,
        "efor_delta": target.efor - baseline.efor,
    }
    return _success(result, "Simulation result retrieved successfully")


airflow_router = APIRouter()


@airflow_router.post("/calculate_eaf_contribution", response_model=StandardResponse[dict])
async def calculate_contribution(
    db_session: DbSession,
    simulation_in: SimulationInput,
    batch_num: int = Query(0, ge=0),
):
    """Compute the EAF contribution of one batch of equipment.

    For every equipment in batch ``batch_num`` a temporary simulation is
    created and executed with that equipment's ``mttr`` / ``failure_rate``
    overridden, the plant figures are compared with the baseline, and the
    temporary simulation is deleted. The resulting ``EafContribution`` rows
    are stored once the batch is done.

    Raises:
        HTTPException: 500 carrying the message of any unexpected error.
    """
    project = await get_project(db_session=db_session)
    project_name = project.project_name

    try:
        contribution_results = {}

        # NOTE(review): the select has no ORDER BY, so batch membership relies
        # on the database returning rows in a stable order between calls.
        equipment = (await db_session.execute(select(AerosEquipment))).scalars().all()

        start_index = batch_num * _CONTRIBUTION_BATCH_SIZE
        if start_index >= len(equipment):
            return _success(contribution_results, "No more equipment to process")

        # Slicing clamps at the end of the list, so the last batch may be short.
        batch = equipment[start_index:start_index + _CONTRIBUTION_BATCH_SIZE]
        # Read the names up front so the commits inside the loop never force a
        # reload of ORM attributes (relevant if the session expires objects on
        # commit).
        node_names = [eq.node_name for eq in batch]

        eaf_contributions_data = []
        for node_name in node_names:
            # NOTE(review): unlike the /run endpoints, no current_user is
            # passed to create_simulation here - confirm it is optional.
            simulation = await create_simulation(
                db_session=db_session, simulation_in=simulation_in
            )
            simulation_id = simulation.id

            sim_data = simulation_in.model_dump(exclude={"SimulationName"})
            sim_data["HubCnnId"] = str(simulation_id)
            sim_data["projectName"] = project_name

            custom_input = {
                node_name: {
                    "mttr": 721,
                    "failure_rate": 0.01,
                }
            }
            eq_update = await update_equipment_for_simulation(
                db_session=db_session,
                project_name=project_name,
                schematic_name=simulation_in.SchematicName,
                custom_input=custom_input,
            )

            await execute_simulation(
                db_session=db_session,
                simulation_id=simulation_id,
                sim_data=sim_data,
                is_saved=True,
                eq_update=eq_update,
            )

            eaf, edh, efficiency_uptime = await calculate_plant_eaf(
                db_session=db_session, simulation_id=simulation_id
            )
            eaf_contribution = (
                _MAIN_EFFICIENCY_UPTIME - efficiency_uptime
            ) / _MAIN_EFFICIENCY_UPTIME

            contribution_results[node_name] = {
                "eaf": eaf,
                "edh": edh,
                "efficiency_uptime": efficiency_uptime,
                "eaf_contribution": eaf_contribution,
            }
            eaf_contributions_data.append(
                EafContribution(
                    location_tag=node_name,
                    eaf_contribution=eaf_contribution,
                    efficiency_uptime=efficiency_uptime,
                    edh=edh,
                )
            )

            # The simulation only exists to produce the figures above.
            await db_session.delete(simulation)
            await db_session.commit()

        db_session.add_all(eaf_contributions_data)
        await db_session.commit()

        return _success(contribution_results, "Simulation created successfully")
    except HTTPException:
        # Keep the status code of errors that are already HTTP errors.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e