"""Knowledge application service — thin facade delegating to sub-services."""

from __future__ import annotations

from typing import TYPE_CHECKING

from core_shared import JSONValue, try_build_redis_client
from core_shared.task_queue import TaskQueuePublisher

from app.application.crud_service import KnowledgeCrudService
from app.application.embeddings import EmbeddingService
from app.application.indexing_service import (
    KnowledgeDocumentIngestResult,
    KnowledgeIndexingError,
    KnowledgeIndexingService,
)
from app.application.search_service import KnowledgeSearchService
from app.application.settings_service import KnowledgeSettingsService
from app.bootstrap.settings import KnowledgeServiceSettings
from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
from app.schemas.knowledge import (
    KnowledgeBaseCreateRequest,
    KnowledgeBaseCreateRequestDto,
    KnowledgeBaseReindexRequestDto,
    KnowledgeBaseStatusUpdateRequest,
    KnowledgeBaseUpdateRequestDto,
    KnowledgeDocumentCreateRequest,
    KnowledgeDocumentCreateRequestDto,
    KnowledgeDocumentParseRequest,
    KnowledgeDocumentReindexRequestDto,
    KnowledgeDocumentUpdateRequestDto,
    KnowledgeIndexJobData,
    KnowledgeSearchRequest,
    KnowledgeSettingsDto,
    KnowledgeSettingsUpdateRequestDto,
)

if TYPE_CHECKING:
    # Names below are used in annotations only; annotations are lazy because
    # of the ``from __future__ import annotations`` line above.
    from redis import Redis

    from app.domain.repositories import (
        KnowledgeBaseRepository,
        KnowledgeChunkRepository,
        KnowledgeDocumentRepository,
    )
    from app.infrastructure.object_storage import KnowledgeObjectStorage


class KnowledgeApplicationService:
    """Single entry point over the CRUD, indexing, search and settings services.

    The constructor builds one instance of each sub-service from the shared
    collaborators it receives; every public method forwards its arguments
    unchanged to exactly one of those sub-services and returns its result.
    """

    def __init__(
        self,
        *,
        settings: KnowledgeServiceSettings,
        base_repository: KnowledgeBaseRepository,
        document_repository: KnowledgeDocumentRepository,
        chunk_repository: KnowledgeChunkRepository,
        object_storage: KnowledgeObjectStorage | None = None,
        redis_client: Redis | None = None,
        task_queue_publisher: TaskQueuePublisher | None = None,
    ) -> None:
        """Store the collaborators and wire up the four sub-services.

        Args:
            settings: Service settings handed to every sub-service.
            base_repository: Repository shared by all four sub-services.
            document_repository: Repository shared by CRUD, indexing, search.
            chunk_repository: Repository shared by CRUD, indexing, search.
            object_storage: Optional storage passed to CRUD and indexing.
            redis_client: Optional Redis client passed to indexing.
            task_queue_publisher: Optional publisher passed to indexing.
        """
        # One embedding service instance is shared by indexing and search.
        embedding_service = EmbeddingService(settings=settings)

        self.settings = settings
        self.base_repository = base_repository
        self.document_repository = document_repository
        self.chunk_repository = chunk_repository
        self.embedding_service = embedding_service
        self._object_storage = object_storage
        self.redis_client = redis_client
        self.task_queue_publisher = task_queue_publisher

        self.crud_service = KnowledgeCrudService(
            settings=settings,
            base_repository=base_repository,
            document_repository=document_repository,
            chunk_repository=chunk_repository,
            object_storage=object_storage,
        )
        self.indexing_service = KnowledgeIndexingService(
            settings=settings,
            base_repository=base_repository,
            document_repository=document_repository,
            chunk_repository=chunk_repository,
            embedding_service=embedding_service,
            object_storage=object_storage,
            redis_client=redis_client,
            task_queue_publisher=task_queue_publisher,
        )
        self.search_service = KnowledgeSearchService(
            settings=settings,
            base_repository=base_repository,
            document_repository=document_repository,
            chunk_repository=chunk_repository,
            embedding_service=embedding_service,
        )
        self.settings_service = KnowledgeSettingsService(
            settings=settings,
            base_repository=base_repository,
        )

    @property
    def object_storage(self) -> KnowledgeObjectStorage:
        """Return the ``object_storage`` attribute of the CRUD sub-service.

        Note that this reads from ``crud_service`` rather than from the
        ``_object_storage`` value kept by the constructor.
        """
        return self.crud_service.object_storage

    # ── Knowledge Base ────────────────────────────────────────────────

    def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
        """Forward to ``crud_service.create_base``."""
        return self.crud_service.create_base(payload)

    def create_base_from_contract(
        self,
        payload: KnowledgeBaseCreateRequestDto,
    ) -> KnowledgeBase:
        """Forward to ``crud_service.create_base_from_contract``."""
        return self.crud_service.create_base_from_contract(payload)

    def list_bases(self) -> list[KnowledgeBase]:
        """Forward to ``crud_service.list_bases``."""
        return self.crud_service.list_bases()

    def list_bases_filtered(
        self,
        *,
        keyword: str | None = None,
        status: str | None = None,
    ) -> list[KnowledgeBase]:
        """Forward to ``crud_service.list_bases_filtered``."""
        return self.crud_service.list_bases_filtered(
            keyword=keyword,
            status=status,
        )

    def update_base_from_contract(
        self,
        payload: KnowledgeBaseUpdateRequestDto,
    ) -> KnowledgeBase | None:
        """Forward to ``crud_service.update_base_from_contract``."""
        return self.crud_service.update_base_from_contract(payload)

    def delete_base(self, *, knowledge_base_id: str) -> bool:
        """Forward to ``crud_service.delete_base``."""
        return self.crud_service.delete_base(
            knowledge_base_id=knowledge_base_id,
        )

    def update_base_status(
        self,
        *,
        knowledge_base_id: str,
        payload: KnowledgeBaseStatusUpdateRequest,
    ) -> KnowledgeBase | None:
        """Forward to ``crud_service.update_base_status``."""
        return self.crud_service.update_base_status(
            knowledge_base_id=knowledge_base_id,
            payload=payload,
        )

    # ── Document CRUD ─────────────────────────────────────────────────

    def list_documents(self, *, knowledge_base_id: str) -> list[KnowledgeDocument]:
        """Forward to ``crud_service.list_documents``."""
        return self.crud_service.list_documents(
            knowledge_base_id=knowledge_base_id,
        )

    def list_documents_filtered(
        self,
        *,
        knowledge_base_id: str | None = None,
        keyword: str | None = None,
        status: str | None = None,
        source_type: str | None = None,
    ) -> list[KnowledgeDocument]:
        """Forward to ``crud_service.list_documents_filtered``."""
        return self.crud_service.list_documents_filtered(
            knowledge_base_id=knowledge_base_id,
            keyword=keyword,
            status=status,
            source_type=source_type,
        )

    def update_document_from_contract(
        self,
        payload: KnowledgeDocumentUpdateRequestDto,
    ) -> KnowledgeDocument | None:
        """Forward to ``crud_service.update_document_from_contract``."""
        return self.crud_service.update_document_from_contract(payload)

    def delete_document(self, *, document_id: str) -> bool:
        """Forward to ``crud_service.delete_document``."""
        return self.crud_service.delete_document(document_id=document_id)

    def delete_document_result(self, *, document_id: str) -> dict[str, JSONValue]:
        """Forward to ``crud_service.delete_document_result``."""
        return self.crud_service.delete_document_result(document_id=document_id)

    def read_document_content(
        self,
        *,
        document_id: str,
        include_text: bool = True,
        include_base64: bool = False,
    ) -> dict[str, JSONValue] | None:
        """Forward to ``crud_service.read_document_content``."""
        return self.crud_service.read_document_content(
            document_id=document_id,
            include_text=include_text,
            include_base64=include_base64,
        )

    def read_document_storage_status(
        self,
        *,
        document_id: str,
    ) -> dict[str, JSONValue] | None:
        """Forward to ``crud_service.read_document_storage_status``."""
        return self.crud_service.read_document_storage_status(
            document_id=document_id,
        )

    def read_storage_health(self) -> dict[str, JSONValue]:
        """Forward to ``crud_service.read_storage_health``."""
        return self.crud_service.read_storage_health()

    # ── Chunk CRUD ────────────────────────────────────────────────────

    def list_chunks_filtered(
        self,
        *,
        knowledge_base_id: str | None = None,
        document_id: str | None = None,
        keyword: str | None = None,
    ) -> list[KnowledgeChunk]:
        """Forward to ``crud_service.list_chunks_filtered``."""
        return self.crud_service.list_chunks_filtered(
            knowledge_base_id=knowledge_base_id,
            document_id=document_id,
            keyword=keyword,
        )

    def delete_chunk(self, *, chunk_id: str) -> bool:
        """Forward to ``crud_service.delete_chunk``."""
        return self.crud_service.delete_chunk(chunk_id=chunk_id)

    # ── Parse ─────────────────────────────────────────────────────────

    def parse_document(self, payload: KnowledgeDocumentParseRequest):
        """Forward to ``crud_service.parse_document`` and return its result."""
        return self.crud_service.parse_document(payload)

    # ── Indexing ──────────────────────────────────────────────────────

    def create_document(
        self,
        payload: KnowledgeDocumentCreateRequest,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
        """Forward to ``indexing_service.create_document``."""
        return self.indexing_service.create_document(payload)

    def create_document_from_contract(
        self,
        payload: KnowledgeDocumentCreateRequestDto,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
        """Forward to ``indexing_service.create_document_from_contract``."""
        return self.indexing_service.create_document_from_contract(payload)

    def create_document_from_contract_result(
        self,
        payload: KnowledgeDocumentCreateRequestDto,
    ) -> KnowledgeDocumentIngestResult:
        """Forward to ``indexing_service.create_document_from_contract_result``."""
        return self.indexing_service.create_document_from_contract_result(payload)

    def create_document_index_job(
        self,
        payload: KnowledgeDocumentCreateRequest,
    ) -> KnowledgeDocumentIngestResult:
        """Forward to ``indexing_service.create_document_index_job``."""
        return self.indexing_service.create_document_index_job(payload)

    def queue_document_indexing(
        self,
        *,
        document: KnowledgeDocument,
        action: str,
        chunk_size: int | None = None,
        chunk_overlap: int | None = None,
    ) -> tuple[KnowledgeDocument, KnowledgeIndexJobData]:
        """Forward to ``indexing_service.queue_document_indexing``."""
        return self.indexing_service.queue_document_indexing(
            document=document,
            action=action,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )

    def process_document_index_job(
        self,
        *,
        document_id: str,
        action: str,
        worker_key: str,
        job_id: str | None = None,
        chunk_size: int | None = None,
        chunk_overlap: int | None = None,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
        """Forward to ``indexing_service.process_document_index_job``."""
        return self.indexing_service.process_document_index_job(
            document_id=document_id,
            action=action,
            worker_key=worker_key,
            job_id=job_id,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )

    def execute_document_index_job(
        self,
        *,
        document_id: str,
        action: str,
        worker_key: str,
        lease_seconds: int,
        job_id: str | None = None,
        redis_client: Redis | None = None,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
        """Forward to ``indexing_service.execute_document_index_job``.

        The ``redis_client`` argument is passed through as given; the client
        stored on this facade is not substituted here.
        """
        return self.indexing_service.execute_document_index_job(
            document_id=document_id,
            action=action,
            worker_key=worker_key,
            lease_seconds=lease_seconds,
            job_id=job_id,
            redis_client=redis_client,
        )

    def execute_next_pending_document_job(
        self,
        *,
        worker_key: str,
        lease_seconds: int,
        stale_indexing_seconds: int,
        redis_client: Redis | None = None,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
        """Forward to ``indexing_service.execute_next_pending_document_job``.

        The ``redis_client`` argument is passed through as given; the client
        stored on this facade is not substituted here.
        """
        return self.indexing_service.execute_next_pending_document_job(
            worker_key=worker_key,
            lease_seconds=lease_seconds,
            stale_indexing_seconds=stale_indexing_seconds,
            redis_client=redis_client,
        )

    def list_index_jobs(
        self,
        *,
        knowledge_base_id: str | None = None,
        document_id: str | None = None,
        status: str | None = None,
    ) -> list[KnowledgeIndexJobData]:
        """Forward to ``indexing_service.list_index_jobs``."""
        return self.indexing_service.list_index_jobs(
            knowledge_base_id=knowledge_base_id,
            document_id=document_id,
            status=status,
        )

    def detail_index_job(self, *, document_id: str) -> KnowledgeIndexJobData | None:
        """Forward to ``indexing_service.detail_index_job``."""
        return self.indexing_service.detail_index_job(document_id=document_id)

    def reindex_document(
        self,
        payload: KnowledgeDocumentReindexRequestDto,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
        """Forward to ``indexing_service.reindex_document``."""
        return self.indexing_service.reindex_document(payload)

    def reindex_document_from_contract_result(
        self,
        payload: KnowledgeDocumentReindexRequestDto,
    ) -> KnowledgeDocumentIngestResult | None:
        """Forward to ``indexing_service.reindex_document_from_contract_result``."""
        return self.indexing_service.reindex_document_from_contract_result(payload)

    def reindex_base_from_contract(
        self,
        payload: KnowledgeBaseReindexRequestDto,
    ) -> list[KnowledgeIndexJobData]:
        """Forward to ``indexing_service.reindex_base_from_contract``."""
        return self.indexing_service.reindex_base_from_contract(payload)

    # ── Search ────────────────────────────────────────────────────────

    def search(
        self,
        payload: KnowledgeSearchRequest,
    ) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]:
        """Forward to ``search_service.search``."""
        return self.search_service.search(payload)

    # ── Settings ──────────────────────────────────────────────────────

    def read_settings(
        self,
        *,
        knowledge_base_id: str | None = None,
    ) -> KnowledgeSettingsDto:
        """Forward to ``settings_service.read_settings``."""
        return self.settings_service.read_settings(
            knowledge_base_id=knowledge_base_id,
        )

    def update_settings(
        self,
        payload: KnowledgeSettingsUpdateRequestDto,
    ) -> KnowledgeSettingsDto:
        """Forward to ``settings_service.update_settings``."""
        return self.settings_service.update_settings(payload)


def build_knowledge_application_service(
    *,
    db,
    settings: KnowledgeServiceSettings,
) -> KnowledgeApplicationService:
    """Assemble a ``KnowledgeApplicationService`` from a DB handle and settings.

    The three repositories are each constructed with ``db``. A Redis client is
    requested via ``try_build_redis_client(settings.redis_url)``; a
    ``TaskQueuePublisher`` is created only when that call returns a client,
    otherwise the facade receives ``None`` for both.

    Args:
        db: Handle passed positionally to each repository constructor.
        settings: Service settings; ``redis_url`` is read from it.

    Returns:
        The wired facade. No ``object_storage`` is supplied, so the
        constructor default (``None``) applies.
    """
    # Imported at call time: at module level these names exist only under
    # TYPE_CHECKING, so they are not available at runtime otherwise.
    from app.domain.repositories import (
        KnowledgeBaseRepository,
        KnowledgeChunkRepository,
        KnowledgeDocumentRepository,
    )

    redis_client = try_build_redis_client(settings.redis_url)

    bases = KnowledgeBaseRepository(db)
    documents = KnowledgeDocumentRepository(db)
    chunks = KnowledgeChunkRepository(db)

    publisher: TaskQueuePublisher | None
    if redis_client is None:
        publisher = None
    else:
        publisher = TaskQueuePublisher(client=redis_client)

    return KnowledgeApplicationService(
        settings=settings,
        base_repository=bases,
        document_repository=documents,
        chunk_repository=chunks,
        redis_client=redis_client,
        task_queue_publisher=publisher,
    )