import httpx from core_domain import SkillDefinitionContract, SkillRunContract from core_shared import JSONValue class SkillServiceClientError(Exception): pass class SkillServiceClient: def __init__(self, base_url: str, timeout_seconds: float = 10.0) -> None: self.base_url = base_url.rstrip("/") self.timeout_seconds = timeout_seconds def list_skills(self) -> list[SkillDefinitionContract]: try: with httpx.Client(timeout=self.timeout_seconds) as client: response = client.get(f"{self.base_url}/skills") response.raise_for_status() return [ SkillDefinitionContract.model_validate(item) for item in response.json() ] except httpx.HTTPError as exc: raise SkillServiceClientError(f"skill-service list failed: {exc}") from exc def create_skill_run( self, *, skill_id: str, skill_version_id: str | None, installation_id: str | None, input_json: dict[str, JSONValue]) -> SkillRunContract: payload: dict[str, JSONValue] = { "skill_id": skill_id, "input_json": input_json, } if skill_version_id is not None: payload["skill_version_id"] = skill_version_id if installation_id is not None: payload["installation_id"] = installation_id try: with httpx.Client(timeout=self.timeout_seconds) as client: response = client.post(f"{self.base_url}/skills/runs", json=payload) response.raise_for_status() return SkillRunContract.model_validate(response.json()) except httpx.HTTPError as exc: raise SkillServiceClientError(f"skill-service create run failed: {exc}") from exc def execute_skill_run( self, *, skill_run_id: str, worker_key: str | None) -> SkillRunContract: payload: dict[str, JSONValue] = {} if worker_key is not None: payload["worker_key"] = worker_key try: with httpx.Client(timeout=self.timeout_seconds) as client: response = client.post( f"{self.base_url}/skills/runs/{skill_run_id}/execute", json=payload) response.raise_for_status() return SkillRunContract.model_validate(response.json()) except httpx.HTTPError as exc: raise SkillServiceClientError(f"skill-service execute run failed: {exc}") from exc