| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738 |
- from dataclasses import dataclass
- from datetime import datetime, timedelta
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
- from core_domain import AgentRunContract, TeamMemberContract
- from core_shared import JSONValue, try_build_redis_client
- from core_shared.task_queue import TaskQueuePublisher
- from app.bootstrap.settings import TeamServiceSettings
- from app.db.models import TeamDefinition, TeamRun, TeamConfig
- from app.domain.repositories import (
- TeamDefinitionRepository,
- TeamRunRepository,
- TeamConfigRepository)
- from app.infrastructure.agent_client import AgentServiceClient, AgentServiceClientError
- from app.schemas.team import (
- TeamConfigCreateRequestDto,
- TeamConfigUpdateRequestDto,
- TeamCreateRequest,
- TeamCreateRequestDto,
- TeamDeleteRequestDto,
- TeamRunCreateRequest,
- TeamRunCreateRequestDto,
- TeamRunExecuteRequest,
- TeamRunStatusUpdateRequestDto,
- TeamRunStatusUpdateRequest,
- TeamStatusUpdateRequest,
- TeamUpdateRequestDto,
- TeamConfigCreateRequest)
- MAX_PRIOR_OUTPUT_TEXT_CHARS = 1600
- @dataclass(frozen=True)
- class TeamMemberRunResult:
- member: TeamMemberContract
- run: AgentRunContract
- class TeamApplicationService:
- def __init__(
- self,
- *,
- team_repository: TeamDefinitionRepository,
- team_config_repository: TeamConfigRepository,
- team_run_repository: TeamRunRepository,
- agent_client: AgentServiceClient | None = None,
- event_client: EventServiceClient | None = None,
- task_queue_publisher: TaskQueuePublisher | None = None) -> None:
- self.team_repository = team_repository
- self.team_config_repository = team_config_repository
- self.team_run_repository = team_run_repository
- self.agent_client = agent_client
- self.event_client = event_client
- self.task_queue_publisher = task_queue_publisher
- def create_team(self, payload: TeamCreateRequest) -> TeamDefinition:
- return self.team_repository.create(
- code=payload.code,
- name=payload.name,
- description=payload.description,
- team_type=payload.team_type,
- owner_user_id=payload.owner_user_id,
- metadata_json=payload.metadata_json)
- def create_team_from_contract(self, payload: TeamCreateRequestDto) -> TeamDefinition:
- return self.create_team(
- TeamCreateRequest(
- code=self._build_team_code(payload.name),
- name=payload.name,
- description=payload.description,
- team_type=payload.teamType,
- owner_user_id=payload.ownerUserId,
- metadata_json=payload.metadata))
- def list_teams(self) -> list[TeamDefinition]:
- return self.team_repository.list_all()
- def get_team(self, *, team_id: str) -> TeamDefinition | None:
- return self.team_repository.get_by_id(team_id=team_id)
- def update_team_from_contract(self, payload: TeamUpdateRequestDto) -> TeamDefinition | None:
- entity = self.team_repository.get_by_id(team_id=payload.teamId)
- if entity is None:
- return None
- if payload.name is not None:
- entity.name = payload.name
- entity.code = self._build_team_code(payload.name)
- if payload.description is not None:
- entity.description = payload.description
- if payload.teamType is not None:
- entity.team_type = payload.teamType
- if payload.status is not None:
- entity.status = payload.status
- if payload.ownerUserId is not None:
- entity.owner_user_id = payload.ownerUserId
- if payload.metadata is not None:
- entity.metadata_json = payload.metadata
- return self.team_repository.save(entity)
- def delete_team_from_contract(self, payload: TeamDeleteRequestDto) -> bool:
- entity = self.team_repository.get_by_id(team_id=payload.teamId)
- if entity is None:
- return False
- self.team_repository.delete(entity)
- return True
- def update_team_status(
- self,
- *,
- team_id: str,
- payload: TeamStatusUpdateRequest) -> TeamDefinition | None:
- return self.team_repository.update_status(
- team_id=team_id,
- status=payload.status)
- def create_team_config(self, payload: TeamConfigCreateRequest) -> TeamConfig:
- team = self.team_repository.get_by_id(team_id=payload.team_id)
- if team is None:
- raise ValueError(f"team not found: {payload.team_id}")
- if not payload.member_refs:
- raise ValueError("team config requires at least one member")
- return self.team_config_repository.create(
- team_id=payload.team_id,
- coordination_mode=payload.coordination_mode,
- objective=payload.objective,
- member_refs_json=[item.model_dump(mode="json") for item in payload.member_refs],
- policy_json=payload.policy_json)
- def create_team_config_from_contract(self, payload: TeamConfigCreateRequestDto) -> TeamConfig:
- return self.create_team_config(
- TeamConfigCreateRequest(
- team_id=payload.teamId,
- coordination_mode=payload.coordinationMode,
- objective=payload.objective,
- member_refs=self._normalize_member_refs(payload.memberRefs),
- policy_json=payload.policy))
- def list_team_configs(self, *, team_id: str) -> list[TeamConfig]:
- return self.team_config_repository.list_by_team(team_id=team_id)
- def list_team_configs(self, *, team_id: str | None = None) -> list[TeamConfig]:
- if team_id is not None:
- return self.team_config_repository.list_by_team(team_id=team_id)
- return self.team_config_repository.list_all()
- def get_team_config(self, *, config_id: str) -> TeamConfig | None:
- return self.team_config_repository.get_by_id(team_config_id=config_id)
- def update_team_config_from_contract(self, payload: TeamConfigUpdateRequestDto) -> TeamConfig | None:
- entity = self.team_config_repository.get_by_id(team_config_id=payload.configId)
- if entity is None:
- return None
- if payload.coordinationMode is not None:
- entity.coordination_mode = payload.coordinationMode
- if payload.objective is not None:
- entity.objective = payload.objective
- if payload.memberRefs is not None:
- normalized = self._normalize_member_refs(payload.memberRefs)
- if not normalized:
- raise ValueError("team config requires at least one member")
- entity.member_refs_json = [item.model_dump(mode="json") for item in normalized]
- if payload.policy is not None:
- entity.policy_json = payload.policy
- return self.team_config_repository.save(entity)
- def delete_team_config(self, *, config_id: str) -> bool:
- entity = self.team_config_repository.get_by_id(team_config_id=config_id)
- if entity is None:
- return False
- self.team_config_repository.delete(entity)
- return True
- def create_team_run(self, payload: TeamRunCreateRequest) -> TeamRun:
- team_config = self._resolve_team_config(
- team_id=payload.team_id,
- team_config_id=payload.team_config_id)
- if team_config is None:
- raise ValueError("team config not found")
- team_run = self.team_run_repository.create(
- team_id=payload.team_id,
- team_config_id=team_config.id,
- session_id=payload.session_id,
- input_text=payload.input_text,
- input_json=payload.input_json)
- self._publish_event(
- event_type="team.run.created",
- team_run=team_run,
- payload_json={"team_run_id": team_run.id, "status": team_run.status})
- if self.task_queue_publisher is not None:
- self.task_queue_publisher.publish_team_run(
- team_run_id=team_run.id)
- return team_run
- def create_team_run_from_contract(self, payload: TeamRunCreateRequestDto) -> TeamRun:
- return self.create_team_run(
- TeamRunCreateRequest(
- team_id=payload.teamId,
- team_config_id=payload.teamConfigId,
- session_id=payload.sessionId,
- input_text=payload.inputText,
- input_json=payload.inputJson))
- def list_team_runs(
- self,
- *,
- team_id: str | None = None,
- session_id: str | None = None) -> list[TeamRun]:
- return self.team_run_repository.list_by_scope(
- team_id=team_id,
- session_id=session_id)
- def get_team_run(self, *, team_run_id: str) -> TeamRun | None:
- return self.team_run_repository.get_by_id(team_run_id=team_run_id)
- def delete_team_run(self, *, team_run_id: str) -> bool:
- entity = self.team_run_repository.get_by_id(team_run_id=team_run_id)
- if entity is None:
- return False
- self.team_run_repository.delete(entity)
- return True
- def update_team_run_status(
- self,
- *,
- team_run_id: str,
- payload: TeamRunStatusUpdateRequest) -> TeamRun | None:
- entity = self.team_run_repository.get_by_id(
- team_run_id=team_run_id)
- if entity is None:
- return None
- return self.team_run_repository.update_status(
- team_run_id=team_run_id,
- status=payload.status,
- worker_key=payload.worker_key,
- output_text=payload.output_text,
- output_json=payload.output_json,
- error_code=payload.error_code,
- error_message=payload.error_message)
- def update_team_run_status_from_contract(
- self,
- payload: TeamRunStatusUpdateRequestDto) -> TeamRun | None:
- return self.update_team_run_status(
- team_run_id=payload.teamRunId,
- payload=TeamRunStatusUpdateRequest(
- status=payload.status,
- worker_key=payload.workerKey,
- output_text=payload.outputText,
- output_json=payload.outputJson,
- error_code=payload.errorCode,
- error_message=payload.errorMessage))
- def execute_team_run(
- self,
- *,
- team_run_id: str,
- payload: TeamRunExecuteRequest) -> TeamRun | None:
- team_run = self.team_run_repository.get_by_id(
- team_run_id=team_run_id)
- if team_run is None:
- return None
- team_config = self.team_config_repository.get_by_id(
- team_config_id=team_run.team_config_id)
- if team_config is None:
- failed_run = self.team_run_repository.update_status(
- team_run_id=team_run.id,
- status="failed",
- worker_key=payload.worker_key,
- error_code="team_config_missing",
- error_message=f"team config not found: {team_run.team_config_id}")
- if failed_run is not None:
- self._publish_event(
- event_type="team.run.failed",
- team_run=failed_run,
- payload_json={
- "team_run_id": failed_run.id,
- "status": failed_run.status,
- "error_code": "team_config_missing",
- })
- return failed_run
- running_run = self.team_run_repository.update_status(
- team_run_id=team_run.id,
- status="running",
- worker_key=payload.worker_key)
- if running_run is None:
- return None
- members = self._read_team_members(team_config)
- if not members:
- return self.team_run_repository.update_status(
- team_run_id=team_run.id,
- status="failed",
- worker_key=payload.worker_key,
- error_code="team_members_missing",
- error_message="team config has no valid members")
- try:
- member_results = self._execute_members(
- team_run=team_run,
- team_config=team_config,
- members=members,
- worker_key=payload.worker_key,
- dry_run=payload.dry_run)
- except AgentServiceClientError as exc:
- return self.team_run_repository.update_status(
- team_run_id=team_run.id,
- status="failed",
- worker_key=payload.worker_key,
- error_code="agent_service_error",
- error_message=str(exc))
- failed_results = [item for item in member_results if item.run.status != "completed"]
- output_text = self._build_team_output_text(
- team_config=team_config,
- member_results=member_results)
- output_json: dict[str, JSONValue] = {
- "dry_run": payload.dry_run,
- "coordination_mode": team_config.coordination_mode,
- "team_config_id": team_config.id,
- "member_run_count": len(member_results),
- "member_results": [
- self._member_result_to_json(item) for item in member_results
- ],
- }
- if failed_results:
- failed_run = self.team_run_repository.update_status(
- team_run_id=team_run.id,
- status="failed",
- worker_key=payload.worker_key,
- output_text=output_text,
- output_json=output_json,
- error_code="member_run_failed",
- error_message=f"{len(failed_results)} member run(s) failed")
- if failed_run is not None:
- self._publish_event(
- event_type="team.run.failed",
- team_run=failed_run,
- payload_json={
- "team_run_id": failed_run.id,
- "status": failed_run.status,
- "failed_member_count": len(failed_results),
- })
- return failed_run
- completed_run = self.team_run_repository.update_status(
- team_run_id=team_run.id,
- status="completed",
- worker_key=payload.worker_key,
- output_text=output_text,
- output_json=output_json)
- if completed_run is not None:
- self._publish_event(
- event_type="team.run.completed",
- team_run=completed_run,
- payload_json={
- "team_run_id": completed_run.id,
- "status": completed_run.status,
- "member_run_count": len(member_results),
- })
- return completed_run
- def execute_next_claimed_team_run(
- self,
- *,
- worker_key: str,
- lease_seconds: int,
- stale_running_seconds: int,
- dry_run: bool,
- redis_client: object | None = None) -> tuple[TeamRun, int] | None:
- released_lease_count = self.team_run_repository.release_expired_leases(
- now_time=datetime.utcnow(),
- stale_running_seconds=stale_running_seconds)
- claimed_team_run = self.team_run_repository.claim_next_queued(
- worker_key=worker_key,
- lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds))
- if claimed_team_run is None:
- return None
- if redis_client is not None:
- from core_shared.redis_primitives import DistributedLock, IdempotencyStore
- lock = DistributedLock(
- client=redis_client,
- name=f"team-run:{claimed_team_run.id}:lock",
- ttl_seconds=lease_seconds)
- if not lock.acquire():
- return None
- idempotency_store = IdempotencyStore(
- client=redis_client,
- prefix="team-run-idempotency")
- if not idempotency_store.begin(key=claimed_team_run.id):
- lock.release()
- return None
- else:
- lock = None
- idempotency_store = None
- try:
- result = self.execute_team_run(
- team_run_id=claimed_team_run.id,
- payload=TeamRunExecuteRequest(
- worker_key=worker_key,
- dry_run=dry_run))
- if idempotency_store is not None and result is not None:
- idempotency_store.complete(
- key=claimed_team_run.id,
- result={"status": result.status, "team_run_id": result.id})
- finally:
- if lock is not None:
- lock.release()
- if result is None:
- return None
- return result, released_lease_count
- def _resolve_team_config(
- self,
- *,
- team_id: str,
- team_config_id: str | None) -> TeamConfig | None:
- if team_config_id is not None:
- return self.team_config_repository.get_by_id(
- team_config_id=team_config_id)
- return self.team_config_repository.get_latest_by_team(
- team_id=team_id)
- def _execute_members(
- self,
- *,
- team_run: TeamRun,
- team_config: TeamConfig,
- members: list[TeamMemberContract],
- worker_key: str | None,
- dry_run: bool) -> list[TeamMemberRunResult]:
- if self.agent_client is None:
- raise AgentServiceClientError("agent service client is not configured")
- ordered_members = self._order_members(members)
- if self._should_execute_members_in_parallel(team_config):
- return self._execute_members_in_parallel(
- team_run=team_run,
- team_config=team_config,
- members=ordered_members,
- worker_key=worker_key,
- dry_run=dry_run)
- member_results: list[TeamMemberRunResult] = []
- prior_outputs: list[dict[str, JSONValue]] = []
- for member in ordered_members:
- member_input_json = self._build_member_input_json(
- team_run=team_run,
- team_config=team_config,
- member=member,
- prior_outputs=prior_outputs)
- result = self._execute_single_member(
- team_run=team_run,
- team_config=team_config,
- member=member,
- member_input_json=member_input_json,
- worker_key=worker_key,
- dry_run=dry_run)
- member_results.append(result)
- prior_outputs.append(self._compact_prior_output(result))
- return member_results
- def _execute_members_in_parallel(
- self,
- *,
- team_run: TeamRun,
- team_config: TeamConfig,
- members: list[TeamMemberContract],
- worker_key: str | None,
- dry_run: bool) -> list[TeamMemberRunResult]:
- results_by_key: dict[str, TeamMemberRunResult] = {}
- with ThreadPoolExecutor(max_workers=max(1, min(len(members), 8))) as executor:
- future_by_member = {
- executor.submit(
- self._execute_single_member,
- team_run=team_run,
- team_config=team_config,
- member=member,
- member_input_json=self._build_member_input_json(
- team_run=team_run,
- team_config=team_config,
- member=member,
- prior_outputs=[]),
- worker_key=worker_key,
- dry_run=dry_run): member
- for member in members
- }
- for future in as_completed(future_by_member):
- member = future_by_member[future]
- results_by_key[member.member_key] = future.result()
- return [results_by_key[member.member_key] for member in members]
- def _execute_single_member(
- self,
- *,
- team_run: TeamRun,
- team_config: TeamConfig,
- member: TeamMemberContract,
- member_input_json: dict[str, JSONValue],
- worker_key: str | None,
- dry_run: bool) -> TeamMemberRunResult:
- if self.agent_client is None:
- raise AgentServiceClientError("agent service client is not configured")
- created_run = self.agent_client.create_agent_run(
- agent_id=member.agent_id,
- agent_config_id=member.agent_config_id,
- session_id=team_run.session_id,
- input_text=self._build_member_input_text(
- team_run=team_run,
- team_config=team_config,
- member=member),
- input_json=member_input_json)
- executed_run = self.agent_client.execute_agent_run(
- agent_run_id=created_run.id,
- worker_key=worker_key,
- dry_run=dry_run)
- return TeamMemberRunResult(member=member, run=executed_run)
- def _should_execute_members_in_parallel(self, team_config: TeamConfig) -> bool:
- handoff = team_config.policy_json.get("handoff")
- return team_config.coordination_mode == "parallel" or handoff == "parallel_merge"
- def _read_team_members(self, team_config: TeamConfig) -> list[TeamMemberContract]:
- members: list[TeamMemberContract] = []
- for item in team_config.member_refs_json:
- try:
- members.append(TeamMemberContract.model_validate(item))
- except ValueError:
- continue
- return members
- def _order_members(self, members: list[TeamMemberContract]) -> list[TeamMemberContract]:
- role_priority = {
- "planner": 0,
- "supervisor": 1,
- "specialist": 2,
- "executor": 3,
- "reviewer": 4,
- }
- return sorted(members, key=lambda item: role_priority.get(item.role, 10))
- def _build_member_input_text(
- self,
- *,
- team_run: TeamRun,
- team_config: TeamConfig,
- member: TeamMemberContract) -> str:
- lines = [
- f"Team objective: {team_config.objective or 'No objective provided.'}",
- f"Member role: {member.role}",
- ]
- if member.responsibility:
- lines.append(f"Responsibility: {member.responsibility}")
- if team_run.input_text:
- lines.append(f"User task: {team_run.input_text}")
- return "\n".join(lines)
- def _build_member_input_json(
- self,
- *,
- team_run: TeamRun,
- team_config: TeamConfig,
- member: TeamMemberContract,
- prior_outputs: list[dict[str, JSONValue]]) -> dict[str, JSONValue]:
- input_json: dict[str, JSONValue] = dict(team_run.input_json or {})
- input_json.update(
- {
- "team_id": team_run.team_id,
- "team_run_id": team_run.id,
- "team_config_id": team_config.id,
- "team_objective": team_config.objective,
- "member_key": member.member_key,
- "member_role": member.role,
- "member_responsibility": member.responsibility,
- "prior_member_outputs": prior_outputs,
- }
- )
- configured_input = member.config_json.get("input_json")
- if isinstance(configured_input, dict):
- input_json.update(
- {str(item_key): item_value for item_key, item_value in configured_input.items()}
- )
- return input_json
- def _build_team_output_text(
- self,
- *,
- team_config: TeamConfig,
- member_results: list[TeamMemberRunResult]) -> str:
- lines = [
- f"Team objective: {team_config.objective or 'No objective provided.'}",
- f"Coordination mode: {team_config.coordination_mode}",
- "Team conversation:",
- ]
- for index, item in enumerate(member_results, start=1):
- output_text = item.run.output_text or item.run.error_message or ""
- speaker = item.member.name or item.member.member_key
- lines.append(
- f"{index}. {speaker} ({item.member.role}) "
- f"status={item.run.status}: {output_text}")
- return "\n".join(lines)
- def _member_result_to_json(self, result: TeamMemberRunResult) -> dict[str, JSONValue]:
- return {
- "member_key": result.member.member_key,
- "member_role": result.member.role,
- "member_name": result.member.name,
- "member_responsibility": result.member.responsibility,
- "agent_run_id": result.run.id,
- "agent_id": result.run.agent_id,
- "agent_config_id": result.run.agent_config_id,
- "status": result.run.status,
- "output_text": result.run.output_text,
- "output_json": self._compact_agent_output_json(result.run.output_json or {}),
- "error_code": result.run.error_code,
- "error_message": result.run.error_message,
- }
- def _compact_prior_output(self, result: TeamMemberRunResult) -> dict[str, JSONValue]:
- return {
- "member_key": result.member.member_key,
- "member_role": result.member.role,
- "member_name": result.member.name,
- "agent_id": result.run.agent_id,
- "agent_run_id": result.run.id,
- "status": result.run.status,
- "output_text": self._truncate_text(result.run.output_text),
- "error_code": result.run.error_code,
- "error_message": result.run.error_message,
- "output_json": self._compact_agent_output_json(result.run.output_json or {}),
- }
- def _compact_agent_output_json(
- self,
- output_json: dict[str, JSONValue]) -> dict[str, JSONValue]:
- keep_keys = {
- "dry_run",
- "model",
- "finish_reason",
- "usage_json",
- "tool_invocations",
- "skill_invocations",
- "selected_tool_refs",
- "selected_skill_refs",
- "memory_read_enabled",
- "memory_read_count",
- "memory_read_reason",
- }
- compacted = {
- key: value for key, value in output_json.items()
- if key in keep_keys
- }
- if "raw_response_json" in output_json or "messages" in output_json:
- compacted["debug_payload_omitted"] = True
- return compacted
- def _truncate_text(self, value: str | None) -> str | None:
- if value is None or len(value) <= MAX_PRIOR_OUTPUT_TEXT_CHARS:
- return value
- return f"{value[:MAX_PRIOR_OUTPUT_TEXT_CHARS]}... [truncated]"
- def _publish_event(
- self,
- *,
- event_type: str,
- team_run: TeamRun,
- payload_json: dict[str, JSONValue]) -> None:
- if self.event_client is None:
- return
- try:
- self.event_client.publish_event(
- EventPublishContract(
- event_type=event_type,
- source_service="team-service",
- aggregate_type="team_run",
- aggregate_id=team_run.id,
- correlation_id=team_run.session_id,
- payload_json={
- **payload_json,
- "team_id": team_run.team_id,
- "team_config_id": team_run.team_config_id,
- })
- )
- except EventServiceClientError:
- return
- def _build_team_code(self, name: str) -> str:
- base = "".join(
- char.lower() if char.isalnum() else "_"
- for char in name
- ).strip("_") or "team"
- return base[:64]
- def _normalize_member_refs(self, member_refs: list[dict[str, JSONValue]]) -> list[TeamMemberContract]:
- members: list[TeamMemberContract] = []
- for index, item in enumerate(member_refs, start=1):
- role = item.get("role")
- normalized_role = "executor" if role == "worker" else role
- member = {
- **item,
- "member_key": item.get("member_key") or item.get("memberKey") or f"member_{index}",
- "agent_id": item.get("agent_id") or item.get("agentId"),
- "agent_config_id": item.get("agent_config_id") or item.get("agentConfigId"),
- "role": normalized_role or "specialist",
- "config_json": item.get("config_json") or item.get("configJson") or {},
- }
- members.append(TeamMemberContract.model_validate(member))
- return members
- def build_team_application_service(
- *,
- team_repository: TeamDefinitionRepository,
- team_config_repository: TeamConfigRepository,
- team_run_repository: TeamRunRepository,
- settings: TeamServiceSettings) -> TeamApplicationService:
- redis_client = try_build_redis_client(settings.redis_url)
- return TeamApplicationService(
- team_repository=team_repository,
- team_config_repository=team_config_repository,
- team_run_repository=team_run_repository,
- agent_client=AgentServiceClient(
- base_url=settings.agent_service_url,
- timeout_seconds=settings.agent_service_timeout_seconds),
- event_client=EventServiceClient(
- base_url=settings.event_service_url,
- timeout_seconds=settings.event_service_timeout_seconds),
- task_queue_publisher=(
- TaskQueuePublisher(client=redis_client) if redis_client is not None else None
- ))
|