services.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. from core_shared import JSONValue
  2. from app.application.document_parsers import (
  3. DocumentParseError,
  4. ParsedDocument,
  5. parse_document_content,
  6. )
  7. from app.application.embeddings import EmbeddingService
  8. from app.application.retrieval import (
  9. build_chunk_payloads,
  10. cosine_similarity,
  11. keyword_score,
  12. stable_content_hash,
  13. )
  14. from app.bootstrap.settings import KnowledgeServiceSettings
  15. from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
  16. from app.domain.repositories import (
  17. KnowledgeBaseRepository,
  18. KnowledgeChunkRepository,
  19. KnowledgeDocumentRepository,
  20. )
  21. from app.schemas.knowledge import (
  22. KnowledgeBaseCreateRequest,
  23. KnowledgeBaseStatusUpdateRequest,
  24. KnowledgeDocumentCreateRequest,
  25. KnowledgeDocumentParseRequest,
  26. KnowledgeSearchRequest,
  27. )
  class KnowledgeApplicationService:
      """Application service for knowledge bases, documents and retrieval.

      Orchestrates the knowledge base, document and chunk repositories together
      with document parsing (``parse_document_content``), chunking
      (``build_chunk_payloads``) and embedding (``EmbeddingService``).
      """

      # Relative weights of the keyword and vector similarity scores in the
      # hybrid ranking computed by ``search``.
      _KEYWORD_WEIGHT = 0.7
      _VECTOR_WEIGHT = 0.3
      # Number of vector-search candidates fetched per requested result, so the
      # keyword re-ranking and document filters in ``search`` have more than
      # ``top_k`` chunks to choose from.
      _VECTOR_CANDIDATE_MULTIPLIER = 5

      def __init__(
          self,
          *,
          settings: KnowledgeServiceSettings,
          base_repository: KnowledgeBaseRepository,
          document_repository: KnowledgeDocumentRepository,
          chunk_repository: KnowledgeChunkRepository,
      ) -> None:
          """Wire the repositories and build an ``EmbeddingService`` from ``settings``."""
          self.settings = settings
          self.base_repository = base_repository
          self.document_repository = document_repository
          self.chunk_repository = chunk_repository
          self.embedding_service = EmbeddingService(settings=settings)

      def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
          """Create a knowledge base for ``payload.tenant_id`` and return it."""
          return self.base_repository.create(
              tenant_id=payload.tenant_id,
              code=payload.code,
              name=payload.name,
              description=payload.description,
              metadata_json=payload.metadata_json,
          )

      def list_bases(self, *, tenant_id: str) -> list[KnowledgeBase]:
          """Return the knowledge bases belonging to ``tenant_id``."""
          return self.base_repository.list_by_tenant(tenant_id=tenant_id)

      def update_base_status(
          self,
          *,
          knowledge_base_id: str,
          payload: KnowledgeBaseStatusUpdateRequest,
      ) -> KnowledgeBase | None:
          """Set the status of a knowledge base.

          Returns whatever the repository returns: the updated base, or ``None``
          (presumably when no base with that id exists for the tenant).
          """
          return self.base_repository.update_status(
              tenant_id=payload.tenant_id,
              knowledge_base_id=knowledge_base_id,
              status=payload.status,
          )

      def create_document(
          self,
          payload: KnowledgeDocumentCreateRequest,
      ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
          """Parse, store, chunk and embed a new document, then mark it indexed.

          Returns:
              ``(document, chunks)``. The document is the one returned by the
              status update to ``"indexed"``, or the freshly created document if
              that update returns ``None``.

          Raises:
              ValueError: if the target knowledge base does not exist for the
                  tenant.
              DocumentParseError: if the content cannot be parsed.
          """
          knowledge_base = self.base_repository.get_by_id(
              tenant_id=payload.tenant_id,
              knowledge_base_id=payload.knowledge_base_id,
          )
          if knowledge_base is None:
              raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")
          parsed = self.parse_document(
              KnowledgeDocumentParseRequest(
                  source_type=payload.source_type,
                  source_uri=payload.source_uri,
                  content_text=payload.content_text,
                  content_base64=payload.content_base64,
              )
          )
          # Caller metadata is kept as-is; the parser's metadata is nested under
          # its own key so it cannot clobber caller-supplied fields.
          metadata_json = {
              **payload.metadata_json,
              "parser_metadata": parsed.metadata_json,
          }
          document = self.document_repository.create(
              tenant_id=payload.tenant_id,
              knowledge_base_id=payload.knowledge_base_id,
              title=payload.title,
              # The parser may normalize the declared source type.
              source_type=parsed.source_type,
              source_uri=payload.source_uri,
              content_text=parsed.content_text,
              content_hash=stable_content_hash(parsed.content_text),
              metadata_json=metadata_json,
          )
          chunks = self._index_document(
              document=document,
              content_text=parsed.content_text,
              chunk_size=payload.chunk_size,
              chunk_overlap=payload.chunk_overlap,
          )
          indexed_document = self.document_repository.update_status(
              document_id=document.id,
              status="indexed",
          )
          return indexed_document or document, chunks

      def parse_document(self, payload: KnowledgeDocumentParseRequest) -> ParsedDocument:
          """Parse raw document content (text or base64) into a ``ParsedDocument``.

          Raises:
              DocumentParseError: propagated unchanged from
                  ``parse_document_content`` when the content cannot be parsed.
          """
          return parse_document_content(
              source_type=payload.source_type,
              content_text=payload.content_text,
              content_base64=payload.content_base64,
              source_uri=payload.source_uri,
          )

      def list_documents(
          self,
          *,
          tenant_id: str,
          knowledge_base_id: str,
      ) -> list[KnowledgeDocument]:
          """Return the documents of one knowledge base for ``tenant_id``."""
          return self.document_repository.list_by_base(
              tenant_id=tenant_id,
              knowledge_base_id=knowledge_base_id,
          )

      def search(
          self,
          payload: KnowledgeSearchRequest,
      ) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]:
          """Rank chunks of a knowledge base against ``payload.query``.

          Candidates come from the repository's vector search, capped at
          ``top_k * _VECTOR_CANDIDATE_MULTIPLIER`` chunks. If that returns
          nothing, every chunk of the knowledge base is scored instead.

          A candidate is scored only if its document exists and passes
          ``payload.filters_json``. The score is a weighted sum of keyword
          similarity and vector similarity.

          Returns:
              At most ``payload.top_k`` tuples of
              ``(chunk, document, score, score_details)``, ordered by descending
              score. ``score_details`` records the component scores, the
              retrieval mode and the embedding provider/model used for the query.
          """
          query_embedding_result = self.embedding_service.embed_text(payload.query)
          vector_candidates = self.chunk_repository.search_by_vector(
              tenant_id=payload.tenant_id,
              knowledge_base_id=payload.knowledge_base_id,
              embedding=query_embedding_result.embedding,
              limit=max(payload.top_k * self._VECTOR_CANDIDATE_MULTIPLIER, payload.top_k),
          )
          if vector_candidates:
              chunks = [chunk for chunk, _ in vector_candidates]
              vector_scores_by_chunk_id = {
                  chunk.id: score for chunk, score in vector_candidates
              }
              retrieval_mode = "pgvector-hybrid"
          else:
              # No vector-search results: score every chunk of the base in Python.
              chunks = self.chunk_repository.list_by_base(
                  tenant_id=payload.tenant_id,
                  knowledge_base_id=payload.knowledge_base_id,
              )
              vector_scores_by_chunk_id = {}
              retrieval_mode = "hybrid"

          # Cache lookups per document id, including misses (None), so each
          # document is fetched at most once however many of its chunks appear.
          document_cache: dict[str, KnowledgeDocument | None] = {}
          scored: list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]] = []
          for chunk in chunks:
              if chunk.document_id in document_cache:
                  document = document_cache[chunk.document_id]
              else:
                  document = self.document_repository.get_by_id(
                      tenant_id=payload.tenant_id,
                      document_id=chunk.document_id,
                  )
                  document_cache[chunk.document_id] = document
              if document is None:
                  continue
              if not self._matches_filters(document=document, filters_json=payload.filters_json):
                  continue
              keyword = keyword_score(payload.query, chunk.content_text)
              vector = vector_scores_by_chunk_id.get(chunk.id)
              if vector is None:
                  # No repository-provided score for this chunk: compute cosine
                  # similarity from the stored embedding instead.
                  vector = cosine_similarity(query_embedding_result.embedding, chunk.embedding_json)
              score = round(keyword * self._KEYWORD_WEIGHT + vector * self._VECTOR_WEIGHT, 6)
              scored.append(
                  (
                      chunk,
                      document,
                      score,
                      {
                          "keyword_score": round(keyword, 6),
                          "vector_score": round(vector, 6),
                          "retrieval_mode": retrieval_mode,
                          "embedding_provider": query_embedding_result.provider,
                          "embedding_model": query_embedding_result.model,
                      },
                  )
              )
          scored.sort(key=lambda item: item[2], reverse=True)
          return scored[: payload.top_k]

      def _index_document(
          self,
          *,
          document: KnowledgeDocument,
          content_text: str,
          chunk_size: int | None,
          chunk_overlap: int | None,
      ) -> list[KnowledgeChunk]:
          """Chunk and embed ``content_text``, replacing the document's chunks.

          Args:
              document: Document whose chunks are replaced.
              content_text: Parsed document text to split into chunks.
              chunk_size: Chunk size; falls back to the settings default when
                  ``None`` or 0.
              chunk_overlap: Chunk overlap; falls back to the settings default
                  only when ``None``. An explicit 0 means "no overlap".

          Returns:
              The chunks returned by ``replace_document_chunks``.
          """
          # A chunk size of 0 cannot produce useful chunks, so it is treated
          # like "not provided".
          effective_chunk_size = chunk_size or self.settings.default_chunk_size
          # Overlap 0 is a legitimate explicit value; ``or`` would have silently
          # replaced it with the default, so fall back only when omitted.
          effective_chunk_overlap = (
              self.settings.default_chunk_overlap if chunk_overlap is None else chunk_overlap
          )
          chunk_payloads = build_chunk_payloads(
              content_text=content_text,
              chunk_size=effective_chunk_size,
              chunk_overlap=effective_chunk_overlap,
          )
          for chunk_payload in chunk_payloads:
              chunk_text = self._read_chunk_content(chunk_payload)
              embedding_result = self.embedding_service.embed_text(chunk_text)
              chunk_payload["embedding_model"] = embedding_result.model
              chunk_payload["embedding_json"] = embedding_result.embedding
              # Merge rather than overwrite, so any metadata the chunker may have
              # attached to the payload is preserved.
              existing_metadata = chunk_payload.get("metadata_json")
              chunk_payload["metadata_json"] = {
                  **(existing_metadata if isinstance(existing_metadata, dict) else {}),
                  "embedding_provider": embedding_result.provider,
              }
          return self.chunk_repository.replace_document_chunks(
              tenant_id=document.tenant_id,
              knowledge_base_id=document.knowledge_base_id,
              document_id=document.id,
              chunks=chunk_payloads,
          )

      def _read_chunk_content(self, chunk_payload: dict[str, JSONValue]) -> str:
          """Return the payload's ``content_text`` if it is a string, else ``""``."""
          value = chunk_payload.get("content_text")
          return value if isinstance(value, str) else ""

      def _matches_filters(
          self,
          *,
          document: KnowledgeDocument,
          filters_json: dict[str, JSONValue],
      ) -> bool:
          """Return whether ``document`` satisfies the supported search filters.

          Only string-valued ``source_type`` and ``status`` filters are applied;
          any other keys, or non-string values, are ignored.
          """
          source_type = filters_json.get("source_type")
          if isinstance(source_type, str) and document.source_type != source_type:
              return False
          status = filters_json.get("status")
          if isinstance(status, str) and document.status != status:
              return False
          return True