"""Service functions for the overhaul time-constraint calculation.

Contains the synthetic cost-curve generators (overhaul cost and corrective
cost over time) plus the async helpers that create, persist and read back
calculation data through the database session.
"""

from typing import List, Optional, Tuple
from uuid import UUID

import numpy as np
from fastapi import HTTPException, status
from sqlalchemy import and_, func, select
from sqlalchemy.engine import result
from sqlalchemy.orm import joinedload

from src.database.core import DbSession
from src.overhaul_activity.service import get_all_by_session_id
from src.scope_equipment.model import ScopeEquipment
from src.workorder.model import MasterWorkOrder
from src.overhaul_scope.service import get_by_scope_name, get
from src.scope_equipment.service import get_by_assetnum
from src.overhaul_scope.service import get as get_scope

from .schema import (
    CalculationResultsRead,
    CalculationTimeConstrainsParametersCreate,
    CalculationTimeConstrainsRead,
    EquipmentResult,
    OptimumResult,
)
from .model import (
    CalculationData,
    CalculationEquipmentResult,
    CalculationParam,
    CalculationResult,
    OverhaulReferenceType,
)

# Number of simulated days used by both the calculation and the read path.
DEFAULT_SIMULATION_DAYS = 365


def get_overhaul_cost_by_time_chart(
    overhaul_cost: float,
    days: int,
    numEquipments: int,
    decay_base: float = 1.1,
) -> np.ndarray:
    """Return the per-equipment overhaul cost for each simulated day.

    The total ``overhaul_cost`` is split evenly across ``numEquipments`` and
    then divided by ``decay_base ** day`` for ``day = 0 .. days - 1``.

    Args:
        overhaul_cost: Total overhaul cost to distribute; must be >= 0.
        days: Number of days to simulate; must be > 0.
        numEquipments: Number of equipment items sharing the cost; must be > 0.
        decay_base: Base of the exponential decay applied per day.

    Returns:
        Array of length ``days`` with the per-equipment cost on each day.
        Non-finite entries are replaced by 0.

    Raises:
        ValueError: If ``overhaul_cost`` is negative, or ``days`` or
            ``numEquipments`` is not positive.
    """
    if overhaul_cost < 0:
        raise ValueError("Overhaul cost cannot be negative")
    if days <= 0:
        raise ValueError("Days must be positive")
    if numEquipments <= 0:
        # Previously this surfaced as a bare ZeroDivisionError.
        raise ValueError("Number of equipments must be positive")

    exponents = np.arange(0, days)
    cost_per_equipment = overhaul_cost / numEquipments

    # Overflow / division by zero are expected for extreme inputs and are
    # neutralised by the isfinite() filter below, so silence the warnings.
    with np.errstate(over="ignore", divide="ignore", invalid="ignore"):
        results = cost_per_equipment / (decay_base ** exponents)

    return np.where(np.isfinite(results), results, 0)


def get_corrective_cost_time_chart(
    material_cost: float,
    service_cost: float,
    days: int,
    *,
    base_rate: float = 0.2,
    acceleration: float = 2.4,
    grace_period: int = 170,
    noise_std: float = 0.05,
) -> Tuple[np.ndarray, np.ndarray]:
    """Simulate cumulative corrective cost and the daily failure rate.

    The daily failure rate follows a sigmoid in the day index, perturbed by
    zero-mean Gaussian noise drawn from NumPy's global random state, so the
    output is not deterministic unless the caller seeds ``np.random``.

    Args:
        material_cost: Material cost of one failure.
        service_cost: Service cost of one failure.
        days: Number of days to simulate; must be > 0.
        base_rate: Upper bound of the sigmoid (before noise).
        acceleration: Steepness of the sigmoid.
        grace_period: Day index at which the sigmoid reaches half of
            ``base_rate``.
        noise_std: Standard deviation of the Gaussian noise.

    Returns:
        ``(corrective_costs, daily_failure_rate)``, both of length ``days``.
        ``corrective_costs[i]`` is the cumulative failure rate through day
        ``i`` multiplied by ``material_cost + service_cost``.

    Raises:
        ValueError: If ``days`` is not positive.
    """
    if days <= 0:
        raise ValueError("Days must be positive")

    day_points = np.arange(0, days)

    # Sigmoid-shaped failure rate over the simulated horizon.
    daily_failure_rate = base_rate / (
        1 + np.exp(-acceleration * (day_points - grace_period) / days)
    )
    daily_failure_rate += np.random.normal(0.0, noise_std, days)

    # The noise can exceed the (small) early-day rate; a negative failure
    # rate would make the cumulative failures and costs decrease over time.
    daily_failure_rate = np.clip(daily_failure_rate, 0, None)

    failure_counts = np.cumsum(daily_failure_rate)
    cost_per_failure = material_cost + service_cost
    corrective_costs = failure_counts * cost_per_failure

    return corrective_costs, daily_failure_rate


async def create_param_and_data(
    *,
    db_session: DbSession,
    calculation_param_in: CalculationTimeConstrainsParametersCreate,
    created_by: str,
    parameter_id: Optional[UUID] = None,
):
    """Create a calculation data record together with its parameters.

    Delegates to ``CalculationData.create_with_param`` using the overhaul
    session id, cost per failure and overhaul cost from
    ``calculation_param_in``.

    Args:
        db_session: Database session.
        calculation_param_in: Incoming calculation parameters.
        created_by: Identifier of the user creating the record.
        parameter_id: Optional id forwarded as ``params_id``.

    Returns:
        The object returned by ``CalculationData.create_with_param``.

    Raises:
        HTTPException: 400 if ``calculation_param_in.ohSessionId`` is None.
    """
    if calculation_param_in.ohSessionId is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="overhaul_session_id is required",
        )

    calculation_data = await CalculationData.create_with_param(
        db=db_session,
        overhaul_session_id=calculation_param_in.ohSessionId,
        avg_failure_cost=calculation_param_in.costPerFailure,
        overhaul_cost=calculation_param_in.overhaulCost,
        created_by=created_by,
        params_id=parameter_id,
    )

    return calculation_data


async def get_calculation_result(
    db_session: DbSession, calculation_id: str
) -> CalculationTimeConstrainsRead:
    """Build the per-day aggregated result of a stored calculation.

    For every simulated day, sums the corrective cost, overhaul cost and
    (integer-truncated) daily failures over all equipment results whose
    ``is_included`` flag is truthy.

    Args:
        db_session: Database session.
        calculation_id: Id of the calculation data to read.

    Returns:
        The calculation with one ``CalculationResultsRead`` entry per day.

    Raises:
        HTTPException: 404 if the calculation or its overhaul scope is
            not found.
    """
    scope_calculation = await get_calculation_data_by_id(
        db_session=db_session, calculation_id=calculation_id
    )
    if not scope_calculation:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    scope_overhaul = await get_scope(
        db_session=db_session,
        overhaul_session_id=scope_calculation.overhaul_session_id,
    )
    if not scope_overhaul:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    # Filter once instead of re-checking the flag on every simulated day.
    included = [eq for eq in scope_calculation.equipment_results if eq.is_included]

    # Never index past the shortest stored series; a fixed range(365) raised
    # IndexError when a stored series was shorter than the horizon.
    days = DEFAULT_SIMULATION_DAYS
    for eq in included:
        days = min(
            days,
            len(eq.corrective_costs),
            len(eq.overhaul_costs),
            len(eq.daily_failures),
        )

    calculation_results = []
    for day_index in range(days):
        calculation_results.append(
            CalculationResultsRead(
                overhaul_cost=sum(
                    float(eq.overhaul_costs[day_index]) for eq in included
                ),
                corrective_cost=sum(
                    float(eq.corrective_costs[day_index]) for eq in included
                ),
                # NOTE(review): each value is truncated to int before summing,
                # as in the original code; fractional rates therefore count
                # as 0. Confirm this is the intended reporting.
                num_failures=sum(
                    int(eq.daily_failures[day_index]) for eq in included
                ),
                day=day_index + 1,
            )
        )

    # NOTE(review): optimum_oh receives the stored day number here, while
    # create_calculation_result_service passes an OptimumResult; confirm the
    # schema accepts both.
    return CalculationTimeConstrainsRead(
        id=scope_calculation.id,
        reference=scope_calculation.overhaul_session_id,
        scope=scope_overhaul.type,
        results=calculation_results,
        optimum_oh=scope_calculation.optimum_oh_day,
        equipment_results=scope_calculation.equipment_results,
    )


async def get_calculation_data_by_id(
    db_session: DbSession, calculation_id
) -> Optional[CalculationData]:
    """Fetch a calculation with its equipment results and parameter loaded.

    Args:
        db_session: Database session.
        calculation_id: Id of the calculation data row.

    Returns:
        The matching ``CalculationData``, or None if there is none.
    """
    stmt = (
        select(CalculationData)
        .filter(CalculationData.id == calculation_id)
        .options(
            joinedload(CalculationData.equipment_results),
            joinedload(CalculationData.parameter),
        )
    )
    query_result = await db_session.execute(stmt)
    # unique() de-duplicates the rows produced by the joined eager loads.
    return query_result.unique().scalar()


async def create_calculation_result_service(
    db_session: DbSession,
    calculation: CalculationData,
) -> CalculationTimeConstrainsRead:
    """Run the cost simulation for every equipment of a calculation.

    For each equipment of the calculation's overhaul session, simulates the
    corrective cost curve, combines it with the per-equipment overhaul cost
    curve and stores a ``CalculationEquipmentResult``. The session-wide
    optimum day is the minimum of the summed corrective and overhaul costs;
    it is written to ``calculation.optimum_oh_day`` and committed.

    Args:
        db_session: Database session.
        calculation: The calculation data to compute results for.

    Returns:
        The calculation with its optimum and per-equipment results
        (``results`` is left empty).

    Raises:
        HTTPException: 404 if the calculation (or its parameter), the
            overhaul scope, or the equipment list cannot be found.
    """
    days = DEFAULT_SIMULATION_DAYS
    session_id = calculation.overhaul_session_id

    equipments = await get_all_by_session_id(
        db_session=db_session, overhaul_session_id=session_id
    )
    scope = await get_scope(db_session=db_session, overhaul_session_id=session_id)
    calculation_data = await get_calculation_data_by_id(
        db_session=db_session, calculation_id=calculation.id
    )

    # Fail with a clear HTTP error instead of AttributeError further down.
    if (
        not calculation_data
        or calculation_data.parameter is None
        or not scope
    ):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    # Without equipment there is nothing to optimise; the original code hit
    # an UnboundLocalError after the (empty) loop in this case.
    if not equipments:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No equipment found for this overhaul session.",
        )

    # Identical for every equipment, so compute it once outside the loop.
    overhaul_cost_points = get_overhaul_cost_by_time_chart(
        calculation_data.parameter.overhaul_cost,
        days=days,
        numEquipments=len(equipments),
    )

    equipment_results: List[CalculationEquipmentResult] = []
    total_corrective_costs = np.zeros(days)
    total_daily_failures = np.zeros(days)

    for eq in equipments:
        corrective_costs, daily_failures = get_corrective_cost_time_chart(
            material_cost=eq.material_cost,
            service_cost=eq.service_cost,
            days=days,
        )

        # Optimum for this equipment alone.
        equipment_optimum_index = int(
            np.argmin(corrective_costs + overhaul_cost_points)
        )

        equipment_results.append(
            CalculationEquipmentResult(
                corrective_costs=corrective_costs.tolist(),
                overhaul_costs=overhaul_cost_points.tolist(),
                daily_failures=daily_failures.tolist(),
                assetnum=eq.assetnum,
                material_cost=eq.material_cost,
                service_cost=eq.service_cost,
                optimum_day=equipment_optimum_index + 1,
                calculation_data_id=calculation.id,
            )
        )

        total_corrective_costs += corrective_costs
        total_daily_failures += daily_failures

    db_session.add_all(equipment_results)

    # The corrective total covers every equipment, so the overhaul side must
    # too: sum the per-equipment shares rather than using a single share.
    total_overhaul_costs = overhaul_cost_points * len(equipments)
    total_cost = total_corrective_costs + total_overhaul_costs
    optimum_oh_index = int(np.argmin(total_cost))

    # Inclusive of the optimum day, matching the cumulative corrective cost
    # reported for the same index.
    numbers_of_failure = total_daily_failures[: optimum_oh_index + 1].sum()

    optimum = OptimumResult(
        overhaul_cost=float(total_overhaul_costs[optimum_oh_index]),
        corrective_cost=float(total_corrective_costs[optimum_oh_index]),
        num_failures=int(numbers_of_failure),
        days=optimum_oh_index + 1,
    )

    # Persist the optimum day together with the equipment results.
    calculation.optimum_oh_day = optimum.days
    await db_session.commit()

    return CalculationTimeConstrainsRead(
        id=calculation.id,
        reference=calculation.overhaul_session_id,
        scope=scope.type,
        results=[],
        optimum_oh=optimum,
        equipment_results=equipment_results,
    )


async def get_calculation_by_reference_and_parameter(
    *, db_session: DbSession, calculation_reference_id, parameter_id
):
    """Fetch the calculation matching a reference id and a parameter id.

    NOTE(review): the rest of this module keys calculations by
    ``overhaul_session_id``; confirm ``CalculationData.reference_id`` still
    exists on the model.

    Returns:
        The first matching ``CalculationData``, or None.
    """
    stmt = select(CalculationData).filter(
        and_(
            CalculationData.reference_id == calculation_reference_id,
            CalculationData.parameter_id == parameter_id,
        )
    )
    query_result = await db_session.execute(stmt)
    return query_result.scalar()


async def get_calculation_result_by_day(
    *, db_session: DbSession, calculation_id, simulation_day
):
    """Fetch the stored ``CalculationResult`` of one calculation for one day.

    Returns:
        The first matching ``CalculationResult``, or None.
    """
    stmt = select(CalculationResult).filter(
        and_(
            CalculationResult.day == simulation_day,
            CalculationResult.calculation_data_id == calculation_id,
        )
    )
    query_result = await db_session.execute(stmt)
    return query_result.scalar()


async def get_avg_cost_by_asset(*, db_session: DbSession, assetnum: str):
    """Return the average ``total_cost_max`` of an asset's work orders.

    Returns:
        The average, or None when the aggregate yields no value.
    """
    stmt = select(
        func.avg(MasterWorkOrder.total_cost_max).label("average_cost")
    ).where(MasterWorkOrder.assetnum == assetnum)

    query_result = await db_session.execute(stmt)
    return query_result.scalar_one_or_none()