| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- from core_shared import JSONValue
- from app.application.document_parsers import (
- DocumentParseError,
- ParsedDocument,
- parse_document_content,
- )
- from app.application.embeddings import EmbeddingService
- from app.application.retrieval import (
- build_chunk_payloads,
- cosine_similarity,
- keyword_score,
- stable_content_hash,
- )
- from app.bootstrap.settings import KnowledgeServiceSettings
- from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
- from app.domain.repositories import (
- KnowledgeBaseRepository,
- KnowledgeChunkRepository,
- KnowledgeDocumentRepository,
- )
- from app.schemas.knowledge import (
- KnowledgeBaseCreateRequest,
- KnowledgeBaseStatusUpdateRequest,
- KnowledgeDocumentCreateRequest,
- KnowledgeDocumentParseRequest,
- KnowledgeSearchRequest,
- )
class KnowledgeApplicationService:
    """Application-level operations for knowledge bases, documents and search.

    Coordinates the base, document and chunk repositories with document
    parsing, chunking and embedding in order to ingest documents and to answer
    search requests.
    """

    # Relative weights used to blend the keyword and vector scores in `search`.
    _KEYWORD_WEIGHT = 0.7
    _VECTOR_WEIGHT = 0.3
    # How many times `top_k` vector candidates are requested, so that enough
    # remain after metadata filtering and re-ranking.
    _VECTOR_OVERSAMPLE = 5

    def __init__(
        self,
        *,
        settings: KnowledgeServiceSettings,
        base_repository: KnowledgeBaseRepository,
        document_repository: KnowledgeDocumentRepository,
        chunk_repository: KnowledgeChunkRepository,
    ) -> None:
        """Store the collaborators and build the embedding service.

        Args:
            settings: Service settings; also passed to ``EmbeddingService`` and
                read for the default chunk size and overlap.
            base_repository: Persistence for knowledge bases.
            document_repository: Persistence for documents.
            chunk_repository: Persistence and vector search for chunks.
        """
        self.settings = settings
        self.base_repository = base_repository
        self.document_repository = document_repository
        self.chunk_repository = chunk_repository
        self.embedding_service = EmbeddingService(settings=settings)

    def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
        """Create a knowledge base from the request fields and return it."""
        return self.base_repository.create(
            tenant_id=payload.tenant_id,
            code=payload.code,
            name=payload.name,
            description=payload.description,
            metadata_json=payload.metadata_json,
        )

    def list_bases(self, *, tenant_id: str) -> list[KnowledgeBase]:
        """Return the knowledge bases that belong to ``tenant_id``."""
        return self.base_repository.list_by_tenant(tenant_id=tenant_id)

    def update_base_status(
        self,
        *,
        knowledge_base_id: str,
        payload: KnowledgeBaseStatusUpdateRequest,
    ) -> KnowledgeBase | None:
        """Set the status of a knowledge base.

        Returns:
            Whatever the repository returns for the update; the annotation
            allows ``None`` (presumably when no matching base exists; confirm
            against the repository implementation).
        """
        return self.base_repository.update_status(
            tenant_id=payload.tenant_id,
            knowledge_base_id=knowledge_base_id,
            status=payload.status,
        )

    def create_document(
        self,
        payload: KnowledgeDocumentCreateRequest,
    ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
        """Parse, store, chunk and embed a new document.

        Args:
            payload: Document creation request, including the raw content and
                optional chunking overrides.

        Returns:
            The stored document (the status-updated instance when the
            repository returns one) and the chunks created for it.

        Raises:
            ValueError: If the target knowledge base does not exist for the
                tenant.
            DocumentParseError: Propagated from ``parse_document`` when the
                content cannot be parsed.
        """
        knowledge_base = self.base_repository.get_by_id(
            tenant_id=payload.tenant_id,
            knowledge_base_id=payload.knowledge_base_id,
        )
        if knowledge_base is None:
            raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")

        parsed = self.parse_document(
            KnowledgeDocumentParseRequest(
                source_type=payload.source_type,
                source_uri=payload.source_uri,
                content_text=payload.content_text,
                content_base64=payload.content_base64,
            )
        )
        # Caller-supplied metadata is kept; parser output goes under its own key.
        metadata_json = {
            **payload.metadata_json,
            "parser_metadata": parsed.metadata_json,
        }
        document = self.document_repository.create(
            tenant_id=payload.tenant_id,
            knowledge_base_id=payload.knowledge_base_id,
            title=payload.title,
            source_type=parsed.source_type,
            source_uri=payload.source_uri,
            content_text=parsed.content_text,
            content_hash=stable_content_hash(parsed.content_text),
            metadata_json=metadata_json,
        )
        chunks = self._index_document(
            document=document,
            content_text=parsed.content_text,
            chunk_size=payload.chunk_size,
            chunk_overlap=payload.chunk_overlap,
        )
        indexed_document = self.document_repository.update_status(
            document_id=document.id,
            status="indexed",
        )
        # Fall back to the pre-update instance if the repository returns nothing.
        return indexed_document or document, chunks

    def parse_document(self, payload: KnowledgeDocumentParseRequest) -> ParsedDocument:
        """Parse raw document content into a ``ParsedDocument``.

        Raises:
            DocumentParseError: Propagated unchanged from
                ``parse_document_content``.
        """
        return parse_document_content(
            source_type=payload.source_type,
            content_text=payload.content_text,
            content_base64=payload.content_base64,
            source_uri=payload.source_uri,
        )

    def list_documents(
        self,
        *,
        tenant_id: str,
        knowledge_base_id: str,
    ) -> list[KnowledgeDocument]:
        """Return the documents of one knowledge base for a tenant."""
        return self.document_repository.list_by_base(
            tenant_id=tenant_id,
            knowledge_base_id=knowledge_base_id,
        )

    def search(
        self,
        payload: KnowledgeSearchRequest,
    ) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]:
        """Rank chunks of a knowledge base against a query.

        The query is embedded, candidate chunks are fetched through the chunk
        repository's vector search (falling back to every chunk of the base
        when that returns nothing), and each candidate whose document passes
        the request filters is scored as a weighted blend of keyword and
        vector similarity.

        Returns:
            Up to ``payload.top_k`` tuples of ``(chunk, document, score,
            details)`` sorted by descending score. ``details`` holds the
            individual scores, the retrieval mode label and the embedding
            provider/model used for the query.
        """
        query_embedding_result = self.embedding_service.embed_text(payload.query)
        vector_candidates = self.chunk_repository.search_by_vector(
            tenant_id=payload.tenant_id,
            knowledge_base_id=payload.knowledge_base_id,
            embedding=query_embedding_result.embedding,
            limit=max(payload.top_k * self._VECTOR_OVERSAMPLE, payload.top_k),
        )
        if vector_candidates:
            chunks = [chunk for chunk, _ in vector_candidates]
            vector_scores_by_chunk_id = {
                chunk.id: score for chunk, score in vector_candidates
            }
            retrieval_mode = "pgvector-hybrid"
        else:
            chunks = self.chunk_repository.list_by_base(
                tenant_id=payload.tenant_id,
                knowledge_base_id=payload.knowledge_base_id,
            )
            vector_scores_by_chunk_id = {}
            retrieval_mode = "hybrid"

        # Per-request cache keyed by document id. Misses (None) are cached as
        # well so a missing document is looked up once, not once per chunk.
        document_cache: dict[str, KnowledgeDocument | None] = {}
        scored: list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]] = []
        for chunk in chunks:
            if chunk.document_id not in document_cache:
                document_cache[chunk.document_id] = self.document_repository.get_by_id(
                    tenant_id=payload.tenant_id,
                    document_id=chunk.document_id,
                )
            document = document_cache[chunk.document_id]
            if document is None:
                continue
            if not self._matches_filters(document=document, filters_json=payload.filters_json):
                continue

            keyword = keyword_score(payload.query, chunk.content_text)
            vector = vector_scores_by_chunk_id.get(chunk.id)
            if vector is None:
                # No score from the vector search: compute it from the stored
                # chunk embedding instead.
                vector = cosine_similarity(
                    query_embedding_result.embedding,
                    chunk.embedding_json,
                )
            score = round(
                keyword * self._KEYWORD_WEIGHT + vector * self._VECTOR_WEIGHT,
                6,
            )
            scored.append(
                (
                    chunk,
                    document,
                    score,
                    {
                        "keyword_score": round(keyword, 6),
                        "vector_score": round(vector, 6),
                        "retrieval_mode": retrieval_mode,
                        "embedding_provider": query_embedding_result.provider,
                        "embedding_model": query_embedding_result.model,
                    },
                )
            )
        scored.sort(key=lambda item: item[2], reverse=True)
        return scored[: payload.top_k]

    def _resolve_chunk_params(
        self,
        *,
        chunk_size: int | None,
        chunk_overlap: int | None,
    ) -> tuple[int, int]:
        """Return the effective ``(chunk_size, chunk_overlap)`` for indexing.

        A missing or zero ``chunk_size`` falls back to the configured default,
        since a zero-length chunk is not usable. ``chunk_overlap`` falls back
        only when it is ``None``, so an explicit ``0`` (no overlap) is honoured.
        """
        effective_size = chunk_size or self.settings.default_chunk_size
        effective_overlap = (
            self.settings.default_chunk_overlap
            if chunk_overlap is None
            else chunk_overlap
        )
        return effective_size, effective_overlap

    def _index_document(
        self,
        *,
        document: KnowledgeDocument,
        content_text: str,
        chunk_size: int | None,
        chunk_overlap: int | None,
    ) -> list[KnowledgeChunk]:
        """Chunk and embed ``content_text`` and replace the document's chunks.

        Args:
            document: The stored document the chunks belong to.
            content_text: Full text to split into chunks.
            chunk_size: Optional override for the configured default size.
            chunk_overlap: Optional override for the configured default
                overlap; ``0`` is a valid explicit value.

        Returns:
            The chunks returned by the repository after replacement.
        """
        effective_size, effective_overlap = self._resolve_chunk_params(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
        chunk_payloads = build_chunk_payloads(
            content_text=content_text,
            chunk_size=effective_size,
            chunk_overlap=effective_overlap,
        )
        for chunk_payload in chunk_payloads:
            chunk_text = self._read_chunk_content(chunk_payload)
            embedding_result = self.embedding_service.embed_text(chunk_text)
            chunk_payload["embedding_model"] = embedding_result.model
            chunk_payload["embedding_json"] = embedding_result.embedding
            chunk_payload["metadata_json"] = {
                "embedding_provider": embedding_result.provider,
            }
        return self.chunk_repository.replace_document_chunks(
            tenant_id=document.tenant_id,
            knowledge_base_id=document.knowledge_base_id,
            document_id=document.id,
            chunks=chunk_payloads,
        )

    def _read_chunk_content(self, chunk_payload: dict[str, JSONValue]) -> str:
        """Return the payload's ``content_text`` if it is a string, else ``""``."""
        value = chunk_payload.get("content_text")
        return value if isinstance(value, str) else ""

    def _matches_filters(
        self,
        *,
        document: KnowledgeDocument,
        filters_json: dict[str, JSONValue],
    ) -> bool:
        """Check a document against the optional search filters.

        Only string-valued ``source_type`` and ``status`` filters are applied;
        absent or non-string values are ignored.
        """
        source_type = filters_json.get("source_type")
        if isinstance(source_type, str) and document.source_type != source_type:
            return False
        status = filters_json.get("status")
        if isinstance(status, str) and document.status != status:
            return False
        return True
|