"""Tests for the knowledge-service application service.

Covers search over indexed chunks, parsing of base64-encoded markdown
uploads, and queued (asynchronous) document indexing driven through an
in-memory Redis stand-in.
"""

from __future__ import annotations

import base64
import json
from pathlib import Path

from tests.conftest import build_postgres_database_url, prepare_known_service_import


class FakeRedis:
    """Minimal in-memory stand-in for the Redis client calls these tests use.

    Key/value entries live in ``values`` and list-backed queues in
    ``queues``. Nothing blocks and nothing expires.
    """

    def __init__(self) -> None:
        self.values: dict[str, bytes | str] = {}
        self.queues: dict[str, list[bytes | str]] = {}

    def ping(self) -> bool:
        """Always report the fake as reachable."""
        return True

    def rpush(self, name: str, value: bytes | str) -> int:
        """Append ``value`` to queue ``name`` and return the new queue length."""
        queue = self.queues.setdefault(name, [])
        queue.append(value)
        return len(queue)

    def blpop(self, names: list[str], timeout: int = 1) -> tuple[str, bytes | str] | None:
        """Pop the oldest item from the first non-empty queue in ``names``.

        ``timeout`` is accepted but ignored: when every queue is empty the
        call returns ``None`` immediately instead of waiting.
        """
        for name in names:
            queue = self.queues.get(name)
            if queue:
                return name, queue.pop(0)
        return None

    def set(
        self,
        name: str,
        value: bytes | str,
        nx: bool = False,
        ex: int | None = None,
    ) -> bool:
        """Store ``value`` under ``name``.

        With ``nx=True`` an existing key is left untouched and ``False`` is
        returned. ``ex`` is accepted but ignored (no expiry is simulated).
        """
        if nx and name in self.values:
            return False
        self.values[name] = value
        return True

    def get(self, name: str) -> bytes | str | None:
        """Return the value stored under ``name``, or ``None`` if absent."""
        return self.values.get(name)

    def eval(self, script: str, numkeys: int, name: str, token: str) -> int:
        """Delete ``name`` only if its stored value equals ``token``.

        ``script`` and ``numkeys`` are ignored; whatever script the caller
        passes, the fake performs this compare-and-delete.
        NOTE(review): presumably this mirrors a lock-release script in the
        code under test -- confirm against the caller.

        Returns 1 when the key was removed, 0 otherwise.
        """
        if self.values.get(name) == token:
            self.values.pop(name, None)
            return 1
        return 0

    def delete(self, name: str) -> int:
        """Remove ``name`` and return 1 if it existed, else 0."""
        if name not in self.values:
            return 0
        del self.values[name]
        return 1


def test_knowledge_search_uses_pgvector_when_available(tmp_path: Path) -> None:
    """Index one document, search it, and check the pgvector-hybrid metadata."""
    prepare_known_service_import("knowledge-service")
    from app.application.services import KnowledgeApplicationService
    from app.bootstrap.settings import KnowledgeServiceSettings
    from app.db.models import Base
    from app.db.session import build_session_factory
    from app.domain.repositories import (
        KnowledgeBaseRepository,
        KnowledgeChunkRepository,
        KnowledgeDocumentRepository,
    )
    from app.schemas.knowledge import (
        KnowledgeBaseCreateRequest,
        KnowledgeDocumentCreateRequest,
        KnowledgeSearchRequest,
    )

    settings = KnowledgeServiceSettings(
        database_url=build_postgres_database_url(tmp_path, "knowledge-search-fallback"),
        embedding_provider="local",
        object_storage_backend="memory",
    )
    session_factory = build_session_factory(settings)
    engine = session_factory.kw["bind"]
    # Dispose the engine even when an assertion fails so connections are
    # not leaked into later tests.
    try:
        Base.metadata.create_all(bind=engine)
        with session_factory() as db:
            service = KnowledgeApplicationService(
                settings=settings,
                base_repository=KnowledgeBaseRepository(db),
                document_repository=KnowledgeDocumentRepository(db),
                chunk_repository=KnowledgeChunkRepository(db),
            )
            base = service.create_base(KnowledgeBaseCreateRequest(code="kb", name="KB"))
            _, chunks = service.create_document(
                KnowledgeDocumentCreateRequest(
                    knowledge_base_id=base.id,
                    title="Refund Policy",
                    content_text="Refunds are available within seven days for eligible orders.",
                    chunk_size=40,
                    chunk_overlap=5,
                )
            )
            assert chunks[0].embedding_vector is not None

            results = service.search(
                KnowledgeSearchRequest(
                    knowledge_base_id=base.id,
                    query="refund seven days",
                    top_k=3,
                )
            )
            assert results
            # The fourth element of each result row is its metadata mapping.
            top_metadata = results[0][3]
            assert top_metadata["retrieval_mode"] == "pgvector-hybrid"
            assert top_metadata["rerank_enabled"] is True
            assert "citation" in top_metadata
            assert top_metadata["weights"]["rerank"] == settings.retrieval_rerank_weight
    finally:
        engine.dispose()


def test_create_document_parses_base64_markdown_before_indexing(tmp_path: Path) -> None:
    """Upload base64 markdown and check it is parsed and stored before chunking."""
    prepare_known_service_import("knowledge-service")
    from app.application.services import KnowledgeApplicationService
    from app.bootstrap.settings import KnowledgeServiceSettings
    from app.db.models import Base
    from app.db.session import build_session_factory
    from app.domain.repositories import (
        KnowledgeBaseRepository,
        KnowledgeChunkRepository,
        KnowledgeDocumentRepository,
    )
    from app.schemas.knowledge import KnowledgeBaseCreateRequest, KnowledgeDocumentCreateRequest

    settings = KnowledgeServiceSettings(
        database_url=build_postgres_database_url(tmp_path, "knowledge-markdown-parser"),
        embedding_provider="local",
        object_storage_backend="memory",
    )
    session_factory = build_session_factory(settings)
    engine = session_factory.kw["bind"]
    # Dispose the engine even when an assertion fails so connections are
    # not leaked into later tests.
    try:
        Base.metadata.create_all(bind=engine)
        with session_factory() as db:
            service = KnowledgeApplicationService(
                settings=settings,
                base_repository=KnowledgeBaseRepository(db),
                document_repository=KnowledgeDocumentRepository(db),
                chunk_repository=KnowledgeChunkRepository(db),
            )
            base = service.create_base(KnowledgeBaseCreateRequest(code="kb", name="KB"))
            encoded = base64.b64encode(
                b"# Refund Policy\nRefunds are available within seven days."
            ).decode("ascii")
            document, chunks = service.create_document(
                KnowledgeDocumentCreateRequest(
                    knowledge_base_id=base.id,
                    title="Refund Policy",
                    source_type="markdown",
                    content_base64=encoded,
                    chunk_size=80,
                    chunk_overlap=0,
                )
            )
            assert document.source_type == "markdown"
            assert document.content_text == ""

            # Bind once so the None check covers both lookups below.
            metadata = document.metadata_json
            assert metadata is not None
            assert metadata["object_storage"]["backend"] == "memory"
            assert metadata["parser_metadata"]["parser"] == "knowledge-document-parser-v1"

            assert chunks
            assert chunks[0].content_text.startswith("Refund Policy")
    finally:
        engine.dispose()


def test_create_document_can_queue_indexing_and_worker_processes_job(tmp_path: Path) -> None:
    """Queue a document for indexing, then run the queued job as a worker would."""
    prepare_known_service_import("knowledge-service")
    from app.application.services import KnowledgeApplicationService
    from app.bootstrap.settings import KnowledgeServiceSettings
    from app.db.models import Base
    from app.db.session import build_session_factory
    from app.domain.repositories import (
        KnowledgeBaseRepository,
        KnowledgeChunkRepository,
        KnowledgeDocumentRepository,
    )
    from app.schemas.knowledge import (
        KnowledgeBaseCreateRequest,
        KnowledgeDocumentCreateRequestDto,
    )
    from core_shared.task_queue import KNOWLEDGE_DOCUMENT_QUEUE, TaskQueuePublisher

    fake_redis = FakeRedis()
    settings = KnowledgeServiceSettings(
        database_url=build_postgres_database_url(tmp_path, "knowledge-async-indexing"),
        embedding_provider="local",
        object_storage_backend="memory",
        async_indexing_enabled=True,
    )
    session_factory = build_session_factory(settings)
    engine = session_factory.kw["bind"]
    # Dispose the engine even when an assertion fails so connections are
    # not leaked into later tests.
    try:
        Base.metadata.create_all(bind=engine)
        with session_factory() as db:
            service = KnowledgeApplicationService(
                settings=settings,
                base_repository=KnowledgeBaseRepository(db),
                document_repository=KnowledgeDocumentRepository(db),
                chunk_repository=KnowledgeChunkRepository(db),
                redis_client=fake_redis,
                task_queue_publisher=TaskQueuePublisher(client=fake_redis),
            )
            base = service.create_base(KnowledgeBaseCreateRequest(code="kb", name="KB"))
            result = service.create_document_from_contract_result(
                KnowledgeDocumentCreateRequestDto(
                    knowledgeBaseId=base.id,
                    title="Async FAQ",
                    sourceType="markdown",
                    contentText="# Async FAQ\nQueued indexing should not block HTTP.",
                    chunkSize=80,
                    chunkOverlap=0,
                    asyncMode=True,
                )
            )
            # The request is accepted but nothing has been indexed yet.
            assert result.queued is True
            assert result.chunks == []
            assert result.document.status == "queued"
            assert result.job is not None
            assert result.job.status == "queued"

            # Pull the published job off the fake queue, as a worker would.
            queued_payload = fake_redis.blpop([KNOWLEDGE_DOCUMENT_QUEUE])
            assert queued_payload is not None
            _, raw_payload = queued_payload
            if isinstance(raw_payload, bytes):
                raw_payload = raw_payload.decode("utf-8")
            payload = json.loads(raw_payload)

            processed = service.execute_document_index_job(
                document_id=str(payload["document_id"]),
                action=str(payload["action"]),
                job_id=str(payload["job_id"]),
                worker_key="test-worker",
                lease_seconds=30,
                redis_client=fake_redis,
            )
            assert processed is not None
            indexed_document, chunks = processed
            assert indexed_document.status == "indexed"
            assert chunks

            job = service.detail_index_job(document_id=indexed_document.id)
            assert job is not None
            assert job.status == "completed"
            assert job.progress == 100
            assert job.workerKey == "test-worker"
    finally:
        engine.dispose()