| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500 |
- from datetime import datetime, timedelta
- from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
- from core_domain import AgentRunContract, TeamMemberContract
- from core_shared import JSONValue
- from app.bootstrap.settings import TeamServiceSettings
- from app.db.models import TeamDefinition, TeamRun, TeamVersion
- from app.domain.repositories import (
- TeamDefinitionRepository,
- TeamRunRepository,
- TeamVersionRepository,
- )
- from app.infrastructure.agent_client import AgentServiceClient, AgentServiceClientError
- from app.schemas.team import (
- TeamCreateRequest,
- TeamRunCreateRequest,
- TeamRunExecuteRequest,
- TeamRunStatusUpdateRequest,
- TeamStatusUpdateRequest,
- TeamVersionCreateRequest,
- )
class TeamApplicationService:
    """Application-layer facade for team definitions, team versions and team runs.

    All persistence goes through the injected repositories. The agent client
    is used to execute team members as agent runs, and the event client is
    used to publish run lifecycle events; both are optional.
    """

    def __init__(
        self,
        *,
        team_repository: TeamDefinitionRepository,
        team_version_repository: TeamVersionRepository,
        team_run_repository: TeamRunRepository,
        agent_client: AgentServiceClient | None = None,
        event_client: EventServiceClient | None = None,
    ) -> None:
        """Store the collaborators; no I/O is performed here."""
        self.team_repository = team_repository
        self.team_version_repository = team_version_repository
        self.team_run_repository = team_run_repository
        # When None, executing members raises AgentServiceClientError.
        self.agent_client = agent_client
        # When None, event publishing is silently skipped.
        self.event_client = event_client

    def create_team(self, payload: TeamCreateRequest) -> TeamDefinition:
        """Create a team definition from the request fields."""
        return self.team_repository.create(
            tenant_id=payload.tenant_id,
            code=payload.code,
            name=payload.name,
            description=payload.description,
            team_type=payload.team_type,
            owner_user_id=payload.owner_user_id,
            metadata_json=payload.metadata_json,
        )

    def list_teams(self, *, tenant_id: str) -> list[TeamDefinition]:
        """Return the team definitions of one tenant."""
        return self.team_repository.list_by_tenant(tenant_id=tenant_id)

    def update_team_status(
        self,
        *,
        team_id: str,
        payload: TeamStatusUpdateRequest,
    ) -> TeamDefinition | None:
        """Update a team's status and return whatever the repository returns."""
        return self.team_repository.update_status(
            tenant_id=payload.tenant_id,
            team_id=team_id,
            status=payload.status,
        )

    def create_team_version(self, payload: TeamVersionCreateRequest) -> TeamVersion:
        """Create a new version for an existing team.

        Raises:
            ValueError: if the team does not exist for the tenant, or if the
                request carries no member references.
        """
        team = self.team_repository.get_by_id(
            tenant_id=payload.tenant_id,
            team_id=payload.team_id,
        )
        if team is None:
            raise ValueError(f"team not found: {payload.team_id}")
        if not payload.member_refs:
            raise ValueError("team version requires at least one member")
        return self.team_version_repository.create(
            tenant_id=payload.tenant_id,
            team_id=payload.team_id,
            status=payload.status,
            coordination_mode=payload.coordination_mode,
            objective=payload.objective,
            # Members are persisted as plain JSON-compatible dicts.
            member_refs_json=[item.model_dump(mode="json") for item in payload.member_refs],
            policy_json=payload.policy_json,
        )

    def list_team_versions(self, *, tenant_id: str, team_id: str) -> list[TeamVersion]:
        """Return the versions of one team."""
        return self.team_version_repository.list_by_team(tenant_id=tenant_id, team_id=team_id)

    def create_team_run(self, payload: TeamRunCreateRequest) -> TeamRun:
        """Create a team run and publish a ``team.run.created`` event.

        The run is bound to the explicitly requested version, or to the
        latest published version when no version id is given.

        Raises:
            ValueError: if no team version could be resolved.
        """
        team_version = self._resolve_team_version(
            tenant_id=payload.tenant_id,
            team_id=payload.team_id,
            team_version_id=payload.team_version_id,
        )
        if team_version is None:
            raise ValueError("published team version not found")
        team_run = self.team_run_repository.create(
            tenant_id=payload.tenant_id,
            team_id=payload.team_id,
            team_version_id=team_version.id,
            session_id=payload.session_id,
            input_text=payload.input_text,
            input_json=payload.input_json,
        )
        self._publish_event(
            event_type="team.run.created",
            team_run=team_run,
            payload_json={"team_run_id": team_run.id, "status": team_run.status},
        )
        return team_run

    def list_team_runs(
        self,
        *,
        tenant_id: str,
        team_id: str | None = None,
        session_id: str | None = None,
    ) -> list[TeamRun]:
        """Return the runs of a tenant, optionally narrowed by team and/or session."""
        return self.team_run_repository.list_by_scope(
            tenant_id=tenant_id,
            team_id=team_id,
            session_id=session_id,
        )

    def update_team_run_status(
        self,
        *,
        team_run_id: str,
        payload: TeamRunStatusUpdateRequest,
    ) -> TeamRun | None:
        """Apply a status update to a run; return None when the run is not found."""
        # The tenant-scoped lookup guards the update below, which is keyed by
        # run id only.
        entity = self.team_run_repository.get_by_id(
            tenant_id=payload.tenant_id,
            team_run_id=team_run_id,
        )
        if entity is None:
            return None
        return self.team_run_repository.update_status(
            team_run_id=team_run_id,
            status=payload.status,
            worker_key=payload.worker_key,
            output_text=payload.output_text,
            output_json=payload.output_json,
            error_code=payload.error_code,
            error_message=payload.error_message,
        )

    def execute_team_run(
        self,
        *,
        team_run_id: str,
        payload: TeamRunExecuteRequest,
    ) -> TeamRun | None:
        """Execute every member of a team run and record the outcome.

        Flow: load the run and its version, mark the run ``running``, execute
        the members, then mark the run ``completed`` or ``failed``. A
        ``team.run.failed`` event is published when at least one member run
        did not complete; a ``team.run.completed`` event when all did.

        Returns:
            The run as returned by the final status update, or None when the
            run does not exist for the tenant or a status update returned None.
        """
        team_run = self.team_run_repository.get_by_id(
            tenant_id=payload.tenant_id,
            team_run_id=team_run_id,
        )
        if team_run is None:
            return None

        team_version = self.team_version_repository.get_by_id(
            tenant_id=payload.tenant_id,
            team_version_id=team_run.team_version_id,
        )
        if team_version is None:
            # Stop here: without a version there is nothing to execute, and
            # continuing would flip the run back to "running" and then fail
            # on the missing version object.
            return self.team_run_repository.update_status(
                team_run_id=team_run.id,
                status="failed",
                worker_key=payload.worker_key,
                error_code="team_version_missing",
                error_message=f"team version not found: {team_run.team_version_id}",
            )

        running_run = self.team_run_repository.update_status(
            team_run_id=team_run.id,
            status="running",
            worker_key=payload.worker_key,
        )
        if running_run is None:
            return None

        members = self._read_team_members(team_version)
        if not members:
            return self.team_run_repository.update_status(
                team_run_id=team_run.id,
                status="failed",
                worker_key=payload.worker_key,
                error_code="team_members_missing",
                error_message="team version has no valid members",
            )

        try:
            member_results = self._execute_members(
                team_run=team_run,
                team_version=team_version,
                members=members,
                worker_key=payload.worker_key,
                dry_run=payload.dry_run,
            )
        except AgentServiceClientError as exc:
            return self.team_run_repository.update_status(
                team_run_id=team_run.id,
                status="failed",
                worker_key=payload.worker_key,
                error_code="agent_service_error",
                error_message=str(exc),
            )

        failed_results = [item for item in member_results if item.status != "completed"]
        output_text = self._build_team_output_text(
            team_version=team_version,
            member_results=member_results,
        )
        output_json: dict[str, JSONValue] = {
            "dry_run": payload.dry_run,
            "coordination_mode": team_version.coordination_mode,
            "team_version_id": team_version.id,
            "member_run_count": len(member_results),
            "member_results": [
                self._member_result_to_json(item) for item in member_results
            ],
        }

        if failed_results:
            # Keep the updated entity so the failure event can be published
            # before returning it.
            failed_run = self.team_run_repository.update_status(
                team_run_id=team_run.id,
                status="failed",
                worker_key=payload.worker_key,
                output_text=output_text,
                output_json=output_json,
                error_code="member_run_failed",
                error_message=f"{len(failed_results)} member run(s) failed",
            )
            if failed_run is not None:
                self._publish_event(
                    event_type="team.run.failed",
                    team_run=failed_run,
                    payload_json={
                        "team_run_id": failed_run.id,
                        "status": failed_run.status,
                        "failed_member_count": len(failed_results),
                    },
                )
            return failed_run

        completed_run = self.team_run_repository.update_status(
            team_run_id=team_run.id,
            status="completed",
            worker_key=payload.worker_key,
            output_text=output_text,
            output_json=output_json,
        )
        if completed_run is not None:
            self._publish_event(
                event_type="team.run.completed",
                team_run=completed_run,
                payload_json={
                    "team_run_id": completed_run.id,
                    "status": completed_run.status,
                    "member_run_count": len(member_results),
                },
            )
        return completed_run

    def execute_next_claimed_team_run(
        self,
        *,
        worker_key: str,
        lease_seconds: int,
        dry_run: bool,
    ) -> tuple[TeamRun, int] | None:
        """Release expired leases, claim the next queued run and execute it.

        Returns:
            ``(executed_run, released_lease_count)``, or None when no run
            could be claimed or the execution returned None.
        """
        # NOTE(review): datetime.utcnow() yields naive datetimes and is
        # deprecated since Python 3.12. It is kept because the repository's
        # expectation (naive vs. aware) is not visible here - confirm before
        # migrating.
        released_lease_count = self.team_run_repository.release_expired_leases(
            now_time=datetime.utcnow(),
        )
        claimed_team_run = self.team_run_repository.claim_next_queued(
            worker_key=worker_key,
            lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
        )
        if claimed_team_run is None:
            return None
        result = self.execute_team_run(
            team_run_id=claimed_team_run.id,
            payload=TeamRunExecuteRequest(
                tenant_id=claimed_team_run.tenant_id,
                worker_key=worker_key,
                dry_run=dry_run,
            ),
        )
        if result is None:
            return None
        return result, released_lease_count

    def _resolve_team_version(
        self,
        *,
        tenant_id: str,
        team_id: str,
        team_version_id: str | None,
    ) -> TeamVersion | None:
        """Return the requested version, or the latest published one when no id is given."""
        if team_version_id is not None:
            # NOTE(review): an explicit id is looked up by tenant only; it is
            # not checked here against team_id or publication status.
            return self.team_version_repository.get_by_id(
                tenant_id=tenant_id,
                team_version_id=team_version_id,
            )
        return self.team_version_repository.get_latest_published(
            tenant_id=tenant_id,
            team_id=team_id,
        )

    def _execute_members(
        self,
        *,
        team_run: TeamRun,
        team_version: TeamVersion,
        members: list[TeamMemberContract],
        worker_key: str | None,
        dry_run: bool,
    ) -> list[AgentRunContract]:
        """Run each member sequentially as an agent run, in role-priority order.

        Every member receives the outputs of the members executed before it
        under ``prior_member_outputs`` in its input JSON.

        Raises:
            AgentServiceClientError: if no agent client is configured.
        """
        if self.agent_client is None:
            raise AgentServiceClientError("agent service client is not configured")
        member_results: list[AgentRunContract] = []
        prior_outputs: list[dict[str, JSONValue]] = []
        for member in self._order_members(members):
            member_input_json = self._build_member_input_json(
                team_run=team_run,
                team_version=team_version,
                member=member,
                prior_outputs=prior_outputs,
            )
            created_run = self.agent_client.create_agent_run(
                tenant_id=team_run.tenant_id,
                agent_id=member.agent_id,
                agent_version_id=member.agent_version_id,
                session_id=team_run.session_id,
                input_text=self._build_member_input_text(
                    team_run=team_run,
                    team_version=team_version,
                    member=member,
                ),
                input_json=member_input_json,
            )
            executed_run = self.agent_client.execute_agent_run(
                tenant_id=team_run.tenant_id,
                agent_run_id=created_run.id,
                worker_key=worker_key,
                dry_run=dry_run,
            )
            member_results.append(executed_run)
            # Recorded regardless of status, so later members also see failures.
            prior_outputs.append(
                {
                    "member_key": member.member_key,
                    "role": member.role,
                    "agent_run_id": executed_run.id,
                    "status": executed_run.status,
                    "output_text": executed_run.output_text,
                    "output_json": executed_run.output_json or {},
                }
            )
        return member_results

    def _read_team_members(self, team_version: TeamVersion) -> list[TeamMemberContract]:
        """Parse the stored member references, skipping entries that fail validation."""
        members: list[TeamMemberContract] = []
        # A null member list is treated like an empty one.
        for item in team_version.member_refs_json or []:
            try:
                members.append(TeamMemberContract.model_validate(item))
            except ValueError:
                # Presumably the contract's validation error derives from
                # ValueError; invalid entries are dropped on purpose.
                continue
        return members

    def _order_members(self, members: list[TeamMemberContract]) -> list[TeamMemberContract]:
        """Return members sorted by role priority; unknown roles go last.

        The sort is stable, so members sharing a role keep their input order.
        """
        role_priority = {
            "planner": 0,
            "supervisor": 1,
            "specialist": 2,
            "executor": 3,
            "reviewer": 4,
        }
        return sorted(members, key=lambda item: role_priority.get(item.role, 10))

    def _build_member_input_text(
        self,
        *,
        team_run: TeamRun,
        team_version: TeamVersion,
        member: TeamMemberContract,
    ) -> str:
        """Build the newline-joined text prompt handed to one member."""
        lines = [
            f"Team objective: {team_version.objective or 'No objective provided.'}",
            f"Member role: {member.role}",
        ]
        if member.responsibility:
            lines.append(f"Responsibility: {member.responsibility}")
        if team_run.input_text:
            lines.append(f"User task: {team_run.input_text}")
        return "\n".join(lines)

    def _build_member_input_json(
        self,
        *,
        team_run: TeamRun,
        team_version: TeamVersion,
        member: TeamMemberContract,
        prior_outputs: list[dict[str, JSONValue]],
    ) -> dict[str, JSONValue]:
        """Build the JSON input for one member.

        Layers, later ones overriding earlier ones: the run's own input JSON,
        the team/member context keys, then the member's configured
        ``input_json`` (only when it is a dict).
        """
        input_json: dict[str, JSONValue] = dict(team_run.input_json or {})
        input_json.update(
            {
                "team_id": team_run.team_id,
                "team_run_id": team_run.id,
                "team_version_id": team_version.id,
                "team_objective": team_version.objective,
                "member_key": member.member_key,
                "member_role": member.role,
                "member_responsibility": member.responsibility,
                # Snapshot: the caller keeps appending to its list, which must
                # not retroactively change a payload that was already built.
                "prior_member_outputs": list(prior_outputs),
            }
        )
        configured_input = member.config_json.get("input_json")
        if isinstance(configured_input, dict):
            input_json.update(
                {str(item_key): item_value for item_key, item_value in configured_input.items()}
            )
        return input_json

    def _build_team_output_text(
        self,
        *,
        team_version: TeamVersion,
        member_results: list[AgentRunContract],
    ) -> str:
        """Render a human-readable summary with one numbered line per member result."""
        lines = [
            f"Team objective: {team_version.objective or 'No objective provided.'}",
            f"Coordination mode: {team_version.coordination_mode}",
            "Member results:",
        ]
        for index, result in enumerate(member_results, start=1):
            # Fall back to the error message when the member produced no text.
            output_text = result.output_text or result.error_message or ""
            lines.append(f"{index}. agent={result.agent_id} status={result.status}: {output_text}")
        return "\n".join(lines)

    def _member_result_to_json(self, result: AgentRunContract) -> dict[str, JSONValue]:
        """Flatten one agent run result into a JSON-compatible dict."""
        return {
            "agent_run_id": result.id,
            "agent_id": result.agent_id,
            "agent_version_id": result.agent_version_id,
            "status": result.status,
            "output_text": result.output_text,
            "output_json": result.output_json or {},
            "error_code": result.error_code,
            "error_message": result.error_message,
        }

    def _publish_event(
        self,
        *,
        event_type: str,
        team_run: TeamRun,
        payload_json: dict[str, JSONValue],
    ) -> None:
        """Publish a run event on a best-effort basis.

        Does nothing when no event client is configured. The team id and team
        version id of the run are always added to the payload.
        """
        if self.event_client is None:
            return
        try:
            self.event_client.publish_event(
                EventPublishContract(
                    tenant_id=team_run.tenant_id,
                    event_type=event_type,
                    source_service="team-service",
                    aggregate_type="team_run",
                    aggregate_id=team_run.id,
                    correlation_id=team_run.session_id,
                    payload_json={
                        **payload_json,
                        "team_id": team_run.team_id,
                        "team_version_id": team_run.team_version_id,
                    },
                )
            )
        except EventServiceClientError:
            # Deliberate: a failed publish must not fail the run operation.
            return
def build_team_application_service(
    *,
    team_repository: TeamDefinitionRepository,
    team_version_repository: TeamVersionRepository,
    team_run_repository: TeamRunRepository,
    settings: TeamServiceSettings,
) -> TeamApplicationService:
    """Wire a ``TeamApplicationService`` from repositories and service settings.

    The agent-service and event-service clients are constructed from the URL
    and timeout values found on ``settings``.
    """
    # Built in the same order as before: agent client first, then event client.
    agent_service = AgentServiceClient(
        base_url=settings.agent_service_url,
        timeout_seconds=settings.agent_service_timeout_seconds,
    )
    event_service = EventServiceClient(
        base_url=settings.event_service_url,
        timeout_seconds=settings.event_service_timeout_seconds,
    )
    return TeamApplicationService(
        team_repository=team_repository,
        team_version_repository=team_version_repository,
        team_run_repository=team_run_repository,
        agent_client=agent_service,
        event_client=event_service,
    )
|