from core_shared import JSONValue

from app.application.document_parsers import (
    DocumentParseError,  # noqa: F401  NOTE(review): no longer referenced here; kept in case other modules import it from this one
    ParsedDocument,
    parse_document_content,
)
from app.application.embeddings import EmbeddingService
from app.application.retrieval import (
    build_chunk_payloads,
    cosine_similarity,
    keyword_score,
    rerank_score,
    stable_content_hash,
)
from app.bootstrap.settings import KnowledgeServiceSettings
from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
from app.domain.repositories import (
    KnowledgeBaseRepository,
    KnowledgeChunkRepository,
    KnowledgeDocumentRepository,
)
from app.schemas.knowledge import (
    KnowledgeBaseCreateRequest,
    KnowledgeBaseStatusUpdateRequest,
    KnowledgeDocumentCreateRequest,
    KnowledgeDocumentParseRequest,
    KnowledgeSearchRequest,
)


class KnowledgeApplicationService:
    """Application service for knowledge bases, documents, and retrieval.

    Coordinates the knowledge base, document, and chunk repositories with an
    ``EmbeddingService`` to create knowledge bases, parse and index documents
    into embedded chunks, and run hybrid search. The search score combines
    keyword, vector, and optional rerank scores.
    """

    def __init__(
        self,
        *,
        settings: KnowledgeServiceSettings,
        base_repository: KnowledgeBaseRepository,
        document_repository: KnowledgeDocumentRepository,
        chunk_repository: KnowledgeChunkRepository,
    ) -> None:
        """Store the collaborators and build an ``EmbeddingService`` from ``settings``."""
        self.settings = settings
        self.base_repository = base_repository
        self.document_repository = document_repository
        self.chunk_repository = chunk_repository
        self.embedding_service = EmbeddingService(settings=settings)

    def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
        """Create a knowledge base from the request's code, name, description and metadata."""
        return self.base_repository.create(
            code=payload.code,
            name=payload.name,
            description=payload.description,
            metadata_json=payload.metadata_json,
        )

    def list_bases(self) -> list[KnowledgeBase]:
        """Return every knowledge base known to the repository."""
        return self.base_repository.list_all()

    def update_base_status(
        self,
        *,
        knowledge_base_id: str,
        payload: KnowledgeBaseStatusUpdateRequest,
    ) -> KnowledgeBase | None:
        """Set a knowledge base's status.

        Returns whatever the repository returns. The ``None`` in the
        signature suggests "not found", but this is an assumption; confirm it
        against the repository.
        """
        return self.base_repository.update_status(
            knowledge_base_id=knowledge_base_id,
            status=payload.status,
        )

    def create_document(
        self,
        payload: KnowledgeDocumentCreateRequest,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
        """Parse, store and index a new document in an existing knowledge base.

        Args:
            payload: Target knowledge base id, title, source (URI plus text or
                base64 content), caller metadata and optional chunking
                overrides.

        Returns:
            A ``(document, chunks)`` pair. ``document`` is the row returned
            after its status is set to ``"indexed"``. If that update returns
            nothing, ``document`` is the originally created row instead.
            ``chunks`` are the chunks written for the document.

        Raises:
            ValueError: If ``payload.knowledge_base_id`` does not exist.
            DocumentParseError: Propagated from ``parse_document`` when the
                content cannot be parsed.
        """
        knowledge_base = self.base_repository.get_by_id(
            knowledge_base_id=payload.knowledge_base_id,
        )
        if knowledge_base is None:
            raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")

        parsed = self.parse_document(
            KnowledgeDocumentParseRequest(
                source_type=payload.source_type,
                source_uri=payload.source_uri,
                content_text=payload.content_text,
                content_base64=payload.content_base64,
            )
        )
        # Caller metadata is kept at the top level. The parser's own metadata
        # is nested under "parser_metadata" (and overrides any caller key of
        # that name).
        metadata_json = {
            **payload.metadata_json,
            "parser_metadata": parsed.metadata_json,
        }
        document = self.document_repository.create(
            knowledge_base_id=payload.knowledge_base_id,
            title=payload.title,
            source_type=parsed.source_type,
            source_uri=payload.source_uri,
            content_text=parsed.content_text,
            content_hash=stable_content_hash(parsed.content_text),
            metadata_json=metadata_json,
        )
        chunks = self._index_document(
            document=document,
            content_text=parsed.content_text,
            chunk_size=payload.chunk_size,
            chunk_overlap=payload.chunk_overlap,
        )
        indexed_document = self.document_repository.update_status(
            document_id=document.id,
            status="indexed",
        )
        return indexed_document or document, chunks

    def parse_document(self, payload: KnowledgeDocumentParseRequest) -> ParsedDocument:
        """Parse raw document content (plain text or base64) into a ``ParsedDocument``.

        Raises:
            DocumentParseError: Propagated unchanged from
                ``parse_document_content`` when the content cannot be parsed.
        """
        # The previous ``try/except DocumentParseError: raise`` only re-raised
        # the same exception. Letting it propagate directly is equivalent.
        return parse_document_content(
            source_type=payload.source_type,
            content_text=payload.content_text,
            content_base64=payload.content_base64,
            source_uri=payload.source_uri,
        )

    def list_documents(self, *, knowledge_base_id: str) -> list[KnowledgeDocument]:
        """Return the documents stored in the given knowledge base."""
        return self.document_repository.list_by_base(
            knowledge_base_id=knowledge_base_id,
        )

    def search(
        self,
        payload: KnowledgeSearchRequest,
    ) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]:
        """Run a hybrid keyword + vector (+ optional rerank) search.

        Candidates come from the chunk repository's vector search. When that
        returns nothing, every chunk in the knowledge base is scored instead.
        Chunks are skipped if their document is missing or fails
        ``payload.filters_json``.

        Returns:
            Up to ``payload.top_k`` tuples of ``(chunk, document, score,
            details)``, ordered by ``score`` descending. ``details`` records the
            component scores, weights, retrieval mode, embedding
            provider/model and a citation for the chunk.
        """
        keyword_weight = self.settings.retrieval_keyword_weight
        vector_weight = self.settings.retrieval_vector_weight
        rerank_weight = self.settings.retrieval_rerank_weight
        rerank_enabled = self.settings.retrieval_rerank_enabled

        query_embedding_result = self.embedding_service.embed_text(payload.query)
        # Over-fetch vector candidates (multiplier clamped to >= 1) so that
        # filtering and re-scoring still leave enough results for top_k.
        candidate_limit = max(
            payload.top_k * max(self.settings.retrieval_candidate_multiplier, 1),
            payload.top_k,
        )
        vector_candidates = self.chunk_repository.search_by_vector(
            knowledge_base_id=payload.knowledge_base_id,
            embedding=query_embedding_result.embedding,
            limit=candidate_limit,
        )
        if vector_candidates:
            chunks = [chunk for chunk, _ in vector_candidates]
            vector_scores_by_chunk_id = {chunk.id: score for chunk, score in vector_candidates}
            retrieval_mode = "pgvector-hybrid"
        else:
            # No vector hits: fall back to scoring every chunk in the base,
            # computing the vector score locally below.
            chunks = self.chunk_repository.list_by_base(
                knowledge_base_id=payload.knowledge_base_id,
            )
            vector_scores_by_chunk_id = {}
            retrieval_mode = "hybrid"

        # document_id -> document, or None when the document is missing or
        # excluded by the filters. Caching the negative outcome avoids
        # re-querying and re-filtering the same document for every chunk.
        eligible_documents: dict[str, KnowledgeDocument | None] = {}
        scored: list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]] = []
        for chunk in chunks:
            if chunk.document_id not in eligible_documents:
                eligible_documents[chunk.document_id] = self._load_eligible_document(
                    document_id=chunk.document_id,
                    filters_json=payload.filters_json,
                )
            document = eligible_documents[chunk.document_id]
            if document is None:
                continue

            keyword = keyword_score(payload.query, chunk.content_text)
            vector = vector_scores_by_chunk_id.get(chunk.id)
            if vector is None:
                vector = cosine_similarity(query_embedding_result.embedding, chunk.embedding_json)
            rerank = (
                rerank_score(
                    query=payload.query,
                    chunk_text=chunk.content_text,
                    document_title=document.title,
                )
                if rerank_enabled
                else 0.0
            )
            score = round(
                keyword * keyword_weight + vector * vector_weight + rerank * rerank_weight,
                6,
            )
            scored.append(
                (
                    chunk,
                    document,
                    score,
                    {
                        "final_score": score,
                        "keyword_score": round(keyword, 6),
                        "vector_score": round(vector, 6),
                        "rerank_score": round(rerank, 6),
                        "retrieval_mode": retrieval_mode,
                        "rerank_enabled": rerank_enabled,
                        "candidate_limit": candidate_limit,
                        "weights": {
                            "keyword": keyword_weight,
                            "vector": vector_weight,
                            "rerank": rerank_weight,
                        },
                        "embedding_provider": query_embedding_result.provider,
                        "embedding_model": query_embedding_result.model,
                        "citation": {
                            "document_id": document.id,
                            "document_title": document.title,
                            "source_uri": document.source_uri,
                            "chunk_id": chunk.id,
                            "chunk_index": chunk.chunk_index,
                        },
                    },
                )
            )
        scored.sort(key=lambda item: item[2], reverse=True)
        return scored[: payload.top_k]

    def _load_eligible_document(
        self,
        *,
        document_id: str,
        filters_json: dict[str, JSONValue],
    ) -> KnowledgeDocument | None:
        """Fetch a document, returning it only if it exists and passes ``filters_json``."""
        document = self.document_repository.get_by_id(document_id=document_id)
        if document is None:
            return None
        if not self._matches_filters(document=document, filters_json=filters_json):
            return None
        return document

    def _index_document(
        self,
        *,
        document: KnowledgeDocument,
        content_text: str,
        chunk_size: int | None,
        chunk_overlap: int | None,
    ) -> list[KnowledgeChunk]:
        """Chunk and embed ``content_text`` and replace the document's stored chunks.

        Args:
            document: The stored document the chunks belong to.
            content_text: Parsed document text to split into chunks.
            chunk_size: Requested chunk size. Any falsy value (``None`` or 0)
                falls back to ``settings.default_chunk_size``.
            chunk_overlap: Requested overlap. Only ``None`` falls back to
                ``settings.default_chunk_overlap``; an explicit 0 is honoured.

        Returns:
            The chunks returned by ``replace_document_chunks``.
        """
        # 0 is presumably not a usable chunk size, so keep the falsy fallback.
        effective_chunk_size = chunk_size or self.settings.default_chunk_size
        # 0 is a legitimate "no overlap" request. ``or`` would have replaced
        # it with the default, so test for None explicitly.
        effective_chunk_overlap = (
            self.settings.default_chunk_overlap if chunk_overlap is None else chunk_overlap
        )
        chunk_payloads = build_chunk_payloads(
            content_text=content_text,
            chunk_size=effective_chunk_size,
            chunk_overlap=effective_chunk_overlap,
        )
        for chunk_payload in chunk_payloads:
            chunk_text = self._read_chunk_content(chunk_payload)
            embedding_result = self.embedding_service.embed_text(chunk_text)
            chunk_payload["embedding_model"] = embedding_result.model
            chunk_payload["embedding_json"] = embedding_result.embedding
            chunk_payload["metadata_json"] = {
                "embedding_provider": embedding_result.provider,
            }
        return self.chunk_repository.replace_document_chunks(
            knowledge_base_id=document.knowledge_base_id,
            document_id=document.id,
            chunks=chunk_payloads,
        )

    def _read_chunk_content(self, chunk_payload: dict[str, JSONValue]) -> str:
        """Return the payload's ``content_text`` if it is a string, else ``""``."""
        value = chunk_payload.get("content_text")
        return value if isinstance(value, str) else ""

    def _matches_filters(
        self,
        *,
        document: KnowledgeDocument,
        filters_json: dict[str, JSONValue],
    ) -> bool:
        """Check a document against the supported search filters.

        Only string-valued ``source_type`` and ``status`` filters are applied.
        Each must equal the document's attribute of the same name. Any other
        key, or a non-string value, is ignored.
        """
        source_type = filters_json.get("source_type")
        if isinstance(source_type, str) and document.source_type != source_type:
            return False
        status = filters_json.get("status")
        if isinstance(status, str) and document.status != status:
            return False
        return True