minor add

main
Cizz22 11 months ago
parent 9a126d013c
commit 20d6267108

@ -1,5 +1,5 @@
import datetime import datetime
from typing import List, Optional, Tuple from typing import Coroutine, List, Optional, Tuple
from uuid import UUID from uuid import UUID
import numpy as np import numpy as np
@ -54,93 +54,93 @@ def get_overhaul_cost_by_time_chart(
# return results # return results
async def get_corrective_cost_time_chart( # async def get_corrective_cost_time_chart(
material_cost: float, service_cost: float, location_tag: str, token # material_cost: float, service_cost: float, location_tag: str, token
) -> Tuple[np.ndarray, np.ndarray]: # ) -> Tuple[np.ndarray, np.ndarray]:
""" # """
Fetch failure data from API and calculate corrective costs, ensuring 365 days of data. # Fetch failure data from API and calculate corrective costs, ensuring 365 days of data.
Args: # Args:
material_cost: Cost of materials per failure # material_cost: Cost of materials per failure
service_cost: Cost of service per failure # service_cost: Cost of service per failure
location_tag: Location tag of the equipment # location_tag: Location tag of the equipment
token: Authorization token # token: Authorization token
Returns: # Returns:
Tuple of (corrective_costs, daily_failure_rate) # Tuple of (corrective_costs, daily_failure_rate)
""" # """
url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/2024-01-01/2024-12-31" # url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/2024-01-01/2024-12-31"
try: # try:
response = requests.get( # response = requests.get(
url, # url,
headers={ # headers={
"Content-Type": "application/json", # "Content-Type": "application/json",
"Authorization": f"Bearer {token}", # "Authorization": f"Bearer {token}",
}, # },
) # )
data = response.json() # data = response.json()
# Create a complete date range for 2024 # # Create a complete date range for 2024
start_date = datetime.datetime(2024, 1, 1) # start_date = datetime.datetime(2024, 1, 1)
date_range = [start_date + datetime.timedelta(days=x) for x in range(365)] # date_range = [start_date + datetime.timedelta(days=x) for x in range(365)]
# Create a dictionary of existing data # # Create a dictionary of existing data
data_dict = { # data_dict = {
datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"] # datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"]
for item in data["data"] # for item in data["data"]
} # }
# Fill in missing dates with nearest available value # # Fill in missing dates with nearest available value
complete_data = [] # complete_data = []
last_known_value = 0 # Default value if no data is available # last_known_value = 0 # Default value if no data is available
for date in date_range: # for date in date_range:
if date in data_dict: # if date in data_dict:
if data_dict[date] is not None: # if data_dict[date] is not None:
last_known_value = data_dict[date] # last_known_value = data_dict[date]
complete_data.append(last_known_value) # complete_data.append(last_known_value)
else: # else:
complete_data.append(last_known_value) # complete_data.append(last_known_value)
# Convert to numpy array # # Convert to numpy array
daily_failure = np.array(complete_data) # daily_failure = np.array(complete_data)
# Calculate corrective costs # # Calculate corrective costs
cost_per_failure = material_cost + service_cost # cost_per_failure = material_cost + service_cost
corrective_costs = daily_failure * cost_per_failure # corrective_costs = daily_failure * cost_per_failure
return corrective_costs, daily_failure # return corrective_costs, daily_failure
except Exception as e: # except Exception as e:
print(f"Error fetching or processing data: {str(e)}") # print(f"Error fetching or processing data: {str(e)}")
raise # raise
# def get_corrective_cost_time_chart(material_cost: float, service_cost: float, days: int, numEquipments: int) -> Tuple[np.ndarray, np.ndarray]: def get_corrective_cost_time_chart(material_cost: float, service_cost: float, days: int, numEquipments: int) -> Tuple[np.ndarray, np.ndarray]:
# day_points = np.arange(0, days) day_points = np.arange(0, days)
# # Parameters for failure rate # Parameters for failure rate
# base_rate = 0.2 # Base failure rate per day base_rate = 0.04 # Base failure rate per day
# acceleration = 2.4 # How quickly failure rate increases acceleration = 0.7 # How quickly failure rate increases
# grace_period = 170 # Days before failures start increasing significantly grace_period = 49 # Days before failures start increasing significantly
# # Calculate daily failure rate using sigmoid function # Calculate daily failure rate using sigmoid function
# daily_failure_rate = base_rate / (1 + np.exp(-acceleration * (day_points - grace_period)/days)) daily_failure_rate = base_rate / (1 + np.exp(-acceleration * (day_points - grace_period)/days))
# # Introduce randomness in the failure rate # Introduce randomness in the failure rate
# random_noise = np.random.normal(0.0, 0.05, (numEquipments, days)) # Mean 0.0, Std Dev 0.05 random_noise = np.random.normal(0.0, 0.05, (numEquipments, days)) # Mean 0.0, Std Dev 0.05
# daily_failure_rate = daily_failure_rate + random_noise daily_failure_rate = daily_failure_rate + random_noise
# daily_failure_rate = np.clip(daily_failure_rate, 0, None) # Ensure failure rate is non-negative daily_failure_rate = np.clip(daily_failure_rate, 0, None) # Ensure failure rate is non-negative
# # Calculate cumulative failures # Calculate cumulative failures
# failure_counts = np.cumsum(daily_failure_rate) failure_counts = np.cumsum(daily_failure_rate)
# # Calculate corrective costs based on cumulative failures and combined costs # Calculate corrective costs based on cumulative failures and combined costs
# cost_per_failure = material_cost + service_cost cost_per_failure = material_cost + service_cost
# corrective_costs = failure_counts * cost_per_failure corrective_costs = failure_counts * cost_per_failure
# return corrective_costs, daily_failure_rate return corrective_costs, daily_failure_rate
async def create_param_and_data( async def create_param_and_data(
@ -317,11 +317,17 @@ async def create_calculation_result_service(
# Calculate for each equipment # Calculate for each equipment
for eq in equipments: for eq in equipments:
corrective_costs, daily_failures = await get_corrective_cost_time_chart( # corrective_costs, daily_failures = await get_corrective_cost_time_chart(
# material_cost=eq.material_cost,
# service_cost=eq.service_cost,
# token=token,
# location_tag=eq.equipment.location_tag,
# )
corrective_costs, daily_failures = get_corrective_cost_time_chart(
material_cost=eq.material_cost, material_cost=eq.material_cost,
service_cost=eq.service_cost, service_cost=eq.service_cost,
token=token, days=days,
location_tag=eq.equipment.location_tag, numEquipments=len(equipments),
) )
overhaul_cost_points = get_overhaul_cost_by_time_chart( overhaul_cost_points = get_overhaul_cost_by_time_chart(

@ -9,7 +9,7 @@ from src.models import StandardResponse
from .schema import (OverhaulJobBase, OverhaulJobCreate, OverhaulJobPagination, from .schema import (OverhaulJobBase, OverhaulJobCreate, OverhaulJobPagination,
OverhaulJobRead) OverhaulJobRead)
from .service import create, get_all from .service import create, get_all, delete
router = APIRouter() router = APIRouter()
@ -45,6 +45,15 @@ async def create_overhaul_equipment_jobs(
message="Data created successfully", message="Data created successfully",
) )
@router.delete("/{overhaul_job_id}", response_model=StandardResponse[None])
async def delete_overhaul_equipment_job(db_session: DbSession,overhaul_job_id):
await delete(db_session=db_session, overhaul_job_id=overhaul_job_id)
return StandardResponse(
data=None,
message="Data deleted successfully",
)
# @router.post("", response_model=StandardResponse[List[str]]) # @router.post("", response_model=StandardResponse[List[str]])
# async def create_scope(db_session: DbSession, scope_in: OverhaulJobCreate): # async def create_scope(db_session: DbSession, scope_in: OverhaulJobCreate):

@ -52,6 +52,12 @@ async def create(
return overhaul_job_in.job_ids return overhaul_job_in.job_ids
async def delete(*, db_session: DbSession, overhaul_job_id):
"""Deletes a document."""
activity = await db_session.get(OverhaulJob, overhaul_job_id)
await db_session.delete(activity)
await db_session.commit()
# async def update(*, db_session: DbSession, scope: OverhaulScope, scope_in: ScopeUpdate): # async def update(*, db_session: DbSession, scope: OverhaulScope, scope_in: ScopeUpdate):
# """Updates a document.""" # """Updates a document."""
# data = scope_in.model_dump() # data = scope_in.model_dump()

@ -42,5 +42,4 @@ async def create_scope_equipment_jobs(
@router.delete("/{assetnum}", response_model=StandardResponse[None]) @router.delete("/{assetnum}", response_model=StandardResponse[None])
async def delete_scope_equipment_job(db_session: DbSession, assetnum, scope_job_id): async def delete_scope_equipment_job(db_session: DbSession, assetnum, scope_job_id):
await delete(db_session=db_session, assetnum=assetnum, scope_job_id=scope_job_id) await delete(db_session=db_session, assetnum=assetnum, scope_job_id=scope_job_id)

Loading…
Cancel
Save