feat: add upload file route

main
MrWaradana 2 months ago
parent 7bd1e55a46
commit 7b67922a96

@ -13,6 +13,7 @@ from src.plant_transaction_data.router import router as plant_transaction_data
from src.equipment.router import router as equipment_router
from src.yeardata.router import router as yeardata_router
from src.equipment_master.router import router as equipment_master_router
from src.uploaded_file.router import router as uploaded_file_router
class ErrorMessage(BaseModel):
@ -44,6 +45,10 @@ authenticated_api_router = APIRouter(
dependencies=[Depends(JWTBearer())],
)
authenticated_api_router.include_router(
uploaded_file_router, prefix="/uploaded-files", tags=["uploaded_files"]
)
# Master Data
authenticated_api_router.include_router(
masterdata_router, prefix="/masterdata", tags=["masterdata"]

@ -18,18 +18,22 @@ from src.enums import ResponseStatus
class TimeStampMixin(object):
"""Timestamping mixin"""
def _now():
return datetime.now(pytz.timezone(TIMEZONE))
created_at = Column(
DateTime(timezone=True), default=datetime.now(pytz.timezone(TIMEZONE))
DateTime(timezone=True), default=_now
)
created_at._creation_order = 9998
updated_at = Column(
DateTime(timezone=True), default=datetime.now(pytz.timezone(TIMEZONE))
DateTime(timezone=True), default=_now, onupdate=_now
)
updated_at._creation_order = 9998
@staticmethod
def _updated_at(mapper, connection, target):
target.updated_at = datetime.now(pytz.timezone(TIMEZONE))
target.updated_at = TimeStampMixin._now()
@classmethod
def __declare_last__(cls):

@ -0,0 +1,14 @@
from sqlalchemy import Column, Integer, String, Text
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin
class UploadedFileData(Base, DefaultMixin, IdentityMixin):
__tablename__ = "lcc_uploaded_file"
filename = Column(String, nullable=False)
file_content = Column(Text, nullable=False)
file_url = Column(String, nullable=False)
file_size = Column(Integer, nullable=False)
file_type = Column(String, nullable=False)

@ -0,0 +1,141 @@
from typing import Optional
from fastapi import APIRouter, Form, HTTPException, status, Query, UploadFile, File
from .model import UploadedFileData
from src.uploaded_file.schema import UploadedFileDataCreate, UploadedFileDataUpdate, UploadedFileDataRead, UploadedFileDataPagination
from src.uploaded_file.service import get, get_all, create, update, delete
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.database.core import DbSession
from src.auth.service import CurrentUser
from src.models import StandardResponse
from pathlib import Path
from uuid import uuid4
import os
from datetime import datetime
router = APIRouter()
@router.get("", response_model=StandardResponse[UploadedFileDataPagination])
async def get_uploaded_files(
db_session: DbSession,
common: CommonParameters,
items_per_page: Optional[int] = Query(5),
search: Optional[str] = Query(None),
):
"""Get all uploaded files pagination."""
uploaded_files = await get_all(
db_session=db_session,
items_per_page=items_per_page,
search=search,
common=common,
)
# return
return StandardResponse(
data=uploaded_files,
message="Data retrieved successfully",
)
@router.get("/{uploaded_file_id}", response_model=StandardResponse[UploadedFileDataRead])
async def get_uploaded_file(db_session: DbSession, uploaded_file_id: str):
uploaded_file = await get(db_session=db_session, uploaded_file_id=uploaded_file_id)
if not uploaded_file:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(data=uploaded_file, message="Data retrieved successfully")
@router.post("", response_model=StandardResponse[UploadedFileDataRead])
async def create_uploaded_file(
db_session: DbSession,
current_user: CurrentUser,
filename: Optional[str] = Form(None),
file_content: str = Form(...),
uploaded_file: UploadFile = File(...),
):
"""
Accept an UploadFile along with the UploadedFileDataCreate payload.
The file bytes and filename are attached to the input model if possible,
and created_by is set from the current user.
"""
file_read = await uploaded_file.read()
file_name = filename or uploaded_file.filename
file_size = uploaded_file.size or len(file_read)
file_type = uploaded_file.content_type
try:
uploaded_file.file.seek(0)
except Exception:
raise Exception("Could not process uploaded file.")
safe_name = Path(file_name).name
unique_name = f"{safe_name}"
src_dir = Path(__file__).resolve().parent.parent
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
uploads_dir = src_dir / "uploads" / timestamp
os.makedirs(str(uploads_dir), exist_ok=True)
file_path = uploads_dir / unique_name
file_path.write_bytes(file_read)
relative_path = Path(timestamp) / unique_name
file_location = relative_path.as_posix()
file_url = f"/uploads/{file_location}"
uploaded_file_in = UploadedFileDataCreate(
filename=file_name,
file_content=file_content,
file_url=file_url,
file_size=file_size,
file_type=file_type,
created_by=current_user.name,
)
uploaded_file_obj = await create(db_session=db_session, uploaded_file_in=uploaded_file_in)
return StandardResponse(data=uploaded_file_obj, message="Data created successfully")
@router.put("/{uploaded_file_id}", response_model=StandardResponse[UploadedFileDataRead])
async def update_uploaded_file(
db_session: DbSession,
uploaded_file_id: str,
uploaded_file_in: UploadedFileDataUpdate,
current_user: CurrentUser,
):
uploaded_file = await get(db_session=db_session, uploaded_file_id=uploaded_file_id)
if not uploaded_file:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
uploaded_file_in.updated_by = current_user.name
return StandardResponse(
data=await update(
db_session=db_session, uploaded_file=uploaded_file, uploaded_file_in=uploaded_file_in
),
message="Data updated successfully",
)
@router.delete("/{uploaded_file_id}", response_model=StandardResponse[UploadedFileDataRead])
async def delete_uploaded_file(db_session: DbSession, uploaded_file_id: str):
uploaded_file = await get(db_session=db_session, uploaded_file_id=uploaded_file_id)
if not uploaded_file:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=[{"msg": "A data with this id does not exist."}],
)
await delete(db_session=db_session, uploaded_file_id=uploaded_file_id)
return StandardResponse(message="Data deleted successfully", data=uploaded_file)

@ -0,0 +1,30 @@
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from pydantic import Field
from src.models import DefaultBase, Pagination
class UploadedFileDataBase(DefaultBase):
filename: str = Field(..., nullable=False)
file_content: str = Field(..., nullable=False)
file_url: str = Field(..., nullable=False)
file_size: int = Field(..., nullable=False)
file_type: str = Field(..., nullable=False)
created_at: Optional[datetime] = Field(None, nullable=True)
updated_at: Optional[datetime] = Field(None, nullable=True)
created_by: Optional[str] = Field(None, nullable=True)
updated_by: Optional[str] = Field(None, nullable=True)
class UploadedFileDataCreate(UploadedFileDataBase):
pass
class UploadedFileDataUpdate(UploadedFileDataBase):
pass
class UploadedFileDataRead(UploadedFileDataBase):
id: UUID
wlc_version: Optional[str] = Field(None, nullable=False)
class UploadedFileDataPagination(Pagination):
items: List[UploadedFileDataRead] = []

@ -0,0 +1,66 @@
from sqlalchemy import Select, Delete, cast, String
from src.uploaded_file.model import UploadedFileData
from src.uploaded_file.schema import UploadedFileDataCreate, UploadedFileDataUpdate
from src.database.service import search_filter_sort_paginate
from typing import Optional
from src.database.core import DbSession
from src.auth.service import CurrentUser
async def get(*, db_session: DbSession, uploaded_file_id: str) -> Optional[UploadedFileData]:
"""Returns a document based on the given document id."""
query = Select(UploadedFileData).filter(UploadedFileData.id == uploaded_file_id)
result = await db_session.execute(query)
return result.scalars().one_or_none()
async def get_all(
*,
db_session: DbSession,
items_per_page: Optional[int],
search: Optional[str] = None,
common,
):
"""Returns all documents."""
query = Select(UploadedFileData).order_by(UploadedFileData.created_at.desc())
if search:
query = query.filter(cast(UploadedFileData.filename, String).ilike(f"%{search}%"))
common["items_per_page"] = items_per_page
results = await search_filter_sort_paginate(model=query, **common)
# return results.scalars().all()
return results
async def create(*, db_session: DbSession, uploaded_file_in: UploadedFileDataCreate):
"""Creates a new document."""
uploaded_file = UploadedFileData(**uploaded_file_in.model_dump())
db_session.add(uploaded_file)
await db_session.commit()
return uploaded_file
async def update(
*, db_session: DbSession, uploaded_file: UploadedFileData, uploaded_file_in: UploadedFileDataUpdate
):
"""Updates a document."""
data = uploaded_file_in.model_dump()
update_data = uploaded_file_in.model_dump(exclude_defaults=True)
for field in data:
if field in update_data:
setattr(uploaded_file, field, update_data[field])
await db_session.commit()
return uploaded_file
async def delete(*, db_session: DbSession, uploaded_file_id: str):
"""Deletes a document."""
query = Delete(UploadedFileData).where(UploadedFileData.id == uploaded_file_id)
await db_session.execute(query)
await db_session.commit()
Loading…
Cancel
Save