Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
aggregation-node.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exec/aggregation-node.h"
16 
17 #include <math.h>
18 #include <sstream>
19 #include <boost/functional/hash.hpp>
20 #include <thrift/protocol/TDebugProtocol.h>
21 
22 #include <x86intrin.h>
23 
24 #include "codegen/codegen-anyval.h"
25 #include "codegen/llvm-codegen.h"
27 #include "exprs/agg-fn-evaluator.h"
28 #include "exprs/expr.h"
29 #include "exprs/expr-context.h"
30 #include "exprs/slot-ref.h"
31 #include "runtime/descriptors.h"
32 #include "runtime/mem-pool.h"
33 #include "runtime/raw-value.h"
34 #include "runtime/row-batch.h"
35 #include "runtime/runtime-state.h"
37 #include "runtime/tuple.h"
38 #include "runtime/tuple-row.h"
39 #include "udf/udf-internal.h"
40 #include "util/debug-util.h"
41 #include "util/runtime-profile.h"
42 
43 #include "gen-cpp/Exprs_types.h"
44 #include "gen-cpp/PlanNodes_types.h"
45 
46 #include "common/names.h"
47 
48 using namespace impala;
49 using namespace llvm;
50 
51 namespace impala {
52 
53 const char* AggregationNode::LLVM_CLASS_NAME = "class.impala::AggregationNode";
54 
55 // TODO: pass in maximum size; enforce by setting limit in mempool
57  const DescriptorTbl& descs)
58  : ExecNode(pool, tnode, descs),
59  intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
60  intermediate_tuple_desc_(NULL),
61  output_tuple_id_(tnode.agg_node.output_tuple_id),
62  output_tuple_desc_(NULL),
63  singleton_intermediate_tuple_(NULL),
64  codegen_process_row_batch_fn_(NULL),
65  process_row_batch_fn_(NULL),
66  needs_finalize_(tnode.agg_node.need_finalize),
67  build_timer_(NULL),
68  get_results_timer_(NULL),
69  hash_table_buckets_counter_(NULL) {
70 }
71 
72 Status AggregationNode::Init(const TPlanNode& tnode) {
75  Expr::CreateExprTrees(pool_, tnode.agg_node.grouping_exprs, &probe_expr_ctxs_));
76  for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
77  AggFnEvaluator* evaluator;
79  pool_, tnode.agg_node.aggregate_functions[i], &evaluator));
80  aggregate_evaluators_.push_back(evaluator);
81  }
82  return Status::OK;
83 }
84 
86  SCOPED_TIMER(runtime_profile_->total_time_counter());
88 
89  tuple_pool_.reset(new MemPool(mem_tracker()));
91  build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime");
92  get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime");
94  ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT);
96  ADD_COUNTER(runtime_profile(), "LoadFactor", TUnit::DOUBLE_VALUE);
97 
101  DCHECK_EQ(intermediate_tuple_desc_->slots().size(),
102  output_tuple_desc_->slots().size());
105 
106  // Construct build exprs from intermediate_agg_tuple_desc_
107  for (int i = 0; i < probe_expr_ctxs_.size(); ++i) {
109  DCHECK(desc->type().type == TYPE_NULL ||
110  desc->type() == probe_expr_ctxs_[i]->root()->type());
111  // TODO: Generate the build exprs in the FE such that the existing logic
112  // for handling NULL_TYPE works.
113  // Hack to avoid TYPE_NULL SlotRefs.
114  Expr* expr = desc->type().type != TYPE_NULL ?
115  new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN);
116  state->obj_pool()->Add(expr);
117  build_expr_ctxs_.push_back(new ExprContext(expr));
118  state->obj_pool()->Add(build_expr_ctxs_.back());
119  }
120  // Construct a new row desc for preparing the build exprs because neither the child's
121  // nor this node's output row desc may contain the intermediate tuple, e.g.,
122  // in a single-node plan with an intermediate tuple different from the output tuple.
123  RowDescriptor build_row_desc(intermediate_tuple_desc_, false);
125  Expr::Prepare(build_expr_ctxs_, state, build_row_desc, expr_mem_tracker()));
126 
127  agg_fn_ctxs_.resize(aggregate_evaluators_.size());
128  int j = probe_expr_ctxs_.size();
129  for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++j) {
130  // skip non-materialized slots; we don't have evaluators instantiated for those
131  while (!intermediate_tuple_desc_->slots()[j]->is_materialized()) {
132  DCHECK_LT(j, intermediate_tuple_desc_->slots().size() - 1)
133  << "#eval= " << aggregate_evaluators_.size()
134  << " #probe=" << probe_expr_ctxs_.size();
135  ++j;
136  }
137  SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j];
138  SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j];
140  intermediate_slot_desc, output_slot_desc, agg_fn_pool_.get(), &agg_fn_ctxs_[i]));
141  state->obj_pool()->Add(agg_fn_ctxs_[i]);
142  }
143 
144  // TODO: how many buckets?
146  true, true, id(), mem_tracker(), true));
147 
148  if (probe_expr_ctxs_.empty()) {
149  // create single intermediate tuple now; we need to output something
150  // even if our input is empty
153  output_iterator_ = hash_tbl_->Begin();
154  }
155 
156  if (state->codegen_enabled()) {
157  LlvmCodeGen* codegen;
158  RETURN_IF_ERROR(state->GetCodegen(&codegen));
159  Function* update_tuple_fn = CodegenUpdateTuple(state);
160  if (update_tuple_fn != NULL) {
162  CodegenProcessRowBatch(state, update_tuple_fn);
163  if (codegen_process_row_batch_fn_ != NULL) {
164  // Update to using codegen'd process row batch.
166  reinterpret_cast<void**>(&process_row_batch_fn_));
167  AddRuntimeExecOption("Codegen Enabled");
168  }
169  }
170  }
171  return Status::OK;
172 }
173 
175  SCOPED_TIMER(runtime_profile_->total_time_counter());
177 
180 
181  DCHECK_EQ(aggregate_evaluators_.size(), agg_fn_ctxs_.size());
182  for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
184  }
185 
186  RETURN_IF_ERROR(children_[0]->Open(state));
187 
188  RowBatch batch(children_[0]->row_desc(), state->batch_size(), mem_tracker());
189  int64_t num_input_rows = 0;
190  while (true) {
191  bool eos;
192  RETURN_IF_CANCELLED(state);
194  RETURN_IF_ERROR(children_[0]->GetNext(state, &batch, &eos));
196 
197  if (VLOG_ROW_IS_ON) {
198  for (int i = 0; i < batch.num_rows(); ++i) {
199  TupleRow* row = batch.GetRow(i);
200  VLOG_ROW << "input row: " << PrintRow(row, children_[0]->row_desc());
201  }
202  }
203  if (process_row_batch_fn_ != NULL) {
204  process_row_batch_fn_(this, &batch);
205  } else if (probe_expr_ctxs_.empty()) {
207  } else {
209  }
212  num_input_rows += batch.num_rows();
213  // We must set output_iterator_ here, rather than outside the loop, because
214  // output_iterator_ must be set if the function returns within the loop
215  output_iterator_ = hash_tbl_->Begin();
216 
217  batch.Reset();
219  if (eos) break;
220  }
221 
222  // We have consumed all of the input from the child and transfered ownership of the
223  // resources we need, so the child can be closed safely to release its resources.
224  child(0)->Close(state);
225  VLOG_FILE << "aggregated " << num_input_rows << " input rows into "
226  << hash_tbl_->size() << " output rows";
227  return Status::OK;
228 }
229 
230 Status AggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
231  SCOPED_TIMER(runtime_profile_->total_time_counter());
232  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
233  RETURN_IF_CANCELLED(state);
236 
237  if (ReachedLimit()) {
238  *eos = true;
239  return Status::OK;
240  }
241  *eos = false;
242  ExprContext** ctxs = &conjunct_ctxs_[0];
243  int num_ctxs = conjunct_ctxs_.size();
244 
245  int count = 0;
246  const int N = state->batch_size();
247  while (!output_iterator_.AtEnd() && !row_batch->AtCapacity()) {
248  // This loop can go on for a long time if the conjuncts are very selective. Do query
249  // maintenance every N iterations.
250  if (count++ % N == 0) {
251  RETURN_IF_CANCELLED(state);
253  }
254  int row_idx = row_batch->AddRow();
255  TupleRow* row = row_batch->GetRow(row_idx);
256  Tuple* intermediate_tuple = output_iterator_.GetTuple();
257  Tuple* output_tuple =
258  FinalizeTuple(intermediate_tuple, row_batch->tuple_data_pool());
259  output_iterator_.Next<false>();
260  row->SetTuple(0, output_tuple);
261  if (ExecNode::EvalConjuncts(ctxs, num_ctxs, row)) {
262  VLOG_ROW << "output row: " << PrintRow(row, row_desc());
263  row_batch->CommitLastRow();
265  if (ReachedLimit()) break;
266  }
267  }
268  *eos = output_iterator_.AtEnd() || ReachedLimit();
270  return Status::OK;
271 }
272 
274  DCHECK(false) << "NYI";
275  return Status("NYI");
276 }
277 
279  if (is_closed()) return;
280 
281  // Iterate through the remaining rows in the hash table and call Serialize/Finalize on
282  // them in order to free any memory allocated by UDAs. Finalize() requires a dst tuple
283  // but we don't actually need the result, so allocate a single dummy tuple to avoid
284  // accumulating memory.
285  Tuple* dummy_dst = NULL;
286  if (needs_finalize_) {
287  dummy_dst = Tuple::Create(output_tuple_desc_->byte_size(), tuple_pool_.get());
288  }
289  while (!output_iterator_.AtEnd()) {
290  Tuple* tuple = output_iterator_.GetTuple();
291  if (needs_finalize_) {
293  } else {
295  }
296  output_iterator_.Next<false>();
297  }
298 
299  if (tuple_pool_.get() != NULL) tuple_pool_->FreeAll();
300  if (hash_tbl_.get() != NULL) hash_tbl_->Close();
301 
302  DCHECK(agg_fn_ctxs_.empty() || aggregate_evaluators_.size() == agg_fn_ctxs_.size());
303  for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
304  aggregate_evaluators_[i]->Close(state);
305  if (!agg_fn_ctxs_.empty()) agg_fn_ctxs_[i]->impl()->Close();
306  }
307  if (agg_fn_pool_.get() != NULL) agg_fn_pool_->FreeAll();
308 
311  ExecNode::Close(state);
312 }
313 
315  Tuple* intermediate_tuple = Tuple::Create(
317  vector<SlotDescriptor*>::const_iterator slot_desc =
318  intermediate_tuple_desc_->slots().begin();
319 
320  // copy grouping values
321  for (int i = 0; i < probe_expr_ctxs_.size(); ++i, ++slot_desc) {
322  if (hash_tbl_->last_expr_value_null(i)) {
323  intermediate_tuple->SetNull((*slot_desc)->null_indicator_offset());
324  } else {
325  void* src = hash_tbl_->last_expr_value(i);
326  void* dst = intermediate_tuple->GetSlot((*slot_desc)->tuple_offset());
327  RawValue::Write(src, dst, (*slot_desc)->type(), tuple_pool_.get());
328  }
329  }
330 
331  // Initialize aggregate output.
332  for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++slot_desc) {
333  while (!(*slot_desc)->is_materialized()) ++slot_desc;
334  AggFnEvaluator* evaluator = aggregate_evaluators_[i];
335  evaluator->Init(agg_fn_ctxs_[i], intermediate_tuple);
336  // Codegen specific path.
337  // To minimize branching on the UpdateTuple path, initialize the result value
338  // so that UpdateTuple doesn't have to check if the aggregation
339  // dst slot is null.
340  // - sum/count: 0
341  // - min: max_value
342  // - max: min_value
343  // TODO: remove when we don't use the irbuilder for codegen here.
344  // This optimization no longer applies with AnyVal
345  if ((*slot_desc)->type().type != TYPE_STRING &&
346  (*slot_desc)->type().type != TYPE_VARCHAR &&
347  (*slot_desc)->type().type != TYPE_TIMESTAMP &&
348  (*slot_desc)->type().type != TYPE_CHAR &&
349  (*slot_desc)->type().type != TYPE_DECIMAL) {
350  ExprValue default_value;
351  void* default_value_ptr = NULL;
352  switch (evaluator->agg_op()) {
353  case AggFnEvaluator::MIN:
354  default_value_ptr = default_value.SetToMax((*slot_desc)->type());
355  RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
356  break;
357  case AggFnEvaluator::MAX:
358  default_value_ptr = default_value.SetToMin((*slot_desc)->type());
359  RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
360  break;
361  default:
362  break;
363  }
364  }
365  }
366  return intermediate_tuple;
367 }
368 
370  DCHECK(tuple != NULL || aggregate_evaluators_.empty());
372 }
373 
375  DCHECK(tuple != NULL || aggregate_evaluators_.empty());
376  Tuple* dst = tuple;
379  }
380  if (needs_finalize_) {
382  } else {
384  }
385  // Copy grouping values from tuple to dst.
386  // TODO: Codegen this.
387  if (dst != tuple) {
388  int num_grouping_slots = probe_expr_ctxs_.size();
389  for (int i = 0; i < num_grouping_slots; ++i) {
390  SlotDescriptor* src_slot_desc = intermediate_tuple_desc_->slots()[i];
391  SlotDescriptor* dst_slot_desc = output_tuple_desc_->slots()[i];
392  bool src_slot_null = tuple->IsNull(src_slot_desc->null_indicator_offset());
393  void* src_slot = NULL;
394  if (!src_slot_null) src_slot = tuple->GetSlot(src_slot_desc->tuple_offset());
395  RawValue::Write(src_slot, dst, dst_slot_desc, NULL);
396  }
397  }
398  return dst;
399 }
400 
401 void AggregationNode::DebugString(int indentation_level, stringstream* out) const {
402  *out << string(indentation_level * 2, ' ');
403  *out << "AggregationNode("
404  << "intermediate_tuple_id=" << intermediate_tuple_id_
405  << " output_tuple_id=" << output_tuple_id_
406  << " needs_finalize=" << needs_finalize_
407  << " probe_exprs=" << Expr::DebugString(probe_expr_ctxs_)
409  ExecNode::DebugString(indentation_level, out);
410  *out << ")";
411 }
412 
413 IRFunction::Type GetHllUpdateFunction2(const ColumnType& type) {
414  switch (type.type) {
415  case TYPE_BOOLEAN: return IRFunction::HLL_UPDATE_BOOLEAN;
416  case TYPE_TINYINT: return IRFunction::HLL_UPDATE_TINYINT;
417  case TYPE_SMALLINT: return IRFunction::HLL_UPDATE_SMALLINT;
418  case TYPE_INT: return IRFunction::HLL_UPDATE_INT;
419  case TYPE_BIGINT: return IRFunction::HLL_UPDATE_BIGINT;
420  case TYPE_FLOAT: return IRFunction::HLL_UPDATE_FLOAT;
421  case TYPE_DOUBLE: return IRFunction::HLL_UPDATE_DOUBLE;
422  case TYPE_STRING: return IRFunction::HLL_UPDATE_STRING;
423  case TYPE_DECIMAL: return IRFunction::HLL_UPDATE_DECIMAL;
424  default:
425  DCHECK(false) << "Unsupported type: " << type;
426  return IRFunction::FN_END;
427  }
428 }
429 
430 // IR Generation for updating a single aggregation slot. Signature is:
431 // void UpdateSlot(FunctionContext* fn_ctx, AggTuple* agg_tuple, char** row)
432 //
433 // The IR for sum(double_col) is:
434 // define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx,
435 // { i8, double }* %agg_tuple,
436 // %"class.impala::TupleRow"* %row) #20 {
437 // entry:
438 // %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* inttoptr
439 // (i64 128241264 to %"class.impala::ExprContext"*), %"class.impala::TupleRow"* %row)
440 // %0 = extractvalue { i8, double } %src, 0
441 // %is_null = trunc i8 %0 to i1
442 // br i1 %is_null, label %ret, label %src_not_null
443 //
444 // src_not_null: ; preds = %entry
445 // %dst_slot_ptr = getelementptr inbounds { i8, double }* %agg_tuple, i32 0, i32 1
446 // call void @SetNotNull({ i8, double }* %agg_tuple)
447 // %dst_val = load double* %dst_slot_ptr
448 // %val = extractvalue { i8, double } %src, 1
449 // %1 = fadd double %dst_val, %val
450 // store double %1, double* %dst_slot_ptr
451 // br label %ret
452 //
453 // ret: ; preds = %src_not_null, %entry
454 // ret void
455 // }
456 //
457 // The IR for ndv(double_col) is:
458 // define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx,
459 // { i8, %"struct.impala::StringValue" }* %agg_tuple,
460 // %"class.impala::TupleRow"* %row) #20 {
461 // entry:
462 // %dst_lowered_ptr = alloca { i64, i8* }
463 // %src_lowered_ptr = alloca { i8, double }
464 // %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* inttoptr
465 // (i64 120530832 to %"class.impala::ExprContext"*), %"class.impala::TupleRow"* %row)
466 // %0 = extractvalue { i8, double } %src, 0
467 // %is_null = trunc i8 %0 to i1
468 // br i1 %is_null, label %ret, label %src_not_null
469 //
470 // src_not_null: ; preds = %entry
471 // %dst_slot_ptr = getelementptr inbounds
472 // { i8, %"struct.impala::StringValue" }* %agg_tuple, i32 0, i32 1
473 // call void @SetNotNull({ i8, %"struct.impala::StringValue" }* %agg_tuple)
474 // %dst_val = load %"struct.impala::StringValue"* %dst_slot_ptr
475 // store { i8, double } %src, { i8, double }* %src_lowered_ptr
476 // %src_unlowered_ptr = bitcast { i8, double }* %src_lowered_ptr
477 // to %"struct.impala_udf::DoubleVal"*
478 // %ptr = extractvalue %"struct.impala::StringValue" %dst_val, 0
479 // %dst_stringval = insertvalue { i64, i8* } zeroinitializer, i8* %ptr, 1
480 // %len = extractvalue %"struct.impala::StringValue" %dst_val, 1
481 // %1 = extractvalue { i64, i8* } %dst_stringval, 0
482 // %2 = zext i32 %len to i64
483 // %3 = shl i64 %2, 32
484 // %4 = and i64 %1, 4294967295
485 // %5 = or i64 %4, %3
486 // %dst_stringval1 = insertvalue { i64, i8* } %dst_stringval, i64 %5, 0
487 // store { i64, i8* } %dst_stringval1, { i64, i8* }* %dst_lowered_ptr
488 // %dst_unlowered_ptr = bitcast { i64, i8* }* %dst_lowered_ptr
489 // to %"struct.impala_udf::StringVal"*
490 // call void @HllUpdate(%"class.impala_udf::FunctionContext"* %fn_ctx,
491 // %"struct.impala_udf::DoubleVal"* %src_unlowered_ptr,
492 // %"struct.impala_udf::StringVal"* %dst_unlowered_ptr)
493 // %anyval_result = load { i64, i8* }* %dst_lowered_ptr
494 // %6 = extractvalue { i64, i8* } %anyval_result, 1
495 // %7 = insertvalue %"struct.impala::StringValue" zeroinitializer, i8* %6, 0
496 // %8 = extractvalue { i64, i8* } %anyval_result, 0
497 // %9 = ashr i64 %8, 32
498 // %10 = trunc i64 %9 to i32
499 // %11 = insertvalue %"struct.impala::StringValue" %7, i32 %10, 1
500 // store %"struct.impala::StringValue" %11, %"struct.impala::StringValue"* %dst_slot_ptr
501 // br label %ret
502 //
503 // ret: ; preds = %src_not_null, %entry
504 // ret void
505 // }
507  RuntimeState* state, AggFnEvaluator* evaluator, SlotDescriptor* slot_desc) {
508  DCHECK(slot_desc->is_materialized());
509  LlvmCodeGen* codegen;
510  if (!state->GetCodegen(&codegen).ok()) return NULL;
511 
512  DCHECK_EQ(evaluator->input_expr_ctxs().size(), 1);
513  ExprContext* input_expr_ctx = evaluator->input_expr_ctxs()[0];
514  Expr* input_expr = input_expr_ctx->root();
515  // TODO: implement timestamp
516  if (input_expr->type().type == TYPE_TIMESTAMP) return NULL;
517  Function* agg_expr_fn;
518  Status status = input_expr->GetCodegendComputeFn(state, &agg_expr_fn);
519  if (!status.ok()) {
520  VLOG_QUERY << "Could not codegen UpdateSlot(): " << status.GetDetail();
521  return NULL;
522  }
523  DCHECK(agg_expr_fn != NULL);
524 
525  PointerType* fn_ctx_type =
527  StructType* tuple_struct = intermediate_tuple_desc_->GenerateLlvmStruct(codegen);
528  PointerType* tuple_ptr_type = PointerType::get(tuple_struct, 0);
529  PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
530 
531  // Create UpdateSlot prototype
532  LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type());
533  prototype.AddArgument(LlvmCodeGen::NamedVariable("fn_ctx", fn_ctx_type));
534  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", tuple_ptr_type));
535  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
536 
537  LlvmCodeGen::LlvmBuilder builder(codegen->context());
538  Value* args[3];
539  Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
540  Value* fn_ctx_arg = args[0];
541  Value* agg_tuple_arg = args[1];
542  Value* row_arg = args[2];
543 
544  BasicBlock* src_not_null_block =
545  BasicBlock::Create(codegen->context(), "src_not_null", fn);
546  BasicBlock* ret_block = BasicBlock::Create(codegen->context(), "ret", fn);
547 
548  // Call expr function to get src slot value
549  Value* ctx_arg = codegen->CastPtrToLlvmPtr(
550  codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME), input_expr_ctx);
551  Value* agg_expr_fn_args[] = { ctx_arg, row_arg };
553  codegen, &builder, input_expr->type(), agg_expr_fn, agg_expr_fn_args, "src");
554 
555  Value* src_is_null = src.GetIsNull();
556  builder.CreateCondBr(src_is_null, ret_block, src_not_null_block);
557 
558  // Src slot is not null, update dst_slot
559  builder.SetInsertPoint(src_not_null_block);
560  Value* dst_ptr =
561  builder.CreateStructGEP(agg_tuple_arg, slot_desc->field_idx(), "dst_slot_ptr");
562  Value* result = NULL;
563 
564  if (slot_desc->is_nullable()) {
565  // Dst is NULL, just update dst slot to src slot and clear null bit
566  Function* clear_null_fn = slot_desc->CodegenUpdateNull(codegen, tuple_struct, false);
567  builder.CreateCall(clear_null_fn, agg_tuple_arg);
568  }
569 
570  // Update the slot
571  Value* dst_value = builder.CreateLoad(dst_ptr, "dst_val");
572  switch (evaluator->agg_op()) {
574  if (evaluator->is_merge()) {
575  result = builder.CreateAdd(dst_value, src.GetVal(), "count_sum");
576  } else {
577  result = builder.CreateAdd(dst_value,
578  codegen->GetIntConstant(TYPE_BIGINT, 1), "count_inc");
579  }
580  break;
581  case AggFnEvaluator::MIN: {
582  Function* min_fn = codegen->CodegenMinMax(slot_desc->type(), true);
583  Value* min_args[] = { dst_value, src.GetVal() };
584  result = builder.CreateCall(min_fn, min_args, "min_value");
585  break;
586  }
587  case AggFnEvaluator::MAX: {
588  Function* max_fn = codegen->CodegenMinMax(slot_desc->type(), false);
589  Value* max_args[] = { dst_value, src.GetVal() };
590  result = builder.CreateCall(max_fn, max_args, "max_value");
591  break;
592  }
593  case AggFnEvaluator::SUM:
594  if (slot_desc->type().type == TYPE_FLOAT || slot_desc->type().type == TYPE_DOUBLE) {
595  result = builder.CreateFAdd(dst_value, src.GetVal());
596  } else {
597  result = builder.CreateAdd(dst_value, src.GetVal());
598  }
599  break;
600  case AggFnEvaluator::NDV: {
601  DCHECK_EQ(slot_desc->type().type, TYPE_STRING);
602  IRFunction::Type ir_function_type = evaluator->is_merge() ? IRFunction::HLL_MERGE
603  : GetHllUpdateFunction2(input_expr->type());
604  Function* hll_fn = codegen->GetFunction(ir_function_type);
605 
606  // Create pointer to src_anyval to pass to HllUpdate() function. We must use the
607  // unlowered type.
608  Value* src_lowered_ptr = codegen->CreateEntryBlockAlloca(
609  fn, LlvmCodeGen::NamedVariable("src_lowered_ptr", src.value()->getType()));
610  builder.CreateStore(src.value(), src_lowered_ptr);
611  Type* unlowered_ptr_type =
612  CodegenAnyVal::GetUnloweredType(codegen, input_expr->type())->getPointerTo();
613  Value* src_unlowered_ptr =
614  builder.CreateBitCast(src_lowered_ptr, unlowered_ptr_type, "src_unlowered_ptr");
615 
616  // Create StringVal* intermediate argument from dst_value
618  codegen, &builder, TYPE_STRING, "dst_stringval");
619  dst_stringval.SetFromRawValue(dst_value);
620  // Create pointer to dst_stringval to pass to HllUpdate() function. We must use
621  // the unlowered type.
622  Value* dst_lowered_ptr = codegen->CreateEntryBlockAlloca(
623  fn, LlvmCodeGen::NamedVariable("dst_lowered_ptr",
624  dst_stringval.value()->getType()));
625  builder.CreateStore(dst_stringval.value(), dst_lowered_ptr);
626  unlowered_ptr_type =
627  codegen->GetPtrType(CodegenAnyVal::GetUnloweredType(codegen, TYPE_STRING));
628  Value* dst_unlowered_ptr =
629  builder.CreateBitCast(dst_lowered_ptr, unlowered_ptr_type, "dst_unlowered_ptr");
630 
631  // Call 'hll_fn'
632  builder.CreateCall3(hll_fn, fn_ctx_arg, src_unlowered_ptr, dst_unlowered_ptr);
633 
634  // Convert StringVal intermediate 'dst_arg' back to StringValue
635  Value* anyval_result = builder.CreateLoad(dst_lowered_ptr, "anyval_result");
636  result = CodegenAnyVal(codegen, &builder, TYPE_STRING, anyval_result)
637  .ToNativeValue();
638  break;
639  }
640  default:
641  DCHECK(false) << "bad aggregate operator: " << evaluator->agg_op();
642  }
643 
644  builder.CreateStore(result, dst_ptr);
645  builder.CreateBr(ret_block);
646 
647  builder.SetInsertPoint(ret_block);
648  builder.CreateRetVoid();
649 
650  return codegen->FinalizeFunction(fn);
651 }
652 
653 // IR codegen for the UpdateTuple loop. This loop is query specific and
654 // based on the aggregate functions. The function signature must match the non-
655 // codegen'd UpdateTuple exactly.
656 // For the query:
657 // select count(*), count(int_col), sum(double_col) the IR looks like:
658 //
659 // define void @UpdateTuple(%"class.impala::AggregationNode"* %this_ptr,
660 // %"class.impala::Tuple"* %agg_tuple,
661 // %"class.impala::TupleRow"* %tuple_row) #20 {
662 // entry:
663 // %tuple = bitcast %"class.impala::Tuple"* %agg_tuple to { i8, i64, i64, double }*
664 // %src_slot = getelementptr inbounds { i8, i64, i64, double }* %tuple, i32 0, i32 1
665 // %count_star_val = load i64* %src_slot
666 // %count_star_inc = add i64 %count_star_val, 1
667 // store i64 %count_star_inc, i64* %src_slot
668 // call void @UpdateSlot(%"class.impala_udf::FunctionContext"* inttoptr
669 // (i64 44521296 to %"class.impala_udf::FunctionContext"*),
670 // { i8, i64, i64, double }* %tuple,
671 // %"class.impala::TupleRow"* %tuple_row)
672 // call void @UpdateSlot5(%"class.impala_udf::FunctionContext"* inttoptr
673 // (i64 44521328 to %"class.impala_udf::FunctionContext"*),
674 // { i8, i64, i64, double }* %tuple,
675 // %"class.impala::TupleRow"* %tuple_row)
676 // ret void
677 // }
679  LlvmCodeGen* codegen;
680  if (!state->GetCodegen(&codegen).ok()) return NULL;
681  SCOPED_TIMER(codegen->codegen_timer());
682 
683  int j = probe_expr_ctxs_.size();
684  for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++j) {
685  // skip non-materialized slots; we don't have evaluators instantiated for those
686  while (!intermediate_tuple_desc_->slots()[j]->is_materialized()) {
687  DCHECK_LT(j, intermediate_tuple_desc_->slots().size() - 1);
688  ++j;
689  }
690  SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
691  AggFnEvaluator* evaluator = aggregate_evaluators_[i];
692 
693  // Timestamp and char are never supported. NDV supports decimal and string but no
694  // other functions.
695  // TODO: the other aggregate functions might work with decimal as-is
696  if (slot_desc->type().type == TYPE_TIMESTAMP || slot_desc->type().type == TYPE_CHAR ||
697  (evaluator->agg_op() != AggFnEvaluator::NDV &&
698  (slot_desc->type().type == TYPE_DECIMAL ||
699  slot_desc->type().type == TYPE_STRING ||
700  slot_desc->type().type == TYPE_VARCHAR))) {
701  VLOG_QUERY << "Could not codegen UpdateIntermediateTuple because "
702  << "string, char, timestamp and decimal are not yet supported.";
703  return NULL;
704  }
705 
706  // Don't codegen things that aren't builtins (for now)
707  if (!evaluator->is_builtin()) return NULL;
708  }
709 
710  if (intermediate_tuple_desc_->GenerateLlvmStruct(codegen) == NULL) {
711  VLOG_QUERY << "Could not codegen UpdateTuple because we could"
712  << "not generate a matching llvm struct for the intermediate tuple.";
713  return NULL;
714  }
715 
716  // Get the types to match the UpdateTuple signature
717  Type* agg_node_type = codegen->GetType(AggregationNode::LLVM_CLASS_NAME);
718  Type* agg_tuple_type = codegen->GetType(Tuple::LLVM_CLASS_NAME);
719  Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
720 
721  DCHECK(agg_node_type != NULL);
722  DCHECK(agg_tuple_type != NULL);
723  DCHECK(tuple_row_type != NULL);
724 
725  PointerType* agg_node_ptr_type = PointerType::get(agg_node_type, 0);
726  PointerType* agg_tuple_ptr_type = PointerType::get(agg_tuple_type, 0);
727  PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
728 
729  // Signature for UpdateTuple is
730  // void UpdateTuple(AggregationNode* this, Tuple* tuple, TupleRow* row)
731  // This signature needs to match the non-codegen'd signature exactly.
732  StructType* tuple_struct = intermediate_tuple_desc_->GenerateLlvmStruct(codegen);
733  PointerType* tuple_ptr = PointerType::get(tuple_struct, 0);
734  LlvmCodeGen::FnPrototype prototype(codegen, "UpdateTuple", codegen->void_type());
735  prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", agg_node_ptr_type));
736  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", agg_tuple_ptr_type));
737  prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple_row", tuple_row_ptr_type));
738 
739  LlvmCodeGen::LlvmBuilder builder(codegen->context());
740  Value* args[3];
741  Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
742 
743  // Cast the parameter types to the internal llvm runtime types.
744  // TODO: get rid of this by using right type in function signature
745  args[1] = builder.CreateBitCast(args[1], tuple_ptr, "tuple");
746 
747  // Loop over each expr and generate the IR for that slot. If the expr is not
748  // count(*), generate a helper IR function to update the slot and call that.
749  j = probe_expr_ctxs_.size();
750  for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++j) {
751  // skip non-materialized slots; we don't have evaluators instantiated for those
752  while (!intermediate_tuple_desc_->slots()[j]->is_materialized()) {
753  DCHECK_LT(j, intermediate_tuple_desc_->slots().size() - 1);
754  ++j;
755  }
756  SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
757  AggFnEvaluator* evaluator = aggregate_evaluators_[i];
758  if (evaluator->is_count_star()) {
759  // TODO: we should be able to hoist this up to the loop over the batch and just
760  // increment the slot by the number of rows in the batch.
761  int field_idx = slot_desc->field_idx();
762  Value* const_one = codegen->GetIntConstant(TYPE_BIGINT, 1);
763  Value* slot_ptr = builder.CreateStructGEP(args[1], field_idx, "src_slot");
764  Value* slot_loaded = builder.CreateLoad(slot_ptr, "count_star_val");
765  Value* count_inc = builder.CreateAdd(slot_loaded, const_one, "count_star_inc");
766  builder.CreateStore(count_inc, slot_ptr);
767  } else {
768  Function* update_slot_fn = CodegenUpdateSlot(state, evaluator, slot_desc);
769  if (update_slot_fn == NULL) return NULL;
770  Value* fn_ctx_arg = codegen->CastPtrToLlvmPtr(
772  agg_fn_ctxs_[i]);
773  builder.CreateCall3(update_slot_fn, fn_ctx_arg, args[1], args[2]);
774  }
775  }
776  builder.CreateRetVoid();
777 
778  // CodegenProcessRowBatch() does the final optimizations.
779  return codegen->FinalizeFunction(fn);
780 }
781 
783  RuntimeState* state, Function* update_tuple_fn) {
784  LlvmCodeGen* codegen;
785  if (!state->GetCodegen(&codegen).ok()) return NULL;
786  SCOPED_TIMER(codegen->codegen_timer());
787  DCHECK(update_tuple_fn != NULL);
788 
789  // Get the cross compiled update row batch function
790  IRFunction::Type ir_fn = (!probe_expr_ctxs_.empty() ?
791  IRFunction::AGG_NODE_PROCESS_ROW_BATCH_WITH_GROUPING :
792  IRFunction::AGG_NODE_PROCESS_ROW_BATCH_NO_GROUPING);
793  Function* process_batch_fn = codegen->GetFunction(ir_fn);
794 
795  if (process_batch_fn == NULL) {
796  LOG(ERROR) << "Could not find AggregationNode::ProcessRowBatch in module.";
797  return NULL;
798  }
799 
800  int replaced = 0;
801  if (!probe_expr_ctxs_.empty()) {
802  // Aggregation w/o grouping does not use a hash table.
803 
804  // Codegen for hash
805  Function* hash_fn = hash_tbl_->CodegenHashCurrentRow(state);
806  if (hash_fn == NULL) return NULL;
807 
808  // Codegen HashTable::Equals
809  Function* equals_fn = hash_tbl_->CodegenEquals(state);
810  if (equals_fn == NULL) return NULL;
811 
812  // Codegen for evaluating build rows
813  Function* eval_build_row_fn = hash_tbl_->CodegenEvalTupleRow(state, true);
814  if (eval_build_row_fn == NULL) return NULL;
815 
816  // Codegen for evaluating probe rows
817  Function* eval_probe_row_fn = hash_tbl_->CodegenEvalTupleRow(state, false);
818  if (eval_probe_row_fn == NULL) return NULL;
819 
820  // Replace call sites
821  process_batch_fn = codegen->ReplaceCallSites(process_batch_fn, false,
822  eval_build_row_fn, "EvalBuildRow", &replaced);
823  DCHECK_EQ(replaced, 1);
824 
825  process_batch_fn = codegen->ReplaceCallSites(process_batch_fn, false,
826  eval_probe_row_fn, "EvalProbeRow", &replaced);
827  DCHECK_EQ(replaced, 1);
828 
829  process_batch_fn = codegen->ReplaceCallSites(process_batch_fn, false,
830  hash_fn, "HashCurrentRow", &replaced);
831  DCHECK_EQ(replaced, 2);
832 
833  process_batch_fn = codegen->ReplaceCallSites(process_batch_fn, false,
834  equals_fn, "Equals", &replaced);
835  DCHECK_EQ(replaced, 1);
836  }
837 
838  process_batch_fn = codegen->ReplaceCallSites(process_batch_fn, false,
839  update_tuple_fn, "UpdateTuple", &replaced);
840  DCHECK_EQ(replaced, 1) << "One call site should be replaced.";
841  DCHECK(process_batch_fn != NULL);
842  return codegen->OptimizeFunctionWithExprs(process_batch_fn);
843 }
844 
845 }
static const char * LLVM_CLASS_NAME
TupleDescriptor * output_tuple_desc_
void SetFromRawValue(llvm::Value *raw_val)
void AddRuntimeExecOption(const std::string &option)
Appends option to 'runtime_exec_options_'.
Definition: exec-node.cc:188
const std::string GetDetail() const
Definition: status.cc:184
static CodegenAnyVal CreateCallWrapped(LlvmCodeGen *cg, LlvmCodeGen::LlvmBuilder *builder, const ColumnType &type, llvm::Function *fn, llvm::ArrayRef< llvm::Value * > args, const char *name="", llvm::Value *result_ptr=NULL)
Same as above but wraps the result in a CodegenAnyVal.
void SetNull(const NullIndicatorOffset &offset)
Definition: tuple.h:101
RuntimeProfile::Counter * codegen_timer()
Definition: llvm-codegen.h:135
llvm::PointerType * GetPtrType(llvm::Type *type)
Return a pointer type to 'type'.
int64_t num_rows_returned_
Definition: exec-node.h:223
virtual Status Prepare(RuntimeState *state)
std::vector< ExprContext * > build_expr_ctxs_
MemTracker * mem_tracker()
Definition: exec-node.h:162
llvm::Function * CodegenUpdateSlot(RuntimeState *state, AggFnEvaluator *evaluator, SlotDescriptor *slot_desc)
AggregationNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
Utility struct that wraps a variable name and llvm type.
Definition: llvm-codegen.h:149
boost::scoped_ptr< RuntimeProfile > runtime_profile_
Definition: exec-node.h:225
static Status Open(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for opening multiple expr trees.
RuntimeProfile::Counter * build_timer_
Time spent processing the child rows.
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
void * SetToMin(const ColumnType &type)
Sets the value for type to min and returns a pointer to the data.
Definition: expr-value.h:103
static llvm::Type * GetUnloweredType(LlvmCodeGen *cg, const ColumnType &type)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
std::string DebugString() const
virtual Status Init(const TPlanNode &tnode)
Definition: exec-node.cc:124
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
bool AtEnd() const
Returns true if this iterator is at the end, i.e. GetRow() cannot be called.
The materialized value returned by ExprContext::GetValue().
Definition: expr-value.h:25
#define ADD_TIMER(profile, name)
TupleId intermediate_tuple_id_
Tuple into which Update()/Merge()/Serialize() results are stored.
bool AtCapacity()
Definition: row-batch.h:120
llvm::Value * ToNativeValue()
void * GetSlot(int offset)
Definition: tuple.h:118
const std::vector< SlotDescriptor * > & slots() const
Definition: descriptors.h:302
int byte_size() const
Definition: descriptors.h:300
void Add(FunctionContext *agg_fn_ctx, TupleRow *src, Tuple *dst)
static Status Create(ObjectPool *pool, const TExpr &desc, AggFnEvaluator **result)
llvm::StructType * GenerateLlvmStruct(LlvmCodeGen *codegen)
Definition: descriptors.cc:556
const RowDescriptor & row_desc() const
Definition: exec-node.h:156
static Tuple * Create(int size, MemPool *pool)
initialize individual tuple with data residing in mem pool
Definition: tuple.h:51
const NullIndicatorOffset & null_indicator_offset() const
Definition: descriptors.h:89
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
Definition: exec-node.cc:378
bool ReachedLimit()
Definition: exec-node.h:159
void * SetToMax(const ColumnType &type)
Sets the value for type to max and returns a pointer to the data.
Definition: expr-value.h:137
AggregationOp agg_op() const
TupleDescriptor * GetTupleDescriptor(TupleId id) const
Definition: descriptors.cc:437
#define SCOPED_TIMER(c)
static CodegenAnyVal GetNonNullVal(LlvmCodeGen *codegen, LlvmCodeGen::LlvmBuilder *builder, const ColumnType &type, const char *name="")
static void Close(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for closing multiple expr trees.
bool IsNull(const NullIndicatorOffset &offset) const
Definition: tuple.h:112
virtual Status Reset(RuntimeState *state)
LLVM code generator. This is the top level object to generate jitted code.
Definition: llvm-codegen.h:107
llvm::Function * CodegenUpdateTuple(RuntimeState *state)
Codegen UpdateTuple(). Returns NULL if codegen is unsuccessful.
bool is_nullable() const
Definition: descriptors.h:93
void ProcessRowBatchWithGrouping(RowBatch *batch)
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
Definition: exec-node.cc:345
MemTracker * expr_mem_tracker()
Definition: exec-node.h:163
static const char * LLVM_CLASS_NAME
Definition: tuple-row.h:76
#define VLOG_QUERY
Definition: logging.h:57
static const char * LLVM_CLASS_NAME
Definition: expr-context.h:126
PrimitiveType type
Definition: types.h:60
llvm::Value * CastPtrToLlvmPtr(llvm::Type *type, const void *ptr)
void AddArgument(const NamedVariable &var)
Add argument.
Definition: llvm-codegen.h:171
std::vector< ExprContext * > probe_expr_ctxs_
Exprs used to evaluate input rows.
static const char * LLVM_CLASS_NAME
For C++/IR interop, we need to be able to look up types by name.
Definition: tuple.h:134
void Serialize(FunctionContext *agg_fn_ctx, Tuple *dst)
const ColumnType & type() const
Definition: descriptors.h:78
ObjectPool * obj_pool() const
Definition: runtime-state.h:92
The hash table does not support removes. The hash table is not thread safe.
static const char * LLVM_FUNCTIONCONTEXT_NAME
Definition: udf-internal.h:93
#define RETURN_IF_CANCELLED(state)
ObjectPool pool
void Init(FunctionContext *agg_fn_ctx, Tuple *dst)
Functions for different phases of the aggregation.
virtual Status Prepare(RuntimeState *state)
Definition: exec-node.cc:130
#define ADD_COUNTER(profile, name, unit)
void UpdateTuple(Tuple *tuple, TupleRow *row)
llvm::Function * GetFunction(IRFunction::Type)
void ProcessRowBatchNoGrouping(RowBatch *batch)
Do the aggregation for all tuple rows in the batch.
llvm::Function * codegen_process_row_batch_fn_
IR for process row batch. NULL if codegen is disabled.
void AddFunctionToJit(llvm::Function *fn, void **fn_ptr)
llvm::Function * CodegenUpdateNull(LlvmCodeGen *, llvm::StructType *tuple, bool set_null)
Definition: descriptors.cc:510
static void Write(const void *value, Tuple *tuple, const SlotDescriptor *slot_desc, MemPool *pool)
Definition: raw-value.cc:303
virtual Status QueryMaintenance(RuntimeState *state)
Definition: exec-node.cc:401
This is the superclass of all expr evaluation nodes.
Definition: expr.h:116
#define VLOG_ROW
Definition: logging.h:59
const DescriptorTbl & desc_tbl() const
Definition: runtime-state.h:93
bool is_closed()
Definition: exec-node.h:242
void CommitLastRow()
Definition: row-batch.h:109
std::vector< ExecNode * > children_
Definition: exec-node.h:214
#define COUNTER_SET(c, v)
uint64_t count
int batch_size() const
Definition: runtime-state.h:98
virtual Status Open(RuntimeState *state)
MemPool * tuple_data_pool()
Definition: row-batch.h:148
RuntimeProfile::Counter * rows_returned_counter_
Definition: exec-node.h:226
std::vector< AggFnEvaluator * > aggregate_evaluators_
void IR_ALWAYS_INLINE Next()
ExecNode * child(int i)
Definition: exec-node.h:241
IRFunction::Type GetHllUpdateFunction2(const ColumnType &type)
void SetTuple(int tuple_idx, Tuple *tuple)
Definition: tuple-row.h:34
const ColumnType & type() const
Definition: expr.h:145
llvm::Value * value()
Returns the current type-lowered value.
bool codegen_enabled() const
Returns true if codegen is enabled for this query.
Reference to a single slot of a tuple.
Definition: slot-ref.h:23
llvm::Value * GetVal(const char *name="val")
static const Status OK
Definition: status.h:87
ObjectPool * pool_
Definition: exec-node.h:211
virtual Status GetCodegendComputeFn(RuntimeState *state, llvm::Function **fn)=0
llvm::Type * GetType(const ColumnType &type)
Returns llvm type for the column type.
std::vector< impala_udf::FunctionContext * > agg_fn_ctxs_
FunctionContext for each agg fn and backing pool.
Status GetCodegen(LlvmCodeGen **codegen, bool initialize=true)
int tuple_offset() const
Definition: descriptors.h:88
Tuple * FinalizeTuple(Tuple *tuple, MemPool *pool)
int field_idx() const
Returns the field index in the generated llvm struct for this slot's tuple.
Definition: descriptors.h:87
llvm::Value * GetIsNull(const char *name="is_null")
Gets the 'is_null' field of the *Val.
llvm::Value * GetIntConstant(PrimitiveType type, int64_t val)
Returns the constant 'val' of 'type'.
static Status CreateExprTrees(ObjectPool *pool, const std::vector< TExpr > &texprs, std::vector< ExprContext * > *ctxs)
Definition: expr.cc:149
llvm::Function * FinalizeFunction(llvm::Function *function)
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
Definition: exec-node.cc:393
bool is_materialized() const
Definition: descriptors.h:92
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
#define VLOG_FILE
Definition: logging.h:58
OldHashTable::Iterator output_iterator_
#define VLOG_ROW_IS_ON
Definition: logging.h:66
boost::scoped_ptr< MemPool > agg_fn_pool_
virtual Status Open(RuntimeState *state)
Definition: exec-node.cc:154
llvm::Function * ReplaceCallSites(llvm::Function *caller, bool update_in_place, llvm::Function *new_fn, const std::string &target_name, int *num_replaced)
RuntimeProfile::Counter * get_results_timer_
Time spent returning the aggregated rows.
string PrintRow(TupleRow *row, const RowDescriptor &d)
Definition: debug-util.cc:192
RuntimeProfile::Counter * hash_table_load_factor_counter_
Load factor in hash table.
bool ok() const
Definition: status.h:172
ProcessRowBatchFn process_row_batch_fn_
Jitted ProcessRowBatch function pointer. Null if codegen is disabled.
TupleDescriptor * intermediate_tuple_desc_
llvm::Type * void_type()
Definition: llvm-codegen.h:394
RuntimeProfile::Counter * hash_table_buckets_counter_
Num buckets in hash table.
boost::scoped_ptr< MemPool > tuple_pool_
std::vector< ExprContext * > conjunct_ctxs_
Definition: exec-node.h:212
virtual void Close(RuntimeState *state)
Definition: exec-node.cc:166
llvm::LLVMContext & context()
Definition: llvm-codegen.h:214
virtual std::string DebugString() const
Definition: expr.cc:385
llvm::Function * OptimizeFunctionWithExprs(llvm::Function *fn)
llvm::Function * CodegenProcessRowBatch(RuntimeState *state, llvm::Function *update_tuple_fn)
virtual void Close(RuntimeState *state)
boost::scoped_ptr< OldHashTable > hash_tbl_
RuntimeProfile * runtime_profile()
Definition: exec-node.h:161
virtual Status Init(const TPlanNode &tnode)
void Finalize(FunctionContext *agg_fn_ctx, Tuple *src, Tuple *dst)
const std::vector< ExprContext * > & input_expr_ctxs() const