feature/reliability_stat
Cizz22 7 months ago
parent ed8714f69a
commit a8a63e91f4

@ -22,7 +22,7 @@ from .service import (create_calculation_result_service, create_param_and_data,
get_calculation_by_reference_and_parameter,
get_calculation_data_by_id, get_calculation_result,
get_corrective_cost_time_chart,
get_overhaul_cost_by_time_chart)
get_overhaul_cost_by_time_chart, run_simulation)
async def get_create_calculation_parameters(
@ -92,11 +92,14 @@ async def create_calculation(
created_by=created_by,
)
results = await create_calculation_result_service(
# results = await create_calculation_result_service(
# db_session=db_session, calculation=calculation_data, token=token
# )
results = await run_simulation(
db_session=db_session, calculation=calculation_data, token=token
)
return results
return results["id"]
async def get_or_create_scope_equipment_calculation(
@ -115,11 +118,13 @@ async def get_or_create_scope_equipment_calculation(
detail="A data with this id does not exist.",
)
return scope_calculation.id
# Check if calculation already exist
return CalculationTimeConstrainsRead(
id=scope_calculation.id,
reference=scope_calculation.overhaul_session_id,
results=scope_calculation.results,
optimum_oh=scope_calculation.optimum_oh_day,
equipment_results=scope_calculation.equipment_results,
)
# return CalculationTimeConstrainsRead(
# id=scope_calculation.id,
# reference=scope_calculation.overhaul_session_id,
# results=scope_calculation.results,
# optimum_oh=scope_calculation.optimum_oh_day,
# equipment_results=scope_calculation.equipment_results,
# )

@ -140,6 +140,7 @@ class CalculationEquipmentResult(Base, DefaultMixin):
corrective_costs = Column(JSON, nullable=False)
overhaul_costs = Column(JSON, nullable=False)
daily_failures = Column(JSON, nullable=False)
procurement_costs = Column(JSON, nullable=False)
assetnum = Column(String(255), nullable=False)
material_cost = Column(Float, nullable=False)
service_cost = Column(Float, nullable=False)

@ -49,10 +49,7 @@ async def create_calculation_time_constrains(
created_by=current_user.name,
)
if not with_results:
results = str(results.id)
return StandardResponse(data=results, message="Data created successfully")
return StandardResponse(data=str(results), message="Data created successfully")
@router.get(

@ -49,6 +49,7 @@ class CalculationResultsRead(CalculationTimeConstrainsBase):
day: int
corrective_cost: float
overhaul_cost: float
procurement_cost: float
num_failures: int
@ -63,6 +64,7 @@ class EquipmentResult(CalculationTimeConstrainsBase):
id: UUID
corrective_costs: List[float]
overhaul_costs: List[float]
procurement_costs: List[float]
daily_failures: List[float]
assetnum: str
material_cost: float

@ -1,5 +1,5 @@
import datetime
from typing import Coroutine, List, Optional, Tuple
from typing import Coroutine, List, Optional, Tuple,Dict
from uuid import UUID
import calendar
@ -14,7 +14,8 @@ from src.overhaul_activity.service import get_all_by_session_id
from src.overhaul_scope.service import get as get_scope, get_prev_oh
from src.utils import get_latest_numOfFail
from src.workorder.model import MasterWorkOrder
from src.sparepart.model import MasterSparePart
from src.overhaul_activity.model import OverhaulActivity
from .model import (CalculationData, CalculationEquipmentResult,
CalculationResult)
from .schema import (CalculationResultsRead,
@ -23,120 +24,508 @@ from .schema import (CalculationResultsRead,
CalculationTimeConstrainsRead, OptimumResult)
from .utils import get_months_between
from src.scope_equipment_part.model import ScopeEquipmentPart
class ReliabilityService:
"""Service class for handling reliability API calls"""
def __init__(self, base_url: str = "http://192.168.1.82:8000"):
self.base_url = base_url
async def get_number_of_failures(self, location_tag, start_date, end_date, token, max_interval=24):
url_prediction = (
f"http://192.168.1.82:8000/reliability/main/number-of-failures/"
f"{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
)
results = {}
try:
response = requests.get(
url_prediction,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
timeout=10
)
response.raise_for_status()
prediction_data = response.json()
except (requests.RequestException, ValueError) as e:
raise Exception(f"Failed to fetch or parse prediction data: {e}")
if not prediction_data or "data" not in prediction_data or not isinstance(prediction_data["data"], list):
raise Exception("Invalid or empty prediction data format.")
# Since data is cumulative, we need to preserve the decimal values
last_cumulative_value = 0
# Parse prediction data and preserve cumulative nature
for item in prediction_data["data"]:
try:
date = datetime.datetime.strptime(item["date"], "%d %b %Y")
last_day = calendar.monthrange(date.year, date.month)[1]
value = item.get("num_fail", 0)
if date.day == last_day: # End of month
if value is not None and value > 0:
# PRESERVE the decimal values - don't convert to int!
results[date.date()] = round(float(value), 3) # Keep 3 decimal places
last_cumulative_value = float(value)
else:
# If no value, use previous cumulative value
results[date.date()] = last_cumulative_value
except (KeyError, ValueError):
continue
# Fill missing months by continuing the cumulative trend
current = start_date.replace(day=1)
for _ in range(max_interval):
last_day = calendar.monthrange(current.year, current.month)[1]
last_day_date = datetime.date(current.year, current.month, last_day)
if last_day_date not in results:
# Since it's cumulative, add a small increment to continue the trend
# You can adjust this increment based on your typical monthly increase
monthly_increment = 0.05 # Adjust this value based on your data pattern
last_cumulative_value += monthly_increment
results[last_day_date] = round(last_cumulative_value, 3)
else:
# Update our tracking value
last_cumulative_value = results[last_day_date]
# Move to next month
if current.month == 12:
current = current.replace(year=current.year + 1, month=1)
else:
current = current.replace(month=current.month + 1)
# Sort results by date
results = dict(sorted(results.items()))
return results
async def get_equipment_foh(self, location_tag: str, token: str) -> float:
"""
Get forced outage hours for equipment
"""
url = f"{self.base_url}/reliability/asset/mdt/{location_tag}"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
}
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
result = response.json()
return result["data"]["hours"]
except (requests.RequestException, ValueError) as e:
raise Exception(f"Failed to fetch FOH data for {location_tag}: {e}")
def _parse_failure_predictions(
self,
prediction_data: List[dict],
start_date: datetime.date,
max_interval: int
) -> Dict[datetime.date, int]:
"""
Parse and normalize failure prediction data
"""
results = {}
# Parse prediction data
for item in prediction_data:
try:
date = datetime.datetime.strptime(item["date"], "%d %b %Y").date()
last_day = calendar.monthrange(date.year, date.month)[1]
value = item.get("num_fail", 0)
if date.day == last_day:
if date.month == start_date.month and date.year == start_date.year:
results[date] = 0
else:
results[date] = max(0, int(value)) if value is not None else 0
except (KeyError, ValueError):
continue
# Fill missing months with 0
current = start_date.replace(day=1)
for _ in range(max_interval):
last_day = calendar.monthrange(current.year, current.month)[1]
last_day_date = datetime.date(current.year, current.month, last_day)
if last_day_date not in results:
results[last_day_date] = 0
# Move to next month
if current.month == 12:
current = current.replace(year=current.year + 1, month=1)
else:
current = current.replace(month=current.month + 1)
return dict(sorted(results.items()))
class SparePartsService:
"""Service class for spare parts management and procurement calculations"""
def __init__(self, spare_parts_db: dict):
self.spare_parts_db = spare_parts_db
def calculate_stock_at_date(self, sparepart_id: UUID, target_date: datetime.date):
"""
Calculate projected stock for a spare part at a specific date
"""
if sparepart_id not in self.spare_parts_db:
return 0
spare_part = self.spare_parts_db[sparepart_id]
projected_stock = spare_part.stock
# Add all procurements that arrive by target_date
for procurement in spare_part.data.sparepart_procurements:
eta_date = getattr(procurement, procurement.status, None)
if eta_date and eta_date <= target_date:
projected_stock += procurement.quantity
return projected_stock
async def check_spare_parts_availability(
self,
db_session: DbSession,
equipment: OverhaulActivity,
overhaul_date: datetime.date
) -> Tuple[bool, List[dict]]:
"""
Check if spare parts are available for equipment overhaul at specific date.
If not available, calculate procurement costs needed.
"""
procurement_costs = []
all_available = True
requirements_query = select(ScopeEquipmentPart).where(
ScopeEquipmentPart.assetnum == equipment.assetnum
)
requirements = await db_session.execute(requirements_query)
requirements = requirements.scalars().all()
# def get_overhaul_cost_by_time_chart(
# overhaul_cost: float, days: int, numEquipments: int, decay_base: float = 1.01
# ) -> np.ndarray:
# if overhaul_cost < 0:
# raise ValueError("Overhaul cost cannot be negative")
# if days <= 0:
# raise ValueError("Days must be positive")
# hours = days * 24
# rate = np.arange(1, hours + 1)
for requirement in requirements:
sparepart_id = requirement.sparepart_id
quantity_needed = requirement.required_stock
# cost_per_equipment = overhaul_cost / numEquipments
if sparepart_id not in self.spare_parts_db:
raise Exception(f"Spare part {sparepart_id} not found in database")
# results = cost_per_equipment - ((cost_per_equipment / hours) * rate)
spare_part = self.spare_parts_db[sparepart_id]
available_stock = self.calculate_stock_at_date(sparepart_id, overhaul_date)
# return results
if available_stock < quantity_needed:
# Need to procure additional stock
shortage = quantity_needed - available_stock
procurement_cost = {
"sparepart_id": sparepart_id,
"sparepart_name": spare_part.name,
"quantity": shortage,
"cost_per_unit": spare_part.cost_per_stock,
"total_cost": shortage * spare_part.cost_per_stock,
"description": f"Insufficient projected stock for {spare_part.name} on {overhaul_date} (need: {quantity_needed}, projected: {available_stock})"
}
procurement_costs.append(procurement_cost)
all_available = False
else:
spare_part.stock -= quantity_needed
return all_available, procurement_costs
class OverhaulCalculator:
"""Main calculator for overhaul cost optimization"""
def __init__(
self,
reliability_service: ReliabilityService,
spare_parts_service: SparePartsService
):
self.reliability_service = reliability_service
self.spare_parts_service = spare_parts_service
async def simulate_equipment_overhaul(
self,
db_session: DbSession,
equipment,
preventive_cost: float,
predicted_failures: Dict[datetime.date, int],
interval_months: int,
forced_outage_hours: float,
start_date: datetime.date,
total_months: int = 24
):
"""
Simulate overhaul strategy for specific equipment including spare parts costs
"""
total_preventive_cost = 0
total_corrective_cost = 0
total_procurement_cost = 0
all_procurement_details = []
months_since_overhaul = 0
# Convert failures dict to month-indexed dict
failures_by_month = {
i: val for i, (date, val) in enumerate(sorted(predicted_failures.items()))
}
cost_per_failure = equipment.material_cost
# def get_overhaul_cost_by_time_chart(overhaul_cost: float, days: int, numEquipments: int, decay_base: float = 1.1) -> np.ndarray:
# if overhaul_cost < 0:
# raise ValueError("Overhaul cost cannot be negative")
# if days <= 0:
# raise ValueError("Days must be positive")
# Simulate for the total period
for month in range(total_months):
# Calculate current date
current_date = self._add_months_to_date(start_date, month)
# exponents = np.arange(0, days)
# cost_per_equipment = overhaul_cost / numEquipments
# Check if it's time for overhaul
if months_since_overhaul >= interval_months:
# Perform preventive overhaul
total_preventive_cost += preventive_cost
# # Introduce randomness by multiplying with a random factor
# random_factors = np.random.normal(1.0, 0.1, numEquipments) # Mean 1.0, Std Dev 0.1
# results = np.array([cost_per_equipment * factor / (decay_base ** exponents) for factor in random_factors])
# Check spare parts availability and calculate procurement costs
parts_available, procurement_costs = await self.spare_parts_service.check_spare_parts_availability(
db_session,
equipment,
current_date
)
# results = np.where(np.isfinite(results), results, 0)
# return results
# Add procurement costs if parts are not available
if not parts_available:
month_procurement_cost = sum(pc["total_cost"] for pc in procurement_costs)
total_procurement_cost += month_procurement_cost
all_procurement_details.extend(procurement_costs)
# async def get_corrective_cost_time_chart(
# material_cost: float,
# service_cost: float,
# location_tag: str,
# token,
# start_date: datetime.datetime,
# end_date: datetime.datetime
# ) -> Tuple[np.ndarray, np.ndarray]:
# days_difference = (end_date - start_date).days
months_since_overhaul = 0
# url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
# Calculate corrective costs
if months_since_overhaul == 0:
expected_failures = 0 # No failures immediately after overhaul
else:
expected_failures = failures_by_month.get(months_since_overhaul, 0)
equivalent_force_derated_hours = 0 # Can be enhanced based on requirements
failure_cost = (
(expected_failures * cost_per_failure) +
((forced_outage_hours + equivalent_force_derated_hours) * equipment.service_cost)
)
total_corrective_cost += failure_cost
months_since_overhaul += 1
# Calculate monthly averages
monthly_preventive_cost = total_preventive_cost / total_months
monthly_corrective_cost = total_corrective_cost / total_months
monthly_procurement_cost = total_procurement_cost / total_months
monthly_total_cost = monthly_preventive_cost + monthly_corrective_cost + monthly_procurement_cost
return {
"interval_months":interval_months,
"preventive_cost":monthly_preventive_cost,
"corrective_cost":monthly_corrective_cost,
"procurement_cost":monthly_procurement_cost,
"total_cost":monthly_total_cost,
"procurement_details":all_procurement_details
}
async def find_optimal_overhaul_interval(
self,
db_session: DbSession,
equipment,
preventive_cost: float,
predicted_failures: Dict[datetime.date, int],
forced_outage_hours: float,
start_date: datetime.date,
max_interval: int = 24
):
"""
Find optimal overhaul interval by testing different intervals
"""
all_results = []
for interval in range(1, max_interval + 1):
result = await self.simulate_equipment_overhaul(
db_session=db_session,
equipment=equipment,
preventive_cost=preventive_cost,
predicted_failures=predicted_failures,
interval_months=interval,
forced_outage_hours=forced_outage_hours,
start_date=start_date,
total_months=max_interval
)
all_results.append(result)
# Find optimal result (minimum total cost)
optimal_result = min(all_results, key=lambda x: x["total_cost"])
return optimal_result, all_results
async def calculate_fleet_optimization(
self,
db_session: DbSession,
equipments: list,
overhaul_cost: float,
start_date: datetime.date,
end_date: datetime.date,
calculation,
token: str
) -> Dict:
"""
Calculate optimization for entire fleet of equipment
"""
max_interval = self._get_months_between(start_date, end_date)
preventive_cost_per_equipment = overhaul_cost / len(equipments)
fleet_results = []
total_corrective_costs = np.zeros(max_interval)
total_preventive_costs = np.zeros(max_interval)
total_procurement_costs = np.zeros(max_interval)
total_failures = np.zeros(max_interval)
for equipment in equipments:
# Get reliability data
predicted_failures = await self.reliability_service.get_number_of_failures(
location_tag=equipment.equipment.location_tag,
start_date=start_date,
end_date=end_date,
token=token
)
forced_outage_hours = await self.reliability_service.get_equipment_foh(
location_tag=equipment.equipment.location_tag,
token=token
)
# Find optimal interval for this equipment
optimal_result, all_results = await self.find_optimal_overhaul_interval(
db_session=db_session,
equipment=equipment,
preventive_cost=preventive_cost_per_equipment,
predicted_failures=predicted_failures,
forced_outage_hours=forced_outage_hours,
start_date=start_date,
max_interval=max_interval
)
# try:
# response = requests.get(
# url,
# headers={
# "Content-Type": "application/json",
# "Authorization": f"Bearer {token}",
# },
# )
# data = response.json()
# latest_num = data["data"][-1]["num_fail"]
# Aggregate costs
corrective_costs = [r["corrective_cost"] for r in all_results]
preventive_costs = [r["preventive_cost"] for r in all_results]
procurement_costs = [r["procurement_cost"] for r in all_results]
failures = list(predicted_failures.values())
# if not latest_num:
# latest_num = 1
# # Create a complete date range for 2025
# # start_date = datetime.datetime(2025, 1, 1)
# # date_range = [start_date + datetime.timedelta(days=x) for x in range(days_difference)]
fleet_results.append(
CalculationEquipmentResult(
corrective_costs=corrective_costs,
overhaul_costs=preventive_costs,
procurement_costs=procurement_costs,
daily_failures=failures,
assetnum=equipment.assetnum,
material_cost=equipment.material_cost,
service_cost=equipment.service_cost,
optimum_day=optimal_result["interval_months"],
calculation_data_id=calculation.id,
master_equipment=equipment.equipment,
)
)
# # Create a dictionary of existing data
# data_dict = {
# datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"]
# for item in data["data"]
# }
total_corrective_costs += np.array(corrective_costs)
total_preventive_costs += np.array(preventive_costs)
total_procurement_costs += np.array(procurement_costs)
# Calculate fleet optimal interval
total_costs = total_corrective_costs + total_preventive_costs + total_procurement_costs
fleet_optimal_index = np.argmin(total_costs)
db_session.add_all(fleet_results)
await db_session.commit()
return {
'id': calculation.id,
'fleet_results': fleet_results,
'fleet_optimal_interval': fleet_optimal_index + 1,
'fleet_optimal_cost': total_costs[fleet_optimal_index],
'total_corrective_costs': total_corrective_costs.tolist(),
'total_preventive_costs': total_preventive_costs.tolist(),
'total_procurement_costs': total_procurement_costs.tolist(),
}
# # Initialize all months in the range with 0
# monthly_data = {}
# current_date = start_date.replace(day=1)
# while current_date <= end_date:
# monthly_data[current_date] = 0
# # Move to next month
# if current_date.month == 12:
# current_date = datetime.datetime(current_date.year + 1, 1, 1)
# else:
# current_date = datetime.datetime(current_date.year, current_date.month + 1, 1)
def _add_months_to_date(self, start_date: datetime.date, months: int) -> datetime.date:
"""Helper method to add months to a date"""
year = start_date.year
month = start_date.month + months
# # Get the last day's value for each month
# for date in data_dict.keys():
# month_key = datetime.datetime(date.year, date.month, 1)
# if month_key in monthly_data and data_dict[date] is not None:
# # Update only if the value is higher (to get the last day's value)
# monthly_data[month_key] = max(monthly_data[month_key], data_dict[date])
while month > 12:
year += 1
month -= 12
# # Convert to list maintaining chronological order
# complete_data = []
# for month in sorted(monthly_data.keys()):
# complete_data.append(monthly_data[month])
return datetime.date(year, month, start_date.day)
# # Convert to numpy array
# monthly_failure = np.array(complete_data)
def _get_months_between(self, start_date: datetime.date, end_date: datetime.date) -> int:
"""Calculate number of months between two dates"""
return (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month)
# # Calculate corrective costs
# cost_per_failure = (material_cost + service_cost) / latest_num
# if cost_per_failure == 0:
# raise ValueError("Cost per failure cannot be zero")
async def run_simulation(*, db_session: DbSession, calculation: CalculationData, token: str):
equipments = await get_all_by_session_id(
db_session=db_session, overhaul_session_id=calculation.overhaul_session_id
)
# corrective_costs = monthly_failure * cost_per_failure
scope = await get_scope(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)
prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope)
# return corrective_costs, monthly_failure
calculation_data = await get_calculation_data_by_id(
db_session=db_session, calculation_id=calculation.id
)
# except Exception as e:
# print(f"Error fetching or processing data: {str(e)}")
# raise
sparepars_query = await db_session.execute(
select(MasterSparePart))
spareparts = {
sparepart.id: {
'data': sparepart,
'stock': sparepart.stock
} for sparepart in sparepars_query.scalars().all()
}
reliability_service = ReliabilityService()
spare_parts_service = SparePartsService(spareparts)
optimum_calculator_service = OverhaulCalculator(reliability_service, spare_parts_service)
# Set the date range for the calculation
if prev_oh_scope:
# Start date is the day after the previous scope's end date
start_date = datetime.datetime.combine(prev_oh_scope.end_date + datetime.timedelta(days=1), datetime.time.min)
# End date is the start date of the current scope
end_date = datetime.datetime.combine(scope.start_date, datetime.time.min)
else:
# If there's no previous scope, use the start and end dates from the current scope
start_date = datetime.datetime.combine(scope.start_date, datetime.time.min)
end_date = datetime.datetime.combine(scope.end_date, datetime.time.min)
results = await optimum_calculator_service.calculate_fleet_optimization(
db_session=db_session,
equipments=equipments,
start_date=start_date,
end_date=end_date,
overhaul_cost=calculation_data.parameter.overhaul_cost,
calculation=calculation,
token=token
)
return results
@ -477,32 +866,6 @@ def get_overhaul_cost_by_time_chart(
return results
# def get_corrective_cost_time_chart(material_cost: float, service_cost: float, days: int, numEquipments: int) -> Tuple[np.ndarray, np.ndarray]:
# day_points = np.arange(0, days)
# # Parameters for failure rate
# base_rate = 0.04 # Base failure rate per day
# acceleration = 0.7 # How quickly failure rate increases
# grace_period = 49 # Days before failures start increasing significantly
# # Calculate daily failure rate using sigmoid function
# daily_failure_rate = base_rate / (1 + np.exp(-acceleration * (day_points - grace_period)/days))
# # Introduce randomness in the failure rate
# random_noise = np.random.normal(0.0, 0.05, (numEquipments, days)) # Mean 0.0, Std Dev 0.05
# daily_failure_rate = daily_failure_rate + random_noise
# daily_failure_rate = np.clip(daily_failure_rate, 0, None) # Ensure failure rate is non-negative
# # Calculate cumulative failures
# failure_counts = np.cumsum(daily_failure_rate)
# # Calculate corrective costs based on cumulative failures and combined costs
# cost_per_failure = material_cost + service_cost
# corrective_costs = failure_counts * cost_per_failure
# return corrective_costs, daily_failure_rate
async def create_param_and_data(
*,
db_session: DbSession,
@ -530,8 +893,6 @@ async def create_param_and_data(
async def get_calculation_result(db_session: DbSession, calculation_id: str):
scope_calculation = await get_calculation_data_by_id(
db_session=db_session, calculation_id=calculation_id
)
@ -570,6 +931,7 @@ async def get_calculation_result(db_session: DbSession, calculation_id: str):
result = {
"overhaul_cost": 0,
"corrective_cost": 0,
"procurement_cost": 0,
"num_failures": 0,
"day": i + 1,
}
@ -581,6 +943,7 @@ async def get_calculation_result(db_session: DbSession, calculation_id: str):
continue
result["corrective_cost"] += float(eq.corrective_costs[i])
result["overhaul_cost"] += float(eq.overhaul_costs[i])
result["procurement_cost"] += float(eq.procurement_costs[i])
result["num_failures"] += int(eq.daily_failures[i])
calculation_results.append(CalculationResultsRead(**result))
@ -625,65 +988,6 @@ async def get_calculation_by_assetnum(
return result.scalar()
# async def create_calculation_result_service(db_session: DbSession, calculation_id: UUID, costPerFailure: Optional[float] = None):
# days = 360
# calculation = await get_calculation_data_by_id(db_session=db_session, calculation_id=calculation_id)
# # reference = await get_by_assetnum(db_session=db_session, assetnum=calculation.reference_id) if calculation.overhaul_reference_type == OverhaulReferenceType.ASSET else await get(db_session=db_session, scope_id=calculation.reference_id)
# # Multiple Eequipment
# equipments_scope = get_all_by_session_id(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)
# # Parameter
# overhaulCost = calculation.parameter.overhaul_cost
# costPerFailure = costPerFailure if costPerFailure else calculation.parameter.avg_failure_cost
# overhaul_cost_points = get_overhaul_cost_by_time_chart(
# overhaulCost, days=days)
# for eq in equipments_scope:
# corrective_cost_points, dailyNumberOfFailure = get_corrective_cost_time_chart(
# costPerFailure, days)
# total_cost = overhaul_cost_points + corrective_cost_points
# optimumOHIndex = np.argmin(total_cost)
# numbersOfFailure = sum(dailyNumberOfFailure[:optimumOHIndex])
# optimum = {
# "overhaulCost": float(overhaul_cost_points[optimumOHIndex]),
# "correctiveCost": float(corrective_cost_points[optimumOHIndex]),
# "numOfFailures": int(numbersOfFailure),
# "days": int(optimumOHIndex+1)
# }
# calculation_results = []
# for i in range(days):
# result = CalculationResult(
# parameter_id=calculation.parameter_id,
# calculation_data_id=calculation.id,
# day=(i + 1),
# corrective_cost=float(corrective_cost_points[i]),
# overhaul_cost=float(overhaul_cost_points[i]),
# num_failures=int(dailyNumberOfFailure[i]),
# )
# calculation_results.append(result)
# calculation.optimum_oh_day = int(optimumOHIndex+1)
# db_session.add_all(calculation_results)
# await db_session.commit()
# return CalculationTimeConstrainsRead(
# id=calculation.id,
# name=reference.scope_name if hasattr(
# reference, "scope_name") else reference.master_equipment.name,
# reference=reference.assetnum if hasattr(
# reference, "assetnum") else reference.scope_name,
# results=calculation_results,
# optimumOh=optimum
# )
async def get_number_of_failures(location_tag, start_date, end_date, token, max_interval=24):
url_prediction = (
f"http://192.168.1.82:8000/reliability/main/number-of-failures/"
@ -719,8 +1023,12 @@ async def get_number_of_failures(location_tag, start_date, end_date, token, max_
try:
date = datetime.datetime.strptime(item["date"], "%d %b %Y")
last_day = calendar.monthrange(date.year, date.month)[1]
value = item.get("num_fail", 0)
if date.day == last_day:
results[date.date()] = round(item.get("num_fail", 0))
if date.month == start_date.month and date.year == start_date.year:
results[date.date()] = 0
else:
results[date.date()] = 0 if value <= 0 else int(value)
except (KeyError, ValueError):
continue # skip invalid items
@ -744,8 +1052,35 @@ async def get_number_of_failures(location_tag, start_date, end_date, token, max_
return results
async def get_equipment_foh(
location_tag: str,
token: str
):
url_mdt = (
f"http://192.168.1.82:8000/reliability/asset/mdt/{location_tag}"
)
try:
response = requests.get(
url_mdt,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
timeout=10
)
response.raise_for_status()
result = response.json()
except (requests.RequestException, ValueError) as e:
raise Exception(f"Failed to fetch or parse mdt data: {e}")
mdt_data = result["data"]["hours"]
return mdt_data
# Function to simulate overhaul strategy for a single equipment
def simulate_equipment_overhaul(equipment, preventive_cost,predicted_num_failures, interval_months ,total_months=24):
def simulate_equipment_overhaul(equipment, preventive_cost,predicted_num_failures, interval_months, forced_outage_hours_value ,total_months=24):
"""
Simulates overhaul strategy for a specific piece of equipment
and returns the associated costs.
@ -756,7 +1091,7 @@ def simulate_equipment_overhaul(equipment, preventive_cost,predicted_num_failure
failures_by_month = {i: val for i, (date, val) in enumerate(sorted(predicted_num_failures.items()))}
cost_per_failure = equipment.material_cost + equipment.service_cost
cost_per_failure = equipment.material_cost
# Simulate for the total period
for month in range(total_months):
@ -769,13 +1104,15 @@ def simulate_equipment_overhaul(equipment, preventive_cost,predicted_num_failure
if months_since_overhaul == 0:
# Calculate failures for this month based on time since last overhaul
expected_failures = 0
failure_cost = expected_failures * cost_per_failure
equivalent_force_derated_hours = 0
failure_cost = (expected_failures * cost_per_failure) + ((forced_outage_hours_value + equivalent_force_derated_hours) * equipment.service_cost)
total_corrective_cost += failure_cost
else:
# Calculate failures for this month based on time since last overhaul
expected_failures = failures_by_month.get(months_since_overhaul, 0)
failure_cost = expected_failures * cost_per_failure
equivalent_force_derated_hours = 0
failure_cost = (expected_failures * cost_per_failure) + ((forced_outage_hours_value + equivalent_force_derated_hours) * equipment.service_cost)
total_corrective_cost += failure_cost
# Increment time since overhaul
@ -794,7 +1131,6 @@ def simulate_equipment_overhaul(equipment, preventive_cost,predicted_num_failure
}
async def create_calculation_result_service(
db_session: DbSession, calculation: CalculationData, token: str
) -> CalculationTimeConstrainsRead:
@ -849,8 +1185,13 @@ async def create_calculation_result_service(
token=token
)
foh_value = await get_equipment_foh(
location_tag=eq.equipment.location_tag,
token=token
)
for interval in range(1, max_interval+1):
result = simulate_equipment_overhaul(eq, overhaul_cost, predicted_num_failures, interval,total_months=max_interval)
result = simulate_equipment_overhaul(eq, overhaul_cost, predicted_num_failures, interval, foh_value, total_months=max_interval)
corrective_costs.append(result['corrective_cost'])
overhaul_costs.append(result['preventive_cost'])
total.append(result['total_cost'])
@ -882,48 +1223,6 @@ async def create_calculation_result_service(
total_costs += np.array(total_costs)
# corrective_costs, daily_failures = await get_corrective_cost_time_chart(
# material_cost=eq.material_cost,
# service_cost=eq.service_cost,
# token=token,
# location_tag=eq.equipment.location_tag,
# start_date=start_date,
# end_date=end_date
# )
# overhaul_cost_points = get_overhaul_cost_by_time_chart(
# calculation_data.parameter.overhaul_cost,
# months_num=months_num,
# numEquipments=len(equipments),
# )
# # Calculate individual equipment optimum points
# equipment_total_cost = corrective_costs + overhaul_cost_points
# equipment_optimum_index = np.argmin(equipment_total_cost)
# equipment_failure_sum = sum(daily_failures[:equipment_optimum_index])
# equipment_results.append(
# CalculationEquipmentResult(
# corrective_costs=corrective_costs.tolist(),
# overhaul_costs=overhaul_cost_points.tolist(),
# daily_failures=daily_failures.tolist(),
# assetnum=eq.assetnum,
# material_cost=eq.material_cost,
# service_cost=eq.service_cost,
# optimum_day=int(equipment_optimum_index + 1),
# calculation_data_id=calculation.id,
# master_equipment=eq.equipment,
# )
# )
# # Add to totals
# total_corrective_costs += corrective_costs
# total_overhaul_cost += overhaul_cost_points
# total_daily_failures += daily_failures
db_session.add_all(results)
total_costs_point = total_corrective_costs + total_overhaul_costs
@ -939,21 +1238,6 @@ async def create_calculation_result_service(
num_failures=int(numbers_of_failure),
days=int(optimum_oh_index + 1),
)
# # Create calculation results for database
# calculation_results = []
# for i in range(days):
# result = CalculationResult(
# parameter_id=calculation.parameter_id,
# calculation_data_id=calculation.id,
# day=(i + 1),
# corrective_cost=float(total_corrective_costs[i]),
# overhaul_cost=float(overhaul_cost_points[i]),
# num_failures=int(total_daily_failures[i]),
# )
# calculation_results.append(result)
# Update calculation with optimum day
calculation.optimum_oh_day = optimum.days
await db_session.commit()

@ -6,4 +6,4 @@ def get_months_between(start_date: datetime.datetime, end_date: datetime.datetim
"""
months = (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month)
# Add 1 to include both start and end months
return months + 1
return months

@ -25,6 +25,14 @@ class OverhaulActivity(Base, DefaultMixin):
uselist=False, # Add this if it's a one-to-one relationship
)
# sparepart_equipments = relationship(
# "ScopeEquipmentPart",
# lazy="select", # or "joined", "subquery", "dynamic" depending on your needs
# primaryjoin="OverhaulActivity.assetnum == foreign(ScopeEquipmentPart.assetnum)",
# uselist=True
# )
overhaul_scope = relationship(
"OverhaulScope",
lazy="raise",

@ -8,14 +8,14 @@ from src.workorder.model import MasterWorkOrder
class ScopeEquipmentPart(Base, DefaultMixin):
__tablename__ = "oh_tr_scope_equipment_part"
__tablename__ = "oh_ms_scope_equipment_part"
required_stock = Column(Float, nullable=False, default=0)
sparepart_id = Column(UUID(as_uuid=True), ForeignKey("oh_ms_sparepart.id"), nullable=False)
assetnum = Column(String, nullable=False)
stock = Column(Integer, nullable=False, default=0)
location_tag = Column(String, nullable=False)
master_equipments = relationship(
"MasterEquipment",
lazy="raise",
primaryjoin="and_(ScopeEquipmentPart.assetnum == foreign(MasterEquipment.assetnum))",
uselist=False,
part = relationship(
"MasterSparePart",
lazy="selectin",
)

@ -0,0 +1,35 @@
from sqlalchemy import UUID, Column, Float, ForeignKey, Integer, String, Date
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin
from src.workorder.model import MasterWorkOrder
class MasterSparePart(Base, DefaultMixin):
__tablename__ = "oh_ms_sparepart"
assetnum = Column(String, nullable=False)
location_tag = Column(String, nullable=False)
stock = Column(Integer, nullable=False, default=0)
name = Column(String, nullable=False)
cost_per_stock = Column(Float, nullable=False)
unit = Column(String, nullable=False)
sparepart_procurements = relationship("MasterSparepartProcurement", lazy="selectin")
class MasterSparepartProcurement(Base, DefaultMixin):
__tablename__ = "oh_ms_sparepart_procurement"
sparepart_id = Column(
UUID(as_uuid=True),
ForeignKey("oh_ms_sparepart.id", ondelete="cascade"),
nullable=False,
)
quantity = Column(Integer, nullable=False)
status = Column(String, nullable=False)
eta_requisition = Column(Date, nullable=False)
eta_ordered = Column(Date, nullable=True)
eta_received = Column(Date, nullable=True)

@ -0,0 +1,86 @@
from fastapi import APIRouter, HTTPException, Query, status
from src.database.service import (CommonParameters, DbSession,
search_filter_sort_paginate)
from src.models import StandardResponse
from .schema import (ActivityMaster, ActivityMasterCreate,
ActivityMasterPagination)
from .service import create, delete, get, get_all, update
router = APIRouter()
@router.get("", response_model=StandardResponse[ActivityMasterPagination])
async def get_activities(common: CommonParameters):
"""Get all scope activity pagination."""
# return
data = await get_all(common=common)
return StandardResponse(
data=data,
message="Data retrieved successfully",
)
@router.post("", response_model=StandardResponse[ActivityMasterCreate])
async def create_activity(db_session: DbSession, activity_in: ActivityMasterCreate):
activity = await create(db_session=db_session, activty_in=activity_in)
return StandardResponse(data=activity, message="Data created successfully")
@router.get(
"/{scope_equipment_activity_id}", response_model=StandardResponse[ActivityMaster]
)
async def get_activity(db_session: DbSession, activity_id: str):
activity = await get(db_session=db_session, activity_id=activity_id)
if not activity:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(data=activity, message="Data retrieved successfully")
@router.put(
"/{scope_equipment_activity_id}", response_model=StandardResponse[ActivityMaster]
)
async def update_scope(
db_session: DbSession, activity_in: ActivityMasterCreate, activity_id
):
activity = await get(db_session=db_session, activity_id=activity_id)
if not activity:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(
data=await update(
db_session=db_session, activity=activity, activity_in=activity_in
),
message="Data updated successfully",
)
@router.delete(
"/{scope_equipment_activity_id}", response_model=StandardResponse[ActivityMaster]
)
async def delete_scope(db_session: DbSession, activity_id: str):
activity = await get(db_session=db_session, activity_id=activity_id)
if not activity:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=[{"msg": "A data with this id does not exist."}],
)
await delete(db_session=db_session, activity_id=activity_id)
return StandardResponse(message="Data deleted successfully", data=activity)

@ -0,0 +1,75 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import UUID
from pydantic import BaseModel, Field
from src.models import DefultBase, Pagination
class ActivityMaster(DefultBase):
pass
class ActivityMasterDetail(DefultBase):
name: str
class ActivityMasterCreate(ActivityMaster):
description: str
class ActivityMasterTasks(DefultBase):
description: str
oh_type: str
class ActivityMasterRead(ActivityMaster):
id: UUID
workscope: str
system: str
subsystem: str
tasks: List[ActivityMasterTasks]
class ActivityMasterPagination(Pagination):
items: List[ActivityMasterRead] = []
# {
# "overview": {
# "totalEquipment": 30,
# "nextSchedule": {
# "date": "2025-01-12",
# "Overhaul": "B",
# "equipmentCount": 30
# }
# },
# "criticalParts": [
# "Boiler feed pump",
# "Boiler reheater system",
# "Drum Level (Right) Root Valve A",
# "BCP A Discharge Valve",
# "BFPT A EXH Press HI Root VLV"
# ],
# "schedules": [
# {
# "date": "2025-01-12",
# "Overhaul": "B",
# "status": "upcoming"
# }
# // ... other scheduled overhauls
# ],
# "systemComponents": {
# "boiler": {
# "status": "operational",
# "lastOverhaul": "2024-06-15"
# },
# "turbine": {
# "hpt": { "status": "operational" },
# "ipt": { "status": "operational" },
# "lpt": { "status": "operational" }
# }
# // ... other major components
# }
# }

@ -0,0 +1,60 @@
from typing import Optional
from sqlalchemy import Delete, Select
from sqlalchemy.orm import joinedload, selectinload
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from .model import MasterActivity
from .schema import ActivityMaster, ActivityMasterCreate
async def get(*, db_session: DbSession, activity_id: str) -> Optional[ActivityMaster]:
"""Returns a document based on the given document id."""
result = await db_session.get(MasterActivity, activity_id)
return result
async def get_all(common: CommonParameters):
query = Select(MasterActivity)
results = await search_filter_sort_paginate(model=query, **common)
return results
async def create(*, db_session: DbSession, activty_in: ActivityMasterCreate):
activity = MasterActivity(**activty_in.model_dump())
db_session.add(activity)
await db_session.commit()
return activity
async def update(
*,
db_session: DbSession,
activity: MasterActivity,
activity_in: ActivityMasterCreate
):
"""Updates a document."""
data = activity_in.model_dump()
update_data = activity_in.model_dump(exclude_defaults=True)
for field in data:
if field in update_data:
setattr(activity, field, update_data[field])
await db_session.commit()
return activity
async def delete(*, db_session: DbSession, activity_id: str):
"""Deletes a document."""
activity = await db_session.get(MasterActivity, activity_id)
await db_session.delete(activity)
await db_session.commit()
Loading…
Cancel
Save