15 #include <boost/thread/locks.hpp>
16 #include <boost/thread/mutex.hpp>
27 using boost::condition_variable;
50 void AddBatch(
const TRowBatch& batch);
103 : recvr_(parent_recvr),
104 is_cancelled_(false),
105 num_remaining_senders_(num_senders),
106 received_first_batch_(false) {
110 unique_lock<mutex> l(
lock_);
112 while (!is_cancelled_ && batch_queue_.empty() && num_remaining_senders_ > 0) {
113 VLOG_ROW <<
"wait arrival fragment_instance_id=" << recvr_->fragment_instance_id()
114 <<
" node=" << recvr_->dest_node_id();
117 SCOPED_TIMER(received_first_batch_ ? NULL : recvr_->first_batch_wait_total_timer_);
118 data_arrival_cv_.wait(l);
122 current_batch_.reset();
126 if (batch_queue_.empty()) {
127 DCHECK_EQ(num_remaining_senders_, 0);
131 received_first_batch_ =
true;
133 DCHECK(!batch_queue_.empty());
134 RowBatch* result = batch_queue_.front().second;
135 recvr_->num_buffered_bytes_ -= batch_queue_.front().first;
136 VLOG_ROW <<
"fetched #rows=" << result->num_rows();
137 batch_queue_.pop_front();
138 data_removal__cv_.notify_one();
139 current_batch_.reset(result);
140 *next_batch = current_batch_.get();
145 unique_lock<mutex> l(
lock_);
146 if (is_cancelled_)
return;
149 COUNTER_ADD(recvr_->bytes_received_counter_, batch_size);
150 DCHECK_GT(num_remaining_senders_, 0);
159 while (!batch_queue_.empty() && recvr_->ExceedsLimit(batch_size) && !is_cancelled_) {
161 VLOG_ROW <<
" wait removal: empty=" << (batch_queue_.empty() ? 1 : 0)
162 <<
" #buffered=" << recvr_->num_buffered_bytes_
163 <<
" batch_size=" << batch_size <<
"\n";
168 bool got_timer_lock =
false;
170 try_mutex::scoped_try_lock timer_lock(recvr_->buffer_wall_timer_lock_);
173 data_removal__cv_.wait(l);
174 got_timer_lock =
true;
176 data_removal__cv_.wait(l);
177 got_timer_lock =
false;
195 if (got_timer_lock) data_removal__cv_.notify_one();
198 if (!is_cancelled_) {
205 batch =
new RowBatch(recvr_->row_desc(), thrift_batch, recvr_->mem_tracker());
208 <<
" batch_size=" << batch_size <<
"\n";
209 batch_queue_.push_back(make_pair(batch_size, batch));
210 recvr_->num_buffered_bytes_ += batch_size;
211 data_arrival_cv_.notify_one();
216 lock_guard<mutex> l(
lock_);
219 VLOG_FILE <<
"decremented senders: fragment_instance_id="
228 lock_guard<mutex> l(
lock_);
229 if (is_cancelled_)
return;
230 is_cancelled_ =
true;
231 VLOG_QUERY <<
"cancelled stream: fragment_instance_id_="
232 << recvr_->fragment_instance_id()
233 <<
" node_id=" << recvr_->dest_node_id();
237 data_arrival_cv_.notify_all();
238 data_removal__cv_.notify_all();
240 recvr_->bytes_received_time_series_counter_);
245 for (RowBatchQueue::iterator it = batch_queue_.begin();
246 it != batch_queue_.end(); ++it) {
250 current_batch_.reset();
255 vector<SortedRunMerger::RunBatchSupplier> input_batch_suppliers;
262 input_batch_suppliers.push_back(
291 int num_queues = is_merging ? num_senders : 1;
293 int num_sender_per_queue = is_merging ? 1 : num_senders;
294 for (
int i = 0; i < num_queues; ++i) {
296 num_sender_per_queue, profile));
315 return merger_->GetNext(output_batch, eos);
347 DCHECK(
mgr_ == NULL) <<
"Must call Close()";
RuntimeProfile::TimeSeriesCounter * bytes_received_time_series_counter_
Time series of the number of bytes received; samples bytes_received_counter_.
RowDescriptor row_desc_
Row schema, copied from the caller of CreateRecvr().
Status DeregisterRecvr(const TUniqueId &fragment_instance_id, PlanNodeId node_id)
Remove receiver block for fragment_instance_id/node_id from the map.
RuntimeProfile::Counter * first_batch_wait_total_timer_
void RemoveSender(int sender_id)
condition_variable data_removal__cv_
RuntimeProfile * profile_
Runtime profile storing the counters below.
PlanNodeId dest_node_id() const
DataStreamRecvr(DataStreamMgr *stream_mgr, MemTracker *parent_tracker, const RowDescriptor &row_desc, const TUniqueId &fragment_instance_id, PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit, RuntimeProfile *profile)
TUniqueId fragment_instance_id_
Fragment and node id of the destination exchange node this receiver is used by.
Status GetBatch(RowBatch **next_batch)
void CancelStream()
Empties the sender queues and notifies all waiting consumers of cancellation.
#define RETURN_IF_ERROR(stmt)
Some generally useful macros.
boost::mutex lock_
Protects all fields below.
SenderQueue(DataStreamRecvr *parent_recvr, int num_senders, RuntimeProfile *profile)
bool received_first_batch_
#define ADD_TIMER(profile, name)
Counter * inactive_timer()
RowBatchQueue batch_queue_
#define COUNTER_ADD(c, v)
RuntimeProfile::Counter * deserialize_row_batch_timer_
boost::scoped_ptr< MemTracker > mem_tracker_
MemTracker for batches in the sender queue(s).
const TUniqueId & fragment_instance_id() const
AtomicInt< int > num_buffered_bytes_
Total number of bytes held across all sender queues.
int num_remaining_senders_
list< pair< int, RowBatch * > > RowBatchQueue
const RowDescriptor & row_desc() const
#define ADD_COUNTER(profile, name, unit)
void AddBatch(const TRowBatch &batch)
static int GetBatchSize(const TRowBatch &batch)
Utility function: returns total size of batch.
std::vector< SenderQueue * > sender_queues_
scoped_ptr< RowBatch > current_batch_
void TransferResourceOwnership(RowBatch *dest)
RuntimeProfile::Counter * buffer_full_total_timer_
This class is thread-safe.
static const Status CANCELLED
void Close()
Deregister from DataStreamMgr instance, which shares ownership of this instance.
Status CreateMerger(const TupleRowComparator &less_than)
condition_variable data_arrival_cv_
Status GetNext(RowBatch *output_batch, bool *eos)
RuntimeProfile::Counter * bytes_received_counter_
Number of bytes received.
#define ADD_TIME_SERIES_COUNTER(profile, name, src_counter)
void TransferAllResources(RowBatch *transfer_batch)
ObjectPool sender_queue_pool_
Pool of sender queues.
RuntimeProfile::Counter * buffer_full_wall_timer_
Wall time senders spend waiting for the recv buffer to have capacity.
boost::scoped_ptr< SortedRunMerger > merger_
SortedRunMerger used to merge rows from different senders.
void AddBatch(const TRowBatch &thrift_batch, int sender_id)
DataStreamMgr * mgr_
DataStreamMgr instance used to create this recvr. (Not owned)
RuntimeProfile::Counter * data_arrival_timer_
Total time spent waiting for data to arrive in the recv buffer.
static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter *counter)
Stops 'counter' from receiving any more samples.
Status GetBatch(RowBatch **next_batch)
RowBatch * current_batch() const