Update: Change secret mount point from .env to vault

main
alka 2 weeks ago
parent fff11eb527
commit 19ec01cc6d

@ -1,7 +1,8 @@
import anyio
from licaeros import LicensedSession, device_fingerprint_hex
from src.config import AEROS_BASE_URL, AEROS_LICENSE_ID, AEROS_LICENSE_SECRET, WINDOWS_AEROS_BASE_URL
from src.config import AEROS_BASE_URL, WINDOWS_AEROS_BASE_URL, VAULT_URL, ROLE_ID, SECRET_ID, AEROS_SECRET_PATH
import logging
from src.utils import get_vault_secrets
log = logging.getLogger(__name__)
@ -9,6 +10,9 @@ log = logging.getLogger(__name__)
_aeros_session = None
def get_aeros_session(base_url):
AEROS_LICENSE_ID, AEROS_LICENSE_SECRET = get_vault_secrets(vault_url=VAULT_URL,role_id=ROLE_ID,secret_id=SECRET_ID,secret_path=AEROS_SECRET_PATH,secret_keys_to_be_returned=['aeros_license_id', 'aeros_license_secret'])
global _aeros_session
if _aeros_session is None:
log.info(f"Initializing LicensedSession with base URL: {base_url}")

@ -98,5 +98,10 @@ RELIABILITY_SERVICE_API = config("RELIABILITY_SERVICE_API", default="http://192.
CLAMAV_HOST = config("CLAMAV_HOST", default="192.168.1.82")
CLAMAV_PORT = config("CLAMAV_PORT", cast=int, default=3310)
AEROS_LICENSE_ID = config("AEROS_LICENSE_ID", default="")
AEROS_LICENSE_SECRET = config("AEROS_LICENSE_SECRET", default="")
# AEROS_LICENSE_ID = config("AEROS_LICENSE_ID", default="")
# AEROS_LICENSE_SECRET = config("AEROS_LICENSE_SECRET", default="")
VAULT_URL=config('VAULT_URL')
ROLE_ID=config('ROLE_ID')
SECRET_ID=config('SECRET_ID')
AEROS_SECRET_PATH=config('AEROS_SECRET_PATH')

@ -7,7 +7,8 @@ import pytz
from dateutil.relativedelta import relativedelta
from src.config import RELIABILITY_SERVICE_API, TIMEZONE
import hvac
from typing import Optional, Dict, List
def parse_relative_expression(date_str: str) -> Optional[datetime]:
"""
@ -185,3 +186,47 @@ def sanitize_filename(filename: str) -> str:
filename = filename[:200]
return filename.strip()
def get_vault_secrets(
vault_url: str,
role_id: str,
secret_id: str,
secret_path: str,
secret_keys_to_be_returned: List[str],
mount_point: str = "secret"
) -> Optional[Dict[str, str]]:
try:
client = hvac.Client(url=vault_url)
# Login using AppRole
client.auth.approle.login(
role_id=role_id,
secret_id=secret_id
)
if not client.is_authenticated():
raise Exception("Vault authentication failed")
# Read secret
response = client.secrets.kv.v2.read_secret_version(
path=secret_path,
mount_point=mount_point
)
secret_data = response["data"]["data"]
# Filter only requested keys
result = {}
for key in secret_keys_to_be_returned:
if key not in secret_data:
raise KeyError(f"Key '{key}' not found in secret")
result[key] = secret_data[key]
return result
except Exception as e:
print(f"Error retrieving secret from Vault: {str(e)}")
return None
Loading…
Cancel
Save