"""Repositories for skill definitions, versions, installations and runs."""

from datetime import datetime, timezone

from sqlalchemy import func, select
from sqlalchemy.orm import Session

from core_domain import SkillInstallStatus, SkillRunStatus, SkillStatus, SkillVersionStatus
from core_shared import JSONValue

from app.db.models import SkillDefinition, SkillInstallation, SkillRun, SkillVersion


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same value as the deprecated ``datetime.utcnow()``: the clock
    is read in UTC and the tzinfo is stripped, so timestamps written by this
    module remain naive.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class SkillDefinitionRepository:
    """Persistence operations for ``SkillDefinition`` rows."""

    def __init__(self, db: Session) -> None:
        self.db = db

    def create(
        self,
        *,
        tenant_id: str,
        code: str,
        name: str,
        skill_type: str,
        description: str | None,
        owner_user_id: str | None,
        metadata_json: dict[str, JSONValue] | None,
    ) -> SkillDefinition:
        """Insert a new skill definition, commit, and return the refreshed row."""
        entity = SkillDefinition(
            tenant_id=tenant_id,
            code=code,
            name=name,
            skill_type=skill_type,
            description=description,
            owner_user_id=owner_user_id,
            metadata_json=metadata_json,
        )
        self.db.add(entity)
        self.db.commit()
        self.db.refresh(entity)
        return entity

    def list_by_tenant(self, *, tenant_id: str) -> list[SkillDefinition]:
        """Return the tenant's skill definitions ordered by ``created_time`` descending."""
        stmt = (
            select(SkillDefinition)
            .where(SkillDefinition.tenant_id == tenant_id)
            .order_by(SkillDefinition.created_time.desc())
        )
        return list(self.db.scalars(stmt))

    def get_by_id(self, *, tenant_id: str, skill_id: str) -> SkillDefinition | None:
        """Return the skill with ``skill_id`` inside ``tenant_id``, or ``None``."""
        stmt = (
            select(SkillDefinition)
            .where(SkillDefinition.tenant_id == tenant_id)
            .where(SkillDefinition.id == skill_id)
        )
        return self.db.scalar(stmt)

    def update_status(
        self,
        *,
        tenant_id: str,
        skill_id: str,
        status: SkillStatus,
    ) -> SkillDefinition | None:
        """Set the skill's status and commit.

        Returns the refreshed row, or ``None`` when no skill matches
        ``tenant_id`` and ``skill_id`` (nothing is committed in that case).
        """
        entity = self.get_by_id(tenant_id=tenant_id, skill_id=skill_id)
        if entity is None:
            return None
        entity.status = status
        self.db.commit()
        self.db.refresh(entity)
        return entity


class SkillVersionRepository:
    """Persistence operations for ``SkillVersion`` rows."""

    def __init__(self, db: Session) -> None:
        self.db = db

    def create(
        self,
        *,
        tenant_id: str,
        skill_id: str,
        status: SkillVersionStatus,
        runtime_type: str,
        entrypoint: str | None,
        parameter_schema_json: dict[str, JSONValue],
        output_schema_json: dict[str, JSONValue],
        implementation_json: dict[str, JSONValue],
    ) -> SkillVersion:
        """Insert a new version for ``skill_id``, commit, and return the refreshed row.

        ``version_no`` is assigned by ``_next_version_no``. ``published_time``
        is stamped with the current UTC time only when ``status`` equals
        ``"published"``; otherwise it is left as ``None``.
        """
        entity = SkillVersion(
            tenant_id=tenant_id,
            skill_id=skill_id,
            version_no=self._next_version_no(skill_id),
            status=status,
            runtime_type=runtime_type,
            entrypoint=entrypoint,
            parameter_schema_json=parameter_schema_json,
            output_schema_json=output_schema_json,
            implementation_json=implementation_json,
            published_time=_utcnow() if status == "published" else None,
        )
        self.db.add(entity)
        self.db.commit()
        self.db.refresh(entity)
        return entity

    def list_by_skill(self, *, tenant_id: str, skill_id: str) -> list[SkillVersion]:
        """Return all versions of a skill within the tenant, highest ``version_no`` first."""
        stmt = (
            select(SkillVersion)
            .where(SkillVersion.tenant_id == tenant_id)
            .where(SkillVersion.skill_id == skill_id)
            .order_by(SkillVersion.version_no.desc())
        )
        return list(self.db.scalars(stmt))

    def get_by_id(self, *, tenant_id: str, skill_version_id: str) -> SkillVersion | None:
        """Return the version with ``skill_version_id`` inside ``tenant_id``, or ``None``."""
        stmt = (
            select(SkillVersion)
            .where(SkillVersion.tenant_id == tenant_id)
            .where(SkillVersion.id == skill_version_id)
        )
        return self.db.scalar(stmt)

    def get_latest_published(self, *, tenant_id: str, skill_id: str) -> SkillVersion | None:
        """Return the published version with the highest ``version_no``, or ``None``."""
        stmt = (
            select(SkillVersion)
            .where(SkillVersion.tenant_id == tenant_id)
            .where(SkillVersion.skill_id == skill_id)
            .where(SkillVersion.status == "published")
            .order_by(SkillVersion.version_no.desc())
            .limit(1)
        )
        return self.db.scalar(stmt)

    def _next_version_no(self, skill_id: str) -> int:
        """Return ``max(version_no) + 1`` for ``skill_id`` (``1`` when none exist).

        The lookup filters on ``skill_id`` only, not on tenant.
        NOTE(review): this is a read followed by a separate insert; two
        concurrent ``create`` calls for the same skill could compute the same
        number unless the schema enforces uniqueness - confirm.
        """
        stmt = select(func.max(SkillVersion.version_no)).where(SkillVersion.skill_id == skill_id)
        return (self.db.scalar(stmt) or 0) + 1


class SkillInstallationRepository:
    """Persistence operations for ``SkillInstallation`` rows."""

    def __init__(self, db: Session) -> None:
        self.db = db

    def create(
        self,
        *,
        tenant_id: str,
        skill_id: str,
        skill_version_id: str,
        install_scope: str,
        scope_id: str,
        config_json: dict[str, JSONValue],
        installed_by: str | None,
    ) -> SkillInstallation:
        """Insert an installation with status ``"installed"`` and return the refreshed row.

        ``installed_time`` is stamped with the current UTC time.
        """
        entity = SkillInstallation(
            tenant_id=tenant_id,
            skill_id=skill_id,
            skill_version_id=skill_version_id,
            install_scope=install_scope,
            scope_id=scope_id,
            config_json=config_json,
            status="installed",
            installed_by=installed_by,
            installed_time=_utcnow(),
        )
        self.db.add(entity)
        self.db.commit()
        self.db.refresh(entity)
        return entity

    def list_by_scope(
        self,
        *,
        tenant_id: str,
        install_scope: str | None = None,
        scope_id: str | None = None,
    ) -> list[SkillInstallation]:
        """Return the tenant's installations ordered by ``created_time`` descending.

        ``install_scope`` and ``scope_id`` are independent optional filters;
        each is applied only when it is not ``None``.
        """
        stmt = select(SkillInstallation).where(SkillInstallation.tenant_id == tenant_id)
        if install_scope is not None:
            stmt = stmt.where(SkillInstallation.install_scope == install_scope)
        if scope_id is not None:
            stmt = stmt.where(SkillInstallation.scope_id == scope_id)
        stmt = stmt.order_by(SkillInstallation.created_time.desc())
        return list(self.db.scalars(stmt))

    def get_by_id(self, *, tenant_id: str, installation_id: str) -> SkillInstallation | None:
        """Return the installation with ``installation_id`` inside ``tenant_id``, or ``None``."""
        stmt = (
            select(SkillInstallation)
            .where(SkillInstallation.tenant_id == tenant_id)
            .where(SkillInstallation.id == installation_id)
        )
        return self.db.scalar(stmt)

    def update_status(
        self,
        *,
        tenant_id: str,
        installation_id: str,
        status: SkillInstallStatus,
    ) -> SkillInstallation | None:
        """Set the installation's status and commit.

        Returns the refreshed row, or ``None`` when no installation matches
        ``tenant_id`` and ``installation_id`` (nothing is committed then).
        """
        entity = self.get_by_id(tenant_id=tenant_id, installation_id=installation_id)
        if entity is None:
            return None
        entity.status = status
        self.db.commit()
        self.db.refresh(entity)
        return entity


class SkillRunRepository:
    """Persistence operations for ``SkillRun`` rows."""

    def __init__(self, db: Session) -> None:
        self.db = db

    def create(
        self,
        *,
        tenant_id: str,
        skill_id: str,
        skill_version_id: str,
        installation_id: str | None,
        input_json: dict[str, JSONValue],
    ) -> SkillRun:
        """Insert a run with status ``"queued"``, commit, and return the refreshed row."""
        entity = SkillRun(
            tenant_id=tenant_id,
            skill_id=skill_id,
            skill_version_id=skill_version_id,
            installation_id=installation_id,
            input_json=input_json,
            status="queued",
        )
        self.db.add(entity)
        self.db.commit()
        self.db.refresh(entity)
        return entity

    def get_by_id(self, *, tenant_id: str, skill_run_id: str) -> SkillRun | None:
        """Return the run with ``skill_run_id`` inside ``tenant_id``, or ``None``."""
        stmt = (
            select(SkillRun)
            .where(SkillRun.tenant_id == tenant_id)
            .where(SkillRun.id == skill_run_id)
        )
        return self.db.scalar(stmt)

    def update_status(
        self,
        *,
        skill_run_id: str,
        status: SkillRunStatus,
        worker_key: str | None = None,
        output_json: dict[str, JSONValue] | None = None,
        output_text: str | None = None,
        error_code: str | None = None,
        error_message: str | None = None,
    ) -> SkillRun | None:
        """Update a run's status and result fields, then commit.

        The run is looked up by primary key only; unlike ``get_by_id`` this
        method applies no tenant filter. Returns the refreshed row, or
        ``None`` when no run has that id.

        NOTE(review): ``worker_key``, ``output_json``, ``output_text``,
        ``error_code`` and ``error_message`` are assigned unconditionally, so
        any argument left at its default resets the stored value to ``None``.
        Confirm callers always pass the values they want to keep.
        """
        entity = self.db.get(SkillRun, skill_run_id)
        if entity is None:
            return None

        now = _utcnow()
        entity.status = status
        entity.worker_key = worker_key
        entity.output_json = output_json
        entity.output_text = output_text
        entity.error_code = error_code
        entity.error_message = error_message

        # started_time is stamped once; a repeated "running" update keeps the
        # first value.
        if status == "running" and entity.started_time is None:
            entity.started_time = now
        # finished_time is re-stamped on every terminal-status update.
        if status in {"completed", "failed", "cancelled"}:
            entity.finished_time = now

        self.db.commit()
        self.db.refresh(entity)
        return entity