from typing import Optional, List from dataclasses import dataclass from sqlalchemy import Delete, Select import httpx from src.auth.service import CurrentUser from src.contribution_util import calculate_contribution, calculate_contribution_accurate from src.database.core import DbSession, CollectorDbSession from datetime import datetime, timedelta import random from .utils import generate_down_periods from src.overhaul_scope.service import get as get_overhaul from bisect import bisect_left from collections import defaultdict import asyncio from .schema import AssetWeight,MaintenanceScenario,OptimizationResult from src.overhaul_activity.service import get_standard_scope_by_session_id RBD_SERVICE_API = "http://192.168.1.82:8000/rbd" client = httpx.AsyncClient(timeout=300.0) async def run_rbd_simulation(*, sim_hours: int, token): sim_data = { "SimulationName": "Simulation OH Reliability Target", "SchematicName": "- TJB - Unit 3 -", "SimSeed": 1, "SimDuration": sim_hours, "DurationUnit": "UHour", } headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } rbd_simulation_url = f"{RBD_SERVICE_API}/aeros/simulation/run" async with httpx.AsyncClient(timeout=300.0) as client: response = await client.post(rbd_simulation_url, json=sim_data, headers=headers) response.raise_for_status() return response.json() async def get_simulation_results(*, simulation_id: str, token: str): headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } calc_result_url = f"{RBD_SERVICE_API}/aeros/simulation/result/calc/{simulation_id}?nodetype=RegularNode" # plot_result_url = f"{RBD_SERVICE_API}/aeros/simulation/result/plot/{simulation_id}?nodetype=RegularNode" calc_plant_result = f"{RBD_SERVICE_API}/aeros/simulation/result/calc/{simulation_id}/plant" async with httpx.AsyncClient(timeout=300.0) as client: calc_task = client.get(calc_result_url, headers=headers) # plot_task = client.get(plot_result_url, headers=headers) plant_task = client.get(calc_plant_result, headers=headers) # Run all three requests concurrently calc_response, plant_response = await asyncio.gather(calc_task, plant_task) calc_response.raise_for_status() # plot_response.raise_for_status() plant_response.raise_for_status() calc_data = calc_response.json()["data"] # plot_data = plot_response.json()["data"] plant_data = plant_response.json()["data"] return { "calc_result": calc_data, # "plot_result": plot_data, "plant_result": plant_data } # def calculate_asset_weights(plant_result, eq_results): # """ # Calculate each asset's contribution weight to plant EAF # Based on production capacity and criticality # """ # plant_ideal_production = plant_result.get('ideal_production', 0) # results = [] # for asset in eq_results: # # Weight based on production capacity # capacity_weight = asset.get('ideal_production', 0) / plant_ideal_production if plant_ideal_production > 0 else 0 # # Get asset EAF # asset_eaf = asset.get('eaf', 0) # # Calculate EAF impact (how much this asset affects plant EAF) # eaf_impact = (100 - asset_eaf) * capacity_weight # asset_weight = AssetWeight( # node=asset.get('aeros_node'), # capacity_weight=capacity_weight, # eaf_impact=eaf_impact, # eaf=asset_eaf, # num_of_failures=asset.get('num_events', 0), # ideal_production=asset.get('ideal_production', 0), # downtime_hours=asset # ) # results.append(asset_weight) # return results def calculate_asset_eaf_contributions(plant_result, eq_results, standard_scope, eaf_gap): """ Calculate each asset's contribution to plant EAF with realistic improvement potential. Higher contribution = more impact on improving plant EAF """ # Convert EAF gap from percentage to fraction if needed # Assuming eaf_gap is a percentage (e.g., 1.0 for 1%), convert to fraction eaf_gap_fraction = eaf_gap / 100.0 if eaf_gap > 1.0 else eaf_gap results = [] filtered_assets = [] # To track assets that were filtered out # # Get availabilities and calculate importance # availabilities = {asset.get('aeros_node').get('node_name'): asset.get('availability') # for asset in eq_results} # importance_results = calculate_contribution_accurate(availabilities, "src/calculation_target_reliability/result.json") # Define realistic thresholds MIN_BIRNBAUM_IMPORTANCE = 0.0005 # Filter out components with very low impact REALISTIC_MAX_AVAILABILITY = 0.995 # 99.5% practical maximum MIN_IMPROVEMENT_PERCENT = 0.05 # Minimum improvement to consider (0.5%) min_improvement_fraction = MIN_IMPROVEMENT_PERCENT / 100.0 for asset in eq_results: asset_name = asset.get('aeros_node').get('node_name') # Skip if not in standard scope if asset_name not in standard_scope: continue birnbaum = asset.get('contribution') current_availability = asset.get('availability') # Calculate required improvement required_impr = 0.1 * birnbaum # # CHECK FILTERS - Is this asset worth considering? # filter_reason = None # # Filter 1: Is the component important enough? # if birnbaum < MIN_BIRNBAUM_IMPORTANCE: # filter_reason = f"Low importance (Birnbaum: {birnbaum:.4f} < {MIN_BIRNBAUM_IMPORTANCE})" # # Filter 2: Would improvement exceed realistic maximum? # elif (current_availability + required_impr) > REALISTIC_MAX_AVAILABILITY: # filter_reason = f"Exceeds realistic maximum ({current_availability + required_impr:.3f} > {REALISTIC_MAX_AVAILABILITY})" # # Filter 3: Is the improvement too small to be worthwhile? # elif required_impr < min_improvement_fraction: # filter_reason = f"Improvement too small ({required_impr*100:.2f}% < {MIN_IMPROVEMENT_PERCENT}%)" # # If filtered, add to filtered list and skip # if filter_reason: # filtered_assets.append({ # 'asset': asset_name, # 'reason': filter_reason, # 'birnbaum': birnbaum, # 'current_availability': current_availability, # 'required_improvement': required_impr # }) # continue # If it passed all filters, include it in results contribution = AssetWeight( node=asset.get('aeros_node'), availability=current_availability, contribution=birnbaum, required_improvement=required_impr, num_of_failures=asset.get('num_events', 0), down_time=asset.get('total_downtime') ) results.append(contribution) # raise Exception(filtered_assets) # Sort by contribution (Birnbaum Importance) - most critical first results.sort(key=lambda x: x.contribution, reverse=True) return results def project_eaf_improvement(asset: AssetWeight, improvement_factor: float = 0.3) -> float: """ Project EAF improvement after maintenance This is a simplified model - you should replace with your actual prediction logic """ current_downtime_pct = 100 - asset.eaf # Assume maintenance reduces downtime by improvement_factor improved_downtime_pct = current_downtime_pct * (1 - improvement_factor) projected_eaf = 100 - improved_downtime_pct return min(projected_eaf, 99.9) # Cap at 99.9% async def identify_worst_eaf_contributors(*, simulation_result, target_eaf: float, db_session: DbSession, oh_session_id: str, collector_db: CollectorDbSession, simulation_id: str): """ Identify equipment that contributes most to plant EAF reduction Args: simulation_result: Dictionary containing calc_result and plant_result target_eaf: Target plant EAF percentage (for gap calculation) Returns: OptimizationResult with asset contributions ranked by impact """ # Calculate current plant EAF and asset contributions calc_result = simulation_result['calc_result'] plant_result = simulation_result['plant_result'] # plot_result = simulation_result['plot_result'] # Get equipment results from calc_result eq_results = calc_result if isinstance(calc_result, list) else [calc_result] current_plant_eaf = plant_result.get("eaf", 0) eaf_gap = (target_eaf - current_plant_eaf)/100.0 # # Verify our calculation by summing contributions # total_calculated_downtime = sum(contrib.eaf_impact for contrib in asset_contributions) # calculated_plant_eaf = 100 - total_calculated_downtime standard_scope = await get_standard_scope_by_session_id( db_session=db_session, overhaul_session_id=oh_session_id, collector_db=collector_db ) standard_scope_location_tags = [tag.location_tag for tag in standard_scope] asset_contributions = calculate_asset_eaf_contributions(plant_result, eq_results, standard_scope_location_tags, eaf_gap=eaf_gap) project_eaf_improvement = 0.0 selected_eq = [] for asset in asset_contributions: if (project_eaf_improvement + asset.required_improvement) <= eaf_gap: selected_eq.append(asset) project_eaf_improvement += asset.required_improvement else: break # optimization_success = current_plant_eaf + project_eaf_improvement >= target_eaf return OptimizationResult( current_plant_eaf=current_plant_eaf, target_plant_eaf=target_eaf, eaf_gap=eaf_gap, asset_contributions=selected_eq, optimization_success=True, simulation_id=simulation_id ) # def optimize_maintenance_priority(*, simulation_result, target_eaf: float): # """ # Optimize maintenance priorities to achieve target plant EAF # Args: # simulation_result: Dictionary containing calc_result and plant_result # target_eaf: Target plant EAF percentage # Returns: # OptimizationResult with recommendations # """ # # Calculate current plant EAF and asset weights # calc_result = simulation_result['calc_result'] # plant_result = simulation_result['plant_result'] # # Get equipment results from calc_result # eq_results = calc_result if isinstance(calc_result, list) else [calc_result] # equipment_eaf_weights = calculate_asset_weights(plant_result, eq_results) # current_plant_eaf = plant_result.get("eaf", 0) # if current_plant_eaf >= target_eaf: # return OptimizationResult( # current_plant_eaf=current_plant_eaf, # target_plant_eaf=target_eaf, # eaf_gap=0.0, # recommended_assets=[], # projected_plant_eaf=current_plant_eaf, # optimization_success=True # ) # # Create maintenance scenarios for all assets # scenarios = [] # for asset in equipment_eaf_weights: # if asset.eaf < 99.9: # Only consider assets with improvement potential # projected_eaf = project_eaf_improvement(asset) # eaf_improvement = projected_eaf - asset.eaf # # Calculate plant-level benefit (how much this asset improvement affects plant EAF) # plant_benefit = eaf_improvement * asset.capacity_weight # # Priority score combines multiple factors # priority_score = ( # plant_benefit * 0.8 + # Plant impact (50%) # (100 - asset.eaf) * asset.capacity_weight * 0.2 # Current performance gap (30%) # # asset.num_of_failures * asset.capacity_weight * 0.1 # Failure frequency (20%) # ) # scenario = MaintenanceScenario( # location_tag=asset.node['node_name'], # Placeholder - replace with actual name # current_eaf=asset.eaf, # projected_eaf_improvement=eaf_improvement, # priority_score=priority_score, # plant_level_benefit=plant_benefit, # capacity_weight=asset.capacity_weight # ) # scenarios.append(scenario) # # Sort by priority score (highest first) # scenarios.sort(key=lambda x: x.priority_score, reverse=True) # # Select assets for maintenance to achieve target EAF # selected_scenarios = [] # cumulative_benefit = 0.0 # eaf_gap = target_eaf - current_plant_eaf # for scenario in scenarios: # if cumulative_benefit < eaf_gap: # selected_scenarios.append(scenario) # cumulative_benefit += scenario.plant_level_benefit # if cumulative_benefit >= eaf_gap: # break # projected_plant_eaf = current_plant_eaf + cumulative_benefit # optimization_success = projected_plant_eaf >= target_eaf # return OptimizationResult( # current_plant_eaf=current_plant_eaf, # target_plant_eaf=target_eaf, # eaf_gap=eaf_gap, # recommended_assets=selected_scenarios, # projected_plant_eaf=projected_plant_eaf, # optimization_success=optimization_success # )