"""Knowledge CRUD sub-service — bases, documents, chunks, content reading.""" from __future__ import annotations import base64 from typing import TYPE_CHECKING from core_shared import JSONValue from app.application._storage_mixin import _ObjectStorageMixin from app.application.document_parsers import ( DocumentParseError, ParsedDocument, parse_document_content) from app.bootstrap.settings import KnowledgeServiceSettings from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument from app.schemas.knowledge import ( KnowledgeBaseCreateRequest, KnowledgeBaseCreateRequestDto, KnowledgeBaseStatusUpdateRequest, KnowledgeBaseUpdateRequestDto, KnowledgeDocumentCreateRequestDto, KnowledgeDocumentParseRequest, KnowledgeDocumentUpdateRequestDto) if TYPE_CHECKING: from app.domain.repositories import ( KnowledgeBaseRepository, KnowledgeChunkRepository, KnowledgeDocumentRepository) from app.infrastructure.object_storage import KnowledgeObjectStorage class KnowledgeCrudService(_ObjectStorageMixin): def __init__( self, *, settings: KnowledgeServiceSettings, base_repository: KnowledgeBaseRepository, document_repository: KnowledgeDocumentRepository, chunk_repository: KnowledgeChunkRepository, object_storage: KnowledgeObjectStorage | None = None, ) -> None: self.settings = settings self.base_repository = base_repository self.document_repository = document_repository self.chunk_repository = chunk_repository self._object_storage = object_storage # ── Knowledge Base CRUD ────────────────────────────────────────── def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase: return self.base_repository.create( code=payload.code, name=payload.name, description=payload.description, metadata_json=payload.metadata_json) def create_base_from_contract( self, payload: KnowledgeBaseCreateRequestDto, ) -> KnowledgeBase: return self.create_base(KnowledgeBaseCreateRequest( code=self._build_base_code(payload.name), name=payload.name, description=payload.description, metadata_json=payload.metadata)) def list_bases(self) -> list[KnowledgeBase]: return self.base_repository.list_all() def list_bases_filtered( self, *, keyword: str | None = None, status: str | None = None, ) -> list[KnowledgeBase]: return self.base_repository.list_filtered(keyword=keyword, status=status) def update_base_from_contract( self, payload: KnowledgeBaseUpdateRequestDto, ) -> KnowledgeBase | None: return self.base_repository.update( knowledge_base_id=payload.knowledgeBaseId, name=payload.name, description=payload.description, status=payload.status, metadata_json=payload.metadata) def delete_base(self, *, knowledge_base_id: str) -> bool: documents = self.document_repository.list_by_base(knowledge_base_id=knowledge_base_id) for document in documents: self._delete_document_object(document=document) self.chunk_repository.delete_by_base(knowledge_base_id=knowledge_base_id) for document in documents: self.document_repository.delete(document_id=document.id) return self.base_repository.delete(knowledge_base_id=knowledge_base_id) def update_base_status( self, *, knowledge_base_id: str, payload: KnowledgeBaseStatusUpdateRequest, ) -> KnowledgeBase | None: return self.base_repository.update_status( knowledge_base_id=knowledge_base_id, status=payload.status) # ── Document CRUD ──────────────────────────────────────────────── def list_documents( self, *, knowledge_base_id: str, ) -> list[KnowledgeDocument]: return self.document_repository.list_by_base(knowledge_base_id=knowledge_base_id) def list_documents_filtered( self, *, knowledge_base_id: str | None = None, keyword: str | None = None, status: str | None = None, source_type: str | None = None, ) -> list[KnowledgeDocument]: return self.document_repository.list_filtered( knowledge_base_id=knowledge_base_id, keyword=keyword, status=status, source_type=source_type) def update_document_from_contract( self, payload: KnowledgeDocumentUpdateRequestDto, ) -> KnowledgeDocument | None: return self.document_repository.update( document_id=payload.documentId, title=payload.title, source_uri=payload.sourceUri, status=payload.status, metadata_json=payload.metadata) def delete_document(self, *, document_id: str) -> bool: return bool(self.delete_document_result(document_id=document_id)["deleted"]) def delete_document_result(self, *, document_id: str) -> dict[str, JSONValue]: document = self.document_repository.get_by_id(document_id=document_id) if document is None: return { "deleted": False, "objectDeleted": False, "documentId": document_id, } object_deleted = self._delete_document_object(document=document) self.chunk_repository.delete_by_document(document_id=document_id) deleted = self.document_repository.delete(document_id=document_id) is not None return { "deleted": deleted, "objectDeleted": object_deleted, "documentId": document_id, } def read_document_content( self, *, document_id: str, include_text: bool = True, include_base64: bool = False, ) -> dict[str, JSONValue] | None: document = self.document_repository.get_by_id(document_id=document_id) if document is None: return None raw_content = self._read_document_raw_content(document=document) object_status = self.read_document_storage_status(document_id=document_id) content_type = self._read_content_type_from_status(object_status) payload: dict[str, JSONValue] = { "documentId": document.id, "title": document.title, "sourceType": document.source_type, "contentType": content_type, "sizeBytes": len(raw_content), "objectStorage": self._read_object_storage_metadata(document=document), "contentBase64": None, "contentText": None, } if include_base64: payload["contentBase64"] = base64.b64encode(raw_content).decode("ascii") if include_text and self._is_text_content_type(content_type=content_type, source_type=document.source_type): payload["contentText"] = raw_content.decode("utf-8", errors="replace") return payload def read_document_storage_status(self, *, document_id: str) -> dict[str, JSONValue] | None: document = self.document_repository.get_by_id(document_id=document_id) if document is None: return None object_key = self._read_document_object_key(document=document) if object_key is None: return { "documentId": document.id, "exists": False, "objectStorage": None, "errorMessage": "document object metadata is missing", } status = self.object_storage.head_object(object_key=object_key) return self._object_status_to_payload(document=document, status=status) def read_storage_health(self) -> dict[str, JSONValue]: return dict(self.object_storage.health_check()) # ── Chunk CRUD ─────────────────────────────────────────────────── def list_chunks_filtered( self, *, knowledge_base_id: str | None = None, document_id: str | None = None, keyword: str | None = None, ) -> list[KnowledgeChunk]: return self.chunk_repository.list_filtered( knowledge_base_id=knowledge_base_id, document_id=document_id, keyword=keyword) def delete_chunk(self, *, chunk_id: str) -> bool: return self.chunk_repository.delete(chunk_id=chunk_id) # ── Parse ──────────────────────────────────────────────────────── def parse_document(self, payload: KnowledgeDocumentParseRequest) -> ParsedDocument: try: return parse_document_content( source_type=payload.source_type, content_text=payload.content_text, content_base64=payload.content_base64, source_uri=payload.source_uri) except DocumentParseError: raise # ── Private helpers ────────────────────────────────────────────── def _build_base_code(self, name: str) -> str: base = "".join( char.lower() if char.isalnum() else "_" for char in name ).strip("_") or "knowledge_base" return base[:64]