services.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. from datetime import datetime
  2. from app.db.models import Role, RoleAssignment, User
  3. from app.domain.repositories import RoleAssignmentRepository, RoleRepository, UserRepository
  4. from app.infrastructure.passwords import hash_password, verify_password
  5. from app.infrastructure.tokens import TokenError, issue_access_token, verify_access_token
  6. from app.schemas.auth import (
  7. LoginRequest,
  8. LoginResponse,
  9. LoginUserResponse,
  10. PermissionCheckRequest,
  11. PermissionCheckResponse,
  12. RoleAssignmentCreateRequest,
  13. RoleAssignmentStatusUpdateRequest,
  14. RoleCreateRequest,
  15. RoleStatusUpdateRequest,
  16. TokenVerifyRequest,
  17. TokenVerifyResponse,
  18. UserCreateRequest,
  19. UserStatusUpdateRequest,
  20. )
  21. class AuthApplicationService:
  22. def __init__(
  23. self,
  24. *,
  25. user_repository: UserRepository,
  26. role_repository: RoleRepository,
  27. assignment_repository: RoleAssignmentRepository,
  28. token_secret: str) -> None:
  29. self.user_repository = user_repository
  30. self.role_repository = role_repository
  31. self.assignment_repository = assignment_repository
  32. self.token_secret = token_secret
  33. def create_user(self, payload: UserCreateRequest) -> User:
  34. return self.user_repository.create(
  35. username=payload.username,
  36. password_hash=hash_password(payload.password) if payload.password else "",
  37. display_name=payload.display_name,
  38. email=payload.email,
  39. metadata_json=payload.metadata_json)
  40. def login(self, payload: LoginRequest) -> LoginResponse | None:
  41. user = self.user_repository.get_by_username(
  42. username=payload.username)
  43. if user is None or user.status != "active":
  44. return None
  45. if not verify_password(payload.password, user.password_hash):
  46. return None
  47. self.user_repository.touch_last_login_time(user_id=user.id)
  48. access_token, expires_time = issue_access_token(
  49. user_id=user.id,
  50. secret=self.token_secret)
  51. return LoginResponse(
  52. access_token=access_token,
  53. expires_time=expires_time,
  54. user=LoginUserResponse.from_entity(user))
  55. def verify_token(self, payload: TokenVerifyRequest) -> TokenVerifyResponse:
  56. try:
  57. token_payload = verify_access_token(
  58. payload.access_token,
  59. secret=self.token_secret)
  60. except TokenError as exc:
  61. return TokenVerifyResponse(active=False, reason=str(exc))
  62. expires_time_raw = token_payload["expires_time"]
  63. user = self.user_repository.get_by_id(
  64. user_id=token_payload["user_id"])
  65. if user is None or user.status != "active":
  66. return TokenVerifyResponse(active=False, reason="user_not_active")
  67. return TokenVerifyResponse(
  68. active=True,
  69. user_id=user.id,
  70. username=user.username,
  71. expires_time=datetime.fromisoformat(expires_time_raw.removesuffix("Z")))
  72. def list_users(self) -> list[User]:
  73. return self.user_repository.list_all()
  74. def update_user_status(self, *, user_id: str, payload: UserStatusUpdateRequest) -> User | None:
  75. return self.user_repository.update_status(
  76. user_id=user_id,
  77. status=payload.status)
  78. def create_role(self, payload: RoleCreateRequest) -> Role:
  79. return self.role_repository.create(
  80. code=payload.code,
  81. name=payload.name,
  82. description=payload.description,
  83. permissions_json=payload.permissions_json)
  84. def list_roles(self) -> list[Role]:
  85. return self.role_repository.list_all()
  86. def update_role_status(self, *, role_id: str, payload: RoleStatusUpdateRequest) -> Role | None:
  87. return self.role_repository.update_status(
  88. role_id=role_id,
  89. status=payload.status)
  90. def create_assignment(
  91. self,
  92. payload: RoleAssignmentCreateRequest) -> RoleAssignment:
  93. return self.assignment_repository.create(
  94. user_id=payload.user_id,
  95. role_id=payload.role_id,
  96. scope_type=payload.scope_type,
  97. scope_id=payload.scope_id,
  98. expires_time=payload.expires_time)
  99. def list_assignments(self, *, user_id: str) -> list[RoleAssignment]:
  100. return self.assignment_repository.list_by_user(user_id=user_id)
  101. def update_assignment_status(
  102. self,
  103. *,
  104. assignment_id: str,
  105. payload: RoleAssignmentStatusUpdateRequest) -> RoleAssignment | None:
  106. return self.assignment_repository.update_status(
  107. assignment_id=assignment_id,
  108. status=payload.status)
  109. def check_permission(self, payload: PermissionCheckRequest) -> PermissionCheckResponse:
  110. user = self.user_repository.get_by_id(
  111. user_id=payload.user_id)
  112. if user is None or user.status != "active":
  113. return PermissionCheckResponse(allowed=False, reason="user_not_active")
  114. assignments = self.assignment_repository.list_by_user(
  115. user_id=payload.user_id)
  116. matched_role_ids: list[str] = []
  117. now = datetime.utcnow()
  118. for assignment in assignments:
  119. if assignment.status != "active":
  120. continue
  121. if assignment.expires_time is not None and assignment.expires_time <= now:
  122. continue
  123. if not self._scope_matches(
  124. assignment=assignment,
  125. scope_type=payload.scope_type,
  126. scope_id=payload.scope_id):
  127. continue
  128. role = self.role_repository.get_by_id(
  129. role_id=assignment.role_id)
  130. if role is None or role.status != "active":
  131. continue
  132. if self._permission_matches(role.permissions_json, payload.permission):
  133. matched_role_ids.append(role.id)
  134. return PermissionCheckResponse(
  135. allowed=bool(matched_role_ids),
  136. reason="matched" if matched_role_ids else "permission_not_found",
  137. matched_role_ids=matched_role_ids)
  138. def _permission_matches(self, permissions: list[str], requested_permission: str) -> bool:
  139. if "*" in permissions or requested_permission in permissions:
  140. return True
  141. return any(
  142. permission.endswith(":*")
  143. and requested_permission.startswith(permission.removesuffix("*"))
  144. for permission in permissions
  145. )
  146. def _scope_matches(
  147. self,
  148. *,
  149. assignment: RoleAssignment,
  150. scope_type: str | None,
  151. scope_id: str | None) -> bool:
  152. if assignment.scope_type is None and assignment.scope_id is None:
  153. return True
  154. return assignment.scope_type == scope_type and assignment.scope_id == scope_id