| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
from collections import Counter
from datetime import datetime, timezone
from uuid import uuid4

from core_events import EventDeliveryStatus
from core_shared import JSONValue

from app.db.models import EventRecord
from app.domain.repositories import EventRecordRepository
from app.schemas.event import (
    EventBatchPublishRequest,
    EventDeliveryStatusUpdateRequest,
    EventPublishRequest,
    PendingEventClaimRequest,
)
class EventApplicationService:
    """Application-level operations on event records.

    Every method delegates persistence to the injected
    ``EventRecordRepository``; this class only shapes the arguments and
    aggregates results.
    """

    def __init__(self, *, event_repository: EventRecordRepository) -> None:
        """Keep a reference to the repository all operations delegate to."""
        self.event_repository = event_repository

    def publish_event(self, payload: EventPublishRequest) -> EventRecord:
        """Create one event record from *payload* and return it.

        A fresh UUID4 string is generated as the ``event_id``. When the
        payload carries no ``event_time``, the current UTC time is used.
        """
        # ``datetime.utcnow()`` is deprecated since Python 3.12. Dropping the
        # tzinfo keeps the fallback a naive UTC datetime, i.e. exactly the
        # kind of value ``utcnow()`` used to hand to the repository.
        event_time = payload.event_time or datetime.now(timezone.utc).replace(
            tzinfo=None
        )
        return self.event_repository.create(
            event_id=str(uuid4()),
            event_type=payload.event_type,
            source_service=payload.source_service,
            aggregate_type=payload.aggregate_type,
            aggregate_id=payload.aggregate_id,
            correlation_id=payload.correlation_id,
            causation_id=payload.causation_id,
            payload_json=payload.payload_json,
            metadata_json=payload.metadata_json,
            event_time=event_time,
        )

    def publish_batch(self, payload: EventBatchPublishRequest) -> list[EventRecord]:
        """Publish every item of ``payload.events`` in order.

        Each item goes through :meth:`publish_event`; the created records are
        returned in the same order as the input events.
        """
        # NOTE(review): every event is a separate repository ``create`` call;
        # whether a failure midway rolls back earlier items depends on the
        # repository/session handling - confirm.
        return [self.publish_event(item) for item in payload.events]

    def list_events(
        self,
        *,
        event_type: str | None = None,
        source_service: str | None = None,
        aggregate_type: str | None = None,
        aggregate_id: str | None = None,
        correlation_id: str | None = None,
        status: EventDeliveryStatus | None = None,
        limit: int = 100,
    ) -> list[EventRecord]:
        """Return event records matching the given filters.

        All arguments are forwarded unchanged to the repository's
        ``list_by_scope``; *limit* defaults to 100.
        """
        return self.event_repository.list_by_scope(
            event_type=event_type,
            source_service=source_service,
            aggregate_type=aggregate_type,
            aggregate_id=aggregate_id,
            correlation_id=correlation_id,
            status=status,
            limit=limit,
        )

    def claim_pending_events(self, payload: PendingEventClaimRequest) -> list[EventRecord]:
        """Ask the repository to claim up to ``payload.limit`` pending events."""
        return self.event_repository.claim_pending(limit=payload.limit)

    def update_delivery_status(
        self,
        *,
        event_record_id: str,
        payload: EventDeliveryStatusUpdateRequest,
    ) -> EventRecord | None:
        """Update the delivery status of one event record.

        Returns ``None`` when no record with *event_record_id* exists;
        otherwise returns whatever the repository's ``update_delivery_status``
        returns for the new status and error message.
        """
        # Look the record up first so an unknown id never reaches the update.
        existing = self.event_repository.get_by_id(event_record_id=event_record_id)
        if existing is None:
            return None
        return self.event_repository.update_delivery_status(
            event_record_id=event_record_id,
            status=payload.status,
            last_error_message=payload.last_error_message,
        )

    def build_stats(self, *, sample_limit: int = 500) -> dict[str, JSONValue]:
        """Summarise a sample of event records by status and by type.

        Args:
            sample_limit: Maximum number of records requested from the
                repository for the sample (defaults to 500).

        Returns:
            A dict with ``sample_size`` (number of records inspected),
            ``by_status`` and ``by_type`` (occurrence counts keyed by each
            record's ``status`` / ``event_type``).
        """
        events = self.event_repository.list_by_scope(limit=sample_limit)
        # Convert the Counters to plain dicts so callers keep receiving the
        # same mapping type as before.
        by_status = dict(Counter(event.status for event in events))
        by_type = dict(Counter(event.event_type for event in events))
        return {
            "sample_size": len(events),
            "by_status": by_status,
            "by_type": by_type,
        }
|