You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

176 lines
6.8 KiB
Python

from typing import Optional
from uuid import UUID
import numpy as np
from sqlalchemy import and_, select
from sqlalchemy.orm import joinedload
from src.database.core import DbSession
from .schema import CalculationTimeConstrainsParametersCreate, CalculationTimeConstrainsRead
from .model import CalculationParam, OverhaulReferenceType, CalculationData, CalculationResult
from fastapi import HTTPException, status
from src.scope.service import get_by_scope_name, get
def get_overhaul_cost_by_time_chart(overhaul_cost: float, days: int) -> list:
exponents = np.arange(0, days)
results = overhaul_cost / (2 ** exponents)
return results
def get_corrective_cost_time_chart(cost_per_failure: float, days: int) -> list:
day_points = np.arange(0, days)
# Parameters for failure rate
base_rate = 5.4 # Base failure rate per day
acceleration = 11.2 # How quickly failure rate increases
grace_period = 15 # Days before failures start increasing significantly
# Calculate daily failure rate using sigmoid function
daily_failure_rate = base_rate / \
(1 + np.exp(-acceleration * (day_points - grace_period)/days))
# Calculate cumulative failures
failure_counts = np.cumsum(daily_failure_rate)
# Calculate corrective costs based on cumulative failures and cost per failure
corrective_costs = failure_counts * cost_per_failure
return corrective_costs, failure_counts
async def create_param_and_data(*, db_session: DbSession, calculation_param_in: CalculationTimeConstrainsParametersCreate, created_by: str, parameter_id: Optional[UUID] = None):
"""Creates a new document."""
if calculation_param_in.scopeOH is None and calculation_param_in.assetnum is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Either scope_id or assetnum must be provided"
)
if calculation_param_in.scopeOH is not None and calculation_param_in.assetnum is not None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Only one of scope_id or assetnum must be provided"
)
scope = await get_by_scope_name(db_session=db_session, scope_name=calculation_param_in.scopeOH) if calculation_param_in.scopeOH is not None else None
if calculation_param_in.scopeOH is not None and scope is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Scope not found"
)
overhaul_reference_type = OverhaulReferenceType.SCOPE if scope is not None else OverhaulReferenceType.ASSET
reference_id = scope.id if scope else calculation_param_in.assetnum
calculationData = await CalculationData.create_with_param(
db=db_session,
overhaul_reference_type=overhaul_reference_type,
reference_id=reference_id,
avg_failure_cost=calculation_param_in.costPerFailure,
overhaul_cost=calculation_param_in.overhaulCost,
created_by=created_by,
params_id=parameter_id
)
return calculationData
async def get_calculation_result(db_session: DbSession, calculation_id: str):
calculation = await get_calculation_data_by_id(db_session=db_session, calculation_id=calculation_id)
reference = calculation.reference_id if calculation.overhaul_reference_type == OverhaulReferenceType.ASSET else await get(db_session=db_session, scope_id=calculation.reference_id)
stmt = select(CalculationResult).filter(
CalculationResult.calculation_data_id == calculation_id).order_by(CalculationResult.day)
optimum = stmt.filter(CalculationResult.day == calculation.optimum_oh_day)
results = await db_session.execute(stmt)
optimumOh = await db_session.scalar(optimum)
optimumRes = {
"overhaulCost": optimumOh.overhaul_cost,
"correctiveCost": optimumOh.corrective_cost,
"numOfFailures": optimumOh.num_failures,
"days": optimumOh.day
}
return CalculationTimeConstrainsRead(
id=calculation.id,
reference=reference if isinstance(
reference, str) else reference.scope_name,
results=results.scalars().all(),
optimumOh=optimumRes
)
async def get_calculation_data_by_id(db_session: DbSession, calculation_id) -> CalculationData:
stmt = select(CalculationData).filter(CalculationData.id ==
calculation_id).options(joinedload(CalculationData.parameter))
result = await db_session.execute(stmt)
return result.unique().scalar()
async def create_calculation_result_service(db_session: DbSession, calculation_id: UUID):
days = 60
calculation = await get_calculation_data_by_id(db_session=db_session, calculation_id=calculation_id)
reference = calculation.reference_id if calculation.overhaul_reference_type == OverhaulReferenceType.ASSET else await get(db_session=db_session, scope_id=calculation.reference_id)
# Parameter
overhaulCost = calculation.parameter.overhaul_cost
costPerFailure = calculation.parameter.avg_failure_cost
overhaul_cost_points = get_overhaul_cost_by_time_chart(
overhaulCost, days=days)
corrective_cost_points, dailyNumberOfFailure = get_corrective_cost_time_chart(
costPerFailure, days)
total_cost = overhaul_cost_points + corrective_cost_points
optimumOHIndex = np.argmin(total_cost)
numbersOfFailure = sum(dailyNumberOfFailure[:optimumOHIndex])
optimum = {
"overhaulCost": float(overhaul_cost_points[optimumOHIndex]),
"correctiveCost": float(corrective_cost_points[optimumOHIndex]),
"numOfFailures": int(numbersOfFailure),
"days": int(optimumOHIndex+1)
}
calculation_results = []
for i in range(days):
result = CalculationResult(
parameter_id=calculation.parameter_id,
calculation_data_id=calculation.id,
day=(i + 1),
corrective_cost=float(corrective_cost_points[i]),
overhaul_cost=float(overhaul_cost_points[i]),
num_failures=int(dailyNumberOfFailure[i]),
)
calculation_results.append(result)
calculation.optimum_oh_day = int(optimumOHIndex+1)
db_session.add_all(calculation_results)
await db_session.commit()
return CalculationTimeConstrainsRead(
id=calculation.id,
reference=reference if isinstance(
reference, str) else reference.scope_name,
results=calculation_results,
optimumOh=optimum
)
async def get_calculation_by_reference_and_parameter(*, db_session: DbSession, calculation_reference_id, parameter_id):
stmt = select(CalculationData).filter(and_(
CalculationData.reference_id == calculation_reference_id,
CalculationData.parameter_id == parameter_id,
))
result = await db_session.execute(stmt)
return result.scalar()