Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
AggregateInfo.java
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.analysis;
16 
17 import java.util.ArrayList;
18 import java.util.List;
19 
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22 
27 import com.cloudera.impala.thrift.TPartitionType;
28 import com.google.common.base.Objects;
29 import com.google.common.base.Preconditions;
30 import com.google.common.collect.Lists;
31 
66 public class AggregateInfo extends AggregateInfoBase {
67  private final static Logger LOG = LoggerFactory.getLogger(AggregateInfo.class);
68 
69  public enum AggPhase {
74 
75  public boolean isMerge() { return this == FIRST_MERGE || this == SECOND_MERGE; }
76  };
77 
78  // created by createMergeAggInfo()
80 
81  // created by createDistinctAggInfo()
83 
84  private final AggPhase aggPhase_;
85 
86  // Map from all grouping and aggregate exprs to a SlotRef referencing the corresp. slot
87  // in the intermediate tuple. Identical to outputTupleSmap_ if no aggregateExpr has an
88  // output type that is different from its intermediate type.
90 
91  // Map from all grouping and aggregate exprs to a SlotRef referencing the corresp. slot
92  // in the output tuple.
94 
95  // Map from slots of outputTupleSmap_ to the corresponding slot in
96  // intermediateTupleSmap_.
98  new ExprSubstitutionMap();
99 
100  // if set, a subset of groupingExprs_; set and used during planning
101  private List<Expr> partitionExprs_;
102 
/**
 * Private c'tor; instances are built by create() and by the helpers that set up the
 * merge and second-phase aggregations. Creates copies of groupingExprs and aggExprs.
 *
 * @param groupingExprs grouping exprs handed to the base class
 * @param aggExprs aggregate exprs handed to the base class
 * @param aggPhase phase this aggregation step belongs to
 */
private AggregateInfo(
    ArrayList<Expr> groupingExprs,
    ArrayList<FunctionCallExpr> aggExprs,
    AggPhase aggPhase) {
  super(groupingExprs, aggExprs);
  this.aggPhase_ = aggPhase;
}
109 
110  public List<Expr> getPartitionExprs() { return partitionExprs_; }
111  public void setPartitionExprs(List<Expr> exprs) { partitionExprs_ = exprs; }
112 
122  static public AggregateInfo create(
123  ArrayList<Expr> groupingExprs, ArrayList<FunctionCallExpr> aggExprs,
124  TupleDescriptor tupleDesc, Analyzer analyzer)
125  throws AnalysisException {
126  Preconditions.checkState(
127  (groupingExprs != null && !groupingExprs.isEmpty())
128  || (aggExprs != null && !aggExprs.isEmpty()));
129  Expr.removeDuplicates(groupingExprs);
130  Expr.removeDuplicates(aggExprs);
131  AggregateInfo result = new AggregateInfo(groupingExprs, aggExprs, AggPhase.FIRST);
132 
133  // collect agg exprs with DISTINCT clause
134  ArrayList<FunctionCallExpr> distinctAggExprs = Lists.newArrayList();
135  if (aggExprs != null) {
136  for (FunctionCallExpr aggExpr: aggExprs) {
137  if (aggExpr.isDistinct()) distinctAggExprs.add(aggExpr);
138  }
139  }
140 
141  if (distinctAggExprs.isEmpty()) {
142  if (tupleDesc == null) {
143  result.createTupleDescs(analyzer);
144  result.createSmaps(analyzer);
145  } else {
146  // A tupleDesc should only be given for UNION DISTINCT.
147  Preconditions.checkState(aggExprs == null);
148  result.outputTupleDesc_ = tupleDesc;
149  result.intermediateTupleDesc_ = tupleDesc;
150  }
151  result.createMergeAggInfo(analyzer);
152  } else {
153  // we don't allow you to pass in a descriptor for distinct aggregation
154  // (we need two descriptors)
155  Preconditions.checkState(tupleDesc == null);
156  result.createDistinctAggInfo(groupingExprs, distinctAggExprs, analyzer);
157  }
158  LOG.debug("agg info:\n" + result.debugString());
159  return result;
160  }
161 
188  private void createDistinctAggInfo(
189  ArrayList<Expr> origGroupingExprs,
190  ArrayList<FunctionCallExpr> distinctAggExprs, Analyzer analyzer)
191  throws AnalysisException {
192  Preconditions.checkState(!distinctAggExprs.isEmpty());
193  // make sure that all DISTINCT params are the same;
194  // ignore top-level implicit casts in the comparison, we might have inserted
195  // those during analysis
196  ArrayList<Expr> expr0Children = Lists.newArrayList();
197  for (Expr expr: distinctAggExprs.get(0).getChildren()) {
198  expr0Children.add(expr.ignoreImplicitCast());
199  }
200  for (int i = 1; i < distinctAggExprs.size(); ++i) {
201  ArrayList<Expr> exprIChildren = Lists.newArrayList();
202  for (Expr expr: distinctAggExprs.get(i).getChildren()) {
203  exprIChildren.add(expr.ignoreImplicitCast());
204  }
205  if (!Expr.equalLists(expr0Children, exprIChildren)) {
206  throw new AnalysisException(
207  "all DISTINCT aggregate functions need to have the same set of "
208  + "parameters as " + distinctAggExprs.get(0).toSql()
209  + "; deviating function: " + distinctAggExprs.get(i).toSql());
210  }
211  }
212 
213  // add DISTINCT parameters to grouping exprs
214  groupingExprs_.addAll(expr0Children);
215 
216  // remove DISTINCT aggregate functions from aggExprs
217  aggregateExprs_.removeAll(distinctAggExprs);
218 
219  createTupleDescs(analyzer);
220  createSmaps(analyzer);
221  createMergeAggInfo(analyzer);
222  createSecondPhaseAggInfo(origGroupingExprs, distinctAggExprs, analyzer);
223  }
224 
228  }
229  public AggPhase getAggPhase() { return aggPhase_; }
230  public boolean isMerge() { return aggPhase_.isMerge(); }
231  public boolean isDistinctAgg() { return secondPhaseDistinctAggInfo_ != null; }
236  }
237 
238  public boolean hasAggregateExprs() {
239  return !aggregateExprs_.isEmpty() ||
240  (secondPhaseDistinctAggInfo_ != null &&
242  }
243 
248  if (isDistinctAgg()) return secondPhaseDistinctAggInfo_.getOutputTupleId();
249  return getOutputTupleId();
250  }
251 
252  public ArrayList<FunctionCallExpr> getMaterializedAggregateExprs() {
253  ArrayList<FunctionCallExpr> result = Lists.newArrayList();
254  for (Integer i: materializedSlots_) {
255  result.add(aggregateExprs_.get(i));
256  }
257  return result;
258  }
259 
264  public void getRefdSlots(List<SlotId> ids) {
265  Preconditions.checkState(outputTupleDesc_ != null);
266  if (groupingExprs_ != null) {
267  Expr.getIds(groupingExprs_, null, ids);
268  }
269  Expr.getIds(aggregateExprs_, null, ids);
270  // The backend assumes that the entire aggTupleDesc is materialized
271  for (int i = 0; i < outputTupleDesc_.getSlots().size(); ++i) {
272  ids.add(outputTupleDesc_.getSlots().get(i).getId());
273  }
274  }
275 
295  public void substitute(ExprSubstitutionMap smap, Analyzer analyzer)
296  throws InternalException {
297  groupingExprs_ = Expr.substituteList(groupingExprs_, smap, analyzer, false);
298  LOG.trace("AggInfo: grouping_exprs=" + Expr.debugString(groupingExprs_));
299 
300  // The smap in this case should not substitute the aggs themselves, only
301  // their subexpressions.
302  List<Expr> substitutedAggs =
303  Expr.substituteList(aggregateExprs_, smap, analyzer, false);
304  aggregateExprs_.clear();
305  for (Expr substitutedAgg: substitutedAggs) {
306  aggregateExprs_.add((FunctionCallExpr) substitutedAgg);
307  }
308 
309  LOG.trace("AggInfo: agg_exprs=" + Expr.debugString(aggregateExprs_));
310  outputTupleSmap_.substituteLhs(smap, analyzer);
311  intermediateTupleSmap_.substituteLhs(smap, analyzer);
312  if (secondPhaseDistinctAggInfo_ != null) {
313  secondPhaseDistinctAggInfo_.substitute(smap, analyzer);
314  }
315  }
316 
328  private void createMergeAggInfo(Analyzer analyzer) {
329  Preconditions.checkState(mergeAggInfo_ == null);
331  // construct grouping exprs
332  ArrayList<Expr> groupingExprs = Lists.newArrayList();
333  for (int i = 0; i < getGroupingExprs().size(); ++i) {
334  SlotRef slotRef = new SlotRef(inputDesc.getSlots().get(i));
335  groupingExprs.add(slotRef);
336  }
337 
338  // construct agg exprs
339  ArrayList<FunctionCallExpr> aggExprs = Lists.newArrayList();
340  for (int i = 0; i < getAggregateExprs().size(); ++i) {
341  FunctionCallExpr inputExpr = getAggregateExprs().get(i);
342  Preconditions.checkState(inputExpr.isAggregateFunction());
343  Expr aggExprParam =
344  new SlotRef(inputDesc.getSlots().get(i + getGroupingExprs().size()));
345  FunctionCallExpr aggExpr = FunctionCallExpr.createMergeAggCall(
346  inputExpr, Lists.newArrayList(aggExprParam));
347  aggExpr.analyzeNoThrow(analyzer);
348  aggExprs.add(aggExpr);
349  }
350 
351  AggPhase aggPhase =
353  mergeAggInfo_ = new AggregateInfo(groupingExprs, aggExprs, aggPhase);
354  mergeAggInfo_.intermediateTupleDesc_ = intermediateTupleDesc_;
355  mergeAggInfo_.outputTupleDesc_ = outputTupleDesc_;
356  mergeAggInfo_.intermediateTupleSmap_ = intermediateTupleSmap_;
357  mergeAggInfo_.outputTupleSmap_ = outputTupleSmap_;
358  mergeAggInfo_.mergeAggInfo_ = mergeAggInfo_;
359  mergeAggInfo_.materializedSlots_ = materializedSlots_;
360  }
361 
370  private Expr createCountDistinctAggExprParam(int firstIdx, int lastIdx,
371  ArrayList<SlotDescriptor> slots) {
372  if (firstIdx > lastIdx) return null;
373 
374  Expr elseExpr = new SlotRef(slots.get(lastIdx));
375  if (firstIdx == lastIdx) return elseExpr;
376 
377  for (int i = lastIdx - 1; i >= firstIdx; --i) {
378  ArrayList<Expr> ifArgs = Lists.newArrayList();
379  SlotRef slotRef = new SlotRef(slots.get(i));
380  // Build expr: IF(IsNull(slotRef), NULL, elseExpr)
381  Expr isNullPred = new IsNullPredicate(slotRef, false);
382  ifArgs.add(isNullPred);
383  ifArgs.add(new NullLiteral());
384  ifArgs.add(elseExpr);
385  elseExpr = new FunctionCallExpr("if", ifArgs);
386  }
387  return elseExpr;
388  }
389 
405  ArrayList<Expr> origGroupingExprs,
406  ArrayList<FunctionCallExpr> distinctAggExprs, Analyzer analyzer)
407  throws AnalysisException {
408  Preconditions.checkState(secondPhaseDistinctAggInfo_ == null);
409  Preconditions.checkState(!distinctAggExprs.isEmpty());
410  // The output of the 1st phase agg is the 1st phase intermediate.
412 
413  // construct agg exprs for original DISTINCT aggregate functions
414  // (these aren't part of aggExprs_)
415  ArrayList<FunctionCallExpr> secondPhaseAggExprs = Lists.newArrayList();
416  for (FunctionCallExpr inputExpr: distinctAggExprs) {
417  Preconditions.checkState(inputExpr.isAggregateFunction());
418  FunctionCallExpr aggExpr = null;
419  if (inputExpr.getFnName().getFunction().equals("count")) {
420  // COUNT(DISTINCT ...) ->
421  // COUNT(IF(IsNull(<agg slot 1>), NULL, IF(IsNull(<agg slot 2>), NULL, ...)))
422  // We need the nested IF to make sure that we do not count
423  // column-value combinations if any of the distinct columns are NULL.
424  // This behavior is consistent with MySQL.
425  Expr ifExpr = createCountDistinctAggExprParam(origGroupingExprs.size(),
426  origGroupingExprs.size() + inputExpr.getChildren().size() - 1,
427  inputDesc.getSlots());
428  Preconditions.checkNotNull(ifExpr);
429  ifExpr.analyzeNoThrow(analyzer);
430  aggExpr = new FunctionCallExpr("count", Lists.newArrayList(ifExpr));
431  } else {
432  // SUM(DISTINCT <expr>) -> SUM(<last grouping slot>);
433  // (MIN(DISTINCT ...) and MAX(DISTINCT ...) have their DISTINCT turned
434  // off during analysis, and AVG() is changed to SUM()/COUNT())
435  Expr aggExprParam =
436  new SlotRef(inputDesc.getSlots().get(origGroupingExprs.size()));
437  aggExpr = new FunctionCallExpr(inputExpr.getFnName(),
438  Lists.newArrayList(aggExprParam));
439  }
440  secondPhaseAggExprs.add(aggExpr);
441  }
442 
443  // map all the remaining agg fns
444  for (int i = 0; i < aggregateExprs_.size(); ++i) {
445  FunctionCallExpr inputExpr = aggregateExprs_.get(i);
446  Preconditions.checkState(inputExpr.isAggregateFunction());
447  // we're aggregating an intermediate slot of the 1st agg phase
448  Expr aggExprParam =
449  new SlotRef(inputDesc.getSlots().get(i + getGroupingExprs().size()));
450  FunctionCallExpr aggExpr = FunctionCallExpr.createMergeAggCall(
451  inputExpr, Lists.newArrayList(aggExprParam));
452  secondPhaseAggExprs.add(aggExpr);
453  }
454  Preconditions.checkState(
455  secondPhaseAggExprs.size() == aggregateExprs_.size() + distinctAggExprs.size());
456 
457  for (FunctionCallExpr aggExpr: secondPhaseAggExprs) {
458  aggExpr.analyzeNoThrow(analyzer);
459  Preconditions.checkState(aggExpr.isAggregateFunction());
460  }
461 
462  ArrayList<Expr> substGroupingExprs =
463  Expr.substituteList(origGroupingExprs, intermediateTupleSmap_, analyzer, false);
464  secondPhaseDistinctAggInfo_ =
465  new AggregateInfo(substGroupingExprs, secondPhaseAggExprs, AggPhase.SECOND);
466  secondPhaseDistinctAggInfo_.createTupleDescs(analyzer);
467  secondPhaseDistinctAggInfo_.createSecondPhaseAggSMap(this, distinctAggExprs);
468  secondPhaseDistinctAggInfo_.createMergeAggInfo(analyzer);
469  }
470 
476  AggregateInfo inputAggInfo, ArrayList<FunctionCallExpr> distinctAggExprs) {
477  outputTupleSmap_.clear();
478  int slotIdx = 0;
479  ArrayList<SlotDescriptor> slotDescs = outputTupleDesc_.getSlots();
480 
481  int numDistinctParams = distinctAggExprs.get(0).getChildren().size();
482  int numOrigGroupingExprs =
483  inputAggInfo.getGroupingExprs().size() - numDistinctParams;
484  Preconditions.checkState(slotDescs.size() ==
485  numOrigGroupingExprs + distinctAggExprs.size() +
486  inputAggInfo.getAggregateExprs().size());
487 
488  // original grouping exprs -> first m slots
489  for (int i = 0; i < numOrigGroupingExprs; ++i, ++slotIdx) {
490  Expr groupingExpr = inputAggInfo.getGroupingExprs().get(i);
491  outputTupleSmap_.put(
492  groupingExpr.clone(), new SlotRef(slotDescs.get(slotIdx)));
493  }
494 
495  // distinct agg exprs -> next n slots
496  for (int i = 0; i < distinctAggExprs.size(); ++i, ++slotIdx) {
497  Expr aggExpr = distinctAggExprs.get(i);
498  outputTupleSmap_.put(
499  aggExpr.clone(), (new SlotRef(slotDescs.get(slotIdx))));
500  }
501 
502  // remaining agg exprs -> remaining slots
503  for (int i = 0; i < inputAggInfo.getAggregateExprs().size(); ++i, ++slotIdx) {
504  Expr aggExpr = inputAggInfo.getAggregateExprs().get(i);
505  outputTupleSmap_.put(aggExpr.clone(), new SlotRef(slotDescs.get(slotIdx)));
506  }
507  }
508 
515  public void createSmaps(Analyzer analyzer) {
516  Preconditions.checkNotNull(outputTupleDesc_);
517  Preconditions.checkNotNull(intermediateTupleDesc_);
518 
519  List<Expr> exprs = Lists.newArrayListWithCapacity(
520  groupingExprs_.size() + aggregateExprs_.size());
521  exprs.addAll(groupingExprs_);
522  exprs.addAll(aggregateExprs_);
523  for (int i = 0; i < exprs.size(); ++i) {
524  outputTupleSmap_.put(exprs.get(i).clone(),
525  new SlotRef(outputTupleDesc_.getSlots().get(i)));
526  if (!requiresIntermediateTuple()) continue;
527  intermediateTupleSmap_.put(exprs.get(i).clone(),
528  new SlotRef(intermediateTupleDesc_.getSlots().get(i)));
529  outputToIntermediateTupleSmap_.put(
530  new SlotRef(outputTupleDesc_.getSlots().get(i)),
531  new SlotRef(intermediateTupleDesc_.getSlots().get(i)));
532  if (i < groupingExprs_.size()) {
533  analyzer.createAuxEquivPredicate(
534  new SlotRef(outputTupleDesc_.getSlots().get(i)),
535  new SlotRef(intermediateTupleDesc_.getSlots().get(i)));
536  }
537  }
539 
540  LOG.trace("output smap=" + outputTupleSmap_.debugString());
541  LOG.trace("intermediate smap=" + intermediateTupleSmap_.debugString());
542  }
543 
555  @Override
557  for (int i = 0; i < groupingExprs_.size(); ++i) {
558  outputTupleDesc_.getSlots().get(i).setIsMaterialized(true);
559  intermediateTupleDesc_.getSlots().get(i).setIsMaterialized(true);
560  }
561 
562  // collect input exprs: grouping exprs plus aggregate exprs that need to be
563  // materialized
564  materializedSlots_.clear();
565  List<Expr> exprs = Lists.newArrayList();
566  exprs.addAll(groupingExprs_);
567  for (int i = 0; i < aggregateExprs_.size(); ++i) {
568  SlotDescriptor slotDesc =
569  outputTupleDesc_.getSlots().get(groupingExprs_.size() + i);
570  SlotDescriptor intermediateSlotDesc =
571  intermediateTupleDesc_.getSlots().get(groupingExprs_.size() + i);
572  if (isDistinctAgg()) {
573  slotDesc.setIsMaterialized(true);
574  intermediateSlotDesc.setIsMaterialized(true);
575  }
576  if (!slotDesc.isMaterialized()) continue;
577  intermediateSlotDesc.setIsMaterialized(true);
578  exprs.add(aggregateExprs_.get(i));
579  materializedSlots_.add(i);
580  }
581  List<Expr> resolvedExprs = Expr.substituteList(exprs, smap, analyzer, false);
582  analyzer.materializeSlots(resolvedExprs);
583 
584  if (isDistinctAgg()) {
585  secondPhaseDistinctAggInfo_.materializeRequiredSlots(analyzer, null);
586  }
587  }
588 
596  public void checkConsistency() {
597  ArrayList<SlotDescriptor> slots = outputTupleDesc_.getSlots();
598 
599  // Check materialized slots.
600  int numMaterializedSlots = 0;
601  for (SlotDescriptor slotDesc: slots) {
602  if (slotDesc.isMaterialized()) ++numMaterializedSlots;
603  }
604  Preconditions.checkState(numMaterializedSlots ==
605  materializedSlots_.size() + groupingExprs_.size());
606 
607  // Check that grouping expr return types match the slot descriptors.
608  int slotIdx = 0;
609  for (int i = 0; i < groupingExprs_.size(); ++i) {
610  Expr groupingExpr = groupingExprs_.get(i);
611  Type slotType = slots.get(slotIdx).getType();
612  Preconditions.checkState(groupingExpr.getType().equals(slotType),
613  String.format("Grouping expr %s returns type %s but its output tuple " +
614  "slot has type %s", groupingExpr.toSql(),
615  groupingExpr.getType().toString(), slotType.toString()));
616  ++slotIdx;
617  }
618  // Check that aggregate expr return types match the slot descriptors.
619  for (int i = 0; i < aggregateExprs_.size(); ++i) {
620  Expr aggExpr = aggregateExprs_.get(i);
621  Type slotType = slots.get(slotIdx).getType();
622  Preconditions.checkState(aggExpr.getType().equals(slotType),
623  String.format("Agg expr %s returns type %s but its output tuple " +
624  "slot has type %s", aggExpr.toSql(), aggExpr.getType().toString(),
625  slotType.toString()));
626  ++slotIdx;
627  }
628  }
629 
637  if (groupingExprs_.isEmpty()) {
639  } else {
640  return new DataPartition(TPartitionType.HASH_PARTITIONED, groupingExprs_);
641  }
642  }
643 
644  @Override
645  public String debugString() {
646  StringBuilder out = new StringBuilder(super.debugString());
647  out.append(Objects.toStringHelper(this)
648  .add("phase", aggPhase_)
649  .add("intermediate_smap", intermediateTupleSmap_.debugString())
650  .add("output_smap", outputTupleSmap_.debugString())
651  .toString());
652  if (mergeAggInfo_ != this) {
653  out.append("\nmergeAggInfo:\n" + mergeAggInfo_.debugString());
654  }
655  if (secondPhaseDistinctAggInfo_ != null) {
656  out.append("\nsecondPhaseDistinctAggInfo:\n"
657  + secondPhaseDistinctAggInfo_.debugString());
658  }
659  return out.toString();
660  }
661 
662  @Override
663  protected String tupleDebugName() { return "agg-tuple"; }
664 }
void createSecondPhaseAggInfo(ArrayList< Expr > origGroupingExprs, ArrayList< FunctionCallExpr > distinctAggExprs, Analyzer analyzer)
static AggregateInfo create(ArrayList< Expr > groupingExprs, ArrayList< FunctionCallExpr > aggExprs, TupleDescriptor tupleDesc, Analyzer analyzer)
static final DataPartition UNPARTITIONED
AggregateInfo(ArrayList< Expr > groupingExprs, ArrayList< FunctionCallExpr > aggExprs, AggPhase aggPhase)
ArrayList< FunctionCallExpr > getAggregateExprs()
ArrayList< FunctionCallExpr > getMaterializedAggregateExprs()
void materializeRequiredSlots(Analyzer analyzer, ExprSubstitutionMap smap)
void createDistinctAggInfo(ArrayList< Expr > origGroupingExprs, ArrayList< FunctionCallExpr > distinctAggExprs, Analyzer analyzer)
final ExprSubstitutionMap outputToIntermediateTupleSmap_
static <C extends Expr> boolean equalLists(List<C> l1, List<C> l2)
Definition: Expr.java:584
void createSecondPhaseAggSMap(AggregateInfo inputAggInfo, ArrayList< FunctionCallExpr > distinctAggExprs)
void substitute(ExprSubstitutionMap smap, Analyzer analyzer)
Expr createCountDistinctAggExprParam(int firstIdx, int lastIdx, ArrayList< SlotDescriptor > slots)