| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056 |
- from abc import ABC, abstractmethod
- from dataclasses import dataclass
- import re
- import httpx
- from core_domain import (
- ChatCompletionRequestContract,
- ChatMessageContract,
- CodeExecutionRequestContract,
- NodeExecutionContextContract,
- NodeExecutionRequestContract,
- NodeExecutionResultContract,
- ToolBindingDetailContract,
- )
- from core_shared import JSONValue
- from .code_runner_client import CodeRunnerClient, CodeRunnerClientError
- from .context import (
- build_template_context,
- coerce_bool,
- evaluate_condition_expression,
- render_json_value,
- render_template_string,
- resolve_expression,
- )
- from .model_gateway_client import ModelGatewayClient, ModelGatewayClientError
- from .tool_client import ToolServiceClient, ToolServiceClientError
class NodeExecutor(ABC):
    """Abstract interface for a component that runs a single workflow node.

    Concrete executors expose a display name and the node types they accept;
    ``NodeExecutionDispatcher`` routes a node to the first executor whose
    ``supported_node_types`` contains the node's type.
    """

    # Name reported back alongside every result this executor produces.
    executor_name: str
    # Node type strings this executor is able to handle.
    supported_node_types: frozenset[str]

    @abstractmethod
    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Run the node described by ``context`` and return its result contract."""
        raise NotImplementedError
class CompletedNodeExecutor(NodeExecutor):
    """Executor that reports success immediately without performing any work.

    It is the common base of the concrete executors in this module. Used directly,
    it echoes which executor handled which node type.
    """

    def __init__(self, *, executor_name: str, supported_node_types: frozenset[str]) -> None:
        self.executor_name = executor_name
        self.supported_node_types = supported_node_types

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Return a ``completed`` result naming this executor and the node type."""
        # A deterministic key is derived when the request does not carry one.
        fallback_key = f"{self.executor_name}:{context.node_type}"
        summary = {
            "executor_name": self.executor_name,
            "node_type": context.node_type,
        }
        return NodeExecutionResultContract(
            status="completed",
            worker_key=request.worker_key or fallback_key,
            output_json=summary,
        )
class DefaultNodeExecutor(CompletedNodeExecutor):
    """Fallback executor used when no registered executor claims a node type.

    It declares no node types of its own and simply reports completion.
    """

    def __init__(self) -> None:
        super().__init__(supported_node_types=frozenset(), executor_name="default-executor")
class LLMNodeExecutor(CompletedNodeExecutor):
    """Execute ``llm`` nodes by sending a chat completion to the model gateway.

    The node config is template-rendered against the run context first, then
    converted into a chat completion request.
    """

    def __init__(self, model_gateway_client: ModelGatewayClient | None = None) -> None:
        super().__init__(
            executor_name="llm-executor",
            supported_node_types=frozenset({"llm"}),
        )
        self.model_gateway_client = model_gateway_client

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Render the config, call the gateway and wrap the completion.

        Fails with ``llm_config_missing`` when no message can be built,
        ``llm_gateway_missing`` when no client is configured, or
        ``llm_request_failed`` when the gateway client raises.
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"

        def failure(error_code: str, error_message: str) -> NodeExecutionResultContract:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code=error_code,
                error_message=error_message,
            )

        template_context = _build_executor_template_context(context)
        rendered_config = _render_config_json(context.node_config_json, template_context)
        chat_request = _build_chat_completion_request(rendered_config)
        # Config problems are reported before a missing client.
        if chat_request is None:
            return failure("llm_config_missing", "llm node config requires prompt or messages")
        gateway = self.model_gateway_client
        if gateway is None:
            return failure("llm_gateway_missing", "model gateway client is not configured")
        try:
            completion = gateway.create_chat_completion(chat_request)
        except ModelGatewayClientError as exc:
            return failure("llm_request_failed", str(exc))
        details = {
            "executor_name": self.executor_name,
            "model": completion.model,
            "finish_reason": completion.finish_reason,
            "usage_json": completion.usage_json,
            "raw_response_json": completion.raw_response_json,
        }
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=completion.content,
            output_json=details,
        )
class ToolNodeExecutor(CompletedNodeExecutor):
    """Execute ``tool`` nodes.

    A node references its tool either through ``tool_binding_id`` (resolved via the
    tool service client) or directly through ``tool_code``. Resolved bindings whose
    tool definition has ``tool_type == "http"`` are invoked over HTTP. Every other
    path completes with a summary of the tool identity.
    """

    def __init__(self, tool_client: ToolServiceClient | None = None) -> None:
        super().__init__(
            executor_name="tool-executor",
            supported_node_types=frozenset({"tool"}),
        )
        self.tool_client = tool_client

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Resolve the configured tool and invoke it when it is an HTTP tool.

        Fails with:
        - ``tool_config_missing`` when neither ``tool_binding_id`` nor ``tool_code``
          is a string;
        - ``tool_binding_lookup_failed`` when the tool service client raises;
        - ``tool_binding_disabled`` for a disabled binding;
        - the error produced by ``_invoke_http_tool`` for HTTP tools.
        """
        tool_binding_id = _read_string_value(context.node_config_json, "tool_binding_id")
        tool_code = _read_string_value(context.node_config_json, "tool_code")
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        if tool_binding_id is None and tool_code is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="tool_config_missing",
                error_message="tool node config requires tool_binding_id or tool_code",
            )
        if tool_binding_id is not None and self.tool_client is not None:
            try:
                detail = self.tool_client.get_tool_binding_detail(
                    tenant_id=context.tenant_id,
                    binding_id=tool_binding_id,
                )
            except ToolServiceClientError as exc:
                return NodeExecutionResultContract(
                    status="failed",
                    worker_key=worker_key,
                    error_code="tool_binding_lookup_failed",
                    error_message=str(exc),
                )
            if not detail.binding.enabled:
                return NodeExecutionResultContract(
                    status="failed",
                    worker_key=worker_key,
                    error_code="tool_binding_disabled",
                    error_message=f"tool binding is disabled: {tool_binding_id}",
                )
            resolved_tool_code = detail.tool_definition.code
            resolved_tool_version_id = detail.tool_version.id
            resolved_tool_name = detail.tool_definition.name
            # Non-HTTP tools yield None here and fall through to the summary result.
            invoke_result = self._invoke_http_tool(
                context=context,
                detail=detail,
                worker_key=worker_key,
            )
            if invoke_result is not None:
                return invoke_result
        else:
            # NOTE(review): without a tool client, a configured tool_binding_id is
            # never resolved and the node still reports "completed" using only
            # tool_code (possibly None). Confirm this stub behaviour is intended.
            resolved_tool_code = tool_code
            resolved_tool_version_id = None
            resolved_tool_name = None
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=f"tool node completed: {resolved_tool_code or 'unknown-tool'}",
            output_json={
                "executor_name": self.executor_name,
                "tool_binding_id": tool_binding_id,
                "tool_code": resolved_tool_code,
                "tool_version_id": resolved_tool_version_id,
                "tool_name": resolved_tool_name,
            },
        )

    def _invoke_http_tool(
        self,
        *,
        context: NodeExecutionContextContract,
        detail: ToolBindingDetailContract,
        worker_key: str,
    ) -> NodeExecutionResultContract | None:
        """Call an HTTP tool described by ``detail``.

        Returns ``None`` when the tool is not an ``http`` tool, so the caller can fall
        back to its summary result. Otherwise returns a completed or failed result.

        Precedence of merged settings (later sources override earlier ones):
        - headers: tool version ``invoke_config_json`` < binding ``config_json`` <
          node config;
        - query and body: ``invoke_config_json`` < node config;
        - ``base_url``: node config, then binding, then invoke config;
        - ``path`` and ``url``: node config, then invoke config.
        Header, query and body values are template-rendered; the URL parts are not.
        """
        if detail.tool_definition.tool_type != "http":
            return None
        invoke_config_json = detail.tool_version.invoke_config_json or {}
        binding_config_json = detail.binding.config_json or {}
        render_context = _build_executor_template_context(context)
        request_headers = _merge_json_dicts(
            _render_json_dict(_read_dict_value(invoke_config_json, "headers"), render_context),
            _render_json_dict(_read_dict_value(binding_config_json, "headers"), render_context),
            _render_json_dict(_read_dict_value(context.node_config_json, "headers"), render_context),
        )
        request_query = _merge_json_dicts(
            _render_json_dict(_read_dict_value(invoke_config_json, "query"), render_context),
            _render_json_dict(_read_dict_value(context.node_config_json, "query"), render_context),
        )
        request_body = _merge_json_dicts(
            _render_json_dict(_read_dict_value(invoke_config_json, "body"), render_context),
            _render_json_dict(_read_dict_value(context.node_config_json, "body"), render_context),
        )
        method = (_read_string_value(invoke_config_json, "method") or "GET").upper()
        base_url = (
            _read_string_value(context.node_config_json, "base_url")
            or _read_string_value(binding_config_json, "base_url")
            or _read_string_value(invoke_config_json, "base_url")
        )
        path = _read_string_value(context.node_config_json, "path") or _read_string_value(
            invoke_config_json, "path"
        )
        url = _read_string_value(context.node_config_json, "url") or _read_string_value(
            invoke_config_json, "url"
        )
        resolved_url = _resolve_http_url(url=url, base_url=base_url, path=path)
        if resolved_url is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="tool_http_url_missing",
                error_message="http tool requires url or base_url with path",
            )
        # tool_version.timeout_ms is in milliseconds; httpx expects seconds.
        timeout_ms = detail.tool_version.timeout_ms or 10000
        try:
            with httpx.Client(timeout=timeout_ms / 1000) as client:
                response = client.request(
                    method=method,
                    url=resolved_url,
                    params=_coerce_http_params(request_query),
                    headers=_coerce_http_headers(request_headers),
                    # An empty merged body is sent as "no JSON body" rather than {}.
                    json=request_body if request_body else None,
                )
                response.raise_for_status()
        except (httpx.HTTPError, httpx.InvalidURL) as exc:
            # httpx.InvalidURL does not derive from httpx.HTTPError. Without it, a
            # malformed configured URL would escape as an uncaught exception instead
            # of a failed node result.
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="tool_http_request_failed",
                error_message=str(exc),
                output_json={
                    "executor_name": self.executor_name,
                    "tool_binding_id": detail.binding.id,
                    "tool_code": detail.tool_definition.code,
                    "request_url": resolved_url,
                    "request_method": method,
                },
            )
        response_json = _try_parse_json_response(response)
        # The raw text is only surfaced when the body was not parsed as JSON.
        response_text = None if response_json is not None else response.text
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=response_text,
            output_json={
                "executor_name": self.executor_name,
                "tool_binding_id": detail.binding.id,
                "tool_code": detail.tool_definition.code,
                "tool_version_id": detail.tool_version.id,
                "tool_name": detail.tool_definition.name,
                "request_url": resolved_url,
                "request_method": method,
                "response_status_code": response.status_code,
                "response_headers": dict(response.headers),
                "response_json": response_json,
            },
        )
class CodeNodeExecutor(CompletedNodeExecutor):
    """Execute ``code`` nodes through the code runner service."""

    def __init__(self, code_runner_client: CodeRunnerClient | None = None) -> None:
        super().__init__(
            executor_name="code-executor",
            supported_node_types=frozenset({"code"}),
        )
        self.code_runner_client = code_runner_client

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Send the node's code to the runner and translate the runner's response.

        Defaults: ``language`` is ``"python"`` and ``timeout_seconds`` is 10.
        Only ``input_json`` is template-rendered; ``code`` is sent verbatim.
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        config = context.node_config_json
        source_code = _read_string_value(config, "code")
        if source_code is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="code_config_missing",
                error_message="code node config requires code",
            )
        runner = self.code_runner_client
        if runner is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="code_runner_missing",
                error_message="code runner client is not configured",
            )
        template_context = _build_executor_template_context(context)
        rendered_input = _render_json_dict(_read_dict_value(config, "input_json"), template_context)
        execution_request = CodeExecutionRequestContract(
            language=_read_string_value(config, "language") or "python",
            code=source_code,
            input_json=rendered_input,
            timeout_seconds=_read_int_value(config, "timeout_seconds") or 10,
        )
        try:
            outcome = runner.execute_code(execution_request)
        except CodeRunnerClientError as exc:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="code_request_failed",
                error_message=str(exc),
            )
        if outcome.success:
            return NodeExecutionResultContract(
                status="completed",
                worker_key=worker_key,
                output_text=outcome.stdout,
                output_json={
                    "executor_name": self.executor_name,
                    "stderr": outcome.stderr,
                    "result_json": outcome.output_json,
                },
            )
        # Failed runs still surface stdout/stderr and any partial output.
        return NodeExecutionResultContract(
            status="failed",
            worker_key=worker_key,
            error_code="code_execution_failed",
            error_message=outcome.error_message or outcome.stderr,
            output_text=outcome.stdout,
            output_json={
                "executor_name": self.executor_name,
                "stderr": outcome.stderr,
                "output_json": outcome.output_json,
            },
        )
class AnswerNodeExecutor(CompletedNodeExecutor):
    """Execute ``answer`` nodes by rendering configured ``text`` or ``template``."""

    def __init__(self) -> None:
        super().__init__(
            executor_name="answer-executor",
            supported_node_types=frozenset({"answer"}),
        )

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Render the answer and report which config key was used.

        A non-empty ``text`` wins; otherwise ``template`` is rendered. Both are passed
        through ``render_template_string``. ``render_mode`` in the output names the
        source that was actually rendered. Fails with ``answer_config_missing`` when
        neither key holds a string.
        """
        answer_text = _read_string_value(context.node_config_json, "text")
        template = _read_string_value(context.node_config_json, "template")
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        if answer_text is None and template is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="answer_config_missing",
                error_message="answer node config requires text or template",
            )
        if answer_text:
            source, render_mode = answer_text, "text"
        elif template is not None:
            # An empty ``text`` falls through to the template. Previously this case
            # rendered the template but reported render_mode "text".
            source, render_mode = template, "template"
        else:
            # ``text`` is configured but empty and there is no template.
            source, render_mode = "", "text"
        render_context = _build_executor_template_context(context)
        rendered_text = render_template_string(source, render_context)
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=rendered_text,
            output_json={
                "executor_name": self.executor_name,
                "render_mode": render_mode,
            },
        )
class ConditionNodeExecutor(CompletedNodeExecutor):
    """Execute branching nodes (``if-else`` / ``condition``) and report the route."""

    def __init__(self) -> None:
        super().__init__(
            executor_name="condition-executor",
            supported_node_types=frozenset({"if-else", "condition"}),
        )

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Evaluate the node's condition and pick the ``"true"``/``"false"`` route.

        ``expression`` takes precedence over ``path``. Fails with
        ``condition_config_missing`` when neither is a string.
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        render_context = _build_executor_template_context(context)
        config = context.node_config_json
        expression = _read_string_value(config, "expression")
        path = _read_string_value(config, "path")
        if expression is None and path is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="condition_config_missing",
                error_message="condition node config requires expression or path",
            )
        if expression is not None:
            evaluated_expression = expression
            condition_result = evaluate_condition_expression(expression, render_context)
        else:
            evaluated_expression = path
            condition_result = _evaluate_path_condition(config, path, render_context)
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_json={
                "executor_name": self.executor_name,
                "condition_result": condition_result,
                "route": "true" if condition_result else "false",
                "evaluated_expression": evaluated_expression,
            },
        )
class AssignerNodeExecutor(CompletedNodeExecutor):
    """Execute ``assigner`` nodes by rendering their configured assignments."""

    def __init__(self) -> None:
        super().__init__(
            executor_name="assigner-executor",
            supported_node_types=frozenset({"assigner"}),
        )

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Render ``assignments`` against the run context.

        The same rendered mapping is reported as both ``assigned_values`` and
        ``state_updates``. Fails with ``assignments_missing`` when the key is
        absent, not an object, or empty.
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        raw_assignments = _read_dict_value(context.node_config_json, "assignments")
        if not raw_assignments:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="assignments_missing",
                error_message="assigner node config requires assignments",
            )
        rendered = _render_json_dict(raw_assignments, _build_executor_template_context(context))
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_json={
                "executor_name": self.executor_name,
                "assigned_values": rendered,
                "state_updates": rendered,
            },
        )
class RetrieverNodeExecutor(CompletedNodeExecutor):
    """Execute retrieval nodes by keyword-ranking inline and/or fetched documents."""

    def __init__(self) -> None:
        super().__init__(
            executor_name="retriever-executor",
            supported_node_types=frozenset({"knowledge-retrieval", "retriever"}),
        )

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Rank candidate documents against the rendered query.

        Inline ``documents`` are combined with documents fetched from ``source_url``
        (when configured; the URL is template-rendered). The ``top_k`` best-scoring
        documents are returned, defaulting to 3.

        Failure codes:
        - ``retriever_query_missing``: no usable query;
        - ``retriever_source_request_failed``: the HTTP fetch failed or the URL was
          malformed;
        - ``retriever_source_invalid``: the source payload was not usable;
        - ``retriever_documents_missing``: no documents remained.
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        render_context = _build_executor_template_context(context)
        query = _resolve_retriever_query(context.node_config_json, render_context)
        documents = _read_retriever_documents(context.node_config_json, render_context)
        source_url = _read_string_value(context.node_config_json, "source_url")
        top_k = _read_int_value(context.node_config_json, "top_k") or 3
        if query is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="retriever_query_missing",
                error_message="retriever node config requires query or query_template",
            )
        if source_url is not None:
            try:
                documents.extend(
                    _fetch_retriever_documents_from_url(
                        source_url=render_template_string(source_url, render_context),
                        timeout_ms=_read_int_value(context.node_config_json, "timeout_ms") or 10000,
                        render_context=render_context,
                    )
                )
            except (httpx.HTTPError, httpx.InvalidURL) as exc:
                # httpx.InvalidURL is not an HTTPError subclass. A malformed (e.g. badly
                # rendered) source_url would otherwise escape as an uncaught exception.
                return NodeExecutionResultContract(
                    status="failed",
                    worker_key=worker_key,
                    error_code="retriever_source_request_failed",
                    error_message=str(exc),
                )
            except ValueError as exc:
                # Raised for undecodable JSON or a payload without a documents list.
                return NodeExecutionResultContract(
                    status="failed",
                    worker_key=worker_key,
                    error_code="retriever_source_invalid",
                    error_message=str(exc),
                )
        if not documents:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="retriever_documents_missing",
                error_message="retriever node config requires non-empty documents",
            )
        ranked_documents = rank_documents(query=query, documents=documents, top_k=top_k)
        output_documents = [item.to_output_json() for item in ranked_documents]
        output_text = "\n\n".join(item.text for item in ranked_documents)
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=output_text,
            output_json={
                "executor_name": self.executor_name,
                "query": query,
                "top_k": top_k,
                "retrieved_documents": output_documents,
            },
        )
class TemplateNodeExecutor(CompletedNodeExecutor):
    """Execute template nodes by rendering a text template and/or a JSON template."""

    def __init__(self) -> None:
        super().__init__(
            executor_name="template-executor",
            supported_node_types=frozenset({"template-transform", "template"}),
        )

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Render ``template`` into ``output_text`` and ``template_json`` into JSON.

        Fails with ``template_config_missing`` when there is neither a string
        ``template`` nor a non-empty ``template_json`` object.
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        render_context = _build_executor_template_context(context)
        config = context.node_config_json
        text_template = _read_string_value(config, "template")
        json_template = _read_dict_value(config, "template_json")
        if text_template is None and not json_template:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="template_config_missing",
                error_message="template node config requires template or template_json",
            )
        rendered_text = (
            None if text_template is None else render_template_string(text_template, render_context)
        )
        output_json = {"executor_name": self.executor_name}
        # ``rendered_json`` is only reported when a non-empty JSON template exists.
        if json_template:
            output_json["rendered_json"] = _render_json_dict(json_template, render_context)
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=rendered_text,
            output_json=output_json,
        )
class NodeExecutionDispatcher:
    """Route node executions to the first executor that supports the node type."""

    def __init__(
        self,
        executors: list[NodeExecutor],
        default_executor: NodeExecutor,
    ) -> None:
        # Order matters: the first executor claiming a node type wins.
        self.executors = executors
        self.default_executor = default_executor

    def resolve_executor(self, node_type: str) -> NodeExecutor:
        """Return the first registered executor supporting ``node_type``, else the default."""
        return next(
            (
                candidate
                for candidate in self.executors
                if node_type in candidate.supported_node_types
            ),
            self.default_executor,
        )

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> tuple[NodeExecutionResultContract, str]:
        """Run ``context`` through the resolved executor.

        Returns the executor's result paired with the name of the executor used.
        """
        chosen = self.resolve_executor(context.node_type)
        return chosen.execute(context, request), chosen.executor_name
def build_node_execution_dispatcher() -> NodeExecutionDispatcher:
    """Build a dispatcher with every built-in executor and no external clients.

    The LLM, tool and code executors are created without a client: LLM and code
    nodes report their client as not configured, and tool nodes do not resolve
    bindings. This delegates to ``build_node_execution_dispatcher_with_clients`` so
    the executor list lives in one place instead of being duplicated.
    """
    return build_node_execution_dispatcher_with_clients()
def build_node_execution_dispatcher_with_clients(
    *,
    code_runner_client: CodeRunnerClient | None = None,
    model_gateway_client: ModelGatewayClient | None = None,
    tool_client: ToolServiceClient | None = None,
) -> NodeExecutionDispatcher:
    """Build a dispatcher wired to the given service clients.

    Any client left as ``None`` is passed through as ``None`` to its executor.
    Unknown node types go to ``DefaultNodeExecutor``.
    """
    client_backed = [
        LLMNodeExecutor(model_gateway_client=model_gateway_client),
        ToolNodeExecutor(tool_client=tool_client),
        CodeNodeExecutor(code_runner_client=code_runner_client),
    ]
    self_contained = [
        AnswerNodeExecutor(),
        ConditionNodeExecutor(),
        AssignerNodeExecutor(),
        RetrieverNodeExecutor(),
        TemplateNodeExecutor(),
    ]
    return NodeExecutionDispatcher(
        executors=[*client_backed, *self_contained],
        default_executor=DefaultNodeExecutor(),
    )
def _read_string_value(payload: dict[str, JSONValue], key: str) -> str | None:
    """Return ``payload[key]`` when it is a string (possibly empty), else ``None``."""
    candidate = payload.get(key)
    return candidate if isinstance(candidate, str) else None
def _read_dict_value(payload: dict[str, JSONValue], key: str) -> dict[str, JSONValue]:
    """Return a shallow copy of ``payload[key]`` with string keys, or ``{}``.

    Anything other than a dict (including a missing key) yields an empty dict.
    """
    candidate = payload.get(key)
    if not isinstance(candidate, dict):
        return {}
    return dict((str(name), item) for name, item in candidate.items())
def _merge_json_dicts(*items: dict[str, JSONValue]) -> dict[str, JSONValue]:
    """Shallow-merge dicts left to right into a new dict; later keys override earlier ones."""
    return {key: value for item in items for key, value in item.items()}
def _render_json_dict(
    payload: dict[str, JSONValue],
    context: dict[str, JSONValue],
) -> dict[str, JSONValue]:
    """Render templates inside ``payload`` and return the result as a string-keyed dict.

    Returns ``{}`` if ``render_json_value`` does not produce a dict.
    """
    result = render_json_value(payload, context)
    if not isinstance(result, dict):
        return {}
    return dict((str(name), item) for name, item in result.items())
def _render_config_json(
    payload: dict[str, JSONValue],
    context: dict[str, JSONValue],
) -> dict[str, JSONValue]:
    """Render a node config dict against ``context``.

    Currently identical to ``_render_json_dict``.
    """
    rendered_config = _render_json_dict(payload, context)
    return rendered_config
- def _resolve_http_url(*, url: str | None, base_url: str | None, path: str | None) -> str | None:
- if url is not None:
- return url
- if base_url is None or path is None:
- return None
- return f"{base_url.rstrip('/')}/{path.lstrip('/')}"
def _coerce_http_headers(payload: dict[str, JSONValue]) -> dict[str, str]:
    """Convert scalar JSON values into header strings.

    Booleans are written with their JSON spelling (``"true"``/``"false"``) rather
    than Python's ``"True"``/``"False"``. Numbers and strings use ``str()``.
    ``None``, lists and objects are dropped.
    """
    headers: dict[str, str] = {}
    for key, value in payload.items():
        # bool is checked first because it is a subclass of int.
        if isinstance(value, bool):
            headers[key] = "true" if value else "false"
        elif isinstance(value, (str, int, float)):
            headers[key] = str(value)
    return headers
def _coerce_http_params(payload: dict[str, JSONValue]) -> dict[str, str]:
    """Convert scalar JSON values into query-parameter strings.

    Booleans become ``"true"``/``"false"``, which matches both JSON and how httpx
    itself encodes boolean query params. Pre-stringifying with ``str()`` would send
    ``"True"``/``"False"``. Numbers and strings use ``str()``; ``None``, lists and
    objects are dropped.
    """
    params: dict[str, str] = {}
    for key, value in payload.items():
        # bool is checked first because it is a subclass of int.
        if isinstance(value, bool):
            params[key] = "true" if value else "false"
        elif isinstance(value, (str, int, float)):
            params[key] = str(value)
    return params
def _try_parse_json_response(response: httpx.Response) -> JSONValue | None:
    """Decode ``response`` as JSON when its ``content-type`` mentions ``json``.

    Returns ``None`` in three cases: the content type is not JSON, decoding raises
    ``ValueError``, or the decoded value is not a JSON-compatible Python type.
    """
    advertised_type = response.headers.get("content-type", "").lower()
    if "json" not in advertised_type:
        return None
    try:
        decoded = response.json()
    except ValueError:
        return None
    is_json_compatible = decoded is None or isinstance(
        decoded, (dict, list, str, int, float, bool)
    )
    return decoded if is_json_compatible else None
def _build_chat_completion_request(
    payload: dict[str, JSONValue],
) -> ChatCompletionRequestContract | None:
    """Translate a rendered LLM node config into a chat completion request.

    A ``messages`` list that yields at least one valid message wins. Otherwise the
    request is built from ``system_prompt`` (system role) followed by ``prompt``
    (user role). Returns ``None`` when no message can be built.
    """
    messages = _read_message_list(payload, "messages")
    if not messages:
        for role, field_name in (("system", "system_prompt"), ("user", "prompt")):
            text = _read_string_value(payload, field_name)
            if text is not None:
                messages.append(ChatMessageContract(role=role, content=text))
    if not messages:
        return None
    return ChatCompletionRequestContract(
        model=_read_string_value(payload, "model"),
        messages=messages,
        temperature=_read_float_value(payload, "temperature"),
        max_tokens=_read_int_value(payload, "max_tokens"),
    )
def _read_message_list(
    payload: dict[str, JSONValue],
    key: str,
) -> list[ChatMessageContract]:
    """Parse ``payload[key]`` into chat messages.

    Non-dict entries and entries lacking a string ``role`` and ``content`` are
    skipped silently. A non-string ``name`` is dropped (set to ``None``).
    """
    raw_messages = payload.get(key)
    if not isinstance(raw_messages, list):
        return []
    parsed = []
    for entry in raw_messages:
        if not isinstance(entry, dict):
            continue
        role, content = entry.get("role"), entry.get("content")
        if not (isinstance(role, str) and isinstance(content, str)):
            continue
        name = entry.get("name")
        parsed.append(
            ChatMessageContract(
                role=role,
                content=content,
                name=name if isinstance(name, str) else None,
            )
        )
    return parsed
def _read_float_value(payload: dict[str, JSONValue], key: str) -> float | None:
    """Return ``payload[key]`` as a float when it is an int or float (bools excluded)."""
    candidate = payload.get(key)
    if isinstance(candidate, bool) or not isinstance(candidate, (int, float)):
        return None
    return float(candidate)
def _read_int_value(payload: dict[str, JSONValue], key: str) -> int | None:
    """Return ``payload[key]`` when it is an int (bools excluded), else ``None``."""
    candidate = payload.get(key)
    if isinstance(candidate, bool) or not isinstance(candidate, int):
        return None
    return candidate
def _build_executor_template_context(context: NodeExecutionContextContract) -> dict[str, JSONValue]:
    """Build the template-rendering context for a node.

    Forwards the node id and type, the run state, and the per-node JSON and text
    outputs from ``context`` to ``build_template_context``.
    """
    forwarded = {
        "node_id": context.node_id,
        "node_type": context.node_type,
        "run_state_json": context.run_state_json,
        "node_output_json_by_node_id": context.node_output_json_by_node_id,
        "node_output_text_by_node_id": context.node_output_text_by_node_id,
    }
    return build_template_context(**forwarded)
def _evaluate_path_condition(
    payload: dict[str, JSONValue],
    path: str,
    render_context: dict[str, JSONValue],
) -> bool:
    """Evaluate a path-based condition from a condition node config.

    The value at ``path`` is resolved from ``render_context``. It is compared using
    the first operator key present, checked in this order: ``equals``,
    ``not_equals``, ``gt``, ``gte``, ``lt``, ``lte``, ``exists``.

    - Operands are template-rendered before comparison.
    - ``exists`` only applies when its value is a boolean.
    - With no applicable operator, the result is ``coerce_bool`` of the resolved value.
    """
    resolved = resolve_expression(render_context, path)

    def operand(operator_key: str) -> JSONValue:
        return render_json_value(payload[operator_key], render_context)

    if "equals" in payload:
        return resolved == operand("equals")
    if "not_equals" in payload:
        return resolved != operand("not_equals")
    for operator_key, symbol in (("gt", ">"), ("gte", ">="), ("lt", "<"), ("lte", "<=")):
        if operator_key in payload:
            return _compare_numeric(resolved, operand(operator_key), symbol)
    exists_flag = payload.get("exists")
    if isinstance(exists_flag, bool):
        return (resolved is not None) is exists_flag
    return coerce_bool(resolved)
def _compare_numeric(left: JSONValue, right: JSONValue, operator: str) -> bool:
    """Compare two numbers with ``>``, ``>=``, ``<`` or ``<=``.

    Returns ``False`` when either side is not an int/float, or when ``operator`` is
    unrecognized. Booleans are rejected even though ``bool`` subclasses ``int``.
    This matches ``_read_int_value``/``_read_float_value``, so ``True > 0`` no
    longer evaluates as a numeric comparison.
    """

    def is_number(value: JSONValue) -> bool:
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    if not (is_number(left) and is_number(right)):
        return False
    comparisons = {
        ">": lambda a, b: a > b,
        ">=": lambda a, b: a >= b,
        "<": lambda a, b: a < b,
        "<=": lambda a, b: a <= b,
    }
    compare = comparisons.get(operator)
    if compare is None:
        return False
    return compare(left, right)
@dataclass(frozen=True)
class RetrieverDocument:
    """Immutable candidate document for keyword retrieval."""

    # Identifier; a positional ``doc-N`` id is generated when none is supplied.
    document_id: str
    # Optional title; it is included in the text that gets tokenized for ranking.
    title: str | None
    # Stripped, non-empty document body.
    text: str
    # Arbitrary metadata passed through to the output unchanged.
    metadata: dict[str, JSONValue]
@dataclass(frozen=True)
class RankedRetrieverDocument:
    """A retriever document paired with its keyword relevance score."""

    document_id: str
    title: str | None
    text: str
    metadata: dict[str, JSONValue]
    # Fraction of query tokens found in the document, rounded to 4 decimals.
    score: float

    def to_output_json(self) -> dict[str, JSONValue]:
        """Serialize to the JSON shape reported in ``retrieved_documents``."""
        return dict(
            document_id=self.document_id,
            title=self.title,
            text=self.text,
            metadata=self.metadata,
            score=self.score,
        )
def _resolve_retriever_query(
    payload: dict[str, JSONValue],
    render_context: dict[str, JSONValue],
) -> str | None:
    """Render and strip the retriever query.

    ``query`` is preferred over ``query_template``; both are template-rendered.
    Returns ``None`` when neither is a string or the rendered query is blank.
    """
    query = _read_string_value(payload, "query")
    source = query if query is not None else _read_string_value(payload, "query_template")
    if source is None:
        return None
    cleaned = render_template_string(source, render_context).strip()
    return cleaned or None
def _read_retriever_documents(
    payload: dict[str, JSONValue],
    render_context: dict[str, JSONValue],
) -> list[RetrieverDocument]:
    """Parse the inline ``documents`` list of a retriever config.

    Entries rejected by ``_parse_retriever_document`` are skipped. Each entry's
    list position is passed as ``index``, which is used for generated ids.
    """
    raw_documents = payload.get("documents")
    if not isinstance(raw_documents, list):
        return []
    return [
        parsed
        for position, entry in enumerate(raw_documents)
        if (
            parsed := _parse_retriever_document(
                entry,
                index=position,
                render_context=render_context,
            )
        )
        is not None
    ]
def _fetch_retriever_documents_from_url(
    *,
    source_url: str,
    timeout_ms: int,
    render_context: dict[str, JSONValue],
) -> list[RetrieverDocument]:
    """GET ``source_url`` and parse the returned documents.

    The response must be a JSON list, or an object whose ``documents`` field is a
    list. A blank URL returns ``[]`` without a request.

    Raises:
        httpx.HTTPError: on transport errors or non-2xx responses.
        ValueError: when the body is not JSON or has no documents list.
    """
    if not source_url.strip():
        return []
    # timeout_ms is milliseconds; httpx expects seconds.
    with httpx.Client(timeout=timeout_ms / 1000) as client:
        response = client.get(source_url)
        response.raise_for_status()
    payload = response.json()
    documents_payload = payload.get("documents") if isinstance(payload, dict) else payload
    if not isinstance(documents_payload, list):
        raise ValueError("retriever source must return a JSON list or object.documents list")
    candidates = (
        _parse_retriever_document(entry, index=position, render_context=render_context)
        for position, entry in enumerate(documents_payload)
        if _is_json_value(entry)
    )
    return [document for document in candidates if document is not None]
def _parse_retriever_document(
    value: JSONValue,
    *,
    index: int,
    render_context: dict[str, JSONValue],
) -> RetrieverDocument | None:
    """Turn one raw document entry into a ``RetrieverDocument``.

    - A string entry is rendered and used as the text.
    - A dict entry is rendered. Its text is the first non-blank string among
      ``text`` and ``content``; its id is the first present value among ``id`` and
      ``document_id``.
    - Missing ids default to ``doc-{index + 1}``.
    - Entries with no usable text, and any other type, yield ``None``.
    """
    if isinstance(value, str):
        text = render_template_string(value, render_context).strip()
        if not text:
            return None
        return RetrieverDocument(
            document_id=f"doc-{index + 1}",
            title=None,
            text=text,
            metadata={},
        )
    if not isinstance(value, dict):
        return None
    rendered = _render_json_dict({str(key): item for key, item in value.items()}, render_context)

    # Previously a whitespace-only ``text`` (truthy) shadowed a valid ``content``
    # and the whole document was dropped.
    text = None
    for candidate in (rendered.get("text"), rendered.get("content")):
        if isinstance(candidate, str) and candidate.strip():
            text = candidate.strip()
            break
    if text is None:
        return None

    document_id = f"doc-{index + 1}"
    for candidate in (rendered.get("id"), rendered.get("document_id")):
        # Numeric 0 is a legitimate id. Other falsy values (None, "", False, empty
        # containers) are still treated as absent, as before.
        is_plain_int = isinstance(candidate, int) and not isinstance(candidate, bool)
        if candidate or is_plain_int:
            document_id = str(candidate)
            break

    title_value = rendered.get("title")
    metadata_value = rendered.get("metadata")
    return RetrieverDocument(
        document_id=document_id,
        title=title_value if isinstance(title_value, str) else None,
        text=text,
        metadata=metadata_value if isinstance(metadata_value, dict) else {},
    )
def rank_documents(
    *,
    query: str,
    documents: list[RetrieverDocument],
    top_k: int,
) -> list[RankedRetrieverDocument]:
    """Score documents by keyword overlap with ``query`` and keep the best ones.

    Each document is scored on its title and text joined with a space. Results are
    ordered by descending score; the sort is stable, so ties keep input order. At
    least one document is returned when ``documents`` is non-empty, even if
    ``top_k`` < 1.
    """
    limit = max(top_k, 1)
    query_tokens = tokenize_text(query)

    def score_of(document: RetrieverDocument) -> float:
        searchable = " ".join(part for part in (document.title, document.text) if part)
        return calculate_keyword_score(
            query_tokens=query_tokens,
            document_tokens=tokenize_text(searchable),
        )

    scored = [
        RankedRetrieverDocument(
            document_id=document.document_id,
            title=document.title,
            text=document.text,
            metadata=document.metadata,
            score=score_of(document),
        )
        for document in documents
    ]
    return sorted(scored, key=lambda ranked: ranked.score, reverse=True)[:limit]
def calculate_keyword_score(
    *,
    query_tokens: set[str],
    document_tokens: set[str],
) -> float:
    """Return the fraction of query tokens present in the document, rounded to 4 places.

    Returns ``0.0`` when either token set is empty or nothing overlaps.
    """
    if not (query_tokens and document_tokens):
        return 0.0
    matched = len(query_tokens.intersection(document_tokens))
    return round(matched / len(query_tokens), 4) if matched else 0.0
def tokenize_text(value: str) -> set[str]:
    """Split ``value`` into a set of lowercase keyword tokens.

    Runs of word characters (letters, digits, underscore) outside the CJK Unified
    Ideographs block (U+4E00–U+9FFF) form one token each. Every CJK ideograph
    becomes a token of its own. Chinese text is written without spaces, so keeping
    a whole CJK run as one token made Chinese queries almost never overlap with
    documents.
    """
    # First alternative: a single CJK ideograph.
    # Second alternative: a run of word characters excluding CJK ideographs.
    pattern = r"[\u4e00-\u9fff]|[^\W\u4e00-\u9fff]+"
    return {token.lower() for token in re.findall(pattern, value)}
- def _is_json_value(value: object) -> bool:
- if value is None or isinstance(value, (str, int, float, bool)):
- return True
- if isinstance(value, list):
- return all(_is_json_value(item) for item in value)
- if isinstance(value, dict):
- return all(isinstance(key, str) and _is_json_value(item) for key, item in value.items())
- return False
|