You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

389 lines
12 KiB
Python

from typing import defaultdict, List, Optional, Union
from uuid import UUID
import logging
import httpx
from fastapi import HTTPException, status
from sqlalchemy import Delete, Select, func, desc, and_
from sqlalchemy.orm import selectinload
from src.auth.service import CurrentUser
from src.config import AEROS_BASE_URL, DEFAULT_PROJECT_NAME
from src.database.core import DbSession
from src.database.service import search_filter_sort_paginate
from .model import AerosEquipment, AerosEquipmentDetail, MasterEquipment
from .schema import EquipmentConfiguration
from src.aeros_project.model import AerosProject
from src.aeros_simulation.model import AerosNode
import asyncio
import re
import requests
import json
from src.utils import save_to_pastebin
import pandas as pd
from src.aeros_simulation.service import get_aeros_schematic_by_name
# Shared async HTTP client used by the functions in this module to call the
# AEROS service; every request made through it uses a 300-second timeout.
client = httpx.AsyncClient(timeout=300.0)
# No name is passed to getLogger(), so this is the root logger.
log = logging.getLogger()
async def get_project(*, db_session: DbSession):
    """Return the most recently updated ``AerosProject`` row.

    Rows are ordered by ``updated_at`` descending and limited to one, so the
    result is the latest project or ``None`` when the table is empty.
    """
    newest_first = desc(AerosProject.updated_at)
    latest_project_query = Select(AerosProject).order_by(newest_first).limit(1)
    rows = await db_session.execute(latest_project_query)
    return rows.scalar_one_or_none()
async def get_all(*, common):
    """Return one page of equipment combined with its AEROS distribution data.

    Args:
        common: Keyword arguments forwarded to ``search_filter_sort_paginate``;
            must contain ``"db_session"``.

    Returns:
        The paginated result dict, with ``"items"`` replaced by a list of
        ``{"AerosData": <AEROS entry>, "MasterData": <AerosEquipment or None>}``
        — one entry per element of the AEROS response.

    Raises:
        HTTPException: 404 when no ``AerosProject`` exists, 500 when the AEROS
            request fails or its response cannot be decoded.
    """
    query = Select(AerosEquipment).options(
        selectinload(AerosEquipment.master_equipment)
    )
    results = await search_filter_sort_paginate(model=query, **common)

    page_items = results["items"]
    reg_nodes = [node.node_name for node in page_items]
    # Index the page by node name so AEROS entries can be matched back to rows.
    equipment_data = {node.node_name: node for node in page_items}

    project = await get_project(db_session=common["db_session"])
    if project is None:
        # Without this guard ``project.project_name`` raises AttributeError.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="AerosProject not found"
        )

    update_node_req = {
        "projectName": project.project_name,
        "equipmentNames": reg_nodes,
    }

    try:
        response = await client.post(
            f"{AEROS_BASE_URL}/api/UpdateDisParams/GetUpdatedNodeDistributions",
            json=update_node_req,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        aeros_entries = response.json()
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e

    results["items"] = [
        {"AerosData": entry, "MasterData": equipment_data.get(entry["equipmentName"])}
        for entry in aeros_entries
    ]
    return results
async def get_equipment_by_location_tag(*, db_session: DbSession, location_tag: str):
    """Return the ``AerosEquipment`` row with the given location tag.

    The row's ``aeros_equipment_details`` relationship is loaded eagerly.

    Args:
        db_session: Session used to run the query.
        location_tag: Value matched against ``AerosEquipment.location_tag``.

    Returns:
        The first matching ``AerosEquipment``, or ``None`` when nothing matches.
    """
    query = (
        Select(AerosEquipment)
        .where(AerosEquipment.location_tag == location_tag)
        .options(selectinload(AerosEquipment.aeros_equipment_details))
    )
    # The query used to be built and then dropped, so the function always
    # returned None; execute it and hand back the row.
    result = await db_session.execute(query)
    return result.scalar()
async def get_by_id(*, db_session: DbSession, id: UUID):
    """Return an equipment row together with its AEROS distribution data.

    Args:
        db_session: Session used to load the row.
        id: Primary key of the ``AerosEquipment`` to load.

    Returns:
        A ``(AerosEquipment, aeros_response_json)`` tuple.

    Raises:
        HTTPException: 404 when the row does not exist, 500 when the AEROS
            request fails or its response cannot be decoded.
    """
    query = (
        Select(AerosEquipment)
        .where(AerosEquipment.id == id)
        .options(selectinload(AerosEquipment.aeros_equipment_details))
    )
    result = await db_session.execute(query)
    aeros_equipment = result.scalar()

    if not aeros_equipment:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="AerosEquipment not found"
        )

    aeros_node_req = {
        # NOTE(review): the project name is hard-coded here while the other
        # callers of this endpoint pass the active project — confirm intent.
        "projectName": "ParallelNode",
        # Every other call to GetUpdatedNodeDistributions in this module sends
        # the list under "equipmentNames"; the singular key was inconsistent.
        "equipmentNames": [aeros_equipment.node_name],
    }

    try:
        response = await client.post(
            f"{AEROS_BASE_URL}/api/UpdateDisParams/GetUpdatedNodeDistributions",
            json=aeros_node_req,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        aeros_equipment_data = response.json()
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e

    return aeros_equipment, aeros_equipment_data
async def get_aeros_equipment_by_location_tag(*, location_tag: Union[str, List[str]], project_name):
    """Fetch AEROS distribution data for one or more equipment names.

    Args:
        location_tag: A single equipment name or a list of them; a lone string
            is wrapped in a list before being sent.
        project_name: Value sent as ``projectName`` in the request.

    Returns:
        The AEROS response entries as a list of dicts with exact duplicate
        entries removed.

    Raises:
        HTTPException: 404 when AEROS returns an empty payload, 500 for any
            other failure (request error, undecodable response, ...).
    """
    equipment_names = [location_tag] if isinstance(location_tag, str) else location_tag

    aeros_node_req = {
        "projectName": project_name,
        "equipmentNames": equipment_names,
    }

    try:
        response = await client.post(
            f"{AEROS_BASE_URL}/api/UpdateDisParams/GetUpdatedNodeDistributions",
            json=aeros_node_req,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        aeros_equipment_data = response.json()

        if not aeros_equipment_data:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="AerosEquipment not found"
            )

        return pd.DataFrame(aeros_equipment_data).drop_duplicates().to_dict("records")
    except HTTPException:
        # Let the deliberate 404 above through; the generic handler below
        # would otherwise re-wrap it as a 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e
async def update_node(
    *, db_session: DbSession, equipment_nodes: List[dict], project_name: str
):
    """Send equipment distribution inputs to AEROS and return its JSON reply.

    ``equipment_nodes`` is posted as ``regNodeInputs`` for ``project_name``.
    ``db_session`` is accepted but not used by this function. Any failure is
    re-raised as an ``HTTPException`` with status 500.
    """
    body = {"projectName": project_name, "regNodeInputs": equipment_nodes}
    json_headers = {"Content-Type": "application/json"}

    try:
        endpoint = f"{AEROS_BASE_URL}/api/UpdateDisParams/UpdateEquipmentDistributions"
        reply = await client.post(endpoint, json=body, headers=json_headers)
        reply.raise_for_status()
        return reply.json()
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)
        )
async def save_default_equipment(*, db_session: DbSession, project_name: str):
    """Rebuild the ``AerosEquipment`` table from the AEROS project.

    The location tags of all ``MasterEquipment`` rows that have one are sent to
    AEROS; each entry in the response becomes a new ``AerosEquipment`` row whose
    ``node_name`` and ``location_tag`` are both the entry's ``equipmentName``.

    Args:
        db_session: Session used for the read, the delete and the inserts.
        project_name: Value sent as ``projectName`` in the AEROS request.

    Returns:
        The decoded AEROS response.

    Raises:
        HTTPException: 500 when the AEROS request or the database write fails.
    """
    tagged_equipment = Select(MasterEquipment).where(
        MasterEquipment.location_tag.isnot(None)
    )
    master_rows = await db_session.execute(tagged_equipment)
    reg_nodes = [row.location_tag for row in master_rows.scalars().all()]

    update_node_req = {"projectName": project_name, "equipmentNames": reg_nodes}

    # Fetch first: the existing rows are only removed once replacement data is
    # in hand, so a failed AEROS call no longer leaves a pending delete behind.
    try:
        response = await client.post(
            f"{AEROS_BASE_URL}/api/UpdateDisParams/GetUpdatedNodeDistributions",
            json=update_node_req,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        results = response.json()
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e

    try:
        # Replace the table contents: delete old rows, insert the new ones,
        # commit both together.
        await db_session.execute(Delete(AerosEquipment))
        db_session.add_all(
            [
                AerosEquipment(
                    node_name=equipment["equipmentName"],
                    location_tag=equipment["equipmentName"],
                )
                for equipment in results
            ]
        )
        await db_session.commit()
    except Exception as e:
        # Undo the delete / partial inserts so the session stays usable.
        await db_session.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e

    return results
def get_asset_batch(location_tags: List[str],
                    base_url: str = "http://192.168.1.82:8000",
                    timeout: int = 30) -> dict:
    """
    Get asset batch data using a GET request with a JSON body.

    This is a synchronous (blocking) call made with ``requests``.

    Args:
        location_tags: List of location tag strings
        base_url: Base URL for the API
        timeout: Request timeout in seconds

    Returns:
        Mapping of location tag to a dict with the keys ``cmDisP1`` (the
        item's ``mttr``), ``relDisType`` (its ``distribution``), ``relDisP1``
        (``parameters.beta``) and ``relDisP2`` (``parameters.eta``). Missing
        ``mttr`` / ``beta`` / ``eta`` values default to 0.

    Raises:
        HTTPException: 500 when the request fails, the response cannot be
            decoded, or an item lacks ``location_tag`` or ``distribution``.
    """
    # Imported from its real home: the module-level import takes the name from
    # ``typing``, which does not officially export it.
    from collections import defaultdict

    url = f"{base_url}/reliability/asset/batch"
    results = defaultdict(dict)

    payload = {"location_tags": location_tags}
    headers = {'Content-Type': 'application/json'}

    try:
        response = requests.get(
            url,
            json=payload,
            headers=headers,
            timeout=timeout
        )
        response.raise_for_status()  # Raises an HTTPError for bad responses
        data = response.json()

        for item in data.get("data") or []:
            # A missing or null "parameters" previously fell back to 0, and
            # calling .get() on that int failed the whole batch.
            parameters = item.get("parameters") or {}
            entry = results[item["location_tag"]]
            entry["cmDisP1"] = item.get("mttr", 0)
            entry["relDisType"] = item["distribution"]
            entry["relDisP1"] = parameters.get("beta", 0)
            entry["relDisP2"] = parameters.get("eta", 0)

        return results
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to fetch asset batch data " + str(e)
        ) from e
async def update_equipment_for_simulation(
    *,
    db_session: DbSession,
    project_name: str,
    schematic_name: str,
    custom_input: Optional[dict] = None,
):
    """Push reliability parameters for a schematic's equipment to AEROS.

    Loads the location tags of the ``AerosEquipment`` rows joined to
    ``RegularNode`` nodes of the named schematic, fetches their current AEROS
    entries and reliability data, fills in ``cmDisP1`` / ``relDisType`` /
    ``relDisP1`` / ``relDisP2`` on each entry and sends them all through
    ``update_node``.

    Args:
        db_session: Session used for the equipment query.
        project_name: AEROS project the equipment belongs to.
        schematic_name: Name passed to ``get_aeros_schematic_by_name``.
        custom_input: Optional mapping of equipment name to a value; listed
            equipment gets distribution ``"Fixed"`` with that value (converted
            to ``float``) as ``relDisP1`` and 0 as ``relDisP2``. All other
            equipment gets ``"NHPPTTFF"`` with the fetched parameters.

    Returns:
        Dict mapping equipment name to ``{"mttr", "distribution", "beta",
        "eta"}`` for every entry whose parameters were applied.
    """
    log.info("Updating equipment for simulation")
    aeros_schematic = await get_aeros_schematic_by_name(
        db_session=db_session, schematic_name=schematic_name
    )

    equipments = (
        Select(AerosEquipment)
        .where(AerosEquipment.location_tag.isnot(None))
        .join(AerosEquipment.aeros_node)
        .filter(
            and_(
                AerosNode.aeros_schematic_id == aeros_schematic.id,
                AerosNode.node_type == "RegularNode",
            )
        )
    )

    log.info("Getting equipment nodes")
    equipment_nodes = await db_session.execute(equipments)
    reg_nodes = [node.location_tag for node in equipment_nodes.scalars().all()]

    nodes_data = await get_aeros_equipment_by_location_tag(
        location_tag=reg_nodes, project_name=project_name
    )

    log.info("Getting reliability data")
    # get_asset_batch is a blocking ``requests`` call; run it in a worker
    # thread so the event loop is not stalled while it waits.
    reliability_data = await asyncio.to_thread(get_asset_batch, reg_nodes)

    req_node_inputs = []
    results = {}

    for eq in nodes_data:
        try:
            name = eq["equipmentName"]
            reliability = reliability_data.get(name, {})
            mttr = reliability.get("cmDisP1", 0)

            if custom_input and name in custom_input:
                dist_type = "Fixed"
                p1 = float(custom_input[name])
                p2 = 0
            else:
                dist_type = "NHPPTTFF"
                p1 = reliability.get("relDisP1", 0)
                p2 = reliability.get("relDisP2", 0)
        except Exception as e:
            # Best effort: the entry is still sent, untouched, with whatever
            # values AEROS returned. Values are computed before any mutation,
            # so a failure never leaves ``eq`` half-updated.
            log.warning(
                "Error fetching data for %s: %s", eq.get("equipmentName"), e
            )
        else:
            eq["cmDisP1"] = mttr
            eq["relDisType"] = dist_type
            eq["relDisP1"] = p1
            eq["relDisP2"] = p2
            results[name] = {
                "mttr": mttr,
                "distribution": dist_type,
                "beta": p1,
                "eta": p2,
            }

        req_node_inputs.append(eq)

    log.info("Updating equipment for simulation")
    await update_node(
        db_session=db_session, equipment_nodes=req_node_inputs, project_name=project_name
    )
    log.info("Updated equipment for simulation")

    return results
# Optimized individual fetch functions
async def get_equipment_mttr(*, location_tag: str, client: httpx.AsyncClient) -> float:
    """
    Fetch the MTTR of one equipment through the supplied HTTP client.

    Returns the ``data.hours`` value of a 200 response, defaulting to 0 when
    either key is absent. A non-200 status is logged as a warning and any
    exception as an error; both cases return 0.
    """
    mttr_url = f"http://192.168.1.82:8000/reliability/asset/mttr/{location_tag}"

    try:
        reply = await client.get(mttr_url)

        # Guard clause: anything but 200 is reported and treated as "no data".
        if reply.status_code != 200:
            log.warning(f"MTTR API returned status {reply.status_code} for {location_tag}")
            return 0

        body = reply.json()
        data_section = body.get("data", {})
        return data_section.get("hours", 0)
    except Exception as exc:
        log.error(f"Error getting MTTR for {location_tag}: {str(exc)}")
        return 0
async def get_equipment_reliability_parameter(*, location_tag: str, client: httpx.AsyncClient):
    """
    Get reliability parameters for a single equipment using provided client

    Args:
        location_tag: Equipment tag inserted into the request URL.
        client: Async HTTP client used to perform the GET request.

    Returns:
        Dict with ``distribution`` (default ``""``), ``beta`` and ``eta``
        (default 0) read from the response's ``data`` / ``data.parameters``.
        A non-200 status or any exception yields the all-default dict.
    """
    defaults = {"distribution": "", "beta": 0, "eta": 0}
    reliability_url = f"http://192.168.1.82:8000/reliability/reliability/{location_tag}/current"

    try:
        response = await client.get(reliability_url)

        if response.status_code != 200:
            log.warning(f"Reliability API returned status {response.status_code} for {location_tag}")
            return dict(defaults)

        reliability_data = response.json()
        data = reliability_data.get("data") or {}
        # A missing or null "parameters" used to fall back to 0; .get() on that
        # int raised, which discarded the distribution and logged an error.
        parameters = data.get("parameters") or {}

        return {
            "distribution": data.get("distribution", ""),
            "beta": parameters.get("beta", 0),
            "eta": parameters.get("eta", 0),
        }
    except Exception as e:
        log.error(f"Error getting reliability parameters for {location_tag}: {str(e)}")
        return dict(defaults)