16 #ifndef IMPALA_EXEC_PARTITIONED_AGGREGATION_NODE_H
17 #define IMPALA_EXEC_PARTITIONED_AGGREGATION_NODE_H
20 #include <boost/scoped_ptr.hpp>
42 class TupleDescriptor;
109 virtual void DebugString(
int indentation_level, std::stringstream* out)
const;
239 : parent(parent),
is_closed(false), level(level) {}
253 void Close(
bool finalize_rows);
315 const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs,
327 bool is_merge =
false);
353 template<
bool AGGREGATED_ROWS>
357 template<
bool AGGREGATED_ROWS>
381 Tuple* curr_intermediate_tuple = NULL);
390 void CleanupHashTbl(
const std::vector<impala_udf::FunctionContext*>& fn_ctxs,
STL-like iterator interface.
Partition * output_partition_
The underlying memory management is done by the BufferedBlockMgr.
Status MoveHashPartitions(int64_t input_rows)
int MinRequiredBuffers() const
std::list< Partition * > spilled_partitions_
All partitions that have been spilled and need further processing.
bool is_closed
If true, this partition is closed and there is nothing left to do.
bool needs_serialize_
Contains any evaluators that require the serialize step.
RuntimeProfile::Counter * partitions_created_
Total number of partitions created.
Status ProcessBatch_false(RowBatch *batch, HashTableCtx *ht_ctx)
HashTable::Iterator output_iterator_
Tuple * ConstructIntermediateTuple(const std::vector< impala_udf::FunctionContext * > &agg_fn_ctxs, MemPool *pool, BufferedTupleStream *stream)
static const char * LLVM_CLASS_NAME
Tuple * GetOutputTuple(const std::vector< impala_udf::FunctionContext * > &agg_fn_ctxs, Tuple *tuple, MemPool *pool)
RuntimeProfile::Counter * num_hash_buckets_
Total number of hash buckets across all partitions.
boost::scoped_ptr< BufferedTupleStream > unaggregated_row_stream
Unaggregated rows that are spilled.
boost::scoped_ptr< MemPool > agg_fn_pool
void Close(bool finalize_rows)
bool singleton_output_tuple_returned_
A tuple with 0 materialised slots is represented as NULL.
boost::scoped_ptr< RowDescriptor > intermediate_row_desc_
Row with the intermediate tuple as its only tuple.
RuntimeProfile::Counter * num_spilled_partitions_
Number of partitions that have been spilled.
boost::scoped_ptr< BufferedTupleStream > serialize_stream_
void CleanupHashTbl(const std::vector< impala_udf::FunctionContext * > &fn_ctxs, HashTable::Iterator it)
Calls finalize on all tuples starting at 'it'.
std::vector< impala_udf::FunctionContext * > agg_fn_ctxs_
static const int PARTITION_FANOUT
Number of initial partitions to create. Must be a power of 2.
Status Spill(Tuple *tuple=NULL)
Status CreateHashPartitions(int level)
bool contains_var_len_grouping_exprs_
BufferedBlockMgr::Client * block_mgr_client_
Tuple * singleton_output_tuple_
RuntimeProfile::HighWaterMarkCounter * largest_partition_percent_
RuntimeProfile::Counter * num_row_repartitioned_
Number of rows that have been repartitioned.
virtual Status Init(const TPlanNode &tnode)
Status SpillPartition(Partition *curr_partition=NULL, Tuple *curr_intermediate_tuple=NULL)
bool InitHashTable()
Initializes the hash table. Returns false on OOM.
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
int64_t LargestSpilledPartition() const
Status ProcessBatch_true(RowBatch *batch, HashTableCtx *ht_ctx)
boost::scoped_ptr< MemPool > mem_pool_
static const int NUM_PARTITIONING_BITS
TupleId intermediate_tuple_id_
Tuple into which Update()/Merge()/Serialize() results are stored.
boost::scoped_ptr< HashTable > hash_tbl
static const int MAX_PARTITION_DEPTH
virtual void Close(RuntimeState *state)
RuntimeProfile::HighWaterMarkCounter * max_partition_level_
Level of max partition (i.e. number of repartitioning steps).
bool using_small_buffers_
If true, the partitions in hash_partitions_ are using small buffers.
virtual Status Reset(RuntimeState *state)
PartitionedAggregationNode * parent
std::vector< Partition * > hash_partitions_
Current partitions we are partitioning into.
RuntimeProfile::Counter * build_timer_
Time spent processing the child rows.
llvm::Function * CodegenUpdateTuple()
Codegen UpdateTuple(). Returns NULL if codegen is unsuccessful.
std::list< Partition * > aggregated_partitions_
TupleDescriptor * intermediate_tuple_desc_
llvm::Function * CodegenProcessBatch()
RuntimeProfile::Counter * num_repartitions_
Number of partitions that have been repartitioned.
const bool needs_finalize_
Partition(PartitionedAggregationNode *parent, int level)
TupleDescriptor * output_tuple_desc_
std::vector< impala_udf::FunctionContext * > agg_fn_ctxs
Clone of parent's agg_fn_ctxs_ and backing MemPool.
ProcessRowBatchFn process_row_batch_fn_
Jitted ProcessRowBatch function pointer. Null if codegen is disabled.
boost::scoped_ptr< BufferedTupleStream > aggregated_row_stream
RuntimeProfile::Counter * get_results_timer_
Time spent returning the aggregated rows.
virtual Status QueryMaintenance(RuntimeState *state)
Frees local allocations from aggregate_evaluators_ and agg_fn_ctxs.
Status ProcessBatchNoGrouping(RowBatch *batch, HashTableCtx *ht_ctx=NULL)
virtual Status Prepare(RuntimeState *state)
boost::scoped_ptr< MemPool > agg_fn_pool_
std::vector< AggFnEvaluator * > aggregate_evaluators_
std::vector< ExprContext * > probe_expr_ctxs_
Exprs used to evaluate input rows.
std::vector< ExprContext * > build_expr_ctxs_
Status ProcessStream(BufferedTupleStream *input_stream)
Reads all the rows from input_stream and processes them by calling ProcessBatch().
Status(* ProcessRowBatchFn)(PartitionedAggregationNode *, RowBatch *, HashTableCtx *)
void UpdateTuple(impala_udf::FunctionContext **agg_fn_ctxs, Tuple *tuple, TupleRow *row, bool is_merge=false)
virtual Status Open(RuntimeState *state)
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
boost::scoped_ptr< HashTableCtx > ht_ctx_
llvm::Function * CodegenUpdateSlot(AggFnEvaluator *evaluator, SlotDescriptor *slot_desc)
RuntimeProfile::Counter * ht_resize_timer_
Total time spent resizing hash tables.
Status IR_ALWAYS_INLINE ProcessBatch(RowBatch *batch, HashTableCtx *ht_ctx)
PartitionedAggregationNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)