Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
data-stream-recvr.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef IMPALA_RUNTIME_DATA_STREAM_RECVR_H
16 #define IMPALA_RUNTIME_DATA_STREAM_RECVR_H
17 
18 #include <boost/scoped_ptr.hpp>
19 #include <boost/thread/mutex.hpp>
20 
21 #include "common/object-pool.h"
22 #include "common/status.h"
23 #include "gen-cpp/Types_types.h" // for TUniqueId
24 #include "gen-cpp/Results_types.h" // for TRowBatch
25 #include "runtime/descriptors.h"
26 #include "util/tuple-row-compare.h"
27 
28 namespace impala {
29 
30 class DataStreamMgr;
31 class SortedRunMerger;
32 class MemTracker;
33 class RowBatch;
34 class RuntimeProfile;
35 
42 //
57 //
61  public:
63 
70  Status GetBatch(RowBatch** next_batch);
71 
73  void Close();
74 
78  Status CreateMerger(const TupleRowComparator& less_than);
79 
82  Status GetNext(RowBatch* output_batch, bool* eos);
83 
86  void TransferAllResources(RowBatch* transfer_batch);
87 
88  const TUniqueId& fragment_instance_id() const { return fragment_instance_id_; }
90  const RowDescriptor& row_desc() const { return row_desc_; }
91  MemTracker* mem_tracker() const { return mem_tracker_.get(); }
92 
93  private:
94  friend class DataStreamMgr;
95  class SenderQueue;
96 
97  DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
98  const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id,
99  PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
100  RuntimeProfile* profile);
101 
104  void AddBatch(const TRowBatch& thrift_batch, int sender_id);
105 
108  void RemoveSender(int sender_id);
109 
111  void CancelStream();
112 
115  bool ExceedsLimit(int batch_size) {
116  return num_buffered_bytes_ + batch_size > total_buffer_limit_;
117  }
118 
121 
125 
130 
133 
137 
140 
142  boost::scoped_ptr<MemTracker> mem_tracker_;
143 
148  std::vector<SenderQueue*> sender_queues_;
149 
151  boost::scoped_ptr<SortedRunMerger> merger_;
152 
155 
158 
161 
164 
166 
170 
176 
182  boost::try_mutex buffer_wall_timer_lock_;
183 
186 
189 };
190 
191 }
192 
193 #endif
bool ExceedsLimit(int batch_size)
RuntimeProfile::TimeSeriesCounter * bytes_received_time_series_counter_
Time series of the number of bytes received; samples bytes_received_counter_.
RowDescriptor row_desc_
Row schema, copied from the caller of CreateRecvr().
RuntimeProfile::Counter * first_batch_wait_total_timer_
void RemoveSender(int sender_id)
RuntimeProfile * profile_
Runtime profile storing the counters below.
int PlanNodeId
Definition: global-types.h:26
PlanNodeId dest_node_id() const
DataStreamRecvr(DataStreamMgr *stream_mgr, MemTracker *parent_tracker, const RowDescriptor &row_desc, const TUniqueId &fragment_instance_id, PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit, RuntimeProfile *profile)
TUniqueId fragment_instance_id_
Fragment and node id of the destination exchange node this receiver is used by.
Status GetBatch(RowBatch **next_batch)
void CancelStream()
Empties the sender queues and notifies all waiting consumers of cancellation.
RuntimeProfile::Counter * deserialize_row_batch_timer_
boost::scoped_ptr< MemTracker > mem_tracker_
Memtracker for batches in the sender queue(s).
const TUniqueId & fragment_instance_id() const
AtomicInt< int > num_buffered_bytes_
Total number of bytes held across all sender queues.
const RowDescriptor & row_desc() const
std::vector< SenderQueue * > sender_queues_
RuntimeProfile::Counter * buffer_full_total_timer_
This class is thread-safe.
Definition: mem-tracker.h:61
void Close()
Deregister from DataStreamMgr instance, which shares ownership of this instance.
Status CreateMerger(const TupleRowComparator &less_than)
boost::try_mutex buffer_wall_timer_lock_
Status GetNext(RowBatch *output_batch, bool *eos)
RuntimeProfile::Counter * bytes_received_counter_
Number of bytes received.
void TransferAllResources(RowBatch *transfer_batch)
ObjectPool sender_queue_pool_
Pool of sender queues.
RuntimeProfile::Counter * buffer_full_wall_timer_
Wall time senders spend waiting for the recv buffer to have capacity.
boost::scoped_ptr< SortedRunMerger > merger_
SortedRunMerger used to merge rows from different senders.
void AddBatch(const TRowBatch &thrift_batch, int sender_id)
DataStreamMgr * mgr_
DataStreamMgr instance used to create this recvr. (Not owned)
RuntimeProfile::Counter * data_arrival_timer_
Total time spent waiting for data to arrive in the recv buffer.
MemTracker * mem_tracker() const