Impala
Impala is the open source, native analytic database for Apache Hadoop.
PipelinedPlanNodeSet.java
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.cloudera.impala.planner;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudera.impala.thrift.TQueryOptions;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

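/**
 * Represents a set of PlanNodes and DataSinks that execute and consume resources
 * concurrently. Nodes and sinks in such a set may belong to different plan fragments
 * because data is streamed across fragment boundaries during execution.
 */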
public class PipelinedPlanNodeSet {
  private final static Logger LOG = LoggerFactory.getLogger(PipelinedPlanNodeSet.class);

  // Minimum per-host resource requirements to ensure that no plan node set can have
  // estimates of zero, even if the contained PlanNodes have estimates of zero.
  public static final long MIN_PER_HOST_MEM = 10 * 1024 * 1024;
  public static final int MIN_PER_HOST_VCORES = 1;

  // List of plan nodes that execute and consume resources concurrently.
  private final ArrayList<PlanNode> planNodes = Lists.newArrayList();

  // DataSinks that execute and consume resources concurrently.
  // Primarily used for estimating the cost of insert queries.
  private final List<DataSink> dataSinks = Lists.newArrayList();

  // Estimated per-host memory and CPU requirements.
  // Valid after computeResourceEstimates().
  private long perHostMem = MIN_PER_HOST_MEM;
  private int perHostVcores = MIN_PER_HOST_VCORES;

  public void add(PlanNode node) {
    Preconditions.checkNotNull(node.getFragment());
    planNodes.add(node);
  }

  public void addSink(DataSink sink) {
    Preconditions.checkNotNull(sink);
    dataSinks.add(sink);
  }

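  /**
   * Computes the estimated per-host memory and CPU (vcore) requirements of this plan
   * node set, optionally excluding unpartitioned fragments. Returns true if valid
   * estimates were computed and stored, and false otherwise.
   */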
  public boolean computeResourceEstimates(boolean excludeUnpartitionedFragments,
      TQueryOptions queryOptions) {
    Set<PlanFragment> uniqueFragments = Sets.newHashSet();

    // Distinguish the per-host memory estimates for scan nodes and non-scan nodes to
    // get a tighter estimate on the amount of memory required by multiple concurrent
    // scans. The memory required by all concurrent scans of the same type (Hdfs/Hbase)
    // cannot exceed the per-host upper memory bound for that scan type. Intuitively,
    // the amount of I/O buffers is limited by the disk bandwidth.
    long perHostHbaseScanMem = 0L;
    long perHostHdfsScanMem = 0L;
    long perHostNonScanMem = 0L;

    for (int i = 0; i < planNodes.size(); ++i) {
      PlanNode node = planNodes.get(i);
      PlanFragment fragment = node.getFragment();
      if (!fragment.isPartitioned() && excludeUnpartitionedFragments) continue;
      node.computeCosts(queryOptions);
      uniqueFragments.add(fragment);
      if (node.getPerHostMemCost() < 0) {
        LOG.warn(String.format("Invalid per-host memory requirement %s of node %s.\n" +
            "PlanNode stats are: numNodes_=%s", node.getPerHostMemCost(),
            node.getClass().getSimpleName(), node.getNumNodes()));
      }
      if (node instanceof HBaseScanNode) {
        perHostHbaseScanMem += node.getPerHostMemCost();
      } else if (node instanceof HdfsScanNode) {
        perHostHdfsScanMem += node.getPerHostMemCost();
      } else {
        perHostNonScanMem += node.getPerHostMemCost();
      }
    }

    // The memory required by concurrent scans cannot exceed the upper memory bound
    // for that scan type.
    // TODO: In the future, we may want to restrict scanner concurrency based on a
    // memory limit. This estimation will need to account for that as well.
    perHostHbaseScanMem =
        Math.min(perHostHbaseScanMem, HBaseScanNode.getPerHostMemUpperBound());
    perHostHdfsScanMem =
        Math.min(perHostHdfsScanMem, HdfsScanNode.getPerHostMemUpperBound());

    long perHostDataSinkMem = 0L;
    for (int i = 0; i < dataSinks.size(); ++i) {
      DataSink sink = dataSinks.get(i);
      PlanFragment fragment = sink.getFragment();
      if (!fragment.isPartitioned() && excludeUnpartitionedFragments) continue;
      // Sanity check that this plan-node set has at least one PlanNode of fragment.
      Preconditions.checkState(uniqueFragments.contains(fragment));
      sink.computeCosts();
      if (sink.getPerHostMemCost() < 0) {
        LOG.warn(String.format("Invalid per-host memory requirement %s of sink %s.\n",
            sink.getPerHostMemCost(), sink.getClass().getSimpleName()));
      }
      perHostDataSinkMem += sink.getPerHostMemCost();
    }

    // Combine the memory estimates of all sinks, scan nodes and non-scan nodes.
    long perHostMem = perHostHdfsScanMem + perHostHbaseScanMem + perHostNonScanMem +
        perHostDataSinkMem;

    // The backend needs at least one thread per fragment.
    int perHostVcores = uniqueFragments.size();

    // This plan node set might only have unpartitioned fragments.
    // Only set estimates if they are valid.
    if (perHostMem >= 0 && perHostVcores >= 0) {
      this.perHostMem = perHostMem;
      this.perHostVcores = perHostVcores;
      return true;
    }
    return false;
  }

  public long getPerHostMem() { return perHostMem; }
  public int getPerHostVcores() { return perHostVcores; }

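  /**
   * Computes and returns the pipelined plan node sets of the plan tree rooted at
   * 'root'.
   */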
  public static ArrayList<PipelinedPlanNodeSet> computePlanNodeSets(PlanNode root) {
    ArrayList<PipelinedPlanNodeSet> planNodeSets =
        Lists.newArrayList(new PipelinedPlanNodeSet());
    computePlanNodeSets(root, planNodeSets.get(0), null, planNodeSets);
    return planNodeSets;
  }

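  /**
   * Recursively traverses the plan tree rooted at 'node' and populates 'planNodeSets'
   * with the sets of concurrently executing nodes. 'lhsSet' is the set the current
   * node is added to; 'rhsSet' collects the build sides (right-hand children) of
   * enclosing hash joins and may be null if there is none.
   */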
  private static void computePlanNodeSets(PlanNode node, PipelinedPlanNodeSet lhsSet,
      PipelinedPlanNodeSet rhsSet, ArrayList<PipelinedPlanNodeSet> planNodeSets) {
    lhsSet.add(node);
    if (node == node.getFragment().getPlanRoot() && node.getFragment().hasSink()) {
      lhsSet.addSink(node.getFragment().getSink());
    }

    if (node instanceof HashJoinNode) {
      // Create a new set for the right-hand sides of joins if necessary.
      if (rhsSet == null) {
        rhsSet = new PipelinedPlanNodeSet();
        planNodeSets.add(rhsSet);
      }
      // The join node itself is added to the lhsSet (above) and the rhsSet.
      rhsSet.add(node);
      computePlanNodeSets(node.getChild(1), rhsSet, null, planNodeSets);
      computePlanNodeSets(node.getChild(0), lhsSet, rhsSet, planNodeSets);
      return;
    }

    if (node.isBlockingNode()) {
      // We add blocking nodes to two plan node sets because they require resources
      // while consuming their input (execution of the preceding set) and while they
      // emit their output (execution of the following set).
      lhsSet = new PipelinedPlanNodeSet();
      lhsSet.add(node);
      planNodeSets.add(lhsSet);
      // Join builds under this blocking node belong in a new rhsSet.
      rhsSet = null;
    }

    // Assume that non-join, non-blocking nodes with multiple children
    // (e.g., ExchangeNodes) consume their inputs in an arbitrary order,
    // i.e., all child subtrees execute concurrently.
    // TODO: This is not true for UnionNodes anymore. Fix the estimates accordingly.
    for (PlanNode child: node.getChildren()) {
      computePlanNodeSets(child, lhsSet, rhsSet, planNodeSets);
    }
  }
}
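A minimal caller sketch of how these pieces fit together (hypothetical: 'root', 'queryOptions', and 'excludeUnpartitioned' are assumed to be supplied by the surrounding planner code and are not part of this file). Since each set models nodes that run concurrently, one plausible way to combine the per-set estimates is to take the maximum across all sets as the per-host requirement:

  // Hypothetical usage; 'root', 'queryOptions', and 'excludeUnpartitioned'
  // are assumptions, not part of this file.
  ArrayList<PipelinedPlanNodeSet> sets = PipelinedPlanNodeSet.computePlanNodeSets(root);
  long perHostMem = PipelinedPlanNodeSet.MIN_PER_HOST_MEM;
  int perHostVcores = PipelinedPlanNodeSet.MIN_PER_HOST_VCORES;
  for (PipelinedPlanNodeSet set : sets) {
    // Skip sets with invalid estimates (e.g., only unpartitioned fragments).
    if (!set.computeResourceEstimates(excludeUnpartitioned, queryOptions)) continue;
    perHostMem = Math.max(perHostMem, set.getPerHostMem());
    perHostVcores = Math.max(perHostVcores, set.getPerHostVcores());
  }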