| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432 |
- import json
- from datetime import datetime
- from typing import TypeVar
- from core_domain import ServiceHealth
- from core_shared import error_detail, try_build_redis_client
- from fastapi import APIRouter, Depends, HTTPException, Query
- from fastapi.responses import StreamingResponse
- from sqlalchemy import text
- from sqlalchemy.orm import Session
- from app.application.services import TeamApplicationService, build_team_application_service
- from app.bootstrap.settings import TeamServiceSettings
- from app.db.session import get_db
- from app.domain.repositories import (
- TeamDefinitionRepository,
- TeamRunRepository,
- TeamConfigRepository,
- )
- from app.schemas.team import (
- ApiResponse,
- DeleteData,
- PageResult,
- TeamConfigCreateRequestDto,
- TeamConfigDeleteRequestDto,
- TeamConfigDetailRequestDto,
- TeamConfigDto,
- TeamConfigListRequestDto,
- TeamConfigUpdateRequestDto,
- TeamCreateRequest,
- TeamCreateRequestDto,
- TeamDeleteRequestDto,
- TeamDetailRequestDto,
- TeamDto,
- TeamListRequestDto,
- TeamResponse,
- TeamRunCreateRequest,
- TeamRunCreateRequestDto,
- TeamRunDeleteRequestDto,
- TeamRunDetailRequestDto,
- TeamRunDto,
- TeamRunExecuteRequest,
- TeamRunExecuteData,
- TeamRunExecuteRequestDto,
- TeamRunExecuteResponse,
- TeamRunListRequestDto,
- TeamRunResponse,
- TeamRunStatusUpdateRequest,
- TeamRunStatusUpdateRequestDto,
- TeamStatusUpdateRequest,
- TeamUpdateRequestDto,
- TeamWorkerExecuteNextRequest,
- TeamWorkerExecuteNextResponse,
- )
# Router on which every endpoint in this module is registered.
router = APIRouter()

# Generic payload type used to parametrize the ``ApiResponse`` envelope.
T = TypeVar("T")
def ok(data: T) -> ApiResponse[T]:
    """Wrap *data* in the ``ApiResponse`` envelope.

    Args:
        data: Payload to place in the envelope's ``data`` field.

    Returns:
        An ``ApiResponse[T]`` carrying ``data``, an empty ``requestId`` and
        the current UTC time as ``serverTime``.
    """
    # Imported locally so this block does not depend on a change to the
    # module-level import list.
    from datetime import timezone

    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware
    # UTC timestamp and drop the tzinfo so the value stays a naive UTC
    # datetime, exactly as before, and the serialized form does not change.
    server_time = datetime.now(timezone.utc).replace(tzinfo=None)
    return ApiResponse[T](
        data=data,
        requestId="",
        serverTime=server_time,
    )
def json_dump(payload: dict[str, object]) -> str:
    """Serialize *payload* to a JSON string.

    Non-ASCII characters are emitted as-is (not ``\\u``-escaped), and any
    value the JSON encoder cannot handle natively is converted with ``str``.
    """
    encoder_options = {"ensure_ascii": False, "default": str}
    return json.dumps(payload, **encoder_options)
def get_team_settings() -> TeamServiceSettings:
    """Build a fresh ``TeamServiceSettings`` instance.

    Used as a FastAPI dependency by the handlers in this module; a new
    object is constructed on every call.
    """
    settings = TeamServiceSettings()
    return settings
def get_team_application_service(
    db: Session = Depends(get_db),
    settings: TeamServiceSettings = Depends(get_team_settings),
) -> TeamApplicationService:
    """Assemble the ``TeamApplicationService`` for the current request.

    Args:
        db: Database session injected through ``get_db``.
        settings: Service settings injected through ``get_team_settings``.

    Returns:
        The service built by ``build_team_application_service`` from three
        repositories that all share the same ``db`` session.
    """
    # Dict literals evaluate in order, so the repositories are constructed in
    # the same sequence as the keyword arguments they replace.
    repositories = {
        "team_repository": TeamDefinitionRepository(db),
        "team_config_repository": TeamConfigRepository(db),
        "team_run_repository": TeamRunRepository(db),
    }
    return build_team_application_service(settings=settings, **repositories)
@router.get("/health", response_model=ServiceHealth)
def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
    """Report service health after a trivial database round trip.

    Runs ``SELECT 1`` on the injected session. Nothing is caught here, so an
    exception from the query propagates instead of producing a health body.
    """
    probe = text("SELECT 1")
    db.execute(probe)
    return ServiceHealth(service="team-service", status="ok", database="ok")
@router.post("", response_model=TeamResponse)
def create_team(
    payload: TeamCreateRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamResponse:
    """Create a team from *payload* and return it as a ``TeamResponse``."""
    return TeamResponse.from_entity(service.create_team(payload))
@router.get("", response_model=list[TeamResponse])
def list_teams(
    service: TeamApplicationService = Depends(get_team_application_service),
) -> list[TeamResponse]:
    """Return every team known to the service as ``TeamResponse`` objects."""
    responses: list[TeamResponse] = []
    for team in service.list_teams():
        responses.append(TeamResponse.from_entity(team))
    return responses
@router.post("/list", response_model=ApiResponse[PageResult[TeamDto]])
def list_teams_contract(
    payload: TeamListRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[PageResult[TeamDto]]:
    """Return one page of teams, filtered by status and keyword.

    All teams are fetched from the service and filtered in memory:

    * ``payload.status`` (when not ``None``) must equal the team's status;
    * ``payload.keyword`` (lower-cased and stripped) must be a substring of
      the team's name, team type or description, compared case-insensitively.

    The page is the ``[offset, offset + pageSize)`` slice of the filtered
    list; ``total`` is the size of the filtered list before slicing.
    """
    needle = (payload.keyword or "").lower().strip()

    def matches(team) -> bool:
        # Status filter first, then the keyword checks in the same order as
        # the original boolean chain so short-circuiting is identical.
        status_ok = payload.status is None or team.status == payload.status
        if not status_ok:
            return False
        if not needle:
            return True
        if needle in team.name.lower():
            return True
        if needle in team.team_type.lower():
            return True
        return needle in (team.description or "").lower()

    matched = [team for team in service.list_teams() if matches(team)]
    window = matched[payload.offset:payload.offset + payload.pageSize]
    page = PageResult[TeamDto].from_items(
        items=[TeamDto.from_entity(team) for team in window],
        total=len(matched),
        page=payload.page,
        page_size=payload.pageSize,
    )
    return ok(page)
@router.post("/create", response_model=ApiResponse[TeamDto])
def create_team_contract(
    payload: TeamCreateRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamDto]:
    """Create a team from the contract DTO and return it in an envelope."""
    created = service.create_team_from_contract(payload)
    dto = TeamDto.from_entity(created)
    return ok(dto)
@router.post("/detail", response_model=ApiResponse[TeamDto])
def get_team_contract(
    payload: TeamDetailRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamDto]:
    """Look up a single team by ``payload.teamId``.

    Raises:
        HTTPException: 404 when the service returns ``None`` for the id.
    """
    team = service.get_team(team_id=payload.teamId)
    if team is not None:
        return ok(TeamDto.from_entity(team))
    raise HTTPException(
        status_code=404,
        detail=error_detail("error.team.not_found", id=payload.teamId),
    )
@router.post("/update", response_model=ApiResponse[TeamDto])
def update_team_contract(
    payload: TeamUpdateRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamDto]:
    """Apply the contract update DTO to a team and return the result.

    Raises:
        HTTPException: 404 when the service returns ``None`` for the update.
    """
    updated = service.update_team_from_contract(payload)
    if updated is not None:
        return ok(TeamDto.from_entity(updated))
    raise HTTPException(
        status_code=404,
        detail=error_detail("error.team.not_found", id=payload.teamId),
    )
@router.post("/delete", response_model=ApiResponse[DeleteData])
def delete_team_contract(
    payload: TeamDeleteRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[DeleteData]:
    """Delete a team and report the outcome.

    The service's return value is passed through as ``deleted``; no 404 is
    raised here regardless of that value.
    """
    was_deleted = service.delete_team_from_contract(payload)
    outcome = DeleteData(deleted=was_deleted, teamId=payload.teamId)
    return ok(outcome)
@router.patch("/{team_id}/status", response_model=TeamResponse)
def update_team_status(
    team_id: str,
    payload: TeamStatusUpdateRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamResponse:
    """Change the status of the team identified by the ``team_id`` path part.

    Raises:
        HTTPException: 404 when the service returns ``None`` for the update.
    """
    updated = service.update_team_status(team_id=team_id, payload=payload)
    if updated is not None:
        return TeamResponse.from_entity(updated)
    raise HTTPException(
        status_code=404,
        detail=error_detail("error.team.not_found", id=team_id),
    )
@router.post("/configs/list", response_model=ApiResponse[PageResult[TeamConfigDto]])
def list_team_configs_contract(
    payload: TeamConfigListRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[PageResult[TeamConfigDto]]:
    """Return one page of the configs listed for ``payload.teamId``.

    The full list is fetched from the service and sliced in memory to
    ``[offset, offset + pageSize)``; ``total`` is the unsliced length.
    """
    configs = service.list_team_configs(team_id=payload.teamId)
    window = configs[payload.offset:payload.offset + payload.pageSize]
    page = PageResult[TeamConfigDto].from_items(
        items=[TeamConfigDto.from_entity(config) for config in window],
        total=len(configs),
        page=payload.page,
        page_size=payload.pageSize,
    )
    return ok(page)
@router.post("/configs/create", response_model=ApiResponse[TeamConfigDto])
def create_team_config_contract(
    payload: TeamConfigCreateRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamConfigDto]:
    """Create a team config from the contract DTO.

    Raises:
        HTTPException: 422 when the service raises ``ValueError``; the
            exception text is forwarded as the validation message.
    """
    try:
        config = service.create_team_config_from_contract(payload)
    except ValueError as exc:
        detail = error_detail("error.validation", message=str(exc))
        raise HTTPException(status_code=422, detail=detail) from exc
    else:
        # Kept outside the ``try`` body so only the service call is guarded.
        return ok(TeamConfigDto.from_entity(config))
@router.post("/configs/detail", response_model=ApiResponse[TeamConfigDto])
def get_team_config_contract(
    payload: TeamConfigDetailRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamConfigDto]:
    """Look up a single team config by ``payload.configId``.

    Raises:
        HTTPException: 404 when the service returns ``None`` for the id.
    """
    config = service.get_team_config(config_id=payload.configId)
    if config is not None:
        return ok(TeamConfigDto.from_entity(config))
    raise HTTPException(
        status_code=404,
        detail=error_detail("error.team_config.not_found", id=payload.configId),
    )
@router.post("/configs/update", response_model=ApiResponse[TeamConfigDto])
def update_team_config_contract(
    payload: TeamConfigUpdateRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamConfigDto]:
    """Apply the contract update DTO to a team config.

    Raises:
        HTTPException: 422 when the service raises ``ValueError``; 404 when
            the service returns ``None`` for the update.
    """
    try:
        config = service.update_team_config_from_contract(payload)
    except ValueError as exc:
        detail = error_detail("error.validation", message=str(exc))
        raise HTTPException(status_code=422, detail=detail) from exc

    if config is not None:
        return ok(TeamConfigDto.from_entity(config))
    raise HTTPException(
        status_code=404,
        detail=error_detail("error.team_config.not_found", id=payload.configId),
    )
@router.post("/configs/delete", response_model=ApiResponse[DeleteData])
def delete_team_config_contract(
    payload: TeamConfigDeleteRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[DeleteData]:
    """Delete a team config and report the outcome.

    The service's return value is passed through as ``deleted``; no 404 is
    raised here regardless of that value.
    """
    was_deleted = service.delete_team_config(config_id=payload.configId)
    outcome = DeleteData(deleted=was_deleted, configId=payload.configId)
    return ok(outcome)
@router.post("/runs", response_model=TeamRunResponse)
def create_team_run(
    payload: TeamRunCreateRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamRunResponse:
    """Create a team run from *payload*.

    Raises:
        HTTPException: 422 when the service raises ``ValueError``; the
            exception text is forwarded as the validation message.
    """
    try:
        run = service.create_team_run(payload)
    except ValueError as exc:
        detail = error_detail("error.validation", message=str(exc))
        raise HTTPException(status_code=422, detail=detail) from exc
    else:
        # Kept outside the ``try`` body so only the service call is guarded.
        return TeamRunResponse.from_entity(run)
@router.get("/runs", response_model=list[TeamRunResponse])
def list_team_runs(
    team_id: str | None = Query(default=None),
    session_id: str | None = Query(default=None),
    service: TeamApplicationService = Depends(get_team_application_service),
) -> list[TeamRunResponse]:
    """List team runs, passing the optional query filters to the service.

    Args:
        team_id: Optional ``team_id`` query parameter forwarded as-is.
        session_id: Optional ``session_id`` query parameter forwarded as-is.
    """
    runs = service.list_team_runs(team_id=team_id, session_id=session_id)
    responses: list[TeamRunResponse] = []
    for run in runs:
        responses.append(TeamRunResponse.from_entity(run))
    return responses
@router.post("/runs/list", response_model=ApiResponse[PageResult[TeamRunDto]])
def list_team_runs_contract(
    payload: TeamRunListRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[PageResult[TeamRunDto]]:
    """Return one page of team runs.

    ``payload.teamId`` and ``payload.sessionId`` are forwarded to the
    service; the optional ``payload.status`` filter is applied in memory.
    The page is the ``[offset, offset + pageSize)`` slice of the filtered
    list and ``total`` is the filtered length before slicing.
    """
    candidates = service.list_team_runs(
        team_id=payload.teamId,
        session_id=payload.sessionId,
    )
    matched = []
    for run in candidates:
        if payload.status is None or run.status == payload.status:
            matched.append(run)

    window = matched[payload.offset:payload.offset + payload.pageSize]
    page = PageResult[TeamRunDto].from_items(
        items=[TeamRunDto.from_entity(run) for run in window],
        total=len(matched),
        page=payload.page,
        page_size=payload.pageSize,
    )
    return ok(page)
@router.post("/runs/create", response_model=ApiResponse[TeamRunDto])
def create_team_run_contract(
    payload: TeamRunCreateRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamRunDto]:
    """Create a team run from the contract DTO.

    Raises:
        HTTPException: 422 when the service raises ``ValueError``; the
            exception text is forwarded as the validation message.
    """
    try:
        run = service.create_team_run_from_contract(payload)
    except ValueError as exc:
        detail = error_detail("error.validation", message=str(exc))
        raise HTTPException(status_code=422, detail=detail) from exc
    else:
        # Kept outside the ``try`` body so only the service call is guarded.
        return ok(TeamRunDto.from_entity(run))
@router.post("/runs/detail", response_model=ApiResponse[TeamRunDto])
def get_team_run_contract(
    payload: TeamRunDetailRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamRunDto]:
    """Look up a single team run by ``payload.teamRunId``.

    Raises:
        HTTPException: 404 when the service returns ``None`` for the id.
    """
    run = service.get_team_run(team_run_id=payload.teamRunId)
    if run is not None:
        return ok(TeamRunDto.from_entity(run))
    raise HTTPException(
        status_code=404,
        detail=error_detail("error.team_run.not_found", id=payload.teamRunId),
    )
@router.post("/runs/status", response_model=ApiResponse[TeamRunDto])
def update_team_run_status_contract(
    payload: TeamRunStatusUpdateRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamRunDto]:
    """Update a team run's status from the contract DTO.

    Raises:
        HTTPException: 404 when the service returns ``None`` for the update.
    """
    run = service.update_team_run_status_from_contract(payload)
    if run is not None:
        return ok(TeamRunDto.from_entity(run))
    raise HTTPException(
        status_code=404,
        detail=error_detail("error.team_run.not_found", id=payload.teamRunId),
    )
@router.post("/runs/execute", response_model=ApiResponse[TeamRunExecuteData])
def execute_team_run_contract(
    payload: TeamRunExecuteRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[TeamRunExecuteData]:
    """Execute the team run named by ``payload.teamRunId``.

    The contract DTO is translated into a ``TeamRunExecuteRequest`` before it
    is handed to the service. ``memberRunCount`` and ``dryRun`` are read from
    the run's ``output_json``; when a value is missing or has an unexpected
    type, the count falls back to ``0`` and the flag to ``payload.dryRun``.

    Raises:
        HTTPException: 404 when the service returns ``None`` for the run.
    """
    request = TeamRunExecuteRequest(
        worker_key=payload.workerKey,
        dry_run=payload.dryRun,
    )
    run = service.execute_team_run(team_run_id=payload.teamRunId, payload=request)
    if run is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.team_run.not_found", id=payload.teamRunId),
        )

    summary = run.output_json or {}
    reported_count = summary.get("member_run_count")
    if not isinstance(reported_count, int):
        reported_count = 0
    reported_dry_run = summary.get("dry_run")
    if not isinstance(reported_dry_run, bool):
        reported_dry_run = payload.dryRun

    return ok(
        TeamRunExecuteData(
            run=TeamRunDto.from_entity(run),
            memberRunCount=reported_count,
            dryRun=reported_dry_run,
        )
    )
@router.post("/runs/execute-stream")
def execute_team_run_stream_contract(
    payload: TeamRunExecuteRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> StreamingResponse:
    """Execute a team run and stream its progress as ``text/event-stream``.

    Each item yielded by the service becomes one frame: the item's ``event``
    value is the event name (``"message"`` when it is not a string) and the
    remaining keys are JSON-encoded as the ``data`` line.

    Raises:
        HTTPException: 404 when the run does not exist at request time.
    """
    if service.get_team_run(team_run_id=payload.teamRunId) is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.team_run.not_found", id=payload.teamRunId),
        )

    # NOTE(review): this generator is consumed while the response body is
    # being sent, i.e. after this function has returned. Confirm the injected
    # service (and whatever it holds) is still usable at that point.
    def events():
        # The request object is built on first iteration, not up front.
        stream = service.execute_team_run_stream(
            team_run_id=payload.teamRunId,
            payload=TeamRunExecuteRequest(
                worker_key=payload.workerKey,
                dry_run=payload.dryRun,
            ),
        )
        for item in stream:
            frame_name = item.get("event")
            if not isinstance(frame_name, str):
                frame_name = "message"
            body = {key: value for key, value in item.items() if key != "event"}
            yield f"event: {frame_name}\ndata: {json_dump(body)}\n\n"

    return StreamingResponse(
        events(),
        media_type="text/event-stream",
        headers=_sse_headers(),
    )
@router.post("/runs/delete", response_model=ApiResponse[DeleteData])
def delete_team_run_contract(
    payload: TeamRunDeleteRequestDto,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> ApiResponse[DeleteData]:
    """Delete a team run and report the outcome.

    The service's return value is passed through as ``deleted``; no 404 is
    raised here regardless of that value.
    """
    was_deleted = service.delete_team_run(team_run_id=payload.teamRunId)
    outcome = DeleteData(deleted=was_deleted, teamRunId=payload.teamRunId)
    return ok(outcome)
@router.post("/runs/{team_run_id}/status", response_model=TeamRunResponse)
def update_team_run_status(
    team_run_id: str,
    payload: TeamRunStatusUpdateRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamRunResponse:
    """Update the status of the run named by the ``team_run_id`` path part.

    Raises:
        HTTPException: 404 when the service returns ``None`` for the update.
    """
    run = service.update_team_run_status(team_run_id=team_run_id, payload=payload)
    if run is not None:
        return TeamRunResponse.from_entity(run)
    raise HTTPException(
        status_code=404,
        detail=error_detail("error.team_run.not_found", id=team_run_id),
    )
@router.post("/runs/{team_run_id}/execute", response_model=TeamRunExecuteResponse)
def execute_team_run(
    team_run_id: str,
    payload: TeamRunExecuteRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamRunExecuteResponse:
    """Execute the run named by the ``team_run_id`` path part.

    ``member_run_count`` and ``dry_run`` are read from the run's
    ``output_json``; when a value is missing or has an unexpected type, the
    count falls back to ``0`` and the flag to ``payload.dry_run``.

    Raises:
        HTTPException: 404 when the service returns ``None`` for the run.
    """
    run = service.execute_team_run(team_run_id=team_run_id, payload=payload)
    if run is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.team_run.not_found", id=team_run_id),
        )

    summary = run.output_json or {}
    reported_count = summary.get("member_run_count")
    if not isinstance(reported_count, int):
        reported_count = 0
    reported_dry_run = summary.get("dry_run")
    if not isinstance(reported_dry_run, bool):
        reported_dry_run = payload.dry_run

    return TeamRunExecuteResponse(
        run=TeamRunResponse.from_entity(run),
        member_run_count=reported_count,
        dry_run=reported_dry_run,
    )
@router.post("/runs/{team_run_id}/execute-stream")
def execute_team_run_stream(
    team_run_id: str,
    payload: TeamRunExecuteRequest,
    service: TeamApplicationService = Depends(get_team_application_service),
) -> StreamingResponse:
    """Execute a run and stream its progress as ``text/event-stream``.

    Each item yielded by the service becomes one frame: the item's ``event``
    value is the event name (``"message"`` when it is not a string) and the
    remaining keys are JSON-encoded as the ``data`` line.

    Raises:
        HTTPException: 404 when the run does not exist at request time.
    """
    if service.get_team_run(team_run_id=team_run_id) is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.team_run.not_found", id=team_run_id),
        )

    # NOTE(review): this generator is consumed while the response body is
    # being sent, i.e. after this function has returned. Confirm the injected
    # service (and whatever it holds) is still usable at that point.
    def events():
        stream = service.execute_team_run_stream(
            team_run_id=team_run_id,
            payload=payload,
        )
        for item in stream:
            frame_name = item.get("event")
            if not isinstance(frame_name, str):
                frame_name = "message"
            body = {key: value for key, value in item.items() if key != "event"}
            yield f"event: {frame_name}\ndata: {json_dump(body)}\n\n"

    return StreamingResponse(
        events(),
        media_type="text/event-stream",
        headers=_sse_headers(),
    )
- def _sse_headers() -> dict[str, str]:
- return {
- "Cache-Control": "no-cache",
- "X-Accel-Buffering": "no",
- }
@router.post("/workers/execute-next", response_model=TeamWorkerExecuteNextResponse)
def execute_next_worker_task(
    payload: TeamWorkerExecuteNextRequest,
    settings: TeamServiceSettings = Depends(get_team_settings),
    service: TeamApplicationService = Depends(get_team_application_service),
) -> TeamWorkerExecuteNextResponse:
    """Claim and execute the next team run on behalf of a worker.

    Args:
        payload: Worker request. ``lease_seconds`` falls back to
            ``settings.worker_lease_seconds`` when falsy; ``dry_run`` falls
            back to ``settings.worker_dry_run`` when ``None``.
        settings: Service settings supplying the worker defaults and the
            Redis URL.
        service: Application service that claims and executes the run.

    Returns:
        The executed run, its member-run count, the dry-run flag and the
        number of leases the service reported as released.

    Raises:
        HTTPException: 404 when the service returns ``None`` instead of a
            ``(run, released_lease_count)`` pair.
    """
    # Resolve the dry-run flag once so the value sent to the service and the
    # fallback reported in the response cannot disagree. Previously the
    # fallback ignored an explicit ``payload.dry_run``.
    effective_dry_run = (
        payload.dry_run if payload.dry_run is not None else settings.worker_dry_run
    )

    result = service.execute_next_claimed_team_run(
        worker_key=payload.worker_key,
        lease_seconds=payload.lease_seconds or settings.worker_lease_seconds,
        stale_running_seconds=settings.worker_stale_running_seconds,
        dry_run=effective_dry_run,
        redis_client=try_build_redis_client(settings.redis_url),
    )
    if result is None:
        raise HTTPException(
            status_code=404,
            detail=error_detail("error.team_run.not_found_queued"),
        )

    entity, released_lease_count = result
    output_json = entity.output_json or {}
    member_run_count = output_json.get("member_run_count")
    dry_run = output_json.get("dry_run")
    return TeamWorkerExecuteNextResponse(
        run=TeamRunResponse.from_entity(entity),
        member_run_count=member_run_count if isinstance(member_run_count, int) else 0,
        # Prefer the flag recorded by the run; otherwise report the flag that
        # was actually requested.
        dry_run=dry_run if isinstance(dry_run, bool) else effective_dry_run,
        released_lease_count=released_lease_count,
    )
|