| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- from __future__ import annotations
- from pathlib import Path
- from tests.conftest import build_postgres_database_url, prepare_known_service_import
- class FakeRedis:
- def __init__(self) -> None:
- self.values: dict[str, bytes | str] = {}
- self.queues: dict[str, list[bytes | str]] = {}
- def ping(self) -> bool:
- return True
- def rpush(self, name: str, value: bytes | str) -> int:
- queue = self.queues.setdefault(name, [])
- queue.append(value)
- return len(queue)
- def blpop(self, names: list[str], timeout: int = 1) -> tuple[str, bytes | str] | None:
- for name in names:
- queue = self.queues.get(name) or []
- if queue:
- return name, queue.pop(0)
- return None
- def set(
- self,
- name: str,
- value: bytes | str,
- nx: bool = False,
- ex: int | None = None) -> bool:
- if nx and name in self.values:
- return False
- self.values[name] = value
- return True
- def get(self, name: str) -> bytes | str | None:
- return self.values.get(name)
- def incr(self, name: str) -> int:
- current = self.get(name)
- if isinstance(current, bytes):
- current = current.decode("utf-8")
- next_value = int(current or "0") + 1
- self.values[name] = str(next_value)
- return next_value
- def eval(self, script: str, numkeys: int, name: str, token: str) -> int:
- if self.values.get(name) == token:
- self.values.pop(name, None)
- return 1
- return 0
- def delete(self, name: str) -> int:
- existed = name in self.values
- self.values.pop(name, None)
- return 1 if existed else 0
- def test_memory_search_uses_redis_cache_and_touch_queue(tmp_path: Path) -> None:
- prepare_known_service_import("memory-service")
- from app.application.services import MemoryApplicationService
- from app.bootstrap.settings import MemoryServiceSettings
- from app.db.models import Base
- from app.db.session import build_session_factory
- from app.domain.repositories import MemoryItemRepository
- from app.schemas.memory import MemoryCreateRequest, MemorySearchRequestDto
- from core_shared.task_queue import MEMORY_TOUCH_QUEUE, TaskQueuePublisher
- fake_redis = FakeRedis()
- settings = MemoryServiceSettings(
- database_url=build_postgres_database_url(tmp_path, "memory-redis-jobs"),
- search_cache_ttl_seconds=60,
- async_touch_enabled=True)
- session_factory = build_session_factory(settings)
- Base.metadata.create_all(bind=session_factory.kw["bind"])
- with session_factory() as db:
- service = MemoryApplicationService(
- memory_repository=MemoryItemRepository(db),
- settings=settings,
- redis_client=fake_redis,
- task_queue_publisher=TaskQueuePublisher(client=fake_redis))
- memory = service.create_memory(
- MemoryCreateRequest(
- scope_type="user",
- scope_id="u1",
- memory_type="preference",
- content_text="用户喜欢 Redis 后台任务。",
- importance_score=80))
- first_results = service.search_memories_contract(
- MemorySearchRequestDto(query="Redis 后台", scopeType="user", scopeId="u1"))
- second_results = service.search_memories_contract(
- MemorySearchRequestDto(query="Redis 后台", scopeType="user", scopeId="u1"))
- assert first_results[0][0].id == memory.id
- assert second_results[0][0].id == memory.id
- assert any(key.startswith("memory-search:") for key in fake_redis.values)
- assert len(fake_redis.queues[MEMORY_TOUCH_QUEUE]) == 2
- session_factory.kw["bind"].dispose()
- def test_tool_mcp_connect_queues_discovery_and_worker_updates_status(tmp_path: Path) -> None:
- prepare_known_service_import("tool-service")
- from app.application.services import ToolApplicationService
- from app.bootstrap.settings import ToolServiceSettings
- from app.db.models import Base
- from app.db.session import build_session_factory
- from app.domain.repositories import (
- ToolBindingRepository,
- ToolCredentialRepository,
- ToolDefinitionRepository,
- ToolConnectionRepository,
- )
- from app.schemas.tool import McpConnectRequestDto
- from core_shared.secrets import SecretCipher
- from core_shared.task_queue import TOOL_MCP_DISCOVERY_QUEUE, TaskQueuePublisher
- fake_redis = FakeRedis()
- settings = ToolServiceSettings(
- database_url=build_postgres_database_url(tmp_path, "tool-redis-jobs"),
- mcp_discovery_async_enabled=True)
- session_factory = build_session_factory(settings)
- Base.metadata.create_all(bind=session_factory.kw["bind"])
- with session_factory() as db:
- service = ToolApplicationService(
- tool_definition_repository=ToolDefinitionRepository(db),
- tool_connection_repository=ToolConnectionRepository(db),
- tool_binding_repository=ToolBindingRepository(db),
- tool_credential_repository=ToolCredentialRepository(db),
- secret_cipher=SecretCipher(key=settings.credential_encryption_key),
- settings=settings,
- redis_client=fake_redis,
- task_queue_publisher=TaskQueuePublisher(client=fake_redis))
- service._validate_mcp_connection = lambda config: None # type: ignore[method-assign]
- connected = service.connect_mcp_server(
- McpConnectRequestDto(
- config={
- "billing_mcp": {
- "url": "http://127.0.0.1:9090/sse",
- "headers": {"X-MCP-API-KEY": "secret"},
- "mcp_tools": [{"name": "query_invoice"}],
- }
- }))
- status = connected.connection.invokeConfig["mcp_status"]
- assert status["status"] == "queued"
- queued_payload = fake_redis.blpop([TOOL_MCP_DISCOVERY_QUEUE])
- assert queued_payload is not None
- processed = service.execute_mcp_discovery_job(
- connection_id=connected.connection.id,
- job_id=str(status["jobId"]),
- worker_key="test-worker",
- lease_seconds=30,
- redis_client=fake_redis)
- assert processed is not None
- assert processed.invoke_config_json["mcp_status"]["status"] == "completed"
- assert processed.invoke_config_json["mcp_status"]["workerKey"] == "test-worker"
- session_factory.kw["bind"].dispose()
|