services.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512
  1. from datetime import datetime, timedelta
  2. from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
  3. from core_domain import AgentRunContract, TeamMemberContract
  4. from core_shared import JSONValue, try_build_redis_client
  5. from core_shared.task_queue import TaskQueuePublisher
  6. from app.bootstrap.settings import TeamServiceSettings
  7. from app.db.models import TeamDefinition, TeamRun, TeamVersion
  8. from app.domain.repositories import (
  9. TeamDefinitionRepository,
  10. TeamRunRepository,
  11. TeamVersionRepository,
  12. )
  13. from app.infrastructure.agent_client import AgentServiceClient, AgentServiceClientError
  14. from app.schemas.team import (
  15. TeamCreateRequest,
  16. TeamRunCreateRequest,
  17. TeamRunExecuteRequest,
  18. TeamRunStatusUpdateRequest,
  19. TeamStatusUpdateRequest,
  20. TeamVersionCreateRequest,
  21. )
  22. class TeamApplicationService:
  23. def __init__(
  24. self,
  25. *,
  26. team_repository: TeamDefinitionRepository,
  27. team_version_repository: TeamVersionRepository,
  28. team_run_repository: TeamRunRepository,
  29. agent_client: AgentServiceClient | None = None,
  30. event_client: EventServiceClient | None = None,
  31. task_queue_publisher: TaskQueuePublisher | None = None,
  32. ) -> None:
  33. self.team_repository = team_repository
  34. self.team_version_repository = team_version_repository
  35. self.team_run_repository = team_run_repository
  36. self.agent_client = agent_client
  37. self.event_client = event_client
  38. self.task_queue_publisher = task_queue_publisher
  39. def create_team(self, payload: TeamCreateRequest) -> TeamDefinition:
  40. return self.team_repository.create(
  41. tenant_id=payload.tenant_id,
  42. code=payload.code,
  43. name=payload.name,
  44. description=payload.description,
  45. team_type=payload.team_type,
  46. owner_user_id=payload.owner_user_id,
  47. metadata_json=payload.metadata_json,
  48. )
  49. def list_teams(self, *, tenant_id: str) -> list[TeamDefinition]:
  50. return self.team_repository.list_by_tenant(tenant_id=tenant_id)
  51. def update_team_status(
  52. self,
  53. *,
  54. team_id: str,
  55. payload: TeamStatusUpdateRequest,
  56. ) -> TeamDefinition | None:
  57. return self.team_repository.update_status(
  58. tenant_id=payload.tenant_id,
  59. team_id=team_id,
  60. status=payload.status,
  61. )
  62. def create_team_version(self, payload: TeamVersionCreateRequest) -> TeamVersion:
  63. team = self.team_repository.get_by_id(tenant_id=payload.tenant_id, team_id=payload.team_id)
  64. if team is None:
  65. raise ValueError(f"team not found: {payload.team_id}")
  66. if not payload.member_refs:
  67. raise ValueError("team version requires at least one member")
  68. return self.team_version_repository.create(
  69. tenant_id=payload.tenant_id,
  70. team_id=payload.team_id,
  71. status=payload.status,
  72. coordination_mode=payload.coordination_mode,
  73. objective=payload.objective,
  74. member_refs_json=[item.model_dump(mode="json") for item in payload.member_refs],
  75. policy_json=payload.policy_json,
  76. )
  77. def list_team_versions(self, *, tenant_id: str, team_id: str) -> list[TeamVersion]:
  78. return self.team_version_repository.list_by_team(tenant_id=tenant_id, team_id=team_id)
  79. def create_team_run(self, payload: TeamRunCreateRequest) -> TeamRun:
  80. team_version = self._resolve_team_version(
  81. tenant_id=payload.tenant_id,
  82. team_id=payload.team_id,
  83. team_version_id=payload.team_version_id,
  84. )
  85. if team_version is None:
  86. raise ValueError("published team version not found")
  87. team_run = self.team_run_repository.create(
  88. tenant_id=payload.tenant_id,
  89. team_id=payload.team_id,
  90. team_version_id=team_version.id,
  91. session_id=payload.session_id,
  92. input_text=payload.input_text,
  93. input_json=payload.input_json,
  94. )
  95. self._publish_event(
  96. event_type="team.run.created",
  97. team_run=team_run,
  98. payload_json={"team_run_id": team_run.id, "status": team_run.status},
  99. )
  100. if self.task_queue_publisher is not None:
  101. self.task_queue_publisher.publish_team_run(
  102. tenant_id=team_run.tenant_id,
  103. team_run_id=team_run.id,
  104. )
  105. return team_run
  106. def list_team_runs(
  107. self,
  108. *,
  109. tenant_id: str,
  110. team_id: str | None = None,
  111. session_id: str | None = None,
  112. ) -> list[TeamRun]:
  113. return self.team_run_repository.list_by_scope(
  114. tenant_id=tenant_id,
  115. team_id=team_id,
  116. session_id=session_id,
  117. )
  118. def update_team_run_status(
  119. self,
  120. *,
  121. team_run_id: str,
  122. payload: TeamRunStatusUpdateRequest,
  123. ) -> TeamRun | None:
  124. entity = self.team_run_repository.get_by_id(
  125. tenant_id=payload.tenant_id,
  126. team_run_id=team_run_id,
  127. )
  128. if entity is None:
  129. return None
  130. return self.team_run_repository.update_status(
  131. team_run_id=team_run_id,
  132. status=payload.status,
  133. worker_key=payload.worker_key,
  134. output_text=payload.output_text,
  135. output_json=payload.output_json,
  136. error_code=payload.error_code,
  137. error_message=payload.error_message,
  138. )
  139. def execute_team_run(
  140. self,
  141. *,
  142. team_run_id: str,
  143. payload: TeamRunExecuteRequest,
  144. ) -> TeamRun | None:
  145. team_run = self.team_run_repository.get_by_id(
  146. tenant_id=payload.tenant_id,
  147. team_run_id=team_run_id,
  148. )
  149. if team_run is None:
  150. return None
  151. team_version = self.team_version_repository.get_by_id(
  152. tenant_id=payload.tenant_id,
  153. team_version_id=team_run.team_version_id,
  154. )
  155. if team_version is None:
  156. failed_run = self.team_run_repository.update_status(
  157. team_run_id=team_run.id,
  158. status="failed",
  159. worker_key=payload.worker_key,
  160. error_code="team_version_missing",
  161. error_message=f"team version not found: {team_run.team_version_id}",
  162. )
  163. running_run = self.team_run_repository.update_status(
  164. team_run_id=team_run.id,
  165. status="running",
  166. worker_key=payload.worker_key,
  167. )
  168. if running_run is None:
  169. return None
  170. members = self._read_team_members(team_version)
  171. if not members:
  172. return self.team_run_repository.update_status(
  173. team_run_id=team_run.id,
  174. status="failed",
  175. worker_key=payload.worker_key,
  176. error_code="team_members_missing",
  177. error_message="team version has no valid members",
  178. )
  179. try:
  180. member_results = self._execute_members(
  181. team_run=team_run,
  182. team_version=team_version,
  183. members=members,
  184. worker_key=payload.worker_key,
  185. dry_run=payload.dry_run,
  186. )
  187. except AgentServiceClientError as exc:
  188. return self.team_run_repository.update_status(
  189. team_run_id=team_run.id,
  190. status="failed",
  191. worker_key=payload.worker_key,
  192. error_code="agent_service_error",
  193. error_message=str(exc),
  194. )
  195. failed_results = [item for item in member_results if item.status != "completed"]
  196. output_text = self._build_team_output_text(
  197. team_version=team_version,
  198. member_results=member_results,
  199. )
  200. output_json: dict[str, JSONValue] = {
  201. "dry_run": payload.dry_run,
  202. "coordination_mode": team_version.coordination_mode,
  203. "team_version_id": team_version.id,
  204. "member_run_count": len(member_results),
  205. "member_results": [
  206. self._member_result_to_json(item) for item in member_results
  207. ],
  208. }
  209. if failed_results:
  210. return self.team_run_repository.update_status(
  211. team_run_id=team_run.id,
  212. status="failed",
  213. worker_key=payload.worker_key,
  214. output_text=output_text,
  215. output_json=output_json,
  216. error_code="member_run_failed",
  217. error_message=f"{len(failed_results)} member run(s) failed",
  218. )
  219. if failed_run is not None:
  220. self._publish_event(
  221. event_type="team.run.failed",
  222. team_run=failed_run,
  223. payload_json={
  224. "team_run_id": failed_run.id,
  225. "status": failed_run.status,
  226. "failed_member_count": len(failed_results),
  227. },
  228. )
  229. return failed_run
  230. completed_run = self.team_run_repository.update_status(
  231. team_run_id=team_run.id,
  232. status="completed",
  233. worker_key=payload.worker_key,
  234. output_text=output_text,
  235. output_json=output_json,
  236. )
  237. if completed_run is not None:
  238. self._publish_event(
  239. event_type="team.run.completed",
  240. team_run=completed_run,
  241. payload_json={
  242. "team_run_id": completed_run.id,
  243. "status": completed_run.status,
  244. "member_run_count": len(member_results),
  245. },
  246. )
  247. return completed_run
  248. def execute_next_claimed_team_run(
  249. self,
  250. *,
  251. worker_key: str,
  252. lease_seconds: int,
  253. dry_run: bool,
  254. ) -> tuple[TeamRun, int] | None:
  255. released_lease_count = self.team_run_repository.release_expired_leases(
  256. now_time=datetime.utcnow(),
  257. )
  258. claimed_team_run = self.team_run_repository.claim_next_queued(
  259. worker_key=worker_key,
  260. lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
  261. )
  262. if claimed_team_run is None:
  263. return None
  264. result = self.execute_team_run(
  265. team_run_id=claimed_team_run.id,
  266. payload=TeamRunExecuteRequest(
  267. tenant_id=claimed_team_run.tenant_id,
  268. worker_key=worker_key,
  269. dry_run=dry_run,
  270. ),
  271. )
  272. if result is None:
  273. return None
  274. return result, released_lease_count
  275. def _resolve_team_version(
  276. self,
  277. *,
  278. tenant_id: str,
  279. team_id: str,
  280. team_version_id: str | None,
  281. ) -> TeamVersion | None:
  282. if team_version_id is not None:
  283. return self.team_version_repository.get_by_id(
  284. tenant_id=tenant_id,
  285. team_version_id=team_version_id,
  286. )
  287. return self.team_version_repository.get_latest_published(
  288. tenant_id=tenant_id,
  289. team_id=team_id,
  290. )
  291. def _execute_members(
  292. self,
  293. *,
  294. team_run: TeamRun,
  295. team_version: TeamVersion,
  296. members: list[TeamMemberContract],
  297. worker_key: str | None,
  298. dry_run: bool,
  299. ) -> list[AgentRunContract]:
  300. if self.agent_client is None:
  301. raise AgentServiceClientError("agent service client is not configured")
  302. member_results: list[AgentRunContract] = []
  303. prior_outputs: list[dict[str, JSONValue]] = []
  304. for member in self._order_members(members):
  305. member_input_json = self._build_member_input_json(
  306. team_run=team_run,
  307. team_version=team_version,
  308. member=member,
  309. prior_outputs=prior_outputs,
  310. )
  311. created_run = self.agent_client.create_agent_run(
  312. tenant_id=team_run.tenant_id,
  313. agent_id=member.agent_id,
  314. agent_version_id=member.agent_version_id,
  315. session_id=team_run.session_id,
  316. input_text=self._build_member_input_text(
  317. team_run=team_run,
  318. team_version=team_version,
  319. member=member,
  320. ),
  321. input_json=member_input_json,
  322. )
  323. executed_run = self.agent_client.execute_agent_run(
  324. tenant_id=team_run.tenant_id,
  325. agent_run_id=created_run.id,
  326. worker_key=worker_key,
  327. dry_run=dry_run,
  328. )
  329. member_results.append(executed_run)
  330. prior_outputs.append(
  331. {
  332. "member_key": member.member_key,
  333. "role": member.role,
  334. "agent_run_id": executed_run.id,
  335. "status": executed_run.status,
  336. "output_text": executed_run.output_text,
  337. "output_json": executed_run.output_json or {},
  338. }
  339. )
  340. return member_results
  341. def _read_team_members(self, team_version: TeamVersion) -> list[TeamMemberContract]:
  342. members: list[TeamMemberContract] = []
  343. for item in team_version.member_refs_json:
  344. try:
  345. members.append(TeamMemberContract.model_validate(item))
  346. except ValueError:
  347. continue
  348. return members
  349. def _order_members(self, members: list[TeamMemberContract]) -> list[TeamMemberContract]:
  350. role_priority = {
  351. "planner": 0,
  352. "supervisor": 1,
  353. "specialist": 2,
  354. "executor": 3,
  355. "reviewer": 4,
  356. }
  357. return sorted(members, key=lambda item: role_priority.get(item.role, 10))
  358. def _build_member_input_text(
  359. self,
  360. *,
  361. team_run: TeamRun,
  362. team_version: TeamVersion,
  363. member: TeamMemberContract,
  364. ) -> str:
  365. lines = [
  366. f"Team objective: {team_version.objective or 'No objective provided.'}",
  367. f"Member role: {member.role}",
  368. ]
  369. if member.responsibility:
  370. lines.append(f"Responsibility: {member.responsibility}")
  371. if team_run.input_text:
  372. lines.append(f"User task: {team_run.input_text}")
  373. return "\n".join(lines)
  374. def _build_member_input_json(
  375. self,
  376. *,
  377. team_run: TeamRun,
  378. team_version: TeamVersion,
  379. member: TeamMemberContract,
  380. prior_outputs: list[dict[str, JSONValue]],
  381. ) -> dict[str, JSONValue]:
  382. input_json: dict[str, JSONValue] = dict(team_run.input_json or {})
  383. input_json.update(
  384. {
  385. "team_id": team_run.team_id,
  386. "team_run_id": team_run.id,
  387. "team_version_id": team_version.id,
  388. "team_objective": team_version.objective,
  389. "member_key": member.member_key,
  390. "member_role": member.role,
  391. "member_responsibility": member.responsibility,
  392. "prior_member_outputs": prior_outputs,
  393. }
  394. )
  395. configured_input = member.config_json.get("input_json")
  396. if isinstance(configured_input, dict):
  397. input_json.update(
  398. {str(item_key): item_value for item_key, item_value in configured_input.items()}
  399. )
  400. return input_json
  401. def _build_team_output_text(
  402. self,
  403. *,
  404. team_version: TeamVersion,
  405. member_results: list[AgentRunContract],
  406. ) -> str:
  407. lines = [
  408. f"Team objective: {team_version.objective or 'No objective provided.'}",
  409. f"Coordination mode: {team_version.coordination_mode}",
  410. "Member results:",
  411. ]
  412. for index, result in enumerate(member_results, start=1):
  413. output_text = result.output_text or result.error_message or ""
  414. lines.append(f"{index}. agent={result.agent_id} status={result.status}: {output_text}")
  415. return "\n".join(lines)
  416. def _member_result_to_json(self, result: AgentRunContract) -> dict[str, JSONValue]:
  417. return {
  418. "agent_run_id": result.id,
  419. "agent_id": result.agent_id,
  420. "agent_version_id": result.agent_version_id,
  421. "status": result.status,
  422. "output_text": result.output_text,
  423. "output_json": result.output_json or {},
  424. "error_code": result.error_code,
  425. "error_message": result.error_message,
  426. }
  427. def _publish_event(
  428. self,
  429. *,
  430. event_type: str,
  431. team_run: TeamRun,
  432. payload_json: dict[str, JSONValue],
  433. ) -> None:
  434. if self.event_client is None:
  435. return
  436. try:
  437. self.event_client.publish_event(
  438. EventPublishContract(
  439. tenant_id=team_run.tenant_id,
  440. event_type=event_type,
  441. source_service="team-service",
  442. aggregate_type="team_run",
  443. aggregate_id=team_run.id,
  444. correlation_id=team_run.session_id,
  445. payload_json={
  446. **payload_json,
  447. "team_id": team_run.team_id,
  448. "team_version_id": team_run.team_version_id,
  449. },
  450. )
  451. )
  452. except EventServiceClientError:
  453. return
  454. def build_team_application_service(
  455. *,
  456. team_repository: TeamDefinitionRepository,
  457. team_version_repository: TeamVersionRepository,
  458. team_run_repository: TeamRunRepository,
  459. settings: TeamServiceSettings,
  460. ) -> TeamApplicationService:
  461. redis_client = try_build_redis_client(settings.redis_url)
  462. return TeamApplicationService(
  463. team_repository=team_repository,
  464. team_version_repository=team_version_repository,
  465. team_run_repository=team_run_repository,
  466. agent_client=AgentServiceClient(
  467. base_url=settings.agent_service_url,
  468. timeout_seconds=settings.agent_service_timeout_seconds,
  469. ),
  470. event_client=EventServiceClient(
  471. base_url=settings.event_service_url,
  472. timeout_seconds=settings.event_service_timeout_seconds,
  473. ),
  474. task_queue_publisher=(
  475. TaskQueuePublisher(client=redis_client) if redis_client is not None else None
  476. ),
  477. )