You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

177 lines
5.2 KiB
Python

import asyncio
import os
import logging
from subprocess import PIPE
from sqlalchemy import Select, Delete, cast, String
from .model import PlantTransactionData
from .schema import PlantTransactionDataCreate, PlantTransactionDataUpdate
from src.database.service import search_filter_sort_paginate
from typing import Optional
from src.database.core import DbSession
from src.auth.service import CurrentUser
logger = logging.getLogger(__name__)
async def get(
*, db_session: DbSession, transaction_data_id: str
) -> Optional[PlantTransactionData]:
"""Returns a document based on the given document id."""
query = Select(PlantTransactionData).filter(
PlantTransactionData.id == transaction_data_id
)
result = await db_session.execute(query)
return result.scalars().one_or_none()
async def get_all(
*,
db_session: DbSession,
items_per_page: Optional[int],
search: Optional[str] = None,
common,
):
"""Returns all documents."""
query = Select(PlantTransactionData).order_by(PlantTransactionData.tahun.desc())
if search:
query = query.filter(
cast(PlantTransactionData.tahun, String).ilike(f"%{search}%")
)
common["items_per_page"] = items_per_page
results = await search_filter_sort_paginate(model=query, **common)
# return results.scalars().all()
return results
async def get_charts(
*,
db_session: DbSession,
common,
):
"""Returns all documents."""
query = Select(PlantTransactionData).order_by(PlantTransactionData.tahun.asc())
results = await db_session.execute(query)
chart_data = results.scalars().all()
bep_year = None
previous_year = 0
previous_total_cost = 0
previous_revenue = 0
bep_total_lcc = 0
for idx, item in enumerate(chart_data):
total_cost = (
item.chart_capex_annualized
+ item.chart_oem_annualized
+ item.chart_fuel_cost_annualized
)
revenue = item.chart_revenue_annualized
if previous_total_cost and previous_revenue:
if (previous_total_cost > previous_revenue and total_cost < revenue) or (
previous_total_cost < previous_revenue and total_cost > revenue
):
if total_cost < revenue:
bep_total_lcc = previous_total_cost
bep_year = previous_year
else:
bep_total_lcc = total_cost
bep_year = item.tahun
break
previous_total_cost = total_cost
previous_revenue = revenue
previous_year = item.tahun
return chart_data, bep_year, bep_total_lcc
async def create(
*, db_session: DbSession, transaction_data_in: PlantTransactionDataCreate
):
"""Creates a new document."""
transaction_data = PlantTransactionData(**transaction_data_in.model_dump())
db_session.add(transaction_data)
await db_session.commit()
# Get the directory of the current file
# directory_path = "../modules/plant"
directory_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../modules/plant")
)
# Construct path to the script
script_path = os.path.join(directory_path, "run.py")
try:
process = await asyncio.create_subprocess_exec(
"python", script_path, stdout=PIPE, stderr=PIPE, cwd=directory_path
)
stdout, stderr = await process.communicate()
# Check if the script executed successfully
if process.returncode != 0:
print(f"Script execution error: {stderr.decode()}")
else:
print(f"Script output: {stdout.decode()}")
except Exception as e:
print(f"Error executing script: {e}")
return transaction_data
async def update(
*,
db_session: DbSession,
transaction_data: PlantTransactionData,
transaction_data_in: PlantTransactionDataUpdate,
):
"""Updates a document."""
data = transaction_data_in.model_dump()
update_data = transaction_data_in.model_dump(exclude_defaults=True)
for field in data:
if field in update_data:
setattr(transaction_data, field, update_data[field])
await db_session.commit()
# Get the directory of the current file
# directory_path = "../modules/plant"
directory_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../modules/plant")
)
# Construct path to the script
script_path = os.path.join(directory_path, "run.py")
try:
process = await asyncio.create_subprocess_exec(
"python", script_path, stdout=PIPE, stderr=PIPE, cwd=directory_path
)
stdout, stderr = await process.communicate()
# Check if the script executed successfully
if process.returncode != 0:
print(f"Script execution error: {stderr.decode()}")
else:
print(f"Script output: {stdout.decode()}")
except Exception as e:
print(f"Error executing script: {e}")
return transaction_data
async def delete(*, db_session: DbSession, transaction_data_id: str):
"""Deletes a document."""
query = Delete(PlantTransactionData).where(
PlantTransactionData.id == transaction_data_id
)
await db_session.execute(query)
await db_session.commit()