Impala
Impala is the open source, native analytic database for Apache Hadoop.
PlanFragment.java
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.cloudera.impala.planner;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudera.impala.analysis.Analyzer;
import com.cloudera.impala.analysis.BinaryPredicate;
import com.cloudera.impala.analysis.Expr;
import com.cloudera.impala.analysis.JoinOperator;
import com.cloudera.impala.analysis.SlotRef;
import com.cloudera.impala.catalog.HdfsFileFormat;
import com.cloudera.impala.catalog.HdfsTable;
import com.cloudera.impala.common.AnalysisException;
import com.cloudera.impala.planner.HashJoinNode.DistributionMode;
import com.cloudera.impala.thrift.TExplainLevel;
import com.cloudera.impala.thrift.TPartitionType;
import com.cloudera.impala.thrift.TPlanFragment;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Lists;

/**
 * A PlanFragment is part of a tree of fragments that together make up a complete
 * execution plan. Each fragment is executed by one or more instances, and its output
 * is either streamed to a destination exchange node or written out via a data sink.
 */
public class PlanFragment {
  private final static Logger LOG = LoggerFactory.getLogger(PlanFragment.class);

  private final PlanFragmentId fragmentId_;
  // root of plan tree executed by this fragment
  private PlanNode planRoot_;

  // exchange node to which this fragment sends its output
  private ExchangeNode destNode_;

  // if null, outputs the entire row produced by planRoot_
  private List<Expr> outputExprs_;

  // created in finalize() or set in setSink()
  private DataSink sink_;

  // specification of the partition of the input of this fragment;
  // an UNPARTITIONED fragment is executed on only a single node
  // TODO: improve this comment, "input" is a bit misleading
  private DataPartition dataPartition_;

  // specification of how the output of this fragment is partitioned (i.e., how
  // it's sent to its destination);
  // if the output is UNPARTITIONED, it is being broadcast
  private DataPartition outputPartition_;

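  // Illustrative examples of these partitions (not from the original source): a scan
  // fragment is typically RANDOM-partitioned across data nodes, a merge aggregation
  // fragment is HASH-partitioned on the grouping exprs, and the coordinator fragment
  // is UNPARTITIONED (a single instance).
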
  public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) {
    fragmentId_ = id;
    planRoot_ = root;
    dataPartition_ = partition;
    outputPartition_ = DataPartition.UNPARTITIONED;
    setFragmentInPlanTree(planRoot_);
  }

  /**
   * Sets this fragment as the fragment of all plan nodes in the tree rooted at node.
   * Does not traverse the children of ExchangeNodes, because those belong to a
   * different (sender) fragment.
   */
  private void setFragmentInPlanTree(PlanNode node) {
    if (node == null) return;
    node.setFragment(this);
    if (!(node instanceof ExchangeNode)) {
      for (PlanNode child : node.getChildren()) {
        setFragmentInPlanTree(child);
      }
    }
  }

  public void setOutputExprs(List<Expr> outputExprs) {
    outputExprs_ = Expr.cloneList(outputExprs);
  }
  public List<Expr> getOutputExprs() { return outputExprs_; }

  /**
   * Finalizes the plan tree and creates a stream sink if needed. If this fragment
   * is hash partitioned, ensures that all hash-partitioning senders hash on
   * partition exprs of compatible types.
   */
  public void finalize(Analyzer analyzer) {
    if (destNode_ != null) {
      Preconditions.checkState(sink_ == null);
      // we're streaming to an exchange node
      DataStreamSink streamSink = new DataStreamSink(destNode_, outputPartition_);
      streamSink.setFragment(this);
      sink_ = streamSink;
    }

    if (!dataPartition_.isHashPartitioned()) return;

    // This fragment is hash partitioned. Gather all exchange nodes and ensure
    // that all hash-partitioning senders hash on partition exprs of the same type.
    List<ExchangeNode> exchNodes = Lists.newArrayList();
    planRoot_.collect(Predicates.instanceOf(ExchangeNode.class), exchNodes);

    // Contains partition-expr lists of all hash-partitioning sender fragments.
    List<List<Expr>> senderPartitionExprs = Lists.newArrayList();
    for (ExchangeNode exchNode: exchNodes) {
      Preconditions.checkState(!exchNode.getChildren().isEmpty());
      PlanFragment senderFragment = exchNode.getChild(0).getFragment();
      Preconditions.checkNotNull(senderFragment);
      if (!senderFragment.getOutputPartition().isHashPartitioned()) continue;
      List<Expr> partExprs = senderFragment.getOutputPartition().getPartitionExprs();
      // All hash-partitioning senders must have the same number of partition exprs
      // as this fragment's data partition; otherwise this fragment's data partition
      // should not have been marked as hash partitioned.
      Preconditions.checkState(
          partExprs.size() == dataPartition_.getPartitionExprs().size());
      senderPartitionExprs.add(partExprs);
    }

    // Cast all corresponding hash partition exprs of all hash-partitioning senders
    // to their compatible types. Also cast the data partition's exprs for consistency,
    // although that is not strictly necessary: they should already be type-identical
    // to the exprs of one of the senders, and they are not directly used for hashing
    // in the backend.
    senderPartitionExprs.add(dataPartition_.getPartitionExprs());
    try {
      analyzer.castToUnionCompatibleTypes(senderPartitionExprs);
    } catch (AnalysisException e) {
      // Should never happen. Analysis should have ensured type compatibility already.
      throw new IllegalStateException(e);
    }
  }
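  // Why the casts matter (illustrative, not from the original source): if one sender
  // partitions on a SMALLINT expr and another on an INT expr feeding the same
  // hash-partitioned fragment, hashing raw slot bytes of different widths could route
  // rows with identical logical values to different instances. See the standalone
  // sketch after this class for a concrete byte-width analogy.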

  /**
   * Returns the number of nodes on which the plan tree rooted at planRoot_ executes.
   * An UNPARTITIONED fragment executes on a single node.
   */
  public int getNumNodes() {
    return dataPartition_ == DataPartition.UNPARTITIONED ? 1 : planRoot_.getNumNodes();
  }

  /**
   * Returns true if the plan tree rooted at node can apply the slot-filter
   * optimization, i.e., probe-side filters generated from hash-join build sides.
   * Marks qualifying hash joins via setAddProbeFilters(). Filters are only generated
   * for broadcast joins over Parquet-majority scans, and are not pushed through the
   * probe side of outer or anti joins.
   */
  private boolean computeCanAddSlotFilters(PlanNode node) {
    if (node instanceof HashJoinNode) {
      HashJoinNode hashJoinNode = (HashJoinNode) node;
      boolean childResult = computeCanAddSlotFilters(node.getChild(0));
      if (!childResult) return false;
      if (hashJoinNode.getJoinOp().equals(JoinOperator.FULL_OUTER_JOIN) ||
          hashJoinNode.getJoinOp().equals(JoinOperator.LEFT_OUTER_JOIN) ||
          hashJoinNode.getJoinOp().equals(JoinOperator.LEFT_ANTI_JOIN) ||
          hashJoinNode.getJoinOp().equals(JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN)) {
        // It is not correct to push through an outer or anti join on the probe side:
        // we cannot filter those rows out.
        return false;
      }
      // We can't push down predicates for partitioned joins yet.
      // TODO: this can be hugely helpful to avoid network traffic. Implement this.
      if (hashJoinNode.getDistributionMode() == DistributionMode.PARTITIONED) {
        return false;
      }

      List<BinaryPredicate> joinConjuncts = hashJoinNode.getEqJoinConjuncts();
      // We can only add these filters for conjuncts of the form
      // <probe_slot> = *. If the hash join has any equi-join conjuncts in this form,
      // mark the hash join node.
      for (Expr c: joinConjuncts) {
        if (c.getChild(0) instanceof SlotRef) {
          hashJoinNode.setAddProbeFilters(true);
          break;
        }
      }
      // Even if this join cannot add predicates, return true so the parent node can.
      return true;
    } else if (node instanceof HdfsScanNode) {
      // Since currently only the Parquet scanner employs the slot filter optimization,
      // we enable it only if the majority format is Parquet. Otherwise we would add
      // the overhead of creating the SlotFilters on the build side in queries not on
      // Parquet data.
      // TODO: Modify the other scanners to exploit the slot filter optimization.
      HdfsScanNode scanNode = (HdfsScanNode) node;
      Preconditions.checkNotNull(scanNode.desc_);
      Preconditions.checkState(scanNode.desc_.getTable() instanceof HdfsTable);
      HdfsTable table = (HdfsTable) scanNode.desc_.getTable();
      return table.getMajorityFormat() == HdfsFileFormat.PARQUET;
    } else {
      for (PlanNode child : node.getChildren()) {
        computeCanAddSlotFilters(child);
      }
      return false;
    }
  }
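  // Illustrative example (hypothetical query, not from the original source): for
  // "SELECT ... FROM probe JOIN build ON probe.id = build.id", the eq-join conjunct
  // has a SlotRef (probe.id) as child 0, so the join can publish the build-side
  // values of build.id as a filter applied while scanning probe.id.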

  /**
   * Estimates the number of distinct values of exprs per instance of this fragment,
   * based on the fragment's data partition and its number of nodes. Returns -1 if
   * the estimate is unavailable for one of the exprs, and 0 for empty tables.
   */
  public long getNumDistinctValues(List<Expr> exprs) {
    Preconditions.checkNotNull(dataPartition_);
    long result = 1;
    int numNodes = getNumNodes();
    Preconditions.checkState(numNodes >= 0);
    // The number of nodes is zero for empty tables.
    if (numNodes == 0) return 0;
    for (Expr expr: exprs) {
      long numDistinct = expr.getNumDistinctValues();
      if (numDistinct == -1) {
        result = -1;
        break;
      }
      if (dataPartition_.getPartitionExprs().contains(expr)) {
        // A partition expr's distinct values are spread across the fragment's nodes.
        numDistinct = (long) Math.max((double) numDistinct / (double) numNodes, 1L);
      }
      result = PlanNode.multiplyCardinalities(result, numDistinct);
    }
    return result;
  }
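  // Worked example (illustrative numbers): with 10 nodes and exprs {a, b}, where a is
  // a hash partition expr with 1000 distinct values and b has 50, each node sees about
  // max(1000 / 10, 1) = 100 distinct values of a, so the per-instance estimate is
  // 100 * 50 = 5000.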

  public TPlanFragment toThrift() {
    TPlanFragment result = new TPlanFragment();
    result.setDisplay_name(fragmentId_.toString());
    if (planRoot_ != null) result.setPlan(planRoot_.treeToThrift());
    if (outputExprs_ != null) {
      result.setOutput_exprs(Expr.treesToThrift(outputExprs_));
    }
    if (sink_ != null) result.setOutput_sink(sink_.toThrift());
    result.setPartition(dataPartition_.toThrift());
    return result;
  }

  public String getExplainString(TExplainLevel explainLevel) {
    StringBuilder str = new StringBuilder();
    Preconditions.checkState(dataPartition_ != null);
    String rootPrefix = "";
    String prefix = "";
    String detailPrefix = "|  ";
    if (explainLevel == TExplainLevel.VERBOSE) {
      prefix = "  ";
      rootPrefix = "  ";
      detailPrefix = prefix + "|  ";
      str.append(String.format("%s:PLAN FRAGMENT [%s]\n", fragmentId_.toString(),
          dataPartition_.getExplainString()));
      if (sink_ != null && sink_ instanceof DataStreamSink) {
        str.append(sink_.getExplainString(prefix, detailPrefix, explainLevel) + "\n");
      }
    }
    // Always print table sinks.
    if (sink_ != null && sink_ instanceof TableSink) {
      str.append(sink_.getExplainString(prefix, detailPrefix, explainLevel));
      if (explainLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
        str.append(prefix + "|\n");
      }
    }
    if (planRoot_ != null) {
      str.append(planRoot_.getExplainString(rootPrefix, prefix, explainLevel));
    }
    return str.toString();
  }

  /** Returns true if this fragment's data partition is not UNPARTITIONED. */
  public boolean isPartitioned() {
    return (dataPartition_.getType() != TPartitionType.UNPARTITIONED);
  }

  public PlanFragmentId getId() { return fragmentId_; }
  public PlanFragment getDestFragment() {
    if (destNode_ == null) return null;
    return destNode_.getFragment();
  }
  public ExchangeNode getDestNode() { return destNode_; }
  public DataPartition getDataPartition() { return dataPartition_; }
  public void setDataPartition(DataPartition dataPartition) {
    this.dataPartition_ = dataPartition;
  }
  public DataPartition getOutputPartition() { return outputPartition_; }
  public void setOutputPartition(DataPartition outputPartition) {
    this.outputPartition_ = outputPartition;
  }
  public PlanNode getPlanRoot() { return planRoot_; }
  public void setPlanRoot(PlanNode root) {
    planRoot_ = root;
    setFragmentInPlanTree(planRoot_);
  }

  public void setDestination(ExchangeNode destNode) { destNode_ = destNode; }
  public boolean hasSink() { return sink_ != null; }
  public DataSink getSink() { return sink_; }
  public void setSink(DataSink sink) {
    Preconditions.checkState(this.sink_ == null);
    Preconditions.checkNotNull(sink);
    sink.setFragment(this);
    this.sink_ = sink;
  }

  /**
   * Adds a node as the new root of the plan tree. Connects the existing root as the
   * child of newRoot.
   */
  public void addPlanRoot(PlanNode newRoot) {
    Preconditions.checkState(newRoot.getChildren().size() == 1);
    newRoot.setChild(0, planRoot_);
    planRoot_ = newRoot;
    planRoot_.setFragment(this);
  }
}
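
To make the type-compatibility concern in finalize() concrete, here is a minimal, self-contained sketch. It is not Impala code: it merely shows, by analogy, that hashing the raw bytes of the same logical value stored in slots of different widths yields different hashes, which is why all hash-partitioning senders must cast their partition exprs to one common type.

import java.nio.ByteBuffer;
import java.util.Arrays;

// Standalone analogy (hypothetical, not part of Impala): if a backend hashes the raw
// bytes of each partition-expr slot, the slot's physical width affects the hash.
public class SlotWidthHashDemo {
  public static void main(String[] args) {
    // The same logical value, 42, materialized as a 4-byte INT slot by one sender
    // and as an 8-byte BIGINT slot by another.
    byte[] intSlot = ByteBuffer.allocate(4).putInt(42).array();
    byte[] bigintSlot = ByteBuffer.allocate(8).putLong(42L).array();

    // Hashing the raw bytes gives different results for the two widths, so rows
    // with identical values could be routed to different fragment instances.
    System.out.println(Arrays.hashCode(intSlot));     // 923563
    System.out.println(Arrays.hashCode(bigintSlot));  // a different value

    // Casting both senders' partition exprs to a common type (what finalize() does)
    // makes the materialized bytes, and hence the hashes, agree.
    byte[] castSlot = ByteBuffer.allocate(8).putLong(42L).array();
    System.out.println(Arrays.hashCode(castSlot) == Arrays.hashCode(bigintSlot)); // true
  }
}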