| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- from core_domain import InitialNodeContract, WorkflowVersionContract
- from core_dsl import (
- EdgeDefinition,
- get_initial_node_definition,
- get_node_definition,
- parse_workflow_definition,
- )
- from core_shared import JSONValue
- from .context import build_template_context, evaluate_condition_expression
def derive_initial_node(workflow_version: WorkflowVersionContract) -> InitialNodeContract | None:
    """Build the queued contract for the workflow's initial node.

    Returns ``None`` when ``parse_workflow_definition`` yields no workflow
    for ``workflow_version.dsl_json``, or when
    ``get_initial_node_definition`` finds no initial node in it.
    """
    parsed = parse_workflow_definition(workflow_version.dsl_json)
    # Only look for an entry node when there is a workflow to inspect.
    entry = get_initial_node_definition(parsed) if parsed is not None else None
    if entry is None:
        return None
    return InitialNodeContract(node_id=entry.id, node_type=entry.type, status="queued")
def derive_successor_nodes(
    workflow_version: WorkflowVersionContract,
    current_node_id: str,
    current_output_json: dict[str, JSONValue] | None = None,
    run_state_json: dict[str, JSONValue] | None = None,
    node_output_json_by_node_id: dict[str, dict[str, JSONValue]] | None = None,
    node_output_text_by_node_id: dict[str, str] | None = None,
) -> list[InitialNodeContract]:
    """Return queued contracts for the nodes reachable from ``current_node_id``.

    The outgoing edges of ``current_node_id`` are filtered by their
    conditions. Conditions are evaluated against the template context,
    extended with three keys taken from the current node's output:
    ``"output"``, ``"route"`` and ``"condition_result"``.

    Args:
        workflow_version: Version whose ``dsl_json`` describes the workflow.
        current_node_id: Id of the node whose successors are wanted.
        current_output_json: Output of the current node; a falsy value is
            treated as an empty dict.
        run_state_json: Run state passed to the template context; a falsy
            value is treated as an empty dict.
        node_output_json_by_node_id: Per-node JSON outputs passed to the
            template context; a falsy value is treated as an empty dict.
        node_output_text_by_node_id: Per-node text outputs passed to the
            template context; a falsy value is treated as an empty dict.

    Returns:
        One ``InitialNodeContract`` with status ``"queued"`` per matching
        edge, in edge order. Edges whose target is not a node of the
        workflow are skipped. The list is empty when
        ``parse_workflow_definition`` yields no workflow.
    """
    parsed = parse_workflow_definition(workflow_version.dsl_json)
    if parsed is None:
        return []

    nodes_by_id = {definition.id: definition for definition in parsed.nodes}

    # A current node missing from the workflow is reported as type "unknown".
    current = nodes_by_id.get(current_node_id)
    current_type = "unknown" if current is None else current.type

    template_context = build_template_context(
        node_id=current_node_id,
        node_type=current_type,
        run_state_json=run_state_json or {},
        node_output_json_by_node_id=node_output_json_by_node_id or {},
        node_output_text_by_node_id=node_output_text_by_node_id or {},
    )

    output_payload = current_output_json or {}
    # The three explicit keys override same-named template-context entries.
    edge_context: dict[str, JSONValue] = {
        **template_context,
        "output": output_payload,
        "route": _read_string_value(output_payload, "route"),
        "condition_result": _read_bool_value(output_payload, "condition_result"),
    }

    matching = _get_matching_edges(
        parsed.edges,
        current_node_id=current_node_id,
        edge_context=edge_context,
    )
    return [
        InitialNodeContract(node_id=target.id, node_type=target.type, status="queued")
        for edge in matching
        if (target := nodes_by_id.get(edge.target)) is not None
    ]
def derive_node_config(
    workflow_version: WorkflowVersionContract,
    node_id: str,
) -> dict[str, JSONValue]:
    """Return a shallow copy of the config of node ``node_id``.

    Returns an empty dict when ``parse_workflow_definition`` yields no
    workflow for ``workflow_version.dsl_json``, or when
    ``get_node_definition`` finds no node with that id.
    """
    parsed = parse_workflow_definition(workflow_version.dsl_json)
    definition = None if parsed is None else get_node_definition(parsed, node_id)
    # dict(...) copies, so callers never hold the definition's own mapping.
    return {} if definition is None else dict(definition.config)
def _get_matching_edges(
    edges: list[EdgeDefinition],
    *,
    current_node_id: str,
    edge_context: dict[str, JSONValue],
) -> list[EdgeDefinition]:
    """Select the edges leaving ``current_node_id`` whose condition matches.

    Input order is preserved. An edge's condition is only evaluated when
    its ``source`` equals ``current_node_id``.
    """
    return [
        candidate
        for candidate in edges
        # `and` short-circuits, so edges from other nodes are never evaluated.
        if candidate.source == current_node_id
        and _matches_edge_condition(candidate.condition, edge_context)
    ]
- def _matches_edge_condition(
- condition: str | None,
- context: dict[str, JSONValue]) -> bool:
- if condition is None or not condition.strip():
- return True
- stripped = condition.strip()
- route = context.get("route")
- if isinstance(route, str) and stripped == route:
- return True
- condition_result = context.get("condition_result")
- if isinstance(condition_result, bool) and stripped.lower() in {"true", "false"}:
- return condition_result is (stripped.lower() == "true")
- return evaluate_condition_expression(stripped, context)
- def _read_string_value(payload: dict[str, JSONValue], key: str) -> str | None:
- value = payload.get(key)
- if isinstance(value, str):
- return value
- return None
- def _read_bool_value(payload: dict[str, JSONValue], key: str) -> bool | None:
- value = payload.get(key)
- if isinstance(value, bool):
- return value
- return None
|