diff --git a/tests/conftest.py b/tests/conftest.py index 04fa40a..29f621b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,115 +1,115 @@ -import os +# import os -# Set dummy environment variables for testing -os.environ["DATABASE_HOSTNAME"] = "localhost" -os.environ["DATABASE_CREDENTIAL_USER"] = "test" -os.environ["DATABASE_CREDENTIAL_PASSWORD"] = "test" -os.environ["COLLECTOR_CREDENTIAL_USER"] = "test" -os.environ["COLLECTOR_CREDENTIAL_PASSWORD"] = "test" -os.environ["DEV_USERNAME"] = "test" -os.environ["DEV_PASSWORD"] = "test" +# # Set dummy environment variables for testing +# os.environ["DATABASE_HOSTNAME"] = "localhost" +# os.environ["DATABASE_CREDENTIAL_USER"] = "test" +# os.environ["DATABASE_CREDENTIAL_PASSWORD"] = "test" +# os.environ["COLLECTOR_CREDENTIAL_USER"] = "test" +# os.environ["COLLECTOR_CREDENTIAL_PASSWORD"] = "test" +# os.environ["DEV_USERNAME"] = "test" +# os.environ["DEV_PASSWORD"] = "test" -import asyncio -from typing import AsyncGenerator, Generator -import pytest -import pytest_asyncio -from httpx import AsyncClient, ASGITransport -from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine -from sqlalchemy.orm import sessionmaker -from sqlalchemy.pool import StaticPool -from fastapi import Request +# import asyncio +# from typing import AsyncGenerator, Generator +# import pytest +# import pytest_asyncio +# from httpx import AsyncClient, ASGITransport +# from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine +# from sqlalchemy.orm import sessionmaker +# from sqlalchemy.pool import StaticPool +# from fastapi import Request -from src.main import app -from src.database.core import Base, get_db, get_collector_db -from src.auth.service import JWTBearer -from src.auth.model import UserBase +# from src.main import app +# from src.database.core import Base, get_db, get_collector_db +# from src.auth.service import JWTBearer +# from src.auth.model import UserBase -# Import all models to register them with Base -import src.acquisition_cost.model -import src.equipment.model -import src.equipment_master.model -import src.manpower_cost.model -import src.manpower_master.model -import src.masterdata.model -import src.masterdata_simulations.model -import src.plant_fs_transaction_data.model -import src.plant_masterdata.model -import src.plant_transaction_data.model -import src.plant_transaction_data_simulations.model -import src.simulations.model -import src.uploaded_file.model -import src.yeardata.model +# # Import all models to register them with Base +# import src.acquisition_cost.model +# import src.equipment.model +# import src.equipment_master.model +# import src.manpower_cost.model +# import src.manpower_master.model +# import src.masterdata.model +# import src.masterdata_simulations.model +# import src.plant_fs_transaction_data.model +# import src.plant_masterdata.model +# import src.plant_transaction_data.model +# import src.plant_transaction_data_simulations.model +# import src.simulations.model +# import src.uploaded_file.model +# import src.yeardata.model -# Test database URL -TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:" +# # Test database URL +# TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:" -engine = create_async_engine( - TEST_DATABASE_URL, - connect_args={"check_same_thread": False}, - poolclass=StaticPool, -) +# engine = create_async_engine( +# TEST_DATABASE_URL, +# connect_args={"check_same_thread": False}, +# poolclass=StaticPool, +# ) -TestingSessionLocal = sessionmaker( - engine, - class_=AsyncSession, - expire_on_commit=False, - autocommit=False, - autoflush=False, -) +# TestingSessionLocal = sessionmaker( +# engine, +# class_=AsyncSession, +# expire_on_commit=False, +# autocommit=False, +# autoflush=False, +# ) -def pytest_sessionfinish(session, exitstatus): - """ - Called after whole test run finished, right before returning the exit status to the system. - Used here to dispose of all SQLAlchemy engines to prevent hanging. - """ - from src.database.core import engine as db_engine, collector_engine +# def pytest_sessionfinish(session, exitstatus): +# """ +# Called after whole test run finished, right before returning the exit status to the system. +# Used here to dispose of all SQLAlchemy engines to prevent hanging. +# """ +# from src.database.core import engine as db_engine, collector_engine - async def dispose_all(): - # Dispose of both test engine and production engines - await engine.dispose() - await db_engine.dispose() - await collector_engine.dispose() +# async def dispose_all(): +# # Dispose of both test engine and production engines +# await engine.dispose() +# await db_engine.dispose() +# await collector_engine.dispose() - try: - loop = asyncio.get_event_loop() - if loop.is_running(): - # If the loop is already running, we create a task - loop.create_task(dispose_all()) - else: - loop.run_until_complete(dispose_all()) - except Exception: - # Fallback for environment where no loop is available or loop is closed - try: - asyncio.run(dispose_all()) - except Exception: - pass +# try: +# loop = asyncio.get_event_loop() +# if loop.is_running(): +# # If the loop is already running, we create a task +# loop.create_task(dispose_all()) +# else: +# loop.run_until_complete(dispose_all()) +# except Exception: +# # Fallback for environment where no loop is available or loop is closed +# try: +# asyncio.run(dispose_all()) +# except Exception: +# pass -# Removed custom event_loop fixture +# # Removed custom event_loop fixture -@pytest_asyncio.fixture(autouse=True) -async def setup_db(): - async with engine.begin() as conn: - await conn.run_sync(Base.metadata.create_all) - yield - async with engine.begin() as conn: - await conn.run_sync(Base.metadata.drop_all) +# @pytest_asyncio.fixture(autouse=True) +# async def setup_db(): +# async with engine.begin() as conn: +# await conn.run_sync(Base.metadata.create_all) +# yield +# async with engine.begin() as conn: +# await conn.run_sync(Base.metadata.drop_all) -async def override_get_db(request: Request = None): - async with TestingSessionLocal() as session: - yield session +# async def override_get_db(request: Request = None): +# async with TestingSessionLocal() as session: +# yield session -app.dependency_overrides[get_db] = override_get_db -app.dependency_overrides[get_collector_db] = override_get_db +# app.dependency_overrides[get_db] = override_get_db +# app.dependency_overrides[get_collector_db] = override_get_db -@pytest.fixture(autouse=True) -def mock_auth(monkeypatch): - async def mock_call(self, request: Request): - user = UserBase(user_id="test-id", name="test-user", role="admin") - request.state.user = user - return user - monkeypatch.setattr(JWTBearer, "__call__", mock_call) +# @pytest.fixture(autouse=True) +# def mock_auth(monkeypatch): +# async def mock_call(self, request: Request): +# user = UserBase(user_id="test-id", name="test-user", role="admin") +# request.state.user = user +# return user +# monkeypatch.setattr(JWTBearer, "__call__", mock_call) -@pytest_asyncio.fixture -async def client() -> AsyncGenerator[AsyncClient, None]: - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: - yield client \ No newline at end of file +# @pytest_asyncio.fixture +# async def client() -> AsyncGenerator[AsyncClient, None]: +# async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: +# yield client \ No newline at end of file