from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from temporal.activity import (
        calculate_plant_eaf_activity,
        execute_simulation_activity,
        update_contribution_bulk_mappings_activity,
        update_equipment_for_simulation_activity,
        call_callback_ahm,
        acquire_lock,
        release_lock,
    )

# Start-to-close timeout shared by every pipeline activity in the workflow.
_STEP_TIMEOUT = timedelta(days=1)


@workflow.defn
class SimulationWorkflow:
    """Run the simulation pipeline for one simulation under a lock.

    The workflow exposes its progress through two queries: ``get_status``
    (one of ``"INITIALIZED"``, ``"QUEUE"``, ``"RUNNING"``, ``"COMPLETED"``)
    and ``is_done``.
    """

    def __init__(self):
        self.status = "INITIALIZED"
        self.isDone = False

    @workflow.query
    def get_status(self) -> str:
        """Return the current status string of the workflow."""
        return self.status

    @workflow.query
    def is_done(self) -> bool:
        """Return True once all pipeline steps have finished successfully."""
        return self.isDone

    @workflow.run
    async def run(self, sim_data: dict) -> str:
        """Execute the pipeline steps in order and return the simulation id.

        Args:
            sim_data: Simulation payload. ``sim_data["HubCnnId"]`` is required
                and is used as the lock key and as the return value; an
                optional truthy ``sim_data["AhmJobId"]`` triggers the AHM
                callback activity.

        Returns:
            ``sim_data["HubCnnId"]``.

        Raises:
            KeyError: If ``"HubCnnId"`` is missing from ``sim_data``.
        """
        self.status = "QUEUE"
        simulation_id = sim_data["HubCnnId"]

        # NOTE(review): the 10-day timeout presumably covers waiting for a
        # previous holder of the lock to finish -- confirm against the
        # acquire_lock activity implementation.
        await workflow.execute_activity(
            acquire_lock,
            simulation_id,
            start_to_close_timeout=timedelta(days=10),
        )

        # From here on the lock is held: release it in ``finally`` so that a
        # failing or cancelled step cannot leave it acquired.
        try:
            self.status = "RUNNING"

            # 1. Update equipment for simulation. A shallow copy of the
            #    payload is passed as the single positional argument.
            results = await workflow.execute_activity(
                update_equipment_for_simulation_activity,
                {**sim_data},
                start_to_close_timeout=_STEP_TIMEOUT,
            )

            # 2. Execute simulation.
            await workflow.execute_activity(
                execute_simulation_activity,
                {"sim_data": sim_data, "eq_update": results},
                start_to_close_timeout=_STEP_TIMEOUT,
            )

            # 3. Calculate EAF.
            await workflow.execute_activity(
                calculate_plant_eaf_activity,
                sim_data,
                start_to_close_timeout=_STEP_TIMEOUT,
            )

            # 4. Update contribution bulk mappings.
            await workflow.execute_activity(
                update_contribution_bulk_mappings_activity,
                simulation_id,
                start_to_close_timeout=_STEP_TIMEOUT,
            )

            # 5. Notify AHM only when the payload carries a job id.
            if sim_data.get("AhmJobId"):
                await workflow.execute_activity(
                    call_callback_ahm,
                    {
                        "simulation_id": simulation_id,
                        "job_id": sim_data["AhmJobId"],
                    },
                    start_to_close_timeout=_STEP_TIMEOUT,
                )

            self.status = "COMPLETED"
            self.isDone = True
        finally:
            await workflow.execute_activity(
                release_lock,
                simulation_id,
                start_to_close_timeout=timedelta(minutes=10),
            )

        return simulation_id