Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
topn-node.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exec/topn-node.h"
16 
17 #include <sstream>
18 
19 #include "exprs/expr.h"
20 #include "runtime/descriptors.h"
21 #include "runtime/mem-pool.h"
22 #include "runtime/raw-value.h"
23 #include "runtime/row-batch.h"
24 #include "runtime/runtime-state.h"
25 #include "runtime/tuple.h"
26 #include "runtime/tuple-row.h"
27 #include "util/debug-util.h"
28 #include "util/runtime-profile.h"
29 
30 #include "gen-cpp/Exprs_types.h"
31 #include "gen-cpp/PlanNodes_types.h"
32 
33 #include "common/names.h"
34 
35 using std::priority_queue;
36 using namespace impala;
37 
38 TopNNode::TopNNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
39  : ExecNode(pool, tnode, descs),
40  offset_(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
41  num_rows_skipped_(0) {
42 }
43 
// Initializes the node from the Thrift plan: sets up the sort/materialization
// expressions and copies the per-key ordering flags (ascending / nulls-first).
// TopN never evaluates conjuncts itself, hence the DCHECK below.
// NOTE(review): original source line 45 is missing from this extraction (the
// embedded numbering jumps 44 -> 46); presumably it delegated to the base
// class Init — confirm against the upstream file.
44 Status TopNNode::Init(const TPlanNode& tnode) {
46  RETURN_IF_ERROR(sort_exec_exprs_.Init(tnode.sort_node.sort_info, pool_));
47  is_asc_order_ = tnode.sort_node.sort_info.is_asc_order;
48  nulls_first_ = tnode.sort_node.sort_info.nulls_first;
49 
50  DCHECK_EQ(conjunct_ctxs_.size(), 0)
51  << "TopNNode should never have predicates to evaluate.";
52 
53  return Status::OK;
54 }
55 
// Body of TopNNode::Prepare() (per the cross-reference listing it is defined
// at original line 56, which is missing here along with lines 58 and 60-63).
// Creates the MemPool that backs all tuples referenced by priority_queue_ and
// allocates tmp_tuple_, a scratch tuple used by InsertTupleRow() to compare a
// candidate row against the current queue head without allocating per row.
// NOTE(review): the allocation call on line 65 is truncated by the extraction
// (its argument list, original line 66, was dropped) — confirm upstream.
57  SCOPED_TIMER(runtime_profile_->total_time_counter());
59  tuple_pool_.reset(new MemPool(mem_tracker()));
64  // Allocate memory for a temporary tuple.
65  tmp_tuple_ = reinterpret_cast<Tuple*>(
67  return Status::OK;
68 }
69 
// Body of TopNNode::Open() (the signature at original line 70 was dropped by
// the extraction, as were lines 72, 74-79, 82, 97 and 101). Builds the
// priority queue ordered by TupleRowComparator, opens the child, then drains
// it batch by batch, offering every row to InsertTupleRow(). The queue is
// capped at limit_ + offset_ rows (DCHECK below), so memory stays bounded
// regardless of child cardinality. The child is closed eagerly once all its
// rows have been consumed.
// NOTE(review): the priority_queue constructor argument (original line 82)
// is missing here — presumably the comparator instance; confirm upstream.
71  SCOPED_TIMER(runtime_profile_->total_time_counter());
73  RETURN_IF_CANCELLED(state);
76 
80  priority_queue_.reset(
81  new priority_queue<Tuple*, vector<Tuple*>, TupleRowComparator>(
83 
84  RETURN_IF_ERROR(child(0)->Open(state));
85 
86  // Limit of 0, no need to fetch anything from children.
87  if (limit_ != 0) {
88  RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
89  bool eos;
90  do {
91  batch.Reset();
92  RETURN_IF_ERROR(child(0)->GetNext(state, &batch, &eos));
93  for (int i = 0; i < batch.num_rows(); ++i) {
94  InsertTupleRow(batch.GetRow(i));
95  }
96  RETURN_IF_CANCELLED(state);
98  } while (!eos);
99  }
100  DCHECK_LE(priority_queue_->size(), limit_ + offset_);
102  child(0)->Close(state);
103  return Status::OK;
104 }
105 
// Streams the already-sorted results (sorted_top_n_, filled by
// PrepareForOutput()) into the output batch via get_next_iter_. Rows before
// offset_ are skipped; each emitted tuple is wrapped as a single-tuple
// TupleRow (the reinterpret_cast of &src_tuple relies on a TupleRow being
// laid out as an array of Tuple pointers). eos is set once the iterator
// reaches the end of sorted_top_n_.
// NOTE(review): original lines 110, 113-114 and 124-125 are missing from
// this extraction. Line 114 presumably incremented num_rows_skipped_ inside
// the skip branch below — without it the offset test would never advance, so
// the gap is almost certainly an extraction artifact; confirm upstream.
106 Status TopNNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
107  SCOPED_TIMER(runtime_profile_->total_time_counter());
108  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
109  RETURN_IF_CANCELLED(state);
111  while (!row_batch->AtCapacity() && (get_next_iter_ != sorted_top_n_.end())) {
112  if (num_rows_skipped_ < offset_) {
113  ++get_next_iter_;
115  continue;
116  }
117  int row_idx = row_batch->AddRow();
118  TupleRow* dst_row = row_batch->GetRow(row_idx);
119  Tuple* src_tuple = *get_next_iter_;
120  TupleRow* src_row = reinterpret_cast<TupleRow*>(&src_tuple);
121  row_batch->CopyRow(src_row, dst_row);
122  ++get_next_iter_;
123  row_batch->CommitLastRow();
126  }
127  *eos = get_next_iter_ == sorted_top_n_.end();
128  return Status::OK;
129 }
130 
// Body of TopNNode::Reset() (signature at original line 131 dropped by the
// extraction). Deliberately unimplemented: fires a DCHECK in debug builds and
// returns a non-OK "NYI" status in release builds.
132  DCHECK(false) << "NYI";
133  return Status("NYI");
134 }
135 
137  if (is_closed()) return;
138  if (tuple_pool_.get() != NULL) tuple_pool_->FreeAll();
139  sort_exec_exprs_.Close(state);
140  ExecNode::Close(state);
141 }
142 
// Insert if either not at the limit or it's a new TopN tuple_row
// Body of TopNNode::InsertTupleRow() (signature at original line 144 dropped
// by the extraction, along with argument-list lines 149, 151 and 155-156).
// Two cases:
//  - Queue not yet full (size < limit_ + offset_): materialize the input row
//    into a freshly allocated tuple and push it.
//  - Queue full: the candidate (materialized into tmp_tuple_ — presumably on
//    the missing lines 155-156; confirm upstream) is compared against the
//    current worst element (top()). If the candidate orders before it, the
//    candidate's contents are deep-copied over the evicted top tuple so its
//    storage is reused, and that tuple is re-pushed.
145  Tuple* insert_tuple = NULL;
146 
147  if (priority_queue_->size() < limit_ + offset_) {
148  insert_tuple = reinterpret_cast<Tuple*>(
150  insert_tuple->MaterializeExprs<false>(input_row, *materialized_tuple_desc_,
152  } else {
153  DCHECK(!priority_queue_->empty());
154  Tuple* top_tuple = priority_queue_->top();
157  if ((*tuple_row_less_than_)(tmp_tuple_, top_tuple)) {
158  // TODO: DeepCopy() will allocate new buffers for the string data. This needs
159  // to be fixed to use a freelist
160  tmp_tuple_->DeepCopy(top_tuple, *materialized_tuple_desc_, tuple_pool_.get());
161  insert_tuple = top_tuple;
162  priority_queue_->pop();
163  }
164  }
165 
166  if (insert_tuple != NULL) priority_queue_->push(insert_tuple);
167 }
168 
// Reverse the order of the tuples in the priority queue
// Body of TopNNode::PrepareForOutput() (signature at original line 170
// dropped by the extraction). Pops the heap — which yields elements
// worst-first — and writes them into sorted_top_n_ back to front, producing
// the final sorted order, then positions get_next_iter_ at the start for
// GetNext() to stream from.
171  sorted_top_n_.resize(priority_queue_->size());
172  int index = sorted_top_n_.size() - 1;
173 
174  while (priority_queue_->size() > 0) {
175  Tuple* tuple = priority_queue_->top();
176  priority_queue_->pop();
177  sorted_top_n_[index] = tuple;
178  --index;
179  }
180 
181  get_next_iter_ = sorted_top_n_.begin();
182 }
183 
// Writes a human-readable description of this node into 'out', indented two
// spaces per indentation level: "TopNNode(" followed by each sort key's
// asc/desc and nulls first/last flags, the base-class description, and a
// closing ")".
// NOTE(review): original line 187 is missing from this extraction (numbering
// jumps 186 -> 188); presumably it streamed the ordering expressions —
// confirm upstream.
184 void TopNNode::DebugString(int indentation_level, stringstream* out) const {
185  *out << string(indentation_level * 2, ' ');
186  *out << "TopNNode("
188  for (int i = 0; i < is_asc_order_.size(); ++i) {
189  *out << (i > 0 ? " " : "")
190  << (is_asc_order_[i] ? "asc" : "desc")
191  << " nulls " << (nulls_first_[i] ? "first" : "last");
192  }
193 
194  ExecNode::DebugString(indentation_level, out);
195  *out << ")";
196 }
std::vector< bool > is_asc_order_
Definition: topn-node.h:70
int64_t num_rows_returned_
Definition: exec-node.h:223
MemTracker * mem_tracker()
Definition: exec-node.h:162
void InsertTupleRow(TupleRow *tuple_row)
Definition: topn-node.cc:144
boost::scoped_ptr< RuntimeProfile > runtime_profile_
Definition: exec-node.h:225
virtual Status Reset(RuntimeState *state)
Definition: topn-node.cc:131
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
virtual Status Init(const TPlanNode &tnode)
Definition: exec-node.cc:124
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
RowDescriptor row_descriptor_
Definition: exec-node.h:215
bool AtCapacity()
Definition: row-batch.h:120
int byte_size() const
Definition: descriptors.h:300
const RowDescriptor & row_desc() const
Definition: exec-node.h:156
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
Definition: exec-node.cc:378
Status Open(RuntimeState *state)
Open all expressions used for sorting and tuple materialization.
#define SCOPED_TIMER(c)
int64_t limit_
Definition: exec-node.h:222
Tuple * DeepCopy(const TupleDescriptor &desc, MemPool *pool, bool convert_ptrs=false)
Definition: tuple.cc:34
boost::scoped_ptr< TupleRowComparator > tuple_row_less_than_
Definition: topn-node.h:75
std::vector< bool > nulls_first_
Definition: topn-node.h:71
boost::scoped_ptr< MemPool > tuple_pool_
Stores everything referenced in priority_queue_.
Definition: topn-node.h:90
virtual Status Prepare(RuntimeState *state)
Definition: topn-node.cc:56
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
Definition: exec-node.cc:345
MemTracker * expr_mem_tracker()
Definition: exec-node.h:163
std::vector< Tuple * >::iterator get_next_iter_
Definition: topn-node.h:87
virtual Status Init(const TPlanNode &tnode)
Definition: topn-node.cc:44
void Reset()
Resets the row batch, returning all resources it has accumulated.
Definition: row-batch.cc:224
virtual void Close(RuntimeState *state)
Definition: topn-node.cc:136
#define RETURN_IF_CANCELLED(state)
ObjectPool pool
Status Init(const TSortInfo &sort_info, ObjectPool *pool)
Initialize the expressions from a TSortInfo using the specified pool.
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
Definition: topn-node.cc:106
TopNNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
Definition: topn-node.cc:38
int64_t offset_
Number of rows to skip.
Definition: topn-node.h:64
virtual Status Prepare(RuntimeState *state)
Definition: exec-node.cc:130
std::vector< Tuple * > sorted_top_n_
After computing the TopN in the priority_queue, pop them and put them in this vector.
Definition: topn-node.h:86
const std::vector< ExprContext * > & lhs_ordering_expr_ctxs() const
Can only be used after calling Prepare()
void AddExprCtxsToFree(const std::vector< ExprContext * > &ctxs)
Tuple * tmp_tuple_
Definition: topn-node.h:94
virtual Status QueryMaintenance(RuntimeState *state)
Definition: exec-node.cc:401
bool is_closed()
Definition: exec-node.h:242
void CommitLastRow()
Definition: row-batch.h:109
#define COUNTER_SET(c, v)
int batch_size() const
Definition: runtime-state.h:98
RuntimeProfile::Counter * rows_returned_counter_
Definition: exec-node.h:226
ExecNode * child(int i)
Definition: exec-node.h:241
const std::vector< TupleDescriptor * > & tuple_descriptors() const
Return descriptors for all tuples in this row, in order of appearance.
Definition: descriptors.h:412
TupleDescriptor * materialized_tuple_desc_
Cached descriptor for the materialized tuple. Assigned in Prepare().
Definition: topn-node.h:73
void CopyRow(TupleRow *src, TupleRow *dest)
Definition: row-batch.h:173
static const Status OK
Definition: status.h:87
ObjectPool * pool_
Definition: exec-node.h:211
uint8_t offset[7 *64-sizeof(uint64_t)]
Status Prepare(RuntimeState *state, const RowDescriptor &child_row_desc, const RowDescriptor &output_row_desc, MemTracker *expr_mem_tracker)
Prepare all expressions used for sorting and tuple materialization.
void MaterializeExprs(TupleRow *row, const TupleDescriptor &desc, const std::vector< ExprContext * > &materialize_expr_ctxs, MemPool *pool, std::vector< StringValue * > *non_null_var_len_values=NULL, int *total_var_len=NULL)
SortExecExprs sort_exec_exprs_
Definition: topn-node.h:69
virtual Status Open(RuntimeState *state)
Definition: topn-node.cc:70
virtual Status Open(RuntimeState *state)
Definition: exec-node.cc:154
void PrepareForOutput()
Flatten and reverse the priority queue.
Definition: topn-node.cc:170
boost::scoped_ptr< std::priority_queue< Tuple *, std::vector< Tuple * >, TupleRowComparator > > priority_queue_
Definition: topn-node.h:83
std::vector< ExprContext * > conjunct_ctxs_
Definition: exec-node.h:212
int64_t num_rows_skipped_
Definition: topn-node.h:65
virtual void Close(RuntimeState *state)
Definition: exec-node.cc:166
virtual std::string DebugString() const
Definition: expr.cc:385
void Close(RuntimeState *state)
Close all expressions used for sorting and tuple materialization.
const std::vector< ExprContext * > & sort_tuple_slot_expr_ctxs() const
const std::vector< ExprContext * > & rhs_ordering_expr_ctxs() const
Can only be used after calling Open()