15 #include <boost/scoped_ptr.hpp>
16 #include <boost/bind.hpp>
17 #include <boost/thread/thread.hpp>
18 #include <boost/filesystem.hpp>
19 #include <boost/date_time/posix_time/posix_time.hpp>
21 #include <gtest/gtest.h>
38 #include "gen-cpp/Types_types.h"
39 #include "gen-cpp/ImpalaInternalService_types.h"
43 using boost::filesystem::directory_iterator;
82 EXPECT_TRUE(*reinterpret_cast<int32_t*>(block->
buffer()) == data);
87 int32_t size = (rand() % 252) + 4;
88 uint8_t* data = block->
Allocate<uint8_t>(size);
89 *(
reinterpret_cast<int32_t*
>(data)) = size;
91 for (i = 4; i < size-5; ++i) {
94 for (; i < size; ++i) {
97 return reinterpret_cast<int32_t*
>(data);
101 int32_t bsize = *(
reinterpret_cast<int32_t*
>(block->
buffer()));
102 uint8_t* data =
reinterpret_cast<uint8_t*
>(block->
buffer());
105 EXPECT_EQ(size, bsize);
106 for (i = 4; i < size-5; ++i) {
107 EXPECT_EQ(data[i], i);
109 for (; i < size; ++i) {
110 EXPECT_EQ(data[i], 0xff);
114 shared_ptr<BufferedBlockMgr>
CreateMgr(
int max_buffers) {
115 shared_ptr<BufferedBlockMgr> mgr;
119 EXPECT_TRUE(mgr != NULL);
125 int num_blocks, vector<BufferedBlockMgr::Block*>* blocks) {
129 for (
int i = 0; i < num_blocks; ++i) {
130 status = block_mgr->
GetNewBlock(client, NULL, &new_block);
131 EXPECT_TRUE(status.
ok());
132 EXPECT_TRUE(new_block != NULL);
133 data = new_block->
Allocate<int32_t>(
sizeof(int32_t));
134 *data = blocks->size();
135 blocks->push_back(new_block);
145 DCHECK_NOTNULL(block_mgr);
146 const int num_iterations = 100000;
147 const int iters_before_close = num_iterations - 5000;
148 bool close_called =
false;
149 unordered_map<BufferedBlockMgr::Block*, int> pinned_block_map;
150 vector<pair<BufferedBlockMgr::Block*, int32_t> > pinned_blocks;
151 unordered_map<BufferedBlockMgr::Block*, int> unpinned_block_map;
152 vector<pair<BufferedBlockMgr::Block*, int32_t> > unpinned_blocks;
154 typedef enum { Pin, New, Unpin, Delete, Close } ApiFunction;
155 ApiFunction api_function;
159 EXPECT_TRUE(status.
ok());
160 EXPECT_TRUE(client != NULL);
162 pinned_blocks.reserve(num_buffers);
164 for (
int i = 0; i < num_iterations; ++i) {
165 if ((i % 20000) == 0) LOG (ERROR) <<
" Iteration " << i << endl;
166 if (i > iters_before_close && (rand() % 5 == 0)) {
167 api_function = Close;
168 }
else if (pinned_blocks.size() == 0 && unpinned_blocks.size() == 0) {
170 }
else if (pinned_blocks.size() == 0) {
172 api_function =
static_cast<ApiFunction
>(rand() % 2);
173 }
else if (pinned_blocks.size() >= num_buffers) {
175 api_function =
static_cast<ApiFunction
>(2 + (rand() % 2));
176 }
else if (unpinned_blocks.size() == 0) {
178 api_function =
static_cast<ApiFunction
>(1 + (rand() % 3));
181 api_function =
static_cast<ApiFunction
>(rand() % 4);
184 pair<BufferedBlockMgr::Block*, int32_t> block_data;
186 int32_t* data = NULL;
188 switch (api_function) {
190 status = block_mgr->
GetNewBlock(client, NULL, &new_block);
192 EXPECT_TRUE(new_block == NULL);
196 EXPECT_TRUE(status.
ok());
197 EXPECT_TRUE(new_block != NULL);
199 block_data = make_pair(new_block, *data);
201 pinned_blocks.push_back(block_data);
202 pinned_block_map.insert(make_pair(block_data.first, pinned_blocks.size() - 1));
205 rand_pick = rand() % unpinned_blocks.size();
206 block_data = unpinned_blocks[rand_pick];
207 status = block_data.first->Pin(&pinned);
218 EXPECT_TRUE(status.
ok());
221 unpinned_blocks[rand_pick] = unpinned_blocks.back();
222 unpinned_blocks.pop_back();
223 unpinned_block_map[unpinned_blocks[rand_pick].first] = rand_pick;
225 pinned_blocks.push_back(block_data);
226 pinned_block_map.insert(make_pair(block_data.first, pinned_blocks.size() - 1));
229 rand_pick = rand() % pinned_blocks.size();
230 block_data = pinned_blocks[rand_pick];
231 status = block_data.first->Unpin();
236 EXPECT_TRUE(status.
ok());
237 pinned_blocks[rand_pick] = pinned_blocks.back();
238 pinned_blocks.pop_back();
239 pinned_block_map[pinned_blocks[rand_pick].first] = rand_pick;
241 unpinned_blocks.push_back(block_data);
242 unpinned_block_map.insert(make_pair(block_data.first,
243 unpinned_blocks.size() - 1));
246 rand_pick = rand() % pinned_blocks.size();
247 block_data = pinned_blocks[rand_pick];
248 status = block_data.first->Delete();
252 EXPECT_TRUE(status.
ok());
254 pinned_blocks[rand_pick] = pinned_blocks.back();
255 pinned_blocks.pop_back();
256 pinned_block_map[pinned_blocks[rand_pick].first] = rand_pick;
268 const int num_buffers = 10;
269 shared_ptr<BufferedBlockMgr> block_mgr =
CreateMgr(num_buffers);
277 DCHECK_GT(num_threads, 0);
278 const int num_buffers = 10;
279 shared_ptr<BufferedBlockMgr> block_mgr =
CreateMgr(num_buffers * num_threads);
280 thread_group workers;
281 for (
int i = 0; i < num_threads; ++i) {
283 block_mgr.get(), num_buffers, i));
284 workers.add_thread(t);
298 int max_num_blocks = 5;
299 shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_blocks);
301 Status status = block_mgr->RegisterClient(0, NULL, runtime_state_.get(), &client);
302 EXPECT_TRUE(status.
ok());
303 EXPECT_EQ(block_mgr_parent_tracker_->consumption(), 0);
309 for (
int i = 0; i < max_num_blocks; ++i) {
310 status = block_mgr->GetNewBlock(client, NULL, &new_block);
311 EXPECT_TRUE(new_block != NULL);
312 EXPECT_EQ(block_mgr->bytes_allocated(), (i + 1) * block_size_);
313 if (first_block == NULL) first_block = new_block;
317 status = block_mgr->GetNewBlock(client, NULL, &new_block);
318 EXPECT_TRUE(new_block == NULL);
319 EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size_);
322 uint8_t* old_buffer = first_block->
buffer();
323 status = block_mgr->GetNewBlock(client, first_block, &new_block);
324 EXPECT_TRUE(new_block != NULL);
325 EXPECT_TRUE(old_buffer == new_block->buffer());
326 EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size_);
330 status = block_mgr->GetNewBlock(client, NULL, &new_block);
331 EXPECT_TRUE(new_block == NULL);
332 EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size_);
334 EXPECT_EQ(block_mgr->writes_issued(), 1);;
336 EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
340 int max_num_blocks = 3;
341 shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_blocks);
344 Status status = block_mgr->RegisterClient(0, &tracker, runtime_state_.get(), &client);
345 EXPECT_TRUE(status.
ok());
346 EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
348 vector<BufferedBlockMgr::Block*> blocks;
352 status = block_mgr->GetNewBlock(client, NULL, &new_block, 128);
353 EXPECT_TRUE(new_block != NULL);
354 EXPECT_TRUE(status.
ok());
355 EXPECT_EQ(block_mgr->bytes_allocated(), 0);
356 EXPECT_EQ(block_mgr_parent_tracker_->consumption(), 0);
360 EXPECT_TRUE(new_block->
buffer() != NULL);
361 blocks.push_back(new_block);
364 status = block_mgr->GetNewBlock(client, NULL, &new_block);
365 EXPECT_TRUE(new_block != NULL);
366 EXPECT_TRUE(status.
ok());
367 EXPECT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size());
368 EXPECT_EQ(block_mgr_parent_tracker_->consumption(), block_mgr->max_block_size());
369 EXPECT_EQ(tracker.
consumption(), 128 + block_mgr->max_block_size());
371 EXPECT_EQ(new_block->
BytesRemaining(), block_mgr->max_block_size());
372 EXPECT_TRUE(new_block->
buffer() != NULL);
373 blocks.push_back(new_block);
376 status = block_mgr->GetNewBlock(client, NULL, &new_block, 512);
377 EXPECT_TRUE(new_block != NULL);
378 EXPECT_TRUE(status.
ok());
379 EXPECT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size());
380 EXPECT_EQ(block_mgr_parent_tracker_->consumption(), block_mgr->max_block_size());
381 EXPECT_EQ(tracker.
consumption(), 128 + 512 + block_mgr->max_block_size());
384 EXPECT_TRUE(new_block->
buffer() != NULL);
385 blocks.push_back(new_block);
388 status = blocks[1]->Unpin();
389 EXPECT_TRUE(status.
ok());
392 status = blocks[1]->Pin(&pinned);
393 EXPECT_TRUE(status.
ok());
396 for (
int i = 0; i < blocks.size(); ++i) {
400 EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
405 int max_num_blocks = 5;
406 shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_blocks);
408 Status status = block_mgr->RegisterClient(0, NULL, runtime_state_.get(), &client);
409 EXPECT_TRUE(status.
ok());
410 EXPECT_TRUE(client != NULL);
412 vector<BufferedBlockMgr::Block*> blocks;
413 AllocateBlocks(block_mgr.get(), client, max_num_blocks, &blocks);
416 for (
int i = 0; i < blocks.size(); ++i) {
417 status = blocks[i]->Unpin();
418 EXPECT_TRUE(status.
ok());
422 AllocateBlocks(block_mgr.get(), client, max_num_blocks, &blocks);
426 status = blocks[0]->Pin(&pinned);
427 EXPECT_TRUE(status.
ok());
428 EXPECT_FALSE(pinned);
431 for (
int i = 0; i < blocks.size(); ++i) {
432 status = blocks[i]->Unpin();
433 EXPECT_TRUE(status.
ok());
437 for (
int i = 0; i < max_num_blocks; ++i) {
438 status = blocks[i]->Pin(&pinned);
439 EXPECT_TRUE(status.
ok());
444 status = blocks[max_num_blocks]->Pin(&pinned);
445 EXPECT_TRUE(status.
ok());
446 EXPECT_FALSE(pinned);
449 EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
455 int max_num_buffers = 5;
456 shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_buffers);
458 Status status = block_mgr->RegisterClient(0, NULL, runtime_state_.get(), &client);
459 EXPECT_TRUE(status.
ok());
460 EXPECT_TRUE(client != NULL);
466 vector<BufferedBlockMgr::Block*> blocks;
467 AllocateBlocks(block_mgr.get(), client, max_num_buffers, &blocks);
469 EXPECT_EQ(block_mgr->bytes_allocated(), max_num_buffers * block_size_);
475 for (
int i = 0; i < blocks.size(); ++i) {
477 status = blocks[i]->Pin(&pinned);
478 EXPECT_TRUE(status.
ok());
480 ValidateBlock(blocks[i], i);
482 int buffered_pins_expected = blocks.size();
483 EXPECT_EQ(buffered_pin->
value(), buffered_pins_expected);
490 AllocateBlocks(block_mgr.get(), client, 2, &blocks);
492 EXPECT_GE(block_mgr->writes_issued(), 2);
493 for (
int i = 0; i < (max_num_buffers - 2); ++i) {
495 status = blocks[i]->Pin(&pinned);
496 EXPECT_TRUE(status.
ok());
498 ValidateBlock(blocks[i], i);
500 EXPECT_GE(buffered_pin->
value(), buffered_pins_expected);
503 EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
508 int max_num_buffers = 5;
509 shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_buffers);
511 Status status = block_mgr->RegisterClient(0, NULL, runtime_state_.get(), &client);
512 EXPECT_TRUE(status.
ok());
513 EXPECT_TRUE(client != NULL);
520 vector<BufferedBlockMgr::Block*> blocks;
521 AllocateBlocks(block_mgr.get(), client, max_num_buffers, &blocks);
522 EXPECT_TRUE(created_cnt->
value() == max_num_buffers);
527 AllocateBlocks(block_mgr.get(), client, max_num_buffers, &blocks);
528 EXPECT_TRUE(created_cnt->
value() == max_num_buffers);
529 EXPECT_TRUE(recycled_cnt->
value() == max_num_buffers);
532 EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
537 int max_num_buffers = 5;
538 shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_buffers);
540 Status status = block_mgr->RegisterClient(0, NULL, runtime_state_.get(), &client);
541 EXPECT_TRUE(status.
ok());
542 EXPECT_TRUE(client != NULL);
544 vector<BufferedBlockMgr::Block*> blocks;
545 AllocateBlocks(block_mgr.get(), client, max_num_buffers, &blocks);
550 status = block_mgr->GetNewBlock(client, NULL, &new_block);
551 EXPECT_TRUE(new_block == NULL);
553 status = blocks[0]->Unpin();
554 EXPECT_TRUE(status.IsCancelled());
556 status = blocks[0]->Pin(&pinned);
557 EXPECT_TRUE(status.IsCancelled());
558 status = blocks[1]->Delete();
559 EXPECT_TRUE(status.IsCancelled());
562 EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
569 int max_num_buffers = 2;
570 const int write_wait_millis = 500;
571 shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_buffers);
573 Status status = block_mgr->RegisterClient(0, NULL, runtime_state_.get(), &client);
574 EXPECT_TRUE(status.
ok());
575 EXPECT_TRUE(client != NULL);
579 profile->
GetCounter(
"BlockWritesOutstanding");
580 vector<BufferedBlockMgr::Block*> blocks;
581 AllocateBlocks(block_mgr.get(), client, max_num_buffers, &blocks);
583 status = blocks[0]->Unpin();
584 EXPECT_TRUE(status.ok());
587 EXPECT_TRUE(writes_outstanding->
value() == 0);
592 for (; dir_it != directory_iterator(); ++dir_it) {
594 remove_all(dir_it->path());
596 EXPECT_TRUE(num_files > 0);
597 status = blocks[1]->Unpin();
598 EXPECT_TRUE(status.ok());
600 AllocateBlocks(block_mgr.get(), client, 1, &blocks);
603 EXPECT_TRUE(writes_outstanding->
value() == 0);
606 status = blocks[2]->Delete();
607 EXPECT_TRUE(status.IsCancelled());
609 status = block_mgr->GetNewBlock(client, NULL, &new_block);
610 EXPECT_TRUE(new_block == NULL);
611 EXPECT_TRUE(status.IsCancelled());
613 EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
618 int client1_buffers = 3;
619 int client2_buffers = 5;
620 int max_num_buffers = client1_buffers + client2_buffers;
622 shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_buffers);
626 bool reserved =
false;
628 status = block_mgr->RegisterClient(client1_buffers, NULL, runtime_state_.get(),
630 EXPECT_TRUE(status.
ok());
631 EXPECT_TRUE(client1 != NULL);
632 status = block_mgr->RegisterClient(client2_buffers, NULL, runtime_state_.get(),
634 EXPECT_TRUE(status.
ok());
635 EXPECT_TRUE(client2 != NULL);
638 reserved = block_mgr->TryAcquireTmpReservation(client1, 1);
639 EXPECT_TRUE(reserved);
640 reserved = block_mgr->TryAcquireTmpReservation(client2, 1);
641 EXPECT_TRUE(reserved);
643 vector<BufferedBlockMgr::Block*> client1_blocks;
645 AllocateBlocks(block_mgr.get(), client1, client1_buffers, &client1_blocks);
649 status = block_mgr->GetNewBlock(client1, NULL, &block);
650 EXPECT_TRUE(status.
ok());
651 EXPECT_TRUE(block == NULL);
654 reserved = block_mgr->TryAcquireTmpReservation(client1, 1);
655 EXPECT_FALSE(reserved);
658 vector<BufferedBlockMgr::Block*> client2_blocks;
659 AllocateBlocks(block_mgr.get(), client2, client2_buffers, &client2_blocks);
662 status = block_mgr->GetNewBlock(client2, NULL, &block);
663 EXPECT_TRUE(status.ok());
664 EXPECT_TRUE(block == NULL);
667 status = client1_blocks[0]->Unpin();
668 EXPECT_TRUE(status.ok());
671 status = block_mgr->GetNewBlock(client2, NULL, &block);
672 EXPECT_TRUE(status.ok());
673 EXPECT_TRUE(block == NULL);
676 reserved = block_mgr->TryAcquireTmpReservation(client2, 1);
677 EXPECT_FALSE(reserved);
680 status = block_mgr->GetNewBlock(client1, NULL, &block);
681 EXPECT_TRUE(status.ok());
682 EXPECT_TRUE(block != NULL);
685 status = client1_blocks[1]->Unpin();
686 EXPECT_TRUE(status.ok());
687 status = client1_blocks[2]->Unpin();
688 EXPECT_TRUE(status.ok());
691 block_mgr->ClearReservations(client1);
694 reserved = block_mgr->TryAcquireTmpReservation(client2, 1);
695 EXPECT_TRUE(reserved);
699 status = client1_blocks[0]->Pin(&pinned);
700 EXPECT_TRUE(status.ok());
703 status = client1_blocks[1]->Pin(&pinned);
704 EXPECT_TRUE(status.ok());
705 EXPECT_FALSE(pinned);
708 status = block_mgr->GetNewBlock(client2, NULL, &block);
709 EXPECT_TRUE(status.ok());
710 EXPECT_TRUE(block != NULL);
713 status = block_mgr->GetNewBlock(client2, NULL, &block2);
714 EXPECT_TRUE(status.ok());
715 EXPECT_TRUE(block2 == NULL);
719 status = block->Unpin();
720 EXPECT_TRUE(status.ok());
721 status = client1_blocks[1]->Pin(&pinned);
722 EXPECT_TRUE(status.ok());
726 EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
731 int client1_buffers = 1;
732 int client2_buffers = 1;
733 int max_num_buffers = client1_buffers + client2_buffers + 2;
735 shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_buffers);
741 status = block_mgr->RegisterClient(client1_buffers, NULL, runtime_state_.get(),
743 EXPECT_TRUE(status.
ok());
744 EXPECT_TRUE(client1 != NULL);
745 status = block_mgr->RegisterClient(client2_buffers, NULL, runtime_state_.get(),
747 EXPECT_TRUE(status.
ok());
748 EXPECT_TRUE(client2 != NULL);
750 vector<BufferedBlockMgr::Block*> client1_blocks;
752 AllocateBlocks(block_mgr.get(), client1, client1_buffers, &client1_blocks);
755 vector<BufferedBlockMgr::Block*> client2_blocks;
756 AllocateBlocks(block_mgr.get(), client2, client2_buffers, &client2_blocks);
759 status = block_mgr->GetNewBlock(client1, NULL, &block);
760 EXPECT_TRUE(status.ok());
761 EXPECT_TRUE(block != NULL);
762 status = block_mgr->GetNewBlock(client2, NULL, &block);
763 EXPECT_TRUE(status.ok());
764 EXPECT_TRUE(block != NULL);
767 status = block_mgr->GetNewBlock(client1, NULL, &block);
768 EXPECT_TRUE(status.ok());
769 EXPECT_TRUE(block == NULL);
770 status = block_mgr->GetNewBlock(client2, NULL, &block);
771 EXPECT_TRUE(status.ok());
772 EXPECT_TRUE(block == NULL);
775 EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
780 int client1_buffers = 1;
781 int client2_buffers = 2;
782 int max_num_buffers = 2;
784 shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_buffers);
790 status = block_mgr->RegisterClient(client1_buffers, NULL, runtime_state_.get(),
792 EXPECT_TRUE(status.
ok());
793 EXPECT_TRUE(client1 != NULL);
794 status = block_mgr->RegisterClient(client2_buffers, NULL, runtime_state_.get(),
796 EXPECT_TRUE(status.
ok());
797 EXPECT_TRUE(client2 != NULL);
800 status = block_mgr->GetNewBlock(client1, NULL, &block);
801 EXPECT_TRUE(status.
ok());
802 EXPECT_TRUE(block != NULL);
805 status = block_mgr->GetNewBlock(client2, NULL, &block);
806 EXPECT_TRUE(status.
ok());
807 EXPECT_TRUE(block != NULL);
811 status = block_mgr->GetNewBlock(client1, NULL, &block);
812 EXPECT_TRUE(status.
ok());
813 EXPECT_TRUE(block == NULL);
817 status = block_mgr->GetNewBlock(client2, NULL, &block);
821 EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
825 FLAGS_disk_spill_encryption =
false;
826 TestRandomInternalSingle();
830 FLAGS_disk_spill_encryption =
false;
831 TestRandomInternalMulti(2);
835 FLAGS_disk_spill_encryption =
false;
836 TestRandomInternalMulti(4);
846 FLAGS_disk_spill_encryption =
true;
847 TestRandomInternalSingle();
851 FLAGS_disk_spill_encryption =
true;
852 TestRandomInternalMulti(2);
856 FLAGS_disk_spill_encryption =
true;
857 TestRandomInternalMulti(4);
868 int main(
int argc,
char** argv) {
869 ::testing::InitGoogleTest(&argc, argv);
874 return RUN_ALL_TESTS();
virtual int64_t value() const
shared_ptr< BufferedBlockMgr > CreateMgr(int max_buffers)
int64_t consumption() const
Returns the memory consumed in bytes.
TEST_F(InstructionCounterTest, Count)
void TestRandomInternalImpl(BufferedBlockMgr *block_mgr, int num_buffers, int tid)
T * Allocate(int size)
Allocates the specified number of bytes from this block.
static const int SINGLE_THREADED_TID
void InitCommonRuntime(int argc, char **argv, bool init_jvm, TestInfo::Mode m=TestInfo::NON_TEST)
static void GetFreeBlock(BufferedBlockMgr *block_mgr, BufferedBlockMgr::Client *client, BufferedBlockMgr::Block **new_block, Promise< bool > *promise)
Status GetNewBlock(Client *client, Block *unpin_block, Block **block, int64_t len=-1)
Status RegisterClient(int num_reserved_buffers, MemTracker *tracker, RuntimeState *state, Client **client)
int main(int argc, char **argv)
void SleepForMs(const int64_t duration_ms)
Sleeps the current thread for at least duration_ms milliseconds.
int BytesRemaining() const
Return the number of remaining bytes that can be allocated in this block.
void TestRandomInternalSingle()
int64_t valid_data_len() const
Return the number of bytes allocated in this block.
static Status Create(RuntimeState *state, MemTracker *parent, RuntimeProfile *profile, int64_t mem_limit, int64_t buffer_size, boost::shared_ptr< BufferedBlockMgr > *block_mgr)
static void ValidateRandomSizeData(BufferedBlockMgr::Block *block, int32_t size)
This class is thread-safe.
scoped_ptr< ExecEnv > exec_env_
uint64_t Test(T *ht, const ProbeTuple *input, uint64_t num_tuples)
static void InitializeLlvm(bool load_backend=false)
static int32_t * MakeRandomSizeData(BufferedBlockMgr::Block *block)
Counter * GetCounter(const std::string &name)
scoped_ptr< MemTracker > io_mgr_tracker_
static void ValidateBlock(BufferedBlockMgr::Block *block, int32_t data)
scoped_ptr< MemTracker > block_mgr_parent_tracker_
void AllocateBlocks(BufferedBlockMgr *block_mgr, BufferedBlockMgr::Client *client, int num_blocks, vector< BufferedBlockMgr::Block * > *blocks)
static const int block_size_
DECLARE_bool(disk_spill_encryption)
scoped_ptr< RuntimeState > runtime_state_
bool IsMemLimitExceeded() const
void TestRandomInternalMulti(int num_threads)