import asyncio
import pandas as pd
from decimal import Decimal, InvalidOperation
import psycopg2
from psycopg2.extras import DictCursor
from uuid import uuid4
from datetime import datetime
import sys
import os
import httpx
from src.modules.config import get_connection, get_production_connection
from .where_query_sql import get_where_query_sql
async def fetch_api_data(
assetnum: str, year: int, RELIABILITY_APP_URL: str, token: str
) -> dict:
url = RELIABILITY_APP_URL
# print(f"Using URL: {url}") # Add this for debugging
async with httpx.AsyncClient() as client:
# print(
# f"{url}/main/number-of-failures/{assetnum}/{int(year)}/{int(year)}"
# )
try:
response = await client.get(
f"{url}/main/failures/{assetnum}/{int(year)}/{int(year)}",
timeout=30.0,
headers={"Authorization": f"Bearer {token}"},
)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
print(f"HTTP error occurred: {e}")
return {}
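# Usage sketch (illustrative; assumes the caller supplies RELIABILITY_APP_URL and a bearer token):
#
#     data = await fetch_api_data("A27860", 2021, RELIABILITY_APP_URL, token)
#     # -> parsed JSON from {RELIABILITY_APP_URL}/main/failures/A27860/2021/2021, or {} on HTTP error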
def get_recursive_query(cursor, assetnum, worktype="CM"):
"""
Fungsi untuk menjalankan query rekursif berdasarkan assetnum dan worktype.
worktype memiliki nilai default 'CM'.
"""
# query = f"""
# SELECT
# ROW_NUMBER() OVER (ORDER BY tbl.assetnum, tbl.year, tbl.worktype) AS seq,
# *
# FROM (
# SELECT
# a.worktype,
# a.assetnum,
# EXTRACT(YEAR FROM a.reportdate) AS year,
# COUNT(a.wonum) AS raw_corrective_failure_interval,
# SUM(a.total_cost_max) AS raw_corrective_material_cost,
# ROUND(
# SUM(
# EXTRACT(EPOCH FROM (
# a.actfinish -
# a.actstart
# ))
# ) / 3600
# , 2) AS raw_corrective_labor_time_jam,
# SUM(a.jumlah_labor) AS raw_corrective_labor_technician
# FROM
# public.wo_staging_3 AS a
# WHERE
# a.unit = '3'
# GROUP BY
# a.worktype,
# a.assetnum,
# EXTRACT(YEAR FROM a.reportdate)
# ) AS tbl
# WHERE
# tbl.worktype = '{worktype}'
# AND tbl.assetnum = '{assetnum}'
# ORDER BY
# tbl.assetnum,
# tbl.year,
# tbl.worktype
# """
# query = f"""
# select d.tahun, SUM(d.actmatcost) AS raw_corrective_material_cost, sum(d.man_hour) as man_hour_peryear from
# (
# SELECT
# a.wonum,
# a.actmatcost,
# DATE_PART('year', a.reportdate) AS tahun,
# (
# ROUND(SUM(EXTRACT(EPOCH FROM (a.actfinish - a.actstart)) / 3600), 2)
# ) AS man_hour,
# CASE
# WHEN COUNT(b.laborcode) = 0 THEN 3
# ELSE COUNT(b.laborcode)
# END AS man_count
# FROM public.wo_maximo AS a
# LEFT JOIN public.wo_maximo_labtrans AS b
# ON b.wonum = a.wonum
# WHERE
# a.asset_unit = '3'
# AND a.worktype = '{worktype}'
# AND a.asset_assetnum = '{assetnum}'
# and a.wonum not like 'T%'
# GROUP BY
# a.wonum,
# a.actmatcost,
# DATE_PART('year', a.reportdate)
# ) as d group by d.tahun
# ;
# """
where_query = get_where_query_sql(assetnum, worktype)
query = f"""
select
DATE_PART('year', a.reportdate) as tahun,
COUNT(DISTINCT a.wonum) as raw_{worktype.lower()}_interval,
sum(a.actmatcost) as raw_{worktype.lower()}_material_cost
from public.wo_maximo as a
{where_query}
group by DATE_PART('year', a.reportdate);
"""
    # Execute the query and fetch the results
cursor.execute(query)
return cursor.fetchall()
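# Usage sketch (illustrative; assumes an open DictCursor on the work-order database):
#
#     rows = get_recursive_query(cursor_wo, "A27860", worktype="CM")
#     for r in rows:
#         print(r["tahun"], r["raw_cm_interval"], r["raw_cm_material_cost"])
#
# The column aliases follow worktype.lower(), so worktype="PM" yields
# raw_pm_interval and raw_pm_material_cost instead.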
def get_data_tahun(cursor):
query = f"""
select * from lcc_ms_year_data
"""
# Eksekusi query dan fetch hasil
cursor.execute(query)
return cursor.fetchall()
def get_labour_cost_totals(cursor, assetnum: str, worktype: str) -> dict:
"""Return yearly labor cost totals for a worktype using the standardized query."""
if not assetnum or not worktype:
return {}
where_query = get_where_query_sql(assetnum, worktype)
query = f"""
SELECT
EXTRACT(YEAR FROM x.reportdate)::int AS tahun,
SUM(x.upah_per_wonum) AS total_upah_per_tahun
FROM (
SELECT
bw.wonum,
bw.reportdate,
bw.jumlah_jam_kerja,
CASE
WHEN COUNT(b.laborcode) FILTER (WHERE b.laborcode IS NOT NULL) > 0 THEN
SUM(
COALESCE(emp_cost.salary_per_hour_idr, (select salary_per_hour_idr from lcc_manpower_cost where upper(staff_job_level)= upper('Junior') limit 1))
* bw.jumlah_jam_kerja
)
ELSE
3 * (select salary_per_hour_idr from lcc_manpower_cost where upper(staff_job_level)= upper('Junior') limit 1) * bw.jumlah_jam_kerja
END AS upah_per_wonum
FROM (
SELECT
a.wonum,
a.reportdate,
CASE
WHEN EXTRACT(EPOCH FROM (a.actfinish - a.actstart)) / 3600.0 = 0 THEN 1
WHEN EXTRACT(EPOCH FROM (a.actfinish - a.actstart)) / 3600.0 > 730 THEN 1
ELSE EXTRACT(EPOCH FROM (a.actfinish - a.actstart)) / 3600.0
END AS jumlah_jam_kerja
FROM public.wo_maximo a
{where_query}
) bw
LEFT JOIN public.wo_maximo_labtrans b
ON b.wonum = bw.wonum
LEFT JOIN lcc_ms_manpower emp
ON UPPER(TRIM(emp."ID Number")) = UPPER(TRIM(b.laborcode))
LEFT JOIN lcc_manpower_cost emp_cost
ON UPPER(TRIM(emp_cost.staff_job_level)) = UPPER(TRIM(emp."Position"))
GROUP BY
bw.wonum, bw.reportdate, bw.jumlah_jam_kerja
) x
GROUP BY 1
ORDER BY 1;
"""
try:
cursor.execute(query)
rows = cursor.fetchall()
except Exception as exc:
print(f"Error fetching labour cost for {assetnum} ({worktype}): {exc}")
return {}
labour_costs = {}
for row in rows:
if isinstance(row, dict):
tahun_value = row.get("tahun")
total_value = row.get("total_upah_per_tahun")
else:
try:
tahun_value, total_value = row[0], row[1]
except (IndexError, TypeError):
continue
try:
year_int = int(tahun_value) if tahun_value is not None else None
except (TypeError, ValueError):
year_int = None
if year_int is None:
continue
try:
labour_costs[year_int] = float(total_value) if total_value is not None else 0.0
except (TypeError, ValueError):
labour_costs[year_int] = 0.0
return labour_costs
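# Usage sketch (illustrative): the result maps year -> total labour cost in IDR, e.g.
#
#     labour_cm = get_labour_cost_totals(cursor_wo, "A27860", "CM")
#     # -> {2019: 1500000.0, 2020: 0.0, ...}  (values here are made up)
#
# An empty dict means missing arguments or a query error (which is printed, not raised).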
def _parse_decimal(value: str, decimal_separator: str = ".") -> Decimal:
"""Parse numeric strings that may use comma decimal separators."""
if value is None:
return Decimal("0")
sanitized = value.strip()
if not sanitized:
return Decimal("0")
sanitized = sanitized.replace(" ", "")
if decimal_separator == ",":
sanitized = sanitized.replace(".", "").replace(",", ".")
else:
sanitized = sanitized.replace(",", "")
try:
return Decimal(sanitized)
except (InvalidOperation, ValueError):
print(f"Unable to parse numeric value '{value}', defaulting to 0.")
return Decimal("0")
def _normalize_key(key: str) -> str:
if not key:
return ""
cleaned = key.strip().lstrip("\ufeff").lower()
for char in (" ", ".", "-", "\t"):
cleaned = cleaned.replace(char, "_")
while "__" in cleaned:
cleaned = cleaned.replace("__", "_")
return cleaned
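# Example (illustrative): _normalize_key("\ufeffAcquisition Cost") -> "acquisition_cost";
# BOMs, upper case, spaces, dots, dashes, and tabs all collapse into the snake_case
# column keys expected by _load_acquisition_cost_lookup below.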
def _load_acquisition_cost_lookup(csv_path: str) -> dict:
if not os.path.exists(csv_path):
print(f"CSV file not found at {csv_path}")
return {}
try:
df = pd.read_csv(csv_path, sep=";", dtype=str, keep_default_na=False, encoding="utf-8")
except Exception as exc:
print(f"Failed to read CSV file {csv_path}: {exc}")
return {}
df.columns = [_normalize_key(col) for col in df.columns]
required_cols = {"location_tag", "proportion", "category_no", "acquisition_cost"}
missing_cols = required_cols - set(df.columns)
if missing_cols:
print(f"CSV file is missing required columns: {', '.join(sorted(missing_cols))}")
return {}
lookup = {}
for _, row in df.iterrows():
raw_tag = (row.get("location_tag") or "").strip()
location_tag = raw_tag.upper()
if not location_tag:
continue
lookup[location_tag] = {
"proportion": _parse_decimal(row.get("proportion"), decimal_separator=","),
"category_no": _parse_decimal(row.get("category_no"), decimal_separator="."),
"acquisition_cost": _parse_decimal(row.get("acquisition_cost"), decimal_separator="."),
"raw_location_tag": raw_tag,
}
return lookup
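# Expected CSV shape (illustrative values; semicolon-separated, comma decimals for proportion):
#
#     location_tag;proportion;category_no;acquisition_cost
#     SOME-TAG-01;0,25;4;1250000000
#
# Rows are keyed by the upper-cased location_tag; rows without a tag are skipped.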
def _build_tr_row_values(
data_cm_row,
data_pm_row,
data_oh_row,
data_predictive_row,
data_tahunan_row,
year=None,
labour_cost_lookup=None,
):
"""Return sanitized numeric values for equipment transaction rows."""
def _safe_value(row, key):
if not row:
return 0
value = row.get(key)
return value if value is not None else 0
has_recursive_data = any(
row for row in (data_cm_row, data_pm_row, data_oh_row, data_predictive_row)
)
if not has_recursive_data:
return {
"raw_cm_interval": 0,
"raw_cm_material_cost": 0,
"raw_cm_labor_time": 0,
"raw_cm_labor_human": 0,
"raw_pm_interval": 0,
"raw_pm_material_cost": 0,
"raw_pm_labor_time": 0,
"raw_pm_labor_human": 0,
"raw_oh_interval": 0,
"raw_oh_material_cost": 0,
"raw_oh_labor_time": 0,
"raw_oh_labor_human": 0,
"raw_predictive_interval": 0,
"raw_predictive_material_cost": 0,
"raw_predictive_labor_time": 0,
"raw_predictive_labor_human": 0,
"rc_cm_material_cost": 0,
"rc_cm_labor_cost": 0,
"rc_pm_material_cost": 0,
"rc_pm_labor_cost": 0,
"rc_oh_material_cost": 0,
"rc_oh_labor_cost": 0,
"rc_predictive_labor_cost": 0,
}
raw_cm_interval = _safe_value(data_cm_row, "raw_cm_interval")
raw_cm_material_cost_total = _safe_value(data_cm_row, "raw_cm_material_cost")
raw_cm_material_cost = (
raw_cm_material_cost_total / raw_cm_interval if raw_cm_interval else 0
)
raw_cm_labor_time = _safe_value(data_cm_row, "raw_cm_labor_time")
raw_cm_labor_human = _safe_value(data_cm_row, "raw_cm_labor_human")
raw_pm_interval = _safe_value(data_pm_row, "raw_pm_interval")
raw_pm_material_cost = _safe_value(data_pm_row, "raw_pm_material_cost")
raw_pm_labor_time = _safe_value(data_pm_row, "raw_pm_labor_time")
raw_pm_labor_human = _safe_value(data_pm_row, "raw_pm_labor_human")
raw_oh_interval = _safe_value(data_oh_row, "raw_oh_interval")
raw_oh_material_cost = _safe_value(data_oh_row, "raw_oh_material_cost")
raw_oh_labor_time = _safe_value(data_oh_row, "raw_oh_labor_time")
raw_oh_labor_human = _safe_value(data_oh_row, "raw_oh_labor_human")
raw_pdm_interval = _safe_value(data_predictive_row, "raw_predictive_interval")
raw_pdm_material_cost = _safe_value(
data_predictive_row, "raw_predictive_material_cost"
)
raw_pdm_labor_time = _safe_value(
data_predictive_row, "raw_predictive_labor_time"
)
raw_pdm_labor_human = _safe_value(
data_predictive_row, "raw_predictive_labor_human"
)
man_hour_value = (
data_tahunan_row.get("man_hour")
if data_tahunan_row and data_tahunan_row.get("man_hour") is not None
else None
)
rc_cm_material_cost = raw_cm_material_cost_total
# rc_cm_labor_cost = (
# data_cm_row.get("raw_cm_labor_time")
# * data_cm_row.get("rc_cm_labor_human")
# * man_hour_value
# if data_cm_row
# and data_cm_row.get("rc_cm_labor_cost")
# and data_cm_row.get("rc_cm_labor_human")
# and man_hour_value is not None
# else 0
# )
rc_pm_material_cost = raw_pm_material_cost
# rc_pm_labor_cost = (
# data_pm_row.get("raw_pm_labor_time")
# * data_pm_row.get("rc_pm_labor_human")
# * man_hour_value
# if data_pm_row
# and data_pm_row.get("rc_pm_labor_cost")
# and data_pm_row.get("rc_pm_labor_human")
# and man_hour_value is not None
# else 0
# )
rc_oh_material_cost = raw_oh_material_cost
# rc_oh_labor_cost = (
# data_oh_row.get("raw_oh_labor_time")
# * data_oh_row.get("rc_oh_labor_human")
# * man_hour_value
# if data_oh_row
# and data_oh_row.get("rc_oh_labor_cost")
# and data_oh_row.get("rc_oh_labor_human")
# and man_hour_value is not None
# else 0
# )
# rc_predictive_labor_cost = (
# data_predictive_row.get("raw_predictive_labor_human") * man_hour_value
# if data_predictive_row
# and data_predictive_row.get("rc_predictive_labor_cost")
# and man_hour_value is not None
# else 0
# )
    # default to zero so the return block below never references undefined names
    rc_cm_labor_cost = rc_pm_labor_cost = rc_oh_labor_cost = rc_predictive_labor_cost = 0.0
    if labour_cost_lookup and year is not None:
cm_lookup = labour_cost_lookup.get("CM", {})
pm_lookup = labour_cost_lookup.get("PM", {})
oh_lookup = labour_cost_lookup.get("OH", {})
pdm_lookup = labour_cost_lookup.get("PDM", {})
cm_value = cm_lookup.get(year)
pm_value = pm_lookup.get(year)
oh_value = oh_lookup.get(year)
pdm_value = pdm_lookup.get(year)
rc_cm_labor_cost = float(cm_value) if cm_value is not None else 0.0
rc_pm_labor_cost = float(pm_value) if pm_value is not None else 0.0
rc_oh_labor_cost = float(oh_value) if oh_value is not None else 0.0
rc_predictive_labor_cost = float(pdm_value) if pdm_value is not None else 0.0
return {
"raw_cm_interval": raw_cm_interval,
"raw_cm_material_cost": raw_cm_material_cost,
"raw_cm_labor_time": raw_cm_labor_time,
"raw_cm_labor_human": raw_cm_labor_human,
"raw_pm_interval": raw_pm_interval,
"raw_pm_material_cost": raw_pm_material_cost,
"raw_pm_labor_time": raw_pm_labor_time,
"raw_pm_labor_human": raw_pm_labor_human,
"raw_oh_interval": raw_oh_interval,
"raw_oh_material_cost": raw_oh_material_cost,
"raw_oh_labor_time": raw_oh_labor_time,
"raw_oh_labor_human": raw_oh_labor_human,
"raw_predictive_interval": raw_pdm_interval,
"raw_predictive_material_cost": raw_pdm_material_cost,
"raw_predictive_labor_time": raw_pdm_labor_time,
"raw_predictive_labor_human": raw_pdm_labor_human,
"rc_cm_material_cost": rc_cm_material_cost,
"rc_cm_labor_cost": rc_cm_labor_cost,
"rc_pm_material_cost": rc_pm_material_cost,
"rc_pm_labor_cost": rc_pm_labor_cost,
"rc_oh_material_cost": rc_oh_material_cost,
"rc_oh_labor_cost": rc_oh_labor_cost,
"rc_predictive_labor_cost": rc_predictive_labor_cost,
}
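# Usage sketch (illustrative): the returned dict feeds the INSERT/UPDATE statements in
# query_data() below, e.g.
#
#     values = _build_tr_row_values(cm_row, pm_row, oh_row, pdm_row, year_row,
#                                   year=2021, labour_cost_lookup=lookup)
#     values["raw_cm_material_cost"]  # CM material cost averaged per work order
#     values["rc_cm_labor_cost"]      # yearly CM labour cost from get_labour_cost_totals
#
# When all four maintenance rows are empty the function short-circuits to an all-zero dict.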
async def insert_ms_equipment_data():
    connection = None
    connection_wo_db = None
try:
connection, connection_wo_db = get_connection()
cursor_db_app = connection.cursor(cursor_factory=DictCursor)
query_main = f"SELECT DISTINCT(assetnum) FROM ms_equipment_master WHERE assetnum = 'A27860'"
cursor_db_app.execute(query_main)
results = cursor_db_app.fetchall()
inserted = 0
processed = 0
total = len(results)
if total == 0:
print("No assetnum to insert.")
else:
start_time = datetime.now()
print(f"Starting insert of {total} assetnum into lcc_ms_equipment_data")
for idx, row in enumerate(results, start=1):
assetnum = row.get("assetnum")
try:
# skip null/empty assetnum
if not assetnum:
print(f"[{idx}/{total}] Skipping empty assetnum")
else:
# check existing
cursor_db_app.execute(
"SELECT 1 FROM lcc_ms_equipment_data WHERE assetnum = %s LIMIT 1",
(assetnum,),
)
if cursor_db_app.fetchone():
print(f"[{idx}/{total}] Already exists: {assetnum}")
else:
# provide an id since the table enforces NOT NULL on id
cursor_db_app.execute(
"INSERT INTO lcc_ms_equipment_data (id, assetnum) VALUES (%s, %s)",
(str(uuid4()), assetnum),
)
connection.commit()
inserted += 1
print(f"[{idx}/{total}] Inserted: {assetnum}")
except Exception as e:
try:
connection.rollback()
except Exception:
pass
print(f"[{idx}/{total}] Error inserting {assetnum}: {e}")
processed += 1
# progress monitoring every 10 items and at end
if idx % 10 == 0 or idx == total:
elapsed = datetime.now() - start_time
pct = (idx / total) * 100 if total else 100
print(
f"Progress: {idx}/{total} ({pct:.1f}%) - processed {processed}, inserted {inserted} - elapsed {elapsed.total_seconds():.1f}s"
)
print(f"Finished. Total processed: {processed}, inserted: {inserted}")
    except Exception as e:
        print("Error while running insert_ms_equipment_data:", e)
        try:
            connection.rollback()
        except Exception:
            pass
    finally:
        # mirror the other loaders: close whatever connections were opened
        for conn in (connection, connection_wo_db):
            try:
                if conn is not None:
                    conn.close()
            except Exception:
                pass
async def insert_lcca_maximo_corrective_data():
connection = None
connection_wo_db = None
    production_connection = None
    cursor_db_app = None
    cursor_wo = None
    cursor_production = None
finished_data = []
errors = []
inserted_count = 0
error_count = 0
try:
connection, connection_wo_db = get_connection()
production_connection = get_production_connection()
if connection is None or connection_wo_db is None or production_connection is None:
print("Database connection failed.")
return
# start total timer
start_time = datetime.now()
print(f"Start insert_lcca_maximo_corrective_data at {start_time.isoformat()}")
cursor_db_app = connection.cursor(cursor_factory=DictCursor)
cursor_wo = connection_wo_db.cursor(cursor_factory=DictCursor)
cursor_production = production_connection.cursor(cursor_factory=DictCursor)
check_data_query = "SELECT COUNT(*) FROM lcc_equipment_tr_data LIMIT 1"
cursor_db_app.execute(check_data_query)
data_count = cursor_db_app.fetchone()[0]
# if data_count > 0:
# truncate_query = "TRUNCATE TABLE lcc_equipment_tr_data"
# cursor_db_app.execute(truncate_query)
query_main = "SELECT DISTINCT(assetnum) FROM ms_equipment_master"
cursor_db_app.execute(query_main)
results = cursor_db_app.fetchall()
if not results:
print("No assetnum found in ms_equipment_master")
return
print(f"Found {len(results)} assetnum entries to process.")
current_year = datetime.now().year
for row in results:
asset_start = datetime.now()
assetnum = row["assetnum"]
data_corrective_maintenance = get_recursive_query(
cursor_production, assetnum, worktype="CM"
)
# print(data_corrective_maintenance)
start_year = 2015
end_year = 2056
seq = 0
for year in range(start_year, end_year):
                # corrective_row is resolved inside the try block below by sequential
                # index (seq), not by matching tahun, so each year consumes the next row.
insert_query = """
INSERT INTO lcc_equipment_tr_data (
id, assetnum, tahun, seq, is_actual,
raw_cm_material_cost,
raw_cm_labor_time, rc_cm_material_cost
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
try:
# find corrective_row for the specific year (may be None)
# don't filter by tahun — take rows sequentially for each year in the range
corrective_row = (
data_corrective_maintenance[seq]
if isinstance(data_corrective_maintenance, (list, tuple)) and seq < len(data_corrective_maintenance)
else None
)
raw_cm_material_cost = (
corrective_row["raw_corrective_material_cost"]
if corrective_row and corrective_row.get("raw_corrective_material_cost") is not None
else 0
)
raw_cm_labor_time = (
corrective_row["man_hour_peryear"]
if corrective_row and corrective_row.get("man_hour_peryear") is not None
else 0
)
rc_cm_material_cost = raw_cm_material_cost
cursor_db_app.execute(
insert_query,
(
str(uuid4()), # id
assetnum, # assetnum
year, # tahun
seq, # seq
(0 if year > current_year + 1 else 1), # is_actual
raw_cm_material_cost, # raw_cm_material_cost
raw_cm_labor_time, # raw_cm_labor_time
rc_cm_material_cost, # rc_cm_material_cost
),
)
# commit per successful insert to allow continuing on later errors
connection.commit()
inserted_count += 1
finished_data.append({"assetnum": assetnum, "year": year})
print(f"Corrective data inserted for {assetnum} in year {year}")
except Exception as e:
# rollback the failed statement so the transaction is usable again
try:
connection.rollback()
except Exception:
pass
error_count += 1
errors.append({"assetnum": assetnum, "year": year, "error": str(e)})
print(f"Error inserting {assetnum} year {year}: {e}")
seq += 1
asset_elapsed = datetime.now() - asset_start
print(f"Processed asset {assetnum} in {asset_elapsed.total_seconds():.2f}s")
# final commit for safety (no-op if nothing pending)
try:
connection.commit()
except Exception:
pass
except Exception as e:
print("Error saat menjalankan insert_lcca_maximo_corrective_data:", e)
try:
connection.rollback()
except Exception:
pass
finally:
total_elapsed = None
try:
total_elapsed = datetime.now() - start_time
except Exception:
pass
print("========Process finished and connection closed.========")
print(f"Inserted rows: {inserted_count}, Errors: {error_count}")
if total_elapsed is not None:
print(f"Total elapsed time: {total_elapsed.total_seconds():.2f}s")
if errors:
print(f"Sample error: {errors[0]}")
        # close cursors and connections defensively; some may never have been opened
        for resource in (cursor_db_app, cursor_wo, cursor_production,
                         connection, connection_wo_db, production_connection):
            try:
                if resource is not None:
                    resource.close()
            except Exception:
                pass
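# Note: insert_lcca_maximo_corrective_data writes one row per year from 2015 through 2055;
# is_actual is 1 up to current_year + 1 and 0 for later forecast years, and seq simply
# counts the years in order, which is also how corrective rows are consumed.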
async def insert_acquisition_cost_data():
connection = None
connection_wo_db = None
cursor = None
try:
connection, connection_wo_db = get_connection()
if connection is None or connection_wo_db is None:
print("Database connection failed.")
return
start_time = datetime.now()
print(f"Start insert_acquisition_cost_data at {start_time.isoformat()}")
        # Pull assetnum, location_tag, and name from lcc_ms_equipment_data
        # joined to ms_equipment_master on assetnum
location_tag_query = """
SELECT em.assetnum, em.location_tag, em.name
FROM lcc_ms_equipment_data AS ed
JOIN ms_equipment_master AS em ON ed.assetnum = em.assetnum;
"""
cursor = connection.cursor(cursor_factory=DictCursor)
cursor.execute(location_tag_query)
location_tag_results = cursor.fetchall()
if not location_tag_results:
print("No equipment data found to update.")
return
csv_path = os.path.join(os.path.dirname(__file__), "acquisition_cost.csv")
csv_lookup = _load_acquisition_cost_lookup(csv_path)
if not csv_lookup:
print("CSV file does not contain any usable rows.")
return
update_query = """
UPDATE lcc_ms_equipment_data
SET proportion = %s,
category_no = %s,
acquisition_cost = %s,
updated_at = NOW()
WHERE assetnum = %s
"""
updated_assets = 0
skipped_missing_csv = 0
skipped_missing_tag = 0
progress_rows = []
processed_csv_tags = set()
for idx, row in enumerate(location_tag_results, start=1):
assetnum = row["assetnum"]
location_tag_value = row["location_tag"]
equipment_name = row.get("name")
normalized_tag = (location_tag_value or "").strip().upper()
if not normalized_tag:
skipped_missing_tag += 1
print(f"[{idx}] Skipping asset {assetnum}: missing location_tag")
progress_rows.append(
{
"assetnum": assetnum,
"location_tag": location_tag_value or "",
"name": equipment_name or "",
"status": "missing_tag",
}
)
continue
csv_row = csv_lookup.get(normalized_tag)
if not csv_row:
skipped_missing_csv += 1
print(f"[{idx}] No CSV match for asset {assetnum} (location_tag={location_tag_value})")
progress_rows.append(
{
"assetnum": assetnum,
"location_tag": location_tag_value or "",
"name": equipment_name or "",
"status": "no_csv_match",
}
)
continue
processed_csv_tags.add(normalized_tag)
try:
cursor.execute(
update_query,
(
csv_row["proportion"],
csv_row["category_no"],
csv_row["acquisition_cost"],
assetnum,
),
)
if cursor.rowcount:
updated_assets += 1
progress_rows.append(
{
"assetnum": assetnum,
"location_tag": location_tag_value or "",
"name": equipment_name or "",
"status": "updated",
}
)
else:
progress_rows.append(
{
"assetnum": assetnum,
"location_tag": location_tag_value or "",
"name": equipment_name or "",
"status": "to_do",
}
)
except Exception as exc:
try:
connection.rollback()
except Exception:
pass
print(f"[{idx}] Error updating asset {assetnum}: {exc}")
progress_rows.append(
{
"assetnum": assetnum,
"location_tag": location_tag_value or "",
"name": equipment_name or "",
"status": "to_do",
}
)
continue
if idx % 100 == 0:
try:
connection.commit()
except Exception:
connection.rollback()
print(
f"Processed {idx} assets so far. Updated {updated_assets}, "
f"no CSV match {skipped_missing_csv}, missing tag {skipped_missing_tag}."
)
# Capture CSV rows that never matched any asset so the checklist highlights remaining work.
unused_csv_tags = [
(tag, data)
for tag, data in csv_lookup.items()
if tag not in processed_csv_tags
]
if unused_csv_tags:
for unused_tag, csv_row in unused_csv_tags:
progress_rows.append(
{
"assetnum": "",
"location_tag": csv_row.get("raw_location_tag") or unused_tag,
"name": "",
"status": "csv_unprocessed",
}
)
if progress_rows:
progress_df = pd.DataFrame(progress_rows)
progress_csv_path = os.path.join(
os.path.dirname(__file__), "acquisition_cost_progress.csv"
)
try:
progress_df.to_csv(progress_csv_path, index=False)
print(f"Progress checklist saved to {progress_csv_path}")
except Exception as exc:
print(f"Failed to write progress checklist CSV: {exc}")
try:
connection.commit()
except Exception as exc:
print(f"Commit failed: {exc}")
connection.rollback()
duration = datetime.now() - start_time
print(
f"Finished insert_acquisition_cost_data in {duration.total_seconds():.2f}s. "
f"Updated {updated_assets} assets, missing CSV {skipped_missing_csv}, missing tag {skipped_missing_tag}."
)
except Exception as e:
print("Error saat menjalankan insert_acquisition_cost_data:", e)
finally:
if cursor:
try:
cursor.close()
except Exception:
pass
if connection or connection_wo_db:
try:
connection.close()
except Exception:
pass
try:
connection_wo_db.close()
except Exception:
pass
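# Illustrative progress-checklist output written by insert_acquisition_cost_data()
# (acquisition_cost_progress.csv, comma-separated):
#
#     assetnum,location_tag,name,status
#     A27860,<location tag>,<equipment name>,updated
#
# status is one of: updated, to_do, missing_tag, no_csv_match, csv_unprocessed.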
async def query_data(target_assetnum: str = None):
connection = None
connection_wo_db = None
    connection_production_wo = None
    cursor = None
    cursor_wo = None
try:
        # Get database connections from config.py
connection, connection_wo_db = get_connection()
connection_production_wo = get_production_connection()
if connection is None or connection_wo_db is None or connection_production_wo is None:
print("Database connection failed.")
return
        # Create cursors using DictCursor
cursor = connection.cursor(cursor_factory=DictCursor)
cursor_wo = connection_production_wo.cursor(cursor_factory=DictCursor)
insert_query = """
INSERT INTO lcc_equipment_tr_data (
id, assetnum, tahun, seq, is_actual,
raw_cm_interval, raw_cm_material_cost, raw_cm_labor_time, raw_cm_labor_human,
raw_pm_interval, raw_pm_material_cost, raw_pm_labor_time, raw_pm_labor_human,
raw_oh_interval, raw_oh_material_cost, raw_oh_labor_time, raw_oh_labor_human,
raw_predictive_interval, raw_predictive_material_cost, raw_predictive_labor_time, raw_predictive_labor_human,
"rc_cm_material_cost", "rc_cm_labor_cost",
"rc_pm_material_cost", "rc_pm_labor_cost",
"rc_oh_material_cost", "rc_oh_labor_cost",
"rc_predictive_labor_cost",
created_by, created_at
) VALUES (
%s, %s, %s, %s, %s,
%s, %s, %s, %s,
%s, %s, %s, %s,
%s, %s, %s, %s,
%s, %s, %s, %s,
%s, %s,
%s, %s,
%s, %s,
%s,
'Sys', NOW()
)
"""
update_query = """
UPDATE lcc_equipment_tr_data
SET seq = %s,
is_actual = %s,
raw_cm_interval = %s,
raw_cm_material_cost = %s,
raw_cm_labor_time = %s,
raw_cm_labor_human = %s,
raw_pm_interval = %s,
raw_pm_material_cost = %s,
raw_pm_labor_time = %s,
raw_pm_labor_human = %s,
raw_oh_interval = %s,
raw_oh_material_cost = %s,
raw_oh_labor_time = %s,
raw_oh_labor_human = %s,
raw_predictive_interval = %s,
raw_predictive_material_cost = %s,
raw_predictive_labor_time = %s,
raw_predictive_labor_human = %s,
"rc_cm_material_cost" = %s,
"rc_cm_labor_cost" = %s,
"rc_pm_material_cost" = %s,
"rc_pm_labor_cost" = %s,
"rc_oh_material_cost" = %s,
"rc_oh_labor_cost" = %s,
"rc_predictive_labor_cost" = %s,
updated_by = 'Sys',
updated_at = NOW()
WHERE assetnum = %s AND tahun = %s
"""
# TRUNCATE DATA
# truncate_query = "TRUNCATE TABLE lcc_equipment_tr_data RESTART IDENTITY"
# cursor.execute(truncate_query)
        # Fetch the distinct assetnum list (now from ms_equipment_master; the
        # commented query below used lcc_ms_equipment_data)
# query_main = "SELECT * FROM lcc_ms_equipment_data"
query_main = "SELECT DISTINCT(assetnum) FROM ms_equipment_master"
if target_assetnum:
query_main += " WHERE assetnum = %s"
cursor.execute(query_main, (target_assetnum,))
else:
cursor.execute(query_main)
        # Fetch all query results
results = cursor.fetchall()
        # Current year
current_year = datetime.now().year
total_assets = len(results)
processed_assets = 0
total_inserted = 0
overall_start = datetime.now()
print(f"Starting processing {total_assets} assets at {overall_start.isoformat()}")
        # Loop over each assetnum
for idx, row in enumerate(results, start=1):
assetnum = row["assetnum"] # Mengambil assetnum dari hasil query
if not assetnum or str(assetnum).strip() == "":
print(f"[{idx}/{total_assets}] Skipping empty assetnum")
continue
# forecasting_start_year = row["forecasting_start_year"] - 1
forecasting_start_year = 2014
asset_start = datetime.now()
processed_assets += 1
years_processed = 0
inserted_this_asset = 0
# CM
data_cm = get_recursive_query(cursor_wo, assetnum, worktype="CM")
# PM
data_pm = get_recursive_query(cursor_wo, assetnum, worktype="PM")
# PDM = Predictive Maintenance
data_predictive = get_recursive_query(cursor_wo, assetnum, worktype="PDM")
# OH
data_oh = get_recursive_query(cursor_wo, assetnum, worktype="OH")
            # Yearly reference data
data_tahunan = get_data_tahun(cursor)
labour_cost_lookup = {
"CM": get_labour_cost_totals(cursor_wo, assetnum, "CM"),
"PM": get_labour_cost_totals(cursor_wo, assetnum, "PM"),
"PDM": get_labour_cost_totals(cursor_wo, assetnum, "PDM"),
"OH": get_labour_cost_totals(cursor_wo, assetnum, "OH"),
}
seq = 0
            # Loop over each year
for year in range(forecasting_start_year, current_year + 1):
years_processed += 1
# print(f"Processing assetnum {assetnum} in year {year}")
                # Filter rows by year (supports both 'tahun' and 'year' column names)
data_cm_row = next(
(r for r in data_cm if (r.get("tahun") == year or r.get("year") == year)),
None,
) # CM Corrective Maintenance
data_pm_row = next(
(r for r in data_pm if (r.get("tahun") == year or r.get("year") == year)),
None,
)
data_oh_row = next(
(r for r in data_oh if (r.get("tahun") == year or r.get("year") == year)),
None,
)
data_predictive_row = next(
(r for r in data_predictive if (r.get("tahun") == year or r.get("year") == year)),
None,
)
data_tahunan_row = next(
(r for r in data_tahunan if (r.get("tahun") == year or r.get("year") == year)),
None,
)
                # Check whether a row already exists for this assetnum/year
check_query = """
SELECT COUNT(*) FROM lcc_equipment_tr_data
WHERE assetnum = %s AND tahun = %s
"""
try:
cursor.execute(check_query, (assetnum, year))
data_exists = cursor.fetchone()[0]
# print("Data exists for assetnum", assetnum)
except Exception as e:
print(f"Error checking data for assetnum {assetnum}: {e}")
continue
row_values = _build_tr_row_values(
data_cm_row,
data_pm_row,
data_oh_row,
data_predictive_row,
data_tahunan_row,
year=year,
labour_cost_lookup=labour_cost_lookup,
)
if not data_exists:
cursor.execute(
insert_query,
(
str(uuid4()),
assetnum,
year,
seq,
1,
row_values["raw_cm_interval"],
row_values["raw_cm_material_cost"],
row_values["raw_cm_labor_time"],
row_values["raw_cm_labor_human"],
row_values["raw_pm_interval"],
row_values["raw_pm_material_cost"],
row_values["raw_pm_labor_time"],
row_values["raw_pm_labor_human"],
row_values["raw_oh_interval"],
row_values["raw_oh_material_cost"],
row_values["raw_oh_labor_time"],
row_values["raw_oh_labor_human"],
row_values["raw_predictive_interval"],
row_values["raw_predictive_material_cost"],
row_values["raw_predictive_labor_time"],
row_values["raw_predictive_labor_human"],
row_values["rc_cm_material_cost"],
row_values["rc_cm_labor_cost"],
row_values["rc_pm_material_cost"],
row_values["rc_pm_labor_cost"],
row_values["rc_oh_material_cost"],
row_values["rc_oh_labor_cost"],
row_values["rc_predictive_labor_cost"],
),
)
inserted_this_asset += 1
total_inserted += 1
else:
cursor.execute(
update_query,
(
seq,
1,
row_values["raw_cm_interval"],
row_values["raw_cm_material_cost"],
row_values["raw_cm_labor_time"],
row_values["raw_cm_labor_human"],
row_values["raw_pm_interval"],
row_values["raw_pm_material_cost"],
row_values["raw_pm_labor_time"],
row_values["raw_pm_labor_human"],
row_values["raw_oh_interval"],
row_values["raw_oh_material_cost"],
row_values["raw_oh_labor_time"],
row_values["raw_oh_labor_human"],
row_values["raw_predictive_interval"],
row_values["raw_predictive_material_cost"],
row_values["raw_predictive_labor_time"],
row_values["raw_predictive_labor_human"],
row_values["rc_cm_material_cost"],
row_values["rc_cm_labor_cost"],
row_values["rc_pm_material_cost"],
row_values["rc_pm_labor_cost"],
row_values["rc_oh_material_cost"],
row_values["rc_oh_labor_cost"],
row_values["rc_predictive_labor_cost"],
assetnum,
year,
),
)
inserted_this_asset += 1
total_inserted += 1
seq = seq + 1
# commit per asset to persist progress and free transaction
try:
connection.commit()
except Exception:
try:
connection.rollback()
except Exception:
pass
asset_elapsed = datetime.now() - asset_start
total_elapsed = datetime.now() - overall_start
pct_assets = (idx / total_assets) * 100 if total_assets else 100
# progress per asset
print(
f"[{idx}/{total_assets}] Asset {assetnum} processed. "
f"Inserted this asset: {inserted_this_asset}. "
f"Asset time: {asset_elapsed.total_seconds():.2f}s. "
f"Total inserted: {total_inserted}. "
f"Overall elapsed: {total_elapsed.total_seconds():.2f}s. "
f"Progress: {pct_assets:.1f}%"
)
# periodic summary every 10 assets
if idx % 10 == 0 or idx == total_assets:
print(f"SUMMARY: {idx}/{total_assets} assets processed, {total_inserted} rows inserted, elapsed {total_elapsed.total_seconds():.1f}s")
        # Commit any remaining changes
try:
connection.commit()
except Exception:
try:
connection.rollback()
except Exception:
pass
print(f"Finished processing all assets. Total assets: {total_assets}, total inserted: {total_inserted}, total time: {(datetime.now()-overall_start).total_seconds():.2f}s")
except Exception as e:
print("Error saat menjalankan query:", e)
finally:
        # Close cursors and connections (including the production WO connection,
        # which was previously never closed)
        print("========Process finished and connection closed.========")
        for resource in (cursor, cursor_wo, connection, connection_wo_db, connection_production_wo):
            try:
                if resource is not None:
                    resource.close()
            except Exception:
                pass
if __name__ == "__main__":
async def main():
# await insert_ms_equipment_data()
# await query_data()
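        # e.g. uncomment one of the loaders above, or rebuild a single asset with:
        # await query_data(target_assetnum="A27860")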
print("insert_actual_data.py is called")
asyncio.run(main())