from __future__ import annotations import atexit import hashlib import os import sys from dataclasses import dataclass from pathlib import Path from typing import Any from sqlalchemy import create_engine, text from sqlalchemy.engine import Engine, make_url REPO_ROOT = Path(__file__).resolve().parents[1] DEFAULT_TEST_DATABASE_URL = ( "postgresql+psycopg://admin:hFOvG5UBeK5KIGhz5cQH@git.newpoint.work:5432/vectordb" ) _CREATED_TEST_SCHEMAS: set[str] = set() @dataclass(frozen=True) class ServiceImportConfig: service_name: str libs: tuple[str, ...] SERVICE_IMPORT_CONFIGS: dict[str, ServiceImportConfig] = { "agent-service": ServiceImportConfig( service_name="agent-service", libs=("core-domain", "core-shared", "core-db", "core-events")), "api-gateway": ServiceImportConfig( service_name="api-gateway", libs=("core-domain", "core-shared", "core-db")), "auth-service": ServiceImportConfig( service_name="auth-service", libs=("core-domain", "core-shared", "core-db")), "event-service": ServiceImportConfig( service_name="event-service", libs=("core-domain", "core-shared", "core-db", "core-events")), "human-service": ServiceImportConfig( service_name="human-service", libs=("core-domain", "core-shared", "core-db")), "knowledge-service": ServiceImportConfig( service_name="knowledge-service", libs=("core-domain", "core-shared", "core-db")), "memory-service": ServiceImportConfig( service_name="memory-service", libs=("core-domain", "core-shared", "core-db")), "model-gateway-service": ServiceImportConfig( service_name="model-gateway-service", libs=("core-domain", "core-shared", "core-db")), "scheduler-service": ServiceImportConfig( service_name="scheduler-service", libs=("core-domain", "core-shared", "core-db")), "session-service": ServiceImportConfig( service_name="session-service", libs=("core-domain", "core-shared", "core-db")), "skill-service": ServiceImportConfig( service_name="skill-service", libs=("core-domain", "core-shared", "core-db")), "tool-service": ServiceImportConfig( service_name="tool-service", libs=("core-domain", "core-shared", "core-db")), "team-service": ServiceImportConfig( service_name="team-service", libs=("core-domain", "core-shared", "core-db", "core-events")), } def prepare_service_import( service_name: str, *, libs: tuple[str, ...]) -> None: for module_name in list(sys.modules): if module_name == "app" or module_name.startswith("app."): del sys.modules[module_name] _clear_shared_sqlalchemy_metadata() for lib_name in libs: lib_path = REPO_ROOT / "libs" / lib_name / "src" _prepend_sys_path(lib_path) _prepend_sys_path(REPO_ROOT / "services" / service_name) def prepare_known_service_import(service_name: str) -> None: config = SERVICE_IMPORT_CONFIGS[service_name] prepare_service_import(config.service_name, libs=config.libs) def build_postgres_database_url(tmp_path: Path, filename: str) -> str: schema_name = _build_test_schema_name(tmp_path=tmp_path, filename=filename) base_url = _base_test_database_url() _create_postgres_schema(base_url=base_url, schema_name=schema_name) url = make_url(base_url) query = dict(url.query) query["options"] = f"-csearch_path={schema_name}" return url.set(query=query).render_as_string(hide_password=False) def build_postgres_engine(database_url: str) -> Engine: return create_engine(database_url, pool_pre_ping=True) def _prepend_sys_path(path: Path) -> None: path_text = str(path) if path_text in sys.path: sys.path.remove(path_text) sys.path.insert(0, path_text) def _clear_shared_sqlalchemy_metadata() -> None: try: from core_db import Base except ImportError: return Base.registry.dispose() Base.metadata.clear() def _base_test_database_url() -> str: return ( os.getenv("AGENT_PLATFORM_TEST_DATABASE_URL") or os.getenv("AGENT_PLATFORM_DATABASE_URL") or DEFAULT_TEST_DATABASE_URL ) def _build_test_schema_name(*, tmp_path: Path, filename: str) -> str: digest = hashlib.sha1(str(tmp_path / filename).encode("utf-8")).hexdigest()[:16] return f"test_{digest}" def _create_postgres_schema(*, base_url: str, schema_name: str) -> None: engine = create_engine(base_url, isolation_level="AUTOCOMMIT", pool_pre_ping=True) with engine.connect() as connection: connection.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) connection.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{schema_name}"')) _CREATED_TEST_SCHEMAS.add(schema_name) engine.dispose() def _drop_created_postgres_schemas() -> None: if not _CREATED_TEST_SCHEMAS: return engine = create_engine(_base_test_database_url(), isolation_level="AUTOCOMMIT") with engine.connect() as connection: for schema_name in sorted(_CREATED_TEST_SCHEMAS): connection.execute(text(f'DROP SCHEMA IF EXISTS "{schema_name}" CASCADE')) engine.dispose() atexit.register(_drop_created_postgres_schemas) def build_fastapi_test_client(app: Any) -> Any: _patch_httpx_testclient_compatibility() from fastapi.testclient import TestClient return TestClient(app) def _patch_httpx_testclient_compatibility() -> None: import inspect import httpx if "app" in inspect.signature(httpx.Client.__init__).parameters: return if getattr(httpx.Client.__init__, "_agent_platform_patched", False): return original_init = httpx.Client.__init__ def patched_init(self: httpx.Client, *args: Any, **kwargs: Any) -> None: kwargs.pop("app", None) original_init(self, *args, **kwargs) patched_init._agent_platform_patched = True httpx.Client.__init__ = patched_init