feat: add collector db connection and maximo route for each equipment

main
MrWaradana 2 months ago
parent 69030569b4
commit b467452fed

@ -15,7 +15,7 @@ from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker from sqlalchemy.orm import DeclarativeBase, sessionmaker
from src.config import SQLALCHEMY_DATABASE_URI from src.config import SQLALCHEMY_DATABASE_URI, COLLECTOR_URI
engine = create_async_engine( engine = create_async_engine(
SQLALCHEMY_DATABASE_URI, SQLALCHEMY_DATABASE_URI,
@ -23,6 +23,12 @@ engine = create_async_engine(
future=True future=True
) )
collector_engine = create_async_engine(
COLLECTOR_URI,
echo=False,
future=True
)
async_session = sessionmaker( async_session = sessionmaker(
engine, engine,
class_=AsyncSession, class_=AsyncSession,
@ -31,13 +37,22 @@ async_session = sessionmaker(
autoflush=False, autoflush=False,
) )
collector_async_session = sessionmaker(
collector_engine,
class_=AsyncSession,
expire_on_commit=False,
autocommit=False,
autoflush=False,
)
def get_db(request: Request): def get_db(request: Request):
return request.state.db return request.state.db
def get_collector_db(request: Request):
return request.state.collector_db
DbSession = Annotated[AsyncSession, Depends(get_db)] DbSession = Annotated[AsyncSession, Depends(get_db)]
CollectorDbSession = Annotated[AsyncSession, Depends(get_collector_db)]
class CustomBase: class CustomBase:
__repr_attrs__ = [] __repr_attrs__ = []
@ -113,6 +128,18 @@ async def get_session():
finally: finally:
await session.close() await session.close()
@contextmanager
async def get_collector_session():
"""Context manager to ensure the collector session is closed after use."""
session = collector_async_session()
try:
yield session
await session.commit()
except:
await session.rollback()
raise
finally:
await session.close()
def resolve_table_name(name): def resolve_table_name(name):
"""Resolves table names to their mapped names.""" """Resolves table names to their mapped names."""
@ -122,20 +149,6 @@ def resolve_table_name(name):
raise_attribute_error = object() raise_attribute_error = object()
# def resolve_attr(obj, attr, default=None):
# """Attempts to access attr via dotted notation, returns none if attr does not exist."""
# try:
# return functools.reduce(getattr, attr.split("."), obj)
# except AttributeError:
# return default
# def get_model_name_by_tablename(table_fullname: str) -> str:
# """Returns the model name of a given table."""
# return get_class_by_tablename(table_fullname=table_fullname).__name__
def get_class_by_tablename(table_fullname: str) -> Any: def get_class_by_tablename(table_fullname: str) -> Any:
"""Return class reference mapped to table.""" """Return class reference mapped to table."""
@ -149,8 +162,3 @@ def get_class_by_tablename(table_fullname: str) -> Any:
mapped_class = _find_class(mapped_name) mapped_class = _find_class(mapped_name)
return mapped_class return mapped_class
# def get_table_name_by_class_instance(class_instance: Base) -> str:
# """Returns the name of the table for a given class instance."""
# return class_instance._sa_instance_state.mapper.mapped_table.name

@ -23,12 +23,13 @@ from src.equipment.service import (
delete, delete,
generate_all_transaction, generate_all_transaction,
get_top_10_economic_life, get_top_10_economic_life,
get_maximo_by_assetnum
) )
from src.modules.equipment.Prediksi import main as prediksi_main from src.modules.equipment.Prediksi import main as prediksi_main
from src.modules.equipment.Eac import Eac from src.modules.equipment.Eac import Eac
from src.database.service import CommonParameters, search_filter_sort_paginate from src.database.service import CommonParameters, search_filter_sort_paginate
from src.database.core import DbSession from src.database.core import DbSession, CollectorDbSession
from src.auth.service import CurrentUser, Token from src.auth.service import CurrentUser, Token
from src.models import StandardResponse from src.models import StandardResponse
@ -55,6 +56,16 @@ async def get_equipments(
message="Data retrieved successfully", message="Data retrieved successfully",
) )
@router.get("/maximo/{assetnum}", response_model=StandardResponse[List[dict]])
async def get_maximo_record_by_assetnum(db_session: CollectorDbSession, assetnum: str):
record = await get_maximo_by_assetnum(db_session=db_session, assetnum=assetnum)
if not record:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Maximo record not found")
return StandardResponse(
data=record,
message="Maximo record retrieved successfully",
)
@router.get("/simulate/{assetnum}") @router.get("/simulate/{assetnum}")
async def simulate_equipment(db_session: DbSession, assetnum: str): async def simulate_equipment(db_session: DbSession, assetnum: str):
"""Stream progress events while running the simulation (prediksi + EAC). """Stream progress events while running the simulation (prediksi + EAC).

@ -17,6 +17,7 @@ import httpx
from src.modules.equipment.run import main from src.modules.equipment.run import main
import datetime import datetime
import math import math
from sqlalchemy import text
async def get_master_by_assetnum( async def get_master_by_assetnum(
@ -120,6 +121,52 @@ async def get_master_by_assetnum(
) )
# return result.scalars().all() # return result.scalars().all()
async def get_maximo_by_assetnum(*, db_session: DbSession, assetnum: str) -> Optional[MasterBase]:
"""Returns a document based on the given document id."""
# default worktype; change if you need a different filtering
# worktype = "CM"
# where_worktype = (
# "AND a.worktype in ('CM', 'PROACTIVE', 'WA')"
# if worktype == "CM"
# else "AND a.worktype = :worktype"
# )
# where_wojp8 = "AND a.wojp8 != 'S1'" if worktype == "CM" else ""
# sql = f"""
# SELECT
# DATE_PART('year', a.reportdate) AS tahun,
# COUNT(a.wonum) AS raw_{worktype.lower()}_interval,
# SUM(a.actmatcost) AS raw_{worktype.lower()}_material_cost,
# ROUND(SUM(EXTRACT(EPOCH FROM (a.actfinish - a.actstart)) / 3600), 2) AS raw_{worktype.lower()}_labor_time,
# CASE WHEN COUNT(b.laborcode) = 0 THEN 3 ELSE COUNT(b.laborcode) END AS raw_{worktype.lower()}_labor_human
# FROM public.wo_maximo AS a
# LEFT JOIN public.wo_maximo_labtrans AS b ON b.wonum = a.wonum
# WHERE a.asset_unit = '3'
# {where_worktype}
# AND a.asset_assetnum = :assetnum
# AND a.wonum NOT LIKE 'T%'
# {where_wojp8}
# GROUP BY DATE_PART('year', a.reportdate);
# """
sql = f"""
SELECT
*
FROM public.wo_maximo AS a
WHERE a.asset_unit = '3'
AND a.asset_assetnum = '{assetnum}'
AND a.wonum NOT LIKE 'T%'
"""
query = text(sql)
# Pass parameters to execute to avoid bindparam/name mismatches
result = await db_session.execute(query)
record = result.mappings().all()
if record:
return record
return None
async def get_by_assetnum(*, db_session: DbSession, assetnum: str) -> Optional[Equipment]: async def get_by_assetnum(*, db_session: DbSession, assetnum: str) -> Optional[Equipment]:
"""Returns a document based on the given document id.""" """Returns a document based on the given document id."""
print("assetnum service:", assetnum) print("assetnum service:", assetnum)

@ -29,7 +29,7 @@ from src.enums import ResponseStatus
from src.logging import configure_logging from src.logging import configure_logging
from src.rate_limiter import limiter from src.rate_limiter import limiter
from src.api import api_router from src.api import api_router
from src.database.core import engine, async_session from src.database.core import engine, async_session, collector_async_session
from src.exceptions import handle_exception from src.exceptions import handle_exception
@ -70,11 +70,14 @@ async def db_session_middleware(request: Request, call_next):
try: try:
session = async_scoped_session(async_session, scopefunc=get_request_id) session = async_scoped_session(async_session, scopefunc=get_request_id)
request.state.db = session() request.state.db = session()
collector_session = async_scoped_session(collector_async_session, scopefunc=get_request_id)
request.state.collector_db = collector_session()
response = await call_next(request) response = await call_next(request)
except Exception as e: except Exception as e:
raise e from None raise e from None
finally: finally:
await request.state.db.close() await request.state.db.close()
await request.state.collector_db.close()
_request_id_ctx_var.reset(ctx_token) _request_id_ctx_var.reset(ctx_token)
return response return response

@ -159,8 +159,8 @@ class Eac:
# Total NPV pada titik proyeksi ini = NPV aktual terakhir + biaya proyeksi yang didiskontokan # Total NPV pada titik proyeksi ini = NPV aktual terakhir + biaya proyeksi yang didiskontokan
final_value = float(last_npv) + float(discounted_proj) final_value = float(last_npv) + float(discounted_proj)
# Gunakan seq penuh (jumlah periode dari akuisisi) untuk menghitung pembayaran tahunan tingkat # Gunakan seq penuh (jumlah periode dari akuisisi) untuk menghitung pembayaran tahunan pemeliharaan.
# pemeliharaan. Menggunakan hanya selisih dari seq aktual terakhir # Menggunakan hanya selisih dari seq aktual terakhir
# (sisa_periode) mengamortisasi seluruh nilai sekarang selama # (sisa_periode) mengamortisasi seluruh nilai sekarang selama
# sejumlah periode yang sangat kecil untuk proyeksi pertama dan menghasilkan lonjakan. # sejumlah periode yang sangat kecil untuk proyeksi pertama dan menghasilkan lonjakan.
# Menggunakan row["seq"] menjaga periode amortisasi konsisten dengan perhitungan lain # Menggunakan row["seq"] menjaga periode amortisasi konsisten dengan perhitungan lain

Loading…
Cancel
Save