indexing_service.py 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809
  1. """Knowledge indexing sub-service — document creation, chunking, embedding, job queue."""
  2. from __future__ import annotations
  3. import base64
  4. import hashlib
  5. from dataclasses import dataclass
  6. from datetime import datetime, timedelta
  7. from typing import TYPE_CHECKING, cast
  8. from uuid import uuid4
  9. from core_shared import JSONValue
  10. from core_shared.task_queue import KNOWLEDGE_DOCUMENT_QUEUE, TaskQueuePublisher
  11. from app.application._storage_mixin import _ObjectStorageMixin
  12. from app.application.chunking import chunk_document
  13. from app.application.document_parsers import (
  14. normalize_source_type,
  15. parse_document_content,
  16. read_document_content_bytes)
  17. from app.application.embeddings import EmbeddingService
  18. from app.application.retrieval import stable_content_hash
  19. from app.bootstrap.settings import KnowledgeServiceSettings
  20. from app.db.models import KnowledgeChunk, KnowledgeDocument
  21. from app.infrastructure.object_storage import (
  22. ObjectStorageError,
  23. ObjectStorageNotFoundError,
  24. build_document_object_key)
  25. from app.schemas.knowledge import (
  26. KnowledgeBaseReindexRequestDto,
  27. KnowledgeDocumentCreateRequest,
  28. KnowledgeDocumentCreateRequestDto,
  29. KnowledgeDocumentReindexRequestDto,
  30. KnowledgeIndexJobAction,
  31. KnowledgeIndexJobData,
  32. KnowledgeIndexJobStatus)
  33. if TYPE_CHECKING:
  34. from redis import Redis
  35. from app.domain.repositories import (
  36. KnowledgeBaseRepository,
  37. KnowledgeChunkRepository,
  38. KnowledgeDocumentRepository)
  39. from app.infrastructure.object_storage import KnowledgeObjectStorage
@dataclass(frozen=True, slots=True)
class KnowledgeDocumentIngestResult:
    """Immutable result of a document create or reindex request.

    Built either from an inline indexing run (``document`` + ``chunks``) or
    from the queueing path, which sets ``queued=True``, passes an empty chunk
    list and attaches the job descriptor.
    """

    # Document as last returned by the repository.
    document: KnowledgeDocument
    # Chunks written by an inline run; the queueing path passes [].
    chunks: list[KnowledgeChunk]
    # True when the result was produced through queue_document_indexing.
    queued: bool = False
    # Job descriptor attached by the queueing path; None for inline runs.
    job: KnowledgeIndexJobData | None = None
class KnowledgeIndexingError(RuntimeError):
    """Raised when indexing or reindexing a document fails.

    Carries the id of the affected document so callers can report or look up
    the failed record.
    """

    def __init__(self, *, document_id: str, message: str) -> None:
        super().__init__(message)
        # Id of the document whose indexing failed.
        self.document_id = document_id
  50. class KnowledgeIndexingService(_ObjectStorageMixin):
  51. def __init__(
  52. self,
  53. *,
  54. settings: KnowledgeServiceSettings,
  55. base_repository: KnowledgeBaseRepository,
  56. document_repository: KnowledgeDocumentRepository,
  57. chunk_repository: KnowledgeChunkRepository,
  58. embedding_service: EmbeddingService,
  59. object_storage: KnowledgeObjectStorage | None = None,
  60. redis_client: Redis | None = None,
  61. task_queue_publisher: TaskQueuePublisher | None = None,
  62. ) -> None:
  63. self.settings = settings
  64. self.base_repository = base_repository
  65. self.document_repository = document_repository
  66. self.chunk_repository = chunk_repository
  67. self.embedding_service = embedding_service
  68. self._object_storage = object_storage
  69. self.redis_client = redis_client
  70. self.task_queue_publisher = task_queue_publisher
  71. # ── Document creation (with indexing) ─────────────────────────────
  72. def create_document(
  73. self,
  74. payload: KnowledgeDocumentCreateRequest,
  75. ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
  76. from app.application.document_parsers import ParsedDocument
  77. knowledge_base = self.base_repository.get_by_id(
  78. knowledge_base_id=payload.knowledge_base_id)
  79. if knowledge_base is None:
  80. raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")
  81. parsed = parse_document_content(
  82. source_type=payload.source_type,
  83. source_uri=payload.source_uri,
  84. content_text=payload.content_text,
  85. content_base64=payload.content_base64)
  86. raw_content = read_document_content_bytes(
  87. content_text=payload.content_text,
  88. content_base64=payload.content_base64)
  89. object_key = build_document_object_key(
  90. knowledge_base_id=payload.knowledge_base_id,
  91. source_type=parsed.source_type,
  92. title=payload.title)
  93. stored_object = self.object_storage.put_bytes(
  94. object_key=object_key,
  95. content=raw_content,
  96. content_type=self._guess_content_type(source_type=parsed.source_type))
  97. document: KnowledgeDocument | None = None
  98. try:
  99. metadata_json = {
  100. **payload.metadata_json,
  101. "parser_metadata": parsed.metadata_json,
  102. "object_storage": stored_object.to_metadata(),
  103. }
  104. document = self.document_repository.create(
  105. knowledge_base_id=payload.knowledge_base_id,
  106. title=payload.title,
  107. source_type=parsed.source_type,
  108. source_uri=payload.source_uri,
  109. content_text="",
  110. content_hash=stable_content_hash(parsed.content_text),
  111. metadata_json=metadata_json)
  112. try:
  113. chunks = self._index_document(
  114. document=document,
  115. content_text=parsed.content_text,
  116. chunk_size=payload.chunk_size,
  117. chunk_overlap=payload.chunk_overlap)
  118. except Exception as exc:
  119. self._mark_document_failed(document=document, message=str(exc))
  120. raise KnowledgeIndexingError(
  121. document_id=document.id,
  122. message=f"knowledge document indexing failed: {exc}") from exc
  123. indexed_document = self.document_repository.update_status(
  124. document_id=document.id,
  125. status="indexed")
  126. return indexed_document or document, chunks
  127. except Exception:
  128. if document is None:
  129. try:
  130. self.object_storage.delete_object(object_key=stored_object.object_key)
  131. except ObjectStorageError:
  132. pass
  133. raise
  134. def create_document_from_contract(
  135. self,
  136. payload: KnowledgeDocumentCreateRequestDto,
  137. ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
  138. return self.create_document(KnowledgeDocumentCreateRequest(
  139. knowledge_base_id=payload.knowledgeBaseId,
  140. title=payload.title,
  141. content_text=payload.contentText,
  142. content_base64=payload.contentBase64,
  143. source_type=payload.sourceType,
  144. source_uri=payload.sourceUri,
  145. metadata_json=payload.metadata,
  146. chunk_size=payload.chunkSize,
  147. chunk_overlap=payload.chunkOverlap))
  148. def create_document_from_contract_result(
  149. self,
  150. payload: KnowledgeDocumentCreateRequestDto,
  151. ) -> KnowledgeDocumentIngestResult:
  152. request = KnowledgeDocumentCreateRequest(
  153. knowledge_base_id=payload.knowledgeBaseId,
  154. title=payload.title,
  155. content_text=payload.contentText,
  156. content_base64=payload.contentBase64,
  157. source_type=payload.sourceType,
  158. source_uri=payload.sourceUri,
  159. metadata_json=payload.metadata,
  160. chunk_size=payload.chunkSize,
  161. chunk_overlap=payload.chunkOverlap)
  162. if self._should_queue_indexing(async_mode=payload.asyncMode):
  163. return self.create_document_index_job(payload=request)
  164. document, chunks = self.create_document(request)
  165. return KnowledgeDocumentIngestResult(document=document, chunks=chunks)
  166. def create_document_index_job(
  167. self,
  168. payload: KnowledgeDocumentCreateRequest,
  169. ) -> KnowledgeDocumentIngestResult:
  170. knowledge_base = self.base_repository.get_by_id(
  171. knowledge_base_id=payload.knowledge_base_id)
  172. if knowledge_base is None:
  173. raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")
  174. raw_content = read_document_content_bytes(
  175. content_text=payload.content_text,
  176. content_base64=payload.content_base64)
  177. source_type = normalize_source_type(
  178. source_type=payload.source_type,
  179. source_uri=payload.source_uri)
  180. object_key = build_document_object_key(
  181. knowledge_base_id=payload.knowledge_base_id,
  182. source_type=source_type,
  183. title=payload.title)
  184. stored_object = self.object_storage.put_bytes(
  185. object_key=object_key,
  186. content=raw_content,
  187. content_type=self._guess_content_type(source_type=source_type))
  188. document: KnowledgeDocument | None = None
  189. try:
  190. document = self.document_repository.create(
  191. knowledge_base_id=payload.knowledge_base_id,
  192. title=payload.title,
  193. source_type=source_type,
  194. source_uri=payload.source_uri,
  195. content_text="",
  196. content_hash=hashlib.sha256(raw_content).hexdigest(),
  197. metadata_json={
  198. **payload.metadata_json,
  199. "object_storage": stored_object.to_metadata(),
  200. },
  201. status="draft")
  202. queued_document, job = self.queue_document_indexing(
  203. document=document,
  204. action="index",
  205. chunk_size=payload.chunk_size,
  206. chunk_overlap=payload.chunk_overlap)
  207. except Exception:
  208. if document is None:
  209. try:
  210. self.object_storage.delete_object(object_key=stored_object.object_key)
  211. except ObjectStorageError:
  212. pass
  213. raise
  214. return KnowledgeDocumentIngestResult(
  215. document=queued_document,
  216. chunks=[],
  217. queued=True,
  218. job=job)
  219. # ── Reindexing ────────────────────────────────────────────────────
  220. def reindex_document(
  221. self,
  222. payload: KnowledgeDocumentReindexRequestDto,
  223. ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
  224. document = self.document_repository.get_by_id(document_id=payload.documentId)
  225. if document is None:
  226. return None
  227. try:
  228. parsed = self._parse_document_for_indexing(document=document)
  229. chunks = self._index_document(
  230. document=document,
  231. content_text=parsed.content_text,
  232. chunk_size=payload.chunkSize,
  233. chunk_overlap=payload.chunkOverlap)
  234. except Exception as exc:
  235. self._mark_document_failed(document=document, message=str(exc))
  236. raise KnowledgeIndexingError(
  237. document_id=document.id,
  238. message=f"knowledge document reindex failed: {exc}") from exc
  239. metadata = dict(document.metadata_json or {})
  240. metadata["parser_metadata"] = parsed.metadata_json
  241. indexed_document = self.document_repository.update(
  242. document_id=document.id,
  243. status="indexed",
  244. metadata_json=metadata)
  245. return indexed_document or document, chunks
  246. def reindex_document_from_contract_result(
  247. self,
  248. payload: KnowledgeDocumentReindexRequestDto,
  249. ) -> KnowledgeDocumentIngestResult | None:
  250. if self._should_queue_indexing(async_mode=payload.asyncMode):
  251. document = self.document_repository.get_by_id(document_id=payload.documentId)
  252. if document is None:
  253. return None
  254. queued_document, job = self.queue_document_indexing(
  255. document=document,
  256. action="reindex",
  257. chunk_size=payload.chunkSize,
  258. chunk_overlap=payload.chunkOverlap)
  259. return KnowledgeDocumentIngestResult(
  260. document=queued_document,
  261. chunks=[],
  262. queued=True,
  263. job=job)
  264. result = self.reindex_document(payload)
  265. if result is None:
  266. return None
  267. document, chunks = result
  268. return KnowledgeDocumentIngestResult(document=document, chunks=chunks)
  269. def reindex_base_from_contract(
  270. self,
  271. payload: KnowledgeBaseReindexRequestDto,
  272. ) -> list[KnowledgeIndexJobData]:
  273. documents = self.document_repository.list_by_base(
  274. knowledge_base_id=payload.knowledgeBaseId)
  275. jobs: list[KnowledgeIndexJobData] = []
  276. for document in documents:
  277. if document.status == "archived":
  278. continue
  279. queued_document, job = self.queue_document_indexing(
  280. document=document,
  281. action="reindex",
  282. chunk_size=payload.chunkSize,
  283. chunk_overlap=payload.chunkOverlap)
  284. jobs.append(job or self._read_latest_index_job(document=queued_document))
  285. return jobs
  286. # ── Job queue management ──────────────────────────────────────────
  287. def queue_document_indexing(
  288. self,
  289. *,
  290. document: KnowledgeDocument,
  291. action: str,
  292. chunk_size: int | None = None,
  293. chunk_overlap: int | None = None,
  294. ) -> tuple[KnowledgeDocument, KnowledgeIndexJobData]:
  295. job_id = f"kjob_{uuid4().hex}"
  296. metadata = self._write_index_job_metadata(
  297. document=document,
  298. action=action,
  299. job_id=job_id,
  300. status="queued",
  301. progress=0,
  302. chunk_size=chunk_size,
  303. chunk_overlap=chunk_overlap)
  304. updated_document = self.document_repository.update(
  305. document_id=document.id,
  306. status="queued",
  307. metadata_json=metadata)
  308. document_for_job = updated_document or document
  309. published = self._publish_document_index_job(
  310. document_id=document.id,
  311. action=action,
  312. job_id=job_id)
  313. if not published:
  314. metadata = self._write_index_job_metadata(
  315. document=document_for_job,
  316. action=action,
  317. job_id=job_id,
  318. status="running",
  319. progress=1,
  320. chunk_size=chunk_size,
  321. chunk_overlap=chunk_overlap,
  322. worker_key="inline-fallback")
  323. document_for_job = self.document_repository.update(
  324. document_id=document.id,
  325. status="indexing",
  326. metadata_json=metadata) or document_for_job
  327. processed = self.process_document_index_job(
  328. document_id=document.id,
  329. action=action,
  330. job_id=job_id,
  331. worker_key="inline-fallback",
  332. chunk_size=chunk_size,
  333. chunk_overlap=chunk_overlap)
  334. if processed is not None:
  335. indexed_document, chunks = processed
  336. return indexed_document, self._read_latest_index_job(document=indexed_document)
  337. return document_for_job, self._read_latest_index_job(document=document_for_job)
  338. def process_document_index_job(
  339. self,
  340. *,
  341. document_id: str,
  342. action: str,
  343. worker_key: str,
  344. job_id: str | None = None,
  345. chunk_size: int | None = None,
  346. chunk_overlap: int | None = None,
  347. ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
  348. document = self.document_repository.get_by_id(document_id=document_id)
  349. if document is None:
  350. return None
  351. resolved_job = self._read_latest_index_job(document=document)
  352. resolved_job_id = job_id or resolved_job.jobId
  353. resolved_chunk_size = chunk_size if chunk_size is not None else resolved_job.chunkSize
  354. resolved_chunk_overlap = chunk_overlap if chunk_overlap is not None else resolved_job.chunkOverlap
  355. if document.status == "archived":
  356. metadata = self._write_index_job_metadata(
  357. document=document,
  358. action=action,
  359. job_id=resolved_job_id,
  360. status="skipped",
  361. progress=100,
  362. chunk_size=resolved_chunk_size,
  363. chunk_overlap=resolved_chunk_overlap,
  364. worker_key=worker_key,
  365. completed_time=datetime.utcnow(),
  366. error_message="document is archived")
  367. skipped_document = self.document_repository.update(
  368. document_id=document.id,
  369. metadata_json=metadata) or document
  370. return skipped_document, []
  371. metadata = self._write_index_job_metadata(
  372. document=document,
  373. action=action,
  374. job_id=resolved_job_id,
  375. status="running",
  376. progress=10,
  377. chunk_size=resolved_chunk_size,
  378. chunk_overlap=resolved_chunk_overlap,
  379. worker_key=worker_key,
  380. started_time=datetime.utcnow())
  381. running_document = self.document_repository.update(
  382. document_id=document.id,
  383. status="indexing",
  384. metadata_json=metadata) or document
  385. try:
  386. parsed = self._parse_document_for_indexing(document=running_document)
  387. metadata = self._write_index_job_metadata(
  388. document=running_document,
  389. action=action,
  390. job_id=resolved_job_id,
  391. status="running",
  392. progress=40,
  393. chunk_size=resolved_chunk_size,
  394. chunk_overlap=resolved_chunk_overlap,
  395. worker_key=worker_key,
  396. started_time=self._read_latest_index_job(document=running_document).startedTime)
  397. metadata["parser_metadata"] = parsed.metadata_json
  398. running_document = self.document_repository.update(
  399. document_id=document.id,
  400. status="indexing",
  401. metadata_json=metadata) or running_document
  402. chunks = self._index_document(
  403. document=running_document,
  404. content_text=parsed.content_text,
  405. chunk_size=resolved_chunk_size,
  406. chunk_overlap=resolved_chunk_overlap)
  407. except Exception as exc:
  408. self._mark_document_failed(
  409. document=running_document,
  410. message=str(exc),
  411. job_id=resolved_job_id,
  412. action=action,
  413. worker_key=worker_key,
  414. chunk_size=resolved_chunk_size,
  415. chunk_overlap=resolved_chunk_overlap)
  416. raise KnowledgeIndexingError(
  417. document_id=document.id,
  418. message=f"knowledge document {action} failed: {exc}") from exc
  419. metadata = self._write_index_job_metadata(
  420. document=running_document,
  421. action=action,
  422. job_id=resolved_job_id,
  423. status="completed",
  424. progress=100,
  425. chunk_size=resolved_chunk_size,
  426. chunk_overlap=resolved_chunk_overlap,
  427. worker_key=worker_key,
  428. completed_time=datetime.utcnow())
  429. metadata["parser_metadata"] = parsed.metadata_json
  430. indexed_document = self.document_repository.update(
  431. document_id=document.id,
  432. status="indexed",
  433. metadata_json=metadata)
  434. return indexed_document or running_document, chunks
  435. def execute_document_index_job(
  436. self,
  437. *,
  438. document_id: str,
  439. action: str,
  440. worker_key: str,
  441. lease_seconds: int,
  442. job_id: str | None = None,
  443. redis_client: Redis | None = None,
  444. ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
  445. resolved_redis_client = redis_client or self.redis_client
  446. idempotency_key = f"{document_id}:{job_id or action}"
  447. lock = None
  448. idempotency_store = None
  449. if resolved_redis_client is not None:
  450. from core_shared.redis_primitives import DistributedLock, IdempotencyStore
  451. lock = DistributedLock(
  452. client=resolved_redis_client,
  453. name=f"knowledge-document:{document_id}:lock",
  454. ttl_seconds=lease_seconds)
  455. if not lock.acquire():
  456. return None
  457. idempotency_store = IdempotencyStore(
  458. client=resolved_redis_client,
  459. prefix="knowledge-document-idempotency")
  460. if not idempotency_store.begin(key=idempotency_key):
  461. lock.release()
  462. return None
  463. try:
  464. result = self.process_document_index_job(
  465. document_id=document_id,
  466. action=action,
  467. job_id=job_id,
  468. worker_key=worker_key)
  469. if idempotency_store is not None and result is not None:
  470. document, chunks = result
  471. idempotency_store.complete(
  472. key=idempotency_key,
  473. result={
  474. "status": document.status,
  475. "document_id": document.id,
  476. "chunk_count": len(chunks),
  477. })
  478. except Exception:
  479. if idempotency_store is not None:
  480. idempotency_store.clear(key=idempotency_key)
  481. raise
  482. finally:
  483. if lock is not None:
  484. lock.release()
  485. return result
  486. def execute_next_pending_document_job(
  487. self,
  488. *,
  489. worker_key: str,
  490. lease_seconds: int,
  491. stale_indexing_seconds: int,
  492. redis_client: Redis | None = None,
  493. ) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
  494. stale_before = datetime.utcnow() - timedelta(seconds=stale_indexing_seconds)
  495. document = self.document_repository.get_next_pending_indexing(stale_before=stale_before)
  496. if document is None:
  497. return None
  498. job = self._read_latest_index_job(document=document)
  499. return self.execute_document_index_job(
  500. document_id=document.id,
  501. action=job.action,
  502. job_id=job.jobId,
  503. worker_key=worker_key,
  504. lease_seconds=lease_seconds,
  505. redis_client=redis_client)
  506. def list_index_jobs(
  507. self,
  508. *,
  509. knowledge_base_id: str | None = None,
  510. document_id: str | None = None,
  511. status: str | None = None,
  512. ) -> list[KnowledgeIndexJobData]:
  513. documents = self.document_repository.list_filtered(
  514. knowledge_base_id=knowledge_base_id)
  515. jobs: list[KnowledgeIndexJobData] = []
  516. for document in documents:
  517. if document_id is not None and document.id != document_id:
  518. continue
  519. job = self._try_read_latest_index_job(document=document)
  520. if job is None:
  521. continue
  522. if status is not None and job.status != status:
  523. continue
  524. jobs.append(job)
  525. jobs.sort(
  526. key=lambda item: item.queuedTime or datetime.min,
  527. reverse=True)
  528. return jobs
  529. def detail_index_job(self, *, document_id: str) -> KnowledgeIndexJobData | None:
  530. document = self.document_repository.get_by_id(document_id=document_id)
  531. if document is None:
  532. return None
  533. return self._try_read_latest_index_job(document=document)
  534. # ── Core indexing logic ───────────────────────────────────────────
  535. def _index_document(
  536. self,
  537. *,
  538. document: KnowledgeDocument,
  539. content_text: str,
  540. chunk_size: int | None,
  541. chunk_overlap: int | None,
  542. ) -> list[KnowledgeChunk]:
  543. source_type = document.source_type or "text"
  544. resolved_size = chunk_size or self.settings.default_chunk_size
  545. resolved_overlap = chunk_overlap or self.settings.default_chunk_overlap
  546. chunk_payloads = chunk_document(
  547. content_text=content_text,
  548. source_type=source_type,
  549. chunk_size=resolved_size,
  550. chunk_overlap=resolved_overlap)
  551. for chunk_payload in chunk_payloads:
  552. text = chunk_payload.get("content_text")
  553. content = text if isinstance(text, str) else ""
  554. embedding_result = self.embedding_service.embed_text(content)
  555. chunk_payload["embedding_model"] = embedding_result.model
  556. chunk_payload["embedding_json"] = embedding_result.embedding
  557. chunk_payload["metadata_json"] = {
  558. "embedding_provider": embedding_result.provider,
  559. }
  560. return self.chunk_repository.replace_document_chunks(
  561. knowledge_base_id=document.knowledge_base_id,
  562. document_id=document.id,
  563. chunks=chunk_payloads)
  564. def _parse_document_for_indexing(self, *, document: KnowledgeDocument):
  565. metadata = document.metadata_json or {}
  566. object_metadata = metadata.get("object_storage")
  567. if isinstance(object_metadata, dict):
  568. object_key = object_metadata.get("objectKey")
  569. if isinstance(object_key, str) and object_key:
  570. try:
  571. raw_content = self.object_storage.get_bytes(object_key=object_key)
  572. except ObjectStorageNotFoundError as exc:
  573. raise ValueError(f"knowledge document content object not found: {document.id}") from exc
  574. return parse_document_content(
  575. source_type=document.source_type,
  576. source_uri=document.source_uri,
  577. content_base64=base64.b64encode(raw_content).decode("ascii"))
  578. if document.content_text:
  579. return parse_document_content(
  580. source_type=document.source_type,
  581. source_uri=document.source_uri,
  582. content_text=document.content_text)
  583. raise ValueError(f"knowledge document content object not found: {document.id}")
  584. # ── Queue helpers ─────────────────────────────────────────────────
  585. def _should_queue_indexing(self, *, async_mode: bool | None) -> bool:
  586. if async_mode is False:
  587. return False
  588. if not self.settings.async_indexing_enabled and async_mode is None:
  589. return False
  590. return self.task_queue_publisher is not None
  591. def _publish_document_index_job(
  592. self,
  593. *,
  594. document_id: str,
  595. action: str,
  596. job_id: str,
  597. ) -> bool:
  598. if self.task_queue_publisher is None:
  599. return False
  600. return self.task_queue_publisher.publish_knowledge_document(
  601. document_id=document_id,
  602. action=action,
  603. job_id=job_id)
  604. # ── Job metadata ──────────────────────────────────────────────────
  605. def _write_index_job_metadata(
  606. self,
  607. *,
  608. document: KnowledgeDocument,
  609. action: str,
  610. job_id: str,
  611. status: str,
  612. progress: int,
  613. chunk_size: int | None = None,
  614. chunk_overlap: int | None = None,
  615. worker_key: str | None = None,
  616. started_time: datetime | None = None,
  617. completed_time: datetime | None = None,
  618. error_message: str | None = None,
  619. ) -> dict[str, JSONValue]:
  620. metadata = dict(document.metadata_json or {})
  621. existing_job = self._read_index_job_payload(document=document)
  622. queued_time = existing_job.get("queuedTime")
  623. if not isinstance(queued_time, str):
  624. queued_time = datetime.utcnow().isoformat()
  625. resolved_started_time = (
  626. started_time.isoformat()
  627. if started_time is not None
  628. else existing_job.get("startedTime")
  629. )
  630. resolved_completed_time = (
  631. completed_time.isoformat()
  632. if completed_time is not None
  633. else existing_job.get("completedTime")
  634. )
  635. job_payload: dict[str, JSONValue] = {
  636. "jobId": job_id,
  637. "documentId": document.id,
  638. "knowledgeBaseId": document.knowledge_base_id,
  639. "documentTitle": document.title,
  640. "action": action if action in {"index", "reindex"} else "reindex",
  641. "status": status,
  642. "progress": max(0, min(progress, 100)),
  643. "queueName": KNOWLEDGE_DOCUMENT_QUEUE,
  644. "workerKey": worker_key,
  645. "errorMessage": error_message,
  646. "chunkSize": chunk_size,
  647. "chunkOverlap": chunk_overlap,
  648. "queuedTime": queued_time,
  649. "startedTime": resolved_started_time if isinstance(resolved_started_time, str) else None,
  650. "completedTime": resolved_completed_time if isinstance(resolved_completed_time, str) else None,
  651. }
  652. metadata["index_job"] = job_payload
  653. return metadata
  654. def _read_latest_index_job(self, *, document: KnowledgeDocument) -> KnowledgeIndexJobData:
  655. payload = self._read_index_job_payload(document=document)
  656. return KnowledgeIndexJobData(
  657. jobId=self._read_payload_string(payload, "jobId") or f"kjob_{document.id}",
  658. documentId=self._read_payload_string(payload, "documentId") or document.id,
  659. knowledgeBaseId=self._read_payload_string(payload, "knowledgeBaseId") or document.knowledge_base_id,
  660. documentTitle=self._read_payload_string(payload, "documentTitle") or document.title,
  661. action=self._read_job_action(payload.get("action")),
  662. status=self._read_job_status(payload.get("status")),
  663. progress=self._read_payload_int(payload, "progress", 0),
  664. queueName=self._read_payload_string(payload, "queueName"),
  665. workerKey=self._read_payload_string(payload, "workerKey"),
  666. errorMessage=self._read_payload_string(payload, "errorMessage"),
  667. chunkSize=self._read_optional_payload_int(payload, "chunkSize"),
  668. chunkOverlap=self._read_optional_payload_int(payload, "chunkOverlap"),
  669. queuedTime=self._read_payload_datetime(payload, "queuedTime"),
  670. startedTime=self._read_payload_datetime(payload, "startedTime"),
  671. completedTime=self._read_payload_datetime(payload, "completedTime"))
  672. def _try_read_latest_index_job(self, *, document: KnowledgeDocument) -> KnowledgeIndexJobData | None:
  673. if not self._read_index_job_payload(document=document):
  674. return None
  675. return self._read_latest_index_job(document=document)
  676. def _read_index_job_payload(self, *, document: KnowledgeDocument) -> dict[str, JSONValue]:
  677. metadata = document.metadata_json or {}
  678. value = metadata.get("index_job")
  679. if isinstance(value, dict):
  680. return {str(k): v for k, v in value.items()}
  681. return {}
  682. def _read_payload_string(self, payload: dict[str, JSONValue], key: str) -> str | None:
  683. value = payload.get(key)
  684. return value if isinstance(value, str) and value else None
  685. def _read_payload_int(
  686. self,
  687. payload: dict[str, JSONValue],
  688. key: str,
  689. fallback: int,
  690. ) -> int:
  691. value = payload.get(key)
  692. if isinstance(value, int) and not isinstance(value, bool):
  693. return value
  694. return fallback
  695. def _read_optional_payload_int(
  696. self,
  697. payload: dict[str, JSONValue],
  698. key: str,
  699. ) -> int | None:
  700. value = payload.get(key)
  701. if isinstance(value, int) and not isinstance(value, bool):
  702. return value
  703. return None
  704. def _read_payload_datetime(
  705. self,
  706. payload: dict[str, JSONValue],
  707. key: str,
  708. ) -> datetime | None:
  709. value = payload.get(key)
  710. if not isinstance(value, str) or not value:
  711. return None
  712. try:
  713. return datetime.fromisoformat(value)
  714. except ValueError:
  715. return None
  716. def _read_job_action(self, value: JSONValue) -> KnowledgeIndexJobAction:
  717. if isinstance(value, str) and value in {"index", "reindex"}:
  718. return cast(KnowledgeIndexJobAction, value)
  719. return "reindex"
  720. def _read_job_status(self, value: JSONValue) -> KnowledgeIndexJobStatus:
  721. if isinstance(value, str) and value in {"queued", "running", "completed", "failed", "skipped"}:
  722. return cast(KnowledgeIndexJobStatus, value)
  723. return "queued"
  724. # ── Error handling ────────────────────────────────────────────────
  725. def _mark_document_failed(
  726. self,
  727. *,
  728. document: KnowledgeDocument,
  729. message: str,
  730. job_id: str | None = None,
  731. action: str = "reindex",
  732. worker_key: str | None = None,
  733. chunk_size: int | None = None,
  734. chunk_overlap: int | None = None,
  735. ) -> None:
  736. metadata = dict(document.metadata_json or {})
  737. metadata["last_error"] = {
  738. "message": message[:1000],
  739. "errorType": "indexing_failed",
  740. }
  741. if job_id is not None:
  742. metadata = self._write_index_job_metadata(
  743. document=document,
  744. action=action,
  745. job_id=job_id,
  746. status="failed",
  747. progress=100,
  748. chunk_size=chunk_size,
  749. chunk_overlap=chunk_overlap,
  750. worker_key=worker_key,
  751. completed_time=datetime.utcnow(),
  752. error_message=message[:1000])
  753. self.document_repository.update(
  754. document_id=document.id,
  755. status="failed",
  756. metadata_json=metadata)