fix oh sim

feature/reliability_stat
Cizz22 8 months ago
parent 2f01245042
commit dcf2428c01

@ -1,6 +1,6 @@
ENV=development
LOG_LEVEL=ERROR
PORT=3020
PORT=3021
HOST=0.0.0.0
DATABASE_HOSTNAME=192.168.1.85

@ -1,6 +1,7 @@
import datetime
from typing import Coroutine, List, Optional, Tuple
from uuid import UUID
import calendar
import numpy as np
import requests
@ -59,93 +60,84 @@ from .utils import get_months_between
# results = np.where(np.isfinite(results), results, 0)
# return results
async def get_corrective_cost_time_chart(
material_cost: float,
service_cost: float,
location_tag: str,
token,
start_date: datetime.datetime,
end_date: datetime.datetime
) -> Tuple[np.ndarray, np.ndarray]:
days_difference = (end_date - start_date).days
url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
# async def get_corrective_cost_time_chart(
# material_cost: float,
# service_cost: float,
# location_tag: str,
# token,
# start_date: datetime.datetime,
# end_date: datetime.datetime
# ) -> Tuple[np.ndarray, np.ndarray]:
# days_difference = (end_date - start_date).days
# url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
try:
response = requests.get(
url,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
)
data = response.json()
latest_num = data["data"][-1]["num_fail"]
if not latest_num:
latest_num = 1
# try:
# response = requests.get(
# url,
# headers={
# "Content-Type": "application/json",
# "Authorization": f"Bearer {token}",
# },
# )
# data = response.json()
# latest_num = data["data"][-1]["num_fail"]
# Create a complete date range for 2025
# start_date = datetime.datetime(2025, 1, 1)
# date_range = [start_date + datetime.timedelta(days=x) for x in range(days_difference)]
# if not latest_num:
# latest_num = 1
# Create a dictionary of existing data
data_dict = {
datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"]
for item in data["data"]
}
# # Create a complete date range for 2025
# # start_date = datetime.datetime(2025, 1, 1)
# # date_range = [start_date + datetime.timedelta(days=x) for x in range(days_difference)]
# Initialize all months in the range with 0
monthly_data = {}
current_date = start_date.replace(day=1)
while current_date <= end_date:
monthly_data[current_date] = 0
# Move to next month
if current_date.month == 12:
current_date = datetime.datetime(current_date.year + 1, 1, 1)
else:
current_date = datetime.datetime(current_date.year, current_date.month + 1, 1)
# # Create a dictionary of existing data
# data_dict = {
# datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"]
# for item in data["data"]
# }
# Get the last day's value for each month
for date in data_dict.keys():
month_key = datetime.datetime(date.year, date.month, 1)
if month_key in monthly_data and data_dict[date] is not None:
# Update only if the value is higher (to get the last day's value)
monthly_data[month_key] = max(monthly_data[month_key], data_dict[date])
# # Initialize all months in the range with 0
# monthly_data = {}
# current_date = start_date.replace(day=1)
# while current_date <= end_date:
# monthly_data[current_date] = 0
# # Move to next month
# if current_date.month == 12:
# current_date = datetime.datetime(current_date.year + 1, 1, 1)
# else:
# current_date = datetime.datetime(current_date.year, current_date.month + 1, 1)
# Convert to list maintaining chronological order
complete_data = []
for month in sorted(monthly_data.keys()):
complete_data.append(monthly_data[month])
# # Get the last day's value for each month
# for date in data_dict.keys():
# month_key = datetime.datetime(date.year, date.month, 1)
# if month_key in monthly_data and data_dict[date] is not None:
# # Update only if the value is higher (to get the last day's value)
# monthly_data[month_key] = max(monthly_data[month_key], data_dict[date])
# Convert to numpy array
monthly_failure = np.array(complete_data)
# # Convert to list maintaining chronological order
# complete_data = []
# for month in sorted(monthly_data.keys()):
# complete_data.append(monthly_data[month])
# Calculate corrective costs
cost_per_failure = (material_cost + service_cost) / latest_num
if cost_per_failure == 0:
raise ValueError("Cost per failure cannot be zero")
# # Convert to numpy array
# monthly_failure = np.array(complete_data)
corrective_costs = monthly_failure * cost_per_failure
# # Calculate corrective costs
# cost_per_failure = (material_cost + service_cost) / latest_num
# if cost_per_failure == 0:
# raise ValueError("Cost per failure cannot be zero")
# corrective_costs = monthly_failure * cost_per_failure
return corrective_costs, monthly_failure
except Exception as e:
print(f"Error fetching or processing data: {str(e)}")
raise
# return corrective_costs, monthly_failure
# except Exception as e:
# print(f"Error fetching or processing data: {str(e)}")
# raise
# async def get_corrective_cost_time_chart(
# material_cost: float,
# service_cost: float,
# location_tag: str,
# token,
# start_date: datetime.datetime,
# end_date: datetime.datetime
# ) -> Tuple[np.ndarray, np.ndarray]:
async def get_corrective_cost_time_chart(
@ -169,6 +161,7 @@ async def get_corrective_cost_time_chart(
historical_start = start_date if start_date <= today else None
historical_end = min(today, end_date)
if historical_start and historical_start <= historical_end:
url_history = f"http://192.168.1.82:8000/reliability/main/failures/{location_tag}/{historical_start.strftime('%Y-%m-%d')}/{historical_end.strftime('%Y-%m-%d')}"
@ -198,6 +191,9 @@ async def get_corrective_cost_time_chart(
if item["num_fail"] is not None:
history_dict[month_key] += item["num_fail"]
# Sort months chronologically
sorted_months = sorted(history_dict.keys())
@ -218,11 +214,13 @@ async def get_corrective_cost_time_chart(
except Exception as e:
raise Exception(f"Error fetching historical data: {e}")
# Handle prediction data (any portion after today)
prediction_start = max(tomorrow, start_date)
if location_tag == '3TR-TF005':
raise Exception("tes",monthly_data)
if end_date >= start_date:
url_prediction = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
if end_date >= tomorrow:
url_prediction = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{prediction_start.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
try:
response = requests.get(
@ -234,33 +232,27 @@ async def get_corrective_cost_time_chart(
)
prediction_data = response.json()
# Process prediction data
# Process prediction data - but only use it for future dates
if prediction_data["data"]:
# Use the last prediction value for future months
latest_prediction = prediction_data["data"][-1]["num_fail"]
# Ensure the value is at least 1 and rounded
latest_num = max(1, round(latest_prediction)) if latest_prediction is not None else 1
# Create prediction dictionary
prediction_dict = {}
for item in prediction_data["data"]:
date = datetime.datetime.strptime(item["date"], "%d %b %Y")
month_key = datetime.datetime(date.year, date.month, 1)
prediction_dict[month_key] = round(item["num_fail"]) if item["num_fail"] is not None else latest_num
# Update monthly_data with prediction data
for key in prediction_dict:
monthly_data[key] = prediction_dict[key]
# Only apply prediction data for dates after today
if date > today:
month_key = datetime.datetime(date.year, date.month, 1)
monthly_data[month_key] = item["num_fail"] if item["num_fail"] is not None else 0
# Update latest_num with the last prediction if available
last_prediction = prediction_data["data"][-1]["num_fail"]
if last_prediction is not None:
latest_num = max(1, round(last_prediction))
except Exception as e:
print(f"Error fetching prediction data: {e}")
# If we can't get prediction data but the range is in the future,
# we need to at least populate with our best estimate
if not monthly_data:
# Create a default entry for the start month
start_month_key = datetime.datetime(start_date.year, start_date.month, 1)
monthly_data[start_month_key] = latest_num
# Fill in any missing months in the range
current_date = datetime.datetime(start_date.year, start_date.month, 1)
@ -298,170 +290,175 @@ async def get_corrective_cost_time_chart(
for month in sorted(monthly_data.keys()):
complete_data.append(monthly_data[month])
# Convert to numpy array
monthly_failure = np.array(complete_data)
cost_per_failure = (material_cost + service_cost) / latest_num
if cost_per_failure == 0:
raise ValueError("Cost per failure cannot be zero")
corrective_costs = monthly_failure * cost_per_failure
return corrective_costs, monthly_failure
days_difference = (end_date - start_date).days
today = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
tomorrow = today + datetime.timedelta(days=1)
url_prediction = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{tomorrow.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
url_history = f"http://192.168.1.82:8000/reliability/main/failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{today.strftime('%Y-%m-%d')}"
# Initialize monthly data dictionary
monthly_data = {}
# Get historical data (start_date to today)
if start_date <= today:
try:
response = requests.get(
url_history,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
)
history_data = response.json()
# Process historical data - accumulate failures by month
history_dict = {}
monthly_failures = {}
for item in history_data["data"]:
date = datetime.datetime.strptime(item["date"], "%d %b %Y")
month_key = datetime.datetime(date.year, date.month, 1)
# Initialize if first occurrence of this month
if month_key not in history_dict:
history_dict[month_key] = 0
# Accumulate failures for this month
if item["num_fail"] is not None:
history_dict[month_key] += item["num_fail"]
# Sort months chronologically
sorted_months = sorted(history_dict.keys())
failures = np.array([history_dict[month] for month in sorted_months])
cum_failure = np.cumsum(failures)
for month_key in sorted_months:
monthly_failures[month_key] = int(cum_failure[sorted_months.index(month_key)])
# Update monthly_data with cumulative historical data
monthly_data.update(monthly_failures)
except Exception as e:
# print(f"Error fetching historical data: {e}")
raise Exception(e)
latest_num = 1
# Get prediction data (today+1 to end_date)
if end_date > today:
try:
response = requests.get(
url_prediction,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
)
prediction_data = response.json()
# Use the last prediction value for future months
# Get the latest number from prediction data
latest_num = prediction_data["data"][-1]["num_fail"]
# Ensure the value is at least 1
if not latest_num or latest_num < 1:
latest_num = 1
else:
# Round the number to the nearest integer
latest_num = round(latest_num)
# Create prediction dictionary
prediction_dict = {}
for item in prediction_data["data"]:
date = datetime.datetime.strptime(item["date"], "%d %b %Y")
month_key = datetime.datetime(date.year, date.month, 1)
prediction_dict[month_key] = round(item["num_fail"])
# Update monthly_data with prediction data
for key in prediction_dict:
if key not in monthly_data: # Don't overwrite historical data
monthly_data[key] = prediction_dict[key]
except Exception as e:
print(f"Error fetching prediction data: {e}")
# Create a complete date range covering all months from start to end
current_date = datetime.datetime(start_date.year, start_date.month, 1)
while current_date <= end_date:
if current_date not in monthly_data:
# Initialize to check previous months
previous_month = current_date.replace(day=1) - datetime.timedelta(days=1)
# Now previous_month is the last day of the previous month
# Convert back to first day of previous month for consistency
previous_month = previous_month.replace(day=1)
# Keep going back until we find data or run out of months to check
month_diff = (current_date.year - start_date.year) * 12 + (current_date.month - start_date.month)
max_attempts = max(1, month_diff) # Ensure at least 1 attempt
attempts = 0
while previous_month not in monthly_data and attempts < max_attempts:
# Move to the previous month (last day of the month before)
previous_month = previous_month.replace(day=1) - datetime.timedelta(days=1)
# Convert to first day of month
previous_month = previous_month.replace(day=1)
attempts += 1
# Use the found value or default to 0 if no previous month with data exists
if previous_month in monthly_data:
monthly_data[current_date] = monthly_data[previous_month]
else:
monthly_data[current_date] = 0
# Move to next month
if current_date.month == 12:
current_date = datetime.datetime(current_date.year + 1, 1, 1)
else:
current_date = datetime.datetime(current_date.year, current_date.month + 1, 1)
# # Convert to list maintaining chronological order
complete_data = []
for month in sorted(monthly_data.keys()):
complete_data.append(monthly_data[month])
if latest_num < 1:
raise ValueError("Number of failures cannot be negative", latest_num)
# Convert to numpy array
monthly_failure = np.array(complete_data)
cost_per_failure = (material_cost + service_cost) / latest_num
if cost_per_failure == 0:
raise ValueError("Cost per failure cannot be zero")
# if location_tag == "3TR-TF005":
# raise Exception(cost_per_failure, latest_num)
corrective_costs = monthly_failure * cost_per_failure
raise Exception(monthly_data, location_tag)
try:
corrective_costs = monthly_failure * cost_per_failure
except Exception as e:
raise Exception(f"Error calculating corrective costs: {monthly_failure}", location_tag)
return corrective_costs, monthly_failure
# except Exception as e:
# print(f"Error fetching or processing data: {str(e)}")
# raise
# days_difference = (end_date - start_date).days
# today = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
# tomorrow = today + datetime.timedelta(days=1)
# url_prediction = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{tomorrow.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
# url_history = f"http://192.168.1.82:8000/reliability/main/failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{today.strftime('%Y-%m-%d')}"
# # Initialize monthly data dictionary
# monthly_data = {}
# # Get historical data (start_date to today)
# if start_date <= today:
# try:
# response = requests.get(
# url_history,
# headers={
# "Content-Type": "application/json",
# "Authorization": f"Bearer {token}",
# },
# )
# history_data = response.json()
# # Process historical data - accumulate failures by month
# history_dict = {}
# monthly_failures = {}
# for item in history_data["data"]:
# date = datetime.datetime.strptime(item["date"], "%d %b %Y")
# month_key = datetime.datetime(date.year, date.month, 1)
# # Initialize if first occurrence of this month
# if month_key not in history_dict:
# history_dict[month_key] = 0
# # Accumulate failures for this month
# if item["num_fail"] is not None:
# history_dict[month_key] += item["num_fail"]
# # Sort months chronologically
# sorted_months = sorted(history_dict.keys())
# failures = np.array([history_dict[month] for month in sorted_months])
# cum_failure = np.cumsum(failures)
# for month_key in sorted_months:
# monthly_failures[month_key] = int(cum_failure[sorted_months.index(month_key)])
# # Update monthly_data with cumulative historical data
# monthly_data.update(monthly_failures)
# except Exception as e:
# # print(f"Error fetching historical data: {e}")
# raise Exception(e)
# latest_num = 1
# # Get prediction data (today+1 to end_date)
# if end_date > today:
# try:
# response = requests.get(
# url_prediction,
# headers={
# "Content-Type": "application/json",
# "Authorization": f"Bearer {token}",
# },
# )
# prediction_data = response.json()
# # Use the last prediction value for future months
# # Get the latest number from prediction data
# latest_num = prediction_data["data"][-1]["num_fail"]
# # Ensure the value is at least 1
# if not latest_num or latest_num < 1:
# latest_num = 1
# else:
# # Round the number to the nearest integer
# latest_num = round(latest_num)
# # Create prediction dictionary
# prediction_dict = {}
# for item in prediction_data["data"]:
# date = datetime.datetime.strptime(item["date"], "%d %b %Y")
# month_key = datetime.datetime(date.year, date.month, 1)
# prediction_dict[month_key] = round(item["num_fail"])
# # Update monthly_data with prediction data
# for key in prediction_dict:
# if key not in monthly_data: # Don't overwrite historical data
# monthly_data[key] = prediction_dict[key]
# except Exception as e:
# print(f"Error fetching prediction data: {e}")
# # Create a complete date range covering all months from start to end
# current_date = datetime.datetime(start_date.year, start_date.month, 1)
# while current_date <= end_date:
# if current_date not in monthly_data:
# # Initialize to check previous months
# previous_month = current_date.replace(day=1) - datetime.timedelta(days=1)
# # Now previous_month is the last day of the previous month
# # Convert back to first day of previous month for consistency
# previous_month = previous_month.replace(day=1)
# # Keep going back until we find data or run out of months to check
# month_diff = (current_date.year - start_date.year) * 12 + (current_date.month - start_date.month)
# max_attempts = max(1, month_diff) # Ensure at least 1 attempt
# attempts = 0
# while previous_month not in monthly_data and attempts < max_attempts:
# # Move to the previous month (last day of the month before)
# previous_month = previous_month.replace(day=1) - datetime.timedelta(days=1)
# # Convert to first day of month
# previous_month = previous_month.replace(day=1)
# attempts += 1
# # Use the found value or default to 0 if no previous month with data exists
# if previous_month in monthly_data:
# monthly_data[current_date] = monthly_data[previous_month]
# else:
# monthly_data[current_date] = 0
# # Move to next month
# if current_date.month == 12:
# current_date = datetime.datetime(current_date.year + 1, 1, 1)
# else:
# current_date = datetime.datetime(current_date.year, current_date.month + 1, 1)
# # # Convert to list maintaining chronological order
# complete_data = []
# for month in sorted(monthly_data.keys()):
# complete_data.append(monthly_data[month])
# # Convert to numpy array
# monthly_failure = np.array(complete_data)
# cost_per_failure = (material_cost + service_cost) / latest_num
# if cost_per_failure == 0:
# raise ValueError("Cost per failure cannot be zero")
# # if location_tag == "3TR-TF005":
# # raise Exception(cost_per_failure, latest_num)
# corrective_costs = monthly_failure * cost_per_failure
# return corrective_costs, monthly_failure
# # except Exception as e:
# # print(f"Error fetching or processing data: {str(e)}")
# # raise
def get_overhaul_cost_by_time_chart(
overhaul_cost: float, months_num: int, numEquipments: int, decay_base: float = 1.01
@ -687,13 +684,135 @@ async def get_calculation_by_assetnum(
# )
async def get_number_of_failures(location_tag, start_date, end_date, token, max_interval=24):
url_prediction = (
f"http://192.168.1.82:8000/reliability/main/number-of-failures/"
f"{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
)
results = {}
try:
response = requests.get(
url_prediction,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
timeout=10
)
response.raise_for_status()
prediction_data = response.json()
except (requests.RequestException, ValueError) as e:
raise Exception(f"Failed to fetch or parse prediction data: {e}")
if not prediction_data or "data" not in prediction_data or not isinstance(prediction_data["data"], list):
raise Exception("Invalid or empty prediction data format.")
last_data = prediction_data["data"][-1]
last_data_date = datetime.datetime.strptime(last_data["date"], "%d %b %Y")
results[datetime.date(last_data_date.year, last_data_date.month, last_data_date.day)] = round(last_data["num_fail"]) if last_data["num_fail"] is not None else 0
# Parse prediction data
for item in prediction_data["data"]:
try:
date = datetime.datetime.strptime(item["date"], "%d %b %Y")
last_day = calendar.monthrange(date.year, date.month)[1]
if date.day == last_day:
results[date.date()] = round(item.get("num_fail", 0))
except (KeyError, ValueError):
continue # skip invalid items
# Fill missing months with 0
current = start_date.replace(day=1)
for _ in range(max_interval):
last_day = calendar.monthrange(current.year, current.month)[1]
last_day_date = datetime.date(current.year, current.month, last_day)
if last_day_date not in results:
results[last_day_date] = 0
# move to next month
if current.month == 12:
current = current.replace(year=current.year + 1, month=1)
else:
current = current.replace(month=current.month + 1)
# Sort results by date
results = dict(sorted(results.items()))
return results
# Function to simulate overhaul strategy for a single equipment
def simulate_equipment_overhaul(equipment, preventive_cost,predicted_num_failures, interval_months ,total_months=24):
"""
Simulates overhaul strategy for a specific piece of equipment
and returns the associated costs.
"""
total_preventive_cost = 0
total_corrective_cost = 0
months_since_overhaul = 0
failures_by_month = {i: val for i, (date, val) in enumerate(sorted(predicted_num_failures.items()))}
cost_per_failure = equipment.material_cost + equipment.service_cost
# Simulate for the total period
for month in range(total_months):
# If it's time for overhaul
if months_since_overhaul >= interval_months:
# Perform preventive overhaul
total_preventive_cost += preventive_cost
months_since_overhaul = 0
if months_since_overhaul == 0:
# Calculate failures for this month based on time since last overhaul
expected_failures = 0
failure_cost = expected_failures * cost_per_failure
total_corrective_cost += failure_cost
else:
# Calculate failures for this month based on time since last overhaul
expected_failures = failures_by_month.get(months_since_overhaul, 0)
failure_cost = expected_failures * cost_per_failure
total_corrective_cost += failure_cost
# Increment time since overhaul
months_since_overhaul += 1
# Calculate costs per month (to normalize for comparison)
monthly_preventive_cost = total_preventive_cost / total_months
monthly_corrective_cost = total_corrective_cost / total_months
monthly_total_cost = monthly_preventive_cost + monthly_corrective_cost
return {
'interval': interval_months,
'preventive_cost': monthly_preventive_cost,
'corrective_cost': monthly_corrective_cost,
'total_cost': monthly_total_cost
}
async def create_calculation_result_service(
db_session: DbSession, calculation: CalculationData, token: str
) -> CalculationTimeConstrainsRead:
# Get all equipment for this calculation session
equipments = await get_all_by_session_id(
db_session=db_session, overhaul_session_id=calculation.overhaul_session_id
)
scope = await get_scope(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)
prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope)
calculation_data = await get_calculation_data_by_id(
db_session=db_session, calculation_id=calculation.id
)
# Set the date range for the calculation
if prev_oh_scope:
# Start date is the day after the previous scope's end date
@ -705,81 +824,117 @@ async def create_calculation_result_service(
start_date = datetime.datetime.combine(scope.start_date, datetime.time.min)
end_date = datetime.datetime.combine(scope.end_date, datetime.time.min)
months_num = get_months_between(start_date, end_date)
# Get all equipment for this calculation session
equipments = await get_all_by_session_id(
db_session=db_session, overhaul_session_id=calculation.overhaul_session_id
)
scope = await get_scope(
db_session=db_session, overhaul_session_id=calculation.overhaul_session_id
)
calculation_data = await get_calculation_data_by_id(
db_session=db_session, calculation_id=calculation.id
)
max_interval = get_months_between(start_date, end_date)
overhaul_cost = calculation_data.parameter.overhaul_cost / len(equipments)
# Store results for each equipment
equipment_results: List[CalculationEquipmentResult] = []
total_corrective_costs = np.zeros(months_num)
total_overhaul_cost = np.zeros(months_num)
total_daily_failures = np.zeros(months_num)
results = []
total_corrective_costs = np.zeros(max_interval)
total_overhaul_costs = np.zeros(max_interval)
total_daily_failures = np.zeros(max_interval)
total_costs = np.zeros(max_interval)
# Calculate for each equipment
for eq in equipments:
corrective_costs, daily_failures = await get_corrective_cost_time_chart(
material_cost=eq.material_cost,
service_cost=eq.service_cost,
token=token,
equipment_results = []
corrective_costs = []
overhaul_costs = []
total = []
predicted_num_failures = await get_number_of_failures(
location_tag=eq.equipment.location_tag,
start_date=start_date,
end_date=end_date
end_date=end_date,
token=token
)
overhaul_cost_points = get_overhaul_cost_by_time_chart(
calculation_data.parameter.overhaul_cost,
months_num=months_num,
numEquipments=len(equipments),
)
for interval in range(1, max_interval+1):
result = simulate_equipment_overhaul(eq, overhaul_cost, predicted_num_failures, interval,total_months=max_interval)
corrective_costs.append(result['corrective_cost'])
overhaul_costs.append(result['preventive_cost'])
total.append(result['total_cost'])
equipment_results.append(result)
optimal_result = min(equipment_results, key=lambda x: x['total_cost'])
results.append(
CalculationEquipmentResult(
corrective_costs=corrective_costs,
overhaul_costs=overhaul_costs,
daily_failures=[failure for _, failure in predicted_num_failures.items()],
assetnum=eq.assetnum,
material_cost=eq.material_cost,
service_cost=eq.service_cost,
optimum_day=optimal_result['interval'],
calculation_data_id=calculation.id,
master_equipment=eq.equipment,
)
)
if len(predicted_num_failures.values()) < max_interval:
raise Exception(eq.equipment.assetnum)
# Calculate individual equipment optimum points
equipment_total_cost = corrective_costs + overhaul_cost_points
equipment_optimum_index = np.argmin(equipment_total_cost)
equipment_failure_sum = sum(daily_failures[:equipment_optimum_index])
total_corrective_costs += np.array(corrective_costs)
total_overhaul_costs += np.array(overhaul_costs)
total_daily_failures += np.array([failure for _, failure in predicted_num_failures.items()])
total_costs += np.array(total_costs)
equipment_results.append(
CalculationEquipmentResult(
corrective_costs=corrective_costs.tolist(),
overhaul_costs=overhaul_cost_points.tolist(),
daily_failures=daily_failures.tolist(),
assetnum=eq.assetnum,
material_cost=eq.material_cost,
service_cost=eq.service_cost,
optimum_day=int(equipment_optimum_index + 1),
calculation_data_id=calculation.id,
master_equipment=eq.equipment,
)
)
# corrective_costs, daily_failures = await get_corrective_cost_time_chart(
# material_cost=eq.material_cost,
# service_cost=eq.service_cost,
# token=token,
# location_tag=eq.equipment.location_tag,
# start_date=start_date,
# end_date=end_date
# )
# Add to totals
total_corrective_costs += corrective_costs
total_overhaul_cost += overhaul_cost_points
total_daily_failures += daily_failures
# overhaul_cost_points = get_overhaul_cost_by_time_chart(
# calculation_data.parameter.overhaul_cost,
# months_num=months_num,
# numEquipments=len(equipments),
# )
db_session.add_all(equipment_results)
# Calculate optimum points using total costs
total_cost = total_corrective_costs + total_overhaul_cost
optimum_oh_index = np.argmin(total_cost)
# # Calculate individual equipment optimum points
# equipment_total_cost = corrective_costs + overhaul_cost_points
# equipment_optimum_index = np.argmin(equipment_total_cost)
# equipment_failure_sum = sum(daily_failures[:equipment_optimum_index])
# equipment_results.append(
# CalculationEquipmentResult(
# corrective_costs=corrective_costs.tolist(),
# overhaul_costs=overhaul_cost_points.tolist(),
# daily_failures=daily_failures.tolist(),
# assetnum=eq.assetnum,
# material_cost=eq.material_cost,
# service_cost=eq.service_cost,
# optimum_day=int(equipment_optimum_index + 1),
# calculation_data_id=calculation.id,
# master_equipment=eq.equipment,
# )
# )
# # Add to totals
# total_corrective_costs += corrective_costs
# total_overhaul_cost += overhaul_cost_points
# total_daily_failures += daily_failures
db_session.add_all(results)
total_costs_point = total_corrective_costs + total_overhaul_costs
# Calculate optimum points using total costs
optimum_oh_index = np.argmin(total_costs_point)
numbers_of_failure = sum(total_daily_failures[:optimum_oh_index])
optimum = OptimumResult(
overhaul_cost=float(overhaul_cost_points[optimum_oh_index]),
overhaul_cost=float(total_overhaul_costs[optimum_oh_index]),
corrective_cost=float(total_corrective_costs[optimum_oh_index]),
num_failures=int(numbers_of_failure),
days=int(optimum_oh_index + 1),
@ -810,7 +965,7 @@ async def create_calculation_result_service(
scope=scope.type,
results=[],
optimum_oh=optimum,
equipment_results=equipment_results,
equipment_results=results,
)

Loading…
Cancel
Save