| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298 |
from __future__ import annotations

import hashlib
from dataclasses import dataclass
from typing import ClassVar, Protocol
from uuid import uuid4

from app.bootstrap.settings import KnowledgeServiceSettings
- @dataclass(frozen=True, slots=True)
- class StoredObject:
- backend: str
- bucket: str
- object_key: str
- content_type: str
- size_bytes: int
- etag: str | None = None
- def to_metadata(self) -> dict[str, str | int | None]:
- return {
- "backend": self.backend,
- "bucket": self.bucket,
- "objectKey": self.object_key,
- "contentType": self.content_type,
- "sizeBytes": self.size_bytes,
- "etag": self.etag,
- }
- @dataclass(frozen=True, slots=True)
- class ObjectStorageStatus:
- backend: str
- bucket: str
- object_key: str
- exists: bool
- content_type: str | None = None
- size_bytes: int | None = None
- etag: str | None = None
- error_message: str | None = None
class ObjectStorageError(RuntimeError):
    """Base class for every failure raised by the knowledge object storage layer."""


class ObjectStorageConfigurationError(ObjectStorageError):
    """Raised when a storage backend cannot be set up (missing dependency or bad settings)."""


class ObjectStorageNotFoundError(ObjectStorageError, FileNotFoundError):
    """Raised when a requested object does not exist.

    Also derives from ``FileNotFoundError`` so code that already handles
    ordinary missing-file errors catches it as well.
    """
class KnowledgeObjectStorage(Protocol):
    """Structural interface implemented by every knowledge object storage backend.

    All operations take keyword-only arguments.
    """

    def put_bytes(
        self,
        *,
        object_key: str,
        content: bytes,
        content_type: str) -> StoredObject:
        """Write *content* under *object_key* and return a receipt for it."""

    def get_bytes(self, *, object_key: str) -> bytes:
        """Return the raw bytes stored under *object_key*.

        The implementations in this module raise ``ObjectStorageNotFoundError``
        when the key is absent.
        """

    def head_object(self, *, object_key: str) -> ObjectStorageStatus:
        """Describe *object_key* without downloading its content."""

    def delete_object(self, *, object_key: str) -> bool:
        """Remove *object_key*; return whether it existed beforehand."""

    def health_check(self) -> dict[str, str | bool | None]:
        """Report backend name, bucket, availability flag and an optional message."""
class InMemoryObjectStorage:
    """Object storage kept in process memory (the ``"memory"`` backend).

    Objects live in the class-level ``_buckets`` registry, so every instance
    created with the same ``bucket`` name shares one set of objects for the
    lifetime of the process. Nothing is persisted.
    """

    # bucket name -> {object_key: (content, content_type, etag)}. Deliberately a
    # class attribute so separately constructed instances see the same data.
    # NOTE(review): this also means state carries over between tests that reuse
    # a bucket name — confirm that is intended.
    _buckets: ClassVar[dict[str, dict[str, tuple[bytes, str, str]]]] = {}

    def __init__(self, *, bucket: str = "memory-knowledge") -> None:
        """Attach to (creating if needed) the shared object map for *bucket*."""
        self.bucket = bucket
        self._objects = self._buckets.setdefault(bucket, {})

    def put_bytes(
        self,
        *,
        object_key: str,
        content: bytes,
        content_type: str) -> StoredObject:
        """Store *content* under *object_key*, overwriting any existing object.

        Mutable buffers (``bytearray``, ``memoryview``) are first copied into an
        immutable ``bytes`` snapshot, so later changes made by the caller cannot
        desynchronise the stored payload from its recorded size and etag, and
        ``size_bytes`` is always a byte count.

        Returns:
            A ``StoredObject`` with ``backend="memory"`` whose ``etag`` is the
            SHA-256 hex digest of the stored bytes.
        """
        # bytes objects are already immutable; anything else must expose the
        # buffer protocol (memoryview() rejects e.g. ints, unlike bytes()).
        payload = content if isinstance(content, bytes) else bytes(memoryview(content))
        etag = hashlib.sha256(payload).hexdigest()
        self._objects[object_key] = (payload, content_type, etag)
        return StoredObject(
            backend="memory",
            bucket=self.bucket,
            object_key=object_key,
            content_type=content_type,
            size_bytes=len(payload),
            etag=etag)

    def get_bytes(self, *, object_key: str) -> bytes:
        """Return the bytes stored under *object_key*.

        Raises:
            ObjectStorageNotFoundError: if no object is stored under the key.
        """
        try:
            return self._objects[object_key][0]
        except KeyError as exc:
            raise ObjectStorageNotFoundError(f"knowledge object not found: {object_key}") from exc

    def head_object(self, *, object_key: str) -> ObjectStorageStatus:
        """Describe *object_key*; a missing key yields ``exists=False`` plus a message."""
        stored = self._objects.get(object_key)
        if stored is None:
            return ObjectStorageStatus(
                backend="memory",
                bucket=self.bucket,
                object_key=object_key,
                exists=False,
                error_message=f"knowledge object not found: {object_key}")
        content, content_type, etag = stored
        return ObjectStorageStatus(
            backend="memory",
            bucket=self.bucket,
            object_key=object_key,
            exists=True,
            content_type=content_type,
            size_bytes=len(content),
            etag=etag)

    def delete_object(self, *, object_key: str) -> bool:
        """Remove *object_key*; return True only if it was present."""
        # Stored values are tuples, never None, so None reliably means "absent".
        return self._objects.pop(object_key, None) is not None

    def health_check(self) -> dict[str, str | bool | None]:
        """Report the memory backend as always available."""
        return {
            "backend": "memory",
            "bucket": self.bucket,
            "available": True,
            "message": None,
        }
class MinioObjectStorage:
    """Object storage backed by a MinIO (S3-compatible) server through boto3.

    Every botocore failure is translated into this module's exception
    hierarchy. ``ClientError`` (an error response from the server) and
    ``BotoCoreError`` (client-side failures such as an unreachable endpoint,
    timeouts or truncated reads) both surface as ``ObjectStorageError`` or
    ``ObjectStorageNotFoundError``, so callers never need to handle botocore
    exceptions directly.
    """

    def __init__(self, *, settings: KnowledgeServiceSettings) -> None:
        """Create the S3 client from *settings*.

        boto3/botocore are imported here rather than at module level
        (presumably so the module works without boto3 when only the
        in-memory backend is used).

        Raises:
            ObjectStorageConfigurationError: if boto3/botocore cannot be
                imported, the configured bucket name is blank, or the client
                cannot be constructed.
        """
        try:
            import boto3
            from botocore.config import Config
            from botocore.exceptions import BotoCoreError, ClientError
        except Exception as exc:
            raise ObjectStorageConfigurationError(
                "boto3 is required for MinIO object storage. "
                "Install knowledge-service dependencies before starting the service."
            ) from exc
        self._client_error_type = ClientError
        # Catching only ClientError let client-side failures such as
        # EndpointConnectionError escape untranslated (and crash health_check
        # instead of reporting "unavailable"), so both families are handled.
        self._boto_error_types = (ClientError, BotoCoreError)
        self.bucket = settings.object_storage_bucket.strip()
        if not self.bucket:
            raise ObjectStorageConfigurationError("MinIO bucket is required")
        config = Config(
            signature_version="s3v4",
            s3={"addressing_style": "path" if settings.object_storage_path_style else "auto"},
        )
        try:
            self._client = boto3.client(
                "s3",
                endpoint_url=settings.object_storage_endpoint_url,
                aws_access_key_id=settings.object_storage_access_key,
                aws_secret_access_key=settings.object_storage_secret_key,
                region_name=settings.object_storage_region,
                config=config,
            )
        except Exception as exc:
            raise ObjectStorageConfigurationError("failed to configure MinIO client") from exc
        # Set once the bucket is known to exist, so writes skip the probe.
        self._bucket_checked = False

    def put_bytes(
        self,
        *,
        object_key: str,
        content: bytes,
        content_type: str) -> StoredObject:
        """Upload *content* under *object_key*, creating the bucket on first use.

        Raises:
            ObjectStorageError: if the bucket cannot be accessed/created or the
                upload fails.
        """
        try:
            self._ensure_bucket()
            response = self._client.put_object(
                Bucket=self.bucket,
                Key=object_key,
                Body=content,
                ContentType=content_type,
            )
        except self._boto_error_types as exc:
            raise ObjectStorageError(f"failed to write knowledge object: {object_key}") from exc
        return StoredObject(
            backend="minio",
            bucket=self.bucket,
            object_key=object_key,
            content_type=content_type,
            size_bytes=len(content),
            etag=self._normalize_etag(response.get("ETag")))

    def get_bytes(self, *, object_key: str) -> bytes:
        """Download and return the full content of *object_key*.

        Raises:
            ObjectStorageNotFoundError: if the key (or bucket) does not exist.
            ObjectStorageError: for any other request or read failure.
        """
        try:
            response = self._client.get_object(Bucket=self.bucket, Key=object_key)
        except self._boto_error_types as exc:
            if self._is_not_found(exc):
                raise ObjectStorageNotFoundError(f"knowledge object not found: {object_key}") from exc
            raise ObjectStorageError(f"failed to read knowledge object: {object_key}") from exc
        body = response["Body"]
        try:
            return body.read()
        except self._boto_error_types as exc:
            # The body streams after the request succeeded, so a dropped
            # connection or read timeout surfaces here rather than above.
            raise ObjectStorageError(f"failed to read knowledge object: {object_key}") from exc
        finally:
            body.close()

    def head_object(self, *, object_key: str) -> ObjectStorageStatus:
        """Describe *object_key*; a missing key yields ``exists=False`` plus a message.

        Raises:
            ObjectStorageError: for failures other than "not found".
        """
        try:
            response = self._client.head_object(Bucket=self.bucket, Key=object_key)
        except self._boto_error_types as exc:
            if self._is_not_found(exc):
                return ObjectStorageStatus(
                    backend="minio",
                    bucket=self.bucket,
                    object_key=object_key,
                    exists=False,
                    error_message=f"knowledge object not found: {object_key}")
            raise ObjectStorageError(f"failed to stat knowledge object: {object_key}") from exc
        return ObjectStorageStatus(
            backend="minio",
            bucket=self.bucket,
            object_key=object_key,
            exists=True,
            content_type=response.get("ContentType"),
            size_bytes=response.get("ContentLength"),
            etag=self._normalize_etag(response.get("ETag")))

    def delete_object(self, *, object_key: str) -> bool:
        """Delete *object_key*; return whether it existed just before deletion.

        The existence probe and the delete are two separate requests, so the
        returned flag can be stale if other writers touch the same key.

        Raises:
            ObjectStorageError: if either request fails.
        """
        existed = self.head_object(object_key=object_key).exists
        try:
            self._client.delete_object(Bucket=self.bucket, Key=object_key)
        except self._boto_error_types as exc:
            raise ObjectStorageError(f"failed to delete knowledge object: {object_key}") from exc
        return existed

    def health_check(self) -> dict[str, str | bool | None]:
        """Probe the bucket and report availability.

        The probe always contacts the server (bypassing the cached bucket
        check), otherwise a server that went down after the first successful
        call would keep being reported as available. On failure ``available``
        is False and ``message`` holds the error text.
        """
        try:
            self._ensure_bucket(force=True)
        except ObjectStorageError as exc:
            return {
                "backend": "minio",
                "bucket": self.bucket,
                "available": False,
                "message": str(exc),
            }
        return {
            "backend": "minio",
            "bucket": self.bucket,
            "available": True,
            "message": None,
        }

    def _ensure_bucket(self, *, force: bool = False) -> None:
        """Make sure the configured bucket exists, creating it when missing.

        The outcome is cached after the first success; ``force=True`` probes
        the server again regardless.

        Raises:
            ObjectStorageError: if the bucket cannot be accessed or created.
        """
        if self._bucket_checked and not force:
            return
        try:
            self._client.head_bucket(Bucket=self.bucket)
        except self._boto_error_types as exc:
            if not self._is_not_found(exc):
                raise ObjectStorageError(f"failed to access MinIO bucket: {self.bucket}") from exc
            try:
                self._client.create_bucket(Bucket=self.bucket)
            except self._boto_error_types as create_exc:
                # Another worker may have created the bucket between our
                # head_bucket and create_bucket calls; owning it is success.
                _, create_code = self._error_details(create_exc)
                if create_code != "BucketAlreadyOwnedByYou":
                    raise ObjectStorageError(f"failed to create MinIO bucket: {self.bucket}") from create_exc
        self._bucket_checked = True

    def _is_not_found(self, exc: Exception) -> bool:
        """Return True when *exc* is an error response meaning "bucket/key missing"."""
        status, code = self._error_details(exc)
        return status == 404 or code in {"404", "NoSuchBucket", "NoSuchKey", "NotFound"}

    @staticmethod
    def _error_details(exc: Exception) -> tuple[int | None, str | None]:
        """Extract ``(HTTP status, error code)`` from a ClientError-style ``response`` dict.

        Exceptions without such a dict (e.g. ``BotoCoreError``) yield ``(None, None)``.
        """
        response = getattr(exc, "response", None)
        if not isinstance(response, dict):
            return None, None
        error = response.get("Error")
        metadata = response.get("ResponseMetadata")
        code = error.get("Code") if isinstance(error, dict) else None
        status = metadata.get("HTTPStatusCode") if isinstance(metadata, dict) else None
        return status, code

    @staticmethod
    def _normalize_etag(raw: object) -> str | None:
        """Strip the double quotes S3 wraps around ETag values; falsy input gives None."""
        return str(raw).strip('"') if raw else None
def build_object_storage(settings: KnowledgeServiceSettings) -> KnowledgeObjectStorage:
    """Instantiate the object storage backend selected by *settings*.

    ``settings.object_storage_backend`` is matched case-insensitively after
    trimming surrounding whitespace:

    * ``"memory"`` -> ``InMemoryObjectStorage`` for ``settings.object_storage_bucket``
    * ``"minio"``  -> ``MinioObjectStorage`` configured from the whole settings object

    Raises:
        ValueError: for any other backend name; the message echoes the raw,
            untrimmed setting.
    """
    # Factories are lazy so only the selected backend is ever constructed.
    factories = {
        "memory": lambda: InMemoryObjectStorage(bucket=settings.object_storage_bucket),
        "minio": lambda: MinioObjectStorage(settings=settings),
    }
    factory = factories.get(settings.object_storage_backend.strip().lower())
    if factory is None:
        raise ValueError(f"unsupported knowledge object storage backend: {settings.object_storage_backend}")
    return factory()
def build_document_object_key(
    *,
    knowledge_base_id: str,
    source_type: str,
    title: str) -> str:
    """Return a fresh, unique object key for a knowledge-base document.

    The key has the shape
    ``knowledge/<knowledge_base_id>/documents/<uuid4 hex>/<slug>.<suffix>``.

    Args:
        knowledge_base_id: Inserted verbatim as a key segment.
            NOTE(review): assumed to be an internal identifier without ``/``;
            it is not sanitised here.
        source_type: File extension or type name (e.g. ``"pdf"``, ``".MD"``).
            It is lowercased. Any character other than an ASCII letter, digit
            or ``.`` becomes ``-``, so the value can never add path segments
            to the key. Leading/trailing ``-``/``.`` are dropped, and a blank
            result falls back to ``"txt"``.
        title: Human-readable title, turned into a lowercase slug in which
            every non-alphanumeric character becomes ``-``. The slug is capped
            at 80 characters and never starts or ends with ``-``. A blank or
            symbol-only title falls back to ``"document"``.

    Returns:
        The object key string; the uuid4 segment makes every call unique.
    """
    slug = "".join(
        char.lower() if char.isalnum() else "-"
        for char in title
    ).strip("-")
    # Truncate first, then drop any dash the cut left dangling at the end.
    slug = slug[:80].rstrip("-") or "document"
    suffix = "".join(
        char if (char.isascii() and char.isalnum()) or char == "." else "-"
        for char in source_type.strip().lower()
    ).strip("-.") or "txt"
    return f"knowledge/{knowledge_base_id}/documents/{uuid4().hex}/{slug}.{suffix}"
|