You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

348 lines
11 KiB
Python

import asyncio
import os
import logging
from subprocess import PIPE
from sqlalchemy import Select, Delete, cast, String
from src.plant_transaction_data_simulations.model import PlantTransactionDataSimulations
from src.plant_transaction_data_simulations.schema import (
PlantTransactionDataSimulationsCreate,
PlantTransactionDataSimulationsUpdate,
PlantTransactionFSImportSimulations,
)
from src.database.service import search_filter_sort_paginate
from typing import Any, Dict, List, Optional
from uuid import UUID
from src.database.core import DbSession
from src.auth.service import CurrentUser
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
def _safe_float(x: object) -> float:
"""Safely convert `x` to float, returning 0.0 for None or invalid values."""
try:
if x is None:
return 0.0
return float(x)
except Exception:
return 0.0
_FS_LABEL_FIELD_MAP: Dict[str, str] = {
"Total Revenue": "fs_chart_total_revenue",
"Revenue A": "fs_chart_revenue_a",
"Revenue B": "fs_chart_revenue_b",
"Revenue C": "fs_chart_revenue_c",
"Revenue D": "fs_chart_revenue_d",
"Revenue Annualized": "fs_chart_revenue_annualized",
"Fuel Cost (Component C)": "fs_chart_fuel_cost_component_c",
"Fuel Cost": "fs_chart_fuel_cost",
"Fuel Cost Annualized": "fs_chart_fuel_cost_annualized",
"O and M Cost (Component B and D)": "fs_chart_oem_component_bd",
"O and M Cost": "fs_chart_oem_bd_cost",
"Periodic Maintenance Cost (NonMI)": "fs_chart_oem_periodic_maintenance_cost",
"O and M Cost Annualized": "fs_chart_oem_annualized",
"Capex (Component A)": "fs_chart_capex_component_a",
"Biaya Investasi Tambahan": "fs_chart_capex_biaya_investasi_tambahan",
"Acquisition Cost": "fs_chart_capex_acquisition_cost",
"Capex Annualized": "fs_chart_capex_annualized",
}
def _extract_years(header_row: List[Any]) -> List[int]:
years: List[int] = []
for cell in header_row[2:]:
if cell is None:
continue
try:
years.append(int(float(cell)))
except Exception:
continue
return years
def _resolve_label(row: List[Any]) -> Optional[str]:
for candidate in row[:2]:
if isinstance(candidate, str):
label = candidate.strip()
if label:
return label
return None
def _build_fs_year_value_map(matrix: List[List[Any]]) -> Dict[int, Dict[str, float]]:
    """Turn an FS matrix into ``{year: {field_name: value}}``.

    ``matrix[0]`` is the header row: cells from index 2 onward hold the years
    (parsed with ``int(float(cell))``; ``None`` or unparseable cells are
    ignored). Every following row is matched by its label (see
    ``_resolve_label``) against ``_FS_LABEL_FIELD_MAP``; rows without a known
    label are skipped. A row's value for a year is read from the same column
    the year occupies in the header.

    Args:
        matrix: Header row followed by data rows.

    Returns:
        A mapping with one entry per header year. Each entry maps field names
        to floats (via ``_safe_float``) for the non-``None`` cells found.
        Empty dict when ``matrix`` is empty or the header contains no year.
    """
    if not matrix:
        return {}

    # Remember the real column of every year. Deriving the column from the
    # position in a filtered year list would shift all values that come after
    # a blank or non-numeric header cell.
    year_columns: List[Any] = []
    for col_idx, cell in enumerate(matrix[0][2:], start=2):
        if cell is None:
            continue
        try:
            year_columns.append((col_idx, int(float(cell))))
        except Exception:
            # Same tolerance as the header parsing elsewhere in this module:
            # anything that is not a year is ignored.
            continue
    if not year_columns:
        return {}

    year_map: Dict[int, Dict[str, float]] = {year: {} for _, year in year_columns}
    for row in matrix[1:]:
        label = _resolve_label(row)
        if not label:
            continue
        field_name = _FS_LABEL_FIELD_MAP.get(label)
        if not field_name:
            continue
        for col_idx, year in year_columns:
            # Short rows simply have no value for the trailing years.
            if col_idx >= len(row):
                continue
            value = row[col_idx]
            if value is None:
                continue
            year_map[year][field_name] = _safe_float(value)
    return year_map
async def get(
    *, db_session: DbSession, transaction_data_id: str
) -> Optional[PlantTransactionDataSimulations]:
    """Fetch the simulation transaction row with the given id, or ``None``."""
    id_matches = PlantTransactionDataSimulations.id == transaction_data_id
    statement = Select(PlantTransactionDataSimulations).filter(id_matches)
    rows = await db_session.execute(statement)
    return rows.scalars().one_or_none()
async def get_all(
    *,
    db_session: DbSession,
    items_per_page: Optional[int],
    simulation_id: UUID,
    search: Optional[str] = None,
    common,
):
    """List one simulation's transaction rows through the shared paginator.

    Rows are ordered by ``seq`` and then ``tahun``, both ascending. A
    non-empty ``search`` keeps only rows whose ``tahun``, cast to text,
    contains the term (case-insensitive match).

    ``common`` is updated in place with ``items_per_page`` and then expanded
    as keyword arguments to ``search_filter_sort_paginate``; that helper's
    result is returned unchanged. ``db_session`` is not used directly here.
    """
    record_cls = PlantTransactionDataSimulations
    statement = Select(record_cls).where(record_cls.simulation_id == simulation_id)
    statement = statement.order_by(record_cls.seq.asc(), record_cls.tahun.asc())

    if search:
        pattern = f"%{search}%"
        statement = statement.filter(cast(record_cls.tahun, String).ilike(pattern))

    # NOTE: this writes into the caller's ``common`` mapping.
    common["items_per_page"] = items_per_page
    return await search_filter_sort_paginate(model=statement, **common)
async def get_charts(
    *,
    db_session: DbSession,
    common,
    simulation_id: UUID,
):
    """Load a simulation's chart rows and estimate its break-even point (BEP).

    Rows for ``simulation_id`` are read in ascending ``tahun`` order. For each
    row the annualized total cost (capex + O&M + fuel) is compared with the
    annualized revenue; missing or invalid values count as 0.0. The BEP is the
    first place where cost equals revenue: either exactly on a row, or between
    two consecutive rows, in which case it is located by linear interpolation.

    Args:
        db_session: Async session used to run the query.
        common: Not used by this function.
        simulation_id: Simulation whose rows are loaded.

    Returns:
        A ``(chart_data, bep_year, bep_total_lcc)`` tuple. ``bep_year`` is the
        (possibly interpolated) year truncated with ``int``, or ``None`` when
        no break-even is found. ``bep_total_lcc`` is the total cost at the BEP
        and stays 0 when none is found.
    """
    query = (
        Select(PlantTransactionDataSimulations)
        .where(PlantTransactionDataSimulations.simulation_id == simulation_id)
        .order_by(PlantTransactionDataSimulations.tahun.asc())
    )
    results = await db_session.execute(query)
    chart_data = results.scalars().all()

    bep_year = None
    bep_total_lcc = 0
    previous_year = None
    previous_total_cost = None
    previous_revenue = None

    for item in chart_data:
        total_cost = (
            _safe_float(item.chart_capex_annualized)
            + _safe_float(item.chart_oem_annualized)
            + _safe_float(item.chart_fuel_cost_annualized)
        )
        revenue = _safe_float(item.chart_revenue_annualized)
        curr_diff = total_cost - revenue

        # Exact break-even on this row. Testing the current row (rather than
        # looking back at the previous one on the next iteration) also catches
        # a hit on the last or only row.
        if curr_diff == 0:
            bep_year = item.tahun
            bep_total_lcc = total_cost
            break

        if previous_total_cost is not None and previous_revenue is not None:
            prev_diff = previous_total_cost - previous_revenue
            # Opposite signs: cost and revenue crossed between the two rows.
            if prev_diff * curr_diff < 0:
                # Interpolate linearly between the two rows to estimate the BEP.
                denom = (total_cost - previous_total_cost) - (revenue - previous_revenue)
                if denom != 0:
                    t = (previous_revenue - previous_total_cost) / denom
                    # Clamp to the segment between the two rows.
                    t = max(0.0, min(1.0, t))
                    try:
                        bep_year = previous_year + t * (item.tahun - previous_year)
                    except Exception:
                        # Year arithmetic failed (e.g. a missing ``tahun``);
                        # fall back to the earlier row's year.
                        bep_year = previous_year
                    bep_total_lcc = previous_total_cost + t * (total_cost - previous_total_cost)
                elif total_cost < revenue:
                    # Interpolation impossible: pick the earlier row.
                    bep_total_lcc = previous_total_cost
                    bep_year = previous_year
                else:
                    # Interpolation impossible: pick the current row.
                    bep_total_lcc = total_cost
                    bep_year = item.tahun
                break

        previous_total_cost = total_cost
        previous_revenue = revenue
        previous_year = item.tahun

    bep_year_int = int(bep_year) if bep_year is not None else None
    return chart_data, bep_year_int, bep_total_lcc
async def create(
    *, db_session: DbSession, transaction_data_in: PlantTransactionDataSimulationsCreate
):
    """Persist a new transaction row, then run the plant simulation script.

    The row is built from ``transaction_data_in.model_dump()``, added to the
    session and committed. Afterwards ``run_plant_simulation.py`` from the
    ``../modules/plant`` directory (relative to this file) is run as a
    subprocess and awaited. Running the script is best-effort: a failure to
    start it or a non-zero exit code is logged instead of raised.

    Args:
        db_session: Async session used to store the row.
        transaction_data_in: Input schema holding the new row's values.

    Returns:
        The newly created ``PlantTransactionDataSimulations`` instance.
    """
    transaction_data = PlantTransactionDataSimulations(**transaction_data_in.model_dump())
    db_session.add(transaction_data)
    await db_session.commit()

    # The script lives next to this package and is run from its own directory.
    directory_path = os.path.abspath(
        os.path.join(os.path.dirname(__file__), "../modules/plant")
    )
    script_path = os.path.join(directory_path, "run_plant_simulation.py")

    try:
        process = await asyncio.create_subprocess_exec(
            "python", script_path, stdout=PIPE, stderr=PIPE, cwd=directory_path
        )
        stdout, stderr = await process.communicate()
    except Exception as exc:
        # Report through the module logger (with traceback) instead of print.
        logger.exception("Error executing script: %s", exc)
    else:
        # errors="replace" keeps undecodable script output from raising here.
        if process.returncode != 0:
            logger.error("Script execution error: %s", stderr.decode(errors="replace"))
        else:
            logger.info("Script output: %s", stdout.decode(errors="replace"))
    return transaction_data
async def update(
    *,
    db_session: DbSession,
    transaction_data: PlantTransactionDataSimulations,
    transaction_data_in: PlantTransactionDataSimulationsUpdate,
):
    """Apply changed fields to a transaction row, then run the simulation script.

    Only the fields present in ``model_dump(exclude_defaults=True)`` are
    copied onto ``transaction_data``; the change is then committed.
    Afterwards ``run_plant_simulation.py`` from the ``../modules/plant``
    directory (relative to this file) is run as a subprocess and awaited.
    Running the script is best-effort: a failure to start it or a non-zero
    exit code is logged instead of raised.

    Args:
        db_session: Async session used to commit the change.
        transaction_data: Existing row to modify.
        transaction_data_in: Input schema holding the new values.

    Returns:
        The same ``transaction_data`` instance after the update.
    """
    # NOTE(review): exclude_defaults drops fields explicitly set to their
    # default value, so such fields cannot be reset through this function -
    # confirm that exclude_unset is not what is intended.
    update_data = transaction_data_in.model_dump(exclude_defaults=True)
    for field, value in update_data.items():
        setattr(transaction_data, field, value)
    await db_session.commit()

    # The script lives next to this package and is run from its own directory.
    directory_path = os.path.abspath(
        os.path.join(os.path.dirname(__file__), "../modules/plant")
    )
    script_path = os.path.join(directory_path, "run_plant_simulation.py")

    try:
        process = await asyncio.create_subprocess_exec(
            "python", script_path, stdout=PIPE, stderr=PIPE, cwd=directory_path
        )
        stdout, stderr = await process.communicate()
    except Exception as exc:
        # Report through the module logger (with traceback) instead of print.
        logger.exception("Error executing script: %s", exc)
    else:
        # errors="replace" keeps undecodable script output from raising here.
        if process.returncode != 0:
            logger.error("Script execution error: %s", stderr.decode(errors="replace"))
        else:
            logger.info("Script output: %s", stdout.decode(errors="replace"))
    return transaction_data
async def update_fs_charts_from_matrix(
    *,
    db_session: DbSession,
    payload: PlantTransactionFSImportSimulations,
    updated_by: Optional[str] = None,
    simulation_id: UUID,
):
    """Write values from an imported FS matrix onto matching simulation rows.

    ``payload.data`` is converted to a per-year ``{field: value}`` mapping.
    For every year that has values, rows with that ``tahun`` and the given
    ``simulation_id`` are selected, further narrowed by ``payload.is_actual``
    and ``payload.seq`` when those are not ``None``. Each matching row gets
    the values assigned and, if ``updated_by`` is truthy, its ``updated_by``
    set. A single commit is issued after all years were processed.

    Returns:
        ``(updated_records, missing_years)``: the rows that were modified and
        the years for which no row matched. Both are empty lists when the
        payload yields no year mapping (no commit happens in that case).
    """
    per_year_values = _build_fs_year_value_map(payload.data)
    if not per_year_values:
        return [], []

    record_cls = PlantTransactionDataSimulations
    touched: List[PlantTransactionDataSimulations] = []
    years_without_rows: List[int] = []

    for year, assignments in per_year_values.items():
        if not assignments:
            continue

        statement = (
            Select(record_cls)
            .where(record_cls.tahun == year)
            .where(record_cls.simulation_id == simulation_id)
        )
        # Optional narrowing, applied only when the payload specifies it.
        if payload.is_actual is not None:
            statement = statement.where(record_cls.is_actual == payload.is_actual)
        if payload.seq is not None:
            statement = statement.where(record_cls.seq == payload.seq)

        matched = (await db_session.execute(statement)).scalars().all()
        if not matched:
            years_without_rows.append(year)
            continue

        for row in matched:
            for attr_name, number in assignments.items():
                setattr(row, attr_name, number)
            if updated_by:
                row.updated_by = updated_by
            touched.append(row)

    await db_session.commit()
    return touched, years_without_rows
async def delete(*, db_session: DbSession, transaction_data_id: str):
    """Delete the row with the given id and commit; nothing is returned."""
    id_matches = PlantTransactionDataSimulations.id == transaction_data_id
    statement = Delete(PlantTransactionDataSimulations).where(id_matches)
    await db_session.execute(statement)
    await db_session.commit()