| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646 |
- from __future__ import annotations
- import urllib.error
- import urllib.request
- from datetime import datetime, timedelta
- from typing import TYPE_CHECKING
- from uuid import uuid4
- from sqlalchemy.orm import Session
- from core_shared import JSONValue, try_build_redis_client
- from core_shared.secrets import EncryptedSecret, SecretCipher
- from core_shared.task_queue import TOOL_MCP_DISCOVERY_QUEUE, TaskQueuePublisher
- from app.bootstrap.settings import ToolServiceSettings
- from app.db.models import ToolBinding, ToolCredential, ToolDefinition, ToolConnection
- from app.domain.repositories import (
- ToolBindingRepository,
- ToolCredentialRepository,
- ToolDefinitionRepository,
- ToolConnectionRepository,
- )
- from app.schemas.tool import (
- McpConnectData,
- McpConnectRequestDto,
- McpToolDto,
- ToolBindingCreateRequest,
- ToolBindingCreateRequestDto,
- ToolBindingDeleteRequestDto,
- ToolBindingDetailRequestDto,
- ToolBindingUpdateRequestDto,
- ToolCreateRequest,
- ToolCreateRequestDto,
- ToolCredentialCreateRequest,
- ToolCredentialCreateRequestDto,
- ToolCredentialDeleteRequestDto,
- ToolCredentialDetailRequestDto,
- ToolCredentialDto,
- ToolCredentialRevealDto,
- ToolCredentialUpdateRequestDto,
- ToolDeleteRequestDto,
- ToolDetailRequestDto,
- ToolDto,
- ToolUpdateRequestDto,
- ToolConnectionCreateRequest,
- ToolConnectionCreateRequestDto,
- ToolConnectionDetailRequestDto,
- ToolConnectionDto,
- ToolConnectionUpdateRequestDto,
- )
- if TYPE_CHECKING:
- from redis import Redis
class ToolApplicationService:
    """Application service for tool definitions, connections, bindings and credentials.

    Persistence is delegated to the four injected repositories. Credential
    payloads are encrypted and decrypted with ``secret_cipher``. For MCP
    servers the service stores a discovery-job status dict under the
    ``"mcp_status"`` key of the connection's ``invoke_config_json``.
    """

    def __init__(
        self,
        tool_definition_repository: ToolDefinitionRepository,
        tool_connection_repository: ToolConnectionRepository,
        tool_binding_repository: ToolBindingRepository,
        tool_credential_repository: ToolCredentialRepository,
        secret_cipher: SecretCipher,
        settings: ToolServiceSettings | None = None,
        redis_client: Redis | None = None,
        task_queue_publisher: TaskQueuePublisher | None = None,
    ) -> None:
        """Store collaborators; a falsy ``settings`` is replaced by ``ToolServiceSettings()``."""
        self.tool_definition_repository = tool_definition_repository
        self.tool_connection_repository = tool_connection_repository
        self.tool_binding_repository = tool_binding_repository
        self.tool_credential_repository = tool_credential_repository
        self.secret_cipher = secret_cipher
        self.settings = settings or ToolServiceSettings()
        self.redis_client = redis_client
        self.task_queue_publisher = task_queue_publisher

    # ------------------------------------------------------------------
    # Tool definitions
    # ------------------------------------------------------------------

    def create_tool_definition(self, payload: ToolCreateRequest) -> ToolDefinition:
        """Create a tool definition, deriving ``code`` from the name when none is given."""
        code = payload.code or self._build_tool_code(payload.name)
        return self.tool_definition_repository.create(
            plugin_id=payload.plugin_id,
            code=code,
            name=payload.name,
            tool_type=payload.tool_type,
            description=payload.description,
        )

    def list_tool_definitions(self) -> list[ToolDefinition]:
        """Return every tool definition known to the repository."""
        return self.tool_definition_repository.list_all()

    def create_tool_definition_from_contract(
        self,
        payload: ToolCreateRequestDto,
    ) -> ToolDefinition:
        """Map the camelCase contract DTO to ``ToolCreateRequest`` and create the tool."""
        return self.create_tool_definition(
            ToolCreateRequest(
                plugin_id=payload.pluginId,
                name=payload.name,
                tool_type=payload.toolType,
                description=payload.description,
            )
        )

    def delete_tool_definition_from_contract(self, payload: ToolDeleteRequestDto) -> bool:
        """Delete the tool definition; return ``False`` when it does not exist."""
        entity = self.tool_definition_repository.get_by_id(tool_id=payload.toolId)
        if entity is None:
            return False
        self.tool_definition_repository.delete(entity)
        return True

    def get_tool_definition_from_contract(
        self,
        payload: ToolDetailRequestDto,
    ) -> ToolDefinition | None:
        """Look up a tool definition by id; ``None`` when it does not exist."""
        return self.tool_definition_repository.get_by_id(tool_id=payload.toolId)

    def update_tool_definition_from_contract(
        self,
        payload: ToolUpdateRequestDto,
    ) -> ToolDefinition | None:
        """Apply the non-``None`` fields of ``payload``; ``None`` when the tool is missing.

        Changing the name also regenerates ``code`` from the new name.
        """
        entity = self.tool_definition_repository.get_by_id(tool_id=payload.toolId)
        if entity is None:
            return None
        if payload.name is not None:
            entity.name = payload.name
            entity.code = self._build_tool_code(payload.name)
        if payload.toolType is not None:
            entity.tool_type = payload.toolType
        if payload.description is not None:
            entity.description = payload.description
        if payload.pluginId is not None:
            entity.plugin_id = payload.pluginId
        return self.tool_definition_repository.save(entity)

    # ------------------------------------------------------------------
    # Tool connections
    # ------------------------------------------------------------------

    def create_tool_connection(self, payload: ToolConnectionCreateRequest) -> ToolConnection:
        """Persist a new connection for ``payload.tool_id``."""
        return self.tool_connection_repository.create(
            tool_id=payload.tool_id,
            input_schema_json=payload.input_schema_json,
            output_schema_json=payload.output_schema_json,
            invoke_config_json=payload.invoke_config_json,
            timeout_ms=payload.timeout_ms,
            retry_policy_json=payload.retry_policy_json,
        )

    def list_tool_connections(self, tool_id: str | None = None) -> list[ToolConnection]:
        """List all connections, or only those of ``tool_id`` when it is given."""
        if tool_id is None:
            return self.tool_connection_repository.list_all()
        return self.tool_connection_repository.list_by_tool(tool_id=tool_id)

    def create_tool_connection_from_contract(
        self,
        payload: ToolConnectionCreateRequestDto,
    ) -> ToolConnection:
        """Map the camelCase contract DTO to ``ToolConnectionCreateRequest`` and create it."""
        return self.create_tool_connection(
            ToolConnectionCreateRequest(
                tool_id=payload.toolId,
                input_schema_json=payload.inputSchema,
                output_schema_json=payload.outputSchema,
                invoke_config_json=payload.invokeConfig,
                timeout_ms=payload.timeoutMs,
                retry_policy_json=payload.retryPolicy,
            )
        )

    def delete_tool_connection(self, *, connection_id: str) -> bool:
        """Delete the connection; return ``False`` when it does not exist."""
        entity = self.tool_connection_repository.get_by_id(tool_connection_id=connection_id)
        if entity is None:
            return False
        self.tool_connection_repository.delete(entity)
        return True

    def get_tool_connection_from_contract(
        self,
        payload: ToolConnectionDetailRequestDto,
    ) -> ToolConnection | None:
        """Look up a connection by id; ``None`` when it does not exist."""
        return self.tool_connection_repository.get_by_id(
            tool_connection_id=payload.connectionId,
        )

    def update_tool_connection_from_contract(
        self,
        payload: ToolConnectionUpdateRequestDto,
    ) -> ToolConnection | None:
        """Apply the non-``None`` fields of ``payload``; ``None`` when the connection is missing."""
        entity = self.tool_connection_repository.get_by_id(
            tool_connection_id=payload.connectionId,
        )
        if entity is None:
            return None
        if payload.inputSchema is not None:
            entity.input_schema_json = payload.inputSchema
        if payload.outputSchema is not None:
            entity.output_schema_json = payload.outputSchema
        if payload.invokeConfig is not None:
            entity.invoke_config_json = payload.invokeConfig
        if payload.timeoutMs is not None:
            entity.timeout_ms = payload.timeoutMs
        if payload.retryPolicy is not None:
            entity.retry_policy_json = payload.retryPolicy
        return self.tool_connection_repository.save(entity)

    # ------------------------------------------------------------------
    # MCP servers
    # ------------------------------------------------------------------

    def connect_mcp_server(self, payload: McpConnectRequestDto) -> McpConnectData:
        """Register an MCP server as a tool plus connection and schedule discovery.

        The normalized config is stored as the connection's invoke config with
        an ``"mcp_status"`` of ``"queued"``. When the discovery job cannot be
        published the status is rewritten to ``"skipped"`` and saved again.
        """
        server_name, config = self._normalize_mcp_config(payload)
        discovered_tools = self._extract_mcp_tools(config)
        if discovered_tools:
            config["mcp_tools"] = [
                tool.model_dump(mode="json", by_alias=False)
                for tool in discovered_tools
            ]
        job_id = f"mcpjob_{uuid4().hex}"
        config["mcp_status"] = self._build_mcp_status(
            job_id=job_id,
            status="queued",
            progress=0,
        )
        tool = self.create_tool_definition(
            ToolCreateRequest(
                name=payload.name or server_name,
                tool_type="mcp",
                description=f"MCP SSE server: {config.get('url', '')}",
            )
        )
        connection = self.create_tool_connection(
            ToolConnectionCreateRequest(
                tool_id=tool.id,
                input_schema_json={},
                output_schema_json={},
                invoke_config_json=config,
                timeout_ms=self._timeout_ms(config),
                retry_policy_json={"max_attempts": 1},
            )
        )
        published = self._publish_mcp_discovery_job(
            connection_id=connection.id,
            job_id=job_id,
        )
        if not published:
            # Work on a copy, exactly like _run_mcp_discovery does: `config`
            # may be the very object already attached to the entity, and
            # re-assigning an in-place-mutated dict is presumably not seen as
            # a change by the ORM (NOTE(review): confirm column mutability).
            skipped_config = dict(config)
            skipped_config["mcp_status"] = self._build_mcp_status(
                job_id=job_id,
                status="skipped",
                progress=0,
                error_message="Redis queue is unavailable; connection check was not scheduled.",
            )
            connection.invoke_config_json = skipped_config
            connection = self.tool_connection_repository.save(connection)
        return McpConnectData(
            tool=ToolDto.from_entity(tool),
            connection=ToolConnectionDto.from_entity(connection),
            discoveredTools=discovered_tools,
        )

    def execute_mcp_discovery_job(
        self,
        *,
        connection_id: str,
        worker_key: str,
        lease_seconds: int,
        job_id: str | None = None,
        redis_client: Redis | None = None,
    ) -> ToolConnection | None:
        """Run MCP discovery for one connection, guarded by Redis when available.

        With a Redis client (argument or ``self.redis_client``) the run is
        wrapped in a distributed lock and an idempotency record keyed by
        ``"<connection_id>:<job_id or 'discovery'>"``. Without Redis the
        discovery runs unguarded.

        Returns:
            The saved connection, or ``None`` when the lock could not be
            acquired, the idempotency store refused the key, or the
            connection does not exist.

        Raises:
            Exception: whatever ``_run_mcp_discovery`` or the idempotency
                store raises; the idempotency key is cleared first.
        """
        resolved_redis_client = redis_client or self.redis_client
        if resolved_redis_client is None:
            return self._run_mcp_discovery(
                connection_id=connection_id,
                worker_key=worker_key,
                job_id=job_id,
            )

        from core_shared.redis_primitives import DistributedLock, IdempotencyStore

        idempotency_key = f"{connection_id}:{job_id or 'discovery'}"
        lock = DistributedLock(
            client=resolved_redis_client,
            name=f"tool-mcp-discovery:{connection_id}:lock",
            ttl_seconds=lease_seconds,
        )
        if not lock.acquire():
            return None
        try:
            # Everything after a successful acquire() lives inside this
            # try/finally so the lock is released even if begin() raises.
            idempotency_store = IdempotencyStore(
                client=resolved_redis_client,
                prefix="tool-mcp-discovery-idempotency",
            )
            if not idempotency_store.begin(key=idempotency_key):
                return None
            try:
                result = self._run_mcp_discovery(
                    connection_id=connection_id,
                    worker_key=worker_key,
                    job_id=job_id,
                )
                # NOTE(review): when result is None the key is neither
                # completed nor cleared — confirm that is intended.
                if result is not None:
                    status_payload = self._read_mcp_status(result.invoke_config_json or {})
                    idempotency_store.complete(
                        key=idempotency_key,
                        result={
                            "connection_id": result.id,
                            "status": str(status_payload.get("status") or ""),
                        },
                    )
            except Exception:
                idempotency_store.clear(key=idempotency_key)
                raise
            return result
        finally:
            lock.release()

    def execute_next_pending_mcp_discovery(
        self,
        *,
        worker_key: str,
        lease_seconds: int,
        stale_discovery_seconds: int,
        redis_client: Redis | None = None,
    ) -> ToolConnection | None:
        """Pick one pending connection from the repository and run its discovery.

        ``stale_before`` is "now minus ``stale_discovery_seconds``" as a naive
        UTC datetime. Returns ``None`` when nothing is pending.
        """
        stale_before = datetime.utcnow() - timedelta(seconds=stale_discovery_seconds)
        pending_connections = self.tool_connection_repository.list_pending_mcp_discovery(
            stale_before=stale_before,
            limit=1,
        )
        if not pending_connections:
            return None
        connection = pending_connections[0]
        status_payload = self._read_mcp_status(connection.invoke_config_json or {})
        job_id = status_payload.get("jobId")
        return self.execute_mcp_discovery_job(
            connection_id=connection.id,
            job_id=job_id if isinstance(job_id, str) else None,
            worker_key=worker_key,
            lease_seconds=lease_seconds,
            redis_client=redis_client,
        )

    def _run_mcp_discovery(
        self,
        *,
        connection_id: str,
        worker_key: str,
        job_id: str | None = None,
    ) -> ToolConnection | None:
        """Mark the connection running, probe the server, then store the outcome.

        The status goes ``running`` (progress 20) and then ``completed`` or
        ``failed`` (progress 100). A validation failure is recorded in the
        status' ``errorMessage`` and is not re-raised. Returns ``None`` when
        the connection does not exist.
        """
        connection = self.tool_connection_repository.get_by_id(tool_connection_id=connection_id)
        if connection is None:
            return None

        # Each status change writes a fresh dict rather than mutating the one
        # attached to the entity.
        config = dict(connection.invoke_config_json or {})
        resolved_job_id = job_id or self._read_mcp_job_id(config) or f"mcpjob_{uuid4().hex}"
        config["mcp_status"] = self._build_mcp_status(
            job_id=resolved_job_id,
            status="running",
            progress=20,
            worker_key=worker_key,
        )
        connection.invoke_config_json = config
        connection = self.tool_connection_repository.save(connection)

        try:
            self._validate_mcp_connection(config)
        except Exception as exc:
            config = dict(connection.invoke_config_json or {})
            config["mcp_status"] = self._build_mcp_status(
                job_id=resolved_job_id,
                status="failed",
                progress=100,
                worker_key=worker_key,
                error_message=str(exc),
            )
            connection.invoke_config_json = config
            return self.tool_connection_repository.save(connection)

        config = dict(connection.invoke_config_json or {})
        config["mcp_status"] = self._build_mcp_status(
            job_id=resolved_job_id,
            status="completed",
            progress=100,
            worker_key=worker_key,
        )
        connection.invoke_config_json = config
        return self.tool_connection_repository.save(connection)

    # ------------------------------------------------------------------
    # Tool bindings
    # ------------------------------------------------------------------

    def create_tool_binding(self, payload: ToolBindingCreateRequest) -> ToolBinding:
        """Create a binding between an app and a tool connection.

        Raises:
            ValueError: ``payload.credential_id`` is set but no such
                credential exists.
        """
        if payload.credential_id is not None:
            credential = self.tool_credential_repository.get_by_id(
                credential_id=payload.credential_id,
            )
            if credential is None:
                raise ValueError(f"tool credential not found: {payload.credential_id}")
        return self.tool_binding_repository.create(
            app_id=payload.app_id,
            tool_connection_id=payload.tool_connection_id,
            credential_id=payload.credential_id,
            binding_scope=payload.binding_scope,
            enabled=payload.enabled,
            config_json=payload.config_json,
        )

    def create_tool_binding_from_contract(
        self,
        payload: ToolBindingCreateRequestDto,
    ) -> ToolBinding:
        """Map the contract DTO to ``ToolBindingCreateRequest``; always ``enabled=True``."""
        return self.create_tool_binding(
            ToolBindingCreateRequest(
                app_id=payload.appId,
                tool_connection_id=payload.connectionId,
                credential_id=payload.credentialId,
                binding_scope=payload.bindingScope,
                enabled=True,
                config_json=payload.configJson,
            )
        )

    def list_tool_bindings(self, app_id: str | None = None) -> list[ToolBinding]:
        """List bindings via the repository's ``list_by_scope(app_id=...)``."""
        return self.tool_binding_repository.list_by_scope(app_id=app_id)

    def delete_tool_binding(self, payload: ToolBindingDeleteRequestDto) -> bool:
        """Delete the binding; return ``False`` when it does not exist."""
        entity = self.tool_binding_repository.get_by_id(binding_id=payload.bindingId)
        if entity is None:
            return False
        self.tool_binding_repository.delete(entity)
        return True

    def get_tool_binding_from_contract(
        self,
        payload: ToolBindingDetailRequestDto,
    ) -> ToolBinding | None:
        """Look up a binding by id; ``None`` when it does not exist."""
        return self.tool_binding_repository.get_by_id(binding_id=payload.bindingId)

    def update_tool_binding_from_contract(
        self,
        payload: ToolBindingUpdateRequestDto,
    ) -> ToolBinding | None:
        """Apply the non-``None`` fields of ``payload``; ``None`` when the binding is missing.

        Unlike ``create_tool_binding`` the new credential id is not checked
        for existence here.
        """
        entity = self.tool_binding_repository.get_by_id(binding_id=payload.bindingId)
        if entity is None:
            return None
        if payload.credentialId is not None:
            entity.credential_id = payload.credentialId
        if payload.bindingScope is not None:
            entity.binding_scope = payload.bindingScope
        if payload.configJson is not None:
            entity.config_json = payload.configJson
        return self.tool_binding_repository.save(entity)

    # ------------------------------------------------------------------
    # Tool credentials
    # ------------------------------------------------------------------

    def create_tool_credential(self, payload: ToolCredentialCreateRequest) -> ToolCredential:
        """Encrypt ``payload.secret_json`` and persist ciphertext, fingerprint and algorithm."""
        encrypted = self.secret_cipher.encrypt_json(payload.secret_json)
        return self.tool_credential_repository.create(
            name=payload.name,
            credential_type=payload.credential_type,
            encrypted_payload_text=encrypted.ciphertext,
            secret_fingerprint=encrypted.fingerprint,
            encryption_algorithm=encrypted.algorithm,
            metadata_json=payload.metadata_json,
        )

    def create_tool_credential_from_contract(
        self,
        payload: ToolCredentialCreateRequestDto,
    ) -> ToolCredential:
        """Map the contract DTO to ``ToolCredentialCreateRequest`` and create it."""
        return self.create_tool_credential(
            ToolCredentialCreateRequest(
                name=payload.name,
                credential_type=payload.credentialType,
                secret_json=payload.secretJson,
                metadata_json=payload.metadataJson,
            )
        )

    def list_tool_credentials(self) -> list[ToolCredential]:
        """Return every stored credential entity."""
        return self.tool_credential_repository.list_all()

    def delete_tool_credential(self, payload: ToolCredentialDeleteRequestDto) -> bool:
        """Delete the credential; return ``False`` when it does not exist."""
        entity = self.tool_credential_repository.get_by_id(
            credential_id=payload.credentialId,
        )
        if entity is None:
            return False
        self.tool_credential_repository.delete(entity)
        return True

    def get_tool_credential_from_contract(
        self,
        payload: ToolCredentialDetailRequestDto,
    ) -> ToolCredential | None:
        """Look up a credential by id; ``None`` when it does not exist."""
        return self.tool_credential_repository.get_by_id(
            credential_id=payload.credentialId,
        )

    def update_tool_credential_from_contract(
        self,
        payload: ToolCredentialUpdateRequestDto,
    ) -> ToolCredential | None:
        """Update name and/or metadata; the encrypted secret itself is not touched."""
        entity = self.tool_credential_repository.get_by_id(
            credential_id=payload.credentialId,
        )
        if entity is None:
            return None
        if payload.name is not None:
            entity.name = payload.name
        if payload.metadataJson is not None:
            entity.metadata_json = payload.metadataJson
        return self.tool_credential_repository.save(entity)

    def reveal_tool_credential(
        self,
        *,
        credential_id: str,
    ) -> tuple[ToolCredential, dict[str, JSONValue]] | None:
        """Return the credential together with its decrypted secret, or ``None`` if missing."""
        credential = self.tool_credential_repository.get_by_id(
            credential_id=credential_id,
        )
        if credential is None:
            return None
        secret_json = self.secret_cipher.decrypt_json(
            EncryptedSecret(
                ciphertext=credential.encrypted_payload_text,
                fingerprint=credential.secret_fingerprint,
                algorithm=credential.encryption_algorithm,
            )
        )
        return credential, secret_json

    def reveal_tool_credential_from_contract(
        self,
        *,
        credential_id: str,
    ) -> ToolCredentialRevealDto | None:
        """Contract wrapper around ``reveal_tool_credential``; ``None`` if missing."""
        result = self.reveal_tool_credential(credential_id=credential_id)
        if result is None:
            return None
        credential, secret_json = result
        return ToolCredentialRevealDto(
            credential=ToolCredentialDto.from_entity(credential),
            secretJson=secret_json,
        )

    def get_tool_binding_detail(
        self,
        *,
        binding_id: str,
    ) -> tuple[ToolBinding, ToolConnection, ToolDefinition] | None:
        """Resolve binding -> connection -> definition; ``None`` if any link is missing."""
        binding = self.tool_binding_repository.get_by_id(binding_id=binding_id)
        if binding is None:
            return None
        tool_connection = self.tool_connection_repository.get_by_id(
            tool_connection_id=binding.tool_connection_id,
        )
        if tool_connection is None:
            return None
        tool_definition = self.tool_definition_repository.get_by_id(
            tool_id=tool_connection.tool_id,
        )
        if tool_definition is None:
            return None
        return binding, tool_connection, tool_definition

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _build_tool_code(self, name: str) -> str:
        """Slugify ``name``: lowercase alphanumerics, ``_`` for the rest, max 64 chars.

        Leading/trailing underscores are stripped before truncation; an empty
        result falls back to ``"tool"``.
        """
        base = "".join(
            char.lower() if char.isalnum() else "_"
            for char in name
        ).strip("_") or "tool"
        return base[:64]

    def _normalize_mcp_config(
        self,
        payload: McpConnectRequestDto,
    ) -> tuple[str, dict[str, JSONValue]]:
        """Return ``(server_name, config)`` with ``server_name``/``transport`` defaults filled.

        Two shapes are accepted: a single-entry mapping ``{server_name: {...}}``
        whose value is unwrapped, or a flat config whose name comes from
        ``server_name``/``serverName``, then ``payload.name``, then
        ``"mcp_server"``. The returned dict is always a new object.
        """
        config = payload.config
        if len(config) == 1:
            server_name, value = next(iter(config.items()))
            if isinstance(value, dict):
                normalized = {str(key): item for key, item in value.items()}
                normalized.setdefault("server_name", server_name)
                normalized.setdefault("transport", "sse")
                return payload.name or server_name, normalized
        server_name_value = config.get("server_name") or config.get("serverName")
        server_name = str(server_name_value or payload.name or "mcp_server")
        normalized = {str(key): value for key, value in config.items()}
        normalized.setdefault("server_name", server_name)
        normalized.setdefault("transport", "sse")
        return server_name, normalized

    def _extract_mcp_tools(self, config: dict[str, JSONValue]) -> list[McpToolDto]:
        """Build DTOs from ``config["mcp_tools"]`` (or ``"tools"``), skipping malformed items.

        An item is kept only when it is a dict with a non-empty string
        ``name``; non-string descriptions and non-dict schemas become ``None``.
        """
        raw_tools = config.get("mcp_tools") or config.get("tools")
        if not isinstance(raw_tools, list):
            return []
        tools: list[McpToolDto] = []
        for item in raw_tools:
            if not isinstance(item, dict):
                continue
            name = item.get("name")
            if not isinstance(name, str) or not name:
                continue
            description = item.get("description")
            input_schema = item.get("inputSchema") or item.get("input_schema")
            tools.append(
                McpToolDto(
                    name=name,
                    description=description if isinstance(description, str) else None,
                    inputSchema=input_schema if isinstance(input_schema, dict) else None,
                )
            )
        return tools

    def _publish_mcp_discovery_job(
        self,
        *,
        connection_id: str,
        job_id: str,
    ) -> bool:
        """Publish the discovery job; ``False`` when async discovery is off or no publisher."""
        if not self.settings.mcp_discovery_async_enabled or self.task_queue_publisher is None:
            return False
        return self.task_queue_publisher.publish_tool_mcp_discovery(
            connection_id=connection_id,
            job_id=job_id,
        )

    def _validate_mcp_connection(self, config: dict[str, JSONValue]) -> None:
        """Probe ``config["url"]`` with a GET request and raise if it looks unusable.

        Raises:
            ValueError: the url is missing, is not http(s), the request fails,
                or the server answers with a 5xx status.
        """
        from urllib.parse import urlsplit

        url = config.get("url")
        if not isinstance(url, str) or not url:
            raise ValueError("MCP server url is required")
        # The url comes from caller-supplied config and urlopen() also handles
        # file:// and ftp://, so restrict the probe to HTTP(S).
        # NOTE(review): internal HTTP hosts remain reachable — add an
        # allow-list if that matters for this deployment.
        scheme = urlsplit(url).scheme.lower()
        if scheme not in {"http", "https"}:
            raise ValueError(f"MCP server url must use http or https, got scheme {scheme!r}")

        headers = self._read_mcp_headers(config)
        # Header names are case-insensitive: only add the default when the
        # caller supplied no Accept header in any casing.
        if not any(name.lower() == "accept" for name in headers):
            headers["Accept"] = "text/event-stream"
        request = urllib.request.Request(url, headers=headers, method="GET")
        timeout = self._read_timeout_seconds(config)
        try:
            with urllib.request.urlopen(request, timeout=timeout) as response:
                status_code = int(getattr(response, "status", 200))
        except urllib.error.HTTPError as exc:
            # Only 5xx counts as a failure; 4xx is deliberately tolerated
            # (presumably "reachable but needs auth" — confirm).
            if exc.code >= 500:
                raise ValueError(f"MCP server returned HTTP {exc.code}") from exc
        except Exception as exc:
            raise ValueError(f"MCP server connection failed: {exc}") from exc
        else:
            # Checked outside the try so this error is not re-wrapped as a
            # "connection failed" message by the generic handler above.
            if status_code >= 500:
                raise ValueError(f"MCP server returned HTTP {status_code}")

    def _read_mcp_headers(self, config: dict[str, JSONValue]) -> dict[str, str]:
        """Return ``config["headers"]`` as a new str->str dict, dropping non-scalar values."""
        headers = config.get("headers")
        if not isinstance(headers, dict):
            return {}
        return {
            str(key): str(value)
            for key, value in headers.items()
            if isinstance(value, (str, int, float, bool))
        }

    @staticmethod
    def _coerce_timeout(config: dict[str, JSONValue]) -> float | None:
        """Read ``timeout``/``timeout_seconds`` as a finite, positive number of seconds."""
        import math

        raw = config.get("timeout") or config.get("timeout_seconds")
        # bool is a subclass of int; True must not be read as "1 second".
        if isinstance(raw, bool) or not isinstance(raw, (int, float)):
            return None
        try:
            seconds = float(raw)
        except OverflowError:
            return None
        if not math.isfinite(seconds) or seconds <= 0:
            return None
        return seconds

    def _read_timeout_seconds(self, config: dict[str, JSONValue]) -> float:
        """Return the probe timeout, capped at ``settings.mcp_discovery_timeout_seconds``.

        Missing or unusable values (bool, non-number, non-finite,
        non-positive) fall back to the configured maximum.
        """
        seconds = self._coerce_timeout(config)
        if seconds is None:
            return self.settings.mcp_discovery_timeout_seconds
        return min(seconds, self.settings.mcp_discovery_timeout_seconds)

    def _build_mcp_status(
        self,
        *,
        job_id: str,
        status: str,
        progress: int,
        worker_key: str | None = None,
        error_message: str | None = None,
    ) -> dict[str, JSONValue]:
        """Build a fresh ``mcp_status`` payload stamped with the current naive-UTC time.

        ``progress`` is clamped to 0..100. Exactly one of ``queuedTime``,
        ``startedTime`` or ``completedTime`` is added depending on ``status``;
        timestamps from earlier statuses are not carried over.
        """
        now = datetime.utcnow().isoformat()
        payload: dict[str, JSONValue] = {
            "jobId": job_id,
            "status": status,
            "progress": max(0, min(progress, 100)),
            "queueName": TOOL_MCP_DISCOVERY_QUEUE,
            "workerKey": worker_key,
            "errorMessage": error_message,
            "updatedTime": now,
        }
        if status == "queued":
            payload["queuedTime"] = now
        if status == "running":
            payload["startedTime"] = now
        if status in {"completed", "failed", "skipped"}:
            payload["completedTime"] = now
        return payload

    def _read_mcp_status(self, config: dict[str, JSONValue]) -> dict[str, JSONValue]:
        """Return a copy of ``config["mcp_status"]`` with string keys, or ``{}``."""
        status_payload = config.get("mcp_status")
        if isinstance(status_payload, dict):
            return {str(key): value for key, value in status_payload.items()}
        return {}

    def _read_mcp_job_id(self, config: dict[str, JSONValue]) -> str | None:
        """Return the stored ``jobId`` when it is a non-empty string, else ``None``."""
        job_id = self._read_mcp_status(config).get("jobId")
        return job_id if isinstance(job_id, str) and job_id else None

    def _timeout_ms(self, config: dict[str, JSONValue]) -> int | None:
        """Convert the configured timeout (seconds) to milliseconds, or ``None`` if unusable."""
        seconds = self._coerce_timeout(config)
        if seconds is None:
            return None
        return int(seconds * 1000)
def build_tool_application_service(
    *,
    db: Session,
    settings: ToolServiceSettings,
) -> ToolApplicationService:
    """Wire a ``ToolApplicationService`` from a DB session and service settings.

    All four repositories share ``db``. A task-queue publisher is created only
    when ``try_build_redis_client`` yields a client; otherwise both the Redis
    client and the publisher are passed as ``None``.
    """
    redis_client = try_build_redis_client(settings.redis_url)

    definitions = ToolDefinitionRepository(db)
    connections = ToolConnectionRepository(db)
    bindings = ToolBindingRepository(db)
    credentials = ToolCredentialRepository(db)
    cipher = SecretCipher(key=settings.credential_encryption_key)

    publisher = None
    if redis_client is not None:
        publisher = TaskQueuePublisher(client=redis_client)

    return ToolApplicationService(
        tool_definition_repository=definitions,
        tool_connection_repository=connections,
        tool_binding_repository=bindings,
        tool_credential_repository=credentials,
        secret_cipher=cipher,
        settings=settings,
        redis_client=redis_client,
        task_queue_publisher=publisher,
    )
|