From cb979385cf3c8bb53b737906d50817e79e5a56b3 Mon Sep 17 00:00:00 2001 From: Cizz22 Date: Tue, 11 Feb 2025 10:23:14 +0700 Subject: [PATCH] add api to calculation --- src/auth/service.py | 8 +++ src/calculation_time_constrains/flows.py | 5 +- src/calculation_time_constrains/router.py | 6 +- src/calculation_time_constrains/service.py | 79 +++++++++++++++++----- 4 files changed, 76 insertions(+), 22 deletions(-) diff --git a/src/auth/service.py b/src/auth/service.py index 75f582b..f4da601 100644 --- a/src/auth/service.py +++ b/src/auth/service.py @@ -51,5 +51,13 @@ class JWTBearer(HTTPBearer): async def get_current_user(request: Request) -> UserBase: return request.state.user +async def get_token(request: Request): + token = request.headers.get("Authorization") + + if token: + return token.split(" ")[1] + + return "" CurrentUser = Annotated[UserBase, Depends(get_current_user)] +Token = Annotated[str, Depends(get_token)] diff --git a/src/calculation_time_constrains/flows.py b/src/calculation_time_constrains/flows.py index 7f97fcc..3f10735 100644 --- a/src/calculation_time_constrains/flows.py +++ b/src/calculation_time_constrains/flows.py @@ -10,6 +10,7 @@ from sqlalchemy import select from sqlalchemy.orm import joinedload from src.database.core import DbSession +from src.auth.service import Token from src.overhaul_scope.service import get_all from src.scope_equipment.model import ScopeEquipment from src.scope_equipment.service import get_by_assetnum @@ -77,12 +78,12 @@ async def get_create_calculation_parameters(*, db_session: DbSession, calculatio ) -async def create_calculation(*, db_session: DbSession, calculation_time_constrains_in: CalculationTimeConstrainsParametersCreate, created_by: str): +async def create_calculation(*,token:str, db_session: DbSession, calculation_time_constrains_in: CalculationTimeConstrainsParametersCreate, created_by: str): calculation_data = await create_param_and_data( db_session=db_session, calculation_param_in=calculation_time_constrains_in, created_by=created_by) - results = await create_calculation_result_service(db_session=db_session, calculation=calculation_data) + results = await create_calculation_result_service(db_session=db_session, calculation=calculation_data, token=token) return results diff --git a/src/calculation_time_constrains/router.py b/src/calculation_time_constrains/router.py index 198fa6b..c800d03 100644 --- a/src/calculation_time_constrains/router.py +++ b/src/calculation_time_constrains/router.py @@ -6,7 +6,7 @@ from typing import Union from fastapi import APIRouter from fastapi.params import Query -from src.auth.service import CurrentUser +from src.auth.service import CurrentUser, Token from src.database.core import DbSession from src.models import StandardResponse @@ -28,13 +28,13 @@ router = APIRouter() @router.post("", response_model=StandardResponse[Union[str, CalculationTimeConstrainsRead]]) -async def create_calculation_time_constrains(db_session: DbSession, current_user: CurrentUser, calculation_time_constrains_in: CalculationTimeConstrainsParametersCreate, scope_calculation_id: Optional[str] = Query(None), with_results: Optional[int] = Query(0)): +async def create_calculation_time_constrains(token:Token ,db_session: DbSession, current_user: CurrentUser, calculation_time_constrains_in: CalculationTimeConstrainsParametersCreate, scope_calculation_id: Optional[str] = Query(None), with_results: Optional[int] = Query(0)): """Save calculation time constrains Here""" if scope_calculation_id: results = await get_or_create_scope_equipment_calculation(db_session=db_session, scope_calculation_id=scope_calculation_id, calculation_time_constrains_in=calculation_time_constrains_in) else: - results = await create_calculation(db_session=db_session, calculation_time_constrains_in=calculation_time_constrains_in, created_by=current_user.name) + results = await create_calculation(token=token ,db_session=db_session, calculation_time_constrains_in=calculation_time_constrains_in, created_by=current_user.name) if not with_results: diff --git a/src/calculation_time_constrains/service.py b/src/calculation_time_constrains/service.py index fccebd8..f7ede25 100644 --- a/src/calculation_time_constrains/service.py +++ b/src/calculation_time_constrains/service.py @@ -27,6 +27,9 @@ from .schema import CalculationTimeConstrainsRead from .schema import OptimumResult from .schema import CalculationSelectedEquipmentUpdate +import requests +import datetime + def get_overhaul_cost_by_time_chart(overhaul_cost: float, days: int,numEquipments:int ,decay_base: float = 1.01) -> np.ndarray: if overhaul_cost < 0: @@ -58,25 +61,65 @@ def get_overhaul_cost_by_time_chart(overhaul_cost: float, days: int,numEquipment # results = np.where(np.isfinite(results), results, 0) # return results -def get_corrective_cost_time_chart(material_cost: float, service_cost: float, days: int) -> Tuple[np.ndarray, np.ndarray]: - day_points = np.arange(0, days) +async def get_corrective_cost_time_chart(material_cost: float, service_cost: float, location_tag: str, token) -> Tuple[np.ndarray, np.ndarray]: + """ + Fetch failure data from API and calculate corrective costs, ensuring 365 days of data. + + Args: + material_cost: Cost of materials per failure + service_cost: Cost of service per failure + location_tag: Location tag of the equipment + token: Authorization token + + Returns: + Tuple of (corrective_costs, daily_failure_rate) + """ + url = f'http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/2024-01-01/2024-12-31' + + try: + response = requests.get( + url, + headers={ + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {token}' + }, + ) + data = response.json() + + # Create a complete date range for 2024 + start_date = datetime.datetime(2024, 1, 1) + date_range = [start_date + datetime.timedelta(days=x) for x in range(365)] + + # Create a dictionary of existing data + data_dict = { + datetime.datetime.strptime(item['date'], '%d %b %Y'): item['num_fail'] + for item in data['data'] + } + + # Fill in missing dates with nearest available value + complete_data = [] + last_known_value = 0 # Default value if no data is available - # Parameters for failure rate - base_rate = 0.2 # Base failure rate per day - acceleration = 2.4 # How quickly failure rate increases - grace_period = 170 # Days before failures start increasing significantly + for date in date_range: + if date in data_dict: + if data_dict[date] is not None: + last_known_value = data_dict[date] + complete_data.append(last_known_value) + else: + complete_data.append(last_known_value) - # Calculate daily failure rate using sigmoid function - daily_failure_rate = base_rate / (1 + np.exp(-acceleration * (day_points - grace_period)/days)) + # Convert to numpy array + daily_failure = np.array(complete_data) - # Calculate cumulative failures - failure_counts = np.cumsum(daily_failure_rate) + # Calculate corrective costs + cost_per_failure = material_cost + service_cost + corrective_costs = daily_failure * cost_per_failure - # Calculate corrective costs based on cumulative failures and combined costs - cost_per_failure = material_cost + service_cost - corrective_costs = failure_counts * cost_per_failure + return corrective_costs, daily_failure - return corrective_costs, daily_failure_rate + except Exception as e: + print(f"Error fetching or processing data: {str(e)}") + raise # def get_corrective_cost_time_chart(material_cost: float, service_cost: float, days: int, numEquipments: int) -> Tuple[np.ndarray, np.ndarray]: # day_points = np.arange(0, days) @@ -246,6 +289,7 @@ async def get_calculation_data_by_id(db_session: DbSession, calculation_id) -> C async def create_calculation_result_service( db_session: DbSession, calculation: CalculationData, + token: str ) -> CalculationTimeConstrainsRead: days = 365 # Changed to 365 days as per requirement @@ -265,15 +309,16 @@ async def create_calculation_result_service( # Calculate for each equipment for eq in equipments: - corrective_costs, daily_failures = get_corrective_cost_time_chart( + corrective_costs, daily_failures = await get_corrective_cost_time_chart( material_cost=eq.material_cost, service_cost=eq.service_cost, - days=days + token=token, + location_tag=eq.equipment.location_tag ) overhaul_cost_points = get_overhaul_cost_by_time_chart( calculation_data.parameter.overhaul_cost, - days=days, + days=len(corrective_costs), numEquipments=len(equipments) )