How Presto schedules tasks

    public final class SystemPartitioningHandle
            implements ConnectorPartitioningHandle
    {
        private enum SystemPartitioning
        {
            SINGLE,
            FIXED,
            SOURCE,
            SCALED,
            COORDINATOR_ONLY,
            ARBITRARY
        }
    }
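The common cases are SINGLE, FIXED and SOURCE. SINGLE is used for the final aggregation and output of the result, FIXED for intermediate computation such as JOIN, and SOURCE for stages that read from the source data.
Take the following SQL as an example: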
    select * from
    (select * from test join test1 on test.id = test1.id);
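Its execution plan is:
As the figure above shows, the left and right stages are both scheduled as SOURCE; the middle stage, which joins the two tables, is FIXED; and the output stage at the top is SINGLE and produces the final result.
Whatever the type, scheduling a task means choosing a Worker. So how is the Worker chosen?
Following the scheduling types described above, fragments fall into two groups: non-source fragments and source fragments.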
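Non-source fragments
A non-source fragment relies on the SystemPartitioningHandle strategy. The number of nodes the fragment needs is determined jointly by the nodes currently available in the cluster and the hash partition count setting (query.initial-hash-partitions, default 100); NodeSelector is then called to pick the Workers. The logic is as follows: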
    public NodePartitionMap getNodePartitionMap(Session session, NodeScheduler nodeScheduler)
    {
        NodeSelector nodeSelector = nodeScheduler.createNodeSelector(null);
        List<Node> nodes;
        if (partitioning == SystemPartitioning.COORDINATOR_ONLY) {
            nodes = ImmutableList.of(nodeSelector.selectCurrentNode());
        }
        else if (partitioning == SystemPartitioning.SINGLE) {
            nodes = nodeSelector.selectRandomNodes(1);
        }
        else if (partitioning == SystemPartitioning.FIXED) {
            nodes = nodeSelector.selectRandomNodes(getHashPartitionCount(session));
        }
        else {
            throw new IllegalArgumentException("Unsupported plan distribution " + partitioning);
        }

        checkCondition(!nodes.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");

        // Partition i is served by the i-th selected node.
        ImmutableMap.Builder<Integer, Node> partitionToNode = ImmutableMap.builder();
        for (int i = 0; i < nodes.size(); i++) {
            Node node = nodes.get(i);
            partitionToNode.put(i, node);
        }
        return new NodePartitionMap(partitionToNode.build(), split -> {
            throw new UnsupportedOperationException("System distribution does not support source splits");
        });
    }
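From the code above, SINGLE and FIXED request a different number of nodes depending on the partitioning type, but both end up calling the same function, whose logic is: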
    public List<Node> selectRandomNodes(int limit, Set<Node> excludedNodes)
    {
        return selectNodes(limit, randomizedNodes(nodeMap.get().get(), includeCoordinator, excludedNodes));
    }
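The combined effect of the hash partition count and the number of available nodes can be illustrated with a minimal sketch. It is not Presto's code: nodes are plain strings, and the class and method names (RandomNodeSelectionSketch, partitionToNode) are hypothetical. It assumes that selection shuffles the candidates and takes at most `limit` of them, which matches the description above that both the configured count and the available nodes bound the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical stand-in for the node-selection step; nodes are plain strings here.
class RandomNodeSelectionSketch {
    // Shuffle the candidates and keep at most `limit` of them
    // (assumption: fewer nodes than `limit` simply yields all of them).
    static List<String> selectRandomNodes(List<String> activeWorkers, int limit, Random random) {
        List<String> shuffled = new ArrayList<>(activeWorkers);
        Collections.shuffle(shuffled, random);
        return new ArrayList<>(shuffled.subList(0, Math.min(limit, shuffled.size())));
    }

    // Assign partition i to the i-th selected node, mirroring the loop in getNodePartitionMap.
    static Map<Integer, String> partitionToNode(List<String> nodes) {
        Map<Integer, String> result = new LinkedHashMap<>();
        for (int i = 0; i < nodes.size(); i++) {
            result.put(i, nodes.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> workers = Arrays.asList("worker-1", "worker-2", "worker-3", "worker-4", "worker-5");
        Random random = new Random();
        // FIXED: asks for 100 nodes (the hash partition count) but only 5 exist, so 5 partitions result.
        System.out.println(partitionToNode(selectRandomNodes(workers, 100, random)));
        // SINGLE: asks for exactly one node.
        System.out.println(partitionToNode(selectRandomNodes(workers, 1, random)));
    }
}
```

With five workers and a hash partition count of 100, the FIXED case ends up with five partitions, one per worker, while the SINGLE case always ends up with one.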
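Source fragments
A source fragment depends on its connector id. Taking the Hive connector as an example, the nodes that serve it are registered through the discovery service; the implementation is in DiscoveryNodeManager#refreshNodesInternal. When tasks are actually scheduled and splits are assigned to nodes, the DynamicSplitPlacementPolicy strategy calls the following interface: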
    public static ResettableRandomizedIterator<Node> randomizedNodes(NodeMap nodeMap, boolean includeCoordinator, Set<Node> excludedNodes)
    {
        ImmutableList<Node> nodes = nodeMap.getNodesByHostAndPort().values().stream()
                .filter(node -> includeCoordinator || !nodeMap.getCoordinatorNodeIds().contains(node.getNodeIdentifier()))
                .filter(node -> !excludedNodes.contains(node))
                .collect(toImmutableList());
        return new ResettableRandomizedIterator<>(nodes);
    }
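The two filters in randomizedNodes can be tried in isolation with a small sketch. The class CandidateFilterSketch and its candidates method are hypothetical names, and nodes are reduced to string identifiers; only the filtering rules are taken from the code above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified version of the candidate filtering; nodes are string identifiers.
class CandidateFilterSketch {
    static List<String> candidates(List<String> allNodes, Set<String> coordinatorIds,
                                   boolean includeCoordinator, Set<String> excludedNodes) {
        return allNodes.stream()
                // Drop coordinators unless they are explicitly allowed to run tasks.
                .filter(node -> includeCoordinator || !coordinatorIds.contains(node))
                // Drop nodes the caller asked to avoid.
                .filter(node -> !excludedNodes.contains(node))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("coordinator", "worker-1", "worker-2");
        Set<String> coordinators = new HashSet<>(Collections.singletonList("coordinator"));
        Set<String> excluded = new HashSet<>(Collections.singletonList("worker-2"));
        // Coordinator not allowed and worker-2 excluded: only worker-1 remains.
        System.out.println(candidates(all, coordinators, false, excluded));
        // Coordinator allowed and nothing excluded: all three nodes remain.
        System.out.println(candidates(all, coordinators, true, Collections.<String>emptySet()));
    }
}
```

The real method additionally wraps the filtered list in a ResettableRandomizedIterator so that callers draw nodes in random order; that part is left out here.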
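Whichever selection path is taken, it ends up at NodeMap. So where does the NodeMap come from? As the earlier code shows, it is built when nodeScheduler.createNodeSelector creates the selector. The code is as follows: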
    public NodeSelector createNodeSelector(ConnectorId connectorId)
    {
        // this supplier is thread-safe. TODO: this logic should probably move to the scheduler since the choice of which node to run in should be
        // done as close to when the split is about to be scheduled
        Supplier<NodeMap> nodeMap = Suppliers.memoizeWithExpiration(() -> {
            ImmutableSetMultimap.Builder<HostAddress, Node> byHostAndPort = ImmutableSetMultimap.builder();
            ImmutableSetMultimap.Builder<InetAddress, Node> byHost = ImmutableSetMultimap.builder();
            ImmutableSetMultimap.Builder<NetworkLocation, Node> workersByNetworkPath = ImmutableSetMultimap.builder();

            Set<Node> nodes;
            if (connectorId != null) {
                nodes = nodeManager.getActiveConnectorNodes(connectorId);
            }
            else {
                nodes = nodeManager.getNodes(ACTIVE);
            }

            Set<String> coordinatorNodeIds = nodeManager.getCoordinators().stream()
                    .map(Node::getNodeIdentifier)
                    .collect(toImmutableSet());

            for (Node node : nodes) {
                // omitted
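This is where the trail ends. Presto's machines are managed by Discovery: the nodeManager above is a DiscoveryNodeManager, which wraps the Discovery service interface, and Discovery keeps track of which Workers are alive.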
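The NodeMap in createNodeSelector is wrapped in Suppliers.memoizeWithExpiration, so the node list is a cached snapshot that is rebuilt from the node manager only after it expires. The sketch below imitates that behaviour with the standard library alone. ExpiringSupplierSketch is a hypothetical class, the clock is injected so the example is deterministic, and the time-to-live is arbitrary because the expiry argument is not shown in the excerpt above.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical stand-in for a memoizing supplier with expiration.
class ExpiringSupplierSketch<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private final long ttl;
    private final LongSupplier clock; // injected so the demo does not depend on wall-clock time
    private T value;
    private long expiresAt;
    private boolean loaded;

    ExpiringSupplierSketch(Supplier<T> delegate, long ttl, LongSupplier clock) {
        this.delegate = delegate;
        this.ttl = ttl;
        this.clock = clock;
    }

    @Override
    public synchronized T get() {
        long now = clock.getAsLong();
        // Reload on first use or once the cached value has expired.
        if (!loaded || now - expiresAt >= 0) {
            value = delegate.get();
            expiresAt = now + ttl;
            loaded = true;
        }
        return value;
    }

    public static void main(String[] args) {
        int[] loads = {0};
        long[] fakeTime = {0};
        ExpiringSupplierSketch<String> nodeMap = new ExpiringSupplierSketch<>(
                () -> "snapshot-" + (++loads[0]), 5L, () -> fakeTime[0]);
        System.out.println(nodeMap.get()); // first call builds the snapshot
        System.out.println(nodeMap.get()); // still within the TTL: cached snapshot is reused
        fakeTime[0] = 5;
        System.out.println(nodeMap.get()); // TTL elapsed: snapshot is rebuilt
    }
}
```

One consequence of this design is that a Worker that joins or dies may not be reflected in scheduling decisions until the cached snapshot expires.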
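Once the Workers have been chosen, each stage uses its own scheduler to start dispatching tasks and splits. The scheduler again depends on the SystemPartitioningHandle: source stages use SourcePartitionedScheduler or FixedSourcePartitionedScheduler, while non-source stages use FixedCountScheduler. How many tasks a stage dispatches to the Workers, and how splits are handed out, will be covered in a later post.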