diff --git a/src/equipment/router.py b/src/equipment/router.py index c78ac63..23c7aa0 100644 --- a/src/equipment/router.py +++ b/src/equipment/router.py @@ -6,6 +6,7 @@ import json from src.equipment.model import Equipment, EquipmentTransactionRecords from src.equipment.schema import ( EquipmentBase, + EquipmentDataMaster, EquipmentPagination, EquipmentRead, EquipmentCreate, @@ -248,7 +249,7 @@ async def get_equipment(db_session: DbSession, collector_db_session: CollectorDb ) -@router.post("", response_model=StandardResponse[EquipmentCreate]) +@router.post("", response_model=StandardResponse[EquipmentDataMaster]) async def create_equipment( db_session: DbSession, equipment_in: EquipmentCreate, @@ -263,7 +264,7 @@ async def create_equipment( return StandardResponse(data=equipment, message="Data created successfully") -@router.put("/{assetnum}", response_model=StandardResponse[EquipmentUpdate]) +@router.put("/{assetnum}", response_model=StandardResponse[EquipmentDataMaster]) async def update_equipment( db_session: DbSession, assetnum: str, @@ -291,7 +292,7 @@ async def update_equipment( ) -@router.delete("/{equipment_id}", response_model=StandardResponse[EquipmentBase]) +@router.delete("/{equipment_id}", response_model=StandardResponse[EquipmentDataMaster]) async def delete_equipment(db_session: DbSession, equipment_id: str): equipment = await get_by_id(db_session=db_session, equipment_id=equipment_id) diff --git a/src/equipment/schema.py b/src/equipment/schema.py index eebd2c4..0ede5a4 100644 --- a/src/equipment/schema.py +++ b/src/equipment/schema.py @@ -95,7 +95,6 @@ class EquipmentCreate(EquipmentBase): class EquipmentUpdate(EquipmentBase): pass - class EquipmentRead(DefaultBase): equipment_master_record: EquipmentMasterBase equipment_data: EquipmentBase diff --git a/src/modules/equipment/Prediksi.py b/src/modules/equipment/Prediksi.py index c4fd0a7..b23dd56 100644 --- a/src/modules/equipment/Prediksi.py +++ b/src/modules/equipment/Prediksi.py @@ -38,10 +38,7 @@ class Prediksi: def __get_param(self, equipment_id): try: # Mendapatkan koneksi dari config.py - connections = get_connection() - connection = ( - connections[0] if isinstance(connections, tuple) else connections - ) + connection, connection_wo_db = get_connection() if connection is None: print("Database connection failed.") return None @@ -69,10 +66,7 @@ class Prediksi: def __fetch_data_from_db(self, equipment_id): try: # Get connection from config.py (using only the first connection) - connections = get_connection() - connection = ( - connections[0] if isinstance(connections, tuple) else connections - ) + connection, connection_wo_db = get_connection() if connection is None: print("Database connection failed.") return None @@ -159,10 +153,7 @@ class Prediksi: # Fungsi untuk menyimpan data proyeksi ke database async def __insert_predictions_to_db(self, data, equipment_id, token): try: - connections = get_connection() - connection = ( - connections[0] if isinstance(connections, tuple) else connections - ) + connection, connection_wo_db = get_connection() if connection is None: print("Database connection failed.") return None @@ -320,11 +311,8 @@ class Prediksi: def __get_asset_criticality_params(self, equipment_id): try: - connections = get_connection() + connection, connection_wo_db = get_connection() efdh_foh_sum = None - connection = ( - connections[0] if isinstance(connections, tuple) else connections - ) if connection is None: print("Database connection failed.") return None @@ -418,28 +406,42 @@ class Prediksi: # Fungsi untuk menghapus data proyeksi pada tahun tertentu def __update_data_lcc(self, equipment_id): try: - connections = get_connection() + connection, connection_wo_db = get_connection() production_connection = get_production_connection() - connection = ( - connections[0] if isinstance(connections, tuple) else connections - ) - production_connection = ( - production_connection[0] if isinstance(production_connection, tuple) else production_connection - ) if connection is None or production_connection is None: print("Database connection failed.") return None cursor = connection.cursor(cursor_factory=DictCursor) + # --- OPTIMIZATION START: Fetch static data ONCE --- + + # Fetch man_hour_rate (constant for all years currently based on code logic) + man_hour_rate = 0.0 + try: + prod_cur = production_connection.cursor() + prod_cur.execute("""SELECT value_num FROM lcc_ms_master WHERE name='manhours_rate'""") + r2 = prod_cur.fetchone() + if r2 and r2[0] is not None: + man_hour_rate = float(r2[0]) + prod_cur.close() + except Exception: + pass + + # Fetch Asset Criticality (constant for the asset) + asset_criticality_data = self.__get_asset_criticality_params(equipment_id) + ac = asset_criticality_data if isinstance(asset_criticality_data, dict) else {} + asset_criticality_value = float(ac.get("asset_criticality", 0.0)) + + # --- OPTIMIZATION END --- + # Ambil semua baris untuk assetnum select_q = ''' SELECT id, seq, tahun, - raw_cm_interval, raw_cm_material_cost, rc_cm_labor_cost, - raw_pm_interval, raw_pm_material_cost, rc_pm_labor_cost, - raw_predictive_interval, raw_predictive_material_cost, rc_predictive_labor_cost, - raw_oh_interval, raw_oh_material_cost, rc_oh_labor_cost, - raw_predictive_interval, raw_predictive_material_cost, rc_predictive_labor_cost, + raw_cm_interval, raw_cm_material_cost, rc_cm_labor_cost, rc_cm_material_cost, + raw_pm_interval, raw_pm_material_cost, rc_pm_labor_cost, rc_pm_material_cost, + raw_predictive_interval, raw_predictive_material_cost, rc_predictive_labor_cost, rc_predictive_material_cost, + raw_oh_interval, raw_oh_material_cost, rc_oh_labor_cost, rc_oh_material_cost, efdh_equivalent_forced_derated_hours, foh_forced_outage_hours FROM lcc_equipment_tr_data WHERE assetnum = %s; @@ -447,24 +449,6 @@ class Prediksi: cursor.execute(select_q, (equipment_id,)) rows = cursor.fetchall() - # Helper to get man_hour for a year (fallback to master 'manhours_rate') - def _get_man_hour_for_year(year): - try: - # cur = connection.cursor() - prod_cur = production_connection.cursor() - # cur.execute("SELECT man_hour FROM lcc_ms_year_data WHERE year = %s", (year,)) - # r = cur.fetchone() - # if r and r[0] is not None: - # return float(r[0]) - # cur.execute("SELECT value_num FROM lcc_ms_master WHERE name='manhours_rate'") - prod_cur.execute("""SELECT value_num FROM lcc_ms_master WHERE name='manhours_rate'""") - r2 = prod_cur.fetchone() - if r2 and r2[0] is not None: - return float(r2[0]) - except Exception: - pass - return 0.0 - update_q = ''' UPDATE lcc_equipment_tr_data SET rc_cm_material_cost = %s, @@ -479,15 +463,15 @@ class Prediksi: updated_by = 'Sys', updated_at = NOW() WHERE id = %s; ''' + + batch_params = [] for r in rows: try: - yr = r.get("tahun") if isinstance(r, dict) else r[2] - man_hour = _get_man_hour_for_year(yr) - - seq = int(r.get("seq") or 0) if isinstance(r, dict) else int(r[1] or 0) - + # yr = r.get("tahun") if isinstance(r, dict) else r[2] + # man_hour = man_hour_rate # Used pre-fetched value + # seq = int(r.get("seq") or 0) if isinstance(r, dict) else int(r[1] or 0) raw_pm_material_cost = float(r.get("raw_pm_material_cost") or 0.0) raw_predictive_material_cost = float(r.get("raw_predictive_material_cost") or 0.0) @@ -506,39 +490,21 @@ class Prediksi: rc_oh_labor = float(r.get("rc_oh_labor_cost") or 0.0) try: - # if np.isfinite(raw_pm_interval) and raw_pm_interval != 0: - # rc_pm_material = raw_pm_material_cost * raw_pm_interval - # else: - rc_pm_material = raw_pm_material_cost + rc_pm_material = raw_pm_material_cost if raw_pm_material_cost > 0 else float(r.get("rc_pm_material_cost") or 0.0) except Exception: rc_pm_material = 0.0 try: - # if np.isfinite(raw_predictive_interval) and raw_predictive_interval != 0: - # rc_predictive_material = raw_predictive_material_cost * raw_predictive_interval - # else: - rc_predictive_material = raw_predictive_material_cost + rc_predictive_material = raw_predictive_material_cost if raw_predictive_material_cost > 0 else float(r.get("rc_predictive_material_cost") or 0.0) except Exception: rc_predictive_material = 0.0 - rc_oh_material = raw_oh_material_cost + rc_oh_material = raw_oh_material_cost if raw_oh_material_cost > 0 else float(r.get("rc_oh_material_cost") or 0.0) - asset_criticality_data = self.__get_asset_criticality_params(equipment_id) - asset_criticality_value = 0.0 - # Simplify extraction and avoid repeating the multiplication - ac = asset_criticality_data if isinstance(asset_criticality_data, dict) else {} efdh_foh_sum = efdh_equivalent_forced_derated_hours + foh_forced_outage_hours if efdh_equivalent_forced_derated_hours and foh_forced_outage_hours else 0.0 - # try: - # efdh_foh_sum = float(ac.get("efdh_foh_sum", 0.0)) - # except Exception: - # efdh_foh_sum = 0.0 - try: - asset_criticality_value = float(ac.get("asset_criticality", 0.0)) - except Exception: - asset_criticality_value = 0.0 - + # single multiplier used for all RC groups ac_multiplier = efdh_foh_sum * asset_criticality_value @@ -551,24 +517,26 @@ class Prediksi: id_val = r.get("id") if isinstance(r, dict) else r[0] - cursor.execute( - update_q, - ( - rc_cm_material, - rc_cm_labor, - rc_pm_material, - rc_pm_labor, - rc_predictive_material, - rc_predictive_labor, - rc_oh_material, - rc_oh_labor, - total, - id_val, - ), - ) + batch_params.append(( + rc_cm_material, + rc_cm_labor, + rc_pm_material, + rc_pm_labor, + rc_predictive_material, + rc_predictive_labor, + rc_oh_material, + rc_oh_labor, + total, + id_val + )) + except Exception: # ignore row-specific errors and continue continue + + # Execute Batch Update + if batch_params: + cursor.executemany(update_q, batch_params) # For seq=0 rows, set rc_total_cost to acquisition_cost cursor.execute( @@ -584,14 +552,16 @@ class Prediksi: finally: if connection: connection.close() + if 'production_connection' in locals() and production_connection: + try: + production_connection.close() + except Exception: + pass # Fungsi untuk mengambil parameter dari database def __get_rate_and_max_year(self, equipment_id): try: - connections = get_connection() - connection = ( - connections[0] if isinstance(connections, tuple) else connections - ) + connection, connection_wo_db = get_connection() if connection is None: print("Database connection failed.") return None @@ -751,13 +721,10 @@ class Prediksi: print(f"HTTP error occurred: {e}") return {} - def __get_man_hour_rate(self, staff_level: str = "junior"): + def __get_man_hour_rate(self, staff_level: str = "Junior"): connection = None try: - connections = get_connection() - connection = ( - connections[0] if isinstance(connections, tuple) else connections - ) + connection, connection_wo_db = get_connection() if connection is None: return 0.0 @@ -829,6 +796,7 @@ class Prediksi: # Mendapatkan rate dan tahun maksimal rate, max_year = self.__get_rate_and_max_year(assetnum) man_hour_rate = self.__get_man_hour_rate() # Defaults to 'junior' + pmt = 0 # Prediksi untuk setiap kolom @@ -841,7 +809,6 @@ class Prediksi: try: # Case untuk kolom yang terkait dengan corrective maintenance (cm) if "cm" in col_lower: - recent_df = df recent_n = df.shape[0] @@ -852,7 +819,7 @@ class Prediksi: .head(recent_n)[column] .dropna() ) - # print(f"Recent Vals: {recent_vals}") + # Fallback ke semua nilai non-na jika tidak ada recent_vals if recent_vals.empty: recent_vals = df[column].dropna() @@ -865,12 +832,18 @@ class Prediksi: # Interval from number of failures interval = 0.0 if isinstance(failures_data, dict): - val = failures_data.get("data") - if val is not None: - try: - interval = float(val) - except Exception: - interval = 0.0 + data_list = failures_data.get("data") + # data is a list of objects, extract num_fail from first item + if isinstance(data_list, list) and len(data_list) > 0: + first_item = data_list[0] + if isinstance(first_item, dict): + num_fail = first_item.get("num_fail") + if num_fail is not None: + try: + interval = float(num_fail) + print(f"interval for year {yr}: {interval}") + except Exception: + interval = 0.0 # interval * labor_time(3) * labor_human(1) * man_hour_rate cost = rc_labor_cost(interval, 3.0, 1.0, man_hour_rate) @@ -883,11 +856,10 @@ class Prediksi: else: avg = pd.to_numeric(recent_vals, errors="coerce").fillna(0).mean() avg = 0.0 if pd.isna(avg) else float(avg) - preds = np.repeat(float(avg), n_future) - # print(preds) + else: - # Untuk kolom non-cm, gunakan nilai dari last actual year bila ada, + # Для kolom non-cm, gunakan nilai dari last actual year bila ada, # jika tidak ada gunakan last available non-NA value, jika tidak ada pakai 0.0 if "is_actual" in df.columns and not df[df["is_actual"] == 1].empty: last_actual_year_series = df[df["is_actual"] == 1]["year"] @@ -900,6 +872,7 @@ class Prediksi: last_actual_year = int(df["year"].max()) row_vals = df[df["year"] == last_actual_year] + value = None if not row_vals.empty: @@ -920,7 +893,7 @@ class Prediksi: value = 0.0 else: value = 0.0 - + preds = np.repeat(float(value), n_future) except Exception: @@ -1067,8 +1040,7 @@ async def main(RELIABILITY_APP_URL=RELIABILITY_APP_URL, assetnum=None, token=Non return # Otherwise fetch all assetnums from DB and loop - connections = get_connection() - connection = connections[0] if isinstance(connections, tuple) else connections + connection, connection_wo_db = get_connection() if connection is None: print("Database connection failed.") return @@ -1077,17 +1049,37 @@ async def main(RELIABILITY_APP_URL=RELIABILITY_APP_URL, assetnum=None, token=Non query_main = "SELECT DISTINCT(assetnum) FROM ms_equipment_master" cursor.execute(query_main) results = cursor.fetchall() + + # Close connection early as we have the data and don't need it for the async loop + if connection: + connection.close() + connection = None + # Concurrency limit to prevent overwhelming DB/API + MAX_CONCURRENT_TASKS = 10 + sem = asyncio.Semaphore(MAX_CONCURRENT_TASKS) + + total_assets = len(results) + print(f"Starting prediction for {total_assets} assets with {MAX_CONCURRENT_TASKS} concurrent tasks.") + + async def bound_predict(idx, asset): + async with sem: + try: + if not asset or str(asset).strip() == "": + print(f"[{idx}/{total_assets}] Skipping empty assetnum") + return + + print(f"[{idx}/{total_assets}] Predicting assetnum: {asset}") + await prediksi.predict_equipment_data(asset, prediksi.access_token) + except Exception as e: + print(f"Error Predicting {asset}: {e}") + + tasks = [] for idx, row in enumerate(results, start=1): current_asset = row.get("assetnum") if hasattr(row, "get") else row[0] - if not current_asset or str(current_asset).strip() == "": - print(f"[{idx}/{len(results)}] Skipping empty assetnum") - continue - print(f"[{idx}/{len(results)}] Predicting assetnum: {current_asset}") - try: - await prediksi.predict_equipment_data(current_asset, prediksi.access_token) - except Exception as e: - print(f"Error Predicting {current_asset}: {e}") + tasks.append(bound_predict(idx, current_asset)) + + await asyncio.gather(*tasks) print("Selesai.") except Exception as e: @@ -1098,7 +1090,17 @@ async def main(RELIABILITY_APP_URL=RELIABILITY_APP_URL, assetnum=None, token=Non connection.close() if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="Run equipment prediction") + parser.add_argument( + "--assetnum", "-a", + type=str, + default=None, + help="Specific assetnum to predict. If not provided, predicts all assets." + ) + args = parser.parse_args() + asyncio.run( - main() + main(assetnum=args.assetnum) ) - diff --git a/src/modules/equipment/__pycache__/Prediksi.cpython-311.pyc b/src/modules/equipment/__pycache__/Prediksi.cpython-311.pyc index 4771647..4eed65b 100644 Binary files a/src/modules/equipment/__pycache__/Prediksi.cpython-311.pyc and b/src/modules/equipment/__pycache__/Prediksi.cpython-311.pyc differ diff --git a/src/modules/equipment/__pycache__/insert_actual_data.cpython-311.pyc b/src/modules/equipment/__pycache__/insert_actual_data.cpython-311.pyc index e1ed601..93630fa 100644 Binary files a/src/modules/equipment/__pycache__/insert_actual_data.cpython-311.pyc and b/src/modules/equipment/__pycache__/insert_actual_data.cpython-311.pyc differ diff --git a/src/modules/equipment/__pycache__/run.cpython-311.pyc b/src/modules/equipment/__pycache__/run.cpython-311.pyc index ef5c1fd..f5f23d6 100644 Binary files a/src/modules/equipment/__pycache__/run.cpython-311.pyc and b/src/modules/equipment/__pycache__/run.cpython-311.pyc differ diff --git a/src/modules/equipment/insert_actual_data.py b/src/modules/equipment/insert_actual_data.py index 5f587fd..f7ad447 100644 --- a/src/modules/equipment/insert_actual_data.py +++ b/src/modules/equipment/insert_actual_data.py @@ -606,9 +606,7 @@ async def insert_lcca_maximo_corrective_data(): year, # tahun seq, # seq ( - (1 if year <= current_year - 1 else 0) - if datetime.now().month >= 12 - else (0 if year > current_year + 1 else 1) + 1 if year < current_year else 0 ), # is_actual raw_cm_material_cost, # raw_cm_material_cost raw_cm_labor_time, # raw_cm_labor_time @@ -1053,7 +1051,7 @@ async def query_data(target_assetnum: str = None): assetnum, year, seq, - 1, + 1 if year < current_year else 0, row_values["raw_cm_interval"], row_values["raw_cm_material_cost"], row_values["raw_cm_labor_time"], @@ -1086,7 +1084,7 @@ async def query_data(target_assetnum: str = None): update_query, ( seq, - 1, + 1 if year < current_year else 0, row_values["raw_cm_interval"], row_values["raw_cm_material_cost"], row_values["raw_cm_labor_time"],