simulation service

main
MrWaradana 1 month ago
parent 2926794fbe
commit e0feb43459

@ -9,6 +9,7 @@ from src.auth.service import JWTBearer
from src.masterdata.router import router as masterdata_router
from src.masterdata_simulations.router import router as masterdata_simulations_router
from src.plant_masterdata.router import router as plant_masterdata
from src.plant_transaction_data.router import router as plant_transaction_data
from src.plant_transaction_data_simulations.router import router as plant_transaction_data_simulations
@ -17,6 +18,7 @@ from src.acquisition_cost.router import router as acquisition_data_router
from src.yeardata.router import router as yeardata_router
from src.equipment_master.router import router as equipment_master_router
from src.uploaded_file.router import router as uploaded_file_router
from src.simulations.router import router as simulations_router
class ErrorMessage(BaseModel):
@ -57,6 +59,18 @@ authenticated_api_router.include_router(
masterdata_router, prefix="/masterdata", tags=["masterdata"]
)
authenticated_api_router.include_router(
masterdata_simulations_router,
prefix="/masterdata-simulations",
tags=["masterdata_simulations"],
)
authenticated_api_router.include_router(
simulations_router,
prefix="/simulations",
tags=["simulations"],
)
authenticated_api_router.include_router(
plant_masterdata, prefix="/plant-masterdata", tags=["plant_masterdata"]
)

@ -0,0 +1,3 @@
from .router import router
__all__ = ["router"]

@ -0,0 +1,26 @@
from sqlalchemy import Column, Float, ForeignKey, Integer, String
from sqlalchemy.dialects.postgresql import UUID as PGUUID
from sqlalchemy.orm import relationship
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin
class MasterDataSimulation(Base, DefaultMixin, IdentityMixin):
__tablename__ = "lcc_ms_master_simulations"
simulation_id = Column(
PGUUID(as_uuid=True), ForeignKey("lcc_simulations.id"), nullable=False
)
name = Column(String, nullable=True)
description = Column(String, nullable=True)
unit_of_measurement = Column(String, nullable=True)
value_num = Column(Float, nullable=True)
value_str = Column(String, nullable=True)
seq = Column(Integer, nullable=True)
simulation = relationship(
"Simulation",
back_populates="masterdata_entries",
lazy="joined",
)

@ -0,0 +1,147 @@
from typing import List, Optional
from uuid import UUID
from fastapi import APIRouter, HTTPException, Query, status
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters
from src.masterdata_simulations.schema import (
BulkMasterDataSimulationUpdate,
MasterDataSimulationCreate,
MasterDataSimulationPagination,
MasterDataSimulationRead,
MasterDataSimulationUpdate,
)
from src.masterdata_simulations.service import (
bulk_update,
create,
delete,
get,
get_all,
update,
)
from src.models import StandardResponse
router = APIRouter()
@router.get("", response_model=StandardResponse[MasterDataSimulationPagination])
async def get_masterdata_simulations(
db_session: DbSession,
common: CommonParameters,
simulation_id: UUID = Query(..., description="Simulation identifier"),
items_per_page: Optional[int] = Query(5),
search: Optional[str] = Query(None),
):
master_datas = await get_all(
db_session=db_session,
items_per_page=items_per_page,
simulation_id=simulation_id,
search=search,
common=common,
)
return StandardResponse(data=master_datas, message="Data retrieved successfully")
@router.get("/{masterdata_id}", response_model=StandardResponse[MasterDataSimulationRead])
async def get_masterdata_simulation(db_session: DbSession, masterdata_id: str):
masterdata = await get(db_session=db_session, masterdata_id=masterdata_id)
if not masterdata:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(data=masterdata, message="Data retrieved successfully")
@router.post("", response_model=StandardResponse[MasterDataSimulationRead])
async def create_masterdata_simulation(
db_session: DbSession,
masterdata_in: MasterDataSimulationCreate,
current_user: CurrentUser,
):
masterdata_in.created_by = current_user.name
masterdata = await create(db_session=db_session, masterdata_in=masterdata_in)
return StandardResponse(data=masterdata, message="Data created successfully")
@router.put("/bulk", response_model=StandardResponse[List[MasterDataSimulationRead]])
async def bulk_update_masterdata_simulation(
db_session: DbSession,
data: BulkMasterDataSimulationUpdate,
current_user: CurrentUser,
):
updates: List[MasterDataSimulationUpdate] = []
ids: List[str] = []
for item in data.updates:
payload = item.copy()
masterdata_id = payload.pop("id", None)
if masterdata_id is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Each update entry must include an id field.",
)
masterdata_id = str(masterdata_id)
update_obj = MasterDataSimulationUpdate(**payload, updated_by=current_user.name)
updates.append(update_obj)
ids.append(masterdata_id)
if not ids:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No update payload supplied.",
)
return StandardResponse(
data=await bulk_update(
db_session=db_session,
updates=updates,
ids=ids,
simulation_id=data.simulation_id,
),
message="Data updated successfully",
)
@router.put("/{masterdata_id}", response_model=StandardResponse[MasterDataSimulationRead])
async def update_masterdata_simulation(
db_session: DbSession,
masterdata_id: str,
masterdata_in: MasterDataSimulationUpdate,
current_user: CurrentUser,
):
masterdata = await get(db_session=db_session, masterdata_id=masterdata_id)
if not masterdata:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
masterdata_in.updated_by = current_user.name
return StandardResponse(
data=await update(
db_session=db_session,
masterdata=masterdata,
masterdata_in=masterdata_in,
),
message="Data updated successfully",
)
@router.delete("/{masterdata_id}", response_model=StandardResponse[MasterDataSimulationRead])
async def delete_masterdata_simulation(db_session: DbSession, masterdata_id: str):
masterdata = await get(db_session=db_session, masterdata_id=masterdata_id)
if not masterdata:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=[{"msg": "A data with this id does not exist."}],
)
await delete(db_session=db_session, masterdata_id=masterdata_id)
return StandardResponse(message="Data deleted successfully", data=masterdata)

@ -0,0 +1,40 @@
from typing import List, Optional
from uuid import UUID
from pydantic import Field
from src.masterdata.schema import MasterdataBase
from src.models import DefaultBase, Pagination
class MasterDataSimulationBase(MasterdataBase):
simulation_id: Optional[UUID] = Field(None, nullable=True)
class MasterDataSimulationCreate(MasterDataSimulationBase):
simulation_id: UUID = Field(..., nullable=False)
name: str = Field(..., nullable=True)
description: str = Field(..., nullable=True)
unit_of_measurement: str = Field(..., nullable=True)
value_num: float = Field(
..., nullable=True, le=1_000_000_000_000_000
)
value_str: str = Field(..., nullable=True)
seq: int = Field(..., nullable=True)
class MasterDataSimulationUpdate(MasterDataSimulationBase):
pass
class BulkMasterDataSimulationUpdate(DefaultBase):
simulation_id: UUID = Field(..., nullable=False)
updates: List[dict]
class MasterDataSimulationRead(MasterDataSimulationBase):
id: UUID
class MasterDataSimulationPagination(Pagination):
items: List[MasterDataSimulationRead] = []

@ -0,0 +1,300 @@
import asyncio
import os
from subprocess import PIPE
from typing import Dict, List, Optional
from uuid import UUID
from sqlalchemy import Delete, Select
from src.database.core import DbSession
from src.database.service import search_filter_sort_paginate
from .model import MasterDataSimulation
from .schema import MasterDataSimulationCreate, MasterDataSimulationUpdate
MASTERDATA_SIM_ATTR_FIELDS = {
"name",
"description",
"unit_of_measurement",
"value_num",
"value_str",
"created_by",
"updated_by",
}
async def _apply_masterdata_simulation_update_logic(
*,
db_session: DbSession,
masterdata: MasterDataSimulation,
masterdata_in: MasterDataSimulationUpdate,
records_by_name: Dict[str, MasterDataSimulation],
simulation_id: UUID,
):
"""Mirror the update behaviour from src.masterdata.service for simulations."""
update_data = masterdata_in.model_dump(exclude_defaults=True)
async def get_value(name: str) -> float:
record = records_by_name.get(name)
if record is not None and record.value_num is not None:
return record.value_num
query_val = (
Select(MasterDataSimulation)
.where(MasterDataSimulation.simulation_id == simulation_id)
.where(MasterDataSimulation.name == name)
)
res_val = await db_session.execute(query_val)
row = res_val.scalars().one_or_none()
if row:
records_by_name[row.name] = row
return row.value_num if row.value_num is not None else 0
return 0
run_plant_calculation = False
def flag_special(record: MasterDataSimulation):
"""Track when special masterdata rows change to trigger recalculation."""
nonlocal run_plant_calculation
rec_name = getattr(record, "name", None)
if rec_name in [
"umur_teknis",
"discount_rate",
"loan_portion",
"interest_rate",
"loan_tenor",
"corporate_tax_rate",
"wacc_on_equity",
"auxiliary",
"susut_trafo",
"sfc",
"electricity_price_a",
"electricity_price_b",
"electricity_price_c",
"electricity_price_d",
"harga_bahan_bakar",
"inflation_rate",
"loan",
"wacc_on_project",
"principal_interest_payment",
"equity",
]:
run_plant_calculation = True
for field, val in update_data.items():
if field in MASTERDATA_SIM_ATTR_FIELDS:
setattr(masterdata, field, val)
flag_special(masterdata)
else:
query_other = (
Select(MasterDataSimulation)
.where(MasterDataSimulation.simulation_id == simulation_id)
.where(MasterDataSimulation.name == field)
)
res_other = await db_session.execute(query_other)
other = res_other.scalars().one_or_none()
if other:
if isinstance(val, (int, float)):
other.value_num = val
flag_special(other)
else:
other.value_str = str(val)
if other.name:
records_by_name[other.name] = other
if "loan_portion" in update_data:
equity_portion = 100 - await get_value("loan_portion")
setattr(masterdata, "equity_portion", equity_portion)
total_project_cost = await get_value("total_project_cost")
loan = total_project_cost * (await get_value("loan_portion") / 100)
setattr(masterdata, "loan", loan)
equity = total_project_cost * (equity_portion / 100)
setattr(masterdata, "equity", equity)
if any(field in update_data for field in ["loan", "interest_rate", "loan_tenor"]):
pmt = calculate_pmt(
rate=await get_value("interest_rate"),
nper=await get_value("loan_tenor"),
pv=await get_value("loan"),
)
setattr(masterdata, "principal_interest_payment", pmt)
if any(
field in update_data
for field in [
"loan_portion",
"interest_rate",
"corporate_tax_rate",
"wacc_on_equity",
"equity_portion",
]
):
wacc = (
await get_value("loan_portion")
* (
await get_value("interest_rate")
* (1 - await get_value("corporate_tax_rate"))
)
) + (await get_value("wacc_on_equity") * await get_value("equity_portion"))
setattr(masterdata, "wacc_on_project", wacc)
return masterdata, run_plant_calculation
async def _trigger_masterdata_simulation_recalculation(
*, db_session: DbSession, run_plant_calculation_change: bool = False
):
if not run_plant_calculation_change:
return
try:
directory_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../modules/plant")
)
script_path = os.path.join(directory_path, "run2.py")
process = await asyncio.create_subprocess_exec(
"python",
script_path,
stdout=PIPE,
stderr=PIPE,
cwd=directory_path,
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
print(f"Plant recalc error: {stderr.decode()}")
else:
print(f"Plant recalc output: {stdout.decode()}")
except Exception as exc:
print(f"Error during simulation masterdata recalculation: {exc}")
def calculate_pmt(rate, nper, pv):
rate = float(rate) / 100 if rate > 1 else float(rate)
if rate == 0:
return -pv / nper
return -pv * (rate * (1 + rate) ** nper) / ((1 + rate) ** nper - 1)
async def get(
*, db_session: DbSession, masterdata_id: str
) -> Optional[MasterDataSimulation]:
query = Select(MasterDataSimulation).where(MasterDataSimulation.id == masterdata_id)
result = await db_session.execute(query)
return result.scalars().one_or_none()
async def get_all(
*,
db_session: DbSession,
items_per_page: int,
simulation_id: UUID,
search: Optional[str],
common,
):
query = (
Select(MasterDataSimulation)
.where(MasterDataSimulation.simulation_id == simulation_id)
.order_by(MasterDataSimulation.seq.asc())
)
if search:
query = query.filter(MasterDataSimulation.name.ilike(f"%{search}%"))
common["items_per_page"] = items_per_page
return await search_filter_sort_paginate(model=query, **common)
async def create(*, db_session: DbSession, masterdata_in: MasterDataSimulationCreate):
masterdata = MasterDataSimulation(**masterdata_in.model_dump())
db_session.add(masterdata)
await db_session.commit()
return masterdata
async def update(
*,
db_session: DbSession,
masterdata: MasterDataSimulation,
masterdata_in: MasterDataSimulationUpdate,
):
records_by_name: Dict[str, MasterDataSimulation] = {}
if masterdata.name:
records_by_name[masterdata.name] = masterdata
_, run_plant_calculation = await _apply_masterdata_simulation_update_logic(
db_session=db_session,
masterdata=masterdata,
masterdata_in=masterdata_in,
records_by_name=records_by_name,
simulation_id=masterdata.simulation_id,
)
await db_session.commit()
await _trigger_masterdata_simulation_recalculation(
db_session=db_session,
run_plant_calculation_change=run_plant_calculation,
)
return masterdata
async def bulk_update(
*,
db_session: DbSession,
updates: List[MasterDataSimulationUpdate],
ids: List[str],
simulation_id: UUID,
) -> List[MasterDataSimulation]:
query = (
Select(MasterDataSimulation)
.where(MasterDataSimulation.id.in_(ids))
.where(MasterDataSimulation.simulation_id == simulation_id)
)
result = await db_session.execute(query)
records = result.scalars().all()
records_map = {str(record.id): record for record in records}
records_by_name = {record.name: record for record in records if record.name}
run_plant_calculation_change = False
updated_records: List[MasterDataSimulation] = []
for masterdata_id, masterdata_in in zip(ids, updates):
masterdata = records_map.get(masterdata_id)
if not masterdata:
continue
_, run_plant_calculation = await _apply_masterdata_simulation_update_logic(
db_session=db_session,
masterdata=masterdata,
masterdata_in=masterdata_in,
records_by_name=records_by_name,
simulation_id=simulation_id,
)
if run_plant_calculation:
run_plant_calculation_change = True
updated_records.append(masterdata)
await db_session.commit()
await _trigger_masterdata_simulation_recalculation(
db_session=db_session,
run_plant_calculation_change=run_plant_calculation_change,
)
return updated_records
async def delete(*, db_session: DbSession, masterdata_id: str):
query = Delete(MasterDataSimulation).where(MasterDataSimulation.id == masterdata_id)
await db_session.execute(query)
await db_session.commit()

@ -0,0 +1,656 @@
import os
import sys
# Tambah path ke config.py (seperti di kode-kode kamu sebelumnya)
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from config import get_connection # harus mengembalikan koneksi psycopg2
from math import pow
import numpy as np
import numpy_financial as npf
import math
import uuid
SIMULATION_ID = os.getenv("PLANT_SIMULATION_ID")
MASTER_TABLE = "lcc_ms_masterq_simulations"
PLANT_TABLE = "lcc_plant_tr_data_simulations"
def validate_number(n):
return n if n is not None else 0
def normalize_db_value(value):
"""Convert numpy scalars to native Python types for psycopg2."""
if isinstance(value, np.generic):
return value.item()
return value
def cumulative_npv(values, rate, initial_cf0=0.0):
"""
Penggunaan:
discount_rate = 0.12 # setara Params!C2
cashflows = [10000, 15000, 20000, 18000]
result = cumulative_npv(cashflows, discount_rate, initial_cf0=0)
"""
cumulative_results = []
running_npv = 0.0
for i, cf in enumerate(values, start=1):
running_npv += cf / pow(1 + rate, i)
cumulative_results.append(initial_cf0 + running_npv)
return cumulative_results
def pmt_excel_style(rate, periods, pv):
"""
Fungsi ini menghasilkan nilai setara Excel:
=-PMT(rate, periods, pv)
rate : discount_rate (contoh: 0.12)
periods : jumlah periode (contoh: 1,2,3,... seperti E2)
pv : present value (contoh: E17 hasil NPV cumulative)
Output : nilai positif seperti yang muncul di Excel
"""
if periods <= 0:
return 0
# Jika rate == 0, maka PMT hanya pembagian sederhana
if rate == 0:
return pv / periods
# Rumus Excel PMT:
# PMT = pv * (rate / (1 - (1 + rate)^(-periods)))
payment = pv * (rate / (1 - pow(1 + rate, -periods)))
# Excel memberi hasil negatif, tapi rumusmu pakai -PMT, maka hasilnya positif
return abs(payment)
def hitung_pv(rate, nper, fv):
pv = npf.pv(rate, nper, pmt=0, fv=fv)
return -pv
def hitung_irr(cashflows: list):
return npf.irr(cashflows)
def main():
if not SIMULATION_ID:
print("Environment variable PLANT_SIMULATION_ID is required for simulations.")
sys.exit(1)
connections = get_connection()
conn = connections[0] if isinstance(connections, tuple) else connections
if conn is None:
print("Koneksi ke database gagal.")
sys.exit(1)
try:
# ### LOCKING: pastikan transaksi manual (non-autocommit)
try:
conn.autocommit = False
except Exception:
# Kalau driver tidak punya autocommit, abaikan
pass
cur = conn.cursor()
# ### LOCKING: kunci tabel simulasi
# Mode SHARE ROW EXCLUSIVE:
# - Menghalangi INSERT/UPDATE/DELETE di tabel ini
# - Menghalangi lock SHARE ROW EXCLUSIVE lain → script ngantri satu per satu
cur.execute(f"LOCK TABLE {PLANT_TABLE} IN SHARE ROW EXCLUSIVE MODE")
# 0 Mendapatkan master parameter dari tabel lcc_ms_master
cur.execute(
f"""
SELECT name,
value_num AS value
FROM {MASTER_TABLE}
WHERE simulation_id = %s
""",
(SIMULATION_ID,),
)
param_rows = cur.fetchall()
param_map = {name: val for (name, val) in param_rows}
# helper biar aman
def get_param(name, default=0.0):
v = param_map.get(name, default)
return float(v) if v is not None else float(default)
# 0-1 Generate New data Projection (is_actual=0) if not exist
# Hapus data projection lama (is_actual = 0)
cur.execute(
f"""
DELETE
FROM {PLANT_TABLE}
WHERE is_actual = 0
AND simulation_id = %s
""",
(SIMULATION_ID,),
)
# Hitung kebutuhan jumlah baris projection baru agar total (actual + projection)
# sama dengan parameter umur_teknis
cur.execute(
f"""
SELECT COALESCE(COUNT(*), 0)
FROM {PLANT_TABLE}
WHERE is_actual = 1
AND simulation_id = %s
""",
(SIMULATION_ID,),
)
count_actual = cur.fetchone()[0] if cur.rowcount != -1 else 0
umur_teknis = int(get_param("umur_teknis"))
proj_needed = max(0, umur_teknis - int(count_actual))
# Ambil seq dan tahun terakhir sebagai titik awal penomoran berikutnya
cur.execute(
f"SELECT COALESCE(MAX(seq), 0) FROM {PLANT_TABLE} WHERE simulation_id = %s",
(SIMULATION_ID,),
)
last_seq = int(cur.fetchone()[0])
cur.execute(
f"SELECT COALESCE(MAX(tahun), 0) FROM {PLANT_TABLE} WHERE simulation_id = %s",
(SIMULATION_ID,),
)
last_year = int(cur.fetchone()[0])
# Jika belum ada tahun sama sekali, gunakan tahun_cod-1 sebagai dasar
if last_year == 0:
try:
last_year = int(get_param("tahun_cod")) - 1
except Exception:
last_year = 0
if proj_needed > 0:
# Siapkan rows untuk INSERT projection baru
values = []
next_seq = last_seq + 1
next_year = last_year + 1
for _ in range(proj_needed):
values.append((str(uuid.uuid4()), next_seq, next_year))
next_seq += 1
next_year += 1
insert_sql = (
f"INSERT INTO {PLANT_TABLE} (id, seq, tahun, is_actual, simulation_id, created_at, created_by) "
"VALUES (%s, %s, %s, 0, %s, CURRENT_TIMESTAMP, 'SYS')"
)
sim_values = [(*val, SIMULATION_ID) for val in values]
cur.executemany(insert_sql, sim_values)
# 1. Ambil data awal
select_sql = f"""
SELECT *
FROM {PLANT_TABLE}
WHERE simulation_id = %s
ORDER BY seq \
"""
cur.execute(select_sql, (SIMULATION_ID,))
col_names = [desc[0] for desc in cur.description]
rows = cur.fetchall()
print(f"Jumlah baris yang akan di-update: {len(rows)}")
# 2. Siapkan data untuk bulk UPDATE
update_sql = f"""
UPDATE {PLANT_TABLE}
SET net_capacity_factor = %s,
eaf = %s,
production_bruto = %s,
production_netto = %s,
energy_sales = %s,
fuel_consumption = %s,
revenue_a = %s,
revenue_b = %s,
revenue_c = %s,
revenue_d = %s,
revenue_total = %s,
revenue_pv = %s,
revenue_annualized = %s,
cost_a_replacement = %s,
cost_a_pm = %s,
cost_a_acquisition = %s,
cost_a_pinjaman = %s,
cost_a_depreciation = %s,
cost_a_total = %s,
cost_a_pv = %s,
cost_a_annualized = %s,
cost_c_fuel = %s,
cost_c_pv = %s,
cost_c_annualized = %s,
cost_bd_om = %s,
cost_bd_pm_nonmi = %s,
cost_bd_bd = %s,
cost_bd_total = %s,
cost_bd_pv = %s,
cost_bd_annualized = %s,
total_expense = %s,
total_cost_eac = %s,
total_profit_loss = %s,
total_residual_value = %s,
calc_depreciation = %s,
calc_interest_payment = %s,
calc_principal_payment = %s,
calc_dept_amount = %s,
calc2_ebitda = %s,
calc2_earning_before_tax = %s,
calc2_tax = %s,
calc2_earning_after_tax = %s,
calc2_nopat = %s,
calc3_interest_after_tax = %s,
calc3_free_cash_flow_on_project = %s,
calc3_discounted_fcf_on_project = %s,
calc4_principal_repayment = %s,
calc4_free_cash_flow_on_equity = %s,
calc4_discounted_fcf_on_equity = %s,
chart_total_revenue = %s,
chart_revenue_a = %s,
chart_revenue_b = %s,
chart_revenue_c = %s,
chart_revenue_d = %s,
chart_revenue_annualized = %s,
chart_fuel_cost_component_c = %s,
chart_fuel_cost = %s,
chart_fuel_cost_annualized = %s,
chart_oem_component_bd = %s,
chart_oem_bd_cost = %s,
chart_oem_periodic_maintenance_cost = %s,
chart_oem_annualized = %s,
chart_capex_component_a = %s,
chart_capex_biaya_investasi_tambahan = %s,
chart_capex_acquisition_cost = %s,
chart_capex_annualized = %s
WHERE seq = %s
AND simulation_id = %s \
"""
# Ambil parameter dari tabel (fungsi get_param sudah kamu buat sebelumnya)
discount_rate = get_param("discount_rate") / 100
total_project_cost = get_param("total_project_cost")
daya_mampu_netto = get_param("daya_mampu_netto")
auxiliary = get_param("auxiliary")
susut_trafo = get_param("susut_trafo")
sfc = get_param("sfc")
# Harga listrik berdasarkan tipe
price_a = get_param("electricity_price_a")
price_b = get_param("electricity_price_b")
price_c = get_param("electricity_price_c")
price_d = get_param("electricity_price_d")
# Parameter lain
harga_bahan_bakar = get_param("harga_bahan_bakar")
inflation_rate = get_param("inflation_rate") / 100
loan_portion = get_param("loan_portion") / 100
equity_portion = get_param("equity_portion") / 100
interest_rate = get_param("interest_rate") / 100
loan_tenor = get_param("loan_tenor")
loan = get_param("loan")
corporate_tax_rate = get_param("corporate_tax_rate") / 100
wacc_on_equity = get_param("wacc_on_equity") / 100
wacc_on_project = get_param("wacc_on_project") / 100
manhours_rate = get_param("manhours_rate")
principal_interest_payment = get_param("principal_interest_payment")
umur_teknis = get_param("umur_teknis")
tahun_cod = get_param("tahun_cod")
daya_terpasang = get_param("daya_terpasang")
equity = get_param("equity")
params = []
revenue_total_array = []
cost_a_acquisition_array = []
cost_c_fuel_array = []
cost_bd_total_array = []
total_residual_value = 0 # nilai awal dari total_residual_value
calc_dept_amount = 0 # nilai awal dari calc_dept_amount
revenue_total_start = 0 # nilai awal dari revenue_total_start
calc4_free_cash_flow_on_equity = 0 # nilai awal dari calc4_free_cash_flow_on_equity
calc3_free_cash_flow_on_project_array = []
calc4_free_cash_flow_on_equity_array = []
total_residual_value_array = []
calc2_earning_after_tax_array = []
total_residual_value_array_sampai_sekarang = []
calc2_earning_after_tax_array_sampai_sekarang = []
net_capacity_factor = 0
eaf = 0
cost_bd_om = 0
cost_bd_pm_nonmi = 0
cost_bd_bd = 0
cost_a_replacement = 0
cost_a_pm = 0
cost_a_pinjaman = 0
cost_a_depreciation = 0
for row in rows:
# row adalah tuple sesuai urutan select_sql
data = dict(zip(col_names, row))
seq = data["seq"] # primary key / unique key untuk WHERE
if data["is_actual"] == 1:
net_capacity_factor = validate_number(data["net_capacity_factor"])
eaf = validate_number(data["eaf"])
production_bruto = validate_number(data["production_bruto"])
production_netto = validate_number(data["production_netto"])
energy_sales = production_netto
fuel_consumption = validate_number(data["fuel_consumption"])
revenue_a = validate_number(data["revenue_a"])
revenue_b = validate_number(data["revenue_b"])
revenue_c = validate_number(data["revenue_c"])
revenue_d = validate_number(data["revenue_d"])
cost_c_fuel = validate_number(data["cost_c_fuel"])
cost_bd_om = validate_number(data["cost_bd_om"])
cost_bd_pm_nonmi = validate_number(data["cost_bd_pm_nonmi"])
cost_bd_bd = validate_number(data["cost_bd_bd"])
else:
net_capacity_factor = net_capacity_factor # last value
eaf = eaf # last value
production_netto = net_capacity_factor * 8760 * daya_mampu_netto / 100
production_bruto = production_netto / (100 - (auxiliary + susut_trafo)) * 100
energy_sales = production_netto
fuel_consumption = production_bruto * sfc
revenue_a = (price_a * eaf * daya_mampu_netto * 1000 * 12 / 100) / 1000000
revenue_b = (price_b * eaf * daya_mampu_netto * 1000 * 12 / 100) / 1000000
revenue_c = price_c * production_netto * 1000 / 1000000
revenue_d = price_d * production_netto * 1000 / 1000000
cost_c_fuel = fuel_consumption * harga_bahan_bakar / 1000000
cost_bd_om = cost_bd_om # last value
cost_bd_pm_nonmi = cost_bd_pm_nonmi # last value
cost_bd_bd = cost_bd_bd # last value
# ++++++ REVENUE +++++++
revenue_total = revenue_a + revenue_b + revenue_c + revenue_d
if seq > 0:
revenue_total_array.append(revenue_total)
revenue_pv = cumulative_npv(revenue_total_array, discount_rate)[-1] + revenue_total_start
revenue_annualized = pmt_excel_style(discount_rate, seq, revenue_pv)
else:
revenue_annualized = 0
revenue_pv = 0
revenue_total_start = revenue_total
# print(revenue_total_array)
# print(discount_rate)
# print(revenue_pv)
chart_total_revenue = revenue_total
chart_revenue_a = revenue_a
chart_revenue_b = revenue_b
chart_revenue_c = revenue_c
chart_revenue_d = revenue_d
chart_revenue_annualized = revenue_annualized
# ===== COST A =====
if seq > 0:
if data["is_actual"] == 1:
cost_a_replacement = validate_number(data["cost_a_replacement"])
cost_a_pm = validate_number(data["cost_a_pm"])
cost_a_pinjaman = 0 # validate_number(data["cost_a_pinjaman"])
cost_a_depreciation = 0 # validate_number(data["cost_a_depreciation"])
else:
cost_a_replacement = cost_a_replacement
cost_a_pm = cost_a_pm
cost_a_pinjaman = 0 # cost_a_pinjaman
cost_a_depreciation = 0 # cost_a_depreciation
cost_a_total = validate_number(data["cost_a_total"])
cost_a_acquisition = (
cost_a_replacement
+ cost_a_pm
# + cost_a_pinjaman
# + cost_a_depreciation
)
else:
cost_a_replacement = 0
cost_a_pm = 0
cost_a_pinjaman = 0
cost_a_depreciation = 0
cost_a_total = 0
cost_a_acquisition = total_project_cost
if seq > 0:
cost_a_acquisition_array.append(cost_a_acquisition)
cost_a_pv = cumulative_npv(cost_a_acquisition_array, discount_rate)[-1] + total_project_cost
cost_a_annualized = pmt_excel_style(discount_rate, seq, cost_a_pv)
chart_capex_component_a = cost_a_acquisition
chart_capex_annualized = cost_a_annualized
else:
chart_capex_component_a = total_project_cost
chart_capex_annualized = 0
cost_a_pv = 0
cost_a_annualized = 0
chart_capex_biaya_investasi_tambahan = 0
chart_capex_acquisition_cost = 0
# ===== COST C =====
cost_c_fuel_start = 0
if seq > 0:
cost_c_fuel_array.append(cost_c_fuel)
cost_c_pv = cumulative_npv(cost_c_fuel_array, discount_rate)[-1] + cost_c_fuel_start
cost_c_annualized = pmt_excel_style(discount_rate, seq, cost_c_pv)
else:
cost_c_fuel_start = cost_c_fuel
cost_c_pv = 0
cost_c_annualized = 0
chart_fuel_cost_component_c = cost_c_fuel
chart_fuel_cost = cost_c_fuel
chart_fuel_cost_annualized = cost_c_annualized
# ===== COST BD =====
cost_bd_total_start = 0
if seq > 0:
cost_bd_total = cost_bd_om + cost_bd_pm_nonmi + cost_bd_bd
cost_bd_total_array.append(cost_bd_total)
cost_bd_pv = cumulative_npv(cost_bd_total_array, discount_rate)[-1] + cost_bd_total_start
cost_bd_annualized = pmt_excel_style(discount_rate, seq, cost_bd_pv)
else:
cost_bd_total = 0
cost_bd_total_start = cost_bd_om + cost_bd_pm_nonmi
cost_bd_pv = 0
cost_bd_annualized = 0
chart_oem_component_bd = cost_bd_total
chart_oem_bd_cost = cost_bd_om
chart_oem_periodic_maintenance_cost = cost_bd_pm_nonmi
chart_oem_annualized = cost_bd_annualized
# ===== TOTAL EXPENSE & PROFIT/LOSS =====
if seq > 0:
calc_depreciation = total_residual_value / (umur_teknis - seq + 1)
total_residual_value = total_residual_value + cost_a_replacement - calc_depreciation
calc_interest_payment = interest_rate * calc_dept_amount
calc_principal_payment = principal_interest_payment - calc_interest_payment
calc_dept_amount = calc_dept_amount - calc_principal_payment
else:
calc_depreciation = 0
total_residual_value = total_project_cost
calc_interest_payment = 0
calc_principal_payment = 0
calc_dept_amount = loan
total_residual_value_array.append(total_residual_value)
if data["is_actual"] == 1:
total_residual_value_array_sampai_sekarang.append(total_residual_value)
total_expense = cost_c_fuel + cost_bd_total
total_cost_eac = cost_a_annualized + cost_c_annualized + cost_bd_annualized
total_profit_loss = revenue_annualized - total_cost_eac
calc2_ebitda = revenue_total - total_expense
calc2_earning_before_tax = calc2_ebitda - cost_a_depreciation - calc_interest_payment
calc2_tax = calc2_earning_before_tax * corporate_tax_rate if calc2_earning_before_tax > 0 else 0
calc2_earning_after_tax = calc2_earning_before_tax - calc2_tax
calc2_earning_after_tax_array.append(calc2_earning_after_tax)
if data["is_actual"] == 1: calc2_earning_after_tax_array_sampai_sekarang.append(calc2_earning_after_tax)
calc3_interest_after_tax = calc_interest_payment * (1 - corporate_tax_rate)
calc2_nopat = calc2_earning_before_tax - calc3_interest_after_tax
if seq > 0:
calc3_free_cash_flow_on_project = calc2_earning_after_tax + calc3_interest_after_tax + calc_depreciation - cost_a_replacement
else:
calc3_free_cash_flow_on_project = -total_project_cost
calc3_free_cash_flow_on_project_array.append(calc3_free_cash_flow_on_project)
calc3_discounted_fcf_on_project = hitung_pv(wacc_on_project, seq, calc3_free_cash_flow_on_project)
calc4_principal_repayment = -calc_principal_payment
if seq > 0:
calc4_free_cash_flow_on_equity = calc4_principal_repayment + calc2_earning_after_tax + calc_depreciation - cost_a_replacement
else:
calc4_free_cash_flow_on_equity = -equity
calc4_free_cash_flow_on_equity_array.append(calc4_free_cash_flow_on_equity)
calc4_discounted_fcf_on_equity = hitung_pv(wacc_on_equity, seq, calc4_free_cash_flow_on_equity)
row_params = (
net_capacity_factor,
eaf,
production_bruto,
production_netto,
energy_sales,
fuel_consumption,
revenue_a,
revenue_b,
revenue_c,
revenue_d,
revenue_total,
revenue_pv,
revenue_annualized,
cost_a_replacement,
cost_a_pm,
cost_a_acquisition,
cost_a_pinjaman,
cost_a_depreciation,
cost_a_total,
cost_a_pv,
cost_a_annualized,
cost_c_fuel,
cost_c_pv,
cost_c_annualized,
cost_bd_om,
cost_bd_pm_nonmi,
cost_bd_bd,
cost_bd_total,
cost_bd_pv,
cost_bd_annualized,
total_expense,
total_cost_eac,
total_profit_loss,
total_residual_value,
calc_depreciation,
calc_interest_payment,
calc_principal_payment,
calc_dept_amount,
calc2_ebitda,
calc2_earning_before_tax,
calc2_tax,
calc2_earning_after_tax,
calc2_nopat,
calc3_interest_after_tax,
calc3_free_cash_flow_on_project,
calc3_discounted_fcf_on_project,
calc4_principal_repayment,
calc4_free_cash_flow_on_equity,
calc4_discounted_fcf_on_equity,
chart_total_revenue,
chart_revenue_a,
chart_revenue_b,
chart_revenue_c,
chart_revenue_d,
chart_revenue_annualized,
chart_fuel_cost_component_c,
chart_fuel_cost,
chart_fuel_cost_annualized,
chart_oem_component_bd,
chart_oem_bd_cost,
chart_oem_periodic_maintenance_cost,
chart_oem_annualized,
chart_capex_component_a,
chart_capex_biaya_investasi_tambahan,
chart_capex_acquisition_cost,
chart_capex_annualized,
seq,
SIMULATION_ID,
)
params.append(tuple(normalize_db_value(v) for v in row_params))
# 3. Bulk update dengan executemany
if params:
cur.executemany(update_sql, params)
conn.commit()
print("Bulk update selesai dan sudah di-commit.")
else:
print("Tidak ada data untuk di-update.")
# ===========================================================================
# ----- ==== HITUNGAN TERAKHIR LCC PLANT ==== -----
# ===========================================================================
IRR_ON_PROJECT = hitung_irr(calc3_free_cash_flow_on_project_array) # dalam %
NPV_ON_PROJECT = cumulative_npv(calc3_free_cash_flow_on_project_array[1:], wacc_on_project)[-1] + \
calc3_free_cash_flow_on_project_array[0]
IRR_ON_EQUITY = hitung_irr(calc4_free_cash_flow_on_equity_array) # dalam %
NPV_ON_EQUITY = cumulative_npv(calc4_free_cash_flow_on_equity_array[1:], wacc_on_equity)[-1] + \
calc4_free_cash_flow_on_equity_array[0]
ROA_ALL = sum(calc2_earning_after_tax_array) / sum(total_residual_value_array) * 100 # dalam %
ROA_TO_L = sum(calc2_earning_after_tax_array_sampai_sekarang) / sum(
total_residual_value_array_sampai_sekarang) * 100 # dalam %
update_kpi_sql = f"""
UPDATE {MASTER_TABLE}
SET value_num = %s
WHERE name = %s
AND simulation_id = %s \
"""
kpi_params_raw = [
(IRR_ON_EQUITY * 100, "calc_on_equity_irr"),
(NPV_ON_EQUITY, "calc_on_equity_npv"),
(IRR_ON_PROJECT * 100, "calc_on_project_irr"),
(NPV_ON_PROJECT, "calc_on_project_npv"),
(ROA_ALL, "calc_roa_all"),
(ROA_TO_L, "calc_roa_current"),
]
kpi_params = [
(
None if (value is None or isinstance(value, float) and math.isnan(value)) else value,
key,
SIMULATION_ID,
)
for value, key in kpi_params_raw
]
cur.executemany(update_kpi_sql, kpi_params)
conn.commit()
# ===========================================================================
cur.close()
conn.close()
except Exception as e:
if conn:
conn.rollback()
print(f"Terjadi error, transaksi di-rollback. Error: {e}")
try:
cur.close()
except Exception:
pass
if conn:
conn.close()
if __name__ == "__main__":
main()

@ -1,4 +1,5 @@
from sqlalchemy import Column, Float, Integer, String, UUID
from sqlalchemy import Column, Float, ForeignKey, Integer, UUID
from sqlalchemy.orm import relationship
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin
@ -6,8 +7,9 @@ from src.models import DefaultMixin, IdentityMixin
class PlantTransactionDataSimulations(Base, DefaultMixin, IdentityMixin):
__tablename__ = "lcc_plant_tr_data_simulations"
version = Column(Integer, nullable=False)
label = Column(String, nullable=False)
simulation_id = Column(
UUID(as_uuid=True), ForeignKey("lcc_simulations.id"), nullable=False
)
tahun = Column(Integer, nullable=False)
is_actual = Column(Integer, nullable=False)
seq = Column(Integer, nullable=False)
@ -95,3 +97,9 @@ class PlantTransactionDataSimulations(Base, DefaultMixin, IdentityMixin):
fs_chart_capex_biaya_investasi_tambahan = Column(Float, nullable=True)
fs_chart_capex_acquisition_cost = Column(Float, nullable=True)
fs_chart_capex_annualized = Column(Float, nullable=True)
simulation = relationship(
"Simulation",
back_populates="plant_transactions",
lazy="joined",
)

@ -1,4 +1,5 @@
from typing import List, Optional
from uuid import UUID
from fastapi import APIRouter, HTTPException, status, Query
from src.plant_transaction_data_simulations.model import PlantTransactionDataSimulations
@ -21,7 +22,7 @@ from src.plant_transaction_data_simulations.service import (
update_fs_charts_from_matrix,
)
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.database.service import CommonParameters
from src.database.core import DbSession
from src.auth.service import CurrentUser
from src.models import StandardResponse
@ -33,6 +34,7 @@ router = APIRouter()
async def get_transaction_datas(
db_session: DbSession,
common: CommonParameters,
simulation_id: UUID = Query(..., description="Simulation identifier"),
items_per_page: Optional[int] = Query(5),
search: Optional[str] = Query(None),
):
@ -42,6 +44,7 @@ async def get_transaction_datas(
items_per_page=items_per_page,
search=search,
common=common,
simulation_id=simulation_id,
)
# return
return StandardResponse(
@ -50,9 +53,15 @@ async def get_transaction_datas(
)
@router.get("/charts", response_model=StandardResponse[PlantChartDataSimulations])
async def get_chart_data(db_session: DbSession, common: CommonParameters):
async def get_chart_data(
db_session: DbSession,
common: CommonParameters,
simulation_id: UUID = Query(..., description="Simulation identifier"),
):
chart_data, bep_year, bep_total_lcc = await get_charts(
db_session=db_session, common=common
db_session=db_session,
common=common,
simulation_id=simulation_id,
)
if not chart_data:
raise HTTPException(
@ -82,6 +91,7 @@ async def import_fs_chart_data(
db_session=db_session,
payload=payload,
updated_by=current_user.name,
simulation_id=payload.simulation_id,
)
if not updated_records:

@ -7,8 +7,7 @@ from src.models import DefaultBase, Pagination
class PlantTransactionDataSimulationsBase(DefaultBase):
version: Optional[int] = Field(None, nullable=True, ge=0, le=9_999_999_999)
label: Optional[str] = Field(None, nullable=True)
simulation_id: Optional[UUID] = Field(None, nullable=True)
tahun: Optional[int] = Field(None, nullable=True, ge=1900, le=9999)
is_actual: Optional[int] = Field(None, nullable=True, ge=0, le=1)
seq: Optional[int] = Field(None, nullable=True, ge=0, le=9999)
@ -143,10 +142,11 @@ class PlantTransactionFSImportSimulations(DefaultBase):
data: List[List[Optional[Any]]]
is_actual: Optional[int] = Field(None, nullable=True, ge=0, le=1)
seq: Optional[int] = Field(None, nullable=True, ge=0, le=9999)
simulation_id: UUID = Field(..., nullable=False)
class PlantTransactionDataSimulationsCreate(PlantTransactionDataSimulationsBase):
pass
simulation_id: UUID = Field(..., nullable=False)
class PlantTransactionDataSimulationsUpdate(PlantTransactionDataSimulationsBase):

@ -12,6 +12,7 @@ from src.plant_transaction_data_simulations.schema import (
)
from src.database.service import search_filter_sort_paginate
from typing import Any, Dict, List, Optional
from uuid import UUID
from src.database.core import DbSession
from src.auth.service import CurrentUser
@ -121,12 +122,17 @@ async def get_all(
*,
db_session: DbSession,
items_per_page: Optional[int],
simulation_id: UUID,
search: Optional[str] = None,
common,
):
"""Returns all documents."""
query = Select(PlantTransactionDataSimulations).order_by(
query = (
Select(PlantTransactionDataSimulations)
.where(PlantTransactionDataSimulations.simulation_id == simulation_id)
.order_by(
PlantTransactionDataSimulations.seq.asc(), PlantTransactionDataSimulations.tahun.asc()
)
)
if search:
query = query.filter(
@ -144,9 +150,14 @@ async def get_charts(
*,
db_session: DbSession,
common,
simulation_id: UUID,
):
"""Returns all documents."""
query = Select(PlantTransactionDataSimulations).order_by(PlantTransactionDataSimulations.tahun.asc())
query = (
Select(PlantTransactionDataSimulations)
.where(PlantTransactionDataSimulations.simulation_id == simulation_id)
.order_by(PlantTransactionDataSimulations.tahun.asc())
)
results = await db_session.execute(query)
chart_data = results.scalars().all()
@ -285,6 +296,7 @@ async def update_fs_charts_from_matrix(
db_session: DbSession,
payload: PlantTransactionFSImportSimulations,
updated_by: Optional[str] = None,
simulation_id: UUID,
):
"""Update fs_* chart columns based on a transposed matrix payload."""
@ -299,7 +311,11 @@ async def update_fs_charts_from_matrix(
if not field_values:
continue
query = Select(PlantTransactionDataSimulations).where(PlantTransactionDataSimulations.tahun == year)
query = (
Select(PlantTransactionDataSimulations)
.where(PlantTransactionDataSimulations.tahun == year)
.where(PlantTransactionDataSimulations.simulation_id == simulation_id)
)
if payload.is_actual is not None:
query = query.where(PlantTransactionDataSimulations.is_actual == payload.is_actual)
if payload.seq is not None:

@ -0,0 +1,3 @@
from .router import router
__all__ = ["router"]

@ -0,0 +1,24 @@
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin
class Simulation(Base, DefaultMixin, IdentityMixin):
__tablename__ = "lcc_simulations"
label = Column(String, nullable=False)
version = Column(Integer, nullable=True)
masterdata_entries = relationship(
"MasterDataSimulation",
back_populates="simulation",
cascade="all, delete-orphan",
)
plant_transactions = relationship(
"PlantTransactionDataSimulations",
back_populates="simulation",
cascade="all, delete-orphan",
)

@ -0,0 +1,117 @@
from typing import Optional
from fastapi import APIRouter, HTTPException, Query, status
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters
from src.models import StandardResponse
from src.simulations.schema import (
SimulationCreate,
SimulationPagination,
SimulationRead,
SimulationRunPayload,
SimulationUpdate,
)
from src.simulations.service import create, delete, get, get_all, run_simulation, update
router = APIRouter()
@router.get("", response_model=StandardResponse[SimulationPagination])
async def get_simulations(
db_session: DbSession,
common: CommonParameters,
items_per_page: Optional[int] = Query(5),
search: Optional[str] = Query(None),
):
simulations = await get_all(
db_session=db_session,
items_per_page=items_per_page,
search=search,
common=common,
)
return StandardResponse(data=simulations, message="Data retrieved successfully")
@router.get("/{simulation_id}", response_model=StandardResponse[SimulationRead])
async def get_simulation(db_session: DbSession, simulation_id: str):
simulation = await get(db_session=db_session, simulation_id=simulation_id)
if not simulation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A simulation with this id does not exist.",
)
return StandardResponse(data=simulation, message="Data retrieved successfully")
@router.post("", response_model=StandardResponse[SimulationRead])
async def create_simulation(
db_session: DbSession, simulation_in: SimulationCreate, current_user: CurrentUser
):
simulation_in.created_by = current_user.name
simulation = await create(db_session=db_session, simulation_in=simulation_in)
return StandardResponse(data=simulation, message="Data created successfully")
@router.post("/run", response_model=StandardResponse[SimulationRead])
async def run_simulation_endpoint(
db_session: DbSession,
payload: SimulationRunPayload,
current_user: CurrentUser,
):
try:
simulation = await run_simulation(
db_session=db_session,
payload=payload,
current_user=current_user,
)
except RuntimeError as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(exc),
) from exc
return StandardResponse(
data=simulation,
message="Simulation executed successfully",
)
@router.put("/{simulation_id}", response_model=StandardResponse[SimulationRead])
async def update_simulation(
db_session: DbSession,
simulation_id: str,
simulation_in: SimulationUpdate,
current_user: CurrentUser,
):
simulation = await get(db_session=db_session, simulation_id=simulation_id)
if not simulation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A simulation with this id does not exist.",
)
simulation_in.updated_by = current_user.name
updated_simulation = await update(
db_session=db_session,
simulation=simulation,
simulation_in=simulation_in,
)
return StandardResponse(data=updated_simulation, message="Data updated successfully")
@router.delete("/{simulation_id}", response_model=StandardResponse[SimulationRead])
async def delete_simulation(db_session: DbSession, simulation_id: str):
simulation = await get(db_session=db_session, simulation_id=simulation_id)
if not simulation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=[{"msg": "A simulation with this id does not exist."}],
)
await delete(db_session=db_session, simulation_id=simulation_id)
return StandardResponse(data=simulation, message="Data deleted successfully")

@ -0,0 +1,42 @@
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from pydantic import Field
from src.models import DefaultBase, Pagination
class SimulationBase(DefaultBase):
label: Optional[str] = Field(None, nullable=False)
version: Optional[int] = Field(None, nullable=True, ge=0, le=9_999_999_999)
created_at: Optional[datetime] = Field(None, nullable=True)
updated_at: Optional[datetime] = Field(None, nullable=True)
created_by: Optional[str] = Field(None, nullable=True)
updated_by: Optional[str] = Field(None, nullable=True)
class SimulationCreate(SimulationBase):
label: str = Field(..., nullable=False)
class SimulationUpdate(SimulationBase):
pass
class SimulationRead(SimulationBase):
id: UUID
class SimulationPagination(Pagination):
items: List[SimulationRead] = []
class MasterDataOverride(DefaultBase):
name: str = Field(..., nullable=False)
value_num: Optional[float] = Field(None, nullable=True, le=1_000_000_000_000_000)
value_str: Optional[str] = Field(None, nullable=True)
class SimulationRunPayload(DefaultBase):
label: Optional[str] = Field(None, nullable=True)
overrides: List[MasterDataOverride] = Field(default_factory=list)

@ -0,0 +1,224 @@
import asyncio
import os
from subprocess import PIPE
from typing import Dict, List, Optional
from uuid import UUID
from sqlalchemy import Delete, Select, func
from sqlalchemy.inspection import inspect as sa_inspect
from src.database.core import DbSession
from src.database.service import search_filter_sort_paginate
from src.masterdata.model import MasterData
from src.masterdata_simulations.model import MasterDataSimulation
from src.plant_transaction_data.model import PlantTransactionData
from src.plant_transaction_data_simulations.model import PlantTransactionDataSimulations
from src.auth.service import CurrentUser
from .model import Simulation
from .schema import (
MasterDataOverride,
SimulationCreate,
SimulationRunPayload,
SimulationUpdate,
)
MODULES_PLANT_PATH = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../modules/plant")
)
SIMULATION_SCRIPT_PATH = os.path.join(MODULES_PLANT_PATH, "run_plant_simulation.py")
MASTERDATA_COPY_COLUMNS = [
column.key for column in sa_inspect(MasterData).mapper.column_attrs if column.key != "id"
]
PLANT_COPY_COLUMNS = [
column.key for column in sa_inspect(PlantTransactionData).mapper.column_attrs if column.key != "id"
]
async def get(*, db_session: DbSession, simulation_id: str) -> Optional[Simulation]:
query = Select(Simulation).where(Simulation.id == simulation_id)
result = await db_session.execute(query)
return result.scalars().one_or_none()
async def get_all(
*,
db_session: DbSession,
items_per_page: Optional[int],
search: Optional[str],
common,
):
query = Select(Simulation).order_by(Simulation.created_at.desc())
if search:
query = query.filter(Simulation.label.ilike(f"%{search}%"))
common["items_per_page"] = items_per_page
return await search_filter_sort_paginate(model=query, **common)
async def create(*, db_session: DbSession, simulation_in: SimulationCreate) -> Simulation:
data = simulation_in.model_dump()
if data.get("version") is None:
data["version"] = await _get_next_version(db_session)
if not data.get("label"):
data["label"] = f"Simulation {data['version']}"
simulation = Simulation(**data)
db_session.add(simulation)
await db_session.commit()
return simulation
async def update(
*, db_session: DbSession, simulation: Simulation, simulation_in: SimulationUpdate
) -> Simulation:
update_data = simulation_in.model_dump(exclude_defaults=True)
for field, value in update_data.items():
setattr(simulation, field, value)
await db_session.commit()
return simulation
async def delete(*, db_session: DbSession, simulation_id: str) -> None:
query = Delete(Simulation).where(Simulation.id == simulation_id)
await db_session.execute(query)
await db_session.commit()
async def run_simulation(
*,
db_session: DbSession,
payload: SimulationRunPayload,
current_user: CurrentUser,
) -> Simulation:
next_version = await _get_next_version(db_session)
label = payload.label or f"Simulation {next_version}"
simulation = Simulation(
label=label,
version=next_version,
created_by=current_user.name,
updated_by=current_user.name,
)
db_session.add(simulation)
await db_session.commit()
await _copy_masterdata_to_simulation(
db_session=db_session,
simulation_id=simulation.id,
overrides=payload.overrides,
actor=current_user.name,
)
await _copy_plant_transactions_to_simulation(
db_session=db_session,
simulation_id=simulation.id,
actor=current_user.name,
)
await db_session.commit()
try:
await _run_plant_calculation_for_simulation(simulation.id)
except RuntimeError as exc:
raise RuntimeError(str(exc)) from exc
await db_session.refresh(simulation)
return simulation
async def _get_next_version(db_session: DbSession) -> int:
query = Select(func.max(Simulation.version))
result = await db_session.execute(query)
max_version = result.scalar()
return (max_version or 0) + 1
async def _copy_masterdata_to_simulation(
*,
db_session: DbSession,
simulation_id: UUID,
overrides: List[MasterDataOverride],
actor: Optional[str],
):
override_map: Dict[str, MasterDataOverride] = {item.name: item for item in overrides or []}
result = await db_session.execute(Select(MasterData))
records = result.scalars().all()
if not records:
raise RuntimeError("Master data is empty; cannot run simulation.")
entries: List[MasterDataSimulation] = []
for record in records:
payload = {column: getattr(record, column) for column in MASTERDATA_COPY_COLUMNS}
payload["simulation_id"] = simulation_id
if actor:
payload["created_by"] = actor
payload["updated_by"] = actor
override = override_map.get(record.name)
if override:
if getattr(override, "value_num", None) is not None:
payload["value_num"] = override.value_num
if getattr(override, "value_str", None) is not None:
payload["value_str"] = override.value_str
entries.append(MasterDataSimulation(**payload))
db_session.add_all(entries)
async def _copy_plant_transactions_to_simulation(
*,
db_session: DbSession,
simulation_id: UUID,
actor: Optional[str],
):
result = await db_session.execute(Select(PlantTransactionData))
rows = result.scalars().all()
if not rows:
raise RuntimeError("Plant transaction data is empty; cannot run simulation.")
entries: List[PlantTransactionDataSimulations] = []
for row in rows:
payload = {column: getattr(row, column) for column in PLANT_COPY_COLUMNS}
payload["simulation_id"] = simulation_id
if actor:
payload["created_by"] = actor
payload["updated_by"] = actor
entries.append(PlantTransactionDataSimulations(**payload))
db_session.add_all(entries)
async def _run_plant_calculation_for_simulation(simulation_id: UUID) -> None:
env = os.environ.copy()
env["PLANT_SIMULATION_ID"] = str(simulation_id)
process = await asyncio.create_subprocess_exec(
"python",
SIMULATION_SCRIPT_PATH,
stdout=PIPE,
stderr=PIPE,
cwd=MODULES_PLANT_PATH,
env=env,
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
error_output = stderr.decode().strip() or stdout.decode().strip()
raise RuntimeError(
f"Plant calculation failed for simulation {simulation_id}: {error_output}"
)
Loading…
Cancel
Save