Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
partitioned-hash-join-node-ir.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include "codegen/impala-ir.h"
18 #include "exec/hash-table.inline.h"
19 #include "runtime/row-batch.h"
20 
21 #include "common/names.h"
22 
23 using namespace impala;
24 
25 // Wrapper around ExecNode's eval conjuncts with a different function name.
26 // This lets us distinguish between the join conjuncts vs. non-join conjuncts
27 // for codegen.
28 // Note: don't declare this static. LLVM will pick the fastcc calling convention and
29 // we will not be able to replace the functions with codegen'd versions.
30 // TODO: explicitly set the calling convention?
31 // TODO: investigate using fastcc for all codegen internal functions?
33  ExprContext* const* ctxs, int num_ctxs, TupleRow* row) {
34  return ExecNode::EvalConjuncts(ctxs, num_ctxs, row);
35 }
36 
37 // CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by
38 // codegen.
39 template<int const JoinOp>
41  RowBatch* out_batch, HashTableCtx* ht_ctx) {
42  ExprContext* const* other_join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
43  const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
45  const int num_conjuncts = conjunct_ctxs_.size();
46 
47  DCHECK(!out_batch->AtCapacity());
48  TupleRow* out_row = out_batch->GetRow(out_batch->AddRow());
49  const int max_rows = out_batch->capacity() - out_batch->num_rows();
50  int num_rows_added = 0;
51 
52  while (probe_batch_pos_ >= 0) {
53  if (current_probe_row_ != NULL) {
54  while (!hash_tbl_iterator_.AtEnd()) {
55  TupleRow* matched_build_row = hash_tbl_iterator_.GetRow();
56  DCHECK(matched_build_row != NULL);
57 
58  if ((JoinOp == TJoinOp::RIGHT_SEMI_JOIN || JoinOp == TJoinOp::RIGHT_ANTI_JOIN) &&
61  continue;
62  }
63 
64  if (JoinOp == TJoinOp::LEFT_ANTI_JOIN || JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
65  JoinOp == TJoinOp::RIGHT_ANTI_JOIN || JoinOp == TJoinOp::RIGHT_SEMI_JOIN ||
66  JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
67  // Evaluate the non-equi-join conjuncts against a temp row assembled from all
68  // build and probe tuples.
69  if (num_other_join_conjuncts > 0) {
71  matched_build_row);
72  if (!EvalOtherJoinConjuncts(other_join_conjunct_ctxs,
73  num_other_join_conjuncts, semi_join_staging_row_)) {
75  continue;
76  }
77  }
78 
79  // Create output row assembled from build xor probe tuples.
80  if (JoinOp == TJoinOp::LEFT_ANTI_JOIN || JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
81  JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
82  out_batch->CopyRow(current_probe_row_, out_row);
83  } else {
84  out_batch->CopyRow(matched_build_row, out_row);
85  }
86  } else {
87  // Not a semi join; create an output row with all probe/build tuples and
88  // evaluate the non-equi-join conjuncts.
89  CreateOutputRow(out_row, current_probe_row_, matched_build_row);
90  if (!EvalOtherJoinConjuncts(other_join_conjunct_ctxs, num_other_join_conjuncts,
91  out_row)) {
93  continue;
94  }
95  }
96 
97  // At this point the probe is considered matched.
98  matched_probe_ = true;
99  if (JoinOp == TJoinOp::LEFT_ANTI_JOIN ||
100  JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
101  // We can safely ignore this probe row for left anti joins.
103  goto next_row;
104  }
105 
106  // Update hash_tbl_iterator.
107  if (JoinOp == TJoinOp::LEFT_SEMI_JOIN) {
109  } else {
110  if (JoinOp == TJoinOp::RIGHT_OUTER_JOIN || JoinOp == TJoinOp::RIGHT_ANTI_JOIN ||
111  JoinOp == TJoinOp::FULL_OUTER_JOIN || JoinOp == TJoinOp::RIGHT_SEMI_JOIN) {
112  // There is a match for this build row. Mark the Bucket or the DuplicateNode
113  // as matched for right/full joins.
115  }
117  }
118 
119  if ((JoinOp != TJoinOp::RIGHT_ANTI_JOIN) &&
120  ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) {
121  ++num_rows_added;
122  out_row = out_row->next_row(out_batch);
123  if (num_rows_added == max_rows) goto end;
124  }
125  }
126 
127  if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !matched_probe_) {
128  // Null aware behavior. The probe row did not match in the hash table so we
129  // should interpret the hash table probe as "unknown" if there are nulls on the
130  // build size. For those rows, we need to process the remaining join
131  // predicates later.
132  if (null_aware_partition_->build_rows()->num_rows() != 0) {
133  if (num_other_join_conjuncts == 0) goto next_row;
136  return -1;
137  }
138  goto next_row;
139  }
140  }
141 
142  if ((JoinOp == TJoinOp::LEFT_OUTER_JOIN || JoinOp == TJoinOp::FULL_OUTER_JOIN) &&
143  !matched_probe_) {
144  // No match for this row, we need to output it.
145  CreateOutputRow(out_row, current_probe_row_, NULL);
146  if (ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) {
147  ++num_rows_added;
148  matched_probe_ = true;
149  out_row = out_row->next_row(out_batch);
150  if (num_rows_added == max_rows) goto end;
151  }
152  }
153  if ((JoinOp == TJoinOp::LEFT_ANTI_JOIN ||
154  JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) &&
155  !matched_probe_) {
156  // No match for this current_probe_row_, we need to output it. No need to
157  // evaluate the conjunct_ctxs since semi joins cannot have any.
158  out_batch->CopyRow(current_probe_row_, out_row);
159  ++num_rows_added;
160  matched_probe_ = true;
161  out_row = out_row->next_row(out_batch);
162  if (num_rows_added == max_rows) goto end;
163  }
164  }
165 
166 next_row:
167  // Must have reached the end of the hash table iterator for the current row before
168  // moving to the row.
169  DCHECK(hash_tbl_iterator_.AtEnd());
170 
171  if (UNLIKELY(probe_batch_pos_ == probe_batch_->num_rows())) {
172  // Finished this batch.
173  current_probe_row_ = NULL;
174  goto end;
175  }
176 
177  // Establish current_probe_row_ and find its corresponding partition.
179  matched_probe_ = false;
180  uint32_t hash;
181  if (!ht_ctx->EvalAndHashProbe(current_probe_row_, &hash)) {
182  if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
183  // For NAAJ, we need to treat NULLs on the probe carefully. The logic is:
184  // 1. No build rows -> Return this row.
185  // 2. Has build rows & no other join predicates, skip row.
186  // 3. Has build rows & other join predicates, we need to evaluate against all
187  // build rows. First evaluate it against this partition, and if there is not
188  // a match, save it to evaluate against other partitions later. If there
189  // is a match, the row is skipped.
190  if (!non_empty_build_) continue;
191  if (num_other_join_conjuncts == 0) goto next_row;
194  return -1;
195  }
196  matched_null_probe_.push_back(false);
197  goto next_row;
198  }
199  continue;
200  }
201  const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
202  if (LIKELY(hash_tbls_[partition_idx] != NULL)) {
203  hash_tbl_iterator_= hash_tbls_[partition_idx]->Find(ht_ctx, hash);
204  } else {
205  Partition* partition = hash_partitions_[partition_idx];
206  if (UNLIKELY(partition->is_closed())) {
207  // This partition is closed, meaning the build side for this partition was empty.
208  DCHECK_EQ(state_, PROCESSING_PROBE);
209  } else {
210  // This partition is not in memory, spill the probe row and move to the next row.
211  DCHECK(partition->is_spilled());
212  DCHECK(partition->probe_rows() != NULL);
213  if (UNLIKELY(!AppendRow(partition->probe_rows(), current_probe_row_))) {
214  status_ = partition->probe_rows()->status();
215  return -1;
216  }
217  goto next_row;
218  }
219  }
220  }
221 
222 end:
223  DCHECK_LE(num_rows_added, max_rows);
224  return num_rows_added;
225 }
226 
228  const TJoinOp::type join_op, RowBatch* out_batch, HashTableCtx* ht_ctx) {
229  switch (join_op) {
230  case TJoinOp::INNER_JOIN:
231  return ProcessProbeBatch<TJoinOp::INNER_JOIN>(out_batch, ht_ctx);
232  case TJoinOp::LEFT_OUTER_JOIN:
233  return ProcessProbeBatch<TJoinOp::LEFT_OUTER_JOIN>(out_batch, ht_ctx);
234  case TJoinOp::LEFT_SEMI_JOIN:
235  return ProcessProbeBatch<TJoinOp::LEFT_SEMI_JOIN>(out_batch, ht_ctx);
236  case TJoinOp::LEFT_ANTI_JOIN:
237  return ProcessProbeBatch<TJoinOp::LEFT_ANTI_JOIN>(out_batch, ht_ctx);
238  case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN:
239  return ProcessProbeBatch<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>(out_batch, ht_ctx);
240  case TJoinOp::RIGHT_OUTER_JOIN:
241  return ProcessProbeBatch<TJoinOp::RIGHT_OUTER_JOIN>(out_batch, ht_ctx);
242  case TJoinOp::RIGHT_SEMI_JOIN:
243  return ProcessProbeBatch<TJoinOp::RIGHT_SEMI_JOIN>(out_batch, ht_ctx);
244  case TJoinOp::RIGHT_ANTI_JOIN:
245  return ProcessProbeBatch<TJoinOp::RIGHT_ANTI_JOIN>(out_batch, ht_ctx);
246  case TJoinOp::FULL_OUTER_JOIN:
247  return ProcessProbeBatch<TJoinOp::FULL_OUTER_JOIN>(out_batch, ht_ctx);
248  default:
249  DCHECK(false) << "Unknown join type";
250  return -1;
251  }
252 }
253 
255  for (int i = 0; i < build_batch->num_rows(); ++i) {
256  TupleRow* build_row = build_batch->GetRow(i);
257  uint32_t hash;
258  if (!ht_ctx_->EvalAndHashBuild(build_row, &hash)) {
259  if (null_aware_partition_ != NULL) {
260  // TODO: remove with codegen/template
261  // If we are NULL aware and this build row has NULL in the eq join slot,
262  // append it to the null_aware partition. We will need it later.
263  if (!null_aware_partition_->build_rows()->AddRow(build_row)) {
265  }
266  }
267  continue;
268  }
269  const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
270  Partition* partition = hash_partitions_[partition_idx];
271  const bool result = AppendRow(partition->build_rows(), build_row);
272  if (UNLIKELY(!result)) return status_;
273  }
274  return Status::OK;
275 }
#define IR_NO_INLINE
Definition: impala-ir.h:30
bool AtEnd() const
Returns true if this iterator is at the end, i.e. GetRow() cannot be called.
Definition: hash-table.h:492
bool AppendRow(BufferedTupleStream *stream, TupleRow *row)
int num_rows() const
Definition: row-batch.h:215
bool non_empty_build_
If true, the build side has at least one row.
void SetAtEnd()
Resets everything but the pointer to the hash table.
std::vector< ExprContext * > other_join_conjunct_ctxs_
Non-equi-join conjuncts from the JOIN clause.
void CreateOutputRow(TupleRow *out_row, TupleRow *probe_row, TupleRow *build_row)
Status ProcessBuildBatch(RowBatch *build_batch)
Reads the rows in build_batch and partitions them in hash_partitions_.
const StringSearch UrlParser::hash_search & hash
Definition: url-parser.cc:41
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
HashTable * hash_tbls_[PARTITION_FANOUT]
bool AtCapacity()
Definition: row-batch.h:120
void IR_ALWAYS_INLINE NextDuplicate()
const std::vector< ExprContext * > & conjunct_ctxs() const
Definition: exec-node.h:152
std::vector< Partition * > hash_partitions_
bool IR_NO_INLINE EvalOtherJoinConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
int capacity() const
Definition: row-batch.h:216
#define UNLIKELY(expr)
Definition: compiler-util.h:33
void CopyRow(TupleRow *src, TupleRow *dest)
Definition: row-batch.h:173
bool AddRow(TupleRow *row, uint8_t **dst=NULL)
Iterator IR_ALWAYS_INLINE Find(HashTableCtx *ht_ctx, uint32_t hash)
static const Status OK
Definition: status.h:87
#define LIKELY(expr)
Definition: compiler-util.h:32
int64_t num_rows() const
Number of rows in the stream.
int ProcessProbeBatch(RowBatch *out_batch, HashTableCtx *ht_ctx)
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
Definition: exec-node.cc:393
boost::scoped_ptr< HashTableCtx > ht_ctx_
boost::scoped_ptr< RowBatch > probe_batch_
HashTable::Iterator hash_tbl_iterator_
The iterator that corresponds to the look up of current_probe_row_.
std::vector< ExprContext * > conjunct_ctxs_
Definition: exec-node.h:212
State state_
State of the algorithm. Used just for debugging.
bool IR_ALWAYS_INLINE EvalAndHashProbe(TupleRow *row, uint32_t *hash)
static const int NUM_PARTITIONING_BITS
Must equal log2(PARTITION_FANOUT), since the partition index is taken from the top NUM_PARTITIONING_BITS bits of the 32-bit hash.