You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
508 lines
18 KiB
Python
508 lines
18 KiB
Python
from typing import defaultdict, List, Optional, Union
|
|
from uuid import UUID
|
|
import logging
|
|
import httpx
|
|
from fastapi import HTTPException, status
|
|
from sqlalchemy import Delete, Select, func, desc, and_, text
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from src.auth.service import CurrentUser
|
|
from src.config import AEROS_BASE_URL, DEFAULT_PROJECT_NAME, RELIABILITY_SERVICE_API
|
|
from src.database.core import CollectorDbSession, DbSession
|
|
from src.database.service import search_filter_sort_paginate
|
|
from .model import AerosEquipment, AerosEquipmentDetail, MasterEquipment, AerosEquipmentGroup, ReliabilityPredictNonRepairable
|
|
from .schema import EquipmentConfiguration
|
|
from src.aeros_project.model import AerosProject
|
|
from src.aeros_simulation.model import AerosNode
|
|
import asyncio
|
|
import re
|
|
import requests
|
|
import json
|
|
from src.utils import save_to_pastebin
|
|
import pandas as pd
|
|
from src.aeros_simulation.service import get_aeros_schematic_by_name
|
|
|
|
# Module-wide async HTTP client used by the Aeros request helpers in this file;
# requests made through it use a 300-second timeout.
client = httpx.AsyncClient(timeout=300.0)
|
|
# getLogger() is called without a name, so this is the root logger.
log = logging.getLogger()
|
|
|
|
async def get_project(*, db_session: DbSession):
    """Return the most recently updated ``AerosProject``, or ``None`` if none exists.

    Args:
        db_session: Database session used to run the query.
    """
    newest_first = desc(AerosProject.updated_at)
    latest_project_query = Select(AerosProject).order_by(newest_first).limit(1)

    rows = await db_session.execute(latest_project_query)
    return rows.scalar_one_or_none()
|
|
|
|
async def get_all(*, common):
    """Return one page of equipment merged with its Aeros distribution data.

    A page of ``AerosEquipment`` rows (with ``master_equipment`` loaded) is
    produced by ``search_filter_sort_paginate``. The node names on that page
    are sent to the Aeros ``GetUpdatedNodeDistributions`` endpoint for the
    most recently updated project.

    Args:
        common: Keyword arguments forwarded to ``search_filter_sort_paginate``.
            Must contain ``"db_session"``.

    Returns:
        The pagination result with ``"items"`` replaced by a list of
        ``{"AerosData": <record from Aeros>, "MasterData": <AerosEquipment or None>}``.

    Raises:
        HTTPException: 404 when no project exists; 500 when the Aeros request
            fails.
    """
    query = Select(AerosEquipment).options(
        selectinload(AerosEquipment.master_equipment)
    )
    results = await search_filter_sort_paginate(model=query, **common)

    page_items = results["items"]
    reg_nodes = [node.node_name for node in page_items]
    equipment_data = {node.node_name: node for node in page_items}

    project = await get_project(db_session=common["db_session"])
    if project is None:
        # get_project() returns None when the table is empty; fail with a
        # clear 404 instead of an AttributeError on project.project_name.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="AerosProject not found"
        )

    updateNodeReq = {"projectName": project.project_name, "equipmentNames": reg_nodes}

    try:
        response = await client.post(
            f"{AEROS_BASE_URL}/api/UpdateDisParams/GetUpdatedNodeDistributions",
            json=updateNodeReq,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        res = response.json()
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e

    # Pair every Aeros record with the local row of the same node name
    # (None when Aeros returns a name that is not on this page).
    results["items"] = [
        {"AerosData": data, "MasterData": equipment_data.get(data["equipmentName"])}
        for data in res
    ]

    return results
|
|
|
|
async def get_equipment_by_location_tag(*, db_session: DbSession, location_tag: str):
    """Look up an ``AerosEquipment`` row by its location tag.

    Args:
        db_session: Database session used to run the query.
        location_tag: Value matched against ``AerosEquipment.location_tag``.

    Returns:
        The first matching ``AerosEquipment`` with its
        ``aeros_equipment_details`` loaded, or ``None`` when nothing matches.
    """
    query = (
        Select(AerosEquipment)
        .where(AerosEquipment.location_tag == location_tag)
        .options(selectinload(AerosEquipment.aeros_equipment_details))
    )

    # Building the query alone leaves callers with None; run it and hand the
    # row back, the same way get_by_id() reads its result.
    result = await db_session.execute(query)
    return result.scalar()
|
|
|
|
|
|
|
|
async def get_by_id(*, db_session: DbSession, id: UUID):
    """Fetch one ``AerosEquipment`` by primary key together with its Aeros data.

    Args:
        db_session: Database session used to run the query.
        id: Primary key of the ``AerosEquipment`` row.

    Returns:
        A tuple ``(equipment, aeros_data)`` where ``equipment`` is the
        ``AerosEquipment`` row (with ``aeros_equipment_details`` loaded) and
        ``aeros_data`` is the decoded JSON returned by the Aeros
        ``GetUpdatedNodeDistributions`` endpoint.

    Raises:
        HTTPException: 404 when the row does not exist; 500 when the Aeros
            request fails.
    """
    query = (
        Select(AerosEquipment)
        .where(AerosEquipment.id == id)
        .options(selectinload(AerosEquipment.aeros_equipment_details))
    )
    result = await db_session.execute(query)
    aerosEquipmentResult = result.scalar()

    if not aerosEquipmentResult:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="AerosEquipment not found"
        )

    aerosNodeReq = {
        # NOTE(review): the project name is hard-coded here while the other
        # callers of this endpoint pass the active project - confirm.
        "projectName": "ParallelNode",
        # Every other request to this endpoint in this module sends the list
        # under "equipmentNames"; use the same key here.
        "equipmentNames": [aerosEquipmentResult.node_name],
    }

    try:
        response = await client.post(
            f"{AEROS_BASE_URL}/api/UpdateDisParams/GetUpdatedNodeDistributions",
            json=aerosNodeReq,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        aerosEquipmentData = response.json()
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e

    return aerosEquipmentResult, aerosEquipmentData
|
|
|
|
|
|
async def get_aeros_equipment_by_location_tag(*, location_tag: Union[str, List[str]], project_name):
    """Fetch distribution records from Aeros for one or more equipment names.

    Args:
        location_tag: A single equipment name or a list of names.
        project_name: Aeros project the equipment belongs to.

    Returns:
        The records returned by the ``GetUpdatedNodeDistributions`` endpoint
        as a list of dicts, with exact duplicate records removed.

    Raises:
        HTTPException: 404 when Aeros returns no data; 500 when the request
            or the post-processing fails.
    """
    equipment_names = [location_tag] if isinstance(location_tag, str) else location_tag

    aerosNodeReq = {
        "projectName": project_name,
        "equipmentNames": equipment_names,
    }

    try:
        response = await client.post(
            f"{AEROS_BASE_URL}/api/UpdateDisParams/GetUpdatedNodeDistributions",
            json=aerosNodeReq,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        aerosEquipmentData = response.json()

        if not aerosEquipmentData:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="AerosEquipment not found"
            )

        df = pd.DataFrame(aerosEquipmentData)
        return df.drop_duplicates().to_dict("records")
    except HTTPException:
        # Let the 404 above through unchanged instead of re-wrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e
|
|
|
|
|
|
async def update_node(
    *, db_session: DbSession, equipment_nodes: List[dict], project_name: str
):
    """Send updated equipment distributions to Aeros.

    Args:
        db_session: Not used by this function; kept so existing callers that
            pass it keep working.
        equipment_nodes: Node records sent as ``regNodeInputs``.
        project_name: Aeros project to update.

    Returns:
        The decoded JSON body returned by the
        ``UpdateEquipmentDistributions`` endpoint.

    Raises:
        HTTPException: 500 when the request fails, Aeros answers with an
            error status, or the body is not valid JSON.
    """
    updateNodeReq = {"projectName": project_name, "regNodeInputs": equipment_nodes}

    try:
        response = await client.post(
            f"{AEROS_BASE_URL}/api/UpdateDisParams/UpdateEquipmentDistributions",
            json=updateNodeReq,
            headers={"Content-Type": "application/json"},
        )
        # Same check the other Aeros calls in this module perform; without it
        # an error response would be returned to the caller as if it succeeded.
        response.raise_for_status()
        return response.json()
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e
|
|
|
|
|
|
async def save_default_equipment(*, db_session: DbSession, project_name: str):
    """Rebuild the ``AerosEquipment`` table from what Aeros knows about.

    The location tags of every ``MasterEquipment`` row that has one are sent
    to the Aeros ``GetUpdatedNodeDistributions`` endpoint. Each record that
    comes back becomes a new ``AerosEquipment`` row whose ``node_name`` and
    ``location_tag`` are both the record's ``equipmentName``.

    Args:
        db_session: Database session used for the reads and the rewrite.
        project_name: Aeros project to query.

    Returns:
        The decoded JSON list returned by Aeros.

    Raises:
        HTTPException: 500 when the Aeros request or the database rewrite fails.
    """
    master_query = Select(MasterEquipment).where(
        MasterEquipment.location_tag.isnot(None)
    )
    master_rows = await db_session.execute(master_query)
    reg_nodes = [node.location_tag for node in master_rows.scalars().all()]

    updateNodeReq = {"projectName": project_name, "equipmentNames": reg_nodes}

    try:
        response = await client.post(
            f"{AEROS_BASE_URL}/api/UpdateDisParams/GetUpdatedNodeDistributions",
            json=updateNodeReq,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        results = response.json()

        nodes = [
            AerosEquipment(
                node_name=equipment["equipmentName"],
                location_tag=equipment["equipmentName"],
            )
            for equipment in results
        ]

        # Delete the old rows only once replacement data is in hand, so a
        # failed Aeros call never leaves a pending delete on the session.
        await db_session.execute(Delete(AerosEquipment))
        db_session.add_all(nodes)
        await db_session.commit()

        return results
    except Exception as e:
        # Drop any half-applied delete/insert before reporting the failure.
        await db_session.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e
|
|
|
|
|
|
def get_asset_batch(location_tags: List[str], nr_location_tags: List[str],
                    base_url: str = RELIABILITY_SERVICE_API,
                    timeout: int = 30) -> dict:
    """
    Get asset batch data using a GET request with a JSON body.

    This is a blocking call (it uses ``requests``).

    Args:
        location_tags: List of location tag strings sent to the service.
        nr_location_tags: Accepted for interface compatibility; not used here.
        base_url: Base URL for the API.
        timeout: Request timeout in seconds.

    Returns:
        A ``defaultdict(dict)`` keyed by location tag. Each value holds
        ``cmDisType``, ``cmDisP1``, ``cmDisP2``, ``relDisType``, ``relDisP1``,
        ``relDisP2`` and ``parameters``. Items without a ``location_tag`` are
        skipped.

    Raises:
        HTTPException: 500 when the request fails or any item cannot be
            processed.
    """
    # `typing` does not document a `defaultdict` export, so take the class
    # from `collections` rather than relying on the module-level import.
    from collections import defaultdict

    url = f"{base_url}/asset/batch"
    results = defaultdict(dict)

    payload = {"location_tags": location_tags}
    headers = {"Content-Type": "application/json"}

    try:
        response = requests.get(
            url,
            json=payload,
            headers=headers,
            timeout=timeout,
        )
        response.raise_for_status()  # Raises an HTTPError for bad responses

        reliability_items = response.json().get("data", [])

        for item in reliability_items:
            location_tag = item.get("location_tag", None)

            if not location_tag:
                log.warning("Error processing item" + str(item))
                continue

            try:
                distribution, reldisp1, reldisp2 = get_distribution(item)

                results[location_tag].update(
                    {
                        # Corrective-maintenance distribution is fixed here
                        # and not taken from the service response.
                        "cmDisType": "Normal",
                        "cmDisP1": 6,
                        "cmDisP2": 3,
                        "relDisType": distribution,
                        "relDisP1": reldisp1,
                        "relDisP2": reldisp2,
                        "parameters": item.get("parameters", {}),
                    }
                )
            except Exception as e:
                raise Exception(f"Error processing item {location_tag}: {e}") from e

        return results

    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to fetch asset batch data " + str(e)
        ) from e
|
|
|
|
|
|
def get_distribution(item):
    """Map a reliability record to an ``(aeros_type, p1, p2)`` tuple.

    Args:
        item: Mapping with a ``"distribution"`` name and, for every known
            distribution, a ``"parameters"`` mapping.

    Returns:
        A 3-tuple of the Aeros distribution type and its two parameters.
        Parameters missing from ``item["parameters"]`` come back as 0.
        An unrecognised distribution name yields ``("NHPPTTFF", 1, 100000)``.
    """
    # (source name, Aeros type, key of first parameter, key of second parameter)
    two_parameter_forms = (
        ("Weibull-2P", "Weibull2", "beta", "alpha"),
        ("Weibull-3P", "Weibull3", "beta", "alpha"),
        ("Exponential-2P", "Exponential2", "Lambda", "gamma"),
        ("NHPP", "NHPPTTFF", "beta", "eta"),
        ("Lognormal", "Lognormal", "mu", "sigma"),
    )
    kind = item["distribution"]

    for source_name, aeros_type, first_key, second_key in two_parameter_forms:
        if kind == source_name:
            fitted = item["parameters"]
            return aeros_type, fitted.get(first_key, 0), fitted.get(second_key, 0)

    if kind == "Normal":
        # NOTE(review): the second parameter is fixed at 1000 and the fitted
        # sigma is not used - confirm this is intended.
        return "Normal", item["parameters"].get("mu", 0), 1000

    # Fallback for any other distribution name.
    return "NHPPTTFF", 1, 100000
|
|
|
|
async def update_oh_interval_offset(*, aeros_db_session, overhaul_offset, overhaul_interval, project_name):
    """Set the overhaul interval and offset on a project's regular nodes.

    Updates ``"OHInterval"`` and ``"OHOffset"`` on every ``RegularNodes`` row
    whose schematic belongs to the project named ``project_name``, then
    commits the session.

    Args:
        aeros_db_session: Session on the database holding the Aeros tables.
        overhaul_offset: New value for ``"OHOffset"``.
        overhaul_interval: New value for ``"OHInterval"``.
        project_name: Value matched against ``Projects."ProjectName"``.
    """
    bind_values = {
        "new_oh_interval": overhaul_interval,
        "new_oh_offset": overhaul_offset,
        "project_name": project_name
    }

    statement = text("""
        UPDATE public."RegularNodes" rn
        SET "OHInterval" = :new_oh_interval,
            "OHOffset" = :new_oh_offset
        FROM public."Schematics" s
        JOIN public."Projects" p
            ON s."ProjectId" = p."ProjectId"
        WHERE rn."SchematicId" = s."SchematicId"
          AND p."ProjectName" = :project_name
    """)

    await aeros_db_session.execute(statement, bind_values)
    await aeros_db_session.commit()
|
|
|
|
|
|
async def update_equipment_for_simulation(
    *,
    db_session: DbSession,
    aeros_db_session: CollectorDbSession,
    project_name: str,
    simulation_duration: int,
    overhaul_duration,
    overhaul_interval,
    offset,
    schematic_name: str,
    custom_input: Optional[dict] = None,
):
    """Push reliability and overhaul parameters into Aeros before a simulation.

    Steps:
      1. Collect the location tags of the schematic's ``RegularNode``
         equipment plus every ``AerosEquipmentGroup`` name.
      2. Fetch their current node records from Aeros and their reliability
         data from the reliability service.
      3. Write the overhaul interval/offset into the Aeros database.
      4. Fill each node record with distribution parameters and send the
         batch to Aeros through ``update_node``.

    Args:
        db_session: Session for the application database.
        aeros_db_session: Session passed to ``update_oh_interval_offset``.
        project_name: Aeros project to update.
        simulation_duration: Not used by this function.
        overhaul_duration: Written to every node as ``ohDisP1`` (unit code
            ``"UHour"``).
        overhaul_interval: Forwarded to ``update_oh_interval_offset``.
        offset: Forwarded to ``update_oh_interval_offset`` as the overhaul offset.
        schematic_name: Name of the schematic whose nodes are updated.
        custom_input: Optional per-equipment overrides keyed by equipment
            name, each holding ``"mttr"`` and ``"failure_rate"``. An entry
            with either value falsy causes that equipment to be skipped.

    Returns:
        A dict keyed by equipment name with the ``mttr``, ``distribution``,
        ``beta``, ``eta``, ``parameters`` and ``oh_duration`` that were applied.
    """
    log.info("Updating equipment for simulation")

    aeros_schematic = await get_aeros_schematic_by_name(
        db_session=db_session, schematic_name=schematic_name
    )

    equipments = (
        Select(AerosEquipment)
        .where(AerosEquipment.location_tag.isnot(None))
        .join(AerosEquipment.aeros_node)
        .filter(
            and_(
                AerosNode.aeros_schematic_id == aeros_schematic.id,
                AerosNode.node_type == "RegularNode",
            )
        )
    )
    rbd_group = Select(AerosEquipmentGroup)

    print("Getting equipment nodes")

    equipment_nodes = await db_session.execute(equipments)
    rbd_group_nodes = await db_session.execute(rbd_group)

    group_nodes = [group.group_name for group in rbd_group_nodes.scalars().all()]
    reg_nodes = [node.location_tag for node in equipment_nodes.scalars().all()]
    reg_nodes.extend(group_nodes)

    nodes_data = await get_aeros_equipment_by_location_tag(
        location_tag=reg_nodes, project_name=project_name
    )

    def load_json_file(filename: str):
        """Load a JSON file and return its content."""
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError as e:
            raise FileNotFoundError(f"JSON file not found: {filename}") from e
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in file {filename}: {e}") from e

    non_repairable = await db_session.execute(Select(ReliabilityPredictNonRepairable))
    non_repairable_location_tags = [tag.location_tag for tag in non_repairable.scalars().all()]

    print("Getting reliability data")
    # get_asset_batch() is synchronous (requests-based); run it in a worker
    # thread so the event loop is not blocked while the service responds.
    reliability_data = await asyncio.to_thread(
        get_asset_batch, reg_nodes, non_repairable_location_tags, RELIABILITY_SERVICE_API
    )

    reqNodeInputs = []
    results = {}

    trip_eq = load_json_file("src/aeros_equipment/trip_eq.json")

    print("Updating Overhaul Offset & Overhaul Interval")
    await update_oh_interval_offset(
        aeros_db_session=aeros_db_session,
        project_name=project_name,
        overhaul_interval=overhaul_interval,
        overhaul_offset=offset,
    )

    for eq in nodes_data:
        try:
            name = eq["equipmentName"]

            # Each equipment name is processed once.
            if name in results:
                continue

            reliability = reliability_data.get(name, {})

            if custom_input and name in custom_input:
                custom_param = custom_input[name]

                # An incomplete override drops the equipment from the update.
                if not custom_param["mttr"] or not custom_param["failure_rate"]:
                    continue

                eq["cmDisP1"] = custom_param["mttr"]
                eq["relDisType"] = "Fixed"
                eq["relDisP1"] = float(custom_param["failure_rate"])
                eq["relDisP2"] = 0
                eq["ohDisP1"] = overhaul_duration
                eq["ohDisUnitCode"] = "UHour"

                reqNodeInputs.append(eq)
                results[name] = {
                    "mttr": eq["cmDisP1"],
                    "distribution": eq["relDisType"],
                    "beta": eq["relDisP1"],
                    "eta": 0,
                    "parameters": {},
                    "oh_duration": overhaul_duration,
                }
                continue

            eq["cmDisP1"] = reliability.get("cmDisP1", 0)
            eq["relDisType"] = reliability.get("relDisType", "Fixed")
            eq["relDisP1"] = reliability.get("relDisP1", 0)
            # Author's note (translated): check equipment with "TRIP" in its
            # work orders; if present use the reliability parameters,
            # otherwise MTBF = duration.
            # Equipment listed in trip_eq keeps the second reliability
            # parameter from the service; all others get the fixed 150000.
            eq["relDisP2"] = reliability.get("relDisP2", 0) if name in trip_eq else 150000
            eq["ohDisP1"] = overhaul_duration
            eq["ohDisUnitCode"] = "UHour"

            reqNodeInputs.append(eq)
            results[name] = {
                "mttr": eq["cmDisP1"],
                "distribution": eq["relDisType"],
                "beta": eq["relDisP1"],
                "eta": eq["relDisP2"],
                # NOTE(review): this reads "parameters" from the Aeros node
                # record, not from the reliability data - confirm intent.
                "parameters": eq.get("parameters", {}),
                "oh_duration": overhaul_duration,
            }
        except Exception as e:
            # .get() keeps the handler itself from failing when the record
            # has no "equipmentName".
            print(f"Error fetching data for {eq.get('equipmentName')}: {e}")
            # Add equipment with default values
            reqNodeInputs.append(eq)

    print("Updating equipment for simulation")
    await update_node(db_session=db_session, equipment_nodes=reqNodeInputs, project_name=project_name)
    print("Updated equipment for simulation")

    return results
|
|
|
|
|
|
# Optimized individual fetch functions
|
|
async def get_equipment_mttr(*, location_tag: str, client: httpx.AsyncClient) -> float:
    """
    Fetch the MTTR of one equipment from the reliability service.

    Returns the ``hours`` field of the response's ``data`` object, or 0 when
    the field is missing, the service answers with a non-200 status, or the
    request raises.
    """
    endpoint = f"{RELIABILITY_SERVICE_API}/asset/mttr/{location_tag}"
    try:
        reply = await client.get(endpoint)

        if reply.status_code != 200:
            log.warning(f"MTTR API returned status {reply.status_code} for {location_tag}")
            return 0

        body = reply.json()
        mttr_section = body.get("data", {})
        return mttr_section.get("hours", 0)
    except Exception as exc:
        log.error(f"Error getting MTTR for {location_tag}: {str(exc)}")
        return 0
|
|
|
|
async def get_equipment_reliability_parameter(*, location_tag: str, client: httpx.AsyncClient):
    """
    Get reliability parameters for a single equipment using the provided client.

    Args:
        location_tag: Equipment location tag placed in the request URL.
        client: HTTP client used to perform the GET request.

    Returns:
        A dict with ``distribution``, ``beta`` and ``eta``. Missing values
        fall back to ``""`` / ``0``; a non-200 status or any exception yields
        ``{"distribution": "", "beta": 0, "eta": 0}``.
    """
    fallback = {"distribution": "", "beta": 0, "eta": 0}
    reliability_url = f"{RELIABILITY_SERVICE_API}/reliability/{location_tag}/current"
    try:
        response = await client.get(reliability_url)
        if response.status_code != 200:
            log.warning(f"Reliability API returned status {response.status_code} for {location_tag}")
            return fallback

        data = response.json().get("data") or {}
        # Default to an empty mapping: a default of 0 has no .get(), which
        # discarded the distribution whenever "parameters" was absent.
        parameters = data.get("parameters") or {}
        return {
            "distribution": data.get("distribution", ""),
            "beta": parameters.get("beta", 0),
            "eta": parameters.get("eta", 0),
        }
    except Exception as e:
        log.error(f"Error getting reliability parameters for {location_tag}: {str(e)}")
        return fallback
|