diff --git a/src/aeros_equipment/service.py b/src/aeros_equipment/service.py index 0cfa68e..8a962bc 100644 --- a/src/aeros_equipment/service.py +++ b/src/aeros_equipment/service.py @@ -342,146 +342,175 @@ async def update_oh_interval_offset(*, aeros_db_session, overhaul_offset, overha await aeros_db_session.commit() -async def update_equipment_for_simulation(*, db_session: DbSession,aeros_db_session:CollectorDbSession ,project_name: str,simulation_duration:int ,overhaul_duration, overhaul_interval, offset ,schematic_name: str, custom_input: Optional[dict] = None): +async def update_equipment_for_simulation( + *, + db_session: DbSession, + aeros_db_session: CollectorDbSession, + project_name: str, + simulation_duration: int, + overhaul_duration, + overhaul_interval, + offset, + schematic_name: str, + custom_input: Optional[dict] = None +): log.info("Updating equipment for simulation") - aeros_schematic = await get_aeros_schematic_by_name(db_session=db_session, schematic_name=schematic_name) + aeros_schematic = await get_aeros_schematic_by_name( + db_session=db_session, + schematic_name=schematic_name + ) - equipments = Select(AerosEquipment).where( - AerosEquipment.location_tag.isnot(None) - ).join(AerosEquipment.aeros_node).filter(and_( - AerosNode.aeros_schematic_id == aeros_schematic.id, - AerosNode.node_type == "RegularNode" - )) + equipments = ( + Select(AerosEquipment) + .where(AerosEquipment.location_tag.isnot(None)) + .join(AerosEquipment.aeros_node) + .filter( + AerosNode.aeros_schematic_id == aeros_schematic.id, + AerosNode.node_type == "RegularNode" + ) + ) rbd_group = Select(AerosEquipmentGroup) - print("Getting equipment nodes") - equipment_nodes = await db_session.execute(equipments) rbd_group_nodes = await db_session.execute(rbd_group) - group_nodes = [group.group_name for group in rbd_group_nodes.scalars().all()] - reg_nodes = [node.location_tag for node in equipment_nodes.scalars().all()] + group_nodes = [g.group_name for g in rbd_group_nodes.scalars().all()] + reg_nodes = [n.location_tag for n in equipment_nodes.scalars().all()] reg_nodes.extend(group_nodes) nodes_data = await get_aeros_equipment_by_location_tag( - location_tag=reg_nodes, project_name=project_name + location_tag=reg_nodes, + project_name=project_name ) - + def load_json_file(filename: str): - """Load a JSON file and return its content.""" - try: - with open(filename, 'r', encoding='utf-8') as f: - return json.load(f) - except FileNotFoundError: - raise FileNotFoundError(f"JSON file not found: {filename}") - except json.JSONDecodeError as e: - raise ValueError(f"Invalid JSON in file {filename}: {e}") + with open(filename, "r", encoding="utf-8") as f: + return json.load(f) + + non_repairable = await db_session.execute( + Select(ReliabilityPredictNonRepairable) + ) + non_repairable_location_tags = [ + t.location_tag for t in non_repairable.scalars().all() + ] - non_repairable = await db_session.execute(Select(ReliabilityPredictNonRepairable)) - non_repairable_location_tags = [tag.location_tag for tag in non_repairable.scalars().all()] + reliability_data = get_asset_batch( + reg_nodes, + non_repairable_location_tags, + RELIABILITY_SERVICE_API + ) - print("Getting reliability data") - reliability_data = get_asset_batch(reg_nodes, non_repairable_location_tags, RELIABILITY_SERVICE_API) + # Normalize custom input keys once + custom_map = {} + if custom_input: + custom_map = { + k.strip().upper(): v + for k, v in custom_input.items() + } + log.info("Custom input provided for: %s", list(custom_map.keys())) reqNodeInputs = [] - results = defaultdict() - - trip_eq = load_json_file("src/aeros_equipment/trip_eq.json") - - print("Updating Overhaul Offset & Overhaul Interval") - await update_oh_interval_offset(aeros_db_session=aeros_db_session, project_name=project_name,overhaul_interval=overhaul_interval, overhaul_offset=offset) + results = {} + + await update_oh_interval_offset( + aeros_db_session=aeros_db_session, + project_name=project_name, + overhaul_interval=overhaul_interval, + overhaul_offset=offset + ) for eq in nodes_data: - try: - # Check if eq already exists in results - if eq["equipmentName"] in results.keys(): - continue - - - reliabiility = reliability_data.get(eq["equipmentName"], {}) - - if custom_input and eq["equipmentName"] in custom_input: - custom_param = custom_input[eq["equipmentName"]] - - if not custom_param["mttr"]: - continue - if not custom_param['failure_rate']: - continue - - # Check eq dengan "TRIP" di WO, jika ada masukkan parameter Reliabilitu, jika tidak ada MTBF = duration - if custom_param['failure_rate'] == 0: - MTBF = 100000 - else: - MTBF = 1e+6 / float(custom_param['failure_rate']) - - - eq["cmDisP1"] = custom_param["mttr"] - eq["relDisType"] = custom_param["distribution"] if "distribution" in custom_param else "Fixed" - # eq["relDisP1"] = float(custom_param["failure_rate"]) - # eq["relDisP2"] = 0 - - if eq["relDisType"] == "Fixed": - eq["relDisP1"] = MTBF - eq["relDisP2"] = 0 - else: - eq["relDisP1"] = 1 - eq["relDisP2"] = MTBF - - - eq["ohDisP1"] = overhaul_duration - eq["ohDisUnitCode"] = "UHour" - - reqNodeInputs.append(eq) - results[eq["equipmentName"]] = { - "mttr": eq["cmDisP1"], - "distribution": eq["relDisType"], - "beta": eq["relDisP1"], - "eta": 0, - "parameters": {}, - "oh_duration": overhaul_duration - } + try: + eq_name_raw = eq["equipmentName"] + eq_name = eq_name_raw.strip().upper() + + log.debug("Processing equipment: %s", eq_name_raw) + + # ---- CUSTOM INPUT HAS PRIORITY ---- + if eq_name in custom_map: + custom_param = custom_map[eq_name] + + mttr = custom_param.get("mttr") + failure_rate = custom_param.get("failure_rate") + + if mttr is None or failure_rate is None: + log.warning( + "Custom input incomplete for %s, skipping", + eq_name_raw + ) continue - - # eq["cmDisP1"] = reliabiility.get("cmDisP1", 0) - # eq["relDisType"] = reliabiility.get("relDisType", "Fixed") - # eq["relDisP1"] = reliabiility.get("relDisP1", 0) - # eq["relDisP2"] = 100000 - # eq["ohDisP1"] = overhaul_duration - # eq["ohDisUnitCode"] = "UHour" - - # if eq["equipmentName"] in trip_eq: - eq["cmDisType"] = reliabiility.get("cmDisType", "Fixed") - eq["cmDisP1"] = reliabiility.get("cmDisP1", 0) - eq["cmDisP2"] = reliabiility.get("cmDisP2", 0) - eq["relDisType"] = reliabiility.get("relDisType", "Fixed") - eq["relDisP1"] = reliabiility.get("relDisP1", 0) - eq["relDisP2"] = reliabiility.get("relDisP2", 0) + + if failure_rate == 0: + MTBF = 100000 + else: + MTBF = 1e6 / float(failure_rate) + + eq["cmDisP1"] = mttr + eq["relDisType"] = custom_param.get("distribution", "Fixed") + + if eq["relDisType"] == "Fixed": + eq["relDisP1"] = MTBF + eq["relDisP2"] = 0 + else: + eq["relDisP1"] = 1 + eq["relDisP2"] = MTBF + eq["ohDisP1"] = overhaul_duration eq["ohDisUnitCode"] = "UHour" reqNodeInputs.append(eq) - results[eq["equipmentName"]] = { - "mttr": eq["cmDisP1"], + results[eq_name_raw] = { + "mttr": mttr, "distribution": eq["relDisType"], "beta": eq["relDisP1"], "eta": eq["relDisP2"], - "parameters": eq.get("parameters", {}), + "parameters": {}, "oh_duration": overhaul_duration } - except Exception as e: - print(f"Error fetching data for {eq['equipmentName']}: {e}") - # Add equipment with default values - reqNodeInputs.append(eq) - print("Updating equipment for simulation") - await update_node(db_session=db_session, equipment_nodes=reqNodeInputs, project_name=project_name) - print("Updated equipment for simulation") + continue # skip default logic - return results + # ---- PREVENT DUPLICATE DEFAULT INSERT ---- + if eq_name_raw in results: + continue + + reliability = reliability_data.get(eq_name_raw, {}) + + eq["cmDisType"] = reliability.get("cmDisType", "Fixed") + eq["cmDisP1"] = reliability.get("cmDisP1", 0) + eq["cmDisP2"] = reliability.get("cmDisP2", 0) + eq["relDisType"] = reliability.get("relDisType", "Fixed") + eq["relDisP1"] = reliability.get("relDisP1", 0) + eq["relDisP2"] = reliability.get("relDisP2", 0) + eq["ohDisP1"] = overhaul_duration + eq["ohDisUnitCode"] = "UHour" + + reqNodeInputs.append(eq) + results[eq_name_raw] = { + "mttr": eq["cmDisP1"], + "distribution": eq["relDisType"], + "beta": eq["relDisP1"], + "eta": eq["relDisP2"], + "parameters": eq.get("parameters", {}), + "oh_duration": overhaul_duration + } + except Exception as e: + log.exception( + "Error processing equipment %s", eq.get("equipmentName") + ) + reqNodeInputs.append(eq) + + await update_node( + db_session=db_session, + equipment_nodes=reqNodeInputs, + project_name=project_name + ) + + return results # Optimized individual fetch functions async def get_equipment_mttr(*, location_tag: str, client: httpx.AsyncClient) -> float: diff --git a/temporal/activity.py b/temporal/activity.py index ebf8e5f..0aa97fc 100644 --- a/temporal/activity.py +++ b/temporal/activity.py @@ -6,7 +6,7 @@ import redis.asyncio as redis AHM_BASE_URL = os.getenv("AHM_BASE_URL", "http://192.168.1.82:8000/ahm") AHM_SIMULATION_CALLBACK_URL = os.getenv("AHM_SIMULATION_CALLBACK_URL", "/api/v1/simulations/rbd/callback") -REDIS_HOST = os.getenv("REDIS_HOST", "http://192.168.1.82") +REDIS_HOST = os.getenv("REDIS_HOST", "192.168.1.82") REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))