Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
atomic-test.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <string>
16 #include <gtest/gtest.h>
17 #include <boost/thread.hpp>
18 
19 #include "common/atomic.h"
20 
21 #include "common/names.h"
22 
23 
24 namespace impala {
25 
26 // Simple test to make sure there is no obvious error in the usage of the
27 // __sync* operations. This is not intended to test the thread safety.
28 TEST(AtomicTest, Basic) {
29  AtomicInt<int> i1;
30  EXPECT_EQ(i1, 0);
31  i1 = 10;
32  EXPECT_EQ(i1, 10);
33  i1 += 5;
34  EXPECT_EQ(i1, 15);
35  i1 -= 25;
36  EXPECT_EQ(i1, -10);
37  ++i1;
38  EXPECT_EQ(i1, -9);
39  --i1;
40  EXPECT_EQ(i1, -10);
41  i1 = 100;
42  EXPECT_EQ(i1, 100);
43 
44  i1.UpdateMax(50);
45  EXPECT_EQ(i1, 100);
46  i1.UpdateMax(150);
47  EXPECT_EQ(i1, 150);
48 
49  i1.UpdateMin(200);
50  EXPECT_EQ(i1, 150);
51  i1.UpdateMin(-200);
52  EXPECT_EQ(i1, -200);
53 
54  bool success = i1.CompareAndSwap(-200, 50);
55  EXPECT_EQ(i1, 50);
56  EXPECT_EQ(success, true);
57  success = i1.CompareAndSwap(50, 100);
58  EXPECT_EQ(i1, 100);
59  EXPECT_EQ(success, true);
60 
61  success = i1.CompareAndSwap(-200, 50);
62  EXPECT_EQ(i1, 100);
63  EXPECT_EQ(success, false);
64  success = i1.CompareAndSwap(50, 200);
65  EXPECT_EQ(i1, 100);
66  EXPECT_EQ(success, false);
67 
68  int retval = i1.CompareAndSwapVal(100, 200);
69  EXPECT_EQ(i1, 200);
70  EXPECT_EQ(retval, 100);
71  retval = i1.CompareAndSwapVal(200, 250);
72  EXPECT_EQ(i1, 250);
73  EXPECT_EQ(retval, 200);
74 
75  retval = i1.CompareAndSwapVal(100, 200);
76  EXPECT_EQ(i1, 250);
77  EXPECT_EQ(retval, 250);
78  retval = i1.CompareAndSwapVal(-200, 50);
79  EXPECT_EQ(i1, 250);
80  EXPECT_EQ(retval, 250);
81 
82  retval = i1.Swap(300);
83  EXPECT_EQ(i1, 300);
84  EXPECT_EQ(retval, 250);
85  retval = i1.Swap(350);
86  EXPECT_EQ(i1, 350);
87  EXPECT_EQ(retval, 300);
88 }
89 
90 TEST(AtomicTest, TestAndSet) {
91  AtomicInt<int> i1;
92  for (int i = 0; i < 100; ++i) {
93  EXPECT_EQ(i + 1, i1.UpdateAndFetch(1));
94  }
95 
96  i1 = 0;
97 
98  for (int i = 0; i < 100; ++i) {
99  EXPECT_EQ(i, i1.FetchAndUpdate(1));
100  }
101 }
102 
103 // Basic multi-threaded testing
104 typedef function<void (int64_t, int64_t , AtomicInt<int>*)> Fn;
105 
106 void IncrementThread(int64_t id, int64_t n, AtomicInt<int>* ai) {
107  for (int64_t i = 0; i < n * id; ++i) {
108  ++*ai;
109  }
110 }
111 
112 void DecrementThread(int64_t id, int64_t n, AtomicInt<int>* ai) {
113  for (int64_t i = 0; i < n * id; ++i) {
114  --*ai;
115  }
116 }
117 
118 TEST(AtomicTest, MultipleThreadsIncDec) {
119  thread_group increments, decrements;
120  vector<int> ops;
121  ops.push_back(1000);
122  ops.push_back(10000);
123  vector<int> num_threads;
124  num_threads.push_back(4);
125  num_threads.push_back(8);
126  num_threads.push_back(16);
127 
128  for (vector<int>::iterator thrit = num_threads.begin(); thrit != num_threads.end();
129  ++thrit) {
130  for (vector<int>::iterator opit = ops.begin(); opit != ops.end(); ++opit) {
131  AtomicInt<int> ai = 0;
132  for (int i = 0; i < *thrit; ++i) {
133  increments.add_thread( new thread(IncrementThread, i, *opit, &ai));
134  decrements.add_thread( new thread(DecrementThread, i, *opit, &ai));
135  }
136  increments.join_all();
137  decrements.join_all();
138  EXPECT_EQ(ai, 0);
139  }
140  }
141 }
142 
143 void CASIncrementThread(int64_t id, int64_t n, AtomicInt<int>* ai) {
144  int oldval = 0;
145  int newval = 0;
146  bool success = false;
147  for (int64_t i = 0; i < n * id; ++i) {
148  success = false;
149  while ( !success ) {
150  oldval = ai->Read();
151  newval = oldval + 1;
152  success = ai->CompareAndSwap(oldval, newval);
153  }
154  }
155 }
156 
157 void CASDecrementThread(int64_t id, int64_t n, AtomicInt<int>* ai) {
158  int oldval = 0;
159  int newval = 0;
160  bool success = false;
161  for (int64_t i = 0; i < n * id; ++i) {
162  success = false;
163  while ( !success ) {
164  oldval = ai->Read();
165  newval = oldval - 1;
166  success = ai->CompareAndSwap(oldval, newval);
167  }
168  }
169 }
170 
171 TEST(AtomicTest, MultipleThreadsCASIncDec) {
172  thread_group increments, decrements;
173  vector<int> ops;
174  ops.push_back(10);
175  ops.push_back(10000);
176  vector<int> num_threads;
177  num_threads.push_back(4);
178  num_threads.push_back(8);
179  num_threads.push_back(16);
180 
181  for (vector<int>::iterator thrit = num_threads.begin(); thrit != num_threads.end();
182  ++thrit) {
183  for (vector<int>::iterator opit = ops.begin(); opit != ops.end(); ++opit) {
184  AtomicInt<int> ai = 0;
185  for (int i = 0; i < *thrit; ++i) {
186  increments.add_thread( new thread(CASIncrementThread, i, *opit, &ai));
187  decrements.add_thread( new thread(CASDecrementThread, i, *opit, &ai));
188  }
189  increments.join_all();
190  decrements.join_all();
191  EXPECT_EQ(ai, 0);
192  }
193  }
194 }
195 
196 }
197 
198 int main(int argc, char **argv) {
199  ::testing::InitGoogleTest(&argc, argv);
200  return RUN_ALL_TESTS();
201 }
T UpdateAndFetch(T delta)
Increments by delta (i.e. += delta) and returns the new val.
Definition: atomic.h:105
void CASIncrementThread(int64_t id, int64_t n, AtomicInt< int > *ai)
Definition: atomic-test.cc:143
void UpdateMin(T value)
Definition: atomic.h:122
T Swap(const T &new_val)
Atomically updates value_ with new_val. Returns the old value_.
Definition: atomic.h:142
TEST(AtomicTest, Basic)
Definition: atomic-test.cc:28
T FetchAndUpdate(T delta)
Increments by delta and returns the old val.
Definition: atomic.h:110
void UpdateMax(T value)
Updates the int to 'value' if value is larger.
Definition: atomic.h:115
T CompareAndSwapVal(T old_val, T new_val)
Definition: atomic.h:137
T Read()
Safe read of the value.
Definition: atomic.h:100
bool CompareAndSwap(T old_val, T new_val)
Returns true if the atomic compare-and-swap was successful.
Definition: atomic.h:131
void CASDecrementThread(int64_t id, int64_t n, AtomicInt< int > *ai)
Definition: atomic-test.cc:157
function< void(int64_t, int64_t, AtomicInt< int > *)> Fn
Definition: atomic-test.cc:104
int main(int argc, char **argv)
Definition: atomic-test.cc:198
void IncrementThread(int64_t id, int64_t n, AtomicInt< int > *ai)
Definition: atomic-test.cc:106
void DecrementThread(int64_t id, int64_t n, AtomicInt< int > *ai)
Definition: atomic-test.cc:112