services.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. from datetime import datetime, timedelta
  2. from core_domain import AgentRunContract, TeamMemberContract
  3. from core_shared import JSONValue
  4. from app.bootstrap.settings import TeamServiceSettings
  5. from app.db.models import TeamDefinition, TeamRun, TeamVersion
  6. from app.domain.repositories import (
  7. TeamDefinitionRepository,
  8. TeamRunRepository,
  9. TeamVersionRepository,
  10. )
  11. from app.infrastructure.agent_client import AgentServiceClient, AgentServiceClientError
  12. from app.schemas.team import (
  13. TeamCreateRequest,
  14. TeamRunCreateRequest,
  15. TeamRunExecuteRequest,
  16. TeamRunStatusUpdateRequest,
  17. TeamStatusUpdateRequest,
  18. TeamVersionCreateRequest,
  19. )
  20. class TeamApplicationService:
  21. def __init__(
  22. self,
  23. *,
  24. team_repository: TeamDefinitionRepository,
  25. team_version_repository: TeamVersionRepository,
  26. team_run_repository: TeamRunRepository,
  27. agent_client: AgentServiceClient | None = None,
  28. ) -> None:
  29. self.team_repository = team_repository
  30. self.team_version_repository = team_version_repository
  31. self.team_run_repository = team_run_repository
  32. self.agent_client = agent_client
  33. def create_team(self, payload: TeamCreateRequest) -> TeamDefinition:
  34. return self.team_repository.create(
  35. tenant_id=payload.tenant_id,
  36. code=payload.code,
  37. name=payload.name,
  38. description=payload.description,
  39. team_type=payload.team_type,
  40. owner_user_id=payload.owner_user_id,
  41. metadata_json=payload.metadata_json,
  42. )
  43. def list_teams(self, *, tenant_id: str) -> list[TeamDefinition]:
  44. return self.team_repository.list_by_tenant(tenant_id=tenant_id)
  45. def update_team_status(
  46. self,
  47. *,
  48. team_id: str,
  49. payload: TeamStatusUpdateRequest,
  50. ) -> TeamDefinition | None:
  51. return self.team_repository.update_status(
  52. tenant_id=payload.tenant_id,
  53. team_id=team_id,
  54. status=payload.status,
  55. )
  56. def create_team_version(self, payload: TeamVersionCreateRequest) -> TeamVersion:
  57. team = self.team_repository.get_by_id(tenant_id=payload.tenant_id, team_id=payload.team_id)
  58. if team is None:
  59. raise ValueError(f"team not found: {payload.team_id}")
  60. if not payload.member_refs:
  61. raise ValueError("team version requires at least one member")
  62. return self.team_version_repository.create(
  63. tenant_id=payload.tenant_id,
  64. team_id=payload.team_id,
  65. status=payload.status,
  66. coordination_mode=payload.coordination_mode,
  67. objective=payload.objective,
  68. member_refs_json=[item.model_dump(mode="json") for item in payload.member_refs],
  69. policy_json=payload.policy_json,
  70. )
  71. def list_team_versions(self, *, tenant_id: str, team_id: str) -> list[TeamVersion]:
  72. return self.team_version_repository.list_by_team(tenant_id=tenant_id, team_id=team_id)
  73. def create_team_run(self, payload: TeamRunCreateRequest) -> TeamRun:
  74. team_version = self._resolve_team_version(
  75. tenant_id=payload.tenant_id,
  76. team_id=payload.team_id,
  77. team_version_id=payload.team_version_id,
  78. )
  79. if team_version is None:
  80. raise ValueError("published team version not found")
  81. return self.team_run_repository.create(
  82. tenant_id=payload.tenant_id,
  83. team_id=payload.team_id,
  84. team_version_id=team_version.id,
  85. session_id=payload.session_id,
  86. input_text=payload.input_text,
  87. input_json=payload.input_json,
  88. )
  89. def list_team_runs(
  90. self,
  91. *,
  92. tenant_id: str,
  93. team_id: str | None = None,
  94. session_id: str | None = None,
  95. ) -> list[TeamRun]:
  96. return self.team_run_repository.list_by_scope(
  97. tenant_id=tenant_id,
  98. team_id=team_id,
  99. session_id=session_id,
  100. )
  101. def update_team_run_status(
  102. self,
  103. *,
  104. team_run_id: str,
  105. payload: TeamRunStatusUpdateRequest,
  106. ) -> TeamRun | None:
  107. entity = self.team_run_repository.get_by_id(
  108. tenant_id=payload.tenant_id,
  109. team_run_id=team_run_id,
  110. )
  111. if entity is None:
  112. return None
  113. return self.team_run_repository.update_status(
  114. team_run_id=team_run_id,
  115. status=payload.status,
  116. worker_key=payload.worker_key,
  117. output_text=payload.output_text,
  118. output_json=payload.output_json,
  119. error_code=payload.error_code,
  120. error_message=payload.error_message,
  121. )
  122. def execute_team_run(
  123. self,
  124. *,
  125. team_run_id: str,
  126. payload: TeamRunExecuteRequest,
  127. ) -> TeamRun | None:
  128. team_run = self.team_run_repository.get_by_id(
  129. tenant_id=payload.tenant_id,
  130. team_run_id=team_run_id,
  131. )
  132. if team_run is None:
  133. return None
  134. team_version = self.team_version_repository.get_by_id(
  135. tenant_id=payload.tenant_id,
  136. team_version_id=team_run.team_version_id,
  137. )
  138. if team_version is None:
  139. return self.team_run_repository.update_status(
  140. team_run_id=team_run.id,
  141. status="failed",
  142. worker_key=payload.worker_key,
  143. error_code="team_version_missing",
  144. error_message=f"team version not found: {team_run.team_version_id}",
  145. )
  146. running_run = self.team_run_repository.update_status(
  147. team_run_id=team_run.id,
  148. status="running",
  149. worker_key=payload.worker_key,
  150. )
  151. if running_run is None:
  152. return None
  153. members = self._read_team_members(team_version)
  154. if not members:
  155. return self.team_run_repository.update_status(
  156. team_run_id=team_run.id,
  157. status="failed",
  158. worker_key=payload.worker_key,
  159. error_code="team_members_missing",
  160. error_message="team version has no valid members",
  161. )
  162. try:
  163. member_results = self._execute_members(
  164. team_run=team_run,
  165. team_version=team_version,
  166. members=members,
  167. worker_key=payload.worker_key,
  168. dry_run=payload.dry_run,
  169. )
  170. except AgentServiceClientError as exc:
  171. return self.team_run_repository.update_status(
  172. team_run_id=team_run.id,
  173. status="failed",
  174. worker_key=payload.worker_key,
  175. error_code="agent_service_error",
  176. error_message=str(exc),
  177. )
  178. failed_results = [item for item in member_results if item.status != "completed"]
  179. output_text = self._build_team_output_text(
  180. team_version=team_version,
  181. member_results=member_results,
  182. )
  183. output_json: dict[str, JSONValue] = {
  184. "dry_run": payload.dry_run,
  185. "coordination_mode": team_version.coordination_mode,
  186. "team_version_id": team_version.id,
  187. "member_run_count": len(member_results),
  188. "member_results": [
  189. self._member_result_to_json(item) for item in member_results
  190. ],
  191. }
  192. if failed_results:
  193. return self.team_run_repository.update_status(
  194. team_run_id=team_run.id,
  195. status="failed",
  196. worker_key=payload.worker_key,
  197. output_text=output_text,
  198. output_json=output_json,
  199. error_code="member_run_failed",
  200. error_message=f"{len(failed_results)} member run(s) failed",
  201. )
  202. return self.team_run_repository.update_status(
  203. team_run_id=team_run.id,
  204. status="completed",
  205. worker_key=payload.worker_key,
  206. output_text=output_text,
  207. output_json=output_json,
  208. )
  209. def execute_next_claimed_team_run(
  210. self,
  211. *,
  212. worker_key: str,
  213. lease_seconds: int,
  214. dry_run: bool,
  215. ) -> tuple[TeamRun, int] | None:
  216. released_lease_count = self.team_run_repository.release_expired_leases(
  217. now_time=datetime.utcnow(),
  218. )
  219. claimed_team_run = self.team_run_repository.claim_next_queued(
  220. worker_key=worker_key,
  221. lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
  222. )
  223. if claimed_team_run is None:
  224. return None
  225. result = self.execute_team_run(
  226. team_run_id=claimed_team_run.id,
  227. payload=TeamRunExecuteRequest(
  228. tenant_id=claimed_team_run.tenant_id,
  229. worker_key=worker_key,
  230. dry_run=dry_run,
  231. ),
  232. )
  233. if result is None:
  234. return None
  235. return result, released_lease_count
  236. def _resolve_team_version(
  237. self,
  238. *,
  239. tenant_id: str,
  240. team_id: str,
  241. team_version_id: str | None,
  242. ) -> TeamVersion | None:
  243. if team_version_id is not None:
  244. return self.team_version_repository.get_by_id(
  245. tenant_id=tenant_id,
  246. team_version_id=team_version_id,
  247. )
  248. return self.team_version_repository.get_latest_published(
  249. tenant_id=tenant_id,
  250. team_id=team_id,
  251. )
  252. def _execute_members(
  253. self,
  254. *,
  255. team_run: TeamRun,
  256. team_version: TeamVersion,
  257. members: list[TeamMemberContract],
  258. worker_key: str | None,
  259. dry_run: bool,
  260. ) -> list[AgentRunContract]:
  261. if self.agent_client is None:
  262. raise AgentServiceClientError("agent service client is not configured")
  263. member_results: list[AgentRunContract] = []
  264. prior_outputs: list[dict[str, JSONValue]] = []
  265. for member in self._order_members(members):
  266. member_input_json = self._build_member_input_json(
  267. team_run=team_run,
  268. team_version=team_version,
  269. member=member,
  270. prior_outputs=prior_outputs,
  271. )
  272. created_run = self.agent_client.create_agent_run(
  273. tenant_id=team_run.tenant_id,
  274. agent_id=member.agent_id,
  275. agent_version_id=member.agent_version_id,
  276. session_id=team_run.session_id,
  277. input_text=self._build_member_input_text(
  278. team_run=team_run,
  279. team_version=team_version,
  280. member=member,
  281. ),
  282. input_json=member_input_json,
  283. )
  284. executed_run = self.agent_client.execute_agent_run(
  285. tenant_id=team_run.tenant_id,
  286. agent_run_id=created_run.id,
  287. worker_key=worker_key,
  288. dry_run=dry_run,
  289. )
  290. member_results.append(executed_run)
  291. prior_outputs.append(
  292. {
  293. "member_key": member.member_key,
  294. "role": member.role,
  295. "agent_run_id": executed_run.id,
  296. "status": executed_run.status,
  297. "output_text": executed_run.output_text,
  298. "output_json": executed_run.output_json or {},
  299. }
  300. )
  301. return member_results
  302. def _read_team_members(self, team_version: TeamVersion) -> list[TeamMemberContract]:
  303. members: list[TeamMemberContract] = []
  304. for item in team_version.member_refs_json:
  305. try:
  306. members.append(TeamMemberContract.model_validate(item))
  307. except ValueError:
  308. continue
  309. return members
  310. def _order_members(self, members: list[TeamMemberContract]) -> list[TeamMemberContract]:
  311. role_priority = {
  312. "planner": 0,
  313. "supervisor": 1,
  314. "specialist": 2,
  315. "executor": 3,
  316. "reviewer": 4,
  317. }
  318. return sorted(members, key=lambda item: role_priority.get(item.role, 10))
  319. def _build_member_input_text(
  320. self,
  321. *,
  322. team_run: TeamRun,
  323. team_version: TeamVersion,
  324. member: TeamMemberContract,
  325. ) -> str:
  326. lines = [
  327. f"Team objective: {team_version.objective or 'No objective provided.'}",
  328. f"Member role: {member.role}",
  329. ]
  330. if member.responsibility:
  331. lines.append(f"Responsibility: {member.responsibility}")
  332. if team_run.input_text:
  333. lines.append(f"User task: {team_run.input_text}")
  334. return "\n".join(lines)
  335. def _build_member_input_json(
  336. self,
  337. *,
  338. team_run: TeamRun,
  339. team_version: TeamVersion,
  340. member: TeamMemberContract,
  341. prior_outputs: list[dict[str, JSONValue]],
  342. ) -> dict[str, JSONValue]:
  343. input_json: dict[str, JSONValue] = dict(team_run.input_json or {})
  344. input_json.update(
  345. {
  346. "team_id": team_run.team_id,
  347. "team_run_id": team_run.id,
  348. "team_version_id": team_version.id,
  349. "team_objective": team_version.objective,
  350. "member_key": member.member_key,
  351. "member_role": member.role,
  352. "member_responsibility": member.responsibility,
  353. "prior_member_outputs": prior_outputs,
  354. }
  355. )
  356. configured_input = member.config_json.get("input_json")
  357. if isinstance(configured_input, dict):
  358. input_json.update(
  359. {str(item_key): item_value for item_key, item_value in configured_input.items()}
  360. )
  361. return input_json
  362. def _build_team_output_text(
  363. self,
  364. *,
  365. team_version: TeamVersion,
  366. member_results: list[AgentRunContract],
  367. ) -> str:
  368. lines = [
  369. f"Team objective: {team_version.objective or 'No objective provided.'}",
  370. f"Coordination mode: {team_version.coordination_mode}",
  371. "Member results:",
  372. ]
  373. for index, result in enumerate(member_results, start=1):
  374. output_text = result.output_text or result.error_message or ""
  375. lines.append(f"{index}. agent={result.agent_id} status={result.status}: {output_text}")
  376. return "\n".join(lines)
  377. def _member_result_to_json(self, result: AgentRunContract) -> dict[str, JSONValue]:
  378. return {
  379. "agent_run_id": result.id,
  380. "agent_id": result.agent_id,
  381. "agent_version_id": result.agent_version_id,
  382. "status": result.status,
  383. "output_text": result.output_text,
  384. "output_json": result.output_json or {},
  385. "error_code": result.error_code,
  386. "error_message": result.error_message,
  387. }
  388. def build_team_application_service(
  389. *,
  390. team_repository: TeamDefinitionRepository,
  391. team_version_repository: TeamVersionRepository,
  392. team_run_repository: TeamRunRepository,
  393. settings: TeamServiceSettings,
  394. ) -> TeamApplicationService:
  395. return TeamApplicationService(
  396. team_repository=team_repository,
  397. team_version_repository=team_version_repository,
  398. team_run_repository=team_run_repository,
  399. agent_client=AgentServiceClient(
  400. base_url=settings.agent_service_url,
  401. timeout_seconds=settings.agent_service_timeout_seconds,
  402. ),
  403. )