Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
cgroups-mgr.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "util/cgroups-mgr.h"
16 
17 #include <fstream>
18 #include <sstream>
19 #include <boost/filesystem.hpp>
20 #include "util/debug-util.h"
21 #include <gutil/strings/substitute.h>
22 
23 #include "common/names.h"
24 
25 using boost::filesystem::create_directory;
26 using boost::filesystem::exists;
27 using boost::filesystem::remove;
28 using namespace impala;
29 using namespace strings;
30 
31 namespace impala {
32 
// Suffix appended to a Yarn resource id to form the name of the corresponding
// Impala-internal cgroup.
const std::string IMPALA_CGROUP_SUFFIX("_impala");

// Yarn's default multiplier for translating virtual CPU cores into cgroup CPU shares.
// See Yarn's CgroupsLCEResourcesHandler.java for more details.
const int32_t CPU_DEFAULT_WEIGHT = 1024;
39 
41  active_cgroups_metric_ =
42  metrics->AddCounter<int64_t>("cgroups-mgr.active-cgroups", 0);
43 }
44 
45 Status CgroupsMgr::Init(const string& cgroups_hierarchy_path,
46  const string& staging_cgroup) {
47  cgroups_hierarchy_path_ = cgroups_hierarchy_path;
48  staging_cgroup_ = staging_cgroup;
49  // Set up the staging cgroup for Impala to retire execution threads into.
50  RETURN_IF_ERROR(CreateCgroup(staging_cgroup, true));
51  return Status::OK;
52 }
53 
54 string CgroupsMgr::UniqueIdToCgroup(const string& unique_id) const {
55  if (unique_id.empty()) return "";
56  return unique_id + IMPALA_CGROUP_SUFFIX;
57 }
58 
59 int32_t CgroupsMgr::VirtualCoresToCpuShares(int16_t v_cpu_cores) {
60  if (v_cpu_cores <= 0) return -1;
61  return CPU_DEFAULT_WEIGHT * v_cpu_cores;
62 }
63 
64 Status CgroupsMgr::CreateCgroup(const string& cgroup, bool if_not_exists) const {
65  const string& cgroup_path = Substitute("$0/$1", cgroups_hierarchy_path_, cgroup);
66  try {
67  // Returns false if the dir already exists, otherwise throws an exception.
68  if (!create_directory(cgroup_path) && !if_not_exists) {
69  stringstream err_msg;
70  err_msg << "Failed to create CGroup at path " << cgroup_path
71  << ". Path already exists.";
72  return Status(err_msg.str());
73  }
74  LOG(INFO) << "Created CGroup " << cgroup_path;
75  } catch (std::exception& e) {
76  stringstream err_msg;
77  err_msg << "Failed to create CGroup at path " << cgroup_path << ". " << e.what();
78  return Status(err_msg.str());
79  }
80  return Status::OK;
81 }
82 
83 Status CgroupsMgr::DropCgroup(const string& cgroup, bool if_exists) const {
84  const string& cgroup_path = Substitute("$0/$1", cgroups_hierarchy_path_, cgroup);
85  LOG(INFO) << "Dropping CGroup " << cgroups_hierarchy_path_ << " " << cgroup;
86  try {
87  if(!remove(cgroup_path) && !if_exists) {
88  stringstream err_msg;
89  err_msg << "Failed to create CGroup at path " << cgroup_path
90  << ". Path does not exist.";
91  return Status(err_msg.str());
92  }
93  } catch (std::exception& e) {
94  stringstream err_msg;
95  err_msg << "Failed to drop CGroup at path " << cgroup_path << ". " << e.what();
96  return Status(err_msg.str());
97  }
98  return Status::OK;
99 }
100 
101 Status CgroupsMgr::SetCpuShares(const string& cgroup, int32_t num_shares) {
102  string cgroup_path;
103  string tasks_path;
104  RETURN_IF_ERROR(GetCgroupPaths(cgroup, &cgroup_path, &tasks_path));
105 
106  const string& cpu_shares_path = Substitute("$0/$1", cgroup_path, "cpu.shares");
107  ofstream cpu_shares(tasks_path.c_str(), ios::out | ios::trunc);
108  if (!cpu_shares.is_open()) {
109  stringstream err_msg;
110  err_msg << "CGroup CPU shares file: " << cpu_shares_path
111  << " is not writable by Impala";
112  return Status(err_msg.str());
113  }
114 
115  LOG(INFO) << "Setting CPU shares of CGroup " << cgroup_path << " to " << num_shares;
116  cpu_shares << num_shares << endl;
117  return Status::OK;
118 }
119 
120 Status CgroupsMgr::GetCgroupPaths(const std::string& cgroup,
121  std::string* cgroup_path, std::string* tasks_path) const {
122  stringstream cgroup_path_ss;
123  cgroup_path_ss << cgroups_hierarchy_path_ << "/" << cgroup;
124  *cgroup_path = cgroup_path_ss.str();
125  if (!exists(*cgroup_path)) {
126  stringstream err_msg;
127  err_msg << "CGroup " << *cgroup_path << " does not exist";
128  return Status(err_msg.str());
129  }
130 
131  stringstream tasks_path_ss;
132  tasks_path_ss << *cgroup_path << "/tasks";
133  *tasks_path = tasks_path_ss.str();
134  if (!exists(*tasks_path)) {
135  stringstream err_msg;
136  err_msg << "CGroup " << *cgroup_path << " does not have a /tasks file";
137  return Status(err_msg.str());
138  }
139  return Status::OK;
140 }
141 
143  const string& cgroup) const {
144  string cgroup_path;
145  string tasks_path;
146  RETURN_IF_ERROR(GetCgroupPaths(cgroup, &cgroup_path, &tasks_path));
147 
148  ofstream tasks(tasks_path.c_str(), ios::out | ios::app);
149  if (!tasks.is_open()) {
150  stringstream err_msg;
151  err_msg << "CGroup tasks file: " << tasks_path << " is not writable by Impala";
152  return Status(err_msg.str());
153  }
154  tasks << thread.tid() << endl;
155 
156  VLOG_ROW << "Thread " << thread.tid() << " moved to CGroup " << cgroup_path;
157  tasks.close();
158  return Status::OK;
159 }
160 
161 Status CgroupsMgr::RelocateThreads(const string& src_cgroup,
162  const string& dst_cgroup) const {
163  string src_cgroup_path;
164  string src_tasks_path;
165  RETURN_IF_ERROR(GetCgroupPaths(src_cgroup, &src_cgroup_path, &src_tasks_path));
166 
167  string dst_cgroup_path;
168  string dst_tasks_path;
169  RETURN_IF_ERROR(GetCgroupPaths(dst_cgroup, &dst_cgroup_path, &dst_tasks_path));
170 
171  ifstream src_tasks(src_tasks_path.c_str());
172  if (!src_tasks) {
173  stringstream err_msg;
174  err_msg << "Failed to open source CGroup tasks file at: " << src_tasks_path;
175  return Status(err_msg.str());
176  }
177 
178  ofstream dst_tasks(dst_tasks_path.c_str(), ios::out | ios::app);
179  if (!dst_tasks) {
180  stringstream err_msg;
181  err_msg << "Failed to open destination CGroup tasks file at: " << dst_tasks_path;
182  return Status(err_msg.str());
183  }
184 
185  int32_t tid;
186  while (src_tasks >> tid) {
187  dst_tasks << tid << endl;
188  // Attempting to write a non-existent tid/pid will result in an error,
189  // so clear the error flags after every append.
190  dst_tasks.clear();
191  VLOG_ROW << "Relocating thread id " << tid << " from " << src_tasks_path
192  << " to " << dst_tasks_path;
193  }
194 
195  return Status::OK;
196 }
197 
198 Status CgroupsMgr::RegisterFragment(const TUniqueId& fragment_instance_id,
199  const string& cgroup, bool* is_first) {
200  if (cgroup.empty() || cgroups_hierarchy_path_.empty()) return Status::OK;
201 
202  LOG(INFO) << "Registering fragment " << PrintId(fragment_instance_id)
203  << " with CGroup " << cgroups_hierarchy_path_ << "/" << cgroup;
204  lock_guard<mutex> l(active_cgroups_lock_);
205  if (++active_cgroups_[cgroup] == 1) {
206  *is_first = true;
207  RETURN_IF_ERROR(CreateCgroup(cgroup, false));
208  active_cgroups_metric_->Increment(1);
209  } else {
210  *is_first = false;
211  }
212  return Status::OK;
213 }
214 
215 Status CgroupsMgr::UnregisterFragment(const TUniqueId& fragment_instance_id,
216  const string& cgroup) {
217  if (cgroup.empty() || cgroups_hierarchy_path_.empty()) return Status::OK;
218 
219  LOG(INFO) << "Unregistering fragment " << PrintId(fragment_instance_id)
220  << " from CGroup " << cgroups_hierarchy_path_ << "/" << cgroup;
221  lock_guard<mutex> l(active_cgroups_lock_);
222  unordered_map<string, int32_t>::iterator entry = active_cgroups_.find(cgroup);
223  DCHECK(entry != active_cgroups_.end());
224 
225  int32_t* ref_count = &entry->second;
226  --(*ref_count);
227  if (*ref_count == 0) {
228  RETURN_IF_ERROR(RelocateThreads(cgroup, staging_cgroup_));
229  RETURN_IF_ERROR(DropCgroup(cgroup, false));
230  active_cgroups_metric_->Increment(-1);
231  active_cgroups_.erase(entry);
232  }
233  return Status::OK;
234 }
235 
236 }
SimpleMetric< T, TMetricKind::COUNTER > * AddCounter(const std::string &key, const T &value, const TUnit::type unit=TUnit::UNIT, const std::string &description="")
Definition: metrics.h:239
int32_t VirtualCoresToCpuShares(int16_t v_cpu_cores)
Definition: cgroups-mgr.cc:59
Status CreateCgroup(const std::string &cgroup, bool if_not_exists) const
Definition: cgroups-mgr.cc:64
Status DropCgroup(const std::string &cgroup, bool if_exists) const
Definition: cgroups-mgr.cc:83
Status RegisterFragment(const TUniqueId &fragment_instance_id, const std::string &cgroup, bool *is_first)
Definition: cgroups-mgr.cc:198
TODO: Consider allowing fragment IDs as category parameters.
Definition: thread.h:45
std::string UniqueIdToCgroup(const std::string &unique_id) const
Definition: cgroups-mgr.cc:54
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
const int32_t CPU_DEFAULT_WEIGHT
Definition: cgroups-mgr.cc:38
Status SetCpuShares(const std::string &cgroup, int32_t num_shares)
Definition: cgroups-mgr.cc:101
string PrintId(const TUniqueId &id, const string &separator)
Definition: debug-util.cc:97
CgroupsMgr(MetricGroup *metrics)
Definition: cgroups-mgr.cc:40
Status Init(const std::string &cgroups_hierarchy_path, const std::string &staging_cgroup)
Definition: cgroups-mgr.cc:45
const std::string IMPALA_CGROUP_SUFFIX
Definition: cgroups-mgr.cc:34
Status AssignThreadToCgroup(const Thread &thread, const std::string &cgroup) const
Definition: cgroups-mgr.cc:142
#define VLOG_ROW
Definition: logging.h:59
int64_t tid() const
Definition: thread.h:106
Status RelocateThreads(const std::string &src_cgroup, const std::string &dst_cgroup) const
Definition: cgroups-mgr.cc:161
static const Status OK
Definition: status.h:87
Status GetCgroupPaths(const std::string &cgroup, std::string *cgroup_path, std::string *tasks_path) const
Definition: cgroups-mgr.cc:120
Status UnregisterFragment(const TUniqueId &fragment_instance_id, const std::string &cgroup)
Definition: cgroups-mgr.cc:215