You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

155 lines
5.1 KiB
Python

import logging
import random
from typing import Optional

from sqlalchemy import Delete, Select, and_, text
from sqlalchemy.orm import selectinload

from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate

from .model import ScopeEquipmentPart
from .schema import ScopeEquipmentActivityCreate, ScopeEquipmentActivityUpdate
# async def get(*, db_session: DbSession, scope_equipment_activity_id: str) -> Optional[ScopeEquipmentActivity]:
# """Returns a document based on the given document id."""
# result = await db_session.get(ScopeEquipmentActivity, scope_equipment_activity_id)
# return result
def create_dummy_parts(assetnum: str, count: int = 5) -> list:
    """
    Build placeholder spare-part records with random stock values.

    Each record is a plain dict (not a ScopeEquipmentPart instance) with two keys:
    "assetnum", formatted as "<assetnum>_PART_<i>" for i in 1..count, and
    "stock", a random integer between 1 and 100 inclusive.

    Args:
        assetnum (str): The base asset number used as the prefix of every part id.
        count (int): The number of parts to create. Default is 5. A value of
            zero or less yields an empty list.

    Returns:
        list: A list of dicts, one per dummy part, in ascending part-index order.
    """
    # One randint() call per part, in index order, so a seeded `random`
    # produces a reproducible sequence.
    return [
        {
            "assetnum": f"{assetnum}_PART_{index}",
            "stock": random.randint(1, 100),
        }
        for index in range(1, count + 1)
    ]
from sqlalchemy import text
from typing import Optional, List, Dict, Any
from datetime import datetime
def _float_or_none(value: Any) -> Optional[float]:
    """Convert a numeric database value to float, passing SQL NULL (None) through."""
    return None if value is None else float(value)


async def get_all(
    db_session: DbSession,
    location_tag: Optional[str] = None,
    start_year: int = 2023,
    end_year: Optional[int] = None
) -> List[Dict[str, Any]]:
    """
    Get overhaul ('OH') spare parts consumption data, grouped by location and item.

    Args:
        db_session: Database session used to execute the raw SQL query.
        location_tag: Optional location filter; when given, only work orders
            whose asset_location equals it are considered.
        start_year: Year to start analysis (default: 2023).
        end_year: Year to end analysis (default: start_year + 1).

    Returns:
        List of dictionaries with the keys "location_tag", "itemnum",
        "sparepart_name", "parts_consumed_in_oh", "min_remaining_balance" and
        "inv_curbaltotal". The last two are floats, or None when the
        aggregated database value is NULL.

    Raises:
        Exception: Any error raised while executing the query is logged and
            re-raised unchanged.

    NOTE(review): start_year and end_year are accepted but are not applied to
    the SQL below, so the result is not restricted by year. Add a date
    predicate once the relevant column is confirmed.
    """
    # Set end year if not provided (kept for the future year filter; see NOTE above).
    if end_year is None:
        end_year = start_year + 1

    # Build dynamic query: the first CTE narrows work orders to overhauls.
    base_query = """
    WITH filtered_wo AS (
        SELECT wonum, asset_location
        FROM public.wo_staging_maximo_2
        WHERE worktype = 'OH'
    """
    params: Dict[str, Any] = {}

    # Add location filter to CTE if provided. The value is passed as a bound
    # parameter, never interpolated into the SQL text.
    if location_tag:
        base_query += " AND asset_location = :location_tag"
        params["location_tag"] = location_tag

    # NOTE(review): the joins to maximo_inventory and
    # maximo_sparepart_pr_po_line may multiply rows if itemnum is not unique in
    # those tables, which would inflate COUNT(*) - confirm against the schema.
    base_query += """
    ),
    filtered_transactions AS (
        SELECT wonum, itemnum, curbal
        FROM public.maximo_material_use_transactions
        WHERE issuetype = 'ISSUE'
        AND wonum IN (SELECT wonum FROM filtered_wo)
    )
    SELECT
        fwo.asset_location AS location_tag,
        ft.itemnum,
        spl.description AS sparepart_name,
        COUNT(*) AS parts_consumed_in_oh,
        MIN(ft.curbal) AS min_remaining_balance,
        MAX(mi.curbaltotal) AS inv_curbaltotal
    FROM filtered_wo fwo
    INNER JOIN filtered_transactions ft ON fwo.wonum = ft.wonum
    INNER JOIN public.maximo_inventory mi ON ft.itemnum = mi.itemnum
    LEFT JOIN public.maximo_sparepart_pr_po_line spl ON ft.itemnum = spl.item_num
    GROUP BY fwo.asset_location, ft.itemnum, spl.description
    ORDER BY fwo.asset_location, ft.itemnum
    """
    query = text(base_query)

    try:
        results = await db_session.execute(query, params)
    except Exception as exc:
        # Record the failure with its traceback, then let the caller decide
        # how to handle it.
        logging.getLogger(__name__).exception("Database query error: %s", exc)
        raise

    # MIN()/MAX() yield NULL when every aggregated value is NULL, so the
    # numeric columns are converted defensively instead of calling float(None).
    return [
        {
            "location_tag": row.location_tag,
            "itemnum": row.itemnum,
            "sparepart_name": row.sparepart_name,
            "parts_consumed_in_oh": row.parts_consumed_in_oh,
            "min_remaining_balance": _float_or_none(row.min_remaining_balance),
            "inv_curbaltotal": _float_or_none(row.inv_curbaltotal),
        }
        for row in results
    ]
# async def create(*, db_session: DbSession, scope_equipment_activty_in: ScopeEquipmentActivityCreate):
# activity = ScopeEquipmentActivity(
# **scope_equipment_activty_in.model_dump())
# db_session.add(activity)
# await db_session.commit()
# return activity
# async def update(*, db_session: DbSession, activity: ScopeEquipmentActivity, scope_equipment_activty_in: ScopeEquipmentActivityUpdate):
# """Updates a document."""
# data = scope_equipment_activty_in.model_dump()
# update_data = scope_equipment_activty_in.model_dump(exclude_defaults=True)
# for field in data:
# if field in update_data:
# setattr(activity, field, update_data[field])
# await db_session.commit()
# return activity
# async def delete(*, db_session: DbSession, scope_equipment_activity_id: str):
# """Deletes a document."""
# activity = await db_session.get(ScopeEquipmentActivity, scope_equipment_activity_id)
# await db_session.delete(activity)
# await db_session.commit()