27 #include "gen-cpp/PlanNodes_types.h"
29 using namespace boost;
30 using namespace impala;
34 const char* HashJoinNode::LLVM_CLASS_NAME =
"class.impala::HashJoinNode";
36 HashJoinNode::HashJoinNode(
39 join_op_(tnode.hash_join_node.join_op),
40 codegen_process_build_batch_fn_(NULL),
41 process_build_batch_fn_(NULL),
42 codegen_process_probe_batch_fn_(NULL),
43 process_probe_batch_fn_(NULL) {
47 <<
"HashJoinNode c'tor: Init() failed:\n"
48 << status.GetErrorMsg();
51 (
join_op_ == TJoinOp::LEFT_OUTER_JOIN ||
join_op_ == TJoinOp::FULL_OUTER_JOIN);
54 (
join_op_ == TJoinOp::RIGHT_OUTER_JOIN ||
join_op_ == TJoinOp::FULL_OUTER_JOIN);
58 DCHECK(tnode.__isset.hash_join_node);
59 const vector<TEqJoinCondition>& eq_join_conjuncts =
60 tnode.hash_join_node.eq_join_conjuncts;
61 for (
int i = 0; i < eq_join_conjuncts.size(); ++i) {
64 probe_exprs_.push_back(expr);
66 build_exprs_.push_back(expr);
70 &other_join_conjuncts_));
74 HashJoinNode::~HashJoinNode() {
108 build_tuple_idx_.reserve(build_tuple_size_);
109 for (
int i = 0; i < build_tuple_size_; ++i) {
116 false,
id(), *state->mem_limits()));
122 if (codegen != NULL) {
124 Function* hash_fn =
hash_tbl_->CodegenHashCurrentRow(codegen);
142 if (memory_used_counter_ != NULL &&
hash_tbl_.get() != NULL) {
143 COUNTER_UPDATE(memory_used_counter_,
build_pool_->peak_allocated_bytes());
144 COUNTER_UPDATE(memory_used_counter_,
hash_tbl_->byte_size());
150 status->set_value(ConstructHashTable(state));
170 build_pool_->AcquireData(build_batch.tuple_data_pool(),
false);
171 RETURN_IF_LIMIT_EXCEEDED(state);
196 void* jitted_process_build_batch =
198 DCHECK(jitted_process_build_batch != NULL);
204 if (codegen_process_probe_batch_fn_ != NULL) {
205 void* jitted_process_probe_batch =
206 state->llvm_codegen()->JitFunction(codegen_process_probe_batch_fn_);
207 DCHECK(jitted_process_probe_batch != NULL);
220 promise<Status> thread_status;
226 thread_status.set_value(ConstructHashTable(state));
229 Thread build_thread(
"hash-join-node",
"build thread",
231 >>>>>>> 48bb373... Add ImpalaThread to track thread usage
289 Expr*
const* other_conjuncts = &other_join_conjuncts_[0];
290 int num_other_conjuncts = other_join_conjuncts_.size();
292 Expr*
const* conjuncts = &conjuncts_[0];
293 int num_conjuncts = conjuncts_.size();
308 int row_idx = out_batch->
AddRow();
311 if (!
EvalConjuncts(other_conjuncts, num_other_conjuncts, out_row))
continue;
317 joined_build_rows_.insert(matched_build_row);
318 VLOG_ROW <<
"joined build row: " << matched_build_row;
335 int row_idx = out_batch->
AddRow();
354 probe_batch_pos_ = 0;
355 if (out_batch->IsFull() || out_batch->AtResourceLimit())
return Status::OK;
370 if (out_batch->IsFull() || out_batch->AtResourceLimit())
return Status::OK;
402 if (joined_build_rows_.find(build_row) != joined_build_rows_.end()) {
405 int row_idx = out_batch->
AddRow();
455 probe_batch_pos_ = 0;
456 if (out_batch->IsFull() || out_batch->AtResourceLimit())
break;
472 string HashJoinNode::GetProbeRowOutputString(
TupleRow* probe_row) {
475 int* build_tuple_idx_ptr_ = &build_tuple_idx_[0];
477 if (i != 0) out <<
" ";
479 int* is_build_tuple =
480 ::find(build_tuple_idx_ptr_, build_tuple_idx_ptr_ + build_tuple_size_, i);
482 if (is_build_tuple != build_tuple_idx_ptr_ + build_tuple_size_) {
493 *out << string(indentation_level * 2,
' ');
494 *out <<
"HashJoin(eos=" << (
eos_ ?
"true" :
"false")
495 <<
" probe_batch_pos=" << probe_batch_pos_
497 *out << string(indentation_level * 2,
' ');
509 memset(out, 0, result_tuple_row_size_);
511 memcpy(out, probe, result_tuple_row_size_);
515 for (
int i = 0; i < build_tuple_size_; ++i) {
519 for (
int i = 0; i < build_tuple_size_; ++i) {
520 out->
SetTuple(build_tuple_idx_[i], NULL);
554 DCHECK(tuple_row_type != NULL);
555 PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
558 DCHECK(this_type != NULL);
559 PointerType* this_ptr_type = PointerType::get(this_type, 0);
563 PointerType* tuple_row_working_type = PointerType::get(codegen->
ptr_type(), 0);
572 LLVMContext& context = codegen->
context();
575 Function* fn = prototype.GeneratePrototype(&builder, args);
576 Value* out_row_arg = builder.CreateBitCast(args[1], tuple_row_working_type,
"out");
577 Value* probe_row_arg = builder.CreateBitCast(args[2], tuple_row_working_type,
"probe");
578 Value* build_row_arg = builder.CreateBitCast(args[3], tuple_row_working_type,
"build");
581 codegen->
CodegenMemcpy(&builder, out_row_arg, probe_row_arg, result_tuple_row_size_);
584 BasicBlock* build_not_null_block = BasicBlock::Create(context,
"build_not_null", fn);
585 BasicBlock* build_null_block = NULL;
589 build_null_block = BasicBlock::Create(context,
"build_null", fn);
590 Value* is_build_null = builder.CreateIsNull(build_row_arg,
"is_build_null");
591 builder.CreateCondBr(is_build_null, build_null_block, build_not_null_block);
594 builder.SetInsertPoint(build_null_block);
595 for (
int i = 0; i < build_tuple_size_; ++i) {
597 Value* dst = builder.CreateGEP(out_row_arg, array_idx,
"dst_tuple_ptr");
600 builder.CreateRetVoid();
603 builder.CreateBr(build_not_null_block);
607 builder.SetInsertPoint(build_not_null_block);
608 for (
int i = 0; i < build_tuple_size_; ++i) {
611 Value* dst = builder.CreateGEP(out_row_arg, dst_idx,
"dst_tuple_ptr");
612 Value* src = builder.CreateGEP(build_row_arg, src_idx,
"src_tuple_ptr");
613 builder.CreateStore(builder.CreateLoad(src), dst);
615 builder.CreateRetVoid();
623 Function* process_build_batch_fn = codegen->
GetFunction(
624 IRFunction::HASH_JOIN_PROCESS_BUILD_BATCH);
625 DCHECK(process_build_batch_fn != NULL);
628 Function* eval_row_fn =
hash_tbl_->CodegenEvalTupleRow(codegen,
true);
629 if (eval_row_fn == NULL)
return NULL;
633 process_build_batch_fn = codegen->
ReplaceCallSites(process_build_batch_fn,
false,
634 eval_row_fn,
"EvalBuildRow", &replaced);
635 DCHECK_EQ(replaced, 1);
637 process_build_batch_fn = codegen->
ReplaceCallSites(process_build_batch_fn,
false,
638 hash_fn,
"HashCurrentRow", &replaced);
639 DCHECK_EQ(replaced, 1);
647 Function* process_probe_batch_fn = codegen->
GetFunction(
648 IRFunction::HASH_JOIN_PROCESS_PROBE_BATCH);
649 DCHECK(process_probe_batch_fn != NULL);
652 Function* equals_fn =
hash_tbl_->CodegenEquals(codegen);
653 if (equals_fn == NULL)
return NULL;
656 Function* eval_row_fn =
hash_tbl_->CodegenEvalTupleRow(codegen,
false);
657 if (eval_row_fn == NULL)
return NULL;
661 if (create_output_row_fn == NULL)
return NULL;
665 if (join_conjuncts_fn == NULL)
return NULL;
669 if (conjuncts_fn == NULL)
return NULL;
673 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
674 hash_fn,
"HashCurrentRow", &replaced);
675 DCHECK_EQ(replaced, 1);
677 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
678 eval_row_fn,
"EvalProbeRow", &replaced);
679 DCHECK_EQ(replaced, 1);
681 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
682 create_output_row_fn,
"CreateOutputRow", &replaced);
683 DCHECK_EQ(replaced, 2);
685 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
686 conjuncts_fn,
"EvalConjuncts", &replaced);
687 DCHECK_EQ(replaced, 2);
689 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
690 join_conjuncts_fn,
"EvalOtherJoinConjuncts", &replaced);
691 DCHECK_EQ(replaced, 1);
693 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
694 equals_fn,
"Equals", &replaced);
695 DCHECK_EQ(replaced, 2);
void AddRuntimeExecOption(const std::string &option)
Appends option to 'runtime_exec_options_'.
RuntimeProfile::Counter * hash_tbl_load_factor_counter_
int64_t num_rows_returned_
static const char * LLVM_CLASS_NAME
Tuple * GetTuple(int tuple_idx)
OldHashTable::Iterator hash_tbl_iterator_
TODO: Consider allowing fragment IDs as category parameters.
bool TryAcquireThreadToken(bool *is_reserved=NULL)
void CreateOutputRow(TupleRow *out_row, TupleRow *probe_row, TupleRow *build_row)
llvm::Function * CodegenProcessProbeBatch(RuntimeState *state, llvm::Function *hash_fn)
Utility struct that wraps a variable name and llvm type.
boost::scoped_ptr< RuntimeProfile > runtime_profile_
A tuple with 0 materialised slots is represented as NULL.
TupleRow * current_probe_row_
#define RETURN_IF_ERROR(stmt)
some generally useful macros
TupleRow * GetRow(int row_idx)
RowDescriptor row_descriptor_
#define ADD_TIMER(profile, name)
RuntimeProfile::Counter * build_timer_
const RowDescriptor & row_desc() const
void ReleaseThreadToken(bool required)
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
void(* ProcessBuildBatchFn)(HashJoinNode *, RowBatch *)
llvm::Function * CodegenProcessBuildBatch(RuntimeState *state, llvm::Function *hash_fn)
void BuildSideThread(RuntimeState *state, Promise< Status > *status)
int(* ProcessProbeBatchFn)(HashJoinNode *, RowBatch *, RowBatch *, int)
HashJoinNode::ProcessProbeBatch() exactly.
llvm::Value * null_ptr_value()
boost::scoped_ptr< OldHashTable > hash_tbl_
RuntimeProfile::Counter * probe_timer_
LLVM code generator. This is the top level object to generate jitted code.
boost::scoped_ptr< MemPool > build_pool_
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
RuntimeProfile::Counter * probe_row_counter_
static const char * LLVM_CLASS_NAME
virtual Status Prepare(RuntimeState *state)
int ProcessProbeBatch(RowBatch *out_batch, RowBatch *probe_batch, int max_added_rows)
virtual Status Init(const TPlanNode &tnode)
void AddArgument(const NamedVariable &var)
Add argument.
static llvm::Function * CodegenEvalConjuncts(RuntimeState *state, const std::vector< ExprContext * > &conjunct_ctxs, const char *name="EvalConjuncts")
void CodegenMemcpy(LlvmBuilder *, llvm::Value *dst, llvm::Value *src, int size)
RuntimeProfile::Counter * build_buckets_counter_
ProcessProbeBatchFn process_probe_batch_fn_
Jitted ProcessProbeBatch function pointer. Null if codegen is disabled.
#define RETURN_IF_CANCELLED(state)
virtual Status Prepare(RuntimeState *state)
#define ADD_COUNTER(profile, name, unit)
llvm::Function * GetFunction(IRFunction::Type)
This is the superclass of all expr evaluation nodes.
int GetTupleIdx(TupleId id) const
Returns INVALID_IDX if id not part of this row.
void ProcessBuildBatch(RowBatch *build_batch)
Construct the build hash table, adding all the rows in 'build_batch'.
virtual Status Open(RuntimeState *state)
llvm::Function * codegen_process_build_batch_fn_
llvm function for build batch
int64_t rows_returned() const
#define COUNTER_SET(c, v)
static Status CreateExprTree(ObjectPool *pool, const TExpr &texpr, ExprContext **ctx)
RuntimeProfile::Counter * rows_returned_counter_
void IR_ALWAYS_INLINE Next()
llvm::Function * CodegenCreateOutputRow(LlvmCodeGen *codegen)
Codegen function to create output row.
const std::vector< TupleDescriptor * > & tuple_descriptors() const
Return descriptors for all tuples in this row, in order of appearance.
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
void SetTuple(int tuple_idx, Tuple *tuple)
llvm::Type * GetType(const ColumnType &type)
Returns llvm type for the column type.
llvm::Value * GetIntConstant(PrimitiveType type, int64_t val)
Returns the constant 'val' of 'type'.
static Status CreateExprTrees(ObjectPool *pool, const std::vector< TExpr > &texprs, std::vector< ExprContext * > *ctxs)
RuntimeProfile::Counter * build_row_counter_
llvm::Function * FinalizeFunction(llvm::Function *function)
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
virtual void Close(RuntimeState *state)
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
virtual Status Open(RuntimeState *state)
boost::scoped_ptr< RowBatch > probe_batch_
llvm::Function * ReplaceCallSites(llvm::Function *caller, bool update_in_place, llvm::Function *new_fn, const std::string &target_name, int *num_replaced)
string PrintRow(TupleRow *row, const RowDescriptor &d)
string PrintTuple(const Tuple *t, const TupleDescriptor &d)
ThreadResourceMgr::ResourcePool * resource_pool()
virtual void Close(RuntimeState *state)
Status LeftJoinGetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
llvm::LLVMContext & context()
ProcessBuildBatchFn process_build_batch_fn_
virtual std::string DebugString() const
llvm::Function * OptimizeFunctionWithExprs(llvm::Function *fn)
llvm::PointerType * ptr_type()
bool match_one_build_
Match at most one build row to each probe row. Used in LEFT_SEMI_JOIN.
RuntimeProfile * runtime_profile()