19 #include <boost/functional/hash.hpp>
20 #include <thrift/protocol/TDebugProtocol.h>
22 #include <x86intrin.h>
43 #include "gen-cpp/Exprs_types.h"
44 #include "gen-cpp/PlanNodes_types.h"
48 using namespace impala;
59 intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
60 intermediate_tuple_desc_(NULL),
61 output_tuple_id_(tnode.agg_node.output_tuple_id),
62 output_tuple_desc_(NULL),
63 singleton_intermediate_tuple_(NULL),
64 codegen_process_row_batch_fn_(NULL),
65 process_row_batch_fn_(NULL),
66 needs_finalize_(tnode.agg_node.need_finalize),
68 get_results_timer_(NULL),
69 hash_table_buckets_counter_(NULL) {
76 for (
int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
79 pool_, tnode.agg_node.aggregate_functions[i], &evaluator));
160 if (update_tuple_fn != NULL) {
189 int64_t num_input_rows = 0;
198 for (
int i = 0; i < batch.num_rows(); ++i) {
212 num_input_rows += batch.num_rows();
225 VLOG_FILE <<
"aggregated " << num_input_rows <<
" input rows into "
250 if (count++ % N == 0) {
254 int row_idx = row_batch->
AddRow();
257 Tuple* output_tuple =
274 DCHECK(
false) <<
"NYI";
285 Tuple* dummy_dst = NULL;
317 vector<SlotDescriptor*>::const_iterator slot_desc =
322 if (
hash_tbl_->last_expr_value_null(i)) {
323 intermediate_tuple->
SetNull((*slot_desc)->null_indicator_offset());
325 void* src =
hash_tbl_->last_expr_value(i);
326 void* dst = intermediate_tuple->
GetSlot((*slot_desc)->tuple_offset());
333 while (!(*slot_desc)->is_materialized()) ++slot_desc;
348 (*slot_desc)->type().type !=
TYPE_CHAR &&
351 void* default_value_ptr = NULL;
352 switch (evaluator->
agg_op()) {
354 default_value_ptr = default_value.
SetToMax((*slot_desc)->type());
355 RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
358 default_value_ptr = default_value.
SetToMin((*slot_desc)->type());
359 RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
366 return intermediate_tuple;
389 for (
int i = 0; i < num_grouping_slots; ++i) {
393 void* src_slot = NULL;
402 *out << string(indentation_level * 2,
' ');
403 *out <<
"AggregationNode("
415 case TYPE_BOOLEAN:
return IRFunction::HLL_UPDATE_BOOLEAN;
416 case TYPE_TINYINT:
return IRFunction::HLL_UPDATE_TINYINT;
418 case TYPE_INT:
return IRFunction::HLL_UPDATE_INT;
419 case TYPE_BIGINT:
return IRFunction::HLL_UPDATE_BIGINT;
420 case TYPE_FLOAT:
return IRFunction::HLL_UPDATE_FLOAT;
421 case TYPE_DOUBLE:
return IRFunction::HLL_UPDATE_DOUBLE;
422 case TYPE_STRING:
return IRFunction::HLL_UPDATE_STRING;
423 case TYPE_DECIMAL:
return IRFunction::HLL_UPDATE_DECIMAL;
425 DCHECK(
false) <<
"Unsupported type: " << type;
426 return IRFunction::FN_END;
514 Expr* input_expr = input_expr_ctx->root();
517 Function* agg_expr_fn;
523 DCHECK(agg_expr_fn != NULL);
525 PointerType* fn_ctx_type =
528 PointerType* tuple_ptr_type = PointerType::get(tuple_struct, 0);
539 Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
540 Value* fn_ctx_arg = args[0];
541 Value* agg_tuple_arg = args[1];
542 Value* row_arg = args[2];
544 BasicBlock* src_not_null_block =
545 BasicBlock::Create(codegen->context(),
"src_not_null", fn);
546 BasicBlock* ret_block = BasicBlock::Create(codegen->context(),
"ret", fn);
549 Value* ctx_arg = codegen->CastPtrToLlvmPtr(
551 Value* agg_expr_fn_args[] = { ctx_arg, row_arg };
553 codegen, &builder, input_expr->
type(), agg_expr_fn, agg_expr_fn_args,
"src");
556 builder.CreateCondBr(src_is_null, ret_block, src_not_null_block);
559 builder.SetInsertPoint(src_not_null_block);
561 builder.CreateStructGEP(agg_tuple_arg, slot_desc->
field_idx(),
"dst_slot_ptr");
562 Value* result = NULL;
566 Function* clear_null_fn = slot_desc->
CodegenUpdateNull(codegen, tuple_struct,
false);
567 builder.CreateCall(clear_null_fn, agg_tuple_arg);
571 Value* dst_value = builder.CreateLoad(dst_ptr,
"dst_val");
572 switch (evaluator->
agg_op()) {
575 result = builder.CreateAdd(dst_value, src.
GetVal(),
"count_sum");
577 result = builder.CreateAdd(dst_value,
578 codegen->GetIntConstant(
TYPE_BIGINT, 1),
"count_inc");
582 Function* min_fn = codegen->CodegenMinMax(slot_desc->
type(),
true);
583 Value* min_args[] = { dst_value, src.
GetVal() };
584 result = builder.CreateCall(min_fn, min_args,
"min_value");
588 Function* max_fn = codegen->CodegenMinMax(slot_desc->
type(),
false);
589 Value* max_args[] = { dst_value, src.
GetVal() };
590 result = builder.CreateCall(max_fn, max_args,
"max_value");
595 result = builder.CreateFAdd(dst_value, src.
GetVal());
597 result = builder.CreateAdd(dst_value, src.
GetVal());
602 IRFunction::Type ir_function_type = evaluator->
is_merge() ? IRFunction::HLL_MERGE
604 Function* hll_fn = codegen->GetFunction(ir_function_type);
608 Value* src_lowered_ptr = codegen->CreateEntryBlockAlloca(
610 builder.CreateStore(src.
value(), src_lowered_ptr);
611 Type* unlowered_ptr_type =
613 Value* src_unlowered_ptr =
614 builder.CreateBitCast(src_lowered_ptr, unlowered_ptr_type,
"src_unlowered_ptr");
622 Value* dst_lowered_ptr = codegen->CreateEntryBlockAlloca(
624 dst_stringval.
value()->getType()));
625 builder.CreateStore(dst_stringval.
value(), dst_lowered_ptr);
628 Value* dst_unlowered_ptr =
629 builder.CreateBitCast(dst_lowered_ptr, unlowered_ptr_type,
"dst_unlowered_ptr");
632 builder.CreateCall3(hll_fn, fn_ctx_arg, src_unlowered_ptr, dst_unlowered_ptr);
635 Value* anyval_result = builder.CreateLoad(dst_lowered_ptr,
"anyval_result");
641 DCHECK(
false) <<
"bad aggregate operator: " << evaluator->
agg_op();
644 builder.CreateStore(result, dst_ptr);
645 builder.CreateBr(ret_block);
647 builder.SetInsertPoint(ret_block);
648 builder.CreateRetVoid();
650 return codegen->FinalizeFunction(fn);
701 VLOG_QUERY <<
"Could not codegen UpdateIntermediateTuple because "
702 <<
"string, char, timestamp and decimal are not yet supported.";
// The two literals below are streamed back to back, so the first one must end
// with a space; otherwise the logged text reads "we couldnot generate ...".
711 VLOG_QUERY <<
"Could not codegen UpdateTuple because we could "
712 <<
"not generate a matching llvm struct for the intermediate tuple.";
721 DCHECK(agg_node_type != NULL);
722 DCHECK(agg_tuple_type != NULL);
723 DCHECK(tuple_row_type != NULL);
725 PointerType* agg_node_ptr_type = PointerType::get(agg_node_type, 0);
726 PointerType* agg_tuple_ptr_type = PointerType::get(agg_tuple_type, 0);
727 PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
733 PointerType* tuple_ptr = PointerType::get(tuple_struct, 0);
741 Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
745 args[1] = builder.CreateBitCast(args[1], tuple_ptr,
"tuple");
763 Value* slot_ptr = builder.CreateStructGEP(args[1], field_idx,
"src_slot");
764 Value* slot_loaded = builder.CreateLoad(slot_ptr,
"count_star_val");
765 Value* count_inc = builder.CreateAdd(slot_loaded, const_one,
"count_star_inc");
766 builder.CreateStore(count_inc, slot_ptr);
769 if (update_slot_fn == NULL)
return NULL;
773 builder.CreateCall3(update_slot_fn, fn_ctx_arg, args[1], args[2]);
776 builder.CreateRetVoid();
787 DCHECK(update_tuple_fn != NULL);
791 IRFunction::AGG_NODE_PROCESS_ROW_BATCH_WITH_GROUPING :
792 IRFunction::AGG_NODE_PROCESS_ROW_BATCH_NO_GROUPING);
793 Function* process_batch_fn = codegen->
GetFunction(ir_fn);
795 if (process_batch_fn == NULL) {
796 LOG(ERROR) <<
"Could not find AggregationNode::ProcessRowBatch in module.";
805 Function* hash_fn =
hash_tbl_->CodegenHashCurrentRow(state);
806 if (hash_fn == NULL)
return NULL;
809 Function* equals_fn =
hash_tbl_->CodegenEquals(state);
810 if (equals_fn == NULL)
return NULL;
813 Function* eval_build_row_fn =
hash_tbl_->CodegenEvalTupleRow(state,
true);
814 if (eval_build_row_fn == NULL)
return NULL;
817 Function* eval_probe_row_fn =
hash_tbl_->CodegenEvalTupleRow(state,
false);
818 if (eval_probe_row_fn == NULL)
return NULL;
822 eval_build_row_fn,
"EvalBuildRow", &replaced);
823 DCHECK_EQ(replaced, 1);
826 eval_probe_row_fn,
"EvalProbeRow", &replaced);
827 DCHECK_EQ(replaced, 1);
830 hash_fn,
"HashCurrentRow", &replaced);
831 DCHECK_EQ(replaced, 2);
834 equals_fn,
"Equals", &replaced);
835 DCHECK_EQ(replaced, 1);
839 update_tuple_fn,
"UpdateTuple", &replaced);
840 DCHECK_EQ(replaced, 1) <<
"One call site should be replaced.";
841 DCHECK(process_batch_fn != NULL);
static const char * LLVM_CLASS_NAME
TupleDescriptor * output_tuple_desc_
void SetFromRawValue(llvm::Value *raw_val)
void AddRuntimeExecOption(const std::string &option)
Appends option to 'runtime_exec_options_'.
const std::string GetDetail() const
static CodegenAnyVal CreateCallWrapped(LlvmCodeGen *cg, LlvmCodeGen::LlvmBuilder *builder, const ColumnType &type, llvm::Function *fn, llvm::ArrayRef< llvm::Value * > args, const char *name="", llvm::Value *result_ptr=NULL)
Same as above but wraps the result in a CodegenAnyVal.
void SetNull(const NullIndicatorOffset &offset)
RuntimeProfile::Counter * codegen_timer()
llvm::PointerType * GetPtrType(llvm::Type *type)
Return a pointer type to 'type'.
int64_t num_rows_returned_
virtual Status Prepare(RuntimeState *state)
std::vector< ExprContext * > build_expr_ctxs_
MemTracker * mem_tracker()
llvm::Function * CodegenUpdateSlot(RuntimeState *state, AggFnEvaluator *evaluator, SlotDescriptor *slot_desc)
AggregationNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
Utility struct that wraps a variable name and llvm type.
boost::scoped_ptr< RuntimeProfile > runtime_profile_
static Status Open(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for opening multiple expr trees.
RuntimeProfile::Counter * build_timer_
Time spent processing the child rows.
A tuple with 0 materialised slots is represented as NULL.
void * SetToMin(const ColumnType &type)
Sets the value for type to min and returns a pointer to the data.
static llvm::Type * GetUnloweredType(LlvmCodeGen *cg, const ColumnType &type)
#define RETURN_IF_ERROR(stmt)
Some generally useful macros.
std::string DebugString() const
virtual Status Init(const TPlanNode &tnode)
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
TupleRow * GetRow(int row_idx)
bool AtEnd() const
Returns true if this iterator is at the end, i.e. GetRow() cannot be called.
The materialized value returned by ExprContext::GetValue().
#define ADD_TIMER(profile, name)
TupleId intermediate_tuple_id_
Tuple into which Update()/Merge()/Serialize() results are stored.
llvm::Value * ToNativeValue()
void * GetSlot(int offset)
bool is_count_star() const
const std::vector< SlotDescriptor * > & slots() const
void Add(FunctionContext *agg_fn_ctx, TupleRow *src, Tuple *dst)
static Status Create(ObjectPool *pool, const TExpr &desc, AggFnEvaluator **result)
llvm::StructType * GenerateLlvmStruct(LlvmCodeGen *codegen)
const RowDescriptor & row_desc() const
static Tuple * Create(int size, MemPool *pool)
Initializes an individual tuple with data residing in the mem pool.
const NullIndicatorOffset & null_indicator_offset() const
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
Tuple * ConstructIntermediateTuple()
void * SetToMax(const ColumnType &type)
Sets the value for type to max and returns a pointer to the data.
AggregationOp agg_op() const
TupleDescriptor * GetTupleDescriptor(TupleId id) const
static CodegenAnyVal GetNonNullVal(LlvmCodeGen *codegen, LlvmCodeGen::LlvmBuilder *builder, const ColumnType &type, const char *name="")
Tuple * singleton_intermediate_tuple_
static void Close(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for closing multiple expr trees.
bool IsNull(const NullIndicatorOffset &offset) const
virtual Status Reset(RuntimeState *state)
LLVM code generator. This is the top level object to generate jitted code.
llvm::Function * CodegenUpdateTuple(RuntimeState *state)
Codegen UpdateTuple(). Returns NULL if codegen is unsuccessful.
void ProcessRowBatchWithGrouping(RowBatch *batch)
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
MemTracker * expr_mem_tracker()
static const char * LLVM_CLASS_NAME
static const char * LLVM_CLASS_NAME
llvm::Value * CastPtrToLlvmPtr(llvm::Type *type, const void *ptr)
void AddArgument(const NamedVariable &var)
Add argument.
std::vector< ExprContext * > probe_expr_ctxs_
Exprs used to evaluate input rows.
static const char * LLVM_CLASS_NAME
For C++/IR interop, we need to be able to look up types by name.
void Serialize(FunctionContext *agg_fn_ctx, Tuple *dst)
const ColumnType & type() const
ObjectPool * obj_pool() const
The hash table does not support removes. The hash table is not thread safe.
static const char * LLVM_FUNCTIONCONTEXT_NAME
#define RETURN_IF_CANCELLED(state)
void Init(FunctionContext *agg_fn_ctx, Tuple *dst)
Functions for different phases of the aggregation.
virtual Status Prepare(RuntimeState *state)
#define ADD_COUNTER(profile, name, unit)
void UpdateTuple(Tuple *tuple, TupleRow *row)
llvm::Function * GetFunction(IRFunction::Type)
void ProcessRowBatchNoGrouping(RowBatch *batch)
Do the aggregation for all tuple rows in the batch.
llvm::Function * codegen_process_row_batch_fn_
IR for process row batch. NULL if codegen is disabled.
void AddFunctionToJit(llvm::Function *fn, void **fn_ptr)
llvm::Function * CodegenUpdateNull(LlvmCodeGen *, llvm::StructType *tuple, bool set_null)
static void Write(const void *value, Tuple *tuple, const SlotDescriptor *slot_desc, MemPool *pool)
virtual Status QueryMaintenance(RuntimeState *state)
This is the superclass of all expr evaluation nodes.
const DescriptorTbl & desc_tbl() const
std::vector< ExecNode * > children_
#define COUNTER_SET(c, v)
virtual Status Open(RuntimeState *state)
MemPool * tuple_data_pool()
RuntimeProfile::Counter * rows_returned_counter_
std::vector< AggFnEvaluator * > aggregate_evaluators_
void IR_ALWAYS_INLINE Next()
IRFunction::Type GetHllUpdateFunction2(const ColumnType &type)
void SetTuple(int tuple_idx, Tuple *tuple)
const ColumnType & type() const
llvm::Value * value()
Returns the current type-lowered value.
bool codegen_enabled() const
Returns true if codegen is enabled for this query.
Reference to a single slot of a tuple.
llvm::Value * GetVal(const char *name="val")
virtual Status GetCodegendComputeFn(RuntimeState *state, llvm::Function **fn)=0
llvm::Type * GetType(const ColumnType &type)
Returns llvm type for the column type.
std::vector< impala_udf::FunctionContext * > agg_fn_ctxs_
FunctionContext for each agg fn and backing pool.
Status GetCodegen(LlvmCodeGen **codegen, bool initialize=true)
Tuple * FinalizeTuple(Tuple *tuple, MemPool *pool)
int field_idx() const
Returns the field index in the generated llvm struct for this slot's tuple.
llvm::Value * GetIsNull(const char *name="is_null")
Gets the 'is_null' field of the *Val.
llvm::Value * GetIntConstant(PrimitiveType type, int64_t val)
Returns the constant 'val' of 'type'.
static Status CreateExprTrees(ObjectPool *pool, const std::vector< TExpr > &texprs, std::vector< ExprContext * > *ctxs)
llvm::Function * FinalizeFunction(llvm::Function *function)
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
bool is_materialized() const
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
OldHashTable::Iterator output_iterator_
boost::scoped_ptr< MemPool > agg_fn_pool_
virtual Status Open(RuntimeState *state)
llvm::Function * ReplaceCallSites(llvm::Function *caller, bool update_in_place, llvm::Function *new_fn, const std::string &target_name, int *num_replaced)
RuntimeProfile::Counter * get_results_timer_
Time spent returning the aggregated rows.
string PrintRow(TupleRow *row, const RowDescriptor &d)
RuntimeProfile::Counter * hash_table_load_factor_counter_
Load factor in hash table.
ProcessRowBatchFn process_row_batch_fn_
Jitted ProcessRowBatch function pointer. Null if codegen is disabled.
TupleDescriptor * intermediate_tuple_desc_
RuntimeProfile::Counter * hash_table_buckets_counter_
Num buckets in hash table.
boost::scoped_ptr< MemPool > tuple_pool_
std::vector< ExprContext * > conjunct_ctxs_
virtual void Close(RuntimeState *state)
llvm::LLVMContext & context()
virtual std::string DebugString() const
llvm::Function * OptimizeFunctionWithExprs(llvm::Function *fn)
llvm::Function * CodegenProcessRowBatch(RuntimeState *state, llvm::Function *update_tuple_fn)
virtual void Close(RuntimeState *state)
boost::scoped_ptr< OldHashTable > hash_tbl_
RuntimeProfile * runtime_profile()
virtual Status Init(const TPlanNode &tnode)
void Finalize(FunctionContext *agg_fn_ctx, Tuple *src, Tuple *dst)
const std::vector< ExprContext * > & input_expr_ctxs() const