18 #include <boost/shared_ptr.hpp>
19 #include <thrift/protocol/TDebugProtocol.h>
36 #include "gen-cpp/Types_types.h"
37 #include "gen-cpp/ImpalaInternalService.h"
38 #include "gen-cpp/ImpalaInternalService_types.h"
42 using boost::condition_variable;
43 using namespace apache::thrift;
45 using namespace apache::thrift::transport;
64 const TNetworkAddress& destination,
const TUniqueId& fragment_instance_id,
67 buffer_size_(buffer_size),
71 fragment_instance_id_(fragment_instance_id),
72 dest_node_id_(dest_node_id),
73 num_data_bytes_sent_(0),
74 rpc_thread_(
"DataStreamSender",
"SenderThread", 1, 1,
75 bind<void>(mem_fn(&
Channel::TransmitData), this, _1, _2)),
76 rpc_in_flight_(false) {
91 Status SendBatch(TRowBatch* batch);
140 Status SendCurrentBatch();
145 void TransmitData(
int thread_id,
const TRowBatch*);
146 void TransmitDataHelper(
const TRowBatch*);
// Queues 'batch' on this channel's rpc_thread_ for transmission.
// Visible steps: trace the call at VLOG_ROW, set rpc_in_flight_ while holding
// rpc_thread_lock_, then Offer() the batch to rpc_thread_. If Offer() returns
// false, rpc_in_flight_ is reset to false under the same lock.
// NOTE(review): this listing skips several original lines (see the gaps in the
// embedded numbering); the return statements, the closing brace and any scope
// braces around the locks are not visible here.
159 Status DataStreamSender::Channel::SendBatch(TRowBatch* batch) {
160 VLOG_ROW <<
"Channel::SendBatch() instance_id=" << fragment_instance_id_
161 <<
" dest_node=" << dest_node_id_ <<
" #rows=" << batch->num_rows;
// Raise the in-flight flag before the batch is queued; TransmitData() DCHECKs
// this flag when it starts.
// NOTE(review): presumably this lock sits in its own brace scope on omitted
// lines, since the same mutex is locked again below -- confirm.
165 unique_lock<mutex> l(rpc_thread_lock_);
166 rpc_in_flight_ =
true;
// Offer() returned false: nothing was queued, so undo the in-flight marker.
168 if (!rpc_thread_.Offer(batch)) {
169 unique_lock<mutex> l(rpc_thread_lock_);
170 rpc_in_flight_ =
false;
// Work function bound to rpc_thread_ in the Channel constructor.
// Sends 'batch' via TransmitDataHelper(), then clears rpc_in_flight_ under
// rpc_thread_lock_ and signals rpc_done_cv_, the condition variable that
// WaitForRpc() waits on. 'thread_id' is not used in the visible lines.
// NOTE(review): the closing brace and any scope braces are on lines omitted
// from this listing.
175 void DataStreamSender::Channel::TransmitData(
int thread_id,
const TRowBatch* batch) {
// SendBatch() sets the flag before offering the batch to rpc_thread_.
176 DCHECK(rpc_in_flight_);
177 TransmitDataHelper(batch);
// Mark the RPC as finished and wake one waiter.
180 unique_lock<mutex> l(rpc_thread_lock_);
181 rpc_in_flight_ =
false;
183 rpc_done_cv_.notify_one();
// Issues one TransmitData RPC carrying 'batch' (must be non-NULL) and records
// the outcome in rpc_status_. On success, adds the batch size to
// num_data_bytes_sent_. Returns early, without touching the byte counter,
// whenever rpc_status_ is not ok.
// NOTE(review): 'client' is declared on a line omitted from this listing
// (embedded lines 198-199 are absent), as is the closing brace.
186 void DataStreamSender::Channel::TransmitDataHelper(
const TRowBatch* batch) {
187 DCHECK(batch != NULL);
188 VLOG_ROW <<
"Channel::TransmitData() instance_id=" << fragment_instance_id_
189 <<
" dest_node=" << dest_node_id_
190 <<
" #rows=" << batch->num_rows;
// Build the request: protocol V1, destination instance/node ids, the batch
// itself, this sender's id, and eos=false because this is a data-carrying call.
191 TTransmitDataParams params;
192 params.protocol_version = ImpalaInternalServiceVersion::V1;
193 params.__set_dest_fragment_instance_id(fragment_instance_id_);
194 params.__set_dest_node_id(dest_node_id_);
195 params.__set_row_batch(*batch);
196 params.__set_eos(
false);
197 params.__set_sender_id(parent_->sender_id_);
// Bail out if rpc_status_ already holds an error before the call is made.
200 if (!rpc_status_.ok())
return;
202 TTransmitDataResult res;
// NOTE(review): the left-hand side receiving DoRpc()'s Status is not visible
// here (embedded lines 203-205 are absent); presumably rpc_status_, as in
// CloseInternal() -- confirm against the full file.
206 client.
DoRpc(&ImpalaInternalServiceClient::TransmitData, params, &res);
207 if (!rpc_status_.ok())
return;
// The RPC itself went through; adopt the status reported in the response.
211 rpc_status_ = res.status;
// Account for the bytes of this batch using RowBatch::GetBatchSize().
213 num_data_bytes_sent_ += RowBatch::GetBatchSize(*batch);
214 VLOG_ROW <<
"incremented #data_bytes_sent="
215 << num_data_bytes_sent_;
// Blocks the caller until rpc_in_flight_ is false.
// Holds rpc_thread_lock_ and waits on rpc_done_cv_, which TransmitData()
// notifies after clearing the flag. The flag is re-checked after every wakeup.
// The time spent here is charged to the runtime state's
// total_network_send_timer() via SCOPED_TIMER.
// NOTE(review): closing braces are on lines omitted from this listing.
219 void DataStreamSender::Channel::WaitForRpc() {
220 SCOPED_TIMER(parent_->state_->total_network_send_timer());
221 unique_lock<mutex> l(rpc_thread_lock_);
222 while (rpc_in_flight_) {
223 rpc_done_cv_.wait(l);
228 int row_num = batch_->AddRow();
229 if (row_num == RowBatch::INVALID_ROW_INDEX) {
233 row_num = batch_->AddRow();
234 DCHECK_NE(row_num, RowBatch::INVALID_ROW_INDEX);
237 TupleRow* dest = batch_->GetRow(row_num);
238 batch_->CopyRow(row, dest);
240 for (
int i = 0; i < descs.size(); ++i) {
245 batch_->tuple_data_pool()));
248 batch_->CommitLastRow();
// Sends the rows accumulated in batch_.
// Only the serialization step is visible in this listing: batch_ is serialized
// into thrift_batch_ through the parent sender's SerializeBatch().
// NOTE(review): the remaining statements, the return value and the closing
// brace are on omitted lines (embedded lines 253-255 and 257+ are absent).
252 Status DataStreamSender::Channel::SendCurrentBatch() {
256 parent_->SerializeBatch(batch_.get(), &thrift_batch_);
// Reports the status of this channel's RPCs (rpc_status_).
// When rpc_status_ is not ok, its detail text is written to the ERROR log.
// NOTE(review): the return statement(s) and closing braces are on lines
// omitted from this listing, as is embedded line 263.
262 Status DataStreamSender::Channel::GetSendStatus() {
264 if (!rpc_status_.ok()) {
265 LOG(ERROR) <<
"channel send status: " << rpc_status_.GetDetail();
// Closes the channel by sending a final TransmitData RPC with eos=true.
// Returns a Status built from rpc_status_'s code plus a "CloseChannel() to
// <address> failed" message if the RPC fails, otherwise the Status carried in
// the RPC response.
// NOTE(review): 'client' and 'msg' are declared on lines omitted from this
// listing, as are the body of the num_rows() > 0 branch and the closing braces.
270 Status DataStreamSender::Channel::CloseInternal() {
271 VLOG_RPC <<
"Channel::Close() instance_id=" << fragment_instance_id_
272 <<
" dest_node=" << dest_node_id_
273 <<
" #rows= " << batch_->num_rows();
// Rows are still buffered in batch_; the handling code is not visible here
// (embedded lines 276-285 are absent).
275 if (batch_->num_rows() > 0) {
// Build the end-of-stream request: same addressing fields as a data RPC, but
// no row batch is attached and eos is set to true.
286 TTransmitDataParams params;
287 params.protocol_version = ImpalaInternalServiceVersion::V1;
288 params.__set_dest_fragment_instance_id(fragment_instance_id_);
289 params.__set_dest_node_id(dest_node_id_);
290 params.__set_sender_id(parent_->sender_id_);
291 params.__set_eos(
true);
292 TTransmitDataResult res;
293 VLOG_RPC <<
"calling TransmitData to close channel";
294 rpc_status_ = client.
DoRpc(&ImpalaInternalServiceClient::TransmitData, params, &res);
// RPC failure: prefix the underlying error text with the destination address
// and keep the original error code.
295 if (!rpc_status_.ok()) {
297 msg <<
"CloseChannel() to " << address_ <<
" failed:\n" << rpc_status_.
msg().
msg();
298 return Status(rpc_status_.code(), msg.str());
// RPC succeeded: surface whatever status the response carries.
300 return Status(res.status);
304 Status s = CloseInternal();
306 rpc_thread_.DrainAndShutdown();
312 const vector<TPlanFragmentDestination>& destinations,
313 int per_channel_buffer_size)
314 : sender_id_(sender_id),
317 current_channel_idx_(0),
319 current_thrift_batch_(&thrift_batch1_),
321 serialize_batch_timer_(NULL),
322 thrift_transmit_timer_(NULL),
323 bytes_sent_counter_(NULL),
324 dest_node_id_(sink.dest_node_id) {
325 DCHECK_GT(destinations.size(), 0);
326 DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
327 || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
328 || sink.output_partition.type == TPartitionType::RANDOM);
329 broadcast_ = sink.output_partition.type == TPartitionType::UNPARTITIONED;
330 random_ = sink.output_partition.type == TPartitionType::RANDOM;
332 for (
int i = 0; i < destinations.size(); ++i) {
334 new Channel(
this, row_desc, destinations[i].server,
335 destinations[i].fragment_instance_id,
336 sink.dest_node_id, per_channel_buffer_size));
341 srand(reinterpret_cast<uint64_t>(
this));
345 if (sink.output_partition.type == TPartitionType::HASH_PARTITIONED) {
357 for (
int i = 0; i <
channels_.size(); ++i) {
363 DCHECK(state != NULL);
366 title <<
"DataStreamSender (dst_id=" <<
dest_node_id_ <<
")";
389 profile()->total_time_counter()));
391 for (
int i = 0; i <
channels_.size(); ++i) {
414 for (
int i = 0; i <
channels_.size(); ++i) {
430 for (
int i = 0; i < batch->
num_rows(); ++i) {
435 void* partition_val = ctx->
GetValue(row);
452 for (
int i = 0; i <
channels_.size(); ++i) {
463 int uncompressed_bytes = src->
Serialize(dest);
473 for (
int i = 0; i <
channels_.size(); ++i) {
474 result +=
channels_[i]->num_data_bytes_sent();
const std::string & msg() const
Returns the formatted error string.
RuntimeProfile::Counter * thrift_transmit_timer_
DerivedCounter * AddDerivedCounter(const std::string &name, TUnit::type unit, const DerivedCounterFunction &counter_fn, const std::string &parent_counter_name="")
RuntimeProfile::Counter * uncompressed_bytes_counter_
ImpalaInternalServiceClientCache * client_cache_
RuntimeProfile::Counter * bytes_sent_counter_
Status SendBatch(TRowBatch *batch)
Tuple * GetTuple(int tuple_idx)
const StringSearch UrlParser::protocol_search & protocol
condition_variable rpc_done_cv_
RuntimeProfile::Counter * network_throughput_
Throughput per time spent in TransmitData.
static Status Open(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for opening multiple expr trees.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
scoped_ptr< RowBatch > batch_
TupleRow * GetRow(int row_idx)
void * GetValue(TupleRow *row)
#define ADD_TIMER(profile, name)
virtual Status Prepare(RuntimeState *state)
static int64_t UnitsPerSecond(const Counter *total_counter, const Counter *timer)
Derived counter function: return measured throughput as input_value/second.
virtual RuntimeProfile * profile()
Returns the runtime profile for the sink.
TNetworkAddress MakeNetworkAddress(const string &hostname, int port)
virtual Status Open(RuntimeState *state)
#define COUNTER_ADD(c, v)
static uint32_t GetHashValueFnv(const void *v, const ColumnType &type, uint32_t seed)
static void Close(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for closing multiple expr trees.
Tuple * DeepCopy(const TupleDescriptor &desc, MemPool *pool, bool convert_ptrs=false)
const RowDescriptor & row_desc_
const RowDescriptor & row_desc_
DataStreamSender * parent_
bool LogError(const ErrorMsg &msg)
std::vector< Channel * > channels_
TUniqueId fragment_instance_id_
void SerializeBatch(RowBatch *src, TRowBatch *dest, int num_receivers=1)
virtual Status Send(RuntimeState *state, RowBatch *batch, bool eos)
Status DoRpc(const F &f, const Request &request, Response *response)
TRowBatch * thrift_batch()
#define ADD_COUNTER(profile, name, unit)
static int GetBatchSize(const TRowBatch &batch)
Utility function: returns total size of batch.
int64_t GetNumDataBytesSent() const
virtual ~DataStreamSender()
virtual void Close(RuntimeState *state)
This class is thread-safe.
RuntimeProfile::Counter * overall_throughput_
Throughput per total time spent in sender.
const RowDescriptor & row_desc() const
bool closed_
If true, this sender has been closed. Not valid to call Send() anymore.
static const uint32_t FNV_SEED
RuntimeProfile * profile_
MemTracker * instance_mem_tracker()
const std::vector< TupleDescriptor * > & tuple_descriptors() const
Return descriptors for all tuples in this row, in order of appearance.
int Serialize(TRowBatch *output_batch)
std::vector< ExprContext * > partition_expr_ctxs_
RuntimeProfile::Counter * serialize_batch_timer_
void SetTuple(int tuple_idx, Tuple *tuple)
const ColumnType & type() const
void FreeLocalAllocations()
ImpalaInternalServiceClientCache * impalad_client_cache()
const RowDescriptor * row_desc_
owned by plan root, which resides in runtime_state_'s pool
TRowBatch * current_thrift_batch_
Channel(DataStreamSender *parent, const RowDescriptor &row_desc, const TNetworkAddress &destination, const TUniqueId &fragment_instance_id, PlanNodeId dest_node_id, int buffer_size)
static Status CreateExprTrees(ObjectPool *pool, const std::vector< TExpr > &texprs, std::vector< ExprContext * > *ctxs)
const ErrorMsg & msg() const
Returns the error message associated with a non-successful status.
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
int64_t num_data_bytes_sent() const
boost::scoped_ptr< MemTracker > mem_tracker_
ThreadPool< TRowBatch * > rpc_thread_
int64_t num_data_bytes_sent_
PlanNodeId dest_node_id_
Identifier of the destination plan node.
Counter * total_time_counter()
Returns the counter for the total elapsed time.