| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- from pathlib import Path
- from datetime import datetime
- from unittest.mock import MagicMock
- from tests.conftest import (
- build_fastapi_test_client,
- build_postgres_database_url,
- build_postgres_engine,
- prepare_known_service_import,
- )
def test_team_service_post_contract_supports_team_configs_and_runs(
    tmp_path: Path,
    monkeypatch,
) -> None:
    """Drive the team-service POST endpoints through a full lifecycle.

    A team, a team config and a team run are created, listed, updated and
    finally deleted through the FastAPI test client.  Every response must be
    HTTP 200 and expose the expected fields under its ``data`` key.
    """
    prepare_known_service_import("team-service")
    from app.bootstrap.app import create_app
    from app.db.models import Base
    from core_db import create_session_factory

    database_url = build_postgres_database_url(tmp_path, "teams-api")
    for env_name, env_value in (
        ("AGENT_PLATFORM_DATABASE_URL", database_url),
        ("AGENT_PLATFORM_REDIS_URL", ""),
        ("AGENT_PLATFORM_AUTO_WORKER_ENABLED", "false"),
    ):
        monkeypatch.setenv(env_name, env_value)

    engine = build_postgres_engine(database_url)
    Base.metadata.create_all(engine)
    app = create_app()
    app.state.session_factory = create_session_factory(engine)
    client = build_fastapi_test_client(app)

    def post_ok(path: str, body: dict):
        """POST *body* as JSON, require HTTP 200 and return the ``data`` field."""
        response = client.post(path, json=body)
        assert response.status_code == 200
        return response.json()["data"]

    # --- create a team -----------------------------------------------------
    team = post_ok(
        "/teams/create",
        {
            "name": "Support Team",
            "description": "Handles support escalations",
            "teamType": "collaborative",
            "ownerUserId": "demo-user",
        },
    )
    assert team["name"] == "Support Team"
    assert team["teamType"] == "collaborative"
    assert "code" not in team
    team_id = team["id"]

    # --- create a config for that team -------------------------------------
    config = post_ok(
        "/teams/configs/create",
        {
            "teamId": team_id,
            "coordinationMode": "supervisor",
            "objective": "Resolve the customer request",
            "memberRefs": [
                {
                    "role": "worker",
                    "agentId": "agent_support",
                    "responsibility": "Draft the answer",
                }
            ],
            "policy": {
                "max_rounds": 3,
                "handoff": "supervisor",
            },
        },
    )
    assert config["teamId"] == team_id
    # The submitted role "worker" is expected back as "specialist", together
    # with a generated member key.
    first_member = config["memberRefs"][0]
    assert first_member["role"] == "specialist"
    assert first_member["member_key"] == "member_1"
    config_id = config["id"]

    # --- list teams and configs --------------------------------------------
    team_listing = post_ok(
        "/teams/list",
        {"page": 1, "pageSize": 20, "keyword": "support"},
    )
    assert team_listing["total"] == 1

    config_listing = post_ok(
        "/teams/configs/list",
        {"page": 1, "pageSize": 20, "teamId": team_id},
    )
    assert config_listing["total"] == 1

    # --- create a run, then report its completion --------------------------
    run = post_ok(
        "/teams/runs/create",
        {
            "teamId": team_id,
            "teamConfigId": config_id,
            "inputText": "Help the customer reset MFA",
        },
    )
    assert run["teamId"] == team_id
    assert run["teamConfigId"] == config_id
    assert run["status"] == "queued"
    run_id = run["id"]

    finished_run = post_ok(
        "/teams/runs/status",
        {
            "teamRunId": run_id,
            "status": "completed",
            "workerKey": "test-worker",
            "outputText": "MFA reset steps prepared.",
        },
    )
    assert finished_run["status"] == "completed"

    run_listing = post_ok(
        "/teams/runs/list",
        {"page": 1, "pageSize": 20, "teamId": team_id},
    )
    assert run_listing["total"] == 1

    # --- update the team ---------------------------------------------------
    updated_team = post_ok(
        "/teams/update",
        {
            "teamId": team_id,
            "name": "Support Team Updated",
            "status": "active",
        },
    )
    assert updated_team["name"] == "Support Team Updated"
    assert updated_team["status"] == "active"

    # --- delete in the order run -> config -> team -------------------------
    for path, body in (
        ("/teams/runs/delete", {"teamRunId": run_id}),
        ("/teams/configs/delete", {"configId": config_id}),
        ("/teams/delete", {"teamId": team_id}),
    ):
        assert post_ok(path, body)["deleted"] is True
def test_team_service_compacts_member_context_between_agent_calls() -> None:
    """Check that a member's run output is compacted before being reused.

    A run with a 2000-character ``output_text`` and an ``output_json`` holding
    ``messages`` and ``raw_response_json`` is passed through
    ``_compact_prior_output`` and ``_member_result_to_json``.  The text must
    end with ``[truncated]`` and the two bulky keys must be gone.
    """
    prepare_known_service_import("team-service")
    from app.application.services import TeamApplicationService, TeamMemberRunResult
    from core_domain import AgentRunContract, TeamMemberContract

    # Neither helper under test is given a repository here.
    service = TeamApplicationService(
        team_repository=None,
        team_config_repository=None,
        team_run_repository=None,
    )

    planner = TeamMemberContract(
        member_key="member_1",
        agent_id="agent_1",
        role="supervisor",
        name="Planner",
    )
    verbose_run = AgentRunContract(
        id="run_1",
        agent_id="agent_1",
        agent_config_id="config_1",
        output_text="x" * 2000,
        output_json={
            "model": "demo-model",
            "finish_reason": "stop",
            "messages": [{"role": "user", "content": "large prompt"}],
            "raw_response_json": {"thinking": "hidden debug payload"},
        },
        status="completed",
        created_time=datetime.utcnow(),
    )
    member_result = TeamMemberRunResult(member=planner, run=verbose_run)

    compacted = service._compact_prior_output(member_result)
    serialized = service._member_result_to_json(member_result)

    assert compacted["output_text"].endswith("[truncated]")
    expected_output_json = {
        "model": "demo-model",
        "finish_reason": "stop",
        "debug_payload_omitted": True,
    }
    assert compacted["output_json"] == expected_output_json
    for dropped_key in ("messages", "raw_response_json"):
        assert dropped_key not in serialized["output_json"]
def _build_service_with_mock_agent() -> tuple:
    """Build a ``TeamApplicationService`` backed by a mocked agent client.

    Returns:
        A 4-tuple ``(service, call_log, track_execute, make_member_result)``:

        * ``service`` - the service, created with ``None`` repositories and a
          ``MagicMock`` agent client.
        * ``call_log`` - list that ``track_execute`` appends one string to per
          call, formatted ``"<member_key>:<role>:prior=<count>"``.
        * ``track_execute`` - callable meant to be used as a ``side_effect``
          replacement for ``_execute_single_member``.
        * ``make_member_result`` - factory for a completed member run result.
    """
    prepare_known_service_import("team-service")
    from app.application.services import TeamApplicationService, TeamMemberRunResult
    from core_domain import AgentRunContract, TeamMemberContract

    call_log: list[str] = []

    def make_member_result(member: TeamMemberContract, text: str) -> TeamMemberRunResult:
        """Wrap *text* in a completed run attributed to *member*."""
        completed_run = AgentRunContract(
            id=f"run_{member.member_key}",
            agent_id=member.agent_id,
            agent_config_id=member.agent_config_id or "default_config",
            output_text=text,
            output_json={},
            status="completed",
            created_time=datetime.utcnow(),
        )
        return TeamMemberRunResult(member=member, run=completed_run)

    def fake_create_agent_run(**kw):
        """Stand-in for the client's ``create_agent_run``: a freshly created run."""
        return AgentRunContract(
            id="run_mock",
            agent_id=kw.get("agent_id", "a"),
            status="created",
            created_time=datetime.utcnow(),
        )

    def fake_execute_agent_run(**kw):
        """Stand-in for the client's ``execute_agent_run``: a completed run."""
        return AgentRunContract(
            id=kw.get("agent_run_id", "run_mock"),
            agent_id="a",
            output_text="ok",
            output_json={},
            status="completed",
            created_time=datetime.utcnow(),
        )

    mock_client = MagicMock()
    mock_client.create_agent_run = MagicMock(side_effect=fake_create_agent_run)
    mock_client.execute_agent_run = MagicMock(side_effect=fake_execute_agent_run)

    def track_execute(team_run, team_config, member, member_input_json, worker_key, dry_run):
        """Record how many prior outputs *member* received, then return a result."""
        previous = member_input_json.get("prior_member_outputs", [])
        call_log.append(f"{member.member_key}:{member.role}:prior={len(previous)}")
        return make_member_result(member, f"output_{member.member_key}")

    service = TeamApplicationService(
        team_repository=None,
        team_config_repository=None,
        team_run_repository=None,
        agent_client=mock_client,
    )
    return service, call_log, track_execute, make_member_result
- def _mock_team_run(**overrides):
- run = MagicMock()
- run.id = "run_test"
- run.team_id = "team_test"
- run.team_config_id = "config_test"
- run.session_id = None
- run.input_text = "test input"
- run.input_json = None
- for k, v in overrides.items():
- setattr(run, k, v)
- return run
- def _mock_team_config(**overrides):
- config = MagicMock()
- config.id = "config_test"
- config.team_id = "team_test"
- config.coordination_mode = "supervisor"
- config.objective = "test objective"
- config.policy_json = {}
- for k, v in overrides.items():
- setattr(config, k, v)
- return config
def test_supervisor_mode_executes_lead_first_then_others() -> None:
    """Supervisor mode: the lead runs first, workers follow, the lead runs last.

    With ``supervisor_synthesis`` enabled and three members, four results are
    expected: the supervisor, the two workers in either order, and the
    supervisor again.
    """
    service, _call_log, track_execute, _ = _build_service_with_mock_agent()
    from unittest.mock import patch
    from core_domain import TeamMemberContract

    # The supervisor is deliberately not listed first.
    member_specs = (
        ("worker_1", "a1", "executor"),
        ("lead_1", "a2", "supervisor"),
        ("worker_2", "a3", "reviewer"),
    )
    members = [
        TeamMemberContract(member_key=key, agent_id=agent_id, role=role)
        for key, agent_id, role in member_specs
    ]
    team_config = _mock_team_config(
        coordination_mode="supervisor",
        policy_json={"supervisor_synthesis": True},
    )

    with patch.object(service, "_execute_single_member", side_effect=track_execute):
        results = service._execute_members(
            team_run=_mock_team_run(),
            team_config=team_config,
            members=members,
            worker_key=None,
            dry_run=False,
        )

    # lead pass + two workers + synthesis pass = 4 executions
    assert len(results) == 4
    executed_keys = [entry.member.member_key for entry in results]
    workers_slot = executed_keys[1:3]
    assert executed_keys[0] == "lead_1"  # supervisor first
    assert "worker_1" in workers_slot
    assert "worker_2" in workers_slot
    assert executed_keys[3] == "lead_1"  # synthesis pass
def test_pipeline_mode_chains_single_prior_output() -> None:
    """Pipeline mode: each member sees only the output of the member before it.

    The first member receives no prior output; the second and third each
    receive exactly one.
    """
    service, call_log, track_execute, _ = _build_service_with_mock_agent()
    from unittest.mock import patch
    from core_domain import TeamMemberContract

    member_specs = (
        ("m1", "a1", "planner"),
        ("m2", "a2", "executor"),
        ("m3", "a3", "reviewer"),
    )
    members = [
        TeamMemberContract(member_key=key, agent_id=agent_id, role=role)
        for key, agent_id, role in member_specs
    ]
    team_config = _mock_team_config(coordination_mode="pipeline", policy_json={})

    with patch.object(service, "_execute_single_member", side_effect=track_execute):
        results = service._execute_members(
            team_run=_mock_team_run(),
            team_config=team_config,
            members=members,
            worker_key=None,
            dry_run=False,
        )

    assert len(results) == 3
    # m1: no prior, m2: 1 prior, m3: 1 prior (only the previous one, not all)
    expected_calls = [
        "m1:planner:prior=0",
        "m2:executor:prior=1",
        "m3:reviewer:prior=1",
    ]
    assert call_log[:3] == expected_calls
def test_debate_mode_executes_multiple_rounds() -> None:
    """Debate mode: every member runs once per round with growing history.

    Two members and ``max_rounds=3`` must produce six executions, while only
    the two results of the last round are returned.  The first five log
    entries are checked for the number of prior outputs each call received.
    """
    service, call_log, track_execute, _ = _build_service_with_mock_agent()
    from unittest.mock import patch
    from core_domain import TeamMemberContract

    members = [
        TeamMemberContract(member_key=key, agent_id=agent_id, role=role)
        for key, agent_id, role in (
            ("m1", "a1", "executor"),
            ("m2", "a2", "reviewer"),
        )
    ]
    team_config = _mock_team_config(
        coordination_mode="debate",
        policy_json={"max_rounds": 3},
    )

    with patch.object(service, "_execute_single_member", side_effect=track_execute):
        results = service._execute_members(
            team_run=_mock_team_run(),
            team_config=team_config,
            members=members,
            worker_key=None,
            dry_run=False,
        )

    # 2 members x 3 rounds = 6 executions; the returned results are the last round
    assert len(results) == 2
    assert len(call_log) == 6
    expected_prefix = [
        # Round 1: prior=0 for the first member, prior=1 for the second
        "m1:executor:prior=0",
        "m2:reviewer:prior=1",
        # Round 2: history from round 1 is carried over
        "m1:executor:prior=2",
        "m2:reviewer:prior=3",
        # Round 3: first member starts with prior=4
        "m1:executor:prior=4",
    ]
    assert call_log[:5] == expected_prefix
def test_failure_mode_continue_allows_partial_failure() -> None:
    """``continue_with_warning`` keeps executing after a member run fails.

    The first member's run is reported as ``failed``; the second member must
    still be executed and both results must be returned.
    """
    # Only the service and the result factory are needed from the builder.
    service, _, _, make_member_result = _build_service_with_mock_agent()
    from unittest.mock import patch
    # Imported once here instead of on every call of the side-effect callback;
    # the builder above has already prepared the service import path.
    from app.application.services import TeamMemberRunResult
    from core_domain import AgentRunContract, TeamMemberContract

    members = [
        TeamMemberContract(member_key="m1", agent_id="a1", role="executor"),
        TeamMemberContract(member_key="m2", agent_id="a2", role="executor"),
    ]
    team_config = _mock_team_config(
        coordination_mode="supervisor",
        policy_json={"failure_mode": "continue_with_warning"},
    )

    def track_with_failure(team_run, team_config, member, member_input_json, worker_key, dry_run):
        """Return a failed run for ``m1`` and a completed run for anyone else."""
        if member.member_key == "m1":
            return TeamMemberRunResult(
                member=member,
                run=AgentRunContract(
                    id="run_fail",
                    agent_id="a1",
                    agent_config_id="c1",
                    status="failed",
                    error_code="test_error",
                    error_message="boom",
                    created_time=datetime.utcnow(),
                ),
            )
        return make_member_result(member, f"output_{member.member_key}")

    # patch.object hands back the replacement mock, which already counts its
    # calls, so no hand-rolled counter is needed.
    with patch.object(
        service, "_execute_single_member", side_effect=track_with_failure
    ) as execute_single_member:
        results = service._execute_members(
            team_run=_mock_team_run(),
            team_config=team_config,
            members=members,
            worker_key=None,
            dry_run=False,
        )

    # Both members executed despite the first one failing
    assert execute_single_member.call_count == 2
    assert len(results) == 2
def test_read_max_rounds_and_failure_mode_helpers() -> None:
    """Check the policy readers for default, explicit and out-of-range values.

    An empty policy must yield 3 rounds and ``"stop_on_critical"``, explicit
    values must be returned unchanged, and a ``max_rounds`` of 50 must be
    reported as 20.
    """
    service, _, _, _ = _build_service_with_mock_agent()

    # Empty policy -> defaults.
    empty_policy_config = _mock_team_config(policy_json={})
    assert service._read_max_rounds(empty_policy_config) == 3
    assert service._read_failure_mode(empty_policy_config) == "stop_on_critical"

    # Explicit values are passed through.
    explicit_policy = {"max_rounds": 5, "failure_mode": "continue_with_warning"}
    explicit_config = _mock_team_config(policy_json=explicit_policy)
    assert service._read_max_rounds(explicit_config) == 5
    assert service._read_failure_mode(explicit_config) == "continue_with_warning"

    # An oversized round count comes back as 20.
    oversized_config = _mock_team_config(policy_json={"max_rounds": 50})
    assert service._read_max_rounds(oversized_config) == 20
|