services.py 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237
  1. from __future__ import annotations
  2. import base64
  3. import hashlib
  4. from dataclasses import dataclass
  5. from datetime import datetime, timedelta
  6. from typing import TYPE_CHECKING, cast
  7. from uuid import uuid4
  8. from sqlalchemy.orm import Session
  9. from core_shared import JSONValue, try_build_redis_client
  10. from core_shared.task_queue import KNOWLEDGE_DOCUMENT_QUEUE, TaskQueuePublisher
  11. from app.application.document_parsers import (
  12. DocumentParseError,
  13. ParsedDocument,
  14. normalize_source_type,
  15. parse_document_content,
  16. read_document_content_bytes)
  17. from app.application.chunking import chunk_document
  18. from app.application.embeddings import EmbeddingService
  19. from app.application.retrieval import (
  20. bm25_score,
  21. build_chunk_payloads,
  22. compute_bm25_stats,
  23. cosine_similarity,
  24. keyword_score,
  25. rerank_score,
  26. stable_content_hash)
  27. from app.bootstrap.settings import KnowledgeServiceSettings
  28. from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
  29. from app.domain.repositories import (
  30. KnowledgeBaseRepository,
  31. KnowledgeChunkRepository,
  32. KnowledgeDocumentRepository)
  33. from app.infrastructure.object_storage import (
  34. KnowledgeObjectStorage,
  35. ObjectStorageError,
  36. ObjectStorageNotFoundError,
  37. ObjectStorageStatus,
  38. build_document_object_key,
  39. build_object_storage)
  40. from app.schemas.knowledge import (
  41. KnowledgeBaseCreateRequest,
  42. KnowledgeBaseCreateRequestDto,
  43. KnowledgeBaseStatusUpdateRequest,
  44. KnowledgeBaseUpdateRequestDto,
  45. KnowledgeBaseReindexRequestDto,
  46. KnowledgeDocumentCreateRequest,
  47. KnowledgeDocumentCreateRequestDto,
  48. KnowledgeIndexJobAction,
  49. KnowledgeIndexJobData,
  50. KnowledgeIndexJobStatus,
  51. KnowledgeDocumentParseRequest,
  52. KnowledgeDocumentReindexRequestDto,
  53. KnowledgeDocumentUpdateRequestDto,
  54. KnowledgeSettingsDto,
  55. KnowledgeSettingsUpdateRequestDto,
  56. KnowledgeSearchRequest)
  57. if TYPE_CHECKING:
  58. from redis import Redis
@dataclass(frozen=True, slots=True)
class KnowledgeDocumentIngestResult:
    """Immutable result of a document ingest or re-index request.

    Returned by the ``*_result`` entry points of the application service so
    that synchronous and queued outcomes share one shape.
    """

    # The document the operation acted on.
    document: KnowledgeDocument
    # Chunks produced by the operation; the queued paths in this file pass [].
    chunks: list[KnowledgeChunk]
    # Set to True by the code paths that go through queue_document_indexing.
    queued: bool = False
    # Index-job descriptor for queued operations; None when not provided.
    job: KnowledgeIndexJobData | None = None
  65. class KnowledgeIndexingError(RuntimeError):
  66. def __init__(self, *, document_id: str, message: str) -> None:
  67. super().__init__(message)
  68. self.document_id = document_id
  69. class KnowledgeApplicationService:
  70. def __init__(
  71. self,
  72. *,
  73. settings: KnowledgeServiceSettings,
  74. base_repository: KnowledgeBaseRepository,
  75. document_repository: KnowledgeDocumentRepository,
  76. chunk_repository: KnowledgeChunkRepository,
  77. object_storage: KnowledgeObjectStorage | None = None,
  78. redis_client: Redis | None = None,
  79. task_queue_publisher: TaskQueuePublisher | None = None) -> None:
  80. self.settings = settings
  81. self.base_repository = base_repository
  82. self.document_repository = document_repository
  83. self.chunk_repository = chunk_repository
  84. self.embedding_service = EmbeddingService(settings=settings)
  85. self._object_storage = object_storage
  86. self.redis_client = redis_client
  87. self.task_queue_publisher = task_queue_publisher
  88. @property
  89. def object_storage(self) -> KnowledgeObjectStorage:
  90. if self._object_storage is None:
  91. self._object_storage = build_object_storage(self.settings)
  92. return self._object_storage
  93. def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
  94. return self.base_repository.create(
  95. code=payload.code,
  96. name=payload.name,
  97. description=payload.description,
  98. metadata_json=payload.metadata_json)
  99. def create_base_from_contract(
  100. self,
  101. payload: KnowledgeBaseCreateRequestDto) -> KnowledgeBase:
  102. return self.create_base(
  103. KnowledgeBaseCreateRequest(
  104. code=self._build_base_code(payload.name),
  105. name=payload.name,
  106. description=payload.description,
  107. metadata_json=payload.metadata))
  108. def list_bases(self) -> list[KnowledgeBase]:
  109. return self.base_repository.list_all()
  110. def list_bases_filtered(
  111. self,
  112. *,
  113. keyword: str | None = None,
  114. status: str | None = None) -> list[KnowledgeBase]:
  115. return self.base_repository.list_filtered(
  116. keyword=keyword,
  117. status=status)
  118. def update_base_from_contract(
  119. self,
  120. payload: KnowledgeBaseUpdateRequestDto) -> KnowledgeBase | None:
  121. return self.base_repository.update(
  122. knowledge_base_id=payload.knowledgeBaseId,
  123. name=payload.name,
  124. description=payload.description,
  125. status=payload.status,
  126. metadata_json=payload.metadata)
  127. def delete_base(self, *, knowledge_base_id: str) -> bool:
  128. documents = self.document_repository.list_by_base(
  129. knowledge_base_id=knowledge_base_id)
  130. for document in documents:
  131. self._delete_document_object(document=document)
  132. self.chunk_repository.delete_by_base(knowledge_base_id=knowledge_base_id)
  133. for document in documents:
  134. self.document_repository.delete(document_id=document.id)
  135. return self.base_repository.delete(knowledge_base_id=knowledge_base_id)
  136. def update_base_status(
  137. self,
  138. *,
  139. knowledge_base_id: str,
  140. payload: KnowledgeBaseStatusUpdateRequest) -> KnowledgeBase | None:
  141. return self.base_repository.update_status(
  142. knowledge_base_id=knowledge_base_id,
  143. status=payload.status)
    def create_document(
        self,
        payload: KnowledgeDocumentCreateRequest) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
        """Synchronously ingest a document: parse, store, persist and index it.

        Args:
            payload: Internal create request carrying the target base id, the
                title, the content (text or base64) and chunking options.

        Returns:
            A ``(document, chunks)`` tuple for the indexed document.

        Raises:
            ValueError: If the referenced knowledge base does not exist.
            KnowledgeIndexingError: If indexing fails after the document row
                was created; the document is marked failed first.
        """
        knowledge_base = self.base_repository.get_by_id(
            knowledge_base_id=payload.knowledge_base_id)
        if knowledge_base is None:
            raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")
        # Parse first so that unparseable input fails before anything is stored.
        parsed = self.parse_document(
            KnowledgeDocumentParseRequest(
                source_type=payload.source_type,
                source_uri=payload.source_uri,
                content_text=payload.content_text,
                content_base64=payload.content_base64)
        )
        raw_content = read_document_content_bytes(
            content_text=payload.content_text,
            content_base64=payload.content_base64)
        object_key = build_document_object_key(
            knowledge_base_id=payload.knowledge_base_id,
            source_type=parsed.source_type,
            title=payload.title)
        # The raw bytes go to object storage before the database row is created.
        stored_object = self.object_storage.put_bytes(
            object_key=object_key,
            content=raw_content,
            content_type=self._guess_content_type(source_type=parsed.source_type))
        # Stays None until the row exists; the outer handler uses this to decide
        # whether the stored object is orphaned and must be removed.
        document: KnowledgeDocument | None = None
        try:
            metadata_json = {
                **payload.metadata_json,
                "parser_metadata": parsed.metadata_json,
                "object_storage": stored_object.to_metadata(),
            }
            # The row is created with an empty content_text; the hash is taken
            # over the parsed text.
            document = self.document_repository.create(
                knowledge_base_id=payload.knowledge_base_id,
                title=payload.title,
                source_type=parsed.source_type,
                source_uri=payload.source_uri,
                content_text="",
                content_hash=stable_content_hash(parsed.content_text),
                metadata_json=metadata_json)
            try:
                chunks = self._index_document(
                    document=document,
                    content_text=parsed.content_text,
                    chunk_size=payload.chunk_size,
                    chunk_overlap=payload.chunk_overlap)
            except Exception as exc:
                # Record the failure on the document, then surface a domain error.
                self._mark_document_failed(document=document, message=str(exc))
                raise KnowledgeIndexingError(
                    document_id=document.id,
                    message=f"knowledge document indexing failed: {exc}") from exc
            indexed_document = self.document_repository.update_status(
                document_id=document.id,
                status="indexed")
            # Fall back to the pre-update object if the repository returns a falsy value.
            return indexed_document or document, chunks
        except Exception:
            if document is None:
                # No row references the object yet: best-effort cleanup, and a
                # storage error here must not mask the original exception.
                try:
                    self.object_storage.delete_object(object_key=stored_object.object_key)
                except ObjectStorageError:
                    pass
            raise
  206. def create_document_from_contract(
  207. self,
  208. payload: KnowledgeDocumentCreateRequestDto) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
  209. return self.create_document(
  210. KnowledgeDocumentCreateRequest(
  211. knowledge_base_id=payload.knowledgeBaseId,
  212. title=payload.title,
  213. content_text=payload.contentText,
  214. content_base64=payload.contentBase64,
  215. source_type=payload.sourceType,
  216. source_uri=payload.sourceUri,
  217. metadata_json=payload.metadata,
  218. chunk_size=payload.chunkSize,
  219. chunk_overlap=payload.chunkOverlap))
  220. def create_document_from_contract_result(
  221. self,
  222. payload: KnowledgeDocumentCreateRequestDto) -> KnowledgeDocumentIngestResult:
  223. request = KnowledgeDocumentCreateRequest(
  224. knowledge_base_id=payload.knowledgeBaseId,
  225. title=payload.title,
  226. content_text=payload.contentText,
  227. content_base64=payload.contentBase64,
  228. source_type=payload.sourceType,
  229. source_uri=payload.sourceUri,
  230. metadata_json=payload.metadata,
  231. chunk_size=payload.chunkSize,
  232. chunk_overlap=payload.chunkOverlap)
  233. if self._should_queue_indexing(async_mode=payload.asyncMode):
  234. return self.create_document_index_job(payload=request)
  235. document, chunks = self.create_document(request)
  236. return KnowledgeDocumentIngestResult(
  237. document=document,
  238. chunks=chunks)
  239. def create_document_index_job(
  240. self,
  241. payload: KnowledgeDocumentCreateRequest) -> KnowledgeDocumentIngestResult:
  242. knowledge_base = self.base_repository.get_by_id(
  243. knowledge_base_id=payload.knowledge_base_id)
  244. if knowledge_base is None:
  245. raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")
  246. raw_content = read_document_content_bytes(
  247. content_text=payload.content_text,
  248. content_base64=payload.content_base64)
  249. source_type = normalize_source_type(
  250. source_type=payload.source_type,
  251. source_uri=payload.source_uri)
  252. object_key = build_document_object_key(
  253. knowledge_base_id=payload.knowledge_base_id,
  254. source_type=source_type,
  255. title=payload.title)
  256. stored_object = self.object_storage.put_bytes(
  257. object_key=object_key,
  258. content=raw_content,
  259. content_type=self._guess_content_type(source_type=source_type))
  260. document: KnowledgeDocument | None = None
  261. try:
  262. document = self.document_repository.create(
  263. knowledge_base_id=payload.knowledge_base_id,
  264. title=payload.title,
  265. source_type=source_type,
  266. source_uri=payload.source_uri,
  267. content_text="",
  268. content_hash=hashlib.sha256(raw_content).hexdigest(),
  269. metadata_json={
  270. **payload.metadata_json,
  271. "object_storage": stored_object.to_metadata(),
  272. },
  273. status="draft")
  274. queued_document, job = self.queue_document_indexing(
  275. document=document,
  276. action="index",
  277. chunk_size=payload.chunk_size,
  278. chunk_overlap=payload.chunk_overlap)
  279. except Exception:
  280. if document is None:
  281. try:
  282. self.object_storage.delete_object(object_key=stored_object.object_key)
  283. except ObjectStorageError:
  284. pass
  285. raise
  286. return KnowledgeDocumentIngestResult(
  287. document=queued_document,
  288. chunks=[],
  289. queued=True,
  290. job=job)
  291. def parse_document(self, payload: KnowledgeDocumentParseRequest) -> ParsedDocument:
  292. try:
  293. return parse_document_content(
  294. source_type=payload.source_type,
  295. content_text=payload.content_text,
  296. content_base64=payload.content_base64,
  297. source_uri=payload.source_uri)
  298. except DocumentParseError:
  299. raise
  300. def queue_document_indexing(
  301. self,
  302. *,
  303. document: KnowledgeDocument,
  304. action: str,
  305. chunk_size: int | None = None,
  306. chunk_overlap: int | None = None) -> tuple[KnowledgeDocument, KnowledgeIndexJobData]:
  307. job_id = f"kjob_{uuid4().hex}"
  308. metadata = self._write_index_job_metadata(
  309. document=document,
  310. action=action,
  311. job_id=job_id,
  312. status="queued",
  313. progress=0,
  314. chunk_size=chunk_size,
  315. chunk_overlap=chunk_overlap)
  316. updated_document = self.document_repository.update(
  317. document_id=document.id,
  318. status="queued",
  319. metadata_json=metadata)
  320. document_for_job = updated_document or document
  321. published = self._publish_document_index_job(
  322. document_id=document.id,
  323. action=action,
  324. job_id=job_id)
  325. if not published:
  326. metadata = self._write_index_job_metadata(
  327. document=document_for_job,
  328. action=action,
  329. job_id=job_id,
  330. status="running",
  331. progress=1,
  332. chunk_size=chunk_size,
  333. chunk_overlap=chunk_overlap,
  334. worker_key="inline-fallback")
  335. document_for_job = self.document_repository.update(
  336. document_id=document.id,
  337. status="indexing",
  338. metadata_json=metadata) or document_for_job
  339. processed = self.process_document_index_job(
  340. document_id=document.id,
  341. action=action,
  342. job_id=job_id,
  343. worker_key="inline-fallback",
  344. chunk_size=chunk_size,
  345. chunk_overlap=chunk_overlap)
  346. if processed is not None:
  347. indexed_document, chunks = processed
  348. return indexed_document, self._read_latest_index_job(document=indexed_document)
  349. return document_for_job, self._read_latest_index_job(document=document_for_job)
  350. def list_documents(
  351. self,
  352. *,
  353. knowledge_base_id: str) -> list[KnowledgeDocument]:
  354. return self.document_repository.list_by_base(
  355. knowledge_base_id=knowledge_base_id)
  356. def list_documents_filtered(
  357. self,
  358. *,
  359. knowledge_base_id: str | None = None,
  360. keyword: str | None = None,
  361. status: str | None = None,
  362. source_type: str | None = None) -> list[KnowledgeDocument]:
  363. return self.document_repository.list_filtered(
  364. knowledge_base_id=knowledge_base_id,
  365. keyword=keyword,
  366. status=status,
  367. source_type=source_type)
  368. def update_document_from_contract(
  369. self,
  370. payload: KnowledgeDocumentUpdateRequestDto) -> KnowledgeDocument | None:
  371. return self.document_repository.update(
  372. document_id=payload.documentId,
  373. title=payload.title,
  374. source_uri=payload.sourceUri,
  375. status=payload.status,
  376. metadata_json=payload.metadata)
  377. def reindex_document(
  378. self,
  379. payload: KnowledgeDocumentReindexRequestDto) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
  380. document = self.document_repository.get_by_id(document_id=payload.documentId)
  381. if document is None:
  382. return None
  383. try:
  384. parsed = self._parse_document_for_indexing(document=document)
  385. chunks = self._index_document(
  386. document=document,
  387. content_text=parsed.content_text,
  388. chunk_size=payload.chunkSize,
  389. chunk_overlap=payload.chunkOverlap)
  390. except Exception as exc:
  391. self._mark_document_failed(document=document, message=str(exc))
  392. raise KnowledgeIndexingError(
  393. document_id=document.id,
  394. message=f"knowledge document reindex failed: {exc}") from exc
  395. metadata = dict(document.metadata_json or {})
  396. metadata["parser_metadata"] = parsed.metadata_json
  397. indexed_document = self.document_repository.update(
  398. document_id=document.id,
  399. status="indexed",
  400. metadata_json=metadata)
  401. return indexed_document or document, chunks
  402. def reindex_document_from_contract_result(
  403. self,
  404. payload: KnowledgeDocumentReindexRequestDto) -> KnowledgeDocumentIngestResult | None:
  405. if self._should_queue_indexing(async_mode=payload.asyncMode):
  406. document = self.document_repository.get_by_id(document_id=payload.documentId)
  407. if document is None:
  408. return None
  409. queued_document, job = self.queue_document_indexing(
  410. document=document,
  411. action="reindex",
  412. chunk_size=payload.chunkSize,
  413. chunk_overlap=payload.chunkOverlap)
  414. return KnowledgeDocumentIngestResult(
  415. document=queued_document,
  416. chunks=[],
  417. queued=True,
  418. job=job)
  419. result = self.reindex_document(payload)
  420. if result is None:
  421. return None
  422. document, chunks = result
  423. return KnowledgeDocumentIngestResult(
  424. document=document,
  425. chunks=chunks)
  426. def reindex_base_from_contract(
  427. self,
  428. payload: KnowledgeBaseReindexRequestDto) -> list[KnowledgeIndexJobData]:
  429. documents = self.document_repository.list_by_base(
  430. knowledge_base_id=payload.knowledgeBaseId)
  431. jobs: list[KnowledgeIndexJobData] = []
  432. for document in documents:
  433. if document.status == "archived":
  434. continue
  435. queued_document, job = self.queue_document_indexing(
  436. document=document,
  437. action="reindex",
  438. chunk_size=payload.chunkSize,
  439. chunk_overlap=payload.chunkOverlap)
  440. jobs.append(job or self._read_latest_index_job(document=queued_document))
  441. return jobs
    def process_document_index_job(
        self,
        *,
        document_id: str,
        action: str,
        worker_key: str,
        job_id: str | None = None,
        chunk_size: int | None = None,
        chunk_overlap: int | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
        """Run one index job for a document, recording progress in its metadata.

        Job state written along the way: ``running`` at progress 10, ``running``
        at progress 40 after parsing, then ``completed`` at progress 100.
        Archived documents are recorded as ``skipped`` without being indexed.

        Args:
            document_id: Document to process.
            action: Job action recorded in metadata and used in error messages.
            worker_key: Identifier of the worker recorded on the job.
            job_id: Job id; defaults to the id of the latest job on the document.
            chunk_size: Chunk size; defaults to the latest job's value.
            chunk_overlap: Chunk overlap; defaults to the latest job's value.

        Returns:
            ``(document, chunks)``; ``(document, [])`` for an archived
            document; ``None`` when the document does not exist.

        Raises:
            KnowledgeIndexingError: If parsing or indexing fails; the document
                is marked failed first.
        """
        document = self.document_repository.get_by_id(document_id=document_id)
        if document is None:
            return None
        # Explicit arguments win; otherwise reuse what the latest job recorded.
        resolved_job = self._read_latest_index_job(document=document)
        resolved_job_id = job_id or resolved_job.jobId
        resolved_chunk_size = chunk_size if chunk_size is not None else resolved_job.chunkSize
        resolved_chunk_overlap = chunk_overlap if chunk_overlap is not None else resolved_job.chunkOverlap
        if document.status == "archived":
            # Archived documents are not indexed; the job is closed as skipped
            # and the document status itself is left untouched.
            metadata = self._write_index_job_metadata(
                document=document,
                action=action,
                job_id=resolved_job_id,
                status="skipped",
                progress=100,
                chunk_size=resolved_chunk_size,
                chunk_overlap=resolved_chunk_overlap,
                worker_key=worker_key,
                completed_time=datetime.utcnow(),
                error_message="document is archived")
            skipped_document = self.document_repository.update(
                document_id=document.id,
                metadata_json=metadata) or document
            return skipped_document, []
        # NOTE(review): datetime.utcnow() yields a naive datetime; confirm the
        # rest of the job metadata handling expects naive UTC values.
        metadata = self._write_index_job_metadata(
            document=document,
            action=action,
            job_id=resolved_job_id,
            status="running",
            progress=10,
            chunk_size=resolved_chunk_size,
            chunk_overlap=resolved_chunk_overlap,
            worker_key=worker_key,
            started_time=datetime.utcnow())
        running_document = self.document_repository.update(
            document_id=document.id,
            status="indexing",
            metadata_json=metadata) or document
        try:
            parsed = self._parse_document_for_indexing(document=running_document)
            # Parsing done: bump progress and carry over the recorded start time.
            metadata = self._write_index_job_metadata(
                document=running_document,
                action=action,
                job_id=resolved_job_id,
                status="running",
                progress=40,
                chunk_size=resolved_chunk_size,
                chunk_overlap=resolved_chunk_overlap,
                worker_key=worker_key,
                started_time=self._read_latest_index_job(document=running_document).startedTime)
            metadata["parser_metadata"] = parsed.metadata_json
            running_document = self.document_repository.update(
                document_id=document.id,
                status="indexing",
                metadata_json=metadata) or running_document
            chunks = self._index_document(
                document=running_document,
                content_text=parsed.content_text,
                chunk_size=resolved_chunk_size,
                chunk_overlap=resolved_chunk_overlap)
        except Exception as exc:
            # Record the failure with the full job context, then surface a domain error.
            self._mark_document_failed(
                document=running_document,
                message=str(exc),
                job_id=resolved_job_id,
                action=action,
                worker_key=worker_key,
                chunk_size=resolved_chunk_size,
                chunk_overlap=resolved_chunk_overlap)
            raise KnowledgeIndexingError(
                document_id=document.id,
                message=f"knowledge document {action} failed: {exc}") from exc
        metadata = self._write_index_job_metadata(
            document=running_document,
            action=action,
            job_id=resolved_job_id,
            status="completed",
            progress=100,
            chunk_size=resolved_chunk_size,
            chunk_overlap=resolved_chunk_overlap,
            worker_key=worker_key,
            completed_time=datetime.utcnow())
        metadata["parser_metadata"] = parsed.metadata_json
        indexed_document = self.document_repository.update(
            document_id=document.id,
            status="indexed",
            metadata_json=metadata)
        # Fall back to the last known object if the repository returns a falsy value.
        return indexed_document or running_document, chunks
  538. def execute_document_index_job(
  539. self,
  540. *,
  541. document_id: str,
  542. action: str,
  543. worker_key: str,
  544. lease_seconds: int,
  545. job_id: str | None = None,
  546. redis_client: Redis | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
  547. resolved_redis_client = redis_client or self.redis_client
  548. idempotency_key = f"{document_id}:{job_id or action}"
  549. lock = None
  550. idempotency_store = None
  551. if resolved_redis_client is not None:
  552. from core_shared.redis_primitives import DistributedLock, IdempotencyStore
  553. lock = DistributedLock(
  554. client=resolved_redis_client,
  555. name=f"knowledge-document:{document_id}:lock",
  556. ttl_seconds=lease_seconds)
  557. if not lock.acquire():
  558. return None
  559. idempotency_store = IdempotencyStore(
  560. client=resolved_redis_client,
  561. prefix="knowledge-document-idempotency")
  562. if not idempotency_store.begin(key=idempotency_key):
  563. lock.release()
  564. return None
  565. try:
  566. result = self.process_document_index_job(
  567. document_id=document_id,
  568. action=action,
  569. job_id=job_id,
  570. worker_key=worker_key)
  571. if idempotency_store is not None and result is not None:
  572. document, chunks = result
  573. idempotency_store.complete(
  574. key=idempotency_key,
  575. result={
  576. "status": document.status,
  577. "document_id": document.id,
  578. "chunk_count": len(chunks),
  579. })
  580. except Exception:
  581. if idempotency_store is not None:
  582. idempotency_store.clear(key=idempotency_key)
  583. raise
  584. finally:
  585. if lock is not None:
  586. lock.release()
  587. return result
  588. def execute_next_pending_document_job(
  589. self,
  590. *,
  591. worker_key: str,
  592. lease_seconds: int,
  593. stale_indexing_seconds: int,
  594. redis_client: Redis | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
  595. stale_before = datetime.utcnow() - timedelta(seconds=stale_indexing_seconds)
  596. document = self.document_repository.get_next_pending_indexing(
  597. stale_before=stale_before)
  598. if document is None:
  599. return None
  600. job = self._read_latest_index_job(document=document)
  601. return self.execute_document_index_job(
  602. document_id=document.id,
  603. action=job.action,
  604. job_id=job.jobId,
  605. worker_key=worker_key,
  606. lease_seconds=lease_seconds,
  607. redis_client=redis_client)
  608. def list_index_jobs(
  609. self,
  610. *,
  611. knowledge_base_id: str | None = None,
  612. document_id: str | None = None,
  613. status: str | None = None) -> list[KnowledgeIndexJobData]:
  614. documents = self.document_repository.list_filtered(
  615. knowledge_base_id=knowledge_base_id)
  616. jobs: list[KnowledgeIndexJobData] = []
  617. for document in documents:
  618. if document_id is not None and document.id != document_id:
  619. continue
  620. job = self._try_read_latest_index_job(document=document)
  621. if job is None:
  622. continue
  623. if status is not None and job.status != status:
  624. continue
  625. jobs.append(job)
  626. jobs.sort(
  627. key=lambda item: item.queuedTime or datetime.min,
  628. reverse=True)
  629. return jobs
  630. def detail_index_job(self, *, document_id: str) -> KnowledgeIndexJobData | None:
  631. document = self.document_repository.get_by_id(document_id=document_id)
  632. if document is None:
  633. return None
  634. return self._try_read_latest_index_job(document=document)
  635. def delete_document(self, *, document_id: str) -> bool:
  636. return bool(self.delete_document_result(document_id=document_id)["deleted"])
  637. def delete_document_result(self, *, document_id: str) -> dict[str, JSONValue]:
  638. document = self.document_repository.get_by_id(document_id=document_id)
  639. if document is None:
  640. return {
  641. "deleted": False,
  642. "objectDeleted": False,
  643. "documentId": document_id,
  644. }
  645. object_deleted = self._delete_document_object(document=document)
  646. self.chunk_repository.delete_by_document(document_id=document_id)
  647. deleted = self.document_repository.delete(document_id=document_id) is not None
  648. return {
  649. "deleted": deleted,
  650. "objectDeleted": object_deleted,
  651. "documentId": document_id,
  652. }
  653. def read_document_content(
  654. self,
  655. *,
  656. document_id: str,
  657. include_text: bool = True,
  658. include_base64: bool = False) -> dict[str, JSONValue] | None:
  659. document = self.document_repository.get_by_id(document_id=document_id)
  660. if document is None:
  661. return None
  662. raw_content = self._read_document_raw_content(document=document)
  663. object_status = self.read_document_storage_status(document_id=document_id)
  664. content_type = self._read_content_type_from_status(object_status)
  665. payload: dict[str, JSONValue] = {
  666. "documentId": document.id,
  667. "title": document.title,
  668. "sourceType": document.source_type,
  669. "contentType": content_type,
  670. "sizeBytes": len(raw_content),
  671. "objectStorage": self._read_object_storage_metadata(document=document),
  672. "contentBase64": None,
  673. "contentText": None,
  674. }
  675. if include_base64:
  676. payload["contentBase64"] = base64.b64encode(raw_content).decode("ascii")
  677. if include_text and self._is_text_content_type(content_type=content_type, source_type=document.source_type):
  678. payload["contentText"] = raw_content.decode("utf-8", errors="replace")
  679. return payload
  680. def read_document_storage_status(self, *, document_id: str) -> dict[str, JSONValue] | None:
  681. document = self.document_repository.get_by_id(document_id=document_id)
  682. if document is None:
  683. return None
  684. object_key = self._read_document_object_key(document=document)
  685. if object_key is None:
  686. return {
  687. "documentId": document.id,
  688. "exists": False,
  689. "objectStorage": None,
  690. "errorMessage": "document object metadata is missing",
  691. }
  692. status = self.object_storage.head_object(object_key=object_key)
  693. return self._object_status_to_payload(document=document, status=status)
  694. def read_storage_health(self) -> dict[str, JSONValue]:
  695. return dict(self.object_storage.health_check())
  696. def list_chunks_filtered(
  697. self,
  698. *,
  699. knowledge_base_id: str | None = None,
  700. document_id: str | None = None,
  701. keyword: str | None = None) -> list[KnowledgeChunk]:
  702. return self.chunk_repository.list_filtered(
  703. knowledge_base_id=knowledge_base_id,
  704. document_id=document_id,
  705. keyword=keyword)
  706. def delete_chunk(self, *, chunk_id: str) -> bool:
  707. return self.chunk_repository.delete(chunk_id=chunk_id)
  708. def search(
  709. self,
  710. payload: KnowledgeSearchRequest) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]:
  711. document_cache: dict[str, KnowledgeDocument] = {}
  712. query_embedding_result = self.embedding_service.embed_text(payload.query)
  713. candidate_limit = max(
  714. payload.top_k * max(self.settings.retrieval_candidate_multiplier, 1),
  715. payload.top_k)
  716. vector_candidates = self.chunk_repository.search_by_vector(
  717. knowledge_base_id=payload.knowledge_base_id,
  718. embedding=query_embedding_result.embedding,
  719. limit=candidate_limit)
  720. if vector_candidates:
  721. chunks = [chunk for chunk, _ in vector_candidates]
  722. vector_scores_by_chunk_id = {
  723. chunk.id: score for chunk, score in vector_candidates
  724. }
  725. retrieval_mode = "pgvector-hybrid"
  726. else:
  727. chunks = self.chunk_repository.list_by_base(
  728. knowledge_base_id=payload.knowledge_base_id)
  729. vector_scores_by_chunk_id = {}
  730. retrieval_mode = "hybrid"
  731. # Resolve per-base retrieval weights with global fallback
  732. kb = self.base_repository.get_by_id(knowledge_base_id=payload.knowledge_base_id)
  733. retrieval_config = (kb.metadata_json or {}).get("retrieval_config", {}) if kb else {}
  734. keyword_weight = float(retrieval_config.get("keyword_weight", self.settings.retrieval_keyword_weight))
  735. vector_weight = float(retrieval_config.get("vector_weight", self.settings.retrieval_vector_weight))
  736. rerank_weight = float(retrieval_config.get("rerank_weight", self.settings.retrieval_rerank_weight))
  737. # Pre-compute BM25 collection stats from candidate chunks
  738. chunk_texts = [chunk.content_text for chunk in chunks]
  739. avg_doc_length, doc_count, df_map = compute_bm25_stats(chunk_texts)
  740. scored: list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]] = []
  741. for chunk in chunks:
  742. document = document_cache.get(chunk.document_id)
  743. if document is None:
  744. document = self.document_repository.get_by_id(
  745. document_id=chunk.document_id)
  746. if document is None:
  747. continue
  748. document_cache[chunk.document_id] = document
  749. if not self._matches_filters(document=document, filters_json=payload.filters_json):
  750. continue
  751. keyword = bm25_score(
  752. payload.query, chunk.content_text,
  753. avg_doc_length=avg_doc_length, doc_count=doc_count, df=df_map)
  754. vector = vector_scores_by_chunk_id.get(chunk.id)
  755. if vector is None:
  756. vector = cosine_similarity(query_embedding_result.embedding, chunk.embedding_json)
  757. rerank = (
  758. rerank_score(
  759. query=payload.query,
  760. chunk_text=chunk.content_text,
  761. document_title=document.title)
  762. if self.settings.retrieval_rerank_enabled
  763. else 0.0
  764. )
  765. score = round(
  766. keyword * keyword_weight
  767. + vector * vector_weight
  768. + rerank * rerank_weight,
  769. 6)
  770. scored.append(
  771. (
  772. chunk,
  773. document,
  774. score,
  775. {
  776. "final_score": score,
  777. "keyword_score": round(keyword, 6),
  778. "vector_score": round(vector, 6),
  779. "rerank_score": round(rerank, 6),
  780. "retrieval_mode": retrieval_mode,
  781. "rerank_enabled": self.settings.retrieval_rerank_enabled,
  782. "candidate_limit": candidate_limit,
  783. "weights": {
  784. "keyword": keyword_weight,
  785. "vector": vector_weight,
  786. "rerank": rerank_weight,
  787. },
  788. "embedding_provider": query_embedding_result.provider,
  789. "embedding_model": query_embedding_result.model,
  790. "citation": {
  791. "document_id": document.id,
  792. "document_title": document.title,
  793. "source_uri": document.source_uri,
  794. "chunk_id": chunk.id,
  795. "chunk_index": chunk.chunk_index,
  796. },
  797. })
  798. )
  799. scored.sort(key=lambda item: item[2], reverse=True)
  800. return scored[: payload.top_k]
  801. def _index_document(
  802. self,
  803. *,
  804. document: KnowledgeDocument,
  805. content_text: str,
  806. chunk_size: int | None,
  807. chunk_overlap: int | None) -> list[KnowledgeChunk]:
  808. source_type = document.source_type or "text"
  809. resolved_size = chunk_size or self.settings.default_chunk_size
  810. resolved_overlap = chunk_overlap or self.settings.default_chunk_overlap
  811. chunk_payloads = chunk_document(
  812. content_text=content_text,
  813. source_type=source_type,
  814. chunk_size=resolved_size,
  815. chunk_overlap=resolved_overlap)
  816. for chunk_payload in chunk_payloads:
  817. content_text = self._read_chunk_content(chunk_payload)
  818. embedding_result = self.embedding_service.embed_text(content_text)
  819. chunk_payload["embedding_model"] = embedding_result.model
  820. chunk_payload["embedding_json"] = embedding_result.embedding
  821. chunk_payload["metadata_json"] = {
  822. "embedding_provider": embedding_result.provider,
  823. }
  824. return self.chunk_repository.replace_document_chunks(
  825. knowledge_base_id=document.knowledge_base_id,
  826. document_id=document.id,
  827. chunks=chunk_payloads)
  828. def _read_chunk_content(self, chunk_payload: dict[str, JSONValue]) -> str:
  829. value = chunk_payload.get("content_text")
  830. return value if isinstance(value, str) else ""
  831. def _parse_document_for_indexing(self, *, document: KnowledgeDocument) -> ParsedDocument:
  832. metadata = document.metadata_json or {}
  833. object_metadata = metadata.get("object_storage")
  834. if isinstance(object_metadata, dict):
  835. object_key = object_metadata.get("objectKey")
  836. if isinstance(object_key, str) and object_key:
  837. try:
  838. raw_content = self.object_storage.get_bytes(object_key=object_key)
  839. except ObjectStorageNotFoundError as exc:
  840. raise ValueError(f"knowledge document content object not found: {document.id}") from exc
  841. return parse_document_content(
  842. source_type=document.source_type,
  843. source_uri=document.source_uri,
  844. content_base64=base64.b64encode(raw_content).decode("ascii"))
  845. if document.content_text:
  846. return parse_document_content(
  847. source_type=document.source_type,
  848. source_uri=document.source_uri,
  849. content_text=document.content_text)
  850. raise ValueError(f"knowledge document content object not found: {document.id}")
  851. def _read_document_content_for_indexing(self, *, document: KnowledgeDocument) -> str:
  852. return self._parse_document_for_indexing(document=document).content_text
  853. def _read_document_raw_content(self, *, document: KnowledgeDocument) -> bytes:
  854. object_key = self._read_document_object_key(document=document)
  855. if isinstance(object_key, str) and object_key:
  856. return self.object_storage.get_bytes(object_key=object_key)
  857. if document.content_text:
  858. return document.content_text.encode("utf-8")
  859. raise ValueError(f"knowledge document content object not found: {document.id}")
  860. def _delete_document_object(self, *, document: KnowledgeDocument) -> bool:
  861. object_key = self._read_document_object_key(document=document)
  862. if object_key is None:
  863. return False
  864. try:
  865. return self.object_storage.delete_object(object_key=object_key)
  866. except ObjectStorageNotFoundError:
  867. return False
  868. def _read_document_object_key(self, *, document: KnowledgeDocument) -> str | None:
  869. object_metadata = self._read_object_storage_metadata(document=document)
  870. if object_metadata is None:
  871. return None
  872. object_key = object_metadata.get("objectKey")
  873. return object_key if isinstance(object_key, str) and object_key else None
  874. def _read_object_storage_metadata(
  875. self,
  876. *,
  877. document: KnowledgeDocument) -> dict[str, JSONValue] | None:
  878. metadata = document.metadata_json or {}
  879. object_metadata = metadata.get("object_storage")
  880. return object_metadata if isinstance(object_metadata, dict) else None
  881. def _object_status_to_payload(
  882. self,
  883. *,
  884. document: KnowledgeDocument,
  885. status: ObjectStorageStatus) -> dict[str, JSONValue]:
  886. return {
  887. "documentId": document.id,
  888. "exists": status.exists,
  889. "objectStorage": self._read_object_storage_metadata(document=document),
  890. "contentType": status.content_type,
  891. "sizeBytes": status.size_bytes,
  892. "etag": status.etag,
  893. "errorMessage": status.error_message,
  894. }
  895. def _read_content_type_from_status(
  896. self,
  897. object_status: dict[str, JSONValue] | None) -> str | None:
  898. if object_status is None:
  899. return None
  900. content_type = object_status.get("contentType")
  901. return content_type if isinstance(content_type, str) else None
  902. def _is_text_content_type(self, *, content_type: str | None, source_type: str) -> bool:
  903. if content_type is not None and content_type.startswith("text/"):
  904. return True
  905. return source_type.strip().lower() in {"text", "txt", "markdown", "md", "html", "htm", "json", "csv"}
  906. def _should_queue_indexing(self, *, async_mode: bool | None) -> bool:
  907. if async_mode is False:
  908. return False
  909. if not self.settings.async_indexing_enabled and async_mode is None:
  910. return False
  911. return self.task_queue_publisher is not None
  912. def _publish_document_index_job(
  913. self,
  914. *,
  915. document_id: str,
  916. action: str,
  917. job_id: str) -> bool:
  918. if self.task_queue_publisher is None:
  919. return False
  920. return self.task_queue_publisher.publish_knowledge_document(
  921. document_id=document_id,
  922. action=action,
  923. job_id=job_id)
  924. def _write_index_job_metadata(
  925. self,
  926. *,
  927. document: KnowledgeDocument,
  928. action: str,
  929. job_id: str,
  930. status: str,
  931. progress: int,
  932. chunk_size: int | None = None,
  933. chunk_overlap: int | None = None,
  934. worker_key: str | None = None,
  935. started_time: datetime | None = None,
  936. completed_time: datetime | None = None,
  937. error_message: str | None = None) -> dict[str, JSONValue]:
  938. metadata = dict(document.metadata_json or {})
  939. existing_job = self._read_index_job_payload(document=document)
  940. queued_time = existing_job.get("queuedTime")
  941. if not isinstance(queued_time, str):
  942. queued_time = datetime.utcnow().isoformat()
  943. resolved_started_time = (
  944. started_time.isoformat()
  945. if started_time is not None
  946. else existing_job.get("startedTime")
  947. )
  948. resolved_completed_time = (
  949. completed_time.isoformat()
  950. if completed_time is not None
  951. else existing_job.get("completedTime")
  952. )
  953. job_payload: dict[str, JSONValue] = {
  954. "jobId": job_id,
  955. "documentId": document.id,
  956. "knowledgeBaseId": document.knowledge_base_id,
  957. "documentTitle": document.title,
  958. "action": action if action in {"index", "reindex"} else "reindex",
  959. "status": status,
  960. "progress": max(0, min(progress, 100)),
  961. "queueName": KNOWLEDGE_DOCUMENT_QUEUE,
  962. "workerKey": worker_key,
  963. "errorMessage": error_message,
  964. "chunkSize": chunk_size,
  965. "chunkOverlap": chunk_overlap,
  966. "queuedTime": queued_time,
  967. "startedTime": resolved_started_time if isinstance(resolved_started_time, str) else None,
  968. "completedTime": resolved_completed_time if isinstance(resolved_completed_time, str) else None,
  969. }
  970. metadata["index_job"] = job_payload
  971. return metadata
  972. def _read_latest_index_job(self, *, document: KnowledgeDocument) -> KnowledgeIndexJobData:
  973. payload = self._read_index_job_payload(document=document)
  974. return KnowledgeIndexJobData(
  975. jobId=self._read_payload_string(payload, "jobId") or f"kjob_{document.id}",
  976. documentId=self._read_payload_string(payload, "documentId") or document.id,
  977. knowledgeBaseId=self._read_payload_string(payload, "knowledgeBaseId") or document.knowledge_base_id,
  978. documentTitle=self._read_payload_string(payload, "documentTitle") or document.title,
  979. action=self._read_job_action(payload.get("action")),
  980. status=self._read_job_status(payload.get("status")),
  981. progress=self._read_payload_int(payload, "progress", 0),
  982. queueName=self._read_payload_string(payload, "queueName"),
  983. workerKey=self._read_payload_string(payload, "workerKey"),
  984. errorMessage=self._read_payload_string(payload, "errorMessage"),
  985. chunkSize=self._read_optional_payload_int(payload, "chunkSize"),
  986. chunkOverlap=self._read_optional_payload_int(payload, "chunkOverlap"),
  987. queuedTime=self._read_payload_datetime(payload, "queuedTime"),
  988. startedTime=self._read_payload_datetime(payload, "startedTime"),
  989. completedTime=self._read_payload_datetime(payload, "completedTime"))
  990. def _try_read_latest_index_job(self, *, document: KnowledgeDocument) -> KnowledgeIndexJobData | None:
  991. if not self._read_index_job_payload(document=document):
  992. return None
  993. return self._read_latest_index_job(document=document)
  994. def _read_index_job_payload(self, *, document: KnowledgeDocument) -> dict[str, JSONValue]:
  995. metadata = document.metadata_json or {}
  996. value = metadata.get("index_job")
  997. if isinstance(value, dict):
  998. return {str(item_key): item_value for item_key, item_value in value.items()}
  999. return {}
  1000. def _read_payload_string(
  1001. self,
  1002. payload: dict[str, JSONValue],
  1003. key: str) -> str | None:
  1004. value = payload.get(key)
  1005. return value if isinstance(value, str) and value else None
  1006. def _read_payload_int(
  1007. self,
  1008. payload: dict[str, JSONValue],
  1009. key: str,
  1010. fallback: int) -> int:
  1011. value = payload.get(key)
  1012. if isinstance(value, int) and not isinstance(value, bool):
  1013. return value
  1014. return fallback
  1015. def _read_optional_payload_int(
  1016. self,
  1017. payload: dict[str, JSONValue],
  1018. key: str) -> int | None:
  1019. value = payload.get(key)
  1020. if isinstance(value, int) and not isinstance(value, bool):
  1021. return value
  1022. return None
  1023. def _read_payload_datetime(
  1024. self,
  1025. payload: dict[str, JSONValue],
  1026. key: str) -> datetime | None:
  1027. value = payload.get(key)
  1028. if not isinstance(value, str) or not value:
  1029. return None
  1030. try:
  1031. return datetime.fromisoformat(value)
  1032. except ValueError:
  1033. return None
  1034. def _read_job_action(self, value: JSONValue) -> KnowledgeIndexJobAction:
  1035. if isinstance(value, str) and value in {"index", "reindex"}:
  1036. return cast(KnowledgeIndexJobAction, value)
  1037. return "reindex"
  1038. def _read_job_status(self, value: JSONValue) -> KnowledgeIndexJobStatus:
  1039. if isinstance(value, str) and value in {"queued", "running", "completed", "failed", "skipped"}:
  1040. return cast(KnowledgeIndexJobStatus, value)
  1041. return "queued"
  1042. def _mark_document_failed(
  1043. self,
  1044. *,
  1045. document: KnowledgeDocument,
  1046. message: str,
  1047. job_id: str | None = None,
  1048. action: str = "reindex",
  1049. worker_key: str | None = None,
  1050. chunk_size: int | None = None,
  1051. chunk_overlap: int | None = None) -> None:
  1052. metadata = dict(document.metadata_json or {})
  1053. metadata["last_error"] = {
  1054. "message": message[:1000],
  1055. "errorType": "indexing_failed",
  1056. }
  1057. if job_id is not None:
  1058. metadata = self._write_index_job_metadata(
  1059. document=document,
  1060. action=action,
  1061. job_id=job_id,
  1062. status="failed",
  1063. progress=100,
  1064. chunk_size=chunk_size,
  1065. chunk_overlap=chunk_overlap,
  1066. worker_key=worker_key,
  1067. completed_time=datetime.utcnow(),
  1068. error_message=message[:1000])
  1069. self.document_repository.update(
  1070. document_id=document.id,
  1071. status="failed",
  1072. metadata_json=metadata)
  1073. def _guess_content_type(self, *, source_type: str) -> str:
  1074. normalized = source_type.strip().lower().removeprefix(".")
  1075. if normalized in {"markdown", "md"}:
  1076. return "text/markdown; charset=utf-8"
  1077. if normalized in {"html", "htm"}:
  1078. return "text/html; charset=utf-8"
  1079. if normalized == "json":
  1080. return "application/json"
  1081. if normalized == "csv":
  1082. return "text/csv; charset=utf-8"
  1083. if normalized == "pdf":
  1084. return "application/pdf"
  1085. if normalized in {"docx", "word"}:
  1086. return "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
  1087. return "text/plain; charset=utf-8"
  1088. def _matches_filters(
  1089. self,
  1090. *,
  1091. document: KnowledgeDocument,
  1092. filters_json: dict[str, JSONValue]) -> bool:
  1093. source_type = filters_json.get("sourceType") or filters_json.get("source_type")
  1094. if isinstance(source_type, str) and document.source_type != source_type:
  1095. return False
  1096. status = filters_json.get("status")
  1097. if isinstance(status, str) and document.status != status:
  1098. return False
  1099. return True
  1100. def read_settings(self, *, knowledge_base_id: str | None = None) -> KnowledgeSettingsDto:
  1101. base_config: dict[str, JSONValue] = {}
  1102. if knowledge_base_id:
  1103. base = self.base_repository.get_by_id(knowledge_base_id=knowledge_base_id)
  1104. if base is not None and isinstance(base.metadata_json, dict):
  1105. value = base.metadata_json.get("retrieval_config")
  1106. if isinstance(value, dict):
  1107. base_config = value
  1108. defaults = KnowledgeSettingsDto(
  1109. knowledgeBaseId=knowledge_base_id,
  1110. chunkSize=self.settings.default_chunk_size,
  1111. chunkOverlap=self.settings.default_chunk_overlap,
  1112. keywordWeight=self.settings.retrieval_keyword_weight,
  1113. vectorWeight=self.settings.retrieval_vector_weight,
  1114. rerankWeight=self.settings.retrieval_rerank_weight,
  1115. queryRewrite=False,
  1116. requireCitations=True)
  1117. return KnowledgeSettingsDto.model_validate({
  1118. **defaults.model_dump(),
  1119. **base_config,
  1120. "knowledgeBaseId": knowledge_base_id,
  1121. })
  1122. def update_settings(
  1123. self,
  1124. payload: KnowledgeSettingsUpdateRequestDto) -> KnowledgeSettingsDto:
  1125. settings = KnowledgeSettingsDto.model_validate({
  1126. **payload.model_dump(),
  1127. "knowledgeBaseId": payload.knowledgeBaseId,
  1128. })
  1129. if payload.knowledgeBaseId:
  1130. base = self.base_repository.get_by_id(knowledge_base_id=payload.knowledgeBaseId)
  1131. if base is not None:
  1132. metadata = dict(base.metadata_json or {})
  1133. metadata["retrieval_config"] = settings.model_dump(exclude={"knowledgeBaseId"})
  1134. self.base_repository.update(
  1135. knowledge_base_id=payload.knowledgeBaseId,
  1136. metadata_json=metadata)
  1137. return settings
  1138. def _build_base_code(self, name: str) -> str:
  1139. base = "".join(
  1140. char.lower() if char.isalnum() else "_"
  1141. for char in name
  1142. ).strip("_") or "knowledge_base"
  1143. return base[:64]
  1144. def build_knowledge_application_service(
  1145. *,
  1146. db: Session,
  1147. settings: KnowledgeServiceSettings) -> KnowledgeApplicationService:
  1148. redis_client = try_build_redis_client(settings.redis_url)
  1149. return KnowledgeApplicationService(
  1150. settings=settings,
  1151. base_repository=KnowledgeBaseRepository(db),
  1152. document_repository=KnowledgeDocumentRepository(db),
  1153. chunk_repository=KnowledgeChunkRepository(db),
  1154. redis_client=redis_client,
  1155. task_queue_publisher=(
  1156. TaskQueuePublisher(client=redis_client) if redis_client is not None else None
  1157. ))