from core_domain import InitialNodeContract, WorkflowVersionContract
from core_dsl import (
    EdgeDefinition,
    get_initial_node_definition,
    get_node_definition,
    parse_workflow_definition,
)
from core_shared import JSONValue

from .context import build_template_context, evaluate_condition_expression


def derive_initial_node(
    workflow_version: WorkflowVersionContract,
) -> InitialNodeContract | None:
    """Build the queued contract for the workflow's initial node.

    Returns ``None`` when the DSL payload does not parse into a workflow or
    when the parsed workflow yields no initial node definition.
    """
    parsed = parse_workflow_definition(workflow_version.dsl_json)
    entry = get_initial_node_definition(parsed) if parsed is not None else None
    if entry is None:
        return None
    return InitialNodeContract(node_id=entry.id, node_type=entry.type, status="queued")


def derive_successor_nodes(
    workflow_version: WorkflowVersionContract,
    current_node_id: str,
    current_output_json: dict[str, JSONValue] | None = None,
    run_state_json: dict[str, JSONValue] | None = None,
    node_output_json_by_node_id: dict[str, dict[str, JSONValue]] | None = None,
    node_output_text_by_node_id: dict[str, str] | None = None,
) -> list[InitialNodeContract]:
    """Return queued contracts for every node reachable from ``current_node_id``.

    An outgoing edge is followed when its condition matches the edge context,
    which is the template context extended with the current node's ``output``
    plus the ``route`` / ``condition_result`` values read from that output.
    Edges whose target id is not a node of the workflow are skipped. One
    contract is produced per matching edge, in edge order.

    Returns an empty list when the DSL payload does not parse.
    """
    parsed = parse_workflow_definition(workflow_version.dsl_json)
    if parsed is None:
        return []

    nodes_by_id = {node.id: node for node in parsed.nodes}
    output = current_output_json or {}

    # An id that is not part of the workflow is reported as type "unknown".
    if current_node_id in nodes_by_id:
        current_type = nodes_by_id[current_node_id].type
    else:
        current_type = "unknown"

    template_context = build_template_context(
        node_id=current_node_id,
        node_type=current_type,
        run_state_json=run_state_json or {},
        node_output_json_by_node_id=node_output_json_by_node_id or {},
        node_output_text_by_node_id=node_output_text_by_node_id or {},
    )

    # Output-derived keys are written last so they override template keys
    # of the same name.
    edge_context: dict[str, JSONValue] = {**template_context}
    edge_context["output"] = output
    edge_context["route"] = _read_string_value(output, "route")
    edge_context["condition_result"] = _read_bool_value(output, "condition_result")

    followed_edges = _get_matching_edges(
        parsed.edges,
        current_node_id=current_node_id,
        edge_context=edge_context,
    )
    targets = (nodes_by_id.get(edge.target) for edge in followed_edges)
    return [
        InitialNodeContract(node_id=target.id, node_type=target.type, status="queued")
        for target in targets
        if target is not None
    ]


def derive_node_config(
    workflow_version: WorkflowVersionContract,
    node_id: str,
) -> dict[str, JSONValue]:
    """Return a shallow copy of the config of node ``node_id``.

    Yields an empty dict when the DSL payload does not parse or when no
    definition is found for ``node_id``.
    """
    parsed = parse_workflow_definition(workflow_version.dsl_json)
    if parsed is None:
        return {}
    definition = get_node_definition(parsed, node_id)
    return {} if definition is None else dict(definition.config)


def _get_matching_edges(
    edges: list[EdgeDefinition],
    *,
    current_node_id: str,
    edge_context: dict[str, JSONValue],
) -> list[EdgeDefinition]:
    """Select, in order, the edges leaving ``current_node_id`` whose condition matches."""
    # The source check comes first so conditions are only evaluated for
    # edges that actually leave the current node.
    return [
        edge
        for edge in edges
        if edge.source == current_node_id
        and _matches_edge_condition(edge.condition, edge_context)
    ]


def _matches_edge_condition(
    condition: str | None,
    context: dict[str, JSONValue],
) -> bool:
    """Decide whether an edge condition holds for the given context.

    Checks are applied in this order:

    1. A missing or blank condition always matches.
    2. A condition equal to the context's string ``route`` matches.
    3. A literal ``true`` / ``false`` (any case) is compared against the
       context's boolean ``condition_result`` when one is present.
    4. Anything else is delegated to ``evaluate_condition_expression``.
    """
    if condition is None:
        return True
    expression = condition.strip()
    if not expression:
        return True

    route = context.get("route")
    if isinstance(route, str) and expression == route:
        return True

    flag = context.get("condition_result")
    lowered = expression.lower()
    if isinstance(flag, bool) and lowered in {"true", "false"}:
        expected = lowered == "true"
        return flag == expected

    return evaluate_condition_expression(expression, context)


def _read_string_value(payload: dict[str, JSONValue], key: str) -> str | None:
    """Return ``payload[key]`` when it is a string, otherwise ``None``."""
    candidate = payload.get(key)
    return candidate if isinstance(candidate, str) else None


def _read_bool_value(payload: dict[str, JSONValue], key: str) -> bool | None:
    """Return ``payload[key]`` when it is a bool, otherwise ``None``."""
    candidate = payload.get(key)
    return candidate if isinstance(candidate, bool) else None