from __future__ import annotations

import hashlib
from dataclasses import dataclass
from typing import Protocol
from uuid import uuid4

from app.bootstrap.settings import KnowledgeServiceSettings


@dataclass(frozen=True, slots=True)
class StoredObject:
    """Immutable description of an object a storage backend has written."""

    backend: str  # backend identifier, e.g. "memory" or "minio"
    bucket: str
    object_key: str
    content_type: str
    size_bytes: int
    etag: str | None = None

    def to_metadata(self) -> dict[str, str | int | None]:
        """Return the fields as a plain dict keyed by camelCase names."""
        names = ("backend", "bucket", "objectKey", "contentType", "sizeBytes", "etag")
        values = (
            self.backend,
            self.bucket,
            self.object_key,
            self.content_type,
            self.size_bytes,
            self.etag,
        )
        return dict(zip(names, values))


@dataclass(frozen=True, slots=True)
class ObjectStorageStatus:
    """Immutable result of looking an object up in a storage backend.

    ``exists`` tells whether the object was found; the remaining optional
    fields default to ``None``.
    """

    backend: str
    bucket: str
    object_key: str
    exists: bool
    content_type: str | None = None
    size_bytes: int | None = None
    etag: str | None = None
    error_message: str | None = None


class ObjectStorageError(RuntimeError):
    """Base class for every error raised by the object storage layer."""


class ObjectStorageConfigurationError(ObjectStorageError):
    """Raised when a storage backend cannot be configured or constructed."""


class ObjectStorageNotFoundError(ObjectStorageError, FileNotFoundError):
    """Raised when a requested object does not exist.

    Also derives from ``FileNotFoundError`` so callers may catch either type.
    """


class KnowledgeObjectStorage(Protocol):
    """Structural interface shared by the knowledge object storage backends."""

    def put_bytes(
        self,
        *,
        object_key: str,
        content: bytes,
        content_type: str,
    ) -> StoredObject:
        """Store ``content`` under ``object_key`` and describe what was stored."""

    def get_bytes(self, *, object_key: str) -> bytes:
        """Return the stored content for ``object_key``."""

    def head_object(self, *, object_key: str) -> ObjectStorageStatus:
        """Return the status of ``object_key`` without fetching its content."""

    def delete_object(self, *, object_key: str) -> bool:
        """Delete ``object_key`` and return a success flag."""

    def health_check(self) -> dict[str, str | bool | None]:
        """Return a dict describing whether the backend is usable."""
class InMemoryObjectStorage:
    """Process-local object storage kept in plain dictionaries.

    Objects live in a class-level registry keyed by bucket name, so every
    instance created for the same bucket name sees the same objects.
    """

    # Deliberately shared by all instances:
    # bucket name -> {object_key: (content, content_type, etag)}.
    _buckets: dict[str, dict[str, tuple[bytes, str, str]]] = {}

    def __init__(self, *, bucket: str = "memory-knowledge") -> None:
        """Attach to (or create) the in-memory bucket named ``bucket``."""
        self.bucket = bucket
        self._objects = self._buckets.setdefault(bucket, {})

    def put_bytes(
        self,
        *,
        object_key: str,
        content: bytes,
        content_type: str,
    ) -> StoredObject:
        """Store ``content`` under ``object_key``, replacing any previous value.

        The ETag is the SHA-256 hex digest of the content.
        """
        etag = hashlib.sha256(content).hexdigest()
        self._objects[object_key] = (content, content_type, etag)
        return StoredObject(
            backend="memory",
            bucket=self.bucket,
            object_key=object_key,
            content_type=content_type,
            size_bytes=len(content),
            etag=etag,
        )

    def get_bytes(self, *, object_key: str) -> bytes:
        """Return the stored content.

        Raises:
            ObjectStorageNotFoundError: if ``object_key`` is not stored.
        """
        try:
            return self._objects[object_key][0]
        except KeyError as exc:
            raise ObjectStorageNotFoundError(
                f"knowledge object not found: {object_key}"
            ) from exc

    def head_object(self, *, object_key: str) -> ObjectStorageStatus:
        """Describe ``object_key``; a missing object yields ``exists=False``."""
        stored = self._objects.get(object_key)
        if stored is None:
            return ObjectStorageStatus(
                backend="memory",
                bucket=self.bucket,
                object_key=object_key,
                exists=False,
                error_message=f"knowledge object not found: {object_key}",
            )
        content, content_type, etag = stored
        return ObjectStorageStatus(
            backend="memory",
            bucket=self.bucket,
            object_key=object_key,
            exists=True,
            content_type=content_type,
            size_bytes=len(content),
            etag=etag,
        )

    def delete_object(self, *, object_key: str) -> bool:
        """Remove ``object_key``; return ``True`` only if it was stored."""
        return self._objects.pop(object_key, None) is not None

    def health_check(self) -> dict[str, str | bool | None]:
        """Report the backend as available; nothing external can fail here."""
        return {
            "backend": "memory",
            "bucket": self.bucket,
            "available": True,
            "message": None,
        }


class MinioObjectStorage:
    """Object storage that talks to an S3-compatible endpoint through boto3."""

    def __init__(self, *, settings: KnowledgeServiceSettings) -> None:
        """Build the S3 client from ``settings``.

        Raises:
            ObjectStorageConfigurationError: if boto3 cannot be imported, the
                bucket name is blank, or the client cannot be constructed.
        """
        # Imported here rather than at module level, so a missing boto3 only
        # surfaces (as a configuration error) when this backend is constructed.
        try:
            import boto3
            from botocore.config import Config
            from botocore.exceptions import BotoCoreError, ClientError
        except Exception as exc:
            raise ObjectStorageConfigurationError(
                "boto3 is required for MinIO object storage. "
                "Install knowledge-service dependencies before starting the service."
            ) from exc

        # ClientError carries error responses returned by the server.
        # BotoCoreError covers client-side failures (unreachable endpoint,
        # timeouts, missing credentials). Both must be normalised into
        # ObjectStorageError, otherwise e.g. health_check() would raise instead
        # of reporting the backend as unavailable.
        self._client_error_type = (ClientError, BotoCoreError)

        self.bucket = settings.object_storage_bucket.strip()
        if not self.bucket:
            raise ObjectStorageConfigurationError("MinIO bucket is required")

        addressing_style = "path" if settings.object_storage_path_style else "auto"
        config = Config(
            signature_version="s3v4",
            s3={"addressing_style": addressing_style},
        )
        try:
            self._client = boto3.client(
                "s3",
                endpoint_url=settings.object_storage_endpoint_url,
                aws_access_key_id=settings.object_storage_access_key,
                aws_secret_access_key=settings.object_storage_secret_key,
                region_name=settings.object_storage_region,
                config=config,
            )
        except Exception as exc:
            raise ObjectStorageConfigurationError(
                "failed to configure MinIO client"
            ) from exc
        # Set once the bucket has been verified or created, so later calls
        # skip the extra round trip.
        self._bucket_checked = False

    def put_bytes(
        self,
        *,
        object_key: str,
        content: bytes,
        content_type: str,
    ) -> StoredObject:
        """Upload ``content`` under ``object_key``, ensuring the bucket exists.

        Raises:
            ObjectStorageError: if the bucket cannot be ensured or the upload
                fails.
        """
        try:
            self._ensure_bucket()
            response = self._client.put_object(
                Bucket=self.bucket,
                Key=object_key,
                Body=content,
                ContentType=content_type,
            )
        except self._client_error_type as exc:
            raise ObjectStorageError(
                f"failed to write knowledge object: {object_key}"
            ) from exc
        return StoredObject(
            backend="minio",
            bucket=self.bucket,
            object_key=object_key,
            content_type=content_type,
            size_bytes=len(content),
            etag=self._clean_etag(response.get("ETag")),
        )

    def get_bytes(self, *, object_key: str) -> bytes:
        """Download and return the content stored under ``object_key``.

        Raises:
            ObjectStorageNotFoundError: if the object does not exist.
            ObjectStorageError: if the request or the body download fails.
        """
        try:
            response = self._client.get_object(Bucket=self.bucket, Key=object_key)
        except self._client_error_type as exc:
            if self._is_not_found(exc):
                raise ObjectStorageNotFoundError(
                    f"knowledge object not found: {object_key}"
                ) from exc
            raise ObjectStorageError(
                f"failed to read knowledge object: {object_key}"
            ) from exc

        body = response["Body"]
        try:
            return body.read()
        except self._client_error_type as exc:
            # The stream can fail mid-read (e.g. a read timeout); report it
            # like any other read failure.
            raise ObjectStorageError(
                f"failed to read knowledge object: {object_key}"
            ) from exc
        finally:
            body.close()

    def head_object(self, *, object_key: str) -> ObjectStorageStatus:
        """Describe ``object_key``; a missing object yields ``exists=False``.

        Raises:
            ObjectStorageError: if the lookup fails for any reason other than
                the object being absent.
        """
        try:
            response = self._client.head_object(Bucket=self.bucket, Key=object_key)
        except self._client_error_type as exc:
            if self._is_not_found(exc):
                return ObjectStorageStatus(
                    backend="minio",
                    bucket=self.bucket,
                    object_key=object_key,
                    exists=False,
                    error_message=f"knowledge object not found: {object_key}",
                )
            raise ObjectStorageError(
                f"failed to stat knowledge object: {object_key}"
            ) from exc
        return ObjectStorageStatus(
            backend="minio",
            bucket=self.bucket,
            object_key=object_key,
            exists=True,
            content_type=response.get("ContentType"),
            size_bytes=response.get("ContentLength"),
            etag=self._clean_etag(response.get("ETag")),
        )

    def delete_object(self, *, object_key: str) -> bool:
        """Delete ``object_key``; return whether it existed beforehand.

        Raises:
            ObjectStorageError: if the existence check or the delete fails.
        """
        try:
            # Existence is probed first so the return value can report it.
            existed = self.head_object(object_key=object_key).exists
            self._client.delete_object(Bucket=self.bucket, Key=object_key)
            return existed
        except ObjectStorageError:
            raise
        except self._client_error_type as exc:
            raise ObjectStorageError(
                f"failed to delete knowledge object: {object_key}"
            ) from exc

    def health_check(self) -> dict[str, str | bool | None]:
        """Report whether the bucket is reachable (creating it if missing)."""
        try:
            self._ensure_bucket()
        except ObjectStorageError as exc:
            return {
                "backend": "minio",
                "bucket": self.bucket,
                "available": False,
                "message": str(exc),
            }
        return {
            "backend": "minio",
            "bucket": self.bucket,
            "available": True,
            "message": None,
        }

    def _ensure_bucket(self) -> None:
        """Verify the bucket exists, creating it when it is reported missing.

        Raises:
            ObjectStorageError: if the bucket can be neither accessed nor
                created.
        """
        if self._bucket_checked:
            return
        try:
            self._client.head_bucket(Bucket=self.bucket)
        except self._client_error_type as exc:
            if not self._is_not_found(exc):
                raise ObjectStorageError(
                    f"failed to access MinIO bucket: {self.bucket}"
                ) from exc
            try:
                self._client.create_bucket(Bucket=self.bucket)
            except self._client_error_type as create_exc:
                raise ObjectStorageError(
                    f"failed to create MinIO bucket: {self.bucket}"
                ) from create_exc
        self._bucket_checked = True

    def _is_not_found(self, exc: Exception) -> bool:
        """Return whether ``exc`` carries a "not found" error response.

        Exceptions without a dict ``response`` attribute are never treated as
        "not found".
        """
        response = getattr(exc, "response", None)
        if not isinstance(response, dict):
            return False
        # Either sub-dict may be absent or malformed; treat that as "unknown"
        # instead of failing with AttributeError.
        metadata = response.get("ResponseMetadata")
        status = metadata.get("HTTPStatusCode") if isinstance(metadata, dict) else None
        error = response.get("Error")
        code = error.get("Code") if isinstance(error, dict) else None
        return status == 404 or code in {"404", "NoSuchBucket", "NoSuchKey", "NotFound"}

    @staticmethod
    def _clean_etag(raw_etag: object) -> str | None:
        """Strip the surrounding double quotes from an ETag; falsy -> ``None``."""
        return str(raw_etag).strip('"') if raw_etag else None


def build_object_storage(settings: KnowledgeServiceSettings) -> KnowledgeObjectStorage:
    """Instantiate the storage backend named by ``settings.object_storage_backend``.

    The backend name is matched case-insensitively after trimming whitespace.

    Raises:
        ValueError: if the backend name is neither "memory" nor "minio".
    """
    backend = settings.object_storage_backend.strip().lower()
    if backend == "memory":
        return InMemoryObjectStorage(bucket=settings.object_storage_bucket)
    if backend == "minio":
        return MinioObjectStorage(settings=settings)
    raise ValueError(
        f"unsupported knowledge object storage backend: {settings.object_storage_backend}"
    )


def build_document_object_key(
    *,
    knowledge_base_id: str,
    source_type: str,
    title: str,
) -> str:
    """Build a unique object key for a document of a knowledge base.

    The key has the form
    ``knowledge/<knowledge_base_id>/documents/<uuid4 hex>/<slug>.<suffix>``.
    The slug is the lower-cased title with every non-alphanumeric character
    replaced by ``-``, limited to 80 characters, falling back to
    ``"document"``. The suffix is the lower-cased ``source_type`` without a
    leading dot, falling back to ``"txt"``.
    """
    slug = "".join(
        char.lower() if char.isalnum() else "-" for char in title
    ).strip("-")
    # Truncate before the final strip so the 80-character cut can never leave
    # a dangling "-" right before the extension.
    safe_title = slug[:80].rstrip("-") or "document"
    suffix = source_type.strip().lower().removeprefix(".") or "txt"
    return f"knowledge/{knowledge_base_id}/documents/{uuid4().hex}/{safe_title}.{suffix}"