import hashlib
import json
import logging
from dataclasses import dataclass
from datetime import datetime

from app.db.models import ApiKey, Role, RoleAssignment, RolePermissionBinding, User
from app.domain.repositories import (
    ApiKeyRepository,
    RoleAssignmentRepository,
    RolePermissionBindingRepository,
    RoleRepository,
    UserRepository,
)
from app.infrastructure.api_keys import generate_api_key, get_api_key_prefix, hash_api_key
from app.infrastructure.passwords import verify_password
from app.infrastructure.tokens import TokenError, issue_access_token, verify_access_token

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class LoginResult:
    """Outcome of a successful login: the issued token, its expiry and the user."""

    access_token: str
    expires_time: datetime
    user: User


@dataclass(frozen=True)
class TokenVerificationResult:
    """Outcome of verifying an access token.

    When ``active`` is False, ``reason`` carries a short machine-readable
    explanation and the remaining fields keep their ``None`` defaults.
    """

    active: bool
    user_id: str | None = None
    username: str | None = None
    expires_time: datetime | None = None
    reason: str | None = None


@dataclass(frozen=True)
class PermissionCheckResult:
    """Outcome of a permission check, including the ids of the roles that granted it."""

    allowed: bool
    reason: str
    matched_role_ids: list[str]


class AuthApplicationService:
    """Application service for login, token handling, role/API-key admin and permission checks.

    Redis is optional. When ``redis_client`` is ``None`` token revocation and
    permission caching are disabled. The client is only required to provide
    the ``get``, ``set(key, value, ex=...)`` and ``exists`` calls used below
    (presumably a redis-py client -- confirm against the wiring code).
    """

    def __init__(
        self,
        *,
        user_repository: UserRepository,
        role_repository: RoleRepository,
        assignment_repository: RoleAssignmentRepository,
        permission_binding_repository: RolePermissionBindingRepository,
        api_key_repository: ApiKeyRepository,
        token_secret: str,
        redis_client: object | None = None,
        permission_cache_ttl_seconds: int = 60,
    ) -> None:
        """Store the collaborators; no I/O is performed here.

        Args:
            token_secret: Secret passed to the token issue/verify helpers.
            redis_client: Optional client used for token revocation and the
                permission cache.
            permission_cache_ttl_seconds: Lifetime of cached permission
                results; a value <= 0 disables cache writes.
        """
        self.user_repository = user_repository
        self.role_repository = role_repository
        self.assignment_repository = assignment_repository
        self.permission_binding_repository = permission_binding_repository
        self.api_key_repository = api_key_repository
        self.token_secret = token_secret
        self.redis_client = redis_client
        self.permission_cache_ttl_seconds = permission_cache_ttl_seconds

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    def login(self, *, username: str, password: str) -> LoginResult | None:
        """Authenticate a user and issue an access token.

        Returns:
            A ``LoginResult`` on success, or ``None`` when the user does not
            exist, is not in the ``"active"`` status, or the password does
            not verify.
        """
        user = self.user_repository.get_by_username(username=username)
        if user is None or user.status != "active":
            return None
        if not verify_password(password, user.password_hash):
            return None

        self.user_repository.touch_last_login_time(user_id=user.id)
        access_token, expires_time = issue_access_token(
            user_id=user.id,
            secret=self.token_secret,
        )
        return LoginResult(
            access_token=access_token,
            expires_time=expires_time,
            user=user,
        )

    def verify_token(self, *, access_token: str) -> TokenVerificationResult:
        """Check that a token is not revoked, is valid, and belongs to an active user.

        The checks run in that order; the first failing one determines
        ``reason`` (``"token_revoked"``, the ``TokenError`` message, or
        ``"user_not_active"``).
        """
        if self._is_token_revoked(access_token=access_token):
            return TokenVerificationResult(active=False, reason="token_revoked")

        try:
            token_payload = verify_access_token(access_token, secret=self.token_secret)
        except TokenError as exc:
            return TokenVerificationResult(active=False, reason=str(exc))

        expires_time_raw = token_payload["expires_time"]
        user = self.user_repository.get_by_id(user_id=token_payload["user_id"])
        if user is None or user.status != "active":
            return TokenVerificationResult(active=False, reason="user_not_active")

        return TokenVerificationResult(
            active=True,
            user_id=user.id,
            username=user.username,
            expires_time=self._parse_expires_time(expires_time_raw),
        )

    def logout(self, *, access_token: str | None) -> bool:
        """Revoke a token by recording its digest in Redis until it expires.

        Returns:
            ``True`` when there is nothing to revoke (no token, no Redis
            client, or the token fails verification) or when the revocation
            marker was written; ``False`` only when the Redis write fails.
        """
        if not access_token or self.redis_client is None:
            return True

        try:
            token_payload = verify_access_token(access_token, secret=self.token_secret)
        except TokenError:
            # A token that does not verify cannot be used anyway.
            return True

        expires_time = self._parse_expires_time(token_payload["expires_time"])
        # Keep the marker for the token's remaining lifetime, at least 1 second.
        # NOTE(review): compares against naive utcnow(), so the token expiry
        # is assumed to be a naive/"Z"-suffixed UTC timestamp -- confirm.
        ttl_seconds = max(1, int((expires_time - datetime.utcnow()).total_seconds()))

        try:
            self.redis_client.set(
                self._revoked_token_key(access_token=access_token),
                "1",
                ex=ttl_seconds,
            )
        except Exception:
            # Best effort: report the failure to the caller instead of raising.
            logger.warning("Failed to record revoked token in Redis", exc_info=True)
            return False
        return True

    # ------------------------------------------------------------------
    # Listing / administration (thin repository pass-throughs)
    # ------------------------------------------------------------------

    def list_users(self) -> list[User]:
        """Return every user."""
        return self.user_repository.list_all()

    def list_users_page(
        self,
        *,
        page: int,
        page_size: int,
        keyword: str | None,
    ) -> tuple[list[User], int]:
        """Return one page of users as returned by the repository's ``list_page``.

        ``page`` is 1-based.
        """
        return self.user_repository.list_page(
            offset=self._page_offset(page, page_size),
            limit=page_size,
            keyword=keyword,
        )

    def list_roles(self) -> list[Role]:
        """Return every role."""
        return self.role_repository.list_all()

    def list_roles_page(
        self,
        *,
        page: int,
        page_size: int,
        keyword: str | None,
    ) -> tuple[list[Role], int]:
        """Return one page of roles as returned by the repository's ``list_page``.

        ``page`` is 1-based.
        """
        return self.role_repository.list_page(
            offset=self._page_offset(page, page_size),
            limit=page_size,
            keyword=keyword,
        )

    def list_assignments(self, *, user_id: str) -> list[RoleAssignment]:
        """Return the role assignments of one user."""
        return self.assignment_repository.list_by_user(user_id=user_id)

    def list_role_permission_bindings(
        self,
        *,
        role_id: str,
        page: int,
        page_size: int,
    ) -> tuple[list[RolePermissionBinding], int]:
        """Return one page of a role's permission bindings.

        ``page`` is 1-based; the tuple is whatever ``list_by_role`` returns.
        """
        return self.permission_binding_repository.list_by_role(
            role_id=role_id,
            offset=self._page_offset(page, page_size),
            limit=page_size,
        )

    def add_role_permission_binding(
        self,
        *,
        role_id: str,
        permission: str,
        scope_type: str | None,
        scope_id: str | None,
    ) -> RolePermissionBinding:
        """Create a permission binding for a role.

        NOTE(review): cached permission results are not invalidated here, so
        a change may take up to ``permission_cache_ttl_seconds`` to apply.
        """
        return self.permission_binding_repository.create(
            role_id=role_id,
            permission=permission,
            scope_type=scope_type,
            scope_id=scope_id,
        )

    def remove_role_permission_binding(self, *, binding_id: str) -> bool:
        """Delete a permission binding; returns the repository's result.

        NOTE(review): cached permission results are not invalidated here, so
        a removal may take up to ``permission_cache_ttl_seconds`` to apply.
        """
        return self.permission_binding_repository.delete(binding_id=binding_id)

    def list_api_keys_page(
        self,
        *,
        page: int,
        page_size: int,
        keyword: str | None,
    ) -> tuple[list[ApiKey], int]:
        """Return one page of API keys as returned by the repository's ``list_page``.

        ``page`` is 1-based.
        """
        return self.api_key_repository.list_page(
            offset=self._page_offset(page, page_size),
            limit=page_size,
            keyword=keyword,
        )

    def create_api_key(
        self,
        *,
        name: str,
        scopes: str | None,
        expires_time: datetime | None,
    ) -> tuple[ApiKey, str]:
        """Generate a new API key and persist its prefix and hash.

        Returns:
            The stored entity and the plaintext secret. Only the prefix and
            hash are handed to the repository, so this return value is the
            only place the plaintext is available.
        """
        secret = generate_api_key()
        entity = self.api_key_repository.create(
            name=name,
            key_prefix=get_api_key_prefix(secret),
            key_hash=hash_api_key(secret),
            scopes=scopes,
            expires_time=expires_time,
        )
        return entity, secret

    def revoke_api_key(self, *, api_key_id: str) -> ApiKey | None:
        """Revoke an API key; returns whatever the repository's ``revoke`` returns."""
        return self.api_key_repository.revoke(api_key_id=api_key_id)

    # ------------------------------------------------------------------
    # Permission checks
    # ------------------------------------------------------------------

    def check_permission(
        self,
        *,
        user_id: str,
        permission: str,
        scope_type: str | None,
        scope_id: str | None,
    ) -> PermissionCheckResult:
        """Decide whether a user holds ``permission`` in the given scope.

        A cached result is returned when available; otherwise the result is
        computed from the repositories and written to the cache.
        """
        cached_result = self._read_permission_cache(
            user_id=user_id,
            permission=permission,
            scope_type=scope_type,
            scope_id=scope_id,
        )
        if cached_result is not None:
            return cached_result

        result = self._check_permission_uncached(
            user_id=user_id,
            permission=permission,
            scope_type=scope_type,
            scope_id=scope_id,
        )
        self._write_permission_cache(
            user_id=user_id,
            permission=permission,
            scope_type=scope_type,
            scope_id=scope_id,
            result=result,
        )
        return result

    def _check_permission_uncached(
        self,
        *,
        user_id: str,
        permission: str,
        scope_type: str | None,
        scope_id: str | None,
    ) -> PermissionCheckResult:
        """Evaluate a permission check directly against the repositories.

        An assignment counts only when it is active, not expired, matches the
        requested scope, and points at an active role that has a matching
        permission binding.
        """
        user = self.user_repository.get_by_id(user_id=user_id)
        if user is None or user.status != "active":
            return PermissionCheckResult(
                allowed=False,
                reason="user_not_active",
                matched_role_ids=[],
            )

        assignments = self.assignment_repository.list_by_user(user_id=user_id)
        matched_role_ids: list[str] = []
        # NOTE(review): naive UTC "now"; assumes assignment.expires_time is
        # stored as a naive UTC datetime -- confirm against the model.
        now = datetime.utcnow()
        for assignment in assignments:
            if assignment.status != "active":
                continue
            if assignment.expires_time is not None and assignment.expires_time <= now:
                continue
            if not self._scope_matches(
                assignment=assignment,
                scope_type=scope_type,
                scope_id=scope_id,
            ):
                continue
            role = self.role_repository.get_by_id(role_id=assignment.role_id)
            if role is None or role.status != "active":
                continue
            if self._role_has_permission(
                role,
                permission,
                scope_type=scope_type,
                scope_id=scope_id,
            ):
                matched_role_ids.append(role.id)

        return PermissionCheckResult(
            allowed=bool(matched_role_ids),
            reason="matched" if matched_role_ids else "permission_not_found",
            matched_role_ids=matched_role_ids,
        )

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _page_offset(page: int, page_size: int) -> int:
        """Translate a 1-based page number into a row offset, never negative."""
        return max(0, (page - 1) * page_size)

    @staticmethod
    def _parse_expires_time(expires_time_raw: str) -> datetime:
        """Parse the token's ISO-8601 expiry, dropping a trailing ``"Z"`` if present."""
        return datetime.fromisoformat(expires_time_raw.removesuffix("Z"))

    # ------------------------------------------------------------------
    # Token revocation (Redis)
    # ------------------------------------------------------------------

    def _is_token_revoked(self, *, access_token: str) -> bool:
        """Return True when a revocation marker for the token exists in Redis.

        Without a Redis client, or when Redis raises, the token is treated as
        not revoked.
        """
        if self.redis_client is None:
            return False
        try:
            return (
                self.redis_client.exists(self._revoked_token_key(access_token=access_token)) > 0
            )
        except Exception:
            # NOTE(review): this fails open -- a Redis outage makes revoked
            # tokens usable again until they expire. Logged so it is visible.
            logger.warning(
                "Token revocation check failed; treating token as not revoked",
                exc_info=True,
            )
            return False

    def _revoked_token_key(self, *, access_token: str) -> str:
        """Build the Redis key for a revoked token (digest only, never the raw token)."""
        return f"auth:revoked-token:{self._token_digest(access_token)}"

    def _token_digest(self, access_token: str) -> str:
        """Return the SHA-256 hex digest of the token."""
        return hashlib.sha256(access_token.encode("utf-8")).hexdigest()

    # ------------------------------------------------------------------
    # Permission cache (Redis)
    # ------------------------------------------------------------------

    def _read_permission_cache(
        self,
        *,
        user_id: str,
        permission: str,
        scope_type: str | None,
        scope_id: str | None,
    ) -> PermissionCheckResult | None:
        """Return the cached result for this check, or ``None`` on any kind of miss.

        A missing client, a Redis error, an absent key, or an entry that is
        not a well-formed payload all count as a miss, so the caller falls
        back to the uncached evaluation.
        """
        if self.redis_client is None:
            return None
        try:
            raw_value = self.redis_client.get(
                self._permission_cache_key(
                    user_id=user_id,
                    permission=permission,
                    scope_type=scope_type,
                    scope_id=scope_id,
                )
            )
        except Exception:
            logger.warning("Permission cache read failed; treating as a miss", exc_info=True)
            return None

        if not isinstance(raw_value, (bytes, str)):
            return None

        try:
            decoded = raw_value.decode("utf-8") if isinstance(raw_value, bytes) else raw_value
            payload = json.loads(decoded)
        except (UnicodeDecodeError, json.JSONDecodeError):
            # Corrupt entry (bad bytes or bad JSON): ignore it rather than crash.
            return None

        if not isinstance(payload, dict):
            return None
        allowed = payload.get("allowed")
        if not isinstance(allowed, bool):
            # Never coerce: a truthy non-boolean such as "false" must not
            # turn into a granted permission.
            return None
        matched_role_ids = payload.get("matched_role_ids")
        if not isinstance(matched_role_ids, list):
            return None

        return PermissionCheckResult(
            allowed=allowed,
            reason=str(payload.get("reason") or "cached"),
            matched_role_ids=[item for item in matched_role_ids if isinstance(item, str)],
        )

    def _write_permission_cache(
        self,
        *,
        user_id: str,
        permission: str,
        scope_type: str | None,
        scope_id: str | None,
        result: PermissionCheckResult,
    ) -> None:
        """Store a permission result in Redis with the configured TTL (best effort).

        Does nothing when there is no Redis client or the TTL is <= 0.
        """
        if self.redis_client is None or self.permission_cache_ttl_seconds <= 0:
            return
        payload = {
            "allowed": result.allowed,
            "reason": result.reason,
            "matched_role_ids": result.matched_role_ids,
        }
        try:
            self.redis_client.set(
                self._permission_cache_key(
                    user_id=user_id,
                    permission=permission,
                    scope_type=scope_type,
                    scope_id=scope_id,
                ),
                json.dumps(payload, ensure_ascii=False),
                ex=self.permission_cache_ttl_seconds,
            )
        except Exception:
            # Caching is an optimisation only; a failed write must not fail the check.
            logger.warning("Permission cache write failed", exc_info=True)

    def _permission_cache_key(
        self,
        *,
        user_id: str,
        permission: str,
        scope_type: str | None,
        scope_id: str | None,
    ) -> str:
        """Build the cache key from a SHA-256 digest of the check's parameters.

        The parameters are serialised as JSON with sorted keys so the same
        inputs always produce the same key.
        """
        raw_key = json.dumps(
            {
                "user_id": user_id,
                "permission": permission,
                "scope_type": scope_type,
                "scope_id": scope_id,
            },
            sort_keys=True,
            separators=(",", ":"),
        )
        digest = hashlib.sha256(raw_key.encode("utf-8")).hexdigest()
        return f"auth:permission-check:{digest}"

    # ------------------------------------------------------------------
    # Matching rules
    # ------------------------------------------------------------------

    def _permission_matches(self, permissions: list[str], requested_permission: str) -> bool:
        """Return True when any granted permission covers the requested one.

        A grant matches when it is ``"*"``, equals the request exactly, or
        ends in ``":*"`` and the request starts with the grant minus its
        trailing ``"*"`` (e.g. ``"user:*"`` covers ``"user:read"``).
        """
        if "*" in permissions or requested_permission in permissions:
            return True
        return any(
            permission.endswith(":*")
            and requested_permission.startswith(permission.removesuffix("*"))
            for permission in permissions
        )

    def _scope_matches(
        self,
        *,
        assignment: RoleAssignment,
        scope_type: str | None,
        scope_id: str | None,
    ) -> bool:
        """Return True when the assignment is unscoped or its scope equals the requested one."""
        if assignment.scope_type is None and assignment.scope_id is None:
            return True
        return assignment.scope_type == scope_type and assignment.scope_id == scope_id

    def _role_has_permission(
        self,
        role: Role,
        requested_permission: str,
        *,
        scope_type: str | None,
        scope_id: str | None,
    ) -> bool:
        """Return True when the role has a binding that applies to the scope and permission.

        A binding applies to the scope when it is unscoped (both scope fields
        ``None``) or its scope equals the requested one.
        """
        bindings = self.permission_binding_repository.list_all_by_role(role_id=role.id)
        for binding in bindings:
            is_unscoped = binding.scope_type is None and binding.scope_id is None
            is_same_scope = binding.scope_type == scope_type and binding.scope_id == scope_id
            if (is_unscoped or is_same_scope) and self._permission_matches(
                [binding.permission], requested_permission
            ):
                return True
        return False