You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
679 lines
26 KiB
Python
679 lines
26 KiB
Python
import os
|
|
import logging
|
|
from typing import Optional, TypedDict, Any
|
|
|
|
from sqlalchemy import Select, Delete, Float, func, cast, String, text, case, asc, desc
|
|
from sqlalchemy.orm import selectinload
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from asyncpg.exceptions import InterfaceError as AsyncpgInterfaceError
|
|
|
|
from src.database.service import search_filter_sort_paginate
|
|
from src.equipment.model import Equipment, EquipmentTransactionRecords
|
|
from src.acquisition_cost.model import AcquisitionData
|
|
from src.yeardata.model import Yeardata
|
|
from ..equipment_master.model import EquipmentMaster
|
|
from .schema import EquipmentCreate, EquipmentUpdate, MasterBase
|
|
|
|
from src.database.core import DbSession, CollectorDbSession, collector_async_session
|
|
from src.auth.service import CurrentUser
|
|
from src.config import RELIABILITY_APP_URL
|
|
import httpx
|
|
|
|
from src.modules.equipment.run import main
|
|
from src.modules.equipment.Prediksi import main as predict_main
|
|
from src.modules.equipment.Eac import main as eac_main
|
|
from src.modules.equipment.where_query_sql import get_where_query_sql_all_worktype
|
|
import datetime
|
|
import math
|
|
|
|
|
|
class CategoryRule(TypedDict, total=False):
|
|
category_no: str
|
|
proportion: float
|
|
|
|
|
|
# Rules extracted from the provided proportion/category mapping sheet.
|
|
# Each entry tells the service to reuse the acquisition data (and optionally
|
|
# override the proportion) of the referenced category. Update the mapping below
|
|
# whenever business rules change — the logic elsewhere will pick it up
|
|
# automatically.
|
|
CATEGORY_PROPORTION_RULES: dict[str, CategoryRule] = {
|
|
"1.2": {"category_no": "1.1"},
|
|
"1.5": {"category_no": "1.1"},
|
|
"2.7": {"category_no": "2.1"},
|
|
"3.2": {"category_no": "3.1"},
|
|
"3.3": {"category_no": "3.1"},
|
|
"3.4": {"category_no": "3.1"},
|
|
"3.5": {"category_no": "3.1"},
|
|
"3.6": {"category_no": "3.1"},
|
|
"3.7": {"category_no": "3.1"},
|
|
"3.8": {"category_no": "3.1"},
|
|
"3.9": {"category_no": "3.1"},
|
|
"3.10": {"category_no": "3.1"},
|
|
"4.2": {"category_no": "4.1"},
|
|
"4.4": {"category_no": "4.1"},
|
|
"4.6": {"category_no": "4.1"},
|
|
"5.2": {"category_no": "5.1"},
|
|
"5.4": {"category_no": "5.3"},
|
|
"6.2": {"category_no": "6.1"},
|
|
"6.3": {"category_no": "6.1"},
|
|
"6.5": {"category_no": "6.4"},
|
|
"8.2": {"category_no": "8.1"},
|
|
"8.4": {"category_no": "8.8"},
|
|
"8.11": {"category_no": "8.8"},
|
|
"8.12": {"category_no": "8.8"},
|
|
"8.14": {"category_no": "8.8"},
|
|
"8.22": {"category_no": "8.21"},
|
|
"8.23": {"category_no": "8.21"},
|
|
"8.32": {"category_no": "8.31"},
|
|
"8.33": {"category_no": "8.8"},
|
|
"8.34": {"category_no": "8.8"},
|
|
"8.35": {"category_no": "8.31"},
|
|
"8.36": {"category_no": "8.8"},
|
|
"8.37": {"category_no": "8.39"},
|
|
"8.38": {"category_no": "8.39"},
|
|
"8.40": {"category_no": "8.39"},
|
|
"8.41": {"category_no": "8.39"},
|
|
"8.42": {"category_no": "8.39"},
|
|
"9.5": {"category_no": "9.4"},
|
|
}
|
|
|
|
|
|
def apply_category_proportion_rules(
|
|
*, category_no: Optional[str], proportion: Optional[float]
|
|
) -> tuple[Optional[str], Optional[float]]:
|
|
"""Normalize category/proportion based on pre-defined business rules."""
|
|
|
|
if not category_no:
|
|
return category_no, proportion
|
|
|
|
normalized_category = category_no
|
|
normalized_proportion = proportion
|
|
visited: set[str] = set()
|
|
|
|
# Follow chained mappings (e.g. 8.35 -> 8.31 -> ...).
|
|
while normalized_category in CATEGORY_PROPORTION_RULES and normalized_category not in visited:
|
|
visited.add(normalized_category)
|
|
rule = CATEGORY_PROPORTION_RULES[normalized_category]
|
|
|
|
target_category = rule.get("category_no")
|
|
if isinstance(target_category, str):
|
|
normalized_category = target_category
|
|
|
|
target_proportion = rule.get("proportion")
|
|
if isinstance(target_proportion, (int, float)):
|
|
normalized_proportion = float(target_proportion)
|
|
|
|
return normalized_category, normalized_proportion
|
|
|
|
|
|
def _build_category_rollup_children() -> dict[str, set[str]]:
|
|
rollups: dict[str, set[str]] = {}
|
|
for alias in CATEGORY_PROPORTION_RULES:
|
|
resolved, _ = apply_category_proportion_rules(category_no=alias, proportion=None)
|
|
if resolved and resolved != alias:
|
|
rollups.setdefault(resolved, set()).add(alias)
|
|
return rollups
|
|
|
|
|
|
CATEGORY_ROLLUP_CHILDREN = _build_category_rollup_children()
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def _fetch_joined_maximo_records(
|
|
*, session: AsyncSession, assetnum: str
|
|
) -> list[dict[str, Any]]:
|
|
"""Fetch Joined Maximo rows with a retry to mask transient collector failures."""
|
|
where_query = get_where_query_sql_all_worktype(assetnum)
|
|
|
|
JOINED_MAXIMO_SQL = text(
|
|
f"""
|
|
SELECT b.*, emp.*, a.*
|
|
FROM public.wo_maximo a
|
|
LEFT JOIN public.wo_maximo_labtrans b
|
|
ON b.wonum = a.wonum
|
|
LEFT JOIN lcc_ms_manpower emp
|
|
ON UPPER(TRIM(emp."ID Number")) = UPPER(TRIM(b.laborcode))
|
|
{where_query}
|
|
"""
|
|
)
|
|
try:
|
|
result = await session.execute(JOINED_MAXIMO_SQL)
|
|
return result.mappings().all()
|
|
except AsyncpgInterfaceError as exc:
|
|
logger.warning(
|
|
"Collector session closed while fetching Joined Maximo data for %s. Retrying once.",
|
|
assetnum,
|
|
)
|
|
try:
|
|
async with collector_async_session() as retry_session:
|
|
retry_result = await retry_session.execute(JOINED_MAXIMO_SQL)
|
|
return retry_result.mappings().all()
|
|
except Exception as retry_exc:
|
|
logger.error(
|
|
"Retrying Joined Maximo query failed for %s: %s",
|
|
assetnum,
|
|
retry_exc,
|
|
exc_info=True,
|
|
)
|
|
except SQLAlchemyError as exc:
|
|
logger.error(
|
|
"Failed to fetch Joined Maximo data for %s: %s", assetnum, exc, exc_info=True
|
|
)
|
|
except Exception as exc:
|
|
logger.exception(
|
|
"Unexpected error while fetching Joined Maximo data for %s", assetnum
|
|
)
|
|
|
|
return []
|
|
|
|
|
|
async def fetch_acquisition_cost_with_rollup(
|
|
*, db_session: DbSession, base_category_no: str
|
|
) -> tuple[Optional[AcquisitionData], Optional[float]]:
|
|
"""Return base acquisition data and aggregated cost_unit_3 for related categories."""
|
|
|
|
related_categories = {base_category_no}
|
|
related_categories.update(CATEGORY_ROLLUP_CHILDREN.get(base_category_no, set()))
|
|
|
|
if not related_categories:
|
|
return None, None
|
|
|
|
acquisition_data_query = Select(AcquisitionData).filter(
|
|
AcquisitionData.category_no.in_(tuple(related_categories))
|
|
)
|
|
acquisition_data_result = await db_session.execute(acquisition_data_query)
|
|
acquisition_records = acquisition_data_result.scalars().all()
|
|
|
|
base_record: Optional[AcquisitionData] = None
|
|
total_cost_unit_3 = 0.0
|
|
has_cost_unit = False
|
|
|
|
for record in acquisition_records:
|
|
if record.category_no == base_category_no:
|
|
base_record = record
|
|
if record.cost_unit_3 is not None:
|
|
has_cost_unit = True
|
|
total_cost_unit_3 += record.cost_unit_3
|
|
|
|
return base_record, total_cost_unit_3 if has_cost_unit else None
|
|
|
|
|
|
async def get_master_by_assetnum(
|
|
*, db_session: DbSession, collector_db_session: CollectorDbSession, assetnum: str
|
|
) -> tuple[list[EquipmentTransactionRecords], float | None]:
|
|
"""Returns master records with equipment data based on asset number."""
|
|
|
|
# First query to get equipment record
|
|
equipment_master_query = Select(EquipmentMaster).filter(
|
|
EquipmentMaster.assetnum == assetnum
|
|
)
|
|
equipment_master_result = await db_session.execute(equipment_master_query)
|
|
equipment_master_record = equipment_master_result.scalars().one_or_none()
|
|
equipment_query = Select(Equipment).filter(Equipment.assetnum == assetnum)
|
|
equipment_result = await db_session.execute(equipment_query)
|
|
equipment_record = equipment_result.scalars().one_or_none()
|
|
|
|
# Second query to get master records
|
|
master_query = (
|
|
Select(EquipmentTransactionRecords)
|
|
.join(EquipmentTransactionRecords.equipment)
|
|
.options(selectinload(EquipmentTransactionRecords.equipment))
|
|
.filter(Equipment.assetnum == assetnum)
|
|
.order_by(EquipmentTransactionRecords.tahun.asc())
|
|
)
|
|
master_result = await db_session.execute(master_query)
|
|
records = master_result.scalars().all()
|
|
|
|
# Get all yeardata
|
|
yeardata_query = Select(Yeardata)
|
|
yeardata_result = await db_session.execute(yeardata_query)
|
|
yeardata_records = yeardata_result.scalars().all()
|
|
yeardata_dict = {y.year: y for y in yeardata_records}
|
|
|
|
maximo_record = await _fetch_joined_maximo_records(
|
|
session=collector_db_session, assetnum=assetnum
|
|
)
|
|
joined_maximo_record = await _fetch_joined_maximo_records(
|
|
session=collector_db_session, assetnum=assetnum
|
|
)
|
|
|
|
# Compute asset criticality per record/year and attach to each record.
|
|
# Use safe attribute access for Yeardata; if a value or attribute is missing,
|
|
# fall back to 0 so N/A data does not raise AttributeError.
|
|
for record in records:
|
|
year = record.tahun
|
|
y = yeardata_dict.get(year)
|
|
|
|
asset_crit_ens_energy_not_served = (
|
|
getattr(y, "asset_crit_ens_energy_not_served", 0) if y is not None else 0
|
|
)
|
|
asset_crit_bpp_system = (
|
|
getattr(y, "asset_crit_bpp_system", 0) if y is not None else 0
|
|
)
|
|
asset_crit_bpp_pembangkit = (
|
|
getattr(y, "asset_crit_bpp_pembangkit", 0) if y is not None else 0
|
|
)
|
|
asset_crit_marginal_cost = (
|
|
getattr(y, "asset_crit_marginal_cost", 0) if y is not None else 0
|
|
)
|
|
asset_crit_dmn_daya_mampu_netto = (
|
|
getattr(y, "asset_crit_dmn_daya_mampu_netto", 0) if y is not None else 0
|
|
)
|
|
|
|
# Convert to floats and compute criticality safely
|
|
ens = float(asset_crit_ens_energy_not_served or 0)
|
|
bpp_system = float(asset_crit_bpp_system or 0)
|
|
bpp_pembangkit = float(asset_crit_bpp_pembangkit or 0)
|
|
marginal_cost = float(asset_crit_marginal_cost or 0)
|
|
dmn = float(asset_crit_dmn_daya_mampu_netto or 0)
|
|
|
|
extra_fuel_cost = marginal_cost - bpp_pembangkit
|
|
asset_criticality = ens * (0.07 * bpp_system) + (dmn - ens * extra_fuel_cost)
|
|
|
|
# if NaN or None, return 0
|
|
if (not joined_maximo_record or not maximo_record) or asset_criticality is None or (isinstance(asset_criticality, float) and math.isnan(asset_criticality)):
|
|
asset_criticality = 0.0
|
|
|
|
setattr(record, "asset_criticality", asset_criticality)
|
|
|
|
# Get the last actual year
|
|
last_actual_year_query = (
|
|
Select(func.max(EquipmentTransactionRecords.tahun))
|
|
.join(EquipmentTransactionRecords.equipment)
|
|
.filter(Equipment.assetnum == assetnum)
|
|
.filter(EquipmentTransactionRecords.is_actual == 1)
|
|
)
|
|
last_actual_year_result = await db_session.execute(last_actual_year_query)
|
|
last_actual_year = last_actual_year_result.scalar()
|
|
|
|
min_eac_value = equipment_record.minimum_eac if equipment_record else None
|
|
min_seq = equipment_record.minimum_eac_seq if equipment_record else None
|
|
min_eac_year = equipment_record.minimum_eac_year if equipment_record else None
|
|
|
|
|
|
min_eac_disposal_cost = next(
|
|
(record.eac_disposal_cost for record in records if record.tahun == min_eac_year),
|
|
None,
|
|
)
|
|
|
|
return (
|
|
equipment_master_record,
|
|
equipment_record,
|
|
records,
|
|
min_eac_value,
|
|
min_seq,
|
|
min_eac_year,
|
|
last_actual_year,
|
|
maximo_record,
|
|
joined_maximo_record,
|
|
min_eac_disposal_cost,
|
|
)
|
|
# return result.scalars().all()
|
|
|
|
async def get_maximo_by_assetnum(*, db_session: CollectorDbSession, assetnum: str) -> Optional[MasterBase]:
|
|
"""Return Maximo collector rows for the provided asset number."""
|
|
# default worktype; change if you need a different filtering
|
|
# worktype = "CM"
|
|
|
|
# where_worktype = (
|
|
# "AND a.worktype in ('CM', 'PROACTIVE', 'WA')"
|
|
# if worktype == "CM"
|
|
# else "AND a.worktype = :worktype"
|
|
# )
|
|
# where_wojp8 = "AND a.wojp8 != 'S1'" if worktype == "CM" else ""
|
|
|
|
# sql = f"""
|
|
# SELECT
|
|
# DATE_PART('year', a.reportdate) AS tahun,
|
|
# COUNT(a.wonum) AS raw_{worktype.lower()}_interval,
|
|
# SUM(a.actmatcost) AS raw_{worktype.lower()}_material_cost,
|
|
# ROUND(SUM(EXTRACT(EPOCH FROM (a.actfinish - a.actstart)) / 3600), 2) AS raw_{worktype.lower()}_labor_time,
|
|
# CASE WHEN COUNT(b.laborcode) = 0 THEN 3 ELSE COUNT(b.laborcode) END AS raw_{worktype.lower()}_labor_human
|
|
# FROM public.wo_maximo AS a
|
|
# LEFT JOIN public.wo_maximo_labtrans AS b ON b.wonum = a.wonum
|
|
# WHERE a.asset_unit = '3'
|
|
# {where_worktype}
|
|
# AND a.asset_assetnum = :assetnum
|
|
# AND a.wonum NOT LIKE 'T%'
|
|
# {where_wojp8}
|
|
# GROUP BY DATE_PART('year', a.reportdate);
|
|
# """
|
|
|
|
record = await _fetch_maximo_records(session=db_session, assetnum=assetnum)
|
|
return record or None
|
|
|
|
async def get_by_assetnum(*, db_session: DbSession, assetnum: str) -> Optional[Equipment]:
|
|
"""Returns a document based on the given document id."""
|
|
print("assetnum service:", assetnum)
|
|
query = Select(Equipment).filter(Equipment.assetnum == assetnum)
|
|
result = await db_session.execute(query)
|
|
return result.scalars().one_or_none()
|
|
|
|
async def get_by_id(*, db_session: DbSession, equipment_id: str) -> Optional[Equipment]:
|
|
"""Returns a document based on the given document id."""
|
|
query = Select(Equipment).filter(Equipment.id == equipment_id)
|
|
result = await db_session.execute(query)
|
|
return result.scalars().one_or_none()
|
|
|
|
|
|
async def get_all(
|
|
*, db_session: DbSession, items_per_page: int, search: str = None, common
|
|
) -> list[Equipment]:
|
|
"""Returns all documents."""
|
|
query = (
|
|
Select(Equipment)
|
|
.outerjoin(EquipmentMaster, Equipment.assetnum == EquipmentMaster.assetnum)
|
|
.options(selectinload(Equipment.equipment_master))
|
|
)
|
|
|
|
if search:
|
|
query = query.filter(
|
|
cast(Equipment.acquisition_year, String).ilike(f"%{search}%")
|
|
| cast(Equipment.assetnum, String).ilike(f"%{search}%")
|
|
| cast(EquipmentMaster.name, String).ilike(f"%{search}%")
|
|
)
|
|
common["items_per_page"] = items_per_page
|
|
result = await search_filter_sort_paginate(model=query, **common)
|
|
return result
|
|
|
|
|
|
async def get_top_10_economic_life(*, db_session: DbSession, common) -> list[Equipment]:
|
|
"""Returns top 10 economic life."""
|
|
query = (
|
|
Select(Equipment)
|
|
.join(EquipmentMaster, Equipment.assetnum == EquipmentMaster.assetnum)
|
|
.options(selectinload(Equipment.equipment_master))
|
|
)
|
|
|
|
current_year = datetime.datetime.now().year
|
|
|
|
query = (
|
|
query.add_columns(
|
|
case(
|
|
(
|
|
(Equipment.minimum_eac_year - current_year) >= 0,
|
|
(Equipment.minimum_eac_year - current_year),
|
|
),
|
|
else_=0,
|
|
).label("remaining_life")
|
|
)
|
|
.filter(Equipment.minimum_eac_year != None)
|
|
.filter((Equipment.minimum_eac != None) & (Equipment.minimum_eac != 0))
|
|
# .filter((current_year - Equipment.minimum_eac_year) >= 0)
|
|
.order_by(desc("remaining_life"))
|
|
.order_by(func.abs(Equipment.minimum_eac).desc())
|
|
)
|
|
|
|
# result = await db_session.execute(query)
|
|
result = await search_filter_sort_paginate(model=query, **common)
|
|
return result
|
|
|
|
async def get_top_10_replacement_priorities(*, db_session: DbSession, common) -> list[Equipment]:
|
|
"""Returns top 10 replacement priorities."""
|
|
query = (
|
|
Select(Equipment)
|
|
.join(EquipmentMaster, Equipment.assetnum == EquipmentMaster.assetnum)
|
|
.options(selectinload(Equipment.equipment_master))
|
|
)
|
|
|
|
current_year = datetime.datetime.now().year
|
|
|
|
# Only select rows where (current_year - Equipment.minimum_eac_year) >= 0
|
|
query = (
|
|
query.add_columns(
|
|
case(
|
|
(
|
|
(Equipment.minimum_eac_year - current_year) >= 0,
|
|
(Equipment.minimum_eac_year - current_year),
|
|
),
|
|
else_=0,
|
|
).label("remaining_life")
|
|
)
|
|
.filter(Equipment.minimum_eac_year != None)
|
|
.filter((Equipment.minimum_eac != None) & (Equipment.minimum_eac != 0))
|
|
# .filter((current_year - Equipment.minimum_eac_year) >= 0)
|
|
.order_by(asc("remaining_life"))
|
|
.order_by(func.abs(Equipment.minimum_eac).desc())
|
|
)
|
|
|
|
# result = await db_session.execute(query)
|
|
result = await search_filter_sort_paginate(model=query, **common)
|
|
return result
|
|
|
|
async def generate_all_transaction(*, db_session: DbSession, token):
|
|
"""Generate transaction for all equipments in the database based on equipments assetnum."""
|
|
query = Select(Equipment)
|
|
query_result = await db_session.execute(query)
|
|
equipments = query_result.scalars().all()
|
|
for equipment in equipments:
|
|
await main(equipment.assetnum, token, RELIABILITY_APP_URL)
|
|
return [equipment.assetnum for equipment in equipments]
|
|
|
|
|
|
async def generate_transaction(
|
|
*, db_session: DbSession, data_in: EquipmentCreate, token
|
|
):
|
|
# Delete all existing master records for this asset number and prediction data
|
|
# query = (
|
|
# Delete(EquipmentTransactionRecords)
|
|
# .where(EquipmentTransactionRecords.assetnum == data_in.assetnum)
|
|
# .where(EquipmentTransactionRecords.is_actual == 0)
|
|
# )
|
|
# await db_session.execute(query)
|
|
# await db_session.commit()
|
|
"""Generate transaction for equipment."""
|
|
# prediction = await main(data_in.assetnum, token, RELIABILITY_APP_URL)
|
|
prediction = await predict_main(assetnum=data_in.assetnum, token=token)
|
|
eac = eac_main(assetnum=data_in.assetnum)
|
|
|
|
# # Fetch data from external API
|
|
# async def fetch_api_data(assetnum: str, year: int) -> dict:
|
|
# async with httpx.AsyncClient() as client:
|
|
# try:
|
|
# response = await client.get(
|
|
# f"{os.environ.get('RELIABILITY_APP_URL')}/main/number-of-failures/{assetnum}/{year}/{year}",
|
|
# timeout=30.0,
|
|
# headers={"Authorization": f"Bearer {token}"},
|
|
# )
|
|
# response.raise_for_status()
|
|
# return response.json()
|
|
# except httpx.HTTPError as e:
|
|
# print(f"HTTP error occurred: {e}")
|
|
# return {}
|
|
|
|
# # Initialize base transaction with default values
|
|
# base_transaction = {
|
|
# "assetnum": data_in.assetnum,
|
|
# "is_actual": 0,
|
|
# "raw_cm_interval": None,
|
|
# "raw_cm_material_cost": None,
|
|
# "raw_cm_labor_time": None,
|
|
# "raw_cm_labor_human": None,
|
|
# "raw_pm_interval": None,
|
|
# "raw_pm_material_cost": None,
|
|
# "raw_pm_labor_time": None,
|
|
# "raw_pm_labor_human": None,
|
|
# "raw_predictive_labor_time": None,
|
|
# "raw_predictive_labor_human": None,
|
|
# "raw_oh_material_cost": None,
|
|
# "raw_oh_labor_time": None,
|
|
# "raw_oh_labor_human": None,
|
|
# "raw_project_task_material_cost": None,
|
|
# "raw_loss_output_MW": None,
|
|
# "raw_loss_output_price": None,
|
|
# "raw_operational_cost": None,
|
|
# "raw_maintenance_cost": None,
|
|
# "rc_cm_material_cost": None,
|
|
# "rc_cm_labor_cost": None,
|
|
# "rc_pm_material_cost": None,
|
|
# "rc_pm_labor_cost": None,
|
|
# "rc_predictive_labor_cost": None,
|
|
# "rc_oh_material_cost": None,
|
|
# "rc_oh_labor_cost": None,
|
|
# "rc_project_material_cost": None,
|
|
# "rc_lost_cost": None,
|
|
# "rc_operation_cost": None,
|
|
# "rc_maintenance_cost": None,
|
|
# "rc_total_cost": None,
|
|
# "eac_npv": None,
|
|
# "eac_annual_mnt_cost": None,
|
|
# "eac_annual_acq_cost": None,
|
|
# "eac_eac": None,
|
|
# }
|
|
|
|
# transactions = []
|
|
|
|
# # Query existing records with is_actual=1
|
|
# actual_years_query = (
|
|
# Select(EquipmentTransactionRecords.tahun)
|
|
# .filter(EquipmentTransactionRecords.assetnum == data_in.assetnum)
|
|
# .filter(EquipmentTransactionRecords.is_actual == 1)
|
|
# )
|
|
# result = await db_session.execute(actual_years_query)
|
|
# actual_years = {row[0] for row in result.all()}
|
|
|
|
# for sequence, year in enumerate(
|
|
# range(data_in.acquisition_year - 1, data_in.forecasting_target_year + 1), 0
|
|
# ):
|
|
# # Skip if year already has actual data
|
|
# if year in actual_years:
|
|
# continue
|
|
|
|
# transaction = base_transaction.copy()
|
|
# # Update values from API
|
|
# api_data = await fetch_api_data(data_in.assetnum, year)
|
|
|
|
# if api_data:
|
|
# # # Get current num_fail
|
|
# current_num_fail = api_data["data"][0]["num_fail"]
|
|
|
|
# # # Calculate sum of previous failures for this asset
|
|
# # previous_failures_query = (
|
|
# # Select(func.sum(EquipmentTransactionRecords.raw_cm_interval))
|
|
# # .filter(EquipmentTransactionRecords.assetnum == data_in.assetnum)
|
|
# # .filter(EquipmentTransactionRecords.tahun < year)
|
|
# # )
|
|
# # previous_failures_result = await db_session.execute(previous_failures_query)
|
|
# # previous_failures_sum = previous_failures_result.scalar() or 0
|
|
|
|
# # # Update with current minus sum of previous
|
|
# # transaction.update({"raw_cm_interval": current_num_fail - previous_failures_sum})
|
|
# transaction.update({"raw_cm_interval": current_num_fail})
|
|
# transaction.update({"tahun": int(year), "seq": int(sequence)})
|
|
# transactions.append(EquipmentTransactionRecords(**transaction))
|
|
|
|
# db_session.add_all(transactions)
|
|
# await db_session.commit()
|
|
|
|
# # Return the number of transactions created
|
|
# return len(transactions)
|
|
return prediction, eac
|
|
|
|
|
|
async def create(*, db_session: DbSession, equipment_in: EquipmentCreate, token):
|
|
"""Creates a new document."""
|
|
equipment = Equipment(**equipment_in.model_dump())
|
|
db_session.add(equipment)
|
|
await db_session.commit()
|
|
# await generate_transaction(db_session=db_session, data_in=equipment_in, token=token)
|
|
return equipment
|
|
|
|
|
|
async def update(
|
|
*, db_session: DbSession, equipment: Equipment, equipment_in: EquipmentUpdate, token
|
|
):
|
|
"""Updates a document and re-simulates transaction for the asset."""
|
|
|
|
# capture original assetnum (optional use)
|
|
old_assetnum = equipment.assetnum
|
|
|
|
data = equipment_in.model_dump()
|
|
update_data = equipment_in.model_dump(exclude_defaults=True)
|
|
|
|
# Check if proportion from AcquisitionData changed and recalculate acquisition_cost
|
|
if "proportion" in update_data or "category_no" in update_data:
|
|
category_no = update_data.get("category_no", equipment.category_no)
|
|
proportion = update_data.get("proportion", equipment.proportion)
|
|
|
|
print(f"DEBUG: Detected change - category_no={category_no}, proportion={proportion}")
|
|
|
|
resolved_category_no, resolved_proportion = apply_category_proportion_rules(
|
|
category_no=category_no, proportion=proportion
|
|
)
|
|
|
|
if resolved_category_no != category_no:
|
|
print(
|
|
"DEBUG: category alias rule applied - "
|
|
f"{category_no} -> {resolved_category_no}"
|
|
)
|
|
|
|
effective_category_no = resolved_category_no or category_no
|
|
|
|
if resolved_proportion is not None and resolved_proportion != proportion:
|
|
print(
|
|
"DEBUG: proportion overridden by rule - "
|
|
f"{proportion} -> {resolved_proportion}"
|
|
)
|
|
proportion = resolved_proportion
|
|
update_data["proportion"] = resolved_proportion
|
|
|
|
if not effective_category_no:
|
|
print("DEBUG: Missing category_no after applying rules; skip cost update")
|
|
else:
|
|
acquisition_data, aggregated_cost_unit_3 = await fetch_acquisition_cost_with_rollup(
|
|
db_session=db_session, base_category_no=effective_category_no
|
|
)
|
|
|
|
print(f"DEBUG: AcquisitionData found: {acquisition_data is not None}")
|
|
if acquisition_data:
|
|
print(f"DEBUG: base cost_unit_3={acquisition_data.cost_unit_3}")
|
|
|
|
related_categories = {effective_category_no}
|
|
related_categories.update(
|
|
CATEGORY_ROLLUP_CHILDREN.get(effective_category_no, set())
|
|
)
|
|
print(
|
|
"DEBUG: Aggregated categories="
|
|
f"{sorted(related_categories)} cost_unit_3={aggregated_cost_unit_3}"
|
|
)
|
|
|
|
if aggregated_cost_unit_3 is not None and proportion is not None:
|
|
new_acquisition_cost = (proportion * 0.01) * aggregated_cost_unit_3
|
|
print(f"DEBUG: Calculated new_acquisition_cost={new_acquisition_cost}")
|
|
equipment.acquisition_cost = new_acquisition_cost
|
|
update_data["acquisition_cost"] = new_acquisition_cost
|
|
elif aggregated_cost_unit_3 is None:
|
|
print("DEBUG: No cost_unit_3 available across related categories")
|
|
else:
|
|
print("DEBUG: Proportion missing; acquisition cost not updated")
|
|
|
|
for field in data:
|
|
if field in update_data:
|
|
setattr(equipment, field, update_data[field])
|
|
|
|
await db_session.commit()
|
|
|
|
# prepare a clean dict of attributes for return / recreate input model
|
|
updated_data = {k: v for k, v in vars(equipment).items() if not k.startswith("_")}
|
|
|
|
# Re-run generate_transaction for this equipment's assetnum.
|
|
# Build an EquipmentCreate from the updated SQLAlchemy object and call the generator.
|
|
try:
|
|
equipment_create = EquipmentCreate(**updated_data)
|
|
await generate_transaction(db_session=db_session, data_in=equipment_create, token=token)
|
|
except Exception as e:
|
|
# don't break the update if resimulation fails — log/print for visibility
|
|
print(f"Resimulation failed for assetnum {updated_data.get('assetnum')}: {e}")
|
|
|
|
return updated_data
|
|
|
|
|
|
async def delete(*, db_session: DbSession, equipment_id: str):
|
|
"""Deletes a document."""
|
|
query = Delete(Equipment).where(Equipment.id == equipment_id)
|
|
await db_session.execute(query)
|
|
await db_session.commit()
|