# services.py (30 KB)
  1. from __future__ import annotations
  2. import json
  3. import urllib.error
  4. import urllib.request
  5. from datetime import datetime, timedelta
  6. from typing import TYPE_CHECKING
  7. from uuid import uuid4
  8. from sqlalchemy.orm import Session
  9. from core_shared import JSONValue, try_build_redis_client
  10. from core_shared.secrets import EncryptedSecret, SecretCipher
  11. from core_shared.task_queue import TOOL_MCP_DISCOVERY_QUEUE, TaskQueuePublisher
  12. from app.bootstrap.settings import ToolServiceSettings
  13. from app.db.models import ToolBinding, ToolCredential, ToolDefinition, ToolConnection
  14. from app.domain.repositories import (
  15. ToolBindingRepository,
  16. ToolCredentialRepository,
  17. ToolDefinitionRepository,
  18. ToolConnectionRepository,
  19. )
  20. from app.schemas.tool import (
  21. McpConnectData,
  22. McpConnectRequestDto,
  23. McpDiscoverRequestDto,
  24. McpToolDto,
  25. ToolBindingCreateRequest,
  26. ToolBindingCreateRequestDto,
  27. ToolBindingDeleteRequestDto,
  28. ToolBindingDetailRequestDto,
  29. ToolBindingUpdateRequestDto,
  30. ToolCreateRequest,
  31. ToolCreateRequestDto,
  32. ToolCredentialCreateRequest,
  33. ToolCredentialCreateRequestDto,
  34. ToolCredentialDeleteRequestDto,
  35. ToolCredentialDetailRequestDto,
  36. ToolCredentialDto,
  37. ToolCredentialRevealDto,
  38. ToolCredentialUpdateRequestDto,
  39. ToolDeleteRequestDto,
  40. ToolDetailRequestDto,
  41. ToolDto,
  42. ToolUpdateRequestDto,
  43. ToolConnectionCreateRequest,
  44. ToolConnectionCreateRequestDto,
  45. ToolConnectionDetailRequestDto,
  46. ToolConnectionDto,
  47. ToolConnectionUpdateRequestDto,
  48. )
  49. if TYPE_CHECKING:
  50. from redis import Redis
  class ToolApplicationService:
      """Application service for tool definitions, connections, bindings,
      credentials and MCP server discovery.

      Persistence is delegated to the injected repositories.  The Redis client
      and the task queue publisher are optional; the code paths that use them
      check for ``None`` first.
      """

      # Response/request header through which a Streamable HTTP MCP server
      # hands out the session id that follow-up requests must echo.
      _MCP_SESSION_HEADER = "Mcp-Session-Id"

      def __init__(
          self,
          tool_definition_repository: ToolDefinitionRepository,
          tool_connection_repository: ToolConnectionRepository,
          tool_binding_repository: ToolBindingRepository,
          tool_credential_repository: ToolCredentialRepository,
          secret_cipher: SecretCipher,
          settings: ToolServiceSettings | None = None,
          redis_client: Redis | None = None,
          task_queue_publisher: TaskQueuePublisher | None = None,
      ) -> None:
          """Store the collaborators; ``settings`` defaults to ``ToolServiceSettings()``."""
          self.tool_definition_repository = tool_definition_repository
          self.tool_connection_repository = tool_connection_repository
          self.tool_binding_repository = tool_binding_repository
          self.tool_credential_repository = tool_credential_repository
          self.secret_cipher = secret_cipher
          self.settings = settings or ToolServiceSettings()
          self.redis_client = redis_client
          self.task_queue_publisher = task_queue_publisher

      # ------------------------------------------------------------------
      # Tool definitions
      # ------------------------------------------------------------------

      def create_tool_definition(self, payload: ToolCreateRequest) -> ToolDefinition:
          """Create a tool definition, deriving ``code`` from the name when it is not given."""
          code = payload.code or self._build_tool_code(payload.name)
          return self.tool_definition_repository.create(
              plugin_id=payload.plugin_id,
              code=code,
              name=payload.name,
              tool_type=payload.tool_type,
              description=payload.description,
          )

      def list_tool_definitions(self) -> list[ToolDefinition]:
          """Return every tool definition known to the repository."""
          return self.tool_definition_repository.list_all()

      def create_tool_definition_from_contract(
          self,
          payload: ToolCreateRequestDto,
      ) -> ToolDefinition:
          """Translate the camelCase contract DTO and create the tool definition."""
          return self.create_tool_definition(
              ToolCreateRequest(
                  plugin_id=payload.pluginId,
                  name=payload.name,
                  tool_type=payload.toolType,
                  description=payload.description,
              )
          )

      def delete_tool_definition_from_contract(self, payload: ToolDeleteRequestDto) -> bool:
          """Delete the tool definition; return ``False`` when it does not exist."""
          entity = self.tool_definition_repository.get_by_id(tool_id=payload.toolId)
          if entity is None:
              return False
          self.tool_definition_repository.delete(entity)
          return True

      def get_tool_definition_from_contract(
          self,
          payload: ToolDetailRequestDto,
      ) -> ToolDefinition | None:
          """Look up a tool definition by id; ``None`` when missing."""
          return self.tool_definition_repository.get_by_id(tool_id=payload.toolId)

      def update_tool_definition_from_contract(
          self,
          payload: ToolUpdateRequestDto,
      ) -> ToolDefinition | None:
          """Apply the non-``None`` fields of ``payload``; ``None`` when the tool is missing.

          Changing the name also regenerates ``code`` from the new name.
          """
          entity = self.tool_definition_repository.get_by_id(tool_id=payload.toolId)
          if entity is None:
              return None
          if payload.name is not None:
              entity.name = payload.name
              entity.code = self._build_tool_code(payload.name)
          if payload.toolType is not None:
              entity.tool_type = payload.toolType
          if payload.description is not None:
              entity.description = payload.description
          if payload.pluginId is not None:
              entity.plugin_id = payload.pluginId
          return self.tool_definition_repository.save(entity)

      # ------------------------------------------------------------------
      # Tool connections
      # ------------------------------------------------------------------

      def create_tool_connection(self, payload: ToolConnectionCreateRequest) -> ToolConnection:
          """Create a connection for an existing tool id."""
          return self.tool_connection_repository.create(
              tool_id=payload.tool_id,
              input_schema_json=payload.input_schema_json,
              output_schema_json=payload.output_schema_json,
              invoke_config_json=payload.invoke_config_json,
              timeout_ms=payload.timeout_ms,
              retry_policy_json=payload.retry_policy_json,
          )

      def list_tool_connections(self, tool_id: str | None = None) -> list[ToolConnection]:
          """List all connections, or only those of ``tool_id`` when it is given."""
          if tool_id is None:
              return self.tool_connection_repository.list_all()
          return self.tool_connection_repository.list_by_tool(tool_id=tool_id)

      def create_tool_connection_from_contract(
          self,
          payload: ToolConnectionCreateRequestDto,
      ) -> ToolConnection:
          """Translate the camelCase contract DTO and create the connection."""
          return self.create_tool_connection(
              ToolConnectionCreateRequest(
                  tool_id=payload.toolId,
                  input_schema_json=payload.inputSchema,
                  output_schema_json=payload.outputSchema,
                  invoke_config_json=payload.invokeConfig,
                  timeout_ms=payload.timeoutMs,
                  retry_policy_json=payload.retryPolicy,
              )
          )

      def delete_tool_connection(self, *, connection_id: str) -> bool:
          """Delete the connection; return ``False`` when it does not exist."""
          entity = self.tool_connection_repository.get_by_id(tool_connection_id=connection_id)
          if entity is None:
              return False
          self.tool_connection_repository.delete(entity)
          return True

      def get_tool_connection_from_contract(
          self,
          payload: ToolConnectionDetailRequestDto,
      ) -> ToolConnection | None:
          """Look up a connection by id; ``None`` when missing."""
          return self.tool_connection_repository.get_by_id(
              tool_connection_id=payload.connectionId
          )

      def update_tool_connection_from_contract(
          self,
          payload: ToolConnectionUpdateRequestDto,
      ) -> ToolConnection | None:
          """Apply the non-``None`` fields of ``payload``; ``None`` when the connection is missing."""
          entity = self.tool_connection_repository.get_by_id(
              tool_connection_id=payload.connectionId
          )
          if entity is None:
              return None
          if payload.inputSchema is not None:
              entity.input_schema_json = payload.inputSchema
          if payload.outputSchema is not None:
              entity.output_schema_json = payload.outputSchema
          if payload.invokeConfig is not None:
              entity.invoke_config_json = payload.invokeConfig
          if payload.timeoutMs is not None:
              entity.timeout_ms = payload.timeoutMs
          if payload.retryPolicy is not None:
              entity.retry_policy_json = payload.retryPolicy
          return self.tool_connection_repository.save(entity)

      # ------------------------------------------------------------------
      # MCP connect / discovery
      # ------------------------------------------------------------------

      def connect_mcp_server(self, payload: McpConnectRequestDto) -> McpConnectData:
          """Register an MCP server as a tool plus connection and schedule discovery.

          Tools already declared in the submitted config are stored right away.
          The connection is saved with a ``queued`` status; when the discovery
          job cannot be published the status is rewritten to ``skipped``.
          """
          server_name, config = self._normalize_mcp_config(payload)
          discovered_tools = self._extract_mcp_tools(config)
          if discovered_tools:
              config["mcp_tools"] = self._serialize_mcp_tools(discovered_tools)
          job_id = f"mcpjob_{uuid4().hex}"
          config["mcp_status"] = self._build_mcp_status(
              job_id=job_id,
              status="queued",
              progress=0,
          )
          tool = self.create_tool_definition(
              ToolCreateRequest(
                  name=payload.name or server_name,
                  tool_type="mcp",
                  description=f"MCP SSE server: {config.get('url', '')}",
              )
          )
          connection = self.create_tool_connection(
              ToolConnectionCreateRequest(
                  tool_id=tool.id,
                  input_schema_json={},
                  output_schema_json={},
                  invoke_config_json=config,
                  timeout_ms=self._timeout_ms(config),
                  retry_policy_json={"max_attempts": 1},
              )
          )
          published = self._publish_mcp_discovery_job(
              connection_id=connection.id,
              job_id=job_id,
          )
          if not published:
              config["mcp_status"] = self._build_mcp_status(
                  job_id=job_id,
                  status="skipped",
                  progress=0,
                  error_message="Redis queue is unavailable; connection check was not scheduled.",
              )
              connection.invoke_config_json = config
              connection = self.tool_connection_repository.save(connection)
          return McpConnectData(
              tool=ToolDto.from_entity(tool),
              connection=ToolConnectionDto.from_entity(connection),
              discoveredTools=discovered_tools,
          )

      def discover_mcp_connection(self, payload: McpDiscoverRequestDto) -> ToolConnection | None:
          """Run discovery inline for the given connection (worker key ``api-sync``)."""
          return self._run_mcp_discovery(
              connection_id=payload.connectionId,
              worker_key="api-sync",
          )

      def execute_mcp_discovery_job(
          self,
          *,
          connection_id: str,
          worker_key: str,
          lease_seconds: int,
          job_id: str | None = None,
          redis_client: Redis | None = None,
      ) -> ToolConnection | None:
          """Run one discovery job, guarded by a Redis lock and idempotency key when Redis is available.

          Without a Redis client the discovery simply runs unguarded.  With one,
          ``None`` is returned when the lock cannot be acquired or the
          idempotency key was already begun.

          Returns:
              The saved connection, or ``None`` when the job was skipped or the
              connection does not exist.

          Raises:
              Exception: anything raised by the discovery run or the Redis
                  primitives is propagated after the idempotency key is cleared.
          """
          resolved_redis_client = redis_client or self.redis_client
          if resolved_redis_client is None:
              return self._run_mcp_discovery(
                  connection_id=connection_id,
                  worker_key=worker_key,
                  job_id=job_id,
              )

          from core_shared.redis_primitives import DistributedLock, IdempotencyStore

          idempotency_key = f"{connection_id}:{job_id or 'discovery'}"
          lock = DistributedLock(
              client=resolved_redis_client,
              name=f"tool-mcp-discovery:{connection_id}:lock",
              ttl_seconds=lease_seconds,
          )
          if not lock.acquire():
              return None
          # Everything after a successful acquire sits inside try/finally so the
          # lock is released even when the idempotency store itself fails.
          try:
              idempotency_store = IdempotencyStore(
                  client=resolved_redis_client,
                  prefix="tool-mcp-discovery-idempotency",
              )
              if not idempotency_store.begin(key=idempotency_key):
                  return None
              try:
                  result = self._run_mcp_discovery(
                      connection_id=connection_id,
                      worker_key=worker_key,
                      job_id=job_id,
                  )
                  if result is None:
                      # Connection not found: nothing was processed, so do not
                      # leave a begun-but-never-completed key behind.
                      # NOTE(review): assumes clear() makes the key reusable,
                      # as the error path below already relies on - confirm.
                      idempotency_store.clear(key=idempotency_key)
                  else:
                      status_payload = self._read_mcp_status(result.invoke_config_json or {})
                      idempotency_store.complete(
                          key=idempotency_key,
                          result={
                              "connection_id": result.id,
                              "status": str(status_payload.get("status") or ""),
                          },
                      )
              except Exception:
                  idempotency_store.clear(key=idempotency_key)
                  raise
              return result
          finally:
              lock.release()

      def execute_next_pending_mcp_discovery(
          self,
          *,
          worker_key: str,
          lease_seconds: int,
          stale_discovery_seconds: int,
          redis_client: Redis | None = None,
      ) -> ToolConnection | None:
          """Pick at most one pending connection from the repository and run its discovery job.

          ``stale_before`` (now minus ``stale_discovery_seconds``, naive UTC) is
          passed to the repository query; ``None`` is returned when it yields
          nothing.
          """
          stale_before = datetime.utcnow() - timedelta(seconds=stale_discovery_seconds)
          pending_connections = self.tool_connection_repository.list_pending_mcp_discovery(
              stale_before=stale_before,
              limit=1,
          )
          if not pending_connections:
              return None
          connection = pending_connections[0]
          status_payload = self._read_mcp_status(connection.invoke_config_json or {})
          job_id = status_payload.get("jobId")
          return self.execute_mcp_discovery_job(
              connection_id=connection.id,
              job_id=job_id if isinstance(job_id, str) else None,
              worker_key=worker_key,
              lease_seconds=lease_seconds,
              redis_client=redis_client,
          )

      def _run_mcp_discovery(
          self,
          *,
          connection_id: str,
          worker_key: str,
          job_id: str | None = None,
      ) -> ToolConnection | None:
          """Discover tools for one connection and persist the status transitions.

          The connection is saved as ``running`` first, then as ``completed`` or,
          when discovery raises, as ``failed`` with the exception text.  Returns
          ``None`` when the connection does not exist.
          """
          connection = self.tool_connection_repository.get_by_id(tool_connection_id=connection_id)
          if connection is None:
              return None
          config = dict(connection.invoke_config_json or {})
          resolved_job_id = job_id or self._read_mcp_job_id(config) or f"mcpjob_{uuid4().hex}"
          config["mcp_status"] = self._build_mcp_status(
              job_id=resolved_job_id,
              status="running",
              progress=20,
              worker_key=worker_key,
          )
          connection.invoke_config_json = config
          connection = self.tool_connection_repository.save(connection)
          try:
              discovered_tools = self._discover_mcp_tools(config)
          except Exception as exc:
              # Failures are recorded on the connection instead of propagating.
              config = dict(connection.invoke_config_json or {})
              config["mcp_status"] = self._build_mcp_status(
                  job_id=resolved_job_id,
                  status="failed",
                  progress=100,
                  worker_key=worker_key,
                  error_message=str(exc),
              )
              connection.invoke_config_json = config
              return self.tool_connection_repository.save(connection)
          config = dict(connection.invoke_config_json or {})
          if discovered_tools:
              # An empty result keeps whatever tool list was stored before.
              config["mcp_tools"] = self._serialize_mcp_tools(discovered_tools)
          config["mcp_status"] = self._build_mcp_status(
              job_id=resolved_job_id,
              status="completed",
              progress=100,
              worker_key=worker_key,
          )
          connection.invoke_config_json = config
          return self.tool_connection_repository.save(connection)

      # ------------------------------------------------------------------
      # Tool bindings
      # ------------------------------------------------------------------

      def create_tool_binding(self, payload: ToolBindingCreateRequest) -> ToolBinding:
          """Create a binding.

          Raises:
              ValueError: ``credential_id`` is given but no such credential exists.
          """
          if payload.credential_id is not None:
              credential = self.tool_credential_repository.get_by_id(
                  credential_id=payload.credential_id
              )
              if credential is None:
                  raise ValueError(f"tool credential not found: {payload.credential_id}")
          return self.tool_binding_repository.create(
              app_id=payload.app_id,
              tool_connection_id=payload.tool_connection_id,
              credential_id=payload.credential_id,
              binding_scope=payload.binding_scope,
              enabled=payload.enabled,
              config_json=payload.config_json,
          )

      def create_tool_binding_from_contract(
          self,
          payload: ToolBindingCreateRequestDto,
      ) -> ToolBinding:
          """Translate the contract DTO and create an enabled binding."""
          return self.create_tool_binding(
              ToolBindingCreateRequest(
                  app_id=payload.appId,
                  tool_connection_id=payload.connectionId,
                  credential_id=payload.credentialId,
                  binding_scope=payload.bindingScope,
                  enabled=True,
                  config_json=payload.configJson,
              )
          )

      def list_tool_bindings(self, app_id: str | None = None) -> list[ToolBinding]:
          """List bindings via the repository's scope query for ``app_id``."""
          return self.tool_binding_repository.list_by_scope(app_id=app_id)

      def delete_tool_binding(self, payload: ToolBindingDeleteRequestDto) -> bool:
          """Delete the binding; return ``False`` when it does not exist."""
          entity = self.tool_binding_repository.get_by_id(binding_id=payload.bindingId)
          if entity is None:
              return False
          self.tool_binding_repository.delete(entity)
          return True

      def get_tool_binding_from_contract(
          self,
          payload: ToolBindingDetailRequestDto,
      ) -> ToolBinding | None:
          """Look up a binding by id; ``None`` when missing."""
          return self.tool_binding_repository.get_by_id(binding_id=payload.bindingId)

      def update_tool_binding_from_contract(
          self,
          payload: ToolBindingUpdateRequestDto,
      ) -> ToolBinding | None:
          """Apply the non-``None`` fields of ``payload``; ``None`` when the binding is missing."""
          entity = self.tool_binding_repository.get_by_id(binding_id=payload.bindingId)
          if entity is None:
              return None
          if payload.credentialId is not None:
              entity.credential_id = payload.credentialId
          if payload.bindingScope is not None:
              entity.binding_scope = payload.bindingScope
          if payload.configJson is not None:
              entity.config_json = payload.configJson
          return self.tool_binding_repository.save(entity)

      # ------------------------------------------------------------------
      # Tool credentials
      # ------------------------------------------------------------------

      def create_tool_credential(self, payload: ToolCredentialCreateRequest) -> ToolCredential:
          """Encrypt ``secret_json`` with the cipher and store the resulting credential."""
          encrypted = self.secret_cipher.encrypt_json(payload.secret_json)
          return self.tool_credential_repository.create(
              name=payload.name,
              credential_type=payload.credential_type,
              encrypted_payload_text=encrypted.ciphertext,
              secret_fingerprint=encrypted.fingerprint,
              encryption_algorithm=encrypted.algorithm,
              metadata_json=payload.metadata_json,
          )

      def create_tool_credential_from_contract(
          self,
          payload: ToolCredentialCreateRequestDto,
      ) -> ToolCredential:
          """Translate the camelCase contract DTO and create the credential."""
          return self.create_tool_credential(
              ToolCredentialCreateRequest(
                  name=payload.name,
                  credential_type=payload.credentialType,
                  secret_json=payload.secretJson,
                  metadata_json=payload.metadataJson,
              )
          )

      def list_tool_credentials(self) -> list[ToolCredential]:
          """Return every stored credential entity."""
          return self.tool_credential_repository.list_all()

      def delete_tool_credential(self, payload: ToolCredentialDeleteRequestDto) -> bool:
          """Delete the credential; return ``False`` when it does not exist."""
          entity = self.tool_credential_repository.get_by_id(
              credential_id=payload.credentialId
          )
          if entity is None:
              return False
          self.tool_credential_repository.delete(entity)
          return True

      def get_tool_credential_from_contract(
          self,
          payload: ToolCredentialDetailRequestDto,
      ) -> ToolCredential | None:
          """Look up a credential by id; ``None`` when missing."""
          return self.tool_credential_repository.get_by_id(
              credential_id=payload.credentialId
          )

      def update_tool_credential_from_contract(
          self,
          payload: ToolCredentialUpdateRequestDto,
      ) -> ToolCredential | None:
          """Update name and/or metadata; ``None`` when the credential is missing."""
          entity = self.tool_credential_repository.get_by_id(
              credential_id=payload.credentialId
          )
          if entity is None:
              return None
          if payload.name is not None:
              entity.name = payload.name
          if payload.metadataJson is not None:
              entity.metadata_json = payload.metadataJson
          return self.tool_credential_repository.save(entity)

      def reveal_tool_credential(
          self,
          *,
          credential_id: str,
      ) -> tuple[ToolCredential, dict[str, JSONValue]] | None:
          """Return the credential together with its decrypted secret; ``None`` when missing."""
          credential = self.tool_credential_repository.get_by_id(
              credential_id=credential_id
          )
          if credential is None:
              return None
          secret_json = self.secret_cipher.decrypt_json(
              EncryptedSecret(
                  ciphertext=credential.encrypted_payload_text,
                  fingerprint=credential.secret_fingerprint,
                  algorithm=credential.encryption_algorithm,
              )
          )
          return credential, secret_json

      def reveal_tool_credential_from_contract(
          self,
          *,
          credential_id: str,
      ) -> ToolCredentialRevealDto | None:
          """Contract-shaped variant of :meth:`reveal_tool_credential`."""
          result = self.reveal_tool_credential(credential_id=credential_id)
          if result is None:
              return None
          credential, secret_json = result
          return ToolCredentialRevealDto(
              credential=ToolCredentialDto.from_entity(credential),
              secretJson=secret_json,
          )

      def get_tool_binding_detail(
          self,
          *,
          binding_id: str,
      ) -> tuple[ToolBinding, ToolConnection, ToolDefinition] | None:
          """Resolve binding -> connection -> definition; ``None`` if any link is missing."""
          binding = self.tool_binding_repository.get_by_id(binding_id=binding_id)
          if binding is None:
              return None
          tool_connection = self.tool_connection_repository.get_by_id(
              tool_connection_id=binding.tool_connection_id
          )
          if tool_connection is None:
              return None
          tool_definition = self.tool_definition_repository.get_by_id(
              tool_id=tool_connection.tool_id
          )
          if tool_definition is None:
              return None
          return binding, tool_connection, tool_definition

      # ------------------------------------------------------------------
      # Internal helpers
      # ------------------------------------------------------------------

      def _build_tool_code(self, name: str) -> str:
          """Slugify ``name``: lower-case alphanumerics, everything else ``_``, max 64 chars.

          Falls back to ``"tool"`` when nothing but separators remains.
          """
          base = "".join(
              char.lower() if char.isalnum() else "_"
              for char in name
          ).strip("_") or "tool"
          return base[:64]

      def _normalize_mcp_config(
          self,
          payload: McpConnectRequestDto,
      ) -> tuple[str, dict[str, JSONValue]]:
          """Return ``(server_name, config)`` with ``server_name``/``transport`` defaults filled in.

          Accepts either a flat config or a single-entry wrapper of the form
          ``{"<server name>": {...}}``.
          """
          config = payload.config
          if len(config) == 1:
              server_name, value = next(iter(config.items()))
              if isinstance(value, dict):
                  normalized = {str(key): item for key, item in value.items()}
                  normalized.setdefault("server_name", server_name)
                  normalized.setdefault("transport", "sse")
                  return payload.name or server_name, normalized
          server_name_value = config.get("server_name") or config.get("serverName")
          server_name = str(server_name_value or payload.name or "mcp_server")
          normalized = {str(key): value for key, value in config.items()}
          normalized.setdefault("server_name", server_name)
          normalized.setdefault("transport", "sse")
          return server_name, normalized

      @staticmethod
      def _serialize_mcp_tools(tools: list[McpToolDto]) -> list[JSONValue]:
          """Dump tool DTOs to JSON-compatible dicts keyed by field name."""
          return [tool.model_dump(mode="json", by_alias=False) for tool in tools]

      def _extract_mcp_tools(self, config: dict[str, JSONValue]) -> list[McpToolDto]:
          """Read tools declared statically in the config (``mcp_tools`` or ``tools``)."""
          raw_tools = config.get("mcp_tools") or config.get("tools")
          if not isinstance(raw_tools, list):
              return []
          return self._parse_mcp_tool_list(raw_tools)

      def _discover_mcp_tools(self, config: dict[str, JSONValue]) -> list[McpToolDto]:
          """Validate reachability, then list tools over HTTP with a config fallback.

          Raises:
              ValueError: the reachability check failed.
          """
          self._validate_mcp_connection(config)
          try:
              return self._discover_mcp_tools_via_streamable_http(config)
          except Exception:
              # Deliberate best effort: a server that is reachable but does not
              # answer the JSON-RPC handshake still yields the tools declared in
              # the config.
              return self._extract_mcp_tools(config)

      def _discover_mcp_tools_via_streamable_http(
          self,
          config: dict[str, JSONValue],
      ) -> list[McpToolDto]:
          """Run ``initialize`` -> ``notifications/initialized`` -> ``tools/list``."""
          # Filled by _mcp_rpc when the server issues a session id on
          # ``initialize``; the id has to accompany the follow-up requests.
          session_headers: dict[str, str] = {}
          self._mcp_rpc(
              config,
              request_id=1,
              method="initialize",
              params={
                  "protocolVersion": "2024-11-05",
                  "capabilities": {},
                  "clientInfo": {
                      "name": "auto-platform",
                      "version": "0.1.0",
                  },
              },
              session_headers=session_headers,
          )
          self._mcp_rpc(
              config,
              method="notifications/initialized",
              params={},
              session_headers=session_headers,
          )
          result = self._mcp_rpc(
              config,
              request_id=2,
              method="tools/list",
              params={},
              session_headers=session_headers,
          )
          tools_result = result.get("result")
          if not isinstance(tools_result, dict):
              return []
          raw_tools = tools_result.get("tools")
          if not isinstance(raw_tools, list):
              return []
          return self._parse_mcp_tool_list(raw_tools)

      def _mcp_rpc(
          self,
          config: dict[str, JSONValue],
          *,
          method: str,
          params: dict[str, JSONValue],
          request_id: int | None = None,
          session_headers: dict[str, str] | None = None,
      ) -> dict[str, JSONValue]:
          """POST one JSON-RPC 2.0 message to the configured URL.

          Args:
              config: connection config; ``url``, ``headers`` and the timeout
                  keys are read from it.
              method: JSON-RPC method name.
              params: JSON-RPC params object.
              request_id: JSON-RPC id; omitted from the message when ``None``
                  (i.e. the message is a notification).
              session_headers: optional mutable mapping of extra request
                  headers.  When the response carries ``Mcp-Session-Id`` the
                  value is written back into this mapping so the caller can
                  reuse it for the next call.

          Returns:
              The decoded response object, or ``{}`` when there is none.

          Raises:
              ValueError: the URL is missing or the response carries a
                  JSON-RPC ``error`` object.
          """
          url = config.get("url")
          if not isinstance(url, str) or not url:
              raise ValueError("MCP server url is required")
          payload: dict[str, JSONValue] = {
              "jsonrpc": "2.0",
              "method": method,
              "params": params,
          }
          if request_id is not None:
              payload["id"] = request_id
          headers = {
              **self._read_mcp_headers(config),
              **(session_headers or {}),
              "Accept": "application/json, text/event-stream",
              "Content-Type": "application/json",
          }
          request = urllib.request.Request(
              url,
              data=json.dumps(payload).encode("utf-8"),
              headers=headers,
              method="POST",
          )
          with urllib.request.urlopen(
              request,
              timeout=self._read_timeout_seconds(config),
          ) as response:
              response_body = response.read().decode("utf-8")
              content_type = response.headers.get("Content-Type", "")
              session_id = response.headers.get(self._MCP_SESSION_HEADER)
          if session_headers is not None and session_id:
              session_headers[self._MCP_SESSION_HEADER] = session_id
          data = self._parse_mcp_response(response_body, content_type)
          if not data:
              return {}
          error = data.get("error")
          if isinstance(error, dict):
              message = error.get("message")
              raise ValueError(str(message or "MCP JSON-RPC request failed"))
          return data

      def _parse_mcp_response(
          self,
          response_body: str,
          content_type: str,
      ) -> dict[str, JSONValue]:
          """Decode a JSON or ``text/event-stream`` response body into one JSON object.

          An empty body (for example the reply to a notification) yields ``{}``
          instead of a JSON decode error.  For an event stream the JSON-RPC
          response event is returned; see :meth:`_parse_mcp_event_stream`.

          Raises:
              json.JSONDecodeError: a non-empty payload is not valid JSON.
          """
          if "text/event-stream" in content_type:
              return self._parse_mcp_event_stream(response_body)
          if not response_body.strip():
              return {}
          parsed = json.loads(response_body)
          return parsed if isinstance(parsed, dict) else {}

      def _parse_mcp_event_stream(self, response_body: str) -> dict[str, JSONValue]:
          """Pick the JSON-RPC response out of a server-sent-events body.

          Events are separated by blank lines and the ``data:`` lines of one
          event are joined with newlines.  The last event carrying ``result`` or
          ``error`` wins; otherwise the last JSON object; otherwise ``{}``.
          """
          events: list[str] = []
          data_lines: list[str] = []
          for line in response_body.splitlines():
              if line.startswith("data:"):
                  data_lines.append(line[5:].strip())
              elif not line.strip() and data_lines:
                  events.append("\n".join(data_lines))
                  data_lines = []
          if data_lines:
              # Stream ended without a trailing blank line.
              events.append("\n".join(data_lines))
          messages = [
              message
              for message in (json.loads(event) for event in events if event.strip())
              if isinstance(message, dict)
          ]
          for message in reversed(messages):
              if "result" in message or "error" in message:
                  return message
          return messages[-1] if messages else {}

      def _parse_mcp_tool_list(self, raw_tools: list[JSONValue]) -> list[McpToolDto]:
          """Convert raw tool dicts to DTOs, skipping entries without a non-empty string name."""
          tools: list[McpToolDto] = []
          for item in raw_tools:
              if not isinstance(item, dict):
                  continue
              name = item.get("name")
              if not isinstance(name, str) or not name:
                  continue
              description = item.get("description")
              input_schema = item.get("inputSchema") or item.get("input_schema")
              tools.append(
                  McpToolDto(
                      name=name,
                      description=description if isinstance(description, str) else None,
                      inputSchema=input_schema if isinstance(input_schema, dict) else None,
                  )
              )
          return tools

      def _publish_mcp_discovery_job(
          self,
          *,
          connection_id: str,
          job_id: str,
      ) -> bool:
          """Publish the discovery job; ``False`` when async discovery is off or no publisher is set."""
          if not self.settings.mcp_discovery_async_enabled or self.task_queue_publisher is None:
              return False
          return self.task_queue_publisher.publish_tool_mcp_discovery(
              connection_id=connection_id,
              job_id=job_id,
          )

      def _validate_mcp_connection(self, config: dict[str, JSONValue]) -> None:
          """Issue a GET against the server URL to check that it is reachable.

          HTTP error statuses below 500 are accepted.

          Raises:
              ValueError: the URL is missing, the server answered with a 5xx
                  status, or the request failed for any other reason.
          """
          url = config.get("url")
          if not isinstance(url, str) or not url:
              raise ValueError("MCP server url is required")
          headers = self._read_mcp_headers(config)
          request = urllib.request.Request(
              url,
              headers={
                  **headers,
                  "Accept": headers.get("Accept", "text/event-stream"),
              },
              method="GET",
          )
          timeout = self._read_timeout_seconds(config)
          try:
              with urllib.request.urlopen(request, timeout=timeout) as response:
                  status_code = int(getattr(response, "status", 200))
          except urllib.error.HTTPError as exc:
              if exc.code >= 500:
                  raise ValueError(f"MCP server returned HTTP {exc.code}") from exc
              return
          except Exception as exc:
              raise ValueError(f"MCP server connection failed: {exc}") from exc
          # Checked outside the try block so this error is not re-wrapped as a
          # generic "connection failed" by the handler above.
          if status_code >= 500:
              raise ValueError(f"MCP server returned HTTP {status_code}")

      def _read_mcp_headers(self, config: dict[str, JSONValue]) -> dict[str, str]:
          """Return the scalar-valued entries of ``config["headers"]`` as strings."""
          headers = config.get("headers")
          if not isinstance(headers, dict):
              return {}
          return {
              str(key): str(value)
              for key, value in headers.items()
              if isinstance(value, (str, int, float, bool))
          }

      def _read_timeout_seconds(self, config: dict[str, JSONValue]) -> float:
          """Return the request timeout, capped at ``settings.mcp_discovery_timeout_seconds``.

          A missing, non-numeric, boolean or non-positive configured value falls
          back to the settings value.
          """
          limit = self.settings.mcp_discovery_timeout_seconds
          timeout = config.get("timeout") or config.get("timeout_seconds")
          if (
              isinstance(timeout, (int, float))
              and not isinstance(timeout, bool)
              and timeout > 0
          ):
              return min(float(timeout), limit)
          return limit

      def _build_mcp_status(
          self,
          *,
          job_id: str,
          status: str,
          progress: int,
          worker_key: str | None = None,
          error_message: str | None = None,
      ) -> dict[str, JSONValue]:
          """Build the ``mcp_status`` payload; ``progress`` is clamped to 0..100.

          Timestamps are naive UTC in ISO-8601 form.  A status-specific
          timestamp key (queued/started/completed) is added alongside
          ``updatedTime``.
          """
          now = datetime.utcnow().isoformat()
          payload: dict[str, JSONValue] = {
              "jobId": job_id,
              "status": status,
              "progress": max(0, min(progress, 100)),
              "queueName": TOOL_MCP_DISCOVERY_QUEUE,
              "workerKey": worker_key,
              "errorMessage": error_message,
              "updatedTime": now,
          }
          if status == "queued":
              payload["queuedTime"] = now
          if status == "running":
              payload["startedTime"] = now
          if status in {"completed", "failed", "skipped"}:
              payload["completedTime"] = now
          return payload

      def _read_mcp_status(self, config: dict[str, JSONValue]) -> dict[str, JSONValue]:
          """Return a string-keyed copy of ``config["mcp_status"]`` or ``{}``."""
          status_payload = config.get("mcp_status")
          if isinstance(status_payload, dict):
              return {str(key): value for key, value in status_payload.items()}
          return {}

      def _read_mcp_job_id(self, config: dict[str, JSONValue]) -> str | None:
          """Return the stored job id when it is a non-empty string."""
          job_id = self._read_mcp_status(config).get("jobId")
          return job_id if isinstance(job_id, str) and job_id else None

      def _timeout_ms(self, config: dict[str, JSONValue]) -> int | None:
          """Convert the configured timeout (seconds) to milliseconds.

          Returns ``None`` for a missing, non-numeric, boolean or non-positive
          value, mirroring what :meth:`_read_timeout_seconds` accepts.
          """
          timeout = config.get("timeout") or config.get("timeout_seconds")
          if (
              isinstance(timeout, (int, float))
              and not isinstance(timeout, bool)
              and timeout > 0
          ):
              return int(timeout * 1000)
          return None
  def build_tool_application_service(
      *,
      db: Session,
      settings: ToolServiceSettings,
  ) -> ToolApplicationService:
      """Assemble a ``ToolApplicationService`` on top of one database session.

      A Redis client is requested via ``try_build_redis_client``; a task queue
      publisher is only created when that call returns a client.
      """
      redis_client = try_build_redis_client(settings.redis_url)

      definitions = ToolDefinitionRepository(db)
      connections = ToolConnectionRepository(db)
      bindings = ToolBindingRepository(db)
      credentials = ToolCredentialRepository(db)
      cipher = SecretCipher(key=settings.credential_encryption_key)

      publisher = None
      if redis_client is not None:
          publisher = TaskQueuePublisher(client=redis_client)

      return ToolApplicationService(
          tool_definition_repository=definitions,
          tool_connection_repository=connections,
          tool_binding_repository=bindings,
          tool_credential_repository=credentials,
          secret_cipher=cipher,
          settings=settings,
          redis_client=redis_client,
          task_queue_publisher=publisher,
      )