You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

679 lines
26 KiB
Python

import os
import logging
from typing import Optional, TypedDict, Any
from sqlalchemy import Select, Delete, Float, func, cast, String, text, case, asc, desc
from sqlalchemy.orm import selectinload
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from asyncpg.exceptions import InterfaceError as AsyncpgInterfaceError
from src.database.service import search_filter_sort_paginate
from src.equipment.model import Equipment, EquipmentTransactionRecords
from src.acquisition_cost.model import AcquisitionData
from src.yeardata.model import Yeardata
from ..equipment_master.model import EquipmentMaster
from .schema import EquipmentCreate, EquipmentUpdate, MasterBase
from src.database.core import DbSession, CollectorDbSession, collector_async_session
from src.auth.service import CurrentUser
from src.config import RELIABILITY_APP_URL
import httpx
from src.modules.equipment.run import main
from src.modules.equipment.Prediksi import main as predict_main
from src.modules.equipment.Eac import main as eac_main
from src.modules.equipment.where_query_sql import get_where_query_sql_all_worktype
import datetime
import math
class CategoryRule(TypedDict, total=False):
    """Optional redirect rule for a category.

    Both keys are optional (``total=False``): ``category_no`` redirects to a
    canonical category, ``proportion`` overrides the proportion value.
    """

    category_no: str
    proportion: float


# Rules extracted from the provided proportion/category mapping sheet.
# Each alias category reuses the acquisition data (and optionally the
# proportion) of the category it points at. Edit this mapping whenever the
# business rules change; all dependent logic picks it up automatically.
CATEGORY_PROPORTION_RULES: dict[str, CategoryRule] = {
    "1.2": {"category_no": "1.1"},
    "1.5": {"category_no": "1.1"},
    "2.7": {"category_no": "2.1"},
    "3.2": {"category_no": "3.1"},
    "3.3": {"category_no": "3.1"},
    "3.4": {"category_no": "3.1"},
    "3.5": {"category_no": "3.1"},
    "3.6": {"category_no": "3.1"},
    "3.7": {"category_no": "3.1"},
    "3.8": {"category_no": "3.1"},
    "3.9": {"category_no": "3.1"},
    "3.10": {"category_no": "3.1"},
    "4.2": {"category_no": "4.1"},
    "4.4": {"category_no": "4.1"},
    "4.6": {"category_no": "4.1"},
    "5.2": {"category_no": "5.1"},
    "5.4": {"category_no": "5.3"},
    "6.2": {"category_no": "6.1"},
    "6.3": {"category_no": "6.1"},
    "6.5": {"category_no": "6.4"},
    "8.2": {"category_no": "8.1"},
    "8.4": {"category_no": "8.8"},
    "8.11": {"category_no": "8.8"},
    "8.12": {"category_no": "8.8"},
    "8.14": {"category_no": "8.8"},
    "8.22": {"category_no": "8.21"},
    "8.23": {"category_no": "8.21"},
    "8.32": {"category_no": "8.31"},
    "8.33": {"category_no": "8.8"},
    "8.34": {"category_no": "8.8"},
    "8.35": {"category_no": "8.31"},
    "8.36": {"category_no": "8.8"},
    "8.37": {"category_no": "8.39"},
    "8.38": {"category_no": "8.39"},
    "8.40": {"category_no": "8.39"},
    "8.41": {"category_no": "8.39"},
    "8.42": {"category_no": "8.39"},
    "9.5": {"category_no": "9.4"},
}


def apply_category_proportion_rules(
    *, category_no: Optional[str], proportion: Optional[float]
) -> tuple[Optional[str], Optional[float]]:
    """Resolve a category (and proportion) through the business rules.

    Follows chained aliases (e.g. 8.35 -> 8.31) until a canonical category is
    reached; a visited-set guards against accidental cycles in the mapping.
    Falsy ``category_no`` values are returned unchanged.
    """
    if not category_no:
        return category_no, proportion
    current = category_no
    prop = proportion
    seen: set[str] = set()
    while True:
        rule = CATEGORY_PROPORTION_RULES.get(current)
        if rule is None or current in seen:
            break
        seen.add(current)
        mapped = rule.get("category_no")
        if isinstance(mapped, str):
            current = mapped
        override = rule.get("proportion")
        if isinstance(override, (int, float)):
            prop = float(override)
    return current, prop
def _build_category_rollup_children() -> dict[str, set[str]]:
    """Invert CATEGORY_PROPORTION_RULES: canonical category -> alias set.

    For every alias in the rules table, resolve it (following chains) to its
    canonical category and record the alias as a child of that category.
    """
    rollups: dict[str, set[str]] = {}
    for alias in CATEGORY_PROPORTION_RULES:
        # Resolve through possibly-chained rules to the canonical category.
        resolved, _ = apply_category_proportion_rules(category_no=alias, proportion=None)
        if resolved and resolved != alias:
            rollups.setdefault(resolved, set()).add(alias)
    return rollups


# Canonical category -> aliases that roll up into it; computed once at import
# time, so it only reflects rule changes after a restart.
CATEGORY_ROLLUP_CHILDREN = _build_category_rollup_children()

logger = logging.getLogger(__name__)
async def _fetch_joined_maximo_records(
    *, session: AsyncSession, assetnum: str
) -> list[dict[str, Any]]:
    """Fetch Joined Maximo rows with a retry to mask transient collector failures.

    Joins work orders with labor transactions and manpower data for the given
    asset number. Best-effort: any failure (after one retry for a closed
    collector connection) is logged and an empty list is returned.

    Args:
        session: async collector DB session to run the query on.
        assetnum: asset number used to build the WHERE clause.

    Returns:
        List of row mappings (column name -> value); [] on failure.
    """
    # NOTE(review): ``get_where_query_sql_all_worktype`` appears to interpolate
    # ``assetnum`` directly into the SQL text — confirm it sanitizes the value,
    # otherwise this is an injection vector.
    where_query = get_where_query_sql_all_worktype(assetnum)
    joined_maximo_sql = text(
        f"""
        SELECT b.*, emp.*, a.*
        FROM public.wo_maximo a
        LEFT JOIN public.wo_maximo_labtrans b
            ON b.wonum = a.wonum
        LEFT JOIN lcc_ms_manpower emp
            ON UPPER(TRIM(emp."ID Number")) = UPPER(TRIM(b.laborcode))
        {where_query}
        """
    )
    try:
        result = await session.execute(joined_maximo_sql)
        # Materialize as a list so the annotation holds for callers.
        return list(result.mappings().all())
    except AsyncpgInterfaceError:
        # The collector connection was closed under us; retry once on a fresh
        # session. Include the traceback so the root cause is visible.
        logger.warning(
            "Collector session closed while fetching Joined Maximo data for %s. Retrying once.",
            assetnum,
            exc_info=True,
        )
        try:
            async with collector_async_session() as retry_session:
                retry_result = await retry_session.execute(joined_maximo_sql)
                return list(retry_result.mappings().all())
        except Exception as retry_exc:
            logger.error(
                "Retrying Joined Maximo query failed for %s: %s",
                assetnum,
                retry_exc,
                exc_info=True,
            )
    except SQLAlchemyError as exc:
        logger.error(
            "Failed to fetch Joined Maximo data for %s: %s", assetnum, exc, exc_info=True
        )
    except Exception:
        logger.exception(
            "Unexpected error while fetching Joined Maximo data for %s", assetnum
        )
    # Deliberate best-effort fallback: callers treat [] as "no collector data".
    return []
async def fetch_acquisition_cost_with_rollup(
    *, db_session: DbSession, base_category_no: str
) -> tuple[Optional[AcquisitionData], Optional[float]]:
    """Return base acquisition data and aggregated cost_unit_3 for related categories.

    "Related" means the base category plus all of its alias children from
    CATEGORY_ROLLUP_CHILDREN. The aggregate is None when no related record
    carries a cost_unit_3 value.
    """
    categories = {base_category_no}
    categories |= CATEGORY_ROLLUP_CHILDREN.get(base_category_no, set())
    if not categories:
        return None, None
    stmt = Select(AcquisitionData).filter(
        AcquisitionData.category_no.in_(tuple(categories))
    )
    rows = (await db_session.execute(stmt)).scalars().all()
    base: Optional[AcquisitionData] = None
    total = 0.0
    seen_cost = False
    for row in rows:
        if row.category_no == base_category_no:
            base = row
        if row.cost_unit_3 is not None:
            seen_cost = True
            total += row.cost_unit_3
    return base, (total if seen_cost else None)
async def get_master_by_assetnum(
    *, db_session: DbSession, collector_db_session: CollectorDbSession, assetnum: str
) -> tuple:
    """Return master/transaction data and derived metrics for an asset.

    Returns a 10-tuple:
        (equipment_master_record, equipment_record, records, min_eac_value,
         min_seq, min_eac_year, last_actual_year, maximo_record,
         joined_maximo_record, min_eac_disposal_cost)

    Side effect: attaches a computed ``asset_criticality`` attribute to every
    transaction record.
    """
    # Equipment master + equipment rows for this asset.
    equipment_master_result = await db_session.execute(
        Select(EquipmentMaster).filter(EquipmentMaster.assetnum == assetnum)
    )
    equipment_master_record = equipment_master_result.scalars().one_or_none()
    equipment_result = await db_session.execute(
        Select(Equipment).filter(Equipment.assetnum == assetnum)
    )
    equipment_record = equipment_result.scalars().one_or_none()
    # Transaction records, oldest year first.
    master_query = (
        Select(EquipmentTransactionRecords)
        .join(EquipmentTransactionRecords.equipment)
        .options(selectinload(EquipmentTransactionRecords.equipment))
        .filter(Equipment.assetnum == assetnum)
        .order_by(EquipmentTransactionRecords.tahun.asc())
    )
    master_result = await db_session.execute(master_query)
    records = master_result.scalars().all()
    # All yeardata keyed by year for criticality inputs.
    yeardata_result = await db_session.execute(Select(Yeardata))
    yeardata_dict = {y.year: y for y in yeardata_result.scalars().all()}
    # BUGFIX: the collector query was previously executed twice with identical
    # arguments. Fetch once and copy, so the two returned values remain
    # independent lists for callers that mutate them.
    maximo_record = await _fetch_joined_maximo_records(
        session=collector_db_session, assetnum=assetnum
    )
    joined_maximo_record = list(maximo_record)
    has_maximo = bool(maximo_record)
    # Compute asset criticality per record/year and attach to each record.
    for record in records:
        y = yeardata_dict.get(record.tahun)

        def _year_attr(name: str) -> float:
            # Missing Yeardata rows/attributes/values fall back to 0 so N/A
            # data cannot raise AttributeError or TypeError.
            return float((getattr(y, name, 0) if y is not None else 0) or 0)

        ens = _year_attr("asset_crit_ens_energy_not_served")
        bpp_system = _year_attr("asset_crit_bpp_system")
        bpp_pembangkit = _year_attr("asset_crit_bpp_pembangkit")
        marginal_cost = _year_attr("asset_crit_marginal_cost")
        dmn = _year_attr("asset_crit_dmn_daya_mampu_netto")
        extra_fuel_cost = marginal_cost - bpp_pembangkit
        # NOTE(review): formula taken verbatim from the original code — confirm
        # it matches the business definition of asset criticality.
        asset_criticality = ens * (0.07 * bpp_system) + (dmn - ens * extra_fuel_cost)
        # Zero out when collector data is missing or arithmetic produced NaN
        # (the result is always a float here, so no None check is needed).
        if not has_maximo or math.isnan(asset_criticality):
            asset_criticality = 0.0
        record.asset_criticality = asset_criticality
    # Latest year that has actual (non-forecast) data.
    last_actual_year_result = await db_session.execute(
        Select(func.max(EquipmentTransactionRecords.tahun))
        .join(EquipmentTransactionRecords.equipment)
        .filter(Equipment.assetnum == assetnum)
        .filter(EquipmentTransactionRecords.is_actual == 1)
    )
    last_actual_year = last_actual_year_result.scalar()
    min_eac_value = equipment_record.minimum_eac if equipment_record else None
    min_seq = equipment_record.minimum_eac_seq if equipment_record else None
    min_eac_year = equipment_record.minimum_eac_year if equipment_record else None
    # Disposal cost of the record at the minimum-EAC year, if any.
    min_eac_disposal_cost = next(
        (record.eac_disposal_cost for record in records if record.tahun == min_eac_year),
        None,
    )
    return (
        equipment_master_record,
        equipment_record,
        records,
        min_eac_value,
        min_seq,
        min_eac_year,
        last_actual_year,
        maximo_record,
        joined_maximo_record,
        min_eac_disposal_cost,
    )
async def get_maximo_by_assetnum(
    *, db_session: CollectorDbSession, assetnum: str
) -> Optional[MasterBase]:
    """Return Maximo collector rows for the provided asset number.

    Collapses an empty result to None for the caller. (A superseded hand-built
    aggregation SQL block was removed here; see VCS history if needed.)
    """
    # NOTE(review): ``_fetch_maximo_records`` is not defined anywhere in this
    # module — calling this function will raise NameError unless the helper
    # exists elsewhere. It most likely should be
    # ``_fetch_joined_maximo_records``; confirm intent before changing the call.
    record = await _fetch_maximo_records(session=db_session, assetnum=assetnum)
    return record or None
async def get_by_assetnum(*, db_session: DbSession, assetnum: str) -> Optional[Equipment]:
    """Return the Equipment row matching ``assetnum``, or None if absent.

    Uses ``one_or_none``, so multiple matching rows raise from SQLAlchemy.
    """
    # BUGFIX: leftover debug ``print`` replaced with a lazy logger call so
    # request handling does not write to stdout in production.
    logger.debug("assetnum service: %s", assetnum)
    query = Select(Equipment).filter(Equipment.assetnum == assetnum)
    result = await db_session.execute(query)
    return result.scalars().one_or_none()
async def get_by_id(*, db_session: DbSession, equipment_id: str) -> Optional[Equipment]:
    """Return the Equipment row whose id equals ``equipment_id``, or None."""
    stmt = Select(Equipment).filter(Equipment.id == equipment_id)
    found = (await db_session.execute(stmt)).scalars().one_or_none()
    return found
async def get_all(
    *, db_session: DbSession, items_per_page: int, search: Optional[str] = None, common
) -> list[Equipment]:
    """Return all equipment joined with master data, optionally filtered.

    ``search`` is matched case-insensitively as a substring against the
    acquisition year, asset number, and master name.

    BUGFIX: ``search`` was annotated ``str`` while defaulting to None; the
    annotation is now ``Optional[str]``.

    Note: mutates ``common`` in place to inject ``items_per_page`` before
    delegating pagination to ``search_filter_sort_paginate``.
    """
    query = (
        Select(Equipment)
        .join(EquipmentMaster, Equipment.assetnum == EquipmentMaster.assetnum)
        .options(selectinload(Equipment.equipment_master))
    )
    if search:
        pattern = f"%{search}%"
        query = query.filter(
            cast(Equipment.acquisition_year, String).ilike(pattern)
            | cast(Equipment.assetnum, String).ilike(pattern)
            | cast(EquipmentMaster.name, String).ilike(pattern)
        )
    common["items_per_page"] = items_per_page
    return await search_filter_sort_paginate(model=query, **common)
async def get_top_10_economic_life(*, db_session: DbSession, common) -> list[Equipment]:
    """Rank equipment by remaining economic life, longest first.

    remaining_life = minimum_eac_year - current_year, floored at 0; ties are
    broken by |minimum_eac| descending. Rows without a minimum EAC year or
    with a zero/NULL minimum EAC are excluded. The "top 10" limit is expected
    to come from the pagination settings in ``common``.
    """
    # ``db_session`` is unused here; pagination helper manages its own session.
    current_year = datetime.datetime.now().year
    remaining_life = case(
        (
            (Equipment.minimum_eac_year - current_year) >= 0,
            (Equipment.minimum_eac_year - current_year),
        ),
        else_=0,
    ).label("remaining_life")
    query = (
        Select(Equipment)
        .join(EquipmentMaster, Equipment.assetnum == EquipmentMaster.assetnum)
        .options(selectinload(Equipment.equipment_master))
        .add_columns(remaining_life)
        # ``is_not(None)`` renders the same IS NOT NULL SQL as the previous
        # non-idiomatic ``!= None`` comparisons (flake8 E711).
        .filter(Equipment.minimum_eac_year.is_not(None))
        .filter(Equipment.minimum_eac.is_not(None) & (Equipment.minimum_eac != 0))
        .order_by(desc("remaining_life"))
        .order_by(func.abs(Equipment.minimum_eac).desc())
    )
    return await search_filter_sort_paginate(model=query, **common)
async def get_top_10_replacement_priorities(*, db_session: DbSession, common) -> list[Equipment]:
    """Rank equipment by replacement priority: shortest remaining life first.

    Same derived column as ``get_top_10_economic_life`` (remaining_life =
    minimum_eac_year - current_year, floored at 0) but ordered ascending, so
    assets whose economic life is already exhausted surface first. Ties break
    by |minimum_eac| descending. The "top 10" limit is expected to come from
    the pagination settings in ``common``.
    """
    # ``db_session`` is unused here; pagination helper manages its own session.
    current_year = datetime.datetime.now().year
    remaining_life = case(
        (
            (Equipment.minimum_eac_year - current_year) >= 0,
            (Equipment.minimum_eac_year - current_year),
        ),
        else_=0,
    ).label("remaining_life")
    query = (
        Select(Equipment)
        .join(EquipmentMaster, Equipment.assetnum == EquipmentMaster.assetnum)
        .options(selectinload(Equipment.equipment_master))
        .add_columns(remaining_life)
        # ``is_not(None)`` renders the same IS NOT NULL SQL as the previous
        # non-idiomatic ``!= None`` comparisons (flake8 E711).
        .filter(Equipment.minimum_eac_year.is_not(None))
        .filter(Equipment.minimum_eac.is_not(None) & (Equipment.minimum_eac != 0))
        .order_by(asc("remaining_life"))
        .order_by(func.abs(Equipment.minimum_eac).desc())
    )
    return await search_filter_sort_paginate(model=query, **common)
async def generate_all_transaction(*, db_session: DbSession, token):
    """Run the transaction generator sequentially for every equipment row.

    Returns the list of asset numbers that were processed.
    """
    all_equipment = (await db_session.execute(Select(Equipment))).scalars().all()
    for item in all_equipment:
        await main(item.assetnum, token, RELIABILITY_APP_URL)
    return [item.assetnum for item in all_equipment]
async def generate_transaction(
    *, db_session: DbSession, data_in: EquipmentCreate, token
):
    """Generate prediction and EAC figures for a single equipment asset.

    Runs the failure-prediction pipeline and the EAC calculation for
    ``data_in.assetnum`` and returns ``(prediction, eac)``.

    ``db_session`` is currently unused but kept for interface compatibility
    (earlier revisions seeded transaction rows here; that superseded code —
    record deletion, per-year API fetch, row construction — has been removed;
    see VCS history if it needs to be revived).
    """
    prediction = await predict_main(assetnum=data_in.assetnum, token=token)
    # NOTE(review): ``eac_main`` is called synchronously inside an async
    # function — if it performs blocking I/O it will stall the event loop.
    eac = eac_main(assetnum=data_in.assetnum)
    return prediction, eac
async def create(*, db_session: DbSession, equipment_in: EquipmentCreate, token):
    """Persist a new Equipment row built from the input schema and return it.

    ``token`` is currently unused (the resimulation call below is disabled).
    """
    new_equipment = Equipment(**equipment_in.model_dump())
    db_session.add(new_equipment)
    await db_session.commit()
    # await generate_transaction(db_session=db_session, data_in=equipment_in, token=token)
    return new_equipment
async def update(
    *, db_session: DbSession, equipment: Equipment, equipment_in: EquipmentUpdate, token
):
    """Updates a document and re-simulates transaction for the asset.

    When ``category_no`` or ``proportion`` changed, the acquisition cost is
    recomputed from the rule-resolved category's aggregated cost_unit_3.
    Returns a plain dict of the updated equipment attributes.
    """
    # capture original assetnum (optional use)
    old_assetnum = equipment.assetnum
    data = equipment_in.model_dump()
    # NOTE(review): ``exclude_defaults=True`` also drops fields a caller
    # explicitly set to their default value; ``exclude_unset=True`` is the
    # usual PATCH semantics — confirm which behavior is intended here.
    update_data = equipment_in.model_dump(exclude_defaults=True)
    # Check if proportion from AcquisitionData changed and recalculate acquisition_cost
    if "proportion" in update_data or "category_no" in update_data:
        category_no = update_data.get("category_no", equipment.category_no)
        proportion = update_data.get("proportion", equipment.proportion)
        print(f"DEBUG: Detected change - category_no={category_no}, proportion={proportion}")
        # Resolve alias categories (e.g. 8.35 -> 8.31) and any rule-based
        # proportion override before looking up acquisition data.
        resolved_category_no, resolved_proportion = apply_category_proportion_rules(
            category_no=category_no, proportion=proportion
        )
        if resolved_category_no != category_no:
            print(
                "DEBUG: category alias rule applied - "
                f"{category_no} -> {resolved_category_no}"
            )
        effective_category_no = resolved_category_no or category_no
        if resolved_proportion is not None and resolved_proportion != proportion:
            print(
                "DEBUG: proportion overridden by rule - "
                f"{proportion} -> {resolved_proportion}"
            )
            # Persist the overridden proportion alongside the recomputed cost.
            proportion = resolved_proportion
            update_data["proportion"] = resolved_proportion
        if not effective_category_no:
            print("DEBUG: Missing category_no after applying rules; skip cost update")
        else:
            # Aggregate cost_unit_3 across the category and all alias children.
            acquisition_data, aggregated_cost_unit_3 = await fetch_acquisition_cost_with_rollup(
                db_session=db_session, base_category_no=effective_category_no
            )
            print(f"DEBUG: AcquisitionData found: {acquisition_data is not None}")
            if acquisition_data:
                print(f"DEBUG: base cost_unit_3={acquisition_data.cost_unit_3}")
            related_categories = {effective_category_no}
            related_categories.update(
                CATEGORY_ROLLUP_CHILDREN.get(effective_category_no, set())
            )
            print(
                "DEBUG: Aggregated categories="
                f"{sorted(related_categories)} cost_unit_3={aggregated_cost_unit_3}"
            )
            if aggregated_cost_unit_3 is not None and proportion is not None:
                # proportion is treated as a percentage, hence the 0.01 factor.
                new_acquisition_cost = (proportion * 0.01) * aggregated_cost_unit_3
                print(f"DEBUG: Calculated new_acquisition_cost={new_acquisition_cost}")
                equipment.acquisition_cost = new_acquisition_cost
                update_data["acquisition_cost"] = new_acquisition_cost
            elif aggregated_cost_unit_3 is None:
                print("DEBUG: No cost_unit_3 available across related categories")
            else:
                print("DEBUG: Proportion missing; acquisition cost not updated")
    # Apply only the fields present in update_data onto the ORM object.
    for field in data:
        if field in update_data:
            setattr(equipment, field, update_data[field])
    await db_session.commit()
    # prepare a clean dict of attributes for return / recreate input model
    updated_data = {k: v for k, v in vars(equipment).items() if not k.startswith("_")}
    # Re-run generate_transaction for this equipment's assetnum.
    # Build an EquipmentCreate from the updated SQLAlchemy object and call the generator.
    try:
        equipment_create = EquipmentCreate(**updated_data)
        await generate_transaction(db_session=db_session, data_in=equipment_create, token=token)
    except Exception as e:
        # don't break the update if resimulation fails — log/print for visibility
        print(f"Resimulation failed for assetnum {updated_data.get('assetnum')}: {e}")
    return updated_data
async def delete(*, db_session: DbSession, equipment_id: str):
    """Delete the Equipment row matching ``equipment_id`` (no error if absent)."""
    stmt = Delete(Equipment).where(Equipment.id == equipment_id)
    await db_session.execute(stmt)
    await db_session.commit()