You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1070 lines
46 KiB
Python
1070 lines
46 KiB
Python
import os
|
|
import asyncio
|
|
import pandas as pd
|
|
import numpy as np
|
|
import numpy_financial as npf # Gunakan numpy-financial untuk fungsi keuangan
|
|
from statsmodels.tsa.holtwinters import ExponentialSmoothing
|
|
from sklearn.linear_model import LinearRegression
|
|
from sklearn.tree import DecisionTreeRegressor
|
|
import matplotlib.pyplot as plt
|
|
from starlette.config import Config
|
|
from uuid import uuid4
|
|
from psycopg2.extras import DictCursor
|
|
import httpx
|
|
from dotenv import load_dotenv
|
|
|
|
import sys
|
|
import os
|
|
from .formula import rc_labor_cost, rc_lost_cost, rc_total_cost
|
|
from src.modules.config import get_connection, get_production_connection
|
|
import json
|
|
|
|
load_dotenv()
|
|
|
|
|
|
class Prediksi:
|
|
def __init__(self, RELIABILITY_APP_URL=None):
|
|
# Allow passing the URL or fallback to environment/default so callers can omit the parameter
|
|
self.RELIABILITY_APP_URL = RELIABILITY_APP_URL or os.getenv(
|
|
"RELIABILITY_APP_URL", "http://192.168.1.82:8000/reliability"
|
|
)
|
|
# Base URL for auth endpoints (sign-in, refresh-token)
|
|
self.AUTH_APP_URL = os.getenv("AUTH_APP_URL", "http://192.168.1.82:8000")
|
|
# tokens will be stored here after sign-in/refresh
|
|
self.access_token = None
|
|
self.refresh_token = None
|
|
|
|
# Fungsi untuk mengambil data dari database
|
|
def __get_param(self, equipment_id):
|
|
try:
|
|
# Mendapatkan koneksi dari config.py
|
|
connection, connection_wo_db = get_connection()
|
|
if connection is None:
|
|
print("Database connection failed.")
|
|
return None
|
|
# Membuat cursor menggunakan DictCursor
|
|
cursor = connection.cursor(cursor_factory=DictCursor)
|
|
# print(f"Getting params for equipment_id: {equipment_id}")
|
|
# Query untuk mendapatkan data
|
|
query = """
|
|
SELECT
|
|
(select COALESCE(forecasting_target_year, 2056) from lcc_ms_equipment_data where assetnum = %s) AS forecasting_target_year
|
|
"""
|
|
cursor.execute(query, (equipment_id,))
|
|
par1 = cursor.fetchone()
|
|
return par1["forecasting_target_year"]
|
|
|
|
except Exception as e:
|
|
print(f"Error saat get params dari database: {e}")
|
|
return None
|
|
|
|
finally:
|
|
if connection:
|
|
connection.close()
|
|
|
|
# Fungsi untuk mengambil data dari database
|
|
def __fetch_data_from_db(self, equipment_id):
|
|
try:
|
|
# Get connection from config.py (using only the first connection)
|
|
connection, connection_wo_db = get_connection()
|
|
if connection is None:
|
|
print("Database connection failed.")
|
|
return None
|
|
|
|
# Membuat cursor menggunakan DictCursor
|
|
cursor = connection.cursor(cursor_factory=DictCursor)
|
|
# print(f"Fetcing data for equipment_id: {equipment_id}")
|
|
# Query untuk mendapatkan data
|
|
query = """
|
|
SELECT
|
|
tahun AS year,
|
|
rc_cm_material_cost,
|
|
rc_cm_labor_cost,
|
|
rc_pm_material_cost,
|
|
rc_pm_labor_cost,
|
|
rc_oh_material_cost,
|
|
rc_oh_labor_cost,
|
|
rc_predictive_material_cost,
|
|
rc_predictive_labor_cost
|
|
FROM lcc_equipment_tr_data
|
|
WHERE assetnum = %s
|
|
and is_actual=1
|
|
and seq != 0
|
|
;
|
|
"""
|
|
cursor.execute(query, (equipment_id,))
|
|
|
|
# Mengambil hasil dan mengonversi ke DataFrame pandas
|
|
data = cursor.fetchall()
|
|
columns = [
|
|
desc[0] for desc in cursor.description
|
|
] # Mengambil nama kolom dari hasil query
|
|
df = pd.DataFrame(data, columns=columns)
|
|
return df
|
|
|
|
except Exception as e:
|
|
print(f"Error saat mengambil data dari database: {e}")
|
|
return None
|
|
|
|
finally:
|
|
if connection:
|
|
connection.close()
|
|
|
|
# Fungsi untuk prediksi menggunakan Future Value (FV)
|
|
def __future_value_predict(self, rate, nper, pmt, pv, years):
|
|
# Hitung Future Value untuk tahun-tahun proyeksi
|
|
fv_values = []
|
|
for i in range(len(years)):
|
|
fv = npf.fv(rate, nper + i, pmt, pv) # Menggunakan numpy_financial.fv
|
|
fv_values.append(fv)
|
|
return fv_values
|
|
|
|
# Fungsi untuk menghapus data proyeksi pada tahun tertentu
|
|
# def __delete_predictions_from_db(self, equipment_id):
|
|
# try:
|
|
# connections = get_connection()
|
|
# connection = (
|
|
# connections[0] if isinstance(connections, tuple) else connections
|
|
# )
|
|
# if connection is None:
|
|
# print("Database connection failed.")
|
|
# return None
|
|
|
|
# cursor = connection.cursor()
|
|
|
|
# # Query untuk menghapus data berdasarkan tahun proyeksi
|
|
# delete_query = """
|
|
# DELETE FROM lcc_equipment_tr_data
|
|
# WHERE assetnum = %s AND is_actual = 0;
|
|
# """ # Asumsikan kolom is_actual digunakan untuk membedakan data proyeksi dan data aktual
|
|
|
|
# # Eksekusi query delete
|
|
# cursor.execute(delete_query, (equipment_id,))
|
|
# connection.commit()
|
|
# # print(f"Data proyeksi untuk tahun {equipment_id} berhasil dihapus.")
|
|
|
|
# except Exception as e:
|
|
# print(f"Error saat menghapus data proyeksi dari database: {e}")
|
|
|
|
# finally:
|
|
# if connection:
|
|
# connection.close()
|
|
|
|
# Fungsi untuk menyimpan data proyeksi ke database
|
|
async def __insert_predictions_to_db(self, data, equipment_id, token):
|
|
try:
|
|
connection, connection_wo_db = get_connection()
|
|
if connection is None:
|
|
print("Database connection failed.")
|
|
return None
|
|
|
|
cursor = connection.cursor()
|
|
|
|
# Query untuk mendapatkan nilai maksimum seq dari data actual
|
|
get_max_seq_query = """
|
|
SELECT COALESCE(MAX(seq), 0) FROM lcc_equipment_tr_data WHERE assetnum = %s AND is_actual = 1
|
|
"""
|
|
cursor.execute(get_max_seq_query, (equipment_id,))
|
|
max_seq = cursor.fetchone()[0]
|
|
|
|
# Query untuk insert data
|
|
insert_query = """
|
|
INSERT INTO lcc_equipment_tr_data (
|
|
id,
|
|
seq,
|
|
is_actual,
|
|
tahun, assetnum,
|
|
rc_cm_material_cost,
|
|
rc_cm_labor_cost,
|
|
rc_pm_material_cost,
|
|
rc_pm_labor_cost,
|
|
rc_oh_material_cost,
|
|
rc_oh_labor_cost,
|
|
rc_predictive_material_cost,
|
|
rc_predictive_labor_cost,
|
|
created_by, created_at
|
|
) VALUES (
|
|
%s, %s, 0, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'Sys', NOW()
|
|
)
|
|
"""
|
|
# Menyiapkan data untuk batch insert atau update
|
|
records_to_insert = []
|
|
|
|
check_existence_query = """
|
|
SELECT id FROM lcc_equipment_tr_data
|
|
WHERE assetnum = %s AND tahun = %s AND is_actual = 0
|
|
"""
|
|
|
|
update_query = """
|
|
UPDATE lcc_equipment_tr_data
|
|
SET
|
|
seq = %s,
|
|
rc_cm_material_cost = %s,
|
|
rc_cm_labor_cost = %s,
|
|
rc_pm_material_cost = %s,
|
|
rc_pm_labor_cost = %s,
|
|
rc_oh_material_cost = %s,
|
|
rc_oh_labor_cost = %s,
|
|
rc_predictive_material_cost = %s,
|
|
rc_predictive_labor_cost = %s,
|
|
updated_by = 'Sys',
|
|
updated_at = NOW()
|
|
WHERE id = %s
|
|
"""
|
|
|
|
for idx, row in data.iterrows():
|
|
loop_seq = max_seq + idx + 1
|
|
# Check if data exists
|
|
cursor.execute(check_existence_query, (equipment_id, int(row["year"])))
|
|
existing_record = cursor.fetchone()
|
|
if existing_record:
|
|
# print("Update existing record")
|
|
# Update existing record
|
|
record_id = existing_record[0]
|
|
cursor.execute(update_query, (
|
|
int(loop_seq),
|
|
float(row.get("rc_cm_material_cost", 0)) if not pd.isna(row.get("rc_cm_material_cost", 0)) else 0.0,
|
|
float(row.get("rc_cm_labor_cost", 0)) if not pd.isna(row.get("rc_cm_labor_cost", 0)) else 0.0,
|
|
float(row.get("rc_pm_material_cost", 0)) if not pd.isna(row.get("rc_pm_material_cost", 0)) else 0.0,
|
|
float(row.get("rc_pm_labor_cost", 0)) if not pd.isna(row.get("rc_pm_labor_cost", 0)) else 0.0,
|
|
float(row.get("rc_oh_material_cost", 0)) if not pd.isna(row.get("rc_oh_material_cost", 0)) else 0.0,
|
|
float(row.get("rc_oh_labor_cost", 0)) if not pd.isna(row.get("rc_oh_labor_cost", 0)) else 0.0,
|
|
float(row.get("rc_predictive_material_cost", 0)) if not pd.isna(row.get("rc_predictive_material_cost", 0)) else 0.0,
|
|
float(row.get("rc_predictive_labor_cost", 0)) if not pd.isna(row.get("rc_predictive_labor_cost", 0)) else 0.0,
|
|
record_id
|
|
))
|
|
else:
|
|
# Prepare for insert
|
|
records_to_insert.append(
|
|
(
|
|
str(uuid4()), # id
|
|
int(loop_seq), # seq
|
|
int(row["year"]),
|
|
equipment_id,
|
|
float(row.get("rc_cm_material_cost", 0)) if not pd.isna(row.get("rc_cm_material_cost", 0)) else 0.0,
|
|
float(row.get("rc_cm_labor_cost", 0)) if not pd.isna(row.get("rc_cm_labor_cost", 0)) else 0.0,
|
|
float(row.get("rc_pm_material_cost", 0)) if not pd.isna(row.get("rc_pm_material_cost", 0)) else 0.0,
|
|
float(row.get("rc_pm_labor_cost", 0)) if not pd.isna(row.get("rc_pm_labor_cost", 0)) else 0.0,
|
|
float(row.get("rc_oh_material_cost", 0)) if not pd.isna(row.get("rc_oh_material_cost", 0)) else 0.0,
|
|
float(row.get("rc_oh_labor_cost", 0)) if not pd.isna(row.get("rc_oh_labor_cost", 0)) else 0.0,
|
|
float(row.get("rc_predictive_material_cost", 0)) if not pd.isna(row.get("rc_predictive_material_cost", 0)) else 0.0,
|
|
float(row.get("rc_predictive_labor_cost", 0)) if not pd.isna(row.get("rc_predictive_labor_cost", 0)) else 0.0,
|
|
)
|
|
)
|
|
|
|
# Eksekusi batch insert jika ada data baru
|
|
if records_to_insert:
|
|
cursor.executemany(insert_query, records_to_insert)
|
|
|
|
connection.commit()
|
|
|
|
# Recalculate total costs and update asset criticality
|
|
self.__update_data_lcc(equipment_id)
|
|
|
|
except Exception as e:
|
|
print(f"Error saat menyimpan data ke database: {e}")
|
|
|
|
finally:
|
|
if connection:
|
|
connection.close()
|
|
|
|
def __get_asset_criticality_params(self, equipment_id):
|
|
try:
|
|
connection, connection_wo_db = get_connection()
|
|
efdh_foh_sum = None
|
|
if connection is None:
|
|
print("Database connection failed.")
|
|
return None
|
|
|
|
cursor = connection.cursor(cursor_factory=DictCursor)
|
|
|
|
# Query untuk mendapatkan asset criticality
|
|
query = """
|
|
SELECT row_to_json(t) AS asset_criticality
|
|
FROM (
|
|
SELECT
|
|
asset_crit_ens_energy_not_served,
|
|
asset_crit_bpp_system,
|
|
asset_crit_bpp_pembangkit,
|
|
asset_crit_dmn_daya_mampu_netto,
|
|
asset_crit_marginal_cost,
|
|
asset_crit_efdh_equivalent_forced_derated_hours,
|
|
asset_crit_foh_forced_outage_hours,
|
|
asset_crit_extra_fuel_cost
|
|
FROM lcc_ms_year_data
|
|
) t
|
|
"""
|
|
cursor.execute(query, (equipment_id,))
|
|
result = cursor.fetchone()
|
|
asset_crit = result.get("asset_criticality") if result else None
|
|
if not asset_crit:
|
|
return None
|
|
|
|
# asset_crit may already be a dict (from row_to_json) or a JSON string
|
|
try:
|
|
ac = asset_crit if isinstance(asset_crit, dict) else json.loads(asset_crit)
|
|
except Exception:
|
|
ac = {}
|
|
|
|
def _f(key):
|
|
try:
|
|
return float(ac.get(key) or 0.0)
|
|
except Exception:
|
|
return 0.0
|
|
|
|
ens = _f("asset_crit_ens_energy_not_served") # ENS
|
|
bpp_syst = _f("asset_crit_bpp_system") # BPP_SYST
|
|
dmn = _f("asset_crit_dmn_daya_mampu_netto") # DMN
|
|
extra_fuel = _f("asset_crit_extra_fuel_cost") # Extra Fuel Cost
|
|
|
|
# Formula from image:
|
|
# Asset Criticality = (ENS/1 hour * (7% * BPP_SYST)) + ((DMN - ENS/1 hour) * Extra Fuel Cost)
|
|
# ENS/1 hour is ENS (division by 1)
|
|
part1 = ens * (0.07 * bpp_syst)
|
|
part2 = max(0.0, (dmn - ens)) * extra_fuel
|
|
asset_criticality = part1 + part2
|
|
|
|
# efdh = _f("asset_crit_efdh_equivalent_forced_derated_hours") # EFDH per Year
|
|
# foh = _f("asset_crit_foh_forced_outage_hours") # FOH per Year
|
|
|
|
query_each_equipment = """
|
|
SELECT
|
|
efdh_equivalent_forced_derated_hours,
|
|
foh_forced_outage_hours
|
|
FROM lcc_ms_equipment_data
|
|
WHERE assetnum = %s
|
|
"""
|
|
cursor.execute(query_each_equipment, (equipment_id,))
|
|
result_eq = cursor.fetchone()
|
|
|
|
if result_eq:
|
|
eq_efdh = float(result_eq.get("efdh_equivalent_force_derated_hours") or 0.0) # EFDH per Equipment
|
|
eq_foh = float(result_eq.get("foh_force_outage_hours") or 0.0) # FOH per Equipment
|
|
|
|
efdh_foh_equipment = eq_efdh + eq_foh
|
|
|
|
# if efdh_foh_equipment == 0:
|
|
# efdh_foh_sum = efdh + foh
|
|
# else:
|
|
efdh_foh_sum = efdh_foh_equipment
|
|
|
|
return {
|
|
"asset_criticality": asset_criticality,
|
|
"efdh_foh_sum": efdh_foh_sum,
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Error saat mendapatkan asset criticality dari database: {e}")
|
|
return None
|
|
|
|
finally:
|
|
if connection:
|
|
connection.close()
|
|
|
|
|
|
# Fungsi untuk menghapus data proyeksi pada tahun tertentu
|
|
def __update_data_lcc(self, equipment_id):
|
|
try:
|
|
connection, connection_wo_db = get_connection()
|
|
production_connection = get_production_connection()
|
|
if connection is None or production_connection is None:
|
|
print("Database connection failed.")
|
|
return None
|
|
|
|
cursor = connection.cursor(cursor_factory=DictCursor)
|
|
|
|
# --- OPTIMIZATION START: Fetch static data ONCE ---
|
|
|
|
# Fetch man_hour_rate (constant for all years currently based on code logic)
|
|
man_hour_rate = 0.0
|
|
try:
|
|
prod_cur = production_connection.cursor()
|
|
prod_cur.execute("""SELECT value_num FROM lcc_ms_master WHERE name='manhours_rate'""")
|
|
r2 = prod_cur.fetchone()
|
|
if r2 and r2[0] is not None:
|
|
man_hour_rate = float(r2[0])
|
|
prod_cur.close()
|
|
except Exception:
|
|
pass
|
|
|
|
# Fetch Asset Criticality (constant for the asset)
|
|
asset_criticality_data = self.__get_asset_criticality_params(equipment_id)
|
|
ac = asset_criticality_data if isinstance(asset_criticality_data, dict) else {}
|
|
asset_criticality_value = float(ac.get("asset_criticality", 0.0))
|
|
|
|
# --- OPTIMIZATION END ---
|
|
|
|
# Ambil semua baris untuk assetnum
|
|
select_q = '''
|
|
SELECT id, seq, tahun,
|
|
raw_cm_interval, raw_cm_material_cost, rc_cm_labor_cost, rc_cm_material_cost,
|
|
raw_pm_interval, raw_pm_material_cost, rc_pm_labor_cost, rc_pm_material_cost,
|
|
raw_predictive_interval, raw_predictive_material_cost, rc_predictive_labor_cost, rc_predictive_material_cost,
|
|
raw_oh_interval, raw_oh_material_cost, rc_oh_labor_cost, rc_oh_material_cost,
|
|
efdh_equivalent_forced_derated_hours, foh_forced_outage_hours
|
|
FROM lcc_equipment_tr_data
|
|
WHERE assetnum = %s;
|
|
'''
|
|
cursor.execute(select_q, (equipment_id,))
|
|
rows = cursor.fetchall()
|
|
|
|
update_q = '''
|
|
UPDATE lcc_equipment_tr_data
|
|
SET rc_cm_material_cost = %s,
|
|
rc_cm_labor_cost = %s,
|
|
rc_pm_material_cost = %s,
|
|
rc_pm_labor_cost = %s,
|
|
rc_predictive_material_cost = %s,
|
|
rc_predictive_labor_cost = %s,
|
|
rc_oh_material_cost = %s,
|
|
rc_oh_labor_cost = %s,
|
|
rc_total_cost = %s,
|
|
updated_by = 'Sys', updated_at = NOW()
|
|
WHERE id = %s;
|
|
'''
|
|
|
|
batch_params = []
|
|
|
|
for r in rows:
|
|
try:
|
|
# yr = r.get("tahun") if isinstance(r, dict) else r[2]
|
|
# man_hour = man_hour_rate # Used pre-fetched value
|
|
|
|
# seq = int(r.get("seq") or 0) if isinstance(r, dict) else int(r[1] or 0)
|
|
|
|
raw_pm_material_cost = float(r.get("raw_pm_material_cost") or 0.0)
|
|
raw_predictive_material_cost = float(r.get("raw_predictive_material_cost") or 0.0)
|
|
raw_oh_material_cost = float(r.get("raw_oh_material_cost") or 0.0)
|
|
|
|
efdh_equivalent_forced_derated_hours = float(r.get("efdh_equivalent_forced_derated_hours") or 0.0)
|
|
foh_forced_outage_hours = float(r.get("foh_forced_outage_hours") or 0.0)
|
|
|
|
rc_cm_material_cost = float(r.get("rc_cm_material_cost") or 0.0)
|
|
|
|
# compute per-column costs using helpers
|
|
rc_cm_material = rc_cm_material_cost
|
|
rc_cm_labor = float(r.get("rc_cm_labor_cost") or 0.0)
|
|
rc_pm_labor = float(r.get("rc_pm_labor_cost") or 0.0)
|
|
rc_predictive_labor = float(r.get("rc_predictive_labor_cost") or 0.0)
|
|
rc_oh_labor = float(r.get("rc_oh_labor_cost") or 0.0)
|
|
|
|
try:
|
|
rc_pm_material = raw_pm_material_cost if raw_pm_material_cost > 0 else float(r.get("rc_pm_material_cost") or 0.0)
|
|
except Exception:
|
|
rc_pm_material = 0.0
|
|
|
|
|
|
try:
|
|
rc_predictive_material = raw_predictive_material_cost if raw_predictive_material_cost > 0 else float(r.get("rc_predictive_material_cost") or 0.0)
|
|
except Exception:
|
|
rc_predictive_material = 0.0
|
|
|
|
rc_oh_material = raw_oh_material_cost if raw_oh_material_cost > 0 else float(r.get("rc_oh_material_cost") or 0.0)
|
|
|
|
|
|
efdh_foh_sum = efdh_equivalent_forced_derated_hours + foh_forced_outage_hours if efdh_equivalent_forced_derated_hours and foh_forced_outage_hours else 0.0
|
|
|
|
# single multiplier used for all RC groups
|
|
ac_multiplier = efdh_foh_sum * asset_criticality_value
|
|
|
|
total = rc_total_cost(
|
|
rc_cm=rc_cm_material + rc_cm_labor + ac_multiplier,
|
|
rc_pm=rc_pm_material + rc_pm_labor + ac_multiplier,
|
|
rc_predictive=rc_predictive_material + rc_predictive_labor + ac_multiplier,
|
|
rc_oh=rc_oh_material + rc_oh_labor + ac_multiplier,
|
|
)
|
|
|
|
id_val = r.get("id") if isinstance(r, dict) else r[0]
|
|
|
|
batch_params.append((
|
|
rc_cm_material,
|
|
rc_cm_labor,
|
|
rc_pm_material,
|
|
rc_pm_labor,
|
|
rc_predictive_material,
|
|
rc_predictive_labor,
|
|
rc_oh_material,
|
|
rc_oh_labor,
|
|
total,
|
|
id_val
|
|
))
|
|
|
|
except Exception:
|
|
# ignore row-specific errors and continue
|
|
continue
|
|
|
|
# Execute Batch Update
|
|
if batch_params:
|
|
cursor.executemany(update_q, batch_params)
|
|
|
|
# For seq=0 rows, set rc_total_cost to acquisition_cost
|
|
cursor.execute(
|
|
"update lcc_equipment_tr_data set rc_total_cost = (select acquisition_cost from lcc_ms_equipment_data where assetnum=lcc_equipment_tr_data.assetnum) where assetnum = %s and seq=0;",
|
|
(equipment_id,)
|
|
)
|
|
|
|
connection.commit()
|
|
|
|
except Exception as e:
|
|
print(f"Error saat update data proyeksi dari database: {e}")
|
|
|
|
finally:
|
|
if connection:
|
|
connection.close()
|
|
if 'production_connection' in locals() and production_connection:
|
|
try:
|
|
production_connection.close()
|
|
except Exception:
|
|
pass
|
|
|
|
# Fungsi untuk mengambil parameter dari database
|
|
def __get_rate_and_max_year(self, equipment_id):
|
|
try:
|
|
connection, connection_wo_db = get_connection()
|
|
if connection is None:
|
|
print("Database connection failed.")
|
|
return None
|
|
|
|
cursor = connection.cursor(cursor_factory=DictCursor)
|
|
|
|
# Query untuk mendapatkan rate dan max_year
|
|
query = """
|
|
SELECT
|
|
(SELECT value_num / 100 FROM lcc_ms_master where name='inflation_rate') AS rate,
|
|
(SELECT MAX(tahun) FROM lcc_equipment_tr_data WHERE is_actual = 1 AND assetnum = %s) AS max_year
|
|
"""
|
|
cursor.execute(query, (equipment_id,))
|
|
result = cursor.fetchone()
|
|
|
|
# Debug hasil query
|
|
# print(f"Result: {result}")
|
|
|
|
rate = result["rate"]
|
|
max_year = result["max_year"]
|
|
|
|
# Validasi nilai rate dan max_year
|
|
if rate is None:
|
|
raise Exception(
|
|
"Nilai 'rate' tidak boleh kosong. Periksa tabel 'lcc_ms_master'."
|
|
)
|
|
if max_year is None:
|
|
raise Exception(
|
|
"Nilai 'max_year' tidak boleh kosong. Periksa tabel 'lcc_equipment_tr_data'."
|
|
)
|
|
|
|
return rate, max_year
|
|
|
|
except Exception as e:
|
|
print(f"Error saat mendapatkan parameter dari database: {e}")
|
|
raise # Lempar kembali exception agar program berhenti
|
|
|
|
finally:
|
|
if connection:
|
|
connection.close()
|
|
|
|
|
|
|
|
|
|
# Authentication: sign-in and refresh helpers
|
|
async def sign_in(self, username: str = "lcca_admin", password: str = "password") -> dict:
|
|
"""Sign in to AUTH_APP_URL/sign-in using provided username/password.
|
|
|
|
Stores access_token and refresh_token on the instance when successful and returns the parsed response dict.
|
|
"""
|
|
try:
|
|
async with httpx.AsyncClient() as client:
|
|
resp = await client.post(
|
|
f"{self.AUTH_APP_URL}/sign-in",
|
|
json={"username": username, "password": password},
|
|
timeout=30.0,
|
|
)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
if isinstance(data, dict) and "data" in data:
|
|
d = data.get("data") or {}
|
|
# set tokens if present
|
|
self.access_token = d.get("access_token")
|
|
self.refresh_token = d.get("refresh_token")
|
|
return data
|
|
except httpx.HTTPError as e:
|
|
print(f"Sign-in failed: {e}")
|
|
# Try to sign out if sign-in failed
|
|
try:
|
|
signout_url = f"{self.AUTH_APP_URL}/sign-out"
|
|
async with httpx.AsyncClient() as client:
|
|
await client.get(signout_url, timeout=10.0)
|
|
print("Signed out due to sign-in failure.")
|
|
except Exception as signout_exc:
|
|
print(f"Sign-out failed: {signout_exc}")
|
|
# Try to sign in again
|
|
try:
|
|
signin_res = await self.sign_in()
|
|
if getattr(self, "access_token", None):
|
|
return signin_res
|
|
except Exception as signin_exc:
|
|
print(f"Sign-in failed after sign-out: {signin_exc}")
|
|
return None
|
|
|
|
async def refresh_access_token(self) -> str:
|
|
"""Refresh the access token using the stored refresh_token via AUTH_APP_URL/refresh-token.
|
|
|
|
On success updates self.access_token and returns it. Returns None on failure.
|
|
"""
|
|
if not getattr(self, "refresh_token", None):
|
|
print("No refresh token available to refresh access token.")
|
|
return None
|
|
try:
|
|
async with httpx.AsyncClient() as client:
|
|
resp = await client.get(
|
|
f"{self.AUTH_APP_URL}/refresh-token",
|
|
headers={"Authorization": f"Bearer {self.refresh_token}"},
|
|
timeout=30.0,
|
|
)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
if isinstance(data, dict) and "data" in data:
|
|
new_access = data.get("data", {}).get("access_token")
|
|
if new_access:
|
|
self.access_token = new_access
|
|
print("Access token refreshed.")
|
|
return new_access
|
|
print("Refresh response did not contain a new access token.")
|
|
return None
|
|
except httpx.HTTPError as e:
|
|
print(f"Error refreshing token: {e}")
|
|
return None
|
|
|
|
# ======================================================================================================================================================
|
|
|
|
async def _fetch_api_data(self, assetnum: str, year: int) -> dict:
|
|
url = self.RELIABILITY_APP_URL
|
|
endpoint = f"{url}/main/number-of-failures/{assetnum}/{int(year)}/{int(year)}"
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
current_token = getattr(self, "access_token", None)
|
|
response = await client.get(
|
|
endpoint,
|
|
timeout=30.0,
|
|
headers={"Authorization": f"Bearer {current_token}"} if current_token else {},
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except httpx.HTTPStatusError as e:
|
|
status = getattr(e.response, "status_code", None)
|
|
# If we get a 401 or 403, try to refresh the access token and retry once
|
|
if status in (401, 403):
|
|
print(f"Received {status} from reliability API, attempting to refresh/re-login...")
|
|
# Try refreshing token first
|
|
new_access = await self.refresh_access_token()
|
|
# If refresh failed (e.g. refresh token expired), try full sign-in
|
|
if not new_access:
|
|
print("Refresh failed, attempting full sign-in...")
|
|
await self.sign_in()
|
|
new_access = getattr(self, "access_token", None)
|
|
|
|
if new_access:
|
|
try:
|
|
response = await client.get(
|
|
endpoint,
|
|
timeout=30.0,
|
|
headers={"Authorization": f"Bearer {new_access}"},
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except httpx.HTTPError as e2:
|
|
print(f"HTTP error occurred after retry: {e2}")
|
|
return {}
|
|
print(f"HTTP error occurred: {e}")
|
|
return {}
|
|
except httpx.HTTPError as e:
|
|
print(f"HTTP error occurred: {e}")
|
|
return {}
|
|
|
|
def __get_man_hour_rate(self, staff_level: str = "Junior"):
|
|
connection = None
|
|
try:
|
|
connection, connection_wo_db = get_connection()
|
|
if connection is None:
|
|
return 0.0
|
|
|
|
cursor = connection.cursor()
|
|
# Takes from salary_per_hour_idr on specific staff_job_level
|
|
query = "SELECT salary_per_hour_idr FROM lcc_manpower_cost WHERE staff_job_level = %s LIMIT 1"
|
|
cursor.execute(query, (staff_level,))
|
|
result = cursor.fetchone()
|
|
if result:
|
|
return float(result[0])
|
|
return 0.0
|
|
except Exception as e:
|
|
print(f"Error getting man hour rate for {staff_level}: {e}")
|
|
return 0.0
|
|
finally:
|
|
if connection:
|
|
connection.close()
|
|
|
|
async def predict_equipment_data(self, assetnum, token):
|
|
try:
|
|
# Mengambil data dari database
|
|
df = self.__fetch_data_from_db(assetnum)
|
|
if df is None:
|
|
print("Data tidak tersedia untuk prediksi.")
|
|
return
|
|
# Mendapatkan tahun proyeksi dari DB
|
|
par_tahun_target = self.__get_param(assetnum)
|
|
|
|
if par_tahun_target is None:
|
|
raise ValueError(f"Asset {assetnum} not found in master data (lcc_ms_equipment_data).")
|
|
|
|
# Tahun proyeksi
|
|
current_max_year = int(df["year"].max()) if not df.empty and not pd.isna(df["year"].max()) else 2024
|
|
future_years = list(range(current_max_year + 1, int(par_tahun_target) + 1))
|
|
print("future_years", future_years)
|
|
# Hasil prediksi
|
|
predictions = {"year": future_years}
|
|
|
|
# Fungsi untuk prediksi menggunakan Linear Regression
|
|
def linear_regression_predict(column, years):
|
|
x = df["year"].values.reshape(-1, 1)
|
|
y = df[column].fillna(0).values
|
|
model = LinearRegression()
|
|
model.fit(x, y)
|
|
future_x = np.array(years).reshape(-1, 1)
|
|
preds = model.predict(future_x)
|
|
return np.abs(preds)
|
|
|
|
# Fungsi untuk prediksi menggunakan Exponential Smoothing
|
|
def exponential_smoothing_predict(column, years):
|
|
data_series = df[column].fillna(0).values
|
|
|
|
# Add a small epsilon to avoid zeros in the data if needed
|
|
if np.any(data_series == 0):
|
|
data_series = data_series + 1e-10
|
|
|
|
model = ExponentialSmoothing(
|
|
data_series, trend="add", seasonal=None, seasonal_periods=None
|
|
)
|
|
model_fit = model.fit(optimized=True, use_brute=False)
|
|
preds = model_fit.forecast(len(years))
|
|
return np.abs(preds)
|
|
|
|
# Fungsi untuk prediksi menggunakan Decision Tree
|
|
def decision_tree_predict(column, years):
|
|
x = df["year"].values.reshape(-1, 1)
|
|
y = df[column].fillna(0).values
|
|
model = DecisionTreeRegressor()
|
|
model.fit(x, y)
|
|
future_x = np.array(years).reshape(-1, 1)
|
|
preds = model.predict(future_x)
|
|
return np.abs(preds)
|
|
|
|
# Mendapatkan rate dan tahun maksimal
|
|
rate, max_year = self.__get_rate_and_max_year(assetnum)
|
|
man_hour_rate = self.__get_man_hour_rate() # Defaults to 'junior'
|
|
|
|
pmt = 0
|
|
|
|
# Prediksi untuk setiap kolom
|
|
for column in df.columns:
|
|
if column == "year":
|
|
continue
|
|
|
|
n_future = len(future_years)
|
|
col_lower = column.lower()
|
|
try:
|
|
# Case untuk kolom yang terkait dengan corrective maintenance (cm)
|
|
if "cm" in col_lower:
|
|
recent_df = df
|
|
recent_n = df.shape[0]
|
|
|
|
recent_n = max(1, recent_n)
|
|
recent_vals = (
|
|
recent_df
|
|
.sort_values("year", ascending=True)
|
|
.head(recent_n)[column]
|
|
.dropna()
|
|
)
|
|
|
|
# Fallback ke semua nilai non-na jika tidak ada recent_vals
|
|
if recent_vals.empty:
|
|
recent_vals = df[column].dropna()
|
|
|
|
# Jika masih kosong, pakai default (interval minimal 1, lainnya 0)
|
|
if "labor" in col_lower:
|
|
preds_list = []
|
|
for yr in future_years:
|
|
failures_data = await self._fetch_api_data(assetnum, yr)
|
|
# Interval from number of failures
|
|
interval = 0.0
|
|
if isinstance(failures_data, dict):
|
|
data_list = failures_data.get("data")
|
|
# data is a list of objects, extract num_fail from first item
|
|
if isinstance(data_list, list) and len(data_list) > 0:
|
|
first_item = data_list[0]
|
|
if isinstance(first_item, dict):
|
|
num_fail = first_item.get("num_fail")
|
|
if num_fail is not None:
|
|
try:
|
|
interval = float(num_fail)
|
|
except Exception:
|
|
interval = 0.0
|
|
|
|
# interval * labor_time(3) * labor_human(1) * man_hour_rate
|
|
cost = rc_labor_cost(interval, 3.0, 1.0, man_hour_rate)
|
|
preds_list.append(cost)
|
|
preds = np.array(preds_list, dtype=float)
|
|
|
|
elif recent_vals.empty:
|
|
avg = 0.0
|
|
preds = np.repeat(float(avg), n_future)
|
|
else:
|
|
avg = pd.to_numeric(recent_vals, errors="coerce").fillna(0).mean()
|
|
avg = 0.0 if pd.isna(avg) else float(avg)
|
|
preds = np.repeat(float(avg), n_future)
|
|
|
|
else:
|
|
# Для kolom non-cm, gunakan nilai dari last actual year bila ada,
|
|
# jika tidak ada gunakan last available non-NA value, jika tidak ada pakai 0.0
|
|
if "is_actual" in df.columns and not df[df["is_actual"] == 1].empty:
|
|
last_actual_year_series = df[df["is_actual"] == 1]["year"]
|
|
last_actual_year = (
|
|
int(last_actual_year_series.max())
|
|
if not last_actual_year_series.isna().all()
|
|
else int(df["year"].max())
|
|
)
|
|
else:
|
|
last_actual_year = int(df["year"].max())
|
|
|
|
row_vals = df[df["year"] == last_actual_year]
|
|
|
|
value = None
|
|
|
|
if not row_vals.empty:
|
|
val = row_vals[column].iloc[-1]
|
|
if not pd.isna(val):
|
|
try:
|
|
value = float(val)
|
|
except Exception:
|
|
# jika bukan numeric, set 0.0
|
|
value = 0.0
|
|
|
|
if value is None:
|
|
non_na = df[column].dropna()
|
|
if not non_na.empty:
|
|
try:
|
|
value = float(non_na.iloc[-1])
|
|
except Exception:
|
|
value = 0.0
|
|
else:
|
|
value = 0.0
|
|
|
|
preds = np.repeat(float(value), n_future)
|
|
|
|
except Exception:
|
|
# Jika terjadi error unexpected, fallback ke nol
|
|
preds = np.repeat(0.0, n_future)
|
|
|
|
# Pastikan semua prediksi bernilai non-negatif float dan berbentuk list sesuai panjang future_years
|
|
preds = np.abs(np.array(preds, dtype=float))
|
|
predictions[column] = preds.tolist()
|
|
# if "cost" in column.lower():
|
|
# # Prediksi Future Value
|
|
# nper = max_year - df["year"].max()
|
|
# pv = -df[column].iloc[-1]
|
|
# predictions[column] = self.__future_value_predict(
|
|
# rate, nper, pmt, pv, future_years
|
|
# )
|
|
# elif df[column].nunique() < 5:
|
|
# predictions[column] = exponential_smoothing_predict(
|
|
# column, future_years
|
|
# )
|
|
# elif df[column].isnull().sum() > 0:
|
|
# predictions[column] = decision_tree_predict(
|
|
# column, future_years
|
|
# )
|
|
# else:
|
|
# predictions[column] = linear_regression_predict(
|
|
# column, future_years
|
|
# )
|
|
|
|
# for column in df.columns:
|
|
# if column != "year":
|
|
# if "cost" in column.lower():
|
|
# # Prediksi Future Value
|
|
# # ensure nper is an integer and non-negative
|
|
# try:
|
|
# nper = int(max_year - df["year"].max())
|
|
# except Exception:
|
|
# nper = 0
|
|
# if nper < 0:
|
|
# nper = 0
|
|
|
|
# # safe conversion of last observed value to numeric present value (pv)
|
|
# try:
|
|
# last_val = df[column].iloc[-1]
|
|
# pv = -float(last_val) if not pd.isna(last_val) else 0.0
|
|
# except Exception:
|
|
# pv = 0.0
|
|
|
|
# # compute future values and ensure preds is a numpy float array
|
|
# fv_list = self.__future_value_predict(
|
|
# rate, nper, pmt, pv, future_years
|
|
# )
|
|
# preds = np.array(fv_list, dtype=float)
|
|
# predictions[column] = preds
|
|
# elif df[column].nunique() < 5:
|
|
# preds = exponential_smoothing_predict(column, future_years)
|
|
# elif df[column].isnull().sum() > 0:
|
|
# preds = decision_tree_predict(column, future_years)
|
|
# else:
|
|
# # Produce sideways / fluctuating predictions around recent level (deterministic)
|
|
# series = df[column].dropna().values
|
|
# if len(series) == 0:
|
|
# base = 0.0
|
|
# else:
|
|
# base = float(np.mean(series[-3:])) if len(series) >= 3 else float(series[-1])
|
|
# # amplitude based on historical std, fallback to a small fraction of base
|
|
# hist_std = float(np.std(series)) if len(series) > 1 else max(abs(base) * 0.01, 0.0)
|
|
# amp = max(hist_std, abs(base) * 0.01)
|
|
# t = np.arange(len(future_years))
|
|
# preds = base + amp * np.sin(2 * np.pi * t / max(len(future_years), 1))
|
|
# # avoid negative predictions for inherently non-negative series
|
|
# preds = np.where(preds < 0, 0, preds)
|
|
|
|
# # normalize preds to numpy float array
|
|
# preds = np.array(preds, dtype=float)
|
|
|
|
# # Columns containing "human" should be rounded to one decimal and clamped 0.0-3.0
|
|
# if "human" in column.lower():
|
|
# # humans must be whole numbers (no decimals) and capped between 0 and 3
|
|
# preds = np.nan_to_num(preds, nan=0.0)
|
|
# preds = np.rint(preds) # round to nearest integer
|
|
# preds = np.clip(preds, 0, 3).astype(int)
|
|
|
|
# # Columns containing "labor_time" should be reasonable yearly hours.
|
|
# # If predictions are unrealistically large, scale them down proportionally to a sane max (e.g., 2000 hours/year),
|
|
# # then round to one decimal and ensure non-negative.
|
|
# if "labor_time" in column.lower():
|
|
# max_yearly_hours = 2000.0
|
|
# current_max = np.nanmax(preds) if preds.size > 0 else 0.0
|
|
# if current_max > max_yearly_hours and current_max > 0:
|
|
# scale = max_yearly_hours / current_max
|
|
# preds = preds * scale
|
|
# preds = np.clip(preds, 0.0, max_yearly_hours)
|
|
# preds = np.round(preds, 1)
|
|
|
|
# predictions[column] = preds
|
|
|
|
# Konversi hasil ke DataFrame
|
|
predictions_df = pd.DataFrame(predictions)
|
|
# print(predictions_df)
|
|
# Hapus data prediksi yang ada sebelumnya
|
|
# self.__delete_predictions_from_db(assetnum)
|
|
|
|
# Insert hasil prediksi ke database
|
|
try:
|
|
await self.__insert_predictions_to_db(
|
|
predictions_df, assetnum, token
|
|
)
|
|
except Exception as e:
|
|
print(f"Error saat insert data ke database: {e}")
|
|
# self.__insert_predictions_to_db(predictions_df, p_equipment_id)
|
|
|
|
# Update data untuk total RiskCost per tahun
|
|
self.__update_data_lcc(assetnum)
|
|
|
|
except Exception as e:
|
|
print(f"Program dihentikan: {e}")
|
|
|
|
|
|
RELIABILITY_APP_URL = os.getenv("RELIABILITY_APP_URL", "http://192.168.1.82:8000/reliability")


async def main(RELIABILITY_APP_URL=RELIABILITY_APP_URL, assetnum=None, token=None):
    """Run equipment predictions.

    If ``assetnum`` is given, predicts only that asset; otherwise fetches all
    distinct assetnums from ``ms_equipment_master`` and predicts them
    concurrently (bounded by a semaphore).

    Args:
        RELIABILITY_APP_URL: Base URL of the reliability service; defaults to
            the ``RELIABILITY_APP_URL`` environment variable / built-in URL.
        assetnum: Optional single asset number to predict.
        token: Optional pre-obtained access token. When ``None``, a sign-in is
            performed to obtain one.

    Returns:
        None. Errors are reported via ``print`` and never propagated, so a
        single failing asset cannot abort the batch.
    """
    connection = None
    try:
        prediksi = Prediksi(RELIABILITY_APP_URL)

        # Obtain credentials: sign in when no token was supplied, otherwise
        # reuse the caller's token. sign_in() stores the token on the instance,
        # so its return value is intentionally ignored.
        if token is None:
            await prediksi.sign_in()
            if not getattr(prediksi, "access_token", None):
                print("Failed to obtain access token; aborting.")
                return
        else:
            prediksi.access_token = token

        # Single-asset mode: run one prediction and exit.
        if assetnum:
            print(f"Predicting single assetnum: {assetnum}")
            try:
                await prediksi.predict_equipment_data(assetnum, prediksi.access_token)
            except Exception as e:
                print(f"Error Predicting {assetnum}: {e}")
            print("Selesai.")
            return

        # Batch mode: fetch every distinct assetnum from the database.
        connection, connection_wo_db = get_connection()
        if connection is None:
            print("Database connection failed.")
            return

        cursor = connection.cursor(cursor_factory=DictCursor)
        query_main = "SELECT DISTINCT(assetnum) FROM ms_equipment_master"
        cursor.execute(query_main)
        results = cursor.fetchall()
        cursor.close()

        # Close the connection early: the rows are in memory and the async
        # prediction loop does not need it (each prediction manages its own
        # connections).
        connection.close()
        connection = None

        # Concurrency limit to prevent overwhelming DB/API.
        MAX_CONCURRENT_TASKS = 10
        sem = asyncio.Semaphore(MAX_CONCURRENT_TASKS)

        total_assets = len(results)
        print(f"Starting prediction for {total_assets} assets with {MAX_CONCURRENT_TASKS} concurrent tasks.")

        async def bound_predict(idx, asset):
            # Run one prediction under the semaphore; swallow per-asset errors
            # so a single failure never cancels the whole gather().
            async with sem:
                try:
                    if not asset or str(asset).strip() == "":
                        print(f"[{idx}/{total_assets}] Skipping empty assetnum")
                        return

                    print(f"[{idx}/{total_assets}] Predicting assetnum: {asset}")
                    await prediksi.predict_equipment_data(asset, prediksi.access_token)
                except Exception as e:
                    print(f"Error Predicting {asset}: {e}")

        # DictCursor rows support mapping access; fall back to positional
        # access for plain tuple rows.
        tasks = [
            bound_predict(idx, row.get("assetnum") if hasattr(row, "get") else row[0])
            for idx, row in enumerate(results, start=1)
        ]

        await asyncio.gather(*tasks)

        print("Selesai.")

    except Exception as e:
        print(f"Error in main: {e}")
        return
    finally:
        # Safety net: only reached with an open connection if an exception
        # fired between get_connection() and the early close above.
        if connection:
            connection.close()
|
|
if __name__ == "__main__":
    import argparse

    # CLI entry point: optionally restrict the run to a single asset number;
    # with no flag, every asset in the database is predicted.
    arg_parser = argparse.ArgumentParser(description="Run equipment prediction")
    arg_parser.add_argument(
        "--assetnum", "-a",
        type=str,
        default=None,
        help="Specific assetnum to predict. If not provided, predicts all assets.",
    )
    cli_args = arg_parser.parse_args()

    asyncio.run(main(assetnum=cli_args.assetnum))