| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809 |
- """Knowledge indexing sub-service — document creation, chunking, embedding, job queue."""
- from __future__ import annotations
- import base64
- import hashlib
- from dataclasses import dataclass
- from datetime import datetime, timedelta
- from typing import TYPE_CHECKING, cast
- from uuid import uuid4
- from core_shared import JSONValue
- from core_shared.task_queue import KNOWLEDGE_DOCUMENT_QUEUE, TaskQueuePublisher
- from app.application._storage_mixin import _ObjectStorageMixin
- from app.application.chunking import chunk_document
- from app.application.document_parsers import (
- normalize_source_type,
- parse_document_content,
- read_document_content_bytes)
- from app.application.embeddings import EmbeddingService
- from app.application.retrieval import stable_content_hash
- from app.bootstrap.settings import KnowledgeServiceSettings
- from app.db.models import KnowledgeChunk, KnowledgeDocument
- from app.infrastructure.object_storage import (
- ObjectStorageError,
- ObjectStorageNotFoundError,
- build_document_object_key)
- from app.schemas.knowledge import (
- KnowledgeBaseReindexRequestDto,
- KnowledgeDocumentCreateRequest,
- KnowledgeDocumentCreateRequestDto,
- KnowledgeDocumentReindexRequestDto,
- KnowledgeIndexJobAction,
- KnowledgeIndexJobData,
- KnowledgeIndexJobStatus)
- if TYPE_CHECKING:
- from redis import Redis
- from app.domain.repositories import (
- KnowledgeBaseRepository,
- KnowledgeChunkRepository,
- KnowledgeDocumentRepository)
- from app.infrastructure.object_storage import KnowledgeObjectStorage
- @dataclass(frozen=True, slots=True)
- class KnowledgeDocumentIngestResult:
- document: KnowledgeDocument
- chunks: list[KnowledgeChunk]
- queued: bool = False
- job: KnowledgeIndexJobData | None = None
class KnowledgeIndexingError(RuntimeError):
    """Raised when indexing fails for a document that already exists.

    ``str(exc)`` is the human-readable message; ``document_id`` identifies the
    document whose indexing failed.
    """

    def __init__(self, *, document_id: str, message: str) -> None:
        RuntimeError.__init__(self, message)
        # Id of the document whose indexing failed.
        self.document_id = document_id
class KnowledgeIndexingService(_ObjectStorageMixin):
    """Create, chunk, embed and (re)index knowledge documents.

    Documents are indexed either synchronously (``create_document`` /
    ``reindex_document``) or through the knowledge-document task queue
    (``queue_document_indexing`` followed by ``process_document_index_job``).
    The state of the most recent job is kept on the document itself under
    ``metadata_json["index_job"]``; the most recent failure is kept under
    ``metadata_json["last_error"]``.
    """

    # Values accepted for the ``action`` / ``status`` fields of ``index_job``.
    _JOB_ACTIONS = frozenset({"index", "reindex"})
    _JOB_STATUSES = frozenset({"queued", "running", "completed", "failed", "skipped"})

    def __init__(
        self,
        *,
        settings: KnowledgeServiceSettings,
        base_repository: KnowledgeBaseRepository,
        document_repository: KnowledgeDocumentRepository,
        chunk_repository: KnowledgeChunkRepository,
        embedding_service: EmbeddingService,
        object_storage: KnowledgeObjectStorage | None = None,
        redis_client: Redis | None = None,
        task_queue_publisher: TaskQueuePublisher | None = None,
    ) -> None:
        """Wire the service to its repositories and infrastructure clients.

        Args:
            settings: supplies chunking defaults and ``async_indexing_enabled``.
            base_repository: used to check that a knowledge base exists.
            document_repository: persistence for documents and job metadata.
            chunk_repository: persistence for chunks.
            embedding_service: embeds each chunk's text.
            object_storage: stored as ``self._object_storage``; presumably
                exposed through ``_ObjectStorageMixin.object_storage`` — confirm.
            redis_client: default client for the distributed lock and
                idempotency records used by ``execute_document_index_job``.
            task_queue_publisher: publisher for async jobs; when ``None`` no
                request is queued and ``queue_document_indexing`` runs inline.
        """
        self.settings = settings
        self.base_repository = base_repository
        self.document_repository = document_repository
        self.chunk_repository = chunk_repository
        self.embedding_service = embedding_service
        self._object_storage = object_storage
        self.redis_client = redis_client
        self.task_queue_publisher = task_queue_publisher

    # ── Document creation (with indexing) ─────────────────────────────

    def create_document(
        self,
        payload: KnowledgeDocumentCreateRequest,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
        """Store, parse, chunk and embed a new document synchronously.

        The raw content is uploaded to object storage; the document row keeps
        only a reference to it (``content_text`` is stored empty).

        Returns:
            The indexed document and its freshly created chunks.

        Raises:
            ValueError: if the knowledge base does not exist.
            KnowledgeIndexingError: if chunking/embedding fails after the
                document row was created (the document is marked failed).
        """
        self._require_knowledge_base(knowledge_base_id=payload.knowledge_base_id)
        # Parse before uploading so unparseable content never reaches storage.
        parsed = parse_document_content(
            source_type=payload.source_type,
            source_uri=payload.source_uri,
            content_text=payload.content_text,
            content_base64=payload.content_base64,
        )
        raw_content = read_document_content_bytes(
            content_text=payload.content_text,
            content_base64=payload.content_base64,
        )
        stored_object = self._store_document_content(
            knowledge_base_id=payload.knowledge_base_id,
            source_type=parsed.source_type,
            title=payload.title,
            raw_content=raw_content,
        )
        try:
            document = self.document_repository.create(
                knowledge_base_id=payload.knowledge_base_id,
                title=payload.title,
                source_type=parsed.source_type,
                source_uri=payload.source_uri,
                content_text="",
                content_hash=stable_content_hash(parsed.content_text),
                metadata_json={
                    **payload.metadata_json,
                    "parser_metadata": parsed.metadata_json,
                    "object_storage": stored_object.to_metadata(),
                },
            )
        except Exception:
            # No document references the uploaded object yet, so drop it.
            self._discard_stored_object(object_key=stored_object.object_key)
            raise
        try:
            chunks = self._index_document(
                document=document,
                content_text=parsed.content_text,
                chunk_size=payload.chunk_size,
                chunk_overlap=payload.chunk_overlap,
            )
        except Exception as exc:
            self._mark_document_failed(document=document, message=str(exc))
            raise KnowledgeIndexingError(
                document_id=document.id,
                message=f"knowledge document indexing failed: {exc}",
            ) from exc
        indexed_document = self.document_repository.update_status(
            document_id=document.id,
            status="indexed",
        )
        return indexed_document or document, chunks

    def create_document_from_contract(
        self,
        payload: KnowledgeDocumentCreateRequestDto,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
        """Synchronously create and index a document from the API contract DTO."""
        return self.create_document(self._create_request_from_contract(payload))

    def create_document_from_contract_result(
        self,
        payload: KnowledgeDocumentCreateRequestDto,
    ) -> KnowledgeDocumentIngestResult:
        """Create a document, queueing its indexing when async mode applies.

        See ``_should_queue_indexing`` for how ``payload.asyncMode`` combines
        with settings and publisher availability.
        """
        request = self._create_request_from_contract(payload)
        if self._should_queue_indexing(async_mode=payload.asyncMode):
            return self.create_document_index_job(payload=request)
        document, chunks = self.create_document(request)
        return KnowledgeDocumentIngestResult(document=document, chunks=chunks)

    def create_document_index_job(
        self,
        payload: KnowledgeDocumentCreateRequest,
    ) -> KnowledgeDocumentIngestResult:
        """Store a new document as ``draft`` and queue an ``index`` job for it.

        Unlike ``create_document`` the content is not parsed here; parsing
        happens when the job runs.

        Raises:
            ValueError: if the knowledge base does not exist.
            KnowledgeIndexingError: only when no publisher is available and the
                inline fallback in ``queue_document_indexing`` fails.
        """
        self._require_knowledge_base(knowledge_base_id=payload.knowledge_base_id)
        raw_content = read_document_content_bytes(
            content_text=payload.content_text,
            content_base64=payload.content_base64,
        )
        source_type = normalize_source_type(
            source_type=payload.source_type,
            source_uri=payload.source_uri,
        )
        stored_object = self._store_document_content(
            knowledge_base_id=payload.knowledge_base_id,
            source_type=source_type,
            title=payload.title,
            raw_content=raw_content,
        )
        try:
            document = self.document_repository.create(
                knowledge_base_id=payload.knowledge_base_id,
                title=payload.title,
                source_type=source_type,
                source_uri=payload.source_uri,
                content_text="",
                # NOTE(review): this hashes the raw uploaded bytes, whereas
                # create_document hashes the parsed text via stable_content_hash,
                # so the two paths yield different content_hash values for the
                # same upload — confirm which is intended.
                content_hash=hashlib.sha256(raw_content).hexdigest(),
                metadata_json={
                    **payload.metadata_json,
                    "object_storage": stored_object.to_metadata(),
                },
                status="draft",
            )
        except Exception:
            # No document references the uploaded object yet, so drop it.
            self._discard_stored_object(object_key=stored_object.object_key)
            raise
        queued_document, job = self.queue_document_indexing(
            document=document,
            action="index",
            chunk_size=payload.chunk_size,
            chunk_overlap=payload.chunk_overlap,
        )
        return KnowledgeDocumentIngestResult(
            document=queued_document,
            chunks=[],
            queued=True,
            job=job,
        )

    def _create_request_from_contract(
        self,
        payload: KnowledgeDocumentCreateRequestDto,
    ) -> KnowledgeDocumentCreateRequest:
        """Map the camelCase contract DTO onto the internal create request."""
        return KnowledgeDocumentCreateRequest(
            knowledge_base_id=payload.knowledgeBaseId,
            title=payload.title,
            content_text=payload.contentText,
            content_base64=payload.contentBase64,
            source_type=payload.sourceType,
            source_uri=payload.sourceUri,
            metadata_json=payload.metadata,
            chunk_size=payload.chunkSize,
            chunk_overlap=payload.chunkOverlap,
        )

    def _require_knowledge_base(self, *, knowledge_base_id: str) -> None:
        """Raise ``ValueError`` unless the knowledge base exists."""
        if self.base_repository.get_by_id(knowledge_base_id=knowledge_base_id) is None:
            raise ValueError(f"knowledge base not found: {knowledge_base_id}")

    def _store_document_content(
        self,
        *,
        knowledge_base_id: str,
        source_type: str,
        title: str,
        raw_content: bytes,
    ):
        """Upload raw document bytes and return the storage result object."""
        object_key = build_document_object_key(
            knowledge_base_id=knowledge_base_id,
            source_type=source_type,
            title=title,
        )
        return self.object_storage.put_bytes(
            object_key=object_key,
            content=raw_content,
            content_type=self._guess_content_type(source_type=source_type),
        )

    def _discard_stored_object(self, *, object_key: str) -> None:
        """Best-effort removal of an uploaded object no document references."""
        try:
            self.object_storage.delete_object(object_key=object_key)
        except ObjectStorageError:
            # Deliberately ignored: the caller re-raises the original failure,
            # which is more useful than a cleanup error.
            pass

    # ── Reindexing ────────────────────────────────────────────────────

    def reindex_document(
        self,
        payload: KnowledgeDocumentReindexRequestDto,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
        """Synchronously re-parse and re-chunk an existing document.

        Returns:
            ``(document, chunks)`` on success, or ``None`` if the document does
            not exist.

        Raises:
            KnowledgeIndexingError: if parsing/chunking/embedding fails (the
                document is marked failed first).
        """
        document = self.document_repository.get_by_id(document_id=payload.documentId)
        if document is None:
            return None
        try:
            parsed = self._parse_document_for_indexing(document=document)
            chunks = self._index_document(
                document=document,
                content_text=parsed.content_text,
                chunk_size=payload.chunkSize,
                chunk_overlap=payload.chunkOverlap,
            )
        except Exception as exc:
            self._mark_document_failed(document=document, message=str(exc))
            raise KnowledgeIndexingError(
                document_id=document.id,
                message=f"knowledge document reindex failed: {exc}",
            ) from exc
        metadata = dict(document.metadata_json or {})
        metadata["parser_metadata"] = parsed.metadata_json
        indexed_document = self.document_repository.update(
            document_id=document.id,
            status="indexed",
            metadata_json=metadata,
        )
        return indexed_document or document, chunks

    def reindex_document_from_contract_result(
        self,
        payload: KnowledgeDocumentReindexRequestDto,
    ) -> KnowledgeDocumentIngestResult | None:
        """Reindex a document, queueing the work when async mode applies.

        Returns ``None`` if the document does not exist.
        """
        if not self._should_queue_indexing(async_mode=payload.asyncMode):
            result = self.reindex_document(payload)
            if result is None:
                return None
            document, chunks = result
            return KnowledgeDocumentIngestResult(document=document, chunks=chunks)
        document = self.document_repository.get_by_id(document_id=payload.documentId)
        if document is None:
            return None
        queued_document, job = self.queue_document_indexing(
            document=document,
            action="reindex",
            chunk_size=payload.chunkSize,
            chunk_overlap=payload.chunkOverlap,
        )
        return KnowledgeDocumentIngestResult(
            document=queued_document,
            chunks=[],
            queued=True,
            job=job,
        )

    def reindex_base_from_contract(
        self,
        payload: KnowledgeBaseReindexRequestDto,
    ) -> list[KnowledgeIndexJobData]:
        """Queue a ``reindex`` job for every non-archived document of a base.

        NOTE(review): without a publisher each job runs inline, and the first
        failure raises KnowledgeIndexingError and stops the loop; remaining
        documents are then not queued — confirm that is acceptable.
        """
        jobs: list[KnowledgeIndexJobData] = []
        documents = self.document_repository.list_by_base(
            knowledge_base_id=payload.knowledgeBaseId,
        )
        for document in documents:
            if document.status == "archived":
                continue
            _, job = self.queue_document_indexing(
                document=document,
                action="reindex",
                chunk_size=payload.chunkSize,
                chunk_overlap=payload.chunkOverlap,
            )
            jobs.append(job)
        return jobs

    # ── Job queue management ──────────────────────────────────────────

    def queue_document_indexing(
        self,
        *,
        document: KnowledgeDocument,
        action: str,
        chunk_size: int | None = None,
        chunk_overlap: int | None = None,
    ) -> tuple[KnowledgeDocument, KnowledgeIndexJobData]:
        """Record a new ``queued`` job on the document and publish it.

        If publishing does not succeed (no publisher configured, or the
        publisher returns a falsy result) the job is executed inline under the
        worker key ``inline-fallback``.

        Returns:
            The latest document state and its latest job descriptor.

        Raises:
            KnowledgeIndexingError: if the inline fallback fails.
        """
        job_id = f"kjob_{uuid4().hex}"
        metadata = self._write_index_job_metadata(
            document=document,
            action=action,
            job_id=job_id,
            status="queued",
            progress=0,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
        document_for_job = self.document_repository.update(
            document_id=document.id,
            status="queued",
            metadata_json=metadata,
        ) or document
        published = self._publish_document_index_job(
            document_id=document.id,
            action=action,
            job_id=job_id,
        )
        if not published:
            metadata = self._write_index_job_metadata(
                document=document_for_job,
                action=action,
                job_id=job_id,
                status="running",
                progress=1,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                worker_key="inline-fallback",
            )
            document_for_job = self.document_repository.update(
                document_id=document.id,
                status="indexing",
                metadata_json=metadata,
            ) or document_for_job
            processed = self.process_document_index_job(
                document_id=document.id,
                action=action,
                job_id=job_id,
                worker_key="inline-fallback",
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
            )
            if processed is not None:
                indexed_document, _ = processed
                return indexed_document, self._read_latest_index_job(document=indexed_document)
        return document_for_job, self._read_latest_index_job(document=document_for_job)

    def process_document_index_job(
        self,
        *,
        document_id: str,
        action: str,
        worker_key: str,
        job_id: str | None = None,
        chunk_size: int | None = None,
        chunk_overlap: int | None = None,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
        """Run one index/reindex job for a document and persist its progress.

        Unset ``job_id``/``chunk_size``/``chunk_overlap`` fall back to the
        values in the document's latest ``index_job`` metadata. Progress is
        persisted at 10 (started), 40 (parsed) and 100 (completed).

        Returns:
            ``(document, chunks)`` on success; ``(document, [])`` when the
            document is archived (the job is recorded as ``skipped``); ``None``
            when the document does not exist.

        Raises:
            KnowledgeIndexingError: if parsing, chunking or embedding fails;
                the document and job are marked ``failed`` first.
        """
        document = self.document_repository.get_by_id(document_id=document_id)
        if document is None:
            return None
        stored_job = self._read_latest_index_job(document=document)
        resolved_job_id = job_id or stored_job.jobId
        resolved_chunk_size = chunk_size if chunk_size is not None else stored_job.chunkSize
        resolved_chunk_overlap = (
            chunk_overlap if chunk_overlap is not None else stored_job.chunkOverlap
        )
        if document.status == "archived":
            metadata = self._write_index_job_metadata(
                document=document,
                action=action,
                job_id=resolved_job_id,
                status="skipped",
                progress=100,
                chunk_size=resolved_chunk_size,
                chunk_overlap=resolved_chunk_overlap,
                worker_key=worker_key,
                completed_time=datetime.utcnow(),
                error_message="document is archived",
            )
            skipped_document = self.document_repository.update(
                document_id=document.id,
                metadata_json=metadata,
            ) or document
            return skipped_document, []
        metadata = self._write_index_job_metadata(
            document=document,
            action=action,
            job_id=resolved_job_id,
            status="running",
            progress=10,
            chunk_size=resolved_chunk_size,
            chunk_overlap=resolved_chunk_overlap,
            worker_key=worker_key,
            started_time=datetime.utcnow(),
        )
        running_document = self.document_repository.update(
            document_id=document.id,
            status="indexing",
            metadata_json=metadata,
        ) or document
        try:
            parsed = self._parse_document_for_indexing(document=running_document)
            metadata = self._write_index_job_metadata(
                document=running_document,
                action=action,
                job_id=resolved_job_id,
                status="running",
                progress=40,
                chunk_size=resolved_chunk_size,
                chunk_overlap=resolved_chunk_overlap,
                worker_key=worker_key,
                started_time=self._read_latest_index_job(document=running_document).startedTime,
            )
            metadata["parser_metadata"] = parsed.metadata_json
            running_document = self.document_repository.update(
                document_id=document.id,
                status="indexing",
                metadata_json=metadata,
            ) or running_document
            chunks = self._index_document(
                document=running_document,
                content_text=parsed.content_text,
                chunk_size=resolved_chunk_size,
                chunk_overlap=resolved_chunk_overlap,
            )
        except Exception as exc:
            self._mark_document_failed(
                document=running_document,
                message=str(exc),
                job_id=resolved_job_id,
                action=action,
                worker_key=worker_key,
                chunk_size=resolved_chunk_size,
                chunk_overlap=resolved_chunk_overlap,
            )
            raise KnowledgeIndexingError(
                document_id=document.id,
                message=f"knowledge document {action} failed: {exc}",
            ) from exc
        metadata = self._write_index_job_metadata(
            document=running_document,
            action=action,
            job_id=resolved_job_id,
            status="completed",
            progress=100,
            chunk_size=resolved_chunk_size,
            chunk_overlap=resolved_chunk_overlap,
            worker_key=worker_key,
            completed_time=datetime.utcnow(),
        )
        metadata["parser_metadata"] = parsed.metadata_json
        indexed_document = self.document_repository.update(
            document_id=document.id,
            status="indexed",
            metadata_json=metadata,
        )
        return indexed_document or running_document, chunks

    def execute_document_index_job(
        self,
        *,
        document_id: str,
        action: str,
        worker_key: str,
        lease_seconds: int,
        job_id: str | None = None,
        redis_client: Redis | None = None,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
        """Run ``process_document_index_job`` guarded by a lock and idempotency key.

        With a Redis client (argument or ``self.redis_client``), a per-document
        ``DistributedLock`` with a ``lease_seconds`` TTL is taken and an
        ``IdempotencyStore`` key ``"<document_id>:<job_id or action>"`` is
        begun. Without Redis the job runs unguarded.

        Returns:
            The processing result, or ``None`` when the lock or idempotency key
            could not be obtained or the document does not exist.
        """
        resolved_redis_client = redis_client or self.redis_client
        idempotency_key = f"{document_id}:{job_id or action}"
        lock = None
        idempotency_store = None
        if resolved_redis_client is not None:
            from core_shared.redis_primitives import DistributedLock, IdempotencyStore

            lock = DistributedLock(
                client=resolved_redis_client,
                name=f"knowledge-document:{document_id}:lock",
                ttl_seconds=lease_seconds,
            )
            if not lock.acquire():
                # Another holder currently owns this document's lock.
                return None
        # Everything after acquiring the lock runs inside try/finally so the
        # lock is released even if setting up the idempotency record fails.
        try:
            if resolved_redis_client is not None:
                store = IdempotencyStore(
                    client=resolved_redis_client,
                    prefix="knowledge-document-idempotency",
                )
                if not store.begin(key=idempotency_key):
                    # ``begin`` refused the key (presumably already in
                    # progress or completed elsewhere).
                    return None
                # Track the store only after ``begin`` succeeded so the error
                # path never clears a key this worker did not claim.
                idempotency_store = store
            result = self.process_document_index_job(
                document_id=document_id,
                action=action,
                job_id=job_id,
                worker_key=worker_key,
            )
            # NOTE(review): when the document no longer exists (result is
            # None) the begun key is neither completed nor cleared — confirm
            # the store's TTL covers this.
            if idempotency_store is not None and result is not None:
                document, chunks = result
                idempotency_store.complete(
                    key=idempotency_key,
                    result={
                        "status": document.status,
                        "document_id": document.id,
                        "chunk_count": len(chunks),
                    },
                )
        except Exception:
            if idempotency_store is not None:
                # Allow a retry of this job after a failure.
                idempotency_store.clear(key=idempotency_key)
            raise
        finally:
            if lock is not None:
                lock.release()
        return result

    def execute_next_pending_document_job(
        self,
        *,
        worker_key: str,
        lease_seconds: int,
        stale_indexing_seconds: int,
        redis_client: Redis | None = None,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
        """Fetch the next pending document and execute its latest job.

        ``stale_before`` (now minus ``stale_indexing_seconds``) is passed to
        ``get_next_pending_indexing``; presumably it lets the repository
        reclaim documents stuck in indexing — confirm against the repository.
        Returns ``None`` when nothing is pending or the job was not executed.
        """
        stale_before = datetime.utcnow() - timedelta(seconds=stale_indexing_seconds)
        document = self.document_repository.get_next_pending_indexing(stale_before=stale_before)
        if document is None:
            return None
        job = self._read_latest_index_job(document=document)
        return self.execute_document_index_job(
            document_id=document.id,
            action=job.action,
            job_id=job.jobId,
            worker_key=worker_key,
            lease_seconds=lease_seconds,
            redis_client=redis_client,
        )

    def list_index_jobs(
        self,
        *,
        knowledge_base_id: str | None = None,
        document_id: str | None = None,
        status: str | None = None,
    ) -> list[KnowledgeIndexJobData]:
        """Return the latest job of each matching document, newest queued first.

        Documents without ``index_job`` metadata are skipped; jobs with no
        queued time sort last.
        """
        jobs: list[KnowledgeIndexJobData] = []
        documents = self.document_repository.list_filtered(
            knowledge_base_id=knowledge_base_id,
        )
        for document in documents:
            if document_id is not None and document.id != document_id:
                continue
            job = self._try_read_latest_index_job(document=document)
            if job is None:
                continue
            if status is not None and job.status != status:
                continue
            jobs.append(job)
        jobs.sort(key=lambda item: item.queuedTime or datetime.min, reverse=True)
        return jobs

    def detail_index_job(self, *, document_id: str) -> KnowledgeIndexJobData | None:
        """Return a document's latest job, or ``None`` if it has none or is missing."""
        document = self.document_repository.get_by_id(document_id=document_id)
        if document is None:
            return None
        return self._try_read_latest_index_job(document=document)

    # ── Core indexing logic ───────────────────────────────────────────

    def _index_document(
        self,
        *,
        document: KnowledgeDocument,
        content_text: str,
        chunk_size: int | None,
        chunk_overlap: int | None,
    ) -> list[KnowledgeChunk]:
        """Chunk ``content_text``, embed each chunk and hand them to the repository.

        ``chunk_size`` falls back to ``settings.default_chunk_size`` when unset
        or 0. ``chunk_overlap`` falls back to ``settings.default_chunk_overlap``
        only when unset, so an explicit ``0`` means "no overlap". All chunks
        are embedded before ``replace_document_chunks`` is called, so an
        embedding failure writes nothing.
        """
        source_type = document.source_type or "text"
        resolved_size = chunk_size or self.settings.default_chunk_size
        resolved_overlap = (
            chunk_overlap
            if chunk_overlap is not None
            else self.settings.default_chunk_overlap
        )
        chunk_payloads = chunk_document(
            content_text=content_text,
            source_type=source_type,
            chunk_size=resolved_size,
            chunk_overlap=resolved_overlap,
        )
        for chunk_payload in chunk_payloads:
            text = chunk_payload.get("content_text")
            embedding_result = self.embedding_service.embed_text(
                text if isinstance(text, str) else ""
            )
            chunk_payload["embedding_model"] = embedding_result.model
            chunk_payload["embedding_json"] = embedding_result.embedding
            # Merge rather than overwrite so metadata the chunker may attach
            # survives next to the embedding provider.
            existing_metadata = chunk_payload.get("metadata_json")
            chunk_metadata = (
                dict(existing_metadata) if isinstance(existing_metadata, dict) else {}
            )
            chunk_metadata["embedding_provider"] = embedding_result.provider
            chunk_payload["metadata_json"] = chunk_metadata
        return self.chunk_repository.replace_document_chunks(
            knowledge_base_id=document.knowledge_base_id,
            document_id=document.id,
            chunks=chunk_payloads,
        )

    def _parse_document_for_indexing(self, *, document: KnowledgeDocument):
        """Parse a stored document's content for (re)indexing.

        Prefers the object referenced by ``metadata_json["object_storage"]
        ["objectKey"]``; otherwise falls back to an inline ``content_text``.
        Returns the result of ``parse_document_content``.

        Raises:
            ValueError: if the referenced object is missing, or there is no
                object reference and no inline content.
        """
        metadata = document.metadata_json or {}
        object_metadata = metadata.get("object_storage")
        object_key = (
            object_metadata.get("objectKey") if isinstance(object_metadata, dict) else None
        )
        if isinstance(object_key, str) and object_key:
            try:
                raw_content = self.object_storage.get_bytes(object_key=object_key)
            except ObjectStorageNotFoundError as exc:
                raise ValueError(f"knowledge document content object not found: {document.id}") from exc
            # The parser accepts binary input only as base64 text.
            return parse_document_content(
                source_type=document.source_type,
                source_uri=document.source_uri,
                content_base64=base64.b64encode(raw_content).decode("ascii"),
            )
        if document.content_text:
            return parse_document_content(
                source_type=document.source_type,
                source_uri=document.source_uri,
                content_text=document.content_text,
            )
        raise ValueError(f"knowledge document content object not found: {document.id}")

    # ── Queue helpers ─────────────────────────────────────────────────

    def _should_queue_indexing(self, *, async_mode: bool | None) -> bool:
        """Decide whether a request should go through the job queue.

        ``async_mode=False`` always means synchronous; ``None`` defers to
        ``settings.async_indexing_enabled``. Queueing additionally requires a
        configured task-queue publisher.
        """
        if async_mode is False:
            return False
        if async_mode is None and not self.settings.async_indexing_enabled:
            return False
        return self.task_queue_publisher is not None

    def _publish_document_index_job(
        self,
        *,
        document_id: str,
        action: str,
        job_id: str,
    ) -> bool:
        """Publish a job message; ``False`` when no publisher is configured."""
        if self.task_queue_publisher is None:
            return False
        return self.task_queue_publisher.publish_knowledge_document(
            document_id=document_id,
            action=action,
            job_id=job_id,
        )

    # ── Job metadata ──────────────────────────────────────────────────

    def _write_index_job_metadata(
        self,
        *,
        document: KnowledgeDocument,
        action: str,
        job_id: str,
        status: str,
        progress: int,
        chunk_size: int | None = None,
        chunk_overlap: int | None = None,
        worker_key: str | None = None,
        started_time: datetime | None = None,
        completed_time: datetime | None = None,
        error_message: str | None = None,
    ) -> dict[str, JSONValue]:
        """Return a copy of the document metadata with ``index_job`` rewritten.

        Timestamps not supplied are carried over from the stored job only when
        it is the same job (matching ``jobId``); a new job starts with a fresh
        ``queuedTime`` and no started/completed times. ``progress`` is clamped
        to [0, 100] and unknown actions are recorded as ``"reindex"``. The
        document itself is not modified.
        """
        metadata = dict(document.metadata_json or {})
        existing_job = self._read_index_job_payload(document=document)
        if existing_job.get("jobId") != job_id:
            # A different (older) job: do not inherit its timestamps.
            existing_job = {}
        queued_time = existing_job.get("queuedTime")
        if not isinstance(queued_time, str):
            queued_time = datetime.utcnow().isoformat()
        resolved_started_time = (
            started_time.isoformat() if started_time is not None else existing_job.get("startedTime")
        )
        resolved_completed_time = (
            completed_time.isoformat()
            if completed_time is not None
            else existing_job.get("completedTime")
        )
        metadata["index_job"] = {
            "jobId": job_id,
            "documentId": document.id,
            "knowledgeBaseId": document.knowledge_base_id,
            "documentTitle": document.title,
            "action": action if action in self._JOB_ACTIONS else "reindex",
            "status": status,
            "progress": max(0, min(progress, 100)),
            "queueName": KNOWLEDGE_DOCUMENT_QUEUE,
            "workerKey": worker_key,
            "errorMessage": error_message,
            "chunkSize": chunk_size,
            "chunkOverlap": chunk_overlap,
            "queuedTime": queued_time,
            "startedTime": resolved_started_time if isinstance(resolved_started_time, str) else None,
            "completedTime": resolved_completed_time if isinstance(resolved_completed_time, str) else None,
        }
        return metadata

    def _read_latest_index_job(self, *, document: KnowledgeDocument) -> KnowledgeIndexJobData:
        """Build a job descriptor from ``index_job`` metadata, filling gaps.

        Missing identifiers fall back to the document's own fields (job id
        ``kjob_<document id>``); invalid action/status fall back to
        ``"reindex"`` / ``"queued"``.
        """
        payload = self._read_index_job_payload(document=document)
        return KnowledgeIndexJobData(
            jobId=self._read_payload_string(payload, "jobId") or f"kjob_{document.id}",
            documentId=self._read_payload_string(payload, "documentId") or document.id,
            knowledgeBaseId=self._read_payload_string(payload, "knowledgeBaseId") or document.knowledge_base_id,
            documentTitle=self._read_payload_string(payload, "documentTitle") or document.title,
            action=self._read_job_action(payload.get("action")),
            status=self._read_job_status(payload.get("status")),
            progress=self._read_payload_int(payload, "progress", 0),
            queueName=self._read_payload_string(payload, "queueName"),
            workerKey=self._read_payload_string(payload, "workerKey"),
            errorMessage=self._read_payload_string(payload, "errorMessage"),
            chunkSize=self._read_optional_payload_int(payload, "chunkSize"),
            chunkOverlap=self._read_optional_payload_int(payload, "chunkOverlap"),
            queuedTime=self._read_payload_datetime(payload, "queuedTime"),
            startedTime=self._read_payload_datetime(payload, "startedTime"),
            completedTime=self._read_payload_datetime(payload, "completedTime"),
        )

    def _try_read_latest_index_job(self, *, document: KnowledgeDocument) -> KnowledgeIndexJobData | None:
        """Like ``_read_latest_index_job`` but ``None`` when no job is recorded."""
        if not self._read_index_job_payload(document=document):
            return None
        return self._read_latest_index_job(document=document)

    def _read_index_job_payload(self, *, document: KnowledgeDocument) -> dict[str, JSONValue]:
        """Return a string-keyed copy of ``metadata_json["index_job"]`` or ``{}``."""
        metadata = document.metadata_json or {}
        value = metadata.get("index_job")
        if not isinstance(value, dict):
            return {}
        return {str(key): item for key, item in value.items()}

    def _read_payload_string(self, payload: dict[str, JSONValue], key: str) -> str | None:
        """Return ``payload[key]`` if it is a non-empty string, else ``None``."""
        value = payload.get(key)
        return value if isinstance(value, str) and value else None

    def _read_payload_int(
        self,
        payload: dict[str, JSONValue],
        key: str,
        fallback: int,
    ) -> int:
        """Return ``payload[key]`` if it is an int (not bool), else ``fallback``."""
        value = self._read_optional_payload_int(payload, key)
        return fallback if value is None else value

    def _read_optional_payload_int(
        self,
        payload: dict[str, JSONValue],
        key: str,
    ) -> int | None:
        """Return ``payload[key]`` if it is an int (bools excluded), else ``None``."""
        value = payload.get(key)
        if isinstance(value, int) and not isinstance(value, bool):
            return value
        return None

    def _read_payload_datetime(
        self,
        payload: dict[str, JSONValue],
        key: str,
    ) -> datetime | None:
        """Parse ``payload[key]`` as an ISO-8601 datetime; ``None`` if absent/invalid."""
        value = payload.get(key)
        if not isinstance(value, str) or not value:
            return None
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            return None

    def _read_job_action(self, value: JSONValue) -> KnowledgeIndexJobAction:
        """Validate a stored action, defaulting to ``"reindex"``."""
        if isinstance(value, str) and value in self._JOB_ACTIONS:
            return cast(KnowledgeIndexJobAction, value)
        return "reindex"

    def _read_job_status(self, value: JSONValue) -> KnowledgeIndexJobStatus:
        """Validate a stored status, defaulting to ``"queued"``."""
        if isinstance(value, str) and value in self._JOB_STATUSES:
            return cast(KnowledgeIndexJobStatus, value)
        return "queued"

    # ── Error handling ────────────────────────────────────────────────

    def _mark_document_failed(
        self,
        *,
        document: KnowledgeDocument,
        message: str,
        job_id: str | None = None,
        action: str = "reindex",
        worker_key: str | None = None,
        chunk_size: int | None = None,
        chunk_overlap: int | None = None,
    ) -> None:
        """Set the document to ``failed`` and record ``last_error`` metadata.

        When ``job_id`` is given, the ``index_job`` entry is also marked
        ``failed`` with a completion time. Messages are truncated to 1000
        characters.
        """
        truncated_message = message[:1000]
        if job_id is not None:
            metadata = self._write_index_job_metadata(
                document=document,
                action=action,
                job_id=job_id,
                status="failed",
                progress=100,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                worker_key=worker_key,
                completed_time=datetime.utcnow(),
                error_message=truncated_message,
            )
        else:
            metadata = dict(document.metadata_json or {})
        # Applied after the job payload so it is not lost when the payload
        # rebuilds the metadata from the document.
        metadata["last_error"] = {
            "message": truncated_message,
            "errorType": "indexing_failed",
        }
        self.document_repository.update(
            document_id=document.id,
            status="failed",
            metadata_json=metadata,
        )
|