from string import Template from core_shared import JSONValue from app.db.models import SkillDefinition, SkillInstallation, SkillRun, SkillVersion from app.domain.repositories import ( SkillDefinitionRepository, SkillInstallationRepository, SkillRunRepository, SkillVersionRepository) from app.schemas.skill import ( SkillCreateRequest, SkillInstallRequest, SkillInstallationStatusUpdateRequest, SkillRunCreateRequest, SkillRunExecuteRequest, SkillStatusUpdateRequest, SkillVersionCreateRequest) class SkillApplicationService: def __init__( self, *, skill_repository: SkillDefinitionRepository, skill_version_repository: SkillVersionRepository, installation_repository: SkillInstallationRepository, skill_run_repository: SkillRunRepository) -> None: self.skill_repository = skill_repository self.skill_version_repository = skill_version_repository self.installation_repository = installation_repository self.skill_run_repository = skill_run_repository def create_skill(self, payload: SkillCreateRequest) -> SkillDefinition: return self.skill_repository.create( code=payload.code, name=payload.name, skill_type=payload.skill_type, description=payload.description, owner_user_id=payload.owner_user_id, metadata_json=payload.metadata_json) def list_skills(self) -> list[SkillDefinition]: return self.skill_repository.list_all() def update_skill_status( self, *, skill_id: str, payload: SkillStatusUpdateRequest) -> SkillDefinition | None: return self.skill_repository.update_status( skill_id=skill_id, status=payload.status) def create_skill_version(self, payload: SkillVersionCreateRequest) -> SkillVersion: skill = self.skill_repository.get_by_id(skill_id=payload.skill_id) if skill is None: raise ValueError(f"skill not found: {payload.skill_id}") return self.skill_version_repository.create( skill_id=payload.skill_id, status=payload.status, runtime_type=payload.runtime_type, entrypoint=payload.entrypoint, parameter_schema_json=payload.parameter_schema_json, output_schema_json=payload.output_schema_json, implementation_json=payload.implementation_json) def list_skill_versions(self, *, skill_id: str) -> list[SkillVersion]: return self.skill_version_repository.list_by_skill(skill_id=skill_id) def install_skill(self, payload: SkillInstallRequest) -> SkillInstallation: version = self._resolve_skill_version( skill_id=payload.skill_id, skill_version_id=payload.skill_version_id) if version is None: raise ValueError("published skill version not found") return self.installation_repository.create( skill_id=payload.skill_id, skill_version_id=version.id, install_scope=payload.install_scope, scope_id=payload.scope_id, config_json=payload.config_json, installed_by=payload.installed_by) def list_installations( self, *, install_scope: str | None = None, scope_id: str | None = None) -> list[SkillInstallation]: return self.installation_repository.list_by_scope( install_scope=install_scope, scope_id=scope_id) def update_installation_status( self, *, installation_id: str, payload: SkillInstallationStatusUpdateRequest) -> SkillInstallation | None: return self.installation_repository.update_status( installation_id=installation_id, status=payload.status) def create_skill_run(self, payload: SkillRunCreateRequest) -> SkillRun: version = self._resolve_skill_version( skill_id=payload.skill_id, skill_version_id=payload.skill_version_id) if version is None: raise ValueError("published skill version not found") return self.skill_run_repository.create( skill_id=payload.skill_id, skill_version_id=version.id, installation_id=payload.installation_id, input_json=payload.input_json) def execute_skill_run( self, *, skill_run_id: str, payload: SkillRunExecuteRequest) -> SkillRun | None: run = self.skill_run_repository.get_by_id( skill_run_id=skill_run_id) if run is None: return None version = self.skill_version_repository.get_by_id( skill_version_id=run.skill_version_id) if version is None: return self.skill_run_repository.update_status( skill_run_id=run.id, status="failed", worker_key=payload.worker_key, error_code="skill_version_missing", error_message=f"skill version not found: {run.skill_version_id}") self.skill_run_repository.update_status( skill_run_id=run.id, status="running", worker_key=payload.worker_key) try: output_text, output_json = self._execute_version(version=version, input_json=run.input_json) except ValueError as exc: return self.skill_run_repository.update_status( skill_run_id=run.id, status="failed", worker_key=payload.worker_key, error_code="skill_execution_error", error_message=str(exc)) return self.skill_run_repository.update_status( skill_run_id=run.id, status="completed", worker_key=payload.worker_key, output_text=output_text, output_json=output_json) def _resolve_skill_version( self, *, skill_id: str, skill_version_id: str | None) -> SkillVersion | None: if skill_version_id is not None: return self.skill_version_repository.get_by_id( skill_version_id=skill_version_id) return self.skill_version_repository.get_latest_published( skill_id=skill_id) def _execute_version( self, *, version: SkillVersion, input_json: dict[str, JSONValue]) -> tuple[str | None, dict[str, JSONValue]]: if version.runtime_type != "template": raise ValueError(f"unsupported skill runtime_type: {version.runtime_type}") template_value = version.implementation_json.get("template") if not isinstance(template_value, str): raise ValueError("template skill requires implementation_json.template") substitutions = { key: str(value) for key, value in input_json.items() if isinstance(value, (str, int, float, bool)) } output_text = Template(template_value).safe_substitute(substitutions) return output_text, { "runtime_type": version.runtime_type, "entrypoint": version.entrypoint, "result": output_text, }