"""Application service for team definitions, team versions, and team runs."""

from datetime import datetime, timedelta, timezone

from core_domain import AgentRunContract, TeamMemberContract
from core_shared import JSONValue

from app.bootstrap.settings import TeamServiceSettings
from app.db.models import TeamDefinition, TeamRun, TeamVersion
from app.domain.repositories import (
    TeamDefinitionRepository,
    TeamRunRepository,
    TeamVersionRepository,
)
from app.infrastructure.agent_client import AgentServiceClient, AgentServiceClientError
from app.schemas.team import (
    TeamCreateRequest,
    TeamRunCreateRequest,
    TeamRunExecuteRequest,
    TeamRunStatusUpdateRequest,
    TeamStatusUpdateRequest,
    TeamVersionCreateRequest,
)

# Execution order of member roles (lower runs first).
_ROLE_PRIORITY: dict[str, int] = {
    "planner": 0,
    "supervisor": 1,
    "specialist": 2,
    "executor": 3,
    "reviewer": 4,
}
# Roles missing from _ROLE_PRIORITY sort after every known role.
_UNKNOWN_ROLE_PRIORITY = 10


def _utc_now_naive() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Replaces the deprecated ``datetime.utcnow()``. The tzinfo is stripped on
    purpose so the repository keeps receiving naive UTC values.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class TeamApplicationService:
    """Coordinate team, team-version, and team-run operations.

    Persistence is delegated to the three injected repositories. Executing a
    team run additionally requires an ``AgentServiceClient``.
    """

    def __init__(
        self,
        *,
        team_repository: TeamDefinitionRepository,
        team_version_repository: TeamVersionRepository,
        team_run_repository: TeamRunRepository,
        agent_client: AgentServiceClient | None = None,
    ) -> None:
        """Store the collaborators.

        Args:
            team_repository: Repository for team definitions.
            team_version_repository: Repository for team versions.
            team_run_repository: Repository for team runs.
            agent_client: Client used to create and execute member agent
                runs. When ``None``, executing a team run ends in a
                ``failed`` run with error code ``agent_service_error``.
        """
        self.team_repository = team_repository
        self.team_version_repository = team_version_repository
        self.team_run_repository = team_run_repository
        self.agent_client = agent_client

    def create_team(self, payload: TeamCreateRequest) -> TeamDefinition:
        """Create a team definition from the request payload."""
        return self.team_repository.create(
            tenant_id=payload.tenant_id,
            code=payload.code,
            name=payload.name,
            description=payload.description,
            team_type=payload.team_type,
            owner_user_id=payload.owner_user_id,
            metadata_json=payload.metadata_json,
        )

    def list_teams(self, *, tenant_id: str) -> list[TeamDefinition]:
        """Return the team definitions of the given tenant."""
        return self.team_repository.list_by_tenant(tenant_id=tenant_id)

    def update_team_status(
        self,
        *,
        team_id: str,
        payload: TeamStatusUpdateRequest,
    ) -> TeamDefinition | None:
        """Set a team's status and return whatever the repository returns."""
        return self.team_repository.update_status(
            tenant_id=payload.tenant_id,
            team_id=team_id,
            status=payload.status,
        )

    def create_team_version(self, payload: TeamVersionCreateRequest) -> TeamVersion:
        """Create a new version of an existing team.

        Raises:
            ValueError: If the team does not exist for the tenant, or if the
                payload contains no member references.
        """
        team = self.team_repository.get_by_id(
            tenant_id=payload.tenant_id,
            team_id=payload.team_id,
        )
        if team is None:
            raise ValueError(f"team not found: {payload.team_id}")
        if not payload.member_refs:
            raise ValueError("team version requires at least one member")
        return self.team_version_repository.create(
            tenant_id=payload.tenant_id,
            team_id=payload.team_id,
            status=payload.status,
            coordination_mode=payload.coordination_mode,
            objective=payload.objective,
            member_refs_json=[item.model_dump(mode="json") for item in payload.member_refs],
            policy_json=payload.policy_json,
        )

    def list_team_versions(self, *, tenant_id: str, team_id: str) -> list[TeamVersion]:
        """Return the versions of one team."""
        return self.team_version_repository.list_by_team(tenant_id=tenant_id, team_id=team_id)

    def create_team_run(self, payload: TeamRunCreateRequest) -> TeamRun:
        """Create a team run bound to a resolved team version.

        The version is the one named by ``payload.team_version_id`` when
        given, otherwise the team's latest published version.

        Raises:
            ValueError: If no team version could be resolved.
        """
        team_version = self._resolve_team_version(
            tenant_id=payload.tenant_id,
            team_id=payload.team_id,
            team_version_id=payload.team_version_id,
        )
        if team_version is None:
            raise ValueError("published team version not found")
        return self.team_run_repository.create(
            tenant_id=payload.tenant_id,
            team_id=payload.team_id,
            team_version_id=team_version.id,
            session_id=payload.session_id,
            input_text=payload.input_text,
            input_json=payload.input_json,
        )

    def list_team_runs(
        self,
        *,
        tenant_id: str,
        team_id: str | None = None,
        session_id: str | None = None,
    ) -> list[TeamRun]:
        """Return team runs of a tenant, optionally scoped by team and/or session."""
        return self.team_run_repository.list_by_scope(
            tenant_id=tenant_id,
            team_id=team_id,
            session_id=session_id,
        )

    def update_team_run_status(
        self,
        *,
        team_run_id: str,
        payload: TeamRunStatusUpdateRequest,
    ) -> TeamRun | None:
        """Update the status and result fields of a team run.

        Returns:
            ``None`` when the run is not found for ``payload.tenant_id``,
            otherwise the repository's update result.
        """
        # The tenant-scoped lookup is the only tenant check: update_status
        # itself is called without a tenant id.
        entity = self.team_run_repository.get_by_id(
            tenant_id=payload.tenant_id,
            team_run_id=team_run_id,
        )
        if entity is None:
            return None
        return self.team_run_repository.update_status(
            team_run_id=team_run_id,
            status=payload.status,
            worker_key=payload.worker_key,
            output_text=payload.output_text,
            output_json=payload.output_json,
            error_code=payload.error_code,
            error_message=payload.error_message,
        )

    def execute_team_run(
        self,
        *,
        team_run_id: str,
        payload: TeamRunExecuteRequest,
    ) -> TeamRun | None:
        """Execute every member of a team run and record the outcome.

        The run is marked ``running``, each valid member is executed through
        the agent client, and the run is finally marked ``completed`` or
        ``failed``. Failure error codes written by this method are
        ``team_version_missing``, ``team_members_missing``,
        ``agent_service_error`` and ``member_run_failed``.

        Returns:
            The result of the last status update, or ``None`` when the run
            is not found for the tenant or the ``running`` update returns
            ``None``.
        """
        team_run = self.team_run_repository.get_by_id(
            tenant_id=payload.tenant_id,
            team_run_id=team_run_id,
        )
        if team_run is None:
            return None

        team_version = self.team_version_repository.get_by_id(
            tenant_id=payload.tenant_id,
            team_version_id=team_run.team_version_id,
        )
        if team_version is None:
            return self.team_run_repository.update_status(
                team_run_id=team_run.id,
                status="failed",
                worker_key=payload.worker_key,
                error_code="team_version_missing",
                error_message=f"team version not found: {team_run.team_version_id}",
            )

        running_run = self.team_run_repository.update_status(
            team_run_id=team_run.id,
            status="running",
            worker_key=payload.worker_key,
        )
        if running_run is None:
            return None

        members = self._read_team_members(team_version)
        if not members:
            return self.team_run_repository.update_status(
                team_run_id=team_run.id,
                status="failed",
                worker_key=payload.worker_key,
                error_code="team_members_missing",
                error_message="team version has no valid members",
            )

        try:
            member_results = self._execute_members(
                team_run=team_run,
                team_version=team_version,
                members=members,
                worker_key=payload.worker_key,
                dry_run=payload.dry_run,
            )
        except AgentServiceClientError as exc:
            # Results of members that finished before the error are not recorded.
            return self.team_run_repository.update_status(
                team_run_id=team_run.id,
                status="failed",
                worker_key=payload.worker_key,
                error_code="agent_service_error",
                error_message=str(exc),
            )

        failed_results = [item for item in member_results if item.status != "completed"]
        output_text = self._build_team_output_text(
            team_version=team_version,
            member_results=member_results,
        )
        output_json: dict[str, JSONValue] = {
            "dry_run": payload.dry_run,
            "coordination_mode": team_version.coordination_mode,
            "team_version_id": team_version.id,
            "member_run_count": len(member_results),
            "member_results": [self._member_result_to_json(item) for item in member_results],
        }

        if failed_results:
            # The aggregated output is still stored so partial results stay visible.
            return self.team_run_repository.update_status(
                team_run_id=team_run.id,
                status="failed",
                worker_key=payload.worker_key,
                output_text=output_text,
                output_json=output_json,
                error_code="member_run_failed",
                error_message=f"{len(failed_results)} member run(s) failed",
            )
        return self.team_run_repository.update_status(
            team_run_id=team_run.id,
            status="completed",
            worker_key=payload.worker_key,
            output_text=output_text,
            output_json=output_json,
        )

    def execute_next_claimed_team_run(
        self,
        *,
        worker_key: str,
        lease_seconds: int,
        dry_run: bool,
    ) -> tuple[TeamRun, int] | None:
        """Release expired leases, claim the next queued run, and execute it.

        Args:
            worker_key: Identifier of the worker claiming the run.
            lease_seconds: Lease duration, counted from the current time.
            dry_run: Forwarded to the execution of each member run.

        Returns:
            ``(team_run, released_lease_count)``, or ``None`` when no run
            could be claimed or the execution returned ``None``.
        """
        # One timestamp serves both calls so that "now" for releasing leases
        # and the base of the new lease expiry are the same instant.
        now = _utc_now_naive()
        released_lease_count = self.team_run_repository.release_expired_leases(now_time=now)
        claimed_team_run = self.team_run_repository.claim_next_queued(
            worker_key=worker_key,
            lease_expire_time=now + timedelta(seconds=lease_seconds),
        )
        if claimed_team_run is None:
            return None

        result = self.execute_team_run(
            team_run_id=claimed_team_run.id,
            payload=TeamRunExecuteRequest(
                tenant_id=claimed_team_run.tenant_id,
                worker_key=worker_key,
                dry_run=dry_run,
            ),
        )
        if result is None:
            return None
        return result, released_lease_count

    def _resolve_team_version(
        self,
        *,
        tenant_id: str,
        team_id: str,
        team_version_id: str | None,
    ) -> TeamVersion | None:
        """Return the explicitly requested version, else the latest published one."""
        if team_version_id is not None:
            # NOTE(review): an explicit id is looked up by tenant only; it is
            # not checked here against team_id or against a published status.
            # Confirm whether the repository or the caller enforces that.
            return self.team_version_repository.get_by_id(
                tenant_id=tenant_id,
                team_version_id=team_version_id,
            )
        return self.team_version_repository.get_latest_published(
            tenant_id=tenant_id,
            team_id=team_id,
        )

    def _execute_members(
        self,
        *,
        team_run: TeamRun,
        team_version: TeamVersion,
        members: list[TeamMemberContract],
        worker_key: str | None,
        dry_run: bool,
    ) -> list[AgentRunContract]:
        """Create and execute one agent run per member, in role order.

        Members run one after another. Each member's input JSON carries the
        outputs of the members that ran before it. A member whose run ends in
        a non-completed status does not stop the remaining members.

        Raises:
            AgentServiceClientError: If no agent client is configured.
                Presumably also propagated from the client calls themselves;
                the caller handles this exception type.
        """
        if self.agent_client is None:
            raise AgentServiceClientError("agent service client is not configured")

        member_results: list[AgentRunContract] = []
        prior_outputs: list[dict[str, JSONValue]] = []
        for member in self._order_members(members):
            member_input_json = self._build_member_input_json(
                team_run=team_run,
                team_version=team_version,
                member=member,
                prior_outputs=prior_outputs,
            )
            created_run = self.agent_client.create_agent_run(
                tenant_id=team_run.tenant_id,
                agent_id=member.agent_id,
                agent_version_id=member.agent_version_id,
                session_id=team_run.session_id,
                input_text=self._build_member_input_text(
                    team_run=team_run,
                    team_version=team_version,
                    member=member,
                ),
                input_json=member_input_json,
            )
            executed_run = self.agent_client.execute_agent_run(
                tenant_id=team_run.tenant_id,
                agent_run_id=created_run.id,
                worker_key=worker_key,
                dry_run=dry_run,
            )
            member_results.append(executed_run)
            prior_outputs.append(
                {
                    "member_key": member.member_key,
                    "role": member.role,
                    "agent_run_id": executed_run.id,
                    "status": executed_run.status,
                    "output_text": executed_run.output_text,
                    "output_json": executed_run.output_json or {},
                }
            )
        return member_results

    def _read_team_members(self, team_version: TeamVersion) -> list[TeamMemberContract]:
        """Parse the version's stored member references into contracts.

        Entries that fail validation are skipped. A missing (``None``) member
        list yields an empty result instead of raising.
        """
        members: list[TeamMemberContract] = []
        for item in team_version.member_refs_json or []:
            try:
                members.append(TeamMemberContract.model_validate(item))
            except ValueError:
                # Best effort: skip invalid entries and keep the valid ones.
                # Presumably this catches the validation error raised by
                # model_validate -- confirm against TeamMemberContract.
                continue
        return members

    def _order_members(self, members: list[TeamMemberContract]) -> list[TeamMemberContract]:
        """Return members sorted by role priority.

        ``sorted`` is stable, so members sharing a priority keep their
        relative input order. Unknown roles come last.
        """
        return sorted(
            members,
            key=lambda item: _ROLE_PRIORITY.get(item.role, _UNKNOWN_ROLE_PRIORITY),
        )

    def _build_member_input_text(
        self,
        *,
        team_run: TeamRun,
        team_version: TeamVersion,
        member: TeamMemberContract,
    ) -> str:
        """Build the newline-joined text prompt for one member.

        The responsibility and user-task lines are included only when the
        corresponding value is truthy.
        """
        lines = [
            f"Team objective: {team_version.objective or 'No objective provided.'}",
            f"Member role: {member.role}",
        ]
        if member.responsibility:
            lines.append(f"Responsibility: {member.responsibility}")
        if team_run.input_text:
            lines.append(f"User task: {team_run.input_text}")
        return "\n".join(lines)

    def _build_member_input_json(
        self,
        *,
        team_run: TeamRun,
        team_version: TeamVersion,
        member: TeamMemberContract,
        prior_outputs: list[dict[str, JSONValue]],
    ) -> dict[str, JSONValue]:
        """Build the structured input for one member run.

        Layers, later ones overriding earlier ones on key collision:
        the run's own ``input_json``, then the team/member context keys,
        then the member's configured ``input_json`` (only if it is a dict).
        """
        input_json: dict[str, JSONValue] = dict(team_run.input_json or {})
        input_json.update(
            {
                "team_id": team_run.team_id,
                "team_run_id": team_run.id,
                "team_version_id": team_version.id,
                "team_objective": team_version.objective,
                "member_key": member.member_key,
                "member_role": member.role,
                "member_responsibility": member.responsibility,
                # Snapshot copy: the caller keeps appending to prior_outputs,
                # which must not alter a payload that was already built.
                "prior_member_outputs": list(prior_outputs),
            }
        )
        # Tolerate a member without any configuration.
        configured_input = (member.config_json or {}).get("input_json")
        if isinstance(configured_input, dict):
            input_json.update(
                {str(item_key): item_value for item_key, item_value in configured_input.items()}
            )
        return input_json

    def _build_team_output_text(
        self,
        *,
        team_version: TeamVersion,
        member_results: list[AgentRunContract],
    ) -> str:
        """Render a text summary with one numbered line per member result.

        Each line shows the member's output text, falling back to its error
        message and then to an empty string.
        """
        lines = [
            f"Team objective: {team_version.objective or 'No objective provided.'}",
            f"Coordination mode: {team_version.coordination_mode}",
            "Member results:",
        ]
        for index, result in enumerate(member_results, start=1):
            output_text = result.output_text or result.error_message or ""
            lines.append(f"{index}. agent={result.agent_id} status={result.status}: {output_text}")
        return "\n".join(lines)

    def _member_result_to_json(self, result: AgentRunContract) -> dict[str, JSONValue]:
        """Convert one member run result into a JSON-compatible dict."""
        return {
            "agent_run_id": result.id,
            "agent_id": result.agent_id,
            "agent_version_id": result.agent_version_id,
            "status": result.status,
            "output_text": result.output_text,
            "output_json": result.output_json or {},
            "error_code": result.error_code,
            "error_message": result.error_message,
        }


def build_team_application_service(
    *,
    team_repository: TeamDefinitionRepository,
    team_version_repository: TeamVersionRepository,
    team_run_repository: TeamRunRepository,
    settings: TeamServiceSettings,
) -> TeamApplicationService:
    """Build a ``TeamApplicationService`` with an agent client taken from settings.

    The client is configured with ``settings.agent_service_url`` and
    ``settings.agent_service_timeout_seconds``.
    """
    return TeamApplicationService(
        team_repository=team_repository,
        team_version_repository=team_version_repository,
        team_run_repository=team_run_repository,
        agent_client=AgentServiceClient(
            base_url=settings.agent_service_url,
            timeout_seconds=settings.agent_service_timeout_seconds,
        ),
    )