app.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. from contextlib import asynccontextmanager
  2. from core_shared.observability import add_observability
  3. from core_shared.security import add_internal_service_auth
  4. from fastapi import FastAPI
  5. from app.api.routes import router
  6. from app.bootstrap.settings import KnowledgeServiceSettings
  7. from app.db.session import build_session_factory
  8. from app.worker import BackgroundKnowledgeWorker, build_worker_key
  9. @asynccontextmanager
  10. async def lifespan(app: FastAPI):
  11. settings: KnowledgeServiceSettings = app.state.settings
  12. worker: BackgroundKnowledgeWorker | None = None
  13. if settings.auto_worker_enabled:
  14. worker = BackgroundKnowledgeWorker(
  15. settings=settings,
  16. session_factory=app.state.session_factory,
  17. worker_key=f"api-{build_worker_key()}")
  18. app.state.background_knowledge_worker = worker
  19. worker.start()
  20. try:
  21. yield
  22. finally:
  23. if worker is not None:
  24. worker.stop(timeout_seconds=settings.auto_worker_stop_timeout_seconds)
  25. def create_app() -> FastAPI:
  26. settings = KnowledgeServiceSettings()
  27. app = FastAPI(
  28. title="agent-platform knowledge-service",
  29. version="0.1.0",
  30. lifespan=lifespan)
  31. app.state.settings = settings
  32. app.state.session_factory = build_session_factory(settings)
  33. add_observability(app, settings.service_name)
  34. add_internal_service_auth(app, settings)
  35. app.include_router(router, prefix="/knowledge", tags=["knowledge"])
  36. return app