import datetime
from typing import List, Optional, Tuple
from uuid import UUID

import numpy as np
import requests
from fastapi import HTTPException, status
from sqlalchemy import and_, case, func, select, update
from sqlalchemy.orm import joinedload

from src.database.core import DbSession
from src.overhaul_activity.service import get_all_by_session_id
from src.overhaul_scope.service import get as get_scope
from src.utils import get_latest_numOfFail
from src.workorder.model import MasterWorkOrder

from .model import (CalculationData, CalculationEquipmentResult,
                    CalculationResult)
from .schema import (CalculationResultsRead,
                     CalculationSelectedEquipmentUpdate,
                     CalculationTimeConstrainsParametersCreate,
                     CalculationTimeConstrainsRead, OptimumResult)


def get_overhaul_cost_by_time_chart(
    overhaul_cost: float, days: int, numEquipments: int, decay_base: float = 1.01
) -> np.ndarray:
    if overhaul_cost < 0:
        raise ValueError("Overhaul cost cannot be negative")
    if days <= 0:
        raise ValueError("Days must be positive")
    if numEquipments <= 0:
        raise ValueError("Number of equipments must be positive")

    # Spread the per-equipment overhaul cost linearly over the horizon at hourly
    # resolution, decaying to zero at the last hour. `decay_base` is kept for
    # backward compatibility with the exponential variant below but is unused here.
    hours = days * 24
    rate = np.arange(1, hours + 1)
    cost_per_equipment = overhaul_cost / numEquipments
    results = cost_per_equipment - ((cost_per_equipment / hours) * rate)
    return results


# def get_overhaul_cost_by_time_chart(overhaul_cost: float, days: int, numEquipments: int, decay_base: float = 1.1) -> np.ndarray:
#     if overhaul_cost < 0:
#         raise ValueError("Overhaul cost cannot be negative")
#     if days <= 0:
#         raise ValueError("Days must be positive")
#     exponents = np.arange(0, days)
#     cost_per_equipment = overhaul_cost / numEquipments
#     # Introduce randomness by multiplying with a random factor
#     random_factors = np.random.normal(1.0, 0.1, numEquipments)  # Mean 1.0, Std Dev 0.1
#     results = np.array([cost_per_equipment * factor / (decay_base ** exponents) for factor in random_factors])
#     results = np.where(np.isfinite(results), results, 0)
#     return results
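

# Illustrative usage sketch (not part of the service API; figures are hypothetical):
# an overhaul budget of 1_000_000 split across 4 equipments over a 667-day horizon
# yields one point per hour, starting near the per-equipment cost and decaying
# linearly to zero.
# >>> curve = get_overhaul_cost_by_time_chart(1_000_000, days=667, numEquipments=4)
# >>> curve.shape                  # (16008,) == 667 days * 24 hours
# >>> round(float(curve[0]), 2)    # ~249984.38 (250_000 minus one hourly step)
# >>> float(curve[-1])             # 0.0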


async def get_corrective_cost_time_chart(
    material_cost: float, service_cost: float, location_tag: str, token: str, max_days: int
) -> Tuple[np.ndarray, np.ndarray]:
    start_date = datetime.datetime(2025, 1, 1)
    end_date = start_date + datetime.timedelta(days=max_days)
    url = (
        f"http://192.168.1.82:8000/reliability/main/number-of-failures/"
        f"{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
    )
    try:
        # NOTE: requests is synchronous, so this call blocks the event loop while
        # the reliability service responds.
        response = requests.get(
            url,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {token}",
            },
        )
        response.raise_for_status()
        data = response.json()

        # Get the latest number of failures from the response
        # latest_num_of_fail: float = get_latest_numOfFail(location_tag=location_tag, token=token)
        latest_num = data["data"][-1]["num_fail"]
        if not latest_num:
            latest_num = 1

        # Create a complete date range covering the simulation horizon
        start_date = datetime.datetime(2025, 1, 1)
        date_range = [start_date + datetime.timedelta(days=x) for x in range(max_days)]

        # Index the returned data points by date
        data_dict = {
            datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"]
            for item in data["data"]
        }

        # Fill the range: dates present in the response keep the last known value
        # when their count is null; dates missing from the response default to 0.
        complete_data = []
        last_known_value = 0  # Default value if no data is available
        for date in date_range:
            if date in data_dict:
                if data_dict[date] is not None:
                    last_known_value = data_dict[date]
                complete_data.append(last_known_value)
            else:
                complete_data.append(0)

        # Convert to numpy arrays and spread each daily count evenly over 24 hours
        daily_failure = np.array(complete_data)
        hourly_failure = np.repeat(daily_failure, 24) / 24
        # failure_counts = np.cumsum(daily_failure)

        # Calculate corrective costs
        cost_per_failure = (material_cost + service_cost) / latest_num
        if cost_per_failure == 0:
            raise ValueError("Cost per failure cannot be zero")
        corrective_costs = hourly_failure * cost_per_failure
        return corrective_costs, hourly_failure
    except Exception as e:
        print(f"Error fetching or processing data: {str(e)}")
        raise


# def get_corrective_cost_time_chart(material_cost: float, service_cost: float, days: int, numEquipments: int) -> Tuple[np.ndarray, np.ndarray]:
#     day_points = np.arange(0, days)
#     # Parameters for failure rate
#     base_rate = 0.04      # Base failure rate per day
#     acceleration = 0.7    # How quickly failure rate increases
#     grace_period = 49     # Days before failures start increasing significantly
#     # Calculate daily failure rate using sigmoid function
#     daily_failure_rate = base_rate / (1 + np.exp(-acceleration * (day_points - grace_period) / days))
#     # Introduce randomness in the failure rate
#     random_noise = np.random.normal(0.0, 0.05, (numEquipments, days))  # Mean 0.0, Std Dev 0.05
#     daily_failure_rate = daily_failure_rate + random_noise
#     daily_failure_rate = np.clip(daily_failure_rate, 0, None)  # Ensure failure rate is non-negative
#     # Calculate cumulative failures
#     failure_counts = np.cumsum(daily_failure_rate)
#     # Calculate corrective costs based on cumulative failures and combined costs
#     cost_per_failure = material_cost + service_cost
#     corrective_costs = failure_counts * cost_per_failure
#     return corrective_costs, daily_failure_rate
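

# For reference, the forward-fill in get_corrective_cost_time_chart assumes the
# reliability endpoint returns a payload shaped roughly like the sketch below
# (field names inferred from how the response is parsed here; the authoritative
# schema belongs to the reliability service):
# {
#     "data": [
#         {"date": "01 Jan 2025", "num_fail": 0},
#         {"date": "02 Jan 2025", "num_fail": 2},
#         {"date": "04 Jan 2025", "num_fail": None}
#     ]
# }
# In this example 03 Jan is absent and contributes 0 failures, while 04 Jan has a
# null count and reuses the last known value (2).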


async def create_param_and_data(
    *,
    db_session: DbSession,
    calculation_param_in: CalculationTimeConstrainsParametersCreate,
    created_by: str,
    parameter_id: Optional[UUID] = None,
):
    """Creates a calculation data record together with its parameter set."""
    if calculation_param_in.ohSessionId is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="overhaul_session_id is required",
        )

    calculation_data = await CalculationData.create_with_param(
        db=db_session,
        overhaul_session_id=calculation_param_in.ohSessionId,
        avg_failure_cost=calculation_param_in.costPerFailure,
        overhaul_cost=calculation_param_in.overhaulCost,
        created_by=created_by,
        params_id=parameter_id,
    )
    return calculation_data


async def get_calculation_result(db_session: DbSession, calculation_id: str):
    hours = 667 * 24  # simulation horizon: 667 days at hourly resolution
    scope_calculation = await get_calculation_data_by_id(
        db_session=db_session, calculation_id=calculation_id
    )
    if not scope_calculation:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    scope_overhaul = await get_scope(
        db_session=db_session, overhaul_session_id=scope_calculation.overhaul_session_id
    )
    if not scope_overhaul:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    # Aggregate the stored per-equipment series, skipping excluded equipment
    calculation_results = []
    for i in range(hours):
        result = {
            "overhaul_cost": 0,
            "corrective_cost": 0,
            "num_failures": 0,
            "day": i + 1,
        }
        for eq in scope_calculation.equipment_results:
            if not eq.is_included:
                continue
            result["corrective_cost"] += float(eq.corrective_costs[i])
            result["overhaul_cost"] += float(eq.overhaul_costs[i])
            result["num_failures"] += int(eq.daily_failures[i])
        calculation_results.append(CalculationResultsRead(**result))

    return CalculationTimeConstrainsRead(
        id=scope_calculation.id,
        reference=scope_calculation.overhaul_session_id,
        scope=scope_overhaul.type,
        results=calculation_results,
        optimum_oh=scope_calculation.optimum_oh_day,
        # equipment_results=scope_calculation.equipment_results,
    )


async def get_calculation_data_by_id(
    db_session: DbSession, calculation_id
) -> CalculationData:
    stmt = (
        select(CalculationData)
        .filter(CalculationData.id == calculation_id)
        .options(
            joinedload(CalculationData.equipment_results),
            joinedload(CalculationData.parameter),
        )
    )
    result = await db_session.execute(stmt)
    return result.unique().scalar()


async def get_calculation_by_assetnum(
    *, db_session: DbSession, assetnum: str, calculation_id: str
):
    stmt = (
        select(CalculationEquipmentResult)
        .where(CalculationEquipmentResult.assetnum == assetnum)
        .where(CalculationEquipmentResult.calculation_data_id == calculation_id)
    )
    result = await db_session.execute(stmt)
    return result.scalar()


# async def create_calculation_result_service(db_session: DbSession, calculation_id: UUID, costPerFailure: Optional[float] = None):
#     days = 360
#     calculation = await get_calculation_data_by_id(db_session=db_session, calculation_id=calculation_id)
#     # reference = await get_by_assetnum(db_session=db_session, assetnum=calculation.reference_id) if calculation.overhaul_reference_type == OverhaulReferenceType.ASSET else await get(db_session=db_session, scope_id=calculation.reference_id)
#     # Multiple equipment
#     equipments_scope = get_all_by_session_id(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)
#     # Parameter
#     overhaulCost = calculation.parameter.overhaul_cost
#     costPerFailure = costPerFailure if costPerFailure else calculation.parameter.avg_failure_cost
#     overhaul_cost_points = get_overhaul_cost_by_time_chart(overhaulCost, days=days)
#     for eq in equipments_scope:
#         corrective_cost_points, dailyNumberOfFailure = get_corrective_cost_time_chart(costPerFailure, days)
#     total_cost = overhaul_cost_points + corrective_cost_points
#     optimumOHIndex = np.argmin(total_cost)
#     numbersOfFailure = sum(dailyNumberOfFailure[:optimumOHIndex])
#     optimum = {
#         "overhaulCost": float(overhaul_cost_points[optimumOHIndex]),
#         "correctiveCost": float(corrective_cost_points[optimumOHIndex]),
#         "numOfFailures": int(numbersOfFailure),
#         "days": int(optimumOHIndex + 1)
#     }
#     calculation_results = []
#     for i in range(days):
#         result = CalculationResult(
#             parameter_id=calculation.parameter_id,
#             calculation_data_id=calculation.id,
#             day=(i + 1),
#             corrective_cost=float(corrective_cost_points[i]),
#             overhaul_cost=float(overhaul_cost_points[i]),
#             num_failures=int(dailyNumberOfFailure[i]),
#         )
#         calculation_results.append(result)
#     calculation.optimum_oh_day = int(optimumOHIndex + 1)
#     db_session.add_all(calculation_results)
#     await db_session.commit()
#     return CalculationTimeConstrainsRead(
#         id=calculation.id,
#         name=reference.scope_name if hasattr(reference, "scope_name") else reference.master_equipment.name,
#         reference=reference.assetnum if hasattr(reference, "assetnum") else reference.scope_name,
#         results=calculation_results,
#         optimumOh=optimum
#     )
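

# The active service below picks the optimum overhaul point by minimising total
# cost (corrective + overhaul) over the hourly horizon. A minimal sketch of that
# core step on made-up arrays (illustrative only):
# >>> corrective = np.array([0.0, 10.0, 40.0, 90.0])
# >>> overhaul = np.array([100.0, 75.0, 50.0, 25.0])
# >>> total = corrective + overhaul            # [100.0, 85.0, 90.0, 115.0]
# >>> optimum_index = int(np.argmin(total))    # 1 -> reported as day 2 (1-based)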


async def create_calculation_result_service(
    db_session: DbSession, calculation: CalculationData, token: str
) -> CalculationTimeConstrainsRead:
    days = 667  # simulation horizon in days (expanded to hourly resolution below)

    # Get all equipment for this calculation session
    equipments = await get_all_by_session_id(
        db_session=db_session, overhaul_session_id=calculation.overhaul_session_id
    )
    scope = await get_scope(
        db_session=db_session, overhaul_session_id=calculation.overhaul_session_id
    )
    calculation_data = await get_calculation_data_by_id(
        db_session=db_session, calculation_id=calculation.id
    )

    # Store results for each equipment
    equipment_results: List[CalculationEquipmentResult] = []
    total_corrective_costs = np.zeros(days * 24)
    total_daily_failures = np.zeros(days * 24)

    # The overhaul curve depends only on the total cost and the equipment count,
    # so it is computed once outside the loop.
    overhaul_cost_points = get_overhaul_cost_by_time_chart(
        calculation_data.parameter.overhaul_cost,
        days=days,
        numEquipments=len(equipments),
    )

    # Calculate for each equipment
    for eq in equipments:
        corrective_costs, daily_failures = await get_corrective_cost_time_chart(
            material_cost=eq.material_cost,
            service_cost=eq.service_cost,
            token=token,
            location_tag=eq.equipment.location_tag,
            max_days=days,
        )

        # Calculate individual equipment optimum points
        equipment_total_cost = corrective_costs + overhaul_cost_points
        equipment_optimum_index = np.argmin(equipment_total_cost)
        equipment_failure_sum = sum(daily_failures[:equipment_optimum_index])

        equipment_results.append(
            CalculationEquipmentResult(
                corrective_costs=corrective_costs.tolist(),
                overhaul_costs=overhaul_cost_points.tolist(),
                daily_failures=daily_failures.tolist(),
                assetnum=eq.assetnum,
                material_cost=eq.material_cost,
                service_cost=eq.service_cost,
                optimum_day=int(equipment_optimum_index + 1),
                calculation_data_id=calculation.id,
                master_equipment=eq.equipment,
            )
        )

        # Add to totals
        total_corrective_costs += corrective_costs
        total_daily_failures += daily_failures

    db_session.add_all(equipment_results)

    # Calculate optimum points using total costs
    total_cost = total_corrective_costs + overhaul_cost_points
    optimum_oh_index = np.argmin(total_cost)
    numbers_of_failure = sum(total_daily_failures[:optimum_oh_index])

    optimum = OptimumResult(
        overhaul_cost=float(overhaul_cost_points[optimum_oh_index]),
        corrective_cost=float(total_corrective_costs[optimum_oh_index]),
        num_failures=int(numbers_of_failure),
        days=int(optimum_oh_index + 1),
    )

    # # Create calculation results for database
    # calculation_results = []
    # for i in range(days):
    #     result = CalculationResult(
    #         parameter_id=calculation.parameter_id,
    #         calculation_data_id=calculation.id,
    #         day=(i + 1),
    #         corrective_cost=float(total_corrective_costs[i]),
    #         overhaul_cost=float(overhaul_cost_points[i]),
    #         num_failures=int(total_daily_failures[i]),
    #     )
    #     calculation_results.append(result)

    # Update calculation with optimum day
    calculation.optimum_oh_day = optimum.days
    await db_session.commit()

    # Return results including individual equipment data
    return CalculationTimeConstrainsRead(
        id=calculation.id,
        reference=calculation.overhaul_session_id,
        scope=scope.type,
        results=[],
        optimum_oh=optimum,
        equipment_results=equipment_results,
    )


async def get_calculation_by_reference_and_parameter(
    *, db_session: DbSession, calculation_reference_id, parameter_id
):
    stmt = select(CalculationData).filter(
        and_(
            CalculationData.reference_id == calculation_reference_id,
            CalculationData.parameter_id == parameter_id,
        )
    )
    result = await db_session.execute(stmt)
    return result.scalar()


async def get_calculation_result_by_day(
    *, db_session: DbSession, calculation_id, simulation_day
):
    stmt = select(CalculationResult).filter(
        and_(
            CalculationResult.day == simulation_day,
            CalculationResult.calculation_data_id == calculation_id,
        )
    )
    result = await db_session.execute(stmt)
    return result.scalar()


async def get_avg_cost_by_asset(*, db_session: DbSession, assetnum: str):
    stmt = select(func.avg(MasterWorkOrder.total_cost_max).label("average_cost")).where(
        MasterWorkOrder.assetnum == assetnum
    )
    result = await db_session.execute(stmt)
    return result.scalar_one_or_none()
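

# The bulk update below folds all per-asset flags into a single UPDATE using a
# CASE expression. Roughly the SQL it emits (illustrative; table and column names
# follow the CalculationEquipmentResult model, asset numbers are hypothetical,
# and the exact text depends on the configured dialect):
#   UPDATE calculation_equipment_result
#   SET is_included = CASE
#       WHEN assetnum = 'PMP-001' THEN true
#       WHEN assetnum = 'PMP-002' THEN false
#   END
#   WHERE calculation_data_id = :calculation_data_id
#     AND assetnum IN ('PMP-001', 'PMP-002')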


async def bulk_update_equipment(
    *,
    db: DbSession,
    selected_equipments: List[CalculationSelectedEquipmentUpdate],
    calculation_data_id: UUID,
):
    # Map each assetnum to its desired is_included status
    case_mappings = {asset.assetnum: asset.is_included for asset in selected_equipments}

    # Get all assetnums that need to be updated
    assetnums = list(case_mappings.keys())

    # Create a list of when clauses for the case statement
    when_clauses = [
        (CalculationEquipmentResult.assetnum == assetnum, is_included)
        for assetnum, is_included in case_mappings.items()
    ]

    # Build the update statement
    stmt = (
        update(CalculationEquipmentResult)
        .where(CalculationEquipmentResult.calculation_data_id == calculation_data_id)
        .where(CalculationEquipmentResult.assetnum.in_(assetnums))
        .values(
            {
                "is_included": case(
                    *when_clauses
                )  # Unpack the when clauses as separate arguments
            }
        )
    )

    await db.execute(stmt)
    await db.commit()

    return assetnums
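

# Illustrative call of bulk_update_equipment (asset numbers and the calling
# context are hypothetical; field names mirror how the schema is accessed above):
# >>> updated_assets = await bulk_update_equipment(
# ...     db=db_session,
# ...     selected_equipments=[
# ...         CalculationSelectedEquipmentUpdate(assetnum="PMP-001", is_included=True),
# ...         CalculationSelectedEquipmentUpdate(assetnum="PMP-002", is_included=False),
# ...     ],
# ...     calculation_data_id=calculation_data_id,
# ... )
# >>> updated_assets
# ['PMP-001', 'PMP-002']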