feature/reliability_stat
Cizz22 4 months ago
parent a650b72c19
commit 1851673e72

@ -24,7 +24,7 @@ async def get_target_reliability(
"""Get all scope pagination."""
results = await get_simulation_results(
simulation_id = "f31103ef-1ac8-4c29-8f66-ea9ccf06bd87",
simulation_id = "efa8ef4c-0417-4d2d-95f3-41e4283737ab",
token=token
)

@ -6,6 +6,7 @@ from uuid import UUID
from sqlalchemy import Delete, Select
from src.auth.service import CurrentUser
from src.contribution_util import calculate_contribution_accurate
from src.database.core import CollectorDbSession, DbSession
# from src.scope_equipment.model import ScopeEquipment
# from src.scope_equipment.service import get_by_scope_name
@ -56,44 +57,48 @@ async def get_all_budget_constrains(
"location_tag": equipment.location_tag,
"name": equipment.equipment_name,
"total_cost": equipment.overhaul_cost + equipment.service_cost,
"eaf_contribution": equipments_eaf_contribution.get(equipment.location_tag, 0)
"eaf_contribution": equipments_eaf_contribution.get(equipment.location_tag, 0),
#'cost_benefit_ratio': (equipment.overhaul_cost + equipment.service_cost) / equipments_eaf_contribution.get(equipment.location_tag, 0) if equipments_eaf_contribution.get(equipment.location_tag, 0) > 0 else 0
}
for equipment in equipments
]
# Calculate composite priority score for fair sorting
max_cost = max(eq["total_cost"] for eq in result) if result else 1
result.sort(key=lambda x: x['eaf_contribution'], reverse=True)
for equipment in result:
# Normalize cost (0-1) - higher cost = higher priority
normalized_cost = equipment["total_cost"] / max_cost if max_cost > 0 else 0
# Composite score: 70% EAF contribution + 30% cost impact
# EAF contribution is already relative, so use directly
equipment["priority_score"] = (0.7 * equipment["eaf_contribution"]) + (0.3 * normalized_cost)
# Sort by composite priority score (highest to lowest)
result.sort(key=lambda x: x["priority_score"], reverse=True)
# Filter equipment up to threshold
cumulative_cost = 0
included_results = []
priority_list = []
total_cost = 0
remaining_budget = cost_threshold
for equipment in result:
cumulative_cost += equipment["total_cost"]
if cumulative_cost >= cost_threshold:
# # Normalize cost (0-1) - higher cost = higher priority
# normalized_cost = equipment["total_cost"] / max_cost if max_cost > 0 else 0
# # Composite score: 70% EAF contribution + 30% cost impact
# # EAF contribution is already relative, so use directly
# equipment["priority_score"] = (0.7 * equipment["eaf_contribution"]) + (0.3 * normalized_cost)
if equipment['total_cost'] <= remaining_budget:
# We can afford this improvement, so add it to the plan
priority_list.append(equipment)
total_cost += equipment['total_cost']
remaining_budget -= equipment['total_cost']
else:
# This candidate is too expensive for the remaining budget
# We break out of the loop. Since the list is sorted by ratio,
# anything after this is worse value and also won't fit.
# In a more complex solution, you might skip and keep looking for smaller items.
break
included_results.append(equipment)
# Remaining equipment goes into the consequence list
consequence_results = result[len(included_results):]
# Sort by composite priority score (highest to lowest)
# result.sort(key=lambda x: x["priority_score"], reverse=True)
selected_components = {item['location_tag'] for item in priority_list}
consequence_list = [candidate for candidate in result if candidate['location_tag'] not in selected_components]
consequence_list.sort(key=lambda x: x['eaf_contribution'], reverse=True)
priority_list.sort(key=lambda x: x['eaf_contribution'], reverse=True)
#Sort
consequence_results.sort(key=lambda x: x["eaf_contribution"], reverse=True)
included_results.sort(key=lambda x: x["eaf_contribution"], reverse=True)
return included_results, consequence_results
return priority_list, consequence_list
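Illustrative sketch of the budget split performed above (not part of the commit; names and numbers are made up): candidates sorted by EAF contribution are added while they still fit the remaining budget, the loop stops at the first item that does not fit, and everything not selected becomes the consequence list.
def split_by_budget(candidates, cost_threshold):
    """Greedy budget fill: highest EAF contribution first, stop at the first item that does not fit."""
    candidates = sorted(candidates, key=lambda x: x["eaf_contribution"], reverse=True)
    priority_list, remaining_budget = [], cost_threshold
    for item in candidates:
        if item["total_cost"] > remaining_budget:
            break  # list is sorted by value, so later items are worse value and may not fit either
        priority_list.append(item)
        remaining_budget -= item["total_cost"]
    selected = {item["location_tag"] for item in priority_list}
    consequence_list = [c for c in candidates if c["location_tag"] not in selected]
    return priority_list, consequence_list

# With a budget of 120 only "A" (cost 70) fits; "B" (cost 90) stops the loop,
# so "B" and "C" end up in the consequence list.
example = [
    {"location_tag": "A", "total_cost": 70, "eaf_contribution": 0.4},
    {"location_tag": "B", "total_cost": 90, "eaf_contribution": 0.3},
    {"location_tag": "C", "total_cost": 40, "eaf_contribution": 0.2},
]
priority, consequence = split_by_budget(example, cost_threshold=120)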
#
@ -104,6 +109,12 @@ def calculate_asset_eaf_contributions(plant_result, eq_results):
"""
results = defaultdict(float)
# availabilities = {asset.get('aeros_node').get('node_name'): asset.get('availability')
# for asset in eq_results}
# importance_results = calculate_contribution_accurate(availabilities, "src/calculation_target_reliability/result.json")
for asset in eq_results:
results[asset['aeros_node']['node_name']] = asset['contribution']

File diff suppressed because it is too large

@ -59,7 +59,7 @@ async def get_target_reliability(
# )
if not simulation_id:
simulation_id = "f31103ef-1ac8-4c29-8f66-ea9ccf06bd87"
simulation_id = "efa8ef4c-0417-4d2d-95f3-41e4283737ab"
results = await get_simulation_results(
simulation_id=simulation_id,

@ -33,8 +33,9 @@ class OverhaulRead(OverhaulBase):
class AssetWeight(OverhaulBase):
node: dict
availability:float
contribution: float
eaf_impact: float
required_improvement: float
num_of_failures: int
down_time: float

@ -3,6 +3,7 @@ from dataclasses import dataclass
from sqlalchemy import Delete, Select
import httpx
from src.auth.service import CurrentUser
from src.contribution_util import calculate_contribution, calculate_contribution_accurate
from src.database.core import DbSession, CollectorDbSession
from datetime import datetime, timedelta
import random
@ -47,9 +48,9 @@ async def get_simulation_results(*, simulation_id: str, token: str):
"Content-Type": "application/json"
}
calc_result_url = f"{RBD_SERVICE_API}/aeros/simulation/result/calc/{simulation_id}?nodetype=RegularNode"
calc_result_url = f"{RBD_SERVICE_API}/aeros/simulation/result/calc/default?nodetype=RegularNode"
# plot_result_url = f"{RBD_SERVICE_API}/aeros/simulation/result/plot/{simulation_id}?nodetype=RegularNode"
calc_plant_result = f"{RBD_SERVICE_API}/aeros/simulation/result/calc/{simulation_id}/plant"
calc_plant_result = f"{RBD_SERVICE_API}/aeros/simulation/result/calc/default/plant"
async with httpx.AsyncClient(timeout=300.0) as client:
calc_task = client.get(calc_result_url, headers=headers)
@ -104,43 +105,85 @@ async def get_simulation_results(*, simulation_id: str, token: str):
# return results
def calculate_asset_eaf_contributions(plant_result, eq_results, standard_scope):
def calculate_asset_eaf_contributions(plant_result, eq_results, standard_scope, eaf_gap):
"""
Calculate each asset's negative contribution to plant EAF
Higher contribution = more impact on reducing plant EAF
Calculate each asset's contribution to plant EAF with realistic improvement potential.
Higher contribution = more impact on improving plant EAF
"""
plant_production = plant_result.get('total_downtime', 0)
plant_eaf = plant_result.get('eaf')
# Convert EAF gap from percentage to fraction if needed
# Assuming eaf_gap is a percentage (e.g., 1.0 for 1%), convert to fraction
eaf_gap_fraction = eaf_gap / 100.0 if eaf_gap > 1.0 else eaf_gap
results = []
filtered_assets = [] # To track assets that were filtered out
# # Get availabilities and calculate importance
# availabilities = {asset.get('aeros_node').get('node_name'): asset.get('availability')
# for asset in eq_results}
# importance_results = calculate_contribution_accurate(availabilities, "src/calculation_target_reliability/result.json")
# Define realistic thresholds
MIN_BIRNBAUM_IMPORTANCE = 0.0005 # Filter out components with very low impact
REALISTIC_MAX_AVAILABILITY = 0.995 # 99.5% practical maximum
MIN_IMPROVEMENT_PERCENT = 0.05 # Minimum improvement to consider (0.05%)
min_improvement_fraction = MIN_IMPROVEMENT_PERCENT / 100.0
for asset in eq_results:
# # Weight based on production capacity (series only)
# capacity_weight = asset.get('total_downtime', 0) / plant_production if plant_production > 0 else 0
if asset.get('aeros_node').get('node_name') not in standard_scope:
asset_name = asset.get('aeros_node').get('node_name')
# Skip if not in standard scope
if asset_name not in standard_scope:
continue
# # Get asset EAF and downtime
plant_eaf_minus = 100 - plant_eaf
birnbaum = asset.get('contribution')
current_availability = asset.get('availability')
# Calculate required improvement
required_impr = eaf_gap_fraction / birnbaum if birnbaum > 0 else 0
# # CHECK FILTERS - Is this asset worth considering?
# filter_reason = None
# # Filter 1: Is the component important enough?
# if birnbaum < MIN_BIRNBAUM_IMPORTANCE:
# filter_reason = f"Low importance (Birnbaum: {birnbaum:.4f} < {MIN_BIRNBAUM_IMPORTANCE})"
# # Calculate this asset's contribution to plant EAF reduction
# # This is how much this asset alone reduces the overall plant EAF
eaf_contribution = plant_eaf_minus * asset.get("contribution")
# # Filter 2: Would improvement exceed realistic maximum?
# elif (current_availability + required_impr) > REALISTIC_MAX_AVAILABILITY:
# filter_reason = f"Exceeds realistic maximum ({current_availability + required_impr:.3f} > {REALISTIC_MAX_AVAILABILITY})"
# # Calculate actual downtime hours (if simulation hours available)
# sim_duration = plant_result.get('sim_duration', 8760) # Default to 1 year
# # Filter 3: Is the improvement too small to be worthwhile?
# elif required_impr < min_improvement_fraction:
# filter_reason = f"Improvement too small ({required_impr*100:.2f}% < {MIN_IMPROVEMENT_PERCENT}%)"
# # If filtered, add to filtered list and skip
# if filter_reason:
# filtered_assets.append({
# 'asset': asset_name,
# 'reason': filter_reason,
# 'birnbaum': birnbaum,
# 'current_availability': current_availability,
# 'required_improvement': required_impr
# })
# continue
# If it passed all filters, include it in results
contribution = AssetWeight(
node=asset.get('aeros_node'),
contribution=asset.get("contribution"),
eaf_impact=eaf_contribution,
availability=current_availability,
contribution=birnbaum,
required_improvement=required_impr,
num_of_failures=asset.get('num_events', 0),
down_time=asset.get('total_downtime')
)
results.append(contribution)
# raise Exception(filtered_assets)
# Sort by contribution (Birnbaum Importance) - most critical first
results.sort(key=lambda x: x.contribution, reverse=True)
return results
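Worked example for the required_improvement formula above (numbers are illustrative): closing a 1% plant EAF gap through a component whose Birnbaum importance is 0.2 requires raising that component's availability by 0.01 / 0.2 = 0.05, i.e. five percentage points.
eaf_gap_fraction = 0.01   # 1% plant EAF gap expressed as a fraction
birnbaum = 0.2            # component's Birnbaum importance
required_impr = eaf_gap_fraction / birnbaum if birnbaum > 0 else 0
assert abs(required_impr - 0.05) < 1e-12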
def project_eaf_improvement(asset: AssetWeight, improvement_factor: float = 0.3) -> float:
@ -173,9 +216,7 @@ async def identify_worst_eaf_contributors(*, simulation_result, target_eaf: floa
# Get equipment results from calc_result
eq_results = calc_result if isinstance(calc_result, list) else [calc_result]
current_plant_eaf = plant_result.get("eaf", 0)
eaf_gap = target_eaf - current_plant_eaf
eaf_gap = (target_eaf - current_plant_eaf)/100.0
# # Verify our calculation by summing contributions
# total_calculated_downtime = sum(contrib.eaf_impact for contrib in asset_contributions)
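The gap conversion above simply turns a percentage-point difference into a fraction; for example (illustrative values):
target_eaf, current_plant_eaf = 92.0, 90.5            # percent
eaf_gap = (target_eaf - current_plant_eaf) / 100.0    # 1.5 points -> 0.015
assert abs(eaf_gap - 0.015) < 1e-12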
@ -189,26 +230,26 @@ async def identify_worst_eaf_contributors(*, simulation_result, target_eaf: floa
standard_scope_location_tags = [tag.location_tag for tag in standard_scope]
asset_contributions = calculate_asset_eaf_contributions(plant_result, eq_results, standard_scope_location_tags)
asset_contributions = calculate_asset_eaf_contributions(plant_result, eq_results, standard_scope_location_tags, eaf_gap=eaf_gap)
project_eaf_improvement = 0.0
selected_eq = []
# project_eaf_improvement = 0.0
# selected_eq = []
for asset in asset_contributions:
if (project_eaf_improvement + asset.eaf_impact) <= eaf_gap:
selected_eq.append(asset)
project_eaf_improvement += asset.eaf_impact
else:
break
# for asset in asset_contributions:
# if (project_eaf_improvement + asset.eaf_impact) <= eaf_gap:
# selected_eq.append(asset)
# project_eaf_improvement += asset.eaf_impact
# else:
# break
optimization_success = current_plant_eaf + project_eaf_improvement >= target_eaf
# optimization_success = current_plant_eaf + project_eaf_improvement >= target_eaf
return OptimizationResult(
current_plant_eaf=current_plant_eaf + project_eaf_improvement,
current_plant_eaf=current_plant_eaf,
target_plant_eaf=target_eaf,
eaf_gap=eaf_gap,
asset_contributions=selected_eq,
optimization_success=optimization_success,
asset_contributions=asset_contributions,
optimization_success=True,
simulation_id=simulation_id
)

@ -1112,7 +1112,7 @@ class OptimumCostModel:
async def get_failures_prediction(self, simulation_id: str, location_tag):
# calc_result_url = f"{RBD_SERVICE_API}/aeros/simulation/result/calc/{simulation_id}?nodetype=RegularNode"
plot_result_url = f"{RBD_SERVICE_API}/aeros/simulation/result/plot/{simulation_id}/{location_tag}?use_location_tag=1"
plot_result_url = f"{RBD_SERVICE_API}/aeros/simulation/result/plot/default/{location_tag}?use_location_tag=1"
# calc_plant_result = f"{RBD_SERVICE_API}/aeros/simulation/result/calc/{simulation_id}/plant"

@ -0,0 +1,286 @@
import json
import logging
from typing import Dict, Union, Tuple
from decimal import Decimal, getcontext
import math
# Set high precision for decimal calculations
getcontext().prec = 50
Structure = Union[str, Dict[str, list]]
log = logging.getLogger(__name__)
def prod(iterable):
"""Compute product of all elements in iterable with high precision."""
result = Decimal('1.0')
for x in iterable:
if isinstance(x, (int, float)):
x = Decimal(str(x))
result *= x
return float(result)
def system_availability(structure: Structure, availabilities: Dict[str, float]) -> float:
"""Recursively compute system availability with precise calculations."""
if isinstance(structure, str): # base case - component
if structure not in availabilities:
raise ValueError(f"Component '{structure}' not found in availabilities")
return float(Decimal(str(availabilities[structure])))
if isinstance(structure, dict):
if "series" in structure:
components = structure["series"]
if not components: # Handle empty series
return 1.0
# Series: A_system = A1 * A2 * ... * An
product = Decimal('1.0')
for s in components:
availability = system_availability(s, availabilities)
product *= Decimal(str(availability))
return float(product)
elif "parallel" in structure:
components = structure["parallel"]
if not components: # Handle empty parallel
return 0.0
# Parallel: A_system = 1 - (1-A1) * (1-A2) * ... * (1-An)
product = Decimal('1.0')
for s in components:
availability = system_availability(s, availabilities)
unavailability = Decimal('1.0') - Decimal(str(availability))
product *= unavailability
result = Decimal('1.0') - product
return float(result)
elif "parallel_no_redundancy" in structure:
# Load sharing - system availability is minimum of components
components = structure["parallel_no_redundancy"]
if not components:
return 0.0
availabilities_list = [system_availability(s, availabilities) for s in components]
return min(availabilities_list)
raise ValueError(f"Invalid structure definition: {structure}")
def get_all_components(structure: Structure) -> set:
"""Extract all component names from a structure."""
components = set()
def extract_components(substructure):
if isinstance(substructure, str):
components.add(substructure)
elif isinstance(substructure, dict):
for component_list in substructure.values():
for component in component_list:
extract_components(component)
extract_components(structure)
return components
def birnbaum_importance(structure: Structure, availabilities: Dict[str, float], component: str) -> float:
"""
Calculate Birnbaum importance for a component.
Birnbaum importance = dA_system / dA_component
It is evaluated as:
I_B = A_system(A_i=1) - A_system(A_i=0)
Where A_i is the availability of component i.
"""
# Create copies for calculations
avail_up = availabilities.copy()
avail_down = availabilities.copy()
# Set component availability to 1 (perfect)
avail_up[component] = 1.0
# Set component availability to 0 (failed)
avail_down[component] = 0.0
# Calculate system availability in both cases
system_up = system_availability(structure, avail_up)
system_down = system_availability(structure, avail_down)
# Birnbaum importance is the difference
return system_up - system_down
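Two-component worked example for the function above (values are illustrative): in a series system, the Birnbaum importance of one component equals the product of the availabilities of all the others.
structure = {"series": ["c1", "c2"]}
# I_B(c1) = A_sys(A1=1) - A_sys(A1=0) = (1.0 * 0.95) - (0.0 * 0.95) = 0.95
print(birnbaum_importance(structure, {"c1": 0.90, "c2": 0.95}, "c1"))  # 0.95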
def criticality_importance(structure: Structure, availabilities: Dict[str, float], component: str) -> float:
"""
Calculate Criticality importance for a component.
Criticality importance = Birnbaum importance * (1 - A_component) / (1 - A_system)
This represents the probability that component i is critical to system failure.
"""
birnbaum = birnbaum_importance(structure, availabilities, component)
system_avail = system_availability(structure, availabilities)
component_avail = availabilities[component]
if system_avail >= 1.0: # Perfect system
return 0.0
criticality = birnbaum * (1.0 - component_avail) / (1.0 - system_avail)
return criticality
def fussell_vesely_importance(structure: Structure, availabilities: Dict[str, float], component: str) -> float:
"""
Calculate Fussell-Vesely importance for a component.
FV importance = (A_system - A_system(A_i=0)) / A_system
This represents the fractional decrease in system availability when component i fails.
"""
system_avail = system_availability(structure, availabilities)
if system_avail <= 0.0:
return 0.0
# Calculate system availability with component failed
avail_down = availabilities.copy()
avail_down[component] = 0.0
system_down = system_availability(structure, avail_down)
fv = (system_avail - system_down) / system_avail
return fv
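Continuing the same illustrative series system, the two measures come out as:
structure = {"series": ["c1", "c2"]}
avail = {"c1": 0.90, "c2": 0.95}
# A_sys = 0.855, I_B(c1) = 0.95
# criticality = 0.95 * (1 - 0.90) / (1 - 0.855) ~ 0.655
# Fussell-Vesely = (0.855 - 0.0) / 0.855 = 1.0 (the series system fails whenever c1 fails)
print(criticality_importance(structure, avail, "c1"))     # ~0.655
print(fussell_vesely_importance(structure, avail, "c1"))  # 1.0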
def compute_all_importance_measures(structure: Structure, availabilities: Dict[str, float]) -> Dict[str, Dict[str, float]]:
"""
Compute all importance measures for each component.
Returns:
Dictionary with component names as keys and importance measures as values
"""
# Normalize availabilities to 0-1 range if needed
normalized_availabilities = {}
for k, v in availabilities.items():
if v > 1.0:
normalized_availabilities[k] = v / 100.0
else:
normalized_availabilities[k] = v
# Clamp to valid range [0, 1]
normalized_availabilities[k] = max(0.0, min(1.0, normalized_availabilities[k]))
# Get all components in the system
all_components = get_all_components(structure)
# Check for missing components
missing_components = all_components - set(normalized_availabilities.keys())
if missing_components:
log.warning(f"Missing components (assuming 100% availability): {missing_components}")
for comp in missing_components:
normalized_availabilities[comp] = 1.0
# Calculate system baseline availability
system_avail = system_availability(structure, normalized_availabilities)
# Calculate importance measures for each component
results = {}
total_birnbaum = 0.0
for component in all_components:
if component in normalized_availabilities:
birnbaum = birnbaum_importance(structure, normalized_availabilities, component)
criticality = criticality_importance(structure, normalized_availabilities, component)
fv = fussell_vesely_importance(structure, normalized_availabilities, component)
results[component] = {
'birnbaum_importance': birnbaum,
'criticality_importance': criticality,
'fussell_vesely_importance': fv,
'component_availability': normalized_availabilities[component]
}
total_birnbaum += birnbaum
# Calculate contribution percentages based on Birnbaum importance
if total_birnbaum > 0:
for component in results:
contribution_pct = results[component]['birnbaum_importance'] / total_birnbaum
results[component]['contribution_percentage'] = contribution_pct
else:
for component in results:
results[component]['contribution_percentage'] = 0.0
# Add system-level information
results['_system_info'] = {
'system_availability': system_avail,
'system_unavailability': 1.0 - system_avail,
'total_birnbaum_importance': total_birnbaum
}
return results
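Illustrative call (structure and numbers are made up, not taken from this commit); the 98.0 entry exercises the percent-to-fraction normalization:
structure = {"series": ["Boiler", "Turbine", {"parallel": ["Pump A", "Pump B"]}]}
availabilities = {"Boiler": 98.0, "Turbine": 0.985, "Pump A": 0.95, "Pump B": 0.95}
measures = compute_all_importance_measures(structure, availabilities)
system_info = measures.pop("_system_info")
print(system_info["system_availability"])
for name, m in measures.items():
    print(name, m["birnbaum_importance"], m["contribution_percentage"])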
def calculate_contribution_accurate(availabilities: Dict[str, float], structure_file: str = 'src/overhaul/rbd_structure.json') -> Dict[str, Dict[str, float]]:
"""
Calculate component contributions using proper importance measures.
Args:
availabilities: Dictionary of component availabilities
structure_file: Path to RBD structure JSON file
Returns:
Dictionary containing all importance measures and contributions
"""
try:
with open(structure_file, 'r') as model_file:
structure = json.load(model_file)
except FileNotFoundError:
raise FileNotFoundError(f"Structure file not found: {structure_file}")
except json.JSONDecodeError:
raise ValueError(f"Invalid JSON in structure file: {structure_file}")
# Compute all importance measures
results = compute_all_importance_measures(structure, availabilities)
# Extract system information
system_info = results.pop('_system_info')
# Log results
log.info(f"System Availability: {system_info['system_availability']:.6f}")
log.info(f"System Unavailability: {system_info['system_unavailability']:.6f}")
# Sort components by Birnbaum importance (most critical first)
sorted_components = sorted(results.items(),
key=lambda x: x[1]['birnbaum_importance'],
reverse=True)
print("\n=== COMPONENT IMPORTANCE ANALYSIS ===")
print(f"System Availability: {system_info['system_availability']:.6f} ({system_info['system_availability']*100:.4f}%)")
print(f"System Unavailability: {system_info['system_unavailability']:.6f}")
print("\nComponent Rankings (by Birnbaum Importance):")
print(f"{'Component':<20} {'Availability':<12} {'Birnbaum':<12} {'Criticality':<12} {'F-V':<12} {'Contribution%':<12}")
print("-" * 92)
for component, measures in sorted_components:
print(f"{component:<20} {measures['component_availability']:<12.6f} "
f"{measures['birnbaum_importance']:<12.6f} {measures['criticality_importance']:<12.6f} "
f"{measures['fussell_vesely_importance']:<12.6f} {measures['contribution_percentage']*100:<12.4f}")
# Return results with system info included
# results['_system_info'] = system_info
return results
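Typical usage of calculate_contribution_accurate (the availabilities below are assumed; the structure file path is the default used above):
availabilities = {"Boiler": 0.97, "Turbine": 0.985, "Generator": 0.99}
contributions = calculate_contribution_accurate(
    availabilities,
    structure_file="src/overhaul/rbd_structure.json",
)
# Every component in the structure gets birnbaum_importance, criticality_importance,
# fussell_vesely_importance, component_availability and contribution_percentage;
# components missing from `availabilities` are assumed 100% available (with a warning).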
# Legacy function for backwards compatibility
def calculate_contribution(availabilities):
"""Legacy function - redirects to improved version."""
try:
return calculate_contribution_accurate(availabilities)
except Exception as e:
log.error(f"Error in contribution calculation: {e}")
raise

@ -0,0 +1,23 @@
{
"series": [
"Plant Control",
"SAC-IAC",
"SCR",
"Feedwater System",
"Boiler",
"Turbine",
"Generator",
"Condensate Water",
"Cooling Water",
"Air Flue Gas",
"Ash Handling",
"SPS",
"KLH",
"CHS",
"CL",
"Desalination",
"WTP",
"FGD",
"SSB"
]
}
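Since this structure is a single series block, the plant availability computed from it is simply the product of the member availabilities; a quick check (assumed values, importing from the contribution_util module added in this commit):
from src.contribution_util import system_availability

members = {"Boiler": 0.97, "Turbine": 0.985}   # assumed availabilities
structure = {"series": list(members)}
expected = 0.97 * 0.985
assert abs(system_availability(structure, members) - expected) < 1e-9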

@ -2,6 +2,7 @@ from typing import List
from fastapi import APIRouter, HTTPException, status
from src.auth.service import Token
from src.database.core import DbSession
from src.models import StandardResponse
from src.overhaul.service import (get_overhaul_critical_parts,
@ -17,11 +18,11 @@ router = APIRouter()
@router.get("", response_model=StandardResponse[OverhaulRead])
async def get_overhaul(db_session: DbSession):
async def get_overhaul(db_session: DbSession, token:Token):
"""Get all scope pagination."""
overview = await get_overhaul_overview(db_session=db_session)
schedules = await get_overhaul_schedules(db_session=db_session)
criticalParts = get_overhaul_critical_parts()
criticalParts = await get_overhaul_critical_parts(db_session=db_session, session_id=overview["overhaul"]["id"], token=token)
systemComponents = get_overhaul_system_components()
return StandardResponse(

@ -28,7 +28,7 @@ class OverhaulSystemComponents(OverhaulBase):
class OverhaulRead(OverhaulBase):
overview: Dict[str, Any]
criticalParts: List[str]
criticalParts: dict
schedules: List[ScopeRead]
systemComponents: Dict[str, Any]

@ -1,12 +1,18 @@
import asyncio
from typing import Optional
import httpx
from sqlalchemy import Delete, Select
from src.auth.service import CurrentUser
from src.calculation_target_reliability.service import RBD_SERVICE_API
from src.database.core import DbSession
from src.contribution_util import calculate_contribution
from src.overhaul_activity.service import get_standard_scope_by_session_id
from src.overhaul_scope.model import OverhaulScope
from src.overhaul_scope.service import get_all as get_all_session
from src.overhaul_scope.service import get_overview_overhaul
from src.standard_scope.service import get_by_oh_session_id
async def get_overhaul_overview(db_session: DbSession):
@ -16,17 +22,92 @@ async def get_overhaul_overview(db_session: DbSession):
return results
def get_overhaul_critical_parts():
async def get_simulation_results(*, simulation_id: str, token: str):
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
calc_result_url = f"{RBD_SERVICE_API}/aeros/simulation/result/calc/default?nodetype=RegularNode"
# plot_result_url = f"{RBD_SERVICE_API}/aeros/simulation/result/plot/{simulation_id}?nodetype=RegularNode"
calc_plant_result = f"{RBD_SERVICE_API}/aeros/simulation/result/calc/default/plant"
async with httpx.AsyncClient(timeout=300.0) as client:
calc_task = client.get(calc_result_url, headers=headers)
# plot_task = client.get(plot_result_url, headers=headers)
plant_task = client.get(calc_plant_result, headers=headers)
# Run both requests concurrently (the plot request is commented out above)
calc_response, plant_response = await asyncio.gather(calc_task, plant_task)
calc_response.raise_for_status()
# plot_response.raise_for_status()
plant_response.raise_for_status()
calc_data = calc_response.json()["data"]
# plot_data = plot_response.json()["data"]
plant_data = plant_response.json()["data"]
return {
"calc_result": calc_data,
# "plot_result": plot_data,
"plant_result": plant_data
}
async def get_overhaul_critical_parts(db_session, session_id, token):
"""Get all overhaul critical parts."""
return [
"Boiler feed pump",
"Boiler reheater system",
"Drum Level (Right) Root Valve A",
"BCP A Discharge Valve",
"BFPT A EXH Press HI Root VLV",
equipments, _ = await get_by_oh_session_id(
db_session=db_session,
oh_session_id=session_id,
)
criticality_simulation = await get_simulation_results(
simulation_id="efa8ef4c-0417-4d2d-95f3-41e4283737ab",
token=token
)
rbd_simulation = {asset['aeros_node']["node_name"]: {
"availability": asset["availability"],
"criticality": asset["criticality"]
} for asset in criticality_simulation["calc_result"]}
# Create the base result list
base_result = [
{
"id": equipment.id,
"location_tag": equipment.location_tag,
"name": equipment.master_equipment.name,
"matrix": rbd_simulation.get(equipment.location_tag)
} for equipment in equipments
]
# Filter out items without matrix data (where rbd_simulation.get() returned None)
filtered_result = [item for item in base_result if item["matrix"] is not None]
# Sort by availability (lowest to highest) and limit to 10
availability_result = sorted(
filtered_result,
key=lambda x: x["matrix"]["availability"]
)[:10]
# Sort by criticality (highest to lowest) and limit to 10
criticality_result = sorted(
filtered_result,
key=lambda x: x["matrix"]["criticality"],
reverse=True
)[:10]
return {
"availability" : availability_result,
"criticality": criticality_result
}
async def get_overhaul_schedules(*, db_session: DbSession):
"""Get all overhaul schedules."""
query = Select(OverhaulScope)
@ -76,7 +157,7 @@ def get_overhaul_system_components():
"total_uptime": 17419.000000000062,
},
"SCR": {
"availability": 0.9996577686516085,
"availability": 0.9196577686516085,
"efficiency": 0.9996690612127086,
"total_uptime": 17526.0,
},
@ -139,8 +220,22 @@ def get_overhaul_system_components():
"total_uptime": 17402.000000000062,
},
}
availabilities = {schematic: item['availability'] for schematic, item in powerplant_reliability.items() }
percentages = calculate_contribution(availabilities)
for schema, contribution in percentages.items():
powerplant_reliability[schema]["critical_contribution"] = contribution['criticality_importance']
# Sort the powerplant_reliability dictionary by critical_contribution in descending order
sorted_powerplant_reliability = dict(sorted(
powerplant_reliability.items(),
key=lambda x: x[1]["critical_contribution"],
reverse=True # Set to True for high to low sorting
))
return powerplant_reliability
return sorted_powerplant_reliability
return {
"HPT": {

@ -8,7 +8,7 @@ from src.database.core import DbSession
from src.database.service import search_filter_sort_paginate
from src.overhaul_activity.model import OverhaulActivity
from src.utils import time_now
from src.standard_scope.model import StandardScope, EquipmentOHHistory
from src.standard_scope.model import MasterEquipment, StandardScope, EquipmentOHHistory
from src.workscope_group.model import MasterActivity
from src.workscope_group_maintenance_type.model import WorkscopeOHType
from src.equipment_workscope_group.model import EquipmentWorkscopeGroup
@ -192,6 +192,7 @@ async def get_overview_overhaul(*, db_session: DbSession):
.join(StandardScope.workscope_groups)
.join(EquipmentWorkscopeGroup.workscope_group)
.join(MasterActivity.oh_types)
.join(MasterEquipment, StandardScope.location_tag == MasterEquipment.location_tag)
.filter(WorkscopeOHType.maintenance_type_id == selected_overhaul.maintenance_type_id)
.filter(
(StandardScope.is_alternating_oh == False) |

@ -74,6 +74,7 @@ async def get_by_oh_session_id(*, db_session: DbSession, oh_session_id: UUID):
.join(EquipmentWorkscopeGroup.workscope_group)
.join(MasterActivity.oh_types)
.join(WorkscopeOHType.oh_type)
.join(MasterEquipment, StandardScope.location_tag == MasterEquipment.location_tag)
.filter(MaintenanceType.name == overhaul.maintenance_type.name).filter(
(StandardScope.is_alternating_oh == False) |
(StandardScope.oh_history == None) |
