feat: Standardize equipment API responses to `EquipmentDataMaster` and optimize LCC calculations by pre-fetching parameters and including new material cost fields.

rest-api
MrWaradana 1 month ago
parent b72e1ec51d
commit 1f38f1a80f

@ -6,6 +6,7 @@ import json
from src.equipment.model import Equipment, EquipmentTransactionRecords
from src.equipment.schema import (
EquipmentBase,
EquipmentDataMaster,
EquipmentPagination,
EquipmentRead,
EquipmentCreate,
@ -248,7 +249,7 @@ async def get_equipment(db_session: DbSession, collector_db_session: CollectorDb
)
@router.post("", response_model=StandardResponse[EquipmentCreate])
@router.post("", response_model=StandardResponse[EquipmentDataMaster])
async def create_equipment(
db_session: DbSession,
equipment_in: EquipmentCreate,
@ -263,7 +264,7 @@ async def create_equipment(
return StandardResponse(data=equipment, message="Data created successfully")
@router.put("/{assetnum}", response_model=StandardResponse[EquipmentUpdate])
@router.put("/{assetnum}", response_model=StandardResponse[EquipmentDataMaster])
async def update_equipment(
db_session: DbSession,
assetnum: str,
@ -291,7 +292,7 @@ async def update_equipment(
)
@router.delete("/{equipment_id}", response_model=StandardResponse[EquipmentBase])
@router.delete("/{equipment_id}", response_model=StandardResponse[EquipmentDataMaster])
async def delete_equipment(db_session: DbSession, equipment_id: str):
equipment = await get_by_id(db_session=db_session, equipment_id=equipment_id)

@ -95,7 +95,6 @@ class EquipmentCreate(EquipmentBase):
class EquipmentUpdate(EquipmentBase):
pass
class EquipmentRead(DefaultBase):
equipment_master_record: EquipmentMasterBase
equipment_data: EquipmentBase

@ -38,10 +38,7 @@ class Prediksi:
def __get_param(self, equipment_id):
try:
# Mendapatkan koneksi dari config.py
connections = get_connection()
connection = (
connections[0] if isinstance(connections, tuple) else connections
)
connection, connection_wo_db = get_connection()
if connection is None:
print("Database connection failed.")
return None
@ -69,10 +66,7 @@ class Prediksi:
def __fetch_data_from_db(self, equipment_id):
try:
# Get connection from config.py (using only the first connection)
connections = get_connection()
connection = (
connections[0] if isinstance(connections, tuple) else connections
)
connection, connection_wo_db = get_connection()
if connection is None:
print("Database connection failed.")
return None
@ -159,10 +153,7 @@ class Prediksi:
# Fungsi untuk menyimpan data proyeksi ke database
async def __insert_predictions_to_db(self, data, equipment_id, token):
try:
connections = get_connection()
connection = (
connections[0] if isinstance(connections, tuple) else connections
)
connection, connection_wo_db = get_connection()
if connection is None:
print("Database connection failed.")
return None
@ -320,11 +311,8 @@ class Prediksi:
def __get_asset_criticality_params(self, equipment_id):
try:
connections = get_connection()
connection, connection_wo_db = get_connection()
efdh_foh_sum = None
connection = (
connections[0] if isinstance(connections, tuple) else connections
)
if connection is None:
print("Database connection failed.")
return None
@ -418,28 +406,42 @@ class Prediksi:
# Fungsi untuk menghapus data proyeksi pada tahun tertentu
def __update_data_lcc(self, equipment_id):
try:
connections = get_connection()
connection, connection_wo_db = get_connection()
production_connection = get_production_connection()
connection = (
connections[0] if isinstance(connections, tuple) else connections
)
production_connection = (
production_connection[0] if isinstance(production_connection, tuple) else production_connection
)
if connection is None or production_connection is None:
print("Database connection failed.")
return None
cursor = connection.cursor(cursor_factory=DictCursor)
# --- OPTIMIZATION START: Fetch static data ONCE ---
# Fetch man_hour_rate (constant for all years currently based on code logic)
man_hour_rate = 0.0
try:
prod_cur = production_connection.cursor()
prod_cur.execute("""SELECT value_num FROM lcc_ms_master WHERE name='manhours_rate'""")
r2 = prod_cur.fetchone()
if r2 and r2[0] is not None:
man_hour_rate = float(r2[0])
prod_cur.close()
except Exception:
pass
# Fetch Asset Criticality (constant for the asset)
asset_criticality_data = self.__get_asset_criticality_params(equipment_id)
ac = asset_criticality_data if isinstance(asset_criticality_data, dict) else {}
asset_criticality_value = float(ac.get("asset_criticality", 0.0))
# --- OPTIMIZATION END ---
# Ambil semua baris untuk assetnum
select_q = '''
SELECT id, seq, tahun,
raw_cm_interval, raw_cm_material_cost, rc_cm_labor_cost,
raw_pm_interval, raw_pm_material_cost, rc_pm_labor_cost,
raw_predictive_interval, raw_predictive_material_cost, rc_predictive_labor_cost,
raw_oh_interval, raw_oh_material_cost, rc_oh_labor_cost,
raw_predictive_interval, raw_predictive_material_cost, rc_predictive_labor_cost,
raw_cm_interval, raw_cm_material_cost, rc_cm_labor_cost, rc_cm_material_cost,
raw_pm_interval, raw_pm_material_cost, rc_pm_labor_cost, rc_pm_material_cost,
raw_predictive_interval, raw_predictive_material_cost, rc_predictive_labor_cost, rc_predictive_material_cost,
raw_oh_interval, raw_oh_material_cost, rc_oh_labor_cost, rc_oh_material_cost,
efdh_equivalent_forced_derated_hours, foh_forced_outage_hours
FROM lcc_equipment_tr_data
WHERE assetnum = %s;
@ -447,24 +449,6 @@ class Prediksi:
cursor.execute(select_q, (equipment_id,))
rows = cursor.fetchall()
# Helper to get man_hour for a year (fallback to master 'manhours_rate')
def _get_man_hour_for_year(year):
try:
# cur = connection.cursor()
prod_cur = production_connection.cursor()
# cur.execute("SELECT man_hour FROM lcc_ms_year_data WHERE year = %s", (year,))
# r = cur.fetchone()
# if r and r[0] is not None:
# return float(r[0])
# cur.execute("SELECT value_num FROM lcc_ms_master WHERE name='manhours_rate'")
prod_cur.execute("""SELECT value_num FROM lcc_ms_master WHERE name='manhours_rate'""")
r2 = prod_cur.fetchone()
if r2 and r2[0] is not None:
return float(r2[0])
except Exception:
pass
return 0.0
update_q = '''
UPDATE lcc_equipment_tr_data
SET rc_cm_material_cost = %s,
@ -480,14 +464,14 @@ class Prediksi:
WHERE id = %s;
'''
batch_params = []
for r in rows:
try:
yr = r.get("tahun") if isinstance(r, dict) else r[2]
man_hour = _get_man_hour_for_year(yr)
seq = int(r.get("seq") or 0) if isinstance(r, dict) else int(r[1] or 0)
# yr = r.get("tahun") if isinstance(r, dict) else r[2]
# man_hour = man_hour_rate # Used pre-fetched value
# seq = int(r.get("seq") or 0) if isinstance(r, dict) else int(r[1] or 0)
raw_pm_material_cost = float(r.get("raw_pm_material_cost") or 0.0)
raw_predictive_material_cost = float(r.get("raw_predictive_material_cost") or 0.0)
@ -506,38 +490,20 @@ class Prediksi:
rc_oh_labor = float(r.get("rc_oh_labor_cost") or 0.0)
try:
# if np.isfinite(raw_pm_interval) and raw_pm_interval != 0:
# rc_pm_material = raw_pm_material_cost * raw_pm_interval
# else:
rc_pm_material = raw_pm_material_cost
rc_pm_material = raw_pm_material_cost if raw_pm_material_cost > 0 else float(r.get("rc_pm_material_cost") or 0.0)
except Exception:
rc_pm_material = 0.0
try:
# if np.isfinite(raw_predictive_interval) and raw_predictive_interval != 0:
# rc_predictive_material = raw_predictive_material_cost * raw_predictive_interval
# else:
rc_predictive_material = raw_predictive_material_cost
rc_predictive_material = raw_predictive_material_cost if raw_predictive_material_cost > 0 else float(r.get("rc_predictive_material_cost") or 0.0)
except Exception:
rc_predictive_material = 0.0
rc_oh_material = raw_oh_material_cost
rc_oh_material = raw_oh_material_cost if raw_oh_material_cost > 0 else float(r.get("rc_oh_material_cost") or 0.0)
asset_criticality_data = self.__get_asset_criticality_params(equipment_id)
asset_criticality_value = 0.0
# Simplify extraction and avoid repeating the multiplication
ac = asset_criticality_data if isinstance(asset_criticality_data, dict) else {}
efdh_foh_sum = efdh_equivalent_forced_derated_hours + foh_forced_outage_hours if efdh_equivalent_forced_derated_hours and foh_forced_outage_hours else 0.0
# try:
# efdh_foh_sum = float(ac.get("efdh_foh_sum", 0.0))
# except Exception:
# efdh_foh_sum = 0.0
try:
asset_criticality_value = float(ac.get("asset_criticality", 0.0))
except Exception:
asset_criticality_value = 0.0
# single multiplier used for all RC groups
ac_multiplier = efdh_foh_sum * asset_criticality_value
@ -551,9 +517,7 @@ class Prediksi:
id_val = r.get("id") if isinstance(r, dict) else r[0]
cursor.execute(
update_q,
(
batch_params.append((
rc_cm_material,
rc_cm_labor,
rc_pm_material,
@ -563,13 +527,17 @@ class Prediksi:
rc_oh_material,
rc_oh_labor,
total,
id_val,
),
)
id_val
))
except Exception:
# ignore row-specific errors and continue
continue
# Execute Batch Update
if batch_params:
cursor.executemany(update_q, batch_params)
# For seq=0 rows, set rc_total_cost to acquisition_cost
cursor.execute(
"update lcc_equipment_tr_data set rc_total_cost = (select acquisition_cost from lcc_ms_equipment_data where assetnum=lcc_equipment_tr_data.assetnum) where assetnum = %s and seq=0;",
@ -584,14 +552,16 @@ class Prediksi:
finally:
if connection:
connection.close()
if 'production_connection' in locals() and production_connection:
try:
production_connection.close()
except Exception:
pass
# Fungsi untuk mengambil parameter dari database
def __get_rate_and_max_year(self, equipment_id):
try:
connections = get_connection()
connection = (
connections[0] if isinstance(connections, tuple) else connections
)
connection, connection_wo_db = get_connection()
if connection is None:
print("Database connection failed.")
return None
@ -751,13 +721,10 @@ class Prediksi:
print(f"HTTP error occurred: {e}")
return {}
def __get_man_hour_rate(self, staff_level: str = "junior"):
def __get_man_hour_rate(self, staff_level: str = "Junior"):
connection = None
try:
connections = get_connection()
connection = (
connections[0] if isinstance(connections, tuple) else connections
)
connection, connection_wo_db = get_connection()
if connection is None:
return 0.0
@ -829,6 +796,7 @@ class Prediksi:
# Mendapatkan rate dan tahun maksimal
rate, max_year = self.__get_rate_and_max_year(assetnum)
man_hour_rate = self.__get_man_hour_rate() # Defaults to 'junior'
pmt = 0
# Prediksi untuk setiap kolom
@ -841,7 +809,6 @@ class Prediksi:
try:
# Case untuk kolom yang terkait dengan corrective maintenance (cm)
if "cm" in col_lower:
recent_df = df
recent_n = df.shape[0]
@ -852,7 +819,7 @@ class Prediksi:
.head(recent_n)[column]
.dropna()
)
# print(f"Recent Vals: {recent_vals}")
# Fallback ke semua nilai non-na jika tidak ada recent_vals
if recent_vals.empty:
recent_vals = df[column].dropna()
@ -865,10 +832,16 @@ class Prediksi:
# Interval from number of failures
interval = 0.0
if isinstance(failures_data, dict):
val = failures_data.get("data")
if val is not None:
data_list = failures_data.get("data")
# data is a list of objects, extract num_fail from first item
if isinstance(data_list, list) and len(data_list) > 0:
first_item = data_list[0]
if isinstance(first_item, dict):
num_fail = first_item.get("num_fail")
if num_fail is not None:
try:
interval = float(val)
interval = float(num_fail)
print(f"interval for year {yr}: {interval}")
except Exception:
interval = 0.0
@ -883,11 +856,10 @@ class Prediksi:
else:
avg = pd.to_numeric(recent_vals, errors="coerce").fillna(0).mean()
avg = 0.0 if pd.isna(avg) else float(avg)
preds = np.repeat(float(avg), n_future)
# print(preds)
else:
# Untuk kolom non-cm, gunakan nilai dari last actual year bila ada,
# Для kolom non-cm, gunakan nilai dari last actual year bila ada,
# jika tidak ada gunakan last available non-NA value, jika tidak ada pakai 0.0
if "is_actual" in df.columns and not df[df["is_actual"] == 1].empty:
last_actual_year_series = df[df["is_actual"] == 1]["year"]
@ -900,6 +872,7 @@ class Prediksi:
last_actual_year = int(df["year"].max())
row_vals = df[df["year"] == last_actual_year]
value = None
if not row_vals.empty:
@ -1067,8 +1040,7 @@ async def main(RELIABILITY_APP_URL=RELIABILITY_APP_URL, assetnum=None, token=Non
return
# Otherwise fetch all assetnums from DB and loop
connections = get_connection()
connection = connections[0] if isinstance(connections, tuple) else connections
connection, connection_wo_db = get_connection()
if connection is None:
print("Database connection failed.")
return
@ -1078,16 +1050,36 @@ async def main(RELIABILITY_APP_URL=RELIABILITY_APP_URL, assetnum=None, token=Non
cursor.execute(query_main)
results = cursor.fetchall()
for idx, row in enumerate(results, start=1):
current_asset = row.get("assetnum") if hasattr(row, "get") else row[0]
if not current_asset or str(current_asset).strip() == "":
print(f"[{idx}/{len(results)}] Skipping empty assetnum")
continue
print(f"[{idx}/{len(results)}] Predicting assetnum: {current_asset}")
# Close connection early as we have the data and don't need it for the async loop
if connection:
connection.close()
connection = None
# Concurrency limit to prevent overwhelming DB/API
MAX_CONCURRENT_TASKS = 10
sem = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
total_assets = len(results)
print(f"Starting prediction for {total_assets} assets with {MAX_CONCURRENT_TASKS} concurrent tasks.")
async def bound_predict(idx, asset):
async with sem:
try:
await prediksi.predict_equipment_data(current_asset, prediksi.access_token)
if not asset or str(asset).strip() == "":
print(f"[{idx}/{total_assets}] Skipping empty assetnum")
return
print(f"[{idx}/{total_assets}] Predicting assetnum: {asset}")
await prediksi.predict_equipment_data(asset, prediksi.access_token)
except Exception as e:
print(f"Error Predicting {current_asset}: {e}")
print(f"Error Predicting {asset}: {e}")
tasks = []
for idx, row in enumerate(results, start=1):
current_asset = row.get("assetnum") if hasattr(row, "get") else row[0]
tasks.append(bound_predict(idx, current_asset))
await asyncio.gather(*tasks)
print("Selesai.")
except Exception as e:
@ -1098,7 +1090,17 @@ async def main(RELIABILITY_APP_URL=RELIABILITY_APP_URL, assetnum=None, token=Non
connection.close()
if __name__ == "__main__":
asyncio.run(
main()
import argparse
parser = argparse.ArgumentParser(description="Run equipment prediction")
parser.add_argument(
"--assetnum", "-a",
type=str,
default=None,
help="Specific assetnum to predict. If not provided, predicts all assets."
)
args = parser.parse_args()
asyncio.run(
main(assetnum=args.assetnum)
)

@ -606,9 +606,7 @@ async def insert_lcca_maximo_corrective_data():
year, # tahun
seq, # seq
(
(1 if year <= current_year - 1 else 0)
if datetime.now().month >= 12
else (0 if year > current_year + 1 else 1)
1 if year < current_year else 0
), # is_actual
raw_cm_material_cost, # raw_cm_material_cost
raw_cm_labor_time, # raw_cm_labor_time
@ -1053,7 +1051,7 @@ async def query_data(target_assetnum: str = None):
assetnum,
year,
seq,
1,
1 if year < current_year else 0,
row_values["raw_cm_interval"],
row_values["raw_cm_material_cost"],
row_values["raw_cm_labor_time"],
@ -1086,7 +1084,7 @@ async def query_data(target_assetnum: str = None):
update_query,
(
seq,
1,
1 if year < current_year else 0,
row_values["raw_cm_interval"],
row_values["raw_cm_material_cost"],
row_values["raw_cm_labor_time"],

Loading…
Cancel
Save