You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

289 lines
11 KiB
Python

from typing import List, Optional, Tuple
from uuid import UUID
import numpy as np
from sqlalchemy import and_, func, select
from sqlalchemy.orm import joinedload
from src.database.core import DbSession
from src.overhaul_activity.service import get_all_by_session_id
from src.scope_equipment.model import ScopeEquipment
from src.workorder.model import MasterWorkOrder
from .schema import CalculationTimeConstrainsParametersCreate, CalculationTimeConstrainsRead, EquipmentResult, OptimumResult
from .model import CalculationParam, OverhaulReferenceType, CalculationData, CalculationResult
from fastapi import HTTPException, status
from src.overhaul_scope.service import get_by_scope_name, get
from src.scope_equipment.service import get_by_assetnum
def get_overhaul_cost_by_time_chart(overhaul_cost: float, days: int) -> np.ndarray:
    """Return the overhaul cost curve, halving the cost on every successive day.

    Element ``i`` of the result equals ``overhaul_cost / 2**i``.

    Args:
        overhaul_cost: Cost of the overhaul at day index 0.
        days: Number of daily points to generate.

    Returns:
        A float NumPy array of length ``days`` (empty when ``days <= 0``).
        Non-finite entries (for example from a NaN or infinite
        ``overhaul_cost``) are replaced with 0.
    """
    # Work in floating point: ``2 ** np.arange(days)`` is an int64 array that
    # silently wraps for exponents >= 63, which produced a negative cost at
    # index 63 and a division by zero from index 64 onwards.
    exponents = np.arange(0, days, dtype=float)
    # Scaling by 2**-i is exact for powers of two and underflows to 0 for very
    # long horizons instead of overflowing.
    results = overhaul_cost * np.exp2(-exponents)
    return np.where(np.isfinite(results), results, 0)
def get_corrective_cost_time_chart(
    material_cost: float,
    service_cost: float,
    days: int,
    *,
    base_rate: float = 5.4,
    acceleration: float = 11.2,
    grace_period: float = 15,
) -> Tuple[np.ndarray, np.ndarray]:
    """Build the cumulative corrective-cost curve and the daily failure rates.

    The daily failure rate follows a sigmoid of the day index::

        rate[d] = base_rate / (1 + exp(-acceleration * (d - grace_period) / days))

    and the corrective cost on day ``d`` is the cumulative sum of the rates up
    to and including ``d`` multiplied by ``material_cost + service_cost``.

    Args:
        material_cost: Material cost charged per failure.
        service_cost: Service cost charged per failure.
        days: Number of daily points to generate (day indexes ``0..days-1``).
        base_rate: Upper asymptote of the sigmoid (failure rate per day).
        acceleration: Steepness of the sigmoid.
        grace_period: Day index at which the rate reaches ``base_rate / 2``.

    Returns:
        A tuple ``(corrective_costs, daily_failure_rate)`` of two float arrays,
        each of length ``days``.
    """
    day_points = np.arange(0, days)

    # Sigmoid failure rate; the day offset is normalised by the horizon length.
    daily_failure_rate = base_rate / (
        1 + np.exp(-acceleration * (day_points - grace_period) / days)
    )

    # Failures accumulate over time, and every failure costs material + service.
    failure_counts = np.cumsum(daily_failure_rate)
    cost_per_failure = material_cost + service_cost
    corrective_costs = failure_counts * cost_per_failure

    return corrective_costs, daily_failure_rate
async def create_param_and_data(*, db_session: DbSession, calculation_param_in: CalculationTimeConstrainsParametersCreate, created_by: str, parameter_id: Optional[UUID] = None):
    """Create a calculation data record from the submitted parameters.

    Args:
        db_session: Database session handed to ``CalculationData.create_with_param``.
        calculation_param_in: Incoming payload; its ``ohSessionId``,
            ``costPerFailure`` and ``overhaulCost`` fields are read.
        created_by: Identifier of the creator, forwarded unchanged.
        parameter_id: Optional id of an existing parameter set, forwarded as
            ``params_id``.

    Returns:
        Whatever ``CalculationData.create_with_param`` returns.

    Raises:
        HTTPException: 400 when the payload carries no overhaul session id.
    """
    session_id = calculation_param_in.ohSessionId
    if session_id is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="overhaul_session_id is required",
        )

    return await CalculationData.create_with_param(
        db=db_session,
        overhaul_session_id=session_id,
        avg_failure_cost=calculation_param_in.costPerFailure,
        overhaul_cost=calculation_param_in.overhaulCost,
        created_by=created_by,
        params_id=parameter_id,
    )
async def get_calculation_result(db_session: DbSession, calculation_id: str):
    """Load a stored calculation together with its per-day results and optimum.

    Args:
        db_session: Database session used for all lookups.
        calculation_id: Id of the ``CalculationData`` row to read.

    Returns:
        A ``CalculationTimeConstrainsRead`` holding the calculation id, the
        name and identifier of the referenced asset or scope, every
        ``CalculationResult`` row ordered by day, and a dict describing the
        row stored for the optimum overhaul day.

    Raises:
        HTTPException: 404 when the calculation, its reference, or the result
            row for the optimum day does not exist. (Previously these cases
            surfaced as an ``AttributeError`` on ``None``.)
    """
    calculation = await get_calculation_data_by_id(db_session=db_session, calculation_id=calculation_id)
    if calculation is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Calculation not found",
        )

    # The reference is either a single asset or a whole overhaul scope.
    if calculation.overhaul_reference_type == OverhaulReferenceType.ASSET:
        reference = await get_by_assetnum(db_session=db_session, assetnum=calculation.reference_id)
    else:
        reference = await get(db_session=db_session, scope_id=calculation.reference_id)
    if reference is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Calculation reference not found",
        )

    stmt = select(CalculationResult).filter(
        CalculationResult.calculation_data_id == calculation_id).order_by(CalculationResult.day)
    # Same query narrowed down to the single row of the optimum day.
    optimum_stmt = stmt.filter(CalculationResult.day == calculation.optimum_oh_day)

    results = await db_session.execute(stmt)
    optimum_row = await db_session.scalar(optimum_stmt)
    if optimum_row is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Calculation result not found",
        )

    optimum_res = {
        "overhaulCost": optimum_row.overhaul_cost,
        "correctiveCost": optimum_row.corrective_cost,
        "numOfFailures": optimum_row.num_failures,
        "days": optimum_row.day
    }

    # Scopes expose ``scope_name``; assets expose ``assetnum`` and take their
    # display name from the linked master equipment.
    return CalculationTimeConstrainsRead(
        id=calculation.id,
        name=reference.scope_name if hasattr(
            reference, "scope_name") else reference.master_equipment.name,
        reference=reference.assetnum if hasattr(
            reference, "assetnum") else reference.scope_name,
        results=results.scalars().all(),
        optimumOh=optimum_res
    )
async def get_calculation_data_by_id(db_session: DbSession, calculation_id) -> CalculationData:
    """Fetch one ``CalculationData`` row by id with its ``parameter`` loaded.

    Args:
        db_session: Database session used to run the query.
        calculation_id: Value compared against ``CalculationData.id``.

    Returns:
        The result of ``Result.unique().scalar()`` for the query, i.e. the
        matching ``CalculationData`` or ``None`` when nothing matches.
    """
    query = (
        select(CalculationData)
        .filter(CalculationData.id == calculation_id)
        # Pull the related parameter in the same statement via a JOIN.
        .options(joinedload(CalculationData.parameter))
    )
    rows = await db_session.execute(query)
    return rows.unique().scalar()
# async def create_calculation_result_service(db_session: DbSession, calculation_id: UUID, costPerFailure: Optional[float] = None):
# days = 360
# calculation = await get_calculation_data_by_id(db_session=db_session, calculation_id=calculation_id)
# # reference = await get_by_assetnum(db_session=db_session, assetnum=calculation.reference_id) if calculation.overhaul_reference_type == OverhaulReferenceType.ASSET else await get(db_session=db_session, scope_id=calculation.reference_id)
# # Multiple equipment
# equipments_scope = get_all_by_session_id(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)
# # Parameter
# overhaulCost = calculation.parameter.overhaul_cost
# costPerFailure = costPerFailure if costPerFailure else calculation.parameter.avg_failure_cost
# overhaul_cost_points = get_overhaul_cost_by_time_chart(
# overhaulCost, days=days)
# for eq in equipments_scope:
# corrective_cost_points, dailyNumberOfFailure = get_corrective_cost_time_chart(
# costPerFailure, days)
# total_cost = overhaul_cost_points + corrective_cost_points
# optimumOHIndex = np.argmin(total_cost)
# numbersOfFailure = sum(dailyNumberOfFailure[:optimumOHIndex])
# optimum = {
# "overhaulCost": float(overhaul_cost_points[optimumOHIndex]),
# "correctiveCost": float(corrective_cost_points[optimumOHIndex]),
# "numOfFailures": int(numbersOfFailure),
# "days": int(optimumOHIndex+1)
# }
# calculation_results = []
# for i in range(days):
# result = CalculationResult(
# parameter_id=calculation.parameter_id,
# calculation_data_id=calculation.id,
# day=(i + 1),
# corrective_cost=float(corrective_cost_points[i]),
# overhaul_cost=float(overhaul_cost_points[i]),
# num_failures=int(dailyNumberOfFailure[i]),
# )
# calculation_results.append(result)
# calculation.optimum_oh_day = int(optimumOHIndex+1)
# db_session.add_all(calculation_results)
# await db_session.commit()
# return CalculationTimeConstrainsRead(
# id=calculation.id,
# name=reference.scope_name if hasattr(
# reference, "scope_name") else reference.master_equipment.name,
# reference=reference.assetnum if hasattr(
# reference, "assetnum") else reference.scope_name,
# results=calculation_results,
# optimumOh=optimum
# )
def _find_optimum(
    overhaul_costs: np.ndarray,
    corrective_costs: np.ndarray,
    daily_failures: np.ndarray,
) -> OptimumResult:
    """Summarise the day on which overhaul cost plus corrective cost is lowest."""
    index = int(np.argmin(corrective_costs + overhaul_costs))
    return OptimumResult(
        overhaul_cost=float(overhaul_costs[index]),
        corrective_cost=float(corrective_costs[index]),
        # Count failures up to and including the optimum day so the figure
        # covers the same span as the cumulative corrective cost reported
        # above (the previous slice stopped one day short).
        num_failures=int(daily_failures[: index + 1].sum()),
        days=index + 1,
    )


async def create_calculation_result_service(
    db_session: DbSession,
    calculation_id: UUID,
) -> CalculationTimeConstrainsRead:
    """Run the 365-day cost simulation for a calculation and persist the result.

    For every equipment returned by ``get_all_by_session_id`` a corrective-cost
    curve is computed and combined with the shared overhaul-cost curve to find
    that equipment's optimum day. The per-equipment curves are then summed to
    find the overall optimum, one ``CalculationResult`` row per day is added
    to the session, ``optimum_oh_day`` is set on the calculation, and the
    session is committed.

    Args:
        db_session: Database session used for reads and for the final commit.
        calculation_id: Id of the ``CalculationData`` row to simulate.

    Returns:
        A ``CalculationTimeConstrainsRead`` with the daily totals, the overall
        optimum and the individual equipment results.

    Raises:
        HTTPException: 404 when no calculation exists for ``calculation_id``.
    """
    days = 365  # Changed to 365 days as per requirement

    calculation = await get_calculation_data_by_id(db_session=db_session, calculation_id=calculation_id)
    if calculation is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Calculation not found",
        )

    # Get all equipment for this calculation session
    equipments = await get_all_by_session_id(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)

    # The overhaul cost curve is shared by every equipment, so compute it once.
    overhaul_cost_points = get_overhaul_cost_by_time_chart(
        calculation.parameter.overhaul_cost,
        days=days
    )

    equipment_results: List[EquipmentResult] = []
    total_corrective_costs = np.zeros(days)
    total_daily_failures = np.zeros(days)

    for eq in equipments:
        corrective_costs, daily_failures = get_corrective_cost_time_chart(
            material_cost=eq.material_cost,
            service_cost=eq.service_cost,
            days=days
        )

        equipment_results.append(EquipmentResult(
            corrective_costs=corrective_costs.tolist(),
            overhaul_costs=overhaul_cost_points.tolist(),
            daily_failures=daily_failures.tolist(),
            assetnum=eq.assetnum,
            material_cost=eq.material_cost,
            service_cost=eq.service_cost,
            optimum=_find_optimum(overhaul_cost_points, corrective_costs, daily_failures)
        ))

        # Accumulate into the session-wide totals.
        total_corrective_costs += corrective_costs
        total_daily_failures += daily_failures

    # Overall optimum, based on the summed curves of all equipment.
    optimum = _find_optimum(overhaul_cost_points, total_corrective_costs, total_daily_failures)

    # One database row per simulated day; day numbers are 1-based.
    calculation_results = [
        CalculationResult(
            parameter_id=calculation.parameter_id,
            calculation_data_id=calculation.id,
            day=day,
            corrective_cost=float(corrective_cost),
            overhaul_cost=float(overhaul_cost),
            num_failures=int(failures),
        )
        for day, (corrective_cost, overhaul_cost, failures) in enumerate(
            zip(total_corrective_costs, overhaul_cost_points, total_daily_failures),
            start=1,
        )
    ]

    # Update calculation with optimum day
    calculation.optimum_oh_day = optimum.days

    # Save to database
    db_session.add_all(calculation_results)
    await db_session.commit()

    # Return results including individual equipment data
    return CalculationTimeConstrainsRead(
        id=calculation.id,
        reference=calculation.overhaul_session_id,
        results=calculation_results,
        optimum_oh=optimum,
        equipment_results=equipment_results
    )
async def get_calculation_by_reference_and_parameter(*, db_session: DbSession, calculation_reference_id, parameter_id):
    """Look up a ``CalculationData`` row by its reference id and parameter id.

    Args:
        db_session: Database session used to run the query.
        calculation_reference_id: Value compared against ``CalculationData.reference_id``.
        parameter_id: Value compared against ``CalculationData.parameter_id``.

    Returns:
        The result of ``Result.scalar()`` for the query: the first matching
        row, or ``None`` when nothing matches.
    """
    matches_reference = CalculationData.reference_id == calculation_reference_id
    matches_parameter = CalculationData.parameter_id == parameter_id
    query = select(CalculationData).filter(and_(matches_reference, matches_parameter))

    outcome = await db_session.execute(query)
    return outcome.scalar()
async def get_calculation_result_by_day(*, db_session: DbSession, calculation_id, simulation_day):
    """Look up the ``CalculationResult`` row of one calculation for a given day.

    Args:
        db_session: Database session used to run the query.
        calculation_id: Value compared against ``CalculationResult.calculation_data_id``.
        simulation_day: Value compared against ``CalculationResult.day``.

    Returns:
        The result of ``Result.scalar()`` for the query: the first matching
        row, or ``None`` when nothing matches.
    """
    on_requested_day = CalculationResult.day == simulation_day
    of_calculation = CalculationResult.calculation_data_id == calculation_id
    query = select(CalculationResult).filter(and_(on_requested_day, of_calculation))

    outcome = await db_session.execute(query)
    return outcome.scalar()
async def get_avg_cost_by_asset(*, db_session: DbSession, assetnum: str):
    """Query the average ``total_cost_max`` of the work orders of one asset.

    Args:
        db_session: Database session used to run the query.
        assetnum: Asset number compared against ``MasterWorkOrder.assetnum``.

    Returns:
        The single scalar produced by the aggregate query, obtained through
        ``scalar_one_or_none()``.
    """
    average_cost = func.avg(MasterWorkOrder.total_cost_max).label('average_cost')
    query = select(average_cost).where(MasterWorkOrder.assetnum == assetnum)

    outcome = await db_session.execute(query)
    return outcome.scalar_one_or_none()