15 #ifndef IMPALA_RUNTIME_THREAD_RESOURCE_MGR_H
16 #define IMPALA_RUNTIME_THREAD_RESOURCE_MGR_H
20 #include <boost/function.hpp>
21 #include <boost/scoped_ptr.hpp>
22 #include <boost/shared_ptr.hpp>
23 #include <boost/thread/mutex.hpp>
24 #include <boost/thread/thread.hpp>
144 int64_t optional_threads = num_threads >> 32;
145 int64_t required_threads = num_threads & 0xFFFFFFFF;
147 optional_threads + required_threads >
quota();
154 return std::max(0, value);
215 typedef std::set<ResourcePool*>
Pools;
237 int64_t previous_num_threads = num_threads_;
238 int64_t new_optional_threads = (previous_num_threads >> 32) + 1;
239 int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF;
240 if (new_optional_threads > num_reserved_optional_threads_ &&
241 new_optional_threads + new_required_threads > quota()) {
244 bool thread_is_reserved = new_optional_threads <= num_reserved_optional_threads_;
245 int64_t new_value = new_optional_threads << 32 | new_required_threads;
248 if (__sync_bool_compare_and_swap(&num_threads_, previous_num_threads, new_value)) {
249 if (is_reserved != NULL) *is_reserved = thread_is_reserved;
257 DCHECK_GT(num_required_threads(), 0);
258 __sync_fetch_and_add(&num_threads_, -1);
260 DCHECK_GT(num_optional_threads(), 0);
262 int64_t previous_num_threads = num_threads_;
263 int64_t new_optional_threads = (previous_num_threads >> 32) - 1;
264 int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF;
265 int64_t new_value = new_optional_threads << 32 | new_required_threads;
266 if (__sync_bool_compare_and_swap(&num_threads_, previous_num_threads, new_value)) {
278 if (num_available_threads() > 0 && thread_available_fn_ != NULL) {
279 boost::unique_lock<boost::mutex> l(
lock_);
280 if (num_available_threads() > 0 && thread_available_fn_ != NULL) {
281 thread_available_fn_(
this);
int system_threads_quota_
'Optimal' number of threads for the entire process.
int system_threads_quota() const
bool TryAcquireThreadToken(bool *is_reserved=NULL)
int num_required_threads() const
Returns the number of threads that are from AcquireThreadToken.
boost::mutex lock_
Lock for the entire object. Protects all fields below.
int64_t num_threads() const
void SetThreadAvailableCb(ThreadAvailableCb fn)
boost::function< void(ResourcePool *)> ThreadAvailableCb
void UpdatePoolQuotas(ResourcePool *new_pool=NULL)
ResourcePool * RegisterPool()
void ReserveOptionalTokens(int num)
int num_available_threads() const
Returns the number of optional threads that can still be used.
void ReleaseThreadToken(bool required)
void UnregisterPool(ResourcePool *pool)
ThreadResourceMgr * parent_
bool optional_exceeded()
Returns true if the number of optional threads has now exceeded the quota.
void set_max_quota(int quota)
int num_optional_threads() const
ThreadResourceMgr(int threads_quota=0)
void Reset()
Resets internal state.
std::set< ResourcePool * > Pools
Pools currently being managed.
ThreadAvailableCb thread_available_fn_
std::list< ResourcePool * > free_pool_objs_
Recycled list of pool objects.
int num_reserved_optional_threads()
int num_reserved_optional_threads_
void AcquireThreadToken()
ResourcePool(ThreadResourceMgr *parent)