feature/reliability_stat
Cizz22 4 months ago
parent 91e97b9107
commit e83e891f8f

@ -21,7 +21,7 @@ from .service import (create_calculation_result_service, create_param_and_data,
get_calculation_by_reference_and_parameter,
get_calculation_data_by_id, get_calculation_result,
get_corrective_cost_time_chart,
get_overhaul_cost_by_time_chart, run_simulation)
get_overhaul_cost_by_time_chart, run_simulation, run_simulation_with_spareparts)
from src.database.core import CollectorDbSession
@ -97,7 +97,7 @@ async def create_calculation(
# results = await create_calculation_result_service(
# db_session=db_session, calculation=calculation_data, token=token
# )
results = await run_simulation(
results = await run_simulation_with_spareparts(
db_session=db_session, calculation=calculation_data, token=token, collector_db_session=collector_db_session
)

@ -46,12 +46,15 @@ class CalculationTimeConstrainsParametersCreate(CalculationTimeConstrainsBase):
class CalculationResultsRead(CalculationTimeConstrainsBase):
day: int
corrective_cost: float
overhaul_cost: float
corrective_cost: float
procurement_cost: float
procurement_details: Optional[Dict[str, Any]]
num_failures: float
day: int
month: int
total_cost: float
procurement_details: Dict
sparepart_summary: dict
class OptimumResult(CalculationTimeConstrainsBase):
@ -74,14 +77,44 @@ class EquipmentResult(CalculationTimeConstrainsBase):
is_included: bool
master_equipment: Optional[MasterEquipmentBase] = Field(None)
class FleetStatistics(CalculationTimeConstrainsBase):
total_equipment: int
included_equipment: int
excluded_equipment: int
equipment_with_sparepart_constraints: int
total_procurement_items: int
critical_procurement_items: int
class OptimalAnalysis(CalculationTimeConstrainsBase):
optimal_month: int
planned_month: Optional[int]
timing_recommendation: str
optimal_total_cost: float
optimal_breakdown: Dict
cost_trend: str
months_from_planned: Optional[int]
cost_savings_vs_planned: Optional[float]
sparepart_impact: Dict
class AnalysisMetadata(CalculationTimeConstrainsBase):
max_interval_months: int
last_overhaul_date: Optional[str]
next_planned_overhaul: str
calculation_type: str
total_equipment_analyzed: int
included_in_optimization: int
class CalculationTimeConstrainsRead(CalculationTimeConstrainsBase):
id: UUID
reference: UUID
scope: str
results: List[CalculationResultsRead]
optimum_oh: int
optimum_oh_month: int
equipment_results: List[EquipmentResult]
optimum_oh: Any
fleet_statistics: dict
optimal_analysis: dict
analysis_metadata: dict
class CalculationTimeConstrainsCreate(CalculationTimeConstrainsBase):

File diff suppressed because it is too large Load Diff

@ -1,4 +1,7 @@
import datetime
import json
import pandas as pd
def get_months_between(start_date: datetime.datetime, end_date: datetime.datetime) -> int:
"""
@ -34,8 +37,8 @@ def create_time_series_data(chart_data, max_hours=24096):
# Add hourly data point
hourly_data.append({
'hour': hour,
'flowrate': current_flow_rate,
'cumulativeTime': hour,
'flowRate': current_flow_rate,
'currentEQStatus': current_eq_status
})
@ -87,4 +90,193 @@ def calculate_failures_per_month(hourly_data):
'failures': monthly_data.get(month, monthly_data.get(month-1, 0))
})
return result
return result
def analyze_monthly_metrics(timestamp_outs):
"""
Analyze time series data to calculate monthly metrics:
1. Failure count per month
2. Cumulative failure count each month
3. Total out-of-service time per month
4. Average flow rate per month
"""
# Check if timestamp_outs is None or empty
if timestamp_outs is None or not timestamp_outs:
# Return empty results with zero values
return {}
# Convert to DataFrame for easier manipulation
df = pd.DataFrame(timestamp_outs)
# Check if DataFrame is empty after creation
if df.empty:
return {}
# Check if required columns exist
required_columns = ['cumulativeTime', 'currentEQStatus', 'flowRate']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
return {}
# Assuming the simulation starts from a reference date (you can modify this)
# For this example, I'll use January 1, 2024 as the start date
start_date = datetime.datetime(2025, 10, 22)
# Convert cumulative hours to actual datetime
df['datetime'] = df['cumulativeTime'].apply(
lambda x: start_date + datetime.timedelta(hours=x)
)
# Extract month-year for grouping
df['month_year'] = df['datetime'].dt.to_period('M')
# Calculate time duration for each record (difference between consecutive cumulative times)
df['duration_hours'] = df['cumulativeTime'].diff().fillna(df['cumulativeTime'].iloc[0])
# Initialize results dictionary
monthly_results = {}
# Track cumulative failures across all months
cumulative_failures = 0
cummulative_oos = 0
# Group by month-year and ensure chronological order
for month_period, group in df.groupby('month_year'):
month_str = str(month_period)
monthly_results[month_str] = {}
# 1. Count failures per month
# A failure is when currentEQStatus changes from "Svc" to "OoS"
status_changes = group['currentEQStatus'].shift() != group['currentEQStatus']
failures = ((group['currentEQStatus'] == 'OoS') & status_changes).sum()
monthly_results[month_str]['failures_count'] = int(failures)
# 2. Add failures to cumulative count
cumulative_failures += failures
monthly_results[month_str]['cumulative_failures'] = int(cumulative_failures)
# 3. Total out-of-service time per month (in hours)
oos_time = group[group['currentEQStatus'] == 'OoS']['duration_hours'].sum()
monthly_results[month_str]['total_oos_hours'] = float(oos_time)
cummulative_oos += oos_time
monthly_results[month_str]['cummulative_oos'] = float(cummulative_oos)
# 4. Average flow rate per month (weighted by duration)
# Calculate weighted average flow rate
total_flow_time = (group['flowRate'] * group['duration_hours']).sum()
total_time = group['duration_hours'].sum()
avg_flow_rate = total_flow_time / total_time if total_time > 0 else 0
monthly_results[month_str]['avg_flow_rate'] = float(avg_flow_rate)
# Additional useful metrics
monthly_results[month_str]['total_hours'] = float(total_time)
monthly_results[month_str]['service_hours'] = float(
group[group['currentEQStatus'] == 'Svc']['duration_hours'].sum()
)
monthly_results[month_str]['availability_percentage'] = float(
(monthly_results[month_str]['service_hours'] / total_time * 100) if total_time > 0 else 0
)
return monthly_results
def calculate_risk_cost_per_failure(monthly_results, birnbaum_importance, energy_price):
"""
Calculate risk cost per failure for each month based on:
1. Equipment capacity contribution to system (flowrate * birnbaum_importance * availability)
2. Capacity lost to downtime per month
3. Energy price
Parameters:
- monthly_results: Output from analyze_monthly_metrics()
- birnbaum_importance: Birnbaum importance factor for this equipment
- energy_price: Price per unit of energy/flow
Returns:
- Dictionary with monthly risk costs and array of risk costs per failure
"""
risk_costs = {}
risk_cost_array = []
for month, data in monthly_results.items():
# Extract monthly data
avg_flow_rate = data['avg_flow_rate']
availability = data['availability_percentage'] / 100 # Convert to decimal
total_oos_hours = data['total_oos_hours']
failures_count = data['failures_count']
# 1. Calculate equipment capacity contribution to system
# Capacity = avg_flowrate * birnbaum_importance * availability
equipment_capacity = avg_flow_rate * birnbaum_importance * availability
# 2. Calculate capacity lost to downtime per month
# Lost capacity = avg_flowrate * birnbaum_importance * downtime_hours
capacity_lost_to_downtime = avg_flow_rate * birnbaum_importance * total_oos_hours
# 3. Calculate total risk cost for the month
# Risk cost = capacity_lost * energy_price
monthly_risk_cost = capacity_lost_to_downtime * energy_price
# 4. Calculate risk cost per failure for this month
if failures_count > 0:
risk_cost_per_failure = monthly_risk_cost / failures_count
else:
# If no failures, set to 0 or use alternative approach
risk_cost_per_failure = 0
# Store results
risk_costs[month] = {
'equipment_capacity': equipment_capacity,
'capacity_lost_to_downtime': capacity_lost_to_downtime,
'monthly_risk_cost': monthly_risk_cost,
'failures_count': failures_count,
'risk_cost_per_failure': risk_cost_per_failure
}
# Add to array
risk_cost_array.append(risk_cost_per_failure)
return {
'monthly_details': risk_costs,
'risk_cost_per_failure_array': risk_cost_array
}
# Example usage:
def get_monthly_risk_analysis(timestamp_outs, birnbaum_importance, energy_price):
"""
Complete analysis combining monthly metrics with risk cost calculation
"""
# Get monthly metrics
monthly_metrics = analyze_monthly_metrics(timestamp_outs)
# Calculate risk costs
risk_analysis = calculate_risk_cost_per_failure(
monthly_metrics,
birnbaum_importance,
energy_price
)
# Combine results for comprehensive view
combined_results = {}
for month in monthly_metrics.keys():
combined_results[month] = {
**monthly_metrics[month],
**risk_analysis['monthly_details'][month]
}
return {
'monthly_data': combined_results,
'risk_cost_array': risk_analysis['risk_cost_per_failure_array']
}
# Usage example:
# birnbaum_importance = 0.85 # Example value
# energy_price = 100 # Example: $100 per unit
#
# results = get_monthly_risk_analysis(timestamp_outs, birnbaum_importance, energy_price)
# risk_cost_array = results['risk_cost_array']
# print("Risk cost per failure each month:", risk_cost_array)

@ -1,4 +1,5 @@
import logging
import sys
from src.config import LOG_LEVEL
from src.enums import OptimumOHEnum
@ -30,3 +31,15 @@ def configure_logging():
# sometimes the slack client can be too verbose
logging.getLogger("slack_sdk.web.base_client").setLevel(logging.CRITICAL)
def setup_logging(logger):
# Your logging configuration here
logger.setLevel(logging.DEBUG)
# Create formatter
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
# Create console handler
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)

@ -1,4 +1,13 @@
from typing import Optional
import asyncio
import logging
from datetime import datetime, timedelta, date
from typing import List, Dict, Optional, Tuple
from collections import defaultdict
from uuid import UUID
import numpy as np
from dataclasses import dataclass
from enum import Enum
from sqlalchemy import Delete, Select, text
from sqlalchemy.orm import joinedload, selectinload
@ -6,13 +15,13 @@ from sqlalchemy.orm import joinedload, selectinload
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.logging import setup_logging
from src.overhaul_scope.service import get as get_scope
from src.overhaul_scope.service import get_prev_oh
# async def get(*, db_session: DbSession, activity_id: str) -> Optional[ActivityMaster]:
# """Returns a document based on the given document id."""
# result = await db_session.get(MasterActivity, activity_id)
# return result
log = logging.getLogger(__name__)
setup_logging(logger=log)
async def get_all(db_session: DbSession):
"""
@ -34,16 +43,20 @@ async def get_all(db_session: DbSession):
h.issue_date as pr_issue_date,
h.status as pr_status,
pl.qty_ordered as pr_qty_ordered,
pl.description
pl.description,
pl.unit_cost,
pl.line_cost
FROM public.maximo_sparepart_pr_po h
JOIN public.maximo_sparepart_pr_po_line pl ON h.num = pl.num
WHERE h.type = 'PR'
AND h.issue_date IS NOT NULL
AND h.num LIKE 'K%'
ORDER BY pl.item_num, TO_DATE(h.issue_date, 'YYYY-MM-DD') DESC
ORDER BY pl.item_num, h.issue_date DESC
)
SELECT DISTINCT ON (pr.item_num)
pr.item_num,
pr.line_cost,
pr.unit_cost,
pr.description,
COALESCE(i.curbaltotal, 0) as current_balance_total,
pr.pr_number,
@ -55,7 +68,7 @@ async def get_all(db_session: DbSession):
END as po_exists,
COALESCE(po.qty_received, 0) as po_qty_received,
COALESCE(po.qty_ordered, 0) as po_qty_ordered,
COALESCE(po.estimated_arrival_date, '') as po_estimated_arrival_date
po.estimated_arrival_date as po_estimated_arrival_date
FROM latest_prs pr
LEFT JOIN public.maximo_inventory i ON pr.item_num = i.itemnum
LEFT JOIN LATERAL (
@ -83,6 +96,8 @@ async def get_all(db_session: DbSession):
spare_parts.append({
"item_num": row.item_num,
"description": row.description,
"line_cost": row.line_cost,
"unit_cost": row.unit_cost,
"current_balance_total": float(row.current_balance_total) if row.current_balance_total is not None else 0.0,
"pr_number": row.pr_number,
"pr_issue_date": row.pr_issue_date,
@ -97,35 +112,765 @@ async def get_all(db_session: DbSession):
# async def create(*, db_session: DbSession, activty_in: ActivityMasterCreate):
# activity = MasterActivity(**activty_in.model_dump())
# db_session.add(activity)
# await db_session.commit()
# return activity
class ProcurementStatus(Enum):
PLANNED = "planned"
ORDERED = "ordered"
RECEIVED = "received"
CANCELLED = "cancelled"
@dataclass
class SparepartRequirement:
"""Sparepart requirement for equipment overhaul"""
sparepart_id: str
quantity_required: int
lead_time: int
sparepart_name: str
@dataclass
class SparepartStock:
"""Current sparepart stock information"""
sparepart_id: str
sparepart_name: str
current_stock: int
unit_cost: float
location: str
@dataclass
class ProcurementRecord:
"""Purchase Order/Purchase Request record"""
po_pr_id: str
sparepart_id: str
sparepart_name: str
quantity: int
unit_cost: float
total_cost: float
order_date: date
expected_delivery_date: date
status: ProcurementStatus
class SparepartManager:
"""Manages sparepart availability and procurement for overhaul optimization"""
def __init__(self, analysis_start_date: date, analysis_window_months: int):
self.analysis_start_date = analysis_start_date
self.analysis_window_months = analysis_window_months
self.logger = log
# Storage for sparepart data
self.sparepart_stocks: Dict[str, SparepartStock] = {}
self.equipment_requirements: Dict[str, List[SparepartRequirement]] = {}
self.procurement_records: List[ProcurementRecord] = []
# Monthly projected stocks
self.projected_stocks: Dict[str, List[int]] = {}
def add_sparepart_stock(self, stock: SparepartStock):
"""Add sparepart stock information"""
self.sparepart_stocks[stock.sparepart_id] = stock
def add_equipment_requirements(self, equipment_tag: str, requirements: List[SparepartRequirement]):
"""Add sparepart requirements for equipment"""
self.equipment_requirements[equipment_tag] = requirements
def add_procurement_record(self, record: ProcurementRecord):
"""Add procurement record (PO/PR)"""
self.procurement_records.append(record)
def _calculate_monthly_deliveries(self) -> Dict[str, Dict[int, int]]:
"""Calculate expected deliveries for each sparepart by month"""
deliveries = defaultdict(lambda: defaultdict(int))
for record in self.procurement_records:
if record.status in [ProcurementStatus.ORDERED, ProcurementStatus.PLANNED]:
# Skip records with no expected delivery date (e.g., still PR stage)
if not record.expected_delivery_date:
continue
months_from_start = (
(record.expected_delivery_date.year - self.analysis_start_date.year) * 12 +
(record.expected_delivery_date.month - self.analysis_start_date.month)
)
if 0 <= months_from_start < self.analysis_window_months:
deliveries[record.sparepart_id][months_from_start] += record.quantity
return deliveries
# async def update(
# *,
# db_session: DbSession,
# activity: MasterActivity,
# activity_in: ActivityMasterCreate
# ):
# """Updates a document."""
# data = activity_in.model_dump()
# update_data = activity_in.model_dump(exclude_defaults=True)
def _project_monthly_stocks(self) -> Dict[str, List[int]]:
"""Project sparepart stock levels for each month"""
projected_stocks = {}
monthly_deliveries = self._calculate_monthly_deliveries()
for sparepart_id, stock_info in self.sparepart_stocks.items():
monthly_stock = []
current_stock = float(stock_info.current_stock)
for month in range(self.analysis_window_months):
# Add any deliveries for this month
if sparepart_id in monthly_deliveries and month in monthly_deliveries[sparepart_id]:
current_stock += float(monthly_deliveries[sparepart_id][month])
monthly_stock.append(current_stock)
# Note: We don't subtract usage here yet - that will be done during optimization
projected_stocks[sparepart_id] = monthly_stock
self.projected_stocks = projected_stocks
return projected_stocks
def check_sparepart_availability(self, equipment_tag: str, target_month: int,
consider_other_overhauls: List[Tuple[str, int]] = None) -> Dict:
"""
Check if spareparts are available for equipment overhaul at target month
Args:
equipment_tag: Equipment location tag
target_month: Month when overhaul is planned (0-based)
consider_other_overhauls: List of (equipment_tag, month) for other planned overhauls
Returns:
Dict with availability status and details
"""
if equipment_tag not in self.equipment_requirements:
return {
'available': True,
'message': f'No sparepart requirements defined for {equipment_tag}',
'missing_parts': [],
'procurement_needed': [],
'total_procurement_cost': 0,
'can_proceed_with_delays': True,
'pr_po_summary': {
'existing_orders': [],
'required_new_orders': [],
'total_existing_value': 0,
'total_new_orders_value': 0
}
}
requirements = self.equipment_requirements[equipment_tag]
missing_parts = []
procurement_needed = []
total_procurement_cost = 0
# Calculate stock after considering other overhauls
adjusted_stocks = self._calculate_adjusted_stocks(target_month, consider_other_overhauls or [])
existing_orders = self._get_existing_orders_for_month(target_month)
pr_po_summary = {
'existing_orders': [],
'required_new_orders': [],
'total_existing_value': 0,
'total_new_orders_value': 0,
'orders_by_status': {
'planned': [],
'ordered': [],
'received': [],
'cancelled': []
}
}
for requirement in requirements:
sparepart_id = requirement.sparepart_id
needed_quantity = requirement.quantity_required
sparepart_name = requirement.sparepart_name
current_stock = adjusted_stocks.get(sparepart_id, 0)
existing_sparepart_orders = [order for order in existing_orders
if order.sparepart_id == sparepart_id]
total_ordered_quantity = sum(order.quantity for order in existing_sparepart_orders
if order.status in [ProcurementStatus.PLANNED, ProcurementStatus.ORDERED])
effective_stock = current_stock + total_ordered_quantity
# if sparepart_id not in adjusted_stocks:
# missing_parts.append({
# 'sparepart_id': sparepart_id,
# 'sparepart_name': requirement.sparepart_name,
# 'required': needed_quantity,
# 'available': 0,
# 'shortage': needed_quantity,
# 'criticality': "warning"
# })
# continue
# available_stock = adjusted_stocks[sparepart_id]
# if available_stock < needed_quantity:
# shortage = needed_quantity - available_stock
# missing_parts.append({
# 'sparepart_id': sparepart_id,
# 'sparepart_name': requirement.sparepart_name,
# 'required': needed_quantity,
# 'available': available_stock,
# 'shortage': shortage,
# 'criticality': "warning"
# })
# # Calculate procurement needs
# if sparepart_id in self.sparepart_stocks:
# stock_info = self.sparepart_stocks[sparepart_id]
# procurement_cost = shortage * stock_info.unit_cost
# total_procurement_cost += procurement_cost
# # Calculate when to order (considering lead time)
# order_month = max(0, target_month - requirement.lead_time)
# procurement_needed.append({
# 'sparepart_id': sparepart_id,
# 'sparepart_name': requirement.sparepart_name,
# 'quantity_needed': shortage,
# 'unit_cost': stock_info.unit_cost,
# 'total_cost': procurement_cost,
# 'order_by_month': order_month,
# 'lead_time_months': requirement.lead_time,
# 'criticality': "warning"
# })
if effective_stock >= needed_quantity:
# Sufficient stock available (including from existing orders)
if existing_sparepart_orders:
# Add existing order info to summary
for order in existing_sparepart_orders:
order_info = {
'po_pr_id': order.po_pr_id,
'sparepart_id': sparepart_id,
'sparepart_name': sparepart_name,
'quantity': order.quantity,
'unit_cost': order.unit_cost,
'total_cost': order.total_cost,
'order_date': order.order_date.isoformat(),
'expected_delivery_date': order.expected_delivery_date.isoformat(),
'status': order.status.value,
'months_until_delivery': self._calculate_months_until_delivery(order.expected_delivery_date, target_month),
'is_on_time': self._is_delivery_on_time(order.expected_delivery_date, target_month),
'usage': 'covers_requirement'
}
pr_po_summary['existing_orders'].append(order_info)
pr_po_summary['total_existing_value'] += order.total_cost
pr_po_summary['orders_by_status'][order.status.value].append(order_info)
else:
# Insufficient stock - need additional procurement
shortage = needed_quantity - effective_stock
missing_parts.append({
'sparepart_id': sparepart_id,
'sparepart_name': sparepart_name,
'required': needed_quantity,
'current_stock': current_stock,
'ordered_quantity': total_ordered_quantity,
'effective_available': effective_stock,
'shortage': shortage,
'criticality': "warning",
'existing_orders': len(existing_sparepart_orders),
'existing_orders_details': [
{
'po_pr_id': order.po_pr_id,
'quantity': order.quantity,
'status': order.status.value,
'expected_delivery': order.expected_delivery_date.isoformat(),
'is_on_time': self._is_delivery_on_time(order.expected_delivery_date, target_month)
} for order in existing_sparepart_orders
]
})
# Calculate additional procurement needed
if sparepart_id in self.sparepart_stocks:
stock_info = self.sparepart_stocks[sparepart_id]
procurement_cost = shortage * stock_info.unit_cost
total_procurement_cost += procurement_cost
# Calculate when to order (considering lead time)
order_month = max(0, target_month - requirement.lead_time)
order_date = self.analysis_start_date + timedelta(days=order_month * 30)
expected_delivery = order_date + timedelta(days=requirement.lead_time * 30)
new_order = {
'sparepart_id': sparepart_id,
'sparepart_name': sparepart_name,
'quantity_needed': shortage,
'unit_cost': stock_info.unit_cost,
'total_cost': procurement_cost,
'order_by_month': order_month,
'recommended_order_date': order_date.isoformat(),
'expected_delivery_date': expected_delivery.isoformat(),
'lead_time_months': requirement.lead_time,
'criticality': "warning",
'urgency': self._calculate_urgency(order_month, target_month),
'reason': f'Additional {shortage} units needed beyond existing orders'
}
procurement_needed.append(new_order)
pr_po_summary['required_new_orders'].append(new_order)
pr_po_summary['total_new_orders_value'] += procurement_cost
# Check for critical parts
critical_missing = [p for p in missing_parts if p['criticality'] == 'critical']
# Generate comprehensive summary
availability_summary = self._generate_comprehensive_availability_message(
missing_parts, critical_missing, pr_po_summary
)
return {
'available': len(critical_missing) == 0,
'total_missing_parts': len(missing_parts),
'critical_missing_parts': len(critical_missing),
'missing_parts': missing_parts,
'procurement_needed': procurement_needed,
'total_procurement_cost': total_procurement_cost,
'can_proceed_with_delays': len(critical_missing) == 0,
'message': availability_summary['message'],
'detailed_message': availability_summary['detailed_message'],
'pr_po_summary': pr_po_summary,
'recommendations': self._generate_procurement_recommendations(pr_po_summary, target_month)
}
def _calculate_months_until_delivery(self, delivery_date: date, target_month: int) -> int:
"""Calculate months from target month to delivery date"""
target_date = self.analysis_start_date + timedelta(days=target_month * 30)
months_diff = (delivery_date.year - target_date.year) * 12 + (delivery_date.month - target_date.month)
return months_diff
# for field in data:
# if field in update_data:
# setattr(activity, field, update_data[field])
def _calculate_urgency(self, order_month: int, target_month: int) -> str:
"""Calculate urgency level for new procurement"""
time_gap = target_month - order_month
if time_gap <= 1:
return "URGENT"
elif time_gap <= 3:
return "HIGH"
elif time_gap <= 6:
return "MEDIUM"
else:
return "LOW"
def _generate_procurement_recommendations(self, pr_po_summary: Dict, target_month: int) -> List[Dict]:
"""Generate procurement recommendations based on analysis"""
recommendations = []
# Check for late deliveries
late_orders = [order for order in pr_po_summary['existing_orders']
if not order['is_on_time']]
if late_orders:
recommendations.append({
'type': 'LATE_DELIVERY_WARNING',
'priority': 'HIGH',
'message': f"{len(late_orders)} existing orders will deliver late",
'details': [f"PO/PR {order['po_pr_id']} for {order['sparepart_name']}"
for order in late_orders],
'action': 'Expedite delivery or find alternative suppliers'
})
# Check for urgent new orders
urgent_orders = [order for order in pr_po_summary['required_new_orders']
if order['urgency'] == 'URGENT']
if urgent_orders:
recommendations.append({
'type': 'URGENT_PROCUREMENT',
'priority': 'CRITICAL',
'message': f"{len(urgent_orders)} spareparts need immediate ordering",
'details': [f"{order['sparepart_name']}: {order['quantity_needed']} units"
for order in urgent_orders],
'action': 'Place orders immediately or consider expedited delivery'
})
# Check for cancelled orders
cancelled_orders = pr_po_summary['orders_by_status'].get('cancelled', [])
if cancelled_orders:
recommendations.append({
'type': 'CANCELLED_ORDER_IMPACT',
'priority': 'MEDIUM',
'message': f"{len(cancelled_orders)} cancelled orders may affect availability",
'details': [f"Cancelled: PO/PR {order['po_pr_id']} for {order['sparepart_name']}"
for order in cancelled_orders],
'action': 'Review impact and place replacement orders if necessary'
})
# Budget recommendations
total_investment = pr_po_summary['total_existing_value'] + pr_po_summary['total_new_orders_value']
if total_investment > 0:
recommendations.append({
'type': 'BUDGET_SUMMARY',
'priority': 'INFO',
'message': f'Total sparepart investment: ${total_investment:,.2f}',
'details': [
f"Existing orders: ${pr_po_summary['total_existing_value']:,.2f}",
f"Additional orders needed: ${pr_po_summary['total_new_orders_value']:,.2f}"
],
'action': 'Ensure budget allocation for sparepart procurement'
})
return recommendations
def _is_delivery_on_time(self, delivery_date: datetime, target_month: int) -> bool:
"""Check if delivery will arrive on time for target month"""
target_date = self.analysis_start_date + timedelta(days=target_month * 30)
del_time = delivery_date.date() if delivery_date else None
return del_time <= target_date if del_time else False
# await db_session.commit()
def _calculate_adjusted_stocks(self, target_month: int, other_overhauls: List[Tuple[str, int]]) -> Dict[str, int]:
"""Calculate stock levels after considering consumption from other planned overhauls"""
adjusted_stocks = {}
for sparepart_id, monthly_stocks in self.projected_stocks.items():
if target_month < len(monthly_stocks):
stock_at_month = monthly_stocks[target_month]
# Subtract consumption from other overhauls happening at or before target month
for other_equipment, other_month in other_overhauls:
if other_month <= target_month and other_equipment in self.equipment_requirements:
for req in self.equipment_requirements[other_equipment]:
if req.sparepart_id == sparepart_id:
stock_at_month -= req.quantity_required
adjusted_stocks[sparepart_id] = max(0, stock_at_month)
return adjusted_stocks
def _get_existing_orders_for_month(self, target_month: int) -> List[ProcurementRecord]:
"""Get existing PR/PO orders that could supply spareparts for target month"""
target_date = self.analysis_start_date + timedelta(days=target_month * 30)
relevant_orders = []
for record in self.procurement_records:
date_expected_delivery = record.expected_delivery_date.date() if record.expected_delivery_date else None
# Include orders that deliver before or around the target month
# and are not cancelled
if (record.status != ProcurementStatus.CANCELLED and date_expected_delivery and
date_expected_delivery <= target_date): # 15 days buffer
relevant_orders.append(record)
return relevant_orders
def _generate_comprehensive_availability_message(self, missing_parts: List[Dict],
critical_missing: List[Dict],
pr_po_summary: Dict) -> Dict:
"""Generate comprehensive availability message with PR/PO details"""
if not missing_parts:
if pr_po_summary['existing_orders']:
message = f"All spareparts available through {len(pr_po_summary['existing_orders'])} existing orders"
detailed_message = f"Total existing order value: ${pr_po_summary['total_existing_value']:,.2f}"
else:
message = "All spareparts available from current stock"
detailed_message = "No additional procurement required"
else:
if critical_missing:
message = f"CRITICAL: {len(critical_missing)} critical spareparts missing. Overhaul cannot proceed."
detailed_message = f"Additional procurement required: ${pr_po_summary['total_new_orders_value']:,.2f}"
else:
message = f"WARNING: {len(missing_parts)} spareparts missing, but no critical parts."
if pr_po_summary['total_new_orders_value'] > 0:
detailed_message = f"Additional procurement required: ${pr_po_summary['total_new_orders_value']:,.2f}. "
else:
detailed_message = ""
if pr_po_summary['existing_orders']:
detailed_message += f"Existing orders cover some requirements (${pr_po_summary['total_existing_value']:,.2f})."
return {
'message': message,
'detailed_message': detailed_message
}
def _generate_availability_message(self, missing_parts: List[Dict], critical_missing: List[Dict]) -> str:
"""Generate human-readable availability message"""
if not missing_parts:
return "All spareparts available"
if critical_missing:
return f"CRITICAL: {len(critical_missing)} critical spareparts missing. Overhaul cannot proceed."
return f"WARNING: {len(missing_parts)} spareparts missing, but no critical parts. Overhaul can proceed with procurement."
def optimize_procurement_timing(self, planned_overhauls: List[Tuple[str, int]]) -> Dict:
"""
Optimize procurement timing for multiple equipment overhauls
Args:
planned_overhauls: List of (equipment_tag, planned_month) tuples
Returns:
Optimized procurement plan
"""
procurement_plan = []
total_procurement_cost = 0
# Sort overhauls by month
sorted_overhauls = sorted(planned_overhauls, key=lambda x: x[1])
# Track cumulative procurement needs
processed_overhauls = []
for equipment_tag, target_month in sorted_overhauls:
availability = self.check_sparepart_availability(
equipment_tag, target_month, processed_overhauls
)
for procurement in availability['procurement_needed']:
procurement_plan.append({
'equipment_tag': equipment_tag,
'target_overhaul_month': target_month,
**procurement
})
total_procurement_cost += procurement['total_cost']
processed_overhauls.append((equipment_tag, target_month))
# Group by order month for better planning
procurement_by_month = defaultdict(list)
for item in procurement_plan:
procurement_by_month[item['order_by_month']].append(item)
return {
'total_procurement_cost': total_procurement_cost,
'procurement_plan': procurement_plan,
'procurement_by_month': dict(procurement_by_month),
# 'summary': self._generate_procurement_summary(procurement_plan)
}
def _generate_procurement_summary(self, procurement_plan: List[Dict]) -> Dict:
"""Generate procurement summary statistics"""
if not procurement_plan:
return {'message': 'No procurement needed'}
critical_items = [p for p in procurement_plan if p['criticality'] == 'critical']
total_items = len(procurement_plan)
total_cost = sum(p['total_cost'] for p in procurement_plan)
# Group by supplier
by_supplier = defaultdict(list)
for item in procurement_plan:
by_supplier[item['supplier']].append(item)
return {
'total_items': total_items,
'critical_items': len(critical_items),
'total_cost': total_cost,
'unique_spareparts': len(set(p['sparepart_id'] for p in procurement_plan)),
'suppliers_involved': len(by_supplier),
'by_supplier': {
supplier: {
'item_count': len(items),
'total_cost': sum(i['total_cost'] for i in items)
}
for supplier, items in by_supplier.items()
}
}
def get_monthly_procurement_schedule(self) -> Dict[int, List[Dict]]:
"""Get procurement schedule by month"""
if not hasattr(self, '_monthly_schedule'):
self._monthly_schedule = {}
return self._monthly_schedule
def update_projected_stocks_with_consumption(self, equipment_overhauls: List[Tuple[str, int]]):
"""Update projected stocks considering sparepart consumption from overhauls"""
# Create a copy of projected stocks
updated_stocks = {}
for sparepart_id, monthly_stocks in self.projected_stocks.items():
updated_stocks[sparepart_id] = monthly_stocks.copy()
# Apply consumption from overhauls
for equipment_tag, overhaul_month in equipment_overhauls:
if equipment_tag in self.equipment_requirements:
for requirement in self.equipment_requirements[equipment_tag]:
sparepart_id = requirement.sparepart_id
quantity_needed = requirement.quantity_required
if sparepart_id in updated_stocks:
# Reduce stock from overhaul month onwards
for month in range(overhaul_month, len(updated_stocks[sparepart_id])):
updated_stocks[sparepart_id][month] = max(
0, updated_stocks[sparepart_id][month] - quantity_needed
)
return updated_stocks
# Integration functions for database operations
async def load_sparepart_data_from_db(scope, prev_oh_scope, db_session) -> SparepartManager:
"""Load sparepart data from database"""
# You'll need to implement these queries based on your database schema
# Get scope dates for analysis window
# scope = await get_scope(db_session=db_session, overhaul_session_id=overhaul_session_id)
# prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope)
analysis_start_date = prev_oh_scope.end_date
analysis_window_months = int(((scope.start_date - prev_oh_scope.end_date).days / 30) * 1.5)
sparepart_manager = SparepartManager(analysis_start_date, analysis_window_months)
# Load sparepart stocks
# Example query - adjust based on your schema
query = text("""
SELECT
mi.id,
mi.itemnum,
mi.itemsetid,
mi."location",
mi.curbaltotal,
mi.avgcost,
mspl.description
FROM public.maximo_inventory mi
LEFT JOIN public.maximo_sparepart_pr_po_line mspl
ON mi.itemnum = mspl.item_num
""")
log.info("Fetch sparepart")
sparepart_stocks_query = await db_session.execute(query)
for stock_record in sparepart_stocks_query:
stock = SparepartStock(
sparepart_id=stock_record.itemnum,
sparepart_name=stock_record.description,
current_stock=stock_record.curbaltotal,
unit_cost=stock_record.avgcost,
location=stock_record.location or "Unknown",
)
sparepart_manager.add_sparepart_stock(stock)
# Load equipment sparepart requirements
# You'll need to create this table/relationship
query = text("""WITH oh_workorders AS (
-- First, get all OH work orders
SELECT DISTINCT
wonum,
asset_location
FROM public.wo_staging_maximo_2
WHERE worktype = 'OH' AND asset_location IS NOT NULL
),
sparepart_usage AS (
-- Get sparepart usage for OH work orders
SELECT
oh.asset_location,
mwm.itemnum,
mwm.itemqty,
mwm.wonum
FROM oh_workorders oh
INNER JOIN public.maximo_workorder_materials mwm
ON oh.wonum = mwm.wonum
),
location_sparepart_stats AS (
-- Calculate average usage per sparepart per location
SELECT
asset_location,
itemnum,
COUNT(DISTINCT wonum) as total_wo_count,
SUM(itemqty) as total_qty_used,
AVG(itemqty) as avg_qty_per_wo,
MIN(itemqty) as min_qty_used,
MAX(itemqty) as max_qty_used
FROM sparepart_usage
GROUP BY asset_location, itemnum
),
pr_po_combined AS (
-- Combine PR and PO data by num to get issue_date and estimated_arrival_date
SELECT
mspl.item_num,
mspl.num,
MAX(CASE WHEN mspo.type = 'PR' THEN mspo.issue_date END) as issue_date,
MAX(CASE WHEN mspo.type = 'PO' THEN mspo.estimated_arrival_date END) as estimated_arrival_date
FROM public.maximo_sparepart_pr_po_line mspl
INNER JOIN public.maximo_sparepart_pr_po mspo
ON mspl.num = mspo.num
WHERE mspo.type IN ('PR', 'PO')
GROUP BY mspl.item_num, mspl.num
),
leadtime_stats AS (
-- Calculate lead time statistics for each item
SELECT
item_num,
ROUND(AVG(
EXTRACT(EPOCH FROM (estimated_arrival_date - issue_date)) / 86400 / 30.44
), 1) as avg_leadtime_months,
ROUND(MIN(
EXTRACT(EPOCH FROM (estimated_arrival_date - issue_date)) / 86400 / 30.44
), 1) as min_leadtime_months,
COUNT(*) as leadtime_sample_size
FROM pr_po_combined
WHERE issue_date IS NOT NULL
AND estimated_arrival_date IS NOT NULL
AND estimated_arrival_date > issue_date
GROUP BY item_num
),
item_descriptions AS (
-- Get unique descriptions for each item (optimized)
SELECT DISTINCT
item_num,
FIRST_VALUE(description) OVER (
PARTITION BY item_num
ORDER BY created_at DESC NULLS LAST
) as description
FROM public.maximo_sparepart_pr_po_line
WHERE description IS NOT NULL
)
SELECT
lss.asset_location,
lss.itemnum,
COALESCE(id.description, 'No description available') as item_description,
lss.total_wo_count,
lss.total_qty_used,
ROUND(lss.avg_qty_per_wo, 2) as avg_qty_per_wo,
lss.min_qty_used,
lss.max_qty_used,
COALESCE(lt.avg_leadtime_months, 0) as avg_leadtime_months,
COALESCE(lt.min_leadtime_months, 0) as min_leadtime_months,
COALESCE(lt.leadtime_sample_size, 0) as leadtime_sample_size
FROM location_sparepart_stats lss
LEFT JOIN item_descriptions id ON lss.itemnum = id.item_num
LEFT JOIN leadtime_stats lt ON lss.itemnum = lt.item_num
ORDER BY lss.asset_location, lss.itemnum;""")
equipment_requirements_query = await db_session.execute(query)
equipment_requirements = defaultdict(list)
for req_record in equipment_requirements_query:
requirement = SparepartRequirement(
sparepart_id=req_record.itemnum,
quantity_required=float(req_record.avg_qty_per_wo),
lead_time=float(req_record.avg_leadtime_months),
sparepart_name=req_record.item_description
)
equipment_requirements[req_record.asset_location].append(requirement)
for equipment_tag, requirements in equipment_requirements.items():
sparepart_manager.add_equipment_requirements(equipment_tag, requirements)
# Load procurement records (PO/PR)
procurement_query = await get_all(db_session=db_session)
for proc_record in procurement_query:
procurement = ProcurementRecord(
po_pr_id=proc_record["pr_number"],
sparepart_id=proc_record["item_num"],
sparepart_name=proc_record["description"],
quantity=proc_record["pr_qty_ordered"],
unit_cost=proc_record["unit_cost"],
total_cost=proc_record["line_cost"],
order_date=proc_record['pr_issue_date'],
expected_delivery_date=proc_record['po_estimated_arrival_date'],
status=ProcurementStatus("ordered"),
)
sparepart_manager.add_procurement_record(procurement)
# Calculate projected stocks
sparepart_manager._project_monthly_stocks()
return sparepart_manager
# return activity
# async def delete(*, db_session: DbSession, activity_id: str):
# """Deletes a document."""
# activity = await db_session.get(MasterActivity, activity_id)
# await db_session.delete(activity)
# await db_session.commit()
Loading…
Cancel
Save