Compare commits

...

18 Commits

Author SHA1 Message Date
Cizz22 6128922e80 Merge branch 'main' of 145.223.23.187:DigitalTwin/be-lcca 12 hours ago
MrWaradana f9c8b0d749 fix pydantic error list 3 days ago
MrWaradana 3a7124386b refactor: Centralize `search` and `items_per_page` query parameters into `CommonParams` and remove redundant definitions from specific schema query classes. 3 days ago
MrWaradana 85eca71bf8 fix optional dependencies 3 days ago
MrWaradana fca4d18d2b fix items_per_page problem 3 days ago
MrWaradana 33e85cface fix master data Depends dependecies 3 days ago
MrWaradana 53cf29822b refactor: Update `QueryParams` to inherit from `DefaultBase` with an `itemsPerPage` alias and inject it as a dependency in the masterdata router. 4 days ago
MrWaradana aaabe1b8c4 fix middleware 1 week ago
MrWaradana 8fc47edc1b security revision 1 week ago
MrWaradana 589d5f099f add equipment master schema 2 weeks ago
MrWaradana 6e479406b9 add export route and add image data on response for equipment master 2 weeks ago
MrWaradana 7a4050ee4a feat: Add request validation middleware to enforce security and data integrity checks on items_per_page limitation 2 weeks ago
MrWaradana 18df242c6b feat: route for export all data 2 weeks ago
MrWaradana b11edfd98c jenkins 2 weeks ago
MrWaradana fa22434416 clean uneccesary testing 2 weeks ago
CIzz22 6268859798 Merge pull request 'chore: Comment out all test setup and fixtures in `conftest.py`.' (#3) from CIzz22/be-lcca:main into main
Reviewed-on: DigitalTwin/be-lcca#3
2 weeks ago
CIzz22 1c6176c1f7 Merge pull request 'fix dockerfile' (#2) from CIzz22/be-lcca:main into main
Reviewed-on: DigitalTwin/be-lcca#2
2 weeks ago
CIzz22 2dd0ce14ec Merge pull request 'main' (#1) from CIzz22/be-lcca:main into main
Reviewed-on: DigitalTwin/be-lcca#1
2 weeks ago

21
Jenkinsfile vendored

@ -4,7 +4,6 @@ pipeline {
environment { environment {
DOCKER_HUB_USERNAME = 'aimodocker' DOCKER_HUB_USERNAME = 'aimodocker'
// This creates DOCKER_AUTH_USR and DOCKER_AUTH_PSW // This creates DOCKER_AUTH_USR and DOCKER_AUTH_PSW
DOCKER_AUTH = credentials('aimodocker')
IMAGE_NAME = 'lcca-service' IMAGE_NAME = 'lcca-service'
SERVICE_NAME = 'ahm-app' SERVICE_NAME = 'ahm-app'
@ -55,13 +54,6 @@ pipeline {
// } // }
// } // }
stage('Docker Login') {
steps {
// Fixed variable names based on the 'DOCKER_AUTH' environment key
sh "echo ${DOCKER_AUTH_PSW} | docker login -u ${DOCKER_AUTH_USR} --password-stdin"
}
}
stage('Build & Tag') { stage('Build & Tag') {
steps { steps {
script { script {
@ -75,14 +67,19 @@ pipeline {
} }
} }
stage('Push to Docker Hub') { stage('Docker Login & Push') {
steps { steps {
script { script {
def fullImageName = "${DOCKER_HUB_USERNAME}/${IMAGE_NAME}" def fullImageName = "${DOCKER_HUB_USERNAME}/${IMAGE_NAME}"
sh "docker push ${fullImageName}:${IMAGE_TAG}" withCredentials([usernamePassword(credentialsId: 'aimodocker', passwordVariable: 'DOCKER_PSW', usernameVariable: 'DOCKER_USR')]) {
// Use single quotes to prevent Groovy from interpolating the secret in logs
sh 'echo $DOCKER_PSW | docker login -u $DOCKER_USR --password-stdin'
if (SECONDARY_TAG) { sh "docker push ${fullImageName}:${IMAGE_TAG}"
sh "docker push ${fullImageName}:${SECONDARY_TAG}"
if (SECONDARY_TAG) {
sh "docker push ${fullImageName}:${SECONDARY_TAG}"
}
} }
} }
} }

@ -33,6 +33,23 @@ async def get_yeardatas(
message="Data retrieved successfully", message="Data retrieved successfully",
) )
@router.get("/export-all", response_model=StandardResponse[AcquisitionCostDataPagination])
async def get_yeardatas_export_all(
db_session: DbSession,
common: CommonParameters,
):
"""Get all acquisition_cost_data for export."""
common["all"] = True
get_acquisition_cost_data = await get_all(
db_session=db_session,
items_per_page=-1,
common=common,
)
return StandardResponse(
data=get_acquisition_cost_data,
message="All Acquisition Cost Data retrieved successfully",
)
@router.get("/{acquisition_cost_data_id}", response_model=StandardResponse[AcquisitionCostDataRead]) @router.get("/{acquisition_cost_data_id}", response_model=StandardResponse[AcquisitionCostDataRead])
async def get_acquisition_cost_data(db_session: DbSession, acquisition_cost_data_id: str): async def get_acquisition_cost_data(db_session: DbSession, acquisition_cost_data_id: str):

@ -34,13 +34,4 @@ class AcquisitionCostDataPagination(Pagination):
class ListQueryParams(CommonParams): class ListQueryParams(CommonParams):
items_per_page: Optional[int] = Field( pass
default=5,
ge=1,
le=1000,
description="Number of items per page"
)
search: Optional[str] = Field(
default=None,
description="Search keyword"
)

@ -1,5 +1,5 @@
import logging import logging
from typing import Annotated, List from typing import Annotated, List, Optional
from sqlalchemy import desc, func, or_, Select from sqlalchemy import desc, func, or_, Select
from sqlalchemy_filters import apply_pagination from sqlalchemy_filters import apply_pagination
@ -18,9 +18,11 @@ QueryStr = constr(pattern=r"^[ -~]+$", min_length=1)
def common_parameters( def common_parameters(
db_session: DbSession, # type: ignore db_session: DbSession, # type: ignore
current_user: QueryStr = Query(None, alias="currentUser"), # type: ignore current_user: Optional[str] = Query(None, alias="currentUser"), # type: ignore
current_user_snake: Optional[str] = Query(None, alias="current_user"), # type: ignore
page: int = Query(1, gt=0, lt=2147483647), page: int = Query(1, gt=0, lt=2147483647),
items_per_page: int = Query(5, alias="itemsPerPage", gt=-2, lt=2147483647), items_per_page: Optional[int] = Query(None, alias="items_per_page", gt=-2, lt=2147483647),
items_per_page_camel: Optional[int] = Query(None, alias="itemsPerPage", gt=-2, lt=2147483647),
query_str: QueryStr = Query(None, alias="q"), # type: ignore query_str: QueryStr = Query(None, alias="q"), # type: ignore
filter_spec: QueryStr = Query(None, alias="filter"), # type: ignore filter_spec: QueryStr = Query(None, alias="filter"), # type: ignore
sort_by: List[str] = Query([], alias="sortBy[]"), sort_by: List[str] = Query([], alias="sortBy[]"),
@ -28,15 +30,23 @@ def common_parameters(
all: int = Query(0), all: int = Query(0),
# role: QueryStr = Depends(get_current_role), # role: QueryStr = Depends(get_current_role),
): ):
# Support both snake_case and camelCase for pagination size
final_items_per_page = items_per_page_camel if items_per_page_camel is not None else (
items_per_page if items_per_page is not None else 5
)
# Support both snake_case and camelCase for current user
final_current_user = current_user or current_user_snake
return { return {
"db_session": db_session, "db_session": db_session,
"page": page, "page": page,
"items_per_page": items_per_page, "items_per_page": final_items_per_page,
"query_str": query_str, "query_str": query_str,
"filter_spec": filter_spec, "filter_spec": filter_spec,
"sort_by": sort_by, "sort_by": sort_by,
"descending": descending, "descending": descending,
"current_user": current_user, "current_user": final_current_user,
# "role": role, # "role": role,
"all": bool(all), "all": bool(all),
} }

@ -62,6 +62,23 @@ async def get_equipments(
message="Data retrieved successfully", message="Data retrieved successfully",
) )
@router.get("/export-all", response_model=StandardResponse[EquipmentPagination])
async def get_equipments_export_all(
db_session: DbSession,
common: CommonParameters,
):
"""Get all equipment for export."""
common["all"] = True
equipment_data = await get_all(
db_session=db_session,
items_per_page=-1,
common=common,
)
return StandardResponse(
data=equipment_data,
message="All Equipment Data retrieved successfully",
)
@router.get("/maximo/{assetnum}", response_model=StandardResponse[List[dict]]) @router.get("/maximo/{assetnum}", response_model=StandardResponse[List[dict]])
@ -211,6 +228,18 @@ async def get_calculated_top_10_replacement_priorities(db_session: DbSession, co
message="Top 10 Replacement Priorities Data retrieved successfully", message="Top 10 Replacement Priorities Data retrieved successfully",
) )
@router.get(
"/top-10-replacement-priorities-export-all",
response_model=StandardResponse[EquipmentTop10Pagination],
)
async def get_calculated_top_10_replacement_priorities_all(db_session: DbSession, common: CommonParameters):
common["all"] = True
equipment_data = await get_top_10_replacement_priorities(db_session=db_session, common=common)
return StandardResponse(
data=equipment_data,
message="All Replacement Priorities Data retrieved successfully",
)
@router.get( @router.get(
"/top-10-economic-life", "/top-10-economic-life",
response_model=StandardResponse[EquipmentTop10Pagination], response_model=StandardResponse[EquipmentTop10Pagination],
@ -224,6 +253,18 @@ async def get_calculated_top_10_economic_life(db_session: DbSession, common: Com
message="Top 10 Economic Life Data retrieved successfully", message="Top 10 Economic Life Data retrieved successfully",
) )
@router.get(
"/top-10-economic-life-export-all",
response_model=StandardResponse[EquipmentTop10Pagination],
)
async def get_calculated_top_10_economic_life_all(db_session: DbSession, common: CommonParameters):
common["all"] = True
equipment_data = await get_top_10_economic_life(db_session=db_session, common=common)
return StandardResponse(
data=equipment_data,
message="All Economic Life Data retrieved successfully",
)
@router.get("/tree", response_model=StandardResponse[EquipmentRead]) @router.get("/tree", response_model=StandardResponse[EquipmentRead])
async def get_equipment_tree(): async def get_equipment_tree():

@ -34,9 +34,17 @@ class EquipmentBase(DefaultBase):
updated_by: Optional[str] = Field(None) updated_by: Optional[str] = Field(None)
class EquipmentMasterBase(DefaultBase): class EquipmentMasterBase(DefaultBase):
location_tag: Optional[str] = Field(None) id: Optional[UUID] = Field(None)
assetnum: Optional[str] = Field(None)
name: Optional[str] = Field(None) name: Optional[str] = Field(None)
parent_id: Optional[UUID] = Field(None)
equipment_tree_id: Optional[UUID] = Field(None)
category_id: Optional[UUID] = Field(None)
system_tag: Optional[str] = Field(None)
assetnum: Optional[str] = Field(None)
location_tag: Optional[str] = Field(None)
image_name: Optional[str] = Field(None)
description: Optional[str] = Field(None)
class MasterBase(DefaultBase): class MasterBase(DefaultBase):
assetnum: Optional[str] = Field(None) assetnum: Optional[str] = Field(None)
@ -162,15 +170,5 @@ class CountRemainingLifeResponse(DefaultBase):
critical: int critical: int
class ListQueryParams(CommonParams): class ListQueryParams(CommonParams):
items_per_page: Optional[int] = Field( pass
default=5,
ge=1,
le=1000,
description="Number of items per page",
alias="itemsPerPage"
)
search: Optional[str] = Field(
default=None,
description="Search keyword"
)

@ -31,7 +31,8 @@ class EquipmentMaster(Base, DefaultMixin):
system_tag = Column(String, nullable=True) system_tag = Column(String, nullable=True)
assetnum = Column(String, nullable=True) assetnum = Column(String, nullable=True)
location_tag = Column(String, nullable=True) location_tag = Column(String, nullable=True)
image_name = Column(String, nullable=True)
description = Column(String, nullable=True)
# Relationship definitions # Relationship definitions
# Define both sides of the relationship # Define both sides of the relationship
# parent = relationship( # parent = relationship(

@ -28,6 +28,21 @@ async def get_all_equipment_master_tree(
data=equipment_masters, message="Data retrieved successfully" data=equipment_masters, message="Data retrieved successfully"
) )
@router.get("/export-all", response_model=StandardResponse[EquipmentMasterPaginated])
async def get_all_equipment_master_tree_export_all(
db_session: DbSession,
common: CommonParameters,
):
common["all"] = True
equipment_masters = await get_all_master(
db_session=db_session,
common=common,
)
return StandardResponse(
data=equipment_masters, message="All Equipment Master Data retrieved successfully"
)
@router.get( @router.get(
"/{equipment_master_id}", response_model=StandardResponse[EquipmentMasterRead] "/{equipment_master_id}", response_model=StandardResponse[EquipmentMasterRead]

@ -46,5 +46,4 @@ class EquipmentMasterPaginated(Pagination):
class EquipmentMasterQuery(CommonParams): class EquipmentMasterQuery(CommonParams):
parent_id : Optional[str] = None parent_id : Optional[str] = None
items_per_page : Optional[int] = 5
search : Optional[str] = None search : Optional[str] = None

@ -32,6 +32,23 @@ async def get_yeardatas(
message="Data retrieved successfully", message="Data retrieved successfully",
) )
@router.get("/export-all", response_model=StandardResponse[ManpowerCostPagination])
async def get_yeardatas_export_all(
db_session: DbSession,
common: CommonParameters,
):
"""Get all manpower_cost_data for export."""
common["all"] = True
get_acquisition_cost_data = await get_all(
db_session=db_session,
items_per_page=-1,
common=common,
)
return StandardResponse(
data=get_acquisition_cost_data,
message="All Manpower Cost Data retrieved successfully",
)
@router.get("/{acquisition_cost_data_id}", response_model=StandardResponse[ManpowerCostRead]) @router.get("/{acquisition_cost_data_id}", response_model=StandardResponse[ManpowerCostRead])
async def get_acquisition_cost_data(db_session: DbSession, acquisition_cost_data_id: str): async def get_acquisition_cost_data(db_session: DbSession, acquisition_cost_data_id: str):

@ -34,5 +34,4 @@ class ManpowerCostPagination(Pagination):
class QueryParams(CommonParams): class QueryParams(CommonParams):
items_per_page: Optional[int] = Field(5) pass
search: Optional[str] = Field(None)

@ -32,6 +32,23 @@ async def get_yeardatas(
message="Data retrieved successfully", message="Data retrieved successfully",
) )
@router.get("/export-all", response_model=StandardResponse[ManpowerCostPagination])
async def get_yeardatas_export_all(
db_session: DbSession,
common: CommonParameters,
):
"""Get all manpower_master_data for export."""
common["all"] = True
get_acquisition_cost_data = await get_all(
db_session=db_session,
items_per_page=-1,
common=common,
)
return StandardResponse(
data=get_acquisition_cost_data,
message="All Manpower Master Data retrieved successfully",
)
@router.get("/{acquisition_cost_data_id}", response_model=StandardResponse[ManpowerCostRead]) @router.get("/{acquisition_cost_data_id}", response_model=StandardResponse[ManpowerCostRead])
async def get_acquisition_cost_data(db_session: DbSession, acquisition_cost_data_id: str): async def get_acquisition_cost_data(db_session: DbSession, acquisition_cost_data_id: str):

@ -33,5 +33,4 @@ class ManpowerCostPagination(Pagination):
items: List[ManpowerCostRead] = [] items: List[ManpowerCostRead] = []
class QueryParams(CommonParams): class QueryParams(CommonParams):
items_per_page: Optional[int] = Field(5)
search: Optional[str] = Field(None) search: Optional[str] = Field(None)

@ -1,9 +1,9 @@
from typing import Annotated, Optional, List from typing import Annotated, Optional, List
from fastapi import APIRouter, HTTPException, status, Query from fastapi import APIRouter, HTTPException, status, Query, Depends
from sqlalchemy import Select from sqlalchemy import Select
from src.manpower_cost.schema import QueryParams from .schema import QueryParams
from .model import MasterData from .model import MasterData
from .schema import ( from .schema import (
MasterDataPagination, MasterDataPagination,
@ -25,7 +25,7 @@ router = APIRouter()
async def get_masterdatas( async def get_masterdatas(
db_session: DbSession, db_session: DbSession,
common: CommonParameters, common: CommonParameters,
params: Annotated[QueryParams, Query()], params: Annotated[QueryParams, Depends()],
): ):
"""Get all documents.""" """Get all documents."""
# return # return
@ -40,6 +40,23 @@ async def get_masterdatas(
message="Data retrieved successfully", message="Data retrieved successfully",
) )
@router.get("/export-all", response_model=StandardResponse[MasterDataPagination])
async def get_masterdatas_export_all(
db_session: DbSession,
common: CommonParameters,
):
"""Get all documents for export."""
common["all"] = True
master_datas = await get_all(
db_session=db_session,
items_per_page=-1,
common=common,
)
return StandardResponse(
data=master_datas,
message="All Master Data retrieved successfully",
)
@router.get("/{masterdata_id}", response_model=StandardResponse[MasterDataRead]) @router.get("/{masterdata_id}", response_model=StandardResponse[MasterDataRead])
async def get_masterdata(db_session: DbSession, masterdata_id: str): async def get_masterdata(db_session: DbSession, masterdata_id: str):

@ -2,8 +2,8 @@ from datetime import datetime
from typing import List, Optional from typing import List, Optional
from uuid import UUID from uuid import UUID
from pydantic import BaseModel, Field from pydantic import BaseModel, Field, model_validator
from src.models import DefaultBase, Pagination from src.models import CommonParams, DefaultBase, Pagination
from src.auth.service import CurrentUser from src.auth.service import CurrentUser
@ -52,13 +52,5 @@ class MasterDataPagination(Pagination):
items: List[MasterDataRead] = [] items: List[MasterDataRead] = []
class QueryParams(BaseModel): class QueryParams(CommonParams):
items_per_page: Optional[int] = Field( pass
5,
ge=1,
description="Items per page"
)
search: Optional[str] = Field(
None,
description="Search keyword"
)

@ -45,9 +45,3 @@ class QueryParams(CommonParams):
..., ...,
description="Simulation identifier", description="Simulation identifier",
) )
items_per_page: Optional[int] = Field(
5,
ge=1,
description="Items per page"
)
search: Optional[str] = Field(None)

@ -14,6 +14,37 @@ ALLOWED_MULTI_PARAMS = {
"exclude[]", "exclude[]",
} }
# Whitelist of ALL allowed query parameter names across the application.
# Any param NOT in this set will be rejected.
ALLOWED_QUERY_PARAMS = {
# CommonParameters (from database/service.py common_parameters)
"currentUser",
"page",
"itemsPerPage",
"q",
"filter",
"sortBy[]",
"descending[]",
"all",
# ListQueryParams / QueryParams used across routers
"items_per_page",
"search",
# equipment_master specific
"parent_id",
# masterdata_simulations / plant_transaction_data_simulations specific
"simulation_id",
# exclude
"exclude[]",
}
# Query params that are ONLY allowed for "write" operations (read operations use ALLOWED_QUERY_PARAMS).
# For GET/POST/PUT/etc, whitelisting still applies.
WRITE_METHOD_ALLOWED_PARAMS = {
# Only auth/session params are allowed in query for write methods.
# Data values (like simulation_id) must be in the JSON body for these methods.
"currentUser",
}
MAX_QUERY_PARAMS = 50 MAX_QUERY_PARAMS = 50
MAX_QUERY_LENGTH = 2000 MAX_QUERY_LENGTH = 2000
MAX_JSON_BODY_SIZE = 1024 * 100 # 100 KB MAX_JSON_BODY_SIZE = 1024 * 100 # 100 KB
@ -62,31 +93,31 @@ def has_control_chars(value: str) -> bool:
def inspect_value(value: str, source: str): def inspect_value(value: str, source: str):
if XSS_PATTERN.search(value): if XSS_PATTERN.search(value):
raise HTTPException( raise HTTPException(
status_code=400, status_code=422,
detail=f"Potential XSS payload detected in {source}", detail=f"Potential XSS payload detected in {source}",
) )
if SQLI_PATTERN.search(value): if SQLI_PATTERN.search(value):
raise HTTPException( raise HTTPException(
status_code=400, status_code=422,
detail=f"Potential SQL injection payload detected in {source}", detail=f"Potential SQL injection payload detected in {source}",
) )
if RCE_PATTERN.search(value): if RCE_PATTERN.search(value):
raise HTTPException( raise HTTPException(
status_code=400, status_code=422,
detail=f"Potential RCE payload detected in {source}", detail=f"Potential RCE payload detected in {source}",
) )
if TRAVERSAL_PATTERN.search(value): if TRAVERSAL_PATTERN.search(value):
raise HTTPException( raise HTTPException(
status_code=400, status_code=422,
detail=f"Potential Path Traversal payload detected in {source}", detail=f"Potential Path Traversal payload detected in {source}",
) )
if has_control_chars(value): if has_control_chars(value):
raise HTTPException( raise HTTPException(
status_code=400, status_code=422,
detail=f"Invalid control characters detected in {source}", detail=f"Invalid control characters detected in {source}",
) )
@ -96,7 +127,7 @@ def inspect_json(obj, path="body"):
for key, value in obj.items(): for key, value in obj.items():
if key in FORBIDDEN_JSON_KEYS: if key in FORBIDDEN_JSON_KEYS:
raise HTTPException( raise HTTPException(
status_code=400, status_code=422,
detail=f"Forbidden JSON key detected: {path}.{key}", detail=f"Forbidden JSON key detected: {path}.{key}",
) )
inspect_json(value, f"{path}.{key}") inspect_json(value, f"{path}.{key}")
@ -126,12 +157,28 @@ class RequestValidationMiddleware(BaseHTTPMiddleware):
if len(params) > MAX_QUERY_PARAMS: if len(params) > MAX_QUERY_PARAMS:
raise HTTPException( raise HTTPException(
status_code=400, status_code=422,
detail="Too many query parameters", detail="Too many query parameters",
) )
# ------------------------- # -------------------------
# 2. Duplicate parameters # 2. Query param whitelist
# -------------------------
# For GET, we allow data parameters like page, search, etc.
# For POST, PUT, DELETE, PATCH, we ONLY allow auth/session params.
active_whitelist = ALLOWED_QUERY_PARAMS if request.method == "GET" else WRITE_METHOD_ALLOWED_PARAMS
unknown_params = [
key for key, _ in params if key not in active_whitelist
]
if unknown_params:
raise HTTPException(
status_code=422,
detail=f"Unknown query parameters are not allowed for {request.method} request: {unknown_params}",
)
# -------------------------
# 3. Duplicate parameters
# ------------------------- # -------------------------
counter = Counter(key for key, _ in params) counter = Counter(key for key, _ in params)
duplicates = [ duplicates = [
@ -141,12 +188,48 @@ class RequestValidationMiddleware(BaseHTTPMiddleware):
if duplicates: if duplicates:
raise HTTPException( raise HTTPException(
status_code=400, status_code=422,
detail=f"Duplicate query parameters are not allowed: {duplicates}", detail=f"Duplicate query parameters are not allowed: {duplicates}",
) )
# ------------------------- # -------------------------
# 3. Query param inspection # 4. JSON body inspection & Single source enforcement
# Ensuring data comes from ONLY one source (Query OR Body).
# -------------------------
content_type = request.headers.get("content-type", "")
has_json_header = content_type.startswith("application/json")
# Read body now so we can check if it's actually empty
body = b""
if has_json_header:
body = await request.body()
# We consider it a "JSON body" source ONLY if it's not empty and not just "{}"
has_actual_json_body = has_json_header and body and body.strip() != b"{}"
# Check for data parameters in query (anything whitelisted as 'data' but not 'session/auth')
data_params_in_query = [
key for key, _ in params
if key in ALLOWED_QUERY_PARAMS and key not in WRITE_METHOD_ALLOWED_PARAMS
]
if has_actual_json_body:
# If sending actual JSON body, we forbid any data in query string (one source only)
if data_params_in_query:
raise HTTPException(
status_code=422,
detail=f"Single source enforcement: Data received from both JSON body and query string ({data_params_in_query}). Use only one source.",
)
# Special case: GET with actual body is discouraged/forbidden
if request.method == "GET":
raise HTTPException(
status_code=422,
detail="GET requests must use query parameters, not JSON body.",
)
# -------------------------
# 5. Query param inspection
# ------------------------- # -------------------------
pagination_size_keys = {"size", "itemsPerPage", "per_page", "limit", "items_per_page"} pagination_size_keys = {"size", "itemsPerPage", "per_page", "limit", "items_per_page"}
for key, value in params: for key, value in params:
@ -158,16 +241,24 @@ class RequestValidationMiddleware(BaseHTTPMiddleware):
try: try:
size_val = int(value) size_val = int(value)
if size_val > 50: if size_val > 50:
raise HTTPException(status_code=400, detail=f"Pagination size '{key}' cannot exceed 50") raise HTTPException(
status_code=422,
detail=f"Pagination size '{key}' cannot exceed 50",
)
if size_val % 5 != 0: if size_val % 5 != 0:
raise HTTPException(status_code=400, detail=f"Pagination size '{key}' must be a multiple of 5") raise HTTPException(
status_code=422,
detail=f"Pagination size '{key}' must be a multiple of 5",
)
except ValueError: except ValueError:
raise HTTPException(status_code=400, detail=f"Pagination size '{key}' must be an integer") raise HTTPException(
status_code=422,
detail=f"Pagination size '{key}' must be an integer",
)
# ------------------------- # -------------------------
# 4. Content-Type sanity # 6. Content-Type sanity
# ------------------------- # -------------------------
content_type = request.headers.get("content-type", "")
if content_type and not any( if content_type and not any(
content_type.startswith(t) content_type.startswith(t)
for t in ( for t in (
@ -182,32 +273,22 @@ class RequestValidationMiddleware(BaseHTTPMiddleware):
) )
# ------------------------- # -------------------------
# 5. JSON body inspection # 7. JSON body inspection & Re-injection
# ------------------------- # -------------------------
if content_type.startswith("application/json"): if has_json_header:
body = await request.body()
#if len(body) > MAX_JSON_BODY_SIZE:
# raise HTTPException(
# status_code=413,
# detail="JSON body too large",
# )
if body: if body:
try: try:
payload = json.loads(body) payload = json.loads(body)
except json.JSONDecodeError: except json.JSONDecodeError:
raise HTTPException( raise HTTPException(
status_code=400, status_code=422,
detail="Invalid JSON body", detail="Invalid JSON body",
) )
inspect_json(payload) inspect_json(payload)
# Re-inject body for downstream handlers # Re-inject body for downstream handlers
async def receive(): async def receive():
return {"type": "http.request", "body": body} return {"type": "http.request", "body": body}
request._receive = receive # noqa: protected-access request._receive = receive # noqa: protected-access
return await call_next(request) return await call_next(request)

@ -2,7 +2,7 @@
from datetime import datetime from datetime import datetime
from typing import Generic, List, Optional, TypeVar from typing import Generic, List, Optional, TypeVar
import uuid import uuid
from pydantic import BaseModel, Field, SecretStr, ConfigDict from pydantic import BaseModel, Field, SecretStr, ConfigDict, model_validator
from sqlalchemy import Column, DateTime, String, func, event from sqlalchemy import Column, DateTime, String, func, event
from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
@ -100,16 +100,29 @@ class StandardResponse(BaseModel, Generic[T]):
class CommonParams(DefaultBase): class CommonParams(DefaultBase):
# This ensures no extra query params are allowed # This ensures no extra query params are allowed
current_user: Optional[str] = Field(None, alias="currentUser") current_user: Optional[str] = Field(None, alias="current_user")
currentUser: Optional[str] = Field(None, description="Alias for current_user")
page: int = Field(1, gt=0, lt=2147483647) page: int = Field(1, gt=0, lt=2147483647)
items_per_page: int = Field(5, gt=-2, lt=2147483647, alias="itemsPerPage") items_per_page: int = Field(5, gt=-2, lt=2147483647, alias="items_per_page")
itemsPerPage: Optional[int] = Field(None, description="Alias for items_per_page")
query_str: Optional[str] = Field(None, alias="q") query_str: Optional[str] = Field(None, alias="q")
search: Optional[str] = Field(None, description="Search keyword")
filter_spec: Optional[str] = Field(None, alias="filter") filter_spec: Optional[str] = Field(None, alias="filter")
sort_by: List[str] = Field(default_factory=list, alias="sortBy[]") sort_by: List[str] = Field(default=[], alias="sortBy[]")
descending: List[bool] = Field(default_factory=list, alias="descending[]") descending: List[bool] = Field(default=[], alias="descending[]")
exclude: List[str] = Field(default_factory=list, alias="exclude[]") exclude: List[str] = Field(default=[], alias="exclude[]")
all_params: int = Field(0, alias="all") all_params: int = Field(0, alias="all")
@model_validator(mode="before")
@classmethod
def resolve_aliases(cls, data: any) -> any:
if isinstance(data, dict):
if "itemsPerPage" in data and data["itemsPerPage"] is not None:
data.setdefault("items_per_page", data["itemsPerPage"])
if "currentUser" in data and data["currentUser"] is not None:
data.setdefault("current_user", data["currentUser"])
return data
# Property to mirror your original return dict's bool conversion # Property to mirror your original return dict's bool conversion
@property @property
def is_all(self) -> bool: def is_all(self) -> bool:

@ -1,4 +1,4 @@
from typing import List, Optional from typing import Annotated, List, Optional
from uuid import UUID from uuid import UUID
from fastapi import APIRouter, HTTPException, Query, status from fastapi import APIRouter, HTTPException, Query, status
@ -16,6 +16,7 @@ from .schema import (
PlantFSTransactionDataRead, PlantFSTransactionDataRead,
PlantFSTransactionDataUpdate, PlantFSTransactionDataUpdate,
PlantFSChartData, PlantFSChartData,
ListQueryParams,
) )
from .service import create, delete, get, get_all, update, update_fs_charts_from_matrix, get_charts from .service import create, delete, get, get_all, update, update_fs_charts_from_matrix, get_charts
@ -28,15 +29,14 @@ router = APIRouter()
async def list_fs_transactions( async def list_fs_transactions(
db_session: DbSession, db_session: DbSession,
common: CommonParameters, common: CommonParameters,
items_per_page: Optional[int] = Query(5), params: Annotated[ListQueryParams, Query()],
search: Optional[str] = Query(None),
): ):
"""Return paginated financial statement transaction data.""" """Return paginated financial statement transaction data."""
records = await get_all( records = await get_all(
db_session=db_session, db_session=db_session,
items_per_page=items_per_page, items_per_page=params.items_per_page,
search=search, search=params.search,
common=common, common=common,
) )
@ -166,8 +166,3 @@ async def delete_fs_transaction(
await delete(db_session=db_session, fs_transaction_id=str(fs_transaction_id)) await delete(db_session=db_session, fs_transaction_id=str(fs_transaction_id))
return StandardResponse(data=record, message="Data deleted successfully") return StandardResponse(data=record, message="Data deleted successfully")

@ -4,7 +4,7 @@ from uuid import UUID
from pydantic import Field from pydantic import Field
from src.models import DefaultBase, Pagination from src.models import CommonParams, DefaultBase, Pagination
class PlantFSTransactionDataBase(DefaultBase): class PlantFSTransactionDataBase(DefaultBase):
@ -100,3 +100,11 @@ class PlantFSChartData(DefaultBase):
bep_year: Optional[int] = Field(None, ge=0, le=9999) bep_year: Optional[int] = Field(None, ge=0, le=9999)
bep_total_lcc: Optional[float] = Field(None, ge=0, le=1_000_000_000_000_000) bep_total_lcc: Optional[float] = Field(None, ge=0, le=1_000_000_000_000_000)
class ListQueryParams(CommonParams):
search: Optional[str] = Field(
default=None,
description="Search keyword",
)

@ -1,4 +1,4 @@
from typing import Optional from typing import Annotated, Optional
from fastapi import APIRouter, HTTPException, status, Query from fastapi import APIRouter, HTTPException, status, Query
from .model import PlantMasterData from .model import PlantMasterData
@ -7,6 +7,7 @@ from .schema import (
PlantMasterDataRead, PlantMasterDataRead,
PlantMasterDataCreate, PlantMasterDataCreate,
PlantMasterDataUpdate, PlantMasterDataUpdate,
ListQueryParams,
) )
from .service import get, get_all, create, update, delete from .service import get, get_all, create, update, delete
@ -22,15 +23,14 @@ router = APIRouter()
async def get_masterdatas( async def get_masterdatas(
db_session: DbSession, db_session: DbSession,
common: CommonParameters, common: CommonParameters,
items_per_page: Optional[int] = Query(5), params: Annotated[ListQueryParams, Query()],
search: Optional[str] = Query(None),
): ):
"""Get all documents.""" """Get all documents."""
# return # return
master_datas = await get_all( master_datas = await get_all(
db_session=db_session, db_session=db_session,
items_per_page=items_per_page, items_per_page=params.items_per_page,
search=search, search=params.search,
common=common, common=common,
) )
return StandardResponse( return StandardResponse(
@ -38,6 +38,23 @@ async def get_masterdatas(
message="Data retrieved successfully", message="Data retrieved successfully",
) )
@router.get("/export-all", response_model=StandardResponse[PlantMasterDataPagination])
async def get_masterdatas_export_all(
db_session: DbSession,
common: CommonParameters,
):
"""Get all documents for export."""
common["all"] = True
master_datas = await get_all(
db_session=db_session,
items_per_page=-1,
common=common,
)
return StandardResponse(
data=master_datas,
message="All Plant Master Data retrieved successfully",
)
@router.get("/{masterdata_id}", response_model=StandardResponse[PlantMasterDataRead]) @router.get("/{masterdata_id}", response_model=StandardResponse[PlantMasterDataRead])
async def get_masterdata(db_session: DbSession, masterdata_id: str): async def get_masterdata(db_session: DbSession, masterdata_id: str):

@ -3,7 +3,7 @@ from typing import List, Optional
from uuid import UUID from uuid import UUID
from pydantic import Field from pydantic import Field
from src.models import DefaultBase, Pagination from src.models import CommonParams, DefaultBase, Pagination
from src.auth.service import CurrentUser from src.auth.service import CurrentUser
@ -85,3 +85,11 @@ class PlantMasterDataRead(PlantMasterdataBase):
class PlantMasterDataPagination(Pagination): class PlantMasterDataPagination(Pagination):
items: List[PlantMasterDataRead] = [] items: List[PlantMasterDataRead] = []
class ListQueryParams(CommonParams):
search: Optional[str] = Field(
default=None,
description="Search keyword",
)

@ -1,4 +1,4 @@
from typing import List, Optional from typing import Annotated, List, Optional
from fastapi import APIRouter, HTTPException, status, Query from fastapi import APIRouter, HTTPException, status, Query
from .model import PlantTransactionData from .model import PlantTransactionData
@ -10,6 +10,7 @@ from .schema import (
PlantTransactionDataCreate, PlantTransactionDataCreate,
PlantTransactionDataUpdate, PlantTransactionDataUpdate,
PlantTransactionFSImport, PlantTransactionFSImport,
ListQueryParams,
) )
from .service import ( from .service import (
get, get,
@ -33,14 +34,13 @@ router = APIRouter()
async def get_transaction_datas( async def get_transaction_datas(
db_session: DbSession, db_session: DbSession,
common: CommonParameters, common: CommonParameters,
items_per_page: Optional[int] = Query(5), params: Annotated[ListQueryParams, Query()],
search: Optional[str] = Query(None),
): ):
"""Get all transaction_data pagination.""" """Get all transaction_data pagination."""
plant_transaction_data = await get_all( plant_transaction_data = await get_all(
db_session=db_session, db_session=db_session,
items_per_page=items_per_page, items_per_page=params.items_per_page,
search=search, search=params.search,
common=common, common=common,
) )
# return # return
@ -49,6 +49,23 @@ async def get_transaction_datas(
message="Data retrieved successfully", message="Data retrieved successfully",
) )
@router.get("/export-all", response_model=StandardResponse[PlantTransactionDataPagination])
async def get_transaction_datas_export_all(
db_session: DbSession,
common: CommonParameters,
):
"""Get all transaction_data for export."""
common["all"] = True
plant_transaction_data = await get_all(
db_session=db_session,
items_per_page=-1,
common=common,
)
return StandardResponse(
data=plant_transaction_data,
message="All Plant Transaction Data retrieved successfully",
)
@router.get("/charts", response_model=StandardResponse[PlantChartData]) @router.get("/charts", response_model=StandardResponse[PlantChartData])
async def get_chart_data(db_session: DbSession, common: CommonParameters): async def get_chart_data(db_session: DbSession, common: CommonParameters):
chart_data, bep_year, bep_total_lcc = await get_charts( chart_data, bep_year, bep_total_lcc = await get_charts(

@ -3,7 +3,7 @@ from typing import Any, List, Optional
from uuid import UUID from uuid import UUID
from pydantic import Field from pydantic import Field
from src.models import DefaultBase, Pagination from src.models import CommonParams, DefaultBase, Pagination
class PlantTransactionDataBase(DefaultBase): class PlantTransactionDataBase(DefaultBase):
@ -117,3 +117,8 @@ class PlantTransactionDataRead(PlantTransactionDataBase):
class PlantTransactionDataPagination(Pagination): class PlantTransactionDataPagination(Pagination):
items: List[PlantTransactionDataRead] = [] items: List[PlantTransactionDataRead] = []
class ListQueryParams(CommonParams):
pass

@ -1,4 +1,4 @@
from typing import List, Optional from typing import Annotated, List, Optional
from uuid import UUID from uuid import UUID
from fastapi import APIRouter, HTTPException, status, Query from fastapi import APIRouter, HTTPException, status, Query
@ -11,6 +11,7 @@ from src.plant_transaction_data_simulations.schema import (
PlantTransactionDataSimulationsCreate, PlantTransactionDataSimulationsCreate,
PlantTransactionDataSimulationsUpdate, PlantTransactionDataSimulationsUpdate,
PlantTransactionFSImportSimulations, PlantTransactionFSImportSimulations,
ListQueryParams,
) )
from src.plant_transaction_data_simulations.service import ( from src.plant_transaction_data_simulations.service import (
get, get,
@ -34,17 +35,15 @@ router = APIRouter()
async def get_transaction_datas( async def get_transaction_datas(
db_session: DbSession, db_session: DbSession,
common: CommonParameters, common: CommonParameters,
simulation_id: UUID = Query(..., description="Simulation identifier"), params: Annotated[ListQueryParams, Query()],
items_per_page: Optional[int] = Query(5),
search: Optional[str] = Query(None),
): ):
"""Get all transaction_data pagination.""" """Get all transaction_data pagination."""
plant_transaction_data = await get_all( plant_transaction_data = await get_all(
db_session=db_session, db_session=db_session,
items_per_page=items_per_page, items_per_page=params.items_per_page,
search=search, search=params.search,
common=common, common=common,
simulation_id=simulation_id, simulation_id=params.simulation_id,
) )
# return # return
return StandardResponse( return StandardResponse(
@ -52,6 +51,25 @@ async def get_transaction_datas(
message="Data retrieved successfully", message="Data retrieved successfully",
) )
@router.get("/export-all", response_model=StandardResponse[PlantTransactionDataSimulationsPagination])
async def get_transaction_datas_export_all(
db_session: DbSession,
common: CommonParameters,
simulation_id: UUID = Query(..., description="Simulation identifier"),
):
"""Get all transaction_data for export."""
common["all"] = True
plant_transaction_data = await get_all(
db_session=db_session,
items_per_page=-1,
common=common,
simulation_id=simulation_id,
)
return StandardResponse(
data=plant_transaction_data,
message="All Plant Transaction Data Simulations retrieved successfully",
)
@router.get("/charts", response_model=StandardResponse[PlantChartDataSimulations]) @router.get("/charts", response_model=StandardResponse[PlantChartDataSimulations])
async def get_chart_data( async def get_chart_data(
db_session: DbSession, db_session: DbSession,

@ -3,7 +3,7 @@ from typing import Any, List, Optional
from uuid import UUID from uuid import UUID
from pydantic import Field from pydantic import Field
from src.models import DefaultBase, Pagination from src.models import CommonParams, DefaultBase, Pagination
class PlantTransactionDataSimulationsBase(DefaultBase): class PlantTransactionDataSimulationsBase(DefaultBase):
@ -140,3 +140,11 @@ class PlantTransactionDataSimulationsRead(PlantTransactionDataSimulationsBase):
class PlantTransactionDataSimulationsPagination(Pagination): class PlantTransactionDataSimulationsPagination(Pagination):
items: List[PlantTransactionDataSimulationsRead] = [] items: List[PlantTransactionDataSimulationsRead] = []
class ListQueryParams(CommonParams):
simulation_id: UUID = Field(
...,
description="Simulation identifier",
)

@ -1,4 +1,4 @@
from typing import Optional from typing import Annotated, Optional
from fastapi import APIRouter, HTTPException, Query, status from fastapi import APIRouter, HTTPException, Query, status
@ -13,6 +13,7 @@ from src.simulations.schema import (
SimulationRead, SimulationRead,
SimulationRunPayload, SimulationRunPayload,
SimulationUpdate, SimulationUpdate,
ListQueryParams,
) )
from src.simulations.service import create, delete, get, get_all, run_simulation, update from src.simulations.service import create, delete, get, get_all, run_simulation, update
@ -24,18 +25,33 @@ async def get_simulations(
db_session: DbSession, db_session: DbSession,
common: CommonParameters, common: CommonParameters,
current_user: CurrentUser, current_user: CurrentUser,
items_per_page: Optional[int] = Query(5), params: Annotated[ListQueryParams, Query()],
search: Optional[str] = Query(None),
): ):
simulations = await get_all( simulations = await get_all(
db_session=db_session, db_session=db_session,
items_per_page=items_per_page, items_per_page=params.items_per_page,
search=search, search=params.search,
common=common, common=common,
owner=current_user.name, owner=current_user.name,
) )
return StandardResponse(data=simulations, message="Data retrieved successfully") return StandardResponse(data=simulations, message="Data retrieved successfully")
@router.get("/export-all", response_model=StandardResponse[SimulationPagination])
async def get_simulations_export_all(
db_session: DbSession,
common: CommonParameters,
current_user: CurrentUser,
):
"""Get all simulations for export."""
common["all"] = True
simulations = await get_all(
db_session=db_session,
items_per_page=-1,
common=common,
owner=current_user.name,
)
return StandardResponse(data=simulations, message="All Simulations Data retrieved successfully")
@router.get("/{simulation_id}", response_model=StandardResponse[SimulationRead]) @router.get("/{simulation_id}", response_model=StandardResponse[SimulationRead])
async def get_simulation( async def get_simulation(

@ -4,7 +4,7 @@ from uuid import UUID
from pydantic import Field from pydantic import Field
from src.models import DefaultBase, Pagination from src.models import CommonParams, DefaultBase, Pagination
from src.masterdata_simulations.schema import MasterDataSimulationRead from src.masterdata_simulations.schema import MasterDataSimulationRead
from src.plant_transaction_data_simulations.schema import ( from src.plant_transaction_data_simulations.schema import (
PlantTransactionDataSimulationsRead, PlantTransactionDataSimulationsRead,
@ -51,3 +51,11 @@ class MasterDataOverride(DefaultBase):
class SimulationRunPayload(DefaultBase): class SimulationRunPayload(DefaultBase):
label: Optional[str] = Field(None) label: Optional[str] = Field(None)
overrides: List[MasterDataOverride] = Field(default_factory=list) overrides: List[MasterDataOverride] = Field(default_factory=list)
class ListQueryParams(CommonParams):
search: Optional[str] = Field(
default=None,
description="Search keyword",
)

@ -1,8 +1,8 @@
from typing import Optional from typing import Annotated, Optional
from fastapi import APIRouter, Form, HTTPException, status, Query, UploadFile, File from fastapi import APIRouter, Form, HTTPException, status, Query, UploadFile, File
from .model import UploadedFileData from .model import UploadedFileData
from src.uploaded_file.schema import UploadedFileDataCreate, UploadedFileDataUpdate, UploadedFileDataRead, UploadedFileDataPagination from src.uploaded_file.schema import UploadedFileDataCreate, UploadedFileDataUpdate, UploadedFileDataRead, UploadedFileDataPagination, ListQueryParams
from src.uploaded_file.service import get, get_all, create, update, delete from src.uploaded_file.service import get, get_all, create, update, delete
from src.database.service import CommonParameters, search_filter_sort_paginate from src.database.service import CommonParameters, search_filter_sort_paginate
@ -20,14 +20,13 @@ router = APIRouter()
async def get_uploaded_files( async def get_uploaded_files(
db_session: DbSession, db_session: DbSession,
common: CommonParameters, common: CommonParameters,
items_per_page: Optional[int] = Query(5), params: Annotated[ListQueryParams, Query()],
search: Optional[str] = Query(None),
): ):
"""Get all uploaded files pagination.""" """Get all uploaded files pagination."""
uploaded_files = await get_all( uploaded_files = await get_all(
db_session=db_session, db_session=db_session,
items_per_page=items_per_page, items_per_page=params.items_per_page,
search=search, search=params.search,
common=common, common=common,
) )
# return # return
@ -36,6 +35,23 @@ async def get_uploaded_files(
message="Data retrieved successfully", message="Data retrieved successfully",
) )
@router.get("/export-all", response_model=StandardResponse[UploadedFileDataPagination])
async def get_uploaded_files_export_all(
db_session: DbSession,
common: CommonParameters,
):
"""Get all uploaded files for export."""
common["all"] = True
uploaded_files = await get_all(
db_session=db_session,
items_per_page=-1,
common=common,
)
return StandardResponse(
data=uploaded_files,
message="All Uploaded Files Data retrieved successfully",
)
@router.get("/{uploaded_file_id}", response_model=StandardResponse[UploadedFileDataRead]) @router.get("/{uploaded_file_id}", response_model=StandardResponse[UploadedFileDataRead])
async def get_uploaded_file(db_session: DbSession, uploaded_file_id: str): async def get_uploaded_file(db_session: DbSession, uploaded_file_id: str):

@ -3,7 +3,7 @@ from typing import List, Optional
from uuid import UUID from uuid import UUID
from pydantic import Field from pydantic import Field
from src.models import DefaultBase, Pagination from src.models import CommonParams, DefaultBase, Pagination
class UploadedFileDataBase(DefaultBase): class UploadedFileDataBase(DefaultBase):
filename: str = Field(...) filename: str = Field(...)
@ -28,3 +28,7 @@ class UploadedFileDataRead(UploadedFileDataBase):
class UploadedFileDataPagination(Pagination): class UploadedFileDataPagination(Pagination):
items: List[UploadedFileDataRead] = [] items: List[UploadedFileDataRead] = []
class ListQueryParams(CommonParams):
pass

@ -1,8 +1,8 @@
from typing import Optional from typing import Annotated, Optional
from fastapi import APIRouter, HTTPException, status, Query from fastapi import APIRouter, HTTPException, status, Query
from .model import Yeardata from .model import Yeardata
from .schema import YeardataPagination, YeardataRead, YeardataCreate, YeardataUpdate from .schema import YeardataPagination, YeardataRead, YeardataCreate, YeardataUpdate, ListQueryParams
from .service import get, get_all, create, update, delete from .service import get, get_all, create, update, delete
from src.database.service import CommonParameters, search_filter_sort_paginate from src.database.service import CommonParameters, search_filter_sort_paginate
@ -17,14 +17,13 @@ router = APIRouter()
async def get_yeardatas( async def get_yeardatas(
db_session: DbSession, db_session: DbSession,
common: CommonParameters, common: CommonParameters,
items_per_page: Optional[int] = Query(5), params: Annotated[ListQueryParams, Query()],
search: Optional[str] = Query(None),
): ):
"""Get all yeardata pagination.""" """Get all yeardata pagination."""
year_data = await get_all( year_data = await get_all(
db_session=db_session, db_session=db_session,
items_per_page=items_per_page, items_per_page=params.items_per_page,
search=search, search=params.search,
common=common, common=common,
) )
# return # return
@ -33,6 +32,23 @@ async def get_yeardatas(
message="Data retrieved successfully", message="Data retrieved successfully",
) )
@router.get("/export-all", response_model=StandardResponse[YeardataPagination])
async def get_yeardatas_export_all(
db_session: DbSession,
common: CommonParameters,
):
"""Get all yeardata for export."""
common["all"] = True
year_data = await get_all(
db_session=db_session,
items_per_page=-1,
common=common,
)
return StandardResponse(
data=year_data,
message="All Year Data retrieved successfully",
)
@router.get("/{yeardata_id}", response_model=StandardResponse[YeardataRead]) @router.get("/{yeardata_id}", response_model=StandardResponse[YeardataRead])
async def get_yeardata(db_session: DbSession, yeardata_id: str): async def get_yeardata(db_session: DbSession, yeardata_id: str):

@ -3,7 +3,7 @@ from typing import List, Optional
from uuid import UUID from uuid import UUID
from pydantic import Field, field_validator from pydantic import Field, field_validator
from src.models import DefaultBase, Pagination from src.models import CommonParams, DefaultBase, Pagination
class YeardataBase(DefaultBase): class YeardataBase(DefaultBase):
@ -61,3 +61,8 @@ class YeardataRead(YeardataBase):
class YeardataPagination(Pagination): class YeardataPagination(Pagination):
items: List[YeardataRead] = [] items: List[YeardataRead] = []
class ListQueryParams(CommonParams):
pass

Loading…
Cancel
Save