test_knowledge_pgvector_fallback.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. from __future__ import annotations
  2. import base64
  3. import json
  4. from pathlib import Path
  5. from tests.conftest import build_postgres_database_url, prepare_known_service_import
  6. class FakeRedis:
  7. def __init__(self) -> None:
  8. self.values: dict[str, bytes | str] = {}
  9. self.queues: dict[str, list[bytes | str]] = {}
  10. def ping(self) -> bool:
  11. return True
  12. def rpush(self, name: str, value: bytes | str) -> int:
  13. queue = self.queues.setdefault(name, [])
  14. queue.append(value)
  15. return len(queue)
  16. def blpop(self, names: list[str], timeout: int = 1) -> tuple[str, bytes | str] | None:
  17. for name in names:
  18. queue = self.queues.get(name) or []
  19. if queue:
  20. return name, queue.pop(0)
  21. return None
  22. def set(
  23. self,
  24. name: str,
  25. value: bytes | str,
  26. nx: bool = False,
  27. ex: int | None = None) -> bool:
  28. if nx and name in self.values:
  29. return False
  30. self.values[name] = value
  31. return True
  32. def get(self, name: str) -> bytes | str | None:
  33. return self.values.get(name)
  34. def eval(self, script: str, numkeys: int, name: str, token: str) -> int:
  35. if self.values.get(name) == token:
  36. self.values.pop(name, None)
  37. return 1
  38. return 0
  39. def delete(self, name: str) -> int:
  40. existed = name in self.values
  41. self.values.pop(name, None)
  42. return 1 if existed else 0
  43. def test_knowledge_search_uses_pgvector_when_available(tmp_path: Path) -> None:
  44. prepare_known_service_import("knowledge-service")
  45. from app.application.services import KnowledgeApplicationService
  46. from app.bootstrap.settings import KnowledgeServiceSettings
  47. from app.db.models import Base
  48. from app.db.session import build_session_factory
  49. from app.domain.repositories import (
  50. KnowledgeBaseRepository,
  51. KnowledgeChunkRepository,
  52. KnowledgeDocumentRepository,
  53. )
  54. from app.schemas.knowledge import (
  55. KnowledgeBaseCreateRequest,
  56. KnowledgeDocumentCreateRequest,
  57. KnowledgeSearchRequest,
  58. )
  59. settings = KnowledgeServiceSettings(
  60. database_url=build_postgres_database_url(tmp_path, "knowledge-search-fallback"),
  61. embedding_provider="local",
  62. object_storage_backend="memory")
  63. session_factory = build_session_factory(settings)
  64. Base.metadata.create_all(bind=session_factory.kw["bind"])
  65. with session_factory() as db:
  66. service = KnowledgeApplicationService(
  67. settings=settings,
  68. base_repository=KnowledgeBaseRepository(db),
  69. document_repository=KnowledgeDocumentRepository(db),
  70. chunk_repository=KnowledgeChunkRepository(db))
  71. base = service.create_base(
  72. KnowledgeBaseCreateRequest(code="kb", name="KB")
  73. )
  74. _, chunks = service.create_document(
  75. KnowledgeDocumentCreateRequest(
  76. knowledge_base_id=base.id,
  77. title="Refund Policy",
  78. content_text="Refunds are available within seven days for eligible orders.",
  79. chunk_size=40,
  80. chunk_overlap=5)
  81. )
  82. assert chunks[0].embedding_vector is not None
  83. results = service.search(
  84. KnowledgeSearchRequest(
  85. knowledge_base_id=base.id,
  86. query="refund seven days",
  87. top_k=3)
  88. )
  89. assert results
  90. assert results[0][3]["retrieval_mode"] == "pgvector-hybrid"
  91. assert results[0][3]["rerank_enabled"] is True
  92. assert "citation" in results[0][3]
  93. assert results[0][3]["weights"]["rerank"] == settings.retrieval_rerank_weight
  94. session_factory.kw["bind"].dispose()
  95. def test_create_document_parses_base64_markdown_before_indexing(tmp_path: Path) -> None:
  96. prepare_known_service_import("knowledge-service")
  97. from app.application.services import KnowledgeApplicationService
  98. from app.bootstrap.settings import KnowledgeServiceSettings
  99. from app.db.models import Base
  100. from app.db.session import build_session_factory
  101. from app.domain.repositories import (
  102. KnowledgeBaseRepository,
  103. KnowledgeChunkRepository,
  104. KnowledgeDocumentRepository,
  105. )
  106. from app.schemas.knowledge import KnowledgeBaseCreateRequest, KnowledgeDocumentCreateRequest
  107. settings = KnowledgeServiceSettings(
  108. database_url=build_postgres_database_url(tmp_path, "knowledge-markdown-parser"),
  109. embedding_provider="local",
  110. object_storage_backend="memory")
  111. session_factory = build_session_factory(settings)
  112. Base.metadata.create_all(bind=session_factory.kw["bind"])
  113. with session_factory() as db:
  114. service = KnowledgeApplicationService(
  115. settings=settings,
  116. base_repository=KnowledgeBaseRepository(db),
  117. document_repository=KnowledgeDocumentRepository(db),
  118. chunk_repository=KnowledgeChunkRepository(db))
  119. base = service.create_base(
  120. KnowledgeBaseCreateRequest(code="kb", name="KB")
  121. )
  122. encoded = base64.b64encode(
  123. b"# Refund Policy\nRefunds are available within seven days."
  124. ).decode("ascii")
  125. document, chunks = service.create_document(
  126. KnowledgeDocumentCreateRequest(
  127. knowledge_base_id=base.id,
  128. title="Refund Policy",
  129. source_type="markdown",
  130. content_base64=encoded,
  131. chunk_size=80,
  132. chunk_overlap=0)
  133. )
  134. assert document.source_type == "markdown"
  135. assert document.content_text == ""
  136. assert document.metadata_json is not None
  137. assert document.metadata_json["object_storage"]["backend"] == "memory"
  138. assert document.metadata_json is not None
  139. assert document.metadata_json["parser_metadata"]["parser"] == "knowledge-document-parser-v1"
  140. assert chunks
  141. assert chunks[0].content_text.startswith("Refund Policy")
  142. session_factory.kw["bind"].dispose()
  143. def test_create_document_can_queue_indexing_and_worker_processes_job(tmp_path: Path) -> None:
  144. prepare_known_service_import("knowledge-service")
  145. from app.application.services import KnowledgeApplicationService
  146. from app.bootstrap.settings import KnowledgeServiceSettings
  147. from app.db.models import Base
  148. from app.db.session import build_session_factory
  149. from app.domain.repositories import (
  150. KnowledgeBaseRepository,
  151. KnowledgeChunkRepository,
  152. KnowledgeDocumentRepository,
  153. )
  154. from app.schemas.knowledge import (
  155. KnowledgeBaseCreateRequest,
  156. KnowledgeDocumentCreateRequestDto,
  157. )
  158. from core_shared.task_queue import KNOWLEDGE_DOCUMENT_QUEUE, TaskQueuePublisher
  159. fake_redis = FakeRedis()
  160. settings = KnowledgeServiceSettings(
  161. database_url=build_postgres_database_url(tmp_path, "knowledge-async-indexing"),
  162. embedding_provider="local",
  163. object_storage_backend="memory",
  164. async_indexing_enabled=True)
  165. session_factory = build_session_factory(settings)
  166. Base.metadata.create_all(bind=session_factory.kw["bind"])
  167. with session_factory() as db:
  168. service = KnowledgeApplicationService(
  169. settings=settings,
  170. base_repository=KnowledgeBaseRepository(db),
  171. document_repository=KnowledgeDocumentRepository(db),
  172. chunk_repository=KnowledgeChunkRepository(db),
  173. redis_client=fake_redis,
  174. task_queue_publisher=TaskQueuePublisher(client=fake_redis))
  175. base = service.create_base(KnowledgeBaseCreateRequest(code="kb", name="KB"))
  176. result = service.create_document_from_contract_result(
  177. KnowledgeDocumentCreateRequestDto(
  178. knowledgeBaseId=base.id,
  179. title="Async FAQ",
  180. sourceType="markdown",
  181. contentText="# Async FAQ\nQueued indexing should not block HTTP.",
  182. chunkSize=80,
  183. chunkOverlap=0,
  184. asyncMode=True)
  185. )
  186. assert result.queued is True
  187. assert result.chunks == []
  188. assert result.document.status == "queued"
  189. assert result.job is not None
  190. assert result.job.status == "queued"
  191. queued_payload = fake_redis.blpop([KNOWLEDGE_DOCUMENT_QUEUE])
  192. assert queued_payload is not None
  193. _, raw_payload = queued_payload
  194. payload = json.loads(raw_payload.decode("utf-8") if isinstance(raw_payload, bytes) else raw_payload)
  195. processed = service.execute_document_index_job(
  196. document_id=str(payload["document_id"]),
  197. action=str(payload["action"]),
  198. job_id=str(payload["job_id"]),
  199. worker_key="test-worker",
  200. lease_seconds=30,
  201. redis_client=fake_redis)
  202. assert processed is not None
  203. indexed_document, chunks = processed
  204. assert indexed_document.status == "indexed"
  205. assert chunks
  206. job = service.detail_index_job(document_id=indexed_document.id)
  207. assert job is not None
  208. assert job.status == "completed"
  209. assert job.progress == 100
  210. assert job.workerKey == "test-worker"
  211. session_factory.kw["bind"].dispose()