Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
AnalyticPlanner.java
Go to the documentation of this file.
1 // Copyright 2014 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.planner;
16 
17 import java.util.Collections;
18 import java.util.Comparator;
19 import java.util.List;
20 
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23 
44 import com.cloudera.impala.thrift.TPartitionType;
45 import com.google.common.base.Preconditions;
46 import com.google.common.collect.Lists;
47 
48 
/**
 * Plans the evaluation of the analytic exprs of an AnalyticInfo on top of an existing
 * plan. Analytic exprs are grouped into window groups (exprs sharing partition-by exprs,
 * order-by elements and window), window groups into sort groups (same partition-by
 * exprs and order-by elements; a sort group whose ordering is a prefix of another's is
 * absorbed into it), and sort groups into partition groups (same set of partition-by
 * exprs, possibly merged when their common partition-by exprs have enough distinct
 * values). Each sort group becomes a SortNode (if it has partition-by or order-by exprs)
 * followed by one AnalyticEvalNode per window group.
 */
68 public class AnalyticPlanner {
  // Used for trace-level logging of intermediate planning state.
69  private final static Logger LOG = LoggerFactory.getLogger(AnalyticPlanner.class);
70 
71  // List of tuple ids materialized by the originating SelectStmt (i.e., what is returned
72  // by SelectStmt.getMaterializedTupleIds()). During analysis, conjuncts have been
73  // registered against these tuples, so we need them to find unassigned conjuncts during
74  // plan generation.
75  // If the size of this list is 1 it means the stmt has a sort after the analytics.
76  // Otherwise, the last tuple id must be analyticInfo_.getOutputTupleId().
77  private final List<TupleId> stmtTupleIds_;
78 
  // The analytic exprs to plan, together with their logical intermediate and output
  // tuple descriptors.
79  private final AnalyticInfo analyticInfo_;
  // Creates tuple/slot descriptors and analyzes the generated predicates.
80  private final Analyzer analyzer_;
  // Hands out ids for newly created plan nodes (getNextNodeId()).
81  private final PlannerContext ctx_;
82 
83  public AnalyticPlanner(List<TupleId> stmtTupleIds,
84  AnalyticInfo analyticInfo, Analyzer analyzer,
85  PlannerContext ctx) {
86  Preconditions.checkState(!stmtTupleIds.isEmpty());
87  TupleId lastStmtTupleId = stmtTupleIds.get(stmtTupleIds.size() - 1);
88  Preconditions.checkState(stmtTupleIds.size() == 1 ||
89  lastStmtTupleId.equals(analyticInfo.getOutputTupleId()));
90  stmtTupleIds_ = stmtTupleIds;
91  analyticInfo_ = analyticInfo;
92  analyzer_ = analyzer;
93  ctx_ = ctx;
94  }
95 
110  List<Expr> groupingExprs, List<Expr> inputPartitionExprs) throws ImpalaException {
  // Adds the evaluation of all materialized analytic exprs on top of 'root' and
  // returns the new plan root. (The method header is not part of this listing.)
  // Steps: build window groups and their physical tuples; group them into sort groups
  // and merge prefix-compatible sort groups; group sort groups into partition groups,
  // merge those with enough common distinct values for root.getNumNodes(), and order
  // them; if 'groupingExprs' is given, choose an input partitioning compatible with it
  // (returned via 'inputPartitionExprs'); finally emit the sort/analytic-eval nodes.
111  List<WindowGroup> windowGroups = collectWindowGroups();
112  for (int i = 0; i < windowGroups.size(); ++i) {
  // Physical tuples are named "wg-<i>" plus the suffix added by WindowGroup.init().
113  windowGroups.get(i).init(analyzer_, "wg-" + i);
114  }
115  List<SortGroup> sortGroups = collectSortGroups(windowGroups);
116  mergeSortGroups(sortGroups);
  // Compute each sort group's total output tuple size, used for ordering below.
117  for (SortGroup g: sortGroups) {
118  g.init();
119  }
120  List<PartitionGroup> partitionGroups = collectPartitionGroups(sortGroups);
121  mergePartitionGroups(partitionGroups, root.getNumNodes());
122  orderGroups(partitionGroups);
123  if (groupingExprs != null) {
124  Preconditions.checkNotNull(inputPartitionExprs);
  // NOTE(review): the line that opens this call is missing from this listing; the
  // arguments below match computeInputPartitionExprs() — confirm against the source.
126  partitionGroups, groupingExprs, root.getNumNodes(), inputPartitionExprs);
127  }
128 
129  for (PartitionGroup partitionGroup: partitionGroups) {
  // Only the first sort group of each partition group gets the partition group's
  // partition-by exprs (used for its sort's input partition); later ones get null.
130  for (int i = 0; i < partitionGroup.sortGroups.size(); ++i) {
131  root = createSortGroupPlan(root, partitionGroup.sortGroups.get(i),
132  i == 0 ? partitionGroup.partitionByExprs : null);
133  }
134  }
135 
136  // create equiv classes for newly added slots
137  analyzer_.createIdentityEquivClasses();
138  return root;
139  }
140 
145  private void mergeSortGroups(List<SortGroup> sortGroups) {
146  boolean hasMerged = false;
147  do {
148  hasMerged = false;
149  for (SortGroup sg1: sortGroups) {
150  for (SortGroup sg2: sortGroups) {
151  if (sg1 != sg2 && sg1.isPrefixOf(sg2)) {
152  sg1.absorb(sg2);
153  sortGroups.remove(sg2);
154  hasMerged = true;
155  break;
156  }
157  }
158  if (hasMerged) break;
159  }
160  } while (hasMerged);
161  }
162 
168  private void mergePartitionGroups(
169  List<PartitionGroup> partitionGroups, int numNodes) {
170  boolean hasMerged = false;
171  do {
172  hasMerged = false;
173  for (PartitionGroup pg1: partitionGroups) {
174  for (PartitionGroup pg2: partitionGroups) {
175  if (pg1 != pg2) {
176  long ndv = Expr.getNumDistinctValues(
177  Expr.intersect(pg1.partitionByExprs, pg2.partitionByExprs));
178  if (ndv == -1 || ndv < 0 || ndv < numNodes) {
179  // didn't get a usable value or the number of partitions is too small
180  continue;
181  }
182  pg1.merge(pg2);
183  partitionGroups.remove(pg2);
184  hasMerged = true;
185  break;
186  }
187  }
188  if (hasMerged) break;
189  }
190  } while (hasMerged);
191  }
192 
200  private void computeInputPartitionExprs(List<PartitionGroup> partitionGroups,
201  List<Expr> groupingExprs, int numNodes, List<Expr> inputPartitionExprs) {
202  inputPartitionExprs.clear();
203  // find partition group with maximum intersection
204  long maxNdv = 0;
205  PartitionGroup maxPg = null;
206  List<Expr> maxGroupingExprs = null;
207  for (PartitionGroup pg: partitionGroups) {
208  List<Expr> l1 = Lists.newArrayList();
209  List<Expr> l2 = Lists.newArrayList();
210  Expr.intersect(analyzer_, pg.partitionByExprs, groupingExprs,
211  analyzer_.getEquivClassSmap(), l1, l2);
212  // TODO: also look at l2 and take the max?
213  long ndv = Expr.getNumDistinctValues(l1);
214  if (ndv < 0 || ndv < numNodes || ndv < maxNdv) continue;
215  // found a better partition group
216  maxPg = pg;
217  maxPg.partitionByExprs = l1;
218  maxGroupingExprs = l2;
219  maxNdv = ndv;
220  }
221 
222  if (maxNdv > numNodes) {
223  Preconditions.checkNotNull(maxPg);
224  // we found a partition group that gives us enough parallelism;
225  // move it to the front
226  partitionGroups.remove(maxPg);
227  partitionGroups.add(0, maxPg);
228  inputPartitionExprs.addAll(maxGroupingExprs);
229  }
230  }
231 
238  private void orderGroups(List<PartitionGroup> partitionGroups) {
239  // remove the non-partitioning group from partitionGroups
240  PartitionGroup nonPartitioning = null;
241  for (PartitionGroup pg: partitionGroups) {
242  if (pg.partitionByExprs.isEmpty()) {
243  nonPartitioning = pg;
244  break;
245  }
246  }
247  if (nonPartitioning != null) partitionGroups.remove(nonPartitioning);
248 
249  // order by ascending combined output tuple size
250  Collections.sort(partitionGroups,
251  new Comparator<PartitionGroup>() {
252  public int compare(PartitionGroup pg1, PartitionGroup pg2) {
253  Preconditions.checkState(pg1.totalOutputTupleSize > 0);
254  Preconditions.checkState(pg2.totalOutputTupleSize > 0);
255  int diff = pg1.totalOutputTupleSize - pg2.totalOutputTupleSize;
256  return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
257  }
258  });
259  if (nonPartitioning != null) partitionGroups.add(nonPartitioning);
260 
261  for (PartitionGroup pg: partitionGroups) {
262  pg.orderSortGroups();
263  }
264  }
265 
271  PlanNode input, List<Expr> sortExprs, List<Boolean> isAsc,
272  List<Boolean> nullsFirst) {
  // Builds a SortInfo that sorts on 'sortExprs' (remapped onto a new materialized
  // "sort-tuple") using the given 'isAsc' / 'nullsFirst' settings. The sort tuple
  // holds a copy of every materialized slot of the input's tuples, plus one slot per
  // distinct TupleIsNullPredicate found in relevant rhs exprs of the input's output
  // smap. (The method header is not part of this listing.)
273  // create tuple for sort output = the entire materialized input in a single tuple
274  TupleDescriptor sortTupleDesc =
275  analyzer_.getDescTbl().createTupleDescriptor("sort-tuple");
  // sortSmap: input slot -> copied sort-tuple slot.
  // sortSlotExprs: in slot order, the exprs that populate the sort tuple.
276  ExprSubstitutionMap sortSmap = new ExprSubstitutionMap();
277  List<Expr> sortSlotExprs = Lists.newArrayList();
278  sortTupleDesc.setIsMaterialized(true);
279  for (TupleId tid: input.getTupleIds()) {
280  TupleDescriptor tupleDesc = analyzer_.getTupleDesc(tid);
281  for (SlotDescriptor inputSlotDesc: tupleDesc.getSlots()) {
282  if (!inputSlotDesc.isMaterialized()) continue;
283  SlotDescriptor sortSlotDesc =
284  analyzer_.copySlotDescriptor(inputSlotDesc, sortTupleDesc);
285  // all output slots need to be materialized
286  sortSlotDesc.setIsMaterialized(true);
287  sortSmap.put(new SlotRef(inputSlotDesc), new SlotRef(sortSlotDesc));
288  sortSlotExprs.add(new SlotRef(inputSlotDesc));
289  }
290  }
291 
292  // Lhs exprs to be substituted in ancestor plan nodes could have a rhs that contains
293  // TupleIsNullPredicates. TupleIsNullPredicates require specific tuple ids for
294  // evaluation. Since this sort materializes a new tuple, it's impossible to evaluate
295  // TupleIsNullPredicates referring to this sort's input after this sort.
296  // To preserve, across this sort node, whether an input tuple was null,
297  // we materialize those rhs TupleIsNullPredicates, which are then substituted
298  // by a SlotRef into the sort's tuple in ancestor nodes (IMPALA-1519).
299  ExprSubstitutionMap inputSmap = input.getOutputSmap();
300  if (inputSmap != null) {
301  List<Expr> tupleIsNullPredsToMaterialize = Lists.newArrayList();
302  for (int i = 0; i < inputSmap.size(); ++i) {
303  Expr rhsExpr = inputSmap.getRhs().get(i);
304  // Ignore substitutions that are irrelevant at this plan node and its ancestors.
305  if (!rhsExpr.isBoundByTupleIds(input.getTupleIds())) continue;
306  rhsExpr.collect(TupleIsNullPredicate.class, tupleIsNullPredsToMaterialize);
307  }
308  Expr.removeDuplicates(tupleIsNullPredsToMaterialize);
309 
310  // Materialize relevant unique TupleIsNullPredicates.
311  for (Expr tupleIsNullPred: tupleIsNullPredsToMaterialize) {
312  SlotDescriptor sortSlotDesc = analyzer_.addSlotDescriptor(sortTupleDesc);
313  sortSlotDesc.setType(tupleIsNullPred.getType());
314  sortSlotDesc.setIsMaterialized(true);
315  sortSlotDesc.setSourceExpr(tupleIsNullPred);
316  sortSlotDesc.setLabel(tupleIsNullPred.toSql());
317  sortSlotExprs.add(tupleIsNullPred.clone());
318  }
319  }
320 
  // The ordering exprs are expressed over the sort tuple via sortSmap.
321  SortInfo sortInfo = new SortInfo(
322  Expr.substituteList(sortExprs, sortSmap, analyzer_, false), isAsc, nullsFirst);
323  LOG.trace("sortinfo exprs: " + Expr.debugString(sortInfo.getOrderingExprs()));
324  sortInfo.setMaterializedTupleInfo(sortTupleDesc, sortSlotExprs);
325  return sortInfo;
326  }
327 
335  List<Expr> partitionExprs) throws ImpalaException {
  // Adds the plan for 'sortGroup' on top of 'root' and returns the new root. If the
  // group has partition-by or order-by exprs, this is a SortNode on the partition-by
  // exprs followed by the order-by exprs. In every case it is then followed by one
  // AnalyticEvalNode per window group. If 'partitionExprs' is non-null, the sort's
  // input partition is set (hash-partitioned on 'partitionExprs' when non-empty).
  // (The method header is not part of this listing.)
336  List<Expr> partitionByExprs = sortGroup.partitionByExprs;
337  List<OrderByElement> orderByElements = sortGroup.orderByElements;
  // These stay null when no sort is needed (no partition-by and no order-by exprs).
338  ExprSubstitutionMap sortSmap = null;
339  TupleId sortTupleId = null;
340  TupleDescriptor bufferedTupleDesc = null;
341  // map from input to buffered tuple
342  ExprSubstitutionMap bufferedSmap = new ExprSubstitutionMap();
343 
344  // sort on partition by (pb) + order by (ob) exprs and create pb/ob predicates
345  if (!partitionByExprs.isEmpty() || !orderByElements.isEmpty()) {
346  // first sort on partitionExprs (direction doesn't matter)
347  List<Expr> sortExprs = Lists.newArrayList(partitionByExprs);
348  List<Boolean> isAsc =
349  Lists.newArrayList(Collections.nCopies(sortExprs.size(), new Boolean(true)));
350  // TODO: utilize a direction and nulls/first last that has benefit
351  // for subsequent sort groups
352  List<Boolean> nullsFirst =
353  Lists.newArrayList(Collections.nCopies(sortExprs.size(), new Boolean(true)));
354 
355  // then sort on orderByExprs
356  for (OrderByElement orderByElement: sortGroup.orderByElements) {
357  sortExprs.add(orderByElement.getExpr());
358  isAsc.add(orderByElement.isAsc());
359  nullsFirst.add(orderByElement.getNullsFirstParam());
360  }
361 
362  SortInfo sortInfo = createSortInfo(root, sortExprs, isAsc, nullsFirst);
363  SortNode sortNode = new SortNode(ctx_.getNextNodeId(), root, sortInfo, false, 0);
364 
365  // if this sort group does not have partitioning exprs, we want the sort
366  // to be executed like a regular distributed sort
367  if (!partitionByExprs.isEmpty()) sortNode.setIsAnalyticSort(true);
368 
369  if (partitionExprs != null) {
370  // create required input partition
  // NOTE(review): the declaration of inputPartition (and its default, used when
  // partitionExprs is empty) is missing from this listing — confirm against the
  // source.
372  if (!partitionExprs.isEmpty()) {
373  inputPartition =
374  new DataPartition(TPartitionType.HASH_PARTITIONED, partitionExprs);
375  }
376  sortNode.setInputPartition(inputPartition);
377  }
378 
379  root = sortNode;
380  root.init(analyzer_);
381  sortSmap = sortNode.getOutputSmap();
382 
383  // create bufferedTupleDesc and bufferedSmap
384  sortTupleId = sortNode.tupleIds_.get(0);
385  bufferedTupleDesc =
386  analyzer_.getDescTbl().copyTupleDescriptor(sortTupleId, "buffered-tuple");
387  LOG.trace("desctbl: " + analyzer_.getDescTbl().debugString());
388 
  // Map each sort-tuple slot to the buffered-tuple slot at the same position.
389  List<SlotDescriptor> inputSlots = analyzer_.getTupleDesc(sortTupleId).getSlots();
390  List<SlotDescriptor> bufferedSlots = bufferedTupleDesc.getSlots();
391  for (int i = 0; i < inputSlots.size(); ++i) {
392  bufferedSmap.put(
393  new SlotRef(inputSlots.get(i)), new SlotRef(bufferedSlots.get(i)));
394  }
395  }
396 
397  // create one AnalyticEvalNode per window group
398  for (WindowGroup windowGroup: sortGroup.windowGroups) {
399  // Create partition-by (pb) and order-by (ob) less-than predicates between the
400  // input tuple (the output of the preceding sort) and a buffered tuple that is
401  // identical to the input tuple. We need a different tuple descriptor for the
402  // buffered tuple because the generated predicates should compare two different
403  // tuple instances from the same input stream (i.e., the predicates should be
404  // evaluated over a row that is composed of the input and the buffered tuple).
405 
406  // we need to remap the pb/ob exprs to a) the sort output, b) our buffer of the
407  // sort input
408  Expr partitionByEq = null;
409  if (!windowGroup.partitionByExprs.isEmpty()) {
410  partitionByEq = createNullMatchingEquals(
411  Expr.substituteList(windowGroup.partitionByExprs, sortSmap, analyzer_, false),
412  sortTupleId, bufferedSmap);
413  LOG.trace("partitionByEq: " + partitionByEq.debugString());
414  }
415  Expr orderByEq = null;
416  if (!windowGroup.orderByElements.isEmpty()) {
417  orderByEq = createNullMatchingEquals(
  // NOTE(review): a line is missing from this listing here; the closing parens below
  // show it opened a nested call over (windowGroup.orderByElements, sortSmap,
  // analyzer_) — presumably converting the substituted order-by elements to exprs.
419  windowGroup.orderByElements, sortSmap, analyzer_)),
420  sortTupleId, bufferedSmap);
421  LOG.trace("orderByEq: " + orderByEq.debugString());
422  }
423 
424  root = new AnalyticEvalNode(ctx_.getNextNodeId(), root, stmtTupleIds_,
425  windowGroup.analyticFnCalls, windowGroup.partitionByExprs,
426  windowGroup.orderByElements,
427  windowGroup.window, analyticInfo_.getOutputTupleDesc(),
428  windowGroup.physicalIntermediateTuple,
429  windowGroup.physicalOutputTuple, windowGroup.logicalToPhysicalSmap,
430  partitionByEq, orderByEq, bufferedTupleDesc);
431  root.init(analyzer_);
432  }
433  return root;
434  }
435 
439  private Expr createNullMatchingEquals(List<Expr> exprs, TupleId inputTid,
440  ExprSubstitutionMap bufferedSmap) {
441  Preconditions.checkState(!exprs.isEmpty());
442  Expr result = createNullMatchingEqualsAux(exprs, 0, inputTid, bufferedSmap);
443  result.analyzeNoThrow(analyzer_);
444  return result;
445  }
446 
456  private Expr createNullMatchingEqualsAux(List<Expr> elements, int i,
457  TupleId inputTid, ExprSubstitutionMap bufferedSmap) {
458  if (i > elements.size() - 1) return new BoolLiteral(true);
459 
460  // compare elements[i]
461  Expr lhs = elements.get(i);
462  Preconditions.checkState(lhs.isBound(inputTid));
463  Expr rhs = lhs.substitute(bufferedSmap, analyzer_, false);
464 
465  Expr bothNull = new CompoundPredicate(Operator.AND,
466  new IsNullPredicate(lhs, false), new IsNullPredicate(rhs, false));
467  Expr lhsEqRhsNotNull = new CompoundPredicate(Operator.AND,
468  new CompoundPredicate(Operator.AND,
469  new IsNullPredicate(lhs, true), new IsNullPredicate(rhs, true)),
470  new BinaryPredicate(BinaryPredicate.Operator.EQ, lhs, rhs));
471  Expr remainder = createNullMatchingEqualsAux(elements, i + 1, inputTid, bufferedSmap);
472  return new CompoundPredicate(CompoundPredicate.Operator.AND,
473  new CompoundPredicate(Operator.OR, bothNull, lhsEqRhsNotNull),
474  remainder);
475  }
476 
/**
 * A set of analytic exprs that share partition-by exprs (compared as sets), order-by
 * elements and window, and are therefore evaluated by a single AnalyticEvalNode.
 */
481 private static class WindowGroup {
482  public final List<Expr> partitionByExprs;
483  public final List<OrderByElement> orderByElements;
484  public final AnalyticWindow window; // may be null; isCompatible() handles that
485 
486  // Analytic exprs belonging to this window group and their corresponding logical
487  // intermediate and output slots from AnalyticInfo.intermediateTupleDesc_
488  // and AnalyticInfo.outputTupleDesc_.
489  public final List<AnalyticExpr> analyticExprs = Lists.newArrayList();
490  // Result of getFnCall() for every analytic expr.
491  public final List<Expr> analyticFnCalls = Lists.newArrayList();
492  public final List<SlotDescriptor> logicalOutputSlots = Lists.newArrayList();
493  public final List<SlotDescriptor> logicalIntermediateSlots = Lists.newArrayList();
494 
495  // Physical output and intermediate tuples as well as an smap that maps the
496  // corresponding logical output slots to their physical slots in physicalOutputTuple.
497  // Set in init().
  // NOTE(review): the declarations of physicalOutputTuple, physicalIntermediateTuple
  // and logicalToPhysicalSmap (all used below) are missing from this listing.
501 
  // Starts a new group containing only 'analyticExpr'; the group's partition-by
  // exprs, order-by elements and window are taken from that expr.
502  public WindowGroup(AnalyticExpr analyticExpr, SlotDescriptor logicalOutputSlot,
503  SlotDescriptor logicalIntermediateSlot) {
504  partitionByExprs = analyticExpr.getPartitionExprs();
505  orderByElements = analyticExpr.getOrderByElements();
506  window = analyticExpr.getWindow();
507  analyticExprs.add(analyticExpr);
508  analyticFnCalls.add(analyticExpr.getFnCall());
509  logicalOutputSlots.add(logicalOutputSlot);
510  logicalIntermediateSlots.add(logicalIntermediateSlot);
511  }
512 
  // True if the function name of 'analyticExpr' equals a specific name.
  // NOTE(review): the compared name is on a line missing from this listing.
516  private static boolean requiresIndependentEval(AnalyticExpr analyticExpr) {
517  return analyticExpr.getFnCall().getFnName().getFunction().equals(
519  }
520 
  // True if 'analyticExpr' can join this group: same partition-by exprs (as sets),
  // equal order-by elements, and equal windows (or both null).
  // NOTE(review): the start of the first condition is missing from this listing;
  // its visible tail involves requiresIndependentEval(analyticExpr).
525  public boolean isCompatible(AnalyticExpr analyticExpr) {
527  requiresIndependentEval(analyticExpr)) {
528  return false;
529  }
530 
531  if (!Expr.equalSets(analyticExpr.getPartitionExprs(), partitionByExprs)) {
532  return false;
533  }
534  if (!analyticExpr.getOrderByElements().equals(orderByElements)) return false;
535  if ((window == null) != (analyticExpr.getWindow() == null)) return false;
536  if (window == null) return true;
537  return analyticExpr.getWindow().equals(window);
538  }
539 
  // Adds a compatible 'analyticExpr' with its logical output/intermediate slots.
  // Throws IllegalStateException if it is not compatible.
544  public void add(AnalyticExpr analyticExpr, SlotDescriptor logicalOutputSlot,
545  SlotDescriptor logicalIntermediateSlot) {
546  Preconditions.checkState(isCompatible(analyticExpr));
547  analyticExprs.add(analyticExpr);
548  analyticFnCalls.add(analyticExpr.getFnCall());
549  logicalOutputSlots.add(logicalOutputSlot);
550  logicalIntermediateSlots.add(logicalIntermediateSlot);
551  }
552 
  // Creates the physical output tuple ("<tupleName>out") and, when
  // AggregateInfoBase.requiresIntermediateTuple(analyticFnCalls) is true, a separate
  // physical intermediate tuple ("<tupleName>intermed"); otherwise the intermediate
  // tuple is the output tuple. Copies every logical slot into the physical tuple(s)
  // as materialized slots, records the logical->physical output slot mapping, and
  // computes the mem layouts. Must be called at most once.
558  public void init(Analyzer analyzer, String tupleName) {
559  Preconditions.checkState(physicalOutputTuple == null);
560  Preconditions.checkState(physicalIntermediateTuple == null);
561  Preconditions.checkState(analyticFnCalls.size() == analyticExprs.size());
562 
563  // If needed, create the intermediate tuple first to maintain
564  // intermediateTupleId < outputTupleId for debugging purposes and consistency with
565  // tuple creation for aggregations.
566  boolean requiresIntermediateTuple =
567  AggregateInfoBase.requiresIntermediateTuple(analyticFnCalls);
568  if (requiresIntermediateTuple) {
569  physicalIntermediateTuple =
570  analyzer.getDescTbl().createTupleDescriptor(tupleName + "intermed");
571  physicalOutputTuple =
572  analyzer.getDescTbl().createTupleDescriptor(tupleName + "out");
573  } else {
574  physicalOutputTuple =
575  analyzer.getDescTbl().createTupleDescriptor(tupleName + "out");
576  physicalIntermediateTuple = physicalOutputTuple;
577  }
578 
579  Preconditions.checkState(analyticExprs.size() == logicalIntermediateSlots.size());
580  Preconditions.checkState(analyticExprs.size() == logicalOutputSlots.size());
581  for (int i = 0; i < analyticExprs.size(); ++i) {
582  SlotDescriptor logicalOutputSlot = logicalOutputSlots.get(i);
583  SlotDescriptor physicalOutputSlot =
584  analyzer.copySlotDescriptor(logicalOutputSlot, physicalOutputTuple);
585  physicalOutputSlot.setIsMaterialized(true);
586  if (requiresIntermediateTuple) {
587  SlotDescriptor logicalIntermediateSlot = logicalIntermediateSlots.get(i);
588  SlotDescriptor physicalIntermediateSlot = analyzer.copySlotDescriptor(
589  logicalIntermediateSlot, physicalIntermediateTuple);
590  physicalIntermediateSlot.setIsMaterialized(true);
591  }
592  logicalToPhysicalSmap.put(
593  new SlotRef(logicalOutputSlot), new SlotRef(physicalOutputSlot));
594  }
595  physicalOutputTuple.computeMemLayout();
596  if (requiresIntermediateTuple) physicalIntermediateTuple.computeMemLayout();
597  }
598  }
599 
  // Groups the analytic exprs of analyticInfo_ into window groups. Each expr joins
  // the first compatible group or starts a new one. The i-th analytic expr is paired
  // with the i-th slot of the logical output and intermediate tuples. Exprs whose
  // output slot is not materialized are skipped.
603  private List<WindowGroup> collectWindowGroups() {
604  List<Expr> analyticExprs = analyticInfo_.getAnalyticExprs();
605  List<WindowGroup> groups = Lists.newArrayList();
606  for (int i = 0; i < analyticExprs.size(); ++i) {
607  AnalyticExpr analyticExpr = (AnalyticExpr) analyticExprs.get(i);
608  // Do not generate the plan for non-materialized analytic exprs.
609  if (!analyticInfo_.getOutputTupleDesc().getSlots().get(i).isMaterialized()) {
610  continue;
611  }
612  boolean match = false;
613  for (WindowGroup group: groups) {
614  if (group.isCompatible(analyticExpr)) {
  // Re-fetches the same element as analyticExpr above.
615  group.add((AnalyticExpr) analyticInfo_.getAnalyticExprs().get(i),
616  analyticInfo_.getOutputTupleDesc().getSlots().get(i),
617  analyticInfo_.getIntermediateTupleDesc().getSlots().get(i));
618  match = true;
619  break;
620  }
621  }
622  if (!match) {
623  groups.add(new WindowGroup(
  // NOTE(review): the first constructor argument is on a line missing from this
  // listing (presumably the i-th analytic expr) — confirm against the source.
625  analyticInfo_.getOutputTupleDesc().getSlots().get(i),
626  analyticInfo_.getIntermediateTupleDesc().getSlots().get(i)));
627  }
628  }
629  return groups;
630  }
631 
/**
 * Window groups that share partition-by exprs (compared as sets) and order-by
 * elements, so that all of them can be evaluated after a single sort.
 */
636 private static class SortGroup {
637  public List<Expr> partitionByExprs;
638  public List<OrderByElement> orderByElements;
639  public List<WindowGroup> windowGroups = Lists.newArrayList();
640 
641  // sum of windowGroups.physicalOutputTuple.getByteSize()
642  public int totalOutputTupleSize = -1;
643 
644  public SortGroup(WindowGroup windowGroup) {
645  partitionByExprs = windowGroup.partitionByExprs;
646  orderByElements = windowGroup.orderByElements;
647  windowGroups.add(windowGroup);
648  }
649 
  // True if 'windowGroup' has the same partition-by exprs (as sets) and equal
  // order-by elements.
653  public boolean isCompatible(WindowGroup windowGroup) {
654  return Expr.equalSets(windowGroup.partitionByExprs, partitionByExprs)
655  && windowGroup.orderByElements.equals(orderByElements);
656  }
657 
658  public void add(WindowGroup windowGroup) {
659  Preconditions.checkState(isCompatible(windowGroup));
660  windowGroups.add(windowGroup);
661  }
662 
  // True if the partition-by exprs match (as sets) and 'other's order-by elements
  // are a prefix of this group's (same exprs, direction and nulls ordering). A sort
  // for this group then also produces the ordering 'other' needs.
667  public boolean isPrefixOf(SortGroup other) {
668  if (other.orderByElements.size() > orderByElements.size()) return false;
669  if (!Expr.equalSets(partitionByExprs, other.partitionByExprs)) return false;
670  for (int i = 0; i < other.orderByElements.size(); ++i) {
671  OrderByElement ob = orderByElements.get(i);
672  OrderByElement otherOb = other.orderByElements.get(i);
673  // TODO: compare equiv classes by comparing each equiv class's placeholder
674  // slotref
675  if (!ob.getExpr().equals(otherOb.getExpr())) return false;
676  if (ob.isAsc() != otherOb.isAsc()) return false;
677  if (ob.nullsFirst() != otherOb.nullsFirst()) return false;
678  }
679  return true;
680  }
681 
  // Takes over all window groups of 'other'; requires isPrefixOf(other).
685  public void absorb(SortGroup other) {
686  Preconditions.checkState(isPrefixOf(other));
687  windowGroups.addAll(other.windowGroups);
688  }
689 
  // Sums the byte sizes of the window groups' physical output tuples into
  // totalOutputTupleSize. Requires WindowGroup.init() to have been called.
  // NOTE(review): totalOutputTupleSize starts at -1 and a line appears to be missing
  // before the loop in this listing — confirm it resets the sum to 0.
693  public void init() {
695  for (WindowGroup g: windowGroups) {
696  TupleDescriptor outputTuple = g.physicalOutputTuple;
697  Preconditions.checkState(outputTuple.isMaterialized());
698  Preconditions.checkState(outputTuple.getByteSize() != -1);
699  totalOutputTupleSize += outputTuple.getByteSize();
700  }
701  }
702 
  // Orders window groups by ascending physical output tuple byte size.
703  private static class SizeLt implements Comparator<WindowGroup> {
704  public int compare(WindowGroup wg1, WindowGroup wg2) {
705  Preconditions.checkState(wg1.physicalOutputTuple != null
706  && wg1.physicalOutputTuple.getByteSize() != -1);
707  Preconditions.checkState(wg2.physicalOutputTuple != null
708  && wg2.physicalOutputTuple.getByteSize() != -1);
  // NOTE(review): the second operand of this subtraction is on a line missing from
  // this listing (presumably wg2's byte size).
709  int diff = wg1.physicalOutputTuple.getByteSize()
711  return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
712  }
713  }
714 
715  private static final SizeLt SIZE_LT;
716  static {
717  SIZE_LT = new SizeLt();
718  }
719 
  // Sorts windowGroups in place using SIZE_LT.
724  public void orderWindowGroups() {
725  Collections.sort(windowGroups, SIZE_LT);
726  }
727  }
728 
732  private List<SortGroup> collectSortGroups(List<WindowGroup> windowGroups) {
733  List<SortGroup> sortGroups = Lists.newArrayList();
734  for (WindowGroup windowGroup: windowGroups) {
735  boolean match = false;
736  for (SortGroup sortGroup: sortGroups) {
737  if (sortGroup.isCompatible(windowGroup)) {
738  sortGroup.add(windowGroup);
739  match = true;
740  break;
741  }
742  }
743  if (!match) sortGroups.add(new SortGroup(windowGroup));
744  }
745  return sortGroups;
746  }
747 
751  private static class PartitionGroup {
752  public List<Expr> partitionByExprs;
753  public List<SortGroup> sortGroups = Lists.newArrayList();
754 
755  // sum of sortGroups.windowGroups.physicalOutputTuple.getByteSize()
756  public int totalOutputTupleSize = -1;
757 
758  public PartitionGroup(SortGroup sortGroup) {
759  partitionByExprs = sortGroup.partitionByExprs;
760  sortGroups.add(sortGroup);
761  totalOutputTupleSize = sortGroup.totalOutputTupleSize;
762  }
763 
768  public boolean isCompatible(SortGroup sortGroup) {
769  return Expr.equalSets(sortGroup.partitionByExprs, partitionByExprs);
770  }
771 
772  public void add(SortGroup sortGroup) {
773  Preconditions.checkState(isCompatible(sortGroup));
774  sortGroups.add(sortGroup);
775  totalOutputTupleSize += sortGroup.totalOutputTupleSize;
776  }
777 
783  public void merge(PartitionGroup other) {
784  partitionByExprs = Expr.intersect(partitionByExprs, other.partitionByExprs);
785  Preconditions.checkState(Expr.getNumDistinctValues(partitionByExprs) >= 0);
786  sortGroups.addAll(other.sortGroups);
787  }
788 
793  public void orderSortGroups() {
794  Collections.sort(sortGroups,
795  new Comparator<SortGroup>() {
796  public int compare(SortGroup sg1, SortGroup sg2) {
797  Preconditions.checkState(sg1.totalOutputTupleSize > 0);
798  Preconditions.checkState(sg2.totalOutputTupleSize > 0);
799  int diff = sg1.totalOutputTupleSize - sg2.totalOutputTupleSize;
800  return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
801  }
802  });
803  for (SortGroup sortGroup: sortGroups) {
804  sortGroup.orderWindowGroups();
805  }
806  }
807  }
808 
812  private List<PartitionGroup> collectPartitionGroups(List<SortGroup> sortGroups) {
813  List<PartitionGroup> partitionGroups = Lists.newArrayList();
814  for (SortGroup sortGroup: sortGroups) {
815  boolean match = false;
816  for (PartitionGroup partitionGroup: partitionGroups) {
817  if (partitionGroup.isCompatible(sortGroup)) {
818  partitionGroup.add(sortGroup);
819  match = true;
820  break;
821  }
822  }
823  if (!match) partitionGroups.add(new PartitionGroup(sortGroup));
824  }
825  return partitionGroups;
826  }
827 }
void mergeSortGroups(List< SortGroup > sortGroups)
static final DataPartition UNPARTITIONED
Expr createNullMatchingEquals(List< Expr > exprs, TupleId inputTid, ExprSubstitutionMap bufferedSmap)
PlanNode createSingleNodePlan(PlanNode root, List< Expr > groupingExprs, List< Expr > inputPartitionExprs)
void mergePartitionGroups(List< PartitionGroup > partitionGroups, int numNodes)
void add(AnalyticExpr analyticExpr, SlotDescriptor logicalOutputSlot, SlotDescriptor logicalIntermediateSlot)
List< PartitionGroup > collectPartitionGroups(List< SortGroup > sortGroups)
static List< Expr > getOrderByExprs(List< OrderByElement > src)
int TupleId
Definition: global-types.h:23
static ArrayList< Expr > substituteList(Iterable<?extends Expr > exprs, ExprSubstitutionMap smap, Analyzer analyzer, boolean preserveRootTypes)
Definition: Expr.java:730
SortInfo createSortInfo(PlanNode input, List< Expr > sortExprs, List< Boolean > isAsc, List< Boolean > nullsFirst)
AnalyticPlanner(List< TupleId > stmtTupleIds, AnalyticInfo analyticInfo, Analyzer analyzer, PlannerContext ctx)
List< OrderByElement > getOrderByElements()
Expr createNullMatchingEqualsAux(List< Expr > elements, int i, TupleId inputTid, ExprSubstitutionMap bufferedSmap)
ArrayList< TupleId > getTupleIds()
Definition: PlanNode.java:196
static List< OrderByElement > substitute(List< OrderByElement > src, ExprSubstitutionMap smap, Analyzer analyzer)
void orderGroups(List< PartitionGroup > partitionGroups)
void init(Analyzer analyzer, String tupleName)
List< SortGroup > collectSortGroups(List< WindowGroup > windowGroups)
WindowGroup(AnalyticExpr analyticExpr, SlotDescriptor logicalOutputSlot, SlotDescriptor logicalIntermediateSlot)
static< CextendsExpr > boolean equalSets(List< C > l1, List< C > l2)
Definition: Expr.java:598
void computeInputPartitionExprs(List< PartitionGroup > partitionGroups, List< Expr > groupingExprs, int numNodes, List< Expr > inputPartitionExprs)
boolean isBoundByTupleIds(List< TupleId > tids)
Definition: Expr.java:852
static boolean requiresIndependentEval(AnalyticExpr analyticExpr)
PlanNode createSortGroupPlan(PlanNode root, SortGroup sortGroup, List< Expr > partitionExprs)