Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
AggregationNode.java
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.planner;
16 
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Set;
20 
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23 
30 import com.cloudera.impala.thrift.TAggregationNode;
31 import com.cloudera.impala.thrift.TExplainLevel;
32 import com.cloudera.impala.thrift.TExpr;
33 import com.cloudera.impala.thrift.TPlanNode;
34 import com.cloudera.impala.thrift.TPlanNodeType;
35 import com.cloudera.impala.thrift.TQueryOptions;
36 import com.google.common.base.Objects;
37 import com.google.common.base.Preconditions;
38 import com.google.common.collect.Lists;
39 import com.google.common.collect.Sets;
40 
45 public class AggregationNode extends PlanNode {
46  private final static Logger LOG = LoggerFactory.getLogger(AggregationNode.class);
47 
48  // Default per-host memory requirement used if no valid stats are available.
49  // TODO: Come up with a more useful heuristic.
50  private final static long DEFAULT_PER_HOST_MEM = 128L * 1024L * 1024L;
51 
52  // Conservative minimum size of hash table for low-cardinality aggregations.
53  private final static long MIN_HASH_TBL_MEM = 10L * 1024L * 1024L;
54 
55  private final AggregateInfo aggInfo_;
56 
57  // Set to true if this aggregation node needs to run the Finalize step. This
58  // node is the root node of a distributed aggregation.
59  private boolean needsFinalize_;
60 
/**
 * Creates an aggregation node over 'input'. The node's tuple id list is built from
 * the output tuple id of 'aggInfo', and the node starts out marked as needing
 * finalization.
 */
public AggregationNode(PlanNodeId id, PlanNode input, AggregateInfo aggInfo) {
  super(id, aggInfo.getOutputTupleId().asList(), "AGGREGATE");
  this.needsFinalize_ = true;
  this.aggInfo_ = aggInfo;
  // The input becomes a child of this node and its nullable tuple ids carry over.
  this.children_.add(input);
  this.nullableTupleIds_.addAll(input.getNullableTupleIds());
}
71 
// Copy constructor body: forwards 'id' and 'src' to the superclass constructor and
// takes over src's aggInfo_ and needsFinalize_.
// NOTE(review): the signature line (listing line 75) is absent from this extract; the
// member index at the end of the page lists it as
// AggregationNode(PlanNodeId id, AggregationNode src) -- confirm against the original.
76  super(id, src, "AGGREGATE");
77  aggInfo_ = src.aggInfo_;
78  needsFinalize_ = src.needsFinalize_;
79  }
80 
81  public AggregateInfo getAggInfo() { return aggInfo_; }
82 
83  // Unsets this node as requiring finalize. Only valid to call this if it is
84  // currently marked as needing finalize.
85  public void unsetNeedsFinalize() {
86  Preconditions.checkState(needsFinalize_);
87  needsFinalize_ = false;
88  }
89 
94  public void setIntermediateTuple() {
95  Preconditions.checkState(!tupleIds_.isEmpty());
96  Preconditions.checkState(tupleIds_.get(0).equals(aggInfo_.getOutputTupleId()));
97  tupleIds_.clear();
98  tupleIds_.add(aggInfo_.getIntermediateTupleId());
99  }
100 
101  @Override
102  public boolean isBlockingNode() { return true; }
103 
// Initializes this node in four steps:
//  1. If the node's first tuple id is the aggregation's result tuple id and the
//     aggregation is not a merge, gather the conjuncts this node evaluates.
//  2. Compute the memory layout of the output and intermediate tuple descriptors.
//  3. Compute stats.
//  4. Call aggInfo_.substitute() with outputSmap_ and check aggInfo_ consistency.
104  @Override
105  public void init(Analyzer analyzer) throws InternalException {
106  // Assign predicates to the top-most agg in the single-node plan that can evaluate
107  // them, as follows: For non-distinct aggs place them in the 1st phase agg node. For
108  // distinct aggs place them in the 2nd phase agg node. The conjuncts are
109  // transferred to the proper place in the multi-node plan via transferConjuncts().
110  if (tupleIds_.get(0).equals(aggInfo_.getResultTupleId()) && !aggInfo_.isMerge()) {
111  // Ignore predicates bound to a group-by slot because those
112  // are already evaluated below this agg node (e.g., in a scan).
// The first getGroupingExprs().size() slots of the output tuple descriptor are
// collected as the group-by slots.
113  Set<SlotId> groupBySlots = Sets.newHashSet();
114  for (int i = 0; i < aggInfo_.getGroupingExprs().size(); ++i) {
115  groupBySlots.add(aggInfo_.getOutputTupleDesc().getSlots().get(i).getId());
116  }
117  ArrayList<Expr> bindingPredicates =
118  analyzer.getBoundPredicates(tupleIds_.get(0), groupBySlots, true);
119  conjuncts_.addAll(bindingPredicates);
120 
121  // also add remaining unassigned conjuncts_
122  assignConjuncts(analyzer);
123 
124  analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_, groupBySlots);
125  }
126  // Compute the mem layout for both tuples here for simplicity.
127  aggInfo_.getOutputTupleDesc().computeMemLayout();
128  aggInfo_.getIntermediateTupleDesc().computeMemLayout();
129 
130  // do this at the end so it can take all conjuncts into account
131  computeStats(analyzer);
132 
133  // don't call createDefaultSMap(), it would point our conjuncts (= Having clause)
134  // to our input; our conjuncts don't get substituted because they already
135  // refer to our output
// NOTE(review): listing line 136 is absent from this extract. Given the comment above
// and the getCombinedChildSmap()/outputSmap_ entries in the member index at the end
// of the page, outputSmap_ is presumably assigned here -- confirm against the
// original file.
137  aggInfo_.substitute(outputSmap_, analyzer);
138  // assert consistent aggregate expr and slot materialization
139  aggInfo_.checkConsistency();
140  }
141 
142  @Override
143  public void computeStats(Analyzer analyzer) {
144  super.computeStats(analyzer);
145  // This is prone to overflow, because we keep multiplying cardinalities,
146  // even if the grouping exprs are functionally dependent (example:
147  // group by the primary key of a table plus a number of other columns from that
148  // same table)
149  // TODO: try to recognize functional dependencies
150  // TODO: as a shortcut, instead of recognizing functional dependencies,
151  // limit the contribution of a single table to the number of rows
152  // of that table (so that when we're grouping by the primary key col plus
153  // some others, the estimate doesn't overshoot dramatically)
154  // cardinality: product of # of distinct values produced by grouping exprs
155 
156  // Any non-grouping aggregation has at least one distinct value
157  cardinality_ = aggInfo_.getGroupingExprs().isEmpty() ? 1 :
158  Expr.getNumDistinctValues(aggInfo_.getGroupingExprs());
159  // take HAVING predicate into account
160  LOG.trace("Agg: cardinality=" + Long.toString(cardinality_));
161  if (cardinality_ > 0) {
162  cardinality_ = Math.round((double) cardinality_ * computeSelectivity());
163  LOG.trace("sel=" + Double.toString(computeSelectivity()));
164  }
165  // if we ended up with an overflow, the estimate is certain to be wrong
166  if (cardinality_ < 0) cardinality_ = -1;
167  // Sanity check the cardinality_ based on the input cardinality_.
168  if (getChild(0).getCardinality() != -1) {
169  if (cardinality_ == -1) {
170  // A worst-case cardinality_ is better than an unknown cardinality_.
171  cardinality_ = getChild(0).getCardinality();
172  } else {
173  // An AggregationNode cannot increase the cardinality_.
174  cardinality_ = Math.min(getChild(0).getCardinality(), cardinality_);
175  }
176  }
177  LOG.trace("stats Agg: cardinality=" + Long.toString(cardinality_));
178  }
179 
180  @Override
181  protected String debugString() {
182  return Objects.toStringHelper(this)
183  .add("aggInfo", aggInfo_.debugString())
184  .addValue(super.debugString())
185  .toString();
186  }
187 
// Fills in the Thrift plan node 'msg' for this aggregation: sets the node type,
// builds the list of serialized aggregate exprs, creates msg.agg_node from it and
// attaches the serialized grouping exprs when the grouping expr list is non-null.
188  @Override
189  protected void toThrift(TPlanNode msg) {
190  msg.node_type = TPlanNodeType.AGGREGATION_NODE;
191 
192  List<TExpr> aggregateFunctions = Lists.newArrayList();
193  // only serialize agg exprs that are being materialized
// NOTE(review): listing line 194 (the loop header that declares 'e') is absent from
// this extract; presumably it iterates over getMaterializedAggregateExprs(), which
// appears in the member index at the end of the page -- confirm against the original.
195  aggregateFunctions.add(e.treeToThrift());
196  }
197  aggInfo_.checkConsistency();
198  msg.agg_node = new TAggregationNode(
199  aggregateFunctions,
// NOTE(review): listing lines 200-201 (the remaining TAggregationNode constructor
// arguments and the closing of the call) are absent from this extract.
202  List<Expr> groupingExprs = aggInfo_.getGroupingExprs();
203  if (groupingExprs != null) {
204  msg.agg_node.setGrouping_exprs(Expr.treesToThrift(groupingExprs));
205  }
206  }
207 
208  @Override
209  protected String getDisplayLabelDetail() {
210  if (needsFinalize_) return "FINALIZE";
211  return null;
212  }
213 
214  @Override
215  protected String getNodeExplainString(String prefix, String detailPrefix,
216  TExplainLevel detailLevel) {
217  StringBuilder output = new StringBuilder();
218  String nameDetail = getDisplayLabelDetail();
219  output.append(String.format("%s%s", prefix, getDisplayLabel()));
220  if (nameDetail != null) output.append(" [" + nameDetail + "]");
221  output.append("\n");
222 
223  if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
224  if (aggInfo_.getAggregateExprs() != null &&
225  aggInfo_.getAggregateExprs().size() > 0) {
226  output.append(detailPrefix + "output: ")
227  .append(getExplainString(aggInfo_.getAggregateExprs()) + "\n");
228  }
229  // TODO: is this the best way to display this. It currently would
230  // have DISTINCT_PC(DISTINCT_PC(col)) for the merge phase but not
231  // very obvious what that means if you don't already know.
232 
233  // TODO: group by can be very long. Break it into multiple lines
234  if (!aggInfo_.getGroupingExprs().isEmpty()) {
235  output.append(detailPrefix + "group by: ")
236  .append(getExplainString(aggInfo_.getGroupingExprs()) + "\n");
237  }
238  if (!conjuncts_.isEmpty()) {
239  output.append(detailPrefix + "having: ")
240  .append(getExplainString(conjuncts_) + "\n");
241  }
242  }
243  return output.toString();
244  }
245 
// Estimates perHostMemCost_ for this node. Requires that the node has already been
// placed into a fragment. When the fragment reports -1 distinct values for the
// grouping exprs, DEFAULT_PER_HOST_MEM is used and the method returns early;
// otherwise the per-host cardinality is capped by cardinality_ (when that is not -1)
// and a Math.max(...) term based on it and avgRowSize_ is added to the cost.
246  @Override
247  public void computeCosts(TQueryOptions queryOptions) {
248  Preconditions.checkNotNull(fragment_,
249  "PlanNode must be placed into a fragment before calling this method.");
250  perHostMemCost_ = 0;
251  long perHostCardinality = fragment_.getNumDistinctValues(aggInfo_.getGroupingExprs());
252  if (perHostCardinality == -1) {
253  perHostMemCost_ = DEFAULT_PER_HOST_MEM;
254  return;
255  }
256 
257  // Per-host cardinality cannot be greater than the total output cardinality.
258  if (cardinality_ != -1) {
259  perHostCardinality = Math.min(perHostCardinality, cardinality_);
260  }
// NOTE(review): listing line 262 (the rest of the Math.max(...) call) is absent from
// this extract. MIN_HASH_TBL_MEM is declared in this class and not referenced in any
// other visible line, so it is presumably the lower bound here -- confirm against
// the original file.
261  perHostMemCost_ += Math.max(perHostCardinality * avgRowSize_ *
263  }
264 }
void assignConjuncts(Analyzer analyzer)
Definition: PlanNode.java:401
ArrayList< TupleId > tupleIds_
Definition: PlanNode.java:74
AggregationNode(PlanNodeId id, AggregationNode src)
ExprSubstitutionMap getCombinedChildSmap()
Definition: PlanNode.java:410
ArrayList< FunctionCallExpr > getAggregateExprs()
String getNodeExplainString(String prefix, String detailPrefix, TExplainLevel detailLevel)
ArrayList< FunctionCallExpr > getMaterializedAggregateExprs()
int SlotId
Definition: global-types.h:24
void computeCosts(TQueryOptions queryOptions)
AggregationNode(PlanNodeId id, PlanNode input, AggregateInfo aggInfo)
void computeMemLayout(Analyzer analyzer)
Definition: PlanNode.java:475
ExprSubstitutionMap outputSmap_
Definition: PlanNode.java:93