15 package com.cloudera.impala.analysis;
17 import java.util.ArrayList;
18 import java.util.List;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
27 import com.cloudera.impala.thrift.TPartitionType;
28 import com.google.common.base.Objects;
29 import com.google.common.base.Preconditions;
30 import com.google.common.collect.Lists;
// Static SLF4J logger for this class, obtained from LoggerFactory for AggregateInfo.class.
67 private final static Logger
LOG = LoggerFactory.getLogger(AggregateInfo.class);
// Returns true exactly when this value is FIRST_MERGE or SECOND_MERGE, false otherwise.
// NOTE(review): the enclosing type is not visible here; presumably the AggPhase enum -- confirm.
75 public boolean isMerge() {
return this == FIRST_MERGE ||
this == SECOND_MERGE; }
105 ArrayList<FunctionCallExpr> aggExprs,
AggPhase aggPhase) {
106 super(groupingExprs, aggExprs);
123 ArrayList<Expr> groupingExprs, ArrayList<FunctionCallExpr> aggExprs,
126 Preconditions.checkState(
127 (groupingExprs != null && !groupingExprs.isEmpty())
128 || (aggExprs != null && !aggExprs.isEmpty()));
129 Expr.removeDuplicates(groupingExprs);
130 Expr.removeDuplicates(aggExprs);
134 ArrayList<FunctionCallExpr> distinctAggExprs = Lists.newArrayList();
135 if (aggExprs != null) {
137 if (aggExpr.isDistinct()) distinctAggExprs.add(aggExpr);
141 if (distinctAggExprs.isEmpty()) {
142 if (tupleDesc == null) {
143 result.createTupleDescs(analyzer);
144 result.createSmaps(analyzer);
147 Preconditions.checkState(aggExprs == null);
148 result.outputTupleDesc_ = tupleDesc;
149 result.intermediateTupleDesc_ = tupleDesc;
151 result.createMergeAggInfo(analyzer);
155 Preconditions.checkState(tupleDesc == null);
156 result.createDistinctAggInfo(groupingExprs, distinctAggExprs, analyzer);
158 LOG.debug(
"agg info:\n" + result.debugString());
189 ArrayList<Expr> origGroupingExprs,
190 ArrayList<FunctionCallExpr> distinctAggExprs,
Analyzer analyzer)
192 Preconditions.checkState(!distinctAggExprs.isEmpty());
196 ArrayList<Expr> expr0Children = Lists.newArrayList();
197 for (
Expr expr: distinctAggExprs.get(0).getChildren()) {
198 expr0Children.add(expr.ignoreImplicitCast());
200 for (
int i = 1; i < distinctAggExprs.size(); ++i) {
201 ArrayList<Expr> exprIChildren = Lists.newArrayList();
202 for (
Expr expr: distinctAggExprs.get(i).getChildren()) {
203 exprIChildren.add(expr.ignoreImplicitCast());
207 "all DISTINCT aggregate functions need to have the same set of "
208 +
"parameters as " + distinctAggExprs.get(0).toSql()
209 +
"; deviating function: " + distinctAggExprs.get(i).toSql());
214 groupingExprs_.addAll(expr0Children);
217 aggregateExprs_.removeAll(distinctAggExprs);
// Returns whatever aggPhase_.isMerge() reports; this method adds no logic of its own.
230 public boolean isMerge() {
return aggPhase_.isMerge(); }
239 return !aggregateExprs_.isEmpty() ||
248 if (
isDistinctAgg())
return secondPhaseDistinctAggInfo_.getOutputTupleId();
253 ArrayList<FunctionCallExpr> result = Lists.newArrayList();
255 result.add(aggregateExprs_.get(i));
271 for (
int i = 0; i < outputTupleDesc_.getSlots().size(); ++i) {
272 ids.add(outputTupleDesc_.getSlots().
get(i).getId());
298 LOG.trace(
"AggInfo: grouping_exprs=" + Expr.debugString(
groupingExprs_));
302 List<Expr> substitutedAggs =
304 aggregateExprs_.clear();
305 for (
Expr substitutedAgg: substitutedAggs) {
310 outputTupleSmap_.substituteLhs(smap, analyzer);
311 intermediateTupleSmap_.substituteLhs(smap, analyzer);
313 secondPhaseDistinctAggInfo_.substitute(smap, analyzer);
332 ArrayList<Expr> groupingExprs = Lists.newArrayList();
335 groupingExprs.add(slotRef);
339 ArrayList<FunctionCallExpr> aggExprs = Lists.newArrayList();
342 Preconditions.checkState(inputExpr.isAggregateFunction());
346 inputExpr, Lists.newArrayList(aggExprParam));
347 aggExpr.analyzeNoThrow(analyzer);
348 aggExprs.add(aggExpr);
353 mergeAggInfo_ =
new AggregateInfo(groupingExprs, aggExprs, aggPhase);
371 ArrayList<SlotDescriptor> slots) {
372 if (firstIdx > lastIdx)
return null;
375 if (firstIdx == lastIdx)
return elseExpr;
377 for (
int i = lastIdx - 1; i >= firstIdx; --i) {
378 ArrayList<Expr> ifArgs = Lists.newArrayList();
382 ifArgs.add(isNullPred);
384 ifArgs.add(elseExpr);
405 ArrayList<Expr> origGroupingExprs,
406 ArrayList<FunctionCallExpr> distinctAggExprs,
Analyzer analyzer)
409 Preconditions.checkState(!distinctAggExprs.isEmpty());
415 ArrayList<FunctionCallExpr> secondPhaseAggExprs = Lists.newArrayList();
417 Preconditions.checkState(inputExpr.isAggregateFunction());
419 if (inputExpr.getFnName().getFunction().equals(
"count")) {
426 origGroupingExprs.size() + inputExpr.getChildren().size() - 1,
428 Preconditions.checkNotNull(ifExpr);
429 ifExpr.analyzeNoThrow(analyzer);
438 Lists.newArrayList(aggExprParam));
440 secondPhaseAggExprs.add(aggExpr);
444 for (
int i = 0; i < aggregateExprs_.size(); ++i) {
446 Preconditions.checkState(inputExpr.isAggregateFunction());
451 inputExpr, Lists.newArrayList(aggExprParam));
452 secondPhaseAggExprs.add(aggExpr);
454 Preconditions.checkState(
455 secondPhaseAggExprs.size() ==
aggregateExprs_.size() + distinctAggExprs.size());
458 aggExpr.analyzeNoThrow(analyzer);
459 Preconditions.checkState(aggExpr.isAggregateFunction());
462 ArrayList<Expr> substGroupingExprs =
464 secondPhaseDistinctAggInfo_ =
466 secondPhaseDistinctAggInfo_.createTupleDescs(analyzer);
467 secondPhaseDistinctAggInfo_.createSecondPhaseAggSMap(
this, distinctAggExprs);
468 secondPhaseDistinctAggInfo_.createMergeAggInfo(analyzer);
476 AggregateInfo inputAggInfo, ArrayList<FunctionCallExpr> distinctAggExprs) {
477 outputTupleSmap_.clear();
479 ArrayList<SlotDescriptor> slotDescs = outputTupleDesc_.getSlots();
481 int numDistinctParams = distinctAggExprs.get(0).getChildren().size();
482 int numOrigGroupingExprs =
483 inputAggInfo.getGroupingExprs().size() - numDistinctParams;
484 Preconditions.checkState(slotDescs.size() ==
485 numOrigGroupingExprs + distinctAggExprs.size() +
486 inputAggInfo.getAggregateExprs().size());
489 for (
int i = 0; i < numOrigGroupingExprs; ++i, ++slotIdx) {
490 Expr groupingExpr = inputAggInfo.getGroupingExprs().
get(i);
491 outputTupleSmap_.put(
492 groupingExpr.clone(),
new SlotRef(slotDescs.get(slotIdx)));
496 for (
int i = 0; i < distinctAggExprs.size(); ++i, ++slotIdx) {
497 Expr aggExpr = distinctAggExprs.get(i);
498 outputTupleSmap_.put(
499 aggExpr.clone(), (
new SlotRef(slotDescs.get(slotIdx))));
503 for (
int i = 0; i < inputAggInfo.getAggregateExprs().size(); ++i, ++slotIdx) {
504 Expr aggExpr = inputAggInfo.getAggregateExprs().
get(i);
505 outputTupleSmap_.put(aggExpr.clone(),
new SlotRef(slotDescs.get(slotIdx)));
519 List<Expr> exprs = Lists.newArrayListWithCapacity(
523 for (
int i = 0; i < exprs.size(); ++i) {
524 outputTupleSmap_.put(exprs.get(i).clone(),
527 intermediateTupleSmap_.put(exprs.get(i).clone(),
529 outputToIntermediateTupleSmap_.put(
533 analyzer.createAuxEquivPredicate(
540 LOG.trace(
"output smap=" + outputTupleSmap_.debugString());
541 LOG.trace(
"intermediate smap=" + intermediateTupleSmap_.debugString());
557 for (
int i = 0; i < groupingExprs_.size(); ++i) {
558 outputTupleDesc_.getSlots().
get(i).setIsMaterialized(
true);
559 intermediateTupleDesc_.getSlots().
get(i).setIsMaterialized(
true);
564 materializedSlots_.clear();
565 List<Expr> exprs = Lists.newArrayList();
567 for (
int i = 0; i < aggregateExprs_.size(); ++i) {
573 slotDesc.setIsMaterialized(
true);
574 intermediateSlotDesc.setIsMaterialized(
true);
577 intermediateSlotDesc.setIsMaterialized(
true);
578 exprs.add(aggregateExprs_.get(i));
579 materializedSlots_.add(i);
581 List<Expr> resolvedExprs = Expr.substituteList(exprs, smap, analyzer,
false);
582 analyzer.materializeSlots(resolvedExprs);
585 secondPhaseDistinctAggInfo_.materializeRequiredSlots(analyzer, null);
597 ArrayList<SlotDescriptor> slots = outputTupleDesc_.getSlots();
600 int numMaterializedSlots = 0;
602 if (slotDesc.isMaterialized()) ++numMaterializedSlots;
604 Preconditions.checkState(numMaterializedSlots ==
609 for (
int i = 0; i < groupingExprs_.size(); ++i) {
610 Expr groupingExpr = groupingExprs_.get(i);
611 Type slotType = slots.get(slotIdx).getType();
612 Preconditions.checkState(groupingExpr.getType().equals(slotType),
613 String.format(
"Grouping expr %s returns type %s but its output tuple " +
614 "slot has type %s", groupingExpr.toSql(),
615 groupingExpr.
getType().toString(), slotType.toString()));
619 for (
int i = 0; i < aggregateExprs_.size(); ++i) {
620 Expr aggExpr = aggregateExprs_.get(i);
621 Type slotType = slots.get(slotIdx).getType();
622 Preconditions.checkState(aggExpr.getType().equals(slotType),
623 String.format(
"Agg expr %s returns type %s but its output tuple " +
624 "slot has type %s", aggExpr.toSql(), aggExpr.
getType().toString(),
625 slotType.toString()));
646 StringBuilder out =
new StringBuilder(super.debugString());
647 out.append(Objects.toStringHelper(
this)
649 .add(
"intermediate_smap", intermediateTupleSmap_.debugString())
650 .add(
"output_smap", outputTupleSmap_.debugString())
653 out.append(
"\nmergeAggInfo:\n" + mergeAggInfo_.debugString());
656 out.append(
"\nsecondPhaseDistinctAggInfo:\n"
657 + secondPhaseDistinctAggInfo_.debugString());
659 return out.toString();
boolean hasAggregateExprs()
ArrayList< Expr > groupingExprs_
void createSecondPhaseAggInfo(ArrayList< Expr > origGroupingExprs, ArrayList< FunctionCallExpr > distinctAggExprs, Analyzer analyzer)
TupleDescriptor intermediateTupleDesc_
AggregateInfo getSecondPhaseDistinctAggInfo()
static AggregateInfo create(ArrayList< Expr > groupingExprs, ArrayList< FunctionCallExpr > aggExprs, TupleDescriptor tupleDesc, Analyzer analyzer)
static final DataPartition UNPARTITIONED
void createMergeAggInfo(Analyzer analyzer)
ArrayList< Expr > getGroupingExprs()
TupleDescriptor outputTupleDesc_
ExprSubstitutionMap intermediateTupleSmap_
AggregateInfo(ArrayList< Expr > groupingExprs, ArrayList< FunctionCallExpr > aggExprs, AggPhase aggPhase)
boolean requiresIntermediateTuple()
TupleId getOutputTupleId()
ArrayList< FunctionCallExpr > aggregateExprs_
ExprSubstitutionMap getOutputSmap()
ArrayList< FunctionCallExpr > getAggregateExprs()
void createTupleDescs(Analyzer analyzer)
AggregateInfo secondPhaseDistinctAggInfo_
AggregateInfo getMergeAggInfo()
ExprSubstitutionMap getOutputToIntermediateSmap()
ArrayList< FunctionCallExpr > getMaterializedAggregateExprs()
ArrayList< SlotDescriptor > getSlots()
void materializeRequiredSlots(Analyzer analyzer, ExprSubstitutionMap smap)
void createDistinctAggInfo(ArrayList< Expr > origGroupingExprs, ArrayList< FunctionCallExpr > distinctAggExprs, Analyzer analyzer)
ExprSubstitutionMap outputTupleSmap_
final ExprSubstitutionMap outputToIntermediateTupleSmap_
AggregateInfo mergeAggInfo_
DataPartition getPartition()
List< Expr > getPartitionExprs()
static< C extends Expr > boolean equalLists(List< C > l1, List< C > l2)
void getRefdSlots(List< SlotId > ids)
void createSecondPhaseAggSMap(AggregateInfo inputAggInfo, ArrayList< FunctionCallExpr > distinctAggExprs)
void createSmaps(Analyzer analyzer)
void setPartitionExprs(List< Expr > exprs)
ExprSubstitutionMap getIntermediateSmap()
List< Expr > partitionExprs_
TupleId getResultTupleId()
void substitute(ExprSubstitutionMap smap, Analyzer analyzer)
Expr createCountDistinctAggExprParam(int firstIdx, int lastIdx, ArrayList< SlotDescriptor > slots)
ArrayList< Integer > materializedSlots_