diff --git a/run_worker.py b/run_worker.py new file mode 100644 index 0000000..520ddab --- /dev/null +++ b/run_worker.py @@ -0,0 +1,35 @@ +import asyncio +from temporalio.client import Client +from temporalio.worker import Worker + +from temporal.activity import calculate_plant_eaf_activity, execute_simulation_activity, update_contribution_bulk_mappings_activity, update_equipment_for_simulation_activity +from temporal.workflow import SimulationWorkflow + + + +async def main(): + client = await Client.connect("http://192.168.1.86:7233") + + try: + worker = Worker( + client, + task_queue="simulation-task-queue", + workflows=[SimulationWorkflow], + activities=[ + update_equipment_for_simulation_activity, + execute_simulation_activity, + calculate_plant_eaf_activity, + update_contribution_bulk_mappings_activity + ], + max_concurrent_workflow_tasks=50, + max_concurrent_activities=12 + ) + await worker.run() + except Exception as e: + print(f"Worker failed: {e}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/aeros_simulation/router.py b/src/aeros_simulation/router.py index 1c96700..11c6f8d 100644 --- a/src/aeros_simulation/router.py +++ b/src/aeros_simulation/router.py @@ -5,7 +5,7 @@ from uuid import UUID from fastapi import APIRouter, BackgroundTasks, HTTPException, background, status, Query from sqlalchemy import select - +from temporalio.client import Client from src.aeros_contribution.service import update_contribution_bulk_mappings from src.aeros_equipment.model import AerosEquipment from src.aeros_simulation.model import EafContribution @@ -15,6 +15,7 @@ from src.database.service import CommonParameters from src.models import StandardResponse from src.aeros_equipment.service import update_equipment_for_simulation from src.aeros_project.service import get_project +from temporal.workflow import SimulationWorkflow from .schema import ( SimulationCalcResult, SimulationInput, @@ -77,6 +78,8 @@ async def run_simulations( background_tasks: BackgroundTasks ): """RUN Simulation""" + + temporal_client = await Client.connect("http://192.168.1.86:7233") simulation = await create_simulation( db_session=db_session, simulation_in=simulation_in @@ -85,48 +88,55 @@ async def run_simulations( project = await get_project(db_session=db_session) - sim_data = simulation_in.model_dump(exclude={"SimulationName", "IsDefault"}) + sim_data = simulation_in.model_dump() sim_data["HubCnnId"] = str(simulation_id) sim_data["projectName"] = project.project_name - # Prepare async background task - async def run_full_simulation(): - try: - results = await update_equipment_for_simulation( - db_session=db_session, - project_name=project.project_name, - schematic_name=simulation_in.SchematicName, - overhaul_duration=simulation_in.OverhaulDuration, - overhaul_interval=simulation_in.OverhaulInterval, - offset=simulation_in.OffSet, - custom_input=simulation_in.CustomInput, - ) - - await execute_simulation( - db_session=db_session, - simulation_id=simulation_id, - sim_data=sim_data, - is_saved=True, - eq_update=results, - ) - - await calculate_plant_eaf( - db_session=db_session, - simulation_id=simulation_id, - is_default=simulation_in.IsDefault, - konkin_offset=simulation_in.Konkin_offset, - ) - - await update_contribution_bulk_mappings( - db_session=db_session, simulation_id=simulation_id - ) - - except Exception as e: - # TODO: log error into DB or logger - print(f"Simulation {simulation_id} failed: {e}") - - # Add to background - background_tasks.add_task(run_full_simulation) + # # Prepare async background task + # async def run_full_simulation(): + # try: + # results = await update_equipment_for_simulation( + # db_session=db_session, + # project_name=project.project_name, + # schematic_name=simulation_in.SchematicName, + # overhaul_duration=simulation_in.OverhaulDuration, + # overhaul_interval=simulation_in.OverhaulInterval, + # offset=simulation_in.OffSet, + # custom_input=simulation_in.CustomInput, + # ) + + # await execute_simulation( + # db_session=db_session, + # simulation_id=simulation_id, + # sim_data=sim_data, + # is_saved=True, + # eq_update=results, + # ) + + # await calculate_plant_eaf( + # db_session=db_session, + # simulation_id=simulation_id, + # is_default=simulation_in.IsDefault, + # konkin_offset=simulation_in.Konkin_offset, + # ) + + # await update_contribution_bulk_mappings( + # db_session=db_session, simulation_id=simulation_id + # ) + + # except Exception as e: + # # TODO: log error into DB or logger + # print(f"Simulation {simulation_id} failed: {e}") + + # # Add to background + # background_tasks.add_task(run_full_simulation) + + handle = await temporal_client.start_workflow( + SimulationWorkflow.run, + sim_data, + id=f"simulation-{simulation_id}", + task_queue="simulation-task-queue", + ) return { "data": str(simulation_id), diff --git a/src/aeros_simulation/simulation_save_service.py b/src/aeros_simulation/simulation_save_service.py index 10ad417..937d480 100644 --- a/src/aeros_simulation/simulation_save_service.py +++ b/src/aeros_simulation/simulation_save_service.py @@ -53,7 +53,7 @@ async def execute_simulation( # Update simulation status simulation = await get_simulation_by_id(db_session=db_session, simulation_id=simulation_id) - simulation.status = "processing" + simulation.status = "running" await db_session.commit() await process_large_json_streaming( diff --git a/temporal/activity.py b/temporal/activity.py new file mode 100644 index 0000000..03e3683 --- /dev/null +++ b/temporal/activity.py @@ -0,0 +1,59 @@ +from temporalio import activity + + + +@activity.defn +async def update_equipment_for_simulation_activity(params: dict): + # ✅ Import inside the activity function + from src.aeros_equipment.service import update_equipment_for_simulation + from src.database.core import async_session + + async with async_session() as db_session: + return await update_equipment_for_simulation( + db_session=db_session, + project_name=params["projectName"], + overhaul_duration=params["OverhaulDuration"], + overhaul_interval=params["OverhaulInterval"], + offset=params["OffSet"], + schematic_name=params["SchematicName"], + custom_input=params["CustomInput"] + ) + + +@activity.defn +async def execute_simulation_activity(params: dict): + from src.database.core import async_session + from src.aeros_simulation.simulation_save_service import execute_simulation + async with async_session() as db_session: + return await execute_simulation( + db_session=db_session, + simulation_id=params["sim_data"]["HubCnnId"], + sim_data=params["sim_data"], + is_saved=True, + eq_update=params["eq_update"] + ) + + +@activity.defn +async def calculate_plant_eaf_activity(params: dict): + from src.aeros_simulation.simulation_save_service import calculate_plant_eaf + from src.database.core import async_session + + async with async_session() as db_session: + return await calculate_plant_eaf( + db_session=db_session, + simulation_id=params["HubCnnId"], + is_default=False, + ) + + +@activity.defn +async def update_contribution_bulk_mappings_activity(sim_id: str): + from src.aeros_contribution.service import update_contribution_bulk_mappings + from src.database.core import async_session + + async with async_session() as db_session: + return await update_contribution_bulk_mappings( + db_session=db_session, + simulation_id=sim_id, + ) \ No newline at end of file diff --git a/temporal/workflow.py b/temporal/workflow.py new file mode 100644 index 0000000..cfbd664 --- /dev/null +++ b/temporal/workflow.py @@ -0,0 +1,40 @@ +from datetime import timedelta +from temporalio import workflow + +from temporal.activity import calculate_plant_eaf_activity, execute_simulation_activity, update_contribution_bulk_mappings_activity, update_equipment_for_simulation_activity + + + +@workflow.defn +class SimulationWorkflow: + @workflow.run + async def run(self, sim_data: dict) -> str: + # 1. Update equipment + results = await workflow.execute_activity( + update_equipment_for_simulation_activity, + {**sim_data}, # ✅ one positional argument + start_to_close_timeout=timedelta(days=1) + ) + + # 2. Execute simulation + await workflow.execute_activity( + execute_simulation_activity, + {"sim_data": sim_data, "eq_update": results}, + start_to_close_timeout=timedelta(days=1) + ) + + # 3. Calculate EAF + await workflow.execute_activity( + calculate_plant_eaf_activity, + sim_data, + start_to_close_timeout=timedelta(days=1) + ) + + await workflow.execute_activity( + update_contribution_bulk_mappings_activity, + sim_data["HubCnnId"], + start_to_close_timeout=timedelta(days=1) + ) + + # etc… + return sim_data["HubCnnId"] # simulation_id