Impala
Impala is the open source, native analytic database for Apache Hadoop.
impala::Sorter Class Reference

Note that Init() must be called right after the constructor.

#include <sorter.h>


Classes

class  Run
 
class  TupleSorter
 

Public Member Functions

 Sorter (const TupleRowComparator &compare_less_than, const std::vector< ExprContext * > &sort_tuple_slot_expr_ctxs, RowDescriptor *output_row_desc, MemTracker *mem_tracker, RuntimeProfile *profile, RuntimeState *state)
 
 ~Sorter ()
 
Status Init ()
 
Status AddBatch (RowBatch *batch)
Adds a batch of input rows to the current unsorted run.
 
Status InputDone ()
 
Status GetNext (RowBatch *batch, bool *eos)
Get the next batch of sorted output rows from the sorter.
 

Static Public Member Functions

static uint64_t EstimateMergeMem (uint64_t available_blocks, RowDescriptor *row_desc, int merge_batch_size)
 

Private Member Functions

Status CreateMerger (int num_runs)
 
Status MergeIntermediateRuns ()
 
Status SortRun ()
 

Private Attributes

RuntimeState *const state_
Runtime state instance used to check for cancellation. Not owned.
 
TupleRowComparator compare_less_than_
In-memory sorter and less-than comparator.
 
boost::scoped_ptr< TupleSorter > in_mem_tuple_sorter_
 
BufferedBlockMgr * block_mgr_
Block manager object used to allocate, pin and release runs. Not owned by Sorter.
 
BufferedBlockMgr::Client * block_mgr_client_
Handle to block mgr to make allocations from.
 
bool has_var_len_slots_
True if the tuples to be sorted have var-length slots.
 
Run * unsorted_run_
 
std::list< Run * > sorted_runs_
 
RowDescriptor * output_row_desc_
 
std::vector< ExprContext * > sort_tuple_slot_expr_ctxs_
 
MemTracker * mem_tracker_
Mem tracker for batches created during merge. Not owned by Sorter.
 
boost::scoped_ptr< SortedRunMerger > merger_
 
std::list< Run * > merging_runs_
 
ObjectPool obj_pool_
Pool of owned Run objects.
 
RuntimeProfile * profile_
Runtime profile and counters for this sorter instance.
 
RuntimeProfile::Counter * initial_runs_counter_

RuntimeProfile::Counter * num_merges_counter_

RuntimeProfile::Counter * in_mem_sort_timer_

RuntimeProfile::Counter * sorted_data_size_
 

Detailed Description

Note that Init() must be called right after the constructor.

Sorter contains the external sort implementation. Its purpose is to sort arbitrarily large input data sets with a fixed memory budget by spilling data to disk if necessary. BufferedBlockMgr is used to allocate and manage blocks of data to be sorted.

The client API for Sorter is as follows:

AddBatch() is used to add input rows to be sorted. Multiple tuples in an input row are materialized into a row with a single tuple (the sort tuple) using the materialization exprs in sort_tuple_slot_expr_ctxs_. The sort tuples are sorted according to the sort parameters and output by the sorter. AddBatch() can be called multiple times.

InputDone() is called to indicate the end of input. If multiple sorted runs were created, it triggers intermediate merge steps (if necessary) and creates the final merger that returns results via GetNext().

GetNext() is used to retrieve sorted rows. It can be called multiple times.

AddBatch(), InputDone() and GetNext() must be called in that order.

Batches of input rows are collected into a sequence of pinned BufferedBlockMgr blocks called a run. The maximum size of a run is determined by the maximum available buffers in the block manager. After the run is full, it is sorted in memory, unpinned and the next run is collected. The variable-length column data (e.g. string slots) in the materialized sort tuples is stored in a separate sequence of blocks from the tuples themselves. When the blocks containing tuples in a run are unpinned, the var-len slot pointers are converted to offsets from the start of the first var-len data block. When a block is read back, these offsets are converted back to pointers. The in-memory sorter sorts the fixed-length tuples in place. The output rows have the same schema as the materialized sort tuples.

After the input is consumed, the sorter is left with one or more sorted runs. The client calls GetNext(output_batch) to retrieve batches of sorted rows.
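The pointer-to-offset conversion for var-len slots can be illustrated with a minimal sketch. It assumes a hypothetical StringSlot type whose storage holds either a pointer (run pinned) or an offset (run unpinned), and it models the run's var-len blocks as one contiguous buffer; PointersToOffsets, OffsetsToPointers and RoundTripDemo are illustrative names, not Impala functions.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical var-len slot: holds a pointer while its run is pinned and an
// offset (reusing the same storage) while the run is unpinned.
struct StringSlot {
  union {
    const char* ptr;
    uint64_t offset;
  };
  int len;
};

// Before unpinning: replace each pointer with its offset from 'base', the
// start of the var-len data (a single buffer in this sketch).
void PointersToOffsets(std::vector<StringSlot>* slots, const char* base) {
  for (StringSlot& s : *slots) s.offset = static_cast<uint64_t>(s.ptr - base);
}

// After the data is read back, possibly at a different address: rebuild pointers.
void OffsetsToPointers(std::vector<StringSlot>* slots, const char* new_base) {
  for (StringSlot& s : *slots) s.ptr = new_base + s.offset;
}

// Round trip: "unpin", read the bytes back into a new buffer, "re-pin",
// then return the second string value.
std::string RoundTripDemo() {
  std::string pinned = "applebanana";  // stands in for the var-len block
  std::vector<StringSlot> slots(2);
  slots[0].ptr = pinned.data();
  slots[0].len = 5;
  slots[1].ptr = pinned.data() + 5;
  slots[1].len = 6;
  PointersToOffsets(&slots, pinned.data());
  std::string reread = pinned;  // same bytes, different address
  OffsetsToPointers(&slots, reread.data());
  return std::string(slots[1].ptr, slots[1].len);
}
```

Offsets stay valid no matter where the block lands when it is read back, which is why they, rather than raw pointers, are kept while the run is unpinned.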
If there are multiple runs, the runs are merged using SortedRunMerger to produce a stream of sorted tuples. At least one block per run (two if there are var-length slots) must be pinned in memory during a merge, so multiple merges may be necessary if the number of runs is too large. During a merge, rows from multiple sorted input runs are compared and copied into a single larger run. One input batch is created to hold tuple rows for each input run, and one batch is created to hold deep-copied rows (i.e. pointers + data) from the output of the merge.

If there is a single sorted run (i.e. no merge required), only tuple rows are copied into the output batch supplied by GetNext(), and the data itself is left in pinned blocks held by the sorter.

During a merge, one row batch is created for each input run, and one batch is created for the output of the merge (if it is not the final merge). It is assumed that the memory for these batches has already been accounted for in the memory budget for the sort. That is, the memory for these batches does not come out of the block buffer manager.

TODO: Not necessary to actually copy var-len data; instead take ownership of the var-length data in the input batch. Copying can be deferred until a run is unpinned.

TODO: When the first run is constructed, create a sequence of pointers to materialized tuples. If the input fits in memory, the pointers can be sorted instead of sorting the tuples in place.

Definition at line 84 of file sorter.h.
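The final merge can be pictured as a k-way merge. The following is a minimal sketch over sorted runs of ints, not the SortedRunMerger API: a binary min-heap (std::priority_queue) holds the current head of each run, and the smallest head is emitted and replaced by the next value from the same run. MergeSortedRuns is a hypothetical name; the real merger works on row batches of tuple rows compared with a TupleRowComparator.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <tuple>
#include <vector>

// Merges already-sorted runs into one sorted output. Each heap entry is
// (value, run index, position within run); std::greater makes it a min-heap.
std::vector<int> MergeSortedRuns(const std::vector<std::vector<int>>& runs) {
  using Entry = std::tuple<int, size_t, size_t>;
  std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap;
  for (size_t r = 0; r < runs.size(); ++r) {
    if (!runs[r].empty()) heap.emplace(runs[r][0], r, 0);
  }
  std::vector<int> out;
  while (!heap.empty()) {
    int value;
    size_t run, pos;
    std::tie(value, run, pos) = heap.top();
    heap.pop();
    out.push_back(value);  // a row-based merger would copy the row into its output batch here
    if (pos + 1 < runs[run].size()) heap.emplace(runs[run][pos + 1], run, pos + 1);
  }
  return out;
}
```

Only one element per run sits in the heap at a time, which loosely mirrors the requirement that just one block (two with var-len slots) per input run be pinned during a merge.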

Constructor & Destructor Documentation

impala::Sorter::Sorter ( const TupleRowComparator & compare_less_than,
const std::vector< ExprContext * > & sort_tuple_slot_expr_ctxs,
RowDescriptor * output_row_desc,
MemTracker * mem_tracker,
RuntimeProfile * profile,
RuntimeState * state
)

sort_tuple_slot_expr_ctxs are the slot exprs used to materialize the tuple to be sorted. compare_less_than is a comparator for the sort tuples (returns true if lhs < rhs). merge_batch_size_ is the size of the batches created to provide rows to the merger and retrieve rows from an intermediate merger.

Definition at line 870 of file sorter.cc.

Member Function Documentation

Status impala::Sorter::AddBatch ( RowBatch * batch)

Adds a batch of input rows to the current unsorted run.

Status impala::Sorter::CreateMerger ( int  num_runs)
private

Create a SortedRunMerger from the first 'num_runs' sorted runs in sorted_runs_ and assign it to merger_. The runs to be merged are removed from sorted_runs_. The Sorter sets the deep_copy_input flag to true for the merger, since the blocks containing input run data will be unpinned as input runs are read.

Definition at line 1123 of file sorter.cc.

References impala::RuntimeProfile::Counter::Add(), compare_less_than_, impala::Sorter::Run::GetNextBatch(), merger_, merging_runs_, num_merges_counter_, impala::Status::OK, output_row_desc_, impala::Sorter::Run::PrepareRead(), profile_, RETURN_IF_ERROR, and sorted_runs_.

Referenced by InputDone(), and MergeIntermediateRuns().
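The run bookkeeping in CreateMerger(), taking the first 'num_runs' runs out of sorted_runs_, maps naturally onto std::list::splice, which relinks list nodes without copying. This is a hedged sketch: TakeRunsForMerge and the placeholder Run struct are hypothetical, and it assumes (based on the description of merging_runs_) that the removed runs are kept in merging_runs_ until the merge finishes.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <list>

struct Run {};  // placeholder for impala::Sorter::Run

// Moves the first 'num_runs' runs from 'sorted_runs' to the end of
// 'merging_runs' without copying; returns the number actually moved.
int TakeRunsForMerge(std::list<Run*>* sorted_runs, std::list<Run*>* merging_runs,
                     int num_runs) {
  int n = std::max(0, std::min<int>(num_runs, static_cast<int>(sorted_runs->size())));
  auto last = std::next(sorted_runs->begin(), n);
  merging_runs->splice(merging_runs->end(), *sorted_runs, sorted_runs->begin(), last);
  return n;
}
```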

uint64_t impala::Sorter::EstimateMergeMem ( uint64_t available_blocks,
RowDescriptor * row_desc,
int merge_batch_size
)
static

Estimate the memory overhead in bytes for an intermediate merge, based on the maximum number of memory buffers available for the sort, the row descriptor for the sorted tuples and the batch size used (in rows). This is a pessimistic estimate of the memory needed by the sorter in addition to the memory used by the block buffer manager. The memory overhead is 0 if the input fits in memory. Merges incur additional memory overhead because row batches are created to hold tuple rows from the input runs, and the merger itself deep-copies sort-merged rows into its output batch.

Definition at line 1038 of file sorter.cc.

References impala::RowBatch::AT_CAPACITY_MEM_USAGE, and impala::RowDescriptor::tuple_descriptors().
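To make the shape of this estimate concrete, here is an arithmetic sketch. It is NOT the formula in sorter.cc: it simply assumes one row batch per input run plus one output batch, a fan-in of available_blocks divided by the blocks pinned per run (1, or 2 with var-len slots), and a per-batch charge capped at a caller-supplied limit standing in for RowBatch::AT_CAPACITY_MEM_USAGE. EstimateMergeMemSketch and its row_bytes parameter (used in place of a RowDescriptor) are hypothetical, and the zero-overhead case when the input fits in memory is not modeled.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical pessimistic estimate, not Impala's formula: every input run in
// a merge gets one row batch and the merge gets one output batch; each batch
// is charged at most 'batch_mem_cap' bytes.
uint64_t EstimateMergeMemSketch(uint64_t available_blocks, bool has_var_len_slots,
                                uint64_t row_bytes, uint64_t merge_batch_size,
                                uint64_t batch_mem_cap) {
  // At least 1 block per run (2 with var-len slots) must stay pinned.
  uint64_t blocks_per_run = has_var_len_slots ? 2 : 1;
  uint64_t max_runs = available_blocks / blocks_per_run;
  uint64_t per_batch = std::min(merge_batch_size * row_bytes, batch_mem_cap);
  return (max_runs + 1) * per_batch;  // input batches + one output batch
}
```

For example, with 10 available blocks, var-len slots, 16-byte rows and 1024-row batches, the sketch charges (5 + 1) * 16 KiB = 96 KiB.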

Status impala::Sorter::GetNext ( RowBatch * batch,
bool * eos
)

Get the next batch of sorted output rows from the sorter.

Definition at line 993 of file sorter.cc.

References merger_, impala::Status::OK, RETURN_IF_ERROR, and sorted_runs_.

Status impala::Sorter::InputDone ( )

Called to indicate there is no more input. Triggers the creation of merger(s) if necessary.

Definition at line 949 of file sorter.cc.

References impala::BufferedBlockMgr::available_allocated_buffers(), block_mgr_, CreateMerger(), has_var_len_slots_, MergeIntermediateRuns(), impala::BufferedBlockMgr::num_free_buffers(), impala::Status::OK, RETURN_IF_ERROR, sorted_runs_, and SortRun().

Status impala::Sorter::MergeIntermediateRuns ( )
private

Repeatedly replaces multiple smaller runs in sorted_runs_ with a single larger merged run until the number of remaining runs is small enough for a single merge. At least 1 (2 if var-len slots) block from each sorted run must be pinned for a merge. If the number of sorted runs is too large, merge sets of smaller runs into large runs until a final merge can be performed. An intermediate row batch containing deep copied rows is used for the output of each intermediate merge.

Definition at line 1054 of file sorter.cc.

References impala::ObjectPool::Add(), impala::BufferedBlockMgr::available_allocated_buffers(), impala::RuntimeState::batch_size(), block_mgr_, CreateMerger(), impala::BufferedBlockMgr::Block::Delete(), has_var_len_slots_, mem_tracker_, merger_, obj_pool_, impala::Status::OK, output_row_desc_, RETURN_IF_CANCELLED, RETURN_IF_ERROR, sorted_runs_, state_, impala::RowDescriptor::tuple_descriptors(), impala::BufferedBlockMgr::Block::Unpin(), and impala::BufferedBlockMgr::Block::valid_data_len().

Referenced by InputDone().
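The repeated intermediate merging described above can be sketched over in-memory int runs. This is an illustration under stated assumptions, not the sorter.cc code: MergeIntermediateRunsSketch and MergeGroup are hypothetical names, runs are plain sorted vectors, a final merge is assumed possible once the number of runs is at most available_blocks / blocks_per_run, an intermediate merge is assumed to admit one run fewer because its output run also needs pinned blocks, and each merged run is assumed to be appended at the back of the list.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <iterator>
#include <vector>

using Run = std::vector<int>;

// Merges 'group' (each run sorted) into one sorted run via successive std::merge calls.
Run MergeGroup(const std::vector<Run>& group) {
  Run result;
  for (const Run& r : group) {
    Run merged;
    merged.reserve(result.size() + r.size());
    std::merge(result.begin(), result.end(), r.begin(), r.end(), std::back_inserter(merged));
    result.swap(merged);
  }
  return result;
}

// Replaces groups of runs with single merged runs until a final merge fits.
// Returns the number of intermediate merges performed.
int MergeIntermediateRunsSketch(std::deque<Run>* sorted_runs, int available_blocks,
                                bool has_var_len_slots) {
  // At least 1 block per run (2 with var-len slots) must be pinned during a merge.
  int blocks_per_run = has_var_len_slots ? 2 : 1;
  size_t final_fan_in = static_cast<size_t>(std::max(2, available_blocks / blocks_per_run));
  // Assumption: an intermediate merge's output run also needs pinned blocks.
  size_t intermediate_fan_in = std::max<size_t>(2, final_fan_in - 1);
  int merges = 0;
  while (sorted_runs->size() > final_fan_in) {
    auto group_end = sorted_runs->begin() + static_cast<std::ptrdiff_t>(intermediate_fan_in);
    std::vector<Run> group(sorted_runs->begin(), group_end);
    sorted_runs->erase(sorted_runs->begin(), group_end);
    sorted_runs->push_back(MergeGroup(group));  // merged run goes back on the list
    ++merges;
  }
  return merges;
}
```

With 5 runs, 6 available blocks and var-len slots, the final fan-in is 3 and the intermediate fan-in is 2, so two intermediate merges leave 3 runs for the final merge.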

Member Data Documentation

BufferedBlockMgr* impala::Sorter::block_mgr_
private

Block manager object used to allocate, pin and release runs. Not owned by Sorter.

Definition at line 152 of file sorter.h.

Referenced by Init(), InputDone(), MergeIntermediateRuns(), and ~Sorter().

BufferedBlockMgr::Client* impala::Sorter::block_mgr_client_
private

Handle to block mgr to make allocations from.

Definition at line 155 of file sorter.h.

Referenced by Init(), and ~Sorter().

TupleRowComparator impala::Sorter::compare_less_than_
private

In memory sorter and less-than comparator.

Definition at line 148 of file sorter.h.

Referenced by CreateMerger(), and Init().

bool impala::Sorter::has_var_len_slots_
private

True if the tuples to be sorted have var-length slots.

RuntimeProfile::Counter* impala::Sorter::in_mem_sort_timer_
private

Definition at line 198 of file sorter.h.

Referenced by Init(), and SortRun().

boost::scoped_ptr<TupleSorter> impala::Sorter::in_mem_tuple_sorter_
private

Definition at line 149 of file sorter.h.

Referenced by Init(), and SortRun().

RuntimeProfile::Counter* impala::Sorter::initial_runs_counter_
private

Definition at line 196 of file sorter.h.

Referenced by Init().

MemTracker* impala::Sorter::mem_tracker_
private

Mem tracker for batches created during merge. Not owned by Sorter.

Definition at line 180 of file sorter.h.

Referenced by Init(), and MergeIntermediateRuns().

boost::scoped_ptr<SortedRunMerger> impala::Sorter::merger_
private

Merger object (intermediate or final) currently used to produce sorted runs. Only one merge is performed at a time. Will never be used if the input fits in memory.

Definition at line 185 of file sorter.h.

Referenced by CreateMerger(), GetNext(), and MergeIntermediateRuns().

std::list<Run*> impala::Sorter::merging_runs_
private

Runs that are currently being processed by merger_. These runs can be deleted when we are done with the current merge.

Definition at line 189 of file sorter.h.

Referenced by CreateMerger(), and ~Sorter().

RuntimeProfile::Counter* impala::Sorter::num_merges_counter_
private

Definition at line 197 of file sorter.h.

Referenced by CreateMerger(), and Init().

ObjectPool impala::Sorter::obj_pool_
private

Pool of owned Run objects.

Definition at line 192 of file sorter.h.

Referenced by AddBatch(), Init(), and MergeIntermediateRuns().

RowDescriptor* impala::Sorter::output_row_desc_
private

Descriptor for the sort tuple. Input rows are materialized into 1 tuple before sorting. Not owned by the Sorter.

Definition at line 173 of file sorter.h.

Referenced by AddBatch(), CreateMerger(), Init(), and MergeIntermediateRuns().

RuntimeProfile* impala::Sorter::profile_
private

Runtime profile and counters for this sorter instance.

Definition at line 195 of file sorter.h.

Referenced by CreateMerger(), and Init().

std::vector<ExprContext*> impala::Sorter::sort_tuple_slot_expr_ctxs_
private

Expressions used to materialize the sort tuple. Contains one expr per slot in the tuple.

Definition at line 177 of file sorter.h.

RuntimeProfile::Counter* impala::Sorter::sorted_data_size_
private

Definition at line 199 of file sorter.h.

Referenced by Init(), and SortRun().

std::list<Run*> impala::Sorter::sorted_runs_
private

List of sorted runs that have been produced but not merged. unsorted_run_ is added to this list after an in-memory sort. Sorted runs produced by intermediate merges are also added to this list. Runs are added to the object pool.

Definition at line 169 of file sorter.h.

Referenced by AddBatch(), CreateMerger(), GetNext(), InputDone(), MergeIntermediateRuns(), SortRun(), and ~Sorter().

RuntimeState* const impala::Sorter::state_
private

Runtime state instance used to check for cancellation. Not owned.

Definition at line 145 of file sorter.h.

Referenced by Init(), MergeIntermediateRuns(), impala::Sorter::TupleSorter::SortHelper(), and SortRun().

Run* impala::Sorter::unsorted_run_
private

The current unsorted run that is being collected. It is sorted and added to sorted_runs_ after it is full (i.e. number of blocks allocated == max available buffers) or after the input is complete. Owned and placed in obj_pool_. When it is added to sorted_runs_, it is set to NULL.

Definition at line 164 of file sorter.h.

Referenced by AddBatch(), Init(), SortRun(), and ~Sorter().
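The lifecycle of unsorted_run_ (filled until full, sorted in memory, moved to sorted_runs_, replaced by a fresh run) can be modeled with a toy class. ToyRunCollector is hypothetical: it stores int rows in std::vector instead of tuples in BufferedBlockMgr blocks, uses a row count max_run_rows in place of the buffer limit, and enforces the AddBatch() then InputDone() ordering with an exception, a choice made for this sketch only.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical toy model of run collection; 'max_run_rows' (> 0) stands in for
// "number of blocks allocated == max available buffers".
class ToyRunCollector {
 public:
  explicit ToyRunCollector(size_t max_run_rows) : max_run_rows_(max_run_rows) {}

  void AddBatch(const std::vector<int>& batch) {
    if (input_done_) throw std::logic_error("AddBatch() after InputDone()");
    for (int row : batch) {
      if (unsorted_run_.size() == max_run_rows_) SortRun();  // run is full
      unsorted_run_.push_back(row);
    }
  }

  void InputDone() {
    if (!unsorted_run_.empty()) SortRun();  // sort the last, partial run
    input_done_ = true;
  }

  const std::vector<std::vector<int>>& sorted_runs() const { return sorted_runs_; }

 private:
  // Sort the current run in memory and hand it to sorted_runs_; the Sorter
  // additionally unpins a full run's blocks at this point.
  void SortRun() {
    std::sort(unsorted_run_.begin(), unsorted_run_.end());
    sorted_runs_.push_back(std::move(unsorted_run_));
    unsorted_run_.clear();  // start a fresh run (the Sorter sets unsorted_run_ to NULL)
  }

  size_t max_run_rows_;
  bool input_done_ = false;
  std::vector<int> unsorted_run_;
  std::vector<std::vector<int>> sorted_runs_;
};
```

Feeding {5, 1, 4, 2} and then {9, 3} with a limit of 3 rows produces two sorted runs, {1, 4, 5} and {2, 3, 9}, which a merge would then combine.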


The documentation for this class was generated from the following files: