"""HTTP routes of the team service: teams, team configs, team runs and workers."""

import json
from datetime import datetime, timezone
from typing import TypeVar

from core_domain import ServiceHealth
from core_shared import error_detail, try_build_redis_client
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy import text
from sqlalchemy.orm import Session

from app.application.services import TeamApplicationService, build_team_application_service
from app.bootstrap.settings import TeamServiceSettings
from app.db.session import get_db
from app.domain.repositories import (
    TeamDefinitionRepository,
    TeamRunRepository,
    TeamConfigRepository,
)
from app.schemas.team import (
    ApiResponse,
    DeleteData,
    PageResult,
    TeamConfigCreateRequestDto,
    TeamConfigDeleteRequestDto,
    TeamConfigDetailRequestDto,
    TeamConfigDto,
    TeamConfigListRequestDto,
    TeamConfigUpdateRequestDto,
    TeamCreateRequest,
    TeamCreateRequestDto,
    TeamDeleteRequestDto,
    TeamDetailRequestDto,
    TeamDto,
    TeamListRequestDto,
    TeamResponse,
    TeamRunCreateRequest,
    TeamRunCreateRequestDto,
    TeamRunDeleteRequestDto,
    TeamRunDetailRequestDto,
    TeamRunDto,
    TeamRunExecuteRequest,
    TeamRunExecuteData,
    TeamRunExecuteRequestDto,
    TeamRunExecuteResponse,
    TeamRunListRequestDto,
    TeamRunResponse,
    TeamRunStatusUpdateRequest,
    TeamRunStatusUpdateRequestDto,
    TeamStatusUpdateRequest,
    TeamUpdateRequestDto,
    TeamWorkerExecuteNextRequest,
    TeamWorkerExecuteNextResponse,
)

router = APIRouter()
T = TypeVar("T")


def ok(data: T) -> ApiResponse[T]:
    """Wrap *data* in an ``ApiResponse`` with an empty request id and the current UTC time."""
    return ApiResponse[T](
        data=data,
        requestId="",
        # datetime.utcnow() is deprecated since Python 3.12; this yields the
        # same naive-UTC value, so the serialized timestamp format is unchanged.
        serverTime=datetime.now(timezone.utc).replace(tzinfo=None),
    )


def json_dump(payload: dict[str, object]) -> str:
    """Serialize *payload* to JSON, keeping non-ASCII text and stringifying unknown types."""
    return json.dumps(payload, ensure_ascii=False, default=str)


def get_team_settings() -> TeamServiceSettings:
    """Dependency provider: build a fresh ``TeamServiceSettings`` instance."""
    return TeamServiceSettings()


def get_team_application_service(
    db: Session = Depends(get_db),
    settings: TeamServiceSettings = Depends(get_team_settings),
) -> TeamApplicationService:
    """Dependency provider: build the application service on top of the request's DB session."""
    return build_team_application_service(
        team_repository=TeamDefinitionRepository(db),
        team_config_repository=TeamConfigRepository(db),
        team_run_repository=TeamRunRepository(db),
        settings=settings,
    )


def _not_found(message_key: str, entity_id: object) -> HTTPException:
    """Build the 404 error for a missing entity identified by *entity_id*."""
    return HTTPException(status_code=404, detail=error_detail(message_key, id=entity_id))


def _validation_error(exc: ValueError) -> HTTPException:
    """Build the 422 error carrying the message of a service-level ``ValueError``."""
    return HTTPException(
        status_code=422,
        detail=error_detail("error.validation", message=str(exc)),
    )


def _execution_summary(
    output_json: dict[str, object] | None,
    fallback_dry_run: bool | None,
) -> tuple[int, bool | None]:
    """Read ``member_run_count`` and ``dry_run`` from a run's output JSON.

    Returns ``(member_run_count, dry_run)``. The count defaults to 0 when it is
    missing or not an integer; the flag defaults to *fallback_dry_run* when it
    is missing or not a boolean.
    """
    recorded = output_json or {}
    member_run_count = recorded.get("member_run_count")
    dry_run = recorded.get("dry_run")
    # bool is a subclass of int; a stray True/False must not be reported as a count.
    if isinstance(member_run_count, bool) or not isinstance(member_run_count, int):
        member_run_count = 0
    if not isinstance(dry_run, bool):
        dry_run = fallback_dry_run
    return member_run_count, dry_run


def _format_sse_event(item: dict[str, object]) -> str:
    """Render one stream item as a server-sent-events frame.

    The item's ``"event"`` entry becomes the event name (``"message"`` when it
    is absent or not a string); every other entry is sent as the JSON data line.
    """
    event = item.get("event")
    event_name = event if isinstance(event, str) else "message"
    data = {key: value for key, value in item.items() if key != "event"}
    return f"event: {event_name}\ndata: {json_dump(data)}\n\n"


def _sse_headers() -> dict[str, str]:
    """Response headers sent with the event-stream endpoints."""
    return {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }


@router.get("/health", response_model=ServiceHealth)
def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
    """Run ``SELECT 1`` against the database and report the service as healthy."""
    db.execute(text("SELECT 1"))
    return ServiceHealth(service="team-service", status="ok", database="ok")


@router.post("", response_model=TeamResponse)
def create_team(
    payload: TeamCreateRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamResponse:
    """Create a team and return it."""
    entity = service.create_team(payload)
    return TeamResponse.from_entity(entity)


@router.get("", response_model=list[TeamResponse])
def list_teams(
    service: TeamApplicationService = Depends(get_team_application_service),
) -> list[TeamResponse]:
    """Return every team known to the service."""
    return [TeamResponse.from_entity(item) for item in service.list_teams()]


@router.post("/list", response_model=ApiResponse[PageResult[TeamDto]])
def list_teams_contract(
    payload: TeamListRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[PageResult[TeamDto]]:
    """Return one page of teams, filtered by status and a case-insensitive keyword.

    The keyword is matched against the team's name, type and description.
    Filtering and paging are applied in memory to the full team list.
    """
    keyword = (payload.keyword or "").lower().strip()
    items = [
        item
        for item in service.list_teams()
        if (payload.status is None or item.status == payload.status)
        and (
            not keyword
            or keyword in item.name.lower()
            or keyword in item.team_type.lower()
            or keyword in (item.description or "").lower()
        )
    ]
    page_items = items[payload.offset:payload.offset + payload.pageSize]
    return ok(PageResult[TeamDto].from_items(
        items=[TeamDto.from_entity(item) for item in page_items],
        total=len(items),
        page=payload.page,
        page_size=payload.pageSize,
    ))


@router.post("/create", response_model=ApiResponse[TeamDto])
def create_team_contract(
    payload: TeamCreateRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamDto]:
    """Create a team from the contract DTO and return it in the response envelope."""
    return ok(TeamDto.from_entity(service.create_team_from_contract(payload)))


@router.post("/detail", response_model=ApiResponse[TeamDto])
def get_team_contract(
    payload: TeamDetailRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamDto]:
    """Return a single team; responds 404 when the service finds none."""
    entity = service.get_team(team_id=payload.teamId)
    if entity is None:
        raise _not_found("error.team.not_found", payload.teamId)
    return ok(TeamDto.from_entity(entity))


@router.post("/update", response_model=ApiResponse[TeamDto])
def update_team_contract(
    payload: TeamUpdateRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamDto]:
    """Update a team from the contract DTO; responds 404 when the service finds none."""
    entity = service.update_team_from_contract(payload)
    if entity is None:
        raise _not_found("error.team.not_found", payload.teamId)
    return ok(TeamDto.from_entity(entity))


@router.post("/delete", response_model=ApiResponse[DeleteData])
def delete_team_contract(
    payload: TeamDeleteRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[DeleteData]:
    """Delete a team and report the service's ``deleted`` result."""
    deleted = service.delete_team_from_contract(payload)
    return ok(DeleteData(deleted=deleted, teamId=payload.teamId))


@router.patch("/{team_id}/status", response_model=TeamResponse)
def update_team_status(
    team_id: str,
    payload: TeamStatusUpdateRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamResponse:
    """Update a team's status; responds 404 when the service finds no such team."""
    entity = service.update_team_status(team_id=team_id, payload=payload)
    if entity is None:
        raise _not_found("error.team.not_found", team_id)
    return TeamResponse.from_entity(entity)


@router.post("/configs/list", response_model=ApiResponse[PageResult[TeamConfigDto]])
def list_team_configs_contract(
    payload: TeamConfigListRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[PageResult[TeamConfigDto]]:
    """Return one page of the configs the service lists for ``payload.teamId``."""
    items = service.list_team_configs(team_id=payload.teamId)
    page_items = items[payload.offset:payload.offset + payload.pageSize]
    return ok(PageResult[TeamConfigDto].from_items(
        items=[TeamConfigDto.from_entity(item) for item in page_items],
        total=len(items),
        page=payload.page,
        page_size=payload.pageSize,
    ))


@router.post("/configs/create", response_model=ApiResponse[TeamConfigDto])
def create_team_config_contract(
    payload: TeamConfigCreateRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamConfigDto]:
    """Create a team config; a ``ValueError`` from the service becomes a 422."""
    try:
        entity = service.create_team_config_from_contract(payload)
    except ValueError as exc:
        raise _validation_error(exc) from exc
    return ok(TeamConfigDto.from_entity(entity))


@router.post("/configs/detail", response_model=ApiResponse[TeamConfigDto])
def get_team_config_contract(
    payload: TeamConfigDetailRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamConfigDto]:
    """Return a single team config; responds 404 when the service finds none."""
    entity = service.get_team_config(config_id=payload.configId)
    if entity is None:
        raise _not_found("error.team_config.not_found", payload.configId)
    return ok(TeamConfigDto.from_entity(entity))


@router.post("/configs/update", response_model=ApiResponse[TeamConfigDto])
def update_team_config_contract(
    payload: TeamConfigUpdateRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamConfigDto]:
    """Update a team config.

    A ``ValueError`` from the service becomes a 422; a missing config becomes a 404.
    """
    try:
        entity = service.update_team_config_from_contract(payload)
    except ValueError as exc:
        raise _validation_error(exc) from exc
    if entity is None:
        raise _not_found("error.team_config.not_found", payload.configId)
    return ok(TeamConfigDto.from_entity(entity))


@router.post("/configs/delete", response_model=ApiResponse[DeleteData])
def delete_team_config_contract(
    payload: TeamConfigDeleteRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[DeleteData]:
    """Delete a team config and report the service's ``deleted`` result."""
    deleted = service.delete_team_config(config_id=payload.configId)
    return ok(DeleteData(deleted=deleted, configId=payload.configId))


@router.post("/runs", response_model=TeamRunResponse)
def create_team_run(
    payload: TeamRunCreateRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamRunResponse:
    """Create a team run; a ``ValueError`` from the service becomes a 422."""
    try:
        entity = service.create_team_run(payload)
    except ValueError as exc:
        raise _validation_error(exc) from exc
    return TeamRunResponse.from_entity(entity)


@router.get("/runs", response_model=list[TeamRunResponse])
def list_team_runs(
    team_id: str | None = Query(default=None),
    session_id: str | None = Query(default=None),
    service: TeamApplicationService = Depends(get_team_application_service),
) -> list[TeamRunResponse]:
    """Return team runs, passing the optional team and session ids to the service."""
    return [
        TeamRunResponse.from_entity(item)
        for item in service.list_team_runs(team_id=team_id, session_id=session_id)
    ]


@router.post("/runs/list", response_model=ApiResponse[PageResult[TeamRunDto]])
def list_team_runs_contract(
    payload: TeamRunListRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[PageResult[TeamRunDto]]:
    """Return one page of team runs, optionally filtered by status.

    The status filter and paging are applied in memory to the service's result.
    """
    items = [
        item
        for item in service.list_team_runs(
            team_id=payload.teamId, session_id=payload.sessionId)
        if payload.status is None or item.status == payload.status
    ]
    page_items = items[payload.offset:payload.offset + payload.pageSize]
    return ok(PageResult[TeamRunDto].from_items(
        items=[TeamRunDto.from_entity(item) for item in page_items],
        total=len(items),
        page=payload.page,
        page_size=payload.pageSize,
    ))


@router.post("/runs/create", response_model=ApiResponse[TeamRunDto])
def create_team_run_contract(
    payload: TeamRunCreateRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamRunDto]:
    """Create a team run from the contract DTO; a service ``ValueError`` becomes a 422."""
    try:
        entity = service.create_team_run_from_contract(payload)
    except ValueError as exc:
        raise _validation_error(exc) from exc
    return ok(TeamRunDto.from_entity(entity))


@router.post("/runs/detail", response_model=ApiResponse[TeamRunDto])
def get_team_run_contract(
    payload: TeamRunDetailRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamRunDto]:
    """Return a single team run; responds 404 when the service finds none."""
    entity = service.get_team_run(team_run_id=payload.teamRunId)
    if entity is None:
        raise _not_found("error.team_run.not_found", payload.teamRunId)
    return ok(TeamRunDto.from_entity(entity))


@router.post("/runs/status", response_model=ApiResponse[TeamRunDto])
def update_team_run_status_contract(
    payload: TeamRunStatusUpdateRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamRunDto]:
    """Update a team run's status; responds 404 when the service finds no such run."""
    entity = service.update_team_run_status_from_contract(payload)
    if entity is None:
        raise _not_found("error.team_run.not_found", payload.teamRunId)
    return ok(TeamRunDto.from_entity(entity))


@router.post("/runs/execute", response_model=ApiResponse[TeamRunExecuteData])
def execute_team_run_contract(
    payload: TeamRunExecuteRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamRunExecuteData]:
    """Execute a team run and return it with its member-run count and dry-run flag.

    Responds 404 when the service returns no run. The dry-run flag falls back
    to the requested value when the run's output does not record one.
    """
    entity = service.execute_team_run(
        team_run_id=payload.teamRunId,
        payload=TeamRunExecuteRequest(
            worker_key=payload.workerKey, dry_run=payload.dryRun),
    )
    if entity is None:
        raise _not_found("error.team_run.not_found", payload.teamRunId)
    member_run_count, dry_run = _execution_summary(entity.output_json, payload.dryRun)
    return ok(TeamRunExecuteData(
        run=TeamRunDto.from_entity(entity),
        memberRunCount=member_run_count,
        dryRun=dry_run,
    ))


@router.post("/runs/execute-stream")
def execute_team_run_stream_contract(
    payload: TeamRunExecuteRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> StreamingResponse:
    """Execute a team run and stream its events as ``text/event-stream``.

    Responds 404 up front when the run does not exist.
    """
    if service.get_team_run(team_run_id=payload.teamRunId) is None:
        raise _not_found("error.team_run.not_found", payload.teamRunId)

    def events():
        # The service call stays inside the generator so execution only starts
        # once the response body is being iterated.
        stream = service.execute_team_run_stream(
            team_run_id=payload.teamRunId,
            payload=TeamRunExecuteRequest(
                worker_key=payload.workerKey, dry_run=payload.dryRun),
        )
        for item in stream:
            yield _format_sse_event(item)

    return StreamingResponse(
        events(),
        media_type="text/event-stream",
        headers=_sse_headers(),
    )


@router.post("/runs/delete", response_model=ApiResponse[DeleteData])
def delete_team_run_contract(
    payload: TeamRunDeleteRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[DeleteData]:
    """Delete a team run and report the service's ``deleted`` result."""
    deleted = service.delete_team_run(team_run_id=payload.teamRunId)
    return ok(DeleteData(deleted=deleted, teamRunId=payload.teamRunId))


@router.post("/runs/{team_run_id}/status", response_model=TeamRunResponse)
def update_team_run_status(
    team_run_id: str,
    payload: TeamRunStatusUpdateRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamRunResponse:
    """Update a team run's status; responds 404 when the service finds no such run."""
    entity = service.update_team_run_status(team_run_id=team_run_id, payload=payload)
    if entity is None:
        raise _not_found("error.team_run.not_found", team_run_id)
    return TeamRunResponse.from_entity(entity)


@router.post("/runs/{team_run_id}/execute", response_model=TeamRunExecuteResponse)
def execute_team_run(
    team_run_id: str,
    payload: TeamRunExecuteRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamRunExecuteResponse:
    """Execute a team run and return it with its member-run count and dry-run flag.

    Responds 404 when the service returns no run. The dry-run flag falls back
    to the requested value when the run's output does not record one.
    """
    entity = service.execute_team_run(team_run_id=team_run_id, payload=payload)
    if entity is None:
        raise _not_found("error.team_run.not_found", team_run_id)
    member_run_count, dry_run = _execution_summary(entity.output_json, payload.dry_run)
    return TeamRunExecuteResponse(
        run=TeamRunResponse.from_entity(entity),
        member_run_count=member_run_count,
        dry_run=dry_run,
    )


@router.post("/runs/{team_run_id}/execute-stream")
def execute_team_run_stream(
    team_run_id: str,
    payload: TeamRunExecuteRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> StreamingResponse:
    """Execute a team run and stream its events as ``text/event-stream``.

    Responds 404 up front when the run does not exist.
    """
    if service.get_team_run(team_run_id=team_run_id) is None:
        raise _not_found("error.team_run.not_found", team_run_id)

    def events():
        # The service call stays inside the generator so execution only starts
        # once the response body is being iterated.
        for item in service.execute_team_run_stream(team_run_id=team_run_id, payload=payload):
            yield _format_sse_event(item)

    return StreamingResponse(
        events(),
        media_type="text/event-stream",
        headers=_sse_headers(),
    )


@router.post("/workers/execute-next", response_model=TeamWorkerExecuteNextResponse)
def execute_next_worker_task(
    payload: TeamWorkerExecuteNextRequest,
    settings: TeamServiceSettings = Depends(get_team_settings),
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamWorkerExecuteNextResponse:
    """Ask the service to claim and execute the next team run for a worker.

    Lease duration and dry-run mode come from the request when given, otherwise
    from settings. Responds 404 when the service returns no result.
    """
    # Resolve once so the value passed to the service and the response fallback
    # cannot disagree.
    effective_dry_run = (
        payload.dry_run if payload.dry_run is not None else settings.worker_dry_run
    )
    result = service.execute_next_claimed_team_run(
        worker_key=payload.worker_key,
        lease_seconds=payload.lease_seconds or settings.worker_lease_seconds,
        stale_running_seconds=settings.worker_stale_running_seconds,
        dry_run=effective_dry_run,
        redis_client=try_build_redis_client(settings.redis_url),
    )
    if result is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.team_run.not_found_queued"),
        )
    entity, released_lease_count = result
    member_run_count, dry_run = _execution_summary(entity.output_json, effective_dry_run)
    return TeamWorkerExecuteNextResponse(
        run=TeamRunResponse.from_entity(entity),
        member_run_count=member_run_count,
        dry_run=dry_run,
        released_lease_count=released_lease_count,
    )