Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
buffered-tuple-stream-test.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <boost/scoped_ptr.hpp>
16 #include <boost/bind.hpp>
17 #include <boost/filesystem.hpp>
18 
19 #include <gtest/gtest.h>
20 
21 #include <string>
22 #include <limits> // for std::numeric_limits<int>::max()
23 
24 #include "codegen/llvm-codegen.h"
25 #include "common/init.h"
27 #include "runtime/row-batch.h"
28 #include "runtime/tmp-file-mgr.h"
29 #include "runtime/string-value.h"
30 #include "service/fe-support.h"
32 #include "util/test-info.h"
33 
34 #include "gen-cpp/Types_types.h"
35 #include "gen-cpp/ImpalaInternalService_types.h"
36 
37 #include "common/names.h"
38 
// Number of rows per RowBatch used throughout these tests.
const int BATCH_SIZE = 250;
40 
41 namespace impala {
42 
// Test string data of assorted lengths; the string batches cycle through
// these values in order.
static const StringValue STRINGS[] = {
  StringValue("ABC"),
  StringValue("HELLO"),
  StringValue("123456789"),
  StringValue("FOOBAR"),
  StringValue("ONE"),
  StringValue("THREE"),
  StringValue("abcdefghijklmno"),
  StringValue("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
  StringValue("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"),
};

// Number of entries in STRINGS.
static const int NUM_STRINGS = sizeof(STRINGS) / sizeof(StringValue);
56 
58  protected:
59  virtual void SetUp() {
60  exec_env_.reset(new ExecEnv);
61  exec_env_->disk_io_mgr()->Init(&tracker_);
62  runtime_state_.reset(
63  new RuntimeState(TPlanFragmentInstanceCtx(), "", exec_env_.get()));
64 
66 
67  mem_pool_.reset(new MemPool(&tracker_));
68  }
69 
70  virtual void CreateDescriptors() {
71  vector<bool> nullable_tuples(1, false);
72  vector<TTupleId> tuple_ids(1, static_cast<TTupleId>(0));
73 
74  DescriptorTblBuilder int_builder(&pool_);
75  int_builder.DeclareTuple() << TYPE_INT;
77  *int_builder.Build(), tuple_ids, nullable_tuples));
78 
79  DescriptorTblBuilder string_builder(&pool_);
80  string_builder.DeclareTuple() << TYPE_STRING;
82  *string_builder.Build(), tuple_ids, nullable_tuples));
83  }
84 
85  virtual void TearDown() {
86  block_mgr_.reset();
88  runtime_state_.reset();
89  exec_env_.reset();
90  mem_pool_->FreeAll();
91  }
92 
93  void CreateMgr(int64_t limit, int block_size) {
95  &tracker_, runtime_state_->runtime_profile(), limit, block_size, &block_mgr_);
96  EXPECT_TRUE(status.ok());
97  status = block_mgr_->RegisterClient(0, &tracker_, runtime_state_.get(), &client_);
98  EXPECT_TRUE(status.ok());
99  }
100 
101  virtual RowBatch* CreateIntBatch(int start_val, int num_rows, bool gen_null) {
102  RowBatch* batch = pool_.Add(new RowBatch(*int_desc_, num_rows, &tracker_));
103  int* tuple_mem = reinterpret_cast<int*>(
104  batch->tuple_data_pool()->Allocate(sizeof(int) * num_rows));
105  const int int_tuples = int_desc_->tuple_descriptors().size();
106  if (int_tuples > 1) {
107  for (int i = 0; i < num_rows; ++i) {
108  int idx = batch->AddRow();
109  TupleRow* row = batch->GetRow(idx);
110  tuple_mem[i] = i + start_val;
111  for (int j = 0; j < int_tuples; ++j) {
112  if (!gen_null || (j % 2) == 0) {
113  row->SetTuple(j, reinterpret_cast<Tuple*>(&tuple_mem[i]));
114  } else {
115  row->SetTuple(j, NULL);
116  }
117  }
118  batch->CommitLastRow();
119  }
120  } else {
121  for (int i = 0; i < num_rows; ++i) {
122  int idx = batch->AddRow();
123  TupleRow* row = batch->GetRow(idx);
124  tuple_mem[i] = i + start_val;
125  if (!gen_null || (i % 2) == 0) {
126  row->SetTuple(0, reinterpret_cast<Tuple*>(&tuple_mem[i]));
127  } else {
128  row->SetTuple(0, NULL);
129  }
130  batch->CommitLastRow();
131  }
132  }
133  return batch;
134  }
135 
136  virtual RowBatch* CreateStringBatch(int string_idx, int num_rows, bool gen_null) {
137  int tuple_size = sizeof(StringValue) + 1;
138  RowBatch* batch = pool_.Add(new RowBatch(*string_desc_, num_rows, &tracker_));
139  uint8_t* tuple_mem = batch->tuple_data_pool()->Allocate(tuple_size * num_rows);
140  memset(tuple_mem, 0, tuple_size * num_rows);
141  const int string_tuples = string_desc_->tuple_descriptors().size();
142  if (string_tuples > 1) {
143  for (int i = 0; i < num_rows; ++i) {
144  TupleRow* row = batch->GetRow(batch->AddRow());
145  string_idx %= NUM_STRINGS;
146  *reinterpret_cast<StringValue*>(tuple_mem + 1) = STRINGS[string_idx];
147  ++string_idx;
148  for (int j = 0; j < string_tuples; ++j) {
149  if (!gen_null || (j % 2) == 0) {
150  row->SetTuple(j, reinterpret_cast<Tuple*>(tuple_mem));
151  } else {
152  row->SetTuple(j, NULL);
153  }
154  }
155  batch->CommitLastRow();
156  tuple_mem += tuple_size;
157  }
158  } else {
159  for (int i = 0; i < num_rows; ++i) {
160  TupleRow* row = batch->GetRow(batch->AddRow());
161  string_idx %= NUM_STRINGS;
162  *reinterpret_cast<StringValue*>(tuple_mem + 1) = STRINGS[string_idx];
163  ++string_idx;
164  if (!gen_null || (i % 2) == 0) {
165  row->SetTuple(0, reinterpret_cast<Tuple*>(tuple_mem));
166  } else {
167  row->SetTuple(0, NULL);
168  }
169  batch->CommitLastRow();
170  tuple_mem += tuple_size;
171  }
172  }
173  return batch;
174  }
175 
176  void AppendRowTuples(TupleRow* row, vector<int>* results) {
177  DCHECK_NOTNULL(row);
178  const int int_tuples = int_desc_->tuple_descriptors().size();
179  for (int i = 0; i < int_tuples; ++i) {
180  AppendValue(row->GetTuple(i), results);
181  }
182  }
183 
184  void AppendRowTuples(TupleRow* row, vector<StringValue>* results) {
185  DCHECK_NOTNULL(row);
186  const int string_tuples = string_desc_->tuple_descriptors().size();
187  for (int i = 0; i < string_tuples; ++i) {
188  AppendValue(row->GetTuple(i), results);
189  }
190  }
191 
192  void AppendValue(Tuple* t, vector<int>* results) {
193  if (t == NULL) {
194  // For the tests indicate null-ability using the max int value
195  results->push_back(std::numeric_limits<int>::max());
196  } else {
197  results->push_back(*reinterpret_cast<int*>(t));
198  }
199  }
200 
201  void AppendValue(Tuple* t, vector<StringValue>* results) {
202  if (t == NULL) {
203  results->push_back(StringValue());
204  } else {
205  uint8_t* mem = reinterpret_cast<uint8_t*>(t);
206  StringValue sv = *reinterpret_cast<StringValue*>(mem + 1);
207  uint8_t* copy = mem_pool_->Allocate(sv.len);
208  memcpy(copy, sv.ptr, sv.len);
209  sv.ptr = reinterpret_cast<char*>(copy);
210  results->push_back(sv);
211  }
212  }
213 
214  template <typename T>
215  void ReadValues(BufferedTupleStream* stream, RowDescriptor* desc, vector<T>* results,
216  int num_batches = -1) {
217  bool eos = false;
218  RowBatch batch(*desc, BATCH_SIZE, &tracker_);
219  int batches_read = 0;
220  do {
221  batch.Reset();
222  Status status = stream->GetNext(&batch, &eos);
223  EXPECT_TRUE(status.ok());
224  ++batches_read;
225  for (int i = 0; i < batch.num_rows(); ++i) {
226  AppendRowTuples(batch.GetRow(i), results);
227  }
228  } while (!eos && (num_batches < 0 || batches_read <= num_batches));
229  }
230 
231  virtual void VerifyResults(const vector<int>& results, int exp_rows, bool gen_null) {
232  const int int_tuples = int_desc_->tuple_descriptors().size();
233  EXPECT_EQ(results.size(), exp_rows * int_tuples);
234  if (int_tuples > 1) {
235  for (int i = 0; i < results.size(); i += int_tuples) {
236  for (int j = 0; j < int_tuples; ++j) {
237  if (!gen_null || (j % 2) == 0) {
238  ASSERT_EQ(results[i+j], i / int_tuples)
239  << " results[" << (i + j) << "]: " << results[i + j]
240  << " != " << (i / int_tuples) << " gen_null=" << gen_null;
241  } else {
242  ASSERT_TRUE(results[i+j] == std::numeric_limits<int>::max())
243  << "i: " << i << " j: " << j << " results[" << (i + j) << "]: "
244  << results[i+j] << " != " << std::numeric_limits<int>::max();
245  }
246  }
247  }
248  } else {
249  for (int i = 0; i < results.size(); i += int_tuples) {
250  if (!gen_null || (i % 2) == 0) {
251  ASSERT_TRUE(results[i] == i)
252  << " results[" << (i) << "]: " << results[i]
253  << " != " << i << " gen_null=" << gen_null;
254  } else {
255  ASSERT_TRUE(results[i] == std::numeric_limits<int>::max())
256  << "i: " << i << " results[" << i << "]: "
257  << results[i] << " != " << std::numeric_limits<int>::max();
258  }
259  }
260  }
261  }
262 
  // Checks that 'results' holds exp_rows rows of data produced by
  // CreateStringBatch(0, ...): values cycle through STRINGS, with NULL tuples
  // (recorded as an empty StringValue by AppendValue) exactly where gen_null
  // put them.
  virtual void VerifyResults(const vector<StringValue>& results, int exp_rows,
      bool gen_null) {
    const int string_tuples = string_desc_->tuple_descriptors().size();
    EXPECT_EQ(results.size(), exp_rows * string_tuples);
    // idx tracks the expected position in the STRINGS cycle; it advances once
    // per row because all tuples of a row share the same string.
    int idx = 0;
    if (string_tuples > 1) {
      for (int i = 0; i < results.size(); i += string_tuples) {
        for (int j = 0; j < string_tuples; ++j) {
          if (!gen_null || (j % 2) == 0) {
            ASSERT_TRUE(results[i+j] == STRINGS[idx])
                << "results[" << i << "+" << j << "] " << results[i+j]
                << " != " << STRINGS[idx] << " idx=" << idx << " gen_null=" << gen_null;
          } else {
            // Odd tuple positions were generated as NULL.
            ASSERT_TRUE(results[i+j] == StringValue())
                << "results[" << i << "+" << j << "] " << results[i+j] << " not NULL";
          }
        }
        idx = (idx + 1) % NUM_STRINGS;
      }
    } else {
      for (int i = 0; i < results.size(); i += string_tuples) {
        if (!gen_null || (i % 2) == 0) {
          ASSERT_TRUE(results[i] == STRINGS[idx])
              << "results[" << i << "] " << results[i]
              << " != " << STRINGS[idx] << " idx=" << idx << " gen_null=" << gen_null;
        } else {
          // Odd row positions were generated as NULL.
          ASSERT_TRUE(results[i] == StringValue())
              << "results[" << i << "] " << results[i] << " not NULL";
        }
        idx = (idx + 1) % NUM_STRINGS;
      }
    }
  }
296 
  // Writes 'num_batches' batches of BATCH_SIZE rows into a fresh, unpinned
  // stream, then reads everything back and verifies it. T selects the data
  // type (int or StringValue batches).
  template <typename T>
  void TestValues(int num_batches, RowDescriptor* desc, bool gen_null) {
    BufferedTupleStream stream(runtime_state_.get(), *desc, block_mgr_.get(), client_);
    Status status = stream.Init();
    ASSERT_TRUE(status.ok()) << status.GetDetail();
    // Unpin so the stream's blocks may spill to disk as it grows.
    status = stream.UnpinStream();
    ASSERT_TRUE(status.ok());

    // Add rows to the stream
    int offset = 0;
    for (int i = 0; i < num_batches; ++i) {
      RowBatch* batch = NULL;
      // NOTE(review): dispatch is on sizeof(T) rather than the type itself;
      // this relies on sizeof(int) != sizeof(StringValue).
      if (sizeof(T) == sizeof(int)) {
        batch = CreateIntBatch(offset, BATCH_SIZE, gen_null);
      } else if (sizeof(T) == sizeof(StringValue)) {
        batch = CreateStringBatch(offset, BATCH_SIZE, gen_null);
      } else {
        DCHECK(false);
      }
      for (int j = 0; j < batch->num_rows(); ++j) {
        bool b = stream.AddRow(batch->GetRow(j));
        if (!b) {
          // AddRow may fail while the stream is still on small buffers;
          // switch to I/O-sized buffers and retry the same row once.
          ASSERT_TRUE(stream.using_small_buffers());
          bool got_buffer;
          status = stream.SwitchToIoBuffers(&got_buffer);
          ASSERT_TRUE(status.ok());
          ASSERT_TRUE(got_buffer);
          b = stream.AddRow(batch->GetRow(j));
        }
        ASSERT_TRUE(b);
      }
      offset += batch->num_rows();
      // Reset the batch to make sure the stream handles the memory correctly.
      batch->Reset();
    }

    status = stream.PrepareForRead();
    ASSERT_TRUE(status.ok());

    // Read all the rows back
    vector<T> results;
    ReadValues(&stream, desc, &results);

    // Verify result
    VerifyResults(results, BATCH_SIZE * num_batches, gen_null);

    stream.Close();
  }
346 
347  void TestIntValuesInterleaved(int num_batches, int num_batches_before_read) {
348  for (int small_buffers = 0; small_buffers < 2; ++small_buffers) {
350  client_,
351  small_buffers == 0, // initial small buffers
352  true, // delete_on_read
353  true); // read_write
354  Status status = stream.Init();
355  ASSERT_TRUE(status.ok());
356  status = stream.UnpinStream();
357  ASSERT_TRUE(status.ok());
358 
359  vector<int> results;
360 
361  for (int i = 0; i < num_batches; ++i) {
362  RowBatch* batch = CreateIntBatch(i * BATCH_SIZE, BATCH_SIZE, false);
363  for (int j = 0; j < batch->num_rows(); ++j) {
364  bool b = stream.AddRow(batch->GetRow(j));
365  ASSERT_TRUE(b);
366  }
367  // Reset the batch to make sure the stream handles the memory correctly.
368  batch->Reset();
369  if (i % num_batches_before_read == 0) {
370  ReadValues(&stream, int_desc_, &results,
371  (rand() % num_batches_before_read) + 1);
372  }
373  }
374  ReadValues(&stream, int_desc_, &results);
375 
376  // Verify result
377  const int int_tuples = int_desc_->tuple_descriptors().size();
378  EXPECT_EQ(results.size(), BATCH_SIZE * num_batches * int_tuples);
379  for (int i = 0; i < results.size(); ++i) {
380  ASSERT_EQ(results[i], i / int_tuples);
381  }
382 
383  stream.Close();
384  }
385  }
386 
387  scoped_ptr<ExecEnv> exec_env_;
388  scoped_ptr<RuntimeState> runtime_state_;
389  scoped_ptr<MemTracker> block_mgr_parent_tracker_;
390 
391  shared_ptr<BufferedBlockMgr> block_mgr_;
393 
398  scoped_ptr<MemPool> mem_pool_;
399 }; // SimpleTupleStreamTest
400 
401 
402 // Tests with a non-NULLable tuple per row.
404  protected:
405  virtual void CreateDescriptors() {
406  vector<bool> nullable_tuples(1, true);
407  vector<TTupleId> tuple_ids(1, static_cast<TTupleId>(0));
408 
409  DescriptorTblBuilder int_builder(&pool_);
410  int_builder.DeclareTuple() << TYPE_INT;
412  *int_builder.Build(), tuple_ids, nullable_tuples));
413 
414  DescriptorTblBuilder string_builder(&pool_);
415  string_builder.DeclareTuple() << TYPE_STRING;
417  *string_builder.Build(), tuple_ids, nullable_tuples));
418  }
419 }; // SimpleNullStreamTest
420 
421 // Tests with multiple non-NULLable tuples per row.
423  protected:
424  virtual void CreateDescriptors() {
425  vector<bool> nullable_tuples;
426  nullable_tuples.push_back(false);
427  nullable_tuples.push_back(false);
428  nullable_tuples.push_back(false);
429 
430  vector<TTupleId> tuple_ids;
431  tuple_ids.push_back(static_cast<TTupleId>(0));
432  tuple_ids.push_back(static_cast<TTupleId>(1));
433  tuple_ids.push_back(static_cast<TTupleId>(2));
434 
435  DescriptorTblBuilder int_builder(&pool_);
436  int_builder.DeclareTuple() << TYPE_INT;
437  int_builder.DeclareTuple() << TYPE_INT;
438  int_builder.DeclareTuple() << TYPE_INT;
440  *int_builder.Build(), tuple_ids, nullable_tuples));
441 
442  DescriptorTblBuilder string_builder(&pool_);
443  string_builder.DeclareTuple() << TYPE_STRING;
444  string_builder.DeclareTuple() << TYPE_STRING;
445  string_builder.DeclareTuple() << TYPE_STRING;
447  *string_builder.Build(), tuple_ids, nullable_tuples));
448  }
449 };
450 
451 // Tests with multiple NULLable tuples per row.
453  protected:
454  virtual void CreateDescriptors() {
455  vector<bool> nullable_tuples;
456  nullable_tuples.push_back(false);
457  nullable_tuples.push_back(true);
458  nullable_tuples.push_back(true);
459 
460  vector<TTupleId> tuple_ids;
461  tuple_ids.push_back(static_cast<TTupleId>(0));
462  tuple_ids.push_back(static_cast<TTupleId>(1));
463  tuple_ids.push_back(static_cast<TTupleId>(2));
464 
465  DescriptorTblBuilder int_builder(&pool_);
466  int_builder.DeclareTuple() << TYPE_INT;
467  int_builder.DeclareTuple() << TYPE_INT;
468  int_builder.DeclareTuple() << TYPE_INT;
470  *int_builder.Build(), tuple_ids, nullable_tuples));
471 
472  DescriptorTblBuilder string_builder(&pool_);
473  string_builder.DeclareTuple() << TYPE_STRING;
474  string_builder.DeclareTuple() << TYPE_STRING;
475  string_builder.DeclareTuple() << TYPE_STRING;
477  *string_builder.Build(), tuple_ids, nullable_tuples));
478  }
479 };
480 
481 // Basic API test. No data should be going to disk.
483  CreateMgr(-1, 8 * 1024 * 1024);
484  TestValues<int>(1, int_desc_, false);
485  TestValues<int>(10, int_desc_, false);
486  TestValues<int>(100, int_desc_, false);
487 
488  TestValues<StringValue>(1, string_desc_, false);
489  TestValues<StringValue>(10, string_desc_, false);
490  TestValues<StringValue>(100, string_desc_, false);
491 
492  TestIntValuesInterleaved(1, 1);
493  TestIntValuesInterleaved(10, 5);
494  TestIntValuesInterleaved(100, 15);
495 }
496 
// Test with only 1 buffer: the block mgr limit equals a single small buffer,
// so the stream must unpin and spill to disk almost immediately.
TEST_F(SimpleTupleStreamTest, OneBufferSpill) {
  // Each buffer can only hold 100 ints, so this spills quite often.
  int buffer_size = 100 * sizeof(int);
  CreateMgr(buffer_size, buffer_size);
  TestValues<int>(1, int_desc_, false);
  TestValues<int>(10, int_desc_, false);

  TestValues<StringValue>(1, string_desc_, false);
  TestValues<StringValue>(10, string_desc_, false);
}
508 
// Test with a few buffers: the limit allows 10 small buffers, so spilling
// starts only once the stream outgrows them.
TEST_F(SimpleTupleStreamTest, ManyBufferSpill) {
  int buffer_size = 100 * sizeof(int);
  CreateMgr(10 * buffer_size, buffer_size);

  TestValues<int>(1, int_desc_, false);
  TestValues<int>(10, int_desc_, false);
  TestValues<int>(100, int_desc_, false);
  TestValues<StringValue>(1, string_desc_, false);
  TestValues<StringValue>(10, string_desc_, false);
  TestValues<StringValue>(100, string_desc_, false);

  // Also exercise the interleaved read/write path.
  TestIntValuesInterleaved(1, 1);
  TestIntValuesInterleaved(10, 5);
  TestIntValuesInterleaved(100, 15);
}
525 
527  int buffer_size = 100 * sizeof(int);
528  CreateMgr(3 * buffer_size, buffer_size);
529 
530  BufferedTupleStream stream(runtime_state_.get(), *int_desc_, block_mgr_.get(), client_);
531  Status status = stream.Init();
532  ASSERT_TRUE(status.ok());
533 
534  int offset = 0;
535  bool full = false;
536  while (!full) {
537  RowBatch* batch = CreateIntBatch(offset, BATCH_SIZE, false);
538  int j = 0;
539  for (; j < batch->num_rows(); ++j) {
540  full = !stream.AddRow(batch->GetRow(j));
541  if (full) break;
542  }
543  offset += j;
544  }
545 
546  status = stream.UnpinStream();
547  ASSERT_TRUE(status.ok());
548 
549  bool pinned = false;
550  status = stream.PinStream(false, &pinned);
551  ASSERT_TRUE(status.ok());
552  ASSERT_TRUE(pinned);
553 
554  vector<int> results;
555 
556  // Read and verify result a few times. We should be able to reread the stream.
557  for (int i = 0; i < 3; ++i) {
558  status = stream.PrepareForRead();
559  ASSERT_TRUE(status.ok());
560  results.clear();
561  ReadValues(&stream, int_desc_, &results);
562  VerifyResults(results, offset, false);
563  }
564 
565  stream.Close();
566 }
567 
569  int buffer_size = 8 * 1024 * 1024;
570  CreateMgr(2 * buffer_size, buffer_size);
571 
572  BufferedTupleStream stream(runtime_state_.get(), *int_desc_, block_mgr_.get(), client_);
573  Status status = stream.Init(NULL, false);
574  ASSERT_TRUE(status.ok());
575 
576  // Initial buffer should be small.
577  EXPECT_LT(stream.bytes_in_mem(false), buffer_size);
578 
579  RowBatch* batch = CreateIntBatch(0, 1024, false);
580  for (int i = 0; i < batch->num_rows(); ++i) {
581  bool ret = stream.AddRow(batch->GetRow(i));
582  EXPECT_TRUE(ret);
583  }
584  EXPECT_LT(stream.bytes_in_mem(false), buffer_size);
585  EXPECT_LT(stream.byte_size(), buffer_size);
586 
587  // 40 MB of ints
588  batch = CreateIntBatch(0, 10 * 1024 * 1024, false);
589  for (int i = 0; i < batch->num_rows(); ++i) {
590  bool ret = stream.AddRow(batch->GetRow(i));
591  if (!ret) {
592  ASSERT_TRUE(stream.using_small_buffers());
593  bool got_buffer;
594  status = stream.SwitchToIoBuffers(&got_buffer);
595  ASSERT_TRUE(status.ok());
596  ASSERT_TRUE(got_buffer);
597  ret = stream.AddRow(batch->GetRow(i));
598  }
599  ASSERT_TRUE(ret);
600  }
601  EXPECT_EQ(stream.bytes_in_mem(false), buffer_size);
602 
603  stream.Close();
604 }
605 
606 // Basic API test. No data should be going to disk.
608  CreateMgr(-1, 8 * 1024 * 1024);
609  TestValues<int>(1, int_desc_, false);
610  TestValues<int>(10, int_desc_, false);
611  TestValues<int>(100, int_desc_, false);
612  TestValues<int>(1, int_desc_, true);
613  TestValues<int>(10, int_desc_, true);
614  TestValues<int>(100, int_desc_, true);
615 
616  TestValues<StringValue>(1, string_desc_, false);
617  TestValues<StringValue>(10, string_desc_, false);
618  TestValues<StringValue>(100, string_desc_, false);
619  TestValues<StringValue>(1, string_desc_, true);
620  TestValues<StringValue>(10, string_desc_, true);
621  TestValues<StringValue>(100, string_desc_, true);
622 
623  TestIntValuesInterleaved(1, 1);
624  TestIntValuesInterleaved(10, 5);
625  TestIntValuesInterleaved(100, 15);
626 }
627 
// Test tuple stream with only 1 buffer and rows with multiple tuples.
TEST_F(MultiTupleStreamTest, MultiTupleOneBufferSpill) {
  // Each buffer can only hold 100 ints, so this spills quite often.
  int buffer_size = 100 * sizeof(int);
  CreateMgr(buffer_size, buffer_size);
  TestValues<int>(1, int_desc_, false);
  TestValues<int>(10, int_desc_, false);

  TestValues<StringValue>(1, string_desc_, false);
  TestValues<StringValue>(10, string_desc_, false);
}
639 
// Test with a few buffers and rows with multiple tuples.
TEST_F(MultiTupleStreamTest, MultiTupleManyBufferSpill) {
  int buffer_size = 100 * sizeof(int);
  CreateMgr(10 * buffer_size, buffer_size);

  TestValues<int>(1, int_desc_, false);
  TestValues<int>(10, int_desc_, false);
  TestValues<int>(100, int_desc_, false);

  TestValues<StringValue>(1, string_desc_, false);
  TestValues<StringValue>(10, string_desc_, false);
  TestValues<StringValue>(100, string_desc_, false);

  // Also exercise the interleaved read/write path.
  TestIntValuesInterleaved(1, 1);
  TestIntValuesInterleaved(10, 5);
  TestIntValuesInterleaved(100, 15);
}
657 
// Test with rows with multiple nullable tuples, both with and without
// generated NULLs, using a single spilling buffer.
TEST_F(MultiNullableTupleStreamTest, MultiNullableTupleOneBufferSpill) {
  // Each buffer can only hold 100 ints, so this spills quite often.
  int buffer_size = 100 * sizeof(int);
  CreateMgr(buffer_size, buffer_size);
  TestValues<int>(1, int_desc_, false);
  TestValues<int>(10, int_desc_, false);
  TestValues<int>(1, int_desc_, true);
  TestValues<int>(10, int_desc_, true);

  TestValues<StringValue>(1, string_desc_, false);
  TestValues<StringValue>(10, string_desc_, false);
  TestValues<StringValue>(1, string_desc_, true);
  TestValues<StringValue>(10, string_desc_, true);
}
673 
// Test with a few buffers: nullable multi-tuple rows, both with and without
// generated NULLs, plus the interleaved read/write path.
TEST_F(MultiNullableTupleStreamTest, MultiNullableTupleManyBufferSpill) {
  int buffer_size = 100 * sizeof(int);
  CreateMgr(10 * buffer_size, buffer_size);

  TestValues<int>(1, int_desc_, false);
  TestValues<int>(10, int_desc_, false);
  TestValues<int>(100, int_desc_, false);
  TestValues<int>(1, int_desc_, true);
  TestValues<int>(10, int_desc_, true);
  TestValues<int>(100, int_desc_, true);

  TestValues<StringValue>(1, string_desc_, false);
  TestValues<StringValue>(10, string_desc_, false);
  TestValues<StringValue>(100, string_desc_, false);
  TestValues<StringValue>(1, string_desc_, true);
  TestValues<StringValue>(10, string_desc_, true);
  TestValues<StringValue>(100, string_desc_, true);

  TestIntValuesInterleaved(1, 1);
  TestIntValuesInterleaved(10, 5);
  TestIntValuesInterleaved(100, 15);
}
697 
698 // TODO: more tests.
699 // - The stream can operate in many modes
700 
701 }
702 
703 int main(int argc, char** argv) {
704  ::testing::InitGoogleTest(&argc, argv);
709  return RUN_ALL_TESTS();
710 }
void ReadValues(BufferedTupleStream *stream, RowDescriptor *desc, vector< T > *results, int num_batches=-1)
The underlying memory management is done by the BufferedBlockMgr.
void TestIntValuesInterleaved(int num_batches, int num_batches_before_read)
int num_rows() const
Definition: row-batch.h:215
const std::string GetDetail() const
Definition: status.cc:184
void InitFeSupport()
Definition: fe-support.cc:346
TEST_F(InstructionCounterTest, Count)
Tuple * GetTuple(int tuple_idx)
Definition: tuple-row.h:30
scoped_ptr< RuntimeState > runtime_state_
int main(int argc, char **argv)
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
virtual RowBatch * CreateIntBatch(int start_val, int num_rows, bool gen_null)
void AppendValue(Tuple *t, vector< int > *results)
void InitCommonRuntime(int argc, char **argv, bool init_jvm, TestInfo::Mode m=TestInfo::NON_TEST)
Definition: init.cc:122
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
static const StringValue STRINGS[]
void AppendValue(Tuple *t, vector< StringValue > *results)
static Status Init()
Definition: tmp-file-mgr.cc:47
static const int NUM_STRINGS
shared_ptr< BufferedBlockMgr > block_mgr_
void TestValues(int num_batches, RowDescriptor *desc, bool gen_null)
const int BATCH_SIZE
void Reset()
Resets the row batch, returning all resources it has accumulated.
Definition: row-batch.cc:224
void AppendRowTuples(TupleRow *row, vector< StringValue > *results)
virtual void VerifyResults(const vector< int > &results, int exp_rows, bool gen_null)
static Status Create(RuntimeState *state, MemTracker *parent, RuntimeProfile *profile, int64_t mem_limit, int64_t buffer_size, boost::shared_ptr< BufferedBlockMgr > *block_mgr)
void AppendRowTuples(TupleRow *row, vector< int > *results)
This class is thread-safe.
Definition: mem-tracker.h:61
void CommitLastRow()
Definition: row-batch.h:109
MemPool * tuple_data_pool()
Definition: row-batch.h:148
uint64_t Test(T *ht, const ProbeTuple *input, uint64_t num_tuples)
virtual void VerifyResults(const vector< StringValue > &results, int exp_rows, bool gen_null)
const std::vector< TupleDescriptor * > & tuple_descriptors() const
Return descriptors for all tuples in this row, in order of appearance.
Definition: descriptors.h:412
void SetTuple(int tuple_idx, Tuple *tuple)
Definition: tuple-row.h:34
static void InitializeLlvm(bool load_backend=false)
Definition: llvm-codegen.cc:78
TupleDescBuilder & DeclareTuple()
scoped_ptr< MemTracker > block_mgr_parent_tracker_
virtual RowBatch * CreateStringBatch(int string_idx, int num_rows, bool gen_null)
uint8_t offset[7 *64-sizeof(uint64_t)]
void CreateMgr(int64_t limit, int block_size)
bool ok() const
Definition: status.h:172
Status GetNext(RowBatch *batch, bool *eos, std::vector< RowIdx > *indices=NULL)
uint8_t * Allocate(int size)
Definition: mem-pool.h:92