| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- from core_domain import InitialNodeContract, WorkflowVersionContract
- from core_shared import JSONValue
def derive_initial_node(workflow_version: WorkflowVersionContract) -> InitialNodeContract | None:
    """Pick the node a workflow run should start from.

    The DSL is read from ``workflow_version.dsl_json`` and is expected to be
    a dict with a ``"nodes"`` list and, optionally, an ``"edges"`` list.
    Only dict entries of ``"nodes"`` whose ``"id"`` and ``"type"`` are both
    strings are considered well-formed; everything else is skipped.

    Selection order:

    1. The first well-formed node whose id is not the ``"target"`` of any
       edge (as reported by ``_collect_incoming_targets``).
    2. Otherwise (every well-formed node has an incoming edge, e.g. the
       graph is a cycle), the first well-formed node in declaration order.

    Args:
        workflow_version: Object exposing the workflow DSL as ``dsl_json``.

    Returns:
        An ``InitialNodeContract`` with ``status="queued"`` for the chosen
        node, or ``None`` when the DSL is not a dict, ``"nodes"`` is not a
        list, or no well-formed node exists.
    """
    dsl = workflow_version.dsl_json
    if not isinstance(dsl, dict):
        return None

    nodes_value = dsl.get("nodes")
    if not isinstance(nodes_value, list):
        return None

    nodes: list[dict[str, JSONValue]] = [
        item for item in nodes_value if isinstance(item, dict)
    ]
    if not nodes:
        return None

    incoming_targets = _collect_incoming_targets(dsl.get("edges"))

    # Remember the first well-formed node so the fallback does not depend on
    # nodes[0] specifically being valid; malformed entries are skipped here
    # exactly as they are in the primary scan.
    fallback: tuple[str, str] | None = None
    for node in nodes:
        node_id = node.get("id")
        node_type = node.get("type")
        if not (isinstance(node_id, str) and isinstance(node_type, str)):
            continue
        if node_id not in incoming_targets:
            return InitialNodeContract(node_id=node_id, node_type=node_type, status="queued")
        if fallback is None:
            fallback = (node_id, node_type)

    if fallback is None:
        return None

    fallback_id, fallback_type = fallback
    return InitialNodeContract(node_id=fallback_id, node_type=fallback_type, status="queued")
def derive_successor_nodes(
    workflow_version: WorkflowVersionContract,
    current_node_id: str,
) -> list[InitialNodeContract]:
    """Build queued contracts for the nodes that follow ``current_node_id``.

    The DSL is read from ``workflow_version.dsl_json``. Successor ids come
    from ``_collect_successor_ids`` applied to the ``"edges"`` list, and
    node types from ``_build_node_type_map`` applied to the ``"nodes"``
    list. Successor ids that have no entry in the type map are dropped.

    Args:
        workflow_version: Object exposing the workflow DSL as ``dsl_json``.
        current_node_id: Id of the node whose outgoing edges are followed.

    Returns:
        One ``InitialNodeContract`` with ``status="queued"`` per successor
        id that has a known type, in the order the ids were collected.
        An empty list when the DSL is not a dict or when either ``"nodes"``
        or ``"edges"`` is not a list.
    """
    dsl = workflow_version.dsl_json
    if not isinstance(dsl, dict):
        return []

    raw_nodes = dsl.get("nodes")
    raw_edges = dsl.get("edges")
    if not (isinstance(raw_nodes, list) and isinstance(raw_edges, list)):
        return []

    types_by_id = _build_node_type_map(raw_nodes)
    return [
        InitialNodeContract(
            node_id=target_id,
            node_type=types_by_id[target_id],
            status="queued",
        )
        for target_id in _collect_successor_ids(raw_edges, current_node_id)
        # Edges may point at ids that are absent from (or malformed in) the
        # node list; those targets are silently skipped.
        if target_id in types_by_id
    ]
- def _collect_incoming_targets(edges_value: JSONValue | None) -> set[str]:
- if not isinstance(edges_value, list):
- return set()
- incoming_targets: set[str] = set()
- for item in edges_value:
- if not isinstance(item, dict):
- continue
- target = item.get("target")
- if isinstance(target, str):
- incoming_targets.add(target)
- return incoming_targets
- def _build_node_type_map(nodes_value: list[JSONValue]) -> dict[str, str]:
- node_type_map: dict[str, str] = {}
- for item in nodes_value:
- if not isinstance(item, dict):
- continue
- node_id = item.get("id")
- node_type = item.get("type")
- if isinstance(node_id, str) and isinstance(node_type, str):
- node_type_map[node_id] = node_type
- return node_type_map
- def _collect_successor_ids(edges_value: list[JSONValue], current_node_id: str) -> list[str]:
- successor_ids: list[str] = []
- for item in edges_value:
- if not isinstance(item, dict):
- continue
- source = item.get("source")
- target = item.get("target")
- if isinstance(source, str) and isinstance(target, str) and source == current_node_id:
- successor_ids.append(target)
- return successor_ids
|