Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
union-node.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exec/union-node.h"
16 #include "exprs/expr.h"
17 #include "exprs/expr-context.h"
18 #include "runtime/row-batch.h"
19 #include "runtime/runtime-state.h"
20 #include "runtime/raw-value.h"
21 #include "gen-cpp/PlanNodes_types.h"
22 
23 #include "common/names.h"
24 
25 namespace impala {
26 
27 UnionNode::UnionNode(ObjectPool* pool, const TPlanNode& tnode,  // ctor: args go to ExecNode
28  const DescriptorTbl& descs)
29  : ExecNode(pool, tnode, descs),
30  tuple_id_(tnode.union_node.tuple_id),  // id of the tuple this node constructs (from thrift)
31  const_result_expr_idx_(0),  // start at the first const expr list
32  child_idx_(0),  // start at the first child
33  child_row_batch_(NULL),  // no child batch yet; OpenCurrentChild() allocates one
34  child_eos_(false),  // no child has reported end-of-stream yet
35  child_row_idx_(0) {  // row cursor into child_row_batch_
36 }
37 
38 Status UnionNode::Init(const TPlanNode& tnode) {
40  DCHECK(tnode.__isset.union_node);
41  // Create const_expr_ctx_lists_ from thrift exprs.
42  const vector<vector<TExpr> >& const_texpr_lists = tnode.union_node.const_expr_lists;
43  for (int i = 0; i < const_texpr_lists.size(); ++i) {
44  vector<ExprContext*> ctxs;
45  RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, const_texpr_lists[i], &ctxs));
46  const_result_expr_ctx_lists_.push_back(ctxs);
47  }
48  // Create result_expr_ctx_lists_ from thrift exprs.
49  const vector<vector<TExpr> >& result_texpr_lists = tnode.union_node.result_expr_lists;
50  for (int i = 0; i < result_texpr_lists.size(); ++i) {
51  vector<ExprContext*> ctxs;
52  RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, result_texpr_lists[i], &ctxs));
53  result_expr_ctx_lists_.push_back(ctxs);
54  }
55  return Status::OK;
56 }
57 
59  SCOPED_TIMER(runtime_profile_->total_time_counter());
62  DCHECK(tuple_desc_ != NULL);
63 
64  // prepare materialized_slots_
65  for (int i = 0; i < tuple_desc_->slots().size(); ++i) {
66  SlotDescriptor* desc = tuple_desc_->slots()[i];
67  if (desc->is_materialized()) materialized_slots_.push_back(desc);
68  }
69 
70  // Prepare const expr lists.
71  for (int i = 0; i < const_result_expr_ctx_lists_.size(); ++i) {
75  DCHECK_EQ(const_result_expr_ctx_lists_[i].size(), materialized_slots_.size());
76  }
77 
78  // Prepare result expr lists.
79  for (int i = 0; i < result_expr_ctx_lists_.size(); ++i) {
83  DCHECK_EQ(result_expr_ctx_lists_[i].size(), materialized_slots_.size());
84  }
85  return Status::OK;
86 }
87 
89  SCOPED_TIMER(runtime_profile_->total_time_counter());
91  // Open const expr lists.
92  for (int i = 0; i < const_result_expr_ctx_lists_.size(); ++i) {
94  }
95  // Open result expr lists.
96  for (int i = 0; i < result_expr_ctx_lists_.size(); ++i) {
98  }
99 
100  // Open and fetch from the first child if there is one. Ensures that rows are
101  // available for clients to fetch after this Open() has succeeded.
102  if (!children_.empty()) RETURN_IF_ERROR(OpenCurrentChild(state));
103 
104  return Status::OK;
105 }
106 
108  DCHECK_LT(child_idx_, children_.size());
109  child_row_batch_.reset(new RowBatch(
110  child(child_idx_)->row_desc(), state->batch_size(), mem_tracker()));
111  // Open child and fetch the first row batch.
114  &child_eos_));
115  child_row_idx_ = 0;
116  return Status::OK;
117 }
118 
119 Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
120  SCOPED_TIMER(runtime_profile_->total_time_counter());
121  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
122  RETURN_IF_CANCELLED(state);
124  // Create new tuple buffer for row_batch.
125  int tuple_buffer_size = row_batch->MaxTupleBufferSize();
126  Tuple* tuple = Tuple::Create(tuple_buffer_size, row_batch->tuple_data_pool());
127 
128  // Fetch from children, evaluate corresponding exprs and materialize.
129  while (child_idx_ < children_.size()) {
130  // Row batch was either never set or we're moving on to a different child.
131  if (child_row_batch_.get() == NULL) RETURN_IF_ERROR(OpenCurrentChild(state));
132 
133  // Start (or continue) consuming row batches from current child.
134  while (true) {
135  RETURN_IF_CANCELLED(state);
137 
138  // Continue materializing exprs on child_row_batch_ into row batch.
140  result_expr_ctx_lists_[child_idx_], false, &tuple, row_batch)) {
141  *eos = ReachedLimit();
142  return Status::OK;
143  }
144 
145  // Fetch new batch if one is available, otherwise move on to next child.
146  if (child_eos_) break;
147  child_row_batch_->Reset();
148  RETURN_IF_ERROR(child(child_idx_)->GetNext(state, child_row_batch_.get(),
149  &child_eos_));
150  child_row_idx_ = 0;
151  }
152 
153  // Close current child and move on to next one. It is OK to close the child as
154  // long as all RowBatches have already been consumed so that we are sure to have
155 // transferred all resources. It is not OK to close the child above in the case when
156  // ReachedLimit() is true as we may end up releasing resources that are referenced
157  // by the output row_batch.
158  child_row_batch_.reset();
159  child(child_idx_)->Close(state);
160  ++child_idx_;
161  }
162 
163  // Evaluate and materialize the const expr lists exactly once.
165  // Only evaluate the const expr lists by the first fragment instance.
166  if (state->fragment_ctx().fragment_instance_idx == 0) {
167  // Materialize expr results into row_batch.
170  &tuple, row_batch);
171  }
173  *eos = ReachedLimit();
174  if (*eos || row_batch->AtCapacity()) return Status::OK;
175  }
176 
177  *eos = true;
178  return Status::OK;
179 }
180 
182  DCHECK(false) << "NYI";
183  return Status("NYI");
184 }
185 
187  if (is_closed()) return;
188  child_row_batch_.reset();
189  for (int i = 0; i < const_result_expr_ctx_lists_.size(); ++i) {
191  }
192  for (int i = 0; i < result_expr_ctx_lists_.size(); ++i) {
194  }
195  ExecNode::Close(state);
196 }
197 
198 bool UnionNode::EvalAndMaterializeExprs(const vector<ExprContext*>& ctxs, bool const_exprs,
199  Tuple** tuple, RowBatch* row_batch) {
200  // Make sure there are rows left in the batch.
201  if (!const_exprs && child_row_idx_ >= child_row_batch_->num_rows()) {
202  return false;
203  }
204  // Execute the body at least once.
205  bool done = true;
207  int num_conjunct_ctxs = conjunct_ctxs_.size();
208 
209  do {
210  TupleRow* child_row = NULL;
211  if (!const_exprs) {
212  DCHECK(child_row_batch_ != NULL);
213  // Non-const expr list. Fetch next row from batch.
214  child_row = child_row_batch_->GetRow(child_row_idx_);
215  ++child_row_idx_;
216  done = child_row_idx_ >= child_row_batch_->num_rows();
217  }
218 
219  // Add a new row to the batch.
220  int row_idx = row_batch->AddRow();
221  DCHECK(row_idx != RowBatch::INVALID_ROW_INDEX);
222  TupleRow* row = row_batch->GetRow(row_idx);
223  row->SetTuple(0, *tuple);
224 
225  // Materialize expr results into tuple.
226  DCHECK_EQ(ctxs.size(), materialized_slots_.size());
227  for (int i = 0; i < ctxs.size(); ++i) {
228  // our exprs correspond to materialized slots
229  SlotDescriptor* slot_desc = materialized_slots_[i];
230  RawValue::Write(ctxs[i]->GetValue(child_row), *tuple, slot_desc,
231  row_batch->tuple_data_pool());
232  }
233 
234  if (EvalConjuncts(conjunct_ctxs, num_conjunct_ctxs, row)) {
235  row_batch->CommitLastRow();
238  char* new_tuple = reinterpret_cast<char*>(*tuple);
239  new_tuple += tuple_desc_->byte_size();
240  *tuple = reinterpret_cast<Tuple*>(new_tuple);
241  } else {
242  // Make sure to reset null indicators since we're overwriting
243  // the tuple assembled for the previous row.
244  (*tuple)->Init(tuple_desc_->byte_size());
245  }
246 
247  if (row_batch->AtCapacity() || ReachedLimit()) {
248  return true;
249  }
250  } while (!done);
251 
252  return false;
253 }
254 
255 }
int child_idx_
Index of current child.
Definition: union-node.h:66
const TupleDescriptor * tuple_desc_
Descriptor for tuples this union node constructs.
Definition: union-node.h:51
std::vector< std::vector< ExprContext * > > const_result_expr_ctx_lists_
Const exprs materialized by this node. These exprs don't refer to any children.
Definition: union-node.h:57
virtual Status Open(RuntimeState *state)
Definition: union-node.cc:88
int64_t num_rows_returned_
Definition: exec-node.h:223
MemTracker * mem_tracker()
Definition: exec-node.h:162
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
Definition: union-node.cc:119
bool EvalAndMaterializeExprs(const std::vector< ExprContext * > &ctxs, bool const_exprs, Tuple **tuple, RowBatch *row_batch)
Definition: union-node.cc:198
boost::scoped_ptr< RuntimeProfile > runtime_profile_
Definition: exec-node.h:225
static Status Open(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for opening multiple expr trees.
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
virtual Status Init(const TPlanNode &tnode)
Definition: exec-node.cc:124
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
void Init(int size)
Definition: tuple.h:58
std::vector< std::vector< ExprContext * > > result_expr_ctx_lists_
Exprs materialized by this node. The i-th result expr list refers to the i-th child.
Definition: union-node.h:63
bool AtCapacity()
Definition: row-batch.h:120
const std::vector< SlotDescriptor * > & slots() const
Definition: descriptors.h:302
int byte_size() const
Definition: descriptors.h:300
int child_row_idx_
Index of current row in child_row_batch_.
Definition: union-node.h:76
int const_result_expr_idx_
Index of current const result expr list.
Definition: union-node.h:60
const RowDescriptor & row_desc() const
Definition: exec-node.h:156
static Tuple * Create(int size, MemPool *pool)
initialize individual tuple with data residing in mem pool
Definition: tuple.h:51
UnionNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
Definition: union-node.cc:27
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
Definition: exec-node.cc:378
bool ReachedLimit()
Definition: exec-node.h:159
TupleDescriptor * GetTupleDescriptor(TupleId id) const
Definition: descriptors.cc:437
#define SCOPED_TIMER(c)
const std::vector< ExprContext * > & conjunct_ctxs() const
Definition: exec-node.h:152
static void Close(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for closing multiple expr trees.
virtual Status Reset(RuntimeState *state)
Definition: union-node.cc:181
MemTracker * expr_mem_tracker()
Definition: exec-node.h:163
std::vector< SlotDescriptor * > materialized_slots_
those tuple_desc_->slots() which are materialized, in the same order
Definition: union-node.h:54
virtual Status Init(const TPlanNode &tnode)
Definition: union-node.cc:38
#define RETURN_IF_CANCELLED(state)
ObjectPool pool
virtual Status Prepare(RuntimeState *state)
Definition: exec-node.cc:130
static void Write(const void *value, Tuple *tuple, const SlotDescriptor *slot_desc, MemPool *pool)
Definition: raw-value.cc:303
void AddExprCtxsToFree(const std::vector< ExprContext * > &ctxs)
virtual Status QueryMaintenance(RuntimeState *state)
Definition: exec-node.cc:401
const DescriptorTbl & desc_tbl() const
Definition: runtime-state.h:93
virtual void Close(RuntimeState *state)
Definition: union-node.cc:186
bool is_closed()
Definition: exec-node.h:242
void CommitLastRow()
Definition: row-batch.h:109
std::vector< ExecNode * > children_
Definition: exec-node.h:214
#define COUNTER_SET(c, v)
int batch_size() const
Definition: runtime-state.h:98
virtual Status Prepare(RuntimeState *state)
Definition: union-node.cc:58
MemPool * tuple_data_pool()
Definition: row-batch.h:148
RuntimeProfile::Counter * rows_returned_counter_
Definition: exec-node.h:226
Status OpenCurrentChild(RuntimeState *state)
Definition: union-node.cc:107
ExecNode * child(int i)
Definition: exec-node.h:241
void SetTuple(int tuple_idx, Tuple *tuple)
Definition: tuple-row.h:34
bool child_eos_
Saved from the last call to GetNext() on the current child.
Definition: union-node.h:73
static const Status OK
Definition: status.h:87
ObjectPool * pool_
Definition: exec-node.h:211
static Status CreateExprTrees(ObjectPool *pool, const std::vector< TExpr > &texprs, std::vector< ExprContext * > *ctxs)
Definition: expr.cc:149
int MaxTupleBufferSize()
Computes the maximum size needed to store tuple data for this row batch.
Definition: row-batch.cc:325
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
Definition: exec-node.cc:393
bool is_materialized() const
Definition: descriptors.h:92
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
virtual Status Open(RuntimeState *state)
Definition: exec-node.cc:154
int tuple_id_
Tuple id resolved in Prepare() to set tuple_desc_.
Definition: union-node.h:48
std::vector< ExprContext * > conjunct_ctxs_
Definition: exec-node.h:212
virtual void Close(RuntimeState *state)
Definition: exec-node.cc:166
const TPlanFragmentInstanceCtx & fragment_ctx() const
boost::scoped_ptr< RowBatch > child_row_batch_
Definition: union-node.h:70
static const int INVALID_ROW_INDEX
Definition: row-batch.h:87