Refactor attribute access for clarity and robustness, and enhance subprocess command execution by using a list for arguments.

rest-api
MrWaradana 1 month ago
parent f381a46186
commit 7bcc102f1b

@ -250,19 +250,19 @@ async def get_master_by_assetnum(
y = yeardata_dict.get(year) y = yeardata_dict.get(year)
asset_crit_ens_energy_not_served = ( asset_crit_ens_energy_not_served = (
getattr(y, "asset_crit_ens_energy_not_served", 0) if y is not None else 0 y.asset_crit_ens_energy_not_served if y is not None else 0
) )
asset_crit_bpp_system = ( asset_crit_bpp_system = (
getattr(y, "asset_crit_bpp_system", 0) if y is not None else 0 y.asset_crit_bpp_system if y is not None else 0
) )
asset_crit_bpp_pembangkit = ( asset_crit_bpp_pembangkit = (
getattr(y, "asset_crit_bpp_pembangkit", 0) if y is not None else 0 y.asset_crit_bpp_pembangkit if y is not None else 0
) )
asset_crit_marginal_cost = ( asset_crit_marginal_cost = (
getattr(y, "asset_crit_marginal_cost", 0) if y is not None else 0 y.asset_crit_marginal_cost if y is not None else 0
) )
asset_crit_dmn_daya_mampu_netto = ( asset_crit_dmn_daya_mampu_netto = (
getattr(y, "asset_crit_dmn_daya_mampu_netto", 0) if y is not None else 0 y.asset_crit_dmn_daya_mampu_netto if y is not None else 0
) )
# Convert to floats and compute criticality safely # Convert to floats and compute criticality safely

@ -64,8 +64,7 @@ def handle_sqlalchemy_error(error: SQLAlchemyError):
""" """
Handle SQLAlchemy errors and return user-friendly error messages. Handle SQLAlchemy errors and return user-friendly error messages.
""" """
original_error = getattr(error, 'orig', None) original_error = error.orig if hasattr(error, 'orig') else None
print(original_error)
if isinstance(error, IntegrityError): if isinstance(error, IntegrityError):
if "unique constraint" in str(error).lower(): if "unique constraint" in str(error).lower():
@ -144,7 +143,7 @@ def handle_exception(request: Request, exc: Exception):
# Log unexpected errors # Log unexpected errors
error_message = f"{exc.__class__.__name__}: {str(exc)}" error_message = f"{exc.__class__.__name__}: {str(exc)}"
error_traceback = getattr(exc, '__traceback__', None) error_traceback = exc.__traceback__
# Get file and line info if available # Get file and line info if available
if error_traceback: if error_traceback:

@ -599,7 +599,7 @@ class Prediksi:
# Try to sign in again # Try to sign in again
try: try:
signin_res = await self.sign_in() signin_res = await self.sign_in()
if getattr(self, "access_token", None): if self.access_token:
return signin_res return signin_res
except Exception as signin_exc: except Exception as signin_exc:
print(f"Sign-in failed after sign-out: {signin_exc}") print(f"Sign-in failed after sign-out: {signin_exc}")
@ -610,7 +610,7 @@ class Prediksi:
On success updates self.access_token and returns it. Returns None on failure. On success updates self.access_token and returns it. Returns None on failure.
""" """
if not getattr(self, "refresh_token", None): if not self.refresh_token:
print("No refresh token available to refresh access token.") print("No refresh token available to refresh access token.")
return None return None
try: try:
@ -641,7 +641,7 @@ class Prediksi:
endpoint = f"{url}/main/number-of-failures/{assetnum}/{int(year)}/{int(year)}" endpoint = f"{url}/main/number-of-failures/{assetnum}/{int(year)}/{int(year)}"
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
try: try:
current_token = getattr(self, "access_token", None) current_token = self.access_token
response = await client.get( response = await client.get(
endpoint, endpoint,
timeout=30.0, timeout=30.0,
@ -650,7 +650,7 @@ class Prediksi:
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
except httpx.HTTPStatusError as e: except httpx.HTTPStatusError as e:
status = getattr(e.response, "status_code", None) status = e.response.status_code
# If we get a 401 or 403, try to refresh the access token and retry once # If we get a 401 or 403, try to refresh the access token and retry once
if status in (401, 403): if status in (401, 403):
print(f"Received {status} from reliability API, attempting to refresh/re-login...") print(f"Received {status} from reliability API, attempting to refresh/re-login...")
@ -660,7 +660,7 @@ class Prediksi:
if not new_access: if not new_access:
print("Refresh failed, attempting full sign-in...") print("Refresh failed, attempting full sign-in...")
await self.sign_in() await self.sign_in()
new_access = getattr(self, "access_token", None) new_access = self.access_token
if new_access: if new_access:
try: try:
@ -985,7 +985,7 @@ async def main(RELIABILITY_APP_URL=RELIABILITY_APP_URL, assetnum=None, token=Non
# If token not provided, sign in to obtain access_token/refresh_token # If token not provided, sign in to obtain access_token/refresh_token
if token is None: if token is None:
signin_res = await prediksi.sign_in() signin_res = await prediksi.sign_in()
if not getattr(prediksi, "access_token", None): if not prediksi.access_token:
print("Failed to obtain access token; aborting.") print("Failed to obtain access token; aborting.")
return return
else: else:

@ -276,8 +276,16 @@ else:
# Mapping dari kolom Excel ke nama kolom database sesuai gambar 2 # Mapping dari kolom Excel ke nama kolom database sesuai gambar 2
libreoffice_path = "soffice" # Sesuaikan dengan lokasi di Linux/Windows libreoffice_path = "soffice" # Sesuaikan dengan lokasi di Linux/Windows
command = f'"{libreoffice_path}" --headless --convert-to xlsx {file_ref} --outdir {ROOT_DIR}/hasil' command = [
subprocess.run(command, shell=True, check=True) libreoffice_path,
"--headless",
"--convert-to",
"xlsx",
file_ref,
"--outdir",
f"{ROOT_DIR}/hasil",
]
subprocess.run(command, check=True)
print("recalculate OK") print("recalculate OK")

@ -58,7 +58,7 @@ async def import_fs_charts(
updated, missing = await update_fs_charts_from_matrix( updated, missing = await update_fs_charts_from_matrix(
db_session=db_session, db_session=db_session,
payload=payload, payload=payload,
updated_by=getattr(current_user, "user_id", None) if current_user else None, updated_by=current_user.user_id if current_user else None,
) )
msg = "Data imported successfully." msg = "Data imported successfully."

@ -21,7 +21,7 @@ def _resolve_user_id(user: Optional[CurrentUser]) -> Optional[str]:
if user is None: if user is None:
return None return None
# UserBase guarantees user_id # UserBase guarantees user_id
return getattr(user, "user_id", None) return user.user_id
async def get( async def get(

@ -137,7 +137,7 @@ async def delete_uploaded_file(db_session: DbSession, uploaded_file_id: str):
) )
# Attempt to delete the file on disk if a file_url is present # Attempt to delete the file on disk if a file_url is present
file_url = getattr(uploaded_file, "file_url", None) file_url = uploaded_file.file_url
if file_url: if file_url:
try: try:
# file_url is stored like "/uploads/<timestamp>/<filename>" # file_url is stored like "/uploads/<timestamp>/<filename>"

Loading…
Cancel
Save