"""Tests for Redis-backed search caching and job queues, run against an in-memory fake."""

from __future__ import annotations

from pathlib import Path

from tests.conftest import build_postgres_database_url, prepare_known_service_import


class FakeRedis:
    """In-memory stand-in for the subset of Redis client methods used by these tests.

    Key/value operations (``set``, ``get``, ``incr``, ``eval``, ``delete``)
    work on ``values``; list operations (``rpush``, ``blpop``) work on
    ``queues``. Nothing ever expires and nothing ever blocks.
    """

    def __init__(self) -> None:
        # Backing store for plain keys.
        self.values: dict[str, bytes | str] = {}
        # Backing store for named FIFO lists.
        self.queues: dict[str, list[bytes | str]] = {}

    def ping(self) -> bool:
        """Always report the fake connection as healthy."""
        return True

    def rpush(self, name: str, value: bytes | str) -> int:
        """Append ``value`` to the list ``name`` and return the new list length."""
        queue = self.queues.setdefault(name, [])
        queue.append(value)
        return len(queue)

    def blpop(self, names: list[str], timeout: int = 1) -> tuple[str, bytes | str] | None:
        """Pop the oldest item from the first non-empty list in ``names``.

        Returns a ``(name, value)`` pair, or ``None`` when every list is empty.
        ``timeout`` is accepted for signature compatibility but ignored: the
        fake returns immediately instead of blocking.
        """
        for name in names:
            queue = self.queues.get(name) or []
            if queue:
                return name, queue.pop(0)
        return None

    def set(
        self,
        name: str,
        value: bytes | str,
        nx: bool = False,
        ex: int | None = None,
    ) -> bool:
        """Store ``value`` under ``name``.

        With ``nx=True`` the write is skipped and ``False`` is returned when
        the key already exists. ``ex`` is accepted but ignored, so stored
        keys never expire.
        """
        if nx and name in self.values:
            return False
        self.values[name] = value
        return True

    def get(self, name: str) -> bytes | str | None:
        """Return the value stored under ``name``, or ``None`` if absent."""
        return self.values.get(name)

    def incr(self, name: str) -> int:
        """Increment the integer stored under ``name`` and return the new value.

        A missing (or empty) value counts as ``0``. The result is stored back
        as a ``str``.
        """
        current = self.get(name)
        if isinstance(current, bytes):
            current = current.decode("utf-8")
        next_value = int(current or "0") + 1
        self.values[name] = str(next_value)
        return next_value

    def eval(self, script: str, numkeys: int, name: str, token: str) -> int:
        """Delete ``name`` only if its current value equals ``token``.

        ``script`` and ``numkeys`` are not interpreted; the fake hard-codes a
        compare-and-delete. NOTE(review): presumably this mirrors the Lua
        script callers use to release a lock - confirm against the caller.

        Returns ``1`` when the key was deleted, ``0`` otherwise.
        """
        if self.values.get(name) == token:
            self.values.pop(name, None)
            return 1
        return 0

    def delete(self, name: str) -> int:
        """Remove ``name`` and return ``1`` if it existed, else ``0``."""
        existed = name in self.values
        self.values.pop(name, None)
        return 1 if existed else 0


def test_memory_search_uses_redis_cache_and_touch_queue(tmp_path: Path) -> None:
    """Search a stored memory twice and check the Redis side effects.

    Asserts that both searches return the created memory, that a key
    prefixed ``memory-search:`` was written to the fake Redis, and that two
    entries were pushed onto ``MEMORY_TOUCH_QUEUE``.
    """
    # The service-local ``app.*`` imports come after this call.
    # NOTE(review): presumably it makes ``app`` resolve to memory-service - confirm.
    prepare_known_service_import("memory-service")
    from app.application.services import MemoryApplicationService
    from app.bootstrap.settings import MemoryServiceSettings
    from app.db.models import Base
    from app.db.session import build_session_factory
    from app.domain.repositories import MemoryItemRepository
    from app.schemas.memory import MemoryCreateRequest, MemorySearchRequestDto
    from core_shared.task_queue import MEMORY_TOUCH_QUEUE, TaskQueuePublisher

    fake_redis = FakeRedis()
    settings = MemoryServiceSettings(
        database_url=build_postgres_database_url(tmp_path, "memory-redis-jobs"),
        search_cache_ttl_seconds=60,
        async_touch_enabled=True,
    )
    session_factory = build_session_factory(settings)
    engine = session_factory.kw["bind"]
    # Dispose the engine even when setup or an assertion fails, so a failing
    # test does not leave database connections open.
    try:
        Base.metadata.create_all(bind=engine)
        with session_factory() as db:
            service = MemoryApplicationService(
                memory_repository=MemoryItemRepository(db),
                settings=settings,
                redis_client=fake_redis,
                task_queue_publisher=TaskQueuePublisher(client=fake_redis),
            )
            memory = service.create_memory(
                MemoryCreateRequest(
                    scope_type="user",
                    scope_id="u1",
                    memory_type="preference",
                    content_text="用户喜欢 Redis 后台任务。",
                    importance_score=80,
                )
            )

            first_results = service.search_memories_contract(
                MemorySearchRequestDto(query="Redis 后台", scopeType="user", scopeId="u1")
            )
            second_results = service.search_memories_contract(
                MemorySearchRequestDto(query="Redis 后台", scopeType="user", scopeId="u1")
            )

            assert first_results[0][0].id == memory.id
            assert second_results[0][0].id == memory.id
            assert any(key.startswith("memory-search:") for key in fake_redis.values)
            assert len(fake_redis.queues[MEMORY_TOUCH_QUEUE]) == 2
    finally:
        engine.dispose()


def test_tool_mcp_connect_queues_discovery_and_worker_updates_status(tmp_path: Path) -> None:
    """Connect an MCP server with async discovery, then run the queued job.

    Asserts that the connection's ``mcp_status`` starts as ``"queued"``, that
    a payload is available on ``TOOL_MCP_DISCOVERY_QUEUE``, and that executing
    the discovery job marks the status ``"completed"`` with the worker key
    recorded.
    """
    # The service-local ``app.*`` imports come after this call.
    # NOTE(review): presumably it makes ``app`` resolve to tool-service - confirm.
    prepare_known_service_import("tool-service")
    from app.application.services import ToolApplicationService
    from app.bootstrap.settings import ToolServiceSettings
    from app.db.models import Base
    from app.db.session import build_session_factory
    from app.domain.repositories import (
        ToolBindingRepository,
        ToolCredentialRepository,
        ToolDefinitionRepository,
        ToolConnectionRepository,
    )
    from app.schemas.tool import McpConnectRequestDto
    from core_shared.secrets import SecretCipher
    from core_shared.task_queue import TOOL_MCP_DISCOVERY_QUEUE, TaskQueuePublisher

    fake_redis = FakeRedis()
    settings = ToolServiceSettings(
        database_url=build_postgres_database_url(tmp_path, "tool-redis-jobs"),
        mcp_discovery_async_enabled=True,
    )
    session_factory = build_session_factory(settings)
    engine = session_factory.kw["bind"]
    # Dispose the engine even when setup or an assertion fails, so a failing
    # test does not leave database connections open.
    try:
        Base.metadata.create_all(bind=engine)
        with session_factory() as db:
            service = ToolApplicationService(
                tool_definition_repository=ToolDefinitionRepository(db),
                tool_connection_repository=ToolConnectionRepository(db),
                tool_binding_repository=ToolBindingRepository(db),
                tool_credential_repository=ToolCredentialRepository(db),
                secret_cipher=SecretCipher(key=settings.credential_encryption_key),
                settings=settings,
                redis_client=fake_redis,
                task_queue_publisher=TaskQueuePublisher(client=fake_redis),
            )
            # Replace connection validation with a no-op for this test.
            service._validate_mcp_connection = lambda config: None  # type: ignore[method-assign]

            connected = service.connect_mcp_server(
                McpConnectRequestDto(
                    config={
                        "billing_mcp": {
                            "url": "http://127.0.0.1:9090/sse",
                            "headers": {"X-MCP-API-KEY": "secret"},
                            "mcp_tools": [{"name": "query_invoice"}],
                        }
                    }
                )
            )

            status = connected.connection.invokeConfig["mcp_status"]
            assert status["status"] == "queued"
            queued_payload = fake_redis.blpop([TOOL_MCP_DISCOVERY_QUEUE])
            assert queued_payload is not None

            processed = service.execute_mcp_discovery_job(
                connection_id=connected.connection.id,
                job_id=str(status["jobId"]),
                worker_key="test-worker",
                lease_seconds=30,
                redis_client=fake_redis,
            )

            assert processed is not None
            assert processed.invoke_config_json["mcp_status"]["status"] == "completed"
            assert processed.invoke_config_json["mcp_status"]["workerKey"] == "test-worker"
    finally:
        engine.dispose()