diff --git a/src/calculation_time_constrains/schema.py b/src/calculation_time_constrains/schema.py index dd7f3b4..38addcf 100644 --- a/src/calculation_time_constrains/schema.py +++ b/src/calculation_time_constrains/schema.py @@ -77,7 +77,7 @@ class CalculationTimeConstrainsRead(CalculationTimeConstrainsBase): reference: UUID scope: str results: List[CalculationResultsRead] - # equipment_results: List[EquipmentResult] + equipment_results: List[EquipmentResult] optimum_oh: Any diff --git a/src/calculation_time_constrains/service.py b/src/calculation_time_constrains/service.py index 7b7fb1c..6f79d37 100644 --- a/src/calculation_time_constrains/service.py +++ b/src/calculation_time_constrains/service.py @@ -21,24 +21,26 @@ from .schema import (CalculationResultsRead, CalculationTimeConstrainsParametersCreate, CalculationTimeConstrainsRead, OptimumResult) +from .utils import get_months_between -def get_overhaul_cost_by_time_chart( - overhaul_cost: float, days: int, numEquipments: int, decay_base: float = 1.01 -) -> np.ndarray: - if overhaul_cost < 0: - raise ValueError("Overhaul cost cannot be negative") - if days <= 0: - raise ValueError("Days must be positive") - hours = days * 24 +# def get_overhaul_cost_by_time_chart( +# overhaul_cost: float, days: int, numEquipments: int, decay_base: float = 1.01 +# ) -> np.ndarray: +# if overhaul_cost < 0: +# raise ValueError("Overhaul cost cannot be negative") +# if days <= 0: +# raise ValueError("Days must be positive") - rate = np.arange(1, hours + 1) +# hours = days * 24 - cost_per_equipment = overhaul_cost / numEquipments +# rate = np.arange(1, hours + 1) - results = cost_per_equipment - ((cost_per_equipment / hours) * rate) +# cost_per_equipment = overhaul_cost / numEquipments - return results +# results = cost_per_equipment - ((cost_per_equipment / hours) * rate) + +# return results # def get_overhaul_cost_by_time_chart(overhaul_cost: float, days: int, numEquipments: int, decay_base: float = 1.1) -> np.ndarray: @@ -58,14 +60,88 @@ def get_overhaul_cost_by_time_chart( # return results +# async def get_corrective_cost_time_chart( +# material_cost: float, service_cost: float, location_tag: str, token, max_days: int +# ) -> Tuple[np.ndarray, np.ndarray]: + +# start_date = datetime.datetime(2025, 1, 1) +# end_date = start_date + datetime.timedelta(days=max_days) +# url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}" + +# try: +# response = requests.get( +# url, +# headers={ +# "Content-Type": "application/json", +# "Authorization": f"Bearer {token}", +# }, +# ) +# data = response.json() + +# ## Get latest data fromdata_today +# # latest_num_of_fail:float = get_latest_numOfFail(location_tag=location_tag, token=token) + +# latest_num = data["data"][-1]["num_fail"] + +# if not latest_num: +# latest_num = 1 + +# # Create a complete date range for 2024 +# start_date = datetime.datetime(2025, 1, 1) +# date_range = [start_date + datetime.timedelta(days=x) for x in range(max_days)] + +# # Create a dictionary of existing data +# data_dict = { +# datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"] +# for item in data["data"] +# } + +# # Fill in missing dates with nearest available value +# complete_data = [] +# last_known_value = 0 # Default value if no data is available +# not_full_data = [] + +# for date in date_range: +# if date in data_dict: +# if data_dict[date] is not None: +# last_known_value = data_dict[date] +# complete_data.append(last_known_value) +# else: +# complete_data.append(0) +# # Convert to numpy array +# daily_failure = np.array(complete_data) + +# hourly_failure = np.repeat(daily_failure, 24) / 24 + +# # failure_counts = np.cumsum(daily_failure) + +# # Calculate corrective costs +# cost_per_failure = (material_cost + service_cost) / latest_num + +# if cost_per_failure == 0: +# raise ValueError("Cost per failure cannot be zero") + +# corrective_costs = hourly_failure * cost_per_failure + +# return corrective_costs, hourly_failure + +# except Exception as e: +# print(f"Error fetching or processing data: {str(e)}") +# raise + async def get_corrective_cost_time_chart( - material_cost: float, service_cost: float, location_tag: str, token, max_days: int + material_cost: float, + service_cost: float, + location_tag: str, + token, + start_date: datetime.datetime, + end_date: datetime.datetime ) -> Tuple[np.ndarray, np.ndarray]: + days_difference = (end_date - start_date).days - start_date = datetime.datetime(2025, 1, 1) - end_date = start_date + datetime.timedelta(days=max_days) url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}" + try: response = requests.get( url, @@ -75,18 +151,14 @@ async def get_corrective_cost_time_chart( }, ) data = response.json() - - ## Get latest data fromdata_today - # latest_num_of_fail:float = get_latest_numOfFail(location_tag=location_tag, token=token) - latest_num = data["data"][-1]["num_fail"] if not latest_num: latest_num = 1 - # Create a complete date range for 2024 + # Create a complete date range for 2025 start_date = datetime.datetime(2025, 1, 1) - date_range = [start_date + datetime.timedelta(days=x) for x in range(max_days)] + date_range = [start_date + datetime.timedelta(days=x) for x in range(days_difference)] # Create a dictionary of existing data data_dict = { @@ -94,39 +166,55 @@ async def get_corrective_cost_time_chart( for item in data["data"] } - # Fill in missing dates with nearest available value - complete_data = [] - last_known_value = 0 # Default value if no data is available - not_full_data = [] - + # Group data by month + monthly_data = {} for date in date_range: - if date in data_dict: - if data_dict[date] is not None: - last_known_value = data_dict[date] - complete_data.append(last_known_value) - else: - complete_data.append(0) - # Convert to numpy array - daily_failure = np.array(complete_data) + month_key = date.replace(day=1) + if month_key not in monthly_data: + monthly_data[month_key] = 0 + + if date in data_dict and data_dict[date] is not None: + monthly_data[month_key] += data_dict[date] - hourly_failure = np.repeat(daily_failure, 24) / 24 - # failure_counts = np.cumsum(daily_failure) + # Convert monthly data to list + complete_data = [] + for month in sorted(monthly_data.keys()): + complete_data.append(monthly_data[month]) + + + # Convert to numpy array + monthly_failure = np.array(complete_data) # Calculate corrective costs cost_per_failure = (material_cost + service_cost) / latest_num - if cost_per_failure == 0: raise ValueError("Cost per failure cannot be zero") - corrective_costs = hourly_failure * cost_per_failure + corrective_costs = monthly_failure * cost_per_failure - return corrective_costs, hourly_failure + return corrective_costs, monthly_failure except Exception as e: print(f"Error fetching or processing data: {str(e)}") raise +def get_overhaul_cost_by_time_chart( + overhaul_cost: float, months_num: int, numEquipments: int, decay_base: float = 1.01 +) -> np.ndarray: + if overhaul_cost < 0: + raise ValueError("Overhaul cost cannot be negative") + if months_num <= 0: + raise ValueError("months_num must be positive") + + rate = np.arange(1, months_num + 1) + + cost_per_equipment = overhaul_cost / numEquipments + + # results = cost_per_equipment - ((cost_per_equipment / hours) * rate) + results = cost_per_equipment / rate + + return results # def get_corrective_cost_time_chart(material_cost: float, service_cost: float, days: int, numEquipments: int) -> Tuple[np.ndarray, np.ndarray]: # day_points = np.arange(0, days) @@ -181,7 +269,10 @@ async def create_param_and_data( async def get_calculation_result(db_session: DbSession, calculation_id: str): - days = 667 * 24 + start_date = datetime.datetime(2025, 1, 1) + end_date = start_date + datetime.timedelta(days=677) + + months_num = get_months_between(start_date, end_date) scope_calculation = await get_calculation_data_by_id( db_session=db_session, calculation_id=calculation_id ) @@ -201,7 +292,7 @@ async def get_calculation_result(db_session: DbSession, calculation_id: str): ) calculation_results = [] - for i in range(days): + for i in range(months_num): result = { "overhaul_cost": 0, "corrective_cost": 0, @@ -225,7 +316,7 @@ async def get_calculation_result(db_session: DbSession, calculation_id: str): scope=scope_overhaul.type, results=calculation_results, optimum_oh=scope_calculation.optimum_oh_day, - # equipment_results=scope_calculation.equipment_results, + equipment_results=scope_calculation.equipment_results, ) @@ -320,7 +411,10 @@ async def get_calculation_by_assetnum( async def create_calculation_result_service( db_session: DbSession, calculation: CalculationData, token: str ) -> CalculationTimeConstrainsRead: - days = 667 # Changed to 365 days as per requirement + start_date = datetime.datetime(2025, 1, 1) + end_date = start_date + datetime.timedelta(days=677) + + months_num = get_months_between(start_date, end_date) # Get all equipment for this calculation session equipments = await get_all_by_session_id( @@ -336,8 +430,8 @@ async def create_calculation_result_service( # Store results for each equipment equipment_results: List[CalculationEquipmentResult] = [] - total_corrective_costs = np.zeros(days * 24) - total_daily_failures = np.zeros(days * 24) + total_corrective_costs = np.zeros(months_num) + total_daily_failures = np.zeros(months_num) # Calculate for each equipment for eq in equipments: @@ -346,12 +440,13 @@ async def create_calculation_result_service( service_cost=eq.service_cost, token=token, location_tag=eq.equipment.location_tag, - max_days=667, + start_date=start_date, + end_date=end_date ) overhaul_cost_points = get_overhaul_cost_by_time_chart( calculation_data.parameter.overhaul_cost, - days=667, + months_num=months_num, numEquipments=len(equipments), ) diff --git a/src/calculation_time_constrains/utils.py b/src/calculation_time_constrains/utils.py new file mode 100644 index 0000000..d7012b9 --- /dev/null +++ b/src/calculation_time_constrains/utils.py @@ -0,0 +1,9 @@ +import datetime + +def get_months_between(start_date: datetime.datetime, end_date: datetime.datetime) -> int: + """ + Calculate number of months between two dates. + """ + months = (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month) + # Add 1 to include both start and end months + return months + 1