| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- """Knowledge application service — thin facade delegating to sub-services."""
- from __future__ import annotations
- from typing import TYPE_CHECKING
- from core_shared import JSONValue, try_build_redis_client
- from core_shared.task_queue import TaskQueuePublisher
- from app.application.crud_service import KnowledgeCrudService
- from app.application.embeddings import EmbeddingService
- from app.application.indexing_service import (
- KnowledgeDocumentIngestResult,
- KnowledgeIndexingError,
- KnowledgeIndexingService,
- )
- from app.application.search_service import KnowledgeSearchService
- from app.application.settings_service import KnowledgeSettingsService
- from app.bootstrap.settings import KnowledgeServiceSettings
- from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
- from app.schemas.knowledge import (
- KnowledgeBaseCreateRequest,
- KnowledgeBaseCreateRequestDto,
- KnowledgeBaseReindexRequestDto,
- KnowledgeBaseStatusUpdateRequest,
- KnowledgeBaseUpdateRequestDto,
- KnowledgeDocumentCreateRequest,
- KnowledgeDocumentCreateRequestDto,
- KnowledgeDocumentParseRequest,
- KnowledgeDocumentReindexRequestDto,
- KnowledgeDocumentUpdateRequestDto,
- KnowledgeIndexJobData,
- KnowledgeSearchRequest,
- KnowledgeSettingsDto,
- KnowledgeSettingsUpdateRequestDto,
- )
- if TYPE_CHECKING:
- from redis import Redis
- from app.domain.repositories import (
- KnowledgeBaseRepository,
- KnowledgeChunkRepository,
- KnowledgeDocumentRepository,
- )
- from app.infrastructure.object_storage import KnowledgeObjectStorage
class KnowledgeApplicationService:
    """Single entry point that wires up and forwards to the knowledge sub-services.

    The constructor builds one ``EmbeddingService`` and then the CRUD,
    indexing, search, and settings services from the same settings object and
    repositories. Every public method below is a direct pass-through to
    exactly one of those services; no extra logic is applied here.
    """

    def __init__(
        self,
        *,
        settings: KnowledgeServiceSettings,
        base_repository: KnowledgeBaseRepository,
        document_repository: KnowledgeDocumentRepository,
        chunk_repository: KnowledgeChunkRepository,
        object_storage: KnowledgeObjectStorage | None = None,
        redis_client: Redis | None = None,
        task_queue_publisher: TaskQueuePublisher | None = None,
    ) -> None:
        """Store the collaborators and construct the four sub-services.

        Args:
            settings: Service settings handed to every sub-service.
            base_repository: Repository passed to all four sub-services.
            document_repository: Repository passed to CRUD, indexing, search.
            chunk_repository: Repository passed to CRUD, indexing, search.
            object_storage: Optional storage passed to CRUD and indexing.
            redis_client: Optional Redis client passed to indexing.
            task_queue_publisher: Optional publisher passed to indexing.
        """
        # Plain references kept on the facade itself.
        self.settings = settings
        self.base_repository = base_repository
        self.document_repository = document_repository
        self.chunk_repository = chunk_repository
        self._object_storage = object_storage
        self.redis_client = redis_client
        self.task_queue_publisher = task_queue_publisher

        # One embedding service instance is shared by indexing and search.
        embedding_service = EmbeddingService(settings=settings)
        self.embedding_service = embedding_service

        # Keyword arguments common to the CRUD, indexing, and search services.
        shared_kwargs = {
            "settings": settings,
            "base_repository": base_repository,
            "document_repository": document_repository,
            "chunk_repository": chunk_repository,
        }

        self.crud_service = KnowledgeCrudService(
            **shared_kwargs,
            object_storage=object_storage,
        )
        self.indexing_service = KnowledgeIndexingService(
            **shared_kwargs,
            embedding_service=embedding_service,
            object_storage=object_storage,
            redis_client=redis_client,
            task_queue_publisher=task_queue_publisher,
        )
        self.search_service = KnowledgeSearchService(
            **shared_kwargs,
            embedding_service=embedding_service,
        )
        self.settings_service = KnowledgeSettingsService(
            settings=settings,
            base_repository=base_repository,
        )

    @property
    def object_storage(self) -> KnowledgeObjectStorage:
        """Return the object storage exposed by the CRUD service.

        Note this reads ``crud_service.object_storage`` rather than the raw
        constructor argument kept in ``_object_storage``.
        """
        crud = self.crud_service
        return crud.object_storage

    # ── Knowledge Base ────────────────────────────────────────────────

    def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
        """Forward ``payload`` to ``crud_service.create_base``."""
        crud = self.crud_service
        return crud.create_base(payload)

    def create_base_from_contract(
        self,
        payload: KnowledgeBaseCreateRequestDto,
    ) -> KnowledgeBase:
        """Forward ``payload`` to ``crud_service.create_base_from_contract``."""
        crud = self.crud_service
        return crud.create_base_from_contract(payload)

    def list_bases(self) -> list[KnowledgeBase]:
        """Return whatever ``crud_service.list_bases`` returns."""
        crud = self.crud_service
        return crud.list_bases()

    def list_bases_filtered(
        self,
        *,
        keyword: str | None = None,
        status: str | None = None,
    ) -> list[KnowledgeBase]:
        """Forward both filters to ``crud_service.list_bases_filtered``."""
        return self.crud_service.list_bases_filtered(
            keyword=keyword,
            status=status,
        )

    def update_base_from_contract(
        self,
        payload: KnowledgeBaseUpdateRequestDto,
    ) -> KnowledgeBase | None:
        """Forward ``payload`` to ``crud_service.update_base_from_contract``."""
        crud = self.crud_service
        return crud.update_base_from_contract(payload)

    def delete_base(self, *, knowledge_base_id: str) -> bool:
        """Forward the id to ``crud_service.delete_base``."""
        return self.crud_service.delete_base(
            knowledge_base_id=knowledge_base_id,
        )

    def update_base_status(
        self,
        *,
        knowledge_base_id: str,
        payload: KnowledgeBaseStatusUpdateRequest,
    ) -> KnowledgeBase | None:
        """Forward the id and payload to ``crud_service.update_base_status``."""
        return self.crud_service.update_base_status(
            knowledge_base_id=knowledge_base_id,
            payload=payload,
        )

    # ── Document CRUD ─────────────────────────────────────────────────

    def list_documents(self, *, knowledge_base_id: str) -> list[KnowledgeDocument]:
        """Forward the id to ``crud_service.list_documents``."""
        return self.crud_service.list_documents(
            knowledge_base_id=knowledge_base_id,
        )

    def list_documents_filtered(
        self,
        *,
        knowledge_base_id: str | None = None,
        keyword: str | None = None,
        status: str | None = None,
        source_type: str | None = None,
    ) -> list[KnowledgeDocument]:
        """Forward all four filters to ``crud_service.list_documents_filtered``."""
        return self.crud_service.list_documents_filtered(
            knowledge_base_id=knowledge_base_id,
            keyword=keyword,
            status=status,
            source_type=source_type,
        )

    def update_document_from_contract(
        self,
        payload: KnowledgeDocumentUpdateRequestDto,
    ) -> KnowledgeDocument | None:
        """Forward ``payload`` to ``crud_service.update_document_from_contract``."""
        crud = self.crud_service
        return crud.update_document_from_contract(payload)

    def delete_document(self, *, document_id: str) -> bool:
        """Forward the id to ``crud_service.delete_document``."""
        return self.crud_service.delete_document(
            document_id=document_id,
        )

    def delete_document_result(self, *, document_id: str) -> dict[str, JSONValue]:
        """Forward the id to ``crud_service.delete_document_result``."""
        return self.crud_service.delete_document_result(
            document_id=document_id,
        )

    def read_document_content(
        self,
        *,
        document_id: str,
        include_text: bool = True,
        include_base64: bool = False,
    ) -> dict[str, JSONValue] | None:
        """Forward the id and both flags to ``crud_service.read_document_content``."""
        return self.crud_service.read_document_content(
            document_id=document_id,
            include_text=include_text,
            include_base64=include_base64,
        )

    def read_document_storage_status(
        self,
        *,
        document_id: str,
    ) -> dict[str, JSONValue] | None:
        """Forward the id to ``crud_service.read_document_storage_status``."""
        return self.crud_service.read_document_storage_status(
            document_id=document_id,
        )

    def read_storage_health(self) -> dict[str, JSONValue]:
        """Return whatever ``crud_service.read_storage_health`` returns."""
        crud = self.crud_service
        return crud.read_storage_health()

    # ── Chunk CRUD ────────────────────────────────────────────────────

    def list_chunks_filtered(
        self,
        *,
        knowledge_base_id: str | None = None,
        document_id: str | None = None,
        keyword: str | None = None,
    ) -> list[KnowledgeChunk]:
        """Forward all three filters to ``crud_service.list_chunks_filtered``."""
        return self.crud_service.list_chunks_filtered(
            knowledge_base_id=knowledge_base_id,
            document_id=document_id,
            keyword=keyword,
        )

    def delete_chunk(self, *, chunk_id: str) -> bool:
        """Forward the id to ``crud_service.delete_chunk``."""
        return self.crud_service.delete_chunk(
            chunk_id=chunk_id,
        )

    # ── Parse ─────────────────────────────────────────────────────────

    def parse_document(self, payload: KnowledgeDocumentParseRequest):
        """Forward ``payload`` to ``crud_service.parse_document``."""
        crud = self.crud_service
        return crud.parse_document(payload)

    # ── Indexing ──────────────────────────────────────────────────────

    def create_document(
        self,
        payload: KnowledgeDocumentCreateRequest,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
        """Forward ``payload`` to ``indexing_service.create_document``."""
        indexing = self.indexing_service
        return indexing.create_document(payload)

    def create_document_from_contract(
        self,
        payload: KnowledgeDocumentCreateRequestDto,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
        """Forward ``payload`` to ``indexing_service.create_document_from_contract``."""
        indexing = self.indexing_service
        return indexing.create_document_from_contract(payload)

    def create_document_from_contract_result(
        self,
        payload: KnowledgeDocumentCreateRequestDto,
    ) -> KnowledgeDocumentIngestResult:
        """Forward ``payload`` to ``indexing_service.create_document_from_contract_result``."""
        indexing = self.indexing_service
        return indexing.create_document_from_contract_result(payload)

    def create_document_index_job(
        self,
        payload: KnowledgeDocumentCreateRequest,
    ) -> KnowledgeDocumentIngestResult:
        """Forward ``payload`` to ``indexing_service.create_document_index_job``."""
        indexing = self.indexing_service
        return indexing.create_document_index_job(payload)

    def queue_document_indexing(
        self,
        *,
        document: KnowledgeDocument,
        action: str,
        chunk_size: int | None = None,
        chunk_overlap: int | None = None,
    ) -> tuple[KnowledgeDocument, KnowledgeIndexJobData]:
        """Forward all arguments to ``indexing_service.queue_document_indexing``."""
        return self.indexing_service.queue_document_indexing(
            document=document,
            action=action,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )

    def process_document_index_job(
        self,
        *,
        document_id: str,
        action: str,
        worker_key: str,
        job_id: str | None = None,
        chunk_size: int | None = None,
        chunk_overlap: int | None = None,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
        """Forward all arguments to ``indexing_service.process_document_index_job``."""
        return self.indexing_service.process_document_index_job(
            document_id=document_id,
            action=action,
            worker_key=worker_key,
            job_id=job_id,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )

    def execute_document_index_job(
        self,
        *,
        document_id: str,
        action: str,
        worker_key: str,
        lease_seconds: int,
        job_id: str | None = None,
        redis_client: Redis | None = None,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
        """Forward all arguments to ``indexing_service.execute_document_index_job``.

        The ``redis_client`` argument is passed through as given; the client
        stored on this facade is not substituted for it here.
        """
        return self.indexing_service.execute_document_index_job(
            document_id=document_id,
            action=action,
            worker_key=worker_key,
            lease_seconds=lease_seconds,
            job_id=job_id,
            redis_client=redis_client,
        )

    def execute_next_pending_document_job(
        self,
        *,
        worker_key: str,
        lease_seconds: int,
        stale_indexing_seconds: int,
        redis_client: Redis | None = None,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
        """Forward all arguments to ``indexing_service.execute_next_pending_document_job``."""
        return self.indexing_service.execute_next_pending_document_job(
            worker_key=worker_key,
            lease_seconds=lease_seconds,
            stale_indexing_seconds=stale_indexing_seconds,
            redis_client=redis_client,
        )

    def list_index_jobs(
        self,
        *,
        knowledge_base_id: str | None = None,
        document_id: str | None = None,
        status: str | None = None,
    ) -> list[KnowledgeIndexJobData]:
        """Forward all three filters to ``indexing_service.list_index_jobs``."""
        return self.indexing_service.list_index_jobs(
            knowledge_base_id=knowledge_base_id,
            document_id=document_id,
            status=status,
        )

    def detail_index_job(self, *, document_id: str) -> KnowledgeIndexJobData | None:
        """Forward the id to ``indexing_service.detail_index_job``."""
        return self.indexing_service.detail_index_job(
            document_id=document_id,
        )

    def reindex_document(
        self,
        payload: KnowledgeDocumentReindexRequestDto,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
        """Forward ``payload`` to ``indexing_service.reindex_document``."""
        indexing = self.indexing_service
        return indexing.reindex_document(payload)

    def reindex_document_from_contract_result(
        self,
        payload: KnowledgeDocumentReindexRequestDto,
    ) -> KnowledgeDocumentIngestResult | None:
        """Forward ``payload`` to ``indexing_service.reindex_document_from_contract_result``."""
        indexing = self.indexing_service
        return indexing.reindex_document_from_contract_result(payload)

    def reindex_base_from_contract(
        self,
        payload: KnowledgeBaseReindexRequestDto,
    ) -> list[KnowledgeIndexJobData]:
        """Forward ``payload`` to ``indexing_service.reindex_base_from_contract``."""
        indexing = self.indexing_service
        return indexing.reindex_base_from_contract(payload)

    # ── Search ────────────────────────────────────────────────────────

    def search(
        self,
        payload: KnowledgeSearchRequest,
    ) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]:
        """Forward ``payload`` to ``search_service.search``."""
        searcher = self.search_service
        return searcher.search(payload)

    # ── Settings ──────────────────────────────────────────────────────

    def read_settings(
        self,
        *,
        knowledge_base_id: str | None = None,
    ) -> KnowledgeSettingsDto:
        """Forward the optional id to ``settings_service.read_settings``."""
        return self.settings_service.read_settings(
            knowledge_base_id=knowledge_base_id,
        )

    def update_settings(
        self,
        payload: KnowledgeSettingsUpdateRequestDto,
    ) -> KnowledgeSettingsDto:
        """Forward ``payload`` to ``settings_service.update_settings``."""
        settings_service = self.settings_service
        return settings_service.update_settings(payload)
def build_knowledge_application_service(
    *,
    db,
    settings: KnowledgeServiceSettings,
) -> KnowledgeApplicationService:
    """Assemble a ``KnowledgeApplicationService`` from a DB handle and settings.

    Args:
        db: Handle passed unchanged to each repository constructor.
        settings: Service settings; ``settings.redis_url`` is used to try to
            build the Redis client.

    Returns:
        A facade wired with three repositories, the Redis client (possibly
        ``None``), and a ``TaskQueuePublisher`` only when a client exists.
    """
    # Imported at call time: at module level these names are only imported
    # under TYPE_CHECKING.
    from app.domain.repositories import (
        KnowledgeBaseRepository,
        KnowledgeChunkRepository,
        KnowledgeDocumentRepository,
    )

    redis_client = try_build_redis_client(settings.redis_url)

    base_repository = KnowledgeBaseRepository(db)
    document_repository = KnowledgeDocumentRepository(db)
    chunk_repository = KnowledgeChunkRepository(db)

    # Without a Redis client there is nothing for a publisher to talk to.
    if redis_client is None:
        task_queue_publisher = None
    else:
        task_queue_publisher = TaskQueuePublisher(client=redis_client)

    return KnowledgeApplicationService(
        settings=settings,
        base_repository=base_repository,
        document_repository=document_repository,
        chunk_repository=chunk_repository,
        redis_client=redis_client,
        task_queue_publisher=task_queue_publisher,
    )
|