"""ETL helpers that populate the LCC (life-cycle cost) tables.

Data sources (via connections supplied by ``config``):
  * the application DB (``lcc_*`` and ``ms_equipment_master`` tables),
  * a work-order DB and a production work-order DB (``wo_maximo`` tables),
  * an ``acquisition_cost.csv`` file next to this script.

NOTE(review): intended to be run as a one-off script; see ``__main__`` below.
"""

import asyncio
import os
import sys
from datetime import datetime
from decimal import Decimal, InvalidOperation
from uuid import uuid4

import httpx
import pandas as pd
import psycopg2  # noqa: F401 -- connections built in config are psycopg2 objects
from psycopg2.extras import DictCursor

# Make the parent directory importable when this file is run as a script.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from config import get_connection, get_production_connection


async def fetch_api_data(
    assetnum: str, year: int, RELIABILITY_APP_URL: str, token: str
) -> dict:
    """Fetch failure data for one asset/year from the reliability service.

    Calls ``GET {RELIABILITY_APP_URL}/main/failures/{assetnum}/{year}/{year}``
    with a bearer token and a 30 s timeout.

    Returns:
        The decoded JSON payload, or ``{}`` on any HTTP error.
    """
    url = RELIABILITY_APP_URL
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(
                f"{url}/main/failures/{assetnum}/{int(year)}/{int(year)}",
                timeout=30.0,
                headers={"Authorization": f"Bearer {token}"},
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            print(f"HTTP error occurred: {e}")
            return {}


def get_recursive_query(cursor, assetnum, worktype="CM"):
    """Aggregate work orders per year for ``assetnum`` and ``worktype``.

    Returns one row per calendar year with columns named after the worktype
    (e.g. ``raw_cm_interval``, ``raw_cm_material_cost``, ``raw_cm_labor_time``,
    ``raw_cm_labor_human``). ``worktype`` defaults to 'CM'.

    Notes:
      * 'CM' also includes 'PROACTIVE' and 'WA' work orders and excludes
        work orders flagged ``wojp8 = 'S1'``.
      * When a work order has no labor transactions the crew size defaults
        to 3 -- assumed business rule, TODO confirm.
    """
    # worktype is an internal constant ('CM'/'PM'/'PDM'/'OH') and is also used
    # to build column aliases, so it stays interpolated; assetnum is passed as
    # a bound parameter (fix: previously interpolated into the SQL string).
    worktype_filter = (
        "AND a.worktype in ('CM', 'PROACTIVE', 'WA')"
        if worktype == "CM"
        else f"AND a.worktype = '{worktype}'"
    )
    wojp8_filter = "AND a.wojp8 != 'S1'" if worktype == "CM" else ""
    prefix = f"raw_{worktype.lower()}"
    query = f"""
        select
            DATE_PART('year', a.reportdate) as tahun,
            COUNT(a.wonum) as {prefix}_interval,
            sum(a.actmatcost) as {prefix}_material_cost,
            ROUND(SUM(EXTRACT(EPOCH FROM (a.actfinish - a.actstart)) / 3600), 2)
                AS {prefix}_labor_time,
            CASE
                WHEN COUNT(b.laborcode) = 0 THEN 3
                ELSE COUNT(b.laborcode)
            END AS {prefix}_labor_human
        from public.wo_maximo as a
        LEFT JOIN public.wo_maximo_labtrans AS b ON b.wonum = a.wonum
        where
            a.asset_unit = '3'
            {worktype_filter}
            AND a.asset_assetnum = %s
            and a.wonum not like 'T%%'
            {wojp8_filter}
        group by DATE_PART('year', a.reportdate);
    """
    cursor.execute(query, (assetnum,))
    return cursor.fetchall()


def get_data_tahun(cursor):
    """Fetch all rows from the yearly master table ``lcc_ms_year_data``.

    Callers read ``total_lost``, ``rp_per_kwh`` and ``man_hour`` from the
    returned rows.
    """
    cursor.execute("select * from lcc_ms_year_data")
    return cursor.fetchall()


def _parse_decimal(value: str, decimal_separator: str = ".") -> Decimal:
    """Parse numeric strings that may use comma decimal separators.

    With ``decimal_separator=','`` dots are treated as thousands separators
    ("1.234,5" -> 1234.5); otherwise commas are stripped ("1,234.5" -> 1234.5).
    ``None``, blank, or unparseable input yields ``Decimal('0')``.
    """
    if value is None:
        return Decimal("0")
    sanitized = value.strip()
    if not sanitized:
        return Decimal("0")
    sanitized = sanitized.replace(" ", "")
    if decimal_separator == ",":
        sanitized = sanitized.replace(".", "").replace(",", ".")
    else:
        sanitized = sanitized.replace(",", "")
    try:
        return Decimal(sanitized)
    except (InvalidOperation, ValueError):
        print(f"Unable to parse numeric value '{value}', defaulting to 0.")
        return Decimal("0")


def _normalize_key(key: str) -> str:
    """Normalize a CSV header: strip BOM/space, lowercase, squash separators
    (space, dot, dash, tab) into single underscores."""
    if not key:
        return ""
    cleaned = key.strip().lstrip("\ufeff").lower()
    for char in (" ", ".", "-", "\t"):
        cleaned = cleaned.replace(char, "_")
    while "__" in cleaned:
        cleaned = cleaned.replace("__", "_")
    return cleaned


def _load_acquisition_cost_lookup(csv_path: str) -> dict:
    """Load ``acquisition_cost.csv`` into a dict keyed by upper-cased
    location tag.

    Expects a ';'-separated file with columns location_tag, proportion,
    category_no and acquisition_cost (after header normalization).
    ``proportion`` uses a comma decimal separator; the other numerics use a
    dot. Returns ``{}`` if the file is missing, unreadable, or lacks the
    required columns.
    """
    if not os.path.exists(csv_path):
        print(f"CSV file not found at {csv_path}")
        return {}
    try:
        df = pd.read_csv(
            csv_path, sep=";", dtype=str, keep_default_na=False, encoding="utf-8"
        )
    except Exception as exc:
        print(f"Failed to read CSV file {csv_path}: {exc}")
        return {}

    df.columns = [_normalize_key(col) for col in df.columns]
    required_cols = {"location_tag", "proportion", "category_no", "acquisition_cost"}
    missing_cols = required_cols - set(df.columns)
    if missing_cols:
        print(
            f"CSV file is missing required columns: {', '.join(sorted(missing_cols))}"
        )
        return {}

    lookup = {}
    for _, row in df.iterrows():
        raw_tag = (row.get("location_tag") or "").strip()
        location_tag = raw_tag.upper()
        if not location_tag:
            continue
        lookup[location_tag] = {
            "proportion": _parse_decimal(row.get("proportion"), decimal_separator=","),
            "category_no": _parse_decimal(row.get("category_no"), decimal_separator="."),
            "acquisition_cost": _parse_decimal(
                row.get("acquisition_cost"), decimal_separator="."
            ),
            "raw_location_tag": raw_tag,
        }
    return lookup


def _build_tr_row_values(
    data_cm_row,
    data_pm_row,
    data_oh_row,
    data_predictive_row,
    data_tahunan_row,
):
    """Return sanitized numeric values for one ``lcc_equipment_tr_data`` row.

    Combines one year's CM/PM/OH/predictive aggregation rows (as produced by
    ``get_recursive_query``) with the yearly master row. Missing rows, missing
    keys and SQL NULLs all collapse to 0. If none of the four maintenance rows
    is present, an all-zero dict is returned.

    Note: ``raw_cm_material_cost`` is the per-work-order average (total cost /
    interval) while ``rc_cm_material_cost`` is the yearly total.
    """

    def _safe_value(row, key):
        # Missing row / missing key / NULL all collapse to 0.
        if not row:
            return 0
        value = row.get(key)
        return value if value is not None else 0

    has_recursive_data = any(
        row for row in (data_cm_row, data_pm_row, data_oh_row, data_predictive_row)
    )
    if not has_recursive_data:
        return {
            "raw_cm_interval": 0,
            "raw_cm_material_cost": 0,
            "raw_cm_labor_time": 0,
            "raw_cm_labor_human": 0,
            "raw_pm_interval": 0,
            "raw_pm_material_cost": 0,
            "raw_pm_labor_time": 0,
            "raw_pm_labor_human": 0,
            "raw_oh_interval": 0,
            "raw_oh_material_cost": 0,
            "raw_oh_labor_time": 0,
            "raw_oh_labor_human": 0,
            "raw_predictive_interval": 0,
            "raw_predictive_material_cost": 0,
            "raw_predictive_labor_time": 0,
            "raw_predictive_labor_human": 0,
            "raw_loss_output_MW": 0,
            "raw_loss_output_price": 0,
            "rc_cm_material_cost": 0,
            "rc_cm_labor_cost": 0,
            "rc_pm_material_cost": 0,
            "rc_pm_labor_cost": 0,
            "rc_oh_material_cost": 0,
            "rc_oh_labor_cost": 0,
            "rc_predictive_labor_cost": 0,
        }

    raw_cm_interval = _safe_value(data_cm_row, "raw_cm_interval")
    raw_cm_material_cost_total = _safe_value(data_cm_row, "raw_cm_material_cost")
    # Average material cost per corrective work order.
    raw_cm_material_cost = (
        raw_cm_material_cost_total / raw_cm_interval if raw_cm_interval else 0
    )
    raw_cm_labor_time = _safe_value(data_cm_row, "raw_cm_labor_time")
    raw_cm_labor_human = _safe_value(data_cm_row, "raw_cm_labor_human")

    raw_pm_interval = _safe_value(data_pm_row, "raw_pm_interval")
    raw_pm_material_cost = _safe_value(data_pm_row, "raw_pm_material_cost")
    raw_pm_labor_time = _safe_value(data_pm_row, "raw_pm_labor_time")
    raw_pm_labor_human = _safe_value(data_pm_row, "raw_pm_labor_human")

    raw_oh_interval = _safe_value(data_oh_row, "raw_oh_interval")
    raw_oh_material_cost = _safe_value(data_oh_row, "raw_oh_material_cost")
    raw_oh_labor_time = _safe_value(data_oh_row, "raw_oh_labor_time")
    raw_oh_labor_human = _safe_value(data_oh_row, "raw_oh_labor_human")

    raw_pdm_interval = _safe_value(data_predictive_row, "raw_predictive_interval")
    raw_pdm_material_cost = _safe_value(
        data_predictive_row, "raw_predictive_material_cost"
    )
    raw_pdm_labor_time = _safe_value(data_predictive_row, "raw_predictive_labor_time")
    raw_pdm_labor_human = _safe_value(data_predictive_row, "raw_predictive_labor_human")

    raw_loss_output_MW = (
        data_tahunan_row.get("total_lost")
        if data_tahunan_row and data_tahunan_row.get("total_lost") is not None
        else 0
    )
    raw_loss_output_price = (
        data_tahunan_row.get("rp_per_kwh")
        if data_tahunan_row and data_tahunan_row.get("rp_per_kwh") is not None
        else 0
    )
    # Presumably the cost rate per man-hour for that year -- TODO confirm.
    man_hour_value = (
        data_tahunan_row.get("man_hour")
        if data_tahunan_row and data_tahunan_row.get("man_hour") is not None
        else None
    )

    def _labor_cost(row, time_key, human_key):
        # hours x crew size x man-hour rate; 0 when anything is missing.
        if not row or man_hour_value is None:
            return 0
        hours = row.get(time_key)
        crew = row.get(human_key)
        if hours and crew:
            return hours * crew * man_hour_value
        return 0

    rc_cm_material_cost = raw_cm_material_cost_total
    # fix: the guards here previously tested keys that the live query never
    # produces (rc_cm_labor_cost / rc_cm_labor_human etc.), so every rc labor
    # cost was silently 0; they now use the raw_* aliases actually returned.
    rc_cm_labor_cost = _labor_cost(data_cm_row, "raw_cm_labor_time", "raw_cm_labor_human")
    rc_pm_material_cost = raw_pm_material_cost
    rc_pm_labor_cost = _labor_cost(data_pm_row, "raw_pm_labor_time", "raw_pm_labor_human")
    rc_oh_material_cost = raw_oh_material_cost
    rc_oh_labor_cost = _labor_cost(data_oh_row, "raw_oh_labor_time", "raw_oh_labor_human")
    # Predictive labor cost is crew size x rate only (no hours factor in the
    # original formula) -- TODO confirm that is intentional.
    predictive_crew = (
        data_predictive_row.get("raw_predictive_labor_human")
        if data_predictive_row
        else None
    )
    rc_predictive_labor_cost = (
        predictive_crew * man_hour_value
        if predictive_crew and man_hour_value is not None
        else 0
    )

    return {
        "raw_cm_interval": raw_cm_interval,
        "raw_cm_material_cost": raw_cm_material_cost,
        "raw_cm_labor_time": raw_cm_labor_time,
        "raw_cm_labor_human": raw_cm_labor_human,
        "raw_pm_interval": raw_pm_interval,
        "raw_pm_material_cost": raw_pm_material_cost,
        "raw_pm_labor_time": raw_pm_labor_time,
        "raw_pm_labor_human": raw_pm_labor_human,
        "raw_oh_interval": raw_oh_interval,
        "raw_oh_material_cost": raw_oh_material_cost,
        "raw_oh_labor_time": raw_oh_labor_time,
        "raw_oh_labor_human": raw_oh_labor_human,
        "raw_predictive_interval": raw_pdm_interval,
        "raw_predictive_material_cost": raw_pdm_material_cost,
        "raw_predictive_labor_time": raw_pdm_labor_time,
        "raw_predictive_labor_human": raw_pdm_labor_human,
        "raw_loss_output_MW": raw_loss_output_MW,
        "raw_loss_output_price": raw_loss_output_price,
        "rc_cm_material_cost": rc_cm_material_cost,
        "rc_cm_labor_cost": rc_cm_labor_cost,
        "rc_pm_material_cost": rc_pm_material_cost,
        "rc_pm_labor_cost": rc_pm_labor_cost,
        "rc_oh_material_cost": rc_oh_material_cost,
        "rc_oh_labor_cost": rc_oh_labor_cost,
        "rc_predictive_labor_cost": rc_predictive_labor_cost,
    }


async def insert_ms_equipment_data():
    """Seed ``lcc_ms_equipment_data`` with one row per distinct assetnum in
    ``ms_equipment_master``, skipping assets that already exist.

    Commits per inserted row; errors are logged and rolled back without
    aborting the run. Progress is printed every 10 assets.
    """
    connection = None
    connection_wo_db = None
    cursor_db_app = None
    try:
        connection, connection_wo_db = get_connection()
        cursor_db_app = connection.cursor(cursor_factory=DictCursor)

        cursor_db_app.execute("SELECT DISTINCT(assetnum) FROM ms_equipment_master")
        results = cursor_db_app.fetchall()

        inserted = 0
        processed = 0
        total = len(results)
        if total == 0:
            print("No assetnum to insert.")
        else:
            start_time = datetime.now()
            print(f"Starting insert of {total} assetnum into lcc_ms_equipment_data")
            for idx, row in enumerate(results, start=1):
                assetnum = row.get("assetnum")
                try:
                    if not assetnum:
                        # Skip null/empty assetnum.
                        print(f"[{idx}/{total}] Skipping empty assetnum")
                    else:
                        cursor_db_app.execute(
                            "SELECT 1 FROM lcc_ms_equipment_data WHERE assetnum = %s LIMIT 1",
                            (assetnum,),
                        )
                        if cursor_db_app.fetchone():
                            print(f"[{idx}/{total}] Already exists: {assetnum}")
                        else:
                            # Provide an id since the table enforces NOT NULL on id.
                            cursor_db_app.execute(
                                "INSERT INTO lcc_ms_equipment_data (id, assetnum) VALUES (%s, %s)",
                                (str(uuid4()), assetnum),
                            )
                            connection.commit()
                            inserted += 1
                            print(f"[{idx}/{total}] Inserted: {assetnum}")
                except Exception as e:
                    try:
                        connection.rollback()
                    except Exception:
                        pass
                    print(f"[{idx}/{total}] Error inserting {assetnum}: {e}")
                processed += 1
                # Progress monitoring every 10 items and at the end.
                if idx % 10 == 0 or idx == total:
                    elapsed = datetime.now() - start_time
                    pct = (idx / total) * 100 if total else 100
                    print(
                        f"Progress: {idx}/{total} ({pct:.1f}%) - processed {processed}, "
                        f"inserted {inserted} - elapsed {elapsed.total_seconds():.1f}s"
                    )
            print(f"Finished. Total processed: {processed}, inserted: {inserted}")
    except Exception as e:
        print("Error saat menjalankan insert_ms_equipment_data:", e)
        try:
            if connection is not None:
                connection.rollback()
        except Exception:
            pass
    finally:
        # fix: connections were previously never closed (leak).
        for closable in (cursor_db_app, connection, connection_wo_db):
            if closable is not None:
                try:
                    closable.close()
                except Exception:
                    pass


async def insert_lcca_maximo_corrective_data():
    """Populate ``lcc_equipment_tr_data`` with corrective (CM) cost rows.

    For every assetnum in ``ms_equipment_master`` the CM aggregation query is
    run once against the production WO database and its result rows are
    written for the years 2015-2055. Commits per row so a later failure does
    not lose earlier work.

    NOTE(review): result rows are mapped to years positionally (by ``seq``),
    not by their ``tahun`` value -- the original comment says this is
    deliberate, but confirm before relying on the per-year figures.
    """
    connection = None
    connection_wo_db = None
    production_connection = None
    cursor_db_app = None
    cursor_wo = None
    cursor_production = None
    start_time = None
    finished_data = []
    errors = []
    inserted_count = 0
    error_count = 0
    try:
        connection, connection_wo_db = get_connection()
        production_connection = get_production_connection()
        if (
            connection is None
            or connection_wo_db is None
            or production_connection is None
        ):
            print("Database connection failed.")
            return

        start_time = datetime.now()
        print(f"Start insert_lcca_maximo_corrective_data at {start_time.isoformat()}")

        cursor_db_app = connection.cursor(cursor_factory=DictCursor)
        cursor_wo = connection_wo_db.cursor(cursor_factory=DictCursor)
        cursor_production = production_connection.cursor(cursor_factory=DictCursor)

        # Informational only: a TRUNCATE used to run here when rows existed.
        cursor_db_app.execute("SELECT COUNT(*) FROM lcc_equipment_tr_data LIMIT 1")
        cursor_db_app.fetchone()

        cursor_db_app.execute("SELECT DISTINCT(assetnum) FROM ms_equipment_master")
        results = cursor_db_app.fetchall()
        if not results:
            print("No assetnum found in ms_equipment_master")
            return
        print(f"Found {len(results)} assetnum entries to process.")

        current_year = datetime.now().year
        insert_query = """
            INSERT INTO lcc_equipment_tr_data (
                id,
                assetnum,
                tahun,
                seq,
                is_actual,
                raw_cm_material_cost,
                raw_cm_labor_time,
                rc_cm_material_cost
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """

        for row in results:
            asset_start = datetime.now()
            assetnum = row["assetnum"]
            data_corrective_maintenance = get_recursive_query(
                cursor_production, assetnum, worktype="CM"
            )
            print(data_corrective_maintenance)

            seq = 0
            for year in range(2015, 2056):
                try:
                    # Take result rows sequentially for each year (no tahun
                    # filter -- see the docstring note).
                    corrective_row = (
                        data_corrective_maintenance[seq]
                        if isinstance(data_corrective_maintenance, (list, tuple))
                        and seq < len(data_corrective_maintenance)
                        else None
                    )
                    # fix: previously read 'raw_corrective_material_cost' and
                    # 'man_hour_peryear' -- aliases from a retired query -- so
                    # both values were always 0; the live query emits
                    # raw_cm_material_cost / raw_cm_labor_time.
                    raw_cm_material_cost = (
                        corrective_row["raw_cm_material_cost"]
                        if corrective_row
                        and corrective_row.get("raw_cm_material_cost") is not None
                        else 0
                    )
                    raw_cm_labor_time = (
                        corrective_row["raw_cm_labor_time"]
                        if corrective_row
                        and corrective_row.get("raw_cm_labor_time") is not None
                        else 0
                    )
                    rc_cm_material_cost = raw_cm_material_cost

                    cursor_db_app.execute(
                        insert_query,
                        (
                            str(uuid4()),  # id
                            assetnum,  # assetnum
                            year,  # tahun
                            seq,  # seq
                            (0 if year > current_year + 1 else 1),  # is_actual
                            raw_cm_material_cost,
                            raw_cm_labor_time,
                            rc_cm_material_cost,
                        ),
                    )
                    # Commit per successful insert to allow continuing on
                    # later errors.
                    connection.commit()
                    inserted_count += 1
                    finished_data.append({"assetnum": assetnum, "year": year})
                    print(f"Corrective data inserted for {assetnum} in year {year}")
                except Exception as e:
                    # Roll back the failed statement so the transaction is
                    # usable again.
                    try:
                        connection.rollback()
                    except Exception:
                        pass
                    error_count += 1
                    errors.append(
                        {"assetnum": assetnum, "year": year, "error": str(e)}
                    )
                    print(f"Error inserting {assetnum} year {year}: {e}")
                seq += 1

            asset_elapsed = datetime.now() - asset_start
            print(f"Processed asset {assetnum} in {asset_elapsed.total_seconds():.2f}s")

        # Final commit for safety (no-op if nothing pending).
        try:
            connection.commit()
        except Exception:
            pass
    except Exception as e:
        print("Error saat menjalankan insert_lcca_maximo_corrective_data:", e)
        try:
            if connection is not None:
                connection.rollback()
        except Exception:
            pass
    finally:
        print("========Process finished and connection closed.========")
        print(f"Inserted rows: {inserted_count}, Errors: {error_count}")
        if start_time is not None:
            total_elapsed = datetime.now() - start_time
            print(f"Total elapsed time: {total_elapsed.total_seconds():.2f}s")
        if errors:
            print(f"Sample error: {errors[0]}")
        # fix: close each resource individually; the previous cleanup raised
        # NameError on the unbound cursor names when connecting failed early.
        for closable in (
            cursor_db_app,
            cursor_wo,
            cursor_production,
            connection,
            connection_wo_db,
            production_connection,
        ):
            if closable is not None:
                try:
                    closable.close()
                except Exception:
                    pass


async def insert_acquisition_cost_data():
    """Update proportion/category_no/acquisition_cost on
    ``lcc_ms_equipment_data`` from ``acquisition_cost.csv``.

    Rows are matched by upper-cased location_tag. A checklist CSV
    (``acquisition_cost_progress.csv``) is written next to this script with a
    status per asset ('updated', 'no_csv_match', 'missing_tag', 'to_do') plus
    'csv_unprocessed' entries for CSV tags that never matched any asset.
    Commits every 100 processed assets and once at the end.
    """
    connection = None
    connection_wo_db = None
    cursor = None
    try:
        connection, connection_wo_db = get_connection()
        if connection is None or connection_wo_db is None:
            print("Database connection failed.")
            return

        start_time = datetime.now()
        print(f"Start insert_acquisition_cost_data at {start_time.isoformat()}")

        # Assets to update: lcc_ms_equipment_data joined to the master for
        # location_tag and name.
        location_tag_query = """
            SELECT em.assetnum, em.location_tag, em.name
            FROM lcc_ms_equipment_data AS ed
            JOIN ms_equipment_master AS em ON ed.assetnum = em.assetnum;
        """
        cursor = connection.cursor(cursor_factory=DictCursor)
        cursor.execute(location_tag_query)
        location_tag_results = cursor.fetchall()
        if not location_tag_results:
            print("No equipment data found to update.")
            return

        csv_path = os.path.join(os.path.dirname(__file__), "acquisition_cost.csv")
        csv_lookup = _load_acquisition_cost_lookup(csv_path)
        if not csv_lookup:
            print("CSV file does not contain any usable rows.")
            return

        update_query = """
            UPDATE lcc_ms_equipment_data
            SET proportion = %s,
                category_no = %s,
                acquisition_cost = %s,
                updated_at = NOW()
            WHERE assetnum = %s
        """

        updated_assets = 0
        skipped_missing_csv = 0
        skipped_missing_tag = 0
        progress_rows = []
        processed_csv_tags = set()

        for idx, row in enumerate(location_tag_results, start=1):
            assetnum = row["assetnum"]
            location_tag_value = row["location_tag"]
            equipment_name = row.get("name")
            normalized_tag = (location_tag_value or "").strip().upper()

            if not normalized_tag:
                skipped_missing_tag += 1
                print(f"[{idx}] Skipping asset {assetnum}: missing location_tag")
                progress_rows.append(
                    {
                        "assetnum": assetnum,
                        "location_tag": location_tag_value or "",
                        "name": equipment_name or "",
                        "status": "missing_tag",
                    }
                )
                continue

            csv_row = csv_lookup.get(normalized_tag)
            if not csv_row:
                skipped_missing_csv += 1
                print(
                    f"[{idx}] No CSV match for asset {assetnum} (location_tag={location_tag_value})"
                )
                progress_rows.append(
                    {
                        "assetnum": assetnum,
                        "location_tag": location_tag_value or "",
                        "name": equipment_name or "",
                        "status": "no_csv_match",
                    }
                )
                continue

            processed_csv_tags.add(normalized_tag)
            try:
                cursor.execute(
                    update_query,
                    (
                        csv_row["proportion"],
                        csv_row["category_no"],
                        csv_row["acquisition_cost"],
                        assetnum,
                    ),
                )
                if cursor.rowcount:
                    updated_assets += 1
                    progress_rows.append(
                        {
                            "assetnum": assetnum,
                            "location_tag": location_tag_value or "",
                            "name": equipment_name or "",
                            "status": "updated",
                        }
                    )
                else:
                    progress_rows.append(
                        {
                            "assetnum": assetnum,
                            "location_tag": location_tag_value or "",
                            "name": equipment_name or "",
                            "status": "to_do",
                        }
                    )
            except Exception as exc:
                try:
                    connection.rollback()
                except Exception:
                    pass
                print(f"[{idx}] Error updating asset {assetnum}: {exc}")
                progress_rows.append(
                    {
                        "assetnum": assetnum,
                        "location_tag": location_tag_value or "",
                        "name": equipment_name or "",
                        "status": "to_do",
                    }
                )
                continue

            # Periodic commit so a crash keeps most of the work.
            if idx % 100 == 0:
                try:
                    connection.commit()
                except Exception:
                    connection.rollback()
                print(
                    f"Processed {idx} assets so far. Updated {updated_assets}, "
                    f"no CSV match {skipped_missing_csv}, missing tag {skipped_missing_tag}."
                )

        # Capture CSV rows that never matched any asset so the checklist
        # highlights remaining work.
        unused_csv_tags = [
            (tag, data)
            for tag, data in csv_lookup.items()
            if tag not in processed_csv_tags
        ]
        if unused_csv_tags:
            for unused_tag, csv_row in unused_csv_tags:
                progress_rows.append(
                    {
                        "assetnum": "",
                        "location_tag": csv_row.get("raw_location_tag") or unused_tag,
                        "name": "",
                        "status": "csv_unprocessed",
                    }
                )

        if progress_rows:
            progress_df = pd.DataFrame(progress_rows)
            progress_csv_path = os.path.join(
                os.path.dirname(__file__), "acquisition_cost_progress.csv"
            )
            try:
                progress_df.to_csv(progress_csv_path, index=False)
                print(f"Progress checklist saved to {progress_csv_path}")
            except Exception as exc:
                print(f"Failed to write progress checklist CSV: {exc}")

        try:
            connection.commit()
        except Exception as exc:
            print(f"Commit failed: {exc}")
            connection.rollback()

        duration = datetime.now() - start_time
        print(
            f"Finished insert_acquisition_cost_data in {duration.total_seconds():.2f}s. "
            f"Updated {updated_assets} assets, missing CSV {skipped_missing_csv}, "
            f"missing tag {skipped_missing_tag}."
        )
    except Exception as e:
        print("Error saat menjalankan insert_acquisition_cost_data:", e)
    finally:
        if cursor:
            try:
                cursor.close()
            except Exception:
                pass
        for closable in (connection, connection_wo_db):
            if closable is not None:
                try:
                    closable.close()
                except Exception:
                    pass


async def query_data():
    """Upsert one ``lcc_equipment_tr_data`` row per asset per year.

    For each distinct assetnum in ``ms_equipment_master``: run the CM, PM,
    PDM and OH aggregations against the production WO database plus the
    yearly master data, then for every year from 2014 to the current year
    insert a new row or update the existing (assetnum, tahun) row with the
    values built by ``_build_tr_row_values``. Commits per asset.
    """
    connection = None
    connection_wo_db = None
    connection_production_wo = None
    cursor = None
    cursor_wo = None
    try:
        connection, connection_wo_db = get_connection()
        connection_production_wo = get_production_connection()
        if (
            connection is None
            or connection_wo_db is None
            or connection_production_wo is None
        ):
            print("Database connection failed.")
            return

        cursor = connection.cursor(cursor_factory=DictCursor)
        cursor_wo = connection_production_wo.cursor(cursor_factory=DictCursor)

        insert_query = """
            INSERT INTO lcc_equipment_tr_data (
                id, assetnum, tahun, seq, is_actual,
                raw_cm_interval, raw_cm_material_cost, raw_cm_labor_time, raw_cm_labor_human,
                raw_pm_interval, raw_pm_material_cost, raw_pm_labor_time, raw_pm_labor_human,
                raw_oh_interval, raw_oh_material_cost, raw_oh_labor_time, raw_oh_labor_human,
                raw_predictive_interval, raw_predictive_material_cost,
                raw_predictive_labor_time, raw_predictive_labor_human,
                "raw_loss_output_MW", raw_loss_output_price,
                "rc_cm_material_cost", "rc_cm_labor_cost",
                "rc_pm_material_cost", "rc_pm_labor_cost",
                "rc_oh_material_cost", "rc_oh_labor_cost",
                "rc_predictive_labor_cost",
                created_by, created_at
            )
            VALUES (
                %s, %s, %s, %s, %s,
                %s, %s, %s, %s,
                %s, %s, %s, %s,
                %s, %s, %s, %s,
                %s, %s, %s, %s,
                %s, %s,
                %s, %s, %s, %s, %s, %s, %s,
                'Sys', NOW()
            )
        """
        update_query = """
            UPDATE lcc_equipment_tr_data
            SET seq = %s,
                is_actual = %s,
                raw_cm_interval = %s,
                raw_cm_material_cost = %s,
                raw_cm_labor_time = %s,
                raw_cm_labor_human = %s,
                raw_pm_interval = %s,
                raw_pm_material_cost = %s,
                raw_pm_labor_time = %s,
                raw_pm_labor_human = %s,
                raw_oh_interval = %s,
                raw_oh_material_cost = %s,
                raw_oh_labor_time = %s,
                raw_oh_labor_human = %s,
                raw_predictive_interval = %s,
                raw_predictive_material_cost = %s,
                raw_predictive_labor_time = %s,
                raw_predictive_labor_human = %s,
                "raw_loss_output_MW" = %s,
                raw_loss_output_price = %s,
                "rc_cm_material_cost" = %s,
                "rc_cm_labor_cost" = %s,
                "rc_pm_material_cost" = %s,
                "rc_pm_labor_cost" = %s,
                "rc_oh_material_cost" = %s,
                "rc_oh_labor_cost" = %s,
                "rc_predictive_labor_cost" = %s,
                updated_by = 'Sys',
                updated_at = NOW()
            WHERE assetnum = %s AND tahun = %s
        """
        # Hoisted out of the loops: constant existence-check statement.
        check_query = """
            SELECT COUNT(*) FROM lcc_equipment_tr_data
            WHERE assetnum = %s AND tahun = %s
        """

        cursor.execute("SELECT DISTINCT(assetnum) FROM ms_equipment_master")
        results = cursor.fetchall()

        current_year = datetime.now().year
        total_assets = len(results)
        processed_assets = 0
        total_inserted = 0
        overall_start = datetime.now()
        print(f"Starting processing {total_assets} assets at {overall_start.isoformat()}")

        for idx, row in enumerate(results, start=1):
            assetnum = row["assetnum"]
            if not assetnum or str(assetnum).strip() == "":
                print(f"[{idx}/{total_assets}] Skipping empty assetnum")
                continue

            # Hard-coded history start (forecasting start year - 1).
            forecasting_start_year = 2014
            asset_start = datetime.now()
            processed_assets += 1
            years_processed = 0
            inserted_this_asset = 0

            # One aggregation per maintenance type, plus the yearly master data.
            data_cm = get_recursive_query(cursor_wo, assetnum, worktype="CM")
            data_pm = get_recursive_query(cursor_wo, assetnum, worktype="PM")
            data_predictive = get_recursive_query(cursor_wo, assetnum, worktype="PDM")
            data_oh = get_recursive_query(cursor_wo, assetnum, worktype="OH")
            data_tahunan = get_data_tahun(cursor)

            seq = 0
            for year in range(forecasting_start_year, current_year + 1):
                years_processed += 1

                def _row_for_year(rows):
                    # Support both 'tahun' and 'year' column names.
                    return next(
                        (
                            r
                            for r in rows
                            if (r.get("tahun") == year or r.get("year") == year)
                        ),
                        None,
                    )

                data_cm_row = _row_for_year(data_cm)
                data_pm_row = _row_for_year(data_pm)
                data_oh_row = _row_for_year(data_oh)
                data_predictive_row = _row_for_year(data_predictive)
                data_tahunan_row = _row_for_year(data_tahunan)

                try:
                    cursor.execute(check_query, (assetnum, year))
                    data_exists = cursor.fetchone()[0]
                except Exception as e:
                    print(f"Error checking data for assetnum {assetnum}: {e}")
                    continue

                row_values = _build_tr_row_values(
                    data_cm_row,
                    data_pm_row,
                    data_oh_row,
                    data_predictive_row,
                    data_tahunan_row,
                )
                value_columns = (
                    row_values["raw_cm_interval"],
                    row_values["raw_cm_material_cost"],
                    row_values["raw_cm_labor_time"],
                    row_values["raw_cm_labor_human"],
                    row_values["raw_pm_interval"],
                    row_values["raw_pm_material_cost"],
                    row_values["raw_pm_labor_time"],
                    row_values["raw_pm_labor_human"],
                    row_values["raw_oh_interval"],
                    row_values["raw_oh_material_cost"],
                    row_values["raw_oh_labor_time"],
                    row_values["raw_oh_labor_human"],
                    row_values["raw_predictive_interval"],
                    row_values["raw_predictive_material_cost"],
                    row_values["raw_predictive_labor_time"],
                    row_values["raw_predictive_labor_human"],
                    row_values["raw_loss_output_MW"],
                    row_values["raw_loss_output_price"],
                    row_values["rc_cm_material_cost"],
                    row_values["rc_cm_labor_cost"],
                    row_values["rc_pm_material_cost"],
                    row_values["rc_pm_labor_cost"],
                    row_values["rc_oh_material_cost"],
                    row_values["rc_oh_labor_cost"],
                    row_values["rc_predictive_labor_cost"],
                )
                if not data_exists:
                    cursor.execute(
                        insert_query,
                        (str(uuid4()), assetnum, year, seq, 1) + value_columns,
                    )
                    inserted_this_asset += 1
                    total_inserted += 1
                else:
                    cursor.execute(
                        update_query,
                        (seq, 1) + value_columns + (assetnum, year),
                    )
                seq = seq + 1

            # Commit per asset to persist progress and free the transaction.
            try:
                connection.commit()
            except Exception:
                try:
                    connection.rollback()
                except Exception:
                    pass

            asset_elapsed = datetime.now() - asset_start
            total_elapsed = datetime.now() - overall_start
            pct_assets = (idx / total_assets) * 100 if total_assets else 100
            print(
                f"[{idx}/{total_assets}] Asset {assetnum} processed. "
                f"Inserted this asset: {inserted_this_asset}. "
                f"Asset time: {asset_elapsed.total_seconds():.2f}s. "
                f"Total inserted: {total_inserted}. "
                f"Overall elapsed: {total_elapsed.total_seconds():.2f}s. "
                f"Progress: {pct_assets:.1f}%"
            )
            if idx % 10 == 0 or idx == total_assets:
                print(
                    f"SUMMARY: {idx}/{total_assets} assets processed, "
                    f"{total_inserted} rows inserted, elapsed {total_elapsed.total_seconds():.1f}s"
                )

        try:
            connection.commit()
        except Exception:
            try:
                connection.rollback()
            except Exception:
                pass
        print(
            f"Finished processing all assets. Total assets: {total_assets}, "
            f"total inserted: {total_inserted}, "
            f"total time: {(datetime.now()-overall_start).total_seconds():.2f}s"
        )
    except Exception as e:
        print("Error saat menjalankan query:", e)
    finally:
        print("========Process finished and connection closed.========")
        # fix: also closes the production connection, which was leaked before.
        for closable in (
            cursor,
            cursor_wo,
            connection,
            connection_wo_db,
            connection_production_wo,
        ):
            if closable is not None:
                try:
                    closable.close()
                except Exception:
                    pass


if __name__ == "__main__":

    async def main():
        # await insert_ms_equipment_data()
        # await query_data()
        print("insert_actual_data.py is called")

    asyncio.run(main())