#include <gutil/strings/substitute.h>
#include <thrift/protocol/TDebugProtocol.h>

#include "gen-cpp/Exprs_types.h"
#include "gen-cpp/PlanNodes_types.h"

using namespace impala;
using namespace strings;
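
// For C++/IR interop, codegen needs to be able to look this class type up by name
// in the cross-compiled module.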
54 "class.impala::PartitionedAggregationNode";
PartitionedAggregationNode::PartitionedAggregationNode(
    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
  : ExecNode(pool, tnode, descs),
    intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
    intermediate_tuple_desc_(NULL),
    output_tuple_id_(tnode.agg_node.output_tuple_id),
    output_tuple_desc_(NULL),
    needs_finalize_(tnode.agg_node.need_finalize),
    needs_serialize_(false),
    block_mgr_client_(NULL),
    using_small_buffers_(true),
    singleton_output_tuple_(NULL),
    singleton_output_tuple_returned_(true),
    output_partition_(NULL),
    process_row_batch_fn_(NULL),
    build_timer_(NULL),
    ht_resize_timer_(NULL),
    get_results_timer_(NULL),
    num_hash_buckets_(NULL),
    partitions_created_(NULL),
    max_partition_level_(NULL),
    num_row_repartitioned_(NULL),
    num_repartitions_(NULL) {
}
  // In Init(): create an evaluator for each aggregate function in the plan node.
  for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
    AggFnEvaluator* evaluator;
    RETURN_IF_ERROR(AggFnEvaluator::Create(
        pool_, tnode.agg_node.aggregate_functions[i], &evaluator));
    aggregate_evaluators_.push_back(evaluator);
  }
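
// In Prepare(): counters added to the runtime profile to track partitioning
// behaviour (partitions created, max partition level, rows repartitioned, etc.).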
120 "MaxPartitionLevel", TUnit::UNIT);
128 "LargestPartitionPercent", TUnit::UNIT);
  // Prepare each evaluator against its intermediate and output slots.
  RETURN_IF_ERROR(aggregate_evaluators_[i]->Prepare(state, child(0)->row_desc(),
      intermediate_slot_desc, output_slot_desc, agg_fn_pool_.get(), &agg_fn_ctx));
  if (codegen_process_row_batch_fn != NULL) {
    codegen->AddFunctionToJit(codegen_process_row_batch_fn,
        reinterpret_cast<void**>(&process_row_batch_fn_));
    AddRuntimeExecOption("Codegen Enabled");
  }
  for (int i = 0; i < batch.num_rows(); ++i) {
    int row_idx = row_batch->AddRow();
    // This loop can go on for a long time if the conjuncts are very selective.
    // Do query maintenance every N iterations.
    if ((count++ & (N - 1)) == 0) {
      RETURN_IF_CANCELLED(state);
      RETURN_IF_ERROR(QueryMaintenance(state));
    }

    int row_idx = row_batch->AddRow();
  // In CleanupHashTbl(): finalize each remaining tuple into a dummy output tuple so
  // the aggregate functions get a chance to free any memory they allocated.
  Tuple* dummy_dst = NULL;
  while (!it.AtEnd()) {
  DCHECK(false) << "NYI";
bool PartitionedAggregationNode::Partition::InitHashTable() {
  DCHECK(hash_tbl.get() == NULL);
  hash_tbl.reset(new HashTable(parent->state_, parent->block_mgr_client_, 1, NULL,
      1 << (32 - NUM_PARTITIONING_BITS)));
  return hash_tbl->Init();
}
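
// Spill(): turns this partition into a spilled partition and releases its in-memory
// state. 'intermediate_tuple' is an in-flight tuple that was being constructed when
// the spill was triggered; if non-NULL it is serialized along with the hash table.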
Status PartitionedAggregationNode::Partition::Spill(Tuple* intermediate_tuple) {
  DCHECK(!is_spilled());
  if (parent->needs_serialize_ && aggregated_row_stream->num_rows() != 0) {
    // Serialize the intermediate tuples into 'serialize_stream_' first: the
    // aggregate functions may hold memory (e.g. string intermediates) that must be
    // flattened into the stream before this partition can be spilled.
    DCHECK_NOTNULL(parent->serialize_stream_.get());
    DCHECK(!parent->serialize_stream_->is_pinned());
    DCHECK(parent->serialize_stream_->has_write_block());

    const vector<AggFnEvaluator*>& evaluators = parent->aggregate_evaluators_;
    // Serialize each tuple in the hash table and copy it into the new stream.
    bool failed_to_add = false;
    BufferedTupleStream* new_stream = parent->serialize_stream_.get();
    HashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get());
    while (!it.AtEnd()) {
      Tuple* tuple = it.GetTuple();
      it.Next();
      AggFnEvaluator::Serialize(evaluators, agg_fn_ctxs, tuple);
      if (UNLIKELY(!new_stream->AddRow(reinterpret_cast<TupleRow*>(&tuple)))) {
        failed_to_add = true;
        break;
      }
    }
    // Also serialize and append the in-flight intermediate tuple, if any.
    if (intermediate_tuple != NULL) {
      AggFnEvaluator::Serialize(evaluators, agg_fn_ctxs, intermediate_tuple);
      if (!failed_to_add &&
          !new_stream->AddRow(reinterpret_cast<TupleRow*>(&intermediate_tuple))) {
        failed_to_add = true;
      }
    }
    // Even if adding to the new stream failed, finish processing this partition's
    // stream to make cleanup easier: someone has to finalize these tuples, and we
    // don't want to remember where we stopped.
    if (failed_to_add) {
      parent->CleanupHashTbl(agg_fn_ctxs, it);
      hash_tbl->Close();
      hash_tbl.reset();
      aggregated_row_stream->Close();
      return parent->state_->block_mgr()->MemLimitTooLowError(parent->block_mgr_client_);
    }

    // The serialize stream now holds this partition's aggregated rows.
    aggregated_row_stream->Close();
    aggregated_row_stream.swap(parent->serialize_stream_);
    // Recreate 'serialize_stream_' now, in preparation for the next spill. The buffer
    // it needs should be available since we just freed at least one buffer from this
    // partition's (old) aggregated_row_stream.
    parent->serialize_stream_.reset(new BufferedTupleStream(parent->state_,
        *parent->intermediate_row_desc_, parent->state_->block_mgr(),
        parent->block_mgr_client_,
        false, /* use small buffers */
        true   /* delete on read */));
    Status s = parent->serialize_stream_->Init(parent->runtime_profile(), false);
    if (!s.ok()) {
      hash_tbl->Close();
      hash_tbl.reset();
      return s;
    }
    DCHECK(parent->serialize_stream_->has_write_block());
  }
  // Free the in-memory aggregate function state; the rows now live in the stream.
  for (int i = 0; i < agg_fn_ctxs.size(); ++i) {
    agg_fn_ctxs[i]->impl()->Close();
  }
  if (agg_fn_pool.get() != NULL) {
    agg_fn_pool->FreeAll();
    agg_fn_pool.reset();
  }
  if (hash_tbl.get() != NULL) {
    hash_tbl->Close();
    hash_tbl.reset();
  }
  DCHECK(aggregated_row_stream->has_write_block())
      << aggregated_row_stream->DebugString();
  COUNTER_ADD(parent->num_spilled_partitions_, 1);
  if (parent->num_spilled_partitions_->value() == 1) {
    parent->AddRuntimeExecOption("Spilled");
  }
  // Streams that contain rows must have switched to I/O-sized buffers by now.
  DCHECK(!(aggregated_row_stream->using_small_buffers() &&
           aggregated_row_stream->num_rows() > 0));
  DCHECK(!(unaggregated_row_stream->using_small_buffers() &&
           unaggregated_row_stream->num_rows() > 0));
  return Status::OK;
}
void PartitionedAggregationNode::Partition::Close(bool finalize_rows) {
  if (is_closed) return;
  is_closed = true;

  if (aggregated_row_stream.get() != NULL) {
    if (finalize_rows && hash_tbl.get() != NULL) {
      // Walk the remaining rows and Finalize() them so the UDAs get a chance to
      // clean up any memory they allocated.
      parent->CleanupHashTbl(agg_fn_ctxs, hash_tbl->Begin(parent->ht_ctx_.get()));
    }
    aggregated_row_stream->Close();
  }
  if (hash_tbl.get() != NULL) hash_tbl->Close();
  if (unaggregated_row_stream.get() != NULL) unaggregated_row_stream->Close();

  for (int i = 0; i < agg_fn_ctxs.size(); ++i) {
    agg_fn_ctxs[i]->impl()->Close();
  }
  if (agg_fn_pool.get() != NULL) agg_fn_pool->FreeAll();
}
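
// ConstructIntermediateTuple(): allocates and initializes a new intermediate tuple,
// from 'pool' when it will live in an in-memory hash table, or from 'stream' when
// the partition has already spilled.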
Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
    const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool,
    BufferedTupleStream* stream) {
  // The tuple is allocated from exactly one of 'pool' or 'stream'.
  DCHECK(stream == NULL || pool == NULL);
  DCHECK(stream != NULL || pool != NULL);

  Tuple* intermediate_tuple = NULL;
  uint8_t* buffer = NULL;
      if (ht_ctx_->last_expr_value_null(i)) continue;
      // Accumulate the size of this var-len grouping value so the whole tuple can
      // be allocated in one piece from the stream.
      StringValue* sv = reinterpret_cast<StringValue*>(ht_ctx_->last_expr_value(i));
      size += sv->len;
    }
    buffer = stream->AllocateRow(size);
    if (buffer == NULL) return NULL;
    intermediate_tuple = reinterpret_cast<Tuple*>(buffer);
    intermediate_tuple->Init(size);
  // Copy the grouping values into the intermediate tuple.
  vector<SlotDescriptor*>::const_iterator slot_desc =
      intermediate_tuple_desc_->slots().begin();
  for (int i = 0; i < probe_expr_ctxs_.size(); ++i, ++slot_desc) {
    if (ht_ctx_->last_expr_value_null(i)) {
      intermediate_tuple->SetNull((*slot_desc)->null_indicator_offset());
    } else {
      void* src = ht_ctx_->last_expr_value(i);
      void* dst = intermediate_tuple->GetSlot((*slot_desc)->tuple_offset());
      if (stream == NULL) {
    // Skip over non-materialized slots, then initialize this aggregate value.
    while (!(*slot_desc)->is_materialized()) ++slot_desc;
    AggFnEvaluator* evaluator = aggregate_evaluators_[i];
    evaluator->Init(agg_fn_ctxs[i], intermediate_tuple);
    // To avoid branching on the UpdateTuple() hot path, pre-initialize fixed-size
    // MIN/MAX slots so the update never sees a NULL destination: MIN starts at the
    // type's maximum value, MAX at its minimum.
    if ((*slot_desc)->type().type != TYPE_STRING &&
        (*slot_desc)->type().type != TYPE_TIMESTAMP &&
        (*slot_desc)->type().type != TYPE_CHAR &&
        (*slot_desc)->type().type != TYPE_DECIMAL) {
      ExprValue default_value;
      void* default_value_ptr = NULL;
      switch (evaluator->agg_op()) {
        case AggFnEvaluator::MIN:
          default_value_ptr = default_value.SetToMax((*slot_desc)->type());
          RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
          break;
        case AggFnEvaluator::MAX:
          default_value_ptr = default_value.SetToMin((*slot_desc)->type());
          RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
          break;
        default:
          break;
      }
    }
  }
  return intermediate_tuple;
}
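
// GetOutputTuple(): projects the grouping slots and finalized aggregate values of
// an intermediate tuple onto the output tuple.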
  // Copy the grouping values from the intermediate tuple to the output tuple.
  for (int i = 0; i < num_grouping_slots; ++i) {
    void* src_slot = NULL;
void PartitionedAggregationNode::DebugString(int indentation_level,
    stringstream* out) const {
  *out << string(indentation_level * 2, ' ');
  *out << "PartitionedAggregationNode("
int64_t PartitionedAggregationNode::LargestSpilledPartition() const {
  int64_t max_rows = 0;
  for (int i = 0; i < hash_partitions_.size(); ++i) {
    Partition* partition = hash_partitions_[i];
    if (!partition->is_spilled()) continue;
    int64_t rows = partition->aggregated_row_stream->num_rows() +
        partition->unaggregated_row_stream->num_rows();
    if (rows > max_rows) max_rows = rows;
  }
  return max_rows;
}
  if (partition == NULL) {
    // Pick a spilled partition to repartition.
    DCHECK(!spilled_partitions_.empty());
    partition = spilled_partitions_.front();
    DCHECK(partition->is_spilled());
    // This partition's rows may have been spilled into two streams: aggregated
    // intermediates and unaggregated input rows. Repartition both.
    RETURN_IF_ERROR(ProcessStream<true>(partition->aggregated_row_stream.get()));
    RETURN_IF_ERROR(ProcessStream<false>(partition->unaggregated_row_stream.get()));

    COUNTER_ADD(num_row_repartitioned_, partition->aggregated_row_stream->num_rows());
    COUNTER_ADD(num_row_repartitioned_,
        partition->unaggregated_row_stream->num_rows());

    partition->Close(false);

    int64_t num_input_rows = partition->aggregated_row_stream->num_rows() +
        partition->unaggregated_row_stream->num_rows();
    // Check that repartitioning actually reduced the largest partition; if it did
    // not, repartitioning again cannot help and we fail the query.
    int64_t largest_partition = LargestSpilledPartition();
    DCHECK_GE(num_input_rows, largest_partition) << "Cannot have a partition with "
        "more rows than the input";
    if (num_input_rows == largest_partition) {
      Status status = Status::MEM_LIMIT_EXCEEDED;
      status.AddDetail(Substitute("Cannot perform aggregation at node with id $0. "
          "Repartitioning did not reduce the size of a spilled partition. "
          "Repartitioning level $1. Number of rows $2.",
          id_, partition->level + 1, num_input_rows));
      return status;
    }
  DCHECK(partition->hash_tbl.get() != NULL);
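
// ProcessStream(): reads every row from 'input_stream' and re-aggregates it by
// calling ProcessBatch() against the current hash partitions.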
template<bool AGGREGATED_ROWS>
Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stream) {
  if (input_stream->num_rows() > 0) {
    while (true) {
      bool got_buffer = false;
      RETURN_IF_ERROR(input_stream->PrepareForRead(&got_buffer));
      if (got_buffer) break;
      // Not enough memory to read this stream; spill a partition and retry.
      RETURN_IF_ERROR(SpillPartition());
    }

    bool eos = false;
    RowBatch batch(AGGREGATED_ROWS ? *intermediate_row_desc_ : children_[0]->row_desc(),
        state_->batch_size(), mem_tracker());
    do {
      RETURN_IF_ERROR(input_stream->GetNext(&batch, &eos));
      RETURN_IF_ERROR(ProcessBatch<AGGREGATED_ROWS>(&batch, ht_ctx_.get()));
      batch.Reset();
    } while (!eos);
  }
  input_stream->Close();
  return Status::OK;
}
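
// SpillPartition(): picks the partition whose spill would free the most memory and
// spills it. 'curr_partition'/'intermediate_tuple' identify the partition currently
// being built, so its in-flight tuple can be serialized along with it.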
Status PartitionedAggregationNode::SpillPartition(Partition* curr_partition,
    Tuple* intermediate_tuple) {
  int64_t max_freed_mem = 0;
  int partition_idx = -1;

  if (using_small_buffers_) {
    // Spilling requires I/O-sized buffers, so switch every partition's streams over
    // from small buffers first.
    for (int i = 0; i < hash_partitions_.size(); ++i) {
      DCHECK(hash_partitions_[i]->aggregated_row_stream->using_small_buffers());
      DCHECK(hash_partitions_[i]->unaggregated_row_stream->using_small_buffers());
      bool got_buffer;
      RETURN_IF_ERROR(
          hash_partitions_[i]->aggregated_row_stream->SwitchToIoBuffers(&got_buffer));
      if (got_buffer) {
        RETURN_IF_ERROR(
            hash_partitions_[i]->unaggregated_row_stream->SwitchToIoBuffers(&got_buffer));
      }
      if (!got_buffer) {
        Status status = Status::MEM_LIMIT_EXCEEDED;
        status.AddDetail("Not enough memory to get the minimum required buffers for "
            "aggregation.");
        return status;
      }
    }
    using_small_buffers_ = false;
  }
  // Pick the candidate partition that would free the most memory when spilled.
  for (int i = 0; i < hash_partitions_.size(); ++i) {
    int64_t mem = hash_partitions_[i]->aggregated_row_stream->bytes_in_mem(true);
    if (mem > max_freed_mem) {
      max_freed_mem = mem;
      partition_idx = i;
    }
  }
  if (partition_idx == -1) {
    // No partition could be spilled: the memory limit is simply too low.
    return state_->block_mgr()->MemLimitTooLowError(block_mgr_client_);
  }

  Partition* spilled_partition = hash_partitions_[partition_idx];
  RETURN_IF_ERROR(spilled_partition->Spill(
      spilled_partition == curr_partition ? intermediate_tuple : NULL));
  return Status::OK;
}
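
// MoveHashPartitions(): logs how the input rows were partitioned and moves each
// partition from hash_partitions_ onto spilled_partitions_ or
// aggregated_partitions_ for GetNext() to drain.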
Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
  DCHECK(!hash_partitions_.empty());
  stringstream ss;
  ss << "PA(node_id=" << id() << ") partitioned(level="
     << hash_partitions_[0]->level << ") "
     << num_input_rows << " rows into:" << endl;
  for (int i = 0; i < hash_partitions_.size(); ++i) {
    Partition* partition = hash_partitions_[i];
    int64_t aggregated_rows = partition->aggregated_row_stream->num_rows();
    int64_t unaggregated_rows = partition->unaggregated_row_stream->num_rows();
    double total_rows = aggregated_rows + unaggregated_rows;
    double percent = total_rows * 100 / num_input_rows;
    ss << "  " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled")
       << " (fraction=" << fixed << setprecision(2) << percent << "%)" << endl
       << "    #aggregated rows:" << aggregated_rows << endl
       << "    #unaggregated rows: " << unaggregated_rows << endl;
    if (total_rows == 0) {
      partition->Close(false);
    } else if (partition->is_spilled()) {
      DCHECK(partition->hash_tbl.get() == NULL);
      spilled_partitions_.push_back(partition);
    } else {
      aggregated_partitions_.push_back(partition);
    }
  }
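
// CodegenUpdateSlot(): emits an UpdateSlot() IR function for one (evaluator, slot)
// pair. Roughly, the generated function behaves like this (a sketch, not literal IR):
//
//   void UpdateSlot(FunctionContext* fn_ctx, AggTuple* agg_tuple, TupleRow* row) {
//     SrcVal src = <codegen'd input expr>(row);
//     if (src.is_null) return;                          // NULL inputs are skipped
//     agg_tuple->slot = <agg op>(agg_tuple->slot, src); // e.g. add, min, max
//   }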
  Expr* input_expr = input_expr_ctx->root();

  // Codegen the input expression's compute function.
  Function* agg_expr_fn;
  if (!input_expr->GetCodegendComputeFn(state_, &agg_expr_fn).ok()) return NULL;
  DCHECK(agg_expr_fn != NULL);

  PointerType* fn_ctx_type =
      codegen->GetPtrType(FunctionContextImpl::LLVM_FUNCTIONCONTEXT_NAME);
  StructType* tuple_struct = intermediate_tuple_desc_->GenerateLlvmStruct(codegen);
  PointerType* tuple_ptr_type = PointerType::get(tuple_struct, 0);

  // UpdateSlot() takes the function context, the intermediate tuple and the input row.
  Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
  Value* fn_ctx_arg = args[0];
  Value* agg_tuple_arg = args[1];
  Value* row_arg = args[2];
  BasicBlock* src_not_null_block =
      BasicBlock::Create(codegen->context(), "src_not_null", fn);
  BasicBlock* ret_block = BasicBlock::Create(codegen->context(), "ret", fn);
  // Evaluate the input expr for this row; branch to 'ret_block' if the value is NULL.
  Value* expr_ctx = codegen->CastPtrToLlvmPtr(
      codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME), input_expr_ctx);
  Value* agg_expr_fn_args[] = { expr_ctx, row_arg };
  CodegenAnyVal src = CodegenAnyVal::CreateCallWrapped(
      codegen, &builder, input_expr->type(), agg_expr_fn, agg_expr_fn_args, "src");

  Value* src_is_null = src.GetIsNull();
  builder.CreateCondBr(src_is_null, ret_block, src_not_null_block);
  // 'src' is not NULL; compute the updated value for the destination slot.
  builder.SetInsertPoint(src_not_null_block);
  Value* dst_ptr =
      builder.CreateStructGEP(agg_tuple_arg, slot_desc->field_idx(), "dst_slot_ptr");
  Value* result = NULL;

  if (slot_desc->is_nullable()) {
    // The destination slot may still be marked NULL; clear its null bit first.
    Function* clear_null_fn = slot_desc->CodegenUpdateNull(codegen, tuple_struct, false);
    builder.CreateCall(clear_null_fn, agg_tuple_arg);
  }

  Value* dst_value = builder.CreateLoad(dst_ptr, "dst_val");
  switch (evaluator->agg_op()) {
    case AggFnEvaluator::COUNT:
      if (evaluator->is_merge()) {
        // Merging partial counts: add the source count.
        result = builder.CreateAdd(dst_value, src.GetVal(), "count_sum");
      } else {
        // count(col): increment by one for each non-NULL input.
        result = builder.CreateAdd(dst_value,
            codegen->GetIntConstant(TYPE_BIGINT, 1), "count_inc");
      }
      break;
    case AggFnEvaluator::MIN: {
      Function* min_fn = codegen->CodegenMinMax(slot_desc->type(), true);
      Value* min_args[] = { dst_value, src.GetVal() };
      result = builder.CreateCall(min_fn, min_args, "min_value");
      break;
    }
    case AggFnEvaluator::MAX: {
      Function* max_fn = codegen->CodegenMinMax(slot_desc->type(), false);
      Value* max_args[] = { dst_value, src.GetVal() };
      result = builder.CreateCall(max_fn, max_args, "max_value");
      break;
    }
    case AggFnEvaluator::SUM:
      if (slot_desc->type().type == TYPE_FLOAT || slot_desc->type().type == TYPE_DOUBLE) {
        result = builder.CreateFAdd(dst_value, src.GetVal());
      } else {
        result = builder.CreateAdd(dst_value, src.GetVal());
      }
      break;
    case AggFnEvaluator::NDV: {
      // Aggregate ops without a direct IR translation call the cross-compiled
      // Update()/Merge() symbol, passing pointers to the unlowered *Val types.
      const string& symbol = evaluator->is_merge() ?
          evaluator->merge_symbol() : evaluator->update_symbol();
      Function* ir_fn = codegen->module()->getFunction(symbol);
      DCHECK_NOTNULL(ir_fn);

      // Create a pointer to 'src' to pass to ir_fn, cast to the unlowered type.
      Value* src_lowered_ptr = codegen->CreateEntryBlockAlloca(
          fn, LlvmCodeGen::NamedVariable("src_lowered_ptr", src.value()->getType()));
      builder.CreateStore(src.value(), src_lowered_ptr);
      Type* unlowered_ptr_type =
          CodegenAnyVal::GetUnloweredPtrType(codegen, input_expr->type());
      Value* src_unlowered_ptr =
          builder.CreateBitCast(src_lowered_ptr, unlowered_ptr_type, "src_unlowered_ptr");

      // Load the destination slot into a *Val and create a pointer to it as well.
      const ColumnType& dst_type = evaluator->intermediate_type();
      CodegenAnyVal dst = CodegenAnyVal::GetNonNullVal(
          codegen, &builder, dst_type, "dst");
      dst.SetFromRawValue(dst_value);
      Value* dst_lowered_ptr = codegen->CreateEntryBlockAlloca(
          fn, LlvmCodeGen::NamedVariable("dst_lowered_ptr", dst.value()->getType()));
      builder.CreateStore(dst.value(), dst_lowered_ptr);
      unlowered_ptr_type = CodegenAnyVal::GetUnloweredPtrType(codegen, dst_type);
      Value* dst_unlowered_ptr =
          builder.CreateBitCast(dst_lowered_ptr, unlowered_ptr_type, "dst_unlowered_ptr");

      // Call the UDA Update/Merge function and convert the result back to a raw value.
      builder.CreateCall3(ir_fn, fn_ctx_arg, src_unlowered_ptr, dst_unlowered_ptr);

      Value* anyval_result = builder.CreateLoad(dst_lowered_ptr, "anyval_result");
      result = CodegenAnyVal(codegen, &builder, dst_type, anyval_result).ToNativeValue();
      break;
    }
    default:
      DCHECK(false) << "bad aggregate operator: " << evaluator->agg_op();
  }
  builder.CreateStore(result, dst_ptr);
  builder.CreateBr(ret_block);

  builder.SetInsertPoint(ret_block);
  builder.CreateRetVoid();

  return codegen->FinalizeFunction(fn);
}
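
// CodegenUpdateTuple(): generates UpdateTuple(), which invokes the codegen'd
// UpdateSlot() for every aggregate function; count(*) is special-cased to an inline
// increment. Returns NULL if codegen is unsuccessful, e.g. when an intermediate
// type is not yet supported.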
  bool supported = true;
    VLOG_QUERY << "Could not codegen UpdateTuple because intermediate type "
               << slot_desc->type()
               << " is not yet supported for aggregate function \""
               << evaluator->fn_name() << "()\"";
    return NULL;
    VLOG_QUERY << "Could not codegen UpdateTuple because we could not "
               << "generate a matching llvm struct for the intermediate tuple.";
    return NULL;
  PointerType* agg_node_ptr_type = agg_node_type->getPointerTo();
  PointerType* fn_ctx_ptr_ptr_type = fn_ctx_type->getPointerTo()->getPointerTo();
  PointerType* tuple_ptr_type = tuple_type->getPointerTo();
  PointerType* tuple_row_ptr_type = tuple_row_type->getPointerTo();

  PointerType* tuple_ptr = PointerType::get(tuple_struct, 0);
  Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
  Value* agg_fn_ctxs_arg = args[1];
  Value* tuple_arg = args[2];
  Value* row_arg = args[3];

  // Cast the opaque Tuple* argument to the generated llvm struct type.
  tuple_arg = builder.CreateBitCast(tuple_arg, tuple_ptr, "tuple");
      // count(*) is updated inline: load the slot, increment it, store it back.
      Value* slot_ptr = builder.CreateStructGEP(tuple_arg, field_idx, "src_slot");
      Value* slot_loaded = builder.CreateLoad(slot_ptr, "count_star_val");
      Value* count_inc = builder.CreateAdd(slot_loaded, const_one, "count_star_inc");
      builder.CreateStore(count_inc, slot_ptr);
      Function* update_slot_fn = CodegenUpdateSlot(evaluator, slot_desc);
      if (update_slot_fn == NULL) return NULL;
      Value* fn_ctx_ptr = builder.CreateConstGEP1_32(agg_fn_ctxs_arg, i);
      Value* fn_ctx = builder.CreateLoad(fn_ctx_ptr, "fn_ctx");
      builder.CreateCall3(update_slot_fn, fn_ctx, tuple_arg, row_arg);
  builder.CreateRetVoid();
  return codegen->FinalizeFunction(fn);
}
  Function* update_tuple_fn = CodegenUpdateTuple();
  if (update_tuple_fn == NULL) return NULL;

  // Get the cross-compiled ProcessBatch IR function to specialize.
  IRFunction::Type ir_fn = (!probe_expr_ctxs_.empty() ?
      IRFunction::PART_AGG_NODE_PROCESS_BATCH_FALSE :
      IRFunction::PART_AGG_NODE_PROCESS_BATCH_NO_GROUPING);
  Function* process_batch_fn = codegen->GetFunction(ir_fn);
  DCHECK(process_batch_fn != NULL);
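
// Specialize the cross-compiled ProcessBatch: replace the interpreted helper call
// sites (row evaluation, hashing, equality, tuple update) with the codegen'd
// versions generated above.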
  int replaced = 0;
  if (!probe_expr_ctxs_.empty()) {
    Function* hash_fn = ht_ctx_->CodegenHashCurrentRow(state_, false);
    if (hash_fn == NULL) return NULL;

    Function* equals_fn = ht_ctx_->CodegenEquals(state_);
    if (equals_fn == NULL) return NULL;

    Function* eval_probe_row_fn = ht_ctx_->CodegenEvalRow(state_, false);
    if (eval_probe_row_fn == NULL) return NULL;

    process_batch_fn = codegen->ReplaceCallSites(process_batch_fn, false,
        eval_probe_row_fn, "EvalProbeRow", &replaced);
    DCHECK_EQ(replaced, 1);

    process_batch_fn = codegen->ReplaceCallSites(process_batch_fn, true,
        hash_fn, "HashCurrentRow", &replaced);
    DCHECK_EQ(replaced, 1);

    process_batch_fn = codegen->ReplaceCallSites(process_batch_fn, true,
        equals_fn, "Equals", &replaced);
    DCHECK_EQ(replaced, 3);
  }

  process_batch_fn = codegen->ReplaceCallSites(process_batch_fn, false,
      update_tuple_fn, "UpdateTuple", &replaced);
  DCHECK_GE(replaced, 1);
  DCHECK(process_batch_fn != NULL);
  return codegen->OptimizeFunctionWithExprs(process_batch_fn);
}