| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- import json
- import re
- from collections.abc import Callable
- from core_shared import JSONValue
- TEMPLATE_PATTERN = re.compile(r"\{\{\s*(?P<expr>[^{}]+?)\s*\}\}")
- COMPARISON_OPERATORS = ("==", "!=", ">=", "<=", ">", "<")
- def build_template_context(
- *,
- node_id: str,
- node_type: str,
- run_state_json: dict[str, JSONValue],
- node_output_json_by_node_id: dict[str, dict[str, JSONValue]],
- node_output_text_by_node_id: dict[str, str],
- ) -> dict[str, JSONValue]:
- current_node_outputs = node_output_json_by_node_id.get(node_id, {})
- current_node_text = node_output_text_by_node_id.get(node_id)
- return {
- "state": run_state_json,
- "nodes": {
- item_node_id: {
- "output": output_json,
- "text": node_output_text_by_node_id.get(item_node_id),
- }
- for item_node_id, output_json in node_output_json_by_node_id.items()
- },
- "current": {
- "node_id": node_id,
- "node_type": node_type,
- "output": current_node_outputs,
- "text": current_node_text,
- },
- }
- def render_template_string(template: str, context: dict[str, JSONValue]) -> str:
- def replace(match: re.Match[str]) -> str:
- expression = match.group("expr").strip()
- value = resolve_expression(context, expression)
- if value is None:
- return ""
- if isinstance(value, (dict, list)):
- return json.dumps(value, ensure_ascii=True, separators=(",", ":"))
- return str(value)
- return TEMPLATE_PATTERN.sub(replace, template)
- def render_json_value(value: JSONValue, context: dict[str, JSONValue]) -> JSONValue:
- if isinstance(value, str):
- return render_template_string(value, context)
- if isinstance(value, list):
- return [render_json_value(item, context) for item in value]
- if isinstance(value, dict):
- return {
- str(item_key): render_json_value(item_value, context)
- for item_key, item_value in value.items()
- }
- return value
- def evaluate_condition_expression(expression: str, context: dict[str, JSONValue]) -> bool:
- stripped_expression = expression.strip()
- if not stripped_expression:
- return False
- for operator in COMPARISON_OPERATORS:
- if operator in stripped_expression:
- left_text, right_text = stripped_expression.split(operator, 1)
- left_value = resolve_expression(context, left_text.strip())
- right_value = resolve_expression(context, right_text.strip())
- return compare_values(left_value, right_value, operator)
- resolved = resolve_expression(context, stripped_expression)
- return coerce_bool(resolved)
- def resolve_expression(context: dict[str, JSONValue], expression: str) -> JSONValue:
- if expression == "":
- return None
- if (expression.startswith('"') and expression.endswith('"')) or (
- expression.startswith("'") and expression.endswith("'")
- ):
- return expression[1:-1]
- lowered = expression.lower()
- if lowered == "true":
- return True
- if lowered == "false":
- return False
- if lowered == "null":
- return None
- integer_value = try_parse_int(expression)
- if integer_value is not None:
- return integer_value
- float_value = try_parse_float(expression)
- if float_value is not None:
- return float_value
- return resolve_reference(context, expression)
- def resolve_reference(context: dict[str, JSONValue], path: str) -> JSONValue:
- current: JSONValue = context
- for segment in path.split("."):
- if not segment:
- return None
- if isinstance(current, dict):
- current = current.get(segment)
- continue
- if isinstance(current, list) and segment.isdigit():
- index = int(segment)
- if index < 0 or index >= len(current):
- return None
- current = current[index]
- continue
- return None
- return current
- def coerce_bool(value: JSONValue) -> bool:
- if isinstance(value, bool):
- return value
- if value is None:
- return False
- if isinstance(value, (int, float)):
- return value != 0
- if isinstance(value, str):
- lowered = value.strip().lower()
- if lowered in {"", "false", "0", "null", "none"}:
- return False
- return True
- if isinstance(value, (list, dict)):
- return len(value) > 0
- return False
- def compare_values(left: JSONValue, right: JSONValue, operator: str) -> bool:
- if operator == "==":
- return left == right
- if operator == "!=":
- return left != right
- if operator == ">":
- return compare_order(left, right, lambda x, y: x > y)
- if operator == "<":
- return compare_order(left, right, lambda x, y: x < y)
- if operator == ">=":
- return compare_order(left, right, lambda x, y: x >= y)
- if operator == "<=":
- return compare_order(left, right, lambda x, y: x <= y)
- return False
- def compare_order(
- left: JSONValue,
- right: JSONValue,
- operator: Callable[[int | float | str, int | float | str], bool],
- ) -> bool:
- if isinstance(left, (int, float)) and isinstance(right, (int, float)):
- return bool(operator(left, right))
- if isinstance(left, str) and isinstance(right, str):
- return bool(operator(left, right))
- return False
- def try_parse_int(value: str) -> int | None:
- if not value or any(item in value for item in {".", "e", "E"}):
- return None
- if value.startswith(("+", "-")):
- digits = value[1:]
- else:
- digits = value
- if not digits.isdigit():
- return None
- return int(value)
- def try_parse_float(value: str) -> float | None:
- try:
- parsed = float(value)
- except ValueError:
- return None
- if parsed.is_integer() and "." not in value and "e" not in value.lower():
- return None
- return parsed
|