from datetime import datetime from sqlalchemy import case, func, select from sqlalchemy.orm import Session from app.db.models import ApiKey, AppApiKey, AppDefinition, AppInvocationAudit, GatewayRequestAudit class GatewayRequestAuditRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, request_id: str, method: str, path: str, query_string: str | None, target_service: str | None, target_url: str | None, status_code: int | None, duration_ms: int, client_host: str | None, user_agent: str | None, error_message: str | None) -> GatewayRequestAudit: entity = GatewayRequestAudit( request_id=request_id, method=method, path=path, query_string=query_string, target_service=target_service, target_url=target_url, status_code=status_code, duration_ms=duration_ms, client_host=client_host, user_agent=user_agent, error_message=error_message) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_by_scope( self, *, request_id: str | None = None, target_service: str | None = None, limit: int = 100) -> list[GatewayRequestAudit]: stmt = select(GatewayRequestAudit) if request_id is not None: stmt = stmt.where(GatewayRequestAudit.request_id == request_id) if target_service is not None: stmt = stmt.where(GatewayRequestAudit.target_service == target_service) stmt = stmt.order_by(GatewayRequestAudit.created_time.desc()).limit(limit) return list(self.db.scalars(stmt)) def stats_by_service(self) -> list[tuple[str, int, int, float]]: target_service = func.coalesce(GatewayRequestAudit.target_service, "api-gateway") error_count = func.sum( case( (GatewayRequestAudit.status_code >= 400, 1), else_=0) ) stmt = ( select( target_service.label("target_service"), func.count(GatewayRequestAudit.id), error_count, func.avg(GatewayRequestAudit.duration_ms)) .group_by(target_service) .order_by(target_service.asc()) ) rows = self.db.execute(stmt).all() return [ ( str(row[0]), int(row[1] or 0), int(row[2] or 0), float(row[3] or 0.0)) for row in rows ] class ApiKeyRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, name: str, key_prefix: str, key_hash: str, scopes: str | None, expires_time: datetime | None) -> ApiKey: entity = ApiKey( name=name, key_prefix=key_prefix, key_hash=key_hash, status="active", scopes=scopes, expires_time=expires_time) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_all(self) -> list[ApiKey]: stmt = ( select(ApiKey) .order_by(ApiKey.created_time.desc()) ) return list(self.db.scalars(stmt)) def has_any(self) -> bool: stmt = select(ApiKey.id).limit(1) return self.db.scalar(stmt) is not None def get_by_id(self, *, api_key_id: str) -> ApiKey | None: stmt = ( select(ApiKey) .where(ApiKey.id == api_key_id) .limit(1) ) return self.db.scalar(stmt) def get_active_by_hash(self, *, key_hash: str) -> ApiKey | None: stmt = ( select(ApiKey) .where(ApiKey.key_hash == key_hash) .where(ApiKey.status == "active") .limit(1) ) return self.db.scalar(stmt) def touch_last_used_time(self, *, api_key_id: str) -> None: entity = self.db.get(ApiKey, api_key_id) if entity is None: return entity.last_used_time = datetime.utcnow() self.db.commit() def update_status( self, *, api_key_id: str, status: str) -> ApiKey | None: entity = self.get_by_id(api_key_id=api_key_id) if entity is None: return None entity.status = status self.db.commit() self.db.refresh(entity) return entity class AppDefinitionRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, code: str, name: str, target_type: str, target_id: str, description: str | None = None, owner_user_id: str | None = None, settings_json: str | None = None) -> AppDefinition: entity = AppDefinition( code=code, name=name, description=description, status="draft", target_type=target_type, target_id=target_id, owner_user_id=owner_user_id, settings_json=settings_json) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def get_by_id(self, *, app_id: str) -> AppDefinition | None: stmt = select(AppDefinition).where(AppDefinition.id == app_id).limit(1) return self.db.scalar(stmt) def get_by_code(self, *, code: str) -> AppDefinition | None: stmt = select(AppDefinition).where(AppDefinition.code == code).limit(1) return self.db.scalar(stmt) def list_all(self) -> list[AppDefinition]: stmt = select(AppDefinition).order_by(AppDefinition.created_time.desc()) return list(self.db.scalars(stmt)) def update( self, *, app_id: str, name: str | None = None, description: str | None = None, target_type: str | None = None, target_id: str | None = None, settings_json: str | None = None) -> AppDefinition | None: entity = self.get_by_id(app_id=app_id) if entity is None: return None if name is not None: entity.name = name if description is not None: entity.description = description if target_type is not None: entity.target_type = target_type if target_id is not None: entity.target_id = target_id if settings_json is not None: entity.settings_json = settings_json self.db.commit() self.db.refresh(entity) return entity def update_status(self, *, app_id: str, status: str) -> AppDefinition | None: entity = self.get_by_id(app_id=app_id) if entity is None: return None entity.status = status self.db.commit() self.db.refresh(entity) return entity class AppApiKeyRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, app_id: str, name: str, key_prefix: str, key_hash: str, scopes: str | None, expires_time: datetime | None) -> AppApiKey: entity = AppApiKey( app_id=app_id, name=name, key_prefix=key_prefix, key_hash=key_hash, status="active", scopes=scopes, expires_time=expires_time) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_by_app(self, *, app_id: str) -> list[AppApiKey]: stmt = ( select(AppApiKey) .where(AppApiKey.app_id == app_id) .order_by(AppApiKey.created_time.desc()) ) return list(self.db.scalars(stmt)) def get_by_id(self, *, api_key_id: str) -> AppApiKey | None: stmt = select(AppApiKey).where(AppApiKey.id == api_key_id).limit(1) return self.db.scalar(stmt) def get_active_by_hash(self, *, key_hash: str) -> AppApiKey | None: stmt = ( select(AppApiKey) .where(AppApiKey.key_hash == key_hash) .where(AppApiKey.status == "active") .limit(1) ) return self.db.scalar(stmt) def touch_last_used_time(self, *, api_key_id: str) -> None: entity = self.db.get(AppApiKey, api_key_id) if entity is None: return entity.last_used_time = datetime.utcnow() self.db.commit() def update_status(self, *, api_key_id: str, status: str) -> AppApiKey | None: entity = self.get_by_id(api_key_id=api_key_id) if entity is None: return None entity.status = status self.db.commit() self.db.refresh(entity) return entity class AppInvocationAuditRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, app_id: str, request_id: str, target_type: str, target_id: str, invoke_type: str, status: str, duration_ms: int, api_key_prefix: str | None = None, session_id: str | None = None, run_request_id: str | None = None, error_code: str | None = None, error_message: str | None = None, client_metadata_json: str | None = None) -> AppInvocationAudit: entity = AppInvocationAudit( app_id=app_id, api_key_prefix=api_key_prefix, request_id=request_id, session_id=session_id, run_request_id=run_request_id, target_type=target_type, target_id=target_id, invoke_type=invoke_type, status=status, duration_ms=duration_ms, error_code=error_code, error_message=error_message, client_metadata_json=client_metadata_json) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_by_app(self, *, app_id: str, limit: int = 100) -> list[AppInvocationAudit]: stmt = ( select(AppInvocationAudit) .where(AppInvocationAudit.app_id == app_id) .order_by(AppInvocationAudit.created_time.desc()) .limit(limit) ) return list(self.db.scalars(stmt))