routes.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. from core_domain import ServiceHealth
  2. from core_events import EventDeliveryStatus
  3. from core_shared import error_detail
  4. from fastapi import APIRouter, Depends, HTTPException, Query
  5. from sqlalchemy import text
  6. from sqlalchemy.orm import Session
  7. from app.application.services import EventApplicationService
  8. from app.db.session import get_db
  9. from app.domain.repositories import EventRecordRepository
  10. from app.schemas.event import (
  11. EventBatchPublishRequest,
  12. EventBatchPublishResponse,
  13. EventDeliveryStatusPostRequest,
  14. EventDeliveryStatusUpdateRequest,
  15. EventListRequest,
  16. EventPublishRequest,
  17. EventRecordResponse,
  18. EventStatsResponse,
  19. PendingEventClaimRequest,
  20. )
# Router instance that the endpoint decorators in this module register on.
router = APIRouter()
  22. def get_event_application_service(db: Session = Depends(get_db)) -> EventApplicationService:
  23. return EventApplicationService(event_repository=EventRecordRepository(db))
  24. @router.get("/health", response_model=ServiceHealth)
  25. def health_check(db: Session = Depends(get_db)) -> ServiceHealth:
  26. db.execute(text("SELECT 1"))
  27. return ServiceHealth(service="event-service", status="ok", database="ok")
  28. @router.post("", response_model=EventRecordResponse)
  29. def publish_event(
  30. payload: EventPublishRequest,
  31. service: EventApplicationService = Depends(get_event_application_service)) -> EventRecordResponse:
  32. return EventRecordResponse.from_entity(service.publish_event(payload))
  33. @router.post("/batch", response_model=EventBatchPublishResponse)
  34. def publish_batch(
  35. payload: EventBatchPublishRequest,
  36. service: EventApplicationService = Depends(get_event_application_service)) -> EventBatchPublishResponse:
  37. events = service.publish_batch(payload)
  38. return EventBatchPublishResponse(
  39. events=[EventRecordResponse.from_entity(item) for item in events],
  40. count=len(events))
  41. @router.get("", response_model=list[EventRecordResponse])
  42. def list_events(
  43. event_type: str | None = Query(default=None),
  44. source_service: str | None = Query(default=None),
  45. aggregate_type: str | None = Query(default=None),
  46. aggregate_id: str | None = Query(default=None),
  47. correlation_id: str | None = Query(default=None),
  48. status: EventDeliveryStatus | None = Query(default=None),
  49. limit: int = Query(default=100, ge=1, le=500),
  50. service: EventApplicationService = Depends(get_event_application_service)) -> list[EventRecordResponse]:
  51. return [
  52. EventRecordResponse.from_entity(item)
  53. for item in service.list_events(
  54. event_type=event_type,
  55. source_service=source_service,
  56. aggregate_type=aggregate_type,
  57. aggregate_id=aggregate_id,
  58. correlation_id=correlation_id,
  59. status=status,
  60. limit=limit)
  61. ]
  62. @router.post("/list", response_model=list[EventRecordResponse])
  63. def list_events_post(
  64. payload: EventListRequest,
  65. service: EventApplicationService = Depends(get_event_application_service)) -> list[EventRecordResponse]:
  66. return [
  67. EventRecordResponse.from_entity(item)
  68. for item in service.list_events(
  69. event_type=payload.event_type,
  70. source_service=payload.source_service,
  71. aggregate_type=payload.aggregate_type,
  72. aggregate_id=payload.aggregate_id,
  73. correlation_id=payload.correlation_id,
  74. status=payload.status,
  75. limit=payload.limit)
  76. ]
  77. @router.post("/claim-pending", response_model=list[EventRecordResponse])
  78. def claim_pending_events(
  79. payload: PendingEventClaimRequest,
  80. service: EventApplicationService = Depends(get_event_application_service)) -> list[EventRecordResponse]:
  81. return [
  82. EventRecordResponse.from_entity(item)
  83. for item in service.claim_pending_events(payload)
  84. ]
  85. @router.post("/{event_record_id}/delivery-status", response_model=EventRecordResponse)
  86. def update_delivery_status(
  87. event_record_id: str,
  88. payload: EventDeliveryStatusUpdateRequest,
  89. service: EventApplicationService = Depends(get_event_application_service)) -> EventRecordResponse:
  90. entity = service.update_delivery_status(
  91. event_record_id=event_record_id,
  92. payload=payload)
  93. if entity is None:
  94. raise HTTPException(status_code=404, detail=error_detail("error.event.not_found", id=event_record_id))
  95. return EventRecordResponse.from_entity(entity)
  96. @router.post("/delivery-status", response_model=EventRecordResponse)
  97. def update_delivery_status_post(
  98. payload: EventDeliveryStatusPostRequest,
  99. service: EventApplicationService = Depends(get_event_application_service)) -> EventRecordResponse:
  100. entity = service.update_delivery_status(
  101. event_record_id=payload.event_record_id,
  102. payload=EventDeliveryStatusUpdateRequest(
  103. status=payload.status,
  104. last_error_message=payload.last_error_message))
  105. if entity is None:
  106. raise HTTPException(status_code=404, detail=error_detail("error.event.not_found", id=payload.event_record_id))
  107. return EventRecordResponse.from_entity(entity)
  108. @router.get("/stats", response_model=EventStatsResponse)
  109. def event_stats(
  110. service: EventApplicationService = Depends(get_event_application_service)) -> EventStatsResponse:
  111. return EventStatsResponse(
  112. counts_json=service.build_stats())
  113. @router.post("/stats", response_model=EventStatsResponse)
  114. def event_stats_post(
  115. service: EventApplicationService = Depends(get_event_application_service)) -> EventStatsResponse:
  116. return EventStatsResponse(
  117. counts_json=service.build_stats())