From 0d18a030f7d8df35dc2d57f5d01ea4355680fb63 Mon Sep 17 00:00:00 2001 From: Cizz22 Date: Wed, 26 Feb 2025 16:07:43 +0700 Subject: [PATCH] feat: fix minor --- src/calculation_time_constrains/service.py | 156 ++++++++++++--------- 1 file changed, 90 insertions(+), 66 deletions(-) diff --git a/src/calculation_time_constrains/service.py b/src/calculation_time_constrains/service.py index ff8ff0f..4aa8f3b 100644 --- a/src/calculation_time_constrains/service.py +++ b/src/calculation_time_constrains/service.py @@ -59,84 +59,93 @@ from .utils import get_months_between # results = np.where(np.isfinite(results), results, 0) # return results +async def get_corrective_cost_time_chart( + material_cost: float, + service_cost: float, + location_tag: str, + token, + start_date: datetime.datetime, + end_date: datetime.datetime +) -> Tuple[np.ndarray, np.ndarray]: + days_difference = (end_date - start_date).days -# async def get_corrective_cost_time_chart( -# material_cost: float, service_cost: float, location_tag: str, token, max_days: int -# ) -> Tuple[np.ndarray, np.ndarray]: - -# start_date = datetime.datetime(2025, 1, 1) -# end_date = start_date + datetime.timedelta(days=max_days) -# url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}" - -# try: -# response = requests.get( -# url, -# headers={ -# "Content-Type": "application/json", -# "Authorization": f"Bearer {token}", -# }, -# ) -# data = response.json() - -# ## Get latest data fromdata_today -# # latest_num_of_fail:float = get_latest_numOfFail(location_tag=location_tag, token=token) + url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}" -# latest_num = data["data"][-1]["num_fail"] -# if not latest_num: -# latest_num = 1 + try: + response = requests.get( + url, + headers={ + "Content-Type": "application/json", + "Authorization": f"Bearer {token}", + }, + ) + data = response.json() + latest_num = data["data"][-1]["num_fail"] -# # Create a complete date range for 2024 -# start_date = datetime.datetime(2025, 1, 1) -# date_range = [start_date + datetime.timedelta(days=x) for x in range(max_days)] + if not latest_num: + latest_num = 1 -# # Create a dictionary of existing data -# data_dict = { -# datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"] -# for item in data["data"] -# } + # Create a complete date range for 2025 + # start_date = datetime.datetime(2025, 1, 1) + # date_range = [start_date + datetime.timedelta(days=x) for x in range(days_difference)] -# # Fill in missing dates with nearest available value -# complete_data = [] -# last_known_value = 0 # Default value if no data is available -# not_full_data = [] + # Create a dictionary of existing data + data_dict = { + datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"] + for item in data["data"] + } -# for date in date_range: -# if date in data_dict: -# if data_dict[date] is not None: -# last_known_value = data_dict[date] -# complete_data.append(last_known_value) -# else: -# complete_data.append(0) -# # Convert to numpy array -# daily_failure = np.array(complete_data) + # Initialize all months in the range with 0 + monthly_data = {} + current_date = start_date.replace(day=1) + while current_date <= end_date: + monthly_data[current_date] = 0 + # Move to next month + if current_date.month == 12: + current_date = datetime.datetime(current_date.year + 1, 1, 1) + else: + current_date = datetime.datetime(current_date.year, current_date.month + 1, 1) + + # Get the last day's value for each month + for date in data_dict.keys(): + month_key = datetime.datetime(date.year, date.month, 1) + if month_key in monthly_data and data_dict[date] is not None: + # Update only if the value is higher (to get the last day's value) + monthly_data[month_key] = max(monthly_data[month_key], data_dict[date]) + + # Convert to list maintaining chronological order + complete_data = [] + for month in sorted(monthly_data.keys()): + complete_data.append(monthly_data[month]) + + # Convert to numpy array + monthly_failure = np.array(complete_data) -# hourly_failure = np.repeat(daily_failure, 24) / 24 + # Calculate corrective costs + cost_per_failure = (material_cost + service_cost) / latest_num + if cost_per_failure == 0: + raise ValueError("Cost per failure cannot be zero") -# # failure_counts = np.cumsum(daily_failure) + corrective_costs = monthly_failure * cost_per_failure -# # Calculate corrective costs -# cost_per_failure = (material_cost + service_cost) / latest_num -# if cost_per_failure == 0: -# raise ValueError("Cost per failure cannot be zero") -# corrective_costs = hourly_failure * cost_per_failure + return corrective_costs, monthly_failure -# return corrective_costs, hourly_failure + except Exception as e: + print(f"Error fetching or processing data: {str(e)}") + raise -# except Exception as e: -# print(f"Error fetching or processing data: {str(e)}") -# raise -async def get_corrective_cost_time_chart( - material_cost: float, - service_cost: float, - location_tag: str, - token, - start_date: datetime.datetime, - end_date: datetime.datetime -) -> Tuple[np.ndarray, np.ndarray]: +# async def get_corrective_cost_time_chart( +# material_cost: float, +# service_cost: float, +# location_tag: str, +# token, +# start_date: datetime.datetime, +# end_date: datetime.datetime +# ) -> Tuple[np.ndarray, np.ndarray]: days_difference = (end_date - start_date).days today = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) @@ -159,8 +168,11 @@ async def get_corrective_cost_time_chart( ) history_data = response.json() - # Process historical data - accumulate failures by month + + # Process historical data - accumulate failures by month history_dict = {} + monthly_failures = {} + for item in history_data["data"]: date = datetime.datetime.strptime(item["date"], "%d %b %Y") month_key = datetime.datetime(date.year, date.month, 1) @@ -173,10 +185,22 @@ async def get_corrective_cost_time_chart( if item["num_fail"] is not None: history_dict[month_key] += item["num_fail"] - # Update monthly_data with historical data + # Sort months chronologically + sorted_months = sorted(monthly_failures.keys()) + + # Calculate cumulative failures + running_total = 0 + for month in sorted_months: + running_total += monthly_failures[month] + history_dict[month] = running_total + + + # Update monthly_data with cumulative historical data monthly_data.update(history_dict) except Exception as e: - print(f"Error fetching historical data: {e}") + # print(f"Error fetching historical data: {e}") + raise Exception(e) + latest_num = 1