| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
from datetime import datetime, timezone

from app.db.models import Role, RoleAssignment, User
from app.domain.repositories import RoleAssignmentRepository, RoleRepository, UserRepository
from app.infrastructure.passwords import hash_password, verify_password
from app.infrastructure.tokens import TokenError, issue_access_token, verify_access_token
from app.schemas.auth import (
    LoginRequest,
    LoginResponse,
    LoginUserResponse,
    PermissionCheckRequest,
    PermissionCheckResponse,
    RoleAssignmentCreateRequest,
    RoleAssignmentStatusUpdateRequest,
    RoleCreateRequest,
    RoleStatusUpdateRequest,
    TokenVerifyRequest,
    TokenVerifyResponse,
    UserCreateRequest,
    UserStatusUpdateRequest,
)


class AuthApplicationService:
    """Application-level facade for users, roles, assignments and permissions.

    Persistence is delegated to the three injected repositories; password
    hashing and access-token handling are delegated to the imported
    ``hash_password`` / ``verify_password`` and ``issue_access_token`` /
    ``verify_access_token`` helpers.
    """

    def __init__(
        self,
        *,
        user_repository: UserRepository,
        role_repository: RoleRepository,
        assignment_repository: RoleAssignmentRepository,
        token_secret: str,
    ) -> None:
        """Store the collaborators (all arguments are keyword-only).

        Args:
            user_repository: Repository used for every user lookup/mutation.
            role_repository: Repository used for every role lookup/mutation.
            assignment_repository: Repository used for role assignments.
            token_secret: Secret handed to ``issue_access_token`` and
                ``verify_access_token``.
        """
        self.user_repository = user_repository
        self.role_repository = role_repository
        self.assignment_repository = assignment_repository
        self.token_secret = token_secret

    # ------------------------------------------------------------------
    # Users and authentication
    # ------------------------------------------------------------------

    def create_user(self, payload: UserCreateRequest) -> User:
        """Create a user from *payload* and return the repository's result.

        A falsy ``payload.password`` is not hashed; the account is stored
        with an empty ``password_hash`` instead. ``login`` refuses such
        accounts.
        """
        password_hash = hash_password(payload.password) if payload.password else ""
        return self.user_repository.create(
            username=payload.username,
            password_hash=password_hash,
            display_name=payload.display_name,
            email=payload.email,
            metadata_json=payload.metadata_json,
        )

    def login(self, payload: LoginRequest) -> LoginResponse | None:
        """Authenticate by username/password and issue an access token.

        Returns:
            A ``LoginResponse`` on success, or ``None`` when the user is
            unknown, not ``"active"``, has no stored password hash, or the
            password does not verify.
        """
        user = self.user_repository.get_by_username(username=payload.username)
        if user is None or user.status != "active":
            return None
        # ``create_user`` stores "" when no password was supplied. Such an
        # account has no password credential, so never hand the empty hash
        # to ``verify_password`` (whose behaviour on "" is not visible here).
        if not user.password_hash:
            return None
        if not verify_password(payload.password, user.password_hash):
            return None

        self.user_repository.touch_last_login_time(user_id=user.id)
        access_token, expires_time = issue_access_token(
            user_id=user.id,
            secret=self.token_secret,
        )
        return LoginResponse(
            access_token=access_token,
            expires_time=expires_time,
            user=LoginUserResponse.from_entity(user),
        )

    def verify_token(self, payload: TokenVerifyRequest) -> TokenVerifyResponse:
        """Validate an access token and confirm its user is still active.

        Returns:
            ``active=False`` with ``reason=str(exc)`` when
            ``verify_access_token`` raises ``TokenError``; ``active=False``
            with ``reason="user_not_active"`` when the user is missing or
            not ``"active"``; otherwise ``active=True`` with the user's id,
            username and the token's expiry time.
        """
        try:
            token_payload = verify_access_token(
                payload.access_token,
                secret=self.token_secret,
            )
        except TokenError as exc:
            return TokenVerifyResponse(active=False, reason=str(exc))

        expires_time_raw = token_payload["expires_time"]
        user = self.user_repository.get_by_id(user_id=token_payload["user_id"])
        if user is None or user.status != "active":
            return TokenVerifyResponse(active=False, reason="user_not_active")

        # A trailing "Z" is stripped before parsing, as in the original code.
        expires_time = datetime.fromisoformat(expires_time_raw.removesuffix("Z"))
        return TokenVerifyResponse(
            active=True,
            user_id=user.id,
            username=user.username,
            expires_time=expires_time,
        )

    def list_users(self) -> list[User]:
        """Return every user known to the user repository."""
        return self.user_repository.list_all()

    def update_user_status(self, *, user_id: str, payload: UserStatusUpdateRequest) -> User | None:
        """Set the user's status to ``payload.status``.

        Returns the repository's result unchanged (presumably ``None`` when
        the id is unknown -- confirm against the repository).
        """
        return self.user_repository.update_status(
            user_id=user_id,
            status=payload.status,
        )

    # ------------------------------------------------------------------
    # Roles
    # ------------------------------------------------------------------

    def create_role(self, payload: RoleCreateRequest) -> Role:
        """Create a role from *payload* and return the repository's result."""
        return self.role_repository.create(
            code=payload.code,
            name=payload.name,
            description=payload.description,
            permissions_json=payload.permissions_json,
        )

    def list_roles(self) -> list[Role]:
        """Return every role known to the role repository."""
        return self.role_repository.list_all()

    def update_role_status(self, *, role_id: str, payload: RoleStatusUpdateRequest) -> Role | None:
        """Set the role's status to ``payload.status``.

        Returns the repository's result unchanged (presumably ``None`` when
        the id is unknown -- confirm against the repository).
        """
        return self.role_repository.update_status(
            role_id=role_id,
            status=payload.status,
        )

    # ------------------------------------------------------------------
    # Role assignments
    # ------------------------------------------------------------------

    def create_assignment(self, payload: RoleAssignmentCreateRequest) -> RoleAssignment:
        """Assign a role to a user, optionally scoped and with an expiry."""
        return self.assignment_repository.create(
            user_id=payload.user_id,
            role_id=payload.role_id,
            scope_type=payload.scope_type,
            scope_id=payload.scope_id,
            expires_time=payload.expires_time,
        )

    def list_assignments(self, *, user_id: str) -> list[RoleAssignment]:
        """Return the role assignments the repository holds for *user_id*."""
        return self.assignment_repository.list_by_user(user_id=user_id)

    def update_assignment_status(
        self,
        *,
        assignment_id: str,
        payload: RoleAssignmentStatusUpdateRequest,
    ) -> RoleAssignment | None:
        """Set the assignment's status to ``payload.status``.

        Returns the repository's result unchanged (presumably ``None`` when
        the id is unknown -- confirm against the repository).
        """
        return self.assignment_repository.update_status(
            assignment_id=assignment_id,
            status=payload.status,
        )

    # ------------------------------------------------------------------
    # Permission checks
    # ------------------------------------------------------------------

    def check_permission(self, payload: PermissionCheckRequest) -> PermissionCheckResponse:
        """Decide whether a user holds ``payload.permission`` in a scope.

        An assignment counts only when it is ``"active"``, not expired,
        matches the requested scope, and points at an ``"active"`` role
        whose permissions cover the requested permission.

        Returns:
            ``allowed=False, reason="user_not_active"`` when the user is
            missing or not ``"active"``. Otherwise ``allowed`` is true when
            at least one role matched, ``reason`` is ``"matched"`` or
            ``"permission_not_found"``, and ``matched_role_ids`` holds one
            entry per matching assignment, in assignment order.
        """
        user = self.user_repository.get_by_id(user_id=payload.user_id)
        if user is None or user.status != "active":
            return PermissionCheckResponse(allowed=False, reason="user_not_active")

        assignments = self.assignment_repository.list_by_user(user_id=payload.user_id)
        matched_role_ids: list[str] = []
        # One timestamp for the whole check so every assignment is judged
        # against the same instant.
        now = datetime.now(timezone.utc)
        # Several assignments may reference the same role; fetch each once.
        roles_by_id: dict = {}

        for assignment in assignments:
            if assignment.status != "active":
                continue
            if self._is_expired(assignment.expires_time, now=now):
                continue
            if not self._scope_matches(
                assignment=assignment,
                scope_type=payload.scope_type,
                scope_id=payload.scope_id,
            ):
                continue

            role_id = assignment.role_id
            if role_id not in roles_by_id:
                roles_by_id[role_id] = self.role_repository.get_by_id(role_id=role_id)
            role = roles_by_id[role_id]
            if role is None or role.status != "active":
                continue
            if self._permission_matches(role.permissions_json, payload.permission):
                matched_role_ids.append(role.id)

        return PermissionCheckResponse(
            allowed=bool(matched_role_ids),
            reason="matched" if matched_role_ids else "permission_not_found",
            matched_role_ids=matched_role_ids,
        )

    @staticmethod
    def _is_expired(expires_time: datetime | None, *, now: datetime | None = None) -> bool:
        """Return True when *expires_time* is at or before *now*.

        ``None`` means "never expires". Naive datetimes on either side are
        interpreted as UTC, so naive and timezone-aware values can be
        compared without raising ``TypeError``.

        Args:
            expires_time: Expiry instant, or ``None``.
            now: Reference instant; defaults to the current UTC time.
        """
        if expires_time is None:
            return False
        reference = datetime.now(timezone.utc) if now is None else now
        if reference.utcoffset() is None:
            reference = reference.replace(tzinfo=timezone.utc)
        if expires_time.utcoffset() is None:
            expires_time = expires_time.replace(tzinfo=timezone.utc)
        return expires_time <= reference

    def _permission_matches(
        self,
        permissions: list[str] | None,
        requested_permission: str,
    ) -> bool:
        """Return True when *permissions* grants *requested_permission*.

        A grant is either the global wildcard ``"*"``, an exact match, or a
        prefix wildcard ending in ``":*"`` (``"doc:*"`` covers every
        permission that starts with ``"doc:"``). A missing or empty
        permission list grants nothing.
        """
        if not permissions:
            return False
        if "*" in permissions or requested_permission in permissions:
            return True
        return any(
            permission.endswith(":*")
            and requested_permission.startswith(permission.removesuffix("*"))
            for permission in permissions
        )

    def _scope_matches(
        self,
        *,
        assignment: RoleAssignment,
        scope_type: str | None,
        scope_id: str | None,
    ) -> bool:
        """Return True when *assignment* applies to the requested scope.

        An assignment with neither ``scope_type`` nor ``scope_id`` applies
        to every scope; any other assignment must equal the requested
        ``scope_type`` and ``scope_id`` exactly.
        """
        if assignment.scope_type is None and assignment.scope_id is None:
            return True
        return assignment.scope_type == scope_type and assignment.scope_id == scope_id
|