import json import logging from typing import Dict, Union, Tuple from decimal import Decimal, getcontext import math from src.aeros_simulation.service import get_simulation_with_calc_result # Set high precision for decimal calculations getcontext().prec = 50 Structure = Union[str, Dict[str, list]] log = logging.getLogger(__name__) def prod(iterable): """Compute product of all elements in iterable with high precision.""" result = Decimal('1.0') for x in iterable: if isinstance(x, (int, float)): x = Decimal(str(x)) result *= x return float(result) def system_availability(structure: Structure, availabilities: Dict[str, float]) -> float: """Recursively compute system availability with precise calculations.""" if isinstance(structure, str): # base case - component if structure not in availabilities: raise ValueError(f"Component '{structure}' not found in availabilities") return float(Decimal(str(availabilities[structure]))) if isinstance(structure, dict): if "series" in structure: components = structure["series"] if not components: # Handle empty series return 1.0 # Series: A_system = A1 * A2 * ... * An product = Decimal('1.0') for s in components: availability = system_availability(s, availabilities) product *= Decimal(str(availability)) return float(product) elif "parallel" in structure: components = structure["parallel"] if not components: # Handle empty parallel return 0.0 # Parallel: A_system = 1 - (1-A1) * (1-A2) * ... * (1-An) product = Decimal('1.0') for s in components: availability = system_availability(s, availabilities) unavailability = Decimal('1.0') - Decimal(str(availability)) product *= unavailability result = Decimal('1.0') - product return float(result) elif "k_of_n" in structure: k = structure["k_of_n"]["k"] components = structure["k_of_n"]["components"] if not components: return 0.0 component_availabilities = [system_availability(s, availabilities) for s in components] return k_of_n_availability(component_availabilities, k) raise ValueError(f"Invalid structure definition: {structure}") from itertools import combinations from decimal import Decimal from math import comb def k_of_n_availability(availabilities: list[float], k: int) -> float: n = len(availabilities) total = Decimal('0.0') # Iterate over all combinations of components that can be working for j in range(k, n+1): for subset in combinations(range(n), j): prob = Decimal('1.0') for i in range(n): if i in subset: prob *= Decimal(str(availabilities[i])) else: prob *= (Decimal('1.0') - Decimal(str(availabilities[i]))) total += prob return float(total) def get_all_components(structure: Structure) -> set: """Extract all component names from a structure.""" components = set() def extract_components(substructure): if isinstance(substructure, str): components.add(substructure) elif isinstance(substructure, dict): for component_list in substructure.values(): for component in component_list: extract_components(component) extract_components(structure) return components def birnbaum_importance(structure: Structure, availabilities: Dict[str, float], component: str) -> float: """ Calculate Birnbaum importance for a component. Birnbaum importance = ∂A_system/∂A_component This is approximated as: I_B = A_system(A_i=1) - A_system(A_i=0) Where A_i is the availability of component i. """ # Create copies for calculations avail_up = availabilities.copy() avail_down = availabilities.copy() # Set component availability to 1 (perfect) avail_up[component] = 1.0 # Set component availability to 0 (failed) avail_down[component] = 0.0 # Calculate system availability in both cases system_up = system_availability(structure, avail_up) system_down = system_availability(structure, avail_down) # Birnbaum importance is the difference return system_up - system_down def criticality_importance(structure: Structure, availabilities: Dict[str, float], component: str) -> float: """ Calculate Criticality importance for a component. Criticality importance = Birnbaum importance * (1 - A_component) / (1 - A_system) This represents the probability that component i is critical to system failure. """ birnbaum = birnbaum_importance(structure, availabilities, component) system_avail = system_availability(structure, availabilities) component_avail = availabilities[component] if system_avail >= 1.0: # Perfect system return 0.0 criticality = birnbaum * (1.0 - component_avail) / (1.0 - system_avail) return criticality def fussell_vesely_importance(structure: Structure, availabilities: Dict[str, float], component: str) -> float: """ Calculate Fussell-Vesely importance for a component. FV importance = (A_system - A_system(A_i=0)) / A_system This represents the fractional decrease in system availability when component i fails. """ system_avail = system_availability(structure, availabilities) if system_avail <= 0.0: return 0.0 # Calculate system availability with component failed avail_down = availabilities.copy() avail_down[component] = 0.0 system_down = system_availability(structure, avail_down) fv = (system_avail - system_down) / system_avail return fv def compute_all_importance_measures(structure: Structure, availabilities: Dict[str, float]) -> Dict[str, Dict[str, float]]: """ Compute all importance measures for each component. Returns: Dictionary with component names as keys and importance measures as values """ # Normalize availabilities to 0-1 range if needed normalized_availabilities = {} for k, v in availabilities.items(): if v > 1.0: normalized_availabilities[k] = v / 100.0 else: normalized_availabilities[k] = v # Clamp to valid range [0, 1] normalized_availabilities[k] = max(0.0, min(1.0, normalized_availabilities[k])) # Get all components in the system all_components = get_all_components(structure) # Check for missing components missing_components = all_components - set(normalized_availabilities.keys()) if missing_components: log.warning(f"Missing components (assuming 100% availability): {missing_components}") for comp in missing_components: normalized_availabilities[comp] = 1.0 # Calculate system baseline availability system_avail = system_availability(structure, normalized_availabilities) # Calculate importance measures for each component results = {} total_birnbaum = 0.0 for component in all_components: if component in normalized_availabilities: birnbaum = birnbaum_importance(structure, normalized_availabilities, component) criticality = criticality_importance(structure, normalized_availabilities, component) fv = fussell_vesely_importance(structure, normalized_availabilities, component) results[component] = { 'birnbaum_importance': birnbaum, 'criticality_importance': criticality, 'fussell_vesely_importance': fv, 'component_availability': normalized_availabilities[component] } total_birnbaum += birnbaum # Calculate contribution percentages based on Birnbaum importance if total_birnbaum > 0: for component in results: contribution_pct = results[component]['birnbaum_importance'] / total_birnbaum results[component]['contribution_percentage'] = contribution_pct else: for component in results: results[component]['contribution_percentage'] = 0.0 # Add system-level information results['_system_info'] = { 'system_availability': system_avail, 'system_unavailability': 1.0 - system_avail, 'total_birnbaum_importance': total_birnbaum } return results def calculate_contribution_accurate(availabilities: Dict[str, float], structure_file: str = 'src/aeros_contribution/result.json') -> Dict[str, Dict[str, float]]: """ Calculate component contributions using proper importance measures. Args: availabilities: Dictionary of component availabilities structure_file: Path to RBD structure JSON file Returns: Dictionary containing all importance measures and contributions """ try: with open(structure_file, 'r') as model_file: structure = json.load(model_file) except FileNotFoundError: raise FileNotFoundError(f"Structure file not found: {structure_file}") except json.JSONDecodeError: raise ValueError(f"Invalid JSON in structure file: {structure_file}") # Compute all importance measures results = compute_all_importance_measures(structure, availabilities) # Extract system information system_info = results.pop('_system_info') # Log results log.info(f"System Availability: {system_info['system_availability']:.6f}") log.info(f"System Unavailability: {system_info['system_unavailability']:.6f}") # Sort components by Birnbaum importance (most critical first) sorted_components = sorted(results.items(), key=lambda x: x[1]['birnbaum_importance'], reverse=True) print("\n=== COMPONENT IMPORTANCE ANALYSIS ===") print(f"System Availability: {system_info['system_availability']:.6f} ({system_info['system_availability']*100:.4f}%)") print(f"System Unavailability: {system_info['system_unavailability']:.6f}") print("\nComponent Rankings (by Birnbaum Importance):") print(f"{'Component':<20} {'Availability':<12} {'Birnbaum':<12} {'Criticality':<12} {'F-V':<12} {'Contribution%':<12}") print("-" * 92) for component, measures in sorted_components: print(f"{component:<20} {measures['component_availability']:<12.6f} " f"{measures['birnbaum_importance']:<12.6f} {measures['criticality_importance']:<12.6f} " f"{measures['fussell_vesely_importance']:<12.6f} {measures['contribution_percentage']*100:<12.4f}") # Return results with system info included # results['_system_info'] = system_info return results # Legacy function for backwards compatibility def calculate_contribution(availabilities): """Legacy function - redirects to improved version.""" try: return calculate_contribution_accurate(availabilities) except Exception as e: log.error(f"Error in contribution calculation: {e}") raise async def update_contribution_bulk_mappings(*, db_session, simulation_id): """Update contribution mappings with precise calculations.""" calc_results = await get_simulation_with_calc_result( db_session=db_session, simulation_id=simulation_id, node_type="RegularNode" ) # Ensure availability values are properly normalized availabilities = {} for calc in calc_results: availability = calc.availability availabilities[calc.aeros_node.node_name] = availability importance = calculate_contribution(availabilities) # Prepare bulk update data with rounded contributions to avoid precision issues in DB for calc in calc_results: # Round to reasonable precision for database storage eq_importance = importance.get(calc.aeros_node.node_name, {}) if not eq_importance: continue calc.contribution = importance.get(calc.aeros_node.node_name).get('birnbaum_importance', 0) calc.criticality = importance.get(calc.aeros_node.node_name).get('criticality_importance', 0) calc.contribution_factor = importance.get(calc.aeros_node.node_name).get('fussell_vesely_importance', 0) await db_session.commit() return importance