gateway.py 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. from pydantic import BaseModel
  2. from datetime import datetime
  3. from typing import Literal
  4. from typing import TYPE_CHECKING
  5. if TYPE_CHECKING:
  6. from app.db.models import ApiKey, GatewayRequestAudit
  7. class DownstreamServiceHealth(BaseModel):
  8. service: str
  9. status: str
  10. url: str
  11. status_code: int | None = None
  12. error_message: str | None = None
  13. class GatewayServicesHealthResponse(BaseModel):
  14. service: str = "api-gateway"
  15. status: str
  16. downstream_services: list[DownstreamServiceHealth]
  17. class GatewayRequestAuditResponse(BaseModel):
  18. id: str
  19. tenant_id: str
  20. request_id: str
  21. method: str
  22. path: str
  23. query_string: str | None = None
  24. target_service: str | None = None
  25. target_url: str | None = None
  26. status_code: int | None = None
  27. duration_ms: int
  28. client_host: str | None = None
  29. user_agent: str | None = None
  30. error_message: str | None = None
  31. created_time: datetime
  32. @classmethod
  33. def from_entity(cls, entity: "GatewayRequestAudit") -> "GatewayRequestAuditResponse":
  34. return cls.model_validate(entity, from_attributes=True)
  35. class ApiKeyCreateRequest(BaseModel):
  36. tenant_id: str
  37. name: str
  38. scopes: str | None = None
  39. expires_time: datetime | None = None
  40. class ApiKeyCreateResponse(BaseModel):
  41. id: str
  42. tenant_id: str
  43. name: str
  44. key_prefix: str
  45. api_key: str
  46. status: str
  47. scopes: str | None = None
  48. expires_time: datetime | None = None
  49. created_time: datetime
  50. class ApiKeyResponse(BaseModel):
  51. id: str
  52. tenant_id: str
  53. name: str
  54. key_prefix: str
  55. status: str
  56. scopes: str | None = None
  57. expires_time: datetime | None = None
  58. last_used_time: datetime | None = None
  59. created_time: datetime
  60. @classmethod
  61. def from_entity(cls, entity: "ApiKey") -> "ApiKeyResponse":
  62. return cls.model_validate(entity, from_attributes=True)
  63. ApiKeyStatus = Literal["active", "disabled", "revoked"]
  64. class ApiKeyStatusUpdateRequest(BaseModel):
  65. tenant_id: str
  66. status: ApiKeyStatus