services.py 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155
  1. from dataclasses import dataclass
  2. from collections.abc import Iterator
  3. from datetime import datetime, timedelta
  4. from concurrent.futures import ThreadPoolExecutor, as_completed
  5. from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
  6. from core_domain import AgentRunContract, TeamMemberContract
  7. from core_shared import JSONValue, try_build_redis_client
  8. from core_shared.task_queue import TaskQueuePublisher
  9. from app.bootstrap.settings import TeamServiceSettings
  10. from app.db.models import TeamDefinition, TeamRun, TeamConfig
  11. from app.domain.repositories import (
  12. TeamDefinitionRepository,
  13. TeamRunRepository,
  14. TeamConfigRepository)
  15. from app.infrastructure.agent_client import AgentServiceClient, AgentServiceClientError
  16. from app.schemas.team import (
  17. TeamConfigCreateRequestDto,
  18. TeamConfigUpdateRequestDto,
  19. TeamCreateRequest,
  20. TeamCreateRequestDto,
  21. TeamDeleteRequestDto,
  22. TeamRunCreateRequest,
  23. TeamRunCreateRequestDto,
  24. TeamRunExecuteRequest,
  25. TeamRunStatusUpdateRequestDto,
  26. TeamRunStatusUpdateRequest,
  27. TeamStatusUpdateRequest,
  28. TeamUpdateRequestDto,
  29. TeamConfigCreateRequest)
  # Character limit of 1600; the name suggests it bounds the text of prior member
  # outputs, but its use is not visible in this section -- TODO confirm at the use site.
  MAX_PRIOR_OUTPUT_TEXT_CHARS = 1600
  @dataclass(frozen=True)
  class TeamMemberRunResult:
      """Immutable pairing of a team member with the agent run executed for it."""

      # Team member the agent run was executed for.
      member: TeamMemberContract
      # Agent run returned by the agent service for that member.
      run: AgentRunContract
  35. class TeamApplicationService:
  def __init__(
      self,
      *,
      team_repository: TeamDefinitionRepository,
      team_config_repository: TeamConfigRepository,
      team_run_repository: TeamRunRepository,
      agent_client: AgentServiceClient | None = None,
      event_client: EventServiceClient | None = None,
      task_queue_publisher: TaskQueuePublisher | None = None,
  ) -> None:
      """Store the collaborators this service delegates to.

      All arguments are keyword-only. The three repositories are required;
      the agent client, event client and task-queue publisher are optional
      and default to ``None``.
      """
      # Optional integrations.
      self.task_queue_publisher = task_queue_publisher
      self.event_client = event_client
      self.agent_client = agent_client
      # Required persistence layer.
      self.team_run_repository = team_run_repository
      self.team_config_repository = team_config_repository
      self.team_repository = team_repository
  def create_team(self, payload: TeamCreateRequest) -> TeamDefinition:
      """Create a team definition from an internal create request.

      Forwards code, name, description, team type, owner and metadata from
      ``payload`` to ``team_repository.create`` and returns what it returns.
      """
      team_fields = {
          "code": payload.code,
          "name": payload.name,
          "description": payload.description,
          "team_type": payload.team_type,
          "owner_user_id": payload.owner_user_id,
          "metadata_json": payload.metadata_json,
      }
      return self.team_repository.create(**team_fields)
  def create_team_from_contract(self, payload: TeamCreateRequestDto) -> TeamDefinition:
      """Translate a contract DTO into a ``TeamCreateRequest`` and create the team.

      The team code is produced by ``_build_team_code`` from the DTO's name;
      the remaining camelCase DTO fields map onto their snake_case counterparts.
      """
      team_code = self._build_team_code(payload.name)
      request = TeamCreateRequest(
          code=team_code,
          name=payload.name,
          description=payload.description,
          team_type=payload.teamType,
          owner_user_id=payload.ownerUserId,
          metadata_json=payload.metadata,
      )
      return self.create_team(request)
  def list_teams(self) -> list[TeamDefinition]:
      """Return every team definition reported by ``team_repository.list_all``."""
      all_teams = self.team_repository.list_all()
      return all_teams
  def get_team(self, *, team_id: str) -> TeamDefinition | None:
      """Look up one team definition by id via ``team_repository.get_by_id``."""
      found_team = self.team_repository.get_by_id(team_id=team_id)
      return found_team
  def update_team_from_contract(self, payload: TeamUpdateRequestDto) -> TeamDefinition | None:
      """Apply the non-``None`` fields of an update DTO to an existing team.

      Returns ``None`` when no team has ``payload.teamId``; otherwise returns
      the result of ``team_repository.save``. Changing the name also
      regenerates the team code through ``_build_team_code``.
      """
      team = self.team_repository.get_by_id(team_id=payload.teamId)
      if team is None:
          return None

      new_name = payload.name
      if new_name is not None:
          team.name = new_name
          team.code = self._build_team_code(new_name)

      # Remaining fields are plain one-to-one copies, applied in the same order.
      direct_updates = (
          ("description", payload.description),
          ("team_type", payload.teamType),
          ("status", payload.status),
          ("owner_user_id", payload.ownerUserId),
          ("metadata_json", payload.metadata),
      )
      for attribute, new_value in direct_updates:
          if new_value is not None:
              setattr(team, attribute, new_value)

      return self.team_repository.save(team)
  def delete_team_from_contract(self, payload: TeamDeleteRequestDto) -> bool:
      """Delete the team named by ``payload.teamId``.

      Returns ``True`` when a team was found and passed to
      ``team_repository.delete``, ``False`` when no such team exists.
      """
      team = self.team_repository.get_by_id(team_id=payload.teamId)
      team_exists = team is not None
      if team_exists:
          self.team_repository.delete(team)
      return team_exists
  def update_team_status(
      self,
      *,
      team_id: str,
      payload: TeamStatusUpdateRequest,
  ) -> TeamDefinition | None:
      """Set a team's status via ``team_repository.update_status`` and return its result."""
      new_status = payload.status
      return self.team_repository.update_status(team_id=team_id, status=new_status)
  def create_team_config(self, payload: TeamConfigCreateRequest) -> TeamConfig:
      """Create a team configuration for an existing team.

      Raises:
          ValueError: if the team does not exist, or if ``payload.member_refs``
              is empty. The team lookup is checked first.
      """
      owning_team = self.team_repository.get_by_id(team_id=payload.team_id)
      if owning_team is None:
          raise ValueError(f"team not found: {payload.team_id}")
      if not payload.member_refs:
          raise ValueError("team config requires at least one member")

      # Members are stored as JSON-mode dumps of each member reference.
      serialized_members = [ref.model_dump(mode="json") for ref in payload.member_refs]
      return self.team_config_repository.create(
          team_id=payload.team_id,
          coordination_mode=payload.coordination_mode,
          objective=payload.objective,
          member_refs_json=serialized_members,
          policy_json=payload.policy_json,
      )
  def create_team_config_from_contract(self, payload: TeamConfigCreateRequestDto) -> TeamConfig:
      """Translate a contract DTO into a ``TeamConfigCreateRequest`` and create the config.

      Member references are passed through ``_normalize_member_refs`` first.
      """
      request = TeamConfigCreateRequest(
          team_id=payload.teamId,
          coordination_mode=payload.coordinationMode,
          objective=payload.objective,
          member_refs=self._normalize_member_refs(payload.memberRefs),
          policy_json=payload.policy,
      )
      return self.create_team_config(request)
  def list_team_configs(self, *, team_id: str | None = None) -> list[TeamConfig]:
      """List team configurations, scoped to one team when ``team_id`` is given."""
      repository = self.team_config_repository
      if team_id is None:
          return repository.list_all()
      return repository.list_by_team(team_id=team_id)
  def get_team_config(self, *, config_id: str) -> TeamConfig | None:
      """Look up one team configuration by id via ``team_config_repository.get_by_id``."""
      found_config = self.team_config_repository.get_by_id(team_config_id=config_id)
      return found_config
  def update_team_config_from_contract(self, payload: TeamConfigUpdateRequestDto) -> TeamConfig | None:
      """Apply the non-``None`` fields of an update DTO to an existing team config.

      Returns ``None`` when no config has ``payload.configId``; otherwise
      returns the result of ``team_config_repository.save``.

      Raises:
          ValueError: if ``payload.memberRefs`` is given but normalizes to an
              empty list. Validation runs before any field is assigned, so a
              rejected update leaves the loaded entity unmodified.
      """
      entity = self.team_config_repository.get_by_id(team_config_id=payload.configId)
      if entity is None:
          return None

      # Validate first: raising after assigning other fields would leave the
      # entity half-updated.
      member_refs_json = None
      if payload.memberRefs is not None:
          normalized = self._normalize_member_refs(payload.memberRefs)
          if not normalized:
              raise ValueError("team config requires at least one member")
          member_refs_json = [item.model_dump(mode="json") for item in normalized]

      if payload.coordinationMode is not None:
          entity.coordination_mode = payload.coordinationMode
      if payload.objective is not None:
          entity.objective = payload.objective
      if member_refs_json is not None:
          entity.member_refs_json = member_refs_json
      if payload.policy is not None:
          entity.policy_json = payload.policy
      return self.team_config_repository.save(entity)
  def delete_team_config(self, *, config_id: str) -> bool:
      """Delete a team configuration by id.

      Returns ``True`` when a config was found and passed to
      ``team_config_repository.delete``, ``False`` when none exists.
      """
      config = self.team_config_repository.get_by_id(team_config_id=config_id)
      config_exists = config is not None
      if config_exists:
          self.team_config_repository.delete(config)
      return config_exists
  def create_team_run(self, payload: TeamRunCreateRequest) -> TeamRun:
      """Create a team run, publish ``team.run.created`` and optionally enqueue it.

      The config comes from ``_resolve_team_config``. The run is handed to the
      task queue only when ``payload.enqueue`` is truthy and a publisher is set.

      Raises:
          ValueError: if no team config could be resolved.
      """
      resolved_config = self._resolve_team_config(
          team_id=payload.team_id,
          team_config_id=payload.team_config_id,
      )
      if resolved_config is None:
          raise ValueError("team config not found")

      created_run = self.team_run_repository.create(
          team_id=payload.team_id,
          team_config_id=resolved_config.id,
          session_id=payload.session_id,
          input_text=payload.input_text,
          input_json=payload.input_json,
      )

      created_payload = {"team_run_id": created_run.id, "status": created_run.status}
      self._publish_event(
          event_type="team.run.created",
          team_run=created_run,
          payload_json=created_payload,
      )

      publisher = self.task_queue_publisher
      if payload.enqueue and publisher is not None:
          publisher.publish_team_run(team_run_id=created_run.id)
      return created_run
  def create_team_run_from_contract(self, payload: TeamRunCreateRequestDto) -> TeamRun:
      """Translate a contract DTO into a ``TeamRunCreateRequest`` and create the run."""
      request = TeamRunCreateRequest(
          team_id=payload.teamId,
          team_config_id=payload.teamConfigId,
          session_id=payload.sessionId,
          input_text=payload.inputText,
          input_json=payload.inputJson,
          enqueue=payload.enqueue,
      )
      return self.create_team_run(request)
  def list_team_runs(
      self,
      *,
      team_id: str | None = None,
      session_id: str | None = None,
  ) -> list[TeamRun]:
      """List team runs via ``team_run_repository.list_by_scope``.

      Both filters are passed through as given, including ``None``.
      """
      scope = {"team_id": team_id, "session_id": session_id}
      return self.team_run_repository.list_by_scope(**scope)
  def get_team_run(self, *, team_run_id: str) -> TeamRun | None:
      """Look up one team run by id via ``team_run_repository.get_by_id``."""
      found_run = self.team_run_repository.get_by_id(team_run_id=team_run_id)
      return found_run
  def delete_team_run(self, *, team_run_id: str) -> bool:
      """Delete a team run by id.

      Returns ``True`` when a run was found and passed to
      ``team_run_repository.delete``, ``False`` when none exists.
      """
      run = self.team_run_repository.get_by_id(team_run_id=team_run_id)
      run_exists = run is not None
      if run_exists:
          self.team_run_repository.delete(run)
      return run_exists
  def update_team_run_status(
      self,
      *,
      team_run_id: str,
      payload: TeamRunStatusUpdateRequest,
  ) -> TeamRun | None:
      """Update the status and result fields of an existing team run.

      Returns ``None`` without updating when the run does not exist;
      otherwise returns the result of ``team_run_repository.update_status``.
      """
      repository = self.team_run_repository
      if repository.get_by_id(team_run_id=team_run_id) is None:
          return None

      status_fields = {
          "status": payload.status,
          "worker_key": payload.worker_key,
          "output_text": payload.output_text,
          "output_json": payload.output_json,
          "error_code": payload.error_code,
          "error_message": payload.error_message,
      }
      return repository.update_status(team_run_id=team_run_id, **status_fields)
  def update_team_run_status_from_contract(
      self,
      payload: TeamRunStatusUpdateRequestDto,
  ) -> TeamRun | None:
      """Translate a contract DTO into a ``TeamRunStatusUpdateRequest`` and apply it."""
      status_request = TeamRunStatusUpdateRequest(
          status=payload.status,
          worker_key=payload.workerKey,
          output_text=payload.outputText,
          output_json=payload.outputJson,
          error_code=payload.errorCode,
          error_message=payload.errorMessage,
      )
      return self.update_team_run_status(
          team_run_id=payload.teamRunId,
          payload=status_request,
      )
  def execute_team_run(
      self,
      *,
      team_run_id: str,
      payload: TeamRunExecuteRequest,
  ) -> TeamRun | None:
      """Execute a team run synchronously and record its outcome.

      Steps: load the run and its config, mark the run ``running``, execute
      the members through ``_execute_members``, then mark the run
      ``completed`` or ``failed`` with the aggregated output.

      Every failure path publishes a ``team.run.failed`` event (previously the
      ``team_members_missing`` and ``agent_service_error`` paths did not), and
      success publishes ``team.run.completed``.

      Returns:
          The updated run as returned by ``team_run_repository.update_status``,
          or ``None`` when the run does not exist or a status update returns
          ``None``.
      """
      team_run = self.team_run_repository.get_by_id(team_run_id=team_run_id)
      if team_run is None:
          return None

      team_config = self.team_config_repository.get_by_id(
          team_config_id=team_run.team_config_id)
      if team_config is None:
          return self._publish_team_run_failed(
              self.team_run_repository.update_status(
                  team_run_id=team_run.id,
                  status="failed",
                  worker_key=payload.worker_key,
                  error_code="team_config_missing",
                  error_message=f"team config not found: {team_run.team_config_id}"),
              error_code="team_config_missing")

      running_run = self.team_run_repository.update_status(
          team_run_id=team_run.id,
          status="running",
          worker_key=payload.worker_key)
      if running_run is None:
          return None

      members = self._read_team_members(team_config)
      if not members:
          return self._publish_team_run_failed(
              self.team_run_repository.update_status(
                  team_run_id=team_run.id,
                  status="failed",
                  worker_key=payload.worker_key,
                  error_code="team_members_missing",
                  error_message="team config has no valid members"),
              error_code="team_members_missing")

      try:
          member_results = self._execute_members(
              team_run=team_run,
              team_config=team_config,
              members=members,
              worker_key=payload.worker_key,
              dry_run=payload.dry_run)
      except AgentServiceClientError as exc:
          return self._publish_team_run_failed(
              self.team_run_repository.update_status(
                  team_run_id=team_run.id,
                  status="failed",
                  worker_key=payload.worker_key,
                  error_code="agent_service_error",
                  error_message=str(exc)),
              error_code="agent_service_error")

      failed_results = [item for item in member_results if item.run.status != "completed"]
      output_text = self._build_team_output_text(
          team_config=team_config,
          member_results=member_results)
      output_json: dict[str, JSONValue] = {
          "dry_run": payload.dry_run,
          "coordination_mode": team_config.coordination_mode,
          "team_config_id": team_config.id,
          "member_run_count": len(member_results),
          "member_results": [
              self._member_result_to_json(item) for item in member_results
          ],
      }

      # Member failures fail the whole run unless the configured failure mode
      # is "continue_with_warning".
      failure_mode = self._read_failure_mode(team_config)
      if failed_results and failure_mode != "continue_with_warning":
          return self._publish_team_run_failed(
              self.team_run_repository.update_status(
                  team_run_id=team_run.id,
                  status="failed",
                  worker_key=payload.worker_key,
                  output_text=output_text,
                  output_json=output_json,
                  error_code="member_run_failed",
                  error_message=f"{len(failed_results)} member run(s) failed"),
              failed_member_count=len(failed_results))

      completed_run = self.team_run_repository.update_status(
          team_run_id=team_run.id,
          status="completed",
          worker_key=payload.worker_key,
          output_text=output_text,
          output_json=output_json)
      if completed_run is not None:
          self._publish_event(
              event_type="team.run.completed",
              team_run=completed_run,
              payload_json={
                  "team_run_id": completed_run.id,
                  "status": completed_run.status,
                  "member_run_count": len(member_results),
              })
      return completed_run

  def _publish_team_run_failed(
      self,
      failed_run: TeamRun | None,
      **extra_payload: JSONValue,
  ) -> TeamRun | None:
      """Publish ``team.run.failed`` for ``failed_run`` and return it unchanged.

      Nothing is published when ``failed_run`` is ``None``. ``extra_payload``
      entries are appended to the event payload after ``team_run_id`` and
      ``status``.
      """
      if failed_run is not None:
          self._publish_event(
              event_type="team.run.failed",
              team_run=failed_run,
              payload_json={
                  "team_run_id": failed_run.id,
                  "status": failed_run.status,
                  **extra_payload,
              })
      return failed_run
  def execute_team_run_stream(
      self,
      *,
      team_run_id: str,
      payload: TeamRunExecuteRequest,
  ) -> Iterator[dict[str, JSONValue]]:
      """Execute a team run as a generator of progress events.

      Yields dicts with an ``"event"`` name (``team.run.started``,
      ``team.run.failed`` or ``team.run.completed``) and a ``"run"`` snapshot
      from ``_team_run_to_json``, plus whatever the member streaming helpers
      yield in between. Yields nothing when the run does not exist or cannot
      be moved to ``running``.

      Differences from the previous version, both mirroring
      ``execute_team_run``:
        * a ``None`` result from the "running" transition ends the stream
          instead of being passed to ``_team_run_to_json``;
        * ``AgentServiceClientError`` raised while streaming members marks the
          run failed with ``agent_service_error`` instead of leaving it
          ``running``.
      """
      team_run = self.team_run_repository.get_by_id(team_run_id=team_run_id)
      if team_run is None:
          return

      def failed_event(error_code: str, error_message: str, **outputs) -> dict[str, JSONValue]:
          # Mark the run failed and build the matching stream event.
          failed_run = self.team_run_repository.update_status(
              team_run_id=team_run.id,
              status="failed",
              worker_key=payload.worker_key,
              error_code=error_code,
              error_message=error_message,
              **outputs)
          return {"event": "team.run.failed", "run": self._team_run_to_json(failed_run)}

      team_config = self.team_config_repository.get_by_id(
          team_config_id=team_run.team_config_id)
      if team_config is None:
          yield failed_event(
              "team_config_missing",
              f"team config not found: {team_run.team_config_id}")
          return

      running_run = self.team_run_repository.update_status(
          team_run_id=team_run.id,
          status="running",
          worker_key=payload.worker_key)
      if running_run is None:
          # Same guard as the non-streaming path: nothing to report or execute.
          return
      yield {"event": "team.run.started", "run": self._team_run_to_json(running_run)}

      members = self._read_team_members(team_config)
      if not members:
          yield failed_event("team_members_missing", "team config has no valid members")
          return
      if self.agent_client is None:
          yield failed_event("agent_service_missing", "agent service client is not configured")
          return

      stream_members = self._select_stream_members(team_config=team_config, members=members)
      try:
          # Only "debate" has a dedicated streaming strategy; every other
          # coordination mode streams sequentially.
          if team_config.coordination_mode == "debate":
              member_results = yield from self._stream_members_debate(
                  team_run=team_run, team_config=team_config, members=stream_members,
                  payload=payload)
          else:
              member_results = yield from self._stream_members_sequential(
                  team_run=team_run, team_config=team_config, members=stream_members,
                  payload=payload)
      except AgentServiceClientError as exc:
          yield failed_event("agent_service_error", str(exc))
          return

      failed_results = [item for item in member_results if item.run.status != "completed"]
      output_text = self._build_team_output_text(
          team_config=team_config,
          member_results=member_results)
      output_json: dict[str, JSONValue] = {
          "dry_run": payload.dry_run,
          "coordination_mode": team_config.coordination_mode,
          "team_config_id": team_config.id,
          "member_run_count": len(member_results),
          "member_results": [
              self._member_result_to_json(item) for item in member_results
          ],
          "streamed": True,
          "response_mode": self._read_response_mode(team_config),
      }

      failure_mode = self._read_failure_mode(team_config)
      if failed_results and failure_mode != "continue_with_warning":
          yield failed_event(
              "member_run_failed",
              f"{len(failed_results)} member run(s) failed",
              output_text=output_text,
              output_json=output_json)
          return

      completed_run = self.team_run_repository.update_status(
          team_run_id=team_run.id,
          status="completed",
          worker_key=payload.worker_key,
          output_text=output_text,
          output_json=output_json)
      yield {"event": "team.run.completed", "run": self._team_run_to_json(completed_run)}
  def execute_next_claimed_team_run(
      self,
      *,
      worker_key: str,
      lease_seconds: int,
      stale_running_seconds: int,
      dry_run: bool,
      redis_client: object | None = None,
  ) -> tuple[TeamRun, int] | None:
      """Claim the next queued team run and execute it.

      Expired leases are released first, then one queued run is claimed with
      a lease of ``lease_seconds``. When ``redis_client`` is given, execution
      is additionally guarded by a ``DistributedLock`` and an
      ``IdempotencyStore`` entry keyed by the run id.

      The lock is now released on every exit path after it is acquired,
      including when constructing or starting the idempotency entry raises;
      previously those failures left the lock held.

      Returns:
          ``(executed_run, released_lease_count)``, or ``None`` when nothing
          was claimed, the lock or idempotency entry could not be obtained, or
          ``execute_team_run`` returned ``None``.
      """
      # NOTE(review): datetime.utcnow() yields naive timestamps and is
      # deprecated since Python 3.12; kept because the repository's expected
      # timestamp type is not visible here.
      released_lease_count = self.team_run_repository.release_expired_leases(
          now_time=datetime.utcnow(),
          stale_running_seconds=stale_running_seconds)
      claimed_team_run = self.team_run_repository.claim_next_queued(
          worker_key=worker_key,
          lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds))
      if claimed_team_run is None:
          return None

      lock = None
      idempotency_store = None
      if redis_client is not None:
          from core_shared.redis_primitives import DistributedLock, IdempotencyStore
          lock = DistributedLock(
              client=redis_client,
              name=f"team-run:{claimed_team_run.id}:lock",
              ttl_seconds=lease_seconds)
          if not lock.acquire():
              return None

      try:
          if redis_client is not None:
              idempotency_store = IdempotencyStore(
                  client=redis_client,
                  prefix="team-run-idempotency")
              if not idempotency_store.begin(key=claimed_team_run.id):
                  # The finally clause below releases the lock.
                  return None
          result = self.execute_team_run(
              team_run_id=claimed_team_run.id,
              payload=TeamRunExecuteRequest(
                  worker_key=worker_key,
                  dry_run=dry_run))
          if idempotency_store is not None and result is not None:
              idempotency_store.complete(
                  key=claimed_team_run.id,
                  result={"status": result.status, "team_run_id": result.id})
      finally:
          if lock is not None:
              lock.release()

      if result is None:
          return None
      return result, released_lease_count
  def _resolve_team_config(
      self,
      *,
      team_id: str,
      team_config_id: str | None,
  ) -> TeamConfig | None:
      """Return the config named by ``team_config_id``, else the team's latest config."""
      repository = self.team_config_repository
      if team_config_id is None:
          return repository.get_latest_by_team(team_id=team_id)
      return repository.get_by_id(team_config_id=team_config_id)
  def _execute_members(
      self,
      *,
      team_run: TeamRun,
      team_config: TeamConfig,
      members: list[TeamMemberContract],
      worker_key: str | None,
      dry_run: bool,
  ) -> list[TeamMemberRunResult]:
      """Run all members with the strategy selected by the team config.

      Strategy selection, in priority order: parallel (coordination mode
      ``"parallel"`` or policy ``handoff == "parallel_merge"``), pipeline,
      debate, and supervisor as the fallback for any other mode.

      Raises:
          AgentServiceClientError: if no agent client is configured.
      """
      if self.agent_client is None:
          raise AgentServiceClientError("agent service client is not configured")

      ordered_members = self._order_members(members)
      handoff = team_config.policy_json.get("handoff")
      mode = team_config.coordination_mode

      if mode == "parallel" or handoff == "parallel_merge":
          strategy = self._execute_members_parallel
      elif mode == "pipeline":
          strategy = self._execute_members_pipeline
      elif mode == "debate":
          strategy = self._execute_members_debate
      else:
          strategy = self._execute_members_supervisor

      return strategy(
          team_run=team_run,
          team_config=team_config,
          members=ordered_members,
          worker_key=worker_key,
          dry_run=dry_run,
      )
  def _execute_members_supervisor(
      self,
      *,
      team_run: TeamRun,
      team_config: TeamConfig,
      members: list[TeamMemberContract],
      worker_key: str | None,
      dry_run: bool,
  ) -> list[TeamMemberRunResult]:
      """Run members in supervisor style: lead first, then the rest, then synthesis.

      The lead is the first member whose role is ``"supervisor"`` or
      ``"planner"``. Without a lead, all members run sequentially. The final
      synthesis pass re-runs the lead with every prior output; it happens only
      when policy ``supervisor_synthesis`` is truthy (default ``True``) and the
      lead's first run completed.
      """
      failure_mode = self._read_failure_mode(team_config)

      lead = None
      for candidate in members:
          if candidate.role in {"supervisor", "planner"}:
              lead = candidate
              break

      if lead is None:
          return self._execute_members_sequential(
              team_run=team_run, team_config=team_config, members=members,
              worker_key=worker_key, dry_run=dry_run, failure_mode=failure_mode)

      # Truthiness test kept exactly as in the original selection of "others".
      if lead:
          others = [member for member in members if member is not lead]
      else:
          others = members

      # Phase 1: the lead runs first, with no prior outputs.
      lead_result = self._execute_single_member(
          team_run=team_run, team_config=team_config, member=lead,
          member_input_json=self._build_member_input_json(
              team_run=team_run, team_config=team_config, member=lead, prior_outputs=[]),
          worker_key=worker_key, dry_run=dry_run)
      lead_completed = lead_result.run.status == "completed"
      if not lead_completed and failure_mode == "stop_on_critical":
          return [lead_result]

      # Phase 2: everyone else runs with the lead's output as starting context.
      lead_output = self._compact_prior_output(lead_result)
      other_results = self._execute_members_sequential(
          team_run=team_run, team_config=team_config, members=others,
          worker_key=worker_key, dry_run=dry_run, failure_mode=failure_mode,
          initial_prior_outputs=[lead_output])
      combined = [lead_result] + other_results

      # Phase 3: optional synthesis pass by the lead over all outputs.
      synthesis_enabled = team_config.policy_json.get("supervisor_synthesis", True)
      if not (synthesis_enabled and lead_completed):
          return combined

      all_outputs = [lead_output]
      all_outputs += [self._compact_prior_output(item) for item in other_results]
      synthesis_result = self._execute_single_member(
          team_run=team_run, team_config=team_config, member=lead,
          member_input_json=self._build_member_input_json(
              team_run=team_run, team_config=team_config, member=lead,
              prior_outputs=all_outputs),
          worker_key=worker_key, dry_run=dry_run)
      return combined + [synthesis_result]
  def _execute_members_pipeline(
      self,
      *,
      team_run: TeamRun,
      team_config: TeamConfig,
      members: list[TeamMemberContract],
      worker_key: str | None,
      dry_run: bool,
  ) -> list[TeamMemberRunResult]:
      """Run members as a pipeline, feeding each only the last completed output.

      Failure handling follows ``_read_failure_mode``:
        * ``"stop_on_critical"``: stop at the first member that does not complete;
        * ``"retry_once"``: re-run a failed member once with the same input and
          keep the retry's result in place of the first attempt;
        * any other mode: record the failure and continue.

      A member that never completes does not replace the carried output, so
      the next member still receives the last completed one.
      """
      failure_mode = self._read_failure_mode(team_config)
      member_results: list[TeamMemberRunResult] = []
      prev_output: dict[str, JSONValue] | None = None

      for member in members:
          prior = [] if prev_output is None else [prev_output]
          member_input_json = self._build_member_input_json(
              team_run=team_run, team_config=team_config, member=member, prior_outputs=prior)
          result = self._execute_single_member(
              team_run=team_run, team_config=team_config, member=member,
              member_input_json=member_input_json, worker_key=worker_key, dry_run=dry_run)

          if result.run.status != "completed" and failure_mode == "retry_once":
              # One retry with identical input; its result replaces the first attempt.
              result = self._execute_single_member(
                  team_run=team_run, team_config=team_config, member=member,
                  member_input_json=member_input_json, worker_key=worker_key, dry_run=dry_run)

          member_results.append(result)
          if result.run.status == "completed":
              prev_output = self._compact_prior_output(result)
          elif failure_mode == "stop_on_critical":
              break
      return member_results
  def _execute_members_debate(
      self,
      *,
      team_run: TeamRun,
      team_config: TeamConfig,
      members: list[TeamMemberContract],
      worker_key: str | None,
      dry_run: bool,
  ) -> list[TeamMemberRunResult]:
      """Run members for up to ``_read_max_rounds`` debate rounds.

      Every member sees the accumulated history of all earlier outputs. Only
      the results of the last executed round are returned. With failure mode
      ``"stop_on_critical"`` a failed member ends both the round and the
      debate; with ``"retry_once"`` a failed member is re-run once and the
      retry replaces its entry in the round results and the history.
      """
      max_rounds = self._read_max_rounds(team_config)
      failure_mode = self._read_failure_mode(team_config)
      stop_on_failure = failure_mode == "stop_on_critical"
      retry_on_failure = failure_mode == "retry_once"

      debate_history: list[dict[str, JSONValue]] = []
      final_results: list[TeamMemberRunResult] = []

      for _ in range(max_rounds):
          round_results: list[TeamMemberRunResult] = []
          for member in members:
              member_input_json = self._build_member_input_json(
                  team_run=team_run, team_config=team_config, member=member,
                  prior_outputs=debate_history)
              outcome = self._execute_single_member(
                  team_run=team_run, team_config=team_config, member=member,
                  member_input_json=member_input_json, worker_key=worker_key, dry_run=dry_run)
              round_results.append(outcome)
              debate_history.append(self._compact_prior_output(outcome))

              if outcome.run.status == "completed":
                  continue
              if stop_on_failure:
                  break
              if retry_on_failure:
                  retried = self._execute_single_member(
                      team_run=team_run, team_config=team_config, member=member,
                      member_input_json=member_input_json, worker_key=worker_key, dry_run=dry_run)
                  round_results[-1] = retried
                  debate_history[-1] = self._compact_prior_output(retried)

          final_results = round_results
          round_had_failure = any(item.run.status != "completed" for item in round_results)
          if stop_on_failure and round_had_failure:
              break
      return final_results
  def _execute_members_sequential(
      self,
      *,
      team_run: TeamRun,
      team_config: TeamConfig,
      members: list[TeamMemberContract],
      worker_key: str | None,
      dry_run: bool,
      failure_mode: str = "stop_on_critical",
      initial_prior_outputs: list[dict[str, JSONValue]] | None = None,
  ) -> list[TeamMemberRunResult]:
      """Run members one after another, each seeing all earlier outputs.

      ``initial_prior_outputs`` seeds the context for the first member; the
      caller's list is copied, not mutated. Each member's compacted output,
      completed or not, is appended to the context for later members.

      Failure handling:
        * ``"stop_on_critical"``: stop after the first member that does not complete;
        * ``"retry_once"``: re-run the failed member once with the same input;
          the retry replaces both its result and its context entry;
        * any other mode: keep the failure and continue.
      """
      member_results: list[TeamMemberRunResult] = []
      prior_outputs = list(initial_prior_outputs or [])

      for member in members:
          member_input_json = self._build_member_input_json(
              team_run=team_run, team_config=team_config, member=member, prior_outputs=prior_outputs)
          result = self._execute_single_member(
              team_run=team_run, team_config=team_config, member=member,
              member_input_json=member_input_json, worker_key=worker_key, dry_run=dry_run)
          member_results.append(result)
          prior_outputs.append(self._compact_prior_output(result))

          if result.run.status == "completed":
              continue
          if failure_mode == "stop_on_critical":
              break
          if failure_mode == "retry_once":
              retry = self._execute_single_member(
                  team_run=team_run, team_config=team_config, member=member,
                  member_input_json=member_input_json, worker_key=worker_key, dry_run=dry_run)
              member_results[-1] = retry
              prior_outputs[-1] = self._compact_prior_output(retry)
      return member_results
  def _execute_members_parallel(
      self,
      *,
      team_run: TeamRun,
      team_config: TeamConfig,
      members: list[TeamMemberContract],
      worker_key: str | None,
      dry_run: bool,
  ) -> list[TeamMemberRunResult]:
      """Run all members concurrently and return results in member order.

      Each member gets an input built with no prior outputs and runs on a
      thread pool of at most 8 workers. Results are collected by position
      rather than by ``member_key``, so two members sharing a key no longer
      overwrite each other's result.

      An exception raised by a member run propagates from here after the pool
      has shut down; when several runs fail, the one earliest in ``members``
      is raised.
      """
      pool_size = max(1, min(len(members), 8))
      with ThreadPoolExecutor(max_workers=pool_size) as executor:
          futures = [
              executor.submit(
                  self._execute_single_member,
                  team_run=team_run,
                  team_config=team_config,
                  member=member,
                  member_input_json=self._build_member_input_json(
                      team_run=team_run,
                      team_config=team_config,
                      member=member,
                      prior_outputs=[]),
                  worker_key=worker_key,
                  dry_run=dry_run)
              for member in members
          ]
          # Futures are in submission order, so the results line up with members.
          return [future.result() for future in futures]
  def _execute_single_member(
          self,
          *,
          team_run: TeamRun,
          team_config: TeamConfig,
          member: TeamMemberContract,
          member_input_json: dict[str, JSONValue],
          worker_key: str | None,
          dry_run: bool,
  ) -> TeamMemberRunResult:
      """Create an agent run for ``member`` and execute it via the agent client.

      The run is created in the team run's session with the member's prompt
      text and ``member_input_json``, then executed with ``worker_key`` and
      ``dry_run``.

      Raises:
          AgentServiceClientError: no agent client is configured.
      """
      client = self.agent_client
      if client is None:
          raise AgentServiceClientError("agent service client is not configured")
      pending_run = client.create_agent_run(
          agent_id=member.agent_id,
          agent_config_id=member.agent_config_id,
          session_id=team_run.session_id,
          input_text=self._build_member_input_text(
              team_run=team_run,
              team_config=team_config,
              member=member,
          ),
          input_json=member_input_json,
      )
      finished_run = client.execute_agent_run(
          agent_run_id=pending_run.id,
          worker_key=worker_key,
          dry_run=dry_run,
      )
      return TeamMemberRunResult(member=member, run=finished_run)
  def _stream_single_member(
          self,
          *,
          team_run: TeamRun,
          team_config: TeamConfig,
          member: TeamMemberContract,
          prior_outputs: list[dict[str, JSONValue]],
          payload: TeamRunExecuteRequest,
  ) -> Iterator[tuple[dict[str, JSONValue], TeamMemberRunResult | None]]:
      """Stream one member's agent run as ``(event, result)`` pairs.

      Yields, in order:

      * one ``"team.member.started"`` event after the agent run is created;
      * one ``"team.member.delta"`` event per ``"agent.run.delta"`` stream
        event whose ``delta`` is a string;
      * one ``"team.member.completed"`` event carrying the final agent run.

      ``result`` is ``None`` in every pair except the last, where it is the
      ``TeamMemberRunResult`` for this member.

      An ``AgentServiceClientError`` raised while consuming the execution
      stream is not propagated: the member is reported with a copy of the
      created run marked ``"failed"``. An error raised while creating the run
      is not caught here.

      Raises:
          AgentServiceClientError: no agent client is configured. Because
              this is a generator, it is raised on the first iteration.
      """
      # Same guard as the non-streaming path, instead of an AttributeError on None.
      if self.agent_client is None:
          raise AgentServiceClientError("agent service client is not configured")
      member_input_json = self._build_member_input_json(
          team_run=team_run,
          team_config=team_config,
          member=member,
          prior_outputs=prior_outputs,
      )
      created_run = self.agent_client.create_agent_run(
          agent_id=member.agent_id,
          agent_config_id=member.agent_config_id,
          session_id=team_run.session_id,
          input_text=self._build_member_input_text(
              team_run=team_run,
              team_config=team_config,
              member=member,
          ),
          input_json=member_input_json,
      )
      yield {
          "event": "team.member.started",
          "member": self._member_to_json(member),
          "agent_run": created_run.model_dump(mode="json"),
      }, None
      # Stays the created run unless the stream delivers a final run payload.
      final_agent_run = created_run
      try:
          for event_name, data in self.agent_client.execute_agent_run_stream(
                  agent_run_id=created_run.id,
                  worker_key=payload.worker_key,
                  dry_run=payload.dry_run):
              if event_name == "agent.run.delta":
                  delta = data.get("delta")
                  if isinstance(delta, str):
                      yield {
                          "event": "team.member.delta",
                          "member": self._member_to_json(member),
                          "agent_run_id": created_run.id,
                          "delta": delta,
                      }, None
              elif event_name in {"agent.run.completed", "agent.run.failed"}:
                  run_payload = data.get("run")
                  if isinstance(run_payload, dict):
                      final_agent_run = AgentRunContract.model_validate(run_payload)
      except AgentServiceClientError as exc:
          final_agent_run = created_run.model_copy(update={
              "status": "failed",
              "error_code": "agent_service_error",
              "error_message": str(exc),
          })
      result = TeamMemberRunResult(member=member, run=final_agent_run)
      yield {
          "event": "team.member.completed",
          "member": self._member_to_json(member),
          "agent_run": final_agent_run.model_dump(mode="json"),
      }, result
  753. def _stream_members_sequential(
  754. self,
  755. *,
  756. team_run: TeamRun,
  757. team_config: TeamConfig,
  758. members: list[TeamMemberContract],
  759. payload: TeamRunExecuteRequest) -> Iterator[list[TeamMemberRunResult]]:
  760. member_results: list[TeamMemberRunResult] = []
  761. prior_outputs: list[dict[str, JSONValue]] = []
  762. for member in members:
  763. for event, result in self._stream_single_member(
  764. team_run=team_run, team_config=team_config, member=member,
  765. prior_outputs=prior_outputs, payload=payload):
  766. if result is not None:
  767. member_results.append(result)
  768. prior_outputs.append(self._compact_prior_output(result))
  769. else:
  770. yield event
  771. return member_results
  772. def _stream_members_debate(
  773. self,
  774. *,
  775. team_run: TeamRun,
  776. team_config: TeamConfig,
  777. members: list[TeamMemberContract],
  778. payload: TeamRunExecuteRequest) -> Iterator[list[TeamMemberRunResult]]:
  779. max_rounds = self._read_max_rounds(team_config)
  780. debate_history: list[dict[str, JSONValue]] = []
  781. final_results: list[TeamMemberRunResult] = []
  782. for round_num in range(1, max_rounds + 1):
  783. yield {
  784. "event": "team.debate.round_started",
  785. "round": round_num,
  786. "max_rounds": max_rounds,
  787. }
  788. round_results: list[TeamMemberRunResult] = []
  789. for member in members:
  790. for event, result in self._stream_single_member(
  791. team_run=team_run, team_config=team_config, member=member,
  792. prior_outputs=debate_history, payload=payload):
  793. if result is not None:
  794. round_results.append(result)
  795. debate_history.append(self._compact_prior_output(result))
  796. else:
  797. yield event
  798. final_results = round_results
  799. yield {
  800. "event": "team.debate.round_completed",
  801. "round": round_num,
  802. "max_rounds": max_rounds,
  803. "member_count": len(round_results),
  804. }
  805. return final_results
  def _read_team_members(self, team_config: TeamConfig) -> list[TeamMemberContract]:
      """Parse ``team_config.member_refs_json`` into member contracts.

      Entries for which ``TeamMemberContract.model_validate`` raises
      ``ValueError`` are skipped; the rest keep their original order.
      """
      parsed: list[TeamMemberContract] = []
      for raw_ref in team_config.member_refs_json:
          try:
              contract = TeamMemberContract.model_validate(raw_ref)
          except ValueError:
              # Invalid entry: leave it out instead of failing the whole read.
              continue
          parsed.append(contract)
      return parsed
  814. def _order_members(self, members: list[TeamMemberContract]) -> list[TeamMemberContract]:
  815. role_priority = {
  816. "planner": 0,
  817. "supervisor": 1,
  818. "specialist": 2,
  819. "executor": 3,
  820. "reviewer": 4,
  821. }
  822. return sorted(members, key=lambda item: role_priority.get(item.role, 10))
  823. def _select_stream_members(
  824. self,
  825. *,
  826. team_config: TeamConfig,
  827. members: list[TeamMemberContract]) -> list[TeamMemberContract]:
  828. ordered_members = self._order_members(members)
  829. if self._read_response_mode(team_config) == "all_members":
  830. return ordered_members
  831. for member in ordered_members:
  832. if member.role in {"supervisor", "planner"}:
  833. return [member]
  834. return ordered_members[:1]
  835. def _read_response_mode(self, team_config: TeamConfig) -> str:
  836. value = team_config.policy_json.get("response_mode")
  837. if isinstance(value, str) and value in {"single_responder", "all_members"}:
  838. return value
  839. return "single_responder"
  840. def _read_max_rounds(self, team_config: TeamConfig) -> int:
  841. value = team_config.policy_json.get("max_rounds")
  842. if isinstance(value, (int, float)):
  843. return max(1, min(int(value), 20))
  844. return 3
  845. def _read_failure_mode(self, team_config: TeamConfig) -> str:
  846. value = team_config.policy_json.get("failure_mode")
  847. if isinstance(value, str) and value in {"stop_on_critical", "continue_with_warning", "retry_once"}:
  848. return value
  849. return "stop_on_critical"
  850. def _build_member_input_text(
  851. self,
  852. *,
  853. team_run: TeamRun,
  854. team_config: TeamConfig,
  855. member: TeamMemberContract) -> str:
  856. lines = [
  857. f"Team objective: {team_config.objective or 'No objective provided.'}",
  858. f"Member role: {member.role}",
  859. ]
  860. if member.responsibility:
  861. lines.append(f"Responsibility: {member.responsibility}")
  862. if team_run.input_text:
  863. lines.append(f"User task: {team_run.input_text}")
  864. return "\n".join(lines)
  865. def _build_member_input_json(
  866. self,
  867. *,
  868. team_run: TeamRun,
  869. team_config: TeamConfig,
  870. member: TeamMemberContract,
  871. prior_outputs: list[dict[str, JSONValue]]) -> dict[str, JSONValue]:
  872. input_json: dict[str, JSONValue] = dict(team_run.input_json or {})
  873. input_json.update(
  874. {
  875. "team_id": team_run.team_id,
  876. "team_run_id": team_run.id,
  877. "team_config_id": team_config.id,
  878. "team_objective": team_config.objective,
  879. "member_key": member.member_key,
  880. "member_role": member.role,
  881. "member_responsibility": member.responsibility,
  882. "prior_member_outputs": prior_outputs,
  883. }
  884. )
  885. configured_input = member.config_json.get("input_json")
  886. if isinstance(configured_input, dict):
  887. input_json.update(
  888. {str(item_key): item_value for item_key, item_value in configured_input.items()}
  889. )
  890. return input_json
  891. def _build_team_output_text(
  892. self,
  893. *,
  894. team_config: TeamConfig,
  895. member_results: list[TeamMemberRunResult]) -> str:
  896. lines = [
  897. f"Team objective: {team_config.objective or 'No objective provided.'}",
  898. f"Coordination mode: {team_config.coordination_mode}",
  899. "Team conversation:",
  900. ]
  901. for index, item in enumerate(member_results, start=1):
  902. output_text = item.run.output_text or item.run.error_message or ""
  903. speaker = item.member.name or item.member.member_key
  904. lines.append(
  905. f"{index}. {speaker} ({item.member.role}) "
  906. f"status={item.run.status}: {output_text}")
  907. return "\n".join(lines)
  908. def _member_result_to_json(self, result: TeamMemberRunResult) -> dict[str, JSONValue]:
  909. return {
  910. "member_key": result.member.member_key,
  911. "member_role": result.member.role,
  912. "member_name": result.member.name,
  913. "member_responsibility": result.member.responsibility,
  914. "agent_run_id": result.run.id,
  915. "agent_id": result.run.agent_id,
  916. "agent_config_id": result.run.agent_config_id,
  917. "status": result.run.status,
  918. "output_text": result.run.output_text,
  919. "output_json": self._compact_agent_output_json(result.run.output_json or {}),
  920. "error_code": result.run.error_code,
  921. "error_message": result.run.error_message,
  922. }
  923. def _member_to_json(self, member: TeamMemberContract) -> dict[str, JSONValue]:
  924. return member.model_dump(mode="json")
  925. def _team_run_to_json(self, team_run: TeamRun | None) -> dict[str, JSONValue]:
  926. if team_run is None:
  927. return {}
  928. return {
  929. "id": team_run.id,
  930. "team_id": team_run.team_id,
  931. "team_config_id": team_run.team_config_id,
  932. "session_id": team_run.session_id,
  933. "input_text": team_run.input_text,
  934. "input_json": team_run.input_json,
  935. "output_text": team_run.output_text,
  936. "output_json": team_run.output_json,
  937. "status": team_run.status,
  938. "worker_key": team_run.worker_key,
  939. "queued_time": team_run.queued_time,
  940. "lease_expire_time": team_run.lease_expire_time,
  941. "started_time": team_run.started_time,
  942. "finished_time": team_run.finished_time,
  943. "error_code": team_run.error_code,
  944. "error_message": team_run.error_message,
  945. "created_time": team_run.created_time,
  946. }
  947. def _compact_prior_output(self, result: TeamMemberRunResult) -> dict[str, JSONValue]:
  948. return {
  949. "member_key": result.member.member_key,
  950. "member_role": result.member.role,
  951. "member_name": result.member.name,
  952. "agent_id": result.run.agent_id,
  953. "agent_run_id": result.run.id,
  954. "status": result.run.status,
  955. "output_text": self._truncate_text(result.run.output_text),
  956. "error_code": result.run.error_code,
  957. "error_message": result.run.error_message,
  958. "output_json": self._compact_agent_output_json(result.run.output_json or {}),
  959. }
  960. def _compact_agent_output_json(
  961. self,
  962. output_json: dict[str, JSONValue]) -> dict[str, JSONValue]:
  963. keep_keys = {
  964. "dry_run",
  965. "model",
  966. "finish_reason",
  967. "usage_json",
  968. "tool_invocations",
  969. "skill_invocations",
  970. "selected_tool_refs",
  971. "selected_skill_refs",
  972. "memory_read_enabled",
  973. "memory_read_count",
  974. "memory_read_reason",
  975. }
  976. compacted = {
  977. key: value for key, value in output_json.items()
  978. if key in keep_keys
  979. }
  980. if "raw_response_json" in output_json or "messages" in output_json:
  981. compacted["debug_payload_omitted"] = True
  982. return compacted
  def _truncate_text(self, value: str | None) -> str | None:
      """Cap ``value`` at ``MAX_PRIOR_OUTPUT_TEXT_CHARS`` characters.

      ``None`` and text within the limit are returned unchanged; longer text
      is cut at the limit and suffixed with ``"... [truncated]"``.
      """
      if value is None:
          return None
      if len(value) <= MAX_PRIOR_OUTPUT_TEXT_CHARS:
          return value
      kept = value[:MAX_PRIOR_OUTPUT_TEXT_CHARS]
      return f"{kept}... [truncated]"
  def _publish_event(
          self,
          *,
          event_type: str,
          team_run: TeamRun,
          payload_json: dict[str, JSONValue],
  ) -> None:
      """Publish a ``team_run`` event through the event client, if one is set.

      The published payload is ``payload_json`` plus the run's ``team_id`` and
      ``team_config_id``. Does nothing when no event client is configured, and
      an ``EventServiceClientError`` from publishing is swallowed.
      """
      publisher = self.event_client
      if publisher is None:
          return
      try:
          envelope = EventPublishContract(
              event_type=event_type,
              source_service="team-service",
              aggregate_type="team_run",
              aggregate_id=team_run.id,
              correlation_id=team_run.session_id,
              payload_json={
                  **payload_json,
                  "team_id": team_run.team_id,
                  "team_config_id": team_run.team_config_id,
              },
          )
          publisher.publish_event(envelope)
      except EventServiceClientError:
          # Event service errors are deliberately ignored here.
          return
  1011. def _build_team_code(self, name: str) -> str:
  1012. base = "".join(
  1013. char.lower() if char.isalnum() else "_"
  1014. for char in name
  1015. ).strip("_") or "team"
  1016. return base[:64]
  def _normalize_member_refs(self, member_refs: list[dict[str, JSONValue]]) -> list[TeamMemberContract]:
      """Validate raw member references into contracts, filling in defaults.

      For each entry the snake_case key is preferred and the camelCase key is
      the fallback. A missing member key becomes ``member_<position>``
      (1-based), role ``"worker"`` is mapped to ``"specialist"``, a missing
      role defaults to ``"specialist"`` and a missing config to ``{}``. All
      other keys of the entry are passed through to validation unchanged.
      """
      normalized: list[TeamMemberContract] = []
      for position, raw in enumerate(member_refs, start=1):

          def pick(snake_key: str, camel_key: str) -> JSONValue:
              return raw.get(snake_key) or raw.get(camel_key)

          role = raw.get("role")
          if role == "worker":
              role = "specialist"
          overrides = {
              "member_key": pick("member_key", "memberKey") or f"member_{position}",
              "agent_id": pick("agent_id", "agentId"),
              "agent_config_id": pick("agent_config_id", "agentConfigId"),
              "role": role or "specialist",
              "config_json": pick("config_json", "configJson") or {},
          }
          normalized.append(TeamMemberContract.model_validate({**raw, **overrides}))
      return normalized
  def build_team_application_service(
          *,
          team_repository: TeamDefinitionRepository,
          team_config_repository: TeamConfigRepository,
          team_run_repository: TeamRunRepository,
          settings: TeamServiceSettings,
  ) -> TeamApplicationService:
      """Assemble a ``TeamApplicationService`` from repositories and settings.

      Agent and event clients are built from the service URLs and timeouts in
      ``settings``. A task queue publisher is attached only when
      ``try_build_redis_client`` returns a client for ``settings.redis_url``;
      otherwise the service gets ``None`` for it.
      """
      redis_client = try_build_redis_client(settings.redis_url)
      agent_client = AgentServiceClient(
          base_url=settings.agent_service_url,
          timeout_seconds=settings.agent_service_timeout_seconds,
      )
      event_client = EventServiceClient(
          base_url=settings.event_service_url,
          timeout_seconds=settings.event_service_timeout_seconds,
      )
      if redis_client is None:
          task_queue_publisher = None
      else:
          task_queue_publisher = TaskQueuePublisher(client=redis_client)
      return TeamApplicationService(
          team_repository=team_repository,
          team_config_repository=team_config_repository,
          team_run_repository=team_run_repository,
          agent_client=agent_client,
          event_client=event_client,
          task_queue_publisher=task_queue_publisher,
      )