from datetime import datetime from uuid import uuid4 from core_events import EventDeliveryStatus from core_shared import JSONValue from app.db.models import EventRecord from app.domain.repositories import EventRecordRepository from app.schemas.event import ( EventBatchPublishRequest, EventDeliveryStatusUpdateRequest, EventPublishRequest, PendingEventClaimRequest, ) class EventApplicationService: def __init__(self, *, event_repository: EventRecordRepository) -> None: self.event_repository = event_repository def publish_event(self, payload: EventPublishRequest) -> EventRecord: return self.event_repository.create( tenant_id=payload.tenant_id or "public", event_id=str(uuid4()), event_type=payload.event_type, source_service=payload.source_service, aggregate_type=payload.aggregate_type, aggregate_id=payload.aggregate_id, correlation_id=payload.correlation_id, causation_id=payload.causation_id, payload_json=payload.payload_json, metadata_json=payload.metadata_json, event_time=payload.event_time or datetime.utcnow(), ) def publish_batch(self, payload: EventBatchPublishRequest) -> list[EventRecord]: return [self.publish_event(item) for item in payload.events] def list_events( self, *, tenant_id: str, event_type: str | None = None, source_service: str | None = None, aggregate_type: str | None = None, aggregate_id: str | None = None, correlation_id: str | None = None, status: EventDeliveryStatus | None = None, limit: int = 100, ) -> list[EventRecord]: return self.event_repository.list_by_scope( tenant_id=tenant_id, event_type=event_type, source_service=source_service, aggregate_type=aggregate_type, aggregate_id=aggregate_id, correlation_id=correlation_id, status=status, limit=limit, ) def claim_pending_events(self, payload: PendingEventClaimRequest) -> list[EventRecord]: return self.event_repository.claim_pending( tenant_id=payload.tenant_id, limit=payload.limit, ) def update_delivery_status( self, *, event_record_id: str, payload: EventDeliveryStatusUpdateRequest, ) -> EventRecord | None: entity = self.event_repository.get_by_id( tenant_id=payload.tenant_id, event_record_id=event_record_id, ) if entity is None: return None return self.event_repository.update_delivery_status( event_record_id=event_record_id, status=payload.status, last_error_message=payload.last_error_message, ) def build_stats(self, *, tenant_id: str) -> dict[str, JSONValue]: events = self.event_repository.list_by_scope(tenant_id=tenant_id, limit=500) by_status: dict[str, int] = {} by_type: dict[str, int] = {} for event in events: by_status[event.status] = by_status.get(event.status, 0) + 1 by_type[event.event_type] = by_type.get(event.event_type, 0) + 1 return { "sample_size": len(events), "by_status": by_status, "by_type": by_type, }