feat: update bulk on master data

main
MrWaradana 11 months ago
parent 02ffe5785a
commit 9f49cf34b2

@ -1,14 +1,16 @@
from typing import Optional from typing import Optional, List
from fastapi import APIRouter, HTTPException, status, Query from fastapi import APIRouter, HTTPException, status, Query
from sqlalchemy import Select
from .model import MasterData from .model import MasterData
from .schema import ( from .schema import (
MasterDataPagination, MasterDataPagination,
MasterDataRead, MasterDataRead,
MasterDataCreate, MasterDataCreate,
MasterDataUpdate, MasterDataUpdate,
BulkMasterDataUpdate,
) )
from .service import get, get_all, create, update, delete from .service import get, get_all, create, update, bulk_update, delete
from src.database.service import CommonParameters, search_filter_sort_paginate from src.database.service import CommonParameters, search_filter_sort_paginate
from src.database.core import DbSession from src.database.core import DbSession
@ -17,7 +19,6 @@ from src.models import StandardResponse
router = APIRouter() router = APIRouter()
@router.get("", response_model=StandardResponse[MasterDataPagination]) @router.get("", response_model=StandardResponse[MasterDataPagination])
async def get_masterdatas( async def get_masterdatas(
db_session: DbSession, db_session: DbSession,
@ -61,6 +62,39 @@ async def create_masterdata(
return StandardResponse(data=masterdata, message="Data created successfully") return StandardResponse(data=masterdata, message="Data created successfully")
@router.put("/bulk", response_model=StandardResponse[List[MasterDataRead]])
async def update_masterdata(
db_session: DbSession,
data: BulkMasterDataUpdate,
current_user: CurrentUser,
):
# Extract IDs and updates
updates = []
ids = []
for item in data.updates:
masterdata_id = item.pop("id") # remove id from update data
# Create MasterDataUpdate object with remaining data
update = MasterDataUpdate(**item, updated_by=current_user.name)
updates.append(update)
ids.append(masterdata_id)
# Verify all records exist
query = Select(MasterData).where(MasterData.id.in_(ids))
result = await db_session.execute(query)
existing_records = result.scalars().all()
if len(existing_records) != len(ids):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Some records do not exist",
)
return StandardResponse(
data=await bulk_update(db_session=db_session, updates=updates, ids=ids),
message="Data updated successfully",
)
@router.put("/{masterdata_id}", response_model=StandardResponse[MasterDataRead]) @router.put("/{masterdata_id}", response_model=StandardResponse[MasterDataRead])
async def update_masterdata( async def update_masterdata(
db_session: DbSession, db_session: DbSession,

@ -34,6 +34,10 @@ class MasterDataUpdate(MasterdataBase):
pass pass
class BulkMasterDataUpdate(MasterdataBase):
updates: List[dict] # each dict contains id and update data
class MasterDataRead(MasterdataBase): class MasterDataRead(MasterdataBase):
id: UUID id: UUID

@ -3,12 +3,28 @@ from sqlalchemy import Select, Delete
from src.database.service import search_filter_sort_paginate from src.database.service import search_filter_sort_paginate
from .model import MasterData from .model import MasterData
from .schema import MasterDataCreate, MasterDataUpdate from .schema import MasterDataCreate, MasterDataUpdate
from typing import Optional from typing import Optional, List
from src.database.core import DbSession from src.database.core import DbSession
from src.auth.service import CurrentUser from src.auth.service import CurrentUser
def calculate_pmt(rate, nper, pv):
"""
rate: interest rate per period
nper: total number of payment periods
pv: present value (loan amount)
"""
# Convert percentage to decimal if needed (e.g., 5% to 0.05)
rate = float(rate) / 100 if rate > 1 else float(rate)
# PMT formula: PMT = PV * (r * (1 + r)^n) / ((1 + r)^n - 1)
if rate == 0:
return -pv / nper
else:
return -pv * (rate * (1 + rate) ** nper) / ((1 + rate) ** nper - 1)
async def get(*, db_session: DbSession, masterdata_id: str) -> Optional[MasterData]: async def get(*, db_session: DbSession, masterdata_id: str) -> Optional[MasterData]:
"""Returns a document based on the given document id.""" """Returns a document based on the given document id."""
query = Select(MasterData).filter(MasterData.id == masterdata_id) query = Select(MasterData).filter(MasterData.id == masterdata_id)
@ -43,16 +59,155 @@ async def update(
): ):
"""Updates a document.""" """Updates a document."""
data = masterdata_in.model_dump() data = masterdata_in.model_dump()
update_data = masterdata_in.model_dump(exclude_defaults=True) update_data = masterdata_in.model_dump(exclude_defaults=True)
for field in data: def get_value(data_list, name):
if field in update_data: return next((m.value_num for m in data_list if m.name == name), 0)
# First update the direct values from update_data
for field in update_data:
setattr(masterdata, field, update_data[field])
# Then check which formulas need to be recalculated based on updated fields
if "loan_portion" in update_data:
# Update equity_portion when loan_portion changes
equity_portion = 100 - get_value(masterdata, "loan_portion")
setattr(masterdata, "equity_portion", equity_portion)
# Update loan amount when loan_portion changes
total_project_cost = get_value(masterdata, "total_project_cost")
loan = total_project_cost * (get_value(masterdata, "loan_portion") / 100)
setattr(masterdata, "loan", loan)
# Update equity when loan_portion changes
equity = total_project_cost * (equity_portion / 100)
setattr(masterdata, "equity", equity)
if any(field in update_data for field in ["loan", "interest_rate", "loan_tenor"]):
# Recalculate PMT when loan, interest_rate, or loan_tenor changes
pmt = calculate_pmt(
rate=get_value(masterdata, "interest_rate"),
nper=get_value(masterdata, "loan_tenor"),
pv=get_value(masterdata, "loan"),
)
setattr(masterdata, "principal_interest_payment", pmt)
if any(
field in update_data
for field in [
"loan_portion",
"interest_rate",
"corporate_tax_rate",
"wacc_on_equity",
"equity_portion",
]
):
# Recalculate WACC when any of its components change
wacc = (
get_value(masterdata, "loan_portion")
* (
get_value(masterdata, "interest_rate")
* (1 - get_value(masterdata, "corporate_tax_rate"))
)
) + (
get_value(masterdata, "wacc_on_equity")
* get_value(masterdata, "equity_portion")
)
setattr(masterdata, "wacc_on_project", wacc)
await db_session.commit()
return masterdata
async def bulk_update(
*, db_session: DbSession, updates: List[MasterDataUpdate], ids: List[str]
) -> List[MasterData]:
"""
Performs bulk update on multiple MasterData records.
Args:
db_session: Database session
updates: List of MasterDataUpdate objects containing the updates
ids: List of MasterData IDs to update
Returns:
List of updated MasterData objects
"""
# Fetch all records to be updated in one query
query = Select(MasterData).where(MasterData.id.in_(ids))
result = await db_session.execute(query)
records = result.scalars().all()
# Create a mapping of id to record for easier access
records_map = {record.id: record for record in records}
# Process updates in batches
updated_records = []
for masterdata_id, masterdata_in in zip(ids, updates):
masterdata = records_map.get(masterdata_id)
if not masterdata:
continue
data = masterdata_in.model_dump()
update_data = masterdata_in.model_dump(exclude_defaults=True)
def get_value(obj, name):
return next((m.value_num for m in obj if m.name == name), 0)
# Update direct values
for field in update_data:
setattr(masterdata, field, update_data[field]) setattr(masterdata, field, update_data[field])
# Handle interdependent calculations
if "loan_portion" in update_data:
equity_portion = 100 - get_value(masterdata, "loan_portion")
setattr(masterdata, "equity_portion", equity_portion)
total_project_cost = get_value(masterdata, "total_project_cost")
loan = total_project_cost * (get_value(masterdata, "loan_portion") / 100)
setattr(masterdata, "loan", loan)
equity = total_project_cost * (equity_portion / 100)
setattr(masterdata, "equity", equity)
if any(
field in update_data for field in ["loan", "interest_rate", "loan_tenor"]
):
pmt = calculate_pmt(
rate=get_value(masterdata, "interest_rate"),
nper=get_value(masterdata, "loan_tenor"),
pv=get_value(masterdata, "loan"),
)
setattr(masterdata, "principal_interest_payment", pmt)
if any(
field in update_data
for field in [
"loan_portion",
"interest_rate",
"corporate_tax_rate",
"wacc_on_equity",
"equity_portion",
]
):
wacc = (
get_value(masterdata, "loan_portion")
* (
get_value(masterdata, "interest_rate")
* (1 - get_value(masterdata, "corporate_tax_rate"))
)
) + (
get_value(masterdata, "wacc_on_equity")
* get_value(masterdata, "equity_portion")
)
setattr(masterdata, "wacc_on_project", wacc)
updated_records.append(masterdata)
# Commit all changes in a single transaction
await db_session.commit() await db_session.commit()
return masterdata return updated_records
async def delete(*, db_session: DbSession, masterdata_id: str): async def delete(*, db_session: DbSession, masterdata_id: str):

Loading…
Cancel
Save