services.py 50 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219
  1. from __future__ import annotations
  2. import base64
  3. import hashlib
  4. from dataclasses import dataclass
  5. from datetime import datetime, timedelta
  6. from typing import TYPE_CHECKING, cast
  7. from uuid import uuid4
  8. from sqlalchemy.orm import Session
  9. from core_shared import JSONValue, try_build_redis_client
  10. from core_shared.task_queue import KNOWLEDGE_DOCUMENT_QUEUE, TaskQueuePublisher
  11. from app.application.document_parsers import (
  12. DocumentParseError,
  13. ParsedDocument,
  14. normalize_source_type,
  15. parse_document_content,
  16. read_document_content_bytes)
  17. from app.application.embeddings import EmbeddingService
  18. from app.application.retrieval import (
  19. build_chunk_payloads,
  20. cosine_similarity,
  21. keyword_score,
  22. rerank_score,
  23. stable_content_hash)
  24. from app.bootstrap.settings import KnowledgeServiceSettings
  25. from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
  26. from app.domain.repositories import (
  27. KnowledgeBaseRepository,
  28. KnowledgeChunkRepository,
  29. KnowledgeDocumentRepository)
  30. from app.infrastructure.object_storage import (
  31. KnowledgeObjectStorage,
  32. ObjectStorageError,
  33. ObjectStorageNotFoundError,
  34. ObjectStorageStatus,
  35. build_document_object_key,
  36. build_object_storage)
  37. from app.schemas.knowledge import (
  38. KnowledgeBaseCreateRequest,
  39. KnowledgeBaseCreateRequestDto,
  40. KnowledgeBaseStatusUpdateRequest,
  41. KnowledgeBaseUpdateRequestDto,
  42. KnowledgeBaseReindexRequestDto,
  43. KnowledgeDocumentCreateRequest,
  44. KnowledgeDocumentCreateRequestDto,
  45. KnowledgeIndexJobAction,
  46. KnowledgeIndexJobData,
  47. KnowledgeIndexJobStatus,
  48. KnowledgeDocumentParseRequest,
  49. KnowledgeDocumentReindexRequestDto,
  50. KnowledgeDocumentUpdateRequestDto,
  51. KnowledgeSettingsDto,
  52. KnowledgeSettingsUpdateRequestDto,
  53. KnowledgeSearchRequest)
  54. if TYPE_CHECKING:
  55. from redis import Redis
  56. @dataclass(frozen=True, slots=True)
  57. class KnowledgeDocumentIngestResult:
  58. document: KnowledgeDocument
  59. chunks: list[KnowledgeChunk]
  60. queued: bool = False
  61. job: KnowledgeIndexJobData | None = None
  62. class KnowledgeIndexingError(RuntimeError):
  63. def __init__(self, *, document_id: str, message: str) -> None:
  64. super().__init__(message)
  65. self.document_id = document_id
  66. class KnowledgeApplicationService:
  67. def __init__(
  68. self,
  69. *,
  70. settings: KnowledgeServiceSettings,
  71. base_repository: KnowledgeBaseRepository,
  72. document_repository: KnowledgeDocumentRepository,
  73. chunk_repository: KnowledgeChunkRepository,
  74. object_storage: KnowledgeObjectStorage | None = None,
  75. redis_client: Redis | None = None,
  76. task_queue_publisher: TaskQueuePublisher | None = None) -> None:
  77. self.settings = settings
  78. self.base_repository = base_repository
  79. self.document_repository = document_repository
  80. self.chunk_repository = chunk_repository
  81. self.embedding_service = EmbeddingService(settings=settings)
  82. self._object_storage = object_storage
  83. self.redis_client = redis_client
  84. self.task_queue_publisher = task_queue_publisher
  85. @property
  86. def object_storage(self) -> KnowledgeObjectStorage:
  87. if self._object_storage is None:
  88. self._object_storage = build_object_storage(self.settings)
  89. return self._object_storage
  90. def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
  91. return self.base_repository.create(
  92. code=payload.code,
  93. name=payload.name,
  94. description=payload.description,
  95. metadata_json=payload.metadata_json)
  96. def create_base_from_contract(
  97. self,
  98. payload: KnowledgeBaseCreateRequestDto) -> KnowledgeBase:
  99. return self.create_base(
  100. KnowledgeBaseCreateRequest(
  101. code=self._build_base_code(payload.name),
  102. name=payload.name,
  103. description=payload.description,
  104. metadata_json=payload.metadata))
  105. def list_bases(self) -> list[KnowledgeBase]:
  106. return self.base_repository.list_all()
  107. def list_bases_filtered(
  108. self,
  109. *,
  110. keyword: str | None = None,
  111. status: str | None = None) -> list[KnowledgeBase]:
  112. return self.base_repository.list_filtered(
  113. keyword=keyword,
  114. status=status)
  115. def update_base_from_contract(
  116. self,
  117. payload: KnowledgeBaseUpdateRequestDto) -> KnowledgeBase | None:
  118. return self.base_repository.update(
  119. knowledge_base_id=payload.knowledgeBaseId,
  120. name=payload.name,
  121. description=payload.description,
  122. status=payload.status,
  123. metadata_json=payload.metadata)
  124. def delete_base(self, *, knowledge_base_id: str) -> bool:
  125. documents = self.document_repository.list_by_base(
  126. knowledge_base_id=knowledge_base_id)
  127. for document in documents:
  128. self._delete_document_object(document=document)
  129. self.chunk_repository.delete_by_base(knowledge_base_id=knowledge_base_id)
  130. for document in documents:
  131. self.document_repository.delete(document_id=document.id)
  132. return self.base_repository.delete(knowledge_base_id=knowledge_base_id)
  133. def update_base_status(
  134. self,
  135. *,
  136. knowledge_base_id: str,
  137. payload: KnowledgeBaseStatusUpdateRequest) -> KnowledgeBase | None:
  138. return self.base_repository.update_status(
  139. knowledge_base_id=knowledge_base_id,
  140. status=payload.status)
  141. def create_document(
  142. self,
  143. payload: KnowledgeDocumentCreateRequest) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
  144. knowledge_base = self.base_repository.get_by_id(
  145. knowledge_base_id=payload.knowledge_base_id)
  146. if knowledge_base is None:
  147. raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")
  148. parsed = self.parse_document(
  149. KnowledgeDocumentParseRequest(
  150. source_type=payload.source_type,
  151. source_uri=payload.source_uri,
  152. content_text=payload.content_text,
  153. content_base64=payload.content_base64)
  154. )
  155. raw_content = read_document_content_bytes(
  156. content_text=payload.content_text,
  157. content_base64=payload.content_base64)
  158. object_key = build_document_object_key(
  159. knowledge_base_id=payload.knowledge_base_id,
  160. source_type=parsed.source_type,
  161. title=payload.title)
  162. stored_object = self.object_storage.put_bytes(
  163. object_key=object_key,
  164. content=raw_content,
  165. content_type=self._guess_content_type(source_type=parsed.source_type))
  166. document: KnowledgeDocument | None = None
  167. try:
  168. metadata_json = {
  169. **payload.metadata_json,
  170. "parser_metadata": parsed.metadata_json,
  171. "object_storage": stored_object.to_metadata(),
  172. }
  173. document = self.document_repository.create(
  174. knowledge_base_id=payload.knowledge_base_id,
  175. title=payload.title,
  176. source_type=parsed.source_type,
  177. source_uri=payload.source_uri,
  178. content_text="",
  179. content_hash=stable_content_hash(parsed.content_text),
  180. metadata_json=metadata_json)
  181. try:
  182. chunks = self._index_document(
  183. document=document,
  184. content_text=parsed.content_text,
  185. chunk_size=payload.chunk_size,
  186. chunk_overlap=payload.chunk_overlap)
  187. except Exception as exc:
  188. self._mark_document_failed(document=document, message=str(exc))
  189. raise KnowledgeIndexingError(
  190. document_id=document.id,
  191. message=f"knowledge document indexing failed: {exc}") from exc
  192. indexed_document = self.document_repository.update_status(
  193. document_id=document.id,
  194. status="indexed")
  195. return indexed_document or document, chunks
  196. except Exception:
  197. if document is None:
  198. try:
  199. self.object_storage.delete_object(object_key=stored_object.object_key)
  200. except ObjectStorageError:
  201. pass
  202. raise
  203. def create_document_from_contract(
  204. self,
  205. payload: KnowledgeDocumentCreateRequestDto) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
  206. return self.create_document(
  207. KnowledgeDocumentCreateRequest(
  208. knowledge_base_id=payload.knowledgeBaseId,
  209. title=payload.title,
  210. content_text=payload.contentText,
  211. content_base64=payload.contentBase64,
  212. source_type=payload.sourceType,
  213. source_uri=payload.sourceUri,
  214. metadata_json=payload.metadata,
  215. chunk_size=payload.chunkSize,
  216. chunk_overlap=payload.chunkOverlap))
  217. def create_document_from_contract_result(
  218. self,
  219. payload: KnowledgeDocumentCreateRequestDto) -> KnowledgeDocumentIngestResult:
  220. request = KnowledgeDocumentCreateRequest(
  221. knowledge_base_id=payload.knowledgeBaseId,
  222. title=payload.title,
  223. content_text=payload.contentText,
  224. content_base64=payload.contentBase64,
  225. source_type=payload.sourceType,
  226. source_uri=payload.sourceUri,
  227. metadata_json=payload.metadata,
  228. chunk_size=payload.chunkSize,
  229. chunk_overlap=payload.chunkOverlap)
  230. if self._should_queue_indexing(async_mode=payload.asyncMode):
  231. return self.create_document_index_job(payload=request)
  232. document, chunks = self.create_document(request)
  233. return KnowledgeDocumentIngestResult(
  234. document=document,
  235. chunks=chunks)
  236. def create_document_index_job(
  237. self,
  238. payload: KnowledgeDocumentCreateRequest) -> KnowledgeDocumentIngestResult:
  239. knowledge_base = self.base_repository.get_by_id(
  240. knowledge_base_id=payload.knowledge_base_id)
  241. if knowledge_base is None:
  242. raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")
  243. raw_content = read_document_content_bytes(
  244. content_text=payload.content_text,
  245. content_base64=payload.content_base64)
  246. source_type = normalize_source_type(
  247. source_type=payload.source_type,
  248. source_uri=payload.source_uri)
  249. object_key = build_document_object_key(
  250. knowledge_base_id=payload.knowledge_base_id,
  251. source_type=source_type,
  252. title=payload.title)
  253. stored_object = self.object_storage.put_bytes(
  254. object_key=object_key,
  255. content=raw_content,
  256. content_type=self._guess_content_type(source_type=source_type))
  257. document: KnowledgeDocument | None = None
  258. try:
  259. document = self.document_repository.create(
  260. knowledge_base_id=payload.knowledge_base_id,
  261. title=payload.title,
  262. source_type=source_type,
  263. source_uri=payload.source_uri,
  264. content_text="",
  265. content_hash=hashlib.sha256(raw_content).hexdigest(),
  266. metadata_json={
  267. **payload.metadata_json,
  268. "object_storage": stored_object.to_metadata(),
  269. },
  270. status="draft")
  271. queued_document, job = self.queue_document_indexing(
  272. document=document,
  273. action="index",
  274. chunk_size=payload.chunk_size,
  275. chunk_overlap=payload.chunk_overlap)
  276. except Exception:
  277. if document is None:
  278. try:
  279. self.object_storage.delete_object(object_key=stored_object.object_key)
  280. except ObjectStorageError:
  281. pass
  282. raise
  283. return KnowledgeDocumentIngestResult(
  284. document=queued_document,
  285. chunks=[],
  286. queued=True,
  287. job=job)
  288. def parse_document(self, payload: KnowledgeDocumentParseRequest) -> ParsedDocument:
  289. try:
  290. return parse_document_content(
  291. source_type=payload.source_type,
  292. content_text=payload.content_text,
  293. content_base64=payload.content_base64,
  294. source_uri=payload.source_uri)
  295. except DocumentParseError:
  296. raise
  297. def queue_document_indexing(
  298. self,
  299. *,
  300. document: KnowledgeDocument,
  301. action: str,
  302. chunk_size: int | None = None,
  303. chunk_overlap: int | None = None) -> tuple[KnowledgeDocument, KnowledgeIndexJobData]:
  304. job_id = f"kjob_{uuid4().hex}"
  305. metadata = self._write_index_job_metadata(
  306. document=document,
  307. action=action,
  308. job_id=job_id,
  309. status="queued",
  310. progress=0,
  311. chunk_size=chunk_size,
  312. chunk_overlap=chunk_overlap)
  313. updated_document = self.document_repository.update(
  314. document_id=document.id,
  315. status="queued",
  316. metadata_json=metadata)
  317. document_for_job = updated_document or document
  318. published = self._publish_document_index_job(
  319. document_id=document.id,
  320. action=action,
  321. job_id=job_id)
  322. if not published:
  323. metadata = self._write_index_job_metadata(
  324. document=document_for_job,
  325. action=action,
  326. job_id=job_id,
  327. status="running",
  328. progress=1,
  329. chunk_size=chunk_size,
  330. chunk_overlap=chunk_overlap,
  331. worker_key="inline-fallback")
  332. document_for_job = self.document_repository.update(
  333. document_id=document.id,
  334. status="indexing",
  335. metadata_json=metadata) or document_for_job
  336. processed = self.process_document_index_job(
  337. document_id=document.id,
  338. action=action,
  339. job_id=job_id,
  340. worker_key="inline-fallback",
  341. chunk_size=chunk_size,
  342. chunk_overlap=chunk_overlap)
  343. if processed is not None:
  344. indexed_document, chunks = processed
  345. return indexed_document, self._read_latest_index_job(document=indexed_document)
  346. return document_for_job, self._read_latest_index_job(document=document_for_job)
  347. def list_documents(
  348. self,
  349. *,
  350. knowledge_base_id: str) -> list[KnowledgeDocument]:
  351. return self.document_repository.list_by_base(
  352. knowledge_base_id=knowledge_base_id)
  353. def list_documents_filtered(
  354. self,
  355. *,
  356. knowledge_base_id: str | None = None,
  357. keyword: str | None = None,
  358. status: str | None = None,
  359. source_type: str | None = None) -> list[KnowledgeDocument]:
  360. return self.document_repository.list_filtered(
  361. knowledge_base_id=knowledge_base_id,
  362. keyword=keyword,
  363. status=status,
  364. source_type=source_type)
  365. def update_document_from_contract(
  366. self,
  367. payload: KnowledgeDocumentUpdateRequestDto) -> KnowledgeDocument | None:
  368. return self.document_repository.update(
  369. document_id=payload.documentId,
  370. title=payload.title,
  371. source_uri=payload.sourceUri,
  372. status=payload.status,
  373. metadata_json=payload.metadata)
  374. def reindex_document(
  375. self,
  376. payload: KnowledgeDocumentReindexRequestDto) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
  377. document = self.document_repository.get_by_id(document_id=payload.documentId)
  378. if document is None:
  379. return None
  380. try:
  381. parsed = self._parse_document_for_indexing(document=document)
  382. chunks = self._index_document(
  383. document=document,
  384. content_text=parsed.content_text,
  385. chunk_size=payload.chunkSize,
  386. chunk_overlap=payload.chunkOverlap)
  387. except Exception as exc:
  388. self._mark_document_failed(document=document, message=str(exc))
  389. raise KnowledgeIndexingError(
  390. document_id=document.id,
  391. message=f"knowledge document reindex failed: {exc}") from exc
  392. metadata = dict(document.metadata_json or {})
  393. metadata["parser_metadata"] = parsed.metadata_json
  394. indexed_document = self.document_repository.update(
  395. document_id=document.id,
  396. status="indexed",
  397. metadata_json=metadata)
  398. return indexed_document or document, chunks
  399. def reindex_document_from_contract_result(
  400. self,
  401. payload: KnowledgeDocumentReindexRequestDto) -> KnowledgeDocumentIngestResult | None:
  402. if self._should_queue_indexing(async_mode=payload.asyncMode):
  403. document = self.document_repository.get_by_id(document_id=payload.documentId)
  404. if document is None:
  405. return None
  406. queued_document, job = self.queue_document_indexing(
  407. document=document,
  408. action="reindex",
  409. chunk_size=payload.chunkSize,
  410. chunk_overlap=payload.chunkOverlap)
  411. return KnowledgeDocumentIngestResult(
  412. document=queued_document,
  413. chunks=[],
  414. queued=True,
  415. job=job)
  416. result = self.reindex_document(payload)
  417. if result is None:
  418. return None
  419. document, chunks = result
  420. return KnowledgeDocumentIngestResult(
  421. document=document,
  422. chunks=chunks)
  423. def reindex_base_from_contract(
  424. self,
  425. payload: KnowledgeBaseReindexRequestDto) -> list[KnowledgeIndexJobData]:
  426. documents = self.document_repository.list_by_base(
  427. knowledge_base_id=payload.knowledgeBaseId)
  428. jobs: list[KnowledgeIndexJobData] = []
  429. for document in documents:
  430. if document.status == "archived":
  431. continue
  432. queued_document, job = self.queue_document_indexing(
  433. document=document,
  434. action="reindex",
  435. chunk_size=payload.chunkSize,
  436. chunk_overlap=payload.chunkOverlap)
  437. jobs.append(job or self._read_latest_index_job(document=queued_document))
  438. return jobs
  439. def process_document_index_job(
  440. self,
  441. *,
  442. document_id: str,
  443. action: str,
  444. worker_key: str,
  445. job_id: str | None = None,
  446. chunk_size: int | None = None,
  447. chunk_overlap: int | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
  448. document = self.document_repository.get_by_id(document_id=document_id)
  449. if document is None:
  450. return None
  451. resolved_job = self._read_latest_index_job(document=document)
  452. resolved_job_id = job_id or resolved_job.jobId
  453. resolved_chunk_size = chunk_size if chunk_size is not None else resolved_job.chunkSize
  454. resolved_chunk_overlap = chunk_overlap if chunk_overlap is not None else resolved_job.chunkOverlap
  455. if document.status == "archived":
  456. metadata = self._write_index_job_metadata(
  457. document=document,
  458. action=action,
  459. job_id=resolved_job_id,
  460. status="skipped",
  461. progress=100,
  462. chunk_size=resolved_chunk_size,
  463. chunk_overlap=resolved_chunk_overlap,
  464. worker_key=worker_key,
  465. completed_time=datetime.utcnow(),
  466. error_message="document is archived")
  467. skipped_document = self.document_repository.update(
  468. document_id=document.id,
  469. metadata_json=metadata) or document
  470. return skipped_document, []
  471. metadata = self._write_index_job_metadata(
  472. document=document,
  473. action=action,
  474. job_id=resolved_job_id,
  475. status="running",
  476. progress=10,
  477. chunk_size=resolved_chunk_size,
  478. chunk_overlap=resolved_chunk_overlap,
  479. worker_key=worker_key,
  480. started_time=datetime.utcnow())
  481. running_document = self.document_repository.update(
  482. document_id=document.id,
  483. status="indexing",
  484. metadata_json=metadata) or document
  485. try:
  486. parsed = self._parse_document_for_indexing(document=running_document)
  487. metadata = self._write_index_job_metadata(
  488. document=running_document,
  489. action=action,
  490. job_id=resolved_job_id,
  491. status="running",
  492. progress=40,
  493. chunk_size=resolved_chunk_size,
  494. chunk_overlap=resolved_chunk_overlap,
  495. worker_key=worker_key,
  496. started_time=self._read_latest_index_job(document=running_document).startedTime)
  497. metadata["parser_metadata"] = parsed.metadata_json
  498. running_document = self.document_repository.update(
  499. document_id=document.id,
  500. status="indexing",
  501. metadata_json=metadata) or running_document
  502. chunks = self._index_document(
  503. document=running_document,
  504. content_text=parsed.content_text,
  505. chunk_size=resolved_chunk_size,
  506. chunk_overlap=resolved_chunk_overlap)
  507. except Exception as exc:
  508. self._mark_document_failed(
  509. document=running_document,
  510. message=str(exc),
  511. job_id=resolved_job_id,
  512. action=action,
  513. worker_key=worker_key,
  514. chunk_size=resolved_chunk_size,
  515. chunk_overlap=resolved_chunk_overlap)
  516. raise KnowledgeIndexingError(
  517. document_id=document.id,
  518. message=f"knowledge document {action} failed: {exc}") from exc
  519. metadata = self._write_index_job_metadata(
  520. document=running_document,
  521. action=action,
  522. job_id=resolved_job_id,
  523. status="completed",
  524. progress=100,
  525. chunk_size=resolved_chunk_size,
  526. chunk_overlap=resolved_chunk_overlap,
  527. worker_key=worker_key,
  528. completed_time=datetime.utcnow())
  529. metadata["parser_metadata"] = parsed.metadata_json
  530. indexed_document = self.document_repository.update(
  531. document_id=document.id,
  532. status="indexed",
  533. metadata_json=metadata)
  534. return indexed_document or running_document, chunks
  535. def execute_document_index_job(
  536. self,
  537. *,
  538. document_id: str,
  539. action: str,
  540. worker_key: str,
  541. lease_seconds: int,
  542. job_id: str | None = None,
  543. redis_client: Redis | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
  544. resolved_redis_client = redis_client or self.redis_client
  545. idempotency_key = f"{document_id}:{job_id or action}"
  546. lock = None
  547. idempotency_store = None
  548. if resolved_redis_client is not None:
  549. from core_shared.redis_primitives import DistributedLock, IdempotencyStore
  550. lock = DistributedLock(
  551. client=resolved_redis_client,
  552. name=f"knowledge-document:{document_id}:lock",
  553. ttl_seconds=lease_seconds)
  554. if not lock.acquire():
  555. return None
  556. idempotency_store = IdempotencyStore(
  557. client=resolved_redis_client,
  558. prefix="knowledge-document-idempotency")
  559. if not idempotency_store.begin(key=idempotency_key):
  560. lock.release()
  561. return None
  562. try:
  563. result = self.process_document_index_job(
  564. document_id=document_id,
  565. action=action,
  566. job_id=job_id,
  567. worker_key=worker_key)
  568. if idempotency_store is not None and result is not None:
  569. document, chunks = result
  570. idempotency_store.complete(
  571. key=idempotency_key,
  572. result={
  573. "status": document.status,
  574. "document_id": document.id,
  575. "chunk_count": len(chunks),
  576. })
  577. except Exception:
  578. if idempotency_store is not None:
  579. idempotency_store.clear(key=idempotency_key)
  580. raise
  581. finally:
  582. if lock is not None:
  583. lock.release()
  584. return result
  585. def execute_next_pending_document_job(
  586. self,
  587. *,
  588. worker_key: str,
  589. lease_seconds: int,
  590. stale_indexing_seconds: int,
  591. redis_client: Redis | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
  592. stale_before = datetime.utcnow() - timedelta(seconds=stale_indexing_seconds)
  593. document = self.document_repository.get_next_pending_indexing(
  594. stale_before=stale_before)
  595. if document is None:
  596. return None
  597. job = self._read_latest_index_job(document=document)
  598. return self.execute_document_index_job(
  599. document_id=document.id,
  600. action=job.action,
  601. job_id=job.jobId,
  602. worker_key=worker_key,
  603. lease_seconds=lease_seconds,
  604. redis_client=redis_client)
  605. def list_index_jobs(
  606. self,
  607. *,
  608. knowledge_base_id: str | None = None,
  609. document_id: str | None = None,
  610. status: str | None = None) -> list[KnowledgeIndexJobData]:
  611. documents = self.document_repository.list_filtered(
  612. knowledge_base_id=knowledge_base_id)
  613. jobs: list[KnowledgeIndexJobData] = []
  614. for document in documents:
  615. if document_id is not None and document.id != document_id:
  616. continue
  617. job = self._try_read_latest_index_job(document=document)
  618. if job is None:
  619. continue
  620. if status is not None and job.status != status:
  621. continue
  622. jobs.append(job)
  623. jobs.sort(
  624. key=lambda item: item.queuedTime or datetime.min,
  625. reverse=True)
  626. return jobs
  627. def detail_index_job(self, *, document_id: str) -> KnowledgeIndexJobData | None:
  628. document = self.document_repository.get_by_id(document_id=document_id)
  629. if document is None:
  630. return None
  631. return self._try_read_latest_index_job(document=document)
  632. def delete_document(self, *, document_id: str) -> bool:
  633. return bool(self.delete_document_result(document_id=document_id)["deleted"])
  634. def delete_document_result(self, *, document_id: str) -> dict[str, JSONValue]:
  635. document = self.document_repository.get_by_id(document_id=document_id)
  636. if document is None:
  637. return {
  638. "deleted": False,
  639. "objectDeleted": False,
  640. "documentId": document_id,
  641. }
  642. object_deleted = self._delete_document_object(document=document)
  643. self.chunk_repository.delete_by_document(document_id=document_id)
  644. deleted = self.document_repository.delete(document_id=document_id) is not None
  645. return {
  646. "deleted": deleted,
  647. "objectDeleted": object_deleted,
  648. "documentId": document_id,
  649. }
  650. def read_document_content(
  651. self,
  652. *,
  653. document_id: str,
  654. include_text: bool = True,
  655. include_base64: bool = False) -> dict[str, JSONValue] | None:
  656. document = self.document_repository.get_by_id(document_id=document_id)
  657. if document is None:
  658. return None
  659. raw_content = self._read_document_raw_content(document=document)
  660. object_status = self.read_document_storage_status(document_id=document_id)
  661. content_type = self._read_content_type_from_status(object_status)
  662. payload: dict[str, JSONValue] = {
  663. "documentId": document.id,
  664. "title": document.title,
  665. "sourceType": document.source_type,
  666. "contentType": content_type,
  667. "sizeBytes": len(raw_content),
  668. "objectStorage": self._read_object_storage_metadata(document=document),
  669. "contentBase64": None,
  670. "contentText": None,
  671. }
  672. if include_base64:
  673. payload["contentBase64"] = base64.b64encode(raw_content).decode("ascii")
  674. if include_text and self._is_text_content_type(content_type=content_type, source_type=document.source_type):
  675. payload["contentText"] = raw_content.decode("utf-8", errors="replace")
  676. return payload
  677. def read_document_storage_status(self, *, document_id: str) -> dict[str, JSONValue] | None:
  678. document = self.document_repository.get_by_id(document_id=document_id)
  679. if document is None:
  680. return None
  681. object_key = self._read_document_object_key(document=document)
  682. if object_key is None:
  683. return {
  684. "documentId": document.id,
  685. "exists": False,
  686. "objectStorage": None,
  687. "errorMessage": "document object metadata is missing",
  688. }
  689. status = self.object_storage.head_object(object_key=object_key)
  690. return self._object_status_to_payload(document=document, status=status)
  691. def read_storage_health(self) -> dict[str, JSONValue]:
  692. return dict(self.object_storage.health_check())
  693. def list_chunks_filtered(
  694. self,
  695. *,
  696. knowledge_base_id: str | None = None,
  697. document_id: str | None = None,
  698. keyword: str | None = None) -> list[KnowledgeChunk]:
  699. return self.chunk_repository.list_filtered(
  700. knowledge_base_id=knowledge_base_id,
  701. document_id=document_id,
  702. keyword=keyword)
  703. def delete_chunk(self, *, chunk_id: str) -> bool:
  704. return self.chunk_repository.delete(chunk_id=chunk_id)
  705. def search(
  706. self,
  707. payload: KnowledgeSearchRequest) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]:
  708. document_cache: dict[str, KnowledgeDocument] = {}
  709. query_embedding_result = self.embedding_service.embed_text(payload.query)
  710. candidate_limit = max(
  711. payload.top_k * max(self.settings.retrieval_candidate_multiplier, 1),
  712. payload.top_k)
  713. vector_candidates = self.chunk_repository.search_by_vector(
  714. knowledge_base_id=payload.knowledge_base_id,
  715. embedding=query_embedding_result.embedding,
  716. limit=candidate_limit)
  717. if vector_candidates:
  718. chunks = [chunk for chunk, _ in vector_candidates]
  719. vector_scores_by_chunk_id = {
  720. chunk.id: score for chunk, score in vector_candidates
  721. }
  722. retrieval_mode = "pgvector-hybrid"
  723. else:
  724. chunks = self.chunk_repository.list_by_base(
  725. knowledge_base_id=payload.knowledge_base_id)
  726. vector_scores_by_chunk_id = {}
  727. retrieval_mode = "hybrid"
  728. scored: list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]] = []
  729. for chunk in chunks:
  730. document = document_cache.get(chunk.document_id)
  731. if document is None:
  732. document = self.document_repository.get_by_id(
  733. document_id=chunk.document_id)
  734. if document is None:
  735. continue
  736. document_cache[chunk.document_id] = document
  737. if not self._matches_filters(document=document, filters_json=payload.filters_json):
  738. continue
  739. keyword = keyword_score(payload.query, chunk.content_text)
  740. vector = vector_scores_by_chunk_id.get(chunk.id)
  741. if vector is None:
  742. vector = cosine_similarity(query_embedding_result.embedding, chunk.embedding_json)
  743. rerank = (
  744. rerank_score(
  745. query=payload.query,
  746. chunk_text=chunk.content_text,
  747. document_title=document.title)
  748. if self.settings.retrieval_rerank_enabled
  749. else 0.0
  750. )
  751. score = round(
  752. keyword * self.settings.retrieval_keyword_weight
  753. + vector * self.settings.retrieval_vector_weight
  754. + rerank * self.settings.retrieval_rerank_weight,
  755. 6)
  756. scored.append(
  757. (
  758. chunk,
  759. document,
  760. score,
  761. {
  762. "final_score": score,
  763. "keyword_score": round(keyword, 6),
  764. "vector_score": round(vector, 6),
  765. "rerank_score": round(rerank, 6),
  766. "retrieval_mode": retrieval_mode,
  767. "rerank_enabled": self.settings.retrieval_rerank_enabled,
  768. "candidate_limit": candidate_limit,
  769. "weights": {
  770. "keyword": self.settings.retrieval_keyword_weight,
  771. "vector": self.settings.retrieval_vector_weight,
  772. "rerank": self.settings.retrieval_rerank_weight,
  773. },
  774. "embedding_provider": query_embedding_result.provider,
  775. "embedding_model": query_embedding_result.model,
  776. "citation": {
  777. "document_id": document.id,
  778. "document_title": document.title,
  779. "source_uri": document.source_uri,
  780. "chunk_id": chunk.id,
  781. "chunk_index": chunk.chunk_index,
  782. },
  783. })
  784. )
  785. scored.sort(key=lambda item: item[2], reverse=True)
  786. return scored[: payload.top_k]
  787. def _index_document(
  788. self,
  789. *,
  790. document: KnowledgeDocument,
  791. content_text: str,
  792. chunk_size: int | None,
  793. chunk_overlap: int | None) -> list[KnowledgeChunk]:
  794. chunk_payloads = build_chunk_payloads(
  795. content_text=content_text,
  796. chunk_size=chunk_size or self.settings.default_chunk_size,
  797. chunk_overlap=chunk_overlap or self.settings.default_chunk_overlap)
  798. for chunk_payload in chunk_payloads:
  799. content_text = self._read_chunk_content(chunk_payload)
  800. embedding_result = self.embedding_service.embed_text(content_text)
  801. chunk_payload["embedding_model"] = embedding_result.model
  802. chunk_payload["embedding_json"] = embedding_result.embedding
  803. chunk_payload["metadata_json"] = {
  804. "embedding_provider": embedding_result.provider,
  805. }
  806. return self.chunk_repository.replace_document_chunks(
  807. knowledge_base_id=document.knowledge_base_id,
  808. document_id=document.id,
  809. chunks=chunk_payloads)
  810. def _read_chunk_content(self, chunk_payload: dict[str, JSONValue]) -> str:
  811. value = chunk_payload.get("content_text")
  812. return value if isinstance(value, str) else ""
  813. def _parse_document_for_indexing(self, *, document: KnowledgeDocument) -> ParsedDocument:
  814. metadata = document.metadata_json or {}
  815. object_metadata = metadata.get("object_storage")
  816. if isinstance(object_metadata, dict):
  817. object_key = object_metadata.get("objectKey")
  818. if isinstance(object_key, str) and object_key:
  819. try:
  820. raw_content = self.object_storage.get_bytes(object_key=object_key)
  821. except ObjectStorageNotFoundError as exc:
  822. raise ValueError(f"knowledge document content object not found: {document.id}") from exc
  823. return parse_document_content(
  824. source_type=document.source_type,
  825. source_uri=document.source_uri,
  826. content_base64=base64.b64encode(raw_content).decode("ascii"))
  827. if document.content_text:
  828. return parse_document_content(
  829. source_type=document.source_type,
  830. source_uri=document.source_uri,
  831. content_text=document.content_text)
  832. raise ValueError(f"knowledge document content object not found: {document.id}")
  833. def _read_document_content_for_indexing(self, *, document: KnowledgeDocument) -> str:
  834. return self._parse_document_for_indexing(document=document).content_text
  835. def _read_document_raw_content(self, *, document: KnowledgeDocument) -> bytes:
  836. object_key = self._read_document_object_key(document=document)
  837. if isinstance(object_key, str) and object_key:
  838. return self.object_storage.get_bytes(object_key=object_key)
  839. if document.content_text:
  840. return document.content_text.encode("utf-8")
  841. raise ValueError(f"knowledge document content object not found: {document.id}")
  842. def _delete_document_object(self, *, document: KnowledgeDocument) -> bool:
  843. object_key = self._read_document_object_key(document=document)
  844. if object_key is None:
  845. return False
  846. try:
  847. return self.object_storage.delete_object(object_key=object_key)
  848. except ObjectStorageNotFoundError:
  849. return False
  850. def _read_document_object_key(self, *, document: KnowledgeDocument) -> str | None:
  851. object_metadata = self._read_object_storage_metadata(document=document)
  852. if object_metadata is None:
  853. return None
  854. object_key = object_metadata.get("objectKey")
  855. return object_key if isinstance(object_key, str) and object_key else None
  856. def _read_object_storage_metadata(
  857. self,
  858. *,
  859. document: KnowledgeDocument) -> dict[str, JSONValue] | None:
  860. metadata = document.metadata_json or {}
  861. object_metadata = metadata.get("object_storage")
  862. return object_metadata if isinstance(object_metadata, dict) else None
  863. def _object_status_to_payload(
  864. self,
  865. *,
  866. document: KnowledgeDocument,
  867. status: ObjectStorageStatus) -> dict[str, JSONValue]:
  868. return {
  869. "documentId": document.id,
  870. "exists": status.exists,
  871. "objectStorage": self._read_object_storage_metadata(document=document),
  872. "contentType": status.content_type,
  873. "sizeBytes": status.size_bytes,
  874. "etag": status.etag,
  875. "errorMessage": status.error_message,
  876. }
  877. def _read_content_type_from_status(
  878. self,
  879. object_status: dict[str, JSONValue] | None) -> str | None:
  880. if object_status is None:
  881. return None
  882. content_type = object_status.get("contentType")
  883. return content_type if isinstance(content_type, str) else None
  884. def _is_text_content_type(self, *, content_type: str | None, source_type: str) -> bool:
  885. if content_type is not None and content_type.startswith("text/"):
  886. return True
  887. return source_type.strip().lower() in {"text", "txt", "markdown", "md", "html", "htm", "json", "csv"}
  888. def _should_queue_indexing(self, *, async_mode: bool | None) -> bool:
  889. if async_mode is False:
  890. return False
  891. if not self.settings.async_indexing_enabled and async_mode is None:
  892. return False
  893. return self.task_queue_publisher is not None
  894. def _publish_document_index_job(
  895. self,
  896. *,
  897. document_id: str,
  898. action: str,
  899. job_id: str) -> bool:
  900. if self.task_queue_publisher is None:
  901. return False
  902. return self.task_queue_publisher.publish_knowledge_document(
  903. document_id=document_id,
  904. action=action,
  905. job_id=job_id)
  906. def _write_index_job_metadata(
  907. self,
  908. *,
  909. document: KnowledgeDocument,
  910. action: str,
  911. job_id: str,
  912. status: str,
  913. progress: int,
  914. chunk_size: int | None = None,
  915. chunk_overlap: int | None = None,
  916. worker_key: str | None = None,
  917. started_time: datetime | None = None,
  918. completed_time: datetime | None = None,
  919. error_message: str | None = None) -> dict[str, JSONValue]:
  920. metadata = dict(document.metadata_json or {})
  921. existing_job = self._read_index_job_payload(document=document)
  922. queued_time = existing_job.get("queuedTime")
  923. if not isinstance(queued_time, str):
  924. queued_time = datetime.utcnow().isoformat()
  925. resolved_started_time = (
  926. started_time.isoformat()
  927. if started_time is not None
  928. else existing_job.get("startedTime")
  929. )
  930. resolved_completed_time = (
  931. completed_time.isoformat()
  932. if completed_time is not None
  933. else existing_job.get("completedTime")
  934. )
  935. job_payload: dict[str, JSONValue] = {
  936. "jobId": job_id,
  937. "documentId": document.id,
  938. "knowledgeBaseId": document.knowledge_base_id,
  939. "documentTitle": document.title,
  940. "action": action if action in {"index", "reindex"} else "reindex",
  941. "status": status,
  942. "progress": max(0, min(progress, 100)),
  943. "queueName": KNOWLEDGE_DOCUMENT_QUEUE,
  944. "workerKey": worker_key,
  945. "errorMessage": error_message,
  946. "chunkSize": chunk_size,
  947. "chunkOverlap": chunk_overlap,
  948. "queuedTime": queued_time,
  949. "startedTime": resolved_started_time if isinstance(resolved_started_time, str) else None,
  950. "completedTime": resolved_completed_time if isinstance(resolved_completed_time, str) else None,
  951. }
  952. metadata["index_job"] = job_payload
  953. return metadata
  954. def _read_latest_index_job(self, *, document: KnowledgeDocument) -> KnowledgeIndexJobData:
  955. payload = self._read_index_job_payload(document=document)
  956. return KnowledgeIndexJobData(
  957. jobId=self._read_payload_string(payload, "jobId") or f"kjob_{document.id}",
  958. documentId=self._read_payload_string(payload, "documentId") or document.id,
  959. knowledgeBaseId=self._read_payload_string(payload, "knowledgeBaseId") or document.knowledge_base_id,
  960. documentTitle=self._read_payload_string(payload, "documentTitle") or document.title,
  961. action=self._read_job_action(payload.get("action")),
  962. status=self._read_job_status(payload.get("status")),
  963. progress=self._read_payload_int(payload, "progress", 0),
  964. queueName=self._read_payload_string(payload, "queueName"),
  965. workerKey=self._read_payload_string(payload, "workerKey"),
  966. errorMessage=self._read_payload_string(payload, "errorMessage"),
  967. chunkSize=self._read_optional_payload_int(payload, "chunkSize"),
  968. chunkOverlap=self._read_optional_payload_int(payload, "chunkOverlap"),
  969. queuedTime=self._read_payload_datetime(payload, "queuedTime"),
  970. startedTime=self._read_payload_datetime(payload, "startedTime"),
  971. completedTime=self._read_payload_datetime(payload, "completedTime"))
  972. def _try_read_latest_index_job(self, *, document: KnowledgeDocument) -> KnowledgeIndexJobData | None:
  973. if not self._read_index_job_payload(document=document):
  974. return None
  975. return self._read_latest_index_job(document=document)
  976. def _read_index_job_payload(self, *, document: KnowledgeDocument) -> dict[str, JSONValue]:
  977. metadata = document.metadata_json or {}
  978. value = metadata.get("index_job")
  979. if isinstance(value, dict):
  980. return {str(item_key): item_value for item_key, item_value in value.items()}
  981. return {}
  982. def _read_payload_string(
  983. self,
  984. payload: dict[str, JSONValue],
  985. key: str) -> str | None:
  986. value = payload.get(key)
  987. return value if isinstance(value, str) and value else None
  988. def _read_payload_int(
  989. self,
  990. payload: dict[str, JSONValue],
  991. key: str,
  992. fallback: int) -> int:
  993. value = payload.get(key)
  994. if isinstance(value, int) and not isinstance(value, bool):
  995. return value
  996. return fallback
  997. def _read_optional_payload_int(
  998. self,
  999. payload: dict[str, JSONValue],
  1000. key: str) -> int | None:
  1001. value = payload.get(key)
  1002. if isinstance(value, int) and not isinstance(value, bool):
  1003. return value
  1004. return None
  1005. def _read_payload_datetime(
  1006. self,
  1007. payload: dict[str, JSONValue],
  1008. key: str) -> datetime | None:
  1009. value = payload.get(key)
  1010. if not isinstance(value, str) or not value:
  1011. return None
  1012. try:
  1013. return datetime.fromisoformat(value)
  1014. except ValueError:
  1015. return None
  1016. def _read_job_action(self, value: JSONValue) -> KnowledgeIndexJobAction:
  1017. if isinstance(value, str) and value in {"index", "reindex"}:
  1018. return cast(KnowledgeIndexJobAction, value)
  1019. return "reindex"
  1020. def _read_job_status(self, value: JSONValue) -> KnowledgeIndexJobStatus:
  1021. if isinstance(value, str) and value in {"queued", "running", "completed", "failed", "skipped"}:
  1022. return cast(KnowledgeIndexJobStatus, value)
  1023. return "queued"
  1024. def _mark_document_failed(
  1025. self,
  1026. *,
  1027. document: KnowledgeDocument,
  1028. message: str,
  1029. job_id: str | None = None,
  1030. action: str = "reindex",
  1031. worker_key: str | None = None,
  1032. chunk_size: int | None = None,
  1033. chunk_overlap: int | None = None) -> None:
  1034. metadata = dict(document.metadata_json or {})
  1035. metadata["last_error"] = {
  1036. "message": message[:1000],
  1037. "errorType": "indexing_failed",
  1038. }
  1039. if job_id is not None:
  1040. metadata = self._write_index_job_metadata(
  1041. document=document,
  1042. action=action,
  1043. job_id=job_id,
  1044. status="failed",
  1045. progress=100,
  1046. chunk_size=chunk_size,
  1047. chunk_overlap=chunk_overlap,
  1048. worker_key=worker_key,
  1049. completed_time=datetime.utcnow(),
  1050. error_message=message[:1000])
  1051. self.document_repository.update(
  1052. document_id=document.id,
  1053. status="failed",
  1054. metadata_json=metadata)
  1055. def _guess_content_type(self, *, source_type: str) -> str:
  1056. normalized = source_type.strip().lower().removeprefix(".")
  1057. if normalized in {"markdown", "md"}:
  1058. return "text/markdown; charset=utf-8"
  1059. if normalized in {"html", "htm"}:
  1060. return "text/html; charset=utf-8"
  1061. if normalized == "json":
  1062. return "application/json"
  1063. if normalized == "csv":
  1064. return "text/csv; charset=utf-8"
  1065. if normalized == "pdf":
  1066. return "application/pdf"
  1067. if normalized in {"docx", "word"}:
  1068. return "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
  1069. return "text/plain; charset=utf-8"
  1070. def _matches_filters(
  1071. self,
  1072. *,
  1073. document: KnowledgeDocument,
  1074. filters_json: dict[str, JSONValue]) -> bool:
  1075. source_type = filters_json.get("sourceType") or filters_json.get("source_type")
  1076. if isinstance(source_type, str) and document.source_type != source_type:
  1077. return False
  1078. status = filters_json.get("status")
  1079. if isinstance(status, str) and document.status != status:
  1080. return False
  1081. return True
  1082. def read_settings(self, *, knowledge_base_id: str | None = None) -> KnowledgeSettingsDto:
  1083. base_config: dict[str, JSONValue] = {}
  1084. if knowledge_base_id:
  1085. base = self.base_repository.get_by_id(knowledge_base_id=knowledge_base_id)
  1086. if base is not None and isinstance(base.metadata_json, dict):
  1087. value = base.metadata_json.get("retrieval_config")
  1088. if isinstance(value, dict):
  1089. base_config = value
  1090. defaults = KnowledgeSettingsDto(
  1091. knowledgeBaseId=knowledge_base_id,
  1092. chunkSize=self.settings.default_chunk_size,
  1093. chunkOverlap=self.settings.default_chunk_overlap,
  1094. keywordWeight=self.settings.retrieval_keyword_weight,
  1095. vectorWeight=self.settings.retrieval_vector_weight,
  1096. rerankWeight=self.settings.retrieval_rerank_weight,
  1097. queryRewrite=False,
  1098. requireCitations=True)
  1099. return KnowledgeSettingsDto.model_validate({
  1100. **defaults.model_dump(),
  1101. **base_config,
  1102. "knowledgeBaseId": knowledge_base_id,
  1103. })
  1104. def update_settings(
  1105. self,
  1106. payload: KnowledgeSettingsUpdateRequestDto) -> KnowledgeSettingsDto:
  1107. settings = KnowledgeSettingsDto.model_validate({
  1108. **payload.model_dump(),
  1109. "knowledgeBaseId": payload.knowledgeBaseId,
  1110. })
  1111. if payload.knowledgeBaseId:
  1112. base = self.base_repository.get_by_id(knowledge_base_id=payload.knowledgeBaseId)
  1113. if base is not None:
  1114. metadata = dict(base.metadata_json or {})
  1115. metadata["retrieval_config"] = settings.model_dump(exclude={"knowledgeBaseId"})
  1116. self.base_repository.update(
  1117. knowledge_base_id=payload.knowledgeBaseId,
  1118. metadata_json=metadata)
  1119. return settings
  1120. def _build_base_code(self, name: str) -> str:
  1121. base = "".join(
  1122. char.lower() if char.isalnum() else "_"
  1123. for char in name
  1124. ).strip("_") or "knowledge_base"
  1125. return base[:64]
  1126. def build_knowledge_application_service(
  1127. *,
  1128. db: Session,
  1129. settings: KnowledgeServiceSettings) -> KnowledgeApplicationService:
  1130. redis_client = try_build_redis_client(settings.redis_url)
  1131. return KnowledgeApplicationService(
  1132. settings=settings,
  1133. base_repository=KnowledgeBaseRepository(db),
  1134. document_repository=KnowledgeDocumentRepository(db),
  1135. chunk_repository=KnowledgeChunkRepository(db),
  1136. redis_client=redis_client,
  1137. task_queue_publisher=(
  1138. TaskQueuePublisher(client=redis_client) if redis_client is not None else None
  1139. ))