# services.py
  1. from datetime import datetime, timedelta
  2. from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
  3. from core_domain import AgentRunContract, TeamMemberContract
  4. from core_shared import JSONValue, try_build_redis_client
  5. from core_shared.task_queue import TaskQueuePublisher
  6. from app.bootstrap.settings import TeamServiceSettings
  7. from app.db.models import TeamDefinition, TeamRun, TeamVersion
  8. from app.domain.repositories import (
  9. TeamDefinitionRepository,
  10. TeamRunRepository,
  11. TeamVersionRepository)
  12. from app.infrastructure.agent_client import AgentServiceClient, AgentServiceClientError
  13. from app.schemas.team import (
  14. TeamCreateRequest,
  15. TeamRunCreateRequest,
  16. TeamRunExecuteRequest,
  17. TeamRunStatusUpdateRequest,
  18. TeamStatusUpdateRequest,
  19. TeamVersionCreateRequest)
  20. class TeamApplicationService:
  21. def __init__(
  22. self,
  23. *,
  24. team_repository: TeamDefinitionRepository,
  25. team_version_repository: TeamVersionRepository,
  26. team_run_repository: TeamRunRepository,
  27. agent_client: AgentServiceClient | None = None,
  28. event_client: EventServiceClient | None = None,
  29. task_queue_publisher: TaskQueuePublisher | None = None) -> None:
  30. self.team_repository = team_repository
  31. self.team_version_repository = team_version_repository
  32. self.team_run_repository = team_run_repository
  33. self.agent_client = agent_client
  34. self.event_client = event_client
  35. self.task_queue_publisher = task_queue_publisher
  36. def create_team(self, payload: TeamCreateRequest) -> TeamDefinition:
  37. return self.team_repository.create(
  38. code=payload.code,
  39. name=payload.name,
  40. description=payload.description,
  41. team_type=payload.team_type,
  42. owner_user_id=payload.owner_user_id,
  43. metadata_json=payload.metadata_json)
  44. def list_teams(self) -> list[TeamDefinition]:
  45. return self.team_repository.list_all()
  46. def update_team_status(
  47. self,
  48. *,
  49. team_id: str,
  50. payload: TeamStatusUpdateRequest) -> TeamDefinition | None:
  51. return self.team_repository.update_status(
  52. team_id=team_id,
  53. status=payload.status)
  54. def create_team_version(self, payload: TeamVersionCreateRequest) -> TeamVersion:
  55. team = self.team_repository.get_by_id(team_id=payload.team_id)
  56. if team is None:
  57. raise ValueError(f"team not found: {payload.team_id}")
  58. if not payload.member_refs:
  59. raise ValueError("team version requires at least one member")
  60. return self.team_version_repository.create(
  61. team_id=payload.team_id,
  62. status=payload.status,
  63. coordination_mode=payload.coordination_mode,
  64. objective=payload.objective,
  65. member_refs_json=[item.model_dump(mode="json") for item in payload.member_refs],
  66. policy_json=payload.policy_json)
  67. def list_team_versions(self, *, team_id: str) -> list[TeamVersion]:
  68. return self.team_version_repository.list_by_team(team_id=team_id)
  69. def create_team_run(self, payload: TeamRunCreateRequest) -> TeamRun:
  70. team_version = self._resolve_team_version(
  71. team_id=payload.team_id,
  72. team_version_id=payload.team_version_id)
  73. if team_version is None:
  74. raise ValueError("published team version not found")
  75. team_run = self.team_run_repository.create(
  76. team_id=payload.team_id,
  77. team_version_id=team_version.id,
  78. session_id=payload.session_id,
  79. input_text=payload.input_text,
  80. input_json=payload.input_json)
  81. self._publish_event(
  82. event_type="team.run.created",
  83. team_run=team_run,
  84. payload_json={"team_run_id": team_run.id, "status": team_run.status})
  85. if self.task_queue_publisher is not None:
  86. self.task_queue_publisher.publish_team_run(
  87. team_run_id=team_run.id)
  88. return team_run
  89. def list_team_runs(
  90. self,
  91. *,
  92. team_id: str | None = None,
  93. session_id: str | None = None) -> list[TeamRun]:
  94. return self.team_run_repository.list_by_scope(
  95. team_id=team_id,
  96. session_id=session_id)
  97. def update_team_run_status(
  98. self,
  99. *,
  100. team_run_id: str,
  101. payload: TeamRunStatusUpdateRequest) -> TeamRun | None:
  102. entity = self.team_run_repository.get_by_id(
  103. team_run_id=team_run_id)
  104. if entity is None:
  105. return None
  106. return self.team_run_repository.update_status(
  107. team_run_id=team_run_id,
  108. status=payload.status,
  109. worker_key=payload.worker_key,
  110. output_text=payload.output_text,
  111. output_json=payload.output_json,
  112. error_code=payload.error_code,
  113. error_message=payload.error_message)
  114. def execute_team_run(
  115. self,
  116. *,
  117. team_run_id: str,
  118. payload: TeamRunExecuteRequest) -> TeamRun | None:
  119. team_run = self.team_run_repository.get_by_id(
  120. team_run_id=team_run_id)
  121. if team_run is None:
  122. return None
  123. team_version = self.team_version_repository.get_by_id(
  124. team_version_id=team_run.team_version_id)
  125. if team_version is None:
  126. failed_run = self.team_run_repository.update_status(
  127. team_run_id=team_run.id,
  128. status="failed",
  129. worker_key=payload.worker_key,
  130. error_code="team_version_missing",
  131. error_message=f"team version not found: {team_run.team_version_id}")
  132. running_run = self.team_run_repository.update_status(
  133. team_run_id=team_run.id,
  134. status="running",
  135. worker_key=payload.worker_key)
  136. if running_run is None:
  137. return None
  138. members = self._read_team_members(team_version)
  139. if not members:
  140. return self.team_run_repository.update_status(
  141. team_run_id=team_run.id,
  142. status="failed",
  143. worker_key=payload.worker_key,
  144. error_code="team_members_missing",
  145. error_message="team version has no valid members")
  146. try:
  147. member_results = self._execute_members(
  148. team_run=team_run,
  149. team_version=team_version,
  150. members=members,
  151. worker_key=payload.worker_key,
  152. dry_run=payload.dry_run)
  153. except AgentServiceClientError as exc:
  154. return self.team_run_repository.update_status(
  155. team_run_id=team_run.id,
  156. status="failed",
  157. worker_key=payload.worker_key,
  158. error_code="agent_service_error",
  159. error_message=str(exc))
  160. failed_results = [item for item in member_results if item.status != "completed"]
  161. output_text = self._build_team_output_text(
  162. team_version=team_version,
  163. member_results=member_results)
  164. output_json: dict[str, JSONValue] = {
  165. "dry_run": payload.dry_run,
  166. "coordination_mode": team_version.coordination_mode,
  167. "team_version_id": team_version.id,
  168. "member_run_count": len(member_results),
  169. "member_results": [
  170. self._member_result_to_json(item) for item in member_results
  171. ],
  172. }
  173. if failed_results:
  174. return self.team_run_repository.update_status(
  175. team_run_id=team_run.id,
  176. status="failed",
  177. worker_key=payload.worker_key,
  178. output_text=output_text,
  179. output_json=output_json,
  180. error_code="member_run_failed",
  181. error_message=f"{len(failed_results)} member run(s) failed")
  182. if failed_run is not None:
  183. self._publish_event(
  184. event_type="team.run.failed",
  185. team_run=failed_run,
  186. payload_json={
  187. "team_run_id": failed_run.id,
  188. "status": failed_run.status,
  189. "failed_member_count": len(failed_results),
  190. })
  191. return failed_run
  192. completed_run = self.team_run_repository.update_status(
  193. team_run_id=team_run.id,
  194. status="completed",
  195. worker_key=payload.worker_key,
  196. output_text=output_text,
  197. output_json=output_json)
  198. if completed_run is not None:
  199. self._publish_event(
  200. event_type="team.run.completed",
  201. team_run=completed_run,
  202. payload_json={
  203. "team_run_id": completed_run.id,
  204. "status": completed_run.status,
  205. "member_run_count": len(member_results),
  206. })
  207. return completed_run
  208. def execute_next_claimed_team_run(
  209. self,
  210. *,
  211. worker_key: str,
  212. lease_seconds: int,
  213. dry_run: bool) -> tuple[TeamRun, int] | None:
  214. released_lease_count = self.team_run_repository.release_expired_leases(
  215. now_time=datetime.utcnow())
  216. claimed_team_run = self.team_run_repository.claim_next_queued(
  217. worker_key=worker_key,
  218. lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds))
  219. if claimed_team_run is None:
  220. return None
  221. result = self.execute_team_run(
  222. team_run_id=claimed_team_run.id,
  223. payload=TeamRunExecuteRequest(
  224. worker_key=worker_key,
  225. dry_run=dry_run))
  226. if result is None:
  227. return None
  228. return result, released_lease_count
  229. def _resolve_team_version(
  230. self,
  231. *,
  232. team_id: str,
  233. team_version_id: str | None) -> TeamVersion | None:
  234. if team_version_id is not None:
  235. return self.team_version_repository.get_by_id(
  236. team_version_id=team_version_id)
  237. return self.team_version_repository.get_latest_published(
  238. team_id=team_id)
  239. def _execute_members(
  240. self,
  241. *,
  242. team_run: TeamRun,
  243. team_version: TeamVersion,
  244. members: list[TeamMemberContract],
  245. worker_key: str | None,
  246. dry_run: bool) -> list[AgentRunContract]:
  247. if self.agent_client is None:
  248. raise AgentServiceClientError("agent service client is not configured")
  249. member_results: list[AgentRunContract] = []
  250. prior_outputs: list[dict[str, JSONValue]] = []
  251. for member in self._order_members(members):
  252. member_input_json = self._build_member_input_json(
  253. team_run=team_run,
  254. team_version=team_version,
  255. member=member,
  256. prior_outputs=prior_outputs)
  257. created_run = self.agent_client.create_agent_run(
  258. agent_id=member.agent_id,
  259. agent_version_id=member.agent_version_id,
  260. session_id=team_run.session_id,
  261. input_text=self._build_member_input_text(
  262. team_run=team_run,
  263. team_version=team_version,
  264. member=member),
  265. input_json=member_input_json)
  266. executed_run = self.agent_client.execute_agent_run(
  267. agent_run_id=created_run.id,
  268. worker_key=worker_key,
  269. dry_run=dry_run)
  270. member_results.append(executed_run)
  271. prior_outputs.append(
  272. {
  273. "member_key": member.member_key,
  274. "role": member.role,
  275. "agent_run_id": executed_run.id,
  276. "status": executed_run.status,
  277. "output_text": executed_run.output_text,
  278. "output_json": executed_run.output_json or {},
  279. }
  280. )
  281. return member_results
  282. def _read_team_members(self, team_version: TeamVersion) -> list[TeamMemberContract]:
  283. members: list[TeamMemberContract] = []
  284. for item in team_version.member_refs_json:
  285. try:
  286. members.append(TeamMemberContract.model_validate(item))
  287. except ValueError:
  288. continue
  289. return members
  290. def _order_members(self, members: list[TeamMemberContract]) -> list[TeamMemberContract]:
  291. role_priority = {
  292. "planner": 0,
  293. "supervisor": 1,
  294. "specialist": 2,
  295. "executor": 3,
  296. "reviewer": 4,
  297. }
  298. return sorted(members, key=lambda item: role_priority.get(item.role, 10))
  299. def _build_member_input_text(
  300. self,
  301. *,
  302. team_run: TeamRun,
  303. team_version: TeamVersion,
  304. member: TeamMemberContract) -> str:
  305. lines = [
  306. f"Team objective: {team_version.objective or 'No objective provided.'}",
  307. f"Member role: {member.role}",
  308. ]
  309. if member.responsibility:
  310. lines.append(f"Responsibility: {member.responsibility}")
  311. if team_run.input_text:
  312. lines.append(f"User task: {team_run.input_text}")
  313. return "\n".join(lines)
  314. def _build_member_input_json(
  315. self,
  316. *,
  317. team_run: TeamRun,
  318. team_version: TeamVersion,
  319. member: TeamMemberContract,
  320. prior_outputs: list[dict[str, JSONValue]]) -> dict[str, JSONValue]:
  321. input_json: dict[str, JSONValue] = dict(team_run.input_json or {})
  322. input_json.update(
  323. {
  324. "team_id": team_run.team_id,
  325. "team_run_id": team_run.id,
  326. "team_version_id": team_version.id,
  327. "team_objective": team_version.objective,
  328. "member_key": member.member_key,
  329. "member_role": member.role,
  330. "member_responsibility": member.responsibility,
  331. "prior_member_outputs": prior_outputs,
  332. }
  333. )
  334. configured_input = member.config_json.get("input_json")
  335. if isinstance(configured_input, dict):
  336. input_json.update(
  337. {str(item_key): item_value for item_key, item_value in configured_input.items()}
  338. )
  339. return input_json
  340. def _build_team_output_text(
  341. self,
  342. *,
  343. team_version: TeamVersion,
  344. member_results: list[AgentRunContract]) -> str:
  345. lines = [
  346. f"Team objective: {team_version.objective or 'No objective provided.'}",
  347. f"Coordination mode: {team_version.coordination_mode}",
  348. "Member results:",
  349. ]
  350. for index, result in enumerate(member_results, start=1):
  351. output_text = result.output_text or result.error_message or ""
  352. lines.append(f"{index}. agent={result.agent_id} status={result.status}: {output_text}")
  353. return "\n".join(lines)
  354. def _member_result_to_json(self, result: AgentRunContract) -> dict[str, JSONValue]:
  355. return {
  356. "agent_run_id": result.id,
  357. "agent_id": result.agent_id,
  358. "agent_version_id": result.agent_version_id,
  359. "status": result.status,
  360. "output_text": result.output_text,
  361. "output_json": result.output_json or {},
  362. "error_code": result.error_code,
  363. "error_message": result.error_message,
  364. }
  365. def _publish_event(
  366. self,
  367. *,
  368. event_type: str,
  369. team_run: TeamRun,
  370. payload_json: dict[str, JSONValue]) -> None:
  371. if self.event_client is None:
  372. return
  373. try:
  374. self.event_client.publish_event(
  375. EventPublishContract(
  376. event_type=event_type,
  377. source_service="team-service",
  378. aggregate_type="team_run",
  379. aggregate_id=team_run.id,
  380. correlation_id=team_run.session_id,
  381. payload_json={
  382. **payload_json,
  383. "team_id": team_run.team_id,
  384. "team_version_id": team_run.team_version_id,
  385. })
  386. )
  387. except EventServiceClientError:
  388. return
  389. def build_team_application_service(
  390. *,
  391. team_repository: TeamDefinitionRepository,
  392. team_version_repository: TeamVersionRepository,
  393. team_run_repository: TeamRunRepository,
  394. settings: TeamServiceSettings) -> TeamApplicationService:
  395. redis_client = try_build_redis_client(settings.redis_url)
  396. return TeamApplicationService(
  397. team_repository=team_repository,
  398. team_version_repository=team_version_repository,
  399. team_run_repository=team_run_repository,
  400. agent_client=AgentServiceClient(
  401. base_url=settings.agent_service_url,
  402. timeout_seconds=settings.agent_service_timeout_seconds),
  403. event_client=EventServiceClient(
  404. base_url=settings.event_service_url,
  405. timeout_seconds=settings.event_service_timeout_seconds),
  406. task_queue_publisher=(
  407. TaskQueuePublisher(client=redis_client) if redis_client is not None else None
  408. ))