feat: fix minor

feature/reliability_stat
Cizz22 11 months ago
parent aa432c6015
commit 0d18a030f7

@ -59,84 +59,93 @@ from .utils import get_months_between
# results = np.where(np.isfinite(results), results, 0) # results = np.where(np.isfinite(results), results, 0)
# return results # return results
async def get_corrective_cost_time_chart(
material_cost: float,
service_cost: float,
location_tag: str,
token,
start_date: datetime.datetime,
end_date: datetime.datetime
) -> Tuple[np.ndarray, np.ndarray]:
days_difference = (end_date - start_date).days
# async def get_corrective_cost_time_chart( url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
# material_cost: float, service_cost: float, location_tag: str, token, max_days: int
# ) -> Tuple[np.ndarray, np.ndarray]:
# start_date = datetime.datetime(2025, 1, 1)
# end_date = start_date + datetime.timedelta(days=max_days)
# url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
# try:
# response = requests.get(
# url,
# headers={
# "Content-Type": "application/json",
# "Authorization": f"Bearer {token}",
# },
# )
# data = response.json()
# ## Get latest data fromdata_today
# # latest_num_of_fail:float = get_latest_numOfFail(location_tag=location_tag, token=token)
# latest_num = data["data"][-1]["num_fail"]
# if not latest_num: try:
# latest_num = 1 response = requests.get(
url,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
)
data = response.json()
latest_num = data["data"][-1]["num_fail"]
# # Create a complete date range for 2024 if not latest_num:
# start_date = datetime.datetime(2025, 1, 1) latest_num = 1
# date_range = [start_date + datetime.timedelta(days=x) for x in range(max_days)]
# # Create a dictionary of existing data # Create a complete date range for 2025
# data_dict = { # start_date = datetime.datetime(2025, 1, 1)
# datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"] # date_range = [start_date + datetime.timedelta(days=x) for x in range(days_difference)]
# for item in data["data"]
# }
# # Fill in missing dates with nearest available value # Create a dictionary of existing data
# complete_data = [] data_dict = {
# last_known_value = 0 # Default value if no data is available datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"]
# not_full_data = [] for item in data["data"]
}
# for date in date_range: # Initialize all months in the range with 0
# if date in data_dict: monthly_data = {}
# if data_dict[date] is not None: current_date = start_date.replace(day=1)
# last_known_value = data_dict[date] while current_date <= end_date:
# complete_data.append(last_known_value) monthly_data[current_date] = 0
# else: # Move to next month
# complete_data.append(0) if current_date.month == 12:
# # Convert to numpy array current_date = datetime.datetime(current_date.year + 1, 1, 1)
# daily_failure = np.array(complete_data) else:
current_date = datetime.datetime(current_date.year, current_date.month + 1, 1)
# Get the last day's value for each month
for date in data_dict.keys():
month_key = datetime.datetime(date.year, date.month, 1)
if month_key in monthly_data and data_dict[date] is not None:
# Update only if the value is higher (to get the last day's value)
monthly_data[month_key] = max(monthly_data[month_key], data_dict[date])
# Convert to list maintaining chronological order
complete_data = []
for month in sorted(monthly_data.keys()):
complete_data.append(monthly_data[month])
# Convert to numpy array
monthly_failure = np.array(complete_data)
# hourly_failure = np.repeat(daily_failure, 24) / 24 # Calculate corrective costs
cost_per_failure = (material_cost + service_cost) / latest_num
if cost_per_failure == 0:
raise ValueError("Cost per failure cannot be zero")
# # failure_counts = np.cumsum(daily_failure) corrective_costs = monthly_failure * cost_per_failure
# # Calculate corrective costs
# cost_per_failure = (material_cost + service_cost) / latest_num
# if cost_per_failure == 0:
# raise ValueError("Cost per failure cannot be zero")
# corrective_costs = hourly_failure * cost_per_failure return corrective_costs, monthly_failure
# return corrective_costs, hourly_failure except Exception as e:
print(f"Error fetching or processing data: {str(e)}")
raise
# except Exception as e:
# print(f"Error fetching or processing data: {str(e)}")
# raise
async def get_corrective_cost_time_chart( # async def get_corrective_cost_time_chart(
material_cost: float, # material_cost: float,
service_cost: float, # service_cost: float,
location_tag: str, # location_tag: str,
token, # token,
start_date: datetime.datetime, # start_date: datetime.datetime,
end_date: datetime.datetime # end_date: datetime.datetime
) -> Tuple[np.ndarray, np.ndarray]: # ) -> Tuple[np.ndarray, np.ndarray]:
days_difference = (end_date - start_date).days days_difference = (end_date - start_date).days
today = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) today = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
@ -159,8 +168,11 @@ async def get_corrective_cost_time_chart(
) )
history_data = response.json() history_data = response.json()
# Process historical data - accumulate failures by month
# Process historical data - accumulate failures by month
history_dict = {} history_dict = {}
monthly_failures = {}
for item in history_data["data"]: for item in history_data["data"]:
date = datetime.datetime.strptime(item["date"], "%d %b %Y") date = datetime.datetime.strptime(item["date"], "%d %b %Y")
month_key = datetime.datetime(date.year, date.month, 1) month_key = datetime.datetime(date.year, date.month, 1)
@ -173,10 +185,22 @@ async def get_corrective_cost_time_chart(
if item["num_fail"] is not None: if item["num_fail"] is not None:
history_dict[month_key] += item["num_fail"] history_dict[month_key] += item["num_fail"]
# Update monthly_data with historical data # Sort months chronologically
sorted_months = sorted(monthly_failures.keys())
# Calculate cumulative failures
running_total = 0
for month in sorted_months:
running_total += monthly_failures[month]
history_dict[month] = running_total
# Update monthly_data with cumulative historical data
monthly_data.update(history_dict) monthly_data.update(history_dict)
except Exception as e: except Exception as e:
print(f"Error fetching historical data: {e}") # print(f"Error fetching historical data: {e}")
raise Exception(e)
latest_num = 1 latest_num = 1

Loading…
Cancel
Save