routes.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. import json
  2. from datetime import datetime
  3. from typing import TypeVar
  4. from core_domain import ServiceHealth
  5. from core_shared import error_detail, try_build_redis_client
  6. from fastapi import APIRouter, Depends, HTTPException, Query
  7. from fastapi.responses import StreamingResponse
  8. from sqlalchemy import text
  9. from sqlalchemy.orm import Session
  10. from app.application.services import TeamApplicationService, build_team_application_service
  11. from app.bootstrap.settings import TeamServiceSettings
  12. from app.db.session import get_db
  13. from app.domain.repositories import (
  14. TeamDefinitionRepository,
  15. TeamRunRepository,
  16. TeamConfigRepository,
  17. )
  18. from app.schemas.team import (
  19. ApiResponse,
  20. DeleteData,
  21. PageResult,
  22. TeamConfigCreateRequestDto,
  23. TeamConfigDeleteRequestDto,
  24. TeamConfigDetailRequestDto,
  25. TeamConfigDto,
  26. TeamConfigListRequestDto,
  27. TeamConfigUpdateRequestDto,
  28. TeamCreateRequest,
  29. TeamCreateRequestDto,
  30. TeamDeleteRequestDto,
  31. TeamDetailRequestDto,
  32. TeamDto,
  33. TeamListRequestDto,
  34. TeamResponse,
  35. TeamRunCreateRequest,
  36. TeamRunCreateRequestDto,
  37. TeamRunDeleteRequestDto,
  38. TeamRunDetailRequestDto,
  39. TeamRunDto,
  40. TeamRunExecuteRequest,
  41. TeamRunExecuteData,
  42. TeamRunExecuteRequestDto,
  43. TeamRunExecuteResponse,
  44. TeamRunListRequestDto,
  45. TeamRunResponse,
  46. TeamRunStatusUpdateRequest,
  47. TeamRunStatusUpdateRequestDto,
  48. TeamStatusUpdateRequest,
  49. TeamUpdateRequestDto,
  50. TeamWorkerExecuteNextRequest,
  51. TeamWorkerExecuteNextResponse,
  52. )
  53. router = APIRouter()
  54. T = TypeVar("T")
  55. def ok(data: T) -> ApiResponse[T]:
  56. return ApiResponse[T](
  57. data=data,
  58. requestId="",
  59. serverTime=datetime.utcnow())
  60. def json_dump(payload: dict[str, object]) -> str:
  61. return json.dumps(payload, ensure_ascii=False, default=str)
  62. def get_team_settings() -> TeamServiceSettings:
  63. return TeamServiceSettings()
  64. def get_team_application_service(
  65. db: Session = Depends(get_db),
  66. settings: TeamServiceSettings = Depends(get_team_settings)) -> TeamApplicationService:
  67. return build_team_application_service(
  68. team_repository=TeamDefinitionRepository(db),
  69. team_config_repository=TeamConfigRepository(db),
  70. team_run_repository=TeamRunRepository(db),
  71. settings=settings)
  72. @router.get("/health", response_model=ServiceHealth)
  73. def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
  74. db.execute(text("SELECT 1"))
  75. return ServiceHealth(service="team-service", status="ok", database="ok")
  76. @router.post("", response_model=TeamResponse)
  77. def create_team(
  78. payload: TeamCreateRequest,
  79. service: TeamApplicationService = Depends(get_team_application_service)) -> TeamResponse:
  80. entity = service.create_team(payload)
  81. return TeamResponse.from_entity(entity)
  82. @router.get("", response_model=list[TeamResponse])
  83. def list_teams(
  84. service: TeamApplicationService = Depends(get_team_application_service)) -> list[TeamResponse]:
  85. return [TeamResponse.from_entity(item) for item in service.list_teams()]
  86. @router.post("/list", response_model=ApiResponse[PageResult[TeamDto]])
  87. def list_teams_contract(
  88. payload: TeamListRequestDto,
  89. service: TeamApplicationService = Depends(get_team_application_service)) -> ApiResponse[PageResult[TeamDto]]:
  90. keyword = (payload.keyword or "").lower().strip()
  91. items = [
  92. item
  93. for item in service.list_teams()
  94. if (payload.status is None or item.status == payload.status)
  95. and (
  96. not keyword
  97. or keyword in item.name.lower()
  98. or keyword in item.team_type.lower()
  99. or keyword in (item.description or "").lower()
  100. )
  101. ]
  102. page_items = items[payload.offset:payload.offset + payload.pageSize]
  103. return ok(PageResult[TeamDto].from_items(
  104. items=[TeamDto.from_entity(item) for item in page_items],
  105. total=len(items),
  106. page=payload.page,
  107. page_size=payload.pageSize))
  108. @router.post("/create", response_model=ApiResponse[TeamDto])
  109. def create_team_contract(
  110. payload: TeamCreateRequestDto,
  111. service: TeamApplicationService = Depends(get_team_application_service)) -> ApiResponse[TeamDto]:
  112. return ok(TeamDto.from_entity(service.create_team_from_contract(payload)))
  113. @router.post("/detail", response_model=ApiResponse[TeamDto])
  114. def get_team_contract(
  115. payload: TeamDetailRequestDto,
  116. service: TeamApplicationService = Depends(get_team_application_service)) -> ApiResponse[TeamDto]:
  117. entity = service.get_team(team_id=payload.teamId)
  118. if entity is None:
  119. raise HTTPException(status_code=404, detail=error_detail("error.team.not_found", id=payload.teamId))
  120. return ok(TeamDto.from_entity(entity))
  121. @router.post("/update", response_model=ApiResponse[TeamDto])
  122. def update_team_contract(
  123. payload: TeamUpdateRequestDto,
  124. service: TeamApplicationService = Depends(get_team_application_service)) -> ApiResponse[TeamDto]:
  125. entity = service.update_team_from_contract(payload)
  126. if entity is None:
  127. raise HTTPException(status_code=404, detail=error_detail("error.team.not_found", id=payload.teamId))
  128. return ok(TeamDto.from_entity(entity))
  129. @router.post("/delete", response_model=ApiResponse[DeleteData])
  130. def delete_team_contract(
  131. payload: TeamDeleteRequestDto,
  132. service: TeamApplicationService = Depends(get_team_application_service)) -> ApiResponse[DeleteData]:
  133. deleted = service.delete_team_from_contract(payload)
  134. return ok(DeleteData(deleted=deleted, teamId=payload.teamId))
  135. @router.patch("/{team_id}/status", response_model=TeamResponse)
  136. def update_team_status(
  137. team_id: str,
  138. payload: TeamStatusUpdateRequest,
  139. service: TeamApplicationService = Depends(get_team_application_service)) -> TeamResponse:
  140. entity = service.update_team_status(team_id=team_id, payload=payload)
  141. if entity is None:
  142. raise HTTPException(status_code=404, detail=error_detail("error.team.not_found", id=team_id))
  143. return TeamResponse.from_entity(entity)
  144. @router.post("/configs/list", response_model=ApiResponse[PageResult[TeamConfigDto]])
  145. def list_team_configs_contract(
  146. payload: TeamConfigListRequestDto,
  147. service: TeamApplicationService = Depends(get_team_application_service)) -> ApiResponse[PageResult[TeamConfigDto]]:
  148. items = service.list_team_configs(team_id=payload.teamId)
  149. page_items = items[payload.offset:payload.offset + payload.pageSize]
  150. return ok(PageResult[TeamConfigDto].from_items(
  151. items=[TeamConfigDto.from_entity(item) for item in page_items],
  152. total=len(items),
  153. page=payload.page,
  154. page_size=payload.pageSize))
  155. @router.post("/configs/create", response_model=ApiResponse[TeamConfigDto])
  156. def create_team_config_contract(
  157. payload: TeamConfigCreateRequestDto,
  158. service: TeamApplicationService = Depends(get_team_application_service)) -> ApiResponse[TeamConfigDto]:
  159. try:
  160. entity = service.create_team_config_from_contract(payload)
  161. except ValueError as exc:
  162. raise HTTPException(status_code=422, detail=error_detail("error.validation", message=str(exc))) from exc
  163. return ok(TeamConfigDto.from_entity(entity))
  164. @router.post("/configs/detail", response_model=ApiResponse[TeamConfigDto])
  165. def get_team_config_contract(
  166. payload: TeamConfigDetailRequestDto,
  167. service: TeamApplicationService = Depends(get_team_application_service)) -> ApiResponse[TeamConfigDto]:
  168. entity = service.get_team_config(config_id=payload.configId)
  169. if entity is None:
  170. raise HTTPException(status_code=404, detail=error_detail("error.team_config.not_found", id=payload.configId))
  171. return ok(TeamConfigDto.from_entity(entity))
  172. @router.post("/configs/update", response_model=ApiResponse[TeamConfigDto])
  173. def update_team_config_contract(
  174. payload: TeamConfigUpdateRequestDto,
  175. service: TeamApplicationService = Depends(get_team_application_service)) -> ApiResponse[TeamConfigDto]:
  176. try:
  177. entity = service.update_team_config_from_contract(payload)
  178. except ValueError as exc:
  179. raise HTTPException(status_code=422, detail=error_detail("error.validation", message=str(exc))) from exc
  180. if entity is None:
  181. raise HTTPException(status_code=404, detail=error_detail("error.team_config.not_found", id=payload.configId))
  182. return ok(TeamConfigDto.from_entity(entity))
  183. @router.post("/configs/delete", response_model=ApiResponse[DeleteData])
  184. def delete_team_config_contract(
  185. payload: TeamConfigDeleteRequestDto,
  186. service: TeamApplicationService = Depends(get_team_application_service)) -> ApiResponse[DeleteData]:
  187. deleted = service.delete_team_config(config_id=payload.configId)
  188. return ok(DeleteData(deleted=deleted, configId=payload.configId))
  189. @router.post("/runs", response_model=TeamRunResponse)
  190. def create_team_run(
  191. payload: TeamRunCreateRequest,
  192. service: TeamApplicationService = Depends(get_team_application_service)) -> TeamRunResponse:
  193. try:
  194. entity = service.create_team_run(payload)
  195. except ValueError as exc:
  196. raise HTTPException(status_code=422, detail=error_detail("error.validation", message=str(exc))) from exc
  197. return TeamRunResponse.from_entity(entity)
  198. @router.get("/runs", response_model=list[TeamRunResponse])
  199. def list_team_runs(
  200. team_id: str | None = Query(default=None),
  201. session_id: str | None = Query(default=None),
  202. service: TeamApplicationService = Depends(get_team_application_service)) -> list[TeamRunResponse]:
  203. return [
  204. TeamRunResponse.from_entity(item)
  205. for item in service.list_team_runs(
  206. team_id=team_id,
  207. session_id=session_id)
  208. ]
  209. @router.post("/runs/list", response_model=ApiResponse[PageResult[TeamRunDto]])
  210. def list_team_runs_contract(
  211. payload: TeamRunListRequestDto,
  212. service: TeamApplicationService = Depends(get_team_application_service)) -> ApiResponse[PageResult[TeamRunDto]]:
  213. items = [
  214. item
  215. for item in service.list_team_runs(team_id=payload.teamId, session_id=payload.sessionId)
  216. if payload.status is None or item.status == payload.status
  217. ]
  218. page_items = items[payload.offset:payload.offset + payload.pageSize]
  219. return ok(PageResult[TeamRunDto].from_items(
  220. items=[TeamRunDto.from_entity(item) for item in page_items],
  221. total=len(items),
  222. page=payload.page,
  223. page_size=payload.pageSize))
  224. @router.post("/runs/create", response_model=ApiResponse[TeamRunDto])
  225. def create_team_run_contract(
  226. payload: TeamRunCreateRequestDto,
  227. service: TeamApplicationService = Depends(get_team_application_service)) -> ApiResponse[TeamRunDto]:
  228. try:
  229. entity = service.create_team_run_from_contract(payload)
  230. except ValueError as exc:
  231. raise HTTPException(status_code=422, detail=error_detail("error.validation", message=str(exc))) from exc
  232. return ok(TeamRunDto.from_entity(entity))
  233. @router.post("/runs/detail", response_model=ApiResponse[TeamRunDto])
  234. def get_team_run_contract(
  235. payload: TeamRunDetailRequestDto,
  236. service: TeamApplicationService = Depends(get_team_application_service)) -> ApiResponse[TeamRunDto]:
  237. entity = service.get_team_run(team_run_id=payload.teamRunId)
  238. if entity is None:
  239. raise HTTPException(status_code=404, detail=error_detail("error.team_run.not_found", id=payload.teamRunId))
  240. return ok(TeamRunDto.from_entity(entity))
  241. @router.post("/runs/status", response_model=ApiResponse[TeamRunDto])
  242. def update_team_run_status_contract(
  243. payload: TeamRunStatusUpdateRequestDto,
  244. service: TeamApplicationService = Depends(get_team_application_service)) -> ApiResponse[TeamRunDto]:
  245. entity = service.update_team_run_status_from_contract(payload)
  246. if entity is None:
  247. raise HTTPException(status_code=404, detail=error_detail("error.team_run.not_found", id=payload.teamRunId))
  248. return ok(TeamRunDto.from_entity(entity))
  249. @router.post("/runs/execute", response_model=ApiResponse[TeamRunExecuteData])
  250. def execute_team_run_contract(
  251. payload: TeamRunExecuteRequestDto,
  252. service: TeamApplicationService = Depends(get_team_application_service)) -> ApiResponse[TeamRunExecuteData]:
  253. entity = service.execute_team_run(
  254. team_run_id=payload.teamRunId,
  255. payload=TeamRunExecuteRequest(
  256. worker_key=payload.workerKey,
  257. dry_run=payload.dryRun))
  258. if entity is None:
  259. raise HTTPException(status_code=404, detail=error_detail("error.team_run.not_found", id=payload.teamRunId))
  260. output_json = entity.output_json or {}
  261. member_run_count = output_json.get("member_run_count")
  262. dry_run = output_json.get("dry_run")
  263. return ok(TeamRunExecuteData(
  264. run=TeamRunDto.from_entity(entity),
  265. memberRunCount=member_run_count if isinstance(member_run_count, int) else 0,
  266. dryRun=dry_run if isinstance(dry_run, bool) else payload.dryRun))
  267. @router.post("/runs/execute-stream")
  268. def execute_team_run_stream_contract(
  269. payload: TeamRunExecuteRequestDto,
  270. service: TeamApplicationService = Depends(get_team_application_service)) -> StreamingResponse:
  271. if service.get_team_run(team_run_id=payload.teamRunId) is None:
  272. raise HTTPException(status_code=404, detail=error_detail("error.team_run.not_found", id=payload.teamRunId))
  273. def events():
  274. for item in service.execute_team_run_stream(
  275. team_run_id=payload.teamRunId,
  276. payload=TeamRunExecuteRequest(
  277. worker_key=payload.workerKey,
  278. dry_run=payload.dryRun)):
  279. event = item.get("event")
  280. event_name = event if isinstance(event, str) else "message"
  281. data = {key: value for key, value in item.items() if key != "event"}
  282. yield f"event: {event_name}\ndata: {json_dump(data)}\n\n"
  283. return StreamingResponse(
  284. events(),
  285. media_type="text/event-stream",
  286. headers=_sse_headers())
  287. @router.post("/runs/delete", response_model=ApiResponse[DeleteData])
  288. def delete_team_run_contract(
  289. payload: TeamRunDeleteRequestDto,
  290. service: TeamApplicationService = Depends(get_team_application_service)) -> ApiResponse[DeleteData]:
  291. deleted = service.delete_team_run(team_run_id=payload.teamRunId)
  292. return ok(DeleteData(deleted=deleted, teamRunId=payload.teamRunId))
  293. @router.post("/runs/{team_run_id}/status", response_model=TeamRunResponse)
  294. def update_team_run_status(
  295. team_run_id: str,
  296. payload: TeamRunStatusUpdateRequest,
  297. service: TeamApplicationService = Depends(get_team_application_service)) -> TeamRunResponse:
  298. entity = service.update_team_run_status(team_run_id=team_run_id, payload=payload)
  299. if entity is None:
  300. raise HTTPException(status_code=404, detail=error_detail("error.team_run.not_found", id=team_run_id))
  301. return TeamRunResponse.from_entity(entity)
  302. @router.post("/runs/{team_run_id}/execute", response_model=TeamRunExecuteResponse)
  303. def execute_team_run(
  304. team_run_id: str,
  305. payload: TeamRunExecuteRequest,
  306. service: TeamApplicationService = Depends(get_team_application_service)) -> TeamRunExecuteResponse:
  307. entity = service.execute_team_run(team_run_id=team_run_id, payload=payload)
  308. if entity is None:
  309. raise HTTPException(status_code=404, detail=error_detail("error.team_run.not_found", id=team_run_id))
  310. output_json = entity.output_json or {}
  311. member_run_count = output_json.get("member_run_count")
  312. dry_run = output_json.get("dry_run")
  313. return TeamRunExecuteResponse(
  314. run=TeamRunResponse.from_entity(entity),
  315. member_run_count=member_run_count if isinstance(member_run_count, int) else 0,
  316. dry_run=dry_run if isinstance(dry_run, bool) else payload.dry_run)
  317. @router.post("/runs/{team_run_id}/execute-stream")
  318. def execute_team_run_stream(
  319. team_run_id: str,
  320. payload: TeamRunExecuteRequest,
  321. service: TeamApplicationService = Depends(get_team_application_service)) -> StreamingResponse:
  322. if service.get_team_run(team_run_id=team_run_id) is None:
  323. raise HTTPException(status_code=404, detail=error_detail("error.team_run.not_found", id=team_run_id))
  324. def events():
  325. for item in service.execute_team_run_stream(team_run_id=team_run_id, payload=payload):
  326. event = item.get("event")
  327. event_name = event if isinstance(event, str) else "message"
  328. data = {key: value for key, value in item.items() if key != "event"}
  329. yield f"event: {event_name}\ndata: {json_dump(data)}\n\n"
  330. return StreamingResponse(
  331. events(),
  332. media_type="text/event-stream",
  333. headers=_sse_headers())
  334. def _sse_headers() -> dict[str, str]:
  335. return {
  336. "Cache-Control": "no-cache",
  337. "X-Accel-Buffering": "no",
  338. }
  339. @router.post("/workers/execute-next", response_model=TeamWorkerExecuteNextResponse)
  340. def execute_next_worker_task(
  341. payload: TeamWorkerExecuteNextRequest,
  342. settings: TeamServiceSettings = Depends(get_team_settings),
  343. service: TeamApplicationService = Depends(get_team_application_service)) -> TeamWorkerExecuteNextResponse:
  344. result = service.execute_next_claimed_team_run(
  345. worker_key=payload.worker_key,
  346. lease_seconds=payload.lease_seconds or settings.worker_lease_seconds,
  347. stale_running_seconds=settings.worker_stale_running_seconds,
  348. dry_run=payload.dry_run if payload.dry_run is not None else settings.worker_dry_run,
  349. redis_client=try_build_redis_client(settings.redis_url))
  350. if result is None:
  351. raise HTTPException(status_code=404, detail=error_detail("error.team_run.not_found_queued"))
  352. entity, released_lease_count = result
  353. output_json = entity.output_json or {}
  354. member_run_count = output_json.get("member_run_count")
  355. dry_run = output_json.get("dry_run")
  356. return TeamWorkerExecuteNextResponse(
  357. run=TeamRunResponse.from_entity(entity),
  358. member_run_count=member_run_count if isinstance(member_run_count, int) else 0,
  359. dry_run=dry_run if isinstance(dry_run, bool) else settings.worker_dry_run,
  360. released_lease_count=released_lease_count)