You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

192 lines
6.1 KiB
Python

from datetime import datetime
from typing import List, Optional
from uuid import UUID
from fastapi import APIRouter, BackgroundTasks, HTTPException, background, status, Query
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters
from src.models import StandardResponse
from src.aeros_equipment.service import update_equipment_for_simulation
from src.aeros_project.service import get_project
from .schema import (
SimulationCalcResult,
SimulationInput,
SimulationPagination,
SimulationPlotResult,
SimulationCalc,
SimulationData
)
from .service import (
create_simulation,
execute_simulation,
get_all,
get_custom_parameters,
get_simulation_by_id,
get_simulation_with_calc_result,
get_simulation_with_plot_result,
update_simulation,
get_result_ranking
)
router = APIRouter()
active_simulations = {}
@router.get("", response_model=StandardResponse[SimulationPagination])
async def get_all_simulation(db_session: DbSession, common: CommonParameters):
"""Get all simulation."""
results = await get_all(common)
return {
"data": results,
"status": "success",
"message": "Simulations result retrieved successfully",
}
@router.get("/{simulation_id}", response_model=StandardResponse[SimulationData])
async def get_simulation(db_session: DbSession, simulation_id):
"""Get simulation."""
result = await get_simulation_by_id(db_session=db_session, simulation_id=simulation_id)
return {
"data": result,
"status": "success",
"message": "Simulation result retrieved successfully",
}
@router.post("/run", response_model=StandardResponse[str])
async def run_simulations(
db_session: DbSession,
simulation_in: SimulationInput,
background_tasks: BackgroundTasks,
):
"""RUN Simulation"""
simulation = await create_simulation(
db_session=db_session, simulation_in=simulation_in
)
simulation_id = simulation.id
project = await get_project(db_session=db_session)
try:
sim_data = simulation_in.model_dump(exclude={"SimulationName"})
sim_data["HubCnnId"] = str(simulation_id)
sim_data["projectName"] = project.project_name
##background_tasks.add_task(execute_simulation, db_session=db_session ,simulation_id=simulation_id, sim_data=sim_data)
results = await update_equipment_for_simulation(
db_session=db_session, project_name=project.project_name, schematic_name=simulation_in.SchematicName, custom_input=simulation_in.CustomInput
)
# await update_simulation(
# db_session=db_session, simulation_id=simulation_id, data={"reliability": results}
# )
await execute_simulation(
db_session=db_session, simulation_id=simulation_id, sim_data=sim_data, is_saved=True, eq_update=results
)
return {
"data": str(simulation_id),
"status": "success",
"message": "Simulation created successfully",
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
)
@router.get(
"/result/calc/{simulation_id}",
response_model=StandardResponse[List[SimulationCalc]],
)
async def get_simulation_result(db_session: DbSession, simulation_id, schematic_name: Optional[str] = Query(None)):
"""Get simulation result."""
simulation_result = await get_simulation_with_calc_result(
db_session=db_session, simulation_id=simulation_id, schematic_name=schematic_name
)
return {
"data": simulation_result,
"status": "success",
"message": "Simulation result retrieved successfully",
}
@router.get(
"/result/plot/{simulation_id}",
response_model=StandardResponse[SimulationPlotResult],
)
async def get_simulation_result_plot(db_session: DbSession, simulation_id):
"""Get simulation result."""
simulation_result = await get_simulation_with_plot_result(
db_session=db_session, simulation_id=simulation_id
)
return {
"data": simulation_result,
"status": "success",
"message": "Simulation result retrieved successfully",
}
@router.get("/result/ranking/{simulation_id}", response_model=StandardResponse[List[SimulationCalc]])
async def get_simulation_result_ranking(db_session: DbSession, simulation_id):
"""Get simulation result."""
simulation_result = await get_result_ranking(db_session=db_session, simulation_id=simulation_id)
return {
"data": simulation_result,
"status": "success",
"message": "Simulation result retrieved successfully",
}
@router.get("/custom_parameters", response_model=StandardResponse[list])
async def get_custom_parameters_controller(db_session: DbSession):
"""Get simulation result."""
latest_simulation = await get_simulation_by_id(
db_session=db_session, simulation_id=None, is_completed=True
)
custom_parameters = await get_custom_parameters(
db_session=db_session, simulation_id=latest_simulation.id
)
results = [{node.aeros_node.node_name: node.eaf} for node in custom_parameters]
return {
"data": results,
"status": "success",
"message": "Simulation result retrieved successfully",
}
# @router.get("/status/{simulation_id}", response_model=StandardResponse[None])
# async def get_simulation_status(simulation_id: str):
# """Get simulation status."""
# if simulation_id not in active_simulations:
# raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Simulation not found")
# return active_simulations[simulation_id]
# @router.post("/cancel/{simulation_id}", response_model=StandardResponse[None])
# async def cancel_simulation(simulation_id: str):
# """Cancel simulation."""
# if simulation_id not in active_simulations:
# raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Simulation not found")
# active_simulations[simulation_id].update({
# "status": "cancelled",
# "cancelled_at": datetime.now()
# })
# return active_simulations[simulation_id]