feature/reliability_stat
Cizz22 4 months ago
parent 0dc09ab73c
commit 3fa022f08f

@ -2,6 +2,9 @@ import datetime
from typing import Coroutine, List, Optional, Tuple,Dict from typing import Coroutine, List, Optional, Tuple,Dict
from uuid import UUID from uuid import UUID
import calendar import calendar
import httpx
from src.calculation_target_reliability.service import RBD_SERVICE_API
from src.config import REALIBILITY_SERVICE_API from src.config import REALIBILITY_SERVICE_API
import numpy as np import numpy as np
import requests import requests
@ -24,7 +27,7 @@ from .schema import (CalculationResultsRead,
CalculationTimeConstrainsParametersCreate, CalculationTimeConstrainsParametersCreate,
CalculationTimeConstrainsRead, OptimumResult) CalculationTimeConstrainsRead, OptimumResult)
from .utils import get_months_between from .utils import calculate_failures_per_month, create_time_series_data, get_months_between
from src.equipment_sparepart.model import ScopeEquipmentPart from src.equipment_sparepart.model import ScopeEquipmentPart
import copy import copy
import random import random
@ -38,7 +41,9 @@ import aiohttp
from datetime import datetime, date from datetime import datetime, date
import asyncio import asyncio
import json import json
from src.utils import save_to_pastebin # from src.utils import save_to_pastebin
client = httpx.AsyncClient(timeout=300.0)
# class ReliabilityService: # class ReliabilityService:
# """Service class for handling reliability API calls""" # """Service class for handling reliability API calls"""
@ -1033,69 +1038,37 @@ class OptimumCostModel:
return fr return fr
def _calculate_costs_vectorized(self, failure_rate: Dict[datetime, float], def _calculate_costs_vectorized(self, failures_prediction: List[Dict],
preventive_cost: float, failure_replacement_cost: float) -> List[Dict]: preventive_cost: float, failure_replacement_cost: float) -> List[Dict]:
valid_data = [(date, rel) for date, rel in failure_rate.items() if rel is not None]
if not valid_data: # Extract data from failures_prediction
return [] months = [item['month'] for item in failures_prediction]
failure_counts = [item['failures'] for item in failures_prediction]
# Sort by date (past to future)
valid_data.sort(key=lambda x: x[0]) # Calculate failure costs for each month
dates, reliability_values = zip(*valid_data) failure_costs = [count * failure_replacement_cost for count in failure_counts]
dates = np.array(dates)
reliability_values = np.array(reliability_values) # Calculate preventive costs (distributed equally across months)
num_months = np.array([i+1 for i in range(len(failures_prediction))])
# Calculate days from last OH preventive_costs = preventive_cost/num_months
days_from_last_oh = np.array([(date - self.last_oh_date).days for date in dates])
date_point = np.array([i+1 for i in range(len(dates))])
# Calculate failure probabilities
failure_probs = 1 - reliability_values
# Calculate expected operating times using trapezoidal integration
# This is the denominator: ∫₀ᵀ R(t) dt for each T
# expected_operating_times = np.zeros_like(days_from_last_oh, dtype=float)
# for i in range(len(days_from_last_oh)):
# # Time points from 0 to current point T
# time_points = [(d - self.last_oh_date).days for d in dates[:i+1]]
# # Reliability values (assuming reliability at time 0 is 1.0)
# rel_values = reliability_values[:i+1]
# # Calculate expected operating time up to this point
# expected_operating_times[i] = np.trapz(rel_values, time_points)
# Calculate costs according to the formula
# Failure cost = (1-R(T)) × IDRu / ∫₀ᵀ R(t) dt
# failure_costs = (failure_rate * failure_replacement_cost * expected_operating_times)
# # Preventive cost = R(T) × IDRp / ∫₀ᵀ R(t) dt
# preventive_costs = (reliability_values * preventive_cost) / expected_operating_times
failure_costs = reliability_values * failure_replacement_cost * days_from_last_oh
preventive_costs = preventive_cost / date_point
# Total cost = Failure cost + Preventive cost # Total cost = Failure cost + Preventive cost
total_costs = failure_costs + preventive_costs total_costs = [failure_costs[i] + preventive_costs[i] for i in range(len(num_months))]
# Convert back to list of dictionaries # Convert back to list of dictionaries
results = [] results = []
for i in range(len(dates)): for i in range(len(failures_prediction)):
results.append({ results.append({
'date': dates[i], 'month': months[i],
'days_from_last_oh': days_from_last_oh[i], 'number_of_failure': failure_counts[i],
'failure_rate': reliability_values[i],
'failure_probability': failure_probs[i],
'number_of_failure': round(reliability_values[i] * days_from_last_oh[i]),
'expected_operating_time': days_from_last_oh[i],
'failure_replacement_cost': failure_costs[i], 'failure_replacement_cost': failure_costs[i],
'preventive_replacement_cost': preventive_costs[i], 'preventive_replacement_cost': preventive_costs[i],
'total_cost': total_costs[i], 'total_cost': total_costs[i],
'procurement_cost': 0, 'procurement_cost': 0,
'procurement_details': [] 'procurement_details': []
}) })
return results return results
def _find_optimal_timing_vectorized(self, results: List[Dict]) -> Optional[Dict]: def _find_optimal_timing_vectorized(self, results: List[Dict]) -> Optional[Dict]:
@ -1112,14 +1085,12 @@ class OptimumCostModel:
return { return {
'optimal_index': min_idx, 'optimal_index': min_idx,
'optimal_date': optimal_result['date'], 'optimal_month': optimal_result['month'],
'days_from_last_oh': optimal_result['days_from_last_oh'],
'failure_rate': optimal_result['failure_rate'],
'number_of_failure': optimal_result['number_of_failure'], 'number_of_failure': optimal_result['number_of_failure'],
'failure_cost': optimal_result['failure_replacement_cost'], 'failure_cost': optimal_result['failure_replacement_cost'],
'preventive_cost': optimal_result['preventive_replacement_cost'], 'preventive_cost': optimal_result['preventive_replacement_cost'],
'total_cost': optimal_result['total_cost'], 'total_cost': optimal_result['total_cost'],
'procurement_cost': 0 'procurement_cost': optimal_result['procurement_cost']
} }
def _get_number_of_failure_after_last_oh(self, target_date, token, location_tag): def _get_number_of_failure_after_last_oh(self, target_date, token, location_tag):
@ -1138,6 +1109,41 @@ class OptimumCostModel:
return data['data']['value'] return data['data']['value']
async def get_failures_prediction(self, simulation_id: str, location_tag):
# calc_result_url = f"{RBD_SERVICE_API}/aeros/simulation/result/calc/{simulation_id}?nodetype=RegularNode"
plot_result_url = f"{RBD_SERVICE_API}/aeros/simulation/result/plot/{simulation_id}/{location_tag}?use_location_tag=1"
# calc_plant_result = f"{RBD_SERVICE_API}/aeros/simulation/result/calc/{simulation_id}/plant"
try:
response = requests.get(
plot_result_url,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {self.token}",
},
timeout=10
)
response.raise_for_status()
prediction_data = response.json()
except (requests.RequestException, ValueError) as e:
raise Exception(f"Failed to fetch or parse prediction data: {e}")
if not prediction_data["data"]:
results = []
for i in range(1, 33):
results.append({
"month": i,
"failures": 0
})
return results
plot_data = prediction_data['data']['timestamp_outs']
time_series = create_time_series_data(plot_data)
return calculate_failures_per_month(time_series)
async def calculate_cost_all_equipment( async def calculate_cost_all_equipment(
self, self,
@ -1160,8 +1166,7 @@ class OptimumCostModel:
print(f"Starting reliability data fetch for {len(equipments)} equipment...") print(f"Starting reliability data fetch for {len(equipments)} equipment...")
# Fetch all reliability data concurrently # Fetch all reliability data concurrently
all_reliabilities = await self._get_reliability_equipment_batch(location_tags) # all_reliabilities = await self._get_reliability_equipment_batch(location_tags)
print("Processing cost calculations...") print("Processing cost calculations...")
@ -1178,16 +1183,22 @@ class OptimumCostModel:
cost_per_failure = equipment.material_cost cost_per_failure = equipment.material_cost
overhaul_cost = equipment.overhaul_cost overhaul_cost = equipment.overhaul_cost
service_cost = equipment.service_cost service_cost = equipment.service_cost
failures_prediction = await self.get_failures_prediction(
simulation_id = "864f1558-79cc-40b1-8d1f-8e7339d9d8ce",
location_tag=location_tag
)
# Get pre-fetched reliability data # # Get pre-fetched reliability data
failure_rate = all_reliabilities.get(location_tag, {}) # failure_rate = all_reliabilities.get(location_tag, {})
# failure_rate = self._get_equipment_fr(location_tag, self.token) # # failure_rate = self._get_equipment_fr(location_tag, self.token)
# Calculate costs using vectorized operations # Calculate costs using vectorized operations
predicted_costs = self._calculate_costs_vectorized( predicted_costs = self._calculate_costs_vectorized(
preventive_cost=overhaul_cost + service_cost, preventive_cost=overhaul_cost + service_cost,
failure_replacement_cost=cost_per_failure, failure_replacement_cost=cost_per_failure,
failure_rate=failure_rate failures_prediction=failures_prediction
) )
if not predicted_costs: if not predicted_costs:

@ -7,3 +7,84 @@ def get_months_between(start_date: datetime.datetime, end_date: datetime.datetim
months = (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month) months = (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month)
# Add 1 to include both start and end months # Add 1 to include both start and end months
return months return months
def create_time_series_data(chart_data, max_hours=24096):
# Filter out data points with currentEvent = "ON_OH"
filtered_data = [data for data in chart_data if data['currentEvent'] != 'ON_OH']
# Sort filtered data by cumulative time
sorted_data = sorted(filtered_data, key=lambda x: x['cumulativeTime'])
if not sorted_data:
return []
hourly_data = []
current_state_index = 0
current_flow_rate = sorted_data[0]['flowRate']
current_eq_status = sorted_data[0]['currentEQStatus']
for hour in range(1, max_hours + 1):
# Check if we need to advance to the next state
while (current_state_index < len(sorted_data) - 1 and
hour >= int(sorted_data[current_state_index + 1]['cumulativeTime'])):
current_state_index += 1
current_flow_rate = sorted_data[current_state_index]['flowRate']
current_eq_status = sorted_data[current_state_index]['currentEQStatus']
# Add hourly data point
hourly_data.append({
'hour': hour,
'flowrate': current_flow_rate,
'currentEQStatus': current_eq_status
})
return hourly_data
def calculate_failures_per_month(hourly_data):
"""
Calculate the cumulative number of failures up to each month from hourly data.
A failure is defined as when currentEQStatus = "OoS".
Only counts the start of each failure period (transition from "Svc" to "OoS").
Args:
hourly_data: List of dicts with 'hour', 'flowrate', and 'currentEQStatus' keys
Returns:
List of dicts with 'month' and 'failures' keys (cumulative count)
"""
total_failures = 0
previous_eq_status = None
monthly_data = {}
for data_point in hourly_data:
hour = data_point['hour']
current_eq_status = data_point['currentEQStatus']
# Calculate which month this hour belongs to (1-based)
# Assuming 30 days per month = 720 hours per month
month = ((hour - 1) // 720) + 1
# Check if this is the start of a failure (transition to "OoS")
if current_eq_status == "OoS" and previous_eq_status is not None and previous_eq_status != "OoS":
total_failures += 1
# Special case: if the very first data point is a failure
elif current_eq_status == "OoS" and previous_eq_status is None:
total_failures += 1
# Store the current cumulative count for this month
monthly_data[month] = total_failures
previous_eq_status = current_eq_status
# Convert to list format
result = []
if monthly_data:
max_month = max(monthly_data.keys())
for month in range(1, max_month + 1):
result.append({
'month': month,
'failures': monthly_data.get(month, monthly_data.get(month-1, 0))
})
return result
Loading…
Cancel
Save