| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406 |
import hashlib
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timezone

from app.db.models import ApiKey, Role, RoleAssignment, RolePermissionBinding, User
from app.domain.repositories import (
    ApiKeyRepository,
    RoleAssignmentRepository,
    RolePermissionBindingRepository,
    RoleRepository,
    UserRepository,
)
from app.infrastructure.api_keys import generate_api_key, get_api_key_prefix, hash_api_key
from app.infrastructure.passwords import verify_password
from app.infrastructure.tokens import TokenError, issue_access_token, verify_access_token
@dataclass(frozen=True)
class LoginResult:
    """Immutable outcome of a successful ``AuthApplicationService.login`` call.

    Attributes:
        access_token: Token string produced by ``issue_access_token``.
        expires_time: Expiry returned by ``issue_access_token`` alongside the
            token.
        user: The user record that was authenticated.
    """

    access_token: str
    expires_time: datetime
    user: User
@dataclass(frozen=True)
class TokenVerificationResult:
    """Immutable outcome of verifying an access token.

    Attributes:
        active: ``True`` when the token was accepted and its user is active.
        user_id: Id of the token's user; only set on success.
        username: Username of the token's user; only set on success.
        expires_time: Token expiry; only set on success.
        reason: Why the token was rejected (for example ``"token_revoked"``,
            ``"user_not_active"``, or the text of the token error); ``None``
            on success.
    """

    active: bool
    user_id: str | None = None
    username: str | None = None
    expires_time: datetime | None = None
    reason: str | None = None
@dataclass(frozen=True)
class PermissionCheckResult:
    """Immutable outcome of a permission check.

    Attributes:
        allowed: ``True`` when at least one role granted the permission.
        reason: Short machine-readable explanation such as ``"matched"``,
            ``"permission_not_found"`` or ``"user_not_active"``.
        matched_role_ids: Ids of the roles that granted the permission.

    Note:
        The dataclass is frozen, but ``matched_role_ids`` is a plain list, so
        instances are not hashable and the list itself can still be mutated.
    """

    allowed: bool
    reason: str
    matched_role_ids: list[str]
class AuthApplicationService:
    """Application service for authentication, authorization and API keys.

    The service coordinates the user, role, role-assignment,
    permission-binding and API-key repositories. It offers password login,
    access-token verification and revocation, paginated listings, API-key
    creation/revocation and role-based permission checks.

    An optional ``redis_client`` is used for two best-effort features: a
    revoked-token denylist and a short-lived permission-check cache. The
    client only needs ``get``, ``exists`` and ``set(key, value, ex=...)``
    (presumably a redis-py client; verify against the wiring code). Every
    failure of that client is logged and otherwise ignored, so the service
    keeps working without it.
    """

    _logger = logging.getLogger(__name__)

    def __init__(
        self,
        *,
        user_repository: UserRepository,
        role_repository: RoleRepository,
        assignment_repository: RoleAssignmentRepository,
        permission_binding_repository: RolePermissionBindingRepository,
        api_key_repository: ApiKeyRepository,
        token_secret: str,
        redis_client: object | None = None,
        permission_cache_ttl_seconds: int = 60,
    ) -> None:
        """Store the collaborators and settings.

        Args:
            user_repository: Lookup and paging of users.
            role_repository: Lookup and paging of roles.
            assignment_repository: Role assignments per user.
            permission_binding_repository: Permission bindings per role.
            api_key_repository: Persistence of API keys.
            token_secret: Secret used to issue and verify access tokens.
            redis_client: Optional client for the token denylist and the
                permission cache; ``None`` disables both.
            permission_cache_ttl_seconds: Lifetime of cached permission
                results; a value ``<= 0`` disables cache writes.
        """
        self.user_repository = user_repository
        self.role_repository = role_repository
        self.assignment_repository = assignment_repository
        self.permission_binding_repository = permission_binding_repository
        self.api_key_repository = api_key_repository
        self.token_secret = token_secret
        self.redis_client = redis_client
        self.permission_cache_ttl_seconds = permission_cache_ttl_seconds

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Equivalent to the deprecated ``datetime.utcnow()``; the rest of the
        class compares against naive timestamps.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _parse_expires_time(raw_value: str) -> datetime:
        """Parse a token's ISO-8601 expiry into a naive UTC ``datetime``.

        A trailing ``"Z"`` is stripped before parsing. If the value carries
        an explicit UTC offset it is converted to UTC and the offset is
        dropped, so the result can always be compared with ``_utcnow()``.
        """
        parsed = datetime.fromisoformat(raw_value.removesuffix("Z"))
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed

    @staticmethod
    def _page_offset(page: int, page_size: int) -> int:
        """Translate a 1-based page number into a non-negative row offset.

        Pages below 1 and negative page sizes are clamped so the repository
        never receives a negative offset.
        """
        return max(page - 1, 0) * max(page_size, 0)

    # ------------------------------------------------------------------
    # Login / token lifecycle
    # ------------------------------------------------------------------
    def login(self, *, username: str, password: str) -> LoginResult | None:
        """Authenticate a user by password and issue an access token.

        Returns:
            A ``LoginResult`` on success, or ``None`` when the user does not
            exist, is not ``"active"``, or the password does not verify.
        """
        user = self.user_repository.get_by_username(username=username)
        if user is None or user.status != "active":
            return None
        if not verify_password(password, user.password_hash):
            return None
        self.user_repository.touch_last_login_time(user_id=user.id)
        access_token, expires_time = issue_access_token(
            user_id=user.id,
            secret=self.token_secret,
        )
        return LoginResult(
            access_token=access_token,
            expires_time=expires_time,
            user=user,
        )

    def verify_token(self, *, access_token: str) -> TokenVerificationResult:
        """Check an access token against the denylist, signature and user.

        Returns:
            An inactive result with a ``reason`` when the token is revoked,
            fails ``verify_access_token`` or belongs to a missing/inactive
            user; otherwise an active result with the user's identity and the
            token expiry.
        """
        if self._is_token_revoked(access_token=access_token):
            return TokenVerificationResult(active=False, reason="token_revoked")
        try:
            token_payload = verify_access_token(access_token, secret=self.token_secret)
        except TokenError as exc:
            return TokenVerificationResult(active=False, reason=str(exc))
        expires_time_raw = token_payload["expires_time"]
        user = self.user_repository.get_by_id(user_id=token_payload["user_id"])
        if user is None or user.status != "active":
            return TokenVerificationResult(active=False, reason="user_not_active")
        return TokenVerificationResult(
            active=True,
            user_id=user.id,
            username=user.username,
            expires_time=self._parse_expires_time(expires_time_raw),
        )

    def logout(self, *, access_token: str | None) -> bool:
        """Revoke an access token by adding it to the denylist.

        Returns:
            ``True`` when there is nothing to revoke (no token, no Redis
            client, or a token that already fails verification) or when the
            denylist entry was written; ``False`` when the write failed.
        """
        if not access_token or self.redis_client is None:
            return True
        try:
            token_payload = verify_access_token(access_token, secret=self.token_secret)
        except TokenError:
            # An invalid token cannot be used anyway, so there is nothing to revoke.
            return True
        expires_time = self._parse_expires_time(token_payload["expires_time"])
        # Keep the denylist entry until the token would have expired (min. 1s).
        ttl_seconds = max(1, int((expires_time - self._utcnow()).total_seconds()))
        try:
            self.redis_client.set(
                self._revoked_token_key(access_token=access_token),
                "1",
                ex=ttl_seconds,
            )
        except Exception:
            self._logger.warning("Failed to record revoked access token", exc_info=True)
            return False
        return True

    # ------------------------------------------------------------------
    # Listings and simple repository pass-throughs
    # ------------------------------------------------------------------
    def list_users(self) -> list[User]:
        """Return every user from the user repository."""
        return self.user_repository.list_all()

    def list_users_page(
        self,
        *,
        page: int,
        page_size: int,
        keyword: str | None,
    ) -> tuple[list[User], int]:
        """Return one page of users; ``page`` is 1-based.

        The second tuple element is whatever the repository returns next to
        the rows (presumably the total count; confirm against the repository).
        """
        return self.user_repository.list_page(
            offset=self._page_offset(page, page_size),
            limit=page_size,
            keyword=keyword,
        )

    def list_roles(self) -> list[Role]:
        """Return every role from the role repository."""
        return self.role_repository.list_all()

    def list_roles_page(
        self,
        *,
        page: int,
        page_size: int,
        keyword: str | None,
    ) -> tuple[list[Role], int]:
        """Return one page of roles; ``page`` is 1-based."""
        return self.role_repository.list_page(
            offset=self._page_offset(page, page_size),
            limit=page_size,
            keyword=keyword,
        )

    def list_assignments(self, *, user_id: str) -> list[RoleAssignment]:
        """Return all role assignments of ``user_id``."""
        return self.assignment_repository.list_by_user(user_id=user_id)

    def list_role_permission_bindings(
        self,
        *,
        role_id: str,
        page: int,
        page_size: int,
    ) -> tuple[list[RolePermissionBinding], int]:
        """Return one page of permission bindings of a role; ``page`` is 1-based."""
        return self.permission_binding_repository.list_by_role(
            role_id=role_id,
            offset=self._page_offset(page, page_size),
            limit=page_size,
        )

    def add_role_permission_binding(
        self,
        *,
        role_id: str,
        permission: str,
        scope_type: str | None,
        scope_id: str | None,
    ) -> RolePermissionBinding:
        """Create a permission binding for a role.

        NOTE(review): cached permission-check results are not invalidated
        here, so a change may take up to ``permission_cache_ttl_seconds`` to
        become visible through ``check_permission``.
        """
        return self.permission_binding_repository.create(
            role_id=role_id,
            permission=permission,
            scope_type=scope_type,
            scope_id=scope_id,
        )

    def remove_role_permission_binding(self, *, binding_id: str) -> bool:
        """Delete a permission binding and return the repository's result.

        NOTE(review): cached permission-check results are not invalidated
        here either; see ``add_role_permission_binding``.
        """
        return self.permission_binding_repository.delete(binding_id=binding_id)

    def list_api_keys_page(
        self,
        *,
        page: int,
        page_size: int,
        keyword: str | None,
    ) -> tuple[list[ApiKey], int]:
        """Return one page of API keys; ``page`` is 1-based."""
        return self.api_key_repository.list_page(
            offset=self._page_offset(page, page_size),
            limit=page_size,
            keyword=keyword,
        )

    def create_api_key(
        self,
        *,
        name: str,
        scopes: str | None,
        expires_time: datetime | None,
    ) -> tuple[ApiKey, str]:
        """Generate and persist a new API key.

        Only the key's prefix and hash are handed to the repository.

        Returns:
            The stored entity and the plaintext secret; the secret is only
            available from this return value.
        """
        secret = generate_api_key()
        entity = self.api_key_repository.create(
            name=name,
            key_prefix=get_api_key_prefix(secret),
            key_hash=hash_api_key(secret),
            scopes=scopes,
            expires_time=expires_time,
        )
        return entity, secret

    def revoke_api_key(self, *, api_key_id: str) -> ApiKey | None:
        """Revoke an API key and return whatever the repository returns."""
        return self.api_key_repository.revoke(api_key_id=api_key_id)

    # ------------------------------------------------------------------
    # Permission checks
    # ------------------------------------------------------------------
    def check_permission(
        self,
        *,
        user_id: str,
        permission: str,
        scope_type: str | None,
        scope_id: str | None,
    ) -> PermissionCheckResult:
        """Decide whether a user holds ``permission`` in the given scope.

        A cached result is returned when one is available; otherwise the
        check runs against the repositories and its result (allowed or not)
        is cached for ``permission_cache_ttl_seconds``.
        """
        cached_result = self._read_permission_cache(
            user_id=user_id,
            permission=permission,
            scope_type=scope_type,
            scope_id=scope_id,
        )
        if cached_result is not None:
            return cached_result
        result = self._check_permission_uncached(
            user_id=user_id,
            permission=permission,
            scope_type=scope_type,
            scope_id=scope_id,
        )
        self._write_permission_cache(
            user_id=user_id,
            permission=permission,
            scope_type=scope_type,
            scope_id=scope_id,
            result=result,
        )
        return result

    def _check_permission_uncached(
        self,
        *,
        user_id: str,
        permission: str,
        scope_type: str | None,
        scope_id: str | None,
    ) -> PermissionCheckResult:
        """Evaluate a permission check directly against the repositories.

        An assignment counts when it is ``"active"``, not expired, and its
        scope matches the request; its role counts when it is ``"active"``
        and has a matching permission binding. Each role is evaluated at most
        once, so ``matched_role_ids`` contains no duplicates.
        """
        user = self.user_repository.get_by_id(user_id=user_id)
        if user is None or user.status != "active":
            return PermissionCheckResult(
                allowed=False,
                reason="user_not_active",
                matched_role_ids=[],
            )

        now = self._utcnow()
        matched_role_ids: list[str] = []
        # The outcome for a role depends only on the role and the request, so
        # a role reached through several assignments is looked up only once.
        evaluated_role_ids: set[str] = set()
        for assignment in self.assignment_repository.list_by_user(user_id=user_id):
            if assignment.status != "active":
                continue
            if assignment.expires_time is not None and assignment.expires_time <= now:
                continue
            if not self._scope_matches(
                assignment=assignment,
                scope_type=scope_type,
                scope_id=scope_id,
            ):
                continue
            if assignment.role_id in evaluated_role_ids:
                continue
            evaluated_role_ids.add(assignment.role_id)
            role = self.role_repository.get_by_id(role_id=assignment.role_id)
            if role is None or role.status != "active":
                continue
            if self._role_has_permission(
                role,
                permission,
                scope_type=scope_type,
                scope_id=scope_id,
            ):
                matched_role_ids.append(role.id)

        return PermissionCheckResult(
            allowed=bool(matched_role_ids),
            reason="matched" if matched_role_ids else "permission_not_found",
            matched_role_ids=matched_role_ids,
        )

    # ------------------------------------------------------------------
    # Token denylist
    # ------------------------------------------------------------------
    def _is_token_revoked(self, *, access_token: str) -> bool:
        """Return whether the token is on the denylist.

        Fails open: without a Redis client, or when the lookup raises, the
        token is treated as not revoked. Lookup failures are logged.
        """
        if self.redis_client is None:
            return False
        try:
            return self.redis_client.exists(self._revoked_token_key(access_token=access_token)) > 0
        except Exception:
            self._logger.warning(
                "Revoked-token lookup failed; treating token as not revoked",
                exc_info=True,
            )
            return False

    def _revoked_token_key(self, *, access_token: str) -> str:
        """Build the denylist key for a token (the raw token is never stored)."""
        return f"auth:revoked-token:{self._token_digest(access_token)}"

    def _token_digest(self, access_token: str) -> str:
        """Return the hex SHA-256 digest of the UTF-8 encoded token."""
        return hashlib.sha256(access_token.encode("utf-8")).hexdigest()

    # ------------------------------------------------------------------
    # Permission cache
    # ------------------------------------------------------------------
    def _read_permission_cache(
        self,
        *,
        user_id: str,
        permission: str,
        scope_type: str | None,
        scope_id: str | None,
    ) -> PermissionCheckResult | None:
        """Load a cached permission result, or ``None`` on any miss.

        A missing client, a failed lookup, an absent key and a malformed
        entry are all treated as a miss. ``allowed`` must be a real JSON
        boolean: coercing by truthiness would turn a corrupted value such as
        ``"false"`` into a grant.
        """
        if self.redis_client is None:
            return None
        try:
            raw_value = self.redis_client.get(
                self._permission_cache_key(
                    user_id=user_id,
                    permission=permission,
                    scope_type=scope_type,
                    scope_id=scope_id,
                )
            )
        except Exception:
            self._logger.warning("Permission cache read failed", exc_info=True)
            return None
        if not isinstance(raw_value, (bytes, str)):
            return None
        try:
            decoded = raw_value.decode("utf-8") if isinstance(raw_value, bytes) else raw_value
            payload = json.loads(decoded)
        except (UnicodeDecodeError, json.JSONDecodeError):
            return None
        if not isinstance(payload, dict):
            return None
        allowed = payload.get("allowed")
        if not isinstance(allowed, bool):
            return None
        matched_role_ids = payload.get("matched_role_ids")
        if not isinstance(matched_role_ids, list):
            return None
        return PermissionCheckResult(
            allowed=allowed,
            reason=str(payload.get("reason") or "cached"),
            matched_role_ids=[item for item in matched_role_ids if isinstance(item, str)],
        )

    def _write_permission_cache(
        self,
        *,
        user_id: str,
        permission: str,
        scope_type: str | None,
        scope_id: str | None,
        result: PermissionCheckResult,
    ) -> None:
        """Store a permission result as JSON with the configured TTL.

        Does nothing without a Redis client or with a non-positive TTL; write
        failures are logged and ignored.
        """
        if self.redis_client is None or self.permission_cache_ttl_seconds <= 0:
            return
        payload = {
            "allowed": result.allowed,
            "reason": result.reason,
            "matched_role_ids": result.matched_role_ids,
        }
        try:
            self.redis_client.set(
                self._permission_cache_key(
                    user_id=user_id,
                    permission=permission,
                    scope_type=scope_type,
                    scope_id=scope_id,
                ),
                json.dumps(payload, ensure_ascii=False),
                ex=self.permission_cache_ttl_seconds,
            )
        except Exception:
            self._logger.warning("Permission cache write failed", exc_info=True)

    def _permission_cache_key(
        self,
        *,
        user_id: str,
        permission: str,
        scope_type: str | None,
        scope_id: str | None,
    ) -> str:
        """Build the cache key for one (user, permission, scope) request.

        The request is serialised as compact JSON with sorted keys and then
        hashed with SHA-256, which yields a fixed-length key.
        """
        raw_key = json.dumps(
            {
                "user_id": user_id,
                "permission": permission,
                "scope_type": scope_type,
                "scope_id": scope_id,
            },
            sort_keys=True,
            separators=(",", ":"),
        )
        digest = hashlib.sha256(raw_key.encode("utf-8")).hexdigest()
        return f"auth:permission-check:{digest}"

    # ------------------------------------------------------------------
    # Matching rules
    # ------------------------------------------------------------------
    def _permission_matches(self, permissions: list[str], requested_permission: str) -> bool:
        """Return whether any granted permission covers the requested one.

        A grant matches when it is ``"*"``, equals the request, or ends with
        ``":*"`` and the request starts with the part before the ``*``
        (``"doc:*"`` covers ``"doc:read"`` but not ``"docs:read"``).
        """
        if "*" in permissions or requested_permission in permissions:
            return True
        return any(
            permission.endswith(":*")
            and requested_permission.startswith(permission.removesuffix("*"))
            for permission in permissions
        )

    def _scope_matches(
        self,
        *,
        assignment: RoleAssignment,
        scope_type: str | None,
        scope_id: str | None,
    ) -> bool:
        """Return whether a role assignment applies to the requested scope.

        An assignment without scope type and id applies everywhere; any other
        assignment applies only when both its type and id equal the request.
        """
        if assignment.scope_type is None and assignment.scope_id is None:
            return True
        return assignment.scope_type == scope_type and assignment.scope_id == scope_id

    def _role_has_permission(
        self,
        role: Role,
        requested_permission: str,
        *,
        scope_type: str | None,
        scope_id: str | None,
    ) -> bool:
        """Return whether any binding of ``role`` grants the permission.

        A binding is considered when it is unscoped (no type and no id) or
        its scope equals the requested one, and its permission matches under
        the rules of ``_permission_matches``.
        """
        bindings = self.permission_binding_repository.list_all_by_role(role_id=role.id)
        return any(
            (
                (binding.scope_type is None and binding.scope_id is None)
                or (binding.scope_type == scope_type and binding.scope_id == scope_id)
            )
            and self._permission_matches([binding.permission], requested_permission)
            for binding in bindings
        )
|