from contextlib import asynccontextmanager from core_shared.observability import add_observability from core_shared.security import add_internal_service_auth from fastapi import FastAPI from app.api.routes import router from app.bootstrap.settings import KnowledgeServiceSettings from app.db.session import build_session_factory from app.worker import BackgroundKnowledgeWorker, build_worker_key @asynccontextmanager async def lifespan(app: FastAPI): settings: KnowledgeServiceSettings = app.state.settings worker: BackgroundKnowledgeWorker | None = None if settings.auto_worker_enabled: worker = BackgroundKnowledgeWorker( settings=settings, session_factory=app.state.session_factory, worker_key=f"api-{build_worker_key()}") app.state.background_knowledge_worker = worker worker.start() try: yield finally: if worker is not None: worker.stop(timeout_seconds=settings.auto_worker_stop_timeout_seconds) def create_app() -> FastAPI: settings = KnowledgeServiceSettings() app = FastAPI( title="agent-platform knowledge-service", version="0.1.0", lifespan=lifespan) app.state.settings = settings app.state.session_factory = build_session_factory(settings) add_observability(app, settings.service_name) add_internal_service_auth(app, settings) app.include_router(router, prefix="/knowledge", tags=["knowledge"]) return app