update manpower and recalculation plant

main
MrWaradana 1 month ago
parent 2842c32c4b
commit 25cd2323df

@ -0,0 +1,13 @@
from sqlalchemy import Column, Float, Integer, String
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin
class ManpowerMaster(Base, DefaultMixin, IdentityMixin):
__tablename__ = "lcc_ms_manpower"
staff_job_level = Column(String, nullable=False)
salary_per_month_idr = Column(Float, nullable=False)
salary_per_day_idr = Column(Float, nullable=False)
salary_per_hour_idr = Column(Float, nullable=False)

@ -0,0 +1,95 @@
from typing import Optional
from fastapi import APIRouter, HTTPException, status, Query
from src.manpower_cost.model import ManpowerCost
from src.manpower_cost.schema import ManpowerCostPagination, ManpowerCostRead, ManpowerCostCreate, ManpowerCostUpdate
from src.manpower_cost.service import get, get_all, create, update, delete
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.database.core import DbSession
from src.auth.service import CurrentUser
from src.models import StandardResponse
router = APIRouter()
@router.get("", response_model=StandardResponse[ManpowerCostPagination])
async def get_yeardatas(
db_session: DbSession,
common: CommonParameters,
items_per_page: Optional[int] = Query(5),
search: Optional[str] = Query(None),
):
"""Get all acquisition_cost_data pagination."""
get_acquisition_cost_data = await get_all(
db_session=db_session,
items_per_page=items_per_page,
search=search,
common=common,
)
# return
return StandardResponse(
data=get_acquisition_cost_data,
message="Data retrieved successfully",
)
@router.get("/{acquisition_cost_data_id}", response_model=StandardResponse[ManpowerCostRead])
async def get_acquisition_cost_data(db_session: DbSession, acquisition_cost_data_id: str):
acquisition_cost_data = await get(db_session=db_session, acquisition_cost_data_id=acquisition_cost_data_id)
if not acquisition_cost_data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(data=acquisition_cost_data, message="Data retrieved successfully")
@router.post("", response_model=StandardResponse[ManpowerCostRead])
async def create_acquisition_cost_data(
db_session: DbSession, acquisition_cost_data_in: ManpowerCostCreate, current_user: CurrentUser
):
acquisition_cost_data_in.created_by = current_user.name
acquisition_cost_data = await create(db_session=db_session, acquisition_data_in=acquisition_cost_data_in)
return StandardResponse(data=acquisition_cost_data, message="Data created successfully")
@router.put("/{acquisition_cost_data_id}", response_model=StandardResponse[ManpowerCostRead])
async def update_acquisition_cost_data(
db_session: DbSession,
acquisition_cost_data_id: str,
acquisition_cost_data_in: ManpowerCostUpdate,
current_user: CurrentUser,
):
acquisition_cost_data = await get(db_session=db_session, acquisition_cost_data_id=acquisition_cost_data_id)
if not acquisition_cost_data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
acquisition_cost_data_in.updated_by = current_user.name
return StandardResponse(
data=await update(
db_session=db_session, acquisition_data=acquisition_cost_data, acquisition_data_in=acquisition_cost_data_in
),
message="Data updated successfully",
)
@router.delete("/{acquisition_cost_data_id}", response_model=StandardResponse[ManpowerCostRead])
async def delete_acquisition_cost_data(db_session: DbSession, acquisition_cost_data_id: str):
acquisition_cost_data = await get(db_session=db_session, acquisition_cost_data_id=acquisition_cost_data_id)
if not acquisition_cost_data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=[{"msg": "A data with this id does not exist."}],
)
await delete(db_session=db_session, acquisition_cost_data_id=acquisition_cost_data_id)
return StandardResponse(message="Data deleted successfully", data=acquisition_cost_data)

@ -0,0 +1,33 @@
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from pydantic import Field
from src.models import DefaultBase, Pagination
class ManpowerCostBase(DefaultBase):
staff_job_level: str = Field(..., nullable=False)
salary_per_month_idr: float = Field(..., nullable=False)
salary_per_day_idr: float = Field(..., nullable=False)
salary_per_hour_idr: float = Field(..., nullable=False)
created_at: Optional[datetime] = Field(None, nullable=True)
updated_at: Optional[datetime] = Field(None, nullable=True)
created_by: Optional[str] = Field(None, nullable=True)
updated_by: Optional[str] = Field(None, nullable=True)
class ManpowerCostCreate(ManpowerCostBase):
pass
class ManpowerCostUpdate(ManpowerCostBase):
pass
class ManpowerCostRead(ManpowerCostBase):
id: UUID
class ManpowerCostPagination(Pagination):
items: List[ManpowerCostRead] = []

@ -0,0 +1,94 @@
from sqlalchemy import Select, Delete, cast, String
from src.manpower_cost.model import ManpowerCost
from src.manpower_cost.schema import ManpowerCostCreate, ManpowerCostUpdate
from src.database.service import search_filter_sort_paginate
from typing import Optional
from src.database.core import DbSession
from src.auth.service import CurrentUser
from src.equipment.model import Equipment
def _calculate_cost_unit_3(cost_unit_3_n_4: Optional[float]) -> Optional[float]:
"""Derive cost_unit_3 by splitting the combined unit 3&4 cost evenly."""
if cost_unit_3_n_4 is None:
return None
return cost_unit_3_n_4 / 2
async def _sync_equipment_acquisition_costs(
*, db_session: DbSession, category_no: Optional[str], cost_unit_3: Optional[float]
):
"""Keep equipment manpower cost in sync for the affected category."""
if not category_no or cost_unit_3 is None:
return
equipment_query = Select(Equipment).filter(Equipment.category_no == category_no)
equipment_result = await db_session.execute(equipment_query)
equipments = equipment_result.scalars().all()
for equipment in equipments:
if equipment.proportion is None:
continue
equipment.acquisition_cost = (equipment.proportion * 0.01) * cost_unit_3
async def get(*, db_session: DbSession, manpower_cost_id: str) -> Optional[ManpowerCost]:
"""Returns a document based on the given document id."""
query = Select(ManpowerCost).filter(ManpowerCost.id == manpower_cost_id)
result = await db_session.execute(query)
return result.scalars().one_or_none()
async def get_all(
*,
db_session: DbSession,
items_per_page: Optional[int],
search: Optional[str] = None,
common,
):
"""Returns all documents."""
query = Select(ManpowerCost).order_by(ManpowerCost.salary_per_month_idr.asc())
if search:
query = query.filter(
(cast(ManpowerCost.staff_job_level, String).ilike(f"%{search}%"))
| (cast(ManpowerCost.salary_per_month_idr, String).ilike(f"%{search}%"))
)
common["items_per_page"] = items_per_page
results = await search_filter_sort_paginate(model=query, **common)
# return results.scalars().all()
return results
async def create(*, db_session: DbSession, manpower_cost_in: ManpowerCostCreate):
"""Creates a new document."""
data = manpower_cost_in.model_dump()
manpower_cost = ManpowerCost(**data)
db_session.add(manpower_cost)
await db_session.commit()
return manpower_cost
async def update(
*, db_session: DbSession, manpower_cost: ManpowerCost, manpower_cost_in: ManpowerCostUpdate
):
"""Updates a document."""
data = manpower_cost_in.model_dump()
update_data = manpower_cost_in.model_dump(exclude_defaults=True)
for field in data:
if field in update_data:
setattr(manpower_cost, field, update_data[field])
await db_session.commit()
return manpower_cost
async def delete(*, db_session: DbSession, manpower_cost_id: str):
"""Deletes a document."""
query = Delete(ManpowerCost).where(ManpowerCost.id == manpower_cost_id)
await db_session.execute(query)
await db_session.commit()

@ -1,4 +1,4 @@
from sqlalchemy import Column, Float, String from sqlalchemy import Column, Float, String, Integer
from src.database.core import Base from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin from src.models import DefaultMixin, IdentityMixin
@ -15,3 +15,4 @@ class MasterData(Base, DefaultMixin, IdentityMixin):
unit_of_measurement = Column(String, nullable=True) unit_of_measurement = Column(String, nullable=True)
value_num = Column(Float, nullable=True) value_num = Column(Float, nullable=True)
value_str = Column(String, nullable=True) value_str = Column(String, nullable=True)
seq = Column(Integer, nullable=True)

@ -18,6 +18,7 @@ class MasterdataBase(DefaultBase):
None, nullable=True, le=1_000_000_000_000_000 # 1 quadrillion None, nullable=True, le=1_000_000_000_000_000 # 1 quadrillion
) )
value_str: Optional[str] = Field(None, nullable=True) value_str: Optional[str] = Field(None, nullable=True)
seq: Optional[int] = Field(None, nullable=True)
created_at: Optional[datetime] = Field(None, nullable=True) created_at: Optional[datetime] = Field(None, nullable=True)
updated_at: Optional[datetime] = Field(None, nullable=True) updated_at: Optional[datetime] = Field(None, nullable=True)
created_by: Optional[str] = Field(None, nullable=True) created_by: Optional[str] = Field(None, nullable=True)
@ -32,6 +33,7 @@ class MasterDataCreate(MasterdataBase):
..., nullable=True, le=1_000_000_000_000_000 # 1 quadrillion ..., nullable=True, le=1_000_000_000_000_000 # 1 quadrillion
) )
value_str: str = Field(..., nullable=True) value_str: str = Field(..., nullable=True)
seq: int = Field(..., nullable=True)
class MasterDataUpdate(MasterdataBase): class MasterDataUpdate(MasterdataBase):

@ -7,12 +7,181 @@ from datetime import datetime
from src.database.service import search_filter_sort_paginate from src.database.service import search_filter_sort_paginate
from .model import MasterData from .model import MasterData
from .schema import MasterDataCreate, MasterDataUpdate from .schema import MasterDataCreate, MasterDataUpdate
from typing import Optional, List from typing import Optional, List, Dict
from src.database.core import DbSession from src.database.core import DbSession
from src.auth.service import CurrentUser from src.auth.service import CurrentUser
MASTERDATA_ATTR_FIELDS = {
"name",
"description",
"unit_of_measurement",
"value_num",
"value_str",
"created_by",
"updated_by",
}
async def _apply_masterdata_update_logic(
*,
db_session: DbSession,
masterdata: MasterData,
masterdata_in: MasterDataUpdate,
records_by_name: Dict[str, MasterData],
):
"""Apply the same field update logic used in bulk_update."""
update_data = masterdata_in.model_dump(exclude_defaults=True)
async def get_value(name: str) -> float:
record = records_by_name.get(name)
if record is not None and record.value_num is not None:
return record.value_num
query_val = Select(MasterData).where(MasterData.name == name)
res_val = await db_session.execute(query_val)
row = res_val.scalars().one_or_none()
if row:
records_by_name[row.name] = row
return row.value_num if row.value_num is not None else 0
return 0
run_plant_calculation = False
umur_changed = False
umur_new_value = None
discount_rate_changed = False
def flag_special(record: MasterData):
"""
Menandai perubahan pada field khusus dalam data master.
Fungsi ini memeriksa record MasterData dan mengidentifikasi perubahan
pada field 'umur_teknis' dan 'discount_rate'. Jika field tersebut
ditemukan, fungsi akan mengatur flag nonlocal dan menyimpan nilai baru.
Args:
record (MasterData): Object MasterData yang akan diperiksa.
Side Effects:
- Mengatur umur_changed = True jika rec_name == "umur_teknis"
- Mengatur umur_new_value dengan nilai numerik dari record
- Mengatur discount_rate_changed = True jika rec_name == "discount_rate"
"""
nonlocal run_plant_calculation
rec_name = getattr(record, "name", None)
if rec_name in [
"umur_teknis",
"discount_rate",
"loan_portion",
"interest_rate",
"loan_tenor",
"corporate_tax_rate",
"wacc_on_equity",
"auxiliary",
"susut_trafo",
"sfc",
"electricity_price_a",
"electricity_price_b",
"electricity_price_c",
"electricity_price_d",
"harga_bahan_bakar",
"inflation_rate",
"loan",
"wacc_on_project",
"principal_interest_payment",
"equity",
]:
run_plant_calculation = True
for field, val in update_data.items():
if field in MASTERDATA_ATTR_FIELDS:
setattr(masterdata, field, val)
flag_special(masterdata)
else:
query_other = Select(MasterData).where(MasterData.name == field)
res_other = await db_session.execute(query_other)
other = res_other.scalars().one_or_none()
if other:
if isinstance(val, (int, float)):
other.value_num = val
flag_special(other)
else:
other.value_str = str(val)
if other.name:
records_by_name[other.name] = other
if "loan_portion" in update_data:
equity_portion = 100 - await get_value("loan_portion")
setattr(masterdata, "equity_portion", equity_portion)
total_project_cost = await get_value("total_project_cost")
loan = total_project_cost * (await get_value("loan_portion") / 100)
setattr(masterdata, "loan", loan)
equity = total_project_cost * (equity_portion / 100)
setattr(masterdata, "equity", equity)
if any(field in update_data for field in ["loan", "interest_rate", "loan_tenor"]):
pmt = calculate_pmt(
rate=await get_value("interest_rate"),
nper=await get_value("loan_tenor"),
pv=await get_value("loan"),
)
setattr(masterdata, "principal_interest_payment", pmt)
if any(
field in update_data
for field in [
"loan_portion",
"interest_rate",
"corporate_tax_rate",
"wacc_on_equity",
"equity_portion",
]
):
wacc = (
await get_value("loan_portion")
* (
await get_value("interest_rate")
* (1 - await get_value("corporate_tax_rate"))
)
) + (
await get_value("wacc_on_equity") * await get_value("equity_portion")
)
setattr(masterdata, "wacc_on_project", wacc)
return masterdata, run_plant_calculation
async def _trigger_masterdata_recalculation(
*, db_session: DbSession, run_plant_calculation: bool = False
):
"""Run downstream recalculation when special masterdata values change."""
if not run_plant_calculation:
return
try:
current_year = datetime.now(datetime.timezone.utc).year
directory_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../modules/plant")
)
script_path = os.path.join(directory_path, "run2.py")
process = await asyncio.create_subprocess_exec(
"python",
script_path,
stdout=PIPE,
stderr=PIPE,
cwd=directory_path,
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
print(f"Plant recalc error: {stderr.decode()}")
else:
print(f"Plant recalc output: {stdout.decode()}")
except Exception as e:
print(f"Error during umur_teknis recalculation: {e}")
def calculate_pmt(rate, nper, pv): def calculate_pmt(rate, nper, pv):
""" """
rate: interest rate per period rate: interest rate per period
@ -40,7 +209,7 @@ async def get_all(
*, db_session: DbSession, items_per_page: int, search: str = None, common *, db_session: DbSession, items_per_page: int, search: str = None, common
) -> list[MasterData]: ) -> list[MasterData]:
"""Returns all documents.""" """Returns all documents."""
query = Select(MasterData) query = Select(MasterData).order_by(MasterData.seq.asc())
if search: if search:
query = query.filter(MasterData.name.ilike(f"%{search}%")) query = query.filter(MasterData.name.ilike(f"%{search}%"))
@ -61,65 +230,28 @@ async def create(*, db_session: DbSession, masterdata_in: MasterDataCreate):
async def update( async def update(
*, db_session: DbSession, masterdata: MasterData, masterdata_in: MasterDataUpdate *, db_session: DbSession, masterdata: MasterData, masterdata_in: MasterDataUpdate
): ):
"""Updates a document.""" """Updates a document using the same logic as bulk_update."""
data = masterdata_in.model_dump() records_by_name: Dict[str, MasterData] = {}
update_data = masterdata_in.model_dump(exclude_defaults=True) if masterdata.name:
records_by_name[masterdata.name] = masterdata
def get_value(data_list, name):
return next((m.value_num for m in data_list if m.name == name), 0) (
_,
# First update the direct values from update_data run_plant_calculation,
for field in update_data: ) = await _apply_masterdata_update_logic(
setattr(masterdata, field, update_data[field]) db_session=db_session,
masterdata=masterdata,
# Then check which formulas need to be recalculated based on updated fields masterdata_in=masterdata_in,
if "loan_portion" in update_data: records_by_name=records_by_name,
# Update equity_portion when loan_portion changes )
equity_portion = 100 - get_value(masterdata, "loan_portion")
setattr(masterdata, "equity_portion", equity_portion)
# Update loan amount when loan_portion changes
total_project_cost = get_value(masterdata, "total_project_cost")
loan = total_project_cost * (get_value(masterdata, "loan_portion") / 100)
setattr(masterdata, "loan", loan)
# Update equity when loan_portion changes
equity = total_project_cost * (equity_portion / 100)
setattr(masterdata, "equity", equity)
if any(field in update_data for field in ["loan", "interest_rate", "loan_tenor"]): await db_session.commit()
# Recalculate PMT when loan, interest_rate, or loan_tenor changes
pmt = calculate_pmt(
rate=get_value(masterdata, "interest_rate"),
nper=get_value(masterdata, "loan_tenor"),
pv=get_value(masterdata, "loan"),
)
setattr(masterdata, "principal_interest_payment", pmt)
if any( await _trigger_masterdata_recalculation(
field in update_data db_session=db_session,
for field in [ run_plant_calculation=run_plant_calculation,
"loan_portion", )
"interest_rate",
"corporate_tax_rate",
"wacc_on_equity",
"equity_portion",
]
):
# Recalculate WACC when any of its components change
wacc = (
get_value(masterdata, "loan_portion")
* (
get_value(masterdata, "interest_rate")
* (1 - get_value(masterdata, "corporate_tax_rate"))
)
) + (
get_value(masterdata, "wacc_on_equity")
* get_value(masterdata, "equity_portion")
)
setattr(masterdata, "wacc_on_project", wacc)
await db_session.commit()
return masterdata return masterdata
@ -147,10 +279,6 @@ async def bulk_update(
records_map = {str(record.id): record for record in records} records_map = {str(record.id): record for record in records}
records_by_name = {record.name: record for record in records} records_by_name = {record.name: record for record in records}
# Track if umur_teknis was changed in this batch
umur_changed = False
umur_new_value = None
# Process updates in batches # Process updates in batches
updated_records = [] updated_records = []
for masterdata_id, masterdata_in in zip(ids, updates): for masterdata_id, masterdata_in in zip(ids, updates):
@ -159,148 +287,26 @@ async def bulk_update(
print("Processing update for ID:", masterdata) print("Processing update for ID:", masterdata)
if not masterdata: if not masterdata:
continue continue
(
_,
run_plant_calculation,
) = await _apply_masterdata_update_logic(
db_session=db_session,
masterdata=masterdata,
masterdata_in=masterdata_in,
records_by_name=records_by_name,
)
data = masterdata_in.model_dump()
update_data = masterdata_in.model_dump(exclude_defaults=True)
async def get_value(name):
# Prefer values from the current batch (records_by_name)
rd = records_by_name.get(name)
if rd is not None and rd.value_num is not None:
return rd.value_num
# If not found in batch, try to fetch from DB
query_val = Select(MasterData).where(MasterData.name == name)
res_val = await db_session.execute(query_val)
row = res_val.scalars().one_or_none()
return row.value_num if row and row.value_num is not None else 0
# Update direct values (attributes present on MasterData)
# Recognised attribute fields on MasterData
attr_fields = {
"name",
"description",
"unit_of_measurement",
"value_num",
"value_str",
"created_by",
"updated_by",
}
for field, val in update_data.items():
if field in attr_fields:
setattr(masterdata, field, val)
# If this record itself represents umur_teknis, detect change
if getattr(masterdata, "name", None) == "umur_teknis":
umur_changed = True
try:
umur_new_value = float(masterdata.value_num)
except Exception:
umur_new_value = None
else:
# Field is not a direct attribute: treat it as a named masterdata
# e.g. payload included {"discount_rate": 5.0}
query_other = Select(MasterData).where(MasterData.name == field)
res_other = await db_session.execute(query_other)
other = res_other.scalars().one_or_none()
if other:
# Update numeric or string value depending on payload
if isinstance(val, (int, float)):
other.value_num = val
if other.name == "umur_teknis":
umur_changed = True
try:
umur_new_value = float(other.value_num)
except Exception:
umur_new_value = None
else:
other.value_str = str(val)
# keep updated record available for batch calculations
records_by_name[other.name] = other
# Handle interdependent calculations
if "loan_portion" in update_data:
equity_portion = 100 - await get_value("loan_portion")
setattr(masterdata, "equity_portion", equity_portion)
total_project_cost = await get_value("total_project_cost")
loan = total_project_cost * (await get_value("loan_portion") / 100)
setattr(masterdata, "loan", loan)
equity = total_project_cost * (equity_portion / 100)
setattr(masterdata, "equity", equity)
if any(field in update_data for field in ["loan", "interest_rate", "loan_tenor"]):
pmt = calculate_pmt(
rate=await get_value("interest_rate"),
nper=await get_value("loan_tenor"),
pv=await get_value("loan"),
)
setattr(masterdata, "principal_interest_payment", pmt)
if any(
field in update_data
for field in [
"loan_portion",
"interest_rate",
"corporate_tax_rate",
"wacc_on_equity",
"equity_portion",
]
):
wacc = (
await get_value("loan_portion")
* (
await get_value("interest_rate")
* (1 - await get_value("corporate_tax_rate"))
)
) + (
await get_value("wacc_on_equity") * await get_value("equity_portion")
)
setattr(masterdata, "wacc_on_project", wacc)
updated_records.append(masterdata) updated_records.append(masterdata)
print("Updated masterdata:", updated_records) print("Updated masterdata:", updated_records)
# Commit all changes in a single transaction # Commit all changes in a single transaction
await db_session.commit() await db_session.commit()
# If umur_teknis changed, clear projection rows and trigger recalculation await _trigger_masterdata_recalculation(
if umur_changed: db_session=db_session,
try: run_plant_calculation=run_plant_calculation,
# Determine current year )
current_year = datetime.now(datetime.timezone.utc).year
# Import model locally to avoid circular imports
from src.plant_transaction_data.model import PlantTransactionData
# Delete projection rows (is_actual == 0) from current year onward
del_q = Delete(PlantTransactionData).where(
PlantTransactionData.is_actual == 0,
PlantTransactionData.tahun >= current_year,
)
await db_session.execute(del_q)
await db_session.commit()
# Trigger recalculation by running the plant module script
directory_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../modules/plant")
)
script_path = os.path.join(directory_path, "run.py")
process = await asyncio.create_subprocess_exec(
"python",
script_path,
stdout=PIPE,
stderr=PIPE,
cwd=directory_path,
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
print(f"Plant recalc error: {stderr.decode()}")
else:
print(f"Plant recalc output: {stdout.decode()}")
except Exception as e:
print(f"Error during umur_teknis recalculation: {e}")
return updated_records return updated_records

Loading…
Cancel
Save