"""Template rendering and condition evaluation over a JSON-like run context.

Expressions are either literals (quoted strings, ``true``/``false``/``null``,
integers, floats) or dot-separated reference paths resolved against the
context built by :func:`build_template_context`.
"""

import json
import math
import operator as op
import re
from collections.abc import Callable

from core_shared import JSONValue

# Matches ``{{ expression }}``; the expression may not contain braces.
TEMPLATE_PATTERN = re.compile(r"\{\{\s*(?P<expr>[^{}]+?)\s*\}\}")

# Two-character operators come first so ">=" is tried before ">".
COMPARISON_OPERATORS = ("==", "!=", ">=", "<=", ">", "<")

_QUOTE_CHARS = ("'", '"')

# Ordering operators dispatched through compare_order().
_ORDERING_COMPARATORS: dict[
    str, Callable[[int | float | str, int | float | str], bool]
] = {
    ">": op.gt,
    "<": op.lt,
    ">=": op.ge,
    "<=": op.le,
}


def build_template_context(
    *,
    node_id: str,
    node_type: str,
    run_state_json: dict[str, JSONValue],
    node_output_json_by_node_id: dict[str, dict[str, JSONValue]],
    node_output_text_by_node_id: dict[str, str],
) -> dict[str, JSONValue]:
    """Build the context dictionary that expressions are resolved against.

    The result has three top-level keys:

    * ``"state"`` - ``run_state_json`` as passed in.
    * ``"nodes"`` - one entry per key of ``node_output_json_by_node_id``, each
      holding that node's ``"output"`` and its ``"text"`` (``None`` when the
      node has no entry in ``node_output_text_by_node_id``).
    * ``"current"`` - ``node_id``, ``node_type`` and the current node's
      ``"output"`` (``{}`` when absent) and ``"text"`` (``None`` when absent).
    """
    nodes = {
        item_node_id: {
            "output": output_json,
            "text": node_output_text_by_node_id.get(item_node_id),
        }
        for item_node_id, output_json in node_output_json_by_node_id.items()
    }
    current = {
        "node_id": node_id,
        "node_type": node_type,
        "output": node_output_json_by_node_id.get(node_id, {}),
        "text": node_output_text_by_node_id.get(node_id),
    }
    return {"state": run_state_json, "nodes": nodes, "current": current}


def render_template_string(template: str, context: dict[str, JSONValue]) -> str:
    """Replace every ``{{ expression }}`` placeholder in *template*.

    Each expression is resolved with :func:`resolve_expression`. ``None``
    renders as an empty string, dicts and lists render as compact ASCII JSON,
    and every other value renders via ``str()``.
    """

    def replace(match: re.Match[str]) -> str:
        value = resolve_expression(context, match.group("expr").strip())
        if value is None:
            return ""
        if isinstance(value, (dict, list)):
            return json.dumps(value, ensure_ascii=True, separators=(",", ":"))
        return str(value)

    return TEMPLATE_PATTERN.sub(replace, template)


def render_json_value(value: JSONValue, context: dict[str, JSONValue]) -> JSONValue:
    """Recursively render template placeholders inside a JSON-like value.

    Strings are rendered with :func:`render_template_string`; lists and dicts
    are rebuilt with their items rendered (dict keys are converted with
    ``str()`` but not rendered); any other value is returned unchanged.
    """
    if isinstance(value, str):
        return render_template_string(value, context)
    if isinstance(value, list):
        return [render_json_value(item, context) for item in value]
    if isinstance(value, dict):
        return {
            str(item_key): render_json_value(item_value, context)
            for item_key, item_value in value.items()
        }
    return value


def _mask_quoted_literals(expression: str) -> str:
    """Return *expression* with the contents of quoted literals blanked out.

    The result has the same length as the input, so indexes found in it are
    valid for the original string. When a quote is left unterminated the
    quoting cannot be trusted, so the expression is returned unchanged.
    """
    masked: list[str] = []
    active_quote: str | None = None
    for char in expression:
        if active_quote is None:
            if char in _QUOTE_CHARS:
                active_quote = char
            masked.append(char)
        elif char == active_quote:
            active_quote = None
            masked.append(char)
        else:
            # Blank out literal content so operators inside it are not found.
            masked.append(" ")
    if active_quote is not None:
        return expression
    return "".join(masked)


def evaluate_condition_expression(expression: str, context: dict[str, JSONValue]) -> bool:
    """Evaluate *expression* to a boolean.

    A blank expression is ``False``. If the expression contains a comparison
    operator (outside any quoted literal), the text on each side of its first
    occurrence is resolved and compared with :func:`compare_values`; operators
    are tried in ``COMPARISON_OPERATORS`` order and only the first one found
    is used. Otherwise the whole expression is resolved and passed through
    :func:`coerce_bool`.
    """
    stripped_expression = expression.strip()
    if not stripped_expression:
        return False

    # Search a masked copy so that `state.x != "a==b"` splits on "!=" rather
    # than on the "==" inside the string literal.
    searchable = _mask_quoted_literals(stripped_expression)
    for comparison in COMPARISON_OPERATORS:
        position = searchable.find(comparison)
        if position == -1:
            continue
        left_text = stripped_expression[:position].strip()
        right_text = stripped_expression[position + len(comparison):].strip()
        return compare_values(
            resolve_expression(context, left_text),
            resolve_expression(context, right_text),
            comparison,
        )

    return coerce_bool(resolve_expression(context, stripped_expression))


def resolve_expression(context: dict[str, JSONValue], expression: str) -> JSONValue:
    """Resolve a single expression to a value.

    Tried in order: empty string (``None``), quoted string literal,
    case-insensitive ``true`` / ``false`` / ``null``, integer literal, float
    literal, and finally a reference path looked up in *context*.
    """
    if expression == "":
        return None

    # Require both an opening and a closing quote; a lone quote character is
    # not a string literal.
    if (
        len(expression) >= 2
        and expression[0] in _QUOTE_CHARS
        and expression[-1] == expression[0]
    ):
        return expression[1:-1]

    lowered = expression.lower()
    if lowered == "true":
        return True
    if lowered == "false":
        return False
    if lowered == "null":
        return None

    integer_value = try_parse_int(expression)
    if integer_value is not None:
        return integer_value

    float_value = try_parse_float(expression)
    if float_value is not None:
        return float_value

    return resolve_reference(context, expression)


def resolve_reference(context: dict[str, JSONValue], path: str) -> JSONValue:
    """Walk a dot-separated *path* through *context*.

    Dict levels are looked up by key (missing keys yield ``None``); list
    levels are indexed by non-negative decimal segments. Returns ``None`` for
    an empty segment, an out-of-range index, or when a segment cannot be
    applied to the current value.
    """
    current: JSONValue = context
    for segment in path.split("."):
        if not segment:
            return None
        if isinstance(current, dict):
            current = current.get(segment)
        elif isinstance(current, list) and segment.isdecimal():
            # isdecimal() (unlike isdigit()) only accepts what int() can parse.
            index = int(segment)
            if index >= len(current):
                return None
            current = current[index]
        else:
            return None
    return current


def coerce_bool(value: JSONValue) -> bool:
    """Convert a resolved value to a boolean.

    ``None`` is false; numbers are true when non-zero; strings are false when,
    after stripping and lower-casing, they are one of ``""``, ``"false"``,
    ``"0"``, ``"null"`` or ``"none"``; lists and dicts are true when
    non-empty. Anything else is false.
    """
    if isinstance(value, bool):
        return value
    if value is None:
        return False
    if isinstance(value, (int, float)):
        return value != 0
    if isinstance(value, str):
        return value.strip().lower() not in {"", "false", "0", "null", "none"}
    if isinstance(value, (list, dict)):
        return len(value) > 0
    return False


def compare_values(left: JSONValue, right: JSONValue, operator: str) -> bool:
    """Compare two resolved values using the textual *operator*.

    ``==`` and ``!=`` use plain equality. Ordering operators are delegated to
    :func:`compare_order`. An unrecognised operator yields ``False``.
    """
    if operator == "==":
        return left == right
    if operator == "!=":
        return left != right
    comparator = _ORDERING_COMPARATORS.get(operator)
    if comparator is None:
        return False
    return compare_order(left, right, comparator)


def compare_order(
    left: JSONValue,
    right: JSONValue,
    operator: Callable[[int | float | str, int | float | str], bool],
) -> bool:
    """Apply an ordering comparison when both operands are comparable.

    The comparison is performed only for number/number or string/string
    pairs; every other combination returns ``False``.
    """
    both_numbers = isinstance(left, (int, float)) and isinstance(right, (int, float))
    both_strings = isinstance(left, str) and isinstance(right, str)
    if both_numbers or both_strings:
        return bool(operator(left, right))
    return False


def try_parse_int(value: str) -> int | None:
    """Parse *value* as an optionally signed decimal integer, else ``None``."""
    if not value:
        return None
    digits = value[1:] if value[0] in "+-" else value
    # isdecimal() rather than isdigit(): characters such as superscript two
    # pass isdigit() but make int() raise ValueError.
    if not digits.isdecimal():
        return None
    return int(value)


def try_parse_float(value: str) -> float | None:
    """Parse *value* as a finite float literal, else ``None``.

    Returns ``None`` when ``float()`` rejects the text, when the result is
    NaN or infinite (so words like ``nan`` and ``inf`` are treated as
    reference paths rather than numbers), or when the text is integer-valued
    and written without a ``.`` or an exponent.
    """
    try:
        parsed = float(value)
    except ValueError:
        return None
    if not math.isfinite(parsed):
        return None
    if parsed.is_integer() and "." not in value and "e" not in value.lower():
        return None
    return parsed