from sqlalchemy import Select, Delete from src.database.service import search_filter_sort_paginate from .model import PlantMasterData from .schema import PlantMasterDataCreate, PlantMasterDataUpdate from typing import Optional from src.database.core import DbSession from src.auth.service import CurrentUser async def get( *, db_session: DbSession, masterdata_id: str ) -> Optional[PlantMasterData]: """Returns a document based on the given document id.""" query = Select(PlantMasterData).filter(PlantMasterData.id == masterdata_id) result = await db_session.execute(query) return result.scalars().one_or_none() async def get_all( *, db_session: DbSession, items_per_page: int, search: str = None, common ) -> list[PlantMasterData]: """Returns all documents.""" query = Select(PlantMasterData) if search: query = query.filter(PlantMasterData.total_project_cost.ilike(f"%{search}%")) common["items_per_page"] = items_per_page result = await search_filter_sort_paginate(model=query, **common) return result async def create(*, db_session: DbSession, masterdata_in: PlantMasterDataCreate): """Creates a new document.""" masterdata = PlantMasterData(**masterdata_in.model_dump()) db_session.add(masterdata) await db_session.commit() return masterdata async def update( *, db_session: DbSession, masterdata: PlantMasterData, masterdata_in: PlantMasterDataUpdate, ): """Updates a document.""" data = masterdata_in.model_dump() update_data = masterdata_in.model_dump(exclude_defaults=True) for field in data: if field in update_data: setattr(masterdata, field, update_data[field]) await db_session.commit() return masterdata async def delete(*, db_session: DbSession, masterdata_id: str): """Deletes a document.""" query = Delete(PlantMasterData).where(PlantMasterData.id == masterdata_id) await db_session.execute(query) await db_session.commit()