Impala
Impala is the open source, native analytic database for Apache Hadoop.
data-stream-sender.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "runtime/data-stream-sender.h"

#include <iostream>
#include <boost/shared_ptr.hpp>
#include <thrift/protocol/TDebugProtocol.h>

#include "common/logging.h"
#include "exprs/expr.h"
#include "exprs/expr-context.h"
#include "runtime/descriptors.h"
#include "runtime/tuple-row.h"
#include "runtime/row-batch.h"
#include "runtime/raw-value.h"
#include "runtime/runtime-state.h"
#include "runtime/client-cache.h"
#include "runtime/mem-tracker.h"
#include "util/debug-util.h"
#include "util/network-util.h"
#include "rpc/thrift-client.h"
#include "rpc/thrift-util.h"

#include "gen-cpp/Types_types.h"
#include "gen-cpp/ImpalaInternalService.h"
#include "gen-cpp/ImpalaInternalService_types.h"

#include "common/names.h"

using boost::condition_variable;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;

namespace impala {

// A channel sends data asynchronously via calls to TransmitData
// to a single destination ipaddress/node.
// It has a fixed-capacity buffer and allows the caller either to add rows to
// that buffer individually (AddRow()), or circumvent the buffer altogether and send
// TRowBatches directly (SendBatch()). Either way, there can only be one in-flight RPC
// at any one time (i.e., sending will block if the most recent rpc hasn't finished,
// which allows the receiver node to throttle the sender by withholding acks).
// *Not* thread-safe.
class DataStreamSender::Channel {
 public:
  // Create channel to send data to a particular ipaddress/port/query/node
  // combination. buffer_size is specified in bytes and is a soft limit on
  // how much tuple data is accumulated before being sent; it only applies
  // when data is added via AddRow() and not sent directly via SendBatch().
  Channel(DataStreamSender* parent, const RowDescriptor& row_desc,
      const TNetworkAddress& destination, const TUniqueId& fragment_instance_id,
      PlanNodeId dest_node_id, int buffer_size)
    : parent_(parent),
      buffer_size_(buffer_size),
      client_cache_(NULL),
      row_desc_(row_desc),
      address_(MakeNetworkAddress(destination.hostname, destination.port)),
      fragment_instance_id_(fragment_instance_id),
      dest_node_id_(dest_node_id),
      num_data_bytes_sent_(0),
      rpc_thread_("DataStreamSender", "SenderThread", 1, 1,
          bind<void>(mem_fn(&Channel::TransmitData), this, _1, _2)),
      rpc_in_flight_(false) {
  }

  // Initialize channel.
  // Returns OK if successful, error indication otherwise.
  Status Init(RuntimeState* state);

  // Copies a single row into this channel's output buffer and flushes the buffer
  // if it reaches capacity.
  // Returns an error status if any of the preceding rpcs failed, OK otherwise.
  Status AddRow(TupleRow* row);

  // Asynchronously sends a row batch.
  // Returns the status of the most recently finished TransmitData
  // rpc (or OK if there wasn't one that hasn't been reported yet).
  Status SendBatch(TRowBatch* batch);

  // Returns the status of the last TransmitData rpc (initiated by the most recent
  // call to either SendBatch() or SendCurrentBatch()).
  Status GetSendStatus();

  // Waits for the rpc thread pool to finish the current rpc.
  void WaitForRpc();

  // Flushes buffered rows and closes the channel.
  // Logs errors if any of the preceding rpcs failed.
  void Close(RuntimeState* state);

  int64_t num_data_bytes_sent() const { return num_data_bytes_sent_; }
  TRowBatch* thrift_batch() { return &thrift_batch_; }

 private:
  DataStreamSender* parent_;
  int buffer_size_;

  ImpalaInternalServiceClientCache* client_cache_;

  const RowDescriptor& row_desc_;
  TNetworkAddress address_;
  TUniqueId fragment_instance_id_;
  PlanNodeId dest_node_id_;

  // the number of TRowBatch.data bytes sent successfully
  int64_t num_data_bytes_sent_;

  // we're accumulating rows into this batch
  scoped_ptr<RowBatch> batch_;
  TRowBatch thrift_batch_;

  // We want to reuse the rpc thread to prevent creating a thread per rowbatch.
  // TODO: currently we only have one batch in flight, but we should buffer more
  // batches. This is a bit tricky since the channels share the outgoing batch
  // pointer; we need some mechanism to coordinate when the batch is all done.
  // TODO: if the order of row batches does not matter, we can consider increasing
  // the number of threads.
  ThreadPool<TRowBatch*> rpc_thread_;
  condition_variable rpc_done_cv_; // signaled when rpc_in_flight_ is set to false.
  mutex rpc_thread_lock_; // Lock with rpc_done_cv_ protecting rpc_in_flight_.
  bool rpc_in_flight_; // true if the rpc_thread_ is busy sending.

  Status rpc_status_; // status of most recently finished TransmitData rpc

  // Serialize batch_ into thrift_batch_ and send via SendBatch().
  // Returns SendBatch() status.
  Status SendCurrentBatch();

  // Synchronously call TransmitData() on a client from client_cache_ and update
  // rpc_status_ based on return value (or set to error if RPC failed).
  // Called from a thread from the rpc_thread_ pool.
  void TransmitData(int thread_id, const TRowBatch*);
  void TransmitDataHelper(const TRowBatch*);

  Status CloseInternal();
};

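// Usage sketch for the class above: DataStreamSender owns one Channel per
// destination and drives it through Init()/AddRow()/Close() (see Prepare(),
// Send() and Close() below). The buffer size here is an illustrative value,
// not a default from this file:
//
//   Channel ch(sender, row_desc, dest_addr, instance_id, dest_node_id,
//       16 * 1024);                      // 16KB soft buffer (example value)
//   RETURN_IF_ERROR(ch.Init(state));     // sizes batch_ from buffer_size
//   RETURN_IF_ERROR(ch.AddRow(row));     // buffers; flushes when batch_ fills
//   ch.Close(state);                     // flushes remainder, then sends eos
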
Status DataStreamSender::Channel::Init(RuntimeState* state) {
  client_cache_ = state->impalad_client_cache();
  // TODO: figure out how to size batch_
  int capacity = max(1, buffer_size_ / max(row_desc_.GetRowSize(), 1));
  batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker_.get()));
  return Status::OK;
}

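// Worked example of the capacity heuristic above, using the illustrative 16KB
// buffer and a 100-byte row: capacity = max(1, 16384 / 100) = 163 rows per
// outgoing batch. The inner max() guards against a zero-size row descriptor,
// and the outer max() guarantees room for at least one row even when a single
// row is wider than the whole buffer.
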
Status DataStreamSender::Channel::SendBatch(TRowBatch* batch) {
  VLOG_ROW << "Channel::SendBatch() instance_id=" << fragment_instance_id_
           << " dest_node=" << dest_node_id_ << " #rows=" << batch->num_rows;
  // return if the previous batch saw an error
  RETURN_IF_ERROR(GetSendStatus());
  {
    unique_lock<mutex> l(rpc_thread_lock_);
    rpc_in_flight_ = true;
  }
  if (!rpc_thread_.Offer(batch)) {
    unique_lock<mutex> l(rpc_thread_lock_);
    rpc_in_flight_ = false;
  }
  return Status::OK;
}

void DataStreamSender::Channel::TransmitData(int thread_id, const TRowBatch* batch) {
  DCHECK(rpc_in_flight_);
  TransmitDataHelper(batch);

  {
    unique_lock<mutex> l(rpc_thread_lock_);
    rpc_in_flight_ = false;
  }
  rpc_done_cv_.notify_one();
}

void DataStreamSender::Channel::TransmitDataHelper(const TRowBatch* batch) {
  DCHECK(batch != NULL);
  VLOG_ROW << "Channel::TransmitData() instance_id=" << fragment_instance_id_
           << " dest_node=" << dest_node_id_
           << " #rows=" << batch->num_rows;
  TTransmitDataParams params;
  params.protocol_version = ImpalaInternalServiceVersion::V1;
  params.__set_dest_fragment_instance_id(fragment_instance_id_);
  params.__set_dest_node_id(dest_node_id_);
  params.__set_row_batch(*batch); // yet another copy
  params.__set_eos(false);
  params.__set_sender_id(parent_->sender_id_);

  ImpalaInternalServiceConnection client(client_cache_, address_, &rpc_status_);
  if (!rpc_status_.ok()) return;

  TTransmitDataResult res;
  {
    SCOPED_TIMER(parent_->thrift_transmit_timer_);
    rpc_status_ =
        client.DoRpc(&ImpalaInternalServiceClient::TransmitData, params, &res);
    if (!rpc_status_.ok()) return;
  }

  if (res.status.status_code != TErrorCode::OK) {
    rpc_status_ = res.status;
  } else {
    num_data_bytes_sent_ += RowBatch::GetBatchSize(*batch);
    VLOG_ROW << "incremented #data_bytes_sent=" << num_data_bytes_sent_;
  }
}

void DataStreamSender::Channel::WaitForRpc() {
  SCOPED_TIMER(parent_->state_->total_network_send_timer());
  unique_lock<mutex> l(rpc_thread_lock_);
  while (rpc_in_flight_) {
    rpc_done_cv_.wait(l);
  }
}

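// The synchronization above is a single-slot producer/consumer handshake
// between SendBatch()/WaitForRpc() and the rpc pool's TransmitData():
//
//   sender thread                         rpc pool thread
//   -------------                         ---------------
//   lock; rpc_in_flight_ = true; unlock
//   rpc_thread_.Offer(batch)       --->   TransmitDataHelper(batch)
//   WaitForRpc():                         lock; rpc_in_flight_ = false; unlock
//     while (rpc_in_flight_)       <---   rpc_done_cv_.notify_one()
//       rpc_done_cv_.wait(l)
//
// The while-loop (rather than a single wait()) is what makes the wait safe
// against spurious wakeups.
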
Status DataStreamSender::Channel::AddRow(TupleRow* row) {
  int row_num = batch_->AddRow();
  if (row_num == RowBatch::INVALID_ROW_INDEX) {
    // batch_ is full, let's send it; but first wait for an ongoing
    // transmission to finish before modifying thrift_batch_
    RETURN_IF_ERROR(SendCurrentBatch());
    row_num = batch_->AddRow();
    DCHECK_NE(row_num, RowBatch::INVALID_ROW_INDEX);
  }

  TupleRow* dest = batch_->GetRow(row_num);
  batch_->CopyRow(row, dest);
  const vector<TupleDescriptor*>& descs = row_desc_.tuple_descriptors();
  for (int i = 0; i < descs.size(); ++i) {
    if (UNLIKELY(row->GetTuple(i) == NULL)) {
      dest->SetTuple(i, NULL);
    } else {
      dest->SetTuple(i, row->GetTuple(i)->DeepCopy(*descs[i],
          batch_->tuple_data_pool()));
    }
  }
  batch_->CommitLastRow();
  return Status::OK;
}

Status DataStreamSender::Channel::SendCurrentBatch() {
  // make sure there's no in-flight TransmitData() call that might still want to
  // access thrift_batch_
  WaitForRpc();
  parent_->SerializeBatch(batch_.get(), &thrift_batch_);
  batch_->Reset();
  RETURN_IF_ERROR(SendBatch(&thrift_batch_));
  return Status::OK;
}

Status DataStreamSender::Channel::GetSendStatus() {
  WaitForRpc();
  if (!rpc_status_.ok()) {
    LOG(ERROR) << "channel send status: " << rpc_status_.GetDetail();
  }
  return rpc_status_;
}

Status DataStreamSender::Channel::CloseInternal() {
  VLOG_RPC << "Channel::Close() instance_id=" << fragment_instance_id_
           << " dest_node=" << dest_node_id_
           << " #rows=" << batch_->num_rows();

  if (batch_->num_rows() > 0) {
    // flush
    RETURN_IF_ERROR(SendCurrentBatch());
  }
  // if the last transmitted batch resulted in an error, return that error
  RETURN_IF_ERROR(GetSendStatus());
  Status status;
  ImpalaInternalServiceConnection client(client_cache_, address_, &status);
  if (!status.ok()) {
    return status;
  }
  TTransmitDataParams params;
  params.protocol_version = ImpalaInternalServiceVersion::V1;
  params.__set_dest_fragment_instance_id(fragment_instance_id_);
  params.__set_dest_node_id(dest_node_id_);
  params.__set_sender_id(parent_->sender_id_);
  params.__set_eos(true);
  TTransmitDataResult res;
  VLOG_RPC << "calling TransmitData to close channel";
  rpc_status_ = client.DoRpc(&ImpalaInternalServiceClient::TransmitData, params, &res);
  if (!rpc_status_.ok()) {
    stringstream msg;
    msg << "CloseChannel() to " << address_ << " failed:\n" << rpc_status_.msg().msg();
    return Status(rpc_status_.code(), msg.str());
  }
  return Status(res.status);
}

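// End-of-stream sketch: closing a channel is one final TransmitData call with
// eos=true and no row_batch set, which is how the receiver distinguishes "this
// sender is done" from an ordinary batch. A hypothetical two-call trace:
//
//   TransmitData{dest_node=5, sender_id=0, row_batch=<1024 rows>, eos=false}
//   TransmitData{dest_node=5, sender_id=0, eos=true}   // from CloseInternal()
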
void DataStreamSender::Channel::Close(RuntimeState* state) {
  Status s = CloseInternal();
  if (!s.ok()) state->LogError(s.msg());
  rpc_thread_.DrainAndShutdown();
  batch_.reset();
}

DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id,
    const RowDescriptor& row_desc, const TDataStreamSink& sink,
    const vector<TPlanFragmentDestination>& destinations,
    int per_channel_buffer_size)
  : sender_id_(sender_id),
    pool_(pool),
    row_desc_(row_desc),
    current_channel_idx_(0),
    closed_(false),
    current_thrift_batch_(&thrift_batch1_),
    profile_(NULL),
    serialize_batch_timer_(NULL),
    thrift_transmit_timer_(NULL),
    bytes_sent_counter_(NULL),
    dest_node_id_(sink.dest_node_id) {
  DCHECK_GT(destinations.size(), 0);
  DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
      || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
      || sink.output_partition.type == TPartitionType::RANDOM);
  broadcast_ = sink.output_partition.type == TPartitionType::UNPARTITIONED;
  random_ = sink.output_partition.type == TPartitionType::RANDOM;
  // TODO: use something like google3's linked_ptr here (scoped_ptr isn't copyable)
  for (int i = 0; i < destinations.size(); ++i) {
    channels_.push_back(
        new Channel(this, row_desc, destinations[i].server,
                    destinations[i].fragment_instance_id,
                    sink.dest_node_id, per_channel_buffer_size));
  }

  if (broadcast_ || random_) {
    // Randomize the order in which we open/transmit to channels to avoid
    // thundering herd problems.
    srand(reinterpret_cast<uint64_t>(this));
    random_shuffle(channels_.begin(), channels_.end());
  }

  if (sink.output_partition.type == TPartitionType::HASH_PARTITIONED) {
    // TODO: move this to Init()? would need to save 'sink' somewhere
    Status status =
        Expr::CreateExprTrees(pool, sink.output_partition.partition_exprs,
                              &partition_expr_ctxs_);
    DCHECK(status.ok());
  }
}

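// How the three accepted partition types map onto Send() below:
//   UNPARTITIONED    -> broadcast_: each batch is serialized once and sent to
//                       every channel.
//   RANDOM           -> random_: whole batches are round-robined across
//                       channels.
//   HASH_PARTITIONED -> individual rows are routed by hashing the partition
//                       exprs and taking the result modulo the channel count.
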
DataStreamSender::~DataStreamSender() {
  // TODO: check that sender was either already closed() or there was an error
  // on some channel
  for (int i = 0; i < channels_.size(); ++i) {
    delete channels_[i];
  }
}

Status DataStreamSender::Prepare(RuntimeState* state) {
  DCHECK(state != NULL);
  state_ = state;
  stringstream title;
  title << "DataStreamSender (dst_id=" << dest_node_id_ << ")";
  profile_ = pool_->Add(new RuntimeProfile(pool_, title.str()));
  SCOPED_TIMER(profile_->total_time_counter());

  mem_tracker_.reset(new MemTracker(profile(), -1, -1, "DataStreamSender",
      state->instance_mem_tracker()));
  RETURN_IF_ERROR(Expr::Prepare(
      partition_expr_ctxs_, state, row_desc_, mem_tracker_.get()));

  bytes_sent_counter_ =
      ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
  uncompressed_bytes_counter_ =
      ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
  serialize_batch_timer_ =
      ADD_TIMER(profile(), "SerializeBatchTime");
  thrift_transmit_timer_ = ADD_TIMER(profile(), "ThriftTransmitTime(*)");
  network_throughput_ =
      profile()->AddDerivedCounter("NetworkThroughput(*)", TUnit::BYTES_PER_SECOND,
          bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,
              thrift_transmit_timer_));
  overall_throughput_ =
      profile()->AddDerivedCounter("OverallThroughput", TUnit::BYTES_PER_SECOND,
          bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,
              profile()->total_time_counter()));

  for (int i = 0; i < channels_.size(); ++i) {
    RETURN_IF_ERROR(channels_[i]->Init(state));
  }
  return Status::OK;
}

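// The derived counters above compute throughput as bytes divided by timer
// seconds (see UnitsPerSecond). For example, if BytesSent reaches 512MB while
// ThriftTransmitTime(*) accumulates 4s, NetworkThroughput(*) reports 128MB/s;
// OverallThroughput divides by total time instead, so it also charges time
// spent serializing and waiting on the receiver.
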
Status DataStreamSender::Open(RuntimeState* state) {
  return Expr::Open(partition_expr_ctxs_, state);
}

Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch, bool eos) {
  DCHECK(!closed_);

  if (batch->num_rows() == 0) return Status::OK;
  if (broadcast_ || channels_.size() == 1) {
    // current_thrift_batch_ is *not* the one that was written by the last call
    // to Serialize()
    SerializeBatch(batch, current_thrift_batch_, channels_.size());
    // SendBatch() will block if there are still in-flight rpcs (and those will
    // reference the previously written thrift batch)
    for (int i = 0; i < channels_.size(); ++i) {
      RETURN_IF_ERROR(channels_[i]->SendBatch(current_thrift_batch_));
    }
    current_thrift_batch_ =
        (current_thrift_batch_ == &thrift_batch1_) ? &thrift_batch2_ : &thrift_batch1_;
  } else if (random_) {
    // Round-robin batches among channels. Wait for the current channel to finish its
    // rpc before overwriting its batch.
    Channel* current_channel = channels_[current_channel_idx_];
    current_channel->WaitForRpc();
    SerializeBatch(batch, current_channel->thrift_batch());
    current_channel->SendBatch(current_channel->thrift_batch());
    current_channel_idx_ = (current_channel_idx_ + 1) % channels_.size();
  } else {
    // hash-partition batch's rows across channels
    int num_channels = channels_.size();
    for (int i = 0; i < batch->num_rows(); ++i) {
      TupleRow* row = batch->GetRow(i);
      uint32_t hash_val = HashUtil::FNV_SEED;
      for (int j = 0; j < partition_expr_ctxs_.size(); ++j) {
        ExprContext* ctx = partition_expr_ctxs_[j];
        void* partition_val = ctx->GetValue(row);
        // We can't use the crc hash function here because it does not result
        // in uncorrelated hashes with different seeds. Instead we must use
        // fnv hash.
        // TODO: fix crc hash/GetHashValue()
        hash_val =
            RawValue::GetHashValueFnv(partition_val, ctx->root()->type(), hash_val);
      }
      RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
    }
  }
  return Status::OK;
}

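// Worked example of the hash routing above, assuming 3 channels and two
// partition exprs evaluating to v0 and v1 for some row:
//
//   uint32_t h = HashUtil::FNV_SEED;
//   h = RawValue::GetHashValueFnv(v0, type0, h);  // previous hash seeds the next
//   h = RawValue::GetHashValueFnv(v1, type1, h);
//   int channel = h % 3;                          // destination channel index
//
// Chaining through the seed is why fnv is required here: values hashed with
// different seeds must stay uncorrelated, which the crc hash doesn't provide.
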
void DataStreamSender::Close(RuntimeState* state) {
  if (closed_) return;
  for (int i = 0; i < channels_.size(); ++i) {
    channels_[i]->Close(state);
  }
  Expr::Close(partition_expr_ctxs_, state);
  closed_ = true;
}

void DataStreamSender::SerializeBatch(RowBatch* src, TRowBatch* dest, int num_receivers) {
  VLOG_ROW << "serializing " << src->num_rows() << " rows";
  {
    SCOPED_TIMER(serialize_batch_timer_);
    int uncompressed_bytes = src->Serialize(dest);
    COUNTER_ADD(bytes_sent_counter_, RowBatch::GetBatchSize(*dest) * num_receivers);
    COUNTER_ADD(uncompressed_bytes_counter_, uncompressed_bytes * num_receivers);
  }
}

int64_t DataStreamSender::GetNumDataBytesSent() const {
  // TODO: do we need synchronization here or are reads & writes to 8-byte ints
  // atomic?
  int64_t result = 0;
  for (int i = 0; i < channels_.size(); ++i) {
    result += channels_[i]->num_data_bytes_sent();
  }
  return result;
}

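// Re the TODO above: C++ does not guarantee that plain int64_t loads/stores
// are atomic (on 32-bit platforms they can tear). A minimal sketch of the
// usual fix, assuming the counter were switched to std::atomic (C++11; not
// what this file currently does):
//
//   std::atomic<int64_t> num_data_bytes_sent_{0};
//   num_data_bytes_sent_.fetch_add(n, std::memory_order_relaxed);       // writer
//   int64_t v = num_data_bytes_sent_.load(std::memory_order_relaxed);   // reader
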
}