# services.py

from collections import Counter
from datetime import datetime, timezone
from uuid import uuid4

from core_events import EventDeliveryStatus
from core_shared import JSONValue

from app.db.models import EventRecord
from app.domain.repositories import EventRecordRepository
from app.schemas.event import (
    EventBatchPublishRequest,
    EventDeliveryStatusUpdateRequest,
    EventPublishRequest,
    PendingEventClaimRequest,
)


  12. class EventApplicationService:
  13. def __init__(self, *, event_repository: EventRecordRepository) -> None:
  14. self.event_repository = event_repository
  15. def publish_event(self, payload: EventPublishRequest) -> EventRecord:
  16. return self.event_repository.create(
  17. event_id=str(uuid4()),
  18. event_type=payload.event_type,
  19. source_service=payload.source_service,
  20. aggregate_type=payload.aggregate_type,
  21. aggregate_id=payload.aggregate_id,
  22. correlation_id=payload.correlation_id,
  23. causation_id=payload.causation_id,
  24. payload_json=payload.payload_json,
  25. metadata_json=payload.metadata_json,
  26. event_time=payload.event_time or datetime.utcnow())
  27. def publish_batch(self, payload: EventBatchPublishRequest) -> list[EventRecord]:
  28. return [self.publish_event(item) for item in payload.events]
  29. def list_events(
  30. self,
  31. *,
  32. event_type: str | None = None,
  33. source_service: str | None = None,
  34. aggregate_type: str | None = None,
  35. aggregate_id: str | None = None,
  36. correlation_id: str | None = None,
  37. status: EventDeliveryStatus | None = None,
  38. limit: int = 100) -> list[EventRecord]:
  39. return self.event_repository.list_by_scope(
  40. event_type=event_type,
  41. source_service=source_service,
  42. aggregate_type=aggregate_type,
  43. aggregate_id=aggregate_id,
  44. correlation_id=correlation_id,
  45. status=status,
  46. limit=limit)
  47. def claim_pending_events(self, payload: PendingEventClaimRequest) -> list[EventRecord]:
  48. return self.event_repository.claim_pending(
  49. limit=payload.limit)
  50. def update_delivery_status(
  51. self,
  52. *,
  53. event_record_id: str,
  54. payload: EventDeliveryStatusUpdateRequest) -> EventRecord | None:
  55. entity = self.event_repository.get_by_id(
  56. event_record_id=event_record_id)
  57. if entity is None:
  58. return None
  59. return self.event_repository.update_delivery_status(
  60. event_record_id=event_record_id,
  61. status=payload.status,
  62. last_error_message=payload.last_error_message)
  63. def build_stats(self) -> dict[str, JSONValue]:
  64. events = self.event_repository.list_by_scope(limit=500)
  65. by_status: dict[str, int] = {}
  66. by_type: dict[str, int] = {}
  67. for event in events:
  68. by_status[event.status] = by_status.get(event.status, 0) + 1
  69. by_type[event.event_type] = by_type.get(event.event_type, 0) + 1
  70. return {
  71. "sample_size": len(events),
  72. "by_status": by_status,
  73. "by_type": by_type,
  74. }