| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- from __future__ import annotations
- import base64
- import json
- from pathlib import Path
- from tests.conftest import build_postgres_database_url, prepare_known_service_import
class FakeRedis:
    """In-memory stand-in for the small part of the Redis client these tests use.

    Plain keys are kept in ``values`` and list queues in ``queues``. Nothing
    ever expires and no call blocks.
    """

    def __init__(self) -> None:
        """Start with no stored keys and no queues."""
        self.values: dict[str, bytes | str] = {}
        self.queues: dict[str, list[bytes | str]] = {}

    def ping(self) -> bool:
        """Always report the fake server as reachable."""
        return True

    def rpush(self, name: str, value: bytes | str) -> int:
        """Append ``value`` to queue ``name`` and return the new queue length."""
        queue = self.queues.setdefault(name, [])
        queue.append(value)
        return len(queue)

    def blpop(
        self,
        names: list[str] | str,
        timeout: int = 1,
    ) -> tuple[str, bytes | str] | None:
        """Pop the head of the first non-empty queue among ``names``.

        ``names`` may be a list of queue names or a single name. Returns a
        ``(queue_name, value)`` pair, or ``None`` when every queue is empty.
        ``timeout`` is accepted for signature compatibility only; the call
        never waits.
        """
        # A bare string would otherwise be iterated character by character
        # and silently match nothing.
        if isinstance(names, str):
            names = [names]
        for name in names:
            queue = self.queues.get(name)
            if queue:
                return name, queue.pop(0)
        return None

    def set(
        self,
        name: str,
        value: bytes | str,
        nx: bool = False,
        ex: int | None = None,
    ) -> bool:
        """Store ``value`` under ``name``.

        With ``nx=True`` an existing key is left untouched and ``False`` is
        returned. ``ex`` is accepted but ignored: stored values never expire.
        """
        if nx and name in self.values:
            return False
        self.values[name] = value
        return True

    def get(self, name: str) -> bytes | str | None:
        """Return the value stored under ``name``, or ``None`` if absent."""
        return self.values.get(name)

    def eval(self, script: str, numkeys: int, name: str, token: str) -> int:
        """Delete ``name`` only if its stored value equals ``token``.

        ``script`` and ``numkeys`` are ignored; every script is treated as
        this compare-and-delete. Returns 1 when the key was removed, else 0.
        """
        if self.values.get(name) == token:
            self.values.pop(name, None)
            return 1
        return 0

    def delete(self, name: str) -> int:
        """Remove ``name`` and return 1 if it existed, otherwise 0."""
        existed = name in self.values
        self.values.pop(name, None)
        return 1 if existed else 0
def test_knowledge_search_uses_pgvector_when_available(tmp_path: Path) -> None:
    """Index one document and check the metadata attached to search hits.

    The first hit must report ``retrieval_mode == "pgvector-hybrid"``, have
    reranking enabled, carry a citation, and expose the configured rerank
    weight.
    """
    # Called before the ``app.*`` imports below; presumably it makes the
    # knowledge-service package importable -- confirm in tests/conftest.
    prepare_known_service_import("knowledge-service")
    from app.application.services import KnowledgeApplicationService
    from app.bootstrap.settings import KnowledgeServiceSettings
    from app.db.models import Base
    from app.db.session import build_session_factory
    from app.domain.repositories import (
        KnowledgeBaseRepository,
        KnowledgeChunkRepository,
        KnowledgeDocumentRepository,
    )
    from app.schemas.knowledge import (
        KnowledgeBaseCreateRequest,
        KnowledgeDocumentCreateRequest,
        KnowledgeSearchRequest,
    )

    settings = KnowledgeServiceSettings(
        database_url=build_postgres_database_url(tmp_path, "knowledge-search-fallback"),
        embedding_provider="local",
        object_storage_backend="memory",
    )
    session_factory = build_session_factory(settings)
    engine = session_factory.kw["bind"]
    try:
        Base.metadata.create_all(bind=engine)
        with session_factory() as db:
            service = KnowledgeApplicationService(
                settings=settings,
                base_repository=KnowledgeBaseRepository(db),
                document_repository=KnowledgeDocumentRepository(db),
                chunk_repository=KnowledgeChunkRepository(db),
            )
            base = service.create_base(
                KnowledgeBaseCreateRequest(code="kb", name="KB")
            )
            _, chunks = service.create_document(
                KnowledgeDocumentCreateRequest(
                    knowledge_base_id=base.id,
                    title="Refund Policy",
                    content_text="Refunds are available within seven days for eligible orders.",
                    chunk_size=40,
                    chunk_overlap=5,
                )
            )
            assert chunks[0].embedding_vector is not None

            results = service.search(
                KnowledgeSearchRequest(
                    knowledge_base_id=base.id,
                    query="refund seven days",
                    top_k=3,
                )
            )
            assert results
            # Index 3 of each result holds the retrieval metadata mapping.
            top_metadata = results[0][3]
            assert top_metadata["retrieval_mode"] == "pgvector-hybrid"
            assert top_metadata["rerank_enabled"] is True
            assert "citation" in top_metadata
            assert top_metadata["weights"]["rerank"] == settings.retrieval_rerank_weight
    finally:
        # Release pooled connections even when an assertion above fails.
        engine.dispose()
def test_create_document_parses_base64_markdown_before_indexing(tmp_path: Path) -> None:
    """Create a document from base64-encoded markdown and check how it is stored.

    The stored document keeps ``source_type == "markdown"``, has an empty
    ``content_text``, records object-storage and parser metadata, and its
    first chunk starts with the heading text without the ``#`` marker.
    """
    # Called before the ``app.*`` imports below; presumably it makes the
    # knowledge-service package importable -- confirm in tests/conftest.
    prepare_known_service_import("knowledge-service")
    from app.application.services import KnowledgeApplicationService
    from app.bootstrap.settings import KnowledgeServiceSettings
    from app.db.models import Base
    from app.db.session import build_session_factory
    from app.domain.repositories import (
        KnowledgeBaseRepository,
        KnowledgeChunkRepository,
        KnowledgeDocumentRepository,
    )
    from app.schemas.knowledge import KnowledgeBaseCreateRequest, KnowledgeDocumentCreateRequest

    settings = KnowledgeServiceSettings(
        database_url=build_postgres_database_url(tmp_path, "knowledge-markdown-parser"),
        embedding_provider="local",
        object_storage_backend="memory",
    )
    session_factory = build_session_factory(settings)
    engine = session_factory.kw["bind"]
    try:
        Base.metadata.create_all(bind=engine)
        with session_factory() as db:
            service = KnowledgeApplicationService(
                settings=settings,
                base_repository=KnowledgeBaseRepository(db),
                document_repository=KnowledgeDocumentRepository(db),
                chunk_repository=KnowledgeChunkRepository(db),
            )
            base = service.create_base(
                KnowledgeBaseCreateRequest(code="kb", name="KB")
            )
            encoded = base64.b64encode(
                b"# Refund Policy\nRefunds are available within seven days."
            ).decode("ascii")
            document, chunks = service.create_document(
                KnowledgeDocumentCreateRequest(
                    knowledge_base_id=base.id,
                    title="Refund Policy",
                    source_type="markdown",
                    content_base64=encoded,
                    chunk_size=80,
                    chunk_overlap=0,
                )
            )
            assert document.source_type == "markdown"
            # NOTE(review): the empty text presumably means the payload lives
            # in object storage rather than on the row -- confirm in the service.
            assert document.content_text == ""
            assert document.metadata_json is not None
            assert document.metadata_json["object_storage"]["backend"] == "memory"
            assert (
                document.metadata_json["parser_metadata"]["parser"]
                == "knowledge-document-parser-v1"
            )
            assert chunks
            assert chunks[0].content_text.startswith("Refund Policy")
    finally:
        # Release pooled connections even when an assertion above fails.
        engine.dispose()
def test_create_document_can_queue_indexing_and_worker_processes_job(tmp_path: Path) -> None:
    """Queue a document for asynchronous indexing, then run the job by hand.

    First checks that creating the document in async mode returns a queued
    result with no chunks. Then pops the published payload from the fake
    Redis queue, executes the index job with it, and checks that the
    document ends up indexed and the job completed.
    """
    # Called before the ``app.*`` imports below; presumably it makes the
    # knowledge-service package importable -- confirm in tests/conftest.
    prepare_known_service_import("knowledge-service")
    from app.application.services import KnowledgeApplicationService
    from app.bootstrap.settings import KnowledgeServiceSettings
    from app.db.models import Base
    from app.db.session import build_session_factory
    from app.domain.repositories import (
        KnowledgeBaseRepository,
        KnowledgeChunkRepository,
        KnowledgeDocumentRepository,
    )
    from app.schemas.knowledge import (
        KnowledgeBaseCreateRequest,
        KnowledgeDocumentCreateRequestDto,
    )
    from core_shared.task_queue import KNOWLEDGE_DOCUMENT_QUEUE, TaskQueuePublisher

    fake_redis = FakeRedis()
    settings = KnowledgeServiceSettings(
        database_url=build_postgres_database_url(tmp_path, "knowledge-async-indexing"),
        embedding_provider="local",
        object_storage_backend="memory",
        async_indexing_enabled=True,
    )
    session_factory = build_session_factory(settings)
    engine = session_factory.kw["bind"]
    try:
        Base.metadata.create_all(bind=engine)
        with session_factory() as db:
            service = KnowledgeApplicationService(
                settings=settings,
                base_repository=KnowledgeBaseRepository(db),
                document_repository=KnowledgeDocumentRepository(db),
                chunk_repository=KnowledgeChunkRepository(db),
                redis_client=fake_redis,
                task_queue_publisher=TaskQueuePublisher(client=fake_redis),
            )
            base = service.create_base(KnowledgeBaseCreateRequest(code="kb", name="KB"))
            result = service.create_document_from_contract_result(
                KnowledgeDocumentCreateRequestDto(
                    knowledgeBaseId=base.id,
                    title="Async FAQ",
                    sourceType="markdown",
                    contentText="# Async FAQ\nQueued indexing should not block HTTP.",
                    chunkSize=80,
                    chunkOverlap=0,
                    asyncMode=True,
                )
            )
            assert result.queued is True
            assert result.chunks == []
            assert result.document.status == "queued"
            assert result.job is not None
            assert result.job.status == "queued"

            # Play the worker's role: take the published message off the queue.
            queued_payload = fake_redis.blpop([KNOWLEDGE_DOCUMENT_QUEUE])
            assert queued_payload is not None
            _, raw_payload = queued_payload
            # The queue may hold either bytes or str; normalise to text first.
            payload_text = (
                raw_payload.decode("utf-8") if isinstance(raw_payload, bytes) else raw_payload
            )
            payload = json.loads(payload_text)

            processed = service.execute_document_index_job(
                document_id=str(payload["document_id"]),
                action=str(payload["action"]),
                job_id=str(payload["job_id"]),
                worker_key="test-worker",
                lease_seconds=30,
                redis_client=fake_redis,
            )
            assert processed is not None
            indexed_document, chunks = processed
            assert indexed_document.status == "indexed"
            assert chunks

            job = service.detail_index_job(document_id=indexed_document.id)
            assert job is not None
            assert job.status == "completed"
            assert job.progress == 100
            assert job.workerKey == "test-worker"
    finally:
        # Release pooled connections even when an assertion above fails.
        engine.dispose()
|