Cizz22 3 months ago
parent 8c7b676d86
commit 1a5d3c2ec8

@ -126,91 +126,97 @@ logger = logging.getLogger(__name__)
# logger.exception(f"Database query failed: {e}")
# raise RuntimeError("Failed to fetch overhaul spare parts data.") from e
from typing import List, Dict, Any, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql import text
async def get_all(
db_session,
db_session: AsyncSession,
location_tag: Optional[str] = None,
start_year: int = 2023,
end_year: Optional[int] = None
) -> List[Dict[str, Any]]:
"""
Get overhaul spare parts consumption data with optimized query
Get overhaul spare parts consumption data with optimized query.
Args:
db_session: Database session
location_tag: Optional location filter
start_year: Year to start analysis (default: 2023)
end_year: Year to end analysis (default: start_year + 1)
db_session: SQLAlchemy async database session
location_tag: Optional filter for location (asset_location)
start_year: Starting year (default: 2023)
end_year: Ending year (default: start_year + 1)
Returns:
List of dictionaries containing spare parts consumption data
List of dictionaries with spare parts consumption data
"""
# Set end year if not provided
# Set default end year
if end_year is None:
end_year = start_year + 1
# Build dynamic query
base_query = """
# Build query dynamically
query_str = """
WITH filtered_wo AS (
SELECT wonum, asset_location
SELECT
wonum,
asset_location
FROM public.wo_staging_maximo_2
WHERE worktype = 'OH'
"""
params = {}
# Add location filter to CTE if provided
# Optional filter for location
if location_tag:
base_query += " AND asset_location = :location_tag"
query_str += " AND asset_location = :location_tag"
params["location_tag"] = location_tag
# filtered_materials AS (
# SELECT wonum, itemnum, itemqty, inv_curbaltotal, inv_avgcost
# FROM public.wo_maxim_material
# WHERE wonum IN (SELECT wonum FROM filtered_wo)
# )
base_query += """
query_str += """
),
filtered_materials AS (
SELECT wonum, itemnum, itemqty, inv_curbaltotal, inv_avgcost
FROM public.maximo_workorder_materials mat
JOIN public.maximo_inventory AS inv on inv.itemnum = mat.itemnum
WHERE wonum IN (SELECT wonum FROM filtered_wo)
)
SELECT
mat.wonum,
mat.itemnum,
mat.itemqty,
inv.curbaltotal AS inv_curbaltotal,
inv.avgcost AS inv_avgcost
FROM public.maximo_workorder_materials AS mat
LEFT JOIN public.maximo_inventory AS inv
ON inv.itemnum = mat.itemnum
WHERE mat.wonum IN (SELECT wonum FROM filtered_wo)
)
SELECT
fwo.asset_location AS location_tag,
ft.itemnum,
spl.description AS sparepart_name,
COALESCE(AVG(ft.itemqty), 0) AS parts_consumed_in_oh,
COALESCE(AVG(ft.inv_avgcost), 0) AS avgcost,
COALESCE(AVG(ft.inv_curbaltotal), 0) AS inv_curbaltotal
FROM filtered_wo fwo
INNER JOIN filtered_materials ft ON fwo.wonum = ft.wonum
LEFT JOIN public.maximo_sparepart_pr_po_line spl ON ft.itemnum = spl.item_num
COALESCE(spl.description, 'Unknown') AS sparepart_name,
SUM(ft.itemqty) AS total_parts_used,
COALESCE(AVG(ft.inv_avgcost), 0) AS avg_cost,
COALESCE(AVG(ft.inv_curbaltotal), 0) AS avg_inventory_balance
FROM filtered_wo AS fwo
INNER JOIN filtered_materials AS ft
ON fwo.wonum = ft.wonum
LEFT JOIN public.maximo_sparepart_pr_po_line AS spl
ON ft.itemnum = spl.item_num
GROUP BY fwo.asset_location, ft.itemnum, spl.description
ORDER BY fwo.asset_location, ft.itemnum
ORDER BY fwo.asset_location, ft.itemnum;
"""
query = text(base_query)
try:
results = await db_session.execute(query, params)
result = await db_session.execute(text(query_str), params)
rows = result.fetchall()
equipment_parts = []
for row in results:
for row in rows:
equipment_parts.append({
"location_tag": row.location_tag,
"itemnum": row.itemnum,
"sparepart_name": row.sparepart_name,
"parts_consumed_in_oh": row.parts_consumed_in_oh,
"inv_curbaltotal": float(row.inv_curbaltotal)
"total_parts_used": float(row.total_parts_used or 0),
"avg_cost": float(row.avg_cost or 0),
"avg_inventory_balance": float(row.avg_inventory_balance or 0),
})
return equipment_parts
except Exception as e:
# Log the error appropriately in your application
print(f"Database query error: {e}")
print(f"[get_all] Database query error: {e}")
raise
Loading…
Cancel
Save