Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hash-join-node.cc.BACKUP.32326.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exec/hash-join-node.h"
16 
17 #include <sstream>
18 
19 #include "codegen/llvm-codegen.h"
20 #include "exec/hash-table.inline.h"
21 #include "exprs/expr.h"
22 #include "runtime/row-batch.h"
23 #include "runtime/runtime-state.h"
24 #include "util/debug-util.h"
25 #include "util/runtime-profile.h"
26 
27 #include "gen-cpp/PlanNodes_types.h"
28 
29 using namespace boost;
30 using namespace impala;
31 using namespace llvm;
32 using namespace std;
33 
34 const char* HashJoinNode::LLVM_CLASS_NAME = "class.impala::HashJoinNode";
35 
36 HashJoinNode::HashJoinNode(
37  ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
38  : ExecNode(pool, tnode, descs),
39  join_op_(tnode.hash_join_node.join_op),
40  codegen_process_build_batch_fn_(NULL),
41  process_build_batch_fn_(NULL),
42  codegen_process_probe_batch_fn_(NULL),
43  process_probe_batch_fn_(NULL) {
44  // TODO: log errors in runtime state
45  Status status = Init(pool, tnode);
46  DCHECK(status.ok())
47  << "HashJoinNode c'tor: Init() failed:\n"
48  << status.GetErrorMsg();
49 
51  (join_op_ == TJoinOp::LEFT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN);
52  match_one_build_ = (join_op_ == TJoinOp::LEFT_SEMI_JOIN);
54  (join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN);
55 }
56 
57 Status HashJoinNode::Init(ObjectPool* pool, const TPlanNode& tnode) {
58  DCHECK(tnode.__isset.hash_join_node);
59  const vector<TEqJoinCondition>& eq_join_conjuncts =
60  tnode.hash_join_node.eq_join_conjuncts;
61  for (int i = 0; i < eq_join_conjuncts.size(); ++i) {
62  Expr* expr;
63  RETURN_IF_ERROR(Expr::CreateExprTree(pool, eq_join_conjuncts[i].left, &expr));
64  probe_exprs_.push_back(expr);
65  RETURN_IF_ERROR(Expr::CreateExprTree(pool, eq_join_conjuncts[i].right, &expr));
66  build_exprs_.push_back(expr);
67  }
69  Expr::CreateExprTrees(pool, tnode.hash_join_node.other_join_conjuncts,
70  &other_join_conjuncts_));
71  return Status::OK;
72 }
73 
74 HashJoinNode::~HashJoinNode() {
75  // probe_batch_ must be cleaned up in Close() to ensure proper resource freeing.
76  DCHECK(probe_batch_ == NULL);
77 }
78 
81 
82  build_pool_.reset(new MemPool(state->mem_limits())),
83  build_timer_ =
84  ADD_TIMER(runtime_profile(), "BuildTime");
85  probe_timer_ =
86  ADD_TIMER(runtime_profile(), "ProbeTime");
88  ADD_COUNTER(runtime_profile(), "BuildRows", TCounterType::UNIT);
90  ADD_COUNTER(runtime_profile(), "BuildBuckets", TCounterType::UNIT);
92  ADD_COUNTER(runtime_profile(), "ProbeRows", TCounterType::UNIT);
94  ADD_COUNTER(runtime_profile(), "LoadFactor", TCounterType::DOUBLE_VALUE);
95 
96  // build and probe exprs are evaluated in the context of the rows produced by our
97  // right and left children, respectively
98  RETURN_IF_ERROR(Expr::Prepare(build_exprs_, state, child(1)->row_desc(), false));
99  RETURN_IF_ERROR(Expr::Prepare(probe_exprs_, state, child(0)->row_desc(), false));
100 
101  // other_join_conjuncts_ are evaluated in the context of the rows produced by this node
102  RETURN_IF_ERROR(Expr::Prepare(other_join_conjuncts_, state, row_descriptor_, false));
103 
104  result_tuple_row_size_ = row_descriptor_.tuple_descriptors().size() * sizeof(Tuple*);
105 
106  // pre-compute the tuple index of build tuples in the output row
107  build_tuple_size_ = child(1)->row_desc().tuple_descriptors().size();
108  build_tuple_idx_.reserve(build_tuple_size_);
109  for (int i = 0; i < build_tuple_size_; ++i) {
110  TupleDescriptor* build_tuple_desc = child(1)->row_desc().tuple_descriptors()[i];
111  build_tuple_idx_.push_back(row_descriptor_.GetTupleIdx(build_tuple_desc->id()));
112  }
113 
114  // TODO: default buckets
115  hash_tbl_.reset(new HashTable(build_exprs_, probe_exprs_, build_tuple_size_,
116  false, id(), *state->mem_limits()));
117 
118  probe_batch_.reset(
119  new RowBatch(row_descriptor_, state->batch_size(), *state->mem_limits()));
120 
121  LlvmCodeGen* codegen = state->llvm_codegen();
122  if (codegen != NULL) {
123  // Codegen for hashing rows
124  Function* hash_fn = hash_tbl_->CodegenHashCurrentRow(codegen);
125  if (hash_fn == NULL) return Status::OK;
126 
127  // Codegen for build path
129 
130  // Codegen for probe path (only for left joins)
131  if (!match_all_build_) {
132  codegen_process_probe_batch_fn_ = CodegenProcessProbeBatch(codegen, hash_fn);
133  }
134  }
135  return Status::OK;
136 }
137 
139  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::CLOSE, state));
140  // Must reset probe_batch_ in Close() to release resources
141  probe_batch_.reset(NULL);
142  if (memory_used_counter_ != NULL && hash_tbl_.get() != NULL) {
143  COUNTER_UPDATE(memory_used_counter_, build_pool_->peak_allocated_bytes());
144  COUNTER_UPDATE(memory_used_counter_, hash_tbl_->byte_size());
145  }
146  return ExecNode::Close(state);
147 }
148 
149 void HashJoinNode::BuildSideThread(RuntimeState* state, promise<Status>* status) {
150  status->set_value(ConstructHashTable(state));
151  // Release the thread token as soon as possible (before the main thread joins
152  // on it). This way, if we had a chain of 10 joins using 1 additional thread,
153  // we'd keep the additional thread busy the whole time.
154  state->resource_pool()->ReleaseThreadToken(false);
155 }
156 
157 Status HashJoinNode::ConstructHashTable(RuntimeState* state) {
158  // Do a full scan of child(1) and store everything in hash_tbl_
159  // The hash join node needs to keep in memory all build tuples, including the tuple
160  // row ptrs. The row ptrs are copied into the hash table's internal structure so they
161  // don't need to be stored in the build_pool_.
162  RowBatch build_batch(child(1)->row_desc(), state->batch_size(), *state->mem_limits());
163  RETURN_IF_ERROR(child(1)->Open(state));
164  while (true) {
165  RETURN_IF_CANCELLED(state);
166  bool eos;
167  RETURN_IF_ERROR(child(1)->GetNext(state, &build_batch, &eos));
169  // take ownership of tuple data of build_batch
170  build_pool_->AcquireData(build_batch.tuple_data_pool(), false);
171  RETURN_IF_LIMIT_EXCEEDED(state);
172 
173  // Call codegen version if possible
174  if (process_build_batch_fn_ == NULL) {
175  ProcessBuildBatch(&build_batch);
176  } else {
177  process_build_batch_fn_(this, &build_batch);
178  }
179  VLOG_ROW << hash_tbl_->DebugString(true, &child(1)->row_desc());
180 
184  build_batch.Reset();
185  if (eos) break;
186  }
187  return Status::OK;
188 }
189 
191  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::OPEN, state));
192  SCOPED_TIMER(runtime_profile_->total_time_counter());
193  RETURN_IF_CANCELLED(state);
194 
195  if (codegen_process_build_batch_fn_ != NULL) {
196  void* jitted_process_build_batch =
197  state->llvm_codegen()->JitFunction(codegen_process_build_batch_fn_);
198  DCHECK(jitted_process_build_batch != NULL);
200  reinterpret_cast<ProcessBuildBatchFn>(jitted_process_build_batch);
201  AddRuntimeExecOption("Build Side Codegen Enabled");
202  }
203 
204  if (codegen_process_probe_batch_fn_ != NULL) {
205  void* jitted_process_probe_batch =
206  state->llvm_codegen()->JitFunction(codegen_process_probe_batch_fn_);
207  DCHECK(jitted_process_probe_batch != NULL);
209  reinterpret_cast<ProcessProbeBatchFn>(jitted_process_probe_batch);
210  AddRuntimeExecOption("Probe Side Codegen Enabled");
211  }
212 
213  eos_ = false;
214 
215  // TODO: fix problems with asynchronous cancellation
216  // Kick-off the construction of the build-side table in a separate
217  // thread, so that the left child can do any initialisation in parallel.
218  // Only do this if we can get a thread token. Otherwise, do this in the
219  // main thread
220  promise<Status> thread_status;
221 <<<<<<< HEAD
222  if (state->resource_pool()->TryAcquireThreadToken()) {
223  AddRuntimeExecOption("Hash Table Built Asynchronously");
224  thread(bind(&HashJoinNode::BuildSideThread, this, state, &thread_status));
225  } else {
226  thread_status.set_value(ConstructHashTable(state));
227  }
228 =======
229  Thread build_thread("hash-join-node", "build thread",
230  &HashJoinNode::BuildSideThread, this, state, &thread_status);
231 >>>>>>> 48bb373... Add ImpalaThread to track thread usage
232 
233  // Open the probe-side child so that it may perform any initialisation in parallel.
234  // Don't exit even if we see an error, we still need to wait for the build thread
235  // to finish.
236  Status open_status = child(0)->Open(state);
237 
238  // Blocks until ConstructHashTable has returned, after which
239  // the hash table is fully constructed and we can start the probe
240  // phase.
241  RETURN_IF_ERROR(thread_status.get_future().get());
242 
243  VLOG_ROW << hash_tbl_->DebugString(true, &child(1)->row_desc());
244  RETURN_IF_ERROR(open_status);
245 
246  // seed probe batch and current_probe_row_, etc.
247  while (true) {
248  RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_eos_));
249  COUNTER_UPDATE(probe_row_counter_, probe_batch_->num_rows());
250  probe_batch_pos_ = 0;
251  if (probe_batch_->num_rows() == 0) {
252  if (probe_eos_) {
253  hash_tbl_iterator_ = hash_tbl_->Begin();
254  eos_ = true;
255  break;
256  }
257  probe_batch_->Reset();
258  continue;
259  } else {
260  current_probe_row_ = probe_batch_->GetRow(probe_batch_pos_++);
261  VLOG_ROW << "probe row: " << GetProbeRowOutputString(current_probe_row_);
262  matched_probe_ = false;
264  break;
265  }
266  }
267 
268  return Status::OK;
269 }
270 
271 Status HashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch, bool* eos) {
272  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
273  RETURN_IF_CANCELLED(state);
274  SCOPED_TIMER(runtime_profile_->total_time_counter());
275  if (ReachedLimit()) {
276  *eos = true;
277  return Status::OK;
278  }
279 
280  // These cases are simpler and use a more efficient processing loop
281  if (!match_all_build_) {
282  if (eos_) {
283  *eos = true;
284  return Status::OK;
285  }
286  return LeftJoinGetNext(state, out_batch, eos);
287  }
288 
289  Expr* const* other_conjuncts = &other_join_conjuncts_[0];
290  int num_other_conjuncts = other_join_conjuncts_.size();
291 
292  Expr* const* conjuncts = &conjuncts_[0];
293  int num_conjuncts = conjuncts_.size();
294 
295  // Explicitly manage the timer counter to avoid measuring time in the child
296  // GetNext call.
298 
299  while (!eos_) {
300  // create output rows as long as:
301  // 1) we haven't already created an output row for the probe row and are doing
302  // a semi-join;
303  // 2) there are more matching build rows
304  while (hash_tbl_iterator_.HasNext()) {
305  TupleRow* matched_build_row = hash_tbl_iterator_.GetRow();
306  hash_tbl_iterator_.Next<true>();
307 
308  int row_idx = out_batch->AddRow();
309  TupleRow* out_row = out_batch->GetRow(row_idx);
310  CreateOutputRow(out_row, current_probe_row_, matched_build_row);
311  if (!EvalConjuncts(other_conjuncts, num_other_conjuncts, out_row)) continue;
312  // we have a match for the purpose of the (outer?) join as soon as we
313  // satisfy the JOIN clause conjuncts
314  matched_probe_ = true;
315  if (match_all_build_) {
316  // remember that we matched this build row
317  joined_build_rows_.insert(matched_build_row);
318  VLOG_ROW << "joined build row: " << matched_build_row;
319  }
320  if (EvalConjuncts(conjuncts, num_conjuncts, out_row)) {
321  out_batch->CommitLastRow();
322  VLOG_ROW << "match row: " << PrintRow(out_row, row_desc());
325  if (out_batch->IsFull() || ReachedLimit()) {
326  *eos = ReachedLimit();
327  return Status::OK;
328  }
329  }
330  }
331 
332  // check whether we need to output the current probe row before
333  // getting a new probe batch
335  int row_idx = out_batch->AddRow();
336  TupleRow* out_row = out_batch->GetRow(row_idx);
337  CreateOutputRow(out_row, current_probe_row_, NULL);
338  if (EvalConjuncts(conjuncts, num_conjuncts, out_row)) {
339  out_batch->CommitLastRow();
340  VLOG_ROW << "match row: " << PrintRow(out_row, row_desc());
343  matched_probe_ = true;
344  if (out_batch->IsFull() || ReachedLimit()) {
345  *eos = ReachedLimit();
346  return Status::OK;
347  }
348  }
349  }
350 
351  if (probe_batch_pos_ == probe_batch_->num_rows()) {
352  // pass on resources, out_batch might still need them
353  probe_batch_->TransferResourceOwnership(out_batch);
354  probe_batch_pos_ = 0;
355  if (out_batch->IsFull() || out_batch->AtResourceLimit()) return Status::OK;
356  // get new probe batch
357  if (!probe_eos_) {
358  while (true) {
359  probe_timer.Stop();
360  RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_eos_));
361  probe_timer.Start();
362  if (probe_batch_->num_rows() == 0) {
363  // Empty batches can still contain IO buffers, which need to be passed up to
364  // the caller; transferring resources can fill up out_batch.
365  probe_batch_->TransferResourceOwnership(out_batch);
366  if (probe_eos_) {
367  eos_ = true;
368  break;
369  }
370  if (out_batch->IsFull() || out_batch->AtResourceLimit()) return Status::OK;
371  continue;
372  } else {
373  COUNTER_UPDATE(probe_row_counter_, probe_batch_->num_rows());
374  break;
375  }
376  }
377  } else {
378  eos_ = true;
379  }
380  // finish up right outer join
381  if (eos_ && match_all_build_) {
382  hash_tbl_iterator_ = hash_tbl_->Begin();
383  }
384  }
385 
386  if (eos_) break;
387 
388  // join remaining rows in probe batch_
389  current_probe_row_ = probe_batch_->GetRow(probe_batch_pos_++);
390  VLOG_ROW << "probe row: " << GetProbeRowOutputString(current_probe_row_);
391  matched_probe_ = false;
393  }
394 
395  *eos = true;
396  if (match_all_build_) {
397  // output remaining unmatched build rows
398  TupleRow* build_row = NULL;
399  while (!out_batch->IsFull() && hash_tbl_iterator_.HasNext()) {
400  build_row = hash_tbl_iterator_.GetRow();
401  hash_tbl_iterator_.Next<false>();
402  if (joined_build_rows_.find(build_row) != joined_build_rows_.end()) {
403  continue;
404  }
405  int row_idx = out_batch->AddRow();
406  TupleRow* out_row = out_batch->GetRow(row_idx);
407  CreateOutputRow(out_row, NULL, build_row);
408  if (EvalConjuncts(conjuncts, num_conjuncts, out_row)) {
409  out_batch->CommitLastRow();
410  VLOG_ROW << "match row: " << PrintRow(out_row, row_desc());
413  if (ReachedLimit()) {
414  *eos = true;
415  return Status::OK;
416  }
417  }
418  }
419  // we're done if there are no more rows left to check
420  *eos = !hash_tbl_iterator_.HasNext();
421  }
422  return Status::OK;
423 }
424 
426  RowBatch* out_batch, bool* eos) {
427  *eos = eos_;
428 
430  while (!eos_) {
431  // Compute max rows that should be added to out_batch
432  int64_t max_added_rows = out_batch->capacity() - out_batch->num_rows();
433  if (limit() != -1) max_added_rows = min(max_added_rows, limit() - rows_returned());
434 
435  // Continue processing this row batch
436  if (process_probe_batch_fn_ == NULL) {
438  ProcessProbeBatch(out_batch, probe_batch_.get(), max_added_rows);
440  } else {
441  // Use codegen'd function
443  process_probe_batch_fn_(this, out_batch, probe_batch_.get(), max_added_rows);
445  }
446 
447  if (ReachedLimit() || out_batch->IsFull()) {
448  *eos = ReachedLimit();
449  break;
450  }
451 
452  // Check to see if we're done processing the current probe batch
453  if (!hash_tbl_iterator_.HasNext() && probe_batch_pos_ == probe_batch_->num_rows()) {
454  probe_batch_->TransferResourceOwnership(out_batch);
455  probe_batch_pos_ = 0;
456  if (out_batch->IsFull() || out_batch->AtResourceLimit()) break;
457  if (probe_eos_) {
458  *eos = eos_ = true;
459  break;
460  } else {
461  probe_timer.Stop();
462  RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_eos_));
463  probe_timer.Start();
464  COUNTER_UPDATE(probe_row_counter_, probe_batch_->num_rows());
465  }
466  }
467  }
468 
469  return Status::OK;
470 }
471 
472 string HashJoinNode::GetProbeRowOutputString(TupleRow* probe_row) {
473  stringstream out;
474  out << "[";
475  int* build_tuple_idx_ptr_ = &build_tuple_idx_[0];
476  for (int i = 0; i < row_desc().tuple_descriptors().size(); ++i) {
477  if (i != 0) out << " ";
478 
479  int* is_build_tuple =
480  ::find(build_tuple_idx_ptr_, build_tuple_idx_ptr_ + build_tuple_size_, i);
481 
482  if (is_build_tuple != build_tuple_idx_ptr_ + build_tuple_size_) {
483  out << PrintTuple(NULL, *row_desc().tuple_descriptors()[i]);
484  } else {
485  out << PrintTuple(probe_row->GetTuple(i), *row_desc().tuple_descriptors()[i]);
486  }
487  }
488  out << "]";
489  return out.str();
490 }
491 
492 void HashJoinNode::DebugString(int indentation_level, stringstream* out) const {
493  *out << string(indentation_level * 2, ' ');
494  *out << "HashJoin(eos=" << (eos_ ? "true" : "false")
495  << " probe_batch_pos=" << probe_batch_pos_
496  << " hash_tbl=";
497  *out << string(indentation_level * 2, ' ');
498  *out << "HashTbl("
499  << " build_exprs=" << Expr::DebugString(build_exprs_)
500  << " probe_exprs=" << Expr::DebugString(probe_exprs_);
501  *out << ")";
502  ExecNode::DebugString(indentation_level, out);
503  *out << ")";
504 }
505 
506 // This function is replaced by codegen
507 void HashJoinNode::CreateOutputRow(TupleRow* out, TupleRow* probe, TupleRow* build) {
508  if (probe == NULL) {
509  memset(out, 0, result_tuple_row_size_);
510  } else {
511  memcpy(out, probe, result_tuple_row_size_);
512  }
513 
514  if (build != NULL) {
515  for (int i = 0; i < build_tuple_size_; ++i) {
516  out->SetTuple(build_tuple_idx_[i], build->GetTuple(i));
517  }
518  } else {
519  for (int i = 0; i < build_tuple_size_; ++i) {
520  out->SetTuple(build_tuple_idx_[i], NULL);
521  }
522  }
523 }
524 
525 // This codegen'd function should only be used for left join cases so it assumes that
526 // the probe row is non-null. For a left outer join, the IR looks like:
527 // define void @CreateOutputRow(%"class.impala::TupleRow"* %out_arg,
528 // %"class.impala::TupleRow"* %probe_arg,
529 // %"class.impala::TupleRow"* %build_arg) {
530 // entry:
531 // %out = bitcast %"class.impala::TupleRow"* %out_arg to i8**
532 // %probe = bitcast %"class.impala::TupleRow"* %probe_arg to i8**
533 // %build = bitcast %"class.impala::TupleRow"* %build_arg to i8**
534 // %0 = bitcast i8** %out to i8*
535 // %1 = bitcast i8** %probe to i8*
536 // call void @llvm.memcpy.p0i8.p0i8.i32(i8* %0, i8* %1, i32 16, i32 16, i1 false)
537 // %is_build_null = icmp eq i8** %build, null
538 // br i1 %is_build_null, label %build_null, label %build_not_null
539 //
540 // build_not_null: ; preds = %entry
541 // %dst_tuple_ptr1 = getelementptr i8** %out, i32 1
542 // %src_tuple_ptr = getelementptr i8** %build, i32 0
543 // %2 = load i8** %src_tuple_ptr
544 // store i8* %2, i8** %dst_tuple_ptr1
545 // ret void
546 //
547 // build_null: ; preds = %entry
548 // %dst_tuple_ptr = getelementptr i8** %out, i32 1
549 // store i8* null, i8** %dst_tuple_ptr
550 // ret void
551 // }
553  Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
554  DCHECK(tuple_row_type != NULL);
555  PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
556 
557  Type* this_type = codegen->GetType(HashJoinNode::LLVM_CLASS_NAME);
558  DCHECK(this_type != NULL);
559  PointerType* this_ptr_type = PointerType::get(this_type, 0);
560 
561  // TupleRows are really just an array of pointers. Easier to work with them
562  // this way.
563  PointerType* tuple_row_working_type = PointerType::get(codegen->ptr_type(), 0);
564 
565  // Construct function signature to match CreateOutputRow()
566  LlvmCodeGen::FnPrototype prototype(codegen, "CreateOutputRow", codegen->void_type());
567  prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
568  prototype.AddArgument(LlvmCodeGen::NamedVariable("out_arg", tuple_row_ptr_type));
569  prototype.AddArgument(LlvmCodeGen::NamedVariable("probe_arg", tuple_row_ptr_type));
570  prototype.AddArgument(LlvmCodeGen::NamedVariable("build_arg", tuple_row_ptr_type));
571 
572  LLVMContext& context = codegen->context();
573  LlvmCodeGen::LlvmBuilder builder(context);
574  Value* args[4];
575  Function* fn = prototype.GeneratePrototype(&builder, args);
576  Value* out_row_arg = builder.CreateBitCast(args[1], tuple_row_working_type, "out");
577  Value* probe_row_arg = builder.CreateBitCast(args[2], tuple_row_working_type, "probe");
578  Value* build_row_arg = builder.CreateBitCast(args[3], tuple_row_working_type, "build");
579 
580  // Copy probe row
581  codegen->CodegenMemcpy(&builder, out_row_arg, probe_row_arg, result_tuple_row_size_);
582 
583  // Copy build row.
584  BasicBlock* build_not_null_block = BasicBlock::Create(context, "build_not_null", fn);
585  BasicBlock* build_null_block = NULL;
586 
587  if (match_all_probe_) {
588  // build tuple can be null
589  build_null_block = BasicBlock::Create(context, "build_null", fn);
590  Value* is_build_null = builder.CreateIsNull(build_row_arg, "is_build_null");
591  builder.CreateCondBr(is_build_null, build_null_block, build_not_null_block);
592 
593  // Set tuple build ptrs to NULL
594  builder.SetInsertPoint(build_null_block);
595  for (int i = 0; i < build_tuple_size_; ++i) {
596  Value* array_idx[] = { codegen->GetIntConstant(TYPE_INT, build_tuple_idx_[i]) };
597  Value* dst = builder.CreateGEP(out_row_arg, array_idx, "dst_tuple_ptr");
598  builder.CreateStore(codegen->null_ptr_value(), dst);
599  }
600  builder.CreateRetVoid();
601  } else {
602  // build row can't be NULL
603  builder.CreateBr(build_not_null_block);
604  }
605 
606  // Copy build tuple ptrs
607  builder.SetInsertPoint(build_not_null_block);
608  for (int i = 0; i < build_tuple_size_; ++i) {
609  Value* dst_idx[] = { codegen->GetIntConstant(TYPE_INT, build_tuple_idx_[i]) };
610  Value* src_idx[] = { codegen->GetIntConstant(TYPE_INT, i) };
611  Value* dst = builder.CreateGEP(out_row_arg, dst_idx, "dst_tuple_ptr");
612  Value* src = builder.CreateGEP(build_row_arg, src_idx, "src_tuple_ptr");
613  builder.CreateStore(builder.CreateLoad(src), dst);
614  }
615  builder.CreateRetVoid();
616 
617  return codegen->FinalizeFunction(fn);
618 }
619 
621  Function* hash_fn) {
622  // Get cross compiled function
623  Function* process_build_batch_fn = codegen->GetFunction(
624  IRFunction::HASH_JOIN_PROCESS_BUILD_BATCH);
625  DCHECK(process_build_batch_fn != NULL);
626 
627  // Codegen for evaluating build rows
628  Function* eval_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, true);
629  if (eval_row_fn == NULL) return NULL;
630 
631  int replaced = 0;
632  // Replace call sites
633  process_build_batch_fn = codegen->ReplaceCallSites(process_build_batch_fn, false,
634  eval_row_fn, "EvalBuildRow", &replaced);
635  DCHECK_EQ(replaced, 1);
636 
637  process_build_batch_fn = codegen->ReplaceCallSites(process_build_batch_fn, false,
638  hash_fn, "HashCurrentRow", &replaced);
639  DCHECK_EQ(replaced, 1);
640 
641  return codegen->OptimizeFunctionWithExprs(process_build_batch_fn);
642 }
643 
645  Function* hash_fn) {
646  // Get cross compiled function
647  Function* process_probe_batch_fn = codegen->GetFunction(
648  IRFunction::HASH_JOIN_PROCESS_PROBE_BATCH);
649  DCHECK(process_probe_batch_fn != NULL);
650 
651  // Codegen HashTable::Equals
652  Function* equals_fn = hash_tbl_->CodegenEquals(codegen);
653  if (equals_fn == NULL) return NULL;
654 
655  // Codegen for evaluating build rows
656  Function* eval_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, false);
657  if (eval_row_fn == NULL) return NULL;
658 
659  // Codegen CreateOutputRow
660  Function* create_output_row_fn = CodegenCreateOutputRow(codegen);
661  if (create_output_row_fn == NULL) return NULL;
662 
663  // Codegen evaluating other join conjuncts
664  Function* join_conjuncts_fn = CodegenEvalConjuncts(codegen, other_join_conjuncts_);
665  if (join_conjuncts_fn == NULL) return NULL;
666 
667  // Codegen evaluating conjuncts
668  Function* conjuncts_fn = CodegenEvalConjuncts(codegen, conjuncts_);
669  if (conjuncts_fn == NULL) return NULL;
670 
671  // Replace all call sites with codegen version
672  int replaced = 0;
673  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
674  hash_fn, "HashCurrentRow", &replaced);
675  DCHECK_EQ(replaced, 1);
676 
677  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
678  eval_row_fn, "EvalProbeRow", &replaced);
679  DCHECK_EQ(replaced, 1);
680 
681  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
682  create_output_row_fn, "CreateOutputRow", &replaced);
683  DCHECK_EQ(replaced, 2);
684 
685  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
686  conjuncts_fn, "EvalConjuncts", &replaced);
687  DCHECK_EQ(replaced, 2);
688 
689  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
690  join_conjuncts_fn, "EvalOtherJoinConjuncts", &replaced);
691  DCHECK_EQ(replaced, 1);
692 
693  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, false,
694  equals_fn, "Equals", &replaced);
695  DCHECK_EQ(replaced, 2);
696 
697  return codegen->OptimizeFunctionWithExprs(process_probe_batch_fn);
698 }
void AddRuntimeExecOption(const std::string &option)
Appends option to 'runtime_exec_options_'.
Definition: exec-node.cc:188
RuntimeProfile::Counter * hash_tbl_load_factor_counter_
int num_rows() const
Definition: row-batch.h:215
int64_t num_rows_returned_
Definition: exec-node.h:223
static const char * LLVM_CLASS_NAME
Tuple * GetTuple(int tuple_idx)
Definition: tuple-row.h:30
OldHashTable::Iterator hash_tbl_iterator_
TODO: Consider allowing fragment IDs as category parameters.
Definition: thread.h:45
bool TryAcquireThreadToken(bool *is_reserved=NULL)
void CreateOutputRow(TupleRow *out_row, TupleRow *probe_row, TupleRow *build_row)
llvm::Function * CodegenProcessProbeBatch(RuntimeState *state, llvm::Function *hash_fn)
Utility struct that wraps a variable name and llvm type.
Definition: llvm-codegen.h:149
boost::scoped_ptr< RuntimeProfile > runtime_profile_
Definition: exec-node.h:225
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
RowDescriptor row_descriptor_
Definition: exec-node.h:215
#define ADD_TIMER(profile, name)
RuntimeProfile::Counter * build_timer_
const RowDescriptor & row_desc() const
Definition: exec-node.h:156
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
Definition: exec-node.cc:378
void(* ProcessBuildBatchFn)(HashJoinNode *, RowBatch *)
bool ReachedLimit()
Definition: exec-node.h:159
llvm::Function * CodegenProcessBuildBatch(RuntimeState *state, llvm::Function *hash_fn)
void BuildSideThread(RuntimeState *state, Promise< Status > *status)
#define SCOPED_TIMER(c)
int(* ProcessProbeBatchFn)(HashJoinNode *, RowBatch *, RowBatch *, int)
HashJoinNode::ProcessProbeBatch() exactly.
llvm::Value * null_ptr_value()
Definition: llvm-codegen.h:382
boost::scoped_ptr< OldHashTable > hash_tbl_
RuntimeProfile::Counter * probe_timer_
LLVM code generator. This is the top level object to generate jitted code.
Definition: llvm-codegen.h:107
boost::scoped_ptr< MemPool > build_pool_
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
Definition: exec-node.cc:345
RuntimeProfile::Counter * probe_row_counter_
static const char * LLVM_CLASS_NAME
Definition: tuple-row.h:76
virtual Status Prepare(RuntimeState *state)
int ProcessProbeBatch(RowBatch *out_batch, RowBatch *probe_batch, int max_added_rows)
virtual Status Init(const TPlanNode &tnode)
void AddArgument(const NamedVariable &var)
Add argument.
Definition: llvm-codegen.h:171
static llvm::Function * CodegenEvalConjuncts(RuntimeState *state, const std::vector< ExprContext * > &conjunct_ctxs, const char *name="EvalConjuncts")
Definition: exec-node.cc:452
void CodegenMemcpy(LlvmBuilder *, llvm::Value *dst, llvm::Value *src, int size)
RuntimeProfile::Counter * build_buckets_counter_
ProcessProbeBatchFn process_probe_batch_fn_
Jitted ProcessProbeBatch function pointer. Null if codegen is disabled.
#define RETURN_IF_CANCELLED(state)
ObjectPool pool
virtual Status Prepare(RuntimeState *state)
Definition: exec-node.cc:130
#define ADD_COUNTER(profile, name, unit)
llvm::Function * GetFunction(IRFunction::Type)
This is the superclass of all expr evaluation nodes.
Definition: expr.h:116
#define VLOG_ROW
Definition: logging.h:59
int GetTupleIdx(TupleId id) const
Returns INVALID_IDX if id not part of this row.
Definition: descriptors.cc:328
void CommitLastRow()
Definition: row-batch.h:109
void ProcessBuildBatch(RowBatch *build_batch)
Construct the build hash table, adding all the rows in 'build_batch'.
virtual Status Open(RuntimeState *state)
llvm::Function * codegen_process_build_batch_fn_
llvm function for build batch
int64_t rows_returned() const
Definition: exec-node.h:157
#define COUNTER_SET(c, v)
int batch_size() const
Definition: runtime-state.h:98
TupleId id() const
Definition: descriptors.h:306
static Status CreateExprTree(ObjectPool *pool, const TExpr &texpr, ExprContext **ctx)
Definition: expr.cc:129
RuntimeProfile::Counter * rows_returned_counter_
Definition: exec-node.h:226
void IR_ALWAYS_INLINE Next()
ExecNode * child(int i)
Definition: exec-node.h:241
llvm::Function * CodegenCreateOutputRow(LlvmCodeGen *codegen)
Codegen function to create output row.
const std::vector< TupleDescriptor * > & tuple_descriptors() const
Return descriptors for all tuples in this row, in order of appearance.
Definition: descriptors.h:412
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
void SetTuple(int tuple_idx, Tuple *tuple)
Definition: tuple-row.h:34
int capacity() const
Definition: row-batch.h:216
static const Status OK
Definition: status.h:87
llvm::Type * GetType(const ColumnType &type)
Returns llvm type for the column type.
llvm::Value * GetIntConstant(PrimitiveType type, int64_t val)
Returns the constant 'val' of 'type'.
static Status CreateExprTrees(ObjectPool *pool, const std::vector< TExpr > &texprs, std::vector< ExprContext * > *ctxs)
Definition: expr.cc:149
RuntimeProfile::Counter * build_row_counter_
llvm::Function * FinalizeFunction(llvm::Function *function)
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
Definition: exec-node.cc:393
virtual void Close(RuntimeState *state)
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
virtual Status Open(RuntimeState *state)
Definition: exec-node.cc:154
boost::scoped_ptr< RowBatch > probe_batch_
llvm::Function * ReplaceCallSites(llvm::Function *caller, bool update_in_place, llvm::Function *new_fn, const std::string &target_name, int *num_replaced)
string PrintRow(TupleRow *row, const RowDescriptor &d)
Definition: debug-util.cc:192
bool ok() const
Definition: status.h:172
string PrintTuple(const Tuple *t, const TupleDescriptor &d)
Definition: debug-util.cc:166
llvm::Type * void_type()
Definition: llvm-codegen.h:394
ThreadResourceMgr::ResourcePool * resource_pool()
virtual void Close(RuntimeState *state)
Definition: exec-node.cc:166
Status LeftJoinGetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
llvm::LLVMContext & context()
Definition: llvm-codegen.h:214
ProcessBuildBatchFn process_build_batch_fn_
virtual std::string DebugString() const
Definition: expr.cc:385
llvm::Function * OptimizeFunctionWithExprs(llvm::Function *fn)
llvm::PointerType * ptr_type()
Definition: llvm-codegen.h:393
int64_t limit() const
Definition: exec-node.h:158
bool match_one_build_
Match at most one build row to each probe row. Used in LEFT_SEMI_JOIN.
RuntimeProfile * runtime_profile()
Definition: exec-node.h:161