You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
135 lines
4.5 KiB
Python
135 lines
4.5 KiB
Python
|
|
|
|
from fastapi import HTTPException, status
|
|
from sqlalchemy import Select, Delete, desc, func, not_, insert
|
|
|
|
from src.workorder.model import MasterWorkOrder
|
|
from .model import ScopeEquipment, MasterEquipment
|
|
from src.scope.service import get_by_scope_name as get_scope_by_name_service
|
|
from .schema import ScopeEquipmentCreate, ScopeEquipmentUpdate
|
|
from typing import Optional, Union
|
|
from sqlalchemy.orm import selectinload
|
|
from src.database.service import CommonParameters, search_filter_sort_paginate
|
|
|
|
from src.database.core import DbSession
|
|
from src.auth.service import CurrentUser
|
|
|
|
|
|
async def get(*, db_session: DbSession, scope_equipment_id: str) -> Optional[ScopeEquipment]:
    """Fetch a single scope-equipment row by its id.

    Returns the matching ``ScopeEquipment``, or ``None`` when no row has
    that id.
    """
    matches_id = ScopeEquipment.id == scope_equipment_id
    rows = await db_session.execute(Select(ScopeEquipment).filter(matches_id))
    return rows.scalars().one_or_none()
|
|
|
|
|
|
async def get_all(*, db_session: DbSession, common, scope_name: Optional[str] = None, exclude: bool = False):
    """Return scope-equipment rows, newest first, via the shared paginator.

    Args:
        db_session: Session used to resolve ``scope_name`` to a scope.
        common: Mapping of keyword arguments forwarded unchanged to
            ``search_filter_sort_paginate``.
        scope_name: Optional scope name to filter on. When omitted (or
            empty) no scope filter is applied.
        exclude: Only meaningful together with ``scope_name``. When true,
            keep rows whose ``scope_id`` differs from the named scope;
            otherwise keep rows whose ``scope_id`` equals it.

    Returns:
        Whatever ``search_filter_sort_paginate`` returns for the built query.

    Raises:
        HTTPException: 404 when ``scope_name`` is given but no such scope
            exists.
    """
    query = (
        Select(ScopeEquipment)
        .options(
            selectinload(ScopeEquipment.scope),
            selectinload(ScopeEquipment.master_equipment),
        )
        .order_by(desc(ScopeEquipment.created_at))
    )

    if scope_name:
        scope = await get_scope_by_name_service(db_session=db_session, scope_name=scope_name)

        if not scope:
            # Without this guard an unknown scope name reached ``scope.id``
            # below and failed with AttributeError instead of a clean 404.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="A scope with this name does not exist.",
            )

        if exclude:
            query = query.filter(ScopeEquipment.scope_id != scope.id)
        else:
            query = query.filter(ScopeEquipment.scope_id == scope.id)

    return await search_filter_sort_paginate(model=query, **common)
|
|
|
|
|
|
async def create(*, db_session: DbSession, scope_equipment_in: ScopeEquipmentCreate):
    """Attach every asset number in the payload to the named scope.

    Each asset number is upserted into ``ScopeEquipment``: a new row is
    inserted, or, when a row with the same ``assetnum`` already exists, its
    ``scope_id`` is overwritten with the resolved scope's id. All statements
    are committed together at the end.

    Args:
        db_session: Async session used for the scope lookup and the upserts.
        scope_equipment_in: Payload providing ``scope_name`` and ``assetnums``.

    Returns:
        The asset numbers that were processed, in input order.

    Raises:
        HTTPException: 404 when no scope with ``scope_name`` exists.
    """
    # ``on_conflict_do_update`` only exists on dialect-specific insert
    # constructs; the generic ``sqlalchemy.insert`` does not provide it.
    # NOTE(review): assumes a PostgreSQL backend (this module also relies on
    # the ``~`` regex operator) - confirm against the deployment.
    from sqlalchemy.dialects.postgresql import insert as pg_insert

    # The scope service is a coroutine; it must be awaited, otherwise the
    # "not found" check below can never trigger and ``scope.id`` fails.
    scope = await get_scope_by_name_service(
        db_session=db_session, scope_name=scope_equipment_in.scope_name)

    if not scope:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A scope with this name does not exist.",
        )

    results = []

    for assetnum in scope_equipment_in.assetnums:
        stmt = pg_insert(ScopeEquipment).values(
            assetnum=assetnum,
            scope_id=scope.id,
        ).on_conflict_do_update(
            index_elements=["assetnum"],
            set_={"scope_id": scope.id},
        )

        # Must be awaited: an un-awaited execute on the async session never
        # sends the statement to the database.
        await db_session.execute(stmt)
        results.append(assetnum)

    await db_session.commit()
    return results
|
|
|
|
|
|
async def update(*, db_session: DbSession, scope_equipment: ScopeEquipment, scope_equipment_in: ScopeEquipmentUpdate):
    """Copy the non-default fields of the payload onto the row and commit.

    Only fields present in ``model_dump(exclude_defaults=True)`` are written;
    fields left at their schema default are not touched on
    ``scope_equipment``. The same (mutated) instance is returned.
    """
    all_fields = scope_equipment_in.model_dump()
    changed = scope_equipment_in.model_dump(exclude_defaults=True)

    # Walk the full field list, skipping anything absent from the
    # non-default dump.
    for name in all_fields:
        if name not in changed:
            continue
        setattr(scope_equipment, name, changed[name])

    await db_session.commit()
    return scope_equipment
|
|
|
|
|
|
async def delete(*, db_session: DbSession, scope_equipment_id: str):
    """Issue a DELETE for the scope-equipment row with this id and commit.

    The number of affected rows is not checked, and nothing is returned.
    """
    targets_row = ScopeEquipment.id == scope_equipment_id
    stmt = Delete(ScopeEquipment).where(targets_row)

    await db_session.execute(stmt)
    await db_session.commit()
|
|
|
|
|
|
async def get_by_scope_name(*, db_session: DbSession, scope_name: Union[str, list]) -> Optional[ScopeEquipment]:
    """Fetch the scope-equipment rows that belong to the named scope.

    The scope is resolved through the scope service. When that lookup yields
    a falsy value the query is left unfiltered, so every scope-equipment row
    is returned.

    NOTE(review): the value returned is ``scalars().all()`` - a collection of
    rows - which is broader than the ``Optional[ScopeEquipment]`` annotation.
    """
    scope = await get_scope_by_name_service(db_session=db_session, scope_name=scope_name)

    stmt = Select(ScopeEquipment)
    if scope:
        stmt = stmt.filter(ScopeEquipment.scope_id == scope.id)

    rows = await db_session.execute(stmt)
    return rows.scalars().all()
|
|
|
|
|
|
async def get_exculed_scope_name(*, db_session: DbSession, scope_name: Union[str, list]) -> Optional[ScopeEquipment]:
    """Fetch the scope-equipment rows that do NOT belong to the named scope.

    When the scope lookup yields a falsy value, the filter falls back to
    "``scope_id`` is not null" instead of comparing against a scope id.

    NOTE(review): the value returned is ``scalars().all()`` - a collection of
    rows - which is broader than the ``Optional[ScopeEquipment]`` annotation.
    """
    scope = await get_scope_by_name_service(db_session=db_session, scope_name=scope_name)

    if scope:
        criterion = ScopeEquipment.scope_id != scope.id
    else:
        # ``!= None`` is deliberate: SQLAlchemy overloads the operator to
        # build the SQL expression; ``is not None`` would not.
        criterion = ScopeEquipment.scope_id != None  # noqa: E711

    rows = await db_session.execute(Select(ScopeEquipment).filter(criterion))
    return rows.scalars().all()
|
|
|
|
|
|
async def get_all_master_equipment(*, db_session: DbSession, exclude: Optional[str] = None, common: CommonParameters):
    """Return master-equipment rows that have an asset number, paginated.

    Args:
        db_session: Accepted for signature parity; not used directly in this
            function.
        exclude: Optional text. When given, it is capitalized, suffixed with
            ``$`` and applied through the ``~`` operator against
            ``location_tag``; rows that match are dropped.
            NOTE(review): ``~`` is presumably the backend's regex-match
            operator, and ``exclude`` is interpolated into the pattern
            unescaped - confirm both are intended.
        common: Mapping of keyword arguments forwarded unchanged to
            ``search_filter_sort_paginate``.

    Returns:
        Whatever ``search_filter_sort_paginate`` returns for the built query.
    """
    # ``!= None`` is deliberate: SQLAlchemy overloads the operator to build
    # the SQL expression; ``is not None`` would not.
    stmt = Select(MasterEquipment).filter(MasterEquipment.assetnum != None)  # noqa: E711

    if exclude:
        suffix_pattern = f"{exclude.capitalize()}$"
        tag_matches = MasterEquipment.location_tag.op("~")(suffix_pattern)
        stmt = stmt.filter(not_(tag_matches))

    return await search_filter_sort_paginate(model=stmt, **common)
|