Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
mem-tracker.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "runtime/mem-tracker.h"
16 
17 #include <boost/algorithm/string/join.hpp>
18 #include <google/malloc_extension.h>
19 #include <gutil/strings/substitute.h>
20 
21 #include "runtime/exec-env.h"
24 #include "util/debug-util.h"
25 #include "util/mem-info.h"
26 #include "util/pretty-printer.h"
27 #include "util/uid-util.h"
28 
29 #include "common/names.h"
30 
31 using boost::algorithm::join;
32 using namespace strings;
33 
34 namespace impala {
35 
36 const string MemTracker::COUNTER_NAME = "PeakMemoryUsage";
37 MemTracker::RequestTrackersMap MemTracker::request_to_mem_trackers_;
38 MemTracker::PoolTrackersMap MemTracker::pool_to_mem_trackers_;
39 mutex MemTracker::static_mem_trackers_lock_;
40 
41 AtomicInt<int64_t> MemTracker::released_memory_since_gc_;
42 
43 // Name for request pool MemTrackers. '$0' is replaced with the pool name.
44 const string REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT = "RequestPool=$0";
45 
46 MemTracker::MemTracker(int64_t byte_limit, int64_t rm_reserved_limit, const string& label,
47  MemTracker* parent, bool log_usage_if_zero)
48  : limit_(byte_limit),
49  rm_reserved_limit_(rm_reserved_limit),
50  label_(label),
51  parent_(parent),
52  consumption_(&local_counter_),
53  local_counter_(TUnit::BYTES),
54  consumption_metric_(NULL),
55  auto_unregister_(false),
56  enable_logging_(false),
57  log_stack_(false),
58  log_usage_if_zero_(log_usage_if_zero),
59  query_resource_mgr_(NULL),
60  num_gcs_metric_(NULL),
61  bytes_freed_by_last_gc_metric_(NULL),
62  bytes_over_limit_metric_(NULL) {
63  if (parent != NULL) parent_->AddChildTracker(this);
64  Init();
65 }
66 
67 MemTracker::MemTracker(
68  RuntimeProfile* profile, int64_t byte_limit, int64_t rm_reserved_limit,
69  const std::string& label, MemTracker* parent)
70  : limit_(byte_limit),
71  rm_reserved_limit_(rm_reserved_limit),
72  label_(label),
73  parent_(parent),
74  consumption_(profile->AddHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES)),
75  local_counter_(TUnit::BYTES),
76  consumption_metric_(NULL),
77  auto_unregister_(false),
78  enable_logging_(false),
79  log_stack_(false),
80  log_usage_if_zero_(true),
81  query_resource_mgr_(NULL),
82  num_gcs_metric_(NULL),
83  bytes_freed_by_last_gc_metric_(NULL),
84  bytes_over_limit_metric_(NULL) {
85  if (parent != NULL) parent_->AddChildTracker(this);
86  Init();
87 }
88 
89 MemTracker::MemTracker(UIntGauge* consumption_metric,
90  int64_t byte_limit, int64_t rm_reserved_limit, const string& label)
91  : limit_(byte_limit),
92  rm_reserved_limit_(rm_reserved_limit),
93  label_(label),
94  parent_(NULL),
95  consumption_(&local_counter_),
96  local_counter_(TUnit::BYTES),
97  consumption_metric_(consumption_metric),
98  auto_unregister_(false),
99  enable_logging_(false),
100  log_stack_(false),
101  log_usage_if_zero_(true),
102  query_resource_mgr_(NULL),
103  num_gcs_metric_(NULL),
104  bytes_freed_by_last_gc_metric_(NULL),
105  bytes_over_limit_metric_(NULL) {
106  Init();
107 }
108 
110  DCHECK(rm_reserved_limit_ == -1 || limit_ == -1 || rm_reserved_limit_ <= limit_);
111  // populate all_trackers_ and limit_trackers_
112  MemTracker* tracker = this;
113  while (tracker != NULL) {
114  all_trackers_.push_back(tracker);
115  if (tracker->has_limit()) limit_trackers_.push_back(tracker);
116  tracker = tracker->parent_;
117  }
118  DCHECK_GT(all_trackers_.size(), 0);
119  DCHECK_EQ(all_trackers_[0], this);
120 }
121 
123  lock_guard<mutex> l(child_trackers_lock_);
124  tracker->child_tracker_it_ = child_trackers_.insert(child_trackers_.end(), tracker);
125 }
126 
128  DCHECK(parent_ != NULL);
129  lock_guard<mutex> l(parent_->child_trackers_lock_);
132 }
133 
135  MemTracker* parent) {
136  DCHECK(!pool_name.empty());
137  lock_guard<mutex> l(static_mem_trackers_lock_);
138  PoolTrackersMap::iterator it = pool_to_mem_trackers_.find(pool_name);
139  if (it != pool_to_mem_trackers_.end()) {
140  MemTracker* tracker = it->second;
141  DCHECK(pool_name == tracker->pool_name_);
142  return tracker;
143  } else {
144  if (parent == NULL) return NULL;
145  // First time this pool_name registered, make a new object.
146  MemTracker* tracker = new MemTracker(-1, -1,
147  Substitute(REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT, pool_name),
148  parent);
149  tracker->auto_unregister_ = true;
150  tracker->pool_name_ = pool_name;
151  pool_to_mem_trackers_[pool_name] = tracker;
152  return tracker;
153  }
154 }
155 
156 shared_ptr<MemTracker> MemTracker::GetQueryMemTracker(
157  const TUniqueId& id, int64_t byte_limit, int64_t rm_reserved_limit, MemTracker* parent,
158  QueryResourceMgr* res_mgr) {
159  if (byte_limit != -1) {
160  if (byte_limit > MemInfo::physical_mem()) {
161  LOG(WARNING) << "Memory limit "
162  << PrettyPrinter::Print(byte_limit, TUnit::BYTES)
163  << " exceeds physical memory of "
164  << PrettyPrinter::Print(MemInfo::physical_mem(), TUnit::BYTES);
165  }
166  VLOG_QUERY << "Using query memory limit: "
167  << PrettyPrinter::Print(byte_limit, TUnit::BYTES);
168  }
169 
170  lock_guard<mutex> l(static_mem_trackers_lock_);
171  RequestTrackersMap::iterator it = request_to_mem_trackers_.find(id);
172  if (it != request_to_mem_trackers_.end()) {
173  // Return the existing MemTracker object for this id, converting the weak ptr
174  // to a shared ptr.
175  shared_ptr<MemTracker> tracker = it->second.lock();
176  DCHECK_EQ(tracker->limit_, byte_limit);
177  DCHECK(id == tracker->query_id_);
178  DCHECK(parent == tracker->parent_);
179  return tracker;
180  } else {
181  // First time this id registered, make a new object. Give a shared ptr to
182  // the caller and put a weak ptr in the map.
183  shared_ptr<MemTracker> tracker(new MemTracker(byte_limit, rm_reserved_limit,
184  Substitute("Query($0) Limit", lexical_cast<string>(id)), parent));
185  tracker->auto_unregister_ = true;
186  tracker->query_id_ = id;
188  if (res_mgr != NULL) tracker->SetQueryResourceMgr(res_mgr);
189  return tracker;
190  }
191 }
192 
194  lock_guard<mutex> l(static_mem_trackers_lock_);
196  // Erase the weak ptr reference from the map.
198  // Per-pool trackers should live the entire lifetime of the impalad process, but
199  // remove the element from the map in case this changes in the future.
201 }
202 
203 void MemTracker::RegisterMetrics(MetricGroup* metrics, const string& prefix) {
204  num_gcs_metric_ = metrics->AddCounter(
205  Substitute("$0.num-gcs", prefix), 0L, TUnit::UNIT);
206 
207  // TODO: Consider a total amount of bytes freed counter
208  bytes_freed_by_last_gc_metric_ = metrics->AddGauge<int64_t>(
209  Substitute("$0.bytes-freed-by-last-gc", prefix), -1, TUnit::BYTES);
210 
211  bytes_over_limit_metric_ = metrics->AddGauge<int64_t>(
212  Substitute("$0.bytes-over-limit", prefix), -1, TUnit::BYTES);
213 }
214 
215 // Calling this on the query tracker results in output like:
216 // Query Limit: memory limit exceeded. Limit=100.00 MB Consumption=106.19 MB
217 // Fragment 5b45e83bbc2d92bd:d3ff8a7df7a2f491: Consumption=52.00 KB
218 // AGGREGATION_NODE (id=6): Consumption=44.00 KB
219 // EXCHANGE_NODE (id=5): Consumption=0.00
220 // DataStreamMgr: Consumption=0.00
221 // Fragment 5b45e83bbc2d92bd:d3ff8a7df7a2f492: Consumption=100.00 KB
222 // AGGREGATION_NODE (id=2): Consumption=36.00 KB
223 // AGGREGATION_NODE (id=4): Consumption=40.00 KB
224 // EXCHANGE_NODE (id=3): Consumption=0.00
225 // DataStreamMgr: Consumption=0.00
226 // DataStreamSender: Consumption=16.00 KB
227 string MemTracker::LogUsage(const string& prefix) const {
228  if (!log_usage_if_zero_ && consumption() == 0) return "";
229 
230  stringstream ss;
231  ss << prefix << label_ << ":";
232  if (CheckLimitExceeded()) ss << " memory limit exceeded.";
233  if (limit_ > 0) ss << " Limit=" << PrettyPrinter::Print(limit_, TUnit::BYTES);
234  ss << " Consumption=" << PrettyPrinter::Print(consumption(), TUnit::BYTES);
235 
236  stringstream prefix_ss;
237  prefix_ss << prefix << " ";
238  string new_prefix = prefix_ss.str();
239  lock_guard<mutex> l(child_trackers_lock_);
240  string child_trackers_usage = LogUsage(new_prefix, child_trackers_);
241  if (!child_trackers_usage.empty()) ss << "\n" << child_trackers_usage;
242  return ss.str();
243 }
244 
245 string MemTracker::LogUsage(const string& prefix, const list<MemTracker*>& trackers) {
246  vector<string> usage_strings;
247  for (list<MemTracker*>::const_iterator it = trackers.begin();
248  it != trackers.end(); ++it) {
249  string usage_string = (*it)->LogUsage(prefix);
250  if (!usage_string.empty()) usage_strings.push_back(usage_string);
251  }
252  return join(usage_strings, "\n");
253 }
254 
255 void MemTracker::LogUpdate(bool is_consume, int64_t bytes) const {
256  stringstream ss;
257  ss << this << " " << (is_consume ? "Consume: " : "Release: ") << bytes
258  << " Consumption: " << consumption() << " Limit: " << limit_;
259  if (log_stack_) ss << endl << GetStackTrace();
260  LOG(ERROR) << ss.str();
261 }
262 
263 bool MemTracker::GcMemory(int64_t max_consumption) {
264  if (max_consumption < 0) return true;
265  lock_guard<SpinLock> l(gc_lock_);
267  uint64_t pre_gc_consumption = consumption();
268  // Check if someone gc'd before us
269  if (pre_gc_consumption < max_consumption) return false;
270  if (num_gcs_metric_ != NULL) num_gcs_metric_->Increment(1);
271 
272  // Try to free up some memory
273  for (int i = 0; i < gc_functions_.size(); ++i) {
274  gc_functions_[i]();
276  if (consumption() <= max_consumption) break;
277  }
278 
279  if (bytes_freed_by_last_gc_metric_ != NULL) {
280  bytes_freed_by_last_gc_metric_->set_value(pre_gc_consumption - consumption());
281  }
282  return consumption() > max_consumption;
283 }
284 
286 #ifndef ADDRESS_SANITIZER
288  MallocExtension::instance()->ReleaseFreeMemory();
289 #else
290  // Nothing to do if not using tcmalloc.
291 #endif
292 }
293 
294 bool MemTracker::ExpandRmReservation(int64_t bytes) {
295  if (query_resource_mgr_ == NULL || rm_reserved_limit_ == -1) return false;
296  // TODO: Make this asynchronous after IO mgr changes to use TryConsume() are done.
297  lock_guard<mutex> l(resource_acquisition_lock_);
298  int64_t requested = consumption_->current_value() + bytes;
299  // Can't exceed the hard limit under any circumstance
300  if (requested >= limit_ && limit_ != -1) return false;
301  // Test to see if we can satisfy the limit anyhow; maybe a different request was already
302  // in flight.
303  if (requested < rm_reserved_limit_) return true;
304 
305  TResourceBrokerExpansionRequest exp;
306  query_resource_mgr_->CreateExpansionRequest(max(1L, bytes / (1024 * 1024)), 0L, &exp);
307 
308  TResourceBrokerExpansionResponse response;
309  Status status = ExecEnv::GetInstance()->resource_broker()->Expand(exp, &response);
310  if (!status.ok()) {
311  LOG(INFO) << "Failed to expand memory limit by "
312  << PrettyPrinter::Print(bytes, TUnit::BYTES) << ": "
313  << status.GetDetail();
314  return false;
315  }
316 
317  DCHECK(response.allocated_resources.size() == 1) << "Got more resources than expected";
318  const llama::TAllocatedResource& resource =
319  response.allocated_resources.begin()->second;
320  DCHECK(resource.v_cpu_cores == 0L) << "Unexpected VCPUs returned by Llama";
321 
322  // Finally, check whether the allocation that we got took us over the limits for any of
323  // our ancestors.
324  int64_t bytes_allocated = resource.memory_mb * 1024L * 1024L;
325  BOOST_FOREACH(const MemTracker* tracker, all_trackers_) {
326  if (tracker == this) continue;
327  if (tracker->consumption_->current_value() + bytes_allocated > tracker->limit_) {
328  // Don't adjust our limit; rely on query tear-down to release the resource.
329  return false;
330  }
331  }
332 
333  rm_reserved_limit_ += bytes_allocated;
334  // Resource broker might give us more than we ask for
336  return true;
337 }
338 
339 }
SimpleMetric< T, TMetricKind::COUNTER > * AddCounter(const std::string &key, const T &value, const TUnit::type unit=TUnit::UNIT, const std::string &description="")
Definition: metrics.h:239
static boost::shared_ptr< MemTracker > GetQueryMemTracker(const TUniqueId &id, int64_t byte_limit, int64_t rm_reserved_limit, MemTracker *parent, QueryResourceMgr *res_mgr)
Definition: mem-tracker.cc:156
MemTracker(int64_t byte_limit=-1, int64_t rm_reserved_limit=-1, const std::string &label=std::string(), MemTracker *parent=NULL, bool log_usage_if_zero=true)
UIntGauge * consumption_metric_
Definition: mem-tracker.h:423
int64_t consumption() const
Returns the memory consumed in bytes.
Definition: mem-tracker.h:298
const std::string GetDetail() const
Definition: status.cc:184
std::string pool_name_
Only valid for MemTrackers returned from GetRequestPoolMemTracker()
Definition: mem-tracker.h:400
void UnregisterFromParent()
Removes this tracker from parent_->child_trackers_.
Definition: mem-tracker.cc:127
static boost::mutex static_mem_trackers_lock_
Protects request_to_mem_trackers_ and pool_to_mem_trackers_.
Definition: mem-tracker.h:381
MemTracker tracker
int64_t rm_reserved_limit_
Definition: mem-tracker.h:409
boost::mutex resource_acquisition_lock_
Definition: mem-tracker.h:458
bool CheckLimitExceeded() const
Definition: mem-tracker.h:330
static int64_t physical_mem()
Get total physical memory in bytes (ignores cgroups memory limits).
Definition: mem-info.h:36
SpinLock gc_lock_
Lock to protect GcMemory(). This prevents many GCs from occurring at once.
Definition: mem-tracker.h:378
static RequestTrackersMap request_to_mem_trackers_
Definition: mem-tracker.h:389
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
TUniqueId query_id_
Only valid for MemTrackers returned from GetQueryMemTracker()
Definition: mem-tracker.h:397
std::list< MemTracker * > child_trackers_
Definition: mem-tracker.h:431
bool log_stack_
If true, log the stack as well.
Definition: mem-tracker.h:450
std::vector< MemTracker * > all_trackers_
Definition: mem-tracker.h:425
bool GcMemory(int64_t max_consumption)
Definition: mem-tracker.cc:263
RuntimeProfile::HighWaterMarkCounter * consumption_
in bytes; not owned
Definition: mem-tracker.h:415
bool has_limit() const
Definition: mem-tracker.h:282
bool ExpandRmReservation(int64_t bytes)
Definition: mem-tracker.cc:294
void AddChildTracker(MemTracker *tracker)
Adds tracker to child_trackers_.
Definition: mem-tracker.cc:122
IntGauge * bytes_freed_by_last_gc_metric_
Definition: mem-tracker.h:469
ResourceBroker * resource_broker()
Definition: exec-env.h:95
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
boost::mutex child_trackers_lock_
Definition: mem-tracker.h:430
static AtomicInt< int64_t > released_memory_since_gc_
Definition: mem-tracker.h:375
#define VLOG_QUERY
Definition: logging.h:57
const string REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT
Definition: mem-tracker.cc:44
std::list< MemTracker * >::iterator child_tracker_it_
Definition: mem-tracker.h:435
IntGauge * bytes_over_limit_metric_
Definition: mem-tracker.h:474
This class is thread-safe.
Definition: mem-tracker.h:61
IntCounter * num_gcs_metric_
The number of times the GcFunctions were called.
Definition: mem-tracker.h:465
static ExecEnv * GetInstance()
Definition: exec-env.h:63
void RegisterMetrics(MetricGroup *metrics, const std::string &prefix)
Definition: mem-tracker.cc:203
static MemTracker * GetRequestPoolMemTracker(const std::string &pool_name, MemTracker *parent)
Definition: mem-tracker.cc:134
std::vector< MemTracker * > limit_trackers_
Definition: mem-tracker.h:426
std::vector< GcFunction > gc_functions_
Functions to call after the limit is reached to free memory.
Definition: mem-tracker.h:438
static PoolTrackersMap pool_to_mem_trackers_
Definition: mem-tracker.h:394
void LogUpdate(bool is_consume, int64_t bytes) const
Logs the stack of the current consume/release. Used for debugging only.
Definition: mem-tracker.cc:255
SimpleMetric< T > * AddGauge(const std::string &key, const T &value, const TUnit::type unit=TUnit::NONE, const std::string &description="")
Create a gauge metric object with given key and initial value (owned by this object) ...
Definition: metrics.h:223
std::string LogUsage(const std::string &prefix="") const
Logs the usage of this tracker and all of its children (recursively).
QueryResourceMgr * query_resource_mgr_
Definition: mem-tracker.h:462
Status Expand(const TResourceBrokerExpansionRequest &request, TResourceBrokerExpansionResponse *response)
string GetStackTrace()
Definition: debug-util.cc:246
Only CPU-heavy threads need be managed using this class.
bool ok() const
Definition: status.h:172
MemTracker * parent_
Definition: mem-tracker.h:412
Status CreateExpansionRequest(int64_t memory_mb, int64_t vcores, TResourceBrokerExpansionRequest *request)
std::string label_
Definition: mem-tracker.h:411