You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

490 lines
17 KiB
Python

import datetime
from typing import Coroutine, List, Optional, Tuple
from uuid import UUID
import numpy as np
import requests
from fastapi import HTTPException, status
from sqlalchemy import and_, case, func, select, update
from sqlalchemy.orm import joinedload
from src.database.core import DbSession
from src.overhaul_activity.service import get_all_by_session_id
from src.overhaul_scope.service import get as get_scope
from src.workorder.model import MasterWorkOrder
from .model import (CalculationData, CalculationEquipmentResult,
CalculationResult)
from .schema import (CalculationResultsRead,
CalculationSelectedEquipmentUpdate,
CalculationTimeConstrainsParametersCreate,
CalculationTimeConstrainsRead, OptimumResult)
from src.utils import get_latest_numOfFail
def get_overhaul_cost_by_time_chart(
    overhaul_cost: float, days: int, numEquipments: int, decay_base: float = 1.01
) -> np.ndarray:
    """Return the per-equipment overhaul cost at every hour of the horizon.

    The total overhaul cost is split evenly across ``numEquipments`` and then
    decreases linearly, one equal step per hour, reaching zero at the last
    hour.

    Args:
        overhaul_cost: Total overhaul cost shared by all equipment; must be
            non-negative.
        days: Horizon length in days; must be positive.
        numEquipments: Number of equipment sharing the cost; must be positive.
        decay_base: Not used by the current linear model; kept so existing
            callers that pass it keep working.

    Returns:
        Array of length ``days * 24``; element ``k`` is the cost at hour
        ``k + 1``.

    Raises:
        ValueError: If ``overhaul_cost`` is negative, or ``days`` or
            ``numEquipments`` is not positive.
    """
    if overhaul_cost < 0:
        raise ValueError("Overhaul cost cannot be negative")
    if days <= 0:
        raise ValueError("Days must be positive")
    # Without this check a zero count surfaces as a bare ZeroDivisionError
    # and a negative count silently yields negative costs.
    if numEquipments <= 0:
        raise ValueError("Number of equipments must be positive")

    hours = days * 24
    elapsed_hours = np.arange(1, hours + 1)
    cost_per_equipment = overhaul_cost / numEquipments
    return cost_per_equipment - ((cost_per_equipment / hours) * elapsed_hours)
# def get_overhaul_cost_by_time_chart(overhaul_cost: float, days: int, numEquipments: int, decay_base: float = 1.1) -> np.ndarray:
# if overhaul_cost < 0:
# raise ValueError("Overhaul cost cannot be negative")
# if days <= 0:
# raise ValueError("Days must be positive")
# exponents = np.arange(0, days)
# cost_per_equipment = overhaul_cost / numEquipments
# # Introduce randomness by multiplying with a random factor
# random_factors = np.random.normal(1.0, 0.1, numEquipments) # Mean 1.0, Std Dev 0.1
# results = np.array([cost_per_equipment * factor / (decay_base ** exponents) for factor in random_factors])
# results = np.where(np.isfinite(results), results, 0)
# return results
async def get_corrective_cost_time_chart(
    material_cost: float, service_cost: float, location_tag: str, token, max_days: int
) -> Tuple[np.ndarray, np.ndarray]:
    """Build hourly corrective-cost and failure series for one location tag.

    Requests the number-of-failures series from the reliability service for
    the ``max_days`` days starting on 2025-01-01 and expands it to hourly
    resolution.

    Args:
        material_cost: Material cost; added to ``service_cost``.
        service_cost: Service cost; added to ``material_cost``.
        location_tag: Tag interpolated into the request URL.
        token: Bearer token sent in the ``Authorization`` header.
        max_days: Number of days covered by the returned series.

    Returns:
        ``(corrective_costs, hourly_failure)``, both of length
        ``max_days * 24``. ``hourly_failure`` is each day's value divided by
        24 and repeated for that day's 24 hours. ``corrective_costs`` is that
        series multiplied by the cost per failure, i.e.
        ``(material_cost + service_cost)`` divided by the ``num_fail`` of the
        last entry in the response (1 is used when that value is falsy).

    Raises:
        ValueError: If the computed cost per failure is zero.
        Exception: Any request, HTTP-status or parsing error is printed and
            re-raised unchanged.
    """
    import asyncio
    import functools

    start_date = datetime.datetime(2025, 1, 1)
    end_date = start_date + datetime.timedelta(days=max_days)
    url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
    try:
        # requests is a blocking client: run it in the default executor so the
        # event loop stays responsive, and bound the wait with a timeout so a
        # stalled service cannot hang this coroutine forever.
        fetch = functools.partial(
            requests.get,
            url,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {token}",
            },
            timeout=30,
        )
        response = await asyncio.get_running_loop().run_in_executor(None, fetch)
        # Fail on HTTP error statuses instead of trying to parse an error body.
        response.raise_for_status()
        data = response.json()

        # The last entry's failure count scales the cost per failure.
        latest_num = data["data"][-1]["num_fail"]
        if not latest_num:
            latest_num = 1

        # Index the returned entries by their parsed date.
        data_dict = {
            datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"]
            for item in data["data"]
        }

        # One value per day of the horizon: a returned day uses its own value
        # (or the most recent non-null value when its own is null); a day the
        # service did not return counts as 0.
        complete_data = []
        last_known_value = 0  # used until the first non-null value is seen
        for offset in range(max_days):
            date = start_date + datetime.timedelta(days=offset)
            if date in data_dict:
                if data_dict[date] is not None:
                    last_known_value = data_dict[date]
                complete_data.append(last_known_value)
            else:
                complete_data.append(0)

        daily_failure = np.array(complete_data)
        # Spread each daily value evenly over that day's 24 hours.
        hourly_failure = np.repeat(daily_failure, 24) / 24

        cost_per_failure = (material_cost + service_cost) / latest_num
        if cost_per_failure == 0:
            raise ValueError("Cost per failure cannot be zero")
        corrective_costs = hourly_failure * cost_per_failure
        return corrective_costs, hourly_failure
    except Exception as e:
        print(f"Error fetching or processing data: {str(e)}")
        raise
# def get_corrective_cost_time_chart(material_cost: float, service_cost: float, days: int, numEquipments: int) -> Tuple[np.ndarray, np.ndarray]:
# day_points = np.arange(0, days)
# # Parameters for failure rate
# base_rate = 0.04 # Base failure rate per day
# acceleration = 0.7 # How quickly failure rate increases
# grace_period = 49 # Days before failures start increasing significantly
# # Calculate daily failure rate using sigmoid function
# daily_failure_rate = base_rate / (1 + np.exp(-acceleration * (day_points - grace_period)/days))
# # Introduce randomness in the failure rate
# random_noise = np.random.normal(0.0, 0.05, (numEquipments, days)) # Mean 0.0, Std Dev 0.05
# daily_failure_rate = daily_failure_rate + random_noise
# daily_failure_rate = np.clip(daily_failure_rate, 0, None) # Ensure failure rate is non-negative
# # Calculate cumulative failures
# failure_counts = np.cumsum(daily_failure_rate)
# # Calculate corrective costs based on cumulative failures and combined costs
# cost_per_failure = material_cost + service_cost
# corrective_costs = failure_counts * cost_per_failure
# return corrective_costs, daily_failure_rate
async def create_param_and_data(
    *,
    db_session: DbSession,
    calculation_param_in: CalculationTimeConstrainsParametersCreate,
    created_by: str,
    parameter_id: Optional[UUID] = None,
):
    """Create a calculation record from the submitted parameters.

    Delegates to ``CalculationData.create_with_param`` and returns whatever it
    produces.

    Raises:
        HTTPException: 400 when ``calculation_param_in.ohSessionId`` is None.
    """
    session_id = calculation_param_in.ohSessionId
    if session_id is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="overhaul_session_id is required",
        )

    return await CalculationData.create_with_param(
        db=db_session,
        overhaul_session_id=session_id,
        avg_failure_cost=calculation_param_in.costPerFailure,
        overhaul_cost=calculation_param_in.overhaulCost,
        created_by=created_by,
        params_id=parameter_id,
    )
async def get_calculation_result(db_session: DbSession, calculation_id: str):
    """Aggregate the stored per-equipment series of a calculation.

    For each of the ``667 * 24`` points, sums the corrective cost, overhaul
    cost and failure count over every equipment result flagged
    ``is_included``. The ``day`` field of each row is the 1-based point index.

    Raises:
        HTTPException: 404 when the calculation or its overhaul scope cannot
            be found.
    """
    total_points = 667 * 24

    calc_data = await get_calculation_data_by_id(
        db_session=db_session, calculation_id=calculation_id
    )
    if not calc_data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    overhaul_scope = await get_scope(
        db_session=db_session, overhaul_session_id=calc_data.overhaul_session_id
    )
    if not overhaul_scope:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    # Only equipment flagged as included contributes to the totals.
    included = [eq for eq in calc_data.equipment_results if eq.is_included]

    rows = []
    for idx in range(total_points):
        corrective_total = 0
        overhaul_total = 0
        failures_total = 0
        for eq in included:
            corrective_total += float(eq.corrective_costs[idx])
            overhaul_total += float(eq.overhaul_costs[idx])
            failures_total += int(eq.daily_failures[idx])
        rows.append(
            CalculationResultsRead(
                overhaul_cost=overhaul_total,
                corrective_cost=corrective_total,
                num_failures=failures_total,
                day=idx + 1,
            )
        )

    return CalculationTimeConstrainsRead(
        id=calc_data.id,
        reference=calc_data.overhaul_session_id,
        scope=overhaul_scope.type,
        results=rows,
        optimum_oh=calc_data.optimum_oh_day,
    )
async def get_calculation_data_by_id(
    db_session: DbSession, calculation_id
) -> CalculationData:
    """Load one ``CalculationData`` row by id.

    The ``equipment_results`` and ``parameter`` relationships are joined
    eagerly in the same query.
    """
    eager_relationships = (
        joinedload(CalculationData.equipment_results),
        joinedload(CalculationData.parameter),
    )
    query = select(CalculationData).filter(CalculationData.id == calculation_id)
    query = query.options(*eager_relationships)

    rows = await db_session.execute(query)
    # unique() collapses the duplicate parent rows a joined collection load
    # produces.
    return rows.unique().scalar()
# async def create_calculation_result_service(db_session: DbSession, calculation_id: UUID, costPerFailure: Optional[float] = None):
# days = 360
# calculation = await get_calculation_data_by_id(db_session=db_session, calculation_id=calculation_id)
# # reference = await get_by_assetnum(db_session=db_session, assetnum=calculation.reference_id) if calculation.overhaul_reference_type == OverhaulReferenceType.ASSET else await get(db_session=db_session, scope_id=calculation.reference_id)
# # Multiple Eequipment
# equipments_scope = get_all_by_session_id(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)
# # Parameter
# overhaulCost = calculation.parameter.overhaul_cost
# costPerFailure = costPerFailure if costPerFailure else calculation.parameter.avg_failure_cost
# overhaul_cost_points = get_overhaul_cost_by_time_chart(
# overhaulCost, days=days)
# for eq in equipments_scope:
# corrective_cost_points, dailyNumberOfFailure = get_corrective_cost_time_chart(
# costPerFailure, days)
# total_cost = overhaul_cost_points + corrective_cost_points
# optimumOHIndex = np.argmin(total_cost)
# numbersOfFailure = sum(dailyNumberOfFailure[:optimumOHIndex])
# optimum = {
# "overhaulCost": float(overhaul_cost_points[optimumOHIndex]),
# "correctiveCost": float(corrective_cost_points[optimumOHIndex]),
# "numOfFailures": int(numbersOfFailure),
# "days": int(optimumOHIndex+1)
# }
# calculation_results = []
# for i in range(days):
# result = CalculationResult(
# parameter_id=calculation.parameter_id,
# calculation_data_id=calculation.id,
# day=(i + 1),
# corrective_cost=float(corrective_cost_points[i]),
# overhaul_cost=float(overhaul_cost_points[i]),
# num_failures=int(dailyNumberOfFailure[i]),
# )
# calculation_results.append(result)
# calculation.optimum_oh_day = int(optimumOHIndex+1)
# db_session.add_all(calculation_results)
# await db_session.commit()
# return CalculationTimeConstrainsRead(
# id=calculation.id,
# name=reference.scope_name if hasattr(
# reference, "scope_name") else reference.master_equipment.name,
# reference=reference.assetnum if hasattr(
# reference, "assetnum") else reference.scope_name,
# results=calculation_results,
# optimumOh=optimum
# )
async def create_calculation_result_service(
    db_session: DbSession, calculation: CalculationData, token: str
) -> CalculationTimeConstrainsRead:
    """Compute and persist per-equipment cost series for a calculation.

    For every equipment of the calculation's overhaul session this fetches the
    hourly corrective-cost and failure series, pairs them with the shared
    per-equipment overhaul-cost series, and stores one
    ``CalculationEquipmentResult`` per equipment. The index that minimises the
    summed corrective cost plus the overhaul series is written to
    ``calculation.optimum_oh_day`` and the session is committed.

    Args:
        db_session: Session used to load inputs and persist results.
        calculation: Calculation whose overhaul session is evaluated.
        token: Bearer token forwarded to the failure-data request.

    Returns:
        ``CalculationTimeConstrainsRead`` with an empty ``results`` list, the
        optimum point and the per-equipment results.

    Raises:
        HTTPException: 404 when the overhaul scope or the calculation data
            cannot be found, or when the session has no equipment.
    """
    days = 667  # horizon in days; every series below has days * 24 hourly points
    hours = days * 24

    equipments = await get_all_by_session_id(
        db_session=db_session, overhaul_session_id=calculation.overhaul_session_id
    )
    scope = await get_scope(
        db_session=db_session, overhaul_session_id=calculation.overhaul_session_id
    )
    # Reloaded with its relationships joined so that `.parameter` is available.
    calculation_data = await get_calculation_data_by_id(
        db_session=db_session, calculation_id=calculation.id
    )

    if not scope or not calculation_data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )
    # With no equipment the per-equipment cost split divides by zero and there
    # is no series to optimise, so fail with a clear error up front.
    if len(equipments) == 0:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No equipment found for this overhaul session.",
        )

    # The overhaul series depends only on the total cost and the equipment
    # count, so it is computed once and shared by every equipment.
    overhaul_cost_points = get_overhaul_cost_by_time_chart(
        calculation_data.parameter.overhaul_cost,
        days=days,
        numEquipments=len(equipments),
    )

    equipment_results: List[CalculationEquipmentResult] = []
    total_corrective_costs = np.zeros(hours)
    total_daily_failures = np.zeros(hours)

    for eq in equipments:
        corrective_costs, daily_failures = await get_corrective_cost_time_chart(
            material_cost=eq.material_cost,
            service_cost=eq.service_cost,
            token=token,
            location_tag=eq.equipment.location_tag,
            max_days=days,
        )

        # Optimum for this equipment alone: the index with the lowest combined
        # corrective and overhaul cost.
        # NOTE(review): the series are hourly, so this index counts hours even
        # though it is stored in a field named `optimum_day` - confirm with
        # the consumers of this value.
        equipment_optimum_index = np.argmin(corrective_costs + overhaul_cost_points)

        equipment_results.append(
            CalculationEquipmentResult(
                corrective_costs=corrective_costs.tolist(),
                overhaul_costs=overhaul_cost_points.tolist(),
                daily_failures=daily_failures.tolist(),
                assetnum=eq.assetnum,
                material_cost=eq.material_cost,
                service_cost=eq.service_cost,
                optimum_day=int(equipment_optimum_index + 1),
                calculation_data_id=calculation.id,
                master_equipment=eq.equipment,
            )
        )

        total_corrective_costs += corrective_costs
        total_daily_failures += daily_failures

    db_session.add_all(equipment_results)

    # NOTE(review): the summed corrective cost of all equipment is combined
    # with a single equipment's share of the overhaul cost here, whereas
    # get_calculation_result sums the overhaul series over equipment - confirm
    # which is intended.
    total_cost = total_corrective_costs + overhaul_cost_points
    optimum_oh_index = np.argmin(total_cost)
    numbers_of_failure = sum(total_daily_failures[:optimum_oh_index])

    optimum = OptimumResult(
        overhaul_cost=float(overhaul_cost_points[optimum_oh_index]),
        corrective_cost=float(total_corrective_costs[optimum_oh_index]),
        num_failures=int(numbers_of_failure),
        days=int(optimum_oh_index + 1),
    )

    # Aggregated CalculationResult rows are not persisted here; only the
    # per-equipment results and the optimum index are stored.
    calculation.optimum_oh_day = optimum.days
    await db_session.commit()

    return CalculationTimeConstrainsRead(
        id=calculation.id,
        reference=calculation.overhaul_session_id,
        scope=scope.type,
        results=[],
        optimum_oh=optimum,
        equipment_results=equipment_results,
    )
async def get_calculation_by_reference_and_parameter(
    *, db_session: DbSession, calculation_reference_id, parameter_id
):
    """Look up a ``CalculationData`` row by reference id and parameter id.

    Returns the scalar result of the query; both conditions must match.
    """
    matches_both = and_(
        CalculationData.reference_id == calculation_reference_id,
        CalculationData.parameter_id == parameter_id,
    )
    query = select(CalculationData).filter(matches_both)
    outcome = await db_session.execute(query)
    return outcome.scalar()
async def get_calculation_result_by_day(
    *, db_session: DbSession, calculation_id, simulation_day
):
    """Look up the ``CalculationResult`` row of a calculation for one day.

    Returns the scalar result of the query; both the day and the calculation
    id must match.
    """
    matches_day_and_calculation = and_(
        CalculationResult.day == simulation_day,
        CalculationResult.calculation_data_id == calculation_id,
    )
    query = select(CalculationResult).filter(matches_day_and_calculation)
    outcome = await db_session.execute(query)
    return outcome.scalar()
async def get_avg_cost_by_asset(*, db_session: DbSession, assetnum: str):
    """Return the average ``total_cost_max`` of the work orders of one asset.

    The value comes from ``scalar_one_or_none()`` on an ``AVG`` query filtered
    by ``assetnum``.
    """
    average_cost = func.avg(MasterWorkOrder.total_cost_max).label("average_cost")
    query = select(average_cost).where(MasterWorkOrder.assetnum == assetnum)
    outcome = await db_session.execute(query)
    return outcome.scalar_one_or_none()
async def bulk_update_equipment(
    *,
    db: DbSession,
    selected_equipments: List[CalculationSelectedEquipmentUpdate],
    calculation_data_id: UUID,
):
    """Set ``is_included`` for several equipment results in one UPDATE.

    Args:
        db: Session used to execute and commit the statement.
        selected_equipments: Items carrying ``assetnum`` and ``is_included``.
            When an assetnum appears more than once, its last entry wins.
        calculation_data_id: Only equipment results of this calculation are
            updated.

    Returns:
        The list of distinct assetnums that were targeted; empty when
        ``selected_equipments`` is empty.
    """
    # Nothing to update: avoid building a CASE expression with no WHEN
    # branches and an empty IN list, and skip the round trip entirely.
    if not selected_equipments:
        return []

    # Map each assetnum to the flag it should receive.
    case_mappings = {asset.assetnum: asset.is_included for asset in selected_equipments}
    assetnums = list(case_mappings)

    # One WHEN branch per assetnum. No ELSE branch is needed because the WHERE
    # clause below restricts the update to exactly these assetnums.
    when_clauses = [
        (CalculationEquipmentResult.assetnum == assetnum, is_included)
        for assetnum, is_included in case_mappings.items()
    ]

    stmt = (
        update(CalculationEquipmentResult)
        .where(CalculationEquipmentResult.calculation_data_id == calculation_data_id)
        .where(CalculationEquipmentResult.assetnum.in_(assetnums))
        .values({"is_included": case(*when_clauses)})
    )
    await db.execute(stmt)
    await db.commit()
    return assetnums