Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
data-stream-test.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <boost/thread/thread.hpp>
16 #include <gtest/gtest.h>
17 
18 #include "common/init.h"
19 #include "common/logging.h"
20 #include "common/status.h"
21 #include "codegen/llvm-codegen.h"
22 #include "exprs/slot-ref.h"
23 #include "rpc/auth-provider.h"
24 #include "rpc/thrift-server.h"
25 #include "runtime/row-batch.h"
26 #include "runtime/runtime-state.h"
30 #include "runtime/descriptors.h"
31 #include "runtime/client-cache.h"
32 #include "runtime/raw-value.h"
33 #include "service/fe-support.h"
34 #include "util/cpu-info.h"
35 #include "util/disk-info.h"
36 #include "util/debug-util.h"
37 #include "util/thread.h"
38 #include "util/time.h"
39 #include "util/mem-info.h"
40 #include "util/test-info.h"
41 #include "util/tuple-row-compare.h"
42 #include "gen-cpp/ImpalaInternalService.h"
43 #include "gen-cpp/ImpalaInternalService_types.h"
44 #include "gen-cpp/Types_types.h"
45 #include "gen-cpp/Descriptors_types.h"
46 #include "service/fe-support.h"
47 
48 #include <iostream>
49 
50 #include "common/names.h"
51 
52 using namespace impala;
53 using namespace apache::thrift;
54 using namespace apache::thrift::protocol;
55 
56 DEFINE_int32(port, 20001, "port on which to run Impala test backend");
57 DECLARE_string(principal);
58 
59 namespace impala {
60 
62  public:
63  ImpalaTestBackend(DataStreamMgr* stream_mgr): mgr_(stream_mgr) {}
64  virtual ~ImpalaTestBackend() {}
65 
66  virtual void ExecPlanFragment(
67  TExecPlanFragmentResult& return_val, const TExecPlanFragmentParams& params) {}
68 
69  virtual void ReportExecStatus(
70  TReportExecStatusResult& return_val, const TReportExecStatusParams& params) {}
71 
72  virtual void CancelPlanFragment(
73  TCancelPlanFragmentResult& return_val, const TCancelPlanFragmentParams& params) {}
74 
75  virtual void TransmitData(
76  TTransmitDataResult& return_val, const TTransmitDataParams& params) {
77  if (!params.eos) {
78  mgr_->AddData(params.dest_fragment_instance_id, params.dest_node_id,
79  params.row_batch, params.sender_id).SetTStatus(&return_val);
80  } else {
81  mgr_->CloseSender(params.dest_fragment_instance_id, params.dest_node_id,
82  params.sender_id).SetTStatus(&return_val);
83  }
84  }
85 
86  private:
88 };
89 
90 class DataStreamTest : public testing::Test {
91  protected:
93  : runtime_state_(TPlanFragmentInstanceCtx(), "", &exec_env_),
94  next_val_(0) {
95  // Initialize Mem trackers for use by the data stream receiver.
97  runtime_state_.InitMemTrackers(TUniqueId(), NULL, -1);
98  }
99 
100  virtual void SetUp() {
101  CreateRowDesc();
102  CreateTupleComparator();
103  CreateRowBatch();
104 
105  next_instance_id_.lo = 0;
106  next_instance_id_.hi = 0;
107  stream_mgr_ = new DataStreamMgr();
108 
109  broadcast_sink_.dest_node_id = DEST_NODE_ID;
110  broadcast_sink_.output_partition.type = TPartitionType::UNPARTITIONED;
111 
112  random_sink_.dest_node_id = DEST_NODE_ID;
113  random_sink_.output_partition.type = TPartitionType::RANDOM;
114 
115  hash_sink_.dest_node_id = DEST_NODE_ID;
116  hash_sink_.output_partition.type = TPartitionType::HASH_PARTITIONED;
117  // there's only one column to partition on
118  TExprNode expr_node;
119  expr_node.node_type = TExprNodeType::SLOT_REF;
120  expr_node.type.types.push_back(TTypeNode());
121  expr_node.type.types.back().__isset.scalar_type = true;
122  expr_node.type.types.back().scalar_type.type = TPrimitiveType::BIGINT;
123  expr_node.num_children = 0;
124  TSlotRef slot_ref;
125  slot_ref.slot_id = 0;
126  expr_node.__set_slot_ref(slot_ref);
127  TExpr expr;
128  expr.nodes.push_back(expr_node);
129  hash_sink_.output_partition.__isset.partition_exprs = true;
130  hash_sink_.output_partition.partition_exprs.push_back(expr);
131 
132  // Ensure that individual sender info addresses don't change
133  sender_info_.reserve(MAX_SENDERS);
134  receiver_info_.reserve(MAX_RECEIVERS);
135  StartBackend();
136  }
137 
138  const TDataStreamSink& GetSink(TPartitionType::type partition_type) {
139  switch (partition_type) {
140  case TPartitionType::UNPARTITIONED: return broadcast_sink_;
141  case TPartitionType::RANDOM: return random_sink_;
142  case TPartitionType::HASH_PARTITIONED: return hash_sink_;
143  default: DCHECK(false) << "Unhandled sink type: " << partition_type;
144  }
145  // Should never reach this.
146  return broadcast_sink_;
147  }
148 
149  virtual void TearDown() {
150  lhs_slot_ctx_->Close(NULL);
151  rhs_slot_ctx_->Close(NULL);
153  StopBackend();
154  }
155 
156  void Reset() {
157  sender_info_.clear();
158  receiver_info_.clear();
159  dest_.clear();
160  }
161 
162  // We reserve contiguous memory for senders in SetUp. If a test uses more
163  // senders, a DCHECK will fail and you should increase this value.
164  static const int MAX_SENDERS = 16;
165  static const int MAX_RECEIVERS = 16;
166  static const PlanNodeId DEST_NODE_ID = 1;
167  static const int BATCH_CAPACITY = 100; // rows
168  static const int PER_ROW_DATA = 8;
169  static const int TOTAL_DATA_SIZE = 8 * 1024;
170  static const int NUM_BATCHES = TOTAL_DATA_SIZE / BATCH_CAPACITY / PER_ROW_DATA;
171 
180  TUniqueId next_instance_id_;
181  string stmt_;
182 
183  // RowBatch generation
184  scoped_ptr<RowBatch> batch_;
186  int64_t* tuple_mem_;
187 
188  // receiving node
191 
192  // sending node(s)
193  TDataStreamSink broadcast_sink_;
194  TDataStreamSink random_sink_;
195  TDataStreamSink hash_sink_;
196  vector<TPlanFragmentDestination> dest_;
197 
198  struct SenderInfo {
199  thread* thread_handle;
202 
203  SenderInfo(): thread_handle(NULL), num_bytes_sent(0) {}
204  };
205  vector<SenderInfo> sender_info_;
206 
207  struct ReceiverInfo {
208  TPartitionType::type stream_type;
211 
212  thread* thread_handle;
213  shared_ptr<DataStreamRecvr> stream_recvr;
216  multiset<int64_t> data_values;
217 
218  ReceiverInfo(TPartitionType::type stream_type, int num_senders, int receiver_num)
219  : stream_type(stream_type),
220  num_senders(num_senders),
221  receiver_num(receiver_num),
222  thread_handle(NULL),
223  num_rows_received(0) {}
224 
226  delete thread_handle;
227  stream_recvr.reset();
228  }
229  };
230  vector<ReceiverInfo> receiver_info_;
231 
232  // Create an instance id and add it to dest_
233  void GetNextInstanceId(TUniqueId* instance_id) {
234  dest_.push_back(TPlanFragmentDestination());
235  TPlanFragmentDestination& dest = dest_.back();
236  dest.fragment_instance_id = next_instance_id_;
237  dest.server.hostname = "127.0.0.1";
238  dest.server.port = FLAGS_port;
239  *instance_id = next_instance_id_;
240  ++next_instance_id_.lo;
241  }
242 
243  // RowDescriptor to mimic "select bigint_col from alltypesagg", except the slot
244  // isn't nullable
245  void CreateRowDesc() {
246  // create DescriptorTbl
247  TTupleDescriptor tuple_desc;
248  tuple_desc.__set_id(0);
249  tuple_desc.__set_byteSize(8);
250  tuple_desc.__set_numNullBytes(0);
251  TDescriptorTable thrift_desc_tbl;
252  thrift_desc_tbl.tupleDescriptors.push_back(tuple_desc);
253  TSlotDescriptor slot_desc;
254  slot_desc.__set_id(0);
255  slot_desc.__set_parent(0);
256  ColumnType type(TYPE_BIGINT);
257  slot_desc.__set_slotType(type.ToThrift());
258  slot_desc.__set_columnPath(vector<int>(1, 0));
259  slot_desc.__set_byteOffset(0);
260  slot_desc.__set_nullIndicatorByte(0);
261  slot_desc.__set_nullIndicatorBit(-1);
262  slot_desc.__set_slotIdx(0);
263  slot_desc.__set_isMaterialized(true);
264  thrift_desc_tbl.slotDescriptors.push_back(slot_desc);
265  EXPECT_TRUE(DescriptorTbl::Create(&obj_pool_, thrift_desc_tbl, &desc_tbl_).ok());
266  runtime_state_.set_desc_tbl(desc_tbl_);
267 
268  vector<TTupleId> row_tids;
269  row_tids.push_back(0);
270  vector<bool> nullable_tuples;
271  nullable_tuples.push_back(false);
272  row_desc_ = obj_pool_.Add(new RowDescriptor(*desc_tbl_, row_tids, nullable_tuples));
273  }
274 
275  // Create a tuple comparator to sort in ascending order on the single bigint column.
277  SlotRef* lhs_slot = obj_pool_.Add(new SlotRef(TYPE_BIGINT, 0));
278  lhs_slot_ctx_ = obj_pool_.Add(new ExprContext(lhs_slot));
279  SlotRef* rhs_slot = obj_pool_.Add(new SlotRef(TYPE_BIGINT, 0));
280  rhs_slot_ctx_ = obj_pool_.Add(new ExprContext(rhs_slot));
281 
282  lhs_slot_ctx_->Prepare(NULL, *row_desc_, &tracker_);
283  rhs_slot_ctx_->Prepare(NULL, *row_desc_, &tracker_);
284  lhs_slot_ctx_->Open(NULL);
285  rhs_slot_ctx_->Open(NULL);
286  less_than_ = obj_pool_.Add(new TupleRowComparator(
287  vector<ExprContext*>(1, lhs_slot_ctx_), vector<ExprContext*>(1, rhs_slot_ctx_),
288  vector<bool>(1, true), vector<bool>(1, false)));
289  }
290 
291  // Create batch_, but don't fill it with data yet. Assumes we created row_desc_.
293  RowBatch* batch = new RowBatch(*row_desc_, BATCH_CAPACITY, &tracker_);
294  int64_t* tuple_mem = reinterpret_cast<int64_t*>(
295  batch->tuple_data_pool()->Allocate(BATCH_CAPACITY * 8));
296  bzero(tuple_mem, BATCH_CAPACITY * 8);
297  for (int i = 0; i < BATCH_CAPACITY; ++i) {
298  int idx = batch->AddRow();
299  TupleRow* row = batch->GetRow(idx);
300  row->SetTuple(0, reinterpret_cast<Tuple*>(&tuple_mem[i]));
301  batch->CommitLastRow();
302  }
303  return batch;
304  }
305 
306  void GetNextBatch(RowBatch* batch, int* next_val) {
307  for (int i = 0; i < BATCH_CAPACITY; ++i) {
308  TupleRow* row = batch->GetRow(i);
309  int64_t* val = reinterpret_cast<int64_t*>(row->GetTuple(0)->GetSlot(0));
310  *val = (*next_val)++;
311  }
312  }
313 
314  // Start receiver (expecting given number of senders) in separate thread.
315  void StartReceiver(TPartitionType::type stream_type, int num_senders, int receiver_num,
316  int buffer_size, bool is_merging, TUniqueId* out_id = NULL) {
317  VLOG_QUERY << "start receiver";
318  RuntimeProfile* profile =
319  obj_pool_.Add(new RuntimeProfile(&obj_pool_, "TestReceiver"));
320  TUniqueId instance_id;
321  GetNextInstanceId(&instance_id);
322  receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num));
323  ReceiverInfo& info = receiver_info_.back();
324  info.stream_recvr =
325  stream_mgr_->CreateRecvr(&runtime_state_,
326  *row_desc_, instance_id, DEST_NODE_ID, num_senders, buffer_size, profile,
327  is_merging);
328  if (!is_merging) {
329  info.thread_handle = new thread(&DataStreamTest::ReadStream, this, &info);
330  } else {
331  info.thread_handle = new thread(&DataStreamTest::ReadStreamMerging, this, &info,
332  profile);
333  }
334  if (out_id != NULL) *out_id = instance_id;
335  }
336 
337  void JoinReceivers() {
338  VLOG_QUERY << "join receiver\n";
339  for (int i = 0; i < receiver_info_.size(); ++i) {
340  receiver_info_[i].thread_handle->join();
341  receiver_info_[i].stream_recvr->Close();
342  }
343  }
344 
345  // Deplete stream and print batches
346  void ReadStream(ReceiverInfo* info) {
347  RowBatch* batch;
348  VLOG_QUERY << "start reading";
349  while (!(info->status = info->stream_recvr->GetBatch(&batch)).IsCancelled() &&
350  (batch != NULL)) {
351  VLOG_QUERY << "read batch #rows=" << batch->num_rows();
352  for (int i = 0; i < batch->num_rows(); ++i) {
353  TupleRow* row = batch->GetRow(i);
354  info->data_values.insert(*static_cast<int64_t*>(row->GetTuple(0)->GetSlot(0)));
355  }
356  SleepForMs(100); // slow down receiver to exercise buffering logic
357  }
358  if (info->status.IsCancelled()) VLOG_QUERY << "reader is cancelled";
359  VLOG_QUERY << "done reading";
360  }
361 
363  info->status = info->stream_recvr->CreateMerger(*less_than_);
364  if (info->status.IsCancelled()) return;
365  RowBatch batch(*row_desc_, 1024, &tracker_);
366  VLOG_QUERY << "start reading merging";
367  bool eos;
368  while (!(info->status = info->stream_recvr->GetNext(&batch, &eos)).IsCancelled()) {
369  VLOG_QUERY << "read batch #rows=" << batch.num_rows();
370  for (int i = 0; i < batch.num_rows(); ++i) {
371  TupleRow* row = batch.GetRow(i);
372  info->data_values.insert(*static_cast<int64_t*>(row->GetTuple(0)->GetSlot(0)));
373  }
374  SleepForMs(100);
375  batch.Reset();
376  if (eos) break;
377  }
378  if (info->status.IsCancelled()) VLOG_QUERY << "reader is cancelled";
379  VLOG_QUERY << "done reading";
380  }
381 
382  // Verify correctness of receivers' data values.
383  void CheckReceivers(TPartitionType::type stream_type, int num_senders) {
384  int64_t total = 0;
385  multiset<int64_t> all_data_values;
386  for (int i = 0; i < receiver_info_.size(); ++i) {
387  ReceiverInfo& info = receiver_info_[i];
388  EXPECT_TRUE(info.status.ok());
389  total += info.data_values.size();
390  DCHECK_EQ(info.stream_type, stream_type);
391  DCHECK_EQ(info.num_senders, num_senders);
392  if (stream_type == TPartitionType::UNPARTITIONED) {
393  EXPECT_EQ(
394  NUM_BATCHES * BATCH_CAPACITY * num_senders, info.data_values.size());
395  }
396  all_data_values.insert(info.data_values.begin(), info.data_values.end());
397 
398  int k = 0;
399  for (multiset<int64_t>::iterator j = info.data_values.begin();
400  j != info.data_values.end(); ++j, ++k) {
401  if (stream_type == TPartitionType::UNPARTITIONED) {
402  // unpartitioned streams contain all values as many times as there are
403  // senders
404  EXPECT_EQ(k / num_senders, *j);
405  } else if (stream_type == TPartitionType::HASH_PARTITIONED) {
406  // hash-partitioned streams send values to the right partition
407  int64_t value = *j;
408  uint32_t hash_val =
410  EXPECT_EQ(hash_val % receiver_info_.size(), info.receiver_num);
411  }
412  }
413  }
414 
415  if (stream_type == TPartitionType::HASH_PARTITIONED) {
416  EXPECT_EQ(NUM_BATCHES * BATCH_CAPACITY * num_senders, total);
417 
418  int k = 0;
419  for (multiset<int64_t>::iterator j = all_data_values.begin();
420  j != all_data_values.end(); ++j, ++k) {
421  // each sender sent all values
422  EXPECT_EQ(k / num_senders, *j);
423  if (k/num_senders != *j) break;
424  }
425  }
426  }
427 
428  void CheckSenders() {
429  for (int i = 0; i < sender_info_.size(); ++i) {
430  EXPECT_TRUE(sender_info_[i].status.ok());
431  EXPECT_GT(sender_info_[i].num_bytes_sent, 0);
432  }
433  }
434 
435  // Start backend in separate thread.
436  void StartBackend() {
437  shared_ptr<ImpalaTestBackend> handler(new ImpalaTestBackend(stream_mgr_));
438  shared_ptr<TProcessor> processor(new ImpalaInternalServiceProcessor(handler));
439  server_ = new ThriftServer("DataStreamTest backend", processor, FLAGS_port, NULL);
440  server_->Start();
441  }
442 
443  void StopBackend() {
444  VLOG_QUERY << "stop backend\n";
445  server_->StopForTesting();
446  delete server_;
447  }
448 
449  void StartSender(TPartitionType::type partition_type = TPartitionType::UNPARTITIONED,
450  int channel_buffer_size = 1024) {
451  VLOG_QUERY << "start sender";
452  int num_senders = sender_info_.size();
453  DCHECK_LT(num_senders, MAX_SENDERS);
454  sender_info_.push_back(SenderInfo());
455  SenderInfo& info = sender_info_.back();
456  info.thread_handle =
457  new thread(&DataStreamTest::Sender, this, num_senders, channel_buffer_size,
458  partition_type);
459  }
460 
461  void JoinSenders() {
462  VLOG_QUERY << "join senders\n";
463  for (int i = 0; i < sender_info_.size(); ++i) {
464  sender_info_[i].thread_handle->join();
465  }
466  }
467 
468  void Sender(int sender_num, int channel_buffer_size,
469  TPartitionType::type partition_type) {
470  RuntimeState state(TPlanFragmentInstanceCtx(), "", &exec_env_);
471  state.set_desc_tbl(desc_tbl_);
472  state.InitMemTrackers(TUniqueId(), NULL, -1);
473  VLOG_QUERY << "create sender " << sender_num;
474  const TDataStreamSink& sink = GetSink(partition_type);
475  DataStreamSender sender(
476  &obj_pool_, sender_num, *row_desc_, sink, dest_, channel_buffer_size);
477  EXPECT_TRUE(sender.Prepare(&state).ok());
478  EXPECT_TRUE(sender.Open(&state).ok());
479  scoped_ptr<RowBatch> batch(CreateRowBatch());
480  SenderInfo& info = sender_info_[sender_num];
481  int next_val = 0;
482  for (int i = 0; i < NUM_BATCHES; ++i) {
483  GetNextBatch(batch.get(), &next_val);
484  VLOG_QUERY << "sender " << sender_num << ": #rows=" << batch->num_rows();
485  info.status = sender.Send(&state, batch.get(), false);
486  if (!info.status.ok()) break;
487  }
488  VLOG_QUERY << "closing sender" << sender_num;
489  sender.Close(&state);
490  info.num_bytes_sent = sender.GetNumDataBytesSent();
491 
492  batch->Reset();
493  }
494 
495  void TestStream(TPartitionType::type stream_type, int num_senders,
496  int num_receivers, int buffer_size, bool is_merging) {
497  VLOG_QUERY << "Testing stream=" << stream_type << " #senders=" << num_senders
498  << " #receivers=" << num_receivers << " buffer_size=" << buffer_size
499  << " is_merging=" << is_merging;
500  Reset();
501  for (int i = 0; i < num_receivers; ++i) {
502  StartReceiver(stream_type, num_senders, i, buffer_size, is_merging);
503  }
504  for (int i = 0; i < num_senders; ++i) {
505  StartSender(stream_type, buffer_size);
506  }
507  JoinSenders();
508  CheckSenders();
509  JoinReceivers();
510  CheckReceivers(stream_type, num_senders);
511  }
512 
513  private:
516 };
517 
518 TEST_F(DataStreamTest, UnknownSenderSmallResult) {
519  // starting a sender w/o a corresponding receiver does not result in an error because
520  // we cannot distinguish whether a receiver was never created or the receiver
521  // willingly tore down the stream
522  // case 1: entire query result fits in single buffer, close() returns ok
523  TUniqueId dummy_id;
524  GetNextInstanceId(&dummy_id);
525  StartSender(TPartitionType::UNPARTITIONED, TOTAL_DATA_SIZE + 1024);
526  JoinSenders();
527  EXPECT_TRUE(sender_info_[0].status.ok());
528  EXPECT_GT(sender_info_[0].num_bytes_sent, 0);
529 }
530 
531 TEST_F(DataStreamTest, UnknownSenderLargeResult) {
532  // case 2: query result requires multiple buffers, send() returns ok
533  TUniqueId dummy_id;
534  GetNextInstanceId(&dummy_id);
535  StartSender();
536  JoinSenders();
537  EXPECT_TRUE(sender_info_[0].status.ok());
538  EXPECT_GT(sender_info_[0].num_bytes_sent, 0);
539 }
540 
542  TUniqueId instance_id;
543  StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, false, &instance_id);
544  stream_mgr_->Cancel(instance_id);
545  StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, true, &instance_id);
546  stream_mgr_->Cancel(instance_id);
547  JoinReceivers();
548  EXPECT_TRUE(receiver_info_[0].status.IsCancelled());
549  EXPECT_TRUE(receiver_info_[1].status.IsCancelled());
550 }
551 
552 TEST_F(DataStreamTest, BasicTest) {
553  // TODO: also test that all client connections have been returned
554  TPartitionType::type stream_types[] =
555  {TPartitionType::UNPARTITIONED, TPartitionType::RANDOM,
556  TPartitionType::HASH_PARTITIONED};
557  int sender_nums[] = {1, 4};
558  int receiver_nums[] = {1, 4};
559  int buffer_sizes[] = {1024, 1024 * 1024};
560  bool merging[] = {false, true};
561  for (int i = 0; i < sizeof(stream_types) / sizeof(*stream_types); ++i) {
562  for (int j = 0; j < sizeof(sender_nums) / sizeof(int); ++j) {
563  for (int k = 0; k < sizeof(receiver_nums) / sizeof(int); ++k) {
564  for (int l = 0; l < sizeof(buffer_sizes) / sizeof(int); ++l) {
565  for (int m = 0; m < sizeof(merging) / sizeof(bool); ++m) {
566  TestStream(stream_types[i], sender_nums[j], receiver_nums[k],
567  buffer_sizes[l], merging[m]);
568  }
569  }
570  }
571  }
572  }
573 }
574 
575 // TODO: more tests:
576 // - test case for transmission error in last batch
577 // - receivers getting created concurrently
578 
579 }
580 
581 int main(int argc, char **argv) {
582  ::testing::InitGoogleTest(&argc, argv);
583  InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST);
584  InitFeSupport();
586  return RUN_ALL_TESTS();
587 }
void StartReceiver(TPartitionType::type stream_type, int num_senders, int receiver_num, int buffer_size, bool is_merging, TUniqueId *out_id=NULL)
vector< TPlanFragmentDestination > dest_
int num_rows() const
Definition: row-batch.h:215
DataStreamMgr * stream_mgr_
int PlanNodeId
Definition: global-types.h:26
void InitFeSupport()
Definition: fe-support.cc:346
TEST_F(InstructionCounterTest, Count)
Tuple * GetTuple(int tuple_idx)
Definition: tuple-row.h:30
void TestShutdown()
For testing only: shutdown all clients.
Definition: client-cache.h:287
const StringSearch UrlParser::protocol_search & protocol
Definition: url-parser.cc:36
void CheckReceivers(TPartitionType::type stream_type, int num_senders)
Status InitForFeTests()
Initializes the exec env for running FE tests.
Definition: exec-env.cc:276
boost::scoped_ptr< ObjectPool > obj_pool_
Object pool owned by the coordinator. Any executor will have its own pool.
Definition: coordinator.h:296
void InitMemTrackers(const TUniqueId &query_id, const std::string *request_pool, int64_t query_bytes_limit, int64_t query_rm_reservation_limit_bytes=-1)
const TDataStreamSink & GetSink(TPartitionType::type partition_type)
void set_desc_tbl(DescriptorTbl *desc_tbl)
Definition: runtime-state.h:94
void InitCommonRuntime(int argc, char **argv, bool init_jvm, TestInfo::Mode m=TestInfo::NON_TEST)
Definition: init.cc:122
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
void ReadStreamMerging(ReceiverInfo *info, RuntimeProfile *profile)
virtual Status Prepare(RuntimeState *state)
void TestStream(TPartitionType::type stream_type, int num_senders, int num_receivers, int buffer_size, bool is_merging)
TDescriptorTable desc_tbl_
copied from TQueryExecRequest; constant across all fragments
Definition: coordinator.h:197
void * GetSlot(int offset)
Definition: tuple.h:118
shared_ptr< DataStreamRecvr > stream_recvr
TDataStreamSink broadcast_sink_
void GetNextBatch(RowBatch *batch, int *next_val)
virtual Status Open(RuntimeState *state)
void Cancel(const Status *cause=NULL)
void SleepForMs(const int64_t duration_ms)
Sleeps the current thread for at least duration_ms milliseconds.
Definition: time.cc:21
static uint32_t GetHashValueFnv(const void *v, const ColumnType &type, uint32_t seed)
Definition: raw-value.h:196
DECLARE_string(principal)
virtual void ExecPlanFragment(TExecPlanFragmentResult &return_val, const TExecPlanFragmentParams &params)
ImpalaTestBackend(DataStreamMgr *stream_mgr)
#define VLOG_QUERY
Definition: logging.h:57
virtual void CancelPlanFragment(TCancelPlanFragmentResult &return_val, const TCancelPlanFragmentParams &params)
void Sender(int sender_num, int channel_buffer_size, TPartitionType::type partition_type)
void Reset()
Resets the row batch, returning all resources it has accumulated.
Definition: row-batch.cc:224
bool IsCancelled() const
Definition: status.h:174
virtual Status Send(RuntimeState *state, RowBatch *batch, bool eos)
TDataStreamSink hash_sink_
void StartSender(TPartitionType::type partition_type=TPartitionType::UNPARTITIONED, int channel_buffer_size=1024)
void ReadStream(ReceiverInfo *info)
int64_t GetNumDataBytesSent() const
TDataStreamSink random_sink_
vector< ReceiverInfo > receiver_info_
virtual void ReportExecStatus(TReportExecStatusResult &return_val, const TReportExecStatusParams &params)
TupleRowComparator * less_than_
virtual void Close(RuntimeState *state)
This class is thread-safe.
Definition: mem-tracker.h:61
void CommitLastRow()
Definition: row-batch.h:109
ReceiverInfo(TPartitionType::type stream_type, int num_senders, int receiver_num)
MemPool * tuple_data_pool()
Definition: row-batch.h:148
virtual void TransmitData(TTransmitDataResult &return_val, const TTransmitDataParams &params)
static const uint32_t FNV_SEED
Definition: hash-util.h:99
uint64_t Test(T *ht, const ProbeTuple *input, uint64_t num_tuples)
void SetTuple(int tuple_idx, Tuple *tuple)
Definition: tuple-row.h:34
static void InitializeLlvm(bool load_backend=false)
Definition: llvm-codegen.cc:78
vector< SenderInfo > sender_info_
scoped_ptr< RowBatch > batch_
void GetNextInstanceId(TUniqueId *instance_id)
Reference to a single slot of a tuple.
Definition: slot-ref.h:23
const RowDescriptor * row_desc_
owned by plan root, which resides in runtime_state_'s pool
Definition: coordinator.h:255
int main(int argc, char **argv)
static Status Create(ObjectPool *pool, const TDescriptorTable &thrift_tbl, DescriptorTbl **tbl)
Definition: descriptors.cc:378
ExecEnv * exec_env_
Definition: coordinator.h:193
DEFINE_int32(periodic_counter_update_period_ms, 500,"Period to update rate counters and"" sampling counters in ms")
TColumnType ToThrift() const
Definition: types.h:147
const RowDescriptor * row_desc_
bool ok() const
Definition: status.h:172
uint8_t * Allocate(int size)
Definition: mem-pool.h:92
ImpalaInternalServiceClientCache * impalad_client_cache()
Definition: exec-env.h:76