from core_domain import InitialNodeContract, WorkflowVersionContract from core_shared import JSONValue def derive_initial_node(workflow_version: WorkflowVersionContract) -> InitialNodeContract | None: dsl = workflow_version.dsl_json if not isinstance(dsl, dict): return None nodes_value = dsl.get("nodes") if not isinstance(nodes_value, list): return None nodes: list[dict[str, JSONValue]] = [ item for item in nodes_value if isinstance(item, dict) ] if not nodes: return None edges_value = dsl.get("edges") incoming_targets = _collect_incoming_targets(edges_value) for node in nodes: node_id = node.get("id") node_type = node.get("type") if isinstance(node_id, str) and isinstance(node_type, str) and node_id not in incoming_targets: return InitialNodeContract(node_id=node_id, node_type=node_type, status="queued") first = nodes[0] first_id = first.get("id") first_type = first.get("type") if isinstance(first_id, str) and isinstance(first_type, str): return InitialNodeContract(node_id=first_id, node_type=first_type, status="queued") return None def derive_successor_nodes( workflow_version: WorkflowVersionContract, current_node_id: str, ) -> list[InitialNodeContract]: dsl = workflow_version.dsl_json if not isinstance(dsl, dict): return [] nodes_value = dsl.get("nodes") edges_value = dsl.get("edges") if not isinstance(nodes_value, list) or not isinstance(edges_value, list): return [] node_type_map = _build_node_type_map(nodes_value) successor_ids = _collect_successor_ids(edges_value, current_node_id) successors: list[InitialNodeContract] = [] for successor_id in successor_ids: node_type = node_type_map.get(successor_id) if node_type is None: continue successors.append( InitialNodeContract( node_id=successor_id, node_type=node_type, status="queued", ) ) return successors def _collect_incoming_targets(edges_value: JSONValue | None) -> set[str]: if not isinstance(edges_value, list): return set() incoming_targets: set[str] = set() for item in edges_value: if not isinstance(item, dict): continue target = item.get("target") if isinstance(target, str): incoming_targets.add(target) return incoming_targets def _build_node_type_map(nodes_value: list[JSONValue]) -> dict[str, str]: node_type_map: dict[str, str] = {} for item in nodes_value: if not isinstance(item, dict): continue node_id = item.get("id") node_type = item.get("type") if isinstance(node_id, str) and isinstance(node_type, str): node_type_map[node_id] = node_type return node_type_map def _collect_successor_ids(edges_value: list[JSONValue], current_node_id: str) -> list[str]: successor_ids: list[str] = [] for item in edges_value: if not isinstance(item, dict): continue source = item.get("source") target = item.get("target") if isinstance(source, str) and isinstance(target, str) and source == current_node_id: successor_ids.append(target) return successor_ids