services.py 3.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
from collections import Counter
from datetime import datetime, timezone
from uuid import uuid4

from core_events import EventDeliveryStatus
from core_shared import JSONValue

from app.db.models import EventRecord
from app.domain.repositories import EventRecordRepository
from app.schemas.event import (
    EventBatchPublishRequest,
    EventDeliveryStatusUpdateRequest,
    EventPublishRequest,
    PendingEventClaimRequest,
)


  13. class EventApplicationService:
  14. def __init__(self, *, event_repository: EventRecordRepository) -> None:
  15. self.event_repository = event_repository
  16. def publish_event(self, payload: EventPublishRequest) -> EventRecord:
  17. return self.event_repository.create(
  18. tenant_id=payload.tenant_id or "public",
  19. event_id=str(uuid4()),
  20. event_type=payload.event_type,
  21. source_service=payload.source_service,
  22. aggregate_type=payload.aggregate_type,
  23. aggregate_id=payload.aggregate_id,
  24. correlation_id=payload.correlation_id,
  25. causation_id=payload.causation_id,
  26. payload_json=payload.payload_json,
  27. metadata_json=payload.metadata_json,
  28. event_time=payload.event_time or datetime.utcnow(),
  29. )
  30. def publish_batch(self, payload: EventBatchPublishRequest) -> list[EventRecord]:
  31. return [self.publish_event(item) for item in payload.events]
  32. def list_events(
  33. self,
  34. *,
  35. tenant_id: str,
  36. event_type: str | None = None,
  37. source_service: str | None = None,
  38. aggregate_type: str | None = None,
  39. aggregate_id: str | None = None,
  40. correlation_id: str | None = None,
  41. status: EventDeliveryStatus | None = None,
  42. limit: int = 100,
  43. ) -> list[EventRecord]:
  44. return self.event_repository.list_by_scope(
  45. tenant_id=tenant_id,
  46. event_type=event_type,
  47. source_service=source_service,
  48. aggregate_type=aggregate_type,
  49. aggregate_id=aggregate_id,
  50. correlation_id=correlation_id,
  51. status=status,
  52. limit=limit,
  53. )
  54. def claim_pending_events(self, payload: PendingEventClaimRequest) -> list[EventRecord]:
  55. return self.event_repository.claim_pending(
  56. tenant_id=payload.tenant_id,
  57. limit=payload.limit,
  58. )
  59. def update_delivery_status(
  60. self,
  61. *,
  62. event_record_id: str,
  63. payload: EventDeliveryStatusUpdateRequest,
  64. ) -> EventRecord | None:
  65. entity = self.event_repository.get_by_id(
  66. tenant_id=payload.tenant_id,
  67. event_record_id=event_record_id,
  68. )
  69. if entity is None:
  70. return None
  71. return self.event_repository.update_delivery_status(
  72. event_record_id=event_record_id,
  73. status=payload.status,
  74. last_error_message=payload.last_error_message,
  75. )
  76. def build_stats(self, *, tenant_id: str) -> dict[str, JSONValue]:
  77. events = self.event_repository.list_by_scope(tenant_id=tenant_id, limit=500)
  78. by_status: dict[str, int] = {}
  79. by_type: dict[str, int] = {}
  80. for event in events:
  81. by_status[event.status] = by_status.get(event.status, 0) + 1
  82. by_type[event.event_type] = by_type.get(event.event_type, 0) + 1
  83. return {
  84. "sample_size": len(events),
  85. "by_status": by_status,
  86. "by_type": by_type,
  87. }