#include <gutil/strings/substitute.h>

using namespace strings;
36 "for sorting. Reducing query concurrency or increasing the memory available to "
37 "Impala may help running this query.";
  template <bool has_var_len_data>
  Status AddBatch(RowBatch* batch, int start_index, int* num_processed);

  void DeleteAllBlocks();

  template <bool convert_offset_to_ptr>
  Status GetNext(RowBatch* output_batch, bool* eos);

  bool CanExtendRun() const;

  void CollectNonNullVarSlots(Tuple* src, vector<StringValue*>* var_len_values,
      int* total_var_len);

  Status TryAddBlock(vector<BufferedBlockMgr::Block*>* block_sequence, bool* added);

  void CopyVarLenData(char* dest, const vector<StringValue*>& var_values);

  void CopyVarLenDataConvertOffset(char* dest, int64_t offset,
      const vector<StringValue*>& var_values);
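  // Ranges of at most this many tuples are sorted with insertion sort rather
  // than quicksort (see SortHelper() below).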
  static const int INSERTION_THRESHOLD = 16;
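// TupleIterator keeps an index into the run plus a direct pointer
// (current_tuple_) into the run's fixed-length blocks; stepping past either end
// of the current block moves buffer_start_ to the adjacent block.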
    current_tuple_(NULL) {
  DCHECK_LE(index, parent_->run_->num_tuples_);
  if (parent_->run_->num_tuples_ == 0) return;
  int past_end_bytes = 0;
  if (UNLIKELY(index >= parent_->run_->num_tuples_)) {
    index_ = parent_->run_->num_tuples_;
  current_tuple_ = buffer_start_ + block_offset + past_end_bytes;

  // Advancing: step forward one tuple; on crossing the end of the current
  // block, move buffer_start_ to the next fixed-length block.
  current_tuple_ += parent_->tuple_size_;
  if (UNLIKELY(current_tuple_ > buffer_start_ + parent_->last_tuple_block_offset_ &&
      index_ < parent_->run_->num_tuples_)) {
    DCHECK_LT(block_index_, parent_->run_->fixed_len_blocks_.size());
    buffer_start_ = parent_->run_->fixed_len_blocks_[block_index_]->buffer();
    current_tuple_ = buffer_start_;

  // Retreating: step back one tuple; on crossing the start of the current
  // block, move buffer_start_ to the previous block and point at its last tuple.
  current_tuple_ -= parent_->tuple_size_;
  if (UNLIKELY(current_tuple_ < buffer_start_ && index_ >= 0)) {
    DCHECK_GE(block_index_, 0);
    buffer_start_ = parent_->run_->fixed_len_blocks_[block_index_]->buffer();
    current_tuple_ = buffer_start_ + parent_->last_tuple_block_offset_;
  void Swap(uint8_t* left, uint8_t* right);
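// Run construction: cache the tuple layout and block size, then allocate the
// initial fixed-length block and, for runs with var-len slots, an initial
// var-len block plus a copy block used when the run is unpinned.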
    bool materialize_slots)
  : sorter_(parent),
    sort_tuple_desc_(sort_tuple_desc),
    sort_tuple_size_(sort_tuple_desc->byte_size()),
    block_size_(parent->block_mgr_->max_block_size()),
    has_var_len_slots_(sort_tuple_desc->string_slots().size() > 0),
    materialize_slots_(materialize_slots),
    is_sorted_(!materialize_slots),
    var_len_copy_block_(NULL),

  RETURN_IF_ERROR(
      sorter_->block_mgr_->GetNewBlock(sorter_->block_mgr_client_, NULL, &block));
  DCHECK_NOTNULL(block);
  fixed_len_blocks_.push_back(block);

  RETURN_IF_ERROR(
      sorter_->block_mgr_->GetNewBlock(sorter_->block_mgr_client_, NULL, &block));
  DCHECK_NOTNULL(block);
  var_len_blocks_.push_back(block);

  RETURN_IF_ERROR(sorter_->block_mgr_->GetNewBlock(
      sorter_->block_mgr_client_, NULL, &var_len_copy_block_));
  DCHECK_NOTNULL(var_len_copy_block_);

  if (!is_sorted_) sorter_->initial_runs_counter_->Add(1);
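// Run::AddBatch copies tuples from 'batch' into the run's fixed-length blocks,
// starting at 'start_index', and reports how many rows it consumed through
// 'num_processed'. Var-len values are copied into separate var-len blocks.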
template <bool has_var_len_data>
Status Sorter::Run::AddBatch(RowBatch* batch, int start_index, int* num_processed) {
  DCHECK(!fixed_len_blocks_.empty());
  DCHECK_EQ(materialize_slots_, !is_sorted_);
  if (!materialize_slots_) {
  int cur_input_index = start_index;
  vector<StringValue*> var_values;
  var_values.reserve(sort_tuple_desc_->string_slots().size());
  while (cur_input_index < batch->num_rows()) {
    int tuples_remaining = cur_fixed_len_block->BytesRemaining() / sort_tuple_size_;
    tuples_remaining = min(batch->num_rows() - cur_input_index, tuples_remaining);
    for (int i = 0; i < tuples_remaining; ++i) {
      int total_var_len = 0;
      if (materialize_slots_) {
        new_tuple->MaterializeExprs(input_row, *sort_tuple_desc_,
            sorter_->sort_tuple_slot_expr_ctxs_, NULL, &var_values, &total_var_len);
        if (total_var_len > sorter_->block_mgr_->max_block_size()) {
          return Status(Substitute(
              "Variable length data in a single tuple larger than block size $0 > $1",
              total_var_len, sorter_->block_mgr_->max_block_size()));
      } else {
        memcpy(new_tuple, input_row->GetTuple(0), sort_tuple_size_);
        if (has_var_len_data) {
          CollectNonNullVarSlots(new_tuple, &var_values, &total_var_len);

      if (has_var_len_data) {
        DCHECK_GT(var_len_blocks_.size(), 0);
        if (cur_var_len_block->BytesRemaining() < total_var_len) {
          cur_var_len_block = var_len_blocks_.back();

        char* var_data_ptr = cur_var_len_block->Allocate<char>(total_var_len);
        if (materialize_slots_) {
          CopyVarLenData(var_data_ptr, var_values);
        } else {
          int64_t offset = (var_len_blocks_.size() - 1) * block_size_;
          offset += var_data_ptr -
              reinterpret_cast<char*>(cur_var_len_block->buffer());
          CopyVarLenDataConvertOffset(var_data_ptr, offset, var_values);
        }

    if (cur_input_index < batch->num_rows()) {
      cur_fixed_len_block = fixed_len_blocks_.back();
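// A minimal standalone sketch, not Impala code: it demonstrates the logical
// offset encoding used above. While a run's var-len blocks may be unpinned,
// each StringValue::ptr holds (block_index * block_size + offset_in_block)
// instead of a raw pointer; Run::GetNext<convert_offset_to_ptr=true> reverses
// the mapping with a division and a modulo, as shown further below. kBlockSize
// and the function names here are hypothetical stand-ins.
#include <cassert>
#include <cstdint>

static const int64_t kBlockSize = 8 * 1024 * 1024;  // Stand-in for block_size_.

// Encode a location in a sequence of fixed-size blocks as a single offset.
static int64_t EncodeOffset(int64_t block_index, int64_t offset_in_block) {
  return block_index * kBlockSize + offset_in_block;
}

// Recover the block index and the offset within that block.
static void DecodeOffset(int64_t offset, int64_t* block_index,
                         int64_t* offset_in_block) {
  *block_index = offset / kBlockSize;
  *offset_in_block = offset % kBlockSize;
}

int main() {
  int64_t block_index = 0;
  int64_t offset_in_block = 0;
  DecodeOffset(EncodeOffset(3, 4096), &block_index, &offset_in_block);
  assert(block_index == 3 && offset_in_block == 4096);
  return 0;
}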
  BOOST_FOREACH(BufferedBlockMgr::Block* block, fixed_len_blocks_) {
    if (block != NULL) block->Delete();
  }
  BOOST_FOREACH(BufferedBlockMgr::Block* block, var_len_blocks_) {
    if (block != NULL) block->Delete();
  }
  if (var_len_copy_block_ != NULL) var_len_copy_block_->Delete();
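// Unpinning a run with var-len data: every tuple's var-len values are re-copied
// into a fresh sequence of var-len blocks (seeded with var_len_copy_block_) in
// sorted order, and each StringValue::ptr is rewritten to its logical offset
// via CopyVarLenDataConvertOffset().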
  vector<BufferedBlockMgr::Block*> sorted_var_len_blocks;
  sorted_var_len_blocks.reserve(var_len_blocks_.size());
  vector<StringValue*> var_values;
  int64_t var_data_offset = 0;
  var_values.reserve(sort_tuple_desc_->string_slots().size());
  DCHECK_NOTNULL(var_len_copy_block_);
  sorted_var_len_blocks.push_back(var_len_copy_block_);
  cur_sorted_var_len_block = sorted_var_len_blocks.back();
  DCHECK(var_len_copy_block_ == NULL);

  for (int i = 0; i < fixed_len_blocks_.size(); ++i) {
    for (int block_offset = 0; block_offset < cur_fixed_block->valid_data_len();
        block_offset += sort_tuple_size_) {
      Tuple* cur_tuple =
          reinterpret_cast<Tuple*>(cur_fixed_block->buffer() + block_offset);
      CollectNonNullVarSlots(cur_tuple, &var_values, &total_var_len);
      cur_sorted_var_len_block = sorted_var_len_blocks.back();
      char* var_data_ptr = cur_sorted_var_len_block->Allocate<char>(total_var_len);
      var_data_offset = block_size_ * (sorted_var_len_blocks.size() - 1) +
          (var_data_ptr - reinterpret_cast<char*>(cur_sorted_var_len_block->buffer()));
      CopyVarLenDataConvertOffset(var_data_ptr, var_data_offset, var_values);

  DCHECK_GT(sorted_var_len_blocks.back()->valid_data_len(), 0);
  var_len_blocks_.clear();
  sorted_var_len_blocks.swap(var_len_blocks_);
  var_len_copy_block_ = NULL;
  // Reset the read position to the start of the run.
  fixed_len_blocks_index_ = 0;
  fixed_len_block_offset_ = 0;
  var_len_blocks_index_ = 0;
  pin_next_fixed_len_block_ = pin_next_var_len_block_ = false;
  num_tuples_returned_ = 0;

  buffered_batch_.reset(new RowBatch(*sorter_->output_row_desc_,
      sorter_->state_->batch_size(), sorter_->mem_tracker_));

  if (fixed_len_blocks_.size() > 0) {
  // GetNextBatch: hand out buffered_batch_, refilling it until it has rows or
  // the run is exhausted; fully consumed trailing blocks are dropped from the run.
  if (buffered_batch_.get() != NULL) {
    buffered_batch_->Reset();
  if (buffered_batch_->num_rows() == 0 && !eos) {
  DCHECK(eos || buffered_batch_->num_rows() > 0);
  DCHECK_EQ(buffered_batch_->num_rows(), 0);
  buffered_batch_.reset();
  fixed_len_blocks_[fixed_len_blocks_.size() - 1] = NULL;
  var_len_blocks_[var_len_blocks_.size() - 1] = NULL;

  *output_batch = buffered_batch_.get();
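// Run::GetNext returns the next batch of sorted tuples. When the run is
// unpinned, exhausted blocks are released as the read position crosses a block
// boundary, and with convert_offset_to_ptr the stored var-len offsets are
// turned back into pointers before rows are returned.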
template <bool convert_offset_to_ptr>
Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
  if (fixed_len_blocks_index_ == fixed_len_blocks_.size()) {
    DCHECK_EQ(num_tuples_returned_, num_tuples_);

  // The previous fixed-len block has been fully read; delete it before continuing.
  if (pin_next_fixed_len_block_) {
    RETURN_IF_ERROR(fixed_len_blocks_[fixed_len_blocks_index_ - 1]->Delete());
    fixed_len_blocks_[fixed_len_blocks_index_ - 1] = NULL;
    pin_next_fixed_len_block_ = false;
  if (pin_next_var_len_block_) {
    var_len_blocks_[var_len_blocks_index_ - 1] = NULL;
    pin_next_var_len_block_ = false;

  Tuple* input_tuple = reinterpret_cast<Tuple*>(
      fixed_len_block->buffer() + fixed_len_block_offset_);

  if (convert_offset_to_ptr) {
    // Convert the logical offsets stored in each StringValue back into pointers
    // into the var-len blocks.
    const vector<SlotDescriptor*>& var_slots = sort_tuple_desc_->string_slots();
    for (int i = 0; i < var_slots.size(); ++i) {
      int64_t data_offset = reinterpret_cast<int64_t>(value->ptr);
      int block_index = data_offset / block_size_;
      int block_offset = data_offset % block_size_;
      if (block_index > var_len_blocks_index_) {
        // The value lives in the next var-len block, which must be pinned first.
        DCHECK_EQ(block_index, var_len_blocks_index_ + 1);
        DCHECK_EQ(block_offset, 0);
        var_len_blocks_index_ = block_index;
        pin_next_var_len_block_ = true;
      DCHECK_EQ(block_index, var_len_blocks_index_);
      value->ptr = reinterpret_cast<char*>(
          var_len_blocks_[var_len_blocks_index_]->buffer() + block_offset);
    if (pin_next_var_len_block_) break;

  int output_row_idx = output_batch->AddRow();
  fixed_len_block_offset_ += sort_tuple_size_;
  ++num_tuples_returned_;
  if (fixed_len_block_offset_ >= fixed_len_block->valid_data_len()) {
    pin_next_fixed_len_block_ = true;
    ++fixed_len_blocks_index_;
    fixed_len_block_offset_ = 0;
void Sorter::Run::CollectNonNullVarSlots(Tuple* src,
    vector<StringValue*>* var_len_values, int* total_var_len) {
  var_len_values->clear();
  BOOST_FOREACH(const SlotDescriptor* var_slot, sort_tuple_desc_->string_slots()) {
    var_len_values->push_back(string_val);
    *total_var_len += string_val->len;
  DCHECK(!block_sequence->empty());
  sorter_->sorted_data_size_->Add(last_block->valid_data_len());
  RETURN_IF_ERROR(sorter_->block_mgr_->GetNewBlock(
      sorter_->block_mgr_client_, last_block, &new_block));
  if (new_block != NULL) {
    block_sequence->push_back(new_block);
void Sorter::Run::CopyVarLenData(char* dest, const vector<StringValue*>& var_values) {
  BOOST_FOREACH(StringValue* var_val, var_values) {
    memcpy(dest, var_val->ptr, var_val->len);
    dest += var_val->len;
  }
}
void Sorter::Run::CopyVarLenDataConvertOffset(char* dest, int64_t offset,
    const vector<StringValue*>& var_values) {
  BOOST_FOREACH(StringValue* var_val, var_values) {
    memcpy(dest, var_val->ptr, var_val->len);
    // Replace the pointer with its logical offset; GetNext<true> converts it back.
    var_val->ptr = reinterpret_cast<char*>(offset);
    dest += var_val->len;
    offset += var_val->len;
  }
}
  : tuple_size_(tuple_size),
    block_capacity_(block_size / tuple_size),
    last_tuple_block_offset_(tuple_size * ((block_size / tuple_size) - 1)),
    less_than_comp_(comp),

  delete[] temp_tuple_buffer_;
  delete[] swap_buffer_;
  // Insertion sort: the tuple being inserted is staged in temp_tuple_buffer_
  // and copied into place once its position is found.
  memcpy(temp_tuple_buffer_, insert_iter.current_tuple_, tuple_size_);
  while (less_than_comp_(temp_tuple_row_,
  memcpy(copy_to, temp_tuple_buffer_, tuple_size_);
  // Partition: stage the pivot in temp_tuple_buffer_, then advance from both
  // ends of the range until the iterators cross.
  memcpy(temp_tuple_buffer_, pivot, tuple_size_);
  while (less_than_comp_(reinterpret_cast<TupleRow*>(&first.current_tuple_),
  while (less_than_comp_(temp_tuple_row_,
  // Quicksort ranges larger than INSERTION_THRESHOLD: recurse on the upper
  // partition and continue the loop on the lower one, then finish the remaining
  // short range with insertion sort.
  while (last.index_ - first.index_ > INSERTION_THRESHOLD) {
    SortHelper(cut, last);
  InsertionSort(first, last);
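// A minimal standalone sketch, not Impala code: the same hybrid pattern as
// SortHelper() above, applied to a plain int array. Quicksort recurses on the
// upper partition and loops on the lower one, and ranges of at most
// kInsertionThreshold elements are finished with insertion sort.
#include <algorithm>
#include <cassert>
#include <utility>
#include <vector>

static const int kInsertionThreshold = 16;

// Insertion-sorts the half-open range [first, last).
static void InsertionSortRange(std::vector<int>* v, int first, int last) {
  for (int i = first + 1; i < last; ++i) {
    int tmp = (*v)[i];
    int j = i;
    while (j > first && tmp < (*v)[j - 1]) {
      (*v)[j] = (*v)[j - 1];
      --j;
    }
    (*v)[j] = tmp;
  }
}

// Sorts the half-open range [first, last).
static void SortHelperSketch(std::vector<int>* v, int first, int last) {
  while (last - first > kInsertionThreshold) {
    int pivot = (*v)[first + (last - first) / 2];
    // Simple partition: [first, cut) holds elements < pivot, [cut, last) the rest.
    int cut = first;
    for (int i = first; i < last; ++i) {
      if ((*v)[i] < pivot) std::swap((*v)[i], (*v)[cut++]);
    }
    if (cut == first) {
      // Every element is >= pivot, so pivot is the minimum value present; move
      // one occurrence to the front so both partitions strictly shrink.
      for (int i = first; i < last; ++i) {
        if ((*v)[i] == pivot) { std::swap((*v)[i], (*v)[first]); break; }
      }
      cut = first + 1;
    }
    SortHelperSketch(v, cut, last);  // Recurse on the upper partition...
    last = cut;                      // ...and iterate on the lower one.
  }
  InsertionSortRange(v, first, last);
}

int main() {
  std::vector<int> v = {5, 3, 8, 1, 9, 2, 7, 4, 6, 0};
  SortHelperSketch(&v, 0, static_cast<int>(v.size()));
  assert(std::is_sorted(v.begin(), v.end()));
  return 0;
}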
  memcpy(swap_buffer_, left, tuple_size_);
  memcpy(left, right, tuple_size_);
  memcpy(right, swap_buffer_, tuple_size_);
    const vector<ExprContext*>& slot_materialize_expr_ctxs,
  // Sorter teardown: release all blocks still held by sorted and merging runs.
    (*it)->DeleteAllBlocks();
    (*it)->DeleteAllBlocks();

  // Sorts over var-len data reserve twice the blocks: one set for fixed-len
  // tuples and one for the var-len values.
  min_blocks_required *= 2;
  DCHECK_NOTNULL(batch);
  int num_processed = 0;
  int cur_batch_index = 0;
  while (cur_batch_index < batch->num_rows()) {
    if (has_var_len_slots_) {
      RETURN_IF_ERROR(unsorted_run_->AddBatch<true>(
          batch, cur_batch_index, &num_processed));
    } else {
      RETURN_IF_ERROR(unsorted_run_->AddBatch<false>(
          batch, cur_batch_index, &num_processed));
    }
    cur_batch_index += num_processed;
    if (cur_batch_index < batch->num_rows()) {

  unsorted_run_->Init();
  int min_buffers_for_merge = sorted_runs_.size() * blocks_per_run;
  bool unpinned_final = false;
  unpinned_final = true;
  DCHECK(unpinned_final);
  bool has_var_len_slots =
      row_desc->tuple_descriptors()[0]->string_slots().size() > 0;
  int blocks_per_run = has_var_len_slots ? 2 : 1;
  int max_input_runs_per_merge = (available_blocks / blocks_per_run) - 1;
  int64_t input_batch_mem =
      merge_batch_size * sizeof(Tuple*) * max_input_runs_per_merge;
  return input_batch_mem + output_batch_mem;
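// Worked example: with available_blocks = 16 and var-len slots present,
// blocks_per_run = 2, so max_input_runs_per_merge = 16 / 2 - 1 = 7; with a
// 1024-row merge batch, the input batches account for 1024 * 8 * 7 = 57344
// bytes of Tuple* pointers (on an LP64 system) on top of output_batch_mem.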
  int max_runs_per_final_merge =
      block_mgr_->available_allocated_buffers() / blocks_per_run;
  // An intermediate merge must also keep blocks pinned for its output run, so
  // it can read one fewer input run than the final merge.
  int max_runs_per_intermediate_merge = max_runs_per_final_merge - 1;
  DCHECK_GT(max_runs_per_intermediate_merge, 1);
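  // Worked example: with max_runs_per_final_merge = 8 (so 7 runs per
  // intermediate merge) and 20 sorted runs, the loop below merges 7 runs into
  // 1 (20 -> 14), then 7 more (14 -> 8), after which the remaining 8 runs fit
  // in a single final merge.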
  scoped_ptr<RowBatch> intermediate_merge_batch;
  while (sorted_runs_.size() > max_runs_per_final_merge) {
    int num_runs_to_merge = min<int>(max_runs_per_intermediate_merge,
        sorted_runs_.size() - max_runs_per_intermediate_merge);
    if (has_var_len_slots_) {
      ret_status = merged_run->AddBatch<true>(&intermediate_merge_batch,
          0, &num_copied);
    } else {
      ret_status = merged_run->AddBatch<false>(&intermediate_merge_batch,
          0, &num_copied);
    }
    if (!ret_status.ok()) return ret_status;

    DCHECK_EQ(num_copied, intermediate_merge_batch.num_rows());
    intermediate_merge_batch.Reset();
    merged_run->fixed_len_blocks_.pop_back();
    last_block = merged_run->var_len_blocks_.back();
    merged_run->var_len_blocks_.pop_back();
    merged_run->is_pinned_ = false;
  DCHECK_GT(num_runs, 1);
  // Free the blocks of the runs that fed the previous merge.
    (*it)->DeleteAllBlocks();

  vector<function<Status (RowBatch**)> > merge_runs;
  merge_runs.reserve(num_runs);
  for (int i = 0; i < num_runs; ++i) {