services.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. from datetime import datetime
  2. from app.db.models import Role, RoleAssignment, User
  3. from app.domain.repositories import RoleAssignmentRepository, RoleRepository, UserRepository
  4. from app.schemas.auth import (
  5. PermissionCheckRequest,
  6. PermissionCheckResponse,
  7. RoleAssignmentCreateRequest,
  8. RoleAssignmentStatusUpdateRequest,
  9. RoleCreateRequest,
  10. RoleStatusUpdateRequest,
  11. UserCreateRequest,
  12. UserStatusUpdateRequest,
  13. )
  14. class AuthApplicationService:
  15. def __init__(
  16. self,
  17. *,
  18. user_repository: UserRepository,
  19. role_repository: RoleRepository,
  20. assignment_repository: RoleAssignmentRepository,
  21. ) -> None:
  22. self.user_repository = user_repository
  23. self.role_repository = role_repository
  24. self.assignment_repository = assignment_repository
  25. def create_user(self, payload: UserCreateRequest) -> User:
  26. return self.user_repository.create(
  27. tenant_id=payload.tenant_id,
  28. username=payload.username,
  29. display_name=payload.display_name,
  30. email=payload.email,
  31. metadata_json=payload.metadata_json,
  32. )
  33. def list_users(self, *, tenant_id: str) -> list[User]:
  34. return self.user_repository.list_by_tenant(tenant_id=tenant_id)
  35. def update_user_status(self, *, user_id: str, payload: UserStatusUpdateRequest) -> User | None:
  36. return self.user_repository.update_status(
  37. tenant_id=payload.tenant_id,
  38. user_id=user_id,
  39. status=payload.status,
  40. )
  41. def create_role(self, payload: RoleCreateRequest) -> Role:
  42. return self.role_repository.create(
  43. tenant_id=payload.tenant_id,
  44. code=payload.code,
  45. name=payload.name,
  46. description=payload.description,
  47. permissions_json=payload.permissions_json,
  48. )
  49. def list_roles(self, *, tenant_id: str) -> list[Role]:
  50. return self.role_repository.list_by_tenant(tenant_id=tenant_id)
  51. def update_role_status(self, *, role_id: str, payload: RoleStatusUpdateRequest) -> Role | None:
  52. return self.role_repository.update_status(
  53. tenant_id=payload.tenant_id,
  54. role_id=role_id,
  55. status=payload.status,
  56. )
  57. def create_assignment(
  58. self,
  59. payload: RoleAssignmentCreateRequest,
  60. ) -> RoleAssignment:
  61. return self.assignment_repository.create(
  62. tenant_id=payload.tenant_id,
  63. user_id=payload.user_id,
  64. role_id=payload.role_id,
  65. scope_type=payload.scope_type,
  66. scope_id=payload.scope_id,
  67. expires_time=payload.expires_time,
  68. )
  69. def list_assignments(self, *, tenant_id: str, user_id: str) -> list[RoleAssignment]:
  70. return self.assignment_repository.list_by_user(tenant_id=tenant_id, user_id=user_id)
  71. def update_assignment_status(
  72. self,
  73. *,
  74. assignment_id: str,
  75. payload: RoleAssignmentStatusUpdateRequest,
  76. ) -> RoleAssignment | None:
  77. return self.assignment_repository.update_status(
  78. tenant_id=payload.tenant_id,
  79. assignment_id=assignment_id,
  80. status=payload.status,
  81. )
  82. def check_permission(self, payload: PermissionCheckRequest) -> PermissionCheckResponse:
  83. user = self.user_repository.get_by_id(
  84. tenant_id=payload.tenant_id,
  85. user_id=payload.user_id,
  86. )
  87. if user is None or user.status != "active":
  88. return PermissionCheckResponse(allowed=False, reason="user_not_active")
  89. assignments = self.assignment_repository.list_by_user(
  90. tenant_id=payload.tenant_id,
  91. user_id=payload.user_id,
  92. )
  93. matched_role_ids: list[str] = []
  94. now = datetime.utcnow()
  95. for assignment in assignments:
  96. if assignment.status != "active":
  97. continue
  98. if assignment.expires_time is not None and assignment.expires_time <= now:
  99. continue
  100. if not self._scope_matches(
  101. assignment=assignment,
  102. scope_type=payload.scope_type,
  103. scope_id=payload.scope_id,
  104. ):
  105. continue
  106. role = self.role_repository.get_by_id(
  107. tenant_id=payload.tenant_id,
  108. role_id=assignment.role_id,
  109. )
  110. if role is None or role.status != "active":
  111. continue
  112. if self._permission_matches(role.permissions_json, payload.permission):
  113. matched_role_ids.append(role.id)
  114. return PermissionCheckResponse(
  115. allowed=bool(matched_role_ids),
  116. reason="matched" if matched_role_ids else "permission_not_found",
  117. matched_role_ids=matched_role_ids,
  118. )
  119. def _permission_matches(self, permissions: list[str], requested_permission: str) -> bool:
  120. return "*" in permissions or requested_permission in permissions
  121. def _scope_matches(
  122. self,
  123. *,
  124. assignment: RoleAssignment,
  125. scope_type: str | None,
  126. scope_id: str | None,
  127. ) -> bool:
  128. if assignment.scope_type is None and assignment.scope_id is None:
  129. return True
  130. return assignment.scope_type == scope_type and assignment.scope_id == scope_id