19 #include <boost/filesystem.hpp>
21 #include <gutil/strings/substitute.h>
25 using boost::filesystem::create_directory;
26 using boost::filesystem::exists;
27 using boost::filesystem::remove;
28 using namespace impala;
29 using namespace strings;
// Constructor body (CgroupsMgr(MetricGroup* metrics)): registers the
// "cgroups-mgr.active-cgroups" counter with an initial value of 0 and keeps the
// returned metric so RegisterFragment/UnregisterFragment can adjust it.
41 active_cgroups_metric_ =
42 metrics->
AddCounter<int64_t>(
"cgroups-mgr.active-cgroups", 0);
46 const string& staging_cgroup) {
// Init(): stores the cgroups hierarchy root (the prefix of every cgroup path
// this class builds, "<hierarchy>/<cgroup>") and the staging cgroup name.
47 cgroups_hierarchy_path_ = cgroups_hierarchy_path;
48 staging_cgroup_ = staging_cgroup;
// UniqueIdToCgroup: an empty unique id maps to an empty cgroup name.
// RegisterFragment/UnregisterFragment treat an empty cgroup name as a no-op.
55 if (unique_id.empty())
return "";
// VirtualCoresToCpuShares: a non-positive virtual core count has no share
// value; -1 is returned as a sentinel for that case.
60 if (v_cpu_cores <= 0)
return -1;
// CreateCgroup: the cgroup is a directory directly under the hierarchy root.
65 const string& cgroup_path = Substitute(
"$0/$1", cgroups_hierarchy_path_, cgroup);
// create_directory() returns false when the directory already exists; that is
// only reported as an error when the caller did not pass if_not_exists.
68 if (!create_directory(cgroup_path) && !if_not_exists) {
70 err_msg <<
"Failed to create CGroup at path " << cgroup_path
71 <<
". Path already exists.";
72 return Status(err_msg.str());
// NOTE(review): this log appears to run even when the directory already existed
// and if_not_exists was set, so "Created" may be inaccurate — confirm.
74 LOG(INFO) <<
"Created CGroup " << cgroup_path;
75 }
// Any exception raised while creating the directory is converted into an
// error Status instead of propagating.
catch (std::exception& e) {
77 err_msg <<
"Failed to create CGroup at path " << cgroup_path <<
". " << e.what();
78 return Status(err_msg.str());
// DropCgroup: removes the "<hierarchy>/<cgroup>" directory. A missing directory
// is an error only when if_exists is false; exceptions raised by the removal
// are converted into an error Status.
84 const string& cgroup_path = Substitute(
"$0/$1", cgroups_hierarchy_path_, cgroup);
// Log the exact path being removed, matching CreateCgroup's "Created CGroup" log.
85 LOG(INFO) <<
"Dropping CGroup " << cgroup_path;
// boost::filesystem::remove() returns false when nothing existed at the path.
87 if(!
remove(cgroup_path) && !if_exists) {
// This is the drop path, so the message must say "drop", not "create".
89 err_msg <<
"Failed to drop CGroup at path " << cgroup_path
90 <<
". Path does not exist.";
91 return Status(err_msg.str());
93 }
catch (const std::exception& e) {
95 err_msg <<
"Failed to drop CGroup at path " << cgroup_path <<
". " << e.what();
96 return Status(err_msg.str());
// SetCpuShares: writes num_shares into the cgroup's "cpu.shares" control file,
// replacing (ios::trunc) whatever value was there before.
106 const string& cpu_shares_path = Substitute(
"$0/$1", cgroup_path,
"cpu.shares");
// Open cpu.shares itself. The error message and log below both refer to this
// path, and the cgroup's tasks file holds thread ids (see AssignThreadToCgroup),
// not a share value. It must not be truncated or written here.
107 ofstream cpu_shares(cpu_shares_path.c_str(), ios::out | ios::trunc);
108 if (!cpu_shares.is_open()) {
109 stringstream err_msg;
110 err_msg <<
"CGroup CPU shares file: " << cpu_shares_path
111 <<
" is not writable by Impala";
112 return Status(err_msg.str());
115 LOG(INFO) <<
"Setting CPU shares of CGroup " << cgroup_path <<
" to " << num_shares;
116 cpu_shares << num_shares << endl;
121 std::string* cgroup_path, std::string* tasks_path)
const {
// GetCgroupPaths: fills *cgroup_path with "<hierarchy>/<cgroup>" and
// *tasks_path with "<hierarchy>/<cgroup>/tasks". Returns an error Status if
// either path does not exist on disk.
122 stringstream cgroup_path_ss;
123 cgroup_path_ss << cgroups_hierarchy_path_ <<
"/" << cgroup;
124 *cgroup_path = cgroup_path_ss.str();
125 if (!exists(*cgroup_path)) {
126 stringstream err_msg;
127 err_msg <<
"CGroup " << *cgroup_path <<
" does not exist";
128 return Status(err_msg.str());
131 stringstream tasks_path_ss;
132 tasks_path_ss << *cgroup_path <<
"/tasks";
133 *tasks_path = tasks_path_ss.str();
134 if (!exists(*tasks_path)) {
135 stringstream err_msg;
136 err_msg <<
"CGroup " << *cgroup_path <<
" does not have a /tasks file";
137 return Status(err_msg.str());
143 const string& cgroup)
const {
// AssignThreadToCgroup: appends the thread's tid to the cgroup's tasks file
// (opened with ios::app), failing if the file cannot be opened.
148 ofstream tasks(tasks_path.c_str(), ios::out | ios::app);
149 if (!tasks.is_open()) {
150 stringstream err_msg;
151 err_msg <<
"CGroup tasks file: " << tasks_path <<
" is not writable by Impala";
152 return Status(err_msg.str());
// NOTE(review): endl flushes the tid right away. Cgroup tasks files are
// generally written one id per write, so keep the flush unless that is
// confirmed unnecessary.
154 tasks << thread.
tid() << endl;
156 VLOG_ROW <<
"Thread " << thread.
tid() <<
" moved to CGroup " << cgroup_path;
162 const string& dst_cgroup)
const {
// RelocateThreads: copies every tid listed in src_cgroup's tasks file into
// dst_cgroup by appending each one to dst_cgroup's tasks file. Both cgroups
// must exist; GetCgroupPaths returns an error otherwise.
163 string src_cgroup_path;
164 string src_tasks_path;
165 RETURN_IF_ERROR(GetCgroupPaths(src_cgroup, &src_cgroup_path, &src_tasks_path));
167 string dst_cgroup_path;
168 string dst_tasks_path;
169 RETURN_IF_ERROR(GetCgroupPaths(dst_cgroup, &dst_cgroup_path, &dst_tasks_path));
171 ifstream src_tasks(src_tasks_path.c_str());
173 stringstream err_msg;
174 err_msg <<
"Failed to open source CGroup tasks file at: " << src_tasks_path;
175 return Status(err_msg.str());
178 ofstream dst_tasks(dst_tasks_path.c_str(), ios::out | ios::app);
180 stringstream err_msg;
181 err_msg <<
"Failed to open destination CGroup tasks file at: " << dst_tasks_path;
182 return Status(err_msg.str());
// Reads whitespace-separated tids until extraction fails (EOF or bad input),
// writing each one to the destination with its own flushed line.
186 while (src_tasks >> tid) {
187 dst_tasks << tid << endl;
191 VLOG_ROW <<
"Relocating thread id " << tid <<
" from " << src_tasks_path
192 <<
" to " << dst_tasks_path;
199 const string& cgroup,
bool* is_first) {
// RegisterFragment: no-op (OK) when either the cgroup name or the hierarchy
// path is empty.
200 if (cgroup.empty() || cgroups_hierarchy_path_.empty())
return Status::OK;
202 LOG(INFO) <<
"Registering fragment " <<
PrintId(fragment_instance_id)
203 <<
" with CGroup " << cgroups_hierarchy_path_ <<
"/" << cgroup;
// active_cgroups_ maps cgroup name -> reference count of registered fragments
// and is guarded by active_cgroups_lock_.
204 lock_guard<mutex> l(active_cgroups_lock_);
// The first registration for a cgroup bumps the active-cgroups metric.
205 if (++active_cgroups_[cgroup] == 1) {
208 active_cgroups_metric_->Increment(1);
216 const string& cgroup) {
// UnregisterFragment: no-op (OK) when either the cgroup name or the hierarchy
// path is empty.
217 if (cgroup.empty() || cgroups_hierarchy_path_.empty())
return Status::OK;
219 LOG(INFO) <<
"Unregistering fragment " <<
PrintId(fragment_instance_id)
220 <<
" from CGroup " << cgroups_hierarchy_path_ <<
"/" << cgroup;
221 lock_guard<mutex> l(active_cgroups_lock_);
222 unordered_map<string, int32_t>::iterator entry = active_cgroups_.find(cgroup);
// NOTE(review): DCHECK is normally compiled out of release builds. If the
// cgroup was never registered, entry == end() and is dereferenced below.
// Consider an explicit check that returns an error.
223 DCHECK(entry != active_cgroups_.end());
225 int32_t* ref_count = &entry->second;
// When the reference count reaches zero the cgroup is no longer active: the
// metric is decremented and the map entry erased.
227 if (*ref_count == 0) {
230 active_cgroups_metric_->Increment(-1);
231 active_cgroups_.erase(entry);
SimpleMetric< T, TMetricKind::COUNTER > * AddCounter(const std::string &key, const T &value, const TUnit::type unit=TUnit::UNIT, const std::string &description="")
int32_t VirtualCoresToCpuShares(int16_t v_cpu_cores)
Status CreateCgroup(const std::string &cgroup, bool if_not_exists) const
Status DropCgroup(const std::string &cgroup, bool if_exists) const
Status RegisterFragment(const TUniqueId &fragment_instance_id, const std::string &cgroup, bool *is_first)
TODO: Consider allowing fragment IDs as category parameters.
std::string UniqueIdToCgroup(const std::string &unique_id) const
#define RETURN_IF_ERROR(stmt)
some generally useful macros
MetricGroups may be organised hierarchically as a tree.
const int32_t CPU_DEFAULT_WEIGHT
Status SetCpuShares(const std::string &cgroup, int32_t num_shares)
string PrintId(const TUniqueId &id, const string &separator)
CgroupsMgr(MetricGroup *metrics)
Status Init(const std::string &cgroups_hierarchy_path, const std::string &staging_cgroup)
const std::string IMPALA_CGROUP_SUFFIX
Status AssignThreadToCgroup(const Thread &thread, const std::string &cgroup) const
Status RelocateThreads(const std::string &src_cgroup, const std::string &dst_cgroup) const
Status GetCgroupPaths(const std::string &cgroup, std::string *cgroup_path, std::string *tasks_path) const
Status UnregisterFragment(const TUniqueId &fragment_instance_id, const std::string &cgroup)