You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

504 lines
18 KiB
Python

from typing import defaultdict, List, Optional, Union
from uuid import UUID
import logging
import httpx
from fastapi import HTTPException, status
from sqlalchemy import Delete, Select, func, desc, and_, text
from sqlalchemy.orm import selectinload
from src.auth.service import CurrentUser
from src.config import AEROS_BASE_URL, DEFAULT_PROJECT_NAME, RELIABILITY_SERVICE_API
from src.database.core import CollectorDbSession, DbSession
from src.database.service import search_filter_sort_paginate
from .model import AerosEquipment, AerosEquipmentDetail, MasterEquipment, AerosEquipmentGroup, ReliabilityPredictNonRepairable
from .schema import EquipmentConfiguration
from src.aeros_project.model import AerosProject
from src.aeros_simulation.model import AerosNode
import asyncio
import re
import requests
import json
from src.utils import save_to_pastebin
import pandas as pd
from src.aeros_simulation.service import get_aeros_schematic_by_name
# Shared async HTTP client used for the Aeros API calls made in this module
# (300 second timeout).
client = httpx.AsyncClient(timeout=300.0)
# getLogger() is called without a name, so this is the root logger.
log = logging.getLogger()
async def get_project(*, db_session: DbSession):
    """Return the most recently updated ``AerosProject``, or ``None`` if there is none."""
    latest_project_query = (
        Select(AerosProject)
        .order_by(desc(AerosProject.updated_at))
        .limit(1)
    )
    return (await db_session.execute(latest_project_query)).scalar_one_or_none()
async def get_all(*, common):
    """Return a page of equipment combined with its current Aeros distribution data.

    The equipment rows are paginated with ``search_filter_sort_paginate``; the
    node names of that page are then sent to the Aeros
    ``GetUpdatedNodeDistributions`` endpoint for the most recently updated
    project.

    Args:
        common: Keyword arguments forwarded to ``search_filter_sort_paginate``.
            Must contain ``"db_session"``.

    Returns:
        The pagination result with ``"items"`` replaced by a list of
        ``{"AerosData": <Aeros record>, "MasterData": <AerosEquipment or None>}``.

    Raises:
        HTTPException: 404 when no project exists; 500 when the Aeros call fails.
    """
    query = Select(AerosEquipment).options(
        selectinload(AerosEquipment.master_equipment)
    )
    results = await search_filter_sort_paginate(model=query, **common)

    reg_nodes = [node.node_name for node in results["items"]]
    equipment_data = {node.node_name: node for node in results["items"]}

    project = await get_project(db_session=common["db_session"])
    # get_project() returns None when the project table is empty; fail with a
    # clear 404 instead of an AttributeError on project.project_name.
    if project is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="AerosProject not found"
        )

    update_node_req = {
        "projectName": project.project_name,
        "equipmentNames": reg_nodes,
    }

    try:
        response = await client.post(
            f"{AEROS_BASE_URL}/api/UpdateDisParams/GetUpdatedNodeDistributions",
            json=update_node_req,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        res = response.json()
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e

    results["items"] = [
        {"AerosData": data, "MasterData": equipment_data.get(data["equipmentName"])}
        for data in res
    ]
    return results
async def get_equipment_by_location_tag(*, db_session: DbSession, location_tag: str):
    """Return the ``AerosEquipment`` with the given location tag, or ``None``.

    The related ``aeros_equipment_details`` are eagerly loaded. If several rows
    share the location tag, the first one returned by the database is used.

    Args:
        db_session: Async database session used to run the query.
        location_tag: Location tag to look up.
    """
    query = (
        Select(AerosEquipment)
        .where(AerosEquipment.location_tag == location_tag)
        .options(selectinload(AerosEquipment.aeros_equipment_details))
    )
    # Previously the query was built but never executed, so the function
    # always returned None.
    result = await db_session.execute(query)
    return result.scalars().first()
async def get_by_id(*, db_session: DbSession, id: UUID):
    """Return an equipment record together with its Aeros distribution data.

    Args:
        db_session: Async database session used to load the equipment.
        id: Primary key of the ``AerosEquipment`` row.

    Returns:
        A tuple ``(equipment, aeros_data)`` where ``equipment`` is the
        ``AerosEquipment`` row (details eagerly loaded) and ``aeros_data`` is
        the decoded JSON response of ``GetUpdatedNodeDistributions``.

    Raises:
        HTTPException: 404 when the equipment does not exist; 500 when the
            Aeros call fails.
    """
    query = (
        Select(AerosEquipment)
        .where(AerosEquipment.id == id)
        .options(selectinload(AerosEquipment.aeros_equipment_details))
    )
    result = await db_session.execute(query)
    equipment = result.scalar()

    if not equipment:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="AerosEquipment not found"
        )

    aeros_node_req = {
        # NOTE(review): the project name is hard-coded here, unlike the other
        # callers of this endpoint, which pass it in or look it up - confirm.
        "projectName": "ParallelNode",
        # Every other call to GetUpdatedNodeDistributions in this module sends
        # the list under "equipmentNames"; the singular key used here before
        # did not match them.
        "equipmentNames": [equipment.node_name],
    }

    try:
        response = await client.post(
            f"{AEROS_BASE_URL}/api/UpdateDisParams/GetUpdatedNodeDistributions",
            json=aeros_node_req,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        aeros_equipment_data = response.json()
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e

    return equipment, aeros_equipment_data
async def get_aeros_equipment_by_location_tag(*, location_tag: Union[str, List[str]], project_name):
    """Fetch Aeros node distribution records for one or more equipment names.

    Args:
        location_tag: A single equipment name or a list of names.
        project_name: Aeros project to query.

    Returns:
        A list of record dicts from ``GetUpdatedNodeDistributions`` with exact
        duplicate records removed.

    Raises:
        HTTPException: 404 when Aeros returns no data; 500 when the request or
            the de-duplication fails.
    """
    equipment_names = [location_tag] if isinstance(location_tag, str) else location_tag

    aeros_node_req = {
        "projectName": project_name,
        "equipmentNames": equipment_names,
    }

    try:
        response = await client.post(
            f"{AEROS_BASE_URL}/api/UpdateDisParams/GetUpdatedNodeDistributions",
            json=aeros_node_req,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        aeros_equipment_data = response.json()

        if not aeros_equipment_data:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="AerosEquipment not found"
            )

        df = pd.DataFrame(aeros_equipment_data)
        return df.drop_duplicates().to_dict("records")
    except HTTPException:
        # Let the 404 above through unchanged; the generic handler below
        # used to turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e
async def update_node(
    *, db_session: DbSession, equipment_nodes: List[dict], project_name: str
):
    """Send equipment distribution inputs to Aeros and return its JSON reply.

    ``db_session`` is accepted but not used by this function. Any failure of
    the request (transport error, non-2xx status, undecodable body) is
    reported as an HTTP 500 whose detail is the error text.
    """
    payload = {"projectName": project_name, "regNodeInputs": equipment_nodes}
    json_headers = {"Content-Type": "application/json"}

    try:
        aeros_response = await client.post(
            f"{AEROS_BASE_URL}/api/UpdateDisParams/UpdateEquipmentDistributions",
            json=payload,
            headers=json_headers,
        )
        aeros_response.raise_for_status()
        return aeros_response.json()
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        )
async def save_default_equipment(*, db_session: DbSession, project_name: str):
    """Rebuild the ``AerosEquipment`` table from the nodes Aeros knows about.

    The location tags of all ``MasterEquipment`` rows that have one are sent
    to ``GetUpdatedNodeDistributions``. Each record returned is stored as an
    ``AerosEquipment`` whose ``node_name`` and ``location_tag`` are both the
    record's ``equipmentName``.

    Args:
        db_session: Async database session; committed on success.
        project_name: Aeros project to query.

    Returns:
        The decoded JSON list returned by Aeros.

    Raises:
        HTTPException: 500 when the Aeros call or the database write fails.
            The session is rolled back first, so existing rows are kept.
    """
    equipments = Select(MasterEquipment).where(
        MasterEquipment.location_tag.isnot(None)
    )
    equipment_nodes = await db_session.execute(equipments)
    reg_nodes = [node.location_tag for node in equipment_nodes.scalars().all()]

    update_node_req = {"projectName": project_name, "equipmentNames": reg_nodes}

    try:
        response = await client.post(
            f"{AEROS_BASE_URL}/api/UpdateDisParams/GetUpdatedNodeDistributions",
            json=update_node_req,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        results = response.json()

        nodes = [
            AerosEquipment(
                node_name=equipment["equipmentName"],
                location_tag=equipment["equipmentName"],
            )
            for equipment in results
        ]

        # Delete the old rows only after the remote fetch has succeeded, so a
        # failed or slow Aeros call never leaves a pending DELETE in the
        # session.
        await db_session.execute(Delete(AerosEquipment))
        db_session.add_all(nodes)
        await db_session.commit()
        return results
    except Exception as e:
        # Discard any partial delete/insert before reporting the failure.
        await db_session.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e
def get_asset_batch(location_tags: List[str], nr_location_tags: List[str],
                    base_url: str = RELIABILITY_SERVICE_API,
                    timeout: int = 30) -> dict:
    """
    Fetch reliability data for a batch of location tags and map it to Aeros inputs.

    Sends a GET request with a JSON body to ``{base_url}/asset/batch``. This
    function is synchronous (it uses ``requests``) and blocks until the
    service answers or the timeout expires.

    Args:
        location_tags: List of location tag strings to request.
        nr_location_tags: Accepted for interface compatibility; not used here.
        base_url: Base URL for the API.
        timeout: Request timeout in seconds.

    Returns:
        A ``defaultdict(dict)`` keyed by location tag. Each value holds
        ``cmDisType``, ``cmDisP1``, ``cmDisP2`` (fixed to "Normal", 6, 3),
        ``relDisType``, ``relDisP1``, ``relDisP2`` (from ``get_distribution``)
        and ``parameters`` (the item's raw parameters, ``{}`` if absent).

    Raises:
        HTTPException: 500 when the request fails or an item cannot be
            processed.
    """
    url = f"{base_url}/asset/batch"
    results = defaultdict(dict)
    payload = {"location_tags": location_tags}
    headers = {'Content-Type': 'application/json'}

    try:
        response = requests.get(
            url,
            json=payload,
            headers=headers,
            timeout=timeout,
        )
        response.raise_for_status()  # Raises an HTTPError for bad responses
        data = response.json()
        # Treat a missing or null "data" field as an empty batch.
        reliability_items = data.get("data") or []
        log.debug("Reliability batch returned %d item(s)", len(reliability_items))

        for item in reliability_items:
            location_tag = item["location_tag"]
            try:
                # Only the distribution mapping is needed; the former unused
                # reads of item["mtbf"] / item["mttr"] made the whole batch
                # fail whenever those unused fields were missing.
                distribution, reldisp1, reldisp2 = get_distribution(item)
                results[location_tag] = {
                    "cmDisType": "Normal",
                    "cmDisP1": 6,
                    "cmDisP2": 3,
                    "relDisType": distribution,
                    "relDisP1": reldisp1,
                    "relDisP2": reldisp2,
                    "parameters": item.get("parameters", {}),
                }
            except Exception as e:
                raise Exception(f"Error processing item {location_tag}: {e}") from e

        return results
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to fetch asset batch data " + str(e)
        ) from e
def get_distribution(item):
    """Translate a reliability item into ``(aeros_distribution, p1, p2)``.

    ``item["distribution"]` selects the mapping. For a known distribution the
    two values are read from ``item["parameters"]`` (0 when a key is absent).
    An unknown distribution is returned unchanged with both values 0, and
    ``item["parameters"]`` is not touched in that case.

    Mapping (source name -> Aeros name, first value, second value):
        Weibull-2P     -> Weibull2,     beta,   alpha
        Weibull-3P     -> Weibull3,     beta,   alpha
        Exponential-2P -> Exponential2, Lambda, gamma
        NHPP           -> NHPPTTFF,     beta,   eta
        Lognormal      -> Lognormal,    mu,     sigma
        Normal         -> Normal,       mu,     1000 (fixed)
    """
    # (source name, Aeros name, key of first value, key of second value)
    known_distributions = (
        ("Weibull-2P", "Weibull2", "beta", "alpha"),
        ("Weibull-3P", "Weibull3", "beta", "alpha"),
        ("Exponential-2P", "Exponential2", "Lambda", "gamma"),
        ("NHPP", "NHPPTTFF", "beta", "eta"),
        ("Lognormal", "Lognormal", "mu", "sigma"),
        ("Normal", "Normal", "mu", "sigma"),
    )

    source_name = item["distribution"]
    for candidate, aeros_name, first_key, second_key in known_distributions:
        if source_name != candidate:
            continue
        params = item["parameters"]
        first_value = params.get(first_key, 0)
        second_value = params.get(second_key, 0)
        if candidate == "Normal":
            # NOTE(review): sigma is read but the second value is always 1000;
            # confirm this is intentional.
            return aeros_name, first_value, 1000
        return aeros_name, first_value, second_value

    return source_name, 0, 0
async def update_oh_interval_offset(*, aeros_db_session, overhaul_offset, overhaul_interval, project_name):
    """Set ``OHInterval`` and ``OHOffset`` on every regular node of a project.

    Runs one parameterized UPDATE against ``public."RegularNodes"``, joined
    through ``Schematics`` and ``Projects`` and filtered by project name, then
    commits the session.
    """
    bind_values = {
        "new_oh_interval": overhaul_interval,
        "new_oh_offset": overhaul_offset,
        "project_name": project_name,
    }
    # The SQL text is kept flush-left so the literal is unchanged.
    statement = text("""
UPDATE public."RegularNodes" rn
SET "OHInterval" = :new_oh_interval,
"OHOffset" = :new_oh_offset
FROM public."Schematics" s
JOIN public."Projects" p
ON s."ProjectId" = p."ProjectId"
WHERE rn."SchematicId" = s."SchematicId"
AND p."ProjectName" = :project_name
""")
    await aeros_db_session.execute(statement, bind_values)
    await aeros_db_session.commit()
async def update_equipment_for_simulation(
    *,
    db_session: DbSession,
    aeros_db_session: CollectorDbSession,
    project_name: str,
    simulation_duration: int,
    overhaul_duration,
    overhaul_interval,
    offset,
    schematic_name: str,
    custom_input: Optional[dict] = None,
):
    """
    Push per-equipment reliability and overhaul settings to Aeros before a simulation.

    Steps:
        1. Collect the location tags of the schematic's ``RegularNode``
           equipment plus all equipment group names.
        2. Fetch their current Aeros node records and their reliability data.
        3. Update the overhaul interval/offset of the project's regular nodes.
        4. Build the node inputs (see below) and send them with ``update_node``.

    Per node, the first matching rule applies:
        * Name in ``custom_input``: a "Fixed" distribution using the custom
          ``failure_rate`` and ``mttr``. The node is skipped entirely when
          either value is falsy.
        * Otherwise: a "Fixed" distribution with
          ``relDisP1 = simulation_duration + offset + 1``. If the name is
          listed in ``trip_eq.json``, the distribution type and parameters
          come from the reliability data instead.
    A node whose processing raises is still sent to Aeros in whatever state
    it was left, but gets no entry in the returned summary.

    Args:
        db_session: Async session for the application database.
        aeros_db_session: Async session used for the overhaul interval/offset update.
        project_name: Aeros project to update.
        simulation_duration: Used to compute the default ``relDisP1``.
        overhaul_duration: Written to ``ohDisP1`` (unit code "UHour").
        overhaul_interval: New ``OHInterval`` for the project's regular nodes.
        offset: New ``OHOffset``; also part of the default ``relDisP1``.
        schematic_name: Schematic whose regular nodes are updated.
        custom_input: Optional mapping of equipment name to a dict with
            ``"mttr"`` and ``"failure_rate"``.

    Returns:
        A dict keyed by equipment name with ``mttr``, ``distribution``,
        ``beta``, ``eta``, ``parameters`` and ``oh_duration``.
    """

    def load_json_file(filename: str):
        """Load a JSON file and return its content."""
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            raise FileNotFoundError(f"JSON file not found: {filename}")
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in file {filename}: {e}")

    log.info("Updating equipment for simulation")

    aeros_schematic = await get_aeros_schematic_by_name(
        db_session=db_session, schematic_name=schematic_name
    )

    equipments = (
        Select(AerosEquipment)
        .where(AerosEquipment.location_tag.isnot(None))
        .join(AerosEquipment.aeros_node)
        .filter(
            and_(
                AerosNode.aeros_schematic_id == aeros_schematic.id,
                AerosNode.node_type == "RegularNode",
            )
        )
    )
    rbd_group = Select(AerosEquipmentGroup)

    log.info("Getting equipment nodes")
    equipment_nodes = await db_session.execute(equipments)
    rbd_group_nodes = await db_session.execute(rbd_group)

    group_nodes = [group.group_name for group in rbd_group_nodes.scalars().all()]
    reg_nodes = [node.location_tag for node in equipment_nodes.scalars().all()]
    reg_nodes.extend(group_nodes)

    nodes_data = await get_aeros_equipment_by_location_tag(
        location_tag=reg_nodes, project_name=project_name
    )

    non_repairable = await db_session.execute(Select(ReliabilityPredictNonRepairable))
    non_repairable_location_tags = [
        tag.location_tag for tag in non_repairable.scalars().all()
    ]

    log.info("Getting reliability data")
    # get_asset_batch is synchronous (requests); run it in a worker thread so
    # the event loop is not blocked while waiting for the reliability service.
    reliability_data = await asyncio.to_thread(
        get_asset_batch, reg_nodes, non_repairable_location_tags, RELIABILITY_SERVICE_API
    )

    node_inputs = []
    results = {}

    trip_eq = load_json_file("src/aeros_equipment/trip_eq.json")

    log.info("Updating Overhaul Offset & Overhaul Interval")
    await update_oh_interval_offset(
        aeros_db_session=aeros_db_session,
        project_name=project_name,
        overhaul_interval=overhaul_interval,
        overhaul_offset=offset,
    )

    for eq in nodes_data:
        try:
            name = eq["equipmentName"]

            # Skip names that already have a summary entry (duplicates).
            if name in results:
                continue

            reliability = reliability_data.get(name, {})

            if custom_input and name in custom_input:
                custom_param = custom_input[name]

                # Incomplete custom input: leave this node untouched.
                if not custom_param["mttr"] or not custom_param["failure_rate"]:
                    continue

                eq["cmDisP1"] = custom_param["mttr"]
                eq["relDisType"] = "Fixed"
                eq["relDisP1"] = float(custom_param["failure_rate"])
                eq["relDisP2"] = 0
                eq["ohDisP1"] = overhaul_duration
                eq["ohDisUnitCode"] = "UHour"

                node_inputs.append(eq)
                results[name] = {
                    "mttr": eq["cmDisP1"],
                    "distribution": eq["relDisType"],
                    "beta": eq["relDisP1"],
                    "eta": 0,
                    "parameters": {},
                    "oh_duration": overhaul_duration,
                }
                continue

            # Check for equipment with "TRIP" in the work orders: if present,
            # apply its reliability parameters; otherwise MTBF = duration.
            eq["cmDisP1"] = reliability.get("cmDisP1", 0)
            eq["relDisType"] = "Fixed"
            eq["relDisP1"] = simulation_duration + offset + 1
            eq["relDisP2"] = 0
            eq["ohDisP1"] = overhaul_duration
            eq["ohDisUnitCode"] = "UHour"

            if name in trip_eq:
                eq["relDisType"] = reliability.get("relDisType", "Fixed")
                eq["relDisP1"] = reliability.get("relDisP1", 0)
                eq["relDisP2"] = reliability.get("relDisP2", 0)

            node_inputs.append(eq)
            results[name] = {
                "mttr": eq["cmDisP1"],
                "distribution": eq["relDisType"],
                "beta": eq["relDisP1"],
                "eta": eq["relDisP2"],
                # NOTE(review): this reads "parameters" from the Aeros node
                # record, not from the reliability data - confirm the source.
                "parameters": eq.get("parameters", {}),
                "oh_duration": overhaul_duration,
            }
        except Exception as e:
            # Best effort: log and still send the node. Use .get() so a record
            # without "equipmentName" cannot raise again inside the handler.
            log.error("Error fetching data for %s: %s", eq.get("equipmentName"), e)
            node_inputs.append(eq)

    log.info("Updating equipment for simulation")
    await update_node(
        db_session=db_session, equipment_nodes=node_inputs, project_name=project_name
    )
    log.info("Updated equipment for simulation")

    return results
# Optimized individual fetch functions
async def get_equipment_mttr(*, location_tag: str, client: httpx.AsyncClient) -> float:
    """
    Fetch the MTTR (hours) of one equipment through the given HTTP client.

    Returns ``data.hours`` from the reliability service response, or 0 when
    the service answers with a non-200 status, the field is missing, or any
    error occurs (the problem is logged, never raised).
    """
    endpoint = f"{RELIABILITY_SERVICE_API}/asset/mttr/{location_tag}"
    try:
        reply = await client.get(endpoint)
        if reply.status_code != 200:
            log.warning(f"MTTR API returned status {reply.status_code} for {location_tag}")
            return 0
        body = reply.json()
        hours_container = body.get("data", {})
        return hours_container.get("hours", 0)
    except Exception as e:
        log.error(f"Error getting MTTR for {location_tag}: {str(e)}")
        return 0
async def get_equipment_reliability_parameter(*, location_tag: str, client: httpx.AsyncClient):
    """
    Get the current reliability parameters of one equipment using the provided client.

    Returns:
        A dict ``{"distribution": str, "beta": ..., "eta": ...}``. When the
        service answers with a non-200 status or any error occurs, the problem
        is logged and ``{"distribution": "", "beta": 0, "eta": 0}`` is
        returned. A response without ``parameters`` keeps its distribution
        and reports ``beta`` and ``eta`` as 0.
    """
    reliability_url = f"{RELIABILITY_SERVICE_API}/reliability/{location_tag}/current"
    try:
        response = await client.get(reliability_url)
        if response.status_code == 200:
            reliability_data = response.json()
            data = reliability_data.get("data") or {}
            # The old default of 0 made .get() fail with AttributeError when
            # "parameters" was absent, so the distribution was discarded and
            # an error was logged.
            parameters = data.get("parameters") or {}
            return {
                "distribution": data.get("distribution", ""),
                "beta": parameters.get("beta", 0),
                "eta": parameters.get("eta", 0),
            }
        else:
            log.warning(f"Reliability API returned status {response.status_code} for {location_tag}")
            return {"distribution": "", "beta": 0, "eta": 0}
    except Exception as e:
        log.error(f"Error getting reliability parameters for {location_tag}: {str(e)}")
        return {"distribution": "", "beta": 0, "eta": 0}