| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- import httpx
- from core_domain import SkillDefinitionContract, SkillRunContract
- from core_shared import JSONValue
class SkillServiceClientError(Exception):
    """Error raised by ``SkillServiceClient`` when a skill-service call fails.

    The client raises this with the underlying ``httpx.HTTPError`` attached
    as ``__cause__``, so callers only need to handle this one exception type
    for transport and HTTP-status failures.
    """
class SkillServiceClient:
    """Synchronous HTTP client for the skill-service endpoints.

    Each call opens a short-lived ``httpx.Client`` using the configured
    timeout, sends one request, and converts the JSON response into the
    matching contract object. Transport failures, non-success HTTP statuses
    and undecodable response bodies are all reported as
    ``SkillServiceClientError``.
    """

    def __init__(self, base_url: str, timeout_seconds: float = 10.0) -> None:
        """Store the connection settings.

        Args:
            base_url: Root URL of the skill service. Trailing slashes are
                stripped so endpoint paths can be appended directly.
            timeout_seconds: Timeout handed to every ``httpx.Client``.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout_seconds = timeout_seconds

    def _request_json(
        self,
        method: str,
        path: str,
        *,
        operation: str,
        params: dict[str, str] | None = None,
        json_body: dict[str, JSONValue] | None = None,
    ) -> object:
        """Send one request and return the decoded JSON response body.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            path: Endpoint path starting with ``/``; appended to ``base_url``.
            operation: Short label used in error messages
                (``"skill-service <operation> failed: ..."``).
            params: Optional query-string parameters.
            json_body: Optional JSON request payload.

        Returns:
            The decoded JSON body of the response.

        Raises:
            SkillServiceClientError: On any ``httpx.HTTPError`` (including
                non-success statuses via ``raise_for_status``) or when the
                response body is not valid JSON.
        """
        try:
            with httpx.Client(timeout=self.timeout_seconds) as client:
                response = client.request(
                    method,
                    f"{self.base_url}{path}",
                    params=params,
                    json=json_body,
                )
                response.raise_for_status()
        except httpx.HTTPError as exc:
            raise SkillServiceClientError(
                f"skill-service {operation} failed: {exc}"
            ) from exc

        # The body was fully read by the non-streaming request above, so it
        # can be decoded after the client is closed. Decoding errors are
        # ValueError subclasses, not httpx.HTTPError, and would otherwise
        # escape the client's error wrapper.
        try:
            return response.json()
        except ValueError as exc:
            raise SkillServiceClientError(
                f"skill-service {operation} failed: {exc}"
            ) from exc

    def list_skills(self, *, tenant_id: str) -> list[SkillDefinitionContract]:
        """Fetch the skill definitions for a tenant.

        Issues ``GET {base_url}/skills`` with ``tenant_id`` as a query
        parameter and validates every returned item as a
        ``SkillDefinitionContract``.

        Args:
            tenant_id: Tenant identifier sent as the ``tenant_id`` query
                parameter.

        Returns:
            One ``SkillDefinitionContract`` per item in the response array.

        Raises:
            SkillServiceClientError: If the request fails, the body is not
                valid JSON, or the body is not a JSON array.

        Any exception raised by ``SkillDefinitionContract.model_validate``
        propagates unchanged.
        """
        body = self._request_json(
            "GET",
            "/skills",
            operation="list",
            params={"tenant_id": tenant_id},
        )
        # Guard the shape before iterating: a dict would silently iterate its
        # keys and null/number bodies would raise a bare TypeError.
        if not isinstance(body, list):
            raise SkillServiceClientError(
                "skill-service list failed: expected a JSON array, "
                f"got {type(body).__name__}"
            )
        return [SkillDefinitionContract.model_validate(item) for item in body]

    def create_skill_run(
        self,
        *,
        tenant_id: str,
        skill_id: str,
        skill_version_id: str | None,
        installation_id: str | None,
        input_json: dict[str, JSONValue],
    ) -> SkillRunContract:
        """Create a skill run via ``POST {base_url}/skills/runs``.

        Args:
            tenant_id: Tenant identifier included in the payload.
            skill_id: Identifier of the skill to run.
            skill_version_id: Optional skill version; the key is omitted from
                the payload when ``None``.
            installation_id: Optional installation; the key is omitted from
                the payload when ``None``.
            input_json: Input document sent under the ``input_json`` key.

        Returns:
            The response body validated as a ``SkillRunContract``.

        Raises:
            SkillServiceClientError: If the request fails or the body is not
                valid JSON.

        Any exception raised by ``SkillRunContract.model_validate``
        propagates unchanged.
        """
        payload: dict[str, JSONValue] = {
            "tenant_id": tenant_id,
            "skill_id": skill_id,
            "input_json": input_json,
        }
        # Optional identifiers are left out entirely rather than sent as null.
        if skill_version_id is not None:
            payload["skill_version_id"] = skill_version_id
        if installation_id is not None:
            payload["installation_id"] = installation_id

        body = self._request_json(
            "POST",
            "/skills/runs",
            operation="create run",
            json_body=payload,
        )
        return SkillRunContract.model_validate(body)

    def execute_skill_run(
        self,
        *,
        tenant_id: str,
        skill_run_id: str,
        worker_key: str | None,
    ) -> SkillRunContract:
        """Execute a run via ``POST {base_url}/skills/runs/{id}/execute``.

        Args:
            tenant_id: Tenant identifier included in the payload.
            skill_run_id: Identifier of the run, interpolated into the URL
                path as-is.
            worker_key: Optional worker key; the key is omitted from the
                payload when ``None``.

        Returns:
            The response body validated as a ``SkillRunContract``.

        Raises:
            SkillServiceClientError: If the request fails or the body is not
                valid JSON.

        Any exception raised by ``SkillRunContract.model_validate``
        propagates unchanged.
        """
        payload: dict[str, JSONValue] = {"tenant_id": tenant_id}
        if worker_key is not None:
            payload["worker_key"] = worker_key

        body = self._request_json(
            "POST",
            f"/skills/runs/{skill_run_id}/execute",
            operation="execute run",
            json_body=payload,
        )
        return SkillRunContract.model_validate(body)
|