fix onnection closed error

main
MrWaradana 4 weeks ago
parent 9ae181232b
commit 996d1aa9d7

@ -14,6 +14,7 @@ import functools
from typing import AsyncGenerator from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker from sqlalchemy.orm import DeclarativeBase, sessionmaker
from contextlib import asynccontextmanager
from src.config import SQLALCHEMY_DATABASE_URI, COLLECTOR_URI from src.config import SQLALCHEMY_DATABASE_URI, COLLECTOR_URI
@ -115,7 +116,7 @@ Base = declarative_base(cls=CustomBase)
# make_searchable(Base.metadata) # make_searchable(Base.metadata)
@contextmanager @asynccontextmanager
async def get_session(): async def get_session():
"""Context manager to ensure the session is closed after use.""" """Context manager to ensure the session is closed after use."""
session = async_session() session = async_session()
@ -128,7 +129,7 @@ async def get_session():
finally: finally:
await session.close() await session.close()
@contextmanager @asynccontextmanager
async def get_collector_session(): async def get_collector_session():
"""Context manager to ensure the collector session is closed after use.""" """Context manager to ensure the collector session is closed after use."""
session = collector_async_session() session = collector_async_session()

@ -1,6 +1,12 @@
import os import os
from sqlalchemy import Select, Delete, Float, func, cast, String import logging
from typing import Optional, TypedDict, Any
from sqlalchemy import Select, Delete, Float, func, cast, String, text
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from asyncpg.exceptions import InterfaceError as AsyncpgInterfaceError
from src.database.service import search_filter_sort_paginate from src.database.service import search_filter_sort_paginate
from src.equipment.model import Equipment, EquipmentTransactionRecords from src.equipment.model import Equipment, EquipmentTransactionRecords
@ -8,9 +14,8 @@ from src.acquisition_cost.model import AcquisitionData
from src.yeardata.model import Yeardata from src.yeardata.model import Yeardata
from ..equipment_master.model import EquipmentMaster from ..equipment_master.model import EquipmentMaster
from .schema import EquipmentCreate, EquipmentUpdate, MasterBase from .schema import EquipmentCreate, EquipmentUpdate, MasterBase
from typing import Optional, TypedDict
from src.database.core import DbSession, CollectorDbSession from src.database.core import DbSession, CollectorDbSession, collector_async_session
from src.auth.service import CurrentUser from src.auth.service import CurrentUser
from src.config import RELIABILITY_APP_URL from src.config import RELIABILITY_APP_URL
import httpx import httpx
@ -20,7 +25,6 @@ from src.modules.equipment.Prediksi import main as predict_main
from src.modules.equipment.Eac import main as eac_main from src.modules.equipment.Eac import main as eac_main
import datetime import datetime
import math import math
from sqlalchemy import text
class CategoryRule(TypedDict, total=False): class CategoryRule(TypedDict, total=False):
@ -114,6 +118,57 @@ def _build_category_rollup_children() -> dict[str, set[str]]:
CATEGORY_ROLLUP_CHILDREN = _build_category_rollup_children() CATEGORY_ROLLUP_CHILDREN = _build_category_rollup_children()
logger = logging.getLogger(__name__)
MAXIMO_SQL = text(
"""
SELECT
*
FROM public.wo_maximo AS a
WHERE a.asset_unit = '3'
AND a.asset_assetnum = :assetnum
AND a.wonum NOT LIKE 'T%'
"""
)
async def _fetch_maximo_records(
*, session: AsyncSession, assetnum: str
) -> list[dict[str, Any]]:
"""Fetch Maximo rows with a retry to mask transient collector failures."""
query = MAXIMO_SQL.bindparams(assetnum=assetnum)
try:
result = await session.execute(query)
return result.mappings().all()
except AsyncpgInterfaceError as exc:
logger.warning(
"Collector session closed while fetching Maximo data for %s. Retrying once.",
assetnum,
)
try:
async with collector_async_session() as retry_session:
retry_result = await retry_session.execute(query)
return retry_result.mappings().all()
except Exception as retry_exc:
logger.error(
"Retrying Maximo query failed for %s: %s",
assetnum,
retry_exc,
exc_info=True,
)
except SQLAlchemyError as exc:
logger.error(
"Failed to fetch Maximo data for %s: %s", assetnum, exc, exc_info=True
)
except Exception as exc:
logger.exception(
"Unexpected error while fetching Maximo data for %s", assetnum
)
return []
async def fetch_acquisition_cost_with_rollup( async def fetch_acquisition_cost_with_rollup(
*, db_session: DbSession, base_category_no: str *, db_session: DbSession, base_category_no: str
@ -231,19 +286,9 @@ async def get_master_by_assetnum(
min_seq = equipment_record.minimum_eac_seq if equipment_record else None min_seq = equipment_record.minimum_eac_seq if equipment_record else None
min_eac_year = equipment_record.minimum_eac_year if equipment_record else None min_eac_year = equipment_record.minimum_eac_year if equipment_record else None
maximo_query = f""" maximo_record = await _fetch_maximo_records(
SELECT session=collector_db_session, assetnum=assetnum
* )
FROM public.wo_maximo AS a
WHERE a.asset_unit = '3'
AND a.asset_assetnum = '{assetnum}'
AND a.wonum NOT LIKE 'T%'
"""
query = text(maximo_query)
# Pass parameters to execute to avoid bindparam/name mismatches
result_maximo = await collector_db_session.execute(query)
maximo_record = result_maximo.mappings().all()
return ( return (
equipment_master_record, equipment_master_record,
@ -257,8 +302,8 @@ async def get_master_by_assetnum(
) )
# return result.scalars().all() # return result.scalars().all()
async def get_maximo_by_assetnum(*, db_session: DbSession, assetnum: str) -> Optional[MasterBase]: async def get_maximo_by_assetnum(*, db_session: CollectorDbSession, assetnum: str) -> Optional[MasterBase]:
"""Returns a document based on the given document id.""" """Return Maximo collector rows for the provided asset number."""
# default worktype; change if you need a different filtering # default worktype; change if you need a different filtering
# worktype = "CM" # worktype = "CM"
@ -286,22 +331,8 @@ async def get_maximo_by_assetnum(*, db_session: DbSession, assetnum: str) -> Opt
# GROUP BY DATE_PART('year', a.reportdate); # GROUP BY DATE_PART('year', a.reportdate);
# """ # """
sql = f""" record = await _fetch_maximo_records(session=db_session, assetnum=assetnum)
SELECT return record or None
*
FROM public.wo_maximo AS a
WHERE a.asset_unit = '3'
AND a.asset_assetnum = '{assetnum}'
AND a.wonum NOT LIKE 'T%'
"""
query = text(sql)
# Pass parameters to execute to avoid bindparam/name mismatches
result = await db_session.execute(query)
record = result.mappings().all()
if record:
return record
return None
async def get_by_assetnum(*, db_session: DbSession, assetnum: str) -> Optional[Equipment]: async def get_by_assetnum(*, db_session: DbSession, assetnum: str) -> Optional[Equipment]:
"""Returns a document based on the given document id.""" """Returns a document based on the given document id."""

Loading…
Cancel
Save