from pathlib import Path

from tests.conftest import (
    build_fastapi_test_client,
    build_postgres_database_url,
    build_postgres_engine,
    prepare_known_service_import,
)


class FakeRedis:
    """In-memory stand-in offering the ``set``/``get``/``exists`` calls used here.

    Values live in a plain dict on ``self.values``; nothing ever expires.
    """

    def __init__(self) -> None:
        self.values: dict[str, object] = {}

    def set(self, key: str, value: object, ex: int | None = None) -> bool:
        """Store ``value`` under ``key`` and report success.

        ``ex`` is accepted for signature compatibility but is not used.
        """
        self.values[key] = value
        return True

    def get(self, key: str) -> object | None:
        """Return the stored value for ``key``, or ``None`` when absent."""
        return self.values.get(key)

    def exists(self, key: str) -> int:
        """Return ``1`` when ``key`` has been stored, otherwise ``0``."""
        return int(key in self.values)


def test_identity_bootstraps_demo_user_for_frontend_login(
    tmp_path: Path,
    monkeypatch,
) -> None:
    """Log in as ``demo-user`` against a freshly created schema.

    The test itself inserts no user rows, so a successful login relies on
    the application supplying that account.
    """
    # Service modules are imported only after this call, as in the original
    # ordering (presumably it makes ``app`` importable -- confirm in conftest).
    prepare_known_service_import("auth-service")
    from app.bootstrap.app import create_app
    from app.db.models import Base

    db_url = build_postgres_database_url(tmp_path, "demo-auth")
    monkeypatch.setenv("AGENT_PLATFORM_DATABASE_URL", db_url)
    db_engine = build_postgres_engine(db_url)
    Base.metadata.create_all(db_engine)

    http = build_fastapi_test_client(create_app())

    credentials = {"username": "demo-user", "password": "demo-password"}
    response = http.post("/identity/auth/login", json=credentials)

    assert response.status_code == 200
    data = response.json()["data"]
    assert data["accessToken"]
    assert data["user"]["username"] == "demo-user"


def test_identity_post_contract_supports_login_permissions_and_api_keys(
    tmp_path: Path,
    monkeypatch,
) -> None:
    """Walk the POST endpoints: role binding, login, me, permission check, API keys.

    A user with an ``admin`` role is seeded through the repositories, then
    each endpoint is called in turn and its response shape is asserted.
    """
    # Service modules are imported only after this call, as in the original
    # ordering (presumably it makes ``app`` importable -- confirm in conftest).
    prepare_known_service_import("auth-service")
    from app.bootstrap.app import create_app
    from app.db.models import Base
    from app.domain.repositories import (
        RoleAssignmentRepository,
        RoleRepository,
        UserRepository,
    )
    from app.infrastructure.passwords import hash_password
    from core_db import create_session_factory

    db_url = build_postgres_database_url(tmp_path, "auth")
    monkeypatch.setenv("AGENT_PLATFORM_DATABASE_URL", db_url)
    db_engine = build_postgres_engine(db_url)
    Base.metadata.create_all(db_engine)

    application = create_app()
    application.state.session_factory = create_session_factory(db_engine)
    http = build_fastapi_test_client(application)

    # --- seed a user, a role, and the assignment linking them -------------
    open_session = create_session_factory(db_engine)
    with open_session() as db:
        seeded_user = UserRepository(db).create(
            username="demo",
            password_hash=hash_password("super-secret"),
            display_name="Demo User",
            email="demo@example.com",
            metadata_json={"source": "test"},
        )
        seeded_role = RoleRepository(db).create(
            code="admin",
            name="Admin",
            description="Platform admin",
            permissions_json=[],
        )
        RoleAssignmentRepository(db).create(
            user_id=seeded_user.id,
            role_id=seeded_role.id,
            scope_type=None,
            scope_id=None,
            expires_time=None,
        )
        # Capture the identifiers while the session is still open.
        user_id = seeded_user.id
        role_id = seeded_role.id

    # --- bind a wildcard permission to the role ---------------------------
    binding_request = {"roleId": role_id, "permission": "agents:*"}
    binding_response = http.post(
        "/identity/rolePermissionBindings/add",
        json=binding_request,
    )
    assert binding_response.status_code == 200
    binding_body = binding_response.json()
    assert binding_body["success"] is True
    assert binding_body["data"]["roleId"] == role_id

    # --- login ------------------------------------------------------------
    login_response = http.post(
        "/identity/auth/login",
        json={"username": "demo", "password": "super-secret"},
    )
    assert login_response.status_code == 200
    login_data = login_response.json()["data"]
    assert login_data["accessToken"]
    assert login_data["user"]["displayName"] == "Demo User"
    assert "createdTime" in login_data["user"]
    token = login_data["accessToken"]

    # --- current-user lookup with the bearer token ------------------------
    auth_headers = {"Authorization": f"Bearer {token}"}
    me_response = http.post("/identity/auth/me", headers=auth_headers, json={})
    assert me_response.status_code == 200
    me_data = me_response.json()["data"]
    assert me_data["user"]["id"] == user_id
    assert me_data["permissions"] == ["agents:*"]

    # --- permission check: a concrete permission against the binding ------
    permission_response = http.post(
        "/identity/permissions/check",
        json={"userId": user_id, "permission": "agents:create"},
    )
    assert permission_response.status_code == 200
    assert permission_response.json()["data"]["allowed"] is True

    # --- API key creation and listing -------------------------------------
    api_key_response = http.post(
        "/identity/apiKeys/create",
        json={"name": "Gateway Test Key", "scopes": "agents:read"},
    )
    assert api_key_response.status_code == 200
    created_key = api_key_response.json()["data"]
    assert created_key["secret"].startswith("agp_")
    assert created_key["apiKey"]["keyPrefix"]

    api_keys_response = http.post(
        "/identity/apiKeys/list",
        json={"page": 1, "pageSize": 20},
    )
    assert api_keys_response.status_code == 200
    assert api_keys_response.json()["data"]["total"] == 1


def test_identity_logout_revokes_token_with_redis_blacklist(
    tmp_path: Path,
    monkeypatch,
) -> None:
    """Verify a token is active after login and ``token_revoked`` after logout.

    ``identity_routes.try_build_redis_client`` is patched to hand back a
    ``FakeRedis`` instance before the app is created.
    """
    # Service modules are imported only after this call, as in the original
    # ordering (presumably it makes ``app`` importable -- confirm in conftest).
    prepare_known_service_import("auth-service")
    from app.api import identity_routes
    from app.bootstrap.app import create_app
    from app.db.models import Base
    from app.domain.repositories import UserRepository
    from app.infrastructure.passwords import hash_password
    from core_db import create_session_factory

    db_url = build_postgres_database_url(tmp_path, "auth-redis")
    monkeypatch.setenv("AGENT_PLATFORM_DATABASE_URL", db_url)
    db_engine = build_postgres_engine(db_url)
    Base.metadata.create_all(db_engine)

    # The patch is installed before create_app(), matching the original order.
    redis_stub = FakeRedis()
    monkeypatch.setattr(
        identity_routes,
        "try_build_redis_client",
        lambda redis_url: redis_stub,
    )

    application = create_app()
    application.state.session_factory = create_session_factory(db_engine)
    http = build_fastapi_test_client(application)

    open_session = create_session_factory(db_engine)
    with open_session() as db:
        UserRepository(db).create(
            username="redis-user",
            password_hash=hash_password("super-secret"),
            display_name="Redis User",
            email="redis@example.com",
            metadata_json={},
        )

    login_response = http.post(
        "/identity/auth/login",
        json={"username": "redis-user", "password": "super-secret"},
    )
    assert login_response.status_code == 200
    token = login_response.json()["data"]["accessToken"]

    verify_request = {"accessToken": token}

    # Before logout the token verifies as active.
    verify_before_response = http.post(
        "/identity/auth/tokens/verify",
        json=verify_request,
    )
    assert verify_before_response.status_code == 200
    assert verify_before_response.json()["data"]["active"] is True

    logout_response = http.post(
        "/identity/auth/logout",
        headers={"Authorization": f"Bearer {token}"},
        json={},
    )
    assert logout_response.status_code == 200
    assert logout_response.json()["data"]["ok"] is True

    # After logout the same token verifies as inactive with a revocation reason.
    verify_after_response = http.post(
        "/identity/auth/tokens/verify",
        json=verify_request,
    )
    assert verify_after_response.status_code == 200
    verdict = verify_after_response.json()["data"]
    assert verdict["active"] is False
    assert verdict["reason"] == "token_revoked"