16 #ifndef IMPALA_EXEC_PARTITIONED_HASH_JOIN_NODE_H
17 #define IMPALA_EXEC_PARTITIONED_HASH_JOIN_NODE_H
19 #include <boost/scoped_ptr.hpp>
20 #include <boost/unordered_set.hpp>
21 #include <boost/thread.hpp>
29 #include "gen-cpp/PlanNodes_types.h"
33 class BufferedBlockMgr;
37 class BufferedTupleStream;
70 virtual void AddToDebugString(
int indentation_level, std::stringstream* out)
const;
179 template<
int const JoinOp>
263 RuntimeState* state, llvm::Function* hash_fn, llvm::Function* murmur_hash_fn);
268 RuntimeState* state, llvm::Function* hash_fn, llvm::Function* murmur_hash_fn);
284 num_reserved_buffers +=
join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ? 3 : 0;
285 return num_reserved_buffers;
348 bool use_small_buffers);
377 template<
bool const AddProbeFilters>
RuntimeState * runtime_state_
STL-like iterator interface.
void UpdateState(State s)
Updates state_ to 's', logging the transition.
The underlying memory management is done by the BufferedBlockMgr.
Status(* ProcessBuildBatchFn)(PartitionedHashJoinNode *, RowBatch *)
LLVM function and signature for codegen'ing the build batch.
ProcessProbeBatchFn process_probe_batch_fn_level0_
bool AppendRow(BufferedTupleStream *stream, TupleRow *row)
std::list< Partition * > spilled_partitions_
bool non_empty_build_
If true, the build side has at least one row.
int64_t EstimatedInMemSize() const
Status BuildHashTableInternal(RuntimeState *state, bool *built)
RuntimeProfile::Counter * partition_build_timer_
Total time spent partitioning build.
RuntimeProfile::Counter * num_probe_rows_partitioned_
Status OutputNullAwareProbeRows(RuntimeState *state, RowBatch *out_batch)
std::vector< ExprContext * > other_join_conjunct_ctxs_
Non-equi-join conjuncts from the JOIN clause.
llvm::Function * CodegenCreateOutputRow(LlvmCodeGen *codegen)
Codegen function to create output row. Assumes that the probe row is non-NULL.
bool AllocateProbeFilters(RuntimeState *state)
Status ProcessBuildBatch(RowBatch *build_batch)
Reads the rows in build_batch and partitions them in hash_partitions_.
RuntimeProfile::HighWaterMarkCounter * max_partition_level_
Maximum partition level reached (i.e. the number of repartitioning steps).
PartitionedHashJoinNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
RuntimeProfile::Counter * null_aware_eval_timer_
Time spent evaluating other_join_conjuncts for NAAJ.
bool CodegenProcessBuildBatch(RuntimeState *state, llvm::Function *hash_fn, llvm::Function *murmur_hash_fn)
Status ProcessBuildInput(RuntimeState *state, int level)
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
Open() implemented in BlockingJoinNode.
Status SpillPartition(Partition **spilled_partition)
HashTable * hash_tbls_[PARTITION_FANOUT]
static const int MAX_IN_MEM_BUILD_TABLES
Status BuildHashTables(RuntimeState *state)
virtual void Close(RuntimeState *state)
bool CodegenProcessProbeBatch(RuntimeState *state, llvm::Function *hash_fn, llvm::Function *murmur_hash_fn)
std::string PrintState() const
Returns the current state of the partition as a string.
RuntimeProfile::Counter * num_hash_buckets_
Total number of hash buckets across all partitions.
void ResetForProbe()
Prepares for probing the next batch.
Partition * input_partition_
BufferedTupleStream * probe_rows()
Status NextSpilledProbeRowBatch(RuntimeState *, RowBatch *out_batch)
virtual void AddToDebugString(int indentation_level, std::stringstream *out) const
HashTable * hash_tbl() const
std::vector< bool > matched_null_probe_
Status PrepareNextPartition(RuntimeState *)
int64_t InMemSize() const
int(* ProcessProbeBatchFn)(PartitionedHashJoinNode *, RowBatch *, HashTableCtx *)
LLVM function and signature for codegen'ing the probe batch.
std::vector< Partition * > hash_partitions_
BufferedTupleStream * null_probe_rows_
void Close(RowBatch *batch)
virtual Status Init(const TPlanNode &tnode)
LLVM code generator. This is the top level object to generate jitted code.
int MinRequiredBuffers() const
int64_t null_probe_output_idx_
ProcessProbeBatchFn process_probe_batch_fn_
virtual Status Prepare(RuntimeState *state)
std::string NodeDebugString() const
ProcessBuildBatchFn process_build_batch_fn_level0_
Status BuildHashTable(RuntimeState *state, bool *built, const bool add_probe_filters)
Wrapper for the template-based BuildHashTable() based on 'add_probe_filters'.
boost::scoped_ptr< RowBatch > nulls_build_batch_
BufferedTupleStream * probe_rows_
ProcessBuildBatchFn process_build_batch_fn_
RuntimeProfile::Counter * num_spilled_partitions_
Number of partitions that have been spilled.
bool using_small_buffers_
If true, the partitions in hash_partitions_ are using small buffers.
std::vector< ExprContext * > probe_expr_ctxs_
void OutputUnmatchedBuild(RowBatch *out_batch)
Status EvaluateNullProbe(BufferedTupleStream *build)
Status NextProbeRowBatch(RuntimeState *, RowBatch *out_batch)
BufferedTupleStream * build_rows_
Status Spill(bool unpin_all_build)
int64_t LargestSpilledPartition() const
virtual Status InitGetNext(TupleRow *first_probe_row)
Partition(RuntimeState *state, PartitionedHashJoinNode *parent, int level, bool use_small_buffers)
RuntimeProfile::HighWaterMarkCounter * largest_partition_percent_
RuntimeProfile::Counter * num_build_rows_partitioned_
Number of build/probe rows that have been partitioned.
BufferedTupleStream * build_rows()
RuntimeProfile::Counter * partitions_created_
Total number of partitions created.
Status ReserveTupleStreamBlocks()
static const int MAX_PARTITION_DEPTH
std::vector< ExprContext * > build_expr_ctxs_
bool AppendRowStreamFull(BufferedTupleStream *stream, TupleRow *row)
Status PrepareNullAwareNullProbe()
Status OutputNullAwareNullProbe(RuntimeState *state, RowBatch *out_batch)
std::list< Partition * > output_build_partitions_
virtual Status ConstructBuildSide(RuntimeState *state)
Partition * null_aware_partition_
BufferedBlockMgr::Client * block_mgr_client_
Client to the buffered block mgr.
bool AttachProbeFilters(RuntimeState *state)
Attach the probe filters to runtime state.
int ProcessProbeBatch(RowBatch *out_batch, HashTableCtx *ht_ctx)
Status PrepareNullAwarePartition()
Initializes null_aware_partition_ and nulls_build_batch_ to output rows.
Status CleanUpHashPartitions(RowBatch *batch)
boost::scoped_ptr< HashTableCtx > ht_ctx_
HashTable::Iterator hash_tbl_iterator_
The iterator that corresponds to the look up of current_probe_row_.
static const int PARTITION_FANOUT
State state_
State of the algorithm. Used just for debugging.
PartitionedHashJoinNode * parent_
virtual Status Reset(RuntimeState *state)
std::vector< std::pair< SlotId, Bitmap * > > probe_filters_
bool is_spilled_
True if this partition is spilled.
boost::scoped_ptr< HashTable > hash_tbl_
The hash table for this partition.
static const int NUM_PARTITIONING_BITS
Must equal log2(PARTITION_FANOUT).
RuntimeProfile::Counter * num_repartitions_
Number of partitions that have been repartitioned.