from datetime import datetime, timezone
from typing import TypeVar

from core_domain import ServiceHealth
from core_shared import error_detail
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import text
from sqlalchemy.orm import Session

from app.application.document_parsers import DocumentParseError
from app.application.services import (
    KnowledgeApplicationService,
    KnowledgeIndexingError,
    build_knowledge_application_service,
)
from app.bootstrap.settings import KnowledgeServiceSettings
from app.db.session import get_db
from app.infrastructure.object_storage import ObjectStorageError
from app.schemas.knowledge import (
    ApiResponse,
    DeleteData,
    KnowledgeBaseCreateRequestDto,
    KnowledgeBaseDeleteRequestDto,
    KnowledgeBaseDetailRequestDto,
    KnowledgeBaseDto,
    KnowledgeBaseListRequestDto,
    KnowledgeBaseStatusRequestDto,
    KnowledgeBaseStatusUpdateRequest,
    KnowledgeBaseUpdateRequestDto,
    KnowledgeBaseReindexData,
    KnowledgeBaseReindexRequestDto,
    KnowledgeChunkDetailRequestDto,
    KnowledgeChunkDeleteRequestDto,
    KnowledgeChunkDto,
    KnowledgeChunkListRequestDto,
    KnowledgeDocumentCreateRequestDto,
    KnowledgeDocumentContentData,
    KnowledgeDocumentContentRequestDto,
    KnowledgeDocumentDeleteRequestDto,
    KnowledgeDocumentDetailRequestDto,
    KnowledgeDocumentDto,
    KnowledgeDocumentIngestData,
    KnowledgeDocumentListRequestDto,
    KnowledgeDocumentParseRequest,
    KnowledgeDocumentParseRequestDto,
    KnowledgeDocumentParseData,
    KnowledgeDocumentReindexRequestDto,
    KnowledgeDocumentStorageStatusData,
    KnowledgeDocumentStorageStatusRequestDto,
    KnowledgeDocumentStatusRequestDto,
    KnowledgeDocumentUpdateRequestDto,
    KnowledgeIndexJobData,
    KnowledgeIndexJobDetailRequestDto,
    KnowledgeIndexJobListRequestDto,
    KnowledgeIndexJobRetryRequestDto,
    KnowledgeSearchRequestDto,
    KnowledgeSearchRequest,
    KnowledgeSearchResultDto,
    KnowledgeSettingsDto,
    KnowledgeSettingsUpdateRequestDto,
    KnowledgeStorageHealthData,
    KnowledgeStorageHealthRequestDto,
    PageResult,
)

router = APIRouter()

T = TypeVar("T")


def _utc_now() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same value the deprecated ``datetime.utcnow()`` returned
    (deprecated since Python 3.12). The time is read from an aware UTC
    clock and the tzinfo is then stripped, so callers keep receiving a
    naive value.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def ok(data: T) -> ApiResponse[T]:
    """Wrap ``data`` in the standard ``ApiResponse`` envelope.

    ``requestId`` is always the empty string here. ``serverTime`` is the
    current naive UTC time.
    """
    return ApiResponse[T](data=data, requestId="", serverTime=_utc_now())


def paginate(items: list[T], *, page: int, page_size: int) -> PageResult[T]:
    """Slice an already-loaded list into one page and wrap it in ``PageResult``.

    ``page`` is 1-based. ``total`` is the length of the full ``items`` list,
    not of the returned page.

    NOTE(review): ``page``/``page_size`` below 1 are not guarded here.
    Presumably the request DTOs enforce minimums; confirm.
    """
    start = (page - 1) * page_size
    return PageResult.from_items(
        items=items[start:start + page_size],
        total=len(items),
        page=page,
        page_size=page_size,
    )


def get_knowledge_settings() -> KnowledgeServiceSettings:
    """FastAPI dependency that constructs a new ``KnowledgeServiceSettings`` on every call."""
    return KnowledgeServiceSettings()


def get_knowledge_application_service(
    db: Session = Depends(get_db),
    settings: KnowledgeServiceSettings = Depends(get_knowledge_settings),
) -> KnowledgeApplicationService:
    """FastAPI dependency that builds the application service.

    The service is built from the injected DB session and settings.
    """
    return build_knowledge_application_service(db=db, settings=settings)


@router.get("/health", response_model=ServiceHealth)
def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
    """Health probe that runs ``SELECT 1`` against the database.

    Any database error raised by ``execute`` is not caught here and
    propagates to the framework.
    """
    db.execute(text("SELECT 1"))
    return ServiceHealth(service="knowledge-service", status="ok", database="ok")


@router.post("/storage/health", response_model=ApiResponse[KnowledgeStorageHealthData])
def storage_health_contract(
    payload: KnowledgeStorageHealthRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeStorageHealthData]:
    """Report object-storage health.

    An ``ObjectStorageError`` is not surfaced as an HTTP error. Instead, it
    is reported as ``available=False`` with the error text as ``message``.
    ``payload`` is accepted only to satisfy the request contract; it is not
    read.
    """
    try:
        health = service.read_storage_health()
    except ObjectStorageError as exc:
        health = {
            "backend": service.settings.object_storage_backend,
            "bucket": service.settings.object_storage_bucket,
            "available": False,
            "message": str(exc),
        }
    return ok(KnowledgeStorageHealthData(
        backend=str(health.get("backend") or ""),
        bucket=str(health.get("bucket") or ""),
        available=bool(health.get("available")),
        # Keep ``None`` as ``None`` rather than stringifying it to "None".
        message=str(health["message"]) if health.get("message") is not None else None,
        checkedTime=_utc_now(),
    ))


@router.post("/bases/list", response_model=ApiResponse[PageResult[KnowledgeBaseDto]])
def list_bases_contract(
    payload: KnowledgeBaseListRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[PageResult[KnowledgeBaseDto]]:
    """List knowledge bases filtered by keyword/status, paginated in memory."""
    items = [
        KnowledgeBaseDto.from_entity(item)
        for item in service.list_bases_filtered(
            keyword=payload.keyword,
            status=payload.status,
        )
    ]
    return ok(paginate(items, page=payload.page, page_size=payload.pageSize))


@router.post("/bases/create", response_model=ApiResponse[KnowledgeBaseDto])
def create_base_contract(
    payload: KnowledgeBaseCreateRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeBaseDto]:
    """Create a knowledge base from the request contract."""
    return ok(KnowledgeBaseDto.from_entity(service.create_base_from_contract(payload)))


@router.post("/bases/detail", response_model=ApiResponse[KnowledgeBaseDto])
def detail_base_contract(
    payload: KnowledgeBaseDetailRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeBaseDto]:
    """Return one knowledge base by id.

    Raises:
        HTTPException: 404 if it does not exist.
    """
    entity = service.base_repository.get_by_id(knowledge_base_id=payload.knowledgeBaseId)
    if entity is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.knowledge_base.not_found", id=payload.knowledgeBaseId),
        )
    return ok(KnowledgeBaseDto.from_entity(entity))


@router.post("/bases/update", response_model=ApiResponse[KnowledgeBaseDto])
def update_base_contract(
    payload: KnowledgeBaseUpdateRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeBaseDto]:
    """Update a knowledge base.

    Raises:
        HTTPException: 404 if the service reports no such base.
    """
    entity = service.update_base_from_contract(payload)
    if entity is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.knowledge_base.not_found", id=payload.knowledgeBaseId),
        )
    return ok(KnowledgeBaseDto.from_entity(entity))


@router.post("/bases/status", response_model=ApiResponse[KnowledgeBaseDto])
def update_base_status_contract(
    payload: KnowledgeBaseStatusRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeBaseDto]:
    """Change a knowledge base's status.

    Raises:
        HTTPException: 404 if the base does not exist.
    """
    entity = service.update_base_status(
        knowledge_base_id=payload.knowledgeBaseId,
        payload=KnowledgeBaseStatusUpdateRequest(status=payload.status),
    )
    if entity is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.knowledge_base.not_found", id=payload.knowledgeBaseId),
        )
    return ok(KnowledgeBaseDto.from_entity(entity))


@router.post("/bases/delete", response_model=ApiResponse[DeleteData])
def delete_base_contract(
    payload: KnowledgeBaseDeleteRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[DeleteData]:
    """Delete a knowledge base.

    Raises:
        HTTPException: 503 when object storage fails during deletion.
    """
    try:
        deleted = service.delete_base(knowledge_base_id=payload.knowledgeBaseId)
    except ObjectStorageError as exc:
        raise HTTPException(
            status_code=503,
            detail=error_detail("error.storage.unavailable", message=str(exc)),
        ) from exc
    return ok(DeleteData(deleted=deleted, knowledgeBaseId=payload.knowledgeBaseId))


@router.post("/documents/list", response_model=ApiResponse[PageResult[KnowledgeDocumentDto]])
def list_documents_contract(
    payload: KnowledgeDocumentListRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[PageResult[KnowledgeDocumentDto]]:
    """List documents with optional filters, paginated in memory.

    Filters: base, keyword, status and source type.
    """
    items = [
        KnowledgeDocumentDto.from_entity(item)
        for item in service.list_documents_filtered(
            knowledge_base_id=payload.knowledgeBaseId,
            keyword=payload.keyword,
            status=payload.status,
            source_type=payload.sourceType,
        )
    ]
    return ok(paginate(items, page=payload.page, page_size=payload.pageSize))


@router.post("/documents/create", response_model=ApiResponse[KnowledgeDocumentIngestData])
def create_document_contract(
    payload: KnowledgeDocumentCreateRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentIngestData]:
    """Create (ingest) a document.

    Returns the document, its chunks and any queued index job.

    Raises:
        HTTPException: 503 on indexing or object-storage failure, 422 on
            ``ValueError`` from the service.
    """
    # Order matters: the more specific service errors are checked before
    # the generic ``ValueError``.
    try:
        result = service.create_document_from_contract_result(payload)
    except KnowledgeIndexingError as exc:
        raise HTTPException(
            status_code=503,
            detail=error_detail("error.knowledge.indexing_failed", message=str(exc)),
        ) from exc
    except ObjectStorageError as exc:
        raise HTTPException(
            status_code=503,
            detail=error_detail("error.storage.unavailable", message=str(exc)),
        ) from exc
    except ValueError as exc:
        raise HTTPException(
            status_code=422,
            detail=error_detail("error.validation", message=str(exc)),
        ) from exc
    return ok(KnowledgeDocumentIngestData(
        document=KnowledgeDocumentDto.from_entity(result.document),
        chunks=[KnowledgeChunkDto.from_entity(item) for item in result.chunks],
        queued=result.queued,
        job=result.job,
    ))


@router.post("/documents/detail", response_model=ApiResponse[KnowledgeDocumentDto])
def detail_document_contract(
    payload: KnowledgeDocumentDetailRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentDto]:
    """Return one document by id.

    Raises:
        HTTPException: 404 if it does not exist.
    """
    entity = service.document_repository.get_by_id(document_id=payload.documentId)
    if entity is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.knowledge_document.not_found", id=payload.documentId),
        )
    return ok(KnowledgeDocumentDto.from_entity(entity))


@router.post("/documents/update", response_model=ApiResponse[KnowledgeDocumentDto])
def update_document_contract(
    payload: KnowledgeDocumentUpdateRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentDto]:
    """Update a document.

    Raises:
        HTTPException: 404 if the service reports no such document.
    """
    entity = service.update_document_from_contract(payload)
    if entity is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.knowledge_document.not_found", id=payload.documentId),
        )
    return ok(KnowledgeDocumentDto.from_entity(entity))


@router.post("/documents/status", response_model=ApiResponse[KnowledgeDocumentDto])
def update_document_status_contract(
    payload: KnowledgeDocumentStatusRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentDto]:
    """Change a document's status.

    NOTE(review): unlike ``/bases/status``, this writes through the
    repository directly instead of a service method; confirm no service-level
    side effects are being skipped.

    Raises:
        HTTPException: 404 if the document does not exist.
    """
    entity = service.document_repository.update_status(
        document_id=payload.documentId,
        status=payload.status,
    )
    if entity is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.knowledge_document.not_found", id=payload.documentId),
        )
    return ok(KnowledgeDocumentDto.from_entity(entity))
@router.post("/documents/delete", response_model=ApiResponse[DeleteData])
def delete_document_contract(
    payload: KnowledgeDocumentDeleteRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[DeleteData]:
    """Delete a document.

    The response also reports whether its stored object was removed.

    Raises:
        HTTPException: 503 when object storage fails.
    """
    try:
        result = service.delete_document_result(document_id=payload.documentId)
    except ObjectStorageError as exc:
        raise HTTPException(
            status_code=503,
            detail=error_detail("error.storage.unavailable", message=str(exc)),
        ) from exc
    return ok(DeleteData(
        deleted=bool(result.get("deleted")),
        documentId=payload.documentId,
        objectDeleted=bool(result.get("objectDeleted")),
    ))


def _to_ingest_data(result) -> KnowledgeDocumentIngestData:
    """Convert a service ingest/reindex result into the response DTO.

    ``result`` is expected to expose ``document``, ``chunks``, ``queued`` and
    ``job`` attributes.
    """
    return KnowledgeDocumentIngestData(
        document=KnowledgeDocumentDto.from_entity(result.document),
        chunks=[KnowledgeChunkDto.from_entity(item) for item in result.chunks],
        queued=result.queued,
        job=result.job,
    )


def _reindex_document_or_raise(
    service: KnowledgeApplicationService,
    request: KnowledgeDocumentReindexRequestDto,
) -> KnowledgeDocumentIngestData:
    """Reindex one document and translate service failures into HTTP errors.

    Shared by ``/documents/reindex`` and ``/jobs/retry`` so both endpoints,
    which call the same service method, map failures identically.

    Raises:
        HTTPException: 503 on indexing or object-storage failure, 422 on
            ``ValueError``, 404 when the document does not exist.
    """
    # Order matters: the more specific service errors are checked before
    # the generic ``ValueError``.
    try:
        result = service.reindex_document_from_contract_result(request)
    except KnowledgeIndexingError as exc:
        raise HTTPException(
            status_code=503,
            detail=error_detail("error.knowledge.indexing_failed", message=str(exc)),
        ) from exc
    except ObjectStorageError as exc:
        raise HTTPException(
            status_code=503,
            detail=error_detail("error.storage.unavailable", message=str(exc)),
        ) from exc
    except ValueError as exc:
        raise HTTPException(
            status_code=422,
            detail=error_detail("error.validation", message=str(exc)),
        ) from exc
    if result is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.knowledge_document.not_found", id=request.documentId),
        )
    return _to_ingest_data(result)


@router.post("/documents/reindex", response_model=ApiResponse[KnowledgeDocumentIngestData])
def reindex_document_contract(
    payload: KnowledgeDocumentReindexRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentIngestData]:
    """Re-chunk and re-index a document using the request's parameters."""
    return ok(_reindex_document_or_raise(service, payload))


@router.post("/jobs/list", response_model=ApiResponse[PageResult[KnowledgeIndexJobData]])
def list_index_jobs_contract(
    payload: KnowledgeIndexJobListRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[PageResult[KnowledgeIndexJobData]]:
    """List index jobs filtered by base/document/status, paginated in memory."""
    items = service.list_index_jobs(
        knowledge_base_id=payload.knowledgeBaseId,
        document_id=payload.documentId,
        status=payload.status,
    )
    return ok(paginate(items, page=payload.page, page_size=payload.pageSize))


@router.post("/jobs/detail", response_model=ApiResponse[KnowledgeIndexJobData])
def detail_index_job_contract(
    payload: KnowledgeIndexJobDetailRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeIndexJobData]:
    """Return the index job associated with a document.

    Raises:
        HTTPException: 404 if none exists.
    """
    job = service.detail_index_job(document_id=payload.documentId)
    if job is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.knowledge_index_job.not_found", id=payload.documentId),
        )
    return ok(job)


@router.post("/jobs/retry", response_model=ApiResponse[KnowledgeDocumentIngestData])
def retry_index_job_contract(
    payload: KnowledgeIndexJobRetryRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentIngestData]:
    """Retry indexing a document by issuing an async-mode reindex.

    Failures are mapped to HTTP errors exactly as ``/documents/reindex``
    maps them. Previously, indexing, storage and validation errors escaped
    this handler unhandled.
    """
    request = KnowledgeDocumentReindexRequestDto(
        documentId=payload.documentId,
        chunkSize=payload.chunkSize,
        chunkOverlap=payload.chunkOverlap,
        asyncMode=True,
    )
    return ok(_reindex_document_or_raise(service, request))


@router.post("/jobs/reindex-base", response_model=ApiResponse[KnowledgeBaseReindexData])
def reindex_base_contract(
    payload: KnowledgeBaseReindexRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeBaseReindexData]:
    """Reindex every document of a knowledge base.

    ``queuedCount`` is the number of job entries the service returned.

    Raises:
        HTTPException: 404 if the base does not exist.
    """
    if service.base_repository.get_by_id(knowledge_base_id=payload.knowledgeBaseId) is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.knowledge_base.not_found", id=payload.knowledgeBaseId),
        )
    jobs = service.reindex_base_from_contract(payload)
    return ok(KnowledgeBaseReindexData(
        knowledgeBaseId=payload.knowledgeBaseId,
        queuedCount=len(jobs),
        jobs=jobs,
    ))


@router.post("/documents/content", response_model=ApiResponse[KnowledgeDocumentContentData])
def read_document_content_contract(
    payload: KnowledgeDocumentContentRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentContentData]:
    """Read a document's stored content, optionally as text and/or base64.

    Raises:
        HTTPException: 503 on storage failure, 422 on ``ValueError``, 404 if
            the document does not exist.
    """
    try:
        result = service.read_document_content(
            document_id=payload.documentId,
            include_text=payload.includeText,
            include_base64=payload.includeBase64,
        )
    except ObjectStorageError as exc:
        raise HTTPException(
            status_code=503,
            detail=error_detail("error.storage.unavailable", message=str(exc)),
        ) from exc
    except ValueError as exc:
        raise HTTPException(
            status_code=422,
            detail=error_detail("error.validation", message=str(exc)),
        ) from exc
    if result is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.knowledge_document.not_found", id=payload.documentId),
        )
    return ok(KnowledgeDocumentContentData.model_validate(result))


@router.post("/documents/storage/status", response_model=ApiResponse[KnowledgeDocumentStorageStatusData])
def read_document_storage_status_contract(
    payload: KnowledgeDocumentStorageStatusRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentStorageStatusData]:
    """Report the storage status of a document's object.

    The response is stamped with ``checkedTime`` in naive UTC.

    Raises:
        HTTPException: 503 on storage failure, 404 if the document does not
            exist.
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12. The expression
    # below yields the same naive-UTC value.
    from datetime import timezone

    try:
        result = service.read_document_storage_status(document_id=payload.documentId)
    except ObjectStorageError as exc:
        raise HTTPException(
            status_code=503,
            detail=error_detail("error.storage.unavailable", message=str(exc)),
        ) from exc
    if result is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.knowledge_document.not_found", id=payload.documentId),
        )
    return ok(KnowledgeDocumentStorageStatusData.model_validate({
        **result,
        "checkedTime": datetime.now(timezone.utc).replace(tzinfo=None),
    }))


@router.post("/documents/parse", response_model=ApiResponse[KnowledgeDocumentParseData])
def parse_document_contract(
    payload: KnowledgeDocumentParseRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeDocumentParseData]:
    """Parse raw document input into text plus metadata without persisting it.

    Raises:
        HTTPException: 422 when the parser raises ``DocumentParseError``.
    """
    try:
        parsed = service.parse_document(
            KnowledgeDocumentParseRequest(
                source_type=payload.sourceType,
                source_uri=payload.sourceUri,
                content_text=payload.contentText,
                content_base64=payload.contentBase64,
            )
        )
    except DocumentParseError as exc:
        raise HTTPException(
            status_code=422,
            detail=error_detail("error.validation", message=str(exc)),
        ) from exc
    return ok(KnowledgeDocumentParseData(
        contentText=parsed.content_text,
        sourceType=parsed.source_type,
        metadata=parsed.metadata_json,
    ))


@router.post("/chunks/list", response_model=ApiResponse[PageResult[KnowledgeChunkDto]])
def list_chunks_contract(
    payload: KnowledgeChunkListRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[PageResult[KnowledgeChunkDto]]:
    """List chunks filtered by base/document/keyword, paginated in memory."""
    items = [
        KnowledgeChunkDto.from_entity(item)
        for item in service.list_chunks_filtered(
            knowledge_base_id=payload.knowledgeBaseId,
            document_id=payload.documentId,
            keyword=payload.keyword,
        )
    ]
    return ok(paginate(items, page=payload.page, page_size=payload.pageSize))


@router.post("/chunks/detail", response_model=ApiResponse[KnowledgeChunkDto])
def detail_chunk_contract(
    payload: KnowledgeChunkDetailRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeChunkDto]:
    """Return one chunk by id.

    Raises:
        HTTPException: 404 if it does not exist.
    """
    entity = service.chunk_repository.get_by_id(chunk_id=payload.chunkId)
    if entity is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.knowledge_chunk.not_found", id=payload.chunkId),
        )
    return ok(KnowledgeChunkDto.from_entity(entity))


@router.post("/chunks/delete", response_model=ApiResponse[DeleteData])
def delete_chunk_contract(
    payload: KnowledgeChunkDeleteRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[DeleteData]:
    """Delete a chunk and echo back whether the service reported a deletion."""
    return ok(DeleteData(
        deleted=service.delete_chunk(chunk_id=payload.chunkId),
        chunkId=payload.chunkId,
    ))


@router.post("/search/query", response_model=ApiResponse[list[KnowledgeSearchResultDto]])
def search_contract(
    payload: KnowledgeSearchRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[list[KnowledgeSearchResultDto]]:
    """Search a knowledge base.

    ``service.search`` yields ``(chunk, document, score, score_json)``
    tuples, each of which is converted into a result DTO.
    """
    search_request = KnowledgeSearchRequest(
        knowledge_base_id=payload.knowledgeBaseId,
        query=payload.query,
        top_k=payload.topK,
        filters_json=payload.filters,
    )
    results = [
        KnowledgeSearchResultDto(
            chunk=KnowledgeChunkDto.from_entity(chunk),
            document=KnowledgeDocumentDto.from_entity(document),
            score=score,
            scoreDetails=score_json,
        )
        for chunk, document, score, score_json in service.search(search_request)
    ]
    return ok(results)


@router.post("/settings/detail", response_model=ApiResponse[KnowledgeSettingsDto])
def detail_settings_contract(
    payload: KnowledgeBaseDetailRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeSettingsDto]:
    """Return the settings of a knowledge base."""
    return ok(service.read_settings(knowledge_base_id=payload.knowledgeBaseId))


@router.post("/settings/update", response_model=ApiResponse[KnowledgeSettingsDto])
def update_settings_contract(
    payload: KnowledgeSettingsUpdateRequestDto,
    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
) -> ApiResponse[KnowledgeSettingsDto]:
    """Update knowledge-base settings from the request contract."""
    return ok(service.update_settings(payload))