diff --git a/poetry.lock b/poetry.lock index db9c73c..8bccc1a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -169,6 +169,19 @@ doc = ["Sphinx (>=7.4,<8.0)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "truststore (>=0.9.1) ; python_version >= \"3.10\"", "uvloop (>=0.21.0b1) ; platform_python_implementation == \"CPython\" and platform_system != \"Windows\""] trio = ["trio (>=0.26.1)"] +[[package]] +name = "async-timeout" +version = "5.0.1" +description = "Timeout context manager for asyncio programs" +optional = false +python-versions = ">=3.8" +groups = ["main"] +markers = "python_version == \"3.11\" and python_full_version < \"3.11.3\"" +files = [ + {file = "async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c"}, + {file = "async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3"}, +] + [[package]] name = "asyncpg" version = "0.30.0" @@ -1982,6 +1995,27 @@ files = [ {file = "pyyaml-6.0.2.tar.gz", hash = "sha256:d584d9ec91ad65861cc08d42e834324ef890a082e591037abe114850ff7bbc3e"}, ] +[[package]] +name = "redis" +version = "7.1.0" +description = "Python client for Redis database and key-value store" +optional = false +python-versions = ">=3.10" +groups = ["main"] +files = [ + {file = "redis-7.1.0-py3-none-any.whl", hash = "sha256:23c52b208f92b56103e17c5d06bdc1a6c2c0b3106583985a76a18f83b265de2b"}, + {file = "redis-7.1.0.tar.gz", hash = "sha256:b1cc3cfa5a2cb9c2ab3ba700864fb0ad75617b41f01352ce5779dabf6d5f9c3c"}, +] + +[package.dependencies] +async-timeout = {version = ">=4.0.3", markers = "python_full_version < \"3.11.3\""} + +[package.extras] +circuit-breaker = ["pybreaker (>=1.4.0)"] +hiredis = ["hiredis (>=3.2.0)"] +jwt = ["pyjwt (>=2.9.0)"] +ocsp = ["cryptography (>=36.0.1)", "pyopenssl (>=20.0.1)", "requests (>=2.31.0)"] + [[package]] name = "requests" version = "2.32.3" @@ -2777,4 +2811,4 @@ propcache = ">=0.2.1" [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "c77b9ab7225ed5fd08d9945ae8f411394ffdc5be23414a79801c3c450e6ff8df" +content-hash = "9d879fd1a129aee9afcf93e7a1933a8d5b511a3bc3a09cb8b598a22835085716" diff --git a/pyproject.toml b/pyproject.toml index 65a0e8a..8bc34b8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ isort = "^6.0.1" dotenv = "^0.9.9" aiohttp = "^3.12.14" ijson = "^3.4.0" +redis = "^7.1.0" [build-system] diff --git a/temporal/activity.py b/temporal/activity.py index 84e0c3b..9e29497 100644 --- a/temporal/activity.py +++ b/temporal/activity.py @@ -1,9 +1,22 @@ +import asyncio import requests from temporalio import activity import os +import redis.asyncio as redis AHM_BASE_URL = os.getenv("AHM_BASE_URL", "http://192.168.1.82:8000/ahm") AHM_SIMULATION_CALLBACK_URL = os.getenv("AHM_SIMULATION_CALLBACK_URL", "/api/v1/simulations/rbd/callback") +REDIS_HOST = os.getenv("REDIS_HOST", "http://192.168.1.82") +REDIS_PORT = int(os.getenv("REDIS_PORT", 6379)) + + +redis_client = redis.Redis( + host=REDIS_HOST, # or docker service name + port=REDIS_PORT, + db=0, + decode_responses=True +) + @activity.defn async def update_equipment_for_simulation_activity(params: dict): @@ -84,4 +97,76 @@ async def call_callback_ahm(params): callback_response = requests.post(url, json=payload, timeout=5) callback_response.raise_for_status() except Exception as e: - raise e \ No newline at end of file + raise e + + +LOCK_KEY = "lock:aeros_process" +LOCK_TIMEOUT = 180 # seconds + +@activity.defn +async def acquire_lock(job_id: str) -> dict: + try: + while True: + acquired = await redis_client.set( + LOCK_KEY, + job_id, + nx=True, + ex=LOCK_TIMEOUT + ) + + if acquired: + # Lock acquired + redis_client.hset( + f"process:{job_id}", + mapping={ + "job_id": job_id, + "status": "Processing" + } + ) + await redis_client.expire(f"process:{job_id}", LOCK_TIMEOUT) + + return { + "job_id": job_id, + "status": "Ready" + } + + # Someone else holds the lock + redis_client.hset( + f"process:{job_id}", + mapping={ + "job_id": job_id, + "status": "Queue" + } + ) + await redis_client.expire(f"process:{job_id}", 300) + + # Cancellation-friendly wait + await asyncio.sleep(3) + + + except Exception: + redis_client.hset( + f"process:{job_id}", + mapping={ + "job_id": job_id, + "status": "Failed" + } + ) + await redis_client.expire(f"process:{job_id}", 300) + raise + +@activity.defn +async def release_lock(job_id: str): + lock_owner = await redis_client.get(LOCK_KEY) + + if lock_owner == job_id: + await redis_client.delete(LOCK_KEY) + + redis_client.hset( + f"process:{job_id}", + mapping={ + "job_id": job_id, + "status": "Done" + } + ) + await redis_client.expire(f"process:{job_id}", 300) \ No newline at end of file diff --git a/temporal/workflow.py b/temporal/workflow.py index 54d60e0..7b33d39 100644 --- a/temporal/workflow.py +++ b/temporal/workflow.py @@ -2,7 +2,7 @@ from datetime import timedelta from temporalio import workflow with workflow.unsafe.imports_passed_through(): - from temporal.activity import calculate_plant_eaf_activity, execute_simulation_activity, update_contribution_bulk_mappings_activity, update_equipment_for_simulation_activity,call_callback_ahm + from temporal.activity import calculate_plant_eaf_activity, execute_simulation_activity, update_contribution_bulk_mappings_activity, update_equipment_for_simulation_activity,call_callback_ahm, acquire_lock, release_lock @@ -22,6 +22,14 @@ class SimulationWorkflow: @workflow.run async def run(self, sim_data: dict) -> str: + self.status = "QUEUE" + + await workflow.execute_activity( + acquire_lock, + sim_data["HubCnnId"], + start_to_close_timeout=timedelta(minutes=10), + ) + self.status = "RUNNING" # 1. Update equipment for simulation @@ -65,4 +73,9 @@ class SimulationWorkflow: self.status = "COMPLETED" # etc… self.isDone = True + await workflow.execute_activity( + release_lock, + sim_data["HubCnnId"], + start_to_close_timeout=timedelta(minutes=10), + ) return sim_data["HubCnnId"] # simulation_id