| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- """Knowledge CRUD sub-service — bases, documents, chunks, content reading."""
- from __future__ import annotations
- import base64
- from typing import TYPE_CHECKING
- from core_shared import JSONValue
- from app.application._storage_mixin import _ObjectStorageMixin
- from app.application.document_parsers import (
- DocumentParseError,
- ParsedDocument,
- parse_document_content)
- from app.bootstrap.settings import KnowledgeServiceSettings
- from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
- from app.schemas.knowledge import (
- KnowledgeBaseCreateRequest,
- KnowledgeBaseCreateRequestDto,
- KnowledgeBaseStatusUpdateRequest,
- KnowledgeBaseUpdateRequestDto,
- KnowledgeDocumentCreateRequestDto,
- KnowledgeDocumentParseRequest,
- KnowledgeDocumentUpdateRequestDto)
- if TYPE_CHECKING:
- from app.domain.repositories import (
- KnowledgeBaseRepository,
- KnowledgeChunkRepository,
- KnowledgeDocumentRepository)
- from app.infrastructure.object_storage import KnowledgeObjectStorage
- class KnowledgeCrudService(_ObjectStorageMixin):
- def __init__(
- self,
- *,
- settings: KnowledgeServiceSettings,
- base_repository: KnowledgeBaseRepository,
- document_repository: KnowledgeDocumentRepository,
- chunk_repository: KnowledgeChunkRepository,
- object_storage: KnowledgeObjectStorage | None = None,
- ) -> None:
- self.settings = settings
- self.base_repository = base_repository
- self.document_repository = document_repository
- self.chunk_repository = chunk_repository
- self._object_storage = object_storage
- # ── Knowledge Base CRUD ──────────────────────────────────────────
- def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
- return self.base_repository.create(
- code=payload.code,
- name=payload.name,
- description=payload.description,
- metadata_json=payload.metadata_json)
- def create_base_from_contract(
- self,
- payload: KnowledgeBaseCreateRequestDto,
- ) -> KnowledgeBase:
- return self.create_base(KnowledgeBaseCreateRequest(
- code=self._build_base_code(payload.name),
- name=payload.name,
- description=payload.description,
- metadata_json=payload.metadata))
- def list_bases(self) -> list[KnowledgeBase]:
- return self.base_repository.list_all()
- def list_bases_filtered(
- self,
- *,
- keyword: str | None = None,
- status: str | None = None,
- ) -> list[KnowledgeBase]:
- return self.base_repository.list_filtered(keyword=keyword, status=status)
- def update_base_from_contract(
- self,
- payload: KnowledgeBaseUpdateRequestDto,
- ) -> KnowledgeBase | None:
- return self.base_repository.update(
- knowledge_base_id=payload.knowledgeBaseId,
- name=payload.name,
- description=payload.description,
- status=payload.status,
- metadata_json=payload.metadata)
- def delete_base(self, *, knowledge_base_id: str) -> bool:
- documents = self.document_repository.list_by_base(knowledge_base_id=knowledge_base_id)
- for document in documents:
- self._delete_document_object(document=document)
- self.chunk_repository.delete_by_base(knowledge_base_id=knowledge_base_id)
- for document in documents:
- self.document_repository.delete(document_id=document.id)
- return self.base_repository.delete(knowledge_base_id=knowledge_base_id)
- def update_base_status(
- self,
- *,
- knowledge_base_id: str,
- payload: KnowledgeBaseStatusUpdateRequest,
- ) -> KnowledgeBase | None:
- return self.base_repository.update_status(
- knowledge_base_id=knowledge_base_id,
- status=payload.status)
- # ── Document CRUD ────────────────────────────────────────────────
- def list_documents(
- self,
- *,
- knowledge_base_id: str,
- ) -> list[KnowledgeDocument]:
- return self.document_repository.list_by_base(knowledge_base_id=knowledge_base_id)
- def list_documents_filtered(
- self,
- *,
- knowledge_base_id: str | None = None,
- keyword: str | None = None,
- status: str | None = None,
- source_type: str | None = None,
- ) -> list[KnowledgeDocument]:
- return self.document_repository.list_filtered(
- knowledge_base_id=knowledge_base_id,
- keyword=keyword,
- status=status,
- source_type=source_type)
- def update_document_from_contract(
- self,
- payload: KnowledgeDocumentUpdateRequestDto,
- ) -> KnowledgeDocument | None:
- return self.document_repository.update(
- document_id=payload.documentId,
- title=payload.title,
- source_uri=payload.sourceUri,
- status=payload.status,
- metadata_json=payload.metadata)
- def delete_document(self, *, document_id: str) -> bool:
- return bool(self.delete_document_result(document_id=document_id)["deleted"])
- def delete_document_result(self, *, document_id: str) -> dict[str, JSONValue]:
- document = self.document_repository.get_by_id(document_id=document_id)
- if document is None:
- return {
- "deleted": False,
- "objectDeleted": False,
- "documentId": document_id,
- }
- object_deleted = self._delete_document_object(document=document)
- self.chunk_repository.delete_by_document(document_id=document_id)
- deleted = self.document_repository.delete(document_id=document_id) is not None
- return {
- "deleted": deleted,
- "objectDeleted": object_deleted,
- "documentId": document_id,
- }
- def read_document_content(
- self,
- *,
- document_id: str,
- include_text: bool = True,
- include_base64: bool = False,
- ) -> dict[str, JSONValue] | None:
- document = self.document_repository.get_by_id(document_id=document_id)
- if document is None:
- return None
- raw_content = self._read_document_raw_content(document=document)
- object_status = self.read_document_storage_status(document_id=document_id)
- content_type = self._read_content_type_from_status(object_status)
- payload: dict[str, JSONValue] = {
- "documentId": document.id,
- "title": document.title,
- "sourceType": document.source_type,
- "contentType": content_type,
- "sizeBytes": len(raw_content),
- "objectStorage": self._read_object_storage_metadata(document=document),
- "contentBase64": None,
- "contentText": None,
- }
- if include_base64:
- payload["contentBase64"] = base64.b64encode(raw_content).decode("ascii")
- if include_text and self._is_text_content_type(content_type=content_type, source_type=document.source_type):
- payload["contentText"] = raw_content.decode("utf-8", errors="replace")
- return payload
- def read_document_storage_status(self, *, document_id: str) -> dict[str, JSONValue] | None:
- document = self.document_repository.get_by_id(document_id=document_id)
- if document is None:
- return None
- object_key = self._read_document_object_key(document=document)
- if object_key is None:
- return {
- "documentId": document.id,
- "exists": False,
- "objectStorage": None,
- "errorMessage": "document object metadata is missing",
- }
- status = self.object_storage.head_object(object_key=object_key)
- return self._object_status_to_payload(document=document, status=status)
- def read_storage_health(self) -> dict[str, JSONValue]:
- return dict(self.object_storage.health_check())
- # ── Chunk CRUD ───────────────────────────────────────────────────
- def list_chunks_filtered(
- self,
- *,
- knowledge_base_id: str | None = None,
- document_id: str | None = None,
- keyword: str | None = None,
- ) -> list[KnowledgeChunk]:
- return self.chunk_repository.list_filtered(
- knowledge_base_id=knowledge_base_id,
- document_id=document_id,
- keyword=keyword)
- def delete_chunk(self, *, chunk_id: str) -> bool:
- return self.chunk_repository.delete(chunk_id=chunk_id)
- # ── Parse ────────────────────────────────────────────────────────
- def parse_document(self, payload: KnowledgeDocumentParseRequest) -> ParsedDocument:
- try:
- return parse_document_content(
- source_type=payload.source_type,
- content_text=payload.content_text,
- content_base64=payload.content_base64,
- source_uri=payload.source_uri)
- except DocumentParseError:
- raise
- # ── Private helpers ──────────────────────────────────────────────
- def _build_base_code(self, name: str) -> str:
- base = "".join(
- char.lower() if char.isalnum() else "_"
- for char in name
- ).strip("_") or "knowledge_base"
- return base[:64]
|