services.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. from core_shared import JSONValue
  2. from app.application.embeddings import EmbeddingService
  3. from app.application.retrieval import (
  4. build_chunk_payloads,
  5. cosine_similarity,
  6. keyword_score,
  7. stable_content_hash,
  8. )
  9. from app.bootstrap.settings import KnowledgeServiceSettings
  10. from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
  11. from app.domain.repositories import (
  12. KnowledgeBaseRepository,
  13. KnowledgeChunkRepository,
  14. KnowledgeDocumentRepository,
  15. )
  16. from app.schemas.knowledge import (
  17. KnowledgeBaseCreateRequest,
  18. KnowledgeBaseStatusUpdateRequest,
  19. KnowledgeDocumentCreateRequest,
  20. KnowledgeSearchRequest,
  21. )
  22. class KnowledgeApplicationService:
  23. def __init__(
  24. self,
  25. *,
  26. settings: KnowledgeServiceSettings,
  27. base_repository: KnowledgeBaseRepository,
  28. document_repository: KnowledgeDocumentRepository,
  29. chunk_repository: KnowledgeChunkRepository,
  30. ) -> None:
  31. self.settings = settings
  32. self.base_repository = base_repository
  33. self.document_repository = document_repository
  34. self.chunk_repository = chunk_repository
  35. self.embedding_service = EmbeddingService(settings=settings)
  36. def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
  37. return self.base_repository.create(
  38. tenant_id=payload.tenant_id,
  39. code=payload.code,
  40. name=payload.name,
  41. description=payload.description,
  42. metadata_json=payload.metadata_json,
  43. )
  44. def list_bases(self, *, tenant_id: str) -> list[KnowledgeBase]:
  45. return self.base_repository.list_by_tenant(tenant_id=tenant_id)
  46. def update_base_status(
  47. self,
  48. *,
  49. knowledge_base_id: str,
  50. payload: KnowledgeBaseStatusUpdateRequest,
  51. ) -> KnowledgeBase | None:
  52. return self.base_repository.update_status(
  53. tenant_id=payload.tenant_id,
  54. knowledge_base_id=knowledge_base_id,
  55. status=payload.status,
  56. )
  57. def create_document(
  58. self,
  59. payload: KnowledgeDocumentCreateRequest,
  60. ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
  61. knowledge_base = self.base_repository.get_by_id(
  62. tenant_id=payload.tenant_id,
  63. knowledge_base_id=payload.knowledge_base_id,
  64. )
  65. if knowledge_base is None:
  66. raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")
  67. document = self.document_repository.create(
  68. tenant_id=payload.tenant_id,
  69. knowledge_base_id=payload.knowledge_base_id,
  70. title=payload.title,
  71. source_type=payload.source_type,
  72. source_uri=payload.source_uri,
  73. content_text=payload.content_text,
  74. content_hash=stable_content_hash(payload.content_text),
  75. metadata_json=payload.metadata_json,
  76. )
  77. chunks = self._index_document(document=document, payload=payload)
  78. indexed_document = self.document_repository.update_status(
  79. document_id=document.id,
  80. status="indexed",
  81. )
  82. return indexed_document or document, chunks
  83. def list_documents(
  84. self,
  85. *,
  86. tenant_id: str,
  87. knowledge_base_id: str,
  88. ) -> list[KnowledgeDocument]:
  89. return self.document_repository.list_by_base(
  90. tenant_id=tenant_id,
  91. knowledge_base_id=knowledge_base_id,
  92. )
  93. def search(
  94. self,
  95. payload: KnowledgeSearchRequest,
  96. ) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]:
  97. chunks = self.chunk_repository.list_by_base(
  98. tenant_id=payload.tenant_id,
  99. knowledge_base_id=payload.knowledge_base_id,
  100. )
  101. document_cache: dict[str, KnowledgeDocument] = {}
  102. query_embedding_result = self.embedding_service.embed_text(payload.query)
  103. scored: list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]] = []
  104. for chunk in chunks:
  105. document = document_cache.get(chunk.document_id)
  106. if document is None:
  107. document = self.document_repository.get_by_id(
  108. tenant_id=payload.tenant_id,
  109. document_id=chunk.document_id,
  110. )
  111. if document is None:
  112. continue
  113. document_cache[chunk.document_id] = document
  114. if not self._matches_filters(document=document, filters_json=payload.filters_json):
  115. continue
  116. keyword = keyword_score(payload.query, chunk.content_text)
  117. vector = cosine_similarity(query_embedding_result.embedding, chunk.embedding_json)
  118. score = round(keyword * 0.7 + vector * 0.3, 6)
  119. scored.append(
  120. (
  121. chunk,
  122. document,
  123. score,
  124. {
  125. "keyword_score": round(keyword, 6),
  126. "vector_score": round(vector, 6),
  127. "retrieval_mode": "hybrid",
  128. "embedding_provider": query_embedding_result.provider,
  129. "embedding_model": query_embedding_result.model,
  130. },
  131. )
  132. )
  133. scored.sort(key=lambda item: item[2], reverse=True)
  134. return scored[: payload.top_k]
  135. def _index_document(
  136. self,
  137. *,
  138. document: KnowledgeDocument,
  139. payload: KnowledgeDocumentCreateRequest,
  140. ) -> list[KnowledgeChunk]:
  141. chunk_payloads = build_chunk_payloads(
  142. content_text=payload.content_text,
  143. chunk_size=payload.chunk_size or self.settings.default_chunk_size,
  144. chunk_overlap=payload.chunk_overlap or self.settings.default_chunk_overlap,
  145. )
  146. for chunk_payload in chunk_payloads:
  147. content_text = self._read_chunk_content(chunk_payload)
  148. embedding_result = self.embedding_service.embed_text(content_text)
  149. chunk_payload["embedding_model"] = embedding_result.model
  150. chunk_payload["embedding_json"] = embedding_result.embedding
  151. chunk_payload["metadata_json"] = {
  152. "embedding_provider": embedding_result.provider,
  153. }
  154. return self.chunk_repository.replace_document_chunks(
  155. tenant_id=document.tenant_id,
  156. knowledge_base_id=document.knowledge_base_id,
  157. document_id=document.id,
  158. chunks=chunk_payloads,
  159. )
  160. def _read_chunk_content(self, chunk_payload: dict[str, JSONValue]) -> str:
  161. value = chunk_payload.get("content_text")
  162. return value if isinstance(value, str) else ""
  163. def _matches_filters(
  164. self,
  165. *,
  166. document: KnowledgeDocument,
  167. filters_json: dict[str, JSONValue],
  168. ) -> bool:
  169. source_type = filters_json.get("source_type")
  170. if isinstance(source_type, str) and document.source_type != source_type:
  171. return False
  172. status = filters_json.get("status")
  173. if isinstance(status, str) and document.status != status:
  174. return False
  175. return True