| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237 |
- from __future__ import annotations
- import base64
- import hashlib
- from dataclasses import dataclass
- from datetime import datetime, timedelta
- from typing import TYPE_CHECKING, cast
- from uuid import uuid4
- from sqlalchemy.orm import Session
- from core_shared import JSONValue, try_build_redis_client
- from core_shared.task_queue import KNOWLEDGE_DOCUMENT_QUEUE, TaskQueuePublisher
- from app.application.document_parsers import (
- DocumentParseError,
- ParsedDocument,
- normalize_source_type,
- parse_document_content,
- read_document_content_bytes)
- from app.application.chunking import chunk_document
- from app.application.embeddings import EmbeddingService
- from app.application.retrieval import (
- bm25_score,
- build_chunk_payloads,
- compute_bm25_stats,
- cosine_similarity,
- keyword_score,
- rerank_score,
- stable_content_hash)
- from app.bootstrap.settings import KnowledgeServiceSettings
- from app.db.models import KnowledgeBase, KnowledgeChunk, KnowledgeDocument
- from app.domain.repositories import (
- KnowledgeBaseRepository,
- KnowledgeChunkRepository,
- KnowledgeDocumentRepository)
- from app.infrastructure.object_storage import (
- KnowledgeObjectStorage,
- ObjectStorageError,
- ObjectStorageNotFoundError,
- ObjectStorageStatus,
- build_document_object_key,
- build_object_storage)
- from app.schemas.knowledge import (
- KnowledgeBaseCreateRequest,
- KnowledgeBaseCreateRequestDto,
- KnowledgeBaseStatusUpdateRequest,
- KnowledgeBaseUpdateRequestDto,
- KnowledgeBaseReindexRequestDto,
- KnowledgeDocumentCreateRequest,
- KnowledgeDocumentCreateRequestDto,
- KnowledgeIndexJobAction,
- KnowledgeIndexJobData,
- KnowledgeIndexJobStatus,
- KnowledgeDocumentParseRequest,
- KnowledgeDocumentReindexRequestDto,
- KnowledgeDocumentUpdateRequestDto,
- KnowledgeSettingsDto,
- KnowledgeSettingsUpdateRequestDto,
- KnowledgeSearchRequest)
- if TYPE_CHECKING:
- from redis import Redis
- @dataclass(frozen=True, slots=True)
- class KnowledgeDocumentIngestResult:
- document: KnowledgeDocument
- chunks: list[KnowledgeChunk]
- queued: bool = False
- job: KnowledgeIndexJobData | None = None
- class KnowledgeIndexingError(RuntimeError):
- def __init__(self, *, document_id: str, message: str) -> None:
- super().__init__(message)
- self.document_id = document_id
- class KnowledgeApplicationService:
- def __init__(
- self,
- *,
- settings: KnowledgeServiceSettings,
- base_repository: KnowledgeBaseRepository,
- document_repository: KnowledgeDocumentRepository,
- chunk_repository: KnowledgeChunkRepository,
- object_storage: KnowledgeObjectStorage | None = None,
- redis_client: Redis | None = None,
- task_queue_publisher: TaskQueuePublisher | None = None) -> None:
- self.settings = settings
- self.base_repository = base_repository
- self.document_repository = document_repository
- self.chunk_repository = chunk_repository
- self.embedding_service = EmbeddingService(settings=settings)
- self._object_storage = object_storage
- self.redis_client = redis_client
- self.task_queue_publisher = task_queue_publisher
- @property
- def object_storage(self) -> KnowledgeObjectStorage:
- if self._object_storage is None:
- self._object_storage = build_object_storage(self.settings)
- return self._object_storage
- def create_base(self, payload: KnowledgeBaseCreateRequest) -> KnowledgeBase:
- return self.base_repository.create(
- code=payload.code,
- name=payload.name,
- description=payload.description,
- metadata_json=payload.metadata_json)
- def create_base_from_contract(
- self,
- payload: KnowledgeBaseCreateRequestDto) -> KnowledgeBase:
- return self.create_base(
- KnowledgeBaseCreateRequest(
- code=self._build_base_code(payload.name),
- name=payload.name,
- description=payload.description,
- metadata_json=payload.metadata))
- def list_bases(self) -> list[KnowledgeBase]:
- return self.base_repository.list_all()
- def list_bases_filtered(
- self,
- *,
- keyword: str | None = None,
- status: str | None = None) -> list[KnowledgeBase]:
- return self.base_repository.list_filtered(
- keyword=keyword,
- status=status)
- def update_base_from_contract(
- self,
- payload: KnowledgeBaseUpdateRequestDto) -> KnowledgeBase | None:
- return self.base_repository.update(
- knowledge_base_id=payload.knowledgeBaseId,
- name=payload.name,
- description=payload.description,
- status=payload.status,
- metadata_json=payload.metadata)
- def delete_base(self, *, knowledge_base_id: str) -> bool:
- documents = self.document_repository.list_by_base(
- knowledge_base_id=knowledge_base_id)
- for document in documents:
- self._delete_document_object(document=document)
- self.chunk_repository.delete_by_base(knowledge_base_id=knowledge_base_id)
- for document in documents:
- self.document_repository.delete(document_id=document.id)
- return self.base_repository.delete(knowledge_base_id=knowledge_base_id)
- def update_base_status(
- self,
- *,
- knowledge_base_id: str,
- payload: KnowledgeBaseStatusUpdateRequest) -> KnowledgeBase | None:
- return self.base_repository.update_status(
- knowledge_base_id=knowledge_base_id,
- status=payload.status)
- def create_document(
- self,
- payload: KnowledgeDocumentCreateRequest) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
- knowledge_base = self.base_repository.get_by_id(
- knowledge_base_id=payload.knowledge_base_id)
- if knowledge_base is None:
- raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")
- parsed = self.parse_document(
- KnowledgeDocumentParseRequest(
- source_type=payload.source_type,
- source_uri=payload.source_uri,
- content_text=payload.content_text,
- content_base64=payload.content_base64)
- )
- raw_content = read_document_content_bytes(
- content_text=payload.content_text,
- content_base64=payload.content_base64)
- object_key = build_document_object_key(
- knowledge_base_id=payload.knowledge_base_id,
- source_type=parsed.source_type,
- title=payload.title)
- stored_object = self.object_storage.put_bytes(
- object_key=object_key,
- content=raw_content,
- content_type=self._guess_content_type(source_type=parsed.source_type))
- document: KnowledgeDocument | None = None
- try:
- metadata_json = {
- **payload.metadata_json,
- "parser_metadata": parsed.metadata_json,
- "object_storage": stored_object.to_metadata(),
- }
- document = self.document_repository.create(
- knowledge_base_id=payload.knowledge_base_id,
- title=payload.title,
- source_type=parsed.source_type,
- source_uri=payload.source_uri,
- content_text="",
- content_hash=stable_content_hash(parsed.content_text),
- metadata_json=metadata_json)
- try:
- chunks = self._index_document(
- document=document,
- content_text=parsed.content_text,
- chunk_size=payload.chunk_size,
- chunk_overlap=payload.chunk_overlap)
- except Exception as exc:
- self._mark_document_failed(document=document, message=str(exc))
- raise KnowledgeIndexingError(
- document_id=document.id,
- message=f"knowledge document indexing failed: {exc}") from exc
- indexed_document = self.document_repository.update_status(
- document_id=document.id,
- status="indexed")
- return indexed_document or document, chunks
- except Exception:
- if document is None:
- try:
- self.object_storage.delete_object(object_key=stored_object.object_key)
- except ObjectStorageError:
- pass
- raise
- def create_document_from_contract(
- self,
- payload: KnowledgeDocumentCreateRequestDto) -> tuple[KnowledgeDocument, list[KnowledgeChunk]]:
- return self.create_document(
- KnowledgeDocumentCreateRequest(
- knowledge_base_id=payload.knowledgeBaseId,
- title=payload.title,
- content_text=payload.contentText,
- content_base64=payload.contentBase64,
- source_type=payload.sourceType,
- source_uri=payload.sourceUri,
- metadata_json=payload.metadata,
- chunk_size=payload.chunkSize,
- chunk_overlap=payload.chunkOverlap))
- def create_document_from_contract_result(
- self,
- payload: KnowledgeDocumentCreateRequestDto) -> KnowledgeDocumentIngestResult:
- request = KnowledgeDocumentCreateRequest(
- knowledge_base_id=payload.knowledgeBaseId,
- title=payload.title,
- content_text=payload.contentText,
- content_base64=payload.contentBase64,
- source_type=payload.sourceType,
- source_uri=payload.sourceUri,
- metadata_json=payload.metadata,
- chunk_size=payload.chunkSize,
- chunk_overlap=payload.chunkOverlap)
- if self._should_queue_indexing(async_mode=payload.asyncMode):
- return self.create_document_index_job(payload=request)
- document, chunks = self.create_document(request)
- return KnowledgeDocumentIngestResult(
- document=document,
- chunks=chunks)
- def create_document_index_job(
- self,
- payload: KnowledgeDocumentCreateRequest) -> KnowledgeDocumentIngestResult:
- knowledge_base = self.base_repository.get_by_id(
- knowledge_base_id=payload.knowledge_base_id)
- if knowledge_base is None:
- raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")
- raw_content = read_document_content_bytes(
- content_text=payload.content_text,
- content_base64=payload.content_base64)
- source_type = normalize_source_type(
- source_type=payload.source_type,
- source_uri=payload.source_uri)
- object_key = build_document_object_key(
- knowledge_base_id=payload.knowledge_base_id,
- source_type=source_type,
- title=payload.title)
- stored_object = self.object_storage.put_bytes(
- object_key=object_key,
- content=raw_content,
- content_type=self._guess_content_type(source_type=source_type))
- document: KnowledgeDocument | None = None
- try:
- document = self.document_repository.create(
- knowledge_base_id=payload.knowledge_base_id,
- title=payload.title,
- source_type=source_type,
- source_uri=payload.source_uri,
- content_text="",
- content_hash=hashlib.sha256(raw_content).hexdigest(),
- metadata_json={
- **payload.metadata_json,
- "object_storage": stored_object.to_metadata(),
- },
- status="draft")
- queued_document, job = self.queue_document_indexing(
- document=document,
- action="index",
- chunk_size=payload.chunk_size,
- chunk_overlap=payload.chunk_overlap)
- except Exception:
- if document is None:
- try:
- self.object_storage.delete_object(object_key=stored_object.object_key)
- except ObjectStorageError:
- pass
- raise
- return KnowledgeDocumentIngestResult(
- document=queued_document,
- chunks=[],
- queued=True,
- job=job)
- def parse_document(self, payload: KnowledgeDocumentParseRequest) -> ParsedDocument:
- try:
- return parse_document_content(
- source_type=payload.source_type,
- content_text=payload.content_text,
- content_base64=payload.content_base64,
- source_uri=payload.source_uri)
- except DocumentParseError:
- raise
- def queue_document_indexing(
- self,
- *,
- document: KnowledgeDocument,
- action: str,
- chunk_size: int | None = None,
- chunk_overlap: int | None = None) -> tuple[KnowledgeDocument, KnowledgeIndexJobData]:
- job_id = f"kjob_{uuid4().hex}"
- metadata = self._write_index_job_metadata(
- document=document,
- action=action,
- job_id=job_id,
- status="queued",
- progress=0,
- chunk_size=chunk_size,
- chunk_overlap=chunk_overlap)
- updated_document = self.document_repository.update(
- document_id=document.id,
- status="queued",
- metadata_json=metadata)
- document_for_job = updated_document or document
- published = self._publish_document_index_job(
- document_id=document.id,
- action=action,
- job_id=job_id)
- if not published:
- metadata = self._write_index_job_metadata(
- document=document_for_job,
- action=action,
- job_id=job_id,
- status="running",
- progress=1,
- chunk_size=chunk_size,
- chunk_overlap=chunk_overlap,
- worker_key="inline-fallback")
- document_for_job = self.document_repository.update(
- document_id=document.id,
- status="indexing",
- metadata_json=metadata) or document_for_job
- processed = self.process_document_index_job(
- document_id=document.id,
- action=action,
- job_id=job_id,
- worker_key="inline-fallback",
- chunk_size=chunk_size,
- chunk_overlap=chunk_overlap)
- if processed is not None:
- indexed_document, chunks = processed
- return indexed_document, self._read_latest_index_job(document=indexed_document)
- return document_for_job, self._read_latest_index_job(document=document_for_job)
- def list_documents(
- self,
- *,
- knowledge_base_id: str) -> list[KnowledgeDocument]:
- return self.document_repository.list_by_base(
- knowledge_base_id=knowledge_base_id)
- def list_documents_filtered(
- self,
- *,
- knowledge_base_id: str | None = None,
- keyword: str | None = None,
- status: str | None = None,
- source_type: str | None = None) -> list[KnowledgeDocument]:
- return self.document_repository.list_filtered(
- knowledge_base_id=knowledge_base_id,
- keyword=keyword,
- status=status,
- source_type=source_type)
- def update_document_from_contract(
- self,
- payload: KnowledgeDocumentUpdateRequestDto) -> KnowledgeDocument | None:
- return self.document_repository.update(
- document_id=payload.documentId,
- title=payload.title,
- source_uri=payload.sourceUri,
- status=payload.status,
- metadata_json=payload.metadata)
- def reindex_document(
- self,
- payload: KnowledgeDocumentReindexRequestDto) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
- document = self.document_repository.get_by_id(document_id=payload.documentId)
- if document is None:
- return None
- try:
- parsed = self._parse_document_for_indexing(document=document)
- chunks = self._index_document(
- document=document,
- content_text=parsed.content_text,
- chunk_size=payload.chunkSize,
- chunk_overlap=payload.chunkOverlap)
- except Exception as exc:
- self._mark_document_failed(document=document, message=str(exc))
- raise KnowledgeIndexingError(
- document_id=document.id,
- message=f"knowledge document reindex failed: {exc}") from exc
- metadata = dict(document.metadata_json or {})
- metadata["parser_metadata"] = parsed.metadata_json
- indexed_document = self.document_repository.update(
- document_id=document.id,
- status="indexed",
- metadata_json=metadata)
- return indexed_document or document, chunks
- def reindex_document_from_contract_result(
- self,
- payload: KnowledgeDocumentReindexRequestDto) -> KnowledgeDocumentIngestResult | None:
- if self._should_queue_indexing(async_mode=payload.asyncMode):
- document = self.document_repository.get_by_id(document_id=payload.documentId)
- if document is None:
- return None
- queued_document, job = self.queue_document_indexing(
- document=document,
- action="reindex",
- chunk_size=payload.chunkSize,
- chunk_overlap=payload.chunkOverlap)
- return KnowledgeDocumentIngestResult(
- document=queued_document,
- chunks=[],
- queued=True,
- job=job)
- result = self.reindex_document(payload)
- if result is None:
- return None
- document, chunks = result
- return KnowledgeDocumentIngestResult(
- document=document,
- chunks=chunks)
- def reindex_base_from_contract(
- self,
- payload: KnowledgeBaseReindexRequestDto) -> list[KnowledgeIndexJobData]:
- documents = self.document_repository.list_by_base(
- knowledge_base_id=payload.knowledgeBaseId)
- jobs: list[KnowledgeIndexJobData] = []
- for document in documents:
- if document.status == "archived":
- continue
- queued_document, job = self.queue_document_indexing(
- document=document,
- action="reindex",
- chunk_size=payload.chunkSize,
- chunk_overlap=payload.chunkOverlap)
- jobs.append(job or self._read_latest_index_job(document=queued_document))
- return jobs
- def process_document_index_job(
- self,
- *,
- document_id: str,
- action: str,
- worker_key: str,
- job_id: str | None = None,
- chunk_size: int | None = None,
- chunk_overlap: int | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
- document = self.document_repository.get_by_id(document_id=document_id)
- if document is None:
- return None
- resolved_job = self._read_latest_index_job(document=document)
- resolved_job_id = job_id or resolved_job.jobId
- resolved_chunk_size = chunk_size if chunk_size is not None else resolved_job.chunkSize
- resolved_chunk_overlap = chunk_overlap if chunk_overlap is not None else resolved_job.chunkOverlap
- if document.status == "archived":
- metadata = self._write_index_job_metadata(
- document=document,
- action=action,
- job_id=resolved_job_id,
- status="skipped",
- progress=100,
- chunk_size=resolved_chunk_size,
- chunk_overlap=resolved_chunk_overlap,
- worker_key=worker_key,
- completed_time=datetime.utcnow(),
- error_message="document is archived")
- skipped_document = self.document_repository.update(
- document_id=document.id,
- metadata_json=metadata) or document
- return skipped_document, []
- metadata = self._write_index_job_metadata(
- document=document,
- action=action,
- job_id=resolved_job_id,
- status="running",
- progress=10,
- chunk_size=resolved_chunk_size,
- chunk_overlap=resolved_chunk_overlap,
- worker_key=worker_key,
- started_time=datetime.utcnow())
- running_document = self.document_repository.update(
- document_id=document.id,
- status="indexing",
- metadata_json=metadata) or document
- try:
- parsed = self._parse_document_for_indexing(document=running_document)
- metadata = self._write_index_job_metadata(
- document=running_document,
- action=action,
- job_id=resolved_job_id,
- status="running",
- progress=40,
- chunk_size=resolved_chunk_size,
- chunk_overlap=resolved_chunk_overlap,
- worker_key=worker_key,
- started_time=self._read_latest_index_job(document=running_document).startedTime)
- metadata["parser_metadata"] = parsed.metadata_json
- running_document = self.document_repository.update(
- document_id=document.id,
- status="indexing",
- metadata_json=metadata) or running_document
- chunks = self._index_document(
- document=running_document,
- content_text=parsed.content_text,
- chunk_size=resolved_chunk_size,
- chunk_overlap=resolved_chunk_overlap)
- except Exception as exc:
- self._mark_document_failed(
- document=running_document,
- message=str(exc),
- job_id=resolved_job_id,
- action=action,
- worker_key=worker_key,
- chunk_size=resolved_chunk_size,
- chunk_overlap=resolved_chunk_overlap)
- raise KnowledgeIndexingError(
- document_id=document.id,
- message=f"knowledge document {action} failed: {exc}") from exc
- metadata = self._write_index_job_metadata(
- document=running_document,
- action=action,
- job_id=resolved_job_id,
- status="completed",
- progress=100,
- chunk_size=resolved_chunk_size,
- chunk_overlap=resolved_chunk_overlap,
- worker_key=worker_key,
- completed_time=datetime.utcnow())
- metadata["parser_metadata"] = parsed.metadata_json
- indexed_document = self.document_repository.update(
- document_id=document.id,
- status="indexed",
- metadata_json=metadata)
- return indexed_document or running_document, chunks
- def execute_document_index_job(
- self,
- *,
- document_id: str,
- action: str,
- worker_key: str,
- lease_seconds: int,
- job_id: str | None = None,
- redis_client: Redis | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
- resolved_redis_client = redis_client or self.redis_client
- idempotency_key = f"{document_id}:{job_id or action}"
- lock = None
- idempotency_store = None
- if resolved_redis_client is not None:
- from core_shared.redis_primitives import DistributedLock, IdempotencyStore
- lock = DistributedLock(
- client=resolved_redis_client,
- name=f"knowledge-document:{document_id}:lock",
- ttl_seconds=lease_seconds)
- if not lock.acquire():
- return None
- idempotency_store = IdempotencyStore(
- client=resolved_redis_client,
- prefix="knowledge-document-idempotency")
- if not idempotency_store.begin(key=idempotency_key):
- lock.release()
- return None
- try:
- result = self.process_document_index_job(
- document_id=document_id,
- action=action,
- job_id=job_id,
- worker_key=worker_key)
- if idempotency_store is not None and result is not None:
- document, chunks = result
- idempotency_store.complete(
- key=idempotency_key,
- result={
- "status": document.status,
- "document_id": document.id,
- "chunk_count": len(chunks),
- })
- except Exception:
- if idempotency_store is not None:
- idempotency_store.clear(key=idempotency_key)
- raise
- finally:
- if lock is not None:
- lock.release()
- return result
- def execute_next_pending_document_job(
- self,
- *,
- worker_key: str,
- lease_seconds: int,
- stale_indexing_seconds: int,
- redis_client: Redis | None = None) -> tuple[KnowledgeDocument, list[KnowledgeChunk]] | None:
- stale_before = datetime.utcnow() - timedelta(seconds=stale_indexing_seconds)
- document = self.document_repository.get_next_pending_indexing(
- stale_before=stale_before)
- if document is None:
- return None
- job = self._read_latest_index_job(document=document)
- return self.execute_document_index_job(
- document_id=document.id,
- action=job.action,
- job_id=job.jobId,
- worker_key=worker_key,
- lease_seconds=lease_seconds,
- redis_client=redis_client)
- def list_index_jobs(
- self,
- *,
- knowledge_base_id: str | None = None,
- document_id: str | None = None,
- status: str | None = None) -> list[KnowledgeIndexJobData]:
- documents = self.document_repository.list_filtered(
- knowledge_base_id=knowledge_base_id)
- jobs: list[KnowledgeIndexJobData] = []
- for document in documents:
- if document_id is not None and document.id != document_id:
- continue
- job = self._try_read_latest_index_job(document=document)
- if job is None:
- continue
- if status is not None and job.status != status:
- continue
- jobs.append(job)
- jobs.sort(
- key=lambda item: item.queuedTime or datetime.min,
- reverse=True)
- return jobs
- def detail_index_job(self, *, document_id: str) -> KnowledgeIndexJobData | None:
- document = self.document_repository.get_by_id(document_id=document_id)
- if document is None:
- return None
- return self._try_read_latest_index_job(document=document)
- def delete_document(self, *, document_id: str) -> bool:
- return bool(self.delete_document_result(document_id=document_id)["deleted"])
- def delete_document_result(self, *, document_id: str) -> dict[str, JSONValue]:
- document = self.document_repository.get_by_id(document_id=document_id)
- if document is None:
- return {
- "deleted": False,
- "objectDeleted": False,
- "documentId": document_id,
- }
- object_deleted = self._delete_document_object(document=document)
- self.chunk_repository.delete_by_document(document_id=document_id)
- deleted = self.document_repository.delete(document_id=document_id) is not None
- return {
- "deleted": deleted,
- "objectDeleted": object_deleted,
- "documentId": document_id,
- }
- def read_document_content(
- self,
- *,
- document_id: str,
- include_text: bool = True,
- include_base64: bool = False) -> dict[str, JSONValue] | None:
- document = self.document_repository.get_by_id(document_id=document_id)
- if document is None:
- return None
- raw_content = self._read_document_raw_content(document=document)
- object_status = self.read_document_storage_status(document_id=document_id)
- content_type = self._read_content_type_from_status(object_status)
- payload: dict[str, JSONValue] = {
- "documentId": document.id,
- "title": document.title,
- "sourceType": document.source_type,
- "contentType": content_type,
- "sizeBytes": len(raw_content),
- "objectStorage": self._read_object_storage_metadata(document=document),
- "contentBase64": None,
- "contentText": None,
- }
- if include_base64:
- payload["contentBase64"] = base64.b64encode(raw_content).decode("ascii")
- if include_text and self._is_text_content_type(content_type=content_type, source_type=document.source_type):
- payload["contentText"] = raw_content.decode("utf-8", errors="replace")
- return payload
- def read_document_storage_status(self, *, document_id: str) -> dict[str, JSONValue] | None:
- document = self.document_repository.get_by_id(document_id=document_id)
- if document is None:
- return None
- object_key = self._read_document_object_key(document=document)
- if object_key is None:
- return {
- "documentId": document.id,
- "exists": False,
- "objectStorage": None,
- "errorMessage": "document object metadata is missing",
- }
- status = self.object_storage.head_object(object_key=object_key)
- return self._object_status_to_payload(document=document, status=status)
- def read_storage_health(self) -> dict[str, JSONValue]:
- return dict(self.object_storage.health_check())
- def list_chunks_filtered(
- self,
- *,
- knowledge_base_id: str | None = None,
- document_id: str | None = None,
- keyword: str | None = None) -> list[KnowledgeChunk]:
- return self.chunk_repository.list_filtered(
- knowledge_base_id=knowledge_base_id,
- document_id=document_id,
- keyword=keyword)
- def delete_chunk(self, *, chunk_id: str) -> bool:
- return self.chunk_repository.delete(chunk_id=chunk_id)
- def search(
- self,
- payload: KnowledgeSearchRequest) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]:
- document_cache: dict[str, KnowledgeDocument] = {}
- query_embedding_result = self.embedding_service.embed_text(payload.query)
- candidate_limit = max(
- payload.top_k * max(self.settings.retrieval_candidate_multiplier, 1),
- payload.top_k)
- vector_candidates = self.chunk_repository.search_by_vector(
- knowledge_base_id=payload.knowledge_base_id,
- embedding=query_embedding_result.embedding,
- limit=candidate_limit)
- if vector_candidates:
- chunks = [chunk for chunk, _ in vector_candidates]
- vector_scores_by_chunk_id = {
- chunk.id: score for chunk, score in vector_candidates
- }
- retrieval_mode = "pgvector-hybrid"
- else:
- chunks = self.chunk_repository.list_by_base(
- knowledge_base_id=payload.knowledge_base_id)
- vector_scores_by_chunk_id = {}
- retrieval_mode = "hybrid"
- # Resolve per-base retrieval weights with global fallback
- kb = self.base_repository.get_by_id(knowledge_base_id=payload.knowledge_base_id)
- retrieval_config = (kb.metadata_json or {}).get("retrieval_config", {}) if kb else {}
- keyword_weight = float(retrieval_config.get("keyword_weight", self.settings.retrieval_keyword_weight))
- vector_weight = float(retrieval_config.get("vector_weight", self.settings.retrieval_vector_weight))
- rerank_weight = float(retrieval_config.get("rerank_weight", self.settings.retrieval_rerank_weight))
- # Pre-compute BM25 collection stats from candidate chunks
- chunk_texts = [chunk.content_text for chunk in chunks]
- avg_doc_length, doc_count, df_map = compute_bm25_stats(chunk_texts)
- scored: list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]] = []
- for chunk in chunks:
- document = document_cache.get(chunk.document_id)
- if document is None:
- document = self.document_repository.get_by_id(
- document_id=chunk.document_id)
- if document is None:
- continue
- document_cache[chunk.document_id] = document
- if not self._matches_filters(document=document, filters_json=payload.filters_json):
- continue
- keyword = bm25_score(
- payload.query, chunk.content_text,
- avg_doc_length=avg_doc_length, doc_count=doc_count, df=df_map)
- vector = vector_scores_by_chunk_id.get(chunk.id)
- if vector is None:
- vector = cosine_similarity(query_embedding_result.embedding, chunk.embedding_json)
- rerank = (
- rerank_score(
- query=payload.query,
- chunk_text=chunk.content_text,
- document_title=document.title)
- if self.settings.retrieval_rerank_enabled
- else 0.0
- )
- score = round(
- keyword * keyword_weight
- + vector * vector_weight
- + rerank * rerank_weight,
- 6)
- scored.append(
- (
- chunk,
- document,
- score,
- {
- "final_score": score,
- "keyword_score": round(keyword, 6),
- "vector_score": round(vector, 6),
- "rerank_score": round(rerank, 6),
- "retrieval_mode": retrieval_mode,
- "rerank_enabled": self.settings.retrieval_rerank_enabled,
- "candidate_limit": candidate_limit,
- "weights": {
- "keyword": keyword_weight,
- "vector": vector_weight,
- "rerank": rerank_weight,
- },
- "embedding_provider": query_embedding_result.provider,
- "embedding_model": query_embedding_result.model,
- "citation": {
- "document_id": document.id,
- "document_title": document.title,
- "source_uri": document.source_uri,
- "chunk_id": chunk.id,
- "chunk_index": chunk.chunk_index,
- },
- })
- )
- scored.sort(key=lambda item: item[2], reverse=True)
- return scored[: payload.top_k]
- def _index_document(
- self,
- *,
- document: KnowledgeDocument,
- content_text: str,
- chunk_size: int | None,
- chunk_overlap: int | None) -> list[KnowledgeChunk]:
- source_type = document.source_type or "text"
- resolved_size = chunk_size or self.settings.default_chunk_size
- resolved_overlap = chunk_overlap or self.settings.default_chunk_overlap
- chunk_payloads = chunk_document(
- content_text=content_text,
- source_type=source_type,
- chunk_size=resolved_size,
- chunk_overlap=resolved_overlap)
- for chunk_payload in chunk_payloads:
- content_text = self._read_chunk_content(chunk_payload)
- embedding_result = self.embedding_service.embed_text(content_text)
- chunk_payload["embedding_model"] = embedding_result.model
- chunk_payload["embedding_json"] = embedding_result.embedding
- chunk_payload["metadata_json"] = {
- "embedding_provider": embedding_result.provider,
- }
- return self.chunk_repository.replace_document_chunks(
- knowledge_base_id=document.knowledge_base_id,
- document_id=document.id,
- chunks=chunk_payloads)
- def _read_chunk_content(self, chunk_payload: dict[str, JSONValue]) -> str:
- value = chunk_payload.get("content_text")
- return value if isinstance(value, str) else ""
- def _parse_document_for_indexing(self, *, document: KnowledgeDocument) -> ParsedDocument:
- metadata = document.metadata_json or {}
- object_metadata = metadata.get("object_storage")
- if isinstance(object_metadata, dict):
- object_key = object_metadata.get("objectKey")
- if isinstance(object_key, str) and object_key:
- try:
- raw_content = self.object_storage.get_bytes(object_key=object_key)
- except ObjectStorageNotFoundError as exc:
- raise ValueError(f"knowledge document content object not found: {document.id}") from exc
- return parse_document_content(
- source_type=document.source_type,
- source_uri=document.source_uri,
- content_base64=base64.b64encode(raw_content).decode("ascii"))
- if document.content_text:
- return parse_document_content(
- source_type=document.source_type,
- source_uri=document.source_uri,
- content_text=document.content_text)
- raise ValueError(f"knowledge document content object not found: {document.id}")
- def _read_document_content_for_indexing(self, *, document: KnowledgeDocument) -> str:
- return self._parse_document_for_indexing(document=document).content_text
- def _read_document_raw_content(self, *, document: KnowledgeDocument) -> bytes:
- object_key = self._read_document_object_key(document=document)
- if isinstance(object_key, str) and object_key:
- return self.object_storage.get_bytes(object_key=object_key)
- if document.content_text:
- return document.content_text.encode("utf-8")
- raise ValueError(f"knowledge document content object not found: {document.id}")
- def _delete_document_object(self, *, document: KnowledgeDocument) -> bool:
- object_key = self._read_document_object_key(document=document)
- if object_key is None:
- return False
- try:
- return self.object_storage.delete_object(object_key=object_key)
- except ObjectStorageNotFoundError:
- return False
- def _read_document_object_key(self, *, document: KnowledgeDocument) -> str | None:
- object_metadata = self._read_object_storage_metadata(document=document)
- if object_metadata is None:
- return None
- object_key = object_metadata.get("objectKey")
- return object_key if isinstance(object_key, str) and object_key else None
- def _read_object_storage_metadata(
- self,
- *,
- document: KnowledgeDocument) -> dict[str, JSONValue] | None:
- metadata = document.metadata_json or {}
- object_metadata = metadata.get("object_storage")
- return object_metadata if isinstance(object_metadata, dict) else None
- def _object_status_to_payload(
- self,
- *,
- document: KnowledgeDocument,
- status: ObjectStorageStatus) -> dict[str, JSONValue]:
- return {
- "documentId": document.id,
- "exists": status.exists,
- "objectStorage": self._read_object_storage_metadata(document=document),
- "contentType": status.content_type,
- "sizeBytes": status.size_bytes,
- "etag": status.etag,
- "errorMessage": status.error_message,
- }
- def _read_content_type_from_status(
- self,
- object_status: dict[str, JSONValue] | None) -> str | None:
- if object_status is None:
- return None
- content_type = object_status.get("contentType")
- return content_type if isinstance(content_type, str) else None
- def _is_text_content_type(self, *, content_type: str | None, source_type: str) -> bool:
- if content_type is not None and content_type.startswith("text/"):
- return True
- return source_type.strip().lower() in {"text", "txt", "markdown", "md", "html", "htm", "json", "csv"}
- def _should_queue_indexing(self, *, async_mode: bool | None) -> bool:
- if async_mode is False:
- return False
- if not self.settings.async_indexing_enabled and async_mode is None:
- return False
- return self.task_queue_publisher is not None
- def _publish_document_index_job(
- self,
- *,
- document_id: str,
- action: str,
- job_id: str) -> bool:
- if self.task_queue_publisher is None:
- return False
- return self.task_queue_publisher.publish_knowledge_document(
- document_id=document_id,
- action=action,
- job_id=job_id)
- def _write_index_job_metadata(
- self,
- *,
- document: KnowledgeDocument,
- action: str,
- job_id: str,
- status: str,
- progress: int,
- chunk_size: int | None = None,
- chunk_overlap: int | None = None,
- worker_key: str | None = None,
- started_time: datetime | None = None,
- completed_time: datetime | None = None,
- error_message: str | None = None) -> dict[str, JSONValue]:
- metadata = dict(document.metadata_json or {})
- existing_job = self._read_index_job_payload(document=document)
- queued_time = existing_job.get("queuedTime")
- if not isinstance(queued_time, str):
- queued_time = datetime.utcnow().isoformat()
- resolved_started_time = (
- started_time.isoformat()
- if started_time is not None
- else existing_job.get("startedTime")
- )
- resolved_completed_time = (
- completed_time.isoformat()
- if completed_time is not None
- else existing_job.get("completedTime")
- )
- job_payload: dict[str, JSONValue] = {
- "jobId": job_id,
- "documentId": document.id,
- "knowledgeBaseId": document.knowledge_base_id,
- "documentTitle": document.title,
- "action": action if action in {"index", "reindex"} else "reindex",
- "status": status,
- "progress": max(0, min(progress, 100)),
- "queueName": KNOWLEDGE_DOCUMENT_QUEUE,
- "workerKey": worker_key,
- "errorMessage": error_message,
- "chunkSize": chunk_size,
- "chunkOverlap": chunk_overlap,
- "queuedTime": queued_time,
- "startedTime": resolved_started_time if isinstance(resolved_started_time, str) else None,
- "completedTime": resolved_completed_time if isinstance(resolved_completed_time, str) else None,
- }
- metadata["index_job"] = job_payload
- return metadata
- def _read_latest_index_job(self, *, document: KnowledgeDocument) -> KnowledgeIndexJobData:
- payload = self._read_index_job_payload(document=document)
- return KnowledgeIndexJobData(
- jobId=self._read_payload_string(payload, "jobId") or f"kjob_{document.id}",
- documentId=self._read_payload_string(payload, "documentId") or document.id,
- knowledgeBaseId=self._read_payload_string(payload, "knowledgeBaseId") or document.knowledge_base_id,
- documentTitle=self._read_payload_string(payload, "documentTitle") or document.title,
- action=self._read_job_action(payload.get("action")),
- status=self._read_job_status(payload.get("status")),
- progress=self._read_payload_int(payload, "progress", 0),
- queueName=self._read_payload_string(payload, "queueName"),
- workerKey=self._read_payload_string(payload, "workerKey"),
- errorMessage=self._read_payload_string(payload, "errorMessage"),
- chunkSize=self._read_optional_payload_int(payload, "chunkSize"),
- chunkOverlap=self._read_optional_payload_int(payload, "chunkOverlap"),
- queuedTime=self._read_payload_datetime(payload, "queuedTime"),
- startedTime=self._read_payload_datetime(payload, "startedTime"),
- completedTime=self._read_payload_datetime(payload, "completedTime"))
- def _try_read_latest_index_job(self, *, document: KnowledgeDocument) -> KnowledgeIndexJobData | None:
- if not self._read_index_job_payload(document=document):
- return None
- return self._read_latest_index_job(document=document)
- def _read_index_job_payload(self, *, document: KnowledgeDocument) -> dict[str, JSONValue]:
- metadata = document.metadata_json or {}
- value = metadata.get("index_job")
- if isinstance(value, dict):
- return {str(item_key): item_value for item_key, item_value in value.items()}
- return {}
- def _read_payload_string(
- self,
- payload: dict[str, JSONValue],
- key: str) -> str | None:
- value = payload.get(key)
- return value if isinstance(value, str) and value else None
- def _read_payload_int(
- self,
- payload: dict[str, JSONValue],
- key: str,
- fallback: int) -> int:
- value = payload.get(key)
- if isinstance(value, int) and not isinstance(value, bool):
- return value
- return fallback
- def _read_optional_payload_int(
- self,
- payload: dict[str, JSONValue],
- key: str) -> int | None:
- value = payload.get(key)
- if isinstance(value, int) and not isinstance(value, bool):
- return value
- return None
- def _read_payload_datetime(
- self,
- payload: dict[str, JSONValue],
- key: str) -> datetime | None:
- value = payload.get(key)
- if not isinstance(value, str) or not value:
- return None
- try:
- return datetime.fromisoformat(value)
- except ValueError:
- return None
- def _read_job_action(self, value: JSONValue) -> KnowledgeIndexJobAction:
- if isinstance(value, str) and value in {"index", "reindex"}:
- return cast(KnowledgeIndexJobAction, value)
- return "reindex"
- def _read_job_status(self, value: JSONValue) -> KnowledgeIndexJobStatus:
- if isinstance(value, str) and value in {"queued", "running", "completed", "failed", "skipped"}:
- return cast(KnowledgeIndexJobStatus, value)
- return "queued"
- def _mark_document_failed(
- self,
- *,
- document: KnowledgeDocument,
- message: str,
- job_id: str | None = None,
- action: str = "reindex",
- worker_key: str | None = None,
- chunk_size: int | None = None,
- chunk_overlap: int | None = None) -> None:
- metadata = dict(document.metadata_json or {})
- metadata["last_error"] = {
- "message": message[:1000],
- "errorType": "indexing_failed",
- }
- if job_id is not None:
- metadata = self._write_index_job_metadata(
- document=document,
- action=action,
- job_id=job_id,
- status="failed",
- progress=100,
- chunk_size=chunk_size,
- chunk_overlap=chunk_overlap,
- worker_key=worker_key,
- completed_time=datetime.utcnow(),
- error_message=message[:1000])
- self.document_repository.update(
- document_id=document.id,
- status="failed",
- metadata_json=metadata)
- def _guess_content_type(self, *, source_type: str) -> str:
- normalized = source_type.strip().lower().removeprefix(".")
- if normalized in {"markdown", "md"}:
- return "text/markdown; charset=utf-8"
- if normalized in {"html", "htm"}:
- return "text/html; charset=utf-8"
- if normalized == "json":
- return "application/json"
- if normalized == "csv":
- return "text/csv; charset=utf-8"
- if normalized == "pdf":
- return "application/pdf"
- if normalized in {"docx", "word"}:
- return "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
- return "text/plain; charset=utf-8"
- def _matches_filters(
- self,
- *,
- document: KnowledgeDocument,
- filters_json: dict[str, JSONValue]) -> bool:
- source_type = filters_json.get("sourceType") or filters_json.get("source_type")
- if isinstance(source_type, str) and document.source_type != source_type:
- return False
- status = filters_json.get("status")
- if isinstance(status, str) and document.status != status:
- return False
- return True
- def read_settings(self, *, knowledge_base_id: str | None = None) -> KnowledgeSettingsDto:
- base_config: dict[str, JSONValue] = {}
- if knowledge_base_id:
- base = self.base_repository.get_by_id(knowledge_base_id=knowledge_base_id)
- if base is not None and isinstance(base.metadata_json, dict):
- value = base.metadata_json.get("retrieval_config")
- if isinstance(value, dict):
- base_config = value
- defaults = KnowledgeSettingsDto(
- knowledgeBaseId=knowledge_base_id,
- chunkSize=self.settings.default_chunk_size,
- chunkOverlap=self.settings.default_chunk_overlap,
- keywordWeight=self.settings.retrieval_keyword_weight,
- vectorWeight=self.settings.retrieval_vector_weight,
- rerankWeight=self.settings.retrieval_rerank_weight,
- queryRewrite=False,
- requireCitations=True)
- return KnowledgeSettingsDto.model_validate({
- **defaults.model_dump(),
- **base_config,
- "knowledgeBaseId": knowledge_base_id,
- })
- def update_settings(
- self,
- payload: KnowledgeSettingsUpdateRequestDto) -> KnowledgeSettingsDto:
- settings = KnowledgeSettingsDto.model_validate({
- **payload.model_dump(),
- "knowledgeBaseId": payload.knowledgeBaseId,
- })
- if payload.knowledgeBaseId:
- base = self.base_repository.get_by_id(knowledge_base_id=payload.knowledgeBaseId)
- if base is not None:
- metadata = dict(base.metadata_json or {})
- metadata["retrieval_config"] = settings.model_dump(exclude={"knowledgeBaseId"})
- self.base_repository.update(
- knowledge_base_id=payload.knowledgeBaseId,
- metadata_json=metadata)
- return settings
- def _build_base_code(self, name: str) -> str:
- base = "".join(
- char.lower() if char.isalnum() else "_"
- for char in name
- ).strip("_") or "knowledge_base"
- return base[:64]
- def build_knowledge_application_service(
- *,
- db: Session,
- settings: KnowledgeServiceSettings) -> KnowledgeApplicationService:
- redis_client = try_build_redis_client(settings.redis_url)
- return KnowledgeApplicationService(
- settings=settings,
- base_repository=KnowledgeBaseRepository(db),
- document_repository=KnowledgeDocumentRepository(db),
- chunk_repository=KnowledgeChunkRepository(db),
- redis_client=redis_client,
- task_queue_publisher=(
- TaskQueuePublisher(client=redis_client) if redis_client is not None else None
- ))
|