from sqlalchemy import Select, Delete, Float, func, cast, String from sqlalchemy.orm import selectinload from src.database.service import search_filter_sort_paginate from .model import Equipment, MasterRecords from ..equipment_master.model import EquipmentMaster from .schema import EquipmentCreate, EquipmentUpdate, MasterBase from typing import Optional from src.database.core import DbSession from src.auth.service import CurrentUser async def get_master_by_assetnum( *, db_session: DbSession, assetnum: str ) -> tuple[list[MasterRecords], float | None]: """Returns master records with equipment data based on asset number.""" # First query to get equipment record equipment_master_query = Select(EquipmentMaster).filter( EquipmentMaster.assetnum == assetnum ) equipment_master_result = await db_session.execute(equipment_master_query) equipment_master_record = equipment_master_result.scalars().one_or_none() equipment_query = Select(Equipment).filter(Equipment.assetnum == assetnum) equipment_result = await db_session.execute(equipment_query) equipment_record = equipment_result.scalars().one_or_none() # Second query to get master records master_query = ( Select(MasterRecords) .join(MasterRecords.equipment) .options(selectinload(MasterRecords.equipment)) .filter(Equipment.assetnum == assetnum) .order_by(MasterRecords.tahun.asc()) ) master_result = await db_session.execute(master_query) records = master_result.scalars().all() # Get the last actual year last_actual_year_query = ( Select(func.max(MasterRecords.tahun)) .join(MasterRecords.equipment) .filter(Equipment.assetnum == assetnum) .filter(MasterRecords.is_actual == 1) ) last_actual_year_result = await db_session.execute(last_actual_year_query) last_actual_year = last_actual_year_result.scalar() # Third query specifically for minimum eac_eac min_query = ( Select(func.min(func.cast(MasterRecords.eac_eac, Float)), MasterRecords.seq) .join(MasterRecords.equipment) .filter(Equipment.assetnum == assetnum) .group_by(MasterRecords.seq) .order_by(func.min(func.cast(MasterRecords.eac_eac, Float))) .limit(1) ) min_result = await db_session.execute(min_query) min_record = min_result.first() min_eac_value = ( float(min_record[0]) if min_record and min_record[0] is not None else None ) min_seq = min_record[1] if min_record else None return equipment_master_record, equipment_record, records, min_eac_value, min_seq, last_actual_year # return result.scalars().all() async def get_by_id(*, db_session: DbSession, equipment_id: str) -> Optional[Equipment]: """Returns a document based on the given document id.""" query = Select(Equipment).filter(Equipment.id == equipment_id) result = await db_session.execute(query) return result.scalars().one_or_none() async def get_all( *, db_session: DbSession, items_per_page: int, search: str = None, common ) -> list[Equipment]: """Returns all documents.""" query = Select(Equipment) if search: query = query.filter( cast(Equipment.acquisition_year, String).ilike(f"%{search}%") | cast(Equipment.assetnum, String).ilike(f"%{search}%") ) common["items_per_page"] = items_per_page result = await search_filter_sort_paginate(model=query, **common) return result async def generate_transaction(*, db_session: DbSession, data_in: EquipmentCreate): # Delete all existing master records for this asset number and prediction data query = ( Delete(MasterRecords) .where(MasterRecords.assetnum == data_in.assetnum) .where(MasterRecords.is_actual == 0) ) await db_session.execute(query) await db_session.commit() """Generate transaction for equipment.""" base_transaction = { "assetnum": data_in.assetnum, "is_actual": 0, "raw_cm_interval": None, "raw_cm_material_cost": None, "raw_cm_labor_time": None, "raw_cm_labor_human": None, "raw_pm_interval": None, "raw_pm_material_cost": None, "raw_pm_labor_time": None, "raw_pm_labor_human": None, "raw_predictive_labor_time": None, "raw_predictive_labor_human": None, "raw_oh_material_cost": None, "raw_oh_labor_time": None, "raw_oh_labor_human": None, "raw_project_task_material_cost": None, "raw_loss_output_MW": None, "raw_loss_output_price": None, "raw_operational_cost": None, "raw_maintenance_cost": None, "rc_cm_material_cost": None, "rc_cm_labor_cost": None, "rc_pm_material_cost": None, "rc_pm_labor_cost": None, "rc_predictive_labor_cost": None, "rc_oh_material_cost": None, "rc_oh_labor_cost": None, "rc_project_material_cost": None, "rc_lost_cost": None, "rc_operation_cost": None, "rc_maintenance_cost": None, "rc_total_cost": None, "eac_npv": None, "eac_annual_mnt_cost": None, "eac_annual_acq_cost": None, "eac_eac": None, } transactions = [] for sequence, year in enumerate( range(data_in.acquisition_year - 1, data_in.forecasting_target_year + 1), 0 ): transaction = base_transaction.copy() transaction.update({"tahun": int(year), "seq": int(sequence)}) transactions.append(MasterRecords(**transaction)) db_session.add_all(transactions) await db_session.commit() # Return the number of transactions created return len(transactions) async def create(*, db_session: DbSession, equipment_in: EquipmentCreate): """Creates a new document.""" equipment = Equipment(**equipment_in.model_dump()) db_session.add(equipment) await db_session.commit() await generate_transaction(db_session=db_session, data_in=equipment_in) return equipment async def update( *, db_session: DbSession, equipment: Equipment, equipment_in: EquipmentUpdate ): """Updates a document.""" data = equipment_in.model_dump() update_data = equipment_in.model_dump(exclude_defaults=True) for field in data: if field in update_data: setattr(equipment, field, update_data[field]) await db_session.commit() return equipment async def delete(*, db_session: DbSession, equipment_id: str): """Deletes a document.""" query = Delete(Equipment).where(Equipment.id == equipment_id) await db_session.execute(query) await db_session.commit()