from datetime import datetime from core_domain import RoleAssignmentStatus, RoleStatus, UserStatus from core_shared import JSONValue from sqlalchemy import select from sqlalchemy.orm import Session from app.db.models import ApiKey, Role, RoleAssignment, RolePermissionBinding, User class UserRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, username: str, password_hash: str, display_name: str | None, email: str | None, metadata_json: dict[str, JSONValue]) -> User: entity = User( username=username, password_hash=password_hash, display_name=display_name, email=email, metadata_json=metadata_json) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_all(self) -> list[User]: stmt = select(User).order_by(User.created_time.desc()) return list(self.db.scalars(stmt)) def has_any(self) -> bool: stmt = select(User.id).limit(1) return self.db.scalar(stmt) is not None def list_page(self, *, offset: int, limit: int, keyword: str | None) -> tuple[list[User], int]: stmt = select(User) if keyword: like = f"%{keyword}%" stmt = stmt.where( (User.username.like(like)) | (User.display_name.like(like)) | (User.email.like(like)) ) total = len(list(self.db.scalars(stmt))) items_stmt = stmt.order_by(User.created_time.desc()).offset(offset).limit(limit) return list(self.db.scalars(items_stmt)), total def get_by_id(self, *, user_id: str) -> User | None: return self.db.get(User, user_id) def get_by_username(self, *, username: str) -> User | None: stmt = select(User).where(User.username == username) return self.db.scalar(stmt) def touch_last_login_time(self, *, user_id: str) -> None: entity = self.db.get(User, user_id) if entity is None: return entity.last_login_time = datetime.utcnow() self.db.commit() self.db.refresh(entity) def update_status(self, *, user_id: str, status: UserStatus) -> User | None: entity = self.get_by_id(user_id=user_id) if entity is None: return None entity.status = status self.db.commit() self.db.refresh(entity) return entity class RoleRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, code: str, name: str, description: str | None, permissions_json: list[str]) -> Role: entity = Role( code=code, name=name, description=description, permissions_json=permissions_json) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_all(self) -> list[Role]: stmt = select(Role).order_by(Role.created_time.desc()) return list(self.db.scalars(stmt)) def get_by_name(self, *, name: str) -> Role | None: stmt = select(Role).where(Role.name == name) return self.db.scalar(stmt) def list_page(self, *, offset: int, limit: int, keyword: str | None) -> tuple[list[Role], int]: stmt = select(Role) if keyword: like = f"%{keyword}%" stmt = stmt.where((Role.name.like(like)) | (Role.description.like(like))) total = len(list(self.db.scalars(stmt))) items_stmt = stmt.order_by(Role.created_time.desc()).offset(offset).limit(limit) return list(self.db.scalars(items_stmt)), total def get_by_id(self, *, role_id: str) -> Role | None: return self.db.get(Role, role_id) def update_status(self, *, role_id: str, status: RoleStatus) -> Role | None: entity = self.get_by_id(role_id=role_id) if entity is None: return None entity.status = status self.db.commit() self.db.refresh(entity) return entity class RoleAssignmentRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, user_id: str, role_id: str, scope_type: str | None, scope_id: str | None, expires_time: datetime | None) -> RoleAssignment: entity = RoleAssignment( user_id=user_id, role_id=role_id, scope_type=scope_type, scope_id=scope_id, expires_time=expires_time) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_by_user(self, *, user_id: str) -> list[RoleAssignment]: stmt = ( select(RoleAssignment) .where(RoleAssignment.user_id == user_id) .order_by(RoleAssignment.created_time.desc()) ) return list(self.db.scalars(stmt)) def get_by_id( self, *, assignment_id: str) -> RoleAssignment | None: return self.db.get(RoleAssignment, assignment_id) def update_status( self, *, assignment_id: str, status: RoleAssignmentStatus) -> RoleAssignment | None: entity = self.get_by_id(assignment_id=assignment_id) if entity is None: return None entity.status = status self.db.commit() self.db.refresh(entity) return entity class RolePermissionBindingRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, role_id: str, permission: str, scope_type: str | None, scope_id: str | None) -> RolePermissionBinding: entity = RolePermissionBinding( role_id=role_id, permission=permission, scope_type=scope_type, scope_id=scope_id) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_by_role( self, *, role_id: str, offset: int = 0, limit: int = 100) -> tuple[list[RolePermissionBinding], int]: stmt = select(RolePermissionBinding).where(RolePermissionBinding.role_id == role_id) total = len(list(self.db.scalars(stmt))) items_stmt = ( stmt.order_by(RolePermissionBinding.created_time.desc()) .offset(offset) .limit(limit) ) return list(self.db.scalars(items_stmt)), total def list_all_by_role(self, *, role_id: str) -> list[RolePermissionBinding]: stmt = ( select(RolePermissionBinding) .where(RolePermissionBinding.role_id == role_id) .order_by(RolePermissionBinding.created_time.desc()) ) return list(self.db.scalars(stmt)) def delete(self, *, binding_id: str) -> bool: entity = self.db.get(RolePermissionBinding, binding_id) if entity is None: return False self.db.delete(entity) self.db.commit() return True class ApiKeyRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, name: str, key_prefix: str, key_hash: str, scopes: str | None, expires_time: datetime | None) -> ApiKey: entity = ApiKey( name=name, key_prefix=key_prefix, key_hash=key_hash, scopes=scopes, expires_time=expires_time) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_page( self, *, offset: int, limit: int, keyword: str | None) -> tuple[list[ApiKey], int]: stmt = select(ApiKey) if keyword: stmt = stmt.where(ApiKey.name.like(f"%{keyword}%")) total = len(list(self.db.scalars(stmt))) items_stmt = stmt.order_by(ApiKey.created_time.desc()).offset(offset).limit(limit) return list(self.db.scalars(items_stmt)), total def get_by_id(self, *, api_key_id: str) -> ApiKey | None: return self.db.get(ApiKey, api_key_id) def revoke(self, *, api_key_id: str) -> ApiKey | None: entity = self.get_by_id(api_key_id=api_key_id) if entity is None: return None entity.revoked_time = datetime.utcnow() self.db.commit() self.db.refresh(entity) return entity