import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed

from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
from core_domain import AgentRunContract, TeamMemberContract
from core_shared import JSONValue, try_build_redis_client
from core_shared.task_queue import TaskQueuePublisher

from app.bootstrap.settings import TeamServiceSettings
from app.db.models import TeamDefinition, TeamRun, TeamConfig
from app.domain.repositories import (
    TeamDefinitionRepository,
    TeamRunRepository,
    TeamConfigRepository,
)
from app.infrastructure.agent_client import AgentServiceClient, AgentServiceClientError
from app.schemas.team import (
    TeamConfigCreateRequestDto,
    TeamConfigUpdateRequestDto,
    TeamCreateRequest,
    TeamCreateRequestDto,
    TeamDeleteRequestDto,
    TeamRunCreateRequest,
    TeamRunCreateRequestDto,
    TeamRunExecuteRequest,
    TeamRunStatusUpdateRequestDto,
    TeamRunStatusUpdateRequest,
    TeamStatusUpdateRequest,
    TeamUpdateRequestDto,
    TeamConfigCreateRequest,
)

logger = logging.getLogger(__name__)

# Upper bound on the member output text forwarded to later members as
# "prior output"; longer text is cut and suffixed with a truncation marker.
MAX_PRIOR_OUTPUT_TEXT_CHARS = 1600


@dataclass(frozen=True)
class TeamMemberRunResult:
    """Pairs a team member with the agent run that was executed for it."""

    member: TeamMemberContract
    run: AgentRunContract


class TeamApplicationService:
    """Application-layer operations for teams, team configs and team runs."""

    def __init__(
        self,
        *,
        team_repository: TeamDefinitionRepository,
        team_config_repository: TeamConfigRepository,
        team_run_repository: TeamRunRepository,
        agent_client: AgentServiceClient | None = None,
        event_client: EventServiceClient | None = None,
        task_queue_publisher: TaskQueuePublisher | None = None,
    ) -> None:
        """Store the repositories and the optional outbound clients."""
        self.team_repository = team_repository
        self.team_config_repository = team_config_repository
        self.team_run_repository = team_run_repository
        self.agent_client = agent_client
        self.event_client = event_client
        self.task_queue_publisher = task_queue_publisher

    # ------------------------------------------------------------------
    # Teams
    # ------------------------------------------------------------------

    def create_team(self, payload: TeamCreateRequest) -> TeamDefinition:
        """Persist a new team definition from an internal create request."""
        return self.team_repository.create(
            code=payload.code,
            name=payload.name,
            description=payload.description,
            team_type=payload.team_type,
            owner_user_id=payload.owner_user_id,
            metadata_json=payload.metadata_json,
        )

    def create_team_from_contract(self, payload: TeamCreateRequestDto) -> TeamDefinition:
        """Create a team from the camelCase DTO; the code is derived from the name."""
        return self.create_team(
            TeamCreateRequest(
                code=self._build_team_code(payload.name),
                name=payload.name,
                description=payload.description,
                team_type=payload.teamType,
                owner_user_id=payload.ownerUserId,
                metadata_json=payload.metadata,
            )
        )

    def list_teams(self) -> list[TeamDefinition]:
        """Return every team definition known to the repository."""
        return self.team_repository.list_all()

    def get_team(self, *, team_id: str) -> TeamDefinition | None:
        """Look up one team by id; ``None`` when it does not exist."""
        return self.team_repository.get_by_id(team_id=team_id)

    def update_team_from_contract(self, payload: TeamUpdateRequestDto) -> TeamDefinition | None:
        """Apply the non-``None`` DTO fields to a team and save it.

        Returns ``None`` when the team does not exist. Changing the name also
        regenerates the team code.
        """
        entity = self.team_repository.get_by_id(team_id=payload.teamId)
        if entity is None:
            return None
        if payload.name is not None:
            entity.name = payload.name
            entity.code = self._build_team_code(payload.name)
        if payload.description is not None:
            entity.description = payload.description
        if payload.teamType is not None:
            entity.team_type = payload.teamType
        if payload.status is not None:
            entity.status = payload.status
        if payload.ownerUserId is not None:
            entity.owner_user_id = payload.ownerUserId
        if payload.metadata is not None:
            entity.metadata_json = payload.metadata
        return self.team_repository.save(entity)

    def delete_team_from_contract(self, payload: TeamDeleteRequestDto) -> bool:
        """Delete a team; returns ``False`` when it was not found."""
        entity = self.team_repository.get_by_id(team_id=payload.teamId)
        if entity is None:
            return False
        self.team_repository.delete(entity)
        return True

    def update_team_status(
        self,
        *,
        team_id: str,
        payload: TeamStatusUpdateRequest,
    ) -> TeamDefinition | None:
        """Delegate a status change to the team repository."""
        return self.team_repository.update_status(
            team_id=team_id,
            status=payload.status,
        )

    # ------------------------------------------------------------------
    # Team configs
    # ------------------------------------------------------------------

    def create_team_config(self, payload: TeamConfigCreateRequest) -> TeamConfig:
        """Create a config for an existing team.

        Raises:
            ValueError: the team does not exist, or no member refs were given.
        """
        team = self.team_repository.get_by_id(team_id=payload.team_id)
        if team is None:
            raise ValueError(f"team not found: {payload.team_id}")
        if not payload.member_refs:
            raise ValueError("team config requires at least one member")
        return self.team_config_repository.create(
            team_id=payload.team_id,
            coordination_mode=payload.coordination_mode,
            objective=payload.objective,
            member_refs_json=[item.model_dump(mode="json") for item in payload.member_refs],
            policy_json=payload.policy_json,
        )

    def create_team_config_from_contract(self, payload: TeamConfigCreateRequestDto) -> TeamConfig:
        """Create a config from the camelCase DTO after normalizing member refs."""
        return self.create_team_config(
            TeamConfigCreateRequest(
                team_id=payload.teamId,
                coordination_mode=payload.coordinationMode,
                objective=payload.objective,
                member_refs=self._normalize_member_refs(payload.memberRefs),
                policy_json=payload.policy,
            )
        )

    def list_team_configs(self, *, team_id: str | None = None) -> list[TeamConfig]:
        """List configs for one team, or all configs when ``team_id`` is ``None``.

        The file previously defined this method twice; the earlier definition
        (mandatory ``team_id``) was shadowed by this one and has been removed.
        """
        if team_id is not None:
            return self.team_config_repository.list_by_team(team_id=team_id)
        return self.team_config_repository.list_all()

    def get_team_config(self, *, config_id: str) -> TeamConfig | None:
        """Look up one team config by id; ``None`` when it does not exist."""
        return self.team_config_repository.get_by_id(team_config_id=config_id)

    def update_team_config_from_contract(
        self,
        payload: TeamConfigUpdateRequestDto,
    ) -> TeamConfig | None:
        """Apply the non-``None`` DTO fields to a team config and save it.

        Returns ``None`` when the config does not exist.

        Raises:
            ValueError: ``memberRefs`` was supplied but normalizes to an empty list.
        """
        entity = self.team_config_repository.get_by_id(team_config_id=payload.configId)
        if entity is None:
            return None
        if payload.coordinationMode is not None:
            entity.coordination_mode = payload.coordinationMode
        if payload.objective is not None:
            entity.objective = payload.objective
        if payload.memberRefs is not None:
            normalized = self._normalize_member_refs(payload.memberRefs)
            if not normalized:
                raise ValueError("team config requires at least one member")
            entity.member_refs_json = [item.model_dump(mode="json") for item in normalized]
        if payload.policy is not None:
            entity.policy_json = payload.policy
        return self.team_config_repository.save(entity)

    def delete_team_config(self, *, config_id: str) -> bool:
        """Delete a team config; returns ``False`` when it was not found."""
        entity = self.team_config_repository.get_by_id(team_config_id=config_id)
        if entity is None:
            return False
        self.team_config_repository.delete(entity)
        return True

    # ------------------------------------------------------------------
    # Team runs
    # ------------------------------------------------------------------

    def create_team_run(self, payload: TeamRunCreateRequest) -> TeamRun:
        """Create a team run, publish ``team.run.created`` and enqueue it.

        The run is only enqueued when a task queue publisher is configured.

        Raises:
            ValueError: no team config could be resolved for the request.
        """
        team_config = self._resolve_team_config(
            team_id=payload.team_id,
            team_config_id=payload.team_config_id,
        )
        if team_config is None:
            raise ValueError("team config not found")
        team_run = self.team_run_repository.create(
            team_id=payload.team_id,
            team_config_id=team_config.id,
            session_id=payload.session_id,
            input_text=payload.input_text,
            input_json=payload.input_json,
        )
        self._publish_event(
            event_type="team.run.created",
            team_run=team_run,
            payload_json={"team_run_id": team_run.id, "status": team_run.status},
        )
        if self.task_queue_publisher is not None:
            self.task_queue_publisher.publish_team_run(team_run_id=team_run.id)
        return team_run

    def create_team_run_from_contract(self, payload: TeamRunCreateRequestDto) -> TeamRun:
        """Create a team run from the camelCase DTO."""
        return self.create_team_run(
            TeamRunCreateRequest(
                team_id=payload.teamId,
                team_config_id=payload.teamConfigId,
                session_id=payload.sessionId,
                input_text=payload.inputText,
                input_json=payload.inputJson,
            )
        )

    def list_team_runs(
        self,
        *,
        team_id: str | None = None,
        session_id: str | None = None,
    ) -> list[TeamRun]:
        """List team runs, passing the optional team/session filters through."""
        return self.team_run_repository.list_by_scope(
            team_id=team_id,
            session_id=session_id,
        )

    def get_team_run(self, *, team_run_id: str) -> TeamRun | None:
        """Look up one team run by id; ``None`` when it does not exist."""
        return self.team_run_repository.get_by_id(team_run_id=team_run_id)

    def delete_team_run(self, *, team_run_id: str) -> bool:
        """Delete a team run; returns ``False`` when it was not found."""
        entity = self.team_run_repository.get_by_id(team_run_id=team_run_id)
        if entity is None:
            return False
        self.team_run_repository.delete(entity)
        return True

    def update_team_run_status(
        self,
        *,
        team_run_id: str,
        payload: TeamRunStatusUpdateRequest,
    ) -> TeamRun | None:
        """Update status/output/error fields of a run; ``None`` when it is missing."""
        entity = self.team_run_repository.get_by_id(team_run_id=team_run_id)
        if entity is None:
            return None
        return self.team_run_repository.update_status(
            team_run_id=team_run_id,
            status=payload.status,
            worker_key=payload.worker_key,
            output_text=payload.output_text,
            output_json=payload.output_json,
            error_code=payload.error_code,
            error_message=payload.error_message,
        )

    def update_team_run_status_from_contract(
        self,
        payload: TeamRunStatusUpdateRequestDto,
    ) -> TeamRun | None:
        """Update a run's status from the camelCase DTO."""
        return self.update_team_run_status(
            team_run_id=payload.teamRunId,
            payload=TeamRunStatusUpdateRequest(
                status=payload.status,
                worker_key=payload.workerKey,
                output_text=payload.outputText,
                output_json=payload.outputJson,
                error_code=payload.errorCode,
                error_message=payload.errorMessage,
            ),
        )

    def execute_team_run(
        self,
        *,
        team_run_id: str,
        payload: TeamRunExecuteRequest,
    ) -> TeamRun | None:
        """Run every member of a team run and record the aggregate outcome.

        Returns ``None`` when the run does not exist or cannot be moved to
        ``running``. Otherwise returns the run as stored after it was marked
        ``failed`` or ``completed``. Every failure path publishes a
        ``team.run.failed`` event; success publishes ``team.run.completed``.
        """
        team_run = self.team_run_repository.get_by_id(team_run_id=team_run_id)
        if team_run is None:
            return None

        team_config = self.team_config_repository.get_by_id(
            team_config_id=team_run.team_config_id,
        )
        if team_config is None:
            failed_run = self.team_run_repository.update_status(
                team_run_id=team_run.id,
                status="failed",
                worker_key=payload.worker_key,
                error_code="team_config_missing",
                error_message=f"team config not found: {team_run.team_config_id}",
            )
            self._publish_run_failed(failed_run, {"error_code": "team_config_missing"})
            return failed_run

        running_run = self.team_run_repository.update_status(
            team_run_id=team_run.id,
            status="running",
            worker_key=payload.worker_key,
        )
        if running_run is None:
            return None

        members = self._read_team_members(team_config)
        if not members:
            failed_run = self.team_run_repository.update_status(
                team_run_id=team_run.id,
                status="failed",
                worker_key=payload.worker_key,
                error_code="team_members_missing",
                error_message="team config has no valid members",
            )
            # Previously this path failed the run without emitting an event.
            self._publish_run_failed(failed_run, {"error_code": "team_members_missing"})
            return failed_run

        try:
            member_results = self._execute_members(
                team_run=team_run,
                team_config=team_config,
                members=members,
                worker_key=payload.worker_key,
                dry_run=payload.dry_run,
            )
        except AgentServiceClientError as exc:
            failed_run = self.team_run_repository.update_status(
                team_run_id=team_run.id,
                status="failed",
                worker_key=payload.worker_key,
                error_code="agent_service_error",
                error_message=str(exc),
            )
            # Previously this path failed the run without emitting an event.
            self._publish_run_failed(failed_run, {"error_code": "agent_service_error"})
            return failed_run

        failed_results = [item for item in member_results if item.run.status != "completed"]
        output_text = self._build_team_output_text(
            team_config=team_config,
            member_results=member_results,
        )
        output_json: dict[str, JSONValue] = {
            "dry_run": payload.dry_run,
            "coordination_mode": team_config.coordination_mode,
            "team_config_id": team_config.id,
            "member_run_count": len(member_results),
            "member_results": [
                self._member_result_to_json(item) for item in member_results
            ],
        }

        if failed_results:
            failed_run = self.team_run_repository.update_status(
                team_run_id=team_run.id,
                status="failed",
                worker_key=payload.worker_key,
                output_text=output_text,
                output_json=output_json,
                error_code="member_run_failed",
                error_message=f"{len(failed_results)} member run(s) failed",
            )
            self._publish_run_failed(
                failed_run,
                {"failed_member_count": len(failed_results)},
            )
            return failed_run

        completed_run = self.team_run_repository.update_status(
            team_run_id=team_run.id,
            status="completed",
            worker_key=payload.worker_key,
            output_text=output_text,
            output_json=output_json,
        )
        if completed_run is not None:
            self._publish_event(
                event_type="team.run.completed",
                team_run=completed_run,
                payload_json={
                    "team_run_id": completed_run.id,
                    "status": completed_run.status,
                    "member_run_count": len(member_results),
                },
            )
        return completed_run

    def execute_next_claimed_team_run(
        self,
        *,
        worker_key: str,
        lease_seconds: int,
        stale_running_seconds: int,
        dry_run: bool,
        redis_client: object | None = None,
    ) -> tuple[TeamRun, int] | None:
        """Claim the next queued run and execute it.

        Expired leases are released first. When ``redis_client`` is given, the
        run is additionally guarded by a distributed lock and an idempotency
        record.

        Returns:
            ``(executed_run, released_lease_count)``, or ``None`` when nothing
            was claimed, the lock/idempotency guard refused the run, or
            execution returned ``None``.
        """
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12. It
        # is kept because the timestamp type the repository expects (naive vs
        # aware) is not visible here -- confirm before switching.
        released_lease_count = self.team_run_repository.release_expired_leases(
            now_time=datetime.utcnow(),
            stale_running_seconds=stale_running_seconds,
        )
        claimed_team_run = self.team_run_repository.claim_next_queued(
            worker_key=worker_key,
            lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
        )
        if claimed_team_run is None:
            return None

        if redis_client is not None:
            # Imported lazily so the Redis primitives are only loaded when a
            # client is actually supplied.
            from core_shared.redis_primitives import DistributedLock, IdempotencyStore

            lock = DistributedLock(
                client=redis_client,
                name=f"team-run:{claimed_team_run.id}:lock",
                ttl_seconds=lease_seconds,
            )
            if not lock.acquire():
                return None
            idempotency_store = IdempotencyStore(
                client=redis_client,
                prefix="team-run-idempotency",
            )
            if not idempotency_store.begin(key=claimed_team_run.id):
                lock.release()
                return None
        else:
            lock = None
            idempotency_store = None

        try:
            result = self.execute_team_run(
                team_run_id=claimed_team_run.id,
                payload=TeamRunExecuteRequest(
                    worker_key=worker_key,
                    dry_run=dry_run,
                ),
            )
            # NOTE(review): if execute_team_run raises, the idempotency record
            # started above is never completed -- confirm the store expires it.
            if idempotency_store is not None and result is not None:
                idempotency_store.complete(
                    key=claimed_team_run.id,
                    result={"status": result.status, "team_run_id": result.id},
                )
        finally:
            if lock is not None:
                lock.release()

        if result is None:
            return None
        return result, released_lease_count

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _resolve_team_config(
        self,
        *,
        team_id: str,
        team_config_id: str | None,
    ) -> TeamConfig | None:
        """Return the explicitly requested config, else the team's latest one."""
        if team_config_id is not None:
            return self.team_config_repository.get_by_id(team_config_id=team_config_id)
        return self.team_config_repository.get_latest_by_team(team_id=team_id)

    def _execute_members(
        self,
        *,
        team_run: TeamRun,
        team_config: TeamConfig,
        members: list[TeamMemberContract],
        worker_key: str | None,
        dry_run: bool,
    ) -> list[TeamMemberRunResult]:
        """Execute all members, in parallel or sequentially per the config.

        In sequential mode each member receives the compacted outputs of the
        members that ran before it.

        Raises:
            AgentServiceClientError: no agent client is configured (or the
                client itself raised).
        """
        if self.agent_client is None:
            raise AgentServiceClientError("agent service client is not configured")

        ordered_members = self._order_members(members)
        if self._should_execute_members_in_parallel(team_config):
            return self._execute_members_in_parallel(
                team_run=team_run,
                team_config=team_config,
                members=ordered_members,
                worker_key=worker_key,
                dry_run=dry_run,
            )

        member_results: list[TeamMemberRunResult] = []
        prior_outputs: list[dict[str, JSONValue]] = []
        for member in ordered_members:
            member_input_json = self._build_member_input_json(
                team_run=team_run,
                team_config=team_config,
                member=member,
                prior_outputs=prior_outputs,
            )
            result = self._execute_single_member(
                team_run=team_run,
                team_config=team_config,
                member=member,
                member_input_json=member_input_json,
                worker_key=worker_key,
                dry_run=dry_run,
            )
            member_results.append(result)
            prior_outputs.append(self._compact_prior_output(result))
        return member_results

    def _execute_members_in_parallel(
        self,
        *,
        team_run: TeamRun,
        team_config: TeamConfig,
        members: list[TeamMemberContract],
        worker_key: str | None,
        dry_run: bool,
    ) -> list[TeamMemberRunResult]:
        """Execute members concurrently (at most 8 threads), preserving order.

        Members get no prior outputs in this mode. Results are tracked by
        position rather than ``member_key`` so that two members sharing a key
        cannot overwrite each other's result.
        """
        results: list[TeamMemberRunResult | None] = [None] * len(members)
        with ThreadPoolExecutor(max_workers=max(1, min(len(members), 8))) as executor:
            index_by_future = {
                executor.submit(
                    self._execute_single_member,
                    team_run=team_run,
                    team_config=team_config,
                    member=member,
                    member_input_json=self._build_member_input_json(
                        team_run=team_run,
                        team_config=team_config,
                        member=member,
                        prior_outputs=[],
                    ),
                    worker_key=worker_key,
                    dry_run=dry_run,
                ): index
                for index, member in enumerate(members)
            }
            for future in as_completed(index_by_future):
                # future.result() re-raises any exception from the worker.
                results[index_by_future[future]] = future.result()
        return [item for item in results if item is not None]

    def _execute_single_member(
        self,
        *,
        team_run: TeamRun,
        team_config: TeamConfig,
        member: TeamMemberContract,
        member_input_json: dict[str, JSONValue],
        worker_key: str | None,
        dry_run: bool,
    ) -> TeamMemberRunResult:
        """Create and then execute one agent run for ``member``.

        Raises:
            AgentServiceClientError: no agent client is configured.
        """
        if self.agent_client is None:
            raise AgentServiceClientError("agent service client is not configured")
        created_run = self.agent_client.create_agent_run(
            agent_id=member.agent_id,
            agent_config_id=member.agent_config_id,
            session_id=team_run.session_id,
            input_text=self._build_member_input_text(
                team_run=team_run,
                team_config=team_config,
                member=member,
            ),
            input_json=member_input_json,
        )
        executed_run = self.agent_client.execute_agent_run(
            agent_run_id=created_run.id,
            worker_key=worker_key,
            dry_run=dry_run,
        )
        return TeamMemberRunResult(member=member, run=executed_run)

    def _should_execute_members_in_parallel(self, team_config: TeamConfig) -> bool:
        """True for ``parallel`` coordination or a ``parallel_merge`` handoff policy."""
        # policy_json may be unset; treat that as an empty policy.
        policy = team_config.policy_json or {}
        handoff = policy.get("handoff")
        return team_config.coordination_mode == "parallel" or handoff == "parallel_merge"

    def _read_team_members(self, team_config: TeamConfig) -> list[TeamMemberContract]:
        """Parse the stored member refs, skipping entries that fail validation."""
        members: list[TeamMemberContract] = []
        for item in team_config.member_refs_json or []:
            try:
                members.append(TeamMemberContract.model_validate(item))
            except ValueError:
                # Invalid refs are dropped; the caller fails the run if none remain.
                continue
        return members

    def _order_members(self, members: list[TeamMemberContract]) -> list[TeamMemberContract]:
        """Sort members by role priority; unknown roles go last, ties keep order."""
        role_priority = {
            "planner": 0,
            "supervisor": 1,
            "specialist": 2,
            "executor": 3,
            "reviewer": 4,
        }
        return sorted(members, key=lambda item: role_priority.get(item.role, 10))

    def _build_member_input_text(
        self,
        *,
        team_run: TeamRun,
        team_config: TeamConfig,
        member: TeamMemberContract,
    ) -> str:
        """Build the newline-joined prompt text handed to one member."""
        lines = [
            f"Team objective: {team_config.objective or 'No objective provided.'}",
            f"Member role: {member.role}",
        ]
        if member.responsibility:
            lines.append(f"Responsibility: {member.responsibility}")
        if team_run.input_text:
            lines.append(f"User task: {team_run.input_text}")
        return "\n".join(lines)

    def _build_member_input_json(
        self,
        *,
        team_run: TeamRun,
        team_config: TeamConfig,
        member: TeamMemberContract,
        prior_outputs: list[dict[str, JSONValue]],
    ) -> dict[str, JSONValue]:
        """Build the JSON input for one member.

        Layers, later ones winning on key clashes: the run's own input JSON,
        the team/member context fields, then the member's configured
        ``input_json`` overrides (when that value is a dict).
        """
        input_json: dict[str, JSONValue] = dict(team_run.input_json or {})
        input_json.update(
            {
                "team_id": team_run.team_id,
                "team_run_id": team_run.id,
                "team_config_id": team_config.id,
                "team_objective": team_config.objective,
                "member_key": member.member_key,
                "member_role": member.role,
                "member_responsibility": member.responsibility,
                "prior_member_outputs": prior_outputs,
            }
        )
        # config_json may be unset; treat that as "no overrides".
        configured_input = (member.config_json or {}).get("input_json")
        if isinstance(configured_input, dict):
            input_json.update(
                {str(item_key): item_value for item_key, item_value in configured_input.items()}
            )
        return input_json

    def _build_team_output_text(
        self,
        *,
        team_config: TeamConfig,
        member_results: list[TeamMemberRunResult],
    ) -> str:
        """Render a numbered, human-readable transcript of all member results."""
        lines = [
            f"Team objective: {team_config.objective or 'No objective provided.'}",
            f"Coordination mode: {team_config.coordination_mode}",
            "Team conversation:",
        ]
        for index, item in enumerate(member_results, start=1):
            output_text = item.run.output_text or item.run.error_message or ""
            speaker = item.member.name or item.member.member_key
            lines.append(
                f"{index}. {speaker} ({item.member.role}) "
                f"status={item.run.status}: {output_text}"
            )
        return "\n".join(lines)

    def _member_result_to_json(self, result: TeamMemberRunResult) -> dict[str, JSONValue]:
        """Serialize one member result for the run's ``output_json``."""
        return {
            "member_key": result.member.member_key,
            "member_role": result.member.role,
            "member_name": result.member.name,
            "member_responsibility": result.member.responsibility,
            "agent_run_id": result.run.id,
            "agent_id": result.run.agent_id,
            "agent_config_id": result.run.agent_config_id,
            "status": result.run.status,
            "output_text": result.run.output_text,
            "output_json": self._compact_agent_output_json(result.run.output_json or {}),
            "error_code": result.run.error_code,
            "error_message": result.run.error_message,
        }

    def _compact_prior_output(self, result: TeamMemberRunResult) -> dict[str, JSONValue]:
        """Summarize a member result (text truncated) for later members' input."""
        return {
            "member_key": result.member.member_key,
            "member_role": result.member.role,
            "member_name": result.member.name,
            "agent_id": result.run.agent_id,
            "agent_run_id": result.run.id,
            "status": result.run.status,
            "output_text": self._truncate_text(result.run.output_text),
            "error_code": result.run.error_code,
            "error_message": result.run.error_message,
            "output_json": self._compact_agent_output_json(result.run.output_json or {}),
        }

    def _compact_agent_output_json(
        self,
        output_json: dict[str, JSONValue],
    ) -> dict[str, JSONValue]:
        """Keep only the allow-listed keys of an agent run's output JSON.

        Adds ``debug_payload_omitted: True`` when ``raw_response_json`` or
        ``messages`` was present in the input and therefore dropped.
        """
        keep_keys = {
            "dry_run",
            "model",
            "finish_reason",
            "usage_json",
            "tool_invocations",
            "skill_invocations",
            "selected_tool_refs",
            "selected_skill_refs",
            "memory_read_enabled",
            "memory_read_count",
            "memory_read_reason",
        }
        compacted = {
            key: value for key, value in output_json.items() if key in keep_keys
        }
        if "raw_response_json" in output_json or "messages" in output_json:
            compacted["debug_payload_omitted"] = True
        return compacted

    def _truncate_text(self, value: str | None) -> str | None:
        """Cut ``value`` to ``MAX_PRIOR_OUTPUT_TEXT_CHARS`` and mark the cut."""
        if value is None or len(value) <= MAX_PRIOR_OUTPUT_TEXT_CHARS:
            return value
        return f"{value[:MAX_PRIOR_OUTPUT_TEXT_CHARS]}... [truncated]"

    def _publish_run_failed(
        self,
        failed_run: TeamRun | None,
        details: dict[str, JSONValue],
    ) -> None:
        """Publish ``team.run.failed`` for ``failed_run``; no-op when it is ``None``.

        ``details`` is merged after the standard ``team_run_id``/``status`` keys.
        """
        if failed_run is None:
            return
        self._publish_event(
            event_type="team.run.failed",
            team_run=failed_run,
            payload_json={
                "team_run_id": failed_run.id,
                "status": failed_run.status,
                **details,
            },
        )

    def _publish_event(
        self,
        *,
        event_type: str,
        team_run: TeamRun,
        payload_json: dict[str, JSONValue],
    ) -> None:
        """Best-effort event publication for a team run.

        Does nothing without an event client. An ``EventServiceClientError``
        is logged and suppressed so event delivery never fails the operation.
        """
        if self.event_client is None:
            return
        try:
            self.event_client.publish_event(
                EventPublishContract(
                    event_type=event_type,
                    source_service="team-service",
                    aggregate_type="team_run",
                    aggregate_id=team_run.id,
                    correlation_id=team_run.session_id,
                    payload_json={
                        **payload_json,
                        "team_id": team_run.team_id,
                        "team_config_id": team_run.team_config_id,
                    },
                )
            )
        except EventServiceClientError:
            logger.warning(
                "failed to publish event %s for team run %s",
                event_type,
                team_run.id,
                exc_info=True,
            )

    def _build_team_code(self, name: str) -> str:
        """Derive a code from ``name``: lowercase, non-alphanumerics to ``_``.

        Leading/trailing underscores are stripped, an empty result falls back
        to ``"team"``, and the code is capped at 64 characters.
        """
        base = "".join(
            char.lower() if char.isalnum() else "_" for char in name
        ).strip("_") or "team"
        return base[:64]

    def _normalize_member_refs(
        self,
        member_refs: list[dict[str, JSONValue]],
    ) -> list[TeamMemberContract]:
        """Validate raw member dicts, accepting snake_case or camelCase keys.

        Missing member keys default to ``member_<position>``, the ``worker``
        role is mapped to ``executor``, and a missing role becomes
        ``specialist``.
        """
        members: list[TeamMemberContract] = []
        for index, item in enumerate(member_refs, start=1):
            role = item.get("role")
            normalized_role = "executor" if role == "worker" else role
            member = {
                **item,
                "member_key": item.get("member_key") or item.get("memberKey") or f"member_{index}",
                "agent_id": item.get("agent_id") or item.get("agentId"),
                "agent_config_id": item.get("agent_config_id") or item.get("agentConfigId"),
                "role": normalized_role or "specialist",
                "config_json": item.get("config_json") or item.get("configJson") or {},
            }
            members.append(TeamMemberContract.model_validate(member))
        return members


def build_team_application_service(
    *,
    team_repository: TeamDefinitionRepository,
    team_config_repository: TeamConfigRepository,
    team_run_repository: TeamRunRepository,
    settings: TeamServiceSettings,
) -> TeamApplicationService:
    """Wire a ``TeamApplicationService`` from repositories and settings.

    The task queue publisher is only attached when a Redis client could be
    built from ``settings.redis_url``.
    """
    redis_client = try_build_redis_client(settings.redis_url)
    return TeamApplicationService(
        team_repository=team_repository,
        team_config_repository=team_config_repository,
        team_run_repository=team_run_repository,
        agent_client=AgentServiceClient(
            base_url=settings.agent_service_url,
            timeout_seconds=settings.agent_service_timeout_seconds,
        ),
        event_client=EventServiceClient(
            base_url=settings.event_service_url,
            timeout_seconds=settings.event_service_timeout_seconds,
        ),
        task_queue_publisher=(
            TaskQueuePublisher(client=redis_client)
            if redis_client is not None
            else None
        ),
    )