Impala
Impala is the open source, native analytic database for Apache Hadoop.
#include <data-stream-recvr.h>
Classes
  class SenderQueue
Public Member Functions
  ~DataStreamRecvr ()
  Status GetBatch (RowBatch **next_batch)
  void Close ()
      Deregister from DataStreamMgr instance, which shares ownership of this instance.
  Status CreateMerger (const TupleRowComparator &less_than)
  Status GetNext (RowBatch *output_batch, bool *eos)
  void TransferAllResources (RowBatch *transfer_batch)
  const TUniqueId & fragment_instance_id () const
  PlanNodeId dest_node_id () const
  const RowDescriptor & row_desc () const
  MemTracker * mem_tracker () const
Private Member Functions
  DataStreamRecvr (DataStreamMgr *stream_mgr, MemTracker *parent_tracker, const RowDescriptor &row_desc, const TUniqueId &fragment_instance_id, PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit, RuntimeProfile *profile)
  void AddBatch (const TRowBatch &thrift_batch, int sender_id)
  void RemoveSender (int sender_id)
  void CancelStream ()
      Empties the sender queues and notifies all waiting consumers of cancellation.
  bool ExceedsLimit (int batch_size)
Private Attributes
  DataStreamMgr * mgr_
      DataStreamMgr instance used to create this recvr (not owned).
  TUniqueId fragment_instance_id_
      Fragment and node id of the destination exchange node this receiver is used by.
  PlanNodeId dest_node_id_
  int total_buffer_limit_
  RowDescriptor row_desc_
      Row schema, copied from the caller of CreateRecvr().
  bool is_merging_
  AtomicInt< int > num_buffered_bytes_
      Total number of bytes held across all sender queues.
  boost::scoped_ptr< MemTracker > mem_tracker_
      MemTracker for batches in the sender queue(s).
  std::vector< SenderQueue * > sender_queues_
  boost::scoped_ptr< SortedRunMerger > merger_
      SortedRunMerger used to merge rows from different senders.
  ObjectPool sender_queue_pool_
      Pool of sender queues.
  RuntimeProfile * profile_
      Runtime profile storing the counters below.
  RuntimeProfile::Counter * bytes_received_counter_
      Number of bytes received.
  RuntimeProfile::TimeSeriesCounter * bytes_received_time_series_counter_
      Time series of the number of bytes received; samples bytes_received_counter_.
  RuntimeProfile::Counter * deserialize_row_batch_timer_
  RuntimeProfile::Counter * first_batch_wait_total_timer_
  RuntimeProfile::Counter * buffer_full_total_timer_
  boost::try_mutex buffer_wall_timer_lock_
  RuntimeProfile::Counter * buffer_full_wall_timer_
      Wall time senders spend waiting for the recv buffer to have capacity.
  RuntimeProfile::Counter * data_arrival_timer_
      Total time spent waiting for data to arrive in the recv buffer.
Friends
  class DataStreamMgr
Single receiver of an m:n data stream.
DataStreamRecvr maintains one or more queues of row batches received by a DataStreamMgr from one or more sender fragment instances. Receivers are created via DataStreamMgr::CreateRecvr(). Ownership of a stream recvr is shared between the DataStreamMgr that created it and the caller of DataStreamMgr::CreateRecvr() (i.e. the exchange node).
The is_merging_ member determines whether the recvr merges input streams from different sender fragment instances according to a specified sort order.
If is_merging_ is false: only one batch queue is maintained for row batches from all sender fragment instances. These row batches are returned one at a time via GetBatch().
If is_merging_ is true: one queue is created for the batches from each distinct sender. A SortedRunMerger instance must be created via CreateMerger() prior to retrieving any rows from the receiver. Rows are retrieved from the receiver via GetNext(RowBatch* output_batch, bool* eos). After the final call to GetNext(), TransferAllResources() must be called to transfer resources from the input batches from each sender to the caller's output batch. The receiver sets deep_copy to false on the merger: resources are transferred from the input batches of each sender queue to the merger, and then to the output batch, by the merger itself as it processes each run.
In all cases, DataStreamRecvr::Close() must be called by the caller of CreateRecvr() to remove the recvr instance from the tracking structure of its DataStreamMgr.
Definition at line 60 of file data-stream-recvr.h.
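To make the two modes concrete, the sketch below models only the routing step: with is_merging_ true each sender gets its own queue, otherwise all batches share a single queue that is drained via GetBatch(). `ToyBatch` and `ToyRecvr` are hypothetical stand-ins, not Impala classes; blocking, memory tracking, eos handling and the merger are deliberately left out.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical stand-in for a row batch: just a sender id and a row count.
struct ToyBatch {
  int sender_id;
  int num_rows;
};

// Hypothetical, simplified receiver illustrating queue routing only.
class ToyRecvr {
 public:
  ToyRecvr(int num_senders, bool is_merging)
      : is_merging_(is_merging),
        // Merging: one queue per sender. Non-merging: a single shared queue.
        queues_(is_merging ? num_senders : 1) {}

  // Route the batch to its sender's queue when merging, else to queue 0.
  void AddBatch(const ToyBatch& batch) {
    int idx = is_merging_ ? batch.sender_id : 0;
    queues_[idx].push_back(batch);
  }

  // Non-merging consumers pop from the single shared queue.
  // Returns false when the queue is empty (this toy never blocks).
  bool GetBatch(ToyBatch* out) {
    assert(!is_merging_ && "GetBatch() is only valid when not merging");
    if (queues_[0].empty()) return false;
    *out = queues_[0].front();
    queues_[0].pop_front();
    return true;
  }

  std::size_t num_queues() const { return queues_.size(); }
  std::size_t queue_size(std::size_t idx) const { return queues_[idx].size(); }

 private:
  bool is_merging_;
  std::vector<std::deque<ToyBatch>> queues_;
};
```

In merging mode the per-sender queues keep each sender's sorted run separate, which is what allows a merger to interleave them later.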
impala::DataStreamRecvr::~DataStreamRecvr ()
Definition at line 346 of file data-stream-recvr.cc.
References mem_tracker_, and mgr_.
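impala::DataStreamRecvr::DataStreamRecvr (DataStreamMgr *stream_mgr, MemTracker *parent_tracker, const RowDescriptor &row_desc, const TUniqueId &fragment_instance_id, PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit, RuntimeProfile *profile) [private]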
Definition at line 277 of file data-stream-recvr.cc.
References impala::ObjectPool::Add(), ADD_COUNTER, ADD_TIME_SERIES_COUNTER, ADD_TIMER, buffer_full_total_timer_, buffer_full_wall_timer_, bytes_received_counter_, bytes_received_time_series_counter_, data_arrival_timer_, deserialize_row_batch_timer_, first_batch_wait_total_timer_, impala::RuntimeProfile::inactive_timer(), mem_tracker_, profile_, sender_queue_pool_, and sender_queues_.
void impala::DataStreamRecvr::AddBatch (const TRowBatch &thrift_batch, int sender_id) [private]
Add a new batch of rows to the appropriate sender queue, blocking if the queue is full. Called from DataStreamMgr.
Definition at line 318 of file data-stream-recvr.cc.
References is_merging_, and sender_queues_.
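A minimal sketch of a byte-bounded queue whose AddBatch() blocks until enough space has been drained, assuming a std::condition_variable-based design. `ToyBoundedQueue` and its members are hypothetical; admitting an oversized batch when the queue is empty is an assumption made here so that a single large batch cannot block its producer forever.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical byte-bounded queue: AddBatch() blocks while adding a batch
// would push the buffered total over the limit; GetBatch() frees space.
class ToyBoundedQueue {
 public:
  explicit ToyBoundedQueue(std::size_t limit_bytes) : limit_bytes_(limit_bytes) {
    assert(limit_bytes > 0);
  }

  // Blocks until the batch fits, or until the queue is empty (an assumption of
  // this sketch, so an oversized batch is still admitted eventually).
  void AddBatch(std::size_t batch_bytes) {
    std::unique_lock<std::mutex> lock(mu_);
    space_cv_.wait(lock, [&] {
      return queue_.empty() || buffered_bytes_ + batch_bytes <= limit_bytes_;
    });
    queue_.push_back(batch_bytes);
    buffered_bytes_ += batch_bytes;
    data_cv_.notify_one();
  }

  // Blocks until a batch is available, removes it and wakes blocked producers.
  std::size_t GetBatch() {
    std::unique_lock<std::mutex> lock(mu_);
    data_cv_.wait(lock, [&] { return !queue_.empty(); });
    std::size_t bytes = queue_.front();
    queue_.pop_front();
    buffered_bytes_ -= bytes;
    space_cv_.notify_all();
    return bytes;
  }

  std::size_t buffered_bytes() {
    std::lock_guard<std::mutex> lock(mu_);
    return buffered_bytes_;
  }

 private:
  const std::size_t limit_bytes_;
  std::mutex mu_;
  std::condition_variable space_cv_;  // signalled when bytes are drained
  std::condition_variable data_cv_;   // signalled when a batch arrives
  std::deque<std::size_t> queue_;     // each entry is one batch's byte size
  std::size_t buffered_bytes_ = 0;
};
```

Using notify_all() on the space condition lets every blocked producer re-check whether its own batch now fits, since batches can differ in size.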
void impala::DataStreamRecvr::CancelStream () [private]
Empties the sender queues and notifies all waiting consumers of cancellation.
Definition at line 329 of file data-stream-recvr.cc.
References sender_queues_.
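The cancellation behaviour can be sketched with a cancelled flag that is part of the consumer's wait predicate, plus notify_all() so that every blocked consumer wakes up. `ToyCancellableQueue` is hypothetical, returning false stands in for a cancelled Status, and silently dropping batches that arrive after cancellation is an assumption of this sketch.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical queue showing cancellation: Cancel() drops buffered batches
// and wakes every consumer blocked in GetBatch().
class ToyCancellableQueue {
 public:
  void AddBatch(int batch) {
    std::lock_guard<std::mutex> lock(mu_);
    if (cancelled_) return;  // assumption: late batches are dropped
    queue_.push_back(batch);
    cv_.notify_one();
  }

  // Blocks until a batch is available or the stream is cancelled.
  // Returns false on cancellation (stand-in for a "cancelled" Status).
  bool GetBatch(int* out) {
    assert(out != nullptr);
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [&] { return cancelled_ || !queue_.empty(); });
    if (cancelled_) return false;
    *out = queue_.front();
    queue_.pop_front();
    return true;
  }

  void Cancel() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      cancelled_ = true;
      queue_.clear();  // empty the queue
    }
    cv_.notify_all();  // wake all waiting consumers so they observe cancellation
  }

  bool empty() {
    std::lock_guard<std::mutex> lock(mu_);
    return queue_.empty();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<int> queue_;
  bool cancelled_ = false;
};
```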
void impala::DataStreamRecvr::Close ()
Deregister from DataStreamMgr instance, which shares ownership of this instance.
Definition at line 335 of file data-stream-recvr.cc.
References impala::DataStreamMgr::DeregisterRecvr(), dest_node_id(), fragment_instance_id(), merger_, mgr_, and sender_queues_.
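A sketch of the shared-ownership arrangement, assuming std::shared_ptr as the ownership mechanism. `ToyMgr`, `ToyRecvr` and the integer receiver ids are hypothetical. It illustrates why Close() matters: until the receiver deregisters, the manager's reference keeps it alive even after the caller has dropped its own.

```cpp
#include <cassert>
#include <map>
#include <memory>

// Hypothetical stand-ins: a manager that tracks receivers by id, and a
// receiver whose ownership is shared by the manager and the caller.
class ToyMgr;

class ToyRecvr {
 public:
  ToyRecvr(ToyMgr* mgr, int id) : mgr_(mgr), id_(id) { assert(mgr_ != nullptr); }
  void Close();  // deregister from the manager

 private:
  ToyMgr* mgr_;  // not owned
  int id_;
};

class ToyMgr {
 public:
  // Creates a receiver; the manager keeps one reference, the caller gets another.
  std::shared_ptr<ToyRecvr> CreateRecvr(int id) {
    auto recvr = std::make_shared<ToyRecvr>(this, id);
    recvrs_[id] = recvr;
    return recvr;
  }
  void DeregisterRecvr(int id) { recvrs_.erase(id); }
  bool IsRegistered(int id) const { return recvrs_.count(id) > 0; }

 private:
  std::map<int, std::shared_ptr<ToyRecvr>> recvrs_;
};

// Drops the manager's reference; the caller's reference keeps the object alive.
void ToyRecvr::Close() { mgr_->DeregisterRecvr(id_); }
```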
Status impala::DataStreamRecvr::CreateMerger (const TupleRowComparator &less_than)
Create a SortedRunMerger instance to merge rows from multiple senders according to the specified row comparator. Fetches the first batches from the individual sender queues. The exprs used in less_than must have already been prepared and opened.
Definition at line 253 of file data-stream-recvr.cc.
References impala::DataStreamRecvr::SenderQueue::GetBatch(), is_merging_, merger_, impala::Status::OK, profile_, RETURN_IF_ERROR, row_desc_, and sender_queues_.
PlanNodeId impala::DataStreamRecvr::dest_node_id () const [inline]
Definition at line 89 of file data-stream-recvr.h.
References dest_node_id_.
Referenced by Close(), and impala::DataStreamRecvr::SenderQueue::DecrementSenders().
bool impala::DataStreamRecvr::ExceedsLimit (int batch_size) [inline], [private]
Return true if the addition of a new batch of size 'batch_size' would exceed the total buffer limit.
Definition at line 115 of file data-stream-recvr.h.
References num_buffered_bytes_, and total_buffer_limit_.
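A standalone sketch of the check, assuming the buffered byte count is a std::atomic<int> standing in for Impala's AtomicInt<int>; `ToyBufferAccounting` is hypothetical.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical standalone version of the check described above: would adding
// 'batch_size' bytes push the shared byte count past the buffer limit?
struct ToyBufferAccounting {
  std::atomic<int> num_buffered_bytes{0};
  int total_buffer_limit;

  explicit ToyBufferAccounting(int limit) : total_buffer_limit(limit) {
    assert(limit > 0);
  }

  bool ExceedsLimit(int batch_size) const {
    // The atomic is read once; a concurrent update can make the answer
    // slightly stale, which this sketch tolerates for a soft limit.
    return num_buffered_bytes.load() + batch_size > total_buffer_limit;
  }
};
```

A batch that exactly fills the buffer does not exceed the limit; only one byte more does.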
const TUniqueId& impala::DataStreamRecvr::fragment_instance_id () const [inline]
Definition at line 88 of file data-stream-recvr.h.
References fragment_instance_id_.
Referenced by Close(), and impala::DataStreamRecvr::SenderQueue::DecrementSenders().
Status impala::DataStreamRecvr::GetBatch (RowBatch **next_batch)
Returns the next row batch in the data stream, blocking if none is available. The receiver retains ownership of the returned batch, so the caller must acquire data from it before the next call to GetBatch(). A NULL returned batch indicates eos. Must only be called if is_merging_ is false.
TODO: This is currently only exposed to the non-merging version of the exchange. Refactor so both merging and non-merging exchange use GetNext(RowBatch*, bool* eos).
Definition at line 352 of file data-stream-recvr.cc.
References is_merging_, and sender_queues_.
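The ownership contract (the returned batch stays owned by the receiver, is valid only until the next call, and NULL means eos) can be sketched as follows. `ToyStream`, `ToyRowBatch` and `SumAllRows` are hypothetical, and unlike the real GetBatch() this toy never blocks: it assumes every batch has already arrived.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <utility>
#include <vector>

using ToyRowBatch = std::vector<int>;  // hypothetical: a batch is a list of ints

// Hypothetical non-merging receiver: GetBatch() returns a pointer to a batch
// it still owns. The pointer is only valid until the next GetBatch() call,
// and nullptr signals end of stream (eos).
class ToyStream {
 public:
  void Push(ToyRowBatch batch) { pending_.push_back(std::move(batch)); }

  const ToyRowBatch* GetBatch() {
    if (pending_.empty()) {
      current_.reset();
      return nullptr;  // eos
    }
    // Replacing current_ frees the previously returned batch.
    current_ = std::make_unique<ToyRowBatch>(std::move(pending_.front()));
    pending_.pop_front();
    return current_.get();
  }

 private:
  std::deque<ToyRowBatch> pending_;
  std::unique_ptr<ToyRowBatch> current_;  // batch handed out most recently
};

// Caller side: consume each batch fully before asking for the next one.
int SumAllRows(ToyStream* stream) {
  assert(stream != nullptr);
  int sum = 0;
  for (const ToyRowBatch* batch = stream->GetBatch(); batch != nullptr;
       batch = stream->GetBatch()) {
    for (int v : *batch) sum += v;
  }
  return sum;
}
```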
Status impala::DataStreamRecvr::GetNext (RowBatch *output_batch, bool *eos)
Fill output_batch with the next batch of rows obtained by merging the per-sender input streams. Must only be called if is_merging_ is true.
Definition at line 313 of file data-stream-recvr.cc.
References merger_.
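Merging the per-sender streams is, in general terms, a k-way merge of sorted runs. The sketch below uses a generic min-heap (std::priority_queue) and is not Impala's SortedRunMerger; `ToyMerger`, the int rows and the `capacity` parameter (standing in for the output batch's capacity) are assumptions.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical k-way merge of per-sender sorted runs (each a sorted vector).
// A generic heap-based merge, not Impala's SortedRunMerger.
class ToyMerger {
 public:
  explicit ToyMerger(std::vector<std::vector<int>> runs) : runs_(std::move(runs)) {
    // Prime the heap with the first row of each non-empty run.
    for (std::size_t r = 0; r < runs_.size(); ++r) {
      if (!runs_[r].empty()) heap_.push({runs_[r][0], r, 0});
    }
  }

  // Appends up to 'capacity' rows to *output in sorted order;
  // sets *eos once every run has been fully consumed.
  void GetNext(std::size_t capacity, std::vector<int>* output, bool* eos) {
    assert(output != nullptr && eos != nullptr);
    while (output->size() < capacity && !heap_.empty()) {
      Entry top = heap_.top();
      heap_.pop();
      output->push_back(top.value);
      std::size_t next = top.pos + 1;
      if (next < runs_[top.run].size()) {
        heap_.push({runs_[top.run][next], top.run, next});
      }
    }
    *eos = heap_.empty();
  }

 private:
  struct Entry {
    int value;
    std::size_t run;  // which sender's run
    std::size_t pos;  // index within that run
    bool operator>(const Entry& o) const { return value > o.value; }
  };
  std::vector<std::vector<int>> runs_;
  // Min-heap on value: std::greater puts the smallest value on top.
  std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap_;
};
```

Each output row costs O(log k) heap work for k senders, which is why a heap is the usual choice when the number of input streams is large.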
MemTracker* impala::DataStreamRecvr::mem_tracker () const [inline]
Definition at line 91 of file data-stream-recvr.h.
References mem_tracker_.
void impala::DataStreamRecvr::RemoveSender (int sender_id) [private]
Indicate that a particular sender is done. Delegated to the appropriate sender queue. Called from DataStreamMgr.
Definition at line 324 of file data-stream-recvr.cc.
References is_merging_, and sender_queues_.
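A sketch of per-queue sender bookkeeping, assuming that a queue reports eos once its count of remaining senders reaches zero and its buffer is empty. `ToySenderTrackingQueue` and `TryGetBatch` are hypothetical; the non-blocking TryGetBatch() replaces a blocking GetBatch() so the logic is easy to follow.

```cpp
#include <cassert>
#include <deque>

// Hypothetical single queue that tracks how many senders are still active.
// The stream reaches end-of-stream only when every sender has called
// RemoveSender() and all buffered batches have been consumed.
class ToySenderTrackingQueue {
 public:
  explicit ToySenderTrackingQueue(int num_senders)
      : num_remaining_senders_(num_senders) {
    assert(num_senders > 0);
  }

  void AddBatch(int batch) { queue_.push_back(batch); }

  void RemoveSender() {
    assert(num_remaining_senders_ > 0);
    --num_remaining_senders_;
  }

  // Returns true and a batch if one is buffered; otherwise returns false and
  // sets *eos if no sender remains.
  bool TryGetBatch(int* out, bool* eos) {
    *eos = false;
    if (!queue_.empty()) {
      *out = queue_.front();
      queue_.pop_front();
      return true;
    }
    *eos = (num_remaining_senders_ == 0);
    return false;
  }

 private:
  int num_remaining_senders_;
  std::deque<int> queue_;
};
```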
const RowDescriptor& impala::DataStreamRecvr::row_desc () const [inline]
Definition at line 90 of file data-stream-recvr.h.
References row_desc_.
void impala::DataStreamRecvr::TransferAllResources (RowBatch *transfer_batch)
Transfer all resources from the current batches being processed from each sender queue to the specified batch.
Definition at line 269 of file data-stream-recvr.cc.
References impala::DataStreamRecvr::SenderQueue::current_batch(), sender_queues_, and impala::RowBatch::TransferResourceOwnership().
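A sketch of the transfer step, assuming a batch's resources can be modelled as a list of heap buffers that are moved into the destination batch. `ToyBatch`, `ToySenderQueue` and this TransferResourceOwnership() are hypothetical simplifications, not RowBatch's real interface.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical batch that owns some resources (here: heap-allocated buffers).
struct ToyBatch {
  std::vector<std::unique_ptr<std::string>> buffers;

  // Moves every buffer into 'dest'; this batch no longer owns them.
  void TransferResourceOwnership(ToyBatch* dest) {
    for (auto& buf : buffers) dest->buffers.push_back(std::move(buf));
    buffers.clear();
  }
};

// Hypothetical per-sender queue exposing the batch currently being merged.
struct ToySenderQueue {
  ToyBatch current;
  ToyBatch* current_batch() { return &current; }
};

// Hands the resources of each sender queue's current batch to 'transfer_batch'
// so that rows referencing that memory remain valid after the queues move on.
void TransferAllResources(std::vector<ToySenderQueue>* queues,
                          ToyBatch* transfer_batch) {
  assert(queues != nullptr && transfer_batch != nullptr);
  for (ToySenderQueue& q : *queues) {
    q.current_batch()->TransferResourceOwnership(transfer_batch);
  }
}
```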
friend class DataStreamMgr
Definition at line 94 of file data-stream-recvr.h.
RuntimeProfile::Counter* impala::DataStreamRecvr::buffer_full_total_timer_ [private]
Total time (summed across all threads) spent waiting for the recv buffer to be drained so that new batches can be added. Remote plan fragments are blocked for the same amount of time.
Definition at line 175 of file data-stream-recvr.h.
Referenced by DataStreamRecvr().
RuntimeProfile::Counter* impala::DataStreamRecvr::buffer_full_wall_timer_ [private]
Wall time senders spend waiting for the recv buffer to have capacity.
Definition at line 185 of file data-stream-recvr.h.
Referenced by DataStreamRecvr().
boost::try_mutex impala::DataStreamRecvr::buffer_wall_timer_lock_ [private]
Protects access to buffer_full_wall_timer_. We only want one thread to be running the timer at any time, and we use this try_mutex to enforce this condition. If a thread does not get the lock, it continues to execute, but without running the timer.
Definition at line 182 of file data-stream-recvr.h.
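The try-lock pattern can be sketched as below, using std::mutex with std::try_to_lock in place of boost::try_mutex. `WaitWithOptionalWallTimer` is a hypothetical helper, and the std::atomic<long long> nanosecond total stands in for the RuntimeProfile counter.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <mutex>

// Hypothetical helper: run 'wait' and add its wall time to *wall_ns, but only
// if this thread wins timer_lock. Threads that lose the race still wait, just
// untimed, so overlapping waits are not double counted. std::mutex stands in
// for boost::try_mutex. Returns true if this call was timed.
template <typename WaitFn>
bool WaitWithOptionalWallTimer(std::mutex& timer_lock,
                               std::atomic<long long>* wall_ns, WaitFn wait) {
  assert(wall_ns != nullptr);
  std::unique_lock<std::mutex> lock(timer_lock, std::try_to_lock);
  if (!lock.owns_lock()) {
    wait();  // another thread is already running the timer
    return false;
  }
  auto start = std::chrono::steady_clock::now();
  wait();
  auto elapsed = std::chrono::steady_clock::now() - start;
  *wall_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
  return true;
}
```

Because the lock is held for the whole timed wait, the accumulated value approximates wall time during which at least one sender was blocked, rather than the sum over all senders.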
RuntimeProfile::Counter* impala::DataStreamRecvr::bytes_received_counter_ [private]
Number of bytes received.
Definition at line 160 of file data-stream-recvr.h.
Referenced by DataStreamRecvr().
RuntimeProfile::TimeSeriesCounter* impala::DataStreamRecvr::bytes_received_time_series_counter_ [private]
Time series of the number of bytes received; samples bytes_received_counter_.
Definition at line 163 of file data-stream-recvr.h.
Referenced by DataStreamRecvr().
RuntimeProfile::Counter* impala::DataStreamRecvr::data_arrival_timer_ [private]
Total time spent waiting for data to arrive in the recv buffer.
Definition at line 188 of file data-stream-recvr.h.
Referenced by DataStreamRecvr().
RuntimeProfile::Counter* impala::DataStreamRecvr::deserialize_row_batch_timer_ [private]
Definition at line 165 of file data-stream-recvr.h.
Referenced by DataStreamRecvr().
PlanNodeId impala::DataStreamRecvr::dest_node_id_ [private]
Definition at line 124 of file data-stream-recvr.h.
Referenced by dest_node_id().
RuntimeProfile::Counter* impala::DataStreamRecvr::first_batch_wait_total_timer_ [private]
Time spent waiting until the first batch arrives across all queues. TODO: Turn this into a wall-clock timer.
Definition at line 169 of file data-stream-recvr.h.
Referenced by DataStreamRecvr().
TUniqueId impala::DataStreamRecvr::fragment_instance_id_ [private]
Fragment and node id of the destination exchange node this receiver is used by.
Definition at line 123 of file data-stream-recvr.h.
Referenced by fragment_instance_id().
bool impala::DataStreamRecvr::is_merging_ [private]
True if this receiver merges incoming rows from different senders. Per-sender row batch queues are maintained in this case.
Definition at line 136 of file data-stream-recvr.h.
Referenced by AddBatch(), CreateMerger(), GetBatch(), and RemoveSender().
boost::scoped_ptr<MemTracker> impala::DataStreamRecvr::mem_tracker_ [private]
MemTracker for batches in the sender queue(s).
Definition at line 142 of file data-stream-recvr.h.
Referenced by DataStreamRecvr(), mem_tracker(), and ~DataStreamRecvr().
boost::scoped_ptr<SortedRunMerger> impala::DataStreamRecvr::merger_ [private]
SortedRunMerger used to merge rows from different senders.
Definition at line 151 of file data-stream-recvr.h.
Referenced by Close(), CreateMerger(), and GetNext().
DataStreamMgr* impala::DataStreamRecvr::mgr_ [private]
DataStreamMgr instance used to create this recvr. (Not owned)
Definition at line 120 of file data-stream-recvr.h.
Referenced by Close(), and ~DataStreamRecvr().
AtomicInt<int> impala::DataStreamRecvr::num_buffered_bytes_ [private]
Total number of bytes held across all sender queues.
Definition at line 139 of file data-stream-recvr.h.
Referenced by ExceedsLimit().
RuntimeProfile* impala::DataStreamRecvr::profile_ [private]
Runtime profile storing the counters below.
Definition at line 157 of file data-stream-recvr.h.
Referenced by CreateMerger(), and DataStreamRecvr().
RowDescriptor impala::DataStreamRecvr::row_desc_ [private]
Row schema, copied from the caller of CreateRecvr().
Definition at line 132 of file data-stream-recvr.h.
Referenced by CreateMerger(), and row_desc().
ObjectPool impala::DataStreamRecvr::sender_queue_pool_ [private]
Pool of sender queues.
Definition at line 154 of file data-stream-recvr.h.
Referenced by DataStreamRecvr().
std::vector<SenderQueue*> impala::DataStreamRecvr::sender_queues_ [private]
One or more queues of row batches received from senders. If is_merging_ is true, there is one SenderQueue for each sender. Otherwise, row batches from all senders are placed in the same SenderQueue. The SenderQueue instances are owned by the receiver and placed in sender_queue_pool_.
Definition at line 148 of file data-stream-recvr.h.
Referenced by AddBatch(), CancelStream(), Close(), CreateMerger(), DataStreamRecvr(), GetBatch(), RemoveSender(), and TransferAllResources().
int impala::DataStreamRecvr::total_buffer_limit_ [private]
Soft upper limit on the total amount of buffering allowed for this stream across all sender queues. We stop acking incoming data once the amount of buffered data exceeds this value.
Definition at line 129 of file data-stream-recvr.h.
Referenced by ExceedsLimit().