| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- from fastapi import APIRouter, Depends, HTTPException, Query
- from sqlalchemy import text
- from sqlalchemy.orm import Session
- from core_domain import ServiceHealth
- from app.application.services import TeamApplicationService
- from app.db.session import get_db
- from app.domain.repositories import (
- TeamDefinitionRepository,
- TeamRunRepository,
- TeamVersionRepository,
- )
- from app.schemas.team import (
- TeamCreateRequest,
- TeamResponse,
- TeamRunCreateRequest,
- TeamRunResponse,
- TeamRunStatusUpdateRequest,
- TeamStatusUpdateRequest,
- TeamVersionCreateRequest,
- TeamVersionResponse,
- )
# Router instance that every endpoint in this module registers itself on.
router = APIRouter()
def get_team_application_service(db: Session = Depends(get_db)) -> TeamApplicationService:
    """Assemble a ``TeamApplicationService`` around a single database session.

    Args:
        db: Session injected through ``Depends(get_db)``; the same object is
            handed to each of the three repositories.

    Returns:
        A service wired with team, team-version and team-run repositories.
    """
    definitions = TeamDefinitionRepository(db)
    versions = TeamVersionRepository(db)
    runs = TeamRunRepository(db)
    return TeamApplicationService(
        team_repository=definitions,
        team_version_repository=versions,
        team_run_repository=runs,
    )
@router.get("/health", response_model=ServiceHealth)
def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
    """Return the service health payload after running ``SELECT 1``.

    The probe statement is executed on the injected session before the
    response is built, so an exception raised by ``db.execute`` propagates
    and the "ok" payload is only produced when the query ran.

    Args:
        db: Session injected through ``Depends(get_db)``.

    Returns:
        ``ServiceHealth`` with service "team-service" and both status fields
        set to "ok".
    """
    probe = text("SELECT 1")
    db.execute(probe)
    return ServiceHealth(service="team-service", status="ok", database="ok")
@router.post("", response_model=TeamResponse)
def create_team(
    payload: TeamCreateRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamResponse:
    """Create a team from the request body and return it as a response model.

    Args:
        payload: Request body passed unchanged to ``service.create_team``.
        service: Application service injected by FastAPI.

    Returns:
        ``TeamResponse`` built from the entity the service returned.
    """
    return TeamResponse.from_entity(service.create_team(payload))
@router.get("", response_model=list[TeamResponse])
def list_teams(
    tenant_id: str = Query(...),
    service: TeamApplicationService = Depends(get_team_application_service),
) -> list[TeamResponse]:
    """List the teams the service returns for ``tenant_id``.

    Args:
        tenant_id: Required query parameter forwarded to the service.
        service: Application service injected by FastAPI.

    Returns:
        One ``TeamResponse`` per entity, in the order the service yields them.
    """
    teams = service.list_teams(tenant_id=tenant_id)
    responses: list[TeamResponse] = []
    for team in teams:
        responses.append(TeamResponse.from_entity(team))
    return responses
@router.patch("/{team_id}/status", response_model=TeamResponse)
def update_team_status(
    team_id: str,
    payload: TeamStatusUpdateRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamResponse:
    """Apply a status update to one team.

    Args:
        team_id: Path parameter identifying the team.
        payload: Request body forwarded to ``service.update_team_status``.
        service: Application service injected by FastAPI.

    Returns:
        ``TeamResponse`` built from the updated entity.

    Raises:
        HTTPException: 404 when the service returns ``None`` for ``team_id``.
    """
    updated = service.update_team_status(team_id=team_id, payload=payload)
    if updated is not None:
        return TeamResponse.from_entity(updated)
    raise HTTPException(status_code=404, detail=f"team not found: {team_id}")
@router.post("/versions", response_model=TeamVersionResponse)
def create_team_version(
    payload: TeamVersionCreateRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamVersionResponse:
    """Create a team version from the request body.

    Args:
        payload: Request body passed unchanged to
            ``service.create_team_version``.
        service: Application service injected by FastAPI.

    Returns:
        ``TeamVersionResponse`` built from the created entity.

    Raises:
        HTTPException: 422 carrying the message of a ``ValueError`` raised by
            the service call.
    """
    try:
        created = service.create_team_version(payload)
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc
    else:
        # Conversion stays outside the guarded call so only the service's
        # own ValueError is translated into a 422.
        return TeamVersionResponse.from_entity(created)
@router.get("/versions", response_model=list[TeamVersionResponse])
def list_team_versions(
    tenant_id: str = Query(...),
    team_id: str = Query(...),
    service: TeamApplicationService = Depends(get_team_application_service),
) -> list[TeamVersionResponse]:
    """List the versions the service returns for a tenant and team.

    Args:
        tenant_id: Required query parameter forwarded to the service.
        team_id: Required query parameter forwarded to the service.
        service: Application service injected by FastAPI.

    Returns:
        One ``TeamVersionResponse`` per entity, in service order.
    """
    versions = service.list_team_versions(tenant_id=tenant_id, team_id=team_id)
    responses: list[TeamVersionResponse] = []
    for version in versions:
        responses.append(TeamVersionResponse.from_entity(version))
    return responses
@router.post("/runs", response_model=TeamRunResponse)
def create_team_run(
    payload: TeamRunCreateRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamRunResponse:
    """Create a team run from the request body.

    Args:
        payload: Request body passed unchanged to ``service.create_team_run``.
        service: Application service injected by FastAPI.

    Returns:
        ``TeamRunResponse`` built from the created entity.

    Raises:
        HTTPException: 422 carrying the message of a ``ValueError`` raised by
            the service call.
    """
    try:
        created = service.create_team_run(payload)
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc
    else:
        # Conversion stays outside the guarded call so only the service's
        # own ValueError is translated into a 422.
        return TeamRunResponse.from_entity(created)
@router.get("/runs", response_model=list[TeamRunResponse])
def list_team_runs(
    tenant_id: str = Query(...),
    team_id: str | None = Query(default=None),
    session_id: str | None = Query(default=None),
    service: TeamApplicationService = Depends(get_team_application_service),
) -> list[TeamRunResponse]:
    """List team runs for a tenant, passing optional filters to the service.

    Args:
        tenant_id: Required query parameter forwarded to the service.
        team_id: Optional query parameter; forwarded as given (``None`` when
            omitted).
        session_id: Optional query parameter; forwarded as given (``None``
            when omitted).
        service: Application service injected by FastAPI.

    Returns:
        One ``TeamRunResponse`` per entity, in service order.
    """
    runs = service.list_team_runs(
        tenant_id=tenant_id,
        team_id=team_id,
        session_id=session_id,
    )
    responses: list[TeamRunResponse] = []
    for run in runs:
        responses.append(TeamRunResponse.from_entity(run))
    return responses
@router.post("/runs/{team_run_id}/status", response_model=TeamRunResponse)
def update_team_run_status(
    team_run_id: str,
    payload: TeamRunStatusUpdateRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamRunResponse:
    """Apply a status update to one team run.

    Args:
        team_run_id: Path parameter identifying the run.
        payload: Request body forwarded to ``service.update_team_run_status``.
        service: Application service injected by FastAPI.

    Returns:
        ``TeamRunResponse`` built from the updated entity.

    Raises:
        HTTPException: 404 when the service returns ``None`` for
            ``team_run_id``.
    """
    updated = service.update_team_run_status(team_run_id=team_run_id, payload=payload)
    if updated is not None:
        return TeamRunResponse.from_entity(updated)
    raise HTTPException(status_code=404, detail=f"team_run not found: {team_run_id}")
|