You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

534 lines
24 KiB
Python

import os
import asyncio
import pandas as pd
import numpy as np
import numpy_financial as npf # Gunakan numpy-financial untuk fungsi keuangan
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
import matplotlib.pyplot as plt
from starlette.config import Config
from uuid import uuid4
from psycopg2.extras import DictCursor
import httpx
from dotenv import load_dotenv
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from config import get_connection
load_dotenv()
class Prediksi:
    """Project per-year maintenance figures for an asset and store them.

    The public entry point is :meth:`predict_equipment_data`. It reads the
    asset's actual rows (``is_actual = 1``) from ``lcc_equipment_tr_data``,
    projects every column up to the asset's forecasting target year, replaces
    the previously stored projections (``is_actual = 0``) and finally
    recomputes the ``rc_*`` cost columns.

    All database helpers report failures with ``print`` and return ``None``;
    only ``__get_rate_and_max_year`` re-raises.
    """

    def __init__(self, RELIABILITY_APP_URL=None):
        """Store the base URL of the reliability service.

        Args:
            RELIABILITY_APP_URL: Base URL. When falsy, the
                ``RELIABILITY_APP_URL`` environment variable is used, and
                after that a built-in default address.
        """
        self.RELIABILITY_APP_URL = RELIABILITY_APP_URL or os.getenv(
            "RELIABILITY_APP_URL", "http://192.168.1.82:8000/reliability"
        )

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _open_connection():
        """Return the connection from ``get_connection()``.

        ``get_connection()`` may return a tuple; only its first element is
        used. NOTE(review): any further elements of that tuple are not closed
        here -- confirm whether they need to be.
        """
        connections = get_connection()
        return connections[0] if isinstance(connections, tuple) else connections

    @staticmethod
    def _row_float(row, key):
        """Return ``row[key]`` as ``float``; ``0.0`` when missing or NaN/None."""
        value = row.get(key, None)
        return 0.0 if pd.isna(value) else float(value)

    @staticmethod
    def _resolve_cm_interval(api_data, row):
        """Pick the ``raw_cm_interval`` value to store for one projected row.

        When ``api_data["data"]`` is a non-empty list, its first element's
        ``num_fail`` is used (falling back to the row's ``cm_interval``).
        Otherwise the row's ``cm_interval`` is used, clamped to at least 1.
        """
        has_api_rows = (
            api_data
            and "data" in api_data
            and isinstance(api_data["data"], list)
            and len(api_data["data"]) > 0
        )
        if has_api_rows:
            try:
                return float(
                    api_data["data"][0].get("num_fail", row.get("cm_interval", 1))
                )
            except Exception:
                # num_fail was unusable (e.g. None): fall back to the row value.
                if pd.isna(row.get("cm_interval", None)):
                    return 1
                return float(row.get("cm_interval", 1))
        try:
            value = float(row.get("cm_interval", 1))
        except Exception:
            return 1.0
        # NaN fails the comparison as well and therefore becomes 1.0.
        return value if value >= 1 else 1.0

    async def _fetch_failure_data(self, client, assetnum, year, token):
        """GET the number-of-failures payload for one asset and year.

        Returns the decoded JSON body, or ``{}`` when the request fails or the
        body is not valid JSON.
        """
        url = (
            f"{self.RELIABILITY_APP_URL}/main/number-of-failures/"
            f"{assetnum}/{int(year)}/{int(year)}"
        )
        try:
            response = await client.get(
                url,
                timeout=30.0,
                headers={"Authorization": f"Bearer {token}"},
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            print(f"HTTP error occurred: {e}")
            return {}
        except ValueError as e:
            # A non-JSON body must not abort the whole batch insert.
            print(f"Invalid JSON response: {e}")
            return {}

    # ------------------------------------------------------------------
    # Database access
    # ------------------------------------------------------------------
    def __get_param(self, equipment_id):
        """Return the asset's ``forecasting_target_year``.

        Returns ``None`` when the connection cannot be obtained, the query
        fails, or the query yields no value for this asset.
        """
        # Initialised up front so ``finally`` never sees an unbound name.
        connection = None
        try:
            connection = self._open_connection()
            if connection is None:
                print("Database connection failed.")
                return None
            cursor = connection.cursor(cursor_factory=DictCursor)
            query = """
                SELECT
                (select COALESCE(forecasting_target_year, 2056) from lcc_ms_equipment_data where assetnum = %s) AS forecasting_target_year
            """
            cursor.execute(query, (equipment_id,))
            row = cursor.fetchone()
            return row["forecasting_target_year"] if row is not None else None
        except Exception as e:
            print(f"Error saat get params dari database: {e}")
            return None
        finally:
            if connection is not None:
                connection.close()

    def __fetch_data_from_db(self, equipment_id):
        """Load the asset's actual rows into a DataFrame.

        Column names are taken from the query aliases (``year``,
        ``cm_interval``, ``cm_cost``, ...). Returns ``None`` on any failure.
        """
        connection = None
        try:
            connection = self._open_connection()
            if connection is None:
                print("Database connection failed.")
                return None
            cursor = connection.cursor(cursor_factory=DictCursor)
            query = """
                SELECT
                tahun AS year,
                raw_cm_interval AS cm_interval,
                raw_cm_material_cost AS cm_cost,
                raw_cm_labor_time AS cm_labor_time,
                raw_cm_labor_human AS cm_labor_human,
                raw_pm_interval AS pm_interval,
                raw_pm_material_cost AS pm_cost,
                raw_pm_labor_time AS pm_labor_time,
                raw_pm_labor_human AS pm_labor_human,
                raw_oh_interval AS oh_interval,
                raw_oh_material_cost AS oh_cost,
                raw_oh_labor_time AS oh_labor_time,
                raw_oh_labor_human AS oh_labor_human,
                raw_predictive_material_cost AS predictive_material_cost,
                raw_predictive_labor_time AS predictive_labor_time,
                raw_predictive_labor_human AS predictive_labor_human,
                raw_predictive_interval AS predictive_interval,
                "raw_loss_output_MW" AS loss_output_mw,
                raw_loss_output_price AS loss_price
                FROM lcc_equipment_tr_data
                WHERE assetnum = %s
                and is_actual=1
                ;
            """
            cursor.execute(query, (equipment_id,))
            data = cursor.fetchall()
            # Column names come from the cursor so they follow the aliases.
            columns = [desc[0] for desc in cursor.description]
            return pd.DataFrame(data, columns=columns)
        except Exception as e:
            print(f"Error saat mengambil data dari database: {e}")
            return None
        finally:
            if connection is not None:
                connection.close()

    def __future_value_predict(self, rate, nper, pmt, pv, years):
        """Return one future value per entry of ``years``.

        Element ``i`` is ``npf.fv(rate, nper + i, pmt, pv)``.
        NOTE(review): with ``nper == 0`` the first projected year equals
        ``-pv`` (no growth applied) -- confirm this is intended.
        """
        return [npf.fv(rate, nper + offset, pmt, pv) for offset in range(len(years))]

    def __delete_predictions_from_db(self, equipment_id):
        """Delete the asset's stored projection rows (``is_actual = 0``)."""
        connection = None
        try:
            connection = self._open_connection()
            if connection is None:
                print("Database connection failed.")
                return None
            cursor = connection.cursor()
            delete_query = """
                DELETE FROM lcc_equipment_tr_data
                WHERE assetnum = %s AND is_actual = 0;
            """
            cursor.execute(delete_query, (equipment_id,))
            connection.commit()
        except Exception as e:
            print(f"Error saat menghapus data proyeksi dari database: {e}")
        finally:
            if connection is not None:
                connection.close()

    async def __insert_predictions_to_db(self, data, equipment_id, token):
        """Insert the projected rows of ``data`` for ``equipment_id``.

        ``seq`` continues from the asset's current maximum. For every row the
        reliability service is queried; its ``num_fail`` (when available)
        replaces the projected ``cm_interval``. Missing/NaN numeric cells are
        stored as ``0.0``. Errors are printed, not raised.
        """
        connection = None
        try:
            connection = self._open_connection()
            if connection is None:
                print("Database connection failed.")
                return None
            cursor = connection.cursor()

            get_max_seq_query = """
                SELECT COALESCE(MAX(seq), 0) FROM lcc_equipment_tr_data WHERE assetnum = %s
            """
            cursor.execute(get_max_seq_query, (equipment_id,))
            max_seq = cursor.fetchone()[0]

            insert_query = """
                INSERT INTO lcc_equipment_tr_data (
                id,
                seq,
                is_actual,
                raw_pm_interval,
                tahun, assetnum,
                raw_cm_interval, raw_cm_material_cost, raw_cm_labor_time, raw_cm_labor_human,
                raw_pm_material_cost, raw_pm_labor_time, raw_pm_labor_human,
                raw_oh_interval, raw_oh_material_cost, raw_oh_labor_time, raw_oh_labor_human,
                raw_predictive_interval, raw_predictive_material_cost, raw_predictive_labor_time, raw_predictive_labor_human,
                "raw_loss_output_MW", raw_loss_output_price
                , created_by, created_at
                ) VALUES (
                %s, %s, 0, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'Sys', NOW()
                )
            """

            num = self._row_float
            records_to_insert = []
            # One HTTP client is shared by all rows instead of one per row.
            async with httpx.AsyncClient() as client:
                for _, row in data.iterrows():
                    max_seq = max_seq + 1
                    api_data = await self._fetch_failure_data(
                        client, equipment_id, row["year"], token
                    )
                    cm_interval_prediction = self._resolve_cm_interval(api_data, row)
                    # Order must match the placeholders of insert_query.
                    records_to_insert.append(
                        (
                            str(uuid4()),
                            int(max_seq),
                            num(row, "pm_interval"),
                            num(row, "year"),
                            equipment_id,
                            cm_interval_prediction,
                            num(row, "cm_cost"),
                            num(row, "cm_labor_time"),
                            num(row, "cm_labor_human"),
                            num(row, "pm_cost"),
                            num(row, "pm_labor_time"),
                            num(row, "pm_labor_human"),
                            num(row, "oh_interval"),
                            num(row, "oh_cost"),
                            num(row, "oh_labor_time"),
                            num(row, "oh_labor_human"),
                            num(row, "predictive_interval"),
                            num(row, "predictive_material_cost"),
                            num(row, "predictive_labor_time"),
                            num(row, "predictive_labor_human"),
                            num(row, "loss_output_mw"),
                            num(row, "loss_price"),
                        )
                    )

            cursor.executemany(insert_query, records_to_insert)
            connection.commit()
        except Exception as e:
            print(f"Error saat menyimpan data ke database: {e}")
        finally:
            if connection is not None:
                connection.close()

    def __update_data_lcc(self, equipment_id):
        """Recompute the ``rc_*`` cost columns for every row of the asset.

        The second statement then overwrites ``rc_total_cost`` of the
        ``seq = 0`` row with the asset's ``acquisition_cost``.
        """
        connection = None
        try:
            connection = self._open_connection()
            if connection is None:
                print("Database connection failed.")
                return None
            cursor = connection.cursor()
            up_query = """
                update lcc_equipment_tr_data
                set
                rc_cm_material_cost = raw_cm_material_cost
                ,rc_cm_labor_cost = (raw_cm_interval * raw_cm_labor_time * raw_cm_labor_human * COALESCE((SELECT man_hour FROM lcc_ms_year_data WHERE year = lcc_equipment_tr_data.tahun), coalesce((select value_num from lcc_ms_master where name='manhours_rate'), 0)) )
                ,rc_pm_material_cost = raw_pm_material_cost
                ,rc_pm_labor_cost = (raw_pm_labor_time * raw_pm_labor_human * COALESCE((SELECT man_hour FROM lcc_ms_year_data WHERE year = lcc_equipment_tr_data.tahun), coalesce((select value_num from lcc_ms_master where name='manhours_rate'), 0)) )
                ,rc_predictive_labor_cost = COALESCE( (raw_predictive_labor_time * raw_predictive_labor_human * COALESCE((SELECT man_hour FROM lcc_ms_year_data WHERE year = lcc_equipment_tr_data.tahun), coalesce((select value_num from lcc_ms_master where name='manhours_rate'), 0)) ) , 0)
                ,rc_oh_material_cost = raw_oh_material_cost
                ,rc_oh_labor_cost = (raw_oh_labor_time * raw_oh_labor_human * COALESCE((SELECT man_hour FROM lcc_ms_year_data WHERE year = lcc_equipment_tr_data.tahun), coalesce((select value_num from lcc_ms_master where name='manhours_rate'), 0)) )
                ,rc_project_material_cost = coalesce(raw_project_task_material_cost, 0)
                ,rc_lost_cost = coalesce(("raw_loss_output_MW" * raw_loss_output_price * raw_cm_interval), 0) * 1000
                ,rc_operation_cost = coalesce(raw_operational_cost, 0)
                ,rc_maintenance_cost = coalesce(raw_maintenance_cost, 0)
                ,rc_total_cost = (
                raw_cm_material_cost
                + (raw_cm_interval * raw_cm_labor_time * raw_cm_labor_human * COALESCE((SELECT man_hour FROM lcc_ms_year_data WHERE year = lcc_equipment_tr_data.tahun), coalesce((select value_num from lcc_ms_master where name='manhours_rate'), 0)) )
                + raw_pm_material_cost
                + (raw_pm_labor_time * raw_pm_labor_human * COALESCE((SELECT man_hour FROM lcc_ms_year_data WHERE year = lcc_equipment_tr_data.tahun), coalesce((select value_num from lcc_ms_master where name='manhours_rate'), 0)) )
                + COALESCE( (raw_predictive_labor_time * raw_predictive_labor_human * COALESCE((SELECT man_hour FROM lcc_ms_year_data WHERE year = lcc_equipment_tr_data.tahun), coalesce((select value_num from lcc_ms_master where name='manhours_rate'), 0)) ) , 0)
                + raw_oh_material_cost
                + (raw_oh_labor_time * raw_oh_labor_human * COALESCE((SELECT man_hour FROM lcc_ms_year_data WHERE year = lcc_equipment_tr_data.tahun), coalesce((select value_num from lcc_ms_master where name='manhours_rate'), 0)) )
                + coalesce(raw_project_task_material_cost, 0)
                + coalesce(("raw_loss_output_MW" * raw_loss_output_price * raw_cm_interval), 0) * 1000
                + coalesce(raw_operational_cost, 0)
                + coalesce(raw_maintenance_cost, 0)
                )
                , updated_by = 'Sys', updated_at = NOW()
                where assetnum = %s;
                update lcc_equipment_tr_data set rc_total_cost = (select acquisition_cost from lcc_ms_equipment_data where assetnum=lcc_equipment_tr_data.assetnum) where assetnum = %s and seq=0;
            """
            # Both statements are sent in one execute; each takes the asset id.
            cursor.execute(up_query, (equipment_id, equipment_id))
            connection.commit()
        except Exception as e:
            print(f"Error saat update data proyeksi dari database: {e}")
        finally:
            if connection is not None:
                connection.close()

    def __get_rate_and_max_year(self, equipment_id):
        """Return ``(rate, max_year)`` for the asset.

        ``rate`` is the ``inflation_rate`` master value divided by 100;
        ``max_year`` is the latest ``tahun`` among the asset's actual rows.

        Raises:
            Exception: when the connection is unavailable, the query fails,
                or either value is NULL. The error is printed and re-raised.
        """
        connection = None
        try:
            connection = self._open_connection()
            if connection is None:
                # Raise instead of returning None: the caller unpacks a tuple.
                raise RuntimeError("Database connection failed.")
            cursor = connection.cursor(cursor_factory=DictCursor)
            query = """
                SELECT
                (SELECT value_num / 100 FROM lcc_ms_master where name='inflation_rate') AS rate,
                (SELECT MAX(tahun) FROM lcc_equipment_tr_data WHERE is_actual = 1 AND assetnum = %s) AS max_year
            """
            cursor.execute(query, (equipment_id,))
            result = cursor.fetchone()
            rate = result["rate"]
            max_year = result["max_year"]
            if rate is None:
                raise Exception(
                    "Nilai 'rate' tidak boleh kosong. Periksa tabel 'lcc_ms_master'."
                )
            if max_year is None:
                raise Exception(
                    "Nilai 'max_year' tidak boleh kosong. Periksa tabel 'lcc_equipment_tr_data'."
                )
            return rate, max_year
        except Exception as e:
            print(f"Error saat mendapatkan parameter dari database: {e}")
            raise  # propagate so the caller aborts this asset
        finally:
            if connection is not None:
                connection.close()

    # ------------------------------------------------------------------
    # Forecast models
    # ------------------------------------------------------------------
    @staticmethod
    def _linear_regression_predict(df, column, years):
        """Fit ``column`` against ``year`` linearly; return ``|prediction|``."""
        x = df["year"].values.reshape(-1, 1)
        y = df[column].fillna(0).values
        model = LinearRegression()
        model.fit(x, y)
        future_x = np.array(years).reshape(-1, 1)
        return np.abs(model.predict(future_x))

    @staticmethod
    def _exponential_smoothing_predict(df, column, years):
        """Forecast ``len(years)`` steps with additive-trend smoothing."""
        data_series = df[column].fillna(0).values
        # Shift the series by a tiny epsilon when it contains zeros.
        if np.any(data_series == 0):
            data_series = data_series + 1e-10
        model = ExponentialSmoothing(
            data_series, trend="add", seasonal=None, seasonal_periods=None
        )
        model_fit = model.fit(optimized=True, use_brute=False)
        return np.abs(model_fit.forecast(len(years)))

    @staticmethod
    def _decision_tree_predict(df, column, years):
        """Fit a decision tree of ``column`` on ``year``; return ``|prediction|``."""
        x = df["year"].values.reshape(-1, 1)
        y = df[column].fillna(0).values
        model = DecisionTreeRegressor()
        model.fit(x, y)
        future_x = np.array(years).reshape(-1, 1)
        return np.abs(model.predict(future_x))

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------
    async def predict_equipment_data(self, assetnum, token):
        """Project and store yearly data for ``assetnum``.

        Column model selection:
          * name contains ``cost``      -> future value of the latest actual
          * fewer than 5 distinct values -> exponential smoothing
          * contains nulls               -> decision tree
          * otherwise                    -> linear regression

        Args:
            assetnum: Asset identifier used in every query.
            token: Bearer token sent to the reliability service.

        Returns:
            ``None``. Any failure is printed and aborts this asset only.
        """
        try:
            df = self.__fetch_data_from_db(assetnum)
            if df is None or df.empty:
                print("Data tidak tersedia untuk prediksi.")
                return

            # The query has no ORDER BY; sort so that ``iloc[-1]`` really is
            # the latest year and the smoothing model sees chronological data.
            df = df.sort_values("year", na_position="first").reset_index(drop=True)

            par_tahun_target = self.__get_param(assetnum)
            if par_tahun_target is None:
                print("Tahun target proyeksi tidak tersedia.")
                return

            last_actual_year = int(df["year"].max())
            future_years = list(
                range(last_actual_year + 1, int(par_tahun_target) + 1)
            )
            if not future_years:
                # Target year is not beyond the actual data: nothing to do,
                # and stored rows are left untouched.
                print("Tidak ada tahun proyeksi yang perlu dihitung.")
                return

            predictions = {"year": future_years}

            rate, max_year = self.__get_rate_and_max_year(assetnum)
            pmt = 0
            nper = max_year - last_actual_year

            for column in df.columns:
                if column == "year":
                    continue
                if "cost" in column.lower():
                    # Negated because fv() returns the opposite sign of pv.
                    pv = -df[column].iloc[-1]
                    predictions[column] = self.__future_value_predict(
                        rate, nper, pmt, pv, future_years
                    )
                elif df[column].nunique() < 5:
                    predictions[column] = self._exponential_smoothing_predict(
                        df, column, future_years
                    )
                elif df[column].isnull().sum() > 0:
                    predictions[column] = self._decision_tree_predict(
                        df, column, future_years
                    )
                else:
                    predictions[column] = self._linear_regression_predict(
                        df, column, future_years
                    )

            predictions_df = pd.DataFrame(predictions)

            # Replace the previously stored projections.
            self.__delete_predictions_from_db(assetnum)
            try:
                await self.__insert_predictions_to_db(
                    predictions_df, assetnum, token
                )
            except Exception as e:
                print(f"Error saat insert data ke database: {e}")

            # Refresh the derived rc_* cost columns.
            self.__update_data_lcc(assetnum)
        except Exception as e:
            print(f"Program dihentikan: {e}")
# Base URL of the reliability service; overridable through the environment.
RELIABILITY_APP_URL = os.getenv("RELIABILITY_APP_URL", "http://192.168.1.82:8000/reliability")


async def main(RELIABILITY_APP_URL=RELIABILITY_APP_URL, token=None):
    """Run the projection for every asset in ``ms_equipment_master``.

    Args:
        RELIABILITY_APP_URL: Base URL handed to ``Prediksi``.
        token: Bearer token for the reliability service. When ``None`` the
            ``RELIABILITY_APP_TOKEN`` environment variable is used, and after
            that the legacy built-in token.

    Returns:
        ``None``. Failures are printed; a failing asset does not stop the
        remaining ones.
    """
    connection = None
    try:
        connections = get_connection()
        connection = connections[0] if isinstance(connections, tuple) else connections
        if connection is None:
            print("Database connection failed.")
            return
        cursor = connection.cursor(cursor_factory=DictCursor)
        query_main = "SELECT DISTINCT(assetnum) FROM ms_equipment_master"
        cursor.execute(query_main)
        results = cursor.fetchall()
    except Exception as e:
        print(f"Error getting database connection: {e}")
        return
    finally:
        # The asset list is fully fetched; release the connection before the
        # long-running per-asset work (each step opens its own connection).
        if connection is not None:
            connection.close()

    if token is None:
        # NOTE(security): a bearer token embedded in source is a leaked
        # credential. It is kept only as a backward-compatible fallback;
        # set RELIABILITY_APP_TOKEN, rotate this token and remove the literal.
        token = os.getenv(
            "RELIABILITY_APP_TOKEN",
            "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTc2MjQxODk5My4xNzI4NTYsImp0aSI6ImJ1OU0xQVlLSTZENTd2cC1OaDgtUlEiLCJ0eXBlIjoiYWNjZXNzIiwic3ViIjoiMzg1NzJhOTItZjE2Yy00MWIyLThjNmYtYWZhNTcyMzhhNWU3IiwibmJmIjoxNzYyNDE4OTkzLCJjc3JmIjoiNjY5NzVjNDEtNTg0ZS00OGFkLWJjMmItMDNlZDEyZDM2ZDczIiwiZXhwIjoxNzYyNDI2MTkzLCJub25jZSI6ImYzMThkNDVkNmYzZWRjMzNiN2Q0MmE0MGRkNDJkNDRhIn0.elDnyaoeJ48oOIUdMRZjt7gGICmK-2Awg6Rbl_BZ1PQ",
        )

    prediksi = Prediksi(RELIABILITY_APP_URL)
    total = len(results)
    for idx, row in enumerate(results, start=1):
        # Rows may be dict-like (DictCursor) or plain sequences.
        assetnum = row.get("assetnum") if hasattr(row, "get") else row[0]
        if not assetnum or str(assetnum).strip() == "":
            print(f"[{idx}/{total}] Skipping empty assetnum")
            continue
        print(f"[{idx}/{total}] Processing assetnum: {assetnum}")
        try:
            await prediksi.predict_equipment_data(assetnum, token)
        except Exception as e:
            print(f"Error processing {assetnum}: {e}")
    print("Selesai.")
# Script entry point: drive the async batch run to completion.
if __name__ == "__main__":
    asyncio.run(main())