|
|
@@ -1,9 +1,23 @@
|
|
|
-from core_shared import JSONValue
|
|
|
+from __future__ import annotations
|
|
|
+
|
|
|
+import base64
|
|
|
+import hashlib
|
|
|
+from dataclasses import dataclass
|
|
|
+from datetime import datetime, timedelta
|
|
|
+from typing import TYPE_CHECKING, cast
|
|
|
+from uuid import uuid4
|
|
|
+
|
|
|
+from sqlalchemy.orm import Session
|
|
|
+
|
|
|
+from core_shared import JSONValue, try_build_redis_client
|
|
|
+from core_shared.task_queue import KNOWLEDGE_DOCUMENT_QUEUE, TaskQueuePublisher
|
|
|
|
|
|
from app.application.document_parsers import (
|
|
|
DocumentParseError,
|
|
|
ParsedDocument,
|
|
|
- parse_document_content)
|
|
|
+ normalize_source_type,
|
|
|
+ parse_document_content,
|
|
|
+ read_document_content_bytes)
|
|
|
from app.application.embeddings import EmbeddingService
|
|
|
from app.application.retrieval import (
|
|
|
build_chunk_payloads,
|
|
|
@@ -17,13 +31,48 @@ from app.domain.repositories import (
|
|
|
KnowledgeBaseRepository,
|
|
|
KnowledgeChunkRepository,
|
|
|
KnowledgeDocumentRepository)
|
|
|
+from app.infrastructure.object_storage import (
|
|
|
+ KnowledgeObjectStorage,
|
|
|
+ ObjectStorageError,
|
|
|
+ ObjectStorageNotFoundError,
|
|
|
+ ObjectStorageStatus,
|
|
|
+ build_document_object_key,
|
|
|
+ build_object_storage)
|
|
|
from app.schemas.knowledge import (
|
|
|
KnowledgeBaseCreateRequest,
|
|
|
+ KnowledgeBaseCreateRequestDto,
|
|
|
KnowledgeBaseStatusUpdateRequest,
|
|
|
+ KnowledgeBaseUpdateRequestDto,
|
|
|
+ KnowledgeBaseReindexRequestDto,
|
|
|
KnowledgeDocumentCreateRequest,
|
|
|
+ KnowledgeDocumentCreateRequestDto,
|
|
|
+ KnowledgeIndexJobAction,
|
|
|
+ KnowledgeIndexJobData,
|
|
|
+ KnowledgeIndexJobStatus,
|
|
|
KnowledgeDocumentParseRequest,
|
|
|
+ KnowledgeDocumentReindexRequestDto,
|
|
|
+ KnowledgeDocumentUpdateRequestDto,
|
|
|
+ KnowledgeSettingsDto,
|
|
|
+ KnowledgeSettingsUpdateRequestDto,
|
|
|
KnowledgeSearchRequest)
|
|
|
|
|
|
+if TYPE_CHECKING:
|
|
|
+ from redis import Redis
|
|
|
+
|
|
|
+
|
|
|
+@dataclass(frozen=True, slots=True)
|
|
|
+class KnowledgeDocumentIngestResult:
|
|
|
+ document: KnowledgeDocument
|
|
|
+ chunks: list[KnowledgeChunk]
|
|
|
+ queued: bool = False
|
|
|
+ job: KnowledgeIndexJobData | None = None
|
|
|
+
|
|
|
+
|
|
|
+class KnowledgeIndexingError(RuntimeError):
|
|
|
+ def __init__(self, *, document_id: str, message: str) -> None:
|
|
|
+ super().__init__(message)
|
|
|
+ self.document_id = document_id
|
|
|
+
|
|
|
|
|
|
class KnowledgeApplicationService:
|
|
|
def __init__(
|
|
|
@@ -32,12 +81,24 @@ class KnowledgeApplicationService:
|
|
|
settings: KnowledgeServiceSettings,
|
|
|
base_repository: KnowledgeBaseRepository,
|
|
|
document_repository: KnowledgeDocumentRepository,
|
|
|
- chunk_repository: KnowledgeChunkRepository) -> None:
|
|
|
+ chunk_repository: KnowledgeChunkRepository,
|
|
|
+ object_storage: KnowledgeObjectStorage | None = None,
|
|
|
+ redis_client: Redis | None = None,
|
|
|
+ task_queue_publisher: TaskQueuePublisher | None = None) -> None:
|
|
|
self.settings = settings
|
|
|
self.base_repository = base_repository
|
|
|
self.document_repository = document_repository
|
|
|
self.chunk_repository = chunk_repository
|
|
|
self.embedding_service = EmbeddingService(settings=settings)
|
|
|
+ self._object_storage = object_storage
|
|
|
+ self.redis_client = redis_client
|
|
|
+ self.task_queue_publisher = task_queue_publisher
|
|
|
+
|
|
|
+ @property
|
|
|
+ def object_storage(self) -> KnowledgeObjectStorage:
|
|
|
+ if self._object_storage is None:
|
|
|
+ self._object_storage = build_object_storage(self.settings)
|
|
|
+ return self._object_storage
|
|
|
|
|
|
def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
|
|
|
return self.base_repository.create(
|
|
|
@@ -46,9 +107,48 @@ class KnowledgeApplicationService:
|
|
|
description=payload.description,
|
|
|
metadata_json=payload.metadata_json)
|
|
|
|
|
|
+ def create_base_from_contract(
|
|
|
+ self,
|
|
|
+ payload: KnowledgeBaseCreateRequestDto) -> KnowledgeBase:
|
|
|
+ return self.create_base(
|
|
|
+ KnowledgeBaseCreateRequest(
|
|
|
+ code=self._build_base_code(payload.name),
|
|
|
+ name=payload.name,
|
|
|
+ description=payload.description,
|
|
|
+ metadata_json=payload.metadata))
|
|
|
+
|
|
|
def list_bases(self) -> list[KnowledgeBase]:
|
|
|
return self.base_repository.list_all()
|
|
|
|
|
|
+ def list_bases_filtered(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ keyword: str | None = None,
|
|
|
+ status: str | None = None) -> list[KnowledgeBase]:
|
|
|
+ return self.base_repository.list_filtered(
|
|
|
+ keyword=keyword,
|
|
|
+ status=status)
|
|
|
+
|
|
|
+ def update_base_from_contract(
|
|
|
+ self,
|
|
|
+ payload: KnowledgeBaseUpdateRequestDto) -> KnowledgeBase | None:
|
|
|
+ return self.base_repository.update(
|
|
|
+ knowledge_base_id=payload.knowledgeBaseId,
|
|
|
+ name=payload.name,
|
|
|
+ description=payload.description,
|
|
|
+ status=payload.status,
|
|
|
+ metadata_json=payload.metadata)
|
|
|
+
|
|
|
+ def delete_base(self, *, knowledge_base_id: str) -> bool:
|
|
|
+ documents = self.document_repository.list_by_base(
|
|
|
+ knowledge_base_id=knowledge_base_id)
|
|
|
+ for document in documents:
|
|
|
+ self._delete_document_object(document=document)
|
|
|
+ self.chunk_repository.delete_by_base(knowledge_base_id=knowledge_base_id)
|
|
|
+ for document in documents:
|
|
|
+ self.document_repository.delete(document_id=document.id)
|
|
|
+ return self.base_repository.delete(knowledge_base_id=knowledge_base_id)
|
|
|
+
|
|
|
def update_base_status(
|
|
|
self,
|
|
|
*,
|
|
|
@@ -73,27 +173,143 @@ class KnowledgeApplicationService:
|
|
|
content_text=payload.content_text,
|
|
|
content_base64=payload.content_base64)
|
|
|
)
|
|
|
- metadata_json = {
|
|
|
- **payload.metadata_json,
|
|
|
- "parser_metadata": parsed.metadata_json,
|
|
|
- }
|
|
|
- document = self.document_repository.create(
|
|
|
+ raw_content = read_document_content_bytes(
|
|
|
+ content_text=payload.content_text,
|
|
|
+ content_base64=payload.content_base64)
|
|
|
+ object_key = build_document_object_key(
|
|
|
knowledge_base_id=payload.knowledge_base_id,
|
|
|
- title=payload.title,
|
|
|
source_type=parsed.source_type,
|
|
|
- source_uri=payload.source_uri,
|
|
|
- content_text=parsed.content_text,
|
|
|
- content_hash=stable_content_hash(parsed.content_text),
|
|
|
- metadata_json=metadata_json)
|
|
|
- chunks = self._index_document(
|
|
|
+ title=payload.title)
|
|
|
+ stored_object = self.object_storage.put_bytes(
|
|
|
+ object_key=object_key,
|
|
|
+ content=raw_content,
|
|
|
+ content_type=self._guess_content_type(source_type=parsed.source_type))
|
|
|
+ document: KnowledgeDocument | None = None
|
|
|
+ try:
|
|
|
+ metadata_json = {
|
|
|
+ **payload.metadata_json,
|
|
|
+ "parser_metadata": parsed.metadata_json,
|
|
|
+ "object_storage": stored_object.to_metadata(),
|
|
|
+ }
|
|
|
+ document = self.document_repository.create(
|
|
|
+ knowledge_base_id=payload.knowledge_base_id,
|
|
|
+ title=payload.title,
|
|
|
+ source_type=parsed.source_type,
|
|
|
+ source_uri=payload.source_uri,
|
|
|
+ content_text="",
|
|
|
+ content_hash=stable_content_hash(parsed.content_text),
|
|
|
+ metadata_json=metadata_json)
|
|
|
+ try:
|
|
|
+ chunks = self._index_document(
|
|
|
+ document=document,
|
|
|
+ content_text=parsed.content_text,
|
|
|
+ chunk_size=payload.chunk_size,
|
|
|
+ chunk_overlap=payload.chunk_overlap)
|
|
|
+ except Exception as exc:
|
|
|
+ self._mark_document_failed(document=document, message=str(exc))
|
|
|
+ raise KnowledgeIndexingError(
|
|
|
+ document_id=document.id,
|
|
|
+ message=f"knowledge document indexing failed: {exc}") from exc
|
|
|
+ indexed_document = self.document_repository.update_status(
|
|
|
+ document_id=document.id,
|
|
|
+ status="indexed")
|
|
|
+ return indexed_document or document, chunks
|
|
|
+ except Exception:
|
|
|
+ if document is None:
|
|
|
+ try:
|
|
|
+ self.object_storage.delete_object(object_key=stored_object.object_key)
|
|
|
+ except ObjectStorageError:
|
|
|
+ pass
|
|
|
+ raise
|
|
|
+
|
|
|
+ def create_document_from_contract(
|
|
|
+ self,
|
|
|
+ payload: KnowledgeDocumentCreateRequestDto) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
|
|
|
+ return self.create_document(
|
|
|
+ KnowledgeDocumentCreateRequest(
|
|
|
+ knowledge_base_id=payload.knowledgeBaseId,
|
|
|
+ title=payload.title,
|
|
|
+ content_text=payload.contentText,
|
|
|
+ content_base64=payload.contentBase64,
|
|
|
+ source_type=payload.sourceType,
|
|
|
+ source_uri=payload.sourceUri,
|
|
|
+ metadata_json=payload.metadata,
|
|
|
+ chunk_size=payload.chunkSize,
|
|
|
+ chunk_overlap=payload.chunkOverlap))
|
|
|
+
|
|
|
+ def create_document_from_contract_result(
|
|
|
+ self,
|
|
|
+ payload: KnowledgeDocumentCreateRequestDto) -> KnowledgeDocumentIngestResult:
|
|
|
+ request = KnowledgeDocumentCreateRequest(
|
|
|
+ knowledge_base_id=payload.knowledgeBaseId,
|
|
|
+ title=payload.title,
|
|
|
+ content_text=payload.contentText,
|
|
|
+ content_base64=payload.contentBase64,
|
|
|
+ source_type=payload.sourceType,
|
|
|
+ source_uri=payload.sourceUri,
|
|
|
+ metadata_json=payload.metadata,
|
|
|
+ chunk_size=payload.chunkSize,
|
|
|
+ chunk_overlap=payload.chunkOverlap)
|
|
|
+ if self._should_queue_indexing(async_mode=payload.asyncMode):
|
|
|
+ return self.create_document_index_job(payload=request)
|
|
|
+ document, chunks = self.create_document(request)
|
|
|
+ return KnowledgeDocumentIngestResult(
|
|
|
document=document,
|
|
|
- content_text=parsed.content_text,
|
|
|
- chunk_size=payload.chunk_size,
|
|
|
- chunk_overlap=payload.chunk_overlap)
|
|
|
- indexed_document = self.document_repository.update_status(
|
|
|
- document_id=document.id,
|
|
|
- status="indexed")
|
|
|
- return indexed_document or document, chunks
|
|
|
+ chunks=chunks)
|
|
|
+
|
|
|
+ def create_document_index_job(
|
|
|
+ self,
|
|
|
+ payload: KnowledgeDocumentCreateRequest) -> KnowledgeDocumentIngestResult:
|
|
|
+ knowledge_base = self.base_repository.get_by_id(
|
|
|
+ knowledge_base_id=payload.knowledge_base_id)
|
|
|
+ if knowledge_base is None:
|
|
|
+ raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")
|
|
|
+
|
|
|
+ raw_content = read_document_content_bytes(
|
|
|
+ content_text=payload.content_text,
|
|
|
+ content_base64=payload.content_base64)
|
|
|
+ source_type = normalize_source_type(
|
|
|
+ source_type=payload.source_type,
|
|
|
+ source_uri=payload.source_uri)
|
|
|
+ object_key = build_document_object_key(
|
|
|
+ knowledge_base_id=payload.knowledge_base_id,
|
|
|
+ source_type=source_type,
|
|
|
+ title=payload.title)
|
|
|
+ stored_object = self.object_storage.put_bytes(
|
|
|
+ object_key=object_key,
|
|
|
+ content=raw_content,
|
|
|
+ content_type=self._guess_content_type(source_type=source_type))
|
|
|
+ document: KnowledgeDocument | None = None
|
|
|
+ try:
|
|
|
+ document = self.document_repository.create(
|
|
|
+ knowledge_base_id=payload.knowledge_base_id,
|
|
|
+ title=payload.title,
|
|
|
+ source_type=source_type,
|
|
|
+ source_uri=payload.source_uri,
|
|
|
+ content_text="",
|
|
|
+ content_hash=hashlib.sha256(raw_content).hexdigest(),
|
|
|
+ metadata_json={
|
|
|
+ **payload.metadata_json,
|
|
|
+ "object_storage": stored_object.to_metadata(),
|
|
|
+ },
|
|
|
+ status="draft")
|
|
|
+ queued_document, job = self.queue_document_indexing(
|
|
|
+ document=document,
|
|
|
+ action="index",
|
|
|
+ chunk_size=payload.chunk_size,
|
|
|
+ chunk_overlap=payload.chunk_overlap)
|
|
|
+ except Exception:
|
|
|
+ if document is None:
|
|
|
+ try:
|
|
|
+ self.object_storage.delete_object(object_key=stored_object.object_key)
|
|
|
+ except ObjectStorageError:
|
|
|
+ pass
|
|
|
+ raise
|
|
|
+ return KnowledgeDocumentIngestResult(
|
|
|
+ document=queued_document,
|
|
|
+ chunks=[],
|
|
|
+ queued=True,
|
|
|
+ job=job)
|
|
|
|
|
|
def parse_document(self, payload: KnowledgeDocumentParseRequest) -> ParsedDocument:
|
|
|
try:
|
|
|
@@ -105,6 +321,57 @@ class KnowledgeApplicationService:
|
|
|
except DocumentParseError:
|
|
|
raise
|
|
|
|
|
|
+ def queue_document_indexing(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ document: KnowledgeDocument,
|
|
|
+ action: str,
|
|
|
+ chunk_size: int | None = None,
|
|
|
+ chunk_overlap: int | None = None) -> tuple[KnowledgeDocument, KnowledgeIndexJobData]:
|
|
|
+ job_id = f"kjob_{uuid4().hex}"
|
|
|
+ metadata = self._write_index_job_metadata(
|
|
|
+ document=document,
|
|
|
+ action=action,
|
|
|
+ job_id=job_id,
|
|
|
+ status="queued",
|
|
|
+ progress=0,
|
|
|
+ chunk_size=chunk_size,
|
|
|
+ chunk_overlap=chunk_overlap)
|
|
|
+ updated_document = self.document_repository.update(
|
|
|
+ document_id=document.id,
|
|
|
+ status="queued",
|
|
|
+ metadata_json=metadata)
|
|
|
+ document_for_job = updated_document or document
|
|
|
+ published = self._publish_document_index_job(
|
|
|
+ document_id=document.id,
|
|
|
+ action=action,
|
|
|
+ job_id=job_id)
|
|
|
+ if not published:
|
|
|
+ metadata = self._write_index_job_metadata(
|
|
|
+ document=document_for_job,
|
|
|
+ action=action,
|
|
|
+ job_id=job_id,
|
|
|
+ status="running",
|
|
|
+ progress=1,
|
|
|
+ chunk_size=chunk_size,
|
|
|
+ chunk_overlap=chunk_overlap,
|
|
|
+ worker_key="inline-fallback")
|
|
|
+ document_for_job = self.document_repository.update(
|
|
|
+ document_id=document.id,
|
|
|
+ status="indexing",
|
|
|
+ metadata_json=metadata) or document_for_job
|
|
|
+ processed = self.process_document_index_job(
|
|
|
+ document_id=document.id,
|
|
|
+ action=action,
|
|
|
+ job_id=job_id,
|
|
|
+ worker_key="inline-fallback",
|
|
|
+ chunk_size=chunk_size,
|
|
|
+ chunk_overlap=chunk_overlap)
|
|
|
+ if processed is not None:
|
|
|
+ indexed_document, chunks = processed
|
|
|
+ return indexed_document, self._read_latest_index_job(document=indexed_document)
|
|
|
+ return document_for_job, self._read_latest_index_job(document=document_for_job)
|
|
|
+
|
|
|
def list_documents(
|
|
|
self,
|
|
|
*,
|
|
|
@@ -112,6 +379,378 @@ class KnowledgeApplicationService:
|
|
|
return self.document_repository.list_by_base(
|
|
|
knowledge_base_id=knowledge_base_id)
|
|
|
|
|
|
+ def list_documents_filtered(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ knowledge_base_id: str | None = None,
|
|
|
+ keyword: str | None = None,
|
|
|
+ status: str | None = None,
|
|
|
+ source_type: str | None = None) -> list[KnowledgeDocument]:
|
|
|
+ return self.document_repository.list_filtered(
|
|
|
+ knowledge_base_id=knowledge_base_id,
|
|
|
+ keyword=keyword,
|
|
|
+ status=status,
|
|
|
+ source_type=source_type)
|
|
|
+
|
|
|
+ def update_document_from_contract(
|
|
|
+ self,
|
|
|
+ payload: KnowledgeDocumentUpdateRequestDto) -> KnowledgeDocument | None:
|
|
|
+ return self.document_repository.update(
|
|
|
+ document_id=payload.documentId,
|
|
|
+ title=payload.title,
|
|
|
+ source_uri=payload.sourceUri,
|
|
|
+ status=payload.status,
|
|
|
+ metadata_json=payload.metadata)
|
|
|
+
|
|
|
+ def reindex_document(
|
|
|
+ self,
|
|
|
+ payload: KnowledgeDocumentReindexRequestDto) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
|
|
|
+ document = self.document_repository.get_by_id(document_id=payload.documentId)
|
|
|
+ if document is None:
|
|
|
+ return None
|
|
|
+ try:
|
|
|
+ parsed = self._parse_document_for_indexing(document=document)
|
|
|
+ chunks = self._index_document(
|
|
|
+ document=document,
|
|
|
+ content_text=parsed.content_text,
|
|
|
+ chunk_size=payload.chunkSize,
|
|
|
+ chunk_overlap=payload.chunkOverlap)
|
|
|
+ except Exception as exc:
|
|
|
+ self._mark_document_failed(document=document, message=str(exc))
|
|
|
+ raise KnowledgeIndexingError(
|
|
|
+ document_id=document.id,
|
|
|
+ message=f"knowledge document reindex failed: {exc}") from exc
|
|
|
+ metadata = dict(document.metadata_json or {})
|
|
|
+ metadata["parser_metadata"] = parsed.metadata_json
|
|
|
+ indexed_document = self.document_repository.update(
|
|
|
+ document_id=document.id,
|
|
|
+ status="indexed",
|
|
|
+ metadata_json=metadata)
|
|
|
+ return indexed_document or document, chunks
|
|
|
+
|
|
|
+ def reindex_document_from_contract_result(
|
|
|
+ self,
|
|
|
+ payload: KnowledgeDocumentReindexRequestDto) -> KnowledgeDocumentIngestResult | None:
|
|
|
+ if self._should_queue_indexing(async_mode=payload.asyncMode):
|
|
|
+ document = self.document_repository.get_by_id(document_id=payload.documentId)
|
|
|
+ if document is None:
|
|
|
+ return None
|
|
|
+ queued_document, job = self.queue_document_indexing(
|
|
|
+ document=document,
|
|
|
+ action="reindex",
|
|
|
+ chunk_size=payload.chunkSize,
|
|
|
+ chunk_overlap=payload.chunkOverlap)
|
|
|
+ return KnowledgeDocumentIngestResult(
|
|
|
+ document=queued_document,
|
|
|
+ chunks=[],
|
|
|
+ queued=True,
|
|
|
+ job=job)
|
|
|
+ result = self.reindex_document(payload)
|
|
|
+ if result is None:
|
|
|
+ return None
|
|
|
+ document, chunks = result
|
|
|
+ return KnowledgeDocumentIngestResult(
|
|
|
+ document=document,
|
|
|
+ chunks=chunks)
|
|
|
+
|
|
|
+ def reindex_base_from_contract(
|
|
|
+ self,
|
|
|
+ payload: KnowledgeBaseReindexRequestDto) -> list[KnowledgeIndexJobData]:
|
|
|
+ documents = self.document_repository.list_by_base(
|
|
|
+ knowledge_base_id=payload.knowledgeBaseId)
|
|
|
+ jobs: list[KnowledgeIndexJobData] = []
|
|
|
+ for document in documents:
|
|
|
+ if document.status == "archived":
|
|
|
+ continue
|
|
|
+ queued_document, job = self.queue_document_indexing(
|
|
|
+ document=document,
|
|
|
+ action="reindex",
|
|
|
+ chunk_size=payload.chunkSize,
|
|
|
+ chunk_overlap=payload.chunkOverlap)
|
|
|
+ jobs.append(job or self._read_latest_index_job(document=queued_document))
|
|
|
+ return jobs
|
|
|
+
|
|
|
+ def process_document_index_job(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ document_id: str,
|
|
|
+ action: str,
|
|
|
+ worker_key: str,
|
|
|
+ job_id: str | None = None,
|
|
|
+ chunk_size: int | None = None,
|
|
|
+ chunk_overlap: int | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
|
|
|
+ document = self.document_repository.get_by_id(document_id=document_id)
|
|
|
+ if document is None:
|
|
|
+ return None
|
|
|
+ resolved_job = self._read_latest_index_job(document=document)
|
|
|
+ resolved_job_id = job_id or resolved_job.jobId
|
|
|
+ resolved_chunk_size = chunk_size if chunk_size is not None else resolved_job.chunkSize
|
|
|
+ resolved_chunk_overlap = chunk_overlap if chunk_overlap is not None else resolved_job.chunkOverlap
|
|
|
+ if document.status == "archived":
|
|
|
+ metadata = self._write_index_job_metadata(
|
|
|
+ document=document,
|
|
|
+ action=action,
|
|
|
+ job_id=resolved_job_id,
|
|
|
+ status="skipped",
|
|
|
+ progress=100,
|
|
|
+ chunk_size=resolved_chunk_size,
|
|
|
+ chunk_overlap=resolved_chunk_overlap,
|
|
|
+ worker_key=worker_key,
|
|
|
+ completed_time=datetime.utcnow(),
|
|
|
+ error_message="document is archived")
|
|
|
+ skipped_document = self.document_repository.update(
|
|
|
+ document_id=document.id,
|
|
|
+ metadata_json=metadata) or document
|
|
|
+ return skipped_document, []
|
|
|
+
|
|
|
+ metadata = self._write_index_job_metadata(
|
|
|
+ document=document,
|
|
|
+ action=action,
|
|
|
+ job_id=resolved_job_id,
|
|
|
+ status="running",
|
|
|
+ progress=10,
|
|
|
+ chunk_size=resolved_chunk_size,
|
|
|
+ chunk_overlap=resolved_chunk_overlap,
|
|
|
+ worker_key=worker_key,
|
|
|
+ started_time=datetime.utcnow())
|
|
|
+ running_document = self.document_repository.update(
|
|
|
+ document_id=document.id,
|
|
|
+ status="indexing",
|
|
|
+ metadata_json=metadata) or document
|
|
|
+ try:
|
|
|
+ parsed = self._parse_document_for_indexing(document=running_document)
|
|
|
+ metadata = self._write_index_job_metadata(
|
|
|
+ document=running_document,
|
|
|
+ action=action,
|
|
|
+ job_id=resolved_job_id,
|
|
|
+ status="running",
|
|
|
+ progress=40,
|
|
|
+ chunk_size=resolved_chunk_size,
|
|
|
+ chunk_overlap=resolved_chunk_overlap,
|
|
|
+ worker_key=worker_key,
|
|
|
+ started_time=self._read_latest_index_job(document=running_document).startedTime)
|
|
|
+ metadata["parser_metadata"] = parsed.metadata_json
|
|
|
+ running_document = self.document_repository.update(
|
|
|
+ document_id=document.id,
|
|
|
+ status="indexing",
|
|
|
+ metadata_json=metadata) or running_document
|
|
|
+ chunks = self._index_document(
|
|
|
+ document=running_document,
|
|
|
+ content_text=parsed.content_text,
|
|
|
+ chunk_size=resolved_chunk_size,
|
|
|
+ chunk_overlap=resolved_chunk_overlap)
|
|
|
+ except Exception as exc:
|
|
|
+ self._mark_document_failed(
|
|
|
+ document=running_document,
|
|
|
+ message=str(exc),
|
|
|
+ job_id=resolved_job_id,
|
|
|
+ action=action,
|
|
|
+ worker_key=worker_key,
|
|
|
+ chunk_size=resolved_chunk_size,
|
|
|
+ chunk_overlap=resolved_chunk_overlap)
|
|
|
+ raise KnowledgeIndexingError(
|
|
|
+ document_id=document.id,
|
|
|
+ message=f"knowledge document {action} failed: {exc}") from exc
|
|
|
+
|
|
|
+ metadata = self._write_index_job_metadata(
|
|
|
+ document=running_document,
|
|
|
+ action=action,
|
|
|
+ job_id=resolved_job_id,
|
|
|
+ status="completed",
|
|
|
+ progress=100,
|
|
|
+ chunk_size=resolved_chunk_size,
|
|
|
+ chunk_overlap=resolved_chunk_overlap,
|
|
|
+ worker_key=worker_key,
|
|
|
+ completed_time=datetime.utcnow())
|
|
|
+ metadata["parser_metadata"] = parsed.metadata_json
|
|
|
+ indexed_document = self.document_repository.update(
|
|
|
+ document_id=document.id,
|
|
|
+ status="indexed",
|
|
|
+ metadata_json=metadata)
|
|
|
+ return indexed_document or running_document, chunks
|
|
|
+
|
|
|
+ def execute_document_index_job(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ document_id: str,
|
|
|
+ action: str,
|
|
|
+ worker_key: str,
|
|
|
+ lease_seconds: int,
|
|
|
+ job_id: str | None = None,
|
|
|
+ redis_client: Redis | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
|
|
|
+ resolved_redis_client = redis_client or self.redis_client
|
|
|
+ idempotency_key = f"{document_id}:{job_id or action}"
|
|
|
+ lock = None
|
|
|
+ idempotency_store = None
|
|
|
+ if resolved_redis_client is not None:
|
|
|
+ from core_shared.redis_primitives import DistributedLock, IdempotencyStore
|
|
|
+
|
|
|
+ lock = DistributedLock(
|
|
|
+ client=resolved_redis_client,
|
|
|
+ name=f"knowledge-document:{document_id}:lock",
|
|
|
+ ttl_seconds=lease_seconds)
|
|
|
+ if not lock.acquire():
|
|
|
+ return None
|
|
|
+ idempotency_store = IdempotencyStore(
|
|
|
+ client=resolved_redis_client,
|
|
|
+ prefix="knowledge-document-idempotency")
|
|
|
+ if not idempotency_store.begin(key=idempotency_key):
|
|
|
+ lock.release()
|
|
|
+ return None
|
|
|
+ try:
|
|
|
+ result = self.process_document_index_job(
|
|
|
+ document_id=document_id,
|
|
|
+ action=action,
|
|
|
+ job_id=job_id,
|
|
|
+ worker_key=worker_key)
|
|
|
+ if idempotency_store is not None and result is not None:
|
|
|
+ document, chunks = result
|
|
|
+ idempotency_store.complete(
|
|
|
+ key=idempotency_key,
|
|
|
+ result={
|
|
|
+ "status": document.status,
|
|
|
+ "document_id": document.id,
|
|
|
+ "chunk_count": len(chunks),
|
|
|
+ })
|
|
|
+ except Exception:
|
|
|
+ if idempotency_store is not None:
|
|
|
+ idempotency_store.clear(key=idempotency_key)
|
|
|
+ raise
|
|
|
+ finally:
|
|
|
+ if lock is not None:
|
|
|
+ lock.release()
|
|
|
+ return result
|
|
|
+
|
|
|
+ def execute_next_pending_document_job(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ worker_key: str,
|
|
|
+ lease_seconds: int,
|
|
|
+ stale_indexing_seconds: int,
|
|
|
+ redis_client: Redis | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
|
|
|
+ stale_before = datetime.utcnow() - timedelta(seconds=stale_indexing_seconds)
|
|
|
+ document = self.document_repository.get_next_pending_indexing(
|
|
|
+ stale_before=stale_before)
|
|
|
+ if document is None:
|
|
|
+ return None
|
|
|
+ job = self._read_latest_index_job(document=document)
|
|
|
+ return self.execute_document_index_job(
|
|
|
+ document_id=document.id,
|
|
|
+ action=job.action,
|
|
|
+ job_id=job.jobId,
|
|
|
+ worker_key=worker_key,
|
|
|
+ lease_seconds=lease_seconds,
|
|
|
+ redis_client=redis_client)
|
|
|
+
|
|
|
+ def list_index_jobs(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ knowledge_base_id: str | None = None,
|
|
|
+ document_id: str | None = None,
|
|
|
+ status: str | None = None) -> list[KnowledgeIndexJobData]:
|
|
|
+ documents = self.document_repository.list_filtered(
|
|
|
+ knowledge_base_id=knowledge_base_id)
|
|
|
+ jobs: list[KnowledgeIndexJobData] = []
|
|
|
+ for document in documents:
|
|
|
+ if document_id is not None and document.id != document_id:
|
|
|
+ continue
|
|
|
+ job = self._try_read_latest_index_job(document=document)
|
|
|
+ if job is None:
|
|
|
+ continue
|
|
|
+ if status is not None and job.status != status:
|
|
|
+ continue
|
|
|
+ jobs.append(job)
|
|
|
+ jobs.sort(
|
|
|
+ key=lambda item: item.queuedTime or datetime.min,
|
|
|
+ reverse=True)
|
|
|
+ return jobs
|
|
|
+
|
|
|
+ def detail_index_job(self, *, document_id: str) -> KnowledgeIndexJobData | None:
|
|
|
+ document = self.document_repository.get_by_id(document_id=document_id)
|
|
|
+ if document is None:
|
|
|
+ return None
|
|
|
+ return self._try_read_latest_index_job(document=document)
|
|
|
+
|
|
|
+ def delete_document(self, *, document_id: str) -> bool:
|
|
|
+ return bool(self.delete_document_result(document_id=document_id)["deleted"])
|
|
|
+
|
|
|
+ def delete_document_result(self, *, document_id: str) -> dict[str, JSONValue]:
|
|
|
+ document = self.document_repository.get_by_id(document_id=document_id)
|
|
|
+ if document is None:
|
|
|
+ return {
|
|
|
+ "deleted": False,
|
|
|
+ "objectDeleted": False,
|
|
|
+ "documentId": document_id,
|
|
|
+ }
|
|
|
+ object_deleted = self._delete_document_object(document=document)
|
|
|
+ self.chunk_repository.delete_by_document(document_id=document_id)
|
|
|
+ deleted = self.document_repository.delete(document_id=document_id) is not None
|
|
|
+ return {
|
|
|
+ "deleted": deleted,
|
|
|
+ "objectDeleted": object_deleted,
|
|
|
+ "documentId": document_id,
|
|
|
+ }
|
|
|
+
|
|
|
+ def read_document_content(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ document_id: str,
|
|
|
+ include_text: bool = True,
|
|
|
+ include_base64: bool = False) -> dict[str, JSONValue] | None:
|
|
|
+ document = self.document_repository.get_by_id(document_id=document_id)
|
|
|
+ if document is None:
|
|
|
+ return None
|
|
|
+ raw_content = self._read_document_raw_content(document=document)
|
|
|
+ object_status = self.read_document_storage_status(document_id=document_id)
|
|
|
+ content_type = self._read_content_type_from_status(object_status)
|
|
|
+ payload: dict[str, JSONValue] = {
|
|
|
+ "documentId": document.id,
|
|
|
+ "title": document.title,
|
|
|
+ "sourceType": document.source_type,
|
|
|
+ "contentType": content_type,
|
|
|
+ "sizeBytes": len(raw_content),
|
|
|
+ "objectStorage": self._read_object_storage_metadata(document=document),
|
|
|
+ "contentBase64": None,
|
|
|
+ "contentText": None,
|
|
|
+ }
|
|
|
+ if include_base64:
|
|
|
+ payload["contentBase64"] = base64.b64encode(raw_content).decode("ascii")
|
|
|
+ if include_text and self._is_text_content_type(content_type=content_type, source_type=document.source_type):
|
|
|
+ payload["contentText"] = raw_content.decode("utf-8", errors="replace")
|
|
|
+ return payload
|
|
|
+
|
|
|
+ def read_document_storage_status(self, *, document_id: str) -> dict[str, JSONValue] | None:
|
|
|
+ document = self.document_repository.get_by_id(document_id=document_id)
|
|
|
+ if document is None:
|
|
|
+ return None
|
|
|
+ object_key = self._read_document_object_key(document=document)
|
|
|
+ if object_key is None:
|
|
|
+ return {
|
|
|
+ "documentId": document.id,
|
|
|
+ "exists": False,
|
|
|
+ "objectStorage": None,
|
|
|
+ "errorMessage": "document object metadata is missing",
|
|
|
+ }
|
|
|
+ status = self.object_storage.head_object(object_key=object_key)
|
|
|
+ return self._object_status_to_payload(document=document, status=status)
|
|
|
+
|
|
|
+ def read_storage_health(self) -> dict[str, JSONValue]:
|
|
|
+ return dict(self.object_storage.health_check())
|
|
|
+
|
|
|
+ def list_chunks_filtered(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ knowledge_base_id: str | None = None,
|
|
|
+ document_id: str | None = None,
|
|
|
+ keyword: str | None = None) -> list[KnowledgeChunk]:
|
|
|
+ return self.chunk_repository.list_filtered(
|
|
|
+ knowledge_base_id=knowledge_base_id,
|
|
|
+ document_id=document_id,
|
|
|
+ keyword=keyword)
|
|
|
+
|
|
|
+ def delete_chunk(self, *, chunk_id: str) -> bool:
|
|
|
+ return self.chunk_repository.delete(chunk_id=chunk_id)
|
|
|
+
|
|
|
def search(
|
|
|
self,
|
|
|
payload: KnowledgeSearchRequest) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]:
|
|
|
@@ -223,15 +862,358 @@ class KnowledgeApplicationService:
|
|
|
value = chunk_payload.get("content_text")
|
|
|
return value if isinstance(value, str) else ""
|
|
|
|
|
|
+ def _parse_document_for_indexing(self, *, document: KnowledgeDocument) -> ParsedDocument:
|
|
|
+ metadata = document.metadata_json or {}
|
|
|
+ object_metadata = metadata.get("object_storage")
|
|
|
+ if isinstance(object_metadata, dict):
|
|
|
+ object_key = object_metadata.get("objectKey")
|
|
|
+ if isinstance(object_key, str) and object_key:
|
|
|
+ try:
|
|
|
+ raw_content = self.object_storage.get_bytes(object_key=object_key)
|
|
|
+ except ObjectStorageNotFoundError as exc:
|
|
|
+ raise ValueError(f"knowledge document content object not found: {document.id}") from exc
|
|
|
+ return parse_document_content(
|
|
|
+ source_type=document.source_type,
|
|
|
+ source_uri=document.source_uri,
|
|
|
+ content_base64=base64.b64encode(raw_content).decode("ascii"))
|
|
|
+ if document.content_text:
|
|
|
+ return parse_document_content(
|
|
|
+ source_type=document.source_type,
|
|
|
+ source_uri=document.source_uri,
|
|
|
+ content_text=document.content_text)
|
|
|
+ raise ValueError(f"knowledge document content object not found: {document.id}")
|
|
|
+
|
|
|
+ def _read_document_content_for_indexing(self, *, document: KnowledgeDocument) -> str:
|
|
|
+ return self._parse_document_for_indexing(document=document).content_text
|
|
|
+
|
|
|
+ def _read_document_raw_content(self, *, document: KnowledgeDocument) -> bytes:
|
|
|
+ object_key = self._read_document_object_key(document=document)
|
|
|
+ if isinstance(object_key, str) and object_key:
|
|
|
+ return self.object_storage.get_bytes(object_key=object_key)
|
|
|
+ if document.content_text:
|
|
|
+ return document.content_text.encode("utf-8")
|
|
|
+ raise ValueError(f"knowledge document content object not found: {document.id}")
|
|
|
+
|
|
|
+ def _delete_document_object(self, *, document: KnowledgeDocument) -> bool:
|
|
|
+ object_key = self._read_document_object_key(document=document)
|
|
|
+ if object_key is None:
|
|
|
+ return False
|
|
|
+ try:
|
|
|
+ return self.object_storage.delete_object(object_key=object_key)
|
|
|
+ except ObjectStorageNotFoundError:
|
|
|
+ return False
|
|
|
+
|
|
|
+ def _read_document_object_key(self, *, document: KnowledgeDocument) -> str | None:
|
|
|
+ object_metadata = self._read_object_storage_metadata(document=document)
|
|
|
+ if object_metadata is None:
|
|
|
+ return None
|
|
|
+ object_key = object_metadata.get("objectKey")
|
|
|
+ return object_key if isinstance(object_key, str) and object_key else None
|
|
|
+
|
|
|
+ def _read_object_storage_metadata(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ document: KnowledgeDocument) -> dict[str, JSONValue] | None:
|
|
|
+ metadata = document.metadata_json or {}
|
|
|
+ object_metadata = metadata.get("object_storage")
|
|
|
+ return object_metadata if isinstance(object_metadata, dict) else None
|
|
|
+
|
|
|
+ def _object_status_to_payload(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ document: KnowledgeDocument,
|
|
|
+ status: ObjectStorageStatus) -> dict[str, JSONValue]:
|
|
|
+ return {
|
|
|
+ "documentId": document.id,
|
|
|
+ "exists": status.exists,
|
|
|
+ "objectStorage": self._read_object_storage_metadata(document=document),
|
|
|
+ "contentType": status.content_type,
|
|
|
+ "sizeBytes": status.size_bytes,
|
|
|
+ "etag": status.etag,
|
|
|
+ "errorMessage": status.error_message,
|
|
|
+ }
|
|
|
+
|
|
|
+ def _read_content_type_from_status(
|
|
|
+ self,
|
|
|
+ object_status: dict[str, JSONValue] | None) -> str | None:
|
|
|
+ if object_status is None:
|
|
|
+ return None
|
|
|
+ content_type = object_status.get("contentType")
|
|
|
+ return content_type if isinstance(content_type, str) else None
|
|
|
+
|
|
|
+ def _is_text_content_type(self, *, content_type: str | None, source_type: str) -> bool:
|
|
|
+ if content_type is not None and content_type.startswith("text/"):
|
|
|
+ return True
|
|
|
+ return source_type.strip().lower() in {"text", "txt", "markdown", "md", "html", "htm", "json", "csv"}
|
|
|
+
|
|
|
+ def _should_queue_indexing(self, *, async_mode: bool | None) -> bool:
|
|
|
+ if async_mode is False:
|
|
|
+ return False
|
|
|
+ if not self.settings.async_indexing_enabled and async_mode is None:
|
|
|
+ return False
|
|
|
+ return self.task_queue_publisher is not None
|
|
|
+
|
|
|
+ def _publish_document_index_job(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ document_id: str,
|
|
|
+ action: str,
|
|
|
+ job_id: str) -> bool:
|
|
|
+ if self.task_queue_publisher is None:
|
|
|
+ return False
|
|
|
+ return self.task_queue_publisher.publish_knowledge_document(
|
|
|
+ document_id=document_id,
|
|
|
+ action=action,
|
|
|
+ job_id=job_id)
|
|
|
+
|
|
|
+ def _write_index_job_metadata(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ document: KnowledgeDocument,
|
|
|
+ action: str,
|
|
|
+ job_id: str,
|
|
|
+ status: str,
|
|
|
+ progress: int,
|
|
|
+ chunk_size: int | None = None,
|
|
|
+ chunk_overlap: int | None = None,
|
|
|
+ worker_key: str | None = None,
|
|
|
+ started_time: datetime | None = None,
|
|
|
+ completed_time: datetime | None = None,
|
|
|
+ error_message: str | None = None) -> dict[str, JSONValue]:
|
|
|
+ metadata = dict(document.metadata_json or {})
|
|
|
+ existing_job = self._read_index_job_payload(document=document)
|
|
|
+ queued_time = existing_job.get("queuedTime")
|
|
|
+ if not isinstance(queued_time, str):
|
|
|
+ queued_time = datetime.utcnow().isoformat()
|
|
|
+ resolved_started_time = (
|
|
|
+ started_time.isoformat()
|
|
|
+ if started_time is not None
|
|
|
+ else existing_job.get("startedTime")
|
|
|
+ )
|
|
|
+ resolved_completed_time = (
|
|
|
+ completed_time.isoformat()
|
|
|
+ if completed_time is not None
|
|
|
+ else existing_job.get("completedTime")
|
|
|
+ )
|
|
|
+ job_payload: dict[str, JSONValue] = {
|
|
|
+ "jobId": job_id,
|
|
|
+ "documentId": document.id,
|
|
|
+ "knowledgeBaseId": document.knowledge_base_id,
|
|
|
+ "documentTitle": document.title,
|
|
|
+ "action": action if action in {"index", "reindex"} else "reindex",
|
|
|
+ "status": status,
|
|
|
+ "progress": max(0, min(progress, 100)),
|
|
|
+ "queueName": KNOWLEDGE_DOCUMENT_QUEUE,
|
|
|
+ "workerKey": worker_key,
|
|
|
+ "errorMessage": error_message,
|
|
|
+ "chunkSize": chunk_size,
|
|
|
+ "chunkOverlap": chunk_overlap,
|
|
|
+ "queuedTime": queued_time,
|
|
|
+ "startedTime": resolved_started_time if isinstance(resolved_started_time, str) else None,
|
|
|
+ "completedTime": resolved_completed_time if isinstance(resolved_completed_time, str) else None,
|
|
|
+ }
|
|
|
+ metadata["index_job"] = job_payload
|
|
|
+ return metadata
|
|
|
+
|
|
|
+ def _read_latest_index_job(self, *, document: KnowledgeDocument) -> KnowledgeIndexJobData:
|
|
|
+ payload = self._read_index_job_payload(document=document)
|
|
|
+ return KnowledgeIndexJobData(
|
|
|
+ jobId=self._read_payload_string(payload, "jobId") or f"kjob_{document.id}",
|
|
|
+ documentId=self._read_payload_string(payload, "documentId") or document.id,
|
|
|
+ knowledgeBaseId=self._read_payload_string(payload, "knowledgeBaseId") or document.knowledge_base_id,
|
|
|
+ documentTitle=self._read_payload_string(payload, "documentTitle") or document.title,
|
|
|
+ action=self._read_job_action(payload.get("action")),
|
|
|
+ status=self._read_job_status(payload.get("status")),
|
|
|
+ progress=self._read_payload_int(payload, "progress", 0),
|
|
|
+ queueName=self._read_payload_string(payload, "queueName"),
|
|
|
+ workerKey=self._read_payload_string(payload, "workerKey"),
|
|
|
+ errorMessage=self._read_payload_string(payload, "errorMessage"),
|
|
|
+ chunkSize=self._read_optional_payload_int(payload, "chunkSize"),
|
|
|
+ chunkOverlap=self._read_optional_payload_int(payload, "chunkOverlap"),
|
|
|
+ queuedTime=self._read_payload_datetime(payload, "queuedTime"),
|
|
|
+ startedTime=self._read_payload_datetime(payload, "startedTime"),
|
|
|
+ completedTime=self._read_payload_datetime(payload, "completedTime"))
|
|
|
+
|
|
|
+ def _try_read_latest_index_job(self, *, document: KnowledgeDocument) -> KnowledgeIndexJobData | None:
|
|
|
+ if not self._read_index_job_payload(document=document):
|
|
|
+ return None
|
|
|
+ return self._read_latest_index_job(document=document)
|
|
|
+
|
|
|
+ def _read_index_job_payload(self, *, document: KnowledgeDocument) -> dict[str, JSONValue]:
|
|
|
+ metadata = document.metadata_json or {}
|
|
|
+ value = metadata.get("index_job")
|
|
|
+ if isinstance(value, dict):
|
|
|
+ return {str(item_key): item_value for item_key, item_value in value.items()}
|
|
|
+ return {}
|
|
|
+
|
|
|
+ def _read_payload_string(
|
|
|
+ self,
|
|
|
+ payload: dict[str, JSONValue],
|
|
|
+ key: str) -> str | None:
|
|
|
+ value = payload.get(key)
|
|
|
+ return value if isinstance(value, str) and value else None
|
|
|
+
|
|
|
+ def _read_payload_int(
|
|
|
+ self,
|
|
|
+ payload: dict[str, JSONValue],
|
|
|
+ key: str,
|
|
|
+ fallback: int) -> int:
|
|
|
+ value = payload.get(key)
|
|
|
+ if isinstance(value, int) and not isinstance(value, bool):
|
|
|
+ return value
|
|
|
+ return fallback
|
|
|
+
|
|
|
+ def _read_optional_payload_int(
|
|
|
+ self,
|
|
|
+ payload: dict[str, JSONValue],
|
|
|
+ key: str) -> int | None:
|
|
|
+ value = payload.get(key)
|
|
|
+ if isinstance(value, int) and not isinstance(value, bool):
|
|
|
+ return value
|
|
|
+ return None
|
|
|
+
|
|
|
+ def _read_payload_datetime(
|
|
|
+ self,
|
|
|
+ payload: dict[str, JSONValue],
|
|
|
+ key: str) -> datetime | None:
|
|
|
+ value = payload.get(key)
|
|
|
+ if not isinstance(value, str) or not value:
|
|
|
+ return None
|
|
|
+ try:
|
|
|
+ return datetime.fromisoformat(value)
|
|
|
+ except ValueError:
|
|
|
+ return None
|
|
|
+
|
|
|
+ def _read_job_action(self, value: JSONValue) -> KnowledgeIndexJobAction:
|
|
|
+ if isinstance(value, str) and value in {"index", "reindex"}:
|
|
|
+ return cast(KnowledgeIndexJobAction, value)
|
|
|
+ return "reindex"
|
|
|
+
|
|
|
+ def _read_job_status(self, value: JSONValue) -> KnowledgeIndexJobStatus:
|
|
|
+ if isinstance(value, str) and value in {"queued", "running", "completed", "failed", "skipped"}:
|
|
|
+ return cast(KnowledgeIndexJobStatus, value)
|
|
|
+ return "queued"
|
|
|
+
|
|
|
+ def _mark_document_failed(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ document: KnowledgeDocument,
|
|
|
+ message: str,
|
|
|
+ job_id: str | None = None,
|
|
|
+ action: str = "reindex",
|
|
|
+ worker_key: str | None = None,
|
|
|
+ chunk_size: int | None = None,
|
|
|
+ chunk_overlap: int | None = None) -> None:
|
|
|
+ metadata = dict(document.metadata_json or {})
|
|
|
+ metadata["last_error"] = {
|
|
|
+ "message": message[:1000],
|
|
|
+ "errorType": "indexing_failed",
|
|
|
+ }
|
|
|
+ if job_id is not None:
|
|
|
+ metadata = self._write_index_job_metadata(
|
|
|
+ document=document,
|
|
|
+ action=action,
|
|
|
+ job_id=job_id,
|
|
|
+ status="failed",
|
|
|
+ progress=100,
|
|
|
+ chunk_size=chunk_size,
|
|
|
+ chunk_overlap=chunk_overlap,
|
|
|
+ worker_key=worker_key,
|
|
|
+ completed_time=datetime.utcnow(),
|
|
|
+ error_message=message[:1000])
|
|
|
+ self.document_repository.update(
|
|
|
+ document_id=document.id,
|
|
|
+ status="failed",
|
|
|
+ metadata_json=metadata)
|
|
|
+
|
|
|
+ def _guess_content_type(self, *, source_type: str) -> str:
|
|
|
+ normalized = source_type.strip().lower().removeprefix(".")
|
|
|
+ if normalized in {"markdown", "md"}:
|
|
|
+ return "text/markdown; charset=utf-8"
|
|
|
+ if normalized in {"html", "htm"}:
|
|
|
+ return "text/html; charset=utf-8"
|
|
|
+ if normalized == "json":
|
|
|
+ return "application/json"
|
|
|
+ if normalized == "csv":
|
|
|
+ return "text/csv; charset=utf-8"
|
|
|
+ if normalized == "pdf":
|
|
|
+ return "application/pdf"
|
|
|
+ if normalized in {"docx", "word"}:
|
|
|
+ return "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
|
|
|
+ return "text/plain; charset=utf-8"
|
|
|
+
|
|
|
def _matches_filters(
|
|
|
self,
|
|
|
*,
|
|
|
document: KnowledgeDocument,
|
|
|
filters_json: dict[str, JSONValue]) -> bool:
|
|
|
- source_type = filters_json.get("source_type")
|
|
|
+ source_type = filters_json.get("sourceType") or filters_json.get("source_type")
|
|
|
if isinstance(source_type, str) and document.source_type != source_type:
|
|
|
return False
|
|
|
status = filters_json.get("status")
|
|
|
if isinstance(status, str) and document.status != status:
|
|
|
return False
|
|
|
return True
|
|
|
+
|
|
|
+ def read_settings(self, *, knowledge_base_id: str | None = None) -> KnowledgeSettingsDto:
|
|
|
+ base_config: dict[str, JSONValue] = {}
|
|
|
+ if knowledge_base_id:
|
|
|
+ base = self.base_repository.get_by_id(knowledge_base_id=knowledge_base_id)
|
|
|
+ if base is not None and isinstance(base.metadata_json, dict):
|
|
|
+ value = base.metadata_json.get("retrieval_config")
|
|
|
+ if isinstance(value, dict):
|
|
|
+ base_config = value
|
|
|
+ defaults = KnowledgeSettingsDto(
|
|
|
+ knowledgeBaseId=knowledge_base_id,
|
|
|
+ chunkSize=self.settings.default_chunk_size,
|
|
|
+ chunkOverlap=self.settings.default_chunk_overlap,
|
|
|
+ keywordWeight=self.settings.retrieval_keyword_weight,
|
|
|
+ vectorWeight=self.settings.retrieval_vector_weight,
|
|
|
+ rerankWeight=self.settings.retrieval_rerank_weight,
|
|
|
+ queryRewrite=False,
|
|
|
+ requireCitations=True)
|
|
|
+ return KnowledgeSettingsDto.model_validate({
|
|
|
+ **defaults.model_dump(),
|
|
|
+ **base_config,
|
|
|
+ "knowledgeBaseId": knowledge_base_id,
|
|
|
+ })
|
|
|
+
|
|
|
+ def update_settings(
|
|
|
+ self,
|
|
|
+ payload: KnowledgeSettingsUpdateRequestDto) -> KnowledgeSettingsDto:
|
|
|
+ settings = KnowledgeSettingsDto.model_validate({
|
|
|
+ **payload.model_dump(),
|
|
|
+ "knowledgeBaseId": payload.knowledgeBaseId,
|
|
|
+ })
|
|
|
+ if payload.knowledgeBaseId:
|
|
|
+ base = self.base_repository.get_by_id(knowledge_base_id=payload.knowledgeBaseId)
|
|
|
+ if base is not None:
|
|
|
+ metadata = dict(base.metadata_json or {})
|
|
|
+ metadata["retrieval_config"] = settings.model_dump(exclude={"knowledgeBaseId"})
|
|
|
+ self.base_repository.update(
|
|
|
+ knowledge_base_id=payload.knowledgeBaseId,
|
|
|
+ metadata_json=metadata)
|
|
|
+ return settings
|
|
|
+
|
|
|
+ def _build_base_code(self, name: str) -> str:
|
|
|
+ base = "".join(
|
|
|
+ char.lower() if char.isalnum() else "_"
|
|
|
+ for char in name
|
|
|
+ ).strip("_") or "knowledge_base"
|
|
|
+ return base[:64]
|
|
|
+
|
|
|
+
|
|
|
+def build_knowledge_application_service(
|
|
|
+ *,
|
|
|
+ db: Session,
|
|
|
+ settings: KnowledgeServiceSettings) -> KnowledgeApplicationService:
|
|
|
+ redis_client = try_build_redis_client(settings.redis_url)
|
|
|
+ return KnowledgeApplicationService(
|
|
|
+ settings=settings,
|
|
|
+ base_repository=KnowledgeBaseRepository(db),
|
|
|
+ document_repository=KnowledgeDocumentRepository(db),
|
|
|
+ chunk_repository=KnowledgeChunkRepository(db),
|
|
|
+ redis_client=redis_client,
|
|
|
+ task_queue_publisher=(
|
|
|
+ TaskQueuePublisher(client=redis_client) if redis_client is not None else None
|
|
|
+ ))
|