object_storage.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. from __future__ import annotations
  2. import hashlib
  3. from dataclasses import dataclass
  4. from typing import Protocol
  5. from uuid import uuid4
  6. from app.bootstrap.settings import KnowledgeServiceSettings
  7. @dataclass(frozen=True, slots=True)
  8. class StoredObject:
  9. backend: str
  10. bucket: str
  11. object_key: str
  12. content_type: str
  13. size_bytes: int
  14. etag: str | None = None
  15. def to_metadata(self) -> dict[str, str | int | None]:
  16. return {
  17. "backend": self.backend,
  18. "bucket": self.bucket,
  19. "objectKey": self.object_key,
  20. "contentType": self.content_type,
  21. "sizeBytes": self.size_bytes,
  22. "etag": self.etag,
  23. }
  24. @dataclass(frozen=True, slots=True)
  25. class ObjectStorageStatus:
  26. backend: str
  27. bucket: str
  28. object_key: str
  29. exists: bool
  30. content_type: str | None = None
  31. size_bytes: int | None = None
  32. etag: str | None = None
  33. error_message: str | None = None
  34. class ObjectStorageError(RuntimeError):
  35. pass
  36. class ObjectStorageConfigurationError(ObjectStorageError):
  37. pass
  38. class ObjectStorageNotFoundError(ObjectStorageError, FileNotFoundError):
  39. pass
  40. class KnowledgeObjectStorage(Protocol):
  41. def put_bytes(
  42. self,
  43. *,
  44. object_key: str,
  45. content: bytes,
  46. content_type: str) -> StoredObject:
  47. ...
  48. def get_bytes(self, *, object_key: str) -> bytes:
  49. ...
  50. def head_object(self, *, object_key: str) -> ObjectStorageStatus:
  51. ...
  52. def delete_object(self, *, object_key: str) -> bool:
  53. ...
  54. def health_check(self) -> dict[str, str | bool | None]:
  55. ...
  56. class InMemoryObjectStorage:
  57. _buckets: dict[str, dict[str, tuple[bytes, str, str]]] = {}
  58. def __init__(self, *, bucket: str = "memory-knowledge") -> None:
  59. self.bucket = bucket
  60. self._objects = self._buckets.setdefault(bucket, {})
  61. def put_bytes(
  62. self,
  63. *,
  64. object_key: str,
  65. content: bytes,
  66. content_type: str) -> StoredObject:
  67. etag = hashlib.sha256(content).hexdigest()
  68. self._objects[object_key] = (content, content_type, etag)
  69. return StoredObject(
  70. backend="memory",
  71. bucket=self.bucket,
  72. object_key=object_key,
  73. content_type=content_type,
  74. size_bytes=len(content),
  75. etag=etag)
  76. def get_bytes(self, *, object_key: str) -> bytes:
  77. try:
  78. return self._objects[object_key][0]
  79. except KeyError as exc:
  80. raise ObjectStorageNotFoundError(f"knowledge object not found: {object_key}") from exc
  81. def head_object(self, *, object_key: str) -> ObjectStorageStatus:
  82. stored = self._objects.get(object_key)
  83. if stored is None:
  84. return ObjectStorageStatus(
  85. backend="memory",
  86. bucket=self.bucket,
  87. object_key=object_key,
  88. exists=False,
  89. error_message=f"knowledge object not found: {object_key}")
  90. content, content_type, etag = stored
  91. return ObjectStorageStatus(
  92. backend="memory",
  93. bucket=self.bucket,
  94. object_key=object_key,
  95. exists=True,
  96. content_type=content_type,
  97. size_bytes=len(content),
  98. etag=etag)
  99. def delete_object(self, *, object_key: str) -> bool:
  100. return self._objects.pop(object_key, None) is not None
  101. def health_check(self) -> dict[str, str | bool | None]:
  102. return {
  103. "backend": "memory",
  104. "bucket": self.bucket,
  105. "available": True,
  106. "message": None,
  107. }
  108. class MinioObjectStorage:
  109. def __init__(self, *, settings: KnowledgeServiceSettings) -> None:
  110. try:
  111. import boto3
  112. from botocore.config import Config
  113. from botocore.exceptions import ClientError
  114. except Exception as exc:
  115. raise ObjectStorageConfigurationError(
  116. "boto3 is required for MinIO object storage. "
  117. "Install knowledge-service dependencies before starting the service."
  118. ) from exc
  119. self._client_error_type = ClientError
  120. self.bucket = settings.object_storage_bucket.strip()
  121. if not self.bucket:
  122. raise ObjectStorageConfigurationError("MinIO bucket is required")
  123. config = Config(
  124. signature_version="s3v4",
  125. s3={"addressing_style": "path" if settings.object_storage_path_style else "auto"},
  126. )
  127. try:
  128. self._client = boto3.client(
  129. "s3",
  130. endpoint_url=settings.object_storage_endpoint_url,
  131. aws_access_key_id=settings.object_storage_access_key,
  132. aws_secret_access_key=settings.object_storage_secret_key,
  133. region_name=settings.object_storage_region,
  134. config=config,
  135. )
  136. except Exception as exc:
  137. raise ObjectStorageConfigurationError("failed to configure MinIO client") from exc
  138. self._bucket_checked = False
  139. def put_bytes(
  140. self,
  141. *,
  142. object_key: str,
  143. content: bytes,
  144. content_type: str) -> StoredObject:
  145. try:
  146. self._ensure_bucket()
  147. response = self._client.put_object(
  148. Bucket=self.bucket,
  149. Key=object_key,
  150. Body=content,
  151. ContentType=content_type,
  152. )
  153. except self._client_error_type as exc:
  154. raise ObjectStorageError(f"failed to write knowledge object: {object_key}") from exc
  155. etag = response.get("ETag")
  156. return StoredObject(
  157. backend="minio",
  158. bucket=self.bucket,
  159. object_key=object_key,
  160. content_type=content_type,
  161. size_bytes=len(content),
  162. etag=str(etag).strip('"') if etag else None)
  163. def get_bytes(self, *, object_key: str) -> bytes:
  164. try:
  165. response = self._client.get_object(Bucket=self.bucket, Key=object_key)
  166. except self._client_error_type as exc:
  167. if self._is_not_found(exc):
  168. raise ObjectStorageNotFoundError(f"knowledge object not found: {object_key}") from exc
  169. raise ObjectStorageError(f"failed to read knowledge object: {object_key}") from exc
  170. try:
  171. return response["Body"].read()
  172. finally:
  173. response["Body"].close()
  174. def head_object(self, *, object_key: str) -> ObjectStorageStatus:
  175. try:
  176. response = self._client.head_object(Bucket=self.bucket, Key=object_key)
  177. except self._client_error_type as exc:
  178. if self._is_not_found(exc):
  179. return ObjectStorageStatus(
  180. backend="minio",
  181. bucket=self.bucket,
  182. object_key=object_key,
  183. exists=False,
  184. error_message=f"knowledge object not found: {object_key}")
  185. raise ObjectStorageError(f"failed to stat knowledge object: {object_key}") from exc
  186. return ObjectStorageStatus(
  187. backend="minio",
  188. bucket=self.bucket,
  189. object_key=object_key,
  190. exists=True,
  191. content_type=response.get("ContentType"),
  192. size_bytes=response.get("ContentLength"),
  193. etag=str(response.get("ETag")).strip('"') if response.get("ETag") else None)
  194. def delete_object(self, *, object_key: str) -> bool:
  195. try:
  196. existed = self.head_object(object_key=object_key).exists
  197. self._client.delete_object(Bucket=self.bucket, Key=object_key)
  198. return existed
  199. except ObjectStorageError:
  200. raise
  201. except self._client_error_type as exc:
  202. raise ObjectStorageError(f"failed to delete knowledge object: {object_key}") from exc
  203. def health_check(self) -> dict[str, str | bool | None]:
  204. try:
  205. self._ensure_bucket()
  206. except ObjectStorageError as exc:
  207. return {
  208. "backend": "minio",
  209. "bucket": self.bucket,
  210. "available": False,
  211. "message": str(exc),
  212. }
  213. return {
  214. "backend": "minio",
  215. "bucket": self.bucket,
  216. "available": True,
  217. "message": None,
  218. }
  219. def _ensure_bucket(self) -> None:
  220. if self._bucket_checked:
  221. return
  222. try:
  223. self._client.head_bucket(Bucket=self.bucket)
  224. except self._client_error_type as exc:
  225. if not self._is_not_found(exc):
  226. raise ObjectStorageError(f"failed to access MinIO bucket: {self.bucket}") from exc
  227. try:
  228. self._client.create_bucket(Bucket=self.bucket)
  229. except self._client_error_type as create_exc:
  230. raise ObjectStorageError(f"failed to create MinIO bucket: {self.bucket}") from create_exc
  231. self._bucket_checked = True
  232. def _is_not_found(self, exc: Exception) -> bool:
  233. response = getattr(exc, "response", None)
  234. if not isinstance(response, dict):
  235. return False
  236. error = response.get("Error")
  237. status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
  238. code = error.get("Code") if isinstance(error, dict) else None
  239. return status == 404 or code in {"404", "NoSuchBucket", "NoSuchKey", "NotFound"}
  240. def build_object_storage(settings: KnowledgeServiceSettings) -> KnowledgeObjectStorage:
  241. backend = settings.object_storage_backend.strip().lower()
  242. if backend == "memory":
  243. return InMemoryObjectStorage(bucket=settings.object_storage_bucket)
  244. if backend == "minio":
  245. return MinioObjectStorage(settings=settings)
  246. raise ValueError(f"unsupported knowledge object storage backend: {settings.object_storage_backend}")
  247. def build_document_object_key(
  248. *,
  249. knowledge_base_id: str,
  250. source_type: str,
  251. title: str) -> str:
  252. safe_title = "".join(
  253. char.lower() if char.isalnum() else "-"
  254. for char in title
  255. ).strip("-") or "document"
  256. suffix = source_type.strip().lower().removeprefix(".") or "txt"
  257. return f"knowledge/{knowledge_base_id}/documents/{uuid4().hex}/{safe_title[:80]}.{suffix}"