Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hash-join-node.cc.LOCAL.32326.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exec/hash-join-node.h"
16 
17 #include <sstream>
18 
19 #include "codegen/llvm-codegen.h"
20 #include "exec/hash-table.inline.h"
21 #include "exprs/expr.h"
22 #include "runtime/row-batch.h"
23 #include "runtime/runtime-state.h"
24 #include "util/debug-util.h"
25 #include "util/runtime-profile.h"
26 
27 #include "gen-cpp/PlanNodes_types.h"
28 
29 using namespace boost;
30 using namespace impala;
31 using namespace llvm;
32 using namespace std;
33 
34 const char* HashJoinNode::LLVM_CLASS_NAME = "class.impala::HashJoinNode";
35 
36 HashJoinNode::HashJoinNode(
37  ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
38  : ExecNode(pool, tnode, descs),
39  join_op_(tnode.hash_join_node.join_op),
40  codegen_process_build_batch_fn_(NULL),
41  process_build_batch_fn_(NULL),
42  codegen_process_probe_batch_fn_(NULL),
43  process_probe_batch_fn_(NULL) {
44  // TODO: log errors in runtime state
45  Status status = Init(pool, tnode);
46  DCHECK(status.ok())
47  << "HashJoinNode c'tor: Init() failed:\n"
48  << status.GetErrorMsg();
49 
51  (join_op_ == TJoinOp::LEFT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN);
52  match_one_build_ = (join_op_ == TJoinOp::LEFT_SEMI_JOIN);
54  (join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN);
55 }
56 
57 Status HashJoinNode::Init(ObjectPool* pool, const TPlanNode& tnode) {
58  DCHECK(tnode.__isset.hash_join_node);
59  const vector<TEqJoinCondition>& eq_join_conjuncts =
60  tnode.hash_join_node.eq_join_conjuncts;
61  for (int i = 0; i < eq_join_conjuncts.size(); ++i) {
62  Expr* expr;
63  RETURN_IF_ERROR(Expr::CreateExprTree(pool, eq_join_conjuncts[i].left, &expr));
64  probe_exprs_.push_back(expr);
65  RETURN_IF_ERROR(Expr::CreateExprTree(pool, eq_join_conjuncts[i].right, &expr));
66  build_exprs_.push_back(expr);
67  }
69  Expr::CreateExprTrees(pool, tnode.hash_join_node.other_join_conjuncts,
70  &other_join_conjuncts_));
71  return Status::OK;
72 }
73 
74 HashJoinNode::~HashJoinNode() {
75  // probe_batch_ must be cleaned up in Close() to ensure proper resource freeing.
76  DCHECK(probe_batch_ == NULL);
77 }
78 
81 
82  build_pool_.reset(new MemPool(state->mem_limits())),
83  build_timer_ =
84  ADD_TIMER(runtime_profile(), "BuildTime");
85  probe_timer_ =
86  ADD_TIMER(runtime_profile(), "ProbeTime");
88  ADD_COUNTER(runtime_profile(), "BuildRows", TCounterType::UNIT);
90  ADD_COUNTER(runtime_profile(), "BuildBuckets", TCounterType::UNIT);
92  ADD_COUNTER(runtime_profile(), "ProbeRows", TCounterType::UNIT);
94  ADD_COUNTER(runtime_profile(), "LoadFactor", TCounterType::DOUBLE_VALUE);
95 
96  // build and probe exprs are evaluated in the context of the rows produced by our
97  // right and left children, respectively
98  RETURN_IF_ERROR(Expr::Prepare(build_exprs_, state, child(1)->row_desc(), false));
99  RETURN_IF_ERROR(Expr::Prepare(probe_exprs_, state, child(0)->row_desc(), false));
100 
101  // other_join_conjuncts_ are evaluated in the context of the rows produced by this node
102  RETURN_IF_ERROR(Expr::Prepare(other_join_conjuncts_, state, row_descriptor_, false));
103 
104  result_tuple_row_size_ = row_descriptor_.tuple_descriptors().size() * sizeof(Tuple*);
105 
106  // pre-compute the tuple index of build tuples in the output row
107  build_tuple_size_ = child(1)->row_desc().tuple_descriptors().size();
108  build_tuple_idx_.reserve(build_tuple_size_);
109  for (int i = 0; i < build_tuple_size_; ++i) {
110  TupleDescriptor* build_tuple_desc = child(1)->row_desc().tuple_descriptors()[i];
111  build_tuple_idx_.push_back(row_descriptor_.GetTupleIdx(build_tuple_desc->id()));
112  }
113 
114  // TODO: default buckets
115  hash_tbl_.reset(new HashTable(build_exprs_, probe_exprs_, build_tuple_size_,
116  false, id(), *state->mem_limits()));
117 
118  probe_batch_.reset(
119  new RowBatch(row_descriptor_, state->batch_size(), *state->mem_limits()));
120 
121  LlvmCodeGen* codegen = state->llvm_codegen();
122  if (codegen != NULL) {
123  // Codegen for hashing rows
124  Function* hash_fn = hash_tbl_->CodegenHashCurrentRow(codegen);
125  if (hash_fn == NULL) return Status::OK;
126 
127  // Codegen for build path
129 
130  // Codegen for probe path (only for left joins)
131  if (!match_all_build_) {
132  codegen_process_probe_batch_fn_ = CodegenProcessProbeBatch(codegen, hash_fn);
133  }
134  }
135  return Status::OK;
136 }
137 
139  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::CLOSE, state));
140  // Must reset probe_batch_ in Close() to release resources
141  probe_batch_.reset(NULL);
142  if (memory_used_counter_ != NULL && hash_tbl_.get() != NULL) {
143  COUNTER_UPDATE(memory_used_counter_, build_pool_->peak_allocated_bytes());
144  COUNTER_UPDATE(memory_used_counter_, hash_tbl_->byte_size());
145  }
146  return ExecNode::Close(state);
147 }
148 
149 void HashJoinNode::BuildSideThread(RuntimeState* state, promise<Status>* status) {
150  status->set_value(ConstructHashTable(state));
151  // Release the thread token as soon as possible (before the main thread joins
152  // on it). This way, if we had a chain of 10 joins using 1 additional thread,
153  // we'd keep the additional thread busy the whole time.
154  state->resource_pool()->ReleaseThreadToken(false);
155 }
156 
157 Status HashJoinNode::ConstructHashTable(RuntimeState* state) {
158  // Do a full scan of child(1) and store everything in hash_tbl_
159  // The hash join node needs to keep in memory all build tuples, including the tuple
160  // row ptrs. The row ptrs are copied into the hash table's internal structure so they
161  // don't need to be stored in the build_pool_.
162  RowBatch build_batch(child(1)->row_desc(), state->batch_size(), *state->mem_limits());
163  RETURN_IF_ERROR(child(1)->Open(state));
164  while (true) {
165  RETURN_IF_CANCELLED(state);
166  bool eos;
167  RETURN_IF_ERROR(child(1)->GetNext(state, &build_batch, &eos));
169  // take ownership of tuple data of build_batch
170  build_pool_->AcquireData(build_batch.tuple_data_pool(), false);
171  RETURN_IF_LIMIT_EXCEEDED(state);
172 
173  // Call codegen version if possible
174  if (process_build_batch_fn_ == NULL) {
175  ProcessBuildBatch(&build_batch);
176  } else {
177  process_build_batch_fn_(this, &build_batch);
178  }
179  VLOG_ROW << hash_tbl_->DebugString(true, &child(1)->row_desc());
180 
184  build_batch.Reset();
185  if (eos) break;
186  }
187  return Status::OK;
188 }
189 
191  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::OPEN, state));
192  SCOPED_TIMER(runtime_profile_->total_time_counter());
193  RETURN_IF_CANCELLED(state);
194 
195  if (codegen_process_build_batch_fn_ != NULL) {
196  void* jitted_process_build_batch =
197  state->llvm_codegen()->JitFunction(codegen_process_build_batch_fn_);
198  DCHECK(jitted_process_build_batch != NULL);
200  reinterpret_cast<ProcessBuildBatchFn>(jitted_process_build_batch);
201  AddRuntimeExecOption("Build Side Codegen Enabled");
202  }
203 
204  if (codegen_process_probe_batch_fn_ != NULL) {
205  void* jitted_process_probe_batch =
206  state->llvm_codegen()->JitFunction(codegen_process_probe_batch_fn_);
207  DCHECK(jitted_process_probe_batch != NULL);
209  reinterpret_cast<ProcessProbeBatchFn>(jitted_process_probe_batch);
210  AddRuntimeExecOption("Probe Side Codegen Enabled");
211  }
212 
213  eos_ = false;
214 
215  // TODO: fix problems with asynchronous cancellation
216  // Kick-off the construction of the build-side table in a separate
217  // thread, so that the left child can do any initialisation in parallel.
218  // Only do this if we can get a thread token. Otherwise, do this in the
219  // main thread
220  promise<Status> thread_status;
221  if (state->resource_pool()->TryAcquireThreadToken()) {
222  AddRuntimeExecOption("Hash Table Built Asynchronously");
223  thread(bind(&HashJoinNode::BuildSideThread, this, state, &thread_status));
224  } else {
225  thread_status.set_value(ConstructHashTable(state));
226  }
227 
228  // Open the probe-side child so that it may perform any initialisation in parallel.
229  // Don't exit even if we see an error, we still need to wait for the build thread
230  // to finish.
231  Status open_status = child(0)->Open(state);
232 
233  // Blocks until ConstructHashTable has returned, after which
234  // the hash table is fully constructed and we can start the probe
235  // phase.
236  RETURN_IF_ERROR(thread_status.get_future().get());
237 
238  VLOG_ROW << hash_tbl_->DebugString(true, &child(1)->row_desc());
239  RETURN_IF_ERROR(open_status);
240 
241  // seed probe batch and current_probe_row_, etc.
242  while (true) {
243  RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_eos_));
244  COUNTER_UPDATE(probe_row_counter_, probe_batch_->num_rows());
245  probe_batch_pos_ = 0;
246  if (probe_batch_->num_rows() == 0) {
247  if (probe_eos_) {
248  hash_tbl_iterator_ = hash_tbl_->Begin();
249  eos_ = true;
250  break;
251  }
252  probe_batch_->Reset();
253  continue;
254  } else {
255  current_probe_row_ = probe_batch_->GetRow(probe_batch_pos_++);
256  VLOG_ROW << "probe row: " << GetProbeRowOutputString(current_probe_row_);
257  matched_probe_ = false;
259  break;
260  }
261  }
262 
263  return Status::OK;
264 }
265 
266 Status HashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch, bool* eos) {
267  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
268  RETURN_IF_CANCELLED(state);
269  SCOPED_TIMER(runtime_profile_->total_time_counter());
270  if (ReachedLimit()) {
271  *eos = true;
272  return Status::OK;
273  }
274 
275  // These cases are simpler and use a more efficient processing loop
276  if (!match_all_build_) {
277  if (eos_) {
278  *eos = true;
279  return Status::OK;
280  }
281  return LeftJoinGetNext(state, out_batch, eos);
282  }
283 
284  Expr* const* other_conjuncts = &other_join_conjuncts_[0];
285  int num_other_conjuncts = other_join_conjuncts_.size();
286 
287  Expr* const* conjuncts = &conjuncts_[0];
288  int num_conjuncts = conjuncts_.size();
289 
290  // Explicitly manage the timer counter to avoid measuring time in the child
291  // GetNext call.
293 
294  while (!eos_) {
295  // create output rows as long as:
296  // 1) we haven't already created an output row for the probe row and are doing
297  // a semi-join;
298  // 2) there are more matching build rows
299  while (hash_tbl_iterator_.HasNext()) {
300  TupleRow* matched_build_row = hash_tbl_iterator_.GetRow();
301  hash_tbl_iterator_.Next<true>();
302 
303  int row_idx = out_batch->AddRow();
304  TupleRow* out_row = out_batch->GetRow(row_idx);
305  CreateOutputRow(out_row, current_probe_row_, matched_build_row);
306  if (!EvalConjuncts(other_conjuncts, num_other_conjuncts, out_row)) continue;
307  // we have a match for the purpose of the (outer?) join as soon as we
308  // satisfy the JOIN clause conjuncts
309  matched_probe_ = true;
310  if (match_all_build_) {
311  // remember that we matched this build row
312  joined_build_rows_.insert(matched_build_row);
313  VLOG_ROW << "joined build row: " << matched_build_row;
314  }
315  if (EvalConjuncts(conjuncts, num_conjuncts, out_row)) {
316  out_batch->CommitLastRow();
317  VLOG_ROW << "match row: " << PrintRow(out_row, row_desc());
320  if (out_batch->IsFull() || ReachedLimit()) {
321  *eos = ReachedLimit();
322  return Status::OK;
323  }
324  }
325  }
326 
327  // check whether we need to output the current probe row before
328  // getting a new probe batch
330  int row_idx = out_batch->AddRow();
331  TupleRow* out_row = out_batch->GetRow(row_idx);
332  CreateOutputRow(out_row, current_probe_row_, NULL);
333  if (EvalConjuncts(conjuncts, num_conjuncts, out_row)) {
334  out_batch->CommitLastRow();
335  VLOG_ROW << "match row: " << PrintRow(out_row, row_desc());
338  matched_probe_ = true;
339  if (out_batch->IsFull() || ReachedLimit()) {
340  *eos = ReachedLimit();
341  return Status::OK;
342  }
343  }
344  }
345 
346  if (probe_batch_pos_ == probe_batch_->num_rows()) {
347  // pass on resources, out_batch might still need them
348  probe_batch_->TransferResourceOwnership(out_batch);
349  probe_batch_pos_ = 0;
350  if (out_batch->IsFull() || out_batch->AtResourceLimit()) return Status::OK;
351  // get new probe batch
352  if (!probe_eos_) {
353  while (true) {
354  probe_timer.Stop();
355  RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_eos_));
356  probe_timer.Start();
357  if (probe_batch_->num_rows() == 0) {
358  // Empty batches can still contain IO buffers, which need to be passed up to
359  // the caller; transferring resources can fill up out_batch.
360  probe_batch_->TransferResourceOwnership(out_batch);
361  if (probe_eos_) {
362  eos_ = true;
363  break;
364  }
365  if (out_batch->IsFull() || out_batch->AtResourceLimit()) return Status::OK;
366  continue;
367  } else {
368  COUNTER_UPDATE(probe_row_counter_, probe_batch_->num_rows());
369  break;
370  }
371  }
372  } else {
373  eos_ = true;
374  }
375  // finish up right outer join
376  if (eos_ && match_all_build_) {
377  hash_tbl_iterator_ = hash_tbl_->Begin();
378  }
379  }
380 
381  if (eos_) break;
382 
383  // join remaining rows in probe batch_
384  current_probe_row_ = probe_batch_->GetRow(probe_batch_pos_++);
385  VLOG_ROW << "probe row: " << GetProbeRowOutputString(current_probe_row_);
386  matched_probe_ = false;
388  }
389 
390  *eos = true;
391  if (match_all_build_) {
392  // output remaining unmatched build rows
393  TupleRow* build_row = NULL;
394  while (!out_batch->IsFull() && hash_tbl_iterator_.HasNext()) {
395  build_row = hash_tbl_iterator_.GetRow();
396  hash_tbl_iterator_.Next<false>();
397  if (joined_build_rows_.find(build_row) != joined_build_rows_.end()) {
398  continue;
399  }
400  int row_idx = out_batch->AddRow();
401  TupleRow* out_row = out_batch->GetRow(row_idx);
402  CreateOutputRow(out_row, NULL, build_row);
403  if (EvalConjuncts(conjuncts, num_conjuncts, out_row)) {
404  out_batch->CommitLastRow();
405  VLOG_ROW << "match row: " << PrintRow(out_row, row_desc());
408  if (ReachedLimit()) {
409  *eos = true;
410  return Status::OK;
411  }
412  }
413  }
414  // we're done if there are no more rows left to check
415  *eos = !hash_tbl_iterator_.HasNext();
416  }
417  return Status::OK;
418 }
419 
421  RowBatch* out_batch, bool* eos) {
422  *eos = eos_;
423 
425  while (!eos_) {
426  // Compute max rows that should be added to out_batch
427  int64_t max_added_rows = out_batch->capacity() - out_batch->num_rows();
428  if (limit() != -1) max_added_rows = min(max_added_rows, limit() - rows_returned());
429 
430  // Continue processing this row batch
431  if (process_probe_batch_fn_ == NULL) {
433  ProcessProbeBatch(out_batch, probe_batch_.get(), max_added_rows);
435  } else {
436  // Use codegen'd function
438  process_probe_batch_fn_(this, out_batch, probe_batch_.get(), max_added_rows);
440  }
441 
442  if (ReachedLimit() || out_batch->IsFull()) {
443  *eos = ReachedLimit();
444  break;
445  }
446 
447  // Check to see if we're done processing the current probe batch
448  if (!hash_tbl_iterator_.HasNext() && probe_batch_pos_ == probe_batch_->num_rows()) {
449  probe_batch_->TransferResourceOwnership(out_batch);
450  probe_batch_pos_ = 0;
451  if (out_batch->IsFull() || out_batch->AtResourceLimit()) break;
452  if (probe_eos_) {
453  *eos = eos_ = true;
454  break;
455  } else {
456  probe_timer.Stop();
457  RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_eos_));
458  probe_timer.Start();
459  COUNTER_UPDATE(probe_row_counter_, probe_batch_->num_rows());
460  }
461  }
462  }
463 
464  return Status::OK;
465 }
466 
467 string HashJoinNode::GetProbeRowOutputString(TupleRow* probe_row) {
468  stringstream out;
469  out << "[";
470  int* build_tuple_idx_ptr_ = &build_tuple_idx_[0];
471  for (int i = 0; i < row_desc().tuple_descriptors().size(); ++i) {
472  if (i != 0) out << " ";
473 
474  int* is_build_tuple =
475  ::find(build_tuple_idx_ptr_, build_tuple_idx_ptr_ + build_tuple_size_, i);
476 
477  if (is_build_tuple != build_tuple_idx_ptr_ + build_tuple_size_) {
478  out << PrintTuple(NULL, *row_desc().tuple_descriptors()[i]);
479  } else {
480  out << PrintTuple(probe_row->GetTuple(i), *row_desc().tuple_descriptors()[i]);
481  }
482  }
483  out << "]";
484  return out.str();
485 }
486 
487 void HashJoinNode::DebugString(int indentation_level, stringstream* out) const {
488  *out << string(indentation_level * 2, ' ');
489  *out << "HashJoin(eos=" << (eos_ ? "true" : "false")
490  << " probe_batch_pos=" << probe_batch_pos_
491  << " hash_tbl=";
492  *out << string(indentation_level * 2, ' ');
493  *out << "HashTbl("
494  << " build_exprs=" << Expr::DebugString(build_exprs_)
495  << " probe_exprs=" << Expr::DebugString(probe_exprs_);
496  *out << ")";
497  ExecNode::DebugString(indentation_level, out);
498  *out << ")";
499 }
500 
501 // This function is replaced by codegen
502 void HashJoinNode::CreateOutputRow(TupleRow* out, TupleRow* probe, TupleRow* build) {
503  if (probe == NULL) {
504  memset(out, 0, result_tuple_row_size_);
505  } else {
506  memcpy(out, probe, result_tuple_row_size_);
507  }
508 
509  if (build != NULL) {
510  for (int i = 0; i < build_tuple_size_; ++i) {
511  out->SetTuple(build_tuple_idx_[i], build->GetTuple(i));
512  }
513  } else {
514  for (int i = 0; i < build_tuple_size_; ++i) {
515  out->SetTuple(build_tuple_idx_[i], NULL);
516  }
517  }
518 }
519 
520 // This codegen'd function should only be used for left join cases so it assumes that
521 // the probe row is non-null. For a left outer join, the IR looks like:
522 // define void @CreateOutputRow(%"class.impala::TupleRow"* %out_arg,
523 // %"class.impala::TupleRow"* %probe_arg,
524 // %"class.impala::TupleRow"* %build_arg) {
525 // entry:
526 // %out = bitcast %"class.impala::TupleRow"* %out_arg to i8**
527 // %probe = bitcast %"class.impala::TupleRow"* %probe_arg to i8**
528 // %build = bitcast %"class.impala::TupleRow"* %build_arg to i8**
529 // %0 = bitcast i8** %out to i8*
530 // %1 = bitcast i8** %probe to i8*
531 // call void @llvm.memcpy.p0i8.p0i8.i32(i8* %0, i8* %1, i32 16, i32 16, i1 false)
532 // %is_build_null = icmp eq i8** %build, null
533 // br i1 %is_build_null, label %build_null, label %build_not_null
534 //
535 // build_not_null: ; preds = %entry
536 // %dst_tuple_ptr1 = getelementptr i8** %out, i32 1
537 // %src_tuple_ptr = getelementptr i8** %build, i32 0
538 // %2 = load i8** %src_tuple_ptr
539 // store i8* %2, i8** %dst_tuple_ptr1
540 // ret void
541 //
542 // build_null: ; preds = %entry
543 // %dst_tuple_ptr = getelementptr i8** %out, i32 1
544 // store i8* null, i8** %dst_tuple_ptr
545 // ret void
546 // }
548  Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
549  DCHECK(tuple_row_type != NULL);
550  PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
551 
552  Type* this_type = codegen->GetType(HashJoinNode::LLVM_CLASS_NAME);
553  DCHECK(this_type != NULL);
554  PointerType* this_ptr_type = PointerType::get(this_type, 0);
555 
556  // TupleRows are really just an array of pointers. Easier to work with them
557  // this way.
558  PointerType* tuple_row_working_type = PointerType::get(codegen->ptr_type(), 0);
559 
560  // Construct function signature to match CreateOutputRow()
561  LlvmCodeGen::FnPrototype prototype(codegen, "CreateOutputRow", codegen->void_type());
562  prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
563  prototype.AddArgument(LlvmCodeGen::NamedVariable("out_arg", tuple_row_ptr_type));
564  prototype.AddArgument(LlvmCodeGen::NamedVariable("probe_arg", tuple_row_ptr_type));
565  prototype.AddArgument(LlvmCodeGen::NamedVariable("build_arg", tuple_row_ptr_type));
566 
567  LLVMContext& context = codegen->context();
568  LlvmCodeGen::LlvmBuilder builder(context);
569  Value* args[4];
570  Function* fn = prototype.GeneratePrototype(&builder, args);
571  Value* out_row_arg = builder.CreateBitCast(args[1], tuple_row_working_type, "out");
572  Value* probe_row_arg = builder.CreateBitCast(args[2], tuple_row_working_type, "probe");
573  Value* build_row_arg = builder.CreateBitCast(args[3], tuple_row_working_type, "build");
574 
575  // Copy probe row
576  codegen->CodegenMemcpy(&builder, out_row_arg, probe_row_arg, result_tuple_row_size_);
577 
578  // Copy build row.
579  BasicBlock* build_not_null_block = BasicBlock::Create(context, "build_not_null", fn);
580  BasicBlock* build_null_block = NULL;
581 
582  if (match_all_probe_) {
583  // build tuple can be null
584  build_null_block = BasicBlock::Create(context, "build_null", fn);
585  Value* is_build_null = builder.CreateIsNull(build_row_arg, "is_build_null");
586  builder.CreateCondBr(is_build_null, build_null_block, build_not_null_block);
587 
588  // Set tuple build ptrs to NULL
589  builder.SetInsertPoint(build_null_block);
590  for (int i = 0; i < build_tuple_size_; ++i) {
591  Value* array_idx[] = { codegen->GetIntConstant(TYPE_INT, build_tuple_idx_[i]) };
592  Value* dst = builder.CreateGEP(out_row_arg, array_idx, "dst_tuple_ptr");
593  builder.CreateStore(codegen->null_ptr_value(), dst);
594  }
595  builder.CreateRetVoid();
596  } else {
597  // build row can't be NULL
598  builder.CreateBr(build_not_null_block);
599  }
600 
601  // Copy build tuple ptrs
602  builder.SetInsertPoint(build_not_null_block);
603  for (int i = 0; i < build_tuple_size_; ++i) {
604  Value* dst_idx[] = { codegen->GetIntConstant(TYPE_INT, build_tuple_idx_[i]) };
605  Value* src_idx[] = { codegen->GetIntConstant(TYPE_INT, i) };
606  Value* dst = builder.CreateGEP(out_row_arg, dst_idx, "dst_tuple_ptr");
607  Value* src = builder.CreateGEP(build_row_arg, src_idx, "src_tuple_ptr");
608  builder.CreateStore(builder.CreateLoad(src), dst);
609  }
610  builder.CreateRetVoid();
611 
612  return codegen->FinalizeFunction(fn);
613 }
614 
616  Function* hash_fn) {
617  // Get cross compiled function
618  Function* process_build_batch_fn = codegen->GetFunction(
619  IRFunction::HASH_JOIN_PROCESS_BUILD_BATCH);
620  DCHECK(process_build_batch_fn != NULL);
621 
622  // Codegen for evaluating build rows
623  Function* eval_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, true);
624  if (eval_row_fn == NULL) return NULL;
625 
626  int replaced = 0;
627  // Replace call sites
628  process_build_batch_fn = codegen->ReplaceCallSites(process_build_batch_fn, false,
629  eval_row_fn, "EvalBuildRow", &replaced);
630  DCHECK_EQ(replaced, 1);
631 
632  process_build_batch_fn = codegen->ReplaceCallSites(process_build_batch_fn, false,
633  hash_fn, "HashCurrentRow", &replaced);
634  DCHECK_EQ(replaced, 1);
635 
636  return codegen->OptimizeFunctionWithExprs(process_build_batch_fn);
637 }
638 
640  Function* hash_fn) {
641  // Get cross compiled function
642  Function* process_probe_batch_fn = codegen->GetFunction(
643  IRFunction::HASH_JOIN_PROCESS_PROBE_BATCH);
644  DCHECK(process_probe_batch_fn != NULL);
645 
646  // Codegen HashTable::Equals
647  Function* equals_fn = hash_tbl_->CodegenEquals(codegen);
648  if (equals_fn == NULL) return NULL;
649 
650  // Codegen for evaluating build rows
651  Function* eval_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, false);
652  if (eval_row_fn == NULL) return NULL;
653 
654  // Codegen CreateOutputRow
655  Function* create_output_row_fn = CodegenCreateOutputRow(codegen);
656  if (create_output_row_fn == NULL) return NULL;
657 
658  // Codegen evaluating other join conjuncts
659  Function* join_conjuncts_fn = CodegenEvalConjuncts(codegen, other_join_conjuncts_);
660  if (join_conjuncts_fn == NULL) return NULL;
661 
662  // Codegen evaluating conjuncts
663  Function* conjuncts_fn = CodegenEvalConjuncts(codegen, conjuncts_);
664  if (conjuncts_fn == NULL) return NULL;
665 
666  // Replace all call sites with codegen version
667  int replaced = 0;
668  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
669  hash_fn, "HashCurrentRow", &replaced);
670  DCHECK_EQ(replaced, 1);
671 
672  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
673  eval_row_fn, "EvalProbeRow", &replaced);
674  DCHECK_EQ(replaced, 1);
675 
676  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
677  create_output_row_fn, "CreateOutputRow", &replaced);
678  DCHECK_EQ(replaced, 2);
679 
680  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
681  conjuncts_fn, "EvalConjuncts", &replaced);
682  DCHECK_EQ(replaced, 2);
683 
684  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
685  join_conjuncts_fn, "EvalOtherJoinConjuncts", &replaced);
686  DCHECK_EQ(replaced, 1);
687 
688  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
689  equals_fn, "Equals", &replaced);
690  DCHECK_EQ(replaced, 2);
691 
692  return codegen->OptimizeFunctionWithExprs(process_probe_batch_fn);
693 }
void AddRuntimeExecOption(const std::string &option)
Appends option to 'runtime_exec_options_'.
Definition: exec-node.cc:188
RuntimeProfile::Counter * hash_tbl_load_factor_counter_
int num_rows() const
Definition: row-batch.h:215
int64_t num_rows_returned_
Definition: exec-node.h:223
static const char * LLVM_CLASS_NAME
Tuple * GetTuple(int tuple_idx)
Definition: tuple-row.h:30
OldHashTable::Iterator hash_tbl_iterator_
bool TryAcquireThreadToken(bool *is_reserved=NULL)
void CreateOutputRow(TupleRow *out_row, TupleRow *probe_row, TupleRow *build_row)
llvm::Function * CodegenProcessProbeBatch(RuntimeState *state, llvm::Function *hash_fn)
Utility struct that wraps a variable name and llvm type.
Definition: llvm-codegen.h:149
boost::scoped_ptr< RuntimeProfile > runtime_profile_
Definition: exec-node.h:225
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
RowDescriptor row_descriptor_
Definition: exec-node.h:215
#define ADD_TIMER(profile, name)
RuntimeProfile::Counter * build_timer_
const RowDescriptor & row_desc() const
Definition: exec-node.h:156
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
Definition: exec-node.cc:378
void(* ProcessBuildBatchFn)(HashJoinNode *, RowBatch *)
bool ReachedLimit()
Definition: exec-node.h:159
llvm::Function * CodegenProcessBuildBatch(RuntimeState *state, llvm::Function *hash_fn)
void BuildSideThread(RuntimeState *state, Promise< Status > *status)
#define SCOPED_TIMER(c)
int(* ProcessProbeBatchFn)(HashJoinNode *, RowBatch *, RowBatch *, int)
HashJoinNode::ProcessProbeBatch() exactly.
llvm::Value * null_ptr_value()
Definition: llvm-codegen.h:382
boost::scoped_ptr< OldHashTable > hash_tbl_
RuntimeProfile::Counter * probe_timer_
LLVM code generator. This is the top level object to generate jitted code.
Definition: llvm-codegen.h:107
boost::scoped_ptr< MemPool > build_pool_
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
Definition: exec-node.cc:345
RuntimeProfile::Counter * probe_row_counter_
static const char * LLVM_CLASS_NAME
Definition: tuple-row.h:76
virtual Status Prepare(RuntimeState *state)
int ProcessProbeBatch(RowBatch *out_batch, RowBatch *probe_batch, int max_added_rows)
virtual Status Init(const TPlanNode &tnode)
void AddArgument(const NamedVariable &var)
Add argument.
Definition: llvm-codegen.h:171
static llvm::Function * CodegenEvalConjuncts(RuntimeState *state, const std::vector< ExprContext * > &conjunct_ctxs, const char *name="EvalConjuncts")
Definition: exec-node.cc:452
void CodegenMemcpy(LlvmBuilder *, llvm::Value *dst, llvm::Value *src, int size)
RuntimeProfile::Counter * build_buckets_counter_
ProcessProbeBatchFn process_probe_batch_fn_
Jitted ProcessProbeBatch function pointer. Null if codegen is disabled.
#define RETURN_IF_CANCELLED(state)
ObjectPool pool
virtual Status Prepare(RuntimeState *state)
Definition: exec-node.cc:130
#define ADD_COUNTER(profile, name, unit)
llvm::Function * GetFunction(IRFunction::Type)
This is the superclass of all expr evaluation nodes.
Definition: expr.h:116
#define VLOG_ROW
Definition: logging.h:59
int GetTupleIdx(TupleId id) const
Returns INVALID_IDX if id not part of this row.
Definition: descriptors.cc:328
void CommitLastRow()
Definition: row-batch.h:109
void ProcessBuildBatch(RowBatch *build_batch)
Construct the build hash table, adding all the rows in 'build_batch'.
virtual Status Open(RuntimeState *state)
llvm::Function * codegen_process_build_batch_fn_
llvm function for build batch
int64_t rows_returned() const
Definition: exec-node.h:157
#define COUNTER_SET(c, v)
int batch_size() const
Definition: runtime-state.h:98
TupleId id() const
Definition: descriptors.h:306
static Status CreateExprTree(ObjectPool *pool, const TExpr &texpr, ExprContext **ctx)
Definition: expr.cc:129
RuntimeProfile::Counter * rows_returned_counter_
Definition: exec-node.h:226
void IR_ALWAYS_INLINE Next()
ExecNode * child(int i)
Definition: exec-node.h:241
llvm::Function * CodegenCreateOutputRow(LlvmCodeGen *codegen)
Codegen function to create output row.
const std::vector< TupleDescriptor * > & tuple_descriptors() const
Return descriptors for all tuples in this row, in order of appearance.
Definition: descriptors.h:412
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
void SetTuple(int tuple_idx, Tuple *tuple)
Definition: tuple-row.h:34
int capacity() const
Definition: row-batch.h:216
static const Status OK
Definition: status.h:87
llvm::Type * GetType(const ColumnType &type)
Returns llvm type for the column type.
llvm::Value * GetIntConstant(PrimitiveType type, int64_t val)
Returns the constant 'val' of 'type'.
static Status CreateExprTrees(ObjectPool *pool, const std::vector< TExpr > &texprs, std::vector< ExprContext * > *ctxs)
Definition: expr.cc:149
RuntimeProfile::Counter * build_row_counter_
llvm::Function * FinalizeFunction(llvm::Function *function)
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
Definition: exec-node.cc:393
virtual void Close(RuntimeState *state)
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
virtual Status Open(RuntimeState *state)
Definition: exec-node.cc:154
boost::scoped_ptr< RowBatch > probe_batch_
llvm::Function * ReplaceCallSites(llvm::Function *caller, bool update_in_place, llvm::Function *new_fn, const std::string &target_name, int *num_replaced)
string PrintRow(TupleRow *row, const RowDescriptor &d)
Definition: debug-util.cc:192
bool ok() const
Definition: status.h:172
string PrintTuple(const Tuple *t, const TupleDescriptor &d)
Definition: debug-util.cc:166
llvm::Type * void_type()
Definition: llvm-codegen.h:394
ThreadResourceMgr::ResourcePool * resource_pool()
virtual void Close(RuntimeState *state)
Definition: exec-node.cc:166
Status LeftJoinGetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
llvm::LLVMContext & context()
Definition: llvm-codegen.h:214
ProcessBuildBatchFn process_build_batch_fn_
virtual std::string DebugString() const
Definition: expr.cc:385
llvm::Function * OptimizeFunctionWithExprs(llvm::Function *fn)
llvm::PointerType * ptr_type()
Definition: llvm-codegen.h:393
int64_t limit() const
Definition: exec-node.h:158
bool match_one_build_
Match at most one build row to each probe row. Used in LEFT_SEMI_JOIN.
RuntimeProfile * runtime_profile()
Definition: exec-node.h:161