Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
data-stream-sender.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_RUNTIME_DATA_STREAM_SENDER_H
17 #define IMPALA_RUNTIME_DATA_STREAM_SENDER_H
18 
19 #include <vector>
20 #include <string>
21 
22 #include "exec/data-sink.h"
23 #include "common/global-types.h"
24 #include "common/object-pool.h"
25 #include "common/status.h"
26 #include "util/runtime-profile.h"
27 #include "gen-cpp/Results_types.h" // for TRowBatch
28 
29 namespace impala {
30 
31 class Expr;
32 class RowBatch;
33 class RowDescriptor;
34 class MemTracker;
35 class TDataStreamSink;
36 class TNetworkAddress;
37 class TPlanFragmentDestination;
38 
43 //
/// DataStreamSender is a DataSink that sends the row batches passed to Send()
/// to the destinations given to the constructor, through a set of Channels
/// (channels_). Batches are serialized into TRowBatch objects
/// (SerializeBatch()) for transmission.
///
/// Routing modes (see broadcast_ / random_):
///  - broadcast_: all rows are sent on all channels;
///  - random_: row batches are round-robined among channels;
///  - otherwise rows are presumably partitioned using partition_expr_ctxs_
///    ("compute per-row partition values") -- NOTE(review): confirm in the .cc.
46 class DataStreamSender : public DataSink {
47  public:
  /// Creates a sender for the data stream sink described by 'sink'.
  ///  - 'pool': object pool (presumably stored as pool_, from which profile_
  ///    is allocated).
  ///  - 'sender_id': sender instance id, unique within a fragment.
  ///  - 'row_desc': descriptor of the rows being sent (presumably matching the
  ///    batches passed to Send()).
  ///  - 'destinations': plan-fragment destinations to send to
  ///    (NOTE(review): presumably one Channel per entry -- confirm).
  ///  - 'per_channel_buffer_size': NOTE(review): presumably the amount of data
  ///    each Channel buffers before transmitting -- confirm in the .cc.
56  DataStreamSender(ObjectPool* pool, int sender_id,
57  const RowDescriptor& row_desc, const TDataStreamSink& sink,
58  const std::vector<TPlanFragmentDestination>& destinations,
59  int per_channel_buffer_size);
60  virtual ~DataStreamSender();
61 
  /// DataSink lifecycle method (presumably overrides the DataSink virtual).
  /// NOTE(review): the setup done here is defined in the .cc, not visible here.
64  virtual Status Prepare(RuntimeState* state);
65 
  /// DataSink lifecycle method (presumably overrides the DataSink virtual).
68  virtual Status Open(RuntimeState* state);
69 
  /// Sends 'batch', presumably routed per broadcast_ / random_ / partitioning
  /// on partition_expr_ctxs_. 'eos' presumably signals the last batch of the
  /// stream -- NOTE(review): confirm.
  /// Not valid to call once this sender has been closed (see closed_).
75  virtual Status Send(RuntimeState* state, RowBatch* batch, bool eos);
76 
  /// Closes the sender. Once closed, Send() may no longer be called.
79  virtual void Close(RuntimeState* state);
80 
  /// Serializes 'src' into 'dest'.
  /// NOTE(review): 'num_receivers' (default 1) presumably accounts for the same
  /// serialized batch being delivered to several channels (e.g. when
  /// broadcasting) -- confirm in the .cc.
84  void SerializeBatch(RowBatch* src, TRowBatch* dest, int num_receivers = 1);
85 
  /// Returns the number of data bytes sent so far.
  /// NOTE(review): presumably summed over all channels -- confirm.
88  int64_t GetNumDataBytesSent() const;
89 
  /// Returns the runtime profile for the sink. profile_ is allocated from
  /// pool_ (presumably pool-owned, so callers should not delete it).
90  virtual RuntimeProfile* profile() { return profile_; }
91 
92  private:
  /// Per-destination send channel (inferred from channels_); only
  /// forward-declared here, presumably defined in the .cc.
93  class Channel;
94 
  /// Sender instance id, unique within a fragment.
96  int sender_id_;
  /// Routing mode flags. If neither is set, rows are presumably partitioned
  /// with partition_expr_ctxs_ (NOTE(review): confirm).
100  bool broadcast_; // if true, send all rows on all channels
101  bool random_; // if true, round-robins row batches among channels
102  int current_channel_idx_; // index of current channel to send to if random_ == true
103 
  /// If true, this sender has been closed. Not valid to call Send() anymore.
105  bool closed_;
106 
  /// Two serialized-batch buffers; current_thrift_batch_ points at the one the
  /// next Send() fills. NOTE(review): presumably they alternate so one can be
  /// filled while the other is still being transmitted -- confirm.
109  TRowBatch thrift_batch1_;
110  TRowBatch thrift_batch2_;
111  TRowBatch* current_thrift_batch_; // the next one to fill in Send()
112 
  /// NOTE(review): ExprContext is not among this header's visible forward
  /// declarations (only Expr is); confirm an included header declares it.
113  std::vector<ExprContext*> partition_expr_ctxs_; // compute per-row partition values
  /// NOTE(review): raw pointers; which code deletes the Channels is not
  /// visible in this header.
114  std::vector<Channel*> channels_;
115 
116  RuntimeProfile* profile_; // Allocated from pool_
  /// Memory tracker owned by this sender (boost::scoped_ptr).
  /// NOTE(review): <boost/scoped_ptr.hpp> is not among this header's visible
  /// includes; presumably pulled in transitively -- confirm.
121  boost::scoped_ptr<MemTracker> mem_tracker_;
122 
  /// NOTE(review): the symbol index for this file also lists members that this
  /// listing does not show: row_desc_, pool_, dest_node_id_ ("Identifier of the
  /// destination plan node."), and RuntimeProfile counters
  /// (serialize_batch_timer_, thrift_transmit_timer_, bytes_sent_counter_,
  /// uncompressed_bytes_counter_, network_throughput_ "Throughput per time spent
  /// in TransmitData.", overall_throughput_ "Throughput per total time spent in
  /// sender.").
125 
128 
131 };
132 
133 }
134 
135 #endif
RuntimeProfile::Counter * thrift_transmit_timer_
RuntimeProfile::Counter * uncompressed_bytes_counter_
RuntimeProfile::Counter * bytes_sent_counter_
int PlanNodeId
Definition: global-types.h:26
int sender_id_
Sender instance id, unique within a fragment.
RuntimeProfile::Counter * network_throughput_
Throughput per time spent in TransmitData.
virtual Status Prepare(RuntimeState *state)
virtual RuntimeProfile * profile()
Returns the runtime profile for the sink.
virtual Status Open(RuntimeState *state)
const RowDescriptor & row_desc_
DataSink
Superclass of all data sinks.
Definition: data-sink.h:39
std::vector< Channel * > channels_
void SerializeBatch(RowBatch *src, TRowBatch *dest, int num_receivers=1)
virtual Status Send(RuntimeState *state, RowBatch *batch, bool eos)
ObjectPool pool
int64_t GetNumDataBytesSent() const
virtual void Close(RuntimeState *state)
RuntimeProfile::Counter * overall_throughput_
Throughput per total time spent in sender.
const RowDescriptor & row_desc() const
bool closed_
If true, this sender has been closed. Not valid to call Send() anymore.
std::vector< ExprContext * > partition_expr_ctxs_
RuntimeProfile::Counter * serialize_batch_timer_
DataStreamSender(ObjectPool *pool, int sender_id, const RowDescriptor &row_desc, const TDataStreamSink &sink, const std::vector< TPlanFragmentDestination > &destinations, int per_channel_buffer_size)
boost::scoped_ptr< MemTracker > mem_tracker_
PlanNodeId dest_node_id_
Identifier of the destination plan node.