| 12345678910111213141516171819202122232425262728 |
- from collections.abc import Generator
- from fastapi import Request
- from sqlalchemy.orm import Session, sessionmaker
- from core_db import DatabaseSettings, create_engine_from_settings, create_session_factory
- from app.bootstrap.settings import HumanServiceSettings
- def build_session_factory(settings: HumanServiceSettings | None = None) -> sessionmaker[Session]:
- resolved_settings = settings or HumanServiceSettings()
- engine = create_engine_from_settings(
- DatabaseSettings(
- database_url=resolved_settings.database_url,
- echo_sql=resolved_settings.echo_sql,
- )
- )
- return create_session_factory(engine)
- def get_db(request: Request) -> Generator[Session, None, None]:
- session_factory: sessionmaker[Session] = request.app.state.session_factory
- session = session_factory()
- try:
- yield session
- finally:
- session.close()
|