27 #include "gen-cpp/PlanNodes_types.h"
29 using namespace boost;
30 using namespace impala;
34 const char* HashJoinNode::LLVM_CLASS_NAME =
"class.impala::HashJoinNode";
36 HashJoinNode::HashJoinNode(
39 join_op_(tnode.hash_join_node.join_op),
41 codegen_process_build_batch_fn_(NULL),
42 process_build_batch_fn_(NULL),
43 codegen_process_probe_batch_fn_(NULL),
44 process_probe_batch_fn_(NULL) {
48 <<
"HashJoinNode c'tor: Init() failed:\n"
49 << status.GetErrorMsg();
52 (
join_op_ == TJoinOp::LEFT_OUTER_JOIN ||
join_op_ == TJoinOp::FULL_OUTER_JOIN);
55 (
join_op_ == TJoinOp::RIGHT_OUTER_JOIN ||
join_op_ == TJoinOp::FULL_OUTER_JOIN);
59 DCHECK(tnode.__isset.hash_join_node);
60 const vector<TEqJoinCondition>& eq_join_conjuncts =
61 tnode.hash_join_node.eq_join_conjuncts;
62 for (
int i = 0; i < eq_join_conjuncts.size(); ++i) {
65 probe_exprs_.push_back(expr);
67 build_exprs_.push_back(expr);
71 &other_join_conjuncts_));
75 HashJoinNode::~HashJoinNode() {
108 build_tuple_idx_.reserve(build_tuple_size_);
109 for (
int i = 0; i < build_tuple_size_; ++i) {
118 false,
id(), *state->mem_limits()));
123 if (codegen != NULL) {
125 Function* hash_fn =
hash_tbl_->CodegenHashCurrentRow(codegen);
143 if (memory_used_counter_ != NULL &&
hash_tbl_.get() != NULL) {
144 COUNTER_UPDATE(memory_used_counter_,
build_pool_->peak_allocated_bytes());
145 COUNTER_UPDATE(memory_used_counter_,
hash_tbl_->byte_size());
151 status->set_value(ConstructHashTable(state));
167 build_pool_->AcquireData(build_batch.tuple_data_pool(),
false);
168 RETURN_IF_LIMIT_EXCEEDED(state);
193 LOG(WARNING) <<
"Codegen for HashJoinNode (node_id=" <<
id()
194 <<
") was not supported for this query.";
196 void* jitted_process_build_batch =
198 DCHECK(jitted_process_build_batch != NULL);
201 LOG(INFO) <<
"HashJoinNode(node_id=" <<
id()
202 <<
") using llvm codegend function for building hash table.";
205 if (codegen_process_probe_batch_fn_ == NULL) {
206 LOG(WARNING) <<
"Codegen for HashJoinNode (node_id=" <<
id()
207 <<
") was not supported for this query.";
209 void* jitted_process_probe_batch =
210 state->llvm_codegen()->JitFunction(codegen_process_probe_batch_fn_);
211 DCHECK(jitted_process_probe_batch != NULL);
214 LOG(INFO) <<
"HashJoinNode(node_id=" <<
id()
215 <<
") using llvm codegend function for probing hash table.";
223 promise<Status> thread_status;
224 Thread build_thread(
"hash-join-node",
"build thread",
282 Expr*
const* other_conjuncts = &other_join_conjuncts_[0];
283 int num_other_conjuncts = other_join_conjuncts_.size();
285 Expr*
const* conjuncts = &conjuncts_[0];
286 int num_conjuncts = conjuncts_.size();
301 int row_idx = out_batch->
AddRow();
304 if (!
EvalConjuncts(other_conjuncts, num_other_conjuncts, out_row))
continue;
310 joined_build_rows_.insert(matched_build_row);
311 VLOG_ROW <<
"joined build row: " << matched_build_row;
328 int row_idx = out_batch->
AddRow();
347 probe_batch_pos_ = 0;
395 if (joined_build_rows_.find(build_row) != joined_build_rows_.end()) {
398 int row_idx = out_batch->
AddRow();
448 probe_batch_pos_ = 0;
449 if (out_batch->IsFull())
break;
465 string HashJoinNode::GetProbeRowOutputString(
TupleRow* probe_row) {
468 int* build_tuple_idx_ptr_ = &build_tuple_idx_[0];
470 if (i != 0) out <<
" ";
472 int* is_build_tuple =
473 ::find(build_tuple_idx_ptr_, build_tuple_idx_ptr_ + build_tuple_size_, i);
475 if (is_build_tuple != build_tuple_idx_ptr_ + build_tuple_size_) {
486 *out << string(indentation_level * 2,
' ');
487 *out <<
"HashJoin(eos=" << (
eos_ ?
"true" :
"false")
488 <<
" probe_batch_pos=" << probe_batch_pos_
490 *out << string(indentation_level * 2,
' ');
502 memset(out, 0, result_tuple_row_size_);
504 memcpy(out, probe, result_tuple_row_size_);
508 for (
int i = 0; i < build_tuple_size_; ++i) {
512 for (
int i = 0; i < build_tuple_size_; ++i) {
513 out->
SetTuple(build_tuple_idx_[i], NULL);
547 DCHECK(tuple_row_type != NULL);
548 PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
551 DCHECK(this_type != NULL);
552 PointerType* this_ptr_type = PointerType::get(this_type, 0);
556 PointerType* tuple_row_working_type = PointerType::get(codegen->
ptr_type(), 0);
565 LLVMContext& context = codegen->
context();
568 Function* fn = prototype.GeneratePrototype(&builder, args);
569 Value* out_row_arg = builder.CreateBitCast(args[1], tuple_row_working_type,
"out");
570 Value* probe_row_arg = builder.CreateBitCast(args[2], tuple_row_working_type,
"probe");
571 Value* build_row_arg = builder.CreateBitCast(args[3], tuple_row_working_type,
"build");
574 codegen->
CodegenMemcpy(&builder, out_row_arg, probe_row_arg, result_tuple_row_size_);
577 BasicBlock* build_not_null_block = BasicBlock::Create(context,
"build_not_null", fn);
578 BasicBlock* build_null_block = NULL;
582 build_null_block = BasicBlock::Create(context,
"build_null", fn);
583 Value* is_build_null = builder.CreateIsNull(build_row_arg,
"is_build_null");
584 builder.CreateCondBr(is_build_null, build_null_block, build_not_null_block);
587 builder.SetInsertPoint(build_null_block);
588 for (
int i = 0; i < build_tuple_size_; ++i) {
590 Value* dst = builder.CreateGEP(out_row_arg, array_idx,
"dst_tuple_ptr");
593 builder.CreateRetVoid();
596 builder.CreateBr(build_not_null_block);
600 builder.SetInsertPoint(build_not_null_block);
601 for (
int i = 0; i < build_tuple_size_; ++i) {
604 Value* dst = builder.CreateGEP(out_row_arg, dst_idx,
"dst_tuple_ptr");
605 Value* src = builder.CreateGEP(build_row_arg, src_idx,
"src_tuple_ptr");
606 builder.CreateStore(builder.CreateLoad(src), dst);
608 builder.CreateRetVoid();
616 Function* process_build_batch_fn = codegen->
GetFunction(
617 IRFunction::HASH_JOIN_PROCESS_BUILD_BATCH);
618 DCHECK(process_build_batch_fn != NULL);
621 Function* eval_row_fn =
hash_tbl_->CodegenEvalTupleRow(codegen,
true);
622 if (eval_row_fn == NULL)
return NULL;
626 process_build_batch_fn = codegen->
ReplaceCallSites(process_build_batch_fn,
false,
627 eval_row_fn,
"EvalBuildRow", &replaced);
628 DCHECK_EQ(replaced, 1);
630 process_build_batch_fn = codegen->
ReplaceCallSites(process_build_batch_fn,
false,
631 hash_fn,
"HashCurrentRow", &replaced);
632 DCHECK_EQ(replaced, 1);
640 Function* process_probe_batch_fn = codegen->
GetFunction(
641 IRFunction::HASH_JOIN_PROCESS_PROBE_BATCH);
642 DCHECK(process_probe_batch_fn != NULL);
645 Function* equals_fn =
hash_tbl_->CodegenEquals(codegen);
646 if (equals_fn == NULL)
return NULL;
649 Function* eval_row_fn =
hash_tbl_->CodegenEvalTupleRow(codegen,
false);
650 if (eval_row_fn == NULL)
return NULL;
654 if (create_output_row_fn == NULL)
return NULL;
658 if (join_conjuncts_fn == NULL)
return NULL;
662 if (conjuncts_fn == NULL)
return NULL;
666 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
667 hash_fn,
"HashCurrentRow", &replaced);
668 DCHECK_EQ(replaced, 1);
670 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
671 eval_row_fn,
"EvalProbeRow", &replaced);
672 DCHECK_EQ(replaced, 1);
674 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
675 create_output_row_fn,
"CreateOutputRow", &replaced);
676 DCHECK_EQ(replaced, 2);
678 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
679 conjuncts_fn,
"EvalConjuncts", &replaced);
680 DCHECK_EQ(replaced, 2);
682 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
683 join_conjuncts_fn,
"EvalOtherJoinConjuncts", &replaced);
684 DCHECK_EQ(replaced, 1);
686 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
687 equals_fn,
"Equals", &replaced);
688 DCHECK_EQ(replaced, 2);
RuntimeProfile::Counter * hash_tbl_load_factor_counter_
int64_t num_rows_returned_
static const char * LLVM_CLASS_NAME
Tuple * GetTuple(int tuple_idx)
OldHashTable::Iterator hash_tbl_iterator_
TODO: Consider allowing fragment IDs as category parameters.
void CreateOutputRow(TupleRow *out_row, TupleRow *probe_row, TupleRow *build_row)
llvm::Function * CodegenProcessProbeBatch(RuntimeState *state, llvm::Function *hash_fn)
Utility struct that wraps a variable name and llvm type.
boost::scoped_ptr< RuntimeProfile > runtime_profile_
A tuple with 0 materialised slots is represented as NULL.
TupleRow * current_probe_row_
#define RETURN_IF_ERROR(stmt)
some generally useful macros
TupleRow * GetRow(int row_idx)
RowDescriptor row_descriptor_
#define ADD_TIMER(profile, name)
RuntimeProfile::Counter * build_timer_
const RowDescriptor & row_desc() const
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
void(* ProcessBuildBatchFn)(HashJoinNode *, RowBatch *)
llvm::Function * CodegenProcessBuildBatch(RuntimeState *state, llvm::Function *hash_fn)
void BuildSideThread(RuntimeState *state, Promise< Status > *status)
int(* ProcessProbeBatchFn)(HashJoinNode *, RowBatch *, RowBatch *, int)
HashJoinNode::ProcessProbeBatch() exactly.
llvm::Value * null_ptr_value()
boost::scoped_ptr< OldHashTable > hash_tbl_
RuntimeProfile::Counter * probe_timer_
LLVM code generator. This is the top level object to generate jitted code.
boost::scoped_ptr< MemPool > build_pool_
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
RuntimeProfile::Counter * probe_row_counter_
static const char * LLVM_CLASS_NAME
virtual Status Prepare(RuntimeState *state)
int ProcessProbeBatch(RowBatch *out_batch, RowBatch *probe_batch, int max_added_rows)
virtual Status Init(const TPlanNode &tnode)
void AddArgument(const NamedVariable &var)
Add argument.
static llvm::Function * CodegenEvalConjuncts(RuntimeState *state, const std::vector< ExprContext * > &conjunct_ctxs, const char *name="EvalConjuncts")
void CodegenMemcpy(LlvmBuilder *, llvm::Value *dst, llvm::Value *src, int size)
RuntimeProfile::Counter * build_buckets_counter_
ProcessProbeBatchFn process_probe_batch_fn_
Jitted ProcessProbeBatch function pointer. Null if codegen is disabled.
#define RETURN_IF_CANCELLED(state)
virtual Status Prepare(RuntimeState *state)
#define ADD_COUNTER(profile, name, unit)
llvm::Function * GetFunction(IRFunction::Type)
This is the superclass of all expr evaluation nodes.
int GetTupleIdx(TupleId id) const
Returns INVALID_IDX if id not part of this row.
void ProcessBuildBatch(RowBatch *build_batch)
Construct the build hash table, adding all the rows in 'build_batch'.
virtual Status Open(RuntimeState *state)
llvm::Function * codegen_process_build_batch_fn_
llvm function for build batch
int64_t rows_returned() const
#define COUNTER_SET(c, v)
static Status CreateExprTree(ObjectPool *pool, const TExpr &texpr, ExprContext **ctx)
RuntimeProfile::Counter * rows_returned_counter_
void IR_ALWAYS_INLINE Next()
llvm::Function * CodegenCreateOutputRow(LlvmCodeGen *codegen)
Codegen function to create output row.
const std::vector< TupleDescriptor * > & tuple_descriptors() const
Return descriptors for all tuples in this row, in order of appearance.
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
void SetTuple(int tuple_idx, Tuple *tuple)
llvm::Type * GetType(const ColumnType &type)
Returns llvm type for the column type.
llvm::Value * GetIntConstant(PrimitiveType type, int64_t val)
Returns the constant 'val' of 'type'.
static Status CreateExprTrees(ObjectPool *pool, const std::vector< TExpr > &texprs, std::vector< ExprContext * > *ctxs)
RuntimeProfile::Counter * build_row_counter_
llvm::Function * FinalizeFunction(llvm::Function *function)
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
virtual void Close(RuntimeState *state)
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
virtual Status Open(RuntimeState *state)
boost::scoped_ptr< RowBatch > probe_batch_
llvm::Function * ReplaceCallSites(llvm::Function *caller, bool update_in_place, llvm::Function *new_fn, const std::string &target_name, int *num_replaced)
string PrintRow(TupleRow *row, const RowDescriptor &d)
string PrintTuple(const Tuple *t, const TupleDescriptor &d)
virtual void Close(RuntimeState *state)
Status LeftJoinGetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
llvm::LLVMContext & context()
ProcessBuildBatchFn process_build_batch_fn_
virtual std::string DebugString() const
llvm::Function * OptimizeFunctionWithExprs(llvm::Function *fn)
llvm::PointerType * ptr_type()
bool match_one_build_
Match at most one build row to each probe row. Used in LEFT_SEMI_JOIN.
RuntimeProfile * runtime_profile()