from datetime import datetime from uuid import uuid4 from core_events import EventDeliveryStatus from core_shared import JSONValue from app.db.models import EventRecord from app.domain.repositories import EventRecordRepository from app.schemas.event import ( EventBatchPublishRequest, EventDeliveryStatusUpdateRequest, EventPublishRequest, PendingEventClaimRequest) class EventApplicationService: def __init__(self, *, event_repository: EventRecordRepository) -> None: self.event_repository = event_repository def publish_event(self, payload: EventPublishRequest) -> EventRecord: return self.event_repository.create( event_id=str(uuid4()), event_type=payload.event_type, source_service=payload.source_service, aggregate_type=payload.aggregate_type, aggregate_id=payload.aggregate_id, correlation_id=payload.correlation_id, causation_id=payload.causation_id, payload_json=payload.payload_json, metadata_json=payload.metadata_json, event_time=payload.event_time or datetime.utcnow()) def publish_batch(self, payload: EventBatchPublishRequest) -> list[EventRecord]: return [self.publish_event(item) for item in payload.events] def list_events( self, *, event_type: str | None = None, source_service: str | None = None, aggregate_type: str | None = None, aggregate_id: str | None = None, correlation_id: str | None = None, status: EventDeliveryStatus | None = None, limit: int = 100) -> list[EventRecord]: return self.event_repository.list_by_scope( event_type=event_type, source_service=source_service, aggregate_type=aggregate_type, aggregate_id=aggregate_id, correlation_id=correlation_id, status=status, limit=limit) def claim_pending_events(self, payload: PendingEventClaimRequest) -> list[EventRecord]: return self.event_repository.claim_pending( limit=payload.limit) def update_delivery_status( self, *, event_record_id: str, payload: EventDeliveryStatusUpdateRequest) -> EventRecord | None: entity = self.event_repository.get_by_id( event_record_id=event_record_id) if entity is None: return None return self.event_repository.update_delivery_status( event_record_id=event_record_id, status=payload.status, last_error_message=payload.last_error_message) def build_stats(self) -> dict[str, JSONValue]: events = self.event_repository.list_by_scope(limit=500) by_status: dict[str, int] = {} by_type: dict[str, int] = {} for event in events: by_status[event.status] = by_status.get(event.status, 0) + 1 by_type[event.event_type] = by_type.get(event.event_type, 0) + 1 return { "sample_size": len(events), "by_status": by_status, "by_type": by_type, }