Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
thread-pool.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef IMPALA_UTIL_THREAD_POOL_H
16 #define IMPALA_UTIL_THREAD_POOL_H
17 
18 #include "util/blocking-queue.h"
19 
20 #include <boost/thread/mutex.hpp>
21 #include <boost/bind/mem_fn.hpp>
22 
23 #include "util/thread.h"
24 
25 namespace impala {
26 
// Fixed-size pool of worker threads that consume items of type T from a bounded
// work queue and hand each one to a user-supplied callback.
//
// NOTE(review): this is a rendered listing; the leading number on each line is the
// listing's own line number and gaps in that numbering mark lines that were dropped
// (mostly doc comments, but also a few code lines -- see the notes below).
// NOTE(review): the class uses std::stringstream, boost::function, boost::bind and
// boost::condition_variable; presumably their headers arrive transitively through
// the project includes -- confirm.
29 template <typename T>
30 class ThreadPool {
31  public:
    // Signature of the per-item callback: receives the index of the calling worker
    // thread and a const reference to the dequeued work item.
35  typedef boost::function<void (int thread_id, const T& workitem)> WorkFunction;
36 
    // Builds the queue with capacity argument 'queue_size', stores 'work_function',
    // clears the shutdown flag and then starts 'num_threads' workers.
    //  group          - passed through unchanged as the first Thread constructor argument.
    //  thread_prefix  - each thread is named "<thread_prefix>(<i+1>:<num_threads>)".
    //  num_threads    - number of Thread objects created and added to threads_.
    //  queue_size     - forwarded to the work_queue_ constructor.
    //  work_function  - copied into work_function_; invoked from the worker threads.
43  ThreadPool(const std::string& group, const std::string& thread_prefix,
44  uint32_t num_threads, uint32_t queue_size, const WorkFunction& work_function)
45  : work_function_(work_function),
46  work_queue_(queue_size),
47  shutdown_(false) {
    // NOTE(review): 'i' is a signed int compared against the uint32_t 'num_threads'.
48  for (int i = 0; i < num_threads; ++i) {
49  std::stringstream threadname;
50  threadname << thread_prefix << "(" << i + 1 << ":" << num_threads << ")";
    // Each worker runs WorkerThread(i) on this pool instance.
    // NOTE(review): presumably threads_ takes ownership of the new Thread -- confirm.
51  threads_.AddThread(new Thread(group, threadname.str(),
52  boost::bind<void>(boost::mem_fn(&ThreadPool<T>::WorkerThread), this, i)));
53  }
54  }
55 
    // NOTE(review): the line that opens this definition (listing line 58) is missing
    // from the listing; given that the body only calls Shutdown() and then Join(),
    // it is presumably the destructor -- confirm against the real header.
59  Shutdown();
60  Join();
61  }
62 
    // Hands 'work' to the queue via BlockingPut() and returns that call's result.
65  //
    // NOTE(review): the blocking behaviour and the meaning of a false return are
    // defined by BlockingQueue, which is not visible here.
71  //
74  bool Offer(const T& work) {
75  return work_queue_.BlockingPut(work);
76  }
77 
    // Sets shutdown_ to true while holding lock_, then shuts the work queue down.
    // The queue's Shutdown() is called after lock_ has been released. Does not wait
    // for the worker threads; see Join().
82  void Shutdown() {
83  {
84  boost::lock_guard<boost::mutex> l(lock_);
85  shutdown_ = true;
86  }
87  work_queue_.Shutdown();
88  }
89 
    // Joins every thread in threads_ (delegates to JoinAll()).
92  void Join() {
93  threads_.JoinAll();
94  }
95 
    // Returns the current size reported by the work queue.
96  uint32_t GetQueueSize() const {
97  return work_queue_.GetSize();
98  }
99 
    // NOTE(review): the line that opens this definition (listing line 103) is missing
    // from the listing. The body waits on empty_cv_ until the queue reports size 0,
    // then calls Shutdown() followed by Join().
104  {
105  boost::unique_lock<boost::mutex> l(lock_);
    // Re-check the size after every wake-up; workers notify empty_cv_ whenever they
    // observe an empty queue. An empty queue only means every item was dequeued;
    // presumably the Join() below is what waits for in-flight items -- confirm.
106  while (work_queue_.GetSize() != 0) {
107  empty_cv_.wait(l);
108  }
109  }
110  Shutdown();
111  Join();
112  }
113 
    // Forwards 'cgroup' to threads_.SetCgroup() and returns its Status.
114  Status AssignToCgroup(const std::string& cgroup) {
115  return threads_.SetCgroup(cgroup);
116  }
117 
118  private:
    // Body of every worker thread: loops until IsShutdown() returns true, fetching
    // one item per iteration and passing it to work_function_ with this thread's id.
121  void WorkerThread(int thread_id) {
122  while (!IsShutdown()) {
    // Requires T to be default-constructible.
123  T workitem;
    // The callback is only invoked when BlockingGet() reports success.
124  if (work_queue_.BlockingGet(&workitem)) {
125  work_function_(thread_id, workitem);
126  }
    // The size is read without holding lock_; the notification itself is issued
    // while holding lock_, the same mutex the drain loop waits with.
127  if (work_queue_.GetSize() == 0) {
131  boost::unique_lock<boost::mutex> l(lock_);
132  empty_cv_.notify_all();
133  }
134  }
135  }
136 
    // Returns value of shutdown_ under a lock, forcing visibility to threads in the pool.
138  bool IsShutdown() {
139  boost::lock_guard<boost::mutex> l(lock_);
140  return shutdown_;
141  }
142 
    // NOTE(review): three member declarations (listing lines 144, 148 and 151) are
    // missing from the listing. Per the cross-references at the end of the page:
    //   WorkFunction work_function_    - user-supplied method to call to process each work item.
    //   BlockingQueue<T> work_queue_   - declared at listing line 148.
    //   ThreadGroup threads_           - collection of worker threads that process work from the queue.
145 
149 
152 
    // Guards shutdown_ and empty_cv_.
154  boost::mutex lock_;
155 
    // Set to true when threads should stop doing work and terminate.
157  bool shutdown_;
158 
    // Signalled when the queue becomes empty.
160  boost::condition_variable empty_cv_;
161 };
162 
163 }
164 
165 #endif
Status SetCgroup(const std::string &cgroup)
Definition: thread.cc:333
TODO: Consider allowing fragment IDs as category parameters.
Definition: thread.h:45
ThreadGroup threads_
Collection of worker threads that process work from the queue.
Definition: thread-pool.h:151
void WorkerThread(int thread_id)
Definition: thread-pool.h:121
bool shutdown_
Set to true when threads should stop doing work and terminate.
Definition: thread-pool.h:157
bool Offer(const T &work)
Definition: thread-pool.h:74
BlockingQueue< T > work_queue_
Definition: thread-pool.h:148
boost::condition_variable empty_cv_
Signalled when the queue becomes empty.
Definition: thread-pool.h:160
bool IsShutdown()
Returns value of shutdown_ under a lock, forcing visibility to threads in the pool.
Definition: thread-pool.h:138
uint32_t GetQueueSize() const
Definition: thread-pool.h:96
ThreadPool(const std::string &group, const std::string &thread_prefix, uint32_t num_threads, uint32_t queue_size, const WorkFunction &work_function)
Definition: thread-pool.h:43
boost::function< void(int thread_id, const T &workitem)> WorkFunction
Definition: thread-pool.h:35
Status AddThread(Thread *thread)
Definition: thread.cc:318
boost::mutex lock_
Guards shutdown_ and empty_cv_.
Definition: thread-pool.h:154
WorkFunction work_function_
User-supplied method to call to process each work item.
Definition: thread-pool.h:144
Status AssignToCgroup(const std::string &cgroup)
Definition: thread-pool.h:114