executors.py 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056
  1. from abc import ABC, abstractmethod
  2. from dataclasses import dataclass
  3. import re
  4. import httpx
  5. from core_domain import (
  6. ChatCompletionRequestContract,
  7. ChatMessageContract,
  8. CodeExecutionRequestContract,
  9. NodeExecutionContextContract,
  10. NodeExecutionRequestContract,
  11. NodeExecutionResultContract,
  12. ToolBindingDetailContract,
  13. )
  14. from core_shared import JSONValue
  15. from .code_runner_client import CodeRunnerClient, CodeRunnerClientError
  16. from .context import (
  17. build_template_context,
  18. coerce_bool,
  19. evaluate_condition_expression,
  20. render_json_value,
  21. render_template_string,
  22. resolve_expression,
  23. )
  24. from .model_gateway_client import ModelGatewayClient, ModelGatewayClientError
  25. from .tool_client import ToolServiceClient, ToolServiceClientError
  26. class NodeExecutor(ABC):
  27. executor_name: str
  28. supported_node_types: frozenset[str]
  29. @abstractmethod
  30. def execute(
  31. self,
  32. context: NodeExecutionContextContract,
  33. request: NodeExecutionRequestContract,
  34. ) -> NodeExecutionResultContract:
  35. raise NotImplementedError
  36. class CompletedNodeExecutor(NodeExecutor):
  37. def __init__(self, *, executor_name: str, supported_node_types: frozenset[str]) -> None:
  38. self.executor_name = executor_name
  39. self.supported_node_types = supported_node_types
  40. def execute(
  41. self,
  42. context: NodeExecutionContextContract,
  43. request: NodeExecutionRequestContract,
  44. ) -> NodeExecutionResultContract:
  45. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  46. return NodeExecutionResultContract(
  47. status="completed",
  48. worker_key=worker_key,
  49. output_json={
  50. "executor_name": self.executor_name,
  51. "node_type": context.node_type,
  52. },
  53. )
  54. class DefaultNodeExecutor(CompletedNodeExecutor):
  55. def __init__(self) -> None:
  56. super().__init__(
  57. executor_name="default-executor",
  58. supported_node_types=frozenset(),
  59. )
  60. class LLMNodeExecutor(CompletedNodeExecutor):
  61. def __init__(self, model_gateway_client: ModelGatewayClient | None = None) -> None:
  62. super().__init__(
  63. executor_name="llm-executor",
  64. supported_node_types=frozenset({"llm"}),
  65. )
  66. self.model_gateway_client = model_gateway_client
  67. def execute(
  68. self,
  69. context: NodeExecutionContextContract,
  70. request: NodeExecutionRequestContract,
  71. ) -> NodeExecutionResultContract:
  72. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  73. render_context = _build_executor_template_context(context)
  74. rendered_config_json = _render_config_json(context.node_config_json, render_context)
  75. chat_request = _build_chat_completion_request(rendered_config_json)
  76. if chat_request is None:
  77. return NodeExecutionResultContract(
  78. status="failed",
  79. worker_key=worker_key,
  80. error_code="llm_config_missing",
  81. error_message="llm node config requires prompt or messages",
  82. )
  83. if self.model_gateway_client is None:
  84. return NodeExecutionResultContract(
  85. status="failed",
  86. worker_key=worker_key,
  87. error_code="llm_gateway_missing",
  88. error_message="model gateway client is not configured",
  89. )
  90. try:
  91. response = self.model_gateway_client.create_chat_completion(chat_request)
  92. except ModelGatewayClientError as exc:
  93. return NodeExecutionResultContract(
  94. status="failed",
  95. worker_key=worker_key,
  96. error_code="llm_request_failed",
  97. error_message=str(exc),
  98. )
  99. return NodeExecutionResultContract(
  100. status="completed",
  101. worker_key=worker_key,
  102. output_text=response.content,
  103. output_json={
  104. "executor_name": self.executor_name,
  105. "model": response.model,
  106. "finish_reason": response.finish_reason,
  107. "usage_json": response.usage_json,
  108. "raw_response_json": response.raw_response_json,
  109. },
  110. )
  111. class ToolNodeExecutor(CompletedNodeExecutor):
  112. def __init__(self, tool_client: ToolServiceClient | None = None) -> None:
  113. super().__init__(
  114. executor_name="tool-executor",
  115. supported_node_types=frozenset({"tool"}),
  116. )
  117. self.tool_client = tool_client
  118. def execute(
  119. self,
  120. context: NodeExecutionContextContract,
  121. request: NodeExecutionRequestContract,
  122. ) -> NodeExecutionResultContract:
  123. tool_binding_id = _read_string_value(context.node_config_json, "tool_binding_id")
  124. tool_code = _read_string_value(context.node_config_json, "tool_code")
  125. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  126. if tool_binding_id is None and tool_code is None:
  127. return NodeExecutionResultContract(
  128. status="failed",
  129. worker_key=worker_key,
  130. error_code="tool_config_missing",
  131. error_message="tool node config requires tool_binding_id or tool_code",
  132. )
  133. if tool_binding_id is not None and self.tool_client is not None:
  134. try:
  135. detail = self.tool_client.get_tool_binding_detail(
  136. tenant_id=context.tenant_id,
  137. binding_id=tool_binding_id,
  138. )
  139. except ToolServiceClientError as exc:
  140. return NodeExecutionResultContract(
  141. status="failed",
  142. worker_key=worker_key,
  143. error_code="tool_binding_lookup_failed",
  144. error_message=str(exc),
  145. )
  146. if not detail.binding.enabled:
  147. return NodeExecutionResultContract(
  148. status="failed",
  149. worker_key=worker_key,
  150. error_code="tool_binding_disabled",
  151. error_message=f"tool binding is disabled: {tool_binding_id}",
  152. )
  153. resolved_tool_code = detail.tool_definition.code
  154. resolved_tool_version_id = detail.tool_version.id
  155. resolved_tool_name = detail.tool_definition.name
  156. invoke_result = self._invoke_http_tool(
  157. context=context,
  158. detail=detail,
  159. worker_key=worker_key,
  160. )
  161. if invoke_result is not None:
  162. return invoke_result
  163. else:
  164. resolved_tool_code = tool_code
  165. resolved_tool_version_id = None
  166. resolved_tool_name = None
  167. return NodeExecutionResultContract(
  168. status="completed",
  169. worker_key=worker_key,
  170. output_text=f"tool node completed: {resolved_tool_code or 'unknown-tool'}",
  171. output_json={
  172. "executor_name": self.executor_name,
  173. "tool_binding_id": tool_binding_id,
  174. "tool_code": resolved_tool_code,
  175. "tool_version_id": resolved_tool_version_id,
  176. "tool_name": resolved_tool_name,
  177. },
  178. )
  179. def _invoke_http_tool(
  180. self,
  181. *,
  182. context: NodeExecutionContextContract,
  183. detail: ToolBindingDetailContract,
  184. worker_key: str,
  185. ) -> NodeExecutionResultContract | None:
  186. if detail.tool_definition.tool_type != "http":
  187. return None
  188. invoke_config_json = detail.tool_version.invoke_config_json or {}
  189. binding_config_json = detail.binding.config_json or {}
  190. render_context = _build_executor_template_context(context)
  191. request_headers = _merge_json_dicts(
  192. _render_json_dict(_read_dict_value(invoke_config_json, "headers"), render_context),
  193. _render_json_dict(_read_dict_value(binding_config_json, "headers"), render_context),
  194. _render_json_dict(_read_dict_value(context.node_config_json, "headers"), render_context),
  195. )
  196. request_query = _merge_json_dicts(
  197. _render_json_dict(_read_dict_value(invoke_config_json, "query"), render_context),
  198. _render_json_dict(_read_dict_value(context.node_config_json, "query"), render_context),
  199. )
  200. request_body = _merge_json_dicts(
  201. _render_json_dict(_read_dict_value(invoke_config_json, "body"), render_context),
  202. _render_json_dict(_read_dict_value(context.node_config_json, "body"), render_context),
  203. )
  204. method = (_read_string_value(invoke_config_json, "method") or "GET").upper()
  205. base_url = (
  206. _read_string_value(context.node_config_json, "base_url")
  207. or _read_string_value(binding_config_json, "base_url")
  208. or _read_string_value(invoke_config_json, "base_url")
  209. )
  210. path = _read_string_value(context.node_config_json, "path") or _read_string_value(
  211. invoke_config_json, "path"
  212. )
  213. url = _read_string_value(context.node_config_json, "url") or _read_string_value(
  214. invoke_config_json, "url"
  215. )
  216. resolved_url = _resolve_http_url(url=url, base_url=base_url, path=path)
  217. if resolved_url is None:
  218. return NodeExecutionResultContract(
  219. status="failed",
  220. worker_key=worker_key,
  221. error_code="tool_http_url_missing",
  222. error_message="http tool requires url or base_url with path",
  223. )
  224. timeout_ms = detail.tool_version.timeout_ms or 10000
  225. try:
  226. with httpx.Client(timeout=timeout_ms / 1000) as client:
  227. response = client.request(
  228. method=method,
  229. url=resolved_url,
  230. params=_coerce_http_params(request_query),
  231. headers=_coerce_http_headers(request_headers),
  232. json=request_body if request_body else None,
  233. )
  234. response.raise_for_status()
  235. except httpx.HTTPError as exc:
  236. return NodeExecutionResultContract(
  237. status="failed",
  238. worker_key=worker_key,
  239. error_code="tool_http_request_failed",
  240. error_message=str(exc),
  241. output_json={
  242. "executor_name": self.executor_name,
  243. "tool_binding_id": detail.binding.id,
  244. "tool_code": detail.tool_definition.code,
  245. "request_url": resolved_url,
  246. "request_method": method,
  247. },
  248. )
  249. response_json = _try_parse_json_response(response)
  250. response_text = None if response_json is not None else response.text
  251. return NodeExecutionResultContract(
  252. status="completed",
  253. worker_key=worker_key,
  254. output_text=response_text,
  255. output_json={
  256. "executor_name": self.executor_name,
  257. "tool_binding_id": detail.binding.id,
  258. "tool_code": detail.tool_definition.code,
  259. "tool_version_id": detail.tool_version.id,
  260. "tool_name": detail.tool_definition.name,
  261. "request_url": resolved_url,
  262. "request_method": method,
  263. "response_status_code": response.status_code,
  264. "response_headers": dict(response.headers),
  265. "response_json": response_json,
  266. },
  267. )
  268. class CodeNodeExecutor(CompletedNodeExecutor):
  269. def __init__(self, code_runner_client: CodeRunnerClient | None = None) -> None:
  270. super().__init__(
  271. executor_name="code-executor",
  272. supported_node_types=frozenset({"code"}),
  273. )
  274. self.code_runner_client = code_runner_client
  275. def execute(
  276. self,
  277. context: NodeExecutionContextContract,
  278. request: NodeExecutionRequestContract,
  279. ) -> NodeExecutionResultContract:
  280. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  281. code = _read_string_value(context.node_config_json, "code")
  282. if code is None:
  283. return NodeExecutionResultContract(
  284. status="failed",
  285. worker_key=worker_key,
  286. error_code="code_config_missing",
  287. error_message="code node config requires code",
  288. )
  289. if self.code_runner_client is None:
  290. return NodeExecutionResultContract(
  291. status="failed",
  292. worker_key=worker_key,
  293. error_code="code_runner_missing",
  294. error_message="code runner client is not configured",
  295. )
  296. render_context = _build_executor_template_context(context)
  297. input_json = _render_json_dict(
  298. _read_dict_value(context.node_config_json, "input_json"),
  299. render_context,
  300. )
  301. language = _read_string_value(context.node_config_json, "language") or "python"
  302. timeout_seconds = _read_int_value(context.node_config_json, "timeout_seconds") or 10
  303. code_request = CodeExecutionRequestContract(
  304. language=language,
  305. code=code,
  306. input_json=input_json,
  307. timeout_seconds=timeout_seconds,
  308. )
  309. try:
  310. response = self.code_runner_client.execute_code(code_request)
  311. except CodeRunnerClientError as exc:
  312. return NodeExecutionResultContract(
  313. status="failed",
  314. worker_key=worker_key,
  315. error_code="code_request_failed",
  316. error_message=str(exc),
  317. )
  318. if not response.success:
  319. return NodeExecutionResultContract(
  320. status="failed",
  321. worker_key=worker_key,
  322. error_code="code_execution_failed",
  323. error_message=response.error_message or response.stderr,
  324. output_text=response.stdout,
  325. output_json={
  326. "executor_name": self.executor_name,
  327. "stderr": response.stderr,
  328. "output_json": response.output_json,
  329. },
  330. )
  331. return NodeExecutionResultContract(
  332. status="completed",
  333. worker_key=worker_key,
  334. output_text=response.stdout,
  335. output_json={
  336. "executor_name": self.executor_name,
  337. "stderr": response.stderr,
  338. "result_json": response.output_json,
  339. },
  340. )
  341. class AnswerNodeExecutor(CompletedNodeExecutor):
  342. def execute(
  343. self,
  344. context: NodeExecutionContextContract,
  345. request: NodeExecutionRequestContract,
  346. ) -> NodeExecutionResultContract:
  347. answer_text = _read_string_value(context.node_config_json, "text")
  348. template = _read_string_value(context.node_config_json, "template")
  349. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  350. if answer_text is None and template is None:
  351. return NodeExecutionResultContract(
  352. status="failed",
  353. worker_key=worker_key,
  354. error_code="answer_config_missing",
  355. error_message="answer node config requires text or template",
  356. )
  357. render_context = _build_executor_template_context(context)
  358. rendered_text = render_template_string(answer_text or template or "", render_context)
  359. return NodeExecutionResultContract(
  360. status="completed",
  361. worker_key=worker_key,
  362. output_text=rendered_text,
  363. output_json={
  364. "executor_name": self.executor_name,
  365. "render_mode": "text" if answer_text is not None else "template",
  366. },
  367. )
  368. def __init__(self) -> None:
  369. super().__init__(
  370. executor_name="answer-executor",
  371. supported_node_types=frozenset({"answer"}),
  372. )
  373. class ConditionNodeExecutor(CompletedNodeExecutor):
  374. def execute(
  375. self,
  376. context: NodeExecutionContextContract,
  377. request: NodeExecutionRequestContract,
  378. ) -> NodeExecutionResultContract:
  379. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  380. render_context = _build_executor_template_context(context)
  381. expression = _read_string_value(context.node_config_json, "expression")
  382. path = _read_string_value(context.node_config_json, "path")
  383. if expression is not None:
  384. condition_result = evaluate_condition_expression(expression, render_context)
  385. evaluated_expression = expression
  386. elif path is not None:
  387. condition_result = _evaluate_path_condition(context.node_config_json, path, render_context)
  388. evaluated_expression = path
  389. else:
  390. return NodeExecutionResultContract(
  391. status="failed",
  392. worker_key=worker_key,
  393. error_code="condition_config_missing",
  394. error_message="condition node config requires expression or path",
  395. )
  396. route = "true" if condition_result else "false"
  397. return NodeExecutionResultContract(
  398. status="completed",
  399. worker_key=worker_key,
  400. output_json={
  401. "executor_name": self.executor_name,
  402. "condition_result": condition_result,
  403. "route": route,
  404. "evaluated_expression": evaluated_expression,
  405. },
  406. )
  407. def __init__(self) -> None:
  408. super().__init__(
  409. executor_name="condition-executor",
  410. supported_node_types=frozenset({"if-else", "condition"}),
  411. )
  412. class AssignerNodeExecutor(CompletedNodeExecutor):
  413. def execute(
  414. self,
  415. context: NodeExecutionContextContract,
  416. request: NodeExecutionRequestContract,
  417. ) -> NodeExecutionResultContract:
  418. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  419. assignments = _read_dict_value(context.node_config_json, "assignments")
  420. if not assignments:
  421. return NodeExecutionResultContract(
  422. status="failed",
  423. worker_key=worker_key,
  424. error_code="assignments_missing",
  425. error_message="assigner node config requires assignments",
  426. )
  427. render_context = _build_executor_template_context(context)
  428. rendered_assignments = _render_json_dict(assignments, render_context)
  429. return NodeExecutionResultContract(
  430. status="completed",
  431. worker_key=worker_key,
  432. output_json={
  433. "executor_name": self.executor_name,
  434. "assigned_values": rendered_assignments,
  435. "state_updates": rendered_assignments,
  436. },
  437. )
  438. def __init__(self) -> None:
  439. super().__init__(
  440. executor_name="assigner-executor",
  441. supported_node_types=frozenset({"assigner"}),
  442. )
  443. class RetrieverNodeExecutor(CompletedNodeExecutor):
  444. def execute(
  445. self,
  446. context: NodeExecutionContextContract,
  447. request: NodeExecutionRequestContract,
  448. ) -> NodeExecutionResultContract:
  449. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  450. render_context = _build_executor_template_context(context)
  451. query = _resolve_retriever_query(context.node_config_json, render_context)
  452. documents = _read_retriever_documents(context.node_config_json, render_context)
  453. source_url = _read_string_value(context.node_config_json, "source_url")
  454. top_k = _read_int_value(context.node_config_json, "top_k") or 3
  455. if query is None:
  456. return NodeExecutionResultContract(
  457. status="failed",
  458. worker_key=worker_key,
  459. error_code="retriever_query_missing",
  460. error_message="retriever node config requires query or query_template",
  461. )
  462. if source_url is not None:
  463. try:
  464. documents.extend(
  465. _fetch_retriever_documents_from_url(
  466. source_url=render_template_string(source_url, render_context),
  467. timeout_ms=_read_int_value(context.node_config_json, "timeout_ms") or 10000,
  468. render_context=render_context,
  469. )
  470. )
  471. except httpx.HTTPError as exc:
  472. return NodeExecutionResultContract(
  473. status="failed",
  474. worker_key=worker_key,
  475. error_code="retriever_source_request_failed",
  476. error_message=str(exc),
  477. )
  478. except ValueError as exc:
  479. return NodeExecutionResultContract(
  480. status="failed",
  481. worker_key=worker_key,
  482. error_code="retriever_source_invalid",
  483. error_message=str(exc),
  484. )
  485. if not documents:
  486. return NodeExecutionResultContract(
  487. status="failed",
  488. worker_key=worker_key,
  489. error_code="retriever_documents_missing",
  490. error_message="retriever node config requires non-empty documents",
  491. )
  492. ranked_documents = rank_documents(query=query, documents=documents, top_k=top_k)
  493. output_documents = [item.to_output_json() for item in ranked_documents]
  494. output_text = "\n\n".join(item.text for item in ranked_documents)
  495. return NodeExecutionResultContract(
  496. status="completed",
  497. worker_key=worker_key,
  498. output_text=output_text,
  499. output_json={
  500. "executor_name": self.executor_name,
  501. "query": query,
  502. "top_k": top_k,
  503. "retrieved_documents": output_documents,
  504. },
  505. )
  506. def __init__(self) -> None:
  507. super().__init__(
  508. executor_name="retriever-executor",
  509. supported_node_types=frozenset({"knowledge-retrieval", "retriever"}),
  510. )
  511. class TemplateNodeExecutor(CompletedNodeExecutor):
  512. def execute(
  513. self,
  514. context: NodeExecutionContextContract,
  515. request: NodeExecutionRequestContract,
  516. ) -> NodeExecutionResultContract:
  517. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  518. render_context = _build_executor_template_context(context)
  519. template = _read_string_value(context.node_config_json, "template")
  520. template_json = _read_dict_value(context.node_config_json, "template_json")
  521. if template is None and not template_json:
  522. return NodeExecutionResultContract(
  523. status="failed",
  524. worker_key=worker_key,
  525. error_code="template_config_missing",
  526. error_message="template node config requires template or template_json",
  527. )
  528. rendered_text = None
  529. rendered_json = None
  530. if template is not None:
  531. rendered_text = render_template_string(template, render_context)
  532. if template_json:
  533. rendered_json = _render_json_dict(template_json, render_context)
  534. output_json: dict[str, JSONValue] = {"executor_name": self.executor_name}
  535. if rendered_json is not None:
  536. output_json["rendered_json"] = rendered_json
  537. return NodeExecutionResultContract(
  538. status="completed",
  539. worker_key=worker_key,
  540. output_text=rendered_text,
  541. output_json=output_json,
  542. )
  543. def __init__(self) -> None:
  544. super().__init__(
  545. executor_name="template-executor",
  546. supported_node_types=frozenset({"template-transform", "template"}),
  547. )
  548. class NodeExecutionDispatcher:
  549. def __init__(
  550. self,
  551. executors: list[NodeExecutor],
  552. default_executor: NodeExecutor,
  553. ) -> None:
  554. self.executors = executors
  555. self.default_executor = default_executor
  556. def resolve_executor(self, node_type: str) -> NodeExecutor:
  557. for executor in self.executors:
  558. if node_type in executor.supported_node_types:
  559. return executor
  560. return self.default_executor
  561. def execute(
  562. self,
  563. context: NodeExecutionContextContract,
  564. request: NodeExecutionRequestContract,
  565. ) -> tuple[NodeExecutionResultContract, str]:
  566. executor = self.resolve_executor(context.node_type)
  567. result = executor.execute(context, request)
  568. return result, executor.executor_name
  569. def build_node_execution_dispatcher() -> NodeExecutionDispatcher:
  570. executors: list[NodeExecutor] = [
  571. LLMNodeExecutor(),
  572. ToolNodeExecutor(),
  573. CodeNodeExecutor(),
  574. AnswerNodeExecutor(),
  575. ConditionNodeExecutor(),
  576. AssignerNodeExecutor(),
  577. RetrieverNodeExecutor(),
  578. TemplateNodeExecutor(),
  579. ]
  580. return NodeExecutionDispatcher(
  581. executors=executors,
  582. default_executor=DefaultNodeExecutor(),
  583. )
  584. def build_node_execution_dispatcher_with_clients(
  585. *,
  586. code_runner_client: CodeRunnerClient | None = None,
  587. model_gateway_client: ModelGatewayClient | None = None,
  588. tool_client: ToolServiceClient | None = None,
  589. ) -> NodeExecutionDispatcher:
  590. executors: list[NodeExecutor] = [
  591. LLMNodeExecutor(model_gateway_client=model_gateway_client),
  592. ToolNodeExecutor(tool_client=tool_client),
  593. CodeNodeExecutor(code_runner_client=code_runner_client),
  594. AnswerNodeExecutor(),
  595. ConditionNodeExecutor(),
  596. AssignerNodeExecutor(),
  597. RetrieverNodeExecutor(),
  598. TemplateNodeExecutor(),
  599. ]
  600. return NodeExecutionDispatcher(
  601. executors=executors,
  602. default_executor=DefaultNodeExecutor(),
  603. )
  604. def _read_string_value(payload: dict[str, JSONValue], key: str) -> str | None:
  605. value = payload.get(key)
  606. if isinstance(value, str):
  607. return value
  608. return None
  609. def _read_dict_value(payload: dict[str, JSONValue], key: str) -> dict[str, JSONValue]:
  610. value = payload.get(key)
  611. if isinstance(value, dict):
  612. return {str(item_key): item_value for item_key, item_value in value.items()}
  613. return {}
  614. def _merge_json_dicts(*items: dict[str, JSONValue]) -> dict[str, JSONValue]:
  615. merged: dict[str, JSONValue] = {}
  616. for item in items:
  617. merged.update(item)
  618. return merged
  619. def _render_json_dict(
  620. payload: dict[str, JSONValue],
  621. context: dict[str, JSONValue],
  622. ) -> dict[str, JSONValue]:
  623. rendered = render_json_value(payload, context)
  624. if isinstance(rendered, dict):
  625. return {str(key): value for key, value in rendered.items()}
  626. return {}
  627. def _render_config_json(
  628. payload: dict[str, JSONValue],
  629. context: dict[str, JSONValue],
  630. ) -> dict[str, JSONValue]:
  631. return _render_json_dict(payload, context)
  632. def _resolve_http_url(*, url: str | None, base_url: str | None, path: str | None) -> str | None:
  633. if url is not None:
  634. return url
  635. if base_url is None or path is None:
  636. return None
  637. return f"{base_url.rstrip('/')}/{path.lstrip('/')}"
  638. def _coerce_http_headers(payload: dict[str, JSONValue]) -> dict[str, str]:
  639. headers: dict[str, str] = {}
  640. for key, value in payload.items():
  641. if isinstance(value, (str, int, float, bool)):
  642. headers[key] = str(value)
  643. return headers
  644. def _coerce_http_params(payload: dict[str, JSONValue]) -> dict[str, str]:
  645. params: dict[str, str] = {}
  646. for key, value in payload.items():
  647. if isinstance(value, (str, int, float, bool)):
  648. params[key] = str(value)
  649. return params
  650. def _try_parse_json_response(response: httpx.Response) -> JSONValue | None:
  651. content_type = response.headers.get("content-type", "")
  652. if "json" not in content_type.lower():
  653. return None
  654. try:
  655. payload = response.json()
  656. except ValueError:
  657. return None
  658. if isinstance(payload, (dict, list, str, int, float, bool)) or payload is None:
  659. return payload
  660. return None
  661. def _build_chat_completion_request(
  662. payload: dict[str, JSONValue],
  663. ) -> ChatCompletionRequestContract | None:
  664. messages = _read_message_list(payload, "messages")
  665. if not messages:
  666. system_prompt = _read_string_value(payload, "system_prompt")
  667. prompt = _read_string_value(payload, "prompt")
  668. if system_prompt is not None:
  669. messages.append(ChatMessageContract(role="system", content=system_prompt))
  670. if prompt is not None:
  671. messages.append(ChatMessageContract(role="user", content=prompt))
  672. if not messages:
  673. return None
  674. temperature = _read_float_value(payload, "temperature")
  675. max_tokens = _read_int_value(payload, "max_tokens")
  676. model = _read_string_value(payload, "model")
  677. return ChatCompletionRequestContract(
  678. model=model,
  679. messages=messages,
  680. temperature=temperature,
  681. max_tokens=max_tokens,
  682. )
  683. def _read_message_list(
  684. payload: dict[str, JSONValue],
  685. key: str,
  686. ) -> list[ChatMessageContract]:
  687. value = payload.get(key)
  688. if not isinstance(value, list):
  689. return []
  690. messages: list[ChatMessageContract] = []
  691. for item in value:
  692. if not isinstance(item, dict):
  693. continue
  694. role = item.get("role")
  695. content = item.get("content")
  696. name = item.get("name")
  697. if isinstance(role, str) and isinstance(content, str):
  698. messages.append(
  699. ChatMessageContract(
  700. role=role,
  701. content=content,
  702. name=name if isinstance(name, str) else None,
  703. )
  704. )
  705. return messages
  706. def _read_float_value(payload: dict[str, JSONValue], key: str) -> float | None:
  707. value = payload.get(key)
  708. if isinstance(value, (int, float)) and not isinstance(value, bool):
  709. return float(value)
  710. return None
  711. def _read_int_value(payload: dict[str, JSONValue], key: str) -> int | None:
  712. value = payload.get(key)
  713. if isinstance(value, int) and not isinstance(value, bool):
  714. return value
  715. return None
  716. def _build_executor_template_context(context: NodeExecutionContextContract) -> dict[str, JSONValue]:
  717. return build_template_context(
  718. node_id=context.node_id,
  719. node_type=context.node_type,
  720. run_state_json=context.run_state_json,
  721. node_output_json_by_node_id=context.node_output_json_by_node_id,
  722. node_output_text_by_node_id=context.node_output_text_by_node_id,
  723. )
  def _evaluate_path_condition(
      payload: dict[str, JSONValue],
      path: str,
      render_context: dict[str, JSONValue],
  ) -> bool:
      """Test the value resolved at ``path`` against the operator given in ``payload``.

      Operator keys are checked in a fixed order -- ``equals``, ``not_equals``,
      ``gt``, ``gte``, ``lt``, ``lte``, ``exists`` -- and only the first one
      present is applied. Only the applied operator's operand is passed through
      ``render_json_value``.

      If no operator applies, the result is ``coerce_bool`` of the resolved
      value. That includes the case of an ``exists`` key whose value is not a bool.
      """
      actual = resolve_expression(render_context, path)

      if "equals" in payload:
          return actual == render_json_value(payload["equals"], render_context)
      if "not_equals" in payload:
          return actual != render_json_value(payload["not_equals"], render_context)

      # Ordered (payload key, comparison symbol) pairs; the first key present wins.
      for operand_key, symbol in (("gt", ">"), ("gte", ">="), ("lt", "<"), ("lte", "<=")):
          if operand_key in payload:
              operand = render_json_value(payload[operand_key], render_context)
              return _compare_numeric(actual, operand, symbol)

      if "exists" in payload and isinstance(payload["exists"], bool):
          # Presence test: a resolved value of None counts as "does not exist".
          return (actual is not None) is payload["exists"]

      return coerce_bool(actual)
  747. def _compare_numeric(left: JSONValue, right: JSONValue, operator: str) -> bool:
  748. if not isinstance(left, (int, float)) or not isinstance(right, (int, float)):
  749. return False
  750. if operator == ">":
  751. return left > right
  752. if operator == ">=":
  753. return left >= right
  754. if operator == "<":
  755. return left < right
  756. if operator == "<=":
  757. return left <= right
  758. return False
  @dataclass(frozen=True)
  class RetrieverDocument:
      """An unscored candidate document for the keyword retriever.

      Instances are frozen, but ``metadata`` is still a mutable ``dict``. That
      means the generated ``__hash__`` raises ``TypeError`` if an instance is
      hashed.
      """

      # Identifier for the document; "doc-<n>" when the source supplied none.
      document_id: str
      # Optional human-readable title.
      title: str | None
      # Document body used for keyword matching.
      text: str
      # Free-form metadata carried through to the ranked output.
      metadata: dict[str, JSONValue]
  @dataclass(frozen=True)
  class RankedRetrieverDocument:
      """A retriever document paired with its relevance score.

      Built by ``rank_documents``. ``to_output_json`` converts it to a plain dict.
      """

      document_id: str
      title: str | None
      text: str
      metadata: dict[str, JSONValue]
      # Keyword-overlap score; rank_documents sorts by it, highest first.
      score: float

      def to_output_json(self) -> dict[str, JSONValue]:
          """Return the document as a JSON-compatible dict.

          Keys are emitted in field order. ``metadata`` is returned by
          reference, not copied.
          """
          output: dict[str, JSONValue] = {
              "document_id": self.document_id,
              "title": self.title,
              "text": self.text,
              "metadata": self.metadata,
              "score": self.score,
          }
          return output
  def _resolve_retriever_query(
      payload: dict[str, JSONValue],
      render_context: dict[str, JSONValue],
  ) -> str | None:
      """Return the rendered, whitespace-stripped retriever query, or ``None``.

      ``query`` takes precedence over ``query_template``; whichever is used is
      rendered with ``render_template_string``. Returns ``None`` when
      ``_read_string_value`` yields ``None`` for both keys, or when the
      rendered text is blank.
      """
      query = _read_string_value(payload, "query")
      query_template = _read_string_value(payload, "query_template")
      template = query if query is not None else query_template
      if template is None:
          return None
      rendered = render_template_string(template, render_context).strip()
      # An all-whitespace render is treated the same as having no query.
      return rendered or None
  def _read_retriever_documents(
      payload: dict[str, JSONValue],
      render_context: dict[str, JSONValue],
  ) -> list[RetrieverDocument]:
      """Parse the inline ``documents`` list of ``payload`` into documents.

      Each entry is passed to ``_parse_retriever_document`` together with its
      zero-based position. Entries for which that returns ``None`` are dropped.
      A missing or non-list ``documents`` value yields an empty list.
      """
      entries = payload.get("documents")
      if not isinstance(entries, list):
          return []
      candidates = (
          _parse_retriever_document(entry, index=position, render_context=render_context)
          for position, entry in enumerate(entries)
      )
      return [document for document in candidates if document is not None]
  def _fetch_retriever_documents_from_url(
      *,
      source_url: str,
      timeout_ms: int,
      render_context: dict[str, JSONValue],
  ) -> list[RetrieverDocument]:
      """Fetch retriever documents from ``source_url`` with an HTTP GET.

      The response body must be JSON: either a list of entries, or an object
      whose ``documents`` key holds such a list. Each entry is parsed with
      ``_parse_retriever_document`` using its zero-based position in that list.

      Args:
          source_url: URL to fetch. Surrounding whitespace is ignored, and a
              blank URL returns ``[]`` without any request.
          timeout_ms: Request timeout in milliseconds (httpx takes seconds).
          render_context: Context forwarded to the document parser.

      Raises:
          httpx.HTTPStatusError: From ``raise_for_status`` on a non-2xx status.
          ValueError: When the JSON is not a list or an object with a
              ``documents`` list. Invalid JSON also raises from
              ``response.json()``.
      """
      # Use the stripped URL for the request too. Otherwise a value that passes
      # the blank check could still carry whitespace into httpx and fail there.
      url = source_url.strip()
      if not url:
          return []
      with httpx.Client(timeout=timeout_ms / 1000) as client:
          response = client.get(url)
          response.raise_for_status()
          payload = response.json()

      documents_payload = payload.get("documents") if isinstance(payload, dict) else payload
      if not isinstance(documents_payload, list):
          raise ValueError("retriever source must return a JSON list or object.documents list")

      documents: list[RetrieverDocument] = []
      for index, item in enumerate(documents_payload):
          # Defensive: skip anything that is not a plain JSON value. The index
          # still advances, so fallback ids keep matching list positions.
          if not _is_json_value(item):
              continue
          document = _parse_retriever_document(
              item,
              index=index,
              render_context=render_context,
          )
          if document is not None:
              documents.append(document)
      return documents
  def _parse_retriever_document(
      value: JSONValue,
      *,
      index: int,
      render_context: dict[str, JSONValue],
  ) -> RetrieverDocument | None:
      """Convert one raw ``documents`` entry into a ``RetrieverDocument``.

      Accepted shapes:

      * a string: rendered with ``render_template_string`` and used as the text;
      * an object: passed through ``_render_json_dict``. The text comes from
        ``text`` (or from ``content`` when ``text`` is falsy) and the id from
        ``id`` (or ``document_id``). ``title`` and ``metadata`` are optional.

      Args:
          value: The raw entry.
          index: Zero-based position of the entry. It is used for the fallback
              id ``doc-<index + 1>``.
          render_context: Context passed to the template renderers.

      Returns:
          The parsed document. Returns ``None`` when the entry is neither a
          string nor an object, or when it has no non-blank string text.
      """
      fallback_id = f"doc-{index + 1}"

      if isinstance(value, str):
          text = render_template_string(value, render_context).strip()
          if not text:
              return None
          return RetrieverDocument(
              document_id=fallback_id,
              title=None,
              text=text,
              metadata={},
          )

      if not isinstance(value, dict):
          return None

      rendered = _render_json_dict({str(key): item for key, item in value.items()}, render_context)
      text_value = rendered.get("text") or rendered.get("content")
      if not isinstance(text_value, str) or not text_value.strip():
          return None

      # Only None and "" count as a missing id. A truthiness test (`a or b`)
      # would silently replace legitimate falsy ids such as 0 with doc-<n>.
      document_id = fallback_id
      for id_key in ("id", "document_id"):
          candidate = rendered.get(id_key)
          if candidate is not None and candidate != "":
              document_id = str(candidate)
              break

      title_value = rendered.get("title")
      metadata_value = rendered.get("metadata")
      return RetrieverDocument(
          document_id=document_id,
          title=title_value if isinstance(title_value, str) else None,
          text=text_value.strip(),
          metadata=metadata_value if isinstance(metadata_value, dict) else {},
      )
  def rank_documents(
      *,
      query: str,
      documents: list[RetrieverDocument],
      top_k: int,
  ) -> list[RankedRetrieverDocument]:
      """Score ``documents`` against ``query`` and return the best ``top_k``.

      Each document is scored with ``calculate_keyword_score`` over the tokens
      of its title and text combined. Results are ordered by descending score.
      The sort is stable, so tied documents keep their input order. A ``top_k``
      below 1 is treated as 1.
      """
      limit = max(top_k, 1)
      query_terms = tokenize_text(query)

      def to_ranked(document: RetrieverDocument) -> RankedRetrieverDocument:
          # Title and text are searched together; a None or empty title is skipped.
          searchable = " ".join(part for part in (document.title, document.text) if part)
          return RankedRetrieverDocument(
              document_id=document.document_id,
              title=document.title,
              text=document.text,
              metadata=document.metadata,
              score=calculate_keyword_score(
                  query_tokens=query_terms,
                  document_tokens=tokenize_text(searchable),
              ),
          )

      ranked = [to_ranked(document) for document in documents]
      return sorted(ranked, key=lambda entry: entry.score, reverse=True)[:limit]
  def calculate_keyword_score(
      *,
      query_tokens: set[str],
      document_tokens: set[str],
  ) -> float:
      """Return the fraction of query tokens present in the document, rounded to 4 places.

      The result lies in ``[0.0, 1.0]``. It is ``0.0`` when either token set is
      empty or when the sets share no token.
      """
      if not (query_tokens and document_tokens):
          return 0.0
      matched = len(query_tokens.intersection(document_tokens))
      return round(matched / len(query_tokens), 4) if matched else 0.0
  # Either a single CJK Unified Ideograph (U+4E00-U+9FFF), or a run of word
  # characters that excludes that range. `[^\W...]` means "\w minus the range".
  _TOKENIZE_TEXT_PATTERN = re.compile(r"[\u4e00-\u9fff]|[^\W\u4e00-\u9fff]+")


  def tokenize_text(value: str) -> set[str]:
      """Split ``value`` into a set of lower-cased keyword tokens.

      A run of word characters (letters, digits, underscore) forms one token.
      The exception is CJK Unified Ideographs (U+4E00-U+9FFF), which become
      one token per character. That text is normally written without spaces,
      and since ``\\w`` already matches ideographs, a plain ``\\w+`` would turn
      a whole Chinese sentence into a single token that almost never overlaps
      a query.

      >>> sorted(tokenize_text("Hello 世界"))
      ['hello', '世', '界']
      """
      # findall never yields empty strings for this pattern, so no filter is needed.
      return {token.lower() for token in _TOKENIZE_TEXT_PATTERN.findall(value)}
  911. def _is_json_value(value: object) -> bool:
  912. if value is None or isinstance(value, (str, int, float, bool)):
  913. return True
  914. if isinstance(value, list):
  915. return all(_is_json_value(item) for item in value)
  916. if isinstance(value, dict):
  917. return all(isinstance(key, str) and _is_json_value(item) for key, item in value.items())
  918. return False