services.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500
import logging
from datetime import datetime, timedelta

from core_domain import AgentRunContract, TeamMemberContract
from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
from core_shared import JSONValue

from app.bootstrap.settings import TeamServiceSettings
from app.db.models import TeamDefinition, TeamRun, TeamVersion
from app.domain.repositories import (
    TeamDefinitionRepository,
    TeamRunRepository,
    TeamVersionRepository,
)
from app.infrastructure.agent_client import AgentServiceClient, AgentServiceClientError
from app.schemas.team import (
    TeamCreateRequest,
    TeamRunCreateRequest,
    TeamRunExecuteRequest,
    TeamRunStatusUpdateRequest,
    TeamStatusUpdateRequest,
    TeamVersionCreateRequest,
)
  21. class TeamApplicationService:
  22. def __init__(
  23. self,
  24. *,
  25. team_repository: TeamDefinitionRepository,
  26. team_version_repository: TeamVersionRepository,
  27. team_run_repository: TeamRunRepository,
  28. agent_client: AgentServiceClient | None = None,
  29. event_client: EventServiceClient | None = None,
  30. ) -> None:
  31. self.team_repository = team_repository
  32. self.team_version_repository = team_version_repository
  33. self.team_run_repository = team_run_repository
  34. self.agent_client = agent_client
  35. self.event_client = event_client
  36. def create_team(self, payload: TeamCreateRequest) -> TeamDefinition:
  37. return self.team_repository.create(
  38. tenant_id=payload.tenant_id,
  39. code=payload.code,
  40. name=payload.name,
  41. description=payload.description,
  42. team_type=payload.team_type,
  43. owner_user_id=payload.owner_user_id,
  44. metadata_json=payload.metadata_json,
  45. )
  46. def list_teams(self, *, tenant_id: str) -> list[TeamDefinition]:
  47. return self.team_repository.list_by_tenant(tenant_id=tenant_id)
  48. def update_team_status(
  49. self,
  50. *,
  51. team_id: str,
  52. payload: TeamStatusUpdateRequest,
  53. ) -> TeamDefinition | None:
  54. return self.team_repository.update_status(
  55. tenant_id=payload.tenant_id,
  56. team_id=team_id,
  57. status=payload.status,
  58. )
  59. def create_team_version(self, payload: TeamVersionCreateRequest) -> TeamVersion:
  60. team = self.team_repository.get_by_id(tenant_id=payload.tenant_id, team_id=payload.team_id)
  61. if team is None:
  62. raise ValueError(f"team not found: {payload.team_id}")
  63. if not payload.member_refs:
  64. raise ValueError("team version requires at least one member")
  65. return self.team_version_repository.create(
  66. tenant_id=payload.tenant_id,
  67. team_id=payload.team_id,
  68. status=payload.status,
  69. coordination_mode=payload.coordination_mode,
  70. objective=payload.objective,
  71. member_refs_json=[item.model_dump(mode="json") for item in payload.member_refs],
  72. policy_json=payload.policy_json,
  73. )
  74. def list_team_versions(self, *, tenant_id: str, team_id: str) -> list[TeamVersion]:
  75. return self.team_version_repository.list_by_team(tenant_id=tenant_id, team_id=team_id)
  76. def create_team_run(self, payload: TeamRunCreateRequest) -> TeamRun:
  77. team_version = self._resolve_team_version(
  78. tenant_id=payload.tenant_id,
  79. team_id=payload.team_id,
  80. team_version_id=payload.team_version_id,
  81. )
  82. if team_version is None:
  83. raise ValueError("published team version not found")
  84. team_run = self.team_run_repository.create(
  85. tenant_id=payload.tenant_id,
  86. team_id=payload.team_id,
  87. team_version_id=team_version.id,
  88. session_id=payload.session_id,
  89. input_text=payload.input_text,
  90. input_json=payload.input_json,
  91. )
  92. self._publish_event(
  93. event_type="team.run.created",
  94. team_run=team_run,
  95. payload_json={"team_run_id": team_run.id, "status": team_run.status},
  96. )
  97. return team_run
  98. def list_team_runs(
  99. self,
  100. *,
  101. tenant_id: str,
  102. team_id: str | None = None,
  103. session_id: str | None = None,
  104. ) -> list[TeamRun]:
  105. return self.team_run_repository.list_by_scope(
  106. tenant_id=tenant_id,
  107. team_id=team_id,
  108. session_id=session_id,
  109. )
  110. def update_team_run_status(
  111. self,
  112. *,
  113. team_run_id: str,
  114. payload: TeamRunStatusUpdateRequest,
  115. ) -> TeamRun | None:
  116. entity = self.team_run_repository.get_by_id(
  117. tenant_id=payload.tenant_id,
  118. team_run_id=team_run_id,
  119. )
  120. if entity is None:
  121. return None
  122. return self.team_run_repository.update_status(
  123. team_run_id=team_run_id,
  124. status=payload.status,
  125. worker_key=payload.worker_key,
  126. output_text=payload.output_text,
  127. output_json=payload.output_json,
  128. error_code=payload.error_code,
  129. error_message=payload.error_message,
  130. )
  131. def execute_team_run(
  132. self,
  133. *,
  134. team_run_id: str,
  135. payload: TeamRunExecuteRequest,
  136. ) -> TeamRun | None:
  137. team_run = self.team_run_repository.get_by_id(
  138. tenant_id=payload.tenant_id,
  139. team_run_id=team_run_id,
  140. )
  141. if team_run is None:
  142. return None
  143. team_version = self.team_version_repository.get_by_id(
  144. tenant_id=payload.tenant_id,
  145. team_version_id=team_run.team_version_id,
  146. )
  147. if team_version is None:
  148. failed_run = self.team_run_repository.update_status(
  149. team_run_id=team_run.id,
  150. status="failed",
  151. worker_key=payload.worker_key,
  152. error_code="team_version_missing",
  153. error_message=f"team version not found: {team_run.team_version_id}",
  154. )
  155. running_run = self.team_run_repository.update_status(
  156. team_run_id=team_run.id,
  157. status="running",
  158. worker_key=payload.worker_key,
  159. )
  160. if running_run is None:
  161. return None
  162. members = self._read_team_members(team_version)
  163. if not members:
  164. return self.team_run_repository.update_status(
  165. team_run_id=team_run.id,
  166. status="failed",
  167. worker_key=payload.worker_key,
  168. error_code="team_members_missing",
  169. error_message="team version has no valid members",
  170. )
  171. try:
  172. member_results = self._execute_members(
  173. team_run=team_run,
  174. team_version=team_version,
  175. members=members,
  176. worker_key=payload.worker_key,
  177. dry_run=payload.dry_run,
  178. )
  179. except AgentServiceClientError as exc:
  180. return self.team_run_repository.update_status(
  181. team_run_id=team_run.id,
  182. status="failed",
  183. worker_key=payload.worker_key,
  184. error_code="agent_service_error",
  185. error_message=str(exc),
  186. )
  187. failed_results = [item for item in member_results if item.status != "completed"]
  188. output_text = self._build_team_output_text(
  189. team_version=team_version,
  190. member_results=member_results,
  191. )
  192. output_json: dict[str, JSONValue] = {
  193. "dry_run": payload.dry_run,
  194. "coordination_mode": team_version.coordination_mode,
  195. "team_version_id": team_version.id,
  196. "member_run_count": len(member_results),
  197. "member_results": [
  198. self._member_result_to_json(item) for item in member_results
  199. ],
  200. }
  201. if failed_results:
  202. return self.team_run_repository.update_status(
  203. team_run_id=team_run.id,
  204. status="failed",
  205. worker_key=payload.worker_key,
  206. output_text=output_text,
  207. output_json=output_json,
  208. error_code="member_run_failed",
  209. error_message=f"{len(failed_results)} member run(s) failed",
  210. )
  211. if failed_run is not None:
  212. self._publish_event(
  213. event_type="team.run.failed",
  214. team_run=failed_run,
  215. payload_json={
  216. "team_run_id": failed_run.id,
  217. "status": failed_run.status,
  218. "failed_member_count": len(failed_results),
  219. },
  220. )
  221. return failed_run
  222. completed_run = self.team_run_repository.update_status(
  223. team_run_id=team_run.id,
  224. status="completed",
  225. worker_key=payload.worker_key,
  226. output_text=output_text,
  227. output_json=output_json,
  228. )
  229. if completed_run is not None:
  230. self._publish_event(
  231. event_type="team.run.completed",
  232. team_run=completed_run,
  233. payload_json={
  234. "team_run_id": completed_run.id,
  235. "status": completed_run.status,
  236. "member_run_count": len(member_results),
  237. },
  238. )
  239. return completed_run
  240. def execute_next_claimed_team_run(
  241. self,
  242. *,
  243. worker_key: str,
  244. lease_seconds: int,
  245. dry_run: bool,
  246. ) -> tuple[TeamRun, int] | None:
  247. released_lease_count = self.team_run_repository.release_expired_leases(
  248. now_time=datetime.utcnow(),
  249. )
  250. claimed_team_run = self.team_run_repository.claim_next_queued(
  251. worker_key=worker_key,
  252. lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
  253. )
  254. if claimed_team_run is None:
  255. return None
  256. result = self.execute_team_run(
  257. team_run_id=claimed_team_run.id,
  258. payload=TeamRunExecuteRequest(
  259. tenant_id=claimed_team_run.tenant_id,
  260. worker_key=worker_key,
  261. dry_run=dry_run,
  262. ),
  263. )
  264. if result is None:
  265. return None
  266. return result, released_lease_count
  267. def _resolve_team_version(
  268. self,
  269. *,
  270. tenant_id: str,
  271. team_id: str,
  272. team_version_id: str | None,
  273. ) -> TeamVersion | None:
  274. if team_version_id is not None:
  275. return self.team_version_repository.get_by_id(
  276. tenant_id=tenant_id,
  277. team_version_id=team_version_id,
  278. )
  279. return self.team_version_repository.get_latest_published(
  280. tenant_id=tenant_id,
  281. team_id=team_id,
  282. )
  283. def _execute_members(
  284. self,
  285. *,
  286. team_run: TeamRun,
  287. team_version: TeamVersion,
  288. members: list[TeamMemberContract],
  289. worker_key: str | None,
  290. dry_run: bool,
  291. ) -> list[AgentRunContract]:
  292. if self.agent_client is None:
  293. raise AgentServiceClientError("agent service client is not configured")
  294. member_results: list[AgentRunContract] = []
  295. prior_outputs: list[dict[str, JSONValue]] = []
  296. for member in self._order_members(members):
  297. member_input_json = self._build_member_input_json(
  298. team_run=team_run,
  299. team_version=team_version,
  300. member=member,
  301. prior_outputs=prior_outputs,
  302. )
  303. created_run = self.agent_client.create_agent_run(
  304. tenant_id=team_run.tenant_id,
  305. agent_id=member.agent_id,
  306. agent_version_id=member.agent_version_id,
  307. session_id=team_run.session_id,
  308. input_text=self._build_member_input_text(
  309. team_run=team_run,
  310. team_version=team_version,
  311. member=member,
  312. ),
  313. input_json=member_input_json,
  314. )
  315. executed_run = self.agent_client.execute_agent_run(
  316. tenant_id=team_run.tenant_id,
  317. agent_run_id=created_run.id,
  318. worker_key=worker_key,
  319. dry_run=dry_run,
  320. )
  321. member_results.append(executed_run)
  322. prior_outputs.append(
  323. {
  324. "member_key": member.member_key,
  325. "role": member.role,
  326. "agent_run_id": executed_run.id,
  327. "status": executed_run.status,
  328. "output_text": executed_run.output_text,
  329. "output_json": executed_run.output_json or {},
  330. }
  331. )
  332. return member_results
  333. def _read_team_members(self, team_version: TeamVersion) -> list[TeamMemberContract]:
  334. members: list[TeamMemberContract] = []
  335. for item in team_version.member_refs_json:
  336. try:
  337. members.append(TeamMemberContract.model_validate(item))
  338. except ValueError:
  339. continue
  340. return members
  341. def _order_members(self, members: list[TeamMemberContract]) -> list[TeamMemberContract]:
  342. role_priority = {
  343. "planner": 0,
  344. "supervisor": 1,
  345. "specialist": 2,
  346. "executor": 3,
  347. "reviewer": 4,
  348. }
  349. return sorted(members, key=lambda item: role_priority.get(item.role, 10))
  350. def _build_member_input_text(
  351. self,
  352. *,
  353. team_run: TeamRun,
  354. team_version: TeamVersion,
  355. member: TeamMemberContract,
  356. ) -> str:
  357. lines = [
  358. f"Team objective: {team_version.objective or 'No objective provided.'}",
  359. f"Member role: {member.role}",
  360. ]
  361. if member.responsibility:
  362. lines.append(f"Responsibility: {member.responsibility}")
  363. if team_run.input_text:
  364. lines.append(f"User task: {team_run.input_text}")
  365. return "\n".join(lines)
  366. def _build_member_input_json(
  367. self,
  368. *,
  369. team_run: TeamRun,
  370. team_version: TeamVersion,
  371. member: TeamMemberContract,
  372. prior_outputs: list[dict[str, JSONValue]],
  373. ) -> dict[str, JSONValue]:
  374. input_json: dict[str, JSONValue] = dict(team_run.input_json or {})
  375. input_json.update(
  376. {
  377. "team_id": team_run.team_id,
  378. "team_run_id": team_run.id,
  379. "team_version_id": team_version.id,
  380. "team_objective": team_version.objective,
  381. "member_key": member.member_key,
  382. "member_role": member.role,
  383. "member_responsibility": member.responsibility,
  384. "prior_member_outputs": prior_outputs,
  385. }
  386. )
  387. configured_input = member.config_json.get("input_json")
  388. if isinstance(configured_input, dict):
  389. input_json.update(
  390. {str(item_key): item_value for item_key, item_value in configured_input.items()}
  391. )
  392. return input_json
  393. def _build_team_output_text(
  394. self,
  395. *,
  396. team_version: TeamVersion,
  397. member_results: list[AgentRunContract],
  398. ) -> str:
  399. lines = [
  400. f"Team objective: {team_version.objective or 'No objective provided.'}",
  401. f"Coordination mode: {team_version.coordination_mode}",
  402. "Member results:",
  403. ]
  404. for index, result in enumerate(member_results, start=1):
  405. output_text = result.output_text or result.error_message or ""
  406. lines.append(f"{index}. agent={result.agent_id} status={result.status}: {output_text}")
  407. return "\n".join(lines)
  408. def _member_result_to_json(self, result: AgentRunContract) -> dict[str, JSONValue]:
  409. return {
  410. "agent_run_id": result.id,
  411. "agent_id": result.agent_id,
  412. "agent_version_id": result.agent_version_id,
  413. "status": result.status,
  414. "output_text": result.output_text,
  415. "output_json": result.output_json or {},
  416. "error_code": result.error_code,
  417. "error_message": result.error_message,
  418. }
  419. def _publish_event(
  420. self,
  421. *,
  422. event_type: str,
  423. team_run: TeamRun,
  424. payload_json: dict[str, JSONValue],
  425. ) -> None:
  426. if self.event_client is None:
  427. return
  428. try:
  429. self.event_client.publish_event(
  430. EventPublishContract(
  431. tenant_id=team_run.tenant_id,
  432. event_type=event_type,
  433. source_service="team-service",
  434. aggregate_type="team_run",
  435. aggregate_id=team_run.id,
  436. correlation_id=team_run.session_id,
  437. payload_json={
  438. **payload_json,
  439. "team_id": team_run.team_id,
  440. "team_version_id": team_run.team_version_id,
  441. },
  442. )
  443. )
  444. except EventServiceClientError:
  445. return
  446. def build_team_application_service(
  447. *,
  448. team_repository: TeamDefinitionRepository,
  449. team_version_repository: TeamVersionRepository,
  450. team_run_repository: TeamRunRepository,
  451. settings: TeamServiceSettings,
  452. ) -> TeamApplicationService:
  453. return TeamApplicationService(
  454. team_repository=team_repository,
  455. team_version_repository=team_version_repository,
  456. team_run_repository=team_run_repository,
  457. agent_client=AgentServiceClient(
  458. base_url=settings.agent_service_url,
  459. timeout_seconds=settings.agent_service_timeout_seconds,
  460. ),
  461. event_client=EventServiceClient(
  462. base_url=settings.event_service_url,
  463. timeout_seconds=settings.event_service_timeout_seconds,
  464. ),
  465. )