"""Contract tests for the auth-service identity HTTP endpoints."""
from pathlib import Path

from tests.conftest import (
    build_fastapi_test_client,
    build_postgres_database_url,
    build_postgres_engine,
    prepare_known_service_import,
)
- class FakeRedis:
- def __init__(self) -> None:
- self.values: dict[str, object] = {}
- def set(self, key: str, value: object, ex: int | None = None) -> bool:
- self.values[key] = value
- return True
- def get(self, key: str) -> object | None:
- return self.values.get(key)
- def exists(self, key: str) -> int:
- return 1 if key in self.values else 0
def test_identity_bootstraps_demo_user_for_frontend_login(
    tmp_path: Path,
    monkeypatch,
) -> None:
    """Log in as ``demo-user`` without the test creating that account.

    The test only creates the schema and builds the app; it never inserts a
    user itself. A successful login therefore shows the demo account is
    available out of the box (presumably seeded during ``create_app()`` --
    confirm against the service bootstrap code).
    """
    prepare_known_service_import("auth-service")
    # Imported only after the call above; presumably that call is what makes
    # the service's ``app`` package importable -- verify in tests/conftest.
    from app.bootstrap.app import create_app
    from app.db.models import Base

    database_url = build_postgres_database_url(tmp_path, "demo-auth")
    # The environment variable is set before ``create_app()`` runs.
    monkeypatch.setenv("AGENT_PLATFORM_DATABASE_URL", database_url)
    engine = build_postgres_engine(database_url)
    Base.metadata.create_all(engine)

    app = create_app()
    client = build_fastapi_test_client(app)

    response = client.post(
        "/identity/auth/login",
        json={"username": "demo-user", "password": "demo-password"},
    )

    assert response.status_code == 200
    payload = response.json()
    assert payload["data"]["accessToken"]
    assert payload["data"]["user"]["username"] == "demo-user"
def test_identity_post_contract_supports_login_permissions_and_api_keys(
    tmp_path: Path,
    monkeypatch,
) -> None:
    """Walk the POST-based identity contract end to end.

    Steps, in order: seed a user with an ``admin`` role, bind the
    ``agents:*`` permission to that role, log in, read ``/auth/me``, check a
    concrete permission, then create and list an API key.
    """
    prepare_known_service_import("auth-service")
    # Imported only after the call above; presumably that call is what makes
    # the service's packages importable -- verify in tests/conftest.
    from app.bootstrap.app import create_app
    from app.db.models import Base
    from app.domain.repositories import RoleAssignmentRepository, RoleRepository, UserRepository
    from app.infrastructure.passwords import hash_password
    from core_db import create_session_factory

    database_url = build_postgres_database_url(tmp_path, "auth")
    monkeypatch.setenv("AGENT_PLATFORM_DATABASE_URL", database_url)
    engine = build_postgres_engine(database_url)
    Base.metadata.create_all(engine)

    app = create_app()
    # Point the app at a session factory bound to the test engine.
    app.state.session_factory = create_session_factory(engine)
    client = build_fastapi_test_client(app)

    # --- Seed a user, a role without permissions, and the assignment. ---
    session_factory = create_session_factory(engine)
    with session_factory() as db:
        user = UserRepository(db).create(
            username="demo",
            password_hash=hash_password("super-secret"),
            display_name="Demo User",
            email="demo@example.com",
            metadata_json={"source": "test"},
        )
        role = RoleRepository(db).create(
            code="admin",
            name="Admin",
            description="Platform admin",
            permissions_json=[],
        )
        RoleAssignmentRepository(db).create(
            user_id=user.id,
            role_id=role.id,
            scope_type=None,
            scope_id=None,
            expires_time=None,
        )
        # Copy the ids out while the session is still open so later
        # assertions do not depend on ORM objects from a closed session.
        user_id = user.id
        role_id = role.id

    # --- Bind a wildcard permission to the role. ---
    binding_response = client.post(
        "/identity/rolePermissionBindings/add",
        json={
            "roleId": role_id,
            "permission": "agents:*",
        },
    )
    assert binding_response.status_code == 200
    binding_payload = binding_response.json()
    assert binding_payload["success"] is True
    assert binding_payload["data"]["roleId"] == role_id

    # --- Log in with the seeded credentials. ---
    login_response = client.post(
        "/identity/auth/login",
        json={"username": "demo", "password": "super-secret"},
    )
    assert login_response.status_code == 200
    login_payload = login_response.json()
    assert login_payload["data"]["accessToken"]
    assert login_payload["data"]["user"]["displayName"] == "Demo User"
    assert "createdTime" in login_payload["data"]["user"]
    token = login_payload["data"]["accessToken"]

    # --- The bearer token resolves to the same user and its permissions. ---
    me_response = client.post(
        "/identity/auth/me",
        headers={"Authorization": f"Bearer {token}"},
        json={},
    )
    assert me_response.status_code == 200
    me_payload = me_response.json()
    assert me_payload["data"]["user"]["id"] == user_id
    assert me_payload["data"]["permissions"] == ["agents:*"]

    # --- The wildcard binding is expected to allow a concrete permission. ---
    permission_response = client.post(
        "/identity/permissions/check",
        json={
            "userId": user_id,
            "permission": "agents:create",
        },
    )
    assert permission_response.status_code == 200
    assert permission_response.json()["data"]["allowed"] is True

    # --- Create an API key, then confirm it is the only one listed. ---
    api_key_response = client.post(
        "/identity/apiKeys/create",
        json={"name": "Gateway Test Key", "scopes": "agents:read"},
    )
    assert api_key_response.status_code == 200
    api_key_payload = api_key_response.json()["data"]
    assert api_key_payload["secret"].startswith("agp_")
    assert api_key_payload["apiKey"]["keyPrefix"]

    api_keys_response = client.post("/identity/apiKeys/list", json={"page": 1, "pageSize": 20})
    assert api_keys_response.status_code == 200
    assert api_keys_response.json()["data"]["total"] == 1
def test_identity_logout_revokes_token_with_redis_blacklist(
    tmp_path: Path,
    monkeypatch,
) -> None:
    """Check that a token verifies before logout and is revoked afterwards.

    ``identity_routes.try_build_redis_client`` is patched to return a
    ``FakeRedis`` instance, so no real Redis server is contacted through that
    hook. After logout the verify endpoint must report the token as inactive
    with reason ``token_revoked``.
    """
    prepare_known_service_import("auth-service")
    # Imported only after the call above; presumably that call is what makes
    # the service's packages importable -- verify in tests/conftest.
    from app.api import identity_routes
    from app.bootstrap.app import create_app
    from app.db.models import Base
    from app.domain.repositories import UserRepository
    from app.infrastructure.passwords import hash_password
    from core_db import create_session_factory

    database_url = build_postgres_database_url(tmp_path, "auth-redis")
    monkeypatch.setenv("AGENT_PLATFORM_DATABASE_URL", database_url)
    engine = build_postgres_engine(database_url)
    Base.metadata.create_all(engine)

    # Install the fake before the app is created, as the original order does.
    fake_redis = FakeRedis()
    monkeypatch.setattr(identity_routes, "try_build_redis_client", lambda redis_url: fake_redis)

    app = create_app()
    # Point the app at a session factory bound to the test engine.
    app.state.session_factory = create_session_factory(engine)
    client = build_fastapi_test_client(app)

    # --- Seed the account used for the login/logout round trip. ---
    session_factory = create_session_factory(engine)
    with session_factory() as db:
        UserRepository(db).create(
            username="redis-user",
            password_hash=hash_password("super-secret"),
            display_name="Redis User",
            email="redis@example.com",
            metadata_json={},
        )

    login_response = client.post(
        "/identity/auth/login",
        json={"username": "redis-user", "password": "super-secret"},
    )
    assert login_response.status_code == 200
    token = login_response.json()["data"]["accessToken"]

    # --- Before logout the token is reported as active. ---
    verify_before_response = client.post(
        "/identity/auth/tokens/verify",
        json={"accessToken": token},
    )
    assert verify_before_response.status_code == 200
    assert verify_before_response.json()["data"]["active"] is True

    logout_response = client.post(
        "/identity/auth/logout",
        headers={"Authorization": f"Bearer {token}"},
        json={},
    )
    assert logout_response.status_code == 200
    assert logout_response.json()["data"]["ok"] is True

    # --- After logout the same token is inactive, with an explicit reason. ---
    verify_after_response = client.post(
        "/identity/auth/tokens/verify",
        json={"accessToken": token},
    )
    assert verify_after_response.status_code == 200
    assert verify_after_response.json()["data"]["active"] is False
    assert verify_after_response.json()["data"]["reason"] == "token_revoked"