Impala
Impala is the open source, native analytic database for Apache Hadoop.
partitioned-aggregation-node.cc
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <math.h>
18 #include <sstream>
19 #include <gutil/strings/substitute.h>
20 #include <thrift/protocol/TDebugProtocol.h>
21 
22 #include "codegen/codegen-anyval.h"
23 #include "codegen/llvm-codegen.h"
24 #include "exec/hash-table.inline.h"
25 #include "exprs/agg-fn-evaluator.h"
26 #include "exprs/expr.h"
27 #include "exprs/expr-context.h"
28 #include "exprs/slot-ref.h"
30 #include "runtime/descriptors.h"
31 #include "runtime/mem-pool.h"
32 #include "runtime/raw-value.h"
33 #include "runtime/row-batch.h"
34 #include "runtime/runtime-state.h"
36 #include "runtime/tuple.h"
37 #include "runtime/tuple-row.h"
38 #include "udf/udf-internal.h"
39 #include "util/debug-util.h"
40 #include "util/runtime-profile.h"
41 
42 #include "gen-cpp/Exprs_types.h"
43 #include "gen-cpp/PlanNodes_types.h"
44 
45 #include "common/names.h"
46 
47 using namespace impala;
48 using namespace llvm;
49 using namespace strings;
50 
51 namespace impala {
52 
53 const char* PartitionedAggregationNode::LLVM_CLASS_NAME =
54  "class.impala::PartitionedAggregationNode";
55 
56 PartitionedAggregationNode::PartitionedAggregationNode(
57  ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
58  : ExecNode(pool, tnode, descs),
59  intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
60  intermediate_tuple_desc_(NULL),
61  output_tuple_id_(tnode.agg_node.output_tuple_id),
62  output_tuple_desc_(NULL),
63  needs_finalize_(tnode.agg_node.need_finalize),
64  needs_serialize_(false),
65  block_mgr_client_(NULL),
66  using_small_buffers_(true),
67  singleton_output_tuple_(NULL),
68  singleton_output_tuple_returned_(true),
69  output_partition_(NULL),
70  process_row_batch_fn_(NULL),
71  build_timer_(NULL),
72  ht_resize_timer_(NULL),
73  get_results_timer_(NULL),
74  num_hash_buckets_(NULL),
75  partitions_created_(NULL),
76  max_partition_level_(NULL),
77  num_row_repartitioned_(NULL),
78  num_repartitions_(NULL) {
79  DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
80 }
81 
82 Status PartitionedAggregationNode::Init(const TPlanNode& tnode) {
83  RETURN_IF_ERROR(ExecNode::Init(tnode));
84  RETURN_IF_ERROR(
85  Expr::CreateExprTrees(pool_, tnode.agg_node.grouping_exprs, &probe_expr_ctxs_));
86  for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
87  AggFnEvaluator* evaluator;
88  RETURN_IF_ERROR(AggFnEvaluator::Create(
89  pool_, tnode.agg_node.aggregate_functions[i], &evaluator));
90  aggregate_evaluators_.push_back(evaluator);
91  }
92  return Status::OK;
93 }
94 
95 Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
96  SCOPED_TIMER(runtime_profile_->total_time_counter());
97 
98  // Create the codegen object before preparing conjunct_ctxs_ and children_, so that any
99  // ScalarFnCalls will use codegen.
100  // TODO: this is brittle and hard to reason about, revisit
101  if (state->codegen_enabled()) {
102  LlvmCodeGen* codegen;
103  RETURN_IF_ERROR(state->GetCodegen(&codegen));
104  }
105 
107  state_ = state;
108 
109  mem_pool_.reset(new MemPool(mem_tracker()));
110  agg_fn_pool_.reset(new MemPool(expr_mem_tracker()));
111 
112  build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime");
113  ht_resize_timer_ = ADD_TIMER(runtime_profile(), "HTResizeTime");
114  get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime");
116  ADD_COUNTER(runtime_profile(), "HashBuckets", TUnit::UNIT);
118  ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT);
120  "MaxPartitionLevel", TUnit::UNIT);
122  ADD_COUNTER(runtime_profile(), "RowsRepartitioned", TUnit::UNIT);
124  ADD_COUNTER(runtime_profile(), "NumRepartitions", TUnit::UNIT);
126  ADD_COUNTER(runtime_profile(), "SpilledPartitions", TUnit::UNIT);
128  "LargestPartitionPercent", TUnit::UNIT);
129 
133  DCHECK_EQ(intermediate_tuple_desc_->slots().size(),
134  output_tuple_desc_->slots().size());
135 
139 
141 
142  // Construct build exprs from intermediate_tuple_desc_
143  for (int i = 0; i < probe_expr_ctxs_.size(); ++i) {
145  DCHECK(desc->type().type == TYPE_NULL ||
146  desc->type() == probe_expr_ctxs_[i]->root()->type());
147  // Hack to avoid TYPE_NULL SlotRefs.
148  Expr* expr = desc->type().type != TYPE_NULL ?
149  new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN);
150  state->obj_pool()->Add(expr);
151  build_expr_ctxs_.push_back(new ExprContext(expr));
152  state->obj_pool()->Add(build_expr_ctxs_.back());
153  contains_var_len_grouping_exprs_ |= expr->type().IsVarLen();
154  }
155  // Construct a new row desc for preparing the build exprs because neither the child's
156  // nor this node's output row desc may contain the intermediate tuple, e.g.,
157  // in a single-node plan with an intermediate tuple different from the output tuple.
162 
163  int j = probe_expr_ctxs_.size();
164  for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++j) {
165  // skip non-materialized slots; we don't have evaluators instantiated for those
166  while (!intermediate_tuple_desc_->slots()[j]->is_materialized()) {
167  DCHECK_LT(j, intermediate_tuple_desc_->slots().size() - 1)
168  << "#eval= " << aggregate_evaluators_.size()
169  << " #probe=" << probe_expr_ctxs_.size();
170  ++j;
171  }
172  SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j];
173  SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j];
174  FunctionContext* agg_fn_ctx = NULL;
176  intermediate_slot_desc, output_slot_desc, agg_fn_pool_.get(), &agg_fn_ctx));
177  agg_fn_ctxs_.push_back(agg_fn_ctx);
178  state->obj_pool()->Add(agg_fn_ctx);
179  needs_serialize_ |= aggregate_evaluators_[i]->SupportsSerialize();
180  }
181 
182  if (probe_expr_ctxs_.empty()) {
183  // create single output tuple now; we need to output something
184  // even if our input is empty
188  } else {
189  ht_ctx_.reset(new HashTableCtx(build_expr_ctxs_, probe_expr_ctxs_, true, true,
194  }
195 
196  if (state->codegen_enabled()) {
197  LlvmCodeGen* codegen;
198  RETURN_IF_ERROR(state->GetCodegen(&codegen));
199  Function* codegen_process_row_batch_fn = CodegenProcessBatch();
200  if (codegen_process_row_batch_fn != NULL) {
201  codegen->AddFunctionToJit(codegen_process_row_batch_fn,
202  reinterpret_cast<void**>(&process_row_batch_fn_));
203  AddRuntimeExecOption("Codegen Enabled");
204  }
205  }
206  return Status::OK;
207 }
208 
209 Status PartitionedAggregationNode::Open(RuntimeState* state) {
210  SCOPED_TIMER(runtime_profile_->total_time_counter());
212 
215 
216  DCHECK_EQ(aggregate_evaluators_.size(), agg_fn_ctxs_.size());
217  for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
219  }
220 
221  if (needs_serialize_ && block_mgr_client_ != NULL) {
223  state->block_mgr(), block_mgr_client_,
224  false, /* use initial small buffers */
225  true /* delete on read */));
227  DCHECK(serialize_stream_->has_write_block());
228  }
229 
230  // Read all the rows from the child and process them.
231  RETURN_IF_ERROR(children_[0]->Open(state));
232  RowBatch batch(children_[0]->row_desc(), state->batch_size(), mem_tracker());
233  bool eos = false;
234  while (!eos) {
235  RETURN_IF_CANCELLED(state);
237  RETURN_IF_ERROR(children_[0]->GetNext(state, &batch, &eos));
238 
239  if (VLOG_ROW_IS_ON) {
240  for (int i = 0; i < batch.num_rows(); ++i) {
241  TupleRow* row = batch.GetRow(i);
242  VLOG_ROW << "input row: " << PrintRow(row, children_[0]->row_desc());
243  }
244  }
245 
247  if (process_row_batch_fn_ != NULL) {
248  RETURN_IF_ERROR(process_row_batch_fn_(this, &batch, ht_ctx_.get()));
249  } else if (probe_expr_ctxs_.empty()) {
251  } else {
252  // There is grouping, so we will do partitioned aggregation.
253  RETURN_IF_ERROR(ProcessBatch<false>(&batch, ht_ctx_.get()));
254  }
255  batch.Reset();
256  }
257 
258  // We have consumed all of the input from the child and transferred ownership of the
259  // resources we need, so the child can be closed safely to release its resources.
260  child(0)->Close(state);
261 
262  // Done consuming child(0)'s input. Move all the partitions in hash_partitions_
263  // to spilled_partitions_/aggregated_partitions_. We'll finish the processing in
264  // GetNext().
265  if (!probe_expr_ctxs_.empty()) {
267  }
268  return Status::OK;
269 }
270 
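// GetNext() has two paths: with no grouping exprs it returns the single
// pre-aggregated tuple; otherwise it streams results out of the current output
// partition's hash table, moving to the next partition (repartitioning spilled
// ones as needed) once each is exhausted.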
271 Status PartitionedAggregationNode::GetNext(RuntimeState* state,
272  RowBatch* row_batch, bool* eos) {
273  SCOPED_TIMER(runtime_profile_->total_time_counter());
274  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
275  RETURN_IF_CANCELLED(state);
277 
278  if (ReachedLimit()) {
279  *eos = true;
280  return Status::OK;
281  }
282 
283  ExprContext** ctxs = &conjunct_ctxs_[0];
284  int num_ctxs = conjunct_ctxs_.size();
285  if (probe_expr_ctxs_.empty()) {
286  // There was no grouping, so evaluate the conjuncts and return the single result row.
287  // We allow calling GetNext() after eos, so don't return this row again.
289  int row_idx = row_batch->AddRow();
290  TupleRow* row = row_batch->GetRow(row_idx);
291  Tuple* output_tuple = GetOutputTuple(
293  row->SetTuple(0, output_tuple);
294  if (ExecNode::EvalConjuncts(ctxs, num_ctxs, row)) {
295  row_batch->CommitLastRow();
297  }
299  }
300  *eos = true;
302  return Status::OK;
303  }
304 
305  if (output_iterator_.AtEnd()) {
306  // Done with this partition, move onto the next one.
307  if (output_partition_ != NULL) {
308  output_partition_->Close(false);
309  output_partition_ = NULL;
310  }
311  if (aggregated_partitions_.empty() && spilled_partitions_.empty()) {
312  // No more partitions, all done.
313  *eos = true;
314  return Status::OK;
315  }
316  // Process next partition.
318  DCHECK(output_partition_ != NULL);
319  }
320 
322  int count = 0;
323  const int N = BitUtil::NextPowerOfTwo(state->batch_size());
324  // Keep returning rows from the current partition.
325  while (!output_iterator_.AtEnd() && !row_batch->AtCapacity()) {
326  // This loop can go on for a long time if the conjuncts are very selective. Do query
327  // maintenance every N iterations.
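// Since N is a power of two, (count & (N - 1)) is equivalent to (count % N),
// so the check below fires once every N iterations.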
328  if ((count++ & (N - 1)) == 0) {
329  RETURN_IF_CANCELLED(state);
331  }
332 
333  int row_idx = row_batch->AddRow();
334  TupleRow* row = row_batch->GetRow(row_idx);
335  Tuple* intermediate_tuple = output_iterator_.GetTuple();
336  Tuple* output_tuple = GetOutputTuple(
337  output_partition_->agg_fn_ctxs, intermediate_tuple, row_batch->tuple_data_pool());
339  row->SetTuple(0, output_tuple);
340  if (ExecNode::EvalConjuncts(ctxs, num_ctxs, row)) {
341  row_batch->CommitLastRow();
343  if (ReachedLimit()) break; // TODO: remove this check? is this expensive?
344  }
345  }
347  *eos = ReachedLimit();
348  if (output_iterator_.AtEnd()) row_batch->MarkNeedToReturn();
349  return Status::OK;
350 }
351 
352 void PartitionedAggregationNode::CleanupHashTbl(const vector<FunctionContext*>& ctxs,
353  HashTable::Iterator it) {
354  if (!needs_finalize_ && !needs_serialize_) return;
355 
356  // Iterate through the remaining rows in the hash table and call Serialize/Finalize on
357  // them in order to free any memory allocated by UDAs. Finalize() requires a dst tuple
358  // but we don't actually need the result, so allocate a single dummy tuple to avoid
359  // accumulating memory.
360  Tuple* dummy_dst = NULL;
361  if (needs_finalize_) {
362  dummy_dst = Tuple::Create(output_tuple_desc_->byte_size(), mem_pool_.get());
363  }
364  while (!it.AtEnd()) {
365  Tuple* tuple = it.GetTuple();
366  if (needs_finalize_) {
368  } else {
370  }
371  it.Next();
372  }
373 }
374 
375 Status PartitionedAggregationNode::Reset(RuntimeState* state) {
376  DCHECK(false) << "NYI";
377  return Status("NYI");
378 }
379 
380 void PartitionedAggregationNode::Close(RuntimeState* state) {
381  if (is_closed()) return;
382 
384  DCHECK_EQ(agg_fn_ctxs_.size(), aggregate_evaluators_.size());
386  }
387 
388  // Iterate through the remaining rows in the hash table and call Serialize/Finalize on
389  // them in order to free any memory allocated by UDAs
390  if (output_partition_ != NULL) {
392  output_partition_->Close(false);
393  }
394 
395  for (int i = 0; i < hash_partitions_.size(); ++i) {
396  hash_partitions_[i]->Close(true);
397  }
398  for (list<Partition*>::iterator it = aggregated_partitions_.begin();
399  it != aggregated_partitions_.end(); ++it) {
400  (*it)->Close(true);
401  }
402  for (list<Partition*>::iterator it = spilled_partitions_.begin();
403  it != spilled_partitions_.end(); ++it) {
404  (*it)->Close(true);
405  }
406  aggregated_partitions_.clear();
407  spilled_partitions_.clear();
408 
409  for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
410  aggregate_evaluators_[i]->Close(state);
411  }
412  for (int i = 0; i < agg_fn_ctxs_.size(); ++i) {
413  agg_fn_ctxs_[i]->impl()->Close();
414  }
415  if (agg_fn_pool_.get() != NULL) agg_fn_pool_->FreeAll();
416  if (mem_pool_.get() != NULL) mem_pool_->FreeAll();
417  if (ht_ctx_.get() != NULL) ht_ctx_->Close();
418  if (serialize_stream_.get() != NULL) serialize_stream_->Close();
419 
420  if (block_mgr_client_ != NULL) {
422  }
423 
426  ExecNode::Close(state);
427 }
428 
429 Status PartitionedAggregationNode::Partition::InitStreams() {
430  agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker()));
431  for (int i = 0; i < parent->agg_fn_ctxs_.size(); ++i) {
432  agg_fn_ctxs.push_back(parent->agg_fn_ctxs_[i]->impl()->Clone(agg_fn_pool.get()));
434  }
435 
439  level == 0, /* use small buffers */
440  false /* delete on read */));
442 
446  level == 0, /* use small buffers */
447  true /* delete on read */));
448  // This stream is only used to spill, no need to ever have this pinned.
450  DCHECK(unaggregated_row_stream->has_write_block());
451  return Status::OK;
452 }
453 
454 bool PartitionedAggregationNode::Partition::InitHashTable() {
455  DCHECK(hash_tbl.get() == NULL);
456  // We use the upper PARTITION_FANOUT num bits to pick the partition so only the
457  // remaining bits can be used for the hash table.
458  // TODO: how many buckets?
459  // TODO: we could switch to 64 bit hashes and then we don't need a max size.
460  // It might be reasonable to limit individual hash table size for other reasons
461  // though. Always start with small buffers.
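// For example, if PARTITION_FANOUT were 16 (NUM_PARTITIONING_BITS == 4), each
// per-partition hash table would be capped at 1 << 28 buckets.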
462  hash_tbl.reset(new HashTable(parent->state_, parent->block_mgr_client_, 1, NULL,
463  1 << (32 - NUM_PARTITIONING_BITS)));
464  return hash_tbl->Init();
465 }
466 
467 Status PartitionedAggregationNode::Partition::Spill(Tuple* intermediate_tuple) {
468  DCHECK(!is_spilled());
469  if (parent->needs_serialize_ && aggregated_row_stream->num_rows() != 0) {
470  // We need to do a lot more work in this case. This step effectively does a merge
471  // aggregation in this node. We need to serialize the intermediates, spill the
472  // intermediates and then feed them into the aggregate function's merge step.
473  // This is typically needed when the intermediate is a string type, meaning the
474  // current in-memory (pre-serialization) layout is not the on-disk block layout.
475  // The disk layout does not support mutable rows, so we need to rewrite the stream
476  // into the on-disk format.
477  // TODO: if it happens to not be a string, we could serialize in place. This is
478  // a future optimization since it is very unlikely to have a serialize phase
479  // for those UDAs.
480  DCHECK_NOTNULL(parent->serialize_stream_.get());
481  DCHECK(!parent->serialize_stream_->is_pinned());
482  DCHECK(parent->serialize_stream_->has_write_block());
483 
484  const vector<AggFnEvaluator*>& evaluators = parent->aggregate_evaluators_;
485 
486  // Serialize and copy the spilled partition's stream into the new stream.
487  bool failed_to_add = false;
488  BufferedTupleStream* new_stream = parent->serialize_stream_.get();
489  HashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get());
490  while (!it.AtEnd()) {
491  Tuple* tuple = it.GetTuple();
492  it.Next();
493  AggFnEvaluator::Serialize(evaluators, agg_fn_ctxs, tuple);
494  if (UNLIKELY(!new_stream->AddRow(reinterpret_cast<TupleRow*>(&tuple)))) {
495  failed_to_add = true;
496  break;
497  }
498  }
499 
500  if (intermediate_tuple != NULL) {
501  AggFnEvaluator::Serialize(evaluators, agg_fn_ctxs, intermediate_tuple);
502  if (!failed_to_add &&
503  !new_stream->AddRow(reinterpret_cast<TupleRow*>(&intermediate_tuple))) {
504  failed_to_add = true;
505  }
506  }
507 
508  // Even if we can't add to new_stream, finish up processing this agg stream
509  // to make clean up easier (someone has to finalize this stream and we don't want
510  // to remember where we are).
511  if (failed_to_add) {
512  parent->CleanupHashTbl(agg_fn_ctxs, it);
513  hash_tbl->Close();
514  hash_tbl.reset();
515  aggregated_row_stream->Close();
516  RETURN_IF_ERROR(new_stream->status());
517  return parent->state_->block_mgr()->MemLimitTooLowError(parent->block_mgr_client_);
518  }
519 
520  aggregated_row_stream->Close();
521  aggregated_row_stream.swap(parent->serialize_stream_);
522  // Recreate serialize_stream_ (and reserve one buffer for it) now, in preparation
523  // for the next time we need to spill. The buffer must be reserved before it is
524  // actually needed, and the reservation must succeed because we just
525  // freed at least one buffer from this partition's (old) aggregated_row_stream.
526  parent->serialize_stream_.reset(new BufferedTupleStream(parent->state_,
527  *parent->intermediate_row_desc_, parent->state_->block_mgr(),
528  parent->block_mgr_client_,
529  false, /* use small buffers */
530  true /* delete on read */));
531  Status s = parent->serialize_stream_->Init(parent->runtime_profile(), false);
532  if (!s.ok()) {
533  hash_tbl->Close();
534  hash_tbl.reset();
535  return s;
536  }
537  DCHECK(parent->serialize_stream_->has_write_block());
538  }
539 
540  // Free the in-memory result data
541  for (int i = 0; i < agg_fn_ctxs.size(); ++i) {
542  agg_fn_ctxs[i]->impl()->Close();
543  }
544 
545  if (agg_fn_pool.get() != NULL) {
546  agg_fn_pool->FreeAll();
547  agg_fn_pool.reset();
548  }
549 
550  hash_tbl->Close();
551  hash_tbl.reset();
552  DCHECK(aggregated_row_stream->has_write_block())
553  << aggregated_row_stream->DebugString();
554  RETURN_IF_ERROR(aggregated_row_stream->UnpinStream(false));
555 
556  COUNTER_ADD(parent->num_spilled_partitions_, 1);
557  if (parent->num_spilled_partitions_->value() == 1) {
558  parent->AddRuntimeExecOption("Spilled");
559  }
560  // Make sure we are not going to lose any information from the small buffers:
561  // check that we are not still using small buffers while rows have already been
562  // added to them.
563  DCHECK(!(aggregated_row_stream->using_small_buffers() &&
564  aggregated_row_stream->num_rows() > 0));
565  DCHECK(!(unaggregated_row_stream->using_small_buffers() &&
566  unaggregated_row_stream->num_rows() > 0));
567  return Status::OK;
568 }
569 
570 void PartitionedAggregationNode::Partition::Close(bool finalize_rows) {
571  if (is_closed) return;
572  is_closed = true;
573  if (aggregated_row_stream.get() != NULL) {
574  if (finalize_rows && hash_tbl.get() != NULL) {
575  // We need to walk all the rows and Finalize them here so the UDA gets a chance
576  // to cleanup. If the hash table is gone (meaning this was spilled), the rows
577  // should have been finalized/serialized in Spill().
578  parent->CleanupHashTbl(agg_fn_ctxs, hash_tbl->Begin(parent->ht_ctx_.get()));
579  }
580  aggregated_row_stream->Close();
581  }
582  if (hash_tbl.get() != NULL) hash_tbl->Close();
583  if (unaggregated_row_stream.get() != NULL) unaggregated_row_stream->Close();
584 
585  for (int i = 0; i < agg_fn_ctxs.size(); ++i) {
586  agg_fn_ctxs[i]->impl()->Close();
587  }
588  if (agg_fn_pool.get() != NULL) agg_fn_pool->FreeAll();
589 }
590 
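// Allocates and initializes the intermediate tuple, either from 'pool' (in-memory
// hash table case) or directly in 'stream' (spilled case). Copies the current
// grouping values from the hash table context and calls Init() on each aggregate
// evaluator, seeding min/max slots so UpdateTuple() can skip NULL checks.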
591 Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
592  const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool,
593  BufferedTupleStream* stream) {
594  DCHECK(stream == NULL || pool == NULL);
595  DCHECK(stream != NULL || pool != NULL);
596 
597  Tuple* intermediate_tuple = NULL;
598  uint8_t* buffer = NULL;
599  if (pool != NULL) {
600  intermediate_tuple = Tuple::Create(intermediate_tuple_desc_->byte_size(), pool);
601  } else {
602  // Figure out how large the copied tuple will be. The entire tuple must end
603  // up on a single block in the stream.
604  int size = intermediate_tuple_desc_->byte_size();
606  // TODO: This is likely to be too slow. The hash table could maintain this as
607  // it hashes.
608  for (int i = 0; i < probe_expr_ctxs_.size(); ++i) {
609  if (!probe_expr_ctxs_[i]->root()->type().IsVarLen()) continue;
610  if (ht_ctx_->last_expr_value_null(i)) continue;
611  StringValue* sv = reinterpret_cast<StringValue*>(ht_ctx_->last_expr_value(i));
612  size += sv->len;
613  }
614  }
615  buffer = stream->AllocateRow(size);
616  if (buffer == NULL) return NULL;
617  intermediate_tuple = reinterpret_cast<Tuple*>(buffer);
618  // TODO: remove this. we shouldn't need to zero the entire tuple.
619  intermediate_tuple->Init(size);
620  buffer += intermediate_tuple_desc_->byte_size();
621  }
622 
623  // copy grouping values
624  vector<SlotDescriptor*>::const_iterator slot_desc =
625  intermediate_tuple_desc_->slots().begin();
626  for (int i = 0; i < probe_expr_ctxs_.size(); ++i, ++slot_desc) {
627  if (ht_ctx_->last_expr_value_null(i)) {
628  intermediate_tuple->SetNull((*slot_desc)->null_indicator_offset());
629  } else {
630  void* src = ht_ctx_->last_expr_value(i);
631  void* dst = intermediate_tuple->GetSlot((*slot_desc)->tuple_offset());
632  if (stream == NULL) {
633  RawValue::Write(src, dst, (*slot_desc)->type(), pool);
634  } else {
635  RawValue::Write(src, (*slot_desc)->type(), dst, &buffer);
636  }
637  }
638  }
639 
640  // Initialize aggregate output.
641  for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++slot_desc) {
642  while (!(*slot_desc)->is_materialized()) ++slot_desc;
643  AggFnEvaluator* evaluator = aggregate_evaluators_[i];
644  evaluator->Init(agg_fn_ctxs[i], intermediate_tuple);
645  // Codegen specific path for min/max.
646  // To minimize branching on the UpdateTuple path, initialize the result value
647  // so that UpdateTuple doesn't have to check if the aggregation
648  // dst slot is null.
649  // TODO: remove when we don't use the irbuilder for codegen here. This optimization
650  // will no longer be necessary when all aggregates are implemented with the UDA
651  // interface.
652  if ((*slot_desc)->type().type != TYPE_STRING &&
653  (*slot_desc)->type().type != TYPE_VARCHAR &&
654  (*slot_desc)->type().type != TYPE_TIMESTAMP &&
655  (*slot_desc)->type().type != TYPE_CHAR &&
656  (*slot_desc)->type().type != TYPE_DECIMAL) {
657  ExprValue default_value;
658  void* default_value_ptr = NULL;
659  switch (evaluator->agg_op()) {
660  case AggFnEvaluator::MIN:
661  default_value_ptr = default_value.SetToMax((*slot_desc)->type());
662  RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
663  break;
664  case AggFnEvaluator::MAX:
665  default_value_ptr = default_value.SetToMin((*slot_desc)->type());
666  RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
667  break;
668  default:
669  break;
670  }
671  }
672  }
673  return intermediate_tuple;
674 }
675 
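// Applies every aggregate evaluator to 'row' against 'tuple': Merge() when the
// row carries an aggregated intermediate tuple (is_merge), Add() for a raw input row.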
676 void PartitionedAggregationNode::UpdateTuple(FunctionContext** agg_fn_ctxs,
677  Tuple* tuple, TupleRow* row, bool is_merge) {
678  DCHECK(tuple != NULL || aggregate_evaluators_.empty());
679  for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
680  if (is_merge) {
681  aggregate_evaluators_[i]->Merge(agg_fn_ctxs[i], row->GetTuple(0), tuple);
682  } else {
683  aggregate_evaluators_[i]->Add(agg_fn_ctxs[i], row, tuple);
684  }
685  }
686 }
687 
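// Produces the output tuple from the intermediate 'tuple': runs Finalize() (or
// Serialize() when no finalization is needed) on the aggregate values and, if the
// output tuple differs from the intermediate tuple, copies the grouping slots across.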
688 Tuple* PartitionedAggregationNode::GetOutputTuple(
689  const vector<FunctionContext*>& agg_fn_ctxs, Tuple* tuple, MemPool* pool) {
690  DCHECK(tuple != NULL || aggregate_evaluators_.empty()) << tuple;
691  Tuple* dst = tuple;
694  }
695  if (needs_finalize_) {
696  AggFnEvaluator::Finalize(aggregate_evaluators_, agg_fn_ctxs, tuple, dst);
697  } else {
699  }
700  // Copy grouping values from tuple to dst.
701  // TODO: Codegen this.
702  if (dst != tuple) {
703  int num_grouping_slots = probe_expr_ctxs_.size();
704  for (int i = 0; i < num_grouping_slots; ++i) {
705  SlotDescriptor* src_slot_desc = intermediate_tuple_desc_->slots()[i];
706  SlotDescriptor* dst_slot_desc = output_tuple_desc_->slots()[i];
707  bool src_slot_null = tuple->IsNull(src_slot_desc->null_indicator_offset());
708  void* src_slot = NULL;
709  if (!src_slot_null) src_slot = tuple->GetSlot(src_slot_desc->tuple_offset());
710  RawValue::Write(src_slot, dst, dst_slot_desc, NULL);
711  }
712  }
713  return dst;
714 }
715 
716 void PartitionedAggregationNode::DebugString(int indentation_level,
717  stringstream* out) const {
718  *out << string(indentation_level * 2, ' ');
719  *out << "PartitionedAggregationNode("
720  << "intermediate_tuple_id=" << intermediate_tuple_id_
721  << " output_tuple_id=" << output_tuple_id_
722  << " needs_finalize=" << needs_finalize_
723  << " probe_exprs=" << Expr::DebugString(probe_expr_ctxs_)
725  ExecNode::DebugString(indentation_level, out);
726  *out << ")";
727 }
728 
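// Creates the PARTITION_FANOUT hash partitions for 'level'. Every partition's
// streams must be initialized successfully; hash table initialization may fail,
// in which case that partition is spilled immediately.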
729 Status PartitionedAggregationNode::CreateHashPartitions(int level) {
730  if (level >= MAX_PARTITION_DEPTH) {
732  status.SetErrorMsg(ErrorMsg(TErrorCode::PARTITIONED_AGG_MAX_PARTITION_DEPTH,
735  return status;
736  }
737  ht_ctx_->set_level(level);
738 
739  DCHECK(hash_partitions_.empty());
740  for (int i = 0; i < PARTITION_FANOUT; ++i) {
741  hash_partitions_.push_back(state_->obj_pool()->Add(new Partition(this, level)));
742  RETURN_IF_ERROR(hash_partitions_[i]->InitStreams());
743  }
745 
746  // Now that all the streams are reserved (meaning we have enough memory to execute
747  // the algorithm), allocate the hash tables. These can fail and we can still continue.
748  for (int i = 0; i < PARTITION_FANOUT; ++i) {
749  if (!hash_partitions_[i]->InitHashTable()) {
750  RETURN_IF_ERROR(hash_partitions_[i]->Spill());
751  }
752  }
753  COUNTER_ADD(partitions_created_, PARTITION_FANOUT);
755  return Status::OK;
756 }
757 
758 int64_t PartitionedAggregationNode::LargestSpilledPartition() const {
759  int64_t max_rows = 0;
760  for (int i = 0; i < hash_partitions_.size(); ++i) {
761  Partition* partition = hash_partitions_[i];
762  if (partition->is_spilled()) {
763  int64_t rows = partition->aggregated_row_stream->num_rows() +
764  partition->unaggregated_row_stream->num_rows();
765  if (rows > max_rows) max_rows = rows;
766  }
767  }
768  return max_rows;
769 }
770 
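// Picks the next partition to return rows from. In-memory aggregated partitions
// are output directly; otherwise a spilled partition is repartitioned at
// level + 1 (failing if repartitioning does not shrink it) until one fits in memory.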
771 Status PartitionedAggregationNode::NextPartition() {
772  DCHECK(output_partition_ == NULL);
773 
774  // Keep looping until we get to a partition that fits in memory.
775  Partition* partition = NULL;
776  while (true) {
777  partition = NULL;
778  // First return partitions that are fully aggregated (and in memory).
779  if (!aggregated_partitions_.empty()) {
780  partition = aggregated_partitions_.front();
781  DCHECK(!partition->is_spilled());
782  aggregated_partitions_.pop_front();
783  break;
784  }
785 
786  if (partition == NULL) {
787  DCHECK(!spilled_partitions_.empty());
789  needs_serialize_ ? 1 : 0);
790 
791  // TODO: we can probably do better than just picking the first partition. We
792  // can base this on the amount written to disk, etc.
793  partition = spilled_partitions_.front();
794  DCHECK(partition->is_spilled());
795 
796  // Create the new hash partitions to repartition into.
797  // TODO: we don't need to repartition here. We are now working on 1 / FANOUT
798  // of the input so it's reasonably likely it can fit. We should look at this
799  // partition's size and just do the aggregation if it fits in memory.
800  RETURN_IF_ERROR(CreateHashPartitions(partition->level + 1));
802 
803  // Rows in this partition could have been spilled into two streams, depending
804  // on whether it is an aggregated intermediate or an unaggregated row.
805  // Note: we must process the aggregated rows first to save a hash table lookup
806  // in ProcessBatch().
807  RETURN_IF_ERROR(ProcessStream<true>(partition->aggregated_row_stream.get()));
808  RETURN_IF_ERROR(ProcessStream<false>(partition->unaggregated_row_stream.get()));
809 
810  COUNTER_ADD(num_row_repartitioned_, partition->aggregated_row_stream->num_rows());
812  partition->unaggregated_row_stream->num_rows());
813 
814  partition->Close(false);
815  spilled_partitions_.pop_front();
816 
817  // Done processing this partition. Move the new partitions into
818  // spilled_partitions_/aggregated_partitions_.
819  int64_t num_input_rows = partition->aggregated_row_stream->num_rows() +
820  partition->unaggregated_row_stream->num_rows();
821 
822  // Check if there was any reduction in the size of partitions after repartitioning.
823  int64_t largest_partition = LargestSpilledPartition();
824  DCHECK_GE(num_input_rows, largest_partition) << "Cannot have a partition with "
825  "more rows than the input";
826  if (num_input_rows == largest_partition) {
828  status.AddDetail(Substitute("Cannot perform aggregation at node with id $0. "
829  "Repartitioning did not reduce the size of a spilled partition. "
830  "Repartitioning level $1. Number of rows $2.",
831  id_, partition->level + 1, num_input_rows));
833  return status;
834  }
835  RETURN_IF_ERROR(MoveHashPartitions(num_input_rows));
836  }
837  }
838 
839  DCHECK(partition->hash_tbl.get() != NULL);
840  DCHECK(partition->aggregated_row_stream->is_pinned());
841 
842  output_partition_ = partition;
843  output_iterator_ = output_partition_->hash_tbl->Begin(ht_ctx_.get());
844  COUNTER_ADD(num_hash_buckets_, output_partition_->hash_tbl->num_buckets());
845  return Status::OK;
846 }
847 
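// Reads back a spilled stream (aggregated or unaggregated rows, selected by
// AGGREGATED_ROWS) and feeds its rows through ProcessBatch(), spilling another
// partition whenever a read buffer cannot be obtained.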
848 template<bool AGGREGATED_ROWS>
849 Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stream) {
850  if (input_stream->num_rows() > 0) {
851  while (true) {
852  bool got_buffer = false;
853  RETURN_IF_ERROR(input_stream->PrepareForRead(&got_buffer));
854  if (got_buffer) break;
855  // Did not have a buffer to read the input stream. Spill and try again.
857  }
858 
859  bool eos = false;
860  RowBatch batch(AGGREGATED_ROWS ? *intermediate_row_desc_ : children_[0]->row_desc(),
862  do {
863  RETURN_IF_ERROR(input_stream->GetNext(&batch, &eos));
864  RETURN_IF_ERROR(ProcessBatch<AGGREGATED_ROWS>(&batch, ht_ctx_.get()));
865  batch.Reset();
866  } while (!eos);
867  }
868  input_stream->Close();
869  return Status::OK;
870 }
871 
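// Frees memory by spilling: if the node is still using small buffers, every open
// partition is first switched to I/O-sized buffers; then the largest in-memory
// (non-spilled) partition is chosen and spilled.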
872 Status PartitionedAggregationNode::SpillPartition(Partition* curr_partition,
873  Tuple* intermediate_tuple) {
874  int64_t max_freed_mem = 0;
875  int partition_idx = -1;
876 
877  if (using_small_buffers_) {
878  for (int i = 0; i < hash_partitions_.size(); ++i) {
879  if (hash_partitions_[i]->is_closed) continue;
880  DCHECK(hash_partitions_[i]->aggregated_row_stream->using_small_buffers());
881  DCHECK(hash_partitions_[i]->unaggregated_row_stream->using_small_buffers());
882  bool got_buffer;
884  hash_partitions_[i]->aggregated_row_stream->SwitchToIoBuffers(&got_buffer));
885  if (got_buffer) {
887  hash_partitions_[i]->unaggregated_row_stream->SwitchToIoBuffers(&got_buffer));
888  }
889  if (!got_buffer) {
891  status.AddDetail("Not enough memory to get the minimum required buffers for "
892  "aggregation.");
893  return status;
894  }
895  }
896  using_small_buffers_ = false;
897  }
898 
899  // Iterate over the partitions and pick the largest partition that is not spilled.
900  for (int i = 0; i < hash_partitions_.size(); ++i) {
901  if (hash_partitions_[i]->is_closed) continue;
902  if (hash_partitions_[i]->is_spilled()) continue;
903  int64_t mem = hash_partitions_[i]->aggregated_row_stream->bytes_in_mem(true);
904  mem += hash_partitions_[i]->hash_tbl->byte_size();
905  mem += hash_partitions_[i]->agg_fn_pool->total_reserved_bytes();
906  if (mem > max_freed_mem) {
907  max_freed_mem = mem;
908  partition_idx = i;
909  }
910  }
911  if (partition_idx == -1) {
912  // Could not find a partition to spill. This means the mem limit was just too low.
914  }
915 
916  Partition* spilled_partition = hash_partitions_[partition_idx];
917  RETURN_IF_ERROR(spilled_partition->Spill(
918  spilled_partition == curr_partition ? intermediate_tuple : NULL));
919  return Status::OK;
920 }
921 
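// After the current round of partitioning, moves each hash partition into
// aggregated_partitions_ (still in memory) or spilled_partitions_ (unpinned and
// queued for repartitioning) and logs a per-partition breakdown of row counts.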
922 Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
923  DCHECK(!hash_partitions_.empty());
924  stringstream ss;
925  ss << "PA(node_id=" << id() << ") partitioned(level="
926  << hash_partitions_[0]->level << ") "
927  << num_input_rows << " rows into:" << endl;
928  for (int i = 0; i < hash_partitions_.size(); ++i) {
929  Partition* partition = hash_partitions_[i];
930  int64_t aggregated_rows = partition->aggregated_row_stream->num_rows();
931  int64_t unaggregated_rows = partition->unaggregated_row_stream->num_rows();
932  double total_rows = aggregated_rows + unaggregated_rows;
933  double percent = total_rows * 100 / num_input_rows;
934  ss << " " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled")
935  << " (fraction=" << fixed << setprecision(2) << percent << "%)" << endl
936  << " #aggregated rows:" << aggregated_rows << endl
937  << " #unaggregated rows: " << unaggregated_rows << endl;
938 
939  // TODO: update counters to support doubles.
940  COUNTER_SET(largest_partition_percent_, static_cast<int64_t>(percent));
941 
942  if (total_rows == 0) {
943  partition->Close(false);
944  } else if (partition->is_spilled()) {
945  DCHECK(partition->hash_tbl.get() == NULL);
946  // We need to unpin all the spilled partitions to make room to allocate new
947  // hash_partitions_ when we repartition the spilled partitions.
948  // TODO: we only need to do this when we have memory pressure. This might be
949  // okay though since the block mgr should only write these to disk if there
950  // is memory pressure.
951  RETURN_IF_ERROR(partition->aggregated_row_stream->UnpinStream(true));
952  RETURN_IF_ERROR(partition->unaggregated_row_stream->UnpinStream(true));
953 
954  // Push newly created partitions to the front. This gives a depth-first walk
955  // (more finely partitioned partitions are processed first), which allows us
956  // to delete blocks earlier and bottom out the recursion earlier.
957  spilled_partitions_.push_front(partition);
958  } else {
959  aggregated_partitions_.push_back(partition);
960  }
961 
962  }
963  VLOG(2) << ss.str();
964  hash_partitions_.clear();
965  return Status::OK;
966 }
967 
968 Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
969  for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
971  }
973  for (int i = 0; i < hash_partitions_.size(); ++i) {
975  }
976  return ExecNode::QueryMaintenance(state);
977 }
978 
979 // IR Generation for updating a single aggregation slot. Signature is:
980 // void UpdateSlot(FunctionContext* fn_ctx, AggTuple* agg_tuple, char** row)
981 //
982 // The IR for sum(double_col) is:
983 // define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx,
984 // { i8, double }* %agg_tuple,
985 // %"class.impala::TupleRow"* %row) #20 {
986 // entry:
987 // %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* inttoptr
988 // (i64 128241264 to %"class.impala::ExprContext"*), %"class.impala::TupleRow"* %row)
989 // %0 = extractvalue { i8, double } %src, 0
990 // %is_null = trunc i8 %0 to i1
991 // br i1 %is_null, label %ret, label %src_not_null
992 //
993 // src_not_null: ; preds = %entry
994 // %dst_slot_ptr = getelementptr inbounds { i8, double }* %agg_tuple, i32 0, i32 1
995 // call void @SetNotNull({ i8, double }* %agg_tuple)
996 // %dst_val = load double* %dst_slot_ptr
997 // %val = extractvalue { i8, double } %src, 1
998 // %1 = fadd double %dst_val, %val
999 // store double %1, double* %dst_slot_ptr
1000 // br label %ret
1001 //
1002 // ret: ; preds = %src_not_null, %entry
1003 // ret void
1004 // }
1005 //
1006 // The IR for ndv(double_col) is:
1007 // define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx,
1008 // { i8, %"struct.impala::StringValue" }* %agg_tuple,
1009 // %"class.impala::TupleRow"* %row) #20 {
1010 // entry:
1011 // %dst_lowered_ptr = alloca { i64, i8* }
1012 // %src_lowered_ptr = alloca { i8, double }
1013 // %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* inttoptr
1014 // (i64 120530832 to %"class.impala::ExprContext"*), %"class.impala::TupleRow"* %row)
1015 // %0 = extractvalue { i8, double } %src, 0
1016 // %is_null = trunc i8 %0 to i1
1017 // br i1 %is_null, label %ret, label %src_not_null
1018 //
1019 // src_not_null: ; preds = %entry
1020 // %dst_slot_ptr = getelementptr inbounds
1021 // { i8, %"struct.impala::StringValue" }* %agg_tuple, i32 0, i32 1
1022 // call void @SetNotNull({ i8, %"struct.impala::StringValue" }* %agg_tuple)
1023 // %dst_val = load %"struct.impala::StringValue"* %dst_slot_ptr
1024 // store { i8, double } %src, { i8, double }* %src_lowered_ptr
1025 // %src_unlowered_ptr = bitcast { i8, double }* %src_lowered_ptr
1026 // to %"struct.impala_udf::DoubleVal"*
1027 // %ptr = extractvalue %"struct.impala::StringValue" %dst_val, 0
1028 // %dst_stringval = insertvalue { i64, i8* } zeroinitializer, i8* %ptr, 1
1029 // %len = extractvalue %"struct.impala::StringValue" %dst_val, 1
1030 // %1 = extractvalue { i64, i8* } %dst_stringval, 0
1031 // %2 = zext i32 %len to i64
1032 // %3 = shl i64 %2, 32
1033 // %4 = and i64 %1, 4294967295
1034 // %5 = or i64 %4, %3
1035 // %dst_stringval1 = insertvalue { i64, i8* } %dst_stringval, i64 %5, 0
1036 // store { i64, i8* } %dst_stringval1, { i64, i8* }* %dst_lowered_ptr
1037 // %dst_unlowered_ptr = bitcast { i64, i8* }* %dst_lowered_ptr
1038 // to %"struct.impala_udf::StringVal"*
1039 // call void @HllUpdate(%"class.impala_udf::FunctionContext"* %fn_ctx,
1040 // %"struct.impala_udf::DoubleVal"* %src_unlowered_ptr,
1041 // %"struct.impala_udf::StringVal"* %dst_unlowered_ptr)
1042 // %anyval_result = load { i64, i8* }* %dst_lowered_ptr
1043 // %6 = extractvalue { i64, i8* } %anyval_result, 1
1044 // %7 = insertvalue %"struct.impala::StringValue" zeroinitializer, i8* %6, 0
1045 // %8 = extractvalue { i64, i8* } %anyval_result, 0
1046 // %9 = ashr i64 %8, 32
1047 // %10 = trunc i64 %9 to i32
1048 // %11 = insertvalue %"struct.impala::StringValue" %7, i32 %10, 1
1049 // store %"struct.impala::StringValue" %11, %"struct.impala::StringValue"* %dst_slot_ptr
1050 // br label %ret
1051 //
1052 // ret: ; preds = %src_not_null, %entry
1053 // ret void
1054 // }
1055 Function* PartitionedAggregationNode::CodegenUpdateSlot(
1056  AggFnEvaluator* evaluator, SlotDescriptor* slot_desc) {
1057  DCHECK(slot_desc->is_materialized());
1058  LlvmCodeGen* codegen;
1059  if (!state_->GetCodegen(&codegen).ok()) return NULL;
1060 
1061  DCHECK_EQ(evaluator->input_expr_ctxs().size(), 1);
1062  ExprContext* input_expr_ctx = evaluator->input_expr_ctxs()[0];
1063  Expr* input_expr = input_expr_ctx->root();
1064 
1065  // TODO: implement timestamp
1066  if (input_expr->type().type == TYPE_TIMESTAMP &&
1067  evaluator->agg_op() != AggFnEvaluator::AVG) {
1068  return NULL;
1069  }
1070 
1071  Function* agg_expr_fn;
1072  Status status = input_expr->GetCodegendComputeFn(state_, &agg_expr_fn);
1073  if (!status.ok()) {
1074  VLOG_QUERY << "Could not codegen UpdateSlot(): " << status.GetDetail();
1075  return NULL;
1076  }
1077  DCHECK(agg_expr_fn != NULL);
1078 
1079  PointerType* fn_ctx_type =
1081  StructType* tuple_struct = intermediate_tuple_desc_->GenerateLlvmStruct(codegen);
1082  PointerType* tuple_ptr_type = PointerType::get(tuple_struct, 0);
1083  PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
1084 
1085  // Create UpdateSlot prototype
1086  LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type());
1087  prototype.AddArgument(LlvmCodeGen::NamedVariable("fn_ctx", fn_ctx_type));
1088  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", tuple_ptr_type));
1089  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
1090 
1091  LlvmCodeGen::LlvmBuilder builder(codegen->context());
1092  Value* args[3];
1093  Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
1094  Value* fn_ctx_arg = args[0];
1095  Value* agg_tuple_arg = args[1];
1096  Value* row_arg = args[2];
1097 
1098  BasicBlock* src_not_null_block =
1099  BasicBlock::Create(codegen->context(), "src_not_null", fn);
1100  BasicBlock* ret_block = BasicBlock::Create(codegen->context(), "ret", fn);
1101 
1102  // Call expr function to get src slot value
1103  Value* expr_ctx = codegen->CastPtrToLlvmPtr(
1104  codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME), input_expr_ctx);
1105  Value* agg_expr_fn_args[] = { expr_ctx, row_arg };
1107  codegen, &builder, input_expr->type(), agg_expr_fn, agg_expr_fn_args, "src");
1108 
1109  Value* src_is_null = src.GetIsNull();
1110  builder.CreateCondBr(src_is_null, ret_block, src_not_null_block);
1111 
1112  // Src slot is not null, update dst_slot
1113  builder.SetInsertPoint(src_not_null_block);
1114  Value* dst_ptr =
1115  builder.CreateStructGEP(agg_tuple_arg, slot_desc->field_idx(), "dst_slot_ptr");
1116  Value* result = NULL;
1117 
1118  if (slot_desc->is_nullable()) {
1119  // Dst is NULL, just update dst slot to src slot and clear null bit
1120  Function* clear_null_fn = slot_desc->CodegenUpdateNull(codegen, tuple_struct, false);
1121  builder.CreateCall(clear_null_fn, agg_tuple_arg);
1122  }
1123 
1124  // Update the slot
1125  Value* dst_value = builder.CreateLoad(dst_ptr, "dst_val");
1126  switch (evaluator->agg_op()) {
1127  case AggFnEvaluator::COUNT:
1128  if (evaluator->is_merge()) {
1129  result = builder.CreateAdd(dst_value, src.GetVal(), "count_sum");
1130  } else {
1131  result = builder.CreateAdd(dst_value,
1132  codegen->GetIntConstant(TYPE_BIGINT, 1), "count_inc");
1133  }
1134  break;
1135  case AggFnEvaluator::MIN: {
1136  Function* min_fn = codegen->CodegenMinMax(slot_desc->type(), true);
1137  Value* min_args[] = { dst_value, src.GetVal() };
1138  result = builder.CreateCall(min_fn, min_args, "min_value");
1139  break;
1140  }
1141  case AggFnEvaluator::MAX: {
1142  Function* max_fn = codegen->CodegenMinMax(slot_desc->type(), false);
1143  Value* max_args[] = { dst_value, src.GetVal() };
1144  result = builder.CreateCall(max_fn, max_args, "max_value");
1145  break;
1146  }
1147  case AggFnEvaluator::SUM:
1148  if (slot_desc->type().type != TYPE_DECIMAL) {
1149  if (slot_desc->type().type == TYPE_FLOAT ||
1150  slot_desc->type().type == TYPE_DOUBLE) {
1151  result = builder.CreateFAdd(dst_value, src.GetVal());
1152  } else {
1153  result = builder.CreateAdd(dst_value, src.GetVal());
1154  }
1155  break;
1156  }
1157  DCHECK_EQ(slot_desc->type().type, TYPE_DECIMAL);
1158  // Fall through to xcompiled case
1159  case AggFnEvaluator::AVG:
1160  case AggFnEvaluator::NDV: {
1161  // Get xcompiled update/merge function from IR module
1162  const string& symbol = evaluator->is_merge() ?
1163  evaluator->merge_symbol() : evaluator->update_symbol();
1164  Function* ir_fn = codegen->module()->getFunction(symbol);
1165  DCHECK_NOTNULL(ir_fn);
1166 
1167  // Create pointer to src to pass to ir_fn. We must use the unlowered type.
1168  Value* src_lowered_ptr = codegen->CreateEntryBlockAlloca(
1169  fn, LlvmCodeGen::NamedVariable("src_lowered_ptr", src.value()->getType()));
1170  builder.CreateStore(src.value(), src_lowered_ptr);
1171  Type* unlowered_ptr_type =
1172  CodegenAnyVal::GetUnloweredPtrType(codegen, input_expr->type());
1173  Value* src_unlowered_ptr =
1174  builder.CreateBitCast(src_lowered_ptr, unlowered_ptr_type, "src_unlowered_ptr");
1175 
1176  // Create intermediate argument 'dst' from 'dst_value'
1177  const ColumnType& dst_type = evaluator->intermediate_type();
1179  codegen, &builder, dst_type, "dst");
1180  dst.SetFromRawValue(dst_value);
1181  // Create pointer to dst to pass to ir_fn. We must use the unlowered type.
1182  Value* dst_lowered_ptr = codegen->CreateEntryBlockAlloca(
1183  fn, LlvmCodeGen::NamedVariable("dst_lowered_ptr", dst.value()->getType()));
1184  builder.CreateStore(dst.value(), dst_lowered_ptr);
1185  unlowered_ptr_type = CodegenAnyVal::GetUnloweredPtrType(codegen, dst_type);
1186  Value* dst_unlowered_ptr =
1187  builder.CreateBitCast(dst_lowered_ptr, unlowered_ptr_type, "dst_unlowered_ptr");
1188 
1189  // Call 'ir_fn'
1190  builder.CreateCall3(ir_fn, fn_ctx_arg, src_unlowered_ptr, dst_unlowered_ptr);
1191 
1192  // Convert StringVal intermediate 'dst_arg' back to StringValue
1193  Value* anyval_result = builder.CreateLoad(dst_lowered_ptr, "anyval_result");
1194  result = CodegenAnyVal(codegen, &builder, dst_type, anyval_result).ToNativeValue();
1195  break;
1196  }
1197  default:
1198  DCHECK(false) << "bad aggregate operator: " << evaluator->agg_op();
1199  }
1200 
1201  builder.CreateStore(result, dst_ptr);
1202  builder.CreateBr(ret_block);
1203 
1204  builder.SetInsertPoint(ret_block);
1205  builder.CreateRetVoid();
1206 
1207  return codegen->FinalizeFunction(fn);
1208 }
1209 
1210 // IR codegen for the UpdateTuple loop. This loop is query specific and based on the
1211 // aggregate functions. The function signature must match the non-codegen'd UpdateTuple
1212 // exactly.
1213 // For the query:
1214 // select count(*), count(int_col), sum(double_col) the IR looks like:
1215 //
1216 
1217 // ; Function Attrs: alwaysinline
1218 // define void @UpdateTuple(%"class.impala::PartitionedAggregationNode"* %this_ptr,
1219 // %"class.impala_udf::FunctionContext"** %agg_fn_ctxs,
1220 // %"class.impala::Tuple"* %tuple,
1221 // %"class.impala::TupleRow"* %row,
1222 // i1 %is_merge) #20 {
1223 // entry:
1224 // %tuple1 = bitcast %"class.impala::Tuple"* %tuple to { i8, i64, i64, double }*
1225 // %src_slot = getelementptr inbounds { i8, i64, i64, double }* %tuple1, i32 0, i32 1
1226 // %count_star_val = load i64* %src_slot
1227 // %count_star_inc = add i64 %count_star_val, 1
1228 // store i64 %count_star_inc, i64* %src_slot
1229 // %0 = getelementptr %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, i32 1
1230 // %fn_ctx = load %"class.impala_udf::FunctionContext"** %0
1231 // call void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx,
1232 // { i8, i64, i64, double }* %tuple1,
1233 // %"class.impala::TupleRow"* %row)
1234 // %1 = getelementptr %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, i32 2
1235 // %fn_ctx2 = load %"class.impala_udf::FunctionContext"** %1
1236 // call void @UpdateSlot5(%"class.impala_udf::FunctionContext"* %fn_ctx2,
1237 // { i8, i64, i64, double }* %tuple1,
1238 // %"class.impala::TupleRow"* %row)
1239 // ret void
1240 // }
1241 Function* PartitionedAggregationNode::CodegenUpdateTuple() {
1242  LlvmCodeGen* codegen;
1243  if (!state_->GetCodegen(&codegen).ok()) return NULL;
1244  SCOPED_TIMER(codegen->codegen_timer());
1245 
1246  int j = probe_expr_ctxs_.size();
1247  for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++j) {
1248  // skip non-materialized slots; we don't have evaluators instantiated for those
1249  while (!intermediate_tuple_desc_->slots()[j]->is_materialized()) {
1250  DCHECK_LT(j, intermediate_tuple_desc_->slots().size() - 1);
1251  ++j;
1252  }
1253  SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
1254  AggFnEvaluator* evaluator = aggregate_evaluators_[i];
1255 
1256  // Don't codegen things that aren't builtins (for now)
1257  if (!evaluator->is_builtin()) return NULL;
1258 
1259  bool supported = true;
1260  AggFnEvaluator::AggregationOp op = evaluator->agg_op();
1261  PrimitiveType type = slot_desc->type().type;
1262  // Char and timestamp intermediates aren't supported
1263  if (type == TYPE_TIMESTAMP || type == TYPE_CHAR) supported = false;
1264  // Only AVG and NDV support string intermediates
1265  if ((type == TYPE_STRING || type == TYPE_VARCHAR) &&
1266  !(op == AggFnEvaluator::AVG || op == AggFnEvaluator::NDV)) {
1267  supported = false;
1268  }
1269  // Only SUM, AVG, and NDV support decimal intermediates
1270  if (type == TYPE_DECIMAL &&
1271  !(op == AggFnEvaluator::SUM || op == AggFnEvaluator::AVG ||
1272  op == AggFnEvaluator::NDV)) {
1273  supported = false;
1274  }
1275  if (!supported) {
1276  VLOG_QUERY << "Could not codegen UpdateTuple because intermediate type "
1277  << slot_desc->type()
1278  << " is not yet supported for aggregate function \""
1279  << evaluator->fn_name() << "()\"";
1280  return NULL;
1281  }
1282  }
1283 
1284  if (intermediate_tuple_desc_->GenerateLlvmStruct(codegen) == NULL) {
1285  VLOG_QUERY << "Could not codegen UpdateTuple because we could not generate"
1286  << " a matching llvm struct for the intermediate tuple.";
1287  return NULL;
1288  }
1289 
1290  // Get the types to match the UpdateTuple signature
1291  Type* agg_node_type = codegen->GetType(PartitionedAggregationNode::LLVM_CLASS_NAME);
1292  Type* fn_ctx_type = codegen->GetType(FunctionContextImpl::LLVM_FUNCTIONCONTEXT_NAME);
1293  Type* tuple_type = codegen->GetType(Tuple::LLVM_CLASS_NAME);
1294  Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
1295 
1296  PointerType* agg_node_ptr_type = agg_node_type->getPointerTo();
1297  PointerType* fn_ctx_ptr_ptr_type = fn_ctx_type->getPointerTo()->getPointerTo();
1298  PointerType* tuple_ptr_type = tuple_type->getPointerTo();
1299  PointerType* tuple_row_ptr_type = tuple_row_type->getPointerTo();
1300 
1301  StructType* tuple_struct = intermediate_tuple_desc_->GenerateLlvmStruct(codegen);
1302  PointerType* tuple_ptr = PointerType::get(tuple_struct, 0);
1303  LlvmCodeGen::FnPrototype prototype(codegen, "UpdateTuple", codegen->void_type());
1304  prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", agg_node_ptr_type));
1305  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_ctxs", fn_ctx_ptr_ptr_type));
1306  prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_ptr_type));
1307  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
1308  prototype.AddArgument(LlvmCodeGen::NamedVariable("is_merge", codegen->boolean_type()));
1309 
1310  LlvmCodeGen::LlvmBuilder builder(codegen->context());
1311  Value* args[5];
1312  Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
1313 
1314  Value* agg_fn_ctxs_arg = args[1];
1315  Value* tuple_arg = args[2];
1316  Value* row_arg = args[3];
1317 
1318  // Cast the parameter types to the internal llvm runtime types.
1319  // TODO: get rid of this by using the right type in the function signature
1320  tuple_arg = builder.CreateBitCast(tuple_arg, tuple_ptr, "tuple");
1321 
1322  // Loop over each expr and generate the IR for that slot. If the expr is not
1323  // count(*), generate a helper IR function to update the slot and call that.
1324  j = probe_expr_ctxs_.size();
1325  for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++j) {
1326  // skip non-materialized slots; we don't have evaluators instantiated for those
1327  while (!intermediate_tuple_desc_->slots()[j]->is_materialized()) {
1328  DCHECK_LT(j, intermediate_tuple_desc_->slots().size() - 1);
1329  ++j;
1330  }
1331  SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
1332  AggFnEvaluator* evaluator = aggregate_evaluators_[i];
1333  if (evaluator->is_count_star()) {
1334  // TODO: we should be able to hoist this up to the loop over the batch and just
1335  // increment the slot by the number of rows in the batch.
1336  int field_idx = slot_desc->field_idx();
1337  Value* const_one = codegen->GetIntConstant(TYPE_BIGINT, 1);
1338  Value* slot_ptr = builder.CreateStructGEP(tuple_arg, field_idx, "src_slot");
1339  Value* slot_loaded = builder.CreateLoad(slot_ptr, "count_star_val");
1340  Value* count_inc = builder.CreateAdd(slot_loaded, const_one, "count_star_inc");
1341  builder.CreateStore(count_inc, slot_ptr);
1342  } else {
1343  Function* update_slot_fn = CodegenUpdateSlot(evaluator, slot_desc);
1344  if (update_slot_fn == NULL) return NULL;
1345  Value* fn_ctx_ptr = builder.CreateConstGEP1_32(agg_fn_ctxs_arg, i);
1346  Value* fn_ctx = builder.CreateLoad(fn_ctx_ptr, "fn_ctx");
1347  builder.CreateCall3(update_slot_fn, fn_ctx, tuple_arg, row_arg);
1348  }
1349  }
1350  builder.CreateRetVoid();
1351 
1352  // CodegenProcessBatch() does the final optimizations.
1353  return codegen->FinalizeFunction(fn);
1354 }
1355 
1356 Function* PartitionedAggregationNode::CodegenProcessBatch() {
1357  LlvmCodeGen* codegen;
1358  if (!state_->GetCodegen(&codegen).ok()) return NULL;
1359  SCOPED_TIMER(codegen->codegen_timer());
1360 
1361  Function* update_tuple_fn = CodegenUpdateTuple();
1362  if (update_tuple_fn == NULL) return NULL;
1363 
1364  // Get the cross compiled update row batch function
1365  IRFunction::Type ir_fn = (!probe_expr_ctxs_.empty() ?
1366  IRFunction::PART_AGG_NODE_PROCESS_BATCH_FALSE :
1367  IRFunction::PART_AGG_NODE_PROCESS_BATCH_NO_GROUPING);
1368  Function* process_batch_fn = codegen->GetFunction(ir_fn);
1369  DCHECK(process_batch_fn != NULL);
1370 
1371  int replaced = 0;
1372  if (!probe_expr_ctxs_.empty()) {
1373  // Aggregation w/o grouping does not use a hash table.
1374 
1375  // Codegen for hash
1376  // The codegen'd ProcessBatch function is only used in Open() with level_ = 0,
1377  // so don't use murmur hash
1378  Function* hash_fn = ht_ctx_->CodegenHashCurrentRow(state_, /* use murmur */ false);
1379  if (hash_fn == NULL) return NULL;
1380 
1381  // Codegen HashTable::Equals
1382  Function* equals_fn = ht_ctx_->CodegenEquals(state_);
1383  if (equals_fn == NULL) return NULL;
1384 
1385  // Codegen for evaluating probe rows
1386  Function* eval_probe_row_fn = ht_ctx_->CodegenEvalRow(state_, false);
1387  if (eval_probe_row_fn == NULL) return NULL;
1388 
1389  // Replace call sites
1390  process_batch_fn = codegen->ReplaceCallSites(process_batch_fn, false,
1391  eval_probe_row_fn, "EvalProbeRow", &replaced);
1392  DCHECK_EQ(replaced, 1);
1393 
1394  process_batch_fn = codegen->ReplaceCallSites(process_batch_fn, true,
1395  hash_fn, "HashCurrentRow", &replaced);
1396  DCHECK_EQ(replaced, 1);
1397 
1398  process_batch_fn = codegen->ReplaceCallSites(process_batch_fn, true,
1399  equals_fn, "Equals", &replaced);
1400  DCHECK_EQ(replaced, 3);
1401  }
1402 
1403  process_batch_fn = codegen->ReplaceCallSites(process_batch_fn, false,
1404  update_tuple_fn, "UpdateTuple", &replaced);
1405  DCHECK_GE(replaced, 1);
1406  DCHECK(process_batch_fn != NULL);
1407  return codegen->OptimizeFunctionWithExprs(process_batch_fn);
1408 }
1409 
1410 }
For C++/IR interop, we need to be able to look up types by name.
Definition: tuple.h:134
void Serialize(FunctionContext *agg_fn_ctx, Tuple *dst)
const ColumnType & type() const
Definition: descriptors.h:78
ObjectPool * obj_pool() const
Definition: runtime-state.h:92
TupleId intermediate_tuple_id_
Tuple into which Update()/Merge()/Serialize() results are stored.
static const char * LLVM_FUNCTIONCONTEXT_NAME
Definition: udf-internal.h:93
PrimitiveType
Definition: types.h:27
#define RETURN_IF_CANCELLED(state)
ObjectPool pool
void Init(FunctionContext *agg_fn_ctx, Tuple *dst)
Functions for different phases of the aggregation.
virtual Status Prepare(RuntimeState *state)
Definition: exec-node.cc:130
#define ADD_COUNTER(profile, name, unit)
uint32_t fragment_hash_seed() const
llvm::Function * GetFunction(IRFunction::Type)
void IR_ALWAYS_INLINE Next()
Iterates to the next element. It should be called only if !AtEnd().
void MarkNeedToReturn()
Definition: row-batch.h:167
void AddFunctionToJit(llvm::Function *fn, void **fn_ptr)
llvm::Function * CodegenUpdateNull(LlvmCodeGen *, llvm::StructType *tuple, bool set_null)
Definition: descriptors.cc:510
static void Write(const void *value, Tuple *tuple, const SlotDescriptor *slot_desc, MemPool *pool)
Definition: raw-value.cc:303
void AddExprCtxsToFree(const std::vector< ExprContext * > &ctxs)
virtual Status QueryMaintenance(RuntimeState *state)
Definition: exec-node.cc:401
This is the superclass of all expr evaluation nodes.
Definition: expr.h:116
virtual void Close(RuntimeState *state)
RuntimeProfile::HighWaterMarkCounter * max_partition_level_
Level of max partition (i.e. number of repartitioning steps).
#define VLOG_ROW
Definition: logging.h:59
const std::string & fn_name() const
const DescriptorTbl & desc_tbl() const
Definition: runtime-state.h:93
bool is_closed()
Definition: exec-node.h:242
void CommitLastRow()
Definition: row-batch.h:109
bool using_small_buffers_
If true, the partitions in hash_partitions_ are using small buffers.
virtual Status Reset(RuntimeState *state)
std::vector< ExecNode * > children_
Definition: exec-node.h:214
std::vector< Partition * > hash_partitions_
Current partitions we are partitioning into.
RuntimeProfile::Counter * build_timer_
Time spent processing the child rows.
int64_t rows_returned() const
Definition: exec-node.h:157
#define COUNTER_SET(c, v)
uint64_t count
int batch_size() const
Definition: runtime-state.h:98
llvm::Function * CodegenUpdateTuple()
Codegen UpdateTuple(). Returns NULL if codegen is unsuccessful.
MemPool * tuple_data_pool()
Definition: row-batch.h:148
RuntimeProfile::Counter * rows_returned_counter_
Definition: exec-node.h:226
static const Status MEM_LIMIT_EXCEEDED
Definition: status.h:89
ExecNode * child(int i)
Definition: exec-node.h:241
Status SetMemLimitExceeded(MemTracker *tracker=NULL, int64_t failed_allocation_size=0)
RuntimeProfile::Counter * num_repartitions_
Number of partitions that have been repartitioned.
const std::string & update_symbol() const
void SetTuple(int tuple_idx, Tuple *tuple)
Definition: tuple-row.h:34
const ColumnType & type() const
Definition: expr.h:145
llvm::Value * value()
Returns the current type-lowered value.
void SetErrorMsg(const ErrorMsg &m)
Definition: status.h:197
static int64_t NextPowerOfTwo(int64_t v)
Definition: bit-util.h:50
#define UNLIKELY(expr)
Definition: compiler-util.h:33
bool codegen_enabled() const
Returns true if codegen is enabled for this query.
bool AddRow(TupleRow *row, uint8_t **dst=NULL)
HighWaterMarkCounter * AddHighWaterMarkCounter(const std::string &name, TUnit::type unit, const std::string &parent_counter_name="")
Reference to a single slot of a tuple.
Definition: slot-ref.h:23
std::vector< impala_udf::FunctionContext * > agg_fn_ctxs
Clone of parent's agg_fn_ctxs_ and backing MemPool.
ProcessRowBatchFn process_row_batch_fn_
Jitted ProcessRowBatch function pointer. Null if codegen is disabled.
llvm::Value * GetVal(const char *name="val")
Status PrepareForRead(bool *got_buffer=NULL)
static const Status OK
Definition: status.h:87
ObjectPool * pool_
Definition: exec-node.h:211
boost::scoped_ptr< BufferedTupleStream > aggregated_row_stream
virtual Status GetCodegendComputeFn(RuntimeState *state, llvm::Function **fn)=0
llvm::Type * GetType(const ColumnType &type)
Returns llvm type for the column type.
Status GetCodegen(LlvmCodeGen **codegen, bool initialize=true)
int64_t num_rows() const
Number of rows in the stream.
RuntimeProfile::Counter * get_results_timer_
Time spent returning the aggregated rows.
int tuple_offset() const
Definition: descriptors.h:88
int field_idx() const
Returns the field index in the generated llvm struct for this slot's tuple.
Definition: descriptors.h:87
llvm::Value * GetIsNull(const char *name="is_null")
Gets the 'is_null' field of the *Val.
llvm::Value * GetIntConstant(PrimitiveType type, int64_t val)
Returns the constant 'val' of 'type'.
virtual Status QueryMaintenance(RuntimeState *state)
Frees local allocations from aggregate_evaluators_ and agg_fn_ctxs.
static Status CreateExprTrees(ObjectPool *pool, const std::vector< TExpr > &texprs, std::vector< ExprContext * > *ctxs)
Definition: expr.cc:149
llvm::Function * FinalizeFunction(llvm::Function *function)
Status ProcessBatchNoGrouping(RowBatch *batch, HashTableCtx *ht_ctx=NULL)
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
Definition: exec-node.cc:393
bool is_materialized() const
Definition: descriptors.h:92
virtual Status Prepare(RuntimeState *state)
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
#define VLOG_ROW_IS_ON
Definition: logging.h:66
std::vector< AggFnEvaluator * > aggregate_evaluators_
virtual Status Open(RuntimeState *state)
Definition: exec-node.cc:154
std::vector< ExprContext * > probe_expr_ctxs_
Exprs used to evaluate input rows.
llvm::Function * ReplaceCallSites(llvm::Function *caller, bool update_in_place, llvm::Function *new_fn, const std::string &target_name, int *num_replaced)
string PrintRow(TupleRow *row, const RowDescriptor &d)
Definition: debug-util.cc:192
bool ok() const
Definition: status.h:172
llvm::Type * void_type()
Definition: llvm-codegen.h:394
Status ProcessStream(BufferedTupleStream *input_stream)
Reads all the rows from input_stream and process them by calling ProcessBatch().
std::vector< ExprContext * > conjunct_ctxs_
Definition: exec-node.h:212
virtual void Close(RuntimeState *state)
Definition: exec-node.cc:166
llvm::LLVMContext & context()
Definition: llvm-codegen.h:214
Status GetNext(RowBatch *batch, bool *eos, std::vector< RowIdx > *indices=NULL)
virtual std::string DebugString() const
Definition: expr.cc:385
llvm::Function * OptimizeFunctionWithExprs(llvm::Function *fn)
void UpdateTuple(impala_udf::FunctionContext **agg_fn_ctxs, Tuple *tuple, TupleRow *row, bool is_merge=false)
virtual Status Open(RuntimeState *state)
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
boost::scoped_ptr< HashTableCtx > ht_ctx_
llvm::Function * CodegenUpdateSlot(AggFnEvaluator *evaluator, SlotDescriptor *slot_desc)
RuntimeProfile::Counter * ht_resize_timer_
Total time spent resizing hash tables.
PartitionedAggregationNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
RuntimeProfile * runtime_profile()
Definition: exec-node.h:161
void Finalize(FunctionContext *agg_fn_ctx, Tuple *src, Tuple *dst)
const std::vector< ExprContext * > & input_expr_ctxs() const