update script and route get equipment tr data

main
MrWaradana 2 months ago
parent 5410a50d1a
commit 4292212df4

@ -64,8 +64,11 @@ class EquipmentTransactionRecords(Base, DefaultMixin, IdentityMixin):
raw_pm_material_cost = Column(Float, nullable=False)
raw_pm_labor_time = Column(Float, nullable=False)
raw_pm_labor_human = Column(Float, nullable=False)
raw_predictive_interval = Column(Float, nullable=False)
raw_predictive_material_cost = Column(Float, nullable=False)
raw_predictive_labor_time = Column(Float, nullable=False)
raw_predictive_labor_human = Column(Float, nullable=False)
raw_oh_interval = Column(Float, nullable=False)
raw_oh_material_cost = Column(Float, nullable=False)
raw_oh_labor_time = Column(Float, nullable=False)
raw_oh_labor_human = Column(Float, nullable=False)

@ -1,5 +1,7 @@
from typing import List, Optional
from fastapi import APIRouter, HTTPException, status, Query
from fastapi.responses import StreamingResponse
import json
from .model import Equipment, EquipmentTransactionRecords
from .schema import (
@ -22,6 +24,8 @@ from src.equipment.service import (
generate_all_transaction,
get_top_10_economic_life,
)
from src.modules.equipment.Prediksi import main as prediksi_main
from src.modules.equipment.Eac import Eac
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.database.core import DbSession
@ -51,6 +55,52 @@ async def get_equipments(
message="Data retrieved successfully",
)
@router.get("/simulate/{assetnum}")
async def simulate_equipment(db_session: DbSession, assetnum: str):
"""Stream progress events while running the simulation (prediksi + EAC).
This endpoint returns Server-Sent Events (SSE). Each event's `data` is
a JSON object with at least `status` and `step` fields. Frontend can
listen and parse events to show progress to users.
"""
async def event_generator():
# notify start of prediksi
yield f"data: {json.dumps({'status':'started','step':'prediksi','message':'Starting prediksi'})}\n\n"
try:
prediksi = await prediksi_main(assetnum=assetnum)
except Exception as exc:
# send error event and stop
yield f"data: {json.dumps({'status':'error','step':'prediksi','message':str(exc)})}\n\n"
return
# prediksi finished
yield f"data: {json.dumps({'status':'completed','step':'prediksi','message':'Prediksi completed','result_present': bool(prediksi)})}\n\n"
# start eac
yield f"data: {json.dumps({'status':'started','step':'eac','message':'Starting EAC calculation'})}\n\n"
try:
eac = Eac()
hasil_eac = await eac.hitung_eac_equipment(assetnum=assetnum)
except Exception as exc:
yield f"data: {json.dumps({'status':'error','step':'eac','message':str(exc)})}\n\n"
return
# eac finished
yield f"data: {json.dumps({'status':'completed','step':'eac','message':'EAC calculation completed','result_present': bool(hasil_eac)})}\n\n"
# if both are empty, emit a not-found style event
if not prediksi and not hasil_eac:
yield f"data: {json.dumps({'status':'not_found','message':'No data found for this assetnum'})}\n\n"
return
# final result
combined = {'prediksi': prediksi, 'hasil_eac': hasil_eac}
yield f"data: {json.dumps({'status':'done','message':f'Simulation for {assetnum} completed successfully','data':combined})}\n\n"
return StreamingResponse(event_generator(), media_type='text/event-stream')
@router.get(
"/top-10-replacement-priorities",

@ -41,8 +41,11 @@ class MasterBase(DefaultBase):
raw_pm_material_cost: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)
raw_pm_labor_time: Optional[float] = Field(None, nullable=True)
raw_pm_labor_human: Optional[float] = Field(None, nullable=True)
raw_predictive_interval: Optional[float] = Field(None, nullable=True)
raw_predictive_material_cost: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)
raw_predictive_labor_time: Optional[float] = Field(None, nullable=True)
raw_predictive_labor_human: Optional[float] = Field(None, nullable=True)
raw_oh_interval: Optional[float] = Field(None, nullable=True)
raw_oh_material_cost: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)
raw_oh_labor_time: Optional[float] = Field(None, nullable=True)
raw_oh_labor_human: Optional[float] = Field(None, nullable=True)
@ -62,6 +65,7 @@ class MasterBase(DefaultBase):
rc_lost_cost: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)
rc_operation_cost: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)
rc_maintenance_cost: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)
asset_criticality: Optional[float] = Field(None, nullable=True)
rc_total_cost: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)
eac_npv: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)
eac_annual_mnt_cost: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)

@ -3,7 +3,8 @@ from sqlalchemy import Select, Delete, Float, func, cast, String
from sqlalchemy.orm import selectinload
from src.database.service import search_filter_sort_paginate
from .model import Equipment, EquipmentTransactionRecords
from src.equipment.model import Equipment, EquipmentTransactionRecords
from src.yeardata.model import Yeardata
from ..equipment_master.model import EquipmentMaster
from .schema import EquipmentCreate, EquipmentUpdate, MasterBase
from typing import Optional
@ -15,6 +16,7 @@ import httpx
from src.modules.equipment.run import main
import datetime
import math
async def get_master_by_assetnum(
@ -42,6 +44,45 @@ async def get_master_by_assetnum(
)
master_result = await db_session.execute(master_query)
records = master_result.scalars().all()
# Get all yeardata
yeardata_query = Select(Yeardata)
yeardata_result = await db_session.execute(yeardata_query)
yeardata_records = yeardata_result.scalars().all()
yeardata_dict = {y.year: y for y in yeardata_records}
# Get ens value from year data and calculate asset criticality per year
for record in records:
year = record.tahun
if year in yeardata_dict:
asset_crit_ens_energy_not_served = yeardata_dict[year].asset_crit_ens_energy_not_served
asset_crit_bpp_system = yeardata_dict[year].asset_crit_bpp_system
asset_crit_bpp_pembangkit = yeardata_dict[year].asset_crit_bpp_pembangkit
asset_crit_marginal_cost = yeardata_dict[year].asset_crit_marginal_cost
asset_crit_dmn_daya_mampu_netto = yeardata_dict[year].asset_crit_dmn_daya_mampu_netto
else:
asset_crit_ens_energy_not_served = 0 # Default value if year data not found
# compute asset criticality per record/year and attach to each record
for record in records:
# use year-specific yeardata values already loaded above
ens = float(asset_crit_ens_energy_not_served or 0)
bpp_system = float(asset_crit_bpp_system or 0)
bpp_pembangkit = float(asset_crit_bpp_pembangkit or 0)
marginal_cost = float(asset_crit_marginal_cost or 0)
dmn = float(asset_crit_dmn_daya_mampu_netto or 0)
extra_fuel_cost = marginal_cost - bpp_pembangkit
asset_criticality = ens * (0.07 * bpp_system) + (dmn - ens * extra_fuel_cost)
# if NaN or None, return 0
if asset_criticality is None or (isinstance(asset_criticality, float) and math.isnan(asset_criticality)):
asset_criticality = 0.0
setattr(record, "asset_criticality", asset_criticality)
# Get the last actual year
last_actual_year_query = (

@ -67,12 +67,23 @@ class Eac:
for idx, row in enumerate(data_actual):
cumulative_values.append(row["rc_total_cost"])
# Menghitung NPV menggunakan rumus diskonto
# Rumus NPV: NPV = Σ [Ct / (1 + r)^t]
# dimana Ct = cash flow pada periode t, r = inflation_rate, t = periode
final_value = sum(
value / ((1 + inflation_rate) ** (i + 1))
for i, value in enumerate(cumulative_values)
)
# Menghitung PMT
# Rumus PMT: PMT = PV * [r(1 + r)^n] / [(1 + r)^n 1]
# dimana PV = final_value, r = inflation_rate, n = row["seq"]
pmt_value = -npf.pmt(inflation_rate, row["seq"], final_value)
# Menghitung PMT biaya akuisisi
# Rumus PMT: PMT = PV * [r(1 + r)^n] / [(1 + r)^n 1]
# dimana PV = rc_total_cost_0, r = disc_rate, n = row["seq"]
# rc_total_cost_0 adalah biaya akuisisi awal (seq = 0)
# disc_rate adalah discount rate dari database
# row["seq"] adalah periode ke-n
pmt_aq_cost = -npf.pmt(disc_rate, row["seq"], rc_total_cost_0)
eac = pmt_value + pmt_aq_cost
@ -125,19 +136,54 @@ class Eac:
cumulative_values = []
# Menghitung NPV dan PMT secara bertahap untuk data proyeksi
# NOTE: sebelumnya kode mencoba menggeser PV proyeksi menggunakan npf.pv + sign flips,
# yang dapat menghasilkan nilai pemeliharaan yang sangat besar (meledak). Sebaiknya hitung
# nilai diskonto dari biaya proyeksi menggunakan offset waktu yang benar (last_seq) dan
# tambahkan ke last_npv. Kemudian hitung pembayaran tahunan tingkat (PMT) selama sisa
# jumlah periode (remaining_periods). Ini menjaga nilai pemeliharaan tahunan proyeksi tetap konsisten dan mencegah lonjakan eksponensial.
for idx, row in enumerate(data_proyeksi):
# print(row)
# Menyimpan nilai kumulatif hingga baris ke-n
cumulative_values.append(row["rc_total_cost"])
npv_value = sum(
value / ((1 + inflation_rate) ** (i + 1))
# Nilai proyeksi yang didiskontokan menggunakan offset eksponen dari urutan aktual terakhir
# sehingga offset tahun berlanjut dari aktual yang sudah diproses.
# Rumus NPV: NPV = Σ [Ct / (1 + r)^t]
# dimana Ct = cash flow pada periode t, r = inflation_rate, t = periode
# value adalah rc_total_cost pada periode t
# 1 + inflation_rate ** (last_seq + i + 1) adalah perhitungan diskonto dengan offset waktu
discounted_proj = sum(
(float(value) / ((1 + inflation_rate) ** (last_seq + i + 1)))
for i, value in enumerate(cumulative_values)
)
pv_value = npf.pv(inflation_rate, last_seq, 0, npv_value)
final_value = -pv_value + last_npv
# Total NPV pada titik proyeksi ini = NPV aktual terakhir + biaya proyeksi yang didiskontokan
final_value = float(last_npv) + float(discounted_proj)
# Gunakan seq penuh (jumlah periode dari akuisisi) untuk menghitung pembayaran tahunan tingkat
# pemeliharaan. Menggunakan hanya selisih dari seq aktual terakhir
# (sisa_periode) mengamortisasi seluruh nilai sekarang selama
# sejumlah periode yang sangat kecil untuk proyeksi pertama dan menghasilkan lonjakan.
# Menggunakan row["seq"] menjaga periode amortisasi konsisten dengan perhitungan lain
# dan mencegah lonjakan setelah tahun berjalan.
# amortisasi adalah proses pencatatan biaya aset selama masa manfaatnya.
periods = int(row["seq"]) if int(row.get("seq", 0)) > 0 else 1
# Menghitung PMT
pmt_value = -npf.pmt(inflation_rate, row["seq"], final_value)
pmt_aq_cost = -npf.pmt(disc_rate, row["seq"], rc_total_cost_0)
eac = pmt_value + pmt_aq_cost
# Rumus PMT: PMT = PV * [r(1 + r)^n] / [(1 + r)^n 1]
# dimana PV = final_value, r = inflation_rate, n = row["seq"]
# periods adalah jumlah periode
# final_value adalah PV pada titik proyeksi periods
pmt_value = -float(npf.pmt(inflation_rate, periods, final_value))
# menghitung PMT biaya akuisisi
# Rumus PMT: PMT = PV * [r(1 + r)^n] / [(1 + r)^n 1]
# dimana PV = rc_total_cost_0, r = disc_rate, n = row["seq"]
# rc_total_cost_0 adalah biaya akuisisi awal (seq = 0)
# disc_rate adalah discount rate dari database
# row["seq"] adalah periode ke-n
pmt_aq_cost = -float(npf.pmt(disc_rate, row["seq"], rc_total_cost_0))
eac = float(pmt_value) + float(pmt_aq_cost)
npv_results.append(
{
@ -190,7 +236,7 @@ class Eac:
# ======================================================================================================================================================
def hitung_eac_equipment(self, p_equipment_id):
def hitung_eac_equipment(self, assetnum):
try:
# Mendapatkan koneksi dari config.py
connections = get_connection()
@ -203,9 +249,31 @@ class Eac:
cursor = connection.cursor(cursor_factory=DictCursor)
rslt = self.__calculate_npv_with_db_inflation_rate(p_equipment_id)
# print(rslt)
lowest_eac_record = min(rslt, key=lambda x: x["eac"])
rslt = self.__calculate_npv_with_db_inflation_rate(assetnum)
# choose the smallest positive EAC if any exist; otherwise choose a record with EAC == 0 if present;
# as a final fallback choose the overall minimum EAC record
if not rslt:
lowest_eac_record = {
"seq": None,
"year": None,
"eac": 0.0,
"npv": 0.0,
"pmt": 0.0,
"pmt_aq_cost": 0.0,
"is_actual": 0,
}
else:
positives = [r for r in rslt if float(r.get("eac", 0)) > 0]
if positives:
lowest_eac_record = min(positives, key=lambda x: float(x["eac"]))
else:
zeros = [r for r in rslt if float(r.get("eac", 0)) == 0]
if zeros:
# pick one of the zero-eac records (choose smallest npv for determinism)
lowest_eac_record = min(zeros, key=lambda x: float(x.get("npv", 0)))
else:
lowest_eac_record = min(rslt, key=lambda x: float(x.get("eac", 0)))
# print(json.dumps(lowest_eac_record))
# Update lcc_equipment_tr_data
update_query = """
@ -223,7 +291,7 @@ class Eac:
float(lowest_eac_record["pmt"]),
float(lowest_eac_record["pmt_aq_cost"]),
lowest_eac_record["is_actual"],
p_equipment_id,
assetnum,
),
)
@ -231,11 +299,98 @@ class Eac:
cursor.close()
connection.close()
return lowest_eac_record
except Exception as e:
print("Terjadi kesalahan saat memproses semua equipment:", str(e))
def main():
"""
Process all equipment EAC calculations. Returns list of processed asset numbers.
Raises RuntimeError if database connection cannot be established.
"""
connections = get_connection()
connection = connections[0] if isinstance(connections, tuple) else connections
if connection is None:
raise RuntimeError("Database connection failed.")
cursor = connection.cursor(cursor_factory=DictCursor)
processed = []
try:
query_main = "SELECT DISTINCT(assetnum) FROM ms_equipment_master"
cursor.execute(query_main)
results = cursor.fetchall()
eac = Eac()
for row in results:
try:
assetnum = row["assetnum"]
except Exception:
assetnum = row[0] if len(row) > 0 else None
if assetnum is None:
print("Skipping None assetnum")
continue
print(f"Processing asset: {assetnum}")
eac.hitung_eac_equipment(assetnum)
processed.append(assetnum)
print("EAC calculation finished for all equipment.")
return processed
except Exception as e:
print("Terjadi kesalahan saat memproses semua equipment:", str(e))
raise
finally:
try:
cursor.close()
except Exception:
pass
try:
connection.close()
except Exception:
pass
if __name__ == "__main__":
eac = Eac()
eac.hitung_eac_equipment("A22277")
print("EAC calculation finished.")
try:
connections = get_connection()
connection = connections[0] if isinstance(connections, tuple) else connections
if connection is None:
print("Database connection failed.")
sys.exit(1)
cursor = connection.cursor(cursor_factory=DictCursor)
query_main = "SELECT DISTINCT(assetnum) FROM ms_equipment_master"
cursor.execute(query_main)
results = cursor.fetchall()
eac = Eac()
for row in results:
try:
assetnum = row["assetnum"]
except Exception:
assetnum = row[0] if len(row) > 0 else None
if assetnum is None:
print("Skipping None assetnum")
continue
print(f"Processing asset: {assetnum}")
eac.hitung_eac_equipment(assetnum)
print("EAC calculation finished for all equipment.")
except Exception as e:
print("Terjadi kesalahan saat memproses semua equipment:", str(e))
finally:
try:
cursor.close()
except Exception:
pass
try:
connection.close()
except Exception:
pass

@ -28,6 +28,11 @@ class Prediksi:
self.RELIABILITY_APP_URL = RELIABILITY_APP_URL or os.getenv(
"RELIABILITY_APP_URL", "http://192.168.1.82:8000/reliability"
)
# Base URL for auth endpoints (sign-in, refresh-token)
self.AUTH_APP_URL = os.getenv("AUTH_APP_URL", "http://192.168.1.82:8000")
# tokens will be stored here after sign-in/refresh
self.access_token = None
self.refresh_token = None
# Fungsi untuk mengambil data dari database
def __get_param(self, equipment_id):
@ -199,22 +204,44 @@ class Prediksi:
)
"""
# Fetch data from external API
# If a token was provided, store locally so fetch_api_data can use/refresh it
if token:
self.access_token = token
# Fetch data from external API (uses instance access_token and will try refresh on 403)
async def fetch_api_data(assetnum: str, year: int) -> dict:
url = self.RELIABILITY_APP_URL
# print(f"Using URL: {url}") # Add this for debugging
endpoint = f"{url}/main/number-of-failures/{assetnum}/{int(year)}/{int(year)}"
async with httpx.AsyncClient() as client:
# print(
# f"{url}/main/number-of-failures/{assetnum}/{int(year)}/{int(year)}"
# )
try:
current_token = getattr(self, "access_token", None)
response = await client.get(
f"{url}/main/number-of-failures/{assetnum}/{int(year)}/{int(year)}",
endpoint,
timeout=30.0,
headers={"Authorization": f"Bearer {token}"},
headers={"Authorization": f"Bearer {current_token}"} if current_token else {},
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
status = getattr(e.response, "status_code", None)
# If we get a 403, try to refresh the access token and retry once
if status == 403:
print("Received 403 from reliability API, attempting to refresh access token...")
new_access = await self.refresh_access_token()
if new_access:
try:
response = await client.get(
endpoint,
timeout=30.0,
headers={"Authorization": f"Bearer {new_access}"},
)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e2:
print(f"HTTP error occurred after refresh: {e2}")
return {}
print(f"HTTP error occurred: {e}")
return {}
except httpx.HTTPError as e:
print(f"HTTP error occurred: {e}")
return {}
@ -224,22 +251,46 @@ class Prediksi:
records_to_insert = []
for _, row in data.iterrows():
max_seq = max_seq + 1
# Update values from API
# (token already stored before defining fetch_api_data)
# maintain previous cm_interval between iterations using attribute on fetch_api_data
if not hasattr(fetch_api_data, "prev_cm"):
fetch_api_data.prev_cm = None
# Update values from API (current year)
api_data = await fetch_api_data(equipment_id, row["year"])
if api_data and "data" in api_data and isinstance(api_data["data"], list) and len(api_data["data"]) > 0:
# Get current num_fail (ensure numeric)
try:
cm_interval_prediction = float(api_data["data"][0].get("num_fail", row.get("cm_interval", 1)))
cur_cm = float(api_data["data"][0].get("num_fail", row.get("cm_interval", 1)))
except Exception:
cm_interval_prediction = float(row.get("cm_interval", 1)) if not pd.isna(row.get("cm_interval", None)) else 1
cur_cm = float(row.get("cm_interval", 1)) if not pd.isna(row.get("cm_interval", None)) else 1.0
else:
# Fallback: ensure numeric scalar, not a tuple
try:
val = float(row.get("cm_interval", 1))
cm_interval_prediction = val if val >= 1 else 1.0
cur_cm = val if val >= 1 else 1.0
except Exception:
cur_cm = 1.0
# Determine previous cm_interval: prefer stored prev_cm, otherwise try API for previous year, else fallback to cur_cm
if fetch_api_data.prev_cm is not None:
prev_cm = float(fetch_api_data.prev_cm)
else:
try:
api_prev = await fetch_api_data(equipment_id, int(row["year"]) - 1)
if api_prev and "data" in api_prev and isinstance(api_prev["data"], list) and len(api_prev["data"]) > 0:
prev_cm = float(api_prev["data"][0].get("num_fail", cur_cm))
else:
# attempt to use any available previous value from the row if present, otherwise fallback to current
prev_cm = float(row.get("cm_interval", cur_cm)) if not pd.isna(row.get("cm_interval", None)) else cur_cm
except Exception:
cm_interval_prediction = 1.0
prev_cm = cur_cm
# compute difference: current year interval minus previous year interval
try:
cm_interval_diff = float(cur_cm) - float(prev_cm)
except Exception:
cm_interval_diff = 0.0
# append record using the difference for raw_cm_interval
records_to_insert.append(
(
str(uuid4()),
@ -247,7 +298,7 @@ class Prediksi:
float(row["pm_interval"]) if not pd.isna(row.get("pm_interval", None)) else 0.0,
float(row["year"]) if not pd.isna(row.get("year", None)) else 0.0,
equipment_id,
cm_interval_prediction,
cm_interval_diff,
float(row["cm_cost"]) if not pd.isna(row.get("cm_cost", None)) else 0.0,
float(row["cm_labor_time"]) if not pd.isna(row.get("cm_labor_time", None)) else 0.0,
float(row["cm_labor_human"]) if not pd.isna(row.get("cm_labor_human", None)) else 0.0,
@ -267,6 +318,9 @@ class Prediksi:
)
)
# store current cm for next iteration
fetch_api_data.prev_cm = cur_cm
# Eksekusi batch insert
cursor.executemany(insert_query, records_to_insert)
connection.commit()
@ -386,6 +440,60 @@ class Prediksi:
if connection:
connection.close()
# Authentication: sign-in and refresh helpers
async def sign_in(self, username: str = "user14", password: str = "password") -> dict:
"""Sign in to AUTH_APP_URL/sign-in using provided username/password.
Stores access_token and refresh_token on the instance when successful and returns the parsed response dict.
"""
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{self.AUTH_APP_URL}/sign-in",
json={"username": username, "password": password},
timeout=30.0,
)
resp.raise_for_status()
data = resp.json()
if isinstance(data, dict) and "data" in data:
d = data.get("data") or {}
# set tokens if present
self.access_token = d.get("access_token")
self.refresh_token = d.get("refresh_token")
return data
except httpx.HTTPError as e:
print(f"Sign-in failed: {e}")
return None
async def refresh_access_token(self) -> str:
"""Refresh the access token using the stored refresh_token via AUTH_APP_URL/refresh-token.
On success updates self.access_token and returns it. Returns None on failure.
"""
if not getattr(self, "refresh_token", None):
print("No refresh token available to refresh access token.")
return None
try:
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{self.AUTH_APP_URL}/refresh-token",
headers={"Authorization": f"Bearer {self.refresh_token}"},
timeout=30.0,
)
resp.raise_for_status()
data = resp.json()
if isinstance(data, dict) and "data" in data:
new_access = data.get("data", {}).get("access_token")
if new_access:
self.access_token = new_access
print("Access token refreshed.")
return new_access
print("Refresh response did not contain a new access token.")
return None
except httpx.HTTPError as e:
print(f"Error refreshing token: {e}")
return None
# ======================================================================================================================================================
async def predict_equipment_data(self, assetnum, token):
@ -445,26 +553,180 @@ class Prediksi:
# Prediksi untuk setiap kolom
for column in df.columns:
if column != "year":
if "cost" in column.lower():
# Prediksi Future Value
nper = max_year - df["year"].max()
pv = -df[column].iloc[-1]
predictions[column] = self.__future_value_predict(
rate, nper, pmt, pv, future_years
)
elif df[column].nunique() < 5:
predictions[column] = exponential_smoothing_predict(
column, future_years
)
elif df[column].isnull().sum() > 0:
predictions[column] = decision_tree_predict(
column, future_years
if column == "year":
continue
n_future = len(future_years)
col_lower = column.lower()
try:
# Case untuk kolom yang terkait dengan corrective maintenance (cm)
if "cm" in col_lower:
# Tentukan jumlah baris recent yang dianggap actual jika kolom is_actual ada
if "is_actual" in df.columns:
recent_df = df[df["is_actual"] == 1]
recent_n = recent_df.shape[0]
else:
recent_df = df
recent_n = df.shape[0]
recent_n = max(1, recent_n)
recent_vals = (
recent_df.sort_values("year", ascending=False)
.head(recent_n)[column]
.dropna()
)
# Fallback ke semua nilai non-na jika tidak ada recent_vals
if recent_vals.empty:
recent_vals = df[column].dropna()
# Jika masih kosong, pakai default (interval minimal 1, lainnya 0)
if recent_vals.empty:
avg = 0.0
else:
# Pastikan numeric; jika gagal, pakai mean dari yang bisa dikonversi
try:
avg = float(np.nanmean(recent_vals.astype(float)))
except Exception:
# jika conversion gagal gunakan mean pandas (objek mungkin numeric-like)
avg = float(recent_vals.mean())
if "interval" in col_lower:
avg = max(0.0, avg)
preds = np.repeat(float(avg), n_future)
else:
predictions[column] = linear_regression_predict(
column, future_years
)
# Untuk kolom non-cm, gunakan nilai dari last actual year bila ada,
# jika tidak ada gunakan last available non-NA value, jika tidak ada pakai 0.0
if "is_actual" in df.columns and not df[df["is_actual"] == 1].empty:
last_actual_year_series = df[df["is_actual"] == 1]["year"]
last_actual_year = (
int(last_actual_year_series.max())
if not last_actual_year_series.isna().all()
else int(df["year"].max())
)
else:
last_actual_year = int(df["year"].max())
row_vals = df[df["year"] == last_actual_year]
value = None
if not row_vals.empty:
val = row_vals[column].iloc[-1]
if not pd.isna(val):
try:
value = float(val)
except Exception:
# jika bukan numeric, set 0.0
value = 0.0
if value is None:
non_na = df[column].dropna()
if not non_na.empty:
try:
value = float(non_na.iloc[-1])
except Exception:
value = 0.0
else:
value = 0.0
preds = np.repeat(float(value), n_future)
except Exception:
# Jika terjadi error unexpected, fallback ke nol
preds = np.repeat(0.0, n_future)
# Pastikan semua prediksi bernilai non-negatif float dan berbentuk list sesuai panjang future_years
preds = np.abs(np.array(preds, dtype=float))
predictions[column] = preds.tolist()
# if "cost" in column.lower():
# # Prediksi Future Value
# nper = max_year - df["year"].max()
# pv = -df[column].iloc[-1]
# predictions[column] = self.__future_value_predict(
# rate, nper, pmt, pv, future_years
# )
# elif df[column].nunique() < 5:
# predictions[column] = exponential_smoothing_predict(
# column, future_years
# )
# elif df[column].isnull().sum() > 0:
# predictions[column] = decision_tree_predict(
# column, future_years
# )
# else:
# predictions[column] = linear_regression_predict(
# column, future_years
# )
# for column in df.columns:
# if column != "year":
# if "cost" in column.lower():
# # Prediksi Future Value
# # ensure nper is an integer and non-negative
# try:
# nper = int(max_year - df["year"].max())
# except Exception:
# nper = 0
# if nper < 0:
# nper = 0
# # safe conversion of last observed value to numeric present value (pv)
# try:
# last_val = df[column].iloc[-1]
# pv = -float(last_val) if not pd.isna(last_val) else 0.0
# except Exception:
# pv = 0.0
# # compute future values and ensure preds is a numpy float array
# fv_list = self.__future_value_predict(
# rate, nper, pmt, pv, future_years
# )
# preds = np.array(fv_list, dtype=float)
# predictions[column] = preds
# elif df[column].nunique() < 5:
# preds = exponential_smoothing_predict(column, future_years)
# elif df[column].isnull().sum() > 0:
# preds = decision_tree_predict(column, future_years)
# else:
# # Produce sideways / fluctuating predictions around recent level (deterministic)
# series = df[column].dropna().values
# if len(series) == 0:
# base = 0.0
# else:
# base = float(np.mean(series[-3:])) if len(series) >= 3 else float(series[-1])
# # amplitude based on historical std, fallback to a small fraction of base
# hist_std = float(np.std(series)) if len(series) > 1 else max(abs(base) * 0.01, 0.0)
# amp = max(hist_std, abs(base) * 0.01)
# t = np.arange(len(future_years))
# preds = base + amp * np.sin(2 * np.pi * t / max(len(future_years), 1))
# # avoid negative predictions for inherently non-negative series
# preds = np.where(preds < 0, 0, preds)
# # normalize preds to numpy float array
# preds = np.array(preds, dtype=float)
# # Columns containing "human" should be rounded to one decimal and clamped 0.0-3.0
# if "human" in column.lower():
# # humans must be whole numbers (no decimals) and capped between 0 and 3
# preds = np.nan_to_num(preds, nan=0.0)
# preds = np.rint(preds) # round to nearest integer
# preds = np.clip(preds, 0, 3).astype(int)
# # Columns containing "labor_time" should be reasonable yearly hours.
# # If predictions are unrealistically large, scale them down proportionally to a sane max (e.g., 2000 hours/year),
# # then round to one decimal and ensure non-negative.
# if "labor_time" in column.lower():
# max_yearly_hours = 2000.0
# current_max = np.nanmax(preds) if preds.size > 0 else 0.0
# if current_max > max_yearly_hours and current_max > 0:
# scale = max_yearly_hours / current_max
# preds = preds * scale
# preds = np.clip(preds, 0.0, max_yearly_hours)
# preds = np.round(preds, 1)
# predictions[column] = preds
# Konversi hasil ke DataFrame
predictions_df = pd.DataFrame(predictions)
@ -489,8 +751,28 @@ class Prediksi:
RELIABILITY_APP_URL = os.getenv("RELIABILITY_APP_URL", "http://192.168.1.82:8000/reliability")
async def main(RELIABILITY_APP_URL=RELIABILITY_APP_URL):
async def main(RELIABILITY_APP_URL=RELIABILITY_APP_URL, assetnum=None):
connection = None
try:
prediksi = Prediksi(RELIABILITY_APP_URL)
# Sign in to obtain access_token/refresh_token before processing
signin_res = await prediksi.sign_in()
if not getattr(prediksi, "access_token", None):
print("Failed to obtain access token; aborting.")
return
# If an assetnum was provided, run only for that assetnum
if assetnum:
print(f"Processing single assetnum: {assetnum}")
try:
await prediksi.predict_equipment_data(assetnum, prediksi.access_token)
except Exception as e:
print(f"Error processing {assetnum}: {e}")
print("Selesai.")
return
# Otherwise fetch all assetnums from DB and loop
connections = get_connection()
connection = connections[0] if isinstance(connections, tuple) else connections
if connection is None:
@ -502,28 +784,24 @@ async def main(RELIABILITY_APP_URL=RELIABILITY_APP_URL):
cursor.execute(query_main)
results = cursor.fetchall()
prediksi = Prediksi(RELIABILITY_APP_URL)
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTc2MjQxODk5My4xNzI4NTYsImp0aSI6ImJ1OU0xQVlLSTZENTd2cC1OaDgtUlEiLCJ0eXBlIjoiYWNjZXNzIiwic3ViIjoiMzg1NzJhOTItZjE2Yy00MWIyLThjNmYtYWZhNTcyMzhhNWU3IiwibmJmIjoxNzYyNDE4OTkzLCJjc3JmIjoiNjY5NzVjNDEtNTg0ZS00OGFkLWJjMmItMDNlZDEyZDM2ZDczIiwiZXhwIjoxNzYyNDI2MTkzLCJub25jZSI6ImYzMThkNDVkNmYzZWRjMzNiN2Q0MmE0MGRkNDJkNDRhIn0.elDnyaoeJ48oOIUdMRZjt7gGICmK-2Awg6Rbl_BZ1PQ"
for idx, row in enumerate(results, start=1):
assetnum = row.get("assetnum") if hasattr(row, "get") else row[0]
if not assetnum or str(assetnum).strip() == "":
current_asset = row.get("assetnum") if hasattr(row, "get") else row[0]
if not current_asset or str(current_asset).strip() == "":
print(f"[{idx}/{len(results)}] Skipping empty assetnum")
continue
print(f"[{idx}/{len(results)}] Processing assetnum: {assetnum}")
print(f"[{idx}/{len(results)}] Processing assetnum: {current_asset}")
try:
await prediksi.predict_equipment_data(assetnum, token)
await prediksi.predict_equipment_data(current_asset, prediksi.access_token)
except Exception as e:
print(f"Error processing {assetnum}: {e}")
print(f"Error processing {current_asset}: {e}")
print("Selesai.")
except Exception as e:
print(f"Error getting database connection: {e}")
return
except Exception as e:
print(f"Error getting database connection: {e}")
print(f"Error in main: {e}")
return
finally:
if connection:
connection.close()
if __name__ == "__main__":

@ -1,3 +1,4 @@
import asyncio
import psycopg2
from psycopg2.extras import DictCursor
from uuid import uuid4
@ -500,11 +501,11 @@ async def query_data():
year, # tahun
seq, # seq
1, # is_actual
1, # raw_cm_interval (minimal 1 karena minimal 1x OH)
0, # raw_cm_interval (minimal 1 karena minimal 1x OH)
0, # raw_cm_material_cost
0, # raw_cm_labor_time
0, # raw_cm_labor_human
1, # pm interval set default 1
0, # raw_pm_interval set default 1
0, # raw_pm_material_cost
0, # raw_pm_labor_time
0, # raw_pm_labor_human
@ -707,3 +708,11 @@ async def query_data():
except Exception:
pass
# print("========Process finished and connection closed.========")
if __name__ == "__main__":
async def main():
# await insert_ms_equipment_data()
# await query_data()
print("insert_actual_data.py is called")
asyncio.run(main())

@ -5,12 +5,12 @@ import time
try:
from .insert_actual_data import query_data, insert_lcca_maximo_corrective_data, insert_ms_equipment_data
from .Prediksi import Prediksi, main as predict_run
from .Eac import Eac
from .Eac import Eac, main as eac_run
except ImportError:
# fallback when there's no parent package (e.g., python run.py)
from insert_actual_data import query_data, insert_lcca_maximo_corrective_data, insert_ms_equipment_data
from Prediksi import Prediksi, main as predict_run
from Eac import Eac
from Eac import Eac, main as eac_run
# Panggil fungsi
@ -29,12 +29,21 @@ async def main():
print(f"Error in predict_equipment_data: {str(e)}")
return
# try:
# eac = Eac()
# eac.hitung_eac_equipment(assetnum)
# except Exception as e:
# print(f"Error in hitung_eac_equipment: {str(e)}")
# return
try:
# eac = Eac()
# eac.hitung_eac_equipment(assetnum)
result = await eac_run()
if asyncio.iscoroutine(result):
result = await result
# if the function returned a list of objects, optionally log or handle it
if isinstance(result, (list, tuple)):
print(f"EAC run returned {len(result)} items.")
else:
print("EAC run completed.")
except Exception as e:
print(f"Error in hitung_eac_equipment: {str(e)}")
return
end_time = time.time()
execution_time = end_time - start_time
@ -43,27 +52,17 @@ async def main():
hours = int(execution_time // 3600)
minutes = int((execution_time % 3600) // 60)
seconds = execution_time % 60
message = f"Insert & Prediction calculation finished in {hours}h {minutes}m {seconds:.2f}s."
message = f"Script calculation finished in {hours}h {minutes}m {seconds:.2f}s."
elif execution_time >= 60:
minutes = int(execution_time // 60)
seconds = execution_time % 60
message = f"Insert & Prediction calculation finished in {minutes}m {seconds:.2f}s."
message = f"Script calculation finished in {minutes}m {seconds:.2f}s."
else:
message = f"Insert & Prediction calculation finished in {execution_time:.2f} seconds."
message = f"Script calculation finished in {execution_time:.2f} seconds."
print(message)
return message
# print(f"EAC calculation finished in {execution_time:.2f} seconds.")
# return f"EAC calculation finished in {execution_time:.2f} seconds."
# if __name__ == "__main__":
# asyncio.run(
# main(
# "A22277",
# "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTczOTUxODc4Ni4yOTM5ODUsImp0aSI6Ilo5clRUOFhGa3RweFZUQlBmNGxvRmciLCJ0eXBlIjoiYWNjZXNzIiwic3ViIjoiNWUxNmY4YTgtMWEwMy00MTVjLWIwZjItMTVmZjczOWY1OGE4IiwibmJmIjoxNzM5NTE4Nzg2LCJjc3JmIjoiZWI0MjAzOTMtYTg1ZS00NDJjLWIyMjItZTU5MGU5MGVkYjkyIiwiZXhwIjoxNzM5NjA1MTg2LCJub25jZSI6IjVkZDdhOGYyMWIzZWUxZDZmYmI1YThhMDBlMmYyYjczIn0.3Jv943cU5FuxJ9K92JmVoOtTBqexF4Dke8TrrC4l0Uk",
# )
# )
if __name__ == "__main__":
asyncio.run(
main()

Loading…
Cancel
Save