@ -161,29 +161,36 @@ def inspect_value(value: str, source: str):
return
if XSS_PATTERN . search ( value ) :
raise HTTPException ( status_code = 422 , detail = f " Potential XSS payload detected in { source } " )
log . warning ( f " Security violation: Potential XSS payload detected in { source } , value: { value } " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
if SQLI_PATTERN . search ( value ) :
raise HTTPException ( status_code = 422 , detail = f " Potential SQL injection payload detected in { source } " )
log . warning ( f " Security violation: Potential SQL injection payload detected in { source } , value: { value } " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
if RCE_PATTERN . search ( value ) :
raise HTTPException ( status_code = 422 , detail = f " Potential RCE payload detected in { source } " )
log . warning ( f " Security violation: Potential RCE payload detected in { source } , value: { value } " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
if TRAVERSAL_PATTERN . search ( value ) :
raise HTTPException ( status_code = 422 , detail = f " Potential Path Traversal payload detected in { source } " )
log . warning ( f " Security violation: Potential Path Traversal payload detected in { source } , value: { value } " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
if has_control_chars ( value ) :
raise HTTPException ( status_code = 422 , detail = f " Invalid control characters detected in { source } " )
log . warning ( f " Security violation: Invalid control characters detected in { source } " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
def inspect_json ( obj , path = " body " , check_whitelist = True ) :
if isinstance ( obj , dict ) :
for key , value in obj . items ( ) :
if key in FORBIDDEN_JSON_KEYS :
raise HTTPException ( status_code = 422 , detail = f " Forbidden JSON key detected: { path } . { key } " )
log . warning ( f " Security violation: Forbidden JSON key detected: { path } . { key } " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
if check_whitelist and key not in ALLOWED_DATA_PARAMS :
raise HTTPException ( status_code = 422 , detail = f " Unknown JSON key detected: { path } . { key } " )
log . warning ( f " Security violation: Unknown JSON key detected: { path } . { key } " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
# Recurse. If the key is a dynamic container, we stop whitelist checking for children.
should_check_subkeys = check_whitelist and ( key not in DYNAMIC_KEYS )
@ -213,14 +220,16 @@ class RequestValidationMiddleware(BaseHTTPMiddleware):
ALLOW_DUPLICATE_HEADERS = { ' accept ' , ' accept-encoding ' , ' accept-language ' , ' accept-charset ' , ' cookie ' }
real_duplicates = [ h for h in duplicate_headers if h not in ALLOW_DUPLICATE_HEADERS ]
if real_duplicates :
raise HTTPException ( status_code = 422 , detail = f " Duplicate headers are not allowed: { real_duplicates } " )
log . warning ( f " Security violation: Duplicate headers detected: { real_duplicates } " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
# Whitelist headers
unknown_headers = [ key for key in header_keys if key not in ALLOWED_HEADERS ]
if unknown_headers :
filtered_unknown = [ h for h in unknown_headers if not h . startswith ( ' sec- ' ) ]
if filtered_unknown :
raise HTTPException ( status_code = 422 , detail = f " Unknown headers detected: { filtered_unknown } " )
log . warning ( f " Security violation: Unknown headers detected: { filtered_unknown } " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
# Inspect header values
for key , value in request . headers . items ( ) :
@ -231,17 +240,20 @@ class RequestValidationMiddleware(BaseHTTPMiddleware):
# 1. Query string limits
# -------------------------
if len ( request . url . query ) > MAX_QUERY_LENGTH :
raise HTTPException ( status_code = 422 , detail = " Query string too long " )
log . warning ( f " Security violation: Query string too long " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
params = request . query_params . multi_items ( )
if len ( params ) > MAX_QUERY_PARAMS :
raise HTTPException ( status_code = 422 , detail = " Too many query parameters " )
log . warning ( f " Security violation: Too many query parameters " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
# Check for unknown query parameters
unknown_params = [ key for key , _ in params if key not in ALLOWED_DATA_PARAMS ]
if unknown_params :
raise HTTPException ( status_code = 422 , detail = f " Unknown query parameters detected: { unknown_params } " )
log . warning ( f " Security violation: Unknown query parameters detected: { unknown_params } " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
# -------------------------
# 2. Duplicate parameters
@ -253,7 +265,8 @@ class RequestValidationMiddleware(BaseHTTPMiddleware):
]
if duplicates :
raise HTTPException ( status_code = 422 , detail = f " Duplicate query parameters are not allowed: { duplicates } " )
log . warning ( f " Security violation: Duplicate query parameters detected: { duplicates } " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
# -------------------------
# 3. Query param inspection & Pagination
@ -267,11 +280,14 @@ class RequestValidationMiddleware(BaseHTTPMiddleware):
try :
size_val = int ( value )
if size_val > 50 :
raise HTTPException ( status_code = 422 , detail = f " Pagination size ' { key } ' cannot exceed 50 " )
log . warning ( f " Security violation: Pagination size too large ( { size_val } ) " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
if size_val % 5 != 0 :
raise HTTPException ( status_code = 422 , detail = f " Pagination size ' { key } ' must be a multiple of 5 " )
log . warning ( f " Security violation: Pagination size not multiple of 5 ( { size_val } ) " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
except ValueError :
raise HTTPException ( status_code = 422 , detail = f " Pagination size ' { key } ' must be an integer " )
log . warning ( f " Security violation: Pagination size invalid value " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
# -------------------------
# 4. Content-Type sanity
@ -281,7 +297,8 @@ class RequestValidationMiddleware(BaseHTTPMiddleware):
content_type . startswith ( t )
for t in ( " application/json " , " multipart/form-data " , " application/x-www-form-urlencoded " )
) :
raise HTTPException ( status_code = 422 , detail = " Unsupported Content-Type " )
log . warning ( f " Security violation: Unsupported Content-Type: { content_type } " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
# -------------------------
# 5. Single source check (Query vs JSON Body)
@ -295,7 +312,8 @@ class RequestValidationMiddleware(BaseHTTPMiddleware):
has_body = True
if has_query and has_body :
raise HTTPException ( status_code = 422 , detail = " Parameters must be from a single source (query string or JSON body), mixed sources are not allowed " )
log . warning ( f " Security violation: Mixed parameters (query + JSON body) " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
# -------------------------
# 6. JSON body inspection
@ -309,7 +327,8 @@ class RequestValidationMiddleware(BaseHTTPMiddleware):
try :
payload = json . loads ( body )
except json . JSONDecodeError :
raise HTTPException ( status_code = 422 , detail = " Invalid JSON body " )
log . warning ( f " Security violation: Invalid JSON body " )
raise HTTPException ( status_code = 422 , detail = " Invalid request parameters " )
inspect_json ( payload )