"""Service-layer queries and commands for equipment records."""

import datetime
import logging
import math
import os
from typing import Any, Optional, Sequence

import httpx
from sqlalchemy import Delete, Float, Select, String, cast, func, text
from sqlalchemy.orm import selectinload

from src.database.service import search_filter_sort_paginate
from src.equipment.model import Equipment, EquipmentTransactionRecords
from src.yeardata.model import Yeardata
from ..equipment_master.model import EquipmentMaster
from .schema import EquipmentCreate, EquipmentUpdate, MasterBase
from src.database.core import DbSession
from src.auth.service import CurrentUser
from src.config import RELIABILITY_APP_URL
from src.modules.equipment.run import main

logger = logging.getLogger(__name__)


def _yeardata_float(yeardata, field: str) -> float:
    """Return ``yeardata.<field>`` as a float.

    A missing yeardata row, a missing attribute, or a falsy value (``None``,
    ``0``, ``""``) all yield ``0.0`` so that N/A data never raises.
    """
    if yeardata is None:
        return 0.0
    return float(getattr(yeardata, field, 0) or 0)


def _asset_criticality(yeardata) -> float:
    """Compute the asset criticality figure from one yeardata row (or None)."""
    ens = _yeardata_float(yeardata, "asset_crit_ens_energy_not_served")
    bpp_system = _yeardata_float(yeardata, "asset_crit_bpp_system")
    bpp_pembangkit = _yeardata_float(yeardata, "asset_crit_bpp_pembangkit")
    marginal_cost = _yeardata_float(yeardata, "asset_crit_marginal_cost")
    dmn = _yeardata_float(yeardata, "asset_crit_dmn_daya_mampu_netto")

    extra_fuel_cost = marginal_cost - bpp_pembangkit
    # NOTE(review): 0.07 is a hard-coded weight; its meaning is not visible
    # in this module -- confirm with the domain owner before changing it.
    criticality = ens * (0.07 * bpp_system) + (dmn - ens * extra_fuel_cost)

    # inf - inf (or NaN inputs) produces NaN; report that as 0.
    return 0.0 if math.isnan(criticality) else criticality


async def get_master_by_assetnum(
    *, db_session: DbSession, assetnum: str
) -> tuple[
    Optional[EquipmentMaster],
    Optional[Equipment],
    Sequence[EquipmentTransactionRecords],
    Optional[float],
    Any,
    Any,
]:
    """Collect everything known about one asset number.

    Args:
        db_session: Async database session used for all queries.
        assetnum: Asset number to look up.

    Returns:
        A 6-tuple ``(equipment_master_record, equipment_record, records,
        min_eac_value, min_seq, last_actual_year)`` where

        * ``equipment_master_record`` / ``equipment_record`` are the matching
          rows or ``None``;
        * ``records`` are the asset's transaction records ordered by
          ``tahun`` ascending, each with an ``asset_criticality`` attribute
          attached;
        * ``min_eac_value`` / ``min_seq`` are the smallest ``eac_eac`` (cast
          to float) and the ``seq`` it belongs to, or ``None`` when there are
          no rows;
        * ``last_actual_year`` is the largest ``tahun`` among records with
          ``is_actual == 1``, or ``None``.

    Raises:
        sqlalchemy.exc.MultipleResultsFound: If more than one
            ``EquipmentMaster`` or ``Equipment`` row matches ``assetnum``.
    """
    # Equipment master and equipment rows for this asset.
    equipment_master_result = await db_session.execute(
        Select(EquipmentMaster).filter(EquipmentMaster.assetnum == assetnum)
    )
    equipment_master_record = equipment_master_result.scalars().one_or_none()

    equipment_result = await db_session.execute(
        Select(Equipment).filter(Equipment.assetnum == assetnum)
    )
    equipment_record = equipment_result.scalars().one_or_none()

    # Transaction records, oldest year first.
    master_query = (
        Select(EquipmentTransactionRecords)
        .join(EquipmentTransactionRecords.equipment)
        .options(selectinload(EquipmentTransactionRecords.equipment))
        .filter(Equipment.assetnum == assetnum)
        .order_by(EquipmentTransactionRecords.tahun.asc())
    )
    master_result = await db_session.execute(master_query)
    records = master_result.scalars().all()

    # All yeardata rows, indexed by year for O(1) lookup per record.
    yeardata_result = await db_session.execute(Select(Yeardata))
    yeardata_by_year = {y.year: y for y in yeardata_result.scalars().all()}

    # Attach the per-year asset criticality to each record.
    for record in records:
        record.asset_criticality = _asset_criticality(
            yeardata_by_year.get(record.tahun)
        )

    # Last year that has actual (non-predicted) data.
    last_actual_year_query = (
        Select(func.max(EquipmentTransactionRecords.tahun))
        .join(EquipmentTransactionRecords.equipment)
        .filter(Equipment.assetnum == assetnum)
        .filter(EquipmentTransactionRecords.is_actual == 1)
    )
    last_actual_year_result = await db_session.execute(last_actual_year_query)
    last_actual_year = last_actual_year_result.scalar()

    # Minimum eac_eac and the seq it occurs at.
    min_eac = func.min(func.cast(EquipmentTransactionRecords.eac_eac, Float))
    min_query = (
        Select(min_eac, EquipmentTransactionRecords.seq)
        .join(EquipmentTransactionRecords.equipment)
        .filter(Equipment.assetnum == assetnum)
        .group_by(EquipmentTransactionRecords.seq)
        .order_by(min_eac)
        .limit(1)
    )
    min_result = await db_session.execute(min_query)
    min_record = min_result.first()

    min_eac_value = (
        float(min_record[0]) if min_record and min_record[0] is not None else None
    )
    min_seq = min_record[1] if min_record else None

    return (
        equipment_master_record,
        equipment_record,
        records,
        min_eac_value,
        min_seq,
        last_actual_year,
    )


async def get_maximo_by_assetnum(
    *, db_session: DbSession, assetnum: str
) -> Optional[Sequence[Any]]:
    """Return the ``public.wo_maximo`` work-order rows for an asset.

    Only rows with ``asset_unit = '3'`` whose ``wonum`` does not start with
    ``T`` are included.

    Args:
        db_session: Async database session.
        assetnum: Asset number to match against ``asset_assetnum``.

    Returns:
        The matching rows as mapping objects, or ``None`` when nothing
        matches.
    """
    # Bind assetnum as a parameter instead of interpolating it into the SQL
    # text, so caller-supplied values cannot alter the statement.
    query = text(
        """
        SELECT *
        FROM public.wo_maximo AS a
        WHERE a.asset_unit = '3'
            AND a.asset_assetnum = :assetnum
            AND a.wonum NOT LIKE 'T%'
        """
    )
    result = await db_session.execute(query, {"assetnum": assetnum})
    rows = result.mappings().all()
    return rows or None


async def get_by_assetnum(*, db_session: DbSession, assetnum: str) -> Optional[Equipment]:
    """Return the equipment row with the given asset number, or ``None``.

    Raises:
        sqlalchemy.exc.MultipleResultsFound: If several rows share the
            asset number.
    """
    logger.debug("assetnum service: %s", assetnum)

    query = Select(Equipment).filter(Equipment.assetnum == assetnum)
    result = await db_session.execute(query)
    return result.scalars().one_or_none()


async def get_by_id(*, db_session: DbSession, equipment_id: str) -> Optional[Equipment]:
    """Return the equipment row with the given primary id, or ``None``."""
    query = Select(Equipment).filter(Equipment.id == equipment_id)
    result = await db_session.execute(query)
    return result.scalars().one_or_none()


async def get_all(
    *, db_session: DbSession, items_per_page: int, search: Optional[str] = None, common
) -> list[Equipment]:
    """Return equipment joined with its master record, searched and paginated.

    Args:
        db_session: Unused here; kept for interface compatibility.
        items_per_page: Page size forwarded to the pagination helper.
        search: Optional case-insensitive substring matched against the
            acquisition year, asset number, and equipment master name.
        common: Mapping of shared keyword arguments forwarded to
            ``search_filter_sort_paginate``. It is not modified.

    Returns:
        Whatever ``search_filter_sort_paginate`` returns.
        NOTE(review): the declared ``list[Equipment]`` may not match the
        helper's actual return shape -- confirm against its definition.
    """
    query = (
        Select(Equipment)
        .join(EquipmentMaster, Equipment.assetnum == EquipmentMaster.assetnum)
        .options(selectinload(Equipment.equipment_master))
    )

    if search:
        pattern = f"%{search}%"
        query = query.filter(
            cast(Equipment.acquisition_year, String).ilike(pattern)
            | cast(Equipment.assetnum, String).ilike(pattern)
            | cast(EquipmentMaster.name, String).ilike(pattern)
        )

    # Build a fresh kwargs dict rather than mutating the caller's mapping.
    paginate_kwargs = {**common, "items_per_page": items_per_page}
    return await search_filter_sort_paginate(model=query, **paginate_kwargs)


def _economic_life_expr():
    """Return the SQL expression ``abs(current_year - minimum_eac_year)``."""
    current_year = datetime.datetime.now().year
    return func.abs(current_year - Equipment.minimum_eac_year)


async def _top_10_equipment(db_session, economic_life, *order_by) -> list[Equipment]:
    """Run the shared top-10 query and attach ``economic_life`` to each row."""
    query = (
        Select(Equipment)
        .join(EquipmentMaster, Equipment.assetnum == EquipmentMaster.assetnum)
        .options(selectinload(Equipment.equipment_master))
        .add_columns(economic_life.label("economic_life"))
        .filter(Equipment.minimum_eac_year.is_not(None))
        .order_by(*order_by)
        .limit(10)
    )
    result = await db_session.execute(query)

    equipment_list = []
    for equipment, life in result.all():
        # economic_life is a computed column, not a mapped attribute.
        equipment.economic_life = life
        equipment_list.append(equipment)
    return equipment_list


async def get_top_10_economic_life(*, db_session: DbSession) -> list[Equipment]:
    """Return the 10 equipment rows with the largest economic life.

    Economic life is ``abs(current_year - minimum_eac_year)``; rows without
    a ``minimum_eac_year`` are excluded. Each returned object carries the
    value in an ``economic_life`` attribute.
    """
    economic_life = _economic_life_expr()
    return await _top_10_equipment(db_session, economic_life, economic_life.desc())


async def get_top_10_replacement_priorities(*, db_session: DbSession) -> list[Equipment]:
    """Return the 10 equipment rows ranked highest for replacement.

    Rows are ordered by ``abs(current_year - minimum_eac_year)`` ascending,
    then by ``abs(minimum_eac)`` descending; rows without a
    ``minimum_eac_year`` are excluded. Each returned object carries the first
    value in an ``economic_life`` attribute.
    """
    economic_life = _economic_life_expr()
    return await _top_10_equipment(
        db_session,
        economic_life,
        economic_life.asc(),
        func.abs(Equipment.minimum_eac).desc(),
    )


async def generate_all_transaction(*, db_session: DbSession, token):
    """Run ``main`` for every equipment row, one asset at a time.

    Args:
        db_session: Async database session used to list the equipment.
        token: Passed through to ``main`` unchanged.

    Returns:
        The asset numbers that were processed, in query order.
    """
    query_result = await db_session.execute(Select(Equipment))
    equipments = query_result.scalars().all()

    for equipment in equipments:
        await main(equipment.assetnum, token, RELIABILITY_APP_URL)

    return [equipment.assetnum for equipment in equipments]


async def generate_transaction(
    *, db_session: DbSession, data_in: EquipmentCreate, token
):
    """Regenerate the predicted transaction records for one equipment.

    Existing non-actual (``is_actual == 0``) records for ``data_in.assetnum``
    are deleted and committed first; then ``main`` is awaited to produce the
    new prediction.

    Args:
        db_session: Async database session.
        data_in: Payload whose ``assetnum`` identifies the equipment.
        token: Passed through to ``main`` unchanged.

    Returns:
        Whatever ``main`` returns.
    """
    # Clear previous prediction rows; actual rows are left untouched.
    query = (
        Delete(EquipmentTransactionRecords)
        .where(EquipmentTransactionRecords.assetnum == data_in.assetnum)
        .where(EquipmentTransactionRecords.is_actual == 0)
    )
    await db_session.execute(query)
    await db_session.commit()

    # NOTE: an earlier, commented-out per-year implementation that built the
    # records from the number-of-failures API was removed as dead code;
    # recover it from version control if it is ever needed again.
    prediction = await main(data_in.assetnum, token, RELIABILITY_APP_URL)
    return prediction


async def create(*, db_session: DbSession, equipment_in: EquipmentCreate, token):
    """Insert a new equipment row and commit it.

    Transaction generation is not triggered here; call
    ``generate_transaction`` separately. ``token`` is currently unused.

    Returns:
        The newly created ``Equipment`` instance.
    """
    equipment = Equipment(**equipment_in.model_dump())
    db_session.add(equipment)
    await db_session.commit()
    return equipment


async def update(
    *, db_session: DbSession, equipment: Equipment, equipment_in: EquipmentUpdate, token
):
    """Apply the non-default fields of ``equipment_in`` to ``equipment``.

    Only fields whose value differs from the schema default are written, so
    omitted fields keep their stored value. ``token`` is currently unused.

    Returns:
        ``vars(equipment)`` after the commit.
        NOTE(review): this is the instance ``__dict__``; depending on the
        session's expire-on-commit setting it may lack column values and it
        includes SQLAlchemy's internal state entry -- confirm callers expect
        that.
    """
    update_data = equipment_in.model_dump(exclude_defaults=True)
    for field, value in update_data.items():
        setattr(equipment, field, value)

    await db_session.commit()
    return vars(equipment)


async def delete(*, db_session: DbSession, equipment_id: str):
    """Delete the equipment row with the given id and commit."""
    query = Delete(Equipment).where(Equipment.id == equipment_id)
    await db_session.execute(query)
    await db_session.commit()