Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
sort-node.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exec/sort-node.h"
16 #include "exec/sort-exec-exprs.h"
17 #include "runtime/row-batch.h"
18 #include "runtime/runtime-state.h"
20 
21 #include "common/names.h"
22 
23 namespace impala {
24 
25 SortNode::SortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
26  : ExecNode(pool, tnode, descs),
27  offset_(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
28  num_rows_skipped_(0) {
29 }
30 
32 }
33 
34 Status SortNode::Init(const TPlanNode& tnode) {
36  RETURN_IF_ERROR(sort_exec_exprs_.Init(tnode.sort_node.sort_info, pool_));
37  is_asc_order_ = tnode.sort_node.sort_info.is_asc_order;
38  nulls_first_ = tnode.sort_node.sort_info.nulls_first;
39  return Status::OK;
40 }
41 
43  SCOPED_TIMER(runtime_profile_->total_time_counter());
48  return Status::OK;
49 }
50 
52  SCOPED_TIMER(runtime_profile_->total_time_counter());
55  RETURN_IF_CANCELLED(state);
57  RETURN_IF_ERROR(child(0)->Open(state));
58 
59  TupleRowComparator less_than(
62 
63  // Create and initialize the external sort impl object
64  sorter_.reset(new Sorter(
67  RETURN_IF_ERROR(sorter_->Init());
68 
69  // The child has been opened and the sorter created. Sort the input.
70  // The final merge is done on-demand as rows are requested in GetNext().
71  RETURN_IF_ERROR(SortInput(state));
72 
73  // The child can be closed at this point.
74  child(0)->Close(state);
75  return Status::OK;
76 }
77 
78 Status SortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
79  SCOPED_TIMER(runtime_profile_->total_time_counter());
80  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
81  RETURN_IF_CANCELLED(state);
83 
84  if (ReachedLimit()) {
85  *eos = true;
86  return Status::OK;
87  } else {
88  *eos = false;
89  }
90 
91  DCHECK_EQ(row_batch->num_rows(), 0);
92  RETURN_IF_ERROR(sorter_->GetNext(row_batch, eos));
93  while ((num_rows_skipped_ < offset_)) {
94  num_rows_skipped_ += row_batch->num_rows();
95  // Throw away rows in the output batch until the offset is skipped.
96  int rows_to_keep = num_rows_skipped_ - offset_;
97  if (rows_to_keep > 0) {
98  row_batch->CopyRows(0, row_batch->num_rows() - rows_to_keep, rows_to_keep);
99  row_batch->set_num_rows(rows_to_keep);
100  } else {
101  row_batch->set_num_rows(0);
102  }
103  if (rows_to_keep > 0 || *eos) break;
104  RETURN_IF_ERROR(sorter_->GetNext(row_batch, eos));
105  }
106 
107  num_rows_returned_ += row_batch->num_rows();
108  if (ReachedLimit()) {
109  row_batch->set_num_rows(row_batch->num_rows() - (num_rows_returned_ - limit_));
110  *eos = true;
111  }
112 
114 
115  return Status::OK;
116 }
117 
119  DCHECK(false) << "NYI";
120  return Status("NYI");
121 }
122 
124  if (is_closed()) return;
125  sort_exec_exprs_.Close(state);
126  sorter_.reset();
127  ExecNode::Close(state);
128 }
129 
130 void SortNode::DebugString(int indentation_level, stringstream* out) const {
131  *out << string(indentation_level * 2, ' ');
132  *out << "SortNode("
134  for (int i = 0; i < is_asc_order_.size(); ++i) {
135  *out << (i > 0 ? " " : "")
136  << (is_asc_order_[i] ? "asc" : "desc")
137  << " nulls " << (nulls_first_[i] ? "first" : "last");
138  }
139  ExecNode::DebugString(indentation_level, out);
140  *out << ")";
141 }
142 
144  RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
145  bool eos;
146  do {
147  batch.Reset();
148  RETURN_IF_ERROR(child(0)->GetNext(state, &batch, &eos));
149  RETURN_IF_ERROR(sorter_->AddBatch(&batch));
150  RETURN_IF_CANCELLED(state);
152  } while(!eos);
153 
154  RETURN_IF_ERROR(sorter_->InputDone());
155  return Status::OK;
156 }
157 
158 }
boost::scoped_ptr< Sorter > sorter_
Object used for external sorting.
Definition: sort-node.h:63
int num_rows() const
Definition: row-batch.h:215
int64_t num_rows_skipped_
Definition: sort-node.h:60
int64_t num_rows_returned_
Definition: exec-node.h:223
virtual Status Prepare(RuntimeState *state)
Definition: sort-node.cc:42
MemTracker * mem_tracker()
Definition: exec-node.h:162
boost::scoped_ptr< RuntimeProfile > runtime_profile_
Definition: exec-node.h:225
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
virtual Status Init(const TPlanNode &tnode)
Definition: exec-node.cc:124
RowDescriptor row_descriptor_
Definition: exec-node.h:215
virtual Status Open(RuntimeState *state)
Definition: sort-node.cc:51
void CopyRows(int dest, int src, int num_rows)
Definition: row-batch.h:179
const RowDescriptor & row_desc() const
Definition: exec-node.h:156
virtual Status Reset(RuntimeState *state)
Definition: sort-node.cc:118
std::vector< bool > is_asc_order_
Definition: sort-node.h:67
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
Definition: exec-node.cc:378
bool ReachedLimit()
Definition: exec-node.h:159
Status Open(RuntimeState *state)
Open all expressions used for sorting and tuple materialization.
#define SCOPED_TIMER(c)
virtual Status Init(const TPlanNode &tnode)
Definition: sort-node.cc:34
int64_t limit_
Definition: exec-node.h:222
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
Definition: exec-node.cc:345
MemTracker * expr_mem_tracker()
Definition: exec-node.h:163
Status SortInput(RuntimeState *state)
Fetch input rows and feed them to the sorter until the input is exhausted.
Definition: sort-node.cc:143
void Reset()
Resets the row batch, returning all resources it has accumulated.
Definition: row-batch.cc:224
void set_num_rows(int num_rows)
Definition: row-batch.h:113
virtual void Close(RuntimeState *state)
Definition: sort-node.cc:123
#define RETURN_IF_CANCELLED(state)
ObjectPool pool
Status Init(const TSortInfo &sort_info, ObjectPool *pool)
Initialize the expressions from a TSortInfo using the specified pool.
virtual Status Prepare(RuntimeState *state)
Definition: exec-node.cc:130
const std::vector< ExprContext * > & lhs_ordering_expr_ctxs() const
Can only be used after calling Prepare()
void AddExprCtxsToFree(const std::vector< ExprContext * > &ctxs)
virtual Status QueryMaintenance(RuntimeState *state)
Definition: exec-node.cc:401
bool is_closed()
Definition: exec-node.h:242
#define COUNTER_SET(c, v)
int batch_size() const
Definition: runtime-state.h:98
RuntimeProfile::Counter * rows_returned_counter_
Definition: exec-node.h:226
ExecNode * child(int i)
Definition: exec-node.h:241
int64_t offset_
Number of rows to skip.
Definition: sort-node.h:59
SortExecExprs sort_exec_exprs_
Expressions and parameters used for tuple materialization and tuple comparison.
Definition: sort-node.h:66
SortNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
Definition: sort-node.cc:25
static const Status OK
Definition: status.h:87
ObjectPool * pool_
Definition: exec-node.h:211
Note that Init() must be called right after the constructor.
Definition: sorter.h:84
uint8_t offset[7 *64-sizeof(uint64_t)]
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
Definition: sort-node.cc:78
Status Prepare(RuntimeState *state, const RowDescriptor &child_row_desc, const RowDescriptor &output_row_desc, MemTracker *expr_mem_tracker)
Prepare all expressions used for sorting and tuple materialization.
virtual Status Open(RuntimeState *state)
Definition: exec-node.cc:154
std::vector< bool > nulls_first_
Definition: sort-node.h:68
virtual void Close(RuntimeState *state)
Definition: exec-node.cc:166
virtual std::string DebugString() const
Definition: expr.cc:385
void Close(RuntimeState *state)
Close all expressions used for sorting and tuple materialization.
const std::vector< ExprContext * > & sort_tuple_slot_expr_ctxs() const
const std::vector< ExprContext * > & rhs_ordering_expr_ctxs() const
Can only be used after calling Open()
RuntimeProfile * runtime_profile()
Definition: exec-node.h:161