test_observability.py 1.1 KB

12345678910111213141516171819202122232425262728293031
  1. import asyncio
  2. import httpx
  3. from fastapi import FastAPI
  4. from core_shared.observability import add_observability
  5. def test_observability_records_http_metrics() -> None:
  6. asyncio.run(_run_observability_smoke())
  7. async def _run_observability_smoke() -> None:
  8. app = FastAPI()
  9. add_observability(app, "test-service")
  10. @app.get("/health")
  11. async def health() -> dict[str, str]:
  12. return {"status": "ok"}
  13. transport = httpx.ASGITransport(app=app)
  14. async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
  15. health_response = await client.get("/health")
  16. metrics_response = await client.get("/metrics")
  17. assert health_response.status_code == 200
  18. assert metrics_response.status_code == 200
  19. assert 'agent_platform_service_info{service="test-service"} 1' in metrics_response.text
  20. assert "agent_platform_http_requests_total" in metrics_response.text
  21. assert 'method="GET"' in metrics_response.text
  22. assert 'path="/health"' in metrics_response.text
  23. assert 'status_code="200"' in metrics_response.text