You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

388 lines
17 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from psycopg2.extras import DictCursor
import numpy_financial as npf
import json
import sys
import os
from src.modules.config import get_connection
import argparse
class Eac:
    """Equivalent Annual Cost (EAC) calculation for a single asset.

    For every yearly row of ``lcc_equipment_tr_data`` the class computes the
    discounted cumulative maintenance cost (stored as ``eac_npv``), its
    annualised payment, the annualised acquisition and disposal costs and the
    resulting EAC, and writes them back to that table. The row with the
    lowest EAC is then recorded in ``lcc_ms_equipment_data``.
    """

    # Fraction of the acquisition cost (the seq = 0 row) that is used as the
    # future value when annualising the disposal cost.
    _DISPOSAL_COST_RATIO = 0.05

    # Rates from lcc_ms_master (stored as percentages, hence "/ 100") plus the
    # acquisition cost of the asset (its seq = 0 row).
    _RATES_SQL = """
        select
        (SELECT value_num / 100 FROM lcc_ms_master WHERE name = 'inflation_rate') as inflation_rate
        , (SELECT value_num / 100 FROM lcc_ms_master WHERE name = 'discount_rate') as discount_rate
        , (select COALESCE(rc_total_cost,0) from lcc_equipment_tr_data ltd where assetnum = %s and seq = 0) as rc_total_cost_0
        , (SELECT value_num / 100 FROM lcc_ms_master WHERE name = 'history_inflation_rate') as history_inflation_rate
        , (SELECT value_num / 100 FROM lcc_ms_master WHERE name = 'history_future_inflation_rate_annual') as history_future_inflation_rate_annual
        ;
    """

    # Rows flagged as actual, excluding the acquisition row (seq = 0).
    _ACTUAL_ROWS_SQL = """
        SELECT assetnum, tahun, seq, is_actual, rc_total_cost
        FROM lcc_equipment_tr_data
        WHERE is_actual = 1 AND seq != 0
        AND assetnum = %s
        ORDER BY seq;
    """

    # Rows flagged as projection (is_actual = 0).
    _PROJECTED_ROWS_SQL = """
        SELECT assetnum, tahun, seq, is_actual, rc_total_cost
        FROM lcc_equipment_tr_data
        WHERE assetnum = %s AND is_actual = 0
        ORDER BY seq;
    """

    # Per-year write-back of the computed figures.
    _UPDATE_ROW_SQL = """
        UPDATE lcc_equipment_tr_data
        SET eac_npv = %s, eac_annual_mnt_cost = %s, eac_annual_acq_cost = %s, eac_disposal_cost = %s, eac_eac = %s
        , updated_by = 'Sys', updated_at = NOW()
        WHERE assetnum = %s AND tahun = %s;
    """

    # Write-back of the minimum-EAC record for the asset.
    _UPDATE_MINIMUM_SQL = """
        UPDATE lcc_ms_equipment_data
        SET minimum_eac_seq = %s, minimum_eac_year=%s, minimum_eac=%s, minimum_npv=%s, minimum_pmt=%s, minimum_pmt_aq_cost=%s, minimum_is_actual=%s, updated_by = 'Sys', updated_at = NOW()
        WHERE assetnum = %s;
    """

    def __init__(self):
        pass

    @staticmethod
    def _as_float(value):
        """Return *value* converted to ``float``; ``None`` is passed through."""
        return None if value is None else float(value)

    @staticmethod
    def _close_quietly(*resources):
        """Close every non-``None`` resource, reporting (not raising) failures."""
        for resource in resources:
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as exc:
                print("Failed to close database resource:", str(exc))

    @staticmethod
    def _discounted_sum(costs, rate, offset=0):
        """Return the discounted sum of *costs*.

        NPV = sum(C_t / (1 + r)^t), where the i-th cost (0-based) is
        discounted over ``offset + i + 1`` periods.
        """
        return sum(
            float(cost) / ((1 + rate) ** (offset + i + 1))
            for i, cost in enumerate(costs)
        )

    @classmethod
    def _evaluate_row(cls, row, npv, mnt_rate, mnt_periods, disc_rate,
                      acquisition_cost, is_actual):
        """Annualise the costs of one yearly row.

        All payments use PMT = PV * [r(1 + r)^n] / [(1 + r)^n - 1] via
        ``npf.pmt``; the sign is flipped so costs are reported as positive
        numbers.

        Args:
            row: database row providing ``seq`` and ``tahun``.
            npv: discounted cumulative maintenance cost up to this row.
            mnt_rate: rate used to annualise ``npv``.
            mnt_periods: number of periods used to annualise ``npv``.
            disc_rate: discount rate for acquisition and disposal costs.
            acquisition_cost: cost of the seq = 0 row.
            is_actual: 1 for actual rows, 0 for projected rows.

        Returns:
            ``(result, disposal_cost)`` where ``result`` is the per-row
            dictionary returned to callers. The disposal cost is stored in
            the table but is not part of the EAC.
        """
        seq = row["seq"]
        pmt_mnt_cost = -float(npf.pmt(mnt_rate, mnt_periods, npv))

        # Disposal: payment that accumulates to a future value of
        # _DISPOSAL_COST_RATIO * acquisition cost after `seq` periods (PV = 0).
        if seq > 0:
            disposal_cost = -float(
                npf.pmt(disc_rate, seq, 0, cls._DISPOSAL_COST_RATIO * acquisition_cost)
            )
        else:
            disposal_cost = 0.0

        # Acquisition: PV = acquisition cost spread over `seq` periods.
        # NOTE(review): unlike the disposal cost this is not guarded against
        # seq <= 0; confirm that projected rows always have seq > 0.
        pmt_aq_cost = -float(npf.pmt(disc_rate, seq, acquisition_cost))

        result = {
            "seq": seq,
            "year": row["tahun"],
            "npv": float(npv),
            "pmt": pmt_mnt_cost,
            "pmt_aq_cost": pmt_aq_cost,
            "eac": pmt_mnt_cost + pmt_aq_cost,
            "is_actual": is_actual,
        }
        return result, disposal_cost

    def _store_row(self, cursor, equipment_id, result, disposal_cost):
        """Write the figures of one evaluated row to lcc_equipment_tr_data."""
        cursor.execute(
            self._UPDATE_ROW_SQL,
            (
                result["npv"],
                result["pmt"],
                result["pmt_aq_cost"],
                disposal_cost,
                result["eac"],
                equipment_id,
                result["year"],
            ),
        )

    @staticmethod
    def _select_lowest_eac(results):
        """Pick the record to store as the asset's minimum EAC.

        Preference order: the smallest positive EAC; otherwise a record whose
        EAC is exactly 0 (smallest NPV, for determinism); otherwise the
        overall minimum. An all-zero placeholder is returned when there are
        no results at all.
        """
        if not results:
            return {
                "seq": None,
                "year": None,
                "eac": 0.0,
                "npv": 0.0,
                "pmt": 0.0,
                "pmt_aq_cost": 0.0,
                "is_actual": 0,
            }
        positives = [r for r in results if float(r.get("eac", 0)) > 0]
        if positives:
            return min(positives, key=lambda r: float(r["eac"]))
        zeros = [r for r in results if float(r.get("eac", 0)) == 0]
        if zeros:
            return min(zeros, key=lambda r: float(r.get("npv", 0)))
        return min(results, key=lambda r: float(r.get("eac", 0)))

    def __calculate_npv_with_db_inflation_rate(self, equipment_id):
        """Compute and persist NPV / PMT / EAC for every row of one asset.

        Actual rows are discounted with ``history_inflation_rate`` and
        annualised with ``history_future_inflation_rate_annual``; projected
        rows continue from the last actual row and use ``discount_rate`` for
        both. Updates are committed once after the actual rows and once after
        the projected rows.

        Args:
            equipment_id: asset number (``assetnum``) to process.

        Returns:
            A list of per-row dictionaries (``seq``, ``year``, ``npv``,
            ``pmt``, ``pmt_aq_cost``, ``eac``, ``is_actual``), or ``None``
            when the connection or the rates are unavailable or an error
            occurs. Errors are printed, not raised.
        """
        connection = None
        cursor = None
        try:
            # get_connection() may return either a connection or a tuple
            # whose first element is the connection.
            connections = get_connection()
            connection = (
                connections[0] if isinstance(connections, tuple) else connections
            )
            if connection is None:
                print("Database connection failed.")
                return None

            cursor = connection.cursor(cursor_factory=DictCursor)
            cursor.execute(self._RATES_SQL, (equipment_id,))
            rates = cursor.fetchone()
            if not rates:
                print("Inflation rate tidak ditemukan.")
                return None

            # Convert once at the edge so both loops work on plain floats.
            history_inflation_rate = self._as_float(rates["history_inflation_rate"])
            history_future_inflation_rate = self._as_float(
                rates["history_future_inflation_rate_annual"]
            )
            disc_rate = self._as_float(rates["discount_rate"])
            rc_total_cost_0 = self._as_float(rates["rc_total_cost_0"])

            npv_results = []
            last_seq = 0
            last_npv = 0.0

            # --- Actual rows -------------------------------------------------
            cursor.execute(self._ACTUAL_ROWS_SQL, (equipment_id,))
            actual_costs = []  # costs of all actual rows seen so far
            for row in cursor.fetchall():
                actual_costs.append(row["rc_total_cost"])
                npv = self._discounted_sum(actual_costs, history_inflation_rate)
                result, disposal_cost = self._evaluate_row(
                    row,
                    npv,
                    history_future_inflation_rate,
                    row["seq"],
                    disc_rate,
                    rc_total_cost_0,
                    is_actual=1,
                )
                npv_results.append(result)
                self._store_row(cursor, equipment_id, result, disposal_cost)
                last_seq = row["seq"]
                last_npv = result["npv"]
            connection.commit()

            # --- Projected rows ----------------------------------------------
            # Projected costs are discounted with a time offset of last_seq so
            # the exponent continues from the last actual row, and are added
            # to the NPV of that row.
            cursor.execute(self._PROJECTED_ROWS_SQL, (equipment_id,))
            projected_costs = []  # costs of all projected rows seen so far
            for row in cursor.fetchall():
                projected_costs.append(row["rc_total_cost"])
                npv = last_npv + self._discounted_sum(
                    projected_costs, disc_rate, offset=last_seq
                )
                # Annualise over the full seq (periods since acquisition), not
                # only the periods since the last actual row: the latter would
                # spread the whole present value over very few periods for the
                # first projections and produce a spike.
                seq = int(row["seq"])
                periods = seq if seq > 0 else 1
                result, disposal_cost = self._evaluate_row(
                    row,
                    npv,
                    disc_rate,
                    periods,
                    disc_rate,
                    rc_total_cost_0,
                    is_actual=0,
                )
                npv_results.append(result)
                self._store_row(cursor, equipment_id, result, disposal_cost)
            connection.commit()

            return npv_results
        except Exception as e:
            print("Terjadi kesalahan:", str(e))
            return None
        finally:
            # Release the cursor and connection on every path, including the
            # early returns and errors above.
            self._close_quietly(cursor, connection)

    # ======================================================================================================================================================
    def hitung_eac_equipment(self, assetnum):
        """Recompute the EAC rows of *assetnum* and store its minimum-EAC row.

        Args:
            assetnum: asset number to process.

        Returns:
            The selected minimum-EAC record (an all-zero placeholder when no
            rows could be computed), or ``None`` when the connection is
            unavailable or an error occurs. Errors are printed, not raised.
        """
        connection = None
        cursor = None
        try:
            connections = get_connection()
            connection = (
                connections[0] if isinstance(connections, tuple) else connections
            )
            if connection is None:
                print("Database connection failed.")
                return None
            cursor = connection.cursor(cursor_factory=DictCursor)

            rslt = self.__calculate_npv_with_db_inflation_rate(assetnum)
            lowest_eac_record = self._select_lowest_eac(rslt)
            print(json.dumps(lowest_eac_record))

            cursor.execute(
                self._UPDATE_MINIMUM_SQL,
                (
                    lowest_eac_record["seq"],
                    lowest_eac_record["year"],
                    float(lowest_eac_record["eac"]),
                    float(lowest_eac_record["npv"]),
                    float(lowest_eac_record["pmt"]),
                    float(lowest_eac_record["pmt_aq_cost"]),
                    lowest_eac_record["is_actual"],
                    assetnum,
                ),
            )
            connection.commit()
            return lowest_eac_record
        except Exception as e:
            print("Terjadi kesalahan saat memproses semua equipment:", str(e))
            return None
        finally:
            self._close_quietly(cursor, connection)
def main(assetnum=None):
    """Run the EAC calculation for one asset or for all equipment.

    Args:
        assetnum: asset number to process; when falsy, every asset listed in
            ``ms_equipment_master`` is processed.

    Returns:
        List of asset numbers that were handed to
        ``Eac.hitung_eac_equipment``. An asset is listed once that call
        returns, whatever it returned.

    Raises:
        RuntimeError: if the database connection cannot be established.
        Exception: any error raised while listing or processing assets is
            printed and re-raised.
    """
    # get_connection() may return either a connection or a tuple whose first
    # element is the connection.
    connections = get_connection()
    connection = connections[0] if isinstance(connections, tuple) else connections
    if connection is None:
        raise RuntimeError("Database connection failed.")

    cursor = None
    processed = []
    try:
        # Created inside the try block so the connection is still closed by
        # the finally clause if cursor creation itself fails.
        cursor = connection.cursor(cursor_factory=DictCursor)
        if assetnum:
            query_main = "SELECT DISTINCT(assetnum) FROM ms_equipment_master WHERE assetnum = %s"
            cursor.execute(query_main, (assetnum,))
        else:
            query_main = "SELECT DISTINCT(assetnum) FROM ms_equipment_master"
            cursor.execute(query_main)
        results = cursor.fetchall()

        eac = Eac()
        for row in results:
            # Rows are normally addressable by column name; fall back to
            # positional access for rows that are not.
            try:
                row_assetnum = row["assetnum"]
            except (LookupError, TypeError):
                row_assetnum = row[0] if len(row) > 0 else None
            if row_assetnum is None:
                print("Skipping None assetnum")
                continue
            print(f"EAC Calculation asset: {row_assetnum}")
            eac.hitung_eac_equipment(row_assetnum)
            processed.append(row_assetnum)

        print("EAC calculation finished for processed equipment.")
        return processed
    except Exception as e:
        print("Terjadi kesalahan saat memproses semua equipment:", str(e))
        raise
    finally:
        # Best-effort cleanup: a failure to close must not mask the result or
        # the original error.
        if cursor is not None:
            try:
                cursor.close()
            except Exception:
                pass
        try:
            connection.close()
        except Exception:
            pass
if __name__ == "__main__":
    # Command-line entry point: `--assetnum` restricts the run to one asset;
    # without it every asset is processed.
    parser = argparse.ArgumentParser(description="Run EAC calculation for equipment.")
    parser.add_argument(
        "--assetnum",
        type=str,
        help="Asset number to process (optional). If not provided, process all.",
    )
    args = parser.parse_args()
    try:
        processed = main(assetnum=args.assetnum)
    except Exception as e:
        print("Terjadi kesalahan saat memproses semua equipment:", str(e))
        # Exit with a non-zero status so shells and schedulers can detect the
        # failed run instead of seeing a successful exit.
        sys.exit(1)
    else:
        print(f"EAC calculation finished for: {processed}")