27 #include "gen-cpp/PlanNodes_types.h"
33 "Enables pushing build side filters to probe side");
35 using namespace impala;
42 :
BlockingJoinNode(
"HashJoinNode", tnode.hash_join_node.join_op, pool, tnode, descs),
43 codegen_process_build_batch_fn_(NULL),
44 process_build_batch_fn_(NULL),
45 process_probe_batch_fn_(NULL) {
47 DCHECK_NE(
join_op_, TJoinOp::CROSS_JOIN);
48 DCHECK_NE(
join_op_, TJoinOp::LEFT_ANTI_JOIN);
49 DCHECK_NE(
join_op_, TJoinOp::RIGHT_SEMI_JOIN);
50 DCHECK_NE(
join_op_, TJoinOp::RIGHT_ANTI_JOIN);
51 DCHECK_NE(
join_op_, TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
54 (
join_op_ == TJoinOp::LEFT_OUTER_JOIN ||
join_op_ == TJoinOp::FULL_OUTER_JOIN);
57 (
join_op_ == TJoinOp::RIGHT_OUTER_JOIN ||
join_op_ == TJoinOp::FULL_OUTER_JOIN);
64 DCHECK(tnode.__isset.hash_join_node);
65 const vector<TEqJoinCondition>& eq_join_conjuncts =
66 tnode.hash_join_node.eq_join_conjuncts;
67 for (
int i = 0; i < eq_join_conjuncts.size(); ++i) {
105 join_op_ == TJoinOp::RIGHT_OUTER_JOIN ||
join_op_ == TJoinOp::FULL_OUTER_JOIN;
107 child(1)->
row_desc().tuple_descriptors().size(), stores_nulls,
115 Function* hash_fn =
hash_tbl_->CodegenHashCurrentRow(state);
129 if (codegen_process_probe_batch_fn != NULL) {
141 DCHECK(
false) <<
"NYI";
172 build_pool_->AcquireData(build_batch.tuple_data_pool(),
false);
187 DCHECK(!build_batch.AtCapacity());
201 VLOG(2) <<
"Disabling probe filter push down because build table is too large: "
209 if (first_probe_row == NULL) {
254 int row_idx = out_batch->
AddRow();
259 if (!
EvalConjuncts(other_conjunct_ctxs, num_other_conjunct_ctxs, out_row)) {
269 VLOG_ROW <<
"joined build row: " << matched_build_row;
273 if (
EvalConjuncts(conjunct_ctxs, num_conjunct_ctxs, out_row)) {
288 int row_idx = out_batch->
AddRow();
291 if (
EvalConjuncts(conjunct_ctxs, num_conjunct_ctxs, out_row)) {
356 if (matched)
continue;
358 int row_idx = out_batch->
AddRow();
361 if (
EvalConjuncts(conjunct_ctxs, num_conjunct_ctxs, out_row)) {
425 *out <<
" hash_tbl=";
426 *out << string(indentation_level * 2,
' ');
464 DCHECK(tuple_row_type != NULL);
465 PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
468 DCHECK(this_type != NULL);
469 PointerType* this_ptr_type = PointerType::get(this_type, 0);
473 PointerType* tuple_row_working_type = PointerType::get(codegen->
ptr_type(), 0);
482 LLVMContext& context = codegen->
context();
485 Function* fn = prototype.GeneratePrototype(&builder, args);
486 Value* out_row_arg = builder.CreateBitCast(args[1], tuple_row_working_type,
"out");
487 Value* probe_row_arg = builder.CreateBitCast(args[2], tuple_row_working_type,
"probe");
488 Value* build_row_arg = builder.CreateBitCast(args[3], tuple_row_working_type,
"build");
496 Value* build_row_dst = builder.CreateGEP(out_row_arg, build_row_idx,
"build_dst_ptr");
499 BasicBlock* build_not_null_block = BasicBlock::Create(context,
"build_not_null", fn);
500 BasicBlock* build_null_block = NULL;
504 build_null_block = BasicBlock::Create(context,
"build_null", fn);
505 Value* is_build_null = builder.CreateIsNull(build_row_arg,
"is_build_null");
506 builder.CreateCondBr(is_build_null, build_null_block, build_not_null_block);
511 builder.SetInsertPoint(build_null_block);
512 for (
int i = 0; i < num_build_tuples; ++i) {
515 Value* dst = builder.CreateGEP(out_row_arg, array_idx,
"dst_tuple_ptr");
518 builder.CreateRetVoid();
521 builder.CreateBr(build_not_null_block);
525 builder.SetInsertPoint(build_not_null_block);
527 builder.CreateRetVoid();
538 Function* process_build_batch_fn = codegen->
GetFunction(
539 IRFunction::HASH_JOIN_PROCESS_BUILD_BATCH);
540 DCHECK(process_build_batch_fn != NULL);
543 Function* eval_row_fn =
hash_tbl_->CodegenEvalTupleRow(state,
true);
544 if (eval_row_fn == NULL)
return NULL;
548 process_build_batch_fn = codegen->
ReplaceCallSites(process_build_batch_fn,
false,
549 eval_row_fn,
"EvalBuildRow", &replaced);
550 DCHECK_EQ(replaced, 1);
552 process_build_batch_fn = codegen->
ReplaceCallSites(process_build_batch_fn,
false,
553 hash_fn,
"HashCurrentRow", &replaced);
554 DCHECK_EQ(replaced, 1);
564 Function* process_probe_batch_fn =
565 codegen->
GetFunction(IRFunction::HASH_JOIN_PROCESS_PROBE_BATCH);
566 DCHECK(process_probe_batch_fn != NULL);
569 Function* equals_fn =
hash_tbl_->CodegenEquals(state);
570 if (equals_fn == NULL)
return NULL;
573 Function* eval_row_fn =
hash_tbl_->CodegenEvalTupleRow(state,
false);
574 if (eval_row_fn == NULL)
return NULL;
578 if (create_output_row_fn == NULL)
return NULL;
583 if (eval_other_conjuncts_fn == NULL)
return NULL;
587 if (eval_conjuncts_fn == NULL)
return NULL;
591 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
592 hash_fn,
"HashCurrentRow", &replaced);
593 DCHECK_EQ(replaced, 1);
595 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
596 eval_row_fn,
"EvalProbeRow", &replaced);
597 DCHECK_EQ(replaced, 1);
599 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
600 create_output_row_fn,
"CreateOutputRow", &replaced);
601 DCHECK_EQ(replaced, 3);
603 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
604 eval_conjuncts_fn,
"EvalConjuncts", &replaced);
605 DCHECK_EQ(replaced, 2);
607 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
608 eval_other_conjuncts_fn,
"EvalOtherJoinConjuncts", &replaced);
609 DCHECK_EQ(replaced, 2);
611 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
false,
612 equals_fn,
"Equals", &replaced);
613 DCHECK_EQ(replaced, 2);
uint32_t slot_filter_bitmap_size() const
virtual void AddToDebugString(int indentation_level, std::stringstream *out) const
void AddRuntimeExecOption(const std::string &option)
Appends option to 'runtime_exec_options_'.
RuntimeProfile::Counter * hash_tbl_load_factor_counter_
int64_t num_rows_returned_
static const char * LLVM_CLASS_NAME
OldHashTable::Iterator hash_tbl_iterator_
MemTracker * mem_tracker()
void CreateOutputRow(TupleRow *out_row, TupleRow *probe_row, TupleRow *build_row)
llvm::Function * CodegenProcessProbeBatch(RuntimeState *state, llvm::Function *hash_fn)
Utility struct that wraps a variable name and llvm type.
boost::scoped_ptr< RuntimeProfile > runtime_profile_
static Status Open(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for opening multiple expr trees.
virtual Status Prepare(RuntimeState *state)
TupleRow * current_probe_row_
#define RETURN_IF_ERROR(stmt)
some generally useful macros
HashJoinNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
TupleRow * GetRow(int row_idx)
bool AtEnd() const
Returns true if this iterator is at the end, i.e. GetRow() cannot be called.
int probe_tuple_row_size_
std::vector< ExprContext * > probe_expr_ctxs_
virtual void Close(RuntimeState *state)
virtual Status Init(const TPlanNode &tnode)
RuntimeProfile::Counter * build_timer_
DECLARE_string(cgroup_hierarchy_path)
const RowDescriptor & row_desc() const
#define COUNTER_ADD(c, v)
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
llvm::Function * CodegenProcessBuildBatch(RuntimeState *state, llvm::Function *hash_fn)
llvm::Value * null_ptr_value()
const std::vector< ExprContext * > & conjunct_ctxs() const
static void Close(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for closing multiple expr trees.
boost::scoped_ptr< OldHashTable > hash_tbl_
RuntimeProfile::Counter * probe_timer_
virtual Status Reset(RuntimeState *state)
LLVM code generator. This is the top level object to generate jitted code.
boost::scoped_ptr< MemPool > build_pool_
RuntimeProfile::Counter * probe_row_counter_
MemTracker * expr_mem_tracker()
static const char * LLVM_CLASS_NAME
virtual Status Prepare(RuntimeState *state)
int ProcessProbeBatch(RowBatch *out_batch, RowBatch *probe_batch, int max_added_rows)
virtual Status Init(const TPlanNode &tnode)
void AddArgument(const NamedVariable &var)
Add argument.
static llvm::Function * CodegenEvalConjuncts(RuntimeState *state, const std::vector< ExprContext * > &conjunct_ctxs, const char *name="EvalConjuncts")
void CodegenMemcpy(LlvmBuilder *, llvm::Value *dst, llvm::Value *src, int size)
RuntimeProfile::Counter * build_buckets_counter_
ProcessProbeBatchFn process_probe_batch_fn_
Jitted ProcessProbeBatch function pointer. Null if codegen is disabled.
The hash table does not support removes. The hash table is not thread safe.
std::vector< ExprContext * > other_join_conjunct_ctxs_
non-equi-join conjuncts from the JOIN clause
#define RETURN_IF_CANCELLED(state)
bool can_add_probe_filters_
#define ADD_COUNTER(profile, name, unit)
uint32_t fragment_hash_seed() const
llvm::Function * GetFunction(IRFunction::Type)
void AddFunctionToJit(llvm::Function *fn, void **fn_ptr)
virtual Status QueryMaintenance(RuntimeState *state)
int build_tuple_row_size_
void ProcessBuildBatch(RowBatch *build_batch)
Construct the build hash table, adding all the rows in 'build_batch'.
virtual Status Open(RuntimeState *state)
llvm::Function * codegen_process_build_batch_fn_
llvm function for build batch
int64_t rows_returned() const
#define COUNTER_SET(c, v)
DEFINE_bool(enable_probe_side_filtering, true,"Enables pushing build side filters to probe side")
static Status CreateExprTree(ObjectPool *pool, const TExpr &texpr, ExprContext **ctx)
RuntimeProfile::Counter * rows_returned_counter_
void IR_ALWAYS_INLINE Next()
llvm::Function * CodegenCreateOutputRow(LlvmCodeGen *codegen)
Codegen function to create output row.
virtual Status InitGetNext(TupleRow *first_probe_row)
const std::vector< TupleDescriptor * > & tuple_descriptors() const
Return descriptors for all tuples in this row, in order of appearance.
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
std::string GetLeftChildRowString(TupleRow *row)
bool codegen_enabled() const
Returns true if codegen is enabled for this query.
static const char * LLVM_CLASS_NAME
std::vector< ExprContext * > build_expr_ctxs_
llvm::Type * GetType(const ColumnType &type)
Returns llvm type for the column type.
Status GetCodegen(LlvmCodeGen **codegen, bool initialize=true)
virtual Status ConstructBuildSide(RuntimeState *state)
llvm::Value * GetIntConstant(PrimitiveType type, int64_t val)
Returns the constant 'val' of 'type'.
static Status CreateExprTrees(ObjectPool *pool, const std::vector< TExpr > &texprs, std::vector< ExprContext * > *ctxs)
RuntimeProfile::Counter * build_row_counter_
llvm::Function * FinalizeFunction(llvm::Function *function)
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
virtual void Close(RuntimeState *state)
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
boost::scoped_ptr< RowBatch > probe_batch_
llvm::Function * ReplaceCallSites(llvm::Function *caller, bool update_in_place, llvm::Function *new_fn, const std::string &target_name, int *num_replaced)
string PrintRow(TupleRow *row, const RowDescriptor &d)
std::vector< ExprContext * > conjunct_ctxs_
Status LeftJoinGetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
llvm::LLVMContext & context()
ProcessBuildBatchFn process_build_batch_fn_
virtual std::string DebugString() const
llvm::Function * OptimizeFunctionWithExprs(llvm::Function *fn)
llvm::PointerType * ptr_type()
bool match_one_build_
Match at most one build row to each probe row. Used in LEFT_SEMI_JOIN.
RuntimeProfile * runtime_profile()