routes.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. from fastapi import APIRouter, Depends, HTTPException, Query
  2. from sqlalchemy import text
  3. from sqlalchemy.orm import Session
  4. from core_domain import ServiceHealth
  5. from core_events import EventDeliveryStatus
  6. from app.application.services import EventApplicationService
  7. from app.db.session import get_db
  8. from app.domain.repositories import EventRecordRepository
  9. from app.schemas.event import (
  10. EventBatchPublishRequest,
  11. EventBatchPublishResponse,
  12. EventDeliveryStatusUpdateRequest,
  13. EventPublishRequest,
  14. EventRecordResponse,
  15. EventStatsResponse,
  16. PendingEventClaimRequest,
  17. )
# Router on which the endpoints defined below are registered through the
# ``@router.get`` / ``@router.post`` decorators.
router = APIRouter()
  19. def get_event_application_service(db: Session = Depends(get_db)) -> EventApplicationService:
  20. return EventApplicationService(event_repository=EventRecordRepository(db))
  21. @router.get("/health", response_model=ServiceHealth)
  22. def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
  23. db.execute(text("SELECT 1"))
  24. return ServiceHealth(service="event-service", status="ok", database="ok")
  25. @router.post("", response_model=EventRecordResponse)
  26. def publish_event(
  27. payload: EventPublishRequest,
  28. service: EventApplicationService = Depends(get_event_application_service),
  29. ) -> EventRecordResponse:
  30. return EventRecordResponse.from_entity(service.publish_event(payload))
  31. @router.post("/batch", response_model=EventBatchPublishResponse)
  32. def publish_batch(
  33. payload: EventBatchPublishRequest,
  34. service: EventApplicationService = Depends(get_event_application_service),
  35. ) -> EventBatchPublishResponse:
  36. events = service.publish_batch(payload)
  37. return EventBatchPublishResponse(
  38. events=[EventRecordResponse.from_entity(item) for item in events],
  39. count=len(events),
  40. )
  41. @router.get("", response_model=list[EventRecordResponse])
  42. def list_events(
  43. tenant_id: str = Query(...),
  44. event_type: str | None = Query(default=None),
  45. source_service: str | None = Query(default=None),
  46. aggregate_type: str | None = Query(default=None),
  47. aggregate_id: str | None = Query(default=None),
  48. correlation_id: str | None = Query(default=None),
  49. status: EventDeliveryStatus | None = Query(default=None),
  50. limit: int = Query(default=100, ge=1, le=500),
  51. service: EventApplicationService = Depends(get_event_application_service),
  52. ) -> list[EventRecordResponse]:
  53. return [
  54. EventRecordResponse.from_entity(item)
  55. for item in service.list_events(
  56. tenant_id=tenant_id,
  57. event_type=event_type,
  58. source_service=source_service,
  59. aggregate_type=aggregate_type,
  60. aggregate_id=aggregate_id,
  61. correlation_id=correlation_id,
  62. status=status,
  63. limit=limit,
  64. )
  65. ]
  66. @router.post("/claim-pending", response_model=list[EventRecordResponse])
  67. def claim_pending_events(
  68. payload: PendingEventClaimRequest,
  69. service: EventApplicationService = Depends(get_event_application_service),
  70. ) -> list[EventRecordResponse]:
  71. return [
  72. EventRecordResponse.from_entity(item)
  73. for item in service.claim_pending_events(payload)
  74. ]
  75. @router.post("/{event_record_id}/delivery-status", response_model=EventRecordResponse)
  76. def update_delivery_status(
  77. event_record_id: str,
  78. payload: EventDeliveryStatusUpdateRequest,
  79. service: EventApplicationService = Depends(get_event_application_service),
  80. ) -> EventRecordResponse:
  81. entity = service.update_delivery_status(
  82. event_record_id=event_record_id,
  83. payload=payload,
  84. )
  85. if entity is None:
  86. raise HTTPException(status_code=404, detail=f"event not found: {event_record_id}")
  87. return EventRecordResponse.from_entity(entity)
  88. @router.get("/stats", response_model=EventStatsResponse)
  89. def event_stats(
  90. tenant_id: str = Query(...),
  91. service: EventApplicationService = Depends(get_event_application_service),
  92. ) -> EventStatsResponse:
  93. return EventStatsResponse(
  94. tenant_id=tenant_id,
  95. counts_json=service.build_stats(tenant_id=tenant_id),
  96. )