Impala
Impala is the open source, native analytic database for Apache Hadoop.
HashJoinNode.java
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.cloudera.impala.planner;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// NOTE: the analysis/catalog/common imports below are reconstructed from the
// classes referenced in this file; the original import block is not fully shown.
import com.cloudera.impala.analysis.Analyzer;
import com.cloudera.impala.analysis.BinaryPredicate;
import com.cloudera.impala.analysis.Expr;
import com.cloudera.impala.analysis.ExprSubstitutionMap;
import com.cloudera.impala.analysis.JoinOperator;
import com.cloudera.impala.analysis.SlotDescriptor;
import com.cloudera.impala.analysis.SlotRef;
import com.cloudera.impala.analysis.TableRef;
import com.cloudera.impala.catalog.ColumnStats;
import com.cloudera.impala.catalog.Table;
import com.cloudera.impala.catalog.Type;
import com.cloudera.impala.common.AnalysisException;
import com.cloudera.impala.common.InternalException;
import com.cloudera.impala.thrift.TEqJoinCondition;
import com.cloudera.impala.thrift.TExplainLevel;
import com.cloudera.impala.thrift.THashJoinNode;
import com.cloudera.impala.thrift.TPlanNode;
import com.cloudera.impala.thrift.TPlanNodeType;
import com.cloudera.impala.thrift.TQueryOptions;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

/**
 * Hash join between the left child (probe side) and the right child (build side),
 * evaluated over the equi-join conjuncts plus any other join conjuncts.
 */
public class HashJoinNode extends PlanNode {
  private final static Logger LOG = LoggerFactory.getLogger(HashJoinNode.class);

  // Default per-host memory requirement used if no valid stats are available.
  // TODO: Come up with a more useful heuristic (e.g., based on scanned partitions).
  private final static long DEFAULT_PER_HOST_MEM = 2L * 1024L * 1024L * 1024L;

  // tableRef corresponding to the left or right child of this join; only used for
  // getting the plan hints of this join, so it's irrelevant which child exactly.
  private final TableRef tblRef_;
  private final JoinOperator joinOp_;

  // How the join is executed in a distributed plan: BROADCAST replicates the build
  // side to every node, PARTITIONED hash-partitions both inputs on the join exprs.
  public enum DistributionMode {
    NONE("NONE"),
    BROADCAST("BROADCAST"),
    PARTITIONED("PARTITIONED");

    private final String description;

    private DistributionMode(String descr) {
      this.description = descr;
    }

    @Override
    public String toString() { return description; }
  }

  // Distribution mode of this join; NONE until the distributed planner picks one.
  private DistributionMode distrMode_;
  // conjuncts_ of the form "<lhs> = <rhs>"
  private List<BinaryPredicate> eqJoinConjuncts_;

  // join conjuncts_ from the JOIN clause that aren't equi-join predicates
  private List<Expr> otherJoinConjuncts_;

  // If true, this node can add filters for the probe side that can be generated
  // after reading the build side. This can be very helpful if the join is selective
  // and there are few build rows.
  private boolean addProbeFilters_;
  public HashJoinNode(
      PlanNode outer, PlanNode inner, TableRef tblRef,
      List<BinaryPredicate> eqJoinConjuncts, List<Expr> otherJoinConjuncts) {
    super("HASH JOIN");
    Preconditions.checkArgument(eqJoinConjuncts != null);
    Preconditions.checkArgument(otherJoinConjuncts != null);
    tblRef_ = tblRef;
    joinOp_ = tblRef.getJoinOp();

    // Only retain the non-semi-joined tuples of the inputs.
    switch (joinOp_) {
      case LEFT_ANTI_JOIN:
      case LEFT_SEMI_JOIN:
      case NULL_AWARE_LEFT_ANTI_JOIN: {
        tupleIds_.addAll(outer.getTupleIds());
        break;
      }
      case RIGHT_ANTI_JOIN:
      case RIGHT_SEMI_JOIN: {
        tupleIds_.addAll(inner.getTupleIds());
        break;
      }
      default: {
        tupleIds_.addAll(outer.getTupleIds());
        tupleIds_.addAll(inner.getTupleIds());
        break;
      }
    }
    tblRefIds_.addAll(outer.getTblRefIds());
    tblRefIds_.addAll(inner.getTblRefIds());

    distrMode_ = DistributionMode.NONE;
    eqJoinConjuncts_ = eqJoinConjuncts;
    otherJoinConjuncts_ = otherJoinConjuncts;
    children_.add(outer);
    children_.add(inner);

    // Inherit all the nullable tuples from the children.
    // Mark tuples that form the "nullable" side of the outer join as nullable.
    nullableTupleIds_.addAll(inner.getNullableTupleIds());
    nullableTupleIds_.addAll(outer.getNullableTupleIds());
    if (joinOp_.equals(JoinOperator.FULL_OUTER_JOIN)) {
      nullableTupleIds_.addAll(outer.getTupleIds());
      nullableTupleIds_.addAll(inner.getTupleIds());
    } else if (joinOp_.equals(JoinOperator.LEFT_OUTER_JOIN)) {
      nullableTupleIds_.addAll(inner.getTupleIds());
    } else if (joinOp_.equals(JoinOperator.RIGHT_OUTER_JOIN)) {
      nullableTupleIds_.addAll(outer.getTupleIds());
    }
  }

  public List<BinaryPredicate> getEqJoinConjuncts() { return eqJoinConjuncts_; }
  public JoinOperator getJoinOp() { return joinOp_; }
  public TableRef getTableRef() { return tblRef_; }
  public void setDistributionMode(DistributionMode distrMode) { distrMode_ = distrMode; }
  public void setAddProbeFilters(boolean b) { addProbeFilters_ = b; }

  @Override
  public void init(Analyzer analyzer) throws InternalException {
    assignConjuncts(analyzer);

    // Set smap to the combined children's smaps and apply that to all conjuncts_.
    createDefaultSmap(analyzer);

    computeStats(analyzer);
    assignedConjuncts_ = analyzer.getAssignedConjuncts();

    ExprSubstitutionMap combinedChildSmap = getCombinedChildSmap();
    otherJoinConjuncts_ =
        Expr.substituteList(otherJoinConjuncts_, combinedChildSmap, analyzer, false);

    List<BinaryPredicate> newEqJoinConjuncts = Lists.newArrayList();
    for (Expr c: eqJoinConjuncts_) {
      BinaryPredicate eqPred =
          (BinaryPredicate) c.substitute(combinedChildSmap, analyzer, false);
      Type t0 = eqPred.getChild(0).getType();
      Type t1 = eqPred.getChild(1).getType();
      if (!t0.matchesType(t1)) {
        // With decimal and char types, the child types do not have to match because
        // the equality builtin handles it. However, they will not hash correctly so
        // insert a cast.
        boolean bothDecimal = t0.isDecimal() && t1.isDecimal();
        boolean bothString = t0.isStringType() && t1.isStringType();
        if (!bothDecimal && !bothString) {
          throw new InternalException("Cannot compare " +
              t0.toSql() + " to " + t1.toSql() + " in join predicate.");
        }
        Type compatibleType = Type.getAssignmentCompatibleType(t0, t1);
        Preconditions.checkState(compatibleType.isDecimal() ||
            compatibleType.isStringType());
        try {
          if (!t0.equals(compatibleType)) {
            eqPred.setChild(0, eqPred.getChild(0).castTo(compatibleType));
          }
          if (!t1.equals(compatibleType)) {
            eqPred.setChild(1, eqPred.getChild(1).castTo(compatibleType));
          }
        } catch (AnalysisException e) {
          throw new InternalException("Should not happen", e);
        }
      }
      Preconditions.checkState(
          eqPred.getChild(0).getType().matchesType(eqPred.getChild(1).getType()));
      newEqJoinConjuncts.add(new BinaryPredicate(eqPred.getOp(),
          eqPred.getChild(0), eqPred.getChild(1)));
    }
    eqJoinConjuncts_ = newEqJoinConjuncts;
  }
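
  // Example of the cast insertion in init() (illustrative only): for a predicate like
  // "t1.price = t2.price" where one side is DECIMAL(9,2) and the other DECIMAL(19,4),
  // equality itself works across the mismatch, but the two representations do not
  // hash identically, so both children are cast to a common decimal type. The same
  // applies to mismatched string types (e.g. CHAR vs. VARCHAR).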

  /**
   * Returns the estimated cardinality of an inner or outer join: the product of the
   * child cardinalities divided by the highest number of distinct values found for
   * an equi-join column on the build side. If no usable column stats are available,
   * optimistically assumes an FK/PK join, which preserves the cardinality of the
   * left-hand side. Returns -1 if the child cardinalities are unknown.
   */
  private long getJoinCardinality(Analyzer analyzer) {
    Preconditions.checkState(
        joinOp_ == JoinOperator.INNER_JOIN || joinOp_.isOuterJoin());
    long maxNumDistinct = 0;
    for (Expr eqJoinPredicate: eqJoinConjuncts_) {
      if (eqJoinPredicate.getChild(0).unwrapSlotRef(false) == null) continue;
      SlotRef rhsSlotRef = eqJoinPredicate.getChild(1).unwrapSlotRef(false);
      if (rhsSlotRef == null) continue;
      SlotDescriptor slotDesc = rhsSlotRef.getDesc();
      if (slotDesc == null) continue;
      ColumnStats stats = slotDesc.getStats();
      if (!stats.hasNumDistinctValues()) continue;
      long numDistinct = stats.getNumDistinctValues();
      Table rhsTbl = slotDesc.getParent().getTable();
      if (rhsTbl != null && rhsTbl.getNumRows() != -1) {
        // we can't have more distinct values than rows in the table, even though
        // the metastore stats may think so
        LOG.debug("#distinct=" + numDistinct + " #rows="
            + Long.toString(rhsTbl.getNumRows()));
        numDistinct = Math.min(numDistinct, rhsTbl.getNumRows());
      }
      if (getChild(1).cardinality_ != -1 && numDistinct != -1) {
        // The number of distinct values of a slot cannot exceed the cardinality_
        // of the plan node the slot is coming from.
        numDistinct = Math.min(numDistinct, getChild(1).cardinality_);
      }
      maxNumDistinct = Math.max(maxNumDistinct, numDistinct);
      LOG.debug("min slotref=" + rhsSlotRef.toSql() + " #distinct="
          + Long.toString(numDistinct));
    }

    long result = -1;
    if (maxNumDistinct == 0) {
      // if we didn't find any suitable join predicates or don't have stats
      // on the relevant columns, we very optimistically assume we're doing an
      // FK/PK join (which doesn't alter the cardinality of the left-hand side)
      result = getChild(0).cardinality_;
    } else if (getChild(0).cardinality_ != -1 && getChild(1).cardinality_ != -1) {
      result = multiplyCardinalities(getChild(0).cardinality_,
          getChild(1).cardinality_);
      result = Math.round((double) result / (double) maxNumDistinct);
    }
    return result;
  }
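
  // Worked example (illustrative numbers): joining a 10M-row probe side to a 1M-row
  // build side on a column whose build-side NDV is 1M gives 10M * 1M / 1M = 10M rows,
  // i.e. the FK/PK case leaves the probe-side cardinality unchanged. If the build-side
  // NDV were only 100K, the estimate would grow to 100M rows.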

  /**
   * Returns the estimated cardinality of a semi join based on the NDVs of the
   * equi-join columns: the cardinality of the side that is returned is scaled by the
   * most selective of the per-conjunct selectivities computed below. Returns -1 if
   * the cardinality of the returned side is unknown.
   */
  private long getSemiJoinCardinality() {
    Preconditions.checkState(joinOp_.isSemiJoin());

    // Return -1 if the cardinality of the returned side is unknown.
    long cardinality;
    if (joinOp_ == JoinOperator.RIGHT_SEMI_JOIN
        || joinOp_ == JoinOperator.RIGHT_ANTI_JOIN) {
      if (getChild(1).cardinality_ == -1) return -1;
      cardinality = getChild(1).cardinality_;
    } else {
      if (getChild(0).cardinality_ == -1) return -1;
      cardinality = getChild(0).cardinality_;
    }
    double minSelectivity = 1.0;
    for (Expr eqJoinPredicate: eqJoinConjuncts_) {
      long lhsNdv = getNdv(eqJoinPredicate.getChild(0));
      lhsNdv = Math.min(lhsNdv, getChild(0).cardinality_);
      long rhsNdv = getNdv(eqJoinPredicate.getChild(1));
      rhsNdv = Math.min(rhsNdv, getChild(1).cardinality_);

      // Skip conjuncts with unknown NDV on either side.
      if (lhsNdv == -1 || rhsNdv == -1) continue;

      double selectivity = 1.0;
      switch (joinOp_) {
        case LEFT_SEMI_JOIN: {
          selectivity = (double) Math.min(lhsNdv, rhsNdv) / (double) lhsNdv;
          break;
        }
        case RIGHT_SEMI_JOIN: {
          selectivity = (double) Math.min(lhsNdv, rhsNdv) / (double) rhsNdv;
          break;
        }
        case LEFT_ANTI_JOIN:
        case NULL_AWARE_LEFT_ANTI_JOIN: {
          selectivity = (double) Math.max(lhsNdv - rhsNdv, lhsNdv) / (double) lhsNdv;
          break;
        }
        case RIGHT_ANTI_JOIN: {
          selectivity = (double) Math.max(rhsNdv - lhsNdv, rhsNdv) / (double) rhsNdv;
          break;
        }
        default: Preconditions.checkState(false);
      }
      minSelectivity = Math.min(minSelectivity, selectivity);
    }

    Preconditions.checkState(cardinality != -1);
    return Math.round(cardinality * minSelectivity);
  }
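
  // Example (illustrative numbers): for a LEFT SEMI JOIN where the probe column has
  // 1M distinct values and the build column has 200K, the selectivity is
  // min(1M, 200K) / 1M = 0.2, so a 10M-row probe side is estimated at 2M output rows.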

  /**
   * Returns the number of distinct values from the column stats of 'expr' if it
   * wraps a SlotRef and stats are available; -1 otherwise.
   */
  private long getNdv(Expr expr) {
    SlotRef slotRef = expr.unwrapSlotRef(false);
    if (slotRef == null) return -1;
    SlotDescriptor slotDesc = slotRef.getDesc();
    if (slotDesc == null) return -1;
    ColumnStats stats = slotDesc.getStats();
    if (!stats.hasNumDistinctValues()) return -1;
    return stats.getNumDistinctValues();
  }

  @Override
  public void computeStats(Analyzer analyzer) {
    super.computeStats(analyzer);
    if (joinOp_.isSemiJoin()) {
      cardinality_ = getSemiJoinCardinality();
    } else {
      cardinality_ = getJoinCardinality(analyzer);
    }

    // Impose lower/upper bounds on the cardinality based on the join type.
    long leftCard = getChild(0).cardinality_;
    long rightCard = getChild(1).cardinality_;
    switch (joinOp_) {
      case LEFT_SEMI_JOIN: {
        if (leftCard != -1) {
          cardinality_ = Math.min(leftCard, cardinality_);
        }
        break;
      }
      case RIGHT_SEMI_JOIN: {
        if (rightCard != -1) {
          cardinality_ = Math.min(rightCard, cardinality_);
        }
        break;
      }
      case LEFT_OUTER_JOIN: {
        if (leftCard != -1) {
          cardinality_ = Math.max(leftCard, cardinality_);
        }
        break;
      }
      case RIGHT_OUTER_JOIN: {
        if (rightCard != -1) {
          cardinality_ = Math.max(rightCard, cardinality_);
        }
        break;
      }
      case FULL_OUTER_JOIN: {
        if (leftCard != -1 && rightCard != -1) {
          long cardinalitySum = addCardinalities(leftCard, rightCard);
          cardinality_ = Math.max(cardinalitySum, cardinality_);
        }
        break;
      }
      case LEFT_ANTI_JOIN:
      case NULL_AWARE_LEFT_ANTI_JOIN: {
        if (leftCard != -1) {
          cardinality_ = Math.min(leftCard, cardinality_);
        }
        break;
      }
      case RIGHT_ANTI_JOIN: {
        if (rightCard != -1) {
          cardinality_ = Math.min(rightCard, cardinality_);
        }
        break;
      }
    }

    Preconditions.checkState(hasValidStats());
    LOG.debug("stats HashJoin: cardinality=" + Long.toString(cardinality_));
  }

  @Override
  protected String debugString() {
    return Objects.toStringHelper(this)
        .add("eqJoinConjuncts_", eqJoinConjunctsDebugString())
        .addValue(super.debugString())
        .toString();
  }

  private String eqJoinConjunctsDebugString() {
    Objects.ToStringHelper helper = Objects.toStringHelper(this);
    for (Expr entry: eqJoinConjuncts_) {
      helper.add("lhs", entry.getChild(0)).add("rhs", entry.getChild(1));
    }
    return helper.toString();
  }

  @Override
  protected void toThrift(TPlanNode msg) {
    msg.node_type = TPlanNodeType.HASH_JOIN_NODE;
    msg.hash_join_node = new THashJoinNode();
    msg.hash_join_node.join_op = joinOp_.toThrift();
    for (Expr entry: eqJoinConjuncts_) {
      TEqJoinCondition eqJoinCondition =
          new TEqJoinCondition(entry.getChild(0).treeToThrift(),
              entry.getChild(1).treeToThrift());
      msg.hash_join_node.addToEq_join_conjuncts(eqJoinCondition);
    }
    for (Expr e: otherJoinConjuncts_) {
      msg.hash_join_node.addToOther_join_conjuncts(e.treeToThrift());
    }
    msg.hash_join_node.setAdd_probe_filters(addProbeFilters_);
  }

  @Override
  protected String getDisplayLabelDetail() {
    StringBuilder output = new StringBuilder(joinOp_.toString());
    if (distrMode_ != DistributionMode.NONE) output.append(", " + distrMode_.toString());
    return output.toString();
  }

  @Override
  protected String getNodeExplainString(String prefix, String detailPrefix,
      TExplainLevel detailLevel) {
    StringBuilder output = new StringBuilder();
    output.append(String.format("%s%s [%s]\n", prefix, getDisplayLabel(),
        getDisplayLabelDetail()));

    if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
      output.append(detailPrefix + "hash predicates: ");
      for (int i = 0; i < eqJoinConjuncts_.size(); ++i) {
        Expr eqConjunct = eqJoinConjuncts_.get(i);
        output.append(eqConjunct.toSql());
        if (i + 1 != eqJoinConjuncts_.size()) output.append(", ");
      }
      output.append("\n");
      if (!otherJoinConjuncts_.isEmpty()) {
        output.append(detailPrefix + "other join predicates: ")
            .append(getExplainString(otherJoinConjuncts_) + "\n");
      }
      if (!conjuncts_.isEmpty()) {
        output.append(detailPrefix + "other predicates: ")
            .append(getExplainString(conjuncts_) + "\n");
      }
    }
    return output.toString();
  }

  @Override
  public void computeCosts(TQueryOptions queryOptions) {
    if (getChild(1).getCardinality() == -1 || getChild(1).getAvgRowSize() == -1
        || numNodes_ == 0) {
      perHostMemCost_ = DEFAULT_PER_HOST_MEM;
      return;
    }
    // Per-host memory is driven by the in-memory hash table built over the right
    // (build) child. NOTE: the original estimate also applies a small hash-table
    // space-overhead factor; that constant is not reproduced here.
    perHostMemCost_ =
        (long) Math.ceil(getChild(1).cardinality_ * getChild(1).avgRowSize_);
  }
}
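
For a rough sense of the numbers behind computeCosts() and DEFAULT_PER_HOST_MEM, the sketch below mirrors the shape of the estimate with made-up figures. The class name HashJoinMemEstimateExample, the perHostMemCost helper, the row counts, row sizes, and the 1.1 overhead factor are all illustrative assumptions, not constants or APIs from the Impala planner.

public class HashJoinMemEstimateExample {
  // 2 GiB fallback, the same default HashJoinNode uses when build-side stats are missing.
  static final long DEFAULT_PER_HOST_MEM = 2L * 1024L * 1024L * 1024L;

  // Returns an estimated per-host memory cost for the build-side hash table.
  static long perHostMemCost(long buildCardinality, double buildAvgRowSize,
      int numNodes, double hashTblOverhead) {
    if (buildCardinality == -1 || buildAvgRowSize == -1 || numNodes == 0) {
      return DEFAULT_PER_HOST_MEM;
    }
    // Memory is dominated by the in-memory hash table over the build (right) child.
    return (long) Math.ceil(buildCardinality * buildAvgRowSize * hashTblOverhead);
  }

  public static void main(String[] args) {
    // Hypothetical build side: 50M rows of 24 bytes each with 10% overhead -> ~1.32 GB.
    System.out.println(perHostMemCost(50_000_000L, 24.0, 10, 1.1));
    // Missing stats fall back to the 2 GiB default.
    System.out.println(perHostMemCost(-1L, 24.0, 10, 1.1));
  }
}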