| 123456789101112131415161718192021222324252627282930313233343536373839404142 |
- from contextlib import asynccontextmanager
- from core_shared.observability import add_observability
- from core_shared.security import add_internal_service_auth
- from fastapi import FastAPI
- from app.api.routes import router
- from app.bootstrap.settings import ToolServiceSettings
- from app.db.session import build_session_factory
- from app.worker import BackgroundToolWorker, build_worker_key
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the optional background tool worker around the app's lifetime.

    Reads the settings object from ``app.state.settings``. When
    ``auto_worker_enabled`` is truthy, a ``BackgroundToolWorker`` is built
    from those settings and ``app.state.session_factory``, published on
    ``app.state.background_tool_worker``, and started before control is
    handed to the application. On exit the worker is stopped, with
    ``auto_worker_stop_timeout_seconds`` passed as the stop timeout.

    When the flag is falsy no worker is created and
    ``app.state.background_tool_worker`` is not assigned by this function.

    Args:
        app: The application whose ``state`` carries ``settings`` and
            ``session_factory``.

    Yields:
        None. Control returns to the application while it is serving.
    """
    config: ToolServiceSettings = app.state.settings

    if not config.auto_worker_enabled:
        # Nothing to start or stop: just hand control to the application.
        yield
        return

    # The worker key is the build_worker_key() value with an "api-" prefix.
    background_worker = BackgroundToolWorker(
        settings=config,
        session_factory=app.state.session_factory,
        worker_key=f"api-{build_worker_key()}",
    )
    app.state.background_tool_worker = background_worker
    background_worker.start()

    try:
        yield
    finally:
        # Runs whether the application exits normally or with an error.
        background_worker.stop(
            timeout_seconds=config.auto_worker_stop_timeout_seconds,
        )
def create_app() -> FastAPI:
    """Build and wire the tool-service FastAPI application.

    Steps, in order:

    1. Instantiate ``ToolServiceSettings``.
    2. Create the ``FastAPI`` app with ``lifespan`` as its lifespan handler.
    3. Store the settings and a session factory (from
       ``build_session_factory``) on ``app.state``.
    4. Call ``add_observability`` with the configured service name and
       ``add_internal_service_auth`` with the settings.
    5. Mount ``router`` under the ``/tools`` prefix with the ``tools`` tag.

    Returns:
        The configured ``FastAPI`` instance.
    """
    config = ToolServiceSettings()

    application = FastAPI(
        title="agent-platform tool-service",
        version="0.1.0",
        lifespan=lifespan,
    )

    # Shared objects that the lifespan handler reads back from app.state.
    shared_state = application.state
    shared_state.settings = config
    shared_state.session_factory = build_session_factory(config)

    add_observability(application, config.service_name)
    add_internal_service_auth(application, config)

    application.include_router(router, prefix="/tools", tags=["tools"])
    return application
|