from __future__ import annotations import json import os import sys import uuid from dataclasses import dataclass import httpx WORKFLOW_SERVICE_URL = os.getenv( "AGENT_PLATFORM_SMOKE_WORKFLOW_URL", "http://127.0.0.1:8002/workflows") RUNTIME_SERVICE_URL = os.getenv( "AGENT_PLATFORM_SMOKE_RUNTIME_URL", "http://127.0.0.1:8003/runtime") SMOKE_API_KEY = os.getenv("AGENT_PLATFORM_SMOKE_API_KEY") @dataclass(frozen=True) class SmokeScenario: score: int expected_branch_node_id: str expected_output_text: str SCENARIOS = ( SmokeScenario( score=7, expected_branch_node_id="high_path", expected_output_text="Alice passed with score 7"), SmokeScenario( score=3, expected_branch_node_id="low_path", expected_output_text="Alice did not pass; score 3")) def main() -> int: unique_suffix = uuid.uuid4().hex[:8] headers = {} if SMOKE_API_KEY: headers["x-api-key"] = SMOKE_API_KEY with httpx.Client(timeout=20.0, headers=headers) as client: app_id = create_app(client, unique_suffix) workflow_id = create_workflow(client, app_id, unique_suffix) results: list[dict[str, object]] = [] for scenario in SCENARIOS: results.append(run_scenario(client, app_id, workflow_id, unique_suffix, scenario)) results.append(run_retriever_scenario(client, app_id, workflow_id, unique_suffix)) print(json.dumps(results, ensure_ascii=False, indent=2)) return 0 def create_app(client: httpx.Client, unique_suffix: str) -> str: response = client.post( f"{WORKFLOW_SERVICE_URL}/apps", json={ "code": f"smoke-app-{unique_suffix}", "name": f"Smoke App {unique_suffix}", }) response.raise_for_status() payload = response.json() return str(payload["id"]) def create_workflow(client: httpx.Client, app_id: str, unique_suffix: str) -> str: response = client.post( WORKFLOW_SERVICE_URL, json={ "app_id": app_id, "code": f"smoke-flow-{unique_suffix}", "name": f"Smoke Flow {unique_suffix}", }) response.raise_for_status() payload = response.json() return str(payload["id"]) def run_scenario( client: httpx.Client, app_id: str, workflow_id: str, unique_suffix: str, scenario: SmokeScenario) -> dict[str, object]: workflow_version_id = create_workflow_version(client, workflow_id, unique_suffix, scenario.score) app_version_id = create_app_version(client, app_id, workflow_version_id) run_id = create_run(client, app_id, app_version_id, workflow_id, workflow_version_id) execute_run(client, run_id) node_runs = list_node_runs(client, run_id) artifacts = list_node_artifacts(client, run_id) if len(artifacts) < 3: raise AssertionError(f"expected at least 3 artifacts, got {len(artifacts)}") trace_spans = list_trace_spans(client, run_id) if len(trace_spans) < 3: raise AssertionError(f"expected at least 3 trace spans, got {len(trace_spans)}") node_map = {str(item["node_id"]): item for item in node_runs} assert scenario.expected_branch_node_id in node_map, ( f"expected branch node not found: {scenario.expected_branch_node_id}" ) expected_node = node_map[scenario.expected_branch_node_id] actual_output_text = expected_node.get("output_text") if actual_output_text != scenario.expected_output_text: raise AssertionError( f"unexpected output_text for {scenario.expected_branch_node_id}: {actual_output_text!r}" ) other_branch_node_id = "low_path" if scenario.expected_branch_node_id == "high_path" else "high_path" if other_branch_node_id in node_map: raise AssertionError(f"unexpected branch node executed: {other_branch_node_id}") return { "score": scenario.score, "executed_node_ids": [str(item["node_id"]) for item in node_runs], "branch_output_text": actual_output_text, "artifact_count": len(artifacts), "trace_span_count": len(trace_spans), } def run_retriever_scenario( client: httpx.Client, app_id: str, workflow_id: str, unique_suffix: str) -> dict[str, object]: workflow_version_id = create_retriever_workflow_version(client, workflow_id, unique_suffix) app_version_id = create_app_version(client, app_id, workflow_version_id) run_id = create_run(client, app_id, app_version_id, workflow_id, workflow_version_id) execute_run(client, run_id) node_runs = list_node_runs(client, run_id) artifacts = list_node_artifacts(client, run_id) if len(artifacts) < 3: raise AssertionError(f"expected at least 3 retriever artifacts, got {len(artifacts)}") trace_spans = list_trace_spans(client, run_id) if len(trace_spans) < 3: raise AssertionError(f"expected at least 3 retriever trace spans, got {len(trace_spans)}") node_map = {str(item["node_id"]): item for item in node_runs} answer_node = node_map.get("render_answer") if answer_node is None: raise AssertionError("retriever answer node was not executed") answer_text = answer_node.get("output_text") expected_answer_text = "Top doc: Refund Policy" if answer_text != expected_answer_text: raise AssertionError(f"unexpected retriever answer text: {answer_text!r}") retrieve_node = node_map.get("retrieve_docs") if retrieve_node is None: raise AssertionError("retriever node was not executed") retrieve_output = retrieve_node.get("output_json") if not isinstance(retrieve_output, dict): raise AssertionError("retriever output_json must be an object") return { "scenario": "retriever", "executed_node_ids": [str(item["node_id"]) for item in node_runs], "answer_text": answer_text, "artifact_count": len(artifacts), "trace_span_count": len(trace_spans), } def create_workflow_version( client: httpx.Client, workflow_id: str, unique_suffix: str, score: int) -> str: response = client.post( f"{WORKFLOW_SERVICE_URL}/versions", json={ "workflow_id": workflow_id, "status": "active", "dsl_json": build_workflow_dsl(unique_suffix, score), }) response.raise_for_status() payload = response.json() return str(payload["id"]) def create_retriever_workflow_version( client: httpx.Client, workflow_id: str, unique_suffix: str) -> str: response = client.post( f"{WORKFLOW_SERVICE_URL}/versions", json={ "workflow_id": workflow_id, "status": "active", "dsl_json": build_retriever_workflow_dsl(unique_suffix), }) response.raise_for_status() payload = response.json() return str(payload["id"]) def create_app_version(client: httpx.Client, app_id: str, workflow_version_id: str) -> str: response = client.post( f"{WORKFLOW_SERVICE_URL}/apps/versions", json={ "app_id": app_id, "workflow_version_id": workflow_version_id, "status": "active", }) response.raise_for_status() payload = response.json() return str(payload["id"]) def create_run( client: httpx.Client, app_id: str, app_version_id: str, workflow_id: str, workflow_version_id: str) -> str: response = client.post( f"{RUNTIME_SERVICE_URL}/runs", json={ "app_id": app_id, "app_version_id": app_version_id, "workflow_id": workflow_id, "workflow_version_id": workflow_version_id, }) response.raise_for_status() payload = response.json() return str(payload["run"]["id"]) def execute_run(client: httpx.Client, run_id: str) -> None: response = client.post( f"{RUNTIME_SERVICE_URL}/runs/{run_id}/execute", json={"max_steps": 8}) response.raise_for_status() def list_node_runs(client: httpx.Client, run_id: str) -> list[dict[str, object]]: response = client.get( f"{RUNTIME_SERVICE_URL}/node-runs", params={"run_id": run_id}) response.raise_for_status() payload = response.json() if not isinstance(payload, list): raise AssertionError("node-runs response must be a list") return [item for item in payload if isinstance(item, dict)] def list_node_artifacts(client: httpx.Client, run_id: str) -> list[dict[str, object]]: response = client.get( f"{RUNTIME_SERVICE_URL}/node-artifacts", params={"run_id": run_id}) response.raise_for_status() payload = response.json() if not isinstance(payload, list): raise AssertionError("node-artifacts response must be a list") return [item for item in payload if isinstance(item, dict)] def list_trace_spans(client: httpx.Client, run_id: str) -> list[dict[str, object]]: response = client.get( f"{RUNTIME_SERVICE_URL}/trace-spans", params={"run_id": run_id}) response.raise_for_status() payload = response.json() if not isinstance(payload, list): raise AssertionError("trace-spans response must be a list") return [item for item in payload if isinstance(item, dict)] def build_workflow_dsl(unique_suffix: str, score: int) -> dict[str, object]: return { "code": f"smoke-flow-{unique_suffix}-{score}", "name": f"Smoke Flow {score}", "nodes": [ { "id": "seed_state", "type": "assigner", "config": { "assignments": { "score": score, "user_name": "Alice", }, }, }, { "id": "check_score", "type": "if-else", "config": { "expression": "state.score >= 5", }, }, { "id": "high_path", "type": "template-transform", "config": { "template": "{{state.user_name}} passed with score {{state.score}}", }, }, { "id": "low_path", "type": "template-transform", "config": { "template": "{{state.user_name}} did not pass; score {{state.score}}", }, }, ], "edges": [ {"source": "seed_state", "target": "check_score"}, {"source": "check_score", "target": "high_path", "condition": "true"}, {"source": "check_score", "target": "low_path", "condition": "false"}, ], } def build_retriever_workflow_dsl(unique_suffix: str) -> dict[str, object]: return { "code": f"smoke-retriever-{unique_suffix}", "name": "Smoke Retriever Flow", "nodes": [ { "id": "seed_query", "type": "assigner", "config": { "assignments": { "query": "refund policy", }, }, }, { "id": "retrieve_docs", "type": "knowledge-retrieval", "config": { "query_template": "{{state.query}}", "top_k": 1, "documents": [ { "id": "shipping", "title": "Shipping Policy", "text": "Shipping usually takes three to five business days.", }, { "id": "refund", "title": "Refund Policy", "text": "Refund policy allows returns within seven days after delivery.", }, ], }, }, { "id": "render_answer", "type": "template-transform", "config": { "template": "Top doc: {{nodes.retrieve_docs.output.retrieved_documents.0.title}}", }, }, ], "edges": [ {"source": "seed_query", "target": "retrieve_docs"}, {"source": "retrieve_docs", "target": "render_answer"}, ], } if __name__ == "__main__": try: raise SystemExit(main()) except Exception as exc: print(f"smoke test failed: {exc}", file=sys.stderr) raise