Impala
Impala is the open source, native analytic database for Apache Hadoop.
analytic-eval-node.cc
// Copyright 2014 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/analytic-eval-node.h"

#include "exprs/agg-fn-evaluator.h"
#include "runtime/buffered-tuple-stream.inline.h"
#include "runtime/descriptors.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "udf/udf-internal.h"

#include "common/names.h"

static const int MAX_TUPLE_POOL_SIZE = 8 * 1024 * 1024; // 8MB

namespace impala {
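// This node evaluates analytic (window) functions over input that the planner
// has already sorted on the partition and order by exprs, e.g. for a query like:
//
//   SELECT sum(x) OVER (PARTITION BY a ORDER BY b
//                       ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
//   FROM t;
//
// Input rows are buffered in input_stream_ and later joined with the result
// tuples collected in result_tuples_ as output batches are produced.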

AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode,
    const DescriptorTbl& descs)
  : ExecNode(pool, tnode, descs),
    window_(tnode.analytic_node.window),
    intermediate_tuple_desc_(
        descs.GetTupleDescriptor(tnode.analytic_node.intermediate_tuple_id)),
    result_tuple_desc_(
        descs.GetTupleDescriptor(tnode.analytic_node.output_tuple_id)),
    buffered_tuple_desc_(NULL),
    partition_by_eq_expr_ctx_(NULL),
    order_by_eq_expr_ctx_(NULL),
    rows_start_offset_(0),
    rows_end_offset_(0),
    has_first_val_null_offset_(false),
    first_val_null_offset_(0),
    last_result_idx_(-1),
    prev_pool_last_result_idx_(-1),
    prev_pool_last_window_idx_(-1),
    curr_tuple_(NULL),
    dummy_result_tuple_(NULL),
    curr_partition_idx_(-1),
    prev_input_row_(NULL),
    input_eos_(false),
    evaluation_timer_(NULL) {
  if (tnode.analytic_node.__isset.buffered_tuple_id) {
    buffered_tuple_desc_ = descs.GetTupleDescriptor(
        tnode.analytic_node.buffered_tuple_id);
  }
  if (!tnode.analytic_node.__isset.window) {
    fn_scope_ = AnalyticEvalNode::PARTITION;
  } else if (tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) {
    fn_scope_ = AnalyticEvalNode::RANGE;
    DCHECK(!window_.__isset.window_start)
        << "RANGE windows must have UNBOUNDED PRECEDING";
    DCHECK(!window_.__isset.window_end ||
        window_.window_end.type == TAnalyticWindowBoundaryType::CURRENT_ROW)
        << "RANGE window end bound must be CURRENT ROW or UNBOUNDED FOLLOWING";
  } else {
    DCHECK_EQ(tnode.analytic_node.window.type, TAnalyticWindowType::ROWS);
    fn_scope_ = AnalyticEvalNode::ROWS;
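    // Sign convention: offsets are stored relative to the current row and are
    // negative for PRECEDING bounds. Illustrative example: ROWS BETWEEN
    // 2 PRECEDING AND CURRENT ROW yields rows_start_offset_ = -2 and
    // rows_end_offset_ = 0.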
    if (window_.__isset.window_start) {
      TAnalyticWindowBoundary b = window_.window_start;
      if (b.__isset.rows_offset_value) {
        rows_start_offset_ = b.rows_offset_value;
        if (b.type == TAnalyticWindowBoundaryType::PRECEDING) rows_start_offset_ *= -1;
      } else {
        DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::CURRENT_ROW);
        rows_start_offset_ = 0;
      }
    }
    if (window_.__isset.window_end) {
      TAnalyticWindowBoundary b = window_.window_end;
      if (b.__isset.rows_offset_value) {
        rows_end_offset_ = b.rows_offset_value;
        if (b.type == TAnalyticWindowBoundaryType::PRECEDING) rows_end_offset_ *= -1;
      } else {
        DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::CURRENT_ROW);
        rows_end_offset_ = 0;
      }
    }
  }
  VLOG_FILE << id() << " Window=" << DebugWindowString();
}

Status AnalyticEvalNode::Init(const TPlanNode& tnode) {
  RETURN_IF_ERROR(ExecNode::Init(tnode));
  const TAnalyticNode& analytic_node = tnode.analytic_node;
  bool has_lead_fn = false;
  for (int i = 0; i < analytic_node.analytic_functions.size(); ++i) {
    AggFnEvaluator* evaluator;
    RETURN_IF_ERROR(AggFnEvaluator::Create(
        pool_, analytic_node.analytic_functions[i], true, &evaluator));
    evaluators_.push_back(evaluator);
    const TFunction& fn = analytic_node.analytic_functions[i].nodes[0].fn;
    is_lead_fn_.push_back("lead" == fn.name.function_name);
    has_lead_fn = has_lead_fn || is_lead_fn_.back();
  }
  DCHECK(!has_lead_fn || !window_.__isset.window_start);
  DCHECK(fn_scope_ != PARTITION || analytic_node.order_by_exprs.empty());
  DCHECK(window_.__isset.window_end || !window_.__isset.window_start)
      << "UNBOUNDED FOLLOWING is only supported with UNBOUNDED PRECEDING.";
  if (analytic_node.__isset.partition_by_eq) {
    DCHECK(analytic_node.__isset.buffered_tuple_id);
    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, analytic_node.partition_by_eq,
        &partition_by_eq_expr_ctx_));
  }
  if (analytic_node.__isset.order_by_eq) {
    DCHECK(analytic_node.__isset.buffered_tuple_id);
    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, analytic_node.order_by_eq,
        &order_by_eq_expr_ctx_));
  }
  return Status::OK;
}

Status AnalyticEvalNode::Prepare(RuntimeState* state) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  RETURN_IF_ERROR(ExecNode::Prepare(state));
  DCHECK(child(0)->row_desc().IsPrefixOf(row_desc()));
  child_tuple_desc_ = child(0)->row_desc().tuple_descriptors()[0];
  curr_tuple_pool_.reset(new MemPool(mem_tracker()));
  prev_tuple_pool_.reset(new MemPool(mem_tracker()));
  mem_pool_.reset(new MemPool(mem_tracker()));
  evaluation_timer_ = ADD_TIMER(runtime_profile(), "EvaluationTime");

  DCHECK_EQ(result_tuple_desc_->slots().size(), evaluators_.size());
  for (int i = 0; i < evaluators_.size(); ++i) {
    impala_udf::FunctionContext* ctx;
    RETURN_IF_ERROR(evaluators_[i]->Prepare(state, child(0)->row_desc(),
        intermediate_tuple_desc_->slots()[i], result_tuple_desc_->slots()[i],
        mem_pool_.get(), &ctx));
    fn_ctxs_.push_back(ctx);
    state->obj_pool()->Add(ctx);
  }

  if (partition_by_eq_expr_ctx_ != NULL || order_by_eq_expr_ctx_ != NULL) {
    DCHECK(buffered_tuple_desc_ != NULL);
    vector<TTupleId> tuple_ids;
    tuple_ids.push_back(child(0)->row_desc().tuple_descriptors()[0]->id());
    tuple_ids.push_back(buffered_tuple_desc_->id());
    RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector<bool>(2, false));
    if (partition_by_eq_expr_ctx_ != NULL) {
      RETURN_IF_ERROR(
          partition_by_eq_expr_ctx_->Prepare(state, cmp_row_desc, expr_mem_tracker()));
      AddExprCtxToFree(partition_by_eq_expr_ctx_);
    }
    if (order_by_eq_expr_ctx_ != NULL) {
      RETURN_IF_ERROR(
          order_by_eq_expr_ctx_->Prepare(state, cmp_row_desc, expr_mem_tracker()));
      AddExprCtxToFree(order_by_eq_expr_ctx_);
    }
  }
  child_tuple_cmp_row_ = reinterpret_cast<TupleRow*>(
      mem_pool_->Allocate(sizeof(Tuple*) * 2));
  return Status::OK;
}

Status AnalyticEvalNode::Open(RuntimeState* state) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  RETURN_IF_ERROR(ExecNode::Open(state));
  RETURN_IF_CANCELLED(state);
  RETURN_IF_ERROR(QueryMaintenance(state));
  RETURN_IF_ERROR(child(0)->Open(state));
  RETURN_IF_ERROR(state->block_mgr()->RegisterClient(2, mem_tracker(), state, &client_));
  input_stream_.reset(new BufferedTupleStream(state, child(0)->row_desc(),
      state->block_mgr(), client_,
      false /* initial_small_buffers */,
      true /* delete_on_read */,
      true /* read_write */));
  RETURN_IF_ERROR(input_stream_->Init(runtime_profile()));

  DCHECK_EQ(evaluators_.size(), fn_ctxs_.size());
  for (int i = 0; i < evaluators_.size(); ++i) {
    RETURN_IF_ERROR(evaluators_[i]->Open(state, fn_ctxs_[i]));
    DCHECK(!evaluators_[i]->is_merge());

    if ("first_value_rewrite" == evaluators_[i]->fn_name() &&
        fn_ctxs_[i]->GetNumArgs() == 2) {
      DCHECK(!has_first_val_null_offset_);
      first_val_null_offset_ =
          reinterpret_cast<BigIntVal*>(fn_ctxs_[i]->GetConstantArg(1))->val;
      VLOG_FILE << id() << " FIRST_VAL rewrite null offset: " << first_val_null_offset_;
      has_first_val_null_offset_ = true;
    }
  }

  if (partition_by_eq_expr_ctx_ != NULL) {
    RETURN_IF_ERROR(partition_by_eq_expr_ctx_->Open(state));
  }
  if (order_by_eq_expr_ctx_ != NULL) {
    RETURN_IF_ERROR(order_by_eq_expr_ctx_->Open(state));
  }

  // An intermediate tuple is only allocated once and is reused.
  curr_tuple_ = Tuple::Create(intermediate_tuple_desc_->byte_size(), mem_pool_.get());
  AggFnEvaluator::Init(evaluators_, fn_ctxs_, curr_tuple_);
  dummy_result_tuple_ = Tuple::Create(result_tuple_desc_->byte_size(), mem_pool_.get());

  // Initialize state for the first partition.
  InitNextPartition(0);
  prev_child_batch_.reset(new RowBatch(child(0)->row_desc(), state->batch_size(),
      mem_tracker()));
  curr_child_batch_.reset(new RowBatch(child(0)->row_desc(), state->batch_size(),
      mem_tracker()));
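  // Note: two child batches are kept alive so that prev_input_row_, which may
  // point into the previously processed batch, remains valid while rows of the
  // current batch are consumed (the batches are swapped in ProcessChildBatches()).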
  return Status::OK;
}

string DebugWindowBoundString(const TAnalyticWindowBoundary& b) {
  if (b.type == TAnalyticWindowBoundaryType::CURRENT_ROW) {
    return "CURRENT_ROW";
  }
  stringstream ss;
  if (b.__isset.rows_offset_value) {
    ss << b.rows_offset_value;
  } else {
    // TODO: Return debug string when range offsets are supported
    DCHECK(false) << "Range offsets not yet implemented";
  }
  if (b.type == TAnalyticWindowBoundaryType::PRECEDING) {
    ss << " PRECEDING";
  } else {
    DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::FOLLOWING);
    ss << " FOLLOWING";
  }
  return ss.str();
}

string AnalyticEvalNode::DebugWindowString() const {
  stringstream ss;
  if (fn_scope_ == PARTITION) {
    ss << "NO WINDOW";
    return ss.str();
  }
  ss << "{type=";
  if (fn_scope_ == RANGE) {
    ss << "RANGE";
  } else {
    ss << "ROWS";
  }
  ss << ", start=";
  if (window_.__isset.window_start) {
    ss << DebugWindowBoundString(window_.window_start);
  } else {
    ss << "UNBOUNDED_PRECEDING";
  }

  ss << ", end=";
  if (window_.__isset.window_end) {
    ss << DebugWindowBoundString(window_.window_end) << "}";
  } else {
    ss << "UNBOUNDED_FOLLOWING";
  }
  return ss.str();
}

string AnalyticEvalNode::DebugStateString(bool detailed = false) const {
  stringstream ss;
  ss << "num_returned=" << input_stream_->rows_returned()
     << " num_rows=" << input_stream_->num_rows()
     << " curr_partition_idx_=" << curr_partition_idx_
     << " last_result_idx=" << last_result_idx_;
  if (detailed) {
    ss << " result_tuples idx: [";
    for (list<pair<int64_t, Tuple*> >::const_iterator it = result_tuples_.begin();
        it != result_tuples_.end(); ++it) {
      ss << it->first;
      if (*it != result_tuples_.back()) ss << ", ";
    }
    ss << "]";
    if (fn_scope_ == ROWS && window_.__isset.window_start) {
      ss << " window_tuples idx: [";
      for (list<pair<int64_t, Tuple*> >::const_iterator it = window_tuples_.begin();
          it != window_tuples_.end(); ++it) {
        ss << it->first;
        if (*it != window_tuples_.back()) ss << ", ";
      }
      ss << "]";
    }
  } else {
    if (fn_scope_ == ROWS && window_.__isset.window_start) {
      if (window_tuples_.empty()) {
        ss << " window_tuples empty";
      } else {
        ss << " window_tuples idx range: (" << window_tuples_.front().first << ","
           << window_tuples_.back().first << ")";
      }
    }
    if (result_tuples_.empty()) {
      ss << " result_tuples empty";
    } else {
      ss << " result_tuples idx range: (" << result_tuples_.front().first << ","
         << result_tuples_.back().first << ")";
    }
  }
  return ss.str();
}

inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
  if (fn_scope_ != ROWS || !window_.__isset.window_start ||
      stream_idx - rows_start_offset_ >= curr_partition_idx_) {
    VLOG_ROW << id() << " Update idx=" << stream_idx;
    AggFnEvaluator::Add(evaluators_, fn_ctxs_, row, curr_tuple_);
    if (window_.__isset.window_start) {
      VLOG_ROW << id() << " Adding tuple to window at idx=" << stream_idx;
      Tuple* tuple = row->GetTuple(0)->DeepCopy(*child_tuple_desc_,
          curr_tuple_pool_.get());
      window_tuples_.push_back(pair<int64_t, Tuple*>(stream_idx, tuple));
    }
  }

  // Buffer the entire input row to be returned later with the analytic eval results.
  if (UNLIKELY(!input_stream_->AddRow(row))) {
    // AddRow returns false if an error occurs (available via status()) or there is
    // not enough memory (status() is OK). If there isn't enough memory, we unpin
    // the stream and continue writing/reading in unpinned mode.
    // TODO: Consider re-pinning later if the output stream is fully consumed.
    RETURN_IF_ERROR(input_stream_->status());
    RETURN_IF_ERROR(input_stream_->UnpinStream());
    VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx;
    if (!input_stream_->AddRow(row)) {
      // Rows should be added in unpinned mode unless an error occurs.
      RETURN_IF_ERROR(input_stream_->status());
      DCHECK(false);
    }
  }
  return Status::OK;
}

void AnalyticEvalNode::AddResultTuple(int64_t stream_idx) {
  VLOG_ROW << id() << " AddResultTuple idx=" << stream_idx;
  DCHECK(curr_tuple_ != NULL);
  Tuple* result_tuple = Tuple::Create(result_tuple_desc_->byte_size(),
      curr_tuple_pool_.get());

  AggFnEvaluator::GetValue(evaluators_, fn_ctxs_, curr_tuple_, result_tuple);
  DCHECK_GT(stream_idx, last_result_idx_);
  result_tuples_.push_back(pair<int64_t, Tuple*>(stream_idx, result_tuple));
  last_result_idx_ = stream_idx;
  VLOG_ROW << id() << " Added result tuple, final state: " << DebugStateString(true);
}

inline void AnalyticEvalNode::TryAddResultTupleForPrevRow(bool next_partition,
    int64_t stream_idx, TupleRow* row) {
  // The analytic fns are finalized after the previous row if we found a new partition
  // or the window is a RANGE and the order by exprs changed. For ROWS windows we do not
  // need to compare the current row to the previous row.
  VLOG_ROW << id() << " TryAddResultTupleForPrevRow partition=" << next_partition
           << " idx=" << stream_idx;
  if (fn_scope_ == ROWS) return;
  if (next_partition || (fn_scope_ == RANGE && window_.__isset.window_end &&
      !PrevRowCompare(order_by_eq_expr_ctx_))) {
    AddResultTuple(stream_idx - 1);
  }
}

inline void AnalyticEvalNode::TryAddResultTupleForCurrRow(int64_t stream_idx,
    TupleRow* row) {
  VLOG_ROW << id() << " TryAddResultTupleForCurrRow idx=" << stream_idx;
  // We only add results at this point for ROWS windows (unless unbounded following)
  if (fn_scope_ != ROWS || !window_.__isset.window_end) return;

  // Nothing to add if the end offset is before the start of the partition.
  if (stream_idx - rows_end_offset_ < curr_partition_idx_) return;
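  // Illustrative example: with an end bound 2 FOLLOWING (rows_end_offset_ = 2),
  // consuming the row at stream_idx = 10 completes the window of the row at
  // idx 8, so the result tuple for idx 8 is added here.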
  AddResultTuple(stream_idx - rows_end_offset_);
}

inline void AnalyticEvalNode::TryRemoveRowsBeforeWindow(int64_t stream_idx) {
  if (fn_scope_ != ROWS || !window_.__isset.window_start) return;
  // The start of the window may have been before the current partition, in which case
  // there is no tuple to remove in window_tuples_. Check the index of the row at which
  // tuples from window_tuples_ should begin to be removed.
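  // Illustrative example: ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING gives
  // rows_start_offset_ = -2 and rows_end_offset_ = 2. Consuming stream_idx = 10
  // produces the result for idx 8, whose window is [6, 10], so remove_idx is
  // 10 - 2 + (-2) - 1 = 5: the tuple at idx 5 has just fallen out of the window.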
  int64_t remove_idx = stream_idx - rows_end_offset_ + min(rows_start_offset_, 0L) - 1;
  if (remove_idx < curr_partition_idx_) return;
  VLOG_ROW << id() << " Remove idx=" << remove_idx << " stream_idx=" << stream_idx;
  DCHECK(!window_tuples_.empty()) << DebugStateString(true);
  DCHECK_EQ(remove_idx + max(rows_start_offset_, 0L), window_tuples_.front().first)
      << DebugStateString(true);
  TupleRow* remove_row = reinterpret_cast<TupleRow*>(&window_tuples_.front().second);
  AggFnEvaluator::Remove(evaluators_, fn_ctxs_, remove_row, curr_tuple_);
  window_tuples_.pop_front();
}

inline void AnalyticEvalNode::TryAddRemainingResults(int64_t partition_idx,
    int64_t prev_partition_idx) {
  DCHECK_LT(prev_partition_idx, partition_idx);
  // For PARTITION, RANGE, or ROWS with UNBOUNDED PRECEDING: add a result tuple for the
  // remaining rows in the partition that do not have an associated result tuple yet.
  if (fn_scope_ != ROWS || !window_.__isset.window_end) {
    if (last_result_idx_ < partition_idx - 1) AddResultTuple(partition_idx - 1);
    return;
  }

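  // From here on the window is ROWS with an end bound X FOLLOWING. Illustrative
  // example: with 2 FOLLOWING, a partition ending at idx 20 leaves rows 19 and 20
  // without results (their windows never completed), so they are added below.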
  // lead() is re-written to a ROWS window with an end bound FOLLOWING. Any remaining
  // results need the default value (set by Init()). If this is the case, the start bound
  // is UNBOUNDED PRECEDING (DCHECK in Init()).
  for (int i = 0; i < evaluators_.size(); ++i) {
    if (is_lead_fn_[i]) evaluators_[i]->Init(fn_ctxs_[i], curr_tuple_);
  }

  // If the start bound is not UNBOUNDED PRECEDING and there are still rows in the
  // partition for which we need to produce result tuples, we need to continue removing
  // input tuples at the start of the window from each row that we're adding results for.
  VLOG_ROW << id() << " TryAddRemainingResults prev_partition_idx=" << prev_partition_idx
           << " " << DebugStateString(true);
  for (int64_t next_result_idx = last_result_idx_ + 1; next_result_idx < partition_idx;
      ++next_result_idx) {
    if (window_tuples_.empty()) break;
    if (next_result_idx + rows_start_offset_ > window_tuples_.front().first) {
      DCHECK_EQ(next_result_idx + rows_start_offset_ - 1, window_tuples_.front().first);
      // For every tuple that is removed from the window: Remove() from the evaluators
      // and add the result tuple at the next index.
      VLOG_ROW << id() << " Remove window_row_idx=" << window_tuples_.front().first
               << " for result row at idx=" << next_result_idx;
      TupleRow* remove_row = reinterpret_cast<TupleRow*>(&window_tuples_.front().second);
      AggFnEvaluator::Remove(evaluators_, fn_ctxs_, remove_row, curr_tuple_);
      window_tuples_.pop_front();
    }
    AddResultTuple(next_result_idx);
  }

  // If there are still rows between the row with the last result (AddResultTuple() may
  // have updated last_result_idx_) and the partition boundary, add the current results
  // for the remaining rows with the same result tuple (curr_tuple_ is not modified).
  if (last_result_idx_ < partition_idx - 1) AddResultTuple(partition_idx - 1);
}

inline void AnalyticEvalNode::InitNextPartition(int64_t stream_idx) {
  VLOG_FILE << id() << " InitNextPartition idx=" << stream_idx;
  DCHECK_LT(curr_partition_idx_, stream_idx);
  int64_t prev_partition_stream_idx = curr_partition_idx_;
  curr_partition_idx_ = stream_idx;

  // If the window has an end bound preceding the current row, we will have output tuples
  // for rows beyond the previous partition, so they should be removed. Because
  // result_tuples_ is a sparse structure, the last result tuple of the previous
  // partition may have been added to result_tuples_ with a stream index equal to or
  // beyond curr_partition_idx_. So the last entry in result_tuples_ with a stream index
  // >= curr_partition_idx_ is the last result tuple of the previous partition. Adding
  // the last result tuple to result_tuples_ with a stream index curr_partition_idx_ - 1
  // ensures that all rows in the previous partition have corresponding analytic results.
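  // Illustrative example: with an end bound 2 PRECEDING (rows_end_offset_ = -2),
  // consuming the row at idx 10 adds a result at idx 12. If a new partition then
  // starts at idx 11, that result belongs to rows past the partition boundary; it
  // is popped below and re-added at idx 10, the last row of the previous partition.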
  Tuple* prev_partition_last_result_tuple = NULL;
  while (!result_tuples_.empty() && result_tuples_.back().first >= curr_partition_idx_) {
    DCHECK(fn_scope_ == ROWS && window_.__isset.window_end &&
        window_.window_end.type == TAnalyticWindowBoundaryType::PRECEDING);
    VLOG_ROW << id() << " Removing result past partition idx: "
             << result_tuples_.back().first;
    prev_partition_last_result_tuple = result_tuples_.back().second;
    result_tuples_.pop_back();
  }
  if (prev_partition_last_result_tuple != NULL) {
    if (result_tuples_.empty() ||
        result_tuples_.back().first < curr_partition_idx_ - 1) {
      // prev_partition_last_result_tuple was the last result tuple in the partition, add
      // it back with the index of the last row in the partition so that all output rows
      // in this partition get the correct value.
      result_tuples_.push_back(pair<int64_t, Tuple*>(curr_partition_idx_ - 1,
          prev_partition_last_result_tuple));
    }
    DCHECK(!result_tuples_.empty());
    last_result_idx_ = result_tuples_.back().first;
    VLOG_ROW << id() << " After removing results past partition: "
             << DebugStateString(true);
    DCHECK_EQ(last_result_idx_, curr_partition_idx_ - 1);
    DCHECK_LE(input_stream_->rows_returned(), last_result_idx_);
  }
  DCHECK(result_tuples_.empty() || (last_result_idx_ == result_tuples_.back().first));

  if (fn_scope_ == ROWS && stream_idx > 0 && (!window_.__isset.window_end ||
      window_.window_end.type == TAnalyticWindowBoundaryType::FOLLOWING)) {
    TryAddRemainingResults(stream_idx, prev_partition_stream_idx);
  }
  window_tuples_.clear();

  VLOG_ROW << id() << " Reset curr_tuple";
  // Call finalize to release resources; result is not needed but the dst tuple must be
  // a tuple described by result_tuple_desc_.
  AggFnEvaluator::Finalize(evaluators_, fn_ctxs_, curr_tuple_, dummy_result_tuple_);
  // Re-initialize curr_tuple_.
  curr_tuple_->Init(intermediate_tuple_desc_->byte_size());
  AggFnEvaluator::Init(evaluators_, fn_ctxs_, curr_tuple_);

  // Add a result tuple containing values set by Init() (e.g. NULL for sum(), 0 for
  // count()) for output rows that have no input rows in the window. We need to add this
  // result tuple before any input rows are consumed and the evaluators are updated.
  if (fn_scope_ == ROWS && window_.__isset.window_end &&
      window_.window_end.type == TAnalyticWindowBoundaryType::PRECEDING) {
    if (has_first_val_null_offset_) {
      // Special handling for FIRST_VALUE which has the window rewritten in the FE
      // in order to evaluate the fn efficiently with a trivial agg fn implementation.
      // This occurs when the original analytic window has a start bound X PRECEDING. In
      // that case, the window is rewritten to have an end bound X PRECEDING which would
      // normally mean we add the newly Init()'d result tuple X rows down (so that those
      // first rows have the initial value because they have no rows in their windows).
      // However, the original query did not actually have X PRECEDING so we need to do
      // one of the following:
      //   1) Do not insert the initial result tuple at all, indicated by
      //      first_val_null_offset_ == -1. This happens when the original end bound was
      //      actually CURRENT ROW or Y FOLLOWING.
      //   2) Insert the initial result tuple at first_val_null_offset_. This happens
      //      when the end bound was actually Y PRECEDING.
      if (first_val_null_offset_ != -1) {
        AddResultTuple(curr_partition_idx_ + first_val_null_offset_ - 1);
      }
    } else {
      AddResultTuple(curr_partition_idx_ - rows_end_offset_ - 1);
    }
  }
}

inline bool AnalyticEvalNode::PrevRowCompare(ExprContext* pred_ctx) {
  DCHECK(pred_ctx != NULL);
  BooleanVal result = pred_ctx->GetBooleanVal(child_tuple_cmp_row_);
  DCHECK(!result.is_null);
  return result.val;
}

Status AnalyticEvalNode::ProcessChildBatches(RuntimeState* state) {
  // Consume child batches until eos or there are enough rows to return more than an
  // output batch. Ensuring there is at least one more row left after returning results
  // allows us to simplify the logic dealing with last_result_idx_ and result_tuples_.
  while (!input_eos_ && NumOutputRowsReady() < state->batch_size() + 1) {
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(QueryMaintenance(state));
    RETURN_IF_ERROR(child(0)->GetNext(state, curr_child_batch_.get(), &input_eos_));

    RETURN_IF_ERROR(ProcessChildBatch(state));
    // TODO: DCHECK that the size of result_tuples_ is bounded. It shouldn't be larger
    // than 2x the batch size unless the end bound has an offset preceding, in which
    // case it may be slightly larger (proportional to the offset but still bounded).
    prev_child_batch_->Reset();
    prev_child_batch_.swap(curr_child_batch_);
  }
  if (input_eos_) {
    curr_child_batch_.reset();
    prev_child_batch_.reset();
  }
  return Status::OK;
}

Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) {
  // TODO: DCHECK input is sorted (even just first row vs prev_input_row_)
  VLOG_FILE << id() << " ProcessChildBatch: " << DebugStateString()
            << " input batch size:" << curr_child_batch_->num_rows()
            << " tuple pool size:" << curr_tuple_pool_->total_allocated_bytes();
  SCOPED_TIMER(evaluation_timer_);

  // BufferedTupleStream::num_rows() returns the total number of rows that have been
  // inserted into the stream (it does not decrease when we read rows), so the index of
  // the next input row that will be inserted will be the current size of the stream.
  int64_t stream_idx = input_stream_->num_rows();

  // The very first row in the stream is handled specially because there is no previous
  // row to compare and we cannot rely on PrevRowCompare() returning true even for the
  // same row pointers if there are NaN values.
  int batch_idx = 0;
  if (UNLIKELY(stream_idx == 0 && curr_child_batch_->num_rows() > 0)) {
    TupleRow* row = curr_child_batch_->GetRow(0);
    RETURN_IF_ERROR(AddRow(0, row));
    TryAddResultTupleForCurrRow(0, row);
    prev_input_row_ = row;
    ++batch_idx;
    ++stream_idx;
  }

  for (; batch_idx < curr_child_batch_->num_rows(); ++batch_idx, ++stream_idx) {
    TupleRow* row = curr_child_batch_->GetRow(batch_idx);
    if (partition_by_eq_expr_ctx_ != NULL || order_by_eq_expr_ctx_ != NULL) {
      // Only set the tuples in child_tuple_cmp_row_ if there are partition exprs or
      // order by exprs that require comparing the current and previous rows. If there
      // aren't partition or order by exprs (i.e. empty OVER() clause), there was no sort
      // and there could be nullable tuples (whereas the sort node does not produce
      // them), see IMPALA-1562.
      child_tuple_cmp_row_->SetTuple(0, prev_input_row_->GetTuple(0));
      child_tuple_cmp_row_->SetTuple(1, row->GetTuple(0));
    }
    TryRemoveRowsBeforeWindow(stream_idx);

    // Every row is compared against the previous row to determine if (a) the row
    // starts a new partition or (b) the row does not share the same values for the
    // ordering exprs. When either of these occurs, the evaluators_ are finalized and
    // the result tuple is added to result_tuples_ so that it may be added to output
    // rows in GetNextOutputBatch(). When a new partition is found (a), a new, empty
    // result tuple is created and initialized over the evaluators_. If the row has
    // different values for the ordering exprs (b), then a new tuple is created but
    // copied from curr_tuple_ because the original is used for one or more previous
    // row(s) but the incremental state still applies to the current row.
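    // Illustrative trace: for OVER (PARTITION BY a ORDER BY b RANGE UNBOUNDED
    // PRECEDING), if the row at stream_idx 7 starts a new partition, then
    // TryAddResultTupleForPrevRow() adds the result for idx 6 and
    // InitNextPartition(7) re-initializes the evaluators for the new partition.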
    bool next_partition = false;
    if (partition_by_eq_expr_ctx_ != NULL) {
      // partition_by_eq_expr_ctx_ checks equality over the predicate exprs
      next_partition = !PrevRowCompare(partition_by_eq_expr_ctx_);
    }
    TryAddResultTupleForPrevRow(next_partition, stream_idx, row);
    if (next_partition) InitNextPartition(stream_idx);

    // The evaluators_ are updated with the current row.
    RETURN_IF_ERROR(AddRow(stream_idx, row));

    TryAddResultTupleForCurrRow(stream_idx, row);
    prev_input_row_ = row;
  }

  if (UNLIKELY(input_eos_ && stream_idx > curr_partition_idx_)) {
    // We need to add the results for the last row(s).
    TryAddRemainingResults(stream_idx, curr_partition_idx_);
  }

  // Transfer resources to prev_tuple_pool_ when enough resources have accumulated
  // and the prev_tuple_pool_ has already been transferred to an output batch.
  if (curr_tuple_pool_->total_allocated_bytes() > MAX_TUPLE_POOL_SIZE &&
      prev_pool_last_result_idx_ == -1) {
    prev_tuple_pool_->AcquireData(curr_tuple_pool_.get(), false);
    prev_pool_last_result_idx_ = last_result_idx_;
    if (window_tuples_.size() > 0) {
      prev_pool_last_window_idx_ = window_tuples_.back().first;
    } else {
      prev_pool_last_window_idx_ = -1;
    }
    VLOG_FILE << id() << " Transfer resources from curr to prev pool at idx: "
              << stream_idx << ", stores tuples with last result idx: "
              << prev_pool_last_result_idx_ << " last window idx: "
              << prev_pool_last_window_idx_;
  }
  return Status::OK;
}

Status AnalyticEvalNode::GetNextOutputBatch(RuntimeState* state, RowBatch* output_batch,
    bool* eos) {
  SCOPED_TIMER(evaluation_timer_);
  VLOG_FILE << id() << " GetNextOutputBatch: " << DebugStateString()
            << " tuple pool size:" << curr_tuple_pool_->total_allocated_bytes();
  if (input_stream_->rows_returned() == input_stream_->num_rows()) {
    *eos = true;
    return Status::OK;
  }

  const int num_child_tuples = child(0)->row_desc().tuple_descriptors().size();
  ExprContext** ctxs = &conjunct_ctxs_[0];
  int num_ctxs = conjunct_ctxs_.size();

  RowBatch input_batch(child(0)->row_desc(), output_batch->capacity(), mem_tracker());
  int64_t stream_idx = input_stream_->rows_returned();
  RETURN_IF_ERROR(input_stream_->GetNext(&input_batch, eos));
  for (int i = 0; i < input_batch.num_rows(); ++i) {
    if (ReachedLimit()) break;
    DCHECK(!output_batch->AtCapacity());
    DCHECK(!result_tuples_.empty());
    VLOG_ROW << id() << " Output row idx=" << stream_idx << " " << DebugStateString(true);

    // CopyRow works as expected: input_batch tuples form a prefix of output_batch
    // tuples.
    TupleRow* dest = output_batch->GetRow(output_batch->AddRow());
    input_batch.CopyRow(input_batch.GetRow(i), dest);
    dest->SetTuple(num_child_tuples, result_tuples_.front().second);

    if (ExecNode::EvalConjuncts(ctxs, num_ctxs, dest)) {
      output_batch->CommitLastRow();
      ++num_rows_returned_;
    }

    // Remove the head of result_tuples_ if all rows using that evaluated tuple
    // have been returned.
    DCHECK_LE(stream_idx, result_tuples_.front().first);
    if (stream_idx >= result_tuples_.front().first) result_tuples_.pop_front();
    ++stream_idx;
  }
  input_batch.TransferResourceOwnership(output_batch);
  if (ReachedLimit()) *eos = true;
  return Status::OK;
}

inline int64_t AnalyticEvalNode::NumOutputRowsReady() const {
  if (result_tuples_.empty()) return 0;
  int64_t rows_to_return = last_result_idx_ - input_stream_->rows_returned();
  if (last_result_idx_ > input_stream_->num_rows()) {
    // This happens when we were able to add a result tuple before consuming child rows,
    // e.g. initializing a new partition with an end bound that is X preceding. The first
    // X rows get the default value and we add that tuple to result_tuples_ before
    // consuming child rows. It's possible the result is negative, and that's fine
    // because this result is only used to determine if the number of rows to return
    // is at least as big as the batch size.
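    // Worked example: if last_result_idx_ = 12 while the stream holds
    // num_rows() = 10 and rows_returned() = 0, then
    // rows_to_return = 12 - 0 - (12 - 10) = 10.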
    rows_to_return -= last_result_idx_ - input_stream_->num_rows();
  } else {
    DCHECK_GE(rows_to_return, 0);
  }
  return rows_to_return;
}

Status AnalyticEvalNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
  RETURN_IF_CANCELLED(state);
  RETURN_IF_ERROR(QueryMaintenance(state));
  VLOG_FILE << id() << " GetNext: " << DebugStateString();

  if (ReachedLimit()) {
    *eos = true;
    return Status::OK;
  } else {
    *eos = false;
  }

  RETURN_IF_ERROR(ProcessChildBatches(state));
  bool output_eos = false;
  RETURN_IF_ERROR(GetNextOutputBatch(state, row_batch, &output_eos));
  if (curr_child_batch_.get() == NULL && output_eos) *eos = true;

  // Transfer resources to the output row batch if enough have accumulated and they're
  // no longer needed by output rows to be returned later.
  if (prev_pool_last_result_idx_ != -1 &&
      prev_pool_last_result_idx_ < input_stream_->rows_returned() &&
      prev_pool_last_window_idx_ < window_tuples_.front().first) {
    VLOG_FILE << id() << " Transfer prev pool to output batch, "
              << " pool size: " << prev_tuple_pool_->total_allocated_bytes()
              << " last result idx: " << prev_pool_last_result_idx_
              << " last window idx: " << prev_pool_last_window_idx_;
    row_batch->tuple_data_pool()->AcquireData(prev_tuple_pool_.get(), !*eos);
    prev_pool_last_result_idx_ = -1;
    prev_pool_last_window_idx_ = -1;
  }

  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
  return Status::OK;
}

Status AnalyticEvalNode::Reset(RuntimeState* state) {
  DCHECK(false) << "NYI";
  return Status("NYI");
}

void AnalyticEvalNode::Close(RuntimeState* state) {
  if (is_closed()) return;
  if (input_stream_.get() != NULL) input_stream_->Close();

  // Close all evaluators and fn ctxs. If an error occurred in Init or Prepare there may
  // be fewer ctxs than evaluators. We also need to Finalize if curr_tuple_ was created
  // in Open.
  DCHECK_LE(fn_ctxs_.size(), evaluators_.size());
  DCHECK(curr_tuple_ == NULL || fn_ctxs_.size() == evaluators_.size());
  for (int i = 0; i < evaluators_.size(); ++i) {
    // Need to make sure finalize is called in case there is any state to clean up.
    if (curr_tuple_ != NULL) {
      evaluators_[i]->Finalize(fn_ctxs_[i], curr_tuple_, dummy_result_tuple_);
    }
    evaluators_[i]->Close(state);
  }
  for (int i = 0; i < fn_ctxs_.size(); ++i) fn_ctxs_[i]->impl()->Close();

  if (partition_by_eq_expr_ctx_ != NULL) partition_by_eq_expr_ctx_->Close(state);
  if (order_by_eq_expr_ctx_ != NULL) order_by_eq_expr_ctx_->Close(state);
  if (prev_child_batch_.get() != NULL) prev_child_batch_.reset();
  if (curr_child_batch_.get() != NULL) curr_child_batch_.reset();
  if (curr_tuple_pool_.get() != NULL) curr_tuple_pool_->FreeAll();
  if (prev_tuple_pool_.get() != NULL) prev_tuple_pool_->FreeAll();
  if (mem_pool_.get() != NULL) mem_pool_->FreeAll();
  ExecNode::Close(state);
}

void AnalyticEvalNode::DebugString(int indentation_level, stringstream* out) const {
  *out << string(indentation_level * 2, ' ');
  *out << "AnalyticEvalNode("
       << " window=" << DebugWindowString();
  if (partition_by_eq_expr_ctx_ != NULL) {
    *out << " partition_exprs=" << partition_by_eq_expr_ctx_->root()->DebugString();
  }
  if (order_by_eq_expr_ctx_ != NULL) {
    *out << " order_by_exprs=" << order_by_eq_expr_ctx_->root()->DebugString();
  }
  ExecNode::DebugString(indentation_level, out);
  *out << ")";
}

Status AnalyticEvalNode::QueryMaintenance(RuntimeState* state) {
  for (int i = 0; i < evaluators_.size(); ++i) {
    ExprContext::FreeLocalAllocations(evaluators_[i]->input_expr_ctxs());
  }
  return ExecNode::QueryMaintenance(state);
}

}