change to month

main
Cizz22 11 months ago
parent 5d5a47e802
commit 5414060824

@ -77,7 +77,7 @@ class CalculationTimeConstrainsRead(CalculationTimeConstrainsBase):
reference: UUID
scope: str
results: List[CalculationResultsRead]
# equipment_results: List[EquipmentResult]
equipment_results: List[EquipmentResult]
optimum_oh: Any

@ -21,24 +21,26 @@ from .schema import (CalculationResultsRead,
CalculationTimeConstrainsParametersCreate,
CalculationTimeConstrainsRead, OptimumResult)
from .utils import get_months_between
def get_overhaul_cost_by_time_chart(
overhaul_cost: float, days: int, numEquipments: int, decay_base: float = 1.01
) -> np.ndarray:
if overhaul_cost < 0:
raise ValueError("Overhaul cost cannot be negative")
if days <= 0:
raise ValueError("Days must be positive")
hours = days * 24
# def get_overhaul_cost_by_time_chart(
# overhaul_cost: float, days: int, numEquipments: int, decay_base: float = 1.01
# ) -> np.ndarray:
# if overhaul_cost < 0:
# raise ValueError("Overhaul cost cannot be negative")
# if days <= 0:
# raise ValueError("Days must be positive")
rate = np.arange(1, hours + 1)
# hours = days * 24
cost_per_equipment = overhaul_cost / numEquipments
# rate = np.arange(1, hours + 1)
results = cost_per_equipment - ((cost_per_equipment / hours) * rate)
# cost_per_equipment = overhaul_cost / numEquipments
return results
# results = cost_per_equipment - ((cost_per_equipment / hours) * rate)
# return results
# def get_overhaul_cost_by_time_chart(overhaul_cost: float, days: int, numEquipments: int, decay_base: float = 1.1) -> np.ndarray:
@ -58,14 +60,88 @@ def get_overhaul_cost_by_time_chart(
# return results
# async def get_corrective_cost_time_chart(
# material_cost: float, service_cost: float, location_tag: str, token, max_days: int
# ) -> Tuple[np.ndarray, np.ndarray]:
# start_date = datetime.datetime(2025, 1, 1)
# end_date = start_date + datetime.timedelta(days=max_days)
# url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
# try:
# response = requests.get(
# url,
# headers={
# "Content-Type": "application/json",
# "Authorization": f"Bearer {token}",
# },
# )
# data = response.json()
# ## Get latest data fromdata_today
# # latest_num_of_fail:float = get_latest_numOfFail(location_tag=location_tag, token=token)
# latest_num = data["data"][-1]["num_fail"]
# if not latest_num:
# latest_num = 1
# # Create a complete date range for 2024
# start_date = datetime.datetime(2025, 1, 1)
# date_range = [start_date + datetime.timedelta(days=x) for x in range(max_days)]
# # Create a dictionary of existing data
# data_dict = {
# datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"]
# for item in data["data"]
# }
# # Fill in missing dates with nearest available value
# complete_data = []
# last_known_value = 0 # Default value if no data is available
# not_full_data = []
# for date in date_range:
# if date in data_dict:
# if data_dict[date] is not None:
# last_known_value = data_dict[date]
# complete_data.append(last_known_value)
# else:
# complete_data.append(0)
# # Convert to numpy array
# daily_failure = np.array(complete_data)
# hourly_failure = np.repeat(daily_failure, 24) / 24
# # failure_counts = np.cumsum(daily_failure)
# # Calculate corrective costs
# cost_per_failure = (material_cost + service_cost) / latest_num
# if cost_per_failure == 0:
# raise ValueError("Cost per failure cannot be zero")
# corrective_costs = hourly_failure * cost_per_failure
# return corrective_costs, hourly_failure
# except Exception as e:
# print(f"Error fetching or processing data: {str(e)}")
# raise
async def get_corrective_cost_time_chart(
material_cost: float, service_cost: float, location_tag: str, token, max_days: int
material_cost: float,
service_cost: float,
location_tag: str,
token,
start_date: datetime.datetime,
end_date: datetime.datetime
) -> Tuple[np.ndarray, np.ndarray]:
days_difference = (end_date - start_date).days
start_date = datetime.datetime(2025, 1, 1)
end_date = start_date + datetime.timedelta(days=max_days)
url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
try:
response = requests.get(
url,
@ -75,18 +151,14 @@ async def get_corrective_cost_time_chart(
},
)
data = response.json()
## Get latest data fromdata_today
# latest_num_of_fail:float = get_latest_numOfFail(location_tag=location_tag, token=token)
latest_num = data["data"][-1]["num_fail"]
if not latest_num:
latest_num = 1
# Create a complete date range for 2024
# Create a complete date range for 2025
start_date = datetime.datetime(2025, 1, 1)
date_range = [start_date + datetime.timedelta(days=x) for x in range(max_days)]
date_range = [start_date + datetime.timedelta(days=x) for x in range(days_difference)]
# Create a dictionary of existing data
data_dict = {
@ -94,39 +166,55 @@ async def get_corrective_cost_time_chart(
for item in data["data"]
}
# Fill in missing dates with nearest available value
complete_data = []
last_known_value = 0 # Default value if no data is available
not_full_data = []
# Group data by month
monthly_data = {}
for date in date_range:
if date in data_dict:
if data_dict[date] is not None:
last_known_value = data_dict[date]
complete_data.append(last_known_value)
else:
complete_data.append(0)
# Convert to numpy array
daily_failure = np.array(complete_data)
month_key = date.replace(day=1)
if month_key not in monthly_data:
monthly_data[month_key] = 0
if date in data_dict and data_dict[date] is not None:
monthly_data[month_key] += data_dict[date]
hourly_failure = np.repeat(daily_failure, 24) / 24
# failure_counts = np.cumsum(daily_failure)
# Convert monthly data to list
complete_data = []
for month in sorted(monthly_data.keys()):
complete_data.append(monthly_data[month])
# Convert to numpy array
monthly_failure = np.array(complete_data)
# Calculate corrective costs
cost_per_failure = (material_cost + service_cost) / latest_num
if cost_per_failure == 0:
raise ValueError("Cost per failure cannot be zero")
corrective_costs = hourly_failure * cost_per_failure
corrective_costs = monthly_failure * cost_per_failure
return corrective_costs, hourly_failure
return corrective_costs, monthly_failure
except Exception as e:
print(f"Error fetching or processing data: {str(e)}")
raise
def get_overhaul_cost_by_time_chart(
overhaul_cost: float, months_num: int, numEquipments: int, decay_base: float = 1.01
) -> np.ndarray:
if overhaul_cost < 0:
raise ValueError("Overhaul cost cannot be negative")
if months_num <= 0:
raise ValueError("months_num must be positive")
rate = np.arange(1, months_num + 1)
cost_per_equipment = overhaul_cost / numEquipments
# results = cost_per_equipment - ((cost_per_equipment / hours) * rate)
results = cost_per_equipment / rate
return results
# def get_corrective_cost_time_chart(material_cost: float, service_cost: float, days: int, numEquipments: int) -> Tuple[np.ndarray, np.ndarray]:
# day_points = np.arange(0, days)
@ -181,7 +269,10 @@ async def create_param_and_data(
async def get_calculation_result(db_session: DbSession, calculation_id: str):
days = 667 * 24
start_date = datetime.datetime(2025, 1, 1)
end_date = start_date + datetime.timedelta(days=677)
months_num = get_months_between(start_date, end_date)
scope_calculation = await get_calculation_data_by_id(
db_session=db_session, calculation_id=calculation_id
)
@ -201,7 +292,7 @@ async def get_calculation_result(db_session: DbSession, calculation_id: str):
)
calculation_results = []
for i in range(days):
for i in range(months_num):
result = {
"overhaul_cost": 0,
"corrective_cost": 0,
@ -225,7 +316,7 @@ async def get_calculation_result(db_session: DbSession, calculation_id: str):
scope=scope_overhaul.type,
results=calculation_results,
optimum_oh=scope_calculation.optimum_oh_day,
# equipment_results=scope_calculation.equipment_results,
equipment_results=scope_calculation.equipment_results,
)
@ -320,7 +411,10 @@ async def get_calculation_by_assetnum(
async def create_calculation_result_service(
db_session: DbSession, calculation: CalculationData, token: str
) -> CalculationTimeConstrainsRead:
days = 667 # Changed to 365 days as per requirement
start_date = datetime.datetime(2025, 1, 1)
end_date = start_date + datetime.timedelta(days=677)
months_num = get_months_between(start_date, end_date)
# Get all equipment for this calculation session
equipments = await get_all_by_session_id(
@ -336,8 +430,8 @@ async def create_calculation_result_service(
# Store results for each equipment
equipment_results: List[CalculationEquipmentResult] = []
total_corrective_costs = np.zeros(days * 24)
total_daily_failures = np.zeros(days * 24)
total_corrective_costs = np.zeros(months_num)
total_daily_failures = np.zeros(months_num)
# Calculate for each equipment
for eq in equipments:
@ -346,12 +440,13 @@ async def create_calculation_result_service(
service_cost=eq.service_cost,
token=token,
location_tag=eq.equipment.location_tag,
max_days=667,
start_date=start_date,
end_date=end_date
)
overhaul_cost_points = get_overhaul_cost_by_time_chart(
calculation_data.parameter.overhaul_cost,
days=667,
months_num=months_num,
numEquipments=len(equipments),
)

@ -0,0 +1,9 @@
import datetime
def get_months_between(start_date: datetime.datetime, end_date: datetime.datetime) -> int:
"""
Calculate number of months between two dates.
"""
months = (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month)
# Add 1 to include both start and end months
return months + 1
Loading…
Cancel
Save