Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
thread.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "util/thread.h"
16 
17 #include <set>
18 #include <map>
19 #include <boost/foreach.hpp>
20 #include <unistd.h>
21 #include <sys/syscall.h>
22 #include <sys/types.h>
23 
24 #include "util/debug-util.h"
25 #include "util/error-util.h"
26 #include "util/cgroups-mgr.h"
27 #include "util/metrics.h"
28 #include "util/webserver.h"
29 #include "util/url-coding.h"
30 #include "util/os-util.h"
31 
32 #include "common/names.h"
33 
34 namespace this_thread = boost::this_thread;
35 using boost::ptr_vector;
36 using namespace rapidjson;
37 
38 namespace impala {
39 
40 static const string THREADS_WEB_PAGE = "/threadz";
41 static const string THREADS_TEMPLATE = "threadz.tmpl";
42 
43 class ThreadMgr;
44 
45 // Singleton instance of ThreadMgr. Only visible in this file, used only by Thread.
46 // The Thread class adds a reference to thread_manager while it is supervising a thread so
47 // that a race between the end of the process's main thread (and therefore the destruction
48 // of thread_manager) and the end of a thread that tries to remove itself from the
49 // manager after the destruction can be avoided.
50 shared_ptr<ThreadMgr> thread_manager;
51 
52 // A singleton class that tracks all live threads, and groups them together for easy
53 // auditing. Used only by Thread.
54 class ThreadMgr {
55  public:
56  ThreadMgr() : metrics_enabled_(false) { }
57 
58  Status StartInstrumentation(MetricGroup* metrics, Webserver* webserver);
59 
60  // Registers a thread to the supplied category. The key is a boost::thread::id, used
61  // instead of the system TID since boost::thread::id is always available, unlike
62  // gettid() which might fail.
63  void AddThread(const thread::id& thread, const string& name, const string& category,
64  int64_t tid);
65 
66  // Removes a thread from the supplied category. If the thread has
67  // already been removed, this is a no-op.
68  void RemoveThread(const thread::id& boost_id, const string& category);
69 
70  private:
71  // Container class for any details we want to capture about a thread
72  // TODO: Add start-time.
73  // TODO: Track fragment ID.
75  public:
77  ThreadDescriptor(const string& category, const string& name, int64_t thread_id)
78  : name_(name), category_(category), thread_id_(thread_id) {
79  }
80 
81  const string& name() const { return name_; }
82  const string& category() const { return category_; }
83  int64_t thread_id() const { return thread_id_; }
84 
85  private:
86  string name_;
87  string category_;
88  int64_t thread_id_;
89  };
90 
91  // A ThreadCategory is a set of threads that are logically related.
92  // TODO: unordered_map is incompatible with boost::thread::id, but would be more
93  // efficient here.
94  typedef map<const thread::id, ThreadDescriptor> ThreadCategory;
95 
96  // All thread categorys, keyed on the category name.
97  typedef map<string, ThreadCategory> ThreadCategoryMap;
98 
99  // Protects thread_categories_ and metrics_enabled_
100  mutex lock_;
101 
102  // All thread categorys that ever contained a thread, even if empty
104 
105  // True after StartInstrumentation(..) returns
107 
108  // Metrics to track all-time total number of threads, and the
109  // current number of running threads.
112 
113  // Webpage callbacks; print all threads by category
114  // Example output:
115  // "total_threads": 144,
116  // "thread-groups": [
117  // {
118  // "name": "common",
119  // "size": 1
120  // },
121  // {
122  // "name": "disk-io-mgr",
123  // "size": 2
124  // },
125  // {
126  // "name": "hdfs-worker-pool",
127  // "size": 16
128  // },
129  // ... etc ...
130  // ]
131  void ThreadGroupUrlCallback(const Webserver::ArgumentMap& args, Document* output);
132 
133  // Example output:
134  // "thread-group": {
135  // "category": "disk-io-mgr",
136  // "size": 2
137  // },
138  // "threads": [
139  // {
140  // "name": "work-loop(Disk: 0, Thread: 0)-17049",
141  // "user_ns": 0,
142  // "kernel_ns": 0,
143  // "iowait_ns": 0
144  // },
145  // {
146  // "name": "work-loop(Disk: 1, Thread: 0)-17050",
147  // "user_ns": 0,
148  // "kernel_ns": 0,
149  // "iowait_ns": 0
150  // }
151  // ]
152  void ThreadOverviewUrlCallback(const Webserver::ArgumentMap& args, Document* document);
153 };
154 
155 Status ThreadMgr::StartInstrumentation(MetricGroup* metrics, Webserver* webserver) {
156  DCHECK(metrics != NULL);
157  DCHECK(webserver != NULL);
158  lock_guard<mutex> l(lock_);
159  metrics_enabled_ = true;
160  total_threads_metric_ = metrics->AddGauge<int64_t>(
161  "thread-manager.total-threads-created", 0L);
162  current_num_threads_metric_ = metrics->AddGauge<int64_t>(
163  "thread-manager.running-threads", 0L);
164 
165  Webserver::UrlCallback template_callback =
166  bind<void>(mem_fn(&ThreadMgr::ThreadOverviewUrlCallback), this, _1, _2);
168  template_callback);
169 
170  Webserver::UrlCallback overview_callback =
171  bind<void>(mem_fn(&ThreadMgr::ThreadGroupUrlCallback), this, _1, _2);
172  webserver->RegisterUrlCallback("/thread-group", "thread-group.tmpl",
173  overview_callback, false);
174 
175  return Status::OK;
176 }
177 
178 void ThreadMgr::AddThread(const thread::id& thread, const string& name,
179  const string& category, int64_t tid) {
180  lock_guard<mutex> l(lock_);
181  thread_categories_[category][thread] = ThreadDescriptor(category, name, tid);
182  if (metrics_enabled_) {
183  current_num_threads_metric_->Increment(1L);
184  total_threads_metric_->Increment(1L);
185  }
186 }
187 
188 void ThreadMgr::RemoveThread(const thread::id& boost_id, const string& category) {
189  lock_guard<mutex> l(lock_);
190  ThreadCategoryMap::iterator category_it = thread_categories_.find(category);
191  DCHECK(category_it != thread_categories_.end());
192  category_it->second.erase(boost_id);
193  if (metrics_enabled_) current_num_threads_metric_->Increment(-1L);
194 }
195 
196 void ThreadMgr::ThreadOverviewUrlCallback(const Webserver::ArgumentMap& args,
197  Document* document) {
198  lock_guard<mutex> l(lock_);
199  if (metrics_enabled_) {
200  document->AddMember("total_threads", current_num_threads_metric_->value(),
201  document->GetAllocator());
202  }
203  Value lst(kArrayType);
204  BOOST_FOREACH(const ThreadCategoryMap::value_type& category, thread_categories_) {
205  Value val(kObjectType);
206  val.AddMember("name", category.first.c_str(), document->GetAllocator());
207  val.AddMember("size", category.second.size(), document->GetAllocator());
208  // TODO: URLEncode() name?
209  lst.PushBack(val, document->GetAllocator());
210  }
211  document->AddMember("thread-groups", lst, document->GetAllocator());
212 }
213 
214 void ThreadMgr::ThreadGroupUrlCallback(const Webserver::ArgumentMap& args,
215  Document* document) {
216  lock_guard<mutex> l(lock_);
217  vector<const ThreadCategory*> categories_to_print;
218  Webserver::ArgumentMap::const_iterator category_it = args.find("group");
219  string category_name = (category_it == args.end()) ? "all" : category_it->second;
220  if (category_name != "all") {
221  ThreadCategoryMap::const_iterator category =
222  thread_categories_.find(category_name);
223  if (category == thread_categories_.end()) {
224  return;
225  }
226  categories_to_print.push_back(&category->second);
227  Value val(kObjectType);
228  val.AddMember("category", category->first.c_str(), document->GetAllocator());
229  val.AddMember("size", category->second.size(), document->GetAllocator());
230  document->AddMember("thread-group", val, document->GetAllocator());
231  } else {
232  BOOST_FOREACH(const ThreadCategoryMap::value_type& category, thread_categories_) {
233  categories_to_print.push_back(&category.second);
234  }
235  }
236 
237  Value lst(kArrayType);
238  BOOST_FOREACH(const ThreadCategory* category, categories_to_print) {
239  BOOST_FOREACH(const ThreadCategory::value_type& thread, *category) {
240  Value val(kObjectType);
241  val.AddMember("name", thread.second.name().c_str(), document->GetAllocator());
242  ThreadStats stats;
243  Status status = GetThreadStats(thread.second.thread_id(), &stats);
244  if (!status.ok()) {
245  LOG_EVERY_N(INFO, 100) << "Could not get per-thread statistics: "
246  << status.GetDetail();
247  } else {
248  val.AddMember("user_ns", static_cast<double>(stats.user_ns) / 1e9,
249  document->GetAllocator());
250  val.AddMember("kernel_ns", static_cast<double>(stats.kernel_ns) / 1e9,
251  document->GetAllocator());
252  val.AddMember("iowait_ns", static_cast<double>(stats.iowait_ns) / 1e9,
253  document->GetAllocator());
254  }
255  lst.PushBack(val, document->GetAllocator());
256  }
257  }
258  document->AddMember("threads", lst, document->GetAllocator());
259 }
260 
262  DCHECK(thread_manager.get() == NULL);
263  thread_manager.reset(new ThreadMgr());
264 }
265 
267  return thread_manager->StartInstrumentation(metrics, webserver);
268 }
269 
270 void Thread::StartThread(const ThreadFunctor& functor) {
271  DCHECK(thread_manager.get() != NULL)
272  << "Thread created before InitThreading called";
273  DCHECK(tid_ == UNINITIALISED_THREAD_ID) << "StartThread called twice";
274 
275  Promise<int64_t> thread_started;
276  thread_.reset(
277  new thread(&Thread::SuperviseThread, name_, category_, functor, &thread_started));
278 
279  // TODO: This slows down thread creation although not enormously. To make this faster,
280  // consider delaying thread_started.Get() until the first call to tid(), but bear in
281  // mind that some coordination is required between SuperviseThread() and this to make
282  // sure that the thread is still available to have its tid set.
283  tid_ = thread_started.Get();
284 
285  VLOG(2) << "Started thread " << tid_ << " - " << category_ << ":" << name_;
286 }
287 
288 void Thread::SuperviseThread(const string& name, const string& category,
289  Thread::ThreadFunctor functor, Promise<int64_t>* thread_started) {
290  int64_t system_tid = syscall(SYS_gettid);
291  if (system_tid == -1) {
292  string error_msg = GetStrErrMsg();
293  LOG_EVERY_N(INFO, 100) << "Could not determine thread ID: " << error_msg;
294  }
295  // Make a copy, since we want to refer to these variables after the unsafe point below.
296  string category_copy = category;
297  shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager;
298  stringstream ss;
299  ss << (name.empty() ? "thread" : name) << "-" << system_tid;
300  string name_copy = ss.str();
301 
302  if (category_copy.empty()) category_copy = "no-category";
303 
304  // Use boost's get_id rather than the system thread ID as the unique key for this thread
305  // since the latter is more prone to being recycled.
306 
307  thread_mgr_ref->AddThread(this_thread::get_id(), name_copy, category_copy, system_tid);
308  thread_started->Set(system_tid);
309 
310  // Any reference to any parameter not copied in by value may no longer be valid after
311  // this point, since the caller that is waiting on *tid != 0 may wake, take the lock and
312  // destroy the enclosing Thread object.
313 
314  functor();
315  thread_mgr_ref->RemoveThread(this_thread::get_id(), category_copy);
316 }
317 
318 Status ThreadGroup::AddThread(Thread* thread) {
319  threads_.push_back(thread);
320  if (!cgroup_path_.empty()) {
321  DCHECK(cgroups_mgr_ != NULL);
322  RETURN_IF_ERROR(cgroups_mgr_->AssignThreadToCgroup(*thread, cgroup_path_));
323  }
324  return Status::OK;
325 }
326 
327 void ThreadGroup::JoinAll() {
328  BOOST_FOREACH(const Thread& thread, threads_) {
329  thread.Join();
330  }
331 }
332 
333 Status ThreadGroup::SetCgroup(const string& cgroup) {
334  DCHECK_NOTNULL(cgroups_mgr_);
335  cgroup_path_ = cgroup;
336  // BOOST_FOREACH + ptr_vector + const are not compatible
337  for (ptr_vector<Thread>::const_iterator it = threads_.begin();
338  it != threads_.end(); ++it) {
339  RETURN_IF_ERROR(cgroups_mgr_->AssignThreadToCgroup(*it, cgroup));
340  }
341  return Status::OK;
342 }
343 
344 }
static const string THREADS_TEMPLATE
Definition: thread.cc:41
IntGauge * total_threads_metric_
Definition: thread.cc:110
int64_t kernel_ns
Definition: os-util.h:29
const std::string GetDetail() const
Definition: status.cc:184
ThreadCategoryMap thread_categories_
Definition: thread.cc:103
TODO: Consider allowing fragment IDs as category parameters.
Definition: thread.h:45
void Set(const T &val)
Definition: promise.h:38
boost::function< void(const ArgumentMap &args, rapidjson::Document *json)> UrlCallback
Definition: webserver.h:38
const string & category() const
Definition: thread.cc:82
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
static const string THREADS_WEB_PAGE
Definition: thread.cc:40
bool metrics_enabled_
Definition: thread.cc:106
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
void RegisterUrlCallback(const std::string &path, const std::string &template_filename, const UrlCallback &callback, bool is_on_nav_bar=true)
Only one callback may be registered per URL.
Definition: webserver.cc:412
const string & name() const
Definition: thread.cc:81
int64_t user_ns
Definition: os-util.h:28
std::map< std::string, std::string > ArgumentMap
Definition: webserver.h:36
map< string, ThreadCategory > ThreadCategoryMap
Definition: thread.cc:97
Status StartThreadInstrumentation(MetricGroup *metrics, Webserver *webserver)
Definition: thread.cc:266
ThreadDescriptor(const string &category, const string &name, int64_t thread_id)
Definition: thread.cc:77
void InitThreading()
Initialises the threading subsystem. Must be called before a Thread is created.
Definition: thread.cc:261
string GetStrErrMsg()
Definition: error-util.cc:30
mutex lock_
const T & Get()
Definition: promise.h:59
Container struct for statistics read from the /proc filesystem for a thread.
Definition: os-util.h:27
map< const thread::id, ThreadDescriptor > ThreadCategory
Definition: thread.cc:94
SimpleMetric< T > * AddGauge(const std::string &key, const T &value, const TUnit::type unit=TUnit::NONE, const std::string &description="")
Create a gauge metric object with given key and initial value (owned by this object) ...
Definition: metrics.h:223
IntGauge * current_num_threads_metric_
Definition: thread.cc:111
Status GetThreadStats(int64_t tid, ThreadStats *stats)
Definition: os-util.cc:52
shared_ptr< ThreadMgr > thread_manager
Definition: thread.cc:43
bool ok() const
Definition: status.h:172
void Join() const
Definition: thread.h:102
string name
Definition: cpu-info.cc:50
int64_t iowait_ns
Definition: os-util.h:30
boost::function< void()> ThreadFunctor
Function object that wraps the user-supplied function to run in a separate thread.
Definition: thread.h:117