| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343 |
- from abc import ABC, abstractmethod
- from dataclasses import dataclass
- from datetime import datetime, timedelta
- import re
- from typing import cast
- import httpx
- from core_domain import (
- ChatCompletionRequestContract,
- ChatMessageContract,
- CodeExecutionRequestContract,
- HumanTaskCreateContract,
- HumanTaskType,
- KnowledgeSearchRequestContract,
- NodeExecutionContextContract,
- NodeExecutionRequestContract,
- NodeExecutionResultContract,
- ToolBindingDetailContract,
- )
- from core_shared import JSONValue
- from .code_runner_client import CodeRunnerClient, CodeRunnerClientError
- from .context import (
- build_template_context,
- coerce_bool,
- evaluate_condition_expression,
- render_json_value,
- render_template_string,
- resolve_expression,
- )
- from .human_client import HumanServiceClient, HumanServiceClientError
- from .knowledge_client import KnowledgeServiceClient, KnowledgeServiceClientError
- from .model_gateway_client import ModelGatewayClient, ModelGatewayClientError
- from .tool_client import ToolServiceClient, ToolServiceClientError
class NodeExecutor(ABC):
    """Abstract interface for an object that can run one workflow node.

    Implementations expose an ``executor_name`` that is reported alongside
    their results, and a ``supported_node_types`` set that
    ``NodeExecutionDispatcher`` checks by membership when routing a node.
    """

    # Identifier reported together with every result this executor produces.
    executor_name: str
    # Node type strings this executor claims; routing tests membership here.
    supported_node_types: frozenset[str]

    @abstractmethod
    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Run the node described by ``context`` and return its outcome.

        Subclasses must override this; the base implementation only raises.
        """
        raise NotImplementedError()
class CompletedNodeExecutor(NodeExecutor):
    """Executor that reports a node as completed without doing any work.

    Its result simply echoes the executor name and the node type. Most
    executors in this module subclass it for the name/type bookkeeping and
    override ``execute``; ``DefaultNodeExecutor`` uses it unchanged.
    """

    def __init__(self, *, executor_name: str, supported_node_types: frozenset[str]) -> None:
        self.executor_name, self.supported_node_types = executor_name, supported_node_types

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Return a ``completed`` result that identifies this executor and node type."""
        node_type = context.node_type
        # Requests without a worker key get a synthetic "<executor>:<node_type>" key.
        resolved_worker_key = request.worker_key or f"{self.executor_name}:{node_type}"
        summary: dict[str, JSONValue] = {
            "executor_name": self.executor_name,
            "node_type": node_type,
        }
        return NodeExecutionResultContract(
            status="completed",
            worker_key=resolved_worker_key,
            output_json=summary,
        )
class DefaultNodeExecutor(CompletedNodeExecutor):
    """Fallback executor that always reports completion.

    It claims no node types, so the dispatcher's membership test never
    selects it directly. The dispatcher builders in this module pass it as
    ``default_executor`` for node types no other executor claims.
    """

    def __init__(self) -> None:
        no_claimed_types: frozenset[str] = frozenset()
        super().__init__(executor_name="default-executor", supported_node_types=no_claimed_types)
class LLMNodeExecutor(CompletedNodeExecutor):
    """Executor for ``llm`` nodes that requests a chat completion from the model gateway."""

    def __init__(self, model_gateway_client: ModelGatewayClient | None = None) -> None:
        super().__init__(
            executor_name="llm-executor",
            supported_node_types=frozenset({"llm"}),
        )
        # Optional so the executor can be built unwired; execute() then fails cleanly.
        self.model_gateway_client = model_gateway_client

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Render the node config, call the gateway, and wrap its reply.

        Failure codes:
            ``llm_config_missing``: no prompt/messages could be built from the config.
            ``llm_gateway_missing``: no model gateway client is configured.
            ``llm_request_failed``: the client raised ``ModelGatewayClientError``.
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"

        def failure(code: str, message: str) -> NodeExecutionResultContract:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code=code,
                error_message=message,
            )

        # The whole node config goes through template rendering before it is read.
        rendered_config = _render_config_json(
            context.node_config_json,
            _build_executor_template_context(context),
        )
        chat_request = _build_chat_completion_request(rendered_config)
        if chat_request is None:
            return failure("llm_config_missing", "llm node config requires prompt or messages")

        gateway = self.model_gateway_client
        if gateway is None:
            return failure("llm_gateway_missing", "model gateway client is not configured")

        try:
            completion = gateway.create_chat_completion(chat_request)
        except ModelGatewayClientError as exc:
            return failure("llm_request_failed", str(exc))

        completion_details: dict[str, JSONValue] = {
            "executor_name": self.executor_name,
            "model": completion.model,
            "finish_reason": completion.finish_reason,
            "usage_json": completion.usage_json,
            "raw_response_json": completion.raw_response_json,
        }
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=completion.content,
            output_json=completion_details,
        )
class ToolNodeExecutor(CompletedNodeExecutor):
    """Executor for ``tool`` nodes.

    A node identifies its tool by ``tool_binding_id`` or ``tool_code``. A
    binding is looked up only when a binding id is given and a tool-service
    client is configured. Disabled bindings fail, and bindings whose tool
    type is ``http`` are invoked over HTTP. In every other case the node
    completes with a summary of the tool it referenced.
    """

    def __init__(self, tool_client: ToolServiceClient | None = None) -> None:
        super().__init__(
            executor_name="tool-executor",
            supported_node_types=frozenset({"tool"}),
        )
        self.tool_client = tool_client

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Resolve the configured tool and, for HTTP tools, invoke it.

        Failure codes raised here: ``tool_config_missing``,
        ``tool_binding_lookup_failed`` and ``tool_binding_disabled``. HTTP
        invocation failures come from ``_invoke_http_tool``.
        """
        node_config = context.node_config_json
        binding_id = _read_string_value(node_config, "tool_binding_id")
        configured_code = _read_string_value(node_config, "tool_code")
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        if binding_id is None and configured_code is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="tool_config_missing",
                error_message="tool node config requires tool_binding_id or tool_code",
            )

        client = self.tool_client
        if binding_id is None or client is None:
            # No lookup is possible, so echo the configured tool_code (may be None).
            return self._summary_result(
                worker_key=worker_key,
                tool_binding_id=binding_id,
                tool_code=configured_code,
                tool_version_id=None,
                tool_name=None,
            )

        try:
            detail = client.get_tool_binding_detail(
                tenant_id=context.tenant_id,
                binding_id=binding_id,
            )
        except ToolServiceClientError as exc:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="tool_binding_lookup_failed",
                error_message=str(exc),
            )
        if not detail.binding.enabled:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="tool_binding_disabled",
                error_message=f"tool binding is disabled: {binding_id}",
            )

        http_result = self._invoke_http_tool(
            context=context,
            detail=detail,
            worker_key=worker_key,
        )
        if http_result is not None:
            return http_result
        # Non-HTTP tool types are not invoked here; report the resolved identity only.
        return self._summary_result(
            worker_key=worker_key,
            tool_binding_id=binding_id,
            tool_code=detail.tool_definition.code,
            tool_version_id=detail.tool_version.id,
            tool_name=detail.tool_definition.name,
        )

    def _summary_result(
        self,
        *,
        worker_key: str,
        tool_binding_id: str | None,
        tool_code: JSONValue,
        tool_version_id: JSONValue,
        tool_name: JSONValue,
    ) -> NodeExecutionResultContract:
        """Build the ``completed`` result used when no HTTP call was made."""
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=f"tool node completed: {tool_code or 'unknown-tool'}",
            output_json={
                "executor_name": self.executor_name,
                "tool_binding_id": tool_binding_id,
                "tool_code": tool_code,
                "tool_version_id": tool_version_id,
                "tool_name": tool_name,
            },
        )

    def _invoke_http_tool(
        self,
        *,
        context: NodeExecutionContextContract,
        detail: ToolBindingDetailContract,
        worker_key: str,
    ) -> NodeExecutionResultContract | None:
        """Call an ``http`` tool and wrap the response; return ``None`` for other tool types.

        Later sources override earlier ones:
            headers: tool-version invoke config, then binding config, then node config.
            query/body: invoke config, then node config.

        URL resolution: an explicit ``url`` (node config, then invoke config)
        wins. Otherwise ``base_url`` (node, binding, invoke) is joined with
        ``path`` (node, invoke). The method comes from the invoke config
        (default ``GET``). The timeout comes from ``tool_version.timeout_ms``
        (default 10000 ms). The body is sent as JSON only when the merged body
        is non-empty.
        """
        if detail.tool_definition.tool_type != "http":
            return None

        node_config = context.node_config_json
        invoke_config = detail.tool_version.invoke_config_json or {}
        binding_config = detail.binding.config_json or {}
        render_context = _build_executor_template_context(context)

        def rendered_section(source: dict[str, JSONValue], key: str) -> dict[str, JSONValue]:
            return _render_json_dict(_read_dict_value(source, key), render_context)

        request_headers = _merge_json_dicts(
            rendered_section(invoke_config, "headers"),
            rendered_section(binding_config, "headers"),
            rendered_section(node_config, "headers"),
        )
        request_query = _merge_json_dicts(
            rendered_section(invoke_config, "query"),
            rendered_section(node_config, "query"),
        )
        request_body = _merge_json_dicts(
            rendered_section(invoke_config, "body"),
            rendered_section(node_config, "body"),
        )

        method = (_read_string_value(invoke_config, "method") or "GET").upper()
        base_url = (
            _read_string_value(node_config, "base_url")
            or _read_string_value(binding_config, "base_url")
            or _read_string_value(invoke_config, "base_url")
        )
        path = _read_string_value(node_config, "path") or _read_string_value(invoke_config, "path")
        explicit_url = _read_string_value(node_config, "url") or _read_string_value(
            invoke_config, "url"
        )
        target_url = _resolve_http_url(url=explicit_url, base_url=base_url, path=path)
        if target_url is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="tool_http_url_missing",
                error_message="http tool requires url or base_url with path",
            )

        timeout_seconds = (detail.tool_version.timeout_ms or 10000) / 1000
        try:
            with httpx.Client(timeout=timeout_seconds) as http_client:
                response = http_client.request(
                    method=method,
                    url=target_url,
                    params=_coerce_http_params(request_query),
                    headers=_coerce_http_headers(request_headers),
                    json=request_body or None,
                )
                # Non-2xx responses raise httpx.HTTPStatusError, a subclass of HTTPError.
                response.raise_for_status()
        except httpx.HTTPError as exc:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="tool_http_request_failed",
                error_message=str(exc),
                output_json={
                    "executor_name": self.executor_name,
                    "tool_binding_id": detail.binding.id,
                    "tool_code": detail.tool_definition.code,
                    "request_url": target_url,
                    "request_method": method,
                },
            )

        parsed_json = _try_parse_json_response(response)
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            # Raw text is surfaced only when the body was not decoded as JSON.
            output_text=response.text if parsed_json is None else None,
            output_json={
                "executor_name": self.executor_name,
                "tool_binding_id": detail.binding.id,
                "tool_code": detail.tool_definition.code,
                "tool_version_id": detail.tool_version.id,
                "tool_name": detail.tool_definition.name,
                "request_url": target_url,
                "request_method": method,
                "response_status_code": response.status_code,
                "response_headers": dict(response.headers),
                "response_json": parsed_json,
            },
        )
class CodeNodeExecutor(CompletedNodeExecutor):
    """Executor for ``code`` nodes that delegates execution to the code-runner service."""

    def __init__(self, code_runner_client: CodeRunnerClient | None = None) -> None:
        super().__init__(
            executor_name="code-executor",
            supported_node_types=frozenset({"code"}),
        )
        self.code_runner_client = code_runner_client

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Run the configured snippet through the code runner.

        Node config keys:
            ``code`` (required),
            ``input_json`` (template-rendered),
            ``language`` (default ``"python"``),
            ``timeout_seconds`` (default 10; a falsy value also falls back to 10).

        Failure codes: ``code_config_missing``, ``code_runner_missing``,
        ``code_request_failed`` (client error) and ``code_execution_failed``
        (the runner reported ``success`` false).
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        node_config = context.node_config_json
        source_code = _read_string_value(node_config, "code")
        if source_code is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="code_config_missing",
                error_message="code node config requires code",
            )
        runner = self.code_runner_client
        if runner is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="code_runner_missing",
                error_message="code runner client is not configured",
            )

        rendered_input = _render_json_dict(
            _read_dict_value(node_config, "input_json"),
            _build_executor_template_context(context),
        )
        code_request = CodeExecutionRequestContract(
            language=_read_string_value(node_config, "language") or "python",
            code=source_code,
            input_json=rendered_input,
            timeout_seconds=_read_int_value(node_config, "timeout_seconds") or 10,
        )
        try:
            outcome = runner.execute_code(code_request)
        except CodeRunnerClientError as exc:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="code_request_failed",
                error_message=str(exc),
            )

        if outcome.success:
            return NodeExecutionResultContract(
                status="completed",
                worker_key=worker_key,
                output_text=outcome.stdout,
                output_json={
                    "executor_name": self.executor_name,
                    "stderr": outcome.stderr,
                    "result_json": outcome.output_json,
                },
            )
        return NodeExecutionResultContract(
            status="failed",
            worker_key=worker_key,
            error_code="code_execution_failed",
            error_message=outcome.error_message or outcome.stderr,
            output_text=outcome.stdout,
            output_json={
                "executor_name": self.executor_name,
                "stderr": outcome.stderr,
                # NOTE(review): failure uses "output_json" while success uses "result_json".
                "output_json": outcome.output_json,
            },
        )
class HumanNodeExecutor(CompletedNodeExecutor):
    """Executor for human-in-the-loop nodes, backed by the human-task service.

    Handles the ``human``, ``approval``, ``human-input`` and
    ``human-takeover`` node types. If no existing task id can be resolved
    from the execution context, a new task is created and the node is
    reported ``pending``. Otherwise the existing task is fetched and its
    status is mapped onto the node result.
    """

    # Task statuses that keep the node waiting.
    _WAITING_TASK_STATUSES: frozenset[str] = frozenset({"pending", "claimed"})
    # Task statuses that let the node complete successfully.
    _FINISHED_TASK_STATUSES: frozenset[str] = frozenset({"approved", "completed"})

    def __init__(self, human_client: HumanServiceClient | None = None) -> None:
        super().__init__(
            executor_name="human-executor",
            supported_node_types=frozenset({"human", "approval", "human-input", "human-takeover"}),
        )
        self.human_client = human_client

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Create or poll the human task behind this node.

        Existing tasks map as follows:
            ``pending``/``claimed`` → node ``pending``.
            ``approved``/``completed`` → ``completed``.
            Any other status → ``failed`` with ``error_code`` ``human_task_<status>``.
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        client = self.human_client
        if client is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="human_service_missing",
                error_message="human service client is not configured",
            )

        existing_task_id = _resolve_existing_human_task_id(context)
        if existing_task_id is None:
            return self._create_waiting_task(context=context, worker_key=worker_key)

        try:
            task = client.get_task(
                tenant_id=context.tenant_id,
                human_task_id=existing_task_id,
            )
        except HumanServiceClientError as exc:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="human_task_lookup_failed",
                error_message=str(exc),
            )

        task_status = task.status
        task_summary: dict[str, JSONValue] = {
            "executor_name": self.executor_name,
            "human_task_id": task.id,
            "human_task_status": task_status,
            "response_payload_json": task.response_payload_json or {},
        }
        if task_status in self._WAITING_TASK_STATUSES:
            return NodeExecutionResultContract(
                status="pending",
                worker_key=worker_key,
                output_text=f"waiting for human task: {task.id}",
                output_json=task_summary,
            )
        if task_status in self._FINISHED_TASK_STATUSES:
            return NodeExecutionResultContract(
                status="completed",
                worker_key=worker_key,
                output_json=task_summary,
            )
        return NodeExecutionResultContract(
            status="failed",
            worker_key=worker_key,
            error_code=f"human_task_{task_status}",
            error_message=f"human task ended with status: {task_status}",
            output_json=task_summary,
        )

    def _create_waiting_task(
        self,
        *,
        context: NodeExecutionContextContract,
        worker_key: str,
    ) -> NodeExecutionResultContract:
        """Create a human task for this node run and report the node as ``pending``.

        ``title`` (default ``"Human task for <node_id>"``), ``description`` and
        ``request_payload_json`` from the node config are template-rendered.
        ``requested_by`` and ``assigned_to`` are passed through as read.
        """
        node_config = context.node_config_json
        render_context = _build_executor_template_context(context)
        task_type = _resolve_human_task_type(context.node_type, node_config)
        title_template = (
            _read_string_value(node_config, "title") or f"Human task for {context.node_id}"
        )
        title = render_template_string(title_template, render_context)
        description_template = _read_string_value(node_config, "description")
        description = None
        if description_template is not None:
            description = render_template_string(description_template, render_context)
        request_payload_json = _render_json_dict(
            _read_dict_value(node_config, "request_payload_json"),
            render_context,
        )
        create_request = HumanTaskCreateContract(
            tenant_id=context.tenant_id,
            task_type=task_type,
            title=title,
            description=description,
            source_type="runtime-node",
            source_id=context.node_run_id,
            run_id=context.run_id,
            node_run_id=context.node_run_id,
            requested_by=_read_string_value(node_config, "requested_by"),
            assigned_to=_read_string_value(node_config, "assigned_to"),
            request_payload_json=request_payload_json,
            due_time=_resolve_due_time(node_config),
        )
        try:
            # execute() has already checked human_client for None before calling this.
            task = self.human_client.create_task(create_request)
        except HumanServiceClientError as exc:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="human_task_create_failed",
                error_message=str(exc),
            )
        return NodeExecutionResultContract(
            status="pending",
            worker_key=worker_key,
            output_text=f"waiting for human task: {task.id}",
            output_json={
                "executor_name": self.executor_name,
                "human_task_id": task.id,
                "human_task_status": task.status,
                "task_type": task.task_type,
            },
        )
class AnswerNodeExecutor(CompletedNodeExecutor):
    """Executor for ``answer`` nodes that renders ``text`` (preferred) or ``template``."""

    def __init__(self) -> None:
        super().__init__(
            executor_name="answer-executor",
            supported_node_types=frozenset({"answer"}),
        )

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Render the answer string against the node's template context.

        Fails with ``answer_config_missing`` when neither key is a string.
        """
        node_config = context.node_config_json
        answer_text = _read_string_value(node_config, "text")
        template = _read_string_value(node_config, "template")
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        if answer_text is None and template is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="answer_config_missing",
                error_message="answer node config requires text or template",
            )

        # `or` chaining: an empty ``text`` falls through to ``template`` when rendering,
        # while ``render_mode`` still reports "text" because ``text`` was present.
        source = answer_text or template or ""
        render_mode = "template" if answer_text is None else "text"
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=render_template_string(source, _build_executor_template_context(context)),
            output_json={
                "executor_name": self.executor_name,
                "render_mode": render_mode,
            },
        )
class ConditionNodeExecutor(CompletedNodeExecutor):
    """Executor for ``if-else`` / ``condition`` nodes.

    Evaluates ``expression`` if present, otherwise a ``path`` condition. The
    result is reported as ``route`` ``"true"``/``"false"``.
    """

    def __init__(self) -> None:
        super().__init__(
            executor_name="condition-executor",
            supported_node_types=frozenset({"if-else", "condition"}),
        )

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Evaluate the configured condition.

        Fails with ``condition_config_missing`` when neither key is a string.
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        render_context = _build_executor_template_context(context)
        node_config = context.node_config_json
        expression = _read_string_value(node_config, "expression")
        path = _read_string_value(node_config, "path")

        if expression is not None:
            evaluated_expression = expression
            condition_result = evaluate_condition_expression(expression, render_context)
        elif path is not None:
            evaluated_expression = path
            condition_result = _evaluate_path_condition(node_config, path, render_context)
        else:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="condition_config_missing",
                error_message="condition node config requires expression or path",
            )

        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_json={
                "executor_name": self.executor_name,
                "condition_result": condition_result,
                # Branch label derived from the truthiness of the result.
                "route": "true" if condition_result else "false",
                "evaluated_expression": evaluated_expression,
            },
        )
class AssignerNodeExecutor(CompletedNodeExecutor):
    """Executor for ``assigner`` nodes that renders the configured ``assignments`` dict."""

    def __init__(self) -> None:
        super().__init__(
            executor_name="assigner-executor",
            supported_node_types=frozenset({"assigner"}),
        )

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Render ``assignments`` and report the result as both assigned values and state updates.

        Fails with ``assignments_missing`` when the dict is absent or empty.
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        assignments = _read_dict_value(context.node_config_json, "assignments")
        if not assignments:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="assignments_missing",
                error_message="assigner node config requires assignments",
            )

        rendered = _render_json_dict(assignments, _build_executor_template_context(context))
        # The same rendered dict object is exposed under both keys.
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_json={
                "executor_name": self.executor_name,
                "assigned_values": rendered,
                "state_updates": rendered,
            },
        )
class RetrieverNodeExecutor(CompletedNodeExecutor):
    """Executor for ``knowledge-retrieval`` / ``retriever`` nodes.

    Candidate documents are gathered from up to three sources, in order:
        1. Inline documents read by ``_read_retriever_documents``.
        2. An optional ``source_url`` fetched over HTTP.
        3. An optional ``knowledge_base_id`` searched through the knowledge service.

    The candidates are then ranked against the query with ``rank_documents``
    using ``top_k`` (default 3).
    """

    def __init__(self, knowledge_client: KnowledgeServiceClient | None = None) -> None:
        super().__init__(
            executor_name="retriever-executor",
            supported_node_types=frozenset({"knowledge-retrieval", "retriever"}),
        )
        self.knowledge_client = knowledge_client

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Gather, rank and return documents for the node's query.

        Failure codes:
            ``retriever_query_missing``: no query could be resolved.
            ``retriever_source_request_failed``: ``httpx.HTTPError`` while fetching ``source_url``.
            ``retriever_source_invalid``: ``ValueError`` from the URL fetch helper.
            ``knowledge_client_missing``: a ``knowledge_base_id`` is set but no client is configured.
            ``knowledge_search_failed``: the knowledge service client raised.
            ``retriever_documents_missing``: no candidates from any source.

        ``knowledge_base_id`` is template-rendered once. The rendered id is
        both the one searched and the one reported in ``output_json``.
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        node_config = context.node_config_json
        render_context = _build_executor_template_context(context)
        query = _resolve_retriever_query(node_config, render_context)
        documents = _read_retriever_documents(node_config, render_context)
        source_url = _read_string_value(node_config, "source_url")
        knowledge_base_id = _read_string_value(node_config, "knowledge_base_id")
        top_k = _read_int_value(node_config, "top_k") or 3
        if query is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="retriever_query_missing",
                error_message="retriever node config requires query or query_template",
            )

        if source_url is not None:
            try:
                documents.extend(
                    _fetch_retriever_documents_from_url(
                        source_url=render_template_string(source_url, render_context),
                        timeout_ms=_read_int_value(node_config, "timeout_ms") or 10000,
                        render_context=render_context,
                    )
                )
            except httpx.HTTPError as exc:
                return NodeExecutionResultContract(
                    status="failed",
                    worker_key=worker_key,
                    error_code="retriever_source_request_failed",
                    error_message=str(exc),
                )
            except ValueError as exc:
                return NodeExecutionResultContract(
                    status="failed",
                    worker_key=worker_key,
                    error_code="retriever_source_invalid",
                    error_message=str(exc),
                )

        knowledge_results: list[dict[str, JSONValue]] = []
        resolved_knowledge_base_id: str | None = None
        if knowledge_base_id is not None:
            if self.knowledge_client is None:
                return NodeExecutionResultContract(
                    status="failed",
                    worker_key=worker_key,
                    error_code="knowledge_client_missing",
                    error_message="knowledge-service client is not configured",
                )
            # Render once so the searched id and the reported id cannot diverge
            # (output_json used to echo the unrendered template string).
            resolved_knowledge_base_id = render_template_string(
                knowledge_base_id,
                render_context,
            )
            search_request = KnowledgeSearchRequestContract(
                tenant_id=context.tenant_id,
                knowledge_base_id=resolved_knowledge_base_id,
                query=query,
                top_k=top_k,
                filters_json=_render_json_dict(
                    _read_dict_value(node_config, "filters_json"),
                    render_context,
                ),
            )
            try:
                # Iterate inside the try so errors raised while consuming results are caught too.
                knowledge_results = [
                    item.model_dump(mode="json")
                    for item in self.knowledge_client.search(search_request)
                ]
            except KnowledgeServiceClientError as exc:
                return NodeExecutionResultContract(
                    status="failed",
                    worker_key=worker_key,
                    error_code="knowledge_search_failed",
                    error_message=str(exc),
                )
            documents.extend(_knowledge_results_to_retriever_documents(knowledge_results))

        if not documents:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="retriever_documents_missing",
                error_message=(
                    "retriever node config requires documents, source_url, "
                    "or knowledge_base_id"
                ),
            )

        ranked_documents = rank_documents(query=query, documents=documents, top_k=top_k)
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text="\n\n".join(item.text for item in ranked_documents),
            output_json={
                "executor_name": self.executor_name,
                "query": query,
                "top_k": top_k,
                "retrieved_documents": [item.to_output_json() for item in ranked_documents],
                "knowledge_base_id": resolved_knowledge_base_id,
                "knowledge_results": knowledge_results,
            },
        )
class TemplateNodeExecutor(CompletedNodeExecutor):
    """Executor for ``template-transform`` / ``template`` nodes.

    Renders a string ``template`` into ``output_text`` and/or a dict
    ``template_json`` into ``output_json["rendered_json"]``.
    """

    def __init__(self) -> None:
        super().__init__(
            executor_name="template-executor",
            supported_node_types=frozenset({"template-transform", "template"}),
        )

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Render whichever templates are configured.

        Fails with ``template_config_missing`` when neither a string template
        nor a non-empty ``template_json`` is present.
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        render_context = _build_executor_template_context(context)
        node_config = context.node_config_json
        template = _read_string_value(node_config, "template")
        template_json = _read_dict_value(node_config, "template_json")
        if template is None and not template_json:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="template_config_missing",
                error_message="template node config requires template or template_json",
            )

        rendered_text = (
            render_template_string(template, render_context) if template is not None else None
        )
        output_json: dict[str, JSONValue] = {"executor_name": self.executor_name}
        if template_json:
            output_json["rendered_json"] = _render_json_dict(template_json, render_context)
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=rendered_text,
            output_json=output_json,
        )
class NodeExecutionDispatcher:
    """Route each node to the first executor that claims its node type.

    Executors are checked in list order. Node types that no executor claims
    go to ``default_executor``.
    """

    def __init__(
        self,
        executors: list[NodeExecutor],
        default_executor: NodeExecutor,
    ) -> None:
        self.executors = executors
        self.default_executor = default_executor

    def resolve_executor(self, node_type: str) -> NodeExecutor:
        """Return the first executor whose ``supported_node_types`` contains ``node_type``."""
        return next(
            (
                candidate
                for candidate in self.executors
                if node_type in candidate.supported_node_types
            ),
            self.default_executor,
        )

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> tuple[NodeExecutionResultContract, str]:
        """Run the node and return ``(result, executor_name)`` for the executor used."""
        chosen = self.resolve_executor(context.node_type)
        return chosen.execute(context, request), chosen.executor_name
def build_node_execution_dispatcher() -> NodeExecutionDispatcher:
    """Build a dispatcher containing every built-in executor, with no service clients.

    Delegates to ``build_node_execution_dispatcher_with_clients`` with all
    clients left as ``None``. The executor list and its order are therefore
    defined in exactly one place instead of two copies that can drift. Each
    executor that depends on a missing client handles that case itself.
    """
    return build_node_execution_dispatcher_with_clients()
def build_node_execution_dispatcher_with_clients(
    *,
    code_runner_client: CodeRunnerClient | None = None,
    model_gateway_client: ModelGatewayClient | None = None,
    tool_client: ToolServiceClient | None = None,
    knowledge_client: KnowledgeServiceClient | None = None,
    human_client: HumanServiceClient | None = None,
) -> NodeExecutionDispatcher:
    """Build a dispatcher whose executors use the supplied service clients.

    Every client is optional and is forwarded to the executor that needs it.
    List order is kept stable because the dispatcher picks the first
    executor claiming a node type. Unclaimed types fall back to
    ``DefaultNodeExecutor``.
    """
    ordered_executors: list[NodeExecutor] = [
        # Executors backed by an external service client.
        LLMNodeExecutor(model_gateway_client=model_gateway_client),
        ToolNodeExecutor(tool_client=tool_client),
        CodeNodeExecutor(code_runner_client=code_runner_client),
        HumanNodeExecutor(human_client=human_client),
        # In-process executors (except the retriever, which may search a knowledge base).
        AnswerNodeExecutor(),
        ConditionNodeExecutor(),
        AssignerNodeExecutor(),
        RetrieverNodeExecutor(knowledge_client=knowledge_client),
        TemplateNodeExecutor(),
    ]
    fallback_executor = DefaultNodeExecutor()
    return NodeExecutionDispatcher(ordered_executors, fallback_executor)
def _read_string_value(payload: dict[str, JSONValue], key: str) -> str | None:
    """Return ``payload[key]`` when it is a ``str``; ``None`` when missing or of another type."""
    candidate = payload.get(key)
    return candidate if isinstance(candidate, str) else None
def _read_dict_value(payload: dict[str, JSONValue], key: str) -> dict[str, JSONValue]:
    """Return a shallow, str-keyed copy of ``payload[key]`` when it is a dict.

    A missing key or a non-dict value yields an empty dict, so callers can
    always merge or iterate the result.
    """
    candidate = payload.get(key)
    if not isinstance(candidate, dict):
        return {}
    return {str(name): entry for name, entry in candidate.items()}
def _merge_json_dicts(*items: dict[str, JSONValue]) -> dict[str, JSONValue]:
    """Shallow-merge dicts left to right into a new dict; later dicts override earlier keys."""
    return {key: value for layer in items for key, value in layer.items()}
def _render_json_dict(
    payload: dict[str, JSONValue],
    context: dict[str, JSONValue],
) -> dict[str, JSONValue]:
    """Render templates inside ``payload`` via ``render_json_value``, as a str-keyed dict.

    If rendering does not produce a dict, an empty dict is returned instead.
    """
    rendered = render_json_value(payload, context)
    if not isinstance(rendered, dict):
        return {}
    return {str(name): entry for name, entry in rendered.items()}
def _render_config_json(
    payload: dict[str, JSONValue],
    context: dict[str, JSONValue],
) -> dict[str, JSONValue]:
    """Render an entire node config; a named alias of ``_render_json_dict`` for call-site clarity."""
    rendered_config = _render_json_dict(payload, context)
    return rendered_config
def _resolve_http_url(*, url: str | None, base_url: str | None, path: str | None) -> str | None:
    """Pick the request URL for an HTTP call.

    An explicit ``url`` wins and is returned as-is (even when empty). Otherwise
    ``base_url`` and ``path`` are joined with a single ``/`` between them,
    after trailing slashes are stripped from the base and leading slashes from
    the path. If either part is missing, ``None`` is returned.
    """
    if url is None:
        if base_url is None or path is None:
            return None
        return f"{base_url.rstrip('/')}/{path.lstrip('/')}"
    return url
def _coerce_http_headers(payload: dict[str, JSONValue]) -> dict[str, str]:
    """Convert a JSON object into HTTP headers with string values.

    Scalar values (str/int/float/bool) are stringified with ``str()``, so
    ``True`` becomes ``"True"``. ``None``, lists and nested objects are dropped.
    """
    scalar_types = (str, int, float, bool)
    return {
        header_name: str(header_value)
        for header_name, header_value in payload.items()
        if isinstance(header_value, scalar_types)
    }
def _coerce_http_params(payload: dict[str, JSONValue]) -> dict[str, str]:
    """Convert a JSON object into query parameters with string values.

    Scalar values (str/int/float/bool) are stringified with ``str()``, so
    ``False`` becomes ``"False"``. ``None``, lists and nested objects are dropped.
    """
    query_params: dict[str, str] = {}
    for param_name, param_value in payload.items():
        if not isinstance(param_value, (str, int, float, bool)):
            continue  # non-scalar / null values have no query-string form here
        query_params[param_name] = str(param_value)
    return query_params
def _try_parse_json_response(response: httpx.Response) -> JSONValue | None:
    """Best-effort decode of an HTTP response body as JSON.

    Returns ``None`` when the ``content-type`` header (compared
    case-insensitively) does not mention ``json``, when ``response.json()``
    raises ``ValueError``, or when the decoded value is not a JSON type.
    A JSON ``null`` body also comes back as ``None``.
    """
    declared_type = response.headers.get("content-type", "").lower()
    if "json" not in declared_type:
        return None
    try:
        decoded = response.json()
    except ValueError:
        return None
    is_json_value = decoded is None or isinstance(decoded, (dict, list, str, int, float, bool))
    return decoded if is_json_value else None
def _build_chat_completion_request(
    payload: dict[str, JSONValue],
) -> ChatCompletionRequestContract | None:
    """Translate ``payload`` into a chat-completion request contract.

    Messages come from ``payload["messages"]``. If that yields none, the flat
    ``system_prompt`` and ``prompt`` strings become a ``system`` message and a
    ``user`` message respectively. Returns ``None`` when no message can be
    built. ``model``, ``temperature`` and ``max_tokens`` are passed through
    when they have the expected type, otherwise as ``None``.
    """
    messages = _read_message_list(payload, "messages")
    if not messages:
        # Fallback: build the conversation from the single-string prompt fields.
        prompt_fields = (
            ("system", _read_string_value(payload, "system_prompt")),
            ("user", _read_string_value(payload, "prompt")),
        )
        messages = [
            ChatMessageContract(role=role, content=content)
            for role, content in prompt_fields
            if content is not None
        ]
        if not messages:
            return None
    return ChatCompletionRequestContract(
        model=_read_string_value(payload, "model"),
        messages=messages,
        temperature=_read_float_value(payload, "temperature"),
        max_tokens=_read_int_value(payload, "max_tokens"),
    )
def _read_message_list(
    payload: dict[str, JSONValue],
    key: str,
) -> list[ChatMessageContract]:
    """Parse ``payload[key]`` as a list of chat messages.

    Only dict entries with string ``role`` and ``content`` are kept. Other
    entries are skipped silently. ``name`` is kept only when it is a string.
    A missing or non-list value yields ``[]``.
    """
    raw_messages = payload.get(key)
    if not isinstance(raw_messages, list):
        return []
    parsed: list[ChatMessageContract] = []
    for entry in raw_messages:
        if not isinstance(entry, dict):
            continue
        role = entry.get("role")
        content = entry.get("content")
        if not (isinstance(role, str) and isinstance(content, str)):
            continue
        speaker_name = entry.get("name")
        parsed.append(
            ChatMessageContract(
                role=role,
                content=content,
                name=speaker_name if isinstance(speaker_name, str) else None,
            )
        )
    return parsed
def _read_float_value(payload: dict[str, JSONValue], key: str) -> float | None:
    """Return ``payload[key]`` as a ``float`` when it is an int or float.

    Booleans are rejected even though ``bool`` subclasses ``int``, so JSON
    ``true``/``false`` never become ``1.0``/``0.0``. Anything else gives ``None``.
    """
    candidate = payload.get(key)
    if isinstance(candidate, bool) or not isinstance(candidate, (int, float)):
        return None
    return float(candidate)
def _read_int_value(payload: dict[str, JSONValue], key: str) -> int | None:
    """Return ``payload[key]`` when it is a genuine ``int``, otherwise ``None``.

    Floats (even integral ones like ``3.0``) and booleans are rejected.
    """
    candidate = payload.get(key)
    is_plain_int = isinstance(candidate, int) and not isinstance(candidate, bool)
    return candidate if is_plain_int else None
def _resolve_existing_human_task_id(context: NodeExecutionContextContract) -> str | None:
    """Find an already-known human task id for the node being executed.

    Looks at ``human_task_id`` in the node config first, then in the output
    JSON previously recorded for this same node id. Returns ``None`` if
    neither holds a string.
    """
    task_id = _read_string_value(context.node_config_json, "human_task_id")
    if task_id is None:
        previous_output = context.node_output_json_by_node_id.get(context.node_id)
        if previous_output is not None:
            task_id = _read_string_value(previous_output, "human_task_id")
    return task_id
def _resolve_human_task_type(
    node_type: str,
    payload: dict[str, JSONValue],
) -> HumanTaskType:
    """Decide which human task type to create.

    A recognised ``task_type`` in ``payload`` (approval, input, takeover,
    pause, resume) takes precedence. Otherwise the node type picks the
    default: ``approval`` -> ``"approval"``, ``human-input`` -> ``"input"``,
    ``human-takeover`` -> ``"takeover"``. Any other node type gets ``"input"``.
    """
    explicit_type = _read_string_value(payload, "task_type")
    if explicit_type in {"approval", "input", "takeover", "pause", "resume"}:
        return cast(HumanTaskType, explicit_type)
    default_by_node_type = {
        "approval": "approval",
        "human-input": "input",
        "human-takeover": "takeover",
    }
    return cast(HumanTaskType, default_by_node_type.get(node_type, "input"))
def _resolve_due_time(payload: dict[str, JSONValue]) -> datetime | None:
    """Resolve an optional due time from ``payload``.

    ``due_time`` (an ISO-8601 string parsed by ``_read_datetime_value``) takes
    precedence. Otherwise a positive integer ``due_after_seconds`` is added to
    the current UTC time. Anything else yields ``None``.

    NOTE(review): an explicit ``due_time`` carrying an offset or ``Z`` comes
    back timezone-aware, while the ``due_after_seconds`` path returns a naive
    UTC datetime. Ordering comparisons between aware and naive datetimes raise
    ``TypeError``, so confirm what downstream consumers expect.
    """
    from datetime import timezone

    explicit_due_time = _read_datetime_value(payload, "due_time")
    if explicit_due_time is not None:
        return explicit_due_time
    due_after_seconds = _read_int_value(payload, "due_after_seconds")
    if due_after_seconds is None or due_after_seconds <= 0:
        return None
    # datetime.utcnow() is deprecated since Python 3.12; this produces the same
    # naive UTC timestamp without the DeprecationWarning.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return now_utc + timedelta(seconds=due_after_seconds)
def _read_datetime_value(payload: dict[str, JSONValue], key: str) -> datetime | None:
    """Parse ``payload[key]`` as an ISO-8601 datetime string.

    Surrounding whitespace is ignored and ``Z`` is rewritten to ``+00:00`` so
    that ``datetime.fromisoformat`` accepts UTC suffixes. Returns ``None`` for
    non-strings, blank strings, and strings that fail to parse.
    """
    raw_value = payload.get(key)
    if not isinstance(raw_value, str):
        return None
    candidate = raw_value.strip()
    if not candidate:
        return None
    try:
        return datetime.fromisoformat(candidate.replace("Z", "+00:00"))
    except ValueError:
        return None
def _build_executor_template_context(context: NodeExecutionContextContract) -> dict[str, JSONValue]:
    """Build the template-rendering context for the node described by ``context``.

    Thin adapter that forwards the node id and type, the run state, and the
    per-node JSON/text outputs to ``build_template_context``.
    """
    template_context = build_template_context(
        node_id=context.node_id,
        node_type=context.node_type,
        run_state_json=context.run_state_json,
        node_output_json_by_node_id=context.node_output_json_by_node_id,
        node_output_text_by_node_id=context.node_output_text_by_node_id,
    )
    return template_context
def _evaluate_path_condition(
    payload: dict[str, JSONValue],
    path: str,
    render_context: dict[str, JSONValue],
) -> bool:
    """Evaluate one path-based condition against ``render_context``.

    The value at ``path`` is resolved with ``resolve_expression``. The first
    operator key present in ``payload`` decides the test, checked in this
    order: ``equals``, ``not_equals``, ``gt``, ``gte``, ``lt``, ``lte``. Only
    that operator's operand is rendered, via ``render_json_value``. Next comes
    ``exists``, but only when it holds a bool. With no applicable operator,
    the result of ``coerce_bool`` on the resolved value is returned.
    """
    resolved = resolve_expression(render_context, path)

    def operand(name: str) -> JSONValue:
        # Rendered lazily so only the selected operator's operand is evaluated.
        return render_json_value(payload[name], render_context)

    if "equals" in payload:
        return resolved == operand("equals")
    if "not_equals" in payload:
        return resolved != operand("not_equals")
    for name, symbol in (("gt", ">"), ("gte", ">="), ("lt", "<"), ("lte", "<=")):
        if name in payload:
            return _compare_numeric(resolved, operand(name), symbol)
    exists_flag = payload.get("exists")
    if isinstance(exists_flag, bool):
        return (resolved is not None) is exists_flag
    return coerce_bool(resolved)
def _compare_numeric(left: JSONValue, right: JSONValue, operator: str) -> bool:
    """Apply an ordering comparison between two JSON numbers.

    Args:
        left: Value resolved from the condition path.
        right: Rendered comparison operand.
        operator: One of ``">"``, ``">="``, ``"<"``, ``"<="``.

    Returns:
        The comparison result. Returns ``False`` when either operand is not a
        number or ``operator`` is unrecognised. Booleans are not numbers here,
        although ``bool`` subclasses ``int``. This matches
        ``_read_int_value``/``_read_float_value``, so ``true > 0`` is ``False``.
    """
    if isinstance(left, bool) or isinstance(right, bool):
        return False
    if not isinstance(left, (int, float)) or not isinstance(right, (int, float)):
        return False
    if operator == ">":
        return left > right
    if operator == ">=":
        return left >= right
    if operator == "<":
        return left < right
    if operator == "<=":
        return left <= right
    return False
@dataclass(frozen=True)
class RetrieverDocument:
    """Immutable candidate document fed into keyword ranking.

    Built by ``_parse_retriever_document`` and
    ``_knowledge_results_to_retriever_documents``, and consumed by
    ``rank_documents``.
    """

    # Identifier; the parsers fall back to a positional id ("doc-N" / "knowledge-N").
    document_id: str
    # Optional title; rank_documents tokenizes it together with the text.
    title: str | None
    # Body text; the parsers in this module store it stripped and non-empty.
    text: str
    # Free-form metadata carried through unchanged into the ranked output.
    metadata: dict[str, JSONValue]
@dataclass(frozen=True)
class RankedRetrieverDocument:
    """A retriever document paired with its keyword relevance score."""

    document_id: str
    title: str | None
    text: str
    metadata: dict[str, JSONValue]
    score: float

    def to_output_json(self) -> dict[str, JSONValue]:
        """Serialize to a JSON-ready dict, one key per field in declaration order.

        ``metadata`` is included by reference, not copied.
        """
        return dict(
            document_id=self.document_id,
            title=self.title,
            text=self.text,
            metadata=self.metadata,
            score=self.score,
        )
def _resolve_retriever_query(
    payload: dict[str, JSONValue],
    render_context: dict[str, JSONValue],
) -> str | None:
    """Produce the search query for a retriever node.

    ``query`` is preferred over ``query_template``. Whichever is used is
    rendered with ``render_template_string`` and then stripped. Returns
    ``None`` when neither key holds a string or the rendered query is blank.
    """
    explicit_query = _read_string_value(payload, "query")
    template = _read_string_value(payload, "query_template")
    source = explicit_query if explicit_query is not None else template
    if source is None:
        return None
    rendered_query = render_template_string(source, render_context).strip()
    return rendered_query or None
def _read_retriever_documents(
    payload: dict[str, JSONValue],
    render_context: dict[str, JSONValue],
) -> list[RetrieverDocument]:
    """Parse the inline ``documents`` list from a retriever node's config.

    Each entry goes through ``_parse_retriever_document`` with its position in
    the list as ``index``. Entries that parse to ``None`` are dropped. A
    missing or non-list ``documents`` value yields ``[]``.
    """
    raw_documents = payload.get("documents")
    if not isinstance(raw_documents, list):
        return []
    parsed = (
        _parse_retriever_document(entry, index=position, render_context=render_context)
        for position, entry in enumerate(raw_documents)
    )
    return [document for document in parsed if document is not None]
def _fetch_retriever_documents_from_url(
    *,
    source_url: str,
    timeout_ms: int,
    render_context: dict[str, JSONValue],
) -> list[RetrieverDocument]:
    """Download retriever documents from ``source_url`` and parse them.

    Issues a blocking GET with a ``timeout_ms``-millisecond timeout. The body
    must be either a JSON list or an object with a ``documents`` list.
    Non-JSON items are skipped. Each remaining item is parsed by
    ``_parse_retriever_document`` with its original list position as
    ``index``, and items that parse to ``None`` are dropped. A blank
    ``source_url`` returns ``[]`` without any request.

    Raises:
        ValueError: if the body has neither accepted shape.
        Exceptions from ``response.raise_for_status()`` (error status) and
        from ``response.json()`` (undecodable body) propagate unchanged.
    """
    if not source_url.strip():
        return []
    with httpx.Client(timeout=timeout_ms / 1000) as http_client:
        response = http_client.get(source_url)
        response.raise_for_status()
        body = response.json()
    raw_documents = body.get("documents") if isinstance(body, dict) else body
    if not isinstance(raw_documents, list):
        raise ValueError("retriever source must return a JSON list or object.documents list")
    parsed = (
        _parse_retriever_document(entry, index=position, render_context=render_context)
        for position, entry in enumerate(raw_documents)
        if _is_json_value(entry)
    )
    return [document for document in parsed if document is not None]
def _parse_retriever_document(
    value: JSONValue,
    *,
    index: int,
    render_context: dict[str, JSONValue],
) -> RetrieverDocument | None:
    """Turn one documents-list entry into a ``RetrieverDocument``.

    - A string is rendered as a template and stripped, and becomes the text.
    - A dict is rendered with ``_render_json_dict``. The text comes from
      ``text`` or ``content``, the id from ``id`` or ``document_id``, plus an
      optional string ``title`` and dict ``metadata``.
    - Any other type, or empty/blank text, yields ``None``.

    Without an id, the document gets ``doc-{index + 1}``.

    NOTE(review): the ``or`` alias chains treat falsy values as absent, e.g.
    ``id: 0`` falls through to ``document_id``/the positional id. Confirm this
    is intended.
    """
    fallback_id = f"doc-{index + 1}"
    if isinstance(value, str):
        rendered_text = render_template_string(value, render_context).strip()
        if not rendered_text:
            return None
        return RetrieverDocument(
            document_id=fallback_id,
            title=None,
            text=rendered_text,
            metadata={},
        )
    if not isinstance(value, dict):
        return None
    rendered = _render_json_dict({str(name): entry for name, entry in value.items()}, render_context)
    body_text = rendered.get("text") or rendered.get("content")
    if not isinstance(body_text, str) or not body_text.strip():
        return None
    raw_id = rendered.get("id") or rendered.get("document_id")
    raw_title = rendered.get("title")
    raw_metadata = rendered.get("metadata")
    return RetrieverDocument(
        document_id=fallback_id if raw_id is None else str(raw_id),
        title=raw_title if isinstance(raw_title, str) else None,
        text=body_text.strip(),
        metadata=raw_metadata if isinstance(raw_metadata, dict) else {},
    )
def _knowledge_results_to_retriever_documents(
    results: list[dict[str, JSONValue]],
) -> list[RetrieverDocument]:
    """Convert knowledge-service search results into ``RetrieverDocument`` objects.

    Each result must carry ``chunk`` and ``document`` dicts and a non-blank
    string ``chunk["content_text"]``; other results are skipped. The stripped
    chunk text becomes the document text. Metadata records the source, the
    chunk id (stringified) and index, the numeric ``score`` and the
    ``score_json`` dict. When a field is missing, positional fallbacks use the
    result's index in ``results``.
    """
    converted: list[RetrieverDocument] = []
    for position, result in enumerate(results):
        chunk = result.get("chunk")
        source_document = result.get("document")
        if not (isinstance(chunk, dict) and isinstance(source_document, dict)):
            continue
        content_text = chunk.get("content_text")
        if not (isinstance(content_text, str) and content_text.strip()):
            continue
        raw_chunk_id = chunk.get("id")
        raw_chunk_index = chunk.get("chunk_index")
        raw_score = result.get("score")
        raw_score_json = result.get("score_json")
        raw_document_id = source_document.get("id")
        raw_title = source_document.get("title")
        chunk_metadata: dict[str, JSONValue] = {
            "source": "knowledge-service",
            "chunk_id": None if raw_chunk_id is None else str(raw_chunk_id),
            "chunk_index": raw_chunk_index if isinstance(raw_chunk_index, int) else position,
            "score": raw_score if isinstance(raw_score, (int, float)) else None,
            "score_json": raw_score_json if isinstance(raw_score_json, dict) else {},
        }
        converted.append(
            RetrieverDocument(
                document_id=(
                    f"knowledge-{position + 1}" if raw_document_id is None else str(raw_document_id)
                ),
                title=raw_title if isinstance(raw_title, str) else None,
                text=content_text.strip(),
                metadata=chunk_metadata,
            )
        )
    return converted
def rank_documents(
    *,
    query: str,
    documents: list[RetrieverDocument],
    top_k: int,
) -> list[RankedRetrieverDocument]:
    """Rank ``documents`` by keyword overlap with ``query`` and keep the best ones.

    Each document's title and text are tokenized together and scored with
    ``calculate_keyword_score``. Results are ordered by descending score; the
    sort is stable, so ties keep input order. At most ``max(top_k, 1)`` are
    returned.
    """
    limit = max(top_k, 1)
    query_tokens = tokenize_text(query)
    scored = [
        RankedRetrieverDocument(
            document_id=candidate.document_id,
            title=candidate.title,
            text=candidate.text,
            metadata=candidate.metadata,
            score=calculate_keyword_score(
                query_tokens=query_tokens,
                document_tokens=tokenize_text(
                    " ".join(filter(None, [candidate.title, candidate.text]))
                ),
            ),
        )
        for candidate in documents
    ]
    return sorted(scored, key=lambda ranked: ranked.score, reverse=True)[:limit]
def calculate_keyword_score(
    *,
    query_tokens: set[str],
    document_tokens: set[str],
) -> float:
    """Return the fraction of query tokens present in the document, rounded to 4 places.

    Returns ``0.0`` when either token set is empty or nothing overlaps.
    """
    if not (query_tokens and document_tokens):
        return 0.0
    shared = query_tokens.intersection(document_tokens)
    if not shared:
        return 0.0
    return round(len(shared) / len(query_tokens), 4)
def tokenize_text(value: str) -> set[str]:
    """Split ``value`` into a set of lower-cased word tokens.

    A token is a maximal run of regex word characters (``\\w``) or CJK unified
    ideographs (U+4E00-U+9FFF). Every match of the ``+`` pattern is non-empty,
    so no empty tokens can appear. Duplicates collapse into the returned set.
    """
    return {match.lower() for match in re.findall(r"[\w\u4e00-\u9fff]+", value)}
def _is_json_value(value: object) -> bool:
    """Report whether ``value`` is a JSON-compatible Python structure.

    Accepts ``None``, str/int/float/bool, lists of JSON values, and dicts
    whose keys are all strings and whose values are JSON values (checked
    recursively). Tuples, sets and other objects are rejected.
    """
    if value is None or isinstance(value, (str, int, float, bool)):
        return True
    if isinstance(value, dict):
        return all(isinstance(name, str) and _is_json_value(entry) for name, entry in value.items())
    if isinstance(value, list):
        return all(map(_is_json_value, value))
    return False
|