You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
126 lines
3.6 KiB
Python
126 lines
3.6 KiB
Python
from typing import Optional
|
|
|
|
from fastapi import HTTPException, status
|
|
from sqlalchemy import Delete, Select, func
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from src.auth.service import CurrentUser
|
|
from src.database.core import DbSession
|
|
from src.database.service import search_filter_sort_paginate
|
|
# from src.scope_equipment_job.model import ScopeEquipmentJob
|
|
# from src.overhaul_activity.model import OverhaulActivity
|
|
|
|
from src.workscope_group.model import MasterActivity
|
|
from src.workscope_group_maintenance_type.model import WorkscopeOHType
|
|
from src.overhaul_scope.model import MaintenanceType
|
|
from .model import EquipmentWorkscopeGroup
|
|
from .schema import OverhaulJobCreate
|
|
|
|
|
|
async def get_all(*, common, location_tag: str, scope: Optional[str] = None):
    """Fetch the ``EquipmentWorkscopeGroup`` rows for a single location tag.

    When ``scope`` is truthy the statement is additionally joined along
    ``EquipmentWorkscopeGroup.workscope_group``, ``MasterActivity.oh_types``
    and ``WorkscopeOHType.oh_type``, and restricted to rows where
    ``MaintenanceType.name`` equals ``scope``.

    Args:
        common: Mapping of keyword arguments forwarded unchanged to
            ``search_filter_sort_paginate``.
        location_tag: Value compared against
            ``EquipmentWorkscopeGroup.location_tag``.
        scope: Optional ``MaintenanceType.name`` to filter by; ``None`` or an
            empty string skips the joins and the filter.

    Returns:
        The result of ``search_filter_sort_paginate`` for the built statement.
    """
    stmt = Select(EquipmentWorkscopeGroup).where(
        EquipmentWorkscopeGroup.location_tag == location_tag
    )

    if scope:
        # Build the join chain one step at a time, in the same order as the
        # relationships are traversed.
        stmt = stmt.join(EquipmentWorkscopeGroup.workscope_group)
        stmt = stmt.join(MasterActivity.oh_types)
        stmt = stmt.join(WorkscopeOHType.oh_type)
        stmt = stmt.filter(MaintenanceType.name == scope)

    return await search_filter_sort_paginate(model=stmt, **common)
|
|
|
|
|
|
async def create(
    *, db_session: DbSession, overhaul_equipment_id, overhaul_job_in: OverhaulJobCreate
):
    """Create one ``OverhaulJob`` row per submitted job id.

    Args:
        db_session: Session used to stage and commit the new rows.
        overhaul_equipment_id: Stored as ``overhaul_activity_id`` on every
            new row. Must be truthy.
        overhaul_job_in: Payload whose ``job_ids`` entries are each stored as
            ``scope_equipment_job_id`` on a new row.

    Returns:
        ``overhaul_job_in.job_ids``, unchanged.

    Raises:
        ValueError: If ``overhaul_equipment_id`` is falsy.
    """
    if not overhaul_equipment_id:
        raise ValueError("overhaul_equipment_id parameter is required")

    # NOTE(review): OverhaulJob is not among the imports visible at the top of
    # this module - confirm it is brought into scope, otherwise this raises
    # NameError at call time.
    overhaul_jobs = [
        OverhaulJob(
            overhaul_activity_id=overhaul_equipment_id,
            scope_equipment_job_id=job_id,
        )
        for job_id in overhaul_job_in.job_ids
    ]

    db_session.add_all(overhaul_jobs)
    try:
        await db_session.commit()
    except Exception:
        # Mirror delete(): roll the session back before propagating the
        # original error to the caller.
        await db_session.rollback()
        raise

    return overhaul_job_in.job_ids
|
|
|
|
|
|
async def delete(
    *,
    db_session: DbSession,
    overhaul_job_id: str,
) -> bool:
    """Delete the ``OverhaulJob`` with the given id.

    Args:
        db_session: Session used to look up, delete and commit.
        overhaul_job_id: Primary key passed to ``db_session.get``.

    Returns:
        bool: Always ``True`` when the row was found and the delete was
        committed; every failure is reported by raising instead.

    Raises:
        HTTPException: With status 404 when no row has that id.
        Exception: Any other error from the session is re-raised after the
            session has been rolled back.
    """
    try:
        # NOTE(review): OverhaulJob is not among the imports visible at the
        # top of this module - confirm it is brought into scope.
        scope_job = await db_session.get(OverhaulJob, overhaul_job_id)
        if not scope_job:
            # Raised inside the try on purpose: the handler below rolls the
            # session back for this case as well before re-raising.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="A data with this id does not exist.",
            )

        await db_session.delete(scope_job)
        await db_session.commit()

        return True

    except Exception:
        await db_session.rollback()
        raise
|
|
|
|
|
|
# async def update(*, db_session: DbSession, scope: OverhaulScope, scope_in: ScopeUpdate):
|
|
# """Updates a document."""
|
|
# data = scope_in.model_dump()
|
|
|
|
# update_data = scope_in.model_dump(exclude_defaults=True)
|
|
|
|
# for field in data:
|
|
# if field in update_data:
|
|
# setattr(scope, field, update_data[field])
|
|
|
|
# await db_session.commit()
|
|
|
|
# return scope
|
|
|
|
|
|
# async def delete(*, db_session: DbSession, scope_id: str):
|
|
# """Deletes a document."""
|
|
# query = Delete(OverhaulScope).where(OverhaulScope.id == scope_id)
|
|
# await db_session.execute(query)
|
|
# await db_session.commit()
|