from datetime import datetime from app.db.models import Role, RoleAssignment, User from app.domain.repositories import RoleAssignmentRepository, RoleRepository, UserRepository from app.schemas.auth import ( PermissionCheckRequest, PermissionCheckResponse, RoleAssignmentCreateRequest, RoleAssignmentStatusUpdateRequest, RoleCreateRequest, RoleStatusUpdateRequest, UserCreateRequest, UserStatusUpdateRequest, ) class AuthApplicationService: def __init__( self, *, user_repository: UserRepository, role_repository: RoleRepository, assignment_repository: RoleAssignmentRepository, ) -> None: self.user_repository = user_repository self.role_repository = role_repository self.assignment_repository = assignment_repository def create_user(self, payload: UserCreateRequest) -> User: return self.user_repository.create( tenant_id=payload.tenant_id, username=payload.username, display_name=payload.display_name, email=payload.email, metadata_json=payload.metadata_json, ) def list_users(self, *, tenant_id: str) -> list[User]: return self.user_repository.list_by_tenant(tenant_id=tenant_id) def update_user_status(self, *, user_id: str, payload: UserStatusUpdateRequest) -> User | None: return self.user_repository.update_status( tenant_id=payload.tenant_id, user_id=user_id, status=payload.status, ) def create_role(self, payload: RoleCreateRequest) -> Role: return self.role_repository.create( tenant_id=payload.tenant_id, code=payload.code, name=payload.name, description=payload.description, permissions_json=payload.permissions_json, ) def list_roles(self, *, tenant_id: str) -> list[Role]: return self.role_repository.list_by_tenant(tenant_id=tenant_id) def update_role_status(self, *, role_id: str, payload: RoleStatusUpdateRequest) -> Role | None: return self.role_repository.update_status( tenant_id=payload.tenant_id, role_id=role_id, status=payload.status, ) def create_assignment( self, payload: RoleAssignmentCreateRequest, ) -> RoleAssignment: return self.assignment_repository.create( tenant_id=payload.tenant_id, user_id=payload.user_id, role_id=payload.role_id, scope_type=payload.scope_type, scope_id=payload.scope_id, expires_time=payload.expires_time, ) def list_assignments(self, *, tenant_id: str, user_id: str) -> list[RoleAssignment]: return self.assignment_repository.list_by_user(tenant_id=tenant_id, user_id=user_id) def update_assignment_status( self, *, assignment_id: str, payload: RoleAssignmentStatusUpdateRequest, ) -> RoleAssignment | None: return self.assignment_repository.update_status( tenant_id=payload.tenant_id, assignment_id=assignment_id, status=payload.status, ) def check_permission(self, payload: PermissionCheckRequest) -> PermissionCheckResponse: user = self.user_repository.get_by_id( tenant_id=payload.tenant_id, user_id=payload.user_id, ) if user is None or user.status != "active": return PermissionCheckResponse(allowed=False, reason="user_not_active") assignments = self.assignment_repository.list_by_user( tenant_id=payload.tenant_id, user_id=payload.user_id, ) matched_role_ids: list[str] = [] now = datetime.utcnow() for assignment in assignments: if assignment.status != "active": continue if assignment.expires_time is not None and assignment.expires_time <= now: continue if not self._scope_matches( assignment=assignment, scope_type=payload.scope_type, scope_id=payload.scope_id, ): continue role = self.role_repository.get_by_id( tenant_id=payload.tenant_id, role_id=assignment.role_id, ) if role is None or role.status != "active": continue if self._permission_matches(role.permissions_json, payload.permission): matched_role_ids.append(role.id) return PermissionCheckResponse( allowed=bool(matched_role_ids), reason="matched" if matched_role_ids else "permission_not_found", matched_role_ids=matched_role_ids, ) def _permission_matches(self, permissions: list[str], requested_permission: str) -> bool: return "*" in permissions or requested_permission in permissions def _scope_matches( self, *, assignment: RoleAssignment, scope_type: str | None, scope_id: str | None, ) -> bool: if assignment.scope_type is None and assignment.scope_id is None: return True return assignment.scope_type == scope_type and assignment.scope_id == scope_id