from typing import Optional from uuid import UUID import numpy as np from sqlalchemy import and_, func, select from sqlalchemy.orm import joinedload from src.database.core import DbSession from src.scope_equipment.model import ScopeEquipment from src.workorder.model import MasterWorkOrder from .schema import CalculationTimeConstrainsParametersCreate, CalculationTimeConstrainsRead from .model import CalculationParam, OverhaulReferenceType, CalculationData, CalculationResult from fastapi import HTTPException, status from src.overhaul_scope.service import get_by_scope_name, get from src.scope_equipment.service import get_by_assetnum def get_overhaul_cost_by_time_chart(overhaul_cost: float, days: int) -> list: exponents = np.arange(0, days) results = overhaul_cost / (2 ** exponents) return results def get_corrective_cost_time_chart(cost_per_failure: float, days: int) -> list: day_points = np.arange(0, days) # Parameters for failure rate base_rate = 5.4 # Base failure rate per day acceleration = 11.2 # How quickly failure rate increases grace_period = 15 # Days before failures start increasing significantly # Calculate daily failure rate using sigmoid function daily_failure_rate = base_rate / \ (1 + np.exp(-acceleration * (day_points - grace_period)/days)) # Calculate cumulative failures failure_counts = np.cumsum(daily_failure_rate) # Calculate corrective costs based on cumulative failures and cost per failure corrective_costs = failure_counts * cost_per_failure return corrective_costs, failure_counts async def create_param_and_data(*, db_session: DbSession, calculation_param_in: CalculationTimeConstrainsParametersCreate, created_by: str, parameter_id: Optional[UUID] = None): """Creates a new document.""" if calculation_param_in.scopeOH is None and calculation_param_in.assetnum is None: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Either scope_id or assetnum must be provided" ) if calculation_param_in.scopeOH is not None and calculation_param_in.assetnum is not None: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Only one of scope_id or assetnum must be provided" ) scope = await get_by_scope_name(db_session=db_session, scope_name=calculation_param_in.scopeOH) if calculation_param_in.scopeOH is not None else None if calculation_param_in.scopeOH is not None and scope is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Scope not found" ) overhaul_reference_type = OverhaulReferenceType.SCOPE if scope is not None else OverhaulReferenceType.ASSET reference_id = scope.id if scope else calculation_param_in.assetnum calculationData = await CalculationData.create_with_param( db=db_session, overhaul_reference_type=overhaul_reference_type, reference_id=reference_id, avg_failure_cost=calculation_param_in.costPerFailure, overhaul_cost=calculation_param_in.overhaulCost, created_by=created_by, params_id=parameter_id ) return calculationData async def get_calculation_result(db_session: DbSession, calculation_id: str): calculation = await get_calculation_data_by_id(db_session=db_session, calculation_id=calculation_id) reference = await get_by_assetnum(db_session=db_session, assetnum=calculation.reference_id) if calculation.overhaul_reference_type == OverhaulReferenceType.ASSET else await get(db_session=db_session, scope_id=calculation.reference_id) stmt = select(CalculationResult).filter( CalculationResult.calculation_data_id == calculation_id).order_by(CalculationResult.day) optimum = stmt.filter(CalculationResult.day == calculation.optimum_oh_day) results = await db_session.execute(stmt) optimumOh = await db_session.scalar(optimum) optimumRes = { "overhaulCost": optimumOh.overhaul_cost, "correctiveCost": optimumOh.corrective_cost, "numOfFailures": optimumOh.num_failures, "days": optimumOh.day } return CalculationTimeConstrainsRead( id=calculation.id, name=reference.scope_name if hasattr( reference, "scope_name") else reference.master_equipment.name, reference=reference.assetnum if hasattr( reference, "assetnum") else reference.scope_name, results=results.scalars().all(), optimumOh=optimumRes ) async def get_calculation_data_by_id(db_session: DbSession, calculation_id) -> CalculationData: stmt = select(CalculationData).filter(CalculationData.id == calculation_id).options(joinedload(CalculationData.parameter)) result = await db_session.execute(stmt) return result.unique().scalar() async def create_calculation_result_service(db_session: DbSession, calculation_id: UUID, costPerFailure: Optional[float] = None): days = 60 calculation = await get_calculation_data_by_id(db_session=db_session, calculation_id=calculation_id) reference = await get_by_assetnum(db_session=db_session, assetnum=calculation.reference_id) if calculation.overhaul_reference_type == OverhaulReferenceType.ASSET else await get(db_session=db_session, scope_id=calculation.reference_id) # Parameter overhaulCost = calculation.parameter.overhaul_cost costPerFailure = costPerFailure if costPerFailure else calculation.parameter.avg_failure_cost overhaul_cost_points = get_overhaul_cost_by_time_chart( overhaulCost, days=days) corrective_cost_points, dailyNumberOfFailure = get_corrective_cost_time_chart( costPerFailure, days) total_cost = overhaul_cost_points + corrective_cost_points optimumOHIndex = np.argmin(total_cost) numbersOfFailure = sum(dailyNumberOfFailure[:optimumOHIndex]) optimum = { "overhaulCost": float(overhaul_cost_points[optimumOHIndex]), "correctiveCost": float(corrective_cost_points[optimumOHIndex]), "numOfFailures": int(numbersOfFailure), "days": int(optimumOHIndex+1) } calculation_results = [] for i in range(days): result = CalculationResult( parameter_id=calculation.parameter_id, calculation_data_id=calculation.id, day=(i + 1), corrective_cost=float(corrective_cost_points[i]), overhaul_cost=float(overhaul_cost_points[i]), num_failures=int(dailyNumberOfFailure[i]), ) calculation_results.append(result) calculation.optimum_oh_day = int(optimumOHIndex+1) db_session.add_all(calculation_results) await db_session.commit() return CalculationTimeConstrainsRead( id=calculation.id, name=reference.scope_name if hasattr( reference, "scope_name") else reference.master_equipment.name, reference=reference.assetnum if hasattr( reference, "assetnum") else reference.scope_name, results=calculation_results, optimumOh=optimum ) async def get_calculation_by_reference_and_parameter(*, db_session: DbSession, calculation_reference_id, parameter_id): stmt = select(CalculationData).filter(and_( CalculationData.reference_id == calculation_reference_id, CalculationData.parameter_id == parameter_id, )) result = await db_session.execute(stmt) return result.scalar() async def get_calculation_result_by_day(*, db_session: DbSession, calculation_id, simulation_day): stmt = select(CalculationResult).filter(and_( CalculationResult.day == simulation_day, CalculationResult.calculation_data_id == calculation_id )) result = await db_session.execute(stmt) return result.scalar() async def get_avg_cost_by_asset(*, db_session: DbSession, assetnum: str): stmt = ( select(func.avg(MasterWorkOrder.total_cost_max).label('average_cost')) .where(MasterWorkOrder.assetnum == assetnum) ) result = await db_session.execute(stmt) return result.scalar_one_or_none()