| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- from datetime import datetime
- from typing import Annotated, TypeVar
- from core_domain import ServiceHealth
- from core_shared import try_build_redis_client
- from fastapi import APIRouter, Depends, Header, HTTPException, Request
- from sqlalchemy import text
- from sqlalchemy.orm import Session
- from app.application.services import AuthApplicationService
- from app.db.session import get_db
- from app.domain.repositories import (
- ApiKeyRepository,
- RoleAssignmentRepository,
- RolePermissionBindingRepository,
- RoleRepository,
- UserRepository,
- )
- from app.schemas.identity import (
- ApiKeyCreateData,
- ApiKeyCreateRequestDto,
- ApiKeyDto,
- ApiKeyRevokeRequest,
- ApiResponse,
- AuthMeData,
- BindingRemoveRequest,
- DeleteData,
- LoginData,
- LoginRequestDto,
- PageRequest,
- PageResult,
- PermissionCheckData,
- PermissionCheckRequestDto,
- RoleDto,
- RolePermissionBindingAddRequest,
- RolePermissionBindingDto,
- RolePermissionBindingListRequest,
- TokenVerifyData,
- TokenVerifyRequestDto,
- UserDto,
- )
- router = APIRouter()
- DbSession = Annotated[Session, Depends(get_db)]
- T = TypeVar("T")
- def get_identity_application_service(request: Request, db: DbSession) -> AuthApplicationService:
- settings = request.app.state.settings
- return AuthApplicationService(
- user_repository=UserRepository(db),
- role_repository=RoleRepository(db),
- assignment_repository=RoleAssignmentRepository(db),
- permission_binding_repository=RolePermissionBindingRepository(db),
- api_key_repository=ApiKeyRepository(db),
- token_secret=settings.credential_encryption_key,
- redis_client=try_build_redis_client(settings.redis_url),
- permission_cache_ttl_seconds=settings.permission_cache_ttl_seconds)
- IdentityServiceDep = Annotated[AuthApplicationService, Depends(get_identity_application_service)]
- AuthorizationHeader = Annotated[str | None, Header(alias="Authorization")]
- def ok(request: Request, data: T) -> ApiResponse[T]:
- return ApiResponse[T](
- data=data,
- requestId=request.headers.get("x-request-id", ""),
- serverTime=datetime.utcnow())
- def get_bearer_token(authorization: str | None) -> str:
- if not authorization:
- raise HTTPException(status_code=401, detail="missing authorization header")
- scheme, _, token = authorization.partition(" ")
- if scheme.lower() != "bearer" or not token:
- raise HTTPException(status_code=401, detail="invalid authorization header")
- return token
- @router.get("/health", response_model=ServiceHealth)
- def health_check(db: DbSession) -> ServiceHealth:
- db.execute(text("SELECT 1"))
- return ServiceHealth(service="identity-service", status="ok", database="ok")
- @router.post("/auth/login", response_model=ApiResponse[LoginData])
- def login(
- request: Request,
- payload: LoginRequestDto,
- service: IdentityServiceDep) -> ApiResponse[LoginData]:
- result = service.login(username=payload.username, password=payload.password)
- if result is None:
- raise HTTPException(status_code=401, detail="invalid username or password")
- return ok(
- request,
- LoginData(
- accessToken=result.access_token,
- expiresTime=result.expires_time,
- user=UserDto.from_entity(result.user)))
- @router.post("/auth/logout", response_model=ApiResponse[dict[str, bool]])
- def logout(
- request: Request,
- service: IdentityServiceDep,
- authorization: AuthorizationHeader = None) -> ApiResponse[dict[str, bool]]:
- access_token = get_bearer_token(authorization) if authorization else None
- return ok(request, {"ok": service.logout(access_token=access_token)})
- @router.post("/auth/tokens/verify", response_model=ApiResponse[TokenVerifyData])
- def verify_token(
- request: Request,
- payload: TokenVerifyRequestDto,
- service: IdentityServiceDep) -> ApiResponse[TokenVerifyData]:
- result = service.verify_token(access_token=payload.accessToken)
- return ok(
- request,
- TokenVerifyData(
- active=result.active,
- userId=result.user_id,
- username=result.username,
- expiresTime=result.expires_time,
- reason=result.reason))
- @router.post("/auth/me", response_model=ApiResponse[AuthMeData])
- def me(
- request: Request,
- service: IdentityServiceDep,
- authorization: AuthorizationHeader = None) -> ApiResponse[AuthMeData]:
- token = get_bearer_token(authorization)
- verified = service.verify_token(access_token=token)
- if not verified.active or verified.user_id is None:
- raise HTTPException(status_code=401, detail=verified.reason or "invalid token")
- user = service.user_repository.get_by_id(user_id=verified.user_id)
- if user is None:
- raise HTTPException(status_code=401, detail="user not found")
- assignments = service.assignment_repository.list_by_user(user_id=user.id)
- roles = []
- permissions: set[str] = set()
- for assignment in assignments:
- role = service.role_repository.get_by_id(role_id=assignment.role_id)
- if role is None:
- continue
- bindings = service.permission_binding_repository.list_all_by_role(role_id=role.id)
- roles.append(RoleDto.from_entity(role, permission_binding_count=len(bindings)))
- permissions.update(binding.permission for binding in bindings)
- return ok(
- request,
- AuthMeData(
- user=UserDto.from_entity(user),
- roles=roles,
- permissions=sorted(permissions)))
- @router.post("/users/list", response_model=ApiResponse[PageResult[UserDto]])
- def list_users(
- request: Request,
- payload: PageRequest,
- service: IdentityServiceDep) -> ApiResponse[PageResult[UserDto]]:
- items, total = service.list_users_page(
- page=payload.page,
- page_size=payload.pageSize,
- keyword=payload.keyword)
- return ok(
- request,
- PageResult[UserDto].from_items(
- items=[UserDto.from_entity(item) for item in items],
- total=total,
- page=payload.page,
- page_size=payload.pageSize))
- @router.post("/roles/list", response_model=ApiResponse[PageResult[RoleDto]])
- def list_roles(
- request: Request,
- payload: PageRequest,
- service: IdentityServiceDep) -> ApiResponse[PageResult[RoleDto]]:
- items, total = service.list_roles_page(
- page=payload.page,
- page_size=payload.pageSize,
- keyword=payload.keyword)
- binding_repo = service.permission_binding_repository
- return ok(
- request,
- PageResult[RoleDto].from_items(
- items=[
- RoleDto.from_entity(
- item,
- permission_binding_count=len(binding_repo.list_all_by_role(role_id=item.id)))
- for item in items
- ],
- total=total,
- page=payload.page,
- page_size=payload.pageSize))
- @router.post(
- "/rolePermissionBindings/list",
- response_model=ApiResponse[PageResult[RolePermissionBindingDto]])
- def list_role_permission_bindings(
- request: Request,
- payload: RolePermissionBindingListRequest,
- service: IdentityServiceDep) -> ApiResponse[PageResult[RolePermissionBindingDto]]:
- items, total = service.list_role_permission_bindings(
- role_id=payload.roleId,
- page=payload.page,
- page_size=payload.pageSize)
- return ok(
- request,
- PageResult[RolePermissionBindingDto].from_items(
- items=[RolePermissionBindingDto.from_entity(item) for item in items],
- total=total,
- page=payload.page,
- page_size=payload.pageSize))
- @router.post("/rolePermissionBindings/add", response_model=ApiResponse[RolePermissionBindingDto])
- def add_role_permission_binding(
- request: Request,
- payload: RolePermissionBindingAddRequest,
- service: IdentityServiceDep) -> ApiResponse[RolePermissionBindingDto]:
- entity = service.add_role_permission_binding(
- role_id=payload.roleId,
- permission=payload.permission,
- scope_type=payload.scopeType,
- scope_id=payload.scopeId)
- return ok(request, RolePermissionBindingDto.from_entity(entity))
- @router.post("/rolePermissionBindings/remove", response_model=ApiResponse[DeleteData])
- def remove_role_permission_binding(
- request: Request,
- payload: BindingRemoveRequest,
- service: IdentityServiceDep) -> ApiResponse[DeleteData]:
- deleted = service.remove_role_permission_binding(binding_id=payload.bindingId)
- return ok(request, DeleteData(deleted=deleted, bindingId=payload.bindingId))
- @router.post("/permissions/check", response_model=ApiResponse[PermissionCheckData])
- def check_permission(
- request: Request,
- payload: PermissionCheckRequestDto,
- service: IdentityServiceDep) -> ApiResponse[PermissionCheckData]:
- result = service.check_permission(
- user_id=payload.userId,
- permission=payload.permission,
- scope_type=payload.scopeType,
- scope_id=payload.scopeId)
- return ok(
- request,
- PermissionCheckData(
- allowed=result.allowed,
- reason=result.reason,
- matchedRoleIds=result.matched_role_ids))
- @router.post("/apiKeys/list", response_model=ApiResponse[PageResult[ApiKeyDto]])
- def list_api_keys(
- request: Request,
- payload: PageRequest,
- service: IdentityServiceDep) -> ApiResponse[PageResult[ApiKeyDto]]:
- items, total = service.list_api_keys_page(
- page=payload.page,
- page_size=payload.pageSize,
- keyword=payload.keyword)
- return ok(
- request,
- PageResult[ApiKeyDto].from_items(
- items=[ApiKeyDto.from_entity(item) for item in items],
- total=total,
- page=payload.page,
- page_size=payload.pageSize))
- @router.post("/apiKeys/create", response_model=ApiResponse[ApiKeyCreateData])
- def create_api_key(
- request: Request,
- payload: ApiKeyCreateRequestDto,
- service: IdentityServiceDep) -> ApiResponse[ApiKeyCreateData]:
- entity, secret = service.create_api_key(
- name=payload.name,
- scopes=payload.scopes,
- expires_time=payload.expiresTime)
- return ok(
- request,
- ApiKeyCreateData(
- apiKey=ApiKeyDto.from_entity(entity),
- secret=secret))
- @router.post("/apiKeys/revoke", response_model=ApiResponse[ApiKeyDto])
- def revoke_api_key(
- request: Request,
- payload: ApiKeyRevokeRequest,
- service: IdentityServiceDep) -> ApiResponse[ApiKeyDto]:
- entity = service.revoke_api_key(api_key_id=payload.apiKeyId)
- if entity is None:
- raise HTTPException(status_code=404, detail=f"api key not found: {payload.apiKeyId}")
- return ok(request, ApiKeyDto.from_entity(entity))
|