Impala
Impala is the open source, native analytic database for Apache Hadoop.
sorter.cc
1 // Copyright 2013 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "runtime/sorter.h"
16 #include <gutil/strings/substitute.h>
17 
18 #include "runtime/buffered-block-mgr.h"
19 #include "runtime/row-batch.h"
20 #include "runtime/runtime-state.h"
21 #include "runtime/sorted-run-merger.h"
22 #include "util/runtime-profile.h"
23 
24 #include "common/names.h"
25 
26 using namespace strings;
27 
28 namespace impala {
29 
30 // Number of pinned blocks required for a merge.
31 const int BLOCKS_REQUIRED_FOR_MERGE = 3;
32 
33 // Error message when pinning fixed or variable length blocks failed.
34 // TODO: Add the node id that initiated the sort
35 const string PIN_FAILED_ERROR_MSG = "Failed to pin block for $0-length data needed "
36  "for sorting. Reducing query concurrency or increasing the memory available to "
37  "Impala may help running this query.";
38 
39 // A run is a sequence of blocks containing tuples that are or will eventually be in
40 // sorted order.
41 // A run may maintain two sequences of blocks - one containing the tuples themselves,
42 // (i.e. fixed-len slots and ptrs to var-len data), and the other for the var-length
43 // column data pointed to by those tuples.
44 // Tuples in a run may be sorted in place (in-memory) and merged using a merger.
45 class Sorter::Run {
46  public:
47  // materialize_slots is true for runs constructed from input rows. The input rows are
48  // materialized into single sort tuples using the expressions in
49  // sort_tuple_slot_expr_ctxs_. For intermediate merges, the tuples are already
50  // materialized so materialize_slots is false.
51  Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, bool materialize_slots);
52 
53  // Initialize the run for input rows by allocating the minimum number of required
54  // blocks - one block for fixed-len data added to fixed_len_blocks_, one for the
55  // initially unsorted var-len data added to var_len_blocks_, and one to copy sorted
56  // var-len data into (var_len_copy_block_).
57  Status Init();
58 
59  // Add a batch of input rows to the current run. Returns the number
60  // of rows actually added in num_processed. If the run is full (no more blocks can
61  // be allocated), num_processed may be less than the number of rows in the batch.
62  // If materialize_slots_ is true, materializes the input rows using the expressions
63  // in sorter_->sort_tuple_slot_expr_ctxs_, else just copies the input rows.
64  template <bool has_var_len_data>
65  Status AddBatch(RowBatch* batch, int start_index, int* num_processed);
66 
67  // Unpins all the blocks in a sorted run. Var-length column data is copied into new
68  // blocks in sorted order. Pointers in the original tuples are converted to offsets
69  // from the beginning of the sequence of var-len data blocks.
70  Status UnpinAllBlocks();
71 
72  // Deletes all blocks.
73  void DeleteAllBlocks();
74 
75  // Interface for merger - get the next batch of rows from this run. The callee (Run)
76  // still owns the returned batch. Calls GetNext(RowBatch*, bool*).
77  Status GetNextBatch(RowBatch** sorted_batch);
78 
79  private:
80  friend class Sorter;
81  friend class TupleSorter;
82 
83  // Fill output_batch with rows from this run. If convert_offset_to_ptr is true, offsets
84  // in var-length slots are converted back to pointers. Only row pointers are copied
85  // into output_batch.
86  // If this run was unpinned, one block (2 if there are var-len slots) is pinned while
87  // rows are filled into output_batch. The block is unpinned before the next block is
88  // pinned. At most 1 (2) block(s) will be pinned at any time.
89  // If the run was pinned, the blocks are not unpinned (Sorter holds on to the memory).
90  // In either case, all rows in output_batch will have their fixed and var-len data from
91  // the same block.
92  // TODO: If we leave the last run to be merged in memory, the fixed-len blocks can be
93  // unpinned as they are consumed.
94  template <bool convert_offset_to_ptr>
95  Status GetNext(RowBatch* output_batch, bool* eos);
96 
97  // Check if a run can be extended by allocating additional blocks from the block
98  // manager. Always true when building a sorted run in an intermediate merge, because
99  // the current block(s) can be unpinned before getting the next free block (so a block
100  // is always available).
101  bool CanExtendRun() const;
102 
103  // Collect the non-null var-len (e.g. STRING) slots from 'src' in 'var_len_values' and return
104  // the total length of all var_len slots in total_var_len.
105  void CollectNonNullVarSlots(Tuple* src, vector<StringValue*>* var_len_values,
106  int* total_var_len);
107 
108  // Check if the current run can be extended by a block. Add the newly allocated block
109  // to block_sequence, or set added to false if the run could not be extended.
110  // If the run is sorted (produced by an intermediate merge), unpin the last block in
111  // block_sequence before allocating and adding a new block - the run can always be
112  // extended in this case. If the run is unsorted, check max_blocks_in_unsorted_run_
113  // to see if a block can be added to the run. Also updates the sort bytes counter.
114  Status TryAddBlock(vector<BufferedBlockMgr::Block*>* block_sequence, bool* added);
115 
116  // Prepare to read a sorted run. Pins the first block(s) in the run if the run was
117  // previously unpinned.
118  Status PrepareRead();
119 
120  // Copy the StringValue data in var_values to dest in order and update the StringValue
121  // ptrs to point to the copied data.
122  void CopyVarLenData(char* dest, const vector<StringValue*>& var_values);
123 
124  // Copy the StringValue in var_values to dest in order. Update the StringValue ptrs to
125  // contain an offset to the copied data. Parameter 'offset' is the offset for the first
126  // StringValue.
127  void CopyVarLenDataConvertOffset(char* dest, int64_t offset,
128  const vector<StringValue*>& var_values);
129 
130  // Parent sorter object.
131  const Sorter* sorter_;
132 
133  // Materialized sort tuple. Input rows are materialized into 1 tuple (with descriptor
134  // sort_tuple_desc_) before sorting.
135  const TupleDescriptor* sort_tuple_desc_;
136 
137  // Sizes of sort tuple and block.
138  const int sort_tuple_size_;
139  const int block_size_;
140 
141  const bool has_var_len_slots_;
142 
143  // True if the sort tuple must be materialized from the input batch in AddBatch().
144  // materialize_slots_ is true for runs being constructed from input batches, and
145  // is false for runs being constructed from intermediate merges.
146  const bool materialize_slots_;
147 
148  // True if the run is sorted. Set to true after an in-memory sort, and initialized to
149  // true for runs resulting from merges.
150  bool is_sorted_;
151 
152  // True if all blocks in the run are pinned.
153  bool is_pinned_;
154 
155  // Sequence of blocks in this run containing the fixed-length portion of the sort tuples
156  // comprising this run. The data pointed to by the var-len slots are in var_len_blocks_.
157  // If is_sorted_ is true, the tuples in fixed_len_blocks_ will be in sorted order.
158  // fixed_len_blocks_[i] is NULL iff it has been deleted.
159  vector<BufferedBlockMgr::Block*> fixed_len_blocks_;
160 
161  // Sequence of blocks in this run containing the var-length data corresponding to the
162  // var-length columns from fixed_len_blocks_. These are reconstructed to be in sorted
163  // order in UnpinAllBlocks().
164  // var_len_blocks_[i] is NULL iff it has been deleted.
165  vector<BufferedBlockMgr::Block*> var_len_blocks_;
166 
167  // If there are var-len slots, an extra pinned block is used to copy out var-len data
168  // into a new sequence of blocks in sorted order. var_len_copy_block_ stores this
169  // extra allocated block.
170  BufferedBlockMgr::Block* var_len_copy_block_;
171 
172  // Number of tuples so far in this run.
173  int64_t num_tuples_;
174 
175  // Number of tuples returned via GetNext(), maintained for debug purposes.
176  int64_t num_tuples_returned_;
177 
178  // buffered_batch_ is used to return TupleRows to the merger when this run is being
179  // merged. buffered_batch_ is returned in calls to GetNextBatch().
180  scoped_ptr<RowBatch> buffered_batch_;
181 
182  // Members used when a run is read in GetNext()
183  // The index into the fixed_ and var_len_blocks_ vectors of the current blocks being
184  // processed in GetNext().
185  int fixed_len_blocks_index_;
186  int var_len_blocks_index_;
187 
188  // If true, pin the next fixed and var-len blocks and delete the previous ones
189  // during the next call to GetNext(). Set during the previous call to GetNext().
190  // Not used if a run is already pinned.
191  bool pin_next_fixed_len_block_;
192  bool pin_next_var_len_block_;
193 
194  // Offset into the current fixed length data block being processed.
195  int fixed_len_block_offset_;
196 }; // class Sorter::Run
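// A sketch of the lifecycle the Sorter drives a Run through, inferred from the
// declarations above ('input_batch' is a hypothetical RowBatch; error handling
// and the in-memory sort step are elided):
//
//   Run* run = obj_pool_.Add(new Run(this, sort_tuple_desc, true));
//   RETURN_IF_ERROR(run->Init());
//   int num_processed = 0;
//   RETURN_IF_ERROR(run->AddBatch<true>(input_batch, 0, &num_processed));
//   // ... sort in memory, then UnpinAllBlocks() if a merge is needed ...
//   RETURN_IF_ERROR(run->PrepareRead());
//   RowBatch* batch;
//   RETURN_IF_ERROR(run->GetNextBatch(&batch));  // batch == NULL indicates eos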
197 
198 // Sorts a sequence of tuples from a run in place using a provided tuple comparator.
199 // Quick sort is used for sequences of tuples larger than 16 elements, and insertion sort
200 // is used for smaller sequences. The TupleSorter is initialized with a RuntimeState
201 // instance to check for cancellation during an in-memory sort.
202 class Sorter::TupleSorter {
203  public:
204  TupleSorter(const TupleRowComparator& less_than_comp, int64_t block_size,
205  int tuple_size, RuntimeState* state);
206 
207  ~TupleSorter();
208 
209  // Performs a quicksort for tuples in 'run' followed by an insertion sort to
210  // finish smaller blocks.
211  // Returns early if state_->is_cancelled() is true. No status
212  // is returned - the caller must check for cancellation.
213  void Sort(Run* run);
214 
215  private:
216  static const int INSERTION_THRESHOLD = 16;
217 
218  // Helper class used to iterate over tuples in a run during quick sort and insertion
219  // sort.
220  class TupleIterator {
221  public:
222  TupleIterator(TupleSorter* parent, int64_t index)
223  : parent_(parent),
224  index_(index),
225  current_tuple_(NULL) {
226  DCHECK_GE(index, 0);
227  DCHECK_LE(index, parent_->run_->num_tuples_);
228  // If the run is empty, only index_ and current_tuple_ are initialized.
229  if (parent_->run_->num_tuples_ == 0) return;
230  // If the iterator is initialized to past the end, set up buffer_start_ and
231  // block_index_ as if it were pointing to the last tuple. Add tuple_size_ bytes to
232  // current_tuple_, so everything is correct when Prev() is invoked.
233  int past_end_bytes = 0;
234  if (UNLIKELY(index >= parent_->run_->num_tuples_)) {
235  past_end_bytes = parent->tuple_size_;
236  index_ = parent_->run_->num_tuples_;
237  index = index_ - 1;
238  }
239  block_index_ = index / parent->block_capacity_;
240  buffer_start_ = parent->run_->fixed_len_blocks_[block_index_]->buffer();
241  int block_offset = (index % parent->block_capacity_) * parent->tuple_size_;
242  current_tuple_ = buffer_start_ + block_offset + past_end_bytes;
243  }
244 
245  // Sets current_tuple_ to point to the next tuple in the run. Increments
246  // block_index and resets buffer if the next tuple is in the next block.
247  void Next() {
248  current_tuple_ += parent_->tuple_size_;
249  ++index_;
250  if (UNLIKELY(current_tuple_ > buffer_start_ + parent_->last_tuple_block_offset_ &&
251  index_ < parent_->run_->num_tuples_)) {
252  // Don't increment block index, etc. past the end.
253  ++block_index_;
254  DCHECK_LT(block_index_, parent_->run_->fixed_len_blocks_.size());
255  buffer_start_ = parent_->run_->fixed_len_blocks_[block_index_]->buffer();
256  current_tuple_ = buffer_start_;
257  }
258  }
259 
260  // Sets current_tuple to point to the previous tuple in the run. Decrements
261  // block_index and resets buffer if the new tuple is in the previous block.
262  void Prev() {
263  current_tuple_ -= parent_->tuple_size_;
264  --index_;
265  if (UNLIKELY(current_tuple_ < buffer_start_ && index_ >= 0)) {
266  --block_index_;
267  DCHECK_GE(block_index_, 0);
268  buffer_start_ = parent_->run_->fixed_len_blocks_[block_index_]->buffer();
269  current_tuple_ = buffer_start_ + parent_->last_tuple_block_offset_;
270  }
271  }
272 
273  private:
274  friend class TupleSorter;
275 
276  // Pointer to the tuple sorter.
277  TupleSorter* parent_;
278 
279  // Index of the current tuple in the run.
280  int64_t index_;
281 
282  // Pointer to the current tuple.
283  uint8_t* current_tuple_;
284 
285  // Start of the buffer containing current tuple.
286  uint8_t* buffer_start_;
287 
288  // Index into run_.fixed_len_blocks_ of the block containing the current tuple.
289  int block_index_;
290  };
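// Worked example of the iterator's position arithmetic above, with
// hypothetical sizes tuple_size_ = 16 and block_capacity_ = 1000: index 2500
// gives block_index_ = 2500 / 1000 = 2 and a block offset of
// (2500 % 1000) * 16 = 8000, so current_tuple_ = buffer_start_ + 8000 in the
// third fixed-len block.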
291 
292  // Size of the tuples in memory.
293  const int tuple_size_;
294 
295  // Number of tuples per block in a run.
296  const int block_capacity_;
297 
298  // Offset in bytes of the last tuple in a block, calculated from block and tuple sizes.
299  const int last_tuple_block_offset_;
300 
301  // Tuple comparator that returns true if lhs < rhs.
302  const TupleRowComparator less_than_comp_;
303 
304  // Runtime state instance to check for cancellation. Not owned.
305  RuntimeState* const state_;
306 
307  // The run to be sorted.
308  Run* run_;
309 
310  // Temporarily allocated space to copy and swap tuples (Both are used in Partition()).
311  // temp_tuple_ points to temp_tuple_buffer_. Owned by this TupleSorter instance.
312  TupleRow* temp_tuple_row_;
313  uint8_t* temp_tuple_buffer_;
314  uint8_t* swap_buffer_;
315 
316  // Perform an insertion sort for rows in the range [first, last) in a run.
317  void InsertionSort(const TupleIterator& first, const TupleIterator& last);
318 
319  // Partitions the sequence of tuples in the range [first, last) in a run into two groups
320  // around the pivot tuple - i.e. tuples in the first group are <= the pivot, and tuples in
321  // the second group are >= pivot. Tuples are swapped in place to create the groups and
322  // the index to the first element in the second group is returned.
323  // Checks state_->is_cancelled() and returns early with an invalid result if true.
324  TupleIterator Partition(TupleIterator first, TupleIterator last, Tuple* pivot);
325 
326  // Performs a quicksort of rows in the range [first, last) followed by insertion sort
327  // for smaller groups of elements.
328  // Checks state_->is_cancelled() and returns early if true.
329  void SortHelper(TupleIterator first, TupleIterator last);
330 
331  // Swaps tuples pointed to by left and right using the swap buffer.
332  void Swap(uint8_t* left, uint8_t* right);
333 }; // class TupleSorter
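// A sketch of how a TupleSorter is constructed and used, following the
// declarations above (argument values are illustrative):
//
//   TupleSorter tuple_sorter(compare_less_than_, block_mgr_->max_block_size(),
//       sort_tuple_desc->byte_size(), state_);
//   tuple_sorter.Sort(run);  // sorts in place; returns early on cancellation,
//                            // so the caller must check state_->is_cancelled()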
334 
335 // Sorter::Run methods
336 Sorter::Run::Run(Sorter* parent, TupleDescriptor* sort_tuple_desc,
337  bool materialize_slots)
338  : sorter_(parent),
339  sort_tuple_desc_(sort_tuple_desc),
340  sort_tuple_size_(sort_tuple_desc->byte_size()),
341  block_size_(parent->block_mgr_->max_block_size()),
342  has_var_len_slots_(sort_tuple_desc->string_slots().size() > 0),
343  materialize_slots_(materialize_slots),
344  is_sorted_(!materialize_slots),
345  is_pinned_(true),
346  var_len_copy_block_(NULL),
347  num_tuples_(0) {
348 }
349 
350 Status Sorter::Run::Init() {
351  BufferedBlockMgr::Block* block = NULL;
352  RETURN_IF_ERROR(
353  sorter_->block_mgr_->GetNewBlock(sorter_->block_mgr_client_, NULL, &block));
354  DCHECK_NOTNULL(block);
355  fixed_len_blocks_.push_back(block);
356  if (has_var_len_slots_) {
357  RETURN_IF_ERROR(
358  sorter_->block_mgr_->GetNewBlock(sorter_->block_mgr_client_, NULL, &block));
359  DCHECK_NOTNULL(block);
360  var_len_blocks_.push_back(block);
361  if (!is_sorted_) {
362  RETURN_IF_ERROR(sorter_->block_mgr_->GetNewBlock(
363  sorter_->block_mgr_client_, NULL, &var_len_copy_block_));
364  DCHECK_NOTNULL(var_len_copy_block_);
365  }
366  }
367  if (!is_sorted_) sorter_->initial_runs_counter_->Add(1);
368  return Status::OK;
369 }
370 
371 template <bool has_var_len_data>
372 Status Sorter::Run::AddBatch(RowBatch* batch, int start_index, int* num_processed) {
373  DCHECK(!fixed_len_blocks_.empty());
374  *num_processed = 0;
375  BufferedBlockMgr::Block* cur_fixed_len_block = fixed_len_blocks_.back();
376 
377  DCHECK_EQ(materialize_slots_, !is_sorted_);
378  if (!materialize_slots_) {
379  // If materialize slots is false the run is being constructed for an
380  // intermediate merge and the sort tuples have already been materialized.
381  // The input row should have the same schema as the sort tuples.
382  DCHECK_EQ(batch->row_desc().tuple_descriptors().size(), 1);
383  DCHECK_EQ(batch->row_desc().tuple_descriptors()[0], sort_tuple_desc_);
384  }
385 
386  // Input rows are copied/materialized into tuples allocated in fixed_len_blocks_.
387  // The variable length column data are copied into blocks stored in var_len_blocks_.
388  // Input row processing is split into two loops.
389  // The inner loop processes as many input rows as will fit in cur_fixed_len_block.
390  // The outer loop allocates a new block for fixed-len data if the input batch is
391  // not exhausted.
392 
393  // cur_input_index is the index into the input 'batch' of the current input row being
394  // processed.
395  int cur_input_index = start_index;
396  vector<StringValue*> var_values;
397  var_values.reserve(sort_tuple_desc_->string_slots().size());
398  while (cur_input_index < batch->num_rows()) {
399  // tuples_remaining is the number of tuples to copy/materialize into
400  // cur_fixed_len_block.
401  int tuples_remaining = cur_fixed_len_block->BytesRemaining() / sort_tuple_size_;
402  tuples_remaining = min(batch->num_rows() - cur_input_index, tuples_remaining);
403 
404  for (int i = 0; i < tuples_remaining; ++i) {
405  int total_var_len = 0;
406  TupleRow* input_row = batch->GetRow(cur_input_index);
407  Tuple* new_tuple = cur_fixed_len_block->Allocate<Tuple>(sort_tuple_size_);
408  if (materialize_slots_) {
409  new_tuple->MaterializeExprs<has_var_len_data>(input_row, *sort_tuple_desc_,
410  sorter_->sort_tuple_slot_expr_ctxs_, NULL, &var_values, &total_var_len);
411  if (total_var_len > sorter_->block_mgr_->max_block_size()) {
412  return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, Substitute(
413  "Variable length data in a single tuple larger than block size $0 > $1",
414  total_var_len, sorter_->block_mgr_->max_block_size())));
415  }
416  } else {
417  memcpy(new_tuple, input_row->GetTuple(0), sort_tuple_size_);
418  if (has_var_len_data) {
419  CollectNonNullVarSlots(new_tuple, &var_values, &total_var_len);
420  }
421  }
422 
423  if (has_var_len_data) {
424  DCHECK_GT(var_len_blocks_.size(), 0);
425  BufferedBlockMgr::Block* cur_var_len_block = var_len_blocks_.back();
426  if (cur_var_len_block->BytesRemaining() < total_var_len) {
427  bool added;
428  RETURN_IF_ERROR(TryAddBlock(&var_len_blocks_, &added));
429  if (added) {
430  cur_var_len_block = var_len_blocks_.back();
431  } else {
432  // There was not enough space in the last var-len block for this tuple, and
433  // the run could not be extended. Return the fixed-len allocation and exit.
434  cur_fixed_len_block->ReturnAllocation(sort_tuple_size_);
435  return Status::OK;
436  }
437  }
438 
439  char* var_data_ptr = cur_var_len_block->Allocate<char>(total_var_len);
440  if (materialize_slots_) {
441  CopyVarLenData(var_data_ptr, var_values);
442  } else {
443  int64_t offset = (var_len_blocks_.size() - 1) * block_size_;
444  offset += var_data_ptr - reinterpret_cast<char*>(cur_var_len_block->buffer());
445  CopyVarLenDataConvertOffset(var_data_ptr, offset, var_values);
446  }
447  }
448  ++num_tuples_;
449  ++*num_processed;
450  ++cur_input_index;
451  }
452 
453  // If there are still rows left to process, get a new block for the fixed-length
454  // tuples. If the run is already too long, return.
455  if (cur_input_index < batch->num_rows()) {
456  bool added;
457  RETURN_IF_ERROR(TryAddBlock(&fixed_len_blocks_, &added));
458  if (added) {
459  cur_fixed_len_block = fixed_len_blocks_.back();
460  } else {
461  return Status::OK;
462  }
463  }
464  }
465  return Status::OK;
466 }
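// Worked example of the offset encoding above, assuming a hypothetical 8MB
// block_size_: var-len data copied 1MB into the third var-len block
// (var_len_blocks_.size() == 3) is recorded as offset
// (3 - 1) * 8MB + 1MB = 17MB from the start of the block sequence.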
467 
468 void Sorter::Run::DeleteAllBlocks() {
469  BOOST_FOREACH(BufferedBlockMgr::Block* block, fixed_len_blocks_) {
470  if (block != NULL) block->Delete();
471  }
472  BOOST_FOREACH(BufferedBlockMgr::Block* block, var_len_blocks_) {
473  if (block != NULL) block->Delete();
474  }
475  if (var_len_copy_block_ != NULL) var_len_copy_block_->Delete();
476 }
477 
478 Status Sorter::Run::UnpinAllBlocks() {
479  vector<BufferedBlockMgr::Block*> sorted_var_len_blocks;
480  sorted_var_len_blocks.reserve(var_len_blocks_.size());
481  vector<StringValue*> var_values;
482  int64_t var_data_offset = 0;
483  int total_var_len;
484  var_values.reserve(sort_tuple_desc_->string_slots().size());
485  BufferedBlockMgr::Block* cur_sorted_var_len_block = NULL;
486  if (has_var_len_slots_ && var_len_blocks_.size() > 0) {
487  DCHECK_NOTNULL(var_len_copy_block_);
488  sorted_var_len_blocks.push_back(var_len_copy_block_);
489  cur_sorted_var_len_block = sorted_var_len_blocks.back();
490  } else {
491  DCHECK(var_len_copy_block_ == NULL);
492  }
493 
494  for (int i = 0; i < fixed_len_blocks_.size(); ++i) {
495  BufferedBlockMgr::Block* cur_fixed_block = fixed_len_blocks_[i];
496  if (has_var_len_slots_) {
497  for (int block_offset = 0; block_offset < cur_fixed_block->valid_data_len();
498  block_offset += sort_tuple_size_) {
499  Tuple* cur_tuple =
500  reinterpret_cast<Tuple*>(cur_fixed_block->buffer() + block_offset);
501  CollectNonNullVarSlots(cur_tuple, &var_values, &total_var_len);
502  if (cur_sorted_var_len_block->BytesRemaining() < total_var_len) {
503  bool added;
504  RETURN_IF_ERROR(TryAddBlock(&sorted_var_len_blocks, &added));
505  DCHECK(added);
506  cur_sorted_var_len_block = sorted_var_len_blocks.back();
507  }
508  char* var_data_ptr = cur_sorted_var_len_block->Allocate<char>(total_var_len);
509  var_data_offset = block_size_ * (sorted_var_len_blocks.size() - 1) +
510  (var_data_ptr - reinterpret_cast<char*>(cur_sorted_var_len_block->buffer()));
511  CopyVarLenDataConvertOffset(var_data_ptr, var_data_offset, var_values);
512  }
513  }
514  RETURN_IF_ERROR(cur_fixed_block->Unpin());
515  }
516 
517  if (has_var_len_slots_ && var_len_blocks_.size() > 0) {
518  DCHECK_GT(sorted_var_len_blocks.back()->valid_data_len(), 0);
519  RETURN_IF_ERROR(sorted_var_len_blocks.back()->Unpin());
520  }
521 
522  // Clear var_len_blocks_ and replace it with the contents of sorted_var_len_blocks.
523  BOOST_FOREACH(BufferedBlockMgr::Block* var_block, var_len_blocks_) {
524  RETURN_IF_ERROR(var_block->Delete());
525  }
526  var_len_blocks_.clear();
527  sorted_var_len_blocks.swap(var_len_blocks_);
528  // Set var_len_copy_block_ to NULL since it's now in var_len_blocks_ and is no longer
529  // needed.
530  var_len_copy_block_ = NULL;
531  is_pinned_ = false;
532  return Status::OK;
533 }
534 
535 Status Sorter::Run::PrepareRead() {
536  fixed_len_blocks_index_ = 0;
537  fixed_len_block_offset_ = 0;
538  var_len_blocks_index_ = 0;
539  pin_next_fixed_len_block_ = pin_next_var_len_block_ = false;
540  num_tuples_returned_ = 0;
541 
542  buffered_batch_.reset(new RowBatch(*sorter_->output_row_desc_,
543  sorter_->state_->batch_size(), sorter_->mem_tracker_));
544 
545  // If the run is pinned, merge is not invoked, so buffered_batch_ is not needed
546  // and the individual blocks do not need to be pinned.
547  if (is_pinned_) return Status::OK;
548 
549  // Attempt to pin the first fixed and var-length blocks. In either case, pinning may
550  // fail if the number of reserved blocks is oversubscribed, see IMPALA-1590.
551  if (fixed_len_blocks_.size() > 0) {
552  bool pinned = false;
553  RETURN_IF_ERROR(fixed_len_blocks_[0]->Pin(&pinned));
554  if (!pinned) {
555  Status status = Status::MEM_LIMIT_EXCEEDED;
556  status.AddDetail(Substitute(PIN_FAILED_ERROR_MSG, "fixed"));
557  return status;
558  }
559  }
560 
561  if (has_var_len_slots_ && var_len_blocks_.size() > 0) {
562  bool pinned = false;
563  RETURN_IF_ERROR(var_len_blocks_[0]->Pin(&pinned));
564  if (!pinned) {
565  Status status = Status::MEM_LIMIT_EXCEEDED;
566  status.AddDetail(Substitute(PIN_FAILED_ERROR_MSG, "variable"));
567  return status;
568  }
569  }
570  return Status::OK;
571 }
572 
573 Status Sorter::Run::GetNextBatch(RowBatch** output_batch) {
574  if (buffered_batch_.get() != NULL) {
575  buffered_batch_->Reset();
576  // Fill more rows into buffered_batch_.
577  bool eos;
578  if (has_var_len_slots_ && !is_pinned_) {
579  RETURN_IF_ERROR(GetNext<true>(buffered_batch_.get(), &eos));
580  if (buffered_batch_->num_rows() == 0 && !eos) {
581  // No rows were filled because GetNext() had to read the next var-len block.
582  // Call GetNext() again.
583  RETURN_IF_ERROR(GetNext<true>(buffered_batch_.get(), &eos));
584  }
585  } else {
586  RETURN_IF_ERROR(GetNext<false>(buffered_batch_.get(), &eos));
587  }
588  DCHECK(eos || buffered_batch_->num_rows() > 0);
589  if (eos) {
590  // No rows are filled in GetNext() on eos, so this is safe.
591  DCHECK_EQ(buffered_batch_->num_rows(), 0);
592  buffered_batch_.reset();
593  // The merge is complete. Delete the last blocks in the run.
594  RETURN_IF_ERROR(fixed_len_blocks_.back()->Delete());
595  fixed_len_blocks_[fixed_len_blocks_.size() - 1] = NULL;
596  if (has_var_len_slots_) {
597  RETURN_IF_ERROR(var_len_blocks_.back()->Delete());
598  var_len_blocks_[var_len_blocks_.size() - 1] = NULL;
599  }
600  }
601  }
602 
603  // *output_batch == NULL indicates eos.
604  *output_batch = buffered_batch_.get();
605  return Status::OK;
606 }
607 
608 template <bool convert_offset_to_ptr>
609 Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
610  if (fixed_len_blocks_index_ == fixed_len_blocks_.size()) {
611  *eos = true;
612  DCHECK_EQ(num_tuples_returned_, num_tuples_);
613  return Status::OK;
614  } else {
615  *eos = false;
616  }
617 
618  BufferedBlockMgr::Block* fixed_len_block = fixed_len_blocks_[fixed_len_blocks_index_];
619 
620  if (!is_pinned_) {
621  // Pin the next block and delete the previous if set in the previous call to
622  // GetNext().
623  if (pin_next_fixed_len_block_) {
624  RETURN_IF_ERROR(fixed_len_blocks_[fixed_len_blocks_index_ - 1]->Delete());
625  fixed_len_blocks_[fixed_len_blocks_index_ - 1] = NULL;
626  bool pinned;
627  RETURN_IF_ERROR(fixed_len_block->Pin(&pinned));
628  DCHECK(pinned);
629  pin_next_fixed_len_block_ = false;
630  }
631  if (pin_next_var_len_block_) {
632  RETURN_IF_ERROR(var_len_blocks_[var_len_blocks_index_ - 1]->Delete());
633  var_len_blocks_[var_len_blocks_index_ - 1] = NULL;
634  bool pinned;
635  RETURN_IF_ERROR(var_len_blocks_[var_len_blocks_index_]->Pin(&pinned));
636  DCHECK(pinned);
637  pin_next_var_len_block_ = false;
638  }
639  }
640 
641  // GetNext fills rows into the output batch until a block boundary is reached.
642  while (!output_batch->AtCapacity() &&
643  fixed_len_block_offset_ < fixed_len_block->valid_data_len()) {
644  Tuple* input_tuple = reinterpret_cast<Tuple*>(
645  fixed_len_block->buffer() + fixed_len_block_offset_);
646 
647  if (convert_offset_to_ptr) {
648  // Convert the offsets in the var-len slots in input_tuple back to pointers.
649  const vector<SlotDescriptor*>& var_slots = sort_tuple_desc_->string_slots();
650  for (int i = 0; i < var_slots.size(); ++i) {
651  SlotDescriptor* slot_desc = var_slots[i];
652  if (input_tuple->IsNull(slot_desc->null_indicator_offset())) continue;
653 
654  DCHECK_EQ(slot_desc->type().type, TYPE_STRING);
655  StringValue* value = reinterpret_cast<StringValue*>(
656  input_tuple->GetSlot(slot_desc->tuple_offset()));
657  int64_t data_offset = reinterpret_cast<int64_t>(value->ptr);
658 
659  // data_offset is an offset in bytes from the beginning of the first block
660  // in var_len_blocks_. Convert it into an index into var_len_blocks_ and an
661  // offset within that block.
662  int block_index = data_offset / block_size_;
663  int block_offset = data_offset % block_size_;
664 
665  if (block_index > var_len_blocks_index_) {
666  // We've reached the block boundary for the current var-len block.
667  // This tuple will be returned in the next call to GetNext().
668  DCHECK_EQ(block_index, var_len_blocks_index_ + 1);
669  DCHECK_EQ(block_offset, 0);
670  DCHECK_EQ(i, 0);
671  var_len_blocks_index_ = block_index;
672  pin_next_var_len_block_ = true;
673  break;
674  } else {
675  DCHECK_EQ(block_index, var_len_blocks_index_);
676  // Calculate the address implied by the offset and assign it.
677  value->ptr = reinterpret_cast<char*>(
678  var_len_blocks_[var_len_blocks_index_]->buffer() + block_offset);
679  } // if (block_index > var_len_blocks_index_)
680  } // for (int i = 0; i < var_slots.size(); ++i)
681 
682  // The var-len data is in the next block, so end this call to GetNext().
683  if (pin_next_var_len_block_) break;
684  } // if (convert_offset_to_ptr)
685 
686  int output_row_idx = output_batch->AddRow();
687  output_batch->GetRow(output_row_idx)->SetTuple(0, input_tuple);
688  output_batch->CommitLastRow();
689  fixed_len_block_offset_ += sort_tuple_size_;
690  ++num_tuples_returned_;
691  }
692 
693  if (fixed_len_block_offset_ >= fixed_len_block->valid_data_len()) {
694  pin_next_fixed_len_block_ = true;
695  ++fixed_len_blocks_index_;
696  fixed_len_block_offset_ = 0;
697  }
698 
699  return Status::OK;
700 }
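// Worked example of the offset decoding above (same hypothetical 8MB
// block_size_ as in AddBatch()): a stored offset of 17MB yields
// block_index = 17MB / 8MB = 2 and block_offset = 17MB % 8MB = 1MB,
// i.e. the data starts 1MB into the third var-len block.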
701 
702 void Sorter::Run::CollectNonNullVarSlots(Tuple* src,
703  vector<StringValue*>* var_len_values, int* total_var_len) {
704  var_len_values->clear();
705  *total_var_len = 0;
706  BOOST_FOREACH(const SlotDescriptor* var_slot, sort_tuple_desc_->string_slots()) {
707  if (!src->IsNull(var_slot->null_indicator_offset())) {
708  StringValue* string_val =
709  reinterpret_cast<StringValue*>(src->GetSlot(var_slot->tuple_offset()));
710  var_len_values->push_back(string_val);
711  *total_var_len += string_val->len;
712  }
713  }
714 }
715 
716 Status Sorter::Run::TryAddBlock(vector<BufferedBlockMgr::Block*>* block_sequence,
717  bool* added) {
718  DCHECK(!block_sequence->empty());
719  BufferedBlockMgr::Block* last_block = block_sequence->back();
720  if (!is_sorted_) {
721  sorter_->sorted_data_size_->Add(last_block->valid_data_len());
722  last_block = NULL;
723  } else {
724  // If the run is sorted, we will unpin the last block and extend the run.
725  }
726 
727  BufferedBlockMgr::Block* new_block;
728  RETURN_IF_ERROR(sorter_->block_mgr_->GetNewBlock(
729  sorter_->block_mgr_client_, last_block, &new_block));
730  if (new_block != NULL) {
731  *added = true;
732  block_sequence->push_back(new_block);
733  } else {
734  *added = false;
735  }
736  return Status::OK;
737 }
738 
739 void Sorter::Run::CopyVarLenData(char* dest, const vector<StringValue*>& var_values) {
740  BOOST_FOREACH(StringValue* var_val, var_values) {
741  memcpy(dest, var_val->ptr, var_val->len);
742  var_val->ptr = dest;
743  dest += var_val->len;
744  }
745 }
746 
747 void Sorter::Run::CopyVarLenDataConvertOffset(char* dest, int64_t offset,
748  const vector<StringValue*>& var_values) {
749  BOOST_FOREACH(StringValue* var_val, var_values) {
750  memcpy(dest, var_val->ptr, var_val->len);
751  var_val->ptr = reinterpret_cast<char*>(offset);
752  dest += var_val->len;
753  offset += var_val->len;
754  }
755 }
756 
757 // Sorter::TupleSorter methods.
758 Sorter::TupleSorter::TupleSorter(const TupleRowComparator& comp, int64_t block_size,
759  int tuple_size, RuntimeState* state)
760  : tuple_size_(tuple_size),
761  block_capacity_(block_size / tuple_size),
762  last_tuple_block_offset_(tuple_size * ((block_size / tuple_size) - 1)),
763  less_than_comp_(comp),
764  state_(state) {
765  temp_tuple_buffer_ = new uint8_t[tuple_size];
766  temp_tuple_row_ = reinterpret_cast<TupleRow*>(&temp_tuple_buffer_);
767  swap_buffer_ = new uint8_t[tuple_size];
768 }
769 
770 Sorter::TupleSorter::~TupleSorter() {
771  delete[] temp_tuple_buffer_;
772  delete[] swap_buffer_;
773 }
774 
775 void Sorter::TupleSorter::Sort(Run* run) {
776  run_ = run;
777  SortHelper(TupleIterator(this, 0), TupleIterator(this, run_->num_tuples_));
778  run->is_sorted_ = true;
779 }
780 
781 // Sort the sequence of tuples from [first, last).
782 // Begin with a sorted sequence of size 1 [first, first+1).
783 // During each pass of the outermost loop, add the next tuple (at position 'i') to
784 // the sorted sequence by comparing it to each element of the sorted sequence
785 // (reverse order) to find its correct place in the sorted sequence, copying tuples
786 // along the way.
787 void Sorter::TupleSorter::InsertionSort(const TupleIterator& first,
788  const TupleIterator& last) {
789  TupleIterator insert_iter = first;
790  insert_iter.Next();
791  for (; insert_iter.index_ < last.index_; insert_iter.Next()) {
792  // insert_iter points to the tuple after the currently sorted sequence that must
793  // be inserted into the sorted sequence. Copy to temp_tuple_row_ since it may be
794  // overwritten by the one at position 'insert_iter - 1'
795  memcpy(temp_tuple_buffer_, insert_iter.current_tuple_, tuple_size_);
796 
797  // 'iter' points to the tuple that temp_tuple_row_ will be compared to.
798  // 'copy_to' is where 'iter' should be copied to if it is >= temp_tuple_row_.
799  // copy_to always points to the row after 'iter'.
800  TupleIterator iter = insert_iter;
801  iter.Prev();
802  uint8_t* copy_to = insert_iter.current_tuple_;
803  while (less_than_comp_(temp_tuple_row_,
804  reinterpret_cast<TupleRow*>(&iter.current_tuple_))) {
805  memcpy(copy_to, iter.current_tuple_, tuple_size_);
806  copy_to = iter.current_tuple_;
807  // Break if 'iter' has reached the first row, meaning that temp_tuple_row_
808  // will be inserted in position 'first'
809  if (iter.index_ <= first.index_) break;
810  iter.Prev();
811  }
812 
813  memcpy(copy_to, temp_tuple_buffer_, tuple_size_);
814  }
815 }
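// Example trace (tuple keys only): sorting [3, 1, 2] first inserts 1, copying
// 3 one slot right to give [1, 3, 2]; it then inserts 2, copying 3 right again
// to give the sorted sequence [1, 2, 3].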
816 
817 Sorter::TupleSorter::TupleIterator Sorter::TupleSorter::Partition(TupleIterator first,
818  TupleIterator last, Tuple* pivot) {
819  // Copy pivot into temp_tuple since it points to a tuple within [first, last).
820  memcpy(temp_tuple_buffer_, pivot, tuple_size_);
821 
822  last.Prev();
823  while (true) {
824  // Search for the first and last out-of-place elements, and swap them.
825  while (less_than_comp_(reinterpret_cast<TupleRow*>(&first.current_tuple_),
826  temp_tuple_row_)) {
827  first.Next();
828  }
829  while (less_than_comp_(temp_tuple_row_,
830  reinterpret_cast<TupleRow*>(&last.current_tuple_))) {
831  last.Prev();
832  }
833 
834  if (first.index_ >= last.index_) break;
835  // Swap first and last tuples.
836  Swap(first.current_tuple_, last.current_tuple_);
837 
838  first.Next();
839  last.Prev();
840  }
841 
842  return first;
843 }
844 
845 void Sorter::TupleSorter::SortHelper(TupleIterator first, TupleIterator last) {
846  if (UNLIKELY(state_->is_cancelled())) return;
847  // Use insertion sort for smaller sequences.
848  while (last.index_ - first.index_ > INSERTION_THRESHOLD) {
849  TupleIterator iter(this, first.index_ + (last.index_ - first.index_)/2);
850  DCHECK_NOTNULL(iter.current_tuple_);
851  // Partition() splits the tuples in [first, last) into two groups (<= pivot
852  // and >= pivot) in-place. 'cut' is the index of the first tuple in the second group.
853  TupleIterator cut = Partition(first, last,
854  reinterpret_cast<Tuple*>(iter.current_tuple_));
855  SortHelper(cut, last);
856  last = cut;
857  if (UNLIKELY(state_->is_cancelled())) return;
858  }
859 
860  InsertionSort(first, last);
861 }
862 
863 inline void Sorter::TupleSorter::Swap(uint8_t* left, uint8_t* right) {
864  memcpy(swap_buffer_, left, tuple_size_);
865  memcpy(left, right, tuple_size_);
866  memcpy(right, swap_buffer_, tuple_size_);
867 }
868 
869 // Sorter methods
870 Sorter::Sorter(const TupleRowComparator& compare_less_than,
871  const vector<ExprContext*>& slot_materialize_expr_ctxs,
872  RowDescriptor* output_row_desc, MemTracker* mem_tracker,
873  RuntimeProfile* profile, RuntimeState* state)
874  : state_(state),
875  compare_less_than_(compare_less_than),
876  block_mgr_(state->block_mgr()),
877  unsorted_run_(NULL),
878  output_row_desc_(output_row_desc),
879  sort_tuple_slot_expr_ctxs_(slot_materialize_expr_ctxs),
880  mem_tracker_(mem_tracker),
881  profile_(profile) {
882 }
883 
884 Sorter::~Sorter() {
885  // Delete all blocks from the block mgr.
886  for (list<Run*>::iterator it = sorted_runs_.begin(); it != sorted_runs_.end(); ++it) {
887  (*it)->DeleteAllBlocks();
888  }
889  for (list<Run*>::iterator it = merging_runs_.begin(); it != merging_runs_.end(); ++it) {
890  (*it)->DeleteAllBlocks();
891  }
892  if (unsorted_run_ != NULL) unsorted_run_->DeleteAllBlocks();
893  block_mgr_->ClearReservations(block_mgr_client_);
894 }
895 
896 Status Sorter::Init() {
897  DCHECK(unsorted_run_ == NULL) << "Already initialized";
898  TupleDescriptor* sort_tuple_desc = output_row_desc_->tuple_descriptors()[0];
899  has_var_len_slots_ = sort_tuple_desc->string_slots().size() > 0;
900  in_mem_tuple_sorter_.reset(new TupleSorter(compare_less_than_,
901  block_mgr_->max_block_size(), sort_tuple_desc->byte_size(), state_));
902  unsorted_run_ = obj_pool_.Add(new Run(this, sort_tuple_desc, true));
903 
904  initial_runs_counter_ = ADD_COUNTER(profile_, "InitialRunsCreated", TUnit::UNIT);
905  num_merges_counter_ = ADD_COUNTER(profile_, "TotalMergesPerformed", TUnit::UNIT);
906  in_mem_sort_timer_ = ADD_TIMER(profile_, "InMemorySortTime");
907  sorted_data_size_ = ADD_COUNTER(profile_, "SortDataSize", TUnit::BYTES);
908 
909  int min_blocks_required = BLOCKS_REQUIRED_FOR_MERGE;
910  // Fixed and var-length blocks are separate, so we need BLOCKS_REQUIRED_FOR_MERGE
911  // blocks for both if there is var-length data.
912  if (output_row_desc_->tuple_descriptors()[0]->string_slots().size() > 0) {
913  min_blocks_required *= 2;
914  }
915  RETURN_IF_ERROR(block_mgr_->RegisterClient(min_blocks_required, mem_tracker_,
916  state_, &block_mgr_client_));
917 
918  DCHECK_NOTNULL(unsorted_run_);
919  RETURN_IF_ERROR(unsorted_run_->Init());
920  return Status::OK;
921 }
922 
923 Status Sorter::AddBatch(RowBatch* batch) {
924  DCHECK_NOTNULL(unsorted_run_);
925  DCHECK_NOTNULL(batch);
926  int num_processed = 0;
927  int cur_batch_index = 0;
928  while (cur_batch_index < batch->num_rows()) {
929  if (has_var_len_slots_) {
930  RETURN_IF_ERROR(unsorted_run_->AddBatch<true>(
931  batch, cur_batch_index, &num_processed));
932  } else {
933  RETURN_IF_ERROR(unsorted_run_->AddBatch<false>(
934  batch, cur_batch_index, &num_processed));
935  }
936  cur_batch_index += num_processed;
937  if (cur_batch_index < batch->num_rows()) {
938  // The current run is full. Sort it and begin the next one.
939  RETURN_IF_ERROR(SortRun());
940  RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllBlocks());
941  unsorted_run_ = obj_pool_.Add(
942  new Run(this, output_row_desc_->tuple_descriptors()[0], true));
943  unsorted_run_->Init();
944  }
945  }
946  return Status::OK;
947 }
948 
949 Status Sorter::InputDone() {
950  // Sort the tuples accumulated so far in the current run.
951  RETURN_IF_ERROR(SortRun());
952 
953  if (sorted_runs_.size() == 1) {
954  // The entire input fit in one run. Read sorted rows in GetNext() directly
955  // from the sorted run.
956  sorted_runs_.back()->PrepareRead();
957  } else {
958  // At least one merge is necessary.
959  int blocks_per_run = has_var_len_slots_ ? 2 : 1;
960  int min_buffers_for_merge = sorted_runs_.size() * blocks_per_run;
961  // Check if the final run needs to be unpinned.
962  bool unpinned_final = false;
963  if (block_mgr_->num_free_buffers() < min_buffers_for_merge - blocks_per_run) {
964  // Number of available buffers is less than the size of the final run and
965  // the buffers needed to read the remainder of the runs in memory.
966  // Unpin the final run.
967  RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllBlocks());
968  unpinned_final = true;
969  } else {
970  // No need to unpin the current run. There is enough memory to stream the
971  // other runs.
972  // TODO: revisit. It might be better to unpin some from this run if it means
973  // we can get double buffering in the other runs.
974  }
975 
976  // For an intermediate merge, intermediate_merge_batch contains deep-copied rows from
977  // the input runs. If (unmerged_sorted_runs_.size() > max_runs_per_final_merge),
978  // one or more intermediate merges are required.
979  // TODO: Attempt to allocate more memory before doing intermediate merges. This may
980  // be possible if other operators have relinquished memory after the sort has built
981  // its runs.
982  if (min_buffers_for_merge > block_mgr_->available_allocated_buffers()) {
983  DCHECK(unpinned_final);
984  RETURN_IF_ERROR(MergeIntermediateRuns());
985  }
986 
987  // Create the final merger.
988  CreateMerger(sorted_runs_.size());
989  }
990  return Status::OK;
991 }
992 
993 Status Sorter::GetNext(RowBatch* output_batch, bool* eos) {
994  if (sorted_runs_.size() == 1) {
995  DCHECK(sorted_runs_.back()->is_pinned_);
996  // In this case, only TupleRows are copied into output_batch. Sorted tuples are left
997  // in the pinned blocks in the single sorted run.
998  RETURN_IF_ERROR(sorted_runs_.back()->GetNext<false>(output_batch, eos));
999  } else {
1000  // In this case, rows are deep copied into output_batch.
1001  RETURN_IF_ERROR(merger_->GetNext(output_batch, eos));
1002  }
1003  return Status::OK;
1004 }
1005 
1006 Status Sorter::SortRun() {
1007  BufferedBlockMgr::Block* last_block = unsorted_run_->fixed_len_blocks_.back();
1008  if (last_block->valid_data_len() > 0) {
1009  sorted_data_size_->Add(last_block->valid_data_len());
1010  } else {
1011  RETURN_IF_ERROR(last_block->Delete());
1012  unsorted_run_->fixed_len_blocks_.pop_back();
1013  }
1014  if (has_var_len_slots_) {
1015  DCHECK_NOTNULL(unsorted_run_->var_len_copy_block_);
1016  last_block = unsorted_run_->var_len_blocks_.back();
1017  if (last_block->valid_data_len() > 0) {
1018  sorted_data_size_->Add(last_block->valid_data_len());
1019  } else {
1020  RETURN_IF_ERROR(last_block->Delete());
1021  unsorted_run_->var_len_blocks_.pop_back();
1022  if (unsorted_run_->var_len_blocks_.size() == 0) {
1023  RETURN_IF_ERROR(unsorted_run_->var_len_copy_block_->Delete());
1024  unsorted_run_->var_len_copy_block_ = NULL;
1025  }
1026  }
1027  }
1028  {
1029  SCOPED_TIMER(in_mem_sort_timer_);
1030  in_mem_tuple_sorter_->Sort(unsorted_run_);
1031  RETURN_IF_CANCELLED(state_);
1032  }
1033  sorted_runs_.push_back(unsorted_run_);
1034  unsorted_run_ = NULL;
1035  return Status::OK;
1036 }
1037 
1038 uint64_t Sorter::EstimateMergeMem(uint64_t available_blocks,
1039  RowDescriptor* row_desc, int merge_batch_size) {
1040  bool has_var_len_slots = row_desc->tuple_descriptors()[0]->string_slots().size() > 0;
1041  int blocks_per_run = has_var_len_slots ? 2 : 1;
1042  int max_input_runs_per_merge = (available_blocks / blocks_per_run) - 1;
1043  // During a merge, the batches corresponding to the input runs contain only TupleRows.
1044  // (The data itself is in pinned blocks held by the run)
1045  uint64_t input_batch_mem =
1046  merge_batch_size * sizeof(Tuple*) * max_input_runs_per_merge;
1047  // Since rows are deep copied into the output batch for the merger, use a pessimistic
1048  // estimate of the memory required.
1049  uint64_t output_batch_mem = RowBatch::AT_CAPACITY_MEM_USAGE;
1050 
1051  return input_batch_mem + output_batch_mem;
1052 }
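// Worked example with hypothetical inputs: for available_blocks = 16, var-len
// slots present (blocks_per_run = 2) and merge_batch_size = 1024,
// max_input_runs_per_merge = 16 / 2 - 1 = 7, so input_batch_mem =
// 1024 * sizeof(Tuple*) * 7 = 56KB on a 64-bit build, plus
// RowBatch::AT_CAPACITY_MEM_USAGE for the deep-copied output batch.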
1053 
1054 Status Sorter::MergeIntermediateRuns() {
1055  int blocks_per_run = has_var_len_slots_ ? 2 : 1;
1056  int max_runs_per_final_merge =
1057  block_mgr_->available_allocated_buffers() / blocks_per_run;
1058 
1059  // During an intermediate merge, blocks from the output sorted run will have to be
1060  // pinned.
1061  int max_runs_per_intermediate_merge = max_runs_per_final_merge - 1;
1062  DCHECK_GT(max_runs_per_intermediate_merge, 1);
1063  // For an intermediate merge, intermediate_merge_batch contains deep-copied rows from
1064  // the input runs. If (sorted_runs_.size() > max_runs_per_final_merge),
1065  // one or more intermediate merges are required.
1066  scoped_ptr<RowBatch> intermediate_merge_batch;
1067  while (sorted_runs_.size() > max_runs_per_final_merge) {
1068  // An intermediate merge adds one merge to unmerged_sorted_runs_.
1069  // Merging 'runs - (max_runs_final_ - 1)' number of runs is sufficient to guarantee
1070  // that the final merge can be performed.
1071  int num_runs_to_merge = min<int>(max_runs_per_intermediate_merge,
1072  sorted_runs_.size() - max_runs_per_intermediate_merge);
1073  CreateMerger(num_runs_to_merge);
1074  RowBatch intermediate_merge_batch(*output_row_desc_, state_->batch_size(),
1075  mem_tracker_);
1076  // merged_run is the new sorted run that is produced by the intermediate merge.
1077  Run* merged_run = obj_pool_.Add(
1078  new Run(this, output_row_desc_->tuple_descriptors()[0], false));
1079  RETURN_IF_ERROR(merged_run->Init());
1080  bool eos = false;
1081  while (!eos) {
1082  // Copy rows into the new run until done.
1083  int num_copied;
1084  RETURN_IF_CANCELLED(state_);
1085  RETURN_IF_ERROR(merger_->GetNext(&intermediate_merge_batch, &eos));
1086  Status ret_status;
1087  if (has_var_len_slots_) {
1088  ret_status = merged_run->AddBatch<true>(&intermediate_merge_batch,
1089  0, &num_copied);
1090  } else {
1091  ret_status = merged_run->AddBatch<false>(&intermediate_merge_batch,
1092  0, &num_copied);
1093  }
1094  if (!ret_status.ok()) return ret_status;
1095 
1096  DCHECK_EQ(num_copied, intermediate_merge_batch.num_rows());
1097  intermediate_merge_batch.Reset();
1098  }
1099 
1100  BufferedBlockMgr::Block* last_block = merged_run->fixed_len_blocks_.back();
1101  if (last_block->valid_data_len() > 0) {
1102  RETURN_IF_ERROR(last_block->Unpin());
1103  } else {
1104  RETURN_IF_ERROR(last_block->Delete());
1105  merged_run->fixed_len_blocks_.pop_back();
1106  }
1107  if (has_var_len_slots_) {
1108  last_block = merged_run->var_len_blocks_.back();
1109  if (last_block->valid_data_len() > 0) {
1110  RETURN_IF_ERROR(last_block->Unpin());
1111  } else {
1112  RETURN_IF_ERROR(last_block->Delete());
1113  merged_run->var_len_blocks_.pop_back();
1114  }
1115  }
1116  merged_run->is_pinned_ = false;
1117  sorted_runs_.push_back(merged_run);
1118  }
1119 
1120  return Status::OK;
1121 }
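// Worked example of the merge schedule, with hypothetical capacity: given 16
// available buffers and var-len data, max_runs_per_final_merge = 8 and
// max_runs_per_intermediate_merge = 7. Starting from 20 sorted runs, the first
// intermediate merge combines min(7, 20 - 7) = 7 runs into one, leaving 14;
// the second combines another 7, leaving 8, which the final merge can handle.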
1122 
1123 Status Sorter::CreateMerger(int num_runs) {
1124  DCHECK_GT(num_runs, 1);
1125 
1126  // Clean up the runs from the previous merge.
1127  for (list<Run*>::iterator it = merging_runs_.begin(); it != merging_runs_.end(); ++it) {
1128  (*it)->DeleteAllBlocks();
1129  }
1130  merging_runs_.clear();
1131  merger_.reset(
1132  new SortedRunMerger(compare_less_than_, output_row_desc_, profile_, true));
1133 
1134  vector<function<Status (RowBatch**)> > merge_runs;
1135  merge_runs.reserve(num_runs);
1136  for (int i = 0; i < num_runs; ++i) {
1137  Run* run = sorted_runs_.front();
1138  run->PrepareRead();
1139  // Run::GetNextBatch() is used by the merger to retrieve a batch of rows to merge
1140  // from this run.
1141  merge_runs.push_back(bind<Status>(mem_fn(&Run::GetNextBatch), run, _1));
1142  sorted_runs_.pop_front();
1143  merging_runs_.push_back(run);
1144  }
1145  RETURN_IF_ERROR(merger_->Prepare(merge_runs));
1146 
1147  num_merges_counter_->Add(1);
1148  return Status::OK;
1149 }
1150 
1151 } // namespace impala