You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
be-optimumoh/src/sparepart/service.py

1068 lines
44 KiB
Python

from typing import Optional
import asyncio
import logging
from datetime import datetime, timedelta, date
from typing import List, Dict, Optional, Tuple
from collections import defaultdict
from uuid import UUID
import numpy as np
from dataclasses import dataclass
from enum import Enum
from sqlalchemy import Delete, Select, text
from sqlalchemy.orm import joinedload, selectinload
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.logging import setup_logging
from src.overhaul_scope.service import get as get_scope
from src.overhaul_scope.service import get_prev_oh
log = logging.getLogger(__name__)
setup_logging(logger=log)
from sqlalchemy import text
import math
async def get_spareparts_paginated(db_session):
"""
Get paginated spare parts with usage, inventory, and PR/PO information.
Uses two queries: one for data, one for total count.
Args:
db_session: SQLAlchemy database session
page (int): Page number (1-based)
items_per_page (int): Number of items per page
"""
# calculate limit/offset
# limit = items_per_page
# offset = (page - 1) * items_per_page
# -----------------------------
# Query #1: Fetch paginated rows
# -----------------------------
data_query = text("""
WITH oh_workorders AS (
SELECT DISTINCT wonum, asset_location, asset_unit
FROM public.wo_staging_maximo_2
WHERE worktype = 'OH'
AND asset_location IS NOT NULL
AND EXTRACT(YEAR FROM reportdate) >= 2019
AND asset_unit IN ('3', '00')
),
sparepart_usage AS (
SELECT oh.asset_location, mwm.itemnum, mwm.itemqty, mwm.wonum
FROM oh_workorders oh
INNER JOIN public.maximo_workorder_materials mwm ON oh.wonum = mwm.wonum
),
location_sparepart_stats AS (
SELECT asset_location, itemnum,
COUNT(DISTINCT wonum) as total_wo_count,
SUM(itemqty) as total_qty_used,
AVG(itemqty) as avg_qty_per_wo,
MIN(itemqty) as min_qty_used,
MAX(itemqty) as max_qty_used
FROM sparepart_usage
GROUP BY asset_location, itemnum
HAVING SUM(itemqty) > 0
),
pr_lines AS (
SELECT
pl.item_num,
h.num as pr_number,
h.issue_date as pr_issue_date,
h.status as pr_status,
pl.qty_ordered as pr_qty_ordered,
pl.qty_requested as pr_qty_requested
FROM public.maximo_sparepart_pr_po h
JOIN public.maximo_sparepart_pr_po_line pl ON h.num = pl.num
WHERE h.type = 'PR' AND EXTRACT(YEAR FROM h.issue_date) >= 2023
),
item_descriptions AS (
SELECT DISTINCT
item_num,
FIRST_VALUE(description) OVER (
PARTITION BY item_num
ORDER BY created_at DESC NULLS LAST
) as description
FROM public.maximo_sparepart_pr_po_line
WHERE description IS NOT NULL
),
po_lines AS (
SELECT
pl.item_num,
h.num as po_number,
h.estimated_arrival_date as po_estimated_arrival_date,
h.vendeliverydate as po_vendeliverydate,
h.receipts as po_receipt,
h.status as po_status,
pl.qty_ordered as po_qty_ordered,
pl.qty_received as po_qty_received
FROM public.maximo_sparepart_pr_po h
JOIN public.maximo_sparepart_pr_po_line pl ON h.num = pl.num
WHERE h.type = 'PO'
AND (h.receipts = 'NONE')
AND (h.status IS NOT NULL)
),
pr_po_unified AS (
SELECT
pr.item_num,
pr.pr_number,
pr.pr_issue_date,
pr.pr_qty_ordered,
pr.pr_status,
po.po_number,
COALESCE(po.po_qty_ordered,0) as po_qty_ordered,
COALESCE(po.po_qty_received,0) as po_qty_received,
po.po_estimated_arrival_date,
po.po_vendeliverydate,
po.po_receipt,
po.po_status,
CASE WHEN po.po_number IS NOT NULL THEN 'YES' ELSE 'NO' END as po_exists
FROM pr_lines pr
LEFT JOIN po_lines po
ON pr.item_num = po.item_num
AND pr.pr_number = po.po_number
),
pr_po_agg AS (
SELECT
item_num,
SUM(COALESCE(pr_qty_ordered,0)) as total_pr_qty,
SUM(COALESCE(po_qty_ordered,0)) as total_po_qty,
SUM(COALESCE(po_qty_received,0)) as total_po_received,
JSON_AGG(
JSON_BUILD_OBJECT(
'pr_number', pr_number,
'pr_issue_date', pr_issue_date,
'pr_qty_requested', pr_qty_ordered,
'pr_status', pr_status,
'po_exists', po_exists,
'po_qty_ordered', po_qty_ordered,
'po_qty_received', po_qty_received,
'po_estimated_arrival_date', po_estimated_arrival_date,
'po_vendeliverydate', po_vendeliverydate,
'po_receipt', po_receipt,
'po_status', po_status
) ORDER BY pr_issue_date DESC
) as pr_po_details
FROM pr_po_unified
GROUP BY item_num
)
SELECT
lss.itemnum,
COALESCE(id.description, 'No description available') as item_description,
lss.total_wo_count,
lss.total_qty_used,
ROUND(CAST(lss.avg_qty_per_wo AS NUMERIC), 2) as avg_qty_per_wo,
lss.min_qty_used,
lss.max_qty_used,
COALESCE(i.curbaltotal,0) as current_balance_total,
COALESCE(ap.total_pr_qty,0) as total_pr_qty,
COALESCE(ap.total_po_qty,0) as total_po_qty,
COALESCE(ap.total_po_received,0) as total_po_received,
ap.pr_po_details
FROM location_sparepart_stats lss
LEFT JOIN item_descriptions id ON lss.itemnum = id.item_num
LEFT JOIN public.maximo_inventory i ON lss.itemnum = i.itemnum
LEFT JOIN pr_po_agg ap ON lss.itemnum = ap.item_num
ORDER BY lss.asset_location, lss.itemnum
""")
rows = await db_session.execute(data_query)
spare_parts = []
for row in rows:
spare_parts.append({
"item_num": row.itemnum,
"description": row.item_description,
"current_balance_total": float(row.current_balance_total) if row.current_balance_total else 0.0,
"total_required_for_oh": float(row.avg_qty_per_wo),
"total_pr_qty": row.total_pr_qty,
"total_po_qty": row.total_po_qty,
"total_po_received": row.total_po_received,
"pr_po_details": row.pr_po_details
})
return spare_parts
# # -----------------------------
# # Query #2: Count total rows
# # -----------------------------
# count_query = text("""
# WITH oh_workorders AS (
# SELECT DISTINCT wonum, asset_location, asset_unit
# FROM public.wo_staging_maximo_2
# WHERE worktype = 'OH'
# AND asset_location IS NOT NULL
# AND EXTRACT(YEAR FROM reportdate) >= 2019
# AND asset_unit IN ('3', '00')
# ),
# sparepart_usage AS (
# SELECT oh.asset_location, mwm.itemnum, mwm.itemqty, mwm.wonum
# FROM oh_workorders oh
# INNER JOIN public.maximo_workorder_materials mwm ON oh.wonum = mwm.wonum
# ),
# location_sparepart_stats AS (
# SELECT asset_location, itemnum
# FROM sparepart_usage
# GROUP BY asset_location, itemnum
# )
# SELECT COUNT(*) as total_count
# FROM location_sparepart_stats;
# """)
# total_count_result = await db_session.execute(count_query)
# total_count = total_count_result.scalar() or 0
# # calculate total pages
# total_pages = math.ceil(total_count / items_per_page) if items_per_page > 0 else 1
# return {
# "total": total_count,
# "page": page,
# "items_per_page": items_per_page,
# "total_pages": total_pages,
# "items": spare_parts
# }
class ProcurementStatus(Enum):
PLANNED = "planned"
ORDERED = "ordered"
RECEIVED = "received"
CANCELLED = "cancelled"
@dataclass
class SparepartRequirement:
"""Sparepart requirement for equipment overhaul"""
sparepart_id: str
quantity_required: int
lead_time: int
sparepart_name: str
unit_cost: float
@dataclass
class SparepartStock:
"""Current sparepart stock information"""
sparepart_id: str
sparepart_name: str
current_stock: int
unit_cost: float
location: str
@dataclass
class ProcurementRecord:
"""Purchase Order/Purchase Request record"""
po_pr_id: str
sparepart_id: str
sparepart_name: str
quantity: int
unit_cost: float
total_cost: float
order_date: date
expected_delivery_date: date
status: ProcurementStatus
po_vendor_delivery_date: date
class SparepartManager:
"""Manages sparepart availability and procurement for overhaul optimization"""
def __init__(self, analysis_start_date: date, analysis_window_months: int):
self.analysis_start_date = analysis_start_date
self.analysis_window_months = analysis_window_months
self.logger = log
# Storage for sparepart data
self.sparepart_stocks: Dict[str, SparepartStock] = {}
self.equipment_requirements: Dict[str, List[SparepartRequirement]] = {}
self.procurement_records: List[ProcurementRecord] = []
# Monthly projected stocks
self.projected_stocks: Dict[str, List[int]] = {}
def add_sparepart_stock(self, stock: SparepartStock):
"""Add sparepart stock information"""
self.sparepart_stocks[stock.sparepart_id] = stock
def add_equipment_requirements(self, equipment_tag: str, requirements: List[SparepartRequirement]):
"""Add sparepart requirements for equipment"""
self.equipment_requirements[equipment_tag] = requirements
def add_procurement_record(self, record: ProcurementRecord):
"""Add procurement record (PO/PR)"""
self.procurement_records.append(record)
def _calculate_monthly_deliveries(self) -> Dict[str, Dict[int, int]]:
"""Calculate expected deliveries for each sparepart by month"""
deliveries = defaultdict(lambda: defaultdict(int))
for record in self.procurement_records:
if record.status in [ProcurementStatus.ORDERED, ProcurementStatus.PLANNED]:
# Skip records with no expected delivery date (e.g., still PR stage)
if not record.expected_delivery_date:
continue
months_from_start = (
(record.expected_delivery_date.year - self.analysis_start_date.year) * 12 +
(record.expected_delivery_date.month - self.analysis_start_date.month)
)
if 0 <= months_from_start < self.analysis_window_months:
deliveries[record.sparepart_id][months_from_start] += record.quantity
return deliveries
def _project_monthly_stocks(self) -> Dict[str, List[int]]:
"""Project sparepart stock levels for each month"""
projected_stocks = {}
monthly_deliveries = self._calculate_monthly_deliveries()
for sparepart_id, stock_info in self.sparepart_stocks.items():
monthly_stock = []
current_stock = float(stock_info.current_stock)
for month in range(self.analysis_window_months):
# Add any deliveries for this month
if sparepart_id in monthly_deliveries and month in monthly_deliveries[sparepart_id]:
current_stock += float(monthly_deliveries[sparepart_id][month])
monthly_stock.append(current_stock)
# Note: We don't subtract usage here yet - that will be done during optimization
projected_stocks[sparepart_id] = monthly_stock
self.projected_stocks = projected_stocks
return projected_stocks
def check_sparepart_availability(self, equipment_tag: str, target_month: int,
consider_other_overhauls: List[Tuple[str, int]] = None) -> Dict:
"""
Check if spareparts are available for equipment overhaul at target month
Args:
equipment_tag: Equipment location tag
target_month: Month when overhaul is planned (0-based)
consider_other_overhauls: List of (equipment_tag, month) for other planned overhauls
Returns:
Dict with availability status and details
"""
if equipment_tag not in self.equipment_requirements:
return {
'available': True,
'message': f'No sparepart requirements defined for {equipment_tag}',
'missing_parts': [],
'procurement_needed': [],
'total_procurement_cost': 0,
'can_proceed_with_delays': True,
'pr_po_summary': {
'existing_orders': [],
'required_new_orders': [],
'total_existing_value': 0,
'total_new_orders_value': 0
}
}
requirements = self.equipment_requirements[equipment_tag]
missing_parts = []
procurement_needed = []
total_procurement_cost = 0
# Calculate stock after considering other overhauls
adjusted_stocks = self._calculate_adjusted_stocks(target_month, consider_other_overhauls or [])
existing_orders = self._get_existing_orders_for_month(target_month)
pr_po_summary = {
'existing_orders': [],
'required_new_orders': [],
'total_existing_value': 0,
'total_new_orders_value': 0,
'orders_by_status': {
'planned': [],
'ordered': [],
'received': [],
'cancelled': []
}
}
for requirement in requirements:
sparepart_id = requirement.sparepart_id
needed_quantity = requirement.quantity_required
sparepart_name = requirement.sparepart_name
unit_cost = requirement.unit_cost
current_stock = adjusted_stocks.get(sparepart_id, 0)
existing_sparepart_orders = [order for order in existing_orders
if order.sparepart_id == sparepart_id]
total_ordered_quantity = sum(order.quantity for order in existing_sparepart_orders)
effective_stock = current_stock + total_ordered_quantity
if effective_stock >= needed_quantity:
# Sufficient stock available (including from existing orders)
if existing_sparepart_orders:
# Add existing order info to summary
for order in existing_sparepart_orders:
order_info = {
'po_pr_id': order.po_pr_id,
'sparepart_id': sparepart_id,
'sparepart_name': sparepart_name,
'quantity': order.quantity,
'unit_cost': order.unit_cost,
'total_cost': order.total_cost,
'order_date': order.order_date.isoformat(),
'expected_delivery_date': order.expected_delivery_date.isoformat(),
'status': order.status.value,
'months_until_delivery': self._calculate_months_until_delivery(order.expected_delivery_date, target_month),
'is_on_time': self._is_delivery_on_time(order.expected_delivery_date, target_month),
'usage': 'covers_requirement'
}
pr_po_summary['existing_orders'].append(order_info)
pr_po_summary['total_existing_value'] += order.total_cost
pr_po_summary['orders_by_status'][order.status.value].append(order_info)
else:
# Insufficient stock - need additional procurement
shortage = needed_quantity - effective_stock
missing_parts.append({
'sparepart_id': sparepart_id,
'sparepart_name': sparepart_name,
'required': needed_quantity,
'current_stock': current_stock,
'ordered_quantity': total_ordered_quantity,
'effective_available': effective_stock,
'shortage': shortage,
'criticality': "warning",
'existing_orders': len(existing_sparepart_orders),
'existing_orders_details': [
{
'po_pr_id': order.po_pr_id,
'quantity': order.quantity,
'status': order.status.value,
'expected_delivery': order.expected_delivery_date.isoformat(),
'is_on_time': self._is_delivery_on_time(order.expected_delivery_date, target_month)
} for order in existing_sparepart_orders
]
})
# Calculate additional procurement needed
if sparepart_id in self.sparepart_stocks:
procurement_cost = shortage * unit_cost
total_procurement_cost += procurement_cost
# Calculate when to order (considering lead time)
order_month = max(0, target_month - requirement.lead_time)
order_date = self.analysis_start_date + timedelta(days=order_month * 30)
expected_delivery = order_date + timedelta(days=requirement.lead_time * 30)
new_order = {
'sparepart_id': sparepart_id,
'sparepart_name': sparepart_name,
'quantity_needed': shortage,
'unit_cost': unit_cost,
'total_cost': procurement_cost,
'order_by_month': order_month,
'recommended_order_date': order_date.isoformat(),
'expected_delivery_date': expected_delivery.isoformat(),
'lead_time_months': requirement.lead_time,
'criticality': "warning",
'urgency': self._calculate_urgency(order_month, target_month),
'reason': f'Additional {shortage} units needed beyond existing orders'
}
procurement_needed.append(new_order)
pr_po_summary['required_new_orders'].append(new_order)
pr_po_summary['total_new_orders_value'] += procurement_cost
# Check for critical parts
critical_missing = [p for p in missing_parts if p['criticality'] == 'critical']
# Generate comprehensive summary
availability_summary = self._generate_comprehensive_availability_message(
missing_parts, critical_missing, pr_po_summary
)
return {
'available': len(critical_missing) == 0,
'total_missing_parts': len(missing_parts),
'critical_missing_parts': len(critical_missing),
'missing_parts': missing_parts,
'procurement_needed': procurement_needed,
'total_procurement_cost': total_procurement_cost,
'can_proceed_with_delays': len(critical_missing) == 0,
'message': availability_summary['message'],
'detailed_message': availability_summary['detailed_message'],
'pr_po_summary': pr_po_summary,
'recommendations': self._generate_procurement_recommendations(pr_po_summary, target_month)
}
def _calculate_months_until_delivery(self, delivery_date: date, target_month: int) -> int:
"""Calculate months from target month to delivery date"""
target_date = self.analysis_start_date + timedelta(days=target_month * 30)
months_diff = (delivery_date.year - target_date.year) * 12 + (delivery_date.month - target_date.month)
return months_diff
def _calculate_urgency(self, order_month: int, target_month: int) -> str:
"""Calculate urgency level for new procurement"""
time_gap = target_month - order_month
if time_gap <= 1:
return "URGENT"
elif time_gap <= 3:
return "HIGH"
elif time_gap <= 6:
return "MEDIUM"
else:
return "LOW"
def _generate_procurement_recommendations(self, pr_po_summary: Dict, target_month: int) -> List[Dict]:
"""Generate procurement recommendations based on analysis"""
recommendations = []
# Check for late deliveries
late_orders = [order for order in pr_po_summary['existing_orders']
if not order['is_on_time']]
if late_orders:
recommendations.append({
'type': 'LATE_DELIVERY_WARNING',
'priority': 'HIGH',
'message': f"{len(late_orders)} existing orders will deliver late",
'details': [f"PO/PR {order['po_pr_id']} for {order['sparepart_name']}"
for order in late_orders],
'action': 'Expedite delivery or find alternative suppliers'
})
# Check for urgent new orders
urgent_orders = [order for order in pr_po_summary['required_new_orders']
if order['urgency'] == 'URGENT']
if urgent_orders:
recommendations.append({
'type': 'URGENT_PROCUREMENT',
'priority': 'CRITICAL',
'message': f"{len(urgent_orders)} spareparts need immediate ordering",
'details': [f"{order['sparepart_name']}: {order['quantity_needed']} units"
for order in urgent_orders],
'action': 'Place orders immediately or consider expedited delivery'
})
# Check for cancelled orders
cancelled_orders = pr_po_summary['orders_by_status'].get('cancelled', [])
if cancelled_orders:
recommendations.append({
'type': 'CANCELLED_ORDER_IMPACT',
'priority': 'MEDIUM',
'message': f"{len(cancelled_orders)} cancelled orders may affect availability",
'details': [f"Cancelled: PO/PR {order['po_pr_id']} for {order['sparepart_name']}"
for order in cancelled_orders],
'action': 'Review impact and place replacement orders if necessary'
})
# Budget recommendations
total_investment = pr_po_summary['total_existing_value'] + pr_po_summary['total_new_orders_value']
if total_investment > 0:
recommendations.append({
'type': 'BUDGET_SUMMARY',
'priority': 'INFO',
'message': f'Total sparepart investment: Rp. {total_investment:,.2f}',
'details': [
f"Existing orders: Rp. {pr_po_summary['total_existing_value']:,.2f}",
f"Additional orders needed: Rp. {pr_po_summary['total_new_orders_value']:,.2f}"
],
'action': 'Ensure budget allocation for sparepart procurement'
})
return recommendations
def _is_delivery_on_time(self, delivery_date: datetime, target_month: int) -> bool:
"""Check if delivery will arrive on time for target month"""
target_date = self.analysis_start_date + timedelta(days=target_month * 30)
del_time = delivery_date.date() if delivery_date else None
return del_time <= target_date if del_time else False
def _calculate_adjusted_stocks(self, target_month: int, other_overhauls: List[Tuple[str, int]]) -> Dict[str, int]:
"""Calculate stock levels after considering consumption from other planned overhauls"""
adjusted_stocks = {}
for sparepart_id, monthly_stocks in self.projected_stocks.items():
if target_month < len(monthly_stocks):
stock_at_month = monthly_stocks[target_month]
# Subtract consumption from other overhauls happening at or before target month
for other_equipment, other_month in other_overhauls:
if other_month <= target_month and other_equipment in self.equipment_requirements:
for req in self.equipment_requirements[other_equipment]:
if req.sparepart_id == sparepart_id:
stock_at_month -= req.quantity_required
adjusted_stocks[sparepart_id] = max(0, stock_at_month)
return adjusted_stocks
def _get_existing_orders_for_month(self, target_month: int) -> List[ProcurementRecord]:
"""Get existing PR/PO orders that could supply spareparts for target month"""
target_date = self.analysis_start_date + timedelta(days=target_month * 30)
relevant_orders = []
for record in self.procurement_records:
date_expected_delivery = record.expected_delivery_date.date() if record.expected_delivery_date else None
# Include orders that deliver before or around the target month
# and are not cancelled
if (record.status != ProcurementStatus.CANCELLED and date_expected_delivery and
date_expected_delivery <= target_date): # 15 days buffer
relevant_orders.append(record)
return relevant_orders
def _generate_comprehensive_availability_message(self, missing_parts: List[Dict],
critical_missing: List[Dict],
pr_po_summary: Dict) -> Dict:
"""Generate comprehensive availability message with PR/PO details"""
if not missing_parts:
if pr_po_summary['existing_orders']:
message = f"All spareparts available through {len(pr_po_summary['existing_orders'])} existing orders"
detailed_message = f"Total existing order value: Rp. {pr_po_summary['total_existing_value']:,.2f}"
else:
message = "All spareparts available from current stock"
detailed_message = "No additional procurement required"
else:
if critical_missing:
message = f"CRITICAL: {len(critical_missing)} critical spareparts missing. Overhaul cannot proceed."
detailed_message = f"Additional procurement required: Rp. {pr_po_summary['total_new_orders_value']:,.2f}"
else:
message = f"WARNING: {len(missing_parts)} spareparts missing, but no critical parts."
if pr_po_summary['total_new_orders_value'] > 0:
detailed_message = f"Additional procurement required: Rp. {pr_po_summary['total_new_orders_value']:,.2f}. "
else:
detailed_message = ""
if pr_po_summary['existing_orders']:
detailed_message += f"Existing orders cover some requirements (Rp. {pr_po_summary['total_existing_value']:,.2f})."
return {
'message': message,
'detailed_message': detailed_message
}
def _generate_availability_message(self, missing_parts: List[Dict], critical_missing: List[Dict]) -> str:
"""Generate human-readable availability message"""
if not missing_parts:
return "All spareparts available"
if critical_missing:
return f"CRITICAL: {len(critical_missing)} critical spareparts missing. Overhaul cannot proceed."
return f"WARNING: {len(missing_parts)} spareparts missing, but no critical parts. Overhaul can proceed with procurement."
def optimize_procurement_timing(self, planned_overhauls: List[Tuple[str, int]]) -> Dict:
"""
Optimize procurement timing for multiple equipment overhauls
Args:
planned_overhauls: List of (equipment_tag, planned_month) tuples
Returns:
Optimized procurement plan
"""
procurement_plan = []
total_procurement_cost = 0
# Sort overhauls by month
sorted_overhauls = sorted(planned_overhauls, key=lambda x: x[1])
# Track cumulative procurement needs
processed_overhauls = []
for equipment_tag, target_month in sorted_overhauls:
availability = self.check_sparepart_availability(
equipment_tag, target_month, processed_overhauls
)
for procurement in availability['procurement_needed']:
procurement_plan.append({
'equipment_tag': equipment_tag,
'target_overhaul_month': target_month,
**procurement
})
total_procurement_cost += procurement['total_cost']
processed_overhauls.append((equipment_tag, target_month))
# Group by order month for better planning
procurement_by_month = defaultdict(list)
for item in procurement_plan:
procurement_by_month[item['order_by_month']].append(item)
return {
'total_procurement_cost': total_procurement_cost,
'procurement_plan': procurement_plan,
'procurement_by_month': dict(procurement_by_month),
# 'summary': self._generate_procurement_summary(procurement_plan)
}
def _generate_procurement_summary(self, procurement_plan: List[Dict]) -> Dict:
"""Generate procurement summary statistics"""
if not procurement_plan:
return {'message': 'No procurement needed'}
critical_items = [p for p in procurement_plan if p['criticality'] == 'critical']
total_items = len(procurement_plan)
total_cost = sum(p['total_cost'] for p in procurement_plan)
# Group by supplier
by_supplier = defaultdict(list)
for item in procurement_plan:
by_supplier[item['supplier']].append(item)
return {
'total_items': total_items,
'critical_items': len(critical_items),
'total_cost': total_cost,
'unique_spareparts': len(set(p['sparepart_id'] for p in procurement_plan)),
'suppliers_involved': len(by_supplier),
'by_supplier': {
supplier: {
'item_count': len(items),
'total_cost': sum(i['total_cost'] for i in items)
}
for supplier, items in by_supplier.items()
}
}
def get_monthly_procurement_schedule(self) -> Dict[int, List[Dict]]:
"""Get procurement schedule by month"""
if not hasattr(self, '_monthly_schedule'):
self._monthly_schedule = {}
return self._monthly_schedule
def update_projected_stocks_with_consumption(self, equipment_overhauls: List[Tuple[str, int]]):
"""Update projected stocks considering sparepart consumption from overhauls"""
# Create a copy of projected stocks
updated_stocks = {}
for sparepart_id, monthly_stocks in self.projected_stocks.items():
updated_stocks[sparepart_id] = monthly_stocks.copy()
# Apply consumption from overhauls
for equipment_tag, overhaul_month in equipment_overhauls:
if equipment_tag in self.equipment_requirements:
for requirement in self.equipment_requirements[equipment_tag]:
sparepart_id = requirement.sparepart_id
quantity_needed = requirement.quantity_required
if sparepart_id in updated_stocks:
# Reduce stock from overhaul month onwards
for month in range(overhaul_month, len(updated_stocks[sparepart_id])):
updated_stocks[sparepart_id][month] = max(
0, updated_stocks[sparepart_id][month] - quantity_needed
)
return updated_stocks
# Integration functions for database operations
async def load_sparepart_data_from_db(scope, prev_oh_scope, db_session) -> SparepartManager:
"""Load sparepart data from database"""
# You'll need to implement these queries based on your database schema
# Get scope dates for analysis window
# scope = await get_scope(db_session=db_session, overhaul_session_id=overhaul_session_id)
# prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope)
analysis_start_date = prev_oh_scope.end_date
analysis_window_months = int(((scope.start_date - prev_oh_scope.end_date).days / 30) * 1.5)
sparepart_manager = SparepartManager(analysis_start_date, analysis_window_months)
# Load sparepart stocks
# Example query - adjust based on your schema
query = text("""
SELECT
mi.id,
mi.itemnum,
mi.itemsetid,
mi."location",
mi.curbaltotal,
mi.avgcost,
mspl.description
FROM public.maximo_inventory mi
LEFT JOIN public.maximo_sparepart_pr_po_line mspl
ON mi.itemnum = mspl.item_num
""")
log.info("Fetch sparepart")
sparepart_stocks_query = await db_session.execute(query)
for stock_record in sparepart_stocks_query:
stock = SparepartStock(
sparepart_id=stock_record.itemnum,
sparepart_name=stock_record.description,
current_stock=stock_record.curbaltotal,
unit_cost=stock_record.avgcost,
location=stock_record.location or "Unknown",
)
sparepart_manager.add_sparepart_stock(stock)
# Load equipment sparepart requirements
# You'll need to create this table/relationship
query = text("""WITH oh_workorders AS (
-- First, get all OH work orders
SELECT DISTINCT
wonum,
asset_location
FROM public.wo_staging_maximo_2
WHERE worktype = 'OH' AND asset_location IS NOT NULL
),
sparepart_usage AS (
-- Get sparepart usage for OH work orders
SELECT
oh.asset_location,
mwm.itemnum,
mwm.itemqty,
mwm.wonum
FROM oh_workorders oh
INNER JOIN public.maximo_workorder_materials mwm
ON oh.wonum = mwm.wonum
),
location_sparepart_stats AS (
-- Calculate average usage per sparepart per location
SELECT
asset_location,
itemnum,
COUNT(DISTINCT wonum) as total_wo_count,
SUM(itemqty) as total_qty_used,
AVG(itemqty) as avg_qty_per_wo,
MIN(itemqty) as min_qty_used,
MAX(itemqty) as max_qty_used
FROM sparepart_usage
GROUP BY asset_location, itemnum
),
pr_po_combined AS (
-- Combine PR and PO data by num to get issue_date and delivery dates
SELECT
mspl.item_num,
mspl.num,
mspl.unit_cost,
mspl.qty_ordered,
MAX(CASE WHEN mspo.type = 'PR' THEN mspo.issue_date END) as issue_date,
MAX(CASE WHEN mspo.type = 'PO' THEN mspo.vendeliverydate END) as vendeliverydate,
MAX(CASE WHEN mspo.type = 'PO' THEN mspo.estimated_arrival_date END) as estimated_arrival_date
FROM public.maximo_sparepart_pr_po_line mspl
INNER JOIN public.maximo_sparepart_pr_po mspo
ON mspl.num = mspo.num
WHERE mspo.type IN ('PR', 'PO')
GROUP BY mspl.item_num, mspl.num, mspl.unit_cost, mspl.qty_ordered
),
leadtime_stats AS (
-- Calculate lead time statistics for each item
-- Prioritize vendeliverydate over estimated_arrival_date
SELECT
item_num,
ROUND(CAST(AVG(
EXTRACT(EPOCH FROM (
COALESCE(vendeliverydate, estimated_arrival_date) - issue_date
)) / 86400 / 30.44
) AS NUMERIC), 1) as avg_leadtime_months,
ROUND(CAST(MIN(
EXTRACT(EPOCH FROM (
COALESCE(vendeliverydate, estimated_arrival_date) - issue_date
)) / 86400 / 30.44
) AS NUMERIC), 1) as min_leadtime_months,
ROUND(CAST(MAX(
EXTRACT(EPOCH FROM (
COALESCE(vendeliverydate, estimated_arrival_date) - issue_date
)) / 86400 / 30.44
) AS NUMERIC), 1) as max_leadtime_months,
COUNT(*) as leadtime_sample_size,
-- Additional metrics for transparency
COUNT(CASE WHEN vendeliverydate IS NOT NULL THEN 1 END) as vendelivery_count,
COUNT(CASE WHEN vendeliverydate IS NULL AND estimated_arrival_date IS NOT NULL THEN 1 END) as estimated_only_count
FROM pr_po_combined
WHERE issue_date IS NOT NULL
AND COALESCE(vendeliverydate, estimated_arrival_date) IS NOT NULL
AND COALESCE(vendeliverydate, estimated_arrival_date) > issue_date
GROUP BY item_num
),
cost_stats AS (
-- Calculate cost statistics for each item
SELECT
item_num,
ROUND(CAST(AVG(unit_cost) AS NUMERIC), 2) as avg_unit_cost,
ROUND(CAST(MIN(unit_cost) AS NUMERIC), 2) as min_unit_cost,
ROUND(CAST(MAX(unit_cost) AS NUMERIC), 2) as max_unit_cost,
COUNT(*) as cost_sample_size,
-- Total value statistics
ROUND(CAST(AVG(unit_cost * qty_ordered) AS NUMERIC), 2) as avg_order_value,
ROUND(CAST(SUM(unit_cost * qty_ordered) AS NUMERIC), 2) as total_value_ordered
FROM pr_po_combined
WHERE unit_cost IS NOT NULL AND unit_cost > 0
GROUP BY item_num
),
item_descriptions AS (
-- Get unique descriptions for each item (optimized)
SELECT DISTINCT
item_num,
FIRST_VALUE(description) OVER (
PARTITION BY item_num
ORDER BY created_at DESC NULLS LAST
) as description
FROM public.maximo_sparepart_pr_po_line
WHERE description IS NOT NULL
)
SELECT
lss.asset_location,
lss.itemnum,
COALESCE(id.description, 'No description available') as item_description,
lss.total_wo_count,
lss.total_qty_used,
ROUND(CAST(lss.avg_qty_per_wo AS NUMERIC), 2) as avg_qty_per_wo,
lss.min_qty_used,
lss.max_qty_used,
-- Lead time metrics
COALESCE(lt.avg_leadtime_months, 0) as avg_leadtime_months,
COALESCE(lt.min_leadtime_months, 0) as min_leadtime_months,
COALESCE(lt.max_leadtime_months, 0) as max_leadtime_months,
COALESCE(lt.leadtime_sample_size, 0) as leadtime_sample_size,
COALESCE(lt.vendelivery_count, 0) as vendelivery_count,
COALESCE(lt.estimated_only_count, 0) as estimated_only_count,
-- Cost metrics
COALESCE(cs.avg_unit_cost, 0) as avg_unit_cost,
COALESCE(cs.min_unit_cost, 0) as min_unit_cost,
COALESCE(cs.max_unit_cost, 0) as max_unit_cost,
COALESCE(cs.cost_sample_size, 0) as cost_sample_size,
COALESCE(cs.avg_order_value, 0) as avg_order_value,
COALESCE(cs.total_value_ordered, 0) as total_value_ordered,
-- Estimated total cost for average OH
ROUND(CAST(COALESCE(lss.avg_qty_per_wo * cs.avg_unit_cost, 0) AS NUMERIC), 2) as estimated_cost_per_oh
FROM location_sparepart_stats lss
LEFT JOIN item_descriptions id ON lss.itemnum = id.item_num
LEFT JOIN leadtime_stats lt ON lss.itemnum = lt.item_num
LEFT JOIN cost_stats cs ON lss.itemnum = cs.item_num
ORDER BY lss.asset_location, lss.itemnum;""")
equipment_requirements_query = await db_session.execute(query)
equipment_requirements = defaultdict(list)
for req_record in equipment_requirements_query:
requirement = SparepartRequirement(
sparepart_id=req_record.itemnum,
quantity_required=float(req_record.avg_qty_per_wo),
lead_time=float(req_record.avg_leadtime_months),
sparepart_name=req_record.item_description,
unit_cost=float(req_record.avg_unit_cost)
)
equipment_requirements[req_record.asset_location].append(requirement)
for equipment_tag, requirements in equipment_requirements.items():
sparepart_manager.add_equipment_requirements(equipment_tag, requirements)
# Load procurement records (PO/PR)
query = text("""
WITH active_pos AS (
-- Get all POs that are NOT complete (not in inventory yet) and NOT closed
SELECT
pl.item_num,
h.num as po_number,
pl.qty_received,
pl.qty_ordered,
h.estimated_arrival_date,
h.vendeliverydate,
h.receipts as po_receipts,
h.status as po_status,
pl.description,
pl.unit_cost,
pl.line_cost
FROM public.maximo_sparepart_pr_po h
JOIN public.maximo_sparepart_pr_po_line pl ON h.num = pl.num
WHERE h.type = 'PO'
-- Exclude POs where receipts = 'COMPLETE'
AND (h.receipts IS NULL OR h.receipts != 'COMPLETE')
-- Exclude closed POs
AND (h.status IS NULL OR h.status != 'CLOSE')
),
po_with_pr_date AS (
-- Join with PR to get the issue_date
SELECT
po.*,
pr.issue_date as pr_issue_date
FROM active_pos po
LEFT JOIN public.maximo_sparepart_pr_po pr
ON pr.num = po.po_number
AND pr.type = 'PR'
)
SELECT
po.item_num,
po.description,
po.line_cost,
po.unit_cost,
COALESCE(i.curbaltotal, 0) as current_balance_total,
po.po_number,
po.pr_issue_date,
po.po_status,
po.po_receipts,
COALESCE(po.qty_received, 0) as po_qty_received,
COALESCE(po.qty_ordered, 0) as po_qty_ordered,
po.estimated_arrival_date as po_estimated_arrival_date,
po.vendeliverydate as po_vendor_delivery_date
FROM po_with_pr_date po
LEFT JOIN public.maximo_inventory i ON po.item_num = i.itemnum
ORDER BY po.item_num, po.pr_issue_date DESC;
""")
# Execute the query
result = await db_session.execute(query)
# Fetch all results and convert to list of dictionaries
procurement_query = []
for row in result:
procurement_query.append({
"item_num": row.item_num,
"description": row.description,
"line_cost": row.line_cost,
"unit_cost": row.unit_cost,
"current_balance_total": float(row.current_balance_total) if row.current_balance_total is not None else 0.0,
"po_number": row.po_number,
"pr_issue_date": row.pr_issue_date,
"po_status": row.po_status,
"po_receipts": row.po_receipts,
"po_qty_received": float(row.po_qty_received) if row.po_qty_received is not None else 0.0,
"po_qty_ordered": float(row.po_qty_ordered) if row.po_qty_ordered is not None else 0.0,
"po_estimated_arrival_date": row.po_estimated_arrival_date,
"po_vendor_delivery_date": row.po_vendor_delivery_date
})
for proc_record in procurement_query:
procurement = ProcurementRecord(
po_pr_id=proc_record["po_number"],
sparepart_id=proc_record["item_num"],
sparepart_name=proc_record["description"],
quantity=proc_record["po_qty_ordered"],
unit_cost=proc_record["unit_cost"],
total_cost=proc_record["line_cost"],
order_date=proc_record['pr_issue_date'],
expected_delivery_date=proc_record['po_estimated_arrival_date'],
po_vendor_delivery_date=proc_record['po_vendor_delivery_date'],
status=ProcurementStatus("ordered"),
)
sparepart_manager.add_procurement_record(procurement)
# Calculate projected stocks
sparepart_manager._project_monthly_stocks()
return sparepart_manager