Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hash-join-node.cc.REMOTE.32326.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exec/hash-join-node.h"
16 
17 #include <sstream>
18 
19 #include "codegen/llvm-codegen.h"
20 #include "exec/hash-table.inline.h"
21 #include "exprs/expr.h"
22 #include "runtime/row-batch.h"
23 #include "runtime/runtime-state.h"
24 #include "util/debug-util.h"
25 #include "util/runtime-profile.h"
26 
27 #include "gen-cpp/PlanNodes_types.h"
28 
29 using namespace boost;
30 using namespace impala;
31 using namespace llvm;
32 using namespace std;
33 
// Name of this class's type in the LLVM module. The CreateOutputRow codegen
// looks it up with codegen->GetType() to build the pointer type of the
// generated function's 'this_ptr' argument.
34 const char* HashJoinNode::LLVM_CLASS_NAME = "class.impala::HashJoinNode";
35 
36 HashJoinNode::HashJoinNode(
37  ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
38  : ExecNode(pool, tnode, descs),
39  join_op_(tnode.hash_join_node.join_op),
40  build_pool_(new MemPool()),
41  codegen_process_build_batch_fn_(NULL),
42  process_build_batch_fn_(NULL),
43  codegen_process_probe_batch_fn_(NULL),
44  process_probe_batch_fn_(NULL) {
45  // TODO: log errors in runtime state
46  Status status = Init(pool, tnode);
47  DCHECK(status.ok())
48  << "HashJoinNode c'tor: Init() failed:\n"
49  << status.GetErrorMsg();
50 
52  (join_op_ == TJoinOp::LEFT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN);
53  match_one_build_ = (join_op_ == TJoinOp::LEFT_SEMI_JOIN);
55  (join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN);
56 }
57 
58 Status HashJoinNode::Init(ObjectPool* pool, const TPlanNode& tnode) {
59  DCHECK(tnode.__isset.hash_join_node);
60  const vector<TEqJoinCondition>& eq_join_conjuncts =
61  tnode.hash_join_node.eq_join_conjuncts;
62  for (int i = 0; i < eq_join_conjuncts.size(); ++i) {
63  Expr* expr;
64  RETURN_IF_ERROR(Expr::CreateExprTree(pool, eq_join_conjuncts[i].left, &expr));
65  probe_exprs_.push_back(expr);
66  RETURN_IF_ERROR(Expr::CreateExprTree(pool, eq_join_conjuncts[i].right, &expr));
67  build_exprs_.push_back(expr);
68  }
70  Expr::CreateExprTrees(pool, tnode.hash_join_node.other_join_conjuncts,
71  &other_join_conjuncts_));
72  return Status::OK;
73 }
74 
// Destructor. It frees nothing itself; it only checks that Close() has already
// released probe_batch_ (Close() calls probe_batch_.reset(NULL)).
// NOTE(review): the Close() body runs RETURN_IF_ERROR(ExecDebugAction(CLOSE))
// before that reset. If the debug action returns an error, probe_batch_ is still
// set when the node is destroyed, and this DCHECK fires.
75 HashJoinNode::~HashJoinNode() {
76  // probe_batch_ must be cleaned up in Close() to ensure proper resource freeing.
77  DCHECK(probe_batch_ == NULL);
78 }
79 
82 
83  build_timer_ =
84  ADD_TIMER(runtime_profile(), "BuildTime");
85  probe_timer_ =
86  ADD_TIMER(runtime_profile(), "ProbeTime");
88  ADD_COUNTER(runtime_profile(), "BuildRows", TCounterType::UNIT);
90  ADD_COUNTER(runtime_profile(), "BuildBuckets", TCounterType::UNIT);
92  ADD_COUNTER(runtime_profile(), "ProbeRows", TCounterType::UNIT);
94  ADD_COUNTER(runtime_profile(), "LoadFactor", TCounterType::DOUBLE_VALUE);
95 
96  // build and probe exprs are evaluated in the context of the rows produced by our
97  // right and left children, respectively
98  Expr::Prepare(build_exprs_, state, child(1)->row_desc());
99  Expr::Prepare(probe_exprs_, state, child(0)->row_desc());
100 
101  // other_join_conjuncts_ are evaluated in the context of the rows produced by this node
102  Expr::Prepare(other_join_conjuncts_, state, row_descriptor_);
103 
104  result_tuple_row_size_ = row_descriptor_.tuple_descriptors().size() * sizeof(Tuple*);
105 
106  // pre-compute the tuple index of build tuples in the output row
107  build_tuple_size_ = child(1)->row_desc().tuple_descriptors().size();
108  build_tuple_idx_.reserve(build_tuple_size_);
109  for (int i = 0; i < build_tuple_size_; ++i) {
110  TupleDescriptor* build_tuple_desc = child(1)->row_desc().tuple_descriptors()[i];
111  build_tuple_idx_.push_back(row_descriptor_.GetTupleIdx(build_tuple_desc->id()));
112  }
113 
114  build_pool_->set_limits(*state->mem_limits());
115 
116  // TODO: default buckets
117  hash_tbl_.reset(new HashTable(build_exprs_, probe_exprs_, build_tuple_size_,
118  false, id(), *state->mem_limits()));
119 
120  probe_batch_.reset(new RowBatch(row_descriptor_, state->batch_size()));
121 
122  LlvmCodeGen* codegen = state->llvm_codegen();
123  if (codegen != NULL) {
124  // Codegen for hashing rows
125  Function* hash_fn = hash_tbl_->CodegenHashCurrentRow(codegen);
126  if (hash_fn == NULL) return Status::OK;
127 
128  // Codegen for build path
130 
131  // Codegen for probe path (only for left joins)
132  if (!match_all_build_) {
133  codegen_process_probe_batch_fn_ = CodegenProcessProbeBatch(codegen, hash_fn);
134  }
135  }
136  return Status::OK;
137 }
138 
140  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::CLOSE));
141  // Must reset probe_batch_ in Close() to release resources
142  probe_batch_.reset(NULL);
143  if (memory_used_counter_ != NULL && hash_tbl_.get() != NULL) {
144  COUNTER_UPDATE(memory_used_counter_, build_pool_->peak_allocated_bytes());
145  COUNTER_UPDATE(memory_used_counter_, hash_tbl_->byte_size());
146  }
147  return ExecNode::Close(state);
148 }
149 
// Body of the "build thread" that is started while this node is opened
// (Thread build_thread("hash-join-node", "build thread", ...)).
// It runs ConstructHashTable() on child(1) and publishes the returned Status
// through 'status'. The opening thread blocks on the promise's future until
// that value is set, then starts the probe phase.
150 void HashJoinNode::BuildSideThread(RuntimeState* state, promise<Status>* status) {
151  status->set_value(ConstructHashTable(state));
152 }
153 
154 Status HashJoinNode::ConstructHashTable(RuntimeState* state) {
155  // Do a full scan of child(1) and store everything in hash_tbl_
156  // The hash join node needs to keep in memory all build tuples, including the tuple
157  // row ptrs. The row ptrs are copied into the hash table's internal structure so they
158  // don't need to be stored in the build_pool_.
159  RowBatch build_batch(child(1)->row_desc(), state->batch_size());
160  RETURN_IF_ERROR(child(1)->Open(state));
161  while (true) {
162  RETURN_IF_CANCELLED(state);
163  bool eos;
164  RETURN_IF_ERROR(child(1)->GetNext(state, &build_batch, &eos));
166  // take ownership of tuple data of build_batch
167  build_pool_->AcquireData(build_batch.tuple_data_pool(), false);
168  RETURN_IF_LIMIT_EXCEEDED(state);
169 
170  // Call codegen version if possible
171  if (process_build_batch_fn_ == NULL) {
172  ProcessBuildBatch(&build_batch);
173  } else {
174  process_build_batch_fn_(this, &build_batch);
175  }
176  VLOG_ROW << hash_tbl_->DebugString(true, &child(1)->row_desc());
177 
181  build_batch.Reset();
182  if (eos) break;
183  }
184  return Status::OK;
185 }
186 
188  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::OPEN));
189  SCOPED_TIMER(runtime_profile_->total_time_counter());
190  RETURN_IF_CANCELLED(state);
191 
192  if (codegen_process_build_batch_fn_ == NULL) {
193  LOG(WARNING) << "Codegen for HashJoinNode (node_id=" << id()
194  << ") was not supported for this query.";
195  } else {
196  void* jitted_process_build_batch =
197  state->llvm_codegen()->JitFunction(codegen_process_build_batch_fn_);
198  DCHECK(jitted_process_build_batch != NULL);
200  reinterpret_cast<ProcessBuildBatchFn>(jitted_process_build_batch);
201  LOG(INFO) << "HashJoinNode(node_id=" << id()
202  << ") using llvm codegend function for building hash table.";
203  }
204 
205  if (codegen_process_probe_batch_fn_ == NULL) {
206  LOG(WARNING) << "Codegen for HashJoinNode (node_id=" << id()
207  << ") was not supported for this query.";
208  } else {
209  void* jitted_process_probe_batch =
210  state->llvm_codegen()->JitFunction(codegen_process_probe_batch_fn_);
211  DCHECK(jitted_process_probe_batch != NULL);
213  reinterpret_cast<ProcessProbeBatchFn>(jitted_process_probe_batch);
214  LOG(INFO) << "HashJoinNode(node_id=" << id()
215  << ") using llvm codegend function for probing hash table.";
216  }
217 
218  eos_ = false;
219 
220  // TODO: fix problems with asynchronous cancellation
221  // Kick-off the construction of the build-side table in a separate
222  // thread, so that the left child can do any initialisation in parallel.
223  promise<Status> thread_status;
224  Thread build_thread("hash-join-node", "build thread",
225  &HashJoinNode::BuildSideThread, this, state, &thread_status);
226 
227  // Open the probe-side child so that it may perform any initialisation in parallel.
228  // Don't exit even if we see an error, we still need to wait for the build thread
229  // to finish.
230  Status open_status = child(0)->Open(state);
231 
232  // Blocks until ConstructHashTable has returned, after which
233  // the hash table is fully constructed and we can start the probe
234  // phase.
235  RETURN_IF_ERROR(thread_status.get_future().get());
236  VLOG_ROW << hash_tbl_->DebugString(true, &child(1)->row_desc());
237  RETURN_IF_ERROR(open_status);
238 
239  // seed probe batch and current_probe_row_, etc.
240  while (true) {
241  RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_eos_));
242  COUNTER_UPDATE(probe_row_counter_, probe_batch_->num_rows());
243  probe_batch_pos_ = 0;
244  if (probe_batch_->num_rows() == 0) {
245  if (probe_eos_) {
246  hash_tbl_iterator_ = hash_tbl_->Begin();
247  eos_ = true;
248  break;
249  }
250  probe_batch_->Reset();
251  continue;
252  } else {
253  current_probe_row_ = probe_batch_->GetRow(probe_batch_pos_++);
254  VLOG_ROW << "probe row: " << GetProbeRowOutputString(current_probe_row_);
255  matched_probe_ = false;
257  break;
258  }
259  }
260 
261  return Status::OK;
262 }
263 
264 Status HashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch, bool* eos) {
265  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT));
266  RETURN_IF_CANCELLED(state);
267  SCOPED_TIMER(runtime_profile_->total_time_counter());
268  if (ReachedLimit()) {
269  *eos = true;
270  return Status::OK;
271  }
272 
273  // These cases are simpler and use a more efficient processing loop
274  if (!match_all_build_) {
275  if (eos_) {
276  *eos = true;
277  return Status::OK;
278  }
279  return LeftJoinGetNext(state, out_batch, eos);
280  }
281 
282  Expr* const* other_conjuncts = &other_join_conjuncts_[0];
283  int num_other_conjuncts = other_join_conjuncts_.size();
284 
285  Expr* const* conjuncts = &conjuncts_[0];
286  int num_conjuncts = conjuncts_.size();
287 
288  // Explicitly manage the timer counter to avoid measuring time in the child
289  // GetNext call.
291 
292  while (!eos_) {
293  // create output rows as long as:
294  // 1) we haven't already created an output row for the probe row and are doing
295  // a semi-join;
296  // 2) there are more matching build rows
297  while (hash_tbl_iterator_.HasNext()) {
298  TupleRow* matched_build_row = hash_tbl_iterator_.GetRow();
299  hash_tbl_iterator_.Next<true>();
300 
301  int row_idx = out_batch->AddRow();
302  TupleRow* out_row = out_batch->GetRow(row_idx);
303  CreateOutputRow(out_row, current_probe_row_, matched_build_row);
304  if (!EvalConjuncts(other_conjuncts, num_other_conjuncts, out_row)) continue;
305  // we have a match for the purpose of the (outer?) join as soon as we
306  // satisfy the JOIN clause conjuncts
307  matched_probe_ = true;
308  if (match_all_build_) {
309  // remember that we matched this build row
310  joined_build_rows_.insert(matched_build_row);
311  VLOG_ROW << "joined build row: " << matched_build_row;
312  }
313  if (EvalConjuncts(conjuncts, num_conjuncts, out_row)) {
314  out_batch->CommitLastRow();
315  VLOG_ROW << "match row: " << PrintRow(out_row, row_desc());
318  if (out_batch->IsFull() || ReachedLimit()) {
319  *eos = ReachedLimit();
320  return Status::OK;
321  }
322  }
323  }
324 
325  // check whether we need to output the current probe row before
326  // getting a new probe batch
328  int row_idx = out_batch->AddRow();
329  TupleRow* out_row = out_batch->GetRow(row_idx);
330  CreateOutputRow(out_row, current_probe_row_, NULL);
331  if (EvalConjuncts(conjuncts, num_conjuncts, out_row)) {
332  out_batch->CommitLastRow();
333  VLOG_ROW << "match row: " << PrintRow(out_row, row_desc());
336  matched_probe_ = true;
337  if (out_batch->IsFull() || ReachedLimit()) {
338  *eos = ReachedLimit();
339  return Status::OK;
340  }
341  }
342  }
343 
344  if (probe_batch_pos_ == probe_batch_->num_rows()) {
345  // pass on resources, out_batch might still need them
346  probe_batch_->TransferResourceOwnership(out_batch);
347  probe_batch_pos_ = 0;
348  if (out_batch->IsFull()) return Status::OK;
349  // get new probe batch
350  if (!probe_eos_) {
351  while (true) {
352  probe_timer.Stop();
353  RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_eos_));
354  probe_timer.Start();
355  if (probe_batch_->num_rows() == 0) {
356  // Empty batches can still contain IO buffers, which need to be passed up to
357  // the caller; transferring resources can fill up out_batch.
358  probe_batch_->TransferResourceOwnership(out_batch);
359  if (probe_eos_) {
360  eos_ = true;
361  break;
362  }
363  if (out_batch->IsFull()) return Status::OK;
364  continue;
365  } else {
366  COUNTER_UPDATE(probe_row_counter_, probe_batch_->num_rows());
367  break;
368  }
369  }
370  } else {
371  eos_ = true;
372  }
373  // finish up right outer join
374  if (eos_ && match_all_build_) {
375  hash_tbl_iterator_ = hash_tbl_->Begin();
376  }
377  }
378 
379  if (eos_) break;
380 
381  // join remaining rows in probe batch_
382  current_probe_row_ = probe_batch_->GetRow(probe_batch_pos_++);
383  VLOG_ROW << "probe row: " << GetProbeRowOutputString(current_probe_row_);
384  matched_probe_ = false;
386  }
387 
388  *eos = true;
389  if (match_all_build_) {
390  // output remaining unmatched build rows
391  TupleRow* build_row = NULL;
392  while (!out_batch->IsFull() && hash_tbl_iterator_.HasNext()) {
393  build_row = hash_tbl_iterator_.GetRow();
394  hash_tbl_iterator_.Next<false>();
395  if (joined_build_rows_.find(build_row) != joined_build_rows_.end()) {
396  continue;
397  }
398  int row_idx = out_batch->AddRow();
399  TupleRow* out_row = out_batch->GetRow(row_idx);
400  CreateOutputRow(out_row, NULL, build_row);
401  if (EvalConjuncts(conjuncts, num_conjuncts, out_row)) {
402  out_batch->CommitLastRow();
403  VLOG_ROW << "match row: " << PrintRow(out_row, row_desc());
406  if (ReachedLimit()) {
407  *eos = true;
408  return Status::OK;
409  }
410  }
411  }
412  // we're done if there are no more rows left to check
413  *eos = !hash_tbl_iterator_.HasNext();
414  }
415  return Status::OK;
416 }
417 
419  RowBatch* out_batch, bool* eos) {
420  *eos = eos_;
421 
423  while (!eos_) {
424  // Compute max rows that should be added to out_batch
425  int64_t max_added_rows = out_batch->capacity() - out_batch->num_rows();
426  if (limit() != -1) max_added_rows = min(max_added_rows, limit() - rows_returned());
427 
428  // Continue processing this row batch
429  if (process_probe_batch_fn_ == NULL) {
431  ProcessProbeBatch(out_batch, probe_batch_.get(), max_added_rows);
433  } else {
434  // Use codegen'd function
436  process_probe_batch_fn_(this, out_batch, probe_batch_.get(), max_added_rows);
438  }
439 
440  if (ReachedLimit() || out_batch->IsFull()) {
441  *eos = ReachedLimit();
442  break;
443  }
444 
445  // Check to see if we're done processing the current probe batch
446  if (!hash_tbl_iterator_.HasNext() && probe_batch_pos_ == probe_batch_->num_rows()) {
447  probe_batch_->TransferResourceOwnership(out_batch);
448  probe_batch_pos_ = 0;
449  if (out_batch->IsFull()) break;
450  if (probe_eos_) {
451  *eos = eos_ = true;
452  break;
453  } else {
454  probe_timer.Stop();
455  RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_eos_));
456  probe_timer.Start();
457  COUNTER_UPDATE(probe_row_counter_, probe_batch_->num_rows());
458  }
459  }
460  }
461 
462  return Status::OK;
463 }
464 
// Debug helper used with VLOG_ROW: renders 'probe_row' as "[t0 t1 ...]", with
// one PrintTuple() per tuple descriptor of this node's output row.
// Slots that belong to the build side (their indexes are listed in
// build_tuple_idx_) are printed as PrintTuple(NULL, ...) rather than read
// from 'probe_row'.
465 string HashJoinNode::GetProbeRowOutputString(TupleRow* probe_row) {
466  stringstream out;
467  out << "[";
468  const vector<TupleDescriptor*>& tuple_descs = row_desc().tuple_descriptors();
469  for (int i = 0; i < tuple_descs.size(); ++i) {
470  if (i != 0) out << " ";
471 
// Search through iterators rather than '&build_tuple_idx_[0]': indexing
// element 0 is undefined when the build child contributes no tuples (empty
// vector). build_tuple_idx_ is filled with exactly build_tuple_size_ entries
// in the loop that computes build_tuple_size_, so searching the whole vector
// covers the same range as before.
472  bool is_build_tuple = ::find(build_tuple_idx_.begin(), build_tuple_idx_.end(), i)
473  != build_tuple_idx_.end();
474 
475  if (is_build_tuple) {
476  out << PrintTuple(NULL, *tuple_descs[i]);
477  } else {
478  out << PrintTuple(probe_row->GetTuple(i), *tuple_descs[i]);
479  }
480  }
481  out << "]";
482  return out.str();
483 }
484 
// Appends a description of this node to 'out', prefixed by
// 2 * indentation_level spaces.
// The text has the form "HashJoin(eos=... probe_batch_pos=... hash_tbl=
// HashTbl( build_exprs=... probe_exprs=...)<ExecNode description>)".
// The ExecNode::DebugString() part is the base-class description; presumably
// it includes the children, but confirm in exec-node.cc.
// NOTE(review): line 490 writes the indentation in the middle of a line, right
// after "hash_tbl=" and with no preceding newline, so it only inserts padding
// spaces. A "\n" may have been intended.
// Only the hash table's expressions are printed, not its contents.
485 void HashJoinNode::DebugString(int indentation_level, stringstream* out) const {
486  *out << string(indentation_level * 2, ' ');
487  *out << "HashJoin(eos=" << (eos_ ? "true" : "false")
488  << " probe_batch_pos=" << probe_batch_pos_
489  << " hash_tbl=";
490  *out << string(indentation_level * 2, ' ');
491  *out << "HashTbl("
492  << " build_exprs=" << Expr::DebugString(build_exprs_)
493  << " probe_exprs=" << Expr::DebugString(probe_exprs_);
494  *out << ")";
495  ExecNode::DebugString(indentation_level, out);
496  *out << ")";
497 }
498 
499 // This function is replaced by codegen
// Builds one joined output row in 'out':
//  - If 'probe' is non-NULL, its first result_tuple_row_size_ bytes (one Tuple*
//    per tuple of this node's output row) are copied wholesale. This assumes
//    'probe' already uses the output row layout, which holds because
//    probe_batch_ is allocated with row_descriptor_.
//  - If 'probe' is NULL, every slot is zeroed.
//  - Each build tuple i is then written to slot build_tuple_idx_[i], or NULL is
//    written there when 'build' is NULL.
// A NULL side is how GetNext() emits unmatched probe rows (build == NULL) and
// unmatched build rows (probe == NULL) for outer joins.
// NOTE(review): only the CreateOutputRow call sites inside ProcessProbeBatch are
// swapped for the codegen'd version; GetNext() calls this function directly.
500 void HashJoinNode::CreateOutputRow(TupleRow* out, TupleRow* probe, TupleRow* build) {
501  if (probe == NULL) {
502  memset(out, 0, result_tuple_row_size_);
503  } else {
504  memcpy(out, probe, result_tuple_row_size_);
505  }
506 
507  if (build != NULL) {
508  for (int i = 0; i < build_tuple_size_; ++i) {
509  out->SetTuple(build_tuple_idx_[i], build->GetTuple(i));
510  }
511  } else {
512  for (int i = 0; i < build_tuple_size_; ++i) {
513  out->SetTuple(build_tuple_idx_[i], NULL);
514  }
515  }
516 }
517 
518 // This codegen'd function should only be used for left join cases so it assumes that
519 // the probe row is non-null. For a left outer join, the IR looks like:
520 // define void @CreateOutputRow(%"class.impala::TupleRow"* %out_arg,
521 // %"class.impala::TupleRow"* %probe_arg,
522 // %"class.impala::TupleRow"* %build_arg) {
523 // entry:
524 // %out = bitcast %"class.impala::TupleRow"* %out_arg to i8**
525 // %probe = bitcast %"class.impala::TupleRow"* %probe_arg to i8**
526 // %build = bitcast %"class.impala::TupleRow"* %build_arg to i8**
527 // %0 = bitcast i8** %out to i8*
528 // %1 = bitcast i8** %probe to i8*
529 // call void @llvm.memcpy.p0i8.p0i8.i32(i8* %0, i8* %1, i32 16, i32 16, i1 false)
530 // %is_build_null = icmp eq i8** %build, null
531 // br i1 %is_build_null, label %build_null, label %build_not_null
532 //
533 // build_not_null: ; preds = %entry
534 // %dst_tuple_ptr1 = getelementptr i8** %out, i32 1
535 // %src_tuple_ptr = getelementptr i8** %build, i32 0
536 // %2 = load i8** %src_tuple_ptr
537 // store i8* %2, i8** %dst_tuple_ptr1
538 // ret void
539 //
540 // build_null: ; preds = %entry
541 // %dst_tuple_ptr = getelementptr i8** %out, i32 1
542 // store i8* null, i8** %dst_tuple_ptr
543 // ret void
544 // }
546  Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
547  DCHECK(tuple_row_type != NULL);
548  PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
549 
550  Type* this_type = codegen->GetType(HashJoinNode::LLVM_CLASS_NAME);
551  DCHECK(this_type != NULL);
552  PointerType* this_ptr_type = PointerType::get(this_type, 0);
553 
554  // TupleRows are really just an array of pointers. Easier to work with them
555  // this way.
556  PointerType* tuple_row_working_type = PointerType::get(codegen->ptr_type(), 0);
557 
558  // Construct function signature to match CreateOutputRow()
559  LlvmCodeGen::FnPrototype prototype(codegen, "CreateOutputRow", codegen->void_type());
560  prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
561  prototype.AddArgument(LlvmCodeGen::NamedVariable("out_arg", tuple_row_ptr_type));
562  prototype.AddArgument(LlvmCodeGen::NamedVariable("probe_arg", tuple_row_ptr_type));
563  prototype.AddArgument(LlvmCodeGen::NamedVariable("build_arg", tuple_row_ptr_type));
564 
565  LLVMContext& context = codegen->context();
566  LlvmCodeGen::LlvmBuilder builder(context);
567  Value* args[4];
568  Function* fn = prototype.GeneratePrototype(&builder, args);
569  Value* out_row_arg = builder.CreateBitCast(args[1], tuple_row_working_type, "out");
570  Value* probe_row_arg = builder.CreateBitCast(args[2], tuple_row_working_type, "probe");
571  Value* build_row_arg = builder.CreateBitCast(args[3], tuple_row_working_type, "build");
572 
573  // Copy probe row
574  codegen->CodegenMemcpy(&builder, out_row_arg, probe_row_arg, result_tuple_row_size_);
575 
576  // Copy build row.
577  BasicBlock* build_not_null_block = BasicBlock::Create(context, "build_not_null", fn);
578  BasicBlock* build_null_block = NULL;
579 
580  if (match_all_probe_) {
581  // build tuple can be null
582  build_null_block = BasicBlock::Create(context, "build_null", fn);
583  Value* is_build_null = builder.CreateIsNull(build_row_arg, "is_build_null");
584  builder.CreateCondBr(is_build_null, build_null_block, build_not_null_block);
585 
586  // Set tuple build ptrs to NULL
587  builder.SetInsertPoint(build_null_block);
588  for (int i = 0; i < build_tuple_size_; ++i) {
589  Value* array_idx[] = { codegen->GetIntConstant(TYPE_INT, build_tuple_idx_[i]) };
590  Value* dst = builder.CreateGEP(out_row_arg, array_idx, "dst_tuple_ptr");
591  builder.CreateStore(codegen->null_ptr_value(), dst);
592  }
593  builder.CreateRetVoid();
594  } else {
595  // build row can't be NULL
596  builder.CreateBr(build_not_null_block);
597  }
598 
599  // Copy build tuple ptrs
600  builder.SetInsertPoint(build_not_null_block);
601  for (int i = 0; i < build_tuple_size_; ++i) {
602  Value* dst_idx[] = { codegen->GetIntConstant(TYPE_INT, build_tuple_idx_[i]) };
603  Value* src_idx[] = { codegen->GetIntConstant(TYPE_INT, i) };
604  Value* dst = builder.CreateGEP(out_row_arg, dst_idx, "dst_tuple_ptr");
605  Value* src = builder.CreateGEP(build_row_arg, src_idx, "src_tuple_ptr");
606  builder.CreateStore(builder.CreateLoad(src), dst);
607  }
608  builder.CreateRetVoid();
609 
610  return codegen->FinalizeFunction(fn);
611 }
612 
614  Function* hash_fn) {
615  // Get cross compiled function
616  Function* process_build_batch_fn = codegen->GetFunction(
617  IRFunction::HASH_JOIN_PROCESS_BUILD_BATCH);
618  DCHECK(process_build_batch_fn != NULL);
619 
620  // Codegen for evaluating build rows
621  Function* eval_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, true);
622  if (eval_row_fn == NULL) return NULL;
623 
624  int replaced = 0;
625  // Replace call sites
626  process_build_batch_fn = codegen->ReplaceCallSites(process_build_batch_fn, false,
627  eval_row_fn, "EvalBuildRow", &replaced);
628  DCHECK_EQ(replaced, 1);
629 
630  process_build_batch_fn = codegen->ReplaceCallSites(process_build_batch_fn, false,
631  hash_fn, "HashCurrentRow", &replaced);
632  DCHECK_EQ(replaced, 1);
633 
634  return codegen->OptimizeFunctionWithExprs(process_build_batch_fn);
635 }
636 
638  Function* hash_fn) {
639  // Get cross compiled function
640  Function* process_probe_batch_fn = codegen->GetFunction(
641  IRFunction::HASH_JOIN_PROCESS_PROBE_BATCH);
642  DCHECK(process_probe_batch_fn != NULL);
643 
644  // Codegen HashTable::Equals
645  Function* equals_fn = hash_tbl_->CodegenEquals(codegen);
646  if (equals_fn == NULL) return NULL;
647 
648  // Codegen for evaluating build rows
649  Function* eval_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, false);
650  if (eval_row_fn == NULL) return NULL;
651 
652  // Codegen CreateOutputRow
653  Function* create_output_row_fn = CodegenCreateOutputRow(codegen);
654  if (create_output_row_fn == NULL) return NULL;
655 
656  // Codegen evaluating other join conjuncts
657  Function* join_conjuncts_fn = CodegenEvalConjuncts(codegen, other_join_conjuncts_);
658  if (join_conjuncts_fn == NULL) return NULL;
659 
660  // Codegen evaluating conjuncts
661  Function* conjuncts_fn = CodegenEvalConjuncts(codegen, conjuncts_);
662  if (conjuncts_fn == NULL) return NULL;
663 
664  // Replace all call sites with codegen version
665  int replaced = 0;
666  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
667  hash_fn, "HashCurrentRow", &replaced);
668  DCHECK_EQ(replaced, 1);
669 
670  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
671  eval_row_fn, "EvalProbeRow", &replaced);
672  DCHECK_EQ(replaced, 1);
673 
674  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
675  create_output_row_fn, "CreateOutputRow", &replaced);
676  DCHECK_EQ(replaced, 2);
677 
678  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
679  conjuncts_fn, "EvalConjuncts", &replaced);
680  DCHECK_EQ(replaced, 2);
681 
682  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
683  join_conjuncts_fn, "EvalOtherJoinConjuncts", &replaced);
684  DCHECK_EQ(replaced, 1);
685 
686  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
687  equals_fn, "Equals", &replaced);
688  DCHECK_EQ(replaced, 2);
689 
690  return codegen->OptimizeFunctionWithExprs(process_probe_batch_fn);
691 }
int id() const
Definition: exec-node.h:154
RuntimeProfile::Counter * hash_tbl_load_factor_counter_
int num_rows() const
Definition: row-batch.h:215
int64_t num_rows_returned_
Definition: exec-node.h:223
static const char * LLVM_CLASS_NAME
Tuple * GetTuple(int tuple_idx)
Definition: tuple-row.h:30
OldHashTable::Iterator hash_tbl_iterator_
TODO: Consider allowing fragment IDs as category parameters.
Definition: thread.h:45
void CreateOutputRow(TupleRow *out_row, TupleRow *probe_row, TupleRow *build_row)
llvm::Function * CodegenProcessProbeBatch(RuntimeState *state, llvm::Function *hash_fn)
Utility struct that wraps a variable name and llvm type.
Definition: llvm-codegen.h:149
boost::scoped_ptr< RuntimeProfile > runtime_profile_
Definition: exec-node.h:225
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
RowDescriptor row_descriptor_
Definition: exec-node.h:215
#define ADD_TIMER(profile, name)
RuntimeProfile::Counter * build_timer_
const RowDescriptor & row_desc() const
Definition: exec-node.h:156
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
Definition: exec-node.cc:378
void(* ProcessBuildBatchFn)(HashJoinNode *, RowBatch *)
bool ReachedLimit()
Definition: exec-node.h:159
llvm::Function * CodegenProcessBuildBatch(RuntimeState *state, llvm::Function *hash_fn)
void BuildSideThread(RuntimeState *state, Promise< Status > *status)
#define SCOPED_TIMER(c)
int(* ProcessProbeBatchFn)(HashJoinNode *, RowBatch *, RowBatch *, int)
HashJoinNode::ProcessProbeBatch() exactly.
llvm::Value * null_ptr_value()
Definition: llvm-codegen.h:382
boost::scoped_ptr< OldHashTable > hash_tbl_
RuntimeProfile::Counter * probe_timer_
LLVM code generator. This is the top level object to generate jitted code.
Definition: llvm-codegen.h:107
boost::scoped_ptr< MemPool > build_pool_
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
Definition: exec-node.cc:345
RuntimeProfile::Counter * probe_row_counter_
static const char * LLVM_CLASS_NAME
Definition: tuple-row.h:76
virtual Status Prepare(RuntimeState *state)
int ProcessProbeBatch(RowBatch *out_batch, RowBatch *probe_batch, int max_added_rows)
virtual Status Init(const TPlanNode &tnode)
void AddArgument(const NamedVariable &var)
Add argument.
Definition: llvm-codegen.h:171
static llvm::Function * CodegenEvalConjuncts(RuntimeState *state, const std::vector< ExprContext * > &conjunct_ctxs, const char *name="EvalConjuncts")
Definition: exec-node.cc:452
void CodegenMemcpy(LlvmBuilder *, llvm::Value *dst, llvm::Value *src, int size)
RuntimeProfile::Counter * build_buckets_counter_
ProcessProbeBatchFn process_probe_batch_fn_
Jitted ProcessProbeBatch function pointer. Null if codegen is disabled.
#define RETURN_IF_CANCELLED(state)
ObjectPool pool
virtual Status Prepare(RuntimeState *state)
Definition: exec-node.cc:130
#define ADD_COUNTER(profile, name, unit)
llvm::Function * GetFunction(IRFunction::Type)
This is the superclass of all expr evaluation nodes.
Definition: expr.h:116
#define VLOG_ROW
Definition: logging.h:59
int GetTupleIdx(TupleId id) const
Returns INVALID_IDX if id not part of this row.
Definition: descriptors.cc:328
void CommitLastRow()
Definition: row-batch.h:109
void ProcessBuildBatch(RowBatch *build_batch)
Construct the build hash table, adding all the rows in 'build_batch'.
virtual Status Open(RuntimeState *state)
llvm::Function * codegen_process_build_batch_fn_
llvm function for build batch
int64_t rows_returned() const
Definition: exec-node.h:157
#define COUNTER_SET(c, v)
int batch_size() const
Definition: runtime-state.h:98
TupleId id() const
Definition: descriptors.h:306
static Status CreateExprTree(ObjectPool *pool, const TExpr &texpr, ExprContext **ctx)
Definition: expr.cc:129
RuntimeProfile::Counter * rows_returned_counter_
Definition: exec-node.h:226
void IR_ALWAYS_INLINE Next()
ExecNode * child(int i)
Definition: exec-node.h:241
llvm::Function * CodegenCreateOutputRow(LlvmCodeGen *codegen)
Codegen function to create output row.
const std::vector< TupleDescriptor * > & tuple_descriptors() const
Return descriptors for all tuples in this row, in order of appearance.
Definition: descriptors.h:412
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
void SetTuple(int tuple_idx, Tuple *tuple)
Definition: tuple-row.h:34
int capacity() const
Definition: row-batch.h:216
static const Status OK
Definition: status.h:87
llvm::Type * GetType(const ColumnType &type)
Returns llvm type for the column type.
llvm::Value * GetIntConstant(PrimitiveType type, int64_t val)
Returns the constant 'val' of 'type'.
static Status CreateExprTrees(ObjectPool *pool, const std::vector< TExpr > &texprs, std::vector< ExprContext * > *ctxs)
Definition: expr.cc:149
RuntimeProfile::Counter * build_row_counter_
llvm::Function * FinalizeFunction(llvm::Function *function)
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
Definition: exec-node.cc:393
virtual void Close(RuntimeState *state)
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
virtual Status Open(RuntimeState *state)
Definition: exec-node.cc:154
boost::scoped_ptr< RowBatch > probe_batch_
llvm::Function * ReplaceCallSites(llvm::Function *caller, bool update_in_place, llvm::Function *new_fn, const std::string &target_name, int *num_replaced)
string PrintRow(TupleRow *row, const RowDescriptor &d)
Definition: debug-util.cc:192
bool ok() const
Definition: status.h:172
string PrintTuple(const Tuple *t, const TupleDescriptor &d)
Definition: debug-util.cc:166
llvm::Type * void_type()
Definition: llvm-codegen.h:394
virtual void Close(RuntimeState *state)
Definition: exec-node.cc:166
Status LeftJoinGetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
llvm::LLVMContext & context()
Definition: llvm-codegen.h:214
ProcessBuildBatchFn process_build_batch_fn_
virtual std::string DebugString() const
Definition: expr.cc:385
llvm::Function * OptimizeFunctionWithExprs(llvm::Function *fn)
llvm::PointerType * ptr_type()
Definition: llvm-codegen.h:393
int64_t limit() const
Definition: exec-node.h:158
bool match_one_build_
Match at most one build row to each probe row. Used in LEFT_SEMI_JOIN.
RuntimeProfile * runtime_profile()
Definition: exec-node.h:161