import calendar
import copy
import datetime
from collections import defaultdict
from typing import Coroutine, Dict, List, Optional, Tuple
from uuid import UUID

import numpy as np
import requests
from fastapi import HTTPException, status
from sqlalchemy import and_, case, func, select, update
from sqlalchemy.orm import joinedload

from src.database.core import DbSession
from src.overhaul_activity.model import OverhaulActivity
from src.overhaul_activity.service import get_all_by_session_id
from src.overhaul_scope.service import get as get_scope, get_prev_oh
from src.scope_equipment_part.model import ScopeEquipmentPart
from src.sparepart.model import MasterSparePart
from src.utils import get_latest_numOfFail
from src.workorder.model import MasterWorkOrder

from .model import CalculationData, CalculationEquipmentResult, CalculationResult
from .schema import (
    CalculationResultsRead,
    CalculationSelectedEquipmentUpdate,
    CalculationTimeConstrainsParametersCreate,
    CalculationTimeConstrainsRead,
    OptimumResult,
)
from .utils import get_months_between

# Host of the reliability API used by every remote call in this module.
RELIABILITY_API_BASE_URL = "http://192.168.1.82:8000"

# Seconds to wait for the reliability API before a request is abandoned.
REQUEST_TIMEOUT_SECONDS = 10


def _auth_headers(token: str) -> Dict[str, str]:
    """Build the JSON + bearer-token headers sent to the reliability API."""
    return {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }


def _next_month_start(current):
    """Return *current* moved to the following month (year rolls over after December)."""
    if current.month == 12:
        return current.replace(year=current.year + 1, month=1)
    return current.replace(month=current.month + 1)


def _month_end(current) -> datetime.date:
    """Return the last calendar day of the month that contains *current*."""
    last_day = calendar.monthrange(current.year, current.month)[1]
    return datetime.date(current.year, current.month, last_day)


def _resolve_calculation_window(scope, prev_oh_scope) -> Tuple[datetime.datetime, datetime.datetime]:
    """Return the (start, end) datetimes a calculation covers.

    With a previous overhaul scope, the window runs from the day after that
    scope ended up to the start of the current scope. Without one, the
    window is simply the current scope's own start and end dates.
    """
    midnight = datetime.time.min
    if prev_oh_scope:
        start_date = datetime.datetime.combine(
            prev_oh_scope.end_date + datetime.timedelta(days=1), midnight
        )
        end_date = datetime.datetime.combine(scope.start_date, midnight)
    else:
        start_date = datetime.datetime.combine(scope.start_date, midnight)
        end_date = datetime.datetime.combine(scope.end_date, midnight)
    return start_date, end_date


class ReliabilityService:
    """Service class for handling reliability API calls."""

    def __init__(self, base_url: str = RELIABILITY_API_BASE_URL):
        self.base_url = base_url

    async def get_number_of_failures(self, location_tag, start_date, end_date, token, max_interval=24):
        """Fetch predicted failure counts and return one value per month end.

        Only items dated on the last day of a month are kept. Values keep
        their decimals (rounded to 3 places) because the series is treated
        as cumulative. Months missing from the response are filled by
        adding a fixed increment to the previous value.

        Args:
            location_tag: Equipment location tag used in the request URL.
            start_date: First date of the requested range.
            end_date: Last date of the requested range.
            token: Bearer token forwarded to the reliability API.
            max_interval: Number of months, counted from ``start_date``'s
                month, that must be present in the result.

        Returns:
            Dict mapping month-end ``datetime.date`` to the failure value,
            sorted by date.

        Raises:
            Exception: If the request fails, the body is not JSON, or the
                payload has no ``data`` list.
        """
        url_prediction = (
            f"{self.base_url}/reliability/main/number-of-failures/"
            f"{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
        )

        # NOTE: `requests` is synchronous, so this call blocks the event loop
        # for the duration of the request.
        try:
            response = requests.get(
                url_prediction,
                headers=_auth_headers(token),
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
            prediction_data = response.json()
        except (requests.RequestException, ValueError) as e:
            raise Exception(f"Failed to fetch or parse prediction data: {e}") from e

        if (
            not prediction_data
            or "data" not in prediction_data
            or not isinstance(prediction_data["data"], list)
        ):
            raise Exception("Invalid or empty prediction data format.")

        results = {}
        # The series is cumulative, so the last known value is carried forward.
        last_cumulative_value = 0

        for item in prediction_data["data"]:
            try:
                date = datetime.datetime.strptime(item["date"], "%d %b %Y")
                value = item.get("num_fail", 0)
                if date.date() != _month_end(date):
                    continue
                if value is not None and value > 0:
                    # Keep the decimals; do not truncate to int.
                    results[date.date()] = round(float(value), 3)
                    last_cumulative_value = float(value)
                else:
                    # No usable value: repeat the previous cumulative value.
                    results[date.date()] = last_cumulative_value
            except (KeyError, ValueError, TypeError):
                # Malformed item (missing/invalid date or non-numeric value).
                continue

        # Fill missing months by continuing the cumulative trend.
        monthly_increment = 0.05  # Tunable guess at the typical monthly increase.
        current = start_date.replace(day=1)
        for _ in range(max_interval):
            month_end = _month_end(current)
            if month_end not in results:
                last_cumulative_value += monthly_increment
                results[month_end] = round(last_cumulative_value, 3)
            else:
                last_cumulative_value = results[month_end]
            current = _next_month_start(current)

        return dict(sorted(results.items()))

    async def get_equipment_foh(self, location_tag: str, token: str) -> float:
        """Get forced outage hours for equipment.

        Returns the ``data.hours`` field of the API's ``mdt`` endpoint.

        Raises:
            Exception: If the request fails or the body is not JSON.
        """
        url = f"{self.base_url}/reliability/asset/mdt/{location_tag}"
        try:
            response = requests.get(
                url, headers=_auth_headers(token), timeout=REQUEST_TIMEOUT_SECONDS
            )
            response.raise_for_status()
            result = response.json()
            return result["data"]["hours"]
        except (requests.RequestException, ValueError) as e:
            raise Exception(f"Failed to fetch FOH data for {location_tag}: {e}") from e

    def _parse_failure_predictions(
        self,
        prediction_data: List[dict],
        start_date: datetime.date,
        max_interval: int,
    ) -> Dict[datetime.date, int]:
        """Parse and normalize failure prediction data.

        Keeps month-end items only, forces the start month to 0, clamps
        negative values to 0 and truncates to ``int``. Months missing within
        ``max_interval`` months of ``start_date`` are filled with 0.
        """
        results = {}

        for item in prediction_data:
            try:
                date = datetime.datetime.strptime(item["date"], "%d %b %Y").date()
                value = item.get("num_fail", 0)
                if date != _month_end(date):
                    continue
                if date.month == start_date.month and date.year == start_date.year:
                    results[date] = 0
                else:
                    results[date] = max(0, int(value)) if value is not None else 0
            except (KeyError, ValueError):
                continue

        # Fill missing months with 0.
        current = start_date.replace(day=1)
        for _ in range(max_interval):
            results.setdefault(_month_end(current), 0)
            current = _next_month_start(current)

        return dict(sorted(results.items()))


class SparePartsService:
    """Service class for spare parts management and procurement calculations."""

    def __init__(self, spare_parts_db: dict):
        # Maps spare part id -> {"data": <spare part entity>, "stock": <working copy>}.
        self.spare_parts_db = spare_parts_db

    def calculate_stock_at_date(self, sparepart_id: UUID, target_date: datetime.date):
        """Calculate projected stock for a spare part at a specific date.

        Starts from the working-copy stock and adds the quantity of every
        procurement whose date is on or before ``target_date``. Unknown
        spare parts yield 0.
        """
        if sparepart_id not in self.spare_parts_db:
            return 0

        spare_part = self.spare_parts_db[sparepart_id]
        projected_stock = spare_part["stock"]

        for procurement in spare_part["data"].sparepart_procurements:
            # The status value is used as the NAME of the attribute holding
            # the date to compare — presumably an arrival/ETA date per
            # status; TODO confirm against the procurement model.
            eta_date = getattr(procurement, procurement.status, None)
            if eta_date and eta_date <= target_date:
                projected_stock += procurement.quantity

        return projected_stock

    async def check_spare_parts_availability(
        self,
        db_session: DbSession,
        equipment: OverhaulActivity,
        overhaul_date: datetime.date,
    ) -> Tuple[bool, List[dict]]:
        """Check whether the spare parts for an equipment overhaul are available.

        For every part requirement of ``equipment.assetnum`` the projected
        stock at ``overhaul_date`` is compared with the required quantity;
        each shortage produces a procurement-cost entry.

        This is a pure what-if query: it does not reserve stock and never
        modifies the loaded spare part entities, so repeated simulation
        calls cannot leak stock changes into a later commit.

        Returns:
            ``(all_available, procurement_costs)`` where ``procurement_costs``
            has one dict per part that is short.

        Raises:
            Exception: If a required spare part is not in ``spare_parts_db``.
        """
        procurement_costs = []
        all_available = True

        requirements_query = select(ScopeEquipmentPart).where(
            ScopeEquipmentPart.assetnum == equipment.assetnum
        )
        requirements = (await db_session.execute(requirements_query)).scalars().all()

        for requirement in requirements:
            sparepart_id = requirement.sparepart_id
            quantity_needed = requirement.required_stock

            if sparepart_id not in self.spare_parts_db:
                raise Exception(f"Spare part {sparepart_id} not found in database")

            spare_part = self.spare_parts_db[sparepart_id]["data"]
            available_stock = self.calculate_stock_at_date(sparepart_id, overhaul_date)

            if available_stock >= quantity_needed:
                continue

            # Additional stock has to be procured to cover the shortage.
            shortage = quantity_needed - available_stock
            procurement_costs.append(
                {
                    "sparepart_id": str(sparepart_id),
                    "sparepart_name": spare_part.name,
                    "quantity": shortage,
                    "cost_per_unit": spare_part.cost_per_stock,
                    "total_cost": shortage * spare_part.cost_per_stock,
                    "description": f"Insufficient projected stock for {spare_part.name} on {overhaul_date} (need: {quantity_needed}, projected: {available_stock})",
                }
            )
            all_available = False

        return all_available, procurement_costs


class OverhaulCalculator:
    """Main calculator for overhaul cost optimization."""

    def __init__(
        self,
        reliability_service: ReliabilityService,
        spare_parts_service: SparePartsService,
    ):
        self.reliability_service = reliability_service
        self.spare_parts_service = spare_parts_service

    async def simulate_equipment_overhaul(
        self,
        db_session: DbSession,
        equipment,
        preventive_cost: float,
        predicted_failures: Dict[datetime.date, int],
        interval_months: int,
        forced_outage_hours: float,
        start_date: datetime.date,
        total_months: int = 24,
    ):
        """Simulate an overhaul strategy for one equipment, including spare parts costs.

        Walks ``total_months`` months. Whenever ``interval_months`` months
        have passed since the last overhaul, a preventive overhaul is
        charged and the failure clock resets. Each month is charged the
        expected failures for its age plus the forced-outage service cost.

        Returns:
            Dict with ``interval_months``, per-month averaged
            ``preventive_cost`` and ``corrective_cost``, the un-averaged
            ``procurement_cost``, their sum as ``total_cost``, and
            ``procurement_details``.

        Raises:
            ValueError: If ``total_months`` is not positive.
        """
        if total_months <= 0:
            raise ValueError("total_months must be positive")

        total_preventive_cost = 0
        total_corrective_cost = 0
        total_procurement_cost = 0
        all_procurement_details = []
        months_since_overhaul = 0

        # Re-key the date-indexed predictions by month offset (0, 1, 2, ...).
        failures_by_month = {
            i: val for i, (_, val) in enumerate(sorted(predicted_failures.items()))
        }
        cost_per_failure = equipment.material_cost

        # The target date depends only on the interval, so availability is
        # queried once rather than once per simulated month.
        overhaul_target_date = self._add_months_to_date(start_date, interval_months)
        parts_available, procurement_costs = (
            await self.spare_parts_service.check_spare_parts_availability(
                db_session, equipment, overhaul_target_date
            )
        )
        shortage_cost = sum(pc["total_cost"] for pc in procurement_costs)

        for _ in range(total_months):
            if months_since_overhaul >= interval_months:
                # Perform the preventive overhaul and reset the failure clock.
                total_preventive_cost += preventive_cost
                months_since_overhaul = 0

            # No failures are expected in the month right after an overhaul.
            if months_since_overhaul == 0:
                expected_failures = 0
            else:
                expected_failures = failures_by_month.get(months_since_overhaul, 0)

            equivalent_force_derated_hours = 0  # Placeholder; can be enhanced later.
            total_corrective_cost += (expected_failures * cost_per_failure) + (
                (forced_outage_hours + equivalent_force_derated_hours)
                * equipment.service_cost
            )

            months_since_overhaul += 1

            # NOTE(review): the shortage is charged for every simulated month
            # (existing behaviour) — confirm this is intended rather than
            # once per overhaul.
            if not parts_available:
                total_procurement_cost += shortage_cost
                all_procurement_details.extend(procurement_costs)

        monthly_preventive_cost = total_preventive_cost / total_months
        monthly_corrective_cost = total_corrective_cost / total_months
        monthly_total_cost = (
            monthly_preventive_cost + monthly_corrective_cost + total_procurement_cost
        )

        return {
            "interval_months": interval_months,
            "preventive_cost": monthly_preventive_cost,
            "corrective_cost": monthly_corrective_cost,
            "procurement_cost": total_procurement_cost,
            "total_cost": monthly_total_cost,
            "procurement_details": all_procurement_details,
        }

    async def find_optimal_overhaul_interval(
        self,
        db_session: DbSession,
        equipment,
        preventive_cost: float,
        predicted_failures: Dict[datetime.date, int],
        forced_outage_hours: float,
        start_date: datetime.date,
        max_interval: int = 24,
    ):
        """Find the optimal overhaul interval by testing every interval 1..max_interval.

        Returns:
            ``(optimal_result, all_results)`` where ``optimal_result`` is the
            simulation with the lowest ``total_cost``.

        Raises:
            ValueError: If ``max_interval`` is less than 1.
        """
        if max_interval < 1:
            raise ValueError("max_interval must be at least 1")

        all_results = []
        for interval in range(1, max_interval + 1):
            result = await self.simulate_equipment_overhaul(
                db_session=db_session,
                equipment=equipment,
                preventive_cost=preventive_cost,
                predicted_failures=predicted_failures,
                interval_months=interval,
                forced_outage_hours=forced_outage_hours,
                start_date=start_date,
                total_months=max_interval,
            )
            all_results.append(result)

        optimal_result = min(all_results, key=lambda x: x["total_cost"])
        return optimal_result, all_results

    async def calculate_fleet_optimization(
        self,
        db_session: DbSession,
        equipments: list,
        overhaul_cost: float,
        start_date: datetime.date,
        end_date: datetime.date,
        calculation,
        token: str,
    ) -> Dict:
        """Calculate the optimization for an entire fleet of equipment.

        The overhaul cost is split evenly across the fleet. For every
        equipment each interval is simulated, one
        ``CalculationEquipmentResult`` is stored, and per-interval costs
        are summed to find the fleet-wide optimum. The results and
        ``calculation.optimum_oh_day`` are committed.

        Raises:
            ValueError: If ``equipments`` is empty or the date range spans
                less than one month.
        """
        if not equipments:
            raise ValueError("At least one equipment is required for fleet optimization")

        max_interval = self._get_months_between(start_date, end_date)
        if max_interval < 1:
            raise ValueError("The calculation window must span at least one month")

        preventive_cost_per_equipment = overhaul_cost / len(equipments)

        fleet_results = []
        total_corrective_costs = np.zeros(max_interval)
        total_preventive_costs = np.zeros(max_interval)
        total_procurement_costs = np.zeros(max_interval)

        for equipment in equipments:
            location_tag = equipment.equipment.location_tag

            predicted_failures = await self.reliability_service.get_number_of_failures(
                location_tag=location_tag,
                start_date=start_date,
                end_date=end_date,
                token=token,
                # Pad the series to the simulated window rather than the 24-month default.
                max_interval=max_interval,
            )
            forced_outage_hours = await self.reliability_service.get_equipment_foh(
                location_tag=location_tag, token=token
            )

            optimal_result, all_results = await self.find_optimal_overhaul_interval(
                db_session=db_session,
                equipment=equipment,
                preventive_cost=preventive_cost_per_equipment,
                predicted_failures=predicted_failures,
                forced_outage_hours=forced_outage_hours,
                start_date=start_date,
                max_interval=max_interval,
            )

            corrective_costs = [r["corrective_cost"] for r in all_results]
            preventive_costs = [r["preventive_cost"] for r in all_results]
            procurement_costs = [r["procurement_cost"] for r in all_results]
            procurement_details = [r["procurement_details"] for r in all_results]

            fleet_results.append(
                CalculationEquipmentResult(
                    corrective_costs=corrective_costs,
                    overhaul_costs=preventive_costs,
                    procurement_costs=procurement_costs,
                    daily_failures=list(predicted_failures.values()),
                    assetnum=equipment.assetnum,
                    material_cost=equipment.material_cost,
                    service_cost=equipment.service_cost,
                    optimum_day=optimal_result["interval_months"],
                    calculation_data_id=calculation.id,
                    master_equipment=equipment.equipment,
                    procurement_details=procurement_details,
                )
            )

            total_corrective_costs += np.array(corrective_costs)
            total_preventive_costs += np.array(preventive_costs)
            total_procurement_costs += np.array(procurement_costs)

        total_costs = (
            total_corrective_costs + total_preventive_costs + total_procurement_costs
        )
        # Convert NumPy scalars to plain Python numbers before they reach the
        # ORM object and the response payload.
        fleet_optimal_index = int(np.argmin(total_costs))
        calculation.optimum_oh_day = fleet_optimal_index + 1

        db_session.add_all(fleet_results)
        await db_session.commit()

        return {
            "id": calculation.id,
            "fleet_results": fleet_results,
            "fleet_optimal_interval": fleet_optimal_index + 1,
            "fleet_optimal_cost": float(total_costs[fleet_optimal_index]),
            "total_corrective_costs": total_corrective_costs.tolist(),
            "total_preventive_costs": total_preventive_costs.tolist(),
            "total_procurement_costs": total_procurement_costs.tolist(),
        }

    def _add_months_to_date(self, start_date: datetime.date, months: int) -> datetime.date:
        """Helper method to add months to a date.

        The day of month is clamped to the length of the target month, so
        e.g. 31 January + 1 month gives the last day of February instead of
        raising ``ValueError``.
        """
        month_index = start_date.month - 1 + months
        year = start_date.year + month_index // 12
        month = month_index % 12 + 1
        day = min(start_date.day, calendar.monthrange(year, month)[1])
        return datetime.date(year, month, day)

    def _get_months_between(self, start_date: datetime.date, end_date: datetime.date) -> int:
        """Calculate the number of calendar months between two dates (days are ignored)."""
        return (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month)


async def run_simulation(*, db_session: DbSession, calculation: CalculationData, token: str):
    """Run the fleet optimization for the overhaul session of *calculation*.

    Loads the session's equipment, its scope and previous scope, the
    calculation parameters and all spare parts, then delegates to
    ``OverhaulCalculator.calculate_fleet_optimization``.
    """
    equipments = await get_all_by_session_id(
        db_session=db_session, overhaul_session_id=calculation.overhaul_session_id
    )
    scope = await get_scope(
        db_session=db_session, overhaul_session_id=calculation.overhaul_session_id
    )
    prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope)
    calculation_data = await get_calculation_data_by_id(
        db_session=db_session, calculation_id=calculation.id
    )

    spareparts_result = await db_session.execute(select(MasterSparePart))
    # "stock" is a detached working copy so simulations never touch the entity.
    spareparts = {
        sparepart.id: {"data": sparepart, "stock": copy.copy(sparepart.stock)}
        for sparepart in spareparts_result.scalars().all()
    }

    optimum_calculator_service = OverhaulCalculator(
        ReliabilityService(), SparePartsService(spareparts)
    )

    start_date, end_date = _resolve_calculation_window(scope, prev_oh_scope)

    return await optimum_calculator_service.calculate_fleet_optimization(
        db_session=db_session,
        equipments=equipments,
        start_date=start_date,
        end_date=end_date,
        overhaul_cost=calculation_data.parameter.overhaul_cost,
        calculation=calculation,
        token=token,
    )


async def get_corrective_cost_time_chart(
    material_cost: float,
    service_cost: float,
    location_tag: str,
    token,
    start_date: datetime.datetime,
    end_date: datetime.datetime,
) -> Tuple[np.ndarray, np.ndarray]:
    """Build the monthly failure series and its corrective cost for one location.

    Historical failures (up to today) are accumulated per month and turned
    into a running total. Predicted values replace or extend the series for
    dates after today. Gaps are filled from the nearest earlier month, else
    the nearest later month, else a default. Cost per failure is
    ``(material_cost + service_cost) / latest_num``.

    Returns:
        ``(corrective_costs, monthly_failure)`` as NumPy arrays in
        chronological order.

    Raises:
        Exception: If the historical data cannot be fetched or processed,
            or the cost multiplication fails.
    """
    today = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

    monthly_data = {}
    latest_num = 1

    # Historical data: any portion of the range before or including today.
    historical_start = start_date if start_date <= today else None
    historical_end = min(today, end_date)

    if historical_start and historical_start <= historical_end:
        url_history = (
            f"{RELIABILITY_API_BASE_URL}/reliability/main/failures/{location_tag}/"
            f"{historical_start.strftime('%Y-%m-%d')}/{historical_end.strftime('%Y-%m-%d')}"
        )
        try:
            response = requests.get(
                url_history,
                headers=_auth_headers(token),
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            history_data = response.json()

            # Accumulate failures per month.
            history_dict = {}
            for item in history_data["data"]:
                date = datetime.datetime.strptime(item["date"], "%d %b %Y")
                month_key = datetime.datetime(date.year, date.month, 1)
                history_dict.setdefault(month_key, 0)
                if item["num_fail"] is not None:
                    history_dict[month_key] += item["num_fail"]

            sorted_months = sorted(history_dict)
            if sorted_months:
                failures = np.array([history_dict[month] for month in sorted_months])
                cum_failure = np.cumsum(failures)
                monthly_data.update(
                    {
                        month_key: int(total)
                        for month_key, total in zip(sorted_months, cum_failure)
                    }
                )
                # Use the last month's failures as the divisor, minimum 1.
                latest_num = max(1, failures[-1])
        except Exception as e:
            raise Exception(f"Error fetching historical data: {e}") from e

    if end_date >= start_date:
        url_prediction = (
            f"{RELIABILITY_API_BASE_URL}/reliability/main/number-of-failures/{location_tag}/"
            f"{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
        )
        try:
            response = requests.get(
                url_prediction,
                headers=_auth_headers(token),
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            prediction_data = response.json()

            if prediction_data["data"]:
                for item in prediction_data["data"]:
                    date = datetime.datetime.strptime(item["date"], "%d %b %Y")
                    # Predictions only apply to dates after today.
                    if date > today:
                        month_key = datetime.datetime(date.year, date.month, 1)
                        monthly_data[month_key] = (
                            item["num_fail"] if item["num_fail"] is not None else 0
                        )

                last_prediction = prediction_data["data"][-1]["num_fail"]
                if last_prediction is not None:
                    latest_num = max(1, round(last_prediction))
        except Exception as e:
            # Predictions are best-effort: report and continue with what we have.
            print(f"Error fetching prediction data: {e}")

    # Fill in any missing months in the range.
    current_date = datetime.datetime(start_date.year, start_date.month, 1)
    end_month = datetime.datetime(end_date.year, end_date.month, 1)

    while current_date <= end_month:
        if current_date not in monthly_data:
            prev_months = [m for m in monthly_data if m < current_date]
            future_months = [m for m in monthly_data if m > current_date]
            if prev_months:
                # Carry the most recent earlier month forward.
                monthly_data[current_date] = monthly_data[max(prev_months)]
            elif future_months:
                # Nothing earlier: borrow the earliest later month.
                monthly_data[current_date] = monthly_data[min(future_months)]
            else:
                # No data at all: fall back to the default.
                monthly_data[current_date] = latest_num
        current_date = _next_month_start(current_date)

    complete_data = [monthly_data[month] for month in sorted(monthly_data)]

    if latest_num < 1:
        raise ValueError("Number of failures cannot be negative", latest_num)

    monthly_failure = np.array(complete_data)
    cost_per_failure = (material_cost + service_cost) / latest_num

    try:
        corrective_costs = monthly_failure * cost_per_failure
    except Exception as e:
        raise Exception(
            f"Error calculating corrective costs: {monthly_failure}", location_tag
        ) from e

    return corrective_costs, monthly_failure


def get_overhaul_cost_by_time_chart(
    overhaul_cost: float,
    months_num: int,
    numEquipments: int,
    decay_base: float = 1.01,
) -> np.ndarray:
    """Return the per-equipment overhaul cost spread over 1..months_num months.

    Element ``k`` (0-based) is ``(overhaul_cost / numEquipments) / (k + 1)``.
    ``decay_base`` is accepted for interface compatibility but not used.

    Raises:
        ValueError: If ``overhaul_cost`` is negative, or ``months_num`` or
            ``numEquipments`` is not positive.
    """
    if overhaul_cost < 0:
        raise ValueError("Overhaul cost cannot be negative")
    if months_num <= 0:
        raise ValueError("months_num must be positive")
    if numEquipments <= 0:
        raise ValueError("numEquipments must be positive")

    rate = np.arange(1, months_num + 1)
    cost_per_equipment = overhaul_cost / numEquipments
    return cost_per_equipment / rate


async def create_param_and_data(
    *,
    db_session: DbSession,
    calculation_param_in: CalculationTimeConstrainsParametersCreate,
    created_by: str,
    parameter_id: Optional[UUID] = None,
):
    """Create a calculation record together with its parameters.

    Raises:
        HTTPException: 400 if ``calculation_param_in.ohSessionId`` is missing.
    """
    if calculation_param_in.ohSessionId is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="overhaul_session_id is required",
        )

    return await CalculationData.create_with_param(
        db=db_session,
        overhaul_session_id=calculation_param_in.ohSessionId,
        avg_failure_cost=calculation_param_in.costPerFailure,
        overhaul_cost=calculation_param_in.overhaulCost,
        created_by=created_by,
        params_id=parameter_id,
    )


async def get_calculation_result(db_session: DbSession, calculation_id: str):
    """Aggregate the stored equipment results of a calculation per month.

    For every month of the calculation window, sums the costs and failures
    of all included equipment and collects their procurement details.

    Raises:
        HTTPException: 404 if the calculation or its overhaul scope does
            not exist.
    """
    scope_calculation = await get_calculation_data_by_id(
        db_session=db_session, calculation_id=calculation_id
    )
    if not scope_calculation:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    scope_overhaul = await get_scope(
        db_session=db_session, overhaul_session_id=scope_calculation.overhaul_session_id
    )
    if not scope_overhaul:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope_overhaul)
    start_date, end_date = _resolve_calculation_window(scope_overhaul, prev_oh_scope)
    months_num = get_months_between(start_date, end_date)

    included = [eq for eq in scope_calculation.equipment_results if eq.is_included]

    calculation_results = []
    for i in range(months_num):
        result = {
            "overhaul_cost": 0,
            "corrective_cost": 0,
            "procurement_cost": 0,
            "num_failures": 0,
            "day": i + 1,
            "procurement_details": {},
        }
        # TODO: add risk cost =
        #   ((downtime_1 * MW loss_1) + ... + (downtime_N * MW loss_N))
        #   * electricity price (efficiency HL app).
        for eq in included:
            result["corrective_cost"] += float(eq.corrective_costs[i])
            result["overhaul_cost"] += float(eq.overhaul_costs[i])
            result["num_failures"] += int(eq.daily_failures[i])

            # Results stored without procurement data leave these fields
            # empty, so treat them as "no procurement".
            procurement_costs = eq.procurement_costs or []
            procurement_details = eq.procurement_details or []
            if i < len(procurement_costs):
                result["procurement_cost"] += float(procurement_costs[i])
            if i < len(procurement_details) and procurement_details[i]:
                result["procurement_details"][eq.assetnum] = procurement_details[i]

        calculation_results.append(CalculationResultsRead(**result))

    return CalculationTimeConstrainsRead(
        id=scope_calculation.id,
        reference=scope_calculation.overhaul_session_id,
        scope=scope_overhaul.type,
        results=calculation_results,
        optimum_oh=scope_calculation.optimum_oh_day,
        equipment_results=scope_calculation.equipment_results,
    )


async def get_calculation_data_by_id(
    db_session: DbSession, calculation_id
) -> CalculationData:
    """Load a calculation with its equipment results and parameter eagerly joined."""
    stmt = (
        select(CalculationData)
        .filter(CalculationData.id == calculation_id)
        .options(
            joinedload(CalculationData.equipment_results),
            joinedload(CalculationData.parameter),
        )
    )
    result = await db_session.execute(stmt)
    # unique() is required because the joined collection repeats the parent row.
    return result.unique().scalar()


async def get_calculation_by_assetnum(
    *, db_session: DbSession, assetnum: str, calculation_id: str
):
    """Return the equipment result of one asset within one calculation, if any."""
    stmt = (
        select(CalculationEquipmentResult)
        .where(CalculationEquipmentResult.assetnum == assetnum)
        .where(CalculationEquipmentResult.calculation_data_id == calculation_id)
    )
    result = await db_session.execute(stmt)
    return result.scalar()


async def get_number_of_failures(location_tag, start_date, end_date, token, max_interval=24):
    """Fetch predicted failure counts as integers, one per month end.

    The last item of the response is always recorded (rounded). Month-end
    items are then recorded with the start month forced to 0 and
    non-positive or missing values stored as 0. Months missing within
    ``max_interval`` months of ``start_date`` are filled with 0.

    Returns:
        Dict mapping ``datetime.date`` to an integer failure count, sorted
        by date.

    Raises:
        Exception: If the request fails, the body is not JSON, or the
            payload has no ``data`` list.
    """
    url_prediction = (
        f"{RELIABILITY_API_BASE_URL}/reliability/main/number-of-failures/"
        f"{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
    )

    try:
        response = requests.get(
            url_prediction,
            headers=_auth_headers(token),
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        prediction_data = response.json()
    except (requests.RequestException, ValueError) as e:
        raise Exception(f"Failed to fetch or parse prediction data: {e}") from e

    if (
        not prediction_data
        or "data" not in prediction_data
        or not isinstance(prediction_data["data"], list)
    ):
        raise Exception("Invalid or empty prediction data format.")

    results = {}
    items = prediction_data["data"]

    # Always keep the final data point, even if it is not a month end.
    if items:
        last_data = items[-1]
        last_data_date = datetime.datetime.strptime(last_data["date"], "%d %b %Y")
        results[last_data_date.date()] = (
            round(last_data["num_fail"]) if last_data["num_fail"] is not None else 0
        )

    for item in items:
        try:
            date = datetime.datetime.strptime(item["date"], "%d %b %Y")
            value = item.get("num_fail", 0)
            if date.date() != _month_end(date):
                continue
            if date.month == start_date.month and date.year == start_date.year:
                results[date.date()] = 0
            else:
                # A null prediction counts as zero failures.
                results[date.date()] = 0 if value is None or value <= 0 else int(value)
        except (KeyError, ValueError):
            continue  # skip invalid items

    # Fill missing months with 0.
    current = start_date.replace(day=1)
    for _ in range(max_interval):
        results.setdefault(_month_end(current), 0)
        current = _next_month_start(current)

    return dict(sorted(results.items()))


async def get_equipment_foh(
    location_tag: str,
    token: str,
):
    """Return the ``data.hours`` value of the reliability API's ``mdt`` endpoint.

    Raises:
        Exception: If the request fails or the body is not JSON.
    """
    url_mdt = f"{RELIABILITY_API_BASE_URL}/reliability/asset/mdt/{location_tag}"

    try:
        response = requests.get(
            url_mdt,
            headers=_auth_headers(token),
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        result = response.json()
    except (requests.RequestException, ValueError) as e:
        raise Exception(f"Failed to fetch or parse mdt data: {e}") from e

    return result["data"]["hours"]


def simulate_equipment_overhaul(equipment, preventive_cost, predicted_num_failures, interval_months, forced_outage_hours_value, total_months=24):
    """Simulate an overhaul strategy for one equipment and return its monthly costs.

    Walks ``total_months`` months; every ``interval_months`` months a
    preventive overhaul is charged and the failure clock resets. Each month
    is charged the expected failures for its age (none in the month right
    after an overhaul) plus the forced-outage service cost.

    Returns:
        Dict with ``interval`` and per-month averaged ``preventive_cost``,
        ``corrective_cost`` and ``total_cost``.

    Raises:
        ValueError: If ``total_months`` is not positive.
    """
    if total_months <= 0:
        raise ValueError("total_months must be positive")

    total_preventive_cost = 0
    total_corrective_cost = 0
    months_since_overhaul = 0

    # Re-key the date-indexed predictions by month offset (0, 1, 2, ...).
    failures_by_month = {
        i: val for i, (_, val) in enumerate(sorted(predicted_num_failures.items()))
    }
    cost_per_failure = equipment.material_cost
    equivalent_force_derated_hours = 0
    outage_cost = (
        forced_outage_hours_value + equivalent_force_derated_hours
    ) * equipment.service_cost

    for _ in range(total_months):
        if months_since_overhaul >= interval_months:
            # Perform the preventive overhaul and reset the failure clock.
            total_preventive_cost += preventive_cost
            months_since_overhaul = 0

        if months_since_overhaul == 0:
            expected_failures = 0
        else:
            expected_failures = failures_by_month.get(months_since_overhaul, 0)

        total_corrective_cost += (expected_failures * cost_per_failure) + outage_cost
        months_since_overhaul += 1

    # Normalise to per-month costs so intervals can be compared.
    monthly_preventive_cost = total_preventive_cost / total_months
    monthly_corrective_cost = total_corrective_cost / total_months

    return {
        'interval': interval_months,
        'preventive_cost': monthly_preventive_cost,
        'corrective_cost': monthly_corrective_cost,
        'total_cost': monthly_preventive_cost + monthly_corrective_cost,
    }
async def create_calculation_result_service(
    db_session: DbSession, calculation: CalculationData, token: str
) -> CalculationTimeConstrainsRead:
    """Run the overhaul-interval simulation for every equipment of a session.

    For each equipment returned by ``get_all_by_session_id`` the predicted
    failures and forced outage hours are fetched, every overhaul interval
    from 1 to ``max_interval`` months is simulated, and a
    ``CalculationEquipmentResult`` row is created. The per-interval costs of
    all equipment are summed and the interval with the lowest combined
    corrective + overhaul cost is stored on ``calculation.optimum_oh_day``.

    Args:
        db_session: Database session used to load data and persist results.
        calculation: Calculation record whose session is being evaluated.
        token: Bearer token forwarded to the reliability service calls.

    Returns:
        CalculationTimeConstrainsRead holding the optimum result and the
        per-equipment results (``results`` is left empty).

    Raises:
        HTTPException: 404 if the session has no equipment, 400 if the
            calculation window spans less than one month.
        Exception: With the equipment assetnum as its argument when fewer
            failure predictions than ``max_interval`` are available.
    """
    session_id = calculation.overhaul_session_id

    # Get all equipment for this calculation session
    equipments = await get_all_by_session_id(
        db_session=db_session, overhaul_session_id=session_id
    )
    scope = await get_scope(db_session=db_session, overhaul_session_id=session_id)
    prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope)
    calculation_data = await get_calculation_data_by_id(
        db_session=db_session, calculation_id=calculation.id
    )

    if not equipments:
        # The overhaul cost is split evenly per equipment; nothing to split.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No equipment found for this overhaul session.",
        )

    # Set the date range for the calculation
    if prev_oh_scope:
        # From the day after the previous overhaul ended up to the start of
        # the current one.
        start_date = datetime.datetime.combine(
            prev_oh_scope.end_date + datetime.timedelta(days=1), datetime.time.min
        )
        end_date = datetime.datetime.combine(scope.start_date, datetime.time.min)
    else:
        # No previous overhaul: use the current scope's own window.
        start_date = datetime.datetime.combine(scope.start_date, datetime.time.min)
        end_date = datetime.datetime.combine(scope.end_date, datetime.time.min)

    max_interval = get_months_between(start_date, end_date)
    if max_interval < 1:
        # With no interval to evaluate there is no optimum to pick.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Calculation window must span at least one month.",
        )

    overhaul_cost = calculation_data.parameter.overhaul_cost / len(equipments)

    # Store results for each equipment
    results = []
    total_corrective_costs = np.zeros(max_interval)
    total_overhaul_costs = np.zeros(max_interval)
    total_daily_failures = np.zeros(max_interval)

    # Calculate for each equipment
    for eq in equipments:
        location_tag = eq.equipment.location_tag

        predicted_num_failures = await get_number_of_failures(
            location_tag=location_tag,
            start_date=start_date,
            end_date=end_date,
            token=token
        )
        foh_value = await get_equipment_foh(
            location_tag=location_tag,
            token=token
        )

        daily_failures = list(predicted_num_failures.values())
        if len(daily_failures) < max_interval:
            # Not enough predictions to cover every simulated interval.
            raise Exception(eq.equipment.assetnum)

        equipment_results = [
            simulate_equipment_overhaul(
                eq,
                overhaul_cost,
                predicted_num_failures,
                interval,
                foh_value,
                total_months=max_interval,
            )
            for interval in range(1, max_interval + 1)
        ]
        corrective_costs = [item['corrective_cost'] for item in equipment_results]
        overhaul_costs = [item['preventive_cost'] for item in equipment_results]
        optimal_result = min(equipment_results, key=lambda x: x['total_cost'])

        results.append(
            CalculationEquipmentResult(
                corrective_costs=corrective_costs,
                overhaul_costs=overhaul_costs,
                daily_failures=daily_failures,
                assetnum=eq.assetnum,
                material_cost=eq.material_cost,
                service_cost=eq.service_cost,
                optimum_day=optimal_result['interval'],
                calculation_data_id=calculation.id,
                master_equipment=eq.equipment,
            )
        )

        total_corrective_costs += np.array(corrective_costs)
        total_overhaul_costs += np.array(overhaul_costs)
        # The prediction dict may hold more than max_interval entries; only
        # the first max_interval line up with the accumulator's shape.
        total_daily_failures += np.array(daily_failures[:max_interval])

    db_session.add_all(results)

    total_costs_point = total_corrective_costs + total_overhaul_costs

    # Calculate optimum points using total costs
    optimum_oh_index = np.argmin(total_costs_point)
    numbers_of_failure = sum(total_daily_failures[:optimum_oh_index])

    optimum = OptimumResult(
        overhaul_cost=float(total_overhaul_costs[optimum_oh_index]),
        corrective_cost=float(total_corrective_costs[optimum_oh_index]),
        num_failures=int(numbers_of_failure),
        days=int(optimum_oh_index + 1),
    )

    calculation.optimum_oh_day = optimum.days

    await db_session.commit()

    # Return results including individual equipment data
    return CalculationTimeConstrainsRead(
        id=calculation.id,
        reference=calculation.overhaul_session_id,
        scope=scope.type,
        results=[],
        optimum_oh=optimum,
        equipment_results=results,
    )


async def get_calculation_by_reference_and_parameter(
    *, db_session: DbSession, calculation_reference_id, parameter_id
):
    """Look up a ``CalculationData`` row by reference id and parameter id.

    Args:
        db_session: Database session used to run the query.
        calculation_reference_id: Value matched against ``reference_id``.
        parameter_id: Value matched against ``parameter_id``.

    Returns:
        The first matching ``CalculationData``, or ``None`` if none exists.
    """
    query = select(CalculationData).filter(
        and_(
            CalculationData.reference_id == calculation_reference_id,
            CalculationData.parameter_id == parameter_id,
        )
    )
    rows = await db_session.execute(query)

    return rows.scalar()


async def get_calculation_result_by_day(
    *, db_session: DbSession, calculation_id, simulation_day
):
    """Look up the ``CalculationResult`` of a calculation for a given day.

    Args:
        db_session: Database session used to run the query.
        calculation_id: Value matched against ``calculation_data_id``.
        simulation_day: Value matched against ``day``.

    Returns:
        The first matching ``CalculationResult``, or ``None`` if none exists.
    """
    query = select(CalculationResult).filter(
        and_(
            CalculationResult.day == simulation_day,
            CalculationResult.calculation_data_id == calculation_id,
        )
    )
    rows = await db_session.execute(query)

    return rows.scalar()


async def get_avg_cost_by_asset(*, db_session: DbSession, assetnum: str):
    """Return the average ``total_cost_max`` of an asset's work orders.

    Args:
        db_session: Database session used to run the query.
        assetnum: Asset number whose ``MasterWorkOrder`` rows are averaged.

    Returns:
        The value of the aggregate as returned by ``scalar_one_or_none()``.
    """
    average_cost = func.avg(MasterWorkOrder.total_cost_max).label("average_cost")
    query = select(average_cost).where(MasterWorkOrder.assetnum == assetnum)

    rows = await db_session.execute(query)
    return rows.scalar_one_or_none()


async def bulk_update_equipment(
    *,
    db: DbSession,
    selected_equipments: List[CalculationSelectedEquipmentUpdate],
    calculation_data_id: UUID,
):
    """Set ``is_included`` for several equipment results in one UPDATE.

    Builds a single ``UPDATE ... SET is_included = CASE ... END`` statement
    restricted to the given calculation and the listed asset numbers, then
    commits it.

    Args:
        db: Database session used to execute and commit the update.
        selected_equipments: Items carrying ``assetnum`` and ``is_included``.
            When an assetnum appears more than once the last entry wins.
        calculation_data_id: Calculation whose equipment results are updated.

    Returns:
        List of the asset numbers that were targeted (empty when
        ``selected_equipments`` is empty, in which case nothing is executed).
    """
    # Map each assetnum to the inclusion flag it should receive.
    inclusion_by_asset = {
        item.assetnum: item.is_included for item in selected_equipments
    }
    assetnums = list(inclusion_by_asset)

    if not assetnums:
        # A CASE expression needs at least one WHEN clause; with no
        # selection there is nothing to update.
        return assetnums

    # One (condition, value) pair per asset for the CASE expression.
    when_clauses = [
        (CalculationEquipmentResult.assetnum == assetnum, is_included)
        for assetnum, is_included in inclusion_by_asset.items()
    ]

    stmt = (
        update(CalculationEquipmentResult)
        .where(CalculationEquipmentResult.calculation_data_id == calculation_data_id)
        .where(CalculationEquipmentResult.assetnum.in_(assetnums))
        .values({"is_included": case(*when_clauses)})
    )

    await db.execute(stmt)
    await db.commit()

    return assetnums