You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
be-optimumoh/src/sparepart/service.py

1414 lines
58 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from typing import Optional
import asyncio
import logging
from datetime import datetime, timedelta, date
from typing import List, Dict, Optional, Tuple
from collections import defaultdict
from uuid import UUID
import numpy as np
from dataclasses import dataclass
from enum import Enum
from sqlalchemy import Delete, Select, select, text
from sqlalchemy.orm import joinedload, selectinload
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.logging import setup_logging
from src.overhaul_activity.service import get_standard_scope_by_session_id
from src.overhaul_scope.service import get as get_scope, get_overview_overhaul
from src.overhaul_scope.service import get_prev_oh
from src.sparepart.model import SparepartRemark
from src.sparepart.schema import ProcurementRecord, ProcurementStatus, SparepartRequirement, SparepartStock
log = logging.getLogger(__name__)
setup_logging(logger=log)
from sqlalchemy import text
import math
from sqlalchemy import text
# async def get_spareparts_paginated(
# *,
# db_session,
# collector_db_session,
# ):
# """
# Get spare parts for work orders under specific parent WO(s),
# including inventory and PR/PO data.
# """
# # Normalize parent_num to array for SQL ANY()
# # parent_nums = parent_num if isinstance(parent_num, (list, tuple)) else [parent_num]
# parent_nums = []
# data_query = text("""
# WITH selected_wo AS (
# SELECT
# wonum,
# xx_parent,
# location_tag,
# assetnum,
# siteid,
# reportdate
# FROM public.wo_maxim
# WHERE xx_parent = ANY(:parent_nums)
# ),
# wo_materials AS (
# SELECT
# wm.wonum,
# wm.itemnum,
# wm.itemqty,
# wm.inv_itemnum,
# wm.inv_location,
# wm.inv_curbaltotal,
# wm.inv_avgcost,
# sw.location_tag
# FROM public.wo_maxim_material wm
# JOIN selected_wo sw ON wm.wonum = sw.wonum
# ),
# -- PR Lines
# pr_lines AS (
# SELECT
# pl.item_num,
# h.num AS pr_number,
# h.issue_date AS pr_issue_date,
# h.status AS pr_status,
# pl.qty_ordered AS pr_qty_ordered,
# pl.qty_requested AS pr_qty_requested
# FROM public.maximo_sparepart_pr_po h
# JOIN public.maximo_sparepart_pr_po_line pl
# ON h.num = pl.num
# WHERE h.type = 'PR'
# AND EXTRACT(YEAR FROM h.issue_date) >= 2019
# ),
# -- PO Lines
# po_lines AS (
# SELECT
# pl.item_num,
# h.num AS po_number,
# h.estimated_arrival_date AS po_estimated_arrival_date,
# h.vendeliverydate AS po_vendeliverydate,
# h.receipts AS po_receipt,
# h.status AS po_status,
# pl.qty_ordered AS po_qty_ordered,
# pl.qty_received AS po_qty_received
# FROM public.maximo_sparepart_pr_po h
# JOIN public.maximo_sparepart_pr_po_line pl
# ON h.num = pl.num
# WHERE h.type = 'PO'
# AND (h.receipts = 'NONE')
# AND (h.status IS NOT NULL)
# ),
# -- Item Descriptions
# item_descriptions AS (
# SELECT DISTINCT
# item_num,
# FIRST_VALUE(description) OVER (
# PARTITION BY item_num
# ORDER BY created_at DESC NULLS LAST
# ) AS description
# FROM public.maximo_sparepart_pr_po_line
# WHERE description IS NOT NULL
# ),
# -- Unified PR/PO data
# pr_po_unified AS (
# SELECT
# pr.item_num,
# pr.pr_number,
# pr.pr_issue_date,
# pr.pr_qty_ordered,
# pr.pr_status,
# po.po_number,
# COALESCE(po.po_qty_ordered, 0) AS po_qty_ordered,
# COALESCE(po.po_qty_received, 0) AS po_qty_received,
# po.po_estimated_arrival_date,
# po.po_vendeliverydate,
# po.po_receipt,
# po.po_status,
# CASE WHEN po.po_number IS NOT NULL THEN 'YES' ELSE 'NO' END AS po_exists
# FROM pr_lines pr
# LEFT JOIN po_lines po
# ON pr.item_num = po.item_num
# AND pr.pr_number = po.po_number
# ),
# -- Aggregate PR/PO info
# pr_po_agg AS (
# SELECT
# item_num,
# SUM(COALESCE(pr_qty_ordered, 0)) AS total_pr_qty,
# SUM(COALESCE(po_qty_ordered, 0)) AS total_po_qty,
# SUM(COALESCE(po_qty_received, 0)) AS total_po_received,
# JSON_AGG(
# JSON_BUILD_OBJECT(
# 'pr_number', pr_number,
# 'pr_issue_date', pr_issue_date,
# 'pr_qty_requested', pr_qty_ordered,
# 'pr_status', pr_status,
# 'po_exists', po_exists,
# 'po_qty_ordered', po_qty_ordered,
# 'po_qty_received', po_qty_received,
# 'po_estimated_arrival_date', po_estimated_arrival_date,
# 'po_vendeliverydate', po_vendeliverydate,
# 'po_receipt', po_receipt,
# 'po_status', po_status
# )
# ORDER BY pr_issue_date DESC
# ) AS pr_po_details
# FROM pr_po_unified
# GROUP BY item_num
# )
# SELECT
# wm.itemnum,
# COALESCE(id.description, 'No description available') AS item_description,
# SUM(wm.itemqty) AS total_required_for_oh,
# COALESCE(MAX(wm.inv_curbaltotal), 0) AS current_balance_total,
# COALESCE(ap.total_pr_qty, 0) AS total_pr_qty,
# COALESCE(ap.total_po_qty, 0) AS total_po_qty,
# COALESCE(ap.total_po_received, 0) AS total_po_received,
# ap.pr_po_details
# FROM wo_materials wm
# LEFT JOIN item_descriptions id
# ON wm.itemnum = id.item_num
# LEFT JOIN pr_po_agg ap
# ON wm.itemnum = ap.item_num
# GROUP BY
# wm.itemnum, id.description,
# ap.total_pr_qty, ap.total_po_qty, ap.total_po_received, ap.pr_po_details
# ORDER BY wm.itemnum;
# """)
# rows = await collector_db_session.execute(data_query, {"parent_nums": parent_nums})
# spare_parts = []
# for row in rows:
# spare_parts.append({
# "item_num": row.itemnum,
# "description": row.item_description,
# "current_balance_total": float(row.current_balance_total or 0.0),
# "total_required_for_oh": float(row.total_required_for_oh or 0.0),
# "total_pr_qty": row.total_pr_qty,
# "total_po_qty": row.total_po_qty,
# "total_po_received": row.total_po_received,
# "pr_po_details": row.pr_po_details,
# })
# return spare_parts
async def get_spareparts_paginated(*, db_session, collector_db_session):
    """
    Return overhaul spare-part usage joined with inventory and PR/PO data.

    One raw SQL statement is executed on ``collector_db_session``; it
    aggregates material usage of 'OH' work orders per (location, item) and
    joins inventory balance and aggregated PR/PO lines. Each resulting row is
    then enriched with the remark stored for that item in ``SparepartRemark``
    (read through ``db_session``).

    Despite the function name, the statement has no LIMIT/OFFSET: every row
    is returned in a single list.

    Args:
        db_session: session used for the overhaul overview lookup and for
            reading ``SparepartRemark`` rows.
        collector_db_session: session on which the statistics SQL runs; it is
            also handed to ``get_standard_scope_by_session_id``.

    Returns:
        A list of dicts with the keys ``item_num``, ``description``,
        ``remark``, ``current_balance_total``, ``total_required_for_oh``,
        ``total_pr_qty``, ``total_po_qty``, ``total_po_received`` and
        ``pr_po_details``.
    """
    # The SQL text below is kept exactly as-is (including its flush-left
    # layout) so the string sent to the database does not change.
    stats_sql = text("""
WITH oh_workorders AS (
SELECT DISTINCT wonum, asset_location, asset_unit
FROM public.wo_staging_maximo_2
WHERE worktype = 'OH'
AND asset_location IS NOT NULL
AND EXTRACT(YEAR FROM reportdate) >= 2019
AND asset_unit IN ('3', '00')
),
wo_materials AS (
SELECT
wm.wonum,
wm.itemnum,
wm.itemqty,
inv.itemnum AS inv_itemnum,
inv.location AS inv_location,
inv.curbaltotal AS inv_curbaltotal,
inv.avgcost AS inv_avgcost,
sw.asset_location as location_tag
FROM public.maximo_workorder_materials wm
JOIN maximo_inventory inv ON inv.itemnum = wm.itemnum
JOIN oh_workorders sw ON wm.wonum = sw.wonum
),
location_sparepart_stats AS (
SELECT location_tag, itemnum,
COUNT(DISTINCT wonum) as total_wo_count,
SUM(itemqty) as total_qty_used,
AVG(itemqty) as avg_qty_per_wo,
MIN(itemqty) as min_qty_used,
MAX(itemqty) as max_qty_used
FROM wo_materials
GROUP BY location_tag, itemnum
HAVING SUM(itemqty) > 0
),
pr_lines AS (
SELECT
pl.item_num,
h.num as pr_number,
h.issue_date as pr_issue_date,
h.status as pr_status,
pl.qty_ordered as pr_qty_ordered,
pl.qty_requested as pr_qty_requested
FROM public.maximo_sparepart_pr_po h
JOIN public.maximo_sparepart_pr_po_line pl ON h.num = pl.num
WHERE h.type = 'PR' AND EXTRACT(YEAR FROM h.issue_date) >= 2023
),
item_descriptions AS (
SELECT DISTINCT
item_num,
FIRST_VALUE(description) OVER (
PARTITION BY item_num
ORDER BY created_at DESC NULLS LAST
) as description
FROM public.maximo_sparepart_pr_po_line
WHERE description IS NOT NULL
),
po_lines AS (
SELECT
pl.item_num,
h.num as po_number,
h.estimated_arrival_date as po_estimated_arrival_date,
h.vendeliverydate as po_vendeliverydate,
h.receipts as po_receipt,
h.status as po_status,
pl.qty_ordered as po_qty_ordered,
pl.qty_received as po_qty_received
FROM public.maximo_sparepart_pr_po h
JOIN public.maximo_sparepart_pr_po_line pl ON h.num = pl.num
WHERE h.type = 'PO'
AND (h.receipts = 'NONE')
AND (h.status IS NOT NULL)
),
pr_po_unified AS (
SELECT
pr.item_num,
pr.pr_number,
pr.pr_issue_date,
pr.pr_qty_ordered,
pr.pr_status,
po.po_number,
COALESCE(po.po_qty_ordered,0) as po_qty_ordered,
COALESCE(po.po_qty_received,0) as po_qty_received,
po.po_estimated_arrival_date,
po.po_vendeliverydate,
po.po_receipt,
po.po_status,
CASE WHEN po.po_number IS NOT NULL THEN 'YES' ELSE 'NO' END as po_exists
FROM pr_lines pr
LEFT JOIN po_lines po
ON pr.item_num = po.item_num
AND pr.pr_number = po.po_number
),
pr_po_agg AS (
SELECT
item_num,
SUM(COALESCE(pr_qty_ordered,0)) as total_pr_qty,
SUM(COALESCE(po_qty_ordered,0)) as total_po_qty,
SUM(COALESCE(po_qty_received,0)) as total_po_received,
JSON_AGG(
JSON_BUILD_OBJECT(
'pr_number', pr_number,
'pr_issue_date', pr_issue_date,
'pr_qty_requested', pr_qty_ordered,
'pr_status', pr_status,
'po_exists', po_exists,
'po_qty_ordered', po_qty_ordered,
'po_qty_received', po_qty_received,
'po_estimated_arrival_date', po_estimated_arrival_date,
'po_vendeliverydate', po_vendeliverydate,
'po_receipt', po_receipt,
'po_status', po_status
) ORDER BY pr_issue_date DESC
) as pr_po_details
FROM pr_po_unified
GROUP BY item_num
),
inv_summary AS (
SELECT
itemnum,
MAX(inv_curbaltotal) AS total_curbaltotal,
AVG(inv_avgcost) AS avg_cost
FROM wo_materials
GROUP BY itemnum
)
SELECT
lss.itemnum,
COALESCE(id.description, 'No description available') as item_description,
lss.total_wo_count,
lss.total_qty_used,
ROUND(CAST(lss.avg_qty_per_wo AS NUMERIC), 2) as avg_qty_per_wo,
lss.min_qty_used,
lss.max_qty_used,
COALESCE(i.total_curbaltotal,0) as current_balance_total,
COALESCE(ap.total_pr_qty,0) as total_pr_qty,
COALESCE(ap.total_po_qty,0) as total_po_qty,
COALESCE(ap.total_po_received,0) as total_po_received,
ap.pr_po_details
FROM location_sparepart_stats lss
LEFT JOIN item_descriptions id ON lss.itemnum = id.item_num
LEFT JOIN inv_summary i ON lss.itemnum = i.itemnum
LEFT JOIN pr_po_agg ap ON lss.itemnum = ap.item_num
ORDER BY lss.location_tag, lss.itemnum;
""")

    # Resolve the equipment of the current overhaul's standard scope.
    overview = await get_overview_overhaul(db_session=db_session)
    scope_equipment = await get_standard_scope_by_session_id(
        db_session=db_session,
        collector_db=collector_db_session,
        overhaul_session_id=overview['overhaul']['id'],
    )

    # NOTE(review): the statement above contains no :asset_locations
    # placeholder, so this bind value does not filter anything yet. The
    # stats CTE groups by (location_tag, itemnum), so an item used at several
    # locations appears once per location in the result.
    bind_values = {
        "asset_locations": [equipment.location_tag for equipment in scope_equipment]
    }
    result_rows = await collector_db_session.execute(stats_sql, bind_values)

    # Remarks are keyed by item number; items without one get "".
    remark_query = await db_session.execute(select(SparepartRemark))
    remark_by_item = {
        entry.itemnum: entry.remark for entry in remark_query.scalars().all()
    }

    def _as_payload(record):
        """Map one result row onto the response dict."""
        balance = record.current_balance_total
        return {
            "item_num": record.itemnum,
            "description": record.item_description,
            "remark": remark_by_item.get(record.itemnum, ""),
            "current_balance_total": float(balance) if balance else 0.0,
            # The per-work-order average is what is reported as the requirement.
            "total_required_for_oh": float(record.avg_qty_per_wo),
            "total_pr_qty": record.total_pr_qty,
            "total_po_qty": record.total_po_qty,
            "total_po_received": record.total_po_received,
            "pr_po_details": record.pr_po_details
        }

    return [_as_payload(record) for record in result_rows]
class SparepartManager:
    """Manages sparepart availability and procurement for overhaul optimization.

    The manager holds three inputs, all filled through the ``add_*`` methods:
    current stock per sparepart, sparepart requirements per equipment tag,
    and existing procurement records (PR/PO). Time is expressed as 0-based
    month offsets from ``analysis_start_date``; wherever a month offset has
    to become a calendar date, a month is approximated as 30 days.

    ``_project_monthly_stocks`` must be called before availability checks:
    it is the only method that fills ``projected_stocks``, and
    ``_calculate_adjusted_stocks`` treats a sparepart without a projection as
    having zero stock.
    """

    def __init__(self, analysis_start_date: date, analysis_window_months: int):
        """
        Args:
            analysis_start_date: first day of the analysis window. A
                ``datetime`` is accepted as well; date comparisons normalise
                it to a ``date``.
            analysis_window_months: number of months covered by projections.
        """
        self.analysis_start_date = analysis_start_date
        self.analysis_window_months = analysis_window_months
        self.logger = log

        # Storage for sparepart data
        self.sparepart_stocks: Dict[str, SparepartStock] = {}
        self.equipment_requirements: Dict[str, List[SparepartRequirement]] = {}
        self.procurement_records: List[ProcurementRecord] = []

        # Monthly projected stocks: sparepart_id -> stock level per month offset
        self.projected_stocks: Dict[str, List[float]] = {}

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _as_date(value):
        """Return ``value`` as a plain ``date`` (``None`` passes through).

        ``datetime`` is a subclass of ``date`` but the two cannot be ordered
        against each other, so every comparison goes through this helper.
        """
        if isinstance(value, datetime):
            return value.date()
        return value

    def _target_date(self, target_month: int) -> date:
        """Calendar date of a month offset, using 30-day months."""
        return self._as_date(self.analysis_start_date) + timedelta(days=target_month * 30)

    @staticmethod
    def _new_pr_po_summary() -> Dict:
        """Build an empty PR/PO summary with every key callers may read."""
        return {
            'existing_orders': [],
            'required_new_orders': [],
            'total_existing_value': 0,
            'total_new_orders_value': 0,
            'orders_by_status': {
                'planned': [],
                'ordered': [],
                'received': [],
                'cancelled': []
            }
        }

    # ------------------------------------------------------------------
    # Data registration
    # ------------------------------------------------------------------
    def add_sparepart_stock(self, stock: SparepartStock):
        """Add sparepart stock information (replaces an entry with the same id)."""
        self.sparepart_stocks[stock.sparepart_id] = stock

    def add_equipment_requirements(self, equipment_tag: str, requirements: List[SparepartRequirement]):
        """Add sparepart requirements for equipment (replaces previous ones)."""
        self.equipment_requirements[equipment_tag] = requirements

    def add_procurement_record(self, record: ProcurementRecord):
        """Add procurement record (PO/PR)"""
        self.procurement_records.append(record)

    # ------------------------------------------------------------------
    # Stock projection
    # ------------------------------------------------------------------
    def _calculate_monthly_deliveries(self) -> Dict[str, Dict[int, int]]:
        """Calculate expected deliveries for each sparepart by month.

        Only ORDERED and PLANNED records with an expected delivery date
        inside the analysis window are counted.

        Returns:
            ``{sparepart_id: {month_offset: quantity}}``.
        """
        deliveries = defaultdict(lambda: defaultdict(int))
        open_statuses = (ProcurementStatus.ORDERED, ProcurementStatus.PLANNED)

        for record in self.procurement_records:
            if record.status not in open_statuses:
                continue
            # Skip records with no expected delivery date (e.g., still PR stage)
            if not record.expected_delivery_date:
                continue

            months_from_start = (
                (record.expected_delivery_date.year - self.analysis_start_date.year) * 12
                + (record.expected_delivery_date.month - self.analysis_start_date.month)
            )
            if 0 <= months_from_start < self.analysis_window_months:
                deliveries[record.sparepart_id][months_from_start] += record.quantity

        return deliveries

    def _project_monthly_stocks(self) -> Dict[str, List[float]]:
        """Project sparepart stock levels for each month of the window.

        Starts from the current stock and adds the deliveries expected in
        each month. Consumption is not subtracted here; that happens in
        ``_calculate_adjusted_stocks`` /
        ``update_projected_stocks_with_consumption``.

        Returns:
            The projection, which is also stored on ``self.projected_stocks``.
        """
        projected_stocks = {}
        monthly_deliveries = self._calculate_monthly_deliveries()

        for sparepart_id, stock_info in self.sparepart_stocks.items():
            arrivals = monthly_deliveries.get(sparepart_id, {})
            # A missing balance (None) is treated as an empty shelf.
            running_stock = float(stock_info.current_stock or 0)
            monthly_stock = []
            for month in range(self.analysis_window_months):
                running_stock += float(arrivals.get(month, 0))
                monthly_stock.append(running_stock)
            projected_stocks[sparepart_id] = monthly_stock

        self.projected_stocks = projected_stocks
        return projected_stocks

    # ------------------------------------------------------------------
    # Availability check
    # ------------------------------------------------------------------
    def check_sparepart_availability(
        self,
        equipment_tag: str,
        target_month: int,
        consider_other_overhauls: Optional[List[Tuple[str, int]]] = None,
    ) -> Dict:
        """
        Check if spareparts are available for equipment overhaul at target month

        Args:
            equipment_tag: Equipment location tag
            target_month: Month when overhaul is planned (0-based)
            consider_other_overhauls: List of (equipment_tag, month) for other
                planned overhauls whose consumption should be deducted first

        Returns:
            Dict with availability status and details. The same set of
            top-level keys is returned whether or not requirements are
            defined for the equipment.
        """
        if equipment_tag not in self.equipment_requirements:
            return {
                'available': True,
                'total_missing_parts': 0,
                'critical_missing_parts': 0,
                'missing_parts': [],
                'procurement_needed': [],
                'total_procurement_cost': 0,
                'can_proceed_with_delays': True,
                'message': f'No sparepart requirements defined for {equipment_tag}',
                'detailed_message': '',
                'pr_po_summary': self._new_pr_po_summary(),
                'recommendations': []
            }

        requirements = self.equipment_requirements[equipment_tag]
        missing_parts = []
        procurement_needed = []
        total_procurement_cost = 0

        # Calculate stock after considering other overhauls
        adjusted_stocks = self._calculate_adjusted_stocks(target_month, consider_other_overhauls or [])
        existing_orders = self._get_existing_orders_for_month(target_month)
        pr_po_summary = self._new_pr_po_summary()

        for requirement in requirements:
            sparepart_id = requirement.sparepart_id
            needed_quantity = requirement.quantity_required
            sparepart_name = requirement.sparepart_name
            unit_cost = requirement.avg_cost if requirement.avg_cost > 0 else requirement.unit_cost

            current_stock = adjusted_stocks.get(sparepart_id, 0)
            existing_sparepart_orders = [
                order for order in existing_orders if order.sparepart_id == sparepart_id
            ]
            total_ordered_quantity = sum(order.quantity for order in existing_sparepart_orders)
            # NOTE(review): projected stocks already include ORDERED/PLANNED
            # deliveries up to the target month, and the same records are
            # summed again here, so those quantities look double counted.
            # Left unchanged pending confirmation of the intended semantics.
            effective_stock = current_stock + total_ordered_quantity

            if effective_stock >= needed_quantity:
                # Sufficient stock available (including from existing orders):
                # record the orders that contribute to covering the requirement.
                for order in existing_sparepart_orders:
                    order_info = {
                        'po_pr_id': order.po_pr_id,
                        'sparepart_id': sparepart_id,
                        'sparepart_name': sparepart_name,
                        'quantity': order.quantity,
                        'unit_cost': order.unit_cost,
                        'total_cost': order.total_cost,
                        # An order without an order date is reported as None
                        # rather than aborting the whole check.
                        'order_date': order.order_date.isoformat() if order.order_date else None,
                        'expected_delivery_date': order.expected_delivery_date.isoformat(),
                        'status': order.status.value,
                        'months_until_delivery': self._calculate_months_until_delivery(
                            order.expected_delivery_date, target_month
                        ),
                        'is_on_time': self._is_delivery_on_time(order.expected_delivery_date, target_month),
                        'usage': 'covers_requirement'
                    }
                    pr_po_summary['existing_orders'].append(order_info)
                    pr_po_summary['total_existing_value'] += order.total_cost
                    # setdefault: a status outside the four pre-seeded buckets
                    # gets its own bucket instead of raising KeyError.
                    pr_po_summary['orders_by_status'].setdefault(order.status.value, []).append(order_info)
                continue

            # Insufficient stock - need additional procurement
            shortage = needed_quantity - effective_stock
            missing_parts.append({
                'sparepart_id': sparepart_id,
                'sparepart_name': sparepart_name,
                'required': needed_quantity,
                'current_stock': current_stock,
                'ordered_quantity': total_ordered_quantity,
                'effective_available': effective_stock,
                'shortage': shortage,
                # NOTE(review): criticality is hard-coded, so no part is ever
                # classified as 'critical' by this method.
                'criticality': "warning",
                'existing_orders': len(existing_sparepart_orders),
                'existing_orders_details': [
                    {
                        'po_pr_id': order.po_pr_id,
                        'quantity': order.quantity,
                        'status': order.status.value,
                        'expected_delivery': order.expected_delivery_date.isoformat(),
                        'is_on_time': self._is_delivery_on_time(order.expected_delivery_date, target_month)
                    }
                    for order in existing_sparepart_orders
                ]
            })

            # A new order is only proposed for spareparts known to the stock list.
            if sparepart_id not in self.sparepart_stocks:
                continue

            procurement_cost = shortage * unit_cost
            total_procurement_cost += procurement_cost

            # Calculate when to order (considering lead time, in months)
            order_month = max(0, target_month - requirement.lead_time)
            order_date = self.analysis_start_date + timedelta(days=order_month * 30)
            expected_delivery = order_date + timedelta(days=requirement.lead_time * 30)

            new_order = {
                'sparepart_id': sparepart_id,
                'sparepart_name': sparepart_name,
                'quantity_needed': shortage,
                'unit_cost': unit_cost,
                'total_cost': procurement_cost,
                'order_by_month': order_month,
                'recommended_order_date': order_date.isoformat(),
                'expected_delivery_date': expected_delivery.isoformat(),
                'lead_time_months': requirement.lead_time,
                'criticality': "warning",
                'urgency': self._calculate_urgency(order_month, target_month),
                'reason': f'Additional {shortage} units needed beyond existing orders'
            }
            procurement_needed.append(new_order)
            pr_po_summary['required_new_orders'].append(new_order)
            pr_po_summary['total_new_orders_value'] += procurement_cost

        # Check for critical parts
        critical_missing = [p for p in missing_parts if p['criticality'] == 'critical']

        # Generate comprehensive summary
        availability_summary = self._generate_comprehensive_availability_message(
            missing_parts, critical_missing, pr_po_summary
        )

        return {
            'available': len(critical_missing) == 0,
            'total_missing_parts': len(missing_parts),
            'critical_missing_parts': len(critical_missing),
            'missing_parts': missing_parts,
            'procurement_needed': procurement_needed,
            'total_procurement_cost': total_procurement_cost,
            'can_proceed_with_delays': len(critical_missing) == 0,
            'message': availability_summary['message'],
            'detailed_message': availability_summary['detailed_message'],
            'pr_po_summary': pr_po_summary,
            'recommendations': self._generate_procurement_recommendations(pr_po_summary, target_month)
        }

    def _calculate_months_until_delivery(self, delivery_date: date, target_month: int) -> int:
        """Calculate months from target month to delivery date.

        Negative when the delivery falls in a calendar month before the
        target month.
        """
        target_date = self._target_date(target_month)
        return (
            (delivery_date.year - target_date.year) * 12
            + (delivery_date.month - target_date.month)
        )

    def _calculate_urgency(self, order_month: int, target_month: int) -> str:
        """Calculate urgency level for new procurement.

        Based on the number of months between the order month and the
        overhaul: <=1 URGENT, <=3 HIGH, <=6 MEDIUM, otherwise LOW.
        """
        time_gap = target_month - order_month
        if time_gap <= 1:
            return "URGENT"
        if time_gap <= 3:
            return "HIGH"
        if time_gap <= 6:
            return "MEDIUM"
        return "LOW"

    def _generate_procurement_recommendations(self, pr_po_summary: Dict, target_month: int) -> List[Dict]:
        """Generate procurement recommendations based on analysis.

        Args:
            pr_po_summary: summary dict as built by
                ``check_sparepart_availability``.
            target_month: planned overhaul month (currently not used by the
                rules below).

        Returns:
            List of recommendation dicts with ``type``, ``priority``,
            ``message``, ``details`` and ``action``.
        """
        recommendations = []

        # Check for late deliveries
        late_orders = [
            order for order in pr_po_summary['existing_orders'] if not order['is_on_time']
        ]
        if late_orders:
            recommendations.append({
                'type': 'LATE_DELIVERY_WARNING',
                'priority': 'HIGH',
                'message': f"{len(late_orders)} existing orders will deliver late",
                'details': [
                    f"PO/PR {order['po_pr_id']} for {order['sparepart_name']}"
                    for order in late_orders
                ],
                'action': 'Expedite delivery or find alternative suppliers'
            })

        # Check for urgent new orders
        urgent_orders = [
            order for order in pr_po_summary['required_new_orders'] if order['urgency'] == 'URGENT'
        ]
        if urgent_orders:
            recommendations.append({
                'type': 'URGENT_PROCUREMENT',
                'priority': 'CRITICAL',
                'message': f"{len(urgent_orders)} spareparts need immediate ordering",
                'details': [
                    f"{order['sparepart_name']}: {order['quantity_needed']} units"
                    for order in urgent_orders
                ],
                'action': 'Place orders immediately or consider expedited delivery'
            })

        # Check for cancelled orders
        cancelled_orders = pr_po_summary['orders_by_status'].get('cancelled', [])
        if cancelled_orders:
            recommendations.append({
                'type': 'CANCELLED_ORDER_IMPACT',
                'priority': 'MEDIUM',
                'message': f"{len(cancelled_orders)} cancelled orders may affect availability",
                'details': [
                    f"Cancelled: PO/PR {order['po_pr_id']} for {order['sparepart_name']}"
                    for order in cancelled_orders
                ],
                'action': 'Review impact and place replacement orders if necessary'
            })

        # Budget recommendations
        total_investment = pr_po_summary['total_existing_value'] + pr_po_summary['total_new_orders_value']
        if total_investment > 0:
            recommendations.append({
                'type': 'BUDGET_SUMMARY',
                'priority': 'INFO',
                'message': f'Total sparepart investment: Rp. {total_investment:,.2f}',
                'details': [
                    f"Existing orders: Rp. {pr_po_summary['total_existing_value']:,.2f}",
                    f"Additional orders needed: Rp. {pr_po_summary['total_new_orders_value']:,.2f}"
                ],
                'action': 'Ensure budget allocation for sparepart procurement'
            })

        return recommendations

    def _is_delivery_on_time(self, delivery_date: datetime, target_month: int) -> bool:
        """Check if delivery will arrive on time for target month.

        Accepts a ``date`` or a ``datetime``; a missing date counts as not
        on time.
        """
        delivery = self._as_date(delivery_date)
        if not delivery:
            return False
        return delivery <= self._target_date(target_month)

    def _calculate_adjusted_stocks(self, target_month: int, other_overhauls: List[Tuple[str, int]]) -> Dict[str, float]:
        """Calculate stock levels after considering consumption from other planned overhauls.

        Spareparts whose projection does not reach ``target_month`` are left
        out of the result. Levels are floored at zero.
        """
        adjusted_stocks = {}

        for sparepart_id, monthly_stocks in self.projected_stocks.items():
            if target_month >= len(monthly_stocks):
                continue
            stock_at_month = monthly_stocks[target_month]

            # Subtract consumption from other overhauls happening at or before target month
            for other_equipment, other_month in other_overhauls:
                if other_month > target_month or other_equipment not in self.equipment_requirements:
                    continue
                for req in self.equipment_requirements[other_equipment]:
                    if req.sparepart_id == sparepart_id:
                        stock_at_month -= req.quantity_required

            adjusted_stocks[sparepart_id] = max(0, stock_at_month)

        return adjusted_stocks

    def _get_existing_orders_for_month(self, target_month: int) -> List[ProcurementRecord]:
        """Get existing PR/PO orders that could supply spareparts for target month.

        Returns every non-cancelled record whose expected delivery date is
        on or before the target date. No tolerance window is applied.
        """
        target_date = self._target_date(target_month)
        relevant_orders = []

        for record in self.procurement_records:
            if record.status == ProcurementStatus.CANCELLED:
                continue
            delivery = self._as_date(record.expected_delivery_date)
            if delivery and delivery <= target_date:
                relevant_orders.append(record)

        return relevant_orders

    # ------------------------------------------------------------------
    # Messages
    # ------------------------------------------------------------------
    def _generate_comprehensive_availability_message(self, missing_parts: List[Dict],
                                                     critical_missing: List[Dict],
                                                     pr_po_summary: Dict) -> Dict:
        """Generate comprehensive availability message with PR/PO details.

        Returns:
            Dict with a short ``message`` and a ``detailed_message``.
        """
        if not missing_parts:
            if pr_po_summary['existing_orders']:
                message = f"All spareparts available through {len(pr_po_summary['existing_orders'])} existing orders"
                detailed_message = f"Total existing order value: Rp. {pr_po_summary['total_existing_value']:,.2f}"
            else:
                message = "All spareparts available from current stock"
                detailed_message = "No additional procurement required"
        elif critical_missing:
            message = f"CRITICAL: {len(critical_missing)} critical spareparts missing. Overhaul cannot proceed."
            detailed_message = f"Additional procurement required: Rp. {pr_po_summary['total_new_orders_value']:,.2f}"
        else:
            message = f"WARNING: {len(missing_parts)} spareparts missing, but no critical parts."
            if pr_po_summary['total_new_orders_value'] > 0:
                detailed_message = f"Additional procurement required: Rp. {pr_po_summary['total_new_orders_value']:,.2f}. "
            else:
                detailed_message = ""
            if pr_po_summary['existing_orders']:
                detailed_message += f"Existing orders cover some requirements (Rp. {pr_po_summary['total_existing_value']:,.2f})."

        return {
            'message': message,
            'detailed_message': detailed_message
        }

    def _generate_availability_message(self, missing_parts: List[Dict], critical_missing: List[Dict]) -> str:
        """Generate human-readable availability message"""
        if not missing_parts:
            return "All spareparts available"
        if critical_missing:
            return f"CRITICAL: {len(critical_missing)} critical spareparts missing. Overhaul cannot proceed."
        return f"WARNING: {len(missing_parts)} spareparts missing, but no critical parts. Overhaul can proceed with procurement."

    # ------------------------------------------------------------------
    # Procurement planning
    # ------------------------------------------------------------------
    def optimize_procurement_timing(self, planned_overhauls: List[Tuple[str, int]]) -> Dict:
        """
        Optimize procurement timing for multiple equipment overhauls

        Overhauls are processed in month order; each availability check
        deducts the requirements of the overhauls processed before it.

        Args:
            planned_overhauls: List of (equipment_tag, planned_month) tuples

        Returns:
            Dict with ``total_procurement_cost``, the flat
            ``procurement_plan`` and the same entries grouped in
            ``procurement_by_month`` (keyed by ``order_by_month``).
        """
        procurement_plan = []
        total_procurement_cost = 0

        # Sort overhauls by month
        sorted_overhauls = sorted(planned_overhauls, key=lambda x: x[1])

        # Track cumulative procurement needs
        processed_overhauls = []
        for equipment_tag, target_month in sorted_overhauls:
            availability = self.check_sparepart_availability(
                equipment_tag, target_month, processed_overhauls
            )
            for procurement in availability['procurement_needed']:
                procurement_plan.append({
                    'equipment_tag': equipment_tag,
                    'target_overhaul_month': target_month,
                    **procurement
                })
                total_procurement_cost += procurement['total_cost']
            processed_overhauls.append((equipment_tag, target_month))

        # Group by order month for better planning
        procurement_by_month = defaultdict(list)
        for item in procurement_plan:
            procurement_by_month[item['order_by_month']].append(item)

        return {
            'total_procurement_cost': total_procurement_cost,
            'procurement_plan': procurement_plan,
            'procurement_by_month': dict(procurement_by_month),
            # 'summary': self._generate_procurement_summary(procurement_plan)
        }

    def _generate_procurement_summary(self, procurement_plan: List[Dict]) -> Dict:
        """Generate procurement summary statistics.

        Plan entries produced by ``check_sparepart_availability`` carry no
        ``supplier`` key, so entries without one are grouped under
        ``'Unknown'`` instead of raising ``KeyError``.
        """
        if not procurement_plan:
            return {'message': 'No procurement needed'}

        critical_items = [p for p in procurement_plan if p.get('criticality') == 'critical']

        # Group by supplier
        by_supplier = defaultdict(list)
        for item in procurement_plan:
            by_supplier[item.get('supplier', 'Unknown')].append(item)

        return {
            'total_items': len(procurement_plan),
            'critical_items': len(critical_items),
            'total_cost': sum(p['total_cost'] for p in procurement_plan),
            'unique_spareparts': len({p['sparepart_id'] for p in procurement_plan}),
            'suppliers_involved': len(by_supplier),
            'by_supplier': {
                supplier: {
                    'item_count': len(items),
                    'total_cost': sum(i['total_cost'] for i in items)
                }
                for supplier, items in by_supplier.items()
            }
        }

    def get_monthly_procurement_schedule(self) -> Dict[int, List[Dict]]:
        """Get procurement schedule by month.

        The schedule dict is created empty on first access; nothing in this
        class populates it.
        """
        if not hasattr(self, '_monthly_schedule'):
            self._monthly_schedule = {}
        return self._monthly_schedule

    def update_projected_stocks_with_consumption(self, equipment_overhauls: List[Tuple[str, int]]):
        """Return projected stocks with overhaul consumption applied.

        Works on a copy: ``self.projected_stocks`` is not modified.

        Args:
            equipment_overhauls: List of (equipment_tag, overhaul_month).

        Returns:
            ``{sparepart_id: [stock per month]}`` where each requirement is
            deducted from its overhaul month onwards, floored at zero.
        """
        # Create a copy of projected stocks
        updated_stocks = {
            sparepart_id: monthly_stocks.copy()
            for sparepart_id, monthly_stocks in self.projected_stocks.items()
        }

        # Apply consumption from overhauls
        for equipment_tag, overhaul_month in equipment_overhauls:
            for requirement in self.equipment_requirements.get(equipment_tag, []):
                levels = updated_stocks.get(requirement.sparepart_id)
                if levels is None:
                    continue
                # Reduce stock from overhaul month onwards. A negative month
                # is clamped to 0 so it cannot index from the end of the list.
                for month in range(max(0, overhaul_month), len(levels)):
                    levels[month] = max(0, levels[month] - requirement.quantity_required)

        return updated_stocks
# Integration functions for database operations
async def _load_sparepart_stocks(db_session, sparepart_manager) -> None:
    """Register one SparepartStock per inventory row found in wo_maxim_material."""
    query = text("""
        SELECT
            wm.inv_itemnum AS itemnum,
            wm.inv_itemsetid AS itemsetid,
            wm.inv_location AS location,
            MAX(wm.inv_curbaltotal) AS curbaltotal,
            AVG(wm.inv_avgcost) AS avgcost,
            COALESCE(mspl.description, 'No description available') AS description
        FROM public.wo_maxim_material wm
        LEFT JOIN public.maximo_sparepart_pr_po_line mspl
            ON wm.inv_itemnum = mspl.item_num
        WHERE wm.inv_itemnum IS NOT NULL
        GROUP BY wm.inv_itemnum, wm.inv_itemsetid, wm.inv_location, mspl.description
    """)
    log.info("Fetch sparepart")
    stock_rows = await db_session.execute(query)
    for row in stock_rows:
        # NOTE(review): the GROUP BY includes location and description, so the
        # same item number can be returned more than once; how duplicates are
        # handled depends on add_sparepart_stock -- confirm.
        sparepart_manager.add_sparepart_stock(
            SparepartStock(
                sparepart_id=row.itemnum,
                sparepart_name=row.description,
                current_stock=row.curbaltotal,
                unit_cost=row.avgcost,
                location=row.location or "Unknown",
            )
        )


async def _load_equipment_requirements(db_session, sparepart_manager) -> None:
    """Register per-location sparepart requirements derived from OH work order usage."""
    # The inventory average cost is aggregated inside location_sparepart_stats
    # and exposed as "avgcost". Joining the aggregated rows back to the raw
    # sparepart_usage rows (as was done before) repeated every
    # (location, item) requirement once per usage row, and the selected column
    # name did not match the "avgcost" attribute read below.
    query = text("""
        WITH oh_workorders AS (
            -- First, get all OH work orders
            SELECT DISTINCT
                wonum,
                asset_location
            FROM public.wo_staging_maximo_2
            WHERE worktype = 'OH'
                AND asset_location IS NOT NULL
                AND asset_unit IN ('3', '00')
                AND EXTRACT(YEAR FROM reportdate) >= 2019
        ),
        sparepart_usage AS (
            SELECT
                oh.asset_location,
                mwm.itemnum,
                mwm.itemqty,
                mwm.wonum,
                mwm.inv_avgcost
            FROM oh_workorders oh
            INNER JOIN public.wo_maxim_material mwm
                ON oh.wonum = mwm.wonum
        ),
        location_sparepart_stats AS (
            -- Calculate average usage (and inventory cost) per sparepart per location
            SELECT
                asset_location,
                itemnum,
                COUNT(DISTINCT wonum) as total_wo_count,
                SUM(itemqty) as total_qty_used,
                AVG(itemqty) as avg_qty_per_wo,
                MIN(itemqty) as min_qty_used,
                MAX(itemqty) as max_qty_used,
                AVG(inv_avgcost) as avgcost
            FROM sparepart_usage
            GROUP BY asset_location, itemnum
        ),
        pr_po_combined AS (
            -- Combine PR and PO data by num to get issue_date and delivery dates
            SELECT
                mspl.item_num,
                mspl.num,
                mspl.unit_cost,
                mspl.qty_ordered,
                MAX(CASE WHEN mspo.type = 'PR' THEN mspo.issue_date END) as issue_date,
                MAX(CASE WHEN mspo.type = 'PO' THEN mspo.vendeliverydate END) as vendeliverydate,
                MAX(CASE WHEN mspo.type = 'PO' THEN mspo.estimated_arrival_date END) as estimated_arrival_date
            FROM public.maximo_sparepart_pr_po_line mspl
            INNER JOIN public.maximo_sparepart_pr_po mspo
                ON mspl.num = mspo.num
            WHERE mspo.type IN ('PR', 'PO')
            GROUP BY mspl.item_num, mspl.num, mspl.unit_cost, mspl.qty_ordered
        ),
        leadtime_stats AS (
            -- Calculate lead time statistics for each item
            -- Prioritize vendeliverydate over estimated_arrival_date
            SELECT
                item_num,
                ROUND(CAST(AVG(
                    EXTRACT(EPOCH FROM (
                        COALESCE(vendeliverydate, estimated_arrival_date) - issue_date
                    )) / 86400 / 30.44
                ) AS NUMERIC), 1) as avg_leadtime_months,
                ROUND(CAST(MIN(
                    EXTRACT(EPOCH FROM (
                        COALESCE(vendeliverydate, estimated_arrival_date) - issue_date
                    )) / 86400 / 30.44
                ) AS NUMERIC), 1) as min_leadtime_months,
                ROUND(CAST(MAX(
                    EXTRACT(EPOCH FROM (
                        COALESCE(vendeliverydate, estimated_arrival_date) - issue_date
                    )) / 86400 / 30.44
                ) AS NUMERIC), 1) as max_leadtime_months,
                COUNT(*) as leadtime_sample_size,
                -- Additional metrics for transparency
                COUNT(CASE WHEN vendeliverydate IS NOT NULL THEN 1 END) as vendelivery_count,
                COUNT(CASE WHEN vendeliverydate IS NULL AND estimated_arrival_date IS NOT NULL THEN 1 END) as estimated_only_count
            FROM pr_po_combined
            WHERE issue_date IS NOT NULL
                AND COALESCE(vendeliverydate, estimated_arrival_date) IS NOT NULL
                AND COALESCE(vendeliverydate, estimated_arrival_date) > issue_date
            GROUP BY item_num
        ),
        cost_stats AS (
            -- Calculate cost statistics for each item
            SELECT
                item_num,
                ROUND(CAST(AVG(unit_cost) AS NUMERIC), 2) as avg_unit_cost,
                ROUND(CAST(MIN(unit_cost) AS NUMERIC), 2) as min_unit_cost,
                ROUND(CAST(MAX(unit_cost) AS NUMERIC), 2) as max_unit_cost,
                COUNT(*) as cost_sample_size,
                -- Total value statistics
                ROUND(CAST(AVG(unit_cost * qty_ordered) AS NUMERIC), 2) as avg_order_value,
                ROUND(CAST(SUM(unit_cost * qty_ordered) AS NUMERIC), 2) as total_value_ordered
            FROM pr_po_combined
            WHERE unit_cost IS NOT NULL AND unit_cost > 0
            GROUP BY item_num
        ),
        item_descriptions AS (
            -- Get unique descriptions for each item (optimized)
            SELECT DISTINCT
                item_num,
                FIRST_VALUE(description) OVER (
                    PARTITION BY item_num
                    ORDER BY created_at DESC NULLS LAST
                ) as description
            FROM public.maximo_sparepart_pr_po_line
            WHERE description IS NOT NULL
        )
        SELECT
            lss.asset_location,
            lss.itemnum,
            COALESCE(id.description, 'No description available') as item_description,
            lss.total_wo_count,
            lss.total_qty_used,
            ROUND(CAST(lss.avg_qty_per_wo AS NUMERIC), 2) as avg_qty_per_wo,
            lss.min_qty_used,
            lss.max_qty_used,
            lss.avgcost,
            -- Lead time metrics
            COALESCE(lt.avg_leadtime_months, 0) as avg_leadtime_months,
            COALESCE(lt.min_leadtime_months, 0) as min_leadtime_months,
            COALESCE(lt.max_leadtime_months, 0) as max_leadtime_months,
            COALESCE(lt.leadtime_sample_size, 0) as leadtime_sample_size,
            COALESCE(lt.vendelivery_count, 0) as vendelivery_count,
            COALESCE(lt.estimated_only_count, 0) as estimated_only_count,
            -- Cost metrics
            COALESCE(cs.avg_unit_cost, 0) as avg_unit_cost,
            COALESCE(cs.min_unit_cost, 0) as min_unit_cost,
            COALESCE(cs.max_unit_cost, 0) as max_unit_cost,
            COALESCE(cs.cost_sample_size, 0) as cost_sample_size,
            COALESCE(cs.avg_order_value, 0) as avg_order_value,
            COALESCE(cs.total_value_ordered, 0) as total_value_ordered,
            -- Estimated total cost for average OH
            ROUND(CAST(COALESCE(lss.avg_qty_per_wo * cs.avg_unit_cost, 0) AS NUMERIC), 2) as estimated_cost_per_oh
        FROM location_sparepart_stats lss
        LEFT JOIN item_descriptions id ON lss.itemnum = id.item_num
        LEFT JOIN leadtime_stats lt ON lss.itemnum = lt.item_num
        LEFT JOIN cost_stats cs ON lss.itemnum = cs.item_num
        ORDER BY lss.asset_location, lss.itemnum;
    """)
    requirement_rows = await db_session.execute(query)

    requirements_by_location = defaultdict(list)
    for row in requirement_rows:
        requirements_by_location[row.asset_location].append(
            SparepartRequirement(
                sparepart_id=row.itemnum,
                # AVG() yields NULL when every quantity/cost in the group is
                # NULL; treat that as zero instead of crashing on float(None).
                quantity_required=float(row.avg_qty_per_wo or 0),
                lead_time=float(row.avg_leadtime_months or 0),
                sparepart_name=row.item_description,
                unit_cost=float(row.avg_unit_cost or 0),
                avg_cost=float(row.avgcost or 0),
            )
        )

    for equipment_tag, requirements in requirements_by_location.items():
        sparepart_manager.add_equipment_requirements(equipment_tag, requirements)


async def _load_procurement_records(db_session, sparepart_manager) -> None:
    """Register a ProcurementRecord for every open PO line that has a matching PR."""
    # All CTEs share a single WITH keyword; repeating WITH before a later CTE
    # is a SQL syntax error.
    query = text("""
        WITH active_pos AS (
            -- Get all POs that are NOT complete (not in inventory yet) and NOT closed
            SELECT
                pl.item_num,
                h.num as po_number,
                pl.qty_received,
                pl.qty_ordered,
                h.estimated_arrival_date,
                h.vendeliverydate,
                h.receipts as po_receipts,
                h.status as po_status,
                pl.description,
                pl.unit_cost,
                pl.line_cost
            FROM public.maximo_sparepart_pr_po h
            JOIN public.maximo_sparepart_pr_po_line pl
                ON h.num = pl.num
            WHERE h.type = 'PO'
                -- Exclude POs where receipts = 'COMPLETE'
                AND (h.receipts IS NULL OR h.receipts != 'COMPLETE')
                -- Exclude closed POs
                AND (h.status IS NULL OR h.status = 'APPR')
        ),
        po_with_pr_date AS (
            -- Force join with PR to ensure every PO has a PR
            SELECT
                po.*,
                pr.issue_date as pr_issue_date
            FROM active_pos po
            INNER JOIN public.maximo_sparepart_pr_po pr
                ON pr.num = po.po_number
                AND pr.type = 'PR'
        ),
        item_inventory AS (
            SELECT
                itemnum,
                MAX(inv_curbaltotal) AS current_balance_total,
                AVG(inv_avgcost) AS avg_cost
            FROM public.wo_maxim_material
            WHERE inv_itemnum IS NOT NULL
            GROUP BY itemnum
        )
        SELECT
            po.item_num,
            po.description,
            po.line_cost,
            po.unit_cost,
            COALESCE(i.current_balance_total, 0) as current_balance_total,
            po.po_number,
            po.pr_issue_date,
            po.po_status,
            po.po_receipts,
            COALESCE(po.qty_received, 0) as po_qty_received,
            COALESCE(po.qty_ordered, 0) as po_qty_ordered,
            po.estimated_arrival_date as po_estimated_arrival_date,
            po.vendeliverydate as po_vendor_delivery_date
        FROM po_with_pr_date po
        LEFT JOIN item_inventory i
            ON po.item_num = i.itemnum
        ORDER BY po.item_num, po.pr_issue_date DESC;
    """)
    procurement_rows = await db_session.execute(query)

    for row in procurement_rows:
        ordered_qty = float(row.po_qty_ordered) if row.po_qty_ordered is not None else 0.0
        sparepart_manager.add_procurement_record(
            ProcurementRecord(
                po_pr_id=row.po_number,
                sparepart_id=row.item_num,
                sparepart_name=row.description,
                quantity=ordered_qty,
                unit_cost=row.unit_cost,
                total_cost=row.line_cost,
                order_date=row.pr_issue_date,
                expected_delivery_date=row.po_estimated_arrival_date,
                po_vendor_delivery_date=row.po_vendor_delivery_date,
                status=ProcurementStatus("ordered"),
            )
        )


async def load_sparepart_data_from_db(scope, prev_oh_scope, db_session, analysis_window_months = None) -> SparepartManager:
    """Build a SparepartManager populated from the database.

    Loads, in order: current sparepart stocks, per-location sparepart
    requirements derived from historical OH work orders, and open purchase
    orders. Monthly stock projection is then run on the manager.

    Args:
        scope: Overhaul scope being planned; its ``start_date`` is read.
        prev_oh_scope: Previous overhaul scope; its ``end_date`` is used as
            the analysis start date.
        db_session: Async database session used to execute the queries.
        analysis_window_months: Length of the analysis window in months. When
            falsy, it is derived from the gap between the two overhauls
            (days / 30), multiplied by 1.2 and truncated to an int.

    Returns:
        The populated SparepartManager.
    """
    analysis_start_date = prev_oh_scope.end_date
    if not analysis_window_months:
        gap_days = (scope.start_date - prev_oh_scope.end_date).days
        analysis_window_months = int((gap_days / 30) * 1.2)

    sparepart_manager = SparepartManager(analysis_start_date, analysis_window_months)

    await _load_sparepart_stocks(db_session, sparepart_manager)
    # NOTE: a disabled variant that derived requirements from the materials of
    # specific parent work orders used to live here as commented-out code; it
    # can be recovered from version control if that approach is revisited.
    await _load_equipment_requirements(db_session, sparepart_manager)
    await _load_procurement_records(db_session, sparepart_manager)

    # Projection needs stocks, requirements and open orders to be loaded first.
    sparepart_manager._project_monthly_stocks()
    return sparepart_manager
async def create_remark(*, db_session, collector_db_session, remark_in):
    """Create the remark for a spare part item, or overwrite the existing one.

    Args:
        db_session: Async session used to look up, persist and refresh the
            SparepartRemark row.
        collector_db_session: Accepted as part of the call signature; not
            used by this function.
        remark_in: Input object providing ``itemnum`` and ``remark``.

    Returns:
        The SparepartRemark instance after commit and refresh: the updated
        existing row when one matches ``remark_in.itemnum``, otherwise a
        newly inserted row.
    """
    lookup = select(SparepartRemark).where(SparepartRemark.itemnum == remark_in.itemnum)
    remark = (await db_session.execute(lookup)).scalar_one_or_none()

    if remark:
        # A remark already exists for this item: overwrite its text.
        remark.remark = remark_in.remark
    else:
        # No remark yet for this item: stage a new row.
        remark = SparepartRemark(
            itemnum=remark_in.itemnum,
            remark=remark_in.remark,
        )
        db_session.add(remark)

    await db_session.commit()
    await db_session.refresh(remark)
    return remark