|
|
|
|
@ -1,12 +1,10 @@
|
|
|
|
|
import json
|
|
|
|
|
import logging
|
|
|
|
|
from typing import Dict, Union
|
|
|
|
|
from typing import Dict, Union, Tuple
|
|
|
|
|
from decimal import Decimal, getcontext
|
|
|
|
|
import math
|
|
|
|
|
|
|
|
|
|
from src.aeros_simulation.service import get_simulation_with_calc_result
|
|
|
|
|
from src.database.core import DbSession
|
|
|
|
|
from src.logging import setup_logging
|
|
|
|
|
|
|
|
|
|
# Set high precision for decimal calculations
|
|
|
|
|
getcontext().prec = 50
|
|
|
|
|
@ -14,7 +12,6 @@ getcontext().prec = 50
|
|
|
|
|
Structure = Union[str, Dict[str, list]]
|
|
|
|
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
setup_logging(logger=log)
|
|
|
|
|
|
|
|
|
|
def prod(iterable):
|
|
|
|
|
"""Compute product of all elements in iterable with high precision."""
|
|
|
|
|
@ -28,7 +25,7 @@ def prod(iterable):
|
|
|
|
|
|
|
|
|
|
def system_availability(structure: Structure, availabilities: Dict[str, float]) -> float:
|
|
|
|
|
"""Recursively compute system availability with precise calculations."""
|
|
|
|
|
if isinstance(structure, str): # base case
|
|
|
|
|
if isinstance(structure, str): # base case - component
|
|
|
|
|
if structure not in availabilities:
|
|
|
|
|
raise ValueError(f"Component '{structure}' not found in availabilities")
|
|
|
|
|
return float(Decimal(str(availabilities[structure])))
|
|
|
|
|
@ -39,7 +36,7 @@ def system_availability(structure: Structure, availabilities: Dict[str, float])
|
|
|
|
|
if not components: # Handle empty series
|
|
|
|
|
return 1.0
|
|
|
|
|
|
|
|
|
|
# Convert to Decimal for precise calculation
|
|
|
|
|
# Series: A_system = A1 * A2 * ... * An
|
|
|
|
|
product = Decimal('1.0')
|
|
|
|
|
for s in components:
|
|
|
|
|
availability = system_availability(s, availabilities)
|
|
|
|
|
@ -51,7 +48,7 @@ def system_availability(structure: Structure, availabilities: Dict[str, float])
|
|
|
|
|
if not components: # Handle empty parallel
|
|
|
|
|
return 0.0
|
|
|
|
|
|
|
|
|
|
# Calculate 1 - prod(1 - availability) with high precision
|
|
|
|
|
# Parallel: A_system = 1 - (1-A1) * (1-A2) * ... * (1-An)
|
|
|
|
|
product = Decimal('1.0')
|
|
|
|
|
for s in components:
|
|
|
|
|
availability = system_availability(s, availabilities)
|
|
|
|
|
@ -62,8 +59,9 @@ def system_availability(structure: Structure, availabilities: Dict[str, float])
|
|
|
|
|
return float(result)
|
|
|
|
|
|
|
|
|
|
elif "parallel_no_redundancy" in structure:
|
|
|
|
|
# Load sharing - system availability is minimum of components
|
|
|
|
|
components = structure["parallel_no_redundancy"]
|
|
|
|
|
if not components: # Handle empty parallel_no_redundancy
|
|
|
|
|
if not components:
|
|
|
|
|
return 0.0
|
|
|
|
|
|
|
|
|
|
availabilities_list = [system_availability(s, availabilities) for s in components]
|
|
|
|
|
@ -88,140 +86,207 @@ def get_all_components(structure: Structure) -> set:
|
|
|
|
|
return components
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def compute_contributions(structure: Structure, availabilities: Dict[str, float]):
|
|
|
|
|
def birnbaum_importance(structure: Structure, availabilities: Dict[str, float], component: str) -> float:
|
|
|
|
|
"""
|
|
|
|
|
Compute contributions of each component to system availability.
|
|
|
|
|
Handles nested structures recursively with precise calculations.
|
|
|
|
|
Calculate Birnbaum importance for a component.
|
|
|
|
|
|
|
|
|
|
Birnbaum importance = ∂A_system/∂A_component
|
|
|
|
|
|
|
|
|
|
This is approximated as:
|
|
|
|
|
I_B = A_system(A_i=1) - A_system(A_i=0)
|
|
|
|
|
|
|
|
|
|
Where A_i is the availability of component i.
|
|
|
|
|
"""
|
|
|
|
|
# Convert all availabilities to precise decimals for internal calculations
|
|
|
|
|
precise_availabilities = {k: float(Decimal(str(v))) for k, v in availabilities.items()}
|
|
|
|
|
# Create copies for calculations
|
|
|
|
|
avail_up = availabilities.copy()
|
|
|
|
|
avail_down = availabilities.copy()
|
|
|
|
|
|
|
|
|
|
# Validate inputs
|
|
|
|
|
all_components = get_all_components(structure)
|
|
|
|
|
missing_components = all_components - set(precise_availabilities.keys())
|
|
|
|
|
if missing_components:
|
|
|
|
|
missing_availabilities = {node: 1.0 for node in missing_components} # Changed from 100 to 1.0 (assuming availability is 0-1 range)
|
|
|
|
|
log.warning(f"Missing Component: {missing_components}")
|
|
|
|
|
precise_availabilities.update(missing_availabilities)
|
|
|
|
|
# Set component availability to 1 (perfect)
|
|
|
|
|
avail_up[component] = 1.0
|
|
|
|
|
|
|
|
|
|
baseline = system_availability(structure, precise_availabilities)
|
|
|
|
|
deltas = {c: Decimal('0.0') for c in precise_availabilities}
|
|
|
|
|
# Set component availability to 0 (failed)
|
|
|
|
|
avail_down[component] = 0.0
|
|
|
|
|
|
|
|
|
|
# Calculate system availability in both cases
|
|
|
|
|
system_up = system_availability(structure, avail_up)
|
|
|
|
|
system_down = system_availability(structure, avail_down)
|
|
|
|
|
|
|
|
|
|
# Birnbaum importance is the difference
|
|
|
|
|
return system_up - system_down
|
|
|
|
|
|
|
|
|
|
def force_component_down(substructure: Structure, component: str, avail_copy: Dict[str, float]):
|
|
|
|
|
"""Recursively set a component's availability to 0 in the structure."""
|
|
|
|
|
if isinstance(substructure, str):
|
|
|
|
|
if substructure == component:
|
|
|
|
|
avail_copy[substructure] = 0.0
|
|
|
|
|
elif isinstance(substructure, dict):
|
|
|
|
|
for component_list in substructure.values():
|
|
|
|
|
for sub_component in component_list:
|
|
|
|
|
force_component_down(sub_component, component, avail_copy)
|
|
|
|
|
|
|
|
|
|
def recurse_contributions(substructure: Structure, avail: Dict[str, float], weight: Decimal):
|
|
|
|
|
"""
|
|
|
|
|
Recursively assign contributions with precise arithmetic.
|
|
|
|
|
weight = fraction of total system availability change attributed to this substructure.
|
|
|
|
|
"""
|
|
|
|
|
if isinstance(substructure, str):
|
|
|
|
|
deltas[substructure] += weight
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if isinstance(substructure, dict):
|
|
|
|
|
if "series" in substructure:
|
|
|
|
|
# In series, each component contributes equally to the weight
|
|
|
|
|
for s in substructure["series"]:
|
|
|
|
|
recurse_contributions(s, avail, weight)
|
|
|
|
|
|
|
|
|
|
elif "parallel" in substructure:
|
|
|
|
|
# For parallel systems, calculate delta contribution for each branch
|
|
|
|
|
for s in substructure["parallel"]:
|
|
|
|
|
av_copy = avail.copy()
|
|
|
|
|
|
|
|
|
|
# Get all components in this branch and force them down
|
|
|
|
|
branch_components = get_all_components(s)
|
|
|
|
|
for comp in branch_components:
|
|
|
|
|
force_component_down(substructure, comp, av_copy)
|
|
|
|
|
|
|
|
|
|
reduced = system_availability(substructure, av_copy)
|
|
|
|
|
delta = Decimal(str(baseline)) - Decimal(str(reduced))
|
|
|
|
|
|
|
|
|
|
if delta > 0: # Only distribute weight if there's actual contribution
|
|
|
|
|
baseline_decimal = Decimal(str(baseline))
|
|
|
|
|
contribution_weight = (delta * weight / baseline_decimal) if baseline_decimal > 0 else Decimal('0')
|
|
|
|
|
recurse_contributions(s, avail, contribution_weight)
|
|
|
|
|
|
|
|
|
|
elif "parallel_no_redundancy" in substructure:
|
|
|
|
|
components = substructure["parallel_no_redundancy"]
|
|
|
|
|
component_values = []
|
|
|
|
|
|
|
|
|
|
for s in components:
|
|
|
|
|
component_values.append((s, system_availability(s, avail)))
|
|
|
|
|
|
|
|
|
|
# Find minimum availability with proper float comparison
|
|
|
|
|
min_val = min(val for _, val in component_values)
|
|
|
|
|
|
|
|
|
|
# Use small epsilon for float comparison
|
|
|
|
|
epsilon = Decimal('1e-10')
|
|
|
|
|
weakest_components = [comp for comp, val in component_values
|
|
|
|
|
if abs(Decimal(str(val)) - Decimal(str(min_val))) < epsilon]
|
|
|
|
|
|
|
|
|
|
# Distribute weight equally among weakest components
|
|
|
|
|
if weakest_components:
|
|
|
|
|
weight_per_weakest = weight / Decimal(str(len(weakest_components)))
|
|
|
|
|
else:
|
|
|
|
|
weight_per_weakest = Decimal('0')
|
|
|
|
|
|
|
|
|
|
for s in components:
|
|
|
|
|
if s in weakest_components:
|
|
|
|
|
recurse_contributions(s, avail, weight_per_weakest)
|
|
|
|
|
else:
|
|
|
|
|
recurse_contributions(s, avail, Decimal('0.0'))
|
|
|
|
|
|
|
|
|
|
# Start recursion with full baseline weight
|
|
|
|
|
if baseline > 0:
|
|
|
|
|
recurse_contributions(structure, precise_availabilities, Decimal(str(baseline)))
|
|
|
|
|
|
|
|
|
|
# Convert deltas back to float for final calculations
|
|
|
|
|
deltas_float = {k: float(v) for k, v in deltas.items()}
|
|
|
|
|
total_delta = sum(deltas_float.values())
|
|
|
|
|
|
|
|
|
|
# Calculate percentages with precision handling
|
|
|
|
|
if total_delta > 1e-10: # Use small epsilon instead of direct comparison to 0
|
|
|
|
|
total_delta_decimal = Decimal(str(total_delta))
|
|
|
|
|
percentages = {c: float(Decimal(str(d)) / total_delta_decimal) for c, d in deltas_float.items()}
|
|
|
|
|
else:
|
|
|
|
|
percentages = {c: 0.0 for c in deltas_float.keys()}
|
|
|
|
|
|
|
|
|
|
# Ensure percentages sum to 1.0 (or very close) by normalizing
|
|
|
|
|
percentage_sum = sum(percentages.values())
|
|
|
|
|
if percentage_sum > 1e-10: # Only normalize if sum is meaningful
|
|
|
|
|
percentages = {k: v / percentage_sum for k, v in percentages.items()}
|
|
|
|
|
def criticality_importance(structure: Structure, availabilities: Dict[str, float], component: str) -> float:
|
|
|
|
|
"""
|
|
|
|
|
Calculate Criticality importance for a component.
|
|
|
|
|
|
|
|
|
|
Criticality importance = Birnbaum importance * (1 - A_component) / (1 - A_system)
|
|
|
|
|
|
|
|
|
|
This represents the probability that component i is critical to system failure.
|
|
|
|
|
"""
|
|
|
|
|
birnbaum = birnbaum_importance(structure, availabilities, component)
|
|
|
|
|
system_avail = system_availability(structure, availabilities)
|
|
|
|
|
component_avail = availabilities[component]
|
|
|
|
|
|
|
|
|
|
if system_avail >= 1.0: # Perfect system
|
|
|
|
|
return 0.0
|
|
|
|
|
|
|
|
|
|
criticality = birnbaum * (1.0 - component_avail) / (1.0 - system_avail)
|
|
|
|
|
return criticality
|
|
|
|
|
|
|
|
|
|
return baseline, deltas_float, percentages
|
|
|
|
|
|
|
|
|
|
def fussell_vesely_importance(structure: Structure, availabilities: Dict[str, float], component: str) -> float:
|
|
|
|
|
"""
|
|
|
|
|
Calculate Fussell-Vesely importance for a component.
|
|
|
|
|
|
|
|
|
|
FV importance = (A_system - A_system(A_i=0)) / A_system
|
|
|
|
|
|
|
|
|
|
This represents the fractional decrease in system availability when component i fails.
|
|
|
|
|
"""
|
|
|
|
|
system_avail = system_availability(structure, availabilities)
|
|
|
|
|
|
|
|
|
|
if system_avail <= 0.0:
|
|
|
|
|
return 0.0
|
|
|
|
|
|
|
|
|
|
# Calculate system availability with component failed
|
|
|
|
|
avail_down = availabilities.copy()
|
|
|
|
|
avail_down[component] = 0.0
|
|
|
|
|
system_down = system_availability(structure, avail_down)
|
|
|
|
|
|
|
|
|
|
fv = (system_avail - system_down) / system_avail
|
|
|
|
|
return fv
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_contribution(availabilities):
|
|
|
|
|
"""Calculate contribution with input validation and normalization."""
|
|
|
|
|
with open('src/aeros_contribution/result.json', 'r') as model_file:
|
|
|
|
|
structure = json.load(model_file)
|
|
|
|
|
def compute_all_importance_measures(structure: Structure, availabilities: Dict[str, float]) -> Dict[str, Dict[str, float]]:
|
|
|
|
|
"""
|
|
|
|
|
Compute all importance measures for each component.
|
|
|
|
|
|
|
|
|
|
# Normalize availabilities to 0-1 range if they appear to be percentages
|
|
|
|
|
Returns:
|
|
|
|
|
Dictionary with component names as keys and importance measures as values
|
|
|
|
|
"""
|
|
|
|
|
# Normalize availabilities to 0-1 range if needed
|
|
|
|
|
normalized_availabilities = {}
|
|
|
|
|
for k, v in availabilities.items():
|
|
|
|
|
if v > 1.0:
|
|
|
|
|
# Assume it's a percentage, convert to fraction
|
|
|
|
|
normalized_availabilities[k] = v / 100.0
|
|
|
|
|
else:
|
|
|
|
|
normalized_availabilities[k] = v
|
|
|
|
|
|
|
|
|
|
# Clamp to valid range [0, 1]
|
|
|
|
|
normalized_availabilities[k] = max(0.0, min(1.0, normalized_availabilities[k]))
|
|
|
|
|
|
|
|
|
|
baseline, deltas, percentages = compute_contributions(structure, normalized_availabilities)
|
|
|
|
|
# Get all components in the system
|
|
|
|
|
all_components = get_all_components(structure)
|
|
|
|
|
|
|
|
|
|
# Check for missing components
|
|
|
|
|
missing_components = all_components - set(normalized_availabilities.keys())
|
|
|
|
|
if missing_components:
|
|
|
|
|
log.warning(f"Missing components (assuming 100% availability): {missing_components}")
|
|
|
|
|
for comp in missing_components:
|
|
|
|
|
normalized_availabilities[comp] = 1.0
|
|
|
|
|
|
|
|
|
|
# Calculate system baseline availability
|
|
|
|
|
system_avail = system_availability(structure, normalized_availabilities)
|
|
|
|
|
|
|
|
|
|
# Calculate importance measures for each component
|
|
|
|
|
results = {}
|
|
|
|
|
total_birnbaum = 0.0
|
|
|
|
|
|
|
|
|
|
for component in all_components:
|
|
|
|
|
if component in normalized_availabilities:
|
|
|
|
|
birnbaum = birnbaum_importance(structure, normalized_availabilities, component)
|
|
|
|
|
criticality = criticality_importance(structure, normalized_availabilities, component)
|
|
|
|
|
fv = fussell_vesely_importance(structure, normalized_availabilities, component)
|
|
|
|
|
|
|
|
|
|
results[component] = {
|
|
|
|
|
'birnbaum_importance': birnbaum,
|
|
|
|
|
'criticality_importance': criticality,
|
|
|
|
|
'fussell_vesely_importance': fv,
|
|
|
|
|
'component_availability': normalized_availabilities[component]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
total_birnbaum += birnbaum
|
|
|
|
|
|
|
|
|
|
# Calculate contribution percentages based on Birnbaum importance
|
|
|
|
|
if total_birnbaum > 0:
|
|
|
|
|
for component in results:
|
|
|
|
|
contribution_pct = results[component]['birnbaum_importance'] / total_birnbaum
|
|
|
|
|
results[component]['contribution_percentage'] = contribution_pct
|
|
|
|
|
else:
|
|
|
|
|
for component in results:
|
|
|
|
|
results[component]['contribution_percentage'] = 0.0
|
|
|
|
|
|
|
|
|
|
# Add system-level information
|
|
|
|
|
results['_system_info'] = {
|
|
|
|
|
'system_availability': system_avail,
|
|
|
|
|
'system_unavailability': 1.0 - system_avail,
|
|
|
|
|
'total_birnbaum_importance': total_birnbaum
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_contribution_accurate(availabilities: Dict[str, float], structure_file: str = 'src/aeros_contribution/result.json') -> Dict[str, Dict[str, float]]:
|
|
|
|
|
"""
|
|
|
|
|
Calculate component contributions using proper importance measures.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
availabilities: Dictionary of component availabilities
|
|
|
|
|
structure_file: Path to RBD structure JSON file
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Dictionary containing all importance measures and contributions
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
with open(structure_file, 'r') as model_file:
|
|
|
|
|
structure = json.load(model_file)
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
raise FileNotFoundError(f"Structure file not found: {structure_file}")
|
|
|
|
|
except json.JSONDecodeError:
|
|
|
|
|
raise ValueError(f"Invalid JSON in structure file: {structure_file}")
|
|
|
|
|
|
|
|
|
|
# Compute all importance measures
|
|
|
|
|
results = compute_all_importance_measures(structure, availabilities)
|
|
|
|
|
|
|
|
|
|
# Extract system information
|
|
|
|
|
system_info = results.pop('_system_info')
|
|
|
|
|
|
|
|
|
|
# Log results
|
|
|
|
|
log.info(f"System Availability: {system_info['system_availability']:.6f}")
|
|
|
|
|
log.info(f"System Unavailability: {system_info['system_unavailability']:.6f}")
|
|
|
|
|
|
|
|
|
|
# Sort components by Birnbaum importance (most critical first)
|
|
|
|
|
sorted_components = sorted(results.items(),
|
|
|
|
|
key=lambda x: x[1]['birnbaum_importance'],
|
|
|
|
|
reverse=True)
|
|
|
|
|
|
|
|
|
|
print("\n=== COMPONENT IMPORTANCE ANALYSIS ===")
|
|
|
|
|
print(f"System Availability: {system_info['system_availability']:.6f} ({system_info['system_availability']*100:.4f}%)")
|
|
|
|
|
print(f"System Unavailability: {system_info['system_unavailability']:.6f}")
|
|
|
|
|
print("\nComponent Rankings (by Birnbaum Importance):")
|
|
|
|
|
print(f"{'Component':<20} {'Availability':<12} {'Birnbaum':<12} {'Criticality':<12} {'F-V':<12} {'Contribution%':<12}")
|
|
|
|
|
print("-" * 92)
|
|
|
|
|
|
|
|
|
|
for component, measures in sorted_components:
|
|
|
|
|
print(f"{component:<20} {measures['component_availability']:<12.6f} "
|
|
|
|
|
f"{measures['birnbaum_importance']:<12.6f} {measures['criticality_importance']:<12.6f} "
|
|
|
|
|
f"{measures['fussell_vesely_importance']:<12.6f} {measures['contribution_percentage']*100:<12.4f}")
|
|
|
|
|
|
|
|
|
|
log.info(f"System EAF: {baseline:.10f}")
|
|
|
|
|
# Return results with system info included
|
|
|
|
|
# results['_system_info'] = system_info
|
|
|
|
|
|
|
|
|
|
return percentages
|
|
|
|
|
return results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Legacy function for backwards compatibility
|
|
|
|
|
def calculate_contribution(availabilities):
|
|
|
|
|
"""Legacy function - redirects to improved version."""
|
|
|
|
|
try:
|
|
|
|
|
return calculate_contribution_accurate(availabilities)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
log.error(f"Error in contribution calculation: {e}")
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def update_contribution_bulk_mappings(*, db_session, simulation_id):
|
|
|
|
|
@ -235,19 +300,22 @@ async def update_contribution_bulk_mappings(*, db_session, simulation_id):
|
|
|
|
|
# Ensure availability values are properly normalized
|
|
|
|
|
availabilities = {}
|
|
|
|
|
for calc in calc_results:
|
|
|
|
|
availability = calc.availability # Convert percentage to fraction
|
|
|
|
|
# Clamp to valid range and handle potential precision issues
|
|
|
|
|
availability = max(0.0, min(1.0, availability))
|
|
|
|
|
availability = calc.availability
|
|
|
|
|
availabilities[calc.aeros_node.node_name] = availability
|
|
|
|
|
|
|
|
|
|
contribution = calculate_contribution(availabilities)
|
|
|
|
|
importance = calculate_contribution(availabilities)
|
|
|
|
|
|
|
|
|
|
# Prepare bulk update data with rounded contributions to avoid precision issues in DB
|
|
|
|
|
for calc in calc_results:
|
|
|
|
|
contribution_value = contribution.get(calc.aeros_node.node_name, 0.0)
|
|
|
|
|
for calc in calc_results:
|
|
|
|
|
# Round to reasonable precision for database storage
|
|
|
|
|
calc.contribution = round(contribution_value, 10)
|
|
|
|
|
eq_importance = importance.get(calc.aeros_node.node_name, {})
|
|
|
|
|
|
|
|
|
|
if not eq_importance:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
calc.contribution = importance.get(calc.aeros_node.node_name).get('birnbaum_importance', 0)
|
|
|
|
|
calc.criticality = importance.get(calc.aeros_node.node_name).get('criticality_importance', 0)
|
|
|
|
|
|
|
|
|
|
await db_session.commit()
|
|
|
|
|
|
|
|
|
|
return contribution
|
|
|
|
|
return importance
|