from collections import defaultdict
from datetime import datetime
from typing import List, Optional
from uuid import UUID

from fastapi import APIRouter, BackgroundTasks, HTTPException, background, status, Query
from sqlalchemy import select

from src.aeros_equipment.model import AerosEquipment
from src.aeros_simulation.model import EafContribution
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters
from src.models import StandardResponse
from src.aeros_equipment.service import update_equipment_for_simulation
from src.aeros_project.service import get_project

from .schema import (
    SimulationCalcResult,
    SimulationInput,
    SimulationPagination,
    SimulationPlot,
    SimulationPlotResult,
    SimulationCalc,
    SimulationData,
    SimulationRankingParameters
)
from .service import (
    create_simulation,
    # execute_simulation,
    get_all,
    get_custom_parameters,
    get_simulation_by_id,
    get_simulation_with_calc_result,
    get_simulation_with_plot_result,
    update_simulation,
    get_result_ranking,
    get_plant_calc_result
)
from .simulation_save_service import calculate_plant_eaf, execute_simulation
from src.aeros_equipment.schema import EquipmentWithCustomParameters

router = APIRouter()

active_simulations = {}

# Number of equipment rows handled by one call to /calculate_eaf_contribution.
_CONTRIBUTION_BATCH_SIZE = 20

# Reference EDH that each single-equipment run is compared against when
# computing its contribution.
# NOTE(review): hard-coded value; presumably taken from a baseline simulation
# run -- confirm, and consider loading it instead of embedding it here.
_BASELINE_EDH = 7.250000000000884


@router.get("", response_model=StandardResponse[SimulationPagination])
async def get_all_simulation(db_session: DbSession, common: CommonParameters):
    """List simulations using the shared ``CommonParameters`` dependency."""
    results = await get_all(common)

    return {
        "data": results,
        "status": "success",
        "message": "Simulations result retrieved successfully",
    }


# This static route must be registered BEFORE "/{simulation_id}": routes are
# matched in declaration order, and the parameterised route would otherwise
# capture "custom_parameters" as a simulation id.
@router.get("/custom_parameters", response_model=StandardResponse[list])
async def get_custom_parameters_controller(db_session: DbSession):
    """Return the per-node EAF values of a completed simulation.

    The simulation is looked up with ``simulation_id=None`` and
    ``is_completed=True`` (presumably the latest completed run -- confirm
    against ``get_simulation_by_id``).

    Returns:
        A standard response whose ``data`` is a list of single-key dicts
        mapping a node name to its ``eaf`` value.

    Raises:
        HTTPException: 404 when no simulation is returned by the lookup.
    """
    latest_simulation = await get_simulation_by_id(
        db_session=db_session, simulation_id=None, is_completed=True
    )
    if latest_simulation is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No completed simulation found",
        )

    custom_parameters = await get_custom_parameters(
        db_session=db_session, simulation_id=latest_simulation.id
    )

    results = [{node.aeros_node.node_name: node.eaf} for node in custom_parameters]

    return {
        "data": results,
        "status": "success",
        "message": "Simulation result retrieved successfully",
    }


@router.get("/{simulation_id}", response_model=StandardResponse[SimulationData])
async def get_simulation(db_session: DbSession, simulation_id):
    """Return a single simulation looked up by its id."""
    result = await get_simulation_by_id(db_session=db_session, simulation_id=simulation_id)

    return {
        "data": result,
        "status": "success",
        "message": "Simulation result retrieved successfully",
    }


@router.post("/run", response_model=StandardResponse[str])
async def run_simulations(
    db_session: DbSession,
    simulation_in: SimulationInput,
    background_tasks: BackgroundTasks,
):
    """Create a simulation record, run it, and compute the plant EAF.

    The simulation is executed inline (awaited) rather than scheduled on
    ``background_tasks``; the parameter is kept so the endpoint signature
    stays unchanged.

    Returns:
        A standard response whose ``data`` is the new simulation id as a
        string.

    Raises:
        HTTPException: re-raised unchanged when raised by a callee;
            otherwise any failure is reported as a 500 carrying the
            original error message.
    """
    simulation = await create_simulation(
        db_session=db_session, simulation_in=simulation_in
    )
    simulation_id = simulation.id
    #simulation_id = "2e0755bf-8cce-4743-9659-8d9920d556e7"

    project = await get_project(db_session=db_session)

    try:
        # Payload for the simulation run: the request body without its
        # display name, plus the simulation id and project name.
        sim_data = simulation_in.model_dump(exclude={"SimulationName"})
        sim_data["HubCnnId"] = str(simulation_id)
        sim_data["projectName"] = project.project_name

        # ##background_tasks.add_task(execute_simulation, db_session=db_session ,simulation_id=simulation_id, sim_data=sim_data)

        results = await update_equipment_for_simulation(
            db_session=db_session,
            project_name=project.project_name,
            schematic_name=simulation_in.SchematicName,
            custom_input=simulation_in.CustomInput
        )

        # await update_simulation(
        #     db_session=db_session, simulation_id=simulation_id, data={"reliability": results}
        # )

        await execute_simulation(
            db_session=db_session,
            simulation_id=simulation_id,
            sim_data=sim_data,
            is_saved=True,
            eq_update=results
        )

        await calculate_plant_eaf(db_session=db_session, simulation_id=simulation_id)

        return {
            "data": str(simulation_id),
            "status": "success",
            "message": "Simulation created successfully",
        }
    except HTTPException:
        # Keep the status code chosen by the callee instead of masking it
        # as a 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e


@router.get(
    "/result/calc/{simulation_id}",
    response_model=StandardResponse[List[SimulationCalc]],
)
async def get_simulation_result(
    db_session: DbSession,
    simulation_id,
    schematic_name: Optional[str] = Query(None),
    node_type = Query(None, alias="nodetype")
):
    """Return the calculation results of a simulation.

    ``schematic_name`` and ``node_type`` (query parameter ``nodetype``) are
    optional and are forwarded as-is to the service layer.
    """
    simulation_result = await get_simulation_with_calc_result(
        db_session=db_session,
        simulation_id=simulation_id,
        schematic_name=schematic_name,
        node_type=node_type
    )

    return {
        "data": simulation_result,
        "status": "success",
        "message": "Simulation result retrieved successfully",
    }


@router.get(
    "/result/calc/{simulation_id}/plant",
    response_model=StandardResponse[SimulationCalc],
)
async def get_simulation_result_plant(db_session: DbSession, simulation_id):
    """Return the plant-level calculation result of a simulation."""
    simulation_result = await get_plant_calc_result(
        db_session=db_session, simulation_id=simulation_id
    )

    return {
        "data": simulation_result,
        "status": "success",
        "message": "Simulation result retrieved successfully",
    }


@router.get(
    "/result/plot/{simulation_id}",
    response_model=StandardResponse[List[SimulationPlot]],
)
async def get_simulation_result_plot(db_session: DbSession, simulation_id):
    """Return the plot results of a simulation."""
    simulation_result = await get_simulation_with_plot_result(
        db_session=db_session, simulation_id=simulation_id
    )

    return {
        "data": simulation_result,
        "status": "success",
        "message": "Simulation result retrieved successfully",
    }


@router.get(
    "/result/plot/{simulation_id}/{node_id}",
    response_model=StandardResponse[SimulationPlot],
)
async def get_simulation_result_plot_per_node(db_session: DbSession, simulation_id, node_id):
    """Return the plot result of one node within a simulation."""
    simulation_result = await get_simulation_with_plot_result(
        db_session=db_session, simulation_id=simulation_id, node_id=node_id
    )

    return {
        "data": simulation_result,
        "status": "success",
        "message": "Simulation result retrieved successfully",
    }


@router.get(
    "/result/ranking/{simulation_id}",
    response_model=StandardResponse[List[SimulationRankingParameters]],
)
async def get_simulation_result_ranking(db_session: DbSession, simulation_id):
    """Return the ranking results of a simulation."""
    simulation_result = await get_result_ranking(
        db_session=db_session, simulation_id=simulation_id
    )

    return {
        "data": simulation_result,
        "status": "success",
        "message": "Simulation result retrieved successfully",
    }


airflow_router = APIRouter()


@airflow_router.post("/calculate_eaf_contribution", response_model=StandardResponse[dict])
async def calculate_contribution(
    db_session: DbSession,
    simulation_in: SimulationInput,
    batch_num: int = Query(0, ge=0)
):
    """Compute the EAF contribution of one batch of equipment.

    For every equipment row in the requested batch a temporary simulation is
    created and executed with that equipment's custom input overridden
    (``mttr=8760``, ``failure_rate=0.01``). The resulting plant EDH is
    compared with the reference EDH to obtain the contribution, the
    temporary simulation is deleted, and one ``EafContribution`` row per
    equipment is stored once the batch has finished.

    Args:
        simulation_in: Simulation input reused for every run in the batch.
        batch_num: Zero-based batch index; each batch covers
            ``_CONTRIBUTION_BATCH_SIZE`` equipment rows.

    Returns:
        A standard response whose ``data`` maps each processed node name to
        its ``eaf``, ``edh`` and ``eaf_contribution``; ``data`` is empty when
        ``batch_num`` lies past the last equipment row.

    Raises:
        HTTPException: re-raised unchanged when raised by a callee;
            otherwise any failure is reported as a 500 carrying the
            original error message.
    """
    #simulation_id = "2e0755bf-8cce-4743-9659-8d9920d556e7"
    project = await get_project(db_session=db_session)

    try:
        contribution_results = {}
        eaf_contributions_data = []

        # NOTE(review): the query has no ORDER BY, so which rows land in
        # which batch is not guaranteed to be stable between calls --
        # consider ordering by a stable column.
        equipment = (await db_session.execute(select(AerosEquipment))).scalars().all()

        start_index = batch_num * _CONTRIBUTION_BATCH_SIZE
        if start_index >= len(equipment):
            return {
                "data": contribution_results,
                "status": "success",
                "message": "No more equipment to process",
            }

        # Slicing clamps to the end of the list, so the last batch may be
        # shorter than the batch size.
        batch = equipment[start_index:start_index + _CONTRIBUTION_BATCH_SIZE]

        for eq in batch:
            simulation = await create_simulation(
                db_session=db_session, simulation_in=simulation_in
            )

            sim_data = simulation_in.model_dump(exclude={"SimulationName"})
            sim_data["HubCnnId"] = str(simulation.id)
            sim_data["projectName"] = project.project_name

            # Override only the equipment under test.
            custom_input = {
                eq.node_name: {
                    "mttr": 8760,
                    "failure_rate": 0.01,
                }
            }

            results = await update_equipment_for_simulation(
                db_session=db_session,
                project_name=project.project_name,
                schematic_name=simulation_in.SchematicName,
                custom_input=custom_input
            )

            # await update_simulation(
            #     db_session=db_session, simulation_id=simulation_id, data={"reliability": results}
            # )

            await execute_simulation(
                db_session=db_session,
                simulation_id=simulation.id,
                sim_data=sim_data,
                is_saved=True,
                eq_update=results
            )

            eaf, edh = await calculate_plant_eaf(
                db_session=db_session, simulation_id=simulation.id
            )

            eaf_contribution = (
                (_BASELINE_EDH - edh) / _BASELINE_EDH if _BASELINE_EDH else 0
            )

            contribution_results[eq.node_name] = {
                "eaf": eaf,
                "edh": edh,
                "eaf_contribution": eaf_contribution
            }

            eaf_contributions_data.append(
                EafContribution(
                    location_tag=eq.node_name,
                    eaf_contribution=eaf_contribution
                )
            )

            # The simulation only exists to produce the EDH above; remove it
            # again so these runs are not kept as regular simulations.
            await db_session.delete(simulation)
            await db_session.commit()

        db_session.add_all(eaf_contributions_data)
        await db_session.commit()

        return {
            "data": contribution_results,
            "status": "success",
            "message": "Simulation created successfully",
        }
    except HTTPException:
        # Keep the status code chosen by the callee instead of masking it
        # as a 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e


# @router.get("/status/{simulation_id}", response_model=StandardResponse[None])
# async def get_simulation_status(simulation_id: str):
#     """Get simulation status."""
#     if simulation_id not in active_simulations:
#         raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Simulation not found")

#     return active_simulations[simulation_id]


# @router.post("/cancel/{simulation_id}", response_model=StandardResponse[None])
# async def cancel_simulation(simulation_id: str):
#     """Cancel simulation."""
#     if simulation_id not in active_simulations:
#         raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Simulation not found")

#     active_simulations[simulation_id].update({
#         "status": "cancelled",
#         "cancelled_at": datetime.now()
#     })

#     return active_simulations[simulation_id]