You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1026 lines
46 KiB
Python

import os
import asyncio
import pandas as pd
import numpy as np
import numpy_financial as npf # Gunakan numpy-financial untuk fungsi keuangan
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
import matplotlib.pyplot as plt
from starlette.config import Config
from uuid import uuid4
from psycopg2.extras import DictCursor
import httpx
from dotenv import load_dotenv
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from config import get_connection
from modules.equipment.formula import rc_labor_cost, rc_lost_cost, rc_total_cost
import json
load_dotenv()
class Prediksi:
def __init__(self, RELIABILITY_APP_URL=None):
# Allow passing the URL or fallback to environment/default so callers can omit the parameter
self.RELIABILITY_APP_URL = RELIABILITY_APP_URL or os.getenv(
"RELIABILITY_APP_URL", "http://192.168.1.82:8000/reliability"
)
# Base URL for auth endpoints (sign-in, refresh-token)
self.AUTH_APP_URL = os.getenv("AUTH_APP_URL", "http://192.168.1.82:8000")
# tokens will be stored here after sign-in/refresh
self.access_token = None
self.refresh_token = None
# Fungsi untuk mengambil data dari database
def __get_param(self, equipment_id):
try:
# Mendapatkan koneksi dari config.py
connections = get_connection()
connection = (
connections[0] if isinstance(connections, tuple) else connections
)
if connection is None:
print("Database connection failed.")
return None
# Membuat cursor menggunakan DictCursor
cursor = connection.cursor(cursor_factory=DictCursor)
# print(f"Getting params for equipment_id: {equipment_id}")
# Query untuk mendapatkan data
query = """
SELECT
(select COALESCE(forecasting_target_year, 2056) from lcc_ms_equipment_data where assetnum = %s) AS forecasting_target_year
"""
cursor.execute(query, (equipment_id,))
par1 = cursor.fetchone()
return par1["forecasting_target_year"]
except Exception as e:
print(f"Error saat get params dari database: {e}")
return None
finally:
if connection:
connection.close()
# Fungsi untuk mengambil data dari database
def __fetch_data_from_db(self, equipment_id):
try:
# Get connection from config.py (using only the first connection)
connections = get_connection()
connection = (
connections[0] if isinstance(connections, tuple) else connections
)
if connection is None:
print("Database connection failed.")
return None
# Membuat cursor menggunakan DictCursor
cursor = connection.cursor(cursor_factory=DictCursor)
# print(f"Fetcing data for equipment_id: {equipment_id}")
# Query untuk mendapatkan data
query = """
SELECT
tahun AS year,
raw_cm_interval AS cm_interval,
raw_cm_material_cost AS cm_cost,
raw_cm_labor_time AS cm_labor_time,
raw_cm_labor_human AS cm_labor_human,
raw_pm_interval AS pm_interval,
raw_pm_material_cost AS pm_cost,
raw_pm_labor_time AS pm_labor_time,
raw_pm_labor_human AS pm_labor_human,
raw_oh_interval AS oh_interval,
raw_oh_material_cost AS oh_cost,
raw_oh_labor_time AS oh_labor_time,
raw_oh_labor_human AS oh_labor_human,
raw_predictive_material_cost AS predictive_material_cost,
raw_predictive_labor_time AS predictive_labor_time,
raw_predictive_labor_human AS predictive_labor_human,
raw_predictive_interval AS predictive_interval,
"raw_loss_output_MW" AS loss_output_mw,
raw_loss_output_price AS loss_price
FROM lcc_equipment_tr_data
WHERE assetnum = %s
and is_actual=1
;
"""
cursor.execute(query, (equipment_id,))
# Mengambil hasil dan mengonversi ke DataFrame pandas
data = cursor.fetchall()
columns = [
desc[0] for desc in cursor.description
] # Mengambil nama kolom dari hasil query
df = pd.DataFrame(data, columns=columns)
return df
except Exception as e:
print(f"Error saat mengambil data dari database: {e}")
return None
finally:
if connection:
connection.close()
# Fungsi untuk prediksi menggunakan Future Value (FV)
def __future_value_predict(self, rate, nper, pmt, pv, years):
# Hitung Future Value untuk tahun-tahun proyeksi
fv_values = []
for i in range(len(years)):
fv = npf.fv(rate, nper + i, pmt, pv) # Menggunakan numpy_financial.fv
fv_values.append(fv)
return fv_values
# Fungsi untuk menghapus data proyeksi pada tahun tertentu
def __delete_predictions_from_db(self, equipment_id):
try:
connections = get_connection()
connection = (
connections[0] if isinstance(connections, tuple) else connections
)
if connection is None:
print("Database connection failed.")
return None
cursor = connection.cursor()
# Query untuk menghapus data berdasarkan tahun proyeksi
delete_query = """
DELETE FROM lcc_equipment_tr_data
WHERE assetnum = %s AND is_actual = 0;
""" # Asumsikan kolom is_actual digunakan untuk membedakan data proyeksi dan data aktual
# Eksekusi query delete
cursor.execute(delete_query, (equipment_id,))
connection.commit()
# print(f"Data proyeksi untuk tahun {equipment_id} berhasil dihapus.")
except Exception as e:
print(f"Error saat menghapus data proyeksi dari database: {e}")
finally:
if connection:
connection.close()
# Fungsi untuk menyimpan data proyeksi ke database
async def __insert_predictions_to_db(self, data, equipment_id, token):
try:
connections = get_connection()
connection = (
connections[0] if isinstance(connections, tuple) else connections
)
if connection is None:
print("Database connection failed.")
return None
cursor = connection.cursor()
# Query untuk mendapatkan nilai maksimum seq
get_max_seq_query = """
SELECT COALESCE(MAX(seq), 0) FROM lcc_equipment_tr_data WHERE assetnum = %s
"""
cursor.execute(get_max_seq_query, (equipment_id,))
max_seq = cursor.fetchone()[0]
# Query untuk insert data
insert_query = """
INSERT INTO lcc_equipment_tr_data (
id,
seq,
is_actual,
raw_pm_interval,
tahun, assetnum,
raw_cm_interval, raw_cm_material_cost, raw_cm_labor_time, raw_cm_labor_human,
raw_pm_material_cost, raw_pm_labor_time, raw_pm_labor_human,
raw_oh_interval, raw_oh_material_cost, raw_oh_labor_time, raw_oh_labor_human,
raw_predictive_interval, raw_predictive_material_cost, raw_predictive_labor_time, raw_predictive_labor_human,
"raw_loss_output_MW", raw_loss_output_price
, created_by, created_at
) VALUES (
%s, %s, 0, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'Sys', NOW()
)
"""
# If a token was provided, store locally so fetch_api_data can use/refresh it
if token:
self.access_token = token
# Fetch data from external API (uses instance access_token and will try refresh on 403)
async def fetch_api_data(assetnum: str, year: int) -> dict:
url = self.RELIABILITY_APP_URL
endpoint = f"{url}/main/number-of-failures/{assetnum}/{int(year)}/{int(year)}"
async with httpx.AsyncClient() as client:
try:
current_token = getattr(self, "access_token", None)
response = await client.get(
endpoint,
timeout=30.0,
headers={"Authorization": f"Bearer {current_token}"} if current_token else {},
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
status = getattr(e.response, "status_code", None)
# If we get a 403, try to refresh the access token and retry once
if status == 403:
print("Received 403 from reliability API, attempting to refresh access token...")
new_access = await self.refresh_access_token()
if new_access:
try:
response = await client.get(
endpoint,
timeout=30.0,
headers={"Authorization": f"Bearer {new_access}"},
)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e2:
print(f"HTTP error occurred after refresh: {e2}")
return {}
print(f"HTTP error occurred: {e}")
return {}
except httpx.HTTPError as e:
print(f"HTTP error occurred: {e}")
return {}
# Menyiapkan data untuk batch insert
# print(f"Data to be inserted: {data}")
records_to_insert = []
for _, row in data.iterrows():
max_seq = max_seq + 1
# (token already stored before defining fetch_api_data)
# maintain previous cm_interval between iterations using attribute on fetch_api_data
if not hasattr(fetch_api_data, "prev_cm"):
fetch_api_data.prev_cm = None
# Update values from API (current year)
api_data = await fetch_api_data(equipment_id, row["year"])
if api_data and "data" in api_data and isinstance(api_data["data"], list) and len(api_data["data"]) > 0:
try:
cur_cm = float(api_data["data"][0].get("num_fail", row.get("cm_interval", 1)))
except Exception:
cur_cm = float(row.get("cm_interval", 1)) if not pd.isna(row.get("cm_interval", None)) else 1.0
else:
try:
val = float(row.get("cm_interval", 1))
cur_cm = val if val >= 1 else 1.0
except Exception:
cur_cm = 1.0
# Determine previous cm_interval: prefer stored prev_cm, otherwise try API for previous year, else fallback to cur_cm
if fetch_api_data.prev_cm is not None:
prev_cm = float(fetch_api_data.prev_cm)
else:
try:
api_prev = await fetch_api_data(equipment_id, int(row["year"]) - 1)
if api_prev and "data" in api_prev and isinstance(api_prev["data"], list) and len(api_prev["data"]) > 0:
prev_cm = float(api_prev["data"][0].get("num_fail", cur_cm))
else:
# attempt to use any available previous value from the row if present, otherwise fallback to current
prev_cm = float(row.get("cm_interval", cur_cm)) if not pd.isna(row.get("cm_interval", None)) else cur_cm
except Exception:
prev_cm = cur_cm
# compute difference: current year interval minus previous year interval
try:
cm_interval_diff = float(cur_cm) - float(prev_cm)
except Exception:
cm_interval_diff = 0.0
# append record using the difference for raw_cm_interval
records_to_insert.append(
(
str(uuid4()),
int(max_seq),
float(row["pm_interval"]) if not pd.isna(row.get("pm_interval", None)) else 0.0,
float(row["year"]) if not pd.isna(row.get("year", None)) else 0.0,
equipment_id,
cm_interval_diff,
float(row["cm_cost"]) if not pd.isna(row.get("cm_cost", None)) else 0.0,
float(row["cm_labor_time"]) if not pd.isna(row.get("cm_labor_time", None)) else 0.0,
float(row["cm_labor_human"]) if not pd.isna(row.get("cm_labor_human", None)) else 0.0,
float(row["pm_cost"]) if not pd.isna(row.get("pm_cost", None)) else 0.0,
float(row["pm_labor_time"]) if not pd.isna(row.get("pm_labor_time", None)) else 0.0,
float(row["pm_labor_human"]) if not pd.isna(row.get("pm_labor_human", None)) else 0.0,
float(row["oh_interval"]) if not pd.isna(row.get("oh_interval", None)) else 0.0,
float(row["oh_cost"]) if not pd.isna(row.get("oh_cost", None)) else 0.0,
float(row["oh_labor_time"]) if not pd.isna(row.get("oh_labor_time", None)) else 0.0,
float(row["oh_labor_human"]) if not pd.isna(row.get("oh_labor_human", None)) else 0.0,
float(row["predictive_interval"]) if not pd.isna(row.get("predictive_interval", None)) else 0.0,
float(row["predictive_material_cost"]) if not pd.isna(row.get("predictive_material_cost", None)) else 0.0,
float(row["predictive_labor_time"]) if not pd.isna(row.get("predictive_labor_time", None)) else 0.0,
float(row["predictive_labor_human"]) if not pd.isna(row.get("predictive_labor_human", None)) else 0.0,
float(row["loss_output_mw"]) if not pd.isna(row.get("loss_output_mw", None)) else 0.0,
float(row["loss_price"]) if not pd.isna(row.get("loss_price", None)) else 0.0,
)
)
# store current cm for next iteration
fetch_api_data.prev_cm = cur_cm
# Eksekusi batch insert
cursor.executemany(insert_query, records_to_insert)
connection.commit()
# print("Data proyeksi berhasil dimasukkan ke database.")
except Exception as e:
print(f"Error saat menyimpan data ke database: {e}")
finally:
if connection:
connection.close()
def __get_asset_criticality_params(self, equipment_id):
try:
connections = get_connection()
connection = (
connections[0] if isinstance(connections, tuple) else connections
)
if connection is None:
print("Database connection failed.")
return None
cursor = connection.cursor(cursor_factory=DictCursor)
# Query untuk mendapatkan asset criticality
query = """
SELECT row_to_json(t) AS asset_criticality
FROM (
SELECT
asset_crit_ens_energy_not_served,
asset_crit_bpp_system,
asset_crit_bpp_pembangkit,
asset_crit_dmn_daya_mampu_netto,
asset_crit_marginal_cost,
asset_crit_efdh_equivalent_force_derated_hours,
asset_crit_foh_force_outage_hours,
asset_crit_extra_fuel_cost
FROM lcc_ms_equipment_data
WHERE assetnum = %s
) t
"""
cursor.execute(query, (equipment_id,))
result = cursor.fetchone()
asset_crit = result.get("asset_criticality") if result else None
if not asset_crit:
return None
# asset_crit may already be a dict (from row_to_json) or a JSON string
try:
ac = asset_crit if isinstance(asset_crit, dict) else json.loads(asset_crit)
except Exception:
ac = {}
def _f(key):
try:
return float(ac.get(key) or 0.0)
except Exception:
return 0.0
ens = _f("asset_crit_ens_energy_not_served") # ENS
bpp_syst = _f("asset_crit_bpp_system") # BPP_SYST
dmn = _f("asset_crit_dmn_daya_mampu_netto") # DMN
extra_fuel = _f("asset_crit_extra_fuel_cost") # Extra Fuel Cost
# Formula from image:
# Asset Criticality = (ENS/1 hour * (7% * BPP_SYST)) + ((DMN - ENS/1 hour) * Extra Fuel Cost)
# ENS/1 hour is ENS (division by 1)
part1 = ens * (0.07 * bpp_syst)
part2 = max(0.0, (dmn - ens)) * extra_fuel
asset_criticality = part1 + part2
efdh = _f("asset_crit_efdh_equivalent_force_derated_hours") # EFDH
foh = _f("asset_crit_foh_force_outage_hours")
return {
"asset_criticality": asset_criticality,
"efdh_oh_sum": efdh + foh,
}
except Exception as e:
print(f"Error saat mendapatkan asset criticality dari database: {e}")
return None
finally:
if connection:
connection.close()
# Fungsi untuk menghapus data proyeksi pada tahun tertentu
def __update_data_lcc(self, equipment_id):
try:
connections = get_connection()
connection = (
connections[0] if isinstance(connections, tuple) else connections
)
if connection is None:
print("Database connection failed.")
return None
cursor = connection.cursor(cursor_factory=DictCursor)
# Ambil semua baris untuk assetnum
select_q = '''
SELECT id, seq, tahun,
raw_cm_interval, raw_cm_material_cost, raw_cm_labor_time, raw_cm_labor_human,
raw_pm_interval, raw_pm_material_cost, raw_pm_labor_time, raw_pm_labor_human,
raw_predictive_interval, raw_predictive_material_cost, raw_predictive_labor_time, raw_predictive_labor_human,
raw_oh_interval, raw_oh_material_cost, raw_oh_labor_time, raw_oh_labor_human,
raw_predictive_interval, raw_predictive_material_cost, raw_predictive_labor_time, raw_predictive_labor_human,
"raw_loss_output_MW" as raw_loss_output_mw, raw_loss_output_price,
raw_operational_cost, raw_maintenance_cost, rc_material_cost
FROM lcc_equipment_tr_data
WHERE assetnum = %s;
'''
cursor.execute(select_q, (equipment_id,))
rows = cursor.fetchall()
# Helper to get man_hour for a year (fallback to master 'manhours_rate')
def _get_man_hour_for_year(year):
try:
cur = connection.cursor()
cur.execute("SELECT man_hour FROM lcc_ms_year_data WHERE year = %s", (year,))
r = cur.fetchone()
if r and r[0] is not None:
return float(r[0])
cur.execute("SELECT value_num FROM lcc_ms_master WHERE name='manhours_rate'")
r2 = cur.fetchone()
if r2 and r2[0] is not None:
return float(r2[0])
except Exception:
pass
return 0.0
update_q = '''
UPDATE lcc_equipment_tr_data
SET rc_cm_material_cost = %s,
rc_cm_labor_cost = %s,
rc_pm_material_cost = %s,
rc_pm_labor_cost = %s,
rc_predictive_material_cost = %s,
rc_predictive_labor_cost = %s,
rc_oh_material_cost = %s,
rc_oh_labor_cost = %s,
rc_lost_cost = %s,
rc_operation_cost = %s,
rc_maintenance_cost = %s,
rc_total_cost = %s,
updated_by = 'Sys', updated_at = NOW()
WHERE id = %s;
'''
for r in rows:
try:
yr = r.get("tahun") if isinstance(r, dict) else r[2]
man_hour = _get_man_hour_for_year(yr)
raw_cm_interval = float(r.get("raw_cm_interval") or 0.0)
raw_cm_labor_time = float(r.get("raw_cm_labor_time") or 0.0)
raw_cm_labor_human = float(r.get("raw_cm_labor_human") or 0.0)
raw_pm_interval = float(r.get("raw_pm_interval") or 0.0)
raw_pm_material_cost = float(r.get("raw_pm_material_cost") or 0.0)
raw_pm_labor_time = float(r.get("raw_pm_labor_time") or 0.0)
raw_pm_labor_human = float(r.get("raw_pm_labor_human") or 0.0)
raw_predictive_interval = float(r.get("raw_predictive_interval") or 0.0)
raw_predictive_material_cost = float(r.get("raw_predictive_material_cost") or 0.0)
raw_predictive_labor_time = float(r.get("raw_predictive_labor_time") or 0.0)
raw_predictive_labor_human = float(r.get("raw_predictive_labor_human") or 0.0)
raw_oh_interval = float(r.get("raw_oh_interval") or 0.0)
raw_oh_material_cost = float(r.get("raw_oh_material_cost") or 0.0)
raw_oh_labor_time = float(r.get("raw_oh_labor_time") or 0.0)
raw_oh_labor_human = float(r.get("raw_oh_labor_human") or 0.0)
raw_loss_output_mw = float(r.get("raw_loss_output_mw") or 0.0)
raw_loss_output_price = float(r.get("raw_loss_output_price") or 0.0)
raw_operational_cost = float(r.get("raw_operational_cost") or 0.0)
raw_maintenance_cost = float(r.get("raw_maintenance_cost") or 0.0)
rc_cm_material_cost = float(r.get("rc_cm_material_cost") or 0.0)
# compute per-column costs using helpers
rc_cm_material = rc_cm_material_cost
rc_cm_labor = rc_labor_cost(raw_cm_interval, raw_cm_labor_time, raw_cm_labor_human, man_hour)
try:
if np.isfinite(raw_pm_interval) and raw_pm_interval != 0:
rc_pm_material = raw_pm_material_cost * raw_pm_interval
else:
rc_pm_material = raw_pm_material_cost
except Exception:
rc_pm_material = 0.0
rc_pm_labor = rc_labor_cost(raw_pm_interval, raw_pm_labor_time, raw_pm_labor_human, man_hour)
try:
if np.isfinite(raw_predictive_interval) and raw_predictive_interval != 0:
rc_predictive_material = raw_predictive_material_cost * raw_predictive_interval
else:
rc_predictive_material = raw_predictive_material_cost
except Exception:
rc_predictive_material = 0.0
rc_predictive_labor = raw_predictive_labor_human
try:
rc_predictive_labor = rc_labor_cost(raw_predictive_interval, raw_predictive_labor_time, raw_predictive_labor_human, man_hour)
except Exception:
rc_predictive_labor = 0.0
rc_oh_material = raw_oh_material_cost
rc_oh_labor = rc_labor_cost(raw_oh_interval, raw_oh_labor_time, raw_oh_labor_human, man_hour)
rc_lost = rc_lost_cost(raw_loss_output_mw, raw_loss_output_price, raw_cm_interval)
rc_operation = raw_operational_cost
rc_maintenance = raw_maintenance_cost
asset_criticality_data = self.__get_asset_criticality_params(equipment_id)
asset_criticality_value = 0.0
# Simplify extraction and avoid repeating the multiplication
ac = asset_criticality_data if isinstance(asset_criticality_data, dict) else {}
try:
efdh_oh_sum = float(ac.get("efdh_oh_sum", 0.0))
except Exception:
efdh_oh_sum = 0.0
try:
asset_criticality_value = float(ac.get("asset_criticality", 0.0))
except Exception:
asset_criticality_value = 0.0
# single multiplier used for all RC groups
ac_multiplier = efdh_oh_sum * asset_criticality_value
total = rc_total_cost(
rc_cm=rc_cm_material + rc_cm_labor + ac_multiplier,
rc_pm=rc_pm_material + rc_pm_labor + ac_multiplier,
rc_predictive=rc_predictive_material + rc_predictive_labor + ac_multiplier,
rc_oh=rc_oh_material + rc_oh_labor + ac_multiplier,
rc_lost=rc_lost,
rc_operation=rc_operation,
rc_maintenance=rc_maintenance,
)
id_val = r.get("id") if isinstance(r, dict) else r[0]
cursor.execute(
update_q,
(
rc_cm_material,
rc_cm_labor,
rc_pm_material,
rc_pm_labor,
rc_predictive_material,
rc_predictive_labor,
rc_oh_material,
rc_oh_labor,
rc_lost,
rc_operation,
rc_maintenance,
total,
id_val,
),
)
except Exception:
# ignore row-specific errors and continue
continue
# For seq=0 rows, set rc_total_cost to acquisition_cost
cursor.execute(
"update lcc_equipment_tr_data set rc_total_cost = (select acquisition_cost from lcc_ms_equipment_data where assetnum=lcc_equipment_tr_data.assetnum) where assetnum = %s and seq=0;",
(equipment_id,)
)
connection.commit()
except Exception as e:
print(f"Error saat update data proyeksi dari database: {e}")
finally:
if connection:
connection.close()
# Fungsi untuk mengambil parameter dari database
def __get_rate_and_max_year(self, equipment_id):
try:
connections = get_connection()
connection = (
connections[0] if isinstance(connections, tuple) else connections
)
if connection is None:
print("Database connection failed.")
return None
cursor = connection.cursor(cursor_factory=DictCursor)
# Query untuk mendapatkan rate dan max_year
query = """
SELECT
(SELECT value_num / 100 FROM lcc_ms_master where name='inflation_rate') AS rate,
(SELECT MAX(tahun) FROM lcc_equipment_tr_data WHERE is_actual = 1 AND assetnum = %s) AS max_year
"""
cursor.execute(query, (equipment_id,))
result = cursor.fetchone()
# Debug hasil query
# print(f"Result: {result}")
rate = result["rate"]
max_year = result["max_year"]
# Validasi nilai rate dan max_year
if rate is None:
raise Exception(
"Nilai 'rate' tidak boleh kosong. Periksa tabel 'lcc_ms_master'."
)
if max_year is None:
raise Exception(
"Nilai 'max_year' tidak boleh kosong. Periksa tabel 'lcc_equipment_tr_data'."
)
return rate, max_year
except Exception as e:
print(f"Error saat mendapatkan parameter dari database: {e}")
raise # Lempar kembali exception agar program berhenti
finally:
if connection:
connection.close()
# Authentication: sign-in and refresh helpers
async def sign_in(self, username: str = "user14", password: str = "password") -> dict:
"""Sign in to AUTH_APP_URL/sign-in using provided username/password.
Stores access_token and refresh_token on the instance when successful and returns the parsed response dict.
"""
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{self.AUTH_APP_URL}/sign-in",
json={"username": username, "password": password},
timeout=30.0,
)
resp.raise_for_status()
data = resp.json()
if isinstance(data, dict) and "data" in data:
d = data.get("data") or {}
# set tokens if present
self.access_token = d.get("access_token")
self.refresh_token = d.get("refresh_token")
return data
except httpx.HTTPError as e:
print(f"Sign-in failed: {e}")
return None
async def refresh_access_token(self) -> str:
"""Refresh the access token using the stored refresh_token via AUTH_APP_URL/refresh-token.
On success updates self.access_token and returns it. Returns None on failure.
"""
if not getattr(self, "refresh_token", None):
print("No refresh token available to refresh access token.")
return None
try:
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{self.AUTH_APP_URL}/refresh-token",
headers={"Authorization": f"Bearer {self.refresh_token}"},
timeout=30.0,
)
resp.raise_for_status()
data = resp.json()
if isinstance(data, dict) and "data" in data:
new_access = data.get("data", {}).get("access_token")
if new_access:
self.access_token = new_access
print("Access token refreshed.")
return new_access
print("Refresh response did not contain a new access token.")
return None
except httpx.HTTPError as e:
print(f"Error refreshing token: {e}")
return None
# ======================================================================================================================================================
async def predict_equipment_data(self, assetnum, token):
try:
# Mengambil data dari database
df = self.__fetch_data_from_db(assetnum)
if df is None:
print("Data tidak tersedia untuk prediksi.")
return
# Mendapatkan tahun proyeksi dari DB
par_tahun_target = self.__get_param(assetnum)
# Tahun proyeksi
future_years = list(range(df["year"].max() + 1, par_tahun_target + 1))
# Hasil prediksi
predictions = {"year": future_years}
# Fungsi untuk prediksi menggunakan Linear Regression
def linear_regression_predict(column, years):
x = df["year"].values.reshape(-1, 1)
y = df[column].fillna(0).values
model = LinearRegression()
model.fit(x, y)
future_x = np.array(years).reshape(-1, 1)
preds = model.predict(future_x)
return np.abs(preds)
# Fungsi untuk prediksi menggunakan Exponential Smoothing
def exponential_smoothing_predict(column, years):
data_series = df[column].fillna(0).values
# Add a small epsilon to avoid zeros in the data if needed
if np.any(data_series == 0):
data_series = data_series + 1e-10
model = ExponentialSmoothing(
data_series, trend="add", seasonal=None, seasonal_periods=None
)
model_fit = model.fit(optimized=True, use_brute=False)
preds = model_fit.forecast(len(years))
return np.abs(preds)
# Fungsi untuk prediksi menggunakan Decision Tree
def decision_tree_predict(column, years):
x = df["year"].values.reshape(-1, 1)
y = df[column].fillna(0).values
model = DecisionTreeRegressor()
model.fit(x, y)
future_x = np.array(years).reshape(-1, 1)
preds = model.predict(future_x)
return np.abs(preds)
# Mendapatkan rate dan tahun maksimal
rate, max_year = self.__get_rate_and_max_year(assetnum)
pmt = 0
# Prediksi untuk setiap kolom
for column in df.columns:
if column == "year":
continue
n_future = len(future_years)
col_lower = column.lower()
try:
# Case untuk kolom yang terkait dengan corrective maintenance (cm)
if "cm" in col_lower:
# Tentukan jumlah baris recent yang dianggap actual jika kolom is_actual ada
if "is_actual" in df.columns:
recent_df = df[df["is_actual"] == 1]
recent_n = recent_df.shape[0]
else:
recent_df = df
recent_n = df.shape[0]
recent_n = max(1, recent_n)
recent_vals = (
recent_df.sort_values("year", ascending=False)
.head(recent_n)[column]
.dropna()
)
# Fallback ke semua nilai non-na jika tidak ada recent_vals
if recent_vals.empty:
recent_vals = df[column].dropna()
# Jika masih kosong, pakai default (interval minimal 1, lainnya 0)
if recent_vals.empty:
avg = 0.0
else:
# Pastikan numeric; jika gagal, pakai mean dari yang bisa dikonversi
try:
avg = float(np.nanmean(recent_vals.astype(float)))
except Exception:
# jika conversion gagal gunakan mean pandas (objek mungkin numeric-like)
avg = float(recent_vals.mean())
if "interval" in col_lower:
avg = max(0.0, avg)
preds = np.repeat(float(avg), n_future)
else:
# Untuk kolom non-cm, gunakan nilai dari last actual year bila ada,
# jika tidak ada gunakan last available non-NA value, jika tidak ada pakai 0.0
if "is_actual" in df.columns and not df[df["is_actual"] == 1].empty:
last_actual_year_series = df[df["is_actual"] == 1]["year"]
last_actual_year = (
int(last_actual_year_series.max())
if not last_actual_year_series.isna().all()
else int(df["year"].max())
)
else:
last_actual_year = int(df["year"].max())
row_vals = df[df["year"] == last_actual_year]
value = None
if not row_vals.empty:
val = row_vals[column].iloc[-1]
if not pd.isna(val):
try:
value = float(val)
except Exception:
# jika bukan numeric, set 0.0
value = 0.0
if value is None:
non_na = df[column].dropna()
if not non_na.empty:
try:
value = float(non_na.iloc[-1])
except Exception:
value = 0.0
else:
value = 0.0
preds = np.repeat(float(value), n_future)
except Exception:
# Jika terjadi error unexpected, fallback ke nol
preds = np.repeat(0.0, n_future)
# Pastikan semua prediksi bernilai non-negatif float dan berbentuk list sesuai panjang future_years
preds = np.abs(np.array(preds, dtype=float))
predictions[column] = preds.tolist()
# if "cost" in column.lower():
# # Prediksi Future Value
# nper = max_year - df["year"].max()
# pv = -df[column].iloc[-1]
# predictions[column] = self.__future_value_predict(
# rate, nper, pmt, pv, future_years
# )
# elif df[column].nunique() < 5:
# predictions[column] = exponential_smoothing_predict(
# column, future_years
# )
# elif df[column].isnull().sum() > 0:
# predictions[column] = decision_tree_predict(
# column, future_years
# )
# else:
# predictions[column] = linear_regression_predict(
# column, future_years
# )
# for column in df.columns:
# if column != "year":
# if "cost" in column.lower():
# # Prediksi Future Value
# # ensure nper is an integer and non-negative
# try:
# nper = int(max_year - df["year"].max())
# except Exception:
# nper = 0
# if nper < 0:
# nper = 0
# # safe conversion of last observed value to numeric present value (pv)
# try:
# last_val = df[column].iloc[-1]
# pv = -float(last_val) if not pd.isna(last_val) else 0.0
# except Exception:
# pv = 0.0
# # compute future values and ensure preds is a numpy float array
# fv_list = self.__future_value_predict(
# rate, nper, pmt, pv, future_years
# )
# preds = np.array(fv_list, dtype=float)
# predictions[column] = preds
# elif df[column].nunique() < 5:
# preds = exponential_smoothing_predict(column, future_years)
# elif df[column].isnull().sum() > 0:
# preds = decision_tree_predict(column, future_years)
# else:
# # Produce sideways / fluctuating predictions around recent level (deterministic)
# series = df[column].dropna().values
# if len(series) == 0:
# base = 0.0
# else:
# base = float(np.mean(series[-3:])) if len(series) >= 3 else float(series[-1])
# # amplitude based on historical std, fallback to a small fraction of base
# hist_std = float(np.std(series)) if len(series) > 1 else max(abs(base) * 0.01, 0.0)
# amp = max(hist_std, abs(base) * 0.01)
# t = np.arange(len(future_years))
# preds = base + amp * np.sin(2 * np.pi * t / max(len(future_years), 1))
# # avoid negative predictions for inherently non-negative series
# preds = np.where(preds < 0, 0, preds)
# # normalize preds to numpy float array
# preds = np.array(preds, dtype=float)
# # Columns containing "human" should be rounded to one decimal and clamped 0.0-3.0
# if "human" in column.lower():
# # humans must be whole numbers (no decimals) and capped between 0 and 3
# preds = np.nan_to_num(preds, nan=0.0)
# preds = np.rint(preds) # round to nearest integer
# preds = np.clip(preds, 0, 3).astype(int)
# # Columns containing "labor_time" should be reasonable yearly hours.
# # If predictions are unrealistically large, scale them down proportionally to a sane max (e.g., 2000 hours/year),
# # then round to one decimal and ensure non-negative.
# if "labor_time" in column.lower():
# max_yearly_hours = 2000.0
# current_max = np.nanmax(preds) if preds.size > 0 else 0.0
# if current_max > max_yearly_hours and current_max > 0:
# scale = max_yearly_hours / current_max
# preds = preds * scale
# preds = np.clip(preds, 0.0, max_yearly_hours)
# preds = np.round(preds, 1)
# predictions[column] = preds
# Konversi hasil ke DataFrame
predictions_df = pd.DataFrame(predictions)
# print(predictions_df)
# Hapus data prediksi yang ada sebelumnya
self.__delete_predictions_from_db(assetnum)
# Insert hasil prediksi ke database
try:
await self.__insert_predictions_to_db(
predictions_df, assetnum, token
)
except Exception as e:
print(f"Error saat insert data ke database: {e}")
# self.__insert_predictions_to_db(predictions_df, p_equipment_id)
# Update data untuk total RiskCost per tahun
self.__update_data_lcc(assetnum)
except Exception as e:
print(f"Program dihentikan: {e}")
RELIABILITY_APP_URL = os.getenv("RELIABILITY_APP_URL", "http://192.168.1.82:8000/reliability")
async def main(RELIABILITY_APP_URL=RELIABILITY_APP_URL, assetnum=None):
connection = None
try:
prediksi = Prediksi(RELIABILITY_APP_URL)
# Sign in to obtain access_token/refresh_token before processing
signin_res = await prediksi.sign_in()
if not getattr(prediksi, "access_token", None):
print("Failed to obtain access token; aborting.")
return
# If an assetnum was provided, run only for that assetnum
if assetnum:
print(f"Processing single assetnum: {assetnum}")
try:
await prediksi.predict_equipment_data(assetnum, prediksi.access_token)
except Exception as e:
print(f"Error processing {assetnum}: {e}")
print("Selesai.")
return
# Otherwise fetch all assetnums from DB and loop
connections = get_connection()
connection = connections[0] if isinstance(connections, tuple) else connections
if connection is None:
print("Database connection failed.")
return
cursor = connection.cursor(cursor_factory=DictCursor)
query_main = "SELECT DISTINCT(assetnum) FROM ms_equipment_master"
cursor.execute(query_main)
results = cursor.fetchall()
for idx, row in enumerate(results, start=1):
current_asset = row.get("assetnum") if hasattr(row, "get") else row[0]
if not current_asset or str(current_asset).strip() == "":
print(f"[{idx}/{len(results)}] Skipping empty assetnum")
continue
print(f"[{idx}/{len(results)}] Processing assetnum: {current_asset}")
try:
await prediksi.predict_equipment_data(current_asset, prediksi.access_token)
except Exception as e:
print(f"Error processing {current_asset}: {e}")
print("Selesai.")
except Exception as e:
print(f"Error in main: {e}")
return
finally:
if connection:
connection.close()
if __name__ == "__main__":
asyncio.run(
main()
)