How Presto Chooses Workers When Scheduling Tasks

Presto's task scheduling modes are defined by the SystemPartitioning enum inside SystemPartitioningHandle:

```java
public final class SystemPartitioningHandle
        implements ConnectorPartitioningHandle
{
    private enum SystemPartitioning
    {
        SINGLE,
        FIXED,
        SOURCE,
        SCALED,
        COORDINATOR_ONLY,
        ARBITRARY
    }
}
```

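The most common types are SINGLE, FIXED and SOURCE. SINGLE is used for the final gathering and output of results, FIXED is used for intermediate computation such as JOINs, and SOURCE is used for stages that read the source data.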

Take the following SQL as an example:

```sql
select * from (select * from 1test join 2test1 on 1test.id = 2test1.123id);
```

Its execution plan is as follows:

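As shown in the figure above, the two stages on the left and right, which read the two tables, are both scheduled as SOURCE. The stage in the middle joins the two tables and is FIXED. The output stage at the top is SINGLE and performs the final gathering and output.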

Whatever the type, scheduling a task requires choosing a Worker. So how is that Worker chosen?

Based on the scheduling modes listed above, fragments fall into two groups: non-source fragments and source fragments.
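To make the split concrete, here is a small hypothetical model of the example query's four fragments, each tagged with the partitioning type described above. The class and fragment names are illustrative only, and the sketch simply assumes a fragment is a source fragment exactly when its partitioning is SOURCE.

```java
import java.util.List;

// Hypothetical model of the example query's plan fragments. Only the
// partitioning types come from the article; the names are illustrative.
class FragmentKinds {
    enum SystemPartitioning { SINGLE, FIXED, SOURCE, SCALED, COORDINATOR_ONLY, ARBITRARY }

    static final class Fragment {
        final String name;
        final SystemPartitioning partitioning;

        Fragment(String name, SystemPartitioning partitioning) {
            this.name = name;
            this.partitioning = partitioning;
        }
    }

    // In this sketch, a fragment is a source fragment exactly when it is
    // SOURCE-partitioned, i.e. it reads table data directly.
    static boolean isSourceFragment(Fragment fragment) {
        return fragment.partitioning == SystemPartitioning.SOURCE;
    }

    // Top-down: output (SINGLE), join (FIXED), then the two table scans (SOURCE).
    static List<Fragment> examplePlan() {
        return List.of(
                new Fragment("output", SystemPartitioning.SINGLE),
                new Fragment("join", SystemPartitioning.FIXED),
                new Fragment("scan 1test", SystemPartitioning.SOURCE),
                new Fragment("scan 2test1", SystemPartitioning.SOURCE));
    }

    public static void main(String[] args) {
        for (Fragment fragment : examplePlan()) {
            String kind = isSourceFragment(fragment) ? "source" : "non-source";
            System.out.println(fragment.name + " (" + fragment.partitioning + "): " + kind);
        }
    }
}
```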

Non-source fragments

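These rely on the SystemPartitioningHandle strategy: the number of nodes a fragment needs is determined by the nodes currently available in the cluster together with the hash partition count setting (query.initial-hash-partitions, default 100), so a FIXED fragment asks for that many nodes but cannot get more than the cluster has available. NodeSelector then picks the Workers. The logic is as follows: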

```java
public NodePartitionMap getNodePartitionMap(Session session, NodeScheduler nodeScheduler)
{
    // No connector ID: candidates are all active nodes, not only those serving a connector
    NodeSelector nodeSelector = nodeScheduler.createNodeSelector(null);
    List<Node> nodes;
    if (partitioning == SystemPartitioning.COORDINATOR_ONLY) {
        nodes = ImmutableList.of(nodeSelector.selectCurrentNode());
    }
    else if (partitioning == SystemPartitioning.SINGLE) {
        nodes = nodeSelector.selectRandomNodes(1);
    }
    else if (partitioning == SystemPartitioning.FIXED) {
        nodes = nodeSelector.selectRandomNodes(getHashPartitionCount(session));
    }
    else {
        throw new IllegalArgumentException("Unsupported plan distribution " + partitioning);
    }

    checkCondition(!nodes.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");

    // Partition i is assigned to the i-th selected node
    ImmutableMap.Builder<Integer, Node> partitionToNode = ImmutableMap.builder();
    for (int i = 0; i < nodes.size(); i++) {
        Node node = nodes.get(i);
        partitionToNode.put(i, node);
    }
    return new NodePartitionMap(partitionToNode.build(), split -> {
        throw new UnsupportedOperationException("System distribution does not support source splits");
    });
}
```
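Stripped of Presto's types, the method above does two things: it picks a list of nodes according to the partitioning, then numbers them so that partition i runs on the i-th node. The sketch below replays that with plain strings as nodes. The method signature, the string node names and the injected Random are hypothetical simplifications, and random selection is modeled as shuffle-then-truncate, assuming selection stops once the candidates run out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Simplified, hypothetical sketch of getNodePartitionMap using strings as nodes.
class PartitionMapSketch {
    enum SystemPartitioning { SINGLE, FIXED, SOURCE, COORDINATOR_ONLY }

    static Map<Integer, String> nodePartitionMap(SystemPartitioning partitioning,
                                                 String currentNode,
                                                 List<String> activeWorkers,
                                                 int hashPartitionCount,
                                                 Random random) {
        List<String> nodes;
        switch (partitioning) {
            case COORDINATOR_ONLY:
                nodes = List.of(currentNode);
                break;
            case SINGLE:
                nodes = randomNodes(activeWorkers, 1, random);
                break;
            case FIXED:
                nodes = randomNodes(activeWorkers, hashPartitionCount, random);
                break;
            default:
                throw new IllegalArgumentException("Unsupported plan distribution " + partitioning);
        }
        if (nodes.isEmpty()) {
            throw new IllegalStateException("No worker nodes available");
        }
        // Partition i is assigned to the i-th selected node.
        Map<Integer, String> partitionToNode = new LinkedHashMap<>();
        for (int i = 0; i < nodes.size(); i++) {
            partitionToNode.put(i, nodes.get(i));
        }
        return partitionToNode;
    }

    // Shuffle a copy and keep at most `limit` nodes, so the result size is
    // min(limit, number of available workers).
    static List<String> randomNodes(List<String> workers, int limit, Random random) {
        List<String> shuffled = new ArrayList<>(workers);
        Collections.shuffle(shuffled, random);
        return shuffled.subList(0, Math.min(limit, shuffled.size()));
    }

    public static void main(String[] args) {
        List<String> workers = List.of("w1", "w2", "w3");
        // FIXED with hash partition count 100 on a 3-worker cluster: prints 3
        System.out.println(nodePartitionMap(SystemPartitioning.FIXED, "coordinator", workers, 100, new Random(42)).size());
    }
}
```

The FIXED case shows why the node count is "jointly determined": the hash partition count is only an upper bound, and a small cluster caps it.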

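As the code shows, SINGLE and FIXED both pick nodes at random and differ only in how many they request (1 for SINGLE, the hash partition count for FIXED). Both end up in the same selection function, whose logic is: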

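```java
public List<Node> selectRandomNodes(int limit, Set<Node> excludedNodes)
{
    // Draw nodes at random from the cached NodeMap, honoring includeCoordinator and excludedNodes
    return selectNodes(limit, randomizedNodes(nodeMap.get().get(), includeCoordinator, excludedNodes));
}
```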
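The call above composes two steps: randomizedNodes produces a randomized candidate iterator, and selectNodes takes nodes from it. The body of selectNodes is not shown here, so the following is a sketch assuming it simply drains the iterator until it has limit nodes or runs out; the name takeUpTo is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Sketch of the "take up to limit nodes from a randomized candidate iterator"
// step. takeUpTo is a hypothetical stand-in for selectNodes, assuming it just
// drains the iterator until it has enough nodes or runs out of candidates.
class SelectNodesSketch {
    static <T> List<T> takeUpTo(int limit, Iterator<T> candidates) {
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be at least 1");
        }
        List<T> selected = new ArrayList<>();
        while (selected.size() < limit && candidates.hasNext()) {
            selected.add(candidates.next());
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> workers = new ArrayList<>(List.of("w1", "w2", "w3"));
        Collections.shuffle(workers); // stands in for the randomized iterator
        System.out.println(takeUpTo(1, workers.iterator()).size());   // SINGLE: prints 1
        System.out.println(takeUpTo(100, workers.iterator()).size()); // FIXED, 100 partitions, 3 workers: prints 3
    }
}
```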

Source fragments

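Source fragments depend on their connector ID. Taking the Hive connector as an example, it is registered through the Discovery service; the implementation is in DiscoveryNodeManager#refreshNodesInternal. When tasks are actually scheduled and a Node is assigned to each split, the DynamicSplitPlacementPolicy strategy is used, which ultimately relies on the following method: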

```java
public static ResettableRandomizedIterator<Node> randomizedNodes(NodeMap nodeMap, boolean includeCoordinator, Set<Node> excludedNodes)
{
    ImmutableList<Node> nodes = nodeMap.getNodesByHostAndPort().values().stream()
            // drop coordinators unless they are allowed to run tasks
            .filter(node -> includeCoordinator || !nodeMap.getCoordinatorNodeIds().contains(node.getNodeIdentifier()))
            // drop explicitly excluded nodes
            .filter(node -> !excludedNodes.contains(node))
            .collect(toImmutableList());
    return new ResettableRandomizedIterator<>(nodes);
}
```
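randomizedNodes first filters out coordinators (unless includeCoordinator is set) and excluded nodes, then wraps the remaining nodes in a ResettableRandomizedIterator. The sketch below re-creates that idea with node IDs as plain strings: each next() draws a random remaining element with an incremental Fisher-Yates swap, so a caller that needs only a few nodes does not pay for a full shuffle, and reset() starts a new random pass. It is an illustration under these assumptions, not Presto's actual class.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative re-creation of a resettable randomized iterator (not Presto's code).
class RandomizedIteratorSketch<T> implements Iterator<T> {
    private final List<T> items;
    private int position; // items[0, position) have already been returned in this pass

    RandomizedIteratorSketch(Collection<T> items) {
        this.items = new ArrayList<>(items);
    }

    // Start a new random pass over the same elements.
    void reset() {
        position = 0;
    }

    @Override
    public boolean hasNext() {
        return position < items.size();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // Incremental Fisher-Yates: pick a random not-yet-returned element and swap it into place.
        int pick = ThreadLocalRandom.current().nextInt(position, items.size());
        T chosen = items.get(pick);
        items.set(pick, items.get(position));
        items.set(position, chosen);
        position++;
        return chosen;
    }

    // Hypothetical counterpart of randomizedNodes, with node IDs as strings.
    static RandomizedIteratorSketch<String> randomizedNodes(Collection<String> nodes, Set<String> coordinatorIds,
                                                            boolean includeCoordinator, Set<String> excludedNodes) {
        List<String> candidates = new ArrayList<>();
        for (String node : nodes) {
            boolean allowedRole = includeCoordinator || !coordinatorIds.contains(node);
            if (allowedRole && !excludedNodes.contains(node)) {
                candidates.add(node);
            }
        }
        return new RandomizedIteratorSketch<>(candidates);
    }

    public static void main(String[] args) {
        RandomizedIteratorSketch<String> candidates = randomizedNodes(
                List.of("coordinator", "w1", "w2", "w3"), Set.of("coordinator"), false, Set.of("w3"));
        while (candidates.hasNext()) {
            System.out.println("pass 1: " + candidates.next());
        }
        candidates.reset(); // same candidates, re-drawn in a new random order
        while (candidates.hasNext()) {
            System.out.println("pass 2: " + candidates.next());
        }
    }
}
```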

Either way, both selection paths depend on NodeMap. So where does NodeMap come from? As the code above shows, it is built when nodeScheduler.createNodeSelector is called:

```java
public NodeSelector createNodeSelector(ConnectorId connectorId)
{
    // this supplier is thread-safe. TODO: this logic should probably move to the scheduler since the choice of which node to run in should be
    // done as close to when the split is about to be scheduled
    Supplier<NodeMap> nodeMap = Suppliers.memoizeWithExpiration(() -> {
        ImmutableSetMultimap.Builder<HostAddress, Node> byHostAndPort = ImmutableSetMultimap.builder();
        ImmutableSetMultimap.Builder<InetAddress, Node> byHost = ImmutableSetMultimap.builder();
        ImmutableSetMultimap.Builder<NetworkLocation, Node> workersByNetworkPath = ImmutableSetMultimap.builder();

        // With a connector ID, only nodes serving that connector; otherwise all active nodes
        Set<Node> nodes;
        if (connectorId != null) {
            nodes = nodeManager.getActiveConnectorNodes(connectorId);
        }
        else {
            nodes = nodeManager.getNodes(ACTIVE);
        }

        Set<String> coordinatorNodeIds = nodeManager.getCoordinators().stream()
                .map(Node::getNodeIdentifier)
                .collect(toImmutableSet());

        for (Node node : nodes) {
            // ... (omitted)
```

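This is where it all comes from: Presto's machines are managed by Discovery. The nodeManager above is DiscoveryNodeManager, which wraps the Discovery service API, and Discovery keeps track of which Workers are alive.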
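Two ideas from createNodeSelector can be sketched without Presto's types. First, the node set comes either from all active nodes (connectorId == null, as in the non-source path) or only from nodes serving the given connector. Second, the resulting NodeMap is cached through Guava's Suppliers.memoizeWithExpiration, so it is not rebuilt on every selection. The following plain-Java re-creation works under those assumptions; the node-to-connector map, the method names and the injectable clock are hypothetical, and the 5-second TTL in main is an arbitrary example value, since the real expiry arguments are in the omitted part of the code.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical re-creation of a memoize-with-expiration supplier plus a
// connector-aware node filter. Not Presto's or Guava's code.
class ExpiringNodeMapSketch {
    // Caches the delegate's value and reloads it once ttlNanos have passed.
    static <T> Supplier<T> memoizeWithExpiration(Supplier<T> delegate, long ttlNanos, LongSupplier nanoClock) {
        return new Supplier<T>() {
            private T value;
            private long expiresAt;
            private boolean initialized;

            @Override
            public synchronized T get() {
                long now = nanoClock.getAsLong();
                if (!initialized || now - expiresAt >= 0) {
                    value = delegate.get();
                    expiresAt = now + ttlNanos;
                    initialized = true;
                }
                return value;
            }
        };
    }

    // connectorId == null: every active node is eligible;
    // otherwise only nodes that serve that connector.
    static Set<String> eligibleNodes(Map<String, Set<String>> activeNodeConnectors, String connectorId) {
        return activeNodeConnectors.entrySet().stream()
                .filter(e -> connectorId == null || e.getValue().contains(connectorId))
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Map<String, Set<String>> active = Map.of("w1", Set.of("hive"), "w2", Set.of("tpch"));
        Supplier<Set<String>> hiveNodes = memoizeWithExpiration(
                () -> eligibleNodes(active, "hive"), 5_000_000_000L, System::nanoTime);
        System.out.println(hiveNodes.get()); // prints [w1]
    }
}
```

Caching trades freshness for cost: a Worker that has just died may still be selected until the cached map expires.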

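Once the Workers have been chosen, each stage's scheduler starts dispatching tasks and splits. Which scheduler is used again depends on the SystemPartitioningHandle: source fragments use SourcePartitionedScheduler or FixedSourcePartitionedScheduler, while non-source fragments are scheduled by FixedCountScheduler. How many tasks a stage sends to Workers, and how splits are dispatched, will be covered in a later post.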
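The scheduler choice can be summarized as a small decision table. This is a hypothetical sketch assuming the rule is: a SOURCE-distributed fragment gets SourcePartitionedScheduler, any other fragment that still has split sources (table scans) gets FixedSourcePartitionedScheduler, and a fragment without split sources gets FixedCountScheduler. That reading of Presto's query scheduler is not shown in this article, other partitionings such as SCALED are left out, and scheduler names are returned as plain strings.

```java
// Hypothetical decision table for choosing a stage scheduler. Assumes:
// - SOURCE-distributed fragments -> SourcePartitionedScheduler
// - other fragments with split sources (e.g. table scans) -> FixedSourcePartitionedScheduler
// - fragments without split sources -> FixedCountScheduler
// Other partitionings (e.g. SCALED) are deliberately not modeled.
class SchedulerChoiceSketch {
    static String schedulerFor(boolean sourceDistributed, boolean hasSplitSources) {
        if (sourceDistributed) {
            return "SourcePartitionedScheduler";
        }
        return hasSplitSources ? "FixedSourcePartitionedScheduler" : "FixedCountScheduler";
    }

    public static void main(String[] args) {
        // The example query: two table-scan stages, a join stage and an output stage.
        System.out.println("scan 1test  -> " + schedulerFor(true, true));
        System.out.println("scan 2test1 -> " + schedulerFor(true, true));
        System.out.println("join        -> " + schedulerFor(false, false));
        System.out.println("output      -> " + schedulerFor(false, false));
    }
}
```

In the example query, the join stage only consumes the outputs of the two scan stages, so under this assumption it has no split sources of its own and falls to FixedCountScheduler, just like the SINGLE output stage.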