You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
274 lines
9.9 KiB
Python
274 lines
9.9 KiB
Python
import os
|
|
from sqlalchemy import Select, Delete, Float, func, cast, String
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from src.database.service import search_filter_sort_paginate
|
|
from .model import Equipment, MasterRecords
|
|
from ..equipment_master.model import EquipmentMaster
|
|
from .schema import EquipmentCreate, EquipmentUpdate, MasterBase
|
|
from typing import Optional
|
|
|
|
from src.database.core import DbSession
|
|
from src.auth.service import CurrentUser
|
|
from src.config import RELIABILITY_APP_URL
|
|
import httpx
|
|
|
|
from src.modules.equipment.run import main
|
|
|
|
|
|
async def get_master_by_assetnum(
|
|
*, db_session: DbSession, assetnum: str
|
|
) -> tuple[list[MasterRecords], float | None]:
|
|
"""Returns master records with equipment data based on asset number."""
|
|
|
|
# First query to get equipment record
|
|
equipment_master_query = Select(EquipmentMaster).filter(
|
|
EquipmentMaster.assetnum == assetnum
|
|
)
|
|
equipment_master_result = await db_session.execute(equipment_master_query)
|
|
equipment_master_record = equipment_master_result.scalars().one_or_none()
|
|
equipment_query = Select(Equipment).filter(Equipment.assetnum == assetnum)
|
|
equipment_result = await db_session.execute(equipment_query)
|
|
equipment_record = equipment_result.scalars().one_or_none()
|
|
|
|
# Second query to get master records
|
|
master_query = (
|
|
Select(MasterRecords)
|
|
.join(MasterRecords.equipment)
|
|
.options(selectinload(MasterRecords.equipment))
|
|
.filter(Equipment.assetnum == assetnum)
|
|
.order_by(MasterRecords.tahun.asc())
|
|
)
|
|
master_result = await db_session.execute(master_query)
|
|
records = master_result.scalars().all()
|
|
|
|
# Get the last actual year
|
|
last_actual_year_query = (
|
|
Select(func.max(MasterRecords.tahun))
|
|
.join(MasterRecords.equipment)
|
|
.filter(Equipment.assetnum == assetnum)
|
|
.filter(MasterRecords.is_actual == 1)
|
|
)
|
|
last_actual_year_result = await db_session.execute(last_actual_year_query)
|
|
last_actual_year = last_actual_year_result.scalar()
|
|
|
|
# Third query specifically for minimum eac_eac
|
|
min_query = (
|
|
Select(func.min(func.cast(MasterRecords.eac_eac, Float)), MasterRecords.seq)
|
|
.join(MasterRecords.equipment)
|
|
.filter(Equipment.assetnum == assetnum)
|
|
.group_by(MasterRecords.seq)
|
|
.order_by(func.min(func.cast(MasterRecords.eac_eac, Float)))
|
|
.limit(1)
|
|
)
|
|
min_result = await db_session.execute(min_query)
|
|
min_record = min_result.first()
|
|
min_eac_value = (
|
|
float(min_record[0]) if min_record and min_record[0] is not None else None
|
|
)
|
|
min_seq = min_record[1] if min_record else None
|
|
|
|
return (
|
|
equipment_master_record,
|
|
equipment_record,
|
|
records,
|
|
min_eac_value,
|
|
min_seq,
|
|
last_actual_year,
|
|
)
|
|
# return result.scalars().all()
|
|
|
|
|
|
async def get_by_id(*, db_session: DbSession, equipment_id: str) -> Optional[Equipment]:
|
|
"""Returns a document based on the given document id."""
|
|
query = Select(Equipment).filter(Equipment.id == equipment_id)
|
|
result = await db_session.execute(query)
|
|
return result.scalars().one_or_none()
|
|
|
|
|
|
async def get_all(
|
|
*, db_session: DbSession, items_per_page: int, search: str = None, common
|
|
) -> list[Equipment]:
|
|
"""Returns all documents."""
|
|
query = (
|
|
Select(Equipment)
|
|
.join(EquipmentMaster, Equipment.assetnum == EquipmentMaster.assetnum)
|
|
.options(selectinload(Equipment.equipment_master))
|
|
)
|
|
|
|
if search:
|
|
query = query.filter(
|
|
cast(Equipment.acquisition_year, String).ilike(f"%{search}%")
|
|
| cast(Equipment.assetnum, String).ilike(f"%{search}%")
|
|
| cast(EquipmentMaster.name, String).ilike(f"%{search}%")
|
|
)
|
|
common["items_per_page"] = items_per_page
|
|
result = await search_filter_sort_paginate(model=query, **common)
|
|
return result
|
|
|
|
|
|
async def generate_all_transaction(*, db_session: DbSession, token):
|
|
"""Generate transaction for all equipments in the database based on equipments assetnum."""
|
|
query = Select(Equipment)
|
|
query_result = await db_session.execute(query)
|
|
equipments = query_result.scalars().all()
|
|
for equipment in equipments:
|
|
await main(equipment.assetnum, token, RELIABILITY_APP_URL)
|
|
return [equipment.assetnum for equipment in equipments]
|
|
|
|
|
|
async def generate_transaction(
|
|
*, db_session: DbSession, data_in: EquipmentCreate, token
|
|
):
|
|
# Delete all existing master records for this asset number and prediction data
|
|
query = (
|
|
Delete(MasterRecords)
|
|
.where(MasterRecords.assetnum == data_in.assetnum)
|
|
.where(MasterRecords.is_actual == 0)
|
|
)
|
|
await db_session.execute(query)
|
|
await db_session.commit()
|
|
"""Generate transaction for equipment."""
|
|
prediction = await main(data_in.assetnum, token, RELIABILITY_APP_URL)
|
|
|
|
# # Fetch data from external API
|
|
# async def fetch_api_data(assetnum: str, year: int) -> dict:
|
|
# async with httpx.AsyncClient() as client:
|
|
# try:
|
|
# response = await client.get(
|
|
# f"{os.environ.get('RELIABILITY_APP_URL')}/main/number-of-failures/{assetnum}/{year}/{year}",
|
|
# timeout=30.0,
|
|
# headers={"Authorization": f"Bearer {token}"},
|
|
# )
|
|
# response.raise_for_status()
|
|
# return response.json()
|
|
# except httpx.HTTPError as e:
|
|
# print(f"HTTP error occurred: {e}")
|
|
# return {}
|
|
|
|
# # Initialize base transaction with default values
|
|
# base_transaction = {
|
|
# "assetnum": data_in.assetnum,
|
|
# "is_actual": 0,
|
|
# "raw_cm_interval": None,
|
|
# "raw_cm_material_cost": None,
|
|
# "raw_cm_labor_time": None,
|
|
# "raw_cm_labor_human": None,
|
|
# "raw_pm_interval": None,
|
|
# "raw_pm_material_cost": None,
|
|
# "raw_pm_labor_time": None,
|
|
# "raw_pm_labor_human": None,
|
|
# "raw_predictive_labor_time": None,
|
|
# "raw_predictive_labor_human": None,
|
|
# "raw_oh_material_cost": None,
|
|
# "raw_oh_labor_time": None,
|
|
# "raw_oh_labor_human": None,
|
|
# "raw_project_task_material_cost": None,
|
|
# "raw_loss_output_MW": None,
|
|
# "raw_loss_output_price": None,
|
|
# "raw_operational_cost": None,
|
|
# "raw_maintenance_cost": None,
|
|
# "rc_cm_material_cost": None,
|
|
# "rc_cm_labor_cost": None,
|
|
# "rc_pm_material_cost": None,
|
|
# "rc_pm_labor_cost": None,
|
|
# "rc_predictive_labor_cost": None,
|
|
# "rc_oh_material_cost": None,
|
|
# "rc_oh_labor_cost": None,
|
|
# "rc_project_material_cost": None,
|
|
# "rc_lost_cost": None,
|
|
# "rc_operation_cost": None,
|
|
# "rc_maintenance_cost": None,
|
|
# "rc_total_cost": None,
|
|
# "eac_npv": None,
|
|
# "eac_annual_mnt_cost": None,
|
|
# "eac_annual_acq_cost": None,
|
|
# "eac_eac": None,
|
|
# }
|
|
|
|
# transactions = []
|
|
|
|
# # Query existing records with is_actual=1
|
|
# actual_years_query = (
|
|
# Select(MasterRecords.tahun)
|
|
# .filter(MasterRecords.assetnum == data_in.assetnum)
|
|
# .filter(MasterRecords.is_actual == 1)
|
|
# )
|
|
# result = await db_session.execute(actual_years_query)
|
|
# actual_years = {row[0] for row in result.all()}
|
|
|
|
# for sequence, year in enumerate(
|
|
# range(data_in.acquisition_year - 1, data_in.forecasting_target_year + 1), 0
|
|
# ):
|
|
# # Skip if year already has actual data
|
|
# if year in actual_years:
|
|
# continue
|
|
|
|
# transaction = base_transaction.copy()
|
|
# # Update values from API
|
|
# api_data = await fetch_api_data(data_in.assetnum, year)
|
|
|
|
# if api_data:
|
|
# # # Get current num_fail
|
|
# current_num_fail = api_data["data"][0]["num_fail"]
|
|
|
|
# # # Calculate sum of previous failures for this asset
|
|
# # previous_failures_query = (
|
|
# # Select(func.sum(MasterRecords.raw_cm_interval))
|
|
# # .filter(MasterRecords.assetnum == data_in.assetnum)
|
|
# # .filter(MasterRecords.tahun < year)
|
|
# # )
|
|
# # previous_failures_result = await db_session.execute(previous_failures_query)
|
|
# # previous_failures_sum = previous_failures_result.scalar() or 0
|
|
|
|
# # # Update with current minus sum of previous
|
|
# # transaction.update({"raw_cm_interval": current_num_fail - previous_failures_sum})
|
|
# transaction.update({"raw_cm_interval": current_num_fail})
|
|
# transaction.update({"tahun": int(year), "seq": int(sequence)})
|
|
# transactions.append(MasterRecords(**transaction))
|
|
|
|
# db_session.add_all(transactions)
|
|
# await db_session.commit()
|
|
|
|
# # Return the number of transactions created
|
|
# return len(transactions)
|
|
return prediction
|
|
|
|
|
|
async def create(*, db_session: DbSession, equipment_in: EquipmentCreate, token):
|
|
"""Creates a new document."""
|
|
equipment = Equipment(**equipment_in.model_dump())
|
|
db_session.add(equipment)
|
|
await db_session.commit()
|
|
await generate_transaction(db_session=db_session, data_in=equipment_in, token=token)
|
|
return equipment
|
|
|
|
|
|
async def update(
|
|
*, db_session: DbSession, equipment: Equipment, equipment_in: EquipmentUpdate, token
|
|
):
|
|
"""Updates a document."""
|
|
|
|
data = equipment_in.model_dump()
|
|
update_data = equipment_in.model_dump(exclude_defaults=True)
|
|
for field in data:
|
|
if field in update_data:
|
|
setattr(equipment, field, update_data[field])
|
|
|
|
await db_session.commit()
|
|
|
|
updated_data = vars(equipment)
|
|
equipment_create = EquipmentCreate(**updated_data)
|
|
await generate_transaction(
|
|
db_session=db_session, data_in=equipment_create, token=token
|
|
)
|
|
|
|
return equipment
|
|
|
|
|
|
async def delete(*, db_session: DbSession, equipment_id: str):
|
|
"""Deletes a document."""
|
|
query = Delete(Equipment).where(Equipment.id == equipment_id)
|
|
await db_session.execute(query)
|
|
await db_session.commit()
|