from datetime import datetime from sqlalchemy import func, or_, select from sqlalchemy.orm import Session from core_domain import SkillInstallStatus, SkillRunStatus, SkillStatus from core_shared import JSONValue from app.db.models import SkillDefinition, SkillInstallation, SkillRun class SkillDefinitionRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, code: str, name: str, skill_type: str, description: str | None, owner_user_id: str | None, runtime_type: str, entrypoint: str | None, parameter_schema_json: dict[str, JSONValue], output_schema_json: dict[str, JSONValue], implementation_json: dict[str, JSONValue], metadata_json: dict[str, JSONValue] | None) -> SkillDefinition: entity = SkillDefinition( code=code, name=name, skill_type=skill_type, description=description, owner_user_id=owner_user_id, runtime_type=runtime_type, entrypoint=entrypoint, parameter_schema_json=parameter_schema_json, output_schema_json=output_schema_json, implementation_json=implementation_json, metadata_json=metadata_json) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_all(self) -> list[SkillDefinition]: stmt = ( select(SkillDefinition) .order_by(SkillDefinition.created_time.desc()) ) return list(self.db.scalars(stmt)) def list_filtered( self, *, keyword: str | None = None, status: SkillStatus | None = None, skill_type: str | None = None, category: str | None = None, offset: int = 0, limit: int = 20) -> tuple[list[SkillDefinition], int]: stmt = select(SkillDefinition) if status is not None: stmt = stmt.where(SkillDefinition.status == status) if skill_type is not None: stmt = stmt.where(SkillDefinition.skill_type == skill_type) if keyword: pattern = f"%{keyword.strip()}%" stmt = stmt.where(or_( SkillDefinition.name.ilike(pattern), SkillDefinition.description.ilike(pattern), SkillDefinition.code.ilike(pattern))) if category is not None: all_items = list(self.db.scalars(stmt.order_by(SkillDefinition.created_time.desc()))) filtered_items = [ item for item in all_items if isinstance(item.metadata_json, dict) and item.metadata_json.get("category") == category ] return filtered_items[offset:offset + limit], len(filtered_items) items = list(self.db.scalars( stmt.order_by(SkillDefinition.created_time.desc()).offset(offset).limit(limit))) total = self.db.scalar(select(func.count()).select_from(stmt.subquery())) or 0 return items, total def get_by_id(self, *, skill_id: str) -> SkillDefinition | None: stmt = ( select(SkillDefinition) .where(SkillDefinition.id == skill_id) ) return self.db.scalar(stmt) def update_status( self, *, skill_id: str, status: SkillStatus) -> SkillDefinition | None: entity = self.get_by_id(skill_id=skill_id) if entity is None: return None entity.status = status self.db.commit() self.db.refresh(entity) return entity def update( self, *, skill_id: str, name: str | None = None, skill_type: str | None = None, description: str | None = None, owner_user_id: str | None = None, runtime_type: str | None = None, entrypoint: str | None = None, parameter_schema_json: dict[str, JSONValue] | None = None, output_schema_json: dict[str, JSONValue] | None = None, implementation_json: dict[str, JSONValue] | None = None, metadata_json: dict[str, JSONValue] | None = None) -> SkillDefinition | None: entity = self.get_by_id(skill_id=skill_id) if entity is None: return None if name is not None: entity.name = name if skill_type is not None: entity.skill_type = skill_type if description is not None: entity.description = description if owner_user_id is not None: entity.owner_user_id = owner_user_id if runtime_type is not None: entity.runtime_type = runtime_type if entrypoint is not None: entity.entrypoint = entrypoint if parameter_schema_json is not None: entity.parameter_schema_json = parameter_schema_json if output_schema_json is not None: entity.output_schema_json = output_schema_json if implementation_json is not None: entity.implementation_json = implementation_json if metadata_json is not None: entity.metadata_json = metadata_json self.db.commit() self.db.refresh(entity) return entity class SkillInstallationRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, skill_id: str, install_scope: str, scope_id: str, config_json: dict[str, JSONValue], installed_by: str | None) -> SkillInstallation: entity = SkillInstallation( skill_id=skill_id, install_scope=install_scope, scope_id=scope_id, config_json=config_json, status="installed", installed_by=installed_by, installed_time=datetime.utcnow()) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_by_scope( self, *, install_scope: str | None = None, scope_id: str | None = None) -> list[SkillInstallation]: stmt = select(SkillInstallation) if install_scope is not None: stmt = stmt.where(SkillInstallation.install_scope == install_scope) if scope_id is not None: stmt = stmt.where(SkillInstallation.scope_id == scope_id) stmt = stmt.order_by(SkillInstallation.created_time.desc()) return list(self.db.scalars(stmt)) def list_filtered( self, *, install_scope: str | None = None, scope_id: str | None = None, status: SkillInstallStatus | None = None, offset: int = 0, limit: int = 20) -> tuple[list[SkillInstallation], int]: stmt = select(SkillInstallation) if install_scope is not None: stmt = stmt.where(SkillInstallation.install_scope == install_scope) if scope_id is not None: stmt = stmt.where(SkillInstallation.scope_id == scope_id) if status is not None: stmt = stmt.where(SkillInstallation.status == status) total = self.db.scalar(select(func.count()).select_from(stmt.subquery())) or 0 items = list(self.db.scalars( stmt.order_by(SkillInstallation.created_time.desc()).offset(offset).limit(limit))) return items, total def get_by_id(self, *, installation_id: str) -> SkillInstallation | None: stmt = ( select(SkillInstallation) .where(SkillInstallation.id == installation_id) ) return self.db.scalar(stmt) def update_status( self, *, installation_id: str, status: SkillInstallStatus) -> SkillInstallation | None: entity = self.get_by_id(installation_id=installation_id) if entity is None: return None entity.status = status self.db.commit() self.db.refresh(entity) return entity class SkillRunRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, skill_id: str, installation_id: str | None, input_json: dict[str, JSONValue]) -> SkillRun: entity = SkillRun( skill_id=skill_id, installation_id=installation_id, input_json=input_json, status="queued") self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def get_by_id(self, *, skill_run_id: str) -> SkillRun | None: stmt = ( select(SkillRun) .where(SkillRun.id == skill_run_id) ) return self.db.scalar(stmt) def update_status( self, *, skill_run_id: str, status: SkillRunStatus, worker_key: str | None = None, output_json: dict[str, JSONValue] | None = None, output_text: str | None = None, error_code: str | None = None, error_message: str | None = None) -> SkillRun | None: entity = self.db.get(SkillRun, skill_run_id) if entity is None: return None now = datetime.utcnow() entity.status = status entity.worker_key = worker_key entity.output_json = output_json entity.output_text = output_text entity.error_code = error_code entity.error_message = error_message if status == "running" and entity.started_time is None: entity.started_time = now if status in {"completed", "failed", "cancelled"}: entity.finished_time = now self.db.commit() self.db.refresh(entity) return entity