Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
partitioned-hash-join-node.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_EXEC_PARTITIONED_HASH_JOIN_NODE_H
17 #define IMPALA_EXEC_PARTITIONED_HASH_JOIN_NODE_H
18 
19 #include <boost/scoped_ptr.hpp>
20 #include <boost/unordered_set.hpp>
21 #include <boost/thread.hpp>
22 #include <string>
23 
25 #include "exec/exec-node.h"
26 #include "exec/hash-table.h"
28 
29 #include "gen-cpp/PlanNodes_types.h" // for TJoinOp
30 
31 namespace impala {
32 
33 class BufferedBlockMgr;
34 class MemPool;
35 class RowBatch;
36 class TupleRow;
37 class BufferedTupleStream;
38 
50 //
58  public:
59  PartitionedHashJoinNode(ObjectPool* pool, const TPlanNode& tnode,
60  const DescriptorTbl& descs);
61 
62  virtual Status Init(const TPlanNode& tnode);
63  virtual Status Prepare(RuntimeState* state);
65  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
66  virtual Status Reset(RuntimeState* state);
67  virtual void Close(RuntimeState* state);
68 
69  protected:
70  virtual void AddToDebugString(int indentation_level, std::stringstream* out) const;
71  virtual Status InitGetNext(TupleRow* first_probe_row);
72  virtual Status ConstructBuildSide(RuntimeState* state);
73 
74  private:
75  class Partition;
76 
89  //
95  enum State {
99 
103 
107 
112  };
113 
// Fan-out constant. The hash_tbls_ array listed in this page's member index is sized by it,
// and MinRequiredBuffers() derives its buffer count from it.
116  static const int PARTITION_FANOUT = 16;
117 
// Must equal log2(PARTITION_FANOUT): 2^4 == 16.
119  static const int NUM_PARTITIONING_BITS = 4;
120 
// NOTE(review): presumably the deepest repartitioning level allowed -- confirm against the
// implementation file; nothing visible in this listing uses it.
130  static const int MAX_PARTITION_DEPTH = 16;
131 
138 
144  bool AppendRow(BufferedTupleStream* stream, TupleRow* row);
145 
149 
154  Status SpillPartition(Partition** spilled_partition);
155 
160  Status ProcessBuildInput(RuntimeState* state, int level);
161 
163  Status ProcessBuildBatch(RowBatch* build_batch);
164 
172 
179  template<int const JoinOp>
180  int ProcessProbeBatch(RowBatch* out_batch, HashTableCtx* ht_ctx);
181 
183  int ProcessProbeBatch(
184  const TJoinOp::type join_op, RowBatch* out_batch, HashTableCtx* ht_ctx);
185 
191  void OutputUnmatchedBuild(RowBatch* out_batch);
192 
195 
199 
205 
209 
213 
222 
226 
230 
234 
242 
245  int64_t LargestSpilledPartition() const;
246 
248  void ResetForProbe();
249 
252  bool AllocateProbeFilters(RuntimeState* state);
253 
255  bool AttachProbeFilters(RuntimeState* state);
256 
258  llvm::Function* CodegenCreateOutputRow(LlvmCodeGen* codegen);
259 
263  RuntimeState* state, llvm::Function* hash_fn, llvm::Function* murmur_hash_fn);
264 
268  RuntimeState* state, llvm::Function* hash_fn, llvm::Function* murmur_hash_fn);
269 
271  std::string PrintState() const;
272 
274  void UpdateState(State s);
275 
276  std::string NodeDebugString() const;
277 
// Returns the buffer count computed below: 2 * PARTITION_FANOUT + 2, plus 3 more when
// join_op_ is TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN (34 or 37 with PARTITION_FANOUT == 16).
282  int MinRequiredBuffers() const {
283  int num_reserved_buffers = PARTITION_FANOUT * 2 + 2;
// Only the null-aware left anti join adds the 3 extra buffers; every other join op adds 0.
284  num_reserved_buffers += join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ? 3 : 0;
285  return num_reserved_buffers;
286  }
287 
289 
292  std::vector<ExprContext*> probe_expr_ctxs_;
293  std::vector<ExprContext*> build_expr_ctxs_;
294 
296  std::vector<ExprContext*> other_join_conjunct_ctxs_;
297 
300 
304 
307 
311  boost::scoped_ptr<HashTableCtx> ht_ctx_;
312 
315 
318 
321 
324 
327 
331 
334 
337 
341 
344 
// Nested class of PartitionedHashJoinNode (forward-declared in its private section).
// Each instance carries its own hash table (hash_tbl_) and closed/spilled flags.
// Several member declarations and all original comments are elided from this listing.
345  class Partition {
346  public:
// Takes the runtime state, the parent join node, a level and a small-buffers flag.
347  Partition(RuntimeState* state, PartitionedHashJoinNode* parent, int level,
348  bool use_small_buffers);
349  ~Partition();
350 
// Returns the raw pointer held by hash_tbl_; the scoped_ptr keeps ownership.
353  HashTable* hash_tbl() const { return hash_tbl_.get(); }
354 
// Accessors for the is_closed_ / is_spilled_ flags (is_spilled_: true if this partition
// is spilled). The flag declarations themselves are elided from this listing.
355  bool is_closed() const { return is_closed_; }
356  bool is_spilled() const { return is_spilled_; }
357 
// NOTE(review): 'batch' presumably receives resources released on close -- confirm.
362  void Close(RowBatch* batch);
363 
366  int64_t EstimatedInMemSize() const;
367 
370  int64_t InMemSize() const;
371 
// Templated on AddProbeFilters. 'built' is an out-parameter.
// NOTE(review): presumably set to whether the hash table was built -- confirm.
377  template<bool const AddProbeFilters>
378  Status BuildHashTableInternal(RuntimeState* state, bool* built);
379 
// Wrapper for the template-based BuildHashTable() based on 'add_probe_filters'.
381  Status BuildHashTable(RuntimeState* state, bool* built, const bool add_probe_filters);
382 
386  Status Spill(bool unpin_all_build);
387 
388  private:
390 
392 
396 
399 
// NOTE(review): presumably the 'level' constructor argument -- confirm.
403  int level_;
404 
// The hash table for this partition.
406  boost::scoped_ptr<HashTable> hash_tbl_;
407 
415  };
416 
425 
427  typedef int (*ProcessProbeBatchFn)(
435 
439  std::list<Partition*> spilled_partitions_;
440 
444  std::vector<Partition*> hash_partitions_;
445 
452 
457 
461  std::list<Partition*> output_build_partitions_;
462 
465  std::vector<std::pair<SlotId, Bitmap*> > probe_filters_;
466 
478 
481  boost::scoped_ptr<RowBatch> nulls_build_batch_;
482 
485 
489 
493  std::vector<bool> matched_null_probe_;
494 
498 };
499 
500 }
501 
502 #endif
stl-like iterator interface.
Definition: hash-table.h:450
void UpdateState(State s)
Updates state_ to 's', logging the transition.
The underlying memory management is done by the BufferedBlockMgr.
Status(* ProcessBuildBatchFn)(PartitionedHashJoinNode *, RowBatch *)
llvm function and signature for codegening build batch.
bool AppendRow(BufferedTupleStream *stream, TupleRow *row)
std::list< Partition * > spilled_partitions_
bool non_empty_build_
If true, the build side has at least one row.
Status BuildHashTableInternal(RuntimeState *state, bool *built)
RuntimeProfile::Counter * partition_build_timer_
Total time spent partitioning build.
RuntimeProfile::Counter * num_probe_rows_partitioned_
Status OutputNullAwareProbeRows(RuntimeState *state, RowBatch *out_batch)
std::vector< ExprContext * > other_join_conjunct_ctxs_
Non-equi-join conjuncts from the JOIN clause.
llvm::Function * CodegenCreateOutputRow(LlvmCodeGen *codegen)
Codegen function to create output row. Assumes that the probe row is non-NULL.
bool AllocateProbeFilters(RuntimeState *state)
Status ProcessBuildBatch(RowBatch *build_batch)
Reads the rows in build_batch and partitions them in hash_partitions_.
RuntimeProfile::HighWaterMarkCounter * max_partition_level_
Level of max partition (i.e. number of repartitioning steps).
PartitionedHashJoinNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
RuntimeProfile::Counter * null_aware_eval_timer_
Time spent evaluating other_join_conjuncts for NAAJ.
bool CodegenProcessBuildBatch(RuntimeState *state, llvm::Function *hash_fn, llvm::Function *murmur_hash_fn)
Status ProcessBuildInput(RuntimeState *state, int level)
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
Open() implemented in BlockingJoinNode.
Status SpillPartition(Partition **spilled_partition)
HashTable * hash_tbls_[PARTITION_FANOUT]
Status BuildHashTables(RuntimeState *state)
virtual void Close(RuntimeState *state)
bool CodegenProcessProbeBatch(RuntimeState *state, llvm::Function *hash_fn, llvm::Function *murmur_hash_fn)
std::string PrintState() const
Returns the current state of the partition as a string.
RuntimeProfile::Counter * num_hash_buckets_
Total number of hash buckets across all partitions.
void ResetForProbe()
Prepares for probing the next batch.
Status NextSpilledProbeRowBatch(RuntimeState *, RowBatch *out_batch)
virtual void AddToDebugString(int indentation_level, std::stringstream *out) const
int(* ProcessProbeBatchFn)(PartitionedHashJoinNode *, RowBatch *, HashTableCtx *)
llvm function and signature for codegening probe batch.
std::vector< Partition * > hash_partitions_
virtual Status Init(const TPlanNode &tnode)
LLVM code generator. This is the top level object to generate jitted code.
Definition: llvm-codegen.h:107
virtual Status Prepare(RuntimeState *state)
Status BuildHashTable(RuntimeState *state, bool *built, const bool add_probe_filters)
Wrapper for the template-based BuildHashTable() based on 'add_probe_filters'.
boost::scoped_ptr< RowBatch > nulls_build_batch_
RuntimeProfile::Counter * num_spilled_partitions_
Number of partitions that have been spilled.
ObjectPool pool
bool using_small_buffers_
If true, the partitions in hash_partitions_ are using small buffers.
std::vector< ExprContext * > probe_expr_ctxs_
void OutputUnmatchedBuild(RowBatch *out_batch)
Status EvaluateNullProbe(BufferedTupleStream *build)
Status NextProbeRowBatch(RuntimeState *, RowBatch *out_batch)
virtual Status InitGetNext(TupleRow *first_probe_row)
Partition(RuntimeState *state, PartitionedHashJoinNode *parent, int level, bool use_small_buffers)
RuntimeProfile::HighWaterMarkCounter * largest_partition_percent_
RuntimeProfile::Counter * num_build_rows_partitioned_
Number of build/probe rows that have been partitioned.
RuntimeProfile::Counter * partitions_created_
Total number of partitions created.
std::vector< ExprContext * > build_expr_ctxs_
bool AppendRowStreamFull(BufferedTupleStream *stream, TupleRow *row)
Status OutputNullAwareNullProbe(RuntimeState *state, RowBatch *out_batch)
std::list< Partition * > output_build_partitions_
virtual Status ConstructBuildSide(RuntimeState *state)
BufferedBlockMgr::Client * block_mgr_client_
Client to the buffered block mgr.
bool AttachProbeFilters(RuntimeState *state)
Attach the probe filters to runtime state.
int ProcessProbeBatch(RowBatch *out_batch, HashTableCtx *ht_ctx)
Status PrepareNullAwarePartition()
Initializes null_aware_partition_ and nulls_build_batch_ to output rows.
boost::scoped_ptr< HashTableCtx > ht_ctx_
HashTable::Iterator hash_tbl_iterator_
The iterator that corresponds to the look up of current_probe_row_.
State state_
State of the algorithm. Used just for debugging.
virtual Status Reset(RuntimeState *state)
std::vector< std::pair< SlotId, Bitmap * > > probe_filters_
bool is_spilled_
True if this partition is spilled.
boost::scoped_ptr< HashTable > hash_tbl_
The hash table for this partition.
static const int NUM_PARTITIONING_BITS
Needs to be log2(PARTITION_FANOUT).
RuntimeProfile::Counter * num_repartitions_
Number of partitions that have been repartitioned.