Impala
Impala is the open source, native analytic database for Apache Hadoop.
partitioned-hash-join-node.cc
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <sstream>
18 #include <gutil/strings/substitute.h>
19 
20 #include "codegen/llvm-codegen.h"
21 #include "exec/hash-table.inline.h"
22 #include "exprs/expr.h"
23 #include "exprs/expr-context.h"
24 #include "exprs/slot-ref.h"
27 #include "runtime/row-batch.h"
28 #include "runtime/runtime-state.h"
29 #include "util/debug-util.h"
30 #include "util/runtime-profile.h"
31 
32 #include "gen-cpp/PlanNodes_types.h"
33 
34 #include "common/names.h"
35 
36 DEFINE_bool(enable_phj_probe_side_filtering, true,
37  "Enables pushing PHJ build side filters to probe side");
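// This is a standard gflags flag, so it can be toggled at impalad startup (e.g. with
// --enable_phj_probe_side_filtering=false). The constructor below ANDs it with the
// planner-provided add_probe_filters setting for this node.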
38 
39 using namespace impala;
40 using namespace llvm;
41 using namespace strings;
42 
43 PartitionedHashJoinNode::PartitionedHashJoinNode(
44  ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
45  : BlockingJoinNode("PartitionedHashJoinNode", tnode.hash_join_node.join_op,
46  pool, tnode, descs),
47  using_small_buffers_(true),
48  state_(PARTITIONING_BUILD),
49  block_mgr_client_(NULL),
50  partition_build_timer_(NULL),
51  null_aware_eval_timer_(NULL),
52  process_build_batch_fn_(NULL),
53  process_build_batch_fn_level0_(NULL),
54  process_probe_batch_fn_(NULL),
55  process_probe_batch_fn_level0_(NULL),
56  input_partition_(NULL),
57  null_aware_partition_(NULL),
58  non_empty_build_(false),
59  null_probe_rows_(NULL),
60  null_probe_output_idx_(-1) {
61  memset(hash_tbls_, 0, sizeof(hash_tbls_));
62  can_add_probe_filters_ = tnode.hash_join_node.add_probe_filters;
63  can_add_probe_filters_ &= FLAGS_enable_phj_probe_side_filtering;
64 }
65 
66 Status PartitionedHashJoinNode::Init(const TPlanNode& tnode) {
67  RETURN_IF_ERROR(BlockingJoinNode::Init(tnode));
68  DCHECK(tnode.__isset.hash_join_node);
69  const vector<TEqJoinCondition>& eq_join_conjuncts =
70  tnode.hash_join_node.eq_join_conjuncts;
71  for (int i = 0; i < eq_join_conjuncts.size(); ++i) {
72  ExprContext* ctx;
73  RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjuncts[i].left, &ctx));
74  probe_expr_ctxs_.push_back(ctx);
75  RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjuncts[i].right, &ctx));
76  build_expr_ctxs_.push_back(ctx);
77  }
78  RETURN_IF_ERROR(
79  Expr::CreateExprTrees(pool_, tnode.hash_join_node.other_join_conjuncts,
80  &other_join_conjunct_ctxs_));
81 
82  if (join_op_ == TJoinOp::LEFT_SEMI_JOIN || join_op_ == TJoinOp::LEFT_ANTI_JOIN ||
83  join_op_ == TJoinOp::RIGHT_SEMI_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
84  join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
85  DCHECK_EQ(conjunct_ctxs_.size(), 0);
86 
87  if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
88  DCHECK_EQ(eq_join_conjuncts.size(), 1);
89  }
90  }
91  return Status::OK;
92 }
93 
94 Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
95  SCOPED_TIMER(runtime_profile_->total_time_counter());
96 
97  // Create the codegen object before preparing conjunct_ctxs_ and children_, so that any
98  // ScalarFnCalls will use codegen.
99  // TODO: this is brittle and hard to reason about, revisit
100  if (state->codegen_enabled()) {
101  LlvmCodeGen* codegen;
102  RETURN_IF_ERROR(state->GetCodegen(&codegen));
103  }
104 
105  RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
106  runtime_state_ = state;
107 
108  // build and probe exprs are evaluated in the context of the rows produced by our
109  // right and left children, respectively
110  RETURN_IF_ERROR(Expr::Prepare(
111  build_expr_ctxs_, state, child(1)->row_desc(), expr_mem_tracker()));
112  RETURN_IF_ERROR(Expr::Prepare(
113  probe_expr_ctxs_, state, child(0)->row_desc(), expr_mem_tracker()));
114  // Although ConstructBuildSide() may be run in a separate thread, it is safe to free
115  // local allocations in QueryMaintenance() since the build thread is not run
116  // concurrently with other expr evaluation in this join node.
117  AddExprCtxsToFree(build_expr_ctxs_);
118  AddExprCtxsToFree(probe_expr_ctxs_);
119 
120  // other_join_conjunct_ctxs_ are evaluated in the context of rows assembled from all
121  // build and probe tuples; full_row_desc is not necessarily the same as the output row
122  // desc, e.g., because semi joins only return the build xor probe tuples
123  RowDescriptor full_row_desc(child(0)->row_desc(), child(1)->row_desc());
124  RETURN_IF_ERROR(
125  Expr::Prepare(other_join_conjunct_ctxs_, state, full_row_desc, expr_mem_tracker()));
126  AddExprCtxsToFree(other_join_conjunct_ctxs_);
127 
128  RETURN_IF_ERROR(state->block_mgr()->RegisterClient(
129  MinRequiredBuffers(), mem_tracker(), state, &block_mgr_client_));
130 
131  bool should_store_nulls = join_op_ == TJoinOp::RIGHT_OUTER_JOIN ||
132  join_op_ == TJoinOp::RIGHT_ANTI_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN;
133  ht_ctx_.reset(new HashTableCtx(build_expr_ctxs_, probe_expr_ctxs_,
134  should_store_nulls, false, state->fragment_hash_seed(), MAX_PARTITION_DEPTH,
135  child(1)->row_desc().tuple_descriptors().size()));
136 
137  if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
138  // Since there is only one such NAAJ stream, we don't worry about the memory consumed
139  // and always use IO-sized buffers.
140  null_aware_partition_ = pool_->Add(new Partition(state, this, 0, false));
143 
144  null_probe_rows_ = new BufferedTupleStream(
145  state, child(0)->row_desc(), state->block_mgr(), block_mgr_client_,
146  false /* use small buffers */, false /* delete on read */ ));
147  RETURN_IF_ERROR(null_probe_rows_->Init(runtime_profile(), false));
148  null_aware_eval_timer_ = ADD_TIMER(runtime_profile(), "NullAwareAntiJoinEvalTime");
149  }
150 
151  partition_build_timer_ = ADD_TIMER(runtime_profile(), "BuildPartitionTime");
152  num_hash_buckets_ =
153  ADD_COUNTER(runtime_profile(), "HashBuckets", TUnit::UNIT);
154  partitions_created_ =
155  ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT);
156  max_partition_level_ = runtime_profile()->AddHighWaterMarkCounter(
157  "MaxPartitionLevel", TUnit::UNIT);
158  num_build_rows_partitioned_ =
159  ADD_COUNTER(runtime_profile(), "BuildRowsPartitioned", TUnit::UNIT);
160  num_probe_rows_partitioned_ =
161  ADD_COUNTER(runtime_profile(), "ProbeRowsPartitioned", TUnit::UNIT);
162  num_repartitions_ =
163  ADD_COUNTER(runtime_profile(), "NumRepartitions", TUnit::UNIT);
164  num_spilled_partitions_ =
165  ADD_COUNTER(runtime_profile(), "SpilledPartitions", TUnit::UNIT);
166  largest_partition_percent_ = runtime_profile()->AddHighWaterMarkCounter(
167  "LargestPartitionPercent", TUnit::UNIT);
168 
169  if (state->codegen_enabled()) {
170  // Codegen for hashing rows
171  Function* hash_fn = ht_ctx_->CodegenHashCurrentRow(state, false);
172  Function* murmur_hash_fn = ht_ctx_->CodegenHashCurrentRow(state, true);
173  if (hash_fn != NULL && murmur_hash_fn != NULL) {
174  // Codegen for build path
175  if (CodegenProcessBuildBatch(state, hash_fn, murmur_hash_fn)) {
176  AddRuntimeExecOption("Build Side Codegen Enabled");
177  }
178  // Codegen for probe path
179  if (CodegenProcessProbeBatch(state, hash_fn, murmur_hash_fn)) {
180  AddRuntimeExecOption("Probe Side Codegen Enabled");
181  }
182  }
183  }
184  return Status::OK;
185 }
186 
187 Status PartitionedHashJoinNode::Reset(RuntimeState* state) {
188  DCHECK(false) << "NYI";
189  return Status("NYI");
190 }
191 
192 void PartitionedHashJoinNode::Close(RuntimeState* state) {
193  if (is_closed()) return;
194  if (ht_ctx_.get() != NULL) ht_ctx_->Close();
195  for (int i = 0; i < hash_partitions_.size(); ++i) {
196  hash_partitions_[i]->Close(NULL);
197  }
198  for (list<Partition*>::iterator it = spilled_partitions_.begin();
199  it != spilled_partitions_.end(); ++it) {
200  (*it)->Close(NULL);
201  }
202  for (list<Partition*>::iterator it = output_build_partitions_.begin();
203  it != output_build_partitions_.end(); ++it) {
204  (*it)->Close(NULL);
205  }
206 
207  if (input_partition_ != NULL) input_partition_->Close(NULL);
208  if (null_aware_partition_ != NULL) null_aware_partition_->Close(NULL);
209  if (null_probe_rows_ != NULL) null_probe_rows_->Close();
210  nulls_build_batch_.reset();
211 
212  if (block_mgr_client_ != NULL) {
214  }
215  Expr::Close(build_expr_ctxs_, state);
216  Expr::Close(probe_expr_ctxs_, state);
217  Expr::Close(other_join_conjunct_ctxs_, state);
218  BlockingJoinNode::Close(state);
219 }
220 
221 PartitionedHashJoinNode::Partition::Partition(RuntimeState* state,
222  PartitionedHashJoinNode* parent, int level, bool use_small_buffers)
223  : parent_(parent),
224  is_closed_(false),
225  is_spilled_(false),
226  level_(level),
227  build_rows_(state->obj_pool()->Add(new BufferedTupleStream(
228  state, parent_->child(1)->row_desc(), state->block_mgr(),
229  parent_->block_mgr_client_, use_small_buffers))),
230  probe_rows_(state->obj_pool()->Add(new BufferedTupleStream(
231  state, parent_->child(0)->row_desc(),
232  state->block_mgr(), parent_->block_mgr_client_, use_small_buffers))) {
233 }
234 
235 PartitionedHashJoinNode::Partition::~Partition() {
236  DCHECK(is_closed());
237 }
238 
239 int64_t PartitionedHashJoinNode::Partition::EstimatedInMemSize() const {
240  return build_rows_->byte_size() + HashTable::EstimateSize(build_rows_->num_rows());
241 }
242 
243 int64_t PartitionedHashJoinNode::Partition::InMemSize() const {
244  DCHECK(hash_tbl_.get() != NULL);
245  return build_rows_->byte_size() + hash_tbl_->byte_size();
246 }
247 
248 void PartitionedHashJoinNode::Partition::Close(RowBatch* batch) {
249  if (is_closed()) return;
250  is_closed_ = true;
251  if (hash_tbl_.get() != NULL) hash_tbl_->Close();
252 
253  // Transfer ownership of build_rows_/probe_rows_ to batch if batch is not NULL.
254  // Otherwise, close the stream here.
255  if (build_rows_ != NULL) {
256  if (batch == NULL) {
257  build_rows_->Close();
258  } else {
259  batch->AddTupleStream(build_rows_);
260  }
261  build_rows_ = NULL;
262  }
263  if (probe_rows_ != NULL) {
264  if (batch == NULL) {
265  probe_rows_->Close();
266  } else {
267  batch->AddTupleStream(probe_rows_);
268  }
269  probe_rows_ = NULL;
270  }
271 }
272 
273 Status PartitionedHashJoinNode::Partition::Spill(bool unpin_all_build) {
274  if (!is_spilled_) {
275  COUNTER_ADD(parent_->num_spilled_partitions_, 1);
276  if (parent_->num_spilled_partitions_->value() == 1) {
277  parent_->AddRuntimeExecOption("Spilled");
278  }
279  }
280  is_spilled_ = true;
281  if (hash_tbl() != NULL) {
282  hash_tbl()->Close();
283  hash_tbl_.reset();
284  }
285  // If it is still using small buffers, then there is nothing else to do, as the small
286  // buffers are always kept in memory.
287  if (build_rows()->using_small_buffers()) return Status::OK;
288  return build_rows()->UnpinStream(unpin_all_build);
289 }
290 
291 Status PartitionedHashJoinNode::Partition::BuildHashTable(RuntimeState* state,
292  bool* built, const bool add_probe_filters) {
293  if (add_probe_filters) {
294  return BuildHashTableInternal<true>(state, built);
295  } else {
296  return BuildHashTableInternal<false>(state, built);
297  }
298 }
299 
300 template<bool const AddProbeFilters>
301 Status PartitionedHashJoinNode::Partition::BuildHashTableInternal(
302  RuntimeState* state, bool* built) {
303  DCHECK_NOTNULL(build_rows_);
304  *built = false;
305 
306  // TODO: estimate the entire size of the hash table and reserve all of it from
307  // the block mgr.
308 
309  // We got the buffers we think we will need, try to build the hash table.
310  RETURN_IF_ERROR(build_rows_->PinStream(false, built));
311  if (!*built) return Status::OK;
312  RETURN_IF_ERROR(build_rows_->PrepareForRead());
313 
314  bool eos = false;
315  RowBatch batch(parent_->child(1)->row_desc(), state->batch_size(),
316  parent_->mem_tracker());
317  HashTableCtx* ctx = parent_->ht_ctx_.get();
318  // TODO: move the batch and indices as members to avoid reallocating.
319  vector<BufferedTupleStream::RowIdx> indices;
320  uint32_t seed0 = ctx->seed(0);
321 
322  // Allocate the partition-local hash table. Initialize the number of buckets based on
323  // the number of build rows (the number of rows is known at this point). This assumes
324  // there are no duplicates which can be wrong. However, the upside in the common case
325  // (few/no duplicates) is large and the downside when there are is low (a bit more
326  // memory; the bucket memory is small compared to the memory needed for all the build
327  // side allocations). We always start with small pages in the hash table.
328  int64_t estimated_num_buckets =
329  HashTable::EstimateNumBuckets(build_rows()->num_rows());
330  hash_tbl_.reset(new HashTable(state, parent_->block_mgr_client_,
331  parent_->child(1)->row_desc().tuple_descriptors().size(), build_rows(),
332  1 << (32 - NUM_PARTITIONING_BITS), estimated_num_buckets));
333  if (!hash_tbl_->Init()) goto not_built;
334 
335  if (AddProbeFilters) DCHECK_EQ(level_, 0) << "Should not add filters if repartitioning";
336  while (!eos) {
337  RETURN_IF_ERROR(build_rows_->GetNext(&batch, &eos, &indices));
338  DCHECK_EQ(batch.num_rows(), indices.size());
339  int num_rows = batch.num_rows();
340  DCHECK_LE(num_rows, hash_tbl_->EmptyBuckets());
341  SCOPED_TIMER(parent_->build_timer_);
342  for (int i = 0; i < num_rows; ++i) {
343  TupleRow* row = batch.GetRow(i);
344  uint32_t hash = 0;
345  if (!ctx->EvalAndHashBuild(row, &hash)) continue;
346  if (UNLIKELY(!hash_tbl_->Insert(ctx, indices[i], row, hash))) goto not_built;
347  if (AddProbeFilters) {
348  // Update the probe filters for this row.
349  for (int j = 0; j < parent_->probe_filters_.size(); ++j) {
350  if (parent_->probe_filters_[j].second == NULL) continue;
351  void* e = parent_->build_expr_ctxs_[j]->GetValue(row);
352  uint32_t h = RawValue::GetHashValue(e,
353  parent_->build_expr_ctxs_[j]->root()->type(), seed0);
354  parent_->probe_filters_[j].second->Set<true>(h, true);
355  }
356  }
357  }
358  batch.Reset();
359  }
360  // The hash table fits in memory and is built.
361  DCHECK(*built);
362  DCHECK_NOTNULL(hash_tbl_.get());
363  is_spilled_ = false;
364  COUNTER_ADD(parent_->num_hash_buckets_, hash_tbl_->num_buckets());
365  return Status::OK;
366 
367 not_built:
368  *built = false;
369  if (hash_tbl_.get() != NULL) {
370  hash_tbl_->Close();
371  hash_tbl_.reset();
372  }
373  if (parent_->can_add_probe_filters_) {
374  // Disabling probe filter push down because not all rows will be included in the
375  // probe filter due to a spilled partition.
376  parent_->can_add_probe_filters_ = false;
377  VLOG(2) << "Disabling probe filter push down because a partition will spill.";
378  }
379  return Status::OK;
380 }
381 
382 bool PartitionedHashJoinNode::AllocateProbeFilters(RuntimeState* state) {
383  if (!can_add_probe_filters_) return false;
384  DCHECK_NOTNULL(ht_ctx_.get());
385  DCHECK_EQ(build_expr_ctxs_.size(), probe_expr_ctxs_.size());
386  probe_filters_.resize(probe_expr_ctxs_.size());
387  for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
388  if (probe_expr_ctxs_[i]->root()->is_slotref()) {
389  // TODO: Enable probe filters not only for "naked" slotrefs.
390  probe_filters_[i].first =
391  reinterpret_cast<SlotRef*>(probe_expr_ctxs_[i]->root())->slot_id();
392  probe_filters_[i].second = new Bitmap(state->slot_filter_bitmap_size());
393  } else {
394  probe_filters_[i].second = NULL;
395  }
396  }
397  return true;
398 }
399 
400 bool PartitionedHashJoinNode::AttachProbeFilters(RuntimeState* state) {
401  if (can_add_probe_filters_) {
402  // Add all the bitmaps to the runtime state.
403  AddRuntimeExecOption("Build-Side Filter Pushed Down");
404  bool acquired_ownership = false;
405  for (int i = 0; i < probe_filters_.size(); ++i) {
406  if (probe_filters_[i].second == NULL) continue;
407  state->AddBitmapFilter(probe_filters_[i].first, probe_filters_[i].second,
408  &acquired_ownership);
409  VLOG(2) << "Bitmap filter added on slot: " << probe_filters_[i].first;
410  if (!acquired_ownership) {
411  delete probe_filters_[i].second;
412  probe_filters_[i].second = NULL;
413  }
414  }
415  return true;
416  } else {
417  // Make sure there are no memory leaks.
418  for (int i = 0; i < probe_filters_.size(); ++i) {
419  if (probe_filters_[i].second == NULL) continue;
420  delete probe_filters_[i].second;
421  probe_filters_[i].second = NULL;
422  }
423  return false;
424  }
425 }
426 
427 bool PartitionedHashJoinNode::AppendRow(BufferedTupleStream* stream,
428  TupleRow* row) {
429  status_ = stream->status();
430  if (!status_.ok()) return false;
431  if (using_small_buffers_) {
433  if (!status_.ok()) return false;
434  if (stream->AddRow(row)) return true;
435  }
436 
437  // We ran out of memory. Pick a partition to spill.
438  while (true) {
439  Partition* spilled_partition;
440  status_ = SpillPartition(&spilled_partition);
441  if (!status_.ok()) return false;
442  if (stream->AddRow(row)) return true;
443  // Spilling one partition does not guarantee we can append a row now. Keep
444  // spilling until we can append this row.
445  }
446  return false;
447 }
448 
449 // TODO: can we do better with the spilling heuristic.
450 Status PartitionedHashJoinNode::SpillPartition(Partition** spilled_partition) {
451  int64_t max_freed_mem = 0;
452  int partition_idx = -1;
453  *spilled_partition = NULL;
454 
455  // Iterate over the partitions and pick the largest partition to spill.
456  for (int i = 0; i < hash_partitions_.size(); ++i) {
457  if (hash_partitions_[i]->is_closed()) continue;
458  if (hash_partitions_[i]->is_spilled()) continue;
459  int64_t mem = hash_partitions_[i]->build_rows()->bytes_in_mem(false);
460  if (hash_partitions_[i]->hash_tbl() != NULL) {
461  // IMPALA-1488: Do not spill partitions that already had matches, because we
462  // are going to lose information and return wrong results.
463  if (hash_partitions_[i]->hash_tbl()->HasMatches()) continue;
464  mem += hash_partitions_[i]->hash_tbl()->byte_size();
465  }
466  if (mem > max_freed_mem) {
467  max_freed_mem = mem;
468  partition_idx = i;
469  }
470  }
471 
472  if (partition_idx == -1) {
473  // Could not find a partition to spill. This means the mem limit was just too low.
475  }
476 
477  VLOG(2) << "Spilling partition: " << partition_idx << endl << NodeDebugString();
478  RETURN_IF_ERROR(hash_partitions_[partition_idx]->Spill(false));
479  DCHECK(hash_partitions_[partition_idx]->probe_rows()->has_write_block());
480  hash_tbls_[partition_idx] = NULL;
481  *spilled_partition = hash_partitions_[partition_idx];
482  return Status::OK;
483 }
484 
485 Status PartitionedHashJoinNode::ConstructBuildSide(RuntimeState* state) {
486  RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state));
487  RETURN_IF_ERROR(Expr::Open(probe_expr_ctxs_, state));
488  RETURN_IF_ERROR(Expr::Open(other_join_conjunct_ctxs_, state));
489  AllocateProbeFilters(state);
490 
491  // Do a full scan of child(1) and partition the rows.
492  RETURN_IF_ERROR(child(1)->Open(state));
493  RETURN_IF_ERROR(ProcessBuildInput(state, 0));
494 
495  AttachProbeFilters(state);
497  return Status::OK;
498 }
499 
500 Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level) {
501  if (level >= MAX_PARTITION_DEPTH) {
502  Status status = Status::MemLimitExceeded();
503  status.SetErrorMsg(ErrorMsg(
504  TErrorCode::PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH,
505  id_, MAX_PARTITION_DEPTH));
506  state->SetMemLimitExceeded();
507  return status;
508  }
509 
510  DCHECK(hash_partitions_.empty());
511  if (input_partition_ != NULL) {
512  DCHECK_NOTNULL(input_partition_->build_rows());
513  DCHECK_EQ(input_partition_->build_rows()->blocks_pinned(), 0) << NodeDebugString();
514  RETURN_IF_ERROR(input_partition_->build_rows()->PrepareForRead());
515  }
516 
517  for (int i = 0; i < PARTITION_FANOUT; ++i) {
518  hash_partitions_.push_back(pool_->Add(
519  new Partition(state, this, level, using_small_buffers_)));
521 
522  // Initialize a buffer for the probe here to make sure we have it if we need it.
523  // While this is not strictly necessary (there are some cases where we won't need this
524  // buffer), the benefit is low. Not grabbing this buffer means there is an additional
525  // buffer that could be used for the build side. However since this is only one
526  // buffer, there is only a small range of build input sizes where this is beneficial
527  // (an IO buffer size). It makes the logic much more complex to enable this
528  // optimization.
529  RETURN_IF_ERROR(hash_partitions_[i]->probe_rows()->Init(runtime_profile(), false));
530  }
531  COUNTER_ADD(partitions_created_, PARTITION_FANOUT);
533 
534  RowBatch build_batch(child(1)->row_desc(), state->batch_size(), mem_tracker());
535  bool eos = false;
536  int64_t total_build_rows = 0;
537  while (!eos) {
538  RETURN_IF_CANCELLED(state);
539  RETURN_IF_ERROR(QueryMaintenance(state));
540  if (input_partition_ == NULL) {
541  // If we are still consuming batches from the build side.
542  RETURN_IF_ERROR(child(1)->GetNext(state, &build_batch, &eos));
543  COUNTER_ADD(build_row_counter_, build_batch.num_rows());
544  } else {
545  // If we are consuming batches that have already been partitioned.
546  RETURN_IF_ERROR(input_partition_->build_rows()->GetNext(&build_batch, &eos));
547  }
548  total_build_rows += build_batch.num_rows();
549 
550  SCOPED_TIMER(partition_build_timer_);
551  if (process_build_batch_fn_ == NULL || ht_ctx_->level() != 0) {
552  RETURN_IF_ERROR(ProcessBuildBatch(&build_batch));
553  } else {
554  DCHECK_NOTNULL(process_build_batch_fn_level0_);
555  if (ht_ctx_->level() == 0) {
556  RETURN_IF_ERROR(process_build_batch_fn_level0_(this, &build_batch));
557  } else {
558  RETURN_IF_ERROR(process_build_batch_fn_(this, &build_batch));
559  }
560  }
561  build_batch.Reset();
562  DCHECK(!build_batch.AtCapacity());
563  }
564 
565  if (input_partition_ != NULL) {
566  // Done repartitioning build input, close it now.
569  }
570 
571  stringstream ss;
572  ss << "PHJ(node_id=" << id() << ") partitioned(level="
573  << hash_partitions_[0]->level_ << ") "
574  << total_build_rows << " rows into:" << endl;
575  for (int i = 0; i < hash_partitions_.size(); ++i) {
576  Partition* partition = hash_partitions_[i];
577  double percent =
578  partition->build_rows()->num_rows() * 100 / static_cast<double>(total_build_rows);
579  ss << " " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled")
580  << " (fraction=" << fixed << setprecision(2) << percent << "%)" << endl
581  << " #rows:" << partition->build_rows()->num_rows() << endl;
582  COUNTER_SET(largest_partition_percent_, static_cast<int64_t>(percent));
583  }
584  VLOG(2) << ss.str();
585 
586  COUNTER_ADD(num_build_rows_partitioned_, total_build_rows);
587  non_empty_build_ |= (total_build_rows > 0);
588  RETURN_IF_ERROR(BuildHashTables(state));
589  return Status::OK;
590 }
591 
592 Status PartitionedHashJoinNode::InitGetNext(TupleRow* first_probe_row) {
593  // TODO: Move this reset to blocking-join. Not yet though because of hash-join.
594  ResetForProbe();
595  return Status::OK;
596 }
597 
598 Status PartitionedHashJoinNode::NextProbeRowBatch(
599  RuntimeState* state, RowBatch* out_batch) {
600  DCHECK(probe_batch_pos_ == probe_batch_->num_rows() || probe_batch_pos_ == -1);
601  do {
602  // Loop until we find a non-empty row batch.
603  probe_batch_->TransferResourceOwnership(out_batch);
604  if (out_batch->AtCapacity()) {
605  // This out batch is full. Need to return it before getting the next batch.
606  probe_batch_pos_ = -1;
607  return Status::OK;
608  }
609  if (probe_side_eos_) {
610  current_probe_row_ = NULL;
611  probe_batch_pos_ = -1;
612  return Status::OK;
613  }
614  RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_side_eos_));
615  COUNTER_ADD(probe_row_counter_, probe_batch_->num_rows());
616  } while (probe_batch_->num_rows() == 0);
617 
618  ResetForProbe();
619  return Status::OK;
620 }
621 
622 Status PartitionedHashJoinNode::NextSpilledProbeRowBatch(
623  RuntimeState* state, RowBatch* out_batch) {
624  DCHECK(input_partition_ != NULL);
625  probe_batch_->TransferResourceOwnership(out_batch);
626  if (out_batch->AtCapacity()) {
627  // The out_batch has resources associated with it that will be recycled on the
628  // next call to GetNext() on the probe stream. Return this batch now.
629  probe_batch_pos_ = -1;
630  return Status::OK;
631  }
632  BufferedTupleStream* probe_rows = input_partition_->probe_rows();
633  if (LIKELY(probe_rows->rows_returned() < probe_rows->num_rows())) {
634  // Continue from the current probe stream.
635  bool eos = false;
636  RETURN_IF_ERROR(probe_rows->GetNext(probe_batch_.get(), &eos));
637  DCHECK_GT(probe_batch_->num_rows(), 0);
638  ResetForProbe();
639  } else {
640  // Done with this partition.
641  if (join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
642  join_op_ == TJoinOp::FULL_OUTER_JOIN) {
643  // In case of right-outer, right-anti and full-outer joins, we move this partition
644  // to the list of partitions that we need to output their unmatched build rows.
645  DCHECK(output_build_partitions_.empty());
646  DCHECK(input_partition_->hash_tbl_.get() != NULL);
647  hash_tbl_iterator_ =
648  input_partition_->hash_tbl_->FirstUnmatched(ht_ctx_.get());
649  output_build_partitions_.push_back(input_partition_);
650  } else {
651  // In any other case, just close the input partition.
652  input_partition_->Close(out_batch);
653  input_partition_ = NULL;
654  }
655  current_probe_row_ = NULL;
656  probe_batch_pos_ = -1;
657  }
658  return Status::OK;
659 }
660 
661 Status PartitionedHashJoinNode::PrepareNextPartition(RuntimeState* state) {
662  DCHECK(input_partition_ == NULL);
663  if (spilled_partitions_.empty()) return Status::OK;
664  VLOG(2) << "PrepareNextPartition\n" << NodeDebugString();
665 
666  input_partition_ = spilled_partitions_.front();
667  spilled_partitions_.pop_front();
668  DCHECK(input_partition_->is_spilled());
669 
670  // Reserve one buffer to read the probe side.
671  RETURN_IF_ERROR(input_partition_->probe_rows()->PrepareForRead());
672  ht_ctx_->set_level(input_partition_->level_);
673 
674  int64_t mem_limit = mem_tracker()->SpareCapacity();
675  // Try to build a hash table on top the spilled build rows.
676  bool built = false;
677  if (input_partition_->EstimatedInMemSize() < mem_limit) {
678  ht_ctx_->set_level(input_partition_->level_);
679  // TODO: We disable filter on spilled partitions, but perhaps we can revisit
680  // this, especially if the probe side is very big (e.g. has spilled as well).
681  RETURN_IF_ERROR(input_partition_->BuildHashTable(state, &built, false));
682  }
683 
684  if (!built) {
685  // This build partition still does not fit in memory, repartition.
686  DCHECK(input_partition_->is_spilled());
687  input_partition_->Spill(false);
688  ht_ctx_->set_level(input_partition_->level_ + 1);
689  int64_t num_input_rows = input_partition_->build_rows()->num_rows();
690  RETURN_IF_ERROR(ProcessBuildInput(state, input_partition_->level_ + 1));
691 
692  // Check if there was any reduction in the size of partitions after repartitioning.
693  int64_t largest_partition = LargestSpilledPartition();
694  DCHECK_GE(num_input_rows, largest_partition) << "Cannot have a partition with "
695  "more rows than the input";
696  if (num_input_rows == largest_partition) {
697  Status status = Status::MemLimitExceeded();
698  status.AddDetail(Substitute("Cannot perform hash join at node with id $0. "
699  "Repartitioning did not reduce the size of a spilled partition. "
700  "Repartitioning level $1. Number of rows $2.",
701  id_, input_partition_->level_ + 1, num_input_rows));
702  state->SetMemLimitExceeded();
703  return status;
704  }
706  } else {
707  DCHECK(hash_partitions_.empty());
708  DCHECK(!input_partition_->is_spilled());
709  DCHECK_NOTNULL(input_partition_->hash_tbl());
710  // In this case, we did not have to partition the build again, we just built
711  // a hash table. This means the probe does not have to be partitioned either.
712  for (int i = 0; i < PARTITION_FANOUT; ++i) {
714  }
716  }
717 
720  return Status::OK;
721 }
722 
723 int64_t PartitionedHashJoinNode::LargestSpilledPartition() const {
724  int64_t max_rows = 0;
725  for (int i = 0; i < hash_partitions_.size(); ++i) {
726  Partition* partition = hash_partitions_[i];
727  if (partition->is_spilled()) {
728  int64_t rows = partition->build_rows()->num_rows();
729  if (rows > max_rows) max_rows = rows;
730  }
731  }
732  return max_rows;
733 }
734 
735 Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch,
736  bool* eos) {
737  SCOPED_TIMER(runtime_profile_->total_time_counter());
738  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
739  DCHECK(!out_batch->AtCapacity());
740 
741  if (ReachedLimit()) {
742  *eos = true;
743  return Status::OK;
744  } else {
745  *eos = false;
746  }
747 
748  while (true) {
749  DCHECK_NE(state_, PARTITIONING_BUILD) << "Should not be in GetNext()";
750  RETURN_IF_CANCELLED(state);
751  RETURN_IF_ERROR(QueryMaintenance(state));
752 
753  if ((join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
754  join_op_ == TJoinOp::FULL_OUTER_JOIN) &&
755  !output_build_partitions_.empty()) {
756  // In case of right-outer, right-anti and full-outer joins, flush the remaining
757  // unmatched build rows of any partition we are done processing, before processing
758  // the next batch.
759  OutputUnmatchedBuild(out_batch);
760  if (!output_build_partitions_.empty()) break;
761 
762  // Finished outputting unmatched build rows, move to the next partition.
763  DCHECK(hash_partitions_.empty());
764  RETURN_IF_ERROR(PrepareNextPartition(state));
765  if (input_partition_ == NULL) {
766  *eos = true;
767  break;
768  }
769  if (out_batch->AtCapacity()) break;
770  }
771 
772  if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
773  // In this case, we want to output rows from the null aware partition.
774  if (null_aware_partition_ == NULL) {
775  *eos = true;
776  break;
777  }
778 
779  if (null_probe_output_idx_ >= 0) {
780  RETURN_IF_ERROR(OutputNullAwareNullProbe(state, out_batch));
781  if (out_batch->AtCapacity()) break;
782  continue;
783  }
784 
785  if (nulls_build_batch_.get() != NULL) {
786  RETURN_IF_ERROR(OutputNullAwareProbeRows(state, out_batch));
787  if (out_batch->AtCapacity()) break;
788  continue;
789  }
790  }
791 
792  // Finish up the current batch.
793  {
794  // Putting SCOPED_TIMER in ProcessProbeBatch() causes weird exception handling IR
795  // in the xcompiled function, so call it here instead.
796  int rows_added = 0;
797  SCOPED_TIMER(probe_timer_);
798  if (process_probe_batch_fn_ == NULL || ht_ctx_->level() != 0) {
799  rows_added = ProcessProbeBatch(join_op_, out_batch, ht_ctx_.get());
800  } else {
801  DCHECK_NOTNULL(process_probe_batch_fn_level0_);
802  if (ht_ctx_->level() == 0) {
803  rows_added = process_probe_batch_fn_level0_(this, out_batch, ht_ctx_.get());
804  } else {
805  rows_added = process_probe_batch_fn_(this, out_batch, ht_ctx_.get());
806  }
807  }
808  if (UNLIKELY(rows_added < 0)) {
809  DCHECK(!status_.ok());
810  return status_;
811  }
812  out_batch->CommitRows(rows_added);
813  num_rows_returned_ += rows_added;
814  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
815  }
816  if (out_batch->AtCapacity() || ReachedLimit()) break;
817  DCHECK(current_probe_row_ == NULL);
818 
819  // Try to continue from the current probe side input.
820  if (input_partition_ == NULL) {
821  RETURN_IF_ERROR(NextProbeRowBatch(state, out_batch));
822  } else {
823  RETURN_IF_ERROR(NextSpilledProbeRowBatch(state, out_batch));
824  }
825 
826  // We want to return as soon as we have attached a tuple stream to the out_batch
827  // (before preparing a new partition). The attached tuple stream will be recycled
828  // by the caller, freeing up more memory when we prepare the next partition.
829  if (out_batch->AtCapacity()) break;
830 
831  // Got a batch, just keep going.
832  if (probe_batch_pos_ == 0) continue;
833  DCHECK_EQ(probe_batch_pos_, -1);
834 
835  // Finished up all probe rows for hash_partitions_.
836  RETURN_IF_ERROR(CleanUpHashPartitions(out_batch));
837  if (out_batch->AtCapacity()) break;
838 
839  if ((join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
840  join_op_ == TJoinOp::FULL_OUTER_JOIN) &&
841  !output_build_partitions_.empty()) {
842  // There are some partitions that need to flush their unmatched build rows.
843  continue;
844  }
845  // Move onto the next partition.
846  RETURN_IF_ERROR(PrepareNextPartition(state));
847 
848  if (input_partition_ == NULL) {
849  if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
850  RETURN_IF_ERROR(PrepareNullAwarePartition());
851  }
852  if (null_aware_partition_ == NULL) {
853  *eos = true;
854  break;
855  } else {
856  *eos = false;
857  }
858  }
859  }
860 
861  if (ReachedLimit()) *eos = true;
862  return Status::OK;
863 }
864 
865 void PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
867  DCHECK(join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
868  join_op_ == TJoinOp::FULL_OUTER_JOIN);
869  DCHECK(!output_build_partitions_.empty());
870  ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
871  const int num_conjuncts = conjunct_ctxs_.size();
872  TupleRow* out_row = out_batch->GetRow(out_batch->AddRow());
873  const int max_rows = out_batch->capacity() - out_batch->num_rows();
874  int num_rows_added = 0;
875 
876  while (!out_batch->AtCapacity() && !hash_tbl_iterator_.AtEnd() &&
877  num_rows_added < max_rows) {
878  // Output remaining unmatched build rows.
879  if (!hash_tbl_iterator_.IsMatched()) {
880  TupleRow* build_row = hash_tbl_iterator_.GetRow();
881  DCHECK_NOTNULL(build_row);
882  if (join_op_ == TJoinOp::RIGHT_ANTI_JOIN) {
883  out_batch->CopyRow(build_row, out_row);
884  } else {
885  CreateOutputRow(out_row, NULL, build_row);
886  }
887  if (ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) {
888  ++num_rows_added;
889  out_row = out_row->next_row(out_batch);
890  }
892  }
893  // Move to the next unmatched entry.
894  hash_tbl_iterator_.NextUnmatched();
895  }
896 
897  // If we reached the end of the hash table, then there are no other unmatched build
898  // rows for this partition. In that case we need to close the partition, and move to
899  // the next. If we have not reached the end of the hash table, it means that we reached
900  // out_batch capacity and we need to continue to output unmatched build rows, without
901  // closing the partition.
902  if (hash_tbl_iterator_.AtEnd()) {
903  output_build_partitions_.front()->Close(out_batch);
904  output_build_partitions_.pop_front();
905  // Move to the next partition to output unmatched rows.
906  if (!output_build_partitions_.empty()) {
907  hash_tbl_iterator_ =
908  output_build_partitions_.front()->hash_tbl()->FirstUnmatched(ht_ctx_.get());
909  }
910  }
911 
912  DCHECK_LE(num_rows_added, max_rows);
913  out_batch->CommitRows(num_rows_added);
914  num_rows_returned_ += num_rows_added;
915  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
916 }
917 
918 Status PartitionedHashJoinNode::PrepareNullAwareNullProbe() {
919  DCHECK_EQ(null_probe_output_idx_, -1);
920  RETURN_IF_ERROR(null_probe_rows_->PrepareForRead());
921  DCHECK_EQ(probe_batch_->num_rows(), 0);
922  probe_batch_pos_ = 0;
923  null_probe_output_idx_ = 0;
924  return Status::OK;
925 }
926 
927 Status PartitionedHashJoinNode::OutputNullAwareNullProbe(RuntimeState* state,
928  RowBatch* out_batch) {
929  DCHECK_NOTNULL(null_aware_partition_);
930  DCHECK(nulls_build_batch_.get() == NULL);
931  DCHECK_NE(probe_batch_pos_, -1);
932 
933  if (probe_batch_pos_ == probe_batch_->num_rows()) {
934  probe_batch_pos_ = 0;
935  probe_batch_->TransferResourceOwnership(out_batch);
936  if (out_batch->AtCapacity()) return Status::OK;
937  bool eos;
938  RETURN_IF_ERROR(null_probe_rows_->GetNext(probe_batch_.get(), &eos));
939  if (probe_batch_->num_rows() == 0) {
940  // All done.
941  null_aware_partition_->Close(out_batch);
942  null_aware_partition_ = NULL;
943  return Status::OK;
944  }
945  }
946 
947  for (; probe_batch_pos_ < probe_batch_->num_rows();
948  ++probe_batch_pos_, ++null_probe_output_idx_) {
949  if (out_batch->AtCapacity()) break;
950  if (matched_null_probe_[null_probe_output_idx_]) continue;
951  TupleRow* out_row = out_batch->GetRow(out_batch->AddRow());
952  out_batch->CopyRow(probe_batch_->GetRow(probe_batch_pos_), out_row);
953  out_batch->CommitLastRow();
954  }
955 
956  return Status::OK;
957 }
958 
959 // In this case we had a lot of NULLs on either the build/probe side. While this is
960 // possible to process by re-reading the spilled streams for each row with minimal code
961 // effort, this would behave very slowly (we'd need to do IO for each row). This seems
962 // like a reasonable limitation for now.
963 // TODO: revisit.
964 static Status NullAwareAntiJoinError(bool build) {
965  return Status(Substitute("Unable to perform Null-Aware Anti-Join. There are too"
966  " many NULLs on the $0 side to perform this join.", build ? "build" : "probe"));
967 }
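// A null-aware anti join typically comes from a NOT IN subquery whose comparison can
// involve NULLs, e.g. (illustrative query, not part of this file):
//   SELECT c1 FROM t1 WHERE c1 NOT IN (SELECT c1 FROM t2);
// A single NULL on the build side means no probe row can pass, and a NULL probe value
// can only pass if the build side is empty, which is why the NULL rows from both sides
// are kept in separate streams and evaluated row by row instead of being hashed.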
968 
969 Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
970  DCHECK_NOTNULL(null_aware_partition_);
971  DCHECK(nulls_build_batch_.get() == NULL);
972  DCHECK_EQ(probe_batch_pos_, -1);
973  DCHECK_EQ(probe_batch_->num_rows(), 0);
974 
975  BufferedTupleStream* build_stream = null_aware_partition_->build_rows();
976  BufferedTupleStream* probe_stream = null_aware_partition_->probe_rows();
977 
978  if (build_stream->num_rows() == 0) {
979  // There were no build rows. Nothing to do. Just prepare to output the null
980  // probe rows.
981  DCHECK_EQ(probe_stream->num_rows(), 0);
982  nulls_build_batch_.reset();
983  RETURN_IF_ERROR(PrepareNullAwareNullProbe());
984  return Status::OK;
985  }
986 
987  // Bring the entire spilled build stream into memory and read into a single batch.
988  bool got_rows;
989  RETURN_IF_ERROR(build_stream->GetRows(&nulls_build_batch_, &got_rows));
990  if (!got_rows) return NullAwareAntiJoinError(true);
991 
992  // Initialize the streams for read.
993  RETURN_IF_ERROR(probe_stream->PrepareForRead());
994  probe_batch_pos_ = 0;
995  return Status::OK;
996 }
997 
998 Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
999  RowBatch* out_batch) {
1000  DCHECK_NOTNULL(null_aware_partition_);
1001  DCHECK(nulls_build_batch_.get() != NULL);
1002 
1003  ExprContext* const* join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
1004  int num_join_conjuncts = other_join_conjunct_ctxs_.size();
1005  DCHECK_NOTNULL(probe_batch_.get());
1006 
1007  BufferedTupleStream* probe_stream = null_aware_partition_->probe_rows();
1008  if (probe_batch_pos_ == probe_batch_->num_rows()) {
1009  probe_batch_pos_ = 0;
1010  probe_batch_->TransferResourceOwnership(out_batch);
1011  if (out_batch->AtCapacity()) return Status::OK;
1012 
1013  // Get the next probe batch.
1014  bool eos;
1015  RETURN_IF_ERROR(probe_stream->GetNext(probe_batch_.get(), &eos));
1016 
1017  if (probe_batch_->num_rows() == 0) {
1018  RETURN_IF_ERROR(EvaluateNullProbe(null_aware_partition_->build_rows()));
1019  nulls_build_batch_.reset();
1020  RETURN_IF_ERROR(PrepareNullAwareNullProbe());
1021  return Status::OK;
1022  }
1023  }
1024 
1025  // For each probe row, iterate over all the build rows and check for rows
1026  // that did not have any matches.
1027  for (; probe_batch_pos_ < probe_batch_->num_rows(); ++probe_batch_pos_) {
1028  if (out_batch->AtCapacity()) break;
1029  TupleRow* probe_row = probe_batch_->GetRow(probe_batch_pos_);
1030 
1031  bool matched = false;
1032  for (int i = 0; i < nulls_build_batch_->num_rows(); ++i) {
1033  CreateOutputRow(semi_join_staging_row_, probe_row, nulls_build_batch_->GetRow(i));
1034  if (ExecNode::EvalConjuncts(
1035  join_conjunct_ctxs, num_join_conjuncts, semi_join_staging_row_)) {
1036  matched = true;
1037  break;
1038  }
1039  }
1040 
1041  if (!matched) {
1042  TupleRow* out_row = out_batch->GetRow(out_batch->AddRow());
1043  out_batch->CopyRow(probe_row, out_row);
1044  out_batch->CommitLastRow();
1045  }
1046  }
1047  return Status::OK;
1048 }
1049 
1050 // When this function is called, we've finished processing the current build input
1051 // (either from child(1) or from repartitioning a spilled partition). The build rows
1052 // have only been partitioned, we still need to build hash tables over them. Some
1053 // of the partitions could have already been spilled and attempting to build hash
1054 // tables over the non-spilled ones can cause them to spill.
1055 //
1056 // At the end of the function we'd like all partitions to either have a hash table
1057 // (and therefore not spilled) or be spilled. Partitions that have a hash table don't
1058 // need to spill on the probe side.
1059 //
1060 // This maps perfectly to a 0-1 knapsack where the weight is the memory to keep the
1061 // build rows and hash table and the value is the expected IO savings.
1062 // For now, we go with a greedy solution.
1063 //
1064 // TODO: implement the knapsack solution.
1065 Status PartitionedHashJoinNode::BuildHashTables(RuntimeState* state) {
1066  DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT);
1067 
1068  // Decide whether probe filters will be built.
1069  if (input_partition_ == NULL && can_add_probe_filters_) {
1070  uint64_t num_build_rows = 0;
1071  BOOST_FOREACH(Partition* partition, hash_partitions_) {
1072  const uint64_t partition_num_rows = partition->build_rows()->num_rows();
1073  num_build_rows += partition_num_rows;
1074  }
1075  // TODO: We use a simple heuristic here, comparing the number of build rows
1076  // to the size of the slot filter bitmap. This is essentially a 1-1 filter
1077  // rather than a Bloom filter, so it probably misses opportunities. Should revisit.
1078  if (num_build_rows >= state->slot_filter_bitmap_size()) {
1079  can_add_probe_filters_ = false;
1080  VLOG(2) << "Disabling probe filter push down because build side is too large: "
1081  << num_build_rows;
1082  }
1083  } else {
1084  can_add_probe_filters_ = false;
1085  }
1086 
1087  // First loop over the partitions and build hash tables for the partitions that did
1088  // not already spill.
1089  BOOST_FOREACH(Partition* partition, hash_partitions_) {
1090  if (partition->build_rows()->num_rows() == 0) {
1091  // This partition is empty, no need to do anything else.
1092  partition->Close(NULL);
1093  continue;
1094  }
1095 
1096  if (!partition->is_spilled()) {
1097  bool built = false;
1098  DCHECK(partition->build_rows()->is_pinned());
1099  RETURN_IF_ERROR(partition->BuildHashTable(state, &built, can_add_probe_filters_));
1100  // If we did not have enough memory to build this hash table, we need to spill this
1101  // partition (clean up the hash table, unpin build).
1102  if (!built) RETURN_IF_ERROR(partition->Spill(true));
1103  }
1104  }
1105 
1106  // Collect all the spilled partitions that don't have an IO buffer. We need to reserve
1107  // an IO buffer for those partitions. Reserving an IO buffer can cause more partitions
1108  // to spill so this process is recursive.
1109  list<Partition*> spilled_partitions;
1110  BOOST_FOREACH(Partition* partition, hash_partitions_) {
1111  if (partition->is_closed()) continue;
1112  if (partition->is_spilled() && partition->probe_rows()->using_small_buffers()) {
1113  spilled_partitions.push_back(partition);
1114  }
1115  }
1116  while (!spilled_partitions.empty()) {
1117  Partition* partition = spilled_partitions.front();
1118  spilled_partitions.pop_front();
1119 
1120  while (true) {
1121  bool got_buffer;
1122  RETURN_IF_ERROR(partition->probe_rows()->SwitchToIoBuffers(&got_buffer));
1123  if (got_buffer) break;
1124  Partition* spilled_partition;
1125  RETURN_IF_ERROR(SpillPartition(&spilled_partition));
1126  DCHECK(spilled_partition->is_spilled());
1127  if (spilled_partition->probe_rows()->using_small_buffers()) {
1128  spilled_partitions.push_back(spilled_partition);
1129  }
1130  }
1131 
1132  DCHECK(partition->probe_rows()->has_write_block());
1133  DCHECK(!partition->probe_rows()->using_small_buffers());
1134  }
1135 
1136  // At this point, the partition is in one of these states:
1137  // 1. closed. All done, no buffers in either the build or probe stream.
1138  // 2. in_mem. The build side is pinned and has a hash table built.
1139  // 3. spilled. The build side is fully unpinned and the probe side has an io
1140  // sized buffer.
1141  BOOST_FOREACH(Partition* partition, hash_partitions_) {
1142  if (partition->hash_tbl() != NULL) partition->probe_rows()->Close();
1143  }
1144 
1145  // TODO: at this point we could have freed enough memory to pin and build some
1146  // spilled partitions. This can happen, for example, if there is a lot of skew.
1147  // Partition 1: 10GB (pinned initially).
1148  // Partition 2,3,4: 1GB (spilled during partitioning the build).
1149  // In the previous step, we could have unpinned 10GB (because there was not enough
1150  // memory to build a hash table over it) which can now free enough memory to
1151  // build hash tables over the remaining 3 partitions.
1152  // We start by spilling the largest partition though so the build input would have
1153  // to be pretty pathological.
1154  // Investigate if this is worthwhile.
1155 
1156  // Initialize the hash_tbl_ caching array.
1157  for (int i = 0; i < PARTITION_FANOUT; ++i) {
1158  hash_tbls_[i] = hash_partitions_[i]->hash_tbl();
1159  }
1160  return Status::OK;
1161 }
1162 
1163 Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) {
1164  if (null_probe_rows_ == NULL || null_probe_rows_->num_rows() == 0) {
1165  return Status::OK;
1166  }
1167  DCHECK_EQ(null_probe_rows_->num_rows(), matched_null_probe_.size());
1168 
1169  // Bring both the build and probe side into memory and do a pairwise evaluation.
1170  bool got_rows;
1171  scoped_ptr<RowBatch> build_rows;
1172  RETURN_IF_ERROR(build->GetRows(&build_rows, &got_rows));
1173  if (!got_rows) return NullAwareAntiJoinError(true);
1174  scoped_ptr<RowBatch> probe_rows;
1175  RETURN_IF_ERROR(null_probe_rows_->GetRows(&probe_rows, &got_rows));
1176  if (!got_rows) return NullAwareAntiJoinError(false);
1177 
1178  ExprContext* const* join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
1179  int num_join_conjuncts = other_join_conjunct_ctxs_.size();
1180 
1181  DCHECK_LE(probe_rows->num_rows(), matched_null_probe_.size());
1182  // For each row, iterate over all rows in the build table.
1183  SCOPED_TIMER(null_aware_eval_timer_);
1184  for (int i = 0; i < probe_rows->num_rows(); ++i) {
1185  if (matched_null_probe_[i]) continue;
1186  for (int j = 0; j < build_rows->num_rows(); ++j) {
1187  CreateOutputRow(semi_join_staging_row_, probe_rows->GetRow(i),
1188  build_rows->GetRow(j));
1189  if (ExecNode::EvalConjuncts(
1190  join_conjunct_ctxs, num_join_conjuncts, semi_join_staging_row_)) {
1191  matched_null_probe_[i] = true;
1192  break;
1193  }
1194  }
1195  }
1196 
1197  return Status::OK;
1198 }
1199 
1201  DCHECK(using_small_buffers_);
1202 
1203  // Partitions that are using small buffers need to switch to IO sized buffers. We want
1204  // this to happen to all partitions at the same time to ensure that all partitions can
1205  // get at least 1 IO buffer (at which point they can append indefinitely). This
1206  // initial buffer is guaranteed by the reservation for this operator.
1207  // TODO: this mechanism sucks. Redo.
1208  for (int i = 0; i < hash_partitions_.size(); ++i) {
1209  if (hash_partitions_[i]->is_closed()) continue;
1210  DCHECK(hash_partitions_[i]->build_rows()->using_small_buffers());
1211  DCHECK(hash_partitions_[i]->probe_rows()->using_small_buffers());
1212  bool got_buffer;
1213  RETURN_IF_ERROR(hash_partitions_[i]->build_rows()->SwitchToIoBuffers(&got_buffer));
1214  if (got_buffer) {
1215  RETURN_IF_ERROR(hash_partitions_[i]->probe_rows()->SwitchToIoBuffers(&got_buffer));
1216  }
1217  if (!got_buffer) {
1218  Status status = Status::MemLimitExceeded();
1219  status.AddDetail("Not enough memory to get the minimum required buffers for "
1220  "join.");
1221  return status;
1222  }
1223  }
1224  using_small_buffers_ = false;
1225  return Status::OK;
1226 }
1227 
1228 Status PartitionedHashJoinNode::CleanUpHashPartitions(RowBatch* batch) {
1229  DCHECK_EQ(probe_batch_pos_, -1);
1230  // At this point all the rows have been read from the probe side for all partitions in
1231  // hash_partitions_.
1232  VLOG(2) << "Probe Side Consumed\n" << NodeDebugString();
1233 
1234  // Walk the partitions that had hash tables built for the probe phase and close them.
1235  // In the case of right-outer, right-anti and full-outer joins, instead of closing those partitions,
1236  // add them to the list of partitions that need to output any unmatched build rows.
1237  // This partition will be closed by the function that actually outputs unmatched build
1238  // rows.
1239  for (int i = 0; i < hash_partitions_.size(); ++i) {
1240  Partition* partition = hash_partitions_[i];
1241  if (partition->is_closed()) continue;
1242  if (partition->is_spilled()) {
1243  DCHECK(partition->hash_tbl() == NULL) << NodeDebugString();
1244  // Unpin the build and probe stream to free up more memory. We need to free all
1245  // memory so we can recurse the algorithm and create new hash partitions from
1246  // spilled partitions.
1247  RETURN_IF_ERROR(partition->build_rows()->UnpinStream(true));
1248  RETURN_IF_ERROR(partition->probe_rows()->UnpinStream(true));
1249 
1250  // Push new created partitions at the front. This means a depth first walk
1251  // (more finely partitioned partitions are processed first). This allows us
1252  // to delete blocks earlier and bottom out the recursion earlier.
1253  spilled_partitions_.push_front(partition);
1254  } else {
1255  DCHECK_EQ(partition->probe_rows()->num_rows(), 0)
1256  << "No probe rows should have been spilled for this partition.";
1257  if (join_op_ == TJoinOp::RIGHT_OUTER_JOIN ||
1258  join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
1259  join_op_ == TJoinOp::FULL_OUTER_JOIN) {
1260  if (output_build_partitions_.empty()) {
1261  hash_tbl_iterator_ = partition->hash_tbl_->FirstUnmatched(ht_ctx_.get());
1262  }
1263  output_build_partitions_.push_back(partition);
1264  } else if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
1265  // For NAAJ, we need to try to match all the NULL probe rows with this partition
1266  // before closing it. The NULL probe rows could have come from any partition
1267  // so we collect them all and match them at the end.
1268  RETURN_IF_ERROR(EvaluateNullProbe(partition->build_rows()));
1269  partition->Close(batch);
1270  } else {
1271  partition->Close(batch);
1272  }
1273  }
1274  }
1275 
1276  // Just finished evaluating the null probe rows with all the non-spilled build
1277  // partitions. Unpin this now to free this memory for repartitioning.
1279 
1280  hash_partitions_.clear();
1281  input_partition_ = NULL;
1282  return Status::OK;
1283 }
1284 
1285 void PartitionedHashJoinNode::AddToDebugString(int indent, stringstream* out) const {
1286  *out << " hash_tbl=";
1287  *out << string(indent * 2, ' ');
1288  *out << "HashTbl("
1289  << " build_exprs=" << Expr::DebugString(build_expr_ctxs_)
1290  << " probe_exprs=" << Expr::DebugString(probe_expr_ctxs_);
1291  *out << ")";
1292 }
1293 
1294 void PartitionedHashJoinNode::UpdateState(State s) {
1295  state_ = s;
1296  VLOG(2) << "Transitioned State:" << endl << NodeDebugString();
1297 }
1298 
1299 string PartitionedHashJoinNode::PrintState() const {
1300  switch (state_) {
1301  case PARTITIONING_BUILD: return "PartitioningBuild";
1302  case PROCESSING_PROBE: return "ProcessingProbe";
1303  case PROBING_SPILLED_PARTITION: return "ProbingSpilledPartitions";
1304  case REPARTITIONING: return "Repartitioning";
1305  default: DCHECK(false);
1306  }
1307  return "";
1308 }
1309 
1310 string PartitionedHashJoinNode::NodeDebugString() const {
1311  stringstream ss;
1312  ss << "PartitionedHashJoinNode (id=" << id() << " op=" << join_op_
1313  << " state=" << PrintState()
1314  << " #partitions=" << hash_partitions_.size()
1315  << " #spilled_partitions=" << spilled_partitions_.size()
1316  << ")" << endl;
1317 
1318  for (int i = 0; i < hash_partitions_.size(); ++i) {
1319  ss << i << ": ptr=" << hash_partitions_[i];
1320  Partition* partition = hash_partitions_[i];
1321  if (partition->is_closed()) {
1322  ss << " closed" << endl;
1323  continue;
1324  }
1325  ss << endl
1326  << " "
1327  << (partition->is_spilled() ? "(Spilled)" : "")
1328  << " Build Rows: " << partition->build_rows()->num_rows()
1329  << " (Blocks pinned: " << partition->build_rows()->blocks_pinned() << ")"
1330  << endl;
1331  if (partition->hash_tbl() != NULL) {
1332  ss << " Hash Table Rows: " << partition->hash_tbl()->size() << endl;
1333  }
1334  ss << " (Spilled) Probe Rows: " << partition->probe_rows()->num_rows()
1335  << " (Blocks pinned: " << partition->probe_rows()->blocks_pinned() << ")"
1336  << endl;
1337  }
1338 
1339  if (!spilled_partitions_.empty()) {
1340  ss << "SpilledPartitions" << endl;
1341  for (list<Partition*>::const_iterator it = spilled_partitions_.begin();
1342  it != spilled_partitions_.end(); ++it) {
1343  DCHECK((*it)->is_spilled());
1344  DCHECK((*it)->hash_tbl() == NULL);
1345  ss << " Partition=" << *it << endl
1346  << " Spilled Build Rows: "
1347  << (*it)->build_rows()->num_rows() << endl
1348  << " Spilled Probe Rows: "
1349  << (*it)->probe_rows()->num_rows() << endl;
1350  }
1351  }
1352  return ss.str();
1353 }
1354 
1355 // For a left outer join, the IR looks like:
1356 // define void @CreateOutputRow(%"class.impala::BlockingJoinNode"* %this_ptr,
1357 // %"class.impala::TupleRow"* %out_arg,
1358 // %"class.impala::TupleRow"* %probe_arg,
1359 // %"class.impala::TupleRow"* %build_arg) #20 {
1360 // entry:
1361 // %out = bitcast %"class.impala::TupleRow"* %out_arg to i8**
1362 // %probe = bitcast %"class.impala::TupleRow"* %probe_arg to i8**
1363 // %build = bitcast %"class.impala::TupleRow"* %build_arg to i8**
1364 // %0 = bitcast i8** %out to i8*
1365 // %1 = bitcast i8** %probe to i8*
1366 // call void @llvm.memcpy.p0i8.p0i8.i32(i8* %0, i8* %1, i32 8, i32 0, i1 false)
1367 // %build_dst_ptr = getelementptr i8** %out, i32 1
1368 // %is_build_null = icmp eq i8** %build, null
1369 // br i1 %is_build_null, label %build_null, label %build_not_null
1370 //
1371 // build_not_null: ; preds = %entry
1372 // %2 = bitcast i8** %build_dst_ptr to i8*
1373 // %3 = bitcast i8** %build to i8*
1374 // call void @llvm.memcpy.p0i8.p0i8.i32(i8* %2, i8* %3, i32 8, i32 0, i1 false)
1375 // ret void
1376 //
1377 // build_null: ; preds = %entry
1378 // %dst_tuple_ptr = getelementptr i8** %out, i32 1
1379 // store i8* null, i8** %dst_tuple_ptr
1380 // ret void
1381 // }
1382 Function* PartitionedHashJoinNode::CodegenCreateOutputRow(LlvmCodeGen* codegen) {
1383  Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
1384  DCHECK(tuple_row_type != NULL);
1385  PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
1386 
1387  Type* this_type = codegen->GetType(BlockingJoinNode::LLVM_CLASS_NAME);
1388  DCHECK(this_type != NULL);
1389  PointerType* this_ptr_type = PointerType::get(this_type, 0);
1390 
1391  // TupleRows are really just an array of pointers. Easier to work with them
1392  // this way.
1393  PointerType* tuple_row_working_type = PointerType::get(codegen->ptr_type(), 0);
1394 
1395  // Construct function signature to match CreateOutputRow()
1396  LlvmCodeGen::FnPrototype prototype(codegen, "CreateOutputRow", codegen->void_type());
1397  prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
1398  prototype.AddArgument(LlvmCodeGen::NamedVariable("out_arg", tuple_row_ptr_type));
1399  prototype.AddArgument(LlvmCodeGen::NamedVariable("probe_arg", tuple_row_ptr_type));
1400  prototype.AddArgument(LlvmCodeGen::NamedVariable("build_arg", tuple_row_ptr_type));
1401 
1402  LLVMContext& context = codegen->context();
1403  LlvmCodeGen::LlvmBuilder builder(context);
1404  Value* args[4];
1405  Function* fn = prototype.GeneratePrototype(&builder, args);
1406  Value* out_row_arg = builder.CreateBitCast(args[1], tuple_row_working_type, "out");
1407  Value* probe_row_arg = builder.CreateBitCast(args[2], tuple_row_working_type, "probe");
1408  Value* build_row_arg = builder.CreateBitCast(args[3], tuple_row_working_type, "build");
1409 
1410  int num_probe_tuples = child(0)->row_desc().tuple_descriptors().size();
1411  int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
1412 
1413  // Copy probe row
1414  codegen->CodegenMemcpy(&builder, out_row_arg, probe_row_arg, probe_tuple_row_size_);
1415  Value* build_row_idx[] = { codegen->GetIntConstant(TYPE_INT, num_probe_tuples) };
1416  Value* build_row_dst = builder.CreateGEP(out_row_arg, build_row_idx, "build_dst_ptr");
1417 
1418  // Copy build row.
1419  BasicBlock* build_not_null_block = BasicBlock::Create(context, "build_not_null", fn);
1420  BasicBlock* build_null_block = NULL;
1421 
1422  if (join_op_ == TJoinOp::LEFT_ANTI_JOIN || join_op_ == TJoinOp::LEFT_OUTER_JOIN ||
1423  join_op_ == TJoinOp::FULL_OUTER_JOIN ||
1424  join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
1425  // build tuple can be null
1426  build_null_block = BasicBlock::Create(context, "build_null", fn);
1427  Value* is_build_null = builder.CreateIsNull(build_row_arg, "is_build_null");
1428  builder.CreateCondBr(is_build_null, build_null_block, build_not_null_block);
1429 
1430  // Set tuple build ptrs to NULL
1431  // TODO: this should be replaced with memset() but I can't get the llvm intrinsic
1432  // to work.
1433  builder.SetInsertPoint(build_null_block);
1434  for (int i = 0; i < num_build_tuples; ++i) {
1435  Value* array_idx[] =
1436  { codegen->GetIntConstant(TYPE_INT, i + num_probe_tuples) };
1437  Value* dst = builder.CreateGEP(out_row_arg, array_idx, "dst_tuple_ptr");
1438  builder.CreateStore(codegen->null_ptr_value(), dst);
1439  }
1440  builder.CreateRetVoid();
1441  } else {
1442  // build row can't be NULL
1443  builder.CreateBr(build_not_null_block);
1444  }
1445 
1446  // Copy build tuple ptrs
1447  builder.SetInsertPoint(build_not_null_block);
1448  codegen->CodegenMemcpy(&builder, build_row_dst, build_row_arg, build_tuple_row_size_);
1449  builder.CreateRetVoid();
1450 
1451  return codegen->FinalizeFunction(fn);
1452 }
1453 
1454 bool PartitionedHashJoinNode::CodegenProcessBuildBatch(
1455  RuntimeState* state, Function* hash_fn, Function* murmur_hash_fn) {
1456  LlvmCodeGen* codegen;
1457  if (!state->GetCodegen(&codegen).ok()) return false;
1458  // Get cross compiled function
1459  Function* process_build_batch_fn =
1460  codegen->GetFunction(IRFunction::PHJ_PROCESS_BUILD_BATCH);
1461  DCHECK(process_build_batch_fn != NULL);
1462 
1463  // Codegen for evaluating build rows
1464  Function* eval_row_fn = ht_ctx_->CodegenEvalRow(state, true);
1465  if (eval_row_fn == NULL) return false;
1466 
1467  int replaced = 0;
1468  // Replace call sites
1469  process_build_batch_fn = codegen->ReplaceCallSites(process_build_batch_fn, false,
1470  eval_row_fn, "EvalBuildRow", &replaced);
1471  DCHECK_EQ(replaced, 1);
1472 
1473  // process_build_batch_fn_level0 uses CRC hash if available,
1474  // process_build_batch_fn uses murmur
1475  Function* process_build_batch_fn_level0 = codegen->ReplaceCallSites(
1476  process_build_batch_fn, false, hash_fn, "HashCurrentRow", &replaced);
1477  DCHECK_EQ(replaced, 1);
1478 
1479  process_build_batch_fn = codegen->ReplaceCallSites(
1480  process_build_batch_fn, true, murmur_hash_fn, "HashCurrentRow", &replaced);
1481  DCHECK_EQ(replaced, 1);
1482 
1483  // Finalize ProcessBuildBatch functions
1484  process_build_batch_fn = codegen->OptimizeFunctionWithExprs(process_build_batch_fn);
1485  if (process_build_batch_fn == NULL) return false;
1486  process_build_batch_fn_level0 =
1487  codegen->OptimizeFunctionWithExprs(process_build_batch_fn_level0);
1488  if (process_build_batch_fn_level0 == NULL) return false;
1489 
1490  // Register native function pointers
1491  codegen->AddFunctionToJit(process_build_batch_fn,
1492  reinterpret_cast<void**>(&process_build_batch_fn_));
1493  codegen->AddFunctionToJit(process_build_batch_fn_level0,
1494  reinterpret_cast<void**>(&process_build_batch_fn_level0_));
1495  return true;
1496 }
1497 
1498 bool PartitionedHashJoinNode::CodegenProcessProbeBatch(
1499  RuntimeState* state, Function* hash_fn, Function* murmur_hash_fn) {
1500  LlvmCodeGen* codegen;
1501  if (!state->GetCodegen(&codegen).ok()) return false;
1502 
1503  // Get cross compiled function
1504  IRFunction::Type ir_fn;
1505  switch (join_op_) {
1506  case TJoinOp::INNER_JOIN:
1507  ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_INNER_JOIN;
1508  break;
1509  case TJoinOp::LEFT_OUTER_JOIN:
1510  ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_LEFT_OUTER_JOIN;
1511  break;
1512  case TJoinOp::LEFT_SEMI_JOIN:
1513  ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_LEFT_SEMI_JOIN;
1514  break;
1515  case TJoinOp::LEFT_ANTI_JOIN:
1516  ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_LEFT_ANTI_JOIN;
1517  break;
1518  case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN:
1519  ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_NULL_AWARE_LEFT_ANTI_JOIN;
1520  break;
1521  case TJoinOp::RIGHT_OUTER_JOIN:
1522  ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_RIGHT_OUTER_JOIN;
1523  break;
1524  case TJoinOp::RIGHT_SEMI_JOIN:
1525  ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_RIGHT_SEMI_JOIN;
1526  break;
1527  case TJoinOp::RIGHT_ANTI_JOIN:
1528  ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_RIGHT_ANTI_JOIN;
1529  break;
1530  case TJoinOp::FULL_OUTER_JOIN:
1531  ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_FULL_OUTER_JOIN;
1532  break;
1533  default:
1534  DCHECK(false);
1535  return false;
1536  }
1537  Function* process_probe_batch_fn = codegen->GetFunction(ir_fn);
1538  DCHECK(process_probe_batch_fn != NULL);
1539 
1540  // Clone process_probe_batch_fn so we don't clobber the original for other join nodes
1541  process_probe_batch_fn = codegen->CloneFunction(process_probe_batch_fn);
1542  process_probe_batch_fn->setName("ProcessProbeBatch");
1543 
1544  // Since ProcessProbeBatch() is a templated function, it has linkonce_odr linkage, which
1545  // means the function can be removed if it's not referenced. Change to weak_odr, which
1546  // has the same semantics except it can't be removed.
1547  // See http://llvm.org/docs/LangRef.html#linkage-types
1548  DCHECK(process_probe_batch_fn->getLinkage() == GlobalValue::LinkOnceODRLinkage)
1549  << LlvmCodeGen::Print(process_probe_batch_fn);
1550  process_probe_batch_fn->setLinkage(GlobalValue::WeakODRLinkage);
1551 
1552  // Bake in %this pointer argument to process_probe_batch_fn.
1553  Value* this_arg = codegen->GetArgument(process_probe_batch_fn, 0);
1554  Value* this_loc = codegen->CastPtrToLlvmPtr(this_arg->getType(), this);
1555  this_arg->replaceAllUsesWith(this_loc);
1556 
1557  // Bake in %ht_ctx pointer argument to process_probe_batch_fn
1558  Value* ht_ctx_arg = codegen->GetArgument(process_probe_batch_fn, 2);
1559  Value* ht_ctx_loc = codegen->CastPtrToLlvmPtr(ht_ctx_arg->getType(), ht_ctx_.get());
1560  ht_ctx_arg->replaceAllUsesWith(ht_ctx_loc);
1561 
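// ---------------------------------------------------------------------------
// [Illustrative sketch, not part of partitioned-hash-join-node.cc]
// The two "bake in" steps above replace a function argument with a constant
// that holds a known runtime address, so the optimizer can treat 'this' and
// 'ht_ctx_' as compile-time constants inside the jitted code. A standalone
// helper doing the same with plain LLVM APIs (roughly what CastPtrToLlvmPtr
// wraps) might look like:
static llvm::Value* PtrToLlvmConstant(
    llvm::LLVMContext& ctx, llvm::Type* ptr_type, const void* ptr) {
  // Wrap the raw address as an i64 constant...
  llvm::Constant* addr = llvm::ConstantInt::get(
      llvm::Type::getInt64Ty(ctx), reinterpret_cast<uint64_t>(ptr));
  // ...then cast it to the pointer type the cross-compiled function expects.
  return llvm::ConstantExpr::getIntToPtr(addr, ptr_type);
}
// ---------------------------------------------------------------------------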
1562  // Codegen HashTable::Equals
1563  Function* equals_fn = ht_ctx_->CodegenEquals(state);
1564  if (equals_fn == NULL) return false;
1565 
1566  // Codegen for evaluating probe rows
1567  Function* eval_row_fn = ht_ctx_->CodegenEvalRow(state, false);
1568  if (eval_row_fn == NULL) return false;
1569 
1570  // Codegen CreateOutputRow
1571  Function* create_output_row_fn = CodegenCreateOutputRow(codegen);
1572  if (create_output_row_fn == NULL) return false;
1573 
1574  // Codegen evaluating other join conjuncts
1575  Function* eval_other_conjuncts_fn = ExecNode::CodegenEvalConjuncts(
1576  state, other_join_conjunct_ctxs_, "EvalOtherConjuncts");
1577  if (eval_other_conjuncts_fn == NULL) return false;
1578 
1579  // Codegen evaluating conjuncts
1580  Function* eval_conjuncts_fn = ExecNode::CodegenEvalConjuncts(state, conjunct_ctxs_);
1581  if (eval_conjuncts_fn == NULL) return false;
1582 
1583  // Replace all call sites with codegen version
1584  int replaced = 0;
1585  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, true,
1586  eval_row_fn, "EvalProbeRow", &replaced);
1587  DCHECK_EQ(replaced, 1);
1588 
1589  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, true,
1590  create_output_row_fn, "CreateOutputRow", &replaced);
1591  DCHECK(replaced == 1 || replaced == 2) << replaced; // Depends on join_op_
1592 
1593  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, true,
1594  eval_conjuncts_fn, "EvalConjuncts", &replaced);
1595  // Depends on join_op_:
1596  // INNER_JOIN -> 1
1597  // LEFT_OUTER_JOIN -> 2
1598  // LEFT_SEMI_JOIN -> 1
1599  // LEFT_ANTI_JOIN/NULL_AWARE_ANTI_JOIN -> 2
1600  // RIGHT_OUTER_JOIN -> 1
1601  // RIGHT_SEMI_JOIN -> 1
1602  // RIGHT_ANTI_JOIN -> 0
1603  // FULL_OUTER_JOIN -> 2
1604  // CROSS_JOIN -> N/A
1605  DCHECK(replaced == 0 || replaced == 1 || replaced == 2) << replaced;
1606 
1607  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, true,
1608  eval_other_conjuncts_fn, "EvalOtherJoinConjuncts", &replaced);
1609  DCHECK_EQ(replaced, 1);
1610 
1611  process_probe_batch_fn = codegen->ReplaceCallSites(process_probe_batch_fn, true,
1612  equals_fn, "Equals", &replaced);
1613  // Depends on join_op_
1614  DCHECK(replaced == 1 || replaced == 2 || replaced == 3 || replaced == 4) << replaced;
1615 
1616  // process_probe_batch_fn_level0 uses CRC hash if available,
1617  // process_probe_batch_fn uses murmur
1618  Function* process_probe_batch_fn_level0 = codegen->ReplaceCallSites(
1619  process_probe_batch_fn, false, hash_fn, "HashCurrentRow", &replaced);
1620  DCHECK_EQ(replaced, 1);
1621 
1622  process_probe_batch_fn = codegen->ReplaceCallSites(
1623  process_probe_batch_fn, true, murmur_hash_fn, "HashCurrentRow", &replaced);
1624  DCHECK_EQ(replaced, 1);
1625 
1626  // Finalize ProcessProbeBatch functions
1627  process_probe_batch_fn = codegen->OptimizeFunctionWithExprs(process_probe_batch_fn);
1628  if (process_probe_batch_fn == NULL) return false;
1629  process_probe_batch_fn_level0 =
1630  codegen->OptimizeFunctionWithExprs(process_probe_batch_fn_level0);
1631  if (process_probe_batch_fn_level0 == NULL) return false;
1632 
1633  // Register native function pointers
1634  codegen->AddFunctionToJit(process_probe_batch_fn,
1635  reinterpret_cast<void**>(&process_probe_batch_fn_));
1636  codegen->AddFunctionToJit(process_probe_batch_fn_level0,
1637  reinterpret_cast<void**>(&process_probe_batch_fn_level0_));
1638  return true;
1639 }
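The ReplaceCallSites() calls in both codegen functions follow the same pattern: the cross-compiled bitcode calls small placeholder functions (EvalBuildRow, EvalProbeRow, HashCurrentRow, EvalConjuncts, Equals, ...), and each placeholder call site is redirected to the expression-specific codegen'd function. Passing false for update_in_place works on a fresh copy (used above to split off the CRC-based level-0 variant), while true rewrites the function it was given; the DCHECKs on 'replaced' verify the expected number of call sites for the current join_op_. As a rough, hypothetical illustration of the redirection step only (Impala's actual helper also handles the cloning), and assuming the LLVM 3.x-era API this file targets, a name-based call-site rewrite could be written as:

static int ReplaceCallsByName(llvm::Function* caller, llvm::Function* new_fn,
    const std::string& target_name) {
  int replaced = 0;
  // Walk every instruction in 'caller' and redirect direct calls whose callee
  // name contains 'target_name' to 'new_fn'.
  for (llvm::Function::iterator bb = caller->begin(); bb != caller->end(); ++bb) {
    for (llvm::BasicBlock::iterator it = bb->begin(); it != bb->end(); ++it) {
      llvm::CallInst* call = llvm::dyn_cast<llvm::CallInst>(&*it);
      if (call == NULL) continue;
      llvm::Function* callee = call->getCalledFunction();
      if (callee == NULL) continue;  // Skip indirect calls.
      if (callee->getName().find(target_name) == llvm::StringRef::npos) continue;
      call->setCalledFunction(new_fn);
      ++replaced;
    }
  }
  return replaced;
}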