|
@@ -1,83 +1,54 @@
|
|
|
-from __future__ import annotations
|
|
|
|
|
|
|
+"""Knowledge application service — thin facade delegating to sub-services."""
|
|
|
|
|
|
|
|
-import base64
|
|
|
|
|
-import hashlib
|
|
|
|
|
-from dataclasses import dataclass
|
|
|
|
|
-from datetime import datetime, timedelta
|
|
|
|
|
-from typing import TYPE_CHECKING, cast
|
|
|
|
|
-from uuid import uuid4
|
|
|
|
|
|
|
+from __future__ import annotations
|
|
|
|
|
|
|
|
-from sqlalchemy.orm import Session
|
|
|
|
|
|
|
+from typing import TYPE_CHECKING
|
|
|
|
|
|
|
|
from core_shared import JSONValue, try_build_redis_client
|
|
from core_shared import JSONValue, try_build_redis_client
|
|
|
-from core_shared.task_queue import KNOWLEDGE_DOCUMENT_QUEUE, TaskQueuePublisher
|
|
|
|
|
|
|
+from core_shared.task_queue import TaskQueuePublisher
|
|
|
|
|
|
|
|
-from app.application.document_parsers import (
|
|
|
|
|
- DocumentParseError,
|
|
|
|
|
- ParsedDocument,
|
|
|
|
|
- normalize_source_type,
|
|
|
|
|
- parse_document_content,
|
|
|
|
|
- read_document_content_bytes)
|
|
|
|
|
-from app.application.chunking import chunk_document
|
|
|
|
|
|
|
+from app.application.crud_service import KnowledgeCrudService
|
|
|
from app.application.embeddings import EmbeddingService
|
|
from app.application.embeddings import EmbeddingService
|
|
|
-from app.application.retrieval import (
|
|
|
|
|
- bm25_score,
|
|
|
|
|
- build_chunk_payloads,
|
|
|
|
|
- compute_bm25_stats,
|
|
|
|
|
- cosine_similarity,
|
|
|
|
|
- keyword_score,
|
|
|
|
|
- rerank_score,
|
|
|
|
|
- stable_content_hash)
|
|
|
|
|
|
|
+from app.application.indexing_service import (
|
|
|
|
|
+ KnowledgeDocumentIngestResult,
|
|
|
|
|
+ KnowledgeIndexingError,
|
|
|
|
|
+ KnowledgeIndexingService,
|
|
|
|
|
+)
|
|
|
|
|
+from app.application.search_service import KnowledgeSearchService
|
|
|
|
|
+from app.application.settings_service import KnowledgeSettingsService
|
|
|
from app.bootstrap.settings import KnowledgeServiceSettings
|
|
from app.bootstrap.settings import KnowledgeServiceSettings
|
|
|
from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
|
|
from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
|
|
|
-from app.domain.repositories import (
|
|
|
|
|
- KnowledgeBaseRepository,
|
|
|
|
|
- KnowledgeChunkRepository,
|
|
|
|
|
- KnowledgeDocumentRepository)
|
|
|
|
|
-from app.infrastructure.object_storage import (
|
|
|
|
|
- KnowledgeObjectStorage,
|
|
|
|
|
- ObjectStorageError,
|
|
|
|
|
- ObjectStorageNotFoundError,
|
|
|
|
|
- ObjectStorageStatus,
|
|
|
|
|
- build_document_object_key,
|
|
|
|
|
- build_object_storage)
|
|
|
|
|
from app.schemas.knowledge import (
|
|
from app.schemas.knowledge import (
|
|
|
KnowledgeBaseCreateRequest,
|
|
KnowledgeBaseCreateRequest,
|
|
|
KnowledgeBaseCreateRequestDto,
|
|
KnowledgeBaseCreateRequestDto,
|
|
|
|
|
+ KnowledgeBaseReindexRequestDto,
|
|
|
KnowledgeBaseStatusUpdateRequest,
|
|
KnowledgeBaseStatusUpdateRequest,
|
|
|
KnowledgeBaseUpdateRequestDto,
|
|
KnowledgeBaseUpdateRequestDto,
|
|
|
- KnowledgeBaseReindexRequestDto,
|
|
|
|
|
KnowledgeDocumentCreateRequest,
|
|
KnowledgeDocumentCreateRequest,
|
|
|
KnowledgeDocumentCreateRequestDto,
|
|
KnowledgeDocumentCreateRequestDto,
|
|
|
- KnowledgeIndexJobAction,
|
|
|
|
|
- KnowledgeIndexJobData,
|
|
|
|
|
- KnowledgeIndexJobStatus,
|
|
|
|
|
KnowledgeDocumentParseRequest,
|
|
KnowledgeDocumentParseRequest,
|
|
|
KnowledgeDocumentReindexRequestDto,
|
|
KnowledgeDocumentReindexRequestDto,
|
|
|
KnowledgeDocumentUpdateRequestDto,
|
|
KnowledgeDocumentUpdateRequestDto,
|
|
|
|
|
+ KnowledgeIndexJobData,
|
|
|
|
|
+ KnowledgeSearchRequest,
|
|
|
KnowledgeSettingsDto,
|
|
KnowledgeSettingsDto,
|
|
|
KnowledgeSettingsUpdateRequestDto,
|
|
KnowledgeSettingsUpdateRequestDto,
|
|
|
- KnowledgeSearchRequest)
|
|
|
|
|
|
|
+)
|
|
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
if TYPE_CHECKING:
|
|
|
from redis import Redis
|
|
from redis import Redis
|
|
|
|
|
|
|
|
-
|
|
|
|
|
-@dataclass(frozen=True, slots=True)
|
|
|
|
|
-class KnowledgeDocumentIngestResult:
|
|
|
|
|
- document: KnowledgeDocument
|
|
|
|
|
- chunks: list[KnowledgeChunk]
|
|
|
|
|
- queued: bool = False
|
|
|
|
|
- job: KnowledgeIndexJobData | None = None
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-class KnowledgeIndexingError(RuntimeError):
|
|
|
|
|
- def __init__(self, *, document_id: str, message: str) -> None:
|
|
|
|
|
- super().__init__(message)
|
|
|
|
|
- self.document_id = document_id
|
|
|
|
|
|
|
+ from app.domain.repositories import (
|
|
|
|
|
+ KnowledgeBaseRepository,
|
|
|
|
|
+ KnowledgeChunkRepository,
|
|
|
|
|
+ KnowledgeDocumentRepository,
|
|
|
|
|
+ )
|
|
|
|
|
+ from app.infrastructure.object_storage import KnowledgeObjectStorage
|
|
|
|
|
|
|
|
|
|
|
|
|
class KnowledgeApplicationService:
|
|
class KnowledgeApplicationService:
|
|
|
|
|
+ """Facade composing CRUD, Indexing, Search, and Settings sub-services."""
|
|
|
|
|
+
|
|
|
def __init__(
|
|
def __init__(
|
|
|
self,
|
|
self,
|
|
|
*,
|
|
*,
|
|
@@ -87,7 +58,8 @@ class KnowledgeApplicationService:
|
|
|
chunk_repository: KnowledgeChunkRepository,
|
|
chunk_repository: KnowledgeChunkRepository,
|
|
|
object_storage: KnowledgeObjectStorage | None = None,
|
|
object_storage: KnowledgeObjectStorage | None = None,
|
|
|
redis_client: Redis | None = None,
|
|
redis_client: Redis | None = None,
|
|
|
- task_queue_publisher: TaskQueuePublisher | None = None) -> None:
|
|
|
|
|
|
|
+ task_queue_publisher: TaskQueuePublisher | None = None,
|
|
|
|
|
+ ) -> None:
|
|
|
self.settings = settings
|
|
self.settings = settings
|
|
|
self.base_repository = base_repository
|
|
self.base_repository = base_repository
|
|
|
self.document_repository = document_repository
|
|
self.document_repository = document_repository
|
|
@@ -97,1134 +69,179 @@ class KnowledgeApplicationService:
|
|
|
self.redis_client = redis_client
|
|
self.redis_client = redis_client
|
|
|
self.task_queue_publisher = task_queue_publisher
|
|
self.task_queue_publisher = task_queue_publisher
|
|
|
|
|
|
|
|
|
|
+ self.crud_service = KnowledgeCrudService(
|
|
|
|
|
+ settings=settings,
|
|
|
|
|
+ base_repository=base_repository,
|
|
|
|
|
+ document_repository=document_repository,
|
|
|
|
|
+ chunk_repository=chunk_repository,
|
|
|
|
|
+ object_storage=object_storage,
|
|
|
|
|
+ )
|
|
|
|
|
+ self.indexing_service = KnowledgeIndexingService(
|
|
|
|
|
+ settings=settings,
|
|
|
|
|
+ base_repository=base_repository,
|
|
|
|
|
+ document_repository=document_repository,
|
|
|
|
|
+ chunk_repository=chunk_repository,
|
|
|
|
|
+ embedding_service=self.embedding_service,
|
|
|
|
|
+ object_storage=object_storage,
|
|
|
|
|
+ redis_client=redis_client,
|
|
|
|
|
+ task_queue_publisher=task_queue_publisher,
|
|
|
|
|
+ )
|
|
|
|
|
+ self.search_service = KnowledgeSearchService(
|
|
|
|
|
+ settings=settings,
|
|
|
|
|
+ base_repository=base_repository,
|
|
|
|
|
+ document_repository=document_repository,
|
|
|
|
|
+ chunk_repository=chunk_repository,
|
|
|
|
|
+ embedding_service=self.embedding_service,
|
|
|
|
|
+ )
|
|
|
|
|
+ self.settings_service = KnowledgeSettingsService(
|
|
|
|
|
+ settings=settings,
|
|
|
|
|
+ base_repository=base_repository,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
@property
|
|
@property
|
|
|
def object_storage(self) -> KnowledgeObjectStorage:
|
|
def object_storage(self) -> KnowledgeObjectStorage:
|
|
|
- if self._object_storage is None:
|
|
|
|
|
- self._object_storage = build_object_storage(self.settings)
|
|
|
|
|
- return self._object_storage
|
|
|
|
|
|
|
+ return self.crud_service.object_storage
|
|
|
|
|
+
|
|
|
|
|
+ # ── Knowledge Base ────────────────────────────────────────────────
|
|
|
|
|
|
|
|
def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
|
|
def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
|
|
|
- return self.base_repository.create(
|
|
|
|
|
- code=payload.code,
|
|
|
|
|
- name=payload.name,
|
|
|
|
|
- description=payload.description,
|
|
|
|
|
- metadata_json=payload.metadata_json)
|
|
|
|
|
|
|
+ return self.crud_service.create_base(payload)
|
|
|
|
|
|
|
|
- def create_base_from_contract(
|
|
|
|
|
- self,
|
|
|
|
|
- payload: KnowledgeBaseCreateRequestDto) -> KnowledgeBase:
|
|
|
|
|
- return self.create_base(
|
|
|
|
|
- KnowledgeBaseCreateRequest(
|
|
|
|
|
- code=self._build_base_code(payload.name),
|
|
|
|
|
- name=payload.name,
|
|
|
|
|
- description=payload.description,
|
|
|
|
|
- metadata_json=payload.metadata))
|
|
|
|
|
|
|
+ def create_base_from_contract(self, payload: KnowledgeBaseCreateRequestDto) -> KnowledgeBase:
|
|
|
|
|
+ return self.crud_service.create_base_from_contract(payload)
|
|
|
|
|
|
|
|
def list_bases(self) -> list[KnowledgeBase]:
|
|
def list_bases(self) -> list[KnowledgeBase]:
|
|
|
- return self.base_repository.list_all()
|
|
|
|
|
|
|
+ return self.crud_service.list_bases()
|
|
|
|
|
|
|
|
- def list_bases_filtered(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- keyword: str | None = None,
|
|
|
|
|
- status: str | None = None) -> list[KnowledgeBase]:
|
|
|
|
|
- return self.base_repository.list_filtered(
|
|
|
|
|
- keyword=keyword,
|
|
|
|
|
- status=status)
|
|
|
|
|
|
|
+ def list_bases_filtered(self, *, keyword: str | None = None, status: str | None = None) -> list[KnowledgeBase]:
|
|
|
|
|
+ return self.crud_service.list_bases_filtered(keyword=keyword, status=status)
|
|
|
|
|
|
|
|
- def update_base_from_contract(
|
|
|
|
|
- self,
|
|
|
|
|
- payload: KnowledgeBaseUpdateRequestDto) -> KnowledgeBase | None:
|
|
|
|
|
- return self.base_repository.update(
|
|
|
|
|
- knowledge_base_id=payload.knowledgeBaseId,
|
|
|
|
|
- name=payload.name,
|
|
|
|
|
- description=payload.description,
|
|
|
|
|
- status=payload.status,
|
|
|
|
|
- metadata_json=payload.metadata)
|
|
|
|
|
|
|
+ def update_base_from_contract(self, payload: KnowledgeBaseUpdateRequestDto) -> KnowledgeBase | None:
|
|
|
|
|
+ return self.crud_service.update_base_from_contract(payload)
|
|
|
|
|
|
|
|
def delete_base(self, *, knowledge_base_id: str) -> bool:
|
|
def delete_base(self, *, knowledge_base_id: str) -> bool:
|
|
|
- documents = self.document_repository.list_by_base(
|
|
|
|
|
- knowledge_base_id=knowledge_base_id)
|
|
|
|
|
- for document in documents:
|
|
|
|
|
- self._delete_document_object(document=document)
|
|
|
|
|
- self.chunk_repository.delete_by_base(knowledge_base_id=knowledge_base_id)
|
|
|
|
|
- for document in documents:
|
|
|
|
|
- self.document_repository.delete(document_id=document.id)
|
|
|
|
|
- return self.base_repository.delete(knowledge_base_id=knowledge_base_id)
|
|
|
|
|
-
|
|
|
|
|
- def update_base_status(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- knowledge_base_id: str,
|
|
|
|
|
- payload: KnowledgeBaseStatusUpdateRequest) -> KnowledgeBase | None:
|
|
|
|
|
- return self.base_repository.update_status(
|
|
|
|
|
- knowledge_base_id=knowledge_base_id,
|
|
|
|
|
- status=payload.status)
|
|
|
|
|
-
|
|
|
|
|
- def create_document(
|
|
|
|
|
- self,
|
|
|
|
|
- payload: KnowledgeDocumentCreateRequest) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
|
|
|
|
|
- knowledge_base = self.base_repository.get_by_id(
|
|
|
|
|
- knowledge_base_id=payload.knowledge_base_id)
|
|
|
|
|
- if knowledge_base is None:
|
|
|
|
|
- raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")
|
|
|
|
|
-
|
|
|
|
|
- parsed = self.parse_document(
|
|
|
|
|
- KnowledgeDocumentParseRequest(
|
|
|
|
|
- source_type=payload.source_type,
|
|
|
|
|
- source_uri=payload.source_uri,
|
|
|
|
|
- content_text=payload.content_text,
|
|
|
|
|
- content_base64=payload.content_base64)
|
|
|
|
|
- )
|
|
|
|
|
- raw_content = read_document_content_bytes(
|
|
|
|
|
- content_text=payload.content_text,
|
|
|
|
|
- content_base64=payload.content_base64)
|
|
|
|
|
- object_key = build_document_object_key(
|
|
|
|
|
- knowledge_base_id=payload.knowledge_base_id,
|
|
|
|
|
- source_type=parsed.source_type,
|
|
|
|
|
- title=payload.title)
|
|
|
|
|
- stored_object = self.object_storage.put_bytes(
|
|
|
|
|
- object_key=object_key,
|
|
|
|
|
- content=raw_content,
|
|
|
|
|
- content_type=self._guess_content_type(source_type=parsed.source_type))
|
|
|
|
|
- document: KnowledgeDocument | None = None
|
|
|
|
|
- try:
|
|
|
|
|
- metadata_json = {
|
|
|
|
|
- **payload.metadata_json,
|
|
|
|
|
- "parser_metadata": parsed.metadata_json,
|
|
|
|
|
- "object_storage": stored_object.to_metadata(),
|
|
|
|
|
- }
|
|
|
|
|
- document = self.document_repository.create(
|
|
|
|
|
- knowledge_base_id=payload.knowledge_base_id,
|
|
|
|
|
- title=payload.title,
|
|
|
|
|
- source_type=parsed.source_type,
|
|
|
|
|
- source_uri=payload.source_uri,
|
|
|
|
|
- content_text="",
|
|
|
|
|
- content_hash=stable_content_hash(parsed.content_text),
|
|
|
|
|
- metadata_json=metadata_json)
|
|
|
|
|
- try:
|
|
|
|
|
- chunks = self._index_document(
|
|
|
|
|
- document=document,
|
|
|
|
|
- content_text=parsed.content_text,
|
|
|
|
|
- chunk_size=payload.chunk_size,
|
|
|
|
|
- chunk_overlap=payload.chunk_overlap)
|
|
|
|
|
- except Exception as exc:
|
|
|
|
|
- self._mark_document_failed(document=document, message=str(exc))
|
|
|
|
|
- raise KnowledgeIndexingError(
|
|
|
|
|
- document_id=document.id,
|
|
|
|
|
- message=f"knowledge document indexing failed: {exc}") from exc
|
|
|
|
|
- indexed_document = self.document_repository.update_status(
|
|
|
|
|
- document_id=document.id,
|
|
|
|
|
- status="indexed")
|
|
|
|
|
- return indexed_document or document, chunks
|
|
|
|
|
- except Exception:
|
|
|
|
|
- if document is None:
|
|
|
|
|
- try:
|
|
|
|
|
- self.object_storage.delete_object(object_key=stored_object.object_key)
|
|
|
|
|
- except ObjectStorageError:
|
|
|
|
|
- pass
|
|
|
|
|
- raise
|
|
|
|
|
-
|
|
|
|
|
- def create_document_from_contract(
|
|
|
|
|
- self,
|
|
|
|
|
- payload: KnowledgeDocumentCreateRequestDto) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
|
|
|
|
|
- return self.create_document(
|
|
|
|
|
- KnowledgeDocumentCreateRequest(
|
|
|
|
|
- knowledge_base_id=payload.knowledgeBaseId,
|
|
|
|
|
- title=payload.title,
|
|
|
|
|
- content_text=payload.contentText,
|
|
|
|
|
- content_base64=payload.contentBase64,
|
|
|
|
|
- source_type=payload.sourceType,
|
|
|
|
|
- source_uri=payload.sourceUri,
|
|
|
|
|
- metadata_json=payload.metadata,
|
|
|
|
|
- chunk_size=payload.chunkSize,
|
|
|
|
|
- chunk_overlap=payload.chunkOverlap))
|
|
|
|
|
-
|
|
|
|
|
- def create_document_from_contract_result(
|
|
|
|
|
- self,
|
|
|
|
|
- payload: KnowledgeDocumentCreateRequestDto) -> KnowledgeDocumentIngestResult:
|
|
|
|
|
- request = KnowledgeDocumentCreateRequest(
|
|
|
|
|
- knowledge_base_id=payload.knowledgeBaseId,
|
|
|
|
|
- title=payload.title,
|
|
|
|
|
- content_text=payload.contentText,
|
|
|
|
|
- content_base64=payload.contentBase64,
|
|
|
|
|
- source_type=payload.sourceType,
|
|
|
|
|
- source_uri=payload.sourceUri,
|
|
|
|
|
- metadata_json=payload.metadata,
|
|
|
|
|
- chunk_size=payload.chunkSize,
|
|
|
|
|
- chunk_overlap=payload.chunkOverlap)
|
|
|
|
|
- if self._should_queue_indexing(async_mode=payload.asyncMode):
|
|
|
|
|
- return self.create_document_index_job(payload=request)
|
|
|
|
|
- document, chunks = self.create_document(request)
|
|
|
|
|
- return KnowledgeDocumentIngestResult(
|
|
|
|
|
- document=document,
|
|
|
|
|
- chunks=chunks)
|
|
|
|
|
-
|
|
|
|
|
- def create_document_index_job(
|
|
|
|
|
- self,
|
|
|
|
|
- payload: KnowledgeDocumentCreateRequest) -> KnowledgeDocumentIngestResult:
|
|
|
|
|
- knowledge_base = self.base_repository.get_by_id(
|
|
|
|
|
- knowledge_base_id=payload.knowledge_base_id)
|
|
|
|
|
- if knowledge_base is None:
|
|
|
|
|
- raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")
|
|
|
|
|
-
|
|
|
|
|
- raw_content = read_document_content_bytes(
|
|
|
|
|
- content_text=payload.content_text,
|
|
|
|
|
- content_base64=payload.content_base64)
|
|
|
|
|
- source_type = normalize_source_type(
|
|
|
|
|
- source_type=payload.source_type,
|
|
|
|
|
- source_uri=payload.source_uri)
|
|
|
|
|
- object_key = build_document_object_key(
|
|
|
|
|
- knowledge_base_id=payload.knowledge_base_id,
|
|
|
|
|
- source_type=source_type,
|
|
|
|
|
- title=payload.title)
|
|
|
|
|
- stored_object = self.object_storage.put_bytes(
|
|
|
|
|
- object_key=object_key,
|
|
|
|
|
- content=raw_content,
|
|
|
|
|
- content_type=self._guess_content_type(source_type=source_type))
|
|
|
|
|
- document: KnowledgeDocument | None = None
|
|
|
|
|
- try:
|
|
|
|
|
- document = self.document_repository.create(
|
|
|
|
|
- knowledge_base_id=payload.knowledge_base_id,
|
|
|
|
|
- title=payload.title,
|
|
|
|
|
- source_type=source_type,
|
|
|
|
|
- source_uri=payload.source_uri,
|
|
|
|
|
- content_text="",
|
|
|
|
|
- content_hash=hashlib.sha256(raw_content).hexdigest(),
|
|
|
|
|
- metadata_json={
|
|
|
|
|
- **payload.metadata_json,
|
|
|
|
|
- "object_storage": stored_object.to_metadata(),
|
|
|
|
|
- },
|
|
|
|
|
- status="draft")
|
|
|
|
|
- queued_document, job = self.queue_document_indexing(
|
|
|
|
|
- document=document,
|
|
|
|
|
- action="index",
|
|
|
|
|
- chunk_size=payload.chunk_size,
|
|
|
|
|
- chunk_overlap=payload.chunk_overlap)
|
|
|
|
|
- except Exception:
|
|
|
|
|
- if document is None:
|
|
|
|
|
- try:
|
|
|
|
|
- self.object_storage.delete_object(object_key=stored_object.object_key)
|
|
|
|
|
- except ObjectStorageError:
|
|
|
|
|
- pass
|
|
|
|
|
- raise
|
|
|
|
|
- return KnowledgeDocumentIngestResult(
|
|
|
|
|
- document=queued_document,
|
|
|
|
|
- chunks=[],
|
|
|
|
|
- queued=True,
|
|
|
|
|
- job=job)
|
|
|
|
|
-
|
|
|
|
|
- def parse_document(self, payload: KnowledgeDocumentParseRequest) -> ParsedDocument:
|
|
|
|
|
- try:
|
|
|
|
|
- return parse_document_content(
|
|
|
|
|
- source_type=payload.source_type,
|
|
|
|
|
- content_text=payload.content_text,
|
|
|
|
|
- content_base64=payload.content_base64,
|
|
|
|
|
- source_uri=payload.source_uri)
|
|
|
|
|
- except DocumentParseError:
|
|
|
|
|
- raise
|
|
|
|
|
-
|
|
|
|
|
- def queue_document_indexing(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- document: KnowledgeDocument,
|
|
|
|
|
- action: str,
|
|
|
|
|
- chunk_size: int | None = None,
|
|
|
|
|
- chunk_overlap: int | None = None) -> tuple[KnowledgeDocument, KnowledgeIndexJobData]:
|
|
|
|
|
- job_id = f"kjob_{uuid4().hex}"
|
|
|
|
|
- metadata = self._write_index_job_metadata(
|
|
|
|
|
- document=document,
|
|
|
|
|
- action=action,
|
|
|
|
|
- job_id=job_id,
|
|
|
|
|
- status="queued",
|
|
|
|
|
- progress=0,
|
|
|
|
|
- chunk_size=chunk_size,
|
|
|
|
|
- chunk_overlap=chunk_overlap)
|
|
|
|
|
- updated_document = self.document_repository.update(
|
|
|
|
|
- document_id=document.id,
|
|
|
|
|
- status="queued",
|
|
|
|
|
- metadata_json=metadata)
|
|
|
|
|
- document_for_job = updated_document or document
|
|
|
|
|
- published = self._publish_document_index_job(
|
|
|
|
|
- document_id=document.id,
|
|
|
|
|
- action=action,
|
|
|
|
|
- job_id=job_id)
|
|
|
|
|
- if not published:
|
|
|
|
|
- metadata = self._write_index_job_metadata(
|
|
|
|
|
- document=document_for_job,
|
|
|
|
|
- action=action,
|
|
|
|
|
- job_id=job_id,
|
|
|
|
|
- status="running",
|
|
|
|
|
- progress=1,
|
|
|
|
|
- chunk_size=chunk_size,
|
|
|
|
|
- chunk_overlap=chunk_overlap,
|
|
|
|
|
- worker_key="inline-fallback")
|
|
|
|
|
- document_for_job = self.document_repository.update(
|
|
|
|
|
- document_id=document.id,
|
|
|
|
|
- status="indexing",
|
|
|
|
|
- metadata_json=metadata) or document_for_job
|
|
|
|
|
- processed = self.process_document_index_job(
|
|
|
|
|
- document_id=document.id,
|
|
|
|
|
- action=action,
|
|
|
|
|
- job_id=job_id,
|
|
|
|
|
- worker_key="inline-fallback",
|
|
|
|
|
- chunk_size=chunk_size,
|
|
|
|
|
- chunk_overlap=chunk_overlap)
|
|
|
|
|
- if processed is not None:
|
|
|
|
|
- indexed_document, chunks = processed
|
|
|
|
|
- return indexed_document, self._read_latest_index_job(document=indexed_document)
|
|
|
|
|
- return document_for_job, self._read_latest_index_job(document=document_for_job)
|
|
|
|
|
-
|
|
|
|
|
- def list_documents(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- knowledge_base_id: str) -> list[KnowledgeDocument]:
|
|
|
|
|
- return self.document_repository.list_by_base(
|
|
|
|
|
- knowledge_base_id=knowledge_base_id)
|
|
|
|
|
|
|
+ return self.crud_service.delete_base(knowledge_base_id=knowledge_base_id)
|
|
|
|
|
|
|
|
- def list_documents_filtered(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- knowledge_base_id: str | None = None,
|
|
|
|
|
- keyword: str | None = None,
|
|
|
|
|
- status: str | None = None,
|
|
|
|
|
- source_type: str | None = None) -> list[KnowledgeDocument]:
|
|
|
|
|
- return self.document_repository.list_filtered(
|
|
|
|
|
- knowledge_base_id=knowledge_base_id,
|
|
|
|
|
- keyword=keyword,
|
|
|
|
|
- status=status,
|
|
|
|
|
- source_type=source_type)
|
|
|
|
|
|
|
+ def update_base_status(self, *, knowledge_base_id: str, payload: KnowledgeBaseStatusUpdateRequest) -> KnowledgeBase | None:
|
|
|
|
|
+ return self.crud_service.update_base_status(knowledge_base_id=knowledge_base_id, payload=payload)
|
|
|
|
|
|
|
|
- def update_document_from_contract(
|
|
|
|
|
- self,
|
|
|
|
|
- payload: KnowledgeDocumentUpdateRequestDto) -> KnowledgeDocument | None:
|
|
|
|
|
- return self.document_repository.update(
|
|
|
|
|
- document_id=payload.documentId,
|
|
|
|
|
- title=payload.title,
|
|
|
|
|
- source_uri=payload.sourceUri,
|
|
|
|
|
- status=payload.status,
|
|
|
|
|
- metadata_json=payload.metadata)
|
|
|
|
|
-
|
|
|
|
|
- def reindex_document(
|
|
|
|
|
- self,
|
|
|
|
|
- payload: KnowledgeDocumentReindexRequestDto) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
|
|
|
|
|
- document = self.document_repository.get_by_id(document_id=payload.documentId)
|
|
|
|
|
- if document is None:
|
|
|
|
|
- return None
|
|
|
|
|
- try:
|
|
|
|
|
- parsed = self._parse_document_for_indexing(document=document)
|
|
|
|
|
- chunks = self._index_document(
|
|
|
|
|
- document=document,
|
|
|
|
|
- content_text=parsed.content_text,
|
|
|
|
|
- chunk_size=payload.chunkSize,
|
|
|
|
|
- chunk_overlap=payload.chunkOverlap)
|
|
|
|
|
- except Exception as exc:
|
|
|
|
|
- self._mark_document_failed(document=document, message=str(exc))
|
|
|
|
|
- raise KnowledgeIndexingError(
|
|
|
|
|
- document_id=document.id,
|
|
|
|
|
- message=f"knowledge document reindex failed: {exc}") from exc
|
|
|
|
|
- metadata = dict(document.metadata_json or {})
|
|
|
|
|
- metadata["parser_metadata"] = parsed.metadata_json
|
|
|
|
|
- indexed_document = self.document_repository.update(
|
|
|
|
|
- document_id=document.id,
|
|
|
|
|
- status="indexed",
|
|
|
|
|
- metadata_json=metadata)
|
|
|
|
|
- return indexed_document or document, chunks
|
|
|
|
|
|
|
+ # ── Document CRUD ─────────────────────────────────────────────────
|
|
|
|
|
|
|
|
- def reindex_document_from_contract_result(
|
|
|
|
|
- self,
|
|
|
|
|
- payload: KnowledgeDocumentReindexRequestDto) -> KnowledgeDocumentIngestResult | None:
|
|
|
|
|
- if self._should_queue_indexing(async_mode=payload.asyncMode):
|
|
|
|
|
- document = self.document_repository.get_by_id(document_id=payload.documentId)
|
|
|
|
|
- if document is None:
|
|
|
|
|
- return None
|
|
|
|
|
- queued_document, job = self.queue_document_indexing(
|
|
|
|
|
- document=document,
|
|
|
|
|
- action="reindex",
|
|
|
|
|
- chunk_size=payload.chunkSize,
|
|
|
|
|
- chunk_overlap=payload.chunkOverlap)
|
|
|
|
|
- return KnowledgeDocumentIngestResult(
|
|
|
|
|
- document=queued_document,
|
|
|
|
|
- chunks=[],
|
|
|
|
|
- queued=True,
|
|
|
|
|
- job=job)
|
|
|
|
|
- result = self.reindex_document(payload)
|
|
|
|
|
- if result is None:
|
|
|
|
|
- return None
|
|
|
|
|
- document, chunks = result
|
|
|
|
|
- return KnowledgeDocumentIngestResult(
|
|
|
|
|
- document=document,
|
|
|
|
|
- chunks=chunks)
|
|
|
|
|
-
|
|
|
|
|
- def reindex_base_from_contract(
|
|
|
|
|
- self,
|
|
|
|
|
- payload: KnowledgeBaseReindexRequestDto) -> list[KnowledgeIndexJobData]:
|
|
|
|
|
- documents = self.document_repository.list_by_base(
|
|
|
|
|
- knowledge_base_id=payload.knowledgeBaseId)
|
|
|
|
|
- jobs: list[KnowledgeIndexJobData] = []
|
|
|
|
|
- for document in documents:
|
|
|
|
|
- if document.status == "archived":
|
|
|
|
|
- continue
|
|
|
|
|
- queued_document, job = self.queue_document_indexing(
|
|
|
|
|
- document=document,
|
|
|
|
|
- action="reindex",
|
|
|
|
|
- chunk_size=payload.chunkSize,
|
|
|
|
|
- chunk_overlap=payload.chunkOverlap)
|
|
|
|
|
- jobs.append(job or self._read_latest_index_job(document=queued_document))
|
|
|
|
|
- return jobs
|
|
|
|
|
-
|
|
|
|
|
- def process_document_index_job(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- document_id: str,
|
|
|
|
|
- action: str,
|
|
|
|
|
- worker_key: str,
|
|
|
|
|
- job_id: str | None = None,
|
|
|
|
|
- chunk_size: int | None = None,
|
|
|
|
|
- chunk_overlap: int | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
|
|
|
|
|
- document = self.document_repository.get_by_id(document_id=document_id)
|
|
|
|
|
- if document is None:
|
|
|
|
|
- return None
|
|
|
|
|
- resolved_job = self._read_latest_index_job(document=document)
|
|
|
|
|
- resolved_job_id = job_id or resolved_job.jobId
|
|
|
|
|
- resolved_chunk_size = chunk_size if chunk_size is not None else resolved_job.chunkSize
|
|
|
|
|
- resolved_chunk_overlap = chunk_overlap if chunk_overlap is not None else resolved_job.chunkOverlap
|
|
|
|
|
- if document.status == "archived":
|
|
|
|
|
- metadata = self._write_index_job_metadata(
|
|
|
|
|
- document=document,
|
|
|
|
|
- action=action,
|
|
|
|
|
- job_id=resolved_job_id,
|
|
|
|
|
- status="skipped",
|
|
|
|
|
- progress=100,
|
|
|
|
|
- chunk_size=resolved_chunk_size,
|
|
|
|
|
- chunk_overlap=resolved_chunk_overlap,
|
|
|
|
|
- worker_key=worker_key,
|
|
|
|
|
- completed_time=datetime.utcnow(),
|
|
|
|
|
- error_message="document is archived")
|
|
|
|
|
- skipped_document = self.document_repository.update(
|
|
|
|
|
- document_id=document.id,
|
|
|
|
|
- metadata_json=metadata) or document
|
|
|
|
|
- return skipped_document, []
|
|
|
|
|
|
|
+ def list_documents(self, *, knowledge_base_id: str) -> list[KnowledgeDocument]:
|
|
|
|
|
+ return self.crud_service.list_documents(knowledge_base_id=knowledge_base_id)
|
|
|
|
|
|
|
|
- metadata = self._write_index_job_metadata(
|
|
|
|
|
- document=document,
|
|
|
|
|
- action=action,
|
|
|
|
|
- job_id=resolved_job_id,
|
|
|
|
|
- status="running",
|
|
|
|
|
- progress=10,
|
|
|
|
|
- chunk_size=resolved_chunk_size,
|
|
|
|
|
- chunk_overlap=resolved_chunk_overlap,
|
|
|
|
|
- worker_key=worker_key,
|
|
|
|
|
- started_time=datetime.utcnow())
|
|
|
|
|
- running_document = self.document_repository.update(
|
|
|
|
|
- document_id=document.id,
|
|
|
|
|
- status="indexing",
|
|
|
|
|
- metadata_json=metadata) or document
|
|
|
|
|
- try:
|
|
|
|
|
- parsed = self._parse_document_for_indexing(document=running_document)
|
|
|
|
|
- metadata = self._write_index_job_metadata(
|
|
|
|
|
- document=running_document,
|
|
|
|
|
- action=action,
|
|
|
|
|
- job_id=resolved_job_id,
|
|
|
|
|
- status="running",
|
|
|
|
|
- progress=40,
|
|
|
|
|
- chunk_size=resolved_chunk_size,
|
|
|
|
|
- chunk_overlap=resolved_chunk_overlap,
|
|
|
|
|
- worker_key=worker_key,
|
|
|
|
|
- started_time=self._read_latest_index_job(document=running_document).startedTime)
|
|
|
|
|
- metadata["parser_metadata"] = parsed.metadata_json
|
|
|
|
|
- running_document = self.document_repository.update(
|
|
|
|
|
- document_id=document.id,
|
|
|
|
|
- status="indexing",
|
|
|
|
|
- metadata_json=metadata) or running_document
|
|
|
|
|
- chunks = self._index_document(
|
|
|
|
|
- document=running_document,
|
|
|
|
|
- content_text=parsed.content_text,
|
|
|
|
|
- chunk_size=resolved_chunk_size,
|
|
|
|
|
- chunk_overlap=resolved_chunk_overlap)
|
|
|
|
|
- except Exception as exc:
|
|
|
|
|
- self._mark_document_failed(
|
|
|
|
|
- document=running_document,
|
|
|
|
|
- message=str(exc),
|
|
|
|
|
- job_id=resolved_job_id,
|
|
|
|
|
- action=action,
|
|
|
|
|
- worker_key=worker_key,
|
|
|
|
|
- chunk_size=resolved_chunk_size,
|
|
|
|
|
- chunk_overlap=resolved_chunk_overlap)
|
|
|
|
|
- raise KnowledgeIndexingError(
|
|
|
|
|
- document_id=document.id,
|
|
|
|
|
- message=f"knowledge document {action} failed: {exc}") from exc
|
|
|
|
|
|
|
+ def list_documents_filtered(self, *, knowledge_base_id: str | None = None, keyword: str | None = None, status: str | None = None, source_type: str | None = None) -> list[KnowledgeDocument]:
|
|
|
|
|
+ return self.crud_service.list_documents_filtered(
|
|
|
|
|
+ knowledge_base_id=knowledge_base_id, keyword=keyword,
|
|
|
|
|
+ status=status, source_type=source_type)
|
|
|
|
|
|
|
|
- metadata = self._write_index_job_metadata(
|
|
|
|
|
- document=running_document,
|
|
|
|
|
- action=action,
|
|
|
|
|
- job_id=resolved_job_id,
|
|
|
|
|
- status="completed",
|
|
|
|
|
- progress=100,
|
|
|
|
|
- chunk_size=resolved_chunk_size,
|
|
|
|
|
- chunk_overlap=resolved_chunk_overlap,
|
|
|
|
|
- worker_key=worker_key,
|
|
|
|
|
- completed_time=datetime.utcnow())
|
|
|
|
|
- metadata["parser_metadata"] = parsed.metadata_json
|
|
|
|
|
- indexed_document = self.document_repository.update(
|
|
|
|
|
- document_id=document.id,
|
|
|
|
|
- status="indexed",
|
|
|
|
|
- metadata_json=metadata)
|
|
|
|
|
- return indexed_document or running_document, chunks
|
|
|
|
|
-
|
|
|
|
|
- def execute_document_index_job(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- document_id: str,
|
|
|
|
|
- action: str,
|
|
|
|
|
- worker_key: str,
|
|
|
|
|
- lease_seconds: int,
|
|
|
|
|
- job_id: str | None = None,
|
|
|
|
|
- redis_client: Redis | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
|
|
|
|
|
- resolved_redis_client = redis_client or self.redis_client
|
|
|
|
|
- idempotency_key = f"{document_id}:{job_id or action}"
|
|
|
|
|
- lock = None
|
|
|
|
|
- idempotency_store = None
|
|
|
|
|
- if resolved_redis_client is not None:
|
|
|
|
|
- from core_shared.redis_primitives import DistributedLock, IdempotencyStore
|
|
|
|
|
-
|
|
|
|
|
- lock = DistributedLock(
|
|
|
|
|
- client=resolved_redis_client,
|
|
|
|
|
- name=f"knowledge-document:{document_id}:lock",
|
|
|
|
|
- ttl_seconds=lease_seconds)
|
|
|
|
|
- if not lock.acquire():
|
|
|
|
|
- return None
|
|
|
|
|
- idempotency_store = IdempotencyStore(
|
|
|
|
|
- client=resolved_redis_client,
|
|
|
|
|
- prefix="knowledge-document-idempotency")
|
|
|
|
|
- if not idempotency_store.begin(key=idempotency_key):
|
|
|
|
|
- lock.release()
|
|
|
|
|
- return None
|
|
|
|
|
- try:
|
|
|
|
|
- result = self.process_document_index_job(
|
|
|
|
|
- document_id=document_id,
|
|
|
|
|
- action=action,
|
|
|
|
|
- job_id=job_id,
|
|
|
|
|
- worker_key=worker_key)
|
|
|
|
|
- if idempotency_store is not None and result is not None:
|
|
|
|
|
- document, chunks = result
|
|
|
|
|
- idempotency_store.complete(
|
|
|
|
|
- key=idempotency_key,
|
|
|
|
|
- result={
|
|
|
|
|
- "status": document.status,
|
|
|
|
|
- "document_id": document.id,
|
|
|
|
|
- "chunk_count": len(chunks),
|
|
|
|
|
- })
|
|
|
|
|
- except Exception:
|
|
|
|
|
- if idempotency_store is not None:
|
|
|
|
|
- idempotency_store.clear(key=idempotency_key)
|
|
|
|
|
- raise
|
|
|
|
|
- finally:
|
|
|
|
|
- if lock is not None:
|
|
|
|
|
- lock.release()
|
|
|
|
|
- return result
|
|
|
|
|
-
|
|
|
|
|
- def execute_next_pending_document_job(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- worker_key: str,
|
|
|
|
|
- lease_seconds: int,
|
|
|
|
|
- stale_indexing_seconds: int,
|
|
|
|
|
- redis_client: Redis | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
|
|
|
|
|
- stale_before = datetime.utcnow() - timedelta(seconds=stale_indexing_seconds)
|
|
|
|
|
- document = self.document_repository.get_next_pending_indexing(
|
|
|
|
|
- stale_before=stale_before)
|
|
|
|
|
- if document is None:
|
|
|
|
|
- return None
|
|
|
|
|
- job = self._read_latest_index_job(document=document)
|
|
|
|
|
- return self.execute_document_index_job(
|
|
|
|
|
- document_id=document.id,
|
|
|
|
|
- action=job.action,
|
|
|
|
|
- job_id=job.jobId,
|
|
|
|
|
- worker_key=worker_key,
|
|
|
|
|
- lease_seconds=lease_seconds,
|
|
|
|
|
- redis_client=redis_client)
|
|
|
|
|
-
|
|
|
|
|
- def list_index_jobs(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- knowledge_base_id: str | None = None,
|
|
|
|
|
- document_id: str | None = None,
|
|
|
|
|
- status: str | None = None) -> list[KnowledgeIndexJobData]:
|
|
|
|
|
- documents = self.document_repository.list_filtered(
|
|
|
|
|
- knowledge_base_id=knowledge_base_id)
|
|
|
|
|
- jobs: list[KnowledgeIndexJobData] = []
|
|
|
|
|
- for document in documents:
|
|
|
|
|
- if document_id is not None and document.id != document_id:
|
|
|
|
|
- continue
|
|
|
|
|
- job = self._try_read_latest_index_job(document=document)
|
|
|
|
|
- if job is None:
|
|
|
|
|
- continue
|
|
|
|
|
- if status is not None and job.status != status:
|
|
|
|
|
- continue
|
|
|
|
|
- jobs.append(job)
|
|
|
|
|
- jobs.sort(
|
|
|
|
|
- key=lambda item: item.queuedTime or datetime.min,
|
|
|
|
|
- reverse=True)
|
|
|
|
|
- return jobs
|
|
|
|
|
-
|
|
|
|
|
- def detail_index_job(self, *, document_id: str) -> KnowledgeIndexJobData | None:
|
|
|
|
|
- document = self.document_repository.get_by_id(document_id=document_id)
|
|
|
|
|
- if document is None:
|
|
|
|
|
- return None
|
|
|
|
|
- return self._try_read_latest_index_job(document=document)
|
|
|
|
|
|
|
+ def update_document_from_contract(self, payload: KnowledgeDocumentUpdateRequestDto) -> KnowledgeDocument | None:
|
|
|
|
|
+ return self.crud_service.update_document_from_contract(payload)
|
|
|
|
|
|
|
|
def delete_document(self, *, document_id: str) -> bool:
|
|
def delete_document(self, *, document_id: str) -> bool:
|
|
|
- return bool(self.delete_document_result(document_id=document_id)["deleted"])
|
|
|
|
|
|
|
+ return self.crud_service.delete_document(document_id=document_id)
|
|
|
|
|
|
|
|
def delete_document_result(self, *, document_id: str) -> dict[str, JSONValue]:
|
|
def delete_document_result(self, *, document_id: str) -> dict[str, JSONValue]:
|
|
|
- document = self.document_repository.get_by_id(document_id=document_id)
|
|
|
|
|
- if document is None:
|
|
|
|
|
- return {
|
|
|
|
|
- "deleted": False,
|
|
|
|
|
- "objectDeleted": False,
|
|
|
|
|
- "documentId": document_id,
|
|
|
|
|
- }
|
|
|
|
|
- object_deleted = self._delete_document_object(document=document)
|
|
|
|
|
- self.chunk_repository.delete_by_document(document_id=document_id)
|
|
|
|
|
- deleted = self.document_repository.delete(document_id=document_id) is not None
|
|
|
|
|
- return {
|
|
|
|
|
- "deleted": deleted,
|
|
|
|
|
- "objectDeleted": object_deleted,
|
|
|
|
|
- "documentId": document_id,
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ return self.crud_service.delete_document_result(document_id=document_id)
|
|
|
|
|
|
|
|
- def read_document_content(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- document_id: str,
|
|
|
|
|
- include_text: bool = True,
|
|
|
|
|
- include_base64: bool = False) -> dict[str, JSONValue] | None:
|
|
|
|
|
- document = self.document_repository.get_by_id(document_id=document_id)
|
|
|
|
|
- if document is None:
|
|
|
|
|
- return None
|
|
|
|
|
- raw_content = self._read_document_raw_content(document=document)
|
|
|
|
|
- object_status = self.read_document_storage_status(document_id=document_id)
|
|
|
|
|
- content_type = self._read_content_type_from_status(object_status)
|
|
|
|
|
- payload: dict[str, JSONValue] = {
|
|
|
|
|
- "documentId": document.id,
|
|
|
|
|
- "title": document.title,
|
|
|
|
|
- "sourceType": document.source_type,
|
|
|
|
|
- "contentType": content_type,
|
|
|
|
|
- "sizeBytes": len(raw_content),
|
|
|
|
|
- "objectStorage": self._read_object_storage_metadata(document=document),
|
|
|
|
|
- "contentBase64": None,
|
|
|
|
|
- "contentText": None,
|
|
|
|
|
- }
|
|
|
|
|
- if include_base64:
|
|
|
|
|
- payload["contentBase64"] = base64.b64encode(raw_content).decode("ascii")
|
|
|
|
|
- if include_text and self._is_text_content_type(content_type=content_type, source_type=document.source_type):
|
|
|
|
|
- payload["contentText"] = raw_content.decode("utf-8", errors="replace")
|
|
|
|
|
- return payload
|
|
|
|
|
|
|
+ def read_document_content(self, *, document_id: str, include_text: bool = True, include_base64: bool = False) -> dict[str, JSONValue] | None:
|
|
|
|
|
+ return self.crud_service.read_document_content(
|
|
|
|
|
+ document_id=document_id, include_text=include_text, include_base64=include_base64)
|
|
|
|
|
|
|
|
def read_document_storage_status(self, *, document_id: str) -> dict[str, JSONValue] | None:
|
|
def read_document_storage_status(self, *, document_id: str) -> dict[str, JSONValue] | None:
|
|
|
- document = self.document_repository.get_by_id(document_id=document_id)
|
|
|
|
|
- if document is None:
|
|
|
|
|
- return None
|
|
|
|
|
- object_key = self._read_document_object_key(document=document)
|
|
|
|
|
- if object_key is None:
|
|
|
|
|
- return {
|
|
|
|
|
- "documentId": document.id,
|
|
|
|
|
- "exists": False,
|
|
|
|
|
- "objectStorage": None,
|
|
|
|
|
- "errorMessage": "document object metadata is missing",
|
|
|
|
|
- }
|
|
|
|
|
- status = self.object_storage.head_object(object_key=object_key)
|
|
|
|
|
- return self._object_status_to_payload(document=document, status=status)
|
|
|
|
|
|
|
+ return self.crud_service.read_document_storage_status(document_id=document_id)
|
|
|
|
|
|
|
|
def read_storage_health(self) -> dict[str, JSONValue]:
|
|
def read_storage_health(self) -> dict[str, JSONValue]:
|
|
|
- return dict(self.object_storage.health_check())
|
|
|
|
|
-
|
|
|
|
|
- def list_chunks_filtered(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- knowledge_base_id: str | None = None,
|
|
|
|
|
- document_id: str | None = None,
|
|
|
|
|
- keyword: str | None = None) -> list[KnowledgeChunk]:
|
|
|
|
|
- return self.chunk_repository.list_filtered(
|
|
|
|
|
- knowledge_base_id=knowledge_base_id,
|
|
|
|
|
- document_id=document_id,
|
|
|
|
|
- keyword=keyword)
|
|
|
|
|
-
|
|
|
|
|
- def delete_chunk(self, *, chunk_id: str) -> bool:
|
|
|
|
|
- return self.chunk_repository.delete(chunk_id=chunk_id)
|
|
|
|
|
-
|
|
|
|
|
- def search(
|
|
|
|
|
- self,
|
|
|
|
|
- payload: KnowledgeSearchRequest) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]:
|
|
|
|
|
- document_cache: dict[str, KnowledgeDocument] = {}
|
|
|
|
|
- query_embedding_result = self.embedding_service.embed_text(payload.query)
|
|
|
|
|
- candidate_limit = max(
|
|
|
|
|
- payload.top_k * max(self.settings.retrieval_candidate_multiplier, 1),
|
|
|
|
|
- payload.top_k)
|
|
|
|
|
- vector_candidates = self.chunk_repository.search_by_vector(
|
|
|
|
|
- knowledge_base_id=payload.knowledge_base_id,
|
|
|
|
|
- embedding=query_embedding_result.embedding,
|
|
|
|
|
- limit=candidate_limit)
|
|
|
|
|
- if vector_candidates:
|
|
|
|
|
- chunks = [chunk for chunk, _ in vector_candidates]
|
|
|
|
|
- vector_scores_by_chunk_id = {
|
|
|
|
|
- chunk.id: score for chunk, score in vector_candidates
|
|
|
|
|
- }
|
|
|
|
|
- retrieval_mode = "pgvector-hybrid"
|
|
|
|
|
- else:
|
|
|
|
|
- chunks = self.chunk_repository.list_by_base(
|
|
|
|
|
- knowledge_base_id=payload.knowledge_base_id)
|
|
|
|
|
- vector_scores_by_chunk_id = {}
|
|
|
|
|
- retrieval_mode = "hybrid"
|
|
|
|
|
- # Resolve per-base retrieval weights with global fallback
|
|
|
|
|
- kb = self.base_repository.get_by_id(knowledge_base_id=payload.knowledge_base_id)
|
|
|
|
|
- retrieval_config = (kb.metadata_json or {}).get("retrieval_config", {}) if kb else {}
|
|
|
|
|
- keyword_weight = float(retrieval_config.get("keyword_weight", self.settings.retrieval_keyword_weight))
|
|
|
|
|
- vector_weight = float(retrieval_config.get("vector_weight", self.settings.retrieval_vector_weight))
|
|
|
|
|
- rerank_weight = float(retrieval_config.get("rerank_weight", self.settings.retrieval_rerank_weight))
|
|
|
|
|
- # Pre-compute BM25 collection stats from candidate chunks
|
|
|
|
|
- chunk_texts = [chunk.content_text for chunk in chunks]
|
|
|
|
|
- avg_doc_length, doc_count, df_map = compute_bm25_stats(chunk_texts)
|
|
|
|
|
- scored: list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]] = []
|
|
|
|
|
- for chunk in chunks:
|
|
|
|
|
- document = document_cache.get(chunk.document_id)
|
|
|
|
|
- if document is None:
|
|
|
|
|
- document = self.document_repository.get_by_id(
|
|
|
|
|
- document_id=chunk.document_id)
|
|
|
|
|
- if document is None:
|
|
|
|
|
- continue
|
|
|
|
|
- document_cache[chunk.document_id] = document
|
|
|
|
|
- if not self._matches_filters(document=document, filters_json=payload.filters_json):
|
|
|
|
|
- continue
|
|
|
|
|
- keyword = bm25_score(
|
|
|
|
|
- payload.query, chunk.content_text,
|
|
|
|
|
- avg_doc_length=avg_doc_length, doc_count=doc_count, df=df_map)
|
|
|
|
|
- vector = vector_scores_by_chunk_id.get(chunk.id)
|
|
|
|
|
- if vector is None:
|
|
|
|
|
- vector = cosine_similarity(query_embedding_result.embedding, chunk.embedding_json)
|
|
|
|
|
- rerank = (
|
|
|
|
|
- rerank_score(
|
|
|
|
|
- query=payload.query,
|
|
|
|
|
- chunk_text=chunk.content_text,
|
|
|
|
|
- document_title=document.title)
|
|
|
|
|
- if self.settings.retrieval_rerank_enabled
|
|
|
|
|
- else 0.0
|
|
|
|
|
- )
|
|
|
|
|
- score = round(
|
|
|
|
|
- keyword * keyword_weight
|
|
|
|
|
- + vector * vector_weight
|
|
|
|
|
- + rerank * rerank_weight,
|
|
|
|
|
- 6)
|
|
|
|
|
- scored.append(
|
|
|
|
|
- (
|
|
|
|
|
- chunk,
|
|
|
|
|
- document,
|
|
|
|
|
- score,
|
|
|
|
|
- {
|
|
|
|
|
- "final_score": score,
|
|
|
|
|
- "keyword_score": round(keyword, 6),
|
|
|
|
|
- "vector_score": round(vector, 6),
|
|
|
|
|
- "rerank_score": round(rerank, 6),
|
|
|
|
|
- "retrieval_mode": retrieval_mode,
|
|
|
|
|
- "rerank_enabled": self.settings.retrieval_rerank_enabled,
|
|
|
|
|
- "candidate_limit": candidate_limit,
|
|
|
|
|
- "weights": {
|
|
|
|
|
- "keyword": keyword_weight,
|
|
|
|
|
- "vector": vector_weight,
|
|
|
|
|
- "rerank": rerank_weight,
|
|
|
|
|
- },
|
|
|
|
|
- "embedding_provider": query_embedding_result.provider,
|
|
|
|
|
- "embedding_model": query_embedding_result.model,
|
|
|
|
|
- "citation": {
|
|
|
|
|
- "document_id": document.id,
|
|
|
|
|
- "document_title": document.title,
|
|
|
|
|
- "source_uri": document.source_uri,
|
|
|
|
|
- "chunk_id": chunk.id,
|
|
|
|
|
- "chunk_index": chunk.chunk_index,
|
|
|
|
|
- },
|
|
|
|
|
- })
|
|
|
|
|
- )
|
|
|
|
|
- scored.sort(key=lambda item: item[2], reverse=True)
|
|
|
|
|
- return scored[: payload.top_k]
|
|
|
|
|
-
|
|
|
|
|
- def _index_document(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- document: KnowledgeDocument,
|
|
|
|
|
- content_text: str,
|
|
|
|
|
- chunk_size: int | None,
|
|
|
|
|
- chunk_overlap: int | None) -> list[KnowledgeChunk]:
|
|
|
|
|
- source_type = document.source_type or "text"
|
|
|
|
|
- resolved_size = chunk_size or self.settings.default_chunk_size
|
|
|
|
|
- resolved_overlap = chunk_overlap or self.settings.default_chunk_overlap
|
|
|
|
|
- chunk_payloads = chunk_document(
|
|
|
|
|
- content_text=content_text,
|
|
|
|
|
- source_type=source_type,
|
|
|
|
|
- chunk_size=resolved_size,
|
|
|
|
|
- chunk_overlap=resolved_overlap)
|
|
|
|
|
- for chunk_payload in chunk_payloads:
|
|
|
|
|
- content_text = self._read_chunk_content(chunk_payload)
|
|
|
|
|
- embedding_result = self.embedding_service.embed_text(content_text)
|
|
|
|
|
- chunk_payload["embedding_model"] = embedding_result.model
|
|
|
|
|
- chunk_payload["embedding_json"] = embedding_result.embedding
|
|
|
|
|
- chunk_payload["metadata_json"] = {
|
|
|
|
|
- "embedding_provider": embedding_result.provider,
|
|
|
|
|
- }
|
|
|
|
|
- return self.chunk_repository.replace_document_chunks(
|
|
|
|
|
- knowledge_base_id=document.knowledge_base_id,
|
|
|
|
|
- document_id=document.id,
|
|
|
|
|
- chunks=chunk_payloads)
|
|
|
|
|
-
|
|
|
|
|
- def _read_chunk_content(self, chunk_payload: dict[str, JSONValue]) -> str:
|
|
|
|
|
- value = chunk_payload.get("content_text")
|
|
|
|
|
- return value if isinstance(value, str) else ""
|
|
|
|
|
-
|
|
|
|
|
- def _parse_document_for_indexing(self, *, document: KnowledgeDocument) -> ParsedDocument:
|
|
|
|
|
- metadata = document.metadata_json or {}
|
|
|
|
|
- object_metadata = metadata.get("object_storage")
|
|
|
|
|
- if isinstance(object_metadata, dict):
|
|
|
|
|
- object_key = object_metadata.get("objectKey")
|
|
|
|
|
- if isinstance(object_key, str) and object_key:
|
|
|
|
|
- try:
|
|
|
|
|
- raw_content = self.object_storage.get_bytes(object_key=object_key)
|
|
|
|
|
- except ObjectStorageNotFoundError as exc:
|
|
|
|
|
- raise ValueError(f"knowledge document content object not found: {document.id}") from exc
|
|
|
|
|
- return parse_document_content(
|
|
|
|
|
- source_type=document.source_type,
|
|
|
|
|
- source_uri=document.source_uri,
|
|
|
|
|
- content_base64=base64.b64encode(raw_content).decode("ascii"))
|
|
|
|
|
- if document.content_text:
|
|
|
|
|
- return parse_document_content(
|
|
|
|
|
- source_type=document.source_type,
|
|
|
|
|
- source_uri=document.source_uri,
|
|
|
|
|
- content_text=document.content_text)
|
|
|
|
|
- raise ValueError(f"knowledge document content object not found: {document.id}")
|
|
|
|
|
|
|
+ return self.crud_service.read_storage_health()
|
|
|
|
|
|
|
|
- def _read_document_content_for_indexing(self, *, document: KnowledgeDocument) -> str:
|
|
|
|
|
- return self._parse_document_for_indexing(document=document).content_text
|
|
|
|
|
|
|
+ # ── Chunk CRUD ────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
- def _read_document_raw_content(self, *, document: KnowledgeDocument) -> bytes:
|
|
|
|
|
- object_key = self._read_document_object_key(document=document)
|
|
|
|
|
- if isinstance(object_key, str) and object_key:
|
|
|
|
|
- return self.object_storage.get_bytes(object_key=object_key)
|
|
|
|
|
- if document.content_text:
|
|
|
|
|
- return document.content_text.encode("utf-8")
|
|
|
|
|
- raise ValueError(f"knowledge document content object not found: {document.id}")
|
|
|
|
|
|
|
+ def list_chunks_filtered(self, *, knowledge_base_id: str | None = None, document_id: str | None = None, keyword: str | None = None) -> list[KnowledgeChunk]:
|
|
|
|
|
+ return self.crud_service.list_chunks_filtered(
|
|
|
|
|
+ knowledge_base_id=knowledge_base_id, document_id=document_id, keyword=keyword)
|
|
|
|
|
|
|
|
- def _delete_document_object(self, *, document: KnowledgeDocument) -> bool:
|
|
|
|
|
- object_key = self._read_document_object_key(document=document)
|
|
|
|
|
- if object_key is None:
|
|
|
|
|
- return False
|
|
|
|
|
- try:
|
|
|
|
|
- return self.object_storage.delete_object(object_key=object_key)
|
|
|
|
|
- except ObjectStorageNotFoundError:
|
|
|
|
|
- return False
|
|
|
|
|
|
|
+ def delete_chunk(self, *, chunk_id: str) -> bool:
|
|
|
|
|
+ return self.crud_service.delete_chunk(chunk_id=chunk_id)
|
|
|
|
|
|
|
|
- def _read_document_object_key(self, *, document: KnowledgeDocument) -> str | None:
|
|
|
|
|
- object_metadata = self._read_object_storage_metadata(document=document)
|
|
|
|
|
- if object_metadata is None:
|
|
|
|
|
- return None
|
|
|
|
|
- object_key = object_metadata.get("objectKey")
|
|
|
|
|
- return object_key if isinstance(object_key, str) and object_key else None
|
|
|
|
|
|
|
+ # ── Parse ─────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
- def _read_object_storage_metadata(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- document: KnowledgeDocument) -> dict[str, JSONValue] | None:
|
|
|
|
|
- metadata = document.metadata_json or {}
|
|
|
|
|
- object_metadata = metadata.get("object_storage")
|
|
|
|
|
- return object_metadata if isinstance(object_metadata, dict) else None
|
|
|
|
|
|
|
+ def parse_document(self, payload: KnowledgeDocumentParseRequest):
|
|
|
|
|
+ return self.crud_service.parse_document(payload)
|
|
|
|
|
|
|
|
- def _object_status_to_payload(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- document: KnowledgeDocument,
|
|
|
|
|
- status: ObjectStorageStatus) -> dict[str, JSONValue]:
|
|
|
|
|
- return {
|
|
|
|
|
- "documentId": document.id,
|
|
|
|
|
- "exists": status.exists,
|
|
|
|
|
- "objectStorage": self._read_object_storage_metadata(document=document),
|
|
|
|
|
- "contentType": status.content_type,
|
|
|
|
|
- "sizeBytes": status.size_bytes,
|
|
|
|
|
- "etag": status.etag,
|
|
|
|
|
- "errorMessage": status.error_message,
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ # ── Indexing ──────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
- def _read_content_type_from_status(
|
|
|
|
|
- self,
|
|
|
|
|
- object_status: dict[str, JSONValue] | None) -> str | None:
|
|
|
|
|
- if object_status is None:
|
|
|
|
|
- return None
|
|
|
|
|
- content_type = object_status.get("contentType")
|
|
|
|
|
- return content_type if isinstance(content_type, str) else None
|
|
|
|
|
|
|
+ def create_document(self, payload: KnowledgeDocumentCreateRequest) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
|
|
|
|
|
+ return self.indexing_service.create_document(payload)
|
|
|
|
|
|
|
|
- def _is_text_content_type(self, *, content_type: str | None, source_type: str) -> bool:
|
|
|
|
|
- if content_type is not None and content_type.startswith("text/"):
|
|
|
|
|
- return True
|
|
|
|
|
- return source_type.strip().lower() in {"text", "txt", "markdown", "md", "html", "htm", "json", "csv"}
|
|
|
|
|
|
|
+ def create_document_from_contract(self, payload: KnowledgeDocumentCreateRequestDto) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
|
|
|
|
|
+ return self.indexing_service.create_document_from_contract(payload)
|
|
|
|
|
|
|
|
- def _should_queue_indexing(self, *, async_mode: bool | None) -> bool:
|
|
|
|
|
- if async_mode is False:
|
|
|
|
|
- return False
|
|
|
|
|
- if not self.settings.async_indexing_enabled and async_mode is None:
|
|
|
|
|
- return False
|
|
|
|
|
- return self.task_queue_publisher is not None
|
|
|
|
|
|
|
+ def create_document_from_contract_result(self, payload: KnowledgeDocumentCreateRequestDto) -> KnowledgeDocumentIngestResult:
|
|
|
|
|
+ return self.indexing_service.create_document_from_contract_result(payload)
|
|
|
|
|
|
|
|
- def _publish_document_index_job(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- document_id: str,
|
|
|
|
|
- action: str,
|
|
|
|
|
- job_id: str) -> bool:
|
|
|
|
|
- if self.task_queue_publisher is None:
|
|
|
|
|
- return False
|
|
|
|
|
- return self.task_queue_publisher.publish_knowledge_document(
|
|
|
|
|
- document_id=document_id,
|
|
|
|
|
- action=action,
|
|
|
|
|
- job_id=job_id)
|
|
|
|
|
|
|
+ def create_document_index_job(self, payload: KnowledgeDocumentCreateRequest) -> KnowledgeDocumentIngestResult:
|
|
|
|
|
+ return self.indexing_service.create_document_index_job(payload)
|
|
|
|
|
|
|
|
- def _write_index_job_metadata(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- document: KnowledgeDocument,
|
|
|
|
|
- action: str,
|
|
|
|
|
- job_id: str,
|
|
|
|
|
- status: str,
|
|
|
|
|
- progress: int,
|
|
|
|
|
- chunk_size: int | None = None,
|
|
|
|
|
- chunk_overlap: int | None = None,
|
|
|
|
|
- worker_key: str | None = None,
|
|
|
|
|
- started_time: datetime | None = None,
|
|
|
|
|
- completed_time: datetime | None = None,
|
|
|
|
|
- error_message: str | None = None) -> dict[str, JSONValue]:
|
|
|
|
|
- metadata = dict(document.metadata_json or {})
|
|
|
|
|
- existing_job = self._read_index_job_payload(document=document)
|
|
|
|
|
- queued_time = existing_job.get("queuedTime")
|
|
|
|
|
- if not isinstance(queued_time, str):
|
|
|
|
|
- queued_time = datetime.utcnow().isoformat()
|
|
|
|
|
- resolved_started_time = (
|
|
|
|
|
- started_time.isoformat()
|
|
|
|
|
- if started_time is not None
|
|
|
|
|
- else existing_job.get("startedTime")
|
|
|
|
|
- )
|
|
|
|
|
- resolved_completed_time = (
|
|
|
|
|
- completed_time.isoformat()
|
|
|
|
|
- if completed_time is not None
|
|
|
|
|
- else existing_job.get("completedTime")
|
|
|
|
|
- )
|
|
|
|
|
- job_payload: dict[str, JSONValue] = {
|
|
|
|
|
- "jobId": job_id,
|
|
|
|
|
- "documentId": document.id,
|
|
|
|
|
- "knowledgeBaseId": document.knowledge_base_id,
|
|
|
|
|
- "documentTitle": document.title,
|
|
|
|
|
- "action": action if action in {"index", "reindex"} else "reindex",
|
|
|
|
|
- "status": status,
|
|
|
|
|
- "progress": max(0, min(progress, 100)),
|
|
|
|
|
- "queueName": KNOWLEDGE_DOCUMENT_QUEUE,
|
|
|
|
|
- "workerKey": worker_key,
|
|
|
|
|
- "errorMessage": error_message,
|
|
|
|
|
- "chunkSize": chunk_size,
|
|
|
|
|
- "chunkOverlap": chunk_overlap,
|
|
|
|
|
- "queuedTime": queued_time,
|
|
|
|
|
- "startedTime": resolved_started_time if isinstance(resolved_started_time, str) else None,
|
|
|
|
|
- "completedTime": resolved_completed_time if isinstance(resolved_completed_time, str) else None,
|
|
|
|
|
- }
|
|
|
|
|
- metadata["index_job"] = job_payload
|
|
|
|
|
- return metadata
|
|
|
|
|
|
|
+ def queue_document_indexing(self, *, document: KnowledgeDocument, action: str, chunk_size: int | None = None, chunk_overlap: int | None = None) -> tuple[KnowledgeDocument, KnowledgeIndexJobData]:
|
|
|
|
|
+ return self.indexing_service.queue_document_indexing(
|
|
|
|
|
+ document=document, action=action, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
|
|
|
|
|
|
|
|
- def _read_latest_index_job(self, *, document: KnowledgeDocument) -> KnowledgeIndexJobData:
|
|
|
|
|
- payload = self._read_index_job_payload(document=document)
|
|
|
|
|
- return KnowledgeIndexJobData(
|
|
|
|
|
- jobId=self._read_payload_string(payload, "jobId") or f"kjob_{document.id}",
|
|
|
|
|
- documentId=self._read_payload_string(payload, "documentId") or document.id,
|
|
|
|
|
- knowledgeBaseId=self._read_payload_string(payload, "knowledgeBaseId") or document.knowledge_base_id,
|
|
|
|
|
- documentTitle=self._read_payload_string(payload, "documentTitle") or document.title,
|
|
|
|
|
- action=self._read_job_action(payload.get("action")),
|
|
|
|
|
- status=self._read_job_status(payload.get("status")),
|
|
|
|
|
- progress=self._read_payload_int(payload, "progress", 0),
|
|
|
|
|
- queueName=self._read_payload_string(payload, "queueName"),
|
|
|
|
|
- workerKey=self._read_payload_string(payload, "workerKey"),
|
|
|
|
|
- errorMessage=self._read_payload_string(payload, "errorMessage"),
|
|
|
|
|
- chunkSize=self._read_optional_payload_int(payload, "chunkSize"),
|
|
|
|
|
- chunkOverlap=self._read_optional_payload_int(payload, "chunkOverlap"),
|
|
|
|
|
- queuedTime=self._read_payload_datetime(payload, "queuedTime"),
|
|
|
|
|
- startedTime=self._read_payload_datetime(payload, "startedTime"),
|
|
|
|
|
- completedTime=self._read_payload_datetime(payload, "completedTime"))
|
|
|
|
|
|
|
+ def process_document_index_job(self, *, document_id: str, action: str, worker_key: str, job_id: str | None = None, chunk_size: int | None = None, chunk_overlap: int | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
|
|
|
|
|
+ return self.indexing_service.process_document_index_job(
|
|
|
|
|
+ document_id=document_id, action=action, worker_key=worker_key,
|
|
|
|
|
+ job_id=job_id, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
|
|
|
|
|
|
|
|
- def _try_read_latest_index_job(self, *, document: KnowledgeDocument) -> KnowledgeIndexJobData | None:
|
|
|
|
|
- if not self._read_index_job_payload(document=document):
|
|
|
|
|
- return None
|
|
|
|
|
- return self._read_latest_index_job(document=document)
|
|
|
|
|
|
|
+ def execute_document_index_job(self, *, document_id: str, action: str, worker_key: str, lease_seconds: int, job_id: str | None = None, redis_client: Redis | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
|
|
|
|
|
+ return self.indexing_service.execute_document_index_job(
|
|
|
|
|
+ document_id=document_id, action=action, worker_key=worker_key,
|
|
|
|
|
+ lease_seconds=lease_seconds, job_id=job_id, redis_client=redis_client)
|
|
|
|
|
|
|
|
- def _read_index_job_payload(self, *, document: KnowledgeDocument) -> dict[str, JSONValue]:
|
|
|
|
|
- metadata = document.metadata_json or {}
|
|
|
|
|
- value = metadata.get("index_job")
|
|
|
|
|
- if isinstance(value, dict):
|
|
|
|
|
- return {str(item_key): item_value for item_key, item_value in value.items()}
|
|
|
|
|
- return {}
|
|
|
|
|
|
|
+ def execute_next_pending_document_job(self, *, worker_key: str, lease_seconds: int, stale_indexing_seconds: int, redis_client: Redis | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
|
|
|
|
|
+ return self.indexing_service.execute_next_pending_document_job(
|
|
|
|
|
+ worker_key=worker_key, lease_seconds=lease_seconds,
|
|
|
|
|
+ stale_indexing_seconds=stale_indexing_seconds, redis_client=redis_client)
|
|
|
|
|
|
|
|
- def _read_payload_string(
|
|
|
|
|
- self,
|
|
|
|
|
- payload: dict[str, JSONValue],
|
|
|
|
|
- key: str) -> str | None:
|
|
|
|
|
- value = payload.get(key)
|
|
|
|
|
- return value if isinstance(value, str) and value else None
|
|
|
|
|
|
|
+ def list_index_jobs(self, *, knowledge_base_id: str | None = None, document_id: str | None = None, status: str | None = None) -> list[KnowledgeIndexJobData]:
|
|
|
|
|
+ return self.indexing_service.list_index_jobs(
|
|
|
|
|
+ knowledge_base_id=knowledge_base_id, document_id=document_id, status=status)
|
|
|
|
|
|
|
|
- def _read_payload_int(
|
|
|
|
|
- self,
|
|
|
|
|
- payload: dict[str, JSONValue],
|
|
|
|
|
- key: str,
|
|
|
|
|
- fallback: int) -> int:
|
|
|
|
|
- value = payload.get(key)
|
|
|
|
|
- if isinstance(value, int) and not isinstance(value, bool):
|
|
|
|
|
- return value
|
|
|
|
|
- return fallback
|
|
|
|
|
-
|
|
|
|
|
- def _read_optional_payload_int(
|
|
|
|
|
- self,
|
|
|
|
|
- payload: dict[str, JSONValue],
|
|
|
|
|
- key: str) -> int | None:
|
|
|
|
|
- value = payload.get(key)
|
|
|
|
|
- if isinstance(value, int) and not isinstance(value, bool):
|
|
|
|
|
- return value
|
|
|
|
|
- return None
|
|
|
|
|
|
|
+ def detail_index_job(self, *, document_id: str) -> KnowledgeIndexJobData | None:
|
|
|
|
|
+ return self.indexing_service.detail_index_job(document_id=document_id)
|
|
|
|
|
|
|
|
- def _read_payload_datetime(
|
|
|
|
|
- self,
|
|
|
|
|
- payload: dict[str, JSONValue],
|
|
|
|
|
- key: str) -> datetime | None:
|
|
|
|
|
- value = payload.get(key)
|
|
|
|
|
- if not isinstance(value, str) or not value:
|
|
|
|
|
- return None
|
|
|
|
|
- try:
|
|
|
|
|
- return datetime.fromisoformat(value)
|
|
|
|
|
- except ValueError:
|
|
|
|
|
- return None
|
|
|
|
|
|
|
+ def reindex_document(self, payload: KnowledgeDocumentReindexRequestDto) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
|
|
|
|
|
+ return self.indexing_service.reindex_document(payload)
|
|
|
|
|
|
|
|
- def _read_job_action(self, value: JSONValue) -> KnowledgeIndexJobAction:
|
|
|
|
|
- if isinstance(value, str) and value in {"index", "reindex"}:
|
|
|
|
|
- return cast(KnowledgeIndexJobAction, value)
|
|
|
|
|
- return "reindex"
|
|
|
|
|
|
|
+ def reindex_document_from_contract_result(self, payload: KnowledgeDocumentReindexRequestDto) -> KnowledgeDocumentIngestResult | None:
|
|
|
|
|
+ return self.indexing_service.reindex_document_from_contract_result(payload)
|
|
|
|
|
|
|
|
- def _read_job_status(self, value: JSONValue) -> KnowledgeIndexJobStatus:
|
|
|
|
|
- if isinstance(value, str) and value in {"queued", "running", "completed", "failed", "skipped"}:
|
|
|
|
|
- return cast(KnowledgeIndexJobStatus, value)
|
|
|
|
|
- return "queued"
|
|
|
|
|
|
|
+ def reindex_base_from_contract(self, payload: KnowledgeBaseReindexRequestDto) -> list[KnowledgeIndexJobData]:
|
|
|
|
|
+ return self.indexing_service.reindex_base_from_contract(payload)
|
|
|
|
|
|
|
|
- def _mark_document_failed(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- document: KnowledgeDocument,
|
|
|
|
|
- message: str,
|
|
|
|
|
- job_id: str | None = None,
|
|
|
|
|
- action: str = "reindex",
|
|
|
|
|
- worker_key: str | None = None,
|
|
|
|
|
- chunk_size: int | None = None,
|
|
|
|
|
- chunk_overlap: int | None = None) -> None:
|
|
|
|
|
- metadata = dict(document.metadata_json or {})
|
|
|
|
|
- metadata["last_error"] = {
|
|
|
|
|
- "message": message[:1000],
|
|
|
|
|
- "errorType": "indexing_failed",
|
|
|
|
|
- }
|
|
|
|
|
- if job_id is not None:
|
|
|
|
|
- metadata = self._write_index_job_metadata(
|
|
|
|
|
- document=document,
|
|
|
|
|
- action=action,
|
|
|
|
|
- job_id=job_id,
|
|
|
|
|
- status="failed",
|
|
|
|
|
- progress=100,
|
|
|
|
|
- chunk_size=chunk_size,
|
|
|
|
|
- chunk_overlap=chunk_overlap,
|
|
|
|
|
- worker_key=worker_key,
|
|
|
|
|
- completed_time=datetime.utcnow(),
|
|
|
|
|
- error_message=message[:1000])
|
|
|
|
|
- self.document_repository.update(
|
|
|
|
|
- document_id=document.id,
|
|
|
|
|
- status="failed",
|
|
|
|
|
- metadata_json=metadata)
|
|
|
|
|
|
|
+ # ── Search ────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
- def _guess_content_type(self, *, source_type: str) -> str:
|
|
|
|
|
- normalized = source_type.strip().lower().removeprefix(".")
|
|
|
|
|
- if normalized in {"markdown", "md"}:
|
|
|
|
|
- return "text/markdown; charset=utf-8"
|
|
|
|
|
- if normalized in {"html", "htm"}:
|
|
|
|
|
- return "text/html; charset=utf-8"
|
|
|
|
|
- if normalized == "json":
|
|
|
|
|
- return "application/json"
|
|
|
|
|
- if normalized == "csv":
|
|
|
|
|
- return "text/csv; charset=utf-8"
|
|
|
|
|
- if normalized == "pdf":
|
|
|
|
|
- return "application/pdf"
|
|
|
|
|
- if normalized in {"docx", "word"}:
|
|
|
|
|
- return "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
|
|
|
|
|
- return "text/plain; charset=utf-8"
|
|
|
|
|
|
|
+ def search(self, payload: KnowledgeSearchRequest) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]:
|
|
|
|
|
+ return self.search_service.search(payload)
|
|
|
|
|
|
|
|
- def _matches_filters(
|
|
|
|
|
- self,
|
|
|
|
|
- *,
|
|
|
|
|
- document: KnowledgeDocument,
|
|
|
|
|
- filters_json: dict[str, JSONValue]) -> bool:
|
|
|
|
|
- source_type = filters_json.get("sourceType") or filters_json.get("source_type")
|
|
|
|
|
- if isinstance(source_type, str) and document.source_type != source_type:
|
|
|
|
|
- return False
|
|
|
|
|
- status = filters_json.get("status")
|
|
|
|
|
- if isinstance(status, str) and document.status != status:
|
|
|
|
|
- return False
|
|
|
|
|
- return True
|
|
|
|
|
|
|
+ # ── Settings ──────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
def read_settings(self, *, knowledge_base_id: str | None = None) -> KnowledgeSettingsDto:
|
|
def read_settings(self, *, knowledge_base_id: str | None = None) -> KnowledgeSettingsDto:
|
|
|
- base_config: dict[str, JSONValue] = {}
|
|
|
|
|
- if knowledge_base_id:
|
|
|
|
|
- base = self.base_repository.get_by_id(knowledge_base_id=knowledge_base_id)
|
|
|
|
|
- if base is not None and isinstance(base.metadata_json, dict):
|
|
|
|
|
- value = base.metadata_json.get("retrieval_config")
|
|
|
|
|
- if isinstance(value, dict):
|
|
|
|
|
- base_config = value
|
|
|
|
|
- defaults = KnowledgeSettingsDto(
|
|
|
|
|
- knowledgeBaseId=knowledge_base_id,
|
|
|
|
|
- chunkSize=self.settings.default_chunk_size,
|
|
|
|
|
- chunkOverlap=self.settings.default_chunk_overlap,
|
|
|
|
|
- keywordWeight=self.settings.retrieval_keyword_weight,
|
|
|
|
|
- vectorWeight=self.settings.retrieval_vector_weight,
|
|
|
|
|
- rerankWeight=self.settings.retrieval_rerank_weight,
|
|
|
|
|
- queryRewrite=False,
|
|
|
|
|
- requireCitations=True)
|
|
|
|
|
- return KnowledgeSettingsDto.model_validate({
|
|
|
|
|
- **defaults.model_dump(),
|
|
|
|
|
- **base_config,
|
|
|
|
|
- "knowledgeBaseId": knowledge_base_id,
|
|
|
|
|
- })
|
|
|
|
|
|
|
+ return self.settings_service.read_settings(knowledge_base_id=knowledge_base_id)
|
|
|
|
|
|
|
|
- def update_settings(
|
|
|
|
|
- self,
|
|
|
|
|
- payload: KnowledgeSettingsUpdateRequestDto) -> KnowledgeSettingsDto:
|
|
|
|
|
- settings = KnowledgeSettingsDto.model_validate({
|
|
|
|
|
- **payload.model_dump(),
|
|
|
|
|
- "knowledgeBaseId": payload.knowledgeBaseId,
|
|
|
|
|
- })
|
|
|
|
|
- if payload.knowledgeBaseId:
|
|
|
|
|
- base = self.base_repository.get_by_id(knowledge_base_id=payload.knowledgeBaseId)
|
|
|
|
|
- if base is not None:
|
|
|
|
|
- metadata = dict(base.metadata_json or {})
|
|
|
|
|
- metadata["retrieval_config"] = settings.model_dump(exclude={"knowledgeBaseId"})
|
|
|
|
|
- self.base_repository.update(
|
|
|
|
|
- knowledge_base_id=payload.knowledgeBaseId,
|
|
|
|
|
- metadata_json=metadata)
|
|
|
|
|
- return settings
|
|
|
|
|
-
|
|
|
|
|
- def _build_base_code(self, name: str) -> str:
|
|
|
|
|
- base = "".join(
|
|
|
|
|
- char.lower() if char.isalnum() else "_"
|
|
|
|
|
- for char in name
|
|
|
|
|
- ).strip("_") or "knowledge_base"
|
|
|
|
|
- return base[:64]
|
|
|
|
|
|
|
+ def update_settings(self, payload: KnowledgeSettingsUpdateRequestDto) -> KnowledgeSettingsDto:
|
|
|
|
|
+ return self.settings_service.update_settings(payload)
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_knowledge_application_service(
|
|
def build_knowledge_application_service(
|
|
|
*,
|
|
*,
|
|
|
- db: Session,
|
|
|
|
|
- settings: KnowledgeServiceSettings) -> KnowledgeApplicationService:
|
|
|
|
|
|
|
+ db,
|
|
|
|
|
+ settings: KnowledgeServiceSettings,
|
|
|
|
|
+) -> KnowledgeApplicationService:
|
|
|
|
|
+ from app.domain.repositories import (
|
|
|
|
|
+ KnowledgeBaseRepository,
|
|
|
|
|
+ KnowledgeChunkRepository,
|
|
|
|
|
+ KnowledgeDocumentRepository,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
redis_client = try_build_redis_client(settings.redis_url)
|
|
redis_client = try_build_redis_client(settings.redis_url)
|
|
|
return KnowledgeApplicationService(
|
|
return KnowledgeApplicationService(
|
|
|
settings=settings,
|
|
settings=settings,
|