How Presto schedules tasks

    public final class SystemPartitioningHandle
            implements ConnectorPartitioningHandle
    {
        private enum SystemPartitioning
        {
            SINGLE,
            FIXED,
            SOURCE,
            SCALED,
            COORDINATOR_ONLY,
            ARBITRARY
        }
    }
    select * from
    (select * from test join test1 on test.id = test1.id);
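Non-source fragments

A non-source fragment relies on the SystemPartitioningHandle strategy. The number of nodes the fragment needs is determined jointly by the nodes currently available in the cluster and the hash partition count setting (query.initial-hash-partitions, default 100). NodeSelector is then called to pick the workers. The logic is as follows: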
    public NodePartitionMap getNodePartitionMap(Session session, NodeScheduler nodeScheduler)
    {
        // No connector id: candidates are all active nodes in the cluster
        NodeSelector nodeSelector = nodeScheduler.createNodeSelector(null);
        List<Node> nodes;
        if (partitioning == SystemPartitioning.COORDINATOR_ONLY) {
            nodes = ImmutableList.of(nodeSelector.selectCurrentNode());
        }
        else if (partitioning == SystemPartitioning.SINGLE) {
            nodes = nodeSelector.selectRandomNodes(1);
        }
        else if (partitioning == SystemPartitioning.FIXED) {
            nodes = nodeSelector.selectRandomNodes(getHashPartitionCount(session));
        }
        else {
            throw new IllegalArgumentException("Unsupported plan distribution " + partitioning);
        }

        checkCondition(!nodes.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");

        // Partition i is served by the i-th selected node
        ImmutableMap.Builder<Integer, Node> partitionToNode = ImmutableMap.builder();
        for (int i = 0; i < nodes.size(); i++) {
            Node node = nodes.get(i);
            partitionToNode.put(i, node);
        }
        return new NodePartitionMap(partitionToNode.build(), split -> {
            throw new UnsupportedOperationException("System distribution does not support source splits");
        });
    }
    public List<Node> selectRandomNodes(int limit, Set<Node> excludedNodes)
    {
        return selectNodes(limit, randomizedNodes(nodeMap.get().get(), includeCoordinator, excludedNodes));
    }
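To make the interplay between the hash partition count and the available nodes concrete, here is a minimal, self-contained sketch of the FIXED case. It is not Presto's code: nodes are plain strings, and `FixedPartitionSketch`, its `selectRandomNodes`, and `partitionToNode` are hypothetical names introduced for illustration. It assumes that the selection returns at most `limit` nodes and fewer when the cluster is smaller, which is how the text above describes the node count being determined.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

final class FixedPartitionSketch {
    // Hypothetical stand-in for the selector: pick at most `limit` distinct nodes at random.
    static List<String> selectRandomNodes(List<String> activeNodes, int limit, Random random) {
        List<String> shuffled = new ArrayList<>(activeNodes);
        Collections.shuffle(shuffled, random);
        return new ArrayList<>(shuffled.subList(0, Math.min(limit, shuffled.size())));
    }

    // Builds the partition -> node map: partition i goes to the i-th selected node.
    static Map<Integer, String> partitionToNode(List<String> activeNodes, int hashPartitionCount, Random random) {
        List<String> nodes = selectRandomNodes(activeNodes, hashPartitionCount, random);
        if (nodes.isEmpty()) {
            throw new IllegalStateException("No worker nodes available");
        }
        Map<Integer, String> result = new LinkedHashMap<>();
        for (int i = 0; i < nodes.size(); i++) {
            result.put(i, nodes.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> workers = List.of("worker-1", "worker-2", "worker-3");
        // Partition count larger than the cluster: every worker gets one partition.
        System.out.println(partitionToNode(workers, 100, new Random()));
        // Partition count smaller than the cluster: only two workers are used.
        System.out.println(partitionToNode(workers, 2, new Random()));
    }
}
```

With three workers and a partition count of 100, the sketch yields three partitions. The effective parallelism of the fragment is therefore bounded by the smaller of the two values.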
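Source fragments

A source fragment depends on its connector id. Taking the hive connector as an example, the nodes serving it are registered through the discovery service; the implementation is in DiscoveryNodeManager#refreshNodesInternal. When tasks are actually scheduled and a node has to be assigned to each split, the DynamicSplitPlacementPolicy strategy calls the following interfaces: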
    public static ResettableRandomizedIterator<Node> randomizedNodes(NodeMap nodeMap, boolean includeCoordinator, Set<Node> excludedNodes)
    {
        ImmutableList<Node> nodes = nodeMap.getNodesByHostAndPort().values().stream()
                .filter(node -> includeCoordinator || !nodeMap.getCoordinatorNodeIds().contains(node.getNodeIdentifier()))
                .filter(node -> !excludedNodes.contains(node))
                .collect(toImmutableList());
        return new ResettableRandomizedIterator<>(nodes);
    }
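The two filters in randomizedNodes can be tried in isolation with the sketch below. It is a simplified illustration, not Presto's implementation: nodes are plain strings that double as node identifiers, a shuffled list replaces ResettableRandomizedIterator, and the class and method names (`CandidateFilterSketch`, `randomizedCandidates`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

final class CandidateFilterSketch {
    // Drops coordinators (unless allowed) and excluded nodes, then randomizes the order.
    static List<String> randomizedCandidates(
            List<String> nodes,
            Set<String> coordinatorIds,
            boolean includeCoordinator,
            Set<String> excludedNodes,
            Random random) {
        List<String> candidates = nodes.stream()
                .filter(node -> includeCoordinator || !coordinatorIds.contains(node))
                .filter(node -> !excludedNodes.contains(node))
                .collect(Collectors.toCollection(ArrayList::new));
        Collections.shuffle(candidates, random);
        return candidates;
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("coordinator", "worker-1", "worker-2", "worker-3");
        Set<String> coordinators = Set.of("coordinator");
        Set<String> excluded = Set.of("worker-2");
        // Contains worker-1 and worker-3 in random order.
        System.out.println(randomizedCandidates(nodes, coordinators, false, excluded, new Random()));
    }
}
```

Because the order is random, repeated calls spread load across the remaining candidates instead of always favoring the first nodes in the map.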
    public NodeSelector createNodeSelector(ConnectorId connectorId)
    {
        // this supplier is thread-safe. TODO: this logic should probably move to the scheduler since the choice of which node to run in should be
        // done as close to when the split is about to be scheduled
        Supplier<NodeMap> nodeMap = Suppliers.memoizeWithExpiration(() -> {
            ImmutableSetMultimap.Builder<HostAddress, Node> byHostAndPort = ImmutableSetMultimap.builder();
            ImmutableSetMultimap.Builder<InetAddress, Node> byHost = ImmutableSetMultimap.builder();
            ImmutableSetMultimap.Builder<NetworkLocation, Node> workersByNetworkPath = ImmutableSetMultimap.builder();

            Set<Node> nodes;
            if (connectorId != null) {
                // Source fragment: only nodes that serve this connector
                nodes = nodeManager.getActiveConnectorNodes(connectorId);
            }
            else {
                // Non-source fragment: every active node
                nodes = nodeManager.getNodes(ACTIVE);
            }

            Set<String> coordinatorNodeIds = nodeManager.getCoordinators().stream()
                    .map(Node::getNodeIdentifier)
                    .collect(toImmutableSet());

            for (Node node : nodes) {
                // (rest omitted)
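The difference between the two kinds of fragment comes down to which candidate set the selector starts from. The following sketch illustrates only that branch, under simplifying assumptions: nodes and connector ids are plain strings, and `ConnectorNodeSketch`, `candidateNodes`, and the `nodesByConnector` map are hypothetical stand-ins for what the node manager tracks.

```java
import java.util.Map;
import java.util.Set;

final class ConnectorNodeSketch {
    // Returns the nodes a selector may choose from.
    // A null connector id (non-source fragment) means all active nodes;
    // otherwise only the nodes registered for that connector are eligible.
    static Set<String> candidateNodes(
            String connectorId,
            Map<String, Set<String>> nodesByConnector,
            Set<String> allActiveNodes) {
        if (connectorId != null) {
            return nodesByConnector.getOrDefault(connectorId, Set.of());
        }
        return allActiveNodes;
    }

    public static void main(String[] args) {
        Set<String> active = Set.of("worker-1", "worker-2", "worker-3");
        Map<String, Set<String>> byConnector = Map.of("hive", Set.of("worker-1", "worker-2"));
        System.out.println(candidateNodes("hive", byConnector, active)); // hive nodes only
        System.out.println(candidateNodes(null, byConnector, active));   // all active nodes
    }
}
```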