Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
select-node.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exec/select-node.h"
16 #include "exprs/expr.h"
17 #include "runtime/row-batch.h"
18 #include "runtime/runtime-state.h"
19 #include "runtime/raw-value.h"
20 #include "gen-cpp/PlanNodes_types.h"
21 
22 #include "common/names.h"
23 
24 namespace impala {
25 
27  ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
28  : ExecNode(pool, tnode, descs),
29  child_row_batch_(NULL),
30  child_row_idx_(0),
31  child_eos_(false) {
32 }
33 
35  SCOPED_TIMER(runtime_profile_->total_time_counter());
37  child_row_batch_.reset(
38  new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
39  return Status::OK;
40 }
41 
43  SCOPED_TIMER(runtime_profile_->total_time_counter());
45  RETURN_IF_ERROR(child(0)->Open(state));
46  return Status::OK;
47 }
48 
// Produces the next batch of output rows for this node.
//
// Rows are taken from child_row_batch_ and handed to CopyRows(), which appends the
// ones that pass this node's conjuncts to 'row_batch'. '*eos' is set to true once
// the limit has been reached, or once the current child batch is fully consumed and
// child_eos_ is set; otherwise it is set to false.
//
// Returns Status::OK on every path visible here; the RETURN_IF_ERROR /
// RETURN_IF_CANCELLED macros may return early with a different status.
//
// NOTE(review): this rendering of the listing skips its lines 64 and 71 (the embedded
// numbering jumps 63 -> 65 and 70 -> 72). The statement that actually refills
// child_row_batch_ from child(0) is therefore not visible below -- presumably it sits
// at listing line 71, right after the AtCapacity() check; confirm against the real file.
49 Status SelectNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
50  SCOPED_TIMER(runtime_profile_->total_time_counter());
51  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
52 
53  if (ReachedLimit() || (child_row_idx_ == child_row_batch_->num_rows() && child_eos_)) {
54  // we're already done or we exhausted the last child batch and there won't be any
55  // new ones
56  *eos = true;
57  return Status::OK;
58  }
59  *eos = false;
60 
61  // start (or continue) consuming row batches from child
62  while (true) {
63  RETURN_IF_CANCELLED(state);
// The current child batch has been fully consumed: rewind the read index and
// recycle the batch before it is filled again.
65  if (child_row_idx_ == child_row_batch_->num_rows()) {
66  child_row_idx_ = 0;
67  // fetch next batch
// Hand the exhausted child batch's resources to the output batch before resetting
// it -- presumably so that data referenced by rows already copied into 'row_batch'
// stays valid; verify against RowBatch::TransferResourceOwnership().
68  child_row_batch_->TransferResourceOwnership(row_batch);
69  child_row_batch_->Reset();
// Return what we have if the output batch is at capacity after the transfer;
// '*eos' stays false, so the caller is expected to call GetNext() again.
70  if (row_batch->AtCapacity()) return Status::OK;
72  }
73 
// CopyRows() returns true when the output batch is full or the limit was hit. In
// that case recompute '*eos' using the same condition as the early-out check above.
74  if (CopyRows(row_batch)) {
75  *eos = ReachedLimit()
76  || (child_row_idx_ == child_row_batch_->num_rows() && child_eos_);
77  return Status::OK;
78  }
// CopyRows() returned false, i.e. it ran through the whole child batch without
// filling the output batch or reaching the limit.
79  if (child_eos_) {
80  // finished w/ last child row batch, and child eos is true
81  *eos = true;
82  return Status::OK;
83  }
84  }
// Not reached: the loop above has no break and only exits via return.
85  return Status::OK;
86 }
87 
// Copies rows from child_row_batch_, starting at child_row_idx_, into 'output_batch',
// keeping only the rows for which EvalConjuncts() returns true. child_row_idx_ is
// advanced as rows are consumed, so a later call resumes where this one stopped.
//
// Returns true if no further row could be added to 'output_batch', if the limit was
// reached, or if 'output_batch' is at capacity once the child batch is exhausted;
// returns false otherwise.
//
// NOTE(review): this rendering of the listing skips its lines 89 and 102-103.
// 'conjunct_ctxs' (used below) is not declared in the visible text -- presumably it is
// declared at listing line 89 from conjunct_ctxs_. Lines 102-103 presumably update
// num_rows_returned_ / rows_returned_counter_, which ReachedLimit() would depend on;
// confirm against the real file.
88 bool SelectNode::CopyRows(RowBatch* output_batch) {
90  int num_conjunct_ctxs = conjunct_ctxs_.size();
91 
92  for (; child_row_idx_ < child_row_batch_->num_rows(); ++child_row_idx_) {
93  // Add a new row to output_batch
94  int dst_row_idx = output_batch->AddRow();
// AddRow() could not supply a slot: stop here without advancing child_row_idx_, so
// the current source row is looked at again on the next call.
95  if (dst_row_idx == RowBatch::INVALID_ROW_INDEX) return true;
96  TupleRow* dst_row = output_batch->GetRow(dst_row_idx);
97  TupleRow* src_row = child_row_batch_->GetRow(child_row_idx_);
98 
// The destination slot is only filled and committed when the source row passes the
// conjuncts; for a rejected row nothing is committed (presumably the same slot is
// handed out again by the next AddRow() -- verify against RowBatch).
99  if (EvalConjuncts(conjunct_ctxs, num_conjunct_ctxs, src_row)) {
100  output_batch->CopyRow(src_row, dst_row);
101  output_batch->CommitLastRow();
// Returning from inside the loop body skips the loop's ++child_row_idx_, so the
// index still points at the row that was just copied.
104  if (ReachedLimit()) return true;
105  }
106  }
107  return output_batch->AtCapacity();
108 }
109 
111  DCHECK(false) << "NYI";
112  return Status("NYI");
113 }
114 
116  if (is_closed()) return;
117  child_row_batch_.reset();
118  ExecNode::Close(state);
119 }
120 
121 }
bool CopyRows(RowBatch *output_batch)
Definition: select-node.cc:88
boost::scoped_ptr< RowBatch > child_row_batch_
current row batch of child
Definition: select-node.h:44
int64_t num_rows_returned_
Definition: exec-node.h:223
MemTracker * mem_tracker()
Definition: exec-node.h:162
boost::scoped_ptr< RuntimeProfile > runtime_profile_
Definition: exec-node.h:225
bool child_eos_
true if last GetNext() call on child signalled eos
Definition: select-node.h:50
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
int child_row_idx_
index of current row in child_row_batch_
Definition: select-node.h:47
bool AtCapacity()
Definition: row-batch.h:120
const RowDescriptor & row_desc() const
Definition: exec-node.h:156
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
Definition: exec-node.cc:378
bool ReachedLimit()
Definition: exec-node.h:159
SelectNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
Definition: select-node.cc:26
#define SCOPED_TIMER(c)
const std::vector< ExprContext * > & conjunct_ctxs() const
Definition: exec-node.h:152
virtual Status Reset(RuntimeState *state)
Definition: select-node.cc:110
#define RETURN_IF_CANCELLED(state)
ObjectPool pool
virtual Status Prepare(RuntimeState *state)
Definition: exec-node.cc:130
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
Definition: select-node.cc:49
virtual Status QueryMaintenance(RuntimeState *state)
Definition: exec-node.cc:401
bool is_closed()
Definition: exec-node.h:242
void CommitLastRow()
Definition: row-batch.h:109
virtual Status Prepare(RuntimeState *state)
Definition: select-node.cc:34
#define COUNTER_SET(c, v)
int batch_size() const
Definition: runtime-state.h:98
virtual void Close(RuntimeState *state)
Definition: select-node.cc:115
RuntimeProfile::Counter * rows_returned_counter_
Definition: exec-node.h:226
ExecNode * child(int i)
Definition: exec-node.h:241
void CopyRow(TupleRow *src, TupleRow *dest)
Definition: row-batch.h:173
static const Status OK
Definition: status.h:87
virtual Status Open(RuntimeState *state)
Definition: select-node.cc:42
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
Definition: exec-node.cc:393
virtual Status Open(RuntimeState *state)
Definition: exec-node.cc:154
std::vector< ExprContext * > conjunct_ctxs_
Definition: exec-node.h:212
virtual void Close(RuntimeState *state)
Definition: exec-node.cc:166
static const int INVALID_ROW_INDEX
Definition: row-batch.h:87