You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
be-optimumoh/src/auth/service.py

117 lines
3.3 KiB
Python

# app/auth/auth_bearer.py
import json
from typing import Annotated, Optional
import requests
from fastapi import Depends, HTTPException, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.sql.expression import false
import src.config as config
from .model import UserBase
from .util import extract_template
class JWTBearer(HTTPBearer):
def __init__(self, auto_error: bool = True):
super(JWTBearer, self).__init__(auto_error=auto_error)
async def __call__(self, request: Request):
credentials: HTTPAuthorizationCredentials = await super(
JWTBearer, self
).__call__(request)
if credentials:
if not credentials.scheme == "Bearer":
raise HTTPException(
status_code=403, detail="Invalid authentication scheme."
)
method = request.method
if method == "OPTIONS":
return
path = extract_template(request.url.path, request.path_params)
endpoint = f"/optimumoh{path}"
user_info, message = self.verify_jwt(credentials.credentials, method, endpoint)
if not user_info:
message = message.get("message", "Invalid token or expired token.")
raise HTTPException(
status_code=403, detail=message
)
request.state.user = message
return message
else:
raise HTTPException(status_code=403, detail="Invalid authorization code.")
def verify_jwt(self, jwtoken: str, method: str, endpoint: str):
try:
response = requests.get(
f"{config.AUTH_SERVICE_API}/verify-token",
headers={"Authorization": f"Bearer {jwtoken}"},
)
if not response.ok:
return False, response.json()
user_data = response.json()
return True, UserBase(**user_data["data"])
except Exception as e:
print(f"Token verification error: {str(e)}")
return False, str(e)
# Create dependency to get current user from request state
async def get_current_user(request: Request) -> UserBase:
return request.state.user
async def get_token(request: Request):
token = request.headers.get("Authorization")
if token:
return token.split(" ")[1]
return ""
async def internal_key(request: Request):
api_key = request.headers.get("X-Internal-Key")
if api_key != config.API_KEY:
raise HTTPException(
status_code=403, detail="Invalid Key."
)
try:
payload = json.dumps({
"username": "user10",
"password": "123456"
})
response = requests.post(
f"{config.AUTH_SERVICE_API}/sign-in",
data=payload
)
if not response.ok:
print(str(response.json()))
raise Exception("error auth")
user_data = response.json()
return user_data['data']['access_token']
except Exception as e:
raise Exception(str(e))
CurrentUser = Annotated[UserBase, Depends(get_current_user)]
Token = Annotated[str, Depends(get_token)]
InternalKey = Annotated[str, Depends(internal_key)]