@ -18,14 +18,33 @@ MAX_QUERY_PARAMS = 50
MAX_QUERY_LENGTH = 2000
MAX_JSON_BODY_SIZE = 1024 * 100 # 100 KB
# Very targeted patterns. Avoid catastrophic regex nonsense.
XSS_PATTERN = re . compile (
r " (<script|</script|javascript:|onerror \ s*=|onload \ s*=|<svg|<img) " ,
r " (<script|<iframe|<embed|<object|<svg|<img|<video|<audio|<base|<link|<meta|<form|<button| "
r " javascript:|vbscript:|data:text/html|onerror \ s*=|onload \ s*=|onmouseover \ s*=|onfocus \ s*=| "
r " onclick \ s*=|onscroll \ s*=|ondblclick \ s*=|onkeydown \ s*=|onkeypress \ s*=|onkeyup \ s*=| "
r " onloadstart \ s*=|onpageshow \ s*=|onresize \ s*=|onunload \ s*=|style \ s*= \ s*[ ' \" ].expression \ s \ (| "
r " eval \ s* \ (|setTimeout \ s* \ (|setInterval \ s* \ (|Function \ s* \ () " ,
re . IGNORECASE ,
)
SQLI_PATTERN = re . compile (
r " ( \ bUNION \ b| \ bSELECT \ b| \ bINSERT \ b| \ bDELETE \ b| \ bDROP \ b|--| \ bOR \ b \ s+1=1) " ,
r " ( \ bUNION \ b| \ bSELECT \ b| \ bINSERT \ b| \ bUPDATE \ b| \ bDELETE \ b| \ bDROP \ b| \ bALTER \ b| \ bCREATE \ b| \ bTRUNCATE \ b| "
r " \ bEXEC \ b| \ bEXECUTE \ b| \ bDECLARE \ b| \ bWAITFOR \ b| \ bDELAY \ b| \ bGROUP \ b \ s+ \ bBY \ b| \ bHAVING \ b| \ bORDER \ b \ s+ \ bBY \ b| "
r " \ bINFORMATION_SCHEMA \ b| \ bSYS \ b \ .| \ bSYSOBJECTS \ b| \ bPG_SLEEP \ b| \ bSLEEP \ b \ (|--|/ \ | \ /|#| \ bOR \ b \ s+[ ' \" ]? \ d+[ ' \" ]? \ s*= \ s*[ ' \" ]? \ d+| "
r " \ bAND \ b \ s+[ ' \" ]? \ d+[ ' \" ]? \ s*= \ s*[ ' \" ]? \ d+| "
r " \ bXP_CMDSHELL \ b| \ bLOAD_FILE \ b| \ bINTO \ s+OUTFILE \ b) " ,
re . IGNORECASE ,
)
RCE_PATTERN = re . compile (
r " ( \ $ \ (|.*|[;&|] \ s*(cat|ls|id|whoami|pwd|ifconfig|ip|netstat|nc|netcat|nmap|curl|wget|python|php|perl|ruby|bash|sh|cmd|powershell|pwsh|sc \ s+|tasklist|taskkill|base64|sudo|crontab|ssh|ftp|tftp)| "
r " \ b(cat|ls|id|whoami|pwd|ifconfig|ip|netstat|nc|netcat|nmap|curl|wget|python|php|perl|ruby|bash|sh|cmd|powershell|pwsh|base64|sudo|crontab) \ b| "
r " /etc/passwd|/etc/shadow|/etc/group|/etc/issue|/proc/self/|/windows/system32/|C: \\ Windows \\ ) " ,
re . IGNORECASE ,
)
TRAVERSAL_PATTERN = re . compile (
r " ( \ . \ ./| \ . \ . \\ | %2e %2e %2f | %2e %2e /| \ . \ . %2f | %2e %2e %5c ) " ,
re . IGNORECASE ,
)
@ -52,6 +71,18 @@ def inspect_value(value: str, source: str):
status_code = 400 ,
detail = f " Potential SQL injection payload detected in { source } " ,
)
if RCE_PATTERN . search ( value ) :
raise HTTPException (
status_code = 400 ,
detail = f " Potential RCE payload detected in { source } " ,
)
if TRAVERSAL_PATTERN . search ( value ) :
raise HTTPException (
status_code = 400 ,
detail = f " Potential traversal payload detected in { source } " ,
)
if has_control_chars ( value ) :
raise HTTPException (
@ -117,10 +148,31 @@ class RequestValidationMiddleware(BaseHTTPMiddleware):
# -------------------------
# 3. Query param inspection
# -------------------------
pagination_size_keys = { " size " , " itemsPerPage " , " per_page " , " limit " }
for key , value in params :
if value :
inspect_value ( value , f " query param ' { key } ' " )
# Pagination constraint: multiples of 5, max 50
if key in pagination_size_keys and value :
try :
size_val = int ( value )
if size_val > 50 :
raise HTTPException (
status_code = 400 ,
detail = f " Pagination size ' { key } ' cannot exceed 50 " ,
)
if size_val % 5 != 0 :
raise HTTPException (
status_code = 400 ,
detail = f " Pagination size ' { key } ' must be a multiple of 5 " ,
)
except ValueError :
raise HTTPException (
status_code = 400 ,
detail = f " Pagination size ' { key } ' must be an integer " ,
)
# -------------------------
# 4. Content-Type sanity
# -------------------------