"""Application-wide error models and exception handling helpers."""
import logging
import uuid
from typing import Any, Dict, List, Optional, Tuple

from asyncpg.exceptions import DataError as AsyncPGDataError
from asyncpg.exceptions import PostgresError
from fastapi import FastAPI, HTTPException, Request
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from sqlalchemy.exc import (DataError, DBAPIError, IntegrityError,
                            SQLAlchemyError)
from src.enums import ResponseStatus
from starlette.exceptions import HTTPException as StarletteHTTPException

log = logging.getLogger(__name__)


# Define base error model
class ErrorDetail(BaseModel):
    """A single error entry inside an :class:`ErrorResponse`."""

    field: Optional[str] = None
    message: str
    code: Optional[str] = None
    params: Optional[Dict[str, Any]] = None


class ErrorResponse(BaseModel):
    """Envelope model for error responses; ``status`` defaults to ERROR."""

    data: Optional[Any] = None
    message: str
    status: ResponseStatus = ResponseStatus.ERROR
    errors: Optional[List[ErrorDetail]] = None


# Custom exception handler setup

# Substrings looked for in a lower-cased DBAPIError message, paired with the
# (user-facing message, HTTP status) returned on the first match. Order
# matters: entries are tried top to bottom.
_DBAPI_ERROR_MARKERS: Tuple[Tuple[str, Tuple[str, int]], ...] = (
    ("unique constraint", ("This record already exists.", 409)),
    ("foreign key constraint", ("Related record not found.", 400)),
    ("null value in column", ("Required data missing.", 400)),
    ("invalid input for query argument", ("Invalid data provided.", 400)),
)


def _get_client_ip(request: Request) -> Optional[str]:
    """
    Get the real client IP address from Kong Gateway headers.

    Kong sets X-Real-IP and X-Forwarded-For headers by default. Falls back to
    the socket peer address, or ``None`` when the server supplied no client
    information.
    """
    headers = request.headers

    # Kong specific header
    if "X-Real-IP" in headers:
        return headers["X-Real-IP"]

    # Fallback to X-Forwarded-For: the first entry is the original client
    if "X-Forwarded-For" in headers:
        return headers["X-Forwarded-For"].split(",")[0].strip()

    # Last resort. ``request.client`` may be None, and this code runs inside
    # the global error handler, so it must not raise on its own.
    client = request.client
    return client.host if client is not None else None


def get_request_context(request: Request) -> Dict[str, Any]:
    """
    Get detailed request context for logging.

    Returns a dict with the keys ``endpoint`` (URL path), ``url`` (full URL
    as a string), ``method`` and ``remote_addr`` (client IP, or ``None`` if it
    cannot be determined).
    """
    return {
        "endpoint": request.url.path,
        "url": str(request.url),
        "method": request.method,
        "remote_addr": _get_client_ip(request),
    }


def handle_sqlalchemy_error(error: SQLAlchemyError) -> Tuple[str, int]:
    """
    Handle SQLAlchemy errors and return user-friendly error messages.

    Returns a ``(message, http_status)`` tuple. The message is deliberately
    generic so that driver/SQL details are never exposed to the caller.
    """
    # Not every SQLAlchemyError carries the underlying driver exception.
    original_error = getattr(error, "orig", None)
    error_text = str(error).lower()

    if isinstance(error, IntegrityError):
        if "unique constraint" in error_text:
            return "This record already exists.", 409
        if "foreign key constraint" in error_text:
            return "Related record not found.", 400
        return "Data integrity error.", 400

    if isinstance(error, DataError) or isinstance(original_error, AsyncPGDataError):
        return "Invalid data provided.", 400

    if isinstance(error, DBAPIError):
        for marker, outcome in _DBAPI_ERROR_MARKERS:
            if marker in error_text:
                return outcome
        return "Database error.", 500

    # Log the full error for debugging purposes
    log.error("Unexpected database error: %s", error)
    return "An unexpected database error occurred.", 500


def _error_response(
    status_code: int,
    message: str,
    error_id: str,
    data: Any = None,
    headers: Optional[Dict[str, str]] = None,
) -> JSONResponse:
    """Build the standard error envelope as a JSON response."""
    return JSONResponse(
        status_code=status_code,
        # jsonable_encoder converts the status enum and any non-JSON-native
        # values inside ``data`` before the response is rendered.
        content=jsonable_encoder(
            {
                "data": data,
                "message": message,
                "status": ResponseStatus.ERROR,
                "error_id": error_id,
            }
        ),
        headers=headers,
    )


def handle_exception(request: Request, exc: Exception) -> JSONResponse:
    """
    Global exception handler for Fastapi application.

    Logs the exception with a freshly generated error ID, stores that ID on
    ``request.state.error_id`` and returns a JSON body with the keys ``data``,
    ``message``, ``status`` and ``error_id``.

    Status codes: 429 for rate limiting, 422 for request validation errors,
    the exception's own status for HTTP exceptions, the status chosen by
    ``handle_sqlalchemy_error`` for database errors, and 500 otherwise.
    """
    # uuid4 is random; uuid1 would embed the host MAC address and a timestamp
    # in an identifier that is sent back to the client.
    error_id = str(uuid.uuid4())
    request_info = get_request_context(request)

    # Store error_id in request.state for middleware/logging
    request.state.error_id = error_id

    # Checked before the generic HTTP branch so rate limiting keeps its own
    # log category (slowapi's RateLimitExceeded appears to derive from
    # HTTPException -- TODO confirm against the installed version).
    if isinstance(exc, RateLimitExceeded):
        log.warning(
            "Rate limit exceeded | Error ID: %s",
            error_id,
            extra={
                "error_id": error_id,
                "error_category": "rate_limit",
                "request": request_info,
                "detail": str(exc.description)
                if hasattr(exc, "description")
                else str(exc),
            },
        )
        return _error_response(429, "Rate limit exceeded", error_id)

    if isinstance(exc, RequestValidationError):
        validation_errors = exc.errors()
        log.warning(
            "Validation error occurred | Error ID: %s",
            error_id,
            extra={
                "error_id": error_id,
                "error_category": "validation",
                "errors": validation_errors,
                "request": request_info,
            },
        )
        return _error_response(
            422, "Validation Error", error_id, data=validation_errors
        )

    if isinstance(exc, (HTTPException, StarletteHTTPException)):
        detail = exc.detail if hasattr(exc, "detail") else str(exc)
        log.error(
            "HTTP exception occurred | Error ID: %s",
            error_id,
            extra={
                "error_id": error_id,
                "error_category": "http",
                "status_code": exc.status_code,
                "detail": detail,
                "request": request_info,
            },
        )
        return _error_response(
            exc.status_code,
            str(detail),
            error_id,
            # Preserve headers attached to the exception (e.g.
            # WWW-Authenticate on a 401) instead of silently dropping them.
            headers=getattr(exc, "headers", None),
        )

    if isinstance(exc, SQLAlchemyError):
        error_message, status_code = handle_sqlalchemy_error(exc)
        log.error(
            "Database error occurred | Error ID: %s",
            error_id,
            extra={
                "error_id": error_id,
                "error_category": "database",
                "error_message": error_message,
                "request": request_info,
                "exception": str(exc),
            },
        )
        return _error_response(status_code, error_message, error_id)

    # Log unexpected errors
    log.error(
        "Unexpected error occurred | Error ID: %s",
        error_id,
        extra={
            "error_id": error_id,
            "error_category": "unexpected",
            "error_message": str(exc),
            "request": request_info,
        },
        exc_info=True,
    )
    return _error_response(500, "An unexpected error occurred", error_id)