From 20d6267108f0768d84751216020f033bb01e94d9 Mon Sep 17 00:00:00 2001 From: Cizz22 Date: Wed, 12 Feb 2025 10:48:28 +0700 Subject: [PATCH] minor add --- src/calculation_time_constrains/service.py | 156 +++++++++++---------- src/overhaul_job/router.py | 11 +- src/overhaul_job/service.py | 6 + src/scope_equipment_job/router.py | 1 - 4 files changed, 97 insertions(+), 77 deletions(-) diff --git a/src/calculation_time_constrains/service.py b/src/calculation_time_constrains/service.py index 044de39..6719399 100644 --- a/src/calculation_time_constrains/service.py +++ b/src/calculation_time_constrains/service.py @@ -1,5 +1,5 @@ import datetime -from typing import List, Optional, Tuple +from typing import Coroutine, List, Optional, Tuple from uuid import UUID import numpy as np @@ -54,93 +54,93 @@ def get_overhaul_cost_by_time_chart( # return results -async def get_corrective_cost_time_chart( - material_cost: float, service_cost: float, location_tag: str, token -) -> Tuple[np.ndarray, np.ndarray]: - """ - Fetch failure data from API and calculate corrective costs, ensuring 365 days of data. - - Args: - material_cost: Cost of materials per failure - service_cost: Cost of service per failure - location_tag: Location tag of the equipment - token: Authorization token - - Returns: - Tuple of (corrective_costs, daily_failure_rate) - """ - url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/2024-01-01/2024-12-31" - - try: - response = requests.get( - url, - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {token}", - }, - ) - data = response.json() +# async def get_corrective_cost_time_chart( +# material_cost: float, service_cost: float, location_tag: str, token +# ) -> Tuple[np.ndarray, np.ndarray]: +# """ +# Fetch failure data from API and calculate corrective costs, ensuring 365 days of data. + +# Args: +# material_cost: Cost of materials per failure +# service_cost: Cost of service per failure +# location_tag: Location tag of the equipment +# token: Authorization token + +# Returns: +# Tuple of (corrective_costs, daily_failure_rate) +# """ +# url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/2024-01-01/2024-12-31" + +# try: +# response = requests.get( +# url, +# headers={ +# "Content-Type": "application/json", +# "Authorization": f"Bearer {token}", +# }, +# ) +# data = response.json() - # Create a complete date range for 2024 - start_date = datetime.datetime(2024, 1, 1) - date_range = [start_date + datetime.timedelta(days=x) for x in range(365)] +# # Create a complete date range for 2024 +# start_date = datetime.datetime(2024, 1, 1) +# date_range = [start_date + datetime.timedelta(days=x) for x in range(365)] - # Create a dictionary of existing data - data_dict = { - datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"] - for item in data["data"] - } +# # Create a dictionary of existing data +# data_dict = { +# datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"] +# for item in data["data"] +# } - # Fill in missing dates with nearest available value - complete_data = [] - last_known_value = 0 # Default value if no data is available +# # Fill in missing dates with nearest available value +# complete_data = [] +# last_known_value = 0 # Default value if no data is available - for date in date_range: - if date in data_dict: - if data_dict[date] is not None: - last_known_value = data_dict[date] - complete_data.append(last_known_value) - else: - complete_data.append(last_known_value) +# for date in date_range: +# if date in data_dict: +# if data_dict[date] is not None: +# last_known_value = data_dict[date] +# complete_data.append(last_known_value) +# else: +# complete_data.append(last_known_value) - # Convert to numpy array - daily_failure = np.array(complete_data) +# # Convert to numpy array +# daily_failure = np.array(complete_data) - # Calculate corrective costs - cost_per_failure = material_cost + service_cost - corrective_costs = daily_failure * cost_per_failure +# # Calculate corrective costs +# cost_per_failure = material_cost + service_cost +# corrective_costs = daily_failure * cost_per_failure - return corrective_costs, daily_failure +# return corrective_costs, daily_failure - except Exception as e: - print(f"Error fetching or processing data: {str(e)}") - raise +# except Exception as e: +# print(f"Error fetching or processing data: {str(e)}") +# raise -# def get_corrective_cost_time_chart(material_cost: float, service_cost: float, days: int, numEquipments: int) -> Tuple[np.ndarray, np.ndarray]: -# day_points = np.arange(0, days) +def get_corrective_cost_time_chart(material_cost: float, service_cost: float, days: int, numEquipments: int) -> Tuple[np.ndarray, np.ndarray]: + day_points = np.arange(0, days) -# # Parameters for failure rate -# base_rate = 0.2 # Base failure rate per day -# acceleration = 2.4 # How quickly failure rate increases -# grace_period = 170 # Days before failures start increasing significantly + # Parameters for failure rate + base_rate = 0.04 # Base failure rate per day + acceleration = 0.7 # How quickly failure rate increases + grace_period = 49 # Days before failures start increasing significantly -# # Calculate daily failure rate using sigmoid function -# daily_failure_rate = base_rate / (1 + np.exp(-acceleration * (day_points - grace_period)/days)) + # Calculate daily failure rate using sigmoid function + daily_failure_rate = base_rate / (1 + np.exp(-acceleration * (day_points - grace_period)/days)) -# # Introduce randomness in the failure rate -# random_noise = np.random.normal(0.0, 0.05, (numEquipments, days)) # Mean 0.0, Std Dev 0.05 -# daily_failure_rate = daily_failure_rate + random_noise -# daily_failure_rate = np.clip(daily_failure_rate, 0, None) # Ensure failure rate is non-negative + # Introduce randomness in the failure rate + random_noise = np.random.normal(0.0, 0.05, (numEquipments, days)) # Mean 0.0, Std Dev 0.05 + daily_failure_rate = daily_failure_rate + random_noise + daily_failure_rate = np.clip(daily_failure_rate, 0, None) # Ensure failure rate is non-negative -# # Calculate cumulative failures -# failure_counts = np.cumsum(daily_failure_rate) + # Calculate cumulative failures + failure_counts = np.cumsum(daily_failure_rate) -# # Calculate corrective costs based on cumulative failures and combined costs -# cost_per_failure = material_cost + service_cost -# corrective_costs = failure_counts * cost_per_failure + # Calculate corrective costs based on cumulative failures and combined costs + cost_per_failure = material_cost + service_cost + corrective_costs = failure_counts * cost_per_failure -# return corrective_costs, daily_failure_rate + return corrective_costs, daily_failure_rate async def create_param_and_data( @@ -317,11 +317,17 @@ async def create_calculation_result_service( # Calculate for each equipment for eq in equipments: - corrective_costs, daily_failures = await get_corrective_cost_time_chart( + # corrective_costs, daily_failures = await get_corrective_cost_time_chart( + # material_cost=eq.material_cost, + # service_cost=eq.service_cost, + # token=token, + # location_tag=eq.equipment.location_tag, + # ) + corrective_costs, daily_failures = get_corrective_cost_time_chart( material_cost=eq.material_cost, service_cost=eq.service_cost, - token=token, - location_tag=eq.equipment.location_tag, + days=days, + numEquipments=len(equipments), ) overhaul_cost_points = get_overhaul_cost_by_time_chart( diff --git a/src/overhaul_job/router.py b/src/overhaul_job/router.py index 5d9b222..e1066ad 100644 --- a/src/overhaul_job/router.py +++ b/src/overhaul_job/router.py @@ -9,7 +9,7 @@ from src.models import StandardResponse from .schema import (OverhaulJobBase, OverhaulJobCreate, OverhaulJobPagination, OverhaulJobRead) -from .service import create, get_all +from .service import create, get_all, delete router = APIRouter() @@ -45,6 +45,15 @@ async def create_overhaul_equipment_jobs( message="Data created successfully", ) +@router.delete("/{overhaul_job_id}", response_model=StandardResponse[None]) +async def delete_overhaul_equipment_job(db_session: DbSession,overhaul_job_id): + await delete(db_session=db_session, overhaul_job_id=overhaul_job_id) + + return StandardResponse( + data=None, + message="Data deleted successfully", + ) + # @router.post("", response_model=StandardResponse[List[str]]) # async def create_scope(db_session: DbSession, scope_in: OverhaulJobCreate): diff --git a/src/overhaul_job/service.py b/src/overhaul_job/service.py index 744a194..1f29456 100644 --- a/src/overhaul_job/service.py +++ b/src/overhaul_job/service.py @@ -52,6 +52,12 @@ async def create( return overhaul_job_in.job_ids +async def delete(*, db_session: DbSession, overhaul_job_id): + """Deletes a document.""" + activity = await db_session.get(OverhaulJob, overhaul_job_id) + await db_session.delete(activity) + await db_session.commit() + # async def update(*, db_session: DbSession, scope: OverhaulScope, scope_in: ScopeUpdate): # """Updates a document.""" # data = scope_in.model_dump() diff --git a/src/scope_equipment_job/router.py b/src/scope_equipment_job/router.py index f523987..f9d23dc 100644 --- a/src/scope_equipment_job/router.py +++ b/src/scope_equipment_job/router.py @@ -42,5 +42,4 @@ async def create_scope_equipment_jobs( @router.delete("/{assetnum}", response_model=StandardResponse[None]) async def delete_scope_equipment_job(db_session: DbSession, assetnum, scope_job_id): - await delete(db_session=db_session, assetnum=assetnum, scope_job_id=scope_job_id)