agent_client.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. import json
  2. from collections.abc import Iterator
  3. import httpx
  4. from core_domain import AgentRunContract
  5. from core_shared import JSONValue
  6. class AgentServiceClientError(Exception):
  7. pass
  8. class AgentServiceClient:
  9. def __init__(self, base_url: str, timeout_seconds: float = 30.0) -> None:
  10. self.base_url = base_url.rstrip("/")
  11. self.timeout_seconds = timeout_seconds
  12. def create_agent_run(
  13. self,
  14. *,
  15. agent_id: str,
  16. agent_config_id: str | None,
  17. session_id: str | None,
  18. input_text: str | None,
  19. input_json: dict[str, JSONValue] | None) -> AgentRunContract:
  20. payload: dict[str, JSONValue] = {
  21. "agent_id": agent_id,
  22. }
  23. if agent_config_id is not None:
  24. payload["agent_config_id"] = agent_config_id
  25. if session_id is not None:
  26. payload["session_id"] = session_id
  27. if input_text is not None:
  28. payload["input_text"] = input_text
  29. if input_json is not None:
  30. payload["input_json"] = input_json
  31. try:
  32. with httpx.Client(timeout=self.timeout_seconds) as client:
  33. response = client.post(f"{self.base_url}/agents/runs", json=payload)
  34. response.raise_for_status()
  35. return AgentRunContract.model_validate(response.json())
  36. except httpx.HTTPStatusError as exc:
  37. detail = exc.response.text[:500]
  38. raise AgentServiceClientError(
  39. f"agent-service create run failed: {exc.response.status_code} {detail}") from exc
  40. except httpx.HTTPError as exc:
  41. raise AgentServiceClientError(f"agent-service create run failed: {exc}") from exc
  42. def get_agent_run(
  43. self,
  44. *,
  45. agent_run_id: str) -> AgentRunContract:
  46. try:
  47. with httpx.Client(timeout=self.timeout_seconds) as client:
  48. response = client.post(
  49. f"{self.base_url}/agents/runs/detail",
  50. json={"agent_run_id": agent_run_id})
  51. response.raise_for_status()
  52. return AgentRunContract.model_validate(response.json())
  53. except httpx.HTTPStatusError as exc:
  54. detail = exc.response.text[:500]
  55. raise AgentServiceClientError(
  56. f"agent-service get run failed: {exc.response.status_code} {detail}") from exc
  57. except httpx.HTTPError as exc:
  58. raise AgentServiceClientError(f"agent-service get run failed: {exc}") from exc
  59. def execute_agent_run(
  60. self,
  61. *,
  62. agent_run_id: str,
  63. worker_key: str | None,
  64. dry_run: bool) -> AgentRunContract:
  65. payload: dict[str, JSONValue] = {
  66. "dry_run": dry_run,
  67. }
  68. if worker_key is not None:
  69. payload["worker_key"] = worker_key
  70. try:
  71. with httpx.Client(timeout=self.timeout_seconds) as client:
  72. response = client.post(
  73. f"{self.base_url}/agents/runs/{agent_run_id}/execute",
  74. json=payload)
  75. response.raise_for_status()
  76. response_payload = response.json()
  77. run_payload = response_payload.get("run")
  78. if not isinstance(run_payload, dict):
  79. raise AgentServiceClientError("agent-service execute response missing run")
  80. return AgentRunContract.model_validate(run_payload)
  81. except httpx.TimeoutException as exc:
  82. try:
  83. recovered_run = self.get_agent_run(agent_run_id=agent_run_id)
  84. except AgentServiceClientError as recovery_exc:
  85. raise AgentServiceClientError(
  86. f"agent-service execute run timed out and recovery failed: {recovery_exc}") from exc
  87. if recovered_run.status in {"completed", "failed", "cancelled"}:
  88. return recovered_run
  89. raise AgentServiceClientError(
  90. f"agent-service execute run timed out; recovered status={recovered_run.status}") from exc
  91. except httpx.HTTPStatusError as exc:
  92. detail = exc.response.text[:500]
  93. raise AgentServiceClientError(
  94. f"agent-service execute run failed: {exc.response.status_code} {detail}") from exc
  95. except httpx.HTTPError as exc:
  96. raise AgentServiceClientError(f"agent-service execute run failed: {exc}") from exc
  97. def execute_agent_run_stream(
  98. self,
  99. *,
  100. agent_run_id: str,
  101. worker_key: str | None,
  102. dry_run: bool) -> Iterator[tuple[str, dict[str, object]]]:
  103. payload: dict[str, JSONValue] = {
  104. "dry_run": dry_run,
  105. }
  106. if worker_key is not None:
  107. payload["worker_key"] = worker_key
  108. try:
  109. with httpx.Client(timeout=self.timeout_seconds) as client:
  110. with client.stream(
  111. "POST",
  112. f"{self.base_url}/agents/runs/{agent_run_id}/execute-stream",
  113. json=payload) as response:
  114. response.raise_for_status()
  115. yield from _iter_sse_events(response)
  116. except httpx.HTTPStatusError as exc:
  117. detail = exc.response.text[:500]
  118. raise AgentServiceClientError(
  119. f"agent-service execute stream failed: {exc.response.status_code} {detail}") from exc
  120. except httpx.HTTPError as exc:
  121. raise AgentServiceClientError(f"agent-service execute stream failed: {exc}") from exc
  122. def _iter_sse_events(response: httpx.Response) -> Iterator[tuple[str, dict[str, object]]]:
  123. event_name = "message"
  124. data_lines: list[str] = []
  125. for line in response.iter_lines():
  126. if line == "":
  127. if data_lines:
  128. yield event_name, _parse_json("\n".join(data_lines))
  129. event_name = "message"
  130. data_lines = []
  131. continue
  132. if line.startswith("event:"):
  133. event_name = line.removeprefix("event:").strip()
  134. elif line.startswith("data:"):
  135. data_lines.append(line.removeprefix("data:").strip())
  136. if data_lines:
  137. yield event_name, _parse_json("\n".join(data_lines))
  138. def _parse_json(value: str) -> dict[str, object]:
  139. try:
  140. payload = json.loads(value)
  141. except json.JSONDecodeError:
  142. return {}
  143. return payload if isinstance(payload, dict) else {}