15 #include <boost/scoped_ptr.hpp>
16 #include <boost/bind.hpp>
17 #include <boost/filesystem.hpp>
19 #include <gtest/gtest.h>
34 #include "gen-cpp/Types_types.h"
35 #include "gen-cpp/ImpalaInternalService_types.h"
52 StringValue(
"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"),
71 vector<bool> nullable_tuples(1,
false);
72 vector<TTupleId> tuple_ids(1, static_cast<TTupleId>(0));
77 *int_builder.
Build(), tuple_ids, nullable_tuples));
82 *string_builder.
Build(), tuple_ids, nullable_tuples));
96 EXPECT_TRUE(status.
ok());
98 EXPECT_TRUE(status.ok());
103 int* tuple_mem =
reinterpret_cast<int*
>(
106 if (int_tuples > 1) {
107 for (
int i = 0; i < num_rows; ++i) {
110 tuple_mem[i] = i + start_val;
111 for (
int j = 0; j < int_tuples; ++j) {
112 if (!gen_null || (j % 2) == 0) {
113 row->
SetTuple(j, reinterpret_cast<Tuple*>(&tuple_mem[i]));
121 for (
int i = 0; i < num_rows; ++i) {
124 tuple_mem[i] = i + start_val;
125 if (!gen_null || (i % 2) == 0) {
126 row->
SetTuple(0, reinterpret_cast<Tuple*>(&tuple_mem[i]));
140 memset(tuple_mem, 0, tuple_size * num_rows);
142 if (string_tuples > 1) {
143 for (
int i = 0; i < num_rows; ++i) {
148 for (
int j = 0; j < string_tuples; ++j) {
149 if (!gen_null || (j % 2) == 0) {
150 row->
SetTuple(j, reinterpret_cast<Tuple*>(tuple_mem));
156 tuple_mem += tuple_size;
159 for (
int i = 0; i < num_rows; ++i) {
164 if (!gen_null || (i % 2) == 0) {
165 row->
SetTuple(0, reinterpret_cast<Tuple*>(tuple_mem));
170 tuple_mem += tuple_size;
179 for (
int i = 0; i < int_tuples; ++i) {
187 for (
int i = 0; i < string_tuples; ++i) {
195 results->push_back(std::numeric_limits<int>::max());
197 results->push_back(*reinterpret_cast<int*>(t));
205 uint8_t* mem =
reinterpret_cast<uint8_t*
>(t);
208 memcpy(copy, sv.
ptr, sv.
len);
209 sv.
ptr =
reinterpret_cast<char*
>(copy);
210 results->push_back(sv);
214 template <
typename T>
216 int num_batches = -1) {
219 int batches_read = 0;
223 EXPECT_TRUE(status.
ok());
225 for (
int i = 0; i < batch.
num_rows(); ++i) {
228 }
while (!eos && (num_batches < 0 || batches_read <= num_batches));
231 virtual void VerifyResults(
const vector<int>& results,
int exp_rows,
bool gen_null) {
233 EXPECT_EQ(results.size(), exp_rows * int_tuples);
234 if (int_tuples > 1) {
235 for (
int i = 0; i < results.size(); i += int_tuples) {
236 for (
int j = 0; j < int_tuples; ++j) {
237 if (!gen_null || (j % 2) == 0) {
238 ASSERT_EQ(results[i+j], i / int_tuples)
239 <<
" results[" << (i + j) <<
"]: " << results[i + j]
240 <<
" != " << (i / int_tuples) <<
" gen_null=" << gen_null;
242 ASSERT_TRUE(results[i+j] == std::numeric_limits<int>::max())
243 <<
"i: " << i <<
" j: " << j <<
" results[" << (i + j) <<
"]: "
244 << results[i+j] <<
" != " << std::numeric_limits<int>::max();
249 for (
int i = 0; i < results.size(); i += int_tuples) {
250 if (!gen_null || (i % 2) == 0) {
251 ASSERT_TRUE(results[i] == i)
252 <<
" results[" << (i) <<
"]: " << results[i]
253 <<
" != " << i <<
" gen_null=" << gen_null;
255 ASSERT_TRUE(results[i] == std::numeric_limits<int>::max())
256 <<
"i: " << i <<
" results[" << i <<
"]: "
257 << results[i] <<
" != " << std::numeric_limits<int>::max();
263 virtual void VerifyResults(
const vector<StringValue>& results,
int exp_rows,
266 EXPECT_EQ(results.size(), exp_rows * string_tuples);
268 if (string_tuples > 1) {
269 for (
int i = 0; i < results.size(); i += string_tuples) {
270 for (
int j = 0; j < string_tuples; ++j) {
271 if (!gen_null || (j % 2) == 0) {
272 ASSERT_TRUE(results[i+j] ==
STRINGS[idx])
273 <<
"results[" << i <<
"+" << j <<
"] " << results[i+j]
274 <<
" != " <<
STRINGS[
idx] <<
" idx=" << idx <<
" gen_null=" << gen_null;
277 <<
"results[" << i <<
"+" << j <<
"] " << results[i+j] <<
" not NULL";
283 for (
int i = 0; i < results.size(); i += string_tuples) {
284 if (!gen_null || (i % 2) == 0) {
285 ASSERT_TRUE(results[i] ==
STRINGS[idx])
286 <<
"results[" << i <<
"] " << results[i]
287 <<
" != " <<
STRINGS[
idx] <<
" idx=" << idx <<
" gen_null=" << gen_null;
290 <<
"results[" << i <<
"] " << results[i] <<
" not NULL";
298 template <
typename T>
301 Status status = stream.Init();
302 ASSERT_TRUE(status.ok()) << status.
GetDetail();
303 status = stream.UnpinStream();
304 ASSERT_TRUE(status.ok());
308 for (
int i = 0; i < num_batches; ++i) {
310 if (
sizeof(T) ==
sizeof(int)) {
317 for (
int j = 0; j < batch->
num_rows(); ++j) {
318 bool b = stream.AddRow(batch->
GetRow(j));
320 ASSERT_TRUE(stream.using_small_buffers());
322 status = stream.SwitchToIoBuffers(&got_buffer);
323 ASSERT_TRUE(status.ok());
324 ASSERT_TRUE(got_buffer);
325 b = stream.AddRow(batch->
GetRow(j));
334 status = stream.PrepareForRead();
335 ASSERT_TRUE(status.ok());
348 for (
int small_buffers = 0; small_buffers < 2; ++small_buffers) {
354 Status status = stream.Init();
355 ASSERT_TRUE(status.ok());
356 status = stream.UnpinStream();
357 ASSERT_TRUE(status.ok());
361 for (
int i = 0; i < num_batches; ++i) {
363 for (
int j = 0; j < batch->
num_rows(); ++j) {
364 bool b = stream.AddRow(batch->
GetRow(j));
369 if (i % num_batches_before_read == 0) {
371 (rand() % num_batches_before_read) + 1);
378 EXPECT_EQ(results.size(),
BATCH_SIZE * num_batches * int_tuples);
379 for (
int i = 0; i < results.size(); ++i) {
380 ASSERT_EQ(results[i], i / int_tuples);
406 vector<bool> nullable_tuples(1,
true);
407 vector<TTupleId> tuple_ids(1, static_cast<TTupleId>(0));
412 *int_builder.
Build(), tuple_ids, nullable_tuples));
417 *string_builder.
Build(), tuple_ids, nullable_tuples));
425 vector<bool> nullable_tuples;
426 nullable_tuples.push_back(
false);
427 nullable_tuples.push_back(
false);
428 nullable_tuples.push_back(
false);
430 vector<TTupleId> tuple_ids;
431 tuple_ids.push_back(static_cast<TTupleId>(0));
432 tuple_ids.push_back(static_cast<TTupleId>(1));
433 tuple_ids.push_back(static_cast<TTupleId>(2));
440 *int_builder.
Build(), tuple_ids, nullable_tuples));
447 *string_builder.
Build(), tuple_ids, nullable_tuples));
455 vector<bool> nullable_tuples;
456 nullable_tuples.push_back(
false);
457 nullable_tuples.push_back(
true);
458 nullable_tuples.push_back(
true);
460 vector<TTupleId> tuple_ids;
461 tuple_ids.push_back(static_cast<TTupleId>(0));
462 tuple_ids.push_back(static_cast<TTupleId>(1));
463 tuple_ids.push_back(static_cast<TTupleId>(2));
470 *int_builder.
Build(), tuple_ids, nullable_tuples));
477 *string_builder.
Build(), tuple_ids, nullable_tuples));
483 CreateMgr(-1, 8 * 1024 * 1024);
484 TestValues<int>(1, int_desc_,
false);
485 TestValues<int>(10, int_desc_,
false);
486 TestValues<int>(100, int_desc_,
false);
488 TestValues<StringValue>(1, string_desc_,
false);
489 TestValues<StringValue>(10, string_desc_,
false);
490 TestValues<StringValue>(100, string_desc_,
false);
492 TestIntValuesInterleaved(1, 1);
493 TestIntValuesInterleaved(10, 5);
494 TestIntValuesInterleaved(100, 15);
500 int buffer_size = 100 *
sizeof(int);
501 CreateMgr(buffer_size, buffer_size);
502 TestValues<int>(1, int_desc_,
false);
503 TestValues<int>(10, int_desc_,
false);
505 TestValues<StringValue>(1, string_desc_,
false);
506 TestValues<StringValue>(10, string_desc_,
false);
511 int buffer_size = 100 *
sizeof(int);
512 CreateMgr(10 * buffer_size, buffer_size);
514 TestValues<int>(1, int_desc_,
false);
515 TestValues<int>(10, int_desc_,
false);
516 TestValues<int>(100, int_desc_,
false);
517 TestValues<StringValue>(1, string_desc_,
false);
518 TestValues<StringValue>(10, string_desc_,
false);
519 TestValues<StringValue>(100, string_desc_,
false);
521 TestIntValuesInterleaved(1, 1);
522 TestIntValuesInterleaved(10, 5);
523 TestIntValuesInterleaved(100, 15);
527 int buffer_size = 100 *
sizeof(int);
528 CreateMgr(3 * buffer_size, buffer_size);
531 Status status = stream.Init();
532 ASSERT_TRUE(status.ok());
539 for (; j < batch->
num_rows(); ++j) {
540 full = !stream.AddRow(batch->
GetRow(j));
546 status = stream.UnpinStream();
547 ASSERT_TRUE(status.ok());
550 status = stream.PinStream(
false, &pinned);
551 ASSERT_TRUE(status.ok());
557 for (
int i = 0; i < 3; ++i) {
558 status = stream.PrepareForRead();
559 ASSERT_TRUE(status.ok());
561 ReadValues(&stream, int_desc_, &results);
562 VerifyResults(results, offset,
false);
569 int buffer_size = 8 * 1024 * 1024;
570 CreateMgr(2 * buffer_size, buffer_size);
573 Status status = stream.Init(NULL,
false);
574 ASSERT_TRUE(status.ok());
577 EXPECT_LT(stream.bytes_in_mem(
false), buffer_size);
579 RowBatch* batch = CreateIntBatch(0, 1024,
false);
580 for (
int i = 0; i < batch->num_rows(); ++i) {
581 bool ret = stream.AddRow(batch->GetRow(i));
584 EXPECT_LT(stream.bytes_in_mem(
false), buffer_size);
585 EXPECT_LT(stream.byte_size(), buffer_size);
588 batch = CreateIntBatch(0, 10 * 1024 * 1024,
false);
589 for (
int i = 0; i < batch->num_rows(); ++i) {
590 bool ret = stream.AddRow(batch->GetRow(i));
592 ASSERT_TRUE(stream.using_small_buffers());
594 status = stream.SwitchToIoBuffers(&got_buffer);
595 ASSERT_TRUE(status.ok());
596 ASSERT_TRUE(got_buffer);
597 ret = stream.AddRow(batch->GetRow(i));
601 EXPECT_EQ(stream.bytes_in_mem(
false), buffer_size);
608 CreateMgr(-1, 8 * 1024 * 1024);
609 TestValues<int>(1, int_desc_,
false);
610 TestValues<int>(10, int_desc_,
false);
611 TestValues<int>(100, int_desc_,
false);
612 TestValues<int>(1, int_desc_,
true);
613 TestValues<int>(10, int_desc_,
true);
614 TestValues<int>(100, int_desc_,
true);
616 TestValues<StringValue>(1, string_desc_,
false);
617 TestValues<StringValue>(10, string_desc_,
false);
618 TestValues<StringValue>(100, string_desc_,
false);
619 TestValues<StringValue>(1, string_desc_,
true);
620 TestValues<StringValue>(10, string_desc_,
true);
621 TestValues<StringValue>(100, string_desc_,
true);
623 TestIntValuesInterleaved(1, 1);
624 TestIntValuesInterleaved(10, 5);
625 TestIntValuesInterleaved(100, 15);
631 int buffer_size = 100 *
sizeof(int);
632 CreateMgr(buffer_size, buffer_size);
633 TestValues<int>(1, int_desc_,
false);
634 TestValues<int>(10, int_desc_,
false);
636 TestValues<StringValue>(1, string_desc_,
false);
637 TestValues<StringValue>(10, string_desc_,
false);
642 int buffer_size = 100 *
sizeof(int);
643 CreateMgr(10 * buffer_size, buffer_size);
645 TestValues<int>(1, int_desc_,
false);
646 TestValues<int>(10, int_desc_,
false);
647 TestValues<int>(100, int_desc_,
false);
649 TestValues<StringValue>(1, string_desc_,
false);
650 TestValues<StringValue>(10, string_desc_,
false);
651 TestValues<StringValue>(100, string_desc_,
false);
653 TestIntValuesInterleaved(1, 1);
654 TestIntValuesInterleaved(10, 5);
655 TestIntValuesInterleaved(100, 15);
661 int buffer_size = 100 *
sizeof(int);
662 CreateMgr(buffer_size, buffer_size);
663 TestValues<int>(1, int_desc_,
false);
664 TestValues<int>(10, int_desc_,
false);
665 TestValues<int>(1, int_desc_,
true);
666 TestValues<int>(10, int_desc_,
true);
668 TestValues<StringValue>(1, string_desc_,
false);
669 TestValues<StringValue>(10, string_desc_,
false);
670 TestValues<StringValue>(1, string_desc_,
true);
671 TestValues<StringValue>(10, string_desc_,
true);
676 int buffer_size = 100 *
sizeof(int);
677 CreateMgr(10 * buffer_size, buffer_size);
679 TestValues<int>(1, int_desc_,
false);
680 TestValues<int>(10, int_desc_,
false);
681 TestValues<int>(100, int_desc_,
false);
682 TestValues<int>(1, int_desc_,
true);
683 TestValues<int>(10, int_desc_,
true);
684 TestValues<int>(100, int_desc_,
true);
686 TestValues<StringValue>(1, string_desc_,
false);
687 TestValues<StringValue>(10, string_desc_,
false);
688 TestValues<StringValue>(100, string_desc_,
false);
689 TestValues<StringValue>(1, string_desc_,
true);
690 TestValues<StringValue>(10, string_desc_,
true);
691 TestValues<StringValue>(100, string_desc_,
true);
693 TestIntValuesInterleaved(1, 1);
694 TestIntValuesInterleaved(10, 5);
695 TestIntValuesInterleaved(100, 15);
703 int main(
int argc,
char** argv) {
704 ::testing::InitGoogleTest(&argc, argv);
709 return RUN_ALL_TESTS();
void ReadValues(BufferedTupleStream *stream, RowDescriptor *desc, vector< T > *results, int num_batches=-1)
The underlying memory management is done by the BufferedBlockMgr.
void TestIntValuesInterleaved(int num_batches, int num_batches_before_read)
const std::string GetDetail() const
TEST_F(InstructionCounterTest, Count)
Tuple * GetTuple(int tuple_idx)
virtual void CreateDescriptors()
scoped_ptr< RuntimeState > runtime_state_
int main(int argc, char **argv)
A tuple with 0 materialised slots is represented as NULL.
virtual void CreateDescriptors()
scoped_ptr< ExecEnv > exec_env_
virtual RowBatch * CreateIntBatch(int start_val, int num_rows, bool gen_null)
void AppendValue(Tuple *t, vector< int > *results)
void InitCommonRuntime(int argc, char **argv, bool init_jvm, TestInfo::Mode m=TestInfo::NON_TEST)
TupleRow * GetRow(int row_idx)
static const StringValue STRINGS[]
void AppendValue(Tuple *t, vector< StringValue > *results)
static const int NUM_STRINGS
shared_ptr< BufferedBlockMgr > block_mgr_
void TestValues(int num_batches, RowDescriptor *desc, bool gen_null)
virtual void CreateDescriptors()
void Reset()
Resets the row batch, returning all resources it has accumulated.
void AppendRowTuples(TupleRow *row, vector< StringValue > *results)
virtual void VerifyResults(const vector< int > &results, int exp_rows, bool gen_null)
static Status Create(RuntimeState *state, MemTracker *parent, RuntimeProfile *profile, int64_t mem_limit, int64_t buffer_size, boost::shared_ptr< BufferedBlockMgr > *block_mgr)
void AppendRowTuples(TupleRow *row, vector< int > *results)
RowDescriptor * int_desc_
This class is thread-safe.
BufferedBlockMgr::Client * client_
virtual void CreateDescriptors()
MemPool * tuple_data_pool()
uint64_t Test(T *ht, const ProbeTuple *input, uint64_t num_tuples)
virtual void VerifyResults(const vector< StringValue > &results, int exp_rows, bool gen_null)
const std::vector< TupleDescriptor * > & tuple_descriptors() const
Return descriptors for all tuples in this row, in order of appearance.
void SetTuple(int tuple_idx, Tuple *tuple)
static void InitializeLlvm(bool load_backend=false)
TupleDescBuilder & DeclareTuple()
scoped_ptr< MemTracker > block_mgr_parent_tracker_
virtual RowBatch * CreateStringBatch(int string_idx, int num_rows, bool gen_null)
uint8_t offset[7 *64-sizeof(uint64_t)]
void CreateMgr(int64_t limit, int block_size)
RowDescriptor * string_desc_
Status GetNext(RowBatch *batch, bool *eos, std::vector< RowIdx > *indices=NULL)
scoped_ptr< MemPool > mem_pool_
uint8_t * Allocate(int size)