"""Temporal activities for the Aeros simulation workflow.

Each ``@activity.defn`` coroutine below is a thin wrapper that opens the
database session(s) it needs and delegates to a project service function,
posts a completion callback over HTTP, or manages a Redis-based lock that
is shared under a single key (``LOCK_KEY``).
"""

import asyncio
import os

import redis.asyncio as redis
import requests
from temporalio import activity

AHM_BASE_URL = os.getenv("AHM_BASE_URL", "http://192.168.1.82:8000/ahm")
AHM_SIMULATION_CALLBACK_URL = os.getenv(
    "AHM_SIMULATION_CALLBACK_URL", "/api/v1/simulations/rbd/callback"
)

REDIS_HOST = os.getenv("REDIS_HOST", "192.168.1.82")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))

# Asyncio Redis client: every command on it returns an awaitable and does
# nothing until it is awaited.
redis_client = redis.Redis(
    host=REDIS_HOST,  # or docker service name
    port=REDIS_PORT,
    db=0,
    decode_responses=True,
)


@activity.defn
async def update_equipment_for_simulation_activity(params: dict):
    """Run ``update_equipment_for_simulation`` with values taken from *params*.

    Reads the keys ``projectName``, ``OverhaulDuration``, ``OverhaulInterval``,
    ``OffSet``, ``SchematicName``, ``CustomInput`` and ``SimDuration``.

    Returns:
        Whatever ``update_equipment_for_simulation`` returns.

    Raises:
        KeyError: If one of the keys listed above is missing from *params*.
    """
    # Project imports are deliberately kept inside the activity function
    # (presumably to keep them out of module import time -- confirm).
    from src.aeros_equipment.service import update_equipment_for_simulation
    from src.database.core import async_session, async_aeros_session

    async with async_session() as db_session, async_aeros_session() as aeros_db_session:
        return await update_equipment_for_simulation(
            db_session=db_session,
            aeros_db_session=aeros_db_session,
            project_name=params["projectName"],
            overhaul_duration=params["OverhaulDuration"],
            overhaul_interval=params["OverhaulInterval"],
            offset=params["OffSet"],
            schematic_name=params["SchematicName"],
            custom_input=params["CustomInput"],
            simulation_duration=params["SimDuration"],
        )


@activity.defn
async def execute_simulation_activity(params: dict):
    """Run ``execute_simulation`` for the simulation described by *params*.

    Reads ``params["sim_data"]`` (whose ``HubCnnId`` entry is used as the
    simulation id) and ``params["eq_update"]``. The run is always executed
    with ``is_saved=True``.

    Returns:
        Whatever ``execute_simulation`` returns.
    """
    from src.database.core import async_session
    from src.aeros_simulation.simulation_save_service import execute_simulation

    async with async_session() as db_session:
        return await execute_simulation(
            db_session=db_session,
            simulation_id=params["sim_data"]["HubCnnId"],
            sim_data=params["sim_data"],
            is_saved=True,
            eq_update=params["eq_update"],
        )


@activity.defn
async def calculate_plant_eaf_activity(params: dict):
    """Run ``calculate_plant_eaf`` with values taken from *params*.

    Reads the keys ``HubCnnId`` (simulation id), ``MaintenanceOutages``,
    ``OverhaulDuration``, ``OverhaulInterval`` and ``OffSet``.

    Returns:
        Whatever ``calculate_plant_eaf`` returns.
    """
    from src.aeros_simulation.simulation_save_service import calculate_plant_eaf
    from src.database.core import async_session

    async with async_session() as db_session:
        return await calculate_plant_eaf(
            db_session=db_session,
            simulation_id=params["HubCnnId"],
            mo_downtime=params["MaintenanceOutages"],
            po_downtime=params["OverhaulDuration"],
            oh_interval=params["OverhaulInterval"],
            offset=params["OffSet"],
        )


@activity.defn
async def update_contribution_bulk_mappings_activity(sim_id: str):
    """Run ``update_contribution_bulk_mappings`` for simulation *sim_id*.

    Returns:
        Whatever ``update_contribution_bulk_mappings`` returns.
    """
    from src.aeros_contribution.service import update_contribution_bulk_mappings
    from src.database.core import async_session

    async with async_session() as db_session:
        return await update_contribution_bulk_mappings(
            db_session=db_session,
            simulation_id=sim_id,
        )


def _post_callback(url, payload):
    """POST *payload* as JSON to *url*; raise on transport or HTTP errors."""
    callback_response = requests.post(url, json=payload, timeout=5)
    callback_response.raise_for_status()


@activity.defn
async def call_callback_ahm(params):
    """Notify the AHM callback endpoint that a simulation has completed.

    Posts ``{"simulation_id", "job_id", "status": "completed"}`` to
    ``AHM_BASE_URL + AHM_SIMULATION_CALLBACK_URL``.

    Raises:
        KeyError: If ``simulation_id`` or ``job_id`` is missing from *params*.
        requests.RequestException: On connection failure, timeout, or a
            4xx/5xx response.
    """
    sim_id = params["simulation_id"]
    job_id = params["job_id"]

    url = f"{AHM_BASE_URL}{AHM_SIMULATION_CALLBACK_URL}"
    payload = {
        "simulation_id": sim_id,
        "job_id": job_id,
        "status": "completed",
    }

    # ``requests`` is synchronous: calling it directly inside this coroutine
    # would stall the event loop (and every other async activity sharing it)
    # for up to the full timeout, so the call runs in the default executor.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _post_callback, url, payload)


LOCK_KEY = "lock:aeros_process"
# Seconds. NOTE(review): 864000 s is 10 days -- confirm this is intended
# rather than 86400 (1 day).
LOCK_TIMEOUT = 864000


async def _set_process_status(job_id, status, ttl):
    """Write the ``process:{job_id}`` status hash and set its expiry.

    Both commands are awaited; on the asyncio client an un-awaited ``hset``
    is never sent to Redis.
    """
    key = f"process:{job_id}"
    await redis_client.hset(
        key,
        mapping={
            "job_id": job_id,
            "status": status,
        },
    )
    await redis_client.expire(key, ttl)


@activity.defn
async def acquire_lock(job_id: str) -> dict:
    """Wait until the lock at ``LOCK_KEY`` can be taken for *job_id*.

    Attempts ``SET LOCK_KEY job_id NX EX LOCK_TIMEOUT`` every 3 seconds.
    While waiting, the job's status hash is set to ``"Queue"`` (expiring
    after 300 s); once the lock is taken it is set to ``"Processing"``
    (expiring after ``LOCK_TIMEOUT``).

    Returns:
        ``{"job_id": job_id, "status": "Ready"}`` once the lock is held.

    Raises:
        Exception: Any error raised while talking to Redis is re-raised
            after an attempt to record the status ``"Failed"``.
    """
    try:
        while True:
            acquired = await redis_client.set(
                LOCK_KEY,
                job_id,
                nx=True,
                ex=LOCK_TIMEOUT,
            )

            if acquired:
                # Lock acquired
                await _set_process_status(job_id, "Processing", LOCK_TIMEOUT)
                return {
                    "job_id": job_id,
                    "status": "Ready",
                }

            # Someone else holds the lock
            await _set_process_status(job_id, "Queue", 300)

            # Cancellation-friendly wait
            await asyncio.sleep(3)

    except Exception:
        await _set_process_status(job_id, "Failed", 300)
        raise


@activity.defn
async def release_lock(job_id: str):
    """Delete the lock key and mark *job_id* as ``"Done"`` (expires in 300 s)."""
    # NOTE(review): the key is deleted without checking that its value is
    # still ``job_id``, so this also removes a lock currently held by a
    # different job. Left unchanged because callers may rely on it; consider
    # a compare-and-delete if releases can race with other holders.
    await redis_client.delete(LOCK_KEY)
    await _set_process_status(job_id, "Done", 300)