You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

98 lines
3.2 KiB
Python

import random
from typing import Optional
from sqlalchemy import Delete, Select, and_
from sqlalchemy.orm import selectinload
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.scope_equipment.model import MasterEquipment, MasterEquipmentTree
from src.scope_equipment.service import get_equipment_level_by_no
from .model import ScopeEquipmentJob
from .schema import ScopeEquipmentJobCreate
from src.overhaul_job.model import OverhaulJob
from src.overhaul_activity.model import OverhaulActivity
# async def get(*, db_session: DbSession, scope_equipment_activity_id: str) -> Optional[ScopeEquipmentActivity]:
# """Returns a document based on the given document id."""
# result = await db_session.get(ScopeEquipmentActivity, scope_equipment_activity_id)
# return result
async def get_all(db_session: DbSession, assetnum: Optional[str], common):
    """Return the scope-equipment jobs registered for one asset.

    The asset number is validated first: it must be non-empty and must match
    an existing ``MasterEquipment`` row. The job query is then handed to
    ``search_filter_sort_paginate`` together with the ``common`` arguments.

    Args:
        db_session: Session whose awaited ``scalar`` call performs the
            equipment lookup.
        assetnum: Asset number whose jobs are requested.
        common: Mapping unpacked as keyword arguments into
            ``search_filter_sort_paginate``.

    Returns:
        Whatever ``search_filter_sort_paginate`` returns for the job query.

    Raises:
        ValueError: If ``assetnum`` is empty/``None``, or if no equipment
            with that asset number exists.
    """
    if not assetnum:
        raise ValueError("assetnum parameter is required")

    # Reject unknown assets before building the job query.
    parent_query = Select(MasterEquipment).where(MasterEquipment.assetnum == assetnum)
    parent = await db_session.scalar(parent_query)
    if not parent:
        raise ValueError(f"No equipment found with assetnum: {assetnum}")

    # Eager-load the linked job, plus the chain
    # overhaul_jobs -> overhaul_activity -> overhaul_scope.
    eager_job = selectinload(ScopeEquipmentJob.job)
    eager_overhaul_chain = (
        selectinload(ScopeEquipmentJob.overhaul_jobs)
        .selectinload(OverhaulJob.overhaul_activity)
        .selectinload(OverhaulActivity.overhaul_scope)
    )
    jobs_query = (
        Select(ScopeEquipmentJob)
        .where(ScopeEquipmentJob.assetnum == assetnum)
        .options(eager_job, eager_overhaul_chain)
    )

    return await search_filter_sort_paginate(model=jobs_query, **common)
async def create(
    *, db_session: DbSession, assetnum, scope_job_in: ScopeEquipmentJobCreate
):
    """Attach the requested jobs to the equipment identified by ``assetnum``.

    One ``ScopeEquipmentJob`` is created per entry in
    ``scope_job_in.job_ids`` and all of them are committed together.

    Args:
        db_session: Session used for the equipment lookup and the insert.
        assetnum: Asset number of the equipment the jobs are attached to.
        scope_job_in: Payload whose ``job_ids`` are the jobs to attach.

    Returns:
        None.

    Raises:
        ValueError: If ``assetnum`` is empty/``None``, or if no equipment
            with that asset number exists.
    """
    if not assetnum:
        raise ValueError("assetnum parameter is required")

    # Validate that the parent equipment exists before inserting anything.
    equipment_stmt = Select(MasterEquipment).where(MasterEquipment.assetnum == assetnum)
    equipment = await db_session.scalar(equipment_stmt)
    if not equipment:
        raise ValueError(f"No equipment found with assetnum: {assetnum}")

    scope_jobs = [
        ScopeEquipmentJob(assetnum=assetnum, job_id=job_id)
        for job_id in scope_job_in.job_ids
    ]
    db_session.add_all(scope_jobs)

    try:
        await db_session.commit()
    except Exception:
        # A failed flush/commit leaves the session in a failed transaction;
        # roll back so it stays usable, then surface the original error.
        await db_session.rollback()
        raise
# async def update(*, db_session: DbSession, activity: ScopeEquipmentActivity, scope_equipment_activty_in: ScopeEquipmentActivityUpdate):
# """Updates a document."""
# data = scope_equipment_activty_in.model_dump()
# update_data = scope_equipment_activty_in.model_dump(exclude_defaults=True)
# for field in data:
# if field in update_data:
# setattr(activity, field, update_data[field])
# await db_session.commit()
# return activity
async def delete(*, db_session: DbSession, scope_job_id):
    """Delete the ``ScopeEquipmentJob`` with the given primary key.

    Args:
        db_session: Session used to load, delete and commit.
        scope_job_id: Primary key of the ``ScopeEquipmentJob`` to remove.

    Raises:
        ValueError: If no ``ScopeEquipmentJob`` with ``scope_job_id`` exists.
    """
    scope_job = await db_session.get(ScopeEquipmentJob, scope_job_id)
    if scope_job is None:
        # Passing None to Session.delete fails with an opaque
        # UnmappedInstanceError; report the missing row explicitly instead.
        raise ValueError(f"No scope equipment job found with id: {scope_job_id}")

    await db_session.delete(scope_job)
    try:
        await db_session.commit()
    except Exception:
        # Roll back so the session is not left in a failed transaction,
        # then surface the original error.
        await db_session.rollback()
        raise