exchange-node.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/exchange-node.h"

#include <boost/scoped_ptr.hpp>

#include "runtime/data-stream-mgr.h"
#include "runtime/data-stream-recvr.h"
#include "runtime/runtime-state.h"
#include "runtime/row-batch.h"
#include "util/debug-util.h"
#include "util/runtime-profile.h"
#include "gen-cpp/PlanNodes_types.h"

#include "common/names.h"

using namespace impala;

DEFINE_int32(exchg_node_buffer_size_bytes, 1024 * 1024 * 10,
    "(Advanced) Maximum size of per-query receive-side buffer");

ExchangeNode::ExchangeNode(
    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
  : ExecNode(pool, tnode, descs),
    num_senders_(0),
    stream_recvr_(),
    input_row_desc_(descs, tnode.exchange_node.input_row_tuples,
        vector<bool>(
            tnode.nullable_tuples.begin(),
            tnode.nullable_tuples.begin() + tnode.exchange_node.input_row_tuples.size())),
    next_row_idx_(0),
    is_merging_(tnode.exchange_node.__isset.sort_info),
    offset_(tnode.exchange_node.__isset.offset ? tnode.exchange_node.offset : 0),
    num_rows_skipped_(0) {
  DCHECK_GE(offset_, 0);
  DCHECK(is_merging_ || (offset_ == 0));
}

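// For a merging exchange, initializes the ordering expressions from the plan's sort
// info; a non-merging exchange has nothing to set up beyond ExecNode::Init().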
Status ExchangeNode::Init(const TPlanNode& tnode) {
  RETURN_IF_ERROR(ExecNode::Init(tnode));
  if (!is_merging_) return Status::OK;

  RETURN_IF_ERROR(sort_exec_exprs_.Init(tnode.exchange_node.sort_info, pool_));
  is_asc_order_ = tnode.exchange_node.sort_info.is_asc_order;
  nulls_first_ = tnode.exchange_node.sort_info.nulls_first;
  return Status::OK;
}

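// Creates the data stream receiver for this node's senders and, if merging, prepares
// the ordering expressions used by the receiver-side merger.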
Status ExchangeNode::Prepare(RuntimeState* state) {
  RETURN_IF_ERROR(ExecNode::Prepare(state));
  convert_row_batch_timer_ = ADD_TIMER(runtime_profile(), "ConvertRowBatchTime");
  // TODO: figure out appropriate buffer size
  DCHECK_GT(num_senders_, 0);
  stream_recvr_ = ExecEnv::GetInstance()->stream_mgr()->CreateRecvr(state,
      input_row_desc_, state->fragment_instance_id(), id_, num_senders_,
      FLAGS_exchg_node_buffer_size_bytes, runtime_profile(), is_merging_);
  if (is_merging_) {
    RETURN_IF_ERROR(sort_exec_exprs_.Prepare(
        state, row_descriptor_, row_descriptor_, expr_mem_tracker()));
    AddExprCtxsToFree(sort_exec_exprs_);
  }
  return Status::OK;
}

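// Blocks until the first batch is available for consumption via GetNext(): a merging
// exchange populates the receiver-side merger, a non-merging exchange fetches its
// first input batch.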
Status ExchangeNode::Open(RuntimeState* state) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  RETURN_IF_ERROR(ExecNode::Open(state));
  if (is_merging_) {
    RETURN_IF_ERROR(sort_exec_exprs_.Open(state));
    TupleRowComparator less_than(sort_exec_exprs_.lhs_ordering_expr_ctxs(),
        sort_exec_exprs_.rhs_ordering_expr_ctxs(), is_asc_order_, nulls_first_);
    // CreateMerger() will populate its merging heap with batches from the stream_recvr_,
    // so it is not necessary to call FillInputRowBatch().
    stream_recvr_->CreateMerger(less_than);
  } else {
    RETURN_IF_ERROR(FillInputRowBatch(state));
  }
  return Status::OK;
}

Status ExchangeNode::Reset(RuntimeState* state) {
  DCHECK(false) << "NYI";
  return Status("NYI");
}

void ExchangeNode::Close(RuntimeState* state) {
  if (is_closed()) return;
  if (is_merging_) sort_exec_exprs_.Close(state);
  if (stream_recvr_ != NULL) stream_recvr_->Close();
  stream_recvr_.reset();
  ExecNode::Close(state);
}

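// Fetches the next batch from the receiver into input_batch_ (non-merging exchanges
// only); input_batch_ is NULL once the stream is exhausted.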
Status ExchangeNode::FillInputRowBatch(RuntimeState* state) {
  DCHECK(!is_merging_);
  Status ret_status;
  {
    SCOPED_TIMER(state->total_network_receive_timer());
    ret_status = stream_recvr_->GetBatch(&input_batch_);
  }
  VLOG_FILE << "exch: has batch=" << (input_batch_ == NULL ? "false" : "true")
      << " #rows=" << (input_batch_ != NULL ? input_batch_->num_rows() : 0)
      << " is_cancelled=" << (ret_status.IsCancelled() ? "true" : "false")
      << " instance_id=" << state->fragment_instance_id();
  return ret_status;
}

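// Non-merging path: copies rows from input_batch_ into output_batch until the output
// batch is at capacity, the limit is reached, or the senders' stream is exhausted.
// Merging exchanges delegate to GetNextMerging().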
Status ExchangeNode::GetNext(RuntimeState* state, RowBatch* output_batch, bool* eos) {
  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  if (ReachedLimit()) {
    stream_recvr_->TransferAllResources(output_batch);
    *eos = true;
    return Status::OK;
  } else {
    *eos = false;
  }

  if (is_merging_) return GetNextMerging(state, output_batch, eos);

  while (true) {
    {
      SCOPED_TIMER(convert_row_batch_timer_);
      RETURN_IF_CANCELLED(state);
      RETURN_IF_ERROR(QueryMaintenance(state));
      // copy rows until we hit the limit/capacity or until we exhaust input_batch_
      while (!ReachedLimit() && !output_batch->AtCapacity()
          && input_batch_ != NULL && next_row_idx_ < input_batch_->capacity()) {
        TupleRow* src = input_batch_->GetRow(next_row_idx_);
        ++next_row_idx_;
        int j = output_batch->AddRow();
        TupleRow* dest = output_batch->GetRow(j);
        // if the input row is shorter than the output row, make sure not to leave
        // uninitialized Tuple* around
        output_batch->ClearRow(dest);
        // this works as expected if rows from input_batch form a prefix of
        // rows in output_batch
        input_batch_->CopyRow(src, dest);
        output_batch->CommitLastRow();
        ++num_rows_returned_;
      }
      COUNTER_SET(rows_returned_counter_, num_rows_returned_);

      if (ReachedLimit()) {
        stream_recvr_->TransferAllResources(output_batch);
        *eos = true;
        return Status::OK;
      }
      if (output_batch->AtCapacity()) return Status::OK;
    }

    // we need more rows
    stream_recvr_->TransferAllResources(output_batch);
    RETURN_IF_ERROR(FillInputRowBatch(state));
    *eos = (input_batch_ == NULL);
    if (*eos) return Status::OK;
    next_row_idx_ = 0;
    DCHECK(input_batch_->row_desc().IsPrefixOf(output_batch->row_desc()));
  }
}

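// Merging path: the receiver's merger returns rows in sorted order; rows are discarded
// until offset_ rows have been skipped, then the limit is enforced on what remains.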
Status ExchangeNode::GetNextMerging(RuntimeState* state, RowBatch* output_batch,
    bool* eos) {
  DCHECK_EQ(output_batch->num_rows(), 0);
  RETURN_IF_ERROR(stream_recvr_->GetNext(output_batch, eos));

  while ((num_rows_skipped_ < offset_)) {
    num_rows_skipped_ += output_batch->num_rows();
    // Throw away rows in the output batch until the offset is skipped.
    int rows_to_keep = num_rows_skipped_ - offset_;
    if (rows_to_keep > 0) {
      output_batch->CopyRows(0, output_batch->num_rows() - rows_to_keep, rows_to_keep);
      output_batch->set_num_rows(rows_to_keep);
    } else {
      output_batch->set_num_rows(0);
    }
    if (rows_to_keep > 0 || *eos || output_batch->AtCapacity()) break;
    RETURN_IF_ERROR(stream_recvr_->GetNext(output_batch, eos));
  }

  num_rows_returned_ += output_batch->num_rows();
  if (ReachedLimit()) {
    output_batch->set_num_rows(output_batch->num_rows() - (num_rows_returned_ - limit_));
    *eos = true;
  }

  // On eos, transfer all remaining resources from the input batches maintained
  // by the merger to the output batch.
  if (*eos) stream_recvr_->TransferAllResources(output_batch);

  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
  return Status::OK;
}

void ExchangeNode::DebugString(int indentation_level, stringstream* out) const {
  *out << string(indentation_level * 2, ' ');
  *out << "ExchangeNode(#senders=" << num_senders_;
  ExecNode::DebugString(indentation_level, out);
  *out << ")";
}