crud_service.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. """Knowledge CRUD sub-service — bases, documents, chunks, content reading."""
  2. from __future__ import annotations
  3. import base64
  4. from typing import TYPE_CHECKING
  5. from core_shared import JSONValue
  6. from app.application._storage_mixin import _ObjectStorageMixin
  7. from app.application.document_parsers import (
  8. DocumentParseError,
  9. ParsedDocument,
  10. parse_document_content)
  11. from app.bootstrap.settings import KnowledgeServiceSettings
  12. from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
  13. from app.schemas.knowledge import (
  14. KnowledgeBaseCreateRequest,
  15. KnowledgeBaseCreateRequestDto,
  16. KnowledgeBaseStatusUpdateRequest,
  17. KnowledgeBaseUpdateRequestDto,
  18. KnowledgeDocumentCreateRequestDto,
  19. KnowledgeDocumentParseRequest,
  20. KnowledgeDocumentUpdateRequestDto)
  21. if TYPE_CHECKING:
  22. from app.domain.repositories import (
  23. KnowledgeBaseRepository,
  24. KnowledgeChunkRepository,
  25. KnowledgeDocumentRepository)
  26. from app.infrastructure.object_storage import KnowledgeObjectStorage
  27. class KnowledgeCrudService(_ObjectStorageMixin):
  def __init__(
      self,
      *,
      settings: KnowledgeServiceSettings,
      base_repository: KnowledgeBaseRepository,
      document_repository: KnowledgeDocumentRepository,
      chunk_repository: KnowledgeChunkRepository,
      object_storage: KnowledgeObjectStorage | None = None,
  ) -> None:
      """Wire the service to its settings, repositories and object storage.

      Args:
          settings: Service-level configuration.
          base_repository: Persistence for knowledge bases.
          document_repository: Persistence for knowledge documents.
          chunk_repository: Persistence for knowledge chunks.
          object_storage: Optional storage backend (``None`` by default);
              kept on the private attribute ``_object_storage``.
      """
      self.base_repository = base_repository
      self.document_repository = document_repository
      self.chunk_repository = chunk_repository
      self.settings = settings
      self._object_storage = object_storage
  42. # ── Knowledge Base CRUD ──────────────────────────────────────────
  def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
      """Persist a new knowledge base from an internal create request.

      Args:
          payload: Request supplying ``code``, ``name``, ``description`` and
              ``metadata_json``, each forwarded unchanged.

      Returns:
          The value returned by ``base_repository.create``.
      """
      repository = self.base_repository
      fields = {
          "code": payload.code,
          "name": payload.name,
          "description": payload.description,
          "metadata_json": payload.metadata_json,
      }
      return repository.create(**fields)
  def create_base_from_contract(
      self,
      payload: KnowledgeBaseCreateRequestDto,
  ) -> KnowledgeBase:
      """Create a knowledge base from the external contract DTO.

      A base code is derived from ``payload.name`` via ``_build_base_code``,
      and ``payload.metadata`` is mapped onto ``metadata_json``; the
      resulting internal request is handed to ``create_base``.
      """
      request = KnowledgeBaseCreateRequest(
          code=self._build_base_code(payload.name),
          name=payload.name,
          description=payload.description,
          metadata_json=payload.metadata,
      )
      return self.create_base(request)
  def list_bases(self) -> list[KnowledgeBase]:
      """Return every knowledge base reported by ``base_repository.list_all``."""
      repository = self.base_repository
      return repository.list_all()
  def list_bases_filtered(
      self,
      *,
      keyword: str | None = None,
      status: str | None = None,
  ) -> list[KnowledgeBase]:
      """List knowledge bases matching optional keyword/status filters.

      Both filters, including ``None`` values, are forwarded verbatim to
      ``base_repository.list_filtered``.
      """
      filters = {"keyword": keyword, "status": status}
      return self.base_repository.list_filtered(**filters)
  def update_base_from_contract(
      self,
      payload: KnowledgeBaseUpdateRequestDto,
  ) -> KnowledgeBase | None:
      """Apply a contract update DTO to an existing knowledge base.

      ``payload.knowledgeBaseId`` selects the base and ``payload.metadata``
      maps onto ``metadata_json``; all values are forwarded as-is.

      Returns:
          The result of ``base_repository.update`` (``None`` presumably when
          no base matches the id — not verifiable from this file).
      """
      changes = {
          "knowledge_base_id": payload.knowledgeBaseId,
          "name": payload.name,
          "description": payload.description,
          "status": payload.status,
          "metadata_json": payload.metadata,
      }
      return self.base_repository.update(**changes)
  def delete_base(self, *, knowledge_base_id: str) -> bool:
      """Delete a knowledge base together with its documents and chunks.

      Teardown order: the stored object of every document in the base, then
      all chunks of the base, then each document row, and finally the base.

      Args:
          knowledge_base_id: Identifier of the base to remove.

      Returns:
          The result of ``base_repository.delete`` for the base itself.
      """
      owned_documents = self.document_repository.list_by_base(
          knowledge_base_id=knowledge_base_id,
      )
      for owned in owned_documents:
          self._delete_document_object(document=owned)
      self.chunk_repository.delete_by_base(knowledge_base_id=knowledge_base_id)
      for owned in owned_documents:
          self.document_repository.delete(document_id=owned.id)
      return self.base_repository.delete(knowledge_base_id=knowledge_base_id)
  def update_base_status(
      self,
      *,
      knowledge_base_id: str,
      payload: KnowledgeBaseStatusUpdateRequest,
  ) -> KnowledgeBase | None:
      """Set the status of one knowledge base.

      Returns:
          The result of ``base_repository.update_status``.
      """
      new_status = payload.status
      return self.base_repository.update_status(
          knowledge_base_id=knowledge_base_id,
          status=new_status,
      )
  94. # ── Document CRUD ────────────────────────────────────────────────
  def list_documents(
      self,
      *,
      knowledge_base_id: str,
  ) -> list[KnowledgeDocument]:
      """Return all documents belonging to the given knowledge base."""
      repository = self.document_repository
      return repository.list_by_base(knowledge_base_id=knowledge_base_id)
  def list_documents_filtered(
      self,
      *,
      knowledge_base_id: str | None = None,
      keyword: str | None = None,
      status: str | None = None,
      source_type: str | None = None,
  ) -> list[KnowledgeDocument]:
      """List documents narrowed by any combination of optional filters.

      Every filter, including ``None`` values, is forwarded verbatim to
      ``document_repository.list_filtered``.
      """
      criteria = {
          "knowledge_base_id": knowledge_base_id,
          "keyword": keyword,
          "status": status,
          "source_type": source_type,
      }
      return self.document_repository.list_filtered(**criteria)
  def update_document_from_contract(
      self,
      payload: KnowledgeDocumentUpdateRequestDto,
  ) -> KnowledgeDocument | None:
      """Apply a contract update DTO to an existing document.

      ``payload.documentId`` selects the document; ``sourceUri`` and
      ``metadata`` map onto ``source_uri`` and ``metadata_json``.

      Returns:
          The result of ``document_repository.update``.
      """
      changes = {
          "document_id": payload.documentId,
          "title": payload.title,
          "source_uri": payload.sourceUri,
          "status": payload.status,
          "metadata_json": payload.metadata,
      }
      return self.document_repository.update(**changes)
  def delete_document(self, *, document_id: str) -> bool:
      """Delete a document and report only whether its row was removed.

      Thin wrapper over ``delete_document_result`` that drops the
      object-storage details and coerces ``"deleted"`` to ``bool``.
      """
      outcome = self.delete_document_result(document_id=document_id)
      return bool(outcome["deleted"])
  def delete_document_result(self, *, document_id: str) -> dict[str, JSONValue]:
      """Delete a document, its chunks and its stored object; report the outcome.

      Args:
          document_id: Identifier of the document to delete.

      Returns:
          A payload with ``deleted`` (whether the document row was removed),
          ``objectDeleted`` (the value returned by ``_delete_document_object``)
          and ``documentId``. When the document does not exist, nothing is
          deleted and both flags are ``False``.
      """
      document = self.document_repository.get_by_id(document_id=document_id)
      if document is None:
          return {
              "deleted": False,
              "objectDeleted": False,
              "documentId": document_id,
          }
      object_deleted = self._delete_document_object(document=document)
      self.chunk_repository.delete_by_document(document_id=document_id)
      # Normalise with bool() instead of ``is not None``: the sibling base and
      # chunk repositories report deletion as a bool, and ``False is not None``
      # would misreport a failed delete as success. bool() still treats a
      # returned entity as success and ``None`` as failure.
      deleted = bool(self.document_repository.delete(document_id=document_id))
      return {
          "deleted": deleted,
          "objectDeleted": object_deleted,
          "documentId": document_id,
      }
  def read_document_content(
      self,
      *,
      document_id: str,
      include_text: bool = True,
      include_base64: bool = False,
  ) -> dict[str, JSONValue] | None:
      """Read a document's stored bytes and describe them.

      Args:
          document_id: Identifier of the document to read.
          include_text: Also decode the bytes as UTF-8 (undecodable
              sequences replaced) into ``contentText``, but only when
              ``_is_text_content_type`` accepts the content/source type.
          include_base64: Also return the raw bytes base64-encoded (ASCII)
              in ``contentBase64``.

      Returns:
          ``None`` when the document does not exist; otherwise a payload with
          ``documentId``, ``title``, ``sourceType``, ``contentType``,
          ``sizeBytes`` (length of the raw bytes), ``objectStorage`` metadata,
          and ``contentBase64``/``contentText`` (``None`` unless requested
          and applicable).
      """
      document = self.document_repository.get_by_id(document_id=document_id)
      if document is None:
          return None
      raw_content = self._read_document_raw_content(document=document)
      # Reuse the already-loaded document instead of going through
      # read_document_storage_status(), which would fetch the same row again
      # (and could see None if the row were deleted in between).
      object_status = self._storage_status_for_document(document=document)
      content_type = self._read_content_type_from_status(object_status)
      payload: dict[str, JSONValue] = {
          "documentId": document.id,
          "title": document.title,
          "sourceType": document.source_type,
          "contentType": content_type,
          "sizeBytes": len(raw_content),
          "objectStorage": self._read_object_storage_metadata(document=document),
          "contentBase64": None,
          "contentText": None,
      }
      if include_base64:
          payload["contentBase64"] = base64.b64encode(raw_content).decode("ascii")
      if include_text and self._is_text_content_type(
          content_type=content_type,
          source_type=document.source_type,
      ):
          payload["contentText"] = raw_content.decode("utf-8", errors="replace")
      return payload

  def read_document_storage_status(self, *, document_id: str) -> dict[str, JSONValue] | None:
      """Look up a document and report the state of its stored object.

      Returns:
          ``None`` when the document does not exist; otherwise the payload
          built by ``_storage_status_for_document``.
      """
      document = self.document_repository.get_by_id(document_id=document_id)
      if document is None:
          return None
      return self._storage_status_for_document(document=document)

  def _storage_status_for_document(self, *, document: KnowledgeDocument) -> dict[str, JSONValue]:
      """Build the object-storage status payload for an already-loaded document.

      When ``_read_document_object_key`` yields no key, returns an
      ``exists=False`` payload with an error message without contacting
      storage; otherwise checks the object via ``object_storage.head_object``
      and converts the result with ``_object_status_to_payload``.
      """
      object_key = self._read_document_object_key(document=document)
      if object_key is None:
          return {
              "documentId": document.id,
              "exists": False,
              "objectStorage": None,
              "errorMessage": "document object metadata is missing",
          }
      status = self.object_storage.head_object(object_key=object_key)
      return self._object_status_to_payload(document=document, status=status)
  def read_storage_health(self) -> dict[str, JSONValue]:
      """Return the object-storage health check result as a fresh dict copy."""
      report = self.object_storage.health_check()
      return dict(report)
  186. # ── Chunk CRUD ───────────────────────────────────────────────────
  def list_chunks_filtered(
      self,
      *,
      knowledge_base_id: str | None = None,
      document_id: str | None = None,
      keyword: str | None = None,
  ) -> list[KnowledgeChunk]:
      """List chunks narrowed by optional base, document and keyword filters.

      All filters, including ``None`` values, are forwarded verbatim to
      ``chunk_repository.list_filtered``.
      """
      criteria = {
          "knowledge_base_id": knowledge_base_id,
          "document_id": document_id,
          "keyword": keyword,
      }
      return self.chunk_repository.list_filtered(**criteria)
  def delete_chunk(self, *, chunk_id: str) -> bool:
      """Delete one chunk by id and return ``chunk_repository.delete``'s result."""
      repository = self.chunk_repository
      return repository.delete(chunk_id=chunk_id)
  200. # ── Parse ────────────────────────────────────────────────────────
  def parse_document(self, payload: KnowledgeDocumentParseRequest) -> ParsedDocument:
      """Parse inline document content; no repository is touched.

      Args:
          payload: Parse request whose ``source_type``, ``content_text``,
              ``content_base64`` and ``source_uri`` are forwarded unchanged
              to ``parse_document_content``.

      Returns:
          The ``ParsedDocument`` produced by ``parse_document_content``.

      Raises:
          DocumentParseError: Propagated unchanged from
              ``parse_document_content`` when the content cannot be parsed.
      """
      return parse_document_content(
          source_type=payload.source_type,
          content_text=payload.content_text,
          content_base64=payload.content_base64,
          source_uri=payload.source_uri,
      )
  210. # ── Private helpers ──────────────────────────────────────────────
  def _build_base_code(self, name: str) -> str:
      """Derive a knowledge-base code from a human-readable name.

      Alphanumeric characters are lower-cased and every other character
      becomes ``_``. Leading/trailing underscores are stripped, and an empty
      result falls back to ``"knowledge_base"``. The code is then cut to 64
      characters.

      Args:
          name: Display name to derive the code from.

      Returns:
          A non-empty code of at most 64 characters that neither starts nor
          ends with ``_``.
      """
      slug = "".join(
          char.lower() if char.isalnum() else "_"
          for char in name
      ).strip("_") or "knowledge_base"
      # Strip again after truncating: the 64-character cut can land right
      # after a separator and reintroduce the trailing underscore removed
      # above. ``slug`` never starts with "_", so this cannot empty it.
      return slug[:64].rstrip("_")