"""Project future risk-cost (RC) rows for equipment and persist them to the LCC tables."""

import os
import asyncio
import pandas as pd
import numpy as np
import numpy_financial as npf  # numpy-financial supplies the financial functions (fv, ...)
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
import matplotlib.pyplot as plt
from starlette.config import Config
from uuid import uuid4
from psycopg2.extras import DictCursor
import httpx
from dotenv import load_dotenv
import sys
import os
from typing import Optional
from .formula import rc_labor_cost, rc_lost_cost, rc_total_cost
from src.modules.config import get_connection, get_production_connection, DEV_USERNAME, DEV_PASSWORD
import json

load_dotenv()

# Cost columns in the exact order expected by the INSERT and UPDATE statements
# in Prediksi.__insert_predictions_to_db.
_RC_COST_COLUMNS = (
    "rc_cm_material_cost",
    "rc_cm_labor_cost",
    "rc_pm_material_cost",
    "rc_pm_labor_cost",
    "rc_oh_material_cost",
    "rc_oh_labor_cost",
    "rc_predictive_material_cost",
    "rc_predictive_labor_cost",
)


def _cost_or_zero(row, column):
    """Return ``row[column]`` as a float, or 0.0 when it is missing or NaN."""
    value = row.get(column, 0)
    return 0.0 if pd.isna(value) else float(value)


class Prediksi:
    """Predict yearly risk-cost figures per asset and store them as non-actual rows."""

    def __init__(self, RELIABILITY_APP_URL=None):
        # Allow passing the URL or fall back to environment/default so callers can omit it.
        self.RELIABILITY_APP_URL = RELIABILITY_APP_URL or os.getenv(
            "RELIABILITY_APP_URL", "http://192.168.1.82:8000/reliability"
        )
        # Base URL for auth endpoints (sign-in, refresh-token).
        self.AUTH_APP_URL = os.getenv("AUTH_APP_URL", "http://192.168.1.82:8000")
        # Tokens are stored here after sign-in/refresh.
        self.access_token = None
        self.refresh_token = None

    def __get_param(self, equipment_id):
        """Return the forecasting target year for ``equipment_id``.

        Returns None when the connection fails, the query errors, or the asset
        has no row in ``lcc_ms_equipment_data`` (the sub-select yields NULL).
        """
        # Initialised before ``try`` so ``finally`` never sees an unbound name.
        connection = None
        try:
            connection, _ = get_connection()
            if connection is None:
                print("Database connection failed.")
                return None

            cursor = connection.cursor(cursor_factory=DictCursor)
            query = """
                SELECT (select COALESCE(forecasting_target_year, 2056)
                        from lcc_ms_equipment_data
                        where assetnum = %s) AS forecasting_target_year
            """
            cursor.execute(query, (equipment_id,))
            par1 = cursor.fetchone()
            return par1["forecasting_target_year"]
        except Exception as e:
            print(f"Error saat get params dari database: {e}")
            return None
        finally:
            if connection:
                connection.close()

    def __fetch_data_from_db(self, equipment_id):
        """Load the actual (``is_actual = 1``, ``seq != 0``) cost rows for an asset.

        Returns a DataFrame with a ``year`` column plus the eight RC cost
        columns, or None when the connection or query fails.
        """
        connection = None
        try:
            connection, _ = get_connection()
            if connection is None:
                print("Database connection failed.")
                return None

            cursor = connection.cursor(cursor_factory=DictCursor)
            query = """
                SELECT
                    tahun AS year,
                    rc_cm_material_cost,
                    rc_cm_labor_cost,
                    rc_pm_material_cost,
                    rc_pm_labor_cost,
                    rc_oh_material_cost,
                    rc_oh_labor_cost,
                    rc_predictive_material_cost,
                    rc_predictive_labor_cost
                FROM lcc_equipment_tr_data
                WHERE assetnum = %s
                and is_actual=1
                and seq != 0
                ;
            """
            cursor.execute(query, (equipment_id,))

            data = cursor.fetchall()
            # Column names come from the cursor description of the result set.
            columns = [desc[0] for desc in cursor.description]
            return pd.DataFrame(data, columns=columns)
        except Exception as e:
            print(f"Error saat mengambil data dari database: {e}")
            return None
        finally:
            if connection:
                connection.close()

    def __future_value_predict(self, rate, nper, pmt, pv, years):
        """Return one ``npf.fv`` value per entry of ``years``, using ``nper + i`` periods."""
        return [npf.fv(rate, nper + i, pmt, pv) for i in range(len(years))]

    async def __insert_predictions_to_db(self, data, equipment_id, token):
        """Upsert the predicted rows (``is_actual = 0``) for ``equipment_id``.

        ``data`` is a DataFrame with a ``year`` column and the RC cost columns.
        Existing prediction rows for the same asset/year are updated in place;
        new years are inserted in one batch. ``token`` is accepted for
        interface compatibility and is not used here. Errors are printed and
        swallowed.

        Recalculating total costs is left to the caller
        (``predict_equipment_data`` runs ``__update_data_lcc`` right after this
        coroutine), so the work is not done twice.
        """
        connection = None
        try:
            connection, _ = get_connection()
            if connection is None:
                print("Database connection failed.")
                return None

            cursor = connection.cursor()

            # Predicted rows continue the sequence after the highest actual seq.
            get_max_seq_query = """
                SELECT COALESCE(MAX(seq), 0) FROM lcc_equipment_tr_data
                WHERE assetnum = %s AND is_actual = 1
            """
            cursor.execute(get_max_seq_query, (equipment_id,))
            max_seq = cursor.fetchone()[0]

            insert_query = """
                INSERT INTO lcc_equipment_tr_data (
                    id, seq, is_actual, tahun, assetnum,
                    rc_cm_material_cost, rc_cm_labor_cost,
                    rc_pm_material_cost, rc_pm_labor_cost,
                    rc_oh_material_cost, rc_oh_labor_cost,
                    rc_predictive_material_cost, rc_predictive_labor_cost,
                    created_by, created_at
                ) VALUES (
                    %s, %s, 0, %s, %s,
                    %s, %s, %s, %s, %s, %s, %s, %s,
                    'Sys', NOW()
                )
            """
            check_existence_query = """
                SELECT id FROM lcc_equipment_tr_data
                WHERE assetnum = %s AND tahun = %s AND is_actual = 0
            """
            update_query = """
                UPDATE lcc_equipment_tr_data
                SET seq = %s,
                    rc_cm_material_cost = %s, rc_cm_labor_cost = %s,
                    rc_pm_material_cost = %s, rc_pm_labor_cost = %s,
                    rc_oh_material_cost = %s, rc_oh_labor_cost = %s,
                    rc_predictive_material_cost = %s, rc_predictive_labor_cost = %s,
                    updated_by = 'Sys', updated_at = NOW()
                WHERE id = %s
            """

            records_to_insert = []
            for idx, row in data.iterrows():
                loop_seq = int(max_seq + idx + 1)
                year = int(row["year"])

                cursor.execute(check_existence_query, (equipment_id, year))
                existing_record = cursor.fetchone()
                costs = tuple(_cost_or_zero(row, column) for column in _RC_COST_COLUMNS)

                if existing_record:
                    cursor.execute(update_query, (loop_seq, *costs, existing_record[0]))
                else:
                    records_to_insert.append(
                        (str(uuid4()), loop_seq, year, equipment_id, *costs)
                    )

            # Batch-insert the years that had no prediction row yet.
            if records_to_insert:
                cursor.executemany(insert_query, records_to_insert)

            connection.commit()
        except Exception as e:
            print(f"Error saat menyimpan data ke database: {e}")
        finally:
            if connection:
                connection.close()

    def __get_asset_criticality_params(self, equipment_id):
        """Compute the asset-criticality value and the equipment's EFDH+FOH sum.

        Returns ``{"asset_criticality": float, "efdh_foh_sum": float | None}``
        or None when the connection/query fails or no criticality row exists.
        ``efdh_foh_sum`` stays None when the equipment has no master row.
        """
        connection = None
        try:
            connection, _ = get_connection()
            efdh_foh_sum = None
            if connection is None:
                print("Database connection failed.")
                return None

            cursor = connection.cursor(cursor_factory=DictCursor)

            # NOTE(review): no WHERE/ORDER BY, so the first row returned by the
            # database is used -- confirm lcc_ms_year_data row selection.
            query = """
                SELECT row_to_json(t) AS asset_criticality
                FROM (
                    SELECT
                        asset_crit_ens_energy_not_served,
                        asset_crit_bpp_system,
                        asset_crit_bpp_pembangkit,
                        asset_crit_dmn_daya_mampu_netto,
                        asset_crit_marginal_cost,
                        asset_crit_efdh_equivalent_forced_derated_hours,
                        asset_crit_foh_forced_outage_hours,
                        asset_crit_extra_fuel_cost
                    FROM lcc_ms_year_data
                ) t
            """
            # The query has no placeholders, so no parameters are passed.
            cursor.execute(query)
            result = cursor.fetchone()
            asset_crit = result.get("asset_criticality") if result else None
            if not asset_crit:
                return None

            # asset_crit may already be a dict (from row_to_json) or a JSON string.
            try:
                ac = asset_crit if isinstance(asset_crit, dict) else json.loads(asset_crit)
            except Exception:
                ac = {}

            def _f(key):
                try:
                    return float(ac.get(key) or 0.0)
                except Exception:
                    return 0.0

            ens = _f("asset_crit_ens_energy_not_served")  # ENS
            bpp_syst = _f("asset_crit_bpp_system")  # BPP_SYST
            dmn = _f("asset_crit_dmn_daya_mampu_netto")  # DMN
            extra_fuel = _f("asset_crit_extra_fuel_cost")  # Extra Fuel Cost

            # Asset Criticality =
            #   (ENS/1 hour * (7% * BPP_SYST)) + ((DMN - ENS/1 hour) * Extra Fuel Cost)
            # ENS/1 hour is simply ENS (division by 1).
            part1 = ens * (0.07 * bpp_syst)
            part2 = max(0.0, (dmn - ens)) * extra_fuel
            asset_criticality = part1 + part2

            query_each_equipment = """
                SELECT
                    efdh_equivalent_forced_derated_hours,
                    foh_forced_outage_hours
                FROM lcc_ms_equipment_data
                WHERE assetnum = %s
            """
            cursor.execute(query_each_equipment, (equipment_id,))
            result_eq = cursor.fetchone()
            if result_eq:
                # Keys must match the selected column names ("forced", not
                # "force"); a mismatched key silently yields 0.0.
                eq_efdh = float(result_eq.get("efdh_equivalent_forced_derated_hours") or 0.0)
                eq_foh = float(result_eq.get("foh_forced_outage_hours") or 0.0)
                efdh_foh_sum = eq_efdh + eq_foh

            return {
                "asset_criticality": asset_criticality,
                "efdh_foh_sum": efdh_foh_sum,
            }
        except Exception as e:
            print(f"Error saat mendapatkan asset criticality dari database: {e}")
            return None
        finally:
            if connection:
                connection.close()

    def __update_data_lcc(self, equipment_id):
        """Recompute the RC columns and ``rc_total_cost`` for every row of an asset.

        Raw material costs, when positive, override the stored material costs.
        Each RC group additionally receives ``(EFDH + FOH) * asset criticality``.
        Rows with ``seq = 0`` finally get ``rc_total_cost`` set to the asset's
        acquisition cost. Errors are printed and swallowed.
        """
        connection = None
        production_connection = None
        try:
            connection, _ = get_connection()
            production_connection = get_production_connection()
            if connection is None or production_connection is None:
                print("Database connection failed.")
                return None

            cursor = connection.cursor(cursor_factory=DictCursor)

            # Static data is fetched once, outside the per-row loop.
            # NOTE(review): man_hour_rate is fetched but not used by the
            # calculation below -- confirm whether it can be dropped.
            man_hour_rate = 0.0
            try:
                prod_cur = production_connection.cursor()
                prod_cur.execute("""SELECT value_num FROM lcc_ms_master WHERE name='manhours_rate'""")
                r2 = prod_cur.fetchone()
                if r2 and r2[0] is not None:
                    man_hour_rate = float(r2[0])
                prod_cur.close()
            except Exception as rate_exc:
                # Best effort: keep the 0.0 default but do not hide the failure.
                print(f"Could not fetch manhours_rate: {rate_exc}")

            asset_criticality_data = self.__get_asset_criticality_params(equipment_id)
            ac = asset_criticality_data if isinstance(asset_criticality_data, dict) else {}
            asset_criticality_value = float(ac.get("asset_criticality", 0.0))

            select_q = '''
                SELECT id, seq, tahun,
                       raw_cm_interval, raw_cm_material_cost, rc_cm_labor_cost, rc_cm_material_cost,
                       raw_pm_interval, raw_pm_material_cost, rc_pm_labor_cost, rc_pm_material_cost,
                       raw_predictive_interval, raw_predictive_material_cost, rc_predictive_labor_cost, rc_predictive_material_cost,
                       raw_oh_interval, raw_oh_material_cost, rc_oh_labor_cost, rc_oh_material_cost,
                       efdh_equivalent_forced_derated_hours, foh_forced_outage_hours
                FROM lcc_equipment_tr_data
                WHERE assetnum = %s;
            '''
            cursor.execute(select_q, (equipment_id,))
            rows = cursor.fetchall()

            update_q = '''
                UPDATE lcc_equipment_tr_data
                SET rc_cm_material_cost = %s, rc_cm_labor_cost = %s,
                    rc_pm_material_cost = %s, rc_pm_labor_cost = %s,
                    rc_predictive_material_cost = %s, rc_predictive_labor_cost = %s,
                    rc_oh_material_cost = %s, rc_oh_labor_cost = %s,
                    rc_total_cost = %s,
                    updated_by = 'Sys', updated_at = NOW()
                WHERE id = %s;
            '''

            batch_params = []
            for r in rows:
                try:
                    raw_pm_material_cost = float(r.get("raw_pm_material_cost") or 0.0)
                    raw_predictive_material_cost = float(r.get("raw_predictive_material_cost") or 0.0)
                    raw_oh_material_cost = float(r.get("raw_oh_material_cost") or 0.0)
                    efdh = float(r.get("efdh_equivalent_forced_derated_hours") or 0.0)
                    foh = float(r.get("foh_forced_outage_hours") or 0.0)

                    rc_cm_material = float(r.get("rc_cm_material_cost") or 0.0)
                    rc_cm_labor = float(r.get("rc_cm_labor_cost") or 0.0)
                    rc_pm_labor = float(r.get("rc_pm_labor_cost") or 0.0)
                    rc_predictive_labor = float(r.get("rc_predictive_labor_cost") or 0.0)
                    rc_oh_labor = float(r.get("rc_oh_labor_cost") or 0.0)

                    # A positive raw material cost wins over the stored value.
                    try:
                        rc_pm_material = (
                            raw_pm_material_cost
                            if raw_pm_material_cost > 0
                            else float(r.get("rc_pm_material_cost") or 0.0)
                        )
                    except Exception:
                        rc_pm_material = 0.0
                    try:
                        rc_predictive_material = (
                            raw_predictive_material_cost
                            if raw_predictive_material_cost > 0
                            else float(r.get("rc_predictive_material_cost") or 0.0)
                        )
                    except Exception:
                        rc_predictive_material = 0.0
                    rc_oh_material = (
                        raw_oh_material_cost
                        if raw_oh_material_cost > 0
                        else float(r.get("rc_oh_material_cost") or 0.0)
                    )

                    # NOTE(review): the sum is used only when BOTH values are
                    # non-zero; a single non-zero value yields 0.0 -- confirm.
                    efdh_foh_sum = (efdh + foh) if efdh and foh else 0.0

                    # Single multiplier applied to every RC group.
                    ac_multiplier = efdh_foh_sum * asset_criticality_value

                    total = rc_total_cost(
                        rc_cm=rc_cm_material + rc_cm_labor + ac_multiplier,
                        rc_pm=rc_pm_material + rc_pm_labor + ac_multiplier,
                        rc_predictive=rc_predictive_material + rc_predictive_labor + ac_multiplier,
                        rc_oh=rc_oh_material + rc_oh_labor + ac_multiplier,
                    )

                    batch_params.append((
                        rc_cm_material, rc_cm_labor,
                        rc_pm_material, rc_pm_labor,
                        rc_predictive_material, rc_predictive_labor,
                        rc_oh_material, rc_oh_labor,
                        total,
                        r["id"],
                    ))
                except Exception as row_exc:
                    # A bad row must not block the others, but report it.
                    print(f"Skipping row during LCC update for {equipment_id}: {row_exc}")
                    continue

            if batch_params:
                cursor.executemany(update_q, batch_params)

            # For seq=0 rows, rc_total_cost is the acquisition cost.
            cursor.execute(
                "update lcc_equipment_tr_data set rc_total_cost = (select acquisition_cost from lcc_ms_equipment_data where assetnum=lcc_equipment_tr_data.assetnum) where assetnum = %s and seq=0;",
                (equipment_id,)
            )

            connection.commit()
        except Exception as e:
            print(f"Error saat update data proyeksi dari database: {e}")
        finally:
            if connection:
                connection.close()
            if production_connection:
                try:
                    production_connection.close()
                except Exception:
                    pass

    def __get_rate_and_max_year(self, equipment_id):
        """Return ``(inflation rate, last actual year)`` for an asset.

        Raises:
            Exception: when either value is missing or the query fails; the
                error is printed and re-raised so the caller stops.
        """
        connection = None
        try:
            connection, _ = get_connection()
            if connection is None:
                print("Database connection failed.")
                return None

            cursor = connection.cursor(cursor_factory=DictCursor)
            query = """
                SELECT
                    (SELECT value_num / 100 FROM lcc_ms_master where name='inflation_rate') AS rate,
                    (SELECT MAX(tahun) FROM lcc_equipment_tr_data WHERE is_actual = 1 AND assetnum = %s) AS max_year
            """
            cursor.execute(query, (equipment_id,))
            result = cursor.fetchone()

            rate = result["rate"]
            max_year = result["max_year"]

            if rate is None:
                raise Exception(
                    "Nilai 'rate' tidak boleh kosong. Periksa tabel 'lcc_ms_master'."
                )
            if max_year is None:
                raise Exception(
                    "Nilai 'max_year' tidak boleh kosong. Periksa tabel 'lcc_equipment_tr_data'."
                )

            return rate, max_year
        except Exception as e:
            print(f"Error saat mendapatkan parameter dari database: {e}")
            raise  # re-raise so the caller aborts
        finally:
            if connection:
                connection.close()

    # Authentication: sign-in and refresh helpers
    async def _sign_in_once(self, username: str, password: str) -> dict:
        """POST the credentials once, store any returned tokens, return the payload."""
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{self.AUTH_APP_URL}/sign-in",
                json={"username": username, "password": password},
                timeout=30.0,
            )
            resp.raise_for_status()
            data = resp.json()
            if isinstance(data, dict) and "data" in data:
                d = data.get("data") or {}
                # Set tokens if present.
                self.access_token = d.get("access_token")
                self.refresh_token = d.get("refresh_token")
            return data

    async def sign_in(self, username: str = DEV_USERNAME, password: str = DEV_PASSWORD) -> Optional[dict]:
        """Sign in to AUTH_APP_URL/sign-in using the provided username/password.

        Stores access_token and refresh_token on the instance when successful
        and returns the parsed response dict. If the first attempt fails with
        an HTTP error, a best-effort sign-out is issued and the sign-in is
        retried exactly once with the same credentials. Returns None when both
        attempts fail.
        """
        try:
            return await self._sign_in_once(username, password)
        except httpx.HTTPError as e:
            print(f"Sign-in failed: {e}")

        # Try to sign out after the failed sign-in.
        try:
            signout_url = f"{self.AUTH_APP_URL}/sign-out"
            async with httpx.AsyncClient() as client:
                await client.get(signout_url, timeout=10.0)
            print("Signed out due to sign-in failure.")
        except Exception as signout_exc:
            print(f"Sign-out failed: {signout_exc}")

        # Single bounded retry; never recurse, so an unreachable auth server
        # cannot cause unbounded re-entry.
        try:
            signin_res = await self._sign_in_once(username, password)
            if self.access_token:
                return signin_res
        except Exception as signin_exc:
            print(f"Sign-in failed after sign-out: {signin_exc}")

        return None

    async def refresh_access_token(self) -> Optional[str]:
        """Refresh the access token using the stored refresh_token via AUTH_APP_URL/refresh-token.

        On success updates self.access_token and returns it. Returns None on failure.
        """
        if not self.refresh_token:
            print("No refresh token available to refresh access token.")
            return None

        try:
            async with httpx.AsyncClient() as client:
                resp = await client.get(
                    f"{self.AUTH_APP_URL}/refresh-token",
                    headers={"Authorization": f"Bearer {self.refresh_token}"},
                    timeout=30.0,
                )
                resp.raise_for_status()
                data = resp.json()
                if isinstance(data, dict) and "data" in data:
                    new_access = data.get("data", {}).get("access_token")
                    if new_access:
                        self.access_token = new_access
                        print("Access token refreshed.")
                        return new_access
                print("Refresh response did not contain a new access token.")
                return None
        except httpx.HTTPError as e:
            print(f"Error refreshing token: {e}")
            return None

    async def _fetch_api_data(self, assetnum: str, year: int) -> dict:
        """GET the number-of-failures payload for ``assetnum`` in ``year``.

        On a 401/403 the access token is refreshed (falling back to a full
        sign-in) and the request is retried once. Returns ``{}`` on any HTTP
        error.
        """
        url = self.RELIABILITY_APP_URL
        endpoint = f"{url}/main/number-of-failures/{assetnum}/{int(year)}/{int(year)}"

        async with httpx.AsyncClient() as client:
            try:
                current_token = self.access_token
                response = await client.get(
                    endpoint,
                    timeout=30.0,
                    headers={"Authorization": f"Bearer {current_token}"} if current_token else {},
                )
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                status = e.response.status_code
                # On 401/403, refresh the access token and retry once.
                if status in (401, 403):
                    print(f"Received {status} from reliability API, attempting to refresh/re-login...")
                    new_access = await self.refresh_access_token()

                    # If refresh failed (e.g. refresh token expired), try full sign-in.
                    if not new_access:
                        print("Refresh failed, attempting full sign-in...")
                        await self.sign_in()
                        new_access = self.access_token

                    if new_access:
                        try:
                            response = await client.get(
                                endpoint,
                                timeout=30.0,
                                headers={"Authorization": f"Bearer {new_access}"},
                            )
                            response.raise_for_status()
                            return response.json()
                        except httpx.HTTPError as e2:
                            print(f"HTTP error occurred after retry: {e2}")
                            return {}
                print(f"HTTP error occurred: {e}")
                return {}
            except httpx.HTTPError as e:
                print(f"HTTP error occurred: {e}")
                return {}

    def __get_man_hour_rate(self, staff_level: str = "Junior"):
        """Return ``salary_per_hour_idr`` for ``staff_level``, or 0.0 when unavailable."""
        connection = None
        try:
            connection, _ = get_connection()
            if connection is None:
                return 0.0

            cursor = connection.cursor()
            query = "SELECT salary_per_hour_idr FROM lcc_manpower_cost WHERE staff_job_level = %s LIMIT 1"
            cursor.execute(query, (staff_level,))
            result = cursor.fetchone()

            if result:
                return float(result[0])
            return 0.0
        except Exception as e:
            print(f"Error getting man hour rate for {staff_level}: {e}")
            return 0.0
        finally:
            if connection:
                connection.close()

    @staticmethod
    def _extract_num_fail(failures_data) -> float:
        """Return ``data[0]["num_fail"]`` from an API payload as float, else 0.0."""
        if not isinstance(failures_data, dict):
            return 0.0
        data_list = failures_data.get("data")
        if not isinstance(data_list, list) or not data_list:
            return 0.0
        first_item = data_list[0]
        if not isinstance(first_item, dict):
            return 0.0
        num_fail = first_item.get("num_fail")
        if num_fail is None:
            return 0.0
        try:
            return float(num_fail)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    @staticmethod
    def _carry_forward_last_value(df, column, n_future):
        """Repeat the value of ``column`` from the latest year ``n_future`` times.

        Falls back to the last non-NA value of the column, then to 0.0.
        """
        last_year = int(df["year"].max())
        row_vals = df[df["year"] == last_year]

        value = None
        if not row_vals.empty:
            val = row_vals[column].iloc[-1]
            if not pd.isna(val):
                try:
                    value = float(val)
                except Exception:
                    # Non-numeric value: treat as 0.0.
                    value = 0.0

        if value is None:
            non_na = df[column].dropna()
            if non_na.empty:
                value = 0.0
            else:
                try:
                    value = float(non_na.iloc[-1])
                except Exception:
                    value = 0.0

        return np.repeat(float(value), n_future)

    async def _predict_column(self, df, column, assetnum, future_years, man_hour_rate):
        """Return the predicted values of one cost column for ``future_years``.

        * corrective-maintenance labor: per-year failure count from the
          reliability API fed into ``rc_labor_cost``;
        * other corrective-maintenance columns: mean of the historical values;
        * every other column: last known value carried forward.
        """
        n_future = len(future_years)
        col_lower = column.lower()

        if "cm" not in col_lower:
            return self._carry_forward_last_value(df, column, n_future)

        if "labor" in col_lower:
            preds_list = []
            for yr in future_years:
                failures_data = await self._fetch_api_data(assetnum, yr)
                interval = self._extract_num_fail(failures_data)
                # interval * labor_time(3) * labor_human(1) * man_hour_rate
                preds_list.append(rc_labor_cost(interval, 3.0, 1.0, man_hour_rate))
            return np.array(preds_list, dtype=float)

        history = df.sort_values("year", ascending=True)[column].dropna()
        if history.empty:
            return np.repeat(0.0, n_future)

        avg = pd.to_numeric(history, errors="coerce").fillna(0).mean()
        avg = 0.0 if pd.isna(avg) else float(avg)
        return np.repeat(avg, n_future)

    async def predict_equipment_data(self, assetnum, token):
        """Predict and persist the future cost rows for one asset.

        Reads the actual rows, projects every cost column from the year after
        the last actual year up to the asset's forecasting target year, upserts
        the projections and recalculates the totals. Any error is printed and
        the run for this asset is abandoned.
        """
        try:
            df = self.__fetch_data_from_db(assetnum)
            if df is None:
                print("Data tidak tersedia untuk prediksi.")
                return

            par_tahun_target = self.__get_param(assetnum)
            if par_tahun_target is None:
                raise ValueError(f"Asset {assetnum} not found in master data (lcc_ms_equipment_data).")

            # Projection years: from the year after the last actual year to the target.
            current_max_year = int(df["year"].max()) if not df.empty and not pd.isna(df["year"].max()) else 2024
            future_years = list(range(current_max_year + 1, int(par_tahun_target) + 1))
            print("future_years", future_years)

            predictions = {"year": future_years}

            # The returned values are not used by the current strategy, but the
            # call raises when the inflation rate or actual data is missing,
            # which aborts the prediction for this asset.
            self.__get_rate_and_max_year(assetnum)
            man_hour_rate = self.__get_man_hour_rate()  # default staff level: "Junior"

            n_future = len(future_years)
            for column in df.columns:
                if column == "year":
                    continue
                try:
                    preds = await self._predict_column(
                        df, column, assetnum, future_years, man_hour_rate
                    )
                except Exception:
                    # Unexpected error for this column: fall back to zeros.
                    preds = np.repeat(0.0, n_future)

                # Predictions are stored as non-negative floats.
                preds = np.abs(np.array(preds, dtype=float))
                predictions[column] = preds.tolist()

            predictions_df = pd.DataFrame(predictions)

            try:
                await self.__insert_predictions_to_db(predictions_df, assetnum, token)
            except Exception as e:
                print(f"Error saat insert data ke database: {e}")

            # Recalculate the total risk cost per year.
            self.__update_data_lcc(assetnum)
        except Exception as e:
            print(f"Program dihentikan: {e}")


RELIABILITY_APP_URL = os.getenv("RELIABILITY_APP_URL", "http://192.168.1.82:8000/reliability")


async def main(RELIABILITY_APP_URL=RELIABILITY_APP_URL, assetnum=None, token=None):
    """Run the prediction for one asset, or for every asset in ``ms_equipment_master``.

    When ``token`` is None the function signs in first; otherwise ``token`` is
    used as the access token. Without ``assetnum`` all assets are processed
    with at most 10 concurrent tasks.
    """
    connection = None
    try:
        prediksi = Prediksi(RELIABILITY_APP_URL)

        if token is None:
            await prediksi.sign_in()
            if not prediksi.access_token:
                print("Failed to obtain access token; aborting.")
                return
        else:
            # Use the provided token as access token.
            prediksi.access_token = token

        # If an assetnum was provided, run only for that assetnum.
        if assetnum:
            print(f"Predicting single assetnum: {assetnum}")
            try:
                await prediksi.predict_equipment_data(assetnum, prediksi.access_token)
            except Exception as e:
                print(f"Error Predicting {assetnum}: {e}")
            print("Selesai.")
            return

        # Otherwise fetch all assetnums from the DB and loop.
        connection, _ = get_connection()
        if connection is None:
            print("Database connection failed.")
            return

        cursor = connection.cursor(cursor_factory=DictCursor)
        query_main = "SELECT DISTINCT(assetnum) FROM ms_equipment_master"
        cursor.execute(query_main)
        results = cursor.fetchall()

        # The list is in memory; release the connection before the async loop.
        connection.close()
        connection = None

        # Concurrency limit to avoid overwhelming the DB/API.
        MAX_CONCURRENT_TASKS = 10
        sem = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
        total_assets = len(results)
        print(f"Starting prediction for {total_assets} assets with {MAX_CONCURRENT_TASKS} concurrent tasks.")

        async def bound_predict(idx, asset):
            async with sem:
                try:
                    if not asset or str(asset).strip() == "":
                        print(f"[{idx}/{total_assets}] Skipping empty assetnum")
                        return
                    print(f"[{idx}/{total_assets}] Predicting assetnum: {asset}")
                    await prediksi.predict_equipment_data(asset, prediksi.access_token)
                except Exception as e:
                    print(f"Error Predicting {asset}: {e}")

        tasks = []
        for idx, row in enumerate(results, start=1):
            current_asset = row.get("assetnum") if hasattr(row, "get") else row[0]
            tasks.append(bound_predict(idx, current_asset))

        await asyncio.gather(*tasks)
        print("Selesai.")
    except Exception as e:
        print(f"Error in main: {e}")
        return
    finally:
        if connection:
            connection.close()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Run equipment prediction")
    parser.add_argument(
        "--assetnum", "-a",
        type=str,
        default=None,
        help="Specific assetnum to predict. If not provided, predicts all assets."
    )
    args = parser.parse_args()

    asyncio.run(main(assetnum=args.assetnum))