Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
buffered-block-mgr-test.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <boost/scoped_ptr.hpp>
16 #include <boost/bind.hpp>
17 #include <boost/thread/thread.hpp>
18 #include <boost/filesystem.hpp>
19 #include <boost/date_time/posix_time/posix_time.hpp>
20 
21 #include <gtest/gtest.h>
22 
23 #include "common/init.h"
24 #include "codegen/llvm-codegen.h"
25 #include "runtime/disk-io-mgr.h"
27 #include "runtime/exec-env.h"
28 #include "runtime/mem-tracker.h"
29 #include "runtime/runtime-state.h"
30 #include "runtime/tmp-file-mgr.h"
31 #include "service/fe-support.h"
32 #include "util/disk-info.h"
33 #include "util/cpu-info.h"
34 #include "util/promise.h"
35 #include "util/test-info.h"
36 #include "util/time.h"
37 
38 #include "gen-cpp/Types_types.h"
39 #include "gen-cpp/ImpalaInternalService_types.h"
40 
41 #include "common/names.h"
42 
43 using boost::filesystem::directory_iterator;
44 
45 // Note: This is the default scratch dir created by impala.
46 // FLAGS_scratch_dirs + TmpFileMgr::TMP_SUB_DIR_NAME.
47 const string SCRATCH_DIR = "/tmp/impala-scratch";
48 
49 DECLARE_bool(disk_spill_encryption);
50 
51 namespace impala {
52 
54  protected:
55  const static int block_size_ = 1024;
56 
57  virtual void SetUp() {
58  exec_env_.reset(new ExecEnv);
59  exec_env_->InitForFeTests();
60  io_mgr_tracker_.reset(new MemTracker(-1));
62  exec_env_->disk_io_mgr()->Init(io_mgr_tracker_.get());
63  runtime_state_.reset(
64  new RuntimeState(TPlanFragmentInstanceCtx(), "", exec_env_.get()));
65  }
66 
67  virtual void TearDown() {
69  runtime_state_.reset();
70  exec_env_.reset();
71  io_mgr_tracker_.reset();
72  }
73 
74  static void GetFreeBlock(BufferedBlockMgr* block_mgr, BufferedBlockMgr::Client* client,
75  BufferedBlockMgr::Block** new_block, Promise<bool>* promise) {
76  block_mgr->GetNewBlock(client, NULL, new_block);
77  promise->Set(true);
78  }
79 
80  static void ValidateBlock(BufferedBlockMgr::Block* block, int32_t data) {
81  EXPECT_TRUE(block->valid_data_len() == sizeof(int32_t));
82  EXPECT_TRUE(*reinterpret_cast<int32_t*>(block->buffer()) == data);
83  }
84 
85  static int32_t* MakeRandomSizeData(BufferedBlockMgr::Block* block) {
86  // Format is int32_t size, followed by size bytes of data
87  int32_t size = (rand() % 252) + 4; // So blocks have 4-256 bytes of data
88  uint8_t* data = block->Allocate<uint8_t>(size);
89  *(reinterpret_cast<int32_t*>(data)) = size;
90  int i;
91  for (i = 4; i < size-5; ++i) {
92  data[i] = i;
93  }
94  for (; i < size; ++i) { // End marker of at least 5 0xff's
95  data[i] = 0xff;
96  }
97  return reinterpret_cast<int32_t*>(data); // Really returns a pointer to size
98  }
99 
100  static void ValidateRandomSizeData(BufferedBlockMgr::Block* block, int32_t size) {
101  int32_t bsize = *(reinterpret_cast<int32_t*>(block->buffer()));
102  uint8_t* data = reinterpret_cast<uint8_t*>(block->buffer());
103  int i;
104  EXPECT_EQ(block->valid_data_len(), size);
105  EXPECT_EQ(size, bsize);
106  for (i = 4; i < size-5; ++i) {
107  EXPECT_EQ(data[i], i);
108  }
109  for (; i < size; ++i) {
110  EXPECT_EQ(data[i], 0xff);
111  }
112  }
113 
114  shared_ptr<BufferedBlockMgr> CreateMgr(int max_buffers) {
115  shared_ptr<BufferedBlockMgr> mgr;
117  block_mgr_parent_tracker_.get(), runtime_state_->runtime_profile(),
118  max_buffers * block_size_, block_size_, &mgr);
119  EXPECT_TRUE(mgr != NULL);
120  EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
121  return mgr;
122  }
123 
125  int num_blocks, vector<BufferedBlockMgr::Block*>* blocks) {
126  int32_t* data;
127  Status status;
128  BufferedBlockMgr::Block* new_block;
129  for (int i = 0; i < num_blocks; ++i) {
130  status = block_mgr->GetNewBlock(client, NULL, &new_block);
131  EXPECT_TRUE(status.ok());
132  EXPECT_TRUE(new_block != NULL);
133  data = new_block->Allocate<int32_t>(sizeof(int32_t));
134  *data = blocks->size();
135  blocks->push_back(new_block);
136  }
137  }
138 
139  // Test that randomly issues GetFreeBlock(), Pin(), Unpin(), Delete() and Close()
140  // calls. All calls made are legal - error conditions are not expected until the
141  // first call to Close(). This is called 2 times with encryption+integrity on/off.
142  // When executed in single-threaded mode 'tid' should be SINGLE_THREADED_TID.
143  static const int SINGLE_THREADED_TID = -1;
144  void TestRandomInternalImpl(BufferedBlockMgr* block_mgr, int num_buffers, int tid) {
145  DCHECK_NOTNULL(block_mgr);
146  const int num_iterations = 100000;
147  const int iters_before_close = num_iterations - 5000;
148  bool close_called = false;
149  unordered_map<BufferedBlockMgr::Block*, int> pinned_block_map;
150  vector<pair<BufferedBlockMgr::Block*, int32_t> > pinned_blocks;
151  unordered_map<BufferedBlockMgr::Block*, int> unpinned_block_map;
152  vector<pair<BufferedBlockMgr::Block*, int32_t> > unpinned_blocks;
153 
154  typedef enum { Pin, New, Unpin, Delete, Close } ApiFunction;
155  ApiFunction api_function;
156 
157  BufferedBlockMgr::Client* client;
158  Status status = block_mgr->RegisterClient(0, NULL, runtime_state_.get(), &client);
159  EXPECT_TRUE(status.ok());
160  EXPECT_TRUE(client != NULL);
161 
162  pinned_blocks.reserve(num_buffers);
163  BufferedBlockMgr::Block* new_block;
164  for (int i = 0; i < num_iterations; ++i) {
165  if ((i % 20000) == 0) LOG (ERROR) << " Iteration " << i << endl;
166  if (i > iters_before_close && (rand() % 5 == 0)) {
167  api_function = Close;
168  } else if (pinned_blocks.size() == 0 && unpinned_blocks.size() == 0) {
169  api_function = New;
170  } else if (pinned_blocks.size() == 0) {
171  // Pin or New. Can't unpin or delete.
172  api_function = static_cast<ApiFunction>(rand() % 2);
173  } else if (pinned_blocks.size() >= num_buffers) {
174  // Unpin or delete. Can't pin or get new.
175  api_function = static_cast<ApiFunction>(2 + (rand() % 2));
176  } else if (unpinned_blocks.size() == 0) {
177  // Can't pin. Unpin, new or delete.
178  api_function = static_cast<ApiFunction>(1 + (rand() % 3));
179  } else {
180  // Any api function.
181  api_function = static_cast<ApiFunction>(rand() % 4);
182  }
183 
184  pair<BufferedBlockMgr::Block*, int32_t> block_data;
185  int rand_pick = 0;
186  int32_t* data = NULL;
187  bool pinned = false;
188  switch (api_function) {
189  case New:
190  status = block_mgr->GetNewBlock(client, NULL, &new_block);
191  if (close_called || (tid != SINGLE_THREADED_TID && status.IsCancelled())) {
192  EXPECT_TRUE(new_block == NULL);
193  EXPECT_TRUE(status.IsCancelled());
194  continue;
195  }
196  EXPECT_TRUE(status.ok());
197  EXPECT_TRUE(new_block != NULL);
198  data = MakeRandomSizeData(new_block);
199  block_data = make_pair(new_block, *data);
200 
201  pinned_blocks.push_back(block_data);
202  pinned_block_map.insert(make_pair(block_data.first, pinned_blocks.size() - 1));
203  break;
204  case Pin:
205  rand_pick = rand() % unpinned_blocks.size();
206  block_data = unpinned_blocks[rand_pick];
207  status = block_data.first->Pin(&pinned);
208  if (close_called || (tid != SINGLE_THREADED_TID && status.IsCancelled())) {
209  EXPECT_TRUE(status.IsCancelled());
210  // In single-threaded runs the block should not have been pinned.
211  // In multi-threaded runs Pin() may return the block pinned but the status to
212  // be cancelled. In this case we could move the block from unpinned_blocks
213  // to pinned_blocks. We do not do that because after IsCancelled() no actual
214  // block operations should take place.
215  if (tid == SINGLE_THREADED_TID) EXPECT_FALSE(pinned);
216  continue;
217  }
218  EXPECT_TRUE(status.ok());
219  EXPECT_TRUE(pinned);
220  ValidateRandomSizeData(block_data.first, block_data.second);
221  unpinned_blocks[rand_pick] = unpinned_blocks.back();
222  unpinned_blocks.pop_back();
223  unpinned_block_map[unpinned_blocks[rand_pick].first] = rand_pick;
224 
225  pinned_blocks.push_back(block_data);
226  pinned_block_map.insert(make_pair(block_data.first, pinned_blocks.size() - 1));
227  break;
228  case Unpin:
229  rand_pick = rand() % pinned_blocks.size();
230  block_data = pinned_blocks[rand_pick];
231  status = block_data.first->Unpin();
232  if (close_called || (tid != SINGLE_THREADED_TID && status.IsCancelled())) {
233  EXPECT_TRUE(status.IsCancelled());
234  continue;
235  }
236  EXPECT_TRUE(status.ok());
237  pinned_blocks[rand_pick] = pinned_blocks.back();
238  pinned_blocks.pop_back();
239  pinned_block_map[pinned_blocks[rand_pick].first] = rand_pick;
240 
241  unpinned_blocks.push_back(block_data);
242  unpinned_block_map.insert(make_pair(block_data.first,
243  unpinned_blocks.size() - 1));
244  break;
245  case Delete:
246  rand_pick = rand() % pinned_blocks.size();
247  block_data = pinned_blocks[rand_pick];
248  status = block_data.first->Delete();
249  if (close_called || (tid != SINGLE_THREADED_TID && status.IsCancelled())) {
250  EXPECT_TRUE(status.IsCancelled());
251  } else {
252  EXPECT_TRUE(status.ok());
253  }
254  pinned_blocks[rand_pick] = pinned_blocks.back();
255  pinned_blocks.pop_back();
256  pinned_block_map[pinned_blocks[rand_pick].first] = rand_pick;
257  break;
258  case Close:
259  block_mgr->Cancel();
260  close_called = true;
261  break;
262  } // end switch (apiFunction)
263  } // end for ()
264  }
265 
266  // Single-threaded execution of the TestRandomInternalImpl.
268  const int num_buffers = 10;
269  shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(num_buffers);
270  TestRandomInternalImpl(block_mgr.get(), num_buffers, SINGLE_THREADED_TID);
271  block_mgr.reset();
272  EXPECT_EQ(block_mgr_parent_tracker_->consumption(), 0);
273  }
274 
275  // Multi-threaded execution of the TestRandomInternalImpl.
276  void TestRandomInternalMulti(int num_threads) {
277  DCHECK_GT(num_threads, 0);
278  const int num_buffers = 10;
279  shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(num_buffers * num_threads);
280  thread_group workers;
281  for (int i = 0; i < num_threads; ++i) {
282  thread* t = new thread(bind(&BufferedBlockMgrTest::TestRandomInternalImpl, this,
283  block_mgr.get(), num_buffers, i));
284  workers.add_thread(t);
285  }
286  workers.join_all();
287  block_mgr.reset();
288  EXPECT_EQ(block_mgr_parent_tracker_->consumption(), 0);
289  }
290 
291  scoped_ptr<ExecEnv> exec_env_;
292  scoped_ptr<RuntimeState> runtime_state_;
293  scoped_ptr<MemTracker> block_mgr_parent_tracker_;
294  scoped_ptr<MemTracker> io_mgr_tracker_;
295 };
296 
298  int max_num_blocks = 5;
299  shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_blocks);
300  BufferedBlockMgr::Client* client;
301  Status status = block_mgr->RegisterClient(0, NULL, runtime_state_.get(), &client);
302  EXPECT_TRUE(status.ok());
303  EXPECT_EQ(block_mgr_parent_tracker_->consumption(), 0);
304 
305  // Allocate blocks until max_num_blocks, they should all succeed and memory
306  // usage should go up.
307  BufferedBlockMgr::Block* new_block;
308  BufferedBlockMgr::Block* first_block = NULL;
309  for (int i = 0; i < max_num_blocks; ++i) {
310  status = block_mgr->GetNewBlock(client, NULL, &new_block);
311  EXPECT_TRUE(new_block != NULL);
312  EXPECT_EQ(block_mgr->bytes_allocated(), (i + 1) * block_size_);
313  if (first_block == NULL) first_block = new_block;
314  }
315 
316  // Trying to allocate a new one should fail.
317  status = block_mgr->GetNewBlock(client, NULL, &new_block);
318  EXPECT_TRUE(new_block == NULL);
319  EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size_);
320 
321  // We can allocate a new block by transferring an already allocated one.
322  uint8_t* old_buffer = first_block->buffer();
323  status = block_mgr->GetNewBlock(client, first_block, &new_block);
324  EXPECT_TRUE(new_block != NULL);
325  EXPECT_TRUE(old_buffer == new_block->buffer());
326  EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size_);
327  EXPECT_TRUE(!first_block->is_pinned());
328 
329  // Trying to allocate a new one should still fail.
330  status = block_mgr->GetNewBlock(client, NULL, &new_block);
331  EXPECT_TRUE(new_block == NULL);
332  EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size_);
333 
334  EXPECT_EQ(block_mgr->writes_issued(), 1);;
335  block_mgr.reset();
336  EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
337 }
338 
339 TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) {
340  int max_num_blocks = 3;
341  shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_blocks);
342  BufferedBlockMgr::Client* client;
344  Status status = block_mgr->RegisterClient(0, &tracker, runtime_state_.get(), &client);
345  EXPECT_TRUE(status.ok());
346  EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
347 
348  vector<BufferedBlockMgr::Block*> blocks;
349 
350  // Allocate a small block.
351  BufferedBlockMgr::Block* new_block = NULL;
352  status = block_mgr->GetNewBlock(client, NULL, &new_block, 128);
353  EXPECT_TRUE(new_block != NULL);
354  EXPECT_TRUE(status.ok());
355  EXPECT_EQ(block_mgr->bytes_allocated(), 0);
356  EXPECT_EQ(block_mgr_parent_tracker_->consumption(), 0);
357  EXPECT_EQ(tracker.consumption(), 128);
358  EXPECT_TRUE(new_block->is_pinned());
359  EXPECT_EQ(new_block->BytesRemaining(), 128);
360  EXPECT_TRUE(new_block->buffer() != NULL);
361  blocks.push_back(new_block);
362 
363  // Allocate a normal block
364  status = block_mgr->GetNewBlock(client, NULL, &new_block);
365  EXPECT_TRUE(new_block != NULL);
366  EXPECT_TRUE(status.ok());
367  EXPECT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size());
368  EXPECT_EQ(block_mgr_parent_tracker_->consumption(), block_mgr->max_block_size());
369  EXPECT_EQ(tracker.consumption(), 128 + block_mgr->max_block_size());
370  EXPECT_TRUE(new_block->is_pinned());
371  EXPECT_EQ(new_block->BytesRemaining(), block_mgr->max_block_size());
372  EXPECT_TRUE(new_block->buffer() != NULL);
373  blocks.push_back(new_block);
374 
375  // Allocate another small block.
376  status = block_mgr->GetNewBlock(client, NULL, &new_block, 512);
377  EXPECT_TRUE(new_block != NULL);
378  EXPECT_TRUE(status.ok());
379  EXPECT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size());
380  EXPECT_EQ(block_mgr_parent_tracker_->consumption(), block_mgr->max_block_size());
381  EXPECT_EQ(tracker.consumption(), 128 + 512 + block_mgr->max_block_size());
382  EXPECT_TRUE(new_block->is_pinned());
383  EXPECT_EQ(new_block->BytesRemaining(), 512);
384  EXPECT_TRUE(new_block->buffer() != NULL);
385  blocks.push_back(new_block);
386 
387  // Should be able to unpin and pin the middle block
388  status = blocks[1]->Unpin();
389  EXPECT_TRUE(status.ok());
390 
391  bool pinned;
392  status = blocks[1]->Pin(&pinned);
393  EXPECT_TRUE(status.ok());
394  EXPECT_TRUE(pinned);
395 
396  for (int i = 0; i < blocks.size(); ++i) {
397  blocks[i]->Delete();
398  }
399  block_mgr.reset();
400  EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
401 }
402 
403 // Test that pinning more blocks than the max available buffers.
405  int max_num_blocks = 5;
406  shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_blocks);
407  BufferedBlockMgr::Client* client;
408  Status status = block_mgr->RegisterClient(0, NULL, runtime_state_.get(), &client);
409  EXPECT_TRUE(status.ok());
410  EXPECT_TRUE(client != NULL);
411 
412  vector<BufferedBlockMgr::Block*> blocks;
413  AllocateBlocks(block_mgr.get(), client, max_num_blocks, &blocks);
414 
415  // Unpin them all.
416  for (int i = 0; i < blocks.size(); ++i) {
417  status = blocks[i]->Unpin();
418  EXPECT_TRUE(status.ok());
419  }
420 
421  // Allocate more, this should work since we just unpinned some blocks.
422  AllocateBlocks(block_mgr.get(), client, max_num_blocks, &blocks);
423 
424  // Try to pin a unpinned block, this should not be possible.
425  bool pinned;
426  status = blocks[0]->Pin(&pinned);
427  EXPECT_TRUE(status.ok());
428  EXPECT_FALSE(pinned);
429 
430  // Unpin all blocks.
431  for (int i = 0; i < blocks.size(); ++i) {
432  status = blocks[i]->Unpin();
433  EXPECT_TRUE(status.ok());
434  }
435 
436  // Should be able to pin max_num_blocks blocks.
437  for (int i = 0; i < max_num_blocks; ++i) {
438  status = blocks[i]->Pin(&pinned);
439  EXPECT_TRUE(status.ok());
440  EXPECT_TRUE(pinned);
441  }
442 
443  // Can't pin any more though.
444  status = blocks[max_num_blocks]->Pin(&pinned);
445  EXPECT_TRUE(status.ok());
446  EXPECT_FALSE(pinned);
447 
448  block_mgr.reset();
449  EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
450 }
451 
452 // Test the eviction policy of the block mgr. No writes issued until more than
453 // the max available buffers are allocated. Writes must be issued in LIFO order.
455  int max_num_buffers = 5;
456  shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_buffers);
457  BufferedBlockMgr::Client* client;
458  Status status = block_mgr->RegisterClient(0, NULL, runtime_state_.get(), &client);
459  EXPECT_TRUE(status.ok());
460  EXPECT_TRUE(client != NULL);
461 
462  // Check counters.
463  RuntimeProfile* profile = block_mgr->profile();
464  RuntimeProfile::Counter* buffered_pin = profile->GetCounter("BufferedPins");
465 
466  vector<BufferedBlockMgr::Block*> blocks;
467  AllocateBlocks(block_mgr.get(), client, max_num_buffers, &blocks);
468 
469  EXPECT_EQ(block_mgr->bytes_allocated(), max_num_buffers * block_size_);
470  BOOST_FOREACH(BufferedBlockMgr::Block* block, blocks) {
471  block->Unpin();
472  }
473 
474  // Re-pinning all blocks
475  for (int i = 0; i < blocks.size(); ++i) {
476  bool pinned;
477  status = blocks[i]->Pin(&pinned);
478  EXPECT_TRUE(status.ok());
479  EXPECT_TRUE(pinned);
480  ValidateBlock(blocks[i], i);
481  }
482  int buffered_pins_expected = blocks.size();
483  EXPECT_EQ(buffered_pin->value(), buffered_pins_expected);
484 
485  // Unpin all blocks
486  BOOST_FOREACH(BufferedBlockMgr::Block* block, blocks) {
487  block->Unpin();
488  }
489  // Get two new blocks.
490  AllocateBlocks(block_mgr.get(), client, 2, &blocks);
491  // At least two writes must be issued. The first (num_blocks - 2) must be in memory.
492  EXPECT_GE(block_mgr->writes_issued(), 2);
493  for (int i = 0; i < (max_num_buffers - 2); ++i) {
494  bool pinned;
495  status = blocks[i]->Pin(&pinned);
496  EXPECT_TRUE(status.ok());
497  EXPECT_TRUE(pinned);
498  ValidateBlock(blocks[i], i);
499  }
500  EXPECT_GE(buffered_pin->value(), buffered_pins_expected);
501 
502  block_mgr.reset();
503  EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
504 }
505 
506 // Test deletion and reuse of blocks.
508  int max_num_buffers = 5;
509  shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_buffers);
510  BufferedBlockMgr::Client* client;
511  Status status = block_mgr->RegisterClient(0, NULL, runtime_state_.get(), &client);
512  EXPECT_TRUE(status.ok());
513  EXPECT_TRUE(client != NULL);
514 
515  // Check counters.
516  RuntimeProfile* profile = block_mgr->profile();
517  RuntimeProfile::Counter* recycled_cnt = profile->GetCounter("BlocksRecycled");
518  RuntimeProfile::Counter* created_cnt = profile->GetCounter("BlocksCreated");
519 
520  vector<BufferedBlockMgr::Block*> blocks;
521  AllocateBlocks(block_mgr.get(), client, max_num_buffers, &blocks);
522  EXPECT_TRUE(created_cnt->value() == max_num_buffers);
523 
524  BOOST_FOREACH(BufferedBlockMgr::Block* block, blocks) {
525  block->Delete();
526  }
527  AllocateBlocks(block_mgr.get(), client, max_num_buffers, &blocks);
528  EXPECT_TRUE(created_cnt->value() == max_num_buffers);
529  EXPECT_TRUE(recycled_cnt->value() == max_num_buffers);
530 
531  block_mgr.reset();
532  EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
533 }
534 
535 // Test that all APIs return cancelled after close.
537  int max_num_buffers = 5;
538  shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_buffers);
539  BufferedBlockMgr::Client* client;
540  Status status = block_mgr->RegisterClient(0, NULL, runtime_state_.get(), &client);
541  EXPECT_TRUE(status.ok());
542  EXPECT_TRUE(client != NULL);
543 
544  vector<BufferedBlockMgr::Block*> blocks;
545  AllocateBlocks(block_mgr.get(), client, max_num_buffers, &blocks);
546 
547  block_mgr->Cancel();
548 
549  BufferedBlockMgr::Block* new_block;
550  status = block_mgr->GetNewBlock(client, NULL, &new_block);
551  EXPECT_TRUE(new_block == NULL);
552  EXPECT_TRUE(status.IsCancelled());
553  status = blocks[0]->Unpin();
554  EXPECT_TRUE(status.IsCancelled());
555  bool pinned;
556  status = blocks[0]->Pin(&pinned);
557  EXPECT_TRUE(status.IsCancelled());
558  status = blocks[1]->Delete();
559  EXPECT_TRUE(status.IsCancelled());
560 
561  block_mgr.reset();
562  EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
563 }
564 
565 // Test that the block manager behaves correctly after a write error
566 // Delete the scratch directory before an operation that would cause a write
567 // and test that subsequent API calls return 'CANCELLED' correctly.
569  int max_num_buffers = 2;
570  const int write_wait_millis = 500;
571  shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_buffers);
572  BufferedBlockMgr::Client* client;
573  Status status = block_mgr->RegisterClient(0, NULL, runtime_state_.get(), &client);
574  EXPECT_TRUE(status.ok());
575  EXPECT_TRUE(client != NULL);
576 
577  RuntimeProfile* profile = block_mgr->profile();
578  RuntimeProfile::Counter* writes_outstanding =
579  profile->GetCounter("BlockWritesOutstanding");
580  vector<BufferedBlockMgr::Block*> blocks;
581  AllocateBlocks(block_mgr.get(), client, max_num_buffers, &blocks);
582  // Unpin a block, forcing a write.
583  status = blocks[0]->Unpin();
584  EXPECT_TRUE(status.ok());
585  // Wait for the write to go through.
586  SleepForMs(write_wait_millis);
587  EXPECT_TRUE(writes_outstanding->value() == 0);
588 
589  // Empty the scratch directory.
590  int num_files = 0;
591  directory_iterator dir_it(SCRATCH_DIR);
592  for (; dir_it != directory_iterator(); ++dir_it) {
593  ++num_files;
594  remove_all(dir_it->path());
595  }
596  EXPECT_TRUE(num_files > 0);
597  status = blocks[1]->Unpin();
598  EXPECT_TRUE(status.ok());
599  // Allocate one more block, forcing a write and causing an error.
600  AllocateBlocks(block_mgr.get(), client, 1, &blocks);
601  // Wait for the write to go through.
602  SleepForMs(write_wait_millis);
603  EXPECT_TRUE(writes_outstanding->value() == 0);
604 
605  // Subsequent calls should fail.
606  status = blocks[2]->Delete();
607  EXPECT_TRUE(status.IsCancelled());
608  BufferedBlockMgr::Block* new_block;
609  status = block_mgr->GetNewBlock(client, NULL, &new_block);
610  EXPECT_TRUE(new_block == NULL);
611  EXPECT_TRUE(status.IsCancelled());
612  block_mgr.reset();
613  EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
614 }
615 
616 // Create two clients with different number of reserved buffers.
617 TEST_F(BufferedBlockMgrTest, MultipleClients) {
618  int client1_buffers = 3;
619  int client2_buffers = 5;
620  int max_num_buffers = client1_buffers + client2_buffers;
621 
622  shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_buffers);
623  BufferedBlockMgr::Client* client1;
624  BufferedBlockMgr::Client* client2;
625  Status status;
626  bool reserved = false;
627 
628  status = block_mgr->RegisterClient(client1_buffers, NULL, runtime_state_.get(),
629  &client1);
630  EXPECT_TRUE(status.ok());
631  EXPECT_TRUE(client1 != NULL);
632  status = block_mgr->RegisterClient(client2_buffers, NULL, runtime_state_.get(),
633  &client2);
634  EXPECT_TRUE(status.ok());
635  EXPECT_TRUE(client2 != NULL);
636 
637  // Reserve client 1's and 2's buffers. They should succeed.
638  reserved = block_mgr->TryAcquireTmpReservation(client1, 1);
639  EXPECT_TRUE(reserved);
640  reserved = block_mgr->TryAcquireTmpReservation(client2, 1);
641  EXPECT_TRUE(reserved);
642 
643  vector<BufferedBlockMgr::Block*> client1_blocks;
644  // Allocate all of client1's reserved blocks, they should all succeed.
645  AllocateBlocks(block_mgr.get(), client1, client1_buffers, &client1_blocks);
646 
647  // Try allocating one more, that should fail.
649  status = block_mgr->GetNewBlock(client1, NULL, &block);
650  EXPECT_TRUE(status.ok());
651  EXPECT_TRUE(block == NULL);
652 
653  // Trying to reserve should also fail.
654  reserved = block_mgr->TryAcquireTmpReservation(client1, 1);
655  EXPECT_FALSE(reserved);
656 
657  // Allocate all of client2's reserved blocks, these should succeed.
658  vector<BufferedBlockMgr::Block*> client2_blocks;
659  AllocateBlocks(block_mgr.get(), client2, client2_buffers, &client2_blocks);
660 
661  // Try allocating one more from client 2, that should fail.
662  status = block_mgr->GetNewBlock(client2, NULL, &block);
663  EXPECT_TRUE(status.ok());
664  EXPECT_TRUE(block == NULL);
665 
666  // Unpin one block from client 1.
667  status = client1_blocks[0]->Unpin();
668  EXPECT_TRUE(status.ok());
669 
670  // Client 2 should still not be able to allocate.
671  status = block_mgr->GetNewBlock(client2, NULL, &block);
672  EXPECT_TRUE(status.ok());
673  EXPECT_TRUE(block == NULL);
674 
675  // Client 2 should still not be able to reserve.
676  reserved = block_mgr->TryAcquireTmpReservation(client2, 1);
677  EXPECT_FALSE(reserved);
678 
679  // Client 1 should be able to though.
680  status = block_mgr->GetNewBlock(client1, NULL, &block);
681  EXPECT_TRUE(status.ok());
682  EXPECT_TRUE(block != NULL);
683 
684  // Unpin two of client 1's blocks (client 1 should have 3 unpinned blocks now).
685  status = client1_blocks[1]->Unpin();
686  EXPECT_TRUE(status.ok());
687  status = client1_blocks[2]->Unpin();
688  EXPECT_TRUE(status.ok());
689 
690  // Clear client 1's reservation
691  block_mgr->ClearReservations(client1);
692 
693  // Client 2 should be able to reserve 1 buffers now (there are 2 left);
694  reserved = block_mgr->TryAcquireTmpReservation(client2, 1);
695  EXPECT_TRUE(reserved);
696 
697  // Client one can only pin 1.
698  bool pinned;
699  status = client1_blocks[0]->Pin(&pinned);
700  EXPECT_TRUE(status.ok());
701  EXPECT_TRUE(pinned);
702  // Can't get this one.
703  status = client1_blocks[1]->Pin(&pinned);
704  EXPECT_TRUE(status.ok());
705  EXPECT_FALSE(pinned);
706 
707  // Client 2 can pick up the one reserved buffer
708  status = block_mgr->GetNewBlock(client2, NULL, &block);
709  EXPECT_TRUE(status.ok());
710  EXPECT_TRUE(block != NULL);
711  // But not a second
712  BufferedBlockMgr::Block* block2;
713  status = block_mgr->GetNewBlock(client2, NULL, &block2);
714  EXPECT_TRUE(status.ok());
715  EXPECT_TRUE(block2 == NULL);
716 
717  // Unpin client 2's block it got from the reservation. Sine this is a tmp
718  // reservation, client 1 can pick it up again (it is not longer reserved).
719  status = block->Unpin();
720  EXPECT_TRUE(status.ok());
721  status = client1_blocks[1]->Pin(&pinned);
722  EXPECT_TRUE(status.ok());
723  EXPECT_TRUE(pinned);
724 
725  block_mgr.reset();
726  EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
727 }
728 
729 // Create two clients with different number of reserved buffers and some additional.
730 TEST_F(BufferedBlockMgrTest, MultipleClientsExtraBuffers) {
731  int client1_buffers = 1;
732  int client2_buffers = 1;
733  int max_num_buffers = client1_buffers + client2_buffers + 2;
734 
735  shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_buffers);
736  BufferedBlockMgr::Client* client1;
737  BufferedBlockMgr::Client* client2;
738  Status status;
740 
741  status = block_mgr->RegisterClient(client1_buffers, NULL, runtime_state_.get(),
742  &client1);
743  EXPECT_TRUE(status.ok());
744  EXPECT_TRUE(client1 != NULL);
745  status = block_mgr->RegisterClient(client2_buffers, NULL, runtime_state_.get(),
746  &client2);
747  EXPECT_TRUE(status.ok());
748  EXPECT_TRUE(client2 != NULL);
749 
750  vector<BufferedBlockMgr::Block*> client1_blocks;
751  // Allocate all of client1's reserved blocks, they should all succeed.
752  AllocateBlocks(block_mgr.get(), client1, client1_buffers, &client1_blocks);
753 
754  // Allocate all of client2's reserved blocks, these should succeed.
755  vector<BufferedBlockMgr::Block*> client2_blocks;
756  AllocateBlocks(block_mgr.get(), client2, client2_buffers, &client2_blocks);
757 
758  // We have two spare buffers now. Each client should be able to allocate it.
759  status = block_mgr->GetNewBlock(client1, NULL, &block);
760  EXPECT_TRUE(status.ok());
761  EXPECT_TRUE(block != NULL);
762  status = block_mgr->GetNewBlock(client2, NULL, &block);
763  EXPECT_TRUE(status.ok());
764  EXPECT_TRUE(block != NULL);
765 
766  // Now we are completely full, no one should be able to allocate a new block.
767  status = block_mgr->GetNewBlock(client1, NULL, &block);
768  EXPECT_TRUE(status.ok());
769  EXPECT_TRUE(block == NULL);
770  status = block_mgr->GetNewBlock(client2, NULL, &block);
771  EXPECT_TRUE(status.ok());
772  EXPECT_TRUE(block == NULL);
773 
774  block_mgr.reset();
775  EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
776 }
777 
778 // Create two clients causing oversubscription.
779 TEST_F(BufferedBlockMgrTest, ClientOversubscription) {
780  int client1_buffers = 1;
781  int client2_buffers = 2;
782  int max_num_buffers = 2;
783 
784  shared_ptr<BufferedBlockMgr> block_mgr = CreateMgr(max_num_buffers);
785  BufferedBlockMgr::Client* client1;
786  BufferedBlockMgr::Client* client2;
787  Status status;
789 
790  status = block_mgr->RegisterClient(client1_buffers, NULL, runtime_state_.get(),
791  &client1);
792  EXPECT_TRUE(status.ok());
793  EXPECT_TRUE(client1 != NULL);
794  status = block_mgr->RegisterClient(client2_buffers, NULL, runtime_state_.get(),
795  &client2);
796  EXPECT_TRUE(status.ok());
797  EXPECT_TRUE(client2 != NULL);
798 
799  // Client one allocates first block, should work.
800  status = block_mgr->GetNewBlock(client1, NULL, &block);
801  EXPECT_TRUE(status.ok());
802  EXPECT_TRUE(block != NULL);
803 
804  // Client two allocates first block, should work.
805  status = block_mgr->GetNewBlock(client2, NULL, &block);
806  EXPECT_TRUE(status.ok());
807  EXPECT_TRUE(block != NULL);
808 
809  // At this point we've used both buffers. Client one reserved one so subsequent
810  // calls should fail with no error (but returns no block).
811  status = block_mgr->GetNewBlock(client1, NULL, &block);
812  EXPECT_TRUE(status.ok());
813  EXPECT_TRUE(block == NULL);
814 
815  // Allocate with client two. Since client two reserved 2 buffers, this should fail
816  // with MEM_LIMIT_EXCEEDED.
817  status = block_mgr->GetNewBlock(client2, NULL, &block);
818  EXPECT_TRUE(status.IsMemLimitExceeded());
819 
820  block_mgr.reset();
821  EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0);
822 }
823 
824 TEST_F(BufferedBlockMgrTest, SingleRandom_plain) {
825  FLAGS_disk_spill_encryption = false;
826  TestRandomInternalSingle();
827 }
828 
829 TEST_F(BufferedBlockMgrTest, Multi2Random_plain) {
830  FLAGS_disk_spill_encryption = false;
831  TestRandomInternalMulti(2);
832 }
833 
834 TEST_F(BufferedBlockMgrTest, Multi4Random_plain) {
835  FLAGS_disk_spill_encryption = false;
836  TestRandomInternalMulti(4);
837 }
838 
839 // TODO: Enable when we improve concurrency of block mgr.
840 // TEST_F(BufferedBlockMgrTest, Multi8Random_plain) {
841 // FLAGS_disk_spill_encryption = false;
842 // TestRandomInternalMulti(8);
843 // }
844 
845 TEST_F(BufferedBlockMgrTest, SingleRandom_encryption) {
846  FLAGS_disk_spill_encryption = true;
847  TestRandomInternalSingle();
848 }
849 
850 TEST_F(BufferedBlockMgrTest, Multi2Random_encryption) {
851  FLAGS_disk_spill_encryption = true;
852  TestRandomInternalMulti(2);
853 }
854 
855 TEST_F(BufferedBlockMgrTest, Multi4Random_encryption) {
856  FLAGS_disk_spill_encryption = true;
857  TestRandomInternalMulti(4);
858 }
859 
860 // TODO: Enable when we improve concurrency of block mgr.
861 // TEST_F(BufferedBlockMgrTest, Multi8Random_encryption) {
862 // FLAGS_disk_spill_encryption = true;
863 // TestRandomInternalMulti(8);
864 // }
865 
866 }
867 
868 int main(int argc, char** argv) {
869  ::testing::InitGoogleTest(&argc, argv);
874  return RUN_ALL_TESTS();
875 }
virtual int64_t value() const
shared_ptr< BufferedBlockMgr > CreateMgr(int max_buffers)
int64_t consumption() const
Returns the memory consumed in bytes.
Definition: mem-tracker.h:298
void InitFeSupport()
Definition: fe-support.cc:346
TEST_F(InstructionCounterTest, Count)
MemTracker tracker
void Set(const T &val)
Definition: promise.h:38
void TestRandomInternalImpl(BufferedBlockMgr *block_mgr, int num_buffers, int tid)
T * Allocate(int size)
Allocates the specified number of bytes from this block.
void InitCommonRuntime(int argc, char **argv, bool init_jvm, TestInfo::Mode m=TestInfo::NON_TEST)
Definition: init.cc:122
static void GetFreeBlock(BufferedBlockMgr *block_mgr, BufferedBlockMgr::Client *client, BufferedBlockMgr::Block **new_block, Promise< bool > *promise)
Status GetNewBlock(Client *client, Block *unpin_block, Block **block, int64_t len=-1)
Status RegisterClient(int num_reserved_buffers, MemTracker *tracker, RuntimeState *state, Client **client)
int main(int argc, char **argv)
static Status Init()
Definition: tmp-file-mgr.cc:47
const string SCRATCH_DIR
void SleepForMs(const int64_t duration_ms)
Sleeps the current thread for at least duration_ms milliseconds.
Definition: time.cc:21
int BytesRemaining() const
Return the number of remaining bytes that can be allocated in this block.
int64_t valid_data_len() const
Return the number of bytes allocated in this block.
bool IsCancelled() const
Definition: status.h:174
static Status Create(RuntimeState *state, MemTracker *parent, RuntimeProfile *profile, int64_t mem_limit, int64_t buffer_size, boost::shared_ptr< BufferedBlockMgr > *block_mgr)
static void ValidateRandomSizeData(BufferedBlockMgr::Block *block, int32_t size)
This class is thread-safe.
Definition: mem-tracker.h:61
uint64_t Test(T *ht, const ProbeTuple *input, uint64_t num_tuples)
static void InitializeLlvm(bool load_backend=false)
Definition: llvm-codegen.cc:78
static int32_t * MakeRandomSizeData(BufferedBlockMgr::Block *block)
Counter * GetCounter(const std::string &name)
scoped_ptr< MemTracker > io_mgr_tracker_
static void ValidateBlock(BufferedBlockMgr::Block *block, int32_t data)
bool ok() const
Definition: status.h:172
scoped_ptr< MemTracker > block_mgr_parent_tracker_
void AllocateBlocks(BufferedBlockMgr *block_mgr, BufferedBlockMgr::Client *client, int num_blocks, vector< BufferedBlockMgr::Block * > *blocks)
DECLARE_bool(disk_spill_encryption)
scoped_ptr< RuntimeState > runtime_state_
bool IsMemLimitExceeded() const
Definition: status.h:178
void TestRandomInternalMulti(int num_threads)