| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
from datetime import datetime, timezone

from app.db.models import Role, RoleAssignment, User
from app.domain.repositories import RoleAssignmentRepository, RoleRepository, UserRepository
from app.schemas.auth import (
    PermissionCheckRequest,
    PermissionCheckResponse,
    RoleAssignmentCreateRequest,
    RoleAssignmentStatusUpdateRequest,
    RoleCreateRequest,
    RoleStatusUpdateRequest,
    UserCreateRequest,
    UserStatusUpdateRequest,
)
class AuthApplicationService:
    """Application service for users, roles, role assignments and permission checks.

    The create/list/update methods forward their arguments to the injected
    repositories unchanged. ``check_permission`` is the only method with logic
    of its own: it combines the three repositories to decide whether a user
    holds a permission in a given scope.
    """

    # Status value a user, role or assignment must carry to count in a check.
    _ACTIVE_STATUS = "active"

    def __init__(
        self,
        *,
        user_repository: UserRepository,
        role_repository: RoleRepository,
        assignment_repository: RoleAssignmentRepository,
    ) -> None:
        """Store the repositories this service delegates to."""
        self.user_repository = user_repository
        self.role_repository = role_repository
        self.assignment_repository = assignment_repository

    def create_user(self, payload: UserCreateRequest) -> User:
        """Create a user from the payload fields via the user repository."""
        return self.user_repository.create(
            tenant_id=payload.tenant_id,
            username=payload.username,
            display_name=payload.display_name,
            email=payload.email,
            metadata_json=payload.metadata_json,
        )

    def list_users(self, *, tenant_id: str) -> list[User]:
        """Return the user repository's listing for ``tenant_id``."""
        return self.user_repository.list_by_tenant(tenant_id=tenant_id)

    def update_user_status(self, *, user_id: str, payload: UserStatusUpdateRequest) -> User | None:
        """Set the status of ``user_id`` within the payload's tenant.

        Returns whatever the repository returns; the annotation allows ``None``
        (presumably when no such user exists -- confirm against the repository).
        """
        return self.user_repository.update_status(
            tenant_id=payload.tenant_id,
            user_id=user_id,
            status=payload.status,
        )

    def create_role(self, payload: RoleCreateRequest) -> Role:
        """Create a role from the payload fields via the role repository."""
        return self.role_repository.create(
            tenant_id=payload.tenant_id,
            code=payload.code,
            name=payload.name,
            description=payload.description,
            permissions_json=payload.permissions_json,
        )

    def list_roles(self, *, tenant_id: str) -> list[Role]:
        """Return the role repository's listing for ``tenant_id``."""
        return self.role_repository.list_by_tenant(tenant_id=tenant_id)

    def update_role_status(self, *, role_id: str, payload: RoleStatusUpdateRequest) -> Role | None:
        """Set the status of ``role_id`` within the payload's tenant.

        Returns whatever the repository returns; the annotation allows ``None``
        (presumably when no such role exists -- confirm against the repository).
        """
        return self.role_repository.update_status(
            tenant_id=payload.tenant_id,
            role_id=role_id,
            status=payload.status,
        )

    def create_assignment(
        self,
        payload: RoleAssignmentCreateRequest,
    ) -> RoleAssignment:
        """Create a role assignment from the payload fields via the assignment repository."""
        return self.assignment_repository.create(
            tenant_id=payload.tenant_id,
            user_id=payload.user_id,
            role_id=payload.role_id,
            scope_type=payload.scope_type,
            scope_id=payload.scope_id,
            expires_time=payload.expires_time,
        )

    def list_assignments(self, *, tenant_id: str, user_id: str) -> list[RoleAssignment]:
        """Return the assignment repository's listing for one user of a tenant."""
        return self.assignment_repository.list_by_user(tenant_id=tenant_id, user_id=user_id)

    def update_assignment_status(
        self,
        *,
        assignment_id: str,
        payload: RoleAssignmentStatusUpdateRequest,
    ) -> RoleAssignment | None:
        """Set the status of ``assignment_id`` within the payload's tenant.

        Returns whatever the repository returns; the annotation allows ``None``
        (presumably when no such assignment exists -- confirm against the repository).
        """
        return self.assignment_repository.update_status(
            tenant_id=payload.tenant_id,
            assignment_id=assignment_id,
            status=payload.status,
        )

    def check_permission(self, payload: PermissionCheckRequest) -> PermissionCheckResponse:
        """Decide whether the payload's user holds ``payload.permission`` in the given scope.

        The user must exist and be active; otherwise the response is
        ``allowed=False`` with reason ``"user_not_active"``. An assignment
        contributes only if it is active, not expired and matches the requested
        scope, and its role is active and grants the permission.

        Returns:
            A response with ``allowed=True`` and reason ``"matched"`` when at
            least one role grants the permission, else ``allowed=False`` with
            reason ``"permission_not_found"``. ``matched_role_ids`` lists each
            granting role once, in assignment order.
        """
        user = self.user_repository.get_by_id(
            tenant_id=payload.tenant_id,
            user_id=payload.user_id,
        )
        if user is None or user.status != self._ACTIVE_STATUS:
            return PermissionCheckResponse(allowed=False, reason="user_not_active")

        assignments = self.assignment_repository.list_by_user(
            tenant_id=payload.tenant_id,
            user_id=payload.user_id,
        )

        # One aware timestamp for the whole check so every assignment is
        # judged against the same instant.
        now = datetime.now(timezone.utc)
        matched_role_ids: list[str] = []
        # A role's verdict depends only on the role itself, so each role is
        # fetched and evaluated at most once even if several assignments use it.
        evaluated_role_ids: set[str] = set()

        for assignment in assignments:
            if assignment.status != self._ACTIVE_STATUS:
                continue
            if self._is_expired(assignment.expires_time, now=now):
                continue
            if not self._scope_matches(
                assignment=assignment,
                scope_type=payload.scope_type,
                scope_id=payload.scope_id,
            ):
                continue
            if assignment.role_id in evaluated_role_ids:
                continue
            evaluated_role_ids.add(assignment.role_id)

            role = self.role_repository.get_by_id(
                tenant_id=payload.tenant_id,
                role_id=assignment.role_id,
            )
            if role is None or role.status != self._ACTIVE_STATUS:
                continue
            if self._permission_matches(role.permissions_json, payload.permission):
                matched_role_ids.append(role.id)

        return PermissionCheckResponse(
            allowed=bool(matched_role_ids),
            reason="matched" if matched_role_ids else "permission_not_found",
            matched_role_ids=matched_role_ids,
        )

    @staticmethod
    def _is_expired(expires_time: datetime | None, *, now: datetime) -> bool:
        """Return True when ``expires_time`` is set and is at or before ``now``.

        Args:
            expires_time: Expiry timestamp, or ``None`` for "never expires".
                May be naive or timezone-aware.
            now: Timezone-aware current time.
        """
        if expires_time is None:
            return False
        if expires_time.utcoffset() is None:
            # Naive expiry: compare against naive UTC wall-clock time, which is
            # what the earlier utcnow()-based comparison did.
            return expires_time <= now.astimezone(timezone.utc).replace(tzinfo=None)
        # Aware expiry: aware datetimes compare by absolute instant.
        return expires_time <= now

    def _permission_matches(
        self,
        permissions: list[str] | None,
        requested_permission: str,
    ) -> bool:
        """Return True when ``permissions`` holds the wildcard ``"*"`` or the requested entry.

        A missing (``None``) or empty permission list grants nothing.
        """
        if not permissions:
            return False
        return "*" in permissions or requested_permission in permissions

    def _scope_matches(
        self,
        *,
        assignment: RoleAssignment,
        scope_type: str | None,
        scope_id: str | None,
    ) -> bool:
        """Return True when ``assignment`` applies to the requested scope.

        An assignment with neither ``scope_type`` nor ``scope_id`` set applies
        to every scope; otherwise both fields must equal the requested values.
        """
        if assignment.scope_type is None and assignment.scope_id is None:
            return True
        return assignment.scope_type == scope_type and assignment.scope_id == scope_id
|