| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459 |
from datetime import datetime, timezone
from typing import TypeVar

from core_domain import ServiceHealth
from core_shared import error_detail
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import text
from sqlalchemy.orm import Session

from app.application.document_parsers import DocumentParseError
from app.application.services import (
    KnowledgeApplicationService,
    KnowledgeIndexingError,
    build_knowledge_application_service,
)
from app.bootstrap.settings import KnowledgeServiceSettings
from app.db.session import get_db
from app.infrastructure.object_storage import ObjectStorageError
from app.schemas.knowledge import (
    ApiResponse,
    DeleteData,
    KnowledgeBaseCreateRequestDto,
    KnowledgeBaseDeleteRequestDto,
    KnowledgeBaseDetailRequestDto,
    KnowledgeBaseDto,
    KnowledgeBaseListRequestDto,
    KnowledgeBaseStatusRequestDto,
    KnowledgeBaseStatusUpdateRequest,
    KnowledgeBaseUpdateRequestDto,
    KnowledgeBaseReindexData,
    KnowledgeBaseReindexRequestDto,
    KnowledgeChunkDetailRequestDto,
    KnowledgeChunkDeleteRequestDto,
    KnowledgeChunkDto,
    KnowledgeChunkListRequestDto,
    KnowledgeDocumentCreateRequestDto,
    KnowledgeDocumentContentData,
    KnowledgeDocumentContentRequestDto,
    KnowledgeDocumentDeleteRequestDto,
    KnowledgeDocumentDetailRequestDto,
    KnowledgeDocumentDto,
    KnowledgeDocumentIngestData,
    KnowledgeDocumentListRequestDto,
    KnowledgeDocumentParseRequest,
    KnowledgeDocumentParseRequestDto,
    KnowledgeDocumentParseData,
    KnowledgeDocumentReindexRequestDto,
    KnowledgeDocumentStorageStatusData,
    KnowledgeDocumentStorageStatusRequestDto,
    KnowledgeDocumentStatusRequestDto,
    KnowledgeDocumentUpdateRequestDto,
    KnowledgeIndexJobData,
    KnowledgeIndexJobDetailRequestDto,
    KnowledgeIndexJobListRequestDto,
    KnowledgeIndexJobRetryRequestDto,
    KnowledgeSearchRequestDto,
    KnowledgeSearchRequest,
    KnowledgeSearchResultDto,
    KnowledgeSettingsDto,
    KnowledgeSettingsUpdateRequestDto,
    KnowledgeStorageHealthData,
    KnowledgeStorageHealthRequestDto,
    PageResult,
)
# Generic element type shared by the ``ok`` and ``paginate`` helpers.
T = TypeVar("T")

# Router that collects every knowledge-service endpoint declared in this module.
router = APIRouter()
def ok(data: T, *, request_id: str = "") -> ApiResponse[T]:
    """Wrap ``data`` in the standard ``ApiResponse`` success envelope.

    Args:
        data: Payload to return to the client.
        request_id: Value for the envelope's ``requestId`` field. Defaults to
            ``""``, which is what every existing caller sends.

    Returns:
        An ``ApiResponse`` whose ``serverTime`` is the current time as a
        timezone-aware UTC datetime.
    """
    # datetime.utcnow() is deprecated (Python 3.12) and returns a naive value.
    # Naive values serialize without an offset, so some clients (e.g. JavaScript
    # Date) parse them as local time. An aware UTC value avoids that.
    return ApiResponse[T](
        data=data,
        requestId=request_id,
        serverTime=datetime.now(timezone.utc),
    )
def paginate(items: list[T], *, page: int, page_size: int) -> PageResult[T]:
    """Return one 1-based page of ``items`` wrapped in a ``PageResult``.

    Args:
        items: The full, already-filtered list to page through.
        page: 1-based page number.
        page_size: Maximum number of items per page.

    Returns:
        A ``PageResult`` holding the requested window. ``total`` always
        reports ``len(items)``. A ``page`` or ``page_size`` below 1 yields
        an empty window.
    """
    if page < 1 or page_size < 1:
        # Without this guard, negative values produce negative slice indices
        # that select a window from the END of the list (e.g. page=-1).
        # page=0 and page_size=0 already produced an empty window; this keeps that.
        window: list[T] = []
    else:
        start = (page - 1) * page_size
        window = items[start:start + page_size]
    return PageResult.from_items(
        items=window,
        total=len(items),
        page=page,
        page_size=page_size,
    )
def get_knowledge_settings() -> KnowledgeServiceSettings:
    """FastAPI dependency returning a ``KnowledgeServiceSettings`` instance.

    A new instance is constructed on every call; nothing is cached here.
    """
    settings = KnowledgeServiceSettings()
    return settings
def get_knowledge_application_service(
    db: Session = Depends(get_db),
    settings: KnowledgeServiceSettings = Depends(get_knowledge_settings),
) -> KnowledgeApplicationService:
    """FastAPI dependency that assembles the knowledge application service.

    Combines the session from ``get_db`` and the settings from
    ``get_knowledge_settings`` via ``build_knowledge_application_service``.
    """
    service = build_knowledge_application_service(db=db, settings=settings)
    return service
@router.get("/health", response_model=ServiceHealth)
def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
    """Report service health after a trivial ``SELECT 1`` database round trip.

    Any exception raised by the query propagates unchanged. On success,
    ``status`` and ``database`` are both reported as ``"ok"``.
    """
    probe = text("SELECT 1")
    db.execute(probe)
    return ServiceHealth(
        service="knowledge-service",
        status="ok",
        database="ok",
    )
@router.post("/storage/health", response_model=ApiResponse[KnowledgeStorageHealthData])
def storage_health_contract(
    payload: KnowledgeStorageHealthRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeStorageHealthData]:
    """Probe the configured object storage and report whether it is available.

    ``payload`` is accepted for the contract but not read. An
    ``ObjectStorageError`` from the probe does not fail the request. Instead
    it is reported as ``available=False``, using the configured backend and
    bucket and the error text as ``message``.
    """
    try:
        health = service.read_storage_health()
    except ObjectStorageError as exc:
        # Degrade to an "unavailable" report built from the configured settings.
        health = {
            "backend": service.settings.object_storage_backend,
            "bucket": service.settings.object_storage_bucket,
            "available": False,
            "message": str(exc),
        }
    message = health.get("message")
    return ok(KnowledgeStorageHealthData(
        backend=str(health.get("backend") or ""),
        bucket=str(health.get("bucket") or ""),
        available=bool(health.get("available")),
        message=str(message) if message is not None else None,
        # Timezone-aware UTC; datetime.utcnow() is deprecated and returns a naive value.
        checkedTime=datetime.now(timezone.utc),
    ))
@router.post("/bases/list", response_model=ApiResponse[PageResult[KnowledgeBaseDto]])
def list_bases_contract(
    payload: KnowledgeBaseListRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[PageResult[KnowledgeBaseDto]]:
    """List knowledge bases matching the keyword/status filters, one page at a time.

    The service does the filtering. Every match is converted to a DTO and
    then paginated in memory with ``paginate``.
    """
    matches = service.list_bases_filtered(keyword=payload.keyword, status=payload.status)
    dtos = list(map(KnowledgeBaseDto.from_entity, matches))
    return ok(paginate(dtos, page=payload.page, page_size=payload.pageSize))
@router.post("/bases/create", response_model=ApiResponse[KnowledgeBaseDto])
def create_base_contract(
    payload: KnowledgeBaseCreateRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeBaseDto]:
    """Create a knowledge base from the contract payload and return it as a DTO."""
    created = service.create_base_from_contract(payload)
    return ok(KnowledgeBaseDto.from_entity(created))
@router.post("/bases/detail", response_model=ApiResponse[KnowledgeBaseDto])
def detail_base_contract(
    payload: KnowledgeBaseDetailRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeBaseDto]:
    """Return a single knowledge base by id.

    Raises:
        HTTPException: 404 when the repository has no base with that id.
    """
    base_id = payload.knowledgeBaseId
    entity = service.base_repository.get_by_id(knowledge_base_id=base_id)
    if entity is not None:
        return ok(KnowledgeBaseDto.from_entity(entity))
    raise HTTPException(
        status_code=404,
        detail=error_detail("error.knowledge_base.not_found", id=base_id),
    )
@router.post("/bases/update", response_model=ApiResponse[KnowledgeBaseDto])
def update_base_contract(
    payload: KnowledgeBaseUpdateRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeBaseDto]:
    """Apply a contract update to a knowledge base and return the updated base.

    Raises:
        HTTPException: 404 when the service returns ``None`` for the update.
    """
    updated = service.update_base_from_contract(payload)
    if updated is None:
        missing = error_detail("error.knowledge_base.not_found", id=payload.knowledgeBaseId)
        raise HTTPException(status_code=404, detail=missing)
    return ok(KnowledgeBaseDto.from_entity(updated))
@router.post("/bases/status", response_model=ApiResponse[KnowledgeBaseDto])
def update_base_status_contract(
    payload: KnowledgeBaseStatusRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeBaseDto]:
    """Change a knowledge base's status and return the updated base.

    The contract DTO is translated into the service's
    ``KnowledgeBaseStatusUpdateRequest`` before the call.

    Raises:
        HTTPException: 404 when the service returns ``None``.
    """
    base_id = payload.knowledgeBaseId
    status_request = KnowledgeBaseStatusUpdateRequest(status=payload.status)
    updated = service.update_base_status(knowledge_base_id=base_id, payload=status_request)
    if updated is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.knowledge_base.not_found", id=base_id),
        )
    return ok(KnowledgeBaseDto.from_entity(updated))
@router.post("/bases/delete", response_model=ApiResponse[DeleteData])
def delete_base_contract(
    payload: KnowledgeBaseDeleteRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[DeleteData]:
    """Delete a knowledge base and report the service's ``deleted`` flag.

    This handler never raises 404; the flag is passed through as-is.

    Raises:
        HTTPException: 503 when object storage fails during deletion.
    """
    base_id = payload.knowledgeBaseId
    try:
        deleted = service.delete_base(knowledge_base_id=base_id)
    except ObjectStorageError as exc:
        detail = error_detail("error.storage.unavailable", message=str(exc))
        raise HTTPException(status_code=503, detail=detail) from exc
    return ok(DeleteData(deleted=deleted, knowledgeBaseId=base_id))
@router.post("/documents/list", response_model=ApiResponse[PageResult[KnowledgeDocumentDto]])
def list_documents_contract(
    payload: KnowledgeDocumentListRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[PageResult[KnowledgeDocumentDto]]:
    """List documents filtered by base, keyword, status and source type, paginated.

    The service does the filtering. Every match is converted to a DTO and
    then paginated in memory with ``paginate``.
    """
    matches = service.list_documents_filtered(
        knowledge_base_id=payload.knowledgeBaseId,
        keyword=payload.keyword,
        status=payload.status,
        source_type=payload.sourceType,
    )
    dtos = list(map(KnowledgeDocumentDto.from_entity, matches))
    return ok(paginate(dtos, page=payload.page, page_size=payload.pageSize))
@router.post("/documents/create", response_model=ApiResponse[KnowledgeDocumentIngestData])
def create_document_contract(
    payload: KnowledgeDocumentCreateRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentIngestData]:
    """Create a document and return it with its chunks and any indexing job.

    Raises:
        HTTPException: 503 on ``KnowledgeIndexingError`` or
            ``ObjectStorageError``; 422 on ``ValueError``.
    """
    try:
        outcome = service.create_document_from_contract_result(payload)
    except KnowledgeIndexingError as exc:
        failure = error_detail("error.knowledge.indexing_failed", message=str(exc))
        raise HTTPException(status_code=503, detail=failure) from exc
    except ObjectStorageError as exc:
        failure = error_detail("error.storage.unavailable", message=str(exc))
        raise HTTPException(status_code=503, detail=failure) from exc
    except ValueError as exc:
        failure = error_detail("error.validation", message=str(exc))
        raise HTTPException(status_code=422, detail=failure) from exc

    document_dto = KnowledgeDocumentDto.from_entity(outcome.document)
    chunk_dtos = [KnowledgeChunkDto.from_entity(chunk) for chunk in outcome.chunks]
    return ok(KnowledgeDocumentIngestData(
        document=document_dto,
        chunks=chunk_dtos,
        queued=outcome.queued,
        job=outcome.job,
    ))
@router.post("/documents/detail", response_model=ApiResponse[KnowledgeDocumentDto])
def detail_document_contract(
    payload: KnowledgeDocumentDetailRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentDto]:
    """Return a single document by id.

    Raises:
        HTTPException: 404 when the repository has no document with that id.
    """
    document_id = payload.documentId
    document = service.document_repository.get_by_id(document_id=document_id)
    if document is not None:
        return ok(KnowledgeDocumentDto.from_entity(document))
    raise HTTPException(
        status_code=404,
        detail=error_detail("error.knowledge_document.not_found", id=document_id),
    )
@router.post("/documents/update", response_model=ApiResponse[KnowledgeDocumentDto])
def update_document_contract(
    payload: KnowledgeDocumentUpdateRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentDto]:
    """Apply a contract update to a document and return the updated document.

    Raises:
        HTTPException: 404 when the service returns ``None``.
    """
    updated = service.update_document_from_contract(payload)
    if updated is None:
        missing = error_detail("error.knowledge_document.not_found", id=payload.documentId)
        raise HTTPException(status_code=404, detail=missing)
    return ok(KnowledgeDocumentDto.from_entity(updated))
@router.post("/documents/status", response_model=ApiResponse[KnowledgeDocumentDto])
def update_document_status_contract(
    payload: KnowledgeDocumentStatusRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentDto]:
    """Set a document's status and return the updated document.

    The update goes straight to ``service.document_repository.update_status``
    rather than through an application-service method.

    Raises:
        HTTPException: 404 when the repository returns ``None``.
    """
    document_id = payload.documentId
    updated = service.document_repository.update_status(
        document_id=document_id,
        status=payload.status,
    )
    if updated is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.knowledge_document.not_found", id=document_id),
        )
    return ok(KnowledgeDocumentDto.from_entity(updated))
@router.post("/documents/delete", response_model=ApiResponse[DeleteData])
def delete_document_contract(
    payload: KnowledgeDocumentDeleteRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[DeleteData]:
    """Delete a document and report whether the record and its stored object went away.

    Missing ``deleted``/``objectDeleted`` keys in the service result are
    reported as ``False``.

    Raises:
        HTTPException: 503 when object storage fails during deletion.
    """
    document_id = payload.documentId
    try:
        outcome = service.delete_document_result(document_id=document_id)
    except ObjectStorageError as exc:
        detail = error_detail("error.storage.unavailable", message=str(exc))
        raise HTTPException(status_code=503, detail=detail) from exc
    record_deleted = bool(outcome.get("deleted"))
    object_deleted = bool(outcome.get("objectDeleted"))
    return ok(DeleteData(
        deleted=record_deleted,
        documentId=document_id,
        objectDeleted=object_deleted,
    ))
@router.post("/documents/reindex", response_model=ApiResponse[KnowledgeDocumentIngestData])
def reindex_document_contract(
    payload: KnowledgeDocumentReindexRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentIngestData]:
    """Re-chunk/re-index one document and return the refreshed document and chunks.

    Raises:
        HTTPException: 503 on ``KnowledgeIndexingError`` or
            ``ObjectStorageError``; 422 on ``ValueError``; 404 when the
            service returns ``None``.
    """
    try:
        outcome = service.reindex_document_from_contract_result(payload)
    except KnowledgeIndexingError as exc:
        failure = error_detail("error.knowledge.indexing_failed", message=str(exc))
        raise HTTPException(status_code=503, detail=failure) from exc
    except ObjectStorageError as exc:
        failure = error_detail("error.storage.unavailable", message=str(exc))
        raise HTTPException(status_code=503, detail=failure) from exc
    except ValueError as exc:
        failure = error_detail("error.validation", message=str(exc))
        raise HTTPException(status_code=422, detail=failure) from exc

    if outcome is None:
        missing = error_detail("error.knowledge_document.not_found", id=payload.documentId)
        raise HTTPException(status_code=404, detail=missing)

    document_dto = KnowledgeDocumentDto.from_entity(outcome.document)
    chunk_dtos = [KnowledgeChunkDto.from_entity(chunk) for chunk in outcome.chunks]
    return ok(KnowledgeDocumentIngestData(
        document=document_dto,
        chunks=chunk_dtos,
        queued=outcome.queued,
        job=outcome.job,
    ))
@router.post("/jobs/list", response_model=ApiResponse[PageResult[KnowledgeIndexJobData]])
def list_index_jobs_contract(
    payload: KnowledgeIndexJobListRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[PageResult[KnowledgeIndexJobData]]:
    """List indexing jobs filtered by base, document and status, paginated in memory.

    The service's job list is passed to ``paginate`` without any conversion.
    """
    jobs = service.list_index_jobs(
        knowledge_base_id=payload.knowledgeBaseId,
        document_id=payload.documentId,
        status=payload.status,
    )
    page_result = paginate(jobs, page=payload.page, page_size=payload.pageSize)
    return ok(page_result)
@router.post("/jobs/detail", response_model=ApiResponse[KnowledgeIndexJobData])
def detail_index_job_contract(
    payload: KnowledgeIndexJobDetailRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeIndexJobData]:
    """Return the indexing job the service reports for a document id.

    Raises:
        HTTPException: 404 when the service returns ``None``.
    """
    document_id = payload.documentId
    job = service.detail_index_job(document_id=document_id)
    if job is not None:
        return ok(job)
    raise HTTPException(
        status_code=404,
        detail=error_detail("error.knowledge_index_job.not_found", id=document_id),
    )
@router.post("/jobs/retry", response_model=ApiResponse[KnowledgeDocumentIngestData])
def retry_index_job_contract(
    payload: KnowledgeIndexJobRetryRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentIngestData]:
    """Retry indexing for one document by re-running its reindex with ``asyncMode=True``.

    The retry is forwarded to the same service call that ``/documents/reindex``
    uses, so the same service errors are mapped to the same HTTP responses.

    Raises:
        HTTPException: 503 on ``KnowledgeIndexingError`` or
            ``ObjectStorageError``; 422 on ``ValueError``; 404 when the
            service returns ``None``.
    """
    reindex_request = KnowledgeDocumentReindexRequestDto(
        documentId=payload.documentId,
        chunkSize=payload.chunkSize,
        chunkOverlap=payload.chunkOverlap,
        asyncMode=True,
    )
    # Previously these service errors were not caught here and escaped as
    # unhandled 500s, unlike the identical call in /documents/reindex.
    try:
        result = service.reindex_document_from_contract_result(reindex_request)
    except KnowledgeIndexingError as exc:
        raise HTTPException(status_code=503, detail=error_detail("error.knowledge.indexing_failed", message=str(exc))) from exc
    except ObjectStorageError as exc:
        raise HTTPException(status_code=503, detail=error_detail("error.storage.unavailable", message=str(exc))) from exc
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=error_detail("error.validation", message=str(exc))) from exc
    if result is None:
        raise HTTPException(status_code=404, detail=error_detail("error.knowledge_document.not_found", id=payload.documentId))
    return ok(KnowledgeDocumentIngestData(
        document=KnowledgeDocumentDto.from_entity(result.document),
        chunks=[KnowledgeChunkDto.from_entity(item) for item in result.chunks],
        queued=result.queued,
        job=result.job,
    ))
@router.post("/jobs/reindex-base", response_model=ApiResponse[KnowledgeBaseReindexData])
def reindex_base_contract(
    payload: KnowledgeBaseReindexRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeBaseReindexData]:
    """Reindex every document of a knowledge base and report the resulting jobs.

    ``queuedCount`` is the number of jobs returned by the service.

    Raises:
        HTTPException: 404 when the base does not exist; 503 on
            ``KnowledgeIndexingError`` or ``ObjectStorageError``; 422 on
            ``ValueError``.
    """
    base_id = payload.knowledgeBaseId
    if service.base_repository.get_by_id(knowledge_base_id=base_id) is None:
        raise HTTPException(status_code=404, detail=error_detail("error.knowledge_base.not_found", id=base_id))
    # NOTE(review): which of these reindex_base_from_contract can raise is not
    # visible here. The mapping mirrors /documents/reindex so such failures get
    # the same 503/422 responses instead of an unhandled 500.
    try:
        jobs = service.reindex_base_from_contract(payload)
    except KnowledgeIndexingError as exc:
        raise HTTPException(status_code=503, detail=error_detail("error.knowledge.indexing_failed", message=str(exc))) from exc
    except ObjectStorageError as exc:
        raise HTTPException(status_code=503, detail=error_detail("error.storage.unavailable", message=str(exc))) from exc
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=error_detail("error.validation", message=str(exc))) from exc
    return ok(KnowledgeBaseReindexData(
        knowledgeBaseId=base_id,
        queuedCount=len(jobs),
        jobs=jobs,
    ))
@router.post("/documents/content", response_model=ApiResponse[KnowledgeDocumentContentData])
def read_document_content_contract(
    payload: KnowledgeDocumentContentRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentContentData]:
    """Read a document's stored content, optionally as text and/or base64.

    The service result is validated into ``KnowledgeDocumentContentData``.

    Raises:
        HTTPException: 503 on ``ObjectStorageError``; 422 on ``ValueError``;
            404 when the service returns ``None``.
    """
    document_id = payload.documentId
    try:
        content = service.read_document_content(
            document_id=document_id,
            include_text=payload.includeText,
            include_base64=payload.includeBase64,
        )
    except ObjectStorageError as exc:
        failure = error_detail("error.storage.unavailable", message=str(exc))
        raise HTTPException(status_code=503, detail=failure) from exc
    except ValueError as exc:
        failure = error_detail("error.validation", message=str(exc))
        raise HTTPException(status_code=422, detail=failure) from exc

    if content is None:
        missing = error_detail("error.knowledge_document.not_found", id=document_id)
        raise HTTPException(status_code=404, detail=missing)
    return ok(KnowledgeDocumentContentData.model_validate(content))
@router.post("/documents/storage/status", response_model=ApiResponse[KnowledgeDocumentStorageStatusData])
def read_document_storage_status_contract(
    payload: KnowledgeDocumentStorageStatusRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentStorageStatusData]:
    """Return the service's storage status for one document, stamped with ``checkedTime``.

    The stamp is merged last, so any ``checkedTime`` key in the service
    result is overwritten.

    Raises:
        HTTPException: 503 on ``ObjectStorageError``; 404 when the service
            returns ``None``.
    """
    try:
        result = service.read_document_storage_status(document_id=payload.documentId)
    except ObjectStorageError as exc:
        raise HTTPException(status_code=503, detail=error_detail("error.storage.unavailable", message=str(exc))) from exc
    if result is None:
        raise HTTPException(status_code=404, detail=error_detail("error.knowledge_document.not_found", id=payload.documentId))
    return ok(KnowledgeDocumentStorageStatusData.model_validate({
        **result,
        # Timezone-aware UTC; datetime.utcnow() is deprecated and returns a naive value.
        "checkedTime": datetime.now(timezone.utc),
    }))
@router.post("/documents/parse", response_model=ApiResponse[KnowledgeDocumentParseData])
def parse_document_contract(
    payload: KnowledgeDocumentParseRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentParseData]:
    """Parse raw document input (URI, text or base64) without persisting it.

    Raises:
        HTTPException: 422 when parsing raises ``DocumentParseError``.
    """
    try:
        parse_request = KnowledgeDocumentParseRequest(
            source_type=payload.sourceType,
            source_uri=payload.sourceUri,
            content_text=payload.contentText,
            content_base64=payload.contentBase64,
        )
        parsed = service.parse_document(parse_request)
    except DocumentParseError as exc:
        failure = error_detail("error.validation", message=str(exc))
        raise HTTPException(status_code=422, detail=failure) from exc
    return ok(KnowledgeDocumentParseData(
        contentText=parsed.content_text,
        sourceType=parsed.source_type,
        metadata=parsed.metadata_json,
    ))
@router.post("/chunks/list", response_model=ApiResponse[PageResult[KnowledgeChunkDto]])
def list_chunks_contract(
    payload: KnowledgeChunkListRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[PageResult[KnowledgeChunkDto]]:
    """List chunks filtered by base, document and keyword, paginated in memory.

    Every match from the service is converted to a DTO before ``paginate``
    selects the requested window.
    """
    matches = service.list_chunks_filtered(
        knowledge_base_id=payload.knowledgeBaseId,
        document_id=payload.documentId,
        keyword=payload.keyword,
    )
    dtos = list(map(KnowledgeChunkDto.from_entity, matches))
    return ok(paginate(dtos, page=payload.page, page_size=payload.pageSize))
@router.post("/chunks/detail", response_model=ApiResponse[KnowledgeChunkDto])
def detail_chunk_contract(
    payload: KnowledgeChunkDetailRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeChunkDto]:
    """Return a single chunk by id.

    Raises:
        HTTPException: 404 when the repository has no chunk with that id.
    """
    chunk_id = payload.chunkId
    chunk = service.chunk_repository.get_by_id(chunk_id=chunk_id)
    if chunk is not None:
        return ok(KnowledgeChunkDto.from_entity(chunk))
    raise HTTPException(
        status_code=404,
        detail=error_detail("error.knowledge_chunk.not_found", id=chunk_id),
    )
@router.post("/chunks/delete", response_model=ApiResponse[DeleteData])
def delete_chunk_contract(
    payload: KnowledgeChunkDeleteRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[DeleteData]:
    """Delete one chunk and pass through the service's ``deleted`` result."""
    chunk_id = payload.chunkId
    was_deleted = service.delete_chunk(chunk_id=chunk_id)
    return ok(DeleteData(deleted=was_deleted, chunkId=chunk_id))
@router.post("/search/query", response_model=ApiResponse[list[KnowledgeSearchResultDto]])
def search_contract(
    payload: KnowledgeSearchRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[list[KnowledgeSearchResultDto]]:
    """Search one knowledge base and return scored hits in service order.

    Each item yielded by ``service.search`` is unpacked as
    ``(chunk, document, score, score_json)`` and converted to a
    ``KnowledgeSearchResultDto``.
    """
    search_request = KnowledgeSearchRequest(
        knowledge_base_id=payload.knowledgeBaseId,
        query=payload.query,
        top_k=payload.topK,
        filters_json=payload.filters,
    )
    hits: list[KnowledgeSearchResultDto] = []
    for chunk, document, score, score_json in service.search(search_request):
        hits.append(
            KnowledgeSearchResultDto(
                chunk=KnowledgeChunkDto.from_entity(chunk),
                document=KnowledgeDocumentDto.from_entity(document),
                score=score,
                scoreDetails=score_json,
            )
        )
    return ok(hits)
@router.post("/settings/detail", response_model=ApiResponse[KnowledgeSettingsDto])
def detail_settings_contract(
    payload: KnowledgeBaseDetailRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeSettingsDto]:
    """Return whatever settings the service reports for one knowledge base id."""
    settings_dto = service.read_settings(knowledge_base_id=payload.knowledgeBaseId)
    return ok(settings_dto)
@router.post("/settings/update", response_model=ApiResponse[KnowledgeSettingsDto])
def update_settings_contract(
    payload: KnowledgeSettingsUpdateRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeSettingsDto]:
    """Apply a settings update via the service and return the resulting settings."""
    updated_settings = service.update_settings(payload)
    return ok(updated_settings)
|