import re from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime, timedelta from typing import cast import httpx from core_domain import ( ChatCompletionRequestContract, ChatMessageContract, CodeExecutionRequestContract, HumanTaskCreateContract, HumanTaskType, KnowledgeSearchRequestContract, NodeExecutionContextContract, NodeExecutionRequestContract, NodeExecutionResultContract, ToolBindingDetailContract, ) from core_shared import JSONValue from .code_runner_client import CodeRunnerClient, CodeRunnerClientError from .context import ( build_template_context, coerce_bool, evaluate_condition_expression, render_json_value, render_template_string, resolve_expression, ) from .human_client import HumanServiceClient, HumanServiceClientError from .knowledge_client import KnowledgeServiceClient, KnowledgeServiceClientError from .model_gateway_client import ModelGatewayClient, ModelGatewayClientError from .tool_client import ToolServiceClient, ToolServiceClientError class NodeExecutor(ABC): executor_name: str supported_node_types: frozenset[str] @abstractmethod def execute( self, context: NodeExecutionContextContract, request: NodeExecutionRequestContract) -> NodeExecutionResultContract: raise NotImplementedError class CompletedNodeExecutor(NodeExecutor): def __init__(self, *, executor_name: str, supported_node_types: frozenset[str]) -> None: self.executor_name = executor_name self.supported_node_types = supported_node_types def execute( self, context: NodeExecutionContextContract, request: NodeExecutionRequestContract) -> NodeExecutionResultContract: worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}" return NodeExecutionResultContract( status="completed", worker_key=worker_key, output_json={ "executor_name": self.executor_name, "node_type": context.node_type, }) class DefaultNodeExecutor(CompletedNodeExecutor): def __init__(self) -> None: super().__init__( executor_name="default-executor", supported_node_types=frozenset()) class LLMNodeExecutor(CompletedNodeExecutor): def __init__(self, model_gateway_client: ModelGatewayClient | None = None) -> None: super().__init__( executor_name="llm-executor", supported_node_types=frozenset({"llm"})) self.model_gateway_client = model_gateway_client def execute( self, context: NodeExecutionContextContract, request: NodeExecutionRequestContract) -> NodeExecutionResultContract: worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}" render_context = _build_executor_template_context(context) rendered_config_json = _render_config_json(context.node_config_json, render_context) chat_request = _build_chat_completion_request(rendered_config_json) if chat_request is None: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="llm_config_missing", error_message="llm node config requires prompt or messages") if self.model_gateway_client is None: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="llm_gateway_missing", error_message="model gateway client is not configured") try: response = self.model_gateway_client.create_chat_completion(chat_request) except ModelGatewayClientError as exc: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="llm_request_failed", error_message=str(exc)) return NodeExecutionResultContract( status="completed", worker_key=worker_key, output_text=response.content, output_json={ "executor_name": self.executor_name, "model": response.model, "finish_reason": response.finish_reason, "usage_json": response.usage_json, "raw_response_json": response.raw_response_json, }) class ToolNodeExecutor(CompletedNodeExecutor): def __init__(self, tool_client: ToolServiceClient | None = None) -> None: super().__init__( executor_name="tool-executor", supported_node_types=frozenset({"tool"})) self.tool_client = tool_client def execute( self, context: NodeExecutionContextContract, request: NodeExecutionRequestContract) -> NodeExecutionResultContract: tool_binding_id = _read_string_value(context.node_config_json, "tool_binding_id") tool_code = _read_string_value(context.node_config_json, "tool_code") worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}" if tool_binding_id is None and tool_code is None: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="tool_config_missing", error_message="tool node config requires tool_binding_id or tool_code") if tool_binding_id is not None and self.tool_client is not None: try: detail = self.tool_client.get_tool_binding_detail( binding_id=tool_binding_id) except ToolServiceClientError as exc: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="tool_binding_lookup_failed", error_message=str(exc)) if not detail.binding.enabled: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="tool_binding_disabled", error_message=f"tool binding is disabled: {tool_binding_id}") resolved_tool_code = detail.tool_definition.code resolved_tool_version_id = detail.tool_version.id resolved_tool_name = detail.tool_definition.name invoke_result = self._invoke_http_tool( context=context, detail=detail, worker_key=worker_key) if invoke_result is not None: return invoke_result else: resolved_tool_code = tool_code resolved_tool_version_id = None resolved_tool_name = None return NodeExecutionResultContract( status="completed", worker_key=worker_key, output_text=f"tool node completed: {resolved_tool_code or 'unknown-tool'}", output_json={ "executor_name": self.executor_name, "tool_binding_id": tool_binding_id, "tool_code": resolved_tool_code, "tool_version_id": resolved_tool_version_id, "tool_name": resolved_tool_name, }) def _invoke_http_tool( self, *, context: NodeExecutionContextContract, detail: ToolBindingDetailContract, worker_key: str) -> NodeExecutionResultContract | None: if detail.tool_definition.tool_type != "http": return None invoke_config_json = detail.tool_version.invoke_config_json or {} binding_config_json = detail.binding.config_json or {} render_context = _build_executor_template_context(context) request_headers = _merge_json_dicts( _render_json_dict(_read_dict_value(invoke_config_json, "headers"), render_context), _render_json_dict(_read_dict_value(binding_config_json, "headers"), render_context), _render_json_dict( _read_dict_value(context.node_config_json, "headers"), render_context)) request_query = _merge_json_dicts( _render_json_dict(_read_dict_value(invoke_config_json, "query"), render_context), _render_json_dict(_read_dict_value(context.node_config_json, "query"), render_context)) request_body = _merge_json_dicts( _render_json_dict(_read_dict_value(invoke_config_json, "body"), render_context), _render_json_dict(_read_dict_value(context.node_config_json, "body"), render_context)) method = (_read_string_value(invoke_config_json, "method") or "GET").upper() base_url = ( _read_string_value(context.node_config_json, "base_url") or _read_string_value(binding_config_json, "base_url") or _read_string_value(invoke_config_json, "base_url") ) path = _read_string_value(context.node_config_json, "path") or _read_string_value( invoke_config_json, "path" ) url = _read_string_value(context.node_config_json, "url") or _read_string_value( invoke_config_json, "url" ) resolved_url = _resolve_http_url(url=url, base_url=base_url, path=path) if resolved_url is None: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="tool_http_url_missing", error_message="http tool requires url or base_url with path") timeout_ms = detail.tool_version.timeout_ms or 10000 try: with httpx.Client(timeout=timeout_ms / 1000) as client: response = client.request( method=method, url=resolved_url, params=_coerce_http_params(request_query), headers=_coerce_http_headers(request_headers), json=request_body if request_body else None) response.raise_for_status() except httpx.HTTPError as exc: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="tool_http_request_failed", error_message=str(exc), output_json={ "executor_name": self.executor_name, "tool_binding_id": detail.binding.id, "tool_code": detail.tool_definition.code, "request_url": resolved_url, "request_method": method, }) response_json = _try_parse_json_response(response) response_text = None if response_json is not None else response.text return NodeExecutionResultContract( status="completed", worker_key=worker_key, output_text=response_text, output_json={ "executor_name": self.executor_name, "tool_binding_id": detail.binding.id, "tool_code": detail.tool_definition.code, "tool_version_id": detail.tool_version.id, "tool_name": detail.tool_definition.name, "request_url": resolved_url, "request_method": method, "response_status_code": response.status_code, "response_headers": dict(response.headers), "response_json": response_json, }) class CodeNodeExecutor(CompletedNodeExecutor): def __init__(self, code_runner_client: CodeRunnerClient | None = None) -> None: super().__init__( executor_name="code-executor", supported_node_types=frozenset({"code"})) self.code_runner_client = code_runner_client def execute( self, context: NodeExecutionContextContract, request: NodeExecutionRequestContract) -> NodeExecutionResultContract: worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}" code = _read_string_value(context.node_config_json, "code") if code is None: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="code_config_missing", error_message="code node config requires code") if self.code_runner_client is None: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="code_runner_missing", error_message="code runner client is not configured") render_context = _build_executor_template_context(context) input_json = _render_json_dict( _read_dict_value(context.node_config_json, "input_json"), render_context) language = _read_string_value(context.node_config_json, "language") or "python" timeout_seconds = _read_int_value(context.node_config_json, "timeout_seconds") or 10 code_request = CodeExecutionRequestContract( language=language, code=code, input_json=input_json, timeout_seconds=timeout_seconds) try: response = self.code_runner_client.execute_code(code_request) except CodeRunnerClientError as exc: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="code_request_failed", error_message=str(exc)) if not response.success: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="code_execution_failed", error_message=response.error_message or response.stderr, output_text=response.stdout, output_json={ "executor_name": self.executor_name, "stderr": response.stderr, "output_json": response.output_json, }) return NodeExecutionResultContract( status="completed", worker_key=worker_key, output_text=response.stdout, output_json={ "executor_name": self.executor_name, "stderr": response.stderr, "result_json": response.output_json, }) class HumanNodeExecutor(CompletedNodeExecutor): def __init__(self, human_client: HumanServiceClient | None = None) -> None: super().__init__( executor_name="human-executor", supported_node_types=frozenset( {"human", "approval", "human-input", "human-takeover"} )) self.human_client = human_client def execute( self, context: NodeExecutionContextContract, request: NodeExecutionRequestContract) -> NodeExecutionResultContract: worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}" if self.human_client is None: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="human_service_missing", error_message="human service client is not configured") human_task_id = _resolve_existing_human_task_id(context) if human_task_id is None: return self._create_waiting_task(context=context, worker_key=worker_key) try: task = self.human_client.get_task( human_task_id=human_task_id) except HumanServiceClientError as exc: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="human_task_lookup_failed", error_message=str(exc)) output_json: dict[str, JSONValue] = { "executor_name": self.executor_name, "human_task_id": task.id, "human_task_status": task.status, "response_payload_json": task.response_payload_json or {}, } if task.status in {"pending", "claimed"}: return NodeExecutionResultContract( status="pending", worker_key=worker_key, output_text=f"waiting for human task: {task.id}", output_json=output_json) if task.status in {"approved", "completed"}: return NodeExecutionResultContract( status="completed", worker_key=worker_key, output_json=output_json) return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code=f"human_task_{task.status}", error_message=f"human task ended with status: {task.status}", output_json=output_json) def _create_waiting_task( self, *, context: NodeExecutionContextContract, worker_key: str) -> NodeExecutionResultContract: render_context = _build_executor_template_context(context) task_type = _resolve_human_task_type(context.node_type, context.node_config_json) title = render_template_string( _read_string_value(context.node_config_json, "title") or f"Human task for {context.node_id}", render_context) description_template = _read_string_value(context.node_config_json, "description") description = ( render_template_string(description_template, render_context) if description_template is not None else None ) request_payload_json = _render_json_dict( _read_dict_value(context.node_config_json, "request_payload_json"), render_context) try: task = self.human_client.create_task( HumanTaskCreateContract( task_type=task_type, title=title, description=description, source_type="runtime-node", source_id=context.node_run_id, run_id=context.run_id, node_run_id=context.node_run_id, requested_by=_read_string_value( context.node_config_json, "requested_by"), assigned_to=_read_string_value( context.node_config_json, "assigned_to"), request_payload_json=request_payload_json, due_time=_resolve_due_time(context.node_config_json)) ) except HumanServiceClientError as exc: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="human_task_create_failed", error_message=str(exc)) return NodeExecutionResultContract( status="pending", worker_key=worker_key, output_text=f"waiting for human task: {task.id}", output_json={ "executor_name": self.executor_name, "human_task_id": task.id, "human_task_status": task.status, "task_type": task.task_type, }) class AnswerNodeExecutor(CompletedNodeExecutor): def execute( self, context: NodeExecutionContextContract, request: NodeExecutionRequestContract) -> NodeExecutionResultContract: answer_text = _read_string_value(context.node_config_json, "text") template = _read_string_value(context.node_config_json, "template") worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}" if answer_text is None and template is None: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="answer_config_missing", error_message="answer node config requires text or template") render_context = _build_executor_template_context(context) rendered_text = render_template_string(answer_text or template or "", render_context) return NodeExecutionResultContract( status="completed", worker_key=worker_key, output_text=rendered_text, output_json={ "executor_name": self.executor_name, "render_mode": "text" if answer_text is not None else "template", }) def __init__(self) -> None: super().__init__( executor_name="answer-executor", supported_node_types=frozenset({"answer"})) class ConditionNodeExecutor(CompletedNodeExecutor): def execute( self, context: NodeExecutionContextContract, request: NodeExecutionRequestContract) -> NodeExecutionResultContract: worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}" render_context = _build_executor_template_context(context) expression = _read_string_value(context.node_config_json, "expression") path = _read_string_value(context.node_config_json, "path") if expression is not None: condition_result = evaluate_condition_expression(expression, render_context) evaluated_expression = expression elif path is not None: condition_result = _evaluate_path_condition( context.node_config_json, path, render_context) evaluated_expression = path else: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="condition_config_missing", error_message="condition node config requires expression or path") route = "true" if condition_result else "false" return NodeExecutionResultContract( status="completed", worker_key=worker_key, output_json={ "executor_name": self.executor_name, "condition_result": condition_result, "route": route, "evaluated_expression": evaluated_expression, }) def __init__(self) -> None: super().__init__( executor_name="condition-executor", supported_node_types=frozenset({"if-else", "condition"})) class AssignerNodeExecutor(CompletedNodeExecutor): def execute( self, context: NodeExecutionContextContract, request: NodeExecutionRequestContract) -> NodeExecutionResultContract: worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}" assignments = _read_dict_value(context.node_config_json, "assignments") if not assignments: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="assignments_missing", error_message="assigner node config requires assignments") render_context = _build_executor_template_context(context) rendered_assignments = _render_json_dict(assignments, render_context) return NodeExecutionResultContract( status="completed", worker_key=worker_key, output_json={ "executor_name": self.executor_name, "assigned_values": rendered_assignments, "state_updates": rendered_assignments, }) def __init__(self) -> None: super().__init__( executor_name="assigner-executor", supported_node_types=frozenset({"assigner"})) class RetrieverNodeExecutor(CompletedNodeExecutor): def __init__(self, knowledge_client: KnowledgeServiceClient | None = None) -> None: super().__init__( executor_name="retriever-executor", supported_node_types=frozenset({"knowledge-retrieval", "retriever"})) self.knowledge_client = knowledge_client def execute( self, context: NodeExecutionContextContract, request: NodeExecutionRequestContract) -> NodeExecutionResultContract: worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}" render_context = _build_executor_template_context(context) query = _resolve_retriever_query(context.node_config_json, render_context) documents = _read_retriever_documents(context.node_config_json, render_context) source_url = _read_string_value(context.node_config_json, "source_url") knowledge_base_id = _read_string_value(context.node_config_json, "knowledge_base_id") top_k = _read_int_value(context.node_config_json, "top_k") or 3 if query is None: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="retriever_query_missing", error_message="retriever node config requires query or query_template") if source_url is not None: try: documents.extend( _fetch_retriever_documents_from_url( source_url=render_template_string(source_url, render_context), timeout_ms=_read_int_value(context.node_config_json, "timeout_ms") or 10000, render_context=render_context) ) except httpx.HTTPError as exc: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="retriever_source_request_failed", error_message=str(exc)) except ValueError as exc: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="retriever_source_invalid", error_message=str(exc)) knowledge_results: list[dict[str, JSONValue]] = [] if knowledge_base_id is not None: if self.knowledge_client is None: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="knowledge_client_missing", error_message="knowledge-service client is not configured") try: knowledge_results = [ item.model_dump(mode="json") for item in self.knowledge_client.search( KnowledgeSearchRequestContract( knowledge_base_id=render_template_string( knowledge_base_id, render_context), query=query, top_k=top_k, filters_json=_render_json_dict( _read_dict_value(context.node_config_json, "filters_json"), render_context)) ) ] except KnowledgeServiceClientError as exc: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="knowledge_search_failed", error_message=str(exc)) documents.extend(_knowledge_results_to_retriever_documents(knowledge_results)) if not documents: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="retriever_documents_missing", error_message=( "retriever node config requires documents, source_url, " "or knowledge_base_id" )) ranked_documents = rank_documents(query=query, documents=documents, top_k=top_k) output_documents = [item.to_output_json() for item in ranked_documents] output_text = "\n\n".join(item.text for item in ranked_documents) return NodeExecutionResultContract( status="completed", worker_key=worker_key, output_text=output_text, output_json={ "executor_name": self.executor_name, "query": query, "top_k": top_k, "retrieved_documents": output_documents, "knowledge_base_id": knowledge_base_id, "knowledge_results": knowledge_results, }) class TemplateNodeExecutor(CompletedNodeExecutor): def execute( self, context: NodeExecutionContextContract, request: NodeExecutionRequestContract) -> NodeExecutionResultContract: worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}" render_context = _build_executor_template_context(context) template = _read_string_value(context.node_config_json, "template") template_json = _read_dict_value(context.node_config_json, "template_json") if template is None and not template_json: return NodeExecutionResultContract( status="failed", worker_key=worker_key, error_code="template_config_missing", error_message="template node config requires template or template_json") rendered_text = None rendered_json = None if template is not None: rendered_text = render_template_string(template, render_context) if template_json: rendered_json = _render_json_dict(template_json, render_context) output_json: dict[str, JSONValue] = {"executor_name": self.executor_name} if rendered_json is not None: output_json["rendered_json"] = rendered_json return NodeExecutionResultContract( status="completed", worker_key=worker_key, output_text=rendered_text, output_json=output_json) def __init__(self) -> None: super().__init__( executor_name="template-executor", supported_node_types=frozenset({"template-transform", "template"})) class NodeExecutionDispatcher: def __init__( self, executors: list[NodeExecutor], default_executor: NodeExecutor) -> None: self.executors = executors self.default_executor = default_executor def resolve_executor(self, node_type: str) -> NodeExecutor: for executor in self.executors: if node_type in executor.supported_node_types: return executor return self.default_executor def execute( self, context: NodeExecutionContextContract, request: NodeExecutionRequestContract) -> tuple[NodeExecutionResultContract, str]: executor = self.resolve_executor(context.node_type) result = executor.execute(context, request) return result, executor.executor_name def build_node_execution_dispatcher() -> NodeExecutionDispatcher: executors: list[NodeExecutor] = [ LLMNodeExecutor(), ToolNodeExecutor(), CodeNodeExecutor(), HumanNodeExecutor(), AnswerNodeExecutor(), ConditionNodeExecutor(), AssignerNodeExecutor(), RetrieverNodeExecutor(), TemplateNodeExecutor(), ] return NodeExecutionDispatcher( executors=executors, default_executor=DefaultNodeExecutor()) def build_node_execution_dispatcher_with_clients( *, code_runner_client: CodeRunnerClient | None = None, model_gateway_client: ModelGatewayClient | None = None, tool_client: ToolServiceClient | None = None, knowledge_client: KnowledgeServiceClient | None = None, human_client: HumanServiceClient | None = None) -> NodeExecutionDispatcher: executors: list[NodeExecutor] = [ LLMNodeExecutor(model_gateway_client=model_gateway_client), ToolNodeExecutor(tool_client=tool_client), CodeNodeExecutor(code_runner_client=code_runner_client), HumanNodeExecutor(human_client=human_client), AnswerNodeExecutor(), ConditionNodeExecutor(), AssignerNodeExecutor(), RetrieverNodeExecutor(knowledge_client=knowledge_client), TemplateNodeExecutor(), ] return NodeExecutionDispatcher( executors=executors, default_executor=DefaultNodeExecutor()) def _read_string_value(payload: dict[str, JSONValue], key: str) -> str | None: value = payload.get(key) if isinstance(value, str): return value return None def _read_dict_value(payload: dict[str, JSONValue], key: str) -> dict[str, JSONValue]: value = payload.get(key) if isinstance(value, dict): return {str(item_key): item_value for item_key, item_value in value.items()} return {} def _merge_json_dicts(*items: dict[str, JSONValue]) -> dict[str, JSONValue]: merged: dict[str, JSONValue] = {} for item in items: merged.update(item) return merged def _render_json_dict( payload: dict[str, JSONValue], context: dict[str, JSONValue]) -> dict[str, JSONValue]: rendered = render_json_value(payload, context) if isinstance(rendered, dict): return {str(key): value for key, value in rendered.items()} return {} def _render_config_json( payload: dict[str, JSONValue], context: dict[str, JSONValue]) -> dict[str, JSONValue]: return _render_json_dict(payload, context) def _resolve_http_url(*, url: str | None, base_url: str | None, path: str | None) -> str | None: if url is not None: return url if base_url is None or path is None: return None return f"{base_url.rstrip('/')}/{path.lstrip('/')}" def _coerce_http_headers(payload: dict[str, JSONValue]) -> dict[str, str]: headers: dict[str, str] = {} for key, value in payload.items(): if isinstance(value, (str, int, float, bool)): headers[key] = str(value) return headers def _coerce_http_params(payload: dict[str, JSONValue]) -> dict[str, str]: params: dict[str, str] = {} for key, value in payload.items(): if isinstance(value, (str, int, float, bool)): params[key] = str(value) return params def _try_parse_json_response(response: httpx.Response) -> JSONValue | None: content_type = response.headers.get("content-type", "") if "json" not in content_type.lower(): return None try: payload = response.json() except ValueError: return None if isinstance(payload, (dict, list, str, int, float, bool)) or payload is None: return payload return None def _build_chat_completion_request( payload: dict[str, JSONValue]) -> ChatCompletionRequestContract | None: messages = _read_message_list(payload, "messages") if not messages: system_prompt = _read_string_value(payload, "system_prompt") prompt = _read_string_value(payload, "prompt") if system_prompt is not None: messages.append(ChatMessageContract(role="system", content=system_prompt)) if prompt is not None: messages.append(ChatMessageContract(role="user", content=prompt)) if not messages: return None temperature = _read_float_value(payload, "temperature") max_tokens = _read_int_value(payload, "max_tokens") model = _read_string_value(payload, "model") return ChatCompletionRequestContract( model=model, messages=messages, temperature=temperature, max_tokens=max_tokens) def _read_message_list( payload: dict[str, JSONValue], key: str) -> list[ChatMessageContract]: value = payload.get(key) if not isinstance(value, list): return [] messages: list[ChatMessageContract] = [] for item in value: if not isinstance(item, dict): continue role = item.get("role") content = item.get("content") name = item.get("name") if isinstance(role, str) and isinstance(content, str): messages.append( ChatMessageContract( role=role, content=content, name=name if isinstance(name, str) else None) ) return messages def _read_float_value(payload: dict[str, JSONValue], key: str) -> float | None: value = payload.get(key) if isinstance(value, (int, float)) and not isinstance(value, bool): return float(value) return None def _read_int_value(payload: dict[str, JSONValue], key: str) -> int | None: value = payload.get(key) if isinstance(value, int) and not isinstance(value, bool): return value return None def _resolve_existing_human_task_id(context: NodeExecutionContextContract) -> str | None: configured_task_id = _read_string_value(context.node_config_json, "human_task_id") if configured_task_id is not None: return configured_task_id current_node_output = context.node_output_json_by_node_id.get(context.node_id) if current_node_output is None: return None return _read_string_value(current_node_output, "human_task_id") def _resolve_human_task_type( node_type: str, payload: dict[str, JSONValue]) -> HumanTaskType: configured_task_type = _read_string_value(payload, "task_type") if configured_task_type in {"approval", "input", "takeover", "pause", "resume"}: return cast(HumanTaskType, configured_task_type) if node_type == "approval": return "approval" if node_type == "human-input": return "input" if node_type == "human-takeover": return "takeover" return "input" def _resolve_due_time(payload: dict[str, JSONValue]) -> datetime | None: due_time = _read_datetime_value(payload, "due_time") if due_time is not None: return due_time due_after_seconds = _read_int_value(payload, "due_after_seconds") if due_after_seconds is None or due_after_seconds <= 0: return None return datetime.utcnow() + timedelta(seconds=due_after_seconds) def _read_datetime_value(payload: dict[str, JSONValue], key: str) -> datetime | None: value = payload.get(key) if isinstance(value, str) and value.strip(): normalized_value = value.strip().replace("Z", "+00:00") try: return datetime.fromisoformat(normalized_value) except ValueError: return None return None def _build_executor_template_context(context: NodeExecutionContextContract) -> dict[str, JSONValue]: return build_template_context( node_id=context.node_id, node_type=context.node_type, run_state_json=context.run_state_json, node_output_json_by_node_id=context.node_output_json_by_node_id, node_output_text_by_node_id=context.node_output_text_by_node_id) def _evaluate_path_condition( payload: dict[str, JSONValue], path: str, render_context: dict[str, JSONValue]) -> bool: value = resolve_expression(render_context, path) if "equals" in payload: return value == render_json_value(payload["equals"], render_context) if "not_equals" in payload: return value != render_json_value(payload["not_equals"], render_context) if "gt" in payload: return _compare_numeric(value, render_json_value(payload["gt"], render_context), ">") if "gte" in payload: return _compare_numeric(value, render_json_value(payload["gte"], render_context), ">=") if "lt" in payload: return _compare_numeric(value, render_json_value(payload["lt"], render_context), "<") if "lte" in payload: return _compare_numeric(value, render_json_value(payload["lte"], render_context), "<=") if "exists" in payload: expected = payload["exists"] if isinstance(expected, bool): return (value is not None) is expected return coerce_bool(value) def _compare_numeric(left: JSONValue, right: JSONValue, operator: str) -> bool: if not isinstance(left, (int, float)) or not isinstance(right, (int, float)): return False if operator == ">": return left > right if operator == ">=": return left >= right if operator == "<": return left < right if operator == "<=": return left <= right return False @dataclass(frozen=True) class RetrieverDocument: document_id: str title: str | None text: str metadata: dict[str, JSONValue] @dataclass(frozen=True) class RankedRetrieverDocument: document_id: str title: str | None text: str metadata: dict[str, JSONValue] score: float def to_output_json(self) -> dict[str, JSONValue]: return { "document_id": self.document_id, "title": self.title, "text": self.text, "metadata": self.metadata, "score": self.score, } def _resolve_retriever_query( payload: dict[str, JSONValue], render_context: dict[str, JSONValue]) -> str | None: query = _read_string_value(payload, "query") query_template = _read_string_value(payload, "query_template") if query is not None: rendered_query = render_template_string(query, render_context) elif query_template is not None: rendered_query = render_template_string(query_template, render_context) else: return None stripped_query = rendered_query.strip() if not stripped_query: return None return stripped_query def _read_retriever_documents( payload: dict[str, JSONValue], render_context: dict[str, JSONValue]) -> list[RetrieverDocument]: value = payload.get("documents") if not isinstance(value, list): return [] documents: list[RetrieverDocument] = [] for index, item in enumerate(value): document = _parse_retriever_document( item, index=index, render_context=render_context) if document is not None: documents.append(document) return documents def _fetch_retriever_documents_from_url( *, source_url: str, timeout_ms: int, render_context: dict[str, JSONValue]) -> list[RetrieverDocument]: if not source_url.strip(): return [] with httpx.Client(timeout=timeout_ms / 1000) as client: response = client.get(source_url) response.raise_for_status() payload = response.json() if isinstance(payload, dict): documents_payload = payload.get("documents") else: documents_payload = payload if not isinstance(documents_payload, list): raise ValueError("retriever source must return a JSON list or object.documents list") documents: list[RetrieverDocument] = [] for index, item in enumerate(documents_payload): if not _is_json_value(item): continue document = _parse_retriever_document( item, index=index, render_context=render_context) if document is not None: documents.append(document) return documents def _parse_retriever_document( value: JSONValue, *, index: int, render_context: dict[str, JSONValue]) -> RetrieverDocument | None: if isinstance(value, str): text = render_template_string(value, render_context).strip() if not text: return None return RetrieverDocument( document_id=f"doc-{index + 1}", title=None, text=text, metadata={}) if not isinstance(value, dict): return None rendered = _render_json_dict({str(key): item for key, item in value.items()}, render_context) text_value = rendered.get("text") or rendered.get("content") if not isinstance(text_value, str) or not text_value.strip(): return None document_id_value = rendered.get("id") or rendered.get("document_id") title_value = rendered.get("title") metadata_value = rendered.get("metadata") return RetrieverDocument( document_id=str(document_id_value) if document_id_value is not None else f"doc-{index + 1}", title=title_value if isinstance(title_value, str) else None, text=text_value.strip(), metadata=metadata_value if isinstance(metadata_value, dict) else {}) def _knowledge_results_to_retriever_documents( results: list[dict[str, JSONValue]]) -> list[RetrieverDocument]: documents: list[RetrieverDocument] = [] for index, result in enumerate(results): chunk = result.get("chunk") document = result.get("document") score = result.get("score") score_json = result.get("score_json") if not isinstance(chunk, dict) or not isinstance(document, dict): continue content_text = chunk.get("content_text") if not isinstance(content_text, str) or not content_text.strip(): continue document_id = document.get("id") title = document.get("title") metadata: dict[str, JSONValue] = { "source": "knowledge-service", "chunk_id": str(chunk.get("id")) if chunk.get("id") is not None else None, "chunk_index": ( chunk.get("chunk_index") if isinstance(chunk.get("chunk_index"), int) else index ), "score": score if isinstance(score, (int, float)) else None, "score_json": score_json if isinstance(score_json, dict) else {}, } documents.append( RetrieverDocument( document_id=str(document_id) if document_id is not None else f"knowledge-{index + 1}", title=title if isinstance(title, str) else None, text=content_text.strip(), metadata=metadata) ) return documents def rank_documents( *, query: str, documents: list[RetrieverDocument], top_k: int) -> list[RankedRetrieverDocument]: normalized_top_k = max(top_k, 1) query_tokens = tokenize_text(query) ranked_documents: list[RankedRetrieverDocument] = [] for document in documents: document_tokens = tokenize_text(" ".join(filter(None, [document.title, document.text]))) score = calculate_keyword_score(query_tokens=query_tokens, document_tokens=document_tokens) ranked_documents.append( RankedRetrieverDocument( document_id=document.document_id, title=document.title, text=document.text, metadata=document.metadata, score=score) ) ranked_documents.sort(key=lambda item: item.score, reverse=True) return ranked_documents[:normalized_top_k] def calculate_keyword_score( *, query_tokens: set[str], document_tokens: set[str]) -> float: if not query_tokens or not document_tokens: return 0.0 overlap_count = len(query_tokens.intersection(document_tokens)) if overlap_count == 0: return 0.0 return round(overlap_count / len(query_tokens), 4) def tokenize_text(value: str) -> set[str]: tokens = {item.lower() for item in re.findall(r"[\w\u4e00-\u9fff]+", value)} return {item for item in tokens if item} def _is_json_value(value: object) -> bool: if value is None or isinstance(value, (str, int, float, bool)): return True if isinstance(value, list): return all(_is_json_value(item) for item in value) if isinstance(value, dict): return all(isinstance(key, str) and _is_json_value(item) for key, item in value.items()) return False