add manpower cost service

main
MrWaradana 1 month ago
parent 53e5c93cfd
commit 2842c32c4b

@ -0,0 +1,13 @@
from sqlalchemy import Column, Float, Integer, String
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin
class ManpowerCost(Base, DefaultMixin, IdentityMixin):
__tablename__ = "lcc_manpower_cost"
staff_job_level = Column(String, nullable=False)
salary_per_month_idr = Column(Float, nullable=False)
salary_per_day_idr = Column(Float, nullable=False)
salary_per_hour_idr = Column(Float, nullable=False)

@ -0,0 +1,95 @@
from typing import Optional
from fastapi import APIRouter, HTTPException, status, Query
from src.manpower_cost.model import ManpowerCost
from src.manpower_cost.schema import ManpowerCostPagination, ManpowerCostRead, ManpowerCostCreate, ManpowerCostUpdate
from src.manpower_cost.service import get, get_all, create, update, delete
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.database.core import DbSession
from src.auth.service import CurrentUser
from src.models import StandardResponse
router = APIRouter()
@router.get("", response_model=StandardResponse[ManpowerCostPagination])
async def get_yeardatas(
db_session: DbSession,
common: CommonParameters,
items_per_page: Optional[int] = Query(5),
search: Optional[str] = Query(None),
):
"""Get all acquisition_cost_data pagination."""
get_acquisition_cost_data = await get_all(
db_session=db_session,
items_per_page=items_per_page,
search=search,
common=common,
)
# return
return StandardResponse(
data=get_acquisition_cost_data,
message="Data retrieved successfully",
)
@router.get("/{acquisition_cost_data_id}", response_model=StandardResponse[ManpowerCostRead])
async def get_acquisition_cost_data(db_session: DbSession, acquisition_cost_data_id: str):
acquisition_cost_data = await get(db_session=db_session, acquisition_cost_data_id=acquisition_cost_data_id)
if not acquisition_cost_data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(data=acquisition_cost_data, message="Data retrieved successfully")
@router.post("", response_model=StandardResponse[ManpowerCostRead])
async def create_acquisition_cost_data(
db_session: DbSession, acquisition_cost_data_in: ManpowerCostCreate, current_user: CurrentUser
):
acquisition_cost_data_in.created_by = current_user.name
acquisition_cost_data = await create(db_session=db_session, acquisition_data_in=acquisition_cost_data_in)
return StandardResponse(data=acquisition_cost_data, message="Data created successfully")
@router.put("/{acquisition_cost_data_id}", response_model=StandardResponse[ManpowerCostRead])
async def update_acquisition_cost_data(
db_session: DbSession,
acquisition_cost_data_id: str,
acquisition_cost_data_in: ManpowerCostUpdate,
current_user: CurrentUser,
):
acquisition_cost_data = await get(db_session=db_session, acquisition_cost_data_id=acquisition_cost_data_id)
if not acquisition_cost_data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
acquisition_cost_data_in.updated_by = current_user.name
return StandardResponse(
data=await update(
db_session=db_session, acquisition_data=acquisition_cost_data, acquisition_data_in=acquisition_cost_data_in
),
message="Data updated successfully",
)
@router.delete("/{acquisition_cost_data_id}", response_model=StandardResponse[ManpowerCostRead])
async def delete_acquisition_cost_data(db_session: DbSession, acquisition_cost_data_id: str):
acquisition_cost_data = await get(db_session=db_session, acquisition_cost_data_id=acquisition_cost_data_id)
if not acquisition_cost_data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=[{"msg": "A data with this id does not exist."}],
)
await delete(db_session=db_session, acquisition_cost_data_id=acquisition_cost_data_id)
return StandardResponse(message="Data deleted successfully", data=acquisition_cost_data)

@ -0,0 +1,33 @@
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from pydantic import Field
from src.models import DefaultBase, Pagination
class ManpowerCostBase(DefaultBase):
staff_job_level: str = Field(..., nullable=False)
salary_per_month_idr: float = Field(..., nullable=False)
salary_per_day_idr: float = Field(..., nullable=False)
salary_per_hour_idr: float = Field(..., nullable=False)
created_at: Optional[datetime] = Field(None, nullable=True)
updated_at: Optional[datetime] = Field(None, nullable=True)
created_by: Optional[str] = Field(None, nullable=True)
updated_by: Optional[str] = Field(None, nullable=True)
class ManpowerCostCreate(ManpowerCostBase):
pass
class ManpowerCostUpdate(ManpowerCostBase):
pass
class ManpowerCostRead(ManpowerCostBase):
id: UUID
class ManpowerCostPagination(Pagination):
items: List[ManpowerCostRead] = []

@ -0,0 +1,94 @@
from sqlalchemy import Select, Delete, cast, String
from src.manpower_cost.model import ManpowerCost
from src.manpower_cost.schema import ManpowerCostCreate, ManpowerCostUpdate
from src.database.service import search_filter_sort_paginate
from typing import Optional
from src.database.core import DbSession
from src.auth.service import CurrentUser
from src.equipment.model import Equipment
def _calculate_cost_unit_3(cost_unit_3_n_4: Optional[float]) -> Optional[float]:
"""Derive cost_unit_3 by splitting the combined unit 3&4 cost evenly."""
if cost_unit_3_n_4 is None:
return None
return cost_unit_3_n_4 / 2
async def _sync_equipment_acquisition_costs(
*, db_session: DbSession, category_no: Optional[str], cost_unit_3: Optional[float]
):
"""Keep equipment manpower cost in sync for the affected category."""
if not category_no or cost_unit_3 is None:
return
equipment_query = Select(Equipment).filter(Equipment.category_no == category_no)
equipment_result = await db_session.execute(equipment_query)
equipments = equipment_result.scalars().all()
for equipment in equipments:
if equipment.proportion is None:
continue
equipment.acquisition_cost = (equipment.proportion * 0.01) * cost_unit_3
async def get(*, db_session: DbSession, manpower_cost_id: str) -> Optional[ManpowerCost]:
"""Returns a document based on the given document id."""
query = Select(ManpowerCost).filter(ManpowerCost.id == manpower_cost_id)
result = await db_session.execute(query)
return result.scalars().one_or_none()
async def get_all(
*,
db_session: DbSession,
items_per_page: Optional[int],
search: Optional[str] = None,
common,
):
"""Returns all documents."""
query = Select(ManpowerCost).order_by(ManpowerCost.salary_per_month_idr.asc())
if search:
query = query.filter(
(cast(ManpowerCost.staff_job_level, String).ilike(f"%{search}%"))
| (cast(ManpowerCost.salary_per_month_idr, String).ilike(f"%{search}%"))
)
common["items_per_page"] = items_per_page
results = await search_filter_sort_paginate(model=query, **common)
# return results.scalars().all()
return results
async def create(*, db_session: DbSession, manpower_cost_in: ManpowerCostCreate):
"""Creates a new document."""
data = manpower_cost_in.model_dump()
manpower_cost = ManpowerCost(**data)
db_session.add(manpower_cost)
await db_session.commit()
return manpower_cost
async def update(
*, db_session: DbSession, manpower_cost: ManpowerCost, manpower_cost_in: ManpowerCostUpdate
):
"""Updates a document."""
data = manpower_cost_in.model_dump()
update_data = manpower_cost_in.model_dump(exclude_defaults=True)
for field in data:
if field in update_data:
setattr(manpower_cost, field, update_data[field])
await db_session.commit()
return manpower_cost
async def delete(*, db_session: DbSession, manpower_cost_id: str):
"""Deletes a document."""
query = Delete(ManpowerCost).where(ManpowerCost.id == manpower_cost_id)
await db_session.execute(query)
await db_session.commit()
Loading…
Cancel
Save