| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150 |
- from abc import ABC, abstractmethod
- from dataclasses import dataclass
- import re
- import httpx
- from core_domain import (
- ChatCompletionRequestContract,
- ChatMessageContract,
- CodeExecutionRequestContract,
- KnowledgeSearchRequestContract,
- NodeExecutionContextContract,
- NodeExecutionRequestContract,
- NodeExecutionResultContract,
- ToolBindingDetailContract,
- )
- from core_shared import JSONValue
- from .code_runner_client import CodeRunnerClient, CodeRunnerClientError
- from .context import (
- build_template_context,
- coerce_bool,
- evaluate_condition_expression,
- render_json_value,
- render_template_string,
- resolve_expression,
- )
- from .knowledge_client import KnowledgeServiceClient, KnowledgeServiceClientError
- from .model_gateway_client import ModelGatewayClient, ModelGatewayClientError
- from .tool_client import ToolServiceClient, ToolServiceClientError
class NodeExecutor(ABC):
    """Abstract interface implemented by every node executor.

    Concrete executors assign ``executor_name`` and ``supported_node_types``
    and provide an ``execute`` implementation.
    """

    # Identifier reported alongside execution results.
    executor_name: str
    # Node type names this executor is able to handle.
    supported_node_types: frozenset[str]

    @abstractmethod
    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Execute the node described by ``context`` and return its result."""
        raise NotImplementedError
class CompletedNodeExecutor(NodeExecutor):
    """Executor that reports every node as completed without doing any work.

    It also serves as the base class for the concrete executors in this
    module, which reuse its constructor to register their name and the node
    types they support.
    """

    def __init__(self, *, executor_name: str, supported_node_types: frozenset[str]) -> None:
        """Store the executor name and the node types it handles."""
        self.supported_node_types = supported_node_types
        self.executor_name = executor_name

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Return a ``completed`` result echoing the executor name and node type."""
        summary = {
            "executor_name": self.executor_name,
            "node_type": context.node_type,
        }
        # Prefer the worker key supplied by the request; otherwise derive one.
        key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        return NodeExecutionResultContract(
            status="completed",
            worker_key=key,
            output_json=summary,
        )
class DefaultNodeExecutor(CompletedNodeExecutor):
    """Fallback executor: it claims no node types and always completes."""

    def __init__(self) -> None:
        """Register under the name ``default-executor`` with no node types."""
        no_node_types: frozenset[str] = frozenset()
        super().__init__(
            executor_name="default-executor",
            supported_node_types=no_node_types,
        )
class LLMNodeExecutor(CompletedNodeExecutor):
    """Executor for ``llm`` nodes: sends a chat completion to the model gateway."""

    def __init__(self, model_gateway_client: ModelGatewayClient | None = None) -> None:
        """Register as ``llm-executor`` and keep the optional gateway client."""
        super().__init__(
            executor_name="llm-executor",
            supported_node_types=frozenset({"llm"}),
        )
        self.model_gateway_client = model_gateway_client

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Render the node config, call the model gateway, and wrap the reply.

        Returns a ``failed`` result when the config yields no messages
        (``llm_config_missing``), when no gateway client is configured
        (``llm_gateway_missing``), or when the gateway call raises
        ``ModelGatewayClientError`` (``llm_request_failed``).
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"

        def failure(error_code: str, error_message: str) -> NodeExecutionResultContract:
            # Every failure path shares the same result shape.
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code=error_code,
                error_message=error_message,
            )

        template_context = _build_executor_template_context(context)
        rendered_config = _render_config_json(context.node_config_json, template_context)
        chat_request = _build_chat_completion_request(rendered_config)

        if chat_request is None:
            return failure("llm_config_missing", "llm node config requires prompt or messages")
        if self.model_gateway_client is None:
            return failure("llm_gateway_missing", "model gateway client is not configured")

        try:
            completion = self.model_gateway_client.create_chat_completion(chat_request)
        except ModelGatewayClientError as exc:
            return failure("llm_request_failed", str(exc))

        details = {
            "executor_name": self.executor_name,
            "model": completion.model,
            "finish_reason": completion.finish_reason,
            "usage_json": completion.usage_json,
            "raw_response_json": completion.raw_response_json,
        }
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=completion.content,
            output_json=details,
        )
class ToolNodeExecutor(CompletedNodeExecutor):
    """Executor for ``tool`` nodes.

    When a ``tool_binding_id`` is configured and a tool-service client is
    available, the binding is looked up and, for tools of type ``http``, the
    HTTP request is actually performed. In every other case the node is
    reported as completed without invoking anything.
    """

    def __init__(self, tool_client: ToolServiceClient | None = None) -> None:
        """Register as ``tool-executor`` and keep the optional tool client."""
        super().__init__(
            executor_name="tool-executor",
            supported_node_types=frozenset({"tool"}),
        )
        self.tool_client = tool_client

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Resolve the configured tool and run it when it is an HTTP tool.

        Returns a ``failed`` result when neither ``tool_binding_id`` nor
        ``tool_code`` is configured (``tool_config_missing``), when the binding
        lookup raises ``ToolServiceClientError`` (``tool_binding_lookup_failed``),
        or when the binding is disabled (``tool_binding_disabled``). Failures of
        the HTTP call itself are reported by ``_invoke_http_tool``.
        """
        tool_binding_id = _read_string_value(context.node_config_json, "tool_binding_id")
        tool_code = _read_string_value(context.node_config_json, "tool_code")
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        if tool_binding_id is None and tool_code is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="tool_config_missing",
                error_message="tool node config requires tool_binding_id or tool_code",
            )
        if tool_binding_id is not None and self.tool_client is not None:
            try:
                detail = self.tool_client.get_tool_binding_detail(
                    tenant_id=context.tenant_id,
                    binding_id=tool_binding_id,
                )
            except ToolServiceClientError as exc:
                return NodeExecutionResultContract(
                    status="failed",
                    worker_key=worker_key,
                    error_code="tool_binding_lookup_failed",
                    error_message=str(exc),
                )
            if not detail.binding.enabled:
                return NodeExecutionResultContract(
                    status="failed",
                    worker_key=worker_key,
                    error_code="tool_binding_disabled",
                    error_message=f"tool binding is disabled: {tool_binding_id}",
                )
            resolved_tool_code = detail.tool_definition.code
            resolved_tool_version_id = detail.tool_version.id
            resolved_tool_name = detail.tool_definition.name
            invoke_result = self._invoke_http_tool(
                context=context,
                detail=detail,
                worker_key=worker_key,
            )
            # ``None`` means the tool is not an HTTP tool; fall through to the
            # generic "completed" result below.
            if invoke_result is not None:
                return invoke_result
        else:
            # No binding id, or no client to resolve it with: only the raw
            # tool_code from the node config (possibly None) is reported.
            resolved_tool_code = tool_code
            resolved_tool_version_id = None
            resolved_tool_name = None
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=f"tool node completed: {resolved_tool_code or 'unknown-tool'}",
            output_json={
                "executor_name": self.executor_name,
                "tool_binding_id": tool_binding_id,
                "tool_code": resolved_tool_code,
                "tool_version_id": resolved_tool_version_id,
                "tool_name": resolved_tool_name,
            },
        )

    def _invoke_http_tool(
        self,
        *,
        context: NodeExecutionContextContract,
        detail: ToolBindingDetailContract,
        worker_key: str,
    ) -> NodeExecutionResultContract | None:
        """Perform the HTTP request for an ``http`` tool binding.

        Returns ``None`` when the tool type is not ``http``. Otherwise returns
        a ``completed`` result carrying the response, or a ``failed`` result
        when no URL can be resolved (``tool_http_url_missing``) or the request
        fails (``tool_http_request_failed``).
        """
        if detail.tool_definition.tool_type != "http":
            return None
        invoke_config_json = detail.tool_version.invoke_config_json or {}
        binding_config_json = detail.binding.config_json or {}
        render_context = _build_executor_template_context(context)
        # Later sources override earlier ones: tool version < binding < node.
        request_headers = _merge_json_dicts(
            _render_json_dict(_read_dict_value(invoke_config_json, "headers"), render_context),
            _render_json_dict(_read_dict_value(binding_config_json, "headers"), render_context),
            _render_json_dict(
                _read_dict_value(context.node_config_json, "headers"),
                render_context,
            ),
        )
        request_query = _merge_json_dicts(
            _render_json_dict(_read_dict_value(invoke_config_json, "query"), render_context),
            _render_json_dict(_read_dict_value(context.node_config_json, "query"), render_context),
        )
        request_body = _merge_json_dicts(
            _render_json_dict(_read_dict_value(invoke_config_json, "body"), render_context),
            _render_json_dict(_read_dict_value(context.node_config_json, "body"), render_context),
        )
        method = (_read_string_value(invoke_config_json, "method") or "GET").upper()
        base_url = (
            _read_string_value(context.node_config_json, "base_url")
            or _read_string_value(binding_config_json, "base_url")
            or _read_string_value(invoke_config_json, "base_url")
        )
        path = _read_string_value(context.node_config_json, "path") or _read_string_value(
            invoke_config_json, "path"
        )
        url = _read_string_value(context.node_config_json, "url") or _read_string_value(
            invoke_config_json, "url"
        )
        resolved_url = _resolve_http_url(url=url, base_url=base_url, path=path)
        if resolved_url is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="tool_http_url_missing",
                error_message="http tool requires url or base_url with path",
            )
        timeout_ms = detail.tool_version.timeout_ms or 10000
        try:
            with httpx.Client(timeout=timeout_ms / 1000) as client:
                response = client.request(
                    method=method,
                    url=resolved_url,
                    params=_coerce_http_params(request_query),
                    headers=_coerce_http_headers(request_headers),
                    json=request_body if request_body else None,
                )
                response.raise_for_status()
        except (httpx.HTTPError, httpx.InvalidURL) as exc:
            # httpx.InvalidURL is not a subclass of httpx.HTTPError, so it is
            # listed explicitly; otherwise a malformed URL would escape as an
            # unhandled exception instead of a failed node result.
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="tool_http_request_failed",
                error_message=str(exc),
                output_json={
                    "executor_name": self.executor_name,
                    "tool_binding_id": detail.binding.id,
                    "tool_code": detail.tool_definition.code,
                    "request_url": resolved_url,
                    "request_method": method,
                },
            )
        response_json = _try_parse_json_response(response)
        # The raw text is only reported when the body was not parsed as JSON.
        response_text = None if response_json is not None else response.text
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=response_text,
            output_json={
                "executor_name": self.executor_name,
                "tool_binding_id": detail.binding.id,
                "tool_code": detail.tool_definition.code,
                "tool_version_id": detail.tool_version.id,
                "tool_name": detail.tool_definition.name,
                "request_url": resolved_url,
                "request_method": method,
                "response_status_code": response.status_code,
                "response_headers": dict(response.headers),
                "response_json": response_json,
            },
        )
class CodeNodeExecutor(CompletedNodeExecutor):
    """Executor for ``code`` nodes: runs the configured snippet via the code runner."""

    def __init__(self, code_runner_client: CodeRunnerClient | None = None) -> None:
        """Register as ``code-executor`` and keep the optional runner client."""
        super().__init__(
            executor_name="code-executor",
            supported_node_types=frozenset({"code"}),
        )
        self.code_runner_client = code_runner_client

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Submit the node's code to the runner and translate its response.

        Returns a ``failed`` result when ``code`` is missing
        (``code_config_missing``), when no runner client is configured
        (``code_runner_missing``), when the runner call raises
        ``CodeRunnerClientError`` (``code_request_failed``), or when the runner
        reports an unsuccessful run (``code_execution_failed``).
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        node_config = context.node_config_json

        def failure(error_code, error_message, **extra) -> NodeExecutionResultContract:
            # ``extra`` carries output_text/output_json only on the one path
            # that reports them.
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code=error_code,
                error_message=error_message,
                **extra,
            )

        source_code = _read_string_value(node_config, "code")
        if source_code is None:
            return failure("code_config_missing", "code node config requires code")
        if self.code_runner_client is None:
            return failure("code_runner_missing", "code runner client is not configured")

        template_context = _build_executor_template_context(context)
        rendered_input = _render_json_dict(
            _read_dict_value(node_config, "input_json"),
            template_context,
        )
        runner_request = CodeExecutionRequestContract(
            language=_read_string_value(node_config, "language") or "python",
            code=source_code,
            input_json=rendered_input,
            timeout_seconds=_read_int_value(node_config, "timeout_seconds") or 10,
        )

        try:
            run = self.code_runner_client.execute_code(runner_request)
        except CodeRunnerClientError as exc:
            return failure("code_request_failed", str(exc))

        if not run.success:
            return failure(
                "code_execution_failed",
                run.error_message or run.stderr,
                output_text=run.stdout,
                output_json={
                    "executor_name": self.executor_name,
                    "stderr": run.stderr,
                    "output_json": run.output_json,
                },
            )

        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=run.stdout,
            output_json={
                "executor_name": self.executor_name,
                "stderr": run.stderr,
                "result_json": run.output_json,
            },
        )
class AnswerNodeExecutor(CompletedNodeExecutor):
    """Executor for ``answer`` nodes: renders ``text`` or ``template`` to output text."""

    def __init__(self) -> None:
        """Register as ``answer-executor`` for the ``answer`` node type."""
        super().__init__(
            executor_name="answer-executor",
            supported_node_types=frozenset({"answer"}),
        )

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Render the configured answer and report which source was used.

        ``text`` takes precedence over ``template``; an empty ``text`` falls
        back to a non-empty ``template``. Returns a ``failed`` result with
        ``answer_config_missing`` when neither key holds a string.
        """
        answer_text = _read_string_value(context.node_config_json, "text")
        template = _read_string_value(context.node_config_json, "template")
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        if answer_text is None and template is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="answer_config_missing",
                error_message="answer node config requires text or template",
            )
        # Decide once which source is rendered so that ``render_mode`` always
        # names the string that actually produced the output. Previously an
        # empty ``text`` with a non-empty ``template`` rendered the template
        # but still reported "text".
        use_text = answer_text is not None and (bool(answer_text) or not template)
        source = answer_text if use_text else template
        render_context = _build_executor_template_context(context)
        rendered_text = render_template_string(source or "", render_context)
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=rendered_text,
            output_json={
                "executor_name": self.executor_name,
                "render_mode": "text" if use_text else "template",
            },
        )
class ConditionNodeExecutor(CompletedNodeExecutor):
    """Executor for ``if-else`` / ``condition`` nodes: evaluates a boolean route."""

    def __init__(self) -> None:
        """Register as ``condition-executor`` for both condition node types."""
        super().__init__(
            executor_name="condition-executor",
            supported_node_types=frozenset({"if-else", "condition"}),
        )

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Evaluate ``expression`` (preferred) or ``path`` and emit the route.

        Returns a ``failed`` result with ``condition_config_missing`` when the
        node config has neither an ``expression`` nor a ``path`` string.
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        template_context = _build_executor_template_context(context)
        node_config = context.node_config_json
        expression = _read_string_value(node_config, "expression")
        path = _read_string_value(node_config, "path")

        if expression is None and path is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="condition_config_missing",
                error_message="condition node config requires expression or path",
            )

        if expression is not None:
            outcome = evaluate_condition_expression(expression, template_context)
            evaluated = expression
        else:
            outcome = _evaluate_path_condition(node_config, path, template_context)
            evaluated = path

        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_json={
                "executor_name": self.executor_name,
                "condition_result": outcome,
                "route": "true" if outcome else "false",
                "evaluated_expression": evaluated,
            },
        )
class AssignerNodeExecutor(CompletedNodeExecutor):
    """Executor for ``assigner`` nodes: renders assignments into state updates."""

    def __init__(self) -> None:
        """Register as ``assigner-executor`` for the ``assigner`` node type."""
        super().__init__(
            executor_name="assigner-executor",
            supported_node_types=frozenset({"assigner"}),
        )

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Render the ``assignments`` mapping and report it as state updates.

        Returns a ``failed`` result with ``assignments_missing`` when the node
        config has no non-empty ``assignments`` dict.
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        raw_assignments = _read_dict_value(context.node_config_json, "assignments")
        if raw_assignments:
            rendered = _render_json_dict(
                raw_assignments,
                _build_executor_template_context(context),
            )
            # The same rendered mapping is reported under both keys.
            return NodeExecutionResultContract(
                status="completed",
                worker_key=worker_key,
                output_json={
                    "executor_name": self.executor_name,
                    "assigned_values": rendered,
                    "state_updates": rendered,
                },
            )
        return NodeExecutionResultContract(
            status="failed",
            worker_key=worker_key,
            error_code="assignments_missing",
            error_message="assigner node config requires assignments",
        )
class RetrieverNodeExecutor(CompletedNodeExecutor):
    """Executor for ``knowledge-retrieval`` / ``retriever`` nodes.

    Documents are gathered from up to three sources (inline ``documents``, a
    JSON ``source_url``, and a knowledge-base search) and then ranked against
    the query.
    """

    def __init__(self, knowledge_client: KnowledgeServiceClient | None = None) -> None:
        """Register as ``retriever-executor`` and keep the optional knowledge client."""
        super().__init__(
            executor_name="retriever-executor",
            supported_node_types=frozenset({"knowledge-retrieval", "retriever"}),
        )
        self.knowledge_client = knowledge_client

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Collect candidate documents, rank them, and return the top results.

        Returns a ``failed`` result with one of these error codes:
        ``retriever_query_missing``, ``retriever_source_request_failed``,
        ``retriever_source_invalid``, ``knowledge_client_missing``,
        ``knowledge_search_failed``, or ``retriever_documents_missing``.
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        render_context = _build_executor_template_context(context)
        query = _resolve_retriever_query(context.node_config_json, render_context)
        documents = _read_retriever_documents(context.node_config_json, render_context)
        source_url = _read_string_value(context.node_config_json, "source_url")
        knowledge_base_id = _read_string_value(context.node_config_json, "knowledge_base_id")
        top_k = _read_int_value(context.node_config_json, "top_k") or 3
        if query is None:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="retriever_query_missing",
                error_message="retriever node config requires query or query_template",
            )
        if source_url is not None:
            try:
                documents.extend(
                    _fetch_retriever_documents_from_url(
                        source_url=render_template_string(source_url, render_context),
                        timeout_ms=_read_int_value(context.node_config_json, "timeout_ms") or 10000,
                        render_context=render_context,
                    )
                )
            except (httpx.HTTPError, httpx.InvalidURL) as exc:
                # httpx.InvalidURL does not derive from httpx.HTTPError; without
                # listing it, a malformed rendered URL would raise out of the
                # executor instead of producing a failed result.
                return NodeExecutionResultContract(
                    status="failed",
                    worker_key=worker_key,
                    error_code="retriever_source_request_failed",
                    error_message=str(exc),
                )
            except ValueError as exc:
                return NodeExecutionResultContract(
                    status="failed",
                    worker_key=worker_key,
                    error_code="retriever_source_invalid",
                    error_message=str(exc),
                )
        knowledge_results: list[dict[str, JSONValue]] = []
        if knowledge_base_id is not None:
            if self.knowledge_client is None:
                return NodeExecutionResultContract(
                    status="failed",
                    worker_key=worker_key,
                    error_code="knowledge_client_missing",
                    error_message="knowledge-service client is not configured",
                )
            try:
                knowledge_results = [
                    item.model_dump(mode="json")
                    for item in self.knowledge_client.search(
                        KnowledgeSearchRequestContract(
                            tenant_id=context.tenant_id,
                            knowledge_base_id=render_template_string(
                                knowledge_base_id,
                                render_context,
                            ),
                            query=query,
                            top_k=top_k,
                            filters_json=_render_json_dict(
                                _read_dict_value(context.node_config_json, "filters_json"),
                                render_context,
                            ),
                        )
                    )
                ]
            except KnowledgeServiceClientError as exc:
                return NodeExecutionResultContract(
                    status="failed",
                    worker_key=worker_key,
                    error_code="knowledge_search_failed",
                    error_message=str(exc),
                )
            documents.extend(_knowledge_results_to_retriever_documents(knowledge_results))
        if not documents:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="retriever_documents_missing",
                error_message=(
                    "retriever node config requires documents, source_url, "
                    "or knowledge_base_id"
                ),
            )
        ranked_documents = rank_documents(query=query, documents=documents, top_k=top_k)
        output_documents = [item.to_output_json() for item in ranked_documents]
        output_text = "\n\n".join(item.text for item in ranked_documents)
        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=output_text,
            output_json={
                "executor_name": self.executor_name,
                "query": query,
                "top_k": top_k,
                "retrieved_documents": output_documents,
                # The unrendered config value is reported here.
                "knowledge_base_id": knowledge_base_id,
                "knowledge_results": knowledge_results,
            },
        )
class TemplateNodeExecutor(CompletedNodeExecutor):
    """Executor for ``template-transform`` / ``template`` nodes."""

    def __init__(self) -> None:
        """Register as ``template-executor`` for both template node types."""
        super().__init__(
            executor_name="template-executor",
            supported_node_types=frozenset({"template-transform", "template"}),
        )

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> NodeExecutionResultContract:
        """Render the string ``template`` and/or the ``template_json`` mapping.

        Returns a ``failed`` result with ``template_config_missing`` when the
        config has neither a ``template`` string nor a non-empty
        ``template_json`` dict.
        """
        worker_key = request.worker_key or f"{self.executor_name}:{context.node_type}"
        template_context = _build_executor_template_context(context)
        text_template = _read_string_value(context.node_config_json, "template")
        json_template = _read_dict_value(context.node_config_json, "template_json")

        if text_template is None and not json_template:
            return NodeExecutionResultContract(
                status="failed",
                worker_key=worker_key,
                error_code="template_config_missing",
                error_message="template node config requires template or template_json",
            )

        rendered_text = (
            render_template_string(text_template, template_context)
            if text_template is not None
            else None
        )
        result_json: dict[str, JSONValue] = {"executor_name": self.executor_name}
        # ``rendered_json`` only appears when a JSON template was supplied.
        if json_template:
            result_json["rendered_json"] = _render_json_dict(json_template, template_context)

        return NodeExecutionResultContract(
            status="completed",
            worker_key=worker_key,
            output_text=rendered_text,
            output_json=result_json,
        )
class NodeExecutionDispatcher:
    """Routes a node execution to the first executor supporting its node type."""

    def __init__(
        self,
        executors: list[NodeExecutor],
        default_executor: NodeExecutor,
    ) -> None:
        """Store the ordered executor list and the fallback executor."""
        self.default_executor = default_executor
        self.executors = executors

    def resolve_executor(self, node_type: str) -> NodeExecutor:
        """Return the first executor that lists ``node_type``, else the default."""
        matching = (
            candidate
            for candidate in self.executors
            if node_type in candidate.supported_node_types
        )
        return next(matching, self.default_executor)

    def execute(
        self,
        context: NodeExecutionContextContract,
        request: NodeExecutionRequestContract,
    ) -> tuple[NodeExecutionResultContract, str]:
        """Run the resolved executor; return its result and its name."""
        chosen = self.resolve_executor(context.node_type)
        return chosen.execute(context, request), chosen.executor_name
def build_node_execution_dispatcher() -> NodeExecutionDispatcher:
    """Build a dispatcher whose executors have no service clients attached.

    Equivalent to calling ``build_node_execution_dispatcher_with_clients``
    with every client left at its ``None`` default, which constructs the same
    executors in the same order with a ``DefaultNodeExecutor`` fallback.
    """
    return build_node_execution_dispatcher_with_clients()
def build_node_execution_dispatcher_with_clients(
    *,
    code_runner_client: CodeRunnerClient | None = None,
    model_gateway_client: ModelGatewayClient | None = None,
    tool_client: ToolServiceClient | None = None,
    knowledge_client: KnowledgeServiceClient | None = None,
) -> NodeExecutionDispatcher:
    """Build a dispatcher wiring each supplied client into its executor.

    Any client may be omitted; the corresponding executor then receives
    ``None``. Unmatched node types fall back to ``DefaultNodeExecutor``.
    """
    # List order is the resolution order used by the dispatcher.
    registered: list[NodeExecutor] = []
    registered.append(LLMNodeExecutor(model_gateway_client=model_gateway_client))
    registered.append(ToolNodeExecutor(tool_client=tool_client))
    registered.append(CodeNodeExecutor(code_runner_client=code_runner_client))
    registered.append(AnswerNodeExecutor())
    registered.append(ConditionNodeExecutor())
    registered.append(AssignerNodeExecutor())
    registered.append(RetrieverNodeExecutor(knowledge_client=knowledge_client))
    registered.append(TemplateNodeExecutor())
    fallback = DefaultNodeExecutor()
    return NodeExecutionDispatcher(executors=registered, default_executor=fallback)
- def _read_string_value(payload: dict[str, JSONValue], key: str) -> str | None:
- value = payload.get(key)
- if isinstance(value, str):
- return value
- return None
- def _read_dict_value(payload: dict[str, JSONValue], key: str) -> dict[str, JSONValue]:
- value = payload.get(key)
- if isinstance(value, dict):
- return {str(item_key): item_value for item_key, item_value in value.items()}
- return {}
- def _merge_json_dicts(*items: dict[str, JSONValue]) -> dict[str, JSONValue]:
- merged: dict[str, JSONValue] = {}
- for item in items:
- merged.update(item)
- return merged
def _render_json_dict(
    payload: dict[str, JSONValue],
    context: dict[str, JSONValue],
) -> dict[str, JSONValue]:
    """Render ``payload`` through ``render_json_value`` and return a dict.

    Keys of the rendered mapping are converted to strings. If rendering does
    not produce a dict, an empty dict is returned.
    """
    result = render_json_value(payload, context)
    if not isinstance(result, dict):
        return {}
    return {str(name): entry for name, entry in result.items()}
def _render_config_json(
    payload: dict[str, JSONValue],
    context: dict[str, JSONValue],
) -> dict[str, JSONValue]:
    """Render a node config mapping; thin alias over ``_render_json_dict``."""
    rendered_config = _render_json_dict(payload, context)
    return rendered_config
- def _resolve_http_url(*, url: str | None, base_url: str | None, path: str | None) -> str | None:
- if url is not None:
- return url
- if base_url is None or path is None:
- return None
- return f"{base_url.rstrip('/')}/{path.lstrip('/')}"
- def _coerce_http_headers(payload: dict[str, JSONValue]) -> dict[str, str]:
- headers: dict[str, str] = {}
- for key, value in payload.items():
- if isinstance(value, (str, int, float, bool)):
- headers[key] = str(value)
- return headers
- def _coerce_http_params(payload: dict[str, JSONValue]) -> dict[str, str]:
- params: dict[str, str] = {}
- for key, value in payload.items():
- if isinstance(value, (str, int, float, bool)):
- params[key] = str(value)
- return params
def _try_parse_json_response(response: httpx.Response) -> JSONValue | None:
    """Decode the response body as JSON when the content type says it is JSON.

    Returns ``None`` when the ``content-type`` header does not contain
    ``json`` (case-insensitive), when decoding raises ``ValueError``, or when
    the decoded value is not one of the plain JSON types.
    """
    declared_type = response.headers.get("content-type", "").lower()
    if "json" not in declared_type:
        return None
    try:
        decoded = response.json()
    except ValueError:
        return None
    plain_json_types = (dict, list, str, int, float, bool)
    if decoded is None or isinstance(decoded, plain_json_types):
        return decoded
    return None
def _build_chat_completion_request(
    payload: dict[str, JSONValue],
) -> ChatCompletionRequestContract | None:
    """Build a chat completion request from a rendered LLM node config.

    An explicit ``messages`` list takes precedence. Without usable messages,
    ``system_prompt`` and ``prompt`` are turned into a system and a user
    message respectively. Returns ``None`` when no message can be built.
    """
    messages = _read_message_list(payload, "messages")
    if not messages:
        # Fall back to the prompt-style keys, system message first.
        fallback_sources = (
            ("system", _read_string_value(payload, "system_prompt")),
            ("user", _read_string_value(payload, "prompt")),
        )
        messages.extend(
            ChatMessageContract(role=role, content=text)
            for role, text in fallback_sources
            if text is not None
        )
    if not messages:
        return None
    return ChatCompletionRequestContract(
        model=_read_string_value(payload, "model"),
        messages=messages,
        temperature=_read_float_value(payload, "temperature"),
        max_tokens=_read_int_value(payload, "max_tokens"),
    )
- def _read_message_list(
- payload: dict[str, JSONValue],
- key: str,
- ) -> list[ChatMessageContract]:
- value = payload.get(key)
- if not isinstance(value, list):
- return []
- messages: list[ChatMessageContract] = []
- for item in value:
- if not isinstance(item, dict):
- continue
- role = item.get("role")
- content = item.get("content")
- name = item.get("name")
- if isinstance(role, str) and isinstance(content, str):
- messages.append(
- ChatMessageContract(
- role=role,
- content=content,
- name=name if isinstance(name, str) else None,
- )
- )
- return messages
- def _read_float_value(payload: dict[str, JSONValue], key: str) -> float | None:
- value = payload.get(key)
- if isinstance(value, (int, float)) and not isinstance(value, bool):
- return float(value)
- return None
- def _read_int_value(payload: dict[str, JSONValue], key: str) -> int | None:
- value = payload.get(key)
- if isinstance(value, int) and not isinstance(value, bool):
- return value
- return None
def _build_executor_template_context(context: NodeExecutionContextContract) -> dict[str, JSONValue]:
    """Build the template rendering context from a node execution context.

    Forwards the node identity, the run state, and the per-node outputs to
    ``build_template_context``.
    """
    context_fields = {
        "node_id": context.node_id,
        "node_type": context.node_type,
        "run_state_json": context.run_state_json,
        "node_output_json_by_node_id": context.node_output_json_by_node_id,
        "node_output_text_by_node_id": context.node_output_text_by_node_id,
    }
    return build_template_context(**context_fields)
def _evaluate_path_condition(
    payload: dict[str, JSONValue],
    path: str,
    render_context: dict[str, JSONValue],
) -> bool:
    """Evaluate a path-based condition against the render context.

    The value at ``path`` is resolved, then the first comparison key present
    in ``payload`` decides the result, checked in this order: ``equals``,
    ``not_equals``, ``gt``, ``gte``, ``lt``, ``lte``, ``exists`` (boolean
    only). With no applicable key the value's truthiness via ``coerce_bool``
    is returned.
    """
    actual = resolve_expression(render_context, path)

    if "equals" in payload:
        return actual == render_json_value(payload["equals"], render_context)
    if "not_equals" in payload:
        return actual != render_json_value(payload["not_equals"], render_context)

    # Numeric comparisons, in precedence order.
    numeric_checks = (("gt", ">"), ("gte", ">="), ("lt", "<"), ("lte", "<="))
    for config_key, symbol in numeric_checks:
        if config_key in payload:
            threshold = render_json_value(payload[config_key], render_context)
            return _compare_numeric(actual, threshold, symbol)

    # A non-boolean (or absent) ``exists`` falls through to truthiness.
    expected_presence = payload.get("exists")
    if isinstance(expected_presence, bool):
        return (actual is not None) is expected_presence
    return coerce_bool(actual)
- def _compare_numeric(left: JSONValue, right: JSONValue, operator: str) -> bool:
- if not isinstance(left, (int, float)) or not isinstance(right, (int, float)):
- return False
- if operator == ">":
- return left > right
- if operator == ">=":
- return left >= right
- if operator == "<":
- return left < right
- if operator == "<=":
- return left <= right
- return False
@dataclass(frozen=True)
class RetrieverDocument:
    """Immutable candidate document considered by the retriever."""

    # Identifier of the document.
    document_id: str
    # Optional title; ``None`` when the document has none.
    title: str | None
    # Document body text.
    text: str
    # Additional JSON metadata attached to the document.
    metadata: dict[str, JSONValue]
@dataclass(frozen=True)
class RankedRetrieverDocument:
    """Immutable retriever document together with its ranking score."""

    # Identifier of the document.
    document_id: str
    # Optional title; ``None`` when the document has none.
    title: str | None
    # Document body text.
    text: str
    # Additional JSON metadata attached to the document.
    metadata: dict[str, JSONValue]
    # Score assigned during ranking.
    score: float

    def to_output_json(self) -> dict[str, JSONValue]:
        """Return the document as a JSON-style dict, keys in field order."""
        exported_fields = ("document_id", "title", "text", "metadata", "score")
        return {name: getattr(self, name) for name in exported_fields}
def _resolve_retriever_query(
    payload: dict[str, JSONValue],
    render_context: dict[str, JSONValue],
) -> str | None:
    """Render the retriever query configured in ``payload``.

    ``query`` takes precedence over ``query_template``; whichever is used is
    rendered with ``render_template_string`` against ``render_context``.

    Returns:
        The rendered query with surrounding whitespace removed, or ``None``
        when ``_read_string_value`` returns ``None`` for both keys or the
        rendered text is blank.
    """
    direct = _read_string_value(payload, "query")
    templated = _read_string_value(payload, "query_template")

    source = direct if direct is not None else templated
    if source is None:
        return None

    # A query that renders to only whitespace is treated as no query at all.
    return render_template_string(source, render_context).strip() or None
- def _read_retriever_documents(
- payload: dict[str, JSONValue],
- render_context: dict[str, JSONValue],
- ) -> list[RetrieverDocument]:
- value = payload.get("documents")
- if not isinstance(value, list):
- return []
- documents: list[RetrieverDocument] = []
- for index, item in enumerate(value):
- document = _parse_retriever_document(
- item,
- index=index,
- render_context=render_context,
- )
- if document is not None:
- documents.append(document)
- return documents
- def _fetch_retriever_documents_from_url(
- *,
- source_url: str,
- timeout_ms: int,
- render_context: dict[str, JSONValue],
- ) -> list[RetrieverDocument]:
- if not source_url.strip():
- return []
- with httpx.Client(timeout=timeout_ms / 1000) as client:
- response = client.get(source_url)
- response.raise_for_status()
- payload = response.json()
- if isinstance(payload, dict):
- documents_payload = payload.get("documents")
- else:
- documents_payload = payload
- if not isinstance(documents_payload, list):
- raise ValueError("retriever source must return a JSON list or object.documents list")
- documents: list[RetrieverDocument] = []
- for index, item in enumerate(documents_payload):
- if not _is_json_value(item):
- continue
- document = _parse_retriever_document(
- item,
- index=index,
- render_context=render_context,
- )
- if document is not None:
- documents.append(document)
- return documents
- def _parse_retriever_document(
- value: JSONValue,
- *,
- index: int,
- render_context: dict[str, JSONValue],
- ) -> RetrieverDocument | None:
- if isinstance(value, str):
- text = render_template_string(value, render_context).strip()
- if not text:
- return None
- return RetrieverDocument(
- document_id=f"doc-{index + 1}",
- title=None,
- text=text,
- metadata={},
- )
- if not isinstance(value, dict):
- return None
- rendered = _render_json_dict({str(key): item for key, item in value.items()}, render_context)
- text_value = rendered.get("text") or rendered.get("content")
- if not isinstance(text_value, str) or not text_value.strip():
- return None
- document_id_value = rendered.get("id") or rendered.get("document_id")
- title_value = rendered.get("title")
- metadata_value = rendered.get("metadata")
- return RetrieverDocument(
- document_id=str(document_id_value) if document_id_value is not None else f"doc-{index + 1}",
- title=title_value if isinstance(title_value, str) else None,
- text=text_value.strip(),
- metadata=metadata_value if isinstance(metadata_value, dict) else {},
- )
- def _knowledge_results_to_retriever_documents(
- results: list[dict[str, JSONValue]],
- ) -> list[RetrieverDocument]:
- documents: list[RetrieverDocument] = []
- for index, result in enumerate(results):
- chunk = result.get("chunk")
- document = result.get("document")
- score = result.get("score")
- score_json = result.get("score_json")
- if not isinstance(chunk, dict) or not isinstance(document, dict):
- continue
- content_text = chunk.get("content_text")
- if not isinstance(content_text, str) or not content_text.strip():
- continue
- document_id = document.get("id")
- title = document.get("title")
- metadata: dict[str, JSONValue] = {
- "source": "knowledge-service",
- "chunk_id": str(chunk.get("id")) if chunk.get("id") is not None else None,
- "chunk_index": (
- chunk.get("chunk_index")
- if isinstance(chunk.get("chunk_index"), int)
- else index
- ),
- "score": score if isinstance(score, (int, float)) else None,
- "score_json": score_json if isinstance(score_json, dict) else {},
- }
- documents.append(
- RetrieverDocument(
- document_id=str(document_id)
- if document_id is not None
- else f"knowledge-{index + 1}",
- title=title if isinstance(title, str) else None,
- text=content_text.strip(),
- metadata=metadata,
- )
- )
- return documents
def rank_documents(
    *,
    query: str,
    documents: list[RetrieverDocument],
    top_k: int,
) -> list[RankedRetrieverDocument]:
    """Score ``documents`` against ``query`` and return the best ``top_k``.

    Each document is scored by keyword overlap between the query tokens and
    the tokens of its title plus text.

    Args:
        query: Query text to match against.
        documents: Candidate documents.
        top_k: Maximum number of results; values below 1 are raised to 1.

    Returns:
        Ranked documents sorted by descending score. Documents are not
        filtered by score, so zero-score entries may appear in the result.
    """
    limit = max(top_k, 1)
    query_tokens = tokenize_text(query)

    def _score(document: RetrieverDocument) -> float:
        # Title may be None or empty; only truthy parts are joined.
        searchable = " ".join(part for part in (document.title, document.text) if part)
        return calculate_keyword_score(
            query_tokens=query_tokens,
            document_tokens=tokenize_text(searchable),
        )

    scored = [
        RankedRetrieverDocument(
            document_id=document.document_id,
            title=document.title,
            text=document.text,
            metadata=document.metadata,
            score=_score(document),
        )
        for document in documents
    ]
    return sorted(scored, key=lambda item: item.score, reverse=True)[:limit]
def calculate_keyword_score(
    *,
    query_tokens: set[str],
    document_tokens: set[str],
) -> float:
    """Return the fraction of query tokens that also occur in the document.

    Returns:
        ``len(overlap) / len(query_tokens)`` rounded to four decimal places,
        or ``0.0`` when either token set is empty or they share no token.
    """
    if not (query_tokens and document_tokens):
        return 0.0

    shared = query_tokens.intersection(document_tokens)
    if not shared:
        return 0.0

    coverage = len(shared) / len(query_tokens)
    return round(coverage, 4)
def tokenize_text(value: str) -> set[str]:
    """Return the set of distinct, lowercased tokens found in ``value``.

    A token is a maximal run of characters matched by the pattern below
    (word characters plus the U+4E00-U+9FFF range).
    """
    return {
        lowered
        for raw in re.findall(r"[\w\u4e00-\u9fff]+", value)
        if (lowered := raw.lower())
    }
- def _is_json_value(value: object) -> bool:
- if value is None or isinstance(value, (str, int, float, bool)):
- return True
- if isinstance(value, list):
- return all(_is_json_value(item) for item in value)
- if isinstance(value, dict):
- return all(isinstance(key, str) and _is_json_value(item) for key, item in value.items())
- return False
|