services.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738
  1. from dataclasses import dataclass
  2. from datetime import datetime, timedelta
  3. from concurrent.futures import ThreadPoolExecutor, as_completed
  4. from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
  5. from core_domain import AgentRunContract, TeamMemberContract
  6. from core_shared import JSONValue, try_build_redis_client
  7. from core_shared.task_queue import TaskQueuePublisher
  8. from app.bootstrap.settings import TeamServiceSettings
  9. from app.db.models import TeamDefinition, TeamRun, TeamConfig
  10. from app.domain.repositories import (
  11. TeamDefinitionRepository,
  12. TeamRunRepository,
  13. TeamConfigRepository)
  14. from app.infrastructure.agent_client import AgentServiceClient, AgentServiceClientError
  15. from app.schemas.team import (
  16. TeamConfigCreateRequestDto,
  17. TeamConfigUpdateRequestDto,
  18. TeamCreateRequest,
  19. TeamCreateRequestDto,
  20. TeamDeleteRequestDto,
  21. TeamRunCreateRequest,
  22. TeamRunCreateRequestDto,
  23. TeamRunExecuteRequest,
  24. TeamRunStatusUpdateRequestDto,
  25. TeamRunStatusUpdateRequest,
  26. TeamStatusUpdateRequest,
  27. TeamUpdateRequestDto,
  28. TeamConfigCreateRequest)
  29. MAX_PRIOR_OUTPUT_TEXT_CHARS = 1600
  30. @dataclass(frozen=True)
  31. class TeamMemberRunResult:
  32. member: TeamMemberContract
  33. run: AgentRunContract
  34. class TeamApplicationService:
  35. def __init__(
  36. self,
  37. *,
  38. team_repository: TeamDefinitionRepository,
  39. team_config_repository: TeamConfigRepository,
  40. team_run_repository: TeamRunRepository,
  41. agent_client: AgentServiceClient | None = None,
  42. event_client: EventServiceClient | None = None,
  43. task_queue_publisher: TaskQueuePublisher | None = None) -> None:
  44. self.team_repository = team_repository
  45. self.team_config_repository = team_config_repository
  46. self.team_run_repository = team_run_repository
  47. self.agent_client = agent_client
  48. self.event_client = event_client
  49. self.task_queue_publisher = task_queue_publisher
  50. def create_team(self, payload: TeamCreateRequest) -> TeamDefinition:
  51. return self.team_repository.create(
  52. code=payload.code,
  53. name=payload.name,
  54. description=payload.description,
  55. team_type=payload.team_type,
  56. owner_user_id=payload.owner_user_id,
  57. metadata_json=payload.metadata_json)
  58. def create_team_from_contract(self, payload: TeamCreateRequestDto) -> TeamDefinition:
  59. return self.create_team(
  60. TeamCreateRequest(
  61. code=self._build_team_code(payload.name),
  62. name=payload.name,
  63. description=payload.description,
  64. team_type=payload.teamType,
  65. owner_user_id=payload.ownerUserId,
  66. metadata_json=payload.metadata))
  67. def list_teams(self) -> list[TeamDefinition]:
  68. return self.team_repository.list_all()
  69. def get_team(self, *, team_id: str) -> TeamDefinition | None:
  70. return self.team_repository.get_by_id(team_id=team_id)
  71. def update_team_from_contract(self, payload: TeamUpdateRequestDto) -> TeamDefinition | None:
  72. entity = self.team_repository.get_by_id(team_id=payload.teamId)
  73. if entity is None:
  74. return None
  75. if payload.name is not None:
  76. entity.name = payload.name
  77. entity.code = self._build_team_code(payload.name)
  78. if payload.description is not None:
  79. entity.description = payload.description
  80. if payload.teamType is not None:
  81. entity.team_type = payload.teamType
  82. if payload.status is not None:
  83. entity.status = payload.status
  84. if payload.ownerUserId is not None:
  85. entity.owner_user_id = payload.ownerUserId
  86. if payload.metadata is not None:
  87. entity.metadata_json = payload.metadata
  88. return self.team_repository.save(entity)
  89. def delete_team_from_contract(self, payload: TeamDeleteRequestDto) -> bool:
  90. entity = self.team_repository.get_by_id(team_id=payload.teamId)
  91. if entity is None:
  92. return False
  93. self.team_repository.delete(entity)
  94. return True
  95. def update_team_status(
  96. self,
  97. *,
  98. team_id: str,
  99. payload: TeamStatusUpdateRequest) -> TeamDefinition | None:
  100. return self.team_repository.update_status(
  101. team_id=team_id,
  102. status=payload.status)
  103. def create_team_config(self, payload: TeamConfigCreateRequest) -> TeamConfig:
  104. team = self.team_repository.get_by_id(team_id=payload.team_id)
  105. if team is None:
  106. raise ValueError(f"team not found: {payload.team_id}")
  107. if not payload.member_refs:
  108. raise ValueError("team config requires at least one member")
  109. return self.team_config_repository.create(
  110. team_id=payload.team_id,
  111. coordination_mode=payload.coordination_mode,
  112. objective=payload.objective,
  113. member_refs_json=[item.model_dump(mode="json") for item in payload.member_refs],
  114. policy_json=payload.policy_json)
  115. def create_team_config_from_contract(self, payload: TeamConfigCreateRequestDto) -> TeamConfig:
  116. return self.create_team_config(
  117. TeamConfigCreateRequest(
  118. team_id=payload.teamId,
  119. coordination_mode=payload.coordinationMode,
  120. objective=payload.objective,
  121. member_refs=self._normalize_member_refs(payload.memberRefs),
  122. policy_json=payload.policy))
  123. def list_team_configs(self, *, team_id: str) -> list[TeamConfig]:
  124. return self.team_config_repository.list_by_team(team_id=team_id)
  125. def list_team_configs(self, *, team_id: str | None = None) -> list[TeamConfig]:
  126. if team_id is not None:
  127. return self.team_config_repository.list_by_team(team_id=team_id)
  128. return self.team_config_repository.list_all()
  129. def get_team_config(self, *, config_id: str) -> TeamConfig | None:
  130. return self.team_config_repository.get_by_id(team_config_id=config_id)
  131. def update_team_config_from_contract(self, payload: TeamConfigUpdateRequestDto) -> TeamConfig | None:
  132. entity = self.team_config_repository.get_by_id(team_config_id=payload.configId)
  133. if entity is None:
  134. return None
  135. if payload.coordinationMode is not None:
  136. entity.coordination_mode = payload.coordinationMode
  137. if payload.objective is not None:
  138. entity.objective = payload.objective
  139. if payload.memberRefs is not None:
  140. normalized = self._normalize_member_refs(payload.memberRefs)
  141. if not normalized:
  142. raise ValueError("team config requires at least one member")
  143. entity.member_refs_json = [item.model_dump(mode="json") for item in normalized]
  144. if payload.policy is not None:
  145. entity.policy_json = payload.policy
  146. return self.team_config_repository.save(entity)
  147. def delete_team_config(self, *, config_id: str) -> bool:
  148. entity = self.team_config_repository.get_by_id(team_config_id=config_id)
  149. if entity is None:
  150. return False
  151. self.team_config_repository.delete(entity)
  152. return True
  153. def create_team_run(self, payload: TeamRunCreateRequest) -> TeamRun:
  154. team_config = self._resolve_team_config(
  155. team_id=payload.team_id,
  156. team_config_id=payload.team_config_id)
  157. if team_config is None:
  158. raise ValueError("team config not found")
  159. team_run = self.team_run_repository.create(
  160. team_id=payload.team_id,
  161. team_config_id=team_config.id,
  162. session_id=payload.session_id,
  163. input_text=payload.input_text,
  164. input_json=payload.input_json)
  165. self._publish_event(
  166. event_type="team.run.created",
  167. team_run=team_run,
  168. payload_json={"team_run_id": team_run.id, "status": team_run.status})
  169. if self.task_queue_publisher is not None:
  170. self.task_queue_publisher.publish_team_run(
  171. team_run_id=team_run.id)
  172. return team_run
  173. def create_team_run_from_contract(self, payload: TeamRunCreateRequestDto) -> TeamRun:
  174. return self.create_team_run(
  175. TeamRunCreateRequest(
  176. team_id=payload.teamId,
  177. team_config_id=payload.teamConfigId,
  178. session_id=payload.sessionId,
  179. input_text=payload.inputText,
  180. input_json=payload.inputJson))
  181. def list_team_runs(
  182. self,
  183. *,
  184. team_id: str | None = None,
  185. session_id: str | None = None) -> list[TeamRun]:
  186. return self.team_run_repository.list_by_scope(
  187. team_id=team_id,
  188. session_id=session_id)
  189. def get_team_run(self, *, team_run_id: str) -> TeamRun | None:
  190. return self.team_run_repository.get_by_id(team_run_id=team_run_id)
  191. def delete_team_run(self, *, team_run_id: str) -> bool:
  192. entity = self.team_run_repository.get_by_id(team_run_id=team_run_id)
  193. if entity is None:
  194. return False
  195. self.team_run_repository.delete(entity)
  196. return True
  197. def update_team_run_status(
  198. self,
  199. *,
  200. team_run_id: str,
  201. payload: TeamRunStatusUpdateRequest) -> TeamRun | None:
  202. entity = self.team_run_repository.get_by_id(
  203. team_run_id=team_run_id)
  204. if entity is None:
  205. return None
  206. return self.team_run_repository.update_status(
  207. team_run_id=team_run_id,
  208. status=payload.status,
  209. worker_key=payload.worker_key,
  210. output_text=payload.output_text,
  211. output_json=payload.output_json,
  212. error_code=payload.error_code,
  213. error_message=payload.error_message)
  214. def update_team_run_status_from_contract(
  215. self,
  216. payload: TeamRunStatusUpdateRequestDto) -> TeamRun | None:
  217. return self.update_team_run_status(
  218. team_run_id=payload.teamRunId,
  219. payload=TeamRunStatusUpdateRequest(
  220. status=payload.status,
  221. worker_key=payload.workerKey,
  222. output_text=payload.outputText,
  223. output_json=payload.outputJson,
  224. error_code=payload.errorCode,
  225. error_message=payload.errorMessage))
  226. def execute_team_run(
  227. self,
  228. *,
  229. team_run_id: str,
  230. payload: TeamRunExecuteRequest) -> TeamRun | None:
  231. team_run = self.team_run_repository.get_by_id(
  232. team_run_id=team_run_id)
  233. if team_run is None:
  234. return None
  235. team_config = self.team_config_repository.get_by_id(
  236. team_config_id=team_run.team_config_id)
  237. if team_config is None:
  238. failed_run = self.team_run_repository.update_status(
  239. team_run_id=team_run.id,
  240. status="failed",
  241. worker_key=payload.worker_key,
  242. error_code="team_config_missing",
  243. error_message=f"team config not found: {team_run.team_config_id}")
  244. if failed_run is not None:
  245. self._publish_event(
  246. event_type="team.run.failed",
  247. team_run=failed_run,
  248. payload_json={
  249. "team_run_id": failed_run.id,
  250. "status": failed_run.status,
  251. "error_code": "team_config_missing",
  252. })
  253. return failed_run
  254. running_run = self.team_run_repository.update_status(
  255. team_run_id=team_run.id,
  256. status="running",
  257. worker_key=payload.worker_key)
  258. if running_run is None:
  259. return None
  260. members = self._read_team_members(team_config)
  261. if not members:
  262. return self.team_run_repository.update_status(
  263. team_run_id=team_run.id,
  264. status="failed",
  265. worker_key=payload.worker_key,
  266. error_code="team_members_missing",
  267. error_message="team config has no valid members")
  268. try:
  269. member_results = self._execute_members(
  270. team_run=team_run,
  271. team_config=team_config,
  272. members=members,
  273. worker_key=payload.worker_key,
  274. dry_run=payload.dry_run)
  275. except AgentServiceClientError as exc:
  276. return self.team_run_repository.update_status(
  277. team_run_id=team_run.id,
  278. status="failed",
  279. worker_key=payload.worker_key,
  280. error_code="agent_service_error",
  281. error_message=str(exc))
  282. failed_results = [item for item in member_results if item.run.status != "completed"]
  283. output_text = self._build_team_output_text(
  284. team_config=team_config,
  285. member_results=member_results)
  286. output_json: dict[str, JSONValue] = {
  287. "dry_run": payload.dry_run,
  288. "coordination_mode": team_config.coordination_mode,
  289. "team_config_id": team_config.id,
  290. "member_run_count": len(member_results),
  291. "member_results": [
  292. self._member_result_to_json(item) for item in member_results
  293. ],
  294. }
  295. if failed_results:
  296. failed_run = self.team_run_repository.update_status(
  297. team_run_id=team_run.id,
  298. status="failed",
  299. worker_key=payload.worker_key,
  300. output_text=output_text,
  301. output_json=output_json,
  302. error_code="member_run_failed",
  303. error_message=f"{len(failed_results)} member run(s) failed")
  304. if failed_run is not None:
  305. self._publish_event(
  306. event_type="team.run.failed",
  307. team_run=failed_run,
  308. payload_json={
  309. "team_run_id": failed_run.id,
  310. "status": failed_run.status,
  311. "failed_member_count": len(failed_results),
  312. })
  313. return failed_run
  314. completed_run = self.team_run_repository.update_status(
  315. team_run_id=team_run.id,
  316. status="completed",
  317. worker_key=payload.worker_key,
  318. output_text=output_text,
  319. output_json=output_json)
  320. if completed_run is not None:
  321. self._publish_event(
  322. event_type="team.run.completed",
  323. team_run=completed_run,
  324. payload_json={
  325. "team_run_id": completed_run.id,
  326. "status": completed_run.status,
  327. "member_run_count": len(member_results),
  328. })
  329. return completed_run
  330. def execute_next_claimed_team_run(
  331. self,
  332. *,
  333. worker_key: str,
  334. lease_seconds: int,
  335. stale_running_seconds: int,
  336. dry_run: bool,
  337. redis_client: object | None = None) -> tuple[TeamRun, int] | None:
  338. released_lease_count = self.team_run_repository.release_expired_leases(
  339. now_time=datetime.utcnow(),
  340. stale_running_seconds=stale_running_seconds)
  341. claimed_team_run = self.team_run_repository.claim_next_queued(
  342. worker_key=worker_key,
  343. lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds))
  344. if claimed_team_run is None:
  345. return None
  346. if redis_client is not None:
  347. from core_shared.redis_primitives import DistributedLock, IdempotencyStore
  348. lock = DistributedLock(
  349. client=redis_client,
  350. name=f"team-run:{claimed_team_run.id}:lock",
  351. ttl_seconds=lease_seconds)
  352. if not lock.acquire():
  353. return None
  354. idempotency_store = IdempotencyStore(
  355. client=redis_client,
  356. prefix="team-run-idempotency")
  357. if not idempotency_store.begin(key=claimed_team_run.id):
  358. lock.release()
  359. return None
  360. else:
  361. lock = None
  362. idempotency_store = None
  363. try:
  364. result = self.execute_team_run(
  365. team_run_id=claimed_team_run.id,
  366. payload=TeamRunExecuteRequest(
  367. worker_key=worker_key,
  368. dry_run=dry_run))
  369. if idempotency_store is not None and result is not None:
  370. idempotency_store.complete(
  371. key=claimed_team_run.id,
  372. result={"status": result.status, "team_run_id": result.id})
  373. finally:
  374. if lock is not None:
  375. lock.release()
  376. if result is None:
  377. return None
  378. return result, released_lease_count
  379. def _resolve_team_config(
  380. self,
  381. *,
  382. team_id: str,
  383. team_config_id: str | None) -> TeamConfig | None:
  384. if team_config_id is not None:
  385. return self.team_config_repository.get_by_id(
  386. team_config_id=team_config_id)
  387. return self.team_config_repository.get_latest_by_team(
  388. team_id=team_id)
  389. def _execute_members(
  390. self,
  391. *,
  392. team_run: TeamRun,
  393. team_config: TeamConfig,
  394. members: list[TeamMemberContract],
  395. worker_key: str | None,
  396. dry_run: bool) -> list[TeamMemberRunResult]:
  397. if self.agent_client is None:
  398. raise AgentServiceClientError("agent service client is not configured")
  399. ordered_members = self._order_members(members)
  400. if self._should_execute_members_in_parallel(team_config):
  401. return self._execute_members_in_parallel(
  402. team_run=team_run,
  403. team_config=team_config,
  404. members=ordered_members,
  405. worker_key=worker_key,
  406. dry_run=dry_run)
  407. member_results: list[TeamMemberRunResult] = []
  408. prior_outputs: list[dict[str, JSONValue]] = []
  409. for member in ordered_members:
  410. member_input_json = self._build_member_input_json(
  411. team_run=team_run,
  412. team_config=team_config,
  413. member=member,
  414. prior_outputs=prior_outputs)
  415. result = self._execute_single_member(
  416. team_run=team_run,
  417. team_config=team_config,
  418. member=member,
  419. member_input_json=member_input_json,
  420. worker_key=worker_key,
  421. dry_run=dry_run)
  422. member_results.append(result)
  423. prior_outputs.append(self._compact_prior_output(result))
  424. return member_results
  425. def _execute_members_in_parallel(
  426. self,
  427. *,
  428. team_run: TeamRun,
  429. team_config: TeamConfig,
  430. members: list[TeamMemberContract],
  431. worker_key: str | None,
  432. dry_run: bool) -> list[TeamMemberRunResult]:
  433. results_by_key: dict[str, TeamMemberRunResult] = {}
  434. with ThreadPoolExecutor(max_workers=max(1, min(len(members), 8))) as executor:
  435. future_by_member = {
  436. executor.submit(
  437. self._execute_single_member,
  438. team_run=team_run,
  439. team_config=team_config,
  440. member=member,
  441. member_input_json=self._build_member_input_json(
  442. team_run=team_run,
  443. team_config=team_config,
  444. member=member,
  445. prior_outputs=[]),
  446. worker_key=worker_key,
  447. dry_run=dry_run): member
  448. for member in members
  449. }
  450. for future in as_completed(future_by_member):
  451. member = future_by_member[future]
  452. results_by_key[member.member_key] = future.result()
  453. return [results_by_key[member.member_key] for member in members]
  454. def _execute_single_member(
  455. self,
  456. *,
  457. team_run: TeamRun,
  458. team_config: TeamConfig,
  459. member: TeamMemberContract,
  460. member_input_json: dict[str, JSONValue],
  461. worker_key: str | None,
  462. dry_run: bool) -> TeamMemberRunResult:
  463. if self.agent_client is None:
  464. raise AgentServiceClientError("agent service client is not configured")
  465. created_run = self.agent_client.create_agent_run(
  466. agent_id=member.agent_id,
  467. agent_config_id=member.agent_config_id,
  468. session_id=team_run.session_id,
  469. input_text=self._build_member_input_text(
  470. team_run=team_run,
  471. team_config=team_config,
  472. member=member),
  473. input_json=member_input_json)
  474. executed_run = self.agent_client.execute_agent_run(
  475. agent_run_id=created_run.id,
  476. worker_key=worker_key,
  477. dry_run=dry_run)
  478. return TeamMemberRunResult(member=member, run=executed_run)
  479. def _should_execute_members_in_parallel(self, team_config: TeamConfig) -> bool:
  480. handoff = team_config.policy_json.get("handoff")
  481. return team_config.coordination_mode == "parallel" or handoff == "parallel_merge"
  482. def _read_team_members(self, team_config: TeamConfig) -> list[TeamMemberContract]:
  483. members: list[TeamMemberContract] = []
  484. for item in team_config.member_refs_json:
  485. try:
  486. members.append(TeamMemberContract.model_validate(item))
  487. except ValueError:
  488. continue
  489. return members
  490. def _order_members(self, members: list[TeamMemberContract]) -> list[TeamMemberContract]:
  491. role_priority = {
  492. "planner": 0,
  493. "supervisor": 1,
  494. "specialist": 2,
  495. "executor": 3,
  496. "reviewer": 4,
  497. }
  498. return sorted(members, key=lambda item: role_priority.get(item.role, 10))
  499. def _build_member_input_text(
  500. self,
  501. *,
  502. team_run: TeamRun,
  503. team_config: TeamConfig,
  504. member: TeamMemberContract) -> str:
  505. lines = [
  506. f"Team objective: {team_config.objective or 'No objective provided.'}",
  507. f"Member role: {member.role}",
  508. ]
  509. if member.responsibility:
  510. lines.append(f"Responsibility: {member.responsibility}")
  511. if team_run.input_text:
  512. lines.append(f"User task: {team_run.input_text}")
  513. return "\n".join(lines)
  514. def _build_member_input_json(
  515. self,
  516. *,
  517. team_run: TeamRun,
  518. team_config: TeamConfig,
  519. member: TeamMemberContract,
  520. prior_outputs: list[dict[str, JSONValue]]) -> dict[str, JSONValue]:
  521. input_json: dict[str, JSONValue] = dict(team_run.input_json or {})
  522. input_json.update(
  523. {
  524. "team_id": team_run.team_id,
  525. "team_run_id": team_run.id,
  526. "team_config_id": team_config.id,
  527. "team_objective": team_config.objective,
  528. "member_key": member.member_key,
  529. "member_role": member.role,
  530. "member_responsibility": member.responsibility,
  531. "prior_member_outputs": prior_outputs,
  532. }
  533. )
  534. configured_input = member.config_json.get("input_json")
  535. if isinstance(configured_input, dict):
  536. input_json.update(
  537. {str(item_key): item_value for item_key, item_value in configured_input.items()}
  538. )
  539. return input_json
  540. def _build_team_output_text(
  541. self,
  542. *,
  543. team_config: TeamConfig,
  544. member_results: list[TeamMemberRunResult]) -> str:
  545. lines = [
  546. f"Team objective: {team_config.objective or 'No objective provided.'}",
  547. f"Coordination mode: {team_config.coordination_mode}",
  548. "Team conversation:",
  549. ]
  550. for index, item in enumerate(member_results, start=1):
  551. output_text = item.run.output_text or item.run.error_message or ""
  552. speaker = item.member.name or item.member.member_key
  553. lines.append(
  554. f"{index}. {speaker} ({item.member.role}) "
  555. f"status={item.run.status}: {output_text}")
  556. return "\n".join(lines)
  557. def _member_result_to_json(self, result: TeamMemberRunResult) -> dict[str, JSONValue]:
  558. return {
  559. "member_key": result.member.member_key,
  560. "member_role": result.member.role,
  561. "member_name": result.member.name,
  562. "member_responsibility": result.member.responsibility,
  563. "agent_run_id": result.run.id,
  564. "agent_id": result.run.agent_id,
  565. "agent_config_id": result.run.agent_config_id,
  566. "status": result.run.status,
  567. "output_text": result.run.output_text,
  568. "output_json": self._compact_agent_output_json(result.run.output_json or {}),
  569. "error_code": result.run.error_code,
  570. "error_message": result.run.error_message,
  571. }
  572. def _compact_prior_output(self, result: TeamMemberRunResult) -> dict[str, JSONValue]:
  573. return {
  574. "member_key": result.member.member_key,
  575. "member_role": result.member.role,
  576. "member_name": result.member.name,
  577. "agent_id": result.run.agent_id,
  578. "agent_run_id": result.run.id,
  579. "status": result.run.status,
  580. "output_text": self._truncate_text(result.run.output_text),
  581. "error_code": result.run.error_code,
  582. "error_message": result.run.error_message,
  583. "output_json": self._compact_agent_output_json(result.run.output_json or {}),
  584. }
  585. def _compact_agent_output_json(
  586. self,
  587. output_json: dict[str, JSONValue]) -> dict[str, JSONValue]:
  588. keep_keys = {
  589. "dry_run",
  590. "model",
  591. "finish_reason",
  592. "usage_json",
  593. "tool_invocations",
  594. "skill_invocations",
  595. "selected_tool_refs",
  596. "selected_skill_refs",
  597. "memory_read_enabled",
  598. "memory_read_count",
  599. "memory_read_reason",
  600. }
  601. compacted = {
  602. key: value for key, value in output_json.items()
  603. if key in keep_keys
  604. }
  605. if "raw_response_json" in output_json or "messages" in output_json:
  606. compacted["debug_payload_omitted"] = True
  607. return compacted
  608. def _truncate_text(self, value: str | None) -> str | None:
  609. if value is None or len(value) <= MAX_PRIOR_OUTPUT_TEXT_CHARS:
  610. return value
  611. return f"{value[:MAX_PRIOR_OUTPUT_TEXT_CHARS]}... [truncated]"
  612. def _publish_event(
  613. self,
  614. *,
  615. event_type: str,
  616. team_run: TeamRun,
  617. payload_json: dict[str, JSONValue]) -> None:
  618. if self.event_client is None:
  619. return
  620. try:
  621. self.event_client.publish_event(
  622. EventPublishContract(
  623. event_type=event_type,
  624. source_service="team-service",
  625. aggregate_type="team_run",
  626. aggregate_id=team_run.id,
  627. correlation_id=team_run.session_id,
  628. payload_json={
  629. **payload_json,
  630. "team_id": team_run.team_id,
  631. "team_config_id": team_run.team_config_id,
  632. })
  633. )
  634. except EventServiceClientError:
  635. return
  636. def _build_team_code(self, name: str) -> str:
  637. base = "".join(
  638. char.lower() if char.isalnum() else "_"
  639. for char in name
  640. ).strip("_") or "team"
  641. return base[:64]
  642. def _normalize_member_refs(self, member_refs: list[dict[str, JSONValue]]) -> list[TeamMemberContract]:
  643. members: list[TeamMemberContract] = []
  644. for index, item in enumerate(member_refs, start=1):
  645. role = item.get("role")
  646. normalized_role = "executor" if role == "worker" else role
  647. member = {
  648. **item,
  649. "member_key": item.get("member_key") or item.get("memberKey") or f"member_{index}",
  650. "agent_id": item.get("agent_id") or item.get("agentId"),
  651. "agent_config_id": item.get("agent_config_id") or item.get("agentConfigId"),
  652. "role": normalized_role or "specialist",
  653. "config_json": item.get("config_json") or item.get("configJson") or {},
  654. }
  655. members.append(TeamMemberContract.model_validate(member))
  656. return members
  657. def build_team_application_service(
  658. *,
  659. team_repository: TeamDefinitionRepository,
  660. team_config_repository: TeamConfigRepository,
  661. team_run_repository: TeamRunRepository,
  662. settings: TeamServiceSettings) -> TeamApplicationService:
  663. redis_client = try_build_redis_client(settings.redis_url)
  664. return TeamApplicationService(
  665. team_repository=team_repository,
  666. team_config_repository=team_config_repository,
  667. team_run_repository=team_run_repository,
  668. agent_client=AgentServiceClient(
  669. base_url=settings.agent_service_url,
  670. timeout_seconds=settings.agent_service_timeout_seconds),
  671. event_client=EventServiceClient(
  672. base_url=settings.event_service_url,
  673. timeout_seconds=settings.event_service_timeout_seconds),
  674. task_queue_publisher=(
  675. TaskQueuePublisher(client=redis_client) if redis_client is not None else None
  676. ))