Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
thread-resource-mgr.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <vector>
18 
19 #include <boost/algorithm/string.hpp>
20 #include <boost/thread/locks.hpp>
21 #include <gflags/gflags.h>
22 
23 #include "common/logging.h"
24 #include "util/cpu-info.h"
25 
26 #include "common/names.h"
27 
28 using namespace impala;
29 
30 // Controls the number of threads to run work per core. It's common to pick 2x
31 // or 3x the number of cores. This keeps the cores busy without causing excessive
32 // thrashing.
33 DEFINE_int32(num_threads_per_core, 3, "Number of threads per core.");
34 
36  DCHECK_GE(threads_quota, 0);
37  if (threads_quota == 0) {
38  system_threads_quota_ = CpuInfo::num_cores() * FLAGS_num_threads_per_core;
39  } else {
40  system_threads_quota_ = threads_quota;
41  }
42  per_pool_quota_ = 0;
43 }
44 
46  : parent_(parent) {
47 }
48 
50  num_threads_ = 0;
51  num_reserved_optional_threads_ = 0;
52  thread_available_fn_ = NULL;
53  max_quota_ = INT_MAX;
54 }
55 
57  DCHECK_GE(num, 0);
58  num_reserved_optional_threads_ = num;
59 }
60 
62  unique_lock<mutex> l(lock_);
63  ResourcePool* pool = NULL;
64  if (free_pool_objs_.empty()) {
65  pool = new ResourcePool(this);
66  } else {
67  pool = free_pool_objs_.front();
68  free_pool_objs_.pop_front();
69  }
70 
71  DCHECK(pool != NULL);
72  DCHECK(pools_.find(pool) == pools_.end());
73  pools_.insert(pool);
74  pool->Reset();
75 
76  // Added a new pool, update the quotas for each pool.
77  UpdatePoolQuotas(pool);
78  return pool;
79 }
80 
82  DCHECK(pool != NULL);
83  unique_lock<mutex> l(lock_);
84  DCHECK(pools_.find(pool) != pools_.end());
85  pools_.erase(pool);
86  free_pool_objs_.push_back(pool);
88 }
89 
91  unique_lock<mutex> l(lock_);
92  DCHECK(thread_available_fn_ == NULL || fn == NULL);
93  thread_available_fn_ = fn;
94 }
95 
97  if (pools_.empty()) return;
99  ceil(static_cast<double>(system_threads_quota_) / pools_.size());
100  for (Pools::iterator it = pools_.begin(); it != pools_.end(); ++it) {
101  ResourcePool* pool = *it;
102  if (pool == new_pool) continue;
103  unique_lock<mutex> l(pool->lock_);
104  if (pool->num_available_threads() > 0 && pool->thread_available_fn_ != NULL) {
105  pool->thread_available_fn_(pool);
106  }
107  }
108 }
int system_threads_quota_
'Optimal' number of threads for the entire process.
boost::mutex lock_
Lock for the entire object. Protects all fields below.
void SetThreadAvailableCb(ThreadAvailableCb fn)
boost::function< void(ResourcePool *)> ThreadAvailableCb
void UpdatePoolQuotas(ResourcePool *new_pool=NULL)
int num_available_threads() const
Returns the number of optional threads that can still be used.
void UnregisterPool(ResourcePool *pool)
ObjectPool pool
ThreadResourceMgr(int threads_quota=0)
static int num_cores()
Returns the number of cores (including hyper-threaded) on this machine.
Definition: cpu-info.h:80
DEFINE_int32(periodic_counter_update_period_ms, 500,"Period to update rate counters and"" sampling counters in ms")
std::list< ResourcePool * > free_pool_objs_
Recycled list of pool objects.