"""Application service for knowledge bases, documents, chunks, index jobs and search."""
from __future__ import annotations
import base64
import hashlib
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, cast
from uuid import uuid4

from sqlalchemy.orm import Session

from core_shared import JSONValue, try_build_redis_client
from core_shared.task_queue import KNOWLEDGE_DOCUMENT_QUEUE, TaskQueuePublisher

from app.application.document_parsers import (
    DocumentParseError,
    ParsedDocument,
    normalize_source_type,
    parse_document_content,
    read_document_content_bytes)
from app.application.embeddings import EmbeddingService
from app.application.retrieval import (
    build_chunk_payloads,
    cosine_similarity,
    keyword_score,
    rerank_score,
    stable_content_hash)
from app.bootstrap.settings import KnowledgeServiceSettings
from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
from app.domain.repositories import (
    KnowledgeBaseRepository,
    KnowledgeChunkRepository,
    KnowledgeDocumentRepository)
from app.infrastructure.object_storage import (
    KnowledgeObjectStorage,
    ObjectStorageError,
    ObjectStorageNotFoundError,
    ObjectStorageStatus,
    build_document_object_key,
    build_object_storage)
from app.schemas.knowledge import (
    KnowledgeBaseCreateRequest,
    KnowledgeBaseCreateRequestDto,
    KnowledgeBaseStatusUpdateRequest,
    KnowledgeBaseUpdateRequestDto,
    KnowledgeBaseReindexRequestDto,
    KnowledgeDocumentCreateRequest,
    KnowledgeDocumentCreateRequestDto,
    KnowledgeIndexJobAction,
    KnowledgeIndexJobData,
    KnowledgeIndexJobStatus,
    KnowledgeDocumentParseRequest,
    KnowledgeDocumentReindexRequestDto,
    KnowledgeDocumentUpdateRequestDto,
    KnowledgeSettingsDto,
    KnowledgeSettingsUpdateRequestDto,
    KnowledgeSearchRequest)

if TYPE_CHECKING:
    # Only needed for annotations (which are lazy strings here); redis is not
    # imported at runtime by this module.
    from redis import Redis


@dataclass(frozen=True, slots=True)
class KnowledgeDocumentIngestResult:
    """Outcome of a document create/reindex request made via the contract helpers."""

    # Document row as returned by the repository after the operation.
    document: KnowledgeDocument
    # Chunks written synchronously; the queued code paths pass an empty list.
    chunks: list[KnowledgeChunk]
    # True when the request went through ``queue_document_indexing`` (which may
    # still have run the job inline when publishing was unavailable).
    queued: bool = False
    # Snapshot of the document's latest index job on the queued paths.
    job: KnowledgeIndexJobData | None = None


class KnowledgeIndexingError(RuntimeError):
    """Indexing failed for a document that already exists in the repository.

    ``document_id`` identifies that row; every raise site in this service marks
    the document as ``failed`` before raising.
    """

    def __init__(self, *, document_id: str,
message: str) -> None: super().__init__(message) self.document_id = document_id class KnowledgeApplicationService: def __init__( self, *, settings: KnowledgeServiceSettings, base_repository: KnowledgeBaseRepository, document_repository: KnowledgeDocumentRepository, chunk_repository: KnowledgeChunkRepository, object_storage: KnowledgeObjectStorage | None = None, redis_client: Redis | None = None, task_queue_publisher: TaskQueuePublisher | None = None) -> None: self.settings = settings self.base_repository = base_repository self.document_repository = document_repository self.chunk_repository = chunk_repository self.embedding_service = EmbeddingService(settings=settings) self._object_storage = object_storage self.redis_client = redis_client self.task_queue_publisher = task_queue_publisher @property def object_storage(self) -> KnowledgeObjectStorage: if self._object_storage is None: self._object_storage = build_object_storage(self.settings) return self._object_storage def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase: return self.base_repository.create( code=payload.code, name=payload.name, description=payload.description, metadata_json=payload.metadata_json) def create_base_from_contract( self, payload: KnowledgeBaseCreateRequestDto) -> KnowledgeBase: return self.create_base( KnowledgeBaseCreateRequest( code=self._build_base_code(payload.name), name=payload.name, description=payload.description, metadata_json=payload.metadata)) def list_bases(self) -> list[KnowledgeBase]: return self.base_repository.list_all() def list_bases_filtered( self, *, keyword: str | None = None, status: str | None = None) -> list[KnowledgeBase]: return self.base_repository.list_filtered( keyword=keyword, status=status) def update_base_from_contract( self, payload: KnowledgeBaseUpdateRequestDto) -> KnowledgeBase | None: return self.base_repository.update( knowledge_base_id=payload.knowledgeBaseId, name=payload.name, description=payload.description, 
status=payload.status,
            metadata_json=payload.metadata)

    def delete_base(self, *, knowledge_base_id: str) -> bool:
        """Delete a knowledge base together with its documents, chunks and stored objects.

        Returns the result of ``base_repository.delete``.
        """
        documents = self.document_repository.list_by_base(
            knowledge_base_id=knowledge_base_id)
        # Remove the raw objects first; ``_delete_document_object`` reports a
        # missing object as "not deleted" instead of raising.
        for document in documents:
            self._delete_document_object(document=document)
        self.chunk_repository.delete_by_base(knowledge_base_id=knowledge_base_id)
        for document in documents:
            self.document_repository.delete(document_id=document.id)
        return self.base_repository.delete(knowledge_base_id=knowledge_base_id)

    def update_base_status(
            self,
            *,
            knowledge_base_id: str,
            payload: KnowledgeBaseStatusUpdateRequest) -> KnowledgeBase | None:
        """Set a knowledge base's status; returns the repository's update result."""
        return self.base_repository.update_status(
            knowledge_base_id=knowledge_base_id,
            status=payload.status)

    def create_document(
            self,
            payload: KnowledgeDocumentCreateRequest) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
        """Parse, store and synchronously index a new document.

        The raw bytes are uploaded to object storage, and a document row is
        created with empty ``content_text`` (the content lives in the object).
        The parsed text is then chunked and embedded.

        Returns:
            The document (the repository's post-``indexed`` copy when it returns
            one) and its chunks.

        Raises:
            ValueError: the knowledge base does not exist.
            KnowledgeIndexingError: chunking/embedding failed; the document is
                marked ``failed`` first.

        Parser errors from ``parse_document`` propagate unchanged. If the row
        could not be created, the uploaded object is deleted on a best-effort
        basis.
        """
        knowledge_base = self.base_repository.get_by_id(
            knowledge_base_id=payload.knowledge_base_id)
        if knowledge_base is None:
            raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")
        parsed = self.parse_document(
            KnowledgeDocumentParseRequest(
                source_type=payload.source_type,
                source_uri=payload.source_uri,
                content_text=payload.content_text,
                content_base64=payload.content_base64)
        )
        raw_content = read_document_content_bytes(
            content_text=payload.content_text,
            content_base64=payload.content_base64)
        # The object key uses the parser-normalized source type.
        object_key = build_document_object_key(
            knowledge_base_id=payload.knowledge_base_id,
            source_type=parsed.source_type,
            title=payload.title)
        stored_object = self.object_storage.put_bytes(
            object_key=object_key,
            content=raw_content,
            content_type=self._guess_content_type(source_type=parsed.source_type))
        # Stays None until the row exists; the outer handler uses this to decide
        # whether the uploaded object is orphaned.
        document: KnowledgeDocument | None = None
        try:
            metadata_json = {
                **payload.metadata_json,
                "parser_metadata": parsed.metadata_json,
                "object_storage": stored_object.to_metadata(),
            }
            document = self.document_repository.create(
                knowledge_base_id=payload.knowledge_base_id,
                title=payload.title,
                source_type=parsed.source_type,
source_uri=payload.source_uri, content_text="", content_hash=stable_content_hash(parsed.content_text), metadata_json=metadata_json) try: chunks = self._index_document( document=document, content_text=parsed.content_text, chunk_size=payload.chunk_size, chunk_overlap=payload.chunk_overlap) except Exception as exc: self._mark_document_failed(document=document, message=str(exc)) raise KnowledgeIndexingError( document_id=document.id, message=f"knowledge document indexing failed: {exc}") from exc indexed_document = self.document_repository.update_status( document_id=document.id, status="indexed") return indexed_document or document, chunks except Exception: if document is None: try: self.object_storage.delete_object(object_key=stored_object.object_key) except ObjectStorageError: pass raise def create_document_from_contract( self, payload: KnowledgeDocumentCreateRequestDto) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]: return self.create_document( KnowledgeDocumentCreateRequest( knowledge_base_id=payload.knowledgeBaseId, title=payload.title, content_text=payload.contentText, content_base64=payload.contentBase64, source_type=payload.sourceType, source_uri=payload.sourceUri, metadata_json=payload.metadata, chunk_size=payload.chunkSize, chunk_overlap=payload.chunkOverlap)) def create_document_from_contract_result( self, payload: KnowledgeDocumentCreateRequestDto) -> KnowledgeDocumentIngestResult: request = KnowledgeDocumentCreateRequest( knowledge_base_id=payload.knowledgeBaseId, title=payload.title, content_text=payload.contentText, content_base64=payload.contentBase64, source_type=payload.sourceType, source_uri=payload.sourceUri, metadata_json=payload.metadata, chunk_size=payload.chunkSize, chunk_overlap=payload.chunkOverlap) if self._should_queue_indexing(async_mode=payload.asyncMode): return self.create_document_index_job(payload=request) document, chunks = self.create_document(request) return KnowledgeDocumentIngestResult( document=document, chunks=chunks) def 
create_document_index_job( self, payload: KnowledgeDocumentCreateRequest) -> KnowledgeDocumentIngestResult: knowledge_base = self.base_repository.get_by_id( knowledge_base_id=payload.knowledge_base_id) if knowledge_base is None: raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}") raw_content = read_document_content_bytes( content_text=payload.content_text, content_base64=payload.content_base64) source_type = normalize_source_type( source_type=payload.source_type, source_uri=payload.source_uri) object_key = build_document_object_key( knowledge_base_id=payload.knowledge_base_id, source_type=source_type, title=payload.title) stored_object = self.object_storage.put_bytes( object_key=object_key, content=raw_content, content_type=self._guess_content_type(source_type=source_type)) document: KnowledgeDocument | None = None try: document = self.document_repository.create( knowledge_base_id=payload.knowledge_base_id, title=payload.title, source_type=source_type, source_uri=payload.source_uri, content_text="", content_hash=hashlib.sha256(raw_content).hexdigest(), metadata_json={ **payload.metadata_json, "object_storage": stored_object.to_metadata(), }, status="draft") queued_document, job = self.queue_document_indexing( document=document, action="index", chunk_size=payload.chunk_size, chunk_overlap=payload.chunk_overlap) except Exception: if document is None: try: self.object_storage.delete_object(object_key=stored_object.object_key) except ObjectStorageError: pass raise return KnowledgeDocumentIngestResult( document=queued_document, chunks=[], queued=True, job=job) def parse_document(self, payload: KnowledgeDocumentParseRequest) -> ParsedDocument: try: return parse_document_content( source_type=payload.source_type, content_text=payload.content_text, content_base64=payload.content_base64, source_uri=payload.source_uri) except DocumentParseError: raise def queue_document_indexing( self, *, document: KnowledgeDocument, action: str, chunk_size: int | None 
= None, chunk_overlap: int | None = None) -> tuple[KnowledgeDocument, KnowledgeIndexJobData]: job_id = f"kjob_{uuid4().hex}" metadata = self._write_index_job_metadata( document=document, action=action, job_id=job_id, status="queued", progress=0, chunk_size=chunk_size, chunk_overlap=chunk_overlap) updated_document = self.document_repository.update( document_id=document.id, status="queued", metadata_json=metadata) document_for_job = updated_document or document published = self._publish_document_index_job( document_id=document.id, action=action, job_id=job_id) if not published: metadata = self._write_index_job_metadata( document=document_for_job, action=action, job_id=job_id, status="running", progress=1, chunk_size=chunk_size, chunk_overlap=chunk_overlap, worker_key="inline-fallback") document_for_job = self.document_repository.update( document_id=document.id, status="indexing", metadata_json=metadata) or document_for_job processed = self.process_document_index_job( document_id=document.id, action=action, job_id=job_id, worker_key="inline-fallback", chunk_size=chunk_size, chunk_overlap=chunk_overlap) if processed is not None: indexed_document, chunks = processed return indexed_document, self._read_latest_index_job(document=indexed_document) return document_for_job, self._read_latest_index_job(document=document_for_job) def list_documents( self, *, knowledge_base_id: str) -> list[KnowledgeDocument]: return self.document_repository.list_by_base( knowledge_base_id=knowledge_base_id) def list_documents_filtered( self, *, knowledge_base_id: str | None = None, keyword: str | None = None, status: str | None = None, source_type: str | None = None) -> list[KnowledgeDocument]: return self.document_repository.list_filtered( knowledge_base_id=knowledge_base_id, keyword=keyword, status=status, source_type=source_type) def update_document_from_contract( self, payload: KnowledgeDocumentUpdateRequestDto) -> KnowledgeDocument | None: return self.document_repository.update( 
document_id=payload.documentId, title=payload.title, source_uri=payload.sourceUri, status=payload.status, metadata_json=payload.metadata) def reindex_document( self, payload: KnowledgeDocumentReindexRequestDto) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None: document = self.document_repository.get_by_id(document_id=payload.documentId) if document is None: return None try: parsed = self._parse_document_for_indexing(document=document) chunks = self._index_document( document=document, content_text=parsed.content_text, chunk_size=payload.chunkSize, chunk_overlap=payload.chunkOverlap) except Exception as exc: self._mark_document_failed(document=document, message=str(exc)) raise KnowledgeIndexingError( document_id=document.id, message=f"knowledge document reindex failed: {exc}") from exc metadata = dict(document.metadata_json or {}) metadata["parser_metadata"] = parsed.metadata_json indexed_document = self.document_repository.update( document_id=document.id, status="indexed", metadata_json=metadata) return indexed_document or document, chunks def reindex_document_from_contract_result( self, payload: KnowledgeDocumentReindexRequestDto) -> KnowledgeDocumentIngestResult | None: if self._should_queue_indexing(async_mode=payload.asyncMode): document = self.document_repository.get_by_id(document_id=payload.documentId) if document is None: return None queued_document, job = self.queue_document_indexing( document=document, action="reindex", chunk_size=payload.chunkSize, chunk_overlap=payload.chunkOverlap) return KnowledgeDocumentIngestResult( document=queued_document, chunks=[], queued=True, job=job) result = self.reindex_document(payload) if result is None: return None document, chunks = result return KnowledgeDocumentIngestResult( document=document, chunks=chunks) def reindex_base_from_contract( self, payload: KnowledgeBaseReindexRequestDto) -> list[KnowledgeIndexJobData]: documents = self.document_repository.list_by_base( knowledge_base_id=payload.knowledgeBaseId) 
jobs: list[KnowledgeIndexJobData] = [] for document in documents: if document.status == "archived": continue queued_document, job = self.queue_document_indexing( document=document, action="reindex", chunk_size=payload.chunkSize, chunk_overlap=payload.chunkOverlap) jobs.append(job or self._read_latest_index_job(document=queued_document)) return jobs def process_document_index_job( self, *, document_id: str, action: str, worker_key: str, job_id: str | None = None, chunk_size: int | None = None, chunk_overlap: int | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None: document = self.document_repository.get_by_id(document_id=document_id) if document is None: return None resolved_job = self._read_latest_index_job(document=document) resolved_job_id = job_id or resolved_job.jobId resolved_chunk_size = chunk_size if chunk_size is not None else resolved_job.chunkSize resolved_chunk_overlap = chunk_overlap if chunk_overlap is not None else resolved_job.chunkOverlap if document.status == "archived": metadata = self._write_index_job_metadata( document=document, action=action, job_id=resolved_job_id, status="skipped", progress=100, chunk_size=resolved_chunk_size, chunk_overlap=resolved_chunk_overlap, worker_key=worker_key, completed_time=datetime.utcnow(), error_message="document is archived") skipped_document = self.document_repository.update( document_id=document.id, metadata_json=metadata) or document return skipped_document, [] metadata = self._write_index_job_metadata( document=document, action=action, job_id=resolved_job_id, status="running", progress=10, chunk_size=resolved_chunk_size, chunk_overlap=resolved_chunk_overlap, worker_key=worker_key, started_time=datetime.utcnow()) running_document = self.document_repository.update( document_id=document.id, status="indexing", metadata_json=metadata) or document try: parsed = self._parse_document_for_indexing(document=running_document) metadata = self._write_index_job_metadata( document=running_document, 
action=action,
                job_id=resolved_job_id,
                status="running",
                progress=40,
                chunk_size=resolved_chunk_size,
                chunk_overlap=resolved_chunk_overlap,
                worker_key=worker_key,
                # Keep the start time recorded by the 10% update.
                started_time=self._read_latest_index_job(document=running_document).startedTime)
            metadata["parser_metadata"] = parsed.metadata_json
            running_document = self.document_repository.update(
                document_id=document.id, status="indexing", metadata_json=metadata) or running_document
            chunks = self._index_document(
                document=running_document,
                content_text=parsed.content_text,
                chunk_size=resolved_chunk_size,
                chunk_overlap=resolved_chunk_overlap)
        except Exception as exc:
            # Record the failure on both the job payload and the document status.
            self._mark_document_failed(
                document=running_document,
                message=str(exc),
                job_id=resolved_job_id,
                action=action,
                worker_key=worker_key,
                chunk_size=resolved_chunk_size,
                chunk_overlap=resolved_chunk_overlap)
            raise KnowledgeIndexingError(
                document_id=document.id,
                message=f"knowledge document {action} failed: {exc}") from exc
        metadata = self._write_index_job_metadata(
            document=running_document,
            action=action,
            job_id=resolved_job_id,
            status="completed",
            progress=100,
            chunk_size=resolved_chunk_size,
            chunk_overlap=resolved_chunk_overlap,
            worker_key=worker_key,
            completed_time=datetime.utcnow())
        metadata["parser_metadata"] = parsed.metadata_json
        indexed_document = self.document_repository.update(
            document_id=document.id, status="indexed", metadata_json=metadata)
        return indexed_document or running_document, chunks

    def execute_document_index_job(
            self,
            *,
            document_id: str,
            action: str,
            worker_key: str,
            lease_seconds: int,
            job_id: str | None = None,
            redis_client: Redis | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
        """Run an index job, guarded by Redis primitives when a Redis client is available.

        With Redis, the run is guarded by two primitives:
        - a per-document ``DistributedLock`` with a TTL of ``lease_seconds``;
        - an ``IdempotencyStore`` entry keyed by ``document_id`` plus ``job_id``
          (or ``action`` when no job id is given).
        ``None`` is returned when either cannot be obtained. Without Redis the
        job runs unguarded. Otherwise returns ``process_document_index_job``'s
        result.
        """
        resolved_redis_client = redis_client or self.redis_client
        idempotency_key = f"{document_id}:{job_id or action}"
        lock = None
        idempotency_store = None
        if resolved_redis_client is not None:
            # Imported lazily so the Redis helpers are only loaded when used.
            from core_shared.redis_primitives import DistributedLock, IdempotencyStore

            lock = DistributedLock(
                client=resolved_redis_client,
name=f"knowledge-document:{document_id}:lock", ttl_seconds=lease_seconds) if not lock.acquire(): return None idempotency_store = IdempotencyStore( client=resolved_redis_client, prefix="knowledge-document-idempotency") if not idempotency_store.begin(key=idempotency_key): lock.release() return None try: result = self.process_document_index_job( document_id=document_id, action=action, job_id=job_id, worker_key=worker_key) if idempotency_store is not None and result is not None: document, chunks = result idempotency_store.complete( key=idempotency_key, result={ "status": document.status, "document_id": document.id, "chunk_count": len(chunks), }) except Exception: if idempotency_store is not None: idempotency_store.clear(key=idempotency_key) raise finally: if lock is not None: lock.release() return result def execute_next_pending_document_job( self, *, worker_key: str, lease_seconds: int, stale_indexing_seconds: int, redis_client: Redis | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None: stale_before = datetime.utcnow() - timedelta(seconds=stale_indexing_seconds) document = self.document_repository.get_next_pending_indexing( stale_before=stale_before) if document is None: return None job = self._read_latest_index_job(document=document) return self.execute_document_index_job( document_id=document.id, action=job.action, job_id=job.jobId, worker_key=worker_key, lease_seconds=lease_seconds, redis_client=redis_client) def list_index_jobs( self, *, knowledge_base_id: str | None = None, document_id: str | None = None, status: str | None = None) -> list[KnowledgeIndexJobData]: documents = self.document_repository.list_filtered( knowledge_base_id=knowledge_base_id) jobs: list[KnowledgeIndexJobData] = [] for document in documents: if document_id is not None and document.id != document_id: continue job = self._try_read_latest_index_job(document=document) if job is None: continue if status is not None and job.status != status: continue jobs.append(job) 
jobs.sort( key=lambda item: item.queuedTime or datetime.min, reverse=True) return jobs def detail_index_job(self, *, document_id: str) -> KnowledgeIndexJobData | None: document = self.document_repository.get_by_id(document_id=document_id) if document is None: return None return self._try_read_latest_index_job(document=document) def delete_document(self, *, document_id: str) -> bool: return bool(self.delete_document_result(document_id=document_id)["deleted"]) def delete_document_result(self, *, document_id: str) -> dict[str, JSONValue]: document = self.document_repository.get_by_id(document_id=document_id) if document is None: return { "deleted": False, "objectDeleted": False, "documentId": document_id, } object_deleted = self._delete_document_object(document=document) self.chunk_repository.delete_by_document(document_id=document_id) deleted = self.document_repository.delete(document_id=document_id) is not None return { "deleted": deleted, "objectDeleted": object_deleted, "documentId": document_id, } def read_document_content( self, *, document_id: str, include_text: bool = True, include_base64: bool = False) -> dict[str, JSONValue] | None: document = self.document_repository.get_by_id(document_id=document_id) if document is None: return None raw_content = self._read_document_raw_content(document=document) object_status = self.read_document_storage_status(document_id=document_id) content_type = self._read_content_type_from_status(object_status) payload: dict[str, JSONValue] = { "documentId": document.id, "title": document.title, "sourceType": document.source_type, "contentType": content_type, "sizeBytes": len(raw_content), "objectStorage": self._read_object_storage_metadata(document=document), "contentBase64": None, "contentText": None, } if include_base64: payload["contentBase64"] = base64.b64encode(raw_content).decode("ascii") if include_text and self._is_text_content_type(content_type=content_type, source_type=document.source_type): payload["contentText"] = 
raw_content.decode("utf-8", errors="replace") return payload def read_document_storage_status(self, *, document_id: str) -> dict[str, JSONValue] | None: document = self.document_repository.get_by_id(document_id=document_id) if document is None: return None object_key = self._read_document_object_key(document=document) if object_key is None: return { "documentId": document.id, "exists": False, "objectStorage": None, "errorMessage": "document object metadata is missing", } status = self.object_storage.head_object(object_key=object_key) return self._object_status_to_payload(document=document, status=status) def read_storage_health(self) -> dict[str, JSONValue]: return dict(self.object_storage.health_check()) def list_chunks_filtered( self, *, knowledge_base_id: str | None = None, document_id: str | None = None, keyword: str | None = None) -> list[KnowledgeChunk]: return self.chunk_repository.list_filtered( knowledge_base_id=knowledge_base_id, document_id=document_id, keyword=keyword) def delete_chunk(self, *, chunk_id: str) -> bool: return self.chunk_repository.delete(chunk_id=chunk_id) def search( self, payload: KnowledgeSearchRequest) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]: document_cache: dict[str, KnowledgeDocument] = {} query_embedding_result = self.embedding_service.embed_text(payload.query) candidate_limit = max( payload.top_k * max(self.settings.retrieval_candidate_multiplier, 1), payload.top_k) vector_candidates = self.chunk_repository.search_by_vector( knowledge_base_id=payload.knowledge_base_id, embedding=query_embedding_result.embedding, limit=candidate_limit) if vector_candidates: chunks = [chunk for chunk, _ in vector_candidates] vector_scores_by_chunk_id = { chunk.id: score for chunk, score in vector_candidates } retrieval_mode = "pgvector-hybrid" else: chunks = self.chunk_repository.list_by_base( knowledge_base_id=payload.knowledge_base_id) vector_scores_by_chunk_id = {} retrieval_mode = "hybrid" scored: 
list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]] = [] for chunk in chunks: document = document_cache.get(chunk.document_id) if document is None: document = self.document_repository.get_by_id( document_id=chunk.document_id) if document is None: continue document_cache[chunk.document_id] = document if not self._matches_filters(document=document, filters_json=payload.filters_json): continue keyword = keyword_score(payload.query, chunk.content_text) vector = vector_scores_by_chunk_id.get(chunk.id) if vector is None: vector = cosine_similarity(query_embedding_result.embedding, chunk.embedding_json) rerank = ( rerank_score( query=payload.query, chunk_text=chunk.content_text, document_title=document.title) if self.settings.retrieval_rerank_enabled else 0.0 ) score = round( keyword * self.settings.retrieval_keyword_weight + vector * self.settings.retrieval_vector_weight + rerank * self.settings.retrieval_rerank_weight, 6) scored.append( ( chunk, document, score, { "final_score": score, "keyword_score": round(keyword, 6), "vector_score": round(vector, 6), "rerank_score": round(rerank, 6), "retrieval_mode": retrieval_mode, "rerank_enabled": self.settings.retrieval_rerank_enabled, "candidate_limit": candidate_limit, "weights": { "keyword": self.settings.retrieval_keyword_weight, "vector": self.settings.retrieval_vector_weight, "rerank": self.settings.retrieval_rerank_weight, }, "embedding_provider": query_embedding_result.provider, "embedding_model": query_embedding_result.model, "citation": { "document_id": document.id, "document_title": document.title, "source_uri": document.source_uri, "chunk_id": chunk.id, "chunk_index": chunk.chunk_index, }, }) ) scored.sort(key=lambda item: item[2], reverse=True) return scored[: payload.top_k] def _index_document( self, *, document: KnowledgeDocument, content_text: str, chunk_size: int | None, chunk_overlap: int | None) -> list[KnowledgeChunk]: chunk_payloads = build_chunk_payloads( content_text=content_text, 
chunk_size=chunk_size or self.settings.default_chunk_size, chunk_overlap=chunk_overlap or self.settings.default_chunk_overlap) for chunk_payload in chunk_payloads: content_text = self._read_chunk_content(chunk_payload) embedding_result = self.embedding_service.embed_text(content_text) chunk_payload["embedding_model"] = embedding_result.model chunk_payload["embedding_json"] = embedding_result.embedding chunk_payload["metadata_json"] = { "embedding_provider": embedding_result.provider, } return self.chunk_repository.replace_document_chunks( knowledge_base_id=document.knowledge_base_id, document_id=document.id, chunks=chunk_payloads) def _read_chunk_content(self, chunk_payload: dict[str, JSONValue]) -> str: value = chunk_payload.get("content_text") return value if isinstance(value, str) else "" def _parse_document_for_indexing(self, *, document: KnowledgeDocument) -> ParsedDocument: metadata = document.metadata_json or {} object_metadata = metadata.get("object_storage") if isinstance(object_metadata, dict): object_key = object_metadata.get("objectKey") if isinstance(object_key, str) and object_key: try: raw_content = self.object_storage.get_bytes(object_key=object_key) except ObjectStorageNotFoundError as exc: raise ValueError(f"knowledge document content object not found: {document.id}") from exc return parse_document_content( source_type=document.source_type, source_uri=document.source_uri, content_base64=base64.b64encode(raw_content).decode("ascii")) if document.content_text: return parse_document_content( source_type=document.source_type, source_uri=document.source_uri, content_text=document.content_text) raise ValueError(f"knowledge document content object not found: {document.id}") def _read_document_content_for_indexing(self, *, document: KnowledgeDocument) -> str: return self._parse_document_for_indexing(document=document).content_text def _read_document_raw_content(self, *, document: KnowledgeDocument) -> bytes: object_key = 
self._read_document_object_key(document=document) if isinstance(object_key, str) and object_key: return self.object_storage.get_bytes(object_key=object_key) if document.content_text: return document.content_text.encode("utf-8") raise ValueError(f"knowledge document content object not found: {document.id}") def _delete_document_object(self, *, document: KnowledgeDocument) -> bool: object_key = self._read_document_object_key(document=document) if object_key is None: return False try: return self.object_storage.delete_object(object_key=object_key) except ObjectStorageNotFoundError: return False def _read_document_object_key(self, *, document: KnowledgeDocument) -> str | None: object_metadata = self._read_object_storage_metadata(document=document) if object_metadata is None: return None object_key = object_metadata.get("objectKey") return object_key if isinstance(object_key, str) and object_key else None def _read_object_storage_metadata( self, *, document: KnowledgeDocument) -> dict[str, JSONValue] | None: metadata = document.metadata_json or {} object_metadata = metadata.get("object_storage") return object_metadata if isinstance(object_metadata, dict) else None def _object_status_to_payload( self, *, document: KnowledgeDocument, status: ObjectStorageStatus) -> dict[str, JSONValue]: return { "documentId": document.id, "exists": status.exists, "objectStorage": self._read_object_storage_metadata(document=document), "contentType": status.content_type, "sizeBytes": status.size_bytes, "etag": status.etag, "errorMessage": status.error_message, } def _read_content_type_from_status( self, object_status: dict[str, JSONValue] | None) -> str | None: if object_status is None: return None content_type = object_status.get("contentType") return content_type if isinstance(content_type, str) else None def _is_text_content_type(self, *, content_type: str | None, source_type: str) -> bool: if content_type is not None and content_type.startswith("text/"): return True return 
source_type.strip().lower() in {"text", "txt", "markdown", "md", "html", "htm", "json", "csv"} def _should_queue_indexing(self, *, async_mode: bool | None) -> bool: if async_mode is False: return False if not self.settings.async_indexing_enabled and async_mode is None: return False return self.task_queue_publisher is not None def _publish_document_index_job( self, *, document_id: str, action: str, job_id: str) -> bool: if self.task_queue_publisher is None: return False return self.task_queue_publisher.publish_knowledge_document( document_id=document_id, action=action, job_id=job_id) def _write_index_job_metadata( self, *, document: KnowledgeDocument, action: str, job_id: str, status: str, progress: int, chunk_size: int | None = None, chunk_overlap: int | None = None, worker_key: str | None = None, started_time: datetime | None = None, completed_time: datetime | None = None, error_message: str | None = None) -> dict[str, JSONValue]: metadata = dict(document.metadata_json or {}) existing_job = self._read_index_job_payload(document=document) queued_time = existing_job.get("queuedTime") if not isinstance(queued_time, str): queued_time = datetime.utcnow().isoformat() resolved_started_time = ( started_time.isoformat() if started_time is not None else existing_job.get("startedTime") ) resolved_completed_time = ( completed_time.isoformat() if completed_time is not None else existing_job.get("completedTime") ) job_payload: dict[str, JSONValue] = { "jobId": job_id, "documentId": document.id, "knowledgeBaseId": document.knowledge_base_id, "documentTitle": document.title, "action": action if action in {"index", "reindex"} else "reindex", "status": status, "progress": max(0, min(progress, 100)), "queueName": KNOWLEDGE_DOCUMENT_QUEUE, "workerKey": worker_key, "errorMessage": error_message, "chunkSize": chunk_size, "chunkOverlap": chunk_overlap, "queuedTime": queued_time, "startedTime": resolved_started_time if isinstance(resolved_started_time, str) else None, "completedTime": 
resolved_completed_time if isinstance(resolved_completed_time, str) else None, } metadata["index_job"] = job_payload return metadata def _read_latest_index_job(self, *, document: KnowledgeDocument) -> KnowledgeIndexJobData: payload = self._read_index_job_payload(document=document) return KnowledgeIndexJobData( jobId=self._read_payload_string(payload, "jobId") or f"kjob_{document.id}", documentId=self._read_payload_string(payload, "documentId") or document.id, knowledgeBaseId=self._read_payload_string(payload, "knowledgeBaseId") or document.knowledge_base_id, documentTitle=self._read_payload_string(payload, "documentTitle") or document.title, action=self._read_job_action(payload.get("action")), status=self._read_job_status(payload.get("status")), progress=self._read_payload_int(payload, "progress", 0), queueName=self._read_payload_string(payload, "queueName"), workerKey=self._read_payload_string(payload, "workerKey"), errorMessage=self._read_payload_string(payload, "errorMessage"), chunkSize=self._read_optional_payload_int(payload, "chunkSize"), chunkOverlap=self._read_optional_payload_int(payload, "chunkOverlap"), queuedTime=self._read_payload_datetime(payload, "queuedTime"), startedTime=self._read_payload_datetime(payload, "startedTime"), completedTime=self._read_payload_datetime(payload, "completedTime")) def _try_read_latest_index_job(self, *, document: KnowledgeDocument) -> KnowledgeIndexJobData | None: if not self._read_index_job_payload(document=document): return None return self._read_latest_index_job(document=document) def _read_index_job_payload(self, *, document: KnowledgeDocument) -> dict[str, JSONValue]: metadata = document.metadata_json or {} value = metadata.get("index_job") if isinstance(value, dict): return {str(item_key): item_value for item_key, item_value in value.items()} return {} def _read_payload_string( self, payload: dict[str, JSONValue], key: str) -> str | None: value = payload.get(key) return value if isinstance(value, str) and value else 
None def _read_payload_int( self, payload: dict[str, JSONValue], key: str, fallback: int) -> int: value = payload.get(key) if isinstance(value, int) and not isinstance(value, bool): return value return fallback def _read_optional_payload_int( self, payload: dict[str, JSONValue], key: str) -> int | None: value = payload.get(key) if isinstance(value, int) and not isinstance(value, bool): return value return None def _read_payload_datetime( self, payload: dict[str, JSONValue], key: str) -> datetime | None: value = payload.get(key) if not isinstance(value, str) or not value: return None try: return datetime.fromisoformat(value) except ValueError: return None def _read_job_action(self, value: JSONValue) -> KnowledgeIndexJobAction: if isinstance(value, str) and value in {"index", "reindex"}: return cast(KnowledgeIndexJobAction, value) return "reindex" def _read_job_status(self, value: JSONValue) -> KnowledgeIndexJobStatus: if isinstance(value, str) and value in {"queued", "running", "completed", "failed", "skipped"}: return cast(KnowledgeIndexJobStatus, value) return "queued" def _mark_document_failed( self, *, document: KnowledgeDocument, message: str, job_id: str | None = None, action: str = "reindex", worker_key: str | None = None, chunk_size: int | None = None, chunk_overlap: int | None = None) -> None: metadata = dict(document.metadata_json or {}) metadata["last_error"] = { "message": message[:1000], "errorType": "indexing_failed", } if job_id is not None: metadata = self._write_index_job_metadata( document=document, action=action, job_id=job_id, status="failed", progress=100, chunk_size=chunk_size, chunk_overlap=chunk_overlap, worker_key=worker_key, completed_time=datetime.utcnow(), error_message=message[:1000]) self.document_repository.update( document_id=document.id, status="failed", metadata_json=metadata) def _guess_content_type(self, *, source_type: str) -> str: normalized = source_type.strip().lower().removeprefix(".") if normalized in {"markdown", "md"}: 
return "text/markdown; charset=utf-8" if normalized in {"html", "htm"}: return "text/html; charset=utf-8" if normalized == "json": return "application/json" if normalized == "csv": return "text/csv; charset=utf-8" if normalized == "pdf": return "application/pdf" if normalized in {"docx", "word"}: return "application/vnd.openxmlformats-officedocument.wordprocessingml.document" return "text/plain; charset=utf-8" def _matches_filters( self, *, document: KnowledgeDocument, filters_json: dict[str, JSONValue]) -> bool: source_type = filters_json.get("sourceType") or filters_json.get("source_type") if isinstance(source_type, str) and document.source_type != source_type: return False status = filters_json.get("status") if isinstance(status, str) and document.status != status: return False return True def read_settings(self, *, knowledge_base_id: str | None = None) -> KnowledgeSettingsDto: base_config: dict[str, JSONValue] = {} if knowledge_base_id: base = self.base_repository.get_by_id(knowledge_base_id=knowledge_base_id) if base is not None and isinstance(base.metadata_json, dict): value = base.metadata_json.get("retrieval_config") if isinstance(value, dict): base_config = value defaults = KnowledgeSettingsDto( knowledgeBaseId=knowledge_base_id, chunkSize=self.settings.default_chunk_size, chunkOverlap=self.settings.default_chunk_overlap, keywordWeight=self.settings.retrieval_keyword_weight, vectorWeight=self.settings.retrieval_vector_weight, rerankWeight=self.settings.retrieval_rerank_weight, queryRewrite=False, requireCitations=True) return KnowledgeSettingsDto.model_validate({ **defaults.model_dump(), **base_config, "knowledgeBaseId": knowledge_base_id, }) def update_settings( self, payload: KnowledgeSettingsUpdateRequestDto) -> KnowledgeSettingsDto: settings = KnowledgeSettingsDto.model_validate({ **payload.model_dump(), "knowledgeBaseId": payload.knowledgeBaseId, }) if payload.knowledgeBaseId: base = 
self.base_repository.get_by_id(knowledge_base_id=payload.knowledgeBaseId) if base is not None: metadata = dict(base.metadata_json or {}) metadata["retrieval_config"] = settings.model_dump(exclude={"knowledgeBaseId"}) self.base_repository.update( knowledge_base_id=payload.knowledgeBaseId, metadata_json=metadata) return settings def _build_base_code(self, name: str) -> str: base = "".join( char.lower() if char.isalnum() else "_" for char in name ).strip("_") or "knowledge_base" return base[:64] def build_knowledge_application_service( *, db: Session, settings: KnowledgeServiceSettings) -> KnowledgeApplicationService: redis_client = try_build_redis_client(settings.redis_url) return KnowledgeApplicationService( settings=settings, base_repository=KnowledgeBaseRepository(db), document_repository=KnowledgeDocumentRepository(db), chunk_repository=KnowledgeChunkRepository(db), redis_client=redis_client, task_queue_publisher=( TaskQueuePublisher(client=redis_client) if redis_client is not None else None ))