|
|
@@ -1,12 +1,20 @@
|
|
|
+from datetime import datetime, timedelta
|
|
|
+
|
|
|
+from core_domain import AgentRunContract, TeamMemberContract
|
|
|
+from core_shared import JSONValue
|
|
|
+
|
|
|
+from app.bootstrap.settings import TeamServiceSettings
|
|
|
from app.db.models import TeamDefinition, TeamRun, TeamVersion
|
|
|
from app.domain.repositories import (
|
|
|
TeamDefinitionRepository,
|
|
|
TeamRunRepository,
|
|
|
TeamVersionRepository,
|
|
|
)
|
|
|
+from app.infrastructure.agent_client import AgentServiceClient, AgentServiceClientError
|
|
|
from app.schemas.team import (
|
|
|
TeamCreateRequest,
|
|
|
TeamRunCreateRequest,
|
|
|
+ TeamRunExecuteRequest,
|
|
|
TeamRunStatusUpdateRequest,
|
|
|
TeamStatusUpdateRequest,
|
|
|
TeamVersionCreateRequest,
|
|
|
@@ -20,10 +28,12 @@ class TeamApplicationService:
|
|
|
team_repository: TeamDefinitionRepository,
|
|
|
team_version_repository: TeamVersionRepository,
|
|
|
team_run_repository: TeamRunRepository,
|
|
|
+ agent_client: AgentServiceClient | None = None,
|
|
|
) -> None:
|
|
|
self.team_repository = team_repository
|
|
|
self.team_version_repository = team_version_repository
|
|
|
self.team_run_repository = team_run_repository
|
|
|
+ self.agent_client = agent_client
|
|
|
|
|
|
def create_team(self, payload: TeamCreateRequest) -> TeamDefinition:
|
|
|
return self.team_repository.create(
|
|
|
@@ -124,6 +134,129 @@ class TeamApplicationService:
|
|
|
error_message=payload.error_message,
|
|
|
)
|
|
|
|
|
|
+ def execute_team_run(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ team_run_id: str,
|
|
|
+ payload: TeamRunExecuteRequest,
|
|
|
+ ) -> TeamRun | None:
|
|
|
+ team_run = self.team_run_repository.get_by_id(
|
|
|
+ tenant_id=payload.tenant_id,
|
|
|
+ team_run_id=team_run_id,
|
|
|
+ )
|
|
|
+ if team_run is None:
|
|
|
+ return None
|
|
|
+
|
|
|
+ team_version = self.team_version_repository.get_by_id(
|
|
|
+ tenant_id=payload.tenant_id,
|
|
|
+ team_version_id=team_run.team_version_id,
|
|
|
+ )
|
|
|
+ if team_version is None:
|
|
|
+ return self.team_run_repository.update_status(
|
|
|
+ team_run_id=team_run.id,
|
|
|
+ status="failed",
|
|
|
+ worker_key=payload.worker_key,
|
|
|
+ error_code="team_version_missing",
|
|
|
+ error_message=f"team version not found: {team_run.team_version_id}",
|
|
|
+ )
|
|
|
+
|
|
|
+ running_run = self.team_run_repository.update_status(
|
|
|
+ team_run_id=team_run.id,
|
|
|
+ status="running",
|
|
|
+ worker_key=payload.worker_key,
|
|
|
+ )
|
|
|
+ if running_run is None:
|
|
|
+ return None
|
|
|
+
|
|
|
+ members = self._read_team_members(team_version)
|
|
|
+ if not members:
|
|
|
+ return self.team_run_repository.update_status(
|
|
|
+ team_run_id=team_run.id,
|
|
|
+ status="failed",
|
|
|
+ worker_key=payload.worker_key,
|
|
|
+ error_code="team_members_missing",
|
|
|
+ error_message="team version has no valid members",
|
|
|
+ )
|
|
|
+
|
|
|
+ try:
|
|
|
+ member_results = self._execute_members(
|
|
|
+ team_run=team_run,
|
|
|
+ team_version=team_version,
|
|
|
+ members=members,
|
|
|
+ worker_key=payload.worker_key,
|
|
|
+ dry_run=payload.dry_run,
|
|
|
+ )
|
|
|
+ except AgentServiceClientError as exc:
|
|
|
+ return self.team_run_repository.update_status(
|
|
|
+ team_run_id=team_run.id,
|
|
|
+ status="failed",
|
|
|
+ worker_key=payload.worker_key,
|
|
|
+ error_code="agent_service_error",
|
|
|
+ error_message=str(exc),
|
|
|
+ )
|
|
|
+
|
|
|
+ failed_results = [item for item in member_results if item.status != "completed"]
|
|
|
+ output_text = self._build_team_output_text(
|
|
|
+ team_version=team_version,
|
|
|
+ member_results=member_results,
|
|
|
+ )
|
|
|
+ output_json: dict[str, JSONValue] = {
|
|
|
+ "dry_run": payload.dry_run,
|
|
|
+ "coordination_mode": team_version.coordination_mode,
|
|
|
+ "team_version_id": team_version.id,
|
|
|
+ "member_run_count": len(member_results),
|
|
|
+ "member_results": [
|
|
|
+ self._member_result_to_json(item) for item in member_results
|
|
|
+ ],
|
|
|
+ }
|
|
|
+ if failed_results:
|
|
|
+ return self.team_run_repository.update_status(
|
|
|
+ team_run_id=team_run.id,
|
|
|
+ status="failed",
|
|
|
+ worker_key=payload.worker_key,
|
|
|
+ output_text=output_text,
|
|
|
+ output_json=output_json,
|
|
|
+ error_code="member_run_failed",
|
|
|
+ error_message=f"{len(failed_results)} member run(s) failed",
|
|
|
+ )
|
|
|
+
|
|
|
+ return self.team_run_repository.update_status(
|
|
|
+ team_run_id=team_run.id,
|
|
|
+ status="completed",
|
|
|
+ worker_key=payload.worker_key,
|
|
|
+ output_text=output_text,
|
|
|
+ output_json=output_json,
|
|
|
+ )
|
|
|
+
|
|
|
+ def execute_next_claimed_team_run(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ worker_key: str,
|
|
|
+ lease_seconds: int,
|
|
|
+ dry_run: bool,
|
|
|
+ ) -> tuple[TeamRun, int] | None:
|
|
|
+ released_lease_count = self.team_run_repository.release_expired_leases(
|
|
|
+ now_time=datetime.utcnow(),
|
|
|
+ )
|
|
|
+ claimed_team_run = self.team_run_repository.claim_next_queued(
|
|
|
+ worker_key=worker_key,
|
|
|
+ lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
|
|
|
+ )
|
|
|
+ if claimed_team_run is None:
|
|
|
+ return None
|
|
|
+
|
|
|
+ result = self.execute_team_run(
|
|
|
+ team_run_id=claimed_team_run.id,
|
|
|
+ payload=TeamRunExecuteRequest(
|
|
|
+ tenant_id=claimed_team_run.tenant_id,
|
|
|
+ worker_key=worker_key,
|
|
|
+ dry_run=dry_run,
|
|
|
+ ),
|
|
|
+ )
|
|
|
+ if result is None:
|
|
|
+ return None
|
|
|
+ return result, released_lease_count
|
|
|
+
|
|
|
def _resolve_team_version(
|
|
|
self,
|
|
|
*,
|
|
|
@@ -140,3 +273,165 @@ class TeamApplicationService:
|
|
|
tenant_id=tenant_id,
|
|
|
team_id=team_id,
|
|
|
)
|
|
|
+
|
|
|
+ def _execute_members(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ team_run: TeamRun,
|
|
|
+ team_version: TeamVersion,
|
|
|
+ members: list[TeamMemberContract],
|
|
|
+ worker_key: str | None,
|
|
|
+ dry_run: bool,
|
|
|
+ ) -> list[AgentRunContract]:
|
|
|
+ if self.agent_client is None:
|
|
|
+ raise AgentServiceClientError("agent service client is not configured")
|
|
|
+
|
|
|
+ member_results: list[AgentRunContract] = []
|
|
|
+ prior_outputs: list[dict[str, JSONValue]] = []
|
|
|
+ for member in self._order_members(members):
|
|
|
+ member_input_json = self._build_member_input_json(
|
|
|
+ team_run=team_run,
|
|
|
+ team_version=team_version,
|
|
|
+ member=member,
|
|
|
+ prior_outputs=prior_outputs,
|
|
|
+ )
|
|
|
+ created_run = self.agent_client.create_agent_run(
|
|
|
+ tenant_id=team_run.tenant_id,
|
|
|
+ agent_id=member.agent_id,
|
|
|
+ agent_version_id=member.agent_version_id,
|
|
|
+ session_id=team_run.session_id,
|
|
|
+ input_text=self._build_member_input_text(
|
|
|
+ team_run=team_run,
|
|
|
+ team_version=team_version,
|
|
|
+ member=member,
|
|
|
+ ),
|
|
|
+ input_json=member_input_json,
|
|
|
+ )
|
|
|
+ executed_run = self.agent_client.execute_agent_run(
|
|
|
+ tenant_id=team_run.tenant_id,
|
|
|
+ agent_run_id=created_run.id,
|
|
|
+ worker_key=worker_key,
|
|
|
+ dry_run=dry_run,
|
|
|
+ )
|
|
|
+ member_results.append(executed_run)
|
|
|
+ prior_outputs.append(
|
|
|
+ {
|
|
|
+ "member_key": member.member_key,
|
|
|
+ "role": member.role,
|
|
|
+ "agent_run_id": executed_run.id,
|
|
|
+ "status": executed_run.status,
|
|
|
+ "output_text": executed_run.output_text,
|
|
|
+ "output_json": executed_run.output_json or {},
|
|
|
+ }
|
|
|
+ )
|
|
|
+ return member_results
|
|
|
+
|
|
|
+ def _read_team_members(self, team_version: TeamVersion) -> list[TeamMemberContract]:
|
|
|
+ members: list[TeamMemberContract] = []
|
|
|
+ for item in team_version.member_refs_json:
|
|
|
+ try:
|
|
|
+ members.append(TeamMemberContract.model_validate(item))
|
|
|
+ except ValueError:
|
|
|
+ continue
|
|
|
+ return members
|
|
|
+
|
|
|
+ def _order_members(self, members: list[TeamMemberContract]) -> list[TeamMemberContract]:
|
|
|
+ role_priority = {
|
|
|
+ "planner": 0,
|
|
|
+ "supervisor": 1,
|
|
|
+ "specialist": 2,
|
|
|
+ "executor": 3,
|
|
|
+ "reviewer": 4,
|
|
|
+ }
|
|
|
+ return sorted(members, key=lambda item: role_priority.get(item.role, 10))
|
|
|
+
|
|
|
+ def _build_member_input_text(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ team_run: TeamRun,
|
|
|
+ team_version: TeamVersion,
|
|
|
+ member: TeamMemberContract,
|
|
|
+ ) -> str:
|
|
|
+ lines = [
|
|
|
+ f"Team objective: {team_version.objective or 'No objective provided.'}",
|
|
|
+ f"Member role: {member.role}",
|
|
|
+ ]
|
|
|
+ if member.responsibility:
|
|
|
+ lines.append(f"Responsibility: {member.responsibility}")
|
|
|
+ if team_run.input_text:
|
|
|
+ lines.append(f"User task: {team_run.input_text}")
|
|
|
+ return "\n".join(lines)
|
|
|
+
|
|
|
+ def _build_member_input_json(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ team_run: TeamRun,
|
|
|
+ team_version: TeamVersion,
|
|
|
+ member: TeamMemberContract,
|
|
|
+ prior_outputs: list[dict[str, JSONValue]],
|
|
|
+ ) -> dict[str, JSONValue]:
|
|
|
+ input_json: dict[str, JSONValue] = dict(team_run.input_json or {})
|
|
|
+ input_json.update(
|
|
|
+ {
|
|
|
+ "team_id": team_run.team_id,
|
|
|
+ "team_run_id": team_run.id,
|
|
|
+ "team_version_id": team_version.id,
|
|
|
+ "team_objective": team_version.objective,
|
|
|
+ "member_key": member.member_key,
|
|
|
+ "member_role": member.role,
|
|
|
+ "member_responsibility": member.responsibility,
|
|
|
+ "prior_member_outputs": prior_outputs,
|
|
|
+ }
|
|
|
+ )
|
|
|
+ configured_input = member.config_json.get("input_json")
|
|
|
+ if isinstance(configured_input, dict):
|
|
|
+ input_json.update(
|
|
|
+ {str(item_key): item_value for item_key, item_value in configured_input.items()}
|
|
|
+ )
|
|
|
+ return input_json
|
|
|
+
|
|
|
+ def _build_team_output_text(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ team_version: TeamVersion,
|
|
|
+ member_results: list[AgentRunContract],
|
|
|
+ ) -> str:
|
|
|
+ lines = [
|
|
|
+ f"Team objective: {team_version.objective or 'No objective provided.'}",
|
|
|
+ f"Coordination mode: {team_version.coordination_mode}",
|
|
|
+ "Member results:",
|
|
|
+ ]
|
|
|
+ for index, result in enumerate(member_results, start=1):
|
|
|
+ output_text = result.output_text or result.error_message or ""
|
|
|
+ lines.append(f"{index}. agent={result.agent_id} status={result.status}: {output_text}")
|
|
|
+ return "\n".join(lines)
|
|
|
+
|
|
|
+ def _member_result_to_json(self, result: AgentRunContract) -> dict[str, JSONValue]:
|
|
|
+ return {
|
|
|
+ "agent_run_id": result.id,
|
|
|
+ "agent_id": result.agent_id,
|
|
|
+ "agent_version_id": result.agent_version_id,
|
|
|
+ "status": result.status,
|
|
|
+ "output_text": result.output_text,
|
|
|
+ "output_json": result.output_json or {},
|
|
|
+ "error_code": result.error_code,
|
|
|
+ "error_message": result.error_message,
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+def build_team_application_service(
|
|
|
+ *,
|
|
|
+ team_repository: TeamDefinitionRepository,
|
|
|
+ team_version_repository: TeamVersionRepository,
|
|
|
+ team_run_repository: TeamRunRepository,
|
|
|
+ settings: TeamServiceSettings,
|
|
|
+) -> TeamApplicationService:
|
|
|
+ return TeamApplicationService(
|
|
|
+ team_repository=team_repository,
|
|
|
+ team_version_repository=team_version_repository,
|
|
|
+ team_run_repository=team_run_repository,
|
|
|
+ agent_client=AgentServiceClient(
|
|
|
+ base_url=settings.agent_service_url,
|
|
|
+ timeout_seconds=settings.agent_service_timeout_seconds,
|
|
|
+ ),
|
|
|
+ )
|