Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
thread.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef IMPALA_UTIL_THREAD_H
16 #define IMPALA_UTIL_THREAD_H
17 
18 #include <boost/bind.hpp>
19 #include <boost/function.hpp>
20 #include <boost/scoped_ptr.hpp>
21 #include <boost/thread.hpp>
22 #include <boost/ptr_container/ptr_vector.hpp>
23 
24 #include "common/status.h"
25 #include "util/promise.h"
26 
27 namespace impala {
28 
29 class MetricGroup;
30 class Webserver;
31 class CgroupsMgr;
32 
39 //
43 //
45 class Thread {
46  public:
51  //
60  template <class F>
61  Thread(const std::string& category, const std::string& name, const F& f)
62  : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
63  StartThread(f);
64  }
65 
66  template <class F, class A1>
67  Thread(const std::string& category, const std::string& name, const F& f, const A1& a1)
68  : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
69  StartThread(boost::bind(f, a1));
70  }
71 
72  template <class F, class A1, class A2>
73  Thread(const std::string& category, const std::string& name, const F& f,
74  const A1& a1, const A2& a2)
75  : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
76  StartThread(boost::bind(f, a1, a2));
77  }
78 
79  template <class F, class A1, class A2, class A3>
80  Thread(const std::string& category, const std::string& name, const F& f,
81  const A1& a1, const A2& a2, const A3& a3)
82  : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
83  StartThread(boost::bind(f, a1, a2, a3));
84  }
85 
86  template <class F, class A1, class A2, class A3, class A4>
87  Thread(const std::string& category, const std::string& name, const F& f,
88  const A1& a1, const A2& a2, const A3& a3, const A4& a4)
89  : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
90  StartThread(boost::bind(f, a1, a2, a3, a4));
91  }
92 
93  template <class F, class A1, class A2, class A3, class A4, class A5>
94  Thread(const std::string& category, const std::string& name, const F& f,
95  const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5)
96  : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
97  StartThread(boost::bind(f, a1, a2, a3, a4, a5));
98  }
99 
102  void Join() const { thread_->join(); }
103 
106  int64_t tid() const { return tid_; }
107 
108  static const int64_t INVALID_THREAD_ID = -1;
109 
110  private:
114  static const int64_t UNINITIALISED_THREAD_ID = -2;
115 
117  typedef boost::function<void ()> ThreadFunctor;
118 
120  boost::scoped_ptr<boost::thread> thread_;
121 
123  const std::string category_;
124  const std::string name_;
125 
129  int64_t tid_;
130 
134  void StartThread(const ThreadFunctor& functor);
135 
139  //
151  //
155  static void SuperviseThread(const std::string& name, const std::string& category,
156  ThreadFunctor functor, Promise<int64_t>* thread_started);
157 };
158 
161 class ThreadGroup {
162  public:
164 
165  ThreadGroup(CgroupsMgr* cgroups_mgr, const std::string& cgroup)
166  : cgroups_mgr_(cgroups_mgr), cgroup_path_(cgroup) { }
167 
174  Status AddThread(Thread* thread);
175 
178  void JoinAll();
179 
184  Status SetCgroup(const std::string& cgroup);
185 
186  void SetCgroupsMgr(CgroupsMgr* cgroups_mgr) { cgroups_mgr_ = cgroups_mgr; }
187 
188  private:
190  boost::ptr_vector<Thread> threads_;
191 
194 
197  std::string cgroup_path_;
198 };
199 
/// Initialises the threading subsystem. Must be called before a Thread is created.
201 void InitThreading();
202 
206 }
207 
208 #endif
boost::ptr_vector< Thread > threads_
All the threads grouped by this set.
Definition: thread.h:190
const std::string name_
Definition: thread.h:124
ThreadGroup(CgroupsMgr *cgroups_mgr, const std::string &cgroup)
Definition: thread.h:165
Thread(const std::string &category, const std::string &name, const F &f)
Definition: thread.h:61
Status SetCgroup(const std::string &cgroup)
Definition: thread.cc:333
Thread(const std::string &category, const std::string &name, const F &f, const A1 &a1, const A2 &a2)
Definition: thread.h:73
TODO: Consider allowing fragment IDs as category parameters.
Definition: thread.h:45
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
static void SuperviseThread(const std::string &name, const std::string &category, ThreadFunctor functor, Promise< int64_t > *thread_started)
Definition: thread.cc:288
int64_t tid_
Definition: thread.h:129
static const int64_t UNINITIALISED_THREAD_ID
Definition: thread.h:114
Thread(const std::string &category, const std::string &name, const F &f, const A1 &a1, const A2 &a2, const A3 &a3, const A4 &a4, const A5 &a5)
Definition: thread.h:94
void StartThread(const ThreadFunctor &functor)
Definition: thread.cc:270
Status StartThreadInstrumentation(MetricGroup *metrics, Webserver *webserver)
Definition: thread.cc:266
static const int64_t INVALID_THREAD_ID
Definition: thread.h:108
const std::string category_
Name and category for this thread.
Definition: thread.h:123
void InitThreading()
Initialises the threading subsystem. Must be called before a Thread is created.
Definition: thread.cc:261
Thread(const std::string &category, const std::string &name, const F &f, const A1 &a1)
Definition: thread.h:67
boost::scoped_ptr< boost::thread > thread_
The actual thread object that runs the user's method via SuperviseThread().
Definition: thread.h:120
int64_t tid() const
Definition: thread.h:106
void SetCgroupsMgr(CgroupsMgr *cgroups_mgr)
Definition: thread.h:186
Status AddThread(Thread *thread)
Definition: thread.cc:318
CgroupsMgr * cgroups_mgr_
Cgroups manager for assigning threads in this group to cgroups. Not owned.
Definition: thread.h:193
void Join() const
Definition: thread.h:102
string name
Definition: cpu-info.cc:50
std::string cgroup_path_
Definition: thread.h:197
Thread(const std::string &category, const std::string &name, const F &f, const A1 &a1, const A2 &a2, const A3 &a3, const A4 &a4)
Definition: thread.h:87
Thread(const std::string &category, const std::string &name, const F &f, const A1 &a1, const A2 &a2, const A3 &a3)
Definition: thread.h:80
boost::function< void()> ThreadFunctor
Function object that wraps the user-supplied function to run in a separate thread.
Definition: thread.h:117