services.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646
  1. from __future__ import annotations
  2. import urllib.error
  3. import urllib.request
  4. from datetime import datetime, timedelta
  5. from typing import TYPE_CHECKING
  6. from uuid import uuid4
  7. from sqlalchemy.orm import Session
  8. from core_shared import JSONValue, try_build_redis_client
  9. from core_shared.secrets import EncryptedSecret, SecretCipher
  10. from core_shared.task_queue import TOOL_MCP_DISCOVERY_QUEUE, TaskQueuePublisher
  11. from app.bootstrap.settings import ToolServiceSettings
  12. from app.db.models import ToolBinding, ToolCredential, ToolDefinition, ToolConnection
  13. from app.domain.repositories import (
  14. ToolBindingRepository,
  15. ToolCredentialRepository,
  16. ToolDefinitionRepository,
  17. ToolConnectionRepository,
  18. )
  19. from app.schemas.tool import (
  20. McpConnectData,
  21. McpConnectRequestDto,
  22. McpToolDto,
  23. ToolBindingCreateRequest,
  24. ToolBindingCreateRequestDto,
  25. ToolBindingDeleteRequestDto,
  26. ToolBindingDetailRequestDto,
  27. ToolBindingUpdateRequestDto,
  28. ToolCreateRequest,
  29. ToolCreateRequestDto,
  30. ToolCredentialCreateRequest,
  31. ToolCredentialCreateRequestDto,
  32. ToolCredentialDeleteRequestDto,
  33. ToolCredentialDetailRequestDto,
  34. ToolCredentialDto,
  35. ToolCredentialRevealDto,
  36. ToolCredentialUpdateRequestDto,
  37. ToolDeleteRequestDto,
  38. ToolDetailRequestDto,
  39. ToolDto,
  40. ToolUpdateRequestDto,
  41. ToolConnectionCreateRequest,
  42. ToolConnectionCreateRequestDto,
  43. ToolConnectionDetailRequestDto,
  44. ToolConnectionDto,
  45. ToolConnectionUpdateRequestDto,
  46. )
  47. if TYPE_CHECKING:
  48. from redis import Redis
  49. class ToolApplicationService:
  50. def __init__(
  51. self,
  52. tool_definition_repository: ToolDefinitionRepository,
  53. tool_connection_repository: ToolConnectionRepository,
  54. tool_binding_repository: ToolBindingRepository,
  55. tool_credential_repository: ToolCredentialRepository,
  56. secret_cipher: SecretCipher,
  57. settings: ToolServiceSettings | None = None,
  58. redis_client: Redis | None = None,
  59. task_queue_publisher: TaskQueuePublisher | None = None) -> None:
  60. self.tool_definition_repository = tool_definition_repository
  61. self.tool_connection_repository = tool_connection_repository
  62. self.tool_binding_repository = tool_binding_repository
  63. self.tool_credential_repository = tool_credential_repository
  64. self.secret_cipher = secret_cipher
  65. self.settings = settings or ToolServiceSettings()
  66. self.redis_client = redis_client
  67. self.task_queue_publisher = task_queue_publisher
  68. def create_tool_definition(self, payload: ToolCreateRequest) -> ToolDefinition:
  69. code = payload.code or self._build_tool_code(payload.name)
  70. return self.tool_definition_repository.create(
  71. plugin_id=payload.plugin_id,
  72. code=code,
  73. name=payload.name,
  74. tool_type=payload.tool_type,
  75. description=payload.description)
  76. def list_tool_definitions(self) -> list[ToolDefinition]:
  77. return self.tool_definition_repository.list_all()
  78. def create_tool_definition_from_contract(
  79. self,
  80. payload: ToolCreateRequestDto) -> ToolDefinition:
  81. return self.create_tool_definition(
  82. ToolCreateRequest(
  83. plugin_id=payload.pluginId,
  84. name=payload.name,
  85. tool_type=payload.toolType,
  86. description=payload.description))
  87. def delete_tool_definition_from_contract(self, payload: ToolDeleteRequestDto) -> bool:
  88. entity = self.tool_definition_repository.get_by_id(tool_id=payload.toolId)
  89. if entity is None:
  90. return False
  91. self.tool_definition_repository.delete(entity)
  92. return True
  93. def get_tool_definition_from_contract(
  94. self,
  95. payload: ToolDetailRequestDto) -> ToolDefinition | None:
  96. return self.tool_definition_repository.get_by_id(tool_id=payload.toolId)
  97. def update_tool_definition_from_contract(
  98. self,
  99. payload: ToolUpdateRequestDto) -> ToolDefinition | None:
  100. entity = self.tool_definition_repository.get_by_id(tool_id=payload.toolId)
  101. if entity is None:
  102. return None
  103. if payload.name is not None:
  104. entity.name = payload.name
  105. entity.code = self._build_tool_code(payload.name)
  106. if payload.toolType is not None:
  107. entity.tool_type = payload.toolType
  108. if payload.description is not None:
  109. entity.description = payload.description
  110. if payload.pluginId is not None:
  111. entity.plugin_id = payload.pluginId
  112. return self.tool_definition_repository.save(entity)
  113. def create_tool_connection(self, payload: ToolConnectionCreateRequest) -> ToolConnection:
  114. return self.tool_connection_repository.create(
  115. tool_id=payload.tool_id,
  116. input_schema_json=payload.input_schema_json,
  117. output_schema_json=payload.output_schema_json,
  118. invoke_config_json=payload.invoke_config_json,
  119. timeout_ms=payload.timeout_ms,
  120. retry_policy_json=payload.retry_policy_json)
  121. def list_tool_connections(self, tool_id: str | None = None) -> list[ToolConnection]:
  122. if tool_id is None:
  123. return self.tool_connection_repository.list_all()
  124. return self.tool_connection_repository.list_by_tool(tool_id=tool_id)
  125. def create_tool_connection_from_contract(
  126. self,
  127. payload: ToolConnectionCreateRequestDto) -> ToolConnection:
  128. return self.create_tool_connection(
  129. ToolConnectionCreateRequest(
  130. tool_id=payload.toolId,
  131. input_schema_json=payload.inputSchema,
  132. output_schema_json=payload.outputSchema,
  133. invoke_config_json=payload.invokeConfig,
  134. timeout_ms=payload.timeoutMs,
  135. retry_policy_json=payload.retryPolicy))
  136. def delete_tool_connection(self, *, connection_id: str) -> bool:
  137. entity = self.tool_connection_repository.get_by_id(tool_connection_id=connection_id)
  138. if entity is None:
  139. return False
  140. self.tool_connection_repository.delete(entity)
  141. return True
  142. def get_tool_connection_from_contract(
  143. self,
  144. payload: ToolConnectionDetailRequestDto) -> ToolConnection | None:
  145. return self.tool_connection_repository.get_by_id(
  146. tool_connection_id=payload.connectionId)
  147. def update_tool_connection_from_contract(
  148. self,
  149. payload: ToolConnectionUpdateRequestDto) -> ToolConnection | None:
  150. entity = self.tool_connection_repository.get_by_id(
  151. tool_connection_id=payload.connectionId)
  152. if entity is None:
  153. return None
  154. if payload.inputSchema is not None:
  155. entity.input_schema_json = payload.inputSchema
  156. if payload.outputSchema is not None:
  157. entity.output_schema_json = payload.outputSchema
  158. if payload.invokeConfig is not None:
  159. entity.invoke_config_json = payload.invokeConfig
  160. if payload.timeoutMs is not None:
  161. entity.timeout_ms = payload.timeoutMs
  162. if payload.retryPolicy is not None:
  163. entity.retry_policy_json = payload.retryPolicy
  164. return self.tool_connection_repository.save(entity)
  165. def connect_mcp_server(self, payload: McpConnectRequestDto) -> McpConnectData:
  166. server_name, config = self._normalize_mcp_config(payload)
  167. discovered_tools = self._extract_mcp_tools(config)
  168. if discovered_tools:
  169. config["mcp_tools"] = [
  170. tool.model_dump(mode="json", by_alias=False)
  171. for tool in discovered_tools
  172. ]
  173. job_id = f"mcpjob_{uuid4().hex}"
  174. config["mcp_status"] = self._build_mcp_status(
  175. job_id=job_id,
  176. status="queued",
  177. progress=0)
  178. tool = self.create_tool_definition(
  179. ToolCreateRequest(
  180. name=payload.name or server_name,
  181. tool_type="mcp",
  182. description=f"MCP SSE server: {config.get('url', '')}"))
  183. connection = self.create_tool_connection(
  184. ToolConnectionCreateRequest(
  185. tool_id=tool.id,
  186. input_schema_json={},
  187. output_schema_json={},
  188. invoke_config_json=config,
  189. timeout_ms=self._timeout_ms(config),
  190. retry_policy_json={"max_attempts": 1}))
  191. published = self._publish_mcp_discovery_job(
  192. connection_id=connection.id,
  193. job_id=job_id)
  194. if not published:
  195. config["mcp_status"] = self._build_mcp_status(
  196. job_id=job_id,
  197. status="skipped",
  198. progress=0,
  199. error_message="Redis queue is unavailable; connection check was not scheduled.")
  200. connection.invoke_config_json = config
  201. connection = self.tool_connection_repository.save(connection)
  202. return McpConnectData(
  203. tool=ToolDto.from_entity(tool),
  204. connection=ToolConnectionDto.from_entity(connection),
  205. discoveredTools=discovered_tools)
  206. def execute_mcp_discovery_job(
  207. self,
  208. *,
  209. connection_id: str,
  210. worker_key: str,
  211. lease_seconds: int,
  212. job_id: str | None = None,
  213. redis_client: Redis | None = None) -> ToolConnection | None:
  214. resolved_redis_client = redis_client or self.redis_client
  215. lock = None
  216. idempotency_store = None
  217. idempotency_key = f"{connection_id}:{job_id or 'discovery'}"
  218. if resolved_redis_client is not None:
  219. from core_shared.redis_primitives import DistributedLock, IdempotencyStore
  220. lock = DistributedLock(
  221. client=resolved_redis_client,
  222. name=f"tool-mcp-discovery:{connection_id}:lock",
  223. ttl_seconds=lease_seconds)
  224. if not lock.acquire():
  225. return None
  226. idempotency_store = IdempotencyStore(
  227. client=resolved_redis_client,
  228. prefix="tool-mcp-discovery-idempotency")
  229. if not idempotency_store.begin(key=idempotency_key):
  230. lock.release()
  231. return None
  232. try:
  233. result = self._run_mcp_discovery(
  234. connection_id=connection_id,
  235. worker_key=worker_key,
  236. job_id=job_id)
  237. if idempotency_store is not None and result is not None:
  238. status_payload = self._read_mcp_status(result.invoke_config_json or {})
  239. idempotency_store.complete(
  240. key=idempotency_key,
  241. result={
  242. "connection_id": result.id,
  243. "status": str(status_payload.get("status") or ""),
  244. })
  245. except Exception:
  246. if idempotency_store is not None:
  247. idempotency_store.clear(key=idempotency_key)
  248. raise
  249. finally:
  250. if lock is not None:
  251. lock.release()
  252. return result
  253. def execute_next_pending_mcp_discovery(
  254. self,
  255. *,
  256. worker_key: str,
  257. lease_seconds: int,
  258. stale_discovery_seconds: int,
  259. redis_client: Redis | None = None) -> ToolConnection | None:
  260. stale_before = datetime.utcnow() - timedelta(seconds=stale_discovery_seconds)
  261. pending_connections = self.tool_connection_repository.list_pending_mcp_discovery(
  262. stale_before=stale_before,
  263. limit=1)
  264. if not pending_connections:
  265. return None
  266. connection = pending_connections[0]
  267. status_payload = self._read_mcp_status(connection.invoke_config_json or {})
  268. job_id = status_payload.get("jobId")
  269. return self.execute_mcp_discovery_job(
  270. connection_id=connection.id,
  271. job_id=job_id if isinstance(job_id, str) else None,
  272. worker_key=worker_key,
  273. lease_seconds=lease_seconds,
  274. redis_client=redis_client)
  275. def _run_mcp_discovery(
  276. self,
  277. *,
  278. connection_id: str,
  279. worker_key: str,
  280. job_id: str | None = None) -> ToolConnection | None:
  281. connection = self.tool_connection_repository.get_by_id(tool_connection_id=connection_id)
  282. if connection is None:
  283. return None
  284. config = dict(connection.invoke_config_json or {})
  285. resolved_job_id = job_id or self._read_mcp_job_id(config) or f"mcpjob_{uuid4().hex}"
  286. config["mcp_status"] = self._build_mcp_status(
  287. job_id=resolved_job_id,
  288. status="running",
  289. progress=20,
  290. worker_key=worker_key)
  291. connection.invoke_config_json = config
  292. connection = self.tool_connection_repository.save(connection)
  293. try:
  294. self._validate_mcp_connection(config)
  295. except Exception as exc:
  296. config = dict(connection.invoke_config_json or {})
  297. config["mcp_status"] = self._build_mcp_status(
  298. job_id=resolved_job_id,
  299. status="failed",
  300. progress=100,
  301. worker_key=worker_key,
  302. error_message=str(exc))
  303. connection.invoke_config_json = config
  304. return self.tool_connection_repository.save(connection)
  305. config = dict(connection.invoke_config_json or {})
  306. config["mcp_status"] = self._build_mcp_status(
  307. job_id=resolved_job_id,
  308. status="completed",
  309. progress=100,
  310. worker_key=worker_key)
  311. connection.invoke_config_json = config
  312. return self.tool_connection_repository.save(connection)
  313. def create_tool_binding(self, payload: ToolBindingCreateRequest) -> ToolBinding:
  314. if payload.credential_id is not None:
  315. credential = self.tool_credential_repository.get_by_id(
  316. credential_id=payload.credential_id)
  317. if credential is None:
  318. raise ValueError(f"tool credential not found: {payload.credential_id}")
  319. return self.tool_binding_repository.create(
  320. app_id=payload.app_id,
  321. tool_connection_id=payload.tool_connection_id,
  322. credential_id=payload.credential_id,
  323. binding_scope=payload.binding_scope,
  324. enabled=payload.enabled,
  325. config_json=payload.config_json)
  326. def create_tool_binding_from_contract(
  327. self,
  328. payload: ToolBindingCreateRequestDto) -> ToolBinding:
  329. return self.create_tool_binding(
  330. ToolBindingCreateRequest(
  331. app_id=payload.appId,
  332. tool_connection_id=payload.connectionId,
  333. credential_id=payload.credentialId,
  334. binding_scope=payload.bindingScope,
  335. enabled=True,
  336. config_json=payload.configJson))
  337. def list_tool_bindings(self, app_id: str | None = None) -> list[ToolBinding]:
  338. return self.tool_binding_repository.list_by_scope(app_id=app_id)
  339. def delete_tool_binding(self, payload: ToolBindingDeleteRequestDto) -> bool:
  340. entity = self.tool_binding_repository.get_by_id(binding_id=payload.bindingId)
  341. if entity is None:
  342. return False
  343. self.tool_binding_repository.delete(entity)
  344. return True
  345. def get_tool_binding_from_contract(
  346. self,
  347. payload: ToolBindingDetailRequestDto) -> ToolBinding | None:
  348. return self.tool_binding_repository.get_by_id(binding_id=payload.bindingId)
  349. def update_tool_binding_from_contract(
  350. self,
  351. payload: ToolBindingUpdateRequestDto) -> ToolBinding | None:
  352. entity = self.tool_binding_repository.get_by_id(binding_id=payload.bindingId)
  353. if entity is None:
  354. return None
  355. if payload.credentialId is not None:
  356. entity.credential_id = payload.credentialId
  357. if payload.bindingScope is not None:
  358. entity.binding_scope = payload.bindingScope
  359. if payload.configJson is not None:
  360. entity.config_json = payload.configJson
  361. return self.tool_binding_repository.save(entity)
  362. def create_tool_credential(self, payload: ToolCredentialCreateRequest) -> ToolCredential:
  363. encrypted = self.secret_cipher.encrypt_json(payload.secret_json)
  364. return self.tool_credential_repository.create(
  365. name=payload.name,
  366. credential_type=payload.credential_type,
  367. encrypted_payload_text=encrypted.ciphertext,
  368. secret_fingerprint=encrypted.fingerprint,
  369. encryption_algorithm=encrypted.algorithm,
  370. metadata_json=payload.metadata_json)
  371. def create_tool_credential_from_contract(
  372. self,
  373. payload: ToolCredentialCreateRequestDto) -> ToolCredential:
  374. return self.create_tool_credential(
  375. ToolCredentialCreateRequest(
  376. name=payload.name,
  377. credential_type=payload.credentialType,
  378. secret_json=payload.secretJson,
  379. metadata_json=payload.metadataJson))
  380. def list_tool_credentials(self) -> list[ToolCredential]:
  381. return self.tool_credential_repository.list_all()
  382. def delete_tool_credential(self, payload: ToolCredentialDeleteRequestDto) -> bool:
  383. entity = self.tool_credential_repository.get_by_id(
  384. credential_id=payload.credentialId)
  385. if entity is None:
  386. return False
  387. self.tool_credential_repository.delete(entity)
  388. return True
  389. def get_tool_credential_from_contract(
  390. self,
  391. payload: ToolCredentialDetailRequestDto) -> ToolCredential | None:
  392. return self.tool_credential_repository.get_by_id(
  393. credential_id=payload.credentialId)
  394. def update_tool_credential_from_contract(
  395. self,
  396. payload: ToolCredentialUpdateRequestDto) -> ToolCredential | None:
  397. entity = self.tool_credential_repository.get_by_id(
  398. credential_id=payload.credentialId)
  399. if entity is None:
  400. return None
  401. if payload.name is not None:
  402. entity.name = payload.name
  403. if payload.metadataJson is not None:
  404. entity.metadata_json = payload.metadataJson
  405. return self.tool_credential_repository.save(entity)
  406. def reveal_tool_credential(
  407. self,
  408. *,
  409. credential_id: str) -> tuple[ToolCredential, dict[str, JSONValue]] | None:
  410. credential = self.tool_credential_repository.get_by_id(
  411. credential_id=credential_id)
  412. if credential is None:
  413. return None
  414. secret_json = self.secret_cipher.decrypt_json(
  415. EncryptedSecret(
  416. ciphertext=credential.encrypted_payload_text,
  417. fingerprint=credential.secret_fingerprint,
  418. algorithm=credential.encryption_algorithm)
  419. )
  420. return credential, secret_json
  421. def reveal_tool_credential_from_contract(
  422. self,
  423. *,
  424. credential_id: str) -> ToolCredentialRevealDto | None:
  425. result = self.reveal_tool_credential(credential_id=credential_id)
  426. if result is None:
  427. return None
  428. credential, secret_json = result
  429. return ToolCredentialRevealDto(
  430. credential=ToolCredentialDto.from_entity(credential),
  431. secretJson=secret_json)
  432. def get_tool_binding_detail(
  433. self,
  434. *,
  435. binding_id: str) -> tuple[ToolBinding, ToolConnection, ToolDefinition] | None:
  436. binding = self.tool_binding_repository.get_by_id(binding_id=binding_id)
  437. if binding is None:
  438. return None
  439. tool_connection = self.tool_connection_repository.get_by_id(
  440. tool_connection_id=binding.tool_connection_id)
  441. if tool_connection is None:
  442. return None
  443. tool_definition = self.tool_definition_repository.get_by_id(
  444. tool_id=tool_connection.tool_id)
  445. if tool_definition is None:
  446. return None
  447. return binding, tool_connection, tool_definition
  448. def _build_tool_code(self, name: str) -> str:
  449. base = "".join(
  450. char.lower() if char.isalnum() else "_"
  451. for char in name
  452. ).strip("_") or "tool"
  453. return base[:64]
  454. def _normalize_mcp_config(
  455. self,
  456. payload: McpConnectRequestDto) -> tuple[str, dict[str, JSONValue]]:
  457. config = payload.config
  458. if len(config) == 1:
  459. server_name, value = next(iter(config.items()))
  460. if isinstance(value, dict):
  461. normalized = {str(key): item for key, item in value.items()}
  462. normalized.setdefault("server_name", server_name)
  463. normalized.setdefault("transport", "sse")
  464. return payload.name or server_name, normalized
  465. server_name_value = config.get("server_name") or config.get("serverName")
  466. server_name = str(server_name_value or payload.name or "mcp_server")
  467. normalized = {str(key): value for key, value in config.items()}
  468. normalized.setdefault("server_name", server_name)
  469. normalized.setdefault("transport", "sse")
  470. return server_name, normalized
  471. def _extract_mcp_tools(self, config: dict[str, JSONValue]) -> list[McpToolDto]:
  472. raw_tools = config.get("mcp_tools") or config.get("tools")
  473. if not isinstance(raw_tools, list):
  474. return []
  475. tools: list[McpToolDto] = []
  476. for item in raw_tools:
  477. if not isinstance(item, dict):
  478. continue
  479. name = item.get("name")
  480. if not isinstance(name, str) or not name:
  481. continue
  482. description = item.get("description")
  483. input_schema = item.get("inputSchema") or item.get("input_schema")
  484. tools.append(
  485. McpToolDto(
  486. name=name,
  487. description=description if isinstance(description, str) else None,
  488. inputSchema=input_schema if isinstance(input_schema, dict) else None))
  489. return tools
  490. def _publish_mcp_discovery_job(
  491. self,
  492. *,
  493. connection_id: str,
  494. job_id: str) -> bool:
  495. if not self.settings.mcp_discovery_async_enabled or self.task_queue_publisher is None:
  496. return False
  497. return self.task_queue_publisher.publish_tool_mcp_discovery(
  498. connection_id=connection_id,
  499. job_id=job_id)
  500. def _validate_mcp_connection(self, config: dict[str, JSONValue]) -> None:
  501. url = config.get("url")
  502. if not isinstance(url, str) or not url:
  503. raise ValueError("MCP server url is required")
  504. headers = self._read_mcp_headers(config)
  505. request = urllib.request.Request(
  506. url,
  507. headers={
  508. **headers,
  509. "Accept": headers.get("Accept", "text/event-stream"),
  510. },
  511. method="GET")
  512. timeout = self._read_timeout_seconds(config)
  513. try:
  514. with urllib.request.urlopen(request, timeout=timeout) as response:
  515. status_code = int(getattr(response, "status", 200))
  516. if status_code >= 500:
  517. raise ValueError(f"MCP server returned HTTP {status_code}")
  518. except urllib.error.HTTPError as exc:
  519. if exc.code >= 500:
  520. raise ValueError(f"MCP server returned HTTP {exc.code}") from exc
  521. except Exception as exc:
  522. raise ValueError(f"MCP server connection failed: {exc}") from exc
  523. def _read_mcp_headers(self, config: dict[str, JSONValue]) -> dict[str, str]:
  524. headers = config.get("headers")
  525. if not isinstance(headers, dict):
  526. return {}
  527. return {
  528. str(key): str(value)
  529. for key, value in headers.items()
  530. if isinstance(value, (str, int, float, bool))
  531. }
  532. def _read_timeout_seconds(self, config: dict[str, JSONValue]) -> float:
  533. timeout = config.get("timeout") or config.get("timeout_seconds")
  534. if isinstance(timeout, (int, float)) and not isinstance(timeout, bool):
  535. return min(float(timeout), self.settings.mcp_discovery_timeout_seconds)
  536. return self.settings.mcp_discovery_timeout_seconds
  537. def _build_mcp_status(
  538. self,
  539. *,
  540. job_id: str,
  541. status: str,
  542. progress: int,
  543. worker_key: str | None = None,
  544. error_message: str | None = None) -> dict[str, JSONValue]:
  545. now = datetime.utcnow().isoformat()
  546. payload: dict[str, JSONValue] = {
  547. "jobId": job_id,
  548. "status": status,
  549. "progress": max(0, min(progress, 100)),
  550. "queueName": TOOL_MCP_DISCOVERY_QUEUE,
  551. "workerKey": worker_key,
  552. "errorMessage": error_message,
  553. "updatedTime": now,
  554. }
  555. if status == "queued":
  556. payload["queuedTime"] = now
  557. if status == "running":
  558. payload["startedTime"] = now
  559. if status in {"completed", "failed", "skipped"}:
  560. payload["completedTime"] = now
  561. return payload
  562. def _read_mcp_status(self, config: dict[str, JSONValue]) -> dict[str, JSONValue]:
  563. status_payload = config.get("mcp_status")
  564. if isinstance(status_payload, dict):
  565. return {str(key): value for key, value in status_payload.items()}
  566. return {}
  567. def _read_mcp_job_id(self, config: dict[str, JSONValue]) -> str | None:
  568. job_id = self._read_mcp_status(config).get("jobId")
  569. return job_id if isinstance(job_id, str) and job_id else None
  570. def _timeout_ms(self, config: dict[str, JSONValue]) -> int | None:
  571. timeout = config.get("timeout") or config.get("timeout_seconds")
  572. if isinstance(timeout, int | float):
  573. return int(timeout * 1000)
  574. return None
  575. def build_tool_application_service(
  576. *,
  577. db: Session,
  578. settings: ToolServiceSettings) -> ToolApplicationService:
  579. redis_client = try_build_redis_client(settings.redis_url)
  580. return ToolApplicationService(
  581. tool_definition_repository=ToolDefinitionRepository(db),
  582. tool_connection_repository=ToolConnectionRepository(db),
  583. tool_binding_repository=ToolBindingRepository(db),
  584. tool_credential_repository=ToolCredentialRepository(db),
  585. secret_cipher=SecretCipher(key=settings.credential_encryption_key),
  586. settings=settings,
  587. redis_client=redis_client,
  588. task_queue_publisher=(
  589. TaskQueuePublisher(client=redis_client) if redis_client is not None else None
  590. ))