fix calculation model

main
Cizz22 1 year ago
parent 5eb1259092
commit 64cb95f8c3

@ -3,7 +3,7 @@ LOG_LEVEL=ERROR
PORT=3020
HOST=0.0.0.0
DATABASE_HOSTNAME=192.168.1.82
DATABASE_HOSTNAME=192.168.1.85
DATABASE_PORT=5432
DATABASE_CREDENTIAL_USER=postgres
DATABASE_CREDENTIAL_PASSWORD=postgres

@ -1,15 +1,34 @@
import numpy as np
from sqlalchemy import Select, func, select
from sqlalchemy.orm import joinedload
from src.workorder.model import MasterWorkOrder
from src.scope_equipment.model import ScopeEquipment
from src.scope.model import Scope
from src.database.core import DbSession
from src.scope.service import get_all
from .schema import CalculationTimeConstrainsParametersRead, CalculationTimeConstrainsCreate
from .service import get_overhaul_cost_by_time_chart, get_corrective_cost_time_chart
from .schema import CalculationTimeConstrainsParametersRead, CalculationTimeConstrainsParametersRetrive, CalculationTimeConstrainsParametersCreate
from .service import get_overhaul_cost_by_time_chart, get_corrective_cost_time_chart, create_param_and_data, get_calculation_data_by_id
from .model import CalculationResult
from .model import CalculationParam
async def get_create_calculation_parameters(*, db_session: DbSession, calculation_id: str):
if calculation_id is not None:
calculation = get_calculation_data_by_id(calculation_id)
if not calculation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return CalculationTimeConstrainsParametersRead(
costPerFailure=calculation.parameter.avg_failure_cost,
overhaulCost=calculation.parameter.overhaul_cost,
reference=calculation
)
async def get_create_calculation_parameters(*, db_session: DbSession):
stmt = (
select(
ScopeEquipment.scope_id,
@ -27,24 +46,27 @@ async def get_create_calculation_parameters(*, db_session: DbSession):
costFailurePerScope = {avaiableScopes.get(
costPerFailure[0]): costPerFailure[1] for costPerFailure in costFailure}
return CalculationTimeConstrainsParametersRead(
return CalculationTimeConstrainsParametersRetrive(
costPerFailure=costFailurePerScope,
availableScopes=avaiableScopes.values(),
recommendedScope="B",
historicalData={
"averageOverhaulCost": 10000000,
"lastCalculation": {
"id": "calc_122",
"date": "2024-10-15",
"scope": "B",
},
},
recommendedScope="A",
# historicalData={
# "averageOverhaulCost": 10000000,
# "lastCalculation": {
# "id": "calc_122",
# "date": "2024-10-15",
# "scope": "B",
# },
# },
)
async def create_calculation(*, db_session: DbSession, calculation_time_constrains_in: CalculationTimeConstrainsCreate):
async def create_calculation(*, db_session: DbSession, calculation_time_constrains_in: CalculationTimeConstrainsParametersCreate, created_by:str):
days = 365
##
calculation_data = await create_param_and_data(
db_session=db_session, calculation_param_in=calculation_time_constrains_in, created_by=created_by)
overhaul_cost_points = get_overhaul_cost_by_time_chart(
calculation_time_constrains_in.overhaulCost, days)
corrective_cost_points, dailyNumberOfFailure = get_corrective_cost_time_chart(
@ -57,36 +79,19 @@ async def create_calculation(*, db_session: DbSession, calculation_time_constrai
# raise Exception(optimumOH)
points = []
calculation_results = []
for i in range(days):
points.append({
"day": int(i+1),
"overhaulCost": float(overhaul_cost_points[i]),
"correctiveCost": float(corrective_cost_points[i]),
"totalCost": float(overhaul_cost_points[i] + corrective_cost_points[i]),
# "dailyFailureRate": float(daily_failure_rate[i]),
# "cumulativeFailures": float(failure_counts[i])
})
return {
"id": "calc_123",
"result": {
"summary": {
"scope": calculation_time_constrains_in.scopeOH,
"numberOfFailures": int(np.floor(numbersOfFailure)),
"optimumOHTime": int(optimumOH+1),
"optimumTotalCost": float(total_cost[optimumOH]),
},
"chartData": points,
# "comparisons": {
# "vsLastCalculation": {
# "costDifference": -50000000,
# "timeChange": "+15 days"
# }
# }
},
"simulationLimits": {
"minInterval": 30,
"maxInterval": 180
}
}
result = CalculationResult(
parameter_id=calculation_data.parameter_id,
calculation_data_id=calculation_data.id,
day=(i + 1),
corrective_cost=float(corrective_cost_points[i]),
overhaul_cost=float(overhaul_cost_points[i]),
num_failures=int(dailyNumberOfFailure[i]),
)
calculation_results.append(result)
db_session.add_all(calculation_results)
await db_session.commit()
return calculation_data.id

@ -0,0 +1,125 @@
from enum import Enum
from typing import List, Union
from sqlalchemy import UUID, Column, Float, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from src.database.core import Base, DbSession
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin, UUIDMixin
class OverhaulReferenceType(str, Enum):
SCOPE = "SCOPE"
ASSET = "ASSET"
class CalculationParam(Base, DefaultMixin, IdentityMixin):
__tablename__ = "oh_ms_calculation_param"
avg_failure_cost = Column(Float, nullable=False)
overhaul_cost = Column(Float, nullable=False)
# Relationships
calculation_data = relationship(
"CalculationData", back_populates="parameter")
results = relationship("CalculationResult", back_populates="parameter")
# @classmethod
# async def create_with_references(
# cls,
# db: DbSession,
# avg_failure_cost: float,
# overhaul_cost: float,
# created_by: str,
# # list of {"reference_type": OverhaulReferenceType, "reference_id": str}
# references: List[dict]
# ):
# # Create parameter
# param = cls(
# avg_failure_cost=avg_failure_cost,
# overhaul_cost=overhaul_cost,
# created_by=created_by
# )
# db.add(param)
# await db.flush() # Flush to get the param.id
# # Create reference links
# for ref in references:
# reference_link = ReferenceLink(
# parameter_id=param.id,
# overhaul_reference_type=ref["reference_type"],
# reference_id=ref["reference_id"]
# )
# db.add(reference_link)
# await db.commit()
# await db.refresh(param)
# return param
class CalculationData(Base, DefaultMixin, IdentityMixin):
__tablename__ = "oh_tr_calculation_data"
parameter_id = Column(UUID(as_uuid=True), ForeignKey(
'oh_ms_calculation_param.id'), nullable=True)
overhaul_reference_type = Column(String(10), nullable=False)
reference_id = Column(UUID(as_uuid=True), nullable=False)
parameter = relationship(
"CalculationParam", back_populates="calculation_data")
scope = relationship("Scope", foreign_keys=[
reference_id], primaryjoin="and_(CalculationData.overhaul_reference_type=='SCOPE', " "CalculationData.reference_id==Scope.id)")
equipment = relationship("ScopeEquipment", foreign_keys=[
reference_id], primaryjoin="and_(CalculationData.overhaul_reference_type=='ASSET', " "CalculationData.reference_id==ScopeEquipment.assetnum)")
@classmethod
async def create_with_param(
cls,
overhaul_reference_type: str,
reference_id: Union[str, UUID],
db: DbSession,
avg_failure_cost: float,
overhaul_cost: float,
created_by: str,
):
# Create Params
params = CalculationParam(
avg_failure_cost=avg_failure_cost,
overhaul_cost=overhaul_cost,
created_by=created_by
)
db.add(params)
await db.flush()
calculation_data = cls(
overhaul_reference_type=overhaul_reference_type,
reference_id=reference_id,
created_by=created_by,
parameter_id=params.id
)
db.add(calculation_data)
await db.commit()
await db.refresh(calculation_data)
return calculation_data
class CalculationResult(Base, DefaultMixin):
__tablename__ = "oh_tr_calculation_result"
parameter_id = Column(UUID(as_uuid=True), ForeignKey(
'oh_ms_calculation_param.id'), nullable=False)
calculation_data_id = Column(UUID(as_uuid=True), ForeignKey(
'oh_tr_calculation_data.id'), nullable=False)
day = Column(Integer, nullable=False)
corrective_cost = Column(Float, nullable=False)
overhaul_cost = Column(Float, nullable=False)
num_failures = Column(Integer, nullable=False)
parameter = relationship("CalculationParam", back_populates="results")
reference_link = relationship("CalculationData")

@ -1,19 +1,30 @@
from typing import Any, Dict
from typing import Any, Dict, Optional, Union
from fastapi import APIRouter, HTTPException, status
from .schema import CalculationTimeConstrainsParametersRead, CalculationTimeConstrainsRead, CalculationTimeConstrainsCreate
from .schema import CalculationTimeConstrainsParametersRead, CalculationTimeConstrainsRead, CalculationTimeConstrainsParametersCreate, CalculationTimeConstrainsParametersRetrive
from src.database.core import DbSession
from src.models import StandardResponse
from src.auth.service import CurrentUser
from .service import create_param_and_data, get_calculation_result
from .flows import get_create_calculation_parameters, create_calculation
from fastapi.params import Query
router = APIRouter()
@router.get("/parameters", response_model=StandardResponse[CalculationTimeConstrainsParametersRead])
async def get_calculation_parameters(db_session: DbSession):
"""Get all calculation parameter pagination."""
@router.post("", response_model=StandardResponse[str])
async def create_calculation_time_constrains(db_session: DbSession, current_user: CurrentUser, calculation_time_constrains_in: CalculationTimeConstrainsParametersCreate):
"""Save calculation time constrains Here"""
calculation_data = await create_calculation(db_session=db_session, calculation_time_constrains_in=calculation_time_constrains_in, created_by=current_user.name)
parameters = await get_create_calculation_parameters(db_session=db_session)
return StandardResponse(data=str(calculation_data), message="Data created successfully")
@router.get("/parameters", response_model=StandardResponse[Union[CalculationTimeConstrainsParametersRetrive, CalculationTimeConstrainsParametersRead]])
async def get_calculation_parameters(db_session: DbSession, calculation_id: Optional[str] = Query(None)):
"""Get all calculation parameter."""
parameters = await get_create_calculation_parameters(db_session=db_session, calculation_id=calculation_id)
return StandardResponse(
data=parameters,
@ -21,32 +32,42 @@ async def get_calculation_parameters(db_session: DbSession):
)
@router.post("", response_model=StandardResponse[CalculationTimeConstrainsRead])
async def create_calculation_time_constrains(db_session: DbSession, calculation_time_constrains_in: CalculationTimeConstrainsCreate):
"""Calculate Here"""
calculation_result = await create_calculation(db_session=db_session, calculation_time_constrains_in=calculation_time_constrains_in)
@router.get("/{calculation_id}", response_model=StandardResponse[CalculationTimeConstrainsRead])
async def get_calculation_results(db_session: DbSession, calculation_id):
results = await get_calculation_result(db_session=db_session, calculation_id=calculation_id)
return StandardResponse(
data=CalculationTimeConstrainsRead(
id=calculation_id,
results=results
),
message="Data retrieved successfully",
)
# @router.post("/parameters", response_model=StandardResponse[str])
# async def create_calculation_time_constrains_parameter(db_session: DbSession, current_user: CurrentUser, calculation_time_constrains_in: CalculationTimeConstrainsParametersCreate):
# """Save parameter Here"""
# parameters = await create_param(db_session=db_session, calculation_param_in=calculation_time_constrains_in, created_by=current_user.name)
return StandardResponse(data=CalculationTimeConstrainsRead(
id=calculation_result["id"],
result=calculation_result["result"],
), message="Data created successfully")
# return StandardResponse(data=str(parameters.id), message="Parameter Data created successfully")
@router.get("/simulation", response_model=StandardResponse[Dict[str, Any]])
async def get_simulation_result():
# @router.get("/simulation", response_model=StandardResponse[Dict[str, Any]])
# async def get_simulation_result():
results = {
"simulation": {
"intervalDays": 45,
"numberOfFailures": 75,
"totalCost": 550000000,
},
"comparison": {
"vsOptimal": {
"failureDifference": 16,
"costDifference": 50000000
},
}
}
# results = {
# "simulation": {
# "intervalDays": 45,
# "numberOfFailures": 75,
# "totalCost": 550000000,
# },
# "comparison": {
# "vsOptimal": {
# "failureDifference": 16,
# "costDifference": 50000000
# },
# }
# }
return StandardResponse(data=results, message="Data retrieved successfully")
# return StandardResponse(data=results, message="Data retrieved successfully")

@ -12,30 +12,54 @@ class CalculationTimeConstrainsBase(DefultBase):
pass
class CalculationTimeConstrainsParametersRead(CalculationTimeConstrainsBase):
costPerFailure: dict = Field(..., description="Cost per failure")
class ReferenceLinkBase(DefultBase):
reference_id: str = Field(..., description="Reference ID")
overhaul_reference_type: str = Field(...,
description="Overhaul reference type")
class CalculationTimeConstrainsParametersRetrive(CalculationTimeConstrainsBase):
# type: ignore
costPerFailure: Union[dict,
float] = Field(..., description="Cost per failure")
availableScopes: List[str] = Field(..., description="Available scopes")
recommendedScope: str = Field(..., description="Recommended scope")
historicalData: Dict[str, Any] = Field(..., description="Historical data")
# historicalData: Dict[str, Any] = Field(..., description="Historical data")
class CalculationTimeConstrainsParametersRead(CalculationTimeConstrainsBase):
costPerFailure: Union[dict,
float] = Field(..., description="Cost per failure")
overhaulCost: Optional[float] = Field(None, description="Overhaul cost")
reference: Optional[List[ReferenceLinkBase]] = Field(
None, description="Reference")
class CalculationTimeConstrainsCreate(CalculationTimeConstrainsBase):
class CalculationTimeConstrainsParametersCreate(CalculationTimeConstrainsBase):
overhaulCost: float = Field(..., description="Overhaul cost")
scopeOH: str = Field(..., description="Scope OH")
scopeOH: Optional[str] = Field(None, description="Scope OH")
assetnum: Optional[str] = Field(None, description="Assetnum")
costPerFailure: float = Field(..., description="Cost per failure")
metadata: Dict[str, Any] = Field(..., description="Metadata")
# class CalculationTimeConstrainsCreate(CalculationTimeConstrainsBase):
# overhaulCost: float = Field(..., description="Overhaul cost")
# scopeOH: str = Field(..., description="Scope OH")
# costPerFailure: float = Field(..., description="Cost per failure")
# metadata: Dict[str, Any] = Field(..., description="Metadata")
class CalculationResultsRead(CalculationTimeConstrainsBase):
day: int
corrective_cost: float
overhaul_cost: float
num_failures: int
class CalculationTimeConstrainsRead(CalculationTimeConstrainsBase):
id: Union[UUID, str]
result: Dict[str, Any]
results: List[CalculationResultsRead]
{
"calculationId": "calc_123",
"intervalDays": 45,
}
class CalculationTimeConstrainsSimulationRead(CalculationTimeConstrainsBase):
simulation: Dict[str, Any]
comparison: Dict[str, Any]
comparison: Dict[str, Any]

@ -1,4 +1,11 @@
import numpy as np
from sqlalchemy import select
from sqlalchemy.orm import joinedload
from src.database.core import DbSession
from .schema import CalculationTimeConstrainsParametersCreate
from .model import CalculationParam, OverhaulReferenceType, CalculationData, CalculationResult
from fastapi import HTTPException, status
from src.scope.service import get_by_scope_name
def get_overhaul_cost_by_time_chart(overhaul_cost: float, days: int) -> list:
@ -8,20 +15,75 @@ def get_overhaul_cost_by_time_chart(overhaul_cost: float, days: int) -> list:
def get_corrective_cost_time_chart(cost_per_failure: float, days: int) -> list:
day_points = np.arange(0, days)
day_points = np.arange(0, days)
# Parameters for failure rate
base_rate = 1.2 # Base failure rate per day
acceleration = 7.2 # How quickly failure rate increases
grace_period = 50 # Days before failures start increasing significantly
# Calculate daily failure rate using sigmoid function
daily_failure_rate = base_rate / (1 + np.exp(-acceleration * (day_points - grace_period)/days))
daily_failure_rate = base_rate / \
(1 + np.exp(-acceleration * (day_points - grace_period)/days))
# Calculate cumulative failures
failure_counts = np.cumsum(daily_failure_rate)
# Calculate corrective costs based on cumulative failures and cost per failure
corrective_costs = failure_counts * cost_per_failure
return corrective_costs, failure_counts
async def create_param_and_data(*, db_session: DbSession, calculation_param_in: CalculationTimeConstrainsParametersCreate, created_by: str):
"""Creates a new document."""
if calculation_param_in.scopeOH is None and calculation_param_in.assetnum is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Either scope_id or assetnum must be provided"
)
if calculation_param_in.scopeOH is not None and calculation_param_in.assetnum is not None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Only one of scope_id or assetnum must be provided"
)
scope = await get_by_scope_name(db_session=db_session, scope_name=calculation_param_in.scopeOH) if calculation_param_in.scopeOH is not None else None
if calculation_param_in.scopeOH is not None and scope is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Scope not found"
)
overhaul_reference_type = OverhaulReferenceType.SCOPE if scope is not None else OverhaulReferenceType.ASSET
reference_id = scope.id if scope else calculation_param_in.assetnum
calculationData = await CalculationData.create_with_param(
db=db_session,
overhaul_reference_type=overhaul_reference_type,
reference_id=reference_id,
avg_failure_cost=calculation_param_in.costPerFailure,
overhaul_cost=calculation_param_in.overhaulCost,
created_by=created_by,
)
return corrective_costs, failure_counts
return calculationData
async def get_calculation_result(db_session: DbSession, calculation_id: str):
stmt = select(CalculationResult).filter(
CalculationResult.calculation_data_id == calculation_id).order_by(CalculationResult.day)
results = await db_session.execute(stmt)
return results.scalars().all()
async def get_calculation_data_by_id(db_session: DbSession, calculation_id) -> CalculationData:
stmt = select(CalculationData).filter(CalculationData.id ==
calculation_id).options(joinedload(CalculationData.parameter))
result = await db_session.execute(stmt)
return result.unique().scalar()

@ -39,17 +39,17 @@ class UUIDMixin:
default=uuid.uuid4, unique=True, nullable=False)
class DefaultMixin(TimeStampMixin, UUIDMixin):
"""Default mixin"""
pass
class IdentityMixin:
"""Identity mixin"""
created_by = Column(String(100), nullable=True)
updated_by = Column(String(100), nullable=True)
class DefaultMixin(TimeStampMixin, UUIDMixin):
"""Default mixin"""
pass
# Pydantic Models
class DefultBase(BaseModel):
class Config:

Loading…
Cancel
Save