#include <boost/thread/thread.hpp>
#include <gtest/gtest.h>

#include "gen-cpp/ImpalaInternalService.h"
#include "gen-cpp/ImpalaInternalService_types.h"
#include "gen-cpp/Types_types.h"
#include "gen-cpp/Descriptors_types.h"

using namespace impala;
using namespace apache::thrift;

DEFINE_int32(port, 20001, "port on which to run Impala test backend");
virtual void ExecPlanFragment(TExecPlanFragmentResult& return_val,
    const TExecPlanFragmentParams& params) {}

virtual void ReportExecStatus(TReportExecStatusResult& return_val,
    const TReportExecStatusParams& params) {}

virtual void CancelPlanFragment(TCancelPlanFragmentResult& return_val,
    const TCancelPlanFragmentParams& params) {}

virtual void TransmitData(TTransmitDataResult& return_val,
    const TTransmitDataParams& params) {
  if (!params.eos) {
    // Hand the batch to the receiver registered for this
    // (fragment instance, dest node) pair.
    mgr_->AddData(params.dest_fragment_instance_id, params.dest_node_id,
        params.row_batch, params.sender_id).SetTStatus(&return_val);
  } else {
    // End-of-stream: report this sender as closed.
    mgr_->CloseSender(params.dest_fragment_instance_id, params.dest_node_id,
        params.sender_id).SetTStatus(&return_val);
  }
}
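// The DataStreamTest fixture below owns both ends of the exchange: the
// RuntimeState and descriptor table, the DataStreamMgr, and per-stream
// bookkeeping in sender_info_ / receiver_info_.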
  : runtime_state_(TPlanFragmentInstanceCtx(), "", &exec_env_),

  runtime_state_.InitMemTrackers(TUniqueId(), NULL, -1);

  CreateTupleComparator();

  next_instance_id_.lo = 0;
  next_instance_id_.hi = 0;
  broadcast_sink_.dest_node_id = DEST_NODE_ID;
  broadcast_sink_.output_partition.type = TPartitionType::UNPARTITIONED;

  random_sink_.dest_node_id = DEST_NODE_ID;
  random_sink_.output_partition.type = TPartitionType::RANDOM;

  hash_sink_.dest_node_id = DEST_NODE_ID;
  hash_sink_.output_partition.type = TPartitionType::HASH_PARTITIONED;

  // Partitioning expr for the hash sink: a single SLOT_REF over the one
  // BIGINT slot in the row.
  TExprNode expr_node;
  expr_node.node_type = TExprNodeType::SLOT_REF;
  expr_node.type.types.push_back(TTypeNode());
  expr_node.type.types.back().__isset.scalar_type = true;
  expr_node.type.types.back().scalar_type.type = TPrimitiveType::BIGINT;
  expr_node.num_children = 0;
  TSlotRef slot_ref;
  slot_ref.slot_id = 0;
  expr_node.__set_slot_ref(slot_ref);
  TExpr expr;
  expr.nodes.push_back(expr_node);
  hash_sink_.output_partition.__isset.partition_exprs = true;
  hash_sink_.output_partition.partition_exprs.push_back(expr);
  sender_info_.reserve(MAX_SENDERS);
  receiver_info_.reserve(MAX_RECEIVERS);
const TDataStreamSink& GetSink(TPartitionType::type partition_type) {
  switch (partition_type) {
    case TPartitionType::UNPARTITIONED: return broadcast_sink_;
    case TPartitionType::RANDOM: return random_sink_;
    case TPartitionType::HASH_PARTITIONED: return hash_sink_;
    default: DCHECK(false) << "Unhandled sink type: " << partition_type;
  }
  // Not reached; keeps the compiler happy.
  return broadcast_sink_;
}
  lhs_slot_ctx_->Close(NULL);
  rhs_slot_ctx_->Close(NULL);

  sender_info_.clear();
  receiver_info_.clear();
static const int MAX_SENDERS = 16;
static const int MAX_RECEIVERS = 16;
static const int BATCH_CAPACITY = 100;
static const int PER_ROW_DATA = 8;
static const int TOTAL_DATA_SIZE = 8 * 1024;
static const int NUM_BATCHES = TOTAL_DATA_SIZE / BATCH_CAPACITY / PER_ROW_DATA;
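// Sanity arithmetic: each batch carries BATCH_CAPACITY * PER_ROW_DATA =
// 100 * 8 = 800 bytes of tuple data, and NUM_BATCHES = 8192 / 100 / 8 = 10
// under integer division, so every sender ships roughly TOTAL_DATA_SIZE bytes.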
vector<TPlanFragmentDestination> dest_;
ReceiverInfo(TPartitionType::type stream_type, int num_senders, int receiver_num)
  : stream_type(stream_type),
    num_senders(num_senders),
    receiver_num(receiver_num),
    num_rows_received(0) {}

~ReceiverInfo() {
  delete thread_handle;
  stream_recvr.reset();
}
  dest_.push_back(TPlanFragmentDestination());
  TPlanFragmentDestination& dest = dest_.back();
  dest.fragment_instance_id = next_instance_id_;
  dest.server.hostname = "127.0.0.1";
  dest.server.port = FLAGS_port;
  *instance_id = next_instance_id_;
  ++next_instance_id_.lo;
  TTupleDescriptor tuple_desc;
  tuple_desc.__set_id(0);
  tuple_desc.__set_byteSize(8);
  tuple_desc.__set_numNullBytes(0);
  TDescriptorTable thrift_desc_tbl;
  thrift_desc_tbl.tupleDescriptors.push_back(tuple_desc);
  TSlotDescriptor slot_desc;
  slot_desc.__set_id(0);
  slot_desc.__set_parent(0);
  slot_desc.__set_slotType(type.ToThrift());
  slot_desc.__set_columnPath(vector<int>(1, 0));
  slot_desc.__set_byteOffset(0);
  slot_desc.__set_nullIndicatorByte(0);
  slot_desc.__set_nullIndicatorBit(-1);
  slot_desc.__set_slotIdx(0);
  slot_desc.__set_isMaterialized(true);
  thrift_desc_tbl.slotDescriptors.push_back(slot_desc);
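// The whole test schema is a single 8-byte tuple holding one materialized
// BIGINT slot at byte offset 0; nullIndicatorBit == -1 marks the slot
// non-nullable, consistent with numNullBytes == 0 on the tuple.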
  vector<TTupleId> row_tids;
  row_tids.push_back(0);
  vector<bool> nullable_tuples;
  nullable_tuples.push_back(false);
  lhs_slot_ctx_->Prepare(NULL, *row_desc_, &tracker_);
  rhs_slot_ctx_->Prepare(NULL, *row_desc_, &tracker_);
  lhs_slot_ctx_->Open(NULL);
  rhs_slot_ctx_->Open(NULL);
  less_than_ = obj_pool_->Add(new TupleRowComparator(
      vector<ExprContext*>(1, lhs_slot_ctx_), vector<ExprContext*>(1, rhs_slot_ctx_),
      vector<bool>(1, true), vector<bool>(1, false)));
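// Assuming TupleRowComparator's argument order is (lhs key exprs, rhs key
// exprs, is_asc, nulls_first), this comparator orders rows by the single
// BIGINT slot, ascending.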
  int64_t* tuple_mem = reinterpret_cast<int64_t*>(
      batch->tuple_data_pool()->Allocate(BATCH_CAPACITY * 8));
  bzero(tuple_mem, BATCH_CAPACITY * 8);
  for (int i = 0; i < BATCH_CAPACITY; ++i) {
    TupleRow* row = batch->GetRow(batch->AddRow());
    row->SetTuple(0, reinterpret_cast<Tuple*>(&tuple_mem[i]));
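// Each row's only tuple points directly at one int64 in tuple_mem, so
// writing tuple_mem[i] sets row i's single BIGINT slot.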
  for (int i = 0; i < BATCH_CAPACITY; ++i) {
    TupleRow* row = batch->GetRow(i);
    int64_t* val = reinterpret_cast<int64_t*>(row->GetTuple(0)->GetSlot(0));
    *val = (*next_val)++;
void StartReceiver(TPartitionType::type stream_type, int num_senders, int receiver_num,
    int buffer_size, bool is_merging, TUniqueId* out_id = NULL) {

  TUniqueId instance_id;
  GetNextInstanceId(&instance_id);
  receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num));

  receiver_info_.back().stream_recvr =
      stream_mgr_->CreateRecvr(&runtime_state_,
          *row_desc_, instance_id, DEST_NODE_ID, num_senders, buffer_size, profile,
          is_merging);

  if (out_id != NULL) *out_id = instance_id;
  for (int i = 0; i < receiver_info_.size(); ++i) {
    receiver_info_[i].thread_handle->join();
    receiver_info_[i].stream_recvr->Close();
  }
  for (int i = 0; i < batch->num_rows(); ++i) {

  for (int i = 0; i < batch.num_rows(); ++i) {
  multiset<int64_t> all_data_values;
  for (int i = 0; i < receiver_info_.size(); ++i) {

    if (stream_type == TPartitionType::UNPARTITIONED) {
      EXPECT_EQ(
          NUM_BATCHES * BATCH_CAPACITY * num_senders, info.data_values.size());
    }

    for (multiset<int64_t>::iterator j = info.data_values.begin();
         j != info.data_values.end(); ++j, ++k) {
      if (stream_type == TPartitionType::UNPARTITIONED) {
        // Each sender contributes the same sequence, so the sorted stream is
        // 0, 0, ..., 1, 1, ... and the k-th element equals k / num_senders.
        EXPECT_EQ(k / num_senders, *j);
      } else if (stream_type == TPartitionType::HASH_PARTITIONED) {
        // The value must have hashed to this receiver.
        EXPECT_EQ(hash_val % receiver_info_.size(), info.receiver_num);

  if (stream_type == TPartitionType::HASH_PARTITIONED) {
    EXPECT_EQ(NUM_BATCHES * BATCH_CAPACITY * num_senders, total);

    for (multiset<int64_t>::iterator j = all_data_values.begin();
         j != all_data_values.end(); ++j, ++k) {
      // The union of all receivers' values contains each value once per sender.
      EXPECT_EQ(k / num_senders, *j);
      if (k / num_senders != *j) break;
  for (int i = 0; i < sender_info_.size(); ++i) {
    EXPECT_TRUE(sender_info_[i].status.ok());
    EXPECT_GT(sender_info_[i].num_bytes_sent, 0);
  }
  shared_ptr<TProcessor> processor(new ImpalaInternalServiceProcessor(handler));
  server_ = new ThriftServer("DataStreamTest backend", processor, FLAGS_port, NULL);

  server_->StopForTesting();
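// Senders reach this backend over a real localhost Thrift connection
// (127.0.0.1:FLAGS_port), so the test exercises the full RPC and
// serialization path instead of calling DataStreamMgr directly.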
void StartSender(TPartitionType::type partition_type = TPartitionType::UNPARTITIONED,
    int channel_buffer_size = 1024) {

  int num_senders = sender_info_.size();
  DCHECK_LT(num_senders, MAX_SENDERS);

  for (int i = 0; i < sender_info_.size(); ++i) {
    sender_info_[i].thread_handle->join();
void Sender(int sender_num, int channel_buffer_size,
    TPartitionType::type partition_type) {

  const TDataStreamSink& sink = GetSink(partition_type);

  EXPECT_TRUE(sender.Prepare(&state).ok());
  EXPECT_TRUE(sender.Open(&state).ok());
  scoped_ptr<RowBatch> batch(CreateRowBatch());

  for (int i = 0; i < NUM_BATCHES; ++i) {
    GetNextBatch(batch.get(), &next_val);
    VLOG_QUERY << "sender " << sender_num << ": #rows=" << batch->num_rows();
    info.status = sender.Send(&state, batch.get(), false);

  sender.Close(&state);
void TestStream(TPartitionType::type stream_type, int num_senders,
    int num_receivers, int buffer_size, bool is_merging) {
  VLOG_QUERY << "Testing stream=" << stream_type << " #senders=" << num_senders
             << " #receivers=" << num_receivers << " buffer_size=" << buffer_size
             << " is_merging=" << is_merging;

  for (int i = 0; i < num_receivers; ++i) {
    StartReceiver(stream_type, num_senders, i, buffer_size, is_merging);
  }
  for (int i = 0; i < num_senders; ++i) {
    StartSender(stream_type, buffer_size);
  }

  CheckReceivers(stream_type, num_senders);
  GetNextInstanceId(&dummy_id);
  StartSender(TPartitionType::UNPARTITIONED, TOTAL_DATA_SIZE + 1024);

  EXPECT_TRUE(sender_info_[0].status.ok());
  EXPECT_GT(sender_info_[0].num_bytes_sent, 0);

  GetNextInstanceId(&dummy_id);

  EXPECT_TRUE(sender_info_[0].status.ok());
  EXPECT_GT(sender_info_[0].num_bytes_sent, 0);
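// Both "unknown sender" cases expect the send to succeed even though no
// receiver is ever created for dummy_id: a sender whose destination never
// materializes must neither hang nor report an error.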
  TUniqueId instance_id;
  StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, false, &instance_id);
  stream_mgr_->Cancel(instance_id);
  StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, true, &instance_id);
  stream_mgr_->Cancel(instance_id);

  EXPECT_TRUE(receiver_info_[0].status.IsCancelled());
  EXPECT_TRUE(receiver_info_[1].status.IsCancelled());
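// Cancelling via DataStreamMgr::Cancel() must unblock both the non-merging
// and the merging receiver and surface CANCELLED as their final status.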
  TPartitionType::type stream_types[] =
      {TPartitionType::UNPARTITIONED, TPartitionType::RANDOM,
       TPartitionType::HASH_PARTITIONED};
  int sender_nums[] = {1, 4};
  int receiver_nums[] = {1, 4};
  int buffer_sizes[] = {1024, 1024 * 1024};
  bool merging[] = {false, true};
  for (int i = 0; i < sizeof(stream_types) / sizeof(*stream_types); ++i) {
    for (int j = 0; j < sizeof(sender_nums) / sizeof(int); ++j) {
      for (int k = 0; k < sizeof(receiver_nums) / sizeof(int); ++k) {
        for (int l = 0; l < sizeof(buffer_sizes) / sizeof(int); ++l) {
          for (int m = 0; m < sizeof(merging) / sizeof(bool); ++m) {
            TestStream(stream_types[i], sender_nums[j], receiver_nums[k],
                buffer_sizes[l], merging[m]);
          }
        }
      }
    }
  }
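// 3 partition types x 2 sender counts x 2 receiver counts x 2 buffer sizes
// x 2 merging modes = 48 TestStream() configurations in one test.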
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);

  return RUN_ALL_TESTS();
}