"""Synchronous HTTP client for the agent-service run endpoints."""

import json
from collections.abc import Iterator

import httpx

from core_domain import AgentRunContract
from core_shared import JSONValue

# Maximum number of response-body characters echoed into an error message.
_ERROR_DETAIL_LIMIT = 500


class AgentServiceClientError(Exception):
    """Raised when a request to the agent service fails or returns bad data."""


def _status_error(
    operation: str, exc: httpx.HTTPStatusError
) -> AgentServiceClientError:
    """Build the client error for an HTTP status failure of *operation*.

    The response body must already be loaded (always true for non-streamed
    requests; streamed requests have to call ``response.read()`` first).
    """
    detail = exc.response.text[:_ERROR_DETAIL_LIMIT]
    return AgentServiceClientError(
        f"agent-service {operation} failed: {exc.response.status_code} {detail}"
    )


class AgentServiceClient:
    """Thin wrapper around the agent-service HTTP API.

    Every call opens a short-lived ``httpx.Client`` and converts httpx
    failures into ``AgentServiceClientError``.
    """

    def __init__(self, base_url: str, timeout_seconds: float = 30.0) -> None:
        """Store the service base URL (trailing slashes removed) and timeout."""
        self.base_url = base_url.rstrip("/")
        self.timeout_seconds = timeout_seconds

    def create_agent_run(
        self,
        *,
        agent_id: str,
        agent_config_id: str | None,
        session_id: str | None,
        input_text: str | None,
        input_json: dict[str, JSONValue] | None,
    ) -> AgentRunContract:
        """POST a new run to ``/agents/runs`` and return the validated run.

        Optional arguments that are ``None`` are left out of the request body.

        Raises:
            AgentServiceClientError: on an HTTP error status or any other
                httpx request failure.
        """
        payload: dict[str, JSONValue] = {"agent_id": agent_id}
        if agent_config_id is not None:
            payload["agent_config_id"] = agent_config_id
        if session_id is not None:
            payload["session_id"] = session_id
        if input_text is not None:
            payload["input_text"] = input_text
        if input_json is not None:
            payload["input_json"] = input_json

        try:
            with httpx.Client(timeout=self.timeout_seconds) as client:
                response = client.post(f"{self.base_url}/agents/runs", json=payload)
                response.raise_for_status()
                return AgentRunContract.model_validate(response.json())
        except httpx.HTTPStatusError as exc:
            raise _status_error("create run", exc) from exc
        except httpx.HTTPError as exc:
            raise AgentServiceClientError(
                f"agent-service create run failed: {exc}"
            ) from exc

    def get_agent_run(self, *, agent_run_id: str) -> AgentRunContract:
        """Fetch a run by id via POST ``/agents/runs/detail``.

        Raises:
            AgentServiceClientError: on an HTTP error status or any other
                httpx request failure.
        """
        try:
            with httpx.Client(timeout=self.timeout_seconds) as client:
                response = client.post(
                    f"{self.base_url}/agents/runs/detail",
                    json={"agent_run_id": agent_run_id},
                )
                response.raise_for_status()
                return AgentRunContract.model_validate(response.json())
        except httpx.HTTPStatusError as exc:
            raise _status_error("get run", exc) from exc
        except httpx.HTTPError as exc:
            raise AgentServiceClientError(
                f"agent-service get run failed: {exc}"
            ) from exc

    def execute_agent_run(
        self,
        *,
        agent_run_id: str,
        worker_key: str | None,
        dry_run: bool,
    ) -> AgentRunContract:
        """Execute a run and return the run object from the response.

        If the request times out, the run is looked up once with
        ``get_agent_run``; it is returned only when its status is one of
        ``completed``, ``failed`` or ``cancelled``.

        Raises:
            AgentServiceClientError: on an HTTP error status, a transport
                failure, a response without a ``run`` object, or a timeout
                that could not be recovered.
        """
        payload: dict[str, JSONValue] = {"dry_run": dry_run}
        if worker_key is not None:
            payload["worker_key"] = worker_key

        try:
            with httpx.Client(timeout=self.timeout_seconds) as client:
                response = client.post(
                    f"{self.base_url}/agents/runs/{agent_run_id}/execute",
                    json=payload,
                )
                response.raise_for_status()
                response_payload = response.json()
                # A non-object JSON body has no "run" key; report it like a
                # missing run instead of failing with AttributeError.
                run_payload = (
                    response_payload.get("run")
                    if isinstance(response_payload, dict)
                    else None
                )
                if not isinstance(run_payload, dict):
                    raise AgentServiceClientError(
                        "agent-service execute response missing run"
                    )
                return AgentRunContract.model_validate(run_payload)
        except httpx.TimeoutException as exc:
            # The request timed out on our side; check whether the run has
            # nevertheless reached a final status on the service.
            try:
                recovered_run = self.get_agent_run(agent_run_id=agent_run_id)
            except AgentServiceClientError as recovery_exc:
                raise AgentServiceClientError(
                    "agent-service execute run timed out and recovery failed: "
                    f"{recovery_exc}"
                ) from exc
            if recovered_run.status in {"completed", "failed", "cancelled"}:
                return recovered_run
            raise AgentServiceClientError(
                "agent-service execute run timed out; "
                f"recovered status={recovered_run.status}"
            ) from exc
        except httpx.HTTPStatusError as exc:
            raise _status_error("execute run", exc) from exc
        except httpx.HTTPError as exc:
            raise AgentServiceClientError(
                f"agent-service execute run failed: {exc}"
            ) from exc

    def execute_agent_run_stream(
        self,
        *,
        agent_run_id: str,
        worker_key: str | None,
        dry_run: bool,
    ) -> Iterator[tuple[str, dict[str, object]]]:
        """Execute a run and yield ``(event_name, data)`` pairs from its SSE stream.

        This is a generator: the request is only sent once iteration starts.

        Raises:
            AgentServiceClientError: on an HTTP error status or any other
                httpx failure while opening or reading the stream.
        """
        payload: dict[str, JSONValue] = {"dry_run": dry_run}
        if worker_key is not None:
            payload["worker_key"] = worker_key

        try:
            with httpx.Client(timeout=self.timeout_seconds) as client:
                with client.stream(
                    "POST",
                    f"{self.base_url}/agents/runs/{agent_run_id}/execute-stream",
                    json=payload,
                ) as response:
                    try:
                        response.raise_for_status()
                    except httpx.HTTPStatusError:
                        # A streamed body is not loaded automatically. Read it
                        # while the stream is still open so the handler below
                        # can use ``exc.response.text``; otherwise httpx raises
                        # ResponseNotRead, which is not an httpx.HTTPError.
                        response.read()
                        raise
                    yield from _iter_sse_events(response)
        except httpx.HTTPStatusError as exc:
            raise _status_error("execute stream", exc) from exc
        except httpx.HTTPError as exc:
            raise AgentServiceClientError(
                f"agent-service execute stream failed: {exc}"
            ) from exc


def _iter_sse_events(
    response: httpx.Response,
) -> Iterator[tuple[str, dict[str, object]]]:
    """Parse a server-sent-events body into ``(event_name, data)`` pairs.

    Only ``event:`` and ``data:`` fields are interpreted; every other line is
    ignored. Multiple ``data:`` lines of one event are joined with newlines
    before JSON decoding. An event without an ``event:`` field is named
    ``"message"``. Data still buffered when the stream ends is yielded as a
    final event.
    """
    event_name = "message"
    data_lines: list[str] = []

    for line in response.iter_lines():
        if not line:
            # A blank line terminates the current event.
            if data_lines:
                yield event_name, _parse_json("\n".join(data_lines))
                data_lines = []
            # Reset the name even when nothing was dispatched, so an
            # ``event:`` field without data cannot leak into the next event.
            event_name = "message"
            continue
        if line.startswith("event:"):
            event_name = line.removeprefix("event:").strip()
        elif line.startswith("data:"):
            data_lines.append(line.removeprefix("data:").strip())

    if data_lines:
        yield event_name, _parse_json("\n".join(data_lines))


def _parse_json(value: str) -> dict[str, object]:
    """Decode *value* as a JSON object.

    Best-effort: returns an empty dict when *value* is not valid JSON or does
    not decode to a JSON object.
    """
    try:
        payload = json.loads(value)
    except json.JSONDecodeError:
        return {}
    return payload if isinstance(payload, dict) else {}