services.py 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929
  1. from dataclasses import dataclass
  2. from collections.abc import Iterator
  3. from datetime import datetime, timedelta
  4. from concurrent.futures import ThreadPoolExecutor, as_completed
  5. from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
  6. from core_domain import AgentRunContract, TeamMemberContract
  7. from core_shared import JSONValue, try_build_redis_client
  8. from core_shared.task_queue import TaskQueuePublisher
  9. from app.bootstrap.settings import TeamServiceSettings
  10. from app.db.models import TeamDefinition, TeamRun, TeamConfig
  11. from app.domain.repositories import (
  12. TeamDefinitionRepository,
  13. TeamRunRepository,
  14. TeamConfigRepository)
  15. from app.infrastructure.agent_client import AgentServiceClient, AgentServiceClientError
  16. from app.schemas.team import (
  17. TeamConfigCreateRequestDto,
  18. TeamConfigUpdateRequestDto,
  19. TeamCreateRequest,
  20. TeamCreateRequestDto,
  21. TeamDeleteRequestDto,
  22. TeamRunCreateRequest,
  23. TeamRunCreateRequestDto,
  24. TeamRunExecuteRequest,
  25. TeamRunStatusUpdateRequestDto,
  26. TeamRunStatusUpdateRequest,
  27. TeamStatusUpdateRequest,
  28. TeamUpdateRequestDto,
  29. TeamConfigCreateRequest)
  30. MAX_PRIOR_OUTPUT_TEXT_CHARS = 1600
  31. @dataclass(frozen=True)
  32. class TeamMemberRunResult:
  33. member: TeamMemberContract
  34. run: AgentRunContract
  35. class TeamApplicationService:
  36. def __init__(
  37. self,
  38. *,
  39. team_repository: TeamDefinitionRepository,
  40. team_config_repository: TeamConfigRepository,
  41. team_run_repository: TeamRunRepository,
  42. agent_client: AgentServiceClient | None = None,
  43. event_client: EventServiceClient | None = None,
  44. task_queue_publisher: TaskQueuePublisher | None = None) -> None:
  45. self.team_repository = team_repository
  46. self.team_config_repository = team_config_repository
  47. self.team_run_repository = team_run_repository
  48. self.agent_client = agent_client
  49. self.event_client = event_client
  50. self.task_queue_publisher = task_queue_publisher
  51. def create_team(self, payload: TeamCreateRequest) -> TeamDefinition:
  52. return self.team_repository.create(
  53. code=payload.code,
  54. name=payload.name,
  55. description=payload.description,
  56. team_type=payload.team_type,
  57. owner_user_id=payload.owner_user_id,
  58. metadata_json=payload.metadata_json)
  59. def create_team_from_contract(self, payload: TeamCreateRequestDto) -> TeamDefinition:
  60. return self.create_team(
  61. TeamCreateRequest(
  62. code=self._build_team_code(payload.name),
  63. name=payload.name,
  64. description=payload.description,
  65. team_type=payload.teamType,
  66. owner_user_id=payload.ownerUserId,
  67. metadata_json=payload.metadata))
  68. def list_teams(self) -> list[TeamDefinition]:
  69. return self.team_repository.list_all()
  70. def get_team(self, *, team_id: str) -> TeamDefinition | None:
  71. return self.team_repository.get_by_id(team_id=team_id)
  72. def update_team_from_contract(self, payload: TeamUpdateRequestDto) -> TeamDefinition | None:
  73. entity = self.team_repository.get_by_id(team_id=payload.teamId)
  74. if entity is None:
  75. return None
  76. if payload.name is not None:
  77. entity.name = payload.name
  78. entity.code = self._build_team_code(payload.name)
  79. if payload.description is not None:
  80. entity.description = payload.description
  81. if payload.teamType is not None:
  82. entity.team_type = payload.teamType
  83. if payload.status is not None:
  84. entity.status = payload.status
  85. if payload.ownerUserId is not None:
  86. entity.owner_user_id = payload.ownerUserId
  87. if payload.metadata is not None:
  88. entity.metadata_json = payload.metadata
  89. return self.team_repository.save(entity)
  90. def delete_team_from_contract(self, payload: TeamDeleteRequestDto) -> bool:
  91. entity = self.team_repository.get_by_id(team_id=payload.teamId)
  92. if entity is None:
  93. return False
  94. self.team_repository.delete(entity)
  95. return True
  96. def update_team_status(
  97. self,
  98. *,
  99. team_id: str,
  100. payload: TeamStatusUpdateRequest) -> TeamDefinition | None:
  101. return self.team_repository.update_status(
  102. team_id=team_id,
  103. status=payload.status)
  104. def create_team_config(self, payload: TeamConfigCreateRequest) -> TeamConfig:
  105. team = self.team_repository.get_by_id(team_id=payload.team_id)
  106. if team is None:
  107. raise ValueError(f"team not found: {payload.team_id}")
  108. if not payload.member_refs:
  109. raise ValueError("team config requires at least one member")
  110. return self.team_config_repository.create(
  111. team_id=payload.team_id,
  112. coordination_mode=payload.coordination_mode,
  113. objective=payload.objective,
  114. member_refs_json=[item.model_dump(mode="json") for item in payload.member_refs],
  115. policy_json=payload.policy_json)
  116. def create_team_config_from_contract(self, payload: TeamConfigCreateRequestDto) -> TeamConfig:
  117. return self.create_team_config(
  118. TeamConfigCreateRequest(
  119. team_id=payload.teamId,
  120. coordination_mode=payload.coordinationMode,
  121. objective=payload.objective,
  122. member_refs=self._normalize_member_refs(payload.memberRefs),
  123. policy_json=payload.policy))
  124. def list_team_configs(self, *, team_id: str) -> list[TeamConfig]:
  125. return self.team_config_repository.list_by_team(team_id=team_id)
  126. def list_team_configs(self, *, team_id: str | None = None) -> list[TeamConfig]:
  127. if team_id is not None:
  128. return self.team_config_repository.list_by_team(team_id=team_id)
  129. return self.team_config_repository.list_all()
  130. def get_team_config(self, *, config_id: str) -> TeamConfig | None:
  131. return self.team_config_repository.get_by_id(team_config_id=config_id)
  132. def update_team_config_from_contract(self, payload: TeamConfigUpdateRequestDto) -> TeamConfig | None:
  133. entity = self.team_config_repository.get_by_id(team_config_id=payload.configId)
  134. if entity is None:
  135. return None
  136. if payload.coordinationMode is not None:
  137. entity.coordination_mode = payload.coordinationMode
  138. if payload.objective is not None:
  139. entity.objective = payload.objective
  140. if payload.memberRefs is not None:
  141. normalized = self._normalize_member_refs(payload.memberRefs)
  142. if not normalized:
  143. raise ValueError("team config requires at least one member")
  144. entity.member_refs_json = [item.model_dump(mode="json") for item in normalized]
  145. if payload.policy is not None:
  146. entity.policy_json = payload.policy
  147. return self.team_config_repository.save(entity)
  148. def delete_team_config(self, *, config_id: str) -> bool:
  149. entity = self.team_config_repository.get_by_id(team_config_id=config_id)
  150. if entity is None:
  151. return False
  152. self.team_config_repository.delete(entity)
  153. return True
  154. def create_team_run(self, payload: TeamRunCreateRequest) -> TeamRun:
  155. team_config = self._resolve_team_config(
  156. team_id=payload.team_id,
  157. team_config_id=payload.team_config_id)
  158. if team_config is None:
  159. raise ValueError("team config not found")
  160. team_run = self.team_run_repository.create(
  161. team_id=payload.team_id,
  162. team_config_id=team_config.id,
  163. session_id=payload.session_id,
  164. input_text=payload.input_text,
  165. input_json=payload.input_json)
  166. self._publish_event(
  167. event_type="team.run.created",
  168. team_run=team_run,
  169. payload_json={"team_run_id": team_run.id, "status": team_run.status})
  170. if payload.enqueue and self.task_queue_publisher is not None:
  171. self.task_queue_publisher.publish_team_run(
  172. team_run_id=team_run.id)
  173. return team_run
  174. def create_team_run_from_contract(self, payload: TeamRunCreateRequestDto) -> TeamRun:
  175. return self.create_team_run(
  176. TeamRunCreateRequest(
  177. team_id=payload.teamId,
  178. team_config_id=payload.teamConfigId,
  179. session_id=payload.sessionId,
  180. input_text=payload.inputText,
  181. input_json=payload.inputJson,
  182. enqueue=payload.enqueue))
  183. def list_team_runs(
  184. self,
  185. *,
  186. team_id: str | None = None,
  187. session_id: str | None = None) -> list[TeamRun]:
  188. return self.team_run_repository.list_by_scope(
  189. team_id=team_id,
  190. session_id=session_id)
  191. def get_team_run(self, *, team_run_id: str) -> TeamRun | None:
  192. return self.team_run_repository.get_by_id(team_run_id=team_run_id)
  193. def delete_team_run(self, *, team_run_id: str) -> bool:
  194. entity = self.team_run_repository.get_by_id(team_run_id=team_run_id)
  195. if entity is None:
  196. return False
  197. self.team_run_repository.delete(entity)
  198. return True
  199. def update_team_run_status(
  200. self,
  201. *,
  202. team_run_id: str,
  203. payload: TeamRunStatusUpdateRequest) -> TeamRun | None:
  204. entity = self.team_run_repository.get_by_id(
  205. team_run_id=team_run_id)
  206. if entity is None:
  207. return None
  208. return self.team_run_repository.update_status(
  209. team_run_id=team_run_id,
  210. status=payload.status,
  211. worker_key=payload.worker_key,
  212. output_text=payload.output_text,
  213. output_json=payload.output_json,
  214. error_code=payload.error_code,
  215. error_message=payload.error_message)
  216. def update_team_run_status_from_contract(
  217. self,
  218. payload: TeamRunStatusUpdateRequestDto) -> TeamRun | None:
  219. return self.update_team_run_status(
  220. team_run_id=payload.teamRunId,
  221. payload=TeamRunStatusUpdateRequest(
  222. status=payload.status,
  223. worker_key=payload.workerKey,
  224. output_text=payload.outputText,
  225. output_json=payload.outputJson,
  226. error_code=payload.errorCode,
  227. error_message=payload.errorMessage))
  228. def execute_team_run(
  229. self,
  230. *,
  231. team_run_id: str,
  232. payload: TeamRunExecuteRequest) -> TeamRun | None:
  233. team_run = self.team_run_repository.get_by_id(
  234. team_run_id=team_run_id)
  235. if team_run is None:
  236. return None
  237. team_config = self.team_config_repository.get_by_id(
  238. team_config_id=team_run.team_config_id)
  239. if team_config is None:
  240. failed_run = self.team_run_repository.update_status(
  241. team_run_id=team_run.id,
  242. status="failed",
  243. worker_key=payload.worker_key,
  244. error_code="team_config_missing",
  245. error_message=f"team config not found: {team_run.team_config_id}")
  246. if failed_run is not None:
  247. self._publish_event(
  248. event_type="team.run.failed",
  249. team_run=failed_run,
  250. payload_json={
  251. "team_run_id": failed_run.id,
  252. "status": failed_run.status,
  253. "error_code": "team_config_missing",
  254. })
  255. return failed_run
  256. running_run = self.team_run_repository.update_status(
  257. team_run_id=team_run.id,
  258. status="running",
  259. worker_key=payload.worker_key)
  260. if running_run is None:
  261. return None
  262. members = self._read_team_members(team_config)
  263. if not members:
  264. return self.team_run_repository.update_status(
  265. team_run_id=team_run.id,
  266. status="failed",
  267. worker_key=payload.worker_key,
  268. error_code="team_members_missing",
  269. error_message="team config has no valid members")
  270. try:
  271. member_results = self._execute_members(
  272. team_run=team_run,
  273. team_config=team_config,
  274. members=members,
  275. worker_key=payload.worker_key,
  276. dry_run=payload.dry_run)
  277. except AgentServiceClientError as exc:
  278. return self.team_run_repository.update_status(
  279. team_run_id=team_run.id,
  280. status="failed",
  281. worker_key=payload.worker_key,
  282. error_code="agent_service_error",
  283. error_message=str(exc))
  284. failed_results = [item for item in member_results if item.run.status != "completed"]
  285. output_text = self._build_team_output_text(
  286. team_config=team_config,
  287. member_results=member_results)
  288. output_json: dict[str, JSONValue] = {
  289. "dry_run": payload.dry_run,
  290. "coordination_mode": team_config.coordination_mode,
  291. "team_config_id": team_config.id,
  292. "member_run_count": len(member_results),
  293. "member_results": [
  294. self._member_result_to_json(item) for item in member_results
  295. ],
  296. }
  297. if failed_results:
  298. failed_run = self.team_run_repository.update_status(
  299. team_run_id=team_run.id,
  300. status="failed",
  301. worker_key=payload.worker_key,
  302. output_text=output_text,
  303. output_json=output_json,
  304. error_code="member_run_failed",
  305. error_message=f"{len(failed_results)} member run(s) failed")
  306. if failed_run is not None:
  307. self._publish_event(
  308. event_type="team.run.failed",
  309. team_run=failed_run,
  310. payload_json={
  311. "team_run_id": failed_run.id,
  312. "status": failed_run.status,
  313. "failed_member_count": len(failed_results),
  314. })
  315. return failed_run
  316. completed_run = self.team_run_repository.update_status(
  317. team_run_id=team_run.id,
  318. status="completed",
  319. worker_key=payload.worker_key,
  320. output_text=output_text,
  321. output_json=output_json)
  322. if completed_run is not None:
  323. self._publish_event(
  324. event_type="team.run.completed",
  325. team_run=completed_run,
  326. payload_json={
  327. "team_run_id": completed_run.id,
  328. "status": completed_run.status,
  329. "member_run_count": len(member_results),
  330. })
  331. return completed_run
  332. def execute_team_run_stream(
  333. self,
  334. *,
  335. team_run_id: str,
  336. payload: TeamRunExecuteRequest) -> Iterator[dict[str, JSONValue]]:
  337. team_run = self.team_run_repository.get_by_id(team_run_id=team_run_id)
  338. if team_run is None:
  339. return
  340. team_config = self.team_config_repository.get_by_id(
  341. team_config_id=team_run.team_config_id)
  342. if team_config is None:
  343. failed_run = self.team_run_repository.update_status(
  344. team_run_id=team_run.id,
  345. status="failed",
  346. worker_key=payload.worker_key,
  347. error_code="team_config_missing",
  348. error_message=f"team config not found: {team_run.team_config_id}")
  349. yield {"event": "team.run.failed", "run": self._team_run_to_json(failed_run)}
  350. return
  351. running_run = self.team_run_repository.update_status(
  352. team_run_id=team_run.id,
  353. status="running",
  354. worker_key=payload.worker_key)
  355. yield {"event": "team.run.started", "run": self._team_run_to_json(running_run)}
  356. members = self._read_team_members(team_config)
  357. if not members:
  358. failed_run = self.team_run_repository.update_status(
  359. team_run_id=team_run.id,
  360. status="failed",
  361. worker_key=payload.worker_key,
  362. error_code="team_members_missing",
  363. error_message="team config has no valid members")
  364. yield {"event": "team.run.failed", "run": self._team_run_to_json(failed_run)}
  365. return
  366. if self.agent_client is None:
  367. failed_run = self.team_run_repository.update_status(
  368. team_run_id=team_run.id,
  369. status="failed",
  370. worker_key=payload.worker_key,
  371. error_code="agent_service_missing",
  372. error_message="agent service client is not configured")
  373. yield {"event": "team.run.failed", "run": self._team_run_to_json(failed_run)}
  374. return
  375. stream_members = self._select_stream_members(team_config=team_config, members=members)
  376. member_results: list[TeamMemberRunResult] = []
  377. prior_outputs: list[dict[str, JSONValue]] = []
  378. for member in stream_members:
  379. member_input_json = self._build_member_input_json(
  380. team_run=team_run,
  381. team_config=team_config,
  382. member=member,
  383. prior_outputs=prior_outputs)
  384. created_run = self.agent_client.create_agent_run(
  385. agent_id=member.agent_id,
  386. agent_config_id=member.agent_config_id,
  387. session_id=team_run.session_id,
  388. input_text=self._build_member_input_text(
  389. team_run=team_run,
  390. team_config=team_config,
  391. member=member),
  392. input_json=member_input_json)
  393. yield {
  394. "event": "team.member.started",
  395. "member": self._member_to_json(member),
  396. "agent_run": created_run.model_dump(mode="json"),
  397. }
  398. final_agent_run = created_run
  399. try:
  400. for event_name, data in self.agent_client.execute_agent_run_stream(
  401. agent_run_id=created_run.id,
  402. worker_key=payload.worker_key,
  403. dry_run=payload.dry_run):
  404. if event_name == "agent.run.delta":
  405. delta = data.get("delta")
  406. if isinstance(delta, str):
  407. yield {
  408. "event": "team.member.delta",
  409. "member": self._member_to_json(member),
  410. "agent_run_id": created_run.id,
  411. "delta": delta,
  412. }
  413. elif event_name in {"agent.run.completed", "agent.run.failed"}:
  414. run_payload = data.get("run")
  415. if isinstance(run_payload, dict):
  416. final_agent_run = AgentRunContract.model_validate(run_payload)
  417. except AgentServiceClientError as exc:
  418. failed_agent_run = created_run.model_copy(
  419. update={
  420. "status": "failed",
  421. "error_code": "agent_service_error",
  422. "error_message": str(exc),
  423. })
  424. final_agent_run = failed_agent_run
  425. result = TeamMemberRunResult(member=member, run=final_agent_run)
  426. member_results.append(result)
  427. prior_outputs.append(self._compact_prior_output(result))
  428. yield {
  429. "event": "team.member.completed",
  430. "member": self._member_to_json(member),
  431. "agent_run": final_agent_run.model_dump(mode="json"),
  432. }
  433. failed_results = [item for item in member_results if item.run.status != "completed"]
  434. output_text = self._build_team_output_text(
  435. team_config=team_config,
  436. member_results=member_results)
  437. output_json: dict[str, JSONValue] = {
  438. "dry_run": payload.dry_run,
  439. "coordination_mode": team_config.coordination_mode,
  440. "team_config_id": team_config.id,
  441. "member_run_count": len(member_results),
  442. "member_results": [
  443. self._member_result_to_json(item) for item in member_results
  444. ],
  445. "streamed": True,
  446. "response_mode": self._read_response_mode(team_config),
  447. }
  448. if failed_results:
  449. failed_run = self.team_run_repository.update_status(
  450. team_run_id=team_run.id,
  451. status="failed",
  452. worker_key=payload.worker_key,
  453. output_text=output_text,
  454. output_json=output_json,
  455. error_code="member_run_failed",
  456. error_message=f"{len(failed_results)} member run(s) failed")
  457. yield {"event": "team.run.failed", "run": self._team_run_to_json(failed_run)}
  458. return
  459. completed_run = self.team_run_repository.update_status(
  460. team_run_id=team_run.id,
  461. status="completed",
  462. worker_key=payload.worker_key,
  463. output_text=output_text,
  464. output_json=output_json)
  465. yield {"event": "team.run.completed", "run": self._team_run_to_json(completed_run)}
  466. def execute_next_claimed_team_run(
  467. self,
  468. *,
  469. worker_key: str,
  470. lease_seconds: int,
  471. stale_running_seconds: int,
  472. dry_run: bool,
  473. redis_client: object | None = None) -> tuple[TeamRun, int] | None:
  474. released_lease_count = self.team_run_repository.release_expired_leases(
  475. now_time=datetime.utcnow(),
  476. stale_running_seconds=stale_running_seconds)
  477. claimed_team_run = self.team_run_repository.claim_next_queued(
  478. worker_key=worker_key,
  479. lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds))
  480. if claimed_team_run is None:
  481. return None
  482. if redis_client is not None:
  483. from core_shared.redis_primitives import DistributedLock, IdempotencyStore
  484. lock = DistributedLock(
  485. client=redis_client,
  486. name=f"team-run:{claimed_team_run.id}:lock",
  487. ttl_seconds=lease_seconds)
  488. if not lock.acquire():
  489. return None
  490. idempotency_store = IdempotencyStore(
  491. client=redis_client,
  492. prefix="team-run-idempotency")
  493. if not idempotency_store.begin(key=claimed_team_run.id):
  494. lock.release()
  495. return None
  496. else:
  497. lock = None
  498. idempotency_store = None
  499. try:
  500. result = self.execute_team_run(
  501. team_run_id=claimed_team_run.id,
  502. payload=TeamRunExecuteRequest(
  503. worker_key=worker_key,
  504. dry_run=dry_run))
  505. if idempotency_store is not None and result is not None:
  506. idempotency_store.complete(
  507. key=claimed_team_run.id,
  508. result={"status": result.status, "team_run_id": result.id})
  509. finally:
  510. if lock is not None:
  511. lock.release()
  512. if result is None:
  513. return None
  514. return result, released_lease_count
  515. def _resolve_team_config(
  516. self,
  517. *,
  518. team_id: str,
  519. team_config_id: str | None) -> TeamConfig | None:
  520. if team_config_id is not None:
  521. return self.team_config_repository.get_by_id(
  522. team_config_id=team_config_id)
  523. return self.team_config_repository.get_latest_by_team(
  524. team_id=team_id)
  525. def _execute_members(
  526. self,
  527. *,
  528. team_run: TeamRun,
  529. team_config: TeamConfig,
  530. members: list[TeamMemberContract],
  531. worker_key: str | None,
  532. dry_run: bool) -> list[TeamMemberRunResult]:
  533. if self.agent_client is None:
  534. raise AgentServiceClientError("agent service client is not configured")
  535. ordered_members = self._order_members(members)
  536. if self._should_execute_members_in_parallel(team_config):
  537. return self._execute_members_in_parallel(
  538. team_run=team_run,
  539. team_config=team_config,
  540. members=ordered_members,
  541. worker_key=worker_key,
  542. dry_run=dry_run)
  543. member_results: list[TeamMemberRunResult] = []
  544. prior_outputs: list[dict[str, JSONValue]] = []
  545. for member in ordered_members:
  546. member_input_json = self._build_member_input_json(
  547. team_run=team_run,
  548. team_config=team_config,
  549. member=member,
  550. prior_outputs=prior_outputs)
  551. result = self._execute_single_member(
  552. team_run=team_run,
  553. team_config=team_config,
  554. member=member,
  555. member_input_json=member_input_json,
  556. worker_key=worker_key,
  557. dry_run=dry_run)
  558. member_results.append(result)
  559. prior_outputs.append(self._compact_prior_output(result))
  560. return member_results
  561. def _execute_members_in_parallel(
  562. self,
  563. *,
  564. team_run: TeamRun,
  565. team_config: TeamConfig,
  566. members: list[TeamMemberContract],
  567. worker_key: str | None,
  568. dry_run: bool) -> list[TeamMemberRunResult]:
  569. results_by_key: dict[str, TeamMemberRunResult] = {}
  570. with ThreadPoolExecutor(max_workers=max(1, min(len(members), 8))) as executor:
  571. future_by_member = {
  572. executor.submit(
  573. self._execute_single_member,
  574. team_run=team_run,
  575. team_config=team_config,
  576. member=member,
  577. member_input_json=self._build_member_input_json(
  578. team_run=team_run,
  579. team_config=team_config,
  580. member=member,
  581. prior_outputs=[]),
  582. worker_key=worker_key,
  583. dry_run=dry_run): member
  584. for member in members
  585. }
  586. for future in as_completed(future_by_member):
  587. member = future_by_member[future]
  588. results_by_key[member.member_key] = future.result()
  589. return [results_by_key[member.member_key] for member in members]
  590. def _execute_single_member(
  591. self,
  592. *,
  593. team_run: TeamRun,
  594. team_config: TeamConfig,
  595. member: TeamMemberContract,
  596. member_input_json: dict[str, JSONValue],
  597. worker_key: str | None,
  598. dry_run: bool) -> TeamMemberRunResult:
  599. if self.agent_client is None:
  600. raise AgentServiceClientError("agent service client is not configured")
  601. created_run = self.agent_client.create_agent_run(
  602. agent_id=member.agent_id,
  603. agent_config_id=member.agent_config_id,
  604. session_id=team_run.session_id,
  605. input_text=self._build_member_input_text(
  606. team_run=team_run,
  607. team_config=team_config,
  608. member=member),
  609. input_json=member_input_json)
  610. executed_run = self.agent_client.execute_agent_run(
  611. agent_run_id=created_run.id,
  612. worker_key=worker_key,
  613. dry_run=dry_run)
  614. return TeamMemberRunResult(member=member, run=executed_run)
  615. def _should_execute_members_in_parallel(self, team_config: TeamConfig) -> bool:
  616. handoff = team_config.policy_json.get("handoff")
  617. return team_config.coordination_mode == "parallel" or handoff == "parallel_merge"
  618. def _read_team_members(self, team_config: TeamConfig) -> list[TeamMemberContract]:
  619. members: list[TeamMemberContract] = []
  620. for item in team_config.member_refs_json:
  621. try:
  622. members.append(TeamMemberContract.model_validate(item))
  623. except ValueError:
  624. continue
  625. return members
  626. def _order_members(self, members: list[TeamMemberContract]) -> list[TeamMemberContract]:
  627. role_priority = {
  628. "planner": 0,
  629. "supervisor": 1,
  630. "specialist": 2,
  631. "executor": 3,
  632. "reviewer": 4,
  633. }
  634. return sorted(members, key=lambda item: role_priority.get(item.role, 10))
  635. def _select_stream_members(
  636. self,
  637. *,
  638. team_config: TeamConfig,
  639. members: list[TeamMemberContract]) -> list[TeamMemberContract]:
  640. ordered_members = self._order_members(members)
  641. if self._read_response_mode(team_config) == "all_members":
  642. return ordered_members
  643. for member in ordered_members:
  644. if member.role in {"supervisor", "planner"}:
  645. return [member]
  646. return ordered_members[:1]
  647. def _read_response_mode(self, team_config: TeamConfig) -> str:
  648. value = team_config.policy_json.get("response_mode")
  649. if isinstance(value, str) and value in {"single_responder", "all_members"}:
  650. return value
  651. return "single_responder"
  652. def _build_member_input_text(
  653. self,
  654. *,
  655. team_run: TeamRun,
  656. team_config: TeamConfig,
  657. member: TeamMemberContract) -> str:
  658. lines = [
  659. f"Team objective: {team_config.objective or 'No objective provided.'}",
  660. f"Member role: {member.role}",
  661. ]
  662. if member.responsibility:
  663. lines.append(f"Responsibility: {member.responsibility}")
  664. if team_run.input_text:
  665. lines.append(f"User task: {team_run.input_text}")
  666. return "\n".join(lines)
  667. def _build_member_input_json(
  668. self,
  669. *,
  670. team_run: TeamRun,
  671. team_config: TeamConfig,
  672. member: TeamMemberContract,
  673. prior_outputs: list[dict[str, JSONValue]]) -> dict[str, JSONValue]:
  674. input_json: dict[str, JSONValue] = dict(team_run.input_json or {})
  675. input_json.update(
  676. {
  677. "team_id": team_run.team_id,
  678. "team_run_id": team_run.id,
  679. "team_config_id": team_config.id,
  680. "team_objective": team_config.objective,
  681. "member_key": member.member_key,
  682. "member_role": member.role,
  683. "member_responsibility": member.responsibility,
  684. "prior_member_outputs": prior_outputs,
  685. }
  686. )
  687. configured_input = member.config_json.get("input_json")
  688. if isinstance(configured_input, dict):
  689. input_json.update(
  690. {str(item_key): item_value for item_key, item_value in configured_input.items()}
  691. )
  692. return input_json
  693. def _build_team_output_text(
  694. self,
  695. *,
  696. team_config: TeamConfig,
  697. member_results: list[TeamMemberRunResult]) -> str:
  698. lines = [
  699. f"Team objective: {team_config.objective or 'No objective provided.'}",
  700. f"Coordination mode: {team_config.coordination_mode}",
  701. "Team conversation:",
  702. ]
  703. for index, item in enumerate(member_results, start=1):
  704. output_text = item.run.output_text or item.run.error_message or ""
  705. speaker = item.member.name or item.member.member_key
  706. lines.append(
  707. f"{index}. {speaker} ({item.member.role}) "
  708. f"status={item.run.status}: {output_text}")
  709. return "\n".join(lines)
  710. def _member_result_to_json(self, result: TeamMemberRunResult) -> dict[str, JSONValue]:
  711. return {
  712. "member_key": result.member.member_key,
  713. "member_role": result.member.role,
  714. "member_name": result.member.name,
  715. "member_responsibility": result.member.responsibility,
  716. "agent_run_id": result.run.id,
  717. "agent_id": result.run.agent_id,
  718. "agent_config_id": result.run.agent_config_id,
  719. "status": result.run.status,
  720. "output_text": result.run.output_text,
  721. "output_json": self._compact_agent_output_json(result.run.output_json or {}),
  722. "error_code": result.run.error_code,
  723. "error_message": result.run.error_message,
  724. }
  725. def _member_to_json(self, member: TeamMemberContract) -> dict[str, JSONValue]:
  726. return member.model_dump(mode="json")
  727. def _team_run_to_json(self, team_run: TeamRun | None) -> dict[str, JSONValue]:
  728. if team_run is None:
  729. return {}
  730. return {
  731. "id": team_run.id,
  732. "team_id": team_run.team_id,
  733. "team_config_id": team_run.team_config_id,
  734. "session_id": team_run.session_id,
  735. "input_text": team_run.input_text,
  736. "input_json": team_run.input_json,
  737. "output_text": team_run.output_text,
  738. "output_json": team_run.output_json,
  739. "status": team_run.status,
  740. "worker_key": team_run.worker_key,
  741. "queued_time": team_run.queued_time,
  742. "lease_expire_time": team_run.lease_expire_time,
  743. "started_time": team_run.started_time,
  744. "finished_time": team_run.finished_time,
  745. "error_code": team_run.error_code,
  746. "error_message": team_run.error_message,
  747. "created_time": team_run.created_time,
  748. }
  749. def _compact_prior_output(self, result: TeamMemberRunResult) -> dict[str, JSONValue]:
  750. return {
  751. "member_key": result.member.member_key,
  752. "member_role": result.member.role,
  753. "member_name": result.member.name,
  754. "agent_id": result.run.agent_id,
  755. "agent_run_id": result.run.id,
  756. "status": result.run.status,
  757. "output_text": self._truncate_text(result.run.output_text),
  758. "error_code": result.run.error_code,
  759. "error_message": result.run.error_message,
  760. "output_json": self._compact_agent_output_json(result.run.output_json or {}),
  761. }
  762. def _compact_agent_output_json(
  763. self,
  764. output_json: dict[str, JSONValue]) -> dict[str, JSONValue]:
  765. keep_keys = {
  766. "dry_run",
  767. "model",
  768. "finish_reason",
  769. "usage_json",
  770. "tool_invocations",
  771. "skill_invocations",
  772. "selected_tool_refs",
  773. "selected_skill_refs",
  774. "memory_read_enabled",
  775. "memory_read_count",
  776. "memory_read_reason",
  777. }
  778. compacted = {
  779. key: value for key, value in output_json.items()
  780. if key in keep_keys
  781. }
  782. if "raw_response_json" in output_json or "messages" in output_json:
  783. compacted["debug_payload_omitted"] = True
  784. return compacted
  785. def _truncate_text(self, value: str | None) -> str | None:
  786. if value is None or len(value) <= MAX_PRIOR_OUTPUT_TEXT_CHARS:
  787. return value
  788. return f"{value[:MAX_PRIOR_OUTPUT_TEXT_CHARS]}... [truncated]"
  789. def _publish_event(
  790. self,
  791. *,
  792. event_type: str,
  793. team_run: TeamRun,
  794. payload_json: dict[str, JSONValue]) -> None:
  795. if self.event_client is None:
  796. return
  797. try:
  798. self.event_client.publish_event(
  799. EventPublishContract(
  800. event_type=event_type,
  801. source_service="team-service",
  802. aggregate_type="team_run",
  803. aggregate_id=team_run.id,
  804. correlation_id=team_run.session_id,
  805. payload_json={
  806. **payload_json,
  807. "team_id": team_run.team_id,
  808. "team_config_id": team_run.team_config_id,
  809. })
  810. )
  811. except EventServiceClientError:
  812. return
  813. def _build_team_code(self, name: str) -> str:
  814. base = "".join(
  815. char.lower() if char.isalnum() else "_"
  816. for char in name
  817. ).strip("_") or "team"
  818. return base[:64]
  819. def _normalize_member_refs(self, member_refs: list[dict[str, JSONValue]]) -> list[TeamMemberContract]:
  820. members: list[TeamMemberContract] = []
  821. for index, item in enumerate(member_refs, start=1):
  822. role = item.get("role")
  823. normalized_role = "executor" if role == "worker" else role
  824. member = {
  825. **item,
  826. "member_key": item.get("member_key") or item.get("memberKey") or f"member_{index}",
  827. "agent_id": item.get("agent_id") or item.get("agentId"),
  828. "agent_config_id": item.get("agent_config_id") or item.get("agentConfigId"),
  829. "role": normalized_role or "specialist",
  830. "config_json": item.get("config_json") or item.get("configJson") or {},
  831. }
  832. members.append(TeamMemberContract.model_validate(member))
  833. return members
  834. def build_team_application_service(
  835. *,
  836. team_repository: TeamDefinitionRepository,
  837. team_config_repository: TeamConfigRepository,
  838. team_run_repository: TeamRunRepository,
  839. settings: TeamServiceSettings) -> TeamApplicationService:
  840. redis_client = try_build_redis_client(settings.redis_url)
  841. return TeamApplicationService(
  842. team_repository=team_repository,
  843. team_config_repository=team_config_repository,
  844. team_run_repository=team_run_repository,
  845. agent_client=AgentServiceClient(
  846. base_url=settings.agent_service_url,
  847. timeout_seconds=settings.agent_service_timeout_seconds),
  848. event_client=EventServiceClient(
  849. base_url=settings.event_service_url,
  850. timeout_seconds=settings.event_service_timeout_seconds),
  851. task_queue_publisher=(
  852. TaskQueuePublisher(client=redis_client) if redis_client is not None else None
  853. ))