services.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. """Knowledge application service — thin facade delegating to sub-services."""
  2. from __future__ import annotations
  3. from typing import TYPE_CHECKING
  4. from core_shared import JSONValue, try_build_redis_client
  5. from core_shared.task_queue import TaskQueuePublisher
  6. from app.application.crud_service import KnowledgeCrudService
  7. from app.application.embeddings import EmbeddingService
  8. from app.application.indexing_service import (
  9. KnowledgeDocumentIngestResult,
  10. KnowledgeIndexingError,
  11. KnowledgeIndexingService,
  12. )
  13. from app.application.search_service import KnowledgeSearchService
  14. from app.application.settings_service import KnowledgeSettingsService
  15. from app.bootstrap.settings import KnowledgeServiceSettings
  16. from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
  17. from app.schemas.knowledge import (
  18. KnowledgeBaseCreateRequest,
  19. KnowledgeBaseCreateRequestDto,
  20. KnowledgeBaseReindexRequestDto,
  21. KnowledgeBaseStatusUpdateRequest,
  22. KnowledgeBaseUpdateRequestDto,
  23. KnowledgeDocumentCreateRequest,
  24. KnowledgeDocumentCreateRequestDto,
  25. KnowledgeDocumentParseRequest,
  26. KnowledgeDocumentReindexRequestDto,
  27. KnowledgeDocumentUpdateRequestDto,
  28. KnowledgeIndexJobData,
  29. KnowledgeSearchRequest,
  30. KnowledgeSettingsDto,
  31. KnowledgeSettingsUpdateRequestDto,
  32. )
  33. if TYPE_CHECKING:
  34. from redis import Redis
  35. from app.domain.repositories import (
  36. KnowledgeBaseRepository,
  37. KnowledgeChunkRepository,
  38. KnowledgeDocumentRepository,
  39. )
  40. from app.infrastructure.object_storage import KnowledgeObjectStorage
  41. class KnowledgeApplicationService:
  42. """Facade composing CRUD, Indexing, Search, and Settings sub-services."""
  43. def __init__(
  44. self,
  45. *,
  46. settings: KnowledgeServiceSettings,
  47. base_repository: KnowledgeBaseRepository,
  48. document_repository: KnowledgeDocumentRepository,
  49. chunk_repository: KnowledgeChunkRepository,
  50. object_storage: KnowledgeObjectStorage | None = None,
  51. redis_client: Redis | None = None,
  52. task_queue_publisher: TaskQueuePublisher | None = None,
  53. ) -> None:
  54. self.settings = settings
  55. self.base_repository = base_repository
  56. self.document_repository = document_repository
  57. self.chunk_repository = chunk_repository
  58. self.embedding_service = EmbeddingService(settings=settings)
  59. self._object_storage = object_storage
  60. self.redis_client = redis_client
  61. self.task_queue_publisher = task_queue_publisher
  62. self.crud_service = KnowledgeCrudService(
  63. settings=settings,
  64. base_repository=base_repository,
  65. document_repository=document_repository,
  66. chunk_repository=chunk_repository,
  67. object_storage=object_storage,
  68. )
  69. self.indexing_service = KnowledgeIndexingService(
  70. settings=settings,
  71. base_repository=base_repository,
  72. document_repository=document_repository,
  73. chunk_repository=chunk_repository,
  74. embedding_service=self.embedding_service,
  75. object_storage=object_storage,
  76. redis_client=redis_client,
  77. task_queue_publisher=task_queue_publisher,
  78. )
  79. self.search_service = KnowledgeSearchService(
  80. settings=settings,
  81. base_repository=base_repository,
  82. document_repository=document_repository,
  83. chunk_repository=chunk_repository,
  84. embedding_service=self.embedding_service,
  85. )
  86. self.settings_service = KnowledgeSettingsService(
  87. settings=settings,
  88. base_repository=base_repository,
  89. )
  90. @property
  91. def object_storage(self) -> KnowledgeObjectStorage:
  92. return self.crud_service.object_storage
  93. # ── Knowledge Base ────────────────────────────────────────────────
  94. def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
  95. return self.crud_service.create_base(payload)
  96. def create_base_from_contract(self, payload: KnowledgeBaseCreateRequestDto) -> KnowledgeBase:
  97. return self.crud_service.create_base_from_contract(payload)
  98. def list_bases(self) -> list[KnowledgeBase]:
  99. return self.crud_service.list_bases()
  100. def list_bases_filtered(self, *, keyword: str | None = None, status: str | None = None) -> list[KnowledgeBase]:
  101. return self.crud_service.list_bases_filtered(keyword=keyword, status=status)
  102. def update_base_from_contract(self, payload: KnowledgeBaseUpdateRequestDto) -> KnowledgeBase | None:
  103. return self.crud_service.update_base_from_contract(payload)
  104. def delete_base(self, *, knowledge_base_id: str) -> bool:
  105. return self.crud_service.delete_base(knowledge_base_id=knowledge_base_id)
  106. def update_base_status(self, *, knowledge_base_id: str, payload: KnowledgeBaseStatusUpdateRequest) -> KnowledgeBase | None:
  107. return self.crud_service.update_base_status(knowledge_base_id=knowledge_base_id, payload=payload)
  108. # ── Document CRUD ─────────────────────────────────────────────────
  109. def list_documents(self, *, knowledge_base_id: str) -> list[KnowledgeDocument]:
  110. return self.crud_service.list_documents(knowledge_base_id=knowledge_base_id)
  111. def list_documents_filtered(self, *, knowledge_base_id: str | None = None, keyword: str | None = None, status: str | None = None, source_type: str | None = None) -> list[KnowledgeDocument]:
  112. return self.crud_service.list_documents_filtered(
  113. knowledge_base_id=knowledge_base_id, keyword=keyword,
  114. status=status, source_type=source_type)
  115. def update_document_from_contract(self, payload: KnowledgeDocumentUpdateRequestDto) -> KnowledgeDocument | None:
  116. return self.crud_service.update_document_from_contract(payload)
  117. def delete_document(self, *, document_id: str) -> bool:
  118. return self.crud_service.delete_document(document_id=document_id)
  119. def delete_document_result(self, *, document_id: str) -> dict[str, JSONValue]:
  120. return self.crud_service.delete_document_result(document_id=document_id)
  121. def read_document_content(self, *, document_id: str, include_text: bool = True, include_base64: bool = False) -> dict[str, JSONValue] | None:
  122. return self.crud_service.read_document_content(
  123. document_id=document_id, include_text=include_text, include_base64=include_base64)
  124. def read_document_storage_status(self, *, document_id: str) -> dict[str, JSONValue] | None:
  125. return self.crud_service.read_document_storage_status(document_id=document_id)
  126. def read_storage_health(self) -> dict[str, JSONValue]:
  127. return self.crud_service.read_storage_health()
  128. # ── Chunk CRUD ────────────────────────────────────────────────────
  129. def list_chunks_filtered(self, *, knowledge_base_id: str | None = None, document_id: str | None = None, keyword: str | None = None) -> list[KnowledgeChunk]:
  130. return self.crud_service.list_chunks_filtered(
  131. knowledge_base_id=knowledge_base_id, document_id=document_id, keyword=keyword)
  132. def delete_chunk(self, *, chunk_id: str) -> bool:
  133. return self.crud_service.delete_chunk(chunk_id=chunk_id)
  134. # ── Parse ─────────────────────────────────────────────────────────
  135. def parse_document(self, payload: KnowledgeDocumentParseRequest):
  136. return self.crud_service.parse_document(payload)
  137. # ── Indexing ──────────────────────────────────────────────────────
  138. def create_document(self, payload: KnowledgeDocumentCreateRequest) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
  139. return self.indexing_service.create_document(payload)
  140. def create_document_from_contract(self, payload: KnowledgeDocumentCreateRequestDto) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
  141. return self.indexing_service.create_document_from_contract(payload)
  142. def create_document_from_contract_result(self, payload: KnowledgeDocumentCreateRequestDto) -> KnowledgeDocumentIngestResult:
  143. return self.indexing_service.create_document_from_contract_result(payload)
  144. def create_document_index_job(self, payload: KnowledgeDocumentCreateRequest) -> KnowledgeDocumentIngestResult:
  145. return self.indexing_service.create_document_index_job(payload)
  146. def queue_document_indexing(self, *, document: KnowledgeDocument, action: str, chunk_size: int | None = None, chunk_overlap: int | None = None) -> tuple[KnowledgeDocument, KnowledgeIndexJobData]:
  147. return self.indexing_service.queue_document_indexing(
  148. document=document, action=action, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
  149. def process_document_index_job(self, *, document_id: str, action: str, worker_key: str, job_id: str | None = None, chunk_size: int | None = None, chunk_overlap: int | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
  150. return self.indexing_service.process_document_index_job(
  151. document_id=document_id, action=action, worker_key=worker_key,
  152. job_id=job_id, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
  153. def execute_document_index_job(self, *, document_id: str, action: str, worker_key: str, lease_seconds: int, job_id: str | None = None, redis_client: Redis | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
  154. return self.indexing_service.execute_document_index_job(
  155. document_id=document_id, action=action, worker_key=worker_key,
  156. lease_seconds=lease_seconds, job_id=job_id, redis_client=redis_client)
  157. def execute_next_pending_document_job(self, *, worker_key: str, lease_seconds: int, stale_indexing_seconds: int, redis_client: Redis | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
  158. return self.indexing_service.execute_next_pending_document_job(
  159. worker_key=worker_key, lease_seconds=lease_seconds,
  160. stale_indexing_seconds=stale_indexing_seconds, redis_client=redis_client)
  161. def list_index_jobs(self, *, knowledge_base_id: str | None = None, document_id: str | None = None, status: str | None = None) -> list[KnowledgeIndexJobData]:
  162. return self.indexing_service.list_index_jobs(
  163. knowledge_base_id=knowledge_base_id, document_id=document_id, status=status)
  164. def detail_index_job(self, *, document_id: str) -> KnowledgeIndexJobData | None:
  165. return self.indexing_service.detail_index_job(document_id=document_id)
  166. def reindex_document(self, payload: KnowledgeDocumentReindexRequestDto) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
  167. return self.indexing_service.reindex_document(payload)
  168. def reindex_document_from_contract_result(self, payload: KnowledgeDocumentReindexRequestDto) -> KnowledgeDocumentIngestResult | None:
  169. return self.indexing_service.reindex_document_from_contract_result(payload)
  170. def reindex_base_from_contract(self, payload: KnowledgeBaseReindexRequestDto) -> list[KnowledgeIndexJobData]:
  171. return self.indexing_service.reindex_base_from_contract(payload)
  172. # ── Search ────────────────────────────────────────────────────────
  173. def search(self, payload: KnowledgeSearchRequest) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]:
  174. return self.search_service.search(payload)
  175. # ── Settings ──────────────────────────────────────────────────────
  176. def read_settings(self, *, knowledge_base_id: str | None = None) -> KnowledgeSettingsDto:
  177. return self.settings_service.read_settings(knowledge_base_id=knowledge_base_id)
  178. def update_settings(self, payload: KnowledgeSettingsUpdateRequestDto) -> KnowledgeSettingsDto:
  179. return self.settings_service.update_settings(payload)
  180. def build_knowledge_application_service(
  181. *,
  182. db,
  183. settings: KnowledgeServiceSettings,
  184. ) -> KnowledgeApplicationService:
  185. from app.domain.repositories import (
  186. KnowledgeBaseRepository,
  187. KnowledgeChunkRepository,
  188. KnowledgeDocumentRepository,
  189. )
  190. redis_client = try_build_redis_client(settings.redis_url)
  191. return KnowledgeApplicationService(
  192. settings=settings,
  193. base_repository=KnowledgeBaseRepository(db),
  194. document_repository=KnowledgeDocumentRepository(db),
  195. chunk_repository=KnowledgeChunkRepository(db),
  196. redis_client=redis_client,
  197. task_queue_publisher=(
  198. TaskQueuePublisher(client=redis_client) if redis_client is not None else None
  199. ))