feat: Implement RBD integration for EAF simulation and result collection, and enhance equipment data with joined Maximo records.

main
MrWaradana 3 weeks ago
parent 65308e5735
commit a2152f33b4

@ -84,6 +84,9 @@ TIMEZONE = "Asia/Jakarta"
AUTH_SERVICE_API = config("AUTH_SERVICE_API", default="http://192.168.1.82:8000/auth") AUTH_SERVICE_API = config("AUTH_SERVICE_API", default="http://192.168.1.82:8000/auth")
RELIABILITY_APP_URL = config( RELIABILITY_APP_URL = config(
"RELIABILITY_APP_URL", default="http://192.168.1.82:8000/reliability" "RELIABILITY_APP_URL", default="http://192.168.1.82:8000/reliability"
) )
RBD_APP_URL = config("RBD_APP_URL", default="http://192.168.1.82:8000/rbd")

@ -166,6 +166,7 @@ async def get_equipment(db_session: DbSession, collector_db_session: CollectorDb
min_eac_year, min_eac_year,
last_actual_year, last_actual_year,
maximo_data, maximo_data,
joined_maximo_record,
min_eac_disposal_cost min_eac_disposal_cost
) = await get_master_by_assetnum(db_session=db_session, collector_db_session=collector_db_session, assetnum=assetnum) ) = await get_master_by_assetnum(db_session=db_session, collector_db_session=collector_db_session, assetnum=assetnum)
# raise Exception(equipment[0]) # raise Exception(equipment[0])
@ -185,6 +186,7 @@ async def get_equipment(db_session: DbSession, collector_db_session: CollectorDb
min_eac_year=min_eac_year, min_eac_year=min_eac_year,
last_actual_year=last_actual_year, last_actual_year=last_actual_year,
maximo_data=maximo_data, maximo_data=maximo_data,
joined_maximo=joined_maximo_record,
min_eac_disposal_cost=min_eac_disposal_cost min_eac_disposal_cost=min_eac_disposal_cost
), ),
message="Data retrieved successfully", message="Data retrieved successfully",

@ -106,6 +106,7 @@ class EquipmentRead(DefaultBase):
min_eac_year: Optional[float] = Field(None, nullable=True) min_eac_year: Optional[float] = Field(None, nullable=True)
last_actual_year: Optional[int] = Field(None, nullable=True) last_actual_year: Optional[int] = Field(None, nullable=True)
maximo_data: Optional[List[dict]] = Field(None, nullable=True) maximo_data: Optional[List[dict]] = Field(None, nullable=True)
joined_maximo: Optional[List[dict]] = Field(None, nullable=True)
min_eac_disposal_cost: Optional[float] = Field(None, nullable=True, le=MAX_PRICE) min_eac_disposal_cost: Optional[float] = Field(None, nullable=True, le=MAX_PRICE)
class EquipmentTop10(EquipmentBase): class EquipmentTop10(EquipmentBase):

@ -135,6 +135,32 @@ MAXIMO_SQL = text(
""" """
) )
JOINED_MAXIMO_SQL = text(
"""
SELECT *
FROM public.wo_maximo a
LEFT JOIN public.wo_maximo_labtrans b
ON b.wonum = a.wonum
LEFT JOIN lcc_ms_manpower emp
ON UPPER(TRIM(emp."ID Number")) = UPPER(TRIM(b.laborcode))
WHERE
a.asset_unit = '3'
AND a.wonum NOT LIKE 'T%'
AND a.asset_assetnum = :assetnum
AND (EXTRACT(EPOCH FROM (a.actfinish - a.actstart)) / 3600.0) <= 730
AND (
(a.worktype = 'CM' AND a.wojp8 != 'S1')
OR (a.worktype <> 'CM')
)
AND (
a.description NOT ILIKE '%U4%'
OR (
a.description ILIKE '%U3%'
AND a.description ILIKE '%U4%'
)
);
"""
)
async def _fetch_maximo_records( async def _fetch_maximo_records(
*, session: AsyncSession, assetnum: str *, session: AsyncSession, assetnum: str
@ -174,6 +200,44 @@ async def _fetch_maximo_records(
return [] return []
async def _fetch_joined_maximo_records(
*, session: AsyncSession, assetnum: str
) -> list[dict[str, Any]]:
"""Fetch Joined Maximo rows with a retry to mask transient collector failures."""
query = JOINED_MAXIMO_SQL.bindparams(assetnum=assetnum)
try:
result = await session.execute(query)
return result.mappings().all()
except AsyncpgInterfaceError as exc:
logger.warning(
"Collector session closed while fetching Joined Maximo data for %s. Retrying once.",
assetnum,
)
try:
async with collector_async_session() as retry_session:
retry_result = await retry_session.execute(query)
return retry_result.mappings().all()
except Exception as retry_exc:
logger.error(
"Retrying Joined Maximo query failed for %s: %s",
assetnum,
retry_exc,
exc_info=True,
)
except SQLAlchemyError as exc:
logger.error(
"Failed to fetch Joined Maximo data for %s: %s", assetnum, exc, exc_info=True
)
except Exception as exc:
logger.exception(
"Unexpected error while fetching Joined Maximo data for %s", assetnum
)
return []
async def fetch_acquisition_cost_with_rollup( async def fetch_acquisition_cost_with_rollup(
*, db_session: DbSession, base_category_no: str *, db_session: DbSession, base_category_no: str
) -> tuple[Optional[AcquisitionData], Optional[float]]: ) -> tuple[Optional[AcquisitionData], Optional[float]]:
@ -293,11 +357,14 @@ async def get_master_by_assetnum(
maximo_record = await _fetch_maximo_records( maximo_record = await _fetch_maximo_records(
session=collector_db_session, assetnum=assetnum session=collector_db_session, assetnum=assetnum
) )
joined_maximo_record = await _fetch_joined_maximo_records(
session=collector_db_session, assetnum=assetnum
)
min_eac_disposal_cost = next( min_eac_disposal_cost = next(
(record.eac_disposal_cost for record in records if record.tahun == min_eac_year), (record.eac_disposal_cost for record in records if record.tahun == min_eac_year),
None, None,
) )
print(min_eac_disposal_cost)
return ( return (
equipment_master_record, equipment_master_record,
equipment_record, equipment_record,
@ -307,6 +374,7 @@ async def get_master_by_assetnum(
min_eac_year, min_eac_year,
last_actual_year, last_actual_year,
maximo_record, maximo_record,
joined_maximo_record,
min_eac_disposal_cost, min_eac_disposal_cost,
) )
# return result.scalars().all() # return result.scalars().all()

@ -0,0 +1,24 @@
import asyncio
import logging
import sys
import os
# Add the project root to sys.path to resolve imports
sys.path.append(os.getcwd())
from src.database.core import get_session
from src.modules.plant.fetch_eaf_from_rbd import fetch_simulation_results
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
async def main():
logger.info("Starting simulation result collection script...")
async with get_session() as session:
await fetch_simulation_results(session)
logger.info("Simulation result collection script finished.")
if __name__ == "__main__":
asyncio.run(main())

@ -0,0 +1,154 @@
import httpx
import logging
import os
from datetime import datetime
from sqlalchemy import select
from src.database.core import DbSession
from src.yeardata.model import Yeardata
from src.config import RBD_APP_URL
logger = logging.getLogger(__name__)
current_year = datetime.now().year
AUTH_APP_URL = os.getenv("AUTH_APP_URL", "http://192.168.1.82:8000/auth")
async def sign_in(username: str = "lcca_admin", password: str = "password") -> str:
"""
Sign in to AUTH_APP_URL/sign-in using provided username/password.
Returns the access_token string if successful, else None.
"""
url = f"{AUTH_APP_URL}/sign-in"
logger.info(f"Signing in to {url}...")
async with httpx.AsyncClient() as client:
try:
resp = await client.post(
url,
json={"username": username, "password": password},
timeout=30.0,
)
resp.raise_for_status()
data = resp.json()
if isinstance(data, dict) and "data" in data:
token = data.get("data", {}).get("access_token")
if token:
logger.info("Sign-in successful.")
return token
except Exception as e:
logger.error(f"Sign-in failed: {e}")
return None
async def start_simulations(db_session: DbSession):
"""
Starts simulatios on RBD app for years > current year and saves simulation IDs.
"""
# 0. Sign in first
token = await sign_in()
if not token:
logger.error("Aborting start_simulations because sign-in failed.")
return
headers = {"Authorization": f"Bearer {token}"}
# Get all years after current year
query = select(Yeardata).filter(Yeardata.year > current_year)
result = await db_session.execute(query)
yeardata_list = result.scalars().all()
if not yeardata_list:
logger.info("No yeardata found for years > %s", current_year)
return
async with httpx.AsyncClient() as client:
# Step 1: Start simulations and get IDs
for record in yeardata_list:
try:
# POST {{RBD_APP_URL}}/aeros/simulation/run/yearly
url = f"{RBD_APP_URL}/aeros/simulation/run/yearly"
payload = {"year": record.year}
logger.info(f"Starting simulation for year {record.year}: POST {url}")
response = await client.post(url, json=payload, headers=headers, timeout=60.0)
response.raise_for_status()
# Check JSON for ID, fallback to text if needed
try:
data = response.json()
if isinstance(data, dict):
simulation_id = data.get("id") or data.get("simulation_id") or data.get("data")
if not simulation_id or isinstance(simulation_id, dict):
simulation_id = str(data.get("id", ""))
else:
simulation_id = str(data)
except Exception:
simulation_id = response.text.strip().strip('"')
if simulation_id:
record.rbd_simulation_id = str(simulation_id)
db_session.add(record)
logger.info(f"Saved simulation_id {simulation_id} for year {record.year}")
else:
logger.error(f"Could not extract simulation ID from response for year {record.year}: {response.text}")
except Exception as e:
logger.error(f"Failed to start simulation for year {record.year}: {e}")
# Commit simulation IDs
await db_session.commit()
logger.info("All simulations started and IDs saved.")
async def fetch_simulation_results(db_session: DbSession):
"""
Fetches EAF data using simulation IDs for years > current year.
Should be called after start_simulations.
"""
# 0. Sign in first
token = await sign_in()
if not token:
logger.error("Aborting fetch_simulation_results because sign-in failed.")
return
headers = {"Authorization": f"Bearer {token}"}
# Get all years after current year
query = select(Yeardata).filter(Yeardata.year > current_year)
result = await db_session.execute(query)
yeardata_list = result.scalars().all()
if not yeardata_list:
logger.info("No yeardata found for years > %s", current_year)
return
async with httpx.AsyncClient() as client:
# Step 2: Fetch EAF data
for record in yeardata_list:
if not record.rbd_simulation_id:
logger.warning(f"No simulation ID for year {record.year}, skipping.")
continue
try:
# GET {{RBD_APP_URL}}/aeros/simulation/result/calc/:simulation_id/plant
url = f"{RBD_APP_URL}/aeros/simulation/result/calc/{record.rbd_simulation_id}/plant"
logger.info(f"Fetching result for year {record.year}: GET {url}")
response = await client.get(url, headers=headers, timeout=60.0)
response.raise_for_status()
result_data = response.json()
# Expected format: { "data": { "eaf": ... } }
eaf_value = result_data.get("data", {}).get("eaf")
if eaf_value is not None:
record.eaf = float(eaf_value)
db_session.add(record)
logger.info(f"Saved EAF {eaf_value} for year {record.year}")
else:
logger.warning(f"No EAF value found in response for year {record.year}")
except Exception as e:
logger.error(f"Failed to fetch EAF for year {record.year}: {e}")
# Commit EAF updates
await db_session.commit()
logger.info("All simulation results fetched and saved.")

@ -0,0 +1,24 @@
import asyncio
import logging
import sys
import os
# Add the project root to sys.path to resolve imports
sys.path.append(os.getcwd())
from src.database.core import get_session
from src.modules.plant.fetch_eaf_from_rbd import start_simulations
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
async def main():
logger.info("Starting simulation trigger script...")
async with get_session() as session:
await start_simulations(session)
logger.info("Simulation trigger script finished.")
if __name__ == "__main__":
asyncio.run(main())

@ -19,4 +19,5 @@ class Yeardata(Base, DefaultMixin, IdentityMixin):
asset_crit_foh_forced_outage_hours = Column(Float, nullable=False) asset_crit_foh_forced_outage_hours = Column(Float, nullable=False)
asset_crit_extra_fuel_cost = Column(Float, nullable=False) asset_crit_extra_fuel_cost = Column(Float, nullable=False)
cf = Column(Float, nullable=False) cf = Column(Float, nullable=False)
eaf = Column(Float, nullable=False) eaf = Column(Float, nullable=False)
rbd_simulation_id = Column(String, nullable=False)

@ -21,6 +21,7 @@ class YeardataBase(DefaultBase):
asset_crit_extra_fuel_cost: Optional[float] = Field(None, nullable=True, ge=0, le=1_000_000_000_000_000) asset_crit_extra_fuel_cost: Optional[float] = Field(None, nullable=True, ge=0, le=1_000_000_000_000_000)
cf: Optional[float] = Field(None, nullable=True, ge=0, le=1_000_000_000_000_000) cf: Optional[float] = Field(None, nullable=True, ge=0, le=1_000_000_000_000_000)
eaf: Optional[float] = Field(None, nullable=True, ge=0, le=1_000_000_000_000_000) eaf: Optional[float] = Field(None, nullable=True, ge=0, le=1_000_000_000_000_000)
rbd_simulation_id: Optional[str] = Field(None, nullable=True)
created_at: Optional[datetime] = Field(None, nullable=True) created_at: Optional[datetime] = Field(None, nullable=True)
updated_at: Optional[datetime] = Field(None, nullable=True) updated_at: Optional[datetime] = Field(None, nullable=True)
created_by: Optional[str] = Field(None, nullable=True) created_by: Optional[str] = Field(None, nullable=True)

Loading…
Cancel
Save