15 package com.cloudera.impala.planner;
17 import java.util.Collections;
18 import java.util.Comparator;
19 import java.util.List;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
44 import com.cloudera.impala.thrift.TPartitionType;
45 import com.google.common.base.Preconditions;
46 import com.google.common.collect.Lists;
69 private final static Logger
LOG = LoggerFactory.getLogger(AnalyticPlanner.class);
86 Preconditions.checkState(!stmtTupleIds.isEmpty());
87 TupleId lastStmtTupleId = stmtTupleIds.get(stmtTupleIds.size() - 1);
88 Preconditions.checkState(stmtTupleIds.size() == 1 ||
110 List<Expr> groupingExprs, List<Expr> inputPartitionExprs)
throws ImpalaException {
112 for (
int i = 0; i < windowGroups.size(); ++i) {
113 windowGroups.get(i).init(
analyzer_,
"wg-" + i);
123 if (groupingExprs != null) {
124 Preconditions.checkNotNull(inputPartitionExprs);
126 partitionGroups, groupingExprs, root.getNumNodes(), inputPartitionExprs);
130 for (
int i = 0; i < partitionGroup.sortGroups.size(); ++i) {
132 i == 0 ? partitionGroup.partitionByExprs : null);
137 analyzer_.createIdentityEquivClasses();
146 boolean hasMerged =
false;
151 if (sg1 != sg2 && sg1.isPrefixOf(sg2)) {
153 sortGroups.remove(sg2);
158 if (hasMerged)
break;
169 List<PartitionGroup> partitionGroups,
int numNodes) {
170 boolean hasMerged =
false;
176 long ndv = Expr.getNumDistinctValues(
177 Expr.intersect(pg1.partitionByExprs, pg2.partitionByExprs));
178 if (ndv == -1 || ndv < 0 || ndv < numNodes) {
183 partitionGroups.remove(pg2);
188 if (hasMerged)
break;
201 List<Expr> groupingExprs,
int numNodes, List<Expr> inputPartitionExprs) {
202 inputPartitionExprs.clear();
206 List<Expr> maxGroupingExprs = null;
208 List<Expr> l1 = Lists.newArrayList();
209 List<Expr> l2 = Lists.newArrayList();
210 Expr.intersect(
analyzer_, pg.partitionByExprs, groupingExprs,
211 analyzer_.getEquivClassSmap(), l1, l2);
213 long ndv = Expr.getNumDistinctValues(l1);
214 if (ndv < 0 || ndv < numNodes || ndv < maxNdv)
continue;
217 maxPg.partitionByExprs = l1;
218 maxGroupingExprs = l2;
222 if (maxNdv > numNodes) {
223 Preconditions.checkNotNull(maxPg);
226 partitionGroups.remove(maxPg);
227 partitionGroups.add(0, maxPg);
228 inputPartitionExprs.addAll(maxGroupingExprs);
242 if (pg.partitionByExprs.isEmpty()) {
243 nonPartitioning = pg;
247 if (nonPartitioning != null) partitionGroups.remove(nonPartitioning);
250 Collections.sort(partitionGroups,
251 new Comparator<PartitionGroup>() {
253 Preconditions.checkState(pg1.totalOutputTupleSize > 0);
254 Preconditions.checkState(pg2.totalOutputTupleSize > 0);
255 int diff = pg1.totalOutputTupleSize - pg2.totalOutputTupleSize;
256 return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
259 if (nonPartitioning != null) partitionGroups.add(nonPartitioning);
262 pg.orderSortGroups();
271 PlanNode input, List<Expr> sortExprs, List<Boolean> isAsc,
272 List<Boolean> nullsFirst) {
275 analyzer_.getDescTbl().createTupleDescriptor(
"sort-tuple");
277 List<Expr> sortSlotExprs = Lists.newArrayList();
278 sortTupleDesc.setIsMaterialized(
true);
282 if (!inputSlotDesc.isMaterialized())
continue;
284 analyzer_.copySlotDescriptor(inputSlotDesc, sortTupleDesc);
286 sortSlotDesc.setIsMaterialized(
true);
287 sortSmap.put(
new SlotRef(inputSlotDesc),
new SlotRef(sortSlotDesc));
288 sortSlotExprs.add(
new SlotRef(inputSlotDesc));
300 if (inputSmap != null) {
301 List<Expr> tupleIsNullPredsToMaterialize = Lists.newArrayList();
302 for (
int i = 0; i < inputSmap.size(); ++i) {
303 Expr rhsExpr = inputSmap.getRhs().
get(i);
306 rhsExpr.collect(TupleIsNullPredicate.class, tupleIsNullPredsToMaterialize);
308 Expr.removeDuplicates(tupleIsNullPredsToMaterialize);
311 for (
Expr tupleIsNullPred: tupleIsNullPredsToMaterialize) {
312 SlotDescriptor sortSlotDesc = analyzer_.addSlotDescriptor(sortTupleDesc);
313 sortSlotDesc.setType(tupleIsNullPred.getType());
314 sortSlotDesc.setIsMaterialized(
true);
315 sortSlotDesc.setSourceExpr(tupleIsNullPred);
316 sortSlotDesc.setLabel(tupleIsNullPred.toSql());
317 sortSlotExprs.add(tupleIsNullPred.clone());
323 LOG.trace(
"sortinfo exprs: " + Expr.debugString(sortInfo.getOrderingExprs()));
324 sortInfo.setMaterializedTupleInfo(sortTupleDesc, sortSlotExprs);
336 List<Expr> partitionByExprs = sortGroup.partitionByExprs;
337 List<OrderByElement> orderByElements = sortGroup.orderByElements;
345 if (!partitionByExprs.isEmpty() || !orderByElements.isEmpty()) {
347 List<Expr> sortExprs = Lists.newArrayList(partitionByExprs);
348 List<Boolean> isAsc =
349 Lists.newArrayList(Collections.nCopies(sortExprs.size(),
new Boolean(
true)));
352 List<Boolean> nullsFirst =
353 Lists.newArrayList(Collections.nCopies(sortExprs.size(),
new Boolean(
true)));
357 sortExprs.add(orderByElement.getExpr());
358 isAsc.add(orderByElement.isAsc());
359 nullsFirst.add(orderByElement.getNullsFirstParam());
369 if (partitionExprs != null) {
372 if (!partitionExprs.isEmpty()) {
374 new DataPartition(TPartitionType.HASH_PARTITIONED, partitionExprs);
376 sortNode.setInputPartition(inputPartition);
381 sortSmap = sortNode.getOutputSmap();
384 sortTupleId = sortNode.tupleIds_.get(0);
386 analyzer_.getDescTbl().copyTupleDescriptor(sortTupleId,
"buffered-tuple");
387 LOG.trace(
"desctbl: " + analyzer_.getDescTbl().debugString());
389 List<SlotDescriptor> inputSlots = analyzer_.getTupleDesc(sortTupleId).getSlots();
390 List<SlotDescriptor> bufferedSlots = bufferedTupleDesc.getSlots();
391 for (
int i = 0; i < inputSlots.size(); ++i) {
393 new SlotRef(inputSlots.get(i)),
new SlotRef(bufferedSlots.get(i)));
398 for (
WindowGroup windowGroup: sortGroup.windowGroups) {
408 Expr partitionByEq = null;
412 sortTupleId, bufferedSmap);
413 LOG.trace(
"partitionByEq: " + partitionByEq.debugString());
415 Expr orderByEq = null;
420 sortTupleId, bufferedSmap);
421 LOG.trace(
"orderByEq: " + orderByEq.debugString());
425 windowGroup.analyticFnCalls, windowGroup.partitionByExprs,
426 windowGroup.orderByElements,
427 windowGroup.window, analyticInfo_.getOutputTupleDesc(),
430 partitionByEq, orderByEq, bufferedTupleDesc);
441 Preconditions.checkState(!exprs.isEmpty());
458 if (i > elements.size() - 1)
return new BoolLiteral(
true);
461 Expr lhs = elements.get(i);
462 Preconditions.checkState(lhs.isBound(inputTid));
506 window = analyticExpr.getWindow();
507 analyticExprs.add(analyticExpr);
508 analyticFnCalls.add(analyticExpr.getFnCall());
509 logicalOutputSlots.add(logicalOutputSlot);
510 logicalIntermediateSlots.add(logicalIntermediateSlot);
517 return analyticExpr.getFnCall().getFnName().getFunction().equals(
535 if ((
window == null) != (analyticExpr.getWindow() == null))
return false;
536 if (
window == null)
return true;
537 return analyticExpr.getWindow().equals(
window);
547 analyticExprs.add(analyticExpr);
548 analyticFnCalls.add(analyticExpr.getFnCall());
549 logicalOutputSlots.add(logicalOutputSlot);
550 logicalIntermediateSlots.add(logicalIntermediateSlot);
561 Preconditions.checkState(analyticFnCalls.size() ==
analyticExprs.size());
566 boolean requiresIntermediateTuple =
568 if (requiresIntermediateTuple) {
569 physicalIntermediateTuple =
570 analyzer.getDescTbl().createTupleDescriptor(tupleName +
"intermed");
571 physicalOutputTuple =
572 analyzer.getDescTbl().createTupleDescriptor(tupleName +
"out");
574 physicalOutputTuple =
575 analyzer.getDescTbl().createTupleDescriptor(tupleName +
"out");
581 for (
int i = 0; i < analyticExprs.size(); ++i) {
585 physicalOutputSlot.setIsMaterialized(
true);
586 if (requiresIntermediateTuple) {
587 SlotDescriptor logicalIntermediateSlot = logicalIntermediateSlots.get(i);
588 SlotDescriptor physicalIntermediateSlot = analyzer.copySlotDescriptor(
590 physicalIntermediateSlot.setIsMaterialized(
true);
592 logicalToPhysicalSmap.put(
595 physicalOutputTuple.computeMemLayout();
596 if (requiresIntermediateTuple) physicalIntermediateTuple.computeMemLayout();
604 List<Expr> analyticExprs = analyticInfo_.getAnalyticExprs();
605 List<WindowGroup> groups = Lists.newArrayList();
606 for (
int i = 0; i < analyticExprs.size(); ++i) {
612 boolean match =
false;
614 if (group.isCompatible(analyticExpr)) {
617 analyticInfo_.getIntermediateTupleDesc().getSlots().get(i));
626 analyticInfo_.getIntermediateTupleDesc().getSlots().get(i)));
647 windowGroups.add(windowGroup);
660 windowGroups.add(windowGroup);
668 if (other.
orderByElements.size() > orderByElements.size())
return false;
670 for (
int i = 0; i < other.orderByElements.size(); ++i) {
675 if (!ob.
getExpr().equals(otherOb.getExpr()))
return false;
676 if (ob.
isAsc() != otherOb.isAsc())
return false;
677 if (ob.
nullsFirst() != otherOb.nullsFirst())
return false;
687 windowGroups.addAll(other.windowGroups);
697 Preconditions.checkState(outputTuple.isMaterialized());
698 Preconditions.checkState(outputTuple.getByteSize() != -1);
703 private static class SizeLt implements Comparator<WindowGroup> {
705 Preconditions.checkState(wg1.physicalOutputTuple != null
706 && wg1.physicalOutputTuple.getByteSize() != -1);
707 Preconditions.checkState(wg2.physicalOutputTuple != null
708 && wg2.physicalOutputTuple.getByteSize() != -1);
709 int diff = wg1.physicalOutputTuple.getByteSize()
711 return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
733 List<SortGroup> sortGroups = Lists.newArrayList();
735 boolean match =
false;
737 if (sortGroup.isCompatible(windowGroup)) {
738 sortGroup.add(windowGroup);
743 if (!match) sortGroups.add(
new SortGroup(windowGroup));
760 sortGroups.add(sortGroup);
774 sortGroups.add(sortGroup);
786 sortGroups.addAll(other.sortGroups);
795 new Comparator<SortGroup>() {
797 Preconditions.checkState(sg1.totalOutputTupleSize > 0);
798 Preconditions.checkState(sg2.totalOutputTupleSize > 0);
799 int diff = sg1.totalOutputTupleSize - sg2.totalOutputTupleSize;
800 return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
804 sortGroup.orderWindowGroups();
813 List<PartitionGroup> partitionGroups = Lists.newArrayList();
815 boolean match =
false;
817 if (partitionGroup.isCompatible(sortGroup)) {
818 partitionGroup.add(sortGroup);
825 return partitionGroups;
boolean isCompatible(SortGroup sortGroup)
List< OrderByElement > orderByElements
List< Expr > partitionByExprs
static final SizeLt SIZE_LT
void mergeSortGroups(List< SortGroup > sortGroups)
ArrayList< Expr > getAnalyticExprs()
final List< AnalyticExpr > analyticExprs
final List< SlotDescriptor > logicalOutputSlots
final List< TupleId > stmtTupleIds_
PartitionGroup(SortGroup sortGroup)
static final DataPartition UNPARTITIONED
Expr createNullMatchingEquals(List< Expr > exprs, TupleId inputTid, ExprSubstitutionMap bufferedSmap)
final AnalyticWindow window
PlanNode createSingleNodePlan(PlanNode root, List< Expr > groupingExprs, List< Expr > inputPartitionExprs)
void mergePartitionGroups(List< PartitionGroup > partitionGroups, int numNodes)
final List< Expr > partitionByExprs
TupleId getOutputTupleId()
List< Expr > getPartitionExprs()
void add(AnalyticExpr analyticExpr, SlotDescriptor logicalOutputSlot, SlotDescriptor logicalIntermediateSlot)
List< PartitionGroup > collectPartitionGroups(List< SortGroup > sortGroups)
boolean isCompatible(AnalyticExpr analyticExpr)
static List< Expr > getOrderByExprs(List< OrderByElement > src)
final PlannerContext ctx_
static ArrayList< Expr > substituteList(Iterable<? extends Expr > exprs, ExprSubstitutionMap smap, Analyzer analyzer, boolean preserveRootTypes)
SortInfo createSortInfo(PlanNode input, List< Expr > sortExprs, List< Boolean > isAsc, List< Boolean > nullsFirst)
static String FIRST_VALUE_REWRITE
AnalyticPlanner(List< TupleId > stmtTupleIds, AnalyticInfo analyticInfo, Analyzer analyzer, PlannerContext ctx)
List< OrderByElement > getOrderByElements()
Expr createNullMatchingEqualsAux(List< Expr > elements, int i, TupleId inputTid, ExprSubstitutionMap bufferedSmap)
ArrayList< TupleId > getTupleIds()
List< SortGroup > sortGroups
void absorb(SortGroup other)
TupleDescriptor physicalOutputTuple
static List< OrderByElement > substitute(List< OrderByElement > src, ExprSubstitutionMap smap, Analyzer analyzer)
final List< SlotDescriptor > logicalIntermediateSlots
ArrayList< SlotDescriptor > getSlots()
void orderGroups(List< PartitionGroup > partitionGroups)
final AnalyticInfo analyticInfo_
void add(WindowGroup windowGroup)
boolean isPrefixOf(SortGroup other)
void init(Analyzer analyzer, String tupleName)
boolean isCompatible(WindowGroup windowGroup)
List< SortGroup > collectSortGroups(List< WindowGroup > windowGroups)
List< WindowGroup > collectWindowGroups()
void merge(PartitionGroup other)
WindowGroup(AnalyticExpr analyticExpr, SlotDescriptor logicalOutputSlot, SlotDescriptor logicalIntermediateSlot)
int compare(WindowGroup wg1, WindowGroup wg2)
List< Expr > partitionByExprs
final List< OrderByElement > orderByElements
PlanNodeId getNextNodeId()
static< C extends Expr > boolean equalSets(List< C > l1, List< C > l2)
List< WindowGroup > windowGroups
final List< Expr > analyticFnCalls
final ExprSubstitutionMap logicalToPhysicalSmap
void computeInputPartitionExprs(List< PartitionGroup > partitionGroups, List< Expr > groupingExprs, int numNodes, List< Expr > inputPartitionExprs)
TupleDescriptor physicalIntermediateTuple
void setIsAnalyticSort(boolean v)
boolean isBoundByTupleIds(List< TupleId > tids)
static boolean requiresIndependentEval(AnalyticExpr analyticExpr)
TupleDescriptor getOutputTupleDesc()
void add(SortGroup sortGroup)
SortGroup(WindowGroup windowGroup)
PlanNode createSortGroupPlan(PlanNode root, SortGroup sortGroup, List< Expr > partitionExprs)