import asyncio import os import logging from subprocess import PIPE from sqlalchemy import Select, Delete, cast, String from .model import PlantTransactionData from .schema import PlantTransactionDataCreate, PlantTransactionDataUpdate from src.database.service import search_filter_sort_paginate from typing import Optional from src.database.core import DbSession from src.auth.service import CurrentUser logger = logging.getLogger(__name__) async def get( *, db_session: DbSession, transaction_data_id: str ) -> Optional[PlantTransactionData]: """Returns a document based on the given document id.""" query = Select(PlantTransactionData).filter( PlantTransactionData.id == transaction_data_id ) result = await db_session.execute(query) return result.scalars().one_or_none() async def get_all( *, db_session: DbSession, items_per_page: Optional[int], search: Optional[str] = None, common, ): """Returns all documents.""" query = Select(PlantTransactionData).order_by( PlantTransactionData.is_actual.desc(), PlantTransactionData.tahun.desc() ) if search: query = query.filter( cast(PlantTransactionData.tahun, String).ilike(f"%{search}%") ) common["items_per_page"] = items_per_page results = await search_filter_sort_paginate(model=query, **common) # return results.scalars().all() return results async def get_charts( *, db_session: DbSession, common, ): """Returns all documents.""" query = Select(PlantTransactionData).order_by(PlantTransactionData.tahun.asc()) results = await db_session.execute(query) chart_data = results.scalars().all() bep_year = None previous_year = None previous_total_cost = None previous_revenue = None bep_total_lcc = 0 for idx, item in enumerate(chart_data): total_cost = ( item.chart_capex_annualized + item.chart_oem_annualized + item.chart_fuel_cost_annualized ) revenue = item.chart_revenue_annualized if previous_total_cost is not None and previous_revenue is not None: prev_diff = previous_total_cost - previous_revenue curr_diff = total_cost - revenue # If signs differ there's a crossing between previous and current point if prev_diff == 0: bep_year = previous_year bep_total_lcc = previous_total_cost break if prev_diff * curr_diff < 0: # Interpolate linearly between the two years to estimate BEP year denom = ( (total_cost - previous_total_cost) - (revenue - previous_revenue) ) if denom != 0: t = (previous_revenue - previous_total_cost) / denom # clamp t to [0,1] t = max(0.0, min(1.0, t)) try: bep_year = previous_year + t * (item.tahun - previous_year) except Exception: bep_year = previous_year bep_total_lcc = previous_total_cost + t * (total_cost - previous_total_cost) else: # fallback if interpolation is not possible if total_cost < revenue: bep_total_lcc = previous_total_cost bep_year = previous_year else: bep_total_lcc = total_cost bep_year = item.tahun break previous_total_cost = total_cost previous_revenue = revenue previous_year = item.tahun return chart_data, bep_year, bep_total_lcc async def create( *, db_session: DbSession, transaction_data_in: PlantTransactionDataCreate ): """Creates a new document.""" transaction_data = PlantTransactionData(**transaction_data_in.model_dump()) db_session.add(transaction_data) await db_session.commit() # Get the directory of the current file # directory_path = "../modules/plant" directory_path = os.path.abspath( os.path.join(os.path.dirname(__file__), "../modules/plant") ) # Construct path to the script script_path = os.path.join(directory_path, "run.py") try: process = await asyncio.create_subprocess_exec( "python", script_path, stdout=PIPE, stderr=PIPE, cwd=directory_path ) stdout, stderr = await process.communicate() # Check if the script executed successfully if process.returncode != 0: print(f"Script execution error: {stderr.decode()}") else: print(f"Script output: {stdout.decode()}") except Exception as e: print(f"Error executing script: {e}") return transaction_data async def update( *, db_session: DbSession, transaction_data: PlantTransactionData, transaction_data_in: PlantTransactionDataUpdate, ): """Updates a document.""" data = transaction_data_in.model_dump() update_data = transaction_data_in.model_dump(exclude_defaults=True) for field in data: if field in update_data: setattr(transaction_data, field, update_data[field]) await db_session.commit() # Get the directory of the current file # directory_path = "../modules/plant" directory_path = os.path.abspath( os.path.join(os.path.dirname(__file__), "../modules/plant") ) # Construct path to the script script_path = os.path.join(directory_path, "run.py") try: process = await asyncio.create_subprocess_exec( "python", script_path, stdout=PIPE, stderr=PIPE, cwd=directory_path ) stdout, stderr = await process.communicate() # Check if the script executed successfully if process.returncode != 0: print(f"Script execution error: {stderr.decode()}") else: print(f"Script output: {stdout.decode()}") except Exception as e: print(f"Error executing script: {e}") return transaction_data async def delete(*, db_session: DbSession, transaction_data_id: str): """Deletes a document.""" query = Delete(PlantTransactionData).where( PlantTransactionData.id == transaction_data_id ) await db_session.execute(query) await db_session.commit()