18 #include <gutil/strings/substitute.h>
32 #include "gen-cpp/PlanNodes_types.h"
37 "Enables pushing PHJ build side filters to probe side");
39 using namespace impala;
41 using namespace strings;
47 using_small_buffers_(true),
48 state_(PARTITIONING_BUILD),
49 block_mgr_client_(NULL),
50 partition_build_timer_(NULL),
51 null_aware_eval_timer_(NULL),
52 process_build_batch_fn_(NULL),
53 process_build_batch_fn_level0_(NULL),
54 process_probe_batch_fn_(NULL),
55 process_probe_batch_fn_level0_(NULL),
56 input_partition_(NULL),
57 null_aware_partition_(NULL),
58 non_empty_build_(false),
59 null_probe_rows_(NULL),
60 null_probe_output_idx_(-1) {
68 DCHECK(tnode.__isset.hash_join_node);
69 const vector<TEqJoinCondition>& eq_join_conjuncts =
70 tnode.hash_join_node.eq_join_conjuncts;
71 for (
int i = 0; i < eq_join_conjuncts.size(); ++i) {
82 if (
join_op_ == TJoinOp::LEFT_SEMI_JOIN ||
join_op_ == TJoinOp::LEFT_ANTI_JOIN ||
83 join_op_ == TJoinOp::RIGHT_SEMI_JOIN ||
join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
84 join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
87 if (
join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
88 DCHECK_EQ(eq_join_conjuncts.size(), 1);
131 bool should_store_nulls =
join_op_ == TJoinOp::RIGHT_OUTER_JOIN ||
132 join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
join_op_ == TJoinOp::FULL_OUTER_JOIN;
137 if (
join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
157 "MaxPartitionLevel", TUnit::UNIT);
167 "LargestPartitionPercent", TUnit::UNIT);
171 Function* hash_fn =
ht_ctx_->CodegenHashCurrentRow(state,
false);
172 Function* murmur_hash_fn =
ht_ctx_->CodegenHashCurrentRow(state,
true);
173 if (hash_fn != NULL && murmur_hash_fn != NULL) {
188 DCHECK(
false) <<
"NYI";
228 state, parent_->child(1)->
row_desc(), state->block_mgr(),
229 parent_->block_mgr_client_, use_small_buffers))),
231 state, parent_->child(0)->
row_desc(),
232 state->block_mgr(), parent_->block_mgr_client_, use_small_buffers))) {
244 DCHECK(hash_tbl_.get() != NULL);
245 return build_rows_->byte_size() + hash_tbl_->byte_size();
251 if (hash_tbl_.get() != NULL) hash_tbl_->Close();
255 if (build_rows_ != NULL) {
257 build_rows_->Close();
263 if (probe_rows_ != NULL) {
265 probe_rows_->Close();
276 if (parent_->num_spilled_partitions_->value() == 1) {
277 parent_->AddRuntimeExecOption(
"Spilled");
281 if (hash_tbl() != NULL) {
287 if (build_rows()->using_small_buffers())
return Status::OK;
288 return build_rows()->UnpinStream(unpin_all_build);
292 bool* built,
const bool add_probe_filters) {
293 if (add_probe_filters) {
294 return BuildHashTableInternal<true>(state, built);
296 return BuildHashTableInternal<false>(state, built);
300 template<
bool const AddProbeFilters>
303 DCHECK_NOTNULL(build_rows_);
316 parent_->mem_tracker());
319 vector<BufferedTupleStream::RowIdx> indices;
320 uint32_t seed0 = ctx->
seed(0);
328 int64_t estimated_num_buckets =
330 hash_tbl_.reset(
new HashTable(state, parent_->block_mgr_client_,
331 parent_->child(1)->row_desc().tuple_descriptors().size(), build_rows(),
333 if (!hash_tbl_->Init())
goto not_built;
335 if (AddProbeFilters) DCHECK_EQ(level_, 0) <<
"Should not add filters if repartitioning";
338 DCHECK_EQ(batch.num_rows(), indices.size());
339 int num_rows = batch.num_rows();
340 DCHECK_LE(num_rows, hash_tbl_->EmptyBuckets());
342 for (
int i = 0; i < num_rows; ++i) {
345 if (!ctx->EvalAndHashBuild(row, &hash))
continue;
346 if (
UNLIKELY(!hash_tbl_->Insert(ctx, indices[i], row, hash)))
goto not_built;
347 if (AddProbeFilters) {
349 for (
int j = 0; j < parent_->probe_filters_.size(); ++j) {
350 if (parent_->probe_filters_[j].second == NULL)
continue;
351 void* e = parent_->build_expr_ctxs_[j]->GetValue(row);
353 parent_->build_expr_ctxs_[j]->root()->type(), seed0);
354 parent_->probe_filters_[j].second->Set<
true>(h,
true);
362 DCHECK_NOTNULL(hash_tbl_.get());
364 COUNTER_ADD(parent_->num_hash_buckets_, hash_tbl_->num_buckets());
369 if (hash_tbl_.get() != NULL) {
373 if (parent_->can_add_probe_filters_) {
376 parent_->can_add_probe_filters_ =
false;
377 VLOG(2) <<
"Disabling probe filter push down because a partition will spill.";
404 bool acquired_ownership =
false;
408 &acquired_ownership);
409 VLOG(2) <<
"Bitmap filter added on slot: " <<
probe_filters_[i].first;
410 if (!acquired_ownership) {
434 if (stream->
AddRow(row))
return true;
442 if (stream->
AddRow(row))
return true;
451 int64_t max_freed_mem = 0;
452 int partition_idx = -1;
453 *spilled_partition = NULL;
466 if (mem > max_freed_mem) {
472 if (partition_idx == -1) {
477 VLOG(2) <<
"Spilling partition: " << partition_idx << endl <<
NodeDebugString();
504 TErrorCode::PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH,
536 int64_t total_build_rows = 0;
548 total_build_rows += build_batch.num_rows();
562 DCHECK(!build_batch.AtCapacity());
572 ss <<
"PHJ(node_id=" <<
id() <<
") partitioned(level="
574 << total_build_rows <<
" rows into:" << endl;
579 ss <<
" " << i <<
" " << (partition->
is_spilled() ?
"spilled" :
"not spilled")
580 <<
" (fraction=" << fixed << setprecision(2) << percent <<
"%)" << endl
641 if (
join_op_ == TJoinOp::RIGHT_OUTER_JOIN ||
join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
642 join_op_ == TJoinOp::FULL_OUTER_JOIN) {
694 DCHECK_GE(num_input_rows, largest_partition) <<
"Cannot have a partition with "
695 "more rows than the input";
696 if (num_input_rows == largest_partition) {
698 status.
AddDetail(Substitute(
"Cannot perform hash join at node with id $0. "
699 "Repartitioning did not reduce the size of a spilled partition. "
700 "Repartitioning level $1. Number of rows $2.",
724 int64_t max_rows = 0;
729 if (rows > max_rows) max_rows = rows;
753 if ((
join_op_ == TJoinOp::RIGHT_OUTER_JOIN ||
join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
754 join_op_ == TJoinOp::FULL_OUTER_JOIN) &&
772 if (
join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
839 if ((
join_op_ == TJoinOp::RIGHT_OUTER_JOIN ||
join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
840 join_op_ == TJoinOp::FULL_OUTER_JOIN) &&
849 if (
join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
867 DCHECK(
join_op_ == TJoinOp::RIGHT_OUTER_JOIN ||
join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
868 join_op_ == TJoinOp::FULL_OUTER_JOIN);
874 int num_rows_added = 0;
877 num_rows_added < max_rows) {
881 DCHECK_NOTNULL(build_row);
882 if (
join_op_ == TJoinOp::RIGHT_ANTI_JOIN) {
883 out_batch->
CopyRow(build_row, out_row);
889 out_row = out_row->
next_row(out_batch);
912 DCHECK_LE(num_rows_added, max_rows);
965 return Status(Substitute(
"Unable to perform Null-Aware Anti-Join. There are too"
966 " many NULLs on the $0 side to perform this join.", build ?
"build" :
"probe"));
978 if (build_stream->num_rows() == 0) {
981 DCHECK_EQ(probe_stream->
num_rows(), 0);
1031 bool matched =
false;
1043 out_batch->
CopyRow(probe_row, out_row);
1073 num_build_rows += partition_num_rows;
1080 VLOG(2) <<
"Disabling probe filter push down because build side is too large: "
1092 partition->
Close(NULL);
1109 list<Partition*> spilled_partitions;
1113 spilled_partitions.push_back(partition);
1116 while (!spilled_partitions.empty()) {
1117 Partition* partition = spilled_partitions.front();
1118 spilled_partitions.pop_front();
1123 if (got_buffer)
break;
1128 spilled_partitions.push_back(spilled_partition);
1171 scoped_ptr<RowBatch> build_rows;
1174 scoped_ptr<RowBatch> probe_rows;
1184 for (
int i = 0; i < probe_rows->num_rows(); ++i) {
1186 for (
int j = 0; j < build_rows->num_rows(); ++j) {
1188 build_rows->GetRow(j));
1219 status.
AddDetail(
"Not enough memory to get the minimum required buffers for "
1256 <<
"No probe rows should have been spilled for this partition.";
1257 if (
join_op_ == TJoinOp::RIGHT_OUTER_JOIN ||
1258 join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
1259 join_op_ == TJoinOp::FULL_OUTER_JOIN) {
1264 }
else if (
join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
1269 partition->
Close(batch);
1271 partition->
Close(batch);
1286 *out <<
" hash_tbl=";
1287 *out << string(indent * 2,
' ');
1305 default: DCHECK(
false);
1312 ss <<
"PartitionedHashJoinNode (id=" <<
id() <<
" op=" <<
join_op_
1320 Partition* partition = hash_partitions_[i];
1322 ss <<
" closed" << endl;
1327 << (partition->
is_spilled() ?
"(Spilled)" :
"")
1331 if (partition->
hash_tbl() != NULL) {
1332 ss <<
" Hash Table Rows: " << partition->
hash_tbl()->
size() << endl;
1340 ss <<
"SpilledPartitions" << endl;
1343 DCHECK((*it)->is_spilled());
1344 DCHECK((*it)->hash_tbl() == NULL);
1345 ss <<
" Partition=" << *it << endl
1346 <<
" Spilled Build Rows: "
1347 << (*it)->build_rows()->num_rows() << endl
1348 <<
" Spilled Probe Rows: "
1349 << (*it)->probe_rows()->num_rows() << endl;
1384 DCHECK(tuple_row_type != NULL);
1385 PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
1388 DCHECK(this_type != NULL);
1389 PointerType* this_ptr_type = PointerType::get(this_type, 0);
1393 PointerType* tuple_row_working_type = PointerType::get(codegen->
ptr_type(), 0);
1402 LLVMContext& context = codegen->
context();
1405 Function* fn = prototype.GeneratePrototype(&builder, args);
1406 Value* out_row_arg = builder.CreateBitCast(args[1], tuple_row_working_type,
"out");
1407 Value* probe_row_arg = builder.CreateBitCast(args[2], tuple_row_working_type,
"probe");
1408 Value* build_row_arg = builder.CreateBitCast(args[3], tuple_row_working_type,
"build");
1416 Value* build_row_dst = builder.CreateGEP(out_row_arg, build_row_idx,
"build_dst_ptr");
1419 BasicBlock* build_not_null_block = BasicBlock::Create(context,
"build_not_null", fn);
1420 BasicBlock* build_null_block = NULL;
1422 if (
join_op_ == TJoinOp::LEFT_ANTI_JOIN ||
join_op_ == TJoinOp::LEFT_OUTER_JOIN ||
1423 join_op_ == TJoinOp::FULL_OUTER_JOIN ||
1424 join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
1426 build_null_block = BasicBlock::Create(context,
"build_null", fn);
1427 Value* is_build_null = builder.CreateIsNull(build_row_arg,
"is_build_null");
1428 builder.CreateCondBr(is_build_null, build_null_block, build_not_null_block);
1433 builder.SetInsertPoint(build_null_block);
1434 for (
int i = 0; i < num_build_tuples; ++i) {
1435 Value* array_idx[] =
1437 Value* dst = builder.CreateGEP(out_row_arg, array_idx,
"dst_tuple_ptr");
1440 builder.CreateRetVoid();
1443 builder.CreateBr(build_not_null_block);
1447 builder.SetInsertPoint(build_not_null_block);
1449 builder.CreateRetVoid();
1455 RuntimeState* state, Function* hash_fn, Function* murmur_hash_fn) {
1459 Function* process_build_batch_fn =
1460 codegen->
GetFunction(IRFunction::PHJ_PROCESS_BUILD_BATCH);
1461 DCHECK(process_build_batch_fn != NULL);
1464 Function* eval_row_fn =
ht_ctx_->CodegenEvalRow(state,
true);
1465 if (eval_row_fn == NULL)
return false;
1469 process_build_batch_fn = codegen->
ReplaceCallSites(process_build_batch_fn,
false,
1470 eval_row_fn,
"EvalBuildRow", &replaced);
1471 DCHECK_EQ(replaced, 1);
1476 process_build_batch_fn,
false, hash_fn,
"HashCurrentRow", &replaced);
1477 DCHECK_EQ(replaced, 1);
1480 process_build_batch_fn,
true, murmur_hash_fn,
"HashCurrentRow", &replaced);
1481 DCHECK_EQ(replaced, 1);
1485 if (process_build_batch_fn == NULL)
return false;
1486 process_build_batch_fn_level0 =
1488 if (process_build_batch_fn_level0 == NULL)
return false;
1499 RuntimeState* state, Function* hash_fn, Function* murmur_hash_fn) {
1504 IRFunction::Type ir_fn;
1506 case TJoinOp::INNER_JOIN:
1507 ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_INNER_JOIN;
1509 case TJoinOp::LEFT_OUTER_JOIN:
1510 ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_LEFT_OUTER_JOIN;
1512 case TJoinOp::LEFT_SEMI_JOIN:
1513 ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_LEFT_SEMI_JOIN;
1515 case TJoinOp::LEFT_ANTI_JOIN:
1516 ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_LEFT_ANTI_JOIN;
1518 case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN:
1519 ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_NULL_AWARE_LEFT_ANTI_JOIN;
1521 case TJoinOp::RIGHT_OUTER_JOIN:
1522 ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_RIGHT_OUTER_JOIN;
1524 case TJoinOp::RIGHT_SEMI_JOIN:
1525 ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_RIGHT_SEMI_JOIN;
1527 case TJoinOp::RIGHT_ANTI_JOIN:
1528 ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_RIGHT_ANTI_JOIN;
1530 case TJoinOp::FULL_OUTER_JOIN:
1531 ir_fn = IRFunction::PHJ_PROCESS_PROBE_BATCH_FULL_OUTER_JOIN;
1537 Function* process_probe_batch_fn = codegen->
GetFunction(ir_fn);
1538 DCHECK(process_probe_batch_fn != NULL);
1541 process_probe_batch_fn = codegen->
CloneFunction(process_probe_batch_fn);
1542 process_probe_batch_fn->setName(
"ProcessProbeBatch");
1548 DCHECK(process_probe_batch_fn->getLinkage() == GlobalValue::LinkOnceODRLinkage)
1550 process_probe_batch_fn->setLinkage(GlobalValue::WeakODRLinkage);
1553 Value* this_arg = codegen->
GetArgument(process_probe_batch_fn, 0);
1555 this_arg->replaceAllUsesWith(this_loc);
1558 Value* ht_ctx_arg = codegen->
GetArgument(process_probe_batch_fn, 2);
1560 ht_ctx_arg->replaceAllUsesWith(ht_ctx_loc);
1563 Function* equals_fn =
ht_ctx_->CodegenEquals(state);
1564 if (equals_fn == NULL)
return false;
1567 Function* eval_row_fn =
ht_ctx_->CodegenEvalRow(state,
false);
1568 if (eval_row_fn == NULL)
return false;
1572 if (create_output_row_fn == NULL)
return false;
1577 if (eval_other_conjuncts_fn == NULL)
return false;
1581 if (eval_conjuncts_fn == NULL)
return false;
1585 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
true,
1586 eval_row_fn,
"EvalProbeRow", &replaced);
1587 DCHECK_EQ(replaced, 1);
1589 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
true,
1590 create_output_row_fn,
"CreateOutputRow", &replaced);
1591 DCHECK(replaced == 1 || replaced == 2) << replaced;
1593 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
true,
1594 eval_conjuncts_fn,
"EvalConjuncts", &replaced);
1605 DCHECK(replaced == 0 || replaced == 1 || replaced == 2) << replaced;
1607 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
true,
1608 eval_other_conjuncts_fn,
"EvalOtherJoinConjuncts", &replaced);
1609 DCHECK_EQ(replaced, 1);
1611 process_probe_batch_fn = codegen->
ReplaceCallSites(process_probe_batch_fn,
true,
1612 equals_fn,
"Equals", &replaced);
1614 DCHECK(replaced == 1 || replaced == 2 || replaced == 3 || replaced == 4) << replaced;
1619 process_probe_batch_fn,
false, hash_fn,
"HashCurrentRow", &replaced);
1620 DCHECK_EQ(replaced, 1);
1623 process_probe_batch_fn,
true, murmur_hash_fn,
"HashCurrentRow", &replaced);
1624 DCHECK_EQ(replaced, 1);
1628 if (process_probe_batch_fn == NULL)
return false;
1629 process_probe_batch_fn_level0 =
1631 if (process_probe_batch_fn_level0 == NULL)
return false;
RuntimeState * runtime_state_
void UpdateState(State s)
Updates state_ to 's', logging the transition.
uint32_t slot_filter_bitmap_size() const
The underlying memory management is done by the BufferedBlockMgr.
void AddRuntimeExecOption(const std::string &option)
Appends option to 'runtime_exec_options_'.
bool AtEnd() const
Returns true if this iterator is at the end, i.e. GetRow() cannot be called.
ProcessProbeBatchFn process_probe_batch_fn_level0_
std::list< Partition * > spilled_partitions_
bool non_empty_build_
If true, the build side has at least one row.
int64_t EstimatedInMemSize() const
Status BuildHashTableInternal(RuntimeState *state, bool *built)
static int64_t EstimateNumBuckets(int64_t num_rows)
RuntimeProfile::Counter * partition_build_timer_
Total time spent partitioning build.
bool has_write_block() const
int64_t num_rows_returned_
void Close()
Must be called once at the end to cleanup all resources. Idempotent.
BufferedBlockMgr * block_mgr()
MemTracker * mem_tracker()
Status MemLimitTooLowError(Client *client)
int blocks_pinned() const
RuntimeProfile::Counter * num_probe_rows_partitioned_
Status OutputNullAwareProbeRows(RuntimeState *state, RowBatch *out_batch)
std::vector< ExprContext * > other_join_conjunct_ctxs_
Non-equi-join conjuncts from the JOIN clause.
llvm::Function * CodegenCreateOutputRow(LlvmCodeGen *codegen)
Codegen function to create output row. Assumes that the probe row is non-NULL.
static Status NullAwareAntiJoinError(bool build)
void CreateOutputRow(TupleRow *out_row, TupleRow *probe_row, TupleRow *build_row)
Utility struct that wraps a variable name and llvm type.
boost::scoped_ptr< RuntimeProfile > runtime_profile_
static Status Open(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for opening multiple expr trees.
int64_t rows_returned() const
Number of rows returned via GetNext().
bool AllocateProbeFilters(RuntimeState *state)
Status ProcessBuildBatch(RowBatch *build_batch)
Reads the rows in build_batch and partitions them in hash_partitions_.
RuntimeProfile::HighWaterMarkCounter * max_partition_level_
Level of max partition (i.e. number of repartitioning steps).
virtual Status Prepare(RuntimeState *state)
TupleRow * current_probe_row_
PartitionedHashJoinNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
const StringSearch UrlParser::hash_search & hash
RuntimeProfile::Counter * null_aware_eval_timer_
Time spent evaluating other_join_conjuncts for NAAJ.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
bool CodegenProcessBuildBatch(RuntimeState *state, llvm::Function *hash_fn, llvm::Function *murmur_hash_fn)
TupleRow * GetRow(int row_idx)
int probe_tuple_row_size_
Status ProcessBuildInput(RuntimeState *state, int level)
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
Open() implemented in BlockingJoinNode.
Status SpillPartition(Partition **spilled_partition)
HashTable * hash_tbls_[PARTITION_FANOUT]
int64_t SpareCapacity() const
#define ADD_TIMER(profile, name)
Status BuildHashTables(RuntimeState *state)
virtual void Close(RuntimeState *state)
Status RegisterClient(int num_reserved_buffers, MemTracker *tracker, RuntimeState *state, Client **client)
virtual void Close(RuntimeState *state)
virtual Status Init(const TPlanNode &tnode)
bool CodegenProcessProbeBatch(RuntimeState *state, llvm::Function *hash_fn, llvm::Function *murmur_hash_fn)
std::string PrintState() const
Returns the current state of the partition as a string.
TupleRow * next_row(RowBatch *batch) const
RuntimeProfile::Counter * num_hash_buckets_
Total number of hash buckets across all partitions.
void ResetForProbe()
Prepares for probing the next batch.
Partition * input_partition_
void AddDetail(const std::string &msg)
Add a detail string. Calling this method is only defined on a non-OK message.
const RowDescriptor & row_desc() const
BufferedTupleStream * probe_rows()
Status NextSpilledProbeRowBatch(RuntimeState *, RowBatch *out_batch)
#define COUNTER_ADD(c, v)
virtual void AddToDebugString(int indentation_level, std::stringstream *out) const
HashTable * hash_tbl() const
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
llvm::Argument * GetArgument(llvm::Function *fn, int i)
Returns the i-th argument of fn.
std::vector< bool > matched_null_probe_
llvm::Value * null_ptr_value()
const std::vector< ExprContext * > & conjunct_ctxs() const
Status PrepareNextPartition(RuntimeState *)
static void Close(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for closing multiple expr trees.
int64_t InMemSize() const
void AddBitmapFilter(SlotId slot, Bitmap *bitmap, bool *acquired_ownership)
ObjectPool * obj_pool()
Returns a local object pool.
RuntimeProfile::Counter * probe_timer_
std::vector< Partition * > hash_partitions_
BufferedTupleStream * null_probe_rows_
void Close(RowBatch *batch)
static std::string Print(T *value_or_type)
Returns the string representation of a llvm::Value* or llvm::Type*.
void AddTupleStream(BufferedTupleStream *stream)
virtual Status Init(const TPlanNode &tnode)
LLVM code generator. This is the top level object to generate jitted code.
void ClearReservations(Client *client)
Clears all reservations for this client.
int MinRequiredBuffers() const
int64_t null_probe_output_idx_
ProcessProbeBatchFn process_probe_batch_fn_
RuntimeProfile::Counter * probe_row_counter_
MemTracker * expr_mem_tracker()
static const char * LLVM_CLASS_NAME
virtual Status Prepare(RuntimeState *state)
llvm::Value * CastPtrToLlvmPtr(llvm::Type *type, const void *ptr)
void AddArgument(const NamedVariable &var)
Add argument.
static llvm::Function * CodegenEvalConjuncts(RuntimeState *state, const std::vector< ExprContext * > &conjunct_ctxs, const char *name="EvalConjuncts")
std::string NodeDebugString() const
ProcessBuildBatchFn process_build_batch_fn_level0_
Status BuildHashTable(RuntimeState *state, bool *built, const bool add_probe_filters)
Wrapper for the template-based BuildHashTable() based on 'add_probe_filters'.
void CodegenMemcpy(LlvmBuilder *, llvm::Value *dst, llvm::Value *src, int size)
boost::scoped_ptr< RowBatch > nulls_build_batch_
ProcessBuildBatchFn process_build_batch_fn_
ObjectPool * obj_pool() const
RuntimeProfile::Counter * num_spilled_partitions_
Number of partitions that have been spilled.
#define RETURN_IF_CANCELLED(state)
bool using_small_buffers_
If true, the partitions in hash_partitions_ are using small buffers.
bool can_add_probe_filters_
#define ADD_COUNTER(profile, name, unit)
uint32_t fragment_hash_seed() const
llvm::Function * GetFunction(IRFunction::Type)
void OutputUnmatchedBuild(RowBatch *out_batch)
std::vector< ExprContext * > probe_expr_ctxs_
Status EvaluateNullProbe(BufferedTupleStream *build)
Status NextProbeRowBatch(RuntimeState *, RowBatch *out_batch)
void AddFunctionToJit(llvm::Function *fn, void **fn_ptr)
llvm::Function * CloneFunction(llvm::Function *fn)
Returns a copy of fn. The copy is added to the module.
void AddExprCtxsToFree(const std::vector< ExprContext * > &ctxs)
bool using_small_buffers() const
virtual Status QueryMaintenance(RuntimeState *state)
BufferedTupleStream * build_rows_
int build_tuple_row_size_
Status GetRows(boost::scoped_ptr< RowBatch > *batch, bool *got_rows)
virtual Status Open(RuntimeState *state)
Status Spill(bool unpin_all_build)
const RowDescriptor & row_desc() const
#define COUNTER_SET(c, v)
int64_t LargestSpilledPartition() const
virtual Status InitGetNext(TupleRow *first_probe_row)
Partition(RuntimeState *state, PartitionedHashJoinNode *parent, int level, bool use_small_buffers)
static Status CreateExprTree(ObjectPool *pool, const TExpr &texpr, ExprContext **ctx)
RuntimeProfile::Counter * rows_returned_counter_
RuntimeProfile::HighWaterMarkCounter * largest_partition_percent_
static const Status MEM_LIMIT_EXCEEDED
RuntimeProfile::Counter * num_build_rows_partitioned_
Number of build/probe rows that have been partitioned.
Status SetMemLimitExceeded(MemTracker *tracker=NULL, int64_t failed_allocation_size=0)
const std::vector< TupleDescriptor * > & tuple_descriptors() const
Return descriptors for all tuples in this row, in order of appearance.
BufferedTupleStream * build_rows()
RuntimeProfile::Counter * partitions_created_
Total number of partitions created.
Status ReserveTupleStreamBlocks()
static const int MAX_PARTITION_DEPTH
void SetErrorMsg(const ErrorMsg &m)
std::vector< ExprContext * > build_expr_ctxs_
bool AppendRowStreamFull(BufferedTupleStream *stream, TupleRow *row)
Status Init(RuntimeProfile *profile=NULL, bool pinned=true)
bool codegen_enabled() const
Returns true if codegen is enabled for this query.
void CopyRow(TupleRow *src, TupleRow *dest)
bool AddRow(TupleRow *row, uint8_t **dst=NULL)
HighWaterMarkCounter * AddHighWaterMarkCounter(const std::string &name, TUnit::type unit, const std::string &parent_counter_name="")
Status PrepareNullAwareNullProbe()
Reference to a single slot of a tuple.
Status OutputNullAwareNullProbe(RuntimeState *state, RowBatch *out_batch)
std::list< Partition * > output_build_partitions_
static const char * LLVM_CLASS_NAME
Status PrepareForRead(bool *got_buffer=NULL)
virtual Status ConstructBuildSide(RuntimeState *state)
Partition * null_aware_partition_
llvm::Type * GetType(const ColumnType &type)
Returns llvm type for the column type.
Status GetCodegen(LlvmCodeGen **codegen, bool initialize=true)
int64_t num_rows() const
Number of rows in the stream.
BufferedBlockMgr::Client * block_mgr_client_
Client to the buffered block mgr.
bool AttachProbeFilters(RuntimeState *state)
Attach the probe filters to runtime state.
llvm::Value * GetIntConstant(PrimitiveType type, int64_t val)
Returns the constant 'val' of 'type'.
int ProcessProbeBatch(RowBatch *out_batch, HashTableCtx *ht_ctx)
static Status CreateExprTrees(ObjectPool *pool, const std::vector< TExpr > &texprs, std::vector< ExprContext * > *ctxs)
static int64_t EstimateSize(int64_t num_rows)
RuntimeProfile::Counter * build_row_counter_
llvm::Function * FinalizeFunction(llvm::Function *function)
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
Status PrepareNullAwarePartition()
Initializes null_aware_partition_ and nulls_build_batch_ to output rows.
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
Status UnpinStream(bool all=false)
Status SwitchToIoBuffers(bool *got_buffer)
Status CleanUpHashPartitions(RowBatch *batch)
boost::scoped_ptr< HashTableCtx > ht_ctx_
boost::scoped_ptr< RowBatch > probe_batch_
HashTable::Iterator hash_tbl_iterator_
The iterator that corresponds to the look up of current_probe_row_.
llvm::Function * ReplaceCallSites(llvm::Function *caller, bool update_in_place, llvm::Function *new_fn, const std::string &target_name, int *num_replaced)
static const int PARTITION_FANOUT
TupleRow * semi_join_staging_row_
std::vector< ExprContext * > conjunct_ctxs_
State state_
State of the algorithm. Used just for debugging.
llvm::LLVMContext & context()
TupleRow * GetRow() const
Status GetNext(RowBatch *batch, bool *eos, std::vector< RowIdx > *indices=NULL)
virtual std::string DebugString() const
llvm::Function * OptimizeFunctionWithExprs(llvm::Function *fn)
int64_t size() const
Returns number of elements inserted in the hash table.
virtual Status Reset(RuntimeState *state)
static uint32_t GetHashValue(const void *v, const ColumnType &type, uint32_t seed=0)
llvm::PointerType * ptr_type()
std::vector< std::pair< SlotId, Bitmap * > > probe_filters_
boost::scoped_ptr< HashTable > hash_tbl_
The hash table for this partition.
RuntimeProfile * runtime_profile()
static const int NUM_PARTITIONING_BITS
Needs to be the log(PARTITION_FANOUT)
RuntimeProfile::Counter * num_repartitions_
Number of partitions that have been repartitioned.
DEFINE_bool(enable_phj_probe_side_filtering, true,"Enables pushing PHJ build side filters to probe side")