fix time constrain calculation

main
Cizz22 1 year ago
parent 26f5ac9c91
commit 8ce49618c9

@ -62,16 +62,12 @@ class CalculationData(Base, DefaultMixin, IdentityMixin):
parameter_id = Column(UUID(as_uuid=True), ForeignKey(
'oh_ms_calculation_param.id'), nullable=True)
overhaul_reference_type = Column(String(10), nullable=False)
reference_id = Column(String, nullable=False)
overhaul_session_id= Column(UUID(as_uuid=True), ForeignKey('oh_ms_overhaul.id'))
optimum_oh_day = Column(Integer, nullable=True)
parameter = relationship(
"CalculationParam", back_populates="calculation_data")
scope = relationship("Scope", foreign_keys=[
reference_id], primaryjoin="and_(CalculationData.overhaul_reference_type=='SCOPE', " "CalculationData.reference_id==Scope.id)")
equipment = relationship("ScopeEquipment", foreign_keys=[
reference_id], primaryjoin="and_(CalculationData.overhaul_reference_type=='ASSET', " "CalculationData.reference_id==ScopeEquipment.assetnum)")
@classmethod
async def create_with_param(

@ -37,8 +37,7 @@ class CalculationTimeConstrainsParametersRead(CalculationTimeConstrainsBase):
class CalculationTimeConstrainsParametersCreate(CalculationTimeConstrainsBase):
overhaulCost: Optional[float] = Field(0, description="Overhaul cost")
scopeOH: Optional[str] = Field(None, description="Scope OH")
assetnum: Optional[str] = Field(None, description="Assetnum")
ohSessionId: Optional[UUID] = Field(None, description="Scope OH")
costPerFailure: Optional[float] = Field(0,
description="Cost per failure")
@ -59,8 +58,8 @@ class CalculationResultsRead(CalculationTimeConstrainsBase):
class CalculationTimeConstrainsRead(CalculationTimeConstrainsBase):
id: Union[UUID, str]
name: str
reference: str
results: List[CalculationResultsRead]
results: List[Any]
equipment_results: List[Any]
optimumOh: Dict[str, Any]
@ -70,3 +69,21 @@ class CalculationTimeConstrainsCreate(CalculationTimeConstrainsBase):
class CalculationTimeConstrainsSimulationRead(CalculationTimeConstrainsBase):
simulation: CalculationResultsRead
@dataclass
class EquipmentResult:
corrective_costs: List[float]
overhaul_costs: List[float]
daily_failures: List[float]
equipment_id: str
material_cost: float
service_cost: float
optimum: OptimumResult # Added optimum result for each equipment
@dataclass
class OptimumResult:
overhaul_cost: float
corrective_cost: float
num_failures: int
days: int

@ -1,12 +1,13 @@
from typing import Optional
from typing import List, Optional
from uuid import UUID
import numpy as np
from sqlalchemy import and_, func, select
from sqlalchemy.orm import joinedload
from src.database.core import DbSession
from src.overhaul_activity.service import get_all_by_session_id
from src.scope_equipment.model import ScopeEquipment
from src.workorder.model import MasterWorkOrder
from .schema import CalculationTimeConstrainsParametersCreate, CalculationTimeConstrainsRead
from .schema import CalculationTimeConstrainsParametersCreate, CalculationTimeConstrainsRead, EquipmentResult, OptimumResult
from .model import CalculationParam, OverhaulReferenceType, CalculationData, CalculationResult
from fastapi import HTTPException, status
from src.overhaul_scope.service import get_by_scope_name, get
@ -19,7 +20,7 @@ def get_overhaul_cost_by_time_chart(overhaul_cost: float, days: int) -> list:
return results
def get_corrective_cost_time_chart(cost_per_failure: float, days: int) -> list:
def get_corrective_cost_time_chart(material_cost: float, service_cost: float, days: int) -> Tuple[np.ndarray, np.ndarray]:
day_points = np.arange(0, days)
# Parameters for failure rate
@ -28,16 +29,16 @@ def get_corrective_cost_time_chart(cost_per_failure: float, days: int) -> list:
grace_period = 15 # Days before failures start increasing significantly
# Calculate daily failure rate using sigmoid function
daily_failure_rate = base_rate / \
(1 + np.exp(-acceleration * (day_points - grace_period)/days))
daily_failure_rate = base_rate / (1 + np.exp(-acceleration * (day_points - grace_period)/days))
# Calculate cumulative failures
failure_counts = np.cumsum(daily_failure_rate)
# Calculate corrective costs based on cumulative failures and cost per failure
# Calculate corrective costs based on cumulative failures and combined costs
cost_per_failure = material_cost + service_cost
corrective_costs = failure_counts * cost_per_failure
return corrective_costs, failure_counts
return corrective_costs, daily_failure_rate
async def create_param_and_data(*, db_session: DbSession, calculation_param_in: CalculationTimeConstrainsParametersCreate, created_by: str, parameter_id: Optional[UUID] = None):
@ -62,13 +63,9 @@ async def create_param_and_data(*, db_session: DbSession, calculation_param_in:
detail="Scope not found"
)
overhaul_reference_type = OverhaulReferenceType.SCOPE if scope is not None else OverhaulReferenceType.ASSET
reference_id = scope.id if scope else calculation_param_in.assetnum
calculationData = await CalculationData.create_with_param(
db=db_session,
overhaul_reference_type=overhaul_reference_type,
reference_id=reference_id,
overhaul_session_id=calculation_param_in.ohSessionId,
avg_failure_cost=calculation_param_in.costPerFailure,
overhaul_cost=calculation_param_in.overhaulCost,
created_by=created_by,
@ -116,58 +113,160 @@ async def get_calculation_data_by_id(db_session: DbSession, calculation_id) -> C
return result.unique().scalar()
async def create_calculation_result_service(db_session: DbSession, calculation_id: UUID, costPerFailure: Optional[float] = None):
days = 60
# async def create_calculation_result_service(db_session: DbSession, calculation_id: UUID, costPerFailure: Optional[float] = None):
# days = 360
# calculation = await get_calculation_data_by_id(db_session=db_session, calculation_id=calculation_id)
# # reference = await get_by_assetnum(db_session=db_session, assetnum=calculation.reference_id) if calculation.overhaul_reference_type == OverhaulReferenceType.ASSET else await get(db_session=db_session, scope_id=calculation.reference_id)
# # Multiple Eequipment
# equipments_scope = get_all_by_session_id(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)
# # Parameter
# overhaulCost = calculation.parameter.overhaul_cost
# costPerFailure = costPerFailure if costPerFailure else calculation.parameter.avg_failure_cost
# overhaul_cost_points = get_overhaul_cost_by_time_chart(
# overhaulCost, days=days)
# for eq in equipments_scope:
# corrective_cost_points, dailyNumberOfFailure = get_corrective_cost_time_chart(
# costPerFailure, days)
# total_cost = overhaul_cost_points + corrective_cost_points
# optimumOHIndex = np.argmin(total_cost)
# numbersOfFailure = sum(dailyNumberOfFailure[:optimumOHIndex])
# optimum = {
# "overhaulCost": float(overhaul_cost_points[optimumOHIndex]),
# "correctiveCost": float(corrective_cost_points[optimumOHIndex]),
# "numOfFailures": int(numbersOfFailure),
# "days": int(optimumOHIndex+1)
# }
# calculation_results = []
# for i in range(days):
# result = CalculationResult(
# parameter_id=calculation.parameter_id,
# calculation_data_id=calculation.id,
# day=(i + 1),
# corrective_cost=float(corrective_cost_points[i]),
# overhaul_cost=float(overhaul_cost_points[i]),
# num_failures=int(dailyNumberOfFailure[i]),
# )
# calculation_results.append(result)
# calculation.optimum_oh_day = int(optimumOHIndex+1)
# db_session.add_all(calculation_results)
# await db_session.commit()
# return CalculationTimeConstrainsRead(
# id=calculation.id,
# name=reference.scope_name if hasattr(
# reference, "scope_name") else reference.master_equipment.name,
# reference=reference.assetnum if hasattr(
# reference, "assetnum") else reference.scope_name,
# results=calculation_results,
# optimumOh=optimum
# )
async def create_calculation_result_service(
db_session: DbSession,
calculation_id: UUID,
) -> CalculationTimeConstrainsRead:
days = 365 # Changed to 365 days as per requirement
calculation = await get_calculation_data_by_id(db_session=db_session, calculation_id=calculation_id)
reference = await get_by_assetnum(db_session=db_session, assetnum=calculation.reference_id) if calculation.overhaul_reference_type == OverhaulReferenceType.ASSET else await get(db_session=db_session, scope_id=calculation.reference_id)
# Parameter
overhaulCost = calculation.parameter.overhaul_cost
costPerFailure = costPerFailure if costPerFailure else calculation.parameter.avg_failure_cost
# Get all equipment for this calculation session
equipments = await get_all_by_session_id(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)
# Calculate overhaul costs once since it's shared
overhaul_cost_points = get_overhaul_cost_by_time_chart(
overhaulCost, days=days)
calculation.parameter.overhaul_cost,
days=days
)
# Store results for each equipment
equipment_results: List[EquipmentResult] = []
total_corrective_costs = np.zeros(days)
total_daily_failures = np.zeros(days)
# Calculate for each equipment
for eq in equipments:
corrective_costs, daily_failures = get_corrective_cost_time_chart(
material_cost=eq.material_cost,
service_cost=eq.service_cost,
days=days
)
corrective_cost_points, dailyNumberOfFailure = get_corrective_cost_time_chart(
costPerFailure, days)
# Calculate individual equipment optimum points
equipment_total_cost = corrective_costs + overhaul_cost_points
equipment_optimum_index = np.argmin(equipment_total_cost)
equipment_failure_sum = sum(daily_failures[:equipment_optimum_index])
total_cost = overhaul_cost_points + corrective_cost_points
equipment_optimum = OptimumResult(
overhaul_cost=float(overhaul_cost_points[equipment_optimum_index]),
corrective_cost=float(corrective_costs[equipment_optimum_index]),
num_failures=int(equipment_failure_sum),
days=int(equipment_optimum_index + 1)
)
optimumOHIndex = np.argmin(total_cost)
numbersOfFailure = sum(dailyNumberOfFailure[:optimumOHIndex])
equipment_results.append(EquipmentResult(
corrective_costs=corrective_costs.tolist(),
overhaul_costs=overhaul_cost_points.tolist(),
daily_failures=daily_failures.tolist(),
equipment_id=eq.id,
material_cost=eq.material_cost,
service_cost=eq.service_cost,
optimum=equipment_optimum
))
optimum = {
"overhaulCost": float(overhaul_cost_points[optimumOHIndex]),
"correctiveCost": float(corrective_cost_points[optimumOHIndex]),
"numOfFailures": int(numbersOfFailure),
"days": int(optimumOHIndex+1)
}
# Add to totals
total_corrective_costs += corrective_costs
total_daily_failures += daily_failures
# Calculate optimum points using total costs
total_cost = total_corrective_costs + overhaul_cost_points
optimum_oh_index = np.argmin(total_cost)
numbers_of_failure = sum(total_daily_failures[:optimum_oh_index])
optimum = OptimumResult(
overhaul_cost=float(overhaul_cost_points[optimum_oh_index]),
corrective_cost=float(total_corrective_costs[optimum_oh_index]),
num_failures=int(numbers_of_failure),
days=int(optimum_oh_index + 1)
)
# Create calculation results for database
calculation_results = []
for i in range(days):
result = CalculationResult(
parameter_id=calculation.parameter_id,
calculation_data_id=calculation.id,
day=(i + 1),
corrective_cost=float(corrective_cost_points[i]),
corrective_cost=float(total_corrective_costs[i]),
overhaul_cost=float(overhaul_cost_points[i]),
num_failures=int(dailyNumberOfFailure[i]),
num_failures=int(total_daily_failures[i]),
)
calculation_results.append(result)
calculation.optimum_oh_day = int(optimumOHIndex+1)
# Update calculation with optimum day
calculation.optimum_oh_day = optimum.days
# Save to database
db_session.add_all(calculation_results)
await db_session.commit()
# Return results including individual equipment data
return CalculationTimeConstrainsRead(
id=calculation.id,
name=reference.scope_name if hasattr(
reference, "scope_name") else reference.master_equipment.name,
reference=reference.assetnum if hasattr(
reference, "assetnum") else reference.scope_name,
name=calculation.name,
reference=calculation.overhaul_session_id,
results=calculation_results,
optimumOh=optimum
optimum_oh=optimum,
equipment_results=equipment_results
)

@ -49,6 +49,14 @@ async def get_all(*, common: CommonParameters, overhaul_session_id: UUID, assetn
return results
async def get_all_by_session_id(*, db_session:DbSession, overhaul_session_id):
query = Select(OverhaulActivity).where(
OverhaulActivity.overhaul_scope_id == overhaul_session_id).options(joinedload(OverhaulActivity.equipment))
results = await db_session.execute(query)
return results.scalars().all()
# async def create(*, db_session: DbSession, overhaul_activty_in: OverhaulActivityCreate, overhaul_session_id: UUID):
# # Check if the combination of assetnum and activity_id already exists
# existing_equipment_query = (

Loading…
Cancel
Save