feat: Implement historical transaction records and refine actual data processing logic with acquisition year handling and targeted execution.

rest-api
MrWaradana 1 month ago
parent 9f9238c088
commit d10f9a2bde

@ -100,3 +100,60 @@ class EquipmentTransactionRecords(Base, DefaultMixin, IdentityMixin):
eac_eac = Column(Float, nullable=False)
efdh_equivalent_forced_derated_hours = Column(Float, nullable=False)
foh_forced_outage_hours = Column(Float, nullable=False)
class EquipmentHistoricalTransactionRecords(Base, DefaultMixin, IdentityMixin):
__tablename__ = "lcc_equipment_historical_tr_data"
equipment = relationship(
"Equipment",
backref="historical_maintenance_records",
lazy="raise",
primaryjoin="and_(EquipmentHistoricalTransactionRecords.assetnum == foreign(Equipment.assetnum))",
viewonly=True,
)
assetnum = Column(String, nullable=False)
tahun = Column(Integer, nullable=False)
seq = Column(Integer, nullable=False)
is_actual = Column(Integer, nullable=False)
raw_cm_interval = Column(Float, nullable=False)
raw_cm_material_cost = Column(Float, nullable=False)
raw_cm_labor_time = Column(Float, nullable=False)
raw_cm_labor_human = Column(Float, nullable=False)
raw_pm_interval = Column(Float, nullable=False)
raw_pm_material_cost = Column(Float, nullable=False)
raw_pm_labor_time = Column(Float, nullable=False)
raw_pm_labor_human = Column(Float, nullable=False)
raw_predictive_interval = Column(Float, nullable=False)
raw_predictive_material_cost = Column(Float, nullable=False)
raw_predictive_labor_time = Column(Float, nullable=False)
raw_predictive_labor_human = Column(Float, nullable=False)
raw_oh_interval = Column(Float, nullable=False)
raw_oh_material_cost = Column(Float, nullable=False)
raw_oh_labor_time = Column(Float, nullable=False)
raw_oh_labor_human = Column(Float, nullable=False)
raw_project_task_material_cost = Column(Float, nullable=False)
raw_loss_output_MW = Column(Float, nullable=False)
raw_loss_output_price = Column(Float, nullable=False)
raw_operational_cost = Column(Float, nullable=False)
raw_maintenance_cost = Column(Float, nullable=False)
rc_cm_material_cost = Column(Float, nullable=False)
rc_cm_labor_cost = Column(Float, nullable=False)
rc_pm_material_cost = Column(Float, nullable=False)
rc_pm_labor_cost = Column(Float, nullable=False)
rc_predictive_labor_cost = Column(Float, nullable=False)
rc_oh_material_cost = Column(Float, nullable=False)
rc_oh_labor_cost = Column(Float, nullable=False)
rc_project_material_cost = Column(Float, nullable=False)
rc_lost_cost = Column(Float, nullable=False)
rc_operation_cost = Column(Float, nullable=False)
rc_maintenance_cost = Column(Float, nullable=False)
rc_total_cost = Column(Float, nullable=False)
eac_npv = Column(Float, nullable=False)
eac_annual_mnt_cost = Column(Float, nullable=False)
eac_annual_acq_cost = Column(Float, nullable=False)
eac_disposal_cost = Column(Float, nullable=False)
eac_eac = Column(Float, nullable=False)
efdh_equivalent_forced_derated_hours = Column(Float, nullable=False)
foh_forced_outage_hours = Column(Float, nullable=False)

@ -287,7 +287,8 @@ async def get_equipment(db_session: DbSession, collector_db_session: CollectorDb
last_actual_year,
maximo_data,
joined_maximo_record,
min_eac_disposal_cost
min_eac_disposal_cost,
historical_records
) = await get_master_by_assetnum(db_session=db_session, collector_db_session=collector_db_session, assetnum=assetnum)
# raise Exception(equipment[0])
if not chart_data:
@ -307,7 +308,8 @@ async def get_equipment(db_session: DbSession, collector_db_session: CollectorDb
last_actual_year=last_actual_year,
maximo_data=maximo_data,
joined_maximo=joined_maximo_record,
min_eac_disposal_cost=min_eac_disposal_cost
min_eac_disposal_cost=min_eac_disposal_cost,
historical_data=historical_records
),
message="Data retrieved successfully",
)

@ -106,6 +106,7 @@ class EquipmentRead(DefaultBase):
maximo_data: Optional[List[dict]] = Field(None, nullable=True)
joined_maximo: Optional[List[dict]] = Field(None, nullable=True)
min_eac_disposal_cost: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)
historical_data: Optional[List[MasterBase]] = Field(None, nullable=True)
class EquipmentTop10(EquipmentBase):
id: UUID

@ -9,7 +9,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from asyncpg.exceptions import InterfaceError as AsyncpgInterfaceError
from src.database.service import search_filter_sort_paginate
from src.equipment.model import Equipment, EquipmentTransactionRecords
from src.equipment.model import Equipment, EquipmentTransactionRecords, EquipmentHistoricalTransactionRecords
from src.acquisition_cost.model import AcquisitionData
from src.yeardata.model import Yeardata
from ..equipment_master.model import EquipmentMaster
@ -299,6 +299,16 @@ async def get_master_by_assetnum(
None,
)
# Historical data query
historical_query = (
Select(EquipmentHistoricalTransactionRecords)
.join(EquipmentHistoricalTransactionRecords.equipment)
.filter(Equipment.assetnum == assetnum)
.order_by(EquipmentHistoricalTransactionRecords.tahun.asc())
)
historical_result = await db_session.execute(historical_query)
historical_records = historical_result.scalars().all()
return (
equipment_master_record,
equipment_record,
@ -310,6 +320,7 @@ async def get_master_by_assetnum(
maximo_record,
joined_maximo_record,
min_eac_disposal_cost,
historical_records,
)
# return result.scalars().all()

@ -160,9 +160,9 @@ class Prediksi:
cursor = connection.cursor()
# Query untuk mendapatkan nilai maksimum seq
# Query untuk mendapatkan nilai maksimum seq dari data actual
get_max_seq_query = """
SELECT COALESCE(MAX(seq), 0) FROM lcc_equipment_tr_data WHERE assetnum = %s
SELECT COALESCE(MAX(seq), 0) FROM lcc_equipment_tr_data WHERE assetnum = %s AND is_actual = 1
"""
cursor.execute(get_max_seq_query, (equipment_id,))
max_seq = cursor.fetchone()[0]
@ -187,49 +187,6 @@ class Prediksi:
%s, %s, 0, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'Sys', NOW()
)
"""
# If a token was provided, store locally so fetch_api_data can use/refresh it
# if token:
# self.access_token = token
# # Fetch data from external API (uses instance access_token and will try refresh on 403)
# async def fetch_api_data(assetnum: str, year: int) -> dict:
# url = self.RELIABILITY_APP_URL
# endpoint = f"{url}/main/number-of-failures/{assetnum}/{int(year)}/{int(year)}"
# async with httpx.AsyncClient() as client:
# try:
# current_token = getattr(self, "access_token", None)
# response = await client.get(
# endpoint,
# timeout=30.0,
# headers={"Authorization": f"Bearer {current_token}"} if current_token else {},
# )
# response.raise_for_status()
# return response.json()
# except httpx.HTTPStatusError as e:
# status = getattr(e.response, "status_code", None)
# # If we get a 403, try to refresh the access token and retry once
# if status == 403:
# print("Received 403 from reliability API, attempting to refresh access token...")
# new_access = await self.refresh_access_token()
# if new_access:
# try:
# response = await client.get(
# endpoint,
# timeout=30.0,
# headers={"Authorization": f"Bearer {new_access}"},
# )
# response.raise_for_status()
# return response.json()
# except httpx.HTTPError as e2:
# print(f"HTTP error occurred after refresh: {e2}")
# return {}
# print(f"HTTP error occurred: {e}")
# return {}
# except httpx.HTTPError as e:
# print(f"HTTP error occurred: {e}")
# return {}
# Menyiapkan data untuk batch insert atau update
records_to_insert = []
@ -241,6 +198,7 @@ class Prediksi:
update_query = """
UPDATE lcc_equipment_tr_data
SET
seq = %s,
rc_cm_material_cost = %s,
rc_cm_labor_cost = %s,
rc_pm_material_cost = %s,
@ -254,7 +212,8 @@ class Prediksi:
WHERE id = %s
"""
for _, row in data.iterrows():
for idx, row in data.iterrows():
loop_seq = max_seq + idx + 1
# Check if data exists
cursor.execute(check_existence_query, (equipment_id, int(row["year"])))
existing_record = cursor.fetchone()
@ -263,6 +222,7 @@ class Prediksi:
# Update existing record
record_id = existing_record[0]
cursor.execute(update_query, (
int(loop_seq),
float(row.get("rc_cm_material_cost", 0)) if not pd.isna(row.get("rc_cm_material_cost", 0)) else 0.0,
float(row.get("rc_cm_labor_cost", 0)) if not pd.isna(row.get("rc_cm_labor_cost", 0)) else 0.0,
float(row.get("rc_pm_material_cost", 0)) if not pd.isna(row.get("rc_pm_material_cost", 0)) else 0.0,
@ -274,12 +234,11 @@ class Prediksi:
record_id
))
else:
max_seq = max_seq + 1
# Prepare for insert
records_to_insert.append(
(
str(uuid4()), # id
int(max_seq), # seq
int(loop_seq), # seq
int(row["year"]),
equipment_id,
float(row.get("rc_cm_material_cost", 0)) if not pd.isna(row.get("rc_cm_material_cost", 0)) else 0.0,
@ -831,8 +790,6 @@ class Prediksi:
async def predict_equipment_data(self, assetnum, token):
try:
# Update acquisition year first
acquisition_year_ref = await self.__update_equipment_acquisition_year(assetnum)
# Mengambil data dari database
df = self.__fetch_data_from_db(assetnum)
if df is None:

@ -10,6 +10,7 @@ import os
import httpx
from src.modules.config import get_connection, get_production_connection
from .where_query_sql import get_where_query_sql, get_where_query_sql_labour_cost
import argparse
async def fetch_api_data(
assetnum: str, year: int, RELIABILITY_APP_URL: str, token: str
@ -975,10 +976,15 @@ async def query_data(target_assetnum: str = None):
forecasting_start_year_db = row.get("forecasting_start_year")
acquisition_year = row.get("acquisition_year")
if forecasting_start_year_db:
if acquisition_year:
# Remove data before acquisition_year
cursor.execute("DELETE FROM lcc_equipment_tr_data WHERE assetnum = %s AND tahun < %s", (assetnum, acquisition_year))
forecasting_start_year = acquisition_year - 1
elif forecasting_start_year_db:
# If no acquisition_year but forecasting_start_year defined in DB
forecasting_start_year = forecasting_start_year_db
else:
forecasting_start_year = (acquisition_year-1) if acquisition_year else 2014
forecasting_start_year = 2014
asset_start = datetime.now()
processed_assets += 1
@ -1196,9 +1202,13 @@ async def query_data(target_assetnum: str = None):
# print("========Process finished and connection closed.========")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Insert actual data for equipment.")
parser.add_argument("--assetnum", type=str, help="Asset number to process (optional). If not provided, process all.")
args = parser.parse_args()
async def main():
# await insert_ms_equipment_data()
# await query_data()
print("insert_actual_data.py is called")
await query_data(target_assetnum=args.assetnum)
print("insert_actual_data.py finished")
asyncio.run(main())
Loading…
Cancel
Save