| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155 |
- from dataclasses import dataclass
- from collections.abc import Iterator
- from datetime import datetime, timedelta
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
- from core_domain import AgentRunContract, TeamMemberContract
- from core_shared import JSONValue, try_build_redis_client
- from core_shared.task_queue import TaskQueuePublisher
- from app.bootstrap.settings import TeamServiceSettings
- from app.db.models import TeamDefinition, TeamRun, TeamConfig
- from app.domain.repositories import (
- TeamDefinitionRepository,
- TeamRunRepository,
- TeamConfigRepository)
- from app.infrastructure.agent_client import AgentServiceClient, AgentServiceClientError
- from app.schemas.team import (
- TeamConfigCreateRequestDto,
- TeamConfigUpdateRequestDto,
- TeamCreateRequest,
- TeamCreateRequestDto,
- TeamDeleteRequestDto,
- TeamRunCreateRequest,
- TeamRunCreateRequestDto,
- TeamRunExecuteRequest,
- TeamRunStatusUpdateRequestDto,
- TeamRunStatusUpdateRequest,
- TeamStatusUpdateRequest,
- TeamUpdateRequestDto,
- TeamConfigCreateRequest)
# Character budget for prior-output text, going by the name; the code that
# applies it is outside this excerpt — TODO confirm where it is enforced.
MAX_PRIOR_OUTPUT_TEXT_CHARS = 1600
@dataclass(frozen=True)
class TeamMemberRunResult:
    """Immutable pairing of a team member with the agent run executed for it."""

    # The team member the run was executed on behalf of.
    member: TeamMemberContract
    # The agent run returned by the agent service for that member.
    run: AgentRunContract
- class TeamApplicationService:
def __init__(
    self,
    *,
    team_repository: TeamDefinitionRepository,
    team_config_repository: TeamConfigRepository,
    team_run_repository: TeamRunRepository,
    agent_client: AgentServiceClient | None = None,
    event_client: EventServiceClient | None = None,
    task_queue_publisher: TaskQueuePublisher | None = None,
) -> None:
    """Keep references to the collaborators the service delegates to.

    Args:
        team_repository: Storage for team definitions.
        team_config_repository: Storage for team configurations.
        team_run_repository: Storage for team runs.
        agent_client: Optional client used to create and execute member
            agent runs; execution paths fail when it is absent.
        event_client: Optional event service client (its use happens in
            code outside this excerpt).
        task_queue_publisher: Optional publisher used to enqueue newly
            created team runs.
    """
    # Required persistence collaborators.
    (
        self.team_repository,
        self.team_config_repository,
        self.team_run_repository,
    ) = (team_repository, team_config_repository, team_run_repository)
    # Optional integrations; each may legitimately be None.
    (
        self.agent_client,
        self.event_client,
        self.task_queue_publisher,
    ) = (agent_client, event_client, task_queue_publisher)
def create_team(self, payload: TeamCreateRequest) -> TeamDefinition:
    """Persist a new team definition from ``payload`` and return it."""
    team_fields = {
        "code": payload.code,
        "name": payload.name,
        "description": payload.description,
        "team_type": payload.team_type,
        "owner_user_id": payload.owner_user_id,
        "metadata_json": payload.metadata_json,
    }
    return self.team_repository.create(**team_fields)
def create_team_from_contract(self, payload: TeamCreateRequestDto) -> TeamDefinition:
    """Translate a contract DTO into an internal request and create the team.

    The team code is derived from the DTO name via ``_build_team_code``.
    """
    request = TeamCreateRequest(
        code=self._build_team_code(payload.name),
        name=payload.name,
        description=payload.description,
        team_type=payload.teamType,
        owner_user_id=payload.ownerUserId,
        metadata_json=payload.metadata,
    )
    return self.create_team(request)
def list_teams(self) -> list[TeamDefinition]:
    """Return every team definition known to the repository."""
    all_teams = self.team_repository.list_all()
    return all_teams
def get_team(self, *, team_id: str) -> TeamDefinition | None:
    """Look up one team definition by id; returns whatever the repository yields."""
    found = self.team_repository.get_by_id(team_id=team_id)
    return found
def update_team_from_contract(self, payload: TeamUpdateRequestDto) -> TeamDefinition | None:
    """Apply the non-``None`` fields of ``payload`` to an existing team.

    Returns:
        The saved entity, or ``None`` when no team has ``payload.teamId``.
    """
    entity = self.team_repository.get_by_id(team_id=payload.teamId)
    if entity is None:
        return None

    # Renaming also regenerates the team code from the new name.
    if payload.name is not None:
        entity.name = payload.name
        entity.code = self._build_team_code(payload.name)

    # Remaining fields are straight copies: (DTO attribute, entity attribute).
    direct_fields = (
        ("description", "description"),
        ("teamType", "team_type"),
        ("status", "status"),
        ("ownerUserId", "owner_user_id"),
        ("metadata", "metadata_json"),
    )
    for dto_attr, entity_attr in direct_fields:
        new_value = getattr(payload, dto_attr)
        if new_value is not None:
            setattr(entity, entity_attr, new_value)

    return self.team_repository.save(entity)
def delete_team_from_contract(self, payload: TeamDeleteRequestDto) -> bool:
    """Delete the team named by ``payload.teamId``.

    Returns:
        ``True`` when a team was found and deleted, ``False`` otherwise.
    """
    target = self.team_repository.get_by_id(team_id=payload.teamId)
    if target is not None:
        self.team_repository.delete(target)
        return True
    return False
def update_team_status(
    self,
    *,
    team_id: str,
    payload: TeamStatusUpdateRequest,
) -> TeamDefinition | None:
    """Delegate a status change for ``team_id`` to the team repository."""
    new_status = payload.status
    return self.team_repository.update_status(team_id=team_id, status=new_status)
def create_team_config(self, payload: TeamConfigCreateRequest) -> TeamConfig:
    """Create a configuration for an existing team.

    Raises:
        ValueError: If the team does not exist or ``payload.member_refs``
            is empty.
    """
    if self.team_repository.get_by_id(team_id=payload.team_id) is None:
        raise ValueError(f"team not found: {payload.team_id}")
    if not payload.member_refs:
        raise ValueError("team config requires at least one member")

    # Member references are stored as JSON-compatible dicts.
    serialized_members = []
    for member_ref in payload.member_refs:
        serialized_members.append(member_ref.model_dump(mode="json"))

    return self.team_config_repository.create(
        team_id=payload.team_id,
        coordination_mode=payload.coordination_mode,
        objective=payload.objective,
        member_refs_json=serialized_members,
        policy_json=payload.policy_json,
    )
def create_team_config_from_contract(self, payload: TeamConfigCreateRequestDto) -> TeamConfig:
    """Translate a contract DTO into an internal request and create the config.

    Member references pass through ``_normalize_member_refs`` first.
    """
    request = TeamConfigCreateRequest(
        team_id=payload.teamId,
        coordination_mode=payload.coordinationMode,
        objective=payload.objective,
        member_refs=self._normalize_member_refs(payload.memberRefs),
        policy_json=payload.policy,
    )
    return self.create_team_config(request)
def list_team_configs(self, *, team_id: str | None = None) -> list[TeamConfig]:
    """List configurations, restricted to one team when ``team_id`` is given."""
    if team_id is None:
        return self.team_config_repository.list_all()
    return self.team_config_repository.list_by_team(team_id=team_id)
def get_team_config(self, *, config_id: str) -> TeamConfig | None:
    """Look up one team configuration by id."""
    found = self.team_config_repository.get_by_id(team_config_id=config_id)
    return found
def update_team_config_from_contract(self, payload: TeamConfigUpdateRequestDto) -> TeamConfig | None:
    """Apply the non-``None`` fields of ``payload`` to an existing config.

    Returns:
        The saved config, or ``None`` when ``payload.configId`` is unknown.

    Raises:
        ValueError: If member references are supplied but normalize to an
            empty list.
    """
    config = self.team_config_repository.get_by_id(team_config_id=payload.configId)
    if config is None:
        return None

    new_mode = payload.coordinationMode
    if new_mode is not None:
        config.coordination_mode = new_mode

    new_objective = payload.objective
    if new_objective is not None:
        config.objective = new_objective

    if payload.memberRefs is not None:
        member_refs = self._normalize_member_refs(payload.memberRefs)
        if not member_refs:
            raise ValueError("team config requires at least one member")
        config.member_refs_json = [ref.model_dump(mode="json") for ref in member_refs]

    new_policy = payload.policy
    if new_policy is not None:
        config.policy_json = new_policy

    return self.team_config_repository.save(config)
def delete_team_config(self, *, config_id: str) -> bool:
    """Delete the team configuration with ``config_id``.

    Returns:
        ``True`` when a configuration was found and deleted, else ``False``.
    """
    target = self.team_config_repository.get_by_id(team_config_id=config_id)
    if target is not None:
        self.team_config_repository.delete(target)
        return True
    return False
def create_team_run(self, payload: TeamRunCreateRequest) -> TeamRun:
    """Create a team run, announce it, and optionally enqueue it.

    The config is the one named by ``payload.team_config_id`` or, when that
    is ``None``, the latest config for the team.

    Raises:
        ValueError: If no team configuration can be resolved.
    """
    resolved_config = self._resolve_team_config(
        team_id=payload.team_id,
        team_config_id=payload.team_config_id,
    )
    if resolved_config is None:
        raise ValueError("team config not found")

    new_run = self.team_run_repository.create(
        team_id=payload.team_id,
        team_config_id=resolved_config.id,
        session_id=payload.session_id,
        input_text=payload.input_text,
        input_json=payload.input_json,
    )

    created_payload = {"team_run_id": new_run.id, "status": new_run.status}
    self._publish_event(
        event_type="team.run.created",
        team_run=new_run,
        payload_json=created_payload,
    )

    # Enqueue only when asked to and a publisher is configured.
    publisher = self.task_queue_publisher
    if payload.enqueue and publisher is not None:
        publisher.publish_team_run(team_run_id=new_run.id)
    return new_run
def create_team_run_from_contract(self, payload: TeamRunCreateRequestDto) -> TeamRun:
    """Translate a contract DTO into an internal request and create the run."""
    request = TeamRunCreateRequest(
        team_id=payload.teamId,
        team_config_id=payload.teamConfigId,
        session_id=payload.sessionId,
        input_text=payload.inputText,
        input_json=payload.inputJson,
        enqueue=payload.enqueue,
    )
    return self.create_team_run(request)
def list_team_runs(
    self,
    *,
    team_id: str | None = None,
    session_id: str | None = None,
) -> list[TeamRun]:
    """List team runs, passing both optional filters to the repository."""
    scope = {"team_id": team_id, "session_id": session_id}
    return self.team_run_repository.list_by_scope(**scope)
def get_team_run(self, *, team_run_id: str) -> TeamRun | None:
    """Look up one team run by id."""
    found = self.team_run_repository.get_by_id(team_run_id=team_run_id)
    return found
def delete_team_run(self, *, team_run_id: str) -> bool:
    """Delete the team run with ``team_run_id``.

    Returns:
        ``True`` when a run was found and deleted, else ``False``.
    """
    target = self.team_run_repository.get_by_id(team_run_id=team_run_id)
    if target is not None:
        self.team_run_repository.delete(target)
        return True
    return False
def update_team_run_status(
    self,
    *,
    team_run_id: str,
    payload: TeamRunStatusUpdateRequest,
) -> TeamRun | None:
    """Update status, outputs and error details of an existing team run.

    Returns:
        The repository's updated run, or ``None`` when the run is unknown.
    """
    existing = self.team_run_repository.get_by_id(team_run_id=team_run_id)
    if existing is None:
        return None

    changes = {
        "status": payload.status,
        "worker_key": payload.worker_key,
        "output_text": payload.output_text,
        "output_json": payload.output_json,
        "error_code": payload.error_code,
        "error_message": payload.error_message,
    }
    return self.team_run_repository.update_status(team_run_id=team_run_id, **changes)
def update_team_run_status_from_contract(
    self,
    payload: TeamRunStatusUpdateRequestDto,
) -> TeamRun | None:
    """Translate a contract DTO into an internal status update and apply it."""
    request = TeamRunStatusUpdateRequest(
        status=payload.status,
        worker_key=payload.workerKey,
        output_text=payload.outputText,
        output_json=payload.outputJson,
        error_code=payload.errorCode,
        error_message=payload.errorMessage,
    )
    return self.update_team_run_status(team_run_id=payload.teamRunId, payload=request)
def execute_team_run(
    self,
    *,
    team_run_id: str,
    payload: TeamRunExecuteRequest,
) -> TeamRun | None:
    """Execute all members of a team run synchronously and persist the outcome.

    The run is moved to ``running``, its members are executed according to
    the config's coordination mode, and the run ends as ``completed`` or
    ``failed``. Every failure path now publishes ``team.run.failed``;
    previously the "no valid members" and "agent service error" paths
    marked the run failed without emitting the event.

    Args:
        team_run_id: Identifier of the run to execute.
        payload: Carries ``worker_key`` and ``dry_run`` for the execution.

    Returns:
        The updated run as returned by the repository, or ``None`` when the
        run does not exist or a status update returns ``None``.
    """
    team_run = self.team_run_repository.get_by_id(team_run_id=team_run_id)
    if team_run is None:
        return None

    def fail_run(
        *,
        error_code: str,
        error_message: str,
        event_fields: dict[str, JSONValue],
        **outputs: object,
    ) -> TeamRun | None:
        # Single place that marks the run failed and announces it, so no
        # failure path can forget the event.
        failed_run = self.team_run_repository.update_status(
            team_run_id=team_run.id,
            status="failed",
            worker_key=payload.worker_key,
            error_code=error_code,
            error_message=error_message,
            **outputs,
        )
        if failed_run is not None:
            self._publish_event(
                event_type="team.run.failed",
                team_run=failed_run,
                payload_json={
                    "team_run_id": failed_run.id,
                    "status": failed_run.status,
                    **event_fields,
                },
            )
        return failed_run

    team_config = self.team_config_repository.get_by_id(
        team_config_id=team_run.team_config_id,
    )
    if team_config is None:
        return fail_run(
            error_code="team_config_missing",
            error_message=f"team config not found: {team_run.team_config_id}",
            event_fields={"error_code": "team_config_missing"},
        )

    running_run = self.team_run_repository.update_status(
        team_run_id=team_run.id,
        status="running",
        worker_key=payload.worker_key,
    )
    if running_run is None:
        return None

    members = self._read_team_members(team_config)
    if not members:
        return fail_run(
            error_code="team_members_missing",
            error_message="team config has no valid members",
            event_fields={"error_code": "team_members_missing"},
        )

    try:
        member_results = self._execute_members(
            team_run=team_run,
            team_config=team_config,
            members=members,
            worker_key=payload.worker_key,
            dry_run=payload.dry_run,
        )
    except AgentServiceClientError as exc:
        return fail_run(
            error_code="agent_service_error",
            error_message=str(exc),
            event_fields={"error_code": "agent_service_error"},
        )

    failed_results = [item for item in member_results if item.run.status != "completed"]
    output_text = self._build_team_output_text(
        team_config=team_config,
        member_results=member_results,
    )
    output_json: dict[str, JSONValue] = {
        "dry_run": payload.dry_run,
        "coordination_mode": team_config.coordination_mode,
        "team_config_id": team_config.id,
        "member_run_count": len(member_results),
        "member_results": [
            self._member_result_to_json(item) for item in member_results
        ],
    }

    # Member failures fail the whole run unless the policy tolerates them.
    failure_mode = self._read_failure_mode(team_config)
    if failed_results and failure_mode != "continue_with_warning":
        return fail_run(
            error_code="member_run_failed",
            error_message=f"{len(failed_results)} member run(s) failed",
            event_fields={"failed_member_count": len(failed_results)},
            output_text=output_text,
            output_json=output_json,
        )

    completed_run = self.team_run_repository.update_status(
        team_run_id=team_run.id,
        status="completed",
        worker_key=payload.worker_key,
        output_text=output_text,
        output_json=output_json,
    )
    if completed_run is not None:
        self._publish_event(
            event_type="team.run.completed",
            team_run=completed_run,
            payload_json={
                "team_run_id": completed_run.id,
                "status": completed_run.status,
                "member_run_count": len(member_results),
            },
        )
    return completed_run
def execute_team_run_stream(
    self,
    *,
    team_run_id: str,
    payload: TeamRunExecuteRequest,
) -> Iterator[dict[str, JSONValue]]:
    """Execute a team run while yielding progress events as dicts.

    Yields ``team.run.started``, then the member-level events produced by
    the streaming helpers, and finally ``team.run.completed`` or
    ``team.run.failed``. Nothing is yielded when the run does not exist.

    An ``AgentServiceClientError`` raised while streaming members is now
    caught: the run is marked failed with ``agent_service_error`` and a
    ``team.run.failed`` event is yielded. Previously the error escaped the
    generator and left the run in ``running``.

    Args:
        team_run_id: Identifier of the run to execute.
        payload: Carries ``worker_key`` and ``dry_run`` for the execution.
    """
    team_run = self.team_run_repository.get_by_id(team_run_id=team_run_id)
    if team_run is None:
        return

    def failed_event(
        *,
        error_code: str,
        error_message: str,
        **outputs: object,
    ) -> dict[str, JSONValue]:
        # Persist the failure and build the matching stream event.
        failed_run = self.team_run_repository.update_status(
            team_run_id=team_run.id,
            status="failed",
            worker_key=payload.worker_key,
            error_code=error_code,
            error_message=error_message,
            **outputs,
        )
        return {"event": "team.run.failed", "run": self._team_run_to_json(failed_run)}

    team_config = self.team_config_repository.get_by_id(
        team_config_id=team_run.team_config_id,
    )
    if team_config is None:
        yield failed_event(
            error_code="team_config_missing",
            error_message=f"team config not found: {team_run.team_config_id}",
        )
        return

    running_run = self.team_run_repository.update_status(
        team_run_id=team_run.id,
        status="running",
        worker_key=payload.worker_key,
    )
    yield {"event": "team.run.started", "run": self._team_run_to_json(running_run)}

    members = self._read_team_members(team_config)
    if not members:
        yield failed_event(
            error_code="team_members_missing",
            error_message="team config has no valid members",
        )
        return
    if self.agent_client is None:
        yield failed_event(
            error_code="agent_service_missing",
            error_message="agent service client is not configured",
        )
        return

    stream_members = self._select_stream_members(team_config=team_config, members=members)
    mode = team_config.coordination_mode
    try:
        # The helpers yield events and hand back the results as their
        # generator return value.
        if mode == "debate":
            member_results = yield from self._stream_members_debate(
                team_run=team_run, team_config=team_config, members=stream_members,
                payload=payload)
        else:
            member_results = yield from self._stream_members_sequential(
                team_run=team_run, team_config=team_config, members=stream_members,
                payload=payload)
    except AgentServiceClientError as exc:
        # Mirrors the non-streaming path so the run never stays "running".
        yield failed_event(error_code="agent_service_error", error_message=str(exc))
        return

    failed_results = [item for item in member_results if item.run.status != "completed"]
    output_text = self._build_team_output_text(
        team_config=team_config,
        member_results=member_results,
    )
    output_json: dict[str, JSONValue] = {
        "dry_run": payload.dry_run,
        "coordination_mode": team_config.coordination_mode,
        "team_config_id": team_config.id,
        "member_run_count": len(member_results),
        "member_results": [
            self._member_result_to_json(item) for item in member_results
        ],
        "streamed": True,
        "response_mode": self._read_response_mode(team_config),
    }

    failure_mode = self._read_failure_mode(team_config)
    if failed_results and failure_mode != "continue_with_warning":
        yield failed_event(
            error_code="member_run_failed",
            error_message=f"{len(failed_results)} member run(s) failed",
            output_text=output_text,
            output_json=output_json,
        )
        return

    completed_run = self.team_run_repository.update_status(
        team_run_id=team_run.id,
        status="completed",
        worker_key=payload.worker_key,
        output_text=output_text,
        output_json=output_json,
    )
    yield {"event": "team.run.completed", "run": self._team_run_to_json(completed_run)}
def execute_next_claimed_team_run(
    self,
    *,
    worker_key: str,
    lease_seconds: int,
    stale_running_seconds: int,
    dry_run: bool,
    redis_client: object | None = None,
) -> tuple[TeamRun, int] | None:
    """Claim the next queued team run and execute it.

    Expired leases are released first. When ``redis_client`` is given, a
    distributed lock and an idempotency record guard the execution; losing
    either one makes the call return ``None`` without executing.

    Returns:
        ``(executed_run, released_lease_count)``, or ``None`` when nothing
        was claimed, a guard was not obtained, or execution returned
        ``None``.
    """
    released_lease_count = self.team_run_repository.release_expired_leases(
        now_time=datetime.utcnow(),
        stale_running_seconds=stale_running_seconds,
    )
    lease_deadline = datetime.utcnow() + timedelta(seconds=lease_seconds)
    claimed = self.team_run_repository.claim_next_queued(
        worker_key=worker_key,
        lease_expire_time=lease_deadline,
    )
    if claimed is None:
        return None

    lock = None
    idempotency_store = None
    if redis_client is not None:
        from core_shared.redis_primitives import DistributedLock, IdempotencyStore

        lock = DistributedLock(
            client=redis_client,
            name=f"team-run:{claimed.id}:lock",
            ttl_seconds=lease_seconds,
        )
        if not lock.acquire():
            return None
        idempotency_store = IdempotencyStore(
            client=redis_client,
            prefix="team-run-idempotency",
        )
        if not idempotency_store.begin(key=claimed.id):
            # Give the lock back before bailing out.
            lock.release()
            return None

    try:
        result = self.execute_team_run(
            team_run_id=claimed.id,
            payload=TeamRunExecuteRequest(worker_key=worker_key, dry_run=dry_run),
        )
        if idempotency_store is not None and result is not None:
            idempotency_store.complete(
                key=claimed.id,
                result={"status": result.status, "team_run_id": result.id},
            )
    finally:
        # The lock is released whether or not execution raised.
        if lock is not None:
            lock.release()

    return None if result is None else (result, released_lease_count)
def _resolve_team_config(
    self,
    *,
    team_id: str,
    team_config_id: str | None,
) -> TeamConfig | None:
    """Return the explicitly requested config, else the team's latest one."""
    configs = self.team_config_repository
    if team_config_id is None:
        return configs.get_latest_by_team(team_id=team_id)
    return configs.get_by_id(team_config_id=team_config_id)
def _execute_members(
    self,
    *,
    team_run: TeamRun,
    team_config: TeamConfig,
    members: list[TeamMemberContract],
    worker_key: str | None,
    dry_run: bool,
) -> list[TeamMemberRunResult]:
    """Dispatch member execution to the strategy for the coordination mode.

    ``parallel`` mode or a ``parallel_merge`` handoff policy selects the
    parallel strategy; ``pipeline`` and ``debate`` select their own
    strategies; anything else falls back to the supervisor strategy.

    Raises:
        AgentServiceClientError: If no agent client is configured.
    """
    if self.agent_client is None:
        raise AgentServiceClientError("agent service client is not configured")

    ordered_members = self._order_members(members)
    handoff = team_config.policy_json.get("handoff")
    mode = team_config.coordination_mode

    if mode == "parallel" or handoff == "parallel_merge":
        strategy = self._execute_members_parallel
    elif mode == "pipeline":
        strategy = self._execute_members_pipeline
    elif mode == "debate":
        strategy = self._execute_members_debate
    else:
        strategy = self._execute_members_supervisor

    return strategy(
        team_run=team_run,
        team_config=team_config,
        members=ordered_members,
        worker_key=worker_key,
        dry_run=dry_run,
    )
def _execute_members_supervisor(
    self,
    *,
    team_run: TeamRun,
    team_config: TeamConfig,
    members: list[TeamMemberContract],
    worker_key: str | None,
    dry_run: bool,
) -> list[TeamMemberRunResult]:
    """Run a lead member first, then the rest, then an optional synthesis.

    The lead is the first member whose role is ``supervisor`` or
    ``planner``. Without a lead, all members simply run sequentially.
    """
    lead = None
    for candidate in members:
        if candidate.role in {"supervisor", "planner"}:
            lead = candidate
            break
    others = [m for m in members if m is not lead] if lead else members
    failure_mode = self._read_failure_mode(team_config)

    shared = {
        "team_run": team_run,
        "team_config": team_config,
        "worker_key": worker_key,
        "dry_run": dry_run,
    }

    if lead is None:
        return self._execute_members_sequential(
            members=members, failure_mode=failure_mode, **shared)

    # Phase 1: the lead runs with no prior outputs.
    lead_input = self._build_member_input_json(
        team_run=team_run, team_config=team_config, member=lead, prior_outputs=[])
    lead_result = self._execute_single_member(
        member=lead, member_input_json=lead_input, **shared)
    lead_failed = lead_result.run.status != "completed"
    if lead_failed and failure_mode == "stop_on_critical":
        return [lead_result]

    # Phase 2: the other members see the lead's output as context.
    lead_output = self._compact_prior_output(lead_result)
    other_results = self._execute_members_sequential(
        members=others,
        failure_mode=failure_mode,
        initial_prior_outputs=[lead_output],
        **shared,
    )
    combined = [lead_result] + other_results

    # Phase 3: optional synthesis pass by the lead over every output.
    do_synthesis = team_config.policy_json.get("supervisor_synthesis", True)
    if not (do_synthesis and lead_result.run.status == "completed"):
        return combined

    all_outputs = [lead_output]
    all_outputs += [self._compact_prior_output(item) for item in other_results]
    synthesis_input = self._build_member_input_json(
        team_run=team_run, team_config=team_config, member=lead,
        prior_outputs=all_outputs)
    synthesis_result = self._execute_single_member(
        member=lead, member_input_json=synthesis_input, **shared)
    return combined + [synthesis_result]
def _execute_members_pipeline(
    self,
    *,
    team_run: TeamRun,
    team_config: TeamConfig,
    members: list[TeamMemberContract],
    worker_key: str | None,
    dry_run: bool,
) -> list[TeamMemberRunResult]:
    """Run members as a pipeline where each stage sees the last good output.

    Each member receives at most one prior output: that of the most recent
    member whose run completed. On failure, ``stop_on_critical`` ends the
    pipeline, ``retry_once`` re-executes the member a single time and keeps
    the retry's result, and any other mode moves on to the next member.

    The original had a ``stop_on_critical`` check nested inside the
    ``retry_once`` branch. It could never be true there, so it is removed;
    observable behaviour is unchanged.

    Returns:
        One result per executed member, in execution order.
    """
    failure_mode = self._read_failure_mode(team_config)
    member_results: list[TeamMemberRunResult] = []
    prev_output: dict[str, JSONValue] | None = None

    for member in members:
        prior = [] if prev_output is None else [prev_output]
        member_input_json = self._build_member_input_json(
            team_run=team_run, team_config=team_config, member=member,
            prior_outputs=prior)
        result = self._execute_single_member(
            team_run=team_run, team_config=team_config, member=member,
            member_input_json=member_input_json, worker_key=worker_key,
            dry_run=dry_run)
        member_results.append(result)

        if result.run.status != "completed":
            if failure_mode == "stop_on_critical":
                break
            if failure_mode == "retry_once":
                # The retry's result replaces the failed attempt.
                result = self._execute_single_member(
                    team_run=team_run, team_config=team_config, member=member,
                    member_input_json=member_input_json, worker_key=worker_key,
                    dry_run=dry_run)
                member_results[-1] = result

        # Only a completed run feeds the next stage.
        if result.run.status == "completed":
            prev_output = self._compact_prior_output(result)

    return member_results
def _execute_members_debate(
    self,
    *,
    team_run: TeamRun,
    team_config: TeamConfig,
    members: list[TeamMemberContract],
    worker_key: str | None,
    dry_run: bool,
) -> list[TeamMemberRunResult]:
    """Run members over several rounds, each seeing the debate so far.

    Every executed result is appended to a shared history that is passed as
    prior outputs to later members and rounds.

    Returns:
        The results of the last round that ran (empty if none ran).
    """
    max_rounds = self._read_max_rounds(team_config)
    failure_mode = self._read_failure_mode(team_config)
    debate_history: list[dict[str, JSONValue]] = []
    final_results: list[TeamMemberRunResult] = []

    def run_member(member: TeamMemberContract, member_input_json: dict[str, JSONValue]) -> TeamMemberRunResult:
        return self._execute_single_member(
            team_run=team_run, team_config=team_config, member=member,
            member_input_json=member_input_json, worker_key=worker_key,
            dry_run=dry_run)

    for _round in range(1, max_rounds + 1):
        round_results: list[TeamMemberRunResult] = []
        for member in members:
            member_input_json = self._build_member_input_json(
                team_run=team_run, team_config=team_config, member=member,
                prior_outputs=debate_history)
            result = run_member(member, member_input_json)
            round_results.append(result)
            debate_history.append(self._compact_prior_output(result))

            if result.run.status == "completed":
                continue
            if failure_mode == "stop_on_critical":
                break
            if failure_mode == "retry_once":
                # The retry replaces the failed attempt in results and history.
                retry = run_member(member, member_input_json)
                round_results[-1] = retry
                debate_history[-1] = self._compact_prior_output(retry)

        final_results = round_results
        round_has_failure = any(r.run.status != "completed" for r in round_results)
        if round_has_failure and failure_mode == "stop_on_critical":
            break

    return final_results
def _execute_members_sequential(
    self,
    *,
    team_run: TeamRun,
    team_config: TeamConfig,
    members: list[TeamMemberContract],
    worker_key: str | None,
    dry_run: bool,
    failure_mode: str = "stop_on_critical",
    initial_prior_outputs: list[dict[str, JSONValue]] | None = None,
) -> list[TeamMemberRunResult]:
    """Run members one after another, accumulating their compacted outputs.

    Each member sees ``initial_prior_outputs`` plus the outputs of every
    member executed before it, failed ones included. On failure,
    ``stop_on_critical`` ends the loop, ``retry_once`` re-executes the
    member once and keeps the retry's result, and other modes continue.

    The original ended the failure handling with a check that read
    ``retry`` and compared ``failure_mode`` to ``stop_on_critical``. It was
    unreachable inside the ``retry_once`` branch, and would read an unbound
    ``retry`` if placed one level out, so it is removed. Behaviour for the
    three named failure modes is unchanged.

    Returns:
        One result per executed member, in execution order.
    """
    member_results: list[TeamMemberRunResult] = []
    # Copy so the caller's list is never mutated.
    prior_outputs = list(initial_prior_outputs or [])

    for member in members:
        member_input_json = self._build_member_input_json(
            team_run=team_run, team_config=team_config, member=member,
            prior_outputs=prior_outputs)
        result = self._execute_single_member(
            team_run=team_run, team_config=team_config, member=member,
            member_input_json=member_input_json, worker_key=worker_key,
            dry_run=dry_run)
        member_results.append(result)
        prior_outputs.append(self._compact_prior_output(result))

        if result.run.status == "completed":
            continue
        if failure_mode == "stop_on_critical":
            break
        if failure_mode == "retry_once":
            # The retry replaces the failed attempt in results and context.
            retry = self._execute_single_member(
                team_run=team_run, team_config=team_config, member=member,
                member_input_json=member_input_json, worker_key=worker_key,
                dry_run=dry_run)
            member_results[-1] = retry
            prior_outputs[-1] = self._compact_prior_output(retry)

    return member_results
def _execute_members_parallel(
    self,
    *,
    team_run: TeamRun,
    team_config: TeamConfig,
    members: list[TeamMemberContract],
    worker_key: str | None,
    dry_run: bool,
) -> list[TeamMemberRunResult]:
    """Run all members concurrently, each with no prior outputs.

    Uses a thread pool of at most 8 workers. Results are matched to members
    by submission position rather than by ``member_key``, so two members
    sharing a key no longer overwrite each other's result.

    Returns:
        One result per member, in the order of ``members``.
    """
    pool_size = max(1, min(len(members), 8))
    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        # Inputs are built on the calling thread; only execution is offloaded.
        futures = [
            executor.submit(
                self._execute_single_member,
                team_run=team_run,
                team_config=team_config,
                member=member,
                member_input_json=self._build_member_input_json(
                    team_run=team_run,
                    team_config=team_config,
                    member=member,
                    prior_outputs=[],
                ),
                worker_key=worker_key,
                dry_run=dry_run,
            )
            for member in members
        ]
        # ``result()`` blocks per future and re-raises the worker's exception.
        return [future.result() for future in futures]
def _execute_single_member(
    self,
    *,
    team_run: TeamRun,
    team_config: TeamConfig,
    member: TeamMemberContract,
    member_input_json: dict[str, JSONValue],
    worker_key: str | None,
    dry_run: bool,
) -> TeamMemberRunResult:
    """Create an agent run for ``member`` and execute it right away.

    Raises:
        AgentServiceClientError: If no agent client is configured.
    """
    client = self.agent_client
    if client is None:
        raise AgentServiceClientError("agent service client is not configured")

    prompt_text = self._build_member_input_text(
        team_run=team_run,
        team_config=team_config,
        member=member,
    )
    created_run = client.create_agent_run(
        agent_id=member.agent_id,
        agent_config_id=member.agent_config_id,
        session_id=team_run.session_id,
        input_text=prompt_text,
        input_json=member_input_json,
    )
    executed_run = client.execute_agent_run(
        agent_run_id=created_run.id,
        worker_key=worker_key,
        dry_run=dry_run,
    )
    return TeamMemberRunResult(member=member, run=executed_run)
def _stream_single_member(
    self,
    *,
    team_run: TeamRun,
    team_config: TeamConfig,
    member: TeamMemberContract,
    prior_outputs: list[dict[str, JSONValue]],
    payload: TeamRunExecuteRequest,
) -> Iterator[tuple[dict[str, JSONValue], TeamMemberRunResult | None]]:
    """Stream one member's agent run as ``(event, result)`` pairs.

    Yields ``team.member.started``, one ``team.member.delta`` per string
    delta from the agent stream, and finally ``team.member.completed``.
    ``result`` is ``None`` on every pair except the last, which carries the
    member's ``TeamMemberRunResult``. The return annotation now says so.

    An ``AgentServiceClientError`` during streaming becomes a failed run
    copy with ``agent_service_error``. Errors from creating the run still
    propagate.

    Raises:
        AgentServiceClientError: If no agent client is configured (same
            guard as ``_execute_single_member``), or if creating the agent
            run fails.
    """
    if self.agent_client is None:
        raise AgentServiceClientError("agent service client is not configured")

    member_input_json = self._build_member_input_json(
        team_run=team_run, team_config=team_config, member=member,
        prior_outputs=prior_outputs)
    created_run = self.agent_client.create_agent_run(
        agent_id=member.agent_id,
        agent_config_id=member.agent_config_id,
        session_id=team_run.session_id,
        input_text=self._build_member_input_text(
            team_run=team_run, team_config=team_config, member=member),
        input_json=member_input_json)
    yield {
        "event": "team.member.started",
        "member": self._member_to_json(member),
        "agent_run": created_run.model_dump(mode="json"),
    }, None

    # Falls back to the created run if the stream never reports a final run.
    final_agent_run = created_run
    try:
        for event_name, data in self.agent_client.execute_agent_run_stream(
                agent_run_id=created_run.id,
                worker_key=payload.worker_key,
                dry_run=payload.dry_run):
            if event_name == "agent.run.delta":
                delta = data.get("delta")
                if isinstance(delta, str):
                    yield {
                        "event": "team.member.delta",
                        "member": self._member_to_json(member),
                        "agent_run_id": created_run.id,
                        "delta": delta,
                    }, None
            elif event_name in {"agent.run.completed", "agent.run.failed"}:
                run_payload = data.get("run")
                if isinstance(run_payload, dict):
                    final_agent_run = AgentRunContract.model_validate(run_payload)
    except AgentServiceClientError as exc:
        # Report the streaming failure as a failed run instead of raising.
        final_agent_run = created_run.model_copy(update={
            "status": "failed",
            "error_code": "agent_service_error",
            "error_message": str(exc),
        })

    result = TeamMemberRunResult(member=member, run=final_agent_run)
    yield {
        "event": "team.member.completed",
        "member": self._member_to_json(member),
        "agent_run": final_agent_run.model_dump(mode="json"),
    }, result
- def _stream_members_sequential(
- self,
- *,
- team_run: TeamRun,
- team_config: TeamConfig,
- members: list[TeamMemberContract],
- payload: TeamRunExecuteRequest) -> Iterator[list[TeamMemberRunResult]]:
- member_results: list[TeamMemberRunResult] = []
- prior_outputs: list[dict[str, JSONValue]] = []
- for member in members:
- for event, result in self._stream_single_member(
- team_run=team_run, team_config=team_config, member=member,
- prior_outputs=prior_outputs, payload=payload):
- if result is not None:
- member_results.append(result)
- prior_outputs.append(self._compact_prior_output(result))
- else:
- yield event
- return member_results
- def _stream_members_debate(
- self,
- *,
- team_run: TeamRun,
- team_config: TeamConfig,
- members: list[TeamMemberContract],
- payload: TeamRunExecuteRequest) -> Iterator[list[TeamMemberRunResult]]:
- max_rounds = self._read_max_rounds(team_config)
- debate_history: list[dict[str, JSONValue]] = []
- final_results: list[TeamMemberRunResult] = []
- for round_num in range(1, max_rounds + 1):
- yield {
- "event": "team.debate.round_started",
- "round": round_num,
- "max_rounds": max_rounds,
- }
- round_results: list[TeamMemberRunResult] = []
- for member in members:
- for event, result in self._stream_single_member(
- team_run=team_run, team_config=team_config, member=member,
- prior_outputs=debate_history, payload=payload):
- if result is not None:
- round_results.append(result)
- debate_history.append(self._compact_prior_output(result))
- else:
- yield event
- final_results = round_results
- yield {
- "event": "team.debate.round_completed",
- "round": round_num,
- "max_rounds": max_rounds,
- "member_count": len(round_results),
- }
- return final_results
- def _read_team_members(self, team_config: TeamConfig) -> list[TeamMemberContract]:
- members: list[TeamMemberContract] = []
- for item in team_config.member_refs_json:
- try:
- members.append(TeamMemberContract.model_validate(item))
- except ValueError:
- continue
- return members
- def _order_members(self, members: list[TeamMemberContract]) -> list[TeamMemberContract]:
- role_priority = {
- "planner": 0,
- "supervisor": 1,
- "specialist": 2,
- "executor": 3,
- "reviewer": 4,
- }
- return sorted(members, key=lambda item: role_priority.get(item.role, 10))
- def _select_stream_members(
- self,
- *,
- team_config: TeamConfig,
- members: list[TeamMemberContract]) -> list[TeamMemberContract]:
- ordered_members = self._order_members(members)
- if self._read_response_mode(team_config) == "all_members":
- return ordered_members
- for member in ordered_members:
- if member.role in {"supervisor", "planner"}:
- return [member]
- return ordered_members[:1]
- def _read_response_mode(self, team_config: TeamConfig) -> str:
- value = team_config.policy_json.get("response_mode")
- if isinstance(value, str) and value in {"single_responder", "all_members"}:
- return value
- return "single_responder"
- def _read_max_rounds(self, team_config: TeamConfig) -> int:
- value = team_config.policy_json.get("max_rounds")
- if isinstance(value, (int, float)):
- return max(1, min(int(value), 20))
- return 3
- def _read_failure_mode(self, team_config: TeamConfig) -> str:
- value = team_config.policy_json.get("failure_mode")
- if isinstance(value, str) and value in {"stop_on_critical", "continue_with_warning", "retry_once"}:
- return value
- return "stop_on_critical"
- def _build_member_input_text(
- self,
- *,
- team_run: TeamRun,
- team_config: TeamConfig,
- member: TeamMemberContract) -> str:
- lines = [
- f"Team objective: {team_config.objective or 'No objective provided.'}",
- f"Member role: {member.role}",
- ]
- if member.responsibility:
- lines.append(f"Responsibility: {member.responsibility}")
- if team_run.input_text:
- lines.append(f"User task: {team_run.input_text}")
- return "\n".join(lines)
- def _build_member_input_json(
- self,
- *,
- team_run: TeamRun,
- team_config: TeamConfig,
- member: TeamMemberContract,
- prior_outputs: list[dict[str, JSONValue]]) -> dict[str, JSONValue]:
- input_json: dict[str, JSONValue] = dict(team_run.input_json or {})
- input_json.update(
- {
- "team_id": team_run.team_id,
- "team_run_id": team_run.id,
- "team_config_id": team_config.id,
- "team_objective": team_config.objective,
- "member_key": member.member_key,
- "member_role": member.role,
- "member_responsibility": member.responsibility,
- "prior_member_outputs": prior_outputs,
- }
- )
- configured_input = member.config_json.get("input_json")
- if isinstance(configured_input, dict):
- input_json.update(
- {str(item_key): item_value for item_key, item_value in configured_input.items()}
- )
- return input_json
- def _build_team_output_text(
- self,
- *,
- team_config: TeamConfig,
- member_results: list[TeamMemberRunResult]) -> str:
- lines = [
- f"Team objective: {team_config.objective or 'No objective provided.'}",
- f"Coordination mode: {team_config.coordination_mode}",
- "Team conversation:",
- ]
- for index, item in enumerate(member_results, start=1):
- output_text = item.run.output_text or item.run.error_message or ""
- speaker = item.member.name or item.member.member_key
- lines.append(
- f"{index}. {speaker} ({item.member.role}) "
- f"status={item.run.status}: {output_text}")
- return "\n".join(lines)
- def _member_result_to_json(self, result: TeamMemberRunResult) -> dict[str, JSONValue]:
- return {
- "member_key": result.member.member_key,
- "member_role": result.member.role,
- "member_name": result.member.name,
- "member_responsibility": result.member.responsibility,
- "agent_run_id": result.run.id,
- "agent_id": result.run.agent_id,
- "agent_config_id": result.run.agent_config_id,
- "status": result.run.status,
- "output_text": result.run.output_text,
- "output_json": self._compact_agent_output_json(result.run.output_json or {}),
- "error_code": result.run.error_code,
- "error_message": result.run.error_message,
- }
- def _member_to_json(self, member: TeamMemberContract) -> dict[str, JSONValue]:
- return member.model_dump(mode="json")
- def _team_run_to_json(self, team_run: TeamRun | None) -> dict[str, JSONValue]:
- if team_run is None:
- return {}
- return {
- "id": team_run.id,
- "team_id": team_run.team_id,
- "team_config_id": team_run.team_config_id,
- "session_id": team_run.session_id,
- "input_text": team_run.input_text,
- "input_json": team_run.input_json,
- "output_text": team_run.output_text,
- "output_json": team_run.output_json,
- "status": team_run.status,
- "worker_key": team_run.worker_key,
- "queued_time": team_run.queued_time,
- "lease_expire_time": team_run.lease_expire_time,
- "started_time": team_run.started_time,
- "finished_time": team_run.finished_time,
- "error_code": team_run.error_code,
- "error_message": team_run.error_message,
- "created_time": team_run.created_time,
- }
- def _compact_prior_output(self, result: TeamMemberRunResult) -> dict[str, JSONValue]:
- return {
- "member_key": result.member.member_key,
- "member_role": result.member.role,
- "member_name": result.member.name,
- "agent_id": result.run.agent_id,
- "agent_run_id": result.run.id,
- "status": result.run.status,
- "output_text": self._truncate_text(result.run.output_text),
- "error_code": result.run.error_code,
- "error_message": result.run.error_message,
- "output_json": self._compact_agent_output_json(result.run.output_json or {}),
- }
- def _compact_agent_output_json(
- self,
- output_json: dict[str, JSONValue]) -> dict[str, JSONValue]:
- keep_keys = {
- "dry_run",
- "model",
- "finish_reason",
- "usage_json",
- "tool_invocations",
- "skill_invocations",
- "selected_tool_refs",
- "selected_skill_refs",
- "memory_read_enabled",
- "memory_read_count",
- "memory_read_reason",
- }
- compacted = {
- key: value for key, value in output_json.items()
- if key in keep_keys
- }
- if "raw_response_json" in output_json or "messages" in output_json:
- compacted["debug_payload_omitted"] = True
- return compacted
- def _truncate_text(self, value: str | None) -> str | None:
- if value is None or len(value) <= MAX_PRIOR_OUTPUT_TEXT_CHARS:
- return value
- return f"{value[:MAX_PRIOR_OUTPUT_TEXT_CHARS]}... [truncated]"
- def _publish_event(
- self,
- *,
- event_type: str,
- team_run: TeamRun,
- payload_json: dict[str, JSONValue]) -> None:
- if self.event_client is None:
- return
- try:
- self.event_client.publish_event(
- EventPublishContract(
- event_type=event_type,
- source_service="team-service",
- aggregate_type="team_run",
- aggregate_id=team_run.id,
- correlation_id=team_run.session_id,
- payload_json={
- **payload_json,
- "team_id": team_run.team_id,
- "team_config_id": team_run.team_config_id,
- })
- )
- except EventServiceClientError:
- return
- def _build_team_code(self, name: str) -> str:
- base = "".join(
- char.lower() if char.isalnum() else "_"
- for char in name
- ).strip("_") or "team"
- return base[:64]
- def _normalize_member_refs(self, member_refs: list[dict[str, JSONValue]]) -> list[TeamMemberContract]:
- members: list[TeamMemberContract] = []
- for index, item in enumerate(member_refs, start=1):
- role = item.get("role")
- normalized_role = "specialist" if role == "worker" else role
- member = {
- **item,
- "member_key": item.get("member_key") or item.get("memberKey") or f"member_{index}",
- "agent_id": item.get("agent_id") or item.get("agentId"),
- "agent_config_id": item.get("agent_config_id") or item.get("agentConfigId"),
- "role": normalized_role or "specialist",
- "config_json": item.get("config_json") or item.get("configJson") or {},
- }
- members.append(TeamMemberContract.model_validate(member))
- return members
def build_team_application_service(
    *,
    team_repository: TeamDefinitionRepository,
    team_config_repository: TeamConfigRepository,
    team_run_repository: TeamRunRepository,
    settings: TeamServiceSettings) -> TeamApplicationService:
    """Wire a ``TeamApplicationService`` from repositories and settings.

    Builds the agent and event service clients from the URLs and timeouts in
    ``settings``. A task queue publisher is attached only when
    ``try_build_redis_client`` returns a client for ``settings.redis_url``;
    otherwise ``None`` is passed.
    """
    redis_client = try_build_redis_client(settings.redis_url)
    agent_client = AgentServiceClient(
        base_url=settings.agent_service_url,
        timeout_seconds=settings.agent_service_timeout_seconds)
    event_client = EventServiceClient(
        base_url=settings.event_service_url,
        timeout_seconds=settings.event_service_timeout_seconds)
    if redis_client is None:
        queue_publisher = None
    else:
        queue_publisher = TaskQueuePublisher(client=redis_client)
    return TeamApplicationService(
        team_repository=team_repository,
        team_config_repository=team_config_repository,
        team_run_repository=team_run_repository,
        agent_client=agent_client,
        event_client=event_client,
        task_queue_publisher=queue_publisher)
|