Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
impala::PartitionedAggregationNode Class Reference

#include <partitioned-aggregation-node.h>

Inheritance diagram for impala::PartitionedAggregationNode:
Collaboration diagram for impala::PartitionedAggregationNode:

Classes

struct  Partition
 

Public Member Functions

 PartitionedAggregationNode (ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
 
virtual Status Init (const TPlanNode &tnode)
 
virtual Status Prepare (RuntimeState *state)
 
virtual Status Open (RuntimeState *state)
 
virtual Status GetNext (RuntimeState *state, RowBatch *row_batch, bool *eos)
 
virtual Status Reset (RuntimeState *state)
 
virtual void Close (RuntimeState *state)
 
template<bool AGGREGATED_ROWS>
Status ProcessBatch (RowBatch *batch, HashTableCtx *ht_ctx)
 
void CollectNodes (TPlanNodeType::type node_type, std::vector< ExecNode * > *nodes)
 
void CollectScanNodes (std::vector< ExecNode * > *nodes)
 Collect all scan node types. More...
 
std::string DebugString () const
 Returns a string representation in DFS order of the plan rooted at this. More...
 
const std::vector< ExprContext * > & conjunct_ctxs () const
 
int id () const
 
TPlanNodeType::type type () const
 
const RowDescriptor & row_desc () const
 
int64_t rows_returned () const
 
int64_t limit () const
 
bool ReachedLimit ()
 
RuntimeProfile * runtime_profile ()
 
MemTracker * mem_tracker ()
 
MemTracker * expr_mem_tracker ()
 

Static Public Member Functions

static Status CreateTree (ObjectPool *pool, const TPlan &plan, const DescriptorTbl &descs, ExecNode **root)
 
static void SetDebugOptions (int node_id, TExecNodePhase::type phase, TDebugAction::type action, ExecNode *tree)
 Set debug action for node with given id in 'tree'. More...
 
static bool EvalConjuncts (ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
 
static llvm::Function * CodegenEvalConjuncts (RuntimeState *state, const std::vector< ExprContext * > &conjunct_ctxs, const char *name="EvalConjuncts")
 
static int GetNodeIdFromProfile (RuntimeProfile *p)
 Extract node id from p->name(). More...
 

Static Public Attributes

static const char * LLVM_CLASS_NAME
 
static const std::string ROW_THROUGHPUT_COUNTER = "RowsReturnedRate"
 Names of counters shared by all exec nodes. More...
 

Protected Member Functions

virtual Status QueryMaintenance (RuntimeState *state)
 Frees local allocations from aggregate_evaluators_ and agg_fn_ctxs. More...
 
virtual void DebugString (int indentation_level, std::stringstream *out) const
 
ExecNode * child (int i)
 
bool is_closed ()
 
virtual bool IsScanNode () const
 
void InitRuntimeProfile (const std::string &name)
 
Status ExecDebugAction (TExecNodePhase::type phase, RuntimeState *state)
 
void AddRuntimeExecOption (const std::string &option)
 Appends option to 'runtime_exec_options_'. More...
 
void AddExprCtxToFree (ExprContext *ctx)
 
void AddExprCtxsToFree (const std::vector< ExprContext * > &ctxs)
 
void AddExprCtxsToFree (const SortExecExprs &sort_exec_exprs)
 

Static Protected Member Functions

static Status CreateNode (ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs, ExecNode **node)
 Create a single exec node derived from thrift node; place exec node in 'pool'. More...
 
static Status CreateTreeHelper (ObjectPool *pool, const std::vector< TPlanNode > &tnodes, const DescriptorTbl &descs, ExecNode *parent, int *node_idx, ExecNode **root)
 

Protected Attributes

int id_
 
TPlanNodeType::type type_
 
ObjectPool * pool_
 
std::vector< ExprContext * > conjunct_ctxs_
 
std::vector< ExecNode * > children_
 
RowDescriptor row_descriptor_
 
TExecNodePhase::type debug_phase_
 
TDebugAction::type debug_action_
 
int64_t limit_
 
int64_t num_rows_returned_
 
boost::scoped_ptr< RuntimeProfile > runtime_profile_
 
RuntimeProfile::Counter * rows_returned_counter_
 
RuntimeProfile::Counter * rows_returned_rate_
 
boost::scoped_ptr< MemTracker > mem_tracker_
 Account for peak memory used by this node. More...
 
boost::scoped_ptr< MemTracker > expr_mem_tracker_
 MemTracker that should be used for ExprContexts. More...
 
boost::mutex exec_options_lock_
 
std::string runtime_exec_options_
 

Private Types

typedef Status(* ProcessRowBatchFn )(PartitionedAggregationNode *, RowBatch *, HashTableCtx *)
 

Private Member Functions

Tuple * ConstructIntermediateTuple (const std::vector< impala_udf::FunctionContext * > &agg_fn_ctxs, MemPool *pool, BufferedTupleStream *stream)
 
void UpdateTuple (impala_udf::FunctionContext **agg_fn_ctxs, Tuple *tuple, TupleRow *row, bool is_merge=false)
 
Tuple * GetOutputTuple (const std::vector< impala_udf::FunctionContext * > &agg_fn_ctxs, Tuple *tuple, MemPool *pool)
 
Status ProcessBatchNoGrouping (RowBatch *batch, HashTableCtx *ht_ctx=NULL)
 
template<bool AGGREGATED_ROWS>
Status IR_ALWAYS_INLINE ProcessBatch (RowBatch *batch, HashTableCtx *ht_ctx)
 
template<bool AGGREGATED_ROWS>
Status ProcessStream (BufferedTupleStream *input_stream)
 Reads all the rows from input_stream and process them by calling ProcessBatch(). More...
 
Status CreateHashPartitions (int level)
 
int64_t LargestSpilledPartition () const
 
Status NextPartition ()
 
Status SpillPartition (Partition *curr_partition=NULL, Tuple *curr_intermediate_tuple=NULL)
 
Status MoveHashPartitions (int64_t input_rows)
 
void CleanupHashTbl (const std::vector< impala_udf::FunctionContext * > &fn_ctxs, HashTable::Iterator it)
 Calls finalizes on all tuples starting at 'it'. More...
 
llvm::Function * CodegenUpdateSlot (AggFnEvaluator *evaluator, SlotDescriptor *slot_desc)
 
llvm::Function * CodegenUpdateTuple ()
 Codegen UpdateTuple(). Returns NULL if codegen is unsuccessful. More...
 
llvm::Function * CodegenProcessBatch ()
 
Status ProcessBatch_false (RowBatch *batch, HashTableCtx *ht_ctx)
 
Status ProcessBatch_true (RowBatch *batch, HashTableCtx *ht_ctx)
 
int MinRequiredBuffers () const
 

Private Attributes

TupleId intermediate_tuple_id_
 Tuple into which Update()/Merge()/Serialize() results are stored. More...
 
TupleDescriptor * intermediate_tuple_desc_
 
boost::scoped_ptr< RowDescriptor > intermediate_row_desc_
 Row with the intermediate tuple as its only tuple. More...
 
TupleId output_tuple_id_
 
TupleDescriptor * output_tuple_desc_
 
const bool needs_finalize_
 
bool needs_serialize_
 Contains any evaluators that require the serialize step. More...
 
std::vector< AggFnEvaluator * > aggregate_evaluators_
 
std::vector< impala_udf::FunctionContext * > agg_fn_ctxs_
 
boost::scoped_ptr< MemPool > agg_fn_pool_
 
std::vector< ExprContext * > probe_expr_ctxs_
 Exprs used to evaluate input rows. More...
 
std::vector< ExprContext * > build_expr_ctxs_
 
bool contains_var_len_grouping_exprs_
 
RuntimeState * state_
 
BufferedBlockMgr::Client * block_mgr_client_
 
bool using_small_buffers_
 If true, the partitions in hash_partitions_ are using small buffers. More...
 
Tuple * singleton_output_tuple_
 
bool singleton_output_tuple_returned_
 
boost::scoped_ptr< MemPool > mem_pool_
 
boost::scoped_ptr< HashTableCtx > ht_ctx_
 
Partition * output_partition_
 
HashTable::Iterator output_iterator_
 
ProcessRowBatchFn process_row_batch_fn_
 Jitted ProcessRowBatch function pointer. Null if codegen is disabled. More...
 
RuntimeProfile::Counter * build_timer_
 Time spent processing the child rows. More...
 
RuntimeProfile::Counter * ht_resize_timer_
 Total time spent resizing hash tables. More...
 
RuntimeProfile::Counter * get_results_timer_
 Time spent returning the aggregated rows. More...
 
RuntimeProfile::Counter * num_hash_buckets_
 Total number of hash buckets across all partitions. More...
 
RuntimeProfile::Counter * partitions_created_
 Total number of partitions created. More...
 
RuntimeProfile::HighWaterMarkCounter * max_partition_level_
 Level of max partition (i.e. number of repartitioning steps). More...
 
RuntimeProfile::Counter * num_row_repartitioned_
 Number of rows that have been repartitioned. More...
 
RuntimeProfile::Counter * num_repartitions_
 Number of partitions that have been repartitioned. More...
 
RuntimeProfile::Counter * num_spilled_partitions_
 Number of partitions that have been spilled. More...
 
RuntimeProfile::HighWaterMarkCounter * largest_partition_percent_
 
std::vector< Partition * > hash_partitions_
 Current partitions we are partitioning into. More...
 
std::list< Partition * > spilled_partitions_
 All partitions that have been spilled and need further processing. More...
 
std::list< Partition * > aggregated_partitions_
 
boost::scoped_ptr< BufferedTupleStream > serialize_stream_
 

Static Private Attributes

static const int PARTITION_FANOUT = 16
 Number of initial partitions to create. Must be a power of 2. More...
 
static const int NUM_PARTITIONING_BITS = 4
 
static const int MAX_PARTITION_DEPTH = 16
 

Detailed Description

Node for doing partitioned hash aggregation. This node consumes the input (which can come from child(0) or from a spilled partition) as follows:

  1. Each row is hashed and we pick a dst partition (hash_partitions_).
  2. If the dst partition is not spilled, we probe into the partition's hash table to aggregate/insert the row.
  3. If the partition is already spilled, the input row is spilled.
  4. When all the input is consumed, we walk hash_partitions_, putting the spilled partitions into spilled_partitions_ and the non-spilled ones into aggregated_partitions_. aggregated_partitions_ contains partitions that are fully processed, whose results can be returned directly. Partitions in spilled_partitions_ need to be repartitioned, and we repeat these steps for them.

Each partition contains these structures:
  1) Hash table for aggregated rows. This contains just the hash table directory structure, not the rows themselves. It is NULL for spilled partitions once we stop maintaining the hash table.
  2) MemPool for var-len result data of rows in the hash table. If the aggregate function returns a string, we cannot append it to the tuple stream because that structure is immutable. Instead, when we need to spill, we sweep and copy the rows into a tuple stream.
  3) Aggregated tuple stream for rows that are/were in the hash table. This stream contains rows that are aggregated. When the partition is not spilled, this stream is pinned and contains the memory referenced by the hash table. When the aggregate function does not return a string (meaning the size of all the slots is known when the row is constructed), this stream contains all the memory for the result rows and the MemPool (2) is not used.
  4) Unaggregated tuple stream. Stream to spill unaggregated rows. Rows in this stream always have child(0)'s layout.

Buffering: Each stream and hash table needs to maintain at least one buffer for some duration of the processing. To minimize the memory requirements of small queries (where memory usage is less than one buffer per partition), the initial streams and hash tables use smaller (less than io-sized) buffers. Once we spill, the streams and hash tables use io-sized buffers only.

TODO: Buffer rows before probing into the hash table?
TODO: After spilling, we can still maintain a very small hash table just to remove some number of rows (which would otherwise likely go to disk).
TODO: Consider allowing the hash table structure to be spilled in addition to the rows.
TODO: Do we want to insert a buffer before probing into the partition's hash table?
TODO: Use a prefetch/batched probe interface.
TODO: Return rows from the aggregated_row_stream rather than the HT.
TODO: Spill the HT as well.
TODO: Think about the spilling heuristic.
TODO: When processing a spilled partition, we have a lot more information and can size the partitions/hash tables better.

Definition at line 91 of file partitioned-aggregation-node.h.

Member Typedef Documentation

typedef Status(* impala::PartitionedAggregationNode::ProcessRowBatchFn)(PartitionedAggregationNode *, RowBatch *, HashTableCtx *)
private

Definition at line 201 of file partitioned-aggregation-node.h.

Constructor & Destructor Documentation

impala::PartitionedAggregationNode::PartitionedAggregationNode ( ObjectPool * pool,
const TPlanNode &  tnode,
const DescriptorTbl & descs 
)

Definition at line 56 of file partitioned-aggregation-node.cc.

References NUM_PARTITIONING_BITS, and PARTITION_FANOUT.

Member Function Documentation

void impala::ExecNode::AddExprCtxsToFree ( const SortExecExprs & sort_exec_exprs)
protected inherited

void impala::ExecNode::AddExprCtxToFree ( ExprContext * ctx)
inline protected inherited

Add an ExprContext to have its local allocations freed by QueryMaintenance(). Exprs that are evaluated in the main execution thread should be added. Exprs evaluated in a separate thread are generally not safe to add, since a local allocation may be freed while it's being used. Rather than using this mechanism, threads should call FreeLocalAllocations() on local ExprContexts periodically.

Definition at line 276 of file exec-node.h.

References impala::ExecNode::expr_ctxs_to_free_.

Referenced by impala::AnalyticEvalNode::Prepare().

ExecNode* impala::ExecNode::child ( int  i)
inline protected inherited

Definition at line 241 of file exec-node.h.

References impala::ExecNode::children_.

Referenced by impala::CrossJoinNode::BuildListDebugString(), impala::BlockingJoinNode::BuildSideThread(), impala::HashJoinNode::CodegenCreateOutputRow(), impala::PartitionedHashJoinNode::CodegenCreateOutputRow(), impala::CrossJoinNode::ConstructBuildSide(), impala::HashJoinNode::ConstructBuildSide(), impala::PartitionedHashJoinNode::ConstructBuildSide(), impala::BlockingJoinNode::GetLeftChildRowString(), impala::SelectNode::GetNext(), impala::UnionNode::GetNext(), impala::CrossJoinNode::GetNext(), impala::HashJoinNode::GetNext(), impala::AnalyticEvalNode::GetNextOutputBatch(), impala::PartitionedAggregationNode::Partition::InitStreams(), impala::HashJoinNode::LeftJoinGetNext(), impala::PartitionedHashJoinNode::NextProbeRowBatch(), impala::SelectNode::Open(), impala::SortNode::Open(), impala::TopNNode::Open(), impala::BlockingJoinNode::Open(), impala::AggregationNode::Open(), impala::AnalyticEvalNode::Open(), Open(), impala::UnionNode::OpenCurrentChild(), impala::SelectNode::Prepare(), impala::SortNode::Prepare(), impala::UnionNode::Prepare(), impala::TopNNode::Prepare(), impala::BlockingJoinNode::Prepare(), impala::HashJoinNode::Prepare(), impala::AggregationNode::Prepare(), impala::AnalyticEvalNode::Prepare(), impala::PartitionedHashJoinNode::Prepare(), Prepare(), impala::PartitionedHashJoinNode::ProcessBuildInput(), impala::AnalyticEvalNode::ProcessChildBatches(), and impala::SortNode::SortInput().

void impala::PartitionedAggregationNode::Close ( RuntimeState * state)
virtual

Close() will get called for every exec node, regardless of what else is called and the status of these calls (i.e. Prepare() may never have been called, or Prepare()/Open()/GetNext() returned with an error). Close() releases all resources that were allocated in Open()/GetNext(), even if the latter ended with an error. Close() can be called if the node has been prepared or the node is closed. The default implementation updates runtime profile counters and calls Close() on the children. Subclasses should check if the node has already been closed (is_closed()), then close themselves, then call the base Close(). Nodes that are using tuples returned by a child may call Close() on their children before their own Close() if the child node has returned eos. It is only safe to call Close() on the child node while the parent node is still returning rows if the parent node fully materializes the child's input.

Reimplemented from impala::ExecNode.

Definition at line 380 of file partitioned-aggregation-node.cc.

References impala::PartitionedAggregationNode::Partition::agg_fn_ctxs, agg_fn_ctxs_, agg_fn_pool_, aggregate_evaluators_, aggregated_partitions_, impala::RuntimeState::block_mgr(), block_mgr_client_, build_expr_ctxs_, CleanupHashTbl(), impala::BufferedBlockMgr::ClearReservations(), impala::ExecNode::Close(), impala::Expr::Close(), impala::PartitionedAggregationNode::Partition::Close(), GetOutputTuple(), hash_partitions_, ht_ctx_, impala::ExecNode::is_closed(), mem_pool_, output_iterator_, output_partition_, probe_expr_ctxs_, serialize_stream_, singleton_output_tuple_, singleton_output_tuple_returned_, and spilled_partitions_.

Function * impala::PartitionedAggregationNode::CodegenProcessBatch ( )
private

Codegen the process row batch loop. The loop has already been compiled to IR and loaded into the codegen object. UpdateAggTuple has also been codegen'd to IR. This function will modify the loop, substituting the statically compiled functions with codegen'd ones. Assumes AGGREGATED_ROWS = false.

Definition at line 1356 of file partitioned-aggregation-node.cc.

References impala::LlvmCodeGen::codegen_timer(), CodegenUpdateTuple(), impala::RuntimeState::GetCodegen(), impala::LlvmCodeGen::GetFunction(), ht_ctx_, impala::Status::ok(), impala::LlvmCodeGen::OptimizeFunctionWithExprs(), probe_expr_ctxs_, impala::LlvmCodeGen::ReplaceCallSites(), SCOPED_TIMER, and state_.

Referenced by Prepare().

llvm::Function * impala::PartitionedAggregationNode::CodegenUpdateSlot ( AggFnEvaluator * evaluator,
SlotDescriptor * slot_desc 
)
private

Codegen UpdateSlot(). Returns NULL if codegen is unsuccessful. Assumes is_merge = false.

Definition at line 1055 of file partitioned-aggregation-node.cc.

References impala::LlvmCodeGen::FnPrototype::AddArgument(), impala::AggFnEvaluator::agg_op(), impala::AggFnEvaluator::AVG, impala::SlotDescriptor::CodegenUpdateNull(), impala::AggFnEvaluator::COUNT, impala::CodegenAnyVal::CreateCallWrapped(), impala::SlotDescriptor::field_idx(), impala::TupleDescriptor::GenerateLlvmStruct(), impala::RuntimeState::GetCodegen(), impala::Expr::GetCodegendComputeFn(), impala::Status::GetDetail(), impala::CodegenAnyVal::GetIsNull(), impala::CodegenAnyVal::GetNonNullVal(), impala::CodegenAnyVal::GetUnloweredPtrType(), impala::CodegenAnyVal::GetVal(), impala::AggFnEvaluator::input_expr_ctxs(), intermediate_tuple_desc_, impala::AggFnEvaluator::intermediate_type(), impala::SlotDescriptor::is_materialized(), impala::AggFnEvaluator::is_merge(), impala::SlotDescriptor::is_nullable(), impala::TupleRow::LLVM_CLASS_NAME, impala::ExprContext::LLVM_CLASS_NAME, impala::FunctionContextImpl::LLVM_FUNCTIONCONTEXT_NAME, impala::AggFnEvaluator::MAX, impala::AggFnEvaluator::merge_symbol(), impala::AggFnEvaluator::MIN, impala::AggFnEvaluator::NDV, impala::Status::ok(), impala::CodegenAnyVal::SetFromRawValue(), state_, impala::AggFnEvaluator::SUM, impala::CodegenAnyVal::ToNativeValue(), impala::ColumnType::type, impala::SlotDescriptor::type(), impala::Expr::type(), impala::TYPE_BIGINT, impala::TYPE_DECIMAL, impala::TYPE_DOUBLE, impala::TYPE_FLOAT, impala::TYPE_TIMESTAMP, impala::AggFnEvaluator::update_symbol(), impala::CodegenAnyVal::value(), and VLOG_QUERY.

Referenced by CodegenUpdateTuple().

void impala::ExecNode::CollectNodes ( TPlanNodeType::type  node_type,
std::vector< ExecNode * > *  nodes 
)
inherited

Collect all nodes of given 'node_type' that are part of this subtree, and return in 'nodes'.

Definition at line 359 of file exec-node.cc.

References impala::ExecNode::children_, and impala::ExecNode::type_.

Referenced by impala::ExecNode::CollectScanNodes(), and impala::PlanFragmentExecutor::Prepare().

void impala::ExecNode::CollectScanNodes ( std::vector< ExecNode * > *  nodes)
inherited

Collect all scan node types.

Definition at line 366 of file exec-node.cc.

References impala::ExecNode::CollectNodes().

Referenced by impala::PlanFragmentExecutor::Prepare().

Tuple * impala::PartitionedAggregationNode::ConstructIntermediateTuple ( const std::vector< impala_udf::FunctionContext * > &  agg_fn_ctxs,
MemPool * pool,
BufferedTupleStream * stream 
)
private

Allocates a new aggregation intermediate tuple, initialized to grouping values computed over 'current_row_' using 'agg_fn_ctxs'. Aggregation expr slots are set to their initial values. Pool/Stream specify where the memory (tuple and var len slots) should be allocated from. Only one can be set. Returns NULL if there was not enough memory to allocate the tuple.

Definition at line 591 of file partitioned-aggregation-node.cc.

References impala::AggFnEvaluator::agg_op(), aggregate_evaluators_, impala::BufferedTupleStream::AllocateRow(), impala::TupleDescriptor::byte_size(), contains_var_len_grouping_exprs_, impala::Tuple::Create(), impala::Tuple::GetSlot(), ht_ctx_, impala::Tuple::Init(), impala::AggFnEvaluator::Init(), intermediate_tuple_desc_, impala::StringValue::len, impala::AggFnEvaluator::MAX, impala::AggFnEvaluator::MIN, pool, probe_expr_ctxs_, impala::Tuple::SetNull(), impala::ExprValue::SetToMax(), impala::ExprValue::SetToMin(), impala::TupleDescriptor::slots(), impala::TYPE_CHAR, impala::TYPE_DECIMAL, impala::TYPE_STRING, impala::TYPE_TIMESTAMP, impala::TYPE_VARCHAR, and impala::RawValue::Write().

Referenced by Prepare(), and ProcessBatch().

Status impala::ExecNode::CreateNode ( ObjectPool * pool,
const TPlanNode &  tnode,
const DescriptorTbl & descs,
ExecNode **  node 
)
static protected inherited

Create a single exec node derived from thrift node; place exec node in 'pool'.

Definition at line 260 of file exec-node.cc.

References impala::ObjectPool::Add(), impala::Status::OK, and RETURN_IF_ERROR.

Referenced by impala::ExecNode::CreateTreeHelper().

Status impala::ExecNode::CreateTree ( ObjectPool * pool,
const TPlan &  plan,
const DescriptorTbl & descs,
ExecNode **  root 
)
static inherited

Creates exec node tree from list of nodes contained in plan via depth-first traversal. All nodes are placed in pool. Returns error if 'plan' is corrupted, otherwise success.

Definition at line 199 of file exec-node.cc.

References impala::ExecNode::CreateTreeHelper(), impala::Status::OK, and impala::Status::ok().

Referenced by impala::PlanFragmentExecutor::Prepare().

Status impala::ExecNode::CreateTreeHelper ( ObjectPool * pool,
const std::vector< TPlanNode > &  tnodes,
const DescriptorTbl & descs,
ExecNode * parent,
int *  node_idx,
ExecNode **  root 
)
static protected inherited

void impala::PartitionedAggregationNode::DebugString ( int  indentation_level,
std::stringstream *  out 
) const
protected virtual

Recursive helper method for generating a string for DebugString(). Implementations should call DebugString(int, std::stringstream) on their children. Input parameters: indentation_level: Current level in plan tree. Output parameters: out: Stream to accumulate debug string.

Reimplemented from impala::ExecNode.

Definition at line 716 of file partitioned-aggregation-node.cc.

References aggregate_evaluators_, impala::AggFnEvaluator::DebugString(), impala::ExecNode::DebugString(), impala::Expr::DebugString(), intermediate_tuple_id_, needs_finalize_, output_tuple_id_, and probe_expr_ctxs_.

string impala::ExecNode::DebugString ( ) const
inherited

Status impala::PartitionedAggregationNode::GetNext ( RuntimeState * state,
RowBatch * row_batch,
bool * eos 
)
virtual

Retrieves rows and returns them via row_batch. Sets eos to true if subsequent calls will not retrieve any more rows. Data referenced by any tuples returned in row_batch must not be overwritten by the callee until Close() is called. The memory holding that data can be returned via row_batch's tuple_data_pool (in which case it may be deleted by the caller) or held on to by the callee. The row_batch, including its tuple_data_pool, will be destroyed by the caller at some point prior to the final Close() call. In other words, if the memory holding the tuple data will be referenced by the callee in subsequent GetNext() calls, it must not be attached to the row_batch's tuple_data_pool. Caller must not be holding any io buffers. This will cause deadlock. TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet.

Implements impala::ExecNode.

Definition at line 271 of file partitioned-aggregation-node.cc.

References impala::RowBatch::AddRow(), impala::PartitionedAggregationNode::Partition::agg_fn_ctxs, agg_fn_ctxs_, aggregated_partitions_, impala::RowBatch::AtCapacity(), impala::HashTable::Iterator::AtEnd(), impala::RuntimeState::batch_size(), impala::PartitionedAggregationNode::Partition::Close(), impala::RowBatch::CommitLastRow(), impala::ExecNode::conjunct_ctxs_, count, COUNTER_SET, impala::ExecNode::EvalConjuncts(), impala::ExecNode::ExecDebugAction(), get_results_timer_, GetOutputTuple(), impala::RowBatch::GetRow(), impala::HashTable::Iterator::GetTuple(), impala::RowBatch::MarkNeedToReturn(), impala::HashTable::Iterator::Next(), NextPartition(), impala::BitUtil::NextPowerOfTwo(), impala::ExecNode::num_rows_returned_, impala::Status::OK, output_iterator_, output_partition_, probe_expr_ctxs_, QueryMaintenance(), impala::ExecNode::ReachedLimit(), RETURN_IF_CANCELLED, RETURN_IF_ERROR, impala::ExecNode::rows_returned_counter_, impala::ExecNode::runtime_profile_, SCOPED_TIMER, impala::TupleRow::SetTuple(), singleton_output_tuple_, singleton_output_tuple_returned_, spilled_partitions_, and impala::RowBatch::tuple_data_pool().

Referenced by Open().

int impala::ExecNode::GetNodeIdFromProfile ( RuntimeProfile * p)
static inherited

Extract node id from p->name().

Definition at line 62 of file exec-node.cc.

References impala::RuntimeProfile::metadata().

Tuple * impala::PartitionedAggregationNode::GetOutputTuple ( const std::vector< impala_udf::FunctionContext * > &  agg_fn_ctxs,
Tuple * tuple,
MemPool * pool 
)
private

Called on the intermediate tuple of each group after all input rows have been consumed and aggregated. Computes the final aggregate values to be returned in GetNext() using the agg fn evaluators' Serialize() or Finalize(). For the Finalize() case, if the output tuple is different from the intermediate tuple, then a new tuple is allocated from 'pool' to hold the final result. Grouping values are copied into the output tuple, and the output tuple holding the finalized/serialized aggregate values is returned. TODO: Coordinate the allocation of new tuples with the release of memory so as not to make memory consumption blow up.

Definition at line 688 of file partitioned-aggregation-node.cc.

References aggregate_evaluators_, impala::TupleDescriptor::byte_size(), impala::Tuple::Create(), impala::AggFnEvaluator::Finalize(), impala::Tuple::GetSlot(), intermediate_tuple_desc_, intermediate_tuple_id_, impala::Tuple::IsNull(), needs_finalize_, impala::SlotDescriptor::null_indicator_offset(), output_tuple_desc_, output_tuple_id_, pool, probe_expr_ctxs_, impala::AggFnEvaluator::Serialize(), impala::TupleDescriptor::slots(), impala::SlotDescriptor::tuple_offset(), and impala::RawValue::Write().

Referenced by Close(), and GetNext().

Status impala::PartitionedAggregationNode::Init ( const TPlanNode &  tnode)
virtual

Initializes this object from the thrift tnode desc. The subclass should do any initialization that can fail in Init() rather than the ctor. If overridden in subclass, must first call superclass's Init().

Reimplemented from impala::ExecNode.

Definition at line 82 of file partitioned-aggregation-node.cc.

References aggregate_evaluators_, impala::AggFnEvaluator::Create(), impala::Expr::CreateExprTrees(), impala::ExecNode::Init(), impala::Status::OK, impala::ExecNode::pool_, probe_expr_ctxs_, and RETURN_IF_ERROR.

void impala::ExecNode::InitRuntimeProfile ( const std::string &  name)
protected inherited

virtual bool impala::ExecNode::IsScanNode ( ) const
inline protected virtual inherited

Reimplemented in impala::ScanNode.

Definition at line 251 of file exec-node.h.

int64_t impala::PartitionedAggregationNode::LargestSpilledPartition ( ) const
private

Iterates over all the partitions in hash_partitions_ and returns the number of rows of the largest spilled partition (in terms of number of aggregated and unaggregated rows).

Definition at line 758 of file partitioned-aggregation-node.cc.

References impala::PartitionedAggregationNode::Partition::aggregated_row_stream, hash_partitions_, impala::PartitionedAggregationNode::Partition::is_spilled(), and impala::PartitionedAggregationNode::Partition::unaggregated_row_stream.

Referenced by NextPartition().

int64_t impala::ExecNode::limit ( ) const
inline inherited

int impala::PartitionedAggregationNode::MinRequiredBuffers ( ) const
inline private

We need two buffers per partition: one for the aggregated stream and one for the unaggregated stream. We need an additional buffer to read the stream we are currently repartitioning. If we need to serialize, we need an additional buffer while spilling a partition, as the partition's aggregated stream needs to be serialized and rewritten.

Definition at line 418 of file partitioned-aggregation-node.h.

References needs_serialize_, and PARTITION_FANOUT.

Referenced by Prepare().

Status impala::PartitionedAggregationNode::MoveHashPartitions ( int64_t  input_rows)
private

Moves the partitions in hash_partitions_ to aggregated_partitions_ or spilled_partitions_. Partitions moved to spilled_partitions_ are unpinned. input_rows is the number of input rows that have been repartitioned. Used for diagnostics.

Definition at line 922 of file partitioned-aggregation-node.cc.

References aggregated_partitions_, impala::PartitionedAggregationNode::Partition::aggregated_row_stream, impala::PartitionedAggregationNode::Partition::Close(), COUNTER_SET, hash_partitions_, impala::PartitionedAggregationNode::Partition::hash_tbl, impala::ExecNode::id(), impala::PartitionedAggregationNode::Partition::is_spilled(), largest_partition_percent_, impala::Status::OK, RETURN_IF_ERROR, spilled_partitions_, and impala::PartitionedAggregationNode::Partition::unaggregated_row_stream.

Referenced by NextPartition(), and Open().

Status impala::PartitionedAggregationNode::Prepare ( RuntimeState * state)
virtual

Sets up internal structures, etc., without doing any actual work. Must be called prior to Open(). Will only be called once in this node's lifetime. All code generation (adding functions to the LlvmCodeGen object) must happen in Prepare(). Retrieving the jit compiled function pointer must happen in Open(). If overridden in subclass, must first call superclass's Prepare().

Reimplemented from impala::ExecNode.

Definition at line 95 of file partitioned-aggregation-node.cc.

References impala::ObjectPool::Add(), ADD_COUNTER, ADD_TIMER, impala::ExecNode::AddExprCtxsToFree(), impala::LlvmCodeGen::AddFunctionToJit(), impala::RuntimeProfile::AddHighWaterMarkCounter(), impala::ExecNode::AddRuntimeExecOption(), agg_fn_ctxs_, agg_fn_pool_, aggregate_evaluators_, impala::RuntimeState::block_mgr(), block_mgr_client_, build_expr_ctxs_, build_timer_, impala::ExecNode::child(), impala::RuntimeState::codegen_enabled(), CodegenProcessBatch(), ConstructIntermediateTuple(), contains_var_len_grouping_exprs_, CreateHashPartitions(), impala::RuntimeState::desc_tbl(), impala::ExecNode::expr_mem_tracker(), impala::RuntimeState::fragment_hash_seed(), get_results_timer_, impala::RuntimeState::GetCodegen(), impala::DescriptorTbl::GetTupleDescriptor(), ht_ctx_, ht_resize_timer_, intermediate_row_desc_, intermediate_tuple_desc_, intermediate_tuple_id_, largest_partition_percent_, MAX_PARTITION_DEPTH, max_partition_level_, mem_pool_, impala::ExecNode::mem_tracker(), MinRequiredBuffers(), needs_serialize_, num_hash_buckets_, num_repartitions_, num_row_repartitioned_, num_spilled_partitions_, impala::RuntimeState::obj_pool(), impala::Status::OK, output_tuple_desc_, output_tuple_id_, partitions_created_, impala::ExecNode::Prepare(), impala::Expr::Prepare(), probe_expr_ctxs_, process_row_batch_fn_, impala::BufferedBlockMgr::RegisterClient(), RETURN_IF_ERROR, impala::ExecNode::row_desc(), impala::ExecNode::runtime_profile(), impala::ExecNode::runtime_profile_, SCOPED_TIMER, singleton_output_tuple_, singleton_output_tuple_returned_, impala::TupleDescriptor::slots(), state_, impala::ColumnType::type, impala::SlotDescriptor::type(), impala::TYPE_BOOLEAN, and impala::TYPE_NULL.

template<bool AGGREGATED_ROWS>
Status IR_ALWAYS_INLINE impala::PartitionedAggregationNode::ProcessBatch ( RowBatch * batch,
HashTableCtx * ht_ctx
)
private

Processes a batch of rows. This is the core function of the algorithm. We partition the rows into hash_partitions_, spilling as necessary. If AGGREGATED_ROWS is true, it means that the rows in the batch are already pre-aggregated. This function is replaced by codegen. It's inlined into ProcessBatch_true/false in the IR module. We pass in ht_ctx_.get() as an argument for performance.

Status PartitionedAggregationNode::ProcessBatch_false ( RowBatch * batch,
HashTableCtx * ht_ctx
)
private

Functions to instantiate templated versions of ProcessBatch(). The cross-compiled (IR) versions of these functions are used in CodegenProcessBatch(). TODO: is there a better way to do this?

Definition at line 155 of file partitioned-aggregation-node-ir.cc.

Status PartitionedAggregationNode::ProcessBatch_true ( RowBatch * batch,
HashTableCtx * ht_ctx
)
private

Definition at line 160 of file partitioned-aggregation-node-ir.cc.

Status PartitionedAggregationNode::ProcessBatchNoGrouping ( RowBatch * batch,
HashTableCtx * ht_ctx = NULL
)
private

Do the aggregation for all tuple rows in the batch when there is no grouping. The HashTableCtx argument is unused, but included so the signature matches that of ProcessBatch() for codegen. This function is replaced by codegen.

Definition at line 24 of file partitioned-aggregation-node-ir.cc.

References agg_fn_ctxs_, impala::RowBatch::GetRow(), impala::RowBatch::num_rows(), impala::Status::OK, singleton_output_tuple_, and UpdateTuple().

Referenced by Open().

Status impala::PartitionedAggregationNode::QueryMaintenance ( RuntimeState * state)
protectedvirtual

Frees local allocations from aggregate_evaluators_ and agg_fn_ctxs_.

Reimplemented from impala::ExecNode.

Definition at line 968 of file partitioned-aggregation-node.cc.

References agg_fn_ctxs_, aggregate_evaluators_, impala::ExprContext::FreeLocalAllocations(), hash_partitions_, and impala::ExecNode::QueryMaintenance().

Referenced by GetNext(), and Open().

Status impala::PartitionedAggregationNode::Reset ( RuntimeState * state)
virtual

Resets all data-specific state, returning this node to the state it was in after calling Prepare() and before calling Open(). Prepare() must have already been called before calling Reset(). Open() and GetNext() may have optionally been called. Close() must not have been called. If overridden in a subclass, must call superclass's Reset() at the end. The default implementation calls Reset() on children. Note that this function may be called many times, so should be fast. For example, accumulated memory does not need to be freed on every call if it's expensive.

Reimplemented from impala::ExecNode.

Definition at line 375 of file partitioned-aggregation-node.cc.

void impala::ExecNode::SetDebugOptions ( int  node_id,
TExecNodePhase::type  phase,
TDebugAction::type  action,
ExecNode * tree
)
staticinherited

Set debug action for node with given id in 'tree'.

Definition at line 332 of file exec-node.cc.

References impala::ExecNode::children_, impala::ExecNode::debug_action_, impala::ExecNode::debug_phase_, and impala::ExecNode::id_.

Referenced by impala::PlanFragmentExecutor::Prepare().

Status impala::PartitionedAggregationNode::SpillPartition ( Partition * curr_partition = NULL,
Tuple * curr_intermediate_tuple = NULL
)
private

Picks a partition from hash_partitions_ to spill. If curr_partition and curr_intermediate_tuple are non-NULL, it means that if curr_partition is spilled, curr_intermediate_tuple must also be cleaned up. curr_intermediate_tuple has been added to curr_partition's aggregated_row_stream but not to the hash table.

Definition at line 872 of file partitioned-aggregation-node.cc.

References impala::Status::AddDetail(), impala::RuntimeState::block_mgr(), block_mgr_client_, hash_partitions_, impala::ExecNode::is_closed(), impala::Status::MEM_LIMIT_EXCEEDED, impala::BufferedBlockMgr::MemLimitTooLowError(), impala::Status::OK, RETURN_IF_ERROR, impala::PartitionedAggregationNode::Partition::Spill(), state_, and using_small_buffers_.

Referenced by ProcessBatch(), and ProcessStream().

TPlanNodeType::type impala::ExecNode::type ( ) const
inlineinherited
void impala::PartitionedAggregationNode::UpdateTuple ( impala_udf::FunctionContext **  agg_fn_ctxs,
Tuple * tuple,
TupleRow * row,
bool  is_merge = false 
)
private

Updates the given aggregation intermediate tuple with aggregation values computed over 'row' using 'agg_fn_ctxs'. Whether the agg fn evaluator calls Update() or Merge() is controlled by the evaluator itself, unless enforced explicitly by passing in is_merge == true. The override is needed to merge spilled and non-spilled rows belonging to the same partition independent of whether the agg fn evaluators have is_merge() == true. This function is replaced by codegen (which is why we don't use a vector argument for agg_fn_ctxs).

Definition at line 676 of file partitioned-aggregation-node.cc.

References aggregate_evaluators_, and impala::TupleRow::GetTuple().

Referenced by ProcessBatch(), and ProcessBatchNoGrouping().

Member Data Documentation

std::vector<impala_udf::FunctionContext*> impala::PartitionedAggregationNode::agg_fn_ctxs_
private

FunctionContext for each aggregate function and backing MemPool. String data returned by the aggregate functions is allocated via these contexts. These contexts are only passed to the evaluators in the non-partitioned (non-grouping) case. Otherwise they are only used to clone FunctionContexts for the partitions. TODO: we really need to plumb through CHAR(N) for intermediate types.

Definition at line 161 of file partitioned-aggregation-node.h.

Referenced by CleanupHashTbl(), Close(), GetNext(), impala::PartitionedAggregationNode::Partition::InitStreams(), Open(), Prepare(), ProcessBatchNoGrouping(), and QueryMaintenance().

boost::scoped_ptr<MemPool> impala::PartitionedAggregationNode::agg_fn_pool_
private

Definition at line 162 of file partitioned-aggregation-node.h.

Referenced by Close(), and Prepare().

std::vector<AggFnEvaluator*> impala::PartitionedAggregationNode::aggregate_evaluators_
private
std::list<Partition*> impala::PartitionedAggregationNode::aggregated_partitions_
private

All partitions that are aggregated and can just return the results in GetNext(). After consuming all the input, hash_partitions_ is split into spilled_partitions_ or aggregated_partitions_, depending on whether each partition was spilled.

Definition at line 301 of file partitioned-aggregation-node.h.

Referenced by Close(), GetNext(), MoveHashPartitions(), and NextPartition().

std::vector<ExprContext*> impala::PartitionedAggregationNode::build_expr_ctxs_
private

Exprs used to insert constructed aggregation tuple into the hash table. All the exprs are simply SlotRefs for the intermediate tuple.

Definition at line 169 of file partitioned-aggregation-node.h.

Referenced by Close(), Open(), and Prepare().

RuntimeProfile::Counter* impala::PartitionedAggregationNode::build_timer_
private

Time spent processing the child rows.

Definition at line 207 of file partitioned-aggregation-node.h.

Referenced by Open(), and Prepare().

bool impala::PartitionedAggregationNode::contains_var_len_grouping_exprs_
private

True if the resulting tuple contains var-len agg/grouping expressions. This means we need to do more work when allocating and spilling these rows.

Definition at line 173 of file partitioned-aggregation-node.h.

Referenced by ConstructIntermediateTuple(), and Prepare().

TDebugAction::type impala::ExecNode::debug_action_
protectedinherited
TExecNodePhase::type impala::ExecNode::debug_phase_
protectedinherited

debug-only: if debug_action_ is not INVALID, node will perform action in debug_phase_

Definition at line 219 of file exec-node.h.

Referenced by impala::ExecNode::ExecDebugAction(), and impala::ExecNode::SetDebugOptions().

boost::mutex impala::ExecNode::exec_options_lock_
protectedinherited

Execution options that are determined at runtime. This is added to the runtime profile at Close(). Examples for options logged here would be "Codegen Enabled"

Definition at line 238 of file exec-node.h.

Referenced by impala::ExecNode::AddRuntimeExecOption().

boost::scoped_ptr<MemTracker> impala::ExecNode::expr_mem_tracker_
protectedinherited

MemTracker that should be used for ExprContexts.

Definition at line 233 of file exec-node.h.

Referenced by impala::ExecNode::expr_mem_tracker(), and impala::ExecNode::Prepare().

RuntimeProfile::Counter* impala::PartitionedAggregationNode::get_results_timer_
private

Time spent returning the aggregated rows.

Definition at line 213 of file partitioned-aggregation-node.h.

Referenced by GetNext(), and Prepare().

std::vector<Partition*> impala::PartitionedAggregationNode::hash_partitions_
private

Current partitions we are partitioning into.

Definition at line 293 of file partitioned-aggregation-node.h.

Referenced by Close(), CreateHashPartitions(), LargestSpilledPartition(), MoveHashPartitions(), ProcessBatch(), QueryMaintenance(), and SpillPartition().

boost::scoped_ptr<HashTableCtx> impala::PartitionedAggregationNode::ht_ctx_
private

Used for hash-related functionality, such as evaluating rows and calculating hashes. TODO: If we want to multi-thread then this context should be thread-local and not associated with the node.

Definition at line 194 of file partitioned-aggregation-node.h.

Referenced by Close(), CodegenProcessBatch(), ConstructIntermediateTuple(), CreateHashPartitions(), NextPartition(), Open(), Prepare(), and ProcessStream().

RuntimeProfile::Counter* impala::PartitionedAggregationNode::ht_resize_timer_
private

Total time spent resizing hash tables.

Definition at line 210 of file partitioned-aggregation-node.h.

Referenced by Prepare(), and ProcessBatch().

boost::scoped_ptr<RowDescriptor> impala::PartitionedAggregationNode::intermediate_row_desc_
private

Row with the intermediate tuple as its only tuple.

Definition at line 137 of file partitioned-aggregation-node.h.

Referenced by impala::PartitionedAggregationNode::Partition::InitStreams(), Open(), Prepare(), and ProcessStream().

TupleDescriptor* impala::PartitionedAggregationNode::intermediate_tuple_desc_
private
TupleId impala::PartitionedAggregationNode::intermediate_tuple_id_
private

Tuple into which Update()/Merge()/Serialize() results are stored.

Definition at line 133 of file partitioned-aggregation-node.h.

Referenced by DebugString(), GetOutputTuple(), and Prepare().

RuntimeProfile::HighWaterMarkCounter* impala::PartitionedAggregationNode::largest_partition_percent_
private

The largest fraction after repartitioning. This is expected to be 1 / PARTITION_FANOUT. A value much larger indicates skew.

Definition at line 235 of file partitioned-aggregation-node.h.

Referenced by MoveHashPartitions(), and Prepare().

const char * impala::PartitionedAggregationNode::LLVM_CLASS_NAME
static
Initial value:
=
"class.impala::PartitionedAggregationNode"

Definition at line 103 of file partitioned-aggregation-node.h.

Referenced by CodegenUpdateTuple().

const int impala::PartitionedAggregationNode::MAX_PARTITION_DEPTH = 16
staticprivate

Maximum number of times we will repartition. The maximum build table we can process (if we have enough scratch disk space) in case there is no skew is: MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH). In the case where there is skew, repartitioning is unlikely to help (assuming a reasonable hash function). Note that we need to have at least as many SEED_PRIMES in HashTableCtx. TODO: we can revisit and try harder to explicitly detect skew.

Definition at line 130 of file partitioned-aggregation-node.h.

Referenced by CreateHashPartitions(), and Prepare().

RuntimeProfile::HighWaterMarkCounter* impala::PartitionedAggregationNode::max_partition_level_
private

Level of max partition (i.e. number of repartitioning steps).

Definition at line 222 of file partitioned-aggregation-node.h.

Referenced by CreateHashPartitions(), and Prepare().

boost::scoped_ptr<MemPool> impala::PartitionedAggregationNode::mem_pool_
private

MemPool used to allocate memory when there is no grouping and the partitioning structures are not initialized, or during Close() when creating new output tuples.

Definition at line 189 of file partitioned-aggregation-node.h.

Referenced by CleanupHashTbl(), Close(), and Prepare().

boost::scoped_ptr<MemTracker> impala::ExecNode::mem_tracker_
protectedinherited

Account for peak memory used by this node.

Definition at line 230 of file exec-node.h.

Referenced by impala::ExecNode::mem_tracker(), and impala::ExecNode::Prepare().

const bool impala::PartitionedAggregationNode::needs_finalize_
private

Certain aggregates require a finalize step, which is the final step of the aggregate after consuming all input rows. The finalize step converts the aggregate value into its final form. This is true if this node contains aggregate that requires a finalize step.

Definition at line 148 of file partitioned-aggregation-node.h.

Referenced by CleanupHashTbl(), DebugString(), and GetOutputTuple().

bool impala::PartitionedAggregationNode::needs_serialize_
private

True if any of the evaluators require the serialize step.

Definition at line 151 of file partitioned-aggregation-node.h.

Referenced by CleanupHashTbl(), MinRequiredBuffers(), NextPartition(), Open(), Prepare(), and ProcessBatch().

RuntimeProfile::Counter* impala::PartitionedAggregationNode::num_hash_buckets_
private

Total number of hash buckets across all partitions.

Definition at line 216 of file partitioned-aggregation-node.h.

Referenced by NextPartition(), and Prepare().

const int impala::PartitionedAggregationNode::NUM_PARTITIONING_BITS = 4
staticprivate

Must equal log2(PARTITION_FANOUT). We use the upper bits of the hash to pick the partition and the lower bits in the hash table. TODO: different hash functions here too? We don't need that many bits to pick the partition, so this might be okay.

Definition at line 121 of file partitioned-aggregation-node.h.

Referenced by impala::PartitionedAggregationNode::Partition::InitHashTable(), PartitionedAggregationNode(), and ProcessBatch().

RuntimeProfile::Counter* impala::PartitionedAggregationNode::num_repartitions_
private

Number of partitions that have been repartitioned.

Definition at line 228 of file partitioned-aggregation-node.h.

Referenced by NextPartition(), and Prepare().

RuntimeProfile::Counter* impala::PartitionedAggregationNode::num_row_repartitioned_
private

Number of rows that have been repartitioned.

Definition at line 225 of file partitioned-aggregation-node.h.

Referenced by NextPartition(), and Prepare().

RuntimeProfile::Counter* impala::PartitionedAggregationNode::num_spilled_partitions_
private

Number of partitions that have been spilled.

Definition at line 231 of file partitioned-aggregation-node.h.

Referenced by Prepare().

HashTable::Iterator impala::PartitionedAggregationNode::output_iterator_
private

Definition at line 199 of file partitioned-aggregation-node.h.

Referenced by Close(), GetNext(), and NextPartition().

Partition* impala::PartitionedAggregationNode::output_partition_
private

The current partition and iterator to the next row in its hash table that we need to return in GetNext().

Definition at line 198 of file partitioned-aggregation-node.h.

Referenced by Close(), GetNext(), and NextPartition().

TupleDescriptor* impala::PartitionedAggregationNode::output_tuple_desc_
private

Definition at line 142 of file partitioned-aggregation-node.h.

Referenced by CleanupHashTbl(), GetOutputTuple(), and Prepare().

TupleId impala::PartitionedAggregationNode::output_tuple_id_
private

Tuple into which Finalize() results are stored. Possibly the same as the intermediate tuple.

Definition at line 141 of file partitioned-aggregation-node.h.

Referenced by DebugString(), GetOutputTuple(), and Prepare().

const int impala::PartitionedAggregationNode::PARTITION_FANOUT = 16
staticprivate

Number of initial partitions to create. Must be a power of 2.

Definition at line 115 of file partitioned-aggregation-node.h.

Referenced by CreateHashPartitions(), MinRequiredBuffers(), PartitionedAggregationNode(), and ProcessBatch().

RuntimeProfile::Counter* impala::PartitionedAggregationNode::partitions_created_
private

Total number of partitions created.

Definition at line 219 of file partitioned-aggregation-node.h.

Referenced by CreateHashPartitions(), and Prepare().

std::vector<ExprContext*> impala::PartitionedAggregationNode::probe_expr_ctxs_
private
ProcessRowBatchFn impala::PartitionedAggregationNode::process_row_batch_fn_
private

Jitted ProcessRowBatch function pointer. Null if codegen is disabled.

Definition at line 204 of file partitioned-aggregation-node.h.

Referenced by Open(), and Prepare().

RowDescriptor impala::ExecNode::row_descriptor_
protectedinherited
const string impala::ExecNode::ROW_THROUGHPUT_COUNTER = "RowsReturnedRate"
staticinherited

Names of counters shared by all exec nodes.

Definition at line 169 of file exec-node.h.

Referenced by impala::ExecNode::Prepare().

RuntimeProfile::Counter* impala::ExecNode::rows_returned_rate_
protectedinherited

Definition at line 227 of file exec-node.h.

Referenced by impala::ExecNode::Prepare().

std::string impala::ExecNode::runtime_exec_options_
protectedinherited

Definition at line 239 of file exec-node.h.

Referenced by impala::ExecNode::AddRuntimeExecOption().

boost::scoped_ptr<BufferedTupleStream> impala::PartitionedAggregationNode::serialize_stream_
private

Stream used to store serialized spilled rows. Only used if needs_serialize_ is set. This stream is never pinned and is only used in Partition::Spill as a temporary buffer.

Definition at line 306 of file partitioned-aggregation-node.h.

Referenced by Close(), and Open().

Tuple* impala::PartitionedAggregationNode::singleton_output_tuple_
private

Result of aggregation w/o GROUP BY. Note: can be NULL even if there is no grouping if the result tuple is zero-width, e.g. select 1 from table group by col.

Definition at line 184 of file partitioned-aggregation-node.h.

Referenced by Close(), GetNext(), Prepare(), and ProcessBatchNoGrouping().

bool impala::PartitionedAggregationNode::singleton_output_tuple_returned_
private

Definition at line 185 of file partitioned-aggregation-node.h.

Referenced by Close(), GetNext(), and Prepare().

std::list<Partition*> impala::PartitionedAggregationNode::spilled_partitions_
private

All partitions that have been spilled and need further processing.

Definition at line 296 of file partitioned-aggregation-node.h.

Referenced by Close(), GetNext(), MoveHashPartitions(), and NextPartition().

TPlanNodeType::type impala::ExecNode::type_
protectedinherited

Definition at line 210 of file exec-node.h.

Referenced by impala::ExecNode::CollectNodes(), and impala::ExecNode::type().

bool impala::PartitionedAggregationNode::using_small_buffers_
private

If true, the partitions in hash_partitions_ are using small buffers.

Definition at line 179 of file partitioned-aggregation-node.h.

Referenced by SpillPartition().


The documentation for this class was generated from the following files: