from string import Template

from core_shared import JSONValue

from app.db.models import SkillDefinition, SkillInstallation, SkillRun
from app.domain.repositories import (
    SkillDefinitionRepository,
    SkillInstallationRepository,
    SkillRunRepository,
)
from app.schemas.skill import (
    SkillCreateRequest,
    SkillCreateRequestDto,
    SkillDeleteRequestDto,
    SkillInstallRequest,
    SkillInstallRequestDto,
    SkillInstallationListRequestDto,
    SkillInstallationStatusRequestDto,
    SkillInstallationStatusUpdateRequest,
    SkillListRequestDto,
    SkillRunCreateRequest,
    SkillRunCreateRequestDto,
    SkillRunExecuteRequest,
    SkillRunExecuteRequestDto,
    SkillStatusRequestDto,
    SkillStatusUpdateRequest,
    SkillUpdateRequestDto,
)


class SkillApplicationService:
    """Application service for skill definitions, installations and runs.

    Every operation is delegated to one of three injected repositories.
    Methods suffixed ``_contract`` / ``_from_contract`` accept the camelCase
    DTOs and translate them into the snake_case request objects (or direct
    repository calls) used by the other methods.
    """

    # Upper bound applied to codes produced by ``_build_skill_code``.
    _SKILL_CODE_MAX_LENGTH = 64

    def __init__(
        self,
        *,
        skill_repository: SkillDefinitionRepository,
        installation_repository: SkillInstallationRepository,
        skill_run_repository: SkillRunRepository,
    ) -> None:
        """Store the repositories this service delegates to."""
        self.skill_repository = skill_repository
        self.installation_repository = installation_repository
        self.skill_run_repository = skill_run_repository

    # ------------------------------------------------------------------
    # Skill definitions
    # ------------------------------------------------------------------

    def create_skill(self, payload: SkillCreateRequest) -> SkillDefinition:
        """Create a skill definition from an internal create request.

        All fields of ``payload`` are forwarded unchanged to the skill
        repository's ``create`` method.
        """
        return self.skill_repository.create(
            code=payload.code,
            name=payload.name,
            skill_type=payload.skill_type,
            description=payload.description,
            owner_user_id=payload.owner_user_id,
            runtime_type=payload.runtime_type,
            entrypoint=payload.entrypoint,
            parameter_schema_json=payload.parameter_schema_json,
            output_schema_json=payload.output_schema_json,
            implementation_json=payload.implementation_json,
            metadata_json=payload.metadata_json,
        )

    def create_skill_from_contract(self, payload: SkillCreateRequestDto) -> SkillDefinition:
        """Create a skill definition from the contract DTO.

        The skill code is derived from ``payload.name``. ``category``,
        ``instruction`` and ``toolIds`` are stored inside the metadata
        (overriding same-named keys in ``payload.metadata``). When no explicit
        implementation is supplied, a ``{"template": ...}`` implementation is
        built from the instruction, falling back to the description and then
        the name.
        """
        metadata = {
            # Tolerate a missing metadata mapping instead of failing on `**None`.
            **(payload.metadata or {}),
            "category": payload.category,
            "instruction": payload.instruction,
            "toolIds": payload.toolIds,
        }
        default_implementation = {
            "template": payload.instruction or payload.description or payload.name,
        }
        return self.create_skill(
            SkillCreateRequest(
                code=self._build_skill_code(payload.name),
                name=payload.name,
                skill_type=payload.skillType,
                description=payload.description,
                runtime_type=payload.runtimeType,
                entrypoint=payload.entrypoint,
                parameter_schema_json=payload.parameterSchema,
                output_schema_json=payload.outputSchema,
                implementation_json=payload.implementation or default_implementation,
                owner_user_id=payload.ownerUserId,
                metadata_json=metadata,
            )
        )

    def list_skills(self) -> list[SkillDefinition]:
        """Return every skill definition known to the repository."""
        return self.skill_repository.list_all()

    def list_skills_contract(
        self, payload: SkillListRequestDto
    ) -> tuple[list[SkillDefinition], int]:
        """Return a filtered page of skills together with an integer count.

        The filters (keyword, status, skill type, category) and the paging
        window (``offset`` / ``pageSize``) are forwarded to the repository.
        NOTE(review): the integer is presumably the total number of matches
        before paging -- confirm against the repository implementation.
        """
        return self.skill_repository.list_filtered(
            keyword=payload.keyword,
            status=payload.status,
            skill_type=payload.skillType,
            category=payload.category,
            offset=payload.offset,
            limit=payload.pageSize,
        )

    def update_skill_status(
        self, *, skill_id: str, payload: SkillStatusUpdateRequest
    ) -> SkillDefinition | None:
        """Set the status of a skill and return the repository's result.

        NOTE(review): ``None`` presumably means the skill does not exist.
        """
        return self.skill_repository.update_status(
            skill_id=skill_id,
            status=payload.status,
        )

    def update_skill_status_contract(
        self, payload: SkillStatusRequestDto
    ) -> SkillDefinition | None:
        """Contract-DTO wrapper around :meth:`update_skill_status`."""
        return self.update_skill_status(
            skill_id=payload.skillId,
            payload=SkillStatusUpdateRequest(status=payload.status),
        )

    def update_skill(self, payload: SkillUpdateRequestDto) -> SkillDefinition | None:
        """Update a skill definition; return ``None`` if it does not exist.

        Metadata is merged rather than replaced: the stored metadata is
        copied, updated with ``payload.metadata`` and then with any of
        ``category`` / ``instruction`` / ``toolIds`` that are not ``None``.
        When no implementation is supplied but an instruction is, the
        implementation becomes ``{"template": instruction}``.
        """
        current = self.skill_repository.get_by_id(skill_id=payload.skillId)
        if current is None:
            return None

        # Copy so the merge never mutates the loaded entity's own mapping.
        metadata = dict(current.metadata_json or {})
        if payload.metadata is not None:
            metadata.update(payload.metadata)
        if payload.category is not None:
            metadata["category"] = payload.category
        if payload.instruction is not None:
            metadata["instruction"] = payload.instruction
        if payload.toolIds is not None:
            metadata["toolIds"] = payload.toolIds

        implementation_json = payload.implementation
        if implementation_json is None and payload.instruction is not None:
            implementation_json = {"template": payload.instruction}

        # NOTE(review): fields left as None are presumably treated by the
        # repository as "leave unchanged" -- confirm.
        return self.skill_repository.update(
            skill_id=payload.skillId,
            name=payload.name,
            skill_type=payload.skillType,
            description=payload.description,
            owner_user_id=payload.ownerUserId,
            runtime_type=payload.runtimeType,
            entrypoint=payload.entrypoint,
            parameter_schema_json=payload.parameterSchema,
            output_schema_json=payload.outputSchema,
            implementation_json=implementation_json,
            metadata_json=metadata,
        )

    def delete_skill(self, payload: SkillDeleteRequestDto) -> SkillDefinition | None:
        """Soft-delete a skill by setting its status to ``"archived"``."""
        return self.skill_repository.update_status(
            skill_id=payload.skillId,
            status="archived",
        )

    # ------------------------------------------------------------------
    # Installations
    # ------------------------------------------------------------------

    def install_skill(self, payload: SkillInstallRequest) -> SkillInstallation:
        """Create an installation of an existing skill.

        Raises:
            ValueError: if no skill with ``payload.skill_id`` exists.
        """
        skill = self.skill_repository.get_by_id(skill_id=payload.skill_id)
        if skill is None:
            raise ValueError("skill not found")
        return self.installation_repository.create(
            skill_id=payload.skill_id,
            install_scope=payload.install_scope,
            scope_id=payload.scope_id,
            config_json=payload.config_json,
            installed_by=payload.installed_by,
        )

    def install_skill_from_contract(self, payload: SkillInstallRequestDto) -> SkillInstallation:
        """Contract-DTO wrapper around :meth:`install_skill`."""
        return self.install_skill(
            SkillInstallRequest(
                skill_id=payload.skillId,
                install_scope=payload.installScope,
                scope_id=payload.scopeId,
                config_json=payload.config,
                installed_by=payload.installedBy,
            )
        )

    def list_installations(
        self,
        *,
        install_scope: str | None = None,
        scope_id: str | None = None,
    ) -> list[SkillInstallation]:
        """Return installations, passing both scope arguments to the repository."""
        return self.installation_repository.list_by_scope(
            install_scope=install_scope,
            scope_id=scope_id,
        )

    def list_installations_contract(
        self, payload: SkillInstallationListRequestDto
    ) -> tuple[list[SkillInstallation], int]:
        """Return a filtered page of installations together with an integer count.

        NOTE(review): the integer is presumably the total number of matches
        before paging -- confirm against the repository implementation.
        """
        return self.installation_repository.list_filtered(
            install_scope=payload.installScope,
            scope_id=payload.scopeId,
            status=payload.status,
            offset=payload.offset,
            limit=payload.pageSize,
        )

    def update_installation_status(
        self,
        *,
        installation_id: str,
        payload: SkillInstallationStatusUpdateRequest,
    ) -> SkillInstallation | None:
        """Set the status of an installation and return the repository's result.

        NOTE(review): ``None`` presumably means the installation does not exist.
        """
        return self.installation_repository.update_status(
            installation_id=installation_id,
            status=payload.status,
        )

    def update_installation_status_contract(
        self, payload: SkillInstallationStatusRequestDto
    ) -> SkillInstallation | None:
        """Contract-DTO wrapper around :meth:`update_installation_status`."""
        return self.update_installation_status(
            installation_id=payload.installationId,
            payload=SkillInstallationStatusUpdateRequest(status=payload.status),
        )

    # ------------------------------------------------------------------
    # Skill runs
    # ------------------------------------------------------------------

    def create_skill_run(self, payload: SkillRunCreateRequest) -> SkillRun:
        """Create a run record for an existing skill.

        Raises:
            ValueError: if no skill with ``payload.skill_id`` exists.
        """
        skill = self.skill_repository.get_by_id(skill_id=payload.skill_id)
        if skill is None:
            raise ValueError("skill not found")
        return self.skill_run_repository.create(
            skill_id=payload.skill_id,
            installation_id=payload.installation_id,
            input_json=payload.input_json,
        )

    def create_skill_run_from_contract(self, payload: SkillRunCreateRequestDto) -> SkillRun:
        """Contract-DTO wrapper around :meth:`create_skill_run`."""
        return self.create_skill_run(
            SkillRunCreateRequest(
                skill_id=payload.skillId,
                installation_id=payload.installationId,
                input_json=payload.input,
            )
        )

    def execute_skill_run_from_contract(
        self, payload: SkillRunExecuteRequestDto
    ) -> SkillRun | None:
        """Contract-DTO wrapper around :meth:`execute_skill_run`."""
        # NOTE(review): this request mixes a camelCase keyword (skillRunId)
        # with a snake_case one (worker_key), unlike the other internal
        # requests -- confirm it matches the SkillRunExecuteRequest schema.
        return self.execute_skill_run(
            skill_run_id=payload.skillRunId,
            payload=SkillRunExecuteRequest(
                skillRunId=payload.skillRunId,
                worker_key=payload.workerKey,
            ),
        )

    def execute_skill_run(
        self, *, skill_run_id: str, payload: SkillRunExecuteRequest
    ) -> SkillRun | None:
        """Execute a stored run and persist its outcome.

        Returns ``None`` when the run does not exist. Otherwise the run is
        moved through these statuses via the run repository:

        * ``"failed"`` / ``skill_missing`` when its skill no longer exists;
        * ``"running"`` before execution starts;
        * ``"failed"`` / ``skill_execution_error`` when execution raises
          ``ValueError``;
        * ``"completed"`` with the produced text and JSON output otherwise.

        The return value is whatever the final ``update_status`` call returns.
        """
        run = self.skill_run_repository.get_by_id(skill_run_id=skill_run_id)
        if run is None:
            return None

        skill = self.skill_repository.get_by_id(skill_id=run.skill_id)
        if skill is None:
            return self.skill_run_repository.update_status(
                skill_run_id=run.id,
                status="failed",
                worker_key=payload.worker_key,
                error_code="skill_missing",
                error_message=f"skill not found: {run.skill_id}",
            )

        self.skill_run_repository.update_status(
            skill_run_id=run.id,
            status="running",
            worker_key=payload.worker_key,
        )
        try:
            output_text, output_json = self._execute_skill(
                skill=skill,
                input_json=run.input_json,
            )
        except ValueError as exc:
            return self.skill_run_repository.update_status(
                skill_run_id=run.id,
                status="failed",
                worker_key=payload.worker_key,
                error_code="skill_execution_error",
                error_message=str(exc),
            )
        return self.skill_run_repository.update_status(
            skill_run_id=run.id,
            status="completed",
            worker_key=payload.worker_key,
            output_text=output_text,
            output_json=output_json,
        )

    def _execute_skill(
        self,
        *,
        skill: SkillDefinition,
        input_json: dict[str, JSONValue],
    ) -> tuple[str | None, dict[str, JSONValue]]:
        """Render a ``template`` skill against the run input.

        Only ``str``, ``int``, ``float`` and ``bool`` input values are used as
        substitutions (converted with ``str()``); other values are skipped.
        ``Template.safe_substitute`` leaves placeholders without a matching
        substitution untouched in the output.

        Returns:
            ``(output_text, output_json)`` where ``output_json`` carries the
            runtime type, the entrypoint and the rendered text.

        Raises:
            ValueError: if the runtime type is not ``"template"`` or the
                implementation has no string ``template`` entry.
        """
        if skill.runtime_type != "template":
            raise ValueError(f"unsupported skill runtime_type: {skill.runtime_type}")

        # A missing implementation must surface as the ValueError below: the
        # caller only handles ValueError, so an AttributeError raised here
        # would leave the run stuck in the "running" status.
        implementation = skill.implementation_json or {}
        template_value = implementation.get("template")
        if not isinstance(template_value, str):
            raise ValueError("template skill requires implementation_json.template")

        # A run stored without input renders the template with no substitutions.
        # bool is a subclass of int, so it is covered by the check below.
        substitutions = {
            key: str(value)
            for key, value in (input_json or {}).items()
            if isinstance(value, (str, int, float))
        }
        output_text = Template(template_value).safe_substitute(substitutions)
        return output_text, {
            "runtime_type": skill.runtime_type,
            "entrypoint": skill.entrypoint,
            "result": output_text,
        }

    def _build_skill_code(self, name: str) -> str:
        """Derive a skill code from a display name.

        Alphanumeric characters are lower-cased, every other character becomes
        ``"_"``, and leading/trailing underscores are removed. An empty result
        falls back to ``"skill"``. The code is capped at 64 characters.
        """
        base = "".join(
            char.lower() if char.isalnum() else "_" for char in name
        ).strip("_") or "skill"
        # Truncation can cut right after a separator; strip again so the code
        # never ends with "_". The first character is never "_", so the result
        # cannot become empty.
        return base[: self._SKILL_CODE_MAX_LENGTH].rstrip("_")