repositories.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. from sqlalchemy import select
  2. from sqlalchemy.orm import Session
  3. from app.db.models import GatewayRequestAudit
  4. class GatewayRequestAuditRepository:
  5. def __init__(self, db: Session) -> None:
  6. self.db = db
  7. def create(
  8. self,
  9. *,
  10. tenant_id: str,
  11. request_id: str,
  12. method: str,
  13. path: str,
  14. query_string: str | None,
  15. target_service: str | None,
  16. target_url: str | None,
  17. status_code: int | None,
  18. duration_ms: int,
  19. client_host: str | None,
  20. user_agent: str | None,
  21. error_message: str | None,
  22. ) -> GatewayRequestAudit:
  23. entity = GatewayRequestAudit(
  24. tenant_id=tenant_id,
  25. request_id=request_id,
  26. method=method,
  27. path=path,
  28. query_string=query_string,
  29. target_service=target_service,
  30. target_url=target_url,
  31. status_code=status_code,
  32. duration_ms=duration_ms,
  33. client_host=client_host,
  34. user_agent=user_agent,
  35. error_message=error_message,
  36. )
  37. self.db.add(entity)
  38. self.db.commit()
  39. self.db.refresh(entity)
  40. return entity
  41. def list_by_scope(
  42. self,
  43. *,
  44. tenant_id: str,
  45. request_id: str | None = None,
  46. target_service: str | None = None,
  47. limit: int = 100,
  48. ) -> list[GatewayRequestAudit]:
  49. stmt = select(GatewayRequestAudit).where(GatewayRequestAudit.tenant_id == tenant_id)
  50. if request_id is not None:
  51. stmt = stmt.where(GatewayRequestAudit.request_id == request_id)
  52. if target_service is not None:
  53. stmt = stmt.where(GatewayRequestAudit.target_service == target_service)
  54. stmt = stmt.order_by(GatewayRequestAudit.created_time.desc()).limit(limit)
  55. return list(self.db.scalars(stmt))