executors.py 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343
  1. from abc import ABC, abstractmethod
  2. from dataclasses import dataclass
  3. from datetime import datetime, timedelta
  4. import re
  5. from typing import cast
  6. import httpx
  7. from core_domain import (
  8. ChatCompletionRequestContract,
  9. ChatMessageContract,
  10. CodeExecutionRequestContract,
  11. HumanTaskCreateContract,
  12. HumanTaskType,
  13. KnowledgeSearchRequestContract,
  14. NodeExecutionContextContract,
  15. NodeExecutionRequestContract,
  16. NodeExecutionResultContract,
  17. ToolBindingDetailContract,
  18. )
  19. from core_shared import JSONValue
  20. from .code_runner_client import CodeRunnerClient, CodeRunnerClientError
  21. from .context import (
  22. build_template_context,
  23. coerce_bool,
  24. evaluate_condition_expression,
  25. render_json_value,
  26. render_template_string,
  27. resolve_expression,
  28. )
  29. from .human_client import HumanServiceClient, HumanServiceClientError
  30. from .knowledge_client import KnowledgeServiceClient, KnowledgeServiceClientError
  31. from .model_gateway_client import ModelGatewayClient, ModelGatewayClientError
  32. from .tool_client import ToolServiceClient, ToolServiceClientError
  33. class NodeExecutor(ABC):
  34. executor_name: str
  35. supported_node_types: frozenset[str]
  36. @abstractmethod
  37. def execute(
  38. self,
  39. context: NodeExecutionContextContract,
  40. request: NodeExecutionRequestContract,
  41. ) -> NodeExecutionResultContract:
  42. raise NotImplementedError
  43. class CompletedNodeExecutor(NodeExecutor):
  44. def __init__(self, *, executor_name: str, supported_node_types: frozenset[str]) -> None:
  45. self.executor_name = executor_name
  46. self.supported_node_types = supported_node_types
  47. def execute(
  48. self,
  49. context: NodeExecutionContextContract,
  50. request: NodeExecutionRequestContract,
  51. ) -> NodeExecutionResultContract:
  52. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  53. return NodeExecutionResultContract(
  54. status="completed",
  55. worker_key=worker_key,
  56. output_json={
  57. "executor_name": self.executor_name,
  58. "node_type": context.node_type,
  59. },
  60. )
  61. class DefaultNodeExecutor(CompletedNodeExecutor):
  62. def __init__(self) -> None:
  63. super().__init__(
  64. executor_name="default-executor",
  65. supported_node_types=frozenset(),
  66. )
  67. class LLMNodeExecutor(CompletedNodeExecutor):
  68. def __init__(self, model_gateway_client: ModelGatewayClient | None = None) -> None:
  69. super().__init__(
  70. executor_name="llm-executor",
  71. supported_node_types=frozenset({"llm"}),
  72. )
  73. self.model_gateway_client = model_gateway_client
  74. def execute(
  75. self,
  76. context: NodeExecutionContextContract,
  77. request: NodeExecutionRequestContract,
  78. ) -> NodeExecutionResultContract:
  79. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  80. render_context = _build_executor_template_context(context)
  81. rendered_config_json = _render_config_json(context.node_config_json, render_context)
  82. chat_request = _build_chat_completion_request(rendered_config_json)
  83. if chat_request is None:
  84. return NodeExecutionResultContract(
  85. status="failed",
  86. worker_key=worker_key,
  87. error_code="llm_config_missing",
  88. error_message="llm node config requires prompt or messages",
  89. )
  90. if self.model_gateway_client is None:
  91. return NodeExecutionResultContract(
  92. status="failed",
  93. worker_key=worker_key,
  94. error_code="llm_gateway_missing",
  95. error_message="model gateway client is not configured",
  96. )
  97. try:
  98. response = self.model_gateway_client.create_chat_completion(chat_request)
  99. except ModelGatewayClientError as exc:
  100. return NodeExecutionResultContract(
  101. status="failed",
  102. worker_key=worker_key,
  103. error_code="llm_request_failed",
  104. error_message=str(exc),
  105. )
  106. return NodeExecutionResultContract(
  107. status="completed",
  108. worker_key=worker_key,
  109. output_text=response.content,
  110. output_json={
  111. "executor_name": self.executor_name,
  112. "model": response.model,
  113. "finish_reason": response.finish_reason,
  114. "usage_json": response.usage_json,
  115. "raw_response_json": response.raw_response_json,
  116. },
  117. )
  118. class ToolNodeExecutor(CompletedNodeExecutor):
  119. def __init__(self, tool_client: ToolServiceClient | None = None) -> None:
  120. super().__init__(
  121. executor_name="tool-executor",
  122. supported_node_types=frozenset({"tool"}),
  123. )
  124. self.tool_client = tool_client
  125. def execute(
  126. self,
  127. context: NodeExecutionContextContract,
  128. request: NodeExecutionRequestContract,
  129. ) -> NodeExecutionResultContract:
  130. tool_binding_id = _read_string_value(context.node_config_json, "tool_binding_id")
  131. tool_code = _read_string_value(context.node_config_json, "tool_code")
  132. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  133. if tool_binding_id is None and tool_code is None:
  134. return NodeExecutionResultContract(
  135. status="failed",
  136. worker_key=worker_key,
  137. error_code="tool_config_missing",
  138. error_message="tool node config requires tool_binding_id or tool_code",
  139. )
  140. if tool_binding_id is not None and self.tool_client is not None:
  141. try:
  142. detail = self.tool_client.get_tool_binding_detail(
  143. tenant_id=context.tenant_id,
  144. binding_id=tool_binding_id,
  145. )
  146. except ToolServiceClientError as exc:
  147. return NodeExecutionResultContract(
  148. status="failed",
  149. worker_key=worker_key,
  150. error_code="tool_binding_lookup_failed",
  151. error_message=str(exc),
  152. )
  153. if not detail.binding.enabled:
  154. return NodeExecutionResultContract(
  155. status="failed",
  156. worker_key=worker_key,
  157. error_code="tool_binding_disabled",
  158. error_message=f"tool binding is disabled: {tool_binding_id}",
  159. )
  160. resolved_tool_code = detail.tool_definition.code
  161. resolved_tool_version_id = detail.tool_version.id
  162. resolved_tool_name = detail.tool_definition.name
  163. invoke_result = self._invoke_http_tool(
  164. context=context,
  165. detail=detail,
  166. worker_key=worker_key,
  167. )
  168. if invoke_result is not None:
  169. return invoke_result
  170. else:
  171. resolved_tool_code = tool_code
  172. resolved_tool_version_id = None
  173. resolved_tool_name = None
  174. return NodeExecutionResultContract(
  175. status="completed",
  176. worker_key=worker_key,
  177. output_text=f"tool node completed: {resolved_tool_code or 'unknown-tool'}",
  178. output_json={
  179. "executor_name": self.executor_name,
  180. "tool_binding_id": tool_binding_id,
  181. "tool_code": resolved_tool_code,
  182. "tool_version_id": resolved_tool_version_id,
  183. "tool_name": resolved_tool_name,
  184. },
  185. )
  186. def _invoke_http_tool(
  187. self,
  188. *,
  189. context: NodeExecutionContextContract,
  190. detail: ToolBindingDetailContract,
  191. worker_key: str,
  192. ) -> NodeExecutionResultContract | None:
  193. if detail.tool_definition.tool_type != "http":
  194. return None
  195. invoke_config_json = detail.tool_version.invoke_config_json or {}
  196. binding_config_json = detail.binding.config_json or {}
  197. render_context = _build_executor_template_context(context)
  198. request_headers = _merge_json_dicts(
  199. _render_json_dict(_read_dict_value(invoke_config_json, "headers"), render_context),
  200. _render_json_dict(_read_dict_value(binding_config_json, "headers"), render_context),
  201. _render_json_dict(
  202. _read_dict_value(context.node_config_json, "headers"),
  203. render_context,
  204. ),
  205. )
  206. request_query = _merge_json_dicts(
  207. _render_json_dict(_read_dict_value(invoke_config_json, "query"), render_context),
  208. _render_json_dict(_read_dict_value(context.node_config_json, "query"), render_context),
  209. )
  210. request_body = _merge_json_dicts(
  211. _render_json_dict(_read_dict_value(invoke_config_json, "body"), render_context),
  212. _render_json_dict(_read_dict_value(context.node_config_json, "body"), render_context),
  213. )
  214. method = (_read_string_value(invoke_config_json, "method") or "GET").upper()
  215. base_url = (
  216. _read_string_value(context.node_config_json, "base_url")
  217. or _read_string_value(binding_config_json, "base_url")
  218. or _read_string_value(invoke_config_json, "base_url")
  219. )
  220. path = _read_string_value(context.node_config_json, "path") or _read_string_value(
  221. invoke_config_json, "path"
  222. )
  223. url = _read_string_value(context.node_config_json, "url") or _read_string_value(
  224. invoke_config_json, "url"
  225. )
  226. resolved_url = _resolve_http_url(url=url, base_url=base_url, path=path)
  227. if resolved_url is None:
  228. return NodeExecutionResultContract(
  229. status="failed",
  230. worker_key=worker_key,
  231. error_code="tool_http_url_missing",
  232. error_message="http tool requires url or base_url with path",
  233. )
  234. timeout_ms = detail.tool_version.timeout_ms or 10000
  235. try:
  236. with httpx.Client(timeout=timeout_ms / 1000) as client:
  237. response = client.request(
  238. method=method,
  239. url=resolved_url,
  240. params=_coerce_http_params(request_query),
  241. headers=_coerce_http_headers(request_headers),
  242. json=request_body if request_body else None,
  243. )
  244. response.raise_for_status()
  245. except httpx.HTTPError as exc:
  246. return NodeExecutionResultContract(
  247. status="failed",
  248. worker_key=worker_key,
  249. error_code="tool_http_request_failed",
  250. error_message=str(exc),
  251. output_json={
  252. "executor_name": self.executor_name,
  253. "tool_binding_id": detail.binding.id,
  254. "tool_code": detail.tool_definition.code,
  255. "request_url": resolved_url,
  256. "request_method": method,
  257. },
  258. )
  259. response_json = _try_parse_json_response(response)
  260. response_text = None if response_json is not None else response.text
  261. return NodeExecutionResultContract(
  262. status="completed",
  263. worker_key=worker_key,
  264. output_text=response_text,
  265. output_json={
  266. "executor_name": self.executor_name,
  267. "tool_binding_id": detail.binding.id,
  268. "tool_code": detail.tool_definition.code,
  269. "tool_version_id": detail.tool_version.id,
  270. "tool_name": detail.tool_definition.name,
  271. "request_url": resolved_url,
  272. "request_method": method,
  273. "response_status_code": response.status_code,
  274. "response_headers": dict(response.headers),
  275. "response_json": response_json,
  276. },
  277. )
  278. class CodeNodeExecutor(CompletedNodeExecutor):
  279. def __init__(self, code_runner_client: CodeRunnerClient | None = None) -> None:
  280. super().__init__(
  281. executor_name="code-executor",
  282. supported_node_types=frozenset({"code"}),
  283. )
  284. self.code_runner_client = code_runner_client
  285. def execute(
  286. self,
  287. context: NodeExecutionContextContract,
  288. request: NodeExecutionRequestContract,
  289. ) -> NodeExecutionResultContract:
  290. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  291. code = _read_string_value(context.node_config_json, "code")
  292. if code is None:
  293. return NodeExecutionResultContract(
  294. status="failed",
  295. worker_key=worker_key,
  296. error_code="code_config_missing",
  297. error_message="code node config requires code",
  298. )
  299. if self.code_runner_client is None:
  300. return NodeExecutionResultContract(
  301. status="failed",
  302. worker_key=worker_key,
  303. error_code="code_runner_missing",
  304. error_message="code runner client is not configured",
  305. )
  306. render_context = _build_executor_template_context(context)
  307. input_json = _render_json_dict(
  308. _read_dict_value(context.node_config_json, "input_json"),
  309. render_context,
  310. )
  311. language = _read_string_value(context.node_config_json, "language") or "python"
  312. timeout_seconds = _read_int_value(context.node_config_json, "timeout_seconds") or 10
  313. code_request = CodeExecutionRequestContract(
  314. language=language,
  315. code=code,
  316. input_json=input_json,
  317. timeout_seconds=timeout_seconds,
  318. )
  319. try:
  320. response = self.code_runner_client.execute_code(code_request)
  321. except CodeRunnerClientError as exc:
  322. return NodeExecutionResultContract(
  323. status="failed",
  324. worker_key=worker_key,
  325. error_code="code_request_failed",
  326. error_message=str(exc),
  327. )
  328. if not response.success:
  329. return NodeExecutionResultContract(
  330. status="failed",
  331. worker_key=worker_key,
  332. error_code="code_execution_failed",
  333. error_message=response.error_message or response.stderr,
  334. output_text=response.stdout,
  335. output_json={
  336. "executor_name": self.executor_name,
  337. "stderr": response.stderr,
  338. "output_json": response.output_json,
  339. },
  340. )
  341. return NodeExecutionResultContract(
  342. status="completed",
  343. worker_key=worker_key,
  344. output_text=response.stdout,
  345. output_json={
  346. "executor_name": self.executor_name,
  347. "stderr": response.stderr,
  348. "result_json": response.output_json,
  349. },
  350. )
  351. class HumanNodeExecutor(CompletedNodeExecutor):
  352. def __init__(self, human_client: HumanServiceClient | None = None) -> None:
  353. super().__init__(
  354. executor_name="human-executor",
  355. supported_node_types=frozenset(
  356. {"human", "approval", "human-input", "human-takeover"}
  357. ),
  358. )
  359. self.human_client = human_client
  360. def execute(
  361. self,
  362. context: NodeExecutionContextContract,
  363. request: NodeExecutionRequestContract,
  364. ) -> NodeExecutionResultContract:
  365. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  366. if self.human_client is None:
  367. return NodeExecutionResultContract(
  368. status="failed",
  369. worker_key=worker_key,
  370. error_code="human_service_missing",
  371. error_message="human service client is not configured",
  372. )
  373. human_task_id = _resolve_existing_human_task_id(context)
  374. if human_task_id is None:
  375. return self._create_waiting_task(context=context, worker_key=worker_key)
  376. try:
  377. task = self.human_client.get_task(
  378. tenant_id=context.tenant_id,
  379. human_task_id=human_task_id,
  380. )
  381. except HumanServiceClientError as exc:
  382. return NodeExecutionResultContract(
  383. status="failed",
  384. worker_key=worker_key,
  385. error_code="human_task_lookup_failed",
  386. error_message=str(exc),
  387. )
  388. output_json: dict[str, JSONValue] = {
  389. "executor_name": self.executor_name,
  390. "human_task_id": task.id,
  391. "human_task_status": task.status,
  392. "response_payload_json": task.response_payload_json or {},
  393. }
  394. if task.status in {"pending", "claimed"}:
  395. return NodeExecutionResultContract(
  396. status="pending",
  397. worker_key=worker_key,
  398. output_text=f"waiting for human task: {task.id}",
  399. output_json=output_json,
  400. )
  401. if task.status in {"approved", "completed"}:
  402. return NodeExecutionResultContract(
  403. status="completed",
  404. worker_key=worker_key,
  405. output_json=output_json,
  406. )
  407. return NodeExecutionResultContract(
  408. status="failed",
  409. worker_key=worker_key,
  410. error_code=f"human_task_{task.status}",
  411. error_message=f"human task ended with status: {task.status}",
  412. output_json=output_json,
  413. )
  414. def _create_waiting_task(
  415. self,
  416. *,
  417. context: NodeExecutionContextContract,
  418. worker_key: str,
  419. ) -> NodeExecutionResultContract:
  420. render_context = _build_executor_template_context(context)
  421. task_type = _resolve_human_task_type(context.node_type, context.node_config_json)
  422. title = render_template_string(
  423. _read_string_value(context.node_config_json, "title")
  424. or f"Human task for {context.node_id}",
  425. render_context,
  426. )
  427. description_template = _read_string_value(context.node_config_json, "description")
  428. description = (
  429. render_template_string(description_template, render_context)
  430. if description_template is not None
  431. else None
  432. )
  433. request_payload_json = _render_json_dict(
  434. _read_dict_value(context.node_config_json, "request_payload_json"),
  435. render_context,
  436. )
  437. try:
  438. task = self.human_client.create_task(
  439. HumanTaskCreateContract(
  440. tenant_id=context.tenant_id,
  441. task_type=task_type,
  442. title=title,
  443. description=description,
  444. source_type="runtime-node",
  445. source_id=context.node_run_id,
  446. run_id=context.run_id,
  447. node_run_id=context.node_run_id,
  448. requested_by=_read_string_value(
  449. context.node_config_json,
  450. "requested_by",
  451. ),
  452. assigned_to=_read_string_value(
  453. context.node_config_json,
  454. "assigned_to",
  455. ),
  456. request_payload_json=request_payload_json,
  457. due_time=_resolve_due_time(context.node_config_json),
  458. )
  459. )
  460. except HumanServiceClientError as exc:
  461. return NodeExecutionResultContract(
  462. status="failed",
  463. worker_key=worker_key,
  464. error_code="human_task_create_failed",
  465. error_message=str(exc),
  466. )
  467. return NodeExecutionResultContract(
  468. status="pending",
  469. worker_key=worker_key,
  470. output_text=f"waiting for human task: {task.id}",
  471. output_json={
  472. "executor_name": self.executor_name,
  473. "human_task_id": task.id,
  474. "human_task_status": task.status,
  475. "task_type": task.task_type,
  476. },
  477. )
  478. class AnswerNodeExecutor(CompletedNodeExecutor):
  479. def execute(
  480. self,
  481. context: NodeExecutionContextContract,
  482. request: NodeExecutionRequestContract,
  483. ) -> NodeExecutionResultContract:
  484. answer_text = _read_string_value(context.node_config_json, "text")
  485. template = _read_string_value(context.node_config_json, "template")
  486. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  487. if answer_text is None and template is None:
  488. return NodeExecutionResultContract(
  489. status="failed",
  490. worker_key=worker_key,
  491. error_code="answer_config_missing",
  492. error_message="answer node config requires text or template",
  493. )
  494. render_context = _build_executor_template_context(context)
  495. rendered_text = render_template_string(answer_text or template or "", render_context)
  496. return NodeExecutionResultContract(
  497. status="completed",
  498. worker_key=worker_key,
  499. output_text=rendered_text,
  500. output_json={
  501. "executor_name": self.executor_name,
  502. "render_mode": "text" if answer_text is not None else "template",
  503. },
  504. )
  505. def __init__(self) -> None:
  506. super().__init__(
  507. executor_name="answer-executor",
  508. supported_node_types=frozenset({"answer"}),
  509. )
  510. class ConditionNodeExecutor(CompletedNodeExecutor):
  511. def execute(
  512. self,
  513. context: NodeExecutionContextContract,
  514. request: NodeExecutionRequestContract,
  515. ) -> NodeExecutionResultContract:
  516. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  517. render_context = _build_executor_template_context(context)
  518. expression = _read_string_value(context.node_config_json, "expression")
  519. path = _read_string_value(context.node_config_json, "path")
  520. if expression is not None:
  521. condition_result = evaluate_condition_expression(expression, render_context)
  522. evaluated_expression = expression
  523. elif path is not None:
  524. condition_result = _evaluate_path_condition(
  525. context.node_config_json,
  526. path,
  527. render_context,
  528. )
  529. evaluated_expression = path
  530. else:
  531. return NodeExecutionResultContract(
  532. status="failed",
  533. worker_key=worker_key,
  534. error_code="condition_config_missing",
  535. error_message="condition node config requires expression or path",
  536. )
  537. route = "true" if condition_result else "false"
  538. return NodeExecutionResultContract(
  539. status="completed",
  540. worker_key=worker_key,
  541. output_json={
  542. "executor_name": self.executor_name,
  543. "condition_result": condition_result,
  544. "route": route,
  545. "evaluated_expression": evaluated_expression,
  546. },
  547. )
  548. def __init__(self) -> None:
  549. super().__init__(
  550. executor_name="condition-executor",
  551. supported_node_types=frozenset({"if-else", "condition"}),
  552. )
  553. class AssignerNodeExecutor(CompletedNodeExecutor):
  554. def execute(
  555. self,
  556. context: NodeExecutionContextContract,
  557. request: NodeExecutionRequestContract,
  558. ) -> NodeExecutionResultContract:
  559. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  560. assignments = _read_dict_value(context.node_config_json, "assignments")
  561. if not assignments:
  562. return NodeExecutionResultContract(
  563. status="failed",
  564. worker_key=worker_key,
  565. error_code="assignments_missing",
  566. error_message="assigner node config requires assignments",
  567. )
  568. render_context = _build_executor_template_context(context)
  569. rendered_assignments = _render_json_dict(assignments, render_context)
  570. return NodeExecutionResultContract(
  571. status="completed",
  572. worker_key=worker_key,
  573. output_json={
  574. "executor_name": self.executor_name,
  575. "assigned_values": rendered_assignments,
  576. "state_updates": rendered_assignments,
  577. },
  578. )
  579. def __init__(self) -> None:
  580. super().__init__(
  581. executor_name="assigner-executor",
  582. supported_node_types=frozenset({"assigner"}),
  583. )
  584. class RetrieverNodeExecutor(CompletedNodeExecutor):
  585. def __init__(self, knowledge_client: KnowledgeServiceClient | None = None) -> None:
  586. super().__init__(
  587. executor_name="retriever-executor",
  588. supported_node_types=frozenset({"knowledge-retrieval", "retriever"}),
  589. )
  590. self.knowledge_client = knowledge_client
  591. def execute(
  592. self,
  593. context: NodeExecutionContextContract,
  594. request: NodeExecutionRequestContract,
  595. ) -> NodeExecutionResultContract:
  596. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  597. render_context = _build_executor_template_context(context)
  598. query = _resolve_retriever_query(context.node_config_json, render_context)
  599. documents = _read_retriever_documents(context.node_config_json, render_context)
  600. source_url = _read_string_value(context.node_config_json, "source_url")
  601. knowledge_base_id = _read_string_value(context.node_config_json, "knowledge_base_id")
  602. top_k = _read_int_value(context.node_config_json, "top_k") or 3
  603. if query is None:
  604. return NodeExecutionResultContract(
  605. status="failed",
  606. worker_key=worker_key,
  607. error_code="retriever_query_missing",
  608. error_message="retriever node config requires query or query_template",
  609. )
  610. if source_url is not None:
  611. try:
  612. documents.extend(
  613. _fetch_retriever_documents_from_url(
  614. source_url=render_template_string(source_url, render_context),
  615. timeout_ms=_read_int_value(context.node_config_json, "timeout_ms") or 10000,
  616. render_context=render_context,
  617. )
  618. )
  619. except httpx.HTTPError as exc:
  620. return NodeExecutionResultContract(
  621. status="failed",
  622. worker_key=worker_key,
  623. error_code="retriever_source_request_failed",
  624. error_message=str(exc),
  625. )
  626. except ValueError as exc:
  627. return NodeExecutionResultContract(
  628. status="failed",
  629. worker_key=worker_key,
  630. error_code="retriever_source_invalid",
  631. error_message=str(exc),
  632. )
  633. knowledge_results: list[dict[str, JSONValue]] = []
  634. if knowledge_base_id is not None:
  635. if self.knowledge_client is None:
  636. return NodeExecutionResultContract(
  637. status="failed",
  638. worker_key=worker_key,
  639. error_code="knowledge_client_missing",
  640. error_message="knowledge-service client is not configured",
  641. )
  642. try:
  643. knowledge_results = [
  644. item.model_dump(mode="json")
  645. for item in self.knowledge_client.search(
  646. KnowledgeSearchRequestContract(
  647. tenant_id=context.tenant_id,
  648. knowledge_base_id=render_template_string(
  649. knowledge_base_id,
  650. render_context,
  651. ),
  652. query=query,
  653. top_k=top_k,
  654. filters_json=_render_json_dict(
  655. _read_dict_value(context.node_config_json, "filters_json"),
  656. render_context,
  657. ),
  658. )
  659. )
  660. ]
  661. except KnowledgeServiceClientError as exc:
  662. return NodeExecutionResultContract(
  663. status="failed",
  664. worker_key=worker_key,
  665. error_code="knowledge_search_failed",
  666. error_message=str(exc),
  667. )
  668. documents.extend(_knowledge_results_to_retriever_documents(knowledge_results))
  669. if not documents:
  670. return NodeExecutionResultContract(
  671. status="failed",
  672. worker_key=worker_key,
  673. error_code="retriever_documents_missing",
  674. error_message=(
  675. "retriever node config requires documents, source_url, "
  676. "or knowledge_base_id"
  677. ),
  678. )
  679. ranked_documents = rank_documents(query=query, documents=documents, top_k=top_k)
  680. output_documents = [item.to_output_json() for item in ranked_documents]
  681. output_text = "\n\n".join(item.text for item in ranked_documents)
  682. return NodeExecutionResultContract(
  683. status="completed",
  684. worker_key=worker_key,
  685. output_text=output_text,
  686. output_json={
  687. "executor_name": self.executor_name,
  688. "query": query,
  689. "top_k": top_k,
  690. "retrieved_documents": output_documents,
  691. "knowledge_base_id": knowledge_base_id,
  692. "knowledge_results": knowledge_results,
  693. },
  694. )
  695. class TemplateNodeExecutor(CompletedNodeExecutor):
  696. def execute(
  697. self,
  698. context: NodeExecutionContextContract,
  699. request: NodeExecutionRequestContract,
  700. ) -> NodeExecutionResultContract:
  701. worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
  702. render_context = _build_executor_template_context(context)
  703. template = _read_string_value(context.node_config_json, "template")
  704. template_json = _read_dict_value(context.node_config_json, "template_json")
  705. if template is None and not template_json:
  706. return NodeExecutionResultContract(
  707. status="failed",
  708. worker_key=worker_key,
  709. error_code="template_config_missing",
  710. error_message="template node config requires template or template_json",
  711. )
  712. rendered_text = None
  713. rendered_json = None
  714. if template is not None:
  715. rendered_text = render_template_string(template, render_context)
  716. if template_json:
  717. rendered_json = _render_json_dict(template_json, render_context)
  718. output_json: dict[str, JSONValue] = {"executor_name": self.executor_name}
  719. if rendered_json is not None:
  720. output_json["rendered_json"] = rendered_json
  721. return NodeExecutionResultContract(
  722. status="completed",
  723. worker_key=worker_key,
  724. output_text=rendered_text,
  725. output_json=output_json,
  726. )
  727. def __init__(self) -> None:
  728. super().__init__(
  729. executor_name="template-executor",
  730. supported_node_types=frozenset({"template-transform", "template"}),
  731. )
  732. class NodeExecutionDispatcher:
  733. def __init__(
  734. self,
  735. executors: list[NodeExecutor],
  736. default_executor: NodeExecutor,
  737. ) -> None:
  738. self.executors = executors
  739. self.default_executor = default_executor
  740. def resolve_executor(self, node_type: str) -> NodeExecutor:
  741. for executor in self.executors:
  742. if node_type in executor.supported_node_types:
  743. return executor
  744. return self.default_executor
  745. def execute(
  746. self,
  747. context: NodeExecutionContextContract,
  748. request: NodeExecutionRequestContract,
  749. ) -> tuple[NodeExecutionResultContract, str]:
  750. executor = self.resolve_executor(context.node_type)
  751. result = executor.execute(context, request)
  752. return result, executor.executor_name
  753. def build_node_execution_dispatcher() -> NodeExecutionDispatcher:
  754. executors: list[NodeExecutor] = [
  755. LLMNodeExecutor(),
  756. ToolNodeExecutor(),
  757. CodeNodeExecutor(),
  758. HumanNodeExecutor(),
  759. AnswerNodeExecutor(),
  760. ConditionNodeExecutor(),
  761. AssignerNodeExecutor(),
  762. RetrieverNodeExecutor(),
  763. TemplateNodeExecutor(),
  764. ]
  765. return NodeExecutionDispatcher(
  766. executors=executors,
  767. default_executor=DefaultNodeExecutor(),
  768. )
  769. def build_node_execution_dispatcher_with_clients(
  770. *,
  771. code_runner_client: CodeRunnerClient | None = None,
  772. model_gateway_client: ModelGatewayClient | None = None,
  773. tool_client: ToolServiceClient | None = None,
  774. knowledge_client: KnowledgeServiceClient | None = None,
  775. human_client: HumanServiceClient | None = None,
  776. ) -> NodeExecutionDispatcher:
  777. executors: list[NodeExecutor] = [
  778. LLMNodeExecutor(model_gateway_client=model_gateway_client),
  779. ToolNodeExecutor(tool_client=tool_client),
  780. CodeNodeExecutor(code_runner_client=code_runner_client),
  781. HumanNodeExecutor(human_client=human_client),
  782. AnswerNodeExecutor(),
  783. ConditionNodeExecutor(),
  784. AssignerNodeExecutor(),
  785. RetrieverNodeExecutor(knowledge_client=knowledge_client),
  786. TemplateNodeExecutor(),
  787. ]
  788. return NodeExecutionDispatcher(
  789. executors=executors,
  790. default_executor=DefaultNodeExecutor(),
  791. )
  792. def _read_string_value(payload: dict[str, JSONValue], key: str) -> str | None:
  793. value = payload.get(key)
  794. if isinstance(value, str):
  795. return value
  796. return None
  797. def _read_dict_value(payload: dict[str, JSONValue], key: str) -> dict[str, JSONValue]:
  798. value = payload.get(key)
  799. if isinstance(value, dict):
  800. return {str(item_key): item_value for item_key, item_value in value.items()}
  801. return {}
  802. def _merge_json_dicts(*items: dict[str, JSONValue]) -> dict[str, JSONValue]:
  803. merged: dict[str, JSONValue] = {}
  804. for item in items:
  805. merged.update(item)
  806. return merged
  807. def _render_json_dict(
  808. payload: dict[str, JSONValue],
  809. context: dict[str, JSONValue],
  810. ) -> dict[str, JSONValue]:
  811. rendered = render_json_value(payload, context)
  812. if isinstance(rendered, dict):
  813. return {str(key): value for key, value in rendered.items()}
  814. return {}
  815. def _render_config_json(
  816. payload: dict[str, JSONValue],
  817. context: dict[str, JSONValue],
  818. ) -> dict[str, JSONValue]:
  819. return _render_json_dict(payload, context)
  820. def _resolve_http_url(*, url: str | None, base_url: str | None, path: str | None) -> str | None:
  821. if url is not None:
  822. return url
  823. if base_url is None or path is None:
  824. return None
  825. return f"{base_url.rstrip('/')}/{path.lstrip('/')}"
  826. def _coerce_http_headers(payload: dict[str, JSONValue]) -> dict[str, str]:
  827. headers: dict[str, str] = {}
  828. for key, value in payload.items():
  829. if isinstance(value, (str, int, float, bool)):
  830. headers[key] = str(value)
  831. return headers
  832. def _coerce_http_params(payload: dict[str, JSONValue]) -> dict[str, str]:
  833. params: dict[str, str] = {}
  834. for key, value in payload.items():
  835. if isinstance(value, (str, int, float, bool)):
  836. params[key] = str(value)
  837. return params
  838. def _try_parse_json_response(response: httpx.Response) -> JSONValue | None:
  839. content_type = response.headers.get("content-type", "")
  840. if "json" not in content_type.lower():
  841. return None
  842. try:
  843. payload = response.json()
  844. except ValueError:
  845. return None
  846. if isinstance(payload, (dict, list, str, int, float, bool)) or payload is None:
  847. return payload
  848. return None
  849. def _build_chat_completion_request(
  850. payload: dict[str, JSONValue],
  851. ) -> ChatCompletionRequestContract | None:
  852. messages = _read_message_list(payload, "messages")
  853. if not messages:
  854. system_prompt = _read_string_value(payload, "system_prompt")
  855. prompt = _read_string_value(payload, "prompt")
  856. if system_prompt is not None:
  857. messages.append(ChatMessageContract(role="system", content=system_prompt))
  858. if prompt is not None:
  859. messages.append(ChatMessageContract(role="user", content=prompt))
  860. if not messages:
  861. return None
  862. temperature = _read_float_value(payload, "temperature")
  863. max_tokens = _read_int_value(payload, "max_tokens")
  864. model = _read_string_value(payload, "model")
  865. return ChatCompletionRequestContract(
  866. model=model,
  867. messages=messages,
  868. temperature=temperature,
  869. max_tokens=max_tokens,
  870. )
  871. def _read_message_list(
  872. payload: dict[str, JSONValue],
  873. key: str,
  874. ) -> list[ChatMessageContract]:
  875. value = payload.get(key)
  876. if not isinstance(value, list):
  877. return []
  878. messages: list[ChatMessageContract] = []
  879. for item in value:
  880. if not isinstance(item, dict):
  881. continue
  882. role = item.get("role")
  883. content = item.get("content")
  884. name = item.get("name")
  885. if isinstance(role, str) and isinstance(content, str):
  886. messages.append(
  887. ChatMessageContract(
  888. role=role,
  889. content=content,
  890. name=name if isinstance(name, str) else None,
  891. )
  892. )
  893. return messages
  894. def _read_float_value(payload: dict[str, JSONValue], key: str) -> float | None:
  895. value = payload.get(key)
  896. if isinstance(value, (int, float)) and not isinstance(value, bool):
  897. return float(value)
  898. return None
  899. def _read_int_value(payload: dict[str, JSONValue], key: str) -> int | None:
  900. value = payload.get(key)
  901. if isinstance(value, int) and not isinstance(value, bool):
  902. return value
  903. return None
  904. def _resolve_existing_human_task_id(context: NodeExecutionContextContract) -> str | None:
  905. configured_task_id = _read_string_value(context.node_config_json, "human_task_id")
  906. if configured_task_id is not None:
  907. return configured_task_id
  908. current_node_output = context.node_output_json_by_node_id.get(context.node_id)
  909. if current_node_output is None:
  910. return None
  911. return _read_string_value(current_node_output, "human_task_id")
  912. def _resolve_human_task_type(
  913. node_type: str,
  914. payload: dict[str, JSONValue],
  915. ) -> HumanTaskType:
  916. configured_task_type = _read_string_value(payload, "task_type")
  917. if configured_task_type in {"approval", "input", "takeover", "pause", "resume"}:
  918. return cast(HumanTaskType, configured_task_type)
  919. if node_type == "approval":
  920. return "approval"
  921. if node_type == "human-input":
  922. return "input"
  923. if node_type == "human-takeover":
  924. return "takeover"
  925. return "input"
  926. def _resolve_due_time(payload: dict[str, JSONValue]) -> datetime | None:
  927. due_time = _read_datetime_value(payload, "due_time")
  928. if due_time is not None:
  929. return due_time
  930. due_after_seconds = _read_int_value(payload, "due_after_seconds")
  931. if due_after_seconds is None or due_after_seconds <= 0:
  932. return None
  933. return datetime.utcnow() + timedelta(seconds=due_after_seconds)
  934. def _read_datetime_value(payload: dict[str, JSONValue], key: str) -> datetime | None:
  935. value = payload.get(key)
  936. if isinstance(value, str) and value.strip():
  937. normalized_value = value.strip().replace("Z", "+00:00")
  938. try:
  939. return datetime.fromisoformat(normalized_value)
  940. except ValueError:
  941. return None
  942. return None
  943. def _build_executor_template_context(context: NodeExecutionContextContract) -> dict[str, JSONValue]:
  944. return build_template_context(
  945. node_id=context.node_id,
  946. node_type=context.node_type,
  947. run_state_json=context.run_state_json,
  948. node_output_json_by_node_id=context.node_output_json_by_node_id,
  949. node_output_text_by_node_id=context.node_output_text_by_node_id,
  950. )
  951. def _evaluate_path_condition(
  952. payload: dict[str, JSONValue],
  953. path: str,
  954. render_context: dict[str, JSONValue],
  955. ) -> bool:
  956. value = resolve_expression(render_context, path)
  957. if "equals" in payload:
  958. return value == render_json_value(payload["equals"], render_context)
  959. if "not_equals" in payload:
  960. return value != render_json_value(payload["not_equals"], render_context)
  961. if "gt" in payload:
  962. return _compare_numeric(value, render_json_value(payload["gt"], render_context), ">")
  963. if "gte" in payload:
  964. return _compare_numeric(value, render_json_value(payload["gte"], render_context), ">=")
  965. if "lt" in payload:
  966. return _compare_numeric(value, render_json_value(payload["lt"], render_context), "<")
  967. if "lte" in payload:
  968. return _compare_numeric(value, render_json_value(payload["lte"], render_context), "<=")
  969. if "exists" in payload:
  970. expected = payload["exists"]
  971. if isinstance(expected, bool):
  972. return (value is not None) is expected
  973. return coerce_bool(value)
  974. def _compare_numeric(left: JSONValue, right: JSONValue, operator: str) -> bool:
  975. if not isinstance(left, (int, float)) or not isinstance(right, (int, float)):
  976. return False
  977. if operator == ">":
  978. return left > right
  979. if operator == ">=":
  980. return left >= right
  981. if operator == "<":
  982. return left < right
  983. if operator == "<=":
  984. return left <= right
  985. return False
  986. @dataclass(frozen=True)
  987. class RetrieverDocument:
  988. document_id: str
  989. title: str | None
  990. text: str
  991. metadata: dict[str, JSONValue]
  992. @dataclass(frozen=True)
  993. class RankedRetrieverDocument:
  994. document_id: str
  995. title: str | None
  996. text: str
  997. metadata: dict[str, JSONValue]
  998. score: float
  999. def to_output_json(self) -> dict[str, JSONValue]:
  1000. return {
  1001. "document_id": self.document_id,
  1002. "title": self.title,
  1003. "text": self.text,
  1004. "metadata": self.metadata,
  1005. "score": self.score,
  1006. }
  1007. def _resolve_retriever_query(
  1008. payload: dict[str, JSONValue],
  1009. render_context: dict[str, JSONValue],
  1010. ) -> str | None:
  1011. query = _read_string_value(payload, "query")
  1012. query_template = _read_string_value(payload, "query_template")
  1013. if query is not None:
  1014. rendered_query = render_template_string(query, render_context)
  1015. elif query_template is not None:
  1016. rendered_query = render_template_string(query_template, render_context)
  1017. else:
  1018. return None
  1019. stripped_query = rendered_query.strip()
  1020. if not stripped_query:
  1021. return None
  1022. return stripped_query
  1023. def _read_retriever_documents(
  1024. payload: dict[str, JSONValue],
  1025. render_context: dict[str, JSONValue],
  1026. ) -> list[RetrieverDocument]:
  1027. value = payload.get("documents")
  1028. if not isinstance(value, list):
  1029. return []
  1030. documents: list[RetrieverDocument] = []
  1031. for index, item in enumerate(value):
  1032. document = _parse_retriever_document(
  1033. item,
  1034. index=index,
  1035. render_context=render_context,
  1036. )
  1037. if document is not None:
  1038. documents.append(document)
  1039. return documents
  1040. def _fetch_retriever_documents_from_url(
  1041. *,
  1042. source_url: str,
  1043. timeout_ms: int,
  1044. render_context: dict[str, JSONValue],
  1045. ) -> list[RetrieverDocument]:
  1046. if not source_url.strip():
  1047. return []
  1048. with httpx.Client(timeout=timeout_ms / 1000) as client:
  1049. response = client.get(source_url)
  1050. response.raise_for_status()
  1051. payload = response.json()
  1052. if isinstance(payload, dict):
  1053. documents_payload = payload.get("documents")
  1054. else:
  1055. documents_payload = payload
  1056. if not isinstance(documents_payload, list):
  1057. raise ValueError("retriever source must return a JSON list or object.documents list")
  1058. documents: list[RetrieverDocument] = []
  1059. for index, item in enumerate(documents_payload):
  1060. if not _is_json_value(item):
  1061. continue
  1062. document = _parse_retriever_document(
  1063. item,
  1064. index=index,
  1065. render_context=render_context,
  1066. )
  1067. if document is not None:
  1068. documents.append(document)
  1069. return documents
  1070. def _parse_retriever_document(
  1071. value: JSONValue,
  1072. *,
  1073. index: int,
  1074. render_context: dict[str, JSONValue],
  1075. ) -> RetrieverDocument | None:
  1076. if isinstance(value, str):
  1077. text = render_template_string(value, render_context).strip()
  1078. if not text:
  1079. return None
  1080. return RetrieverDocument(
  1081. document_id=f"doc-{index + 1}",
  1082. title=None,
  1083. text=text,
  1084. metadata={},
  1085. )
  1086. if not isinstance(value, dict):
  1087. return None
  1088. rendered = _render_json_dict({str(key): item for key, item in value.items()}, render_context)
  1089. text_value = rendered.get("text") or rendered.get("content")
  1090. if not isinstance(text_value, str) or not text_value.strip():
  1091. return None
  1092. document_id_value = rendered.get("id") or rendered.get("document_id")
  1093. title_value = rendered.get("title")
  1094. metadata_value = rendered.get("metadata")
  1095. return RetrieverDocument(
  1096. document_id=str(document_id_value) if document_id_value is not None else f"doc-{index + 1}",
  1097. title=title_value if isinstance(title_value, str) else None,
  1098. text=text_value.strip(),
  1099. metadata=metadata_value if isinstance(metadata_value, dict) else {},
  1100. )
  1101. def _knowledge_results_to_retriever_documents(
  1102. results: list[dict[str, JSONValue]],
  1103. ) -> list[RetrieverDocument]:
  1104. documents: list[RetrieverDocument] = []
  1105. for index, result in enumerate(results):
  1106. chunk = result.get("chunk")
  1107. document = result.get("document")
  1108. score = result.get("score")
  1109. score_json = result.get("score_json")
  1110. if not isinstance(chunk, dict) or not isinstance(document, dict):
  1111. continue
  1112. content_text = chunk.get("content_text")
  1113. if not isinstance(content_text, str) or not content_text.strip():
  1114. continue
  1115. document_id = document.get("id")
  1116. title = document.get("title")
  1117. metadata: dict[str, JSONValue] = {
  1118. "source": "knowledge-service",
  1119. "chunk_id": str(chunk.get("id")) if chunk.get("id") is not None else None,
  1120. "chunk_index": (
  1121. chunk.get("chunk_index")
  1122. if isinstance(chunk.get("chunk_index"), int)
  1123. else index
  1124. ),
  1125. "score": score if isinstance(score, (int, float)) else None,
  1126. "score_json": score_json if isinstance(score_json, dict) else {},
  1127. }
  1128. documents.append(
  1129. RetrieverDocument(
  1130. document_id=str(document_id)
  1131. if document_id is not None
  1132. else f"knowledge-{index + 1}",
  1133. title=title if isinstance(title, str) else None,
  1134. text=content_text.strip(),
  1135. metadata=metadata,
  1136. )
  1137. )
  1138. return documents
  1139. def rank_documents(
  1140. *,
  1141. query: str,
  1142. documents: list[RetrieverDocument],
  1143. top_k: int,
  1144. ) -> list[RankedRetrieverDocument]:
  1145. normalized_top_k = max(top_k, 1)
  1146. query_tokens = tokenize_text(query)
  1147. ranked_documents: list[RankedRetrieverDocument] = []
  1148. for document in documents:
  1149. document_tokens = tokenize_text(" ".join(filter(None, [document.title, document.text])))
  1150. score = calculate_keyword_score(query_tokens=query_tokens, document_tokens=document_tokens)
  1151. ranked_documents.append(
  1152. RankedRetrieverDocument(
  1153. document_id=document.document_id,
  1154. title=document.title,
  1155. text=document.text,
  1156. metadata=document.metadata,
  1157. score=score,
  1158. )
  1159. )
  1160. ranked_documents.sort(key=lambda item: item.score, reverse=True)
  1161. return ranked_documents[:normalized_top_k]
  1162. def calculate_keyword_score(
  1163. *,
  1164. query_tokens: set[str],
  1165. document_tokens: set[str],
  1166. ) -> float:
  1167. if not query_tokens or not document_tokens:
  1168. return 0.0
  1169. overlap_count = len(query_tokens.intersection(document_tokens))
  1170. if overlap_count == 0:
  1171. return 0.0
  1172. return round(overlap_count / len(query_tokens), 4)
  1173. def tokenize_text(value: str) -> set[str]:
  1174. tokens = {item.lower() for item in re.findall(r"[\w\u4e00-\u9fff]+", value)}
  1175. return {item for item in tokens if item}
  1176. def _is_json_value(value: object) -> bool:
  1177. if value is None or isinstance(value, (str, int, float, bool)):
  1178. return True
  1179. if isinstance(value, list):
  1180. return all(_is_json_value(item) for item in value)
  1181. if isinstance(value, dict):
  1182. return all(isinstance(key, str) and _is_json_value(item) for key, item in value.items())
  1183. return False