Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
lock-benchmark.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <stdlib.h>
16 #include <stdio.h>
17 #include <iostream>
18 #include <vector>
19 #include <sstream>
20 #include <boost/thread.hpp>
21 #include <boost/thread/mutex.hpp>
22 #include "util/benchmark.h"
23 #include "util/cpu-info.h"
24 #include "util/spinlock.h"
25 
26 #include "common/names.h"
27 
28 using namespace impala;
29 
30 
31 // Benchmark for locking.
32 // Machine Info: Intel(R) Core(TM) i7-2600 CPU @ 3.40GHz
33 // locking: Function Rate (iters/ms) Comparison
34 // ----------------------------------------------------------------------
35 // Unlocked 2-Total Threads 45.5 1X
36 // Atomic 2-Total Threads 2.734 0.06009X
37 // SpinLock 2-Total Threads 2.245 0.04934X
38 // Boost 2-Total Threads 0.5453 0.01198X
39 //
40 // Unlocked 6-Total Threads 61.16 1X
41 // Atomic 6-Total Threads 2.875 0.047X
42 // SpinLock 6-Total Threads 1.368 0.02236X
43 // Boost 6-Total Threads 0.3173 0.005187X
44 //
45 // Unlocked 10-Total Threads 52.18 1X
46 // Atomic 10-Total Threads 2.061 0.0395X
47 // SpinLock 10-Total Threads 1.236 0.02369X
48 // Boost 10-Total Threads 0.3184 0.006101X
49 //
50 // Unlocked 14-Total Threads 54.18 1X
51 // Atomic 14-Total Threads 2.659 0.04907X
52 // SpinLock 14-Total Threads 1.274 0.02351X
53 // Boost 14-Total Threads 0.3252 0.006002X
54 //
55 // Unlocked 18-Total Threads 53.36 1X
56 // Atomic 18-Total Threads 1.952 0.03659X
57 // SpinLock 18-Total Threads 1.308 0.02452X
58 // Boost 18-Total Threads 0.3259 0.006109X
59 //
60 // Unlocked 22-Total Threads 56.91 1X
61 // Atomic 22-Total Threads 2.711 0.04764X
62 // SpinLock 22-Total Threads 1.311 0.02303X
63 // Boost 22-Total Threads 0.3254 0.005718X
// Per-benchmark configuration and shared state handed to the Test* functions
// through the benchmark framework's void* argument.
struct TestData {
  // Number of threads running the produce (increment) function.
  int num_producer_threads;
  // Number of threads running the consume (decrement) function.
  int num_consumer_threads;
  // Total number of increments, split evenly across the producer threads.
  int64_t num_produces;
  // Total number of decrements, split evenly across the consumer threads.
  int64_t num_consumes;
  // The shared counter every thread updates; reset to 0 by LaunchThreads().
  int64_t value;
};
71 
72 mutex lock_;
74 
75 typedef function<void (int64_t, int64_t*)> Fn;
76 
// Decrements *value n times without any synchronization. Does nothing when
// n <= 0.
void UnlockedConsumeThread(int64_t n, int64_t* value) {
  // Access the counter through a volatile pointer so the compiler cannot
  // collapse the loop into a single *value -= n.
  volatile int64_t* counter = value;
  for (int64_t remaining = n; remaining > 0; --remaining) {
    *counter = *counter - 1;
  }
}
// Increments *value n times without any synchronization. Does nothing when
// n <= 0.
void UnlockedProduceThread(int64_t n, int64_t* value) {
  // Access the counter through a volatile pointer so the compiler cannot
  // collapse the loop into a single *value += n.
  volatile int64_t* counter = value;
  for (int64_t remaining = n; remaining > 0; --remaining) {
    *counter = *counter + 1;
  }
}
91 
// Decrements *value n times, one atomic read-modify-write per iteration.
// Does nothing when n <= 0.
void AtomicConsumeThread(int64_t n, int64_t* value) {
  int64_t remaining = n;
  while (remaining > 0) {
    __sync_fetch_and_add(value, -1);
    --remaining;
  }
}
// Increments *value n times, one atomic read-modify-write per iteration.
// Does nothing when n <= 0.
void AtomicProduceThread(int64_t n, int64_t* value) {
  int64_t remaining = n;
  while (remaining > 0) {
    __sync_fetch_and_add(value, 1);
    --remaining;
  }
}
102 
103 void SpinLockConsumeThread(int64_t n, int64_t* value) {
104  for (int64_t i = 0; i < n; ++i) {
105  lock_guard<SpinLock> l(spinlock_);
106  --(*value);
107  }
108 }
109 void SpinLockProduceThread(int64_t n, int64_t* value) {
110  for (int64_t i = 0; i < n; ++i) {
111  lock_guard<SpinLock> l(spinlock_);
112  ++(*value);
113  }
114 }
115 
116 void BoostConsumeThread(int64_t n, int64_t* value) {
117  for (int64_t i = 0; i < n; ++i) {
118  lock_guard<mutex> l(lock_);
119  --(*value);
120  }
121 }
122 void BoostProduceThread(int64_t n, int64_t* value) {
123  for (int64_t i = 0; i < n; ++i) {
124  lock_guard<mutex> l(lock_);
125  ++(*value);
126  }
127 }
128 
129 void LaunchThreads(void* d, Fn consume_fn, Fn produce_fn, int64_t scale) {
130  TestData* data = reinterpret_cast<TestData*>(d);
131  data->value = 0;
132  int64_t num_per_consumer = data->num_consumes / data->num_consumer_threads;
133  int64_t num_per_producer = data->num_produces / data->num_producer_threads;
134  num_per_producer *= scale;
135  num_per_consumer *= scale;
136  thread_group consumers, producers;
137  for (int i = 0; i < data->num_consumer_threads; ++i) {
138  consumers.add_thread(
139  new thread(consume_fn, num_per_consumer, &data->value));
140  }
141  for (int i = 0; i < data->num_producer_threads; ++i) {
142  consumers.add_thread(
143  new thread(produce_fn, num_per_producer, &data->value));
144  }
145  consumers.join_all();
146  producers.join_all();
147 }
148 
149 void TestUnlocked(int batch_size, void* d) {
151 }
152 
153 void TestAtomic(int batch_size, void* d) {
154  TestData* data = reinterpret_cast<TestData*>(d);
156  CHECK_EQ(data->value, 0);
157 }
158 
159 void TestSpinLock(int batch_size, void* d) {
160  TestData* data = reinterpret_cast<TestData*>(d);
162  CHECK_EQ(data->value, 0);
163 }
164 
165 void TestBoost(int batch_size, void* d) {
166  TestData* data = reinterpret_cast<TestData*>(d);
168  CHECK_EQ(data->value, 0);
169 }
170 
171 int main(int argc, char **argv) {
172  CpuInfo::Init();
173  cout << Benchmark::GetMachineInfo() << endl;
174 
175  int64_t N = 10000L;
176  const int max_producers = 12;
177 
178  Benchmark suite("locking");
179  TestData data[max_producers];
180  for (int i = 0; i < max_producers; i += 2) {
181  data[i].num_producer_threads = i + 1;
182  data[i].num_consumer_threads = i + 1;
183  data[i].num_produces = N;
184  data[i].num_consumes = N;
185 
186  stringstream suffix;
187  stringstream name;
188  suffix << " " << (i+1) * 2 << "-Total Threads";
189 
190  name.str("");
191  name << "Unlocked" << suffix.str();
192  int baseline = suite.AddBenchmark(name.str(), TestUnlocked, &data[i], -1);
193 
194  name.str("");
195  name << "Atomic" << suffix.str();
196  suite.AddBenchmark(name.str(), TestAtomic, &data[i], baseline);
197 
198  name.str("");
199  name << "SpinLock" << suffix.str();
200  suite.AddBenchmark(name.str(), TestSpinLock, &data[i], baseline);
201 
202  name.str("");
203  name << "Boost" << suffix.str();
204  suite.AddBenchmark(name.str(), TestBoost, &data[i], baseline);
205  }
206  cout << suite.Measure() << endl;
207 
208  return 0;
209 }
void SpinLockConsumeThread(int64_t n, int64_t *value)
int AddBenchmark(const std::string &name, BenchmarkFunction fn, void *args, int baseline_idx=0)
Definition: benchmark.cc:70
static std::string GetMachineInfo()
Output machine/build configuration as a string.
Definition: benchmark.cc:124
void TestBoost(int batch_size, void *d)
boost::mutex lock_
protects all fields below
Definition: coordinator.h:233
void TestUnlocked(int batch_size, void *d)
Lightweight spinlock.
Definition: spinlock.h:24
std::string Measure()
Runs all the benchmarks and returns the result in a formatted string.
Definition: benchmark.cc:83
void AtomicConsumeThread(int64_t n, int64_t *value)
int64_t value
void TestAtomic(int batch_size, void *d)
SpinLock spinlock_
void AtomicProduceThread(int64_t n, int64_t *value)
void TestSpinLock(int batch_size, void *d)
function< void(int64_t, int64_t, AtomicInt< int > *)> Fn
Definition: atomic-test.cc:104
void SpinLockProduceThread(int64_t n, int64_t *value)
void BoostProduceThread(int64_t n, int64_t *value)
int64_t num_produces
int main(int argc, char **argv)
int num_consumer_threads
void UnlockedProduceThread(int64_t n, int64_t *value)
void LaunchThreads(void *d, Fn consume_fn, Fn produce_fn, int64_t scale)
void BoostConsumeThread(int64_t n, int64_t *value)
static void Init()
Initialize CpuInfo.
Definition: cpu-info.cc:75
string name
Definition: cpu-info.cc:50
int num_producer_threads
int64_t num_consumes
void UnlockedConsumeThread(int64_t n, int64_t *value)
function< void(int64_t, int64_t *)> Fn