Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
internal-queue-test.cc
Go to the documentation of this file.
1 // Copyright 2013 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #include <boost/thread.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <glog/logging.h>
19 #include <gtest/gtest.h>
20 #include <unistd.h>
21 
22 #include "util/internal-queue.h"
23 
24 #include "common/names.h"
25 
26 namespace impala {
27 
28 struct IntNode : public InternalQueue<IntNode>::Node {
29  IntNode(int value = 0) : value(value) {}
30  int value;
31 };
32 
33 // Basic single threaded operation.
34 TEST(InternalQueue, TestBasic) {
35  IntNode one(1);
36  IntNode two(2);
37  IntNode three(3);
38  IntNode four(4);
39 
41  ASSERT_TRUE(list.empty());
42  ASSERT_EQ(list.size(), 0);
43  ASSERT_TRUE(list.Dequeue() == NULL);
44  ASSERT_TRUE(list.Validate());
45 
46  list.Enqueue(&one);
47  ASSERT_TRUE(!list.empty());
48  ASSERT_EQ(list.size(), 1);
49  IntNode* i = list.Dequeue();
50  ASSERT_TRUE(i != NULL);
51  ASSERT_TRUE(list.empty());
52  ASSERT_EQ(list.size(), 0);
53  ASSERT_EQ(i->value, 1);
54  ASSERT_TRUE(list.Validate());
55 
56  list.Enqueue(&one);
57  list.Enqueue(&two);
58  list.Enqueue(&three);
59  list.Enqueue(&four);
60  ASSERT_EQ(list.size(), 4);
61  ASSERT_TRUE(list.Validate());
62 
63  i = list.Dequeue();
64  ASSERT_TRUE(i != NULL);
65  ASSERT_EQ(i->value, 1);
66  ASSERT_TRUE(list.Validate());
67 
68  i = list.Dequeue();
69  ASSERT_TRUE(i != NULL);
70  ASSERT_EQ(i->value, 2);
71  ASSERT_TRUE(list.Validate());
72 
73  i = list.Dequeue();
74  ASSERT_TRUE(i != NULL);
75  ASSERT_EQ(i->value, 3);
76  ASSERT_TRUE(list.Validate());
77 
78  i = list.Dequeue();
79  ASSERT_TRUE(i != NULL);
80  ASSERT_EQ(i->value, 4);
81  ASSERT_TRUE(list.Validate());
82 
83  list.Enqueue(&one);
84  list.Enqueue(&two);
85  list.Enqueue(&three);
86  list.Enqueue(&four);
87 
88  IntNode* node = list.head();
89  int val = 1;
90  while (node != NULL) {
91  ASSERT_EQ(node->value, val);
92  node = node->Next();
93  ++val;
94  }
95 
96  node = list.tail();
97  val = 4;
98  while (node != NULL) {
99  ASSERT_EQ(node->value, val);
100  node = node->Prev();
101  --val;
102  }
103 
104  for (int i = 0; i < 4; ++i) {
105  node = list.PopBack();
106  ASSERT_TRUE(node != NULL);
107  ASSERT_EQ(node->value, 4 - i);
108  ASSERT_TRUE(list.Validate());
109  }
110  ASSERT_TRUE(list.PopBack() == NULL);
111  ASSERT_EQ(list.size(), 0);
112  ASSERT_TRUE(list.empty());
113 }
114 
115 // Add all the nodes and then remove every other one.
116 TEST(InternalQueue, TestRemove) {
117  vector<IntNode> nodes;
118  nodes.resize(100);
119 
121 
122  queue.Enqueue(&nodes[0]);
123  queue.Remove(&nodes[1]);
124  ASSERT_TRUE(queue.Validate());
125  queue.Remove(&nodes[0]);
126  ASSERT_TRUE(queue.Validate());
127  queue.Remove(&nodes[0]);
128  ASSERT_TRUE(queue.Validate());
129 
130  for (int i = 0; i < nodes.size(); ++i) {
131  nodes[i].value = i;
132  queue.Enqueue(&nodes[i]);
133  }
134 
135  for (int i = 0; i < nodes.size(); i += 2) {
136  queue.Remove(&nodes[i]);
137  ASSERT_TRUE(queue.Validate());
138  }
139 
140  ASSERT_EQ(queue.size(), nodes.size() / 2);
141  for (int i = 0; i < nodes.size() / 2; ++i) {
142  IntNode* node = queue.Dequeue();
143  ASSERT_TRUE(node != NULL);
144  ASSERT_EQ(node->value, i * 2 + 1);
145  }
146 }
147 
148 const int VALIDATE_INTERVAL = 10000;
149 
150 // CHECK() is not thread safe so return the result in *failed.
151 void ProducerThread(InternalQueue<IntNode>* queue, int num_inserts,
152  vector<IntNode>* nodes, AtomicInt<int32_t>* counter, bool* failed) {
153  for (int i = 0; i < num_inserts && !*failed; ++i) {
154  // Get the next index to queue.
155  AtomicInt<int32_t> value = (*counter)++;
156  nodes->at(value).value = value;
157  queue->Enqueue(&nodes->at(value));
158  if (i % VALIDATE_INTERVAL == 0) {
159  if (!queue->Validate()) *failed = true;
160  }
161  }
162 }
163 
164 void ConsumerThread(InternalQueue<IntNode>* queue, int num_consumes, int delta,
165  vector<int>* results, bool* failed) {
166  // Dequeued nodes should be strictly increasing.
167  int previous_value = -1;
168  for (int i = 0; i < num_consumes && !*failed;) {
169  IntNode* node = queue->Dequeue();
170  if (node == NULL) continue;
171  ++i;
172  if (delta > 0) {
173  if (node->value != previous_value + delta) *failed = true;
174  } else if (delta == 0) {
175  if (node->value <= previous_value) *failed = true;
176  }
177  results->push_back(node->value);
178  previous_value = node->value;
179  if (i % VALIDATE_INTERVAL == 0) {
180  if (!queue->Validate()) *failed = true;
181  }
182  }
183 }
184 
185 TEST(InternalQueue, TestClear) {
186  vector<IntNode> nodes;
187  nodes.resize(100);
189  queue.Enqueue(&nodes[0]);
190  queue.Enqueue(&nodes[1]);
191  queue.Enqueue(&nodes[2]);
192 
193  queue.Clear();
194  ASSERT_TRUE(queue.Validate());
195  ASSERT_TRUE(queue.empty());
196 
197  queue.Enqueue(&nodes[0]);
198  queue.Enqueue(&nodes[1]);
199  queue.Enqueue(&nodes[2]);
200  ASSERT_TRUE(queue.Validate());
201  ASSERT_EQ(queue.size(), 3);
202 }
203 
204 TEST(InternalQueue, TestSingleProducerSingleConsumer) {
205  vector<IntNode> nodes;
206  AtomicInt<int32_t> counter;
207  nodes.resize(1000000);
208  vector<int> results;
209 
211  bool failed = false;
212  ProducerThread(&queue, nodes.size(), &nodes, &counter, &failed);
213  ConsumerThread(&queue, nodes.size(), 1, &results, &failed);
214  ASSERT_TRUE(!failed);
215  ASSERT_TRUE(queue.empty());
216  ASSERT_EQ(results.size(), nodes.size());
217 
218  counter = 0;
219  results.clear();
220  thread producer_thread(ProducerThread, &queue, nodes.size(), &nodes, &counter, &failed);
221  thread consumer_thread(ConsumerThread, &queue, nodes.size(), 1, &results, &failed);
222  producer_thread.join();
223  consumer_thread.join();
224  ASSERT_TRUE(!failed);
225  ASSERT_TRUE(queue.empty());
226  ASSERT_EQ(results.size(), nodes.size());
227 }
228 
229 TEST(InternalQueue, TestMultiProducerMultiConsumer) {
230  vector<IntNode> nodes;
231  nodes.resize(1000000);
232 
233  bool failed = false;
234  for (int num_producers = 1; num_producers < 5; num_producers += 3) {
235  AtomicInt<int32_t> counter;
236  const int NUM_CONSUMERS = 4;
237  ASSERT_EQ(nodes.size() % NUM_CONSUMERS, 0);
238  ASSERT_EQ(nodes.size() % num_producers, 0);
239  const int num_per_consumer = nodes.size() / NUM_CONSUMERS;
240  const int num_per_producer = nodes.size() / num_producers;
241 
242  vector<vector<int> > results;
243  results.resize(NUM_CONSUMERS);
244 
245  int expected_delta = -1;
246  if (NUM_CONSUMERS == 1 && num_producers == 1) {
247  // With one producer and consumer, the queue should have sequential values.
248  expected_delta = 1;
249  } else if (num_producers == 1) {
250  // With one producer, the values added are sequential but can be read off
251  // with gaps in each consumer thread. E.g. thread1 reads: 1, 4, 5, 7, etc.
252  // but they should be strictly increasing.
253  expected_delta = 0;
254  } else {
255  // With multiple producers there isn't a guarantee on the order values get
256  // enqueued.
257  expected_delta = -1;
258  }
259 
261  thread_group consumers;
262  thread_group producers;
263 
264  for (int i = 0; i < num_producers; ++i) {
265  producers.add_thread(
266  new thread(ProducerThread, &queue, num_per_producer, &nodes, &counter, &failed));
267  }
268 
269  for (int i = 0; i < NUM_CONSUMERS; ++i) {
270  consumers.add_thread(new thread(ConsumerThread,
271  &queue, num_per_consumer, expected_delta, &results[i], &failed));
272  }
273 
274  producers.join_all();
275  consumers.join_all();
276  ASSERT_TRUE(queue.empty());
277  ASSERT_TRUE(!failed);
278 
279  vector<int> all_results;
280  for (int i = 0; i < NUM_CONSUMERS; ++i) {
281  ASSERT_EQ(results[i].size(), num_per_consumer);
282  all_results.insert(all_results.end(), results[i].begin(), results[i].end());
283  }
284  ASSERT_EQ(all_results.size(), nodes.size());
285  sort(all_results.begin(), all_results.end());
286  for (int i = 0; i < all_results.size(); ++i) {
287  ASSERT_EQ(i, all_results[i]) << all_results[i -1] << " " << all_results[i + 1];
288  }
289  }
290 }
291 
292 }
293 
294 int main(int argc, char **argv) {
295 #ifdef ADDRESS_SANITIZER
296  // These tests are disabled for address sanitizer builds.
297  // TODO: investigate why the multithreaded ones fail in boost:thread_local_data.
298  cerr << "Internal Queue Test Skipped" << endl;
299  return 0;
300 #endif
301  google::InitGoogleLogging(argv[0]);
302  ::testing::InitGoogleTest(&argc, argv);
303  return RUN_ALL_TESTS();
304 }
void ProducerThread(InternalQueue< IntNode > *queue, int num_inserts, vector< IntNode > *nodes, AtomicInt< int32_t > *counter, bool *failed)
TEST(AtomicTest, Basic)
Definition: atomic-test.cc:28
const int VALIDATE_INTERVAL
void Enqueue(T *n)
Enqueue node onto the queue's tail. This is O(1).
void Clear()
Clears all elements in the list.
T must be a subclass of InternalQueue::Node.
T * Next() const
Returns the Next/Prev node or NULL if this is the end/front.
bool Validate()
Validates the internal structure of the list.
void ConsumerThread(InternalQueue< IntNode > *queue, int num_consumes, int delta, vector< int > *results, bool *failed)
int main(int argc, char **argv)