You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
69 lines
2.3 KiB
Python
69 lines
2.3 KiB
Python
|
|
|
|
from fastapi import HTTPException
|
|
from sqlalchemy import Select, Delete, and_
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from .model import OverhaulSchedule
|
|
from .schema import OverhaulScheduleCreate, OverhaulScheduleUpdate
|
|
from typing import Optional
|
|
|
|
from src.database.core import DbSession
|
|
from src.scope.service import get_by_scope_name
|
|
|
|
|
|
async def get(*, db_session: DbSession, overhaul_history_id: str) -> Optional[OverhaulSchedule]:
|
|
"""Returns a document based on the given document id."""
|
|
result = await db_session.get(OverhaulSchedule, overhaul_history_id)
|
|
return result.scalars().one_or_none()
|
|
|
|
|
|
async def get_all(*, db_session: DbSession):
|
|
"""Returns all documents."""
|
|
query = Select(OverhaulSchedule).options(
|
|
selectinload(OverhaulSchedule.scope))
|
|
result = await db_session.execute(query)
|
|
return result.scalars().all()
|
|
|
|
|
|
async def create(*, db_session: DbSession, overhaul_schedule_in: OverhaulScheduleCreate):
|
|
"""Creates a new document."""
|
|
scope = await get_by_scope_name(db_session=db_session, scope_name=overhaul_schedule_in.scope_id)
|
|
|
|
if not scope:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail="Not Found"
|
|
)
|
|
overhaul_schedule_in.scope_id = scope.id
|
|
|
|
overhaul_schedule = OverhaulSchedule(**overhaul_schedule_in.model_dump())
|
|
|
|
db_session.add(overhaul_schedule)
|
|
await db_session.commit()
|
|
results = Select(OverhaulSchedule).options(selectinload(OverhaulSchedule.scope)).filter(OverhaulSchedule.id == overhaul_schedule.id)
|
|
return await db_session.scalar(results)
|
|
|
|
|
|
async def update(*, db_session: DbSession, overhaul_schedule: OverhaulSchedule, overhaul_schedule_in: OverhaulScheduleUpdate):
|
|
"""Updates a document."""
|
|
data = overhaul_schedule_in.model_dump()
|
|
|
|
update_data = overhaul_schedule_in.model_dump(exclude_defaults=True)
|
|
|
|
for field in data:
|
|
if field in update_data:
|
|
setattr(overhaul_schedule, field, update_data[field])
|
|
|
|
await db_session.commit()
|
|
|
|
return overhaul_schedule
|
|
|
|
|
|
async def delete(*, db_session: DbSession, overhaul_schedule_id: str):
|
|
"""Deletes a document."""
|
|
query = Delete(OverhaulSchedule).where(
|
|
overhaul_schedule_id == overhaul_schedule_id)
|
|
await db_session.execute(query)
|
|
await db_session.commit()
|