15 package com.cloudera.impala.planner;
17 import java.util.ArrayList;
18 import java.util.List;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
33 import com.cloudera.impala.thrift.TPartitionType;
34 import com.google.common.base.Preconditions;
35 import com.google.common.collect.Lists;
43 private final static Logger
LOG = LoggerFactory.getLogger(DistributedPlanner.class);
64 Preconditions.checkState(!ctx_.isSingleNodeExec());
65 AnalysisContext.AnalysisResult analysisResult = ctx_.getAnalysisResult();
66 QueryStmt queryStmt = ctx_.getQueryStmt();
67 ArrayList<PlanFragment> fragments = Lists.newArrayList();
71 boolean isPartitioned =
false;
72 if ((analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())
73 && !singleNodePlan.hasLimit()) {
74 Preconditions.checkState(!queryStmt.hasOffset());
77 LOG.debug(
"create plan fragments");
78 long perNodeMemLimit = ctx_.getQueryOptions().mem_limit;
79 LOG.debug(
"memlimit=" + Long.toString(perNodeMemLimit));
94 PlanNode root,
boolean isPartitioned,
95 long perNodeMemLimit, ArrayList<PlanFragment> fragments)
97 ArrayList<PlanFragment> childFragments = Lists.newArrayList();
98 for (
PlanNode child: root.getChildren()) {
102 boolean childIsPartitioned = !child.hasLimit();
105 child, childIsPartitioned, perNodeMemLimit, fragments));
111 fragments.add(result);
113 Preconditions.checkState(childFragments.size() == 2);
115 (HashJoinNode) root, childFragments.get(1), childFragments.get(0),
116 perNodeMemLimit, fragments);
118 Preconditions.checkState(childFragments.size() == 2);
120 (CrossJoinNode) root, childFragments.get(1), childFragments.get(0),
121 perNodeMemLimit, fragments);
128 (AggregationNode) root, childFragments.get(0), fragments);
129 }
else if (root instanceof
SortNode) {
130 if (((SortNode) root).isAnalyticSort()) {
133 (SortNode) root, childFragments.get(0), fragments);
136 (SortNode) root, childFragments.get(0), fragments);
145 "Cannot create plan fragment for this node type: " + root.getExplainString());
148 fragments.remove(result);
149 fragments.add(result);
153 fragments.add(result);
165 for (
Expr expr: exprs) {
166 result *= expr.getNumDistinctValues();
167 if (result < 0)
return -1;
183 ArrayList<PlanFragment> fragments)
185 List<Expr> partitionExprs = insertStmt.getPartitionKeyExprs();
186 Boolean partitionHint = insertStmt.isRepartition();
187 if (partitionExprs.isEmpty())
return inputFragment;
188 if (partitionHint != null && !partitionHint)
return inputFragment;
191 List<Expr> nonConstPartitionExprs = Lists.newArrayList(partitionExprs);
192 Expr.removeConstants(nonConstPartitionExprs);
193 DataPartition inputPartition = inputFragment.getDataPartition();
196 if (analyzer.equivSets(inputPartition.getPartitionExprs(),
197 nonConstPartitionExprs)) {
198 return inputFragment;
203 if (
Expr.
isSubset(inputPartition.getPartitionExprs(), nonConstPartitionExprs)) {
205 if (numPartitions >= inputFragment.getNumNodes())
return inputFragment;
219 Preconditions.checkState(inputFragment.getNumNodes() != -1);
220 if (partitionHint == null && numPartitions > 0 &&
221 numPartitions <= inputFragment.getNumNodes()) {
222 return inputFragment;
225 Preconditions.checkState(partitionHint == null || partitionHint);
227 exchNode.addChild(inputFragment.getPlanRoot(),
false);
228 exchNode.init(analyzer);
229 Preconditions.checkState(exchNode.hasValidStats());
231 new DataPartition(TPartitionType.HASH_PARTITIONED, nonConstPartitionExprs);
234 inputFragment.setDestination(exchNode);
235 inputFragment.setOutputPartition(partition);
236 fragments.add(fragment);
247 Preconditions.checkState(inputFragment.isPartitioned());
249 mergePlan.addChild(inputFragment.getPlanRoot(),
false);
250 mergePlan.init(ctx_.getRootAnalyzer());
251 Preconditions.checkState(mergePlan.hasValidStats());
254 inputFragment.setDestination(mergePlan);
275 long perNodeMemLimit, ArrayList<PlanFragment> fragments)
277 node.setChild(0, leftChildFragment.getPlanRoot());
279 leftChildFragment.setPlanRoot(node);
280 return leftChildFragment;
297 ArrayList<PlanFragment> fragments)
302 Analyzer analyzer = ctx_.getRootAnalyzer();
303 PlanNode rhsTree = rightChildFragment.getPlanRoot();
304 long rhsDataSize = 0;
305 long broadcastCost = Long.MAX_VALUE;
306 if (rhsTree.getCardinality() != -1 && leftChildFragment.getNumNodes() != -1) {
307 rhsDataSize = Math.round(
308 (double) rhsTree.getCardinality() * rhsTree.getAvgRowSize());
309 broadcastCost = rhsDataSize * leftChildFragment.getNumNodes();
311 LOG.debug(
"broadcast: cost=" + Long.toString(broadcastCost));
312 LOG.debug(
"card=" + Long.toString(rhsTree.getCardinality()) +
" row_size="
313 + Float.toString(rhsTree.getAvgRowSize()) +
" #nodes="
314 + Integer.toString(leftChildFragment.getNumNodes()));
318 PlanNode lhsTree = leftChildFragment.getPlanRoot();
319 long partitionCost = Long.MAX_VALUE;
320 List<Expr> lhsJoinExprs = Lists.newArrayList();
321 List<Expr> rhsJoinExprs = Lists.newArrayList();
322 for (
Expr joinConjunct: node.getEqJoinConjuncts()) {
324 lhsJoinExprs.add(joinConjunct.getChild(0).clone());
325 rhsJoinExprs.add(joinConjunct.getChild(1).clone());
327 boolean lhsHasCompatPartition =
false;
328 boolean rhsHasCompatPartition =
false;
329 if (lhsTree.getCardinality() != -1 && rhsTree.getCardinality() != -1) {
330 lhsHasCompatPartition = analyzer.equivSets(lhsJoinExprs,
331 leftChildFragment.getDataPartition().getPartitionExprs());
332 rhsHasCompatPartition = analyzer.equivSets(rhsJoinExprs,
333 rightChildFragment.getDataPartition().getPartitionExprs());
335 double lhsCost = (lhsHasCompatPartition) ? 0.0 :
336 Math.round((
double) lhsTree.getCardinality() * lhsTree.getAvgRowSize());
337 double rhsCost = (rhsHasCompatPartition) ? 0.0 :
338 Math.round((
double) rhsTree.getCardinality() * rhsTree.getAvgRowSize());
339 partitionCost = Math.round(lhsCost + rhsCost);
341 LOG.debug(
"partition: cost=" + Long.toString(partitionCost));
342 LOG.debug(
"lhs card=" + Long.toString(lhsTree.getCardinality()) +
" row_size="
343 + Float.toString(lhsTree.getAvgRowSize()));
344 LOG.debug(
"rhs card=" + Long.toString(rhsTree.getCardinality()) +
" row_size="
345 + Float.toString(rhsTree.getAvgRowSize()));
346 LOG.debug(rhsTree.getExplainString());
359 if ((node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN
361 && node.getJoinOp() != JoinOperator.RIGHT_SEMI_JOIN
363 && (perNodeMemLimit == 0
366 && (node.getTableRef().isBroadcastJoin()
367 || (!node.getTableRef().isPartitionedJoin()
368 && broadcastCost <= partitionCost)))
369 || node.getJoinOp().isNullAwareLeftAntiJoin()) {
376 node.setDistributionMode(HashJoinNode.DistributionMode.BROADCAST);
380 node.setChild(0, leftChildFragment.getPlanRoot());
382 leftChildFragment.setPlanRoot(node);
383 return leftChildFragment;
385 node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);
395 if (lhsHasCompatPartition
396 && rhsHasCompatPartition
398 leftChildFragment.getDataPartition(),
399 rightChildFragment.getDataPartition(),
400 lhsJoinExprs, rhsJoinExprs, analyzer)) {
401 node.setChild(0, leftChildFragment.getPlanRoot());
402 node.setChild(1, rightChildFragment.getPlanRoot());
405 if (fragment.getDestFragment() == rightChildFragment) {
406 fragment.setDestination(fragment.getDestNode());
410 fragments.remove(rightChildFragment);
411 leftChildFragment.setPlanRoot(node);
412 return leftChildFragment;
421 if (lhsHasCompatPartition) {
423 leftChildFragment.getDataPartition(), rhsJoinExprs, analyzer);
424 if (rhsJoinPartition != null) {
425 node.setChild(0, leftChildFragment.getPlanRoot());
427 rightChildFragment.setOutputPartition(rhsJoinPartition);
428 leftChildFragment.setPlanRoot(node);
429 return leftChildFragment;
435 if (rhsHasCompatPartition) {
437 rightChildFragment.getDataPartition(), lhsJoinExprs, analyzer);
438 if (lhsJoinPartition != null) {
439 node.setChild(1, rightChildFragment.getPlanRoot());
441 leftChildFragment.setOutputPartition(lhsJoinPartition);
442 rightChildFragment.setPlanRoot(node);
443 return rightChildFragment;
447 Preconditions.checkState(lhsJoinPartition == null);
448 Preconditions.checkState(rhsJoinPartition == null);
449 lhsJoinPartition =
new DataPartition(TPartitionType.HASH_PARTITIONED,
451 rhsJoinPartition =
new DataPartition(TPartitionType.HASH_PARTITIONED,
461 lhsExchange.addChild(leftChildFragment.getPlanRoot(),
false);
462 lhsExchange.computeStats(null);
463 node.setChild(0, lhsExchange);
465 rhsExchange.addChild(rightChildFragment.getPlanRoot(),
false);
466 rhsExchange.computeStats(null);
467 node.setChild(1, rhsExchange);
473 leftChildFragment.setDestination(lhsExchange);
474 leftChildFragment.setOutputPartition(lhsJoinPartition);
475 rightChildFragment.setDestination(rhsExchange);
476 rightChildFragment.setOutputPartition(rhsJoinPartition);
499 DataPartition rhsPartition, List<Expr> lhsJoinExprs, List<Expr> rhsJoinExprs,
501 List<Expr> lhsPartExprs = lhsPartition.getPartitionExprs();
502 List<Expr> rhsPartExprs = rhsPartition.getPartitionExprs();
504 if (lhsPartExprs.size() != rhsPartExprs.size())
return false;
506 Preconditions.checkState(lhsJoinExprs.size() == rhsJoinExprs.size());
507 if (lhsJoinExprs.size() == lhsPartExprs.size()) {
508 if (lhsJoinExprs.equals(lhsPartExprs) && rhsJoinExprs.equals(rhsPartExprs)) {
514 for (
int i = 0; i < lhsPartExprs.size(); ++i) {
515 if (!analyzer.
equivExprs(lhsPartExprs.get(i), rhsPartExprs.get(i)))
return false;
539 Preconditions.checkState(srcPartition.isHashPartitioned());
540 List<Expr> srcPartExprs = srcPartition.getPartitionExprs();
541 List<Expr> resultPartExprs = Lists.newArrayList();
542 for (
int i = 0; i < srcPartExprs.size(); ++i) {
543 for (
int j = 0; j < srcJoinExprs.size(); ++j) {
544 if (analyzer.
equivExprs(srcPartExprs.get(i), srcJoinExprs.get(j))) {
545 resultPartExprs.add(joinExprs.get(j).clone());
550 if (resultPartExprs.size() != srcPartExprs.size())
return null;
551 return new DataPartition(TPartitionType.HASH_PARTITIONED, resultPartExprs);
567 ArrayList<PlanFragment> childFragments, ArrayList<PlanFragment> fragments)
569 Preconditions.checkState(unionNode.getChildren().size() == childFragments.size());
573 if (unionNode.getChildren().isEmpty()) {
578 Preconditions.checkState(!childFragments.isEmpty());
579 int numUnpartitionedChildFragments = 0;
580 for (
int i = 0; i < childFragments.size(); ++i) {
581 if (!childFragments.get(i).isPartitioned()) ++numUnpartitionedChildFragments;
586 if (numUnpartitionedChildFragments == childFragments.size()) {
588 for (
int i = 0; i < childFragments.size(); ++i) {
589 unionNode.setChild(i, childFragments.get(i).getPlanRoot());
593 unionNode.init(ctx_.getRootAnalyzer());
595 fragments.removeAll(childFragments);
596 return unionFragment;
600 for (
int i = 0; i < childFragments.size(); ++i) {
604 unionNode.setChild(i, childFragment.getPlanRoot());
605 fragments.remove(childFragment);
616 unionNode.reorderOperands(ctx_.getRootAnalyzer());
617 unionNode.init(ctx_.getRootAnalyzer());
618 return unionFragment;
626 ArrayList<PlanFragment> childFragments) {
627 Preconditions.checkState(selectNode.getChildren().size() == childFragments.size());
631 selectNode.setChild(0, childFragment.getPlanRoot());
632 childFragment.setPlanRoot(selectNode);
633 return childFragment;
643 exchangeNode.addChild(childFragment.getPlanRoot(),
false);
644 exchangeNode.init(ctx_.getRootAnalyzer());
645 node.setChild(childIdx, exchangeNode);
646 childFragment.setDestination(exchangeNode);
663 exchangeNode.addChild(childFragment.getPlanRoot(),
false);
664 exchangeNode.init(ctx_.getRootAnalyzer());
666 exchangeNode, parentPartition);
667 childFragment.setDestination(exchangeNode);
668 childFragment.setOutputPartition(parentPartition);
669 return parentFragment;
682 PlanFragment childFragment, ArrayList<PlanFragment> fragments)
684 if (!childFragment.isPartitioned()) {
686 childFragment.addPlanRoot(node);
687 return childFragment;
690 if (node.getAggInfo().isDistinctAgg()) {
694 childFragment.addPlanRoot(node);
695 return childFragment;
698 ArrayList<Expr> groupingExprs = node.getAggInfo().getGroupingExprs();
699 boolean hasGrouping = !groupingExprs.isEmpty();
709 childFragment.addPlanRoot(node);
710 node.setIntermediateTuple();
714 long limit = node.getLimit();
716 node.unsetNeedsFinalize();
722 List<Expr> partitionExprs = node.getAggInfo().getPartitionExprs();
723 if (partitionExprs == null) partitionExprs = groupingExprs;
724 partitionExprs = Expr.substituteList(partitionExprs,
725 node.getAggInfo().getIntermediateSmap(), ctx_.getRootAnalyzer(),
false);
727 new DataPartition(TPartitionType.HASH_PARTITIONED, partitionExprs);
737 node.getAggInfo().getMergeAggInfo());
738 mergeAggNode.init(ctx_.getRootAnalyzer());
739 mergeAggNode.setLimit(limit);
742 node.transferConjuncts(mergeAggNode);
744 node.computeStats(ctx_.getRootAnalyzer());
746 mergeAggNode.computeStats(ctx_.getRootAnalyzer());
748 mergeFragment.addPlanRoot(mergeAggNode);
750 return mergeFragment;
753 Preconditions.checkState(isDistinct);
755 Preconditions.checkState(node.getChild(0) == childFragment.getPlanRoot());
758 List<Expr> partitionExprs = null;
769 partitionExprs = Expr.substituteList(
770 groupingExprs, firstPhaseAggInfo.getOutputToIntermediateSmap(),
781 partitionExprs = Expr.substituteList(firstPhaseAggInfo.getGroupingExprs(),
782 firstPhaseAggInfo.getIntermediateSmap(), ctx_.getRootAnalyzer(),
false);
785 new DataPartition(TPartitionType.HASH_PARTITIONED, partitionExprs);
789 AggregateInfo mergeAggInfo = firstPhaseAggInfo.getMergeAggInfo();
792 mergeAggNode.init(ctx_.getRootAnalyzer());
793 mergeAggNode.unsetNeedsFinalize();
794 mergeAggNode.setIntermediateTuple();
795 mergeFragment.addPlanRoot(mergeAggNode);
799 mergeFragment.addPlanRoot(node);
804 fragments.add(mergeFragment);
806 node.unsetNeedsFinalize();
807 node.setIntermediateTuple();
809 long limit = node.getLimit();
812 mergeAggInfo = node.getAggInfo().getMergeAggInfo();
815 mergeAggNode.init(ctx_.getRootAnalyzer());
818 node.transferConjuncts(mergeAggNode);
819 mergeAggNode.setLimit(limit);
820 mergeFragment.addPlanRoot(mergeAggNode);
822 return mergeFragment;
833 PlanFragment childFragment, ArrayList<PlanFragment> fragments)
835 Preconditions.checkState(
838 AnalyticEvalNode analyticNode = (AnalyticEvalNode) node;
844 if (childFragment.isPartitioned()) {
847 fragment.addPlanRoot(analyticNode);
850 childFragment.addPlanRoot(analyticNode);
851 return childFragment;
856 Preconditions.checkState(sortNode.isAnalyticSort());
860 sortNode.getInputPartition().substitute(
862 if (!childFragment.getDataPartition().equals(sortNode.getInputPartition())) {
867 analyticFragment.addPlanRoot(sortNode);
868 return analyticFragment;
879 PlanFragment childFragment, ArrayList<PlanFragment> fragments)
881 node.setChild(0, childFragment.getPlanRoot());
882 childFragment.addPlanRoot(node);
883 if (!childFragment.isPartitioned())
return childFragment;
886 boolean hasLimit = node.hasLimit();
887 long limit = node.getLimit();
888 long offset = node.getOffset();
896 exchNode.unsetLimit();
897 if (hasLimit) exchNode.setLimit(limit);
898 exchNode.setMergeInfo(node.getSortInfo(), offset);
903 Preconditions.checkState(node == childSortNode);
905 childSortNode.unsetLimit();
906 childSortNode.setLimit(limit +
offset);
908 childSortNode.setOffset(0);
909 childSortNode.computeStats(ctx_.getRootAnalyzer());
910 exchNode.computeStats(ctx_.getRootAnalyzer());
912 return mergeFragment;
final PlannerContext ctx_
List< Expr > getPartitionExprs()
PlanFragment createCrossJoinFragment(CrossJoinNode node, PlanFragment rightChildFragment, PlanFragment leftChildFragment, long perNodeMemLimit, ArrayList< PlanFragment > fragments)
PlanFragment createInsertFragment(PlanFragment inputFragment, InsertStmt insertStmt, Analyzer analyzer, ArrayList< PlanFragment > fragments)
PlanFragment createUnionNodeFragment(UnionNode unionNode, ArrayList< PlanFragment > childFragments, ArrayList< PlanFragment > fragments)
static final DataPartition UNPARTITIONED
void connectChildFragment(PlanNode node, int childIdx, PlanFragment childFragment)
DataPartition getInputPartition()
long getNumDistinctValues(List< Expr > exprs)
List< OrderByElement > getOrderByElements()
PlanFragmentId getNextFragmentId()
DataPartition getCompatPartition(List< Expr > srcJoinExprs, DataPartition srcPartition, List< Expr > joinExprs, Analyzer analyzer)
boolean equivExprs(Expr e1, Expr e2)
PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment rightChildFragment, PlanFragment leftChildFragment, long perNodeMemLimit, ArrayList< PlanFragment > fragments)
PlanFragment createScanFragment(PlanNode node)
Analyzer getRootAnalyzer()
PlanFragment createPlanFragments(PlanNode root, boolean isPartitioned, long perNodeMemLimit, ArrayList< PlanFragment > fragments)
boolean isCompatPartition(DataPartition lhsPartition, DataPartition rhsPartition, List< Expr > lhsJoinExprs, List< Expr > rhsJoinExprs, Analyzer analyzer)
PlanFragment createOrderByFragment(SortNode node, PlanFragment childFragment, ArrayList< PlanFragment > fragments)
ArrayList< PlanFragment > createPlanFragments(PlanNode singleNodePlan)
static< CextendsExpr > ArrayList< C > cloneList(Iterable< C > l)
PlanFragment createAggregationFragment(AggregationNode node, PlanFragment childFragment, ArrayList< PlanFragment > fragments)
PlanFragment createSelectNodeFragment(SelectNode selectNode, ArrayList< PlanFragment > childFragments)
PlanNodeId getNextNodeId()
uint8_t offset[7 *64-sizeof(uint64_t)]
PlanFragment createMergeFragment(PlanFragment inputFragment)
static final double HASH_TBL_SPACE_OVERHEAD
static final DataPartition RANDOM
DistributedPlanner(PlannerContext ctx)
PlanFragment createParentFragment(PlanFragment childFragment, DataPartition parentPartition)
PlanFragment createAnalyticFragment(PlanNode node, PlanFragment childFragment, ArrayList< PlanFragment > fragments)
static< CextendsExpr > boolean isSubset(List< C > l1, List< C > l2)