Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
partitioned-aggregation-node.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_EXEC_PARTITIONED_AGGREGATION_NODE_H
17 #define IMPALA_EXEC_PARTITIONED_AGGREGATION_NODE_H
18 
19 #include <functional>
20 #include <boost/scoped_ptr.hpp>
21 
22 #include "exec/exec-node.h"
23 #include "exec/hash-table.h"
26 #include "runtime/descriptors.h" // for TupleId
27 #include "runtime/mem-pool.h"
28 #include "runtime/string-value.h"
29 
30 namespace llvm {
31  class Function;
32 }
33 
34 namespace impala {
35 
36 class AggFnEvaluator;
37 class LlvmCodeGen;
38 class RowBatch;
39 class RuntimeState;
40 struct StringValue;
41 class Tuple;
42 class TupleDescriptor;
43 class SlotDescriptor;
44 
56 //
73 //
79 //
92  public:
94  const TPlanNode& tnode, const DescriptorTbl& descs);
95 
96  virtual Status Init(const TPlanNode& tnode);
97  virtual Status Prepare(RuntimeState* state);
98  virtual Status Open(RuntimeState* state);
99  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
100  virtual Status Reset(RuntimeState* state);
101  virtual void Close(RuntimeState* state);
102 
103  static const char* LLVM_CLASS_NAME;
104 
105  protected:
107  virtual Status QueryMaintenance(RuntimeState* state);
108 
109  virtual void DebugString(int indentation_level, std::stringstream* out) const;
110 
111  private:
112  struct Partition;
113 
115  static const int PARTITION_FANOUT = 16;
116 
121  static const int NUM_PARTITIONING_BITS = 4;
122 
130  static const int MAX_PARTITION_DEPTH = 16;
131 
135 
137  boost::scoped_ptr<RowDescriptor> intermediate_row_desc_;
138 
143 
148  const bool needs_finalize_;
149 
152 
153  std::vector<AggFnEvaluator*> aggregate_evaluators_;
154 
161  std::vector<impala_udf::FunctionContext*> agg_fn_ctxs_;
162  boost::scoped_ptr<MemPool> agg_fn_pool_;
163 
165  std::vector<ExprContext*> probe_expr_ctxs_;
166 
169  std::vector<ExprContext*> build_expr_ctxs_;
170 
174 
177 
180 
186 
189  boost::scoped_ptr<MemPool> mem_pool_;
190 
194  boost::scoped_ptr<HashTableCtx> ht_ctx_;
195 
200 
205 
208 
211 
214 
217 
220 
223 
226 
229 
232 
236 
237  struct Partition {
239  : parent(parent), is_closed(false), level(level) {}
240 
246 
248  bool InitHashTable();
249 
253  void Close(bool finalize_rows);
254 
259  Status Spill(Tuple* tuple = NULL);
260 
261  bool is_spilled() const { return hash_tbl.get() == NULL; }
262 
264 
266  bool is_closed;
267 
271  const int level;
272 
276  boost::scoped_ptr<HashTable> hash_tbl;
277 
279  std::vector<impala_udf::FunctionContext*> agg_fn_ctxs;
280  boost::scoped_ptr<MemPool> agg_fn_pool;
281 
286  boost::scoped_ptr<BufferedTupleStream> aggregated_row_stream;
287 
289  boost::scoped_ptr<BufferedTupleStream> unaggregated_row_stream;
290  };
291 
293  std::vector<Partition*> hash_partitions_;
294 
296  std::list<Partition*> spilled_partitions_;
297 
301  std::list<Partition*> aggregated_partitions_;
302 
306  boost::scoped_ptr<BufferedTupleStream> serialize_stream_;
307 
315  const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs,
316  MemPool* pool, BufferedTupleStream* stream);
317 
326  void UpdateTuple(impala_udf::FunctionContext** agg_fn_ctxs, Tuple* tuple, TupleRow* row,
327  bool is_merge = false);
328 
338  Tuple* GetOutputTuple(const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs,
339  Tuple* tuple, MemPool* pool);
340 
344  Status ProcessBatchNoGrouping(RowBatch* batch, HashTableCtx* ht_ctx = NULL);
345 
350  //
353  template<bool AGGREGATED_ROWS>
355 
357  template<bool AGGREGATED_ROWS>
359 
362  Status CreateHashPartitions(int level);
363 
367  int64_t LargestSpilledPartition() const;
368 
374 
380  Status SpillPartition(Partition* curr_partition = NULL,
381  Tuple* curr_intermediate_tuple = NULL);
382 
387  Status MoveHashPartitions(int64_t input_rows);
388 
390  void CleanupHashTbl(const std::vector<impala_udf::FunctionContext*>& fn_ctxs,
392 
395  llvm::Function* CodegenUpdateSlot(AggFnEvaluator* evaluator, SlotDescriptor* slot_desc);
396 
398  llvm::Function* CodegenUpdateTuple();
399 
405  llvm::Function* CodegenProcessBatch();
406 
412 
418  int MinRequiredBuffers() const {
419  return 2 * PARTITION_FANOUT + 1 + (needs_serialize_ ? 1 : 0);
420  }
421 };
422 
423 }
424 
425 #endif
stl-like iterator interface.
Definition: hash-table.h:450
The underlying memory management is done by the BufferedBlockMgr.
std::list< Partition * > spilled_partitions_
All partitions that have been spilled and need further processing.
bool is_closed
If true, this partition is closed and there is nothing left to do.
bool needs_serialize_
Contains any evaluators that require the serialize step.
RuntimeProfile::Counter * partitions_created_
Total number of partitions created.
Status ProcessBatch_false(RowBatch *batch, HashTableCtx *ht_ctx)
Tuple * ConstructIntermediateTuple(const std::vector< impala_udf::FunctionContext * > &agg_fn_ctxs, MemPool *pool, BufferedTupleStream *stream)
Tuple * GetOutputTuple(const std::vector< impala_udf::FunctionContext * > &agg_fn_ctxs, Tuple *tuple, MemPool *pool)
RuntimeProfile::Counter * num_hash_buckets_
Total number of hash buckets across all partitions.
boost::scoped_ptr< BufferedTupleStream > unaggregated_row_stream
Unaggregated rows that are spilled.
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
boost::scoped_ptr< RowDescriptor > intermediate_row_desc_
Row with the intermediate tuple as its only tuple.
RuntimeProfile::Counter * num_spilled_partitions_
Number of partitions that have been spilled.
boost::scoped_ptr< BufferedTupleStream > serialize_stream_
void CleanupHashTbl(const std::vector< impala_udf::FunctionContext * > &fn_ctxs, HashTable::Iterator it)
Calls finalize on all tuples starting at 'it'.
std::vector< impala_udf::FunctionContext * > agg_fn_ctxs_
static const int PARTITION_FANOUT
Number of initial partitions to create. Must be a power of 2.
int TupleId
Definition: global-types.h:23
RuntimeProfile::HighWaterMarkCounter * largest_partition_percent_
#define IR_ALWAYS_INLINE
Definition: impala-ir.h:31
RuntimeProfile::Counter * num_row_repartitioned_
Number of rows that have been repartitioned.
virtual Status Init(const TPlanNode &tnode)
Status SpillPartition(Partition *curr_partition=NULL, Tuple *curr_intermediate_tuple=NULL)
bool InitHashTable()
Initializes the hash table. Returns false on OOM.
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
Definition: exec-node.cc:345
Status ProcessBatch_true(RowBatch *batch, HashTableCtx *ht_ctx)
TupleId intermediate_tuple_id_
Tuple into which Update()/Merge()/Serialize() results are stored.
ObjectPool pool
virtual void Close(RuntimeState *state)
RuntimeProfile::HighWaterMarkCounter * max_partition_level_
Level of max partition (i.e. number of repartitioning steps).
bool using_small_buffers_
If true, the partitions in hash_partitions_ are using small buffers.
virtual Status Reset(RuntimeState *state)
std::vector< Partition * > hash_partitions_
Current partitions we are partitioning into.
RuntimeProfile::Counter * build_timer_
Time spent processing the child rows.
llvm::Function * CodegenUpdateTuple()
Codegen UpdateTuple(). Returns NULL if codegen is unsuccessful.
RuntimeProfile::Counter * num_repartitions_
Number of partitions that have been repartitioned.
Partition(PartitionedAggregationNode *parent, int level)
std::vector< impala_udf::FunctionContext * > agg_fn_ctxs
Clone of parent's agg_fn_ctxs_ and backing MemPool.
ProcessRowBatchFn process_row_batch_fn_
Jitted ProcessRowBatch function pointer. Null if codegen is disabled.
boost::scoped_ptr< BufferedTupleStream > aggregated_row_stream
RuntimeProfile::Counter * get_results_timer_
Time spent returning the aggregated rows.
virtual Status QueryMaintenance(RuntimeState *state)
Frees local allocations from aggregate_evaluators_ and agg_fn_ctxs.
Status ProcessBatchNoGrouping(RowBatch *batch, HashTableCtx *ht_ctx=NULL)
virtual Status Prepare(RuntimeState *state)
std::vector< AggFnEvaluator * > aggregate_evaluators_
std::vector< ExprContext * > probe_expr_ctxs_
Exprs used to evaluate input rows.
Status ProcessStream(BufferedTupleStream *input_stream)
Reads all the rows from input_stream and processes them by calling ProcessBatch().
Status(* ProcessRowBatchFn)(PartitionedAggregationNode *, RowBatch *, HashTableCtx *)
void UpdateTuple(impala_udf::FunctionContext **agg_fn_ctxs, Tuple *tuple, TupleRow *row, bool is_merge=false)
virtual Status Open(RuntimeState *state)
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
boost::scoped_ptr< HashTableCtx > ht_ctx_
llvm::Function * CodegenUpdateSlot(AggFnEvaluator *evaluator, SlotDescriptor *slot_desc)
RuntimeProfile::Counter * ht_resize_timer_
Total time spent resizing hash tables.
Status IR_ALWAYS_INLINE ProcessBatch(RowBatch *batch, HashTableCtx *ht_ctx)
PartitionedAggregationNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)