feature/reliability_stat
Cizz22 5 months ago
parent e069046d90
commit 8ae93be047

@ -877,7 +877,7 @@ class OptimumCostModel:
retry_delay: float = 1.0
) -> Optional[float]:
date_str = target_date.strftime('%Y-%m-%d %H:%M:%S.%f')
url = f"{self.api_base_url}/calculate/reliability/{location_tag}/{date_str}"
url = f"{self.api_base_url}/calculate/failure-rate/{location_tag}/{date_str}"
for attempt in range(max_retries + 1):
try:
@ -1055,9 +1055,29 @@ class OptimumCostModel:
return fr
def _calculate_costs_vectorized(self, reliabilities: Dict[datetime, float],
preventive_cost: float, failure_replacement_cost: float, failure_rate) -> List[Dict]:
valid_data = [(date, rel) for date, rel in reliabilities.items() if rel is not None]
def _get_failure_rate(self, location_tag: str, token: str):
failure_rate_url = f"{self.api_base_url}/asset/failure-rate/{location_tag}"
try:
response = requests.get(
failure_rate_url,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
timeout=10
)
response.raise_for_status()
result = response.json()
except (requests.RequestException, ValueError) as e:
raise Exception(f"Failed to fetch or parse mdt data: {e}")
fr = result["data"]["failure_rate"]
return fr
def _calculate_costs_vectorized(self, failure_rate: Dict[datetime, float],
preventive_cost: float, failure_replacement_cost: float) -> List[Dict]:
valid_data = [(date, rel) for date, rel in failure_rate.items() if rel is not None]
if not valid_data:
return []
@ -1069,28 +1089,33 @@ class OptimumCostModel:
reliability_values = np.array(reliability_values)
# Calculate days from last OH
days_from_last_oh = np.array([(date - self.last_oh_date).days for date in dates])
days_from_last_oh = np.array([(date - self.last_oh_date).days * 24 for date in dates])
date_point = np.array([i+1 for i in range(len(dates))])
# Calculate failure probabilities
failure_probs = 1 - reliability_values
# Calculate expected operating times using trapezoidal integration
# This is the denominator: ∫₀ᵀ R(t) dt for each T
expected_operating_times = np.zeros_like(days_from_last_oh, dtype=float)
for i in range(len(days_from_last_oh)):
# Time points from 0 to current point T
time_points = [(d - self.last_oh_date).days for d in dates[:i+1]]
# Reliability values (assuming reliability at time 0 is 1.0)
rel_values = reliability_values[:i+1]
# Calculate expected operating time up to this point
expected_operating_times[i] = np.trapz(rel_values, time_points)
# expected_operating_times = np.zeros_like(days_from_last_oh, dtype=float)
# for i in range(len(days_from_last_oh)):
# # Time points from 0 to current point T
# time_points = [(d - self.last_oh_date).days for d in dates[:i+1]]
# # Reliability values (assuming reliability at time 0 is 1.0)
# rel_values = reliability_values[:i+1]
# # Calculate expected operating time up to this point
# expected_operating_times[i] = np.trapz(rel_values, time_points)
# Calculate costs according to the formula
# Failure cost = (1-R(T)) × IDRu / ∫₀ᵀ R(t) dt
failure_costs = (failure_rate * failure_replacement_cost * expected_operating_times)
# Preventive cost = R(T) × IDRp / ∫₀ᵀ R(t) dt
preventive_costs = (reliability_values * preventive_cost) / expected_operating_times
# failure_costs = (failure_rate * failure_replacement_cost * expected_operating_times)
# # Preventive cost = R(T) × IDRp / ∫₀ᵀ R(t) dt
# preventive_costs = (reliability_values * preventive_cost) / expected_operating_times
failure_costs = reliability_values * failure_replacement_cost * days_from_last_oh
preventive_costs = preventive_cost / date_point
# Total cost = Failure cost + Preventive cost
@ -1099,14 +1124,13 @@ class OptimumCostModel:
# Convert back to list of dictionaries
results = []
for i in range(len(dates)):
if i == 0:
continue
results.append({
'date': dates[i],
'days_from_last_oh': days_from_last_oh[i],
'reliability': reliability_values[i],
'days_from_last_oh': days_from_last_oh[i] / 24,
'failure_rate': reliability_values[i],
'failure_probability': failure_probs[i],
'expected_operating_time': expected_operating_times[i],
'number_of_failure': round(reliability_values[i] * days_from_last_oh[i]),
'expected_operating_time': days_from_last_oh[i],
'failure_replacement_cost': failure_costs[i],
'preventive_replacement_cost': preventive_costs[i],
'total_cost': total_costs[i],
@ -1124,6 +1148,7 @@ class OptimumCostModel:
return None
total_costs = np.array([r['total_cost'] for r in results])
min_idx = np.argmin(total_costs)
optimal_result = results[min_idx]
@ -1131,13 +1156,31 @@ class OptimumCostModel:
'optimal_index': min_idx,
'optimal_date': optimal_result['date'],
'days_from_last_oh': optimal_result['days_from_last_oh'],
'reliability': optimal_result['reliability'],
'failure_rate': optimal_result['failure_rate'],
'number_of_failure': optimal_result['number_of_failure'],
'failure_cost': optimal_result['failure_replacement_cost'],
'preventive_cost': optimal_result['preventive_replacement_cost'],
'total_cost': optimal_result['total_cost'],
'procurement_cost': 0
}
def _get_number_of_failure_after_last_oh(self, target_date, token, location_tag):
date = target_date.strftime('%Y-%m-%d %H:%M:%S.%f')
nof = f"http://192.168.1.82:8000/reliability/calculate/failures/{location_tag}/{date}"
header = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + token
}
response = requests.get(nof, headers=header)
data = response.json()
return data['data']['value']
async def calculate_cost_all_equipment(
self,
db_session, # DbSession type
@ -1177,17 +1220,11 @@ class OptimumCostModel:
cost_per_failure = equipment.material_cost
# Get pre-fetched reliability data
reliabilities = all_reliabilities.get(location_tag, {})
failure_rate = self._get_equipment_fr(location_tag, self.token)
if not reliabilities:
self.logger.warning(f"No reliability data for equipment {location_tag}")
continue
failure_rate = all_reliabilities.get(location_tag, {})
# failure_rate = self._get_equipment_fr(location_tag, self.token)
# Calculate costs using vectorized operations
predicted_costs = self._calculate_costs_vectorized(
reliabilities=reliabilities,
preventive_cost=preventive_cost_per_equipment,
failure_replacement_cost=cost_per_failure,
failure_rate=failure_rate
@ -1204,8 +1241,8 @@ class OptimumCostModel:
preventive_costs = [r["preventive_replacement_cost"] for r in predicted_costs]
procurement_costs = [r["procurement_cost"] for r in predicted_costs]
procurement_details = [r["procurement_details"] for r in predicted_costs]
failures = [(1-r["reliability"]) for r in predicted_costs]
total_costs = [r['total_cost'] for r in predicted_costs]
failures = [r["number_of_failure"] for r in predicted_costs]
total_costs_per_equipment = [r['total_cost'] for r in predicted_costs]
# Pad arrays to max_interval length
def pad_array(arr, target_length):
@ -1217,7 +1254,7 @@ class OptimumCostModel:
preventive_costs = pad_array(preventive_costs, max_interval)
procurement_costs = pad_array(procurement_costs, max_interval)
failures = pad_array(failures, max_interval)
total_costs = pad_array(total_costs, max_interval)
total_costs_per_equipment = pad_array(total_costs_per_equipment, max_interval)
fleet_results.append(
CalculationEquipmentResult( # Assuming this class exists
@ -1238,7 +1275,7 @@ class OptimumCostModel:
total_corrective_costs += np.array(corrective_costs)
total_preventive_costs += np.array(preventive_costs)
total_procurement_costs += np.array(procurement_costs)
total_costs += np.array(total_costs)
total_costs += np.array(total_costs_per_equipment)
# Calculate fleet optimal interval
# total_costs = total_corrective_costs + total_preventive_costs + total_procurement_costs
@ -1285,8 +1322,6 @@ async def run_simulation(*, db_session: DbSession, calculation: CalculationData,
db_session=db_session, overhaul_session_id=calculation.overhaul_session_id
)
equipments = equipments[:100]
scope = await get_scope(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)

File diff suppressed because it is too large Load Diff

@ -25,6 +25,7 @@ from .model import OverhaulActivity
from .schema import (OverhaulActivityCreate, OverhaulActivityRead,
OverhaulActivityUpdate)
import json
async def get(
*, db_session: DbSession, assetnum: str, overhaul_session_id: Optional[UUID] = None
@ -42,7 +43,11 @@ async def get(
result = await db_session.execute(query)
return result.scalar()
def get_cost_per_failute():
with open('src/overhaul_activity/cost_failure.json', 'r') as f:
data = json.load(f)
return data['data']
async def get_all(
*,
@ -97,20 +102,24 @@ async def get_all(
num_equipments = len((await common['db_session'].execute(query)).scalars().all())
material_cost = get_material_cost("B", num_equipments)
data_cost = get_cost_per_failute()
equipments = await search_filter_sort_paginate(model=query, **common)
data = equipments['items']
results = []
for equipment in data:
if not equipment.master_equipment:
continue
cost = next((item for item in data_cost if item['location'] == equipment.location_tag), None)
res = OverhaulActivityRead(
id=equipment.id,
material_cost=material_cost,
material_cost=cost.get('avg_cost', 0) if cost else 0,
service_cost=200000000,
location_tag=equipment.location_tag,
equipment_name=equipment.master_equipment.name if equipment.master_equipment else None,
@ -146,11 +155,13 @@ async def get_standard_scope_by_session_id(*, db_session: DbSession, overhaul_se
material_cost = get_material_cost("B", len(eqs))
results = []
data_cost = get_cost_per_failute()
for equipment in eqs:
cost = next((item for item in data_cost if item['location'] == equipment.location_tag), None)
res = OverhaulActivityRead(
id=equipment.id,
material_cost=material_cost,
material_cost=cost.get('avg_cost', 0) if cost else 0,
service_cost=200000000,
location_tag=equipment.location_tag,
equipment_name=equipment.master_equipment.name if equipment.master_equipment else None,

Loading…
Cancel
Save