Impala
Impala is the open source, native analytic database for Apache Hadoop.
impala::DataStreamRecvr Class Reference

#include <data-stream-recvr.h>

Classes

class  SenderQueue
 

Public Member Functions

 ~DataStreamRecvr ()
 
Status GetBatch (RowBatch **next_batch)
 
void Close ()
 Deregister from DataStreamMgr instance, which shares ownership of this instance.
 
Status CreateMerger (const TupleRowComparator &less_than)
 
Status GetNext (RowBatch *output_batch, bool *eos)
 
void TransferAllResources (RowBatch *transfer_batch)
 
const TUniqueId & fragment_instance_id () const
 
PlanNodeId dest_node_id () const
 
const RowDescriptor & row_desc () const
 
MemTracker * mem_tracker () const
 

Private Member Functions

 DataStreamRecvr (DataStreamMgr *stream_mgr, MemTracker *parent_tracker, const RowDescriptor &row_desc, const TUniqueId &fragment_instance_id, PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit, RuntimeProfile *profile)
 
void AddBatch (const TRowBatch &thrift_batch, int sender_id)
 
void RemoveSender (int sender_id)
 
void CancelStream ()
 Empties the sender queues and notifies all waiting consumers of cancellation.
 
bool ExceedsLimit (int batch_size)
 

Private Attributes

DataStreamMgr * mgr_
 DataStreamMgr instance used to create this recvr. (Not owned)
 
TUniqueId fragment_instance_id_
 Fragment and node id of the destination exchange node this receiver is used by.
 
PlanNodeId dest_node_id_
 
int total_buffer_limit_
 
RowDescriptor row_desc_
 Row schema, copied from the caller of CreateRecvr().
 
bool is_merging_
 
AtomicInt< int > num_buffered_bytes_
 Total number of bytes held across all sender queues.
 
boost::scoped_ptr< MemTracker > mem_tracker_
 MemTracker for batches in the sender queue(s).
 
std::vector< SenderQueue * > sender_queues_
 
boost::scoped_ptr< SortedRunMerger > merger_
 SortedRunMerger used to merge rows from different senders.
 
ObjectPool sender_queue_pool_
 Pool of sender queues.
 
RuntimeProfile * profile_
 Runtime profile storing the counters below.
 
RuntimeProfile::Counter * bytes_received_counter_
 Number of bytes received.
 
RuntimeProfile::TimeSeriesCounter * bytes_received_time_series_counter_
 Time series of number of bytes received, samples bytes_received_counter_.
 
RuntimeProfile::Counter * deserialize_row_batch_timer_
 
RuntimeProfile::Counter * first_batch_wait_total_timer_
 
RuntimeProfile::Counter * buffer_full_total_timer_
 
boost::try_mutex buffer_wall_timer_lock_
 
RuntimeProfile::Counter * buffer_full_wall_timer_
 Wall time senders spend waiting for the recv buffer to have capacity.
 
RuntimeProfile::Counter * data_arrival_timer_
 Total time spent waiting for data to arrive in the recv buffer.
 

Friends

class DataStreamMgr
 

Detailed Description

Single receiver of an m:n data stream. DataStreamRecvr maintains one or more queues of row batches received by a DataStreamMgr from one or more sender fragment instances. Receivers are created via DataStreamMgr::CreateRecvr(). Ownership of a stream recvr is shared between the DataStreamMgr that created it and the caller of DataStreamMgr::CreateRecvr() (i.e. the exchange node).

The is_merging_ member determines whether the recvr merges input streams from different sender fragment instances according to a specified sort order.

If is_merging_ is false: only one batch queue is maintained for row batches from all sender fragment instances. These row batches are returned one at a time via GetBatch().

If is_merging_ is true: one queue is created for the batches from each distinct sender. A SortedRunMerger instance must be created via CreateMerger() prior to retrieving any rows from the receiver. Rows are retrieved from the receiver via GetNext(RowBatch* output_batch, bool* eos). After the final call to GetNext(), TransferAllResources() must be called to transfer resources from the input batches from each sender to the caller's output batch. The receiver sets deep_copy to false on the merger, so resources are transferred from the input batches in each sender queue to the merger and then to the output batch by the merger itself as it processes each run.

In all cases, DataStreamRecvr::Close() must be called by the caller of CreateRecvr() to remove the recvr instance from the tracking structure of its DataStreamMgr.

Definition at line 60 of file data-stream-recvr.h.
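
The merging mode described above amounts to a k-way merge over per-sender sorted runs. The following is a minimal sketch of that idea only, not Impala's SortedRunMerger: the names Run and MergeRuns are hypothetical, rows are modeled as plain ints, and each run is assumed to be already sorted by the sender.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical stand-in: one sorted run of rows per sender queue.
using Run = std::vector<int>;

// Merges the per-sender runs into a single sorted output using a min-heap.
std::vector<int> MergeRuns(const std::vector<Run>& runs) {
  // Heap entry: (row value, index of the run it came from).
  using Entry = std::pair<int, std::size_t>;
  std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap;
  std::vector<std::size_t> next(runs.size(), 0);  // read position per run

  // Seed the heap with the first row of every non-empty run.
  for (std::size_t i = 0; i < runs.size(); ++i) {
    assert(std::is_sorted(runs[i].begin(), runs[i].end()));
    if (!runs[i].empty()) {
      heap.push(Entry(runs[i][0], i));
      next[i] = 1;
    }
  }

  std::vector<int> out;
  while (!heap.empty()) {
    const Entry top = heap.top();
    heap.pop();
    out.push_back(top.first);
    // Refill from the run that just supplied the smallest row.
    const std::size_t run = top.second;
    if (next[run] < runs[run].size()) {
      heap.push(Entry(runs[run][next[run]], run));
      ++next[run];
    }
  }
  return out;
}
```

In the non-merging mode no such step is needed, because batches from all senders share one queue and are handed out in arrival order.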

Constructor & Destructor Documentation

impala::DataStreamRecvr::~DataStreamRecvr ( )

Definition at line 346 of file data-stream-recvr.cc.

References mem_tracker_, and mgr_.

impala::DataStreamRecvr::DataStreamRecvr ( DataStreamMgr *  stream_mgr,
MemTracker *  parent_tracker,
const RowDescriptor &  row_desc,
const TUniqueId &  fragment_instance_id,
PlanNodeId  dest_node_id,
int  num_senders,
bool  is_merging,
int  total_buffer_limit,
RuntimeProfile *  profile
)
private

Member Function Documentation

void impala::DataStreamRecvr::AddBatch ( const TRowBatch &  thrift_batch,
int  sender_id 
)
private

Add a new batch of rows to the appropriate sender queue, blocking if the queue is full. Called from DataStreamMgr.

Definition at line 318 of file data-stream-recvr.cc.

References is_merging_, and sender_queues_.

void impala::DataStreamRecvr::CancelStream ( )
private

Empties the sender queues and notifies all waiting consumers of cancellation.

Definition at line 329 of file data-stream-recvr.cc.

References sender_queues_.

void impala::DataStreamRecvr::Close ( )

Deregister from DataStreamMgr instance, which shares ownership of this instance.

Definition at line 335 of file data-stream-recvr.cc.

References impala::DataStreamMgr::DeregisterRecvr(), dest_node_id(), fragment_instance_id(), merger_, mgr_, and sender_queues_.

Status impala::DataStreamRecvr::CreateMerger ( const TupleRowComparator &  less_than)

Create a SortedRunMerger instance to merge rows from multiple senders according to the specified row comparator. Fetches the first batches from the individual sender queues. The exprs used in less_than must have already been prepared and opened.

Definition at line 253 of file data-stream-recvr.cc.

References impala::DataStreamRecvr::SenderQueue::GetBatch(), is_merging_, merger_, impala::Status::OK, profile_, RETURN_IF_ERROR, row_desc_, and sender_queues_.

PlanNodeId impala::DataStreamRecvr::dest_node_id ( ) const
inline

Definition at line 89 of file data-stream-recvr.h.

References dest_node_id_.

Referenced by Close(), and impala::DataStreamRecvr::SenderQueue::DecrementSenders().

bool impala::DataStreamRecvr::ExceedsLimit ( int  batch_size)
inline private

Return true if the addition of a new batch of size 'batch_size' would exceed the total buffer limit.

Definition at line 115 of file data-stream-recvr.h.

References num_buffered_bytes_, and total_buffer_limit_.
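
As a rough illustration of the check described above, the sketch below models the two referenced members in a hypothetical BufferAccounting struct. It assumes a strict "greater than" comparison, which is one reading of "would exceed"; std::atomic<int> stands in for AtomicInt<int>, and the Add helper is invented for the example.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical model of the receiver's buffer accounting. Member names mirror
// the documented ones, but this is not Impala's implementation.
struct BufferAccounting {
  std::atomic<int> num_buffered_bytes{0};  // bytes held across all queues
  int total_buffer_limit;                  // soft upper limit in bytes

  explicit BufferAccounting(int limit) : total_buffer_limit(limit) {
    assert(limit >= 0);
  }

  // True if adding a batch of 'batch_size' bytes would push the buffered
  // total past the limit (assumed to be a strict comparison).
  bool ExceedsLimit(int batch_size) const {
    return num_buffered_bytes.load() + batch_size > total_buffer_limit;
  }

  // Illustrative helper: records that a batch has been buffered.
  void Add(int batch_size) { num_buffered_bytes += batch_size; }
};
```

Because the limit is documented as soft, a caller in this model would typically block or stop acknowledging senders when ExceedsLimit() returns true rather than reject the batch outright.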

const TUniqueId& impala::DataStreamRecvr::fragment_instance_id ( ) const
inline

Status impala::DataStreamRecvr::GetBatch ( RowBatch **  next_batch)

Returns the next row batch in the data stream; blocks if there aren't any. The receiver retains ownership of the returned batch. The caller must acquire data from the returned batch before the next call to GetBatch(). A NULL returned batch indicates eos. Must only be called if is_merging_ is false. TODO: This is currently only exposed to the non-merging version of the exchange. Refactor so both merging and non-merging exchange use GetNext(RowBatch*, bool* eos).

Definition at line 352 of file data-stream-recvr.cc.

References is_merging_, and sender_queues_.
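
The GetBatch() contract above (receiver-owned batch, NULL at end of stream) suggests a simple consumer loop. The sketch below uses hypothetical stand-ins, Batch and FakeRecvr, rather than the real RowBatch and DataStreamRecvr; the bool return value stands in for Status and always reports success, and nothing blocks in this model.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

struct Batch { int num_rows; };  // hypothetical stand-in for RowBatch

// Hypothetical stand-in for a non-merging receiver. It owns its batches and
// signals end of stream by handing back a null pointer.
class FakeRecvr {
 public:
  explicit FakeRecvr(std::vector<Batch> batches)
      : batches_(std::move(batches)) {}

  // Sets *next_batch to the next batch, or to nullptr at end of stream.
  bool GetBatch(Batch** next_batch) {
    assert(next_batch != nullptr);
    *next_batch = pos_ < batches_.size() ? &batches_[pos_++] : nullptr;
    return true;
  }

 private:
  std::vector<Batch> batches_;
  std::size_t pos_ = 0;
};

// Drains the receiver, consuming each batch before asking for the next one.
int CountRows(FakeRecvr& recvr) {
  int total = 0;
  Batch* batch = nullptr;
  while (recvr.GetBatch(&batch) && batch != nullptr) {
    total += batch->num_rows;  // the batch is still owned by the receiver
  }
  return total;
}
```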

Status impala::DataStreamRecvr::GetNext ( RowBatch *  output_batch,
bool *  eos
)

Fill output_batch with the next batch of rows obtained by merging the per-sender input streams. Must only be called if is_merging_ is true.

Definition at line 313 of file data-stream-recvr.cc.

References merger_.

MemTracker* impala::DataStreamRecvr::mem_tracker ( ) const
inline

Definition at line 91 of file data-stream-recvr.h.

References mem_tracker_.

void impala::DataStreamRecvr::RemoveSender ( int  sender_id)
private

Indicate that a particular sender is done. Delegated to the appropriate sender queue. Called from DataStreamMgr.

Definition at line 324 of file data-stream-recvr.cc.

References is_merging_, and sender_queues_.

const RowDescriptor& impala::DataStreamRecvr::row_desc ( ) const
inline

Definition at line 90 of file data-stream-recvr.h.

References row_desc_.

void impala::DataStreamRecvr::TransferAllResources ( RowBatch *  transfer_batch)

Transfer all resources from the current batches being processed from each sender queue to the specified batch.

Definition at line 269 of file data-stream-recvr.cc.

References impala::DataStreamRecvr::SenderQueue::current_batch(), sender_queues_, and impala::RowBatch::TransferResourceOwnership().

Friends And Related Function Documentation

friend class DataStreamMgr
friend

Definition at line 94 of file data-stream-recvr.h.

Member Data Documentation

RuntimeProfile::Counter* impala::DataStreamRecvr::buffer_full_total_timer_
private

Total time (summed across all threads) spent waiting for the recv buffer to be drained so that new batches can be added. Remote plan fragments are blocked for the same amount of time.

Definition at line 175 of file data-stream-recvr.h.

Referenced by DataStreamRecvr().

RuntimeProfile::Counter* impala::DataStreamRecvr::buffer_full_wall_timer_
private

Wall time senders spend waiting for the recv buffer to have capacity.

Definition at line 185 of file data-stream-recvr.h.

Referenced by DataStreamRecvr().

boost::try_mutex impala::DataStreamRecvr::buffer_wall_timer_lock_
private

Protects access to buffer_full_wall_timer_. We only want one thread to be running the timer at any time, and we use this try_mutex to enforce this condition. If a thread does not get the lock, it continues to execute, but without running the timer.

Definition at line 182 of file data-stream-recvr.h.
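
The "only one thread runs the timer" pattern can be sketched as follows. This is an illustration, not the code in data-stream-recvr.cc: it swaps in a std::atomic<bool> exchange for boost::try_mutex so the example needs no extra libraries, and the names WallTimer, busy, elapsed_ns, and TimedWait are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical model of a wall-clock timer that at most one waiter may run.
struct WallTimer {
  std::atomic<bool> busy{false};  // plays the role of the try-lock
  std::int64_t elapsed_ns = 0;    // plays the role of the wall timer counter

  // Runs 'wait'. The duration is charged to elapsed_ns only if no other
  // caller is timing right now. Returns whether this call ran the timer.
  template <typename Fn>
  bool TimedWait(Fn wait) {
    // exchange() returns the previous value: true means someone else is
    // already timing, so this caller proceeds without running the timer.
    if (busy.exchange(true)) {
      wait();
      return false;
    }
    const auto start = std::chrono::steady_clock::now();
    wait();
    const auto end = std::chrono::steady_clock::now();
    const std::int64_t delta =
        std::chrono::duration_cast<std::chrono::nanoseconds>(end - start)
            .count();
    assert(delta >= 0);  // steady_clock never goes backwards
    elapsed_ns += delta;
    busy.store(false);  // release so a later waiter can time again
    return true;
  }
};
```

Overlapping waits are counted once, which is why this measures wall time rather than the per-thread sum tracked by buffer_full_total_timer_.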

RuntimeProfile::Counter* impala::DataStreamRecvr::bytes_received_counter_
private

Number of bytes received.

Definition at line 160 of file data-stream-recvr.h.

Referenced by DataStreamRecvr().

RuntimeProfile::TimeSeriesCounter* impala::DataStreamRecvr::bytes_received_time_series_counter_
private

Time series of number of bytes received, samples bytes_received_counter_.

Definition at line 163 of file data-stream-recvr.h.

Referenced by DataStreamRecvr().

RuntimeProfile::Counter* impala::DataStreamRecvr::data_arrival_timer_
private

Total time spent waiting for data to arrive in the recv buffer.

Definition at line 188 of file data-stream-recvr.h.

Referenced by DataStreamRecvr().

RuntimeProfile::Counter* impala::DataStreamRecvr::deserialize_row_batch_timer_
private

Definition at line 165 of file data-stream-recvr.h.

Referenced by DataStreamRecvr().

PlanNodeId impala::DataStreamRecvr::dest_node_id_
private

Definition at line 124 of file data-stream-recvr.h.

Referenced by dest_node_id().

RuntimeProfile::Counter* impala::DataStreamRecvr::first_batch_wait_total_timer_
private

Time spent waiting until the first batch arrives across all queues. TODO: Turn this into a wall-clock timer.

Definition at line 169 of file data-stream-recvr.h.

Referenced by DataStreamRecvr().

TUniqueId impala::DataStreamRecvr::fragment_instance_id_
private

Fragment and node id of the destination exchange node this receiver is used by.

Definition at line 123 of file data-stream-recvr.h.

Referenced by fragment_instance_id().

bool impala::DataStreamRecvr::is_merging_
private

True if this receiver merges incoming rows from different senders. Per-sender row batch queues are maintained in this case.

Definition at line 136 of file data-stream-recvr.h.

Referenced by AddBatch(), CreateMerger(), GetBatch(), and RemoveSender().

boost::scoped_ptr<MemTracker> impala::DataStreamRecvr::mem_tracker_
private

MemTracker for batches in the sender queue(s).

Definition at line 142 of file data-stream-recvr.h.

Referenced by DataStreamRecvr(), mem_tracker(), and ~DataStreamRecvr().

boost::scoped_ptr<SortedRunMerger> impala::DataStreamRecvr::merger_
private

SortedRunMerger used to merge rows from different senders.

Definition at line 151 of file data-stream-recvr.h.

Referenced by Close(), CreateMerger(), and GetNext().

DataStreamMgr* impala::DataStreamRecvr::mgr_
private

DataStreamMgr instance used to create this recvr. (Not owned)

Definition at line 120 of file data-stream-recvr.h.

Referenced by Close(), and ~DataStreamRecvr().

AtomicInt<int> impala::DataStreamRecvr::num_buffered_bytes_
private

Total number of bytes held across all sender queues.

Definition at line 139 of file data-stream-recvr.h.

Referenced by ExceedsLimit().

RuntimeProfile* impala::DataStreamRecvr::profile_
private

Runtime profile storing the counters below.

Definition at line 157 of file data-stream-recvr.h.

Referenced by CreateMerger(), and DataStreamRecvr().

RowDescriptor impala::DataStreamRecvr::row_desc_
private

Row schema, copied from the caller of CreateRecvr().

Definition at line 132 of file data-stream-recvr.h.

Referenced by CreateMerger(), and row_desc().

ObjectPool impala::DataStreamRecvr::sender_queue_pool_
private

Pool of sender queues.

Definition at line 154 of file data-stream-recvr.h.

Referenced by DataStreamRecvr().

std::vector<SenderQueue*> impala::DataStreamRecvr::sender_queues_
private

One or more queues of row batches received from senders. If is_merging_ is true, there is one SenderQueue for each sender. Otherwise, row batches from all senders are placed in the same SenderQueue. The SenderQueue instances are owned by the receiver and placed in sender_queue_pool_.

Definition at line 148 of file data-stream-recvr.h.

Referenced by AddBatch(), CancelStream(), Close(), CreateMerger(), DataStreamRecvr(), GetBatch(), RemoveSender(), and TransferAllResources().
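
The queue layout described above can be summarized by two small helpers. This is a sketch under assumptions: QueueCount and QueueIndex are hypothetical names, and routing a batch by its sender_id in merging mode is inferred from the description rather than taken from the source file.

```cpp
#include <cassert>
#include <cstddef>

// Number of sender queues a receiver would hold: one per sender when
// merging, otherwise a single shared queue.
std::size_t QueueCount(int num_senders, bool is_merging) {
  assert(num_senders > 0);
  return is_merging ? static_cast<std::size_t>(num_senders) : 1;
}

// Index of the queue that receives a batch from 'sender_id' (assumed
// routing): the sender's own queue when merging, otherwise queue 0.
std::size_t QueueIndex(int sender_id, bool is_merging) {
  assert(sender_id >= 0);
  return is_merging ? static_cast<std::size_t>(sender_id) : 0;
}
```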

int impala::DataStreamRecvr::total_buffer_limit_
private

Soft upper limit on the total amount of buffering allowed for this stream across all sender queues. We stop acking incoming data once the amount of buffered data exceeds this value.

Definition at line 129 of file data-stream-recvr.h.

Referenced by ExceedsLimit().


The documentation for this class was generated from the following files: