Impala
Impala is the open source, native analytic database for Apache Hadoop.
sorted-run-merger.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

16 #include "exprs/expr.h"
17 #include "runtime/descriptors.h"
18 #include "runtime/row-batch.h"
19 #include "runtime/sorter.h"
20 #include "runtime/tuple-row.h"
21 #include "util/runtime-profile.h"
22 
23 #include "common/names.h"
24 
25 namespace impala {
26 
// BatchedRowSupplier returns individual rows in a batch obtained from a sorted input
// run (a RunBatchSupplier). Used as the heap element in the min heap maintained by the
// merger.
// Next() advances the row supplier to the next row in the input batch and retrieves
// the next batch from the input if the current input batch is exhausted. Transfers
// ownership from the current input batch to an output batch if requested.
class SortedRunMerger::BatchedRowSupplier {
 public:
  // Construct an instance from a sorted input run.
  BatchedRowSupplier(SortedRunMerger* parent, const RunBatchSupplier& sorted_run)
    : sorted_run_(sorted_run),
      input_row_batch_(NULL),
      input_row_batch_index_(-1),
      parent_(parent) {
  }

  // Retrieves the first batch of sorted rows from the run.
  Status Init(bool* done) {
    RETURN_IF_ERROR(sorted_run_(&input_row_batch_));
    if (input_row_batch_ == NULL) {
      *done = true;
      return Status::OK;
    }
    RETURN_IF_ERROR(Next(NULL, done));
    return Status::OK;
  }

  // Increment the current row index. If the current input batch is exhausted fetch the
  // next one from the sorted run. Transfer ownership to transfer_batch if not NULL.
  Status Next(RowBatch* transfer_batch, bool* done) {
    DCHECK_NOTNULL(input_row_batch_);
    ++input_row_batch_index_;
    if (input_row_batch_index_ < input_row_batch_->num_rows()) {
      *done = false;
    } else {
      ScopedTimer<MonotonicStopWatch> timer(parent_->get_next_batch_timer_);
      if (transfer_batch != NULL) {
        input_row_batch_->TransferResourceOwnership(transfer_batch);
      }

      RETURN_IF_ERROR(sorted_run_(&input_row_batch_));
      DCHECK(input_row_batch_ == NULL || input_row_batch_->num_rows() > 0);
      *done = input_row_batch_ == NULL;
      input_row_batch_index_ = 0;
    }
    return Status::OK;
  }

  TupleRow* current_row() const {
    return input_row_batch_->GetRow(input_row_batch_index_);
  }

 private:
  friend class SortedRunMerger;

  // The run from which this object supplies rows.
  RunBatchSupplier sorted_run_;

  // The current input batch being processed.
  RowBatch* input_row_batch_;

  // Index into input_row_batch_ of the current row being processed.
  int input_row_batch_index_;

  // The parent merger instance.
  SortedRunMerger* parent_;
};

void SortedRunMerger::Heapify(int parent_index) {
  int left_index = 2 * parent_index + 1;
  int right_index = left_index + 1;
  if (left_index >= min_heap_.size()) return;
  int least_child;
  // Find the least child of parent.
  if (right_index >= min_heap_.size() ||
      compare_less_than_(min_heap_[left_index]->current_row(),
          min_heap_[right_index]->current_row())) {
    least_child = left_index;
  } else {
    least_child = right_index;
  }

  // If the parent is out of place, swap it with the least child and invoke
  // Heapify recursively.
  if (compare_less_than_(min_heap_[least_child]->current_row(),
      min_heap_[parent_index]->current_row())) {
    iter_swap(min_heap_.begin() + least_child, min_heap_.begin() + parent_index);
    Heapify(least_child);
  }
}

SortedRunMerger::SortedRunMerger(const TupleRowComparator& compare_less_than,
    RowDescriptor* row_desc, RuntimeProfile* profile, bool deep_copy_input)
  : compare_less_than_(compare_less_than),
    input_row_desc_(row_desc),
    deep_copy_input_(deep_copy_input) {
  get_next_timer_ = ADD_TIMER(profile, "MergeGetNext");
  get_next_batch_timer_ = ADD_TIMER(profile, "MergeGetNextBatch");
}

Status SortedRunMerger::Prepare(const vector<RunBatchSupplier>& input_runs) {
  DCHECK_EQ(min_heap_.size(), 0);
  min_heap_.reserve(input_runs.size());
  BOOST_FOREACH(const RunBatchSupplier& input_run, input_runs) {
    BatchedRowSupplier* new_elem = pool_.Add(new BatchedRowSupplier(this, input_run));
    bool empty = false;
    RETURN_IF_ERROR(new_elem->Init(&empty));
    if (!empty) min_heap_.push_back(new_elem);
  }

  // Construct the min heap from the sorted runs.
  int last_parent = (min_heap_.size() / 2) - 1;
  for (int i = last_parent; i >= 0; --i) {
    Heapify(i);
  }

  return Status::OK;
}

Status SortedRunMerger::GetNext(RowBatch* output_batch, bool* eos) {
  ScopedTimer<MonotonicStopWatch> timer(get_next_timer_);
  if (min_heap_.empty()) {
    *eos = true;
    return Status::OK;
  }

  while (!output_batch->AtCapacity()) {
    BatchedRowSupplier* min = min_heap_[0];
    int output_row_index = output_batch->AddRow();
    TupleRow* output_row = output_batch->GetRow(output_row_index);
    if (deep_copy_input_) {
      min->current_row()->DeepCopy(output_row, input_row_desc_->tuple_descriptors(),
          output_batch->tuple_data_pool(), false);
    } else {
      // Simply copy tuple pointers if deep_copy is false.
      memcpy(output_row, min->current_row(),
          input_row_desc_->tuple_descriptors().size() * sizeof(Tuple*));
    }

    output_batch->CommitLastRow();

    bool min_run_complete = false;
    // Advance to the next element in min. output_batch is supplied to transfer
    // resource ownership if the input batch in min is exhausted.
    RETURN_IF_ERROR(min->Next(deep_copy_input_ ? NULL : output_batch,
        &min_run_complete));
    if (min_run_complete) {
      // Remove the element from the heap.
      iter_swap(min_heap_.begin(), min_heap_.end() - 1);
      min_heap_.pop_back();
      if (min_heap_.empty()) break;
    }

    Heapify(0);
  }

  *eos = min_heap_.empty();
  return Status::OK;
}

}
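
The file above implements a k-way merge: each sorted run contributes its current row to a binary min heap ordered by compare_less_than_, and GetNext() repeatedly emits the smallest row, advances the winning run, and restores the heap with Heapify(). Below is a minimal, standalone sketch of the same pattern for illustration only; it is not Impala code. SketchMerger, RunSupplier, and every other name are invented for this example, and plain ints stand in for TupleRows and RowBatches.

// Standalone sketch (not Impala code): k-way merge of sorted runs using the same
// bottom-up heap build and sift-down ("Heapify") scheme as SortedRunMerger.
#include <cstddef>
#include <functional>
#include <iostream>
#include <utility>
#include <vector>

// A "run" is modelled as a callable that yields the next value and returns false
// once the run is exhausted, loosely analogous to RunBatchSupplier.
using RunSupplier = std::function<bool(int* next_value)>;

class SketchMerger {
 public:
  explicit SketchMerger(std::vector<RunSupplier> runs) {
    // Prime the heap with the head of every non-empty run.
    for (auto& run : runs) {
      int value;
      if (run(&value)) heap_.push_back({value, std::move(run)});
    }
    // Build the min heap bottom-up, as in SortedRunMerger::Prepare().
    for (int i = static_cast<int>(heap_.size()) / 2 - 1; i >= 0; --i) Heapify(i);
  }

  // Outputs the next smallest value across all runs; returns false when done.
  bool GetNext(int* out) {
    if (heap_.empty()) return false;
    *out = heap_[0].current;
    int next;
    if (heap_[0].run(&next)) {
      heap_[0].current = next;            // Advance the winning run in place.
    } else {
      std::swap(heap_[0], heap_.back());  // Run exhausted: drop it from the heap.
      heap_.pop_back();
    }
    if (!heap_.empty()) Heapify(0);
    return true;
  }

 private:
  struct Entry {
    int current;      // Current head-of-run value (plays the role of current_row()).
    RunSupplier run;  // Source of the remaining values in this run.
  };

  // Sift the element at parent_index down until the min-heap property holds.
  void Heapify(int parent_index) {
    int size = static_cast<int>(heap_.size());
    int left = 2 * parent_index + 1;
    int right = left + 1;
    if (left >= size) return;
    int least_child =
        (right >= size || heap_[left].current < heap_[right].current) ? left : right;
    if (heap_[least_child].current < heap_[parent_index].current) {
      std::swap(heap_[least_child], heap_[parent_index]);
      Heapify(least_child);
    }
  }

  std::vector<Entry> heap_;
};

int main() {
  // Three pre-sorted runs; the merger interleaves them into one sorted stream.
  std::vector<std::vector<int>> runs = {{1, 4, 7}, {2, 5, 8}, {3, 6, 9}};
  std::vector<RunSupplier> suppliers;
  for (auto& r : runs) {
    std::size_t pos = 0;
    suppliers.push_back([&r, pos](int* v) mutable {
      if (pos >= r.size()) return false;
      *v = r[pos++];
      return true;
    });
  }
  SketchMerger merger(std::move(suppliers));
  int v;
  while (merger.GetNext(&v)) std::cout << v << ' ';  // Prints 1 2 3 ... 9
  std::cout << '\n';
  return 0;
}

The sketch keeps the design choice that makes this merge cheap: only the head element of each run lives in the heap, so advancing a run is an O(log k) sift-down rather than a re-sort, where k is the number of runs.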