Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
blocking-queue-test.cc
Go to the documentation of this file.
1 // Copyright 2013 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #include <boost/thread.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <glog/logging.h>
19 #include <gtest/gtest.h>
20 #include <unistd.h>
21 
22 #include "util/blocking-queue.h"
23 
24 #include "common/names.h"
25 
26 namespace impala {
27 
28 TEST(BlockingQueueTest, TestBasic) {
29  int32_t i;
30  BlockingQueue<int32_t> test_queue(5);
31  ASSERT_TRUE(test_queue.BlockingPut(1));
32  ASSERT_TRUE(test_queue.BlockingPut(2));
33  ASSERT_TRUE(test_queue.BlockingPut(3));
34  ASSERT_TRUE(test_queue.BlockingGet(&i));
35  ASSERT_EQ(1, i);
36  ASSERT_TRUE(test_queue.BlockingGet(&i));
37  ASSERT_EQ(2, i);
38  ASSERT_TRUE(test_queue.BlockingGet(&i));
39  ASSERT_EQ(3, i);
40 }
41 
42 TEST(BlockingQueueTest, TestGetFromShutdownQueue) {
43  int64_t i;
44  BlockingQueue<int64_t> test_queue(2);
45  ASSERT_TRUE(test_queue.BlockingPut(123));
46  test_queue.Shutdown();
47  ASSERT_FALSE(test_queue.BlockingPut(456));
48  ASSERT_TRUE(test_queue.BlockingGet(&i));
49  ASSERT_EQ(123, i);
50  ASSERT_FALSE(test_queue.BlockingGet(&i));
51 }
52 
54  public:
56  : iterations_(10000),
57  nthreads_(5),
60  }
61 
62  void InserterThread(int arg) {
63  for (int i = 0; i < iterations_; ++i) {
64  queue_.BlockingPut(arg);
65  }
66 
67  {
68  lock_guard<mutex> guard(lock_);
69  if (--num_inserters_ == 0) {
70  queue_.Shutdown();
71  }
72  }
73  }
74 
75  void RemoverThread() {
76  for (int i = 0; i < iterations_; ++i) {
77  int32_t arg;
78  bool got = queue_.BlockingGet(&arg);
79  if (!got) arg = -1;
80 
81  {
82  lock_guard<mutex> guard(lock_);
83  gotten_[arg] = gotten_[arg] + 1;
84  }
85  }
86  }
87 
88  void Run() {
89  for (int i = 0; i < nthreads_; ++i) {
90  threads_.push_back(shared_ptr<thread>(
91  new thread(bind(&MultiThreadTest::InserterThread, this, i))));
92  threads_.push_back(shared_ptr<thread>(
93  new thread(bind(&MultiThreadTest::RemoverThread, this))));
94  }
95  // We add an extra thread to ensure that there aren't enough elements in
96  // the queue to go around. This way, we test removal after Shutdown.
97  threads_.push_back(shared_ptr<thread>(
98  new thread(bind(
100  for (int i = 0; i < threads_.size(); ++i) {
101  threads_[i]->join();
102  }
103 
104  // Let's check to make sure we got what we should have.
105  lock_guard<mutex> guard(lock_);
106  for (int i = 0; i < nthreads_; ++i) {
107  ASSERT_EQ(iterations_, gotten_[i]);
108  }
109  // And there were nthreads_ * (iterations_ + 1) elements removed, but only
110  // nthreads_ * iterations_ elements added. So some removers hit the shutdown
111  // case.
112  ASSERT_EQ(iterations_, gotten_[-1]);
113  }
114 
115  private:
116  typedef vector<shared_ptr<thread> > ThreadVector;
117 
121  // Lock for gotten_ and num_inserters_.
122  mutex lock_;
123  // Map from inserter thread id to number of consumed elements from that id.
124  // Ultimately, this should map each thread id to insertions_ elements.
125  // Additionally, if the BlockingGet returns false, this increments id=-1.
126  map<int32_t, int> gotten_;
127  // All inserter and remover threads.
129  // Number of inserters which haven't yet finished inserting.
131 };
132 
133 TEST(BlockingQueueTest, TestMultipleThreads) {
134  MultiThreadTest test;
135  test.Run();
136 }
137 
138 }
139 
140 int main(int argc, char **argv) {
141  ::testing::InitGoogleTest(&argc, argv);
142  return RUN_ALL_TESTS();
143 }
int main(int argc, char **argv)
TEST(AtomicTest, Basic)
Definition: atomic-test.cc:28
bool BlockingPut(const T &val)
vector< shared_ptr< thread > > ThreadVector
BlockingQueue< int32_t > queue_
void Shutdown()
Shut down the queue. Wakes up all threads waiting on BlockingGet or BlockingPut.
bool BlockingGet(T *out)
map< int32_t, int > gotten_