You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
68 lines
2.0 KiB
Python
68 lines
2.0 KiB
Python
from sqlalchemy import Select, Delete
|
|
|
|
from src.database.service import search_filter_sort_paginate
|
|
from .model import PlantMasterData
|
|
from .schema import PlantMasterDataCreate, PlantMasterDataUpdate
|
|
from typing import Optional
|
|
|
|
from src.database.core import DbSession
|
|
from src.auth.service import CurrentUser
|
|
|
|
|
|
async def get(
    *, db_session: DbSession, masterdata_id: str
) -> Optional[PlantMasterData]:
    """Fetch a single plant master data record by its id.

    Args:
        db_session: Session used to execute the lookup.
        masterdata_id: Value matched against ``PlantMasterData.id``.

    Returns:
        The matching ``PlantMasterData`` instance, or ``None`` when no row
        has that id.
    """
    lookup = Select(PlantMasterData).where(PlantMasterData.id == masterdata_id)
    rows = await db_session.execute(lookup)
    # Same contract as scalars().one_or_none(): None for no match, an error
    # from SQLAlchemy if more than one row matches.
    return rows.scalar_one_or_none()
|
|
|
|
|
|
async def get_all(
    *, db_session: DbSession, items_per_page: int, search: Optional[str] = None, common
) -> list[PlantMasterData]:
    """Return plant master data records, optionally narrowed by a search term.

    Args:
        db_session: Database session. It is not referenced in this function
            body; the parameter is kept so existing callers keep working.
        items_per_page: Page size, forwarded to
            ``search_filter_sort_paginate`` as ``items_per_page``.
        search: Optional text. When truthy, rows are restricted to those
            whose ``total_project_cost`` matches ``%search%`` via ``ilike``.
        common: Mapping of keyword arguments forwarded to
            ``search_filter_sort_paginate``. It is read but never modified.

    Returns:
        Whatever ``search_filter_sort_paginate`` returns for the built query.
        NOTE(review): the annotation says ``list[PlantMasterData]``; confirm
        against the helper, which is not visible from this module.
    """
    query = Select(PlantMasterData)

    if search:
        # NOTE(review): ilike is a text operator; if total_project_cost is a
        # numeric column this may need a cast - confirm against the model.
        query = query.filter(PlantMasterData.total_project_cost.ilike(f"%{search}%"))

    # Build a fresh kwargs dict instead of assigning into ``common`` so the
    # caller's mapping is not mutated as a side effect of listing records.
    pagination_kwargs = {**common, "items_per_page": items_per_page}
    return await search_filter_sort_paginate(model=query, **pagination_kwargs)
|
|
|
|
|
|
async def create(*, db_session: DbSession, masterdata_in: PlantMasterDataCreate):
    """Persist a new plant master data record.

    Args:
        db_session: Session the new record is added to and committed on.
        masterdata_in: Input schema; its ``model_dump()`` output supplies the
            keyword arguments for the ``PlantMasterData`` constructor.

    Returns:
        The newly created ``PlantMasterData`` instance.
    """
    payload = masterdata_in.model_dump()
    new_record = PlantMasterData(**payload)

    db_session.add(new_record)
    await db_session.commit()

    return new_record
|
|
|
|
|
|
async def update(
    *,
    db_session: DbSession,
    masterdata: PlantMasterData,
    masterdata_in: PlantMasterDataUpdate,
):
    """Apply the non-default fields of ``masterdata_in`` to ``masterdata``.

    Args:
        db_session: Session the change is committed on.
        masterdata: Existing record to modify in place.
        masterdata_in: Update schema carrying the new values.

    Returns:
        The same ``masterdata`` instance, after the commit.
    """
    # Only fields whose value differs from the schema default are applied.
    # NOTE(review): with exclude_defaults a field explicitly set to its
    # default value is skipped; if that should be applied too,
    # exclude_unset may be the intended option - confirm with callers.
    update_data = masterdata_in.model_dump(exclude_defaults=True)

    # The filtered dump is a subset of the full dump, so iterating it
    # directly assigns the same fields without a second model_dump() call.
    for field, value in update_data.items():
        setattr(masterdata, field, value)

    await db_session.commit()

    return masterdata
|
|
|
|
|
|
async def delete(*, db_session: DbSession, masterdata_id: str):
    """Remove the plant master data record with the given id.

    Args:
        db_session: Session the DELETE statement is executed and committed on.
        masterdata_id: Value matched against ``PlantMasterData.id``.

    Returns:
        None.
    """
    matches_id = PlantMasterData.id == masterdata_id
    statement = Delete(PlantMasterData).where(matches_id)

    await db_session.execute(statement)
    await db_session.commit()
|