"""HTTP routes of the identity service: auth, users, roles, permissions, API keys."""

from datetime import datetime, timezone
from typing import Annotated, TypeVar

from core_domain import ServiceHealth
from core_shared import try_build_redis_client
from fastapi import APIRouter, Depends, Header, HTTPException, Request
from sqlalchemy import text
from sqlalchemy.orm import Session

from app.application.services import AuthApplicationService
from app.db.session import get_db
from app.domain.repositories import (
    ApiKeyRepository,
    RoleAssignmentRepository,
    RolePermissionBindingRepository,
    RoleRepository,
    UserRepository,
)
from app.schemas.identity import (
    ApiKeyCreateData,
    ApiKeyCreateRequestDto,
    ApiKeyDto,
    ApiKeyRevokeRequest,
    ApiResponse,
    AuthMeData,
    BindingRemoveRequest,
    DeleteData,
    LoginData,
    LoginRequestDto,
    PageRequest,
    PageResult,
    PermissionCheckData,
    PermissionCheckRequestDto,
    RoleDto,
    RolePermissionBindingAddRequest,
    RolePermissionBindingDto,
    RolePermissionBindingListRequest,
    TokenVerifyData,
    TokenVerifyRequestDto,
    UserDto,
)

router = APIRouter()

# Request-scoped SQLAlchemy session injected by FastAPI.
DbSession = Annotated[Session, Depends(get_db)]
T = TypeVar("T")


def get_identity_application_service(
    request: Request,
    db: DbSession,
) -> AuthApplicationService:
    """Build the application service for the current request.

    All repositories share the request's database session; the remaining
    configuration is read from ``request.app.state.settings``.
    """
    settings = request.app.state.settings
    return AuthApplicationService(
        user_repository=UserRepository(db),
        role_repository=RoleRepository(db),
        assignment_repository=RoleAssignmentRepository(db),
        permission_binding_repository=RolePermissionBindingRepository(db),
        api_key_repository=ApiKeyRepository(db),
        # The token secret is taken from the ``credential_encryption_key`` setting.
        token_secret=settings.credential_encryption_key,
        # NOTE(review): this runs on every request; confirm that
        # try_build_redis_client reuses a pooled client instead of opening a
        # fresh connection each time.
        redis_client=try_build_redis_client(settings.redis_url),
        permission_cache_ttl_seconds=settings.permission_cache_ttl_seconds,
    )


IdentityServiceDep = Annotated[
    AuthApplicationService,
    Depends(get_identity_application_service),
]
AuthorizationHeader = Annotated[str | None, Header(alias="Authorization")]


def ok(request: Request, data: T) -> ApiResponse[T]:
    """Wrap ``data`` in the standard response envelope.

    ``requestId`` echoes the caller's ``x-request-id`` header (empty string
    when absent) and ``serverTime`` is the current UTC time.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo keeps
    # the value naive, exactly as before, so the serialized timestamp that
    # clients receive does not change.
    server_time = datetime.now(timezone.utc).replace(tzinfo=None)
    return ApiResponse[T](
        data=data,
        requestId=request.headers.get("x-request-id", ""),
        serverTime=server_time,
    )


def get_bearer_token(authorization: str | None) -> str:
    """Extract the token from an ``Authorization: Bearer <token>`` header value.

    The scheme is matched case-insensitively.

    Raises:
        HTTPException: 401 when the header is missing or empty, uses another
            scheme, or carries no token.
    """
    if not authorization:
        raise HTTPException(status_code=401, detail="missing authorization header")

    scheme, _, token = authorization.partition(" ")
    # partition() splits on the first space only, so extra separator spaces or
    # trailing whitespace would otherwise end up inside the token (and a
    # whitespace-only token would pass the emptiness check below).
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        raise HTTPException(status_code=401, detail="invalid authorization header")
    return token


@router.get("/health", response_model=ServiceHealth)
def health_check(db: DbSession) -> ServiceHealth:
    """Report service health after probing the database with ``SELECT 1``.

    A failing probe is not caught here; the exception propagates to the
    framework instead of being reported in the response body.
    """
    db.execute(text("SELECT 1"))
    return ServiceHealth(service="identity-service", status="ok", database="ok")


@router.post("/auth/login", response_model=ApiResponse[LoginData])
def login(
    request: Request,
    payload: LoginRequestDto,
    service: IdentityServiceDep,
) -> ApiResponse[LoginData]:
    """Authenticate with username and password and return an access token.

    Raises:
        HTTPException: 401 when the service returns no login result.
    """
    result = service.login(username=payload.username, password=payload.password)
    if result is None:
        raise HTTPException(status_code=401, detail="invalid username or password")

    return ok(
        request,
        LoginData(
            accessToken=result.access_token,
            expiresTime=result.expires_time,
            user=UserDto.from_entity(result.user),
        ),
    )


@router.post("/auth/logout", response_model=ApiResponse[dict[str, bool]])
def logout(
    request: Request,
    service: IdentityServiceDep,
    authorization: AuthorizationHeader = None,
) -> ApiResponse[dict[str, bool]]:
    """Log out the caller and return ``{"ok": <service result>}``.

    Without an ``Authorization`` header the service is called with
    ``access_token=None``. A header that is present but malformed is rejected
    with 401 by ``get_bearer_token``.
    """
    access_token = get_bearer_token(authorization) if authorization else None
    return ok(request, {"ok": service.logout(access_token=access_token)})


@router.post("/auth/tokens/verify", response_model=ApiResponse[TokenVerifyData])
def verify_token(
    request: Request,
    payload: TokenVerifyRequestDto,
    service: IdentityServiceDep,
) -> ApiResponse[TokenVerifyData]:
    """Verify the access token in the request body and return the outcome."""
    result = service.verify_token(access_token=payload.accessToken)
    return ok(
        request,
        TokenVerifyData(
            active=result.active,
            userId=result.user_id,
            username=result.username,
            expiresTime=result.expires_time,
            reason=result.reason,
        ),
    )


@router.post("/auth/me", response_model=ApiResponse[AuthMeData])
def me(
    request: Request,
    service: IdentityServiceDep,
    authorization: AuthorizationHeader = None,
) -> ApiResponse[AuthMeData]:
    """Return the bearer token's user with their roles and permissions.

    Permissions are de-duplicated across roles and returned sorted.

    Raises:
        HTTPException: 401 when the header is missing or malformed, the token
            is not active, or the user no longer exists.
    """
    token = get_bearer_token(authorization)
    verified = service.verify_token(access_token=token)
    if not verified.active or verified.user_id is None:
        raise HTTPException(status_code=401, detail=verified.reason or "invalid token")

    user = service.user_repository.get_by_id(user_id=verified.user_id)
    if user is None:
        raise HTTPException(status_code=401, detail="user not found")

    assignments = service.assignment_repository.list_by_user(user_id=user.id)
    roles: list[RoleDto] = []
    permissions: set[str] = set()
    # One role lookup plus one binding lookup per assignment.
    for assignment in assignments:
        role = service.role_repository.get_by_id(role_id=assignment.role_id)
        if role is None:
            # The assignment references a role that cannot be loaded; skip it.
            continue
        bindings = service.permission_binding_repository.list_all_by_role(role_id=role.id)
        roles.append(RoleDto.from_entity(role, permission_binding_count=len(bindings)))
        permissions.update(binding.permission for binding in bindings)

    return ok(
        request,
        AuthMeData(
            user=UserDto.from_entity(user),
            roles=roles,
            permissions=sorted(permissions),
        ),
    )


@router.post("/users/list", response_model=ApiResponse[PageResult[UserDto]])
def list_users(
    request: Request,
    payload: PageRequest,
    service: IdentityServiceDep,
) -> ApiResponse[PageResult[UserDto]]:
    """Return one page of users, filtered by the request's keyword."""
    items, total = service.list_users_page(
        page=payload.page,
        page_size=payload.pageSize,
        keyword=payload.keyword,
    )
    return ok(
        request,
        PageResult[UserDto].from_items(
            items=[UserDto.from_entity(item) for item in items],
            total=total,
            page=payload.page,
            page_size=payload.pageSize,
        ),
    )


@router.post("/roles/list", response_model=ApiResponse[PageResult[RoleDto]])
def list_roles(
    request: Request,
    payload: PageRequest,
    service: IdentityServiceDep,
) -> ApiResponse[PageResult[RoleDto]]:
    """Return one page of roles, each with its permission-binding count."""
    items, total = service.list_roles_page(
        page=payload.page,
        page_size=payload.pageSize,
        keyword=payload.keyword,
    )
    binding_repo = service.permission_binding_repository
    # The count is obtained by listing every binding of each role on the page.
    role_dtos = [
        RoleDto.from_entity(
            item,
            permission_binding_count=len(binding_repo.list_all_by_role(role_id=item.id)),
        )
        for item in items
    ]
    return ok(
        request,
        PageResult[RoleDto].from_items(
            items=role_dtos,
            total=total,
            page=payload.page,
            page_size=payload.pageSize,
        ),
    )


@router.post(
    "/rolePermissionBindings/list",
    response_model=ApiResponse[PageResult[RolePermissionBindingDto]],
)
def list_role_permission_bindings(
    request: Request,
    payload: RolePermissionBindingListRequest,
    service: IdentityServiceDep,
) -> ApiResponse[PageResult[RolePermissionBindingDto]]:
    """Return one page of the permission bindings attached to a role."""
    items, total = service.list_role_permission_bindings(
        role_id=payload.roleId,
        page=payload.page,
        page_size=payload.pageSize,
    )
    return ok(
        request,
        PageResult[RolePermissionBindingDto].from_items(
            items=[RolePermissionBindingDto.from_entity(item) for item in items],
            total=total,
            page=payload.page,
            page_size=payload.pageSize,
        ),
    )


@router.post(
    "/rolePermissionBindings/add",
    response_model=ApiResponse[RolePermissionBindingDto],
)
def add_role_permission_binding(
    request: Request,
    payload: RolePermissionBindingAddRequest,
    service: IdentityServiceDep,
) -> ApiResponse[RolePermissionBindingDto]:
    """Attach a permission, with its scope, to a role and return the binding."""
    entity = service.add_role_permission_binding(
        role_id=payload.roleId,
        permission=payload.permission,
        scope_type=payload.scopeType,
        scope_id=payload.scopeId,
    )
    return ok(request, RolePermissionBindingDto.from_entity(entity))


@router.post("/rolePermissionBindings/remove", response_model=ApiResponse[DeleteData])
def remove_role_permission_binding(
    request: Request,
    payload: BindingRemoveRequest,
    service: IdentityServiceDep,
) -> ApiResponse[DeleteData]:
    """Remove a permission binding and report the service's deletion result."""
    deleted = service.remove_role_permission_binding(binding_id=payload.bindingId)
    return ok(request, DeleteData(deleted=deleted, bindingId=payload.bindingId))


@router.post("/permissions/check", response_model=ApiResponse[PermissionCheckData])
def check_permission(
    request: Request,
    payload: PermissionCheckRequestDto,
    service: IdentityServiceDep,
) -> ApiResponse[PermissionCheckData]:
    """Check whether a user holds a permission within the given scope."""
    result = service.check_permission(
        user_id=payload.userId,
        permission=payload.permission,
        scope_type=payload.scopeType,
        scope_id=payload.scopeId,
    )
    return ok(
        request,
        PermissionCheckData(
            allowed=result.allowed,
            reason=result.reason,
            matchedRoleIds=result.matched_role_ids,
        ),
    )


@router.post("/apiKeys/list", response_model=ApiResponse[PageResult[ApiKeyDto]])
def list_api_keys(
    request: Request,
    payload: PageRequest,
    service: IdentityServiceDep,
) -> ApiResponse[PageResult[ApiKeyDto]]:
    """Return one page of API keys, filtered by the request's keyword."""
    items, total = service.list_api_keys_page(
        page=payload.page,
        page_size=payload.pageSize,
        keyword=payload.keyword,
    )
    return ok(
        request,
        PageResult[ApiKeyDto].from_items(
            items=[ApiKeyDto.from_entity(item) for item in items],
            total=total,
            page=payload.page,
            page_size=payload.pageSize,
        ),
    )


@router.post("/apiKeys/create", response_model=ApiResponse[ApiKeyCreateData])
def create_api_key(
    request: Request,
    payload: ApiKeyCreateRequestDto,
    service: IdentityServiceDep,
) -> ApiResponse[ApiKeyCreateData]:
    """Create an API key and return it together with its secret."""
    entity, secret = service.create_api_key(
        name=payload.name,
        scopes=payload.scopes,
        expires_time=payload.expiresTime,
    )
    return ok(
        request,
        ApiKeyCreateData(apiKey=ApiKeyDto.from_entity(entity), secret=secret),
    )


@router.post("/apiKeys/revoke", response_model=ApiResponse[ApiKeyDto])
def revoke_api_key(
    request: Request,
    payload: ApiKeyRevokeRequest,
    service: IdentityServiceDep,
) -> ApiResponse[ApiKeyDto]:
    """Revoke an API key and return its updated representation.

    Raises:
        HTTPException: 404 when the service returns no key for the given id.
    """
    entity = service.revoke_api_key(api_key_id=payload.apiKeyId)
    if entity is None:
        raise HTTPException(status_code=404, detail=f"api key not found: {payload.apiKeyId}")
    return ok(request, ApiKeyDto.from_entity(entity))