executors.py 41 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150
  1. from abc import ABC, abstractmethod
  2. from dataclasses import dataclass
  3. import re
  4. import httpx
  5. from core_domain import (
  6. ChatCompletionRequestContract,
  7. ChatMessageContract,
  8. CodeExecutionRequestContract,
  9. KnowledgeSearchRequestContract,
  10. NodeExecutionContextContract,
  11. NodeExecutionRequestContract,
  12. NodeExecutionResultContract,
  13. ToolBindingDetailContract,
  14. )
  15. from core_shared import JSONValue
  16. from .code_runner_client import CodeRunnerClient, CodeRunnerClientError
  17. from .context import (
  18. build_template_context,
  19. coerce_bool,
  20. evaluate_condition_expression,
  21. render_json_value,
  22. render_template_string,
  23. resolve_expression,
  24. )
  25. from .knowledge_client import KnowledgeServiceClient, KnowledgeServiceClientError
  26. from .model_gateway_client import ModelGatewayClient, ModelGatewayClientError
  27. from .tool_client import ToolServiceClient, ToolServiceClientError
  28. class NodeExecutor(ABC):
  29. executor_name: str
  30. supported_node_types: frozenset[str]
  31. @abstractmethod
  32. def execute(
  33. self,
  34. context: NodeExecutionContextContract,
  35. request: NodeExecutionRequestContract,
  36. ) -> NodeExecutionResultContract:
  37. raise NotImplementedError
  38. class CompletedNodeExecutor(NodeExecutor):
  39. def __init__(self, *, executor_name: str, supported_node_types: frozenset[str]) -> None:
  40. self.executor_name = executor_name
  41. self.supported_node_types = supported_node_types
  42. def execute(
  43. self,
  44. context: NodeExecutionContextContract,
  45. request: NodeExecutionRequestContract,
  46. ) -> NodeExecutionResultContract:
  47. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  48. return NodeExecutionResultContract(
  49. status="completed",
  50. worker_key=worker_key,
  51. output_json={
  52. "executor_name": self.executor_name,
  53. "node_type": context.node_type,
  54. },
  55. )
  56. class DefaultNodeExecutor(CompletedNodeExecutor):
  57. def __init__(self) -> None:
  58. super().__init__(
  59. executor_name="default-executor",
  60. supported_node_types=frozenset(),
  61. )
  62. class LLMNodeExecutor(CompletedNodeExecutor):
  63. def __init__(self, model_gateway_client: ModelGatewayClient | None = None) -> None:
  64. super().__init__(
  65. executor_name="llm-executor",
  66. supported_node_types=frozenset({"llm"}),
  67. )
  68. self.model_gateway_client = model_gateway_client
  69. def execute(
  70. self,
  71. context: NodeExecutionContextContract,
  72. request: NodeExecutionRequestContract,
  73. ) -> NodeExecutionResultContract:
  74. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  75. render_context = _build_executor_template_context(context)
  76. rendered_config_json = _render_config_json(context.node_config_json, render_context)
  77. chat_request = _build_chat_completion_request(rendered_config_json)
  78. if chat_request is None:
  79. return NodeExecutionResultContract(
  80. status="failed",
  81. worker_key=worker_key,
  82. error_code="llm_config_missing",
  83. error_message="llm node config requires prompt or messages",
  84. )
  85. if self.model_gateway_client is None:
  86. return NodeExecutionResultContract(
  87. status="failed",
  88. worker_key=worker_key,
  89. error_code="llm_gateway_missing",
  90. error_message="model gateway client is not configured",
  91. )
  92. try:
  93. response = self.model_gateway_client.create_chat_completion(chat_request)
  94. except ModelGatewayClientError as exc:
  95. return NodeExecutionResultContract(
  96. status="failed",
  97. worker_key=worker_key,
  98. error_code="llm_request_failed",
  99. error_message=str(exc),
  100. )
  101. return NodeExecutionResultContract(
  102. status="completed",
  103. worker_key=worker_key,
  104. output_text=response.content,
  105. output_json={
  106. "executor_name": self.executor_name,
  107. "model": response.model,
  108. "finish_reason": response.finish_reason,
  109. "usage_json": response.usage_json,
  110. "raw_response_json": response.raw_response_json,
  111. },
  112. )
  113. class ToolNodeExecutor(CompletedNodeExecutor):
  114. def __init__(self, tool_client: ToolServiceClient | None = None) -> None:
  115. super().__init__(
  116. executor_name="tool-executor",
  117. supported_node_types=frozenset({"tool"}),
  118. )
  119. self.tool_client = tool_client
  120. def execute(
  121. self,
  122. context: NodeExecutionContextContract,
  123. request: NodeExecutionRequestContract,
  124. ) -> NodeExecutionResultContract:
  125. tool_binding_id = _read_string_value(context.node_config_json, "tool_binding_id")
  126. tool_code = _read_string_value(context.node_config_json, "tool_code")
  127. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  128. if tool_binding_id is None and tool_code is None:
  129. return NodeExecutionResultContract(
  130. status="failed",
  131. worker_key=worker_key,
  132. error_code="tool_config_missing",
  133. error_message="tool node config requires tool_binding_id or tool_code",
  134. )
  135. if tool_binding_id is not None and self.tool_client is not None:
  136. try:
  137. detail = self.tool_client.get_tool_binding_detail(
  138. tenant_id=context.tenant_id,
  139. binding_id=tool_binding_id,
  140. )
  141. except ToolServiceClientError as exc:
  142. return NodeExecutionResultContract(
  143. status="failed",
  144. worker_key=worker_key,
  145. error_code="tool_binding_lookup_failed",
  146. error_message=str(exc),
  147. )
  148. if not detail.binding.enabled:
  149. return NodeExecutionResultContract(
  150. status="failed",
  151. worker_key=worker_key,
  152. error_code="tool_binding_disabled",
  153. error_message=f"tool binding is disabled: {tool_binding_id}",
  154. )
  155. resolved_tool_code = detail.tool_definition.code
  156. resolved_tool_version_id = detail.tool_version.id
  157. resolved_tool_name = detail.tool_definition.name
  158. invoke_result = self._invoke_http_tool(
  159. context=context,
  160. detail=detail,
  161. worker_key=worker_key,
  162. )
  163. if invoke_result is not None:
  164. return invoke_result
  165. else:
  166. resolved_tool_code = tool_code
  167. resolved_tool_version_id = None
  168. resolved_tool_name = None
  169. return NodeExecutionResultContract(
  170. status="completed",
  171. worker_key=worker_key,
  172. output_text=f"tool node completed: {resolved_tool_code or 'unknown-tool'}",
  173. output_json={
  174. "executor_name": self.executor_name,
  175. "tool_binding_id": tool_binding_id,
  176. "tool_code": resolved_tool_code,
  177. "tool_version_id": resolved_tool_version_id,
  178. "tool_name": resolved_tool_name,
  179. },
  180. )
  181. def _invoke_http_tool(
  182. self,
  183. *,
  184. context: NodeExecutionContextContract,
  185. detail: ToolBindingDetailContract,
  186. worker_key: str,
  187. ) -> NodeExecutionResultContract | None:
  188. if detail.tool_definition.tool_type != "http":
  189. return None
  190. invoke_config_json = detail.tool_version.invoke_config_json or {}
  191. binding_config_json = detail.binding.config_json or {}
  192. render_context = _build_executor_template_context(context)
  193. request_headers = _merge_json_dicts(
  194. _render_json_dict(_read_dict_value(invoke_config_json, "headers"), render_context),
  195. _render_json_dict(_read_dict_value(binding_config_json, "headers"), render_context),
  196. _render_json_dict(
  197. _read_dict_value(context.node_config_json, "headers"),
  198. render_context,
  199. ),
  200. )
  201. request_query = _merge_json_dicts(
  202. _render_json_dict(_read_dict_value(invoke_config_json, "query"), render_context),
  203. _render_json_dict(_read_dict_value(context.node_config_json, "query"), render_context),
  204. )
  205. request_body = _merge_json_dicts(
  206. _render_json_dict(_read_dict_value(invoke_config_json, "body"), render_context),
  207. _render_json_dict(_read_dict_value(context.node_config_json, "body"), render_context),
  208. )
  209. method = (_read_string_value(invoke_config_json, "method") or "GET").upper()
  210. base_url = (
  211. _read_string_value(context.node_config_json, "base_url")
  212. or _read_string_value(binding_config_json, "base_url")
  213. or _read_string_value(invoke_config_json, "base_url")
  214. )
  215. path = _read_string_value(context.node_config_json, "path") or _read_string_value(
  216. invoke_config_json, "path"
  217. )
  218. url = _read_string_value(context.node_config_json, "url") or _read_string_value(
  219. invoke_config_json, "url"
  220. )
  221. resolved_url = _resolve_http_url(url=url, base_url=base_url, path=path)
  222. if resolved_url is None:
  223. return NodeExecutionResultContract(
  224. status="failed",
  225. worker_key=worker_key,
  226. error_code="tool_http_url_missing",
  227. error_message="http tool requires url or base_url with path",
  228. )
  229. timeout_ms = detail.tool_version.timeout_ms or 10000
  230. try:
  231. with httpx.Client(timeout=timeout_ms / 1000) as client:
  232. response = client.request(
  233. method=method,
  234. url=resolved_url,
  235. params=_coerce_http_params(request_query),
  236. headers=_coerce_http_headers(request_headers),
  237. json=request_body if request_body else None,
  238. )
  239. response.raise_for_status()
  240. except httpx.HTTPError as exc:
  241. return NodeExecutionResultContract(
  242. status="failed",
  243. worker_key=worker_key,
  244. error_code="tool_http_request_failed",
  245. error_message=str(exc),
  246. output_json={
  247. "executor_name": self.executor_name,
  248. "tool_binding_id": detail.binding.id,
  249. "tool_code": detail.tool_definition.code,
  250. "request_url": resolved_url,
  251. "request_method": method,
  252. },
  253. )
  254. response_json = _try_parse_json_response(response)
  255. response_text = None if response_json is not None else response.text
  256. return NodeExecutionResultContract(
  257. status="completed",
  258. worker_key=worker_key,
  259. output_text=response_text,
  260. output_json={
  261. "executor_name": self.executor_name,
  262. "tool_binding_id": detail.binding.id,
  263. "tool_code": detail.tool_definition.code,
  264. "tool_version_id": detail.tool_version.id,
  265. "tool_name": detail.tool_definition.name,
  266. "request_url": resolved_url,
  267. "request_method": method,
  268. "response_status_code": response.status_code,
  269. "response_headers": dict(response.headers),
  270. "response_json": response_json,
  271. },
  272. )
  273. class CodeNodeExecutor(CompletedNodeExecutor):
  274. def __init__(self, code_runner_client: CodeRunnerClient | None = None) -> None:
  275. super().__init__(
  276. executor_name="code-executor",
  277. supported_node_types=frozenset({"code"}),
  278. )
  279. self.code_runner_client = code_runner_client
  280. def execute(
  281. self,
  282. context: NodeExecutionContextContract,
  283. request: NodeExecutionRequestContract,
  284. ) -> NodeExecutionResultContract:
  285. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  286. code = _read_string_value(context.node_config_json, "code")
  287. if code is None:
  288. return NodeExecutionResultContract(
  289. status="failed",
  290. worker_key=worker_key,
  291. error_code="code_config_missing",
  292. error_message="code node config requires code",
  293. )
  294. if self.code_runner_client is None:
  295. return NodeExecutionResultContract(
  296. status="failed",
  297. worker_key=worker_key,
  298. error_code="code_runner_missing",
  299. error_message="code runner client is not configured",
  300. )
  301. render_context = _build_executor_template_context(context)
  302. input_json = _render_json_dict(
  303. _read_dict_value(context.node_config_json, "input_json"),
  304. render_context,
  305. )
  306. language = _read_string_value(context.node_config_json, "language") or "python"
  307. timeout_seconds = _read_int_value(context.node_config_json, "timeout_seconds") or 10
  308. code_request = CodeExecutionRequestContract(
  309. language=language,
  310. code=code,
  311. input_json=input_json,
  312. timeout_seconds=timeout_seconds,
  313. )
  314. try:
  315. response = self.code_runner_client.execute_code(code_request)
  316. except CodeRunnerClientError as exc:
  317. return NodeExecutionResultContract(
  318. status="failed",
  319. worker_key=worker_key,
  320. error_code="code_request_failed",
  321. error_message=str(exc),
  322. )
  323. if not response.success:
  324. return NodeExecutionResultContract(
  325. status="failed",
  326. worker_key=worker_key,
  327. error_code="code_execution_failed",
  328. error_message=response.error_message or response.stderr,
  329. output_text=response.stdout,
  330. output_json={
  331. "executor_name": self.executor_name,
  332. "stderr": response.stderr,
  333. "output_json": response.output_json,
  334. },
  335. )
  336. return NodeExecutionResultContract(
  337. status="completed",
  338. worker_key=worker_key,
  339. output_text=response.stdout,
  340. output_json={
  341. "executor_name": self.executor_name,
  342. "stderr": response.stderr,
  343. "result_json": response.output_json,
  344. },
  345. )
  346. class AnswerNodeExecutor(CompletedNodeExecutor):
  347. def execute(
  348. self,
  349. context: NodeExecutionContextContract,
  350. request: NodeExecutionRequestContract,
  351. ) -> NodeExecutionResultContract:
  352. answer_text = _read_string_value(context.node_config_json, "text")
  353. template = _read_string_value(context.node_config_json, "template")
  354. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  355. if answer_text is None and template is None:
  356. return NodeExecutionResultContract(
  357. status="failed",
  358. worker_key=worker_key,
  359. error_code="answer_config_missing",
  360. error_message="answer node config requires text or template",
  361. )
  362. render_context = _build_executor_template_context(context)
  363. rendered_text = render_template_string(answer_text or template or "", render_context)
  364. return NodeExecutionResultContract(
  365. status="completed",
  366. worker_key=worker_key,
  367. output_text=rendered_text,
  368. output_json={
  369. "executor_name": self.executor_name,
  370. "render_mode": "text" if answer_text is not None else "template",
  371. },
  372. )
  373. def __init__(self) -> None:
  374. super().__init__(
  375. executor_name="answer-executor",
  376. supported_node_types=frozenset({"answer"}),
  377. )
  378. class ConditionNodeExecutor(CompletedNodeExecutor):
  379. def execute(
  380. self,
  381. context: NodeExecutionContextContract,
  382. request: NodeExecutionRequestContract,
  383. ) -> NodeExecutionResultContract:
  384. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  385. render_context = _build_executor_template_context(context)
  386. expression = _read_string_value(context.node_config_json, "expression")
  387. path = _read_string_value(context.node_config_json, "path")
  388. if expression is not None:
  389. condition_result = evaluate_condition_expression(expression, render_context)
  390. evaluated_expression = expression
  391. elif path is not None:
  392. condition_result = _evaluate_path_condition(
  393. context.node_config_json,
  394. path,
  395. render_context,
  396. )
  397. evaluated_expression = path
  398. else:
  399. return NodeExecutionResultContract(
  400. status="failed",
  401. worker_key=worker_key,
  402. error_code="condition_config_missing",
  403. error_message="condition node config requires expression or path",
  404. )
  405. route = "true" if condition_result else "false"
  406. return NodeExecutionResultContract(
  407. status="completed",
  408. worker_key=worker_key,
  409. output_json={
  410. "executor_name": self.executor_name,
  411. "condition_result": condition_result,
  412. "route": route,
  413. "evaluated_expression": evaluated_expression,
  414. },
  415. )
  416. def __init__(self) -> None:
  417. super().__init__(
  418. executor_name="condition-executor",
  419. supported_node_types=frozenset({"if-else", "condition"}),
  420. )
  421. class AssignerNodeExecutor(CompletedNodeExecutor):
  422. def execute(
  423. self,
  424. context: NodeExecutionContextContract,
  425. request: NodeExecutionRequestContract,
  426. ) -> NodeExecutionResultContract:
  427. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  428. assignments = _read_dict_value(context.node_config_json, "assignments")
  429. if not assignments:
  430. return NodeExecutionResultContract(
  431. status="failed",
  432. worker_key=worker_key,
  433. error_code="assignments_missing",
  434. error_message="assigner node config requires assignments",
  435. )
  436. render_context = _build_executor_template_context(context)
  437. rendered_assignments = _render_json_dict(assignments, render_context)
  438. return NodeExecutionResultContract(
  439. status="completed",
  440. worker_key=worker_key,
  441. output_json={
  442. "executor_name": self.executor_name,
  443. "assigned_values": rendered_assignments,
  444. "state_updates": rendered_assignments,
  445. },
  446. )
  447. def __init__(self) -> None:
  448. super().__init__(
  449. executor_name="assigner-executor",
  450. supported_node_types=frozenset({"assigner"}),
  451. )
  452. class RetrieverNodeExecutor(CompletedNodeExecutor):
  453. def __init__(self, knowledge_client: KnowledgeServiceClient | None = None) -> None:
  454. super().__init__(
  455. executor_name="retriever-executor",
  456. supported_node_types=frozenset({"knowledge-retrieval", "retriever"}),
  457. )
  458. self.knowledge_client = knowledge_client
  459. def execute(
  460. self,
  461. context: NodeExecutionContextContract,
  462. request: NodeExecutionRequestContract,
  463. ) -> NodeExecutionResultContract:
  464. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  465. render_context = _build_executor_template_context(context)
  466. query = _resolve_retriever_query(context.node_config_json, render_context)
  467. documents = _read_retriever_documents(context.node_config_json, render_context)
  468. source_url = _read_string_value(context.node_config_json, "source_url")
  469. knowledge_base_id = _read_string_value(context.node_config_json, "knowledge_base_id")
  470. top_k = _read_int_value(context.node_config_json, "top_k") or 3
  471. if query is None:
  472. return NodeExecutionResultContract(
  473. status="failed",
  474. worker_key=worker_key,
  475. error_code="retriever_query_missing",
  476. error_message="retriever node config requires query or query_template",
  477. )
  478. if source_url is not None:
  479. try:
  480. documents.extend(
  481. _fetch_retriever_documents_from_url(
  482. source_url=render_template_string(source_url, render_context),
  483. timeout_ms=_read_int_value(context.node_config_json, "timeout_ms") or 10000,
  484. render_context=render_context,
  485. )
  486. )
  487. except httpx.HTTPError as exc:
  488. return NodeExecutionResultContract(
  489. status="failed",
  490. worker_key=worker_key,
  491. error_code="retriever_source_request_failed",
  492. error_message=str(exc),
  493. )
  494. except ValueError as exc:
  495. return NodeExecutionResultContract(
  496. status="failed",
  497. worker_key=worker_key,
  498. error_code="retriever_source_invalid",
  499. error_message=str(exc),
  500. )
  501. knowledge_results: list[dict[str, JSONValue]] = []
  502. if knowledge_base_id is not None:
  503. if self.knowledge_client is None:
  504. return NodeExecutionResultContract(
  505. status="failed",
  506. worker_key=worker_key,
  507. error_code="knowledge_client_missing",
  508. error_message="knowledge-service client is not configured",
  509. )
  510. try:
  511. knowledge_results = [
  512. item.model_dump(mode="json")
  513. for item in self.knowledge_client.search(
  514. KnowledgeSearchRequestContract(
  515. tenant_id=context.tenant_id,
  516. knowledge_base_id=render_template_string(
  517. knowledge_base_id,
  518. render_context,
  519. ),
  520. query=query,
  521. top_k=top_k,
  522. filters_json=_render_json_dict(
  523. _read_dict_value(context.node_config_json, "filters_json"),
  524. render_context,
  525. ),
  526. )
  527. )
  528. ]
  529. except KnowledgeServiceClientError as exc:
  530. return NodeExecutionResultContract(
  531. status="failed",
  532. worker_key=worker_key,
  533. error_code="knowledge_search_failed",
  534. error_message=str(exc),
  535. )
  536. documents.extend(_knowledge_results_to_retriever_documents(knowledge_results))
  537. if not documents:
  538. return NodeExecutionResultContract(
  539. status="failed",
  540. worker_key=worker_key,
  541. error_code="retriever_documents_missing",
  542. error_message=(
  543. "retriever node config requires documents, source_url, "
  544. "or knowledge_base_id"
  545. ),
  546. )
  547. ranked_documents = rank_documents(query=query, documents=documents, top_k=top_k)
  548. output_documents = [item.to_output_json() for item in ranked_documents]
  549. output_text = "\n\n".join(item.text for item in ranked_documents)
  550. return NodeExecutionResultContract(
  551. status="completed",
  552. worker_key=worker_key,
  553. output_text=output_text,
  554. output_json={
  555. "executor_name": self.executor_name,
  556. "query": query,
  557. "top_k": top_k,
  558. "retrieved_documents": output_documents,
  559. "knowledge_base_id": knowledge_base_id,
  560. "knowledge_results": knowledge_results,
  561. },
  562. )
  563. class TemplateNodeExecutor(CompletedNodeExecutor):
  564. def execute(
  565. self,
  566. context: NodeExecutionContextContract,
  567. request: NodeExecutionRequestContract,
  568. ) -> NodeExecutionResultContract:
  569. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  570. render_context = _build_executor_template_context(context)
  571. template = _read_string_value(context.node_config_json, "template")
  572. template_json = _read_dict_value(context.node_config_json, "template_json")
  573. if template is None and not template_json:
  574. return NodeExecutionResultContract(
  575. status="failed",
  576. worker_key=worker_key,
  577. error_code="template_config_missing",
  578. error_message="template node config requires template or template_json",
  579. )
  580. rendered_text = None
  581. rendered_json = None
  582. if template is not None:
  583. rendered_text = render_template_string(template, render_context)
  584. if template_json:
  585. rendered_json = _render_json_dict(template_json, render_context)
  586. output_json: dict[str, JSONValue] = {"executor_name": self.executor_name}
  587. if rendered_json is not None:
  588. output_json["rendered_json"] = rendered_json
  589. return NodeExecutionResultContract(
  590. status="completed",
  591. worker_key=worker_key,
  592. output_text=rendered_text,
  593. output_json=output_json,
  594. )
  595. def __init__(self) -> None:
  596. super().__init__(
  597. executor_name="template-executor",
  598. supported_node_types=frozenset({"template-transform", "template"}),
  599. )
  600. class NodeExecutionDispatcher:
  601. def __init__(
  602. self,
  603. executors: list[NodeExecutor],
  604. default_executor: NodeExecutor,
  605. ) -> None:
  606. self.executors = executors
  607. self.default_executor = default_executor
  608. def resolve_executor(self, node_type: str) -> NodeExecutor:
  609. for executor in self.executors:
  610. if node_type in executor.supported_node_types:
  611. return executor
  612. return self.default_executor
  613. def execute(
  614. self,
  615. context: NodeExecutionContextContract,
  616. request: NodeExecutionRequestContract,
  617. ) -> tuple[NodeExecutionResultContract, str]:
  618. executor = self.resolve_executor(context.node_type)
  619. result = executor.execute(context, request)
  620. return result, executor.executor_name
  621. def build_node_execution_dispatcher() -> NodeExecutionDispatcher:
  622. executors: list[NodeExecutor] = [
  623. LLMNodeExecutor(),
  624. ToolNodeExecutor(),
  625. CodeNodeExecutor(),
  626. AnswerNodeExecutor(),
  627. ConditionNodeExecutor(),
  628. AssignerNodeExecutor(),
  629. RetrieverNodeExecutor(),
  630. TemplateNodeExecutor(),
  631. ]
  632. return NodeExecutionDispatcher(
  633. executors=executors,
  634. default_executor=DefaultNodeExecutor(),
  635. )
  636. def build_node_execution_dispatcher_with_clients(
  637. *,
  638. code_runner_client: CodeRunnerClient | None = None,
  639. model_gateway_client: ModelGatewayClient | None = None,
  640. tool_client: ToolServiceClient | None = None,
  641. knowledge_client: KnowledgeServiceClient | None = None,
  642. ) -> NodeExecutionDispatcher:
  643. executors: list[NodeExecutor] = [
  644. LLMNodeExecutor(model_gateway_client=model_gateway_client),
  645. ToolNodeExecutor(tool_client=tool_client),
  646. CodeNodeExecutor(code_runner_client=code_runner_client),
  647. AnswerNodeExecutor(),
  648. ConditionNodeExecutor(),
  649. AssignerNodeExecutor(),
  650. RetrieverNodeExecutor(knowledge_client=knowledge_client),
  651. TemplateNodeExecutor(),
  652. ]
  653. return NodeExecutionDispatcher(
  654. executors=executors,
  655. default_executor=DefaultNodeExecutor(),
  656. )
  657. def _read_string_value(payload: dict[str, JSONValue], key: str) -> str | None:
  658. value = payload.get(key)
  659. if isinstance(value, str):
  660. return value
  661. return None
  662. def _read_dict_value(payload: dict[str, JSONValue], key: str) -> dict[str, JSONValue]:
  663. value = payload.get(key)
  664. if isinstance(value, dict):
  665. return {str(item_key): item_value for item_key, item_value in value.items()}
  666. return {}
  667. def _merge_json_dicts(*items: dict[str, JSONValue]) -> dict[str, JSONValue]:
  668. merged: dict[str, JSONValue] = {}
  669. for item in items:
  670. merged.update(item)
  671. return merged
  672. def _render_json_dict(
  673. payload: dict[str, JSONValue],
  674. context: dict[str, JSONValue],
  675. ) -> dict[str, JSONValue]:
  676. rendered = render_json_value(payload, context)
  677. if isinstance(rendered, dict):
  678. return {str(key): value for key, value in rendered.items()}
  679. return {}
  680. def _render_config_json(
  681. payload: dict[str, JSONValue],
  682. context: dict[str, JSONValue],
  683. ) -> dict[str, JSONValue]:
  684. return _render_json_dict(payload, context)
  685. def _resolve_http_url(*, url: str | None, base_url: str | None, path: str | None) -> str | None:
  686. if url is not None:
  687. return url
  688. if base_url is None or path is None:
  689. return None
  690. return f"{base_url.rstrip('/')}/{path.lstrip('/')}"
  691. def _coerce_http_headers(payload: dict[str, JSONValue]) -> dict[str, str]:
  692. headers: dict[str, str] = {}
  693. for key, value in payload.items():
  694. if isinstance(value, (str, int, float, bool)):
  695. headers[key] = str(value)
  696. return headers
  697. def _coerce_http_params(payload: dict[str, JSONValue]) -> dict[str, str]:
  698. params: dict[str, str] = {}
  699. for key, value in payload.items():
  700. if isinstance(value, (str, int, float, bool)):
  701. params[key] = str(value)
  702. return params
  703. def _try_parse_json_response(response: httpx.Response) -> JSONValue | None:
  704. content_type = response.headers.get("content-type", "")
  705. if "json" not in content_type.lower():
  706. return None
  707. try:
  708. payload = response.json()
  709. except ValueError:
  710. return None
  711. if isinstance(payload, (dict, list, str, int, float, bool)) or payload is None:
  712. return payload
  713. return None
  714. def _build_chat_completion_request(
  715. payload: dict[str, JSONValue],
  716. ) -> ChatCompletionRequestContract | None:
  717. messages = _read_message_list(payload, "messages")
  718. if not messages:
  719. system_prompt = _read_string_value(payload, "system_prompt")
  720. prompt = _read_string_value(payload, "prompt")
  721. if system_prompt is not None:
  722. messages.append(ChatMessageContract(role="system", content=system_prompt))
  723. if prompt is not None:
  724. messages.append(ChatMessageContract(role="user", content=prompt))
  725. if not messages:
  726. return None
  727. temperature = _read_float_value(payload, "temperature")
  728. max_tokens = _read_int_value(payload, "max_tokens")
  729. model = _read_string_value(payload, "model")
  730. return ChatCompletionRequestContract(
  731. model=model,
  732. messages=messages,
  733. temperature=temperature,
  734. max_tokens=max_tokens,
  735. )
  736. def _read_message_list(
  737. payload: dict[str, JSONValue],
  738. key: str,
  739. ) -> list[ChatMessageContract]:
  740. value = payload.get(key)
  741. if not isinstance(value, list):
  742. return []
  743. messages: list[ChatMessageContract] = []
  744. for item in value:
  745. if not isinstance(item, dict):
  746. continue
  747. role = item.get("role")
  748. content = item.get("content")
  749. name = item.get("name")
  750. if isinstance(role, str) and isinstance(content, str):
  751. messages.append(
  752. ChatMessageContract(
  753. role=role,
  754. content=content,
  755. name=name if isinstance(name, str) else None,
  756. )
  757. )
  758. return messages
  759. def _read_float_value(payload: dict[str, JSONValue], key: str) -> float | None:
  760. value = payload.get(key)
  761. if isinstance(value, (int, float)) and not isinstance(value, bool):
  762. return float(value)
  763. return None
  764. def _read_int_value(payload: dict[str, JSONValue], key: str) -> int | None:
  765. value = payload.get(key)
  766. if isinstance(value, int) and not isinstance(value, bool):
  767. return value
  768. return None
  769. def _build_executor_template_context(context: NodeExecutionContextContract) -> dict[str, JSONValue]:
  770. return build_template_context(
  771. node_id=context.node_id,
  772. node_type=context.node_type,
  773. run_state_json=context.run_state_json,
  774. node_output_json_by_node_id=context.node_output_json_by_node_id,
  775. node_output_text_by_node_id=context.node_output_text_by_node_id,
  776. )
  777. def _evaluate_path_condition(
  778. payload: dict[str, JSONValue],
  779. path: str,
  780. render_context: dict[str, JSONValue],
  781. ) -> bool:
  782. value = resolve_expression(render_context, path)
  783. if "equals" in payload:
  784. return value == render_json_value(payload["equals"], render_context)
  785. if "not_equals" in payload:
  786. return value != render_json_value(payload["not_equals"], render_context)
  787. if "gt" in payload:
  788. return _compare_numeric(value, render_json_value(payload["gt"], render_context), ">")
  789. if "gte" in payload:
  790. return _compare_numeric(value, render_json_value(payload["gte"], render_context), ">=")
  791. if "lt" in payload:
  792. return _compare_numeric(value, render_json_value(payload["lt"], render_context), "<")
  793. if "lte" in payload:
  794. return _compare_numeric(value, render_json_value(payload["lte"], render_context), "<=")
  795. if "exists" in payload:
  796. expected = payload["exists"]
  797. if isinstance(expected, bool):
  798. return (value is not None) is expected
  799. return coerce_bool(value)
  800. def _compare_numeric(left: JSONValue, right: JSONValue, operator: str) -> bool:
  801. if not isinstance(left, (int, float)) or not isinstance(right, (int, float)):
  802. return False
  803. if operator == ">":
  804. return left > right
  805. if operator == ">=":
  806. return left >= right
  807. if operator == "<":
  808. return left < right
  809. if operator == "<=":
  810. return left <= right
  811. return False
  812. @dataclass(frozen=True)
  813. class RetrieverDocument:
  814. document_id: str
  815. title: str | None
  816. text: str
  817. metadata: dict[str, JSONValue]
  818. @dataclass(frozen=True)
  819. class RankedRetrieverDocument:
  820. document_id: str
  821. title: str | None
  822. text: str
  823. metadata: dict[str, JSONValue]
  824. score: float
  825. def to_output_json(self) -> dict[str, JSONValue]:
  826. return {
  827. "document_id": self.document_id,
  828. "title": self.title,
  829. "text": self.text,
  830. "metadata": self.metadata,
  831. "score": self.score,
  832. }
  833. def _resolve_retriever_query(
  834. payload: dict[str, JSONValue],
  835. render_context: dict[str, JSONValue],
  836. ) -> str | None:
  837. query = _read_string_value(payload, "query")
  838. query_template = _read_string_value(payload, "query_template")
  839. if query is not None:
  840. rendered_query = render_template_string(query, render_context)
  841. elif query_template is not None:
  842. rendered_query = render_template_string(query_template, render_context)
  843. else:
  844. return None
  845. stripped_query = rendered_query.strip()
  846. if not stripped_query:
  847. return None
  848. return stripped_query
  849. def _read_retriever_documents(
  850. payload: dict[str, JSONValue],
  851. render_context: dict[str, JSONValue],
  852. ) -> list[RetrieverDocument]:
  853. value = payload.get("documents")
  854. if not isinstance(value, list):
  855. return []
  856. documents: list[RetrieverDocument] = []
  857. for index, item in enumerate(value):
  858. document = _parse_retriever_document(
  859. item,
  860. index=index,
  861. render_context=render_context,
  862. )
  863. if document is not None:
  864. documents.append(document)
  865. return documents
  866. def _fetch_retriever_documents_from_url(
  867. *,
  868. source_url: str,
  869. timeout_ms: int,
  870. render_context: dict[str, JSONValue],
  871. ) -> list[RetrieverDocument]:
  872. if not source_url.strip():
  873. return []
  874. with httpx.Client(timeout=timeout_ms / 1000) as client:
  875. response = client.get(source_url)
  876. response.raise_for_status()
  877. payload = response.json()
  878. if isinstance(payload, dict):
  879. documents_payload = payload.get("documents")
  880. else:
  881. documents_payload = payload
  882. if not isinstance(documents_payload, list):
  883. raise ValueError("retriever source must return a JSON list or object.documents list")
  884. documents: list[RetrieverDocument] = []
  885. for index, item in enumerate(documents_payload):
  886. if not _is_json_value(item):
  887. continue
  888. document = _parse_retriever_document(
  889. item,
  890. index=index,
  891. render_context=render_context,
  892. )
  893. if document is not None:
  894. documents.append(document)
  895. return documents
  896. def _parse_retriever_document(
  897. value: JSONValue,
  898. *,
  899. index: int,
  900. render_context: dict[str, JSONValue],
  901. ) -> RetrieverDocument | None:
  902. if isinstance(value, str):
  903. text = render_template_string(value, render_context).strip()
  904. if not text:
  905. return None
  906. return RetrieverDocument(
  907. document_id=f"doc-{index + 1}",
  908. title=None,
  909. text=text,
  910. metadata={},
  911. )
  912. if not isinstance(value, dict):
  913. return None
  914. rendered = _render_json_dict({str(key): item for key, item in value.items()}, render_context)
  915. text_value = rendered.get("text") or rendered.get("content")
  916. if not isinstance(text_value, str) or not text_value.strip():
  917. return None
  918. document_id_value = rendered.get("id") or rendered.get("document_id")
  919. title_value = rendered.get("title")
  920. metadata_value = rendered.get("metadata")
  921. return RetrieverDocument(
  922. document_id=str(document_id_value) if document_id_value is not None else f"doc-{index + 1}",
  923. title=title_value if isinstance(title_value, str) else None,
  924. text=text_value.strip(),
  925. metadata=metadata_value if isinstance(metadata_value, dict) else {},
  926. )
  927. def _knowledge_results_to_retriever_documents(
  928. results: list[dict[str, JSONValue]],
  929. ) -> list[RetrieverDocument]:
  930. documents: list[RetrieverDocument] = []
  931. for index, result in enumerate(results):
  932. chunk = result.get("chunk")
  933. document = result.get("document")
  934. score = result.get("score")
  935. score_json = result.get("score_json")
  936. if not isinstance(chunk, dict) or not isinstance(document, dict):
  937. continue
  938. content_text = chunk.get("content_text")
  939. if not isinstance(content_text, str) or not content_text.strip():
  940. continue
  941. document_id = document.get("id")
  942. title = document.get("title")
  943. metadata: dict[str, JSONValue] = {
  944. "source": "knowledge-service",
  945. "chunk_id": str(chunk.get("id")) if chunk.get("id") is not None else None,
  946. "chunk_index": (
  947. chunk.get("chunk_index")
  948. if isinstance(chunk.get("chunk_index"), int)
  949. else index
  950. ),
  951. "score": score if isinstance(score, (int, float)) else None,
  952. "score_json": score_json if isinstance(score_json, dict) else {},
  953. }
  954. documents.append(
  955. RetrieverDocument(
  956. document_id=str(document_id)
  957. if document_id is not None
  958. else f"knowledge-{index + 1}",
  959. title=title if isinstance(title, str) else None,
  960. text=content_text.strip(),
  961. metadata=metadata,
  962. )
  963. )
  964. return documents
  965. def rank_documents(
  966. *,
  967. query: str,
  968. documents: list[RetrieverDocument],
  969. top_k: int,
  970. ) -> list[RankedRetrieverDocument]:
  971. normalized_top_k = max(top_k, 1)
  972. query_tokens = tokenize_text(query)
  973. ranked_documents: list[RankedRetrieverDocument] = []
  974. for document in documents:
  975. document_tokens = tokenize_text(" ".join(filter(None, [document.title, document.text])))
  976. score = calculate_keyword_score(query_tokens=query_tokens, document_tokens=document_tokens)
  977. ranked_documents.append(
  978. RankedRetrieverDocument(
  979. document_id=document.document_id,
  980. title=document.title,
  981. text=document.text,
  982. metadata=document.metadata,
  983. score=score,
  984. )
  985. )
  986. ranked_documents.sort(key=lambda item: item.score, reverse=True)
  987. return ranked_documents[:normalized_top_k]
  988. def calculate_keyword_score(
  989. *,
  990. query_tokens: set[str],
  991. document_tokens: set[str],
  992. ) -> float:
  993. if not query_tokens or not document_tokens:
  994. return 0.0
  995. overlap_count = len(query_tokens.intersection(document_tokens))
  996. if overlap_count == 0:
  997. return 0.0
  998. return round(overlap_count / len(query_tokens), 4)
  999. def tokenize_text(value: str) -> set[str]:
  1000. tokens = {item.lower() for item in re.findall(r"[\w\u4e00-\u9fff]+", value)}
  1001. return {item for item in tokens if item}
  1002. def _is_json_value(value: object) -> bool:
  1003. if value is None or isinstance(value, (str, int, float, bool)):
  1004. return True
  1005. if isinstance(value, list):
  1006. return all(_is_json_value(item) for item in value)
  1007. if isinstance(value, dict):
  1008. return all(isinstance(key, str) and _is_json_value(item) for key, item in value.items())
  1009. return False