You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

2148 lines
82 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import datetime
from typing import Coroutine, List, Optional, Tuple,Dict
from uuid import UUID
import calendar
from src.config import REALIBILITY_SERVICE_API
import numpy as np
import requests
from fastapi import HTTPException, status
from sqlalchemy import and_, case, func, select, update
from sqlalchemy.orm import joinedload
from src.database.core import DbSession
from src.overhaul_activity.service import get_all as get_all_by_session_id
from src.overhaul_scope.service import get as get_scope, get_prev_oh
from src.utils import get_latest_numOfFail
from src.workorder.model import MasterWorkOrder
from src.sparepart.model import MasterSparePart
from src.overhaul_activity.model import OverhaulActivity
from .model import (CalculationData, CalculationEquipmentResult,
CalculationResult)
from .schema import (CalculationResultsRead,
CalculationSelectedEquipmentUpdate,
CalculationTimeConstrainsParametersCreate,
CalculationTimeConstrainsRead, OptimumResult)
from .utils import get_months_between
from src.equipment_sparepart.model import ScopeEquipmentPart
import copy
import random
import math
from src.overhaul_activity.service import get_standard_scope_by_session_id
from collections import defaultdict
from datetime import timedelta
import pandas as pd
import logging
import aiohttp
from datetime import datetime, date
import asyncio
import json
from src.utils import save_to_pastebin
# class ReliabilityService:
# """Service class for handling reliability API calls"""
# def __init__(self, base_url: str = "http://192.168.1.82:8000", use_dummy_data=False):
# self.base_url = base_url
# self.use_dummy_data = use_dummy_data
# async def get_number_of_failures(self, location_tag, start_date, end_date, token, max_interval=24):
# if self.use_dummy_data:
# return self._generate_dummy_failure_data(location_tag, start_date, end_date, max_interval)
# url_prediction = (
# f"{self.base_url}/main/number-of-failures/"
# f"{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
# )
# results = {}
# try:
# response = requests.get(
# url_prediction,
# headers={
# "Content-Type": "application/json",
# "Authorization": f"Bearer {token}",
# },
# timeout=10
# )
# response.raise_for_status()
# prediction_data = response.json()
# except (requests.RequestException, ValueError) as e:
# raise Exception(f"Failed to fetch or parse prediction data: {e}")
# if not prediction_data or "data" not in prediction_data or not isinstance(prediction_data["data"], list):
# raise Exception("Invalid or empty prediction data format.")
# # Since data is cumulative, we need to preserve the decimal values
# last_cumulative_value = 0
# # Parse prediction data and preserve cumulative nature
# for item in prediction_data["data"]:
# try:
# date = datetime.datetime.strptime(item["date"], "%d %b %Y")
# last_day = calendar.monthrange(date.year, date.month)[1]
# value = item.get("num_fail", 0)
# if date.day == last_day: # End of month
# if value is not None and value > 0:
# # PRESERVE the decimal values - don't convert to int!
# results[date.date()] = round(float(value), 3) # Keep 3 decimal places
# last_cumulative_value = float(value)
# else:
# # If no value, use previous cumulative value
# results[date.date()] = last_cumulative_value
# except (KeyError, ValueError):
# continue
# # Fill missing months by continuing the cumulative trend
# current = start_date.replace(day=1)
# for _ in range(max_interval):
# last_day = calendar.monthrange(current.year, current.month)[1]
# last_day_date = datetime.date(current.year, current.month, last_day)
# if last_day_date not in results:
# # Since it's cumulative, add a small increment to continue the trend
# # You can adjust this increment based on your typical monthly increase
# monthly_increment = 0.05 # Adjust this value based on your data pattern
# last_cumulative_value += monthly_increment
# results[last_day_date] = round(last_cumulative_value, 3)
# else:
# # Update our tracking value
# last_cumulative_value = results[last_day_date]
# # Move to next month
# if current.month == 12:
# current = current.replace(year=current.year + 1, month=1)
# else:
# current = current.replace(month=current.month + 1)
# # Sort results by date
# results = dict(sorted(results.items()))
# return results
# def _generate_dummy_failure_data(self, location_tag: str, start_date: datetime.date, end_date: datetime.date, max_interval: int = 24) -> Dict[datetime.date, float]:
# """
# Generate realistic dummy failure prediction data for demonstration purposes.
# Creates a realistic pattern with seasonal variations and some randomness.
# """
# results = {}
# # Set seed based on location_tag for consistent results per location
# random.seed(hash(location_tag) % 1000)
# # Base parameters for realistic failure patterns
# base_monthly_failures = random.uniform(0.5, 1.25) # Base failures per month
# seasonal_amplitude = random.uniform(0.3, 0.8) # Seasonal variation strength
# trend_slope = random.uniform(-0.01, 0.02) # Long-term trend (slight increase over time)
# noise_level = random.uniform(0.1, 0.3) # Random variation
# # Determine equipment factor random between 1.0 and 1.9
# equipment_factor = random.uniform(1.0, 1.9)
# current = start_date.replace(day=1)
# cumulative_failures = 0
# month_count = 0
# for _ in range(max_interval):
# #If month count == 0, set cumulative_failures to 0
# last_day = calendar.monthrange(current.year, current.month)[1]
# last_day_date = datetime.datetime(current.year, current.month, last_day)
# # Stop if we've passed the end_date
# if last_day_date > end_date:
# break
# # Calculate seasonal factor (higher in summer/winter, lower in spring/fall)
# seasonal_factor = 1 + seasonal_amplitude * math.sin(2 * math.pi * current.month / 12)
# # Calculate trend factor (gradual increase over time)
# trend_factor = 1 + trend_slope * month_count
# # Calculate noise (random variation)
# noise_factor = 1 + random.uniform(-noise_level, noise_level)
# # Calculate monthly failures (non-cumulative)
# monthly_failures = (base_monthly_failures *
# equipment_factor *
# seasonal_factor *
# trend_factor *
# noise_factor)
# # Ensure minimum realistic value
# monthly_failures = max(0.1, monthly_failures)
# # Add to cumulative total
# cumulative_failures += monthly_failures
# # Store cumulative value rounded to 3 decimal places
# results[last_day_date] = round(cumulative_failures, 3) if month_count > 0 else 0
# # Move to next month
# month_count += 1
# if current.month == 12:
# current = current.replace(year=current.year + 1, month=1)
# else:
# current = current.replace(month=current.month + 1)
# return dict(sorted(results.items()))
# async def get_equipment_foh(self, location_tag: str, token: str) -> float:
# """
# Get forced outage hours for equipment
# """
# url = f"{self.base_url}/asset/mdt/{location_tag}"
# headers = {
# "Content-Type": "application/json",
# "Authorization": f"Bearer {token}",
# }
# try:
# response = requests.get(url, headers=headers, timeout=10)
# response.raise_for_status()
# result = response.json()
# return result["data"]["hours"]
# except (requests.RequestException, ValueError) as e:
# raise Exception(f"Failed to fetch FOH data for {location_tag}: {e}")
# def _parse_failure_predictions(
# self,
# prediction_data: List[dict],
# start_date: datetime.date,
# max_interval: int
# ) -> Dict[datetime.date, int]:
# """
# Parse and normalize failure prediction data
# """
# results = {}
# # Parse prediction data
# for item in prediction_data:
# try:
# date = datetime.datetime.strptime(item["date"], "%d %b %Y").date()
# last_day = calendar.monthrange(date.year, date.month)[1]
# value = item.get("num_fail", 0)
# if date.day == last_day:
# if date.month == start_date.month and date.year == start_date.year:
# results[date] = 0
# else:
# results[date] = max(0, int(value)) if value is not None else 0
# except (KeyError, ValueError):
# continue
# # Fill missing months with 0
# current = start_date.replace(day=1)
# for _ in range(max_interval):
# last_day = calendar.monthrange(current.year, current.month)[1]
# last_day_date = datetime.date(current.year, current.month, last_day)
# if last_day_date not in results:
# results[last_day_date] = 0
# # Move to next month
# if current.month == 12:
# current = current.replace(year=current.year + 1, month=1)
# else:
# current = current.replace(month=current.month + 1)
# return dict(sorted(results.items()))
# class SparePartsService:
# """Service class for spare parts management and procurement calculations"""
# def __init__(self, spare_parts_db: dict):
# self.spare_parts_db = spare_parts_db
# def calculate_stock_at_date(self, sparepart_id: UUID, target_date: datetime.date):
# """
# Calculate projected stock for a spare part at a specific date
# """
# if sparepart_id not in self.spare_parts_db:
# return 0
# spare_part = self.spare_parts_db[sparepart_id]
# projected_stock = spare_part["stock"]
# # Add all procurements that arrive by target_date
# for procurement in spare_part["data"].sparepart_procurements:
# eta_date = getattr(procurement, procurement.status, None)
# if eta_date and eta_date <= target_date:
# projected_stock += procurement.quantity
# return projected_stock
# async def reduce_stock(self, db_session, location_tag: str):
# requirements_query = select(ScopeEquipmentPart).where(
# ScopeEquipmentPart.location_tag == location_tag
# )
# requirements = await db_session.execute(requirements_query)
# requirements = requirements.scalars().all()
# for requirement in requirements:
# sparepart_id = requirement.sparepart_id
# quantity_needed = requirement.required_stock
# if sparepart_id in self.spare_parts_db:
# self.spare_parts_db[sparepart_id]["stock"] -= quantity_needed
# async def check_spare_parts_availability(
# self,
# db_session: DbSession,
# equipment: OverhaulActivity,
# overhaul_date: datetime.date
# ) -> Tuple[bool, List[dict]]:
# """
# Check if spare parts are available for equipment overhaul at specific date.
# If not available, calculate procurement costs needed.
# """
# procurement_costs = []
# all_available = True
# requirements_query = select(ScopeEquipmentPart).where(
# ScopeEquipmentPart.location_tag == equipment.location_tag
# )
# requirements = await db_session.execute(requirements_query)
# requirements = requirements.scalars().all()
# for requirement in requirements:
# sparepart_id = requirement.sparepart_id
# quantity_needed = requirement.required_stock
# if sparepart_id not in self.spare_parts_db:
# raise Exception(f"Spare part {sparepart_id} not found in database")
# spare_part = self.spare_parts_db[sparepart_id]
# spare_part_data = spare_part["data"]
# available_stock = self.calculate_stock_at_date(sparepart_id, overhaul_date)
# if available_stock < quantity_needed:
# # Need to procure additional stock
# shortage = quantity_needed - available_stock
# procurement_cost = {
# "sparepart_id": str(sparepart_id),
# "sparepart_name": spare_part_data.name,
# "quantity": shortage,
# "cost_per_unit": spare_part_data.cost_per_stock,
# "total_cost": shortage * spare_part_data.cost_per_stock,
# "description": f"Insufficient projected stock for {spare_part_data.name} on {overhaul_date} (need: {quantity_needed}, projected: {available_stock})"
# }
# procurement_costs.append(procurement_cost)
# all_available = False
# return all_available, procurement_costs
# class OverhaulCalculator:
# """Main calculator for overhaul cost optimization"""
# def __init__(
# self,
# reliability_service: ReliabilityService,
# spare_parts_service: SparePartsService
# ):
# self.reliability_service = reliability_service
# self.spare_parts_service = spare_parts_service
# async def simulate_equipment_overhaul(
# self,
# db_session: DbSession,
# equipment,
# preventive_cost: float,
# predicted_failures: Dict[datetime.date, int],
# interval_months: int,
# forced_outage_hours: float,
# start_date: datetime.date,
# total_months: int = 24
# ):
# """
# Simulate overhaul strategy for specific equipment including spare parts costs
# """
# total_preventive_cost = 0
# total_corrective_cost = 0
# total_procurement_cost = 0
# all_procurement_details = []
# months_since_overhaul = 0
# # Convert failures dict to month-indexed dict
# failures_by_month = {
# i: val for i, (date, val) in enumerate(sorted(predicted_failures.items()))
# }
# cost_per_failure = equipment.material_cost
# # Simulate for the total period
# for month in range(total_months):
# # Calculate current date
# current_date = self._add_months_to_date(start_date, month)
# # Check if it's time for overhaul
# if months_since_overhaul >= interval_months:
# # Perform preventive overhaul
# total_preventive_cost += preventive_cost
# months_since_overhaul = 0
# # Calculate corrective costs
# if months_since_overhaul == 0:
# expected_failures = 0 # No failures immediately after overhaul
# else:
# expected_failures = round(failures_by_month.get(months_since_overhaul, 0))
# equivalent_force_derated_hours = 0 # Can be enhanced based on requirements
# failure_cost = (
# (expected_failures * cost_per_failure) +
# ((forced_outage_hours + equivalent_force_derated_hours) * equipment.service_cost)
# )
# total_corrective_cost += failure_cost
# months_since_overhaul += 1
# overhaul_target_date = self._add_months_to_date(start_date, interval_months)
# # Check spare parts availability and calculate procurement costs
# parts_available, procurement_costs = await self.spare_parts_service.check_spare_parts_availability(
# db_session,
# equipment,
# overhaul_target_date
# )
# # Add procurement costs if parts are not available
# if not parts_available:
# month_procurement_cost = sum(pc["total_cost"] for pc in procurement_costs)
# total_procurement_cost += month_procurement_cost
# all_procurement_details.extend(procurement_costs)
# # Calculate monthly averages
# monthly_preventive_cost = total_preventive_cost / total_months
# monthly_corrective_cost = total_corrective_cost / total_months
# monthly_total_cost = monthly_preventive_cost + monthly_corrective_cost + total_procurement_cost
# return {
# "interval_months":interval_months,
# "preventive_cost":monthly_preventive_cost,
# "corrective_cost":monthly_corrective_cost,
# "procurement_cost":total_procurement_cost,
# "total_cost":monthly_total_cost,
# "procurement_details":all_procurement_details
# }
# async def find_optimal_overhaul_interval(
# self,
# db_session: DbSession,
# equipment,
# preventive_cost: float,
# predicted_failures: Dict[datetime.date, int],
# forced_outage_hours: float,
# start_date: datetime.date,
# max_interval: int = 24
# ):
# """
# Find optimal overhaul interval by testing different intervals
# """
# all_results = []
# for interval in range(1, max_interval + 1):
# result = await self.simulate_equipment_overhaul(
# db_session=db_session,
# equipment=equipment,
# preventive_cost=preventive_cost,
# predicted_failures=predicted_failures,
# interval_months=interval,
# forced_outage_hours=forced_outage_hours,
# start_date=start_date,
# total_months=max_interval
# )
# all_results.append(result)
# # Find optimal result (minimum total cost)
# optimal_result = min(all_results, key=lambda x: x["total_cost"])
# return optimal_result, all_results
# async def calculate_fleet_optimization(
# self,
# db_session: DbSession,
# equipments: list,
# overhaul_cost: float,
# start_date: datetime.date,
# end_date: datetime.date,
# calculation,
# token: str
# ) -> Dict:
# """
# Calculate optimization for entire fleet of equipment
# """
# max_interval = self._get_months_between(start_date, end_date)
# preventive_cost_per_equipment = overhaul_cost / len(equipments)
# fleet_results = []
# total_corrective_costs = np.zeros(max_interval)
# total_preventive_costs = np.zeros(max_interval)
# total_procurement_costs = np.zeros(max_interval)
# total_failures = np.zeros(max_interval)
# for equipment in equipments:
# # Get reliability data
# predicted_failures = await self.reliability_service.get_number_of_failures(
# location_tag=equipment.equipment.location_tag,
# start_date=start_date,
# end_date=end_date,
# token=token,
# max_interval=max_interval
# )
# forced_outage_hours = await self.reliability_service.get_equipment_foh(
# location_tag=equipment.equipment.location_tag,
# token=token
# )
# # Find optimal interval for this equipment
# optimal_result, all_results = await self.find_optimal_overhaul_interval(
# db_session=db_session,
# equipment=equipment,
# preventive_cost=preventive_cost_per_equipment,
# predicted_failures=predicted_failures,
# forced_outage_hours=forced_outage_hours,
# start_date=start_date,
# max_interval=max_interval
# )
# #reduce sparepart stock
# await self.spare_parts_service.reduce_stock(db_session, equipment.location_tag)
# # Aggregate costs
# corrective_costs = [r["corrective_cost"] for r in all_results]
# preventive_costs = [r["preventive_cost"] for r in all_results]
# procurement_costs = [r["procurement_cost"] for r in all_results]
# procurement_details = [r["procurement_details"] for r in all_results]
# failures = [round(r) for r in predicted_failures.values()]
# fleet_results.append(
# CalculationEquipmentResult(
# corrective_costs=corrective_costs,
# overhaul_costs=preventive_costs,
# procurement_costs=procurement_costs,
# daily_failures=failures,
# assetnum=equipment.assetnum,
# material_cost=equipment.material_cost,
# service_cost=equipment.service_cost,
# optimum_day=optimal_result["interval_months"],
# calculation_data_id=calculation.id,
# master_equipment=equipment.equipment,
# procurement_details=procurement_details
# )
# )
# total_corrective_costs += np.array(corrective_costs)
# total_preventive_costs += np.array(preventive_costs)
# total_procurement_costs += np.array(procurement_costs)
# # Calculate fleet optimal interval
# total_costs = total_corrective_costs + total_preventive_costs + total_procurement_costs
# fleet_optimal_index = np.argmin(total_costs)
# calculation.optimum_oh_day =fleet_optimal_index + 1
# db_session.add_all(fleet_results)
# await db_session.commit()
# return {
# 'id': calculation.id,
# 'fleet_results': fleet_results,
# 'fleet_optimal_interval': fleet_optimal_index + 1,
# 'fleet_optimal_cost': total_costs[fleet_optimal_index],
# 'total_corrective_costs': total_corrective_costs.tolist(),
# 'total_preventive_costs': total_preventive_costs.tolist(),
# 'total_procurement_costs': total_procurement_costs.tolist(),
# }
# def _add_months_to_date(self, start_date: datetime.date, months: int) -> datetime.date:
# """Helper method to add months to a date"""
# year = start_date.year
# month = start_date.month + months
# while month > 12:
# year += 1
# month -= 12
# return datetime.date(year, month, start_date.day)
# def _get_months_between(self, start_date: datetime.date, end_date: datetime.date) -> int:
# """Calculate number of months between two dates"""
# return (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month)
# class OptimumCostModel:
# def __init__(self, token, last_oh_date, next_oh_date,interval =30, base_url: str = "http://192.168.1.82:8000"):
# self.api_base_url = base_url
# self.token = token
# self.last_oh_date = last_oh_date
# self.next_oh_date = next_oh_date
# self.interval=30
# def _get_months_between(self, start_date: datetime.date, end_date: datetime.date) -> int:
# """Calculate number of months between two dates"""
# return (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month)
# def _get_reliability_from_api(self, target_date, location_tag):
# """
# Get reliability value from API for a specific date.
# Parameters:
# target_date: datetime object for the target date
# Returns:
# reliability value (float)
# """
# # Format date for API call
# date_str = target_date.strftime('%Y-%m-%d %H:%M:%S.%f')
# # Construct API URL
# url = f"{self.api_base_url}/calculate/reliability/{location_tag}/{date_str}"
# header = {
# 'Content-Type': 'application/json',
# 'Authorization': 'Bearer ' + self.token
# }
# try:
# response = requests.get(url, headers=header)
# response.raise_for_status()
# data = response.json()
# reliability = data['data']['value']
# return reliability
# except requests.RequestException as e:
# print(f"API Error: {e}")
# return None
# except KeyError as e:
# print(f"Data parsing error: {e}")
# return None
# def _get_reliability_equipment(self, location_tag):
# current_date = self.last_oh_date
# results = defaultdict()
# print("Fetching reliability data from API...")
# while current_date <= self.next_oh_date:
# reliability = self._get_reliability_from_api(current_date, location_tag)
# results[current_date] = reliability
# if reliability is not None:
# print(f"Date: {current_date.strftime('%Y-%m-%d')}, Reliability: {reliability:.6f}")
# else:
# print(f"Date: {current_date.strftime('%Y-%m-%d')}, Reliability: Failed to fetch")
# current_date += timedelta(days=31)
# return results
# def _calculate_costs_each_point(self, reliabilities, preventive_cost: float, failure_replacement_cost: float):
# # Calculate costs for each time point
# results = []
# dates = list(reliabilities.keys())
# reliabilities_list = list(reliabilities.values())
# for i, (date, reliability) in enumerate(reliabilities.items()):
# if reliability is None:
# continue
# # Calculate time from last OH in days
# time_from_last_oh = (date - self.last_oh_date).days
# # Calculate failure replacement cost
# failure_prob = 1 - reliability
# # For expected operating time, we need to integrate R(t) from 0 to T
# # Since we have discrete points, we'll approximate using trapezoidal rule
# if i == 0:
# expected_operating_time = time_from_last_oh # First point
# else:
# # Approximate integral using available reliability values
# time_points = [(dates[j] - self.last_oh_date).days for j in range(i+1)]
# rel_values = [rel for rel in reliabilities_list[:i+1] if rel is not None]
# if len(rel_values) > 1:
# expected_operating_time = np.trapezoid(rel_values, time_points)
# else:
# expected_operating_time = time_from_last_oh * reliability
# # Calculate costs
# if expected_operating_time > 0:
# failure_cost = (failure_prob * failure_replacement_cost) / expected_operating_time
# preventive_cost = (reliability * preventive_cost) / expected_operating_time
# else:
# # failure_cost = failure_prob * self.IDRu
# # preventive_cost = reliability * self.IDRp
# continue
# total_cost = failure_cost + preventive_cost
# results.append({
# 'date': date,
# 'days_from_last_oh': time_from_last_oh,
# 'reliability': reliability,
# 'failure_probability': failure_prob,
# 'expected_operating_time': expected_operating_time,
# 'failure_replacement_cost': failure_cost,
# 'preventive_replacement_cost': preventive_cost,
# 'total_cost': total_cost,
# 'procurement_cost': 0,
# 'procurement_details': []
# })
# return results
# def _find_optimal_timing(self, results_df):
# """
# Find the timing that gives the lowest total cost.
# Parameters:
# results_df: DataFrame from calculate_costs_over_period
# Returns:
# Dictionary with optimal timing information
# """
# if results_df.empty:
# return None
# # Find minimum total cost
# min_cost_idx = results_df['total_cost'].idxmin()
# optimal_row = results_df.loc[min_cost_idx]
# return {
# 'optimal_index': min_cost_idx,
# 'optimal_date': optimal_row['date'],
# 'days_from_last_oh': optimal_row['days_from_last_oh'],
# 'reliability': optimal_row['reliability'],
# 'failure_cost': optimal_row['failure_replacement_cost'],
# 'preventive_cost': optimal_row['preventive_replacement_cost'],
# 'total_cost': optimal_row['total_cost'],
# 'procurement_cost': 0
# }
# async def calculate_cost_all_equipment(
# self,
# db_session: DbSession,
# equipments: list,
# preventive_cost: float,
# calculation,
# ) :
# """
# Calculate optimization for entire fleet of equipment
# """
# max_interval = self._get_months_between(self.last_oh_date, self.next_oh_date)
# preventive_cost_per_equipment = preventive_cost / len(equipments)
# fleet_results = []
# total_corrective_costs = np.zeros(max_interval)
# total_preventive_costs = np.zeros(max_interval)
# total_procurement_costs = np.zeros(max_interval)
# total_failures = np.zeros(max_interval)
# for equipment in equipments:
# # Get reliability data
# cost_per_failure = equipment.material_cost
# reliabilities = self._get_reliability_equipment(
# location_tag=equipment.location_tag,
# )
# predicted_cost = self._calculate_costs_each_point(reliabilities=reliabilities, preventive_cost=preventive_cost_per_equipment, failure_replacement_cost=cost_per_failure)
# optimum_cost = self._find_optimal_timing(pd.DataFrame(predicted_cost))
# # Aggregate costs
# corrective_costs = [r["failure_replacement_cost"] for r in predicted_cost]
# preventive_costs = [r["preventive_replacement_cost"] for r in predicted_cost]
# procurement_costs = [r["procurement_cost"] for r in predicted_cost]
# procurement_details = [r["procurement_details"] for r in predicted_cost]
# failures = [(1-r["reliability"]) for r in predicted_cost]
# fleet_results.append(
# CalculationEquipmentResult(
# corrective_costs=corrective_costs,
# overhaul_costs=preventive_costs,
# procurement_costs=procurement_costs,
# daily_failures=failures,
# location_tag=equipment.location_tag,
# material_cost=equipment.material_cost,
# service_cost=equipment.service_cost,
# optimum_day=optimum_cost['optimal_index'],
# calculation_data_id=calculation.id,
# procurement_details=procurement_details
# )
# )
# total_corrective_costs += np.array(corrective_costs)
# total_preventive_costs += np.array(preventive_costs)
# total_procurement_costs += np.array(procurement_costs)
# # Calculate fleet optimal interval
# total_costs = total_corrective_costs + total_preventive_costs + total_procurement_costs
# fleet_optimal_index = np.argmin(total_costs)
# calculation.optimum_oh_day =fleet_optimal_index + 1
# db_session.add_all(fleet_results)
# await db_session.commit()
# return {
# 'id': calculation.id,
# 'fleet_results': fleet_results,
# 'fleet_optimal_interval': fleet_optimal_index + 1,
# 'fleet_optimal_cost': total_costs[fleet_optimal_index],
# 'total_corrective_costs': total_corrective_costs.tolist(),
# 'total_preventive_costs': total_preventive_costs.tolist(),
# 'total_procurement_costs': total_procurement_costs.tolist(),
# }
class OptimumCostModel:
def __init__(self, token, last_oh_date, next_oh_date, interval=30, base_url: str = "http://192.168.1.82:8000"):
self.api_base_url = base_url
self.token = token
self.last_oh_date = last_oh_date
self.next_oh_date = next_oh_date
self.interval = interval
self.session = None
# Pre-calculate date range for reuse
self.date_range = self._generate_date_range()
# Setup logging for debugging
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def _generate_date_range(self) -> List[datetime]:
"""Pre-generate the date range to avoid repeated calculations"""
dates = []
current_date = self.last_oh_date
while current_date <= self.next_oh_date:
dates.append(current_date)
current_date += timedelta(days=31)
return dates
def _get_months_between(self, start_date: date, end_date: date) -> int:
"""Calculate number of months between two dates"""
return (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month)
async def _create_session(self):
"""Create aiohttp session with connection pooling"""
if self.session is None:
timeout = aiohttp.ClientTimeout()
connector = aiohttp.TCPConnector(
limit=500, # Total connection pool size
limit_per_host=200, # Max connections per host
ttl_dns_cache=00, # DNS cache TTL
use_dns_cache=True,
force_close=False,
enable_cleanup_closed=True
)
self.session = aiohttp.ClientSession(
timeout=timeout,
connector=connector,
headers={'Authorization': f'Bearer {self.token}'}
)
async def _close_session(self):
"""Close aiohttp session"""
if self.session:
await self.session.close()
self.session = None
async def _get_reliability_from_api_async(
self,
target_date: datetime,
location_tag: str,
max_retries: int = 3,
retry_delay: float = 1.0
) -> Optional[float]:
date_str = target_date.strftime('%Y-%m-%d %H:%M:%S.%f')
url = f"{self.api_base_url}/calculate/reliability/{location_tag}/{date_str}"
for attempt in range(max_retries + 1):
try:
async with self.session.get(url) as response:
if response.status == 200:
data = await response.json()
return data['data']['value']
else:
# Server error - may be worth retrying
print(f"Server error {response.status} for {location_tag} on {date_str}")
if attempt < max_retries:
await asyncio.sleep(retry_delay * (2 ** attempt))
continue
except aiohttp.ClientError as e:
print(f"Network error for {location_tag} on {date_str} (attempt {attempt + 1}): {e}")
if attempt < max_retries:
await asyncio.sleep(retry_delay * (2 ** attempt))
continue
return None
except Exception as e:
print(f"Unexpected error for {location_tag} on {date_str}: {e}")
return None
return None
"""
Async version of reliability API call with retry logic
Args:
target_date: Date to query
location_tag: Location identifier
max_retries: Maximum number of retry attempts
retry_delay: Initial delay between retries (exponential backoff)
Returns:
Reliability value or None if failed
"""
date_str = target_date.strftime('%Y-%m-%d %H:%M:%S.%f')
url = f"{self.api_base_url}/calculate/reliability/{location_tag}/{date_str}"
for attempt in range(max_retries + 1):
try:
async with self.session.get(url) as response:
if response.status == 200:
data = await response.json()
return data['data']['value']
elif response.status >= 500:
# Server error - may be worth retrying
error_msg = f"Server error {response.status} for {location_tag} on {date_str}"
if attempt < max_retries:
await asyncio.sleep(retry_delay * (2 ** attempt))
continue
else:
# Client error - no point retrying
print(f"API Error for {location_tag} on {date_str}: Status {response.status}")
return None
except aiohttp.ClientError as e:
print(f"Network error for {location_tag} on {date_str} (attempt {attempt + 1}): {e}")
if attempt < max_retries:
await asyncio.sleep(retry_delay * (2 ** attempt))
continue
return None
except Exception as e:
print(f"Unexpected error for {location_tag} on {date_str}: {e}")
return None
return None
async def _get_reliability_equipment_batch(
self,
location_tags: List[str],
batch_size: int = 100,
max_retries: int = 3,
rate_limit_delay: float = 0.1
):
await self._create_session()
# Prepare all tasks with retry wrapper
tasks = []
task_mapping = []
for location_tag in location_tags:
for target_date in self.date_range:
# Create a task with retry logic
task = self._execute_with_retry(
target_date,
location_tag,
max_retries=max_retries
)
tasks.append(task)
task_mapping.append((location_tag, target_date))
print(f"Executing {len(tasks)} API calls in batches of {batch_size}...")
# Process with controlled concurrency
semaphore = asyncio.Semaphore(batch_size)
results = defaultdict(dict)
completed = 0
total_batches = (len(tasks) + batch_size - 1) // batch_size
async def process_task(task, location_tag, target_date):
nonlocal completed
async with semaphore:
try:
reliability = await task
results[location_tag][target_date] = reliability
except Exception as e:
results[location_tag][target_date] = None
print(f"Failed for {location_tag} on {target_date}: {str(e)}")
finally:
completed += 1
if completed % 100 == 0:
print(f"Progress: {completed}/{len(tasks)} ({completed/len(tasks):.1%})")
# Process all tasks with rate limiting
batch_tasks = []
for i, task in enumerate(tasks):
batch_tasks.append(process_task(task, *task_mapping[i]))
if i > 0 and i % batch_size == 0:
await asyncio.sleep(rate_limit_delay) # Gentle rate limiting
await asyncio.gather(*batch_tasks)
results_serializable = {str(k): v for k, v in results.items()}
return dict(results)
def json_datetime_handler(self, obj):
    """Return a JSON-friendly string: ISO-8601 for datetimes, ``str(obj)`` otherwise."""
    return obj.isoformat() if isinstance(obj, datetime) else str(obj)
async def _execute_with_retry(
    self,
    target_date: datetime,
    location_tag: str,
    max_retries: int = 3,
    base_delay: float = 1.0
) -> Optional[float]:
    """
    Call ``self._get_reliability_from_api_async`` and retry on network errors.

    An ``aiohttp.ClientError`` triggers a retry after ``base_delay * 2**attempt``
    seconds; once ``max_retries`` retries are used up it is re-raised. Any
    other exception propagates immediately without a retry.
    """
    attempt = 0
    while attempt <= max_retries:
        try:
            return await self._get_reliability_from_api_async(target_date, location_tag)
        except aiohttp.ClientError:
            if attempt == max_retries:
                raise
            # Exponential backoff before the next attempt
            await asyncio.sleep(base_delay * (2 ** attempt))
        attempt += 1
def _get_equipment_fr(
    self,
    location_tag: str,
    token: str
):
    """
    Fetch the failure rate of one piece of equipment from the reliability API.

    Args:
        location_tag: Location tag of the equipment to look up.
        token: Bearer token sent in the ``Authorization`` header.

    Returns:
        The value found under ``data.failure_rate`` in the JSON response.

    Raises:
        Exception: When the request fails, returns an error status, or the
            body is not valid JSON.
        KeyError: When the response lacks ``data`` / ``failure_rate``.
    """
    # Use the tag passed by the caller, not an attribute of the model.
    failure_rate_url = f"{self.api_base_url}/reliability/asset/failure-rate/{location_tag}"
    try:
        response = requests.get(
            failure_rate_url,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {token}",
            },
            timeout=10
        )
        response.raise_for_status()
        result = response.json()
    except (requests.RequestException, ValueError) as e:
        raise Exception(f"Failed to fetch or parse failure rate data: {e}") from e
    return result["data"]["failure_rate"]
def _calculate_costs_vectorized(self, reliabilities: Dict[datetime, float],
                                preventive_cost: float, failure_replacement_cost: float, failure_rate) -> List[Dict]:
    """
    Compute failure, preventive and total cost for every dated reliability value.

    Entries whose reliability is ``None`` are ignored; the rest are sorted by
    date. For each date ``T`` the expected operating time is the trapezoidal
    integral of the reliability values over "days since ``self.last_oh_date``",
    taken from the first valid date up to ``T``. Costs are then:

    * failure cost    = ``failure_rate * failure_replacement_cost * expected_operating_time``
    * preventive cost = ``reliability * preventive_cost / expected_operating_time``
    * total cost      = failure cost + preventive cost

    The first valid date is not returned because its integral is zero.

    Args:
        reliabilities: Mapping of date -> reliability (``None`` allowed).
        preventive_cost: Preventive cost used in the preventive term.
        failure_replacement_cost: Cost used in the failure term.
        failure_rate: Failure rate multiplier used in the failure term.

    Returns:
        One dict per date after the first, in chronological order; an empty
        list when there is no usable reliability value.
    """
    valid_data = sorted(
        ((point, rel) for point, rel in reliabilities.items() if rel is not None),
        key=lambda pair: pair[0],
    )
    if not valid_data:
        return []

    dates = [point for point, _ in valid_data]
    reliability_values = np.array([rel for _, rel in valid_data], dtype=float)

    # Calculate days from last OH
    days_from_last_oh = np.array([(point - self.last_oh_date).days for point in dates])

    # Calculate failure probabilities
    failure_probs = 1 - reliability_values

    # Cumulative trapezoidal integration in a single pass: area of each
    # segment between consecutive dates, then a running sum. The integral
    # starts at the first valid date, whose own value is therefore 0.
    segment_areas = (
        np.diff(days_from_last_oh) * (reliability_values[:-1] + reliability_values[1:]) / 2.0
    )
    expected_operating_times = np.concatenate(([0.0], np.cumsum(segment_areas)))

    failure_costs = failure_rate * failure_replacement_cost * expected_operating_times

    # Guard the division: where no operating time has accumulated the
    # preventive cost is set to +inf so that point can never be the optimum.
    preventive_costs = np.full(expected_operating_times.shape, np.inf)
    np.divide(
        reliability_values * preventive_cost,
        expected_operating_times,
        out=preventive_costs,
        where=expected_operating_times > 0,
    )

    total_costs = failure_costs + preventive_costs

    # Convert back to list of dictionaries, skipping the first point
    return [
        {
            'date': dates[i],
            'days_from_last_oh': days_from_last_oh[i],
            'reliability': reliability_values[i],
            'failure_probability': failure_probs[i],
            'expected_operating_time': expected_operating_times[i],
            'failure_replacement_cost': failure_costs[i],
            'preventive_replacement_cost': preventive_costs[i],
            'total_cost': total_costs[i],
            'procurement_cost': 0,
            'procurement_details': []
        }
        for i in range(1, len(dates))
    ]
def _find_optimal_timing_vectorized(self, results: List[Dict]) -> Optional[Dict]:
    """
    Pick the entry with the lowest ``total_cost``.

    Args:
        results: Cost dictionaries carrying at least the keys ``total_cost``,
            ``date``, ``days_from_last_oh``, ``reliability``,
            ``failure_replacement_cost`` and ``preventive_replacement_cost``.

    Returns:
        ``None`` for an empty list, otherwise a summary of the cheapest entry.
        ``optimal_index`` is the position inside ``results`` and is returned
        as a plain ``int`` so it can be stored or serialised without NumPy
        types leaking out.
    """
    if not results:
        return None

    total_costs = np.array([r['total_cost'] for r in results])
    min_idx = int(np.argmin(total_costs))
    optimal_result = results[min_idx]

    return {
        'optimal_index': min_idx,
        'optimal_date': optimal_result['date'],
        'days_from_last_oh': optimal_result['days_from_last_oh'],
        'reliability': optimal_result['reliability'],
        'failure_cost': optimal_result['failure_replacement_cost'],
        'preventive_cost': optimal_result['preventive_replacement_cost'],
        'total_cost': optimal_result['total_cost'],
        'procurement_cost': 0
    }
async def calculate_cost_all_equipment(
    self,
    db_session,  # DbSession type
    equipments: List,  # List of equipment objects
    preventive_cost: float,
    calculation,
) -> Dict:
    """
    Calculate per-equipment and fleet-wide cost curves and persist them.

    Reliability values for all equipment are fetched concurrently, each
    equipment's costs are derived with ``_calculate_costs_vectorized`` and
    stored as a ``CalculationEquipmentResult``; the per-day sums give the
    fleet optimum, which is written to ``calculation``.

    Args:
        db_session: Async DB session used to persist the equipment results.
        equipments: Objects exposing ``location_tag``, ``material_cost`` and
            ``service_cost``.
        preventive_cost: Total preventive cost, split evenly over ``equipments``.
        calculation: Record receiving ``optimum_oh_day`` and ``max_interval``;
            its ``id`` is stored on every equipment result.

    Returns:
        Dict with ``id``, ``fleet_results``, ``fleet_optimal_interval``,
        ``fleet_optimal_cost`` and the three per-day total cost lists.

    Raises:
        ValueError: When ``equipments`` is empty.
    """
    def pad_to(values, target_length, make_fill=int):
        """Truncate or right-pad ``values`` to ``target_length`` (``int()`` == 0 by default)."""
        missing = target_length - len(values)
        if missing > 0:
            return values + [make_fill() for _ in range(missing)]
        return values[:target_length]

    try:
        if not equipments:
            raise ValueError("Cannot calculate overhaul costs without any equipment")

        max_interval = len(self.date_range)
        preventive_cost_per_equipment = preventive_cost / len(equipments)

        # Extract location tags for batch processing
        location_tags = [eq.location_tag for eq in equipments]
        print(f"Starting reliability data fetch for {len(equipments)} equipment...")

        # Fetch all reliability data concurrently
        all_reliabilities = await self._get_reliability_equipment_batch(location_tags)
        print("Processing cost calculations...")

        fleet_results = []
        total_corrective_costs = np.zeros(max_interval)
        total_preventive_costs = np.zeros(max_interval)
        total_procurement_costs = np.zeros(max_interval)

        loop = asyncio.get_running_loop()
        for equipment in equipments:
            location_tag = equipment.location_tag
            cost_per_failure = equipment.material_cost

            # Get pre-fetched reliability data
            reliabilities = all_reliabilities.get(location_tag, {})
            if not reliabilities:
                self.logger.warning(f"No reliability data for equipment {location_tag}")
                continue

            # The failure-rate lookup is a blocking HTTP call, so it runs in
            # the default executor and only for equipment that has data.
            failure_rate = await loop.run_in_executor(
                None, self._get_equipment_fr, location_tag, self.token
            )

            predicted_costs = self._calculate_costs_vectorized(
                reliabilities=reliabilities,
                preventive_cost=preventive_cost_per_equipment,
                failure_replacement_cost=cost_per_failure,
                failure_rate=failure_rate,
            )
            if not predicted_costs:
                self.logger.warning(f"No valid cost predictions for equipment {location_tag}")
                continue

            optimum_cost = self._find_optimal_timing_vectorized(predicted_costs)

            # Every series is padded to max_interval so they can be summed
            # element-wise and indexed uniformly by day.
            corrective_costs = pad_to(
                [r["failure_replacement_cost"] for r in predicted_costs], max_interval
            )
            preventive_costs = pad_to(
                [r["preventive_replacement_cost"] for r in predicted_costs], max_interval
            )
            procurement_costs = pad_to(
                [r["procurement_cost"] for r in predicted_costs], max_interval
            )
            procurement_details = pad_to(
                [r["procurement_details"] for r in predicted_costs], max_interval, make_fill=list
            )
            failures = pad_to(
                [(1 - r["reliability"]) for r in predicted_costs], max_interval
            )

            fleet_results.append(
                CalculationEquipmentResult(
                    corrective_costs=corrective_costs,
                    overhaul_costs=preventive_costs,
                    procurement_costs=procurement_costs,
                    daily_failures=failures,
                    location_tag=equipment.location_tag,
                    material_cost=equipment.material_cost,
                    service_cost=equipment.service_cost,
                    optimum_day=int(optimum_cost['optimal_index']) if optimum_cost else 0,
                    calculation_data_id=calculation.id,
                    procurement_details=procurement_details
                )
            )

            # Aggregate costs using vectorized operations
            total_corrective_costs += np.array(corrective_costs)
            total_preventive_costs += np.array(preventive_costs)
            total_procurement_costs += np.array(procurement_costs)

        total_costs = total_corrective_costs + total_preventive_costs + total_procurement_costs

        # Each equipment series drops its first date, so at most
        # max_interval - 1 slots hold real costs and the last slot is always
        # zero padding. The optimum is searched among the real slots only;
        # otherwise the padded zero would always win.
        valid_length = max(max_interval - 1, 0)
        fleet_optimal_index = int(np.argmin(total_costs[:valid_length])) if valid_length else 0

        calculation.optimum_oh_day = fleet_optimal_index + 1
        calculation.max_interval = max_interval - 1

        # Batch database operations
        db_session.add_all(fleet_results)
        await db_session.commit()
        self.logger.info(f"Calculation completed successfully for {len(fleet_results)} equipment")

        return {
            'id': calculation.id,
            'fleet_results': fleet_results,
            'fleet_optimal_interval': fleet_optimal_index + 1,
            'fleet_optimal_cost': float(total_costs[fleet_optimal_index]) if total_costs.size else 0.0,
            'total_corrective_costs': total_corrective_costs.tolist(),
            'total_preventive_costs': total_preventive_costs.tolist(),
            'total_procurement_costs': total_procurement_costs.tolist(),
        }
    finally:
        # Always clean up the session
        await self._close_session()
async def __aenter__(self):
    """Enter ``async with``: make sure the HTTP session exists, then yield this object."""
    session_ready = self._create_session()
    await session_ready
    return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Leave ``async with``: close the HTTP session; exceptions are not suppressed."""
    session_closed = self._close_session()
    await session_closed
    return None
async def run_simulation(*, db_session: DbSession, calculation: CalculationData, token: str):
    """
    Run the optimum-cost model for the overhaul session of ``calculation``.

    Loads the standard-scope equipment, the current and previous overhaul
    scopes and the stored calculation data, then delegates to
    ``OptimumCostModel.calculate_cost_all_equipment``.

    Args:
        db_session: Async database session.
        calculation: Calculation whose ``overhaul_session_id`` and ``id`` are used.
        token: Bearer token handed to the reliability service client.

    Returns:
        The dict returned by ``calculate_cost_all_equipment``.

    Raises:
        HTTPException: 404 when the scope, the previous overhaul or the
            calculation data cannot be found.
    """
    equipments = await get_standard_scope_by_session_id(
        db_session=db_session, overhaul_session_id=calculation.overhaul_session_id
    )
    # NOTE(review): only the first 100 equipment are simulated; confirm this
    # cap is intentional and not a leftover from testing.
    equipments = equipments[:100]

    scope = await get_scope(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)
    if not scope:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    # The model needs the end of the previous overhaul as its time origin.
    prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope)
    if not prev_oh_scope:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No previous overhaul found for this overhaul session.",
        )

    calculation_data = await get_calculation_data_by_id(
        db_session=db_session, calculation_id=calculation.id
    )
    if not calculation_data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    # Spare-part stock is not an input of OptimumCostModel, so no spare parts
    # are loaded here.
    optimum_oh_model = OptimumCostModel(
        token=token,
        last_oh_date=prev_oh_scope.end_date,
        next_oh_date=scope.start_date,
        base_url=REALIBILITY_SERVICE_API
    )

    results = await optimum_oh_model.calculate_cost_all_equipment(
        db_session=db_session,
        equipments=equipments,
        calculation=calculation_data,
        preventive_cost=calculation_data.parameter.overhaul_cost,
    )
    return results
async def get_corrective_cost_time_chart(
    material_cost: float,
    service_cost: float,
    location_tag: str,
    token,
    start_date: datetime,
    end_date: datetime
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Build monthly failure counts and the matching corrective costs.

    Historical failures (``start_date`` up to today or ``end_date``, whichever
    is earlier) are summed per month and stored cumulatively. Predicted
    numbers of failures are applied only to dates after today. Months of the
    requested range that still have no value copy the most recent earlier
    month, else the earliest later month, else ``latest_num``.

    Args:
        material_cost: Material cost of the equipment.
        service_cost: Service cost of the equipment.
        location_tag: Equipment location tag used in both API URLs.
        token: Bearer token sent in the ``Authorization`` header.
        start_date: First day of the period.
        end_date: Last day of the period.

    Returns:
        ``(corrective_costs, monthly_failure)``: two arrays ordered by month,
        where ``corrective_costs = monthly_failure * (material_cost +
        service_cost) / latest_num``.

    Raises:
        Exception: When the historical data cannot be fetched or parsed, or
            when the corrective costs cannot be computed. Prediction errors
            are only printed.
    """
    # NOTE(review): the service address is hard-coded in both URLs below;
    # consider routing it through configuration.
    today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }

    monthly_data = {}
    latest_num = 1

    # Handle historical data (any portion before or including today)
    historical_end = min(today, end_date)
    if start_date <= historical_end:
        url_history = f"http://192.168.1.82:8000/reliability/main/failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{historical_end.strftime('%Y-%m-%d')}"
        try:
            response = requests.get(url_history, headers=request_headers, timeout=10)
            history_data = response.json()

            # Accumulate failures by month
            history_by_month = {}
            for item in history_data["data"]:
                item_date = datetime.strptime(item["date"], "%d %b %Y")
                month_key = datetime(item_date.year, item_date.month, 1)
                history_by_month.setdefault(month_key, 0)
                if item["num_fail"] is not None:
                    history_by_month[month_key] += item["num_fail"]

            sorted_months = sorted(history_by_month)
            if sorted_months:
                failures = np.array([history_by_month[month] for month in sorted_months])
                # Historical months are stored as running totals
                for month_key, running_total in zip(sorted_months, np.cumsum(failures)):
                    monthly_data[month_key] = int(running_total)
                # Use the last month's failures, minimum 1
                latest_num = max(1, failures[-1])
        except Exception as e:
            raise Exception(f"Error fetching historical data: {e}") from e

    if end_date >= start_date:
        url_prediction = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
        try:
            response = requests.get(url_prediction, headers=request_headers, timeout=10)
            prediction_data = response.json()

            if prediction_data["data"]:
                for item in prediction_data["data"]:
                    item_date = datetime.strptime(item["date"], "%d %b %Y")
                    # Only apply prediction data for dates after today
                    if item_date > today:
                        month_key = datetime(item_date.year, item_date.month, 1)
                        monthly_data[month_key] = item["num_fail"] if item["num_fail"] is not None else 0

                # Update latest_num with the last prediction if available
                last_prediction = prediction_data["data"][-1]["num_fail"]
                if last_prediction is not None:
                    latest_num = max(1, round(last_prediction))
        except Exception as e:
            # Predictions are best-effort: the gap filling below still runs.
            print(f"Error fetching prediction data: {e}")

    # Fill in any missing months in the range
    current_date = datetime(start_date.year, start_date.month, 1)
    end_month = datetime(end_date.year, end_date.month, 1)
    while current_date <= end_month:
        if current_date not in monthly_data:
            earlier_months = [m for m in monthly_data if m < current_date]
            later_months = [m for m in monthly_data if m > current_date]
            if earlier_months:
                monthly_data[current_date] = monthly_data[max(earlier_months)]
            elif later_months:
                monthly_data[current_date] = monthly_data[min(later_months)]
            else:
                monthly_data[current_date] = latest_num

        # Move to next month
        if current_date.month == 12:
            current_date = datetime(current_date.year + 1, 1, 1)
        else:
            current_date = datetime(current_date.year, current_date.month + 1, 1)

    # Chronological array of monthly values
    monthly_failure = np.array([monthly_data[month] for month in sorted(monthly_data)])

    # latest_num is always >= 1 here (both assignments clamp it with max(1, ...))
    cost_per_failure = (material_cost + service_cost) / latest_num
    try:
        corrective_costs = monthly_failure * cost_per_failure
    except Exception as e:
        raise Exception(f"Error calculating corrective costs: {monthly_failure}", location_tag) from e

    return corrective_costs, monthly_failure
# days_difference = (end_date - start_date).days
# today = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
# tomorrow = today + datetime.timedelta(days=1)
# url_prediction = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{tomorrow.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
# url_history = f"http://192.168.1.82:8000/reliability/main/failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{today.strftime('%Y-%m-%d')}"
# # Initialize monthly data dictionary
# monthly_data = {}
# # Get historical data (start_date to today)
# if start_date <= today:
# try:
# response = requests.get(
# url_history,
# headers={
# "Content-Type": "application/json",
# "Authorization": f"Bearer {token}",
# },
# )
# history_data = response.json()
# # Process historical data - accumulate failures by month
# history_dict = {}
# monthly_failures = {}
# for item in history_data["data"]:
# date = datetime.datetime.strptime(item["date"], "%d %b %Y")
# month_key = datetime.datetime(date.year, date.month, 1)
# # Initialize if first occurrence of this month
# if month_key not in history_dict:
# history_dict[month_key] = 0
# # Accumulate failures for this month
# if item["num_fail"] is not None:
# history_dict[month_key] += item["num_fail"]
# # Sort months chronologically
# sorted_months = sorted(history_dict.keys())
# failures = np.array([history_dict[month] for month in sorted_months])
# cum_failure = np.cumsum(failures)
# for month_key in sorted_months:
# monthly_failures[month_key] = int(cum_failure[sorted_months.index(month_key)])
# # Update monthly_data with cumulative historical data
# monthly_data.update(monthly_failures)
# except Exception as e:
# # print(f"Error fetching historical data: {e}")
# raise Exception(e)
# latest_num = 1
# # Get prediction data (today+1 to end_date)
# if end_date > today:
# try:
# response = requests.get(
# url_prediction,
# headers={
# "Content-Type": "application/json",
# "Authorization": f"Bearer {token}",
# },
# )
# prediction_data = response.json()
# # Use the last prediction value for future months
# # Get the latest number from prediction data
# latest_num = prediction_data["data"][-1]["num_fail"]
# # Ensure the value is at least 1
# if not latest_num or latest_num < 1:
# latest_num = 1
# else:
# # Round the number to the nearest integer
# latest_num = round(latest_num)
# # Create prediction dictionary
# prediction_dict = {}
# for item in prediction_data["data"]:
# date = datetime.datetime.strptime(item["date"], "%d %b %Y")
# month_key = datetime.datetime(date.year, date.month, 1)
# prediction_dict[month_key] = round(item["num_fail"])
# # Update monthly_data with prediction data
# for key in prediction_dict:
# if key not in monthly_data: # Don't overwrite historical data
# monthly_data[key] = prediction_dict[key]
# except Exception as e:
# print(f"Error fetching prediction data: {e}")
# # Create a complete date range covering all months from start to end
# current_date = datetime.datetime(start_date.year, start_date.month, 1)
# while current_date <= end_date:
# if current_date not in monthly_data:
# # Initialize to check previous months
# previous_month = current_date.replace(day=1) - datetime.timedelta(days=1)
# # Now previous_month is the last day of the previous month
# # Convert back to first day of previous month for consistency
# previous_month = previous_month.replace(day=1)
# # Keep going back until we find data or run out of months to check
# month_diff = (current_date.year - start_date.year) * 12 + (current_date.month - start_date.month)
# max_attempts = max(1, month_diff) # Ensure at least 1 attempt
# attempts = 0
# while previous_month not in monthly_data and attempts < max_attempts:
# # Move to the previous month (last day of the month before)
# previous_month = previous_month.replace(day=1) - datetime.timedelta(days=1)
# # Convert to first day of month
# previous_month = previous_month.replace(day=1)
# attempts += 1
# # Use the found value or default to 0 if no previous month with data exists
# if previous_month in monthly_data:
# monthly_data[current_date] = monthly_data[previous_month]
# else:
# monthly_data[current_date] = 0
# # Move to next month
# if current_date.month == 12:
# current_date = datetime.datetime(current_date.year + 1, 1, 1)
# else:
# current_date = datetime.datetime(current_date.year, current_date.month + 1, 1)
# # # Convert to list maintaining chronological order
# complete_data = []
# for month in sorted(monthly_data.keys()):
# complete_data.append(monthly_data[month])
# # Convert to numpy array
# monthly_failure = np.array(complete_data)
# cost_per_failure = (material_cost + service_cost) / latest_num
# if cost_per_failure == 0:
# raise ValueError("Cost per failure cannot be zero")
# # if location_tag == "3TR-TF005":
# # raise Exception(cost_per_failure, latest_num)
# corrective_costs = monthly_failure * cost_per_failure
# return corrective_costs, monthly_failure
# # except Exception as e:
# # print(f"Error fetching or processing data: {str(e)}")
# # raise
def get_overhaul_cost_by_time_chart(
    overhaul_cost: float, months_num: int, numEquipments: int, decay_base: float = 1.01
) -> np.ndarray:
    """
    Spread the per-equipment overhaul cost over 1..``months_num`` months.

    Args:
        overhaul_cost: Total overhaul cost; must not be negative.
        months_num: Number of months to generate; must be positive.
        numEquipments: Number of equipment sharing the cost; must be positive.
        decay_base: Accepted for interface compatibility; not used.

    Returns:
        Array of length ``months_num`` whose element ``k`` (0-based) equals
        ``(overhaul_cost / numEquipments) / (k + 1)``.

    Raises:
        ValueError: When any of the three checks above fails.
    """
    if overhaul_cost < 0:
        raise ValueError("Overhaul cost cannot be negative")
    if months_num <= 0:
        raise ValueError("months_num must be positive")
    if numEquipments <= 0:
        raise ValueError("numEquipments must be positive")

    cost_per_equipment = overhaul_cost / numEquipments
    return cost_per_equipment / np.arange(1, months_num + 1)
async def create_param_and_data(
    *,
    db_session: DbSession,
    calculation_param_in: CalculationTimeConstrainsParametersCreate,
    created_by: str,
    parameter_id: Optional[UUID] = None,
):
    """
    Create a ``CalculationData`` record together with its parameters.

    Raises:
        HTTPException: 400 when ``calculation_param_in.ohSessionId`` is ``None``.

    Returns:
        Whatever ``CalculationData.create_with_param`` returns.
    """
    session_id = calculation_param_in.ohSessionId
    if session_id is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="overhaul_session_id is required",
        )

    return await CalculationData.create_with_param(
        db=db_session,
        overhaul_session_id=session_id,
        avg_failure_cost=calculation_param_in.costPerFailure,
        overhaul_cost=calculation_param_in.overhaulCost,
        created_by=created_by,
        params_id=parameter_id,
    )
async def get_calculation_result(db_session: DbSession, calculation_id: str):
    """
    Assemble the per-day result of a stored calculation.

    For each day ``1..max_interval`` the corrective, overhaul and procurement
    costs and the failure values of all *included* equipment are summed.
    Procurement details are reported for every equipment that has any on
    that day, included or not.

    Args:
        db_session: Async database session.
        calculation_id: Id of the calculation to read.

    Returns:
        A ``CalculationTimeConstrainsRead`` for the calculation.

    Raises:
        HTTPException: 404 when the calculation or its overhaul scope is missing.
    """
    def value_at(series, index, default=0):
        """Return ``series[index]``, or ``default`` when the slot is missing or ``None``."""
        if series is None or index >= len(series) or series[index] is None:
            return default
        return series[index]

    scope_calculation = await get_calculation_data_by_id(
        db_session=db_session, calculation_id=calculation_id
    )
    if not scope_calculation:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    scope_overhaul = await get_scope(
        db_session=db_session, overhaul_session_id=scope_calculation.overhaul_session_id
    )
    if not scope_overhaul:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    # max_interval is unset until a calculation has been run
    data_num = scope_calculation.max_interval or 0
    calculation_results = []

    for i in range(data_num):
        result = {
            "overhaul_cost": 0,
            "corrective_cost": 0,
            "procurement_cost": 0,
            "num_failures": 0,
            "day": i + 1,
            "procurement_details": {},
        }

        for eq in scope_calculation.equipment_results:
            day_details = value_at(eq.procurement_details, i, default=None)
            if day_details:
                result["procurement_details"][eq.location_tag] = {
                    "is_included": eq.is_included,
                    "details": day_details,
                }

            if not eq.is_included:
                continue

            result["corrective_cost"] += float(value_at(eq.corrective_costs, i))
            result["overhaul_cost"] += float(value_at(eq.overhaul_costs, i))
            result["procurement_cost"] += float(value_at(eq.procurement_costs, i))
            # NOTE(review): int() truncates fractional values towards zero;
            # confirm this is intended if daily_failures holds probabilities.
            result["num_failures"] += int(value_at(eq.daily_failures, i))

        calculation_results.append(CalculationResultsRead(**result))

    return CalculationTimeConstrainsRead(
        id=scope_calculation.id,
        reference=scope_calculation.overhaul_session_id,
        scope=scope_overhaul.maintenance_type.name,
        results=calculation_results,
        optimum_oh=scope_calculation.optimum_oh_day,
        equipment_results=scope_calculation.equipment_results,
    )
async def get_calculation_data_by_id(
    db_session: DbSession, calculation_id
) -> CalculationData:
    """Load one ``CalculationData`` by id, eagerly joining its equipment results and parameter."""
    eager_loads = (
        joinedload(CalculationData.equipment_results),
        joinedload(CalculationData.parameter),
    )
    query = (
        select(CalculationData)
        .filter(CalculationData.id == calculation_id)
        .options(*eager_loads)
    )
    rows = await db_session.execute(query)
    # unique() is needed because the joined collection duplicates parent rows
    return rows.unique().scalar()
async def get_calculation_by_assetnum(
    *, db_session: DbSession, assetnum: str, calculation_id: str
):
    """Return the first equipment result matching both the asset number and the calculation id."""
    query = select(CalculationEquipmentResult).where(
        CalculationEquipmentResult.assetnum == assetnum,
        CalculationEquipmentResult.calculation_data_id == calculation_id,
    )
    rows = await db_session.execute(query)
    return rows.scalar()
async def get_number_of_failures(location_tag, start_date, end_date, token, max_interval=24):
    """
    Fetch predicted numbers of failures and reduce them to month-end values.

    The result always contains the last returned data point (rounded), every
    returned point that falls on the last day of its month (forced to 0 for
    the month of ``start_date``), and a 0 for each of the ``max_interval``
    month ends starting at ``start_date`` that has no value.

    Args:
        location_tag: Equipment location tag used in the URL.
        start_date: First day of the prediction window.
        end_date: Last day of the prediction window.
        token: Bearer token sent in the ``Authorization`` header.
        max_interval: Number of months, from ``start_date``, guaranteed to be
            present in the result.

    Returns:
        Dict mapping ``datetime.date`` to an int, sorted by date.

    Raises:
        Exception: When the request fails, the body is not valid JSON, or the
            payload has no non-empty ``data`` list.
    """
    url_prediction = (
        f"http://192.168.1.82:8000/reliability/main/number-of-failures/"
        f"{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
    )
    try:
        response = requests.get(
            url_prediction,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {token}",
            },
            timeout=10
        )
        response.raise_for_status()
        prediction_data = response.json()
    except (requests.RequestException, ValueError) as e:
        raise Exception(f"Failed to fetch or parse prediction data: {e}") from e

    entries = prediction_data.get("data") if isinstance(prediction_data, dict) else None
    if not isinstance(entries, list) or not entries:
        raise Exception("Invalid or empty prediction data format.")

    results = {}

    # The last data point is always kept, whatever day of the month it is.
    last_entry = entries[-1]
    last_entry_date = datetime.strptime(last_entry["date"], "%d %b %Y")
    last_value = last_entry["num_fail"]
    results[last_entry_date.date()] = round(last_value) if last_value is not None else 0

    # Keep only month-end points
    for item in entries:
        try:
            item_date = datetime.strptime(item["date"], "%d %b %Y")
            last_day = calendar.monthrange(item_date.year, item_date.month)[1]
            if item_date.day != last_day:
                continue
            value = item.get("num_fail")
            if value is None:
                value = 0
            if item_date.month == start_date.month and item_date.year == start_date.year:
                results[item_date.date()] = 0
            else:
                results[item_date.date()] = 0 if value <= 0 else int(value)
        except (KeyError, ValueError, TypeError):
            continue  # skip invalid items

    # Fill missing months with 0
    current = start_date.replace(day=1)
    for _ in range(max_interval):
        last_day = calendar.monthrange(current.year, current.month)[1]
        results.setdefault(date(current.year, current.month, last_day), 0)
        # move to next month
        if current.month == 12:
            current = current.replace(year=current.year + 1, month=1)
        else:
            current = current.replace(month=current.month + 1)

    # Sort results by date
    return dict(sorted(results.items()))
async def get_equipment_foh(
    location_tag: str,
    token: str
):
    """
    Read the ``data.hours`` value of the reliability service's ``asset/mdt`` endpoint.

    Raises:
        Exception: When the request fails, returns an error status, or the
            body is not valid JSON.
    """
    endpoint = f"http://192.168.1.82:8000/reliability/asset/mdt/{location_tag}"
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }
    try:
        reply = requests.get(endpoint, headers=request_headers, timeout=10)
        reply.raise_for_status()
        payload = reply.json()
    except (requests.RequestException, ValueError) as e:
        raise Exception(f"Failed to fetch or parse mdt data: {e}")
    return payload["data"]["hours"]
# Function to simulate overhaul strategy for a single equipment
def simulate_equipment_overhaul(equipment, preventive_cost,predicted_num_failures, interval_months, forced_outage_hours_value ,total_months=24):
    """
    Simulate overhauling one equipment every ``interval_months`` months.

    The predicted failures are ordered by key and addressed by "months since
    the last overhaul". A month in which that counter is 0 has no expected
    failures. Every month adds ``failures * equipment.material_cost`` plus
    ``forced_outage_hours_value * equipment.service_cost`` to the corrective
    cost, and each overhaul adds ``preventive_cost``.

    Returns:
        Dict with ``interval`` and the preventive, corrective and total cost,
        each averaged per month over ``total_months``.
    """
    failures_in_order = [count for _, count in sorted(predicted_num_failures.items())]
    unit_failure_cost = equipment.material_cost
    derated_hours = 0  # equivalent forced derated hours

    preventive_sum = 0
    corrective_sum = 0
    age = 0  # months since the last overhaul

    for _ in range(total_months):
        # Overhaul is due: pay for it and restart the clock
        if age >= interval_months:
            preventive_sum += preventive_cost
            age = 0

        if age == 0:
            failures = 0
        elif age < len(failures_in_order):
            failures = failures_in_order[age]
        else:
            failures = 0

        corrective_sum += (failures * unit_failure_cost) + (
            (forced_outage_hours_value + derated_hours) * equipment.service_cost
        )
        age += 1

    # Normalise to a monthly figure so different intervals are comparable
    preventive_per_month = preventive_sum / total_months
    corrective_per_month = corrective_sum / total_months

    return {
        'interval': interval_months,
        'preventive_cost': preventive_per_month,
        'corrective_cost': corrective_per_month,
        'total_cost': preventive_per_month + corrective_per_month
    }
async def create_calculation_result_service(
    db_session: DbSession, calculation: CalculationData, token: str
) -> CalculationTimeConstrainsRead:
    """
    Simulate every overhaul interval for each equipment and store the results.

    The period runs from the day after the previous overhaul's end date to
    the start of the current scope; without a previous overhaul it is the
    current scope's own start/end. For each equipment every interval from 1
    to ``max_interval`` months is simulated with
    ``simulate_equipment_overhaul``; the per-interval sums give the optimum.

    Args:
        db_session: Async database session.
        calculation: Calculation being filled; receives ``optimum_oh_day``.
        token: Bearer token for the reliability service.

    Returns:
        A ``CalculationTimeConstrainsRead`` carrying the optimum and the
        per-equipment results.

    Raises:
        HTTPException: 400 when there is no equipment or the period is
            shorter than one month.
        Exception: When an equipment has fewer predicted values than
            ``max_interval``.
    """
    # Get all equipment for this calculation session
    equipments = await get_all_by_session_id(
        db_session=db_session, overhaul_session_id=calculation.overhaul_session_id
    )
    scope = await get_scope(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)
    prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope)
    calculation_data = await get_calculation_data_by_id(
        db_session=db_session, calculation_id=calculation.id
    )

    # `datetime` is the class in this module, so midnight comes from
    # datetime.min and the day offset from the imported timedelta.
    midnight = datetime.min.time()
    if prev_oh_scope:
        # Start date is the day after the previous scope's end date
        start_date = datetime.combine(prev_oh_scope.end_date + timedelta(days=1), midnight)
        # End date is the start date of the current scope
        end_date = datetime.combine(scope.start_date, midnight)
    else:
        # If there's no previous scope, use the start and end dates from the current scope
        start_date = datetime.combine(scope.start_date, midnight)
        end_date = datetime.combine(scope.end_date, midnight)

    max_interval = get_months_between(start_date, end_date)

    if not equipments:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No equipment found for this overhaul session",
        )
    if max_interval < 1:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="The calculation period must cover at least one month",
        )

    overhaul_cost = calculation_data.parameter.overhaul_cost / len(equipments)

    # Store results for each equipment
    results = []
    total_corrective_costs = np.zeros(max_interval)
    total_overhaul_costs = np.zeros(max_interval)
    total_daily_failures = np.zeros(max_interval)

    for eq in equipments:
        predicted_num_failures = await get_number_of_failures(
            location_tag=eq.location_tag,
            start_date=start_date,
            end_date=end_date,
            token=token
        )
        failure_series = list(predicted_num_failures.values())
        if len(failure_series) < max_interval:
            raise Exception(eq.assetnum)

        foh_value = await get_equipment_foh(
            location_tag=eq.location_tag,
            token=token
        )

        equipment_results = [
            simulate_equipment_overhaul(
                eq, overhaul_cost, predicted_num_failures, interval, foh_value,
                total_months=max_interval
            )
            for interval in range(1, max_interval + 1)
        ]
        corrective_costs = [r['corrective_cost'] for r in equipment_results]
        overhaul_costs = [r['preventive_cost'] for r in equipment_results]
        optimal_result = min(equipment_results, key=lambda r: r['total_cost'])

        results.append(
            CalculationEquipmentResult(
                corrective_costs=corrective_costs,
                overhaul_costs=overhaul_costs,
                daily_failures=failure_series,
                assetnum=eq.assetnum,
                material_cost=eq.material_cost,
                service_cost=eq.service_cost,
                optimum_day=optimal_result['interval'],
                calculation_data_id=calculation.id,
                master_equipment=eq.master_equipment,
            )
        )

        total_corrective_costs += np.array(corrective_costs)
        total_overhaul_costs += np.array(overhaul_costs)
        # The series may hold more points than max_interval; only the first
        # max_interval fit the accumulator.
        total_daily_failures += np.array(failure_series[:max_interval])

    db_session.add_all(results)

    # Calculate optimum points using total costs
    total_costs_point = total_corrective_costs + total_overhaul_costs
    optimum_oh_index = int(np.argmin(total_costs_point))
    numbers_of_failure = sum(total_daily_failures[:optimum_oh_index])

    optimum = OptimumResult(
        overhaul_cost=float(total_overhaul_costs[optimum_oh_index]),
        corrective_cost=float(total_corrective_costs[optimum_oh_index]),
        num_failures=int(numbers_of_failure),
        days=int(optimum_oh_index + 1),
    )
    calculation.optimum_oh_day = optimum.days
    await db_session.commit()

    # Return results including individual equipment data
    return CalculationTimeConstrainsRead(
        id=calculation.id,
        reference=calculation.overhaul_session_id,
        scope=scope.type,
        results=[],
        optimum_oh=optimum,
        equipment_results=results,
    )
async def get_calculation_by_reference_and_parameter(
    *, db_session: DbSession, calculation_reference_id, parameter_id
):
    """Return the first ``CalculationData`` matching both the reference id and the parameter id."""
    criteria = and_(
        CalculationData.reference_id == calculation_reference_id,
        CalculationData.parameter_id == parameter_id,
    )
    rows = await db_session.execute(select(CalculationData).filter(criteria))
    return rows.scalar()
async def get_calculation_result_by_day(
    *, db_session: DbSession, calculation_id, simulation_day
):
    """Return the first ``CalculationResult`` of a calculation for the given simulation day."""
    criteria = and_(
        CalculationResult.day == simulation_day,
        CalculationResult.calculation_data_id == calculation_id,
    )
    rows = await db_session.execute(select(CalculationResult).filter(criteria))
    return rows.scalar()
async def get_avg_cost_by_asset(*, db_session: DbSession, assetnum: str):
    """Return the average ``total_cost_max`` of the work orders of one asset, or ``None``."""
    average_cost = func.avg(MasterWorkOrder.total_cost_max).label("average_cost")
    query = select(average_cost).where(MasterWorkOrder.assetnum == assetnum)
    outcome = await db_session.execute(query)
    return outcome.scalar_one_or_none()
async def bulk_update_equipment(
    *,
    db: DbSession,
    selected_equipments: List[CalculationSelectedEquipmentUpdate],
    calculation_data_id: UUID,
):
    """
    Set ``is_included`` on many equipment results of one calculation at once.

    Args:
        db: Async database session.
        selected_equipments: Items exposing ``location_tag`` and ``is_included``.
        calculation_data_id: Calculation whose equipment results are updated.

    Returns:
        The location tags that were addressed (empty list when nothing was
        selected, in which case no statement is executed).
    """
    # Map each location tag to its requested is_included flag
    inclusion_by_tag = {
        asset.location_tag: asset.is_included for asset in selected_equipments
    }
    location_tags = list(inclusion_by_tag)
    if not location_tags:
        # Nothing to update; avoids building a CASE without WHEN clauses
        return location_tags

    when_clauses = [
        (CalculationEquipmentResult.location_tag == location_tag, is_included)
        for location_tag, is_included in inclusion_by_tag.items()
    ]

    # Rows are selected on the same column the CASE branches test, so every
    # updated row matches exactly one branch.
    stmt = (
        update(CalculationEquipmentResult)
        .where(CalculationEquipmentResult.calculation_data_id == calculation_data_id)
        .where(CalculationEquipmentResult.location_tag.in_(location_tags))
        .values(
            {
                "is_included": case(
                    *when_clauses
                )  # Unpack the when clauses as separate arguments
            }
        )
    )
    await db.execute(stmt)
    await db.commit()
    return location_tags