executors.py 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230
  1. import re
  2. from abc import ABC, abstractmethod
  3. from dataclasses import dataclass
  4. from datetime import datetime, timedelta
  5. from typing import cast
  6. import httpx
  7. from core_domain import (
  8. ChatCompletionRequestContract,
  9. ChatMessageContract,
  10. CodeExecutionRequestContract,
  11. HumanTaskCreateContract,
  12. HumanTaskType,
  13. KnowledgeSearchRequestContract,
  14. NodeExecutionContextContract,
  15. NodeExecutionRequestContract,
  16. NodeExecutionResultContract,
  17. ToolBindingDetailContract,
  18. )
  19. from core_shared import JSONValue
  20. from .code_runner_client import CodeRunnerClient, CodeRunnerClientError
  21. from .context import (
  22. build_template_context,
  23. coerce_bool,
  24. evaluate_condition_expression,
  25. render_json_value,
  26. render_template_string,
  27. resolve_expression,
  28. )
  29. from .human_client import HumanServiceClient, HumanServiceClientError
  30. from .knowledge_client import KnowledgeServiceClient, KnowledgeServiceClientError
  31. from .model_gateway_client import ModelGatewayClient, ModelGatewayClientError
  32. from .tool_client import ToolServiceClient, ToolServiceClientError
  33. class NodeExecutor(ABC):
  34. executor_name: str
  35. supported_node_types: frozenset[str]
  36. @abstractmethod
  37. def execute(
  38. self,
  39. context: NodeExecutionContextContract,
  40. request: NodeExecutionRequestContract) -> NodeExecutionResultContract:
  41. raise NotImplementedError
  42. class CompletedNodeExecutor(NodeExecutor):
  43. def __init__(self, *, executor_name: str, supported_node_types: frozenset[str]) -> None:
  44. self.executor_name = executor_name
  45. self.supported_node_types = supported_node_types
  46. def execute(
  47. self,
  48. context: NodeExecutionContextContract,
  49. request: NodeExecutionRequestContract) -> NodeExecutionResultContract:
  50. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  51. return NodeExecutionResultContract(
  52. status="completed",
  53. worker_key=worker_key,
  54. output_json={
  55. "executor_name": self.executor_name,
  56. "node_type": context.node_type,
  57. })
  58. class DefaultNodeExecutor(CompletedNodeExecutor):
  59. def __init__(self) -> None:
  60. super().__init__(
  61. executor_name="default-executor",
  62. supported_node_types=frozenset())
  63. class LLMNodeExecutor(CompletedNodeExecutor):
  64. def __init__(self, model_gateway_client: ModelGatewayClient | None = None) -> None:
  65. super().__init__(
  66. executor_name="llm-executor",
  67. supported_node_types=frozenset({"llm"}))
  68. self.model_gateway_client = model_gateway_client
  69. def execute(
  70. self,
  71. context: NodeExecutionContextContract,
  72. request: NodeExecutionRequestContract) -> NodeExecutionResultContract:
  73. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  74. render_context = _build_executor_template_context(context)
  75. rendered_config_json = _render_config_json(context.node_config_json, render_context)
  76. chat_request = _build_chat_completion_request(rendered_config_json)
  77. if chat_request is None:
  78. return NodeExecutionResultContract(
  79. status="failed",
  80. worker_key=worker_key,
  81. error_code="llm_config_missing",
  82. error_message="llm node config requires prompt or messages")
  83. if self.model_gateway_client is None:
  84. return NodeExecutionResultContract(
  85. status="failed",
  86. worker_key=worker_key,
  87. error_code="llm_gateway_missing",
  88. error_message="model gateway client is not configured")
  89. try:
  90. response = self.model_gateway_client.create_chat_completion(chat_request)
  91. except ModelGatewayClientError as exc:
  92. return NodeExecutionResultContract(
  93. status="failed",
  94. worker_key=worker_key,
  95. error_code="llm_request_failed",
  96. error_message=str(exc))
  97. return NodeExecutionResultContract(
  98. status="completed",
  99. worker_key=worker_key,
  100. output_text=response.content,
  101. output_json={
  102. "executor_name": self.executor_name,
  103. "model": response.model,
  104. "finish_reason": response.finish_reason,
  105. "usage_json": response.usage_json,
  106. "raw_response_json": response.raw_response_json,
  107. })
  108. class ToolNodeExecutor(CompletedNodeExecutor):
  109. def __init__(self, tool_client: ToolServiceClient | None = None) -> None:
  110. super().__init__(
  111. executor_name="tool-executor",
  112. supported_node_types=frozenset({"tool"}))
  113. self.tool_client = tool_client
  114. def execute(
  115. self,
  116. context: NodeExecutionContextContract,
  117. request: NodeExecutionRequestContract) -> NodeExecutionResultContract:
  118. tool_binding_id = _read_string_value(context.node_config_json, "tool_binding_id")
  119. tool_code = _read_string_value(context.node_config_json, "tool_code")
  120. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  121. if tool_binding_id is None and tool_code is None:
  122. return NodeExecutionResultContract(
  123. status="failed",
  124. worker_key=worker_key,
  125. error_code="tool_config_missing",
  126. error_message="tool node config requires tool_binding_id or tool_code")
  127. if tool_binding_id is not None and self.tool_client is not None:
  128. try:
  129. detail = self.tool_client.get_tool_binding_detail(
  130. binding_id=tool_binding_id)
  131. except ToolServiceClientError as exc:
  132. return NodeExecutionResultContract(
  133. status="failed",
  134. worker_key=worker_key,
  135. error_code="tool_binding_lookup_failed",
  136. error_message=str(exc))
  137. if not detail.binding.enabled:
  138. return NodeExecutionResultContract(
  139. status="failed",
  140. worker_key=worker_key,
  141. error_code="tool_binding_disabled",
  142. error_message=f"tool binding is disabled: {tool_binding_id}")
  143. resolved_tool_code = detail.tool_definition.code
  144. resolved_tool_version_id = detail.tool_version.id
  145. resolved_tool_name = detail.tool_definition.name
  146. invoke_result = self._invoke_http_tool(
  147. context=context,
  148. detail=detail,
  149. worker_key=worker_key)
  150. if invoke_result is not None:
  151. return invoke_result
  152. else:
  153. resolved_tool_code = tool_code
  154. resolved_tool_version_id = None
  155. resolved_tool_name = None
  156. return NodeExecutionResultContract(
  157. status="completed",
  158. worker_key=worker_key,
  159. output_text=f"tool node completed: {resolved_tool_code or 'unknown-tool'}",
  160. output_json={
  161. "executor_name": self.executor_name,
  162. "tool_binding_id": tool_binding_id,
  163. "tool_code": resolved_tool_code,
  164. "tool_version_id": resolved_tool_version_id,
  165. "tool_name": resolved_tool_name,
  166. })
  167. def _invoke_http_tool(
  168. self,
  169. *,
  170. context: NodeExecutionContextContract,
  171. detail: ToolBindingDetailContract,
  172. worker_key: str) -> NodeExecutionResultContract | None:
  173. if detail.tool_definition.tool_type != "http":
  174. return None
  175. invoke_config_json = detail.tool_version.invoke_config_json or {}
  176. binding_config_json = detail.binding.config_json or {}
  177. render_context = _build_executor_template_context(context)
  178. request_headers = _merge_json_dicts(
  179. _render_json_dict(_read_dict_value(invoke_config_json, "headers"), render_context),
  180. _render_json_dict(_read_dict_value(binding_config_json, "headers"), render_context),
  181. _render_json_dict(
  182. _read_dict_value(context.node_config_json, "headers"),
  183. render_context))
  184. request_query = _merge_json_dicts(
  185. _render_json_dict(_read_dict_value(invoke_config_json, "query"), render_context),
  186. _render_json_dict(_read_dict_value(context.node_config_json, "query"), render_context))
  187. request_body = _merge_json_dicts(
  188. _render_json_dict(_read_dict_value(invoke_config_json, "body"), render_context),
  189. _render_json_dict(_read_dict_value(context.node_config_json, "body"), render_context))
  190. method = (_read_string_value(invoke_config_json, "method") or "GET").upper()
  191. base_url = (
  192. _read_string_value(context.node_config_json, "base_url")
  193. or _read_string_value(binding_config_json, "base_url")
  194. or _read_string_value(invoke_config_json, "base_url")
  195. )
  196. path = _read_string_value(context.node_config_json, "path") or _read_string_value(
  197. invoke_config_json, "path"
  198. )
  199. url = _read_string_value(context.node_config_json, "url") or _read_string_value(
  200. invoke_config_json, "url"
  201. )
  202. resolved_url = _resolve_http_url(url=url, base_url=base_url, path=path)
  203. if resolved_url is None:
  204. return NodeExecutionResultContract(
  205. status="failed",
  206. worker_key=worker_key,
  207. error_code="tool_http_url_missing",
  208. error_message="http tool requires url or base_url with path")
  209. timeout_ms = detail.tool_version.timeout_ms or 10000
  210. try:
  211. with httpx.Client(timeout=timeout_ms / 1000) as client:
  212. response = client.request(
  213. method=method,
  214. url=resolved_url,
  215. params=_coerce_http_params(request_query),
  216. headers=_coerce_http_headers(request_headers),
  217. json=request_body if request_body else None)
  218. response.raise_for_status()
  219. except httpx.HTTPError as exc:
  220. return NodeExecutionResultContract(
  221. status="failed",
  222. worker_key=worker_key,
  223. error_code="tool_http_request_failed",
  224. error_message=str(exc),
  225. output_json={
  226. "executor_name": self.executor_name,
  227. "tool_binding_id": detail.binding.id,
  228. "tool_code": detail.tool_definition.code,
  229. "request_url": resolved_url,
  230. "request_method": method,
  231. })
  232. response_json = _try_parse_json_response(response)
  233. response_text = None if response_json is not None else response.text
  234. return NodeExecutionResultContract(
  235. status="completed",
  236. worker_key=worker_key,
  237. output_text=response_text,
  238. output_json={
  239. "executor_name": self.executor_name,
  240. "tool_binding_id": detail.binding.id,
  241. "tool_code": detail.tool_definition.code,
  242. "tool_version_id": detail.tool_version.id,
  243. "tool_name": detail.tool_definition.name,
  244. "request_url": resolved_url,
  245. "request_method": method,
  246. "response_status_code": response.status_code,
  247. "response_headers": dict(response.headers),
  248. "response_json": response_json,
  249. })
  250. class CodeNodeExecutor(CompletedNodeExecutor):
  251. def __init__(self, code_runner_client: CodeRunnerClient | None = None) -> None:
  252. super().__init__(
  253. executor_name="code-executor",
  254. supported_node_types=frozenset({"code"}))
  255. self.code_runner_client = code_runner_client
  256. def execute(
  257. self,
  258. context: NodeExecutionContextContract,
  259. request: NodeExecutionRequestContract) -> NodeExecutionResultContract:
  260. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  261. code = _read_string_value(context.node_config_json, "code")
  262. if code is None:
  263. return NodeExecutionResultContract(
  264. status="failed",
  265. worker_key=worker_key,
  266. error_code="code_config_missing",
  267. error_message="code node config requires code")
  268. if self.code_runner_client is None:
  269. return NodeExecutionResultContract(
  270. status="failed",
  271. worker_key=worker_key,
  272. error_code="code_runner_missing",
  273. error_message="code runner client is not configured")
  274. render_context = _build_executor_template_context(context)
  275. input_json = _render_json_dict(
  276. _read_dict_value(context.node_config_json, "input_json"),
  277. render_context)
  278. language = _read_string_value(context.node_config_json, "language") or "python"
  279. timeout_seconds = _read_int_value(context.node_config_json, "timeout_seconds") or 10
  280. code_request = CodeExecutionRequestContract(
  281. language=language,
  282. code=code,
  283. input_json=input_json,
  284. timeout_seconds=timeout_seconds)
  285. try:
  286. response = self.code_runner_client.execute_code(code_request)
  287. except CodeRunnerClientError as exc:
  288. return NodeExecutionResultContract(
  289. status="failed",
  290. worker_key=worker_key,
  291. error_code="code_request_failed",
  292. error_message=str(exc))
  293. if not response.success:
  294. return NodeExecutionResultContract(
  295. status="failed",
  296. worker_key=worker_key,
  297. error_code="code_execution_failed",
  298. error_message=response.error_message or response.stderr,
  299. output_text=response.stdout,
  300. output_json={
  301. "executor_name": self.executor_name,
  302. "stderr": response.stderr,
  303. "output_json": response.output_json,
  304. })
  305. return NodeExecutionResultContract(
  306. status="completed",
  307. worker_key=worker_key,
  308. output_text=response.stdout,
  309. output_json={
  310. "executor_name": self.executor_name,
  311. "stderr": response.stderr,
  312. "result_json": response.output_json,
  313. })
  314. class HumanNodeExecutor(CompletedNodeExecutor):
  315. def __init__(self, human_client: HumanServiceClient | None = None) -> None:
  316. super().__init__(
  317. executor_name="human-executor",
  318. supported_node_types=frozenset(
  319. {"human", "approval", "human-input", "human-takeover"}
  320. ))
  321. self.human_client = human_client
  322. def execute(
  323. self,
  324. context: NodeExecutionContextContract,
  325. request: NodeExecutionRequestContract) -> NodeExecutionResultContract:
  326. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  327. if self.human_client is None:
  328. return NodeExecutionResultContract(
  329. status="failed",
  330. worker_key=worker_key,
  331. error_code="human_service_missing",
  332. error_message="human service client is not configured")
  333. human_task_id = _resolve_existing_human_task_id(context)
  334. if human_task_id is None:
  335. return self._create_waiting_task(context=context, worker_key=worker_key)
  336. try:
  337. task = self.human_client.get_task(
  338. human_task_id=human_task_id)
  339. except HumanServiceClientError as exc:
  340. return NodeExecutionResultContract(
  341. status="failed",
  342. worker_key=worker_key,
  343. error_code="human_task_lookup_failed",
  344. error_message=str(exc))
  345. output_json: dict[str, JSONValue] = {
  346. "executor_name": self.executor_name,
  347. "human_task_id": task.id,
  348. "human_task_status": task.status,
  349. "response_payload_json": task.response_payload_json or {},
  350. }
  351. if task.status in {"pending", "claimed"}:
  352. return NodeExecutionResultContract(
  353. status="pending",
  354. worker_key=worker_key,
  355. output_text=f"waiting for human task: {task.id}",
  356. output_json=output_json)
  357. if task.status in {"approved", "completed"}:
  358. return NodeExecutionResultContract(
  359. status="completed",
  360. worker_key=worker_key,
  361. output_json=output_json)
  362. return NodeExecutionResultContract(
  363. status="failed",
  364. worker_key=worker_key,
  365. error_code=f"human_task_{task.status}",
  366. error_message=f"human task ended with status: {task.status}",
  367. output_json=output_json)
  368. def _create_waiting_task(
  369. self,
  370. *,
  371. context: NodeExecutionContextContract,
  372. worker_key: str) -> NodeExecutionResultContract:
  373. render_context = _build_executor_template_context(context)
  374. task_type = _resolve_human_task_type(context.node_type, context.node_config_json)
  375. title = render_template_string(
  376. _read_string_value(context.node_config_json, "title")
  377. or f"Human task for {context.node_id}",
  378. render_context)
  379. description_template = _read_string_value(context.node_config_json, "description")
  380. description = (
  381. render_template_string(description_template, render_context)
  382. if description_template is not None
  383. else None
  384. )
  385. request_payload_json = _render_json_dict(
  386. _read_dict_value(context.node_config_json, "request_payload_json"),
  387. render_context)
  388. try:
  389. task = self.human_client.create_task(
  390. HumanTaskCreateContract(
  391. task_type=task_type,
  392. title=title,
  393. description=description,
  394. source_type="runtime-node",
  395. source_id=context.node_run_id,
  396. run_id=context.run_id,
  397. node_run_id=context.node_run_id,
  398. requested_by=_read_string_value(
  399. context.node_config_json,
  400. "requested_by"),
  401. assigned_to=_read_string_value(
  402. context.node_config_json,
  403. "assigned_to"),
  404. request_payload_json=request_payload_json,
  405. due_time=_resolve_due_time(context.node_config_json))
  406. )
  407. except HumanServiceClientError as exc:
  408. return NodeExecutionResultContract(
  409. status="failed",
  410. worker_key=worker_key,
  411. error_code="human_task_create_failed",
  412. error_message=str(exc))
  413. return NodeExecutionResultContract(
  414. status="pending",
  415. worker_key=worker_key,
  416. output_text=f"waiting for human task: {task.id}",
  417. output_json={
  418. "executor_name": self.executor_name,
  419. "human_task_id": task.id,
  420. "human_task_status": task.status,
  421. "task_type": task.task_type,
  422. })
  423. class AnswerNodeExecutor(CompletedNodeExecutor):
  424. def execute(
  425. self,
  426. context: NodeExecutionContextContract,
  427. request: NodeExecutionRequestContract) -> NodeExecutionResultContract:
  428. answer_text = _read_string_value(context.node_config_json, "text")
  429. template = _read_string_value(context.node_config_json, "template")
  430. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  431. if answer_text is None and template is None:
  432. return NodeExecutionResultContract(
  433. status="failed",
  434. worker_key=worker_key,
  435. error_code="answer_config_missing",
  436. error_message="answer node config requires text or template")
  437. render_context = _build_executor_template_context(context)
  438. rendered_text = render_template_string(answer_text or template or "", render_context)
  439. return NodeExecutionResultContract(
  440. status="completed",
  441. worker_key=worker_key,
  442. output_text=rendered_text,
  443. output_json={
  444. "executor_name": self.executor_name,
  445. "render_mode": "text" if answer_text is not None else "template",
  446. })
  447. def __init__(self) -> None:
  448. super().__init__(
  449. executor_name="answer-executor",
  450. supported_node_types=frozenset({"answer"}))
  451. class ConditionNodeExecutor(CompletedNodeExecutor):
  452. def execute(
  453. self,
  454. context: NodeExecutionContextContract,
  455. request: NodeExecutionRequestContract) -> NodeExecutionResultContract:
  456. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  457. render_context = _build_executor_template_context(context)
  458. expression = _read_string_value(context.node_config_json, "expression")
  459. path = _read_string_value(context.node_config_json, "path")
  460. if expression is not None:
  461. condition_result = evaluate_condition_expression(expression, render_context)
  462. evaluated_expression = expression
  463. elif path is not None:
  464. condition_result = _evaluate_path_condition(
  465. context.node_config_json,
  466. path,
  467. render_context)
  468. evaluated_expression = path
  469. else:
  470. return NodeExecutionResultContract(
  471. status="failed",
  472. worker_key=worker_key,
  473. error_code="condition_config_missing",
  474. error_message="condition node config requires expression or path")
  475. route = "true" if condition_result else "false"
  476. return NodeExecutionResultContract(
  477. status="completed",
  478. worker_key=worker_key,
  479. output_json={
  480. "executor_name": self.executor_name,
  481. "condition_result": condition_result,
  482. "route": route,
  483. "evaluated_expression": evaluated_expression,
  484. })
  485. def __init__(self) -> None:
  486. super().__init__(
  487. executor_name="condition-executor",
  488. supported_node_types=frozenset({"if-else", "condition"}))
  489. class AssignerNodeExecutor(CompletedNodeExecutor):
  490. def execute(
  491. self,
  492. context: NodeExecutionContextContract,
  493. request: NodeExecutionRequestContract) -> NodeExecutionResultContract:
  494. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  495. assignments = _read_dict_value(context.node_config_json, "assignments")
  496. if not assignments:
  497. return NodeExecutionResultContract(
  498. status="failed",
  499. worker_key=worker_key,
  500. error_code="assignments_missing",
  501. error_message="assigner node config requires assignments")
  502. render_context = _build_executor_template_context(context)
  503. rendered_assignments = _render_json_dict(assignments, render_context)
  504. return NodeExecutionResultContract(
  505. status="completed",
  506. worker_key=worker_key,
  507. output_json={
  508. "executor_name": self.executor_name,
  509. "assigned_values": rendered_assignments,
  510. "state_updates": rendered_assignments,
  511. })
  512. def __init__(self) -> None:
  513. super().__init__(
  514. executor_name="assigner-executor",
  515. supported_node_types=frozenset({"assigner"}))
  516. class RetrieverNodeExecutor(CompletedNodeExecutor):
  517. def __init__(self, knowledge_client: KnowledgeServiceClient | None = None) -> None:
  518. super().__init__(
  519. executor_name="retriever-executor",
  520. supported_node_types=frozenset({"knowledge-retrieval", "retriever"}))
  521. self.knowledge_client = knowledge_client
  522. def execute(
  523. self,
  524. context: NodeExecutionContextContract,
  525. request: NodeExecutionRequestContract) -> NodeExecutionResultContract:
  526. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  527. render_context = _build_executor_template_context(context)
  528. query = _resolve_retriever_query(context.node_config_json, render_context)
  529. documents = _read_retriever_documents(context.node_config_json, render_context)
  530. source_url = _read_string_value(context.node_config_json, "source_url")
  531. knowledge_base_id = _read_string_value(context.node_config_json, "knowledge_base_id")
  532. top_k = _read_int_value(context.node_config_json, "top_k") or 3
  533. if query is None:
  534. return NodeExecutionResultContract(
  535. status="failed",
  536. worker_key=worker_key,
  537. error_code="retriever_query_missing",
  538. error_message="retriever node config requires query or query_template")
  539. if source_url is not None:
  540. try:
  541. documents.extend(
  542. _fetch_retriever_documents_from_url(
  543. source_url=render_template_string(source_url, render_context),
  544. timeout_ms=_read_int_value(context.node_config_json, "timeout_ms") or 10000,
  545. render_context=render_context)
  546. )
  547. except httpx.HTTPError as exc:
  548. return NodeExecutionResultContract(
  549. status="failed",
  550. worker_key=worker_key,
  551. error_code="retriever_source_request_failed",
  552. error_message=str(exc))
  553. except ValueError as exc:
  554. return NodeExecutionResultContract(
  555. status="failed",
  556. worker_key=worker_key,
  557. error_code="retriever_source_invalid",
  558. error_message=str(exc))
  559. knowledge_results: list[dict[str, JSONValue]] = []
  560. if knowledge_base_id is not None:
  561. if self.knowledge_client is None:
  562. return NodeExecutionResultContract(
  563. status="failed",
  564. worker_key=worker_key,
  565. error_code="knowledge_client_missing",
  566. error_message="knowledge-service client is not configured")
  567. try:
  568. knowledge_results = [
  569. item.model_dump(mode="json")
  570. for item in self.knowledge_client.search(
  571. KnowledgeSearchRequestContract(
  572. knowledge_base_id=render_template_string(
  573. knowledge_base_id,
  574. render_context),
  575. query=query,
  576. top_k=top_k,
  577. filters_json=_render_json_dict(
  578. _read_dict_value(context.node_config_json, "filters_json"),
  579. render_context))
  580. )
  581. ]
  582. except KnowledgeServiceClientError as exc:
  583. return NodeExecutionResultContract(
  584. status="failed",
  585. worker_key=worker_key,
  586. error_code="knowledge_search_failed",
  587. error_message=str(exc))
  588. documents.extend(_knowledge_results_to_retriever_documents(knowledge_results))
  589. if not documents:
  590. return NodeExecutionResultContract(
  591. status="failed",
  592. worker_key=worker_key,
  593. error_code="retriever_documents_missing",
  594. error_message=(
  595. "retriever node config requires documents, source_url, "
  596. "or knowledge_base_id"
  597. ))
  598. ranked_documents = rank_documents(query=query, documents=documents, top_k=top_k)
  599. output_documents = [item.to_output_json() for item in ranked_documents]
  600. output_text = "\n\n".join(item.text for item in ranked_documents)
  601. return NodeExecutionResultContract(
  602. status="completed",
  603. worker_key=worker_key,
  604. output_text=output_text,
  605. output_json={
  606. "executor_name": self.executor_name,
  607. "query": query,
  608. "top_k": top_k,
  609. "retrieved_documents": output_documents,
  610. "knowledge_base_id": knowledge_base_id,
  611. "knowledge_results": knowledge_results,
  612. })
  613. class TemplateNodeExecutor(CompletedNodeExecutor):
  614. def execute(
  615. self,
  616. context: NodeExecutionContextContract,
  617. request: NodeExecutionRequestContract) -> NodeExecutionResultContract:
  618. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  619. render_context = _build_executor_template_context(context)
  620. template = _read_string_value(context.node_config_json, "template")
  621. template_json = _read_dict_value(context.node_config_json, "template_json")
  622. if template is None and not template_json:
  623. return NodeExecutionResultContract(
  624. status="failed",
  625. worker_key=worker_key,
  626. error_code="template_config_missing",
  627. error_message="template node config requires template or template_json")
  628. rendered_text = None
  629. rendered_json = None
  630. if template is not None:
  631. rendered_text = render_template_string(template, render_context)
  632. if template_json:
  633. rendered_json = _render_json_dict(template_json, render_context)
  634. output_json: dict[str, JSONValue] = {"executor_name": self.executor_name}
  635. if rendered_json is not None:
  636. output_json["rendered_json"] = rendered_json
  637. return NodeExecutionResultContract(
  638. status="completed",
  639. worker_key=worker_key,
  640. output_text=rendered_text,
  641. output_json=output_json)
  642. def __init__(self) -> None:
  643. super().__init__(
  644. executor_name="template-executor",
  645. supported_node_types=frozenset({"template-transform", "template"}))
  646. class NodeExecutionDispatcher:
  647. def __init__(
  648. self,
  649. executors: list[NodeExecutor],
  650. default_executor: NodeExecutor) -> None:
  651. self.executors = executors
  652. self.default_executor = default_executor
  653. def resolve_executor(self, node_type: str) -> NodeExecutor:
  654. for executor in self.executors:
  655. if node_type in executor.supported_node_types:
  656. return executor
  657. return self.default_executor
  658. def execute(
  659. self,
  660. context: NodeExecutionContextContract,
  661. request: NodeExecutionRequestContract) -> tuple[NodeExecutionResultContract, str]:
  662. executor = self.resolve_executor(context.node_type)
  663. result = executor.execute(context, request)
  664. return result, executor.executor_name
  def build_node_execution_dispatcher() -> NodeExecutionDispatcher:
      """Create a dispatcher whose executors are built with no constructor arguments.

      Returns:
          A ``NodeExecutionDispatcher`` that consults the executors in the
          order listed below and falls back to a fresh ``DefaultNodeExecutor``.
      """
      # Order matters: the dispatcher returns the first executor that matches.
      executor_types = (
          LLMNodeExecutor,
          ToolNodeExecutor,
          CodeNodeExecutor,
          HumanNodeExecutor,
          AnswerNodeExecutor,
          ConditionNodeExecutor,
          AssignerNodeExecutor,
          RetrieverNodeExecutor,
          TemplateNodeExecutor,
      )
      return NodeExecutionDispatcher(
          executors=[executor_type() for executor_type in executor_types],
          default_executor=DefaultNodeExecutor())
  def build_node_execution_dispatcher_with_clients(
      *,
      code_runner_client: CodeRunnerClient | None = None,
      model_gateway_client: ModelGatewayClient | None = None,
      tool_client: ToolServiceClient | None = None,
      knowledge_client: KnowledgeServiceClient | None = None,
      human_client: HumanServiceClient | None = None) -> NodeExecutionDispatcher:
      """Create a dispatcher, handing each service client to the executor that takes it.

      Args:
          code_runner_client: Passed to ``CodeNodeExecutor``.
          model_gateway_client: Passed to ``LLMNodeExecutor``.
          tool_client: Passed to ``ToolNodeExecutor``.
          knowledge_client: Passed to ``RetrieverNodeExecutor``.
          human_client: Passed to ``HumanNodeExecutor``.

      Returns:
          A ``NodeExecutionDispatcher`` that consults the executors in the
          order listed below and falls back to a fresh ``DefaultNodeExecutor``.
      """
      return NodeExecutionDispatcher(
          # Order matters: the dispatcher returns the first executor that matches.
          executors=[
              LLMNodeExecutor(model_gateway_client=model_gateway_client),
              ToolNodeExecutor(tool_client=tool_client),
              CodeNodeExecutor(code_runner_client=code_runner_client),
              HumanNodeExecutor(human_client=human_client),
              AnswerNodeExecutor(),
              ConditionNodeExecutor(),
              AssignerNodeExecutor(),
              RetrieverNodeExecutor(knowledge_client=knowledge_client),
              TemplateNodeExecutor(),
          ],
          default_executor=DefaultNodeExecutor())
  def _read_string_value(payload: dict[str, JSONValue], key: str) -> str | None:
      """Return ``payload[key]`` when it is a string; otherwise return ``None``.

      A missing key and a non-string value are treated the same way.
      """
      candidate = payload.get(key)
      return candidate if isinstance(candidate, str) else None
  def _read_dict_value(payload: dict[str, JSONValue], key: str) -> dict[str, JSONValue]:
      """Return a shallow copy of the dict stored under ``key`` with string keys.

      Returns an empty dict when the key is missing or its value is not a dict.
      """
      candidate = payload.get(key)
      if not isinstance(candidate, dict):
          return {}
      return {str(name): entry for name, entry in candidate.items()}
  def _merge_json_dicts(*items: dict[str, JSONValue]) -> dict[str, JSONValue]:
      """Combine the given dicts into a new dict; later dicts win on key clashes.

      None of the input dicts is modified.
      """
      return {
          name: entry
          for mapping in items
          for name, entry in mapping.items()
      }
  def _render_json_dict(
      payload: dict[str, JSONValue],
      context: dict[str, JSONValue]) -> dict[str, JSONValue]:
      """Render ``payload`` through ``render_json_value`` and return a dict.

      Returns:
          The rendered mapping with its keys converted to ``str``, or an empty
          dict when rendering did not produce a dict.
      """
      result = render_json_value(payload, context)
      if not isinstance(result, dict):
          return {}
      return {str(name): entry for name, entry in result.items()}
  def _render_config_json(
      payload: dict[str, JSONValue],
      context: dict[str, JSONValue]) -> dict[str, JSONValue]:
      """Render a config mapping; a thin alias of ``_render_json_dict``."""
      rendered_config = _render_json_dict(payload, context)
      return rendered_config
  def _resolve_http_url(*, url: str | None, base_url: str | None, path: str | None) -> str | None:
      """Choose the request URL from an explicit URL or a base URL plus path.

      Returns:
          ``url`` unchanged when it is not ``None``. Otherwise ``base_url`` and
          ``path`` joined with exactly one ``/`` between them, or ``None`` when
          either of the two is missing.
      """
      if url is not None:
          return url
      if base_url is not None and path is not None:
          prefix = base_url.rstrip("/")
          suffix = path.lstrip("/")
          return f"{prefix}/{suffix}"
      return None
  def _coerce_http_headers(payload: dict[str, JSONValue]) -> dict[str, str]:
      """Build a header mapping from the scalar entries of ``payload``.

      Entries whose value is a ``str``, ``int``, ``float`` or ``bool`` are kept
      and converted with ``str()``; every other entry is dropped.
      """
      scalar_types = (str, int, float, bool)
      return {
          name: str(entry)
          for name, entry in payload.items()
          if isinstance(entry, scalar_types)
      }
  def _coerce_http_params(payload: dict[str, JSONValue]) -> dict[str, str]:
      """Build a query-parameter mapping from the scalar entries of ``payload``.

      Entries whose value is a ``str``, ``int``, ``float`` or ``bool`` are kept
      and converted with ``str()``; every other entry is dropped.
      """
      scalar_types = (str, int, float, bool)
      return {
          name: str(entry)
          for name, entry in payload.items()
          if isinstance(entry, scalar_types)
      }
  def _try_parse_json_response(response: httpx.Response) -> JSONValue | None:
      """Decode the response body as JSON when the content type mentions JSON.

      Returns:
          The decoded payload, or ``None`` when the ``content-type`` header does
          not contain ``json`` (case-insensitive), when decoding raises
          ``ValueError``, or when the decoded object is not a JSON-shaped type.
      """
      declared_type = response.headers.get("content-type", "").lower()
      if "json" not in declared_type:
          return None
      try:
          body = response.json()
      except ValueError:
          return None
      is_json_shaped = body is None or isinstance(body, (dict, list, str, int, float, bool))
      return body if is_json_shaped else None
  def _build_chat_completion_request(
      payload: dict[str, JSONValue]) -> ChatCompletionRequestContract | None:
      """Assemble a chat completion request from a node payload.

      Messages come from ``payload["messages"]``. When that yields nothing, the
      string fields ``system_prompt`` and ``prompt`` are turned into a system
      and a user message respectively.

      Returns:
          The request contract, or ``None`` when no message could be built.
      """
      conversation = _read_message_list(payload, "messages")
      if not conversation:
          # Fall back to the prompt fields, system message first.
          for role, field_name in (("system", "system_prompt"), ("user", "prompt")):
              text = _read_string_value(payload, field_name)
              if text is not None:
                  conversation.append(ChatMessageContract(role=role, content=text))
      if not conversation:
          return None
      return ChatCompletionRequestContract(
          model=_read_string_value(payload, "model"),
          messages=conversation,
          temperature=_read_float_value(payload, "temperature"),
          max_tokens=_read_int_value(payload, "max_tokens"))
  def _read_message_list(
      payload: dict[str, JSONValue],
      key: str) -> list[ChatMessageContract]:
      """Convert the list stored under ``key`` into chat message contracts.

      Items that are not dicts, or whose ``role`` or ``content`` is not a
      string, are skipped. ``name`` is kept only when it is a string.

      Returns:
          The parsed messages in input order; an empty list when the value
          under ``key`` is not a list.
      """
      raw_items = payload.get(key)
      if not isinstance(raw_items, list):
          return []
      parsed: list[ChatMessageContract] = []
      for entry in raw_items:
          if not isinstance(entry, dict):
              continue
          role = entry.get("role")
          content = entry.get("content")
          if not (isinstance(role, str) and isinstance(content, str)):
              continue
          name = entry.get("name")
          parsed.append(
              ChatMessageContract(
                  role=role,
                  content=content,
                  name=name if isinstance(name, str) else None)
          )
      return parsed
  def _read_float_value(payload: dict[str, JSONValue], key: str) -> float | None:
      """Return ``payload[key]`` as a ``float`` when it is an int or float.

      Booleans are rejected even though ``bool`` is a subclass of ``int``.
      Returns ``None`` for a missing key or any other value type.
      """
      candidate = payload.get(key)
      if isinstance(candidate, bool) or not isinstance(candidate, (int, float)):
          return None
      return float(candidate)
  def _read_int_value(payload: dict[str, JSONValue], key: str) -> int | None:
      """Return ``payload[key]`` when it is an ``int`` that is not a ``bool``.

      Returns ``None`` for a missing key or any other value type, floats
      included.
      """
      candidate = payload.get(key)
      if isinstance(candidate, bool) or not isinstance(candidate, int):
          return None
      return candidate
  def _resolve_existing_human_task_id(context: NodeExecutionContextContract) -> str | None:
      """Find a previously recorded human task id for the current node.

      The node config's ``human_task_id`` wins; otherwise the same key is read
      from this node's own entry in ``context.node_output_json_by_node_id``.

      Returns:
          The task id string, or ``None`` when neither place holds one.
      """
      from_config = _read_string_value(context.node_config_json, "human_task_id")
      if from_config is not None:
          return from_config
      prior_output = context.node_output_json_by_node_id.get(context.node_id)
      return None if prior_output is None else _read_string_value(prior_output, "human_task_id")
  def _resolve_human_task_type(
      node_type: str,
      payload: dict[str, JSONValue]) -> HumanTaskType:
      """Decide the human task type for a node.

      An explicit ``task_type`` in ``payload`` is used when it is one of the
      recognised values; otherwise the type is derived from ``node_type``,
      with ``"input"`` as the fallback.
      """
      explicit = _read_string_value(payload, "task_type")
      if explicit in {"approval", "input", "takeover", "pause", "resume"}:
          return cast(HumanTaskType, explicit)
      task_type_by_node_type: dict[str, HumanTaskType] = {
          "approval": "approval",
          "human-input": "input",
          "human-takeover": "takeover",
      }
      return task_type_by_node_type.get(node_type, "input")
  def _resolve_due_time(payload: dict[str, JSONValue]) -> datetime | None:
      """Work out when a human task is due.

      An explicit ``due_time`` in ``payload`` takes precedence. Otherwise a
      positive integer ``due_after_seconds`` is added to the current UTC time.

      Returns:
          The explicit due time as parsed by ``_read_datetime_value``, or a
          naive ``datetime`` expressed in UTC computed from
          ``due_after_seconds``, or ``None`` when neither field is usable.
      """
      # Imported here so the block does not rely on the module-level imports.
      from datetime import timezone

      explicit_due_time = _read_datetime_value(payload, "due_time")
      if explicit_due_time is not None:
          return explicit_due_time
      due_after_seconds = _read_int_value(payload, "due_after_seconds")
      if due_after_seconds is None or due_after_seconds <= 0:
          return None
      # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo keeps
      # the naive-UTC value that callers have received so far.
      now_utc_naive = datetime.now(timezone.utc).replace(tzinfo=None)
      return now_utc_naive + timedelta(seconds=due_after_seconds)
  def _read_datetime_value(payload: dict[str, JSONValue], key: str) -> datetime | None:
      """Parse the ISO-8601 string stored under ``key`` into a ``datetime``.

      Surrounding whitespace is stripped and ``Z`` is rewritten to ``+00:00``
      before the text is handed to ``datetime.fromisoformat``.

      Returns:
          The parsed value, or ``None`` when the entry is missing, is not a
          string, is blank, or cannot be parsed.
      """
      raw = payload.get(key)
      if not isinstance(raw, str):
          return None
      trimmed = raw.strip()
      if not trimmed:
          return None
      try:
          return datetime.fromisoformat(trimmed.replace("Z", "+00:00"))
      except ValueError:
          return None
  def _build_executor_template_context(context: NodeExecutionContextContract) -> dict[str, JSONValue]:
      """Build the template rendering context for a node execution.

      Forwards the node id, node type, run state and the per-node output
      maps from ``context`` to ``build_template_context``.
      """
      template_inputs = {
          "node_id": context.node_id,
          "node_type": context.node_type,
          "run_state_json": context.run_state_json,
          "node_output_json_by_node_id": context.node_output_json_by_node_id,
          "node_output_text_by_node_id": context.node_output_text_by_node_id,
      }
      return build_template_context(**template_inputs)
  def _evaluate_path_condition(
      payload: dict[str, JSONValue],
      path: str,
      render_context: dict[str, JSONValue]) -> bool:
      """Evaluate one condition against the value found at ``path``.

      The first operator key present in ``payload`` decides the outcome,
      checked in this order: ``equals``, ``not_equals``, ``gt``, ``gte``,
      ``lt``, ``lte``, ``exists``. Operands are rendered with
      ``render_json_value`` before comparison. Without a usable operator the
      resolved value is passed through ``coerce_bool``.
      """
      actual = resolve_expression(render_context, path)
      if "equals" in payload:
          return actual == render_json_value(payload["equals"], render_context)
      if "not_equals" in payload:
          return actual != render_json_value(payload["not_equals"], render_context)
      # Ordering comparisons share one code path; the tuple keeps their priority.
      for keyword, symbol in (("gt", ">"), ("gte", ">="), ("lt", "<"), ("lte", "<=")):
          if keyword in payload:
              operand = render_json_value(payload[keyword], render_context)
              return _compare_numeric(actual, operand, symbol)
      expected_presence = payload.get("exists")
      if isinstance(expected_presence, bool):
          return (actual is not None) is expected_presence
      # No operator, or a non-boolean "exists": fall back to truthiness.
      return coerce_bool(actual)
  def _compare_numeric(left: JSONValue, right: JSONValue, operator: str) -> bool:
      """Compare two JSON numbers with an ordering operator.

      Args:
          left: Left operand.
          right: Right operand.
          operator: One of ``">"``, ``">="``, ``"<"``, ``"<="``.

      Returns:
          The comparison result. ``False`` when either operand is not an int
          or float, when either operand is a boolean, or when ``operator`` is
          not one of the four supported symbols.
      """
      def _is_number(candidate: object) -> bool:
          # bool is a subclass of int; JSON true/false must not compare as 1/0,
          # matching how _read_int_value and _read_float_value treat booleans.
          return isinstance(candidate, (int, float)) and not isinstance(candidate, bool)

      if not (_is_number(left) and _is_number(right)):
          return False
      if operator == ">":
          return left > right
      if operator == ">=":
          return left >= right
      if operator == "<":
          return left < right
      if operator == "<=":
          return left <= right
      return False
  @dataclass(frozen=True)
  class RetrieverDocument:
      """Immutable record for one candidate document before ranking."""

      # Identifier of the document.
      document_id: str
      # Optional title; None when the source supplied none.
      title: str | None
      # Body text of the document.
      text: str
      # Free-form JSON metadata attached to the document.
      metadata: dict[str, JSONValue]
  @dataclass(frozen=True)
  class RankedRetrieverDocument:
      """Immutable record for a document together with its relevance score."""

      # Identifier of the document.
      document_id: str
      # Optional title; None when the source supplied none.
      title: str | None
      # Body text of the document.
      text: str
      # Free-form JSON metadata attached to the document.
      metadata: dict[str, JSONValue]
      # Relevance score assigned during ranking.
      score: float

      def to_output_json(self) -> dict[str, JSONValue]:
          """Return the record as a plain dict keyed by field name."""
          exported_fields = ("document_id", "title", "text", "metadata", "score")
          return {field_name: getattr(self, field_name) for field_name in exported_fields}
  def _resolve_retriever_query(
      payload: dict[str, JSONValue],
      render_context: dict[str, JSONValue]) -> str | None:
      """Render the retriever query text from the node payload.

      ``query`` is preferred over ``query_template``; whichever is used is
      rendered with ``render_template_string`` and stripped.

      Returns:
          The rendered query, or ``None`` when neither field is a string or
          the rendered text is blank.
      """
      template = _read_string_value(payload, "query")
      if template is None:
          template = _read_string_value(payload, "query_template")
      if template is None:
          return None
      rendered = render_template_string(template, render_context).strip()
      return rendered or None
  def _read_retriever_documents(
      payload: dict[str, JSONValue],
      render_context: dict[str, JSONValue]) -> list[RetrieverDocument]:
      """Parse the inline ``documents`` list of a retriever node payload.

      Each item goes through ``_parse_retriever_document`` with its list
      position; items that fail to parse are left out.

      Returns:
          The parsed documents in input order; an empty list when
          ``documents`` is not a list.
      """
      raw_documents = payload.get("documents")
      if not isinstance(raw_documents, list):
          return []
      parsed = (
          _parse_retriever_document(
              entry,
              index=position,
              render_context=render_context)
          for position, entry in enumerate(raw_documents)
      )
      return [document for document in parsed if document is not None]
  def _fetch_retriever_documents_from_url(
      *,
      source_url: str,
      timeout_ms: int,
      render_context: dict[str, JSONValue]) -> list[RetrieverDocument]:
      """Download documents from ``source_url`` with an HTTP GET and parse them.

      The response body must be a JSON list, or a JSON object whose
      ``documents`` entry is a list.

      Args:
          source_url: Address to fetch; a blank value yields an empty result.
          timeout_ms: Request timeout in milliseconds.
          render_context: Passed to ``_parse_retriever_document`` for each item.

      Returns:
          The documents that parsed successfully, in response order.

      Raises:
          ValueError: When the response JSON has neither accepted shape.
          httpx.HTTPStatusError: From ``raise_for_status`` on an error status.
      """
      if not source_url.strip():
          return []
      with httpx.Client(timeout=timeout_ms / 1000) as client:
          response = client.get(source_url)
          response.raise_for_status()
          body = response.json()
      candidates = body.get("documents") if isinstance(body, dict) else body
      if not isinstance(candidates, list):
          raise ValueError("retriever source must return a JSON list or object.documents list")
      collected: list[RetrieverDocument] = []
      for position, entry in enumerate(candidates):
          if not _is_json_value(entry):
              continue
          parsed = _parse_retriever_document(
              entry,
              index=position,
              render_context=render_context)
          if parsed is not None:
              collected.append(parsed)
      return collected
  def _parse_retriever_document(
      value: JSONValue,
      *,
      index: int,
      render_context: dict[str, JSONValue]) -> RetrieverDocument | None:
      """Turn one raw item into a ``RetrieverDocument``.

      A string item is rendered as a template and used as the text. A dict
      item is rendered as a whole; its text is read from ``text`` or
      ``content``, and its id from ``id`` or ``document_id``.

      Args:
          value: The raw item.
          index: Zero-based position, used for the fallback id ``doc-<index + 1>``.
          render_context: Template context used for rendering.

      Returns:
          The document, or ``None`` when the item is neither a string nor a
          dict, or when its text is missing or blank.
      """
      fallback_id = f"doc-{index + 1}"
      if isinstance(value, str):
          body = render_template_string(value, render_context).strip()
          if not body:
              return None
          return RetrieverDocument(
              document_id=fallback_id,
              title=None,
              text=body,
              metadata={})
      if not isinstance(value, dict):
          return None
      fields = _render_json_dict({str(name): entry for name, entry in value.items()}, render_context)
      raw_text = fields.get("text") or fields.get("content")
      if not isinstance(raw_text, str) or not raw_text.strip():
          return None
      raw_id = fields.get("id") or fields.get("document_id")
      raw_title = fields.get("title")
      raw_metadata = fields.get("metadata")
      return RetrieverDocument(
          document_id=fallback_id if raw_id is None else str(raw_id),
          title=raw_title if isinstance(raw_title, str) else None,
          text=raw_text.strip(),
          metadata=raw_metadata if isinstance(raw_metadata, dict) else {})
  def _knowledge_results_to_retriever_documents(
      results: list[dict[str, JSONValue]]) -> list[RetrieverDocument]:
      """Convert knowledge-service search hits into ``RetrieverDocument`` records.

      A hit is skipped unless its ``chunk`` and ``document`` entries are dicts
      and the chunk's ``content_text`` is a non-blank string. The hit's score
      details are carried over in the document metadata.

      Returns:
          One document per usable hit, in input order.
      """
      converted: list[RetrieverDocument] = []
      for position, hit in enumerate(results):
          chunk = hit.get("chunk")
          source_document = hit.get("document")
          if not (isinstance(chunk, dict) and isinstance(source_document, dict)):
              continue
          raw_text = chunk.get("content_text")
          if not isinstance(raw_text, str):
              continue
          body = raw_text.strip()
          if not body:
              continue
          raw_score = hit.get("score")
          raw_score_json = hit.get("score_json")
          chunk_id = chunk.get("id")
          chunk_index = chunk.get("chunk_index")
          raw_document_id = source_document.get("id")
          raw_title = source_document.get("title")
          details: dict[str, JSONValue] = {
              "source": "knowledge-service",
              "chunk_id": None if chunk_id is None else str(chunk_id),
              # Fall back to the hit's position when the chunk has no integer index.
              "chunk_index": chunk_index if isinstance(chunk_index, int) else position,
              "score": raw_score if isinstance(raw_score, (int, float)) else None,
              "score_json": raw_score_json if isinstance(raw_score_json, dict) else {},
          }
          converted.append(
              RetrieverDocument(
                  document_id=f"knowledge-{position + 1}"
                  if raw_document_id is None
                  else str(raw_document_id),
                  title=raw_title if isinstance(raw_title, str) else None,
                  text=body,
                  metadata=details)
          )
      return converted
  def rank_documents(
      *,
      query: str,
      documents: list[RetrieverDocument],
      top_k: int) -> list[RankedRetrieverDocument]:
      """Score documents by keyword overlap with ``query`` and keep the best.

      Each document is tokenized from its title (when present) and text, then
      scored with ``calculate_keyword_score``.

      Args:
          query: Query text to tokenize.
          documents: Candidates to score.
          top_k: Maximum number of results; values below 1 are treated as 1.

      Returns:
          The highest-scoring documents, best first. Documents with equal
          scores keep their input order.
      """
      limit = max(top_k, 1)
      query_tokens = tokenize_text(query)

      def _score(candidate: RetrieverDocument) -> RankedRetrieverDocument:
          searchable_text = " ".join(filter(None, [candidate.title, candidate.text]))
          return RankedRetrieverDocument(
              document_id=candidate.document_id,
              title=candidate.title,
              text=candidate.text,
              metadata=candidate.metadata,
              score=calculate_keyword_score(
                  query_tokens=query_tokens,
                  document_tokens=tokenize_text(searchable_text)))

      scored = [_score(candidate) for candidate in documents]
      best_first = sorted(scored, key=lambda item: item.score, reverse=True)
      return best_first[:limit]
  def calculate_keyword_score(
      *,
      query_tokens: set[str],
      document_tokens: set[str]) -> float:
      """Return the share of query tokens that also occur in the document.

      Returns:
          ``len(overlap) / len(query_tokens)`` rounded to four decimals, or
          ``0.0`` when either token set is empty or nothing overlaps.
      """
      if not (query_tokens and document_tokens):
          return 0.0
      shared = query_tokens.intersection(document_tokens)
      if not shared:
          return 0.0
      return round(len(shared) / len(query_tokens), 4)
  def tokenize_text(value: str) -> set[str]:
      """Split ``value`` into a set of lower-cased word tokens.

      Tokens are the runs matched by the pattern below; duplicates collapse
      because the result is a set.
      """
      matches = re.findall(r"[\w\u4e00-\u9fff]+", value)
      lowered = (match.lower() for match in matches)
      return {token for token in lowered if token}
  def _is_json_value(value: object) -> bool:
      """Report whether ``value`` is made only of JSON-compatible Python types.

      Accepted: ``None``, ``str``, ``int``, ``float``, ``bool``, lists of
      accepted values, and dicts with string keys and accepted values.
      """
      if value is None or isinstance(value, (str, int, float, bool)):
          return True
      if isinstance(value, list):
          for element in value:
              if not _is_json_value(element):
                  return False
          return True
      if isinstance(value, dict):
          for name, entry in value.items():
              if not isinstance(name, str) or not _is_json_value(entry):
                  return False
          return True
      return False