# services.py (9.8 KB)
  1. from core_shared import JSONValue
  2. from app.application.document_parsers import (
  3. DocumentParseError,
  4. ParsedDocument,
  5. parse_document_content)
  6. from app.application.embeddings import EmbeddingService
  7. from app.application.retrieval import (
  8. build_chunk_payloads,
  9. cosine_similarity,
  10. keyword_score,
  11. rerank_score,
  12. stable_content_hash)
  13. from app.bootstrap.settings import KnowledgeServiceSettings
  14. from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
  15. from app.domain.repositories import (
  16. KnowledgeBaseRepository,
  17. KnowledgeChunkRepository,
  18. KnowledgeDocumentRepository)
  19. from app.schemas.knowledge import (
  20. KnowledgeBaseCreateRequest,
  21. KnowledgeBaseStatusUpdateRequest,
  22. KnowledgeDocumentCreateRequest,
  23. KnowledgeDocumentParseRequest,
  24. KnowledgeSearchRequest)
  25. class KnowledgeApplicationService:
  26. def __init__(
  27. self,
  28. *,
  29. settings: KnowledgeServiceSettings,
  30. base_repository: KnowledgeBaseRepository,
  31. document_repository: KnowledgeDocumentRepository,
  32. chunk_repository: KnowledgeChunkRepository) -> None:
  33. self.settings = settings
  34. self.base_repository = base_repository
  35. self.document_repository = document_repository
  36. self.chunk_repository = chunk_repository
  37. self.embedding_service = EmbeddingService(settings=settings)
  38. def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
  39. return self.base_repository.create(
  40. code=payload.code,
  41. name=payload.name,
  42. description=payload.description,
  43. metadata_json=payload.metadata_json)
  44. def list_bases(self) -> list[KnowledgeBase]:
  45. return self.base_repository.list_all()
  46. def update_base_status(
  47. self,
  48. *,
  49. knowledge_base_id: str,
  50. payload: KnowledgeBaseStatusUpdateRequest) -> KnowledgeBase | None:
  51. return self.base_repository.update_status(
  52. knowledge_base_id=knowledge_base_id,
  53. status=payload.status)
  54. def create_document(
  55. self,
  56. payload: KnowledgeDocumentCreateRequest) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
  57. knowledge_base = self.base_repository.get_by_id(
  58. knowledge_base_id=payload.knowledge_base_id)
  59. if knowledge_base is None:
  60. raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")
  61. parsed = self.parse_document(
  62. KnowledgeDocumentParseRequest(
  63. source_type=payload.source_type,
  64. source_uri=payload.source_uri,
  65. content_text=payload.content_text,
  66. content_base64=payload.content_base64)
  67. )
  68. metadata_json = {
  69. **payload.metadata_json,
  70. "parser_metadata": parsed.metadata_json,
  71. }
  72. document = self.document_repository.create(
  73. knowledge_base_id=payload.knowledge_base_id,
  74. title=payload.title,
  75. source_type=parsed.source_type,
  76. source_uri=payload.source_uri,
  77. content_text=parsed.content_text,
  78. content_hash=stable_content_hash(parsed.content_text),
  79. metadata_json=metadata_json)
  80. chunks = self._index_document(
  81. document=document,
  82. content_text=parsed.content_text,
  83. chunk_size=payload.chunk_size,
  84. chunk_overlap=payload.chunk_overlap)
  85. indexed_document = self.document_repository.update_status(
  86. document_id=document.id,
  87. status="indexed")
  88. return indexed_document or document, chunks
  89. def parse_document(self, payload: KnowledgeDocumentParseRequest) -> ParsedDocument:
  90. try:
  91. return parse_document_content(
  92. source_type=payload.source_type,
  93. content_text=payload.content_text,
  94. content_base64=payload.content_base64,
  95. source_uri=payload.source_uri)
  96. except DocumentParseError:
  97. raise
  98. def list_documents(
  99. self,
  100. *,
  101. knowledge_base_id: str) -> list[KnowledgeDocument]:
  102. return self.document_repository.list_by_base(
  103. knowledge_base_id=knowledge_base_id)
  104. def search(
  105. self,
  106. payload: KnowledgeSearchRequest) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]:
  107. document_cache: dict[str, KnowledgeDocument] = {}
  108. query_embedding_result = self.embedding_service.embed_text(payload.query)
  109. candidate_limit = max(
  110. payload.top_k * max(self.settings.retrieval_candidate_multiplier, 1),
  111. payload.top_k)
  112. vector_candidates = self.chunk_repository.search_by_vector(
  113. knowledge_base_id=payload.knowledge_base_id,
  114. embedding=query_embedding_result.embedding,
  115. limit=candidate_limit)
  116. if vector_candidates:
  117. chunks = [chunk for chunk, _ in vector_candidates]
  118. vector_scores_by_chunk_id = {
  119. chunk.id: score for chunk, score in vector_candidates
  120. }
  121. retrieval_mode = "pgvector-hybrid"
  122. else:
  123. chunks = self.chunk_repository.list_by_base(
  124. knowledge_base_id=payload.knowledge_base_id)
  125. vector_scores_by_chunk_id = {}
  126. retrieval_mode = "hybrid"
  127. scored: list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]] = []
  128. for chunk in chunks:
  129. document = document_cache.get(chunk.document_id)
  130. if document is None:
  131. document = self.document_repository.get_by_id(
  132. document_id=chunk.document_id)
  133. if document is None:
  134. continue
  135. document_cache[chunk.document_id] = document
  136. if not self._matches_filters(document=document, filters_json=payload.filters_json):
  137. continue
  138. keyword = keyword_score(payload.query, chunk.content_text)
  139. vector = vector_scores_by_chunk_id.get(chunk.id)
  140. if vector is None:
  141. vector = cosine_similarity(query_embedding_result.embedding, chunk.embedding_json)
  142. rerank = (
  143. rerank_score(
  144. query=payload.query,
  145. chunk_text=chunk.content_text,
  146. document_title=document.title)
  147. if self.settings.retrieval_rerank_enabled
  148. else 0.0
  149. )
  150. score = round(
  151. keyword * self.settings.retrieval_keyword_weight
  152. + vector * self.settings.retrieval_vector_weight
  153. + rerank * self.settings.retrieval_rerank_weight,
  154. 6)
  155. scored.append(
  156. (
  157. chunk,
  158. document,
  159. score,
  160. {
  161. "final_score": score,
  162. "keyword_score": round(keyword, 6),
  163. "vector_score": round(vector, 6),
  164. "rerank_score": round(rerank, 6),
  165. "retrieval_mode": retrieval_mode,
  166. "rerank_enabled": self.settings.retrieval_rerank_enabled,
  167. "candidate_limit": candidate_limit,
  168. "weights": {
  169. "keyword": self.settings.retrieval_keyword_weight,
  170. "vector": self.settings.retrieval_vector_weight,
  171. "rerank": self.settings.retrieval_rerank_weight,
  172. },
  173. "embedding_provider": query_embedding_result.provider,
  174. "embedding_model": query_embedding_result.model,
  175. "citation": {
  176. "document_id": document.id,
  177. "document_title": document.title,
  178. "source_uri": document.source_uri,
  179. "chunk_id": chunk.id,
  180. "chunk_index": chunk.chunk_index,
  181. },
  182. })
  183. )
  184. scored.sort(key=lambda item: item[2], reverse=True)
  185. return scored[: payload.top_k]
  186. def _index_document(
  187. self,
  188. *,
  189. document: KnowledgeDocument,
  190. content_text: str,
  191. chunk_size: int | None,
  192. chunk_overlap: int | None) -> list[KnowledgeChunk]:
  193. chunk_payloads = build_chunk_payloads(
  194. content_text=content_text,
  195. chunk_size=chunk_size or self.settings.default_chunk_size,
  196. chunk_overlap=chunk_overlap or self.settings.default_chunk_overlap)
  197. for chunk_payload in chunk_payloads:
  198. content_text = self._read_chunk_content(chunk_payload)
  199. embedding_result = self.embedding_service.embed_text(content_text)
  200. chunk_payload["embedding_model"] = embedding_result.model
  201. chunk_payload["embedding_json"] = embedding_result.embedding
  202. chunk_payload["metadata_json"] = {
  203. "embedding_provider": embedding_result.provider,
  204. }
  205. return self.chunk_repository.replace_document_chunks(
  206. knowledge_base_id=document.knowledge_base_id,
  207. document_id=document.id,
  208. chunks=chunk_payloads)
  209. def _read_chunk_content(self, chunk_payload: dict[str, JSONValue]) -> str:
  210. value = chunk_payload.get("content_text")
  211. return value if isinstance(value, str) else ""
  212. def _matches_filters(
  213. self,
  214. *,
  215. document: KnowledgeDocument,
  216. filters_json: dict[str, JSONValue]) -> bool:
  217. source_type = filters_json.get("source_type")
  218. if isinstance(source_type, str) and document.source_type != source_type:
  219. return False
  220. status = filters_json.get("status")
  221. if isinstance(status, str) and document.status != status:
  222. return False
  223. return True