diff --git a/src/calculation_time_constrains/flows.py b/src/calculation_time_constrains/flows.py index da53f40..e84be27 100644 --- a/src/calculation_time_constrains/flows.py +++ b/src/calculation_time_constrains/flows.py @@ -21,7 +21,7 @@ from .service import (create_calculation_result_service, create_param_and_data, get_calculation_by_reference_and_parameter, get_calculation_data_by_id, get_calculation_result, get_corrective_cost_time_chart, - get_overhaul_cost_by_time_chart, run_simulation) + get_overhaul_cost_by_time_chart, run_simulation, run_simulation_with_spareparts) from src.database.core import CollectorDbSession @@ -97,7 +97,7 @@ async def create_calculation( # results = await create_calculation_result_service( # db_session=db_session, calculation=calculation_data, token=token # ) - results = await run_simulation( + results = await run_simulation_with_spareparts( db_session=db_session, calculation=calculation_data, token=token, collector_db_session=collector_db_session ) diff --git a/src/calculation_time_constrains/schema.py b/src/calculation_time_constrains/schema.py index a447f62..dddb964 100644 --- a/src/calculation_time_constrains/schema.py +++ b/src/calculation_time_constrains/schema.py @@ -46,12 +46,15 @@ class CalculationTimeConstrainsParametersCreate(CalculationTimeConstrainsBase): class CalculationResultsRead(CalculationTimeConstrainsBase): - day: int - corrective_cost: float overhaul_cost: float + corrective_cost: float procurement_cost: float - procurement_details: Optional[Dict[str, Any]] num_failures: float + day: int + month: int + total_cost: float + procurement_details: Dict + sparepart_summary: dict class OptimumResult(CalculationTimeConstrainsBase): @@ -74,14 +77,44 @@ class EquipmentResult(CalculationTimeConstrainsBase): is_included: bool master_equipment: Optional[MasterEquipmentBase] = Field(None) +class FleetStatistics(CalculationTimeConstrainsBase): + total_equipment: int + included_equipment: int + excluded_equipment: int + equipment_with_sparepart_constraints: int + total_procurement_items: int + critical_procurement_items: int + +class OptimalAnalysis(CalculationTimeConstrainsBase): + optimal_month: int + planned_month: Optional[int] + timing_recommendation: str + optimal_total_cost: float + optimal_breakdown: Dict + cost_trend: str + months_from_planned: Optional[int] + cost_savings_vs_planned: Optional[float] + sparepart_impact: Dict + +class AnalysisMetadata(CalculationTimeConstrainsBase): + max_interval_months: int + last_overhaul_date: Optional[str] + next_planned_overhaul: str + calculation_type: str + total_equipment_analyzed: int + included_in_optimization: int class CalculationTimeConstrainsRead(CalculationTimeConstrainsBase): id: UUID reference: UUID scope: str results: List[CalculationResultsRead] + optimum_oh: int + optimum_oh_month: int equipment_results: List[EquipmentResult] - optimum_oh: Any + fleet_statistics: dict + optimal_analysis: dict + analysis_metadata: dict class CalculationTimeConstrainsCreate(CalculationTimeConstrainsBase): diff --git a/src/calculation_time_constrains/service.py b/src/calculation_time_constrains/service.py index 62665ed..904dc6f 100644 --- a/src/calculation_time_constrains/service.py +++ b/src/calculation_time_constrains/service.py @@ -10,11 +10,13 @@ import numpy as np import requests from fastapi import HTTPException, status from sqlalchemy import and_, case, func, select, update -from sqlalchemy.orm import joinedload +from sqlalchemy.orm import joinedload, selectinload from src.database.core import DbSession +from src.logging import setup_logging from src.overhaul_activity.service import get_all as get_all_by_session_id from src.overhaul_scope.service import get as get_scope, get_prev_oh +from src.sparepart.service import load_sparepart_data_from_db from src.utils import get_latest_numOfFail from src.workorder.model import MasterWorkOrder from src.sparepart.model import MasterSparePart @@ -27,7 +29,7 @@ from .schema import (CalculationResultsRead, CalculationTimeConstrainsParametersCreate, CalculationTimeConstrainsRead, OptimumResult) -from .utils import calculate_failures_per_month, create_time_series_data, get_months_between +from .utils import analyze_monthly_metrics, calculate_failures_per_month, calculate_risk_cost_per_failure, create_time_series_data, get_monthly_risk_analysis, get_months_between from src.equipment_sparepart.model import ScopeEquipmentPart import copy import random @@ -45,820 +47,71 @@ import json client = httpx.AsyncClient(timeout=300.0) -# class ReliabilityService: -# """Service class for handling reliability API calls""" - -# def __init__(self, base_url: str = "http://192.168.1.82:8000", use_dummy_data=False): -# self.base_url = base_url -# self.use_dummy_data = use_dummy_data - -# async def get_number_of_failures(self, location_tag, start_date, end_date, token, max_interval=24): -# if self.use_dummy_data: -# return self._generate_dummy_failure_data(location_tag, start_date, end_date, max_interval) - -# url_prediction = ( -# f"{self.base_url}/main/number-of-failures/" -# f"{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}" -# ) -# results = {} - -# try: -# response = requests.get( -# url_prediction, -# headers={ -# "Content-Type": "application/json", -# "Authorization": f"Bearer {token}", -# }, -# timeout=10 -# ) -# response.raise_for_status() -# prediction_data = response.json() -# except (requests.RequestException, ValueError) as e: -# raise Exception(f"Failed to fetch or parse prediction data: {e}") - -# if not prediction_data or "data" not in prediction_data or not isinstance(prediction_data["data"], list): -# raise Exception("Invalid or empty prediction data format.") - -# # Since data is cumulative, we need to preserve the decimal values -# last_cumulative_value = 0 - -# # Parse prediction data and preserve cumulative nature -# for item in prediction_data["data"]: -# try: -# date = datetime.datetime.strptime(item["date"], "%d %b %Y") -# last_day = calendar.monthrange(date.year, date.month)[1] -# value = item.get("num_fail", 0) - -# if date.day == last_day: # End of month -# if value is not None and value > 0: -# # PRESERVE the decimal values - don't convert to int! -# results[date.date()] = round(float(value), 3) # Keep 3 decimal places -# last_cumulative_value = float(value) -# else: -# # If no value, use previous cumulative value -# results[date.date()] = last_cumulative_value - -# except (KeyError, ValueError): -# continue - -# # Fill missing months by continuing the cumulative trend -# current = start_date.replace(day=1) - -# for _ in range(max_interval): -# last_day = calendar.monthrange(current.year, current.month)[1] -# last_day_date = datetime.date(current.year, current.month, last_day) - -# if last_day_date not in results: -# # Since it's cumulative, add a small increment to continue the trend -# # You can adjust this increment based on your typical monthly increase -# monthly_increment = 0.05 # Adjust this value based on your data pattern -# last_cumulative_value += monthly_increment -# results[last_day_date] = round(last_cumulative_value, 3) -# else: -# # Update our tracking value -# last_cumulative_value = results[last_day_date] - -# # Move to next month -# if current.month == 12: -# current = current.replace(year=current.year + 1, month=1) -# else: -# current = current.replace(month=current.month + 1) - -# # Sort results by date -# results = dict(sorted(results.items())) -# return results - -# def _generate_dummy_failure_data(self, location_tag: str, start_date: datetime.date, end_date: datetime.date, max_interval: int = 24) -> Dict[datetime.date, float]: -# """ -# Generate realistic dummy failure prediction data for demonstration purposes. -# Creates a realistic pattern with seasonal variations and some randomness. -# """ -# results = {} - -# # Set seed based on location_tag for consistent results per location -# random.seed(hash(location_tag) % 1000) - -# # Base parameters for realistic failure patterns -# base_monthly_failures = random.uniform(0.5, 1.25) # Base failures per month -# seasonal_amplitude = random.uniform(0.3, 0.8) # Seasonal variation strength -# trend_slope = random.uniform(-0.01, 0.02) # Long-term trend (slight increase over time) -# noise_level = random.uniform(0.1, 0.3) # Random variation - -# # Determine equipment factor random between 1.0 and 1.9 -# equipment_factor = random.uniform(1.0, 1.9) -# current = start_date.replace(day=1) -# cumulative_failures = 0 -# month_count = 0 - -# for _ in range(max_interval): -# #If month count == 0, set cumulative_failures to 0 -# last_day = calendar.monthrange(current.year, current.month)[1] -# last_day_date = datetime.datetime(current.year, current.month, last_day) - - -# # Stop if we've passed the end_date -# if last_day_date > end_date: -# break - -# # Calculate seasonal factor (higher in summer/winter, lower in spring/fall) -# seasonal_factor = 1 + seasonal_amplitude * math.sin(2 * math.pi * current.month / 12) - -# # Calculate trend factor (gradual increase over time) -# trend_factor = 1 + trend_slope * month_count - -# # Calculate noise (random variation) -# noise_factor = 1 + random.uniform(-noise_level, noise_level) - -# # Calculate monthly failures (non-cumulative) -# monthly_failures = (base_monthly_failures * -# equipment_factor * -# seasonal_factor * -# trend_factor * -# noise_factor) - -# # Ensure minimum realistic value -# monthly_failures = max(0.1, monthly_failures) - -# # Add to cumulative total -# cumulative_failures += monthly_failures - -# # Store cumulative value rounded to 3 decimal places -# results[last_day_date] = round(cumulative_failures, 3) if month_count > 0 else 0 - -# # Move to next month -# month_count += 1 -# if current.month == 12: -# current = current.replace(year=current.year + 1, month=1) -# else: -# current = current.replace(month=current.month + 1) - -# return dict(sorted(results.items())) - -# async def get_equipment_foh(self, location_tag: str, token: str) -> float: -# """ -# Get forced outage hours for equipment -# """ -# url = f"{self.base_url}/asset/mdt/{location_tag}" -# headers = { -# "Content-Type": "application/json", -# "Authorization": f"Bearer {token}", -# } - -# try: -# response = requests.get(url, headers=headers, timeout=10) -# response.raise_for_status() -# result = response.json() -# return result["data"]["hours"] -# except (requests.RequestException, ValueError) as e: -# raise Exception(f"Failed to fetch FOH data for {location_tag}: {e}") - -# def _parse_failure_predictions( -# self, -# prediction_data: List[dict], -# start_date: datetime.date, -# max_interval: int -# ) -> Dict[datetime.date, int]: -# """ -# Parse and normalize failure prediction data -# """ -# results = {} - -# # Parse prediction data -# for item in prediction_data: -# try: -# date = datetime.datetime.strptime(item["date"], "%d %b %Y").date() -# last_day = calendar.monthrange(date.year, date.month)[1] -# value = item.get("num_fail", 0) - -# if date.day == last_day: -# if date.month == start_date.month and date.year == start_date.year: -# results[date] = 0 -# else: -# results[date] = max(0, int(value)) if value is not None else 0 -# except (KeyError, ValueError): -# continue - -# # Fill missing months with 0 -# current = start_date.replace(day=1) -# for _ in range(max_interval): -# last_day = calendar.monthrange(current.year, current.month)[1] -# last_day_date = datetime.date(current.year, current.month, last_day) - -# if last_day_date not in results: -# results[last_day_date] = 0 - -# # Move to next month -# if current.month == 12: -# current = current.replace(year=current.year + 1, month=1) -# else: -# current = current.replace(month=current.month + 1) - -# return dict(sorted(results.items())) - - -# class SparePartsService: -# """Service class for spare parts management and procurement calculations""" - -# def __init__(self, spare_parts_db: dict): -# self.spare_parts_db = spare_parts_db - -# def calculate_stock_at_date(self, sparepart_id: UUID, target_date: datetime.date): -# """ -# Calculate projected stock for a spare part at a specific date -# """ -# if sparepart_id not in self.spare_parts_db: -# return 0 - -# spare_part = self.spare_parts_db[sparepart_id] -# projected_stock = spare_part["stock"] - -# # Add all procurements that arrive by target_date -# for procurement in spare_part["data"].sparepart_procurements: -# eta_date = getattr(procurement, procurement.status, None) -# if eta_date and eta_date <= target_date: -# projected_stock += procurement.quantity - -# return projected_stock - -# async def reduce_stock(self, db_session, location_tag: str): -# requirements_query = select(ScopeEquipmentPart).where( -# ScopeEquipmentPart.location_tag == location_tag -# ) - -# requirements = await db_session.execute(requirements_query) -# requirements = requirements.scalars().all() - -# for requirement in requirements: -# sparepart_id = requirement.sparepart_id -# quantity_needed = requirement.required_stock - -# if sparepart_id in self.spare_parts_db: -# self.spare_parts_db[sparepart_id]["stock"] -= quantity_needed - -# async def check_spare_parts_availability( -# self, -# db_session: DbSession, -# equipment: OverhaulActivity, -# overhaul_date: datetime.date -# ) -> Tuple[bool, List[dict]]: -# """ -# Check if spare parts are available for equipment overhaul at specific date. -# If not available, calculate procurement costs needed. -# """ -# procurement_costs = [] -# all_available = True - -# requirements_query = select(ScopeEquipmentPart).where( -# ScopeEquipmentPart.location_tag == equipment.location_tag -# ) - -# requirements = await db_session.execute(requirements_query) -# requirements = requirements.scalars().all() - - - -# for requirement in requirements: -# sparepart_id = requirement.sparepart_id -# quantity_needed = requirement.required_stock - -# if sparepart_id not in self.spare_parts_db: -# raise Exception(f"Spare part {sparepart_id} not found in database") - -# spare_part = self.spare_parts_db[sparepart_id] -# spare_part_data = spare_part["data"] -# available_stock = self.calculate_stock_at_date(sparepart_id, overhaul_date) - -# if available_stock < quantity_needed: -# # Need to procure additional stock -# shortage = quantity_needed - available_stock -# procurement_cost = { -# "sparepart_id": str(sparepart_id), -# "sparepart_name": spare_part_data.name, -# "quantity": shortage, -# "cost_per_unit": spare_part_data.cost_per_stock, -# "total_cost": shortage * spare_part_data.cost_per_stock, -# "description": f"Insufficient projected stock for {spare_part_data.name} on {overhaul_date} (need: {quantity_needed}, projected: {available_stock})" -# } -# procurement_costs.append(procurement_cost) -# all_available = False -# return all_available, procurement_costs - -# class OverhaulCalculator: - - - -# """Main calculator for overhaul cost optimization""" - -# def __init__( -# self, -# reliability_service: ReliabilityService, -# spare_parts_service: SparePartsService -# ): -# self.reliability_service = reliability_service -# self.spare_parts_service = spare_parts_service - -# async def simulate_equipment_overhaul( -# self, -# db_session: DbSession, -# equipment, -# preventive_cost: float, -# predicted_failures: Dict[datetime.date, int], -# interval_months: int, -# forced_outage_hours: float, -# start_date: datetime.date, -# total_months: int = 24 -# ): -# """ -# Simulate overhaul strategy for specific equipment including spare parts costs -# """ -# total_preventive_cost = 0 -# total_corrective_cost = 0 -# total_procurement_cost = 0 -# all_procurement_details = [] -# months_since_overhaul = 0 - -# # Convert failures dict to month-indexed dict -# failures_by_month = { -# i: val for i, (date, val) in enumerate(sorted(predicted_failures.items())) -# } - -# cost_per_failure = equipment.material_cost - -# # Simulate for the total period -# for month in range(total_months): -# # Calculate current date -# current_date = self._add_months_to_date(start_date, month) - -# # Check if it's time for overhaul -# if months_since_overhaul >= interval_months: -# # Perform preventive overhaul -# total_preventive_cost += preventive_cost -# months_since_overhaul = 0 - -# # Calculate corrective costs -# if months_since_overhaul == 0: -# expected_failures = 0 # No failures immediately after overhaul -# else: -# expected_failures = round(failures_by_month.get(months_since_overhaul, 0)) - -# equivalent_force_derated_hours = 0 # Can be enhanced based on requirements -# failure_cost = ( -# (expected_failures * cost_per_failure) + -# ((forced_outage_hours + equivalent_force_derated_hours) * equipment.service_cost) -# ) -# total_corrective_cost += failure_cost - -# months_since_overhaul += 1 - - -# overhaul_target_date = self._add_months_to_date(start_date, interval_months) -# # Check spare parts availability and calculate procurement costs -# parts_available, procurement_costs = await self.spare_parts_service.check_spare_parts_availability( -# db_session, -# equipment, -# overhaul_target_date -# ) - -# # Add procurement costs if parts are not available -# if not parts_available: -# month_procurement_cost = sum(pc["total_cost"] for pc in procurement_costs) -# total_procurement_cost += month_procurement_cost -# all_procurement_details.extend(procurement_costs) - -# # Calculate monthly averages -# monthly_preventive_cost = total_preventive_cost / total_months -# monthly_corrective_cost = total_corrective_cost / total_months -# monthly_total_cost = monthly_preventive_cost + monthly_corrective_cost + total_procurement_cost - - -# return { -# "interval_months":interval_months, -# "preventive_cost":monthly_preventive_cost, -# "corrective_cost":monthly_corrective_cost, -# "procurement_cost":total_procurement_cost, -# "total_cost":monthly_total_cost, -# "procurement_details":all_procurement_details -# } - -# async def find_optimal_overhaul_interval( -# self, -# db_session: DbSession, -# equipment, -# preventive_cost: float, -# predicted_failures: Dict[datetime.date, int], -# forced_outage_hours: float, -# start_date: datetime.date, -# max_interval: int = 24 -# ): -# """ -# Find optimal overhaul interval by testing different intervals -# """ -# all_results = [] - -# for interval in range(1, max_interval + 1): -# result = await self.simulate_equipment_overhaul( -# db_session=db_session, -# equipment=equipment, -# preventive_cost=preventive_cost, -# predicted_failures=predicted_failures, -# interval_months=interval, -# forced_outage_hours=forced_outage_hours, -# start_date=start_date, -# total_months=max_interval -# ) -# all_results.append(result) - -# # Find optimal result (minimum total cost) -# optimal_result = min(all_results, key=lambda x: x["total_cost"]) - -# return optimal_result, all_results - -# async def calculate_fleet_optimization( -# self, -# db_session: DbSession, -# equipments: list, -# overhaul_cost: float, -# start_date: datetime.date, -# end_date: datetime.date, -# calculation, -# token: str -# ) -> Dict: -# """ -# Calculate optimization for entire fleet of equipment -# """ -# max_interval = self._get_months_between(start_date, end_date) -# preventive_cost_per_equipment = overhaul_cost / len(equipments) - -# fleet_results = [] -# total_corrective_costs = np.zeros(max_interval) -# total_preventive_costs = np.zeros(max_interval) -# total_procurement_costs = np.zeros(max_interval) -# total_failures = np.zeros(max_interval) - -# for equipment in equipments: -# # Get reliability data -# predicted_failures = await self.reliability_service.get_number_of_failures( -# location_tag=equipment.equipment.location_tag, -# start_date=start_date, -# end_date=end_date, -# token=token, -# max_interval=max_interval -# ) - - -# forced_outage_hours = await self.reliability_service.get_equipment_foh( -# location_tag=equipment.equipment.location_tag, -# token=token -# ) - -# # Find optimal interval for this equipment -# optimal_result, all_results = await self.find_optimal_overhaul_interval( -# db_session=db_session, -# equipment=equipment, -# preventive_cost=preventive_cost_per_equipment, -# predicted_failures=predicted_failures, -# forced_outage_hours=forced_outage_hours, -# start_date=start_date, -# max_interval=max_interval -# ) - -# #reduce sparepart stock -# await self.spare_parts_service.reduce_stock(db_session, equipment.location_tag) - -# # Aggregate costs -# corrective_costs = [r["corrective_cost"] for r in all_results] -# preventive_costs = [r["preventive_cost"] for r in all_results] -# procurement_costs = [r["procurement_cost"] for r in all_results] -# procurement_details = [r["procurement_details"] for r in all_results] -# failures = [round(r) for r in predicted_failures.values()] - - -# fleet_results.append( -# CalculationEquipmentResult( -# corrective_costs=corrective_costs, -# overhaul_costs=preventive_costs, -# procurement_costs=procurement_costs, -# daily_failures=failures, -# assetnum=equipment.assetnum, -# material_cost=equipment.material_cost, -# service_cost=equipment.service_cost, -# optimum_day=optimal_result["interval_months"], -# calculation_data_id=calculation.id, -# master_equipment=equipment.equipment, -# procurement_details=procurement_details -# ) -# ) - -# total_corrective_costs += np.array(corrective_costs) -# total_preventive_costs += np.array(preventive_costs) -# total_procurement_costs += np.array(procurement_costs) - -# # Calculate fleet optimal interval -# total_costs = total_corrective_costs + total_preventive_costs + total_procurement_costs -# fleet_optimal_index = np.argmin(total_costs) -# calculation.optimum_oh_day =fleet_optimal_index + 1 - -# db_session.add_all(fleet_results) -# await db_session.commit() - -# return { -# 'id': calculation.id, -# 'fleet_results': fleet_results, -# 'fleet_optimal_interval': fleet_optimal_index + 1, -# 'fleet_optimal_cost': total_costs[fleet_optimal_index], -# 'total_corrective_costs': total_corrective_costs.tolist(), -# 'total_preventive_costs': total_preventive_costs.tolist(), -# 'total_procurement_costs': total_procurement_costs.tolist(), -# } - -# def _add_months_to_date(self, start_date: datetime.date, months: int) -> datetime.date: -# """Helper method to add months to a date""" -# year = start_date.year -# month = start_date.month + months - -# while month > 12: -# year += 1 -# month -= 12 - -# return datetime.date(year, month, start_date.day) - -# def _get_months_between(self, start_date: datetime.date, end_date: datetime.date) -> int: -# """Calculate number of months between two dates""" -# return (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month) - - -# class OptimumCostModel: -# def __init__(self, token, last_oh_date, next_oh_date,interval =30, base_url: str = "http://192.168.1.82:8000"): -# self.api_base_url = base_url -# self.token = token -# self.last_oh_date = last_oh_date -# self.next_oh_date = next_oh_date -# self.interval=30 - - -# def _get_months_between(self, start_date: datetime.date, end_date: datetime.date) -> int: -# """Calculate number of months between two dates""" -# return (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month) - -# def _get_reliability_from_api(self, target_date, location_tag): -# """ -# Get reliability value from API for a specific date. - -# Parameters: -# target_date: datetime object for the target date - -# Returns: -# reliability value (float) -# """ -# # Format date for API call -# date_str = target_date.strftime('%Y-%m-%d %H:%M:%S.%f') - -# # Construct API URL -# url = f"{self.api_base_url}/calculate/reliability/{location_tag}/{date_str}" - -# header = { -# 'Content-Type': 'application/json', -# 'Authorization': 'Bearer ' + self.token -# } - -# try: -# response = requests.get(url, headers=header) -# response.raise_for_status() - -# data = response.json() - -# reliability = data['data']['value'] - -# return reliability - -# except requests.RequestException as e: -# print(f"API Error: {e}") -# return None -# except KeyError as e: -# print(f"Data parsing error: {e}") - -# return None - - -# def _get_reliability_equipment(self, location_tag): -# current_date = self.last_oh_date -# results = defaultdict() - -# print("Fetching reliability data from API...") - -# while current_date <= self.next_oh_date: -# reliability = self._get_reliability_from_api(current_date, location_tag) -# results[current_date] = reliability - -# if reliability is not None: -# print(f"Date: {current_date.strftime('%Y-%m-%d')}, Reliability: {reliability:.6f}") -# else: -# print(f"Date: {current_date.strftime('%Y-%m-%d')}, Reliability: Failed to fetch") - -# current_date += timedelta(days=31) - -# return results - - -# def _calculate_costs_each_point(self, reliabilities, preventive_cost: float, failure_replacement_cost: float): -# # Calculate costs for each time point -# results = [] -# dates = list(reliabilities.keys()) -# reliabilities_list = list(reliabilities.values()) - -# for i, (date, reliability) in enumerate(reliabilities.items()): -# if reliability is None: -# continue - -# # Calculate time from last OH in days -# time_from_last_oh = (date - self.last_oh_date).days - -# # Calculate failure replacement cost -# failure_prob = 1 - reliability - -# # For expected operating time, we need to integrate R(t) from 0 to T -# # Since we have discrete points, we'll approximate using trapezoidal rule -# if i == 0: -# expected_operating_time = time_from_last_oh # First point -# else: -# # Approximate integral using available reliability values -# time_points = [(dates[j] - self.last_oh_date).days for j in range(i+1)] -# rel_values = [rel for rel in reliabilities_list[:i+1] if rel is not None] - -# if len(rel_values) > 1: -# expected_operating_time = np.trapezoid(rel_values, time_points) -# else: -# expected_operating_time = time_from_last_oh * reliability - -# # Calculate costs -# if expected_operating_time > 0: -# failure_cost = (failure_prob * failure_replacement_cost) / expected_operating_time -# preventive_cost = (reliability * preventive_cost) / expected_operating_time -# else: -# # failure_cost = failure_prob * self.IDRu -# # preventive_cost = reliability * self.IDRp - -# continue - -# total_cost = failure_cost + preventive_cost - -# results.append({ -# 'date': date, -# 'days_from_last_oh': time_from_last_oh, -# 'reliability': reliability, -# 'failure_probability': failure_prob, -# 'expected_operating_time': expected_operating_time, -# 'failure_replacement_cost': failure_cost, -# 'preventive_replacement_cost': preventive_cost, -# 'total_cost': total_cost, -# 'procurement_cost': 0, -# 'procurement_details': [] -# }) - -# return results - -# def _find_optimal_timing(self, results_df): -# """ -# Find the timing that gives the lowest total cost. - -# Parameters: -# results_df: DataFrame from calculate_costs_over_period - -# Returns: -# Dictionary with optimal timing information -# """ -# if results_df.empty: -# return None - -# # Find minimum total cost -# min_cost_idx = results_df['total_cost'].idxmin() -# optimal_row = results_df.loc[min_cost_idx] - -# return { -# 'optimal_index': min_cost_idx, -# 'optimal_date': optimal_row['date'], -# 'days_from_last_oh': optimal_row['days_from_last_oh'], -# 'reliability': optimal_row['reliability'], -# 'failure_cost': optimal_row['failure_replacement_cost'], -# 'preventive_cost': optimal_row['preventive_replacement_cost'], -# 'total_cost': optimal_row['total_cost'], -# 'procurement_cost': 0 -# } - - -# async def calculate_cost_all_equipment( -# self, -# db_session: DbSession, -# equipments: list, -# preventive_cost: float, -# calculation, -# ) : -# """ -# Calculate optimization for entire fleet of equipment -# """ -# max_interval = self._get_months_between(self.last_oh_date, self.next_oh_date) -# preventive_cost_per_equipment = preventive_cost / len(equipments) - -# fleet_results = [] -# total_corrective_costs = np.zeros(max_interval) -# total_preventive_costs = np.zeros(max_interval) -# total_procurement_costs = np.zeros(max_interval) -# total_failures = np.zeros(max_interval) - -# for equipment in equipments: -# # Get reliability data -# cost_per_failure = equipment.material_cost -# reliabilities = self._get_reliability_equipment( -# location_tag=equipment.location_tag, -# ) - -# predicted_cost = self._calculate_costs_each_point(reliabilities=reliabilities, preventive_cost=preventive_cost_per_equipment, failure_replacement_cost=cost_per_failure) - -# optimum_cost = self._find_optimal_timing(pd.DataFrame(predicted_cost)) - -# # Aggregate costs -# corrective_costs = [r["failure_replacement_cost"] for r in predicted_cost] -# preventive_costs = [r["preventive_replacement_cost"] for r in predicted_cost] -# procurement_costs = [r["procurement_cost"] for r in predicted_cost] -# procurement_details = [r["procurement_details"] for r in predicted_cost] -# failures = [(1-r["reliability"]) for r in predicted_cost] - -# fleet_results.append( -# CalculationEquipmentResult( -# corrective_costs=corrective_costs, -# overhaul_costs=preventive_costs, -# procurement_costs=procurement_costs, -# daily_failures=failures, -# location_tag=equipment.location_tag, -# material_cost=equipment.material_cost, -# service_cost=equipment.service_cost, -# optimum_day=optimum_cost['optimal_index'], -# calculation_data_id=calculation.id, -# procurement_details=procurement_details -# ) -# ) - -# total_corrective_costs += np.array(corrective_costs) -# total_preventive_costs += np.array(preventive_costs) -# total_procurement_costs += np.array(procurement_costs) - -# # Calculate fleet optimal interval -# total_costs = total_corrective_costs + total_preventive_costs + total_procurement_costs -# fleet_optimal_index = np.argmin(total_costs) - -# calculation.optimum_oh_day =fleet_optimal_index + 1 - -# db_session.add_all(fleet_results) -# await db_session.commit() - -# return { -# 'id': calculation.id, -# 'fleet_results': fleet_results, -# 'fleet_optimal_interval': fleet_optimal_index + 1, -# 'fleet_optimal_cost': total_costs[fleet_optimal_index], -# 'total_corrective_costs': total_corrective_costs.tolist(), -# 'total_preventive_costs': total_preventive_costs.tolist(), -# 'total_procurement_costs': total_procurement_costs.tolist(), -# } +log = logging.getLogger(__name__) +setup_logging(logger=log) class OptimumCostModel: - def __init__(self, token, last_oh_date, next_oh_date, interval=30, base_url: str = "http://192.168.1.82:8000"): + def __init__(self, token: str, last_oh_date: date, next_oh_date: date, + time_window_months: Optional[int] = None, + base_url: str = "http://192.168.1.82:8000"): + """ + Initialize the Optimum Cost Model for overhaul timing optimization. + + Args: + token: API authentication token + last_oh_date: Date of last overhaul + next_oh_date: Planned date of next overhaul + time_window_months: Analysis window in months (default: 1.5x planned interval) + base_url: API base URL + """ self.api_base_url = base_url self.token = token self.last_oh_date = last_oh_date self.next_oh_date = next_oh_date - self.interval = interval self.session = None - - # Pre-calculate date range for reuse + + # Calculate planned overhaul interval in months + self.planned_oh_months = self._get_months_between(last_oh_date, next_oh_date) + + # Set analysis time window (default: 1.5x planned interval) + self.time_window_months = time_window_months or int(self.planned_oh_months * 1.5) + + # Pre-calculate date range for API calls self.date_range = self._generate_date_range() - - # Setup logging for debugging + + # Setup logging logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(__name__) + + self.logger.info(f"OptimumCostModel initialized:") + self.logger.info(f" - Planned OH interval: {self.planned_oh_months} months") + self.logger.info(f" - Analysis window: {self.time_window_months} months") + + def _get_months_between(self, start_date: date, end_date: date) -> int: + """Calculate number of months between two dates""" + return (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month) def _generate_date_range(self) -> List[datetime]: - """Pre-generate the date range to avoid repeated calculations""" + """Generate date range for analysis based on time window""" dates = [] - current_date = self.last_oh_date - while current_date <= self.next_oh_date: + current_date = datetime.combine(self.last_oh_date, datetime.min.time()) + end_date = current_date + timedelta(days=self.time_window_months * 30) + + while current_date <= end_date: dates.append(current_date) current_date += timedelta(days=31) + return dates - def _get_months_between(self, start_date: date, end_date: date) -> int: - """Calculate number of months between two dates""" - return (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month) - async def _create_session(self): """Create aiohttp session with connection pooling""" if self.session is None: - timeout = aiohttp.ClientTimeout() + timeout = aiohttp.ClientTimeout(total=300) connector = aiohttp.TCPConnector( - limit=500, # Total connection pool size - limit_per_host=200, # Max connections per host - ttl_dns_cache=00, # DNS cache TTL + limit=500, + limit_per_host=200, + ttl_dns_cache=300, use_dns_cache=True, force_close=False, enable_cleanup_closed=True @@ -875,246 +128,424 @@ class OptimumCostModel: await self.session.close() self.session = None - async def _get_reliability_from_api_async( - self, - target_date: datetime, - location_tag: str, - max_retries: int = 3, - retry_delay: float = 1.0 -) -> Optional[float]: - date_str = target_date.strftime('%Y-%m-%d %H:%M:%S.%f') - url = f"{self.api_base_url}/calculate/failure-rate/{location_tag}/{date_str}" - - for attempt in range(max_retries + 1): - try: - async with self.session.get(url) as response: - if response.status == 200: - data = await response.json() - return data['data']['value'] - else: - # Server error - may be worth retrying - print(f"Server error {response.status} for {location_tag} on {date_str}") - if attempt < max_retries: - await asyncio.sleep(retry_delay * (2 ** attempt)) - continue - - except aiohttp.ClientError as e: - print(f"Network error for {location_tag} on {date_str} (attempt {attempt + 1}): {e}") - if attempt < max_retries: - await asyncio.sleep(retry_delay * (2 ** attempt)) - continue - return None - except Exception as e: - print(f"Unexpected error for {location_tag} on {date_str}: {e}") - return None - - return None - - async def _get_reliability_equipment_batch( - self, - location_tags: List[str], - batch_size: int = 100, - max_retries: int = 3, - rate_limit_delay: float = 0.1 - ): - await self._create_session() - - # Prepare all tasks with retry wrapper - tasks = [] - task_mapping = [] - - for location_tag in location_tags: - for target_date in self.date_range: - # Create a task with retry logic - task = self._execute_with_retry( - target_date, - location_tag, - max_retries=max_retries - ) - tasks.append(task) - task_mapping.append((location_tag, target_date)) - - print(f"Executing {len(tasks)} API calls in batches of {batch_size}...") - - # Process with controlled concurrency - semaphore = asyncio.Semaphore(batch_size) - results = defaultdict(dict) - completed = 0 - total_batches = (len(tasks) + batch_size - 1) // batch_size - - async def process_task(task, location_tag, target_date): - nonlocal completed - async with semaphore: - try: - reliability = await task - results[location_tag][target_date] = reliability - except Exception as e: - results[location_tag][target_date] = None - print(f"Failed for {location_tag} on {target_date}: {str(e)}") - finally: - completed += 1 - if completed % 100 == 0: - print(f"Progress: {completed}/{len(tasks)} ({completed/len(tasks):.1%})") - - # Process all tasks with rate limiting - batch_tasks = [] - for i, task in enumerate(tasks): - batch_tasks.append(process_task(task, *task_mapping[i])) - if i > 0 and i % batch_size == 0: - await asyncio.sleep(rate_limit_delay) # Gentle rate limiting - - await asyncio.gather(*batch_tasks) - results_serializable = {str(k): v for k, v in results.items()} - return dict(results) - - def json_datetime_handler(self,obj): - if isinstance(obj, datetime): - return obj.isoformat() # Or str(obj) - return str(obj) - - - async def _execute_with_retry( - self, - target_date: datetime, - location_tag: str, - max_retries: int = 3, - base_delay: float = 1.0 - ) -> Optional[float]: - """ - Execute API call with exponential backoff retry logic. - """ - for attempt in range(max_retries + 1): - try: - return await self._get_reliability_from_api_async(target_date, location_tag) - except aiohttp.ClientError as e: - if attempt == max_retries: - raise - delay = base_delay * (2 ** attempt) - await asyncio.sleep(delay) - except Exception as e: - raise # Non-retryable errors - - def _get_equipment_fr( - self, - location_tag: str, - token: str - ): - failure_rate_url = f"{self.api_base_url}/asset/failure-rate/{location_tag}" + async def get_failures_prediction(self, simulation_id: str, location_tag: str, birnbaum_importance: float): + """Get failure predictions for equipment from simulation service""" + plot_result_url = f"{self.api_base_url}/aeros/simulation/result/plot/{simulation_id}/{location_tag}?use_location_tag=1" + try: response = requests.get( - failure_rate_url, + plot_result_url, headers={ "Content-Type": "application/json", - "Authorization": f"Bearer {token}", + "Authorization": f"Bearer {self.token}", }, - timeout=10 + timeout=30 ) response.raise_for_status() - result = response.json() + prediction_data = response.json() except (requests.RequestException, ValueError) as e: - raise Exception(f"Failed to fetch or parse mdt data: {e}") + self.logger.error(f"Failed to fetch prediction data for {location_tag}: {e}") + return None - fr = result["data"]["failure_rate"] + plot_data = prediction_data.get('data', {}).get('timestamp_outs') if prediction_data.get("data") else None + + if not plot_data: + self.logger.warning(f"No plot data available for {location_tag}") + return None + + time_series = create_time_series_data(plot_data, 43830) + monthly_data = analyze_monthly_metrics(time_series) + + return monthly_data - return fr + async def get_simulation_results(self, simulation_id: str = "default"): + """Get simulation results for Birnbaum importance calculations""" + headers = { + "Authorization": f"Bearer {self.token}", + "Content-Type": "application/json" + } - def _get_failure_rate(self, location_tag: str, token: str): - failure_rate_url = f"{self.api_base_url}/asset/failure-rate/{location_tag}" - try: - response = requests.get( - failure_rate_url, - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {token}", - }, - timeout=10 - ) - response.raise_for_status() - result = response.json() - except (requests.RequestException, ValueError) as e: - raise Exception(f"Failed to fetch or parse mdt data: {e}") + calc_result_url = f"{self.api_base_url}/aeros/simulation/result/calc/{simulation_id}?nodetype=RegularNode" + plant_result_url = f"{self.api_base_url}/aeros/simulation/result/calc/{simulation_id}/plant" - fr = result["data"]["failure_rate"] + async with httpx.AsyncClient(timeout=300.0) as client: + calc_task = client.get(calc_result_url, headers=headers) + plant_task = client.get(plant_result_url, headers=headers) - return fr + calc_response, plant_response = await asyncio.gather(calc_task, plant_task) - def _calculate_costs_vectorized(self, failures_prediction: List[Dict], - preventive_cost: float, failure_replacement_cost: float) -> List[Dict]: - - # Extract data from failures_prediction - months = [item['month'] for item in failures_prediction] - failure_counts = [item['failures'] for item in failures_prediction] + calc_response.raise_for_status() + plant_response.raise_for_status() + + calc_data = calc_response.json()["data"] + plant_data = plant_response.json()["data"] + + return { + "calc_result": calc_data, + "plant_result": plant_data + } + + def _calculate_equipment_costs(self, failures_prediction: Dict, birnbaum_importance: float, + preventive_cost: float, failure_replacement_cost: float, + location_tag: str) -> List[Dict]: + """Calculate costs for each month for a single equipment""" - # Calculate failure costs for each month - failure_costs = [count * failure_replacement_cost for count in failure_counts] - + if not failures_prediction: + self.logger.warning(f"No failure prediction data for {location_tag}") + return [] + + months = list(failures_prediction.keys()) + num_months = len(months) + + # Calculate risk costs and failure costs + risk_costs = [] + cumulative_risk_costs = [] + failure_counts = [] - # Calculate preventive costs (distributed equally across months) - num_months = np.array([i+1 for i in range(len(failures_prediction))]) - preventive_costs = preventive_cost/num_months + cumulative_risk = 0 - # Total cost = Failure cost + Preventive cost - total_costs = [failure_costs[i] + preventive_costs[i] for i in range(len(num_months))] + for month_key in months: + data = failures_prediction[month_key] + + # Risk cost = flow_rate × birnbaum_importance × downtime_hours × energy_price + monthly_risk = data['avg_flow_rate'] * birnbaum_importance * data['total_oos_hours'] * 1000 + risk_costs.append(monthly_risk) + + cumulative_risk += monthly_risk + cumulative_risk_costs.append(cumulative_risk) + + failure_counts.append(data['cumulative_failures']) - # Convert back to list of dictionaries + # Calculate costs for each month results = [] - for i in range(len(failures_prediction)): + + for i in range(num_months): + month_index = i + 1 + + # Failure cost = cumulative failures × replacement cost + cumulative risk cost + failure_cost = (failure_counts[i] * failure_replacement_cost) + cumulative_risk_costs[i] + + # Preventive cost = overhaul cost distributed over months + preventive_cost_month = preventive_cost / month_index + + # Total cost = failure cost + preventive cost + total_cost = failure_cost + preventive_cost_month + results.append({ - 'month': months[i], - 'number_of_failure': failure_counts[i], - 'failure_replacement_cost': failure_costs[i], - 'preventive_replacement_cost': preventive_costs[i], - 'total_cost': total_costs[i], - 'procurement_cost': 0, - 'procurement_details': [] + 'month': month_index, + 'number_of_failures': failure_counts[i], + 'failure_cost': failure_cost, + 'preventive_cost': preventive_cost_month, + 'total_cost': total_cost, + 'is_after_planned_oh': month_index > self.planned_oh_months, + 'delay_months': max(0, month_index - self.planned_oh_months), + 'risk_cost': cumulative_risk_costs[i], + 'monthly_risk_cost': risk_costs[i], + 'procurement_cost': 0, # For database compatibility + 'procurement_details': [] # For database compatibility }) return results - def _find_optimal_timing_vectorized(self, results: List[Dict]) -> Optional[Dict]: - """ - Vectorized optimal timing calculation - """ - if not results: + def _find_optimal_timing(self, cost_results: List[Dict], location_tag: str) -> Optional[Dict]: + """Find optimal timing for equipment overhaul""" + if not cost_results: return None - total_costs = np.array([r['total_cost'] for r in results]) + # Find month with minimum total cost + min_cost = float('inf') + optimal_result = None + optimal_index = -1 + + for i, result in enumerate(cost_results): + if result['total_cost'] < min_cost: + min_cost = result['total_cost'] + optimal_result = result + optimal_index = i + + if optimal_result is None: + return None + + # Calculate cost comparison with planned timing + planned_cost = None + cost_vs_planned = None + + if self.planned_oh_months <= len(cost_results): + planned_cost = cost_results[self.planned_oh_months - 1]['total_cost'] + cost_vs_planned = optimal_result['total_cost'] - planned_cost + + return { + 'location_tag': location_tag, + 'optimal_month': optimal_result['month'], + 'optimal_index': optimal_index, + 'optimal_cost': optimal_result['total_cost'], + 'failure_cost': optimal_result['failure_cost'], + 'preventive_cost': optimal_result['preventive_cost'], + 'number_of_failures': optimal_result['number_of_failures'], + 'is_delayed': optimal_result['is_after_planned_oh'], + 'delay_months': optimal_result['delay_months'], + 'planned_oh_month': self.planned_oh_months, + 'planned_cost': planned_cost, + 'cost_vs_planned': cost_vs_planned, + 'savings_from_delay': -cost_vs_planned if cost_vs_planned and cost_vs_planned < 0 else 0, + 'cost_of_delay': cost_vs_planned if cost_vs_planned and cost_vs_planned > 0 else 0, + 'all_monthly_costs': cost_results + } - min_idx = np.argmin(total_costs) - optimal_result = results[min_idx] + async def calculate_optimal_timing_single_equipment(self, equipment, birnbaum_importance: float, + simulation_id: str = "default") -> Optional[Dict]: + """Calculate optimal overhaul timing for a single equipment""" + + location_tag = equipment.location_tag + self.logger.info(f"Calculating optimal timing for {location_tag}") + + # Get failure predictions + monthly_data = await self.get_failures_prediction(simulation_id, location_tag, birnbaum_importance) + + if not monthly_data: + self.logger.warning(f"No monthly data available for {location_tag}") + return None + + # Calculate costs + preventive_cost = equipment.overhaul_cost + equipment.service_cost + failure_replacement_cost = equipment.material_cost + (3 * 111000 * 3) # Material + Labor + + cost_results = self._calculate_equipment_costs( + failures_prediction=monthly_data, + birnbaum_importance=birnbaum_importance, + preventive_cost=preventive_cost, + failure_replacement_cost=failure_replacement_cost, + location_tag=location_tag + ) + + # Find optimal timing + optimal_timing = self._find_optimal_timing(cost_results, location_tag) + + if optimal_timing: + self.logger.info(f"Optimal timing for {location_tag}: Month {optimal_timing['optimal_month']} " + f"(Cost: ${optimal_timing['optimal_cost']:,.2f})") + + if optimal_timing['is_delayed']: + self.logger.info(f" - Delay recommended: {optimal_timing['delay_months']} months") + self.logger.info(f" - Savings from delay: ${optimal_timing['savings_from_delay']:,.2f}") + + return optimal_timing + async def calculate_cost_all_equipment(self, db_session, equipments: List, calculation, + preventive_cost: float, simulation_id: str = "default") -> Dict: + """ + Calculate optimal overhaul timing for entire fleet and save to database + """ + + self.logger.info(f"Starting fleet optimization for {len(equipments)} equipment items") + max_interval = self.time_window_months + + # Get Birnbaum importance values + try: + importance_results = await self.get_simulation_results(simulation_id) + equipment_birnbaum = { + imp['aeros_node']['node_name']: imp['contribution'] + for imp in importance_results["calc_result"] + } + except Exception as e: + self.logger.error(f"Failed to get simulation results: {e}") + equipment_birnbaum = {} + + # Initialize fleet aggregation arrays + fleet_results = [] + total_corrective_costs = np.zeros(max_interval) + total_preventive_costs = np.zeros(max_interval) + total_procurement_costs = np.zeros(max_interval) + total_costs = np.zeros(max_interval) + + for equipment in equipments: + location_tag = equipment.location_tag + birnbaum = equipment_birnbaum.get(location_tag, 0.0) + + if birnbaum == 0.0: + self.logger.warning(f"No Birnbaum importance found for {location_tag}, using 0.0") + + try: + # Get failure predictions + monthly_data = await self.get_failures_prediction(simulation_id, location_tag, birnbaum) + + if not monthly_data: + continue + + # Calculate costs + equipment_preventive_cost = equipment.overhaul_cost + equipment.service_cost + failure_replacement_cost = equipment.material_cost + (3 * 111000 * 3) + + cost_results = self._calculate_equipment_costs( + failures_prediction=monthly_data, + birnbaum_importance=birnbaum, + preventive_cost=equipment_preventive_cost, + failure_replacement_cost=failure_replacement_cost, + location_tag=location_tag + ) + + if not cost_results: + continue + + # Find optimal timing + optimal_timing = self._find_optimal_timing(cost_results, location_tag) + + if not optimal_timing: + continue + + # Prepare arrays for database (pad to max_interval length) + corrective_costs = [r["failure_cost"] for r in cost_results] + preventive_costs = [r["preventive_cost"] for r in cost_results] + procurement_costs = [r["procurement_cost"] for r in cost_results] + failures = [r["number_of_failures"] for r in cost_results] + total_costs_equipment = [r['total_cost'] for r in cost_results] + procurement_details = [r["procurement_details"] for r in cost_results] + + # Pad arrays to max_interval length + def pad_array(arr, target_length): + if len(arr) < target_length: + return arr + [arr[-1]] * (target_length - len(arr)) # Use last value for padding + return arr[:target_length] + + corrective_costs = pad_array(corrective_costs, max_interval) + preventive_costs = pad_array(preventive_costs, max_interval) + procurement_costs = pad_array(procurement_costs, max_interval) + failures = pad_array(failures, max_interval) + total_costs_equipment = pad_array(total_costs_equipment, max_interval) + procurement_details = pad_array(procurement_details, max_interval) + + # Create database result object + equipment_result = CalculationEquipmentResult( + corrective_costs=corrective_costs, + overhaul_costs=preventive_costs, + procurement_costs=procurement_costs, + daily_failures=failures, + location_tag=equipment.location_tag, + material_cost=equipment.material_cost, + service_cost=equipment.service_cost, + optimum_day=optimal_timing['optimal_index'], + calculation_data_id=calculation.id, + procurement_details=procurement_details + ) + + fleet_results.append(equipment_result) + + # Aggregate costs for fleet analysis + total_corrective_costs += np.array(corrective_costs) + total_preventive_costs += np.array(preventive_costs) + total_procurement_costs += np.array(procurement_costs) + total_costs += np.array(total_costs_equipment) + + self.logger.info(f"Processed {location_tag}: Optimal month {optimal_timing['optimal_month']}") + + except Exception as e: + self.logger.error(f"Failed to calculate timing for {location_tag}: {e}") + continue + + # Calculate fleet optimal interval + fleet_optimal_index = np.argmin(total_costs) + fleet_optimal_cost = total_costs[fleet_optimal_index] + + # Update calculation with results + calculation.optimum_oh_day = fleet_optimal_index + calculation.max_interval = max_interval + + # Save all results to database + db_session.add_all(fleet_results) + await db_session.commit() + + self.logger.info(f"Fleet optimization completed:") + self.logger.info(f" - Fleet optimal month: {fleet_optimal_index + 1}") + self.logger.info(f" - Fleet optimal cost: ${fleet_optimal_cost:,.2f}") + self.logger.info(f" - Results saved to database for {len(fleet_results)} equipment") + return { - 'optimal_index': min_idx, - 'optimal_month': optimal_result['month'], - 'number_of_failure': optimal_result['number_of_failure'], - 'failure_cost': optimal_result['failure_replacement_cost'], - 'preventive_cost': optimal_result['preventive_replacement_cost'], - 'total_cost': optimal_result['total_cost'], - 'procurement_cost': optimal_result['procurement_cost'] + 'id': calculation.id, + 'fleet_results': fleet_results, + 'fleet_optimal_interval': fleet_optimal_index + 1, + 'fleet_optimal_cost': fleet_optimal_cost, + 'total_corrective_costs': total_corrective_costs.tolist(), + 'total_preventive_costs': total_preventive_costs.tolist(), + 'total_procurement_costs': total_procurement_costs.tolist(), + 'analysis_parameters': { + 'planned_oh_months': self.planned_oh_months, + 'analysis_window_months': self.time_window_months, + 'last_oh_date': self.last_oh_date.isoformat(), + 'next_oh_date': self.next_oh_date.isoformat() + } } - def _get_number_of_failure_after_last_oh(self, target_date, token, location_tag): - date = target_date.strftime('%Y-%m-%d %H:%M:%S.%f') - nof = f"http://192.168.1.82:8000/reliability/calculate/failures/{location_tag}/{date}" - header = { - 'Content-Type': 'application/json', - 'Authorization': 'Bearer ' + token - } +class OptimumCostModelWithSpareparts: + def __init__(self, token: str, last_oh_date: date, next_oh_date: date, + sparepart_manager, + time_window_months: Optional[int] = None, + base_url: str = "http://192.168.1.82:8000"): + """ + Initialize the Optimum Cost Model with sparepart management + """ + self.api_base_url = base_url + self.token = token + self.last_oh_date = last_oh_date + self.next_oh_date = next_oh_date + self.session = None + self.sparepart_manager = sparepart_manager + + # Calculate planned overhaul interval in months + self.planned_oh_months = self._get_months_between(last_oh_date, next_oh_date) + + # Set analysis time window (default: 1.5x planned interval) + self.time_window_months = time_window_months or int(self.planned_oh_months * 1.5) + + # Pre-calculate date range for API calls + self.date_range = self._generate_date_range() + + self.logger = log - response = requests.get(nof, headers=header) + def _get_months_between(self, start_date: date, end_date: date) -> int: + """Calculate number of months between two dates""" + return (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month) - data = response.json() + def _generate_date_range(self) -> List[datetime]: + """Generate date range for analysis based on time window""" + dates = [] + current_date = datetime.combine(self.last_oh_date, datetime.min.time()) + end_date = current_date + timedelta(days=self.time_window_months * 30) + + while current_date <= end_date: + dates.append(current_date) + current_date += timedelta(days=31) + + return dates - return data['data']['value'] + async def _create_session(self): + """Create aiohttp session with connection pooling""" + if self.session is None: + timeout = aiohttp.ClientTimeout(total=300) + connector = aiohttp.TCPConnector( + limit=500, + limit_per_host=200, + ttl_dns_cache=300, + use_dns_cache=True, + force_close=False, + enable_cleanup_closed=True + ) + self.session = aiohttp.ClientSession( + timeout=timeout, + connector=connector, + headers={'Authorization': f'Bearer {self.token}'} + ) - - async def get_failures_prediction(self, simulation_id: str, location_tag): - # calc_result_url = f"{RBD_SERVICE_API}/aeros/simulation/result/calc/{simulation_id}?nodetype=RegularNode" - plot_result_url = f"{RBD_SERVICE_API}/aeros/simulation/result/plot/{simulation_id}/{location_tag}?use_location_tag=1" - # calc_plant_result = f"{RBD_SERVICE_API}/aeros/simulation/result/calc/{simulation_id}/plant" + async def _close_session(self): + """Close aiohttp session""" + if self.session: + await self.session.close() + self.session = None + async def get_failures_prediction(self, simulation_id: str, location_tag: str, birnbaum_importance: float): + """Get failure predictions for equipment from simulation service""" + plot_result_url = f"{self.api_base_url}/aeros/simulation/result/plot/{simulation_id}/{location_tag}?use_location_tag=1" try: response = requests.get( @@ -1123,200 +554,522 @@ class OptimumCostModel: "Content-Type": "application/json", "Authorization": f"Bearer {self.token}", }, - timeout=10 + timeout=30 ) response.raise_for_status() prediction_data = response.json() except (requests.RequestException, ValueError) as e: - raise Exception(f"Failed to fetch or parse prediction data: {e}") - - if not prediction_data["data"]: - results = [] - for i in range(1, 33): - results.append({ - "month": i, - "failures": 0 - }) - return results - - plot_data = prediction_data['data']['timestamp_outs'] - time_series = create_time_series_data(plot_data) + self.logger.error(f"Failed to fetch prediction data for {location_tag}: {e}") + return None + + plot_data = prediction_data.get('data', {}).get('timestamp_outs') if prediction_data.get("data") else None - return calculate_failures_per_month(time_series) - - async def get_simulation_results(self): + if not plot_data: + self.logger.warning(f"No plot data available for {location_tag}") + return None + + time_series = create_time_series_data(plot_data, 43830) + monthly_data = analyze_monthly_metrics(time_series) + + return monthly_data + + async def get_simulation_results(self, simulation_id: str = "default"): + """Get simulation results for Birnbaum importance calculations""" headers = { "Authorization": f"Bearer {self.token}", "Content-Type": "application/json" } - calc_result_url = f"{RBD_SERVICE_API}/aeros/simulation/result/calc/default?nodetype=RegularNode" - # plot_result_url = f"{RBD_SERVICE_API}/aeros/simulation/result/plot/{simulation_id}?nodetype=RegularNode" - calc_plant_result = f"{RBD_SERVICE_API}/aeros/simulation/result/calc/default/plant" + calc_result_url = f"{self.api_base_url}/aeros/simulation/result/calc/{simulation_id}?nodetype=RegularNode" + plant_result_url = f"{self.api_base_url}/aeros/simulation/result/calc/{simulation_id}/plant" async with httpx.AsyncClient(timeout=300.0) as client: calc_task = client.get(calc_result_url, headers=headers) - # plot_task = client.get(plot_result_url, headers=headers) - plant_task = client.get(calc_plant_result, headers=headers) + plant_task = client.get(plant_result_url, headers=headers) - # Run all three requests concurrently calc_response, plant_response = await asyncio.gather(calc_task, plant_task) calc_response.raise_for_status() - # plot_response.raise_for_status() plant_response.raise_for_status() calc_data = calc_response.json()["data"] - # plot_data = plot_response.json()["data"] plant_data = plant_response.json()["data"] return { "calc_result": calc_data, - # "plot_result": plot_data, "plant_result": plant_data } - async def calculate_cost_all_equipment( - self, - db_session, # DbSession type - equipments: List, # List of equipment objects - preventive_cost: float, - calculation, - ) -> Dict: - """ - Optimized calculation for entire fleet of equipment using concurrent API calls - """ - try: + def _calculate_equipment_costs_with_spareparts(self, failures_prediction: Dict, birnbaum_importance: float, + preventive_cost: float, failure_replacement_cost: float, + location_tag: str, planned_overhauls: List = None) -> List[Dict]: + """Calculate costs for each month including sparepart costs and availability""" + + if not failures_prediction: + self.logger.warning(f"No failure prediction data for {location_tag}") + return [] - max_interval = len(self.date_range) - preventive_cost_per_equipment = preventive_cost / len(equipments) + months = list(failures_prediction.keys()) + num_months = len(months) + + # Calculate basic costs (same as before) + risk_costs = [] + cumulative_risk_costs = [] + failure_counts = [] + + cumulative_risk = 0 + + for month_key in months: + data = failures_prediction[month_key] + monthly_risk = data['avg_flow_rate'] * birnbaum_importance * data['total_oos_hours'] * 1000 + risk_costs.append(monthly_risk) + + cumulative_risk += monthly_risk + cumulative_risk_costs.append(cumulative_risk) + + failure_counts.append(data['cumulative_failures']) + + # Calculate costs for each month including sparepart considerations + results = [] + + for i in range(num_months): + month_index = i + 1 + + # Basic failure and preventive costs + failure_cost = (failure_counts[i] * failure_replacement_cost) + cumulative_risk_costs[i] + preventive_cost_month = preventive_cost / month_index + + # Check sparepart availability for this month + sparepart_analysis = self._analyze_sparepart_availability( + location_tag, month_index - 1, planned_overhauls or [] + ) + + + # Calculate procurement costs if spareparts are needed + procurement_cost = sparepart_analysis['total_procurement_cost'] + procurement_details = sparepart_analysis + + # Adjust total cost based on sparepart availability + if not sparepart_analysis['available']: + total_cost = failure_cost + preventive_cost_month + procurement_cost + else: + # All spareparts available + total_cost = failure_cost + preventive_cost_month + procurement_cost + + results.append({ + 'month': month_index, + 'number_of_failures': failure_counts[i], + 'failure_cost': failure_cost, + 'preventive_cost': preventive_cost_month, + 'procurement_cost': procurement_cost, + 'total_cost': total_cost, + 'is_after_planned_oh': month_index > self.planned_oh_months, + 'delay_months': max(0, month_index - self.planned_oh_months), + 'risk_cost': cumulative_risk_costs[i], + 'monthly_risk_cost': risk_costs[i], + 'procurement_details': procurement_details, + 'sparepart_available': sparepart_analysis['available'], + 'sparepart_status': sparepart_analysis['message'], + 'can_proceed': sparepart_analysis['can_proceed_with_delays'] + }) + + return results - # Extract location tags for batch processing - location_tags = [eq.location_tag for eq in equipments] + def _analyze_sparepart_availability(self, equipment_tag: str, target_month: int, + planned_overhauls: List) -> Dict: + """Analyze sparepart availability for equipment at target month""" + if not self.sparepart_manager: + return { + 'available': True, + 'message': 'Sparepart manager not initialized', + 'total_procurement_cost': 0, + 'procurement_needed': [], + 'can_proceed_with_delays': True + } + + # Convert planned overhauls to format expected by sparepart manager + other_overhauls = [(eq_tag, month) for eq_tag, month in planned_overhauls + if eq_tag != equipment_tag and month <= target_month] + + return self.sparepart_manager.check_sparepart_availability( + equipment_tag, target_month, other_overhauls + ) - print(f"Starting reliability data fetch for {len(equipments)} equipment...") + def _find_optimal_timing_with_spareparts(self, cost_results: List[Dict], location_tag: str) -> Optional[Dict]: + """Find optimal timing considering sparepart constraints""" + if not cost_results: + return None - # Fetch all reliability data concurrently - # all_reliabilities = await self._get_reliability_equipment_batch(location_tags) + # Filter out months where overhaul cannot proceed due to critical sparepart shortages + feasible_results = [r for r in cost_results if r['can_proceed']] + + # if not feasible_results: + # self.logger.warning(f"No feasible overhaul months for {location_tag} due to sparepart constraints") + # # Return the earliest month with the least critical parts missing + # min_critical_missing = min(r['missing_critical_parts'] for r in cost_results) + # for result in cost_results: + # if result['missing_critical_parts'] == min_critical_missing: + # return self._create_optimal_result(result, location_tag, "INFEASIBLE") + # return None + + # Find month with minimum total cost among feasible options + min_cost = float('inf') + optimal_result = None + optimal_index = -1 + + for i, result in enumerate(cost_results): + if result in feasible_results and result['total_cost'] < min_cost: + min_cost = result['total_cost'] + optimal_result = result + optimal_index = i + + if optimal_result is None: + return None + + return self._create_optimal_result(optimal_result, location_tag, "OPTIMAL") - print("Processing cost calculations...") - - importance_results = await self.get_simulation_results() + def _create_optimal_result(self, optimal_result: Dict, location_tag: str, status: str) -> Dict: + """Create standardized optimal result dictionary""" + # Calculate cost comparison with planned timing + planned_cost = None + cost_vs_planned = None + + if self.planned_oh_months <= len(optimal_result.get('all_monthly_costs', [])): + # This would need the full cost results array + pass # Will be calculated in the calling function + + return { + 'location_tag': location_tag, + 'optimal_month': optimal_result['month'], + 'optimal_index': optimal_result['month'] - 1, + 'optimal_cost': optimal_result['total_cost'], + 'failure_cost': optimal_result['failure_cost'], + 'preventive_cost': optimal_result['preventive_cost'], + 'procurement_cost': optimal_result['procurement_cost'], + 'number_of_failures': optimal_result['number_of_failures'], + 'is_delayed': optimal_result['is_after_planned_oh'], + 'delay_months': optimal_result['delay_months'], + 'planned_oh_month': self.planned_oh_months, + 'planned_cost': planned_cost, + 'cost_vs_planned': cost_vs_planned, + 'savings_from_delay': 0, # Will be calculated later + 'cost_of_delay': 0, # Will be calculated later + 'sparepart_available': optimal_result['sparepart_available'], + 'sparepart_status': optimal_result['sparepart_status'], + 'procurement_details': optimal_result['procurement_details'], + # 'missing_critical_parts': optimal_result['missing_critical_parts'], + 'optimization_status': status, + 'all_monthly_costs': [] # Will be filled by calling function + } + + async def calculate_cost_all_equipment_with_spareparts(self, db_session,collector_db_session ,equipments: List, + calculation, preventive_cost: float, + simulation_id: str = "default") -> Dict: + """ + Calculate optimal overhaul timing for entire fleet considering sparepart constraints + """ + + self.logger.info(f"Starting fleet optimization with sparepart management for {len(equipments)} equipment") + max_interval = self.time_window_months + + # Get Birnbaum importance values + try: + importance_results = await self.get_simulation_results(simulation_id) + equipment_birnbaum = { + imp['aeros_node']['node_name']: imp['contribution'] + for imp in importance_results["calc_result"] + } + except Exception as e: + self.logger.error(f"Failed to get simulation results: {e}") + equipment_birnbaum = {} + + # Phase 1: Calculate individual optimal timings without considering interactions + individual_results = {} + + for equipment in equipments: + location_tag = equipment.location_tag + birnbaum = equipment_birnbaum.get(location_tag, 0.0) - equipment_birbaum = {imp['aeros_node']['node_name']: imp['contribution'] for imp in importance_results["calc_result"]} - - # Process each equipment's costs - fleet_results = [] - total_corrective_costs = np.zeros(max_interval) - total_preventive_costs = np.zeros(max_interval) - total_procurement_costs = np.zeros(max_interval) - total_costs = np.zeros(max_interval) - total_failures = np.zeros(max_interval) - - for equipment in equipments: - location_tag = equipment.location_tag - cost_per_failure = equipment.material_cost - overhaul_cost = equipment.overhaul_cost - service_cost = equipment.service_cost - - failures_prediction = await self.get_failures_prediction( - simulation_id = "6825d5b9-c165-45fe-b553-8c71c1a15903", - location_tag=location_tag - ) + try: + # Get failure predictions + monthly_data = await self.get_failures_prediction(simulation_id, location_tag, birnbaum) - energy_price_per_mwh = 1000000 + if not monthly_data: + continue - birnbaum = equipment_birbaum.get(location_tag, 0) - mw_lost_when_failed = birnbaum * 660 - cost_per_hour_downtime = mw_lost_when_failed * energy_price_per_mwh - - # # Get pre-fetched reliability data - # failure_rate = all_reliabilities.get(location_tag, {}) - # # failure_rate = self._get_equipment_fr(location_tag, self.token) + # Calculate costs without considering other equipment (first pass) + equipment_preventive_cost = equipment.overhaul_cost + equipment.service_cost + failure_replacement_cost = equipment.material_cost + (3 * 111000 * 3) - - # Calculate costs using vectorized operations - predicted_costs = self._calculate_costs_vectorized( - preventive_cost=overhaul_cost + service_cost, - failure_replacement_cost=cost_per_failure + cost_per_hour_downtime, - failures_prediction=failures_prediction + cost_results = self._calculate_equipment_costs_with_spareparts( + failures_prediction=monthly_data, + birnbaum_importance=birnbaum, + preventive_cost=equipment_preventive_cost, + failure_replacement_cost=failure_replacement_cost, + location_tag=location_tag, + planned_overhauls=[] # Empty in first pass ) - - if not predicted_costs: - self.logger.warning(f"No valid cost predictions for equipment {location_tag}") + + if not cost_results: continue + + # Find individual optimal timing + optimal_timing = self._find_optimal_timing_with_spareparts(cost_results, location_tag) + + if optimal_timing: + optimal_timing['all_monthly_costs'] = cost_results + individual_results[location_tag] = optimal_timing + + self.logger.info(f"Individual optimal for {location_tag}: Month {optimal_timing['optimal_month']}") + + except Exception as e: + self.logger.error(f"Failed to calculate individual timing for {location_tag}: {e}") + continue + + # Phase 2: Optimize considering sparepart interactions + self.logger.info("Phase 2: Optimizing with sparepart interactions...") + + # Start with individual optimal timings + current_plan = [(tag, result['optimal_month']) for tag, result in individual_results.items()] + + # Iteratively improve the plan considering sparepart constraints + improved_plan = self._optimize_fleet_with_sparepart_constraints( + individual_results, equipments, equipment_birnbaum, simulation_id + ) + + # Phase 3: Generate final results and database objects + fleet_results = [] + total_corrective_costs = np.zeros(max_interval) + total_preventive_costs = np.zeros(max_interval) + total_procurement_costs = np.zeros(max_interval) + total_costs = np.zeros(max_interval) + + total_fleet_procurement_cost = 0 + + for equipment in equipments: + location_tag = equipment.location_tag + + if location_tag not in individual_results: + continue + + # Get the optimized timing from improved plan + equipment_timing = next((month for tag, month in improved_plan if tag == location_tag), + individual_results[location_tag]['optimal_month']) + + # Get the cost data for this timing + cost_data = individual_results[location_tag]['all_monthly_costs'][equipment_timing - 1] + + # Prepare arrays for database + all_costs = individual_results[location_tag]['all_monthly_costs'] + + corrective_costs = [r["failure_cost"] for r in all_costs] + preventive_costs = [r["preventive_cost"] for r in all_costs] + procurement_costs = [r["procurement_cost"] for r in all_costs] + failures = [r["number_of_failures"] for r in all_costs] + total_costs_equipment = [r['total_cost'] for r in all_costs] + procurement_details = [r["procurement_details"] for r in all_costs] + + # Pad arrays to max_interval length + def pad_array(arr, target_length): + if len(arr) < target_length: + return arr + [arr[-1]] * (target_length - len(arr)) + return arr[:target_length] + + corrective_costs = pad_array(corrective_costs, max_interval) + preventive_costs = pad_array(preventive_costs, max_interval) + procurement_costs = pad_array(procurement_costs, max_interval) + failures = pad_array(failures, max_interval) + total_costs_equipment = pad_array(total_costs_equipment, max_interval) + procurement_details = pad_array(procurement_details, max_interval) + + # Create database result object + equipment_result = CalculationEquipmentResult( + corrective_costs=corrective_costs, + overhaul_costs=preventive_costs, + procurement_costs=procurement_costs, + daily_failures=failures, + location_tag=equipment.location_tag, + material_cost=equipment.material_cost, + service_cost=equipment.service_cost, + optimum_day=equipment_timing - 1, # Convert to 0-based index + calculation_data_id=calculation.id, + procurement_details=procurement_details + ) + + fleet_results.append(equipment_result) + + # Aggregate costs for fleet analysis + total_corrective_costs += np.array(corrective_costs) + total_preventive_costs += np.array(preventive_costs) + total_procurement_costs += np.array(procurement_costs) + total_costs += np.array(total_costs_equipment) + + # Add to total fleet procurement cost + total_fleet_procurement_cost += cost_data['procurement_cost'] + + self.logger.info(f"Final timing for {location_tag}: Month {equipment_timing} " + f"(Procurement cost: ${cost_data['procurement_cost']:,.2f})") + + # Calculate fleet optimal interval + fleet_optimal_index = np.argmin(total_costs) + fleet_optimal_cost = total_costs[fleet_optimal_index] + + # Generate procurement optimization report + procurement_plan = self.sparepart_manager.optimize_procurement_timing(improved_plan) + + # Update calculation with results + calculation.optimum_oh_day = fleet_optimal_index + calculation.max_interval = max_interval + + # Save all results to database + db_session.add_all(fleet_results) + await db_session.commit() + + self.logger.info(f"Fleet optimization with spareparts completed:") + self.logger.info(f" - Fleet optimal month: {fleet_optimal_index + 1}") + self.logger.info(f" - Fleet optimal cost: ${fleet_optimal_cost:,.2f}") + self.logger.info(f" - Total procurement cost: ${total_fleet_procurement_cost:,.2f}") + self.logger.info(f" - Equipment with sparepart constraints: {len([r for r in individual_results.values() if not r['sparepart_available']])}") + + return { + 'id': calculation.id, + 'fleet_results': fleet_results, + 'fleet_optimal_interval': fleet_optimal_index + 1, + 'fleet_optimal_cost': fleet_optimal_cost, + 'total_corrective_costs': total_corrective_costs.tolist(), + 'total_preventive_costs': total_preventive_costs.tolist(), + 'total_procurement_costs': total_procurement_costs.tolist(), + 'individual_results': individual_results, + 'optimized_plan': improved_plan, + 'procurement_plan': procurement_plan, + 'total_fleet_procurement_cost': total_fleet_procurement_cost, + 'analysis_parameters': { + 'planned_oh_months': self.planned_oh_months, + 'analysis_window_months': self.time_window_months, + 'last_oh_date': self.last_oh_date.isoformat(), + 'next_oh_date': self.next_oh_date.isoformat(), + 'sparepart_optimization_enabled': True + } + } - optimum_cost = self._find_optimal_timing_vectorized(predicted_costs) - - # Pad arrays to ensure consistent length - corrective_costs = [r["failure_replacement_cost"] for r in predicted_costs] - preventive_costs = [r["preventive_replacement_cost"] for r in predicted_costs] - procurement_costs = [r["procurement_cost"] for r in predicted_costs] - procurement_details = [r["procurement_details"] for r in predicted_costs] - failures = [r["number_of_failure"] for r in predicted_costs] - total_costs_per_equipment = [r['total_cost'] for r in predicted_costs] - - # Pad arrays to max_interval length - def pad_array(arr, target_length): - if len(arr) < target_length: - return arr + [0] * (target_length - len(arr)) - return arr[:target_length] - - corrective_costs = pad_array(corrective_costs, max_interval) - preventive_costs = pad_array(preventive_costs, max_interval) - procurement_costs = pad_array(procurement_costs, max_interval) - failures = pad_array(failures, max_interval) - total_costs_per_equipment = pad_array(total_costs_per_equipment, max_interval) - - fleet_results.append( - CalculationEquipmentResult( # Assuming this class exists - corrective_costs=corrective_costs, - overhaul_costs=preventive_costs, - procurement_costs=procurement_costs, - daily_failures=failures, - location_tag=equipment.location_tag, - material_cost=equipment.material_cost, - service_cost=equipment.service_cost, - optimum_day=optimum_cost['optimal_index'] if optimum_cost else 0, - calculation_data_id=calculation.id, - procurement_details=procurement_details - ) + def _optimize_fleet_with_sparepart_constraints(self, individual_results: Dict, equipments: List, + equipment_birnbaum: Dict, simulation_id: str) -> List[Tuple[str, int]]: + """ + Optimize fleet overhaul timing considering sparepart sharing constraints + """ + # Start with individual optimal timings + current_plan = [(tag, result['optimal_month']) for tag, result in individual_results.items()] + + # Sort by optimal month to process in chronological order + current_plan.sort(key=lambda x: x[1]) + + improved_plan = [] + processed_equipment = [] + + for equipment_tag, optimal_month in current_plan: + # Check sparepart availability considering already processed equipment + sparepart_analysis = self.sparepart_manager.check_sparepart_availability( + equipment_tag, optimal_month - 1, processed_equipment + ) + + if sparepart_analysis['available'] or sparepart_analysis['can_proceed_with_delays']: + # Can proceed with optimal timing + improved_plan.append((equipment_tag, optimal_month)) + processed_equipment.append((equipment_tag, optimal_month)) + self.logger.info(f"Confirmed optimal timing for {equipment_tag}: Month {optimal_month}") + else: + # Need to find alternative timing + alternative_month = self._find_alternative_timing( + equipment_tag, optimal_month, individual_results[equipment_tag]['all_monthly_costs'], + processed_equipment ) + + if alternative_month: + improved_plan.append((equipment_tag, alternative_month)) + processed_equipment.append((equipment_tag, alternative_month)) + self.logger.info(f"Alternative timing for {equipment_tag}: Month {alternative_month} " + f"(was {optimal_month})") + else: + # Force original timing with procurement + improved_plan.append((equipment_tag, optimal_month)) + processed_equipment.append((equipment_tag, optimal_month)) + self.logger.warning(f"Forced timing for {equipment_tag}: Month {optimal_month} " + f"(requires emergency procurement)") + + return improved_plan - # Aggregate costs using veoptimal_resultctorized operations - total_corrective_costs += np.array(corrective_costs) - total_preventive_costs += np.array(preventive_costs) - total_procurement_costs += np.array(procurement_costs) - total_costs += np.array(total_costs_per_equipment) - - # Calculate fleet optimal interval - # total_costs = total_corrective_costs + total_preventive_costs + total_procurement_costs - fleet_optimal_index = np.argmin(total_costs) - - - calculation.optimum_oh_day = fleet_optimal_index - calculation.max_interval = max_interval-1 - - - # Batch database operations - db_session.add_all(fleet_results) - await db_session.commit() - - self.logger.info(f"Calculation completed successfully for {len(fleet_results)} equipment") - - return { - 'id': calculation.id, - 'fleet_results': fleet_results, - 'fleet_optimal_interval': fleet_optimal_index + 1, - 'fleet_optimal_cost': total_costs[fleet_optimal_index], - 'total_corrective_costs': total_corrective_costs.tolist(), - 'total_preventive_costs': total_preventive_costs.tolist(), - 'total_procurement_costs': total_procurement_costs.tolist(), - } - - finally: - # Always clean up the session - await self._close_session() + def _find_alternative_timing(self, equipment_tag: str, preferred_month: int, + cost_results: List[Dict], processed_equipment: List[Tuple[str, int]]) -> Optional[int]: + """ + Find alternative timing when preferred month has sparepart constraints + """ + # Try months around the preferred timing + search_range = 6 # Look 3 months before and after + + candidates = [] + + for offset in range(-search_range//2, search_range//2 + 1): + candidate_month = preferred_month + offset + + if candidate_month <= 0 or candidate_month > len(cost_results): + continue + + if candidate_month == preferred_month: + continue # Already know this doesn't work + + # Check sparepart availability for this month + sparepart_analysis = self.sparepart_manager.check_sparepart_availability( + equipment_tag, candidate_month - 1, processed_equipment + ) + + if sparepart_analysis['available'] or sparepart_analysis['can_proceed_with_delays']: + cost_data = cost_results[candidate_month - 1] + candidates.append((candidate_month, cost_data['total_cost'])) + + if not candidates: + return None + + # Return the month with lowest cost among feasible alternatives + candidates.sort(key=lambda x: x[1]) + return candidates[0][0] + + def generate_sparepart_report(self, results: Dict) -> str: + """Generate comprehensive sparepart analysis report""" + individual_results = results['individual_results'] + procurement_plan = results['procurement_plan'] + + report = f""" + SPAREPART ANALYSIS REPORT + {'='*50} + + FLEET SUMMARY: + - Total equipment analyzed: {len(individual_results)} + - Total procurement cost: ${results['total_fleet_procurement_cost']:,.2f} + - Equipment requiring procurement: {len([r for r in individual_results.values() if r['procurement_cost'] > 0])} + + PROCUREMENT SUMMARY: + - Total procurement items: {procurement_plan['summary']['total_items'] if 'summary' in procurement_plan else 0} + - Critical items: {procurement_plan['summary']['critical_items'] if 'summary' in procurement_plan else 0} + - Unique spareparts: {procurement_plan['summary']['unique_spareparts'] if 'summary' in procurement_plan else 0} + - Suppliers involved: {procurement_plan['summary']['suppliers_involved'] if 'summary' in procurement_plan else 0} + + EQUIPMENT DETAILS: + """ + + for tag, result in individual_results.items(): + status = "✓ Available" if result['sparepart_available'] else "⚠ Procurement needed" + report += f"- {tag}: Month {result['optimal_month']} - {status}" + if result['procurement_cost'] > 0: + report += f" (${result['procurement_cost']:,.2f})" + report += "\n" + + if procurement_plan.get('procurement_by_month'): + report += "\nPROCUREMENT SCHEDULE:\n" + for month, items in procurement_plan['procurement_by_month'].items(): + month_cost = sum(item['total_cost'] for item in items) + report += f"Month {month + 1}: {len(items)} items - ${month_cost:,.2f}\n" + + return report async def __aenter__(self): """Async context manager entry""" @@ -1328,66 +1081,110 @@ class OptimumCostModel: await self._close_session() - -async def run_simulation(*, db_session: DbSession, calculation: CalculationData, token: str, collector_db_session: CollectorDbSession): +# Updated run_simulation function with sparepart management +async def run_simulation_with_spareparts(*, db_session, calculation, token: str, collector_db_session, + time_window_months: Optional[int] = None, + simulation_id: str = "default") -> Dict: + """ + Run complete overhaul optimization simulation with sparepart management + """ + + # Get equipment and scope data equipments = await get_standard_scope_by_session_id( - db_session=db_session, overhaul_session_id=calculation.overhaul_session_id, collector_db=collector_db_session + db_session=db_session, + overhaul_session_id=calculation.overhaul_session_id, + collector_db=collector_db_session ) - scope = await get_scope(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id) - prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope) - calculation_data = await get_calculation_data_by_id( db_session=db_session, calculation_id=calculation.id ) + + sparepart_manager = await load_sparepart_data_from_db(scope=scope, prev_oh_scope=prev_oh_scope, db_session=collector_db_session) + + # Initialize optimization model with sparepart management + optimum_oh_model = OptimumCostModelWithSpareparts( + token=token, + last_oh_date=prev_oh_scope.end_date, + next_oh_date=scope.start_date, + time_window_months=time_window_months, + base_url=RBD_SERVICE_API, + sparepart_manager=sparepart_manager + ) + + try: + # Run fleet optimization with sparepart management + results = await optimum_oh_model.calculate_cost_all_equipment_with_spareparts( + db_session=db_session, + collector_db_session=collector_db_session, + equipments=equipments, + calculation=calculation_data, + preventive_cost=calculation_data.parameter.overhaul_cost, + simulation_id=simulation_id + ) + + # Generate sparepart report + # sparepart_report = optimum_oh_model.generate_sparepart_report(results) + # print(sparepart_report) + + return results + + finally: + await optimum_oh_model._close_session() - sparepars_query = await db_session.execute( - select(MasterSparePart)) - spareparts = { - sparepart.id: { - 'data': sparepart, - 'stock': copy.copy(sparepart.stock) - } for sparepart in sparepars_query.scalars().all() - } +async def run_simulation(*, db_session, calculation, token: str, collector_db_session, + time_window_months: Optional[int] = None, + simulation_id: str = "default") -> Dict: + """ + Run complete overhaul optimization simulation + + Args: + time_window_months: Analysis window in months (default: 1.5x planned interval) + simulation_id: Simulation ID for failure predictions + """ + + # Get equipment and scope data + equipments = await get_standard_scope_by_session_id( + db_session=db_session, + overhaul_session_id=calculation.overhaul_session_id, + collector_db=collector_db_session + ) + + scope = await get_scope(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id) + prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope) - # reliability_service = ReliabilityService(base_url=REALIBILITY_SERVICE_API, use_dummy_data=True) - # spare_parts_service = SparePartsService(spareparts) - # optimum_calculator_service = OverhaulCalculator(reliability_service, spare_parts_service) + calculation_data = await get_calculation_data_by_id( + db_session=db_session, calculation_id=calculation.id + ) + # Initialize optimization model optimum_oh_model = OptimumCostModel( token=token, last_oh_date=prev_oh_scope.end_date, next_oh_date=scope.start_date, - base_url=REALIBILITY_SERVICE_API - ) - - # Set the date range for the calculation - # if prev_oh_scope: - # # Start date is the day after the previous scope's end date - # start_date = datetime.datetime.combine(prev_oh_scope.end_date + datetime.timedelta(days=1), datetime.time.min) - # # End date is the start date of the current scope - # end_date = datetime.datetime.combine(scope.start_date, datetime.time.min) - # else: - # # If there's no previous scope, use the start and end dates from the current scope - # start_date = datetime.datetime.combine(scope.start_date, datetime.time.min) - # end_date = datetime.datetime.combine(scope.end_date, datetime.time.min) - - - results = await optimum_oh_model.calculate_cost_all_equipment( - db_session=db_session, - equipments=equipments, - calculation=calculation_data, - preventive_cost=calculation_data.parameter.overhaul_cost, + time_window_months=time_window_months, + base_url=RBD_SERVICE_API ) - - return results - + try: + # Run fleet optimization and save to database + results = await optimum_oh_model.calculate_cost_all_equipment( + db_session=db_session, + equipments=equipments, + calculation=calculation_data, + preventive_cost=calculation_data.parameter.overhaul_cost, + simulation_id=simulation_id + ) + + return results + + finally: + await optimum_oh_model._close_session() async def get_corrective_cost_time_chart( @@ -1556,159 +1353,6 @@ async def get_corrective_cost_time_chart( return corrective_costs, monthly_failure - # days_difference = (end_date - start_date).days - - # today = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) - # tomorrow = today + datetime.timedelta(days=1) - # url_prediction = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{tomorrow.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}" - # url_history = f"http://192.168.1.82:8000/reliability/main/failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{today.strftime('%Y-%m-%d')}" - - # # Initialize monthly data dictionary - # monthly_data = {} - - # # Get historical data (start_date to today) - # if start_date <= today: - # try: - # response = requests.get( - # url_history, - # headers={ - # "Content-Type": "application/json", - # "Authorization": f"Bearer {token}", - # }, - # ) - # history_data = response.json() - - - # # Process historical data - accumulate failures by month - # history_dict = {} - # monthly_failures = {} - - # for item in history_data["data"]: - # date = datetime.datetime.strptime(item["date"], "%d %b %Y") - # month_key = datetime.datetime(date.year, date.month, 1) - - # # Initialize if first occurrence of this month - # if month_key not in history_dict: - # history_dict[month_key] = 0 - - # # Accumulate failures for this month - # if item["num_fail"] is not None: - # history_dict[month_key] += item["num_fail"] - - # # Sort months chronologically - # sorted_months = sorted(history_dict.keys()) - - # failures = np.array([history_dict[month] for month in sorted_months]) - # cum_failure = np.cumsum(failures) - - # for month_key in sorted_months: - # monthly_failures[month_key] = int(cum_failure[sorted_months.index(month_key)]) - - - # # Update monthly_data with cumulative historical data - # monthly_data.update(monthly_failures) - # except Exception as e: - # # print(f"Error fetching historical data: {e}") - # raise Exception(e) - - - # latest_num = 1 - - # # Get prediction data (today+1 to end_date) - # if end_date > today: - # try: - # response = requests.get( - # url_prediction, - # headers={ - # "Content-Type": "application/json", - # "Authorization": f"Bearer {token}", - # }, - # ) - # prediction_data = response.json() - - # # Use the last prediction value for future months - # # Get the latest number from prediction data - # latest_num = prediction_data["data"][-1]["num_fail"] - - # # Ensure the value is at least 1 - # if not latest_num or latest_num < 1: - # latest_num = 1 - # else: - # # Round the number to the nearest integer - # latest_num = round(latest_num) - - # # Create prediction dictionary - # prediction_dict = {} - # for item in prediction_data["data"]: - # date = datetime.datetime.strptime(item["date"], "%d %b %Y") - # month_key = datetime.datetime(date.year, date.month, 1) - # prediction_dict[month_key] = round(item["num_fail"]) - - # # Update monthly_data with prediction data - # for key in prediction_dict: - # if key not in monthly_data: # Don't overwrite historical data - # monthly_data[key] = prediction_dict[key] - # except Exception as e: - # print(f"Error fetching prediction data: {e}") - - # # Create a complete date range covering all months from start to end - # current_date = datetime.datetime(start_date.year, start_date.month, 1) - # while current_date <= end_date: - # if current_date not in monthly_data: - # # Initialize to check previous months - # previous_month = current_date.replace(day=1) - datetime.timedelta(days=1) - # # Now previous_month is the last day of the previous month - # # Convert back to first day of previous month for consistency - # previous_month = previous_month.replace(day=1) - - # # Keep going back until we find data or run out of months to check - # month_diff = (current_date.year - start_date.year) * 12 + (current_date.month - start_date.month) - # max_attempts = max(1, month_diff) # Ensure at least 1 attempt - # attempts = 0 - - # while previous_month not in monthly_data and attempts < max_attempts: - # # Move to the previous month (last day of the month before) - # previous_month = previous_month.replace(day=1) - datetime.timedelta(days=1) - # # Convert to first day of month - # previous_month = previous_month.replace(day=1) - # attempts += 1 - - # # Use the found value or default to 0 if no previous month with data exists - # if previous_month in monthly_data: - # monthly_data[current_date] = monthly_data[previous_month] - # else: - # monthly_data[current_date] = 0 - - # # Move to next month - # if current_date.month == 12: - # current_date = datetime.datetime(current_date.year + 1, 1, 1) - # else: - # current_date = datetime.datetime(current_date.year, current_date.month + 1, 1) - - - # # # Convert to list maintaining chronological order - # complete_data = [] - # for month in sorted(monthly_data.keys()): - # complete_data.append(monthly_data[month]) - - - # # Convert to numpy array - # monthly_failure = np.array(complete_data) - # cost_per_failure = (material_cost + service_cost) / latest_num - # if cost_per_failure == 0: - # raise ValueError("Cost per failure cannot be zero") - - # # if location_tag == "3TR-TF005": - # # raise Exception(cost_per_failure, latest_num) - - # corrective_costs = monthly_failure * cost_per_failure - - - # return corrective_costs, monthly_failure - - # # except Exception as e: - # # print(f"Error fetching or processing data: {str(e)}") - # # raise def get_overhaul_cost_by_time_chart( overhaul_cost: float, months_num: int, numEquipments: int, decay_base: float = 1.01 @@ -1754,81 +1398,295 @@ async def create_param_and_data( async def get_calculation_result(db_session: DbSession, calculation_id: str): - scope_calculation = await get_calculation_data_by_id( - db_session=db_session, calculation_id=calculation_id - ) - if not scope_calculation: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="A data with this id does not exist.", + """ + Get calculation results with improved error handling, performance, and sparepart details + """ + try: + # Get calculation data with equipment results in single query + calculation_query = await db_session.execute( + select(CalculationData) + .options(selectinload(CalculationData.equipment_results)) + .where(CalculationData.id == calculation_id) ) + scope_calculation = calculation_query.scalar_one_or_none() + + if not scope_calculation: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Calculation with id {calculation_id} does not exist.", + ) - scope_overhaul = await get_scope( - db_session=db_session, overhaul_session_id=scope_calculation.overhaul_session_id - ) - if not scope_overhaul: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="A data with this id does not exist.", + # Get scope information + scope_overhaul = await get_scope( + db_session=db_session, + overhaul_session_id=scope_calculation.overhaul_session_id ) + if not scope_overhaul: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Overhaul scope for session {scope_calculation.overhaul_session_id} does not exist.", + ) - prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope_overhaul) - - # # Set the date range for the calculation - # if prev_oh_scope: - # # Start date is the day after the previous scope's end date - # start_date = datetime.combine(prev_oh_scope.end_date + timedelta(days=1), time.min) - # # End date is the start date of the current scope - # end_date = datetime.combine(scope_overhaul.start_date, datetime.time.min) - # else: - # # If there's no previous scope, use the start and end dates from the current scope - # start_date = datetime.combine(scope_overhaul.start_date, datetime.time.min) - # end_date = datetime.combine(scope_overhaul.end_date, datetime.time.min) - - data_num = scope_calculation.max_interval - - calculation_results = [] - for i in range(data_num): - result = { - "overhaul_cost": 0, - "corrective_cost": 0, - "procurement_cost": 0, - "num_failures": 0, - "day": i + 1, - "procurement_details": {}, - } - ## Add risk Cost - # risk cost = ((Down Time1 * MW Loss 1) + (Downtime2 * Mw 2) + .... (DowntimeN * MwN) ) * Harga listrik (Efficicency HL App) + # Get previous overhaul scope for analysis context + prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope_overhaul) - for eq in scope_calculation.equipment_results: + # Validate data integrity + data_num = scope_calculation.max_interval + if data_num <= 0: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid max_interval in calculation data.", + ) - if len(eq.procurement_details[i]) > 0: - result["procurement_details"][eq.location_tag] = { - "is_included": eq.is_included, - "details": eq.procurement_details[i], + # Filter included equipment for performance + included_equipment = [eq for eq in scope_calculation.equipment_results if eq.is_included] + all_equipment = scope_calculation.equipment_results + + # Pre-calculate aggregated statistics + calculation_results = [] + fleet_statistics = { + 'total_equipment': len(all_equipment), + 'included_equipment': len(included_equipment), + 'excluded_equipment': len(all_equipment) - len(included_equipment), + 'equipment_with_sparepart_constraints': 0, + 'total_procurement_items': 0, + 'critical_procurement_items': 0 + } + + # Process each month + for month_index in range(data_num): + month_result = { + "overhaul_cost": 0.0, + "corrective_cost": 0.0, + "procurement_cost": 0.0, + "num_failures": 0.0, + "day": month_index + 1, + "month": month_index + 1, # More intuitive naming + "procurement_details": {}, + "sparepart_summary": { + "total_procurement_cost": 0.0, + "equipment_requiring_procurement": 0, + "critical_shortages": 0, + "existing_orders_value": 0.0, + "new_orders_required": 0, + "urgent_procurements": 0 } - if not eq.is_included: - continue + } - result["corrective_cost"] += float(eq.corrective_costs[i]) - result["overhaul_cost"] += float(eq.overhaul_costs[i]) - result["procurement_cost"] += float(eq.procurement_costs[i]) - result["num_failures"] += float(eq.daily_failures[i]) + equipment_requiring_procurement = 0 + total_existing_orders_value = 0.0 + total_new_orders_value = 0.0 + critical_shortages = 0 + urgent_procurements = 0 - result["num_failures"] = result["num_failures"] - calculation_results.append(CalculationResultsRead(**result)) + # Process all equipment (included and excluded) for complete procurement picture + for eq in all_equipment: + # Validate array bounds + if month_index >= len(eq.procurement_details): + continue + procurement_detail = eq.procurement_details[month_index] + + # Handle equipment with procurement needs + if (procurement_detail and + isinstance(procurement_detail, dict) and + procurement_detail.get("procurement_needed")): + + equipment_requiring_procurement += 1 + + # Extract PR/PO summary if available + pr_po_summary = procurement_detail.get("pr_po_summary", {}) + + # Aggregate existing orders value + existing_orders_value = pr_po_summary.get("total_existing_value", 0) + total_existing_orders_value += existing_orders_value + + # Aggregate new orders value + new_orders_value = pr_po_summary.get("total_new_orders_value", 0) + total_new_orders_value += new_orders_value + + # Count critical shortages + critical_missing = procurement_detail.get("critical_missing_parts", 0) + if critical_missing > 0: + critical_shortages += 1 + + # Count urgent procurements + recommendations = procurement_detail.get("recommendations", []) + urgent_items = [r for r in recommendations if r.get("priority") == "CRITICAL"] + if urgent_items: + urgent_procurements += 1 + + # Add detailed procurement info for this equipment + month_result["procurement_details"][eq.location_tag] = { + "is_included": eq.is_included, + "location_tag": eq.location_tag, + "details": procurement_detail.get("procurement_needed", []), + "detailed_message": procurement_detail.get("detailed_message", ""), + "pr_po_summary": pr_po_summary, + "recommendations": recommendations, + "sparepart_available": procurement_detail.get("sparepart_available", True), + "can_proceed": procurement_detail.get("can_proceed_with_delays", True), + "critical_missing_parts": critical_missing, + "existing_orders_value": existing_orders_value, + "new_orders_value": new_orders_value + } + + # Only include costs from equipment marked as included + if eq.is_included: + # Validate array bounds before accessing + if (month_index < len(eq.corrective_costs) and + month_index < len(eq.overhaul_costs) and + month_index < len(eq.procurement_costs) and + month_index < len(eq.daily_failures)): + + month_result["corrective_cost"] += float(eq.corrective_costs[month_index]) + month_result["overhaul_cost"] += float(eq.overhaul_costs[month_index]) + month_result["procurement_cost"] += float(eq.procurement_costs[month_index]) + month_result["num_failures"] += float(eq.daily_failures[month_index]) + + # Update month sparepart summary + month_result["sparepart_summary"].update({ + "total_procurement_cost": month_result["procurement_cost"], + "equipment_requiring_procurement": equipment_requiring_procurement, + "critical_shortages": critical_shortages, + "existing_orders_value": total_existing_orders_value, + "new_orders_required": len([eq for eq in all_equipment + if month_index < len(eq.procurement_details) and + eq.procurement_details[month_index] and + eq.procurement_details[month_index].get("procurement_needed")]), + "urgent_procurements": urgent_procurements + }) + + # Calculate total cost for this month + month_result["total_cost"] = (month_result["corrective_cost"] + + month_result["overhaul_cost"] + + month_result["procurement_cost"]) + + calculation_results.append(CalculationResultsRead(**month_result)) + + # Update fleet statistics + fleet_statistics['equipment_with_sparepart_constraints'] = len([ + eq for eq in all_equipment + if any(detail and detail.get("procurement_needed") + for detail in eq.procurement_details if detail) + ]) + + fleet_statistics['total_procurement_items'] = sum([ + len(detail.get("procurement_needed", [])) + for eq in all_equipment + for detail in eq.procurement_details + if detail and isinstance(detail, dict) + ]) + + # Calculate optimal timing analysis + optimal_analysis = _analyze_optimal_timing( + calculation_results, scope_calculation.optimum_oh_day, prev_oh_scope, scope_overhaul + ) + + # Return comprehensive result + return CalculationTimeConstrainsRead( + id=scope_calculation.id, + reference=scope_calculation.overhaul_session_id, + scope=scope_overhaul.maintenance_type.name, + results=calculation_results, + optimum_oh=scope_calculation.optimum_oh_day, + optimum_oh_month=scope_calculation.optimum_oh_day + 1, # 1-based month + equipment_results=scope_calculation.equipment_results, + fleet_statistics=fleet_statistics, + optimal_analysis=optimal_analysis, + analysis_metadata={ + "max_interval_months": data_num, + "last_overhaul_date": prev_oh_scope.end_date.isoformat() if prev_oh_scope else None, + "next_planned_overhaul": scope_overhaul.start_date.isoformat(), + "calculation_type": "sparepart_optimized" if fleet_statistics['equipment_with_sparepart_constraints'] > 0 else "standard", + "total_equipment_analyzed": len(all_equipment), + "included_in_optimization": len(included_equipment) + } + ) + + except HTTPException: + # Re-raise HTTP exceptions as-is + raise + except Exception as e: + # Log the error for debugging + import logging + logger = logging.getLogger(__name__) + logger.error(f"Error in get_calculation_result for calculation_id {calculation_id}: {str(e)}") + + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Internal error processing calculation results: {str(e)}", + ) + + +def _analyze_optimal_timing(calculation_results: List, optimum_oh_day: int, + prev_oh_scope, scope_overhaul) -> Dict: + """Analyze optimal timing and provide recommendations""" + + if not calculation_results: + return {} + + # Find the result for optimal day + optimal_result = None + if 0 <= optimum_oh_day < len(calculation_results): + optimal_result = calculation_results[optimum_oh_day] + + # Calculate planned overhaul timing + planned_oh_months = None + if prev_oh_scope and scope_overhaul: + planned_oh_months = (scope_overhaul.start_date.year - prev_oh_scope.end_date.year) * 12 + \ + (scope_overhaul.start_date.month - prev_oh_scope.end_date.month) + + # Find minimum cost point + min_cost_result = min(calculation_results, key=lambda x: x.total_cost) + min_cost_month = min_cost_result.month + + # Calculate timing recommendation + timing_recommendation = "OPTIMAL" + if planned_oh_months: + if optimum_oh_day + 1 < planned_oh_months: + timing_recommendation = "EARLY" + elif optimum_oh_day + 1 > planned_oh_months: + timing_recommendation = "DELAYED" + else: + timing_recommendation = "ON_SCHEDULE" + + # Analyze cost trends + cost_trend = "STABLE" + if len(calculation_results) > 1: + early_costs = [r.total_cost for r in calculation_results[:len(calculation_results)//3]] + late_costs = [r.total_cost for r in calculation_results[-len(calculation_results)//3:]] + + avg_early = sum(early_costs) / len(early_costs) if early_costs else 0 + avg_late = sum(late_costs) / len(late_costs) if late_costs else 0 + + if avg_late > avg_early * 1.2: + cost_trend = "INCREASING" + elif avg_late < avg_early * 0.8: + cost_trend = "DECREASING" + + return { + "optimal_month": optimum_oh_day + 1, + "planned_month": planned_oh_months, + "timing_recommendation": timing_recommendation, + "optimal_total_cost": optimal_result.total_cost if optimal_result else 0, + "optimal_breakdown": { + "corrective_cost": optimal_result.corrective_cost if optimal_result else 0, + "overhaul_cost": optimal_result.overhaul_cost if optimal_result else 0, + "procurement_cost": optimal_result.procurement_cost if optimal_result else 0, + "num_failures": optimal_result.num_failures if optimal_result else 0 + }, + "cost_trend": cost_trend, + "months_from_planned": (optimum_oh_day + 1 - planned_oh_months) if planned_oh_months else None, + "cost_savings_vs_planned": None, # Would need planned month cost to calculate + "sparepart_impact": { + "equipment_with_constraints": optimal_result.sparepart_summary["equipment_requiring_procurement"] if optimal_result else 0, + "critical_shortages": optimal_result.sparepart_summary["critical_shortages"] if optimal_result else 0, + "procurement_investment": optimal_result.sparepart_summary["total_procurement_cost"] if optimal_result else 0 + } + } - # Check if calculation already exist - return CalculationTimeConstrainsRead( - id=scope_calculation.id, - reference=scope_calculation.overhaul_session_id, - scope=scope_overhaul.maintenance_type.name, - results=calculation_results, - optimum_oh=scope_calculation.optimum_oh_day, - equipment_results=scope_calculation.equipment_results, - ) async def get_calculation_data_by_id( @@ -1860,6 +1718,7 @@ async def get_calculation_by_assetnum( return result.scalar() + async def get_number_of_failures(location_tag, start_date, end_date, token, max_interval=24): url_prediction = ( f"http://192.168.1.82:8000/reliability/main/number-of-failures/" @@ -1950,7 +1809,6 @@ async def get_equipment_foh( return mdt_data - # Function to simulate overhaul strategy for a single equipment def simulate_equipment_overhaul(equipment, preventive_cost,predicted_num_failures, interval_months, forced_outage_hours_value ,total_months=24): """ diff --git a/src/calculation_time_constrains/utils.py b/src/calculation_time_constrains/utils.py index 4ad56ac..f8714ba 100644 --- a/src/calculation_time_constrains/utils.py +++ b/src/calculation_time_constrains/utils.py @@ -1,4 +1,7 @@ import datetime +import json + +import pandas as pd def get_months_between(start_date: datetime.datetime, end_date: datetime.datetime) -> int: """ @@ -34,8 +37,8 @@ def create_time_series_data(chart_data, max_hours=24096): # Add hourly data point hourly_data.append({ - 'hour': hour, - 'flowrate': current_flow_rate, + 'cumulativeTime': hour, + 'flowRate': current_flow_rate, 'currentEQStatus': current_eq_status }) @@ -87,4 +90,193 @@ def calculate_failures_per_month(hourly_data): 'failures': monthly_data.get(month, monthly_data.get(month-1, 0)) }) - return result \ No newline at end of file + return result + + +def analyze_monthly_metrics(timestamp_outs): + """ + Analyze time series data to calculate monthly metrics: + 1. Failure count per month + 2. Cumulative failure count each month + 3. Total out-of-service time per month + 4. Average flow rate per month + """ + + # Check if timestamp_outs is None or empty + if timestamp_outs is None or not timestamp_outs: + # Return empty results with zero values + return {} + + # Convert to DataFrame for easier manipulation + df = pd.DataFrame(timestamp_outs) + + # Check if DataFrame is empty after creation + if df.empty: + return {} + + # Check if required columns exist + required_columns = ['cumulativeTime', 'currentEQStatus', 'flowRate'] + missing_columns = [col for col in required_columns if col not in df.columns] + if missing_columns: + return {} + + # Assuming the simulation starts from a reference date (you can modify this) + # For this example, I'll use January 1, 2024 as the start date + start_date = datetime.datetime(2025, 10, 22) + + # Convert cumulative hours to actual datetime + df['datetime'] = df['cumulativeTime'].apply( + lambda x: start_date + datetime.timedelta(hours=x) + ) + + # Extract month-year for grouping + df['month_year'] = df['datetime'].dt.to_period('M') + + # Calculate time duration for each record (difference between consecutive cumulative times) + df['duration_hours'] = df['cumulativeTime'].diff().fillna(df['cumulativeTime'].iloc[0]) + + # Initialize results dictionary + monthly_results = {} + + # Track cumulative failures across all months + cumulative_failures = 0 + cummulative_oos = 0 + + # Group by month-year and ensure chronological order + for month_period, group in df.groupby('month_year'): + month_str = str(month_period) + monthly_results[month_str] = {} + + # 1. Count failures per month + # A failure is when currentEQStatus changes from "Svc" to "OoS" + status_changes = group['currentEQStatus'].shift() != group['currentEQStatus'] + failures = ((group['currentEQStatus'] == 'OoS') & status_changes).sum() + monthly_results[month_str]['failures_count'] = int(failures) + + # 2. Add failures to cumulative count + cumulative_failures += failures + monthly_results[month_str]['cumulative_failures'] = int(cumulative_failures) + + # 3. Total out-of-service time per month (in hours) + oos_time = group[group['currentEQStatus'] == 'OoS']['duration_hours'].sum() + monthly_results[month_str]['total_oos_hours'] = float(oos_time) + + cummulative_oos += oos_time + monthly_results[month_str]['cummulative_oos'] = float(cummulative_oos) + + # 4. Average flow rate per month (weighted by duration) + # Calculate weighted average flow rate + total_flow_time = (group['flowRate'] * group['duration_hours']).sum() + total_time = group['duration_hours'].sum() + avg_flow_rate = total_flow_time / total_time if total_time > 0 else 0 + monthly_results[month_str]['avg_flow_rate'] = float(avg_flow_rate) + + # Additional useful metrics + monthly_results[month_str]['total_hours'] = float(total_time) + monthly_results[month_str]['service_hours'] = float( + group[group['currentEQStatus'] == 'Svc']['duration_hours'].sum() + ) + monthly_results[month_str]['availability_percentage'] = float( + (monthly_results[month_str]['service_hours'] / total_time * 100) if total_time > 0 else 0 + ) + + + return monthly_results + +def calculate_risk_cost_per_failure(monthly_results, birnbaum_importance, energy_price): + """ + Calculate risk cost per failure for each month based on: + 1. Equipment capacity contribution to system (flowrate * birnbaum_importance * availability) + 2. Capacity lost to downtime per month + 3. Energy price + + Parameters: + - monthly_results: Output from analyze_monthly_metrics() + - birnbaum_importance: Birnbaum importance factor for this equipment + - energy_price: Price per unit of energy/flow + + Returns: + - Dictionary with monthly risk costs and array of risk costs per failure + """ + + risk_costs = {} + risk_cost_array = [] + + for month, data in monthly_results.items(): + # Extract monthly data + avg_flow_rate = data['avg_flow_rate'] + availability = data['availability_percentage'] / 100 # Convert to decimal + total_oos_hours = data['total_oos_hours'] + failures_count = data['failures_count'] + + # 1. Calculate equipment capacity contribution to system + # Capacity = avg_flowrate * birnbaum_importance * availability + equipment_capacity = avg_flow_rate * birnbaum_importance * availability + + # 2. Calculate capacity lost to downtime per month + # Lost capacity = avg_flowrate * birnbaum_importance * downtime_hours + capacity_lost_to_downtime = avg_flow_rate * birnbaum_importance * total_oos_hours + + # 3. Calculate total risk cost for the month + # Risk cost = capacity_lost * energy_price + monthly_risk_cost = capacity_lost_to_downtime * energy_price + + # 4. Calculate risk cost per failure for this month + if failures_count > 0: + risk_cost_per_failure = monthly_risk_cost / failures_count + else: + # If no failures, set to 0 or use alternative approach + risk_cost_per_failure = 0 + + # Store results + risk_costs[month] = { + 'equipment_capacity': equipment_capacity, + 'capacity_lost_to_downtime': capacity_lost_to_downtime, + 'monthly_risk_cost': monthly_risk_cost, + 'failures_count': failures_count, + 'risk_cost_per_failure': risk_cost_per_failure + } + + # Add to array + risk_cost_array.append(risk_cost_per_failure) + + return { + 'monthly_details': risk_costs, + 'risk_cost_per_failure_array': risk_cost_array + } + +# Example usage: +def get_monthly_risk_analysis(timestamp_outs, birnbaum_importance, energy_price): + """ + Complete analysis combining monthly metrics with risk cost calculation + """ + # Get monthly metrics + monthly_metrics = analyze_monthly_metrics(timestamp_outs) + + # Calculate risk costs + risk_analysis = calculate_risk_cost_per_failure( + monthly_metrics, + birnbaum_importance, + energy_price + ) + + # Combine results for comprehensive view + combined_results = {} + for month in monthly_metrics.keys(): + combined_results[month] = { + **monthly_metrics[month], + **risk_analysis['monthly_details'][month] + } + + return { + 'monthly_data': combined_results, + 'risk_cost_array': risk_analysis['risk_cost_per_failure_array'] + } + +# Usage example: +# birnbaum_importance = 0.85 # Example value +# energy_price = 100 # Example: $100 per unit +# +# results = get_monthly_risk_analysis(timestamp_outs, birnbaum_importance, energy_price) +# risk_cost_array = results['risk_cost_array'] +# print("Risk cost per failure each month:", risk_cost_array) \ No newline at end of file diff --git a/src/logging.py b/src/logging.py index 9b35e12..34431b5 100644 --- a/src/logging.py +++ b/src/logging.py @@ -1,4 +1,5 @@ import logging +import sys from src.config import LOG_LEVEL from src.enums import OptimumOHEnum @@ -30,3 +31,15 @@ def configure_logging(): # sometimes the slack client can be too verbose logging.getLogger("slack_sdk.web.base_client").setLevel(logging.CRITICAL) + + +def setup_logging(logger): + # Your logging configuration here + logger.setLevel(logging.DEBUG) + # Create formatter + formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + + # Create console handler + stream_handler = logging.StreamHandler(sys.stdout) + stream_handler.setFormatter(formatter) + logger.addHandler(stream_handler) \ No newline at end of file diff --git a/src/sparepart/service.py b/src/sparepart/service.py index be12328..00bc635 100644 --- a/src/sparepart/service.py +++ b/src/sparepart/service.py @@ -1,4 +1,13 @@ from typing import Optional +import asyncio +import logging +from datetime import datetime, timedelta, date +from typing import List, Dict, Optional, Tuple +from collections import defaultdict +from uuid import UUID +import numpy as np +from dataclasses import dataclass +from enum import Enum from sqlalchemy import Delete, Select, text from sqlalchemy.orm import joinedload, selectinload @@ -6,13 +15,13 @@ from sqlalchemy.orm import joinedload, selectinload from src.auth.service import CurrentUser from src.database.core import DbSession from src.database.service import CommonParameters, search_filter_sort_paginate +from src.logging import setup_logging +from src.overhaul_scope.service import get as get_scope +from src.overhaul_scope.service import get_prev_oh -# async def get(*, db_session: DbSession, activity_id: str) -> Optional[ActivityMaster]: -# """Returns a document based on the given document id.""" -# result = await db_session.get(MasterActivity, activity_id) -# return result - +log = logging.getLogger(__name__) +setup_logging(logger=log) async def get_all(db_session: DbSession): """ @@ -34,16 +43,20 @@ async def get_all(db_session: DbSession): h.issue_date as pr_issue_date, h.status as pr_status, pl.qty_ordered as pr_qty_ordered, - pl.description + pl.description, + pl.unit_cost, + pl.line_cost FROM public.maximo_sparepart_pr_po h JOIN public.maximo_sparepart_pr_po_line pl ON h.num = pl.num WHERE h.type = 'PR' AND h.issue_date IS NOT NULL AND h.num LIKE 'K%' - ORDER BY pl.item_num, TO_DATE(h.issue_date, 'YYYY-MM-DD') DESC + ORDER BY pl.item_num, h.issue_date DESC ) SELECT DISTINCT ON (pr.item_num) pr.item_num, + pr.line_cost, + pr.unit_cost, pr.description, COALESCE(i.curbaltotal, 0) as current_balance_total, pr.pr_number, @@ -55,7 +68,7 @@ async def get_all(db_session: DbSession): END as po_exists, COALESCE(po.qty_received, 0) as po_qty_received, COALESCE(po.qty_ordered, 0) as po_qty_ordered, - COALESCE(po.estimated_arrival_date, '') as po_estimated_arrival_date + po.estimated_arrival_date as po_estimated_arrival_date FROM latest_prs pr LEFT JOIN public.maximo_inventory i ON pr.item_num = i.itemnum LEFT JOIN LATERAL ( @@ -83,6 +96,8 @@ async def get_all(db_session: DbSession): spare_parts.append({ "item_num": row.item_num, "description": row.description, + "line_cost": row.line_cost, + "unit_cost": row.unit_cost, "current_balance_total": float(row.current_balance_total) if row.current_balance_total is not None else 0.0, "pr_number": row.pr_number, "pr_issue_date": row.pr_issue_date, @@ -97,35 +112,765 @@ async def get_all(db_session: DbSession): -# async def create(*, db_session: DbSession, activty_in: ActivityMasterCreate): -# activity = MasterActivity(**activty_in.model_dump()) -# db_session.add(activity) -# await db_session.commit() -# return activity +class ProcurementStatus(Enum): + PLANNED = "planned" + ORDERED = "ordered" + RECEIVED = "received" + CANCELLED = "cancelled" + +@dataclass +class SparepartRequirement: + """Sparepart requirement for equipment overhaul""" + sparepart_id: str + quantity_required: int + lead_time: int + sparepart_name: str + +@dataclass +class SparepartStock: + """Current sparepart stock information""" + sparepart_id: str + sparepart_name: str + current_stock: int + unit_cost: float + location: str + +@dataclass +class ProcurementRecord: + """Purchase Order/Purchase Request record""" + po_pr_id: str + sparepart_id: str + sparepart_name: str + quantity: int + unit_cost: float + total_cost: float + order_date: date + expected_delivery_date: date + status: ProcurementStatus + +class SparepartManager: + """Manages sparepart availability and procurement for overhaul optimization""" + + def __init__(self, analysis_start_date: date, analysis_window_months: int): + self.analysis_start_date = analysis_start_date + self.analysis_window_months = analysis_window_months + self.logger = log + + # Storage for sparepart data + self.sparepart_stocks: Dict[str, SparepartStock] = {} + self.equipment_requirements: Dict[str, List[SparepartRequirement]] = {} + self.procurement_records: List[ProcurementRecord] = [] + + # Monthly projected stocks + self.projected_stocks: Dict[str, List[int]] = {} + + def add_sparepart_stock(self, stock: SparepartStock): + """Add sparepart stock information""" + self.sparepart_stocks[stock.sparepart_id] = stock + + def add_equipment_requirements(self, equipment_tag: str, requirements: List[SparepartRequirement]): + """Add sparepart requirements for equipment""" + self.equipment_requirements[equipment_tag] = requirements + + def add_procurement_record(self, record: ProcurementRecord): + """Add procurement record (PO/PR)""" + self.procurement_records.append(record) + def _calculate_monthly_deliveries(self) -> Dict[str, Dict[int, int]]: + """Calculate expected deliveries for each sparepart by month""" + deliveries = defaultdict(lambda: defaultdict(int)) + + for record in self.procurement_records: + if record.status in [ProcurementStatus.ORDERED, ProcurementStatus.PLANNED]: + # Skip records with no expected delivery date (e.g., still PR stage) + if not record.expected_delivery_date: + continue + + months_from_start = ( + (record.expected_delivery_date.year - self.analysis_start_date.year) * 12 + + (record.expected_delivery_date.month - self.analysis_start_date.month) + ) + + if 0 <= months_from_start < self.analysis_window_months: + deliveries[record.sparepart_id][months_from_start] += record.quantity + + return deliveries -# async def update( -# *, -# db_session: DbSession, -# activity: MasterActivity, -# activity_in: ActivityMasterCreate -# ): -# """Updates a document.""" -# data = activity_in.model_dump() -# update_data = activity_in.model_dump(exclude_defaults=True) + def _project_monthly_stocks(self) -> Dict[str, List[int]]: + """Project sparepart stock levels for each month""" + projected_stocks = {} + monthly_deliveries = self._calculate_monthly_deliveries() + + for sparepart_id, stock_info in self.sparepart_stocks.items(): + monthly_stock = [] + current_stock = float(stock_info.current_stock) + + for month in range(self.analysis_window_months): + # Add any deliveries for this month + if sparepart_id in monthly_deliveries and month in monthly_deliveries[sparepart_id]: + current_stock += float(monthly_deliveries[sparepart_id][month]) + + monthly_stock.append(current_stock) + + # Note: We don't subtract usage here yet - that will be done during optimization + + projected_stocks[sparepart_id] = monthly_stock + + self.projected_stocks = projected_stocks + return projected_stocks + + def check_sparepart_availability(self, equipment_tag: str, target_month: int, + consider_other_overhauls: List[Tuple[str, int]] = None) -> Dict: + """ + Check if spareparts are available for equipment overhaul at target month + + Args: + equipment_tag: Equipment location tag + target_month: Month when overhaul is planned (0-based) + consider_other_overhauls: List of (equipment_tag, month) for other planned overhauls + + Returns: + Dict with availability status and details + """ + if equipment_tag not in self.equipment_requirements: + return { + 'available': True, + 'message': f'No sparepart requirements defined for {equipment_tag}', + 'missing_parts': [], + 'procurement_needed': [], + 'total_procurement_cost': 0, + 'can_proceed_with_delays': True, + 'pr_po_summary': { + 'existing_orders': [], + 'required_new_orders': [], + 'total_existing_value': 0, + 'total_new_orders_value': 0 + } + } + + requirements = self.equipment_requirements[equipment_tag] + missing_parts = [] + procurement_needed = [] + total_procurement_cost = 0 + + # Calculate stock after considering other overhauls + adjusted_stocks = self._calculate_adjusted_stocks(target_month, consider_other_overhauls or []) + + existing_orders = self._get_existing_orders_for_month(target_month) + + pr_po_summary = { + 'existing_orders': [], + 'required_new_orders': [], + 'total_existing_value': 0, + 'total_new_orders_value': 0, + 'orders_by_status': { + 'planned': [], + 'ordered': [], + 'received': [], + 'cancelled': [] + } + } + + + for requirement in requirements: + sparepart_id = requirement.sparepart_id + needed_quantity = requirement.quantity_required + sparepart_name = requirement.sparepart_name + + current_stock = adjusted_stocks.get(sparepart_id, 0) + + existing_sparepart_orders = [order for order in existing_orders + if order.sparepart_id == sparepart_id] + + total_ordered_quantity = sum(order.quantity for order in existing_sparepart_orders + if order.status in [ProcurementStatus.PLANNED, ProcurementStatus.ORDERED]) + + effective_stock = current_stock + total_ordered_quantity + + # if sparepart_id not in adjusted_stocks: + # missing_parts.append({ + # 'sparepart_id': sparepart_id, + # 'sparepart_name': requirement.sparepart_name, + # 'required': needed_quantity, + # 'available': 0, + # 'shortage': needed_quantity, + # 'criticality': "warning" + # }) + # continue + + # available_stock = adjusted_stocks[sparepart_id] + + # if available_stock < needed_quantity: + # shortage = needed_quantity - available_stock + # missing_parts.append({ + # 'sparepart_id': sparepart_id, + # 'sparepart_name': requirement.sparepart_name, + # 'required': needed_quantity, + # 'available': available_stock, + # 'shortage': shortage, + # 'criticality': "warning" + # }) + + # # Calculate procurement needs + # if sparepart_id in self.sparepart_stocks: + # stock_info = self.sparepart_stocks[sparepart_id] + # procurement_cost = shortage * stock_info.unit_cost + # total_procurement_cost += procurement_cost + + + # # Calculate when to order (considering lead time) + # order_month = max(0, target_month - requirement.lead_time) + + # procurement_needed.append({ + # 'sparepart_id': sparepart_id, + # 'sparepart_name': requirement.sparepart_name, + # 'quantity_needed': shortage, + # 'unit_cost': stock_info.unit_cost, + # 'total_cost': procurement_cost, + # 'order_by_month': order_month, + # 'lead_time_months': requirement.lead_time, + # 'criticality': "warning" + # }) + if effective_stock >= needed_quantity: + # Sufficient stock available (including from existing orders) + if existing_sparepart_orders: + # Add existing order info to summary + for order in existing_sparepart_orders: + order_info = { + 'po_pr_id': order.po_pr_id, + 'sparepart_id': sparepart_id, + 'sparepart_name': sparepart_name, + 'quantity': order.quantity, + 'unit_cost': order.unit_cost, + 'total_cost': order.total_cost, + 'order_date': order.order_date.isoformat(), + 'expected_delivery_date': order.expected_delivery_date.isoformat(), + 'status': order.status.value, + 'months_until_delivery': self._calculate_months_until_delivery(order.expected_delivery_date, target_month), + 'is_on_time': self._is_delivery_on_time(order.expected_delivery_date, target_month), + 'usage': 'covers_requirement' + } + pr_po_summary['existing_orders'].append(order_info) + pr_po_summary['total_existing_value'] += order.total_cost + pr_po_summary['orders_by_status'][order.status.value].append(order_info) + else: + # Insufficient stock - need additional procurement + shortage = needed_quantity - effective_stock + + missing_parts.append({ + 'sparepart_id': sparepart_id, + 'sparepart_name': sparepart_name, + 'required': needed_quantity, + 'current_stock': current_stock, + 'ordered_quantity': total_ordered_quantity, + 'effective_available': effective_stock, + 'shortage': shortage, + 'criticality': "warning", + 'existing_orders': len(existing_sparepart_orders), + 'existing_orders_details': [ + { + 'po_pr_id': order.po_pr_id, + 'quantity': order.quantity, + 'status': order.status.value, + 'expected_delivery': order.expected_delivery_date.isoformat(), + 'is_on_time': self._is_delivery_on_time(order.expected_delivery_date, target_month) + } for order in existing_sparepart_orders + ] + }) + + # Calculate additional procurement needed + if sparepart_id in self.sparepart_stocks: + stock_info = self.sparepart_stocks[sparepart_id] + procurement_cost = shortage * stock_info.unit_cost + total_procurement_cost += procurement_cost + + # Calculate when to order (considering lead time) + order_month = max(0, target_month - requirement.lead_time) + order_date = self.analysis_start_date + timedelta(days=order_month * 30) + expected_delivery = order_date + timedelta(days=requirement.lead_time * 30) + + new_order = { + 'sparepart_id': sparepart_id, + 'sparepart_name': sparepart_name, + 'quantity_needed': shortage, + 'unit_cost': stock_info.unit_cost, + 'total_cost': procurement_cost, + 'order_by_month': order_month, + 'recommended_order_date': order_date.isoformat(), + 'expected_delivery_date': expected_delivery.isoformat(), + 'lead_time_months': requirement.lead_time, + 'criticality': "warning", + 'urgency': self._calculate_urgency(order_month, target_month), + 'reason': f'Additional {shortage} units needed beyond existing orders' + } + + procurement_needed.append(new_order) + pr_po_summary['required_new_orders'].append(new_order) + pr_po_summary['total_new_orders_value'] += procurement_cost + + # Check for critical parts + critical_missing = [p for p in missing_parts if p['criticality'] == 'critical'] + + # Generate comprehensive summary + availability_summary = self._generate_comprehensive_availability_message( + missing_parts, critical_missing, pr_po_summary + ) + + return { + 'available': len(critical_missing) == 0, + 'total_missing_parts': len(missing_parts), + 'critical_missing_parts': len(critical_missing), + 'missing_parts': missing_parts, + 'procurement_needed': procurement_needed, + 'total_procurement_cost': total_procurement_cost, + 'can_proceed_with_delays': len(critical_missing) == 0, + 'message': availability_summary['message'], + 'detailed_message': availability_summary['detailed_message'], + 'pr_po_summary': pr_po_summary, + 'recommendations': self._generate_procurement_recommendations(pr_po_summary, target_month) + } + + def _calculate_months_until_delivery(self, delivery_date: date, target_month: int) -> int: + """Calculate months from target month to delivery date""" + target_date = self.analysis_start_date + timedelta(days=target_month * 30) + months_diff = (delivery_date.year - target_date.year) * 12 + (delivery_date.month - target_date.month) + return months_diff -# for field in data: -# if field in update_data: -# setattr(activity, field, update_data[field]) + def _calculate_urgency(self, order_month: int, target_month: int) -> str: + """Calculate urgency level for new procurement""" + time_gap = target_month - order_month + + if time_gap <= 1: + return "URGENT" + elif time_gap <= 3: + return "HIGH" + elif time_gap <= 6: + return "MEDIUM" + else: + return "LOW" + + def _generate_procurement_recommendations(self, pr_po_summary: Dict, target_month: int) -> List[Dict]: + """Generate procurement recommendations based on analysis""" + recommendations = [] + + # Check for late deliveries + late_orders = [order for order in pr_po_summary['existing_orders'] + if not order['is_on_time']] + + if late_orders: + recommendations.append({ + 'type': 'LATE_DELIVERY_WARNING', + 'priority': 'HIGH', + 'message': f"{len(late_orders)} existing orders will deliver late", + 'details': [f"PO/PR {order['po_pr_id']} for {order['sparepart_name']}" + for order in late_orders], + 'action': 'Expedite delivery or find alternative suppliers' + }) + + # Check for urgent new orders + urgent_orders = [order for order in pr_po_summary['required_new_orders'] + if order['urgency'] == 'URGENT'] + + if urgent_orders: + recommendations.append({ + 'type': 'URGENT_PROCUREMENT', + 'priority': 'CRITICAL', + 'message': f"{len(urgent_orders)} spareparts need immediate ordering", + 'details': [f"{order['sparepart_name']}: {order['quantity_needed']} units" + for order in urgent_orders], + 'action': 'Place orders immediately or consider expedited delivery' + }) + + # Check for cancelled orders + cancelled_orders = pr_po_summary['orders_by_status'].get('cancelled', []) + if cancelled_orders: + recommendations.append({ + 'type': 'CANCELLED_ORDER_IMPACT', + 'priority': 'MEDIUM', + 'message': f"{len(cancelled_orders)} cancelled orders may affect availability", + 'details': [f"Cancelled: PO/PR {order['po_pr_id']} for {order['sparepart_name']}" + for order in cancelled_orders], + 'action': 'Review impact and place replacement orders if necessary' + }) + + # Budget recommendations + total_investment = pr_po_summary['total_existing_value'] + pr_po_summary['total_new_orders_value'] + if total_investment > 0: + recommendations.append({ + 'type': 'BUDGET_SUMMARY', + 'priority': 'INFO', + 'message': f'Total sparepart investment: ${total_investment:,.2f}', + 'details': [ + f"Existing orders: ${pr_po_summary['total_existing_value']:,.2f}", + f"Additional orders needed: ${pr_po_summary['total_new_orders_value']:,.2f}" + ], + 'action': 'Ensure budget allocation for sparepart procurement' + }) + + return recommendations + + def _is_delivery_on_time(self, delivery_date: datetime, target_month: int) -> bool: + """Check if delivery will arrive on time for target month""" + target_date = self.analysis_start_date + timedelta(days=target_month * 30) + del_time = delivery_date.date() if delivery_date else None + + return del_time <= target_date if del_time else False -# await db_session.commit() + def _calculate_adjusted_stocks(self, target_month: int, other_overhauls: List[Tuple[str, int]]) -> Dict[str, int]: + """Calculate stock levels after considering consumption from other planned overhauls""" + adjusted_stocks = {} + + for sparepart_id, monthly_stocks in self.projected_stocks.items(): + if target_month < len(monthly_stocks): + stock_at_month = monthly_stocks[target_month] + + # Subtract consumption from other overhauls happening at or before target month + for other_equipment, other_month in other_overhauls: + if other_month <= target_month and other_equipment in self.equipment_requirements: + for req in self.equipment_requirements[other_equipment]: + if req.sparepart_id == sparepart_id: + stock_at_month -= req.quantity_required + + adjusted_stocks[sparepart_id] = max(0, stock_at_month) + + return adjusted_stocks + + def _get_existing_orders_for_month(self, target_month: int) -> List[ProcurementRecord]: + """Get existing PR/PO orders that could supply spareparts for target month""" + target_date = self.analysis_start_date + timedelta(days=target_month * 30) + + relevant_orders = [] + for record in self.procurement_records: + date_expected_delivery = record.expected_delivery_date.date() if record.expected_delivery_date else None + # Include orders that deliver before or around the target month + # and are not cancelled + if (record.status != ProcurementStatus.CANCELLED and date_expected_delivery and + date_expected_delivery <= target_date): # 15 days buffer + relevant_orders.append(record) + + return relevant_orders + + def _generate_comprehensive_availability_message(self, missing_parts: List[Dict], + critical_missing: List[Dict], + pr_po_summary: Dict) -> Dict: + """Generate comprehensive availability message with PR/PO details""" + + if not missing_parts: + if pr_po_summary['existing_orders']: + message = f"All spareparts available through {len(pr_po_summary['existing_orders'])} existing orders" + detailed_message = f"Total existing order value: ${pr_po_summary['total_existing_value']:,.2f}" + else: + message = "All spareparts available from current stock" + detailed_message = "No additional procurement required" + else: + if critical_missing: + message = f"CRITICAL: {len(critical_missing)} critical spareparts missing. Overhaul cannot proceed." + detailed_message = f"Additional procurement required: ${pr_po_summary['total_new_orders_value']:,.2f}" + else: + message = f"WARNING: {len(missing_parts)} spareparts missing, but no critical parts." + if pr_po_summary['total_new_orders_value'] > 0: + detailed_message = f"Additional procurement required: ${pr_po_summary['total_new_orders_value']:,.2f}. " + else: + detailed_message = "" + + if pr_po_summary['existing_orders']: + detailed_message += f"Existing orders cover some requirements (${pr_po_summary['total_existing_value']:,.2f})." + + return { + 'message': message, + 'detailed_message': detailed_message + } + + def _generate_availability_message(self, missing_parts: List[Dict], critical_missing: List[Dict]) -> str: + """Generate human-readable availability message""" + if not missing_parts: + return "All spareparts available" + + if critical_missing: + return f"CRITICAL: {len(critical_missing)} critical spareparts missing. Overhaul cannot proceed." + + return f"WARNING: {len(missing_parts)} spareparts missing, but no critical parts. Overhaul can proceed with procurement." + + def optimize_procurement_timing(self, planned_overhauls: List[Tuple[str, int]]) -> Dict: + """ + Optimize procurement timing for multiple equipment overhauls + + Args: + planned_overhauls: List of (equipment_tag, planned_month) tuples + + Returns: + Optimized procurement plan + """ + procurement_plan = [] + total_procurement_cost = 0 + + # Sort overhauls by month + sorted_overhauls = sorted(planned_overhauls, key=lambda x: x[1]) + + # Track cumulative procurement needs + processed_overhauls = [] + + for equipment_tag, target_month in sorted_overhauls: + availability = self.check_sparepart_availability( + equipment_tag, target_month, processed_overhauls + ) + + for procurement in availability['procurement_needed']: + procurement_plan.append({ + 'equipment_tag': equipment_tag, + 'target_overhaul_month': target_month, + **procurement + }) + total_procurement_cost += procurement['total_cost'] + + processed_overhauls.append((equipment_tag, target_month)) + + # Group by order month for better planning + procurement_by_month = defaultdict(list) + for item in procurement_plan: + procurement_by_month[item['order_by_month']].append(item) + + return { + 'total_procurement_cost': total_procurement_cost, + 'procurement_plan': procurement_plan, + 'procurement_by_month': dict(procurement_by_month), + # 'summary': self._generate_procurement_summary(procurement_plan) + } + + def _generate_procurement_summary(self, procurement_plan: List[Dict]) -> Dict: + """Generate procurement summary statistics""" + if not procurement_plan: + return {'message': 'No procurement needed'} + + critical_items = [p for p in procurement_plan if p['criticality'] == 'critical'] + total_items = len(procurement_plan) + total_cost = sum(p['total_cost'] for p in procurement_plan) + + # Group by supplier + by_supplier = defaultdict(list) + for item in procurement_plan: + by_supplier[item['supplier']].append(item) + + return { + 'total_items': total_items, + 'critical_items': len(critical_items), + 'total_cost': total_cost, + 'unique_spareparts': len(set(p['sparepart_id'] for p in procurement_plan)), + 'suppliers_involved': len(by_supplier), + 'by_supplier': { + supplier: { + 'item_count': len(items), + 'total_cost': sum(i['total_cost'] for i in items) + } + for supplier, items in by_supplier.items() + } + } + + def get_monthly_procurement_schedule(self) -> Dict[int, List[Dict]]: + """Get procurement schedule by month""" + if not hasattr(self, '_monthly_schedule'): + self._monthly_schedule = {} + return self._monthly_schedule + + def update_projected_stocks_with_consumption(self, equipment_overhauls: List[Tuple[str, int]]): + """Update projected stocks considering sparepart consumption from overhauls""" + # Create a copy of projected stocks + updated_stocks = {} + + for sparepart_id, monthly_stocks in self.projected_stocks.items(): + updated_stocks[sparepart_id] = monthly_stocks.copy() + + # Apply consumption from overhauls + for equipment_tag, overhaul_month in equipment_overhauls: + if equipment_tag in self.equipment_requirements: + for requirement in self.equipment_requirements[equipment_tag]: + sparepart_id = requirement.sparepart_id + quantity_needed = requirement.quantity_required + + if sparepart_id in updated_stocks: + # Reduce stock from overhaul month onwards + for month in range(overhaul_month, len(updated_stocks[sparepart_id])): + updated_stocks[sparepart_id][month] = max( + 0, updated_stocks[sparepart_id][month] - quantity_needed + ) + + return updated_stocks + + +# Integration functions for database operations +async def load_sparepart_data_from_db(scope, prev_oh_scope, db_session) -> SparepartManager: + """Load sparepart data from database""" + # You'll need to implement these queries based on your database schema + # Get scope dates for analysis window + # scope = await get_scope(db_session=db_session, overhaul_session_id=overhaul_session_id) + # prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope) + + analysis_start_date = prev_oh_scope.end_date + analysis_window_months = int(((scope.start_date - prev_oh_scope.end_date).days / 30) * 1.5) + + sparepart_manager = SparepartManager(analysis_start_date, analysis_window_months) + + # Load sparepart stocks + # Example query - adjust based on your schema + query = text(""" + SELECT + mi.id, + mi.itemnum, + mi.itemsetid, + mi."location", + mi.curbaltotal, + mi.avgcost, + mspl.description + FROM public.maximo_inventory mi + LEFT JOIN public.maximo_sparepart_pr_po_line mspl + ON mi.itemnum = mspl.item_num + """) + log.info("Fetch sparepart") + sparepart_stocks_query = await db_session.execute(query) + + for stock_record in sparepart_stocks_query: + stock = SparepartStock( + sparepart_id=stock_record.itemnum, + sparepart_name=stock_record.description, + current_stock=stock_record.curbaltotal, + unit_cost=stock_record.avgcost, + location=stock_record.location or "Unknown", + ) + sparepart_manager.add_sparepart_stock(stock) + + # Load equipment sparepart requirements + # You'll need to create this table/relationship + query = text("""WITH oh_workorders AS ( + -- First, get all OH work orders + SELECT DISTINCT + wonum, + asset_location + FROM public.wo_staging_maximo_2 + WHERE worktype = 'OH' AND asset_location IS NOT NULL +), +sparepart_usage AS ( + -- Get sparepart usage for OH work orders + SELECT + oh.asset_location, + mwm.itemnum, + mwm.itemqty, + mwm.wonum + FROM oh_workorders oh + INNER JOIN public.maximo_workorder_materials mwm + ON oh.wonum = mwm.wonum +), +location_sparepart_stats AS ( + -- Calculate average usage per sparepart per location + SELECT + asset_location, + itemnum, + COUNT(DISTINCT wonum) as total_wo_count, + SUM(itemqty) as total_qty_used, + AVG(itemqty) as avg_qty_per_wo, + MIN(itemqty) as min_qty_used, + MAX(itemqty) as max_qty_used + FROM sparepart_usage + GROUP BY asset_location, itemnum +), +pr_po_combined AS ( + -- Combine PR and PO data by num to get issue_date and estimated_arrival_date + SELECT + mspl.item_num, + mspl.num, + MAX(CASE WHEN mspo.type = 'PR' THEN mspo.issue_date END) as issue_date, + MAX(CASE WHEN mspo.type = 'PO' THEN mspo.estimated_arrival_date END) as estimated_arrival_date + FROM public.maximo_sparepart_pr_po_line mspl + INNER JOIN public.maximo_sparepart_pr_po mspo + ON mspl.num = mspo.num + WHERE mspo.type IN ('PR', 'PO') + GROUP BY mspl.item_num, mspl.num +), +leadtime_stats AS ( + -- Calculate lead time statistics for each item + SELECT + item_num, + ROUND(AVG( + EXTRACT(EPOCH FROM (estimated_arrival_date - issue_date)) / 86400 / 30.44 + ), 1) as avg_leadtime_months, + ROUND(MIN( + EXTRACT(EPOCH FROM (estimated_arrival_date - issue_date)) / 86400 / 30.44 + ), 1) as min_leadtime_months, + COUNT(*) as leadtime_sample_size + FROM pr_po_combined + WHERE issue_date IS NOT NULL + AND estimated_arrival_date IS NOT NULL + AND estimated_arrival_date > issue_date + GROUP BY item_num +), +item_descriptions AS ( + -- Get unique descriptions for each item (optimized) + SELECT DISTINCT + item_num, + FIRST_VALUE(description) OVER ( + PARTITION BY item_num + ORDER BY created_at DESC NULLS LAST + ) as description + FROM public.maximo_sparepart_pr_po_line + WHERE description IS NOT NULL +) +SELECT + lss.asset_location, + lss.itemnum, + COALESCE(id.description, 'No description available') as item_description, + lss.total_wo_count, + lss.total_qty_used, + ROUND(lss.avg_qty_per_wo, 2) as avg_qty_per_wo, + lss.min_qty_used, + lss.max_qty_used, + COALESCE(lt.avg_leadtime_months, 0) as avg_leadtime_months, + COALESCE(lt.min_leadtime_months, 0) as min_leadtime_months, + COALESCE(lt.leadtime_sample_size, 0) as leadtime_sample_size +FROM location_sparepart_stats lss +LEFT JOIN item_descriptions id ON lss.itemnum = id.item_num +LEFT JOIN leadtime_stats lt ON lss.itemnum = lt.item_num +ORDER BY lss.asset_location, lss.itemnum;""") + + equipment_requirements_query = await db_session.execute(query) + + equipment_requirements = defaultdict(list) + for req_record in equipment_requirements_query: + requirement = SparepartRequirement( + sparepart_id=req_record.itemnum, + quantity_required=float(req_record.avg_qty_per_wo), + lead_time=float(req_record.avg_leadtime_months), + sparepart_name=req_record.item_description + + ) + equipment_requirements[req_record.asset_location].append(requirement) + + for equipment_tag, requirements in equipment_requirements.items(): + sparepart_manager.add_equipment_requirements(equipment_tag, requirements) + + + # Load procurement records (PO/PR) + procurement_query = await get_all(db_session=db_session) + + + for proc_record in procurement_query: + procurement = ProcurementRecord( + po_pr_id=proc_record["pr_number"], + sparepart_id=proc_record["item_num"], + sparepart_name=proc_record["description"], + quantity=proc_record["pr_qty_ordered"], + unit_cost=proc_record["unit_cost"], + total_cost=proc_record["line_cost"], + order_date=proc_record['pr_issue_date'], + expected_delivery_date=proc_record['po_estimated_arrival_date'], + status=ProcurementStatus("ordered"), + ) + sparepart_manager.add_procurement_record(procurement) + + # Calculate projected stocks + sparepart_manager._project_monthly_stocks() + + return sparepart_manager -# return activity -# async def delete(*, db_session: DbSession, activity_id: str): -# """Deletes a document.""" -# activity = await db_session.get(MasterActivity, activity_id) -# await db_session.delete(activity) -# await db_session.commit() + \ No newline at end of file