16 #include <boost/bind.hpp>
17 #include <boost/thread/thread.hpp>
20 #include <gtest/gtest.h>
33 using boost::condition_variable;
46 if (expected_status.
code() == TErrorCode::CANCELLED) {
49 EXPECT_TRUE(status.
code() == expected_status.
code());
53 scan_range->
Reset(NULL, (*written_range)->file(), (*written_range)->len(),
55 ValidateSyncRead(io_mgr, reader, scan_range, reinterpret_cast<const char*>(data),
67 EXPECT_TRUE(status.
ok());
77 FILE* file = fopen(filename,
"w");
78 EXPECT_TRUE(file != NULL);
79 fwrite(data, 1, strlen(data), file);
84 FILE* file = fopen(filename,
"w");
85 EXPECT_TRUE(file != NULL);
86 int success = fclose(file);
88 LOG(ERROR) <<
"Error closing file " << filename;
91 return truncate(filename, file_size);
96 for (
int i = 0; i < len; ++i) {
97 if (buffer[i] !=
'\0') {
98 EXPECT_EQ(expected[i], buffer[i]) << (int)expected[i] <<
" != " << (
int)buffer[i];
106 Status status = io_mgr->
Read(reader, range, &buffer);
107 ASSERT_TRUE(status.
ok());
108 ASSERT_TRUE(buffer != NULL);
109 EXPECT_EQ(buffer->
len(), range->
len());
110 if (expected_len < 0) expected_len = strlen(expected);
111 int cmp = memcmp(buffer->
buffer(), expected, expected_len);
112 EXPECT_TRUE(cmp == 0);
117 int expected_len,
const Status& expected_status) {
118 char result[expected_len + 1];
119 memset(result, 0, expected_len + 1);
124 ASSERT_TRUE(status.
ok() || status.
code() == expected_status.
code());
125 if (buffer == NULL || !status.
ok()) {
126 if (buffer != NULL) buffer->
Return();
139 const char* expected_result,
int expected_len,
const Status& expected_status,
142 while (max_ranges == 0 || num_ranges < max_ranges) {
145 ASSERT_TRUE(status.
ok() || status.
code() == expected_status.
code());
146 if (range == NULL)
break;
148 ++(*num_ranges_processed);
154 int len,
int disk_id, int64_t mtime,
void* meta_data = NULL,
bool is_cached =
false) {
156 range->
Reset(NULL, file_path, len, offset, disk_id, is_cached,
true, mtime, meta_data);
157 EXPECT_EQ(mtime, range->
mtime());
174 num_ranges_written_ = 0;
175 string tmp_file =
"/tmp/disk_io_mgr_test.txt";
176 int num_ranges = 100;
177 int64_t file_size = 1024 * 1024;
178 int64_t cur_offset = 0;
181 LOG(ERROR) <<
"Error creating temp file " << tmp_file.c_str() <<
" of size " <<
186 scoped_ptr<DiskIoMgr> read_io_mgr(
new DiskIoMgr(1, 1, 1, 10));
188 Status status = read_io_mgr->Init(&reader_mem_tracker);
189 ASSERT_TRUE(status.
ok());
191 status = read_io_mgr->RegisterContext(&reader, &reader_mem_tracker);
192 ASSERT_TRUE(status.
ok());
193 for (
int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
194 for (
int num_disks = 1; num_disks <= 5; num_disks += 2) {
196 DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
197 status = io_mgr.
Init(&mem_tracker);
198 ASSERT_TRUE(status.
ok());
201 for (
int i = 0; i < num_ranges; ++i) {
202 int32_t* data = pool_->Add(
new int32_t);
207 new_range, read_io_mgr.get(), reader, data,
Status::OK, _1);
209 num_ranges % num_disks, callback));
210 (*new_range)->
SetData(reinterpret_cast<uint8_t*>(data),
sizeof(int32_t));
212 EXPECT_TRUE(add_status.
ok());
213 cur_offset +=
sizeof(int32_t);
217 unique_lock<mutex> lock(written_mutex_);
218 while (num_ranges_written_ < num_ranges) writes_done_.wait(lock);
220 num_ranges_written_ = 0;
225 read_io_mgr->UnregisterContext(reader);
232 num_ranges_written_ = 0;
233 string tmp_file =
"/tmp/non-existent.txt";
236 ASSERT_TRUE(status.
ok());
240 int32_t* data = pool_->Add(
new int32_t);
248 data,
Status(TErrorCode::RUNTIME_ERROR,
"Test Failure"), _1);
251 (*new_range)->
SetData(reinterpret_cast<uint8_t*>(data),
sizeof(int32_t));
253 EXPECT_TRUE(status.
ok());
256 tmp_file =
"/tmp/disk_io_mgr_test.txt";
259 LOG(ERROR) <<
"Error creating temp file " << tmp_file.c_str() <<
" of size 100";
266 data,
Status(TErrorCode::RUNTIME_ERROR,
"Test Failure"), _1);
269 (*new_range)->
SetData(reinterpret_cast<uint8_t*>(data),
sizeof(int32_t));
271 EXPECT_TRUE(status.
ok());
274 unique_lock<mutex> lock(written_mutex_);
275 while (num_ranges_written_ < 2) writes_done_.wait(lock);
277 num_ranges_written_ = 0;
286 num_ranges_written_ = 0;
287 string tmp_file =
"/tmp/disk_io_mgr_test.txt";
288 int num_ranges = 100;
289 int num_ranges_before_cancel = 25;
290 int64_t file_size = 1024 * 1024;
291 int64_t cur_offset = 0;
294 LOG(ERROR) <<
"Error creating temp file " << tmp_file.c_str() <<
" of size " <<
299 scoped_ptr<DiskIoMgr> read_io_mgr(
new DiskIoMgr(1, 1, 1, 10));
301 Status status = read_io_mgr->Init(&reader_mem_tracker);
302 ASSERT_TRUE(status.
ok());
304 status = read_io_mgr->RegisterContext(&reader, &reader_mem_tracker);
305 ASSERT_TRUE(status.
ok());
306 for (
int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
307 for (
int num_disks = 1; num_disks <= 5; num_disks += 2) {
309 DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
310 status = io_mgr.
Init(&mem_tracker);
314 for (
int i = 0; i < num_ranges; ++i) {
315 if (i == num_ranges_before_cancel) {
319 int32_t* data = pool_->Add(
new int32_t);
324 num_ranges_before_cancel, new_range, read_io_mgr.get(), reader, data,
327 num_ranges % num_disks, callback));
328 (*new_range)->
SetData(reinterpret_cast<uint8_t*>(data),
sizeof(int32_t));
329 cur_offset +=
sizeof(int32_t);
331 EXPECT_TRUE(add_status.
code() == validate_status.
code());
335 unique_lock<mutex> lock(written_mutex_);
336 while (num_ranges_written_ < num_ranges_before_cancel) writes_done_.wait(lock);
338 num_ranges_written_ = 0;
343 read_io_mgr->UnregisterContext(reader);
351 const char* tmp_file =
"/tmp/disk_io_mgr_test.txt";
352 const char* data =
"abcdefghijklm";
353 int len = strlen(data);
357 struct stat stat_val;
358 stat(tmp_file, &stat_val);
361 for (
int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
362 for (
int num_disks = 1; num_disks <= 5; num_disks += 2) {
363 for (
int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
364 for (
int num_read_threads = 1; num_read_threads <= 5; ++num_read_threads) {
366 LOG(INFO) <<
"Starting test with num_threads_per_disk=" << num_threads_per_disk
367 <<
" num_disk=" << num_disks <<
" num_buffers=" << num_buffers
368 <<
" num_read_threads=" << num_read_threads;
370 if (++iters % 5000 == 0) LOG(ERROR) <<
"Starting iteration " << iters;
371 DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 1);
374 ASSERT_TRUE(status.
ok());
378 ASSERT_TRUE(status.
ok());
380 vector<DiskIoMgr::ScanRange*> ranges;
381 for (
int i = 0; i < len; ++i) {
382 int disk_id = i % num_disks;
383 ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id,
387 ASSERT_TRUE(status.
ok());
390 thread_group threads;
391 for (
int i = 0; i < num_read_threads; ++i) {
392 threads.add_thread(
new thread(ScanRangeThread, &io_mgr, reader, data,
397 EXPECT_EQ(num_ranges_processed, ranges.size());
399 EXPECT_EQ(reader_mem_tracker.consumption(), 0);
410 const char* tmp_file =
"/tmp/disk_io_mgr_test.txt";
411 const char* data =
"abcdefghijklm";
412 int len = strlen(data);
416 struct stat stat_val;
417 stat(tmp_file, &stat_val);
420 for (
int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
421 for (
int num_disks = 1; num_disks <= 5; num_disks += 2) {
422 for (
int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
424 LOG(INFO) <<
"Starting test with num_threads_per_disk=" << num_threads_per_disk
425 <<
" num_disk=" << num_disks <<
" num_buffers=" << num_buffers;
427 if (++iters % 5000 == 0) LOG(ERROR) <<
"Starting iteration " << iters;
428 DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 1);
431 ASSERT_TRUE(status.
ok());
435 ASSERT_TRUE(status.
ok());
437 vector<DiskIoMgr::ScanRange*> ranges_first_half;
438 vector<DiskIoMgr::ScanRange*> ranges_second_half;
439 for (
int i = 0; i < len; ++i) {
440 int disk_id = i % num_disks;
442 ranges_second_half.push_back(
443 InitRange(num_buffers, tmp_file, i, 1, disk_id,
446 ranges_first_half.push_back(InitRange(num_buffers, tmp_file, i, 1, disk_id,
454 ASSERT_TRUE(status.
ok());
457 ScanRangeThread(&io_mgr, reader, data, strlen(data),
Status::OK, 2,
458 &num_ranges_processed);
462 ASSERT_TRUE(status.
ok());
465 thread_group threads;
466 for (
int i = 0; i < 3; ++i) {
467 threads.add_thread(
new thread(ScanRangeThread, &io_mgr, reader, data,
472 EXPECT_EQ(num_ranges_processed, len);
474 EXPECT_EQ(reader_mem_tracker.consumption(), 0);
486 const char* tmp_file =
"/tmp/disk_io_mgr_test.txt";
487 const char* data =
"abcdefghijklm";
488 int len = strlen(data);
492 struct stat stat_val;
493 stat(tmp_file, &stat_val);
496 for (
int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
497 for (
int num_disks = 1; num_disks <= 5; num_disks += 2) {
498 for (
int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
500 LOG(INFO) <<
"Starting test with num_threads_per_disk=" << num_threads_per_disk
501 <<
" num_disk=" << num_disks <<
" num_buffers=" << num_buffers;
503 if (++iters % 5000 == 0) LOG(ERROR) <<
"Starting iteration " << iters;
508 ASSERT_TRUE(status.
ok());
512 ASSERT_TRUE(status.
ok());
518 ValidateSyncRead(&io_mgr, reader, complete_range, data);
519 ValidateSyncRead(&io_mgr, reader, complete_range, data);
521 vector<DiskIoMgr::ScanRange*> ranges;
522 for (
int i = 0; i < len; ++i) {
523 int disk_id = i % num_disks;
524 ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id,
528 ASSERT_TRUE(status.
ok());
531 thread_group threads;
532 for (
int i = 0; i < 5; ++i) {
533 threads.add_thread(
new thread(ScanRangeThread, &io_mgr, reader, data,
534 strlen(data),
Status::OK, 0, &num_ranges_processed));
538 for (
int i = 0; i < 5; ++i) {
540 ValidateSyncRead(&io_mgr, reader, complete_range, data);
545 ValidateSyncRead(&io_mgr, reader, complete_range, data);
546 ValidateSyncRead(&io_mgr, reader, complete_range, data);
548 EXPECT_EQ(num_ranges_processed, ranges.size());
550 EXPECT_EQ(reader_mem_tracker.consumption(), 0);
560 const char* tmp_file =
"/tmp/disk_io_mgr_test.txt";
561 const char* data =
"abcdefghijklm";
562 int len = strlen(data);
566 struct stat stat_val;
567 stat(tmp_file, &stat_val);
570 for (
int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
571 for (
int num_disks = 1; num_disks <= 5; num_disks += 2) {
572 for (
int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
574 LOG(INFO) <<
"Starting test with num_threads_per_disk=" << num_threads_per_disk
575 <<
" num_disk=" << num_disks <<
" num_buffers=" << num_buffers;
577 if (++iters % 5000 == 0) LOG(ERROR) <<
"Starting iteration " << iters;
578 DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 1);
581 ASSERT_TRUE(status.
ok());
585 ASSERT_TRUE(status.
ok());
587 vector<DiskIoMgr::ScanRange*> ranges;
588 for (
int i = 0; i < len; ++i) {
589 int disk_id = i % num_disks;
590 ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id,
594 ASSERT_TRUE(status.
ok());
597 int num_succesful_ranges = ranges.size() / 2;
599 for (
int i = 0; i < num_succesful_ranges; ++i) {
600 ScanRangeThread(&io_mgr, reader, data, strlen(data),
Status::OK, 1,
601 &num_ranges_processed);
603 EXPECT_EQ(num_ranges_processed, num_succesful_ranges);
606 thread_group threads;
607 for (
int i = 0; i < 3; ++i) {
608 threads.add_thread(
new thread(ScanRangeThread, &io_mgr, reader, data,
618 EXPECT_EQ(reader_mem_tracker.consumption(), 0);
627 const char* tmp_file =
"/tmp/disk_io_mgr_test.txt";
628 const char* data =
"abcdefghijklm";
629 int len = strlen(data);
633 struct stat stat_val;
634 stat(tmp_file, &stat_val);
636 const int num_buffers = 25;
638 const int mem_limit_num_buffers = 2;
643 if (++iters % 1000 == 0) LOG(ERROR) <<
"Starting iteration " << iters;
649 ASSERT_TRUE(status.
ok());
653 ASSERT_TRUE(status.
ok());
655 vector<DiskIoMgr::ScanRange*> ranges;
656 for (
int i = 0; i < num_buffers; ++i) {
657 ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, 0,
661 ASSERT_TRUE(status.
ok());
664 vector<DiskIoMgr::BufferDescriptor*> buffers;
668 1, &num_ranges_processed);
670 char result[strlen(data) + 1];
674 memset(result, 0, strlen(data) + 1);
678 if (range == NULL)
break;
684 if (buffer == NULL)
break;
687 buffers.push_back(buffer);
689 ValidateEmptyOrCorrect(data, result, strlen(data));
692 for (
int i = 0; i < buffers.size(); ++i) {
693 buffers[i]->Return();
698 EXPECT_EQ(reader_mem_tracker.consumption(), 0);
708 const char* tmp_file =
"/tmp/disk_io_mgr_test.txt";
709 const char* data =
"abcdefghijklm";
710 int len = strlen(data);
714 struct stat stat_val;
715 stat(tmp_file, &stat_val);
717 const int num_disks = 2;
718 const int num_buffers = 3;
723 if (++iters % 5000 == 0) LOG(ERROR) <<
"Starting iteration " << iters;
727 ASSERT_TRUE(status.
ok());
731 ASSERT_TRUE(status.
ok());
734 InitRange(1, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, NULL,
true);
737 ValidateSyncRead(&io_mgr, reader, complete_range, data);
738 ValidateSyncRead(&io_mgr, reader, complete_range, data);
740 vector<DiskIoMgr::ScanRange*> ranges;
741 for (
int i = 0; i < len; ++i) {
742 int disk_id = i % num_disks;
743 ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id,
744 stat_val.st_mtime, NULL,
true));
747 ASSERT_TRUE(status.
ok());
750 thread_group threads;
751 for (
int i = 0; i < 5; ++i) {
752 threads.add_thread(
new thread(ScanRangeThread, &io_mgr, reader, data,
753 strlen(data),
Status::OK, 0, &num_ranges_processed));
757 for (
int i = 0; i < 5; ++i) {
759 ValidateSyncRead(&io_mgr, reader, complete_range, data);
764 ValidateSyncRead(&io_mgr, reader, complete_range, data);
765 ValidateSyncRead(&io_mgr, reader, complete_range, data);
767 EXPECT_EQ(num_ranges_processed, ranges.size());
769 EXPECT_EQ(reader_mem_tracker.consumption(), 0);
777 const char* data =
"abcdefghijklmnopqrstuvwxyz";
778 const int num_contexts = 5;
779 const int file_size = 4 * 1024;
780 const int num_writes_queued = 5;
781 const int num_reads_queued = 5;
783 string file_name =
"/tmp/disk_io_mgr_test.txt";
786 LOG(ERROR) <<
"Error creating temp file " << file_name.c_str() <<
" of size " <<
792 struct stat stat_val;
793 stat(file_name.c_str(), &stat_val);
796 vector<DiskIoMgr::RequestContext*> contexts(num_contexts);
798 for (
int iteration = 0; iteration <
ITERATIONS; ++iteration) {
799 for (
int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
800 for (
int num_disks = 1; num_disks <= 5; num_disks += 2) {
802 io_mgr.
Init(&mem_tracker);
803 for (
int file_index = 0; file_index < num_contexts; ++file_index) {
805 ASSERT_TRUE(status.
ok());
809 int write_offset = 0;
810 while (read_offset < file_size) {
811 for (
int context_index = 0; context_index < num_contexts; ++context_index) {
812 if (++iters % 5000 == 0) LOG(ERROR) <<
"Starting iteration " << iters;
814 thread_group threads;
815 vector<DiskIoMgr::ScanRange*> ranges;
816 int num_scan_ranges = min<int>(num_reads_queued, write_offset - read_offset);
817 for (
int i = 0; i < num_scan_ranges; ++i) {
818 ranges.push_back(InitRange(1, file_name.c_str(), read_offset, 1,
819 i % num_disks, stat_val.st_mtime));
820 threads.add_thread(
new thread(ScanRangeThread, &io_mgr,
821 contexts[context_index],
822 reinterpret_cast<const char*>(data + (read_offset % strlen(data))), 1,
823 Status::OK, num_scan_ranges, &num_ranges_processed));
827 num_ranges_written_ = 0;
828 int num_write_ranges = min<int>(num_writes_queued, file_size - write_offset);
829 for (
int i = 0; i < num_write_ranges; ++i) {
832 this, num_write_ranges, _1);
835 write_offset, i % num_disks, callback));
836 new_range->
SetData(reinterpret_cast<const uint8_t*>
837 (data + (write_offset % strlen(data))), 1);
838 status = io_mgr.
AddWriteRange(contexts[context_index], new_range);
843 unique_lock<mutex> lock(written_mutex_);
844 while (num_ranges_written_ < num_write_ranges) writes_done_.wait(lock);
852 for (
int file_index = 0; file_index < num_contexts; ++file_index) {
863 const int NUM_READERS = 5;
864 const int DATA_LEN = 50;
866 const int NUM_THREADS_PER_READER = 3;
868 vector<string> file_names;
869 vector<int64_t> mtimes;
871 vector<DiskIoMgr::RequestContext*> readers;
872 vector<char*> results;
874 file_names.resize(NUM_READERS);
875 readers.resize(NUM_READERS);
876 mtimes.resize(NUM_READERS);
877 data.resize(NUM_READERS);
878 results.resize(NUM_READERS);
882 for (
int i = 0; i < NUM_READERS; ++i) {
884 for (
int j = 0; j < DATA_LEN; ++j) {
885 int c = (j + i) % 26;
888 data[i] = string(buf, DATA_LEN);
891 ss <<
"/tmp/disk_io_mgr_test" << i <<
".txt";
892 file_names[i] = ss.str();
896 struct stat stat_val;
897 stat(file_names[i].c_str(), &stat_val);
898 mtimes[i] = stat_val.st_mtime;
900 results[i] =
new char[DATA_LEN + 1];
901 memset(results[i], 0, DATA_LEN + 1);
906 for (
int iteration = 0; iteration <
ITERATIONS; ++iteration) {
907 for (
int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
908 for (
int num_disks = 1; num_disks <= 5; num_disks += 2) {
909 for (
int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
911 LOG(INFO) <<
"Starting test with num_threads_per_disk=" << threads_per_disk
912 <<
" num_disk=" << num_disks <<
" num_buffers=" << num_buffers;
913 if (++iters % 2500 == 0) LOG(ERROR) <<
"Starting iteration " << iters;
917 ASSERT_TRUE(status.
ok());
919 for (
int i = 0; i < NUM_READERS; ++i) {
921 ASSERT_TRUE(status.
ok());
923 vector<DiskIoMgr::ScanRange*> ranges;
924 for (
int j = 0; j < DATA_LEN; ++j) {
925 int disk_id = j % num_disks;
927 InitRange(num_buffers,file_names[i].c_str(), j, 1, disk_id,
931 ASSERT_TRUE(status.
ok());
935 thread_group threads;
936 for (
int i = 0; i < NUM_READERS; ++i) {
937 for (
int j = 0; j < NUM_THREADS_PER_READER; ++j) {
938 threads.add_thread(
new thread(ScanRangeThread, &io_mgr, readers[i],
939 data[i].c_str(), data[i].size(),
Status::OK, 0,
940 &num_ranges_processed));
944 EXPECT_EQ(num_ranges_processed, DATA_LEN * NUM_READERS);
945 for (
int i = 0; i < NUM_READERS; ++i) {
966 int min_buffer_size = 1024;
967 int max_buffer_size = 8 * 1024 * 1024;
970 DiskIoMgr io_mgr(1, 1, min_buffer_size, max_buffer_size);
972 ASSERT_TRUE(status.
ok());
976 int64_t buffer_len = 1;
978 EXPECT_EQ(buffer_len, min_buffer_size);
981 EXPECT_EQ(mem_tracker.
consumption(), min_buffer_size);
984 buffer_len = min_buffer_size;
986 EXPECT_EQ(buffer_len, min_buffer_size);
989 EXPECT_EQ(mem_tracker.
consumption(), min_buffer_size);
992 buffer_len = min_buffer_size + 1;
994 EXPECT_EQ(buffer_len, min_buffer_size * 2);
996 EXPECT_EQ(mem_tracker.
consumption(), min_buffer_size * 3);
1001 EXPECT_EQ(mem_tracker.
consumption(), min_buffer_size * 2);
1006 buffer_len = max_buffer_size;
1008 EXPECT_EQ(buffer_len, max_buffer_size);
1011 EXPECT_EQ(mem_tracker.
consumption(), min_buffer_size * 2 + max_buffer_size);
1022 google::InitGoogleLogging(argv[0]);
1023 ::testing::InitGoogleTest(&argc, argv);
1027 return RUN_ALL_TESTS();
static void ValidateEmptyOrCorrect(const char *expected, const char *buffer, int len)
condition_variable writes_done_
void CancelContext(RequestContext *context, bool wait_for_disks_completion=false)
int64_t consumption() const
Returns the memory consumed in bytes.
TEST_F(InstructionCounterTest, Count)
const int MIN_BUFFER_SIZE
boost::function< void(const Status &)> WriteDoneCallback
int main(int argc, char **argv)
AtomicInt< int > num_allocated_buffers_
Total number of allocated buffers, used for debugging.
static void ScanRangeThread(DiskIoMgr *io_mgr, DiskIoMgr::RequestContext *reader, const char *expected_result, int expected_len, const Status &expected_status, int max_ranges, AtomicInt< int > *num_ranges_processed)
DiskIoMgr::ScanRange * InitRange(int num_buffers, const char *file_path, int offset, int len, int disk_id, int64_t mtime, void *meta_data=NULL, bool is_cached=false)
const int MAX_BUFFER_SIZE
Status AddWriteRange(RequestContext *writer, WriteRange *write_range)
int CreateTempFile(const char *filename, int file_size)
static void Init()
Initialize DiskInfo. Just be called before any other functions.
void UnregisterContext(RequestContext *context)
Status AddScanRanges(RequestContext *reader, const std::vector< ScanRange * > &ranges, bool schedule_immediately=false)
Status GetNextRange(RequestContext *reader, ScanRange **range)
static void CreateTempFile(const char *filename, const char *data)
int64_t scan_range_offset() const
Returns the offset within the scan range that this buffer starts at.
const int LARGE_MEM_LIMIT
Status Read(RequestContext *reader, ScanRange *range, BufferDescriptor **buffer)
void SetData(const uint8_t *buffer, int64_t len)
void WriteCompleteCallback(int num_writes, const Status &status)
TErrorCode::type code() const
void InitThreading()
Initialises the threading subsystem. Must be called before a Thread is created.
void ReturnFreeBuffer(char *buffer, int64_t buffer_size)
void CreateTempFile(const char *filename, const char *data)
void Reset(hdfsFS fs, const char *file, int64_t len, int64_t offset, int disk_id, bool try_cache, bool expected_local, int64_t mtime, void *metadata=NULL)
This class is thread-safe.
scoped_ptr< ObjectPool > pool_
static const Status CANCELLED
static const int64_t NEVER_CACHE
If the mtime is set to NEVER_CACHE, the file handle should never be cached.
static void ValidateSyncRead(DiskIoMgr *io_mgr, DiskIoMgr::RequestContext *reader, DiskIoMgr::ScanRange *range, const char *expected, int expected_len=-1)
Status Init(MemTracker *process_mem_tracker)
Initialize the IoMgr. Must be called once before any of the other APIs.
void WriteValidateCallback(int num_writes, DiskIoMgr::WriteRange **written_range, DiskIoMgr *io_mgr, DiskIoMgr::RequestContext *reader, int32_t *data, Status expected_status, const Status &status)
uint64_t Test(T *ht, const ProbeTuple *input, uint64_t num_tuples)
static const Status MEM_LIMIT_EXCEEDED
Status RegisterContext(RequestContext **request_context, MemTracker *reader_mem_tracker=NULL)
char * GetFreeBuffer(int64_t *buffer_size)
Status GetNext(BufferDescriptor **buffer)
uint8_t offset[7 *64-sizeof(uint64_t)]
static void Init()
Initialize CpuInfo.
void Run(int sec)
Run the test for 'sec'. If 0, run forever.
static void ValidateScanRange(DiskIoMgr::ScanRange *range, const char *expected, int expected_len, const Status &expected_status)
bool IsMemLimitExceeded() const
Status context_status(RequestContext *context) const