from datetime import datetime, timedelta from pathlib import Path from tests.conftest import ( build_fastapi_test_client, build_postgres_database_url, build_postgres_engine, prepare_known_service_import, ) def test_agent_service_post_contract_aliases( tmp_path: Path, monkeypatch, ) -> None: prepare_known_service_import("agent-service") from app.bootstrap.app import create_app from app.db.models import Base from core_db import create_session_factory database_url = build_postgres_database_url(tmp_path, "agent-contracts") monkeypatch.setenv("AGENT_PLATFORM_DATABASE_URL", database_url) monkeypatch.setenv("AGENT_PLATFORM_REDIS_URL", "") engine = build_postgres_engine(database_url) Base.metadata.create_all(engine) app = create_app() app.state.session_factory = create_session_factory(engine) client = build_fastapi_test_client(app) create_response = client.post( "/agents", json={"name": "Contract Agent", "agent_type": "assistant"}, ) assert create_response.status_code == 200 agent = create_response.json() list_response = client.post("/agents/list", json={}) assert list_response.status_code == 200 assert any(item["id"] == agent["id"] for item in list_response.json()) detail_response = client.post("/agents/detail", json={"agent_id": agent["id"]}) assert detail_response.status_code == 200 assert detail_response.json()["name"] == "Contract Agent" config_response = client.post( "/agents/configs/create", json={ "agent_id": agent["id"], "role": "assistant", "system_prompt": "You are a contract test agent.", "model_config": {"model": "test-model"}, "memory_policy": {"enabled": False}, "tool_refs": [], "skill_refs": [], }, ) assert config_response.status_code == 200 config = config_response.json() run_response = client.post( "/agents/runs", json={ "agent_id": agent["id"], "agent_config_id": config["id"], "input_text": "hello", }, ) assert run_response.status_code == 200 run = run_response.json() runs_response = client.post("/agents/runs/list", json={"agent_id": agent["id"]}) assert runs_response.status_code == 200 assert runs_response.json()[0]["id"] == run["id"] execute_response = client.post( "/agents/runs/execute", json={"agent_run_id": run["id"], "worker_key": "contract-worker", "dry_run": True}, ) assert execute_response.status_code == 200 assert execute_response.json()["dry_run"] is True assert execute_response.json()["run"]["status"] == "completed" delete_response = client.post("/agents/delete", json={"agent_id": agent["id"]}) assert delete_response.status_code == 200 assert delete_response.json() == {"deleted": True, "agent_id": agent["id"], "agent_run_id": None} def test_session_service_post_contract_aliases( tmp_path: Path, monkeypatch, ) -> None: prepare_known_service_import("session-service") from app.bootstrap.app import create_app from app.db.models import Base from core_db import create_session_factory database_url = build_postgres_database_url(tmp_path, "session-contracts") monkeypatch.setenv("AGENT_PLATFORM_DATABASE_URL", database_url) engine = build_postgres_engine(database_url) Base.metadata.create_all(engine) app = create_app() app.state.session_factory = create_session_factory(engine) client = build_fastapi_test_client(app) create_response = client.post( "/sessions", json={"app_id": "app_contract", "user_id": "user_contract", "title": "Contract Chat"}, ) assert create_response.status_code == 200 session = create_response.json() list_response = client.post("/sessions/list", json={"app_id": "app_contract"}) assert list_response.status_code == 200 assert list_response.json()[0]["id"] == session["id"] message_response = client.post( "/sessions/messages", json={"session_id": session["id"], "role": "user", "content_text": "hello"}, ) assert message_response.status_code == 200 messages_response = client.post("/sessions/messages/list", json={"session_id": session["id"]}) assert messages_response.status_code == 200 assert messages_response.json()[0]["content_text"] == "hello" request_response = client.post( "/sessions/run-requests", json={ "session_id": session["id"], "app_config_id": "appc_contract", "workflow_config_id": "wfc_contract", }, ) assert request_response.status_code == 200 requests_response = client.post( "/sessions/run-requests/list", json={"session_id": session["id"]}, ) assert requests_response.status_code == 200 assert requests_response.json()[0]["session_id"] == session["id"] def test_gateway_api_key_post_contract_aliases( tmp_path: Path, monkeypatch, ) -> None: prepare_known_service_import("api-gateway") from app.bootstrap.app import create_app from app.db.models import Base from core_db import create_session_factory database_url = build_postgres_database_url(tmp_path, "gateway-contracts") monkeypatch.setenv("AGENT_PLATFORM_DATABASE_URL", database_url) monkeypatch.setenv("AGENT_PLATFORM_REDIS_URL", "") engine = build_postgres_engine(database_url) Base.metadata.create_all(engine) app = create_app() app.state.session_factory = create_session_factory(engine) client = build_fastapi_test_client(app) create_response = client.post("/gateway/api-keys", json={"name": "Contract Key"}) assert create_response.status_code == 200 api_key = create_response.json() assert api_key["api_key"] list_response = client.post("/gateway/api-keys/list", json={}) assert list_response.status_code == 200 assert list_response.json()[0]["id"] == api_key["id"] status_response = client.post( "/gateway/api-keys/status", json={"api_key_id": api_key["id"], "status": "revoked"}, ) assert status_response.status_code == 200 assert status_response.json()["status"] == "revoked" def test_human_event_scheduler_post_contract_aliases( tmp_path: Path, monkeypatch, ) -> None: prepare_known_service_import("human-service") from app.bootstrap.app import create_app as create_human_app from app.db.models import Base as HumanBase from core_db import create_session_factory human_database_url = build_postgres_database_url(tmp_path, "human-contracts") monkeypatch.setenv("AGENT_PLATFORM_DATABASE_URL", human_database_url) human_engine = build_postgres_engine(human_database_url) HumanBase.metadata.create_all(human_engine) human_app = create_human_app() human_app.state.session_factory = create_session_factory(human_engine) human_client = build_fastapi_test_client(human_app) task_response = human_client.post( "/human/tasks", json={ "task_type": "approval", "title": "Approve deployment", "assigned_to": "operator", "run_id": "run_contract", }, ) assert task_response.status_code == 200 task = task_response.json() assert human_client.post("/human/tasks/list", json={"assigned_to": "operator"}).json()[0]["id"] == task["id"] assert human_client.post("/human/tasks/detail", json={"human_task_id": task["id"]}).json()["id"] == task["id"] claim_response = human_client.post( "/human/tasks/claim", json={"human_task_id": task["id"], "claimed_by": "operator"}, ) assert claim_response.status_code == 200 assert claim_response.json()["status"] == "claimed" complete_response = human_client.post( "/human/tasks/complete", json={ "human_task_id": task["id"], "status": "completed", "response_payload_json": {"approved": True}, }, ) assert complete_response.status_code == 200 assert complete_response.json()["response_payload_json"] == {"approved": True} prepare_known_service_import("event-service") from app.bootstrap.app import create_app as create_event_app from app.db.models import Base as EventBase event_database_url = build_postgres_database_url(tmp_path, "event-contracts") monkeypatch.setenv("AGENT_PLATFORM_DATABASE_URL", event_database_url) event_engine = build_postgres_engine(event_database_url) EventBase.metadata.create_all(event_engine) event_app = create_event_app() event_app.state.session_factory = create_session_factory(event_engine) event_client = build_fastapi_test_client(event_app) publish_response = event_client.post( "/events", json={ "event_type": "contract.created", "source_service": "test", "aggregate_type": "contract", "aggregate_id": "contract_1", }, ) assert publish_response.status_code == 200 event = publish_response.json() assert event_client.post("/events/list", json={"event_type": "contract.created"}).json()[0]["id"] == event["id"] delivery_response = event_client.post( "/events/delivery-status", json={"event_record_id": event["id"], "status": "published"}, ) assert delivery_response.status_code == 200 assert delivery_response.json()["status"] == "published" assert event_client.post("/events/stats", json={}).status_code == 200 prepare_known_service_import("scheduler-service") from app.bootstrap.app import create_app as create_scheduler_app from app.db.models import Base as SchedulerBase scheduler_database_url = build_postgres_database_url(tmp_path, "scheduler-contracts") monkeypatch.setenv("AGENT_PLATFORM_DATABASE_URL", scheduler_database_url) monkeypatch.setenv("AGENT_PLATFORM_REDIS_URL", "") scheduler_engine = build_postgres_engine(scheduler_database_url) SchedulerBase.metadata.create_all(scheduler_engine) scheduler_app = create_scheduler_app() scheduler_app.state.session_factory = create_session_factory(scheduler_engine) scheduler_client = build_fastapi_test_client(scheduler_app) job_response = scheduler_client.post( "/scheduler/jobs", json={ "job_type": "agent", "name": "Run agent", "schedule_time": (datetime.utcnow() + timedelta(minutes=5)).isoformat(), }, ) assert job_response.status_code == 200 job = job_response.json() assert scheduler_client.post("/scheduler/jobs/list", json={"job_type": "agent"}).json()[0]["id"] == job["id"] status_response = scheduler_client.post( "/scheduler/jobs/status", json={"job_id": job["id"], "status": "cancelled"}, ) assert status_response.status_code == 200 assert status_response.json()["status"] == "cancelled"