Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
disk-io-mgr-test.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <sched.h>
16 #include <boost/bind.hpp>
17 #include <boost/thread/thread.hpp>
18 #include <sys/stat.h>
19 
20 #include <gtest/gtest.h>
21 
22 #include "codegen/llvm-codegen.h"
23 #include "runtime/disk-io-mgr.h"
25 #include "runtime/mem-tracker.h"
27 #include "util/cpu-info.h"
28 #include "util/disk-info.h"
29 #include "util/thread.h"
30 
31 #include "common/names.h"
32 
33 using boost::condition_variable;
34 
// Buffer-size bounds handed to DiskIoMgr in the tests below, and a memory
// limit large enough (1GB) that tests which are not about memory pressure
// never hit it.
const int MIN_BUFFER_SIZE = 512;
const int MAX_BUFFER_SIZE = 1024;
const int LARGE_MEM_LIMIT = 1024 * 1024 * 1024;
38 
39 namespace impala {
40 
41 class DiskIoMgrTest : public testing::Test {
42  public:
43  void WriteValidateCallback(int num_writes, DiskIoMgr::WriteRange** written_range,
44  DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader, int32_t* data,
45  Status expected_status, const Status& status) {
46  if (expected_status.code() == TErrorCode::CANCELLED) {
47  EXPECT_TRUE(status.ok() || status.IsCancelled());
48  } else {
49  EXPECT_TRUE(status.code() == expected_status.code());
50  }
51  if (status.ok()) {
52  DiskIoMgr::ScanRange* scan_range = pool_->Add(new DiskIoMgr::ScanRange());
53  scan_range->Reset(NULL, (*written_range)->file(), (*written_range)->len(),
54  (*written_range)->offset(), 0, false, false, DiskIoMgr::ScanRange::NEVER_CACHE);
55  ValidateSyncRead(io_mgr, reader, scan_range, reinterpret_cast<const char*>(data),
56  sizeof(int32_t));
57  }
58 
59  {
60  lock_guard<mutex> l(written_mutex_);
62  if (num_ranges_written_ == num_writes) writes_done_.notify_one();
63  }
64  }
65 
66  void WriteCompleteCallback(int num_writes, const Status& status) {
67  EXPECT_TRUE(status.ok());
68  {
69  lock_guard<mutex> l(written_mutex_);
71  if (num_ranges_written_ == num_writes) writes_done_.notify_all();
72  }
73  }
74 
75  protected:
76  void CreateTempFile(const char* filename, const char* data) {
77  FILE* file = fopen(filename, "w");
78  EXPECT_TRUE(file != NULL);
79  fwrite(data, 1, strlen(data), file);
80  fclose(file);
81  }
82 
83  int CreateTempFile(const char* filename, int file_size) {
84  FILE* file = fopen(filename, "w");
85  EXPECT_TRUE(file != NULL);
86  int success = fclose(file);
87  if (success != 0) {
88  LOG(ERROR) << "Error closing file " << filename;
89  return success;
90  }
91  return truncate(filename, file_size);
92  }
93 
94  // Validates that buffer[i] is \0 or expected[i]
95  static void ValidateEmptyOrCorrect(const char* expected, const char* buffer, int len) {
96  for (int i = 0; i < len; ++i) {
97  if (buffer[i] != '\0') {
98  EXPECT_EQ(expected[i], buffer[i]) << (int)expected[i] << " != " << (int)buffer[i];
99  }
100  }
101  }
102 
104  DiskIoMgr::ScanRange* range, const char* expected, int expected_len = -1) {
106  Status status = io_mgr->Read(reader, range, &buffer);
107  ASSERT_TRUE(status.ok());
108  ASSERT_TRUE(buffer != NULL);
109  EXPECT_EQ(buffer->len(), range->len());
110  if (expected_len < 0) expected_len = strlen(expected);
111  int cmp = memcmp(buffer->buffer(), expected, expected_len);
112  EXPECT_TRUE(cmp == 0);
113  buffer->Return();
114  }
115 
116  static void ValidateScanRange(DiskIoMgr::ScanRange* range, const char* expected,
117  int expected_len, const Status& expected_status) {
118  char result[expected_len + 1];
119  memset(result, 0, expected_len + 1);
120 
121  while (true) {
122  DiskIoMgr::BufferDescriptor* buffer = NULL;
123  Status status = range->GetNext(&buffer);
124  ASSERT_TRUE(status.ok() || status.code() == expected_status.code());
125  if (buffer == NULL || !status.ok()) {
126  if (buffer != NULL) buffer->Return();
127  break;
128  }
129  memcpy(result + range->offset() + buffer->scan_range_offset(),
130  buffer->buffer(), buffer->len());
131  buffer->Return();
132  }
133  ValidateEmptyOrCorrect(expected, result, expected_len);
134  }
135 
136  // Continues pulling scan ranges from the io mgr until they are all done.
137  // Updates num_ranges_processed with the number of ranges seen by this thread.
139  const char* expected_result, int expected_len, const Status& expected_status,
140  int max_ranges, AtomicInt<int>* num_ranges_processed) {
141  int num_ranges = 0;
142  while (max_ranges == 0 || num_ranges < max_ranges) {
143  DiskIoMgr::ScanRange* range;
144  Status status = io_mgr->GetNextRange(reader, &range);
145  ASSERT_TRUE(status.ok() || status.code() == expected_status.code());
146  if (range == NULL) break;
147  ValidateScanRange(range, expected_result, expected_len, expected_status);
148  ++(*num_ranges_processed);
149  ++num_ranges;
150  }
151  }
152 
153  DiskIoMgr::ScanRange* InitRange(int num_buffers, const char* file_path, int offset,
154  int len, int disk_id, int64_t mtime, void* meta_data = NULL, bool is_cached = false) {
155  DiskIoMgr::ScanRange* range = pool_->Add(new DiskIoMgr::ScanRange(num_buffers));
156  range->Reset(NULL, file_path, len, offset, disk_id, is_cached, true, mtime, meta_data);
157  EXPECT_EQ(mtime, range->mtime());
158  return range;
159  }
160 
161  scoped_ptr<ObjectPool> pool_;
162 
164  condition_variable writes_done_;
166 };
167 
168 // Test a single writer with multiple disks and threads per disk. Each WriteRange
169 // writes random 4-byte integers, and upon completion, the written data is validated
170 // by reading the data back via a separate IoMgr instance. All writes are expected to
171 // complete successfully.
172 TEST_F(DiskIoMgrTest, SingleWriter) {
173  MemTracker mem_tracker(LARGE_MEM_LIMIT);
174  num_ranges_written_ = 0;
175  string tmp_file = "/tmp/disk_io_mgr_test.txt";
176  int num_ranges = 100;
177  int64_t file_size = 1024 * 1024;
178  int64_t cur_offset = 0;
179  int success = CreateTempFile(tmp_file.c_str(), file_size);
180  if (success != 0) {
181  LOG(ERROR) << "Error creating temp file " << tmp_file.c_str() << " of size " <<
182  file_size;
183  EXPECT_TRUE(false);
184  }
185 
186  scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 10));
187  MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
188  Status status = read_io_mgr->Init(&reader_mem_tracker);
189  ASSERT_TRUE(status.ok());
191  status = read_io_mgr->RegisterContext(&reader, &reader_mem_tracker);
192  ASSERT_TRUE(status.ok());
193  for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
194  for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
195  pool_.reset(new ObjectPool);
196  DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
197  status = io_mgr.Init(&mem_tracker);
198  ASSERT_TRUE(status.ok());
200  io_mgr.RegisterContext(&writer, &mem_tracker);
201  for (int i = 0; i < num_ranges; ++i) {
202  int32_t* data = pool_->Add(new int32_t);
203  *data = rand();
204  DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*);
206  bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges,
207  new_range, read_io_mgr.get(), reader, data, Status::OK, _1);
208  *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset,
209  num_ranges % num_disks, callback));
210  (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
211  Status add_status = io_mgr.AddWriteRange(writer, *new_range);
212  EXPECT_TRUE(add_status.ok());
213  cur_offset += sizeof(int32_t);
214  }
215 
216  {
217  unique_lock<mutex> lock(written_mutex_);
218  while (num_ranges_written_ < num_ranges) writes_done_.wait(lock);
219  }
220  num_ranges_written_ = 0;
221  io_mgr.UnregisterContext(writer);
222  }
223  }
224 
225  read_io_mgr->UnregisterContext(reader);
226  read_io_mgr.reset();
227 }
228 // Perform invalid writes (e.g. non-existent file, negative offset) and validate
229 // that an error status is returned via the write callback.
230 TEST_F(DiskIoMgrTest, InvalidWrite) {
231  MemTracker mem_tracker(LARGE_MEM_LIMIT);
232  num_ranges_written_ = 0;
233  string tmp_file = "/tmp/non-existent.txt";
234  DiskIoMgr io_mgr(1, 1, 1, 10);
235  Status status = io_mgr.Init(&mem_tracker);
236  ASSERT_TRUE(status.ok());
238  status = io_mgr.RegisterContext(&writer);
239  pool_.reset(new ObjectPool);
240  int32_t* data = pool_->Add(new int32_t);
241  *data = rand();
242 
243  // Write to a non-existent file.
244  DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*);
246  bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
247  new_range, (DiskIoMgr*)NULL, (DiskIoMgr::RequestContext*)NULL,
248  data, Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
249  *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback));
250 
251  (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
252  status = io_mgr.AddWriteRange(writer, *new_range);
253  EXPECT_TRUE(status.ok());
254 
255  // Write to a bad location in a file that exists.
256  tmp_file = "/tmp/disk_io_mgr_test.txt";
257  int success = CreateTempFile(tmp_file.c_str(), 100);
258  if (success != 0) {
259  LOG(ERROR) << "Error creating temp file " << tmp_file.c_str() << " of size 100";
260  EXPECT_TRUE(false);
261  }
262 
263  new_range = pool_->Add(new DiskIoMgr::WriteRange*);
264  callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
265  new_range, (DiskIoMgr*)NULL, (DiskIoMgr::RequestContext*)NULL,
266  data, Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
267 
268  *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, -1, 0, callback));
269  (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
270  status = io_mgr.AddWriteRange(writer, *new_range);
271  EXPECT_TRUE(status.ok());
272 
273  {
274  unique_lock<mutex> lock(written_mutex_);
275  while (num_ranges_written_ < 2) writes_done_.wait(lock);
276  }
277  num_ranges_written_ = 0;
278  io_mgr.UnregisterContext(writer);
279 }
280 
281 // Issue a number of writes, cancel the writer context and issue more writes.
282 // AddWriteRange() is expected to succeed before the cancel and fail after it.
283 // The writes themselves may finish with status cancelled or ok.
284 TEST_F(DiskIoMgrTest, SingleWriterCancel) {
285  MemTracker mem_tracker(LARGE_MEM_LIMIT);
286  num_ranges_written_ = 0;
287  string tmp_file = "/tmp/disk_io_mgr_test.txt";
288  int num_ranges = 100;
289  int num_ranges_before_cancel = 25;
290  int64_t file_size = 1024 * 1024;
291  int64_t cur_offset = 0;
292  int success = CreateTempFile(tmp_file.c_str(), file_size);
293  if (success != 0) {
294  LOG(ERROR) << "Error creating temp file " << tmp_file.c_str() << " of size " <<
295  file_size;
296  EXPECT_TRUE(false);
297  }
298 
299  scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 10));
300  MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
301  Status status = read_io_mgr->Init(&reader_mem_tracker);
302  ASSERT_TRUE(status.ok());
304  status = read_io_mgr->RegisterContext(&reader, &reader_mem_tracker);
305  ASSERT_TRUE(status.ok());
306  for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
307  for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
308  pool_.reset(new ObjectPool);
309  DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
310  status = io_mgr.Init(&mem_tracker);
312  io_mgr.RegisterContext(&writer, &mem_tracker);
313  Status validate_status = Status::OK;
314  for (int i = 0; i < num_ranges; ++i) {
315  if (i == num_ranges_before_cancel) {
316  io_mgr.CancelContext(writer);
317  validate_status = Status::CANCELLED;
318  }
319  int32_t* data = pool_->Add(new int32_t);
320  *data = rand();
321  DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*);
323  bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this,
324  num_ranges_before_cancel, new_range, read_io_mgr.get(), reader, data,
325  Status::CANCELLED, _1);
326  *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset,
327  num_ranges % num_disks, callback));
328  (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
329  cur_offset += sizeof(int32_t);
330  Status add_status = io_mgr.AddWriteRange(writer, *new_range);
331  EXPECT_TRUE(add_status.code() == validate_status.code());
332  }
333 
334  {
335  unique_lock<mutex> lock(written_mutex_);
336  while (num_ranges_written_ < num_ranges_before_cancel) writes_done_.wait(lock);
337  }
338  num_ranges_written_ = 0;
339  io_mgr.UnregisterContext(writer);
340  }
341  }
342 
343  read_io_mgr->UnregisterContext(reader);
344  read_io_mgr.reset();
345 }
346 
347 // Basic test with a single reader, testing multiple threads, disks and a different
348 // number of buffers.
349 TEST_F(DiskIoMgrTest, SingleReader) {
350  MemTracker mem_tracker(LARGE_MEM_LIMIT);
351  const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
352  const char* data = "abcdefghijklm";
353  int len = strlen(data);
354  CreateTempFile(tmp_file, data);
355 
356  // Get mtime for file
357  struct stat stat_val;
358  stat(tmp_file, &stat_val);
359 
360  int64_t iters = 0;
361  for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
362  for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
363  for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
364  for (int num_read_threads = 1; num_read_threads <= 5; ++num_read_threads) {
365  pool_.reset(new ObjectPool);
366  LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
367  << " num_disk=" << num_disks << " num_buffers=" << num_buffers
368  << " num_read_threads=" << num_read_threads;
369 
370  if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
371  DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 1);
372 
373  Status status = io_mgr.Init(&mem_tracker);
374  ASSERT_TRUE(status.ok());
375  MemTracker reader_mem_tracker;
377  status = io_mgr.RegisterContext(&reader, &reader_mem_tracker);
378  ASSERT_TRUE(status.ok());
379 
380  vector<DiskIoMgr::ScanRange*> ranges;
381  for (int i = 0; i < len; ++i) {
382  int disk_id = i % num_disks;
383  ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id,
384  stat_val.st_mtime));
385  }
386  status = io_mgr.AddScanRanges(reader, ranges);
387  ASSERT_TRUE(status.ok());
388 
389  AtomicInt<int> num_ranges_processed;
390  thread_group threads;
391  for (int i = 0; i < num_read_threads; ++i) {
392  threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
393  len, Status::OK, 0, &num_ranges_processed));
394  }
395  threads.join_all();
396 
397  EXPECT_EQ(num_ranges_processed, ranges.size());
398  io_mgr.UnregisterContext(reader);
399  EXPECT_EQ(reader_mem_tracker.consumption(), 0);
400  }
401  }
402  }
403  }
404  EXPECT_EQ(mem_tracker.consumption(), 0);
405 }
406 
407 // This test issues adding additional scan ranges while there are some still in flight.
408 TEST_F(DiskIoMgrTest, AddScanRangeTest) {
409  MemTracker mem_tracker(LARGE_MEM_LIMIT);
410  const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
411  const char* data = "abcdefghijklm";
412  int len = strlen(data);
413  CreateTempFile(tmp_file, data);
414 
415  // Get mtime for file
416  struct stat stat_val;
417  stat(tmp_file, &stat_val);
418 
419  int64_t iters = 0;
420  for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
421  for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
422  for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
423  pool_.reset(new ObjectPool);
424  LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
425  << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
426 
427  if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
428  DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 1);
429 
430  Status status = io_mgr.Init(&mem_tracker);
431  ASSERT_TRUE(status.ok());
432  MemTracker reader_mem_tracker;
434  status = io_mgr.RegisterContext(&reader, &reader_mem_tracker);
435  ASSERT_TRUE(status.ok());
436 
437  vector<DiskIoMgr::ScanRange*> ranges_first_half;
438  vector<DiskIoMgr::ScanRange*> ranges_second_half;
439  for (int i = 0; i < len; ++i) {
440  int disk_id = i % num_disks;
441  if (i > len / 2) {
442  ranges_second_half.push_back(
443  InitRange(num_buffers, tmp_file, i, 1, disk_id,
444  stat_val.st_mtime));
445  } else {
446  ranges_first_half.push_back(InitRange(num_buffers, tmp_file, i, 1, disk_id,
447  stat_val.st_mtime));
448  }
449  }
450  AtomicInt<int> num_ranges_processed;
451 
452  // Issue first half the scan ranges.
453  status = io_mgr.AddScanRanges(reader, ranges_first_half);
454  ASSERT_TRUE(status.ok());
455 
456  // Read a couple of them
457  ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK, 2,
458  &num_ranges_processed);
459 
460  // Issue second half
461  status = io_mgr.AddScanRanges(reader, ranges_second_half);
462  ASSERT_TRUE(status.ok());
463 
464  // Start up some threads and then cancel
465  thread_group threads;
466  for (int i = 0; i < 3; ++i) {
467  threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
468  strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
469  }
470 
471  threads.join_all();
472  EXPECT_EQ(num_ranges_processed, len);
473  io_mgr.UnregisterContext(reader);
474  EXPECT_EQ(reader_mem_tracker.consumption(), 0);
475  }
476  }
477  }
478  EXPECT_EQ(mem_tracker.consumption(), 0);
479 }
480 
481 // Test to make sure that sync reads and async reads work together
482 // Note: this test is constructed so the number of buffers is greater than the
483 // number of scan ranges.
484 TEST_F(DiskIoMgrTest, SyncReadTest) {
485  MemTracker mem_tracker(LARGE_MEM_LIMIT);
486  const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
487  const char* data = "abcdefghijklm";
488  int len = strlen(data);
489  CreateTempFile(tmp_file, data);
490 
491  // Get mtime for file
492  struct stat stat_val;
493  stat(tmp_file, &stat_val);
494 
495  int64_t iters = 0;
496  for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
497  for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
498  for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
499  pool_.reset(new ObjectPool);
500  LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
501  << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
502 
503  if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
504  DiskIoMgr io_mgr(
505  num_disks, num_threads_per_disk, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
506 
507  Status status = io_mgr.Init(&mem_tracker);
508  ASSERT_TRUE(status.ok());
509  MemTracker reader_mem_tracker;
511  status = io_mgr.RegisterContext(&reader, &reader_mem_tracker);
512  ASSERT_TRUE(status.ok());
513 
514  DiskIoMgr::ScanRange* complete_range = InitRange(1, tmp_file, 0, strlen(data), 0,
515  stat_val.st_mtime);
516 
517  // Issue some reads before the async ones are issued
518  ValidateSyncRead(&io_mgr, reader, complete_range, data);
519  ValidateSyncRead(&io_mgr, reader, complete_range, data);
520 
521  vector<DiskIoMgr::ScanRange*> ranges;
522  for (int i = 0; i < len; ++i) {
523  int disk_id = i % num_disks;
524  ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id,
525  stat_val.st_mtime));
526  }
527  status = io_mgr.AddScanRanges(reader, ranges);
528  ASSERT_TRUE(status.ok());
529 
530  AtomicInt<int> num_ranges_processed;
531  thread_group threads;
532  for (int i = 0; i < 5; ++i) {
533  threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
534  strlen(data), Status::OK, 0, &num_ranges_processed));
535  }
536 
537  // Issue some more sync ranges
538  for (int i = 0; i < 5; ++i) {
539  sched_yield();
540  ValidateSyncRead(&io_mgr, reader, complete_range, data);
541  }
542 
543  threads.join_all();
544 
545  ValidateSyncRead(&io_mgr, reader, complete_range, data);
546  ValidateSyncRead(&io_mgr, reader, complete_range, data);
547 
548  EXPECT_EQ(num_ranges_processed, ranges.size());
549  io_mgr.UnregisterContext(reader);
550  EXPECT_EQ(reader_mem_tracker.consumption(), 0);
551  }
552  }
553  }
554  EXPECT_EQ(mem_tracker.consumption(), 0);
555 }
556 
557 // Tests a single reader cancelling half way through scan ranges.
558 TEST_F(DiskIoMgrTest, SingleReaderCancel) {
559  MemTracker mem_tracker(LARGE_MEM_LIMIT);
560  const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
561  const char* data = "abcdefghijklm";
562  int len = strlen(data);
563  CreateTempFile(tmp_file, data);
564 
565  // Get mtime for file
566  struct stat stat_val;
567  stat(tmp_file, &stat_val);
568 
569  int64_t iters = 0;
570  for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
571  for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
572  for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
573  pool_.reset(new ObjectPool);
574  LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
575  << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
576 
577  if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
578  DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 1);
579 
580  Status status = io_mgr.Init(&mem_tracker);
581  ASSERT_TRUE(status.ok());
582  MemTracker reader_mem_tracker;
584  status = io_mgr.RegisterContext(&reader, &reader_mem_tracker);
585  ASSERT_TRUE(status.ok());
586 
587  vector<DiskIoMgr::ScanRange*> ranges;
588  for (int i = 0; i < len; ++i) {
589  int disk_id = i % num_disks;
590  ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id,
591  stat_val.st_mtime));
592  }
593  status = io_mgr.AddScanRanges(reader, ranges);
594  ASSERT_TRUE(status.ok());
595 
596  AtomicInt<int> num_ranges_processed;
597  int num_succesful_ranges = ranges.size() / 2;
598  // Read half the ranges
599  for (int i = 0; i < num_succesful_ranges; ++i) {
600  ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK, 1,
601  &num_ranges_processed);
602  }
603  EXPECT_EQ(num_ranges_processed, num_succesful_ranges);
604 
605  // Start up some threads and then cancel
606  thread_group threads;
607  for (int i = 0; i < 3; ++i) {
608  threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
609  strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
610  }
611 
612  io_mgr.CancelContext(reader);
613  sched_yield();
614 
615  threads.join_all();
616  EXPECT_TRUE(io_mgr.context_status(reader).IsCancelled());
617  io_mgr.UnregisterContext(reader);
618  EXPECT_EQ(reader_mem_tracker.consumption(), 0);
619  }
620  }
621  }
622  EXPECT_EQ(mem_tracker.consumption(), 0);
623 }
624 
625 // Test when the reader goes over the mem limit
626 TEST_F(DiskIoMgrTest, MemLimits) {
627  const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
628  const char* data = "abcdefghijklm";
629  int len = strlen(data);
630  CreateTempFile(tmp_file, data);
631 
632  // Get mtime for file
633  struct stat stat_val;
634  stat(tmp_file, &stat_val);
635 
636  const int num_buffers = 25;
637  // Give the reader more buffers than the limit
638  const int mem_limit_num_buffers = 2;
639 
640  int64_t iters = 0;
641  {
642  pool_.reset(new ObjectPool);
643  if (++iters % 1000 == 0) LOG(ERROR) << "Starting iteration " << iters;
644 
645  MemTracker mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE);
646  DiskIoMgr io_mgr(1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
647 
648  Status status = io_mgr.Init(&mem_tracker);
649  ASSERT_TRUE(status.ok());
650  MemTracker reader_mem_tracker;
652  status = io_mgr.RegisterContext(&reader, &reader_mem_tracker);
653  ASSERT_TRUE(status.ok());
654 
655  vector<DiskIoMgr::ScanRange*> ranges;
656  for (int i = 0; i < num_buffers; ++i) {
657  ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, 0,
658  stat_val.st_mtime));
659  }
660  status = io_mgr.AddScanRanges(reader, ranges);
661  ASSERT_TRUE(status.ok());
662 
663  // Don't return buffers to force memory pressure
664  vector<DiskIoMgr::BufferDescriptor*> buffers;
665 
666  AtomicInt<int> num_ranges_processed;
667  ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::MEM_LIMIT_EXCEEDED,
668  1, &num_ranges_processed);
669 
670  char result[strlen(data) + 1];
671  // Keep reading new ranges without returning buffers. This forces us
672  // to go over the limit eventually.
673  while (true) {
674  memset(result, 0, strlen(data) + 1);
675  DiskIoMgr::ScanRange* range = NULL;
676  status = io_mgr.GetNextRange(reader, &range);
677  ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
678  if (range == NULL) break;
679 
680  while (true) {
681  DiskIoMgr::BufferDescriptor* buffer = NULL;
682  Status status = range->GetNext(&buffer);
683  ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
684  if (buffer == NULL) break;
685  memcpy(result + range->offset() + buffer->scan_range_offset(),
686  buffer->buffer(), buffer->len());
687  buffers.push_back(buffer);
688  }
689  ValidateEmptyOrCorrect(data, result, strlen(data));
690  }
691 
692  for (int i = 0; i < buffers.size(); ++i) {
693  buffers[i]->Return();
694  }
695 
696  EXPECT_TRUE(io_mgr.context_status(reader).IsMemLimitExceeded());
697  io_mgr.UnregisterContext(reader);
698  EXPECT_EQ(reader_mem_tracker.consumption(), 0);
699  }
700 }
701 
702 // Test when some scan ranges are marked as being cached.
703 // Since these files are not in HDFS, the cached path always fails so this
704 // only tests the fallback mechanism.
705 // TODO: we can fake the cached read path without HDFS
706 TEST_F(DiskIoMgrTest, CachedReads) {
707  MemTracker mem_tracker(LARGE_MEM_LIMIT);
708  const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
709  const char* data = "abcdefghijklm";
710  int len = strlen(data);
711  CreateTempFile(tmp_file, data);
712 
713  // Get mtime for file
714  struct stat stat_val;
715  stat(tmp_file, &stat_val);
716 
717  const int num_disks = 2;
718  const int num_buffers = 3;
719 
720  int64_t iters = 0;
721  {
722  pool_.reset(new ObjectPool);
723  if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
724  DiskIoMgr io_mgr(num_disks, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
725 
726  Status status = io_mgr.Init(&mem_tracker);
727  ASSERT_TRUE(status.ok());
728  MemTracker reader_mem_tracker;
730  status = io_mgr.RegisterContext(&reader, &reader_mem_tracker);
731  ASSERT_TRUE(status.ok());
732 
733  DiskIoMgr::ScanRange* complete_range =
734  InitRange(1, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, NULL, true);
735 
736  // Issue some reads before the async ones are issued
737  ValidateSyncRead(&io_mgr, reader, complete_range, data);
738  ValidateSyncRead(&io_mgr, reader, complete_range, data);
739 
740  vector<DiskIoMgr::ScanRange*> ranges;
741  for (int i = 0; i < len; ++i) {
742  int disk_id = i % num_disks;
743  ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id,
744  stat_val.st_mtime, NULL, true));
745  }
746  status = io_mgr.AddScanRanges(reader, ranges);
747  ASSERT_TRUE(status.ok());
748 
749  AtomicInt<int> num_ranges_processed;
750  thread_group threads;
751  for (int i = 0; i < 5; ++i) {
752  threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
753  strlen(data), Status::OK, 0, &num_ranges_processed));
754  }
755 
756  // Issue some more sync ranges
757  for (int i = 0; i < 5; ++i) {
758  sched_yield();
759  ValidateSyncRead(&io_mgr, reader, complete_range, data);
760  }
761 
762  threads.join_all();
763 
764  ValidateSyncRead(&io_mgr, reader, complete_range, data);
765  ValidateSyncRead(&io_mgr, reader, complete_range, data);
766 
767  EXPECT_EQ(num_ranges_processed, ranges.size());
768  io_mgr.UnregisterContext(reader);
769  EXPECT_EQ(reader_mem_tracker.consumption(), 0);
770  }
771  EXPECT_EQ(mem_tracker.consumption(), 0);
772 }
773 
774 TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
775  MemTracker mem_tracker(LARGE_MEM_LIMIT);
776  const int ITERATIONS = 1;
777  const char* data = "abcdefghijklmnopqrstuvwxyz";
778  const int num_contexts = 5;
779  const int file_size = 4 * 1024;
780  const int num_writes_queued = 5;
781  const int num_reads_queued = 5;
782 
783  string file_name = "/tmp/disk_io_mgr_test.txt";
784  int success = CreateTempFile(file_name.c_str(), file_size);
785  if (success != 0) {
786  LOG(ERROR) << "Error creating temp file " << file_name.c_str() << " of size " <<
787  file_size;
788  ASSERT_TRUE(false);
789  }
790 
791  // Get mtime for file
792  struct stat stat_val;
793  stat(file_name.c_str(), &stat_val);
794 
795  int64_t iters = 0;
796  vector<DiskIoMgr::RequestContext*> contexts(num_contexts);
797  Status status;
798  for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
799  for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
800  for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
801  DiskIoMgr io_mgr(num_disks, threads_per_disk, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
802  io_mgr.Init(&mem_tracker);
803  for (int file_index = 0; file_index < num_contexts; ++file_index) {
804  status = io_mgr.RegisterContext(&contexts[file_index]);
805  ASSERT_TRUE(status.ok());
806  }
807  pool_.reset(new ObjectPool);
808  int read_offset = 0;
809  int write_offset = 0;
810  while (read_offset < file_size) {
811  for (int context_index = 0; context_index < num_contexts; ++context_index) {
812  if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
813  AtomicInt<int> num_ranges_processed;
814  thread_group threads;
815  vector<DiskIoMgr::ScanRange*> ranges;
816  int num_scan_ranges = min<int>(num_reads_queued, write_offset - read_offset);
817  for (int i = 0; i < num_scan_ranges; ++i) {
818  ranges.push_back(InitRange(1, file_name.c_str(), read_offset, 1,
819  i % num_disks, stat_val.st_mtime));
820  threads.add_thread(new thread(ScanRangeThread, &io_mgr,
821  contexts[context_index],
822  reinterpret_cast<const char*>(data + (read_offset % strlen(data))), 1,
823  Status::OK, num_scan_ranges, &num_ranges_processed));
824  ++read_offset;
825  }
826 
827  num_ranges_written_ = 0;
828  int num_write_ranges = min<int>(num_writes_queued, file_size - write_offset);
829  for (int i = 0; i < num_write_ranges; ++i) {
832  this, num_write_ranges, _1);
833  DiskIoMgr::WriteRange* new_range = pool_->Add(
834  new DiskIoMgr::WriteRange(file_name,
835  write_offset, i % num_disks, callback));
836  new_range->SetData(reinterpret_cast<const uint8_t*>
837  (data + (write_offset % strlen(data))), 1);
838  status = io_mgr.AddWriteRange(contexts[context_index], new_range);
839  ++write_offset;
840  }
841 
842  {
843  unique_lock<mutex> lock(written_mutex_);
844  while (num_ranges_written_ < num_write_ranges) writes_done_.wait(lock);
845  }
846 
847  threads.join_all();
848  } // for (int context_index
849  } // while (read_offset < file_size)
850 
851 
852  for (int file_index = 0; file_index < num_contexts; ++file_index) {
853  io_mgr.UnregisterContext(contexts[file_index]);
854  }
855  } // for (int num_disks
856  } // for (int threads_per_disk
857  } // for (int iteration
858 }
859 
860 // This test will test multiple concurrent reads each reading a different file.
861 TEST_F(DiskIoMgrTest, MultipleReader) {
862  MemTracker mem_tracker(LARGE_MEM_LIMIT);
863  const int NUM_READERS = 5;
864  const int DATA_LEN = 50;
865  const int ITERATIONS = 25;
866  const int NUM_THREADS_PER_READER = 3;
867 
868  vector<string> file_names;
869  vector<int64_t> mtimes;
870  vector<string> data;
871  vector<DiskIoMgr::RequestContext*> readers;
872  vector<char*> results;
873 
874  file_names.resize(NUM_READERS);
875  readers.resize(NUM_READERS);
876  mtimes.resize(NUM_READERS);
877  data.resize(NUM_READERS);
878  results.resize(NUM_READERS);
879 
880  // Initialize data for each reader. The data will be
881  // 'abcd...' for reader one, 'bcde...' for reader two (wrapping around at 'z')
882  for (int i = 0; i < NUM_READERS; ++i) {
883  char buf[DATA_LEN];
884  for (int j = 0; j < DATA_LEN; ++j) {
885  int c = (j + i) % 26;
886  buf[j] = 'a' + c;
887  }
888  data[i] = string(buf, DATA_LEN);
889 
890  stringstream ss;
891  ss << "/tmp/disk_io_mgr_test" << i << ".txt";
892  file_names[i] = ss.str();
893  CreateTempFile(ss.str().c_str(), data[i].c_str());
894 
895  // Get mtime for file
896  struct stat stat_val;
897  stat(file_names[i].c_str(), &stat_val);
898  mtimes[i] = stat_val.st_mtime;
899 
900  results[i] = new char[DATA_LEN + 1];
901  memset(results[i], 0, DATA_LEN + 1);
902  }
903 
904  // This exercises concurrency, run the test multiple times
905  int64_t iters = 0;
906  for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
907  for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
908  for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
909  for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
910  pool_.reset(new ObjectPool);
911  LOG(INFO) << "Starting test with num_threads_per_disk=" << threads_per_disk
912  << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
913  if (++iters % 2500 == 0) LOG(ERROR) << "Starting iteration " << iters;
914 
915  DiskIoMgr io_mgr(num_disks, threads_per_disk, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
916  Status status = io_mgr.Init(&mem_tracker);
917  ASSERT_TRUE(status.ok());
918 
919  for (int i = 0; i < NUM_READERS; ++i) {
920  status = io_mgr.RegisterContext(&readers[i], NULL);
921  ASSERT_TRUE(status.ok());
922 
923  vector<DiskIoMgr::ScanRange*> ranges;
924  for (int j = 0; j < DATA_LEN; ++j) {
925  int disk_id = j % num_disks;
926  ranges.push_back(
927  InitRange(num_buffers,file_names[i].c_str(), j, 1, disk_id,
928  mtimes[i]));
929  }
930  status = io_mgr.AddScanRanges(readers[i], ranges);
931  ASSERT_TRUE(status.ok());
932  }
933 
934  AtomicInt<int> num_ranges_processed;
935  thread_group threads;
936  for (int i = 0; i < NUM_READERS; ++i) {
937  for (int j = 0; j < NUM_THREADS_PER_READER; ++j) {
938  threads.add_thread(new thread(ScanRangeThread, &io_mgr, readers[i],
939  data[i].c_str(), data[i].size(), Status::OK, 0,
940  &num_ranges_processed));
941  }
942  }
943  threads.join_all();
944  EXPECT_EQ(num_ranges_processed, DATA_LEN * NUM_READERS);
945  for (int i = 0; i < NUM_READERS; ++i) {
946  io_mgr.UnregisterContext(readers[i]);
947  }
948  }
949  }
950  }
951  }
952  EXPECT_EQ(mem_tracker.consumption(), 0);
953 }
954 
955 // Stress test for multiple clients with cancellation
956 // TODO: the stress app should be expanded to include sync reads and adding scan
957 // ranges in the middle.
958 TEST_F(DiskIoMgrTest, StressTest) {
959  // Run the test with 5 disks, 5 threads per disk, 10 clients and with cancellation
960  DiskIoMgrStress test(5, 5, 10, true);
961  test.Run(2); // In seconds
962 }
963 
965  // Test default min/max buffer size
966  int min_buffer_size = 1024;
967  int max_buffer_size = 8 * 1024 * 1024; // 8 MB
968  MemTracker mem_tracker(max_buffer_size * 2);
969 
970  DiskIoMgr io_mgr(1, 1, min_buffer_size, max_buffer_size);
971  Status status = io_mgr.Init(&mem_tracker);
972  ASSERT_TRUE(status.ok());
973  ASSERT_EQ(mem_tracker.consumption(), 0);
974 
975  // buffer length should be rounded up to min buffer size
976  int64_t buffer_len = 1;
977  char* buf = io_mgr.GetFreeBuffer(&buffer_len);
978  EXPECT_EQ(buffer_len, min_buffer_size);
979  EXPECT_EQ(io_mgr.num_allocated_buffers_, 1);
980  io_mgr.ReturnFreeBuffer(buf, buffer_len);
981  EXPECT_EQ(mem_tracker.consumption(), min_buffer_size);
982 
983  // reuse buffer
984  buffer_len = min_buffer_size;
985  buf = io_mgr.GetFreeBuffer(&buffer_len);
986  EXPECT_EQ(buffer_len, min_buffer_size);
987  EXPECT_EQ(io_mgr.num_allocated_buffers_, 1);
988  io_mgr.ReturnFreeBuffer(buf, buffer_len);
989  EXPECT_EQ(mem_tracker.consumption(), min_buffer_size);
990 
991  // bump up to next buffer size
992  buffer_len = min_buffer_size + 1;
993  buf = io_mgr.GetFreeBuffer(&buffer_len);
994  EXPECT_EQ(buffer_len, min_buffer_size * 2);
995  EXPECT_EQ(io_mgr.num_allocated_buffers_, 2);
996  EXPECT_EQ(mem_tracker.consumption(), min_buffer_size * 3);
997 
998  // gc unused buffer
999  io_mgr.GcIoBuffers();
1000  EXPECT_EQ(io_mgr.num_allocated_buffers_, 1);
1001  EXPECT_EQ(mem_tracker.consumption(), min_buffer_size * 2);
1002 
1003  io_mgr.ReturnFreeBuffer(buf, buffer_len);
1004 
1005  // max buffer size
1006  buffer_len = max_buffer_size;
1007  buf = io_mgr.GetFreeBuffer(&buffer_len);
1008  EXPECT_EQ(buffer_len, max_buffer_size);
1009  EXPECT_EQ(io_mgr.num_allocated_buffers_, 2);
1010  io_mgr.ReturnFreeBuffer(buf, buffer_len);
1011  EXPECT_EQ(mem_tracker.consumption(), min_buffer_size * 2 + max_buffer_size);
1012 
1013  // gc buffers
1014  io_mgr.GcIoBuffers();
1015  EXPECT_EQ(io_mgr.num_allocated_buffers_, 0);
1016  EXPECT_EQ(mem_tracker.consumption(), 0);
1017 }
1018 
1019 }
1020 
1021 int main(int argc, char **argv) {
1022  google::InitGoogleLogging(argv[0]);
1023  ::testing::InitGoogleTest(&argc, argv);
1027  return RUN_ALL_TESTS();
1028 }
static void ValidateEmptyOrCorrect(const char *expected, const char *buffer, int len)
condition_variable writes_done_
void CancelContext(RequestContext *context, bool wait_for_disks_completion=false)
Definition: disk-io-mgr.cc:377
int64_t consumption() const
Returns the memory consumed in bytes.
Definition: mem-tracker.h:298
TEST_F(InstructionCounterTest, Count)
const int MIN_BUFFER_SIZE
boost::function< void(const Status &)> WriteDoneCallback
Definition: disk-io-mgr.h:464
int main(int argc, char **argv)
AtomicInt< int > num_allocated_buffers_
Total number of allocated buffers, used for debugging.
Definition: disk-io-mgr.h:693
static void ScanRangeThread(DiskIoMgr *io_mgr, DiskIoMgr::RequestContext *reader, const char *expected_result, int expected_len, const Status &expected_status, int max_ranges, AtomicInt< int > *num_ranges_processed)
DiskIoMgr::ScanRange * InitRange(int num_buffers, const char *file_path, int offset, int len, int disk_id, int64_t mtime, void *meta_data=NULL, bool is_cached=false)
const int MAX_BUFFER_SIZE
Status AddWriteRange(RequestContext *writer, WriteRange *write_range)
int CreateTempFile(const char *filename, int file_size)
static void Init()
Initialize DiskInfo. Must be called before any other functions.
Definition: disk-info.cc:114
void UnregisterContext(RequestContext *context)
Definition: disk-io-mgr.cc:344
Status AddScanRanges(RequestContext *reader, const std::vector< ScanRange * > &ranges, bool schedule_immediately=false)
Definition: disk-io-mgr.cc:455
const int ITERATIONS
Status GetNextRange(RequestContext *reader, ScanRange **range)
Definition: disk-io-mgr.cc:501
static void CreateTempFile(const char *filename, const char *data)
int64_t scan_range_offset() const
Returns the offset within the scan range that this buffer starts at.
Definition: disk-io-mgr.h:205
const int LARGE_MEM_LIMIT
Status Read(RequestContext *reader, ScanRange *range, BufferDescriptor **buffer)
Definition: disk-io-mgr.cc:555
void SetData(const uint8_t *buffer, int64_t len)
Definition: disk-io-mgr.cc:205
void WriteCompleteCallback(int num_writes, const Status &status)
TErrorCode::type code() const
Definition: status.h:226
bool IsCancelled() const
Definition: status.h:174
void InitThreading()
Initialises the threading subsystem. Must be called before a Thread is created.
Definition: thread.cc:261
void ReturnFreeBuffer(char *buffer, int64_t buffer_size)
Definition: disk-io-mgr.cc:696
void CreateTempFile(const char *filename, const char *data)
void Reset(hdfsFS fs, const char *file, int64_t len, int64_t offset, int disk_id, bool try_cache, bool expected_local, int64_t mtime, void *metadata=NULL)
This class is thread-safe.
Definition: mem-tracker.h:61
scoped_ptr< ObjectPool > pool_
static const Status CANCELLED
Definition: status.h:88
static const int64_t NEVER_CACHE
If the mtime is set to NEVER_CACHE, the file handle should never be cached.
Definition: disk-io-mgr.h:299
static void ValidateSyncRead(DiskIoMgr *io_mgr, DiskIoMgr::RequestContext *reader, DiskIoMgr::ScanRange *range, const char *expected, int expected_len=-1)
Status Init(MemTracker *process_mem_tracker)
Initialize the IoMgr. Must be called once before any of the other APIs.
Definition: disk-io-mgr.cc:296
void WriteValidateCallback(int num_writes, DiskIoMgr::WriteRange **written_range, DiskIoMgr *io_mgr, DiskIoMgr::RequestContext *reader, int32_t *data, Status expected_status, const Status &status)
uint64_t Test(T *ht, const ProbeTuple *input, uint64_t num_tuples)
static const Status MEM_LIMIT_EXCEEDED
Definition: status.h:89
Status RegisterContext(RequestContext **request_context, MemTracker *reader_mem_tracker=NULL)
Definition: disk-io-mgr.cc:336
char * GetFreeBuffer(int64_t *buffer_size)
Definition: disk-io-mgr.cc:627
Status GetNext(BufferDescriptor **buffer)
static const Status OK
Definition: status.h:87
uint8_t offset[7 *64-sizeof(uint64_t)]
static void Init()
Initialize CpuInfo.
Definition: cpu-info.cc:75
bool ok() const
Definition: status.h:172
void Run(int sec)
Run the test for 'sec'. If 0, run forever.
static void ValidateScanRange(DiskIoMgr::ScanRange *range, const char *expected, int expected_len, const Status &expected_status)
bool IsMemLimitExceeded() const
Definition: status.h:178
Status context_status(RequestContext *context) const
Definition: disk-io-mgr.cc:411