19 #include <boost/foreach.hpp>
21 #include <sys/syscall.h>
22 #include <sys/types.h>
34 namespace this_thread = boost::this_thread;
35 using boost::ptr_vector;
36 using namespace rapidjson;
63 void AddThread(
const thread::id& thread,
const string&
name,
const string& category,
68 void RemoveThread(
const thread::id& boost_id,
const string& category);
78 : name_(name), category_(category), thread_id_(thread_id) {
81 const string&
name()
const {
return name_; }
82 const string&
category()
const {
return category_; }
156 DCHECK(metrics != NULL);
157 DCHECK(webserver != NULL);
158 lock_guard<mutex> l(
lock_);
159 metrics_enabled_ =
true;
160 total_threads_metric_ = metrics->
AddGauge<int64_t>(
161 "thread-manager.total-threads-created", 0L);
162 current_num_threads_metric_ = metrics->
AddGauge<int64_t>(
163 "thread-manager.running-threads", 0L);
166 bind<void>(mem_fn(&ThreadMgr::ThreadOverviewUrlCallback),
this, _1, _2);
171 bind<void>(mem_fn(&ThreadMgr::ThreadGroupUrlCallback),
this, _1, _2);
173 overview_callback,
false);
178 void ThreadMgr::AddThread(
const thread::id& thread,
const string&
name,
179 const string& category, int64_t tid) {
180 lock_guard<mutex> l(
lock_);
181 thread_categories_[category][thread] =
ThreadDescriptor(category, name, tid);
182 if (metrics_enabled_) {
183 current_num_threads_metric_->Increment(1L);
184 total_threads_metric_->Increment(1L);
188 void ThreadMgr::RemoveThread(
const thread::id& boost_id,
const string& category) {
189 lock_guard<mutex> l(
lock_);
190 ThreadCategoryMap::iterator category_it = thread_categories_.find(category);
191 DCHECK(category_it != thread_categories_.end());
192 category_it->second.erase(boost_id);
193 if (metrics_enabled_) current_num_threads_metric_->Increment(-1L);
197 Document* document) {
198 lock_guard<mutex> l(
lock_);
199 if (metrics_enabled_) {
200 document->AddMember(
"total_threads", current_num_threads_metric_->value(),
201 document->GetAllocator());
203 Value lst(kArrayType);
204 BOOST_FOREACH(
const ThreadCategoryMap::value_type& category, thread_categories_) {
205 Value val(kObjectType);
206 val.AddMember(
"name", category.first.c_str(), document->GetAllocator());
207 val.AddMember(
"size", category.second.size(), document->GetAllocator());
209 lst.PushBack(val, document->GetAllocator());
211 document->AddMember(
"thread-groups", lst, document->GetAllocator());
215 Document* document) {
216 lock_guard<mutex> l(
lock_);
217 vector<const ThreadCategory*> categories_to_print;
218 Webserver::ArgumentMap::const_iterator category_it = args.find(
"group");
219 string category_name = (category_it == args.end()) ?
"all" : category_it->second;
220 if (category_name !=
"all") {
221 ThreadCategoryMap::const_iterator category =
222 thread_categories_.find(category_name);
223 if (category == thread_categories_.end()) {
226 categories_to_print.push_back(&category->second);
227 Value val(kObjectType);
228 val.AddMember(
"category", category->first.c_str(), document->GetAllocator());
229 val.AddMember(
"size", category->second.size(), document->GetAllocator());
230 document->AddMember(
"thread-group", val, document->GetAllocator());
232 BOOST_FOREACH(
const ThreadCategoryMap::value_type& category, thread_categories_) {
233 categories_to_print.push_back(&category.second);
237 Value lst(kArrayType);
238 BOOST_FOREACH(
const ThreadCategory* category, categories_to_print) {
239 BOOST_FOREACH(
const ThreadCategory::value_type& thread, *category) {
240 Value val(kObjectType);
241 val.AddMember(
"name", thread.second.name().c_str(), document->GetAllocator());
245 LOG_EVERY_N(INFO, 100) <<
"Could not get per-thread statistics: "
248 val.AddMember(
"user_ns", static_cast<double>(stats.
user_ns) / 1e9,
249 document->GetAllocator());
250 val.AddMember(
"kernel_ns", static_cast<double>(stats.
kernel_ns) / 1e9,
251 document->GetAllocator());
252 val.AddMember(
"iowait_ns", static_cast<double>(stats.
iowait_ns) / 1e9,
253 document->GetAllocator());
255 lst.PushBack(val, document->GetAllocator());
258 document->AddMember(
"threads", lst, document->GetAllocator());
272 <<
"Thread created before InitThreading called";
273 DCHECK(tid_ == UNINITIALISED_THREAD_ID) <<
"StartThread called twice";
277 new thread(&Thread::SuperviseThread, name_, category_, functor, &thread_started));
283 tid_ = thread_started.
Get();
285 VLOG(2) <<
"Started thread " << tid_ <<
" - " << category_ <<
":" << name_;
288 void Thread::SuperviseThread(
const string&
name,
const string& category,
290 int64_t system_tid = syscall(SYS_gettid);
291 if (system_tid == -1) {
293 LOG_EVERY_N(INFO, 100) <<
"Could not determine thread ID: " << error_msg;
296 string category_copy = category;
299 ss << (name.empty() ?
"thread" :
name) <<
"-" << system_tid;
300 string name_copy = ss.str();
302 if (category_copy.empty()) category_copy =
"no-category";
307 thread_mgr_ref->AddThread(this_thread::get_id(), name_copy, category_copy, system_tid);
308 thread_started->
Set(system_tid);
315 thread_mgr_ref->RemoveThread(this_thread::get_id(), category_copy);
319 threads_.push_back(thread);
320 if (!cgroup_path_.empty()) {
321 DCHECK(cgroups_mgr_ != NULL);
322 RETURN_IF_ERROR(cgroups_mgr_->AssignThreadToCgroup(*thread, cgroup_path_));
327 void ThreadGroup::JoinAll() {
328 BOOST_FOREACH(
const Thread& thread, threads_) {
333 Status ThreadGroup::SetCgroup(
const string& cgroup) {
334 DCHECK_NOTNULL(cgroups_mgr_);
335 cgroup_path_ = cgroup;
337 for (ptr_vector<Thread>::const_iterator it = threads_.begin();
338 it != threads_.end(); ++it) {
static const string THREADS_TEMPLATE
IntGauge * total_threads_metric_
const std::string GetDetail() const
ThreadCategoryMap thread_categories_
TODO: Consider allowing fragment IDs as category parameters.
boost::function< void(const ArgumentMap &args, rapidjson::Document *json)> UrlCallback
const string & category() const
#define RETURN_IF_ERROR(stmt)
some generally useful macros
static const string THREADS_WEB_PAGE
MetricGroups may be organised hierarchically as a tree.
void RegisterUrlCallback(const std::string &path, const std::string &template_filename, const UrlCallback &callback, bool is_on_nav_bar=true)
Only one callback may be registered per URL.
const string & name() const
std::map< std::string, std::string > ArgumentMap
int64_t thread_id() const
map< string, ThreadCategory > ThreadCategoryMap
Status StartThreadInstrumentation(MetricGroup *metrics, Webserver *webserver)
ThreadDescriptor(const string &category, const string &name, int64_t thread_id)
void InitThreading()
Initialises the threading subsystem. Must be called before a Thread is created.
Container struct for statistics read from the /proc filesystem for a thread.
map< const thread::id, ThreadDescriptor > ThreadCategory
SimpleMetric< T > * AddGauge(const std::string &key, const T &value, const TUnit::type unit=TUnit::NONE, const std::string &description="")
Create a gauge metric object with given key and initial value (owned by this object) ...
IntGauge * current_num_threads_metric_
Status GetThreadStats(int64_t tid, ThreadStats *stats)
shared_ptr< ThreadMgr > thread_manager
boost::function< void()> ThreadFunctor
Function object that wraps the user-supplied function to run in a separate thread.