add verify token new

feature/reliability_stat
Cizz22 8 months ago
parent 21553992d4
commit ed8714f69a

@ -9,7 +9,7 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
import src.config as config import src.config as config
from .model import UserBase from .model import UserBase
from .util import extract_template
class JWTBearer(HTTPBearer): class JWTBearer(HTTPBearer):
def __init__(self, auto_error: bool = True): def __init__(self, auto_error: bool = True):
@ -24,7 +24,16 @@ class JWTBearer(HTTPBearer):
raise HTTPException( raise HTTPException(
status_code=403, detail="Invalid authentication scheme." status_code=403, detail="Invalid authentication scheme."
) )
user_info = self.verify_jwt(credentials.credentials) method = request.method
if method == "OPTIONS":
return
path = extract_template(request.url.path, request.path_params)
endpoint = f"/optimumoh/{path}"
user_info = self.verify_jwt(credentials.credentials, method, endpoint)
if not user_info: if not user_info:
raise HTTPException( raise HTTPException(
status_code=403, detail="Invalid token or expired token." status_code=403, detail="Invalid token or expired token."
@ -35,11 +44,11 @@ class JWTBearer(HTTPBearer):
else: else:
raise HTTPException(status_code=403, detail="Invalid authorization code.") raise HTTPException(status_code=403, detail="Invalid authorization code.")
def verify_jwt(self, jwtoken: str) -> Optional[UserBase]: def verify_jwt(self, jwtoken: str, method: str, endpoint: str) -> Optional[UserBase]:
try: try:
response = requests.get( response = requests.get(
f"{config.AUTH_SERVICE_API}/verify-token?url=http://localhost:8000", f"{config.AUTH_SERVICE_API}/verify-token",
headers={"Authorization": f"Bearer {jwtoken}"}, headers={"Authorization": f"Bearer {jwtoken}"},
) )

@ -0,0 +1,9 @@
def extract_template(path_string, value_dict):
template = path_string
# Replace each value in the dict with its corresponding key placeholder
for key, value in value_dict.items():
if str(value) in template:
template = template.replace(str(value), f'<{key}>')
return template

@ -17,6 +17,8 @@ async def get_activities(common: CommonParameters):
# return # return
data = await get_all(common=common) data = await get_all(common=common)
return StandardResponse( return StandardResponse(
data=data, data=data,
message="Data retrieved successfully", message="Data retrieved successfully",

Loading…
Cancel
Save