From 58d1ae57249a90f1b5f4a8911bc24fefb244d3d9 Mon Sep 17 00:00:00 2001 From: Cizz22 Date: Tue, 30 Sep 2025 12:27:44 +0700 Subject: [PATCH] fix --- src/calculation_budget_constrains/service.py | 200 +++++++----- src/calculation_target_reliability/schema.py | 3 +- src/calculation_target_reliability/service.py | 307 ++++++------------ 3 files changed, 219 insertions(+), 291 deletions(-) diff --git a/src/calculation_budget_constrains/service.py b/src/calculation_budget_constrains/service.py index 9aab57f..921b301 100644 --- a/src/calculation_budget_constrains/service.py +++ b/src/calculation_budget_constrains/service.py @@ -17,108 +17,152 @@ from src.overhaul_activity.service import get_all_by_session_id, get_standard_sc # ): # At the module level, add this dictionary to store persistent EAF values -_equipment_eaf_cache = {} +from collections import defaultdict +from uuid import UUID +from typing import List, Dict, Tuple + +from src.database.core import CollectorDbSession, DbSession +from src.overhaul_activity.service import get_standard_scope_by_session_id +from src.contribution_util import calculate_contribution_accurate -import random async def get_all_budget_constrains( - *, db_session: DbSession, collector_db: CollectorDbSession, session_id: UUID, simulation_result, cost_threshold: float = 100000000 -): - """Get all overhaul overview with EAF values that sum to 100%.""" - calc_result = simulation_result['calc_result'] - plant_result = simulation_result['plant_result'] - # plot_result = simulation_result['plot_result'] - + *, + db_session: DbSession, + collector_db: CollectorDbSession, + session_id: UUID, + simulation_result: Dict, + cost_threshold: float = 100_000_000, + use_optimal: bool = True, # default = optimal (knapsack) +) -> Tuple[List[Dict], List[Dict]]: + """ + Select equipment under budget constraint using contribution + cost efficiency. + Returns (priority_list, consequence_list). + """ + + calc_result = simulation_result["calc_result"] + plant_result = simulation_result["plant_result"] + equipments = await get_standard_scope_by_session_id( db_session=db_session, overhaul_session_id=session_id, - collector_db=collector_db + collector_db=collector_db, ) - # If no equipments found, return empty list if not equipments: return [], [] - # Create or retrieve persistent EAF values - global _equipment_eaf_cache - - # Generate EAF values for new equipment IDs - equipment_ids = [equipment.id for equipment in equipments] + # Flatten results eq_results = calc_result if isinstance(calc_result, list) else [calc_result] + # Calculate contribution map (node_name → contribution) equipments_eaf_contribution = calculate_asset_eaf_contributions( - plant_result=plant_result, - eq_results=eq_results, -) - - # Create result array of dictionaries - result = [ - { - "id": equipment.id, - "location_tag": equipment.location_tag, - "name": equipment.equipment_name, - "total_cost": equipment.overhaul_cost + equipment.service_cost, - "eaf_contribution": equipments_eaf_contribution.get(equipment.location_tag, 0), - #'cost_benefit_ratio': (equipment.overhaul_cost + equipment.service_cost) / equipments_eaf_contribution.get(equipment.location_tag, 0) if equipments_eaf_contribution.get(equipment.location_tag, 0) > 0 else 0 - } - for equipment in equipments - ] - - result.sort(key=lambda x: x['eaf_contribution'], reverse=True) - - priority_list = [] - total_cost = 0 - remaining_budget = cost_threshold - - for equipment in result: - # # Normalize cost (0-1) - higher cost = higher priority - # normalized_cost = equipment["total_cost"] / max_cost if max_cost > 0 else 0 - - # # Composite score: 70% EAF contribution + 30% cost impact - # # EAF contribution is already relative, so use directly - # equipment["priority_score"] = (0.7 * equipment["eaf_contribution"]) + (0.3 * normalized_cost) - - if equipment['total_cost'] <= remaining_budget: - # We can afford this improvement, so add it to the plan - priority_list.append(equipment) - total_cost += equipment['total_cost'] - remaining_budget -= equipment['total_cost'] - else: - # This candidate is too expensive for the remaining budget - # We break out of the loop. Since the list is sorted by ratio, - # anything after this is worse value and also won't fit. - # In a more complex solution, you might skip and keep looking for smaller items. - break - - # Sort by composite priority score (highest to lowest) - # result.sort(key=lambda x: x["priority_score"], reverse=True) - selected_components = {item['location_tag'] for item in priority_list} - consequence_list = [candidate for candidate in result if candidate['location_tag'] not in selected_components] - - consequence_list.sort(key=lambda x: x['eaf_contribution'], reverse=True) - priority_list.sort(key=lambda x: x['eaf_contribution'], reverse=True) + plant_result=plant_result, eq_results=eq_results + ) + # Build base result list + result = [] + for equipment in equipments: + contribution = equipments_eaf_contribution.get(equipment.location_tag, 0.0) + total_cost = (equipment.overhaul_cost or 0) + (equipment.service_cost or 0) + + result.append( + { + "id": equipment.id, + "location_tag": equipment.location_tag, + "name": equipment.equipment_name, + "total_cost": total_cost, + "eaf_contribution": contribution, + } + ) + + # Normalize contribution so sum = 1.0 + total_contrib = sum(item["eaf_contribution"] for item in result) or 1.0 + for item in result: + item["contribution_norm"] = item["eaf_contribution"] / total_contrib + + # Calculate efficiency and composite score + for item in result: + cost = item["total_cost"] or 1.0 + efficiency = item["contribution_norm"] / cost + item["priority_score"] = 0.7 * item["contribution_norm"] + 0.3 * efficiency + + # Choose method + if use_optimal: + priority_list, consequence_list = knapsack_selection(result, cost_threshold) + else: + priority_list, consequence_list = greedy_selection(result, cost_threshold) return priority_list, consequence_list -# def calculate_asset_eaf_contributions(plant_result, eq_results): """ - Calculate each asset's negative contribution to plant EAF - Higher contribution = more impact on reducing plant EAF + Calculate each asset's negative contribution to plant EAF. + Key assumption: eq_results have aeros_node.node_name matching equipment.location_tag. """ results = defaultdict(float) - - # availabilities = {asset.get('aeros_node').get('node_name'): asset.get('availability') - # for asset in eq_results} + for asset in eq_results: + node_name = asset.get("aeros_node", {}).get("node_name") + if node_name: + results[node_name] = asset.get("contribution", 0.0) + return results + - # importance_results = calculate_contribution_accurate(availabilities, "src/calculation_target_reliability/result.json") +def greedy_selection(equipments: List[dict], budget: float) -> Tuple[List[dict], List[dict]]: + """Greedy fallback: select items by score until budget is used.""" + # Sort by priority_score descending + equipments_sorted = sorted(equipments, key=lambda x: x["priority_score"], reverse=True) + total_cost = 0 + selected, excluded = [], [] + for eq in equipments_sorted: + if total_cost + eq["cost"] <= budget: + selected.append(eq) + total_cost += eq["cost"] + else: + excluded.append(eq) + return selected, excluded - for asset in eq_results: - results[asset['aeros_node']['node_name']] = asset['contribution'] - # Sort by contribution (worst contributors first) - # results = sorted(results.items(), key=lambda x: x[1], reverse=True) - return results \ No newline at end of file +def knapsack_selection(equipments: List[dict], budget: float, scale: int = 10_000_000) -> Tuple[List[dict], List[dict]]: + """ + Select equipment optimally within budget using 0/1 knapsack DP. + Uses scaling + 1D DP optimization to avoid MemoryError. + Falls back to greedy if W is too large. + """ + n = len(equipments) + + # Scale costs + budget + costs = [int(eq["total_cost"] // scale) for eq in equipments] + values = [eq["priority_score"] for eq in equipments] + W = int(budget // scale) + + # Fallback if W is still too large + if W > 1_000_000: + print("too big") + return greedy_selection(equipments, budget) + + # 1D DP array + dp = [0.0] * (W + 1) + keep = [[False] * (W + 1) for _ in range(n)] # track selection choices + + for i in range(n): + cost, value = costs[i], values[i] + for w in range(W, cost - 1, -1): + if dp[w - cost] + value > dp[w]: + dp[w] = dp[w - cost] + value + keep[i][w] = True + + # Backtrack to find selected items + selected, excluded = [], [] + w = W + for i in range(n - 1, -1, -1): + if keep[i][w]: + selected.append(equipments[i]) + w -= costs[i] + else: + excluded.append(equipments[i]) + + return selected, excluded + \ No newline at end of file diff --git a/src/calculation_target_reliability/schema.py b/src/calculation_target_reliability/schema.py index f58fd1e..9855ed8 100644 --- a/src/calculation_target_reliability/schema.py +++ b/src/calculation_target_reliability/schema.py @@ -38,6 +38,7 @@ class AssetWeight(OverhaulBase): required_improvement: float num_of_failures: int down_time: float + efficiency: float class MaintenanceScenario(OverhaulBase): location_tag: str @@ -52,7 +53,7 @@ class OptimizationResult(OverhaulBase): current_plant_eaf: float target_plant_eaf: float eaf_gap: float - asset_contributions: List[AssetWeight] + asset_contributions: List[dict] optimization_success: bool = False simulation_id: Optional[str] = None diff --git a/src/calculation_target_reliability/service.py b/src/calculation_target_reliability/service.py index eca6f73..75dfad3 100644 --- a/src/calculation_target_reliability/service.py +++ b/src/calculation_target_reliability/service.py @@ -74,118 +74,70 @@ async def get_simulation_results(*, simulation_id: str, token: str): "plant_result": plant_data } -# def calculate_asset_weights(plant_result, eq_results): -# """ -# Calculate each asset's contribution weight to plant EAF -# Based on production capacity and criticality -# """ -# plant_ideal_production = plant_result.get('ideal_production', 0) -# results = [] - -# for asset in eq_results: -# # Weight based on production capacity -# capacity_weight = asset.get('ideal_production', 0) / plant_ideal_production if plant_ideal_production > 0 else 0 - -# # Get asset EAF -# asset_eaf = asset.get('eaf', 0) - -# # Calculate EAF impact (how much this asset affects plant EAF) -# eaf_impact = (100 - asset_eaf) * capacity_weight - -# asset_weight = AssetWeight( -# node=asset.get('aeros_node'), -# capacity_weight=capacity_weight, -# eaf_impact=eaf_impact, -# eaf=asset_eaf, -# num_of_failures=asset.get('num_events', 0), -# ideal_production=asset.get('ideal_production', 0), -# downtime_hours=asset -# ) -# results.append(asset_weight) - -# return results - def calculate_asset_eaf_contributions(plant_result, eq_results, standard_scope, eaf_gap): """ Calculate each asset's contribution to plant EAF with realistic improvement potential. - Higher contribution = more impact on improving plant EAF + Ranking: + 1. Highest contribution (Birnbaum Importance) + 2. Tie-breaker: Contribution per downtime (efficiency) """ - # Convert EAF gap from percentage to fraction if needed - # Assuming eaf_gap is a percentage (e.g., 1.0 for 1%), convert to fraction + eaf_gap_fraction = eaf_gap / 100.0 if eaf_gap > 1.0 else eaf_gap - - results = [] - filtered_assets = [] # To track assets that were filtered out - - # # Get availabilities and calculate importance - # availabilities = {asset.get('aeros_node').get('node_name'): asset.get('availability') - # for asset in eq_results} - # importance_results = calculate_contribution_accurate(availabilities, "src/calculation_target_reliability/result.json") - - # Define realistic thresholds - MIN_BIRNBAUM_IMPORTANCE = 0.0005 # Filter out components with very low impact - REALISTIC_MAX_AVAILABILITY = 0.995 # 99.5% practical maximum - MIN_IMPROVEMENT_PERCENT = 0.005 # Minimum improvement to consider (0.5%) + + MIN_BIRNBAUM_IMPORTANCE = 0.0005 + REALISTIC_MAX_AVAILABILITY = 0.995 # 99.5% + MIN_IMPROVEMENT_PERCENT = 0.005 # 0.5% min_improvement_fraction = MIN_IMPROVEMENT_PERCENT / 100.0 + results = [] + for asset in eq_results: - asset_name = asset.get('aeros_node').get('node_name') - - # Skip if not in standard scope + asset_name = asset.get("aeros_node").get("node_name") if asset_name not in standard_scope: continue - - - birnbaum = asset.get('contribution') - current_availability = asset.get('availability') - - # Calculate required improvement - required_impr = 0.005 * birnbaum - - # # CHECK FILTERS - Is this asset worth considering? - # filter_reason = None - - # # Filter 1: Is the component important enough? - # if birnbaum < MIN_BIRNBAUM_IMPORTANCE: - # filter_reason = f"Low importance (Birnbaum: {birnbaum:.4f} < {MIN_BIRNBAUM_IMPORTANCE})" - - # # Filter 2: Would improvement exceed realistic maximum? - # elif (current_availability + required_impr) > REALISTIC_MAX_AVAILABILITY: - # filter_reason = f"Exceeds realistic maximum ({current_availability + required_impr:.3f} > {REALISTIC_MAX_AVAILABILITY})" - - # # Filter 3: Is the improvement too small to be worthwhile? - # elif required_impr < min_improvement_fraction: - # filter_reason = f"Improvement too small ({required_impr*100:.2f}% < {MIN_IMPROVEMENT_PERCENT}%)" - - # # If filtered, add to filtered list and skip - # if filter_reason: - # filtered_assets.append({ - # 'asset': asset_name, - # 'reason': filter_reason, - # 'birnbaum': birnbaum, - # 'current_availability': current_availability, - # 'required_improvement': required_impr - # }) - # continue - - # If it passed all filters, include it in results + + birnbaum = asset.get("contribution", 0.0) + current_availability = asset.get("availability", 0.0) + downtime = asset.get("total_downtime", 0.0) + + # Filter 1: Importance too low + if birnbaum < MIN_BIRNBAUM_IMPORTANCE: + continue + + # Max possible availability improvement + max_possible_improvement = REALISTIC_MAX_AVAILABILITY - current_availability + if max_possible_improvement <= 0: + continue + + # Required improvement (limited by plant gap and availability ceiling) + required_impr = min(eaf_gap_fraction, max_possible_improvement) * birnbaum + + # Filter 2: Improvement too small + if required_impr < min_improvement_fraction: + continue + + # Contribution efficiency (secondary metric) + efficiency = birnbaum / downtime if downtime > 0 else birnbaum + contribution = AssetWeight( - node=asset.get('aeros_node'), + node=asset.get("aeros_node"), availability=current_availability, contribution=birnbaum, required_improvement=required_impr, - num_of_failures=asset.get('num_events', 0), - down_time=asset.get('total_downtime') + num_of_failures=asset.get("num_events", 0), + down_time=downtime, + efficiency= efficiency ) + + results.append(contribution) - # raise Exception(filtered_assets) - - # Sort by contribution (Birnbaum Importance) - most critical first - results.sort(key=lambda x: x.contribution, reverse=True) - + # Sort: 1) contribution (desc), 2) efficiency (desc) + results.sort(key=lambda x: (x.contribution, x.efficiency), reverse=True) return results + + def project_eaf_improvement(asset: AssetWeight, improvement_factor: float = 0.3) -> float: """ Project EAF improvement after maintenance @@ -197,145 +149,76 @@ def project_eaf_improvement(asset: AssetWeight, improvement_factor: float = 0.3) projected_eaf = 100 - improved_downtime_pct return min(projected_eaf, 99.9) # Cap at 99.9% -async def identify_worst_eaf_contributors(*, simulation_result, target_eaf: float, db_session: DbSession, oh_session_id: str, collector_db: CollectorDbSession, simulation_id: str): +async def identify_worst_eaf_contributors( + *, + simulation_result, + target_eaf: float, + db_session: DbSession, + oh_session_id: str, + collector_db: CollectorDbSession, + simulation_id: str, +): """ Identify equipment that contributes most to plant EAF reduction - - Args: - simulation_result: Dictionary containing calc_result and plant_result - target_eaf: Target plant EAF percentage (for gap calculation) - - Returns: - OptimizationResult with asset contributions ranked by impact + in order to reach a target EAF. """ - # Calculate current plant EAF and asset contributions - calc_result = simulation_result['calc_result'] - plant_result = simulation_result['plant_result'] - # plot_result = simulation_result['plot_result'] - # Get equipment results from calc_result + # Extract results + calc_result = simulation_result["calc_result"] + plant_result = simulation_result["plant_result"] + + # Ensure list of equipment eq_results = calc_result if isinstance(calc_result, list) else [calc_result] - current_plant_eaf = plant_result.get("eaf", 0) - eaf_gap = (target_eaf - current_plant_eaf)/100.0 - # # Verify our calculation by summing contributions - # total_calculated_downtime = sum(contrib.eaf_impact for contrib in asset_contributions) - # calculated_plant_eaf = 100 - total_calculated_downtime + # Current plant EAF and gap + current_plant_eaf = plant_result.get("eaf", 0) + eaf_gap = (target_eaf - current_plant_eaf) / 100.0 + # Get standard scope (equipment allowed for overhaul/optimization) standard_scope = await get_standard_scope_by_session_id( db_session=db_session, overhaul_session_id=oh_session_id, - collector_db=collector_db + collector_db=collector_db, ) - standard_scope_location_tags = [tag.location_tag for tag in standard_scope] - - asset_contributions = calculate_asset_eaf_contributions(plant_result, eq_results, standard_scope_location_tags, eaf_gap=eaf_gap) - + + # Compute contributions + asset_contributions = calculate_asset_eaf_contributions( + plant_result, eq_results, standard_scope_location_tags, eaf_gap=eaf_gap + ) + project_eaf_improvement = 0.0 selected_eq = [] - - for asset in asset_contributions: + + # Greedy select until gap is closed + for asset in asset_contributions: + if project_eaf_improvement >= eaf_gap: + break + if (project_eaf_improvement + asset.required_improvement) <= eaf_gap: selected_eq.append(asset) project_eaf_improvement += asset.required_improvement else: - break - - # optimization_success = current_plant_eaf + project_eaf_improvement >= target_eaf + # allow overshoot tolerance by skipping large ones, continue with smaller ones + continue + # Build output with efficiency included return OptimizationResult( current_plant_eaf=current_plant_eaf, target_plant_eaf=target_eaf, eaf_gap=eaf_gap, - asset_contributions=selected_eq, - optimization_success=True, - simulation_id=simulation_id - ) - -# def optimize_maintenance_priority(*, simulation_result, target_eaf: float): -# """ -# Optimize maintenance priorities to achieve target plant EAF - -# Args: -# simulation_result: Dictionary containing calc_result and plant_result -# target_eaf: Target plant EAF percentage - -# Returns: -# OptimizationResult with recommendations -# """ -# # Calculate current plant EAF and asset weights -# calc_result = simulation_result['calc_result'] -# plant_result = simulation_result['plant_result'] - -# # Get equipment results from calc_result -# eq_results = calc_result if isinstance(calc_result, list) else [calc_result] - -# equipment_eaf_weights = calculate_asset_weights(plant_result, eq_results) -# current_plant_eaf = plant_result.get("eaf", 0) - -# if current_plant_eaf >= target_eaf: -# return OptimizationResult( -# current_plant_eaf=current_plant_eaf, -# target_plant_eaf=target_eaf, -# eaf_gap=0.0, -# recommended_assets=[], -# projected_plant_eaf=current_plant_eaf, -# optimization_success=True -# ) - -# # Create maintenance scenarios for all assets -# scenarios = [] -# for asset in equipment_eaf_weights: -# if asset.eaf < 99.9: # Only consider assets with improvement potential -# projected_eaf = project_eaf_improvement(asset) -# eaf_improvement = projected_eaf - asset.eaf - -# # Calculate plant-level benefit (how much this asset improvement affects plant EAF) -# plant_benefit = eaf_improvement * asset.capacity_weight - -# # Priority score combines multiple factors -# priority_score = ( -# plant_benefit * 0.8 + # Plant impact (50%) -# (100 - asset.eaf) * asset.capacity_weight * 0.2 # Current performance gap (30%) -# # asset.num_of_failures * asset.capacity_weight * 0.1 # Failure frequency (20%) -# ) - -# scenario = MaintenanceScenario( -# location_tag=asset.node['node_name'], # Placeholder - replace with actual name -# current_eaf=asset.eaf, -# projected_eaf_improvement=eaf_improvement, -# priority_score=priority_score, -# plant_level_benefit=plant_benefit, -# capacity_weight=asset.capacity_weight -# ) -# scenarios.append(scenario) - -# # Sort by priority score (highest first) -# scenarios.sort(key=lambda x: x.priority_score, reverse=True) - -# # Select assets for maintenance to achieve target EAF -# selected_scenarios = [] -# cumulative_benefit = 0.0 - -# eaf_gap = target_eaf - current_plant_eaf - -# for scenario in scenarios: -# if cumulative_benefit < eaf_gap: -# selected_scenarios.append(scenario) -# cumulative_benefit += scenario.plant_level_benefit - -# if cumulative_benefit >= eaf_gap: -# break - -# projected_plant_eaf = current_plant_eaf + cumulative_benefit -# optimization_success = projected_plant_eaf >= target_eaf - -# return OptimizationResult( -# current_plant_eaf=current_plant_eaf, -# target_plant_eaf=target_eaf, -# eaf_gap=eaf_gap, -# recommended_assets=selected_scenarios, -# projected_plant_eaf=projected_plant_eaf, -# optimization_success=optimization_success -# ) + asset_contributions=[ + { + "node": asset.node, + "availability": asset.availability, + "contribution": asset.contribution, + "required_improvement": asset.required_improvement, + "num_of_failures": asset.num_of_failures, + "down_time": asset.down_time, + "efficiency": asset.efficiency, + } + for asset in selected_eq + ], + optimization_success=(current_plant_eaf + project_eaf_improvement) >= target_eaf, + simulation_id=simulation_id, + ) \ No newline at end of file