You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

2086 lines
86 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import datetime
from typing import Coroutine, List, Optional, Tuple,Dict
from uuid import UUID
import calendar
import httpx
from src.calculation_target_reliability.service import RBD_SERVICE_API
from src.config import REALIBILITY_SERVICE_API
import numpy as np
import requests
from fastapi import HTTPException, status
from sqlalchemy import and_, case, func, select, update
from sqlalchemy.orm import joinedload, selectinload
from src.database.core import DbSession
from src.logging import setup_logging
from src.overhaul_activity.service import get_all as get_all_by_session_id
from src.overhaul_scope.service import get as get_scope, get_prev_oh
from src.sparepart.service import load_sparepart_data_from_db
from src.utils import get_latest_numOfFail
from src.workorder.model import MasterWorkOrder
from src.sparepart.model import MasterSparePart
from src.database.core import CollectorDbSession
from src.overhaul_activity.model import OverhaulActivity
from .model import (CalculationData, CalculationEquipmentResult,
CalculationResult)
from .schema import (CalculationResultsRead,
CalculationSelectedEquipmentUpdate,
CalculationTimeConstrainsParametersCreate,
CalculationTimeConstrainsRead, OptimumResult)
from .utils import analyze_monthly_metrics, calculate_failures_per_month, calculate_risk_cost_per_failure, create_time_series_data, failures_per_month, fetch_reliability, get_monthly_risk_analysis, get_months_between, simulate_failures
from src.equipment_sparepart.model import ScopeEquipmentPart
import copy
import random
import math
from src.overhaul_activity.service import get_standard_scope_by_session_id
from collections import defaultdict
from datetime import timedelta
import pandas as pd
import logging
import aiohttp
from datetime import datetime, date
import asyncio
import json
# from src.utils import save_to_pastebin
client = httpx.AsyncClient(timeout=300.0)
log = logging.getLogger(__name__)
setup_logging(logger=log)
class OptimumCostModel:
def __init__(self, token: str, last_oh_date: date, next_oh_date: date,
time_window_months: Optional[int] = None,
base_url: str = "http://192.168.1.82:8000"):
"""
Initialize the Optimum Cost Model for overhaul timing optimization.
Args:
token: API authentication token
last_oh_date: Date of last overhaul
next_oh_date: Planned date of next overhaul
time_window_months: Analysis window in months (default: 1.5x planned interval)
base_url: API base URL
"""
self.api_base_url = base_url
self.token = token
self.last_oh_date = last_oh_date
self.next_oh_date = next_oh_date
self.session = None
# Calculate planned overhaul interval in months
self.planned_oh_months = self._get_months_between(last_oh_date, next_oh_date)
# Set analysis time window (default: 1.5x planned interval)
self.time_window_months = time_window_months or int(self.planned_oh_months * 1.5)
# Pre-calculate date range for API calls
self.date_range = self._generate_date_range()
# Setup logging
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
self.logger.info(f"OptimumCostModel initialized:")
self.logger.info(f" - Planned OH interval: {self.planned_oh_months} months")
self.logger.info(f" - Analysis window: {self.time_window_months} months")
def _get_months_between(self, start_date: date, end_date: date) -> int:
"""Calculate number of months between two dates"""
return (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month)
def _generate_date_range(self) -> List[datetime]:
"""Generate date range for analysis based on time window"""
dates = []
current_date = datetime.combine(self.last_oh_date, datetime.min.time())
end_date = current_date + timedelta(days=self.time_window_months * 30)
while current_date <= end_date:
dates.append(current_date)
current_date += timedelta(days=31)
return dates
async def _create_session(self):
"""Create aiohttp session with connection pooling"""
if self.session is None:
timeout = aiohttp.ClientTimeout(total=300)
connector = aiohttp.TCPConnector(
limit=500,
limit_per_host=200,
ttl_dns_cache=300,
use_dns_cache=True,
force_close=False,
enable_cleanup_closed=True
)
self.session = aiohttp.ClientSession(
timeout=timeout,
connector=connector,
headers={'Authorization': f'Bearer {self.token}'}
)
async def _close_session(self):
"""Close aiohttp session"""
if self.session:
await self.session.close()
self.session = None
async def get_failures_prediction(self, simulation_id: str, location_tag: str, birnbaum_importance: float):
"""Get failure predictions for equipment from simulation service"""
plot_result_url = f"{self.api_base_url}/aeros/simulation/result/plot/{simulation_id}/{location_tag}?use_location_tag=1"
try:
response = requests.get(
plot_result_url,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {self.token}",
},
timeout=30
)
response.raise_for_status()
prediction_data = response.json()
except (requests.RequestException, ValueError) as e:
self.logger.error(f"Failed to fetch prediction data for {location_tag}: {e}")
return None
plot_data = prediction_data.get('data', {}).get('timestamp_outs') if prediction_data.get("data") else None
if not plot_data:
self.logger.warning(f"No plot data available for {location_tag}")
return None
time_series = create_time_series_data(plot_data, 43830)
monthly_data = analyze_monthly_metrics(time_series)
return monthly_data
async def get_simulation_results(self, simulation_id: str = "default"):
"""Get simulation results for Birnbaum importance calculations"""
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
calc_result_url = f"{self.api_base_url}/aeros/simulation/result/calc/{simulation_id}?nodetype=RegularNode"
plant_result_url = f"{self.api_base_url}/aeros/simulation/result/calc/{simulation_id}/plant"
async with httpx.AsyncClient(timeout=300.0) as client:
calc_task = client.get(calc_result_url, headers=headers)
plant_task = client.get(plant_result_url, headers=headers)
calc_response, plant_response = await asyncio.gather(calc_task, plant_task)
calc_response.raise_for_status()
plant_response.raise_for_status()
calc_data = calc_response.json()["data"]
plant_data = plant_response.json()["data"]
return {
"calc_result": calc_data,
"plant_result": plant_data
}
def _calculate_equipment_costs(self, failures_prediction: Dict, birnbaum_importance: float,
preventive_cost: float, failure_replacement_cost: float,
location_tag: str) -> List[Dict]:
"""Calculate costs for each month for a single equipment"""
if not failures_prediction:
self.logger.warning(f"No failure prediction data for {location_tag}")
return []
months = list(failures_prediction.keys())
num_months = len(months)
# Calculate risk costs and failure costs
risk_costs = []
cumulative_risk_costs = []
failure_counts = []
cumulative_risk = 0
for month_key in months:
data = failures_prediction[month_key]
# Risk cost = flow_rate × birnbaum_importance × downtime_hours × energy_price
monthly_risk = data['avg_flow_rate'] * birnbaum_importance * data['total_oos_hours'] * 1000000
risk_costs.append(monthly_risk)
cumulative_risk += monthly_risk
cumulative_risk_costs.append(cumulative_risk)
failure_counts.append(data['cumulative_failures'])
raise Exception(cumulative_risk_costs)
# Calculate costs for each month
results = []
for i in range(num_months):
month_index = i + 1
# Failure cost = cumulative failures × replacement cost + cumulative risk cost
failure_cost = (failure_counts[i] * failure_replacement_cost) + cumulative_risk_costs[i]
# Preventive cost = overhaul cost distributed over months
preventive_cost_month = preventive_cost / month_index
# Total cost = failure cost + preventive cost
total_cost = failure_cost + preventive_cost_month
results.append({
'month': month_index,
'number_of_failures': failure_counts[i],
'failure_cost': failure_cost,
'preventive_cost': preventive_cost_month,
'total_cost': total_cost,
'is_after_planned_oh': month_index > self.planned_oh_months,
'delay_months': max(0, month_index - self.planned_oh_months),
'risk_cost': cumulative_risk_costs[i],
'monthly_risk_cost': risk_costs[i],
'procurement_cost': 0, # For database compatibility
'procurement_details': [] # For database compatibility
})
return results
def _find_optimal_timing(self, cost_results: List[Dict], location_tag: str) -> Optional[Dict]:
"""Find optimal timing for equipment overhaul"""
if not cost_results:
return None
# Find month with minimum total cost
min_cost = float('inf')
optimal_result = None
optimal_index = -1
for i, result in enumerate(cost_results):
if result['total_cost'] < min_cost:
min_cost = result['total_cost']
optimal_result = result
optimal_index = i
if optimal_result is None:
return None
# Calculate cost comparison with planned timing
planned_cost = None
cost_vs_planned = None
if self.planned_oh_months <= len(cost_results):
planned_cost = cost_results[self.planned_oh_months - 1]['total_cost']
cost_vs_planned = optimal_result['total_cost'] - planned_cost
return {
'location_tag': location_tag,
'optimal_month': optimal_result['month'],
'optimal_index': optimal_index,
'optimal_cost': optimal_result['total_cost'],
'failure_cost': optimal_result['failure_cost'],
'preventive_cost': optimal_result['preventive_cost'],
'number_of_failures': optimal_result['number_of_failures'],
'is_delayed': optimal_result['is_after_planned_oh'],
'delay_months': optimal_result['delay_months'],
'planned_oh_month': self.planned_oh_months,
'planned_cost': planned_cost,
'cost_vs_planned': cost_vs_planned,
'savings_from_delay': -cost_vs_planned if cost_vs_planned and cost_vs_planned < 0 else 0,
'cost_of_delay': cost_vs_planned if cost_vs_planned and cost_vs_planned > 0 else 0,
'all_monthly_costs': cost_results
}
async def calculate_optimal_timing_single_equipment(self, equipment, birnbaum_importance: float,
simulation_id: str = "default") -> Optional[Dict]:
"""Calculate optimal overhaul timing for a single equipment"""
location_tag = equipment.location_tag
self.logger.info(f"Calculating optimal timing for {location_tag}")
# Get failure predictions
monthly_data = await self.get_failures_prediction(simulation_id, location_tag, birnbaum_importance)
if not monthly_data:
self.logger.warning(f"No monthly data available for {location_tag}")
return None
# Calculate costs
preventive_cost = equipment.overhaul_cost + equipment.service_cost
failure_replacement_cost = equipment.material_cost + (3 * 111000 * 3) # Material + Labor
cost_results = self._calculate_equipment_costs(
failures_prediction=monthly_data,
birnbaum_importance=birnbaum_importance,
preventive_cost=preventive_cost,
failure_replacement_cost=failure_replacement_cost,
location_tag=location_tag
)
# Find optimal timing
optimal_timing = self._find_optimal_timing(cost_results, location_tag)
if optimal_timing:
self.logger.info(f"Optimal timing for {location_tag}: Month {optimal_timing['optimal_month']} "
f"(Cost: ${optimal_timing['optimal_cost']:,.2f})")
if optimal_timing['is_delayed']:
self.logger.info(f" - Delay recommended: {optimal_timing['delay_months']} months")
self.logger.info(f" - Savings from delay: ${optimal_timing['savings_from_delay']:,.2f}")
return optimal_timing
async def calculate_cost_all_equipment(self, db_session, equipments: List, calculation,
preventive_cost: float, simulation_id: str = "default") -> Dict:
"""
Calculate optimal overhaul timing for entire fleet and save to database
"""
self.logger.info(f"Starting fleet optimization for {len(equipments)} equipment items")
max_interval = self.time_window_months
# Get Birnbaum importance values
try:
importance_results = await self.get_simulation_results(simulation_id)
equipment_birnbaum = {
imp['aeros_node']['node_name']: imp['contribution']
for imp in importance_results["calc_result"]
}
except Exception as e:
self.logger.error(f"Failed to get simulation results: {e}")
equipment_birnbaum = {}
# Initialize fleet aggregation arrays
fleet_results = []
total_corrective_costs = np.zeros(max_interval)
total_preventive_costs = np.zeros(max_interval)
total_procurement_costs = np.zeros(max_interval)
total_costs = np.zeros(max_interval)
for equipment in equipments:
location_tag = equipment.location_tag
birnbaum = equipment_birnbaum.get(location_tag, 0.0)
if birnbaum == 0.0:
self.logger.warning(f"No Birnbaum importance found for {location_tag}, using 0.0")
try:
# Get failure predictions
monthly_data = await self.get_failures_prediction(simulation_id, location_tag, birnbaum)
if not monthly_data:
continue
# Calculate costs
equipment_preventive_cost = equipment.overhaul_cost + equipment.service_cost
failure_replacement_cost = equipment.material_cost + (3 * 111000 * 3)
cost_results = self._calculate_equipment_costs(
failures_prediction=monthly_data,
birnbaum_importance=birnbaum,
preventive_cost=equipment_preventive_cost,
failure_replacement_cost=failure_replacement_cost,
location_tag=location_tag
)
if not cost_results:
continue
# Find optimal timing
optimal_timing = self._find_optimal_timing(cost_results, location_tag)
if not optimal_timing:
continue
# Prepare arrays for database (pad to max_interval length)
corrective_costs = [r["failure_cost"] for r in cost_results]
preventive_costs = [r["preventive_cost"] for r in cost_results]
procurement_costs = [r["procurement_cost"] for r in cost_results]
failures = [r["number_of_failures"] for r in cost_results]
total_costs_equipment = [r['total_cost'] for r in cost_results]
procurement_details = [r["procurement_details"] for r in cost_results]
# Pad arrays to max_interval length
def pad_array(arr, target_length):
if len(arr) < target_length:
return arr + [arr[-1]] * (target_length - len(arr)) # Use last value for padding
return arr[:target_length]
corrective_costs = pad_array(corrective_costs, max_interval)
preventive_costs = pad_array(preventive_costs, max_interval)
procurement_costs = pad_array(procurement_costs, max_interval)
failures = pad_array(failures, max_interval)
total_costs_equipment = pad_array(total_costs_equipment, max_interval)
procurement_details = pad_array(procurement_details, max_interval)
# Create database result object
equipment_result = CalculationEquipmentResult(
corrective_costs=corrective_costs,
overhaul_costs=preventive_costs,
procurement_costs=procurement_costs,
daily_failures=failures,
location_tag=equipment.location_tag,
material_cost=equipment.material_cost,
service_cost=equipment.service_cost,
optimum_day=optimal_timing['optimal_index'],
calculation_data_id=calculation.id,
procurement_details=procurement_details
)
fleet_results.append(equipment_result)
# Aggregate costs for fleet analysis
total_corrective_costs += np.array(corrective_costs)
total_preventive_costs += np.array(preventive_costs)
total_procurement_costs += np.array(procurement_costs)
total_costs += np.array(total_costs_equipment)
self.logger.info(f"Processed {location_tag}: Optimal month {optimal_timing['optimal_month']}")
except Exception as e:
self.logger.error(f"Failed to calculate timing for {location_tag}: {e}")
continue
# Calculate fleet optimal interval
fleet_optimal_index = np.argmin(total_costs)
fleet_optimal_cost = total_costs[fleet_optimal_index]
# Update calculation with results
calculation.optimum_oh_day = fleet_optimal_index
calculation.max_interval = max_interval
# Save all results to database
db_session.add_all(fleet_results)
await db_session.commit()
self.logger.info(f"Fleet optimization completed:")
self.logger.info(f" - Fleet optimal month: {fleet_optimal_index + 1}")
self.logger.info(f" - Fleet optimal cost: ${fleet_optimal_cost:,.2f}")
self.logger.info(f" - Results saved to database for {len(fleet_results)} equipment")
return {
'id': calculation.id,
'fleet_results': fleet_results,
'fleet_optimal_interval': fleet_optimal_index + 1,
'fleet_optimal_cost': fleet_optimal_cost,
'total_corrective_costs': total_corrective_costs.tolist(),
'total_preventive_costs': total_preventive_costs.tolist(),
'total_procurement_costs': total_procurement_costs.tolist(),
'analysis_parameters': {
'planned_oh_months': self.planned_oh_months,
'analysis_window_months': self.time_window_months,
'last_oh_date': self.last_oh_date.isoformat(),
'next_oh_date': self.next_oh_date.isoformat()
}
}
class OptimumCostModelWithSpareparts:
def __init__(self, token: str, last_oh_date: date, next_oh_date: date,
sparepart_manager,
time_window_months: Optional[int] = None,
base_url: str = "http://192.168.1.82:8000"):
"""
Initialize the Optimum Cost Model with sparepart management
"""
self.api_base_url = base_url
self.token = token
self.last_oh_date = last_oh_date
self.next_oh_date = next_oh_date
self.session = None
self.sparepart_manager = sparepart_manager
# Calculate planned overhaul interval in months
self.planned_oh_months = self._get_months_between(last_oh_date, next_oh_date)
# Set analysis time window (default: 1.5x planned interval)
self.time_window_months = time_window_months or int(self.planned_oh_months * 1.5)
# Pre-calculate date range for API calls
self.date_range = self._generate_date_range()
self.logger = log
def _get_months_between(self, start_date: date, end_date: date) -> int:
"""Calculate number of months between two dates"""
return (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month)
def _generate_date_range(self) -> List[datetime]:
"""Generate date range for analysis based on time window"""
dates = []
current_date = datetime.combine(self.last_oh_date, datetime.min.time())
end_date = current_date + timedelta(days=self.time_window_months * 30)
while current_date <= end_date:
dates.append(current_date)
current_date += timedelta(days=31)
return dates
async def _create_session(self):
"""Create aiohttp session with connection pooling"""
if self.session is None:
timeout = aiohttp.ClientTimeout(total=300)
connector = aiohttp.TCPConnector(
limit=500,
limit_per_host=200,
ttl_dns_cache=300,
use_dns_cache=True,
force_close=False,
enable_cleanup_closed=True
)
self.session = aiohttp.ClientSession(
timeout=timeout,
connector=connector,
headers={'Authorization': f'Bearer {self.token}'}
)
async def _close_session(self):
"""Close aiohttp session"""
if self.session:
await self.session.close()
self.session = None
async def max_flowrate(self, simulation_id: str, location_tag: str):
"""Get failure predictions for equipment from simulation service"""
plot_result_url = f"{self.api_base_url}/aeros/simulation/result/plot/{simulation_id}/{location_tag}?use_location_tag=1"
try:
response = requests.get(
plot_result_url,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {self.token}",
},
timeout=30
)
response.raise_for_status()
prediction_data = response.json()
except (requests.RequestException, ValueError) as e:
self.logger.error(f"Failed to fetch prediction data for {location_tag}: {e}")
return None
data = prediction_data.get('data', {})
if not data:
return None
max_flowrate = data.get("max_flow_rate")
return max_flowrate
# plot_data = prediction_data.get('data', {}).get('timestamp_outs') if prediction_data.get("data") else None
# if not plot_data:
# self.logger.warning(f"No plot data available for {location_tag}")
# return None
# time_series = create_time_series_data(plot_data, 43830)
# monthly_data = analyze_monthly_metrics(time_series)
# return monthly_data
async def get_simulation_results(self, simulation_id: str = "default"):
"""Get simulation results for Birnbaum importance calculations"""
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
calc_result_url = f"{self.api_base_url}/aeros/simulation/result/calc/{simulation_id}?nodetype=RegularNode"
plant_result_url = f"{self.api_base_url}/aeros/simulation/result/calc/{simulation_id}/plant"
async with httpx.AsyncClient(timeout=300.0) as client:
calc_task = client.get(calc_result_url, headers=headers)
plant_task = client.get(plant_result_url, headers=headers)
calc_response, plant_response = await asyncio.gather(calc_task, plant_task)
calc_response.raise_for_status()
plant_response.raise_for_status()
calc_data = calc_response.json()["data"]
plant_data = plant_response.json()["data"]
return {
"calc_result": calc_data,
"plant_result": plant_data
}
def _calculate_equipment_costs_with_spareparts(self, failures_prediction: list, birnbaum_importance: float,
preventive_cost: float, failure_replacement_cost: float, max_interval:int,
location_tag: str, planned_overhauls: List = None) -> List[Dict]:
"""Calculate costs for each month including sparepart costs and availability"""
if not failures_prediction:
self.logger.warning(f"No failure prediction data for {location_tag}")
return []
# months = list(failures_prediction.keys())
num_months = max_interval
# Calculate basic costs (same as before)
risk_costs = []
cumulative_risk_costs = []
failure_counts = []
cumulative_risk = 0
for i in range(num_months):
data = failures_prediction[i]
monthly_risk = data['avg_flowrate'] * birnbaum_importance * data['total_oos_hours'] * 1000000
risk_costs.append(monthly_risk)
cumulative_risk += monthly_risk
cumulative_risk_costs.append(cumulative_risk)
failure_counts.append(data['cumulative_failures'])
# Calculate costs for each month including sparepart considerations
results = []
for i in range(num_months):
month_index = i + 1
# Basic failure and preventive costs
failure_cost = (failure_counts[i] * (failure_replacement_cost)) + cumulative_risk_costs[i]
preventive_cost_month = preventive_cost / month_index
# Check sparepart availability for this month
sparepart_analysis = self._analyze_sparepart_availability(
location_tag, month_index - 1, planned_overhauls or []
)
# Calculate procurement costs if spareparts are needed
procurement_cost = sparepart_analysis['total_procurement_cost']
procurement_details = sparepart_analysis
# Adjust total cost based on sparepart availability
if not sparepart_analysis['available']:
total_cost = failure_cost + preventive_cost_month + procurement_cost
else:
# All spareparts available
total_cost = failure_cost + preventive_cost_month + procurement_cost
results.append({
'month': month_index,
'number_of_failures': failure_counts[i],
'failure_cost': failure_cost,
'preventive_cost': preventive_cost_month,
'procurement_cost': procurement_cost,
'total_cost': total_cost,
'is_after_planned_oh': month_index > self.planned_oh_months,
'delay_months': max(0, month_index - self.planned_oh_months),
'risk_cost': cumulative_risk_costs[i],
'monthly_risk_cost': risk_costs[i],
'procurement_details': procurement_details,
'sparepart_available': sparepart_analysis['available'],
'sparepart_status': sparepart_analysis['message'],
'can_proceed': sparepart_analysis['can_proceed_with_delays']
})
return results
def _analyze_sparepart_availability(self, equipment_tag: str, target_month: int,
planned_overhauls: List) -> Dict:
"""Analyze sparepart availability for equipment at target month"""
if not self.sparepart_manager:
return {
'available': True,
'message': 'Sparepart manager not initialized',
'total_procurement_cost': 0,
'procurement_needed': [],
'can_proceed_with_delays': True
}
# Convert planned overhauls to format expected by sparepart manager
other_overhauls = [(eq_tag, month) for eq_tag, month in planned_overhauls
if eq_tag != equipment_tag and month <= target_month]
return self.sparepart_manager.check_sparepart_availability(
equipment_tag, target_month, other_overhauls
)
def _find_optimal_timing_with_spareparts(self, cost_results: List[Dict], location_tag: str) -> Optional[Dict]:
"""Find optimal timing considering sparepart constraints"""
if not cost_results:
return None
# Filter out months where overhaul cannot proceed due to critical sparepart shortages
feasible_results = [r for r in cost_results if r['can_proceed']]
# if not feasible_results:
# self.logger.warning(f"No feasible overhaul months for {location_tag} due to sparepart constraints")
# # Return the earliest month with the least critical parts missing
# min_critical_missing = min(r['missing_critical_parts'] for r in cost_results)
# for result in cost_results:
# if result['missing_critical_parts'] == min_critical_missing:
# return self._create_optimal_result(result, location_tag, "INFEASIBLE")
# return None
# Find month with minimum total cost among feasible options
min_cost = float('inf')
optimal_result = None
optimal_index = -1
for i, result in enumerate(cost_results):
if result in feasible_results and result['total_cost'] < min_cost:
min_cost = result['total_cost']
optimal_result = result
optimal_index = i
if optimal_result is None:
return None
return self._create_optimal_result(optimal_result, location_tag, "OPTIMAL")
def _create_optimal_result(self, optimal_result: Dict, location_tag: str, status: str) -> Dict:
"""Create standardized optimal result dictionary"""
# Calculate cost comparison with planned timing
planned_cost = None
cost_vs_planned = None
if self.planned_oh_months <= len(optimal_result.get('all_monthly_costs', [])):
# This would need the full cost results array
pass # Will be calculated in the calling function
return {
'location_tag': location_tag,
'optimal_month': optimal_result['month'],
'optimal_index': optimal_result['month'] - 1,
'optimal_cost': optimal_result['total_cost'],
'failure_cost': optimal_result['failure_cost'],
'preventive_cost': optimal_result['preventive_cost'],
'procurement_cost': optimal_result['procurement_cost'],
'number_of_failures': optimal_result['number_of_failures'],
'is_delayed': optimal_result['is_after_planned_oh'],
'delay_months': optimal_result['delay_months'],
'planned_oh_month': self.planned_oh_months,
'planned_cost': planned_cost,
'cost_vs_planned': cost_vs_planned,
'savings_from_delay': 0, # Will be calculated later
'cost_of_delay': 0, # Will be calculated later
'sparepart_available': optimal_result['sparepart_available'],
'sparepart_status': optimal_result['sparepart_status'],
'procurement_details': optimal_result['procurement_details'],
# 'missing_critical_parts': optimal_result['missing_critical_parts'],
'optimization_status': status,
'all_monthly_costs': [] # Will be filled by calling function
}
async def calculate_cost_all_equipment_with_spareparts(self, db_session,collector_db_session ,equipments: List,
calculation, preventive_cost: float,
simulation_id: str = "default") -> Dict:
"""
Calculate optimal overhaul timing for entire fleet considering sparepart constraints
"""
self.logger.info(f"Starting fleet optimization with sparepart management for {len(equipments)} equipment")
max_interval = self.time_window_months
# Get Birnbaum importance values
try:
importance_results = await self.get_simulation_results(simulation_id)
equipment_birnbaum = {
imp['aeros_node']['node_name']: imp['contribution']
for imp in importance_results["calc_result"]
}
except Exception as e:
self.logger.error(f"Failed to get simulation results: {e}")
equipment_birnbaum = {}
location_tags = [equipment.location_tag for equipment in equipments]
reliabity_parameter = {
res['location_tag']: res for res in fetch_reliability(location_tags)
}
# Phase 1: Calculate individual optimal timings without considering interactions
individual_results = {}
for equipment in equipments:
location_tag = equipment.location_tag
birnbaum = equipment_birnbaum.get(location_tag, 0.0)
asset_reliability = reliabity_parameter.get(location_tag)
distribution = asset_reliability.get("distribution")
parameters = asset_reliability.get("parameters", {})
try:
# Get failure predictions
max_flowrate = await self.max_flowrate(simulation_id, location_tag) or 15
results = simulate_failures(distribution,parameters , 3, max_flowrate, months=max_interval, runs=500)
# Calculate costs without considering other equipment (first pass)
equipment_preventive_cost = equipment.overhaul_cost + equipment.service_cost
failure_replacement_cost = equipment.material_cost + (3 * 111000 * 3)
cost_results = self._calculate_equipment_costs_with_spareparts(
failures_prediction=results,
birnbaum_importance=birnbaum,
preventive_cost=equipment_preventive_cost,
failure_replacement_cost=failure_replacement_cost,
location_tag=location_tag,
planned_overhauls=[], # Empty in first pass
max_interval=max_interval
)
if not cost_results:
continue
# Find individual optimal timing
optimal_timing = self._find_optimal_timing_with_spareparts(cost_results, location_tag)
if optimal_timing:
optimal_timing['all_monthly_costs'] = cost_results
individual_results[location_tag] = optimal_timing
self.logger.info(f"Individual optimal for {location_tag}: Month {optimal_timing['optimal_month']}")
except Exception as e:
self.logger.error(f"Failed to calculate individual timing for {location_tag}: {e}")
raise Exception(e)
# Phase 2: Optimize considering sparepart interactions
self.logger.info("Phase 2: Optimizing with sparepart interactions...")
# Start with individual optimal timings
current_plan = [(tag, result['optimal_month']) for tag, result in individual_results.items()]
# Iteratively improve the plan considering sparepart constraints
improved_plan = self._optimize_fleet_with_sparepart_constraints(
individual_results, equipments, equipment_birnbaum, simulation_id
)
# Phase 3: Generate final results and database objects
fleet_results = []
total_corrective_costs = np.zeros(max_interval)
total_preventive_costs = np.zeros(max_interval)
total_procurement_costs = np.zeros(max_interval)
total_costs = np.zeros(max_interval)
total_fleet_procurement_cost = 0
for equipment in equipments:
location_tag = equipment.location_tag
if location_tag not in individual_results:
continue
# Get the optimized timing from improved plan
equipment_timing = next((month for tag, month in improved_plan if tag == location_tag),
individual_results[location_tag]['optimal_month'])
# Get the cost data for this timing
cost_data = individual_results[location_tag]['all_monthly_costs'][equipment_timing - 1]
# Prepare arrays for database
all_costs = individual_results[location_tag]['all_monthly_costs']
corrective_costs = [r["failure_cost"] for r in all_costs]
preventive_costs = [r["preventive_cost"] for r in all_costs]
procurement_costs = [r["procurement_cost"] for r in all_costs]
failures = [r["number_of_failures"] for r in all_costs]
total_costs_equipment = [r['total_cost'] for r in all_costs]
procurement_details = [r["procurement_details"] for r in all_costs]
# Pad arrays to max_interval length
def pad_array(arr, target_length):
if len(arr) < target_length:
return arr + [arr[-1]] * (target_length - len(arr))
return arr[:target_length]
corrective_costs = pad_array(corrective_costs, max_interval)
preventive_costs = pad_array(preventive_costs, max_interval)
procurement_costs = pad_array(procurement_costs, max_interval)
failures = pad_array(failures, max_interval)
total_costs_equipment = pad_array(total_costs_equipment, max_interval)
procurement_details = pad_array(procurement_details, max_interval)
# Create database result object
equipment_result = CalculationEquipmentResult(
corrective_costs=corrective_costs,
overhaul_costs=preventive_costs,
procurement_costs=procurement_costs,
daily_failures=failures,
location_tag=equipment.location_tag,
material_cost=equipment.material_cost,
service_cost=equipment.service_cost,
optimum_day=equipment_timing - 1, # Convert to 0-based index
calculation_data_id=calculation.id,
procurement_details=procurement_details
)
fleet_results.append(equipment_result)
# Aggregate costs for fleet analysis
total_corrective_costs += np.array(corrective_costs)
total_preventive_costs += np.array(preventive_costs)
total_procurement_costs += np.array(procurement_costs)
total_costs += np.array(total_costs_equipment)
# Add to total fleet procurement cost
total_fleet_procurement_cost += cost_data['procurement_cost']
self.logger.info(f"Final timing for {location_tag}: Month {equipment_timing} "
f"(Procurement cost: ${cost_data['procurement_cost']:,.2f})")
# Calculate fleet optimal interval
fleet_optimal_index = np.argmin(total_costs)
fleet_optimal_cost = total_costs[fleet_optimal_index]
# Generate procurement optimization report
procurement_plan = self.sparepart_manager.optimize_procurement_timing(improved_plan)
# Update calculation with results
calculation.optimum_oh_day = fleet_optimal_index
calculation.max_interval = max_interval
# Save all results to database
db_session.add_all(fleet_results)
await db_session.commit()
self.logger.info(f"Fleet optimization with spareparts completed:")
self.logger.info(f" - Fleet optimal month: {fleet_optimal_index + 1}")
self.logger.info(f" - Fleet optimal cost: ${fleet_optimal_cost:,.2f}")
self.logger.info(f" - Total procurement cost: ${total_fleet_procurement_cost:,.2f}")
self.logger.info(f" - Equipment with sparepart constraints: {len([r for r in individual_results.values() if not r['sparepart_available']])}")
return {
'id': calculation.id,
'fleet_results': fleet_results,
'fleet_optimal_interval': fleet_optimal_index + 1,
'fleet_optimal_cost': fleet_optimal_cost,
'total_corrective_costs': total_corrective_costs.tolist(),
'total_preventive_costs': total_preventive_costs.tolist(),
'total_procurement_costs': total_procurement_costs.tolist(),
'individual_results': individual_results,
'optimized_plan': improved_plan,
'procurement_plan': procurement_plan,
'total_fleet_procurement_cost': total_fleet_procurement_cost,
'analysis_parameters': {
'planned_oh_months': self.planned_oh_months,
'analysis_window_months': self.time_window_months,
'last_oh_date': self.last_oh_date.isoformat(),
'next_oh_date': self.next_oh_date.isoformat(),
'sparepart_optimization_enabled': True
}
}
def _optimize_fleet_with_sparepart_constraints(self, individual_results: Dict, equipments: List,
equipment_birnbaum: Dict, simulation_id: str) -> List[Tuple[str, int]]:
"""
Optimize fleet overhaul timing considering sparepart sharing constraints
"""
# Start with individual optimal timings
current_plan = [(tag, result['optimal_month']) for tag, result in individual_results.items()]
# Sort by optimal month to process in chronological order
current_plan.sort(key=lambda x: x[1])
improved_plan = []
processed_equipment = []
for equipment_tag, optimal_month in current_plan:
# Check sparepart availability considering already processed equipment
sparepart_analysis = self.sparepart_manager.check_sparepart_availability(
equipment_tag, optimal_month - 1, processed_equipment
)
if sparepart_analysis['available'] or sparepart_analysis['can_proceed_with_delays']:
# Can proceed with optimal timing
improved_plan.append((equipment_tag, optimal_month))
processed_equipment.append((equipment_tag, optimal_month))
self.logger.info(f"Confirmed optimal timing for {equipment_tag}: Month {optimal_month}")
else:
# Need to find alternative timing
alternative_month = self._find_alternative_timing(
equipment_tag, optimal_month, individual_results[equipment_tag]['all_monthly_costs'],
processed_equipment
)
if alternative_month:
improved_plan.append((equipment_tag, alternative_month))
processed_equipment.append((equipment_tag, alternative_month))
self.logger.info(f"Alternative timing for {equipment_tag}: Month {alternative_month} "
f"(was {optimal_month})")
else:
# Force original timing with procurement
improved_plan.append((equipment_tag, optimal_month))
processed_equipment.append((equipment_tag, optimal_month))
self.logger.warning(f"Forced timing for {equipment_tag}: Month {optimal_month} "
f"(requires emergency procurement)")
return improved_plan
def _find_alternative_timing(self, equipment_tag: str, preferred_month: int,
cost_results: List[Dict], processed_equipment: List[Tuple[str, int]]) -> Optional[int]:
"""
Find alternative timing when preferred month has sparepart constraints
"""
# Try months around the preferred timing
search_range = 6 # Look 3 months before and after
candidates = []
for offset in range(-search_range//2, search_range//2 + 1):
candidate_month = preferred_month + offset
if candidate_month <= 0 or candidate_month > len(cost_results):
continue
if candidate_month == preferred_month:
continue # Already know this doesn't work
# Check sparepart availability for this month
sparepart_analysis = self.sparepart_manager.check_sparepart_availability(
equipment_tag, candidate_month - 1, processed_equipment
)
if sparepart_analysis['available'] or sparepart_analysis['can_proceed_with_delays']:
cost_data = cost_results[candidate_month - 1]
candidates.append((candidate_month, cost_data['total_cost']))
if not candidates:
return None
# Return the month with lowest cost among feasible alternatives
candidates.sort(key=lambda x: x[1])
return candidates[0][0]
def generate_sparepart_report(self, results: Dict) -> str:
"""Generate comprehensive sparepart analysis report"""
individual_results = results['individual_results']
procurement_plan = results['procurement_plan']
report = f"""
SPAREPART ANALYSIS REPORT
{'='*50}
FLEET SUMMARY:
- Total equipment analyzed: {len(individual_results)}
- Total procurement cost: ${results['total_fleet_procurement_cost']:,.2f}
- Equipment requiring procurement: {len([r for r in individual_results.values() if r['procurement_cost'] > 0])}
PROCUREMENT SUMMARY:
- Total procurement items: {procurement_plan['summary']['total_items'] if 'summary' in procurement_plan else 0}
- Critical items: {procurement_plan['summary']['critical_items'] if 'summary' in procurement_plan else 0}
- Unique spareparts: {procurement_plan['summary']['unique_spareparts'] if 'summary' in procurement_plan else 0}
- Suppliers involved: {procurement_plan['summary']['suppliers_involved'] if 'summary' in procurement_plan else 0}
EQUIPMENT DETAILS:
"""
for tag, result in individual_results.items():
status = "✓ Available" if result['sparepart_available'] else "⚠ Procurement needed"
report += f"- {tag}: Month {result['optimal_month']} - {status}"
if result['procurement_cost'] > 0:
report += f" (${result['procurement_cost']:,.2f})"
report += "\n"
if procurement_plan.get('procurement_by_month'):
report += "\nPROCUREMENT SCHEDULE:\n"
for month, items in procurement_plan['procurement_by_month'].items():
month_cost = sum(item['total_cost'] for item in items)
report += f"Month {month + 1}: {len(items)} items - ${month_cost:,.2f}\n"
return report
async def __aenter__(self):
"""Async context manager entry"""
await self._create_session()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit"""
await self._close_session()
# Updated run_simulation function with sparepart management
async def run_simulation_with_spareparts(*, db_session, calculation, token: str, collector_db_session,
time_window_months: Optional[int] = None,
simulation_id: str = "default") -> Dict:
"""
Run complete overhaul optimization simulation with sparepart management
"""
# Get equipment and scope data
equipments = await get_standard_scope_by_session_id(
db_session=db_session,
overhaul_session_id=calculation.overhaul_session_id,
collector_db=collector_db_session
)
scope = await get_scope(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)
prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope)
calculation_data = await get_calculation_data_by_id(
db_session=db_session, calculation_id=calculation.id
)
sparepart_manager = await load_sparepart_data_from_db(scope=scope, prev_oh_scope=prev_oh_scope, db_session=collector_db_session)
# Initialize optimization model with sparepart management
optimum_oh_model = OptimumCostModelWithSpareparts(
token=token,
last_oh_date=prev_oh_scope.end_date,
next_oh_date=scope.start_date,
time_window_months=time_window_months,
base_url=RBD_SERVICE_API,
sparepart_manager=sparepart_manager
)
try:
# Run fleet optimization with sparepart management
results = await optimum_oh_model.calculate_cost_all_equipment_with_spareparts(
db_session=db_session,
collector_db_session=collector_db_session,
equipments=equipments,
calculation=calculation_data,
preventive_cost=calculation_data.parameter.overhaul_cost,
simulation_id=simulation_id
)
# Generate sparepart report
# sparepart_report = optimum_oh_model.generate_sparepart_report(results)
# print(sparepart_report)
return results
finally:
await optimum_oh_model._close_session()
async def run_simulation(*, db_session, calculation, token: str, collector_db_session,
time_window_months: Optional[int] = None,
simulation_id: str = "default") -> Dict:
"""
Run complete overhaul optimization simulation
Args:
time_window_months: Analysis window in months (default: 1.5x planned interval)
simulation_id: Simulation ID for failure predictions
"""
# Get equipment and scope data
equipments = await get_standard_scope_by_session_id(
db_session=db_session,
overhaul_session_id=calculation.overhaul_session_id,
collector_db=collector_db_session
)
scope = await get_scope(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)
prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope)
calculation_data = await get_calculation_data_by_id(
db_session=db_session, calculation_id=calculation.id
)
# Initialize optimization model
optimum_oh_model = OptimumCostModel(
token=token,
last_oh_date=prev_oh_scope.end_date,
next_oh_date=scope.start_date,
time_window_months=60,
base_url=RBD_SERVICE_API
)
try:
# Run fleet optimization and save to database
results = await optimum_oh_model.calculate_cost_all_equipment(
db_session=db_session,
equipments=equipments,
calculation=calculation_data,
preventive_cost=calculation_data.parameter.overhaul_cost,
simulation_id=simulation_id
)
return results
finally:
await optimum_oh_model._close_session()
async def get_corrective_cost_time_chart(
material_cost: float,
service_cost: float,
location_tag: str,
token,
start_date: datetime,
end_date: datetime
) -> Tuple[np.ndarray, np.ndarray]:
days_difference = (end_date - start_date).days
today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
tomorrow = today + timedelta(days=1)
# Initialize monthly data dictionary
monthly_data = {}
latest_num = 1
# Handle historical data (any portion before or including today)
historical_start = start_date if start_date <= today else None
historical_end = min(today, end_date)
if historical_start and historical_start <= historical_end:
url_history = f"http://192.168.1.82:8000/reliability/main/failures/{location_tag}/{historical_start.strftime('%Y-%m-%d')}/{historical_end.strftime('%Y-%m-%d')}"
try:
response = requests.get(
url_history,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
)
history_data = response.json()
# Process historical data - accumulate failures by month
history_dict = {}
monthly_failures = {}
for item in history_data["data"]:
date = datetime.datetime.strptime(item["date"], "%d %b %Y")
month_key = datetime.datetime(date.year, date.month, 1)
# Initialize if first occurrence of this month
if month_key not in history_dict:
history_dict[month_key] = 0
# Accumulate failures for this month
if item["num_fail"] is not None:
history_dict[month_key] += item["num_fail"]
# Sort months chronologically
sorted_months = sorted(history_dict.keys())
if sorted_months:
failures = np.array([history_dict[month] for month in sorted_months])
cum_failure = np.cumsum(failures)
for month_key in sorted_months:
monthly_failures[month_key] = int(cum_failure[sorted_months.index(month_key)])
# Update monthly_data with cumulative historical data
monthly_data.update(monthly_failures)
# Get the latest number for predictions if we have historical data
if failures.size > 0:
latest_num = max(1, failures[-1]) # Use the last month's failures, minimum 1
except Exception as e:
raise Exception(f"Error fetching historical data: {e}")
if location_tag == '3TR-TF005':
raise Exception("tes",monthly_data)
if end_date >= start_date:
url_prediction = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
try:
response = requests.get(
url_prediction,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
)
prediction_data = response.json()
# Process prediction data - but only use it for future dates
if prediction_data["data"]:
for item in prediction_data["data"]:
date = datetime.strptime(item["date"], "%d %b %Y")
# Only apply prediction data for dates after today
if date > today:
month_key = datetime(date.year, date.month, 1)
monthly_data[month_key] = item["num_fail"] if item["num_fail"] is not None else 0
# Update latest_num with the last prediction if available
last_prediction = prediction_data["data"][-1]["num_fail"]
if last_prediction is not None:
latest_num = max(1, round(last_prediction))
except Exception as e:
print(f"Error fetching prediction data: {e}")
# Fill in any missing months in the range
current_date = datetime(start_date.year, start_date.month, 1)
end_month = datetime(end_date.year, end_date.month, 1)
while current_date <= end_month:
if current_date not in monthly_data:
# Try to find the most recent month with data
prev_months = [m for m in monthly_data.keys() if m < current_date]
if prev_months:
# Use the most recent previous month's data
latest_month = max(prev_months)
monthly_data[current_date] = monthly_data[latest_month]
else:
# If no previous months exist, look for future months
future_months = [m for m in monthly_data.keys() if m > current_date]
if future_months:
# Use the earliest future month's data
earliest_future = min(future_months)
monthly_data[current_date] = monthly_data[earliest_future]
else:
# No data available at all, use default
monthly_data[current_date] = latest_num
# Move to next month
if current_date.month == 12:
current_date = datetime(current_date.year + 1, 1, 1)
else:
current_date = datetime(current_date.year, current_date.month + 1, 1)
# Convert to list maintaining chronological order
complete_data = []
for month in sorted(monthly_data.keys()):
complete_data.append(monthly_data[month])
if latest_num < 1:
raise ValueError("Number of failures cannot be negative", latest_num)
# Convert to numpy array
monthly_failure = np.array(complete_data)
cost_per_failure = (material_cost + service_cost) / latest_num
raise Exception(monthly_data, location_tag)
try:
corrective_costs = monthly_failure * cost_per_failure
except Exception as e:
raise Exception(f"Error calculating corrective costs: {monthly_failure}", location_tag)
return corrective_costs, monthly_failure
def get_overhaul_cost_by_time_chart(
overhaul_cost: float, months_num: int, numEquipments: int, decay_base: float = 1.01
) -> np.ndarray:
if overhaul_cost < 0:
raise ValueError("Overhaul cost cannot be negative")
if months_num <= 0:
raise ValueError("months_num must be positive")
rate = np.arange(1, months_num + 1)
cost_per_equipment = overhaul_cost / numEquipments
# results = cost_per_equipment - ((cost_per_equipment / hours) * rate)
results = cost_per_equipment / rate
return results
async def create_param_and_data(
*,
db_session: DbSession,
calculation_param_in: CalculationTimeConstrainsParametersCreate,
created_by: str,
parameter_id: Optional[UUID] = None,
):
"""Creates a new document."""
if calculation_param_in.ohSessionId is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="overhaul_session_id is required",
)
calculationData = await CalculationData.create_with_param(
db=db_session,
overhaul_session_id=calculation_param_in.ohSessionId,
avg_failure_cost=calculation_param_in.costPerFailure,
overhaul_cost=calculation_param_in.overhaulCost,
created_by=created_by,
params_id=parameter_id,
)
return calculationData
async def get_calculation_result(db_session: DbSession, calculation_id: str):
"""
Get calculation results with improved error handling, performance, and sparepart details
"""
try:
# Get calculation data with equipment results in single query
calculation_query = await db_session.execute(
select(CalculationData)
.options(selectinload(CalculationData.equipment_results))
.where(CalculationData.id == calculation_id)
)
scope_calculation = calculation_query.scalar_one_or_none()
if not scope_calculation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Calculation with id {calculation_id} does not exist.",
)
# Get scope information
scope_overhaul = await get_scope(
db_session=db_session,
overhaul_session_id=scope_calculation.overhaul_session_id
)
if not scope_overhaul:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Overhaul scope for session {scope_calculation.overhaul_session_id} does not exist.",
)
# Get previous overhaul scope for analysis context
prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope_overhaul)
# Validate data integrity
data_num = scope_calculation.max_interval
if data_num <= 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid max_interval in calculation data.",
)
# Filter included equipment for performance
included_equipment = [eq for eq in scope_calculation.equipment_results if eq.is_included]
all_equipment = scope_calculation.equipment_results
# Pre-calculate aggregated statistics
calculation_results = []
fleet_statistics = {
'total_equipment': len(all_equipment),
'included_equipment': len(included_equipment),
'excluded_equipment': len(all_equipment) - len(included_equipment),
'equipment_with_sparepart_constraints': 0,
'total_procurement_items': 0,
'critical_procurement_items': 0
}
# Process each month
for month_index in range(data_num):
month_result = {
"overhaul_cost": 0.0,
"corrective_cost": 0.0,
"procurement_cost": 0.0,
"num_failures": 0.0,
"day": month_index + 1,
"month": month_index + 1, # More intuitive naming
"procurement_details": {},
"sparepart_summary": {
"total_procurement_cost": 0.0,
"equipment_requiring_procurement": 0,
"critical_shortages": 0,
"existing_orders_value": 0.0,
"new_orders_required": 0,
"urgent_procurements": 0
}
}
equipment_requiring_procurement = 0
total_existing_orders_value = 0.0
total_new_orders_value = 0.0
critical_shortages = 0
urgent_procurements = 0
# Process all equipment (included and excluded) for complete procurement picture
for eq in all_equipment:
# Validate array bounds
if month_index >= len(eq.procurement_details):
continue
procurement_detail = eq.procurement_details[month_index]
# Handle equipment with procurement needs
if (procurement_detail and
isinstance(procurement_detail, dict) and
procurement_detail.get("procurement_needed")):
equipment_requiring_procurement += 1
# Extract PR/PO summary if available
pr_po_summary = procurement_detail.get("pr_po_summary", {})
# Aggregate existing orders value
existing_orders_value = pr_po_summary.get("total_existing_value", 0)
total_existing_orders_value += existing_orders_value
# Aggregate new orders value
new_orders_value = pr_po_summary.get("total_new_orders_value", 0)
total_new_orders_value += new_orders_value
# Count critical shortages
critical_missing = procurement_detail.get("critical_missing_parts", 0)
if critical_missing > 0:
critical_shortages += 1
# Count urgent procurements
recommendations = procurement_detail.get("recommendations", [])
urgent_items = [r for r in recommendations if r.get("priority") == "CRITICAL"]
if urgent_items:
urgent_procurements += 1
# Add detailed procurement info for this equipment
month_result["procurement_details"][eq.location_tag] = {
"is_included": eq.is_included,
"location_tag": eq.location_tag,
"details": procurement_detail.get("procurement_needed", []),
"detailed_message": procurement_detail.get("detailed_message", ""),
"pr_po_summary": pr_po_summary,
"recommendations": recommendations,
"sparepart_available": procurement_detail.get("sparepart_available", True),
"can_proceed": procurement_detail.get("can_proceed_with_delays", True),
"critical_missing_parts": critical_missing,
"existing_orders_value": existing_orders_value,
"new_orders_value": new_orders_value
}
# Only include costs from equipment marked as included
if eq.is_included:
# Validate array bounds before accessing
if (month_index < len(eq.corrective_costs) and
month_index < len(eq.overhaul_costs) and
month_index < len(eq.procurement_costs) and
month_index < len(eq.daily_failures)):
month_result["corrective_cost"] += float(eq.corrective_costs[month_index])
month_result["overhaul_cost"] += float(eq.overhaul_costs[month_index])
month_result["procurement_cost"] += float(eq.procurement_costs[month_index])
month_result["num_failures"] += float(eq.daily_failures[month_index])
# Update month sparepart summary
month_result["sparepart_summary"].update({
"total_procurement_cost": month_result["procurement_cost"],
"equipment_requiring_procurement": equipment_requiring_procurement,
"critical_shortages": critical_shortages,
"existing_orders_value": total_existing_orders_value,
"new_orders_required": len([eq for eq in all_equipment
if month_index < len(eq.procurement_details) and
eq.procurement_details[month_index] and
eq.procurement_details[month_index].get("procurement_needed")]),
"urgent_procurements": urgent_procurements
})
# Calculate total cost for this month
month_result["total_cost"] = (month_result["corrective_cost"] +
month_result["overhaul_cost"] +
month_result["procurement_cost"])
calculation_results.append(CalculationResultsRead(**month_result))
# Update fleet statistics
fleet_statistics['equipment_with_sparepart_constraints'] = len([
eq for eq in all_equipment
if any(detail and detail.get("procurement_needed")
for detail in eq.procurement_details if detail)
])
fleet_statistics['total_procurement_items'] = sum([
len(detail.get("procurement_needed", []))
for eq in all_equipment
for detail in eq.procurement_details
if detail and isinstance(detail, dict)
])
# Calculate optimal timing analysis
optimal_analysis = _analyze_optimal_timing(
calculation_results, scope_calculation.optimum_oh_day, prev_oh_scope, scope_overhaul
)
# Return comprehensive result
return CalculationTimeConstrainsRead(
id=scope_calculation.id,
reference=scope_calculation.overhaul_session_id,
scope=scope_overhaul.maintenance_type.name,
results=calculation_results,
optimum_oh=scope_calculation.optimum_oh_day,
optimum_oh_month=scope_calculation.optimum_oh_day + 1, # 1-based month
equipment_results=scope_calculation.equipment_results,
fleet_statistics=fleet_statistics,
optimal_analysis=optimal_analysis,
analysis_metadata={
"max_interval_months": data_num,
"last_overhaul_date": prev_oh_scope.end_date.isoformat() if prev_oh_scope else None,
"next_planned_overhaul": scope_overhaul.start_date.isoformat(),
"calculation_type": "sparepart_optimized" if fleet_statistics['equipment_with_sparepart_constraints'] > 0 else "standard",
"total_equipment_analyzed": len(all_equipment),
"included_in_optimization": len(included_equipment)
}
)
except HTTPException:
# Re-raise HTTP exceptions as-is
raise
except Exception as e:
# Log the error for debugging
import logging
logger = logging.getLogger(__name__)
logger.error(f"Error in get_calculation_result for calculation_id {calculation_id}: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Internal error processing calculation results: {str(e)}",
)
def _analyze_optimal_timing(calculation_results: List, optimum_oh_day: int,
prev_oh_scope, scope_overhaul) -> Dict:
"""Analyze optimal timing and provide recommendations"""
if not calculation_results:
return {}
# Find the result for optimal day
optimal_result = None
if 0 <= optimum_oh_day < len(calculation_results):
optimal_result = calculation_results[optimum_oh_day]
# Calculate planned overhaul timing
planned_oh_months = None
if prev_oh_scope and scope_overhaul:
planned_oh_months = (scope_overhaul.start_date.year - prev_oh_scope.end_date.year) * 12 + \
(scope_overhaul.start_date.month - prev_oh_scope.end_date.month)
# Find minimum cost point
min_cost_result = min(calculation_results, key=lambda x: x.total_cost)
min_cost_month = min_cost_result.month
# Calculate timing recommendation
timing_recommendation = "OPTIMAL"
if planned_oh_months:
if optimum_oh_day + 1 < planned_oh_months:
timing_recommendation = "EARLY"
elif optimum_oh_day + 1 > planned_oh_months:
timing_recommendation = "DELAYED"
else:
timing_recommendation = "ON_SCHEDULE"
# Analyze cost trends
cost_trend = "STABLE"
if len(calculation_results) > 1:
early_costs = [r.total_cost for r in calculation_results[:len(calculation_results)//3]]
late_costs = [r.total_cost for r in calculation_results[-len(calculation_results)//3:]]
avg_early = sum(early_costs) / len(early_costs) if early_costs else 0
avg_late = sum(late_costs) / len(late_costs) if late_costs else 0
if avg_late > avg_early * 1.2:
cost_trend = "INCREASING"
elif avg_late < avg_early * 0.8:
cost_trend = "DECREASING"
return {
"optimal_month": optimum_oh_day + 1,
"planned_month": planned_oh_months,
"timing_recommendation": timing_recommendation,
"optimal_total_cost": optimal_result.total_cost if optimal_result else 0,
"optimal_breakdown": {
"corrective_cost": optimal_result.corrective_cost if optimal_result else 0,
"overhaul_cost": optimal_result.overhaul_cost if optimal_result else 0,
"procurement_cost": optimal_result.procurement_cost if optimal_result else 0,
"num_failures": optimal_result.num_failures if optimal_result else 0
},
"cost_trend": cost_trend,
"months_from_planned": (optimum_oh_day + 1 - planned_oh_months) if planned_oh_months else None,
"cost_savings_vs_planned": None, # Would need planned month cost to calculate
"sparepart_impact": {
"equipment_with_constraints": optimal_result.sparepart_summary["equipment_requiring_procurement"] if optimal_result else 0,
"critical_shortages": optimal_result.sparepart_summary["critical_shortages"] if optimal_result else 0,
"procurement_investment": optimal_result.sparepart_summary["total_procurement_cost"] if optimal_result else 0
}
}
async def get_calculation_data_by_id(
db_session: DbSession, calculation_id
) -> CalculationData:
stmt = (
select(CalculationData)
.filter(CalculationData.id == calculation_id)
.options(
joinedload(CalculationData.equipment_results),
joinedload(CalculationData.parameter),
)
)
result = await db_session.execute(stmt)
return result.unique().scalar()
async def get_calculation_by_assetnum(
*, db_session: DbSession, assetnum: str, calculation_id: str
):
stmt = (
select(CalculationEquipmentResult)
.where(CalculationEquipmentResult.assetnum == assetnum)
.where(CalculationEquipmentResult.calculation_data_id == calculation_id)
)
result = await db_session.execute(stmt)
return result.scalar()
async def get_number_of_failures(location_tag, start_date, end_date, token, max_interval=24):
url_prediction = (
f"http://192.168.1.82:8000/reliability/main/number-of-failures/"
f"{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
)
results = {}
try:
response = requests.get(
url_prediction,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
timeout=10
)
response.raise_for_status()
prediction_data = response.json()
except (requests.RequestException, ValueError) as e:
raise Exception(f"Failed to fetch or parse prediction data: {e}")
if not prediction_data or "data" not in prediction_data or not isinstance(prediction_data["data"], list):
raise Exception("Invalid or empty prediction data format.")
last_data = prediction_data["data"][-1]
last_data_date = datetime.strptime(last_data["date"], "%d %b %Y")
results[datetime.date(last_data_date.year, last_data_date.month, last_data_date.day)] = round(last_data["num_fail"]) if last_data["num_fail"] is not None else 0
# Parse prediction data
for item in prediction_data["data"]:
try:
date = datetime.strptime(item["date"], "%d %b %Y")
last_day = calendar.monthrange(date.year, date.month)[1]
value = item.get("num_fail", 0)
if date.day == last_day:
if date.month == start_date.month and date.year == start_date.year:
results[date.date()] = 0
else:
results[date.date()] = 0 if value <= 0 else int(value)
except (KeyError, ValueError):
continue # skip invalid items
# Fill missing months with 0
current = start_date.replace(day=1)
for _ in range(max_interval):
last_day = calendar.monthrange(current.year, current.month)[1]
last_day_date = datetime.date(current.year, current.month, last_day)
if last_day_date not in results:
results[last_day_date] = 0
# move to next month
if current.month == 12:
current = current.replace(year=current.year + 1, month=1)
else:
current = current.replace(month=current.month + 1)
# Sort results by date
results = dict(sorted(results.items()))
return results
async def get_equipment_foh(
location_tag: str,
token: str
):
url_mdt = (
f"http://192.168.1.82:8000/reliability/asset/mdt/{location_tag}"
)
try:
response = requests.get(
url_mdt,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
timeout=10
)
response.raise_for_status()
result = response.json()
except (requests.RequestException, ValueError) as e:
raise Exception(f"Failed to fetch or parse mdt data: {e}")
mdt_data = result["data"]["hours"]
return mdt_data
# Function to simulate overhaul strategy for a single equipment
def simulate_equipment_overhaul(equipment, preventive_cost,predicted_num_failures, interval_months, forced_outage_hours_value ,total_months=24):
"""
Simulates overhaul strategy for a specific piece of equipment
and returns the associated costs.
"""
total_preventive_cost = 0
total_corrective_cost = 0
months_since_overhaul = 0
failures_by_month = {i: val for i, (date, val) in enumerate(sorted(predicted_num_failures.items()))}
cost_per_failure = equipment.material_cost
# Simulate for the total period
for month in range(total_months):
# If it's time for overhaul
if months_since_overhaul >= interval_months:
# Perform preventive overhaul
total_preventive_cost += preventive_cost
months_since_overhaul = 0
if months_since_overhaul == 0:
# Calculate failures for this month based on time since last overhaul
expected_failures = 0
equivalent_force_derated_hours = 0
failure_cost = (expected_failures * cost_per_failure) + ((forced_outage_hours_value + equivalent_force_derated_hours) * equipment.service_cost)
total_corrective_cost += failure_cost
else:
# Calculate failures for this month based on time since last overhaul
expected_failures = failures_by_month.get(months_since_overhaul, 0)
equivalent_force_derated_hours = 0
failure_cost = (expected_failures * cost_per_failure) + ((forced_outage_hours_value + equivalent_force_derated_hours) * equipment.service_cost)
total_corrective_cost += failure_cost
# Increment time since overhaul
months_since_overhaul += 1
# Calculate costs per month (to normalize for comparison)
monthly_preventive_cost = total_preventive_cost / total_months
monthly_corrective_cost = total_corrective_cost / total_months
monthly_total_cost = monthly_preventive_cost + monthly_corrective_cost
return {
'interval': interval_months,
'preventive_cost': monthly_preventive_cost,
'corrective_cost': monthly_corrective_cost,
'total_cost': monthly_total_cost
}
async def create_calculation_result_service(
db_session: DbSession, calculation: CalculationData, token: str
) -> CalculationTimeConstrainsRead:
# Get all equipment for this calculation session
equipments = await get_all_by_session_id(
db_session=db_session, overhaul_session_id=calculation.overhaul_session_id
)
scope = await get_scope(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)
prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope)
calculation_data = await get_calculation_data_by_id(
db_session=db_session, calculation_id=calculation.id
)
# Set the date range for the calculation
if prev_oh_scope:
# Start date is the day after the previous scope's end date
start_date = datetime.combine(prev_oh_scope.end_date + datetime.timedelta(days=1), datetime.time.min)
# End date is the start date of the current scope
end_date = datetime.combine(scope.start_date, datetime.time.min)
else:
# If there's no previous scope, use the start and end dates from the current scope
start_date = datetime.combine(scope.start_date, datetime.time.min)
end_date = datetime.combine(scope.end_date, datetime.time.min)
max_interval = get_months_between(start_date, end_date)
overhaul_cost = calculation_data.parameter.overhaul_cost / len(equipments)
# Store results for each equipment
results = []
total_corrective_costs = np.zeros(max_interval)
total_overhaul_costs = np.zeros(max_interval)
total_daily_failures = np.zeros(max_interval)
total_costs = np.zeros(max_interval)
# Calculate for each equipment
for eq in equipments:
equipment_results = []
corrective_costs = []
overhaul_costs = []
total = []
predicted_num_failures = await get_number_of_failures(
location_tag=eq.location_tag,
start_date=start_date,
end_date=end_date,
token=token
)
foh_value = await get_equipment_foh(
location_tag=eq.location_tag,
token=token
)
for interval in range(1, max_interval+1):
result = simulate_equipment_overhaul(eq, overhaul_cost, predicted_num_failures, interval, foh_value, total_months=max_interval)
corrective_costs.append(result['corrective_cost'])
overhaul_costs.append(result['preventive_cost'])
total.append(result['total_cost'])
equipment_results.append(result)
optimal_result = min(equipment_results, key=lambda x: x['total_cost'])
results.append(
CalculationEquipmentResult(
corrective_costs=corrective_costs,
overhaul_costs=overhaul_costs,
daily_failures=[failure for _, failure in predicted_num_failures.items()],
assetnum=eq.assetnum,
material_cost=eq.material_cost,
service_cost=eq.service_cost,
optimum_day=optimal_result['interval'],
calculation_data_id=calculation.id,
master_equipment=eq.master_equipment,
)
)
if len(predicted_num_failures.values()) < max_interval:
raise Exception(eq.equipment.assetnum)
total_corrective_costs += np.array(corrective_costs)
total_overhaul_costs += np.array(overhaul_costs)
total_daily_failures += np.array([failure for _, failure in predicted_num_failures.items()])
total_costs += np.array(total_costs)
db_session.add_all(results)
total_costs_point = total_corrective_costs + total_overhaul_costs
# Calculate optimum points using total costs
optimum_oh_index = np.argmin(total_costs_point)
numbers_of_failure = sum(total_daily_failures[:optimum_oh_index])
optimum = OptimumResult(
overhaul_cost=float(total_overhaul_costs[optimum_oh_index]),
corrective_cost=float(total_corrective_costs[optimum_oh_index]),
num_failures=int(numbers_of_failure),
days=int(optimum_oh_index + 1),
)
calculation.optimum_oh_day = optimum.days
await db_session.commit()
# Return results including individual equipment data
return CalculationTimeConstrainsRead(
id=calculation.id,
reference=calculation.overhaul_session_id,
scope=scope.type,
results=[],
optimum_oh=optimum,
equipment_results=results,
)
async def get_calculation_by_reference_and_parameter(
*, db_session: DbSession, calculation_reference_id, parameter_id
):
stmt = select(CalculationData).filter(
and_(
CalculationData.reference_id == calculation_reference_id,
CalculationData.parameter_id == parameter_id,
)
)
result = await db_session.execute(stmt)
return result.scalar()
async def get_calculation_result_by_day(
*, db_session: DbSession, calculation_id, simulation_day
):
stmt = select(CalculationResult).filter(
and_(
CalculationResult.day == simulation_day,
CalculationResult.calculation_data_id == calculation_id,
)
)
result = await db_session.execute(stmt)
return result.scalar()
async def get_avg_cost_by_asset(*, db_session: DbSession, assetnum: str):
stmt = select(func.avg(MasterWorkOrder.total_cost_max).label("average_cost")).where(
MasterWorkOrder.assetnum == assetnum
)
result = await db_session.execute(stmt)
return result.scalar_one_or_none()
async def bulk_update_equipment(
*,
db: DbSession,
selected_equipments: List[CalculationSelectedEquipmentUpdate],
calculation_data_id: UUID,
):
# Create a dictionary mapping assetnum to is_included status
case_mappings = {asset.location_tag: asset.is_included for asset in selected_equipments}
# Get all assetnums that need to be updated
location_tags = list(case_mappings.keys())
# Create a list of when clauses for the case statement
when_clauses = [
(CalculationEquipmentResult.location_tag == location_tag, is_included)
for location_tag, is_included in case_mappings.items()
]
# Build the update statement
stmt = (
update(CalculationEquipmentResult)
.where(CalculationEquipmentResult.calculation_data_id == calculation_data_id)
.where(CalculationEquipmentResult.location_tag.in_(location_tags))
.values(
{
"is_included": case(
*when_clauses
) # Unpack the when clauses as separate arguments
}
)
)
await db.execute(stmt)
await db.commit()
return location_tags