| 123456789101112131415161718192021222324252627282930313233343536373839404142 |
- from contextlib import asynccontextmanager
- from core_shared.observability import add_observability
- from core_shared.security import add_internal_service_auth
- from fastapi import FastAPI
- from app.api.routes import router
- from app.bootstrap.settings import TeamServiceSettings
- from app.db.session import build_session_factory
- from app.worker import BackgroundTeamWorker, build_worker_key
- @asynccontextmanager
- async def lifespan(app: FastAPI):
- settings: TeamServiceSettings = app.state.settings
- worker: BackgroundTeamWorker | None = None
- if settings.auto_worker_enabled:
- worker = BackgroundTeamWorker(
- settings=settings,
- session_factory=app.state.session_factory,
- worker_key=f"api-{build_worker_key()}")
- app.state.background_team_worker = worker
- worker.start()
- try:
- yield
- finally:
- if worker is not None:
- worker.stop(timeout_seconds=settings.auto_worker_stop_timeout_seconds)
- def create_app() -> FastAPI:
- settings = TeamServiceSettings()
- app = FastAPI(
- title="agent-platform team-service",
- version="0.1.0",
- lifespan=lifespan)
- app.state.settings = settings
- app.state.session_factory = build_session_factory(settings)
- add_observability(app, settings.service_name)
- add_internal_service_auth(app, settings)
- app.include_router(router, prefix="/teams", tags=["teams"])
- return app
|