You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

81 lines
2.7 KiB
Python

# app/auth/auth_bearer.py
import asyncio
import logging
from typing import Annotated, Optional

from fastapi import Depends, Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import requests

import src.config as config
from .model import UserBase
class JWTBearer(HTTPBearer):
    """Bearer-token dependency that validates the token against the auth service.

    Calling an instance extracts the ``Authorization`` credentials through
    ``HTTPBearer``, asks ``{config.AUTH_SERVICE_API}/verify-token`` to validate
    the token, stores the resulting ``UserBase`` on ``request.state.user`` and
    forwards selected user attributes to the setters in ``src.context``.
    """

    # Upper bound, in seconds, on the auth-service round trip. Without a
    # timeout ``requests`` waits forever and a stalled auth service would hang
    # every authenticated request.
    VERIFY_TIMEOUT_SECONDS = 10

    def __init__(self, auto_error: bool = True):
        """Initialise the underlying ``HTTPBearer`` with ``auto_error``."""
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request):
        """Authenticate ``request`` and return the verified user.

        Returns:
            The ``UserBase`` built from the auth-service response.

        Raises:
            HTTPException: status 403 when no credentials are available, the
                scheme is not ``Bearer``, or the token fails verification.
        """
        credentials: Optional[HTTPAuthorizationCredentials] = await super().__call__(
            request
        )
        if not credentials:
            raise HTTPException(status_code=403, detail="Invalid authorization code.")

        # Authentication scheme names are case-insensitive, so "bearer" and
        # "BEARER" must be accepted as well.
        if credentials.scheme.lower() != "bearer":
            raise HTTPException(
                status_code=403, detail="Invalid authentication scheme."
            )

        # verify_jwt performs blocking network I/O; run it in a worker thread
        # so the event loop keeps serving other requests meanwhile.
        user_info = await asyncio.to_thread(self.verify_jwt, credentials.credentials)
        if not user_info:
            raise HTTPException(
                status_code=403, detail="Invalid token or expired token."
            )

        request.state.user = user_info
        self._publish_user_context(user_info)
        return user_info

    @staticmethod
    def _publish_user_context(user_info: UserBase) -> None:
        """Forward the user's id, name and role to the ``src.context`` setters."""
        # Imported lazily, as the original code did - presumably to avoid a
        # circular import with src.context; confirm before hoisting.
        from src.context import set_user_id, set_username, set_role

        if hasattr(user_info, "user_id"):
            set_user_id(str(user_info.user_id))
        # Prefer ``username``; fall back to ``name`` when only that exists.
        if hasattr(user_info, "username"):
            set_username(user_info.username)
        elif hasattr(user_info, "name"):
            set_username(user_info.name)
        if hasattr(user_info, "role"):
            set_role(user_info.role)

    def verify_jwt(self, jwtoken: str) -> Optional[UserBase]:
        """Validate ``jwtoken`` with the auth service.

        Args:
            jwtoken: Raw token string, without the ``Bearer`` prefix.

        Returns:
            A ``UserBase`` built from the ``"data"`` member of the JSON
            response, or ``None`` when the service answers with a non-OK
            status or anything goes wrong (network error, timeout, malformed
            payload).
        """
        try:
            response = requests.get(
                f"{config.AUTH_SERVICE_API}/verify-token",
                headers={"Authorization": f"Bearer {jwtoken}"},
                timeout=self.VERIFY_TIMEOUT_SECONDS,
            )
            if not response.ok:
                return None
            payload = response.json()
            return UserBase(**payload["data"])
        except Exception as exc:
            # Deliberately broad: any failure must fail closed (treated as an
            # invalid token) rather than surface as a 500.
            logging.getLogger(__name__).error("Token verification error: %s", exc)
            return None
# Dependency that hands back the user stored on the request state.
async def get_current_user(request: Request) -> UserBase:
    """Return the ``user`` attribute stored on ``request.state``.

    In this file that attribute is assigned by ``JWTBearer.__call__`` after a
    token has been verified.
    """
    state = request.state
    return state.user
async def get_token(request: Request) -> Optional[str]:
    """Extract the raw access token from the request.

    The ``Authorization`` header takes precedence; when it is missing or
    empty, the ``access_token`` cookie is used instead.

    Returns:
        The header value with a leading ``Bearer `` scheme removed (matched
        case-insensitively), the header value unchanged when it carries no
        such prefix, or the ``access_token`` cookie lookup result when there
        is no header.
    """
    header = request.headers.get("Authorization")
    if not header:
        # Fall back to the cookie.
        return request.cookies.get("access_token")

    # Strip only a leading scheme: str.replace would also delete any later
    # occurrence of "Bearer " inside the value.
    prefix = "bearer "
    if header[: len(prefix)].lower() == prefix:
        return header[len(prefix):]
    return header
# Annotated alias: declares a parameter of type UserBase whose value is
# supplied by the get_current_user dependency.
CurrentUser = Annotated[UserBase, Depends(get_current_user)]
# Annotated alias: declares a parameter whose value is supplied by the
# get_token dependency.
# NOTE(review): get_token returns the result of a cookie lookup when no
# Authorization header is present, which may be None - confirm whether this
# alias should be Optional[str].
Token = Annotated[str, Depends(get_token)]