_storage_mixin.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. """Shared object storage helper mixin for knowledge sub-services."""
  2. from __future__ import annotations
  3. from typing import TYPE_CHECKING
  4. from core_shared import JSONValue
  5. from app.bootstrap.settings import KnowledgeServiceSettings
  6. from app.db.models import KnowledgeDocument
  7. from app.infrastructure.object_storage import (
  8. ObjectStorageStatus,
  9. build_object_storage)
  10. if TYPE_CHECKING:
  11. from app.infrastructure.object_storage import KnowledgeObjectStorage
class _ObjectStorageMixin:
    """Shared object-storage helpers for knowledge sub-services.

    The mixin only declares the two attributes below; it never assigns
    initial values, so the host class is responsible for setting them
    before any helper is used.
    """

    # Service settings; handed to ``build_object_storage`` when the storage
    # client is built by the ``object_storage`` property.
    settings: KnowledgeServiceSettings
    # Cached storage client. ``object_storage`` builds and stores the client
    # here whenever this is ``None``.
    _object_storage: KnowledgeObjectStorage | None
  15. @property
  16. def object_storage(self) -> KnowledgeObjectStorage:
  17. if self._object_storage is None:
  18. self._object_storage = build_object_storage(self.settings)
  19. return self._object_storage
  20. def _read_document_object_key(self, *, document: KnowledgeDocument) -> str | None:
  21. object_metadata = self._read_object_storage_metadata(document=document)
  22. if object_metadata is None:
  23. return None
  24. object_key = object_metadata.get("objectKey")
  25. return object_key if isinstance(object_key, str) and object_key else None
  26. def _read_object_storage_metadata(
  27. self,
  28. *,
  29. document: KnowledgeDocument,
  30. ) -> dict[str, JSONValue] | None:
  31. metadata = document.metadata_json or {}
  32. object_metadata = metadata.get("object_storage")
  33. return object_metadata if isinstance(object_metadata, dict) else None
  34. def _read_document_raw_content(self, *, document: KnowledgeDocument) -> bytes:
  35. object_key = self._read_document_object_key(document=document)
  36. if isinstance(object_key, str) and object_key:
  37. return self.object_storage.get_bytes(object_key=object_key)
  38. if document.content_text:
  39. return document.content_text.encode("utf-8")
  40. raise ValueError(f"knowledge document content object not found: {document.id}")
  41. def _delete_document_object(self, *, document: KnowledgeDocument) -> bool:
  42. from app.infrastructure.object_storage import ObjectStorageNotFoundError
  43. object_key = self._read_document_object_key(document=document)
  44. if object_key is None:
  45. return False
  46. try:
  47. return self.object_storage.delete_object(object_key=object_key)
  48. except ObjectStorageNotFoundError:
  49. return False
  50. def _guess_content_type(self, *, source_type: str) -> str:
  51. normalized = source_type.strip().lower().removeprefix(".")
  52. if normalized in {"markdown", "md"}:
  53. return "text/markdown; charset=utf-8"
  54. if normalized in {"html", "htm"}:
  55. return "text/html; charset=utf-8"
  56. if normalized == "json":
  57. return "application/json"
  58. if normalized == "csv":
  59. return "text/csv; charset=utf-8"
  60. if normalized == "pdf":
  61. return "application/pdf"
  62. if normalized in {"docx", "word"}:
  63. return "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
  64. return "text/plain; charset=utf-8"
  65. def _object_status_to_payload(
  66. self,
  67. *,
  68. document: KnowledgeDocument,
  69. status: ObjectStorageStatus,
  70. ) -> dict[str, JSONValue]:
  71. return {
  72. "documentId": document.id,
  73. "exists": status.exists,
  74. "objectStorage": self._read_object_storage_metadata(document=document),
  75. "contentType": status.content_type,
  76. "sizeBytes": status.size_bytes,
  77. "etag": status.etag,
  78. "errorMessage": status.error_message,
  79. }
  80. def _read_content_type_from_status(
  81. self,
  82. object_status: dict[str, JSONValue] | None,
  83. ) -> str | None:
  84. if object_status is None:
  85. return None
  86. content_type = object_status.get("contentType")
  87. return content_type if isinstance(content_type, str) else None
  88. def _is_text_content_type(self, *, content_type: str | None, source_type: str) -> bool:
  89. if content_type is not None and content_type.startswith("text/"):
  90. return True
  91. return source_type.strip().lower() in {"text", "txt", "markdown", "md", "html", "htm", "json", "csv"}