event.py 1.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. from typing import TYPE_CHECKING
  2. from core_events import EventDeliveryStatus, EventPublishContract, EventRecordContract
  3. from core_shared import JSONValue
  4. from pydantic import BaseModel, Field
  5. if TYPE_CHECKING:
  6. from app.db.models import EventRecord
  class EventPublishRequest(EventPublishContract):
      """Request schema for publishing a single event.

      Adds no fields or validators of its own; everything is inherited
      unchanged from ``EventPublishContract``.
      """
  class EventRecordResponse(EventRecordContract):
      """Response schema for a stored event record.

      Inherits all fields from ``EventRecordContract`` and adds an alternate
      constructor that builds the response from an ``EventRecord`` entity.
      """

      @classmethod
      def from_entity(cls, entity: "EventRecord") -> "EventRecordResponse":
          """Build a response model from an ``EventRecord`` entity.

          The entity is validated by attribute access
          (``from_attributes=True``) rather than as a mapping.

          Args:
              entity: The record whose attributes populate the response.

          Returns:
              A validated instance of this response class.
          """
          response = cls.model_validate(entity, from_attributes=True)
          return response
  class EventDeliveryStatusUpdateRequest(BaseModel):
      """Request schema for updating an event's delivery status."""

      # Required: the delivery status to record.
      status: EventDeliveryStatus
      # Optional error text; defaults to None when not supplied.
      last_error_message: str | None = Field(default=None)
  class PendingEventClaimRequest(BaseModel):
      """Request schema carrying a bounded ``limit``.

      NOTE(review): presumably ``limit`` caps how many pending events are
      claimed per call -- confirm against the handler that consumes it.
      """

      # Defaults to 100; must be between 1 and 500 inclusive.
      limit: int = Field(
          100,
          ge=1,
          le=500,
      )
  class EventBatchPublishRequest(BaseModel):
      """Request schema for publishing a batch of events.

      The batch must contain between 1 and 500 events inclusive.
      """

      # Required on purpose: a ``default_factory=list`` default would be an
      # empty list, and Pydantic does not validate defaults unless
      # ``validate_default=True``, so an omitted ``events`` key would slip
      # past ``min_length=1`` as an empty batch.
      events: list[EventPublishRequest] = Field(..., min_length=1, max_length=500)
  class EventBatchPublishResponse(BaseModel):
      """Response schema returned for a batch publish."""

      # The event records included in the response.
      events: list[EventRecordResponse]
      # Integer count supplied by the caller.
      # NOTE(review): presumably equals len(events) -- nothing here enforces it.
      count: int
  class EventStatsResponse(BaseModel):
      """Response schema exposing event statistics as a JSON-valued mapping."""

      # String keys mapped to arbitrary JSON values.
      # NOTE(review): presumably per-status counts -- confirm with the producer.
      counts_json: dict[str, JSONValue]