add sparepart eq

feature/reliability_stat
Cizz22 4 months ago
parent 121e41a36e
commit 4807b87dab

@ -25,6 +25,7 @@ from src.equipment_workscope_group.router import router as equipment_workscope_g
# from src.overhaul_schedule.router import router as overhaul_schedule_router
from src.overhaul_gantt.router import router as gantt_router
from src.sparepart.router import router as sparepart_router
from src.equipment_sparepart.router import router as equipment_sparepart_router
# from src.overhaul_scope.router import router as scope_router
# from src.scope_equipment.router import router as scope_equipment_router
@ -99,6 +100,12 @@ authenticated_api_router.include_router(
prefix="/equipment-workscopes",
tags=["equipment_workscope_groups"],
)
authenticated_api_router.include_router(
equipment_sparepart_router,
prefix="/equipment-spareparts",
tags=["equipment_workscope_sparepart_router"]
)
# authenticated_api_router.include_router(
# scope_equipment_job_router,
# prefix="/scope-equipment-jobs",

@ -2,6 +2,7 @@ from typing import Dict, List
from fastapi import APIRouter, HTTPException, Query, status
from src.database.core import CollectorDbSession
from src.database.service import (CommonParameters, DbSession,
search_filter_sort_paginate)
from src.models import StandardResponse
@ -14,11 +15,11 @@ from .service import get_all
router = APIRouter()
@router.get("/{assetnum}", response_model=StandardResponse[List[Dict]])
async def get_scope_equipment_parts(db_session: DbSession, assetnum):
@router.get("/{location_tag}", response_model=StandardResponse[List[Dict]])
async def get_scope_equipment_parts(collector_db_session: CollectorDbSession, location_tag):
"""Get all scope activity pagination."""
# return
data = await get_all(db_session=db_session, assetnum=assetnum)
data = await get_all(db_session=collector_db_session, location_tag=location_tag)
return StandardResponse(
data=data,

@ -1,7 +1,7 @@
import random
from typing import Optional
from sqlalchemy import Delete, Select, and_
from sqlalchemy import Delete, Select, and_, text
from sqlalchemy.orm import selectinload
from src.auth.service import CurrentUser
@ -37,14 +37,95 @@ def create_dummy_parts(assetnum: str, count: int = 5):
return parts
async def get_all(db_session: DbSession, assetnum: Optional[str]):
# Example usage
dummy_parts = create_dummy_parts(assetnum, count=10)
return dummy_parts
from sqlalchemy import text
from typing import Optional, List, Dict, Any
from datetime import datetime
async def get_all(
db_session: DbSession,
location_tag: Optional[str] = None,
start_year: int = 2023,
end_year: Optional[int] = None
) -> List[Dict[str, Any]]:
"""
Get overhaul spare parts consumption data with optimized query
Args:
db_session: Database session
location_tag: Optional location filter
start_year: Year to start analysis (default: 2023)
end_year: Year to end analysis (default: start_year + 1)
Returns:
List of dictionaries containing spare parts consumption data
"""
# Set end year if not provided
if end_year is None:
end_year = start_year + 1
# Build dynamic query
base_query = """
WITH filtered_wo AS (
SELECT wonum, asset_location
FROM public.wo_staging_maximo_2
WHERE worktype = 'OH'
AND actstart >= '2023-01-01'
AND actstart < '2024-01-01'
"""
params = {}
# Add location filter to CTE if provided
if location_tag:
base_query += " AND asset_location = :location_tag"
params["location_tag"] = location_tag
base_query += """
),
filtered_transactions AS (
SELECT wonum, itemnum, curbal
FROM public.maximo_material_use_transactions
WHERE issuetype = 'ISSUE'
AND wonum IN (SELECT wonum FROM filtered_wo)
)
SELECT
fwo.asset_location AS location_tag,
ft.itemnum,
spl.description AS sparepart_name,
COUNT(*) AS parts_consumed_in_oh,
MIN(ft.curbal) AS min_remaining_balance,
MAX(mi.curbaltotal) AS inv_curbaltotal
FROM filtered_wo fwo
INNER JOIN filtered_transactions ft ON fwo.wonum = ft.wonum
INNER JOIN public.maximo_inventory mi ON ft.itemnum = mi.itemnum
LEFT JOIN public.maximo_sparepart_pr_po_line spl ON ft.itemnum = spl.item_num
GROUP BY fwo.asset_location, ft.itemnum, spl.description
ORDER BY fwo.asset_location, ft.itemnum
"""
query = text(base_query)
try:
results = await db_session.execute(query, params)
equipment_parts = []
for row in results:
equipment_parts.append({
"location_tag": row.location_tag,
"itemnum": row.itemnum,
"sparepart_name": row.sparepart_name,
"parts_consumed_in_oh": row.parts_consumed_in_oh,
"min_remaining_balance": float(row.min_remaining_balance),
"inv_curbaltotal": float(row.inv_curbaltotal)
})
return equipment_parts
except Exception as e:
# Log the error appropriately in your application
print(f"Database query error: {e}")
raise
# async def create(*, db_session: DbSession, scope_equipment_activty_in: ScopeEquipmentActivityCreate):
# activity = ScopeEquipmentActivity(
# **scope_equipment_activty_in.model_dump())

Loading…
Cancel
Save