from datetime import datetime from sqlalchemy import select from sqlalchemy.orm import Session from core_domain import RoleAssignmentStatus, RoleStatus, UserStatus from core_shared import JSONValue from app.db.models import Role, RoleAssignment, User class UserRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, tenant_id: str, username: str, display_name: str | None, email: str | None, metadata_json: dict[str, JSONValue], ) -> User: entity = User( tenant_id=tenant_id, username=username, display_name=display_name, email=email, metadata_json=metadata_json, ) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_by_tenant(self, *, tenant_id: str) -> list[User]: stmt = select(User).where(User.tenant_id == tenant_id).order_by(User.created_time.desc()) return list(self.db.scalars(stmt)) def get_by_id(self, *, tenant_id: str, user_id: str) -> User | None: stmt = select(User).where(User.tenant_id == tenant_id).where(User.id == user_id) return self.db.scalar(stmt) def update_status(self, *, tenant_id: str, user_id: str, status: UserStatus) -> User | None: entity = self.get_by_id(tenant_id=tenant_id, user_id=user_id) if entity is None: return None entity.status = status self.db.commit() self.db.refresh(entity) return entity class RoleRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, tenant_id: str, code: str, name: str, description: str | None, permissions_json: list[str], ) -> Role: entity = Role( tenant_id=tenant_id, code=code, name=name, description=description, permissions_json=permissions_json, ) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_by_tenant(self, *, tenant_id: str) -> list[Role]: stmt = select(Role).where(Role.tenant_id == tenant_id).order_by(Role.created_time.desc()) return list(self.db.scalars(stmt)) def get_by_id(self, *, tenant_id: str, role_id: str) -> Role | None: stmt = select(Role).where(Role.tenant_id == tenant_id).where(Role.id == role_id) return self.db.scalar(stmt) def update_status(self, *, tenant_id: str, role_id: str, status: RoleStatus) -> Role | None: entity = self.get_by_id(tenant_id=tenant_id, role_id=role_id) if entity is None: return None entity.status = status self.db.commit() self.db.refresh(entity) return entity class RoleAssignmentRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, tenant_id: str, user_id: str, role_id: str, scope_type: str | None, scope_id: str | None, expires_time: datetime | None, ) -> RoleAssignment: entity = RoleAssignment( tenant_id=tenant_id, user_id=user_id, role_id=role_id, scope_type=scope_type, scope_id=scope_id, expires_time=expires_time, ) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_by_user(self, *, tenant_id: str, user_id: str) -> list[RoleAssignment]: stmt = ( select(RoleAssignment) .where(RoleAssignment.tenant_id == tenant_id) .where(RoleAssignment.user_id == user_id) .order_by(RoleAssignment.created_time.desc()) ) return list(self.db.scalars(stmt)) def get_by_id( self, *, tenant_id: str, assignment_id: str, ) -> RoleAssignment | None: stmt = ( select(RoleAssignment) .where(RoleAssignment.tenant_id == tenant_id) .where(RoleAssignment.id == assignment_id) ) return self.db.scalar(stmt) def update_status( self, *, tenant_id: str, assignment_id: str, status: RoleAssignmentStatus, ) -> RoleAssignment | None: entity = self.get_by_id(tenant_id=tenant_id, assignment_id=assignment_id) if entity is None: return None entity.status = status self.db.commit() self.db.refresh(entity) return entity