Impala
Impala is the open source, native analytic database for Apache Hadoop.
Note that Init() must be called right after the constructor.
#include <sorter.h>
Classes | |
class | Run |
class | TupleSorter |
Public Member Functions | |
Sorter (const TupleRowComparator &compare_less_than, const std::vector< ExprContext * > &sort_tuple_slot_expr_ctxs, RowDescriptor *output_row_desc, MemTracker *mem_tracker, RuntimeProfile *profile, RuntimeState *state) | |
~Sorter () | |
Status | Init () |
Status | AddBatch (RowBatch *batch) |
Adds a batch of input rows to the current unsorted run. | |
Status | InputDone () |
Status | GetNext (RowBatch *batch, bool *eos) |
Get the next batch of sorted output rows from the sorter. | |
Static Public Member Functions | |
static uint64_t | EstimateMergeMem (uint64_t available_blocks, RowDescriptor *row_desc, int merge_batch_size) |
Private Member Functions | |
Status | CreateMerger (int num_runs) |
Status | MergeIntermediateRuns () |
Status | SortRun () |
Sorter contains the external sort implementation. Its purpose is to sort arbitrarily large input data sets with a fixed memory budget by spilling data to disk if necessary. BufferedBlockMgr is used to allocate and manage blocks of data to be sorted.
The client API for Sorter is as follows:
AddBatch() is used to add input rows to be sorted. Multiple tuples in an input row are materialized into a row with a single tuple (the sort tuple) using the materialization exprs in sort_tuple_slot_expr_ctxs_. The sort tuples are sorted according to the sort parameters and output by the sorter. AddBatch() can be called multiple times.
InputDone() is called to indicate the end of input. If multiple sorted runs were created, it triggers intermediate merge steps (if necessary) and creates the final merger that returns results via GetNext().
GetNext() is used to retrieve sorted rows. It can be called multiple times.
AddBatch(), InputDone() and GetNext() must be called in that order.
Batches of input rows are collected into a sequence of pinned BufferedBlockMgr blocks called a run. The maximum size of a run is determined by the maximum available buffers in the block manager. After the run is full, it is sorted in memory, unpinned and the next run is collected.
The variable-length column data (e.g. string slots) in the materialized sort tuples is stored in a separate sequence of blocks from the tuples themselves. When the blocks containing tuples in a run are unpinned, the var-len slot pointers are converted to offsets from the start of the first var-len data block. When a block is read back, these offsets are converted back to pointers.
The in-memory sorter sorts the fixed-length tuples in-place. The output rows have the same schema as the materialized sort tuples. After the input is consumed, the sorter is left with one or more sorted runs. The client calls GetNext(output_batch) to retrieve batches of sorted rows.
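The pointer-to-offset conversion described above can be illustrated with a minimal sketch. The names used here (ToyStringSlot, PointersToOffsets, OffsetsToPointers, SlotToString) are hypothetical and do not appear in sorter.h, and a single contiguous buffer stands in for the sequence of var-len blocks, so this is only an approximation of the idea, not the actual implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for a string slot in a materialized sort tuple.
struct ToyStringSlot {
  char* ptr;  // Points into the var-len buffer while the data is pinned.
  int len;    // Length of the string in bytes.
};

// Before unpinning: replace each pointer with its offset from the buffer start.
void PointersToOffsets(std::vector<ToyStringSlot>* slots, const char* base) {
  for (ToyStringSlot& slot : *slots) {
    std::uintptr_t offset = static_cast<std::uintptr_t>(slot.ptr - base);
    slot.ptr = reinterpret_cast<char*>(offset);
  }
}

// After reading back: turn each stored offset into a pointer relative to the
// (possibly different) address where the data now lives.
void OffsetsToPointers(std::vector<ToyStringSlot>* slots, char* new_base) {
  for (ToyStringSlot& slot : *slots) {
    std::uintptr_t offset = reinterpret_cast<std::uintptr_t>(slot.ptr);
    slot.ptr = new_base + offset;
  }
}

// Copies the bytes a slot refers to into a std::string.
std::string SlotToString(const ToyStringSlot& slot) {
  assert(slot.len >= 0);
  return std::string(slot.ptr, static_cast<std::size_t>(slot.len));
}
```

Storing the offset in the pointer field itself keeps the tuple layout unchanged while the block is unpinned, which is why no extra per-slot storage is needed in this sketch.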
If there are multiple runs, the runs are merged using SortedRunMerger to produce a stream of sorted tuples. At least one block per run (two if there are var-length slots) must be pinned in memory during a merge, so multiple merges may be necessary if the number of runs is too large. During a merge, rows from multiple sorted input runs are compared and copied into a single larger run. One input batch is created to hold tuple rows for each input run, and one batch is created to hold deep-copied rows (i.e. ptrs + data) from the output of the merge.
If there is a single sorted run (i.e. no merge required), only tuple rows are copied into the output batch supplied by GetNext(), and the data itself is left in pinned blocks held by the sorter.
During a merge, one row batch is created for each input run, and one batch is created for the output of the merge (if it is not the final merge). It is assumed that the memory for these batches has already been accounted for in the memory budget for the sort. That is, the memory for these batches does not come out of the block buffer manager.
TODO: Not necessary to actually copy var-len data - instead take ownership of the var-length data in the input batch. Copying can be deferred until a run is unpinned.
TODO: When the first run is constructed, create a sequence of pointers to materialized tuples. If the input fits in memory, the pointers can be sorted instead of sorting the tuples in place.
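The overall flow (collect runs of bounded size, sort each run, then merge the runs on output) can be sketched with a toy class. ToySorter and all of its members are hypothetical: ints stand in for sort tuples, max_run_size stands in for the block manager's buffer limit, and runs stay in memory rather than being unpinned or spilled. It only mirrors the AddBatch(), InputDone(), GetNext() call order described above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Toy analogue of the Sorter client API (hypothetical, not Impala code).
class ToySorter {
 public:
  explicit ToySorter(std::size_t max_run_size) : max_run_size_(max_run_size) {
    assert(max_run_size > 0);
  }

  // Adds input rows to the current unsorted run; a full run is sorted and
  // a new run is started.
  void AddBatch(const std::vector<int>& batch) {
    for (int row : batch) {
      unsorted_run_.push_back(row);
      if (unsorted_run_.size() == max_run_size_) SortRun();
    }
  }

  // Signals end of input: sorts the last partial run and seeds the merge heap
  // with the first row of every sorted run.
  void InputDone() {
    if (!unsorted_run_.empty()) SortRun();
    for (std::size_t i = 0; i < sorted_runs_.size(); ++i) {
      heap_.push(Entry{sorted_runs_[i][0], i, 0});
    }
  }

  // Fills 'out' with up to batch_size sorted rows; sets *eos when exhausted.
  void GetNext(std::size_t batch_size, std::vector<int>* out, bool* eos) {
    out->clear();
    while (out->size() < batch_size && !heap_.empty()) {
      Entry e = heap_.top();
      heap_.pop();
      out->push_back(e.value);
      const std::vector<int>& run = sorted_runs_[e.run];
      if (e.pos + 1 < run.size()) {
        heap_.push(Entry{run[e.pos + 1], e.run, e.pos + 1});
      }
    }
    *eos = heap_.empty();
  }

 private:
  // One heap entry per run: the run's current smallest unread row.
  struct Entry {
    int value;
    std::size_t run;
    std::size_t pos;
    bool operator>(const Entry& other) const { return value > other.value; }
  };

  // Sorts the current run in place and appends it to the sorted runs.
  void SortRun() {
    std::sort(unsorted_run_.begin(), unsorted_run_.end());
    sorted_runs_.push_back(std::move(unsorted_run_));
    unsorted_run_.clear();
  }

  std::size_t max_run_size_;
  std::vector<int> unsorted_run_;
  std::vector<std::vector<int>> sorted_runs_;
  std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap_;
};
```

A caller would construct a ToySorter, call AddBatch() any number of times, call InputDone() once, and then loop on GetNext() until eos is set. With a single run the heap holds one entry and the output is simply that run read in order, which loosely corresponds to the no-merge case.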
impala::Sorter::Sorter | ( | const TupleRowComparator & | compare_less_than, |
const std::vector< ExprContext * > & | sort_tuple_slot_expr_ctxs, | ||
RowDescriptor * | output_row_desc, | ||
MemTracker * | mem_tracker, | ||
RuntimeProfile * | profile, | ||
RuntimeState * | state | ||
) |
sort_tuple_slot_expr_ctxs are the slot exprs used to materialize the tuple to be sorted. compare_less_than is a comparator for the sort tuples (returns true if lhs < rhs). merge_batch_size_ is the size of the batches created to provide rows to the merger and retrieve rows from an intermediate merger.
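As an illustration of the "returns true if lhs < rhs" contract, the sketch below uses a hypothetical functor with std::sort. ToyTuple, ToyLessThan and SortTuples are made-up names for this example and are not related to TupleRowComparator's real interface; the only property carried over is that the comparator must behave as a strict less-than.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical materialized sort tuple: one sort key plus a payload.
struct ToyTuple {
  int key;
  std::string payload;
};

// Hypothetical less-than comparator: true only if lhs sorts strictly before rhs.
struct ToyLessThan {
  bool operator()(const ToyTuple& lhs, const ToyTuple& rhs) const {
    return lhs.key < rhs.key;
  }
};

// Sorts the tuples in place using the supplied comparator.
void SortTuples(std::vector<ToyTuple>* tuples, const ToyLessThan& less_than) {
  assert(tuples != nullptr);
  std::sort(tuples->begin(), tuples->end(), less_than);
}
```

Returning false for equal keys matters: std::sort requires a strict weak ordering, so a comparator that returned true for lhs == rhs would be invalid.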
impala::Sorter::~Sorter | ( | ) |
Definition at line 884 of file sorter.cc.
References block_mgr_, block_mgr_client_, impala::BufferedBlockMgr::ClearReservations(), impala::Sorter::Run::DeleteAllBlocks(), merging_runs_, sorted_runs_, and unsorted_run_.
Status impala::Sorter::AddBatch | ( | RowBatch * | batch | ) |
Adds a batch of input rows to the current unsorted run.
Definition at line 923 of file sorter.cc.
References impala::ObjectPool::Add(), impala::Sorter::Run::AddBatch(), has_var_len_slots_, obj_pool_, impala::Status::OK, output_row_desc_, RETURN_IF_ERROR, sorted_runs_, SortRun(), impala::RowDescriptor::tuple_descriptors(), and unsorted_run_.
Status impala::Sorter::CreateMerger | ( | int | num_runs | ) | private |
Create a SortedRunMerger from the first 'num_runs' sorted runs in sorted_runs_ and assign it to merger_. The runs to be merged are removed from sorted_runs_. The Sorter sets the deep_copy_input flag to true for the merger, since the blocks containing input run data will be unpinned as input runs are read.
Definition at line 1123 of file sorter.cc.
References impala::RuntimeProfile::Counter::Add(), compare_less_than_, impala::Sorter::Run::GetNextBatch(), merger_, merging_runs_, num_merges_counter_, impala::Status::OK, output_row_desc_, impala::Sorter::Run::PrepareRead(), profile_, RETURN_IF_ERROR, and sorted_runs_.
Referenced by InputDone(), and MergeIntermediateRuns().
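The bookkeeping part of this step, moving the first num_runs runs out of the sorted list and into the set being merged, might look roughly like the following. ToyRun, ToySorterState and TakeRunsForMerge are hypothetical names; the construction of the SortedRunMerger itself and the deep_copy_input flag are not modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

using ToyRun = std::vector<int>;  // Hypothetical stand-in for a sorted run.

// Hypothetical subset of the sorter's state touched when creating a merger.
struct ToySorterState {
  std::deque<ToyRun> sorted_runs;    // Sorted runs not yet merged.
  std::vector<ToyRun> merging_runs;  // Runs consumed by the current merge.
};

// Moves the first 'num_runs' runs from sorted_runs into merging_runs.
void TakeRunsForMerge(ToySorterState* s, std::size_t num_runs) {
  assert(num_runs <= s->sorted_runs.size());
  s->merging_runs.clear();
  for (std::size_t i = 0; i < num_runs; ++i) {
    s->merging_runs.push_back(std::move(s->sorted_runs.front()));
    s->sorted_runs.pop_front();
  }
}
```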
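uint64_t impala::Sorter::EstimateMergeMem | ( | uint64_t | available_blocks, | RowDescriptor * | row_desc, | int | merge_batch_size | ) | static |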
Estimate the memory overhead in bytes for an intermediate merge, based on the maximum number of memory buffers available for the sort, the row descriptor for the sorted tuples and the batch size used (in rows). This is a pessimistic estimate of the memory needed by the sorter in addition to the memory used by the block buffer manager. The memory overhead is 0 if the input fits in memory. Merges incur additional memory overhead because row batches are created to hold tuple rows from the input runs, and the merger itself deep-copies sort-merged rows into its output batch.
Definition at line 1038 of file sorter.cc.
References impala::RowBatch::AT_CAPACITY_MEM_USAGE, and impala::RowDescriptor::tuple_descriptors().
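A rough sketch of how such a pessimistic estimate could be put together is shown below. It is an assumption-laden illustration, not the code at line 1038 of sorter.cc: ToyEstimateMergeMem is a hypothetical function, has_var_len_slots replaces the row descriptor lookup, output_batch_capacity_bytes is a hypothetical parameter standing in for the RowBatch capacity constant, and each input batch row is assumed to cost one pointer. The sketch covers only the merge case, not the zero-overhead in-memory case.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical estimate of the extra memory an intermediate merge needs.
std::uint64_t ToyEstimateMergeMem(std::uint64_t available_blocks,
                                  bool has_var_len_slots,
                                  int merge_batch_size,
                                  std::uint64_t output_batch_capacity_bytes) {
  assert(merge_batch_size > 0);
  // Each run being merged needs 1 pinned block, or 2 with var-len slots.
  const std::uint64_t blocks_per_run = has_var_len_slots ? 2 : 1;
  // Upper bound on the number of runs that can be merged at once.
  const std::uint64_t max_input_runs = available_blocks / blocks_per_run;
  // Input batches hold only tuple-row pointers; the data stays in the blocks.
  const std::uint64_t input_batch_mem =
      static_cast<std::uint64_t>(merge_batch_size) * sizeof(void*) * max_input_runs;
  // The output batch holds deep-copied rows, so assume it fills to capacity.
  return input_batch_mem + output_batch_capacity_bytes;
}
```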
Status impala::Sorter::GetNext | ( | RowBatch * | batch, | bool * | eos | ) |
Get the next batch of sorted output rows from the sorter.
Definition at line 993 of file sorter.cc.
References merger_, impala::Status::OK, RETURN_IF_ERROR, and sorted_runs_.
Status impala::Sorter::Init | ( | ) |
Initialization code, including registration with the block_mgr and initialization of unsorted_run_. Both of these steps may fail.
Definition at line 896 of file sorter.cc.
References impala::ObjectPool::Add(), ADD_COUNTER, ADD_TIMER, block_mgr_, block_mgr_client_, impala::BLOCKS_REQUIRED_FOR_MERGE, impala::TupleDescriptor::byte_size(), compare_less_than_, has_var_len_slots_, in_mem_sort_timer_, in_mem_tuple_sorter_, impala::Sorter::Run::Init(), initial_runs_counter_, impala::BufferedBlockMgr::max_block_size(), mem_tracker_, num_merges_counter_, obj_pool_, impala::Status::OK, output_row_desc_, profile_, impala::BufferedBlockMgr::RegisterClient(), RETURN_IF_ERROR, sorted_data_size_, state_, impala::TupleDescriptor::string_slots(), impala::RowDescriptor::tuple_descriptors(), and unsorted_run_.
Status impala::Sorter::InputDone | ( | ) |
Called to indicate there is no more input. Triggers the creation of merger(s) if necessary.
Definition at line 949 of file sorter.cc.
References impala::BufferedBlockMgr::available_allocated_buffers(), block_mgr_, CreateMerger(), has_var_len_slots_, MergeIntermediateRuns(), impala::BufferedBlockMgr::num_free_buffers(), impala::Status::OK, RETURN_IF_ERROR, sorted_runs_, and SortRun().
Status impala::Sorter::MergeIntermediateRuns | ( | ) | private |
Repeatedly replaces multiple smaller runs in sorted_runs_ with a single larger merged run until the number of remaining runs is small enough for a single merge. At least one block (two if there are var-len slots) from each sorted run must be pinned for a merge, so if the number of sorted runs is too large, sets of smaller runs are merged into larger runs until a final merge can be performed. An intermediate row batch containing deep-copied rows is used for the output of each intermediate merge.
Definition at line 1054 of file sorter.cc.
References impala::ObjectPool::Add(), impala::BufferedBlockMgr::available_allocated_buffers(), impala::RuntimeState::batch_size(), block_mgr_, CreateMerger(), impala::BufferedBlockMgr::Block::Delete(), has_var_len_slots_, mem_tracker_, merger_, obj_pool_, impala::Status::OK, output_row_desc_, RETURN_IF_CANCELLED, RETURN_IF_ERROR, sorted_runs_, state_, impala::RowDescriptor::tuple_descriptors(), impala::BufferedBlockMgr::Block::Unpin(), and impala::BufferedBlockMgr::Block::valid_data_len().
Referenced by InputDone().
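The loop described above can be sketched as follows. This is a simplified model under stated assumptions: ToyRun, MergeFront and ToyMergeIntermediateRuns are hypothetical names, max_runs_per_merge stands in for the limit derived from the available buffers, and repeated two-way std::merge calls replace the real k-way SortedRunMerger for brevity. The policy for choosing how many runs to merge at each step is also an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <iterator>
#include <vector>

using ToyRun = std::vector<int>;  // Hypothetical stand-in for a sorted run.

// Removes the first 'num_runs' runs from 'runs' and returns them merged.
ToyRun MergeFront(std::deque<ToyRun>* runs, std::size_t num_runs) {
  ToyRun merged;
  for (std::size_t i = 0; i < num_runs; ++i) {
    ToyRun next;
    std::merge(merged.begin(), merged.end(), runs->front().begin(),
               runs->front().end(), std::back_inserter(next));
    merged.swap(next);
    runs->pop_front();
  }
  return merged;
}

// Merges sets of runs until at most 'max_runs_per_merge' remain, so that one
// final merge can handle them. Returns the number of intermediate merges.
int ToyMergeIntermediateRuns(std::deque<ToyRun>* runs,
                             std::size_t max_runs_per_merge) {
  assert(max_runs_per_merge >= 2);
  int num_merges = 0;
  while (runs->size() > max_runs_per_merge) {
    // Merge no more runs than needed to get down to the limit.
    std::size_t n =
        std::min(max_runs_per_merge, runs->size() - max_runs_per_merge + 1);
    runs->push_back(MergeFront(runs, n));
    ++num_merges;
  }
  return num_merges;
}
```

Each pass replaces n runs with one, so the run count drops by n - 1 and the loop terminates whenever the limit is at least two.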
Status impala::Sorter::SortRun | ( | ) | private |
Sorts unsorted_run_ and appends it to the list of sorted runs. Deletes any empty blocks at the end of the run. Updates the sort bytes counter if necessary.
Definition at line 1006 of file sorter.cc.
References impala::RuntimeProfile::Counter::Add(), impala::BufferedBlockMgr::Block::Delete(), impala::Sorter::Run::fixed_len_blocks_, has_var_len_slots_, in_mem_sort_timer_, in_mem_tuple_sorter_, impala::Status::OK, RETURN_IF_CANCELLED, RETURN_IF_ERROR, SCOPED_TIMER, sorted_data_size_, sorted_runs_, state_, unsorted_run_, impala::BufferedBlockMgr::Block::valid_data_len(), impala::Sorter::Run::var_len_blocks_, and impala::Sorter::Run::var_len_copy_block_.
Referenced by AddBatch(), and InputDone().
|
private |
Block manager object used to allocate, pin and release runs. Not owned by Sorter.
Definition at line 152 of file sorter.h.
Referenced by Init(), InputDone(), MergeIntermediateRuns(), and ~Sorter().
|
private |
|
private |
In memory sorter and less-than comparator.
Definition at line 148 of file sorter.h.
Referenced by CreateMerger(), and Init().
|
private |
True if the tuples to be sorted have var-length slots.
Definition at line 158 of file sorter.h.
Referenced by AddBatch(), impala::Sorter::Run::GetNextBatch(), impala::Sorter::Run::Init(), Init(), InputDone(), MergeIntermediateRuns(), impala::Sorter::Run::PrepareRead(), SortRun(), and impala::Sorter::Run::UnpinAllBlocks().
|
private |
|
private |
|
private |
|
private |
Mem tracker for batches created during merge. Not owned by Sorter.
Definition at line 180 of file sorter.h.
Referenced by Init(), and MergeIntermediateRuns().
|
private |
Merger object (intermediate or final) currently used to produce sorted runs. Only one merge is performed at a time. Will never be used if the input fits in memory.
Definition at line 185 of file sorter.h.
Referenced by CreateMerger(), GetNext(), and MergeIntermediateRuns().
|
private |
Runs that are currently being processed by merger_. These runs can be deleted when the current merge is done.
Definition at line 189 of file sorter.h.
Referenced by CreateMerger(), and ~Sorter().
|
private |
Definition at line 197 of file sorter.h.
Referenced by CreateMerger(), and Init().
|
private |
Pool of owned Run objects.
Definition at line 192 of file sorter.h.
Referenced by AddBatch(), Init(), and MergeIntermediateRuns().
|
private |
Descriptor for the sort tuple. Input rows are materialized into 1 tuple before sorting. Not owned by the Sorter.
Definition at line 173 of file sorter.h.
Referenced by AddBatch(), CreateMerger(), Init(), and MergeIntermediateRuns().
|
private |
Runtime profile and counters for this sorter instance.
Definition at line 195 of file sorter.h.
Referenced by CreateMerger(), and Init().
|
private |
|
private |
|
private |
List of sorted runs that have been produced but not merged. unsorted_run_ is added to this list after an in-memory sort. Sorted runs produced by intermediate merges are also added to this list. Runs are added to the object pool.
Definition at line 169 of file sorter.h.
Referenced by AddBatch(), CreateMerger(), GetNext(), InputDone(), MergeIntermediateRuns(), SortRun(), and ~Sorter().
|
private |
Runtime state instance used to check for cancellation. Not owned.
Definition at line 145 of file sorter.h.
Referenced by Init(), MergeIntermediateRuns(), impala::Sorter::TupleSorter::SortHelper(), and SortRun().
|
private |
The current unsorted run that is being collected. Is sorted and added to sorted_runs_ after it is full (i.e. number of blocks allocated == max available buffers) or after the input is complete. Owned and placed in obj_pool_. When it is added to sorted_runs_, it is set to NULL.
Definition at line 164 of file sorter.h.
Referenced by AddBatch(), Init(), SortRun(), and ~Sorter().