feat: Add JSON logging, centralize request ID context management, and introduce schema validation for year and sequence fields.

rest-api
MrWaradana 1 month ago
parent 7bcc102f1b
commit 67342b463c

@ -0,0 +1,18 @@
from contextvars import ContextVar
from typing import Optional, Final
REQUEST_ID_CTX_KEY: Final[str] = "request_id"
_request_id_ctx_var: ContextVar[Optional[str]] = ContextVar(
REQUEST_ID_CTX_KEY, default=None)
def get_request_id() -> Optional[str]:
return _request_id_ctx_var.get()
def set_request_id(request_id: str):
return _request_id_ctx_var.set(request_id)
def reset_request_id(token):
_request_id_ctx_var.reset(token)

@ -99,7 +99,7 @@ class EquipmentUpdate(EquipmentBase):
class ReplacementBase(DefaultBase):
"""Schema for replacement history (from lcc_ms_equipment_historical_data)."""
acquisition_year: Optional[int] = Field(None, nullable=True)
acquisition_year: Optional[int] = Field(None, nullable=True, ge=1900, le=9999)
acquisition_cost: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)
acquisition_year_ref: Optional[str] = Field(None, nullable=True)
created_at: Optional[datetime] = Field(None, nullable=True)
@ -111,7 +111,7 @@ class EquipmentRead(DefaultBase):
min_eac_value: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)
min_seq: Optional[float] = Field(None, nullable=True)
min_eac_year: Optional[float] = Field(None, nullable=True)
last_actual_year: Optional[int] = Field(None, nullable=True)
last_actual_year: Optional[int] = Field(None, nullable=True, ge=1900, le=9999)
maximo_data: Optional[List[dict]] = Field(None, nullable=True)
joined_maximo: Optional[List[dict]] = Field(None, nullable=True)
min_eac_disposal_cost: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)
@ -121,7 +121,7 @@ class EquipmentRead(DefaultBase):
class EquipmentTop10(EquipmentBase):
id: UUID
equipment_master: EquipmentMasterBase
forecasting_target_year: Optional[int] = Field(None, nullable=True)
forecasting_target_year: Optional[int] = Field(None, nullable=True, ge=1900, le=9999)
minimum_eac_seq: Optional[int] = Field(None, nullable=True)
minimum_eac_year: Optional[int] = Field(None, nullable=True)
minimum_eac: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)
@ -138,9 +138,9 @@ class EquipmentTop10Pagination(Pagination):
class EquipmentDataMaster(EquipmentBase):
id: UUID
equipment_master: Optional[EquipmentMasterBase] = Field(None, nullable=True)
forecasting_target_year: Optional[int] = Field(None, nullable=True)
minimum_eac_seq: Optional[int] = Field(None, nullable=True)
minimum_eac_year: Optional[int] = Field(None, nullable=True)
forecasting_target_year: Optional[int] = Field(None, nullable=True, ge=1900, le=9999)
minimum_eac_seq: Optional[int] = Field(None, nullable=True, ge=0)
minimum_eac_year: Optional[int] = Field(None, nullable=True, ge=1900, le=9999)
minimum_eac: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)
minimum_npv: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)
minimum_pmt: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)

@ -1,4 +1,9 @@
import logging
import json
import datetime
import os
import sys
from typing import Optional
from src.config import LOG_LEVEL
from src.enums import OptimumOHEnum
@ -14,20 +19,91 @@ class LogLevels(OptimumOHEnum):
debug = "DEBUG"
class JSONFormatter(logging.Formatter):
"""
Custom formatter to output logs in JSON format.
"""
def format(self, record):
from src.context import get_request_id
request_id = None
try:
request_id = get_request_id()
except Exception:
pass
log_record = {
"timestamp": datetime.datetime.fromtimestamp(record.created).astimezone().isoformat(),
"level": record.levelname,
"message": record.getMessage(),
"logger": record.name,
"module": record.module,
"funcName": record.funcName,
"lineno": record.lineno,
"pid": os.getpid(),
"request_id": request_id,
}
# Capture exception info if available
if record.exc_info:
log_record["exception"] = self.formatException(record.exc_info)
# Capture stack info if available
if record.stack_info:
log_record["stack_trace"] = self.formatStack(record.stack_info)
# Add any extra attributes passed to the log call
# We skip standard attributes to avoid duplication
standard_attrs = {
"args", "asctime", "created", "exc_info", "exc_text", "filename",
"funcName", "levelname", "levelno", "lineno", "module", "msecs",
"message", "msg", "name", "pathname", "process", "processName",
"relativeCreated", "stack_info", "thread", "threadName"
}
for key, value in record.__dict__.items():
if key not in standard_attrs:
log_record[key] = value
return json.dumps(log_record)
def configure_logging():
log_level = str(LOG_LEVEL).upper() # cast to string
log_levels = list(LogLevels)
if log_level not in log_levels:
# we use error as the default log level
logging.basicConfig(level=LogLevels.error)
return
log_level = LogLevels.error
# Get the root logger
root_logger = logging.getLogger()
root_logger.setLevel(log_level)
# Clear existing handlers to avoid duplicate logs
if root_logger.hasHandlers():
root_logger.handlers.clear()
if log_level == LogLevels.debug:
logging.basicConfig(level=log_level, format=LOG_FORMAT_DEBUG)
return
# Create a stream handler that outputs to stdout
handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(level=log_level)
# Use JSONFormatter for all environments, or could be conditional
# For now, let's assume the user wants JSON everywhere as requested
formatter = JSONFormatter()
# If debug mode is specifically requested and we want the old format for debug:
# if log_level == LogLevels.debug:
# formatter = logging.Formatter(LOG_FORMAT_DEBUG)
handler.setFormatter(formatter)
root_logger.addHandler(handler)
# Reconfigure uvicorn loggers to use our JSON formatter
for logger_name in ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]:
logger = logging.getLogger(logger_name)
logger.handlers = []
logger.propagate = True
# sometimes the slack client can be too verbose
logging.getLogger("slack_sdk.web.base_client").setLevel(logging.CRITICAL)

@ -4,7 +4,7 @@ import logging
from os import path
from uuid import uuid1
from typing import Optional, Final
from contextvars import ContextVar
from fastapi import FastAPI, HTTPException, status
from fastapi.responses import JSONResponse
@ -50,13 +50,7 @@ app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(GZipMiddleware, minimum_size=2000)
REQUEST_ID_CTX_KEY: Final[str] = "request_id"
_request_id_ctx_var: ContextVar[Optional[str]] = ContextVar(
REQUEST_ID_CTX_KEY, default=None)
def get_request_id() -> Optional[str]:
return _request_id_ctx_var.get()
from src.context import set_request_id, reset_request_id, get_request_id
app.add_middleware(RequestValidationMiddleware)
@ -67,7 +61,8 @@ async def db_session_middleware(request: Request, call_next):
# we create a per-request id such that we can ensure that our session is scoped for a particular request.
# see: https://github.com/tiangolo/fastapi/issues/726
ctx_token = _request_id_ctx_var.set(request_id)
ctx_token = set_request_id(request_id)
try:
session = async_scoped_session(async_session, scopefunc=get_request_id)
@ -81,10 +76,11 @@ async def db_session_middleware(request: Request, call_next):
await request.state.db.close()
await request.state.collector_db.close()
_request_id_ctx_var.reset(ctx_token)
reset_request_id(ctx_token)
return response
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
response = await call_next(request)

@ -62,7 +62,7 @@ async def _apply_masterdata_update_logic(
- Mengatur discount_rate_changed = True jika rec_name == "discount_rate"
"""
nonlocal run_plant_calculation
rec_name = getattr(record, "name", None)
rec_name = record.name if record and hasattr(record, 'name') else None
if rec_name in [
"umur_teknis",
"discount_rate",

@ -19,15 +19,9 @@ MAX_QUERY_LENGTH = 2000
MAX_JSON_BODY_SIZE = 1024 * 100 # 100 KB
# Very targeted patterns. Avoid catastrophic regex nonsense.
XSS_PATTERN = re.compile(
r"(<script|</script|javascript:|onerror\s*=|onload\s*=|<svg|<img)",
re.IGNORECASE,
)
SQLI_PATTERN = re.compile(
r"(\bUNION\b|\bSELECT\b|\bINSERT\b|\bDELETE\b|\bDROP\b|--|\bOR\b\s+1=1)",
re.IGNORECASE,
)
XSS_PATTERN_STR = r"(<script|</script|javascript:|onerror\s*=|onload\s*=|<svg|<img)"
SQLI_PATTERN_STR = r"(\bUNION\b|\bSELECT\b|\bINSERT\b|\bDELETE\b|\bDROP\b|--|\bOR\b\s+1=1)"
# JSON prototype pollution keys
FORBIDDEN_JSON_KEYS = {"__proto__", "constructor", "prototype"}

Loading…
Cancel
Save