feat: Implement historical data archiving for equipment and transaction records when the acquisition year is updated.

rest-api
MrWaradana 1 month ago
parent e1db01f9a8
commit 1e27adfa31

@ -513,109 +513,18 @@ async def generate_transaction(
prediction = await predict_main(assetnum=data_in.assetnum, token=token)
eac = eac_main(assetnum=data_in.assetnum)
# # Fetch data from external API
# async def fetch_api_data(assetnum: str, year: int) -> dict:
# async with httpx.AsyncClient() as client:
# try:
# response = await client.get(
# f"{os.environ.get('RELIABILITY_APP_URL')}/main/number-of-failures/{assetnum}/{year}/{year}",
# timeout=30.0,
# headers={"Authorization": f"Bearer {token}"},
# )
# response.raise_for_status()
# return response.json()
# except httpx.HTTPError as e:
# print(f"HTTP error occurred: {e}")
# return {}
# # Initialize base transaction with default values
# base_transaction = {
# "assetnum": data_in.assetnum,
# "is_actual": 0,
# "raw_cm_interval": None,
# "raw_cm_material_cost": None,
# "raw_cm_labor_time": None,
# "raw_cm_labor_human": None,
# "raw_pm_interval": None,
# "raw_pm_material_cost": None,
# "raw_pm_labor_time": None,
# "raw_pm_labor_human": None,
# "raw_predictive_labor_time": None,
# "raw_predictive_labor_human": None,
# "raw_oh_material_cost": None,
# "raw_oh_labor_time": None,
# "raw_oh_labor_human": None,
# "raw_project_task_material_cost": None,
# "raw_loss_output_MW": None,
# "raw_loss_output_price": None,
# "raw_operational_cost": None,
# "raw_maintenance_cost": None,
# "rc_cm_material_cost": None,
# "rc_cm_labor_cost": None,
# "rc_pm_material_cost": None,
# "rc_pm_labor_cost": None,
# "rc_predictive_labor_cost": None,
# "rc_oh_material_cost": None,
# "rc_oh_labor_cost": None,
# "rc_project_material_cost": None,
# "rc_lost_cost": None,
# "rc_operation_cost": None,
# "rc_maintenance_cost": None,
# "rc_total_cost": None,
# "eac_npv": None,
# "eac_annual_mnt_cost": None,
# "eac_annual_acq_cost": None,
# "eac_eac": None,
# }
# transactions = []
# # Query existing records with is_actual=1
# actual_years_query = (
# Select(EquipmentTransactionRecords.tahun)
# .filter(EquipmentTransactionRecords.assetnum == data_in.assetnum)
# .filter(EquipmentTransactionRecords.is_actual == 1)
# )
# result = await db_session.execute(actual_years_query)
# actual_years = {row[0] for row in result.all()}
# for sequence, year in enumerate(
# range(data_in.acquisition_year - 1, data_in.forecasting_target_year + 1), 0
# ):
# # Skip if year already has actual data
# if year in actual_years:
# continue
# transaction = base_transaction.copy()
# # Update values from API
# api_data = await fetch_api_data(data_in.assetnum, year)
# if api_data:
# # # Get current num_fail
# current_num_fail = api_data["data"][0]["num_fail"]
# # # Calculate sum of previous failures for this asset
# # previous_failures_query = (
# # Select(func.sum(EquipmentTransactionRecords.raw_cm_interval))
# # .filter(EquipmentTransactionRecords.assetnum == data_in.assetnum)
# # .filter(EquipmentTransactionRecords.tahun < year)
# # )
# # previous_failures_result = await db_session.execute(previous_failures_query)
# # previous_failures_sum = previous_failures_result.scalar() or 0
# # # Update with current minus sum of previous
# # transaction.update({"raw_cm_interval": current_num_fail - previous_failures_sum})
# transaction.update({"raw_cm_interval": current_num_fail})
# transaction.update({"tahun": int(year), "seq": int(sequence)})
# transactions.append(EquipmentTransactionRecords(**transaction))
# db_session.add_all(transactions)
# await db_session.commit()
# # Return the number of transactions created
# return len(transactions)
return prediction, eac
async def recalculate_transaction(
*, db_session: DbSession, data_in: EquipmentCreate, token
):
"""Recalculate transaction for equipment."""
insert_data = await insert_actual_data(db_session=db_session, data_in=data_in)
prediction = await predict_main(assetnum=data_in.assetnum, token=token)
eac = eac_main(assetnum=data_in.assetnum)
return insert_data, prediction, eac
async def create(*, db_session: DbSession, equipment_in: EquipmentCreate, token):
"""Creates a new document."""
@ -710,7 +619,13 @@ async def update(
create_data = {k: v for k, v in updated_data.items() if k in create_fields}
try:
equipment_update = EquipmentUpdate(**update_data)
await generate_transaction(db_session=db_session, data_in=equipment_update, token=token)
# Check if critical parameters changed
if "acquisition_year" in data or "forecasting_start_year" in data:
print(f"Critical parameter change detected for {equipment_update.assetnum}. Running full recalculation.")
await recalculate_transaction(db_session=db_session, data_in=equipment_update, token=token)
else:
await generate_transaction(db_session=db_session, data_in=equipment_update, token=token)
except Exception as e:
# don't break the update if resimulation fails — log/print for visibility
print(f"Resimulation failed for assetnum {updated_data.get('assetnum')}: {e}")

@ -702,92 +702,6 @@ class Prediksi:
if connection:
connection.close()
async def __update_equipment_acquisition_year(self, assetnum: str):
"""
Update acquisition_year from wo_maximo and set default forecasting_target_year
using raw SQL connections.
"""
try:
# 1. Fetch first acquisition year from wo_maximo (production db)
prod_conn = get_production_connection()
if not prod_conn:
print("Failed to connect to production DB for acquisition year update.")
return None
first_year = None
try:
p_cursor = prod_conn.cursor()
query = """
select DATE_PART('year', a.reportdate) AS year
from wo_maximo a
where a.asset_replacecost > 0
and a.asset_assetnum = %s
order by a.reportdate asc
limit 1;
"""
p_cursor.execute(query, (assetnum,))
res = p_cursor.fetchone()
if res and res[0] is not None:
first_year = int(res[0])
p_cursor.close()
finally:
prod_conn.close()
if not first_year:
# No data, skip update
return None
# 2. Update local DB
conn, _ = get_connection()
if not conn:
return None
updated_acq = None
try:
cursor = conn.cursor(cursor_factory=DictCursor)
# Fetch current values
cursor.execute("SELECT acquisition_year, design_life, forecasting_target_year FROM lcc_ms_equipment_data WHERE assetnum = %s", (assetnum,))
current = cursor.fetchone()
if current:
curr_acq = int(current["acquisition_year"])
curr_life = int(current["design_life"])
curr_target = int(current["forecasting_target_year"])
# Logic: if current_target matches the "old default" (old_acq + life), then update it too.
is_valid_default = (curr_target == (curr_acq + curr_life))
if curr_acq != first_year:
new_target = curr_target
if is_valid_default:
new_target = first_year + curr_life
update_q = """
UPDATE lcc_ms_equipment_data
SET acquisition_year = %s, forecasting_target_year = %s
WHERE assetnum = %s
"""
cursor.execute(update_q, (first_year, new_target, assetnum))
conn.commit()
print(f"Updated acquisition_year for {assetnum}: {curr_acq} -> {first_year}")
updated_acq = first_year
else:
updated_acq = curr_acq
else:
# Logic if equipment not found? Unlikely here.
pass
cursor.close()
finally:
conn.close()
return updated_acq
except Exception as e:
print(f"Error updating acquisition year for {assetnum}: {e}")
return None
async def predict_equipment_data(self, assetnum, token):
try:
# Mengambil data dari database

@ -862,6 +862,178 @@ async def insert_acquisition_cost_data():
except Exception:
pass
async def update_equipment_acquisition_year(assetnum: str):
"""
Update acquisition_year from wo_maximo and set default forecasting_target_year
using raw SQL connections.
Refactored to archive historical data if acquisition year changes.
"""
try:
# 1. Fetch first acquisition year from wo_maximo (production db)
prod_conn = get_production_connection()
if not prod_conn:
print("Failed to connect to production DB for acquisition year update.")
return None
first_year = None
try:
p_cursor = prod_conn.cursor()
query = """
select DATE_PART('year', a.reportdate) AS year
from wo_maximo a
where a.asset_replacecost > 0
and a.asset_assetnum = %s
order by a.reportdate asc
limit 1;
"""
p_cursor.execute(query, (assetnum,))
res = p_cursor.fetchone()
if res and res[0] is not None:
first_year = int(res[0])
p_cursor.close()
finally:
prod_conn.close()
if not first_year:
# No data, skip update
return None
# 2. Update local DB
conn, _ = get_connection()
if not conn:
return None
updated_acq = None
try:
cursor = conn.cursor(cursor_factory=DictCursor)
# Fetch current values
cursor.execute("SELECT * FROM lcc_ms_equipment_data WHERE assetnum = %s", (assetnum,))
current = cursor.fetchone()
if current:
curr_acq = int(current["acquisition_year"])
curr_life = int(current["design_life"])
curr_target_val = current["forecasting_target_year"]
curr_target = int(curr_target_val) if curr_target_val is not None else (curr_acq + curr_life) # Fallback if none
# Logic: if current_target matches the "old default" (old_acq + life), then update it too.
is_valid_default = (curr_target == (curr_acq + curr_life))
if curr_acq != first_year:
print(f"Acquisition year change detected for {assetnum}: {curr_acq} -> {first_year}. Archiving history.")
# Define reference for history
acq_year_ref = f"{curr_acq}_{curr_target}"
# --- ARCHIVE HISTORICAL DATA ---
# 1. Copy old equipment master data to history
history_ms_query = """
INSERT INTO lcc_ms_equipment_historical_data (
id, assetnum, acquisition_year, acquisition_cost, capital_cost_record_time, design_life,
forecasting_start_year, forecasting_target_year, manhours_rate, created_at, created_by,
updated_at, updated_by, min_eac_info, harga_saat_ini, minimum_eac_seq, minimum_eac_year,
minimum_eac, minimum_npv, minimum_pmt, minimum_pmt_aq_cost, minimum_is_actual,
efdh_equivalent_forced_derated_hours, foh_forced_outage_hours, category_no, proportion,
acquisition_year_ref
)
SELECT
uuid_generate_v4(), assetnum, acquisition_year, acquisition_cost, capital_cost_record_time, design_life,
forecasting_start_year, forecasting_target_year, manhours_rate, created_at, created_by,
updated_at, updated_by, min_eac_info, harga_saat_ini, minimum_eac_seq, minimum_eac_year,
minimum_eac, minimum_npv, minimum_pmt, minimum_pmt_aq_cost, minimum_is_actual,
efdh_equivalent_forced_derated_hours, foh_forced_outage_hours, category_no, proportion,
%s
FROM lcc_ms_equipment_data
WHERE assetnum = %s
"""
cursor.execute(history_ms_query, (acq_year_ref, assetnum))
# 2. Copy old transaction data to lcc_equipment_historical_tr_data
# Format: {acquisition_year}_{forecasting_target_year}
history_tr_query = """
INSERT INTO lcc_equipment_historical_tr_data (
id, assetnum, tahun, seq, is_actual,
raw_cm_interval, raw_cm_material_cost, raw_cm_labor_time, raw_cm_labor_human,
raw_pm_interval, raw_pm_material_cost, raw_pm_labor_time, raw_pm_labor_human,
raw_oh_interval, raw_oh_material_cost, raw_oh_labor_time, raw_oh_labor_human,
raw_predictive_interval, raw_predictive_material_cost, raw_predictive_labor_time, raw_predictive_labor_human,
raw_project_task_material_cost, raw_loss_output_MW, raw_loss_output_price,
raw_operational_cost, raw_maintenance_cost,
rc_cm_material_cost, rc_cm_labor_cost,
rc_pm_material_cost, rc_pm_labor_cost,
rc_oh_material_cost, rc_oh_labor_cost,
rc_predictive_labor_cost,
rc_project_material_cost, rc_lost_cost, rc_operation_cost, rc_maintenance_cost,
rc_total_cost,
eac_npv, eac_annual_mnt_cost, eac_annual_acq_cost, eac_disposal_cost, eac_eac,
efdh_equivalent_forced_derated_hours, foh_forced_outage_hours,
created_by, created_at, acquisition_year_ref
)
SELECT
uuid_generate_v4(), assetnum, tahun, seq, is_actual,
raw_cm_interval, raw_cm_material_cost, raw_cm_labor_time, raw_cm_labor_human,
raw_pm_interval, raw_pm_material_cost, raw_pm_labor_time, raw_pm_labor_human,
raw_oh_interval, raw_oh_material_cost, raw_oh_labor_time, raw_oh_labor_human,
raw_predictive_interval, raw_predictive_material_cost, raw_predictive_labor_time, raw_predictive_labor_human,
raw_project_task_material_cost, raw_loss_output_MW, raw_loss_output_price,
raw_operational_cost, raw_maintenance_cost,
rc_cm_material_cost, rc_cm_labor_cost,
rc_pm_material_cost, rc_pm_labor_cost,
rc_oh_material_cost, rc_oh_labor_cost,
rc_predictive_labor_cost,
rc_project_material_cost, rc_lost_cost, rc_operation_cost, rc_maintenance_cost,
rc_total_cost,
eac_npv, eac_annual_mnt_cost, eac_annual_acq_cost, eac_disposal_cost, eac_eac,
efdh_equivalent_forced_derated_hours, foh_forced_outage_hours,
created_by, NOW(), %s
FROM lcc_equipment_tr_data
WHERE assetnum = %s
"""
cursor.execute(history_tr_query, (acq_year_ref, assetnum))
# 3. Delete old data
del_query = "DELETE FROM lcc_equipment_tr_data WHERE assetnum = %s"
cursor.execute(del_query, (assetnum,))
# 4. Update Equipment Master
new_target = curr_target
if is_valid_default:
new_target = first_year + curr_life
update_q = """
UPDATE lcc_ms_equipment_data
SET acquisition_year = %s, forecasting_target_year = %s
WHERE assetnum = %s
"""
cursor.execute(update_q, (first_year, new_target, assetnum))
conn.commit()
print(f"Updated acquisition_year for {assetnum}: {curr_acq} -> {first_year}. History archived.")
updated_acq = first_year
else:
print(f"No acquisition year update needed for {assetnum}. Current: {curr_acq}, New: {first_year}")
updated_acq = curr_acq
else:
# Logic if equipment not found? Unlikely here.
pass
cursor.close()
finally:
conn.close()
return updated_acq
except Exception as e:
print(f"Error updating acquisition year for {assetnum}: {e}")
return None
async def query_data(target_assetnum: str = None):
connection = None
connection_wo_db = None
@ -874,6 +1046,7 @@ async def query_data(target_assetnum: str = None):
print("Database connection failed.")
return
# Membuat cursor menggunakan DictCursor
cursor = connection.cursor(cursor_factory=DictCursor)
cursor_wo = connection_production_wo.cursor(cursor_factory=DictCursor)
@ -973,6 +1146,11 @@ async def query_data(target_assetnum: str = None):
print(f"[{idx}/{total_assets}] Skipping empty assetnum")
continue
# Check if there is acquisition year that need to be updated because of new equipment replacement
if assetnum:
await update_equipment_acquisition_year(assetnum)
forecasting_start_year_db = row.get("forecasting_start_year")
acquisition_year = row.get("acquisition_year")

Loading…
Cancel
Save