| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- """Shared object storage helper mixin for knowledge sub-services."""
- from __future__ import annotations
- from typing import TYPE_CHECKING
- from core_shared import JSONValue
- from app.bootstrap.settings import KnowledgeServiceSettings
- from app.db.models import KnowledgeDocument
- from app.infrastructure.object_storage import (
- ObjectStorageStatus,
- build_object_storage)
- if TYPE_CHECKING:
- from app.infrastructure.object_storage import KnowledgeObjectStorage
class _ObjectStorageMixin:
    """Object storage helpers shared by knowledge sub-services.

    The host class supplies ``settings``. ``_object_storage`` caches the
    storage client and is filled in on first access of ``object_storage``.
    """

    settings: KnowledgeServiceSettings
    _object_storage: KnowledgeObjectStorage | None

    # Normalized source type -> content type reported by _guess_content_type.
    _CONTENT_TYPE_BY_SOURCE_TYPE = {
        "markdown": "text/markdown; charset=utf-8",
        "md": "text/markdown; charset=utf-8",
        "html": "text/html; charset=utf-8",
        "htm": "text/html; charset=utf-8",
        "json": "application/json",
        "csv": "text/csv; charset=utf-8",
        "pdf": "application/pdf",
        "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "word": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    }
    # Content type used for any source type missing from the table above.
    _DEFAULT_CONTENT_TYPE = "text/plain; charset=utf-8"
    # Normalized source types treated as textual by _is_text_content_type.
    _TEXT_SOURCE_TYPES = frozenset(
        {"text", "txt", "markdown", "md", "html", "htm", "json", "csv"}
    )

    @staticmethod
    def _normalize_source_type(source_type: str) -> str:
        """Return *source_type* trimmed, lower-cased and without a leading dot."""
        return source_type.strip().lower().removeprefix(".")

    @property
    def object_storage(self) -> KnowledgeObjectStorage:
        """Return the object storage client, building it on first access.

        The client is created with ``build_object_storage(self.settings)`` and
        cached on ``self._object_storage`` for subsequent accesses.
        """
        # The class-level annotation does not create the attribute, so a host
        # that never assigned ``_object_storage`` is treated as "not built yet".
        storage = getattr(self, "_object_storage", None)
        if storage is None:
            storage = build_object_storage(self.settings)
            self._object_storage = storage
        return storage

    def _read_document_object_key(self, *, document: KnowledgeDocument) -> str | None:
        """Return the document's ``objectKey`` metadata value.

        Returns ``None`` when the document has no object storage metadata or
        when ``objectKey`` is missing, empty, or not a string.
        """
        object_metadata = self._read_object_storage_metadata(document=document)
        if object_metadata is None:
            return None
        object_key = object_metadata.get("objectKey")
        if isinstance(object_key, str) and object_key:
            return object_key
        return None

    def _read_object_storage_metadata(
        self,
        *,
        document: KnowledgeDocument,
    ) -> dict[str, JSONValue] | None:
        """Return the ``object_storage`` entry of ``document.metadata_json``.

        Returns ``None`` when the metadata is empty or the entry is not a dict.
        """
        metadata = document.metadata_json or {}
        object_metadata = metadata.get("object_storage")
        if isinstance(object_metadata, dict):
            return object_metadata
        return None

    def _read_document_raw_content(self, *, document: KnowledgeDocument) -> bytes:
        """Return the document's raw content as bytes.

        Content is fetched from object storage when the document carries an
        object key; otherwise ``document.content_text`` is encoded as UTF-8.

        Raises:
            ValueError: If the document has neither an object key nor a
                non-empty ``content_text``.
        """
        object_key = self._read_document_object_key(document=document)
        # The helper only returns a non-empty string or None.
        if object_key:
            return self.object_storage.get_bytes(object_key=object_key)
        if document.content_text:
            return document.content_text.encode("utf-8")
        raise ValueError(f"knowledge document content object not found: {document.id}")

    def _delete_document_object(self, *, document: KnowledgeDocument) -> bool:
        """Delete the document's stored object, if it has one.

        Returns ``False`` when the document has no object key or when storage
        raises ``ObjectStorageNotFoundError``; otherwise returns whatever
        ``delete_object`` returns.
        """
        object_key = self._read_document_object_key(document=document)
        if object_key is None:
            return False

        # Imported lazily, and only once a delete is actually attempted.
        from app.infrastructure.object_storage import ObjectStorageNotFoundError

        try:
            return self.object_storage.delete_object(object_key=object_key)
        except ObjectStorageNotFoundError:
            return False

    def _guess_content_type(self, *, source_type: str) -> str:
        """Map a source type such as ``"md"`` or ``".PDF"`` to a content type.

        Unknown source types fall back to ``text/plain; charset=utf-8``.
        """
        normalized = self._normalize_source_type(source_type)
        return self._CONTENT_TYPE_BY_SOURCE_TYPE.get(
            normalized, self._DEFAULT_CONTENT_TYPE
        )

    def _object_status_to_payload(
        self,
        *,
        document: KnowledgeDocument,
        status: ObjectStorageStatus,
    ) -> dict[str, JSONValue]:
        """Build the camelCase status payload for a document's stored object."""
        return {
            "documentId": document.id,
            "exists": status.exists,
            "objectStorage": self._read_object_storage_metadata(document=document),
            "contentType": status.content_type,
            "sizeBytes": status.size_bytes,
            "etag": status.etag,
            "errorMessage": status.error_message,
        }

    def _read_content_type_from_status(
        self,
        object_status: dict[str, JSONValue] | None,
    ) -> str | None:
        """Return ``contentType`` from a status payload when it is a string."""
        if object_status is None:
            return None
        content_type = object_status.get("contentType")
        if isinstance(content_type, str):
            return content_type
        return None

    def _is_text_content_type(self, *, content_type: str | None, source_type: str) -> bool:
        """Return whether the content should be handled as text.

        True when *content_type* is a ``text/*`` media type, or when
        *source_type* is one of the known textual source types.
        """
        # Media type names are case-insensitive, so compare in lower case.
        if content_type is not None and content_type.strip().lower().startswith("text/"):
            return True
        # Same normalization as _guess_content_type, so ".md" and "md" agree.
        return self._normalize_source_type(source_type) in self._TEXT_SOURCE_TYPES
|