import random
from typing import Any, Dict, List, Optional

from sqlalchemy import Delete, Select, and_
from sqlalchemy.orm import selectinload

from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.scope_equipment.model import MasterEquipment, MasterEquipmentTree
from src.scope_equipment.service import get_equipment_level_by_no

from .model import ScopeEquipmentPart
from .schema import ScopeEquipmentActivityCreate, ScopeEquipmentActivityUpdate

# async def get(*, db_session: DbSession, scope_equipment_activity_id: str) -> Optional[ScopeEquipmentActivity]:
#     """Returns a document based on the given document id."""
#     result = await db_session.get(ScopeEquipmentActivity, scope_equipment_activity_id)
#     return result


def create_dummy_parts(assetnum: str, count: int = 5) -> List[Dict[str, Any]]:
    """Build placeholder part records with random stock values.

    The records are plain dictionaries, not ``ScopeEquipmentPart`` instances.

    Args:
        assetnum: Base asset number used as the prefix of every generated
            part identifier.
        count: Number of records to generate. Values below 1 yield an
            empty list.

    Returns:
        A list of ``count`` dictionaries, each with the keys ``"assetnum"``
        (formatted as ``"<assetnum>_PART_<i>"`` with ``i`` starting at 1)
        and ``"stock"`` (a random integer between 1 and 100 inclusive).
    """
    return [
        {
            "assetnum": f"{assetnum}_PART_{index}",
            "stock": random.randint(1, 100),
        }
        for index in range(1, count + 1)
    ]


async def get_all(db_session: DbSession, assetnum: Optional[str]) -> List[Dict[str, Any]]:
    """Return the parts for an equipment; currently ten dummy records.

    The database query is disabled below, so ``db_session`` is unused and
    the result comes entirely from ``create_dummy_parts``.

    Args:
        db_session: Database session; not used while the query is disabled.
        assetnum: Asset number of the parent equipment.

    Returns:
        Ten dictionaries with the keys ``"assetnum"`` and ``"stock"``.
    """
    # NOTE(review): ``assetnum`` may be None. It is forwarded unchecked, so
    # the generated identifiers then start with the text "None". The only
    # validation lives in the disabled code below -- confirm whether it
    # should be reinstated.
    dummy_parts = create_dummy_parts(assetnum, count=10)

    # if not assetnum:
    #     raise ValueError("assetnum parameter is required")

    # db_session: DbSession = common.get("db_session")

    # # First get the parent equipment
    # equipment_stmt = Select(MasterEquipment).where(
    #     MasterEquipment.assetnum == assetnum)

    # equipment: MasterEquipment = await db_session.scalar(equipment_stmt)

    # if not equipment:
    #     raise ValueError(f"No equipment found with assetnum: {assetnum}")

    # # Build query for parts
    # stmt = (
    #     Select(ScopeEquipmentPart)
    #     .join(ScopeEquipmentPart.master_equipments)
    #     .join(MasterEquipment.equipment_tree)
    #     .where(
    #         and_(
    #             MasterEquipment.parent_id == equipment.id,
    #             MasterEquipmentTree.level_no == 4
    #         )
    #     ).options(selectinload(ScopeEquipmentPart.master_equipments))
    # )

    # results = await search_filter_sort_paginate(model=stmt, **common)

    return dummy_parts


# async def create(*, db_session: DbSession, scope_equipment_activty_in: ScopeEquipmentActivityCreate):
#     activity = ScopeEquipmentActivity(
#         **scope_equipment_activty_in.model_dump())
#     db_session.add(activity)
#     await db_session.commit()
#     return activity


# async def update(*, db_session: DbSession, activity: ScopeEquipmentActivity, scope_equipment_activty_in: ScopeEquipmentActivityUpdate):
#     """Updates a document."""
#     data = scope_equipment_activty_in.model_dump()

#     update_data = scope_equipment_activty_in.model_dump(exclude_defaults=True)

#     for field in data:
#         if field in update_data:
#             setattr(activity, field, update_data[field])

#     await db_session.commit()

#     return activity


# async def delete(*, db_session: DbSession, scope_equipment_activity_id: str):
#     """Deletes a document."""
#     activity = await db_session.get(ScopeEquipmentActivity, scope_equipment_activity_id)
#     await db_session.delete(activity)
#     await db_session.commit()