from __future__ import annotations import json import urllib.error import urllib.request from datetime import datetime, timedelta from typing import TYPE_CHECKING from uuid import uuid4 from sqlalchemy.orm import Session from core_shared import JSONValue, try_build_redis_client from core_shared.secrets import EncryptedSecret, SecretCipher from core_shared.task_queue import TOOL_MCP_DISCOVERY_QUEUE, TaskQueuePublisher from app.bootstrap.settings import ToolServiceSettings from app.db.models import ToolBinding, ToolCredential, ToolDefinition, ToolConnection from app.domain.repositories import ( ToolBindingRepository, ToolCredentialRepository, ToolDefinitionRepository, ToolConnectionRepository, ) from app.schemas.tool import ( McpConnectData, McpConnectRequestDto, McpDiscoverRequestDto, McpToolDto, ToolBindingCreateRequest, ToolBindingCreateRequestDto, ToolBindingDeleteRequestDto, ToolBindingDetailRequestDto, ToolBindingUpdateRequestDto, ToolCreateRequest, ToolCreateRequestDto, ToolCredentialCreateRequest, ToolCredentialCreateRequestDto, ToolCredentialDeleteRequestDto, ToolCredentialDetailRequestDto, ToolCredentialDto, ToolCredentialRevealDto, ToolCredentialUpdateRequestDto, ToolDeleteRequestDto, ToolDetailRequestDto, ToolDto, ToolUpdateRequestDto, ToolConnectionCreateRequest, ToolConnectionCreateRequestDto, ToolConnectionDetailRequestDto, ToolConnectionDto, ToolConnectionUpdateRequestDto, ) if TYPE_CHECKING: from redis import Redis class ToolApplicationService: def __init__( self, tool_definition_repository: ToolDefinitionRepository, tool_connection_repository: ToolConnectionRepository, tool_binding_repository: ToolBindingRepository, tool_credential_repository: ToolCredentialRepository, secret_cipher: SecretCipher, settings: ToolServiceSettings | None = None, redis_client: Redis | None = None, task_queue_publisher: TaskQueuePublisher | None = None) -> None: self.tool_definition_repository = tool_definition_repository self.tool_connection_repository = tool_connection_repository self.tool_binding_repository = tool_binding_repository self.tool_credential_repository = tool_credential_repository self.secret_cipher = secret_cipher self.settings = settings or ToolServiceSettings() self.redis_client = redis_client self.task_queue_publisher = task_queue_publisher def create_tool_definition(self, payload: ToolCreateRequest) -> ToolDefinition: code = payload.code or self._build_tool_code(payload.name) return self.tool_definition_repository.create( plugin_id=payload.plugin_id, code=code, name=payload.name, tool_type=payload.tool_type, description=payload.description) def list_tool_definitions(self) -> list[ToolDefinition]: return self.tool_definition_repository.list_all() def create_tool_definition_from_contract( self, payload: ToolCreateRequestDto) -> ToolDefinition: return self.create_tool_definition( ToolCreateRequest( plugin_id=payload.pluginId, name=payload.name, tool_type=payload.toolType, description=payload.description)) def delete_tool_definition_from_contract(self, payload: ToolDeleteRequestDto) -> bool: entity = self.tool_definition_repository.get_by_id(tool_id=payload.toolId) if entity is None: return False self.tool_definition_repository.delete(entity) return True def get_tool_definition_from_contract( self, payload: ToolDetailRequestDto) -> ToolDefinition | None: return self.tool_definition_repository.get_by_id(tool_id=payload.toolId) def update_tool_definition_from_contract( self, payload: ToolUpdateRequestDto) -> ToolDefinition | None: entity = self.tool_definition_repository.get_by_id(tool_id=payload.toolId) if entity is None: return None if payload.name is not None: entity.name = payload.name entity.code = self._build_tool_code(payload.name) if payload.toolType is not None: entity.tool_type = payload.toolType if payload.description is not None: entity.description = payload.description if payload.pluginId is not None: entity.plugin_id = payload.pluginId return self.tool_definition_repository.save(entity) def create_tool_connection(self, payload: ToolConnectionCreateRequest) -> ToolConnection: return self.tool_connection_repository.create( tool_id=payload.tool_id, input_schema_json=payload.input_schema_json, output_schema_json=payload.output_schema_json, invoke_config_json=payload.invoke_config_json, timeout_ms=payload.timeout_ms, retry_policy_json=payload.retry_policy_json) def list_tool_connections(self, tool_id: str | None = None) -> list[ToolConnection]: if tool_id is None: return self.tool_connection_repository.list_all() return self.tool_connection_repository.list_by_tool(tool_id=tool_id) def create_tool_connection_from_contract( self, payload: ToolConnectionCreateRequestDto) -> ToolConnection: return self.create_tool_connection( ToolConnectionCreateRequest( tool_id=payload.toolId, input_schema_json=payload.inputSchema, output_schema_json=payload.outputSchema, invoke_config_json=payload.invokeConfig, timeout_ms=payload.timeoutMs, retry_policy_json=payload.retryPolicy)) def delete_tool_connection(self, *, connection_id: str) -> bool: entity = self.tool_connection_repository.get_by_id(tool_connection_id=connection_id) if entity is None: return False self.tool_connection_repository.delete(entity) return True def get_tool_connection_from_contract( self, payload: ToolConnectionDetailRequestDto) -> ToolConnection | None: return self.tool_connection_repository.get_by_id( tool_connection_id=payload.connectionId) def update_tool_connection_from_contract( self, payload: ToolConnectionUpdateRequestDto) -> ToolConnection | None: entity = self.tool_connection_repository.get_by_id( tool_connection_id=payload.connectionId) if entity is None: return None if payload.inputSchema is not None: entity.input_schema_json = payload.inputSchema if payload.outputSchema is not None: entity.output_schema_json = payload.outputSchema if payload.invokeConfig is not None: entity.invoke_config_json = payload.invokeConfig if payload.timeoutMs is not None: entity.timeout_ms = payload.timeoutMs if payload.retryPolicy is not None: entity.retry_policy_json = payload.retryPolicy return self.tool_connection_repository.save(entity) def connect_mcp_server(self, payload: McpConnectRequestDto) -> McpConnectData: server_name, config = self._normalize_mcp_config(payload) discovered_tools = self._extract_mcp_tools(config) if discovered_tools: config["mcp_tools"] = [ tool.model_dump(mode="json", by_alias=False) for tool in discovered_tools ] job_id = f"mcpjob_{uuid4().hex}" config["mcp_status"] = self._build_mcp_status( job_id=job_id, status="queued", progress=0) tool = self.create_tool_definition( ToolCreateRequest( name=payload.name or server_name, tool_type="mcp", description=f"MCP SSE server: {config.get('url', '')}")) connection = self.create_tool_connection( ToolConnectionCreateRequest( tool_id=tool.id, input_schema_json={}, output_schema_json={}, invoke_config_json=config, timeout_ms=self._timeout_ms(config), retry_policy_json={"max_attempts": 1})) published = self._publish_mcp_discovery_job( connection_id=connection.id, job_id=job_id) if not published: config["mcp_status"] = self._build_mcp_status( job_id=job_id, status="skipped", progress=0, error_message="Redis queue is unavailable; connection check was not scheduled.") connection.invoke_config_json = config connection = self.tool_connection_repository.save(connection) return McpConnectData( tool=ToolDto.from_entity(tool), connection=ToolConnectionDto.from_entity(connection), discoveredTools=discovered_tools) def discover_mcp_connection(self, payload: McpDiscoverRequestDto) -> ToolConnection | None: return self._run_mcp_discovery( connection_id=payload.connectionId, worker_key="api-sync") def execute_mcp_discovery_job( self, *, connection_id: str, worker_key: str, lease_seconds: int, job_id: str | None = None, redis_client: Redis | None = None) -> ToolConnection | None: resolved_redis_client = redis_client or self.redis_client lock = None idempotency_store = None idempotency_key = f"{connection_id}:{job_id or 'discovery'}" if resolved_redis_client is not None: from core_shared.redis_primitives import DistributedLock, IdempotencyStore lock = DistributedLock( client=resolved_redis_client, name=f"tool-mcp-discovery:{connection_id}:lock", ttl_seconds=lease_seconds) if not lock.acquire(): return None idempotency_store = IdempotencyStore( client=resolved_redis_client, prefix="tool-mcp-discovery-idempotency") if not idempotency_store.begin(key=idempotency_key): lock.release() return None try: result = self._run_mcp_discovery( connection_id=connection_id, worker_key=worker_key, job_id=job_id) if idempotency_store is not None and result is not None: status_payload = self._read_mcp_status(result.invoke_config_json or {}) idempotency_store.complete( key=idempotency_key, result={ "connection_id": result.id, "status": str(status_payload.get("status") or ""), }) except Exception: if idempotency_store is not None: idempotency_store.clear(key=idempotency_key) raise finally: if lock is not None: lock.release() return result def execute_next_pending_mcp_discovery( self, *, worker_key: str, lease_seconds: int, stale_discovery_seconds: int, redis_client: Redis | None = None) -> ToolConnection | None: stale_before = datetime.utcnow() - timedelta(seconds=stale_discovery_seconds) pending_connections = self.tool_connection_repository.list_pending_mcp_discovery( stale_before=stale_before, limit=1) if not pending_connections: return None connection = pending_connections[0] status_payload = self._read_mcp_status(connection.invoke_config_json or {}) job_id = status_payload.get("jobId") return self.execute_mcp_discovery_job( connection_id=connection.id, job_id=job_id if isinstance(job_id, str) else None, worker_key=worker_key, lease_seconds=lease_seconds, redis_client=redis_client) def _run_mcp_discovery( self, *, connection_id: str, worker_key: str, job_id: str | None = None) -> ToolConnection | None: connection = self.tool_connection_repository.get_by_id(tool_connection_id=connection_id) if connection is None: return None config = dict(connection.invoke_config_json or {}) resolved_job_id = job_id or self._read_mcp_job_id(config) or f"mcpjob_{uuid4().hex}" config["mcp_status"] = self._build_mcp_status( job_id=resolved_job_id, status="running", progress=20, worker_key=worker_key) connection.invoke_config_json = config connection = self.tool_connection_repository.save(connection) try: discovered_tools = self._discover_mcp_tools(config) except Exception as exc: config = dict(connection.invoke_config_json or {}) config["mcp_status"] = self._build_mcp_status( job_id=resolved_job_id, status="failed", progress=100, worker_key=worker_key, error_message=str(exc)) connection.invoke_config_json = config return self.tool_connection_repository.save(connection) config = dict(connection.invoke_config_json or {}) if discovered_tools: config["mcp_tools"] = [ tool.model_dump(mode="json", by_alias=False) for tool in discovered_tools ] config["mcp_status"] = self._build_mcp_status( job_id=resolved_job_id, status="completed", progress=100, worker_key=worker_key) connection.invoke_config_json = config return self.tool_connection_repository.save(connection) def create_tool_binding(self, payload: ToolBindingCreateRequest) -> ToolBinding: if payload.credential_id is not None: credential = self.tool_credential_repository.get_by_id( credential_id=payload.credential_id) if credential is None: raise ValueError(f"tool credential not found: {payload.credential_id}") return self.tool_binding_repository.create( app_id=payload.app_id, tool_connection_id=payload.tool_connection_id, credential_id=payload.credential_id, binding_scope=payload.binding_scope, enabled=payload.enabled, config_json=payload.config_json) def create_tool_binding_from_contract( self, payload: ToolBindingCreateRequestDto) -> ToolBinding: return self.create_tool_binding( ToolBindingCreateRequest( app_id=payload.appId, tool_connection_id=payload.connectionId, credential_id=payload.credentialId, binding_scope=payload.bindingScope, enabled=True, config_json=payload.configJson)) def list_tool_bindings(self, app_id: str | None = None) -> list[ToolBinding]: return self.tool_binding_repository.list_by_scope(app_id=app_id) def delete_tool_binding(self, payload: ToolBindingDeleteRequestDto) -> bool: entity = self.tool_binding_repository.get_by_id(binding_id=payload.bindingId) if entity is None: return False self.tool_binding_repository.delete(entity) return True def get_tool_binding_from_contract( self, payload: ToolBindingDetailRequestDto) -> ToolBinding | None: return self.tool_binding_repository.get_by_id(binding_id=payload.bindingId) def update_tool_binding_from_contract( self, payload: ToolBindingUpdateRequestDto) -> ToolBinding | None: entity = self.tool_binding_repository.get_by_id(binding_id=payload.bindingId) if entity is None: return None if payload.credentialId is not None: entity.credential_id = payload.credentialId if payload.bindingScope is not None: entity.binding_scope = payload.bindingScope if payload.configJson is not None: entity.config_json = payload.configJson return self.tool_binding_repository.save(entity) def create_tool_credential(self, payload: ToolCredentialCreateRequest) -> ToolCredential: encrypted = self.secret_cipher.encrypt_json(payload.secret_json) return self.tool_credential_repository.create( name=payload.name, credential_type=payload.credential_type, encrypted_payload_text=encrypted.ciphertext, secret_fingerprint=encrypted.fingerprint, encryption_algorithm=encrypted.algorithm, metadata_json=payload.metadata_json) def create_tool_credential_from_contract( self, payload: ToolCredentialCreateRequestDto) -> ToolCredential: return self.create_tool_credential( ToolCredentialCreateRequest( name=payload.name, credential_type=payload.credentialType, secret_json=payload.secretJson, metadata_json=payload.metadataJson)) def list_tool_credentials(self) -> list[ToolCredential]: return self.tool_credential_repository.list_all() def delete_tool_credential(self, payload: ToolCredentialDeleteRequestDto) -> bool: entity = self.tool_credential_repository.get_by_id( credential_id=payload.credentialId) if entity is None: return False self.tool_credential_repository.delete(entity) return True def get_tool_credential_from_contract( self, payload: ToolCredentialDetailRequestDto) -> ToolCredential | None: return self.tool_credential_repository.get_by_id( credential_id=payload.credentialId) def update_tool_credential_from_contract( self, payload: ToolCredentialUpdateRequestDto) -> ToolCredential | None: entity = self.tool_credential_repository.get_by_id( credential_id=payload.credentialId) if entity is None: return None if payload.name is not None: entity.name = payload.name if payload.metadataJson is not None: entity.metadata_json = payload.metadataJson return self.tool_credential_repository.save(entity) def reveal_tool_credential( self, *, credential_id: str) -> tuple[ToolCredential, dict[str, JSONValue]] | None: credential = self.tool_credential_repository.get_by_id( credential_id=credential_id) if credential is None: return None secret_json = self.secret_cipher.decrypt_json( EncryptedSecret( ciphertext=credential.encrypted_payload_text, fingerprint=credential.secret_fingerprint, algorithm=credential.encryption_algorithm) ) return credential, secret_json def reveal_tool_credential_from_contract( self, *, credential_id: str) -> ToolCredentialRevealDto | None: result = self.reveal_tool_credential(credential_id=credential_id) if result is None: return None credential, secret_json = result return ToolCredentialRevealDto( credential=ToolCredentialDto.from_entity(credential), secretJson=secret_json) def get_tool_binding_detail( self, *, binding_id: str) -> tuple[ToolBinding, ToolConnection, ToolDefinition] | None: binding = self.tool_binding_repository.get_by_id(binding_id=binding_id) if binding is None: return None tool_connection = self.tool_connection_repository.get_by_id( tool_connection_id=binding.tool_connection_id) if tool_connection is None: return None tool_definition = self.tool_definition_repository.get_by_id( tool_id=tool_connection.tool_id) if tool_definition is None: return None return binding, tool_connection, tool_definition def _build_tool_code(self, name: str) -> str: base = "".join( char.lower() if char.isalnum() else "_" for char in name ).strip("_") or "tool" return base[:64] def _normalize_mcp_config( self, payload: McpConnectRequestDto) -> tuple[str, dict[str, JSONValue]]: config = payload.config if len(config) == 1: server_name, value = next(iter(config.items())) if isinstance(value, dict): normalized = {str(key): item for key, item in value.items()} normalized.setdefault("server_name", server_name) normalized.setdefault("transport", "sse") return payload.name or server_name, normalized server_name_value = config.get("server_name") or config.get("serverName") server_name = str(server_name_value or payload.name or "mcp_server") normalized = {str(key): value for key, value in config.items()} normalized.setdefault("server_name", server_name) normalized.setdefault("transport", "sse") return server_name, normalized def _extract_mcp_tools(self, config: dict[str, JSONValue]) -> list[McpToolDto]: raw_tools = config.get("mcp_tools") or config.get("tools") if not isinstance(raw_tools, list): return [] tools: list[McpToolDto] = [] for item in raw_tools: if not isinstance(item, dict): continue name = item.get("name") if not isinstance(name, str) or not name: continue description = item.get("description") input_schema = item.get("inputSchema") or item.get("input_schema") tools.append( McpToolDto( name=name, description=description if isinstance(description, str) else None, inputSchema=input_schema if isinstance(input_schema, dict) else None)) return tools def _discover_mcp_tools(self, config: dict[str, JSONValue]) -> list[McpToolDto]: self._validate_mcp_connection(config) try: return self._discover_mcp_tools_via_streamable_http(config) except Exception: return self._extract_mcp_tools(config) def _discover_mcp_tools_via_streamable_http( self, config: dict[str, JSONValue]) -> list[McpToolDto]: self._mcp_rpc( config, request_id=1, method="initialize", params={ "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": { "name": "auto-platform", "version": "0.1.0", }, }) self._mcp_rpc( config, method="notifications/initialized", params={}) result = self._mcp_rpc( config, request_id=2, method="tools/list", params={}) tools_result = result.get("result") if not isinstance(tools_result, dict): return [] raw_tools = tools_result.get("tools") if not isinstance(raw_tools, list): return [] return self._parse_mcp_tool_list(raw_tools) def _mcp_rpc( self, config: dict[str, JSONValue], *, method: str, params: dict[str, JSONValue], request_id: int | None = None) -> dict[str, JSONValue]: url = config.get("url") if not isinstance(url, str) or not url: raise ValueError("MCP server url is required") payload: dict[str, JSONValue] = { "jsonrpc": "2.0", "method": method, "params": params, } if request_id is not None: payload["id"] = request_id headers = { **self._read_mcp_headers(config), "Accept": "application/json, text/event-stream", "Content-Type": "application/json", } request = urllib.request.Request( url, data=json.dumps(payload).encode("utf-8"), headers=headers, method="POST") with urllib.request.urlopen( request, timeout=self._read_timeout_seconds(config)) as response: response_body = response.read().decode("utf-8") content_type = response.headers.get("Content-Type", "") data = self._parse_mcp_response(response_body, content_type) if not data: return {} error = data.get("error") if isinstance(error, dict): message = error.get("message") raise ValueError(str(message or "MCP JSON-RPC request failed")) return data def _parse_mcp_response( self, response_body: str, content_type: str) -> dict[str, JSONValue]: if "text/event-stream" in content_type: data_lines: list[str] = [] for line in response_body.splitlines(): if line.startswith("data:"): data_lines.append(line[5:].strip()) if not data_lines: return {} response_body = "\n".join(data_lines) parsed = json.loads(response_body) return parsed if isinstance(parsed, dict) else {} def _parse_mcp_tool_list(self, raw_tools: list[JSONValue]) -> list[McpToolDto]: tools: list[McpToolDto] = [] for item in raw_tools: if not isinstance(item, dict): continue name = item.get("name") if not isinstance(name, str) or not name: continue description = item.get("description") input_schema = item.get("inputSchema") or item.get("input_schema") tools.append( McpToolDto( name=name, description=description if isinstance(description, str) else None, inputSchema=input_schema if isinstance(input_schema, dict) else None)) return tools def _publish_mcp_discovery_job( self, *, connection_id: str, job_id: str) -> bool: if not self.settings.mcp_discovery_async_enabled or self.task_queue_publisher is None: return False return self.task_queue_publisher.publish_tool_mcp_discovery( connection_id=connection_id, job_id=job_id) def _validate_mcp_connection(self, config: dict[str, JSONValue]) -> None: url = config.get("url") if not isinstance(url, str) or not url: raise ValueError("MCP server url is required") headers = self._read_mcp_headers(config) request = urllib.request.Request( url, headers={ **headers, "Accept": headers.get("Accept", "text/event-stream"), }, method="GET") timeout = self._read_timeout_seconds(config) try: with urllib.request.urlopen(request, timeout=timeout) as response: status_code = int(getattr(response, "status", 200)) if status_code >= 500: raise ValueError(f"MCP server returned HTTP {status_code}") except urllib.error.HTTPError as exc: if exc.code >= 500: raise ValueError(f"MCP server returned HTTP {exc.code}") from exc except Exception as exc: raise ValueError(f"MCP server connection failed: {exc}") from exc def _read_mcp_headers(self, config: dict[str, JSONValue]) -> dict[str, str]: headers = config.get("headers") if not isinstance(headers, dict): return {} return { str(key): str(value) for key, value in headers.items() if isinstance(value, (str, int, float, bool)) } def _read_timeout_seconds(self, config: dict[str, JSONValue]) -> float: timeout = config.get("timeout") or config.get("timeout_seconds") if isinstance(timeout, (int, float)) and not isinstance(timeout, bool): return min(float(timeout), self.settings.mcp_discovery_timeout_seconds) return self.settings.mcp_discovery_timeout_seconds def _build_mcp_status( self, *, job_id: str, status: str, progress: int, worker_key: str | None = None, error_message: str | None = None) -> dict[str, JSONValue]: now = datetime.utcnow().isoformat() payload: dict[str, JSONValue] = { "jobId": job_id, "status": status, "progress": max(0, min(progress, 100)), "queueName": TOOL_MCP_DISCOVERY_QUEUE, "workerKey": worker_key, "errorMessage": error_message, "updatedTime": now, } if status == "queued": payload["queuedTime"] = now if status == "running": payload["startedTime"] = now if status in {"completed", "failed", "skipped"}: payload["completedTime"] = now return payload def _read_mcp_status(self, config: dict[str, JSONValue]) -> dict[str, JSONValue]: status_payload = config.get("mcp_status") if isinstance(status_payload, dict): return {str(key): value for key, value in status_payload.items()} return {} def _read_mcp_job_id(self, config: dict[str, JSONValue]) -> str | None: job_id = self._read_mcp_status(config).get("jobId") return job_id if isinstance(job_id, str) and job_id else None def _timeout_ms(self, config: dict[str, JSONValue]) -> int | None: timeout = config.get("timeout") or config.get("timeout_seconds") if isinstance(timeout, int | float): return int(timeout * 1000) return None def build_tool_application_service( *, db: Session, settings: ToolServiceSettings) -> ToolApplicationService: redis_client = try_build_redis_client(settings.redis_url) return ToolApplicationService( tool_definition_repository=ToolDefinitionRepository(db), tool_connection_repository=ToolConnectionRepository(db), tool_binding_repository=ToolBindingRepository(db), tool_credential_repository=ToolCredentialRepository(db), secret_cipher=SecretCipher(key=settings.credential_encryption_key), settings=settings, redis_client=redis_client, task_queue_publisher=( TaskQueuePublisher(client=redis_client) if redis_client is not None else None ))