Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
thread-resource-mgr.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef IMPALA_RUNTIME_THREAD_RESOURCE_MGR_H
16 #define IMPALA_RUNTIME_THREAD_RESOURCE_MGR_H
17 
18 #include <stdlib.h>
19 
20 #include <boost/function.hpp>
21 #include <boost/scoped_ptr.hpp>
22 #include <boost/shared_ptr.hpp>
23 #include <boost/thread/mutex.hpp>
24 #include <boost/thread/thread.hpp>
25 
26 #include <list>
27 
28 #include "common/status.h"
29 
30 namespace impala {
31 
39 //
64  public:
65  class ResourcePool;
66 
77  typedef boost::function<void (ResourcePool*)> ThreadAvailableCb;
78 
85  class ResourcePool {
86  public:
91  void AcquireThreadToken();
92 
97  bool TryAcquireThreadToken(bool* is_reserved = NULL);
98 
108  void ReserveOptionalTokens(int num);
109 
115  void ReleaseThreadToken(bool required);
116 
124 
126  int num_required_threads() const { return num_threads_ & 0xFFFFFFFF; }
127 
130  int num_optional_threads() const { return num_threads_ >> 32; }
131 
134  int64_t num_threads() const {
136  }
137 
139 
142  // Cache this so optional/required are computed based on the same value.
143  volatile int64_t num_threads = num_threads_;
144  int64_t optional_threads = num_threads >> 32;
145  int64_t required_threads = num_threads & 0xFFFFFFFF;
146  return optional_threads > num_reserved_optional_threads_ &&
147  optional_threads + required_threads > quota();
148  }
149 
151  int num_available_threads() const {
152  int value = std::max(quota() - static_cast<int>(num_threads()),
154  return std::max(0, value);
155  }
156 
159  int quota() const { return std::min(max_quota_, parent_->per_pool_quota_); }
160 
164 
165  private:
166  friend class ThreadResourceMgr;
167 
169 
171  void Reset();
172 
174 
177 
182  int64_t num_threads_;
183 
187  boost::mutex lock_;
188 
190  };
191 
195  ThreadResourceMgr(int threads_quota = 0);
196 
198 
201  ResourcePool* RegisterPool();
202 
205  void UnregisterPool(ResourcePool* pool);
206 
207  private:
210 
212  boost::mutex lock_;
213 
215  typedef std::set<ResourcePool*> Pools;
217 
221 
223  std::list<ResourcePool*> free_pool_objs_;
224 
228  void UpdatePoolQuotas(ResourcePool* new_pool = NULL);
229 };
230 
232  __sync_fetch_and_add(&num_threads_, 1);
233 }
234 
236  while (true) {
237  int64_t previous_num_threads = num_threads_;
238  int64_t new_optional_threads = (previous_num_threads >> 32) + 1;
239  int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF;
240  if (new_optional_threads > num_reserved_optional_threads_ &&
241  new_optional_threads + new_required_threads > quota()) {
242  return false;
243  }
244  bool thread_is_reserved = new_optional_threads <= num_reserved_optional_threads_;
245  int64_t new_value = new_optional_threads << 32 | new_required_threads;
246  // Atomically swap the new value if no one updated num_threads_. We do
247  // not care about the ABA problem here.
248  if (__sync_bool_compare_and_swap(&num_threads_, previous_num_threads, new_value)) {
249  if (is_reserved != NULL) *is_reserved = thread_is_reserved;
250  return true;
251  }
252  }
253 }
254 
256  if (required) {
257  DCHECK_GT(num_required_threads(), 0);
258  __sync_fetch_and_add(&num_threads_, -1);
259  } else {
260  DCHECK_GT(num_optional_threads(), 0);
261  while (true) {
262  int64_t previous_num_threads = num_threads_;
263  int64_t new_optional_threads = (previous_num_threads >> 32) - 1;
264  int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF;
265  int64_t new_value = new_optional_threads << 32 | new_required_threads;
266  if (__sync_bool_compare_and_swap(&num_threads_, previous_num_threads, new_value)) {
267  break;
268  }
269  }
270  }
271 
278  if (num_available_threads() > 0 && thread_available_fn_ != NULL) {
279  boost::unique_lock<boost::mutex> l(lock_);
280  if (num_available_threads() > 0 && thread_available_fn_ != NULL) {
281  thread_available_fn_(this);
282  }
283  }
284 }
285 
286 } // namespace impala
287 
288 #endif
int system_threads_quota_
'Optimal' number of threads for the entire process.
bool TryAcquireThreadToken(bool *is_reserved=NULL)
int num_required_threads() const
Returns the number of threads that are from AcquireThreadToken.
boost::mutex lock_
Lock for the entire object. Protects all fields below.
void SetThreadAvailableCb(ThreadAvailableCb fn)
boost::function< void(ResourcePool *)> ThreadAvailableCb
void UpdatePoolQuotas(ResourcePool *new_pool=NULL)
int num_available_threads() const
Returns the number of optional threads that can still be used.
void UnregisterPool(ResourcePool *pool)
bool optional_exceeded()
Returns true if the number of optional threads has now exceeded the quota.
ObjectPool pool
ThreadResourceMgr(int threads_quota=0)
std::set< ResourcePool * > Pools
Pools currently being managed.
std::list< ResourcePool * > free_pool_objs_
Recycled list of pool objects.