| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- from datetime import datetime
- from uuid import uuid4
- from core_events import EventDeliveryStatus
- from core_shared import JSONValue
- from app.db.models import EventRecord
- from app.domain.repositories import EventRecordRepository
- from app.schemas.event import (
- EventBatchPublishRequest,
- EventDeliveryStatusUpdateRequest,
- EventPublishRequest,
- PendingEventClaimRequest,
- )
- class EventApplicationService:
- def __init__(self, *, event_repository: EventRecordRepository) -> None:
- self.event_repository = event_repository
- def publish_event(self, payload: EventPublishRequest) -> EventRecord:
- return self.event_repository.create(
- tenant_id=payload.tenant_id or "public",
- event_id=str(uuid4()),
- event_type=payload.event_type,
- source_service=payload.source_service,
- aggregate_type=payload.aggregate_type,
- aggregate_id=payload.aggregate_id,
- correlation_id=payload.correlation_id,
- causation_id=payload.causation_id,
- payload_json=payload.payload_json,
- metadata_json=payload.metadata_json,
- event_time=payload.event_time or datetime.utcnow(),
- )
- def publish_batch(self, payload: EventBatchPublishRequest) -> list[EventRecord]:
- return [self.publish_event(item) for item in payload.events]
- def list_events(
- self,
- *,
- tenant_id: str,
- event_type: str | None = None,
- source_service: str | None = None,
- aggregate_type: str | None = None,
- aggregate_id: str | None = None,
- correlation_id: str | None = None,
- status: EventDeliveryStatus | None = None,
- limit: int = 100,
- ) -> list[EventRecord]:
- return self.event_repository.list_by_scope(
- tenant_id=tenant_id,
- event_type=event_type,
- source_service=source_service,
- aggregate_type=aggregate_type,
- aggregate_id=aggregate_id,
- correlation_id=correlation_id,
- status=status,
- limit=limit,
- )
- def claim_pending_events(self, payload: PendingEventClaimRequest) -> list[EventRecord]:
- return self.event_repository.claim_pending(
- tenant_id=payload.tenant_id,
- limit=payload.limit,
- )
- def update_delivery_status(
- self,
- *,
- event_record_id: str,
- payload: EventDeliveryStatusUpdateRequest,
- ) -> EventRecord | None:
- entity = self.event_repository.get_by_id(
- tenant_id=payload.tenant_id,
- event_record_id=event_record_id,
- )
- if entity is None:
- return None
- return self.event_repository.update_delivery_status(
- event_record_id=event_record_id,
- status=payload.status,
- last_error_message=payload.last_error_message,
- )
- def build_stats(self, *, tenant_id: str) -> dict[str, JSONValue]:
- events = self.event_repository.list_by_scope(tenant_id=tenant_id, limit=500)
- by_status: dict[str, int] = {}
- by_type: dict[str, int] = {}
- for event in events:
- by_status[event.status] = by_status.get(event.status, 0) + 1
- by_type[event.event_type] = by_type.get(event.event_type, 0) + 1
- return {
- "sample_size": len(events),
- "by_status": by_status,
- "by_type": by_type,
- }
|