"""Application service for knowledge bases, their documents and chunk search."""

from core_shared import JSONValue

from app.application.retrieval import (
    build_chunk_payloads,
    build_hash_embedding,
    cosine_similarity,
    keyword_score,
    stable_content_hash,
)
from app.bootstrap.settings import KnowledgeServiceSettings
from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
from app.domain.repositories import (
    KnowledgeBaseRepository,
    KnowledgeChunkRepository,
    KnowledgeDocumentRepository,
)
from app.schemas.knowledge import (
    KnowledgeBaseCreateRequest,
    KnowledgeBaseStatusUpdateRequest,
    KnowledgeDocumentCreateRequest,
    KnowledgeSearchRequest,
)


class KnowledgeApplicationService:
    """Coordinate the knowledge base, document and chunk repositories.

    The service creates and lists knowledge bases, stores documents and
    indexes them into chunks, and ranks chunks against a query using a
    weighted blend of a keyword score and an embedding cosine similarity.
    """

    # Weights used by ``search`` to blend the two per-chunk signals.
    _KEYWORD_WEIGHT = 0.7
    _VECTOR_WEIGHT = 0.3

    def __init__(
        self,
        *,
        settings: KnowledgeServiceSettings,
        base_repository: KnowledgeBaseRepository,
        document_repository: KnowledgeDocumentRepository,
        chunk_repository: KnowledgeChunkRepository,
    ) -> None:
        """Store the settings and repositories the service delegates to."""
        self.settings = settings
        self.base_repository = base_repository
        self.document_repository = document_repository
        self.chunk_repository = chunk_repository

    def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
        """Create a knowledge base from the request fields and return it."""
        return self.base_repository.create(
            tenant_id=payload.tenant_id,
            code=payload.code,
            name=payload.name,
            description=payload.description,
            metadata_json=payload.metadata_json,
        )

    def list_bases(self, *, tenant_id: str) -> list[KnowledgeBase]:
        """Return the knowledge bases the repository lists for ``tenant_id``."""
        return self.base_repository.list_by_tenant(tenant_id=tenant_id)

    def update_base_status(
        self,
        *,
        knowledge_base_id: str,
        payload: KnowledgeBaseStatusUpdateRequest,
    ) -> KnowledgeBase | None:
        """Update a knowledge base's status.

        Returns whatever the repository returns, which per the annotation
        may be ``None``.
        """
        return self.base_repository.update_status(
            tenant_id=payload.tenant_id,
            knowledge_base_id=knowledge_base_id,
            status=payload.status,
        )

    def create_document(
        self,
        payload: KnowledgeDocumentCreateRequest,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
        """Store a document, index it into chunks and mark it ``"indexed"``.

        Returns:
            The document (the status-updated instance when the repository
            returns one, otherwise the freshly created one) and its chunks.

        Raises:
            ValueError: If the target knowledge base cannot be found for the
                payload's tenant.
        """
        knowledge_base = self.base_repository.get_by_id(
            tenant_id=payload.tenant_id,
            knowledge_base_id=payload.knowledge_base_id,
        )
        if knowledge_base is None:
            raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")

        document = self.document_repository.create(
            tenant_id=payload.tenant_id,
            knowledge_base_id=payload.knowledge_base_id,
            title=payload.title,
            source_type=payload.source_type,
            source_uri=payload.source_uri,
            content_text=payload.content_text,
            content_hash=stable_content_hash(payload.content_text),
            metadata_json=payload.metadata_json,
        )
        chunks = self._index_document(document=document, payload=payload)

        # NOTE(review): unlike the other repository calls in this class, this
        # one passes no tenant_id - confirm that matches the repository API.
        indexed_document = self.document_repository.update_status(
            document_id=document.id,
            status="indexed",
        )
        # Test for None explicitly instead of relying on the truthiness of a
        # model instance.
        if indexed_document is None:
            indexed_document = document
        return indexed_document, chunks

    def list_documents(
        self,
        *,
        tenant_id: str,
        knowledge_base_id: str,
    ) -> list[KnowledgeDocument]:
        """Return the documents the repository lists for the knowledge base."""
        return self.document_repository.list_by_base(
            tenant_id=tenant_id,
            knowledge_base_id=knowledge_base_id,
        )

    def search(
        self,
        payload: KnowledgeSearchRequest,
    ) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]:
        """Rank the chunks of a knowledge base against ``payload.query``.

        Chunks whose document cannot be loaded, or whose document fails
        ``payload.filters_json``, are skipped. Each remaining chunk gets a
        score of ``0.7 * keyword + 0.3 * cosine`` rounded to six decimals.

        Returns:
            ``(chunk, document, score, details)`` tuples sorted by descending
            score and sliced to ``payload.top_k``; ``details`` carries the
            rounded component scores and the retrieval mode.
        """
        chunks = self.chunk_repository.list_by_base(
            tenant_id=payload.tenant_id,
            knowledge_base_id=payload.knowledge_base_id,
        )
        query_embedding = build_hash_embedding(
            payload.query,
            dimensions=self.settings.embedding_dimensions,
        )

        # document_id -> document usable in results, or None when it is
        # missing or filtered out. Remembering the negative outcome as well
        # means each document is fetched and filtered once, not once per chunk.
        eligible_documents: dict[str, KnowledgeDocument | None] = {}
        scored: list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]] = []

        for chunk in chunks:
            if chunk.document_id not in eligible_documents:
                candidate = self.document_repository.get_by_id(
                    tenant_id=payload.tenant_id,
                    document_id=chunk.document_id,
                )
                if candidate is not None and not self._matches_filters(
                    document=candidate,
                    filters_json=payload.filters_json,
                ):
                    candidate = None
                eligible_documents[chunk.document_id] = candidate

            document = eligible_documents[chunk.document_id]
            if document is None:
                continue

            keyword = keyword_score(payload.query, chunk.content_text)
            vector = cosine_similarity(query_embedding, chunk.embedding_json)
            score = round(
                keyword * self._KEYWORD_WEIGHT + vector * self._VECTOR_WEIGHT,
                6,
            )
            scored.append(
                (
                    chunk,
                    document,
                    score,
                    {
                        "keyword_score": round(keyword, 6),
                        "vector_score": round(vector, 6),
                        "retrieval_mode": "hybrid-local",
                    },
                )
            )

        # list.sort is stable, so equally scored chunks keep repository order.
        scored.sort(key=lambda item: item[2], reverse=True)
        return scored[: payload.top_k]

    def _index_document(
        self,
        *,
        document: KnowledgeDocument,
        payload: KnowledgeDocumentCreateRequest,
    ) -> list[KnowledgeChunk]:
        """Build chunk payloads for the document text and replace its chunks.

        Returns:
            The chunks returned by the chunk repository.
        """
        # Unchanged behaviour: a chunk size of None or 0 falls back to the
        # configured default.
        chunk_size = payload.chunk_size or self.settings.default_chunk_size

        # An explicit overlap of 0 ("no overlap") is a legitimate request, so
        # only fall back to the default when the field was left unset.
        chunk_overlap = payload.chunk_overlap
        if chunk_overlap is None:
            chunk_overlap = self.settings.default_chunk_overlap

        chunk_payloads = build_chunk_payloads(
            content_text=payload.content_text,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            embedding_dimensions=self.settings.embedding_dimensions,
            embedding_model=self.settings.embedding_model,
        )
        return self.chunk_repository.replace_document_chunks(
            tenant_id=document.tenant_id,
            knowledge_base_id=document.knowledge_base_id,
            document_id=document.id,
            chunks=chunk_payloads,
        )

    def _matches_filters(
        self,
        *,
        document: KnowledgeDocument,
        filters_json: dict[str, JSONValue],
    ) -> bool:
        """Return whether the document satisfies the supported filters.

        Only string-valued ``"source_type"`` and ``"status"`` entries are
        enforced; any other key, or a non-string value, is ignored.
        """
        source_type = filters_json.get("source_type")
        if isinstance(source_type, str) and document.source_type != source_type:
            return False

        status = filters_json.get("status")
        if isinstance(status, str) and document.status != status:
            return False

        return True