from datetime import datetime, timezone

from app.db.models import Role, RoleAssignment, User
from app.domain.repositories import (
    RoleAssignmentRepository,
    RoleRepository,
    UserRepository,
)
from app.infrastructure.passwords import hash_password, verify_password
from app.infrastructure.tokens import (
    TokenError,
    issue_access_token,
    verify_access_token,
)
from app.schemas.auth import (
    LoginRequest,
    LoginResponse,
    LoginUserResponse,
    PermissionCheckRequest,
    PermissionCheckResponse,
    RoleAssignmentCreateRequest,
    RoleAssignmentStatusUpdateRequest,
    RoleCreateRequest,
    RoleStatusUpdateRequest,
    TokenVerifyRequest,
    TokenVerifyResponse,
    UserCreateRequest,
    UserStatusUpdateRequest,
)


class AuthApplicationService:
    """Application service for users, roles, role assignments and tokens.

    Persistence is delegated to the three injected repositories; password
    hashing and access-token handling are delegated to the infrastructure
    helpers imported by this module.
    """

    def __init__(
        self,
        *,
        user_repository: UserRepository,
        role_repository: RoleRepository,
        assignment_repository: RoleAssignmentRepository,
        token_secret: str,
    ) -> None:
        """Store the collaborators and the secret used for access tokens."""
        self.user_repository = user_repository
        self.role_repository = role_repository
        self.assignment_repository = assignment_repository
        self.token_secret = token_secret

    # ------------------------------------------------------------------
    # Users and authentication
    # ------------------------------------------------------------------

    def create_user(self, payload: UserCreateRequest) -> User:
        """Create a user from *payload*.

        The password is hashed before it is handed to the repository. When
        the payload carries no (or an empty) password, an empty string is
        stored as the hash.
        """
        return self.user_repository.create(
            username=payload.username,
            password_hash=hash_password(payload.password) if payload.password else "",
            display_name=payload.display_name,
            email=payload.email,
            metadata_json=payload.metadata_json,
        )

    def login(self, payload: LoginRequest) -> LoginResponse | None:
        """Authenticate by username and password.

        Returns ``None`` when the user is unknown, is not ``"active"``, has
        no password hash, or the password does not verify. On success the
        last-login time is touched and a fresh access token is issued.
        """
        user = self.user_repository.get_by_username(username=payload.username)
        if user is None or user.status != "active":
            return None
        # create_user() stores "" for accounts created without a password.
        # Such accounts have no password credential, so reject them here
        # instead of passing an empty hash to verify_password().
        if not user.password_hash:
            return None
        if not verify_password(payload.password, user.password_hash):
            return None

        self.user_repository.touch_last_login_time(user_id=user.id)
        access_token, expires_time = issue_access_token(
            user_id=user.id,
            secret=self.token_secret,
        )
        return LoginResponse(
            access_token=access_token,
            expires_time=expires_time,
            user=LoginUserResponse.from_entity(user),
        )

    def verify_token(self, payload: TokenVerifyRequest) -> TokenVerifyResponse:
        """Verify an access token and report whether it is usable.

        A ``TokenError`` from verification yields an inactive response whose
        reason is the error text. A token whose user is missing or not
        ``"active"`` yields an inactive response with reason
        ``"user_not_active"``.
        """
        try:
            token_payload = verify_access_token(
                payload.access_token,
                secret=self.token_secret,
            )
        except TokenError as exc:
            return TokenVerifyResponse(active=False, reason=str(exc))

        expires_time_raw = token_payload["expires_time"]
        user = self.user_repository.get_by_id(user_id=token_payload["user_id"])
        if user is None or user.status != "active":
            return TokenVerifyResponse(active=False, reason="user_not_active")

        return TokenVerifyResponse(
            active=True,
            user_id=user.id,
            username=user.username,
            # A trailing "Z" is stripped before parsing, so a "...Z"
            # timestamp is returned as a naive datetime.
            expires_time=datetime.fromisoformat(expires_time_raw.removesuffix("Z")),
        )

    def list_users(self) -> list[User]:
        """Return every user known to the user repository."""
        return self.user_repository.list_all()

    def update_user_status(
        self,
        *,
        user_id: str,
        payload: UserStatusUpdateRequest,
    ) -> User | None:
        """Set a user's status; returns whatever the repository returns."""
        return self.user_repository.update_status(
            user_id=user_id,
            status=payload.status,
        )

    # ------------------------------------------------------------------
    # Roles
    # ------------------------------------------------------------------

    def create_role(self, payload: RoleCreateRequest) -> Role:
        """Create a role from *payload*."""
        return self.role_repository.create(
            code=payload.code,
            name=payload.name,
            description=payload.description,
            permissions_json=payload.permissions_json,
        )

    def list_roles(self) -> list[Role]:
        """Return every role known to the role repository."""
        return self.role_repository.list_all()

    def update_role_status(
        self,
        *,
        role_id: str,
        payload: RoleStatusUpdateRequest,
    ) -> Role | None:
        """Set a role's status; returns whatever the repository returns."""
        return self.role_repository.update_status(
            role_id=role_id,
            status=payload.status,
        )

    # ------------------------------------------------------------------
    # Role assignments
    # ------------------------------------------------------------------

    def create_assignment(
        self,
        payload: RoleAssignmentCreateRequest,
    ) -> RoleAssignment:
        """Assign a role to a user, optionally scoped and/or expiring."""
        return self.assignment_repository.create(
            user_id=payload.user_id,
            role_id=payload.role_id,
            scope_type=payload.scope_type,
            scope_id=payload.scope_id,
            expires_time=payload.expires_time,
        )

    def list_assignments(self, *, user_id: str) -> list[RoleAssignment]:
        """Return the role assignments of the given user."""
        return self.assignment_repository.list_by_user(user_id=user_id)

    def update_assignment_status(
        self,
        *,
        assignment_id: str,
        payload: RoleAssignmentStatusUpdateRequest,
    ) -> RoleAssignment | None:
        """Set an assignment's status; returns whatever the repository returns."""
        return self.assignment_repository.update_status(
            assignment_id=assignment_id,
            status=payload.status,
        )

    # ------------------------------------------------------------------
    # Permission checks
    # ------------------------------------------------------------------

    def check_permission(
        self,
        payload: PermissionCheckRequest,
    ) -> PermissionCheckResponse:
        """Decide whether a user holds a permission in the requested scope.

        An assignment contributes only if it is ``"active"``, has not
        expired, matches the requested scope, and points at an ``"active"``
        role whose permissions cover the requested permission. The response
        lists the id of the role behind every contributing assignment.
        """
        user = self.user_repository.get_by_id(user_id=payload.user_id)
        if user is None or user.status != "active":
            return PermissionCheckResponse(allowed=False, reason="user_not_active")

        assignments = self.assignment_repository.list_by_user(
            user_id=payload.user_id,
        )
        now = datetime.now(timezone.utc)
        # Several assignments may reference the same role; look each role
        # up at most once per check (missing roles are memoised as None).
        roles_by_id: dict[str, Role | None] = {}
        matched_role_ids: list[str] = []

        for assignment in assignments:
            if assignment.status != "active":
                continue
            if self._is_expired(assignment.expires_time, now=now):
                continue
            if not self._scope_matches(
                assignment=assignment,
                scope_type=payload.scope_type,
                scope_id=payload.scope_id,
            ):
                continue

            role_id = assignment.role_id
            if role_id not in roles_by_id:
                roles_by_id[role_id] = self.role_repository.get_by_id(
                    role_id=role_id,
                )
            role = roles_by_id[role_id]
            if role is None or role.status != "active":
                continue

            if self._permission_matches(role.permissions_json, payload.permission):
                matched_role_ids.append(role.id)

        return PermissionCheckResponse(
            allowed=bool(matched_role_ids),
            reason="matched" if matched_role_ids else "permission_not_found",
            matched_role_ids=matched_role_ids,
        )

    @staticmethod
    def _is_expired(expires_time: datetime | None, *, now: datetime) -> bool:
        """Return True when *expires_time* is at or before *now*.

        *now* must be timezone-aware. ``None`` means "never expires". A
        naive *expires_time* is interpreted as UTC, which keeps the result
        the same as the previous naive-UTC comparison while also allowing
        timezone-aware values to be compared without a ``TypeError``.
        """
        if expires_time is None:
            return False
        if expires_time.utcoffset() is None:
            expires_time = expires_time.replace(tzinfo=timezone.utc)
        return expires_time <= now

    def _permission_matches(
        self,
        permissions: list[str] | None,
        requested_permission: str,
    ) -> bool:
        """Return True when *permissions* grants *requested_permission*.

        A grant is the global wildcard ``"*"``, an exact match, or a
        namespace wildcard such as ``"orders:*"``, which covers every
        permission starting with ``"orders:"``. ``None`` or an empty
        collection grants nothing.
        """
        if not permissions:
            return False
        if "*" in permissions or requested_permission in permissions:
            return True
        return any(
            permission.endswith(":*")
            and requested_permission.startswith(permission.removesuffix("*"))
            for permission in permissions
        )

    def _scope_matches(
        self,
        *,
        assignment: RoleAssignment,
        scope_type: str | None,
        scope_id: str | None,
    ) -> bool:
        """Return True when *assignment* applies to the requested scope.

        An assignment with neither scope type nor scope id applies
        everywhere; otherwise both fields must equal the requested values.
        """
        if assignment.scope_type is None and assignment.scope_id is None:
            return True
        return assignment.scope_type == scope_type and assignment.scope_id == scope_id