Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DistributedPlanner.java
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.planner;
16 
17 import java.util.ArrayList;
18 import java.util.List;
19 
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22 
33 import com.cloudera.impala.thrift.TPartitionType;
34 import com.google.common.base.Preconditions;
35 import com.google.common.collect.Lists;
36 
37 
42 public class DistributedPlanner {
43  private final static Logger LOG = LoggerFactory.getLogger(DistributedPlanner.class);
44 
45  private final PlannerContext ctx_;
46 
48  ctx_ = ctx;
49  }
50 
62  public ArrayList<PlanFragment> createPlanFragments(
63  PlanNode singleNodePlan) throws ImpalaException {
64  Preconditions.checkState(!ctx_.isSingleNodeExec());
65  AnalysisContext.AnalysisResult analysisResult = ctx_.getAnalysisResult();
66  QueryStmt queryStmt = ctx_.getQueryStmt();
67  ArrayList<PlanFragment> fragments = Lists.newArrayList();
68  // For inserts or CTAS, unless there is a limit, leave the root fragment
69  // partitioned, otherwise merge everything into a single coordinator fragment,
70  // so we can pass it back to the client.
71  boolean isPartitioned = false;
72  if ((analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())
73  && !singleNodePlan.hasLimit()) {
74  Preconditions.checkState(!queryStmt.hasOffset());
75  isPartitioned = true;
76  }
77  LOG.debug("create plan fragments");
78  long perNodeMemLimit = ctx_.getQueryOptions().mem_limit;
79  LOG.debug("memlimit=" + Long.toString(perNodeMemLimit));
80  createPlanFragments(singleNodePlan, isPartitioned, perNodeMemLimit, fragments);
81  return fragments;
82  }
83 
94  PlanNode root, boolean isPartitioned,
95  long perNodeMemLimit, ArrayList<PlanFragment> fragments)
97  ArrayList<PlanFragment> childFragments = Lists.newArrayList();
98  for (PlanNode child: root.getChildren()) {
99  // allow child fragments to be partitioned, unless they contain a limit clause
100  // (the result set with the limit constraint needs to be computed centrally);
101  // merge later if needed
102  boolean childIsPartitioned = !child.hasLimit();
103  childFragments.add(
105  child, childIsPartitioned, perNodeMemLimit, fragments));
106  }
107 
108  PlanFragment result = null;
109  if (root instanceof ScanNode) {
110  result = createScanFragment(root);
111  fragments.add(result);
112  } else if (root instanceof HashJoinNode) {
113  Preconditions.checkState(childFragments.size() == 2);
114  result = createHashJoinFragment(
115  (HashJoinNode) root, childFragments.get(1), childFragments.get(0),
116  perNodeMemLimit, fragments);
117  } else if (root instanceof CrossJoinNode) {
118  Preconditions.checkState(childFragments.size() == 2);
119  result = createCrossJoinFragment(
120  (CrossJoinNode) root, childFragments.get(1), childFragments.get(0),
121  perNodeMemLimit, fragments);
122  } else if (root instanceof SelectNode) {
123  result = createSelectNodeFragment((SelectNode) root, childFragments);
124  } else if (root instanceof UnionNode) {
125  result = createUnionNodeFragment((UnionNode) root, childFragments, fragments);
126  } else if (root instanceof AggregationNode) {
127  result = createAggregationFragment(
128  (AggregationNode) root, childFragments.get(0), fragments);
129  } else if (root instanceof SortNode) {
130  if (((SortNode) root).isAnalyticSort()) {
131  // don't parallelize this like a regular SortNode
132  result = createAnalyticFragment(
133  (SortNode) root, childFragments.get(0), fragments);
134  } else {
135  result = createOrderByFragment(
136  (SortNode) root, childFragments.get(0), fragments);
137  }
138  } else if (root instanceof AnalyticEvalNode) {
139  result = createAnalyticFragment(root, childFragments.get(0), fragments);
140  } else if (root instanceof EmptySetNode) {
141  result = new PlanFragment(
143  } else {
144  throw new InternalException(
145  "Cannot create plan fragment for this node type: " + root.getExplainString());
146  }
147  // move 'result' to end, it depends on all of its children
148  fragments.remove(result);
149  fragments.add(result);
150 
151  if (!isPartitioned && result.isPartitioned()) {
152  result = createMergeFragment(result);
153  fragments.add(result);
154  }
155 
156  return result;
157  }
158 
163  private long getNumDistinctValues(List<Expr> exprs) {
164  long result = 1;
165  for (Expr expr: exprs) {
166  result *= expr.getNumDistinctValues();
167  if (result < 0) return -1;
168  }
169  return result;
170  }
171 
182  PlanFragment inputFragment, InsertStmt insertStmt, Analyzer analyzer,
183  ArrayList<PlanFragment> fragments)
184  throws InternalException {
185  List<Expr> partitionExprs = insertStmt.getPartitionKeyExprs();
186  Boolean partitionHint = insertStmt.isRepartition();
187  if (partitionExprs.isEmpty()) return inputFragment;
188  if (partitionHint != null && !partitionHint) return inputFragment;
189 
190  // we ignore constants for the sake of partitioning
191  List<Expr> nonConstPartitionExprs = Lists.newArrayList(partitionExprs);
192  Expr.removeConstants(nonConstPartitionExprs);
193  DataPartition inputPartition = inputFragment.getDataPartition();
194 
195  // do nothing if the input fragment is already appropriately partitioned
196  if (analyzer.equivSets(inputPartition.getPartitionExprs(),
197  nonConstPartitionExprs)) {
198  return inputFragment;
199  }
200 
201  // if the existing partition exprs are a subset of the table partition exprs, check
202  // if it is distributed across all nodes; if so, don't repartition
203  if (Expr.isSubset(inputPartition.getPartitionExprs(), nonConstPartitionExprs)) {
204  long numPartitions = getNumDistinctValues(inputPartition.getPartitionExprs());
205  if (numPartitions >= inputFragment.getNumNodes()) return inputFragment;
206  }
207 
208  // don't repartition if the resulting number of partitions is too low to get good
209  // parallelism
210  long numPartitions = getNumDistinctValues(nonConstPartitionExprs);
211 
212  // don't repartition if we know we have fewer partitions than nodes
213  // (ie, default to repartitioning if col stats are missing)
214  // TODO: we want to repartition if the resulting files would otherwise
215  // be very small (less than some reasonable multiple of the recommended block size);
216  // in order to do that, we need to come up with an estimate of the avg row size
217  // in the particular file format of the output table/partition.
218  // We should always know on how many nodes our input is running.
219  Preconditions.checkState(inputFragment.getNumNodes() != -1);
220  if (partitionHint == null && numPartitions > 0 &&
221  numPartitions <= inputFragment.getNumNodes()) {
222  return inputFragment;
223  }
224 
225  Preconditions.checkState(partitionHint == null || partitionHint);
226  ExchangeNode exchNode = new ExchangeNode(ctx_.getNextNodeId());
227  exchNode.addChild(inputFragment.getPlanRoot(), false);
228  exchNode.init(analyzer);
229  Preconditions.checkState(exchNode.hasValidStats());
230  DataPartition partition =
231  new DataPartition(TPartitionType.HASH_PARTITIONED, nonConstPartitionExprs);
232  PlanFragment fragment =
233  new PlanFragment(ctx_.getNextFragmentId(), exchNode, partition);
234  inputFragment.setDestination(exchNode);
235  inputFragment.setOutputPartition(partition);
236  fragments.add(fragment);
237  return fragment;
238  }
239 
246  throws InternalException {
247  Preconditions.checkState(inputFragment.isPartitioned());
248  ExchangeNode mergePlan = new ExchangeNode(ctx_.getNextNodeId());
249  mergePlan.addChild(inputFragment.getPlanRoot(), false);
250  mergePlan.init(ctx_.getRootAnalyzer());
251  Preconditions.checkState(mergePlan.hasValidStats());
252  PlanFragment fragment = new PlanFragment(ctx_.getNextFragmentId(), mergePlan,
254  inputFragment.setDestination(mergePlan);
255  return fragment;
256  }
257 
266  }
267 
274  PlanFragment rightChildFragment, PlanFragment leftChildFragment,
275  long perNodeMemLimit, ArrayList<PlanFragment> fragments)
276  throws InternalException {
277  node.setChild(0, leftChildFragment.getPlanRoot());
278  connectChildFragment(node, 1, rightChildFragment);
279  leftChildFragment.setPlanRoot(node);
280  return leftChildFragment;
281  }
282 
295  HashJoinNode node, PlanFragment rightChildFragment,
296  PlanFragment leftChildFragment, long perNodeMemLimit,
297  ArrayList<PlanFragment> fragments)
298  throws InternalException {
299  // broadcast: send the rightChildFragment's output to each node executing
300  // the leftChildFragment; the cost across all nodes is proportional to the
301  // total amount of data sent
302  Analyzer analyzer = ctx_.getRootAnalyzer();
303  PlanNode rhsTree = rightChildFragment.getPlanRoot();
304  long rhsDataSize = 0;
305  long broadcastCost = Long.MAX_VALUE;
306  if (rhsTree.getCardinality() != -1 && leftChildFragment.getNumNodes() != -1) {
307  rhsDataSize = Math.round(
308  (double) rhsTree.getCardinality() * rhsTree.getAvgRowSize());
309  broadcastCost = rhsDataSize * leftChildFragment.getNumNodes();
310  }
311  LOG.debug("broadcast: cost=" + Long.toString(broadcastCost));
312  LOG.debug("card=" + Long.toString(rhsTree.getCardinality()) + " row_size="
313  + Float.toString(rhsTree.getAvgRowSize()) + " #nodes="
314  + Integer.toString(leftChildFragment.getNumNodes()));
315 
316  // repartition: both left- and rightChildFragment are partitioned on the
317  // join exprs
318  PlanNode lhsTree = leftChildFragment.getPlanRoot();
319  long partitionCost = Long.MAX_VALUE;
320  List<Expr> lhsJoinExprs = Lists.newArrayList();
321  List<Expr> rhsJoinExprs = Lists.newArrayList();
322  for (Expr joinConjunct: node.getEqJoinConjuncts()) {
323  // no remapping necessary
324  lhsJoinExprs.add(joinConjunct.getChild(0).clone());
325  rhsJoinExprs.add(joinConjunct.getChild(1).clone());
326  }
327  boolean lhsHasCompatPartition = false;
328  boolean rhsHasCompatPartition = false;
329  if (lhsTree.getCardinality() != -1 && rhsTree.getCardinality() != -1) {
330  lhsHasCompatPartition = analyzer.equivSets(lhsJoinExprs,
331  leftChildFragment.getDataPartition().getPartitionExprs());
332  rhsHasCompatPartition = analyzer.equivSets(rhsJoinExprs,
333  rightChildFragment.getDataPartition().getPartitionExprs());
334 
335  double lhsCost = (lhsHasCompatPartition) ? 0.0 :
336  Math.round((double) lhsTree.getCardinality() * lhsTree.getAvgRowSize());
337  double rhsCost = (rhsHasCompatPartition) ? 0.0 :
338  Math.round((double) rhsTree.getCardinality() * rhsTree.getAvgRowSize());
339  partitionCost = Math.round(lhsCost + rhsCost);
340  }
341  LOG.debug("partition: cost=" + Long.toString(partitionCost));
342  LOG.debug("lhs card=" + Long.toString(lhsTree.getCardinality()) + " row_size="
343  + Float.toString(lhsTree.getAvgRowSize()));
344  LOG.debug("rhs card=" + Long.toString(rhsTree.getCardinality()) + " row_size="
345  + Float.toString(rhsTree.getAvgRowSize()));
346  LOG.debug(rhsTree.getExplainString());
347 
348  boolean doBroadcast;
349  // we do a broadcast join if
350  // - we're explicitly told to do so
351  // - or if it's cheaper and we weren't explicitly told to do a partitioned join
352  // - and we're not doing a full outer or right outer/semi join (those require the
353  // left-hand side to be partitioned for correctness)
354  // - and the expected size of the hash tbl doesn't exceed perNodeMemLimit
355  // - or we are doing a null-aware left anti join (broadcast is required for
356  // correctness)
357  // we do a "<=" comparison of the costs so that we default to broadcast joins if
358  // we're unable to estimate the cost
359  if ((node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN
360  && node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN
361  && node.getJoinOp() != JoinOperator.RIGHT_SEMI_JOIN
362  && node.getJoinOp() != JoinOperator.RIGHT_ANTI_JOIN
363  && (perNodeMemLimit == 0
364  || Math.round((double) rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD)
365  <= perNodeMemLimit)
366  && (node.getTableRef().isBroadcastJoin()
367  || (!node.getTableRef().isPartitionedJoin()
368  && broadcastCost <= partitionCost)))
369  || node.getJoinOp().isNullAwareLeftAntiJoin()) {
370  doBroadcast = true;
371  } else {
372  doBroadcast = false;
373  }
374 
375  if (doBroadcast) {
376  node.setDistributionMode(HashJoinNode.DistributionMode.BROADCAST);
377  // Doesn't create a new fragment, but modifies leftChildFragment to execute
378  // the join; the build input is provided by an ExchangeNode, which is the
379  // destination of the rightChildFragment's output
380  node.setChild(0, leftChildFragment.getPlanRoot());
381  connectChildFragment(node, 1, rightChildFragment);
382  leftChildFragment.setPlanRoot(node);
383  return leftChildFragment;
384  } else {
385  node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);
386  // The lhs and rhs input fragments are already partitioned on the join exprs.
387  // Combine the lhs/rhs input fragments into leftChildFragment by placing the join
388  // node into leftChildFragment and setting its lhs/rhs children to the plan root of
389  // the lhs/rhs child fragment, respectively. No new child fragments or exchanges
390  // are created, and the rhs fragment is removed.
391  // TODO: Relax the isCompatPartition() check below. The check is conservative and
392  // may reject partitions that could be made physically compatible. Fix this by
393  // removing equivalent duplicates from partition exprs and impose a canonical order
394  // on partition exprs (both using the canonical equivalence class representatives).
395  if (lhsHasCompatPartition
396  && rhsHasCompatPartition
398  leftChildFragment.getDataPartition(),
399  rightChildFragment.getDataPartition(),
400  lhsJoinExprs, rhsJoinExprs, analyzer)) {
401  node.setChild(0, leftChildFragment.getPlanRoot());
402  node.setChild(1, rightChildFragment.getPlanRoot());
403  // Redirect fragments sending to rightFragment to leftFragment.
404  for (PlanFragment fragment: fragments) {
405  if (fragment.getDestFragment() == rightChildFragment) {
406  fragment.setDestination(fragment.getDestNode());
407  }
408  }
409  // Remove right fragment because its plan tree has been merged into leftFragment.
410  fragments.remove(rightChildFragment);
411  leftChildFragment.setPlanRoot(node);
412  return leftChildFragment;
413  }
414 
415  // The lhs input fragment is already partitioned on the join exprs.
416  // Make the HashJoin the new root of leftChildFragment and set the join's
417  // first child to the lhs plan root. The second child of the join is an
418  // ExchangeNode that is fed by the rhsInputFragment whose sink repartitions
419  // its data by the rhs join exprs.
420  DataPartition rhsJoinPartition = null;
421  if (lhsHasCompatPartition) {
422  rhsJoinPartition = getCompatPartition(lhsJoinExprs,
423  leftChildFragment.getDataPartition(), rhsJoinExprs, analyzer);
424  if (rhsJoinPartition != null) {
425  node.setChild(0, leftChildFragment.getPlanRoot());
426  connectChildFragment(node, 1, rightChildFragment);
427  rightChildFragment.setOutputPartition(rhsJoinPartition);
428  leftChildFragment.setPlanRoot(node);
429  return leftChildFragment;
430  }
431  }
432 
433  // Same as above but with rhs and lhs reversed.
434  DataPartition lhsJoinPartition = null;
435  if (rhsHasCompatPartition) {
436  lhsJoinPartition = getCompatPartition(rhsJoinExprs,
437  rightChildFragment.getDataPartition(), lhsJoinExprs, analyzer);
438  if (lhsJoinPartition != null) {
439  node.setChild(1, rightChildFragment.getPlanRoot());
440  connectChildFragment(node, 0, leftChildFragment);
441  leftChildFragment.setOutputPartition(lhsJoinPartition);
442  rightChildFragment.setPlanRoot(node);
443  return rightChildFragment;
444  }
445  }
446 
447  Preconditions.checkState(lhsJoinPartition == null);
448  Preconditions.checkState(rhsJoinPartition == null);
449  lhsJoinPartition = new DataPartition(TPartitionType.HASH_PARTITIONED,
450  Expr.cloneList(lhsJoinExprs));
451  rhsJoinPartition = new DataPartition(TPartitionType.HASH_PARTITIONED,
452  Expr.cloneList(rhsJoinExprs));
453 
454  // Neither lhs nor rhs are already partitioned on the join exprs.
455  // Create a new parent fragment containing a HashJoin node with two
456  // ExchangeNodes as inputs; the latter are the destinations of the
457  // left- and rightChildFragments, which now partition their output
458  // on their respective join exprs.
459  // The new fragment is hash-partitioned on the lhs input join exprs.
460  ExchangeNode lhsExchange = new ExchangeNode(ctx_.getNextNodeId());
461  lhsExchange.addChild(leftChildFragment.getPlanRoot(), false);
462  lhsExchange.computeStats(null);
463  node.setChild(0, lhsExchange);
464  ExchangeNode rhsExchange = new ExchangeNode(ctx_.getNextNodeId());
465  rhsExchange.addChild(rightChildFragment.getPlanRoot(), false);
466  rhsExchange.computeStats(null);
467  node.setChild(1, rhsExchange);
468 
469  // Connect the child fragments in a new fragment, and set the data partition
470  // of the new fragment and its child fragments.
471  PlanFragment joinFragment =
472  new PlanFragment(ctx_.getNextFragmentId(), node, lhsJoinPartition);
473  leftChildFragment.setDestination(lhsExchange);
474  leftChildFragment.setOutputPartition(lhsJoinPartition);
475  rightChildFragment.setDestination(rhsExchange);
476  rightChildFragment.setOutputPartition(rhsJoinPartition);
477 
478  return joinFragment;
479  }
480  }
481 
498  private boolean isCompatPartition(DataPartition lhsPartition,
499  DataPartition rhsPartition, List<Expr> lhsJoinExprs, List<Expr> rhsJoinExprs,
500  Analyzer analyzer) {
501  List<Expr> lhsPartExprs = lhsPartition.getPartitionExprs();
502  List<Expr> rhsPartExprs = rhsPartition.getPartitionExprs();
503  // 1. Sizes must be equal.
504  if (lhsPartExprs.size() != rhsPartExprs.size()) return false;
505  // 2. Lhs/rhs join exprs are identical to lhs/rhs partition exprs.
506  Preconditions.checkState(lhsJoinExprs.size() == rhsJoinExprs.size());
507  if (lhsJoinExprs.size() == lhsPartExprs.size()) {
508  if (lhsJoinExprs.equals(lhsPartExprs) && rhsJoinExprs.equals(rhsPartExprs)) {
509  return true;
510  }
511  }
512  // 3. Each lhs part expr must have an equivalent expr at the same position
513  // in the rhs part exprs.
514  for (int i = 0; i < lhsPartExprs.size(); ++i) {
515  if (!analyzer.equivExprs(lhsPartExprs.get(i), rhsPartExprs.get(i))) return false;
516  }
517  return true;
518  }
519 
537  private DataPartition getCompatPartition(List<Expr> srcJoinExprs,
538  DataPartition srcPartition, List<Expr> joinExprs, Analyzer analyzer) {
539  Preconditions.checkState(srcPartition.isHashPartitioned());
540  List<Expr> srcPartExprs = srcPartition.getPartitionExprs();
541  List<Expr> resultPartExprs = Lists.newArrayList();
542  for (int i = 0; i < srcPartExprs.size(); ++i) {
543  for (int j = 0; j < srcJoinExprs.size(); ++j) {
544  if (analyzer.equivExprs(srcPartExprs.get(i), srcJoinExprs.get(j))) {
545  resultPartExprs.add(joinExprs.get(j).clone());
546  break;
547  }
548  }
549  }
550  if (resultPartExprs.size() != srcPartExprs.size()) return null;
551  return new DataPartition(TPartitionType.HASH_PARTITIONED, resultPartExprs);
552  }
553 
567  ArrayList<PlanFragment> childFragments, ArrayList<PlanFragment> fragments)
568  throws InternalException {
569  Preconditions.checkState(unionNode.getChildren().size() == childFragments.size());
570 
571  // A UnionNode could have no children or constant selects if all of its operands
572  // were dropped because of constant predicates that evaluated to false.
573  if (unionNode.getChildren().isEmpty()) {
574  return new PlanFragment(
576  }
577 
578  Preconditions.checkState(!childFragments.isEmpty());
579  int numUnpartitionedChildFragments = 0;
580  for (int i = 0; i < childFragments.size(); ++i) {
581  if (!childFragments.get(i).isPartitioned()) ++numUnpartitionedChildFragments;
582  }
583 
584  // If all child fragments are unpartitioned, return a single unpartitioned fragment
585  // with a UnionNode that merges all child fragments.
586  if (numUnpartitionedChildFragments == childFragments.size()) {
587  // Absorb the plan trees of all childFragments into unionNode.
588  for (int i = 0; i < childFragments.size(); ++i) {
589  unionNode.setChild(i, childFragments.get(i).getPlanRoot());
590  }
591  PlanFragment unionFragment = new PlanFragment(ctx_.getNextFragmentId(),
592  unionNode, DataPartition.UNPARTITIONED);
593  unionNode.init(ctx_.getRootAnalyzer());
594  // All child fragments have been absorbed into unionFragment.
595  fragments.removeAll(childFragments);
596  return unionFragment;
597  }
598 
599  // There is at least one partitioned child fragment.
600  for (int i = 0; i < childFragments.size(); ++i) {
601  PlanFragment childFragment = childFragments.get(i);
602  if (childFragment.isPartitioned()) {
603  // Absorb the plan trees of all partitioned child fragments into unionNode.
604  unionNode.setChild(i, childFragment.getPlanRoot());
605  fragments.remove(childFragment);
606  } else {
607  // Connect the unpartitioned child fragments to unionNode via a random exchange.
608  connectChildFragment(unionNode, i, childFragment);
609  childFragment.setOutputPartition(DataPartition.RANDOM);
610  }
611  }
612 
613  // Fragment contains the UnionNode that consumes the data of all child fragments.
614  PlanFragment unionFragment = new PlanFragment(ctx_.getNextFragmentId(),
615  unionNode, DataPartition.RANDOM);
616  unionNode.reorderOperands(ctx_.getRootAnalyzer());
617  unionNode.init(ctx_.getRootAnalyzer());
618  return unionFragment;
619  }
620 
626  ArrayList<PlanFragment> childFragments) {
627  Preconditions.checkState(selectNode.getChildren().size() == childFragments.size());
628  PlanFragment childFragment = childFragments.get(0);
629  // set the child explicitly, an ExchangeNode might have been inserted
630  // (whereas selectNode.child[0] would point to the original child)
631  selectNode.setChild(0, childFragment.getPlanRoot());
632  childFragment.setPlanRoot(selectNode);
633  return childFragment;
634  }
635 
640  private void connectChildFragment(PlanNode node, int childIdx,
641  PlanFragment childFragment) throws InternalException {
642  ExchangeNode exchangeNode = new ExchangeNode(ctx_.getNextNodeId());
643  exchangeNode.addChild(childFragment.getPlanRoot(), false);
644  exchangeNode.init(ctx_.getRootAnalyzer());
645  node.setChild(childIdx, exchangeNode);
646  childFragment.setDestination(exchangeNode);
647  }
648 
660  PlanFragment childFragment, DataPartition parentPartition)
661  throws InternalException {
662  ExchangeNode exchangeNode = new ExchangeNode(ctx_.getNextNodeId());
663  exchangeNode.addChild(childFragment.getPlanRoot(), false);
664  exchangeNode.init(ctx_.getRootAnalyzer());
665  PlanFragment parentFragment = new PlanFragment(ctx_.getNextFragmentId(),
666  exchangeNode, parentPartition);
667  childFragment.setDestination(exchangeNode);
668  childFragment.setOutputPartition(parentPartition);
669  return parentFragment;
670  }
671 
682  PlanFragment childFragment, ArrayList<PlanFragment> fragments)
683  throws InternalException {
684  if (!childFragment.isPartitioned()) {
685  // nothing to distribute; do full aggregation directly within childFragment
686  childFragment.addPlanRoot(node);
687  return childFragment;
688  }
689 
690  if (node.getAggInfo().isDistinctAgg()) {
691  // 'node' is phase 1 of a DISTINCT aggregation; the actual agg fragment
692  // will get created in the next createAggregationFragment() call
693  // for the parent AggregationNode
694  childFragment.addPlanRoot(node);
695  return childFragment;
696  }
697 
698  ArrayList<Expr> groupingExprs = node.getAggInfo().getGroupingExprs();
699  boolean hasGrouping = !groupingExprs.isEmpty();
700  // 2nd phase of DISTINCT aggregation
701  boolean isDistinct =
702  node.getChild(0) instanceof AggregationNode
703  && ((AggregationNode)(node.getChild(0))).getAggInfo().isDistinctAgg();
704 
705  if (!isDistinct) {
706  // the original aggregation materializes the intermediate agg tuple and goes
707  // into the child fragment; merge aggregation materializes the output agg tuple
708  // and goes into a parent fragment
709  childFragment.addPlanRoot(node);
710  node.setIntermediateTuple();
711 
712  // if there is a limit, we need to transfer it from the pre-aggregation
713  // node in the child fragment to the merge aggregation node in the parent
714  long limit = node.getLimit();
715  node.unsetLimit();
716  node.unsetNeedsFinalize();
717 
718  DataPartition parentPartition = null;
719  if (hasGrouping) {
720  // the parent fragment is partitioned on the grouping exprs;
721  // substitute grouping exprs to reference the *output* of the agg, not the input
722  List<Expr> partitionExprs = node.getAggInfo().getPartitionExprs();
723  if (partitionExprs == null) partitionExprs = groupingExprs;
724  partitionExprs = Expr.substituteList(partitionExprs,
725  node.getAggInfo().getIntermediateSmap(), ctx_.getRootAnalyzer(), false);
726  parentPartition =
727  new DataPartition(TPartitionType.HASH_PARTITIONED, partitionExprs);
728  } else {
729  // the parent fragment is unpartitioned
730  parentPartition = DataPartition.UNPARTITIONED;
731  }
732 
733  // place a merge aggregation step in a new fragment
734  PlanFragment mergeFragment = createParentFragment(childFragment, parentPartition);
735  AggregationNode mergeAggNode = new AggregationNode(
736  ctx_.getNextNodeId(), mergeFragment.getPlanRoot(),
737  node.getAggInfo().getMergeAggInfo());
738  mergeAggNode.init(ctx_.getRootAnalyzer());
739  mergeAggNode.setLimit(limit);
740 
741  // HAVING predicates can only be evaluated after the merge agg step
742  node.transferConjuncts(mergeAggNode);
743  // Recompute stats after transferring the conjuncts_ (order is important).
744  node.computeStats(ctx_.getRootAnalyzer());
745  mergeFragment.getPlanRoot().computeStats(ctx_.getRootAnalyzer());
746  mergeAggNode.computeStats(ctx_.getRootAnalyzer());
747  // Set new plan root after updating stats.
748  mergeFragment.addPlanRoot(mergeAggNode);
749 
750  return mergeFragment;
751  }
752 
753  Preconditions.checkState(isDistinct);
754  // The first-phase aggregation node is already in the child fragment.
755  Preconditions.checkState(node.getChild(0) == childFragment.getPlanRoot());
756 
757  AggregateInfo firstPhaseAggInfo = ((AggregationNode) node.getChild(0)).getAggInfo();
758  List<Expr> partitionExprs = null;
759  if (hasGrouping) {
760  // We need to do
761  // - child fragment:
762  // * phase-1 aggregation
763  // - merge fragment, hash-partitioned on grouping exprs:
764  // * merge agg of phase 1
765  // * phase 2 agg
766  // The output partition exprs of the child are the (input) grouping exprs of the
767  // parent. The grouping exprs reference the output tuple of the 1st phase, but the
768  // partitioning happens on the intermediate tuple of the 1st phase.
769  partitionExprs = Expr.substituteList(
770  groupingExprs, firstPhaseAggInfo.getOutputToIntermediateSmap(),
771  ctx_.getRootAnalyzer(), false);
772  } else {
773  // We need to do
774  // - child fragment:
775  // * phase-1 aggregation
776  // - merge fragment 1, hash-partitioned on distinct exprs:
777  // * merge agg of phase 1
778  // * phase 2 agg
779  // - merge fragment 2, unpartitioned:
780  // * merge agg of phase 2
781  partitionExprs = Expr.substituteList(firstPhaseAggInfo.getGroupingExprs(),
782  firstPhaseAggInfo.getIntermediateSmap(), ctx_.getRootAnalyzer(), false);
783  }
784  DataPartition mergePartition =
785  new DataPartition(TPartitionType.HASH_PARTITIONED, partitionExprs);
786 
787  // place a merge aggregation step for the 1st phase in a new fragment
788  PlanFragment mergeFragment = createParentFragment(childFragment, mergePartition);
789  AggregateInfo mergeAggInfo = firstPhaseAggInfo.getMergeAggInfo();
790  AggregationNode mergeAggNode =
791  new AggregationNode(ctx_.getNextNodeId(), node.getChild(0), mergeAggInfo);
792  mergeAggNode.init(ctx_.getRootAnalyzer());
793  mergeAggNode.unsetNeedsFinalize();
794  mergeAggNode.setIntermediateTuple();
795  mergeFragment.addPlanRoot(mergeAggNode);
796  // the 2nd-phase aggregation consumes the output of the merge agg;
797  // if there is a limit, it had already been placed with the 2nd aggregation
798  // step (which is where it should be)
799  mergeFragment.addPlanRoot(node);
800 
801  if (!hasGrouping) {
802  // place the merge aggregation of the 2nd phase in an unpartitioned fragment;
803  // add preceding merge fragment at end
804  fragments.add(mergeFragment);
805 
806  node.unsetNeedsFinalize();
807  node.setIntermediateTuple();
808  // Any limit should be placed in the final merge aggregation node
809  long limit = node.getLimit();
810  node.unsetLimit();
811  mergeFragment = createParentFragment(mergeFragment, DataPartition.UNPARTITIONED);
812  mergeAggInfo = node.getAggInfo().getMergeAggInfo();
813  mergeAggNode =
814  new AggregationNode(ctx_.getNextNodeId(), node.getChild(0), mergeAggInfo);
815  mergeAggNode.init(ctx_.getRootAnalyzer());
816  // Transfer having predicates. If hasGrouping == true, the predicates should
817  // instead be evaluated by the 2nd phase agg (the predicates are already there).
818  node.transferConjuncts(mergeAggNode);
819  mergeAggNode.setLimit(limit);
820  mergeFragment.addPlanRoot(mergeAggNode);
821  }
822  return mergeFragment;
823  }
824 
833  PlanFragment childFragment, ArrayList<PlanFragment> fragments)
834  throws InternalException {
835  Preconditions.checkState(
836  node instanceof SortNode || node instanceof AnalyticEvalNode);
837  if (node instanceof AnalyticEvalNode) {
838  AnalyticEvalNode analyticNode = (AnalyticEvalNode) node;
839  if (analyticNode.getPartitionExprs().isEmpty()
840  && analyticNode.getOrderByElements().isEmpty()) {
841  // no Partition-By/Order-By exprs: compute analytic exprs in single
842  // unpartitioned fragment
843  PlanFragment fragment = childFragment;
844  if (childFragment.isPartitioned()) {
845  fragment = createParentFragment(childFragment, DataPartition.UNPARTITIONED);
846  }
847  fragment.addPlanRoot(analyticNode);
848  return fragment;
849  } else {
850  childFragment.addPlanRoot(analyticNode);
851  return childFragment;
852  }
853  }
854 
855  SortNode sortNode = (SortNode) node;
856  Preconditions.checkState(sortNode.isAnalyticSort());
857  PlanFragment analyticFragment = childFragment;
858  if (sortNode.getInputPartition() != null) {
859  // make sure the childFragment's output is partitioned as required by the sortNode
860  sortNode.getInputPartition().substitute(
861  childFragment.getPlanRoot().getOutputSmap(), ctx_.getRootAnalyzer());
862  if (!childFragment.getDataPartition().equals(sortNode.getInputPartition())) {
863  analyticFragment =
864  createParentFragment(childFragment, sortNode.getInputPartition());
865  }
866  }
867  analyticFragment.addPlanRoot(sortNode);
868  return analyticFragment;
869  }
870 
879  PlanFragment childFragment, ArrayList<PlanFragment> fragments)
880  throws InternalException {
881  node.setChild(0, childFragment.getPlanRoot());
882  childFragment.addPlanRoot(node);
883  if (!childFragment.isPartitioned()) return childFragment;
884 
885  // Remember original offset and limit.
886  boolean hasLimit = node.hasLimit();
887  long limit = node.getLimit();
888  long offset = node.getOffset();
889 
890  // Create a new fragment for a sort-merging exchange.
891  PlanFragment mergeFragment =
893  ExchangeNode exchNode = (ExchangeNode) mergeFragment.getPlanRoot();
894 
895  // Set limit, offset and merge parameters in the exchange node.
896  exchNode.unsetLimit();
897  if (hasLimit) exchNode.setLimit(limit);
898  exchNode.setMergeInfo(node.getSortInfo(), offset);
899 
900  // Child nodes should not process the offset. If there is a limit,
901  // the child nodes need only return (offset + limit) rows.
902  SortNode childSortNode = (SortNode) childFragment.getPlanRoot();
903  Preconditions.checkState(node == childSortNode);
904  if (hasLimit) {
905  childSortNode.unsetLimit();
906  childSortNode.setLimit(limit + offset);
907  }
908  childSortNode.setOffset(0);
909  childSortNode.computeStats(ctx_.getRootAnalyzer());
910  exchNode.computeStats(ctx_.getRootAnalyzer());
911 
912  return mergeFragment;
913  }
914 }
PlanFragment createCrossJoinFragment(CrossJoinNode node, PlanFragment rightChildFragment, PlanFragment leftChildFragment, long perNodeMemLimit, ArrayList< PlanFragment > fragments)
PlanFragment createInsertFragment(PlanFragment inputFragment, InsertStmt insertStmt, Analyzer analyzer, ArrayList< PlanFragment > fragments)
PlanFragment createUnionNodeFragment(UnionNode unionNode, ArrayList< PlanFragment > childFragments, ArrayList< PlanFragment > fragments)
static final DataPartition UNPARTITIONED
void connectChildFragment(PlanNode node, int childIdx, PlanFragment childFragment)
DataPartition getCompatPartition(List< Expr > srcJoinExprs, DataPartition srcPartition, List< Expr > joinExprs, Analyzer analyzer)
boolean equivExprs(Expr e1, Expr e2)
Definition: Analyzer.java:1887
PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment rightChildFragment, PlanFragment leftChildFragment, long perNodeMemLimit, ArrayList< PlanFragment > fragments)
PlanFragment createPlanFragments(PlanNode root, boolean isPartitioned, long perNodeMemLimit, ArrayList< PlanFragment > fragments)
boolean isCompatPartition(DataPartition lhsPartition, DataPartition rhsPartition, List< Expr > lhsJoinExprs, List< Expr > rhsJoinExprs, Analyzer analyzer)
PlanFragment createOrderByFragment(SortNode node, PlanFragment childFragment, ArrayList< PlanFragment > fragments)
ArrayList< PlanFragment > createPlanFragments(PlanNode singleNodePlan)
static <C extends Expr> ArrayList<C> cloneList(Iterable<C> l)
Definition: Expr.java:798
PlanFragment createAggregationFragment(AggregationNode node, PlanFragment childFragment, ArrayList< PlanFragment > fragments)
PlanFragment createSelectNodeFragment(SelectNode selectNode, ArrayList< PlanFragment > childFragments)
uint8_t offset[7 *64-sizeof(uint64_t)]
PlanFragment createMergeFragment(PlanFragment inputFragment)
PlanFragment createParentFragment(PlanFragment childFragment, DataPartition parentPartition)
PlanFragment createAnalyticFragment(PlanNode node, PlanFragment childFragment, ArrayList< PlanFragment > fragments)
static <C extends Expr> boolean isSubset(List<C> l1, List<C> l2)
Definition: Expr.java:606