| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- from core_domain import ServiceHealth
- from core_events import EventDeliveryStatus
- from core_shared import error_detail
- from fastapi import APIRouter, Depends, HTTPException, Query
- from sqlalchemy import text
- from sqlalchemy.orm import Session
- from app.application.services import EventApplicationService
- from app.db.session import get_db
- from app.domain.repositories import EventRecordRepository
- from app.schemas.event import (
- EventBatchPublishRequest,
- EventBatchPublishResponse,
- EventDeliveryStatusPostRequest,
- EventDeliveryStatusUpdateRequest,
- EventListRequest,
- EventPublishRequest,
- EventRecordResponse,
- EventStatsResponse,
- PendingEventClaimRequest,
- )
- router = APIRouter()
- def get_event_application_service(db: Session = Depends(get_db)) -> EventApplicationService:
- return EventApplicationService(event_repository=EventRecordRepository(db))
- @router.get("/health", response_model=ServiceHealth)
- def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
- db.execute(text("SELECT 1"))
- return ServiceHealth(service="event-service", status="ok", database="ok")
- @router.post("", response_model=EventRecordResponse)
- def publish_event(
- payload: EventPublishRequest,
- service: EventApplicationService = Depends(get_event_application_service)) -> EventRecordResponse:
- return EventRecordResponse.from_entity(service.publish_event(payload))
- @router.post("/batch", response_model=EventBatchPublishResponse)
- def publish_batch(
- payload: EventBatchPublishRequest,
- service: EventApplicationService = Depends(get_event_application_service)) -> EventBatchPublishResponse:
- events = service.publish_batch(payload)
- return EventBatchPublishResponse(
- events=[EventRecordResponse.from_entity(item) for item in events],
- count=len(events))
- @router.get("", response_model=list[EventRecordResponse])
- def list_events(
- event_type: str | None = Query(default=None),
- source_service: str | None = Query(default=None),
- aggregate_type: str | None = Query(default=None),
- aggregate_id: str | None = Query(default=None),
- correlation_id: str | None = Query(default=None),
- status: EventDeliveryStatus | None = Query(default=None),
- limit: int = Query(default=100, ge=1, le=500),
- service: EventApplicationService = Depends(get_event_application_service)) -> list[EventRecordResponse]:
- return [
- EventRecordResponse.from_entity(item)
- for item in service.list_events(
- event_type=event_type,
- source_service=source_service,
- aggregate_type=aggregate_type,
- aggregate_id=aggregate_id,
- correlation_id=correlation_id,
- status=status,
- limit=limit)
- ]
- @router.post("/list", response_model=list[EventRecordResponse])
- def list_events_post(
- payload: EventListRequest,
- service: EventApplicationService = Depends(get_event_application_service)) -> list[EventRecordResponse]:
- return [
- EventRecordResponse.from_entity(item)
- for item in service.list_events(
- event_type=payload.event_type,
- source_service=payload.source_service,
- aggregate_type=payload.aggregate_type,
- aggregate_id=payload.aggregate_id,
- correlation_id=payload.correlation_id,
- status=payload.status,
- limit=payload.limit)
- ]
- @router.post("/claim-pending", response_model=list[EventRecordResponse])
- def claim_pending_events(
- payload: PendingEventClaimRequest,
- service: EventApplicationService = Depends(get_event_application_service)) -> list[EventRecordResponse]:
- return [
- EventRecordResponse.from_entity(item)
- for item in service.claim_pending_events(payload)
- ]
- @router.post("/{event_record_id}/delivery-status", response_model=EventRecordResponse)
- def update_delivery_status(
- event_record_id: str,
- payload: EventDeliveryStatusUpdateRequest,
- service: EventApplicationService = Depends(get_event_application_service)) -> EventRecordResponse:
- entity = service.update_delivery_status(
- event_record_id=event_record_id,
- payload=payload)
- if entity is None:
- raise HTTPException(status_code=404, detail=error_detail("error.event.not_found", id=event_record_id))
- return EventRecordResponse.from_entity(entity)
- @router.post("/delivery-status", response_model=EventRecordResponse)
- def update_delivery_status_post(
- payload: EventDeliveryStatusPostRequest,
- service: EventApplicationService = Depends(get_event_application_service)) -> EventRecordResponse:
- entity = service.update_delivery_status(
- event_record_id=payload.event_record_id,
- payload=EventDeliveryStatusUpdateRequest(
- status=payload.status,
- last_error_message=payload.last_error_message))
- if entity is None:
- raise HTTPException(status_code=404, detail=error_detail("error.event.not_found", id=payload.event_record_id))
- return EventRecordResponse.from_entity(entity)
- @router.get("/stats", response_model=EventStatsResponse)
- def event_stats(
- service: EventApplicationService = Depends(get_event_application_service)) -> EventStatsResponse:
- return EventStatsResponse(
- counts_json=service.build_stats())
- @router.post("/stats", response_model=EventStatsResponse)
- def event_stats_post(
- service: EventApplicationService = Depends(get_event_application_service)) -> EventStatsResponse:
- return EventStatsResponse(
- counts_json=service.build_stats())
|