| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- from datetime import datetime
- from core_domain import RoleAssignmentStatus, RoleStatus, UserStatus
- from core_shared import JSONValue
- from sqlalchemy import select
- from sqlalchemy.orm import Session
- from app.db.models import ApiKey, Role, RoleAssignment, RolePermissionBinding, User
- class UserRepository:
- def __init__(self, db: Session) -> None:
- self.db = db
- def create(
- self,
- *,
- username: str,
- password_hash: str,
- display_name: str | None,
- email: str | None,
- metadata_json: dict[str, JSONValue]) -> User:
- entity = User(
- username=username,
- password_hash=password_hash,
- display_name=display_name,
- email=email,
- metadata_json=metadata_json)
- self.db.add(entity)
- self.db.commit()
- self.db.refresh(entity)
- return entity
- def list_all(self) -> list[User]:
- stmt = select(User).order_by(User.created_time.desc())
- return list(self.db.scalars(stmt))
- def has_any(self) -> bool:
- stmt = select(User.id).limit(1)
- return self.db.scalar(stmt) is not None
- def list_page(self, *, offset: int, limit: int, keyword: str | None) -> tuple[list[User], int]:
- stmt = select(User)
- if keyword:
- like = f"%{keyword}%"
- stmt = stmt.where(
- (User.username.like(like))
- | (User.display_name.like(like))
- | (User.email.like(like))
- )
- total = len(list(self.db.scalars(stmt)))
- items_stmt = stmt.order_by(User.created_time.desc()).offset(offset).limit(limit)
- return list(self.db.scalars(items_stmt)), total
- def get_by_id(self, *, user_id: str) -> User | None:
- return self.db.get(User, user_id)
- def get_by_username(self, *, username: str) -> User | None:
- stmt = select(User).where(User.username == username)
- return self.db.scalar(stmt)
- def touch_last_login_time(self, *, user_id: str) -> None:
- entity = self.db.get(User, user_id)
- if entity is None:
- return
- entity.last_login_time = datetime.utcnow()
- self.db.commit()
- self.db.refresh(entity)
- def update_status(self, *, user_id: str, status: UserStatus) -> User | None:
- entity = self.get_by_id(user_id=user_id)
- if entity is None:
- return None
- entity.status = status
- self.db.commit()
- self.db.refresh(entity)
- return entity
- class RoleRepository:
- def __init__(self, db: Session) -> None:
- self.db = db
- def create(
- self,
- *,
- code: str,
- name: str,
- description: str | None,
- permissions_json: list[str]) -> Role:
- entity = Role(
- code=code,
- name=name,
- description=description,
- permissions_json=permissions_json)
- self.db.add(entity)
- self.db.commit()
- self.db.refresh(entity)
- return entity
- def list_all(self) -> list[Role]:
- stmt = select(Role).order_by(Role.created_time.desc())
- return list(self.db.scalars(stmt))
- def get_by_name(self, *, name: str) -> Role | None:
- stmt = select(Role).where(Role.name == name)
- return self.db.scalar(stmt)
- def list_page(self, *, offset: int, limit: int, keyword: str | None) -> tuple[list[Role], int]:
- stmt = select(Role)
- if keyword:
- like = f"%{keyword}%"
- stmt = stmt.where((Role.name.like(like)) | (Role.description.like(like)))
- total = len(list(self.db.scalars(stmt)))
- items_stmt = stmt.order_by(Role.created_time.desc()).offset(offset).limit(limit)
- return list(self.db.scalars(items_stmt)), total
- def get_by_id(self, *, role_id: str) -> Role | None:
- return self.db.get(Role, role_id)
- def update_status(self, *, role_id: str, status: RoleStatus) -> Role | None:
- entity = self.get_by_id(role_id=role_id)
- if entity is None:
- return None
- entity.status = status
- self.db.commit()
- self.db.refresh(entity)
- return entity
- class RoleAssignmentRepository:
- def __init__(self, db: Session) -> None:
- self.db = db
- def create(
- self,
- *,
- user_id: str,
- role_id: str,
- scope_type: str | None,
- scope_id: str | None,
- expires_time: datetime | None) -> RoleAssignment:
- entity = RoleAssignment(
- user_id=user_id,
- role_id=role_id,
- scope_type=scope_type,
- scope_id=scope_id,
- expires_time=expires_time)
- self.db.add(entity)
- self.db.commit()
- self.db.refresh(entity)
- return entity
- def list_by_user(self, *, user_id: str) -> list[RoleAssignment]:
- stmt = (
- select(RoleAssignment)
- .where(RoleAssignment.user_id == user_id)
- .order_by(RoleAssignment.created_time.desc())
- )
- return list(self.db.scalars(stmt))
- def get_by_id(
- self,
- *,
- assignment_id: str) -> RoleAssignment | None:
- return self.db.get(RoleAssignment, assignment_id)
- def update_status(
- self,
- *,
- assignment_id: str,
- status: RoleAssignmentStatus) -> RoleAssignment | None:
- entity = self.get_by_id(assignment_id=assignment_id)
- if entity is None:
- return None
- entity.status = status
- self.db.commit()
- self.db.refresh(entity)
- return entity
- class RolePermissionBindingRepository:
- def __init__(self, db: Session) -> None:
- self.db = db
- def create(
- self,
- *,
- role_id: str,
- permission: str,
- scope_type: str | None,
- scope_id: str | None) -> RolePermissionBinding:
- entity = RolePermissionBinding(
- role_id=role_id,
- permission=permission,
- scope_type=scope_type,
- scope_id=scope_id)
- self.db.add(entity)
- self.db.commit()
- self.db.refresh(entity)
- return entity
- def list_by_role(
- self,
- *,
- role_id: str,
- offset: int = 0,
- limit: int = 100) -> tuple[list[RolePermissionBinding], int]:
- stmt = select(RolePermissionBinding).where(RolePermissionBinding.role_id == role_id)
- total = len(list(self.db.scalars(stmt)))
- items_stmt = (
- stmt.order_by(RolePermissionBinding.created_time.desc())
- .offset(offset)
- .limit(limit)
- )
- return list(self.db.scalars(items_stmt)), total
- def list_all_by_role(self, *, role_id: str) -> list[RolePermissionBinding]:
- stmt = (
- select(RolePermissionBinding)
- .where(RolePermissionBinding.role_id == role_id)
- .order_by(RolePermissionBinding.created_time.desc())
- )
- return list(self.db.scalars(stmt))
- def delete(self, *, binding_id: str) -> bool:
- entity = self.db.get(RolePermissionBinding, binding_id)
- if entity is None:
- return False
- self.db.delete(entity)
- self.db.commit()
- return True
- class ApiKeyRepository:
- def __init__(self, db: Session) -> None:
- self.db = db
- def create(
- self,
- *,
- name: str,
- key_prefix: str,
- key_hash: str,
- scopes: str | None,
- expires_time: datetime | None) -> ApiKey:
- entity = ApiKey(
- name=name,
- key_prefix=key_prefix,
- key_hash=key_hash,
- scopes=scopes,
- expires_time=expires_time)
- self.db.add(entity)
- self.db.commit()
- self.db.refresh(entity)
- return entity
- def list_page(
- self,
- *,
- offset: int,
- limit: int,
- keyword: str | None) -> tuple[list[ApiKey], int]:
- stmt = select(ApiKey)
- if keyword:
- stmt = stmt.where(ApiKey.name.like(f"%{keyword}%"))
- total = len(list(self.db.scalars(stmt)))
- items_stmt = stmt.order_by(ApiKey.created_time.desc()).offset(offset).limit(limit)
- return list(self.db.scalars(items_stmt)), total
- def get_by_id(self, *, api_key_id: str) -> ApiKey | None:
- return self.db.get(ApiKey, api_key_id)
- def revoke(self, *, api_key_id: str) -> ApiKey | None:
- entity = self.get_by_id(api_key_id=api_key_id)
- if entity is None:
- return None
- entity.revoked_time = datetime.utcnow()
- self.db.commit()
- self.db.refresh(entity)
- return entity
|