From dcf2428c01b1883104ede5fe2480b18f6a98ab86 Mon Sep 17 00:00:00 2001 From: Cizz22 Date: Tue, 20 May 2025 10:57:10 +0700 Subject: [PATCH] fix oh sim --- .env | 2 +- src/calculation_time_constrains/service.py | 761 +++++++++++++-------- 2 files changed, 459 insertions(+), 304 deletions(-) diff --git a/.env b/.env index f8dd73c..08afb8b 100644 --- a/.env +++ b/.env @@ -1,6 +1,6 @@ ENV=development LOG_LEVEL=ERROR -PORT=3020 +PORT=3021 HOST=0.0.0.0 DATABASE_HOSTNAME=192.168.1.85 diff --git a/src/calculation_time_constrains/service.py b/src/calculation_time_constrains/service.py index baa75d0..545ef83 100644 --- a/src/calculation_time_constrains/service.py +++ b/src/calculation_time_constrains/service.py @@ -1,6 +1,7 @@ import datetime from typing import Coroutine, List, Optional, Tuple from uuid import UUID +import calendar import numpy as np import requests @@ -59,93 +60,84 @@ from .utils import get_months_between # results = np.where(np.isfinite(results), results, 0) # return results -async def get_corrective_cost_time_chart( - material_cost: float, - service_cost: float, - location_tag: str, - token, - start_date: datetime.datetime, - end_date: datetime.datetime -) -> Tuple[np.ndarray, np.ndarray]: - days_difference = (end_date - start_date).days - - url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}" +# async def get_corrective_cost_time_chart( +# material_cost: float, +# service_cost: float, +# location_tag: str, +# token, +# start_date: datetime.datetime, +# end_date: datetime.datetime +# ) -> Tuple[np.ndarray, np.ndarray]: +# days_difference = (end_date - start_date).days +# url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}" - try: - response = requests.get( - url, - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {token}", - }, - ) - data = response.json() - latest_num = data["data"][-1]["num_fail"] - if not latest_num: - latest_num = 1 +# try: +# response = requests.get( +# url, +# headers={ +# "Content-Type": "application/json", +# "Authorization": f"Bearer {token}", +# }, +# ) +# data = response.json() +# latest_num = data["data"][-1]["num_fail"] - # Create a complete date range for 2025 - # start_date = datetime.datetime(2025, 1, 1) - # date_range = [start_date + datetime.timedelta(days=x) for x in range(days_difference)] +# if not latest_num: +# latest_num = 1 - # Create a dictionary of existing data - data_dict = { - datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"] - for item in data["data"] - } +# # Create a complete date range for 2025 +# # start_date = datetime.datetime(2025, 1, 1) +# # date_range = [start_date + datetime.timedelta(days=x) for x in range(days_difference)] - # Initialize all months in the range with 0 - monthly_data = {} - current_date = start_date.replace(day=1) - while current_date <= end_date: - monthly_data[current_date] = 0 - # Move to next month - if current_date.month == 12: - current_date = datetime.datetime(current_date.year + 1, 1, 1) - else: - current_date = datetime.datetime(current_date.year, current_date.month + 1, 1) +# # Create a dictionary of existing data +# data_dict = { +# datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"] +# for item in data["data"] +# } - # Get the last day's value for each month - for date in data_dict.keys(): - month_key = datetime.datetime(date.year, date.month, 1) - if month_key in monthly_data and data_dict[date] is not None: - # Update only if the value is higher (to get the last day's value) - monthly_data[month_key] = max(monthly_data[month_key], data_dict[date]) +# # Initialize all months in the range with 0 +# monthly_data = {} +# current_date = start_date.replace(day=1) +# while current_date <= end_date: +# monthly_data[current_date] = 0 +# # Move to next month +# if current_date.month == 12: +# current_date = datetime.datetime(current_date.year + 1, 1, 1) +# else: +# current_date = datetime.datetime(current_date.year, current_date.month + 1, 1) - # Convert to list maintaining chronological order - complete_data = [] - for month in sorted(monthly_data.keys()): - complete_data.append(monthly_data[month]) +# # Get the last day's value for each month +# for date in data_dict.keys(): +# month_key = datetime.datetime(date.year, date.month, 1) +# if month_key in monthly_data and data_dict[date] is not None: +# # Update only if the value is higher (to get the last day's value) +# monthly_data[month_key] = max(monthly_data[month_key], data_dict[date]) - # Convert to numpy array - monthly_failure = np.array(complete_data) +# # Convert to list maintaining chronological order +# complete_data = [] +# for month in sorted(monthly_data.keys()): +# complete_data.append(monthly_data[month]) - # Calculate corrective costs - cost_per_failure = (material_cost + service_cost) / latest_num - if cost_per_failure == 0: - raise ValueError("Cost per failure cannot be zero") +# # Convert to numpy array +# monthly_failure = np.array(complete_data) - corrective_costs = monthly_failure * cost_per_failure +# # Calculate corrective costs +# cost_per_failure = (material_cost + service_cost) / latest_num +# if cost_per_failure == 0: +# raise ValueError("Cost per failure cannot be zero") +# corrective_costs = monthly_failure * cost_per_failure - return corrective_costs, monthly_failure - except Exception as e: - print(f"Error fetching or processing data: {str(e)}") - raise +# return corrective_costs, monthly_failure +# except Exception as e: +# print(f"Error fetching or processing data: {str(e)}") +# raise -# async def get_corrective_cost_time_chart( -# material_cost: float, -# service_cost: float, -# location_tag: str, -# token, -# start_date: datetime.datetime, -# end_date: datetime.datetime -# ) -> Tuple[np.ndarray, np.ndarray]: async def get_corrective_cost_time_chart( @@ -169,6 +161,7 @@ async def get_corrective_cost_time_chart( historical_start = start_date if start_date <= today else None historical_end = min(today, end_date) + if historical_start and historical_start <= historical_end: url_history = f"http://192.168.1.82:8000/reliability/main/failures/{location_tag}/{historical_start.strftime('%Y-%m-%d')}/{historical_end.strftime('%Y-%m-%d')}" @@ -198,6 +191,9 @@ async def get_corrective_cost_time_chart( if item["num_fail"] is not None: history_dict[month_key] += item["num_fail"] + + + # Sort months chronologically sorted_months = sorted(history_dict.keys()) @@ -218,11 +214,13 @@ async def get_corrective_cost_time_chart( except Exception as e: raise Exception(f"Error fetching historical data: {e}") - # Handle prediction data (any portion after today) - prediction_start = max(tomorrow, start_date) + if location_tag == '3TR-TF005': + raise Exception("tes",monthly_data) + + + if end_date >= start_date: + url_prediction = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}" - if end_date >= tomorrow: - url_prediction = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{prediction_start.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}" try: response = requests.get( @@ -234,33 +232,27 @@ async def get_corrective_cost_time_chart( ) prediction_data = response.json() - # Process prediction data + # Process prediction data - but only use it for future dates if prediction_data["data"]: - # Use the last prediction value for future months - latest_prediction = prediction_data["data"][-1]["num_fail"] - - # Ensure the value is at least 1 and rounded - latest_num = max(1, round(latest_prediction)) if latest_prediction is not None else 1 - - # Create prediction dictionary - prediction_dict = {} for item in prediction_data["data"]: date = datetime.datetime.strptime(item["date"], "%d %b %Y") - month_key = datetime.datetime(date.year, date.month, 1) - prediction_dict[month_key] = round(item["num_fail"]) if item["num_fail"] is not None else latest_num - # Update monthly_data with prediction data - for key in prediction_dict: - monthly_data[key] = prediction_dict[key] + # Only apply prediction data for dates after today + if date > today: + month_key = datetime.datetime(date.year, date.month, 1) + + monthly_data[month_key] = item["num_fail"] if item["num_fail"] is not None else 0 + + + # Update latest_num with the last prediction if available + last_prediction = prediction_data["data"][-1]["num_fail"] + if last_prediction is not None: + latest_num = max(1, round(last_prediction)) except Exception as e: print(f"Error fetching prediction data: {e}") - # If we can't get prediction data but the range is in the future, - # we need to at least populate with our best estimate - if not monthly_data: - # Create a default entry for the start month - start_month_key = datetime.datetime(start_date.year, start_date.month, 1) - monthly_data[start_month_key] = latest_num + + # Fill in any missing months in the range current_date = datetime.datetime(start_date.year, start_date.month, 1) @@ -298,170 +290,175 @@ async def get_corrective_cost_time_chart( for month in sorted(monthly_data.keys()): complete_data.append(monthly_data[month]) - # Convert to numpy array - monthly_failure = np.array(complete_data) - cost_per_failure = (material_cost + service_cost) / latest_num - - if cost_per_failure == 0: - raise ValueError("Cost per failure cannot be zero") - - corrective_costs = monthly_failure * cost_per_failure - - return corrective_costs, monthly_failure - - days_difference = (end_date - start_date).days - - today = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) - tomorrow = today + datetime.timedelta(days=1) - url_prediction = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{tomorrow.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}" - url_history = f"http://192.168.1.82:8000/reliability/main/failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{today.strftime('%Y-%m-%d')}" - - # Initialize monthly data dictionary - monthly_data = {} - - # Get historical data (start_date to today) - if start_date <= today: - try: - response = requests.get( - url_history, - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {token}", - }, - ) - history_data = response.json() - - - # Process historical data - accumulate failures by month - history_dict = {} - monthly_failures = {} - - for item in history_data["data"]: - date = datetime.datetime.strptime(item["date"], "%d %b %Y") - month_key = datetime.datetime(date.year, date.month, 1) - - # Initialize if first occurrence of this month - if month_key not in history_dict: - history_dict[month_key] = 0 - - # Accumulate failures for this month - if item["num_fail"] is not None: - history_dict[month_key] += item["num_fail"] - - # Sort months chronologically - sorted_months = sorted(history_dict.keys()) - - failures = np.array([history_dict[month] for month in sorted_months]) - cum_failure = np.cumsum(failures) - - for month_key in sorted_months: - monthly_failures[month_key] = int(cum_failure[sorted_months.index(month_key)]) - - - # Update monthly_data with cumulative historical data - monthly_data.update(monthly_failures) - except Exception as e: - # print(f"Error fetching historical data: {e}") - raise Exception(e) - - - latest_num = 1 - - # Get prediction data (today+1 to end_date) - if end_date > today: - try: - response = requests.get( - url_prediction, - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {token}", - }, - ) - prediction_data = response.json() - - # Use the last prediction value for future months - # Get the latest number from prediction data - latest_num = prediction_data["data"][-1]["num_fail"] - - # Ensure the value is at least 1 - if not latest_num or latest_num < 1: - latest_num = 1 - else: - # Round the number to the nearest integer - latest_num = round(latest_num) - - # Create prediction dictionary - prediction_dict = {} - for item in prediction_data["data"]: - date = datetime.datetime.strptime(item["date"], "%d %b %Y") - month_key = datetime.datetime(date.year, date.month, 1) - prediction_dict[month_key] = round(item["num_fail"]) - - # Update monthly_data with prediction data - for key in prediction_dict: - if key not in monthly_data: # Don't overwrite historical data - monthly_data[key] = prediction_dict[key] - except Exception as e: - print(f"Error fetching prediction data: {e}") - - # Create a complete date range covering all months from start to end - current_date = datetime.datetime(start_date.year, start_date.month, 1) - while current_date <= end_date: - if current_date not in monthly_data: - # Initialize to check previous months - previous_month = current_date.replace(day=1) - datetime.timedelta(days=1) - # Now previous_month is the last day of the previous month - # Convert back to first day of previous month for consistency - previous_month = previous_month.replace(day=1) - - # Keep going back until we find data or run out of months to check - month_diff = (current_date.year - start_date.year) * 12 + (current_date.month - start_date.month) - max_attempts = max(1, month_diff) # Ensure at least 1 attempt - attempts = 0 - - while previous_month not in monthly_data and attempts < max_attempts: - # Move to the previous month (last day of the month before) - previous_month = previous_month.replace(day=1) - datetime.timedelta(days=1) - # Convert to first day of month - previous_month = previous_month.replace(day=1) - attempts += 1 - - # Use the found value or default to 0 if no previous month with data exists - if previous_month in monthly_data: - monthly_data[current_date] = monthly_data[previous_month] - else: - monthly_data[current_date] = 0 - - # Move to next month - if current_date.month == 12: - current_date = datetime.datetime(current_date.year + 1, 1, 1) - else: - current_date = datetime.datetime(current_date.year, current_date.month + 1, 1) - - - # # Convert to list maintaining chronological order - complete_data = [] - for month in sorted(monthly_data.keys()): - complete_data.append(monthly_data[month]) - + if latest_num < 1: + raise ValueError("Number of failures cannot be negative", latest_num) # Convert to numpy array monthly_failure = np.array(complete_data) cost_per_failure = (material_cost + service_cost) / latest_num - if cost_per_failure == 0: - raise ValueError("Cost per failure cannot be zero") - - # if location_tag == "3TR-TF005": - # raise Exception(cost_per_failure, latest_num) - corrective_costs = monthly_failure * cost_per_failure + raise Exception(monthly_data, location_tag) + try: + corrective_costs = monthly_failure * cost_per_failure + except Exception as e: + raise Exception(f"Error calculating corrective costs: {monthly_failure}", location_tag) return corrective_costs, monthly_failure - # except Exception as e: - # print(f"Error fetching or processing data: {str(e)}") - # raise + # days_difference = (end_date - start_date).days + + # today = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + # tomorrow = today + datetime.timedelta(days=1) + # url_prediction = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{tomorrow.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}" + # url_history = f"http://192.168.1.82:8000/reliability/main/failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{today.strftime('%Y-%m-%d')}" + + # # Initialize monthly data dictionary + # monthly_data = {} + + # # Get historical data (start_date to today) + # if start_date <= today: + # try: + # response = requests.get( + # url_history, + # headers={ + # "Content-Type": "application/json", + # "Authorization": f"Bearer {token}", + # }, + # ) + # history_data = response.json() + + + # # Process historical data - accumulate failures by month + # history_dict = {} + # monthly_failures = {} + + # for item in history_data["data"]: + # date = datetime.datetime.strptime(item["date"], "%d %b %Y") + # month_key = datetime.datetime(date.year, date.month, 1) + + # # Initialize if first occurrence of this month + # if month_key not in history_dict: + # history_dict[month_key] = 0 + + # # Accumulate failures for this month + # if item["num_fail"] is not None: + # history_dict[month_key] += item["num_fail"] + + # # Sort months chronologically + # sorted_months = sorted(history_dict.keys()) + + # failures = np.array([history_dict[month] for month in sorted_months]) + # cum_failure = np.cumsum(failures) + + # for month_key in sorted_months: + # monthly_failures[month_key] = int(cum_failure[sorted_months.index(month_key)]) + + + # # Update monthly_data with cumulative historical data + # monthly_data.update(monthly_failures) + # except Exception as e: + # # print(f"Error fetching historical data: {e}") + # raise Exception(e) + + + # latest_num = 1 + + # # Get prediction data (today+1 to end_date) + # if end_date > today: + # try: + # response = requests.get( + # url_prediction, + # headers={ + # "Content-Type": "application/json", + # "Authorization": f"Bearer {token}", + # }, + # ) + # prediction_data = response.json() + + # # Use the last prediction value for future months + # # Get the latest number from prediction data + # latest_num = prediction_data["data"][-1]["num_fail"] + + # # Ensure the value is at least 1 + # if not latest_num or latest_num < 1: + # latest_num = 1 + # else: + # # Round the number to the nearest integer + # latest_num = round(latest_num) + + # # Create prediction dictionary + # prediction_dict = {} + # for item in prediction_data["data"]: + # date = datetime.datetime.strptime(item["date"], "%d %b %Y") + # month_key = datetime.datetime(date.year, date.month, 1) + # prediction_dict[month_key] = round(item["num_fail"]) + + # # Update monthly_data with prediction data + # for key in prediction_dict: + # if key not in monthly_data: # Don't overwrite historical data + # monthly_data[key] = prediction_dict[key] + # except Exception as e: + # print(f"Error fetching prediction data: {e}") + + # # Create a complete date range covering all months from start to end + # current_date = datetime.datetime(start_date.year, start_date.month, 1) + # while current_date <= end_date: + # if current_date not in monthly_data: + # # Initialize to check previous months + # previous_month = current_date.replace(day=1) - datetime.timedelta(days=1) + # # Now previous_month is the last day of the previous month + # # Convert back to first day of previous month for consistency + # previous_month = previous_month.replace(day=1) + + # # Keep going back until we find data or run out of months to check + # month_diff = (current_date.year - start_date.year) * 12 + (current_date.month - start_date.month) + # max_attempts = max(1, month_diff) # Ensure at least 1 attempt + # attempts = 0 + + # while previous_month not in monthly_data and attempts < max_attempts: + # # Move to the previous month (last day of the month before) + # previous_month = previous_month.replace(day=1) - datetime.timedelta(days=1) + # # Convert to first day of month + # previous_month = previous_month.replace(day=1) + # attempts += 1 + + # # Use the found value or default to 0 if no previous month with data exists + # if previous_month in monthly_data: + # monthly_data[current_date] = monthly_data[previous_month] + # else: + # monthly_data[current_date] = 0 + + # # Move to next month + # if current_date.month == 12: + # current_date = datetime.datetime(current_date.year + 1, 1, 1) + # else: + # current_date = datetime.datetime(current_date.year, current_date.month + 1, 1) + + + # # # Convert to list maintaining chronological order + # complete_data = [] + # for month in sorted(monthly_data.keys()): + # complete_data.append(monthly_data[month]) + + + # # Convert to numpy array + # monthly_failure = np.array(complete_data) + # cost_per_failure = (material_cost + service_cost) / latest_num + # if cost_per_failure == 0: + # raise ValueError("Cost per failure cannot be zero") + + # # if location_tag == "3TR-TF005": + # # raise Exception(cost_per_failure, latest_num) + + # corrective_costs = monthly_failure * cost_per_failure + + + # return corrective_costs, monthly_failure + + # # except Exception as e: + # # print(f"Error fetching or processing data: {str(e)}") + # # raise def get_overhaul_cost_by_time_chart( overhaul_cost: float, months_num: int, numEquipments: int, decay_base: float = 1.01 @@ -687,13 +684,135 @@ async def get_calculation_by_assetnum( # ) +async def get_number_of_failures(location_tag, start_date, end_date, token, max_interval=24): + url_prediction = ( + f"http://192.168.1.82:8000/reliability/main/number-of-failures/" + f"{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}" + ) + + results = {} + + try: + response = requests.get( + url_prediction, + headers={ + "Content-Type": "application/json", + "Authorization": f"Bearer {token}", + }, + timeout=10 + ) + response.raise_for_status() + prediction_data = response.json() + except (requests.RequestException, ValueError) as e: + raise Exception(f"Failed to fetch or parse prediction data: {e}") + + if not prediction_data or "data" not in prediction_data or not isinstance(prediction_data["data"], list): + raise Exception("Invalid or empty prediction data format.") + + last_data = prediction_data["data"][-1] + last_data_date = datetime.datetime.strptime(last_data["date"], "%d %b %Y") + results[datetime.date(last_data_date.year, last_data_date.month, last_data_date.day)] = round(last_data["num_fail"]) if last_data["num_fail"] is not None else 0 + + + # Parse prediction data + for item in prediction_data["data"]: + try: + date = datetime.datetime.strptime(item["date"], "%d %b %Y") + last_day = calendar.monthrange(date.year, date.month)[1] + if date.day == last_day: + results[date.date()] = round(item.get("num_fail", 0)) + + except (KeyError, ValueError): + continue # skip invalid items + + # Fill missing months with 0 + current = start_date.replace(day=1) + for _ in range(max_interval): + last_day = calendar.monthrange(current.year, current.month)[1] + last_day_date = datetime.date(current.year, current.month, last_day) + if last_day_date not in results: + results[last_day_date] = 0 + # move to next month + if current.month == 12: + current = current.replace(year=current.year + 1, month=1) + else: + current = current.replace(month=current.month + 1) + + # Sort results by date + results = dict(sorted(results.items())) + + + return results + +# Function to simulate overhaul strategy for a single equipment +def simulate_equipment_overhaul(equipment, preventive_cost,predicted_num_failures, interval_months ,total_months=24): + """ + Simulates overhaul strategy for a specific piece of equipment + and returns the associated costs. + """ + total_preventive_cost = 0 + total_corrective_cost = 0 + months_since_overhaul = 0 + + failures_by_month = {i: val for i, (date, val) in enumerate(sorted(predicted_num_failures.items()))} + + cost_per_failure = equipment.material_cost + equipment.service_cost + + # Simulate for the total period + for month in range(total_months): + # If it's time for overhaul + if months_since_overhaul >= interval_months: + # Perform preventive overhaul + total_preventive_cost += preventive_cost + months_since_overhaul = 0 + + if months_since_overhaul == 0: + # Calculate failures for this month based on time since last overhaul + expected_failures = 0 + failure_cost = expected_failures * cost_per_failure + total_corrective_cost += failure_cost + + else: + # Calculate failures for this month based on time since last overhaul + expected_failures = failures_by_month.get(months_since_overhaul, 0) + failure_cost = expected_failures * cost_per_failure + total_corrective_cost += failure_cost + + # Increment time since overhaul + months_since_overhaul += 1 + + # Calculate costs per month (to normalize for comparison) + monthly_preventive_cost = total_preventive_cost / total_months + monthly_corrective_cost = total_corrective_cost / total_months + monthly_total_cost = monthly_preventive_cost + monthly_corrective_cost + + return { + 'interval': interval_months, + 'preventive_cost': monthly_preventive_cost, + 'corrective_cost': monthly_corrective_cost, + 'total_cost': monthly_total_cost + } + + + async def create_calculation_result_service( db_session: DbSession, calculation: CalculationData, token: str ) -> CalculationTimeConstrainsRead: + # Get all equipment for this calculation session + equipments = await get_all_by_session_id( + db_session=db_session, overhaul_session_id=calculation.overhaul_session_id + ) + scope = await get_scope(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id) + prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope) + + calculation_data = await get_calculation_data_by_id( + db_session=db_session, calculation_id=calculation.id + ) + # Set the date range for the calculation if prev_oh_scope: # Start date is the day after the previous scope's end date @@ -705,81 +824,117 @@ async def create_calculation_result_service( start_date = datetime.datetime.combine(scope.start_date, datetime.time.min) end_date = datetime.datetime.combine(scope.end_date, datetime.time.min) - months_num = get_months_between(start_date, end_date) - - # Get all equipment for this calculation session - equipments = await get_all_by_session_id( - db_session=db_session, overhaul_session_id=calculation.overhaul_session_id - ) - scope = await get_scope( - db_session=db_session, overhaul_session_id=calculation.overhaul_session_id - ) - - calculation_data = await get_calculation_data_by_id( - db_session=db_session, calculation_id=calculation.id - ) + max_interval = get_months_between(start_date, end_date) + overhaul_cost = calculation_data.parameter.overhaul_cost / len(equipments) # Store results for each equipment - equipment_results: List[CalculationEquipmentResult] = [] - total_corrective_costs = np.zeros(months_num) - total_overhaul_cost = np.zeros(months_num) - total_daily_failures = np.zeros(months_num) + results = [] + + total_corrective_costs = np.zeros(max_interval) + total_overhaul_costs = np.zeros(max_interval) + total_daily_failures = np.zeros(max_interval) + total_costs = np.zeros(max_interval) # Calculate for each equipment for eq in equipments: - corrective_costs, daily_failures = await get_corrective_cost_time_chart( - material_cost=eq.material_cost, - service_cost=eq.service_cost, - token=token, + equipment_results = [] + corrective_costs = [] + overhaul_costs = [] + total = [] + + predicted_num_failures = await get_number_of_failures( location_tag=eq.equipment.location_tag, start_date=start_date, - end_date=end_date + end_date=end_date, + token=token ) - overhaul_cost_points = get_overhaul_cost_by_time_chart( - calculation_data.parameter.overhaul_cost, - months_num=months_num, - numEquipments=len(equipments), - ) + for interval in range(1, max_interval+1): + result = simulate_equipment_overhaul(eq, overhaul_cost, predicted_num_failures, interval,total_months=max_interval) + corrective_costs.append(result['corrective_cost']) + overhaul_costs.append(result['preventive_cost']) + total.append(result['total_cost']) + equipment_results.append(result) + + optimal_result = min(equipment_results, key=lambda x: x['total_cost']) + + results.append( + CalculationEquipmentResult( + corrective_costs=corrective_costs, + overhaul_costs=overhaul_costs, + daily_failures=[failure for _, failure in predicted_num_failures.items()], + assetnum=eq.assetnum, + material_cost=eq.material_cost, + service_cost=eq.service_cost, + optimum_day=optimal_result['interval'], + calculation_data_id=calculation.id, + master_equipment=eq.equipment, + ) + ) + if len(predicted_num_failures.values()) < max_interval: + raise Exception(eq.equipment.assetnum) - # Calculate individual equipment optimum points - equipment_total_cost = corrective_costs + overhaul_cost_points - equipment_optimum_index = np.argmin(equipment_total_cost) - equipment_failure_sum = sum(daily_failures[:equipment_optimum_index]) + total_corrective_costs += np.array(corrective_costs) + total_overhaul_costs += np.array(overhaul_costs) + total_daily_failures += np.array([failure for _, failure in predicted_num_failures.items()]) + total_costs += np.array(total_costs) - equipment_results.append( - CalculationEquipmentResult( - corrective_costs=corrective_costs.tolist(), - overhaul_costs=overhaul_cost_points.tolist(), - daily_failures=daily_failures.tolist(), - assetnum=eq.assetnum, - material_cost=eq.material_cost, - service_cost=eq.service_cost, - optimum_day=int(equipment_optimum_index + 1), - calculation_data_id=calculation.id, - master_equipment=eq.equipment, - ) - ) + # corrective_costs, daily_failures = await get_corrective_cost_time_chart( + # material_cost=eq.material_cost, + # service_cost=eq.service_cost, + # token=token, + # location_tag=eq.equipment.location_tag, + # start_date=start_date, + # end_date=end_date + # ) - # Add to totals - total_corrective_costs += corrective_costs - total_overhaul_cost += overhaul_cost_points - total_daily_failures += daily_failures + # overhaul_cost_points = get_overhaul_cost_by_time_chart( + # calculation_data.parameter.overhaul_cost, + # months_num=months_num, + # numEquipments=len(equipments), + # ) - db_session.add_all(equipment_results) - # Calculate optimum points using total costs - total_cost = total_corrective_costs + total_overhaul_cost - optimum_oh_index = np.argmin(total_cost) + # # Calculate individual equipment optimum points + # equipment_total_cost = corrective_costs + overhaul_cost_points + # equipment_optimum_index = np.argmin(equipment_total_cost) + # equipment_failure_sum = sum(daily_failures[:equipment_optimum_index]) + + + # equipment_results.append( + # CalculationEquipmentResult( + # corrective_costs=corrective_costs.tolist(), + # overhaul_costs=overhaul_cost_points.tolist(), + # daily_failures=daily_failures.tolist(), + # assetnum=eq.assetnum, + # material_cost=eq.material_cost, + # service_cost=eq.service_cost, + # optimum_day=int(equipment_optimum_index + 1), + # calculation_data_id=calculation.id, + # master_equipment=eq.equipment, + # ) + # ) + + # # Add to totals + # total_corrective_costs += corrective_costs + # total_overhaul_cost += overhaul_cost_points + # total_daily_failures += daily_failures + + db_session.add_all(results) + + total_costs_point = total_corrective_costs + total_overhaul_costs + + # Calculate optimum points using total costs + optimum_oh_index = np.argmin(total_costs_point) numbers_of_failure = sum(total_daily_failures[:optimum_oh_index]) optimum = OptimumResult( - overhaul_cost=float(overhaul_cost_points[optimum_oh_index]), + overhaul_cost=float(total_overhaul_costs[optimum_oh_index]), corrective_cost=float(total_corrective_costs[optimum_oh_index]), num_failures=int(numbers_of_failure), days=int(optimum_oh_index + 1), @@ -810,7 +965,7 @@ async def create_calculation_result_service( scope=scope.type, results=[], optimum_oh=optimum, - equipment_results=equipment_results, + equipment_results=results, )