diff --git a/src/modules/config.py b/src/modules/config.py index 5f57c73..68ea4ec 100644 --- a/src/modules/config.py +++ b/src/modules/config.py @@ -1,5 +1,19 @@ import psycopg2 +def get_production_connection(): + try: + # Konfigurasi koneksi database produksi + production_connection = psycopg2.connect( + dbname="digital_twin", + user="digital_twin", + password="Pr0jec7@D!g!tTwiN", + host="192.168.1.82", + port="1111", + ) + return production_connection + except Exception as e: + print("Error saat koneksi ke database produksi:", e) + return None def get_connection(): try: diff --git a/src/modules/equipment/Prediksi.py b/src/modules/equipment/Prediksi.py index c3d34e1..1acc5eb 100644 --- a/src/modules/equipment/Prediksi.py +++ b/src/modules/equipment/Prediksi.py @@ -1,4 +1,5 @@ import os +import asyncio import pandas as pd import numpy as np import numpy_financial as npf # Gunakan numpy-financial untuk fungsi keuangan @@ -22,8 +23,11 @@ load_dotenv() class Prediksi: - def __init__(self, RELIABILITY_APP_URL): - self.RELIABILITY_APP_URL = RELIABILITY_APP_URL + def __init__(self, RELIABILITY_APP_URL=None): + # Allow passing the URL or fallback to environment/default so callers can omit the parameter + self.RELIABILITY_APP_URL = RELIABILITY_APP_URL or os.getenv( + "RELIABILITY_APP_URL", "http://192.168.1.82:8000/reliability" + ) # Fungsi untuk mengambil data dari database def __get_param(self, equipment_id): @@ -42,7 +46,7 @@ class Prediksi: # Query untuk mendapatkan data query = """ SELECT - (select COALESCE(forecasting_target_year, 2060) from lcc_ms_equipment_data where assetnum = %s) AS forecasting_target_year + (select COALESCE(forecasting_target_year, 2056) from lcc_ms_equipment_data where assetnum = %s) AS forecasting_target_year """ cursor.execute(query, (equipment_id,)) par1 = cursor.fetchone() @@ -79,12 +83,18 @@ class Prediksi: raw_cm_material_cost AS cm_cost, raw_cm_labor_time AS cm_labor_time, raw_cm_labor_human AS cm_labor_human, + raw_pm_interval AS pm_interval, raw_pm_material_cost AS pm_cost, raw_pm_labor_time AS pm_labor_time, raw_pm_labor_human AS pm_labor_human, + raw_oh_interval AS oh_interval, raw_oh_material_cost AS oh_cost, raw_oh_labor_time AS oh_labor_time, raw_oh_labor_human AS oh_labor_human, + raw_predictive_material_cost AS predictive_material_cost, + raw_predictive_labor_time AS predictive_labor_time, + raw_predictive_labor_human AS predictive_labor_human, + raw_predictive_interval AS predictive_interval, "raw_loss_output_MW" AS loss_output_mw, raw_loss_output_price AS loss_price FROM lcc_equipment_tr_data @@ -180,11 +190,12 @@ class Prediksi: tahun, assetnum, raw_cm_interval, raw_cm_material_cost, raw_cm_labor_time, raw_cm_labor_human, raw_pm_material_cost, raw_pm_labor_time, raw_pm_labor_human, - raw_oh_material_cost, raw_oh_labor_time, raw_oh_labor_human, + raw_oh_interval, raw_oh_material_cost, raw_oh_labor_time, raw_oh_labor_human, + raw_predictive_interval, raw_predictive_material_cost, raw_predictive_labor_time, raw_predictive_labor_human, "raw_loss_output_MW", raw_loss_output_price , created_by, created_at ) VALUES ( - %s, %s, 0, 1, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'Sys', NOW() + %s, %s, 0, 1, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'Sys', NOW() ) """ @@ -239,9 +250,14 @@ class Prediksi: float(row["pm_cost"]), float(row["pm_labor_time"]), float(row["pm_labor_human"]), + float(row["oh_interval"]), float(row["oh_cost"]), float(row["oh_labor_time"]), float(row["oh_labor_human"]), + float(row["predictive_interval"]), + float(row["predictive_material_cost"]), + float(row["predictive_labor_time"]), + float(row["predictive_labor_human"]), float(row["loss_output_mw"]), float(row["loss_price"]), ) @@ -260,7 +276,7 @@ class Prediksi: connection.close() # Fungsi untuk menghapus data proyeksi pada tahun tertentu - def __update_date_lcc(self, equipment_id): + def __update_data_lcc(self, equipment_id): try: connections = get_connection() connection = ( @@ -368,16 +384,16 @@ class Prediksi: # ====================================================================================================================================================== - async def predict_equipment_data(self, p_equipment_id, token): + async def predict_equipment_data(self, assetnum, token): try: # Mengambil data dari database - df = self.__fetch_data_from_db(p_equipment_id) + df = self.__fetch_data_from_db(assetnum) if df is None: print("Data tidak tersedia untuk prediksi.") return # Mendapatkan tahun proyeksi dari DB - par_tahun_target = self.__get_param(p_equipment_id) + par_tahun_target = self.__get_param(assetnum) # Tahun proyeksi future_years = list(range(df["year"].max() + 1, par_tahun_target + 1)) @@ -420,7 +436,7 @@ class Prediksi: return np.abs(preds) # Mendapatkan rate dan tahun maksimal - rate, max_year = self.__get_rate_and_max_year(p_equipment_id) + rate, max_year = self.__get_rate_and_max_year(assetnum) pmt = 0 # Prediksi untuk setiap kolom @@ -450,34 +466,60 @@ class Prediksi: predictions_df = pd.DataFrame(predictions) # print(predictions_df) # Hapus data prediksi yang ada sebelumnya - self.__delete_predictions_from_db(p_equipment_id) + self.__delete_predictions_from_db(assetnum) # Insert hasil prediksi ke database try: await self.__insert_predictions_to_db( - predictions_df, p_equipment_id, token + predictions_df, assetnum, token ) except Exception as e: print(f"Error saat insert data ke database: {e}") # self.__insert_predictions_to_db(predictions_df, p_equipment_id) # Update data untuk total RiskCost per tahun - self.__update_date_lcc(p_equipment_id) + self.__update_data_lcc(assetnum) except Exception as e: print(f"Program dihentikan: {e}") -import asyncio +RELIABILITY_APP_URL = os.getenv("RELIABILITY_APP_URL", "http://192.168.1.82:8000/reliability") +async def main(RELIABILITY_APP_URL=RELIABILITY_APP_URL): + try: + connections = get_connection() + connection = connections[0] if isinstance(connections, tuple) else connections + if connection is None: + print("Database connection failed.") + return -if __name__ == "__main__": + cursor = connection.cursor(cursor_factory=DictCursor) + query_main = "SELECT DISTINCT(assetnum) FROM ms_equipment_master" + cursor.execute(query_main) + results = cursor.fetchall() - async def main(): - prediksi = Prediksi() - await prediksi.predict_equipment_data( - "A22277", - token="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTczOTUxODc4Ni4yOTM5ODUsImp0aSI6Ilo5clRUOFhGa3RweFZUQlBmNGxvRmciLCJ0eXBlIjoiYWNjZXNzIiwic3ViIjoiNWUxNmY4YTgtMWEwMy00MTVjLWIwZjItMTVmZjczOWY1OGE4IiwibmJmIjoxNzM5NTE4Nzg2LCJjc3JmIjoiZWI0MjAzOTMtYTg1ZS00NDJjLWIyMjItZTU5MGU5MGVkYjkyIiwiZXhwIjoxNzM5NjA1MTg2LCJub25jZSI6IjVkZDdhOGYyMWIzZWUxZDZmYmI1YThhMDBlMmYyYjczIn0.3Jv943cU5FuxJ9K92JmVoOtTBqexF4Dke8TrrC4l0Uk", - ) - print("Selesai.") - asyncio.run(main()) + prediksi = Prediksi(RELIABILITY_APP_URL) + token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTc2MjQxODk5My4xNzI4NTYsImp0aSI6ImJ1OU0xQVlLSTZENTd2cC1OaDgtUlEiLCJ0eXBlIjoiYWNjZXNzIiwic3ViIjoiMzg1NzJhOTItZjE2Yy00MWIyLThjNmYtYWZhNTcyMzhhNWU3IiwibmJmIjoxNzYyNDE4OTkzLCJjc3JmIjoiNjY5NzVjNDEtNTg0ZS00OGFkLWJjMmItMDNlZDEyZDM2ZDczIiwiZXhwIjoxNzYyNDI2MTkzLCJub25jZSI6ImYzMThkNDVkNmYzZWRjMzNiN2Q0MmE0MGRkNDJkNDRhIn0.elDnyaoeJ48oOIUdMRZjt7gGICmK-2Awg6Rbl_BZ1PQ" + + for idx, row in enumerate(results, start=1): + assetnum = row.get("assetnum") if hasattr(row, "get") else row[0] + if not assetnum or str(assetnum).strip() == "": + print(f"[{idx}/{len(results)}] Skipping empty assetnum") + continue + print(f"[{idx}/{len(results)}] Processing assetnum: {assetnum}") + try: + await prediksi.predict_equipment_data(assetnum, token) + except Exception as e: + print(f"Error processing {assetnum}: {e}") + + print("Selesai.") + except Exception as e: + print(f"Error getting database connection: {e}") + return + except Exception as e: + print(f"Error getting database connection: {e}") + return + + +asyncio.run(main()) diff --git a/src/modules/equipment/__pycache__/Prediksi.cpython-311.pyc b/src/modules/equipment/__pycache__/Prediksi.cpython-311.pyc index 3543fb9..2d3140f 100644 Binary files a/src/modules/equipment/__pycache__/Prediksi.cpython-311.pyc and b/src/modules/equipment/__pycache__/Prediksi.cpython-311.pyc differ diff --git a/src/modules/equipment/__pycache__/insert_actual_data.cpython-311.pyc b/src/modules/equipment/__pycache__/insert_actual_data.cpython-311.pyc index de99de0..651c30a 100644 Binary files a/src/modules/equipment/__pycache__/insert_actual_data.cpython-311.pyc and b/src/modules/equipment/__pycache__/insert_actual_data.cpython-311.pyc differ diff --git a/src/modules/equipment/insert_actual_data.py b/src/modules/equipment/insert_actual_data.py index 4d03c7f..3ed32ea 100644 --- a/src/modules/equipment/insert_actual_data.py +++ b/src/modules/equipment/insert_actual_data.py @@ -7,7 +7,7 @@ import os import httpx sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) -from config import get_connection +from config import get_connection, get_production_connection async def fetch_api_data( @@ -37,43 +37,96 @@ def get_recursive_query(cursor, assetnum, worktype="CM"): Fungsi untuk menjalankan query rekursif berdasarkan assetnum dan worktype. worktype memiliki nilai default 'CM'. """ + # query = f""" + # SELECT + # ROW_NUMBER() OVER (ORDER BY tbl.assetnum, tbl.year, tbl.worktype) AS seq, + # * + # FROM ( + # SELECT + # a.worktype, + # a.assetnum, + # EXTRACT(YEAR FROM a.reportdate) AS year, + # COUNT(a.wonum) AS raw_corrective_failure_interval, + # SUM(a.total_cost_max) AS raw_corrective_material_cost, + # ROUND( + # SUM( + # EXTRACT(EPOCH FROM ( + # a.actfinish - + # a.actstart + # )) + # ) / 3600 + # , 2) AS raw_corrective_labor_time_jam, + # SUM(a.jumlah_labor) AS raw_corrective_labor_technician + # FROM + # public.wo_staging_3 AS a + # WHERE + # a.unit = '3' + # GROUP BY + # a.worktype, + # a.assetnum, + # EXTRACT(YEAR FROM a.reportdate) + # ) AS tbl + # WHERE + # tbl.worktype = '{worktype}' + # AND tbl.assetnum = '{assetnum}' + # ORDER BY + # tbl.assetnum, + # tbl.year, + # tbl.worktype + # """ +# query = f""" +# select d.tahun, SUM(d.actmatcost) AS raw_corrective_material_cost, sum(d.man_hour) as man_hour_peryear from +# ( +# SELECT +# a.wonum, +# a.actmatcost, +# DATE_PART('year', a.reportdate) AS tahun, +# ( +# ROUND(SUM(EXTRACT(EPOCH FROM (a.actfinish - a.actstart)) / 3600), 2) +# ) AS man_hour, +# CASE +# WHEN COUNT(b.laborcode) = 0 THEN 3 +# ELSE COUNT(b.laborcode) +# END AS man_count +# FROM public.wo_maximo AS a +# LEFT JOIN public.wo_maximo_labtrans AS b +# ON b.wonum = a.wonum +# WHERE +# a.asset_unit = '3' +# AND a.worktype = '{worktype}' +# AND a.asset_assetnum = '{assetnum}' +# and a.wonum not like 'T%' +# GROUP BY +# a.wonum, +# a.actmatcost, +# DATE_PART('year', a.reportdate) +# ) as d group by d.tahun +# ; +# """ + query = f""" - SELECT - ROW_NUMBER() OVER (ORDER BY tbl.assetnum, tbl.year, tbl.worktype) AS seq, - * - FROM ( - SELECT - a.worktype, - a.assetnum, - EXTRACT(YEAR FROM a.reportdate) AS year, - COUNT(a.wonum) AS raw_corrective_failure_interval, - SUM(a.total_cost_max) AS raw_corrective_material_cost, - ROUND( - SUM( - EXTRACT(EPOCH FROM ( - a.actfinish - - a.actstart - )) - ) / 3600 - , 2) AS raw_corrective_labor_time_jam, - SUM(a.jumlah_labor) AS raw_corrective_labor_technician - FROM - public.wo_staging_3 AS a - WHERE - a.unit = '3' - GROUP BY - a.worktype, - a.assetnum, - EXTRACT(YEAR FROM a.reportdate) - ) AS tbl - WHERE - tbl.worktype = '{worktype}' - AND tbl.assetnum = '{assetnum}' - ORDER BY - tbl.assetnum, - tbl.year, - tbl.worktype - """ + select + DATE_PART('year', a.reportdate) as tahun, + COUNT(a.wonum) as raw_{worktype.lower()}_interval, + sum(a.actmatcost) as raw_{worktype.lower()}_material_cost, + ( + ROUND(SUM(EXTRACT(EPOCH FROM (a.actfinish - a.actstart)) / 3600), 2) + ) AS raw_{worktype.lower()}_labor_time, + CASE + WHEN COUNT(b.laborcode) = 0 THEN 3 + ELSE COUNT(b.laborcode) + END AS raw_{worktype.lower()}_labor_human +from public.wo_maximo as a +LEFT JOIN public.wo_maximo_labtrans AS b + ON b.wonum = a.wonum +where + a.asset_unit = '3' + {f"AND a.worktype = '{worktype}'" if worktype != 'CM' else "AND a.worktype in ('CM', 'PROACTIVE', 'WA')"} + AND a.asset_assetnum = '{assetnum}' + and a.wonum not like 'T%' + {f"AND a.wojp8 != 'S1'" if worktype == 'CM' else ""} +group by DATE_PART('year', a.reportdate); +""" # Eksekusi query dan fetch hasil cursor.execute(query) return cursor.fetchall() @@ -87,25 +140,247 @@ def get_data_tahun(cursor): cursor.execute(query) return cursor.fetchall() +async def insert_ms_equipment_data(): + connection = None + try: + connection, connection_wo_db = get_connection() + cursor_db_app = connection.cursor(cursor_factory=DictCursor) + query_main = "SELECT DISTINCT(assetnum) FROM ms_equipment_master" + cursor_db_app.execute(query_main) + results = cursor_db_app.fetchall() + + inserted = 0 + processed = 0 + total = len(results) + if total == 0: + print("No assetnum to insert.") + else: + start_time = datetime.now() + print(f"Starting insert of {total} assetnum into lcc_ms_equipment_data") + for idx, row in enumerate(results, start=1): + assetnum = row.get("assetnum") + try: + # skip null/empty assetnum + if not assetnum: + print(f"[{idx}/{total}] Skipping empty assetnum") + else: + # check existing + cursor_db_app.execute( + "SELECT 1 FROM lcc_ms_equipment_data WHERE assetnum = %s LIMIT 1", + (assetnum,), + ) + if cursor_db_app.fetchone(): + print(f"[{idx}/{total}] Already exists: {assetnum}") + else: + # provide an id since the table enforces NOT NULL on id + cursor_db_app.execute( + "INSERT INTO lcc_ms_equipment_data (id, assetnum) VALUES (%s, %s)", + (str(uuid4()), assetnum), + ) + connection.commit() + inserted += 1 + print(f"[{idx}/{total}] Inserted: {assetnum}") + except Exception as e: + try: + connection.rollback() + except Exception: + pass + print(f"[{idx}/{total}] Error inserting {assetnum}: {e}") + processed += 1 + + # progress monitoring every 10 items and at end + if idx % 10 == 0 or idx == total: + elapsed = datetime.now() - start_time + pct = (idx / total) * 100 if total else 100 + print( + f"Progress: {idx}/{total} ({pct:.1f}%) - processed {processed}, inserted {inserted} - elapsed {elapsed.total_seconds():.1f}s" + ) + + print(f"Finished. Total processed: {processed}, inserted: {inserted}") + + except Exception as e: + print("Error saat menjalankan insert_ms_equipment_data:", e) + try: + connection.rollback() + except Exception: + pass + pass + +async def insert_lcca_maximo_corrective_data(): + connection = None + connection_wo_db = None + production_connection = None + finished_data = [] + errors = [] + inserted_count = 0 + error_count = 0 + try: + connection, connection_wo_db = get_connection() + production_connection = get_production_connection() + if connection is None or connection_wo_db is None or production_connection is None: + print("Database connection failed.") + return + + # start total timer + start_time = datetime.now() + print(f"Start insert_lcca_maximo_corrective_data at {start_time.isoformat()}") + + cursor_db_app = connection.cursor(cursor_factory=DictCursor) + cursor_wo = connection_wo_db.cursor(cursor_factory=DictCursor) + cursor_production = production_connection.cursor(cursor_factory=DictCursor) + + check_data_query = "SELECT COUNT(*) FROM lcc_equipment_tr_data LIMIT 1" + cursor_db_app.execute(check_data_query) + data_count = cursor_db_app.fetchone()[0] + + if data_count > 0: + truncate_query = "TRUNCATE TABLE lcc_equipment_tr_data" + cursor_db_app.execute(truncate_query) + + query_main = "SELECT DISTINCT(assetnum) FROM ms_equipment_master" + cursor_db_app.execute(query_main) + results = cursor_db_app.fetchall() + + if not results: + print("No assetnum found in ms_equipment_master") + return + + print(f"Found {len(results)} assetnum entries to process.") + + current_year = datetime.now().year + + for row in results: + asset_start = datetime.now() + assetnum = row["assetnum"] + + data_corrective_maintenance = get_recursive_query( + cursor_production, assetnum, worktype="CM" + ) + print(data_corrective_maintenance) + start_year = 2015 + end_year = 2056 + seq = 0 + for year in range(start_year, end_year): + # corrective_row = next( + # (r for r in data_corrective_maintenance if r["tahun"] == year), None + # ) + corrective_row = next( + (r for r in data_corrective_maintenance), None + ) + # if corrective_row: + insert_query = """ + INSERT INTO lcc_equipment_tr_data ( + id, assetnum, tahun, seq, is_actual, + raw_cm_material_cost, + raw_cm_labor_time, rc_cm_material_cost + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + """ + try: + # find corrective_row for the specific year (may be None) + # don't filter by tahun — take rows sequentially for each year in the range + corrective_row = ( + data_corrective_maintenance[seq] + if isinstance(data_corrective_maintenance, (list, tuple)) and seq < len(data_corrective_maintenance) + else None + ) + raw_cm_material_cost = ( + corrective_row["raw_corrective_material_cost"] + if corrective_row and corrective_row.get("raw_corrective_material_cost") is not None + else 0 + ) + raw_cm_labor_time = ( + corrective_row["man_hour_peryear"] + if corrective_row and corrective_row.get("man_hour_peryear") is not None + else 0 + ) + rc_cm_material_cost = raw_cm_material_cost + + cursor_db_app.execute( + insert_query, + ( + str(uuid4()), # id + assetnum, # assetnum + year, # tahun + seq, # seq + (0 if year > current_year + 1 else 1), # is_actual + raw_cm_material_cost, # raw_cm_material_cost + raw_cm_labor_time, # raw_cm_labor_time + rc_cm_material_cost, # rc_cm_material_cost + ), + ) + # commit per successful insert to allow continuing on later errors + connection.commit() + inserted_count += 1 + finished_data.append({"assetnum": assetnum, "year": year}) + print(f"Corrective data inserted for {assetnum} in year {year}") + except Exception as e: + # rollback the failed statement so the transaction is usable again + try: + connection.rollback() + except Exception: + pass + error_count += 1 + errors.append({"assetnum": assetnum, "year": year, "error": str(e)}) + print(f"Error inserting {assetnum} year {year}: {e}") + seq += 1 + + asset_elapsed = datetime.now() - asset_start + print(f"Processed asset {assetnum} in {asset_elapsed.total_seconds():.2f}s") + + # final commit for safety (no-op if nothing pending) + try: + connection.commit() + except Exception: + pass + except Exception as e: + print("Error saat menjalankan insert_lcca_maximo_corrective_data:", e) + try: + connection.rollback() + except Exception: + pass + finally: + total_elapsed = None + try: + total_elapsed = datetime.now() - start_time + except Exception: + pass + print("========Process finished and connection closed.========") + print(f"Inserted rows: {inserted_count}, Errors: {error_count}") + if total_elapsed is not None: + print(f"Total elapsed time: {total_elapsed.total_seconds():.2f}s") + if errors: + print(f"Sample error: {errors[0]}") + if connection or connection_wo_db or production_connection: + cursor_db_app.close() + cursor_wo.close() + cursor_production.close() + connection.close() + connection_wo_db.close() + production_connection.close() + -async def query_data(RELIABILITY_APP_URL: str, token: str): +async def query_data(): connection = None + connection_wo_db = None + connection_production_wo = None try: # Mendapatkan koneksi dari config.py connection, connection_wo_db = get_connection() - if connection is None or connection_wo_db is None: + connection_production_wo = get_production_connection() + if connection is None or connection_wo_db is None or connection_production_wo is None: print("Database connection failed.") return # Membuat cursor menggunakan DictCursor cursor = connection.cursor(cursor_factory=DictCursor) - cursor_wo = connection_wo_db.cursor(cursor_factory=DictCursor) + cursor_wo = connection_production_wo.cursor(cursor_factory=DictCursor) # TRUNCATE DATA - # truncate_query = "TRUNCATE TABLE lcc_equipment_tr_data" - # cursor.execute(truncate_query) + truncate_query = "TRUNCATE TABLE lcc_equipment_tr_data RESTART IDENTITY" + cursor.execute(truncate_query) # Query untuk mendapatkan semua data dari tabel `lcc_ms_equipment_data` - query_main = "SELECT * FROM lcc_ms_equipment_data" + # query_main = "SELECT * FROM lcc_ms_equipment_data" + query_main = "SELECT DISTINCT(assetnum) FROM ms_equipment_master" cursor.execute(query_main) # Fetch semua hasil query @@ -114,17 +389,32 @@ async def query_data(RELIABILITY_APP_URL: str, token: str): # Tahun sekarang current_year = datetime.now().year + total_assets = len(results) + processed_assets = 0 + total_inserted = 0 + overall_start = datetime.now() + print(f"Starting processing {total_assets} assets at {overall_start.isoformat()}") + # Looping untuk setiap assetnum - for row in results: + for idx, row in enumerate(results, start=1): assetnum = row["assetnum"] # Mengambil assetnum dari hasil query - forecasting_start_year = row["forecasting_start_year"] - 1 + if not assetnum or str(assetnum).strip() == "": + print(f"[{idx}/{total_assets}] Skipping empty assetnum") + continue + # forecasting_start_year = row["forecasting_start_year"] - 1 + forecasting_start_year = 2014 + + asset_start = datetime.now() + processed_assets += 1 + years_processed = 0 + inserted_this_asset = 0 # CM - recursive_results = get_recursive_query( - cursor_wo, assetnum, worktype="CM" - ) + data_cm = get_recursive_query(cursor_wo, assetnum, worktype="CM") # PM data_pm = get_recursive_query(cursor_wo, assetnum, worktype="PM") + # PDM = Predictive Maintenance + data_predictive = get_recursive_query(cursor_wo, assetnum, worktype="PDM") # OH data_oh = get_recursive_query(cursor_wo, assetnum, worktype="OH") # Data Tahun @@ -133,15 +423,28 @@ async def query_data(RELIABILITY_APP_URL: str, token: str): seq = 0 # Looping untuk setiap tahun for year in range(forecasting_start_year, current_year + 1): + years_processed += 1 # print(f"Processing assetnum {assetnum} in year {year}") - # Filter data berdasarkan tahun - recursive_row = next( - (r for r in recursive_results if r["year"] == year), None + # Filter data berdasarkan tahun (support both 'tahun' and 'year' column names) + data_cm_row = next( + (r for r in data_cm if (r.get("tahun") == year or r.get("year") == year)), + None, ) # CM Corrective Maintenance - data_pm_row = next((r for r in data_pm if r["year"] == year), None) - data_oh_row = next((r for r in data_oh if r["year"] == year), None) + data_pm_row = next( + (r for r in data_pm if (r.get("tahun") == year or r.get("year") == year)), + None, + ) + data_oh_row = next( + (r for r in data_oh if (r.get("tahun") == year or r.get("year") == year)), + None, + ) + data_predictive_row = next( + (r for r in data_predictive if (r.get("tahun") == year or r.get("year") == year)), + None, + ) data_tahunan_row = next( - (r for r in data_tahunan if r["year"] == year), None + (r for r in data_tahunan if (r.get("tahun") == year or r.get("year") == year)), + None, ) # Cek apakah data sudah ada @@ -159,28 +462,36 @@ async def query_data(RELIABILITY_APP_URL: str, token: str): continue if not data_exists: - print("Data not exists for assetnum", assetnum) # Insert data jika belum ada - if not recursive_row and not data_pm_row and not data_oh_row: + if not data_cm_row and not data_pm_row and not data_oh_row and not data_predictive_row: # Jika data recursive_row tidak ada insert_query = """ INSERT INTO lcc_equipment_tr_data ( id, assetnum, tahun, seq, is_actual, - raw_cm_interval, raw_cm_material_cost, - raw_cm_labor_time, raw_cm_labor_human + raw_cm_interval, raw_cm_material_cost, raw_cm_labor_time, raw_cm_labor_human , raw_pm_interval, raw_pm_material_cost, raw_pm_labor_time, raw_pm_labor_human - , raw_oh_material_cost, raw_oh_labor_time, raw_oh_labor_human + , raw_oh_interval, raw_oh_material_cost, raw_oh_labor_time, raw_oh_labor_human + , raw_predictive_interval, raw_predictive_material_cost, raw_predictive_labor_time, raw_predictive_labor_human , "raw_loss_output_MW", raw_loss_output_price - ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s - , %s, %s, %s, %s - , %s, %s, %s - , %s, %s + , rc_cm_material_cost, rc_cm_labor_cost + , rc_pm_material_cost, rc_pm_labor_cost + , rc_oh_material_cost, rc_oh_labor_cost + , rc_predictive_labor_cost + , created_by, created_at + ) VALUES ( + %s, %s, %s, %s, %s, + %s, %s, %s, %s, + %s, %s, %s, %s, + %s, %s, %s, %s, + %s, %s, %s, %s, + %s, %s + , %s, %s + , %s, %s + , %s, %s + , %s + , 'Sys', NOW() ) """ - api_data = await fetch_api_data( - assetnum, year, RELIABILITY_APP_URL, token - ) - print(api_data) cursor.execute( insert_query, ( @@ -189,62 +500,112 @@ async def query_data(RELIABILITY_APP_URL: str, token: str): year, # tahun seq, # seq 1, # is_actual - ( - api_data["data"][0]["actual_fail"] - if api_data - else 1 - ), # raw_cm_interval (minimal 1 karena minimal 1x OH) + 1, # raw_cm_interval (minimal 1 karena minimal 1x OH) 0, # raw_cm_material_cost 0, # raw_cm_labor_time 0, # raw_cm_labor_human 1, # pm interval set default 1 - 0, - 0, - 0, - 0, - 0, - 0, - ( - data_tahunan_row["total_lost"] + 0, # raw_pm_material_cost + 0, # raw_pm_labor_time + 0, # raw_pm_labor_human + 0, # raw_oh_interval set default 1 + 0, # raw_oh_material_cost + 0, # raw_oh_labor_time + 0, # raw_oh_labor_human + 0, # raw_predictive_interval set default 1 + 0, # raw_predictive_material_cost + 0, # raw_predictive_labor_time + 0, # raw_predictive_labor_human + ( # "raw_loss_output_MW" + # data_tahunan_row["total_lost"] + 0 if data_tahunan_row else 0 ), - ( - data_tahunan_row["rp_per_kwh"] + ( # raw_loss_output_price + # data_tahunan_row["rp_per_kwh"] + 0 if data_tahunan_row else 0 ), + 0, # rc_cm_material_cost + 0, # rc_cm_labor_cost + 0, # rc_pm_material_cost + 0, # rc_pm_labor_cost + 0, # rc_oh_material_cost + 0, # rc_oh_labor_cost + 0, # rc_predictive_labor_cost ), ) - print(f"Data inserted for {assetnum} in year {year}") + inserted_this_asset += 1 + total_inserted += 1 + # print minimal per-year insert log + # print(f"Inserted default data for {assetnum} year {year}") else: - print("Data exists for assetnum", assetnum) # Jika data recursive_row ada - # raw_cm_interval ambil dari reliability predict insert_query = """ INSERT INTO lcc_equipment_tr_data ( id, assetnum, tahun, seq, is_actual, - raw_cm_interval, raw_cm_material_cost, - raw_cm_labor_time, raw_cm_labor_human + raw_cm_interval, raw_cm_material_cost, raw_cm_labor_time, raw_cm_labor_human , raw_pm_interval, raw_pm_material_cost, raw_pm_labor_time, raw_pm_labor_human - , raw_oh_material_cost, raw_oh_labor_time, raw_oh_labor_human + , raw_oh_interval, raw_oh_material_cost, raw_oh_labor_time, raw_oh_labor_human + , raw_predictive_interval, raw_predictive_material_cost, raw_predictive_labor_time, raw_predictive_labor_human , "raw_loss_output_MW", raw_loss_output_price - ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s - , %s, %s, %s, %s - , %s, %s, %s - , %s, %s + , "rc_cm_material_cost", "rc_cm_labor_cost" + , "rc_pm_material_cost", "rc_pm_labor_cost" + , "rc_oh_material_cost", "rc_oh_labor_cost" + , "rc_predictive_labor_cost" + , created_by, created_at + ) VALUES ( + %s, %s, %s, %s, %s, + %s, %s, %s, %s, + %s, %s, %s, %s, + %s, %s, %s, %s, + %s, %s, %s, %s, + %s, %s, + %s, %s, + %s, %s, + %s, %s, + %s, + 'Sys', NOW() ) """ - api_data = await fetch_api_data( - assetnum, year, RELIABILITY_APP_URL, token - ) - print(api_data) - if api_data and "data" in api_data and api_data["data"]: - print("API data:", api_data["data"][0]["actual_fail"]) - else: - print( - f"No API data available for {assetnum} in year {year}" - ) + # Normalize row values to avoid inserting NULL and avoid division by zero + raw_cm_interval = data_cm_row.get("raw_cm_interval") if data_cm_row and data_cm_row.get("raw_cm_interval") is not None else 0 + raw_cm_material_cost = data_cm_row.get("raw_cm_material_cost") if data_cm_row and data_cm_row.get("raw_cm_material_cost") is not None else 0 + avg_cm_material_cost = (raw_cm_material_cost / raw_cm_interval) if raw_cm_interval else 0 + raw_cm_labor_time = data_cm_row.get("raw_cm_labor_time") if data_cm_row and data_cm_row.get("raw_cm_labor_time") is not None else 0 + raw_cm_labor_human = data_cm_row.get("raw_cm_labor_human") if data_cm_row and data_cm_row.get("raw_cm_labor_human") is not None else 0 + + raw_pm_interval = data_pm_row.get("raw_pm_interval") if data_pm_row and data_pm_row.get("raw_pm_interval") is not None else 0 + raw_pm_material_cost = data_pm_row.get("raw_pm_material_cost") if data_pm_row and data_pm_row.get("raw_pm_material_cost") is not None else 0 + raw_pm_labor_time = data_pm_row.get("raw_pm_labor_time") if data_pm_row and data_pm_row.get("raw_pm_labor_time") is not None else 0 + raw_pm_labor_human = data_pm_row.get("raw_pm_labor_human") if data_pm_row and data_pm_row.get("raw_pm_labor_human") is not None else 0 + + raw_oh_interval = data_oh_row.get("raw_oh_interval") if data_oh_row and data_oh_row.get("raw_oh_interval") is not None else 0 + raw_oh_material_cost = data_oh_row.get("raw_oh_material_cost") if data_oh_row and data_oh_row.get("raw_oh_material_cost") is not None else 0 + raw_oh_labor_time = data_oh_row.get("raw_oh_labor_time") if data_oh_row and data_oh_row.get("raw_oh_labor_time") is not None else 0 + raw_oh_labor_human = data_oh_row.get("raw_oh_labor_human") if data_oh_row and data_oh_row.get("raw_oh_labor_human") is not None else 0 + + raw_pdm_interval = data_predictive_row.get("raw_predictive_interval") if data_predictive_row and data_predictive_row.get("raw_predictive_interval") is not None else 0 + raw_pdm_material_cost = data_predictive_row.get("raw_predictive_material_cost") if data_predictive_row and data_predictive_row.get("raw_predictive_material_cost") is not None else 0 + raw_pdm_labor_time = data_predictive_row.get("raw_predictive_labor_time") if data_predictive_row and data_predictive_row.get("raw_predictive_labor_time") is not None else 0 + raw_pdm_labor_human = data_predictive_row.get("raw_predictive_labor_human") if data_predictive_row and data_predictive_row.get("raw_predictive_labor_human") is not None else 0 + + raw_loss_output_MW = data_tahunan_row.get("total_lost") if data_tahunan_row and data_tahunan_row.get("total_lost") is not None else 0 + raw_loss_output_price = data_tahunan_row.get("rp_per_kwh") if data_tahunan_row and data_tahunan_row.get("rp_per_kwh") is not None else 0 + + rc_cm_material_cost = data_cm_row.get("raw_cm_material_cost") if data_cm_row and data_cm_row.get("raw_cm_material_cost") is not None else 0 + rc_cm_labor_cost = data_cm_row.get("raw_cm_labor_time")*data_cm_row.get("rc_cm_labor_human")*data_tahunan_row.get("man_hour") if data_cm_row and data_cm_row.get("rc_cm_labor_cost") and data_cm_row.get("rc_cm_labor_human") and data_tahunan_row.get("man_hour") is not None else 0 + + rc_pm_material_cost = data_pm_row.get("raw_pm_material_cost") if data_pm_row and data_pm_row.get("raw_pm_material_cost") is not None else 0 + rc_pm_labor_cost = data_pm_row.get("raw_pm_labor_time")*data_pm_row.get("rc_pm_labor_human")*data_tahunan_row.get("man_hour") if data_pm_row and data_pm_row.get("rc_pm_labor_cost") and data_pm_row.get("rc_pm_labor_human") and data_tahunan_row.get("man_hour") is not None else 0 + + rc_oh_material_cost = data_oh_row.get("raw_oh_material_cost") if data_oh_row and data_oh_row.get("raw_oh_material_cost") is not None else 0 + rc_oh_labor_cost = data_oh_row.get("raw_oh_labor_time")*data_oh_row.get("rc_oh_labor_human")*data_tahunan_row.get("man_hour") if data_oh_row and data_oh_row.get("rc_oh_labor_cost") and data_oh_row.get("rc_oh_labor_human") and data_tahunan_row.get("man_hour") is not None else 0 + + rc_predictive_labor_cost = data_predictive_row.get("raw_predictive_labor_human")*data_tahunan_row.get("man_hour") if data_predictive_row and data_predictive_row.get("rc_predictive_labor_cost") and data_tahunan_row.get("man_hour") is not None else 0 + cursor.execute( insert_query, ( @@ -253,117 +614,74 @@ async def query_data(RELIABILITY_APP_URL: str, token: str): year, # tahun seq, # seq 1, # is_actual - ( - api_data["data"][0]["actual_fail"] - if api_data - else ( - recursive_row["raw_corrective_failure_interval"] - + 1 - if recursive_row - else 1 - ) - ), # raw_cm_interval nanti ambil dari API reliability predict - ( - recursive_row["raw_corrective_material_cost"] - if recursive_row - else 0 - ), # raw_cm_material_cost - ( - ( - recursive_row["raw_corrective_labor_time_jam"] - or 0 - ) - if recursive_row - else 0 - ), # raw_cm_labor_time - ( - ( - max( - recursive_row[ - "raw_corrective_labor_technician" - ], - 1, - ) - if recursive_row[ - "raw_corrective_labor_time_jam" - ] - else 0 - ) - if recursive_row - else 0 - ), # raw_cm_labor_human - 1, # raw_pm_interval - ( - data_pm_row["raw_corrective_material_cost"] - if data_pm_row - else 0 - ), # raw_pm_material_cost - ( - (data_pm_row["raw_corrective_labor_time_jam"] or 0) - if data_pm_row - else 0 - ), # raw_pm_labor_time - ( - ( - max( - data_pm_row[ - "raw_corrective_labor_technician" - ], - 1, - ) - if data_pm_row["raw_corrective_labor_time_jam"] - else 0 - ) - if data_pm_row - else 0 - ), # raw_pm_labor_human - ( - data_oh_row["raw_corrective_material_cost"] - if data_oh_row - else 0 - ), # raw_oh_material_cost - ( - (data_oh_row["raw_corrective_labor_time_jam"] or 0) - if data_oh_row - else 0 - ), # raw_oh_labor_time - ( - ( - max( - data_oh_row[ - "raw_corrective_labor_technician" - ], - 1, - ) - if data_oh_row["raw_corrective_labor_time_jam"] - else 0 - ) - if data_oh_row - else 0 - ), # raw_oh_labor_human - ( - data_tahunan_row["total_lost"] - if data_tahunan_row - else 0 - ) - / ( - recursive_row["raw_corrective_failure_interval"] + 1 - if recursive_row - else 1 - ), # raw_loss_output_MW - ( - data_tahunan_row["rp_per_kwh"] - if data_tahunan_row - else 0 - ), + raw_cm_interval, # raw_cm_interval + avg_cm_material_cost, # avg raw_cm_material_cost per interval + raw_cm_labor_time, # raw_cm_labor_time + raw_cm_labor_human, # raw_cm_labor_human + raw_pm_interval, # raw_pm_interval + raw_pm_material_cost, # raw_pm_material_cost + raw_pm_labor_time, # raw_pm_labor_time + raw_pm_labor_human, + raw_oh_interval, + raw_oh_material_cost, + raw_oh_labor_time, + raw_oh_labor_human, + raw_pdm_interval, + raw_pdm_material_cost, + raw_pdm_labor_time, + raw_pdm_labor_human, + raw_loss_output_MW, + raw_loss_output_price, + rc_cm_material_cost, + rc_cm_labor_cost, + rc_pm_material_cost, + rc_pm_labor_cost, + rc_oh_material_cost, + rc_oh_labor_cost, + rc_predictive_labor_cost, ), ) - print(f"Data inserted for {assetnum} in year {year}") + inserted_this_asset += 1 + total_inserted += 1 seq = seq + 1 + # commit per asset to persist progress and free transaction + try: + connection.commit() + except Exception: + try: + connection.rollback() + except Exception: + pass + + asset_elapsed = datetime.now() - asset_start + total_elapsed = datetime.now() - overall_start + pct_assets = (idx / total_assets) * 100 if total_assets else 100 + # progress per asset + print( + f"[{idx}/{total_assets}] Asset {assetnum} processed. " + f"Inserted this asset: {inserted_this_asset}. " + f"Asset time: {asset_elapsed.total_seconds():.2f}s. " + f"Total inserted: {total_inserted}. " + f"Overall elapsed: {total_elapsed.total_seconds():.2f}s. " + f"Progress: {pct_assets:.1f}%" + ) + + # periodic summary every 10 assets + if idx % 10 == 0 or idx == total_assets: + print(f"SUMMARY: {idx}/{total_assets} assets processed, {total_inserted} rows inserted, elapsed {total_elapsed.total_seconds():.1f}s") + # Commit perubahan - connection.commit() + try: + connection.commit() + except Exception: + try: + connection.rollback() + except Exception: + pass + + print(f"Finished processing all assets. Total assets: {total_assets}, total inserted: {total_inserted}, total time: {(datetime.now()-overall_start).total_seconds():.2f}s") except Exception as e: print("Error saat menjalankan query:", e) @@ -372,8 +690,20 @@ async def query_data(RELIABILITY_APP_URL: str, token: str): # Menutup koneksi print("========Process finished and connection closed.========") if connection or connection_wo_db: - cursor.close() - cursor_wo.close() - connection.close() - connection_wo_db.close() + try: + cursor.close() + except Exception: + pass + try: + cursor_wo.close() + except Exception: + pass + try: + connection.close() + except Exception: + pass + try: + connection_wo_db.close() + except Exception: + pass # print("========Process finished and connection closed.========") diff --git a/src/modules/equipment/run.py b/src/modules/equipment/run.py index 9facc14..244b631 100644 --- a/src/modules/equipment/run.py +++ b/src/modules/equipment/run.py @@ -1,47 +1,71 @@ -from .insert_actual_data import query_data -from .Prediksi import Prediksi -from .Eac import Eac import asyncio import time +# prefer package-relative imports, but allow running this file directly as a script +try: + from .insert_actual_data import query_data, insert_lcca_maximo_corrective_data, insert_ms_equipment_data + from .Prediksi import Prediksi + from .Eac import Eac +except ImportError: + # fallback when there's no parent package (e.g., python run.py) + from insert_actual_data import query_data, insert_lcca_maximo_corrective_data, insert_ms_equipment_data + from Prediksi import Prediksi + from Eac import Eac + # Panggil fungsi -async def main(assetnum, token, RELIABILITY_APP_URL): +async def main(): start_time = time.time() try: - await query_data(RELIABILITY_APP_URL, token) + await query_data() except Exception as e: print(f"Error in query_data: {str(e)}") return try: - prediksi = Prediksi(RELIABILITY_APP_URL) - await prediksi.predict_equipment_data( - assetnum, - token=token, - ) + prediksi = Prediksi() + await prediksi.main() except Exception as e: print(f"Error in predict_equipment_data: {str(e)}") return - try: - eac = Eac() - eac.hitung_eac_equipment(assetnum) - except Exception as e: - print(f"Error in hitung_eac_equipment: {str(e)}") - return + # try: + # eac = Eac() + # eac.hitung_eac_equipment(assetnum) + # except Exception as e: + # print(f"Error in hitung_eac_equipment: {str(e)}") + # return end_time = time.time() execution_time = end_time - start_time - print(f"EAC calculation finished in {execution_time:.2f} seconds.") - return f"EAC calculation finished in {execution_time:.2f} seconds." + # format execution time into h/m/s as needed + if execution_time >= 3600: + hours = int(execution_time // 3600) + minutes = int((execution_time % 3600) // 60) + seconds = execution_time % 60 + message = f"Insert & Prediction calculation finished in {hours}h {minutes}m {seconds:.2f}s." + elif execution_time >= 60: + minutes = int(execution_time // 60) + seconds = execution_time % 60 + message = f"Insert & Prediction calculation finished in {minutes}m {seconds:.2f}s." + else: + message = f"Insert & Prediction calculation finished in {execution_time:.2f} seconds." + + print(message) + return message + # print(f"EAC calculation finished in {execution_time:.2f} seconds.") + # return f"EAC calculation finished in {execution_time:.2f} seconds." +# if __name__ == "__main__": +# asyncio.run( +# main( +# "A22277", +# "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTczOTUxODc4Ni4yOTM5ODUsImp0aSI6Ilo5clRUOFhGa3RweFZUQlBmNGxvRmciLCJ0eXBlIjoiYWNjZXNzIiwic3ViIjoiNWUxNmY4YTgtMWEwMy00MTVjLWIwZjItMTVmZjczOWY1OGE4IiwibmJmIjoxNzM5NTE4Nzg2LCJjc3JmIjoiZWI0MjAzOTMtYTg1ZS00NDJjLWIyMjItZTU5MGU5MGVkYjkyIiwiZXhwIjoxNzM5NjA1MTg2LCJub25jZSI6IjVkZDdhOGYyMWIzZWUxZDZmYmI1YThhMDBlMmYyYjczIn0.3Jv943cU5FuxJ9K92JmVoOtTBqexF4Dke8TrrC4l0Uk", +# ) +# ) if __name__ == "__main__": asyncio.run( - main( - "A22277", - "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTczOTUxODc4Ni4yOTM5ODUsImp0aSI6Ilo5clRUOFhGa3RweFZUQlBmNGxvRmciLCJ0eXBlIjoiYWNjZXNzIiwic3ViIjoiNWUxNmY4YTgtMWEwMy00MTVjLWIwZjItMTVmZjczOWY1OGE4IiwibmJmIjoxNzM5NTE4Nzg2LCJjc3JmIjoiZWI0MjAzOTMtYTg1ZS00NDJjLWIyMjItZTU5MGU5MGVkYjkyIiwiZXhwIjoxNzM5NjA1MTg2LCJub25jZSI6IjVkZDdhOGYyMWIzZWUxZDZmYmI1YThhMDBlMmYyYjczIn0.3Jv943cU5FuxJ9K92JmVoOtTBqexF4Dke8TrrC4l0Uk", - ) + main() )