From 1a5d3c2ec88fd0267a002c7fef8905b1b1bc7853 Mon Sep 17 00:00:00 2001 From: Cizz22 Date: Fri, 17 Oct 2025 09:33:55 +0700 Subject: [PATCH] fix --- src/equipment_sparepart/service.py | 110 +++++++++++++++-------------- 1 file changed, 58 insertions(+), 52 deletions(-) diff --git a/src/equipment_sparepart/service.py b/src/equipment_sparepart/service.py index 70238b1..4d313dd 100644 --- a/src/equipment_sparepart/service.py +++ b/src/equipment_sparepart/service.py @@ -126,91 +126,97 @@ logger = logging.getLogger(__name__) # logger.exception(f"Database query failed: {e}") # raise RuntimeError("Failed to fetch overhaul spare parts data.") from e +from typing import List, Dict, Any, Optional +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.sql import text + async def get_all( - db_session, + db_session: AsyncSession, location_tag: Optional[str] = None, start_year: int = 2023, end_year: Optional[int] = None ) -> List[Dict[str, Any]]: """ - Get overhaul spare parts consumption data with optimized query + Get overhaul spare parts consumption data with optimized query. Args: - db_session: Database session - location_tag: Optional location filter - start_year: Year to start analysis (default: 2023) - end_year: Year to end analysis (default: start_year + 1) + db_session: SQLAlchemy async database session + location_tag: Optional filter for location (asset_location) + start_year: Starting year (default: 2023) + end_year: Ending year (default: start_year + 1) Returns: - List of dictionaries containing spare parts consumption data + List of dictionaries with spare parts consumption data """ - - # Set end year if not provided + # Set default end year if end_year is None: end_year = start_year + 1 - - # Build dynamic query - base_query = """ + + # Build query dynamically + query_str = """ WITH filtered_wo AS ( - SELECT wonum, asset_location - FROM public.wo_staging_maximo_2 + SELECT + wonum, + asset_location + FROM public.wo_staging_maximo_2 WHERE worktype = 'OH' """ - + params = {} - - # Add location filter to CTE if provided + + # Optional filter for location if location_tag: - base_query += " AND asset_location = :location_tag" + query_str += " AND asset_location = :location_tag" params["location_tag"] = location_tag - - # filtered_materials AS ( - # SELECT wonum, itemnum, itemqty, inv_curbaltotal, inv_avgcost - # FROM public.wo_maxim_material - # WHERE wonum IN (SELECT wonum FROM filtered_wo) - # ) - - base_query += """ + + query_str += """ ), filtered_materials AS ( - SELECT wonum, itemnum, itemqty, inv_curbaltotal, inv_avgcost - FROM public.maximo_workorder_materials mat - JOIN public.maximo_inventory AS inv on inv.itemnum = mat.itemnum - WHERE wonum IN (SELECT wonum FROM filtered_wo) - ) + SELECT + mat.wonum, + mat.itemnum, + mat.itemqty, + inv.curbaltotal AS inv_curbaltotal, + inv.avgcost AS inv_avgcost + FROM public.maximo_workorder_materials AS mat + LEFT JOIN public.maximo_inventory AS inv + ON inv.itemnum = mat.itemnum + WHERE mat.wonum IN (SELECT wonum FROM filtered_wo) + ) SELECT fwo.asset_location AS location_tag, ft.itemnum, - spl.description AS sparepart_name, - COALESCE(AVG(ft.itemqty), 0) AS parts_consumed_in_oh, - COALESCE(AVG(ft.inv_avgcost), 0) AS avgcost, - COALESCE(AVG(ft.inv_curbaltotal), 0) AS inv_curbaltotal - FROM filtered_wo fwo - INNER JOIN filtered_materials ft ON fwo.wonum = ft.wonum - LEFT JOIN public.maximo_sparepart_pr_po_line spl ON ft.itemnum = spl.item_num + COALESCE(spl.description, 'Unknown') AS sparepart_name, + SUM(ft.itemqty) AS total_parts_used, + COALESCE(AVG(ft.inv_avgcost), 0) AS avg_cost, + COALESCE(AVG(ft.inv_curbaltotal), 0) AS avg_inventory_balance + FROM filtered_wo AS fwo + INNER JOIN filtered_materials AS ft + ON fwo.wonum = ft.wonum + LEFT JOIN public.maximo_sparepart_pr_po_line AS spl + ON ft.itemnum = spl.item_num GROUP BY fwo.asset_location, ft.itemnum, spl.description - ORDER BY fwo.asset_location, ft.itemnum + ORDER BY fwo.asset_location, ft.itemnum; """ - - query = text(base_query) - + try: - results = await db_session.execute(query, params) - + result = await db_session.execute(text(query_str), params) + rows = result.fetchall() + equipment_parts = [] - for row in results: + for row in rows: equipment_parts.append({ "location_tag": row.location_tag, "itemnum": row.itemnum, "sparepart_name": row.sparepart_name, - "parts_consumed_in_oh": row.parts_consumed_in_oh, - "inv_curbaltotal": float(row.inv_curbaltotal) + "total_parts_used": float(row.total_parts_used or 0), + "avg_cost": float(row.avg_cost or 0), + "avg_inventory_balance": float(row.avg_inventory_balance or 0), }) - + return equipment_parts - + except Exception as e: - # Log the error appropriately in your application - print(f"Database query error: {e}") - raise \ No newline at end of file + print(f"[get_all] Database query error: {e}") + raise