Impala
Impala is the open source, native analytic database for Apache Hadoop.
data-stream-recvr.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>

#include "runtime/data-stream-mgr.h"
#include "runtime/data-stream-recvr.h"
#include "runtime/row-batch.h"
#include "runtime/sorted-run-merger.h"
#include "util/runtime-profile.h"
#include "util/periodic-counter-updater.h"

#include "common/names.h"

using boost::condition_variable;

namespace impala {

// Implements a blocking queue of row batches from one or more senders. One queue
// is maintained per sender if is_merging_ is true for the enclosing receiver, otherwise
// rows from all senders are placed in the same queue.
class DataStreamRecvr::SenderQueue {
 public:
  SenderQueue(DataStreamRecvr* parent_recvr, int num_senders, RuntimeProfile* profile);

  // Return the next batch from this sender queue. Sets the returned batch in cur_batch_.
  // A returned batch that is not filled to capacity does *not* indicate
  // end-of-stream.
  // The call blocks until another batch arrives or all senders close
  // their channels. The returned batch is owned by the sender queue. The caller
  // must acquire data from the returned batch before the next call to GetBatch().
  Status GetBatch(RowBatch** next_batch);

  // Adds a row batch to this sender queue if this stream has not been cancelled;
  // blocks if this will make the stream exceed its buffer limit.
  // If the total size of the batches in this queue would exceed the allowed buffer size,
  // the queue is considered full and the call blocks until a batch is dequeued.
  void AddBatch(const TRowBatch& batch);

  // Decrement the number of remaining senders for this queue and signal eos ("new data")
  // if the count drops to 0. The number of senders will be 1 for a merging
  // DataStreamRecvr.
  void DecrementSenders();

  // Set cancellation flag and signal cancellation to receiver and sender. Subsequent
  // incoming batches will be dropped.
  void Cancel();

  // Must be called once to clean up any queued resources.
  void Close();

  // Returns the current batch from this queue being processed by a consumer.
  RowBatch* current_batch() const { return current_batch_.get(); }

 private:
  // Receiver of which this queue is a member.
  DataStreamRecvr* recvr_;

  // Protects all subsequent data.
  mutex lock_;

  // If true, the receiver fragment for this stream got cancelled.
  bool is_cancelled_;

  // Number of senders which haven't closed the channel yet
  // (if it drops to 0, end-of-stream is true).
  int num_remaining_senders_;

  // Signals arrival of a new batch or the eos/cancelled condition.
  condition_variable data_arrival_cv_;

  // Signals removal of data by the stream consumer.
  condition_variable data_removal__cv_;

  // Queue of (batch length, batch) pairs. The SenderQueue owns the memory of these
  // batches; they are handed off to the caller via GetBatch.
  typedef list<pair<int, RowBatch*> > RowBatchQueue;
  RowBatchQueue batch_queue_;

  // The batch that was most recently returned via GetBatch(), i.e. the current batch
  // from this queue being processed by a consumer. Is destroyed when the next batch
  // is retrieved.
  scoped_ptr<RowBatch> current_batch_;

  // Set to true when the first batch has been received.
  bool received_first_batch_;
};
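
// Usage note (illustrative, not part of the original source): GetBatch() hands out a
// batch that remains owned by this SenderQueue and is destroyed on the next call, so a
// consumer must copy the rows it needs, or transfer the batch's resources (e.g. via
// DataStreamRecvr::TransferAllResources()), before asking for the next batch. A batch
// that is not filled to capacity does not by itself signal end-of-stream; eos is
// indicated by GetBatch() returning OK with a NULL batch.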

DataStreamRecvr::SenderQueue::SenderQueue(DataStreamRecvr* parent_recvr, int num_senders,
    RuntimeProfile* profile)
  : recvr_(parent_recvr),
    is_cancelled_(false),
    num_remaining_senders_(num_senders),
    received_first_batch_(false) {
}

Status DataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
  unique_lock<mutex> l(lock_);
  // wait until something shows up or we know we're done
  while (!is_cancelled_ && batch_queue_.empty() && num_remaining_senders_ > 0) {
    VLOG_ROW << "wait arrival fragment_instance_id=" << recvr_->fragment_instance_id()
             << " node=" << recvr_->dest_node_id();
    // Don't count time spent waiting on the sender as active time.
    SCOPED_TIMER(recvr_->data_arrival_timer_);
    SCOPED_TIMER(received_first_batch_ ? NULL : recvr_->first_batch_wait_total_timer_);
    data_arrival_cv_.wait(l);
  }

  // cur_batch_ must be replaced with the returned batch.
  current_batch_.reset();
  *next_batch = NULL;
  if (is_cancelled_) return Status::CANCELLED;

  if (batch_queue_.empty()) {
    DCHECK_EQ(num_remaining_senders_, 0);
    return Status::OK;
  }

  received_first_batch_ = true;

  DCHECK(!batch_queue_.empty());
  RowBatch* result = batch_queue_.front().second;
  recvr_->num_buffered_bytes_ -= batch_queue_.front().first;
  VLOG_ROW << "fetched #rows=" << result->num_rows();
  batch_queue_.pop_front();
  data_removal__cv_.notify_one();
  current_batch_.reset(result);
  *next_batch = current_batch_.get();
  return Status::OK;
}

void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
  unique_lock<mutex> l(lock_);
  if (is_cancelled_) return;

  int batch_size = RowBatch::GetBatchSize(thrift_batch);
  COUNTER_ADD(recvr_->bytes_received_counter_, batch_size);
  DCHECK_GT(num_remaining_senders_, 0);

  // If there's something in the queue and this batch will push us over the
  // buffer limit, we need to wait until a batch gets drained.
  // Note: It's important that we enqueue thrift_batch regardless of buffer limit if
  // the queue is currently empty. In the case of a merging receiver, batches are
  // received from a specific queue based on data order, and the pipeline will stall
  // if the merger is waiting for data from an empty queue that cannot be filled because
  // the limit has been reached.
  while (!batch_queue_.empty() && recvr_->ExceedsLimit(batch_size) && !is_cancelled_) {
    SCOPED_TIMER(recvr_->buffer_full_total_timer_);
    VLOG_ROW << " wait removal: empty=" << (batch_queue_.empty() ? 1 : 0)
             << " #buffered=" << recvr_->num_buffered_bytes_
             << " batch_size=" << batch_size << "\n";

    // We only want one thread running the timer at any one time. Only
    // one thread may lock the try_lock, and that 'winner' starts the
    // scoped timer.
    bool got_timer_lock = false;
    {
      try_mutex::scoped_try_lock timer_lock(recvr_->buffer_wall_timer_lock_);
      if (timer_lock) {
        SCOPED_TIMER(recvr_->buffer_full_wall_timer_);
        data_removal__cv_.wait(l);
        got_timer_lock = true;
      } else {
        data_removal__cv_.wait(l);
        got_timer_lock = false;
      }
    }
    // If we had the timer lock, wake up another writer to make sure
    // that it (if no one else) starts the timer. The guarantee is
    // that if no thread has the try_lock, the thread that we wake up
    // here will obtain it and run the timer.
    //
    // We must have given up the try_lock by this point, otherwise the
    // woken thread might not successfully take the lock once it has
    // woken up. (In fact, no other thread will run in AddBatch until
    // this thread exits because of mutual exclusion around lock_, but
    // it's good not to rely on that fact.)
    //
    // The timer may therefore be an underestimate by the amount of
    // time it takes this thread to finish (and yield lock_) and the
    // notified thread to be woken up and to acquire the try_lock. In
    // practice, this time is small relative to the total wait time.
    if (got_timer_lock) data_removal__cv_.notify_one();
  }

  if (!is_cancelled_) {
    RowBatch* batch = NULL;
    {
      SCOPED_TIMER(recvr_->deserialize_row_batch_timer_);
      // Note: if this function makes a row batch, the batch *must* be added
      // to batch_queue_. It is not valid to create the row batch and destroy
      // it in this thread.
      batch = new RowBatch(recvr_->row_desc(), thrift_batch, recvr_->mem_tracker());
    }
    VLOG_ROW << "added #rows=" << batch->num_rows()
             << " batch_size=" << batch_size << "\n";
    batch_queue_.push_back(make_pair(batch_size, batch));
    recvr_->num_buffered_bytes_ += batch_size;
    data_arrival_cv_.notify_one();
  }
}

void DataStreamRecvr::SenderQueue::DecrementSenders() {
  lock_guard<mutex> l(lock_);
  DCHECK_GT(num_remaining_senders_, 0);
  num_remaining_senders_--;
  VLOG_FILE << "decremented senders: fragment_instance_id="
            << recvr_->fragment_instance_id()
            << " node_id=" << recvr_->dest_node_id()
            << " #senders=" << num_remaining_senders_;
  if (num_remaining_senders_ == 0) data_arrival_cv_.notify_one();
}

void DataStreamRecvr::SenderQueue::Cancel() {
  {
    lock_guard<mutex> l(lock_);
    if (is_cancelled_) return;
    is_cancelled_ = true;
    VLOG_QUERY << "cancelled stream: fragment_instance_id_="
               << recvr_->fragment_instance_id()
               << " node_id=" << recvr_->dest_node_id();
  }
  // Wake up all threads waiting to produce/consume batches. They will all
  // notice that the stream is cancelled and handle it.
  data_arrival_cv_.notify_all();
  data_removal__cv_.notify_all();
  PeriodicCounterUpdater::StopTimeSeriesCounter(
      recvr_->bytes_received_time_series_counter_);
}

void DataStreamRecvr::SenderQueue::Close() {
  // Delete any batches queued in batch_queue_
  for (RowBatchQueue::iterator it = batch_queue_.begin();
      it != batch_queue_.end(); ++it) {
    delete it->second;
  }

  current_batch_.reset();
}

Status DataStreamRecvr::CreateMerger(const TupleRowComparator& less_than) {
  DCHECK(is_merging_);
  vector<SortedRunMerger::RunBatchSupplier> input_batch_suppliers;
  input_batch_suppliers.reserve(sender_queues_.size());

  // Create the merger that will produce a single stream of sorted rows.
  merger_.reset(new SortedRunMerger(less_than, &row_desc_, profile_, false));

  for (int i = 0; i < sender_queues_.size(); ++i) {
    input_batch_suppliers.push_back(
        bind(mem_fn(&SenderQueue::GetBatch), sender_queues_[i], _1));
  }
  RETURN_IF_ERROR(merger_->Prepare(input_batch_suppliers));
  return Status::OK;
}
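
// Illustrative note (not part of the original source): on the merging path, the
// exchange node is expected to call CreateMerger() once with its sort comparator and
// then pull rows via GetNext() instead of GetBatch(). A minimal sketch, assuming a
// 'recvr' pointer and an 'output_batch' owned by the caller:
//
//   RETURN_IF_ERROR(recvr->CreateMerger(less_than));
//   bool eos = false;
//   while (!eos) {
//     RETURN_IF_ERROR(recvr->GetNext(output_batch, &eos));
//     // ... consume output_batch (hypothetical caller-side processing) ...
//   }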

void DataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) {
  BOOST_FOREACH(SenderQueue* sender_queue, sender_queues_) {
    if (sender_queue->current_batch() != NULL) {
      sender_queue->current_batch()->TransferResourceOwnership(transfer_batch);
    }
  }
}

DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
    const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id,
    PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
    RuntimeProfile* profile)
  : mgr_(stream_mgr),
    fragment_instance_id_(fragment_instance_id),
    dest_node_id_(dest_node_id),
    total_buffer_limit_(total_buffer_limit),
    row_desc_(row_desc),
    is_merging_(is_merging),
    num_buffered_bytes_(0),
    profile_(profile) {
  mem_tracker_.reset(new MemTracker(-1, -1, "DataStreamRecvr", parent_tracker));
  // Create one queue per sender if is_merging is true.
  int num_queues = is_merging ? num_senders : 1;
  sender_queues_.reserve(num_queues);
  int num_sender_per_queue = is_merging ? 1 : num_senders;
  for (int i = 0; i < num_queues; ++i) {
    SenderQueue* queue = sender_queue_pool_.Add(new SenderQueue(this,
        num_sender_per_queue, profile));
    sender_queues_.push_back(queue);
  }

  // Initialize the counters
  bytes_received_counter_ = ADD_COUNTER(profile_, "BytesReceived", TUnit::BYTES);
  bytes_received_time_series_counter_ = ADD_TIME_SERIES_COUNTER(
      profile_, "BytesReceived", bytes_received_counter_);
  deserialize_row_batch_timer_ = ADD_TIMER(profile_, "DeserializeRowBatchTimer");
  buffer_full_wall_timer_ = ADD_TIMER(profile_, "SendersBlockedTimer");
  buffer_full_total_timer_ = ADD_TIMER(profile_, "SendersBlockedTotalTimer(*)");
  data_arrival_timer_ = ADD_TIMER(profile_, "DataArrivalWaitTime");
  first_batch_wait_total_timer_ = ADD_TIMER(profile_, "FirstBatchArrivalWaitTime");
}

Status DataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
  DCHECK_NOTNULL(merger_.get());
  return merger_->GetNext(output_batch, eos);
}

void DataStreamRecvr::AddBatch(const TRowBatch& thrift_batch, int sender_id) {
  int use_sender_id = is_merging_ ? sender_id : 0;
  // Add all batches to the same queue if is_merging_ is false.
  sender_queues_[use_sender_id]->AddBatch(thrift_batch);
}

void DataStreamRecvr::RemoveSender(int sender_id) {
  int use_sender_id = is_merging_ ? sender_id : 0;
  sender_queues_[use_sender_id]->DecrementSenders();
}

void DataStreamRecvr::CancelStream() {
  for (int i = 0; i < sender_queues_.size(); ++i) {
    sender_queues_[i]->Cancel();
  }
}

void DataStreamRecvr::Close() {
  for (int i = 0; i < sender_queues_.size(); ++i) {
    sender_queues_[i]->Close();
  }
  // Remove this receiver from the DataStreamMgr that created it.
  // TODO: log error msg
  mgr_->DeregisterRecvr(fragment_instance_id(), dest_node_id());
  mgr_ = NULL;
  merger_.reset();
}

DataStreamRecvr::~DataStreamRecvr() {
  DCHECK(mgr_ == NULL) << "Must call Close()";
  mem_tracker_->UnregisterFromParent();
  mem_tracker_.reset();
}

Status DataStreamRecvr::GetBatch(RowBatch** next_batch) {
  DCHECK(!is_merging_);
  DCHECK_EQ(sender_queues_.size(), 1);
  return sender_queues_[0]->GetBatch(next_batch);
}

}
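
The listing above exposes two consumption paths: a merging receiver is driven through CreateMerger() and GetNext(), while a non-merging receiver hands batches out directly through GetBatch(). The sketch below illustrates the non-merging path under stated assumptions: the receiver is presumed to come from the DataStreamMgr elsewhere in the runtime, and ProcessRows() is a hypothetical caller-side helper, not an API defined in this file.

// Illustrative only: drain a non-merging DataStreamRecvr until end-of-stream.
Status DrainReceiver(DataStreamRecvr* recvr) {
  RowBatch* batch = NULL;
  while (true) {
    RETURN_IF_ERROR(recvr->GetBatch(&batch));
    if (batch == NULL) break;  // eos: all senders have closed their channels
    ProcessRows(batch);        // hypothetical consumer; 'batch' stays owned by the recvr
  }
  recvr->Close();              // deregisters the receiver from its DataStreamMgr
  return Status::OK;
}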