From 996d1aa9d709e71ac14353ff2225684df9911d36 Mon Sep 17 00:00:00 2001 From: MrWaradana Date: Tue, 16 Dec 2025 10:26:21 +0700 Subject: [PATCH] fix onnection closed error --- src/database/core.py | 5 +- src/equipment/service.py | 101 ++++++++++++------ .../__pycache__/schema.cpython-311.pyc | Bin 3658 -> 3658 bytes 3 files changed, 69 insertions(+), 37 deletions(-) diff --git a/src/database/core.py b/src/database/core.py index b029e0e..8202f9f 100644 --- a/src/database/core.py +++ b/src/database/core.py @@ -14,6 +14,7 @@ import functools from typing import AsyncGenerator from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import DeclarativeBase, sessionmaker +from contextlib import asynccontextmanager from src.config import SQLALCHEMY_DATABASE_URI, COLLECTOR_URI @@ -115,7 +116,7 @@ Base = declarative_base(cls=CustomBase) # make_searchable(Base.metadata) -@contextmanager +@asynccontextmanager async def get_session(): """Context manager to ensure the session is closed after use.""" session = async_session() @@ -128,7 +129,7 @@ async def get_session(): finally: await session.close() -@contextmanager +@asynccontextmanager async def get_collector_session(): """Context manager to ensure the collector session is closed after use.""" session = collector_async_session() diff --git a/src/equipment/service.py b/src/equipment/service.py index 5987ac3..01ccd9d 100644 --- a/src/equipment/service.py +++ b/src/equipment/service.py @@ -1,6 +1,12 @@ import os -from sqlalchemy import Select, Delete, Float, func, cast, String +import logging +from typing import Optional, TypedDict, Any + +from sqlalchemy import Select, Delete, Float, func, cast, String, text from sqlalchemy.orm import selectinload +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncSession +from asyncpg.exceptions import InterfaceError as AsyncpgInterfaceError from src.database.service import search_filter_sort_paginate from src.equipment.model import Equipment, EquipmentTransactionRecords @@ -8,9 +14,8 @@ from src.acquisition_cost.model import AcquisitionData from src.yeardata.model import Yeardata from ..equipment_master.model import EquipmentMaster from .schema import EquipmentCreate, EquipmentUpdate, MasterBase -from typing import Optional, TypedDict -from src.database.core import DbSession, CollectorDbSession +from src.database.core import DbSession, CollectorDbSession, collector_async_session from src.auth.service import CurrentUser from src.config import RELIABILITY_APP_URL import httpx @@ -20,7 +25,6 @@ from src.modules.equipment.Prediksi import main as predict_main from src.modules.equipment.Eac import main as eac_main import datetime import math -from sqlalchemy import text class CategoryRule(TypedDict, total=False): @@ -114,6 +118,57 @@ def _build_category_rollup_children() -> dict[str, set[str]]: CATEGORY_ROLLUP_CHILDREN = _build_category_rollup_children() +logger = logging.getLogger(__name__) + +MAXIMO_SQL = text( + """ + SELECT + * + FROM public.wo_maximo AS a + WHERE a.asset_unit = '3' + AND a.asset_assetnum = :assetnum + AND a.wonum NOT LIKE 'T%' + """ +) + + +async def _fetch_maximo_records( + *, session: AsyncSession, assetnum: str +) -> list[dict[str, Any]]: + """Fetch Maximo rows with a retry to mask transient collector failures.""" + + query = MAXIMO_SQL.bindparams(assetnum=assetnum) + + try: + result = await session.execute(query) + return result.mappings().all() + except AsyncpgInterfaceError as exc: + logger.warning( + "Collector session closed while fetching Maximo data for %s. Retrying once.", + assetnum, + ) + try: + async with collector_async_session() as retry_session: + retry_result = await retry_session.execute(query) + return retry_result.mappings().all() + except Exception as retry_exc: + logger.error( + "Retrying Maximo query failed for %s: %s", + assetnum, + retry_exc, + exc_info=True, + ) + except SQLAlchemyError as exc: + logger.error( + "Failed to fetch Maximo data for %s: %s", assetnum, exc, exc_info=True + ) + except Exception as exc: + logger.exception( + "Unexpected error while fetching Maximo data for %s", assetnum + ) + + return [] + async def fetch_acquisition_cost_with_rollup( *, db_session: DbSession, base_category_no: str @@ -231,19 +286,9 @@ async def get_master_by_assetnum( min_seq = equipment_record.minimum_eac_seq if equipment_record else None min_eac_year = equipment_record.minimum_eac_year if equipment_record else None - maximo_query = f""" - SELECT - * - FROM public.wo_maximo AS a - WHERE a.asset_unit = '3' - AND a.asset_assetnum = '{assetnum}' - AND a.wonum NOT LIKE 'T%' - """ - - query = text(maximo_query) - # Pass parameters to execute to avoid bindparam/name mismatches - result_maximo = await collector_db_session.execute(query) - maximo_record = result_maximo.mappings().all() + maximo_record = await _fetch_maximo_records( + session=collector_db_session, assetnum=assetnum + ) return ( equipment_master_record, @@ -257,8 +302,8 @@ async def get_master_by_assetnum( ) # return result.scalars().all() -async def get_maximo_by_assetnum(*, db_session: DbSession, assetnum: str) -> Optional[MasterBase]: - """Returns a document based on the given document id.""" +async def get_maximo_by_assetnum(*, db_session: CollectorDbSession, assetnum: str) -> Optional[MasterBase]: + """Return Maximo collector rows for the provided asset number.""" # default worktype; change if you need a different filtering # worktype = "CM" @@ -286,22 +331,8 @@ async def get_maximo_by_assetnum(*, db_session: DbSession, assetnum: str) -> Opt # GROUP BY DATE_PART('year', a.reportdate); # """ - sql = f""" - SELECT - * - FROM public.wo_maximo AS a - WHERE a.asset_unit = '3' - AND a.asset_assetnum = '{assetnum}' - AND a.wonum NOT LIKE 'T%' - """ - - query = text(sql) - # Pass parameters to execute to avoid bindparam/name mismatches - result = await db_session.execute(query) - record = result.mappings().all() - if record: - return record - return None + record = await _fetch_maximo_records(session=db_session, assetnum=assetnum) + return record or None async def get_by_assetnum(*, db_session: DbSession, assetnum: str) -> Optional[Equipment]: """Returns a document based on the given document id.""" diff --git a/src/masterdata/__pycache__/schema.cpython-311.pyc b/src/masterdata/__pycache__/schema.cpython-311.pyc index cdaf5c7a43e1ef9c8710aaf1787a6ce3e54c5a2e..9fab2405e9ec4e78f755854bd8c8f6c58873239b 100644 GIT binary patch delta 204 zcmX>lb4rGHIWI340}$M~WuG~9BX0mZBiH0Oc1dk+8=xQ)Ln?C$R|?Y_=4DI_46A_{ z0-|_Qc~jX^ctC1WxF#=Rx90XM5(9FI#DT=-7wp`Ob|x!SE{N)16xF{Xs^8$*QG9_# zABZlp=-=QM=_s8LvcTm6zve}L%`5zx4bC0H7g#ib=pvW|GBz7>+OVi;ePG~VL?lb4rGHIWI340}vE`GtZp7kvD*Saw(GtJ9jE;Dq9NIZeAdZ+Xkp4g=g|2 zc5Ch;F(A8097yk&e;{!3#_- z@M~P;*SNy3(cs)6e1Sy+h%SOjAY-#3rwxmm<_882Ms#9A-A9n<7r4~q$-LVEorf~T