temporal io implementation

main
Cizz22 3 months ago
parent 30a4466504
commit 5cb4bafeb1

@ -0,0 +1,35 @@
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from temporal.activity import calculate_plant_eaf_activity, execute_simulation_activity, update_contribution_bulk_mappings_activity, update_equipment_for_simulation_activity
from temporal.workflow import SimulationWorkflow
async def main():
client = await Client.connect("http://192.168.1.86:7233")
try:
worker = Worker(
client,
task_queue="simulation-task-queue",
workflows=[SimulationWorkflow],
activities=[
update_equipment_for_simulation_activity,
execute_simulation_activity,
calculate_plant_eaf_activity,
update_contribution_bulk_mappings_activity
],
max_concurrent_workflow_tasks=50,
max_concurrent_activities=12
)
await worker.run()
except Exception as e:
print(f"Worker failed: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(main())

@ -5,7 +5,7 @@ from uuid import UUID
from fastapi import APIRouter, BackgroundTasks, HTTPException, background, status, Query
from sqlalchemy import select
from temporalio.client import Client
from src.aeros_contribution.service import update_contribution_bulk_mappings
from src.aeros_equipment.model import AerosEquipment
from src.aeros_simulation.model import EafContribution
@ -15,6 +15,7 @@ from src.database.service import CommonParameters
from src.models import StandardResponse
from src.aeros_equipment.service import update_equipment_for_simulation
from src.aeros_project.service import get_project
from temporal.workflow import SimulationWorkflow
from .schema import (
SimulationCalcResult,
SimulationInput,
@ -77,6 +78,8 @@ async def run_simulations(
background_tasks: BackgroundTasks
):
"""RUN Simulation"""
temporal_client = await Client.connect("http://192.168.1.86:7233")
simulation = await create_simulation(
db_session=db_session, simulation_in=simulation_in
@ -85,48 +88,55 @@ async def run_simulations(
project = await get_project(db_session=db_session)
sim_data = simulation_in.model_dump(exclude={"SimulationName", "IsDefault"})
sim_data = simulation_in.model_dump()
sim_data["HubCnnId"] = str(simulation_id)
sim_data["projectName"] = project.project_name
# Prepare async background task
async def run_full_simulation():
try:
results = await update_equipment_for_simulation(
db_session=db_session,
project_name=project.project_name,
schematic_name=simulation_in.SchematicName,
overhaul_duration=simulation_in.OverhaulDuration,
overhaul_interval=simulation_in.OverhaulInterval,
offset=simulation_in.OffSet,
custom_input=simulation_in.CustomInput,
)
await execute_simulation(
db_session=db_session,
simulation_id=simulation_id,
sim_data=sim_data,
is_saved=True,
eq_update=results,
)
await calculate_plant_eaf(
db_session=db_session,
simulation_id=simulation_id,
is_default=simulation_in.IsDefault,
konkin_offset=simulation_in.Konkin_offset,
)
await update_contribution_bulk_mappings(
db_session=db_session, simulation_id=simulation_id
)
except Exception as e:
# TODO: log error into DB or logger
print(f"Simulation {simulation_id} failed: {e}")
# Add to background
background_tasks.add_task(run_full_simulation)
# # Prepare async background task
# async def run_full_simulation():
# try:
# results = await update_equipment_for_simulation(
# db_session=db_session,
# project_name=project.project_name,
# schematic_name=simulation_in.SchematicName,
# overhaul_duration=simulation_in.OverhaulDuration,
# overhaul_interval=simulation_in.OverhaulInterval,
# offset=simulation_in.OffSet,
# custom_input=simulation_in.CustomInput,
# )
# await execute_simulation(
# db_session=db_session,
# simulation_id=simulation_id,
# sim_data=sim_data,
# is_saved=True,
# eq_update=results,
# )
# await calculate_plant_eaf(
# db_session=db_session,
# simulation_id=simulation_id,
# is_default=simulation_in.IsDefault,
# konkin_offset=simulation_in.Konkin_offset,
# )
# await update_contribution_bulk_mappings(
# db_session=db_session, simulation_id=simulation_id
# )
# except Exception as e:
# # TODO: log error into DB or logger
# print(f"Simulation {simulation_id} failed: {e}")
# # Add to background
# background_tasks.add_task(run_full_simulation)
handle = await temporal_client.start_workflow(
SimulationWorkflow.run,
sim_data,
id=f"simulation-{simulation_id}",
task_queue="simulation-task-queue",
)
return {
"data": str(simulation_id),

@ -53,7 +53,7 @@ async def execute_simulation(
# Update simulation status
simulation = await get_simulation_by_id(db_session=db_session, simulation_id=simulation_id)
simulation.status = "processing"
simulation.status = "running"
await db_session.commit()
await process_large_json_streaming(

@ -0,0 +1,59 @@
from temporalio import activity
@activity.defn
async def update_equipment_for_simulation_activity(params: dict):
# ✅ Import inside the activity function
from src.aeros_equipment.service import update_equipment_for_simulation
from src.database.core import async_session
async with async_session() as db_session:
return await update_equipment_for_simulation(
db_session=db_session,
project_name=params["projectName"],
overhaul_duration=params["OverhaulDuration"],
overhaul_interval=params["OverhaulInterval"],
offset=params["OffSet"],
schematic_name=params["SchematicName"],
custom_input=params["CustomInput"]
)
@activity.defn
async def execute_simulation_activity(params: dict):
from src.database.core import async_session
from src.aeros_simulation.simulation_save_service import execute_simulation
async with async_session() as db_session:
return await execute_simulation(
db_session=db_session,
simulation_id=params["sim_data"]["HubCnnId"],
sim_data=params["sim_data"],
is_saved=True,
eq_update=params["eq_update"]
)
@activity.defn
async def calculate_plant_eaf_activity(params: dict):
from src.aeros_simulation.simulation_save_service import calculate_plant_eaf
from src.database.core import async_session
async with async_session() as db_session:
return await calculate_plant_eaf(
db_session=db_session,
simulation_id=params["HubCnnId"],
is_default=False,
)
@activity.defn
async def update_contribution_bulk_mappings_activity(sim_id: str):
from src.aeros_contribution.service import update_contribution_bulk_mappings
from src.database.core import async_session
async with async_session() as db_session:
return await update_contribution_bulk_mappings(
db_session=db_session,
simulation_id=sim_id,
)

@ -0,0 +1,40 @@
from datetime import timedelta
from temporalio import workflow
from temporal.activity import calculate_plant_eaf_activity, execute_simulation_activity, update_contribution_bulk_mappings_activity, update_equipment_for_simulation_activity
@workflow.defn
class SimulationWorkflow:
@workflow.run
async def run(self, sim_data: dict) -> str:
# 1. Update equipment
results = await workflow.execute_activity(
update_equipment_for_simulation_activity,
{**sim_data}, # ✅ one positional argument
start_to_close_timeout=timedelta(days=1)
)
# 2. Execute simulation
await workflow.execute_activity(
execute_simulation_activity,
{"sim_data": sim_data, "eq_update": results},
start_to_close_timeout=timedelta(days=1)
)
# 3. Calculate EAF
await workflow.execute_activity(
calculate_plant_eaf_activity,
sim_data,
start_to_close_timeout=timedelta(days=1)
)
await workflow.execute_activity(
update_contribution_bulk_mappings_activity,
sim_data["HubCnnId"],
start_to_close_timeout=timedelta(days=1)
)
# etc…
return sim_data["HubCnnId"] # simulation_id
Loading…
Cancel
Save