add lock to rbd to prevent db collation

main
Cizz22 3 weeks ago
parent fe875a3677
commit 1abf9ad8ef

36
poetry.lock generated

@ -169,6 +169,19 @@ doc = ["Sphinx (>=7.4,<8.0)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)",
test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "truststore (>=0.9.1) ; python_version >= \"3.10\"", "uvloop (>=0.21.0b1) ; platform_python_implementation == \"CPython\" and platform_system != \"Windows\""]
trio = ["trio (>=0.26.1)"]
[[package]]
name = "async-timeout"
version = "5.0.1"
description = "Timeout context manager for asyncio programs"
optional = false
python-versions = ">=3.8"
groups = ["main"]
markers = "python_version == \"3.11\" and python_full_version < \"3.11.3\""
files = [
{file = "async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c"},
{file = "async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3"},
]
[[package]]
name = "asyncpg"
version = "0.30.0"
@ -1982,6 +1995,27 @@ files = [
{file = "pyyaml-6.0.2.tar.gz", hash = "sha256:d584d9ec91ad65861cc08d42e834324ef890a082e591037abe114850ff7bbc3e"},
]
[[package]]
name = "redis"
version = "7.1.0"
description = "Python client for Redis database and key-value store"
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "redis-7.1.0-py3-none-any.whl", hash = "sha256:23c52b208f92b56103e17c5d06bdc1a6c2c0b3106583985a76a18f83b265de2b"},
{file = "redis-7.1.0.tar.gz", hash = "sha256:b1cc3cfa5a2cb9c2ab3ba700864fb0ad75617b41f01352ce5779dabf6d5f9c3c"},
]
[package.dependencies]
async-timeout = {version = ">=4.0.3", markers = "python_full_version < \"3.11.3\""}
[package.extras]
circuit-breaker = ["pybreaker (>=1.4.0)"]
hiredis = ["hiredis (>=3.2.0)"]
jwt = ["pyjwt (>=2.9.0)"]
ocsp = ["cryptography (>=36.0.1)", "pyopenssl (>=20.0.1)", "requests (>=2.31.0)"]
[[package]]
name = "requests"
version = "2.32.3"
@ -2777,4 +2811,4 @@ propcache = ">=0.2.1"
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "c77b9ab7225ed5fd08d9945ae8f411394ffdc5be23414a79801c3c450e6ff8df"
content-hash = "9d879fd1a129aee9afcf93e7a1933a8d5b511a3bc3a09cb8b598a22835085716"

@ -30,6 +30,7 @@ isort = "^6.0.1"
dotenv = "^0.9.9"
aiohttp = "^3.12.14"
ijson = "^3.4.0"
redis = "^7.1.0"
[build-system]

@ -1,9 +1,22 @@
import asyncio
import requests
from temporalio import activity
import os
import redis.asyncio as redis
AHM_BASE_URL = os.getenv("AHM_BASE_URL", "http://192.168.1.82:8000/ahm")
AHM_SIMULATION_CALLBACK_URL = os.getenv("AHM_SIMULATION_CALLBACK_URL", "/api/v1/simulations/rbd/callback")
REDIS_HOST = os.getenv("REDIS_HOST", "http://192.168.1.82")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
redis_client = redis.Redis(
host=REDIS_HOST, # or docker service name
port=REDIS_PORT,
db=0,
decode_responses=True
)
@activity.defn
async def update_equipment_for_simulation_activity(params: dict):
@ -84,4 +97,76 @@ async def call_callback_ahm(params):
callback_response = requests.post(url, json=payload, timeout=5)
callback_response.raise_for_status()
except Exception as e:
raise e
raise e
LOCK_KEY = "lock:aeros_process"
LOCK_TIMEOUT = 180 # seconds
@activity.defn
async def acquire_lock(job_id: str) -> dict:
try:
while True:
acquired = await redis_client.set(
LOCK_KEY,
job_id,
nx=True,
ex=LOCK_TIMEOUT
)
if acquired:
# Lock acquired
redis_client.hset(
f"process:{job_id}",
mapping={
"job_id": job_id,
"status": "Processing"
}
)
await redis_client.expire(f"process:{job_id}", LOCK_TIMEOUT)
return {
"job_id": job_id,
"status": "Ready"
}
# Someone else holds the lock
redis_client.hset(
f"process:{job_id}",
mapping={
"job_id": job_id,
"status": "Queue"
}
)
await redis_client.expire(f"process:{job_id}", 300)
# Cancellation-friendly wait
await asyncio.sleep(3)
except Exception:
redis_client.hset(
f"process:{job_id}",
mapping={
"job_id": job_id,
"status": "Failed"
}
)
await redis_client.expire(f"process:{job_id}", 300)
raise
@activity.defn
async def release_lock(job_id: str):
lock_owner = await redis_client.get(LOCK_KEY)
if lock_owner == job_id:
await redis_client.delete(LOCK_KEY)
redis_client.hset(
f"process:{job_id}",
mapping={
"job_id": job_id,
"status": "Done"
}
)
await redis_client.expire(f"process:{job_id}", 300)

@ -2,7 +2,7 @@ from datetime import timedelta
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
from temporal.activity import calculate_plant_eaf_activity, execute_simulation_activity, update_contribution_bulk_mappings_activity, update_equipment_for_simulation_activity,call_callback_ahm
from temporal.activity import calculate_plant_eaf_activity, execute_simulation_activity, update_contribution_bulk_mappings_activity, update_equipment_for_simulation_activity,call_callback_ahm, acquire_lock, release_lock
@ -22,6 +22,14 @@ class SimulationWorkflow:
@workflow.run
async def run(self, sim_data: dict) -> str:
self.status = "QUEUE"
await workflow.execute_activity(
acquire_lock,
sim_data["HubCnnId"],
start_to_close_timeout=timedelta(minutes=10),
)
self.status = "RUNNING"
# 1. Update equipment for simulation
@ -65,4 +73,9 @@ class SimulationWorkflow:
self.status = "COMPLETED"
# etc…
self.isDone = True
await workflow.execute_activity(
release_lock,
sim_data["HubCnnId"],
start_to_close_timeout=timedelta(minutes=10),
)
return sim_data["HubCnnId"] # simulation_id

Loading…
Cancel
Save