update fs plant transaction data

main
MrWaradana 1 month ago
parent 25cd2323df
commit cb229086e8

@ -48,9 +48,6 @@ async def _apply_masterdata_update_logic(
return 0
run_plant_calculation = False
umur_changed = False
umur_new_value = None
discount_rate_changed = False
def flag_special(record: MasterData):
"""

@ -9,8 +9,17 @@ from .schema import (
PlantTransactionChart,
PlantTransactionDataCreate,
PlantTransactionDataUpdate,
PlantTransactionFSImport,
)
from .service import (
get,
get_all,
get_charts,
create,
update,
delete,
update_fs_charts_from_matrix,
)
from .service import get, get_all, get_charts, create, update, delete
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.database.core import DbSession
@ -59,6 +68,35 @@ async def get_chart_data(db_session: DbSession, common: CommonParameters):
message="Data retrieved successfully",
)
@router.post(
"/charts/fs/import",
response_model=StandardResponse[List[PlantTransactionDataRead]],
)
async def import_fs_chart_data(
db_session: DbSession,
payload: PlantTransactionFSImport,
current_user: CurrentUser,
):
updated_records, missing_years = await update_fs_charts_from_matrix(
db_session=db_session,
payload=payload,
updated_by=current_user.name,
)
if not updated_records:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No plant transaction data found for the supplied years.",
)
message = "FS chart data updated successfully"
if missing_years:
years_text = ", ".join(str(year) for year in sorted(missing_years))
message += f"; missing years: {years_text}"
return StandardResponse(data=updated_records, message=message)
@router.get(
"/{transaction_data_id}", response_model=StandardResponse[PlantTransactionDataRead]
)
@ -90,6 +128,35 @@ async def create_transaction_data(
return StandardResponse(data=transaction_data, message="Data created successfully")
@router.put(
"/bulk", response_model=StandardResponse[List[PlantTransactionDataRead]]
)
async def bulk_update_transaction_data(
db_session: DbSession,
ids: List[str],
updates: List[PlantTransactionDataUpdate],
current_user: CurrentUser,
):
if len(ids) != len(updates):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="The number of IDs must match the number of update objects.",
)
# Set updated_by for each update object
for update_obj in updates:
update_obj.updated_by = current_user.name
updated_records = await update(
db_session=db_session,
ids=ids,
updates=updates,
)
return StandardResponse(
data=updated_records,
message="Bulk update completed successfully",
)
@router.put(
"/{transaction_data_id}", response_model=StandardResponse[PlantTransactionDataRead]

@ -1,5 +1,5 @@
from datetime import datetime
from typing import List, Optional
from typing import Any, List, Optional
from uuid import UUID
from pydantic import Field
@ -118,6 +118,12 @@ class PlantChartData(DefaultBase):
bep_total_lcc: Optional[float] = Field(float, nullable=True, ge=0, le=1_000_000_000_000_000)
class PlantTransactionFSImport(DefaultBase):
data: List[List[Optional[Any]]]
is_actual: Optional[int] = Field(None, nullable=True, ge=0, le=1)
seq: Optional[int] = Field(None, nullable=True, ge=0, le=9999)
class PlantTransactionDataCreate(PlantTransactionDataBase):
pass

@ -5,9 +5,13 @@ from subprocess import PIPE
from sqlalchemy import Select, Delete, cast, String
from src.plant_transaction_data.model import PlantTransactionData
from src.plant_transaction_data.schema import PlantTransactionDataCreate, PlantTransactionDataUpdate
from src.plant_transaction_data.schema import (
PlantTransactionDataCreate,
PlantTransactionDataUpdate,
PlantTransactionFSImport,
)
from src.database.service import search_filter_sort_paginate
from typing import Optional
from typing import Any, Dict, List, Optional
from src.database.core import DbSession
from src.auth.service import CurrentUser
@ -25,6 +29,83 @@ def _safe_float(x: object) -> float:
return 0.0
_FS_LABEL_FIELD_MAP: Dict[str, str] = {
"Total Revenue": "fs_chart_total_revenue",
"Revenue A": "fs_chart_revenue_a",
"Revenue B": "fs_chart_revenue_b",
"Revenue C": "fs_chart_revenue_c",
"Revenue D": "fs_chart_revenue_d",
"Revenue Annualized": "fs_chart_revenue_annualized",
"Fuel Cost (Component C)": "fs_chart_fuel_cost_component_c",
"Fuel Cost": "fs_chart_fuel_cost",
"Fuel Cost Annualized": "fs_chart_fuel_cost_annualized",
"O and M Cost (Component B and D)": "fs_chart_oem_component_bd",
"O and M Cost": "fs_chart_oem_bd_cost",
"Periodic Maintenance Cost (NonMI)": "fs_chart_oem_periodic_maintenance_cost",
"O and M Cost Annualized": "fs_chart_oem_annualized",
"Capex (Component A)": "fs_chart_capex_component_a",
"Biaya Investasi Tambahan": "fs_chart_capex_biaya_investasi_tambahan",
"Acquisition Cost": "fs_chart_capex_acquisition_cost",
"Capex Annualized": "fs_chart_capex_annualized",
}
def _extract_years(header_row: List[Any]) -> List[int]:
years: List[int] = []
for cell in header_row[2:]:
if cell is None:
continue
try:
years.append(int(float(cell)))
except Exception:
continue
return years
def _resolve_label(row: List[Any]) -> Optional[str]:
for candidate in row[:2]:
if isinstance(candidate, str):
label = candidate.strip()
if label:
return label
return None
def _build_fs_year_value_map(matrix: List[List[Any]]) -> Dict[int, Dict[str, float]]:
if not matrix:
return {}
header = matrix[0]
years = _extract_years(header)
if not years:
return {}
year_map: Dict[int, Dict[str, float]] = {year: {} for year in years}
for row in matrix[1:]:
label = _resolve_label(row)
if not label:
continue
field_name = _FS_LABEL_FIELD_MAP.get(label)
if not field_name:
continue
for idx, year in enumerate(years):
col_idx = idx + 2
if col_idx >= len(row):
continue
value = row[col_idx]
if value is None:
continue
try:
year_map[year][field_name] = _safe_float(value)
except Exception:
continue
return year_map
async def get(
*, db_session: DbSession, transaction_data_id: str
) -> Optional[PlantTransactionData]:
@ -199,6 +280,48 @@ async def update(
return transaction_data
async def update_fs_charts_from_matrix(
*,
db_session: DbSession,
payload: PlantTransactionFSImport,
updated_by: Optional[str] = None,
):
"""Update fs_* chart columns based on a transposed matrix payload."""
year_value_map = _build_fs_year_value_map(payload.data)
if not year_value_map:
return [], []
updated_records: List[PlantTransactionData] = []
missing_years: List[int] = []
for year, field_values in year_value_map.items():
if not field_values:
continue
query = Select(PlantTransactionData).where(PlantTransactionData.tahun == year)
if payload.is_actual is not None:
query = query.where(PlantTransactionData.is_actual == payload.is_actual)
if payload.seq is not None:
query = query.where(PlantTransactionData.seq == payload.seq)
result = await db_session.execute(query)
records = result.scalars().all()
if not records:
missing_years.append(year)
continue
for record in records:
for field_name, value in field_values.items():
setattr(record, field_name, value)
if updated_by:
record.updated_by = updated_by
updated_records.append(record)
await db_session.commit()
return updated_records, missing_years
async def delete(*, db_session: DbSession, transaction_data_id: str):
"""Deletes a document."""
query = Delete(PlantTransactionData).where(

Loading…
Cancel
Save