from pathlib import Path from datetime import datetime from unittest.mock import MagicMock from tests.conftest import ( build_fastapi_test_client, build_postgres_database_url, build_postgres_engine, prepare_known_service_import, ) def test_team_service_post_contract_supports_team_configs_and_runs( tmp_path: Path, monkeypatch, ) -> None: prepare_known_service_import("team-service") from app.bootstrap.app import create_app from app.db.models import Base from core_db import create_session_factory database_url = build_postgres_database_url(tmp_path, "teams-api") monkeypatch.setenv("AGENT_PLATFORM_DATABASE_URL", database_url) monkeypatch.setenv("AGENT_PLATFORM_REDIS_URL", "") monkeypatch.setenv("AGENT_PLATFORM_AUTO_WORKER_ENABLED", "false") engine = build_postgres_engine(database_url) Base.metadata.create_all(engine) app = create_app() app.state.session_factory = create_session_factory(engine) client = build_fastapi_test_client(app) team_response = client.post( "/teams/create", json={ "name": "Support Team", "description": "Handles support escalations", "teamType": "collaborative", "ownerUserId": "demo-user", }, ) assert team_response.status_code == 200 team_payload = team_response.json()["data"] assert team_payload["name"] == "Support Team" assert team_payload["teamType"] == "collaborative" assert "code" not in team_payload config_response = client.post( "/teams/configs/create", json={ "teamId": team_payload["id"], "coordinationMode": "supervisor", "objective": "Resolve the customer request", "memberRefs": [ { "role": "worker", "agentId": "agent_support", "responsibility": "Draft the answer", } ], "policy": { "max_rounds": 3, "handoff": "supervisor", }, }, ) assert config_response.status_code == 200 config_payload = config_response.json()["data"] assert config_payload["teamId"] == team_payload["id"] assert config_payload["memberRefs"][0]["role"] == "specialist" assert config_payload["memberRefs"][0]["member_key"] == "member_1" list_response = client.post( "/teams/list", json={"page": 1, "pageSize": 20, "keyword": "support"}, ) assert list_response.status_code == 200 assert list_response.json()["data"]["total"] == 1 configs_response = client.post( "/teams/configs/list", json={"page": 1, "pageSize": 20, "teamId": team_payload["id"]}, ) assert configs_response.status_code == 200 assert configs_response.json()["data"]["total"] == 1 run_response = client.post( "/teams/runs/create", json={ "teamId": team_payload["id"], "teamConfigId": config_payload["id"], "inputText": "Help the customer reset MFA", }, ) assert run_response.status_code == 200 run_payload = run_response.json()["data"] assert run_payload["teamId"] == team_payload["id"] assert run_payload["teamConfigId"] == config_payload["id"] assert run_payload["status"] == "queued" status_response = client.post( "/teams/runs/status", json={ "teamRunId": run_payload["id"], "status": "completed", "workerKey": "test-worker", "outputText": "MFA reset steps prepared.", }, ) assert status_response.status_code == 200 assert status_response.json()["data"]["status"] == "completed" runs_response = client.post( "/teams/runs/list", json={"page": 1, "pageSize": 20, "teamId": team_payload["id"]}, ) assert runs_response.status_code == 200 assert runs_response.json()["data"]["total"] == 1 update_response = client.post( "/teams/update", json={ "teamId": team_payload["id"], "name": "Support Team Updated", "status": "active", }, ) assert update_response.status_code == 200 assert update_response.json()["data"]["name"] == "Support Team Updated" assert update_response.json()["data"]["status"] == "active" delete_run_response = client.post( "/teams/runs/delete", json={"teamRunId": run_payload["id"]}, ) assert delete_run_response.status_code == 200 assert delete_run_response.json()["data"]["deleted"] is True delete_config_response = client.post( "/teams/configs/delete", json={"configId": config_payload["id"]}, ) assert delete_config_response.status_code == 200 assert delete_config_response.json()["data"]["deleted"] is True delete_team_response = client.post( "/teams/delete", json={"teamId": team_payload["id"]}, ) assert delete_team_response.status_code == 200 assert delete_team_response.json()["data"]["deleted"] is True def test_team_service_compacts_member_context_between_agent_calls() -> None: prepare_known_service_import("team-service") from app.application.services import TeamApplicationService, TeamMemberRunResult from core_domain import AgentRunContract, TeamMemberContract service = TeamApplicationService( team_repository=None, team_config_repository=None, team_run_repository=None) result = TeamMemberRunResult( member=TeamMemberContract( member_key="member_1", agent_id="agent_1", role="supervisor", name="Planner"), run=AgentRunContract( id="run_1", agent_id="agent_1", agent_config_id="config_1", output_text="x" * 2000, output_json={ "model": "demo-model", "finish_reason": "stop", "messages": [{"role": "user", "content": "large prompt"}], "raw_response_json": {"thinking": "hidden debug payload"}, }, status="completed", created_time=datetime.utcnow())) prior_output = service._compact_prior_output(result) member_json = service._member_result_to_json(result) assert prior_output["output_text"].endswith("[truncated]") assert prior_output["output_json"] == { "model": "demo-model", "finish_reason": "stop", "debug_payload_omitted": True, } assert "messages" not in member_json["output_json"] assert "raw_response_json" not in member_json["output_json"] def _build_service_with_mock_agent() -> tuple: prepare_known_service_import("team-service") from app.application.services import TeamApplicationService, TeamMemberRunResult from core_domain import AgentRunContract, TeamMemberContract call_log: list[str] = [] def make_member_result(member: TeamMemberContract, text: str) -> TeamMemberRunResult: return TeamMemberRunResult( member=member, run=AgentRunContract( id=f"run_{member.member_key}", agent_id=member.agent_id, agent_config_id=member.agent_config_id or "default_config", output_text=text, output_json={}, status="completed", created_time=datetime.utcnow())) mock_client = MagicMock() mock_client.create_agent_run = MagicMock( side_effect=lambda **kw: AgentRunContract( id="run_mock", agent_id=kw.get("agent_id", "a"), status="created", created_time=datetime.utcnow())) mock_client.execute_agent_run = MagicMock( side_effect=lambda **kw: AgentRunContract( id=kw.get("agent_run_id", "run_mock"), agent_id="a", output_text="ok", output_json={}, status="completed", created_time=datetime.utcnow())) def track_execute(team_run, team_config, member, member_input_json, worker_key, dry_run): prior = member_input_json.get("prior_member_outputs", []) call_log.append(f"{member.member_key}:{member.role}:prior={len(prior)}") return make_member_result(member, f"output_{member.member_key}") service = TeamApplicationService( team_repository=None, team_config_repository=None, team_run_repository=None, agent_client=mock_client) return service, call_log, track_execute, make_member_result def _mock_team_run(**overrides): run = MagicMock() run.id = "run_test" run.team_id = "team_test" run.team_config_id = "config_test" run.session_id = None run.input_text = "test input" run.input_json = None for k, v in overrides.items(): setattr(run, k, v) return run def _mock_team_config(**overrides): config = MagicMock() config.id = "config_test" config.team_id = "team_test" config.coordination_mode = "supervisor" config.objective = "test objective" config.policy_json = {} for k, v in overrides.items(): setattr(config, k, v) return config def test_supervisor_mode_executes_lead_first_then_others() -> None: service, call_log, track_execute, _ = _build_service_with_mock_agent() from unittest.mock import patch from core_domain import TeamMemberContract members = [ TeamMemberContract(member_key="worker_1", agent_id="a1", role="executor"), TeamMemberContract(member_key="lead_1", agent_id="a2", role="supervisor"), TeamMemberContract(member_key="worker_2", agent_id="a3", role="reviewer"), ] team_config = _mock_team_config( coordination_mode="supervisor", policy_json={"supervisor_synthesis": True}) with patch.object(service, "_execute_single_member", side_effect=track_execute): results = service._execute_members( team_run=_mock_team_run(), team_config=team_config, members=members, worker_key=None, dry_run=False) # lead runs first, then workers, then synthesis = 4 executions assert len(results) == 4 keys = [r.member.member_key for r in results] assert keys[0] == "lead_1" # supervisor first assert "worker_1" in keys[1:3] assert "worker_2" in keys[1:3] assert keys[3] == "lead_1" # synthesis pass def test_pipeline_mode_chains_single_prior_output() -> None: service, call_log, track_execute, _ = _build_service_with_mock_agent() from unittest.mock import patch from core_domain import TeamMemberContract members = [ TeamMemberContract(member_key="m1", agent_id="a1", role="planner"), TeamMemberContract(member_key="m2", agent_id="a2", role="executor"), TeamMemberContract(member_key="m3", agent_id="a3", role="reviewer"), ] team_config = _mock_team_config( coordination_mode="pipeline", policy_json={}) with patch.object(service, "_execute_single_member", side_effect=track_execute): results = service._execute_members( team_run=_mock_team_run(), team_config=team_config, members=members, worker_key=None, dry_run=False) assert len(results) == 3 # m1: no prior, m2: 1 prior, m3: 1 prior (only previous, not all) assert call_log[0] == "m1:planner:prior=0" assert call_log[1] == "m2:executor:prior=1" assert call_log[2] == "m3:reviewer:prior=1" def test_debate_mode_executes_multiple_rounds() -> None: service, call_log, track_execute, _ = _build_service_with_mock_agent() from unittest.mock import patch from core_domain import TeamMemberContract members = [ TeamMemberContract(member_key="m1", agent_id="a1", role="executor"), TeamMemberContract(member_key="m2", agent_id="a2", role="reviewer"), ] team_config = _mock_team_config( coordination_mode="debate", policy_json={"max_rounds": 3}) with patch.object(service, "_execute_single_member", side_effect=track_execute): results = service._execute_members( team_run=_mock_team_run(), team_config=team_config, members=members, worker_key=None, dry_run=False) # 2 members x 3 rounds = 6 executions, final_results = last round assert len(results) == 2 assert len(call_log) == 6 # Round 1: prior=0 for first, prior=1 for second assert call_log[0] == "m1:executor:prior=0" assert call_log[1] == "m2:reviewer:prior=1" # Round 2: prior=2 (history from round 1) assert call_log[2] == "m1:executor:prior=2" assert call_log[3] == "m2:reviewer:prior=3" # Round 3: prior=4 assert call_log[4] == "m1:executor:prior=4" def test_failure_mode_continue_allows_partial_failure() -> None: service, call_log, _, make_member_result = _build_service_with_mock_agent() from unittest.mock import patch from core_domain import TeamMemberContract, AgentRunContract members = [ TeamMemberContract(member_key="m1", agent_id="a1", role="executor"), TeamMemberContract(member_key="m2", agent_id="a2", role="executor"), ] team_config = _mock_team_config( coordination_mode="supervisor", policy_json={"failure_mode": "continue_with_warning"}) call_count = 0 def track_with_failure(team_run, team_config, member, member_input_json, worker_key, dry_run): nonlocal call_count call_count += 1 if member.member_key == "m1": from app.application.services import TeamMemberRunResult return TeamMemberRunResult( member=member, run=AgentRunContract( id="run_fail", agent_id="a1", agent_config_id="c1", status="failed", error_code="test_error", error_message="boom", created_time=datetime.utcnow())) return make_member_result(member, f"output_{member.member_key}") with patch.object(service, "_execute_single_member", side_effect=track_with_failure): results = service._execute_members( team_run=_mock_team_run(), team_config=team_config, members=members, worker_key=None, dry_run=False) # Both members executed despite first failing assert call_count == 2 assert len(results) == 2 def test_read_max_rounds_and_failure_mode_helpers() -> None: service, _, _, _ = _build_service_with_mock_agent() config_default = _mock_team_config(policy_json={}) assert service._read_max_rounds(config_default) == 3 assert service._read_failure_mode(config_default) == "stop_on_critical" config_custom = _mock_team_config(policy_json={ "max_rounds": 5, "failure_mode": "continue_with_warning"}) assert service._read_max_rounds(config_custom) == 5 assert service._read_failure_mode(config_custom) == "continue_with_warning" config_clamped = _mock_team_config(policy_json={"max_rounds": 50}) assert service._read_max_rounds(config_clamped) == 20