| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- from __future__ import annotations
- import atexit
- import hashlib
- import os
- import sys
- from dataclasses import dataclass
- from pathlib import Path
- from typing import Any
- from sqlalchemy import create_engine, text
- from sqlalchemy.engine import Engine, make_url
# Directory one level above the directory that contains this file.
REPO_ROOT = Path(__file__).resolve().parents[1]

# Fallback connection URL, used only when neither AGENT_PLATFORM_TEST_DATABASE_URL
# nor AGENT_PLATFORM_DATABASE_URL is set in the environment.
# NOTE(review): this default embeds a username and password in source control;
# consider supplying the URL through the environment instead — confirm with owners.
DEFAULT_TEST_DATABASE_URL = (
    "postgresql+psycopg://admin:hFOvG5UBeK5KIGhz5cQH@git.newpoint.work:5432/vectordb"
)

# Names of the schemas created during this process; they are dropped again by
# the cleanup hook registered with ``atexit``.
_CREATED_TEST_SCHEMAS: set[str] = set()
@dataclass(frozen=True)
class ServiceImportConfig:
    """Immutable description of what one service needs on ``sys.path``.

    Attributes:
        service_name: Directory name of the service under ``services/``.
        libs: Names of the shared libraries (directories under ``libs/``)
            whose ``src`` folders must be importable for the service.
    """

    service_name: str
    libs: tuple[str, ...]
# Library sets shared by the service entries below.
_CORE_LIBS: tuple[str, ...] = ("core-domain", "core-shared", "core-db")
_CORE_LIBS_WITH_EVENTS: tuple[str, ...] = (*_CORE_LIBS, "core-events")

# Import configuration for every known service, keyed by service name.
# Each entry's ``service_name`` is the same string as its key.
SERVICE_IMPORT_CONFIGS: dict[str, ServiceImportConfig] = {
    name: ServiceImportConfig(service_name=name, libs=libs)
    for name, libs in (
        ("agent-service", _CORE_LIBS_WITH_EVENTS),
        ("api-gateway", _CORE_LIBS),
        ("auth-service", _CORE_LIBS),
        ("event-service", _CORE_LIBS_WITH_EVENTS),
        ("human-service", _CORE_LIBS),
        ("knowledge-service", _CORE_LIBS),
        ("memory-service", _CORE_LIBS),
        ("model-gateway-service", _CORE_LIBS),
        ("scheduler-service", _CORE_LIBS),
        ("session-service", _CORE_LIBS),
        ("skill-service", _CORE_LIBS),
        ("tool-service", _CORE_LIBS),
        ("team-service", _CORE_LIBS_WITH_EVENTS),
    )
}
def prepare_service_import(
    service_name: str,
    *,
    libs: tuple[str, ...],
) -> None:
    """Reset import state so that ``app`` resolves to the given service.

    Every cached ``app`` / ``app.*`` module is evicted from ``sys.modules``,
    the shared SQLAlchemy metadata is cleared, and the ``src`` directory of
    each library followed by the service directory is moved to the front of
    ``sys.path``.

    Args:
        service_name: Directory name under ``REPO_ROOT / "services"``.
        libs: Library directory names under ``REPO_ROOT / "libs"``. They are
            prepended in the given order, so the last one ends up ahead of
            the earlier ones; the service directory ends up first overall.
    """
    # Snapshot the keys first: entries are deleted while we walk them.
    cached_app_modules = [
        name
        for name in tuple(sys.modules)
        if name == "app" or name.startswith("app.")
    ]
    for name in cached_app_modules:
        del sys.modules[name]

    _clear_shared_sqlalchemy_metadata()

    libs_root = REPO_ROOT / "libs"
    for lib_name in libs:
        _prepend_sys_path(libs_root / lib_name / "src")

    # Prepended last so the service directory wins over every library path.
    _prepend_sys_path(REPO_ROOT / "services" / service_name)
def prepare_known_service_import(service_name: str) -> None:
    """Prepare imports for a service registered in ``SERVICE_IMPORT_CONFIGS``.

    Args:
        service_name: Key into ``SERVICE_IMPORT_CONFIGS``.

    Raises:
        KeyError: If ``service_name`` has no registered configuration.
    """
    known = SERVICE_IMPORT_CONFIGS[service_name]
    prepare_service_import(known.service_name, libs=known.libs)
def build_postgres_database_url(tmp_path: Path, filename: str) -> str:
    """Create a dedicated schema and return a URL whose search path targets it.

    The schema name is derived from ``tmp_path / filename``, so the same pair
    always maps to the same schema.

    Args:
        tmp_path: Directory used only as input to the schema-name hash.
        filename: File name used only as input to the schema-name hash.

    Returns:
        The base test database URL with its ``options`` query parameter set to
        ``-csearch_path=<schema>``; any existing ``options`` value is replaced.
        The password is rendered in clear text.
    """
    base_url = _base_test_database_url()
    schema_name = _build_test_schema_name(tmp_path=tmp_path, filename=filename)
    _create_postgres_schema(base_url=base_url, schema_name=schema_name)

    parsed = make_url(base_url)
    merged_query = {**parsed.query, "options": f"-csearch_path={schema_name}"}
    scoped = parsed.set(query=merged_query)
    return scoped.render_as_string(hide_password=False)
def build_postgres_engine(database_url: str) -> Engine:
    """Return a SQLAlchemy engine for ``database_url`` with pre-ping enabled.

    Args:
        database_url: Connection URL passed straight to ``create_engine``.

    Returns:
        A new engine created with ``pool_pre_ping=True``.
    """
    engine_options = {"pool_pre_ping": True}
    return create_engine(database_url, **engine_options)
def _prepend_sys_path(path: Path) -> None:
    """Move ``path`` to the front of ``sys.path``, inserting it if absent.

    Only the first existing occurrence of the entry is removed before it is
    re-inserted at index 0.

    Args:
        path: Directory to place first on the import search path.
    """
    entry = str(path)
    try:
        sys.path.remove(entry)
    except ValueError:
        # Not on the search path yet; nothing to move.
        pass
    sys.path.insert(0, entry)
def _clear_shared_sqlalchemy_metadata() -> None:
    """Reset the registry and metadata of ``core_db.Base``, if importable.

    When ``core_db`` (or its ``Base`` attribute) cannot be imported the call
    is a no-op. Otherwise ``Base.registry.dispose()`` runs first, followed by
    ``Base.metadata.clear()``.
    """
    # NOTE(review): presumably Base is the shared declarative base, and the
    # reset lets a service re-declare its models — confirm against core_db.
    try:
        from core_db import Base as shared_base
    except ImportError:
        pass
    else:
        shared_base.registry.dispose()
        shared_base.metadata.clear()
def _base_test_database_url() -> str:
    """Return the database URL that test schemas are created in.

    Precedence: ``AGENT_PLATFORM_TEST_DATABASE_URL``, then
    ``AGENT_PLATFORM_DATABASE_URL``, then ``DEFAULT_TEST_DATABASE_URL``.
    An unset or empty environment variable falls through to the next source.
    """
    for variable in (
        "AGENT_PLATFORM_TEST_DATABASE_URL",
        "AGENT_PLATFORM_DATABASE_URL",
    ):
        configured = os.getenv(variable)
        if configured:
            return configured
    return DEFAULT_TEST_DATABASE_URL
def _build_test_schema_name(*, tmp_path: Path, filename: str) -> str:
    """Derive a deterministic schema name from ``tmp_path / filename``.

    Args:
        tmp_path: Directory component of the hashed path.
        filename: File-name component of the hashed path.

    Returns:
        ``"test_"`` followed by the first 16 hex digits of the SHA-1 digest
        of the UTF-8 encoded joined path.
    """
    hashed_path = str(tmp_path / filename)
    hasher = hashlib.sha1()
    hasher.update(hashed_path.encode("utf-8"))
    return "test_" + hasher.hexdigest()[:16]
def _create_postgres_schema(*, base_url: str, schema_name: str) -> None:
    """Ensure the ``vector`` extension and the given schema exist.

    A short-lived AUTOCOMMIT engine is used for the DDL. The schema name is
    recorded in ``_CREATED_TEST_SCHEMAS`` only after both statements succeed.
    The engine is always disposed, even when connecting or executing fails.

    Args:
        base_url: Connection URL of the database to create the schema in.
        schema_name: Name of the schema. It is interpolated into the statement
            as a double-quoted identifier, so it must not contain a double
            quote; this file only passes generated ``test_<hex>`` names.

    Raises:
        Whatever SQLAlchemy raises when connecting or executing fails.
    """
    engine = create_engine(base_url, isolation_level="AUTOCOMMIT", pool_pre_ping=True)
    try:
        with engine.connect() as connection:
            connection.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
            connection.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{schema_name}"'))
            _CREATED_TEST_SCHEMAS.add(schema_name)
    finally:
        # Release pooled connections even if the DDL above raised.
        engine.dispose()
def _drop_created_postgres_schemas() -> None:
    """Drop every schema recorded in ``_CREATED_TEST_SCHEMAS``.

    Does nothing when no schema was recorded. Schemas are dropped in sorted
    order with ``CASCADE``; each name is removed from the registry once its
    drop succeeds, so calling the function again does not repeat finished
    work. The engine is always disposed, even when a statement fails.

    The database URL is resolved again at call time, so it reflects the
    environment at that moment rather than at schema-creation time.
    """
    if not _CREATED_TEST_SCHEMAS:
        return
    engine = create_engine(_base_test_database_url(), isolation_level="AUTOCOMMIT")
    try:
        with engine.connect() as connection:
            # sorted() snapshots the names, so discarding inside the loop is safe.
            for schema_name in sorted(_CREATED_TEST_SCHEMAS):
                connection.execute(text(f'DROP SCHEMA IF EXISTS "{schema_name}" CASCADE'))
                _CREATED_TEST_SCHEMAS.discard(schema_name)
    finally:
        # Release pooled connections even if connecting or a DROP raised.
        engine.dispose()


# Clean up the schemas created by this process when the interpreter exits.
atexit.register(_drop_created_postgres_schemas)
def build_fastapi_test_client(app: Any) -> Any:
    """Return a FastAPI ``TestClient`` wrapping ``app``.

    The httpx compatibility patch is applied before ``fastapi.testclient`` is
    imported and before the client is constructed.

    Args:
        app: Application object handed to ``TestClient``.

    Returns:
        The constructed ``TestClient`` instance.
    """
    _patch_httpx_testclient_compatibility()

    # Imported lazily, after the patch above has been applied.
    from fastapi.testclient import TestClient

    client = TestClient(app)
    return client
def _patch_httpx_testclient_compatibility() -> None:
    """Make ``httpx.Client.__init__`` tolerate an ``app`` keyword argument.

    If the installed ``httpx.Client.__init__`` already declares an ``app``
    parameter, or was already wrapped by this function, nothing happens.
    Otherwise it is replaced by a wrapper that discards ``app`` from the
    keyword arguments and forwards everything else unchanged.
    """
    import inspect

    import httpx

    current_init = httpx.Client.__init__
    accepts_app = "app" in inspect.signature(current_init).parameters
    already_patched = getattr(current_init, "_agent_platform_patched", False)
    if accepts_app or already_patched:
        return

    def patched_init(self: httpx.Client, *args: Any, **kwargs: Any) -> None:
        # Drop the keyword the wrapped initializer does not accept.
        kwargs.pop("app", None)
        current_init(self, *args, **kwargs)

    # Marker checked above so a second call does not wrap the wrapper.
    patched_init._agent_platform_patched = True
    httpx.Client.__init__ = patched_init
|