test_redis_background_jobs.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. from __future__ import annotations
  2. from pathlib import Path
  3. from tests.conftest import build_postgres_database_url, prepare_known_service_import
  4. class FakeRedis:
  5. def __init__(self) -> None:
  6. self.values: dict[str, bytes | str] = {}
  7. self.queues: dict[str, list[bytes | str]] = {}
  8. def ping(self) -> bool:
  9. return True
  10. def rpush(self, name: str, value: bytes | str) -> int:
  11. queue = self.queues.setdefault(name, [])
  12. queue.append(value)
  13. return len(queue)
  14. def blpop(self, names: list[str], timeout: int = 1) -> tuple[str, bytes | str] | None:
  15. for name in names:
  16. queue = self.queues.get(name) or []
  17. if queue:
  18. return name, queue.pop(0)
  19. return None
  20. def set(
  21. self,
  22. name: str,
  23. value: bytes | str,
  24. nx: bool = False,
  25. ex: int | None = None) -> bool:
  26. if nx and name in self.values:
  27. return False
  28. self.values[name] = value
  29. return True
  30. def get(self, name: str) -> bytes | str | None:
  31. return self.values.get(name)
  32. def incr(self, name: str) -> int:
  33. current = self.get(name)
  34. if isinstance(current, bytes):
  35. current = current.decode("utf-8")
  36. next_value = int(current or "0") + 1
  37. self.values[name] = str(next_value)
  38. return next_value
  39. def eval(self, script: str, numkeys: int, name: str, token: str) -> int:
  40. if self.values.get(name) == token:
  41. self.values.pop(name, None)
  42. return 1
  43. return 0
  44. def delete(self, name: str) -> int:
  45. existed = name in self.values
  46. self.values.pop(name, None)
  47. return 1 if existed else 0
  48. def test_memory_search_uses_redis_cache_and_touch_queue(tmp_path: Path) -> None:
  49. prepare_known_service_import("memory-service")
  50. from app.application.services import MemoryApplicationService
  51. from app.bootstrap.settings import MemoryServiceSettings
  52. from app.db.models import Base
  53. from app.db.session import build_session_factory
  54. from app.domain.repositories import MemoryItemRepository
  55. from app.schemas.memory import MemoryCreateRequest, MemorySearchRequestDto
  56. from core_shared.task_queue import MEMORY_TOUCH_QUEUE, TaskQueuePublisher
  57. fake_redis = FakeRedis()
  58. settings = MemoryServiceSettings(
  59. database_url=build_postgres_database_url(tmp_path, "memory-redis-jobs"),
  60. search_cache_ttl_seconds=60,
  61. async_touch_enabled=True)
  62. session_factory = build_session_factory(settings)
  63. Base.metadata.create_all(bind=session_factory.kw["bind"])
  64. with session_factory() as db:
  65. service = MemoryApplicationService(
  66. memory_repository=MemoryItemRepository(db),
  67. settings=settings,
  68. redis_client=fake_redis,
  69. task_queue_publisher=TaskQueuePublisher(client=fake_redis))
  70. memory = service.create_memory(
  71. MemoryCreateRequest(
  72. scope_type="user",
  73. scope_id="u1",
  74. memory_type="preference",
  75. content_text="用户喜欢 Redis 后台任务。",
  76. importance_score=80))
  77. first_results = service.search_memories_contract(
  78. MemorySearchRequestDto(query="Redis 后台", scopeType="user", scopeId="u1"))
  79. second_results = service.search_memories_contract(
  80. MemorySearchRequestDto(query="Redis 后台", scopeType="user", scopeId="u1"))
  81. assert first_results[0][0].id == memory.id
  82. assert second_results[0][0].id == memory.id
  83. assert any(key.startswith("memory-search:") for key in fake_redis.values)
  84. assert len(fake_redis.queues[MEMORY_TOUCH_QUEUE]) == 2
  85. session_factory.kw["bind"].dispose()
  86. def test_tool_mcp_connect_queues_discovery_and_worker_updates_status(tmp_path: Path) -> None:
  87. prepare_known_service_import("tool-service")
  88. from app.application.services import ToolApplicationService
  89. from app.bootstrap.settings import ToolServiceSettings
  90. from app.db.models import Base
  91. from app.db.session import build_session_factory
  92. from app.domain.repositories import (
  93. ToolBindingRepository,
  94. ToolCredentialRepository,
  95. ToolDefinitionRepository,
  96. ToolConnectionRepository,
  97. )
  98. from app.schemas.tool import McpConnectRequestDto
  99. from core_shared.secrets import SecretCipher
  100. from core_shared.task_queue import TOOL_MCP_DISCOVERY_QUEUE, TaskQueuePublisher
  101. fake_redis = FakeRedis()
  102. settings = ToolServiceSettings(
  103. database_url=build_postgres_database_url(tmp_path, "tool-redis-jobs"),
  104. mcp_discovery_async_enabled=True)
  105. session_factory = build_session_factory(settings)
  106. Base.metadata.create_all(bind=session_factory.kw["bind"])
  107. with session_factory() as db:
  108. service = ToolApplicationService(
  109. tool_definition_repository=ToolDefinitionRepository(db),
  110. tool_connection_repository=ToolConnectionRepository(db),
  111. tool_binding_repository=ToolBindingRepository(db),
  112. tool_credential_repository=ToolCredentialRepository(db),
  113. secret_cipher=SecretCipher(key=settings.credential_encryption_key),
  114. settings=settings,
  115. redis_client=fake_redis,
  116. task_queue_publisher=TaskQueuePublisher(client=fake_redis))
  117. service._validate_mcp_connection = lambda config: None # type: ignore[method-assign]
  118. connected = service.connect_mcp_server(
  119. McpConnectRequestDto(
  120. config={
  121. "billing_mcp": {
  122. "url": "http://127.0.0.1:9090/sse",
  123. "headers": {"X-MCP-API-KEY": "secret"},
  124. "mcp_tools": [{"name": "query_invoice"}],
  125. }
  126. }))
  127. status = connected.connection.invokeConfig["mcp_status"]
  128. assert status["status"] == "queued"
  129. queued_payload = fake_redis.blpop([TOOL_MCP_DISCOVERY_QUEUE])
  130. assert queued_payload is not None
  131. processed = service.execute_mcp_discovery_job(
  132. connection_id=connected.connection.id,
  133. job_id=str(status["jobId"]),
  134. worker_key="test-worker",
  135. lease_seconds=30,
  136. redis_client=fake_redis)
  137. assert processed is not None
  138. assert processed.invoke_config_json["mcp_status"]["status"] == "completed"
  139. assert processed.invoke_config_json["mcp_status"]["workerKey"] == "test-worker"
  140. session_factory.kw["bind"].dispose()