Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
thread-pool-test.cc
Go to the documentation of this file.
1 // Copyright 2013 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <boost/thread.hpp>
16 #include <boost/thread/mutex.hpp>
17 #include <glog/logging.h>
18 #include <gtest/gtest.h>
19 #include <unistd.h>
20 
21 #include "common/logging.h"
22 #include "util/thread-pool.h"
23 
24 #include "common/names.h"
25 
26 namespace impala {
27 
28 const int NUM_THREADS = 5;
30 
31 // Per-thread mutex to ensure visibility of counters after thread pool terminates
33 
34 void Count(int thread_id, const int& i) {
35  lock_guard<mutex> l(thread_mutexes[thread_id]);
36  thread_counters[thread_id] += i;
37 }
38 
39 TEST(ThreadPoolTest, BasicTest) {
40  const int OFFERED_RANGE = 10000;
41  for (int i = 0; i < NUM_THREADS; ++i) {
42  thread_counters[i] = 0;
43  }
44 
45  ThreadPool<int> thread_pool("thread-pool", "worker", 5, 250, Count);
46  for (int i = 0; i <= OFFERED_RANGE; ++i) {
47  ASSERT_TRUE(thread_pool.Offer(i));
48  }
49 
50  thread_pool.DrainAndShutdown();
51 
52  // Check that Offer() after Shutdown() will return false
53  ASSERT_FALSE(thread_pool.Offer(-1));
54  EXPECT_EQ(0, thread_pool.GetQueueSize());
55 
56  int expected_count = (OFFERED_RANGE * (OFFERED_RANGE + 1)) / 2;
57  int count = 0;
58  for (int i = 0; i < NUM_THREADS; ++i) {
59  lock_guard<mutex> l(thread_mutexes[i]);
60  LOG(INFO) << "Counter " << i << ": " << thread_counters[i];
61  count += thread_counters[i];
62  }
63 
64  EXPECT_EQ(expected_count, count);
65 }
66 
67 }
68 
69 int main(int argc, char** argv) {
72  ::testing::InitGoogleTest(&argc, argv);
73  return RUN_ALL_TESTS();
74 }
void Count(int thread_id, const int &i)
mutex thread_mutexes[NUM_THREADS]
TEST(AtomicTest, Basic)
Definition: atomic-test.cc:28
int thread_counters[NUM_THREADS]
bool Offer(const T &work)
Definition: thread-pool.h:74
void InitGoogleLoggingSafe(const char *arg)
Definition: logging.cc:55
void InitThreading()
Initialises the threading subsystem. Must be called before a Thread is created.
Definition: thread.cc:261
uint32_t GetQueueSize() const
Definition: thread-pool.h:96
uint64_t count
int main(int argc, char **argv)
const int NUM_THREADS