Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
mem-tracker.h
Go to the documentation of this file.
1 // Copyright 2013 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_RUNTIME_MEM_TRACKER_H
17 #define IMPALA_RUNTIME_MEM_TRACKER_H
18 
19 #include <stdint.h>
20 #include <map>
21 #include <vector>
22 #include <boost/shared_ptr.hpp>
23 #include <boost/weak_ptr.hpp>
24 #include <boost/thread/mutex.hpp>
25 #include <boost/unordered_map.hpp>
26 
27 #include "common/logging.h"
28 #include "common/atomic.h"
29 #include "util/debug-util.h"
30 #include "util/internal-queue.h"
31 #include "util/metrics.h"
32 #include "util/runtime-profile.h"
33 #include "util/spinlock.h"
34 
35 #include "gen-cpp/Types_types.h" // for TUniqueId
36 
37 namespace impala {
38 
39 class MemTracker;
40 class QueryResourceMgr;
41 
45 //
52 //
59 //
61 class MemTracker {
62  public:
// Constructs a tracker. A negative limit (the default -1) means "no limit": has_limit()
// is true only for limit_ >= 0. A 'rm_reserved_limit' of -1 means no resource-manager
// reservation, in which case effective_limit() falls back to the byte limit.
// NOTE(review): 'parent' is presumably where this tracker is attached as a child, and
// 'log_usage_if_zero' presumably controls whether LogUsage() reports this tracker when
// its consumption is zero -- confirm both in mem-tracker.cc.
67  MemTracker(int64_t byte_limit = -1, int64_t rm_reserved_limit = -1,
68  const std::string& label = std::string(), MemTracker* parent = NULL,
69  bool log_usage_if_zero = true);
70 
// Like the constructor above, but tied to 'profile'. The consumption counter is then
// presumably taken from 'profile' (not owned) instead of local_counter_ -- confirm.
// Note that 'byte_limit' has no default in this overload.
73  MemTracker(RuntimeProfile* profile, int64_t byte_limit, int64_t rm_reserved_limit = -1,
74  const std::string& label = std::string(), MemTracker* parent = NULL);
75 
// Constructs a tracker whose consumption is driven by 'consumption_metric'. There is no
// 'parent' parameter: Consume()/Release() on such a tracker DCHECK that it has no parent,
// and its consumption is exempt from the non-negative DCHECKs.
78  MemTracker(UIntGauge* consumption_metric,
79  int64_t byte_limit = -1, int64_t rm_reserved_limit = -1,
80  const std::string& label = std::string());
81 
82  ~MemTracker();
83 
// Removes this tracker from parent_->child_trackers_.
85  void UnregisterFromParent();
86 
// Returns the tracker for query 'id'. Query trackers live in a static map from
// TUniqueId to boost::weak_ptr<MemTracker>, guarded by static_mem_trackers_lock_.
// NOTE(review): presumably an existing live tracker is returned, and otherwise a new one
// is created from 'byte_limit', 'rm_reserved_limit', 'parent' and 'res_mgr' -- confirm.
95  static boost::shared_ptr<MemTracker> GetQueryMemTracker(const TUniqueId& id,
96  int64_t byte_limit, int64_t rm_reserved_limit, MemTracker* parent,
97  QueryResourceMgr* res_mgr);
98 
// Returns the tracker for request pool 'pool_name'. Pool trackers live in a static map
// from pool name to raw MemTracker*, guarded by static_mem_trackers_lock_.
// NOTE(review): presumably created under 'parent' on first use and reused afterwards;
// ownership of the returned pointer is not visible here -- confirm.
106  static MemTracker* GetRequestPoolMemTracker(const std::string& pool_name,
107  MemTracker* parent);
108 
110  int64_t effective_limit() const {
111  // TODO: maybe no limit should be MAX_LONG?
112  DCHECK(rm_reserved_limit_ <= limit_ || limit_ == -1);
113  if (rm_reserved_limit_ == -1) return limit_;
114  return rm_reserved_limit_;
115  }
116 
118  void Consume(int64_t bytes) {
119  if (bytes < 0) {
120  Release(-bytes);
121  return;
122  }
123 
124  if (consumption_metric_ != NULL) {
125  DCHECK(parent_ == NULL);
127  return;
128  }
129  if (bytes == 0) return;
130  if (UNLIKELY(enable_logging_)) LogUpdate(true, bytes);
131  for (std::vector<MemTracker*>::iterator tracker = all_trackers_.begin();
132  tracker != all_trackers_.end(); ++tracker) {
133  (*tracker)->consumption_->Add(bytes);
134  if ((*tracker)->consumption_metric_ == NULL) {
135  DCHECK_GE((*tracker)->consumption_->current_value(), 0);
136  }
137  }
138  }
139 
145  void ConsumeLocal(int64_t bytes, MemTracker* end_tracker) {
146  DCHECK(consumption_metric_ == NULL) << "Should not be called on root.";
147  if (UNLIKELY(enable_logging_)) LogUpdate(bytes > 0, bytes);
148  for (int i = 0; i < all_trackers_.size(); ++i) {
149  if (all_trackers_[i] == end_tracker) return;
150  DCHECK(!all_trackers_[i]->has_limit());
151  all_trackers_[i]->consumption_->Add(bytes);
152  }
153  }
154 
155  void ReleaseLocal(int64_t bytes, MemTracker* end_tracker) {
156  ConsumeLocal(-bytes, end_tracker);
157  }
158 
163  bool TryConsume(int64_t bytes) {
165  if (bytes <= 0) return true;
166  if (UNLIKELY(enable_logging_)) LogUpdate(true, bytes);
167  int i = 0;
168  // Walk the tracker tree top-down, to avoid expanding a limit on a child whose parent
169  // won't accommodate the change.
170  for (i = all_trackers_.size() - 1; i >= 0; --i) {
172  int64_t limit = tracker->effective_limit();
173  if (limit < 0) {
174  tracker->consumption_->Add(bytes);
175  } else {
176  if (!tracker->consumption_->TryAdd(bytes, limit)) {
177  // One of the trackers failed, attempt to GC memory or expand our limit. If that
178  // succeeds, TryUpdate() again. Bail if either fails.
179  //
180  // TODO: This may not be right if more than one tracker can actually change its
181  // rm reservation limit.
182  if (!tracker->GcMemory(limit - bytes) || tracker->ExpandRmReservation(bytes)) {
183  if (!tracker->consumption_->TryAdd(bytes, tracker->limit_)) break;
184  } else {
185  break;
186  }
187  }
188  }
189  }
190  // Everyone succeeded, return.
191  if (i == -1) return true;
192 
193  // Someone failed, roll back the ones that succeeded.
194  // TODO: this doesn't roll it back completely since the max values for
195  // the updated trackers aren't decremented. The max values are only used
196  // for error reporting so this is probably okay. Rolling those back is
197  // pretty hard; we'd need something like 2PC.
198  //
199  // TODO: This might leave us with an allocated resource that we can't use. Do we need
200  // to adjust the consumption of the query tracker to stop the resource from never
201  // getting used by a subsequent TryConsume()?
202  for (int j = all_trackers_.size() - 1; j > i; --j) {
203  all_trackers_[j]->consumption_->Add(-bytes);
204  }
205  return false;
206  }
207 
209  void Release(int64_t bytes) {
210  if (bytes < 0) {
211  Consume(-bytes);
212  return;
213  }
214 
216  GcTcmalloc();
217  }
218 
219  if (consumption_metric_ != NULL) {
220  DCHECK(parent_ == NULL);
222  return;
223  }
224  if (bytes == 0) return;
225  if (UNLIKELY(enable_logging_)) LogUpdate(false, bytes);
226  for (std::vector<MemTracker*>::iterator tracker = all_trackers_.begin();
227  tracker != all_trackers_.end(); ++tracker) {
228  (*tracker)->consumption_->Add(-bytes);
235  if ((*tracker)->consumption_metric_ == NULL) {
236  DCHECK_GE((*tracker)->consumption_->current_value(), 0)
237  << std::endl << (*tracker)->LogUsage();
238  }
239  }
240 
242  }
243 
// NOTE(review): the signature line of this function and its doc comment are missing from
// this copy of the header; the listing jumps from line 242 to 247. Restore it from
// upstream.
// The body returns true as soon as LimitExceeded() reports true for a tracker in
// limit_trackers_ (this tracker and its ancestors that have a limit), and false otherwise.
// Because LimitExceeded() calls GcMemory(), this check can free memory as a side effect,
// and trackers after the first exceeded one are not checked.
247  for (std::vector<MemTracker*>::iterator tracker = limit_trackers_.begin();
248  tracker != limit_trackers_.end(); ++tracker) {
249  if ((*tracker)->LimitExceeded()) return true;
250  }
251  return false;
252  }
253 
// Checks only this tracker's own limit (not its ancestors'). If it is exceeded, this
// runs GcMemory(limit_) and returns its result. Returns false if the limit is not
// exceeded or the tracker has no limit.
// NOTE(review): GcMemory() is defined in mem-tracker.cc. From this use and from
// TryConsume() it appears to return true when consumption is still above the given
// bound after GC -- confirm.
257  bool LimitExceeded() {
258  if (UNLIKELY(CheckLimitExceeded())) {
259  if (bytes_over_limit_metric_ != NULL) {
// NOTE(review): this branch is empty in this copy (listing line 260 is missing).
// It presumably recorded how far consumption() is over limit_ in
// bytes_over_limit_metric_ -- restore it from the upstream header.
261  }
262  return GcMemory(limit_);
263  }
264  return false;
265  }
266 
270  int64_t SpareCapacity() const {
271  int64_t result = std::numeric_limits<int64_t>::max();
272  for (std::vector<MemTracker*>::const_iterator tracker = limit_trackers_.begin();
273  tracker != limit_trackers_.end(); ++tracker) {
274  int64_t mem_left = (*tracker)->limit() - (*tracker)->consumption();
275  result = std::min(result, mem_left);
276  }
277  return result;
278  }
279 
280 
281  int64_t limit() const { return limit_; }
282  bool has_limit() const { return limit_ >= 0; }
283  const std::string& label() const { return label_; }
284 
287  int64_t lowest_limit() const {
288  if (limit_trackers_.empty()) return -1;
289  int64_t v = std::numeric_limits<int64_t>::max();
290  for (int i = 0; i < limit_trackers_.size(); ++i) {
291  DCHECK(limit_trackers_[i]->has_limit());
292  v = std::min(v, limit_trackers_[i]->limit());
293  }
294  return v;
295  }
296 
298  int64_t consumption() const { return consumption_->current_value(); }
299 
303  int64_t peak_consumption() const { return consumption_->value(); }
304 
305  MemTracker* parent() const { return parent_; }
306 
// Signature for a function that can be called to free some memory after a limit is
// reached.
308  typedef boost::function<void ()> GcFunction;
309 
// Appends 'f' to gc_functions_, the functions called to free memory once the limit is
// reached. NOTE(review): no lock is taken here, so this is presumably meant to be called
// before the tracker is used concurrently -- confirm.
313  void AddGcFunction(GcFunction f) { gc_functions_.push_back(f); }
314 
// Defined in mem-tracker.cc. NOTE(review): presumably registers this tracker's metrics
// (e.g. num_gcs_metric_, bytes_freed_by_last_gc_metric_, bytes_over_limit_metric_) in
// 'metrics' under 'prefix' -- confirm.
317  void RegisterMetrics(MetricGroup* metrics, const std::string& prefix);
318 
// Returns a description of the usage of this tracker and, recursively, all of its
// children. 'prefix' is presumably prepended to each line.
320  std::string LogUsage(const std::string& prefix = "") const;
321 
322  void EnableLogging(bool enable, bool log_stack) {
323  enable_logging_ = enable;
324  log_stack_ = log_stack;
325  }
326 
327  static const std::string COUNTER_NAME;
328 
329  private:
330  bool CheckLimitExceeded() const { return limit_ >= 0 && limit_ < consumption(); }
331 
// Runs the registered GC functions to try to reduce consumption; defined in
// mem-tracker.cc. From its callers (LimitExceeded() returns its result, and TryConsume()
// retries when it returns false), it appears to return true if consumption is still
// above 'max_consumption' afterwards -- confirm.
335  bool GcMemory(int64_t max_consumption);
336 
// Called from Release(). Presumably returns freed memory to tcmalloc -- confirm in
// mem-tracker.cc.
341  void GcTcmalloc();
342 
// NOTE(review): the opening line of this setter, `void SetQueryResourceMgr(
// QueryResourceMgr* context) {` (header line 344), is missing from this copy, leaving
// only its body. It sets the resource mgr that allows expansion of limits; with NULL,
// no expansion is possible.
345  query_resource_mgr_ = context;
346  }
347 
// Defined in mem-tracker.cc; presumably initialization shared by the constructors.
350  void Init();
351 
// NOTE(review): the declaration `void AddChildTracker(MemTracker* tracker);` (header
// line 353; adds 'tracker' to child_trackers_) is missing from this copy.
354 
// Logs the current consume/release (plus the stack when log_stack_ is set). Used for
// debugging only; called when enable_logging_ is set.
356  void LogUpdate(bool is_consume, int64_t bytes) const;
357 
// Helper for LogUsage(); presumably combines the usage of each tracker in 'trackers'.
// NOTE(review): std::list is used here and below, but <list> is not included directly.
358  static std::string LogUsage(const std::string& prefix,
359  const std::list<MemTracker*>& trackers);
360 
// Tries to grow this tracker's RM reservation for 'bytes' (presumably through
// query_resource_mgr_ -- confirm). TryConsume() retries the allocation when this
// returns true.
364  bool ExpandRmReservation(int64_t bytes);
365 
// 128 MiB. Presumably the amount of released memory, counted in
// released_memory_since_gc_, after which Release() calls GcTcmalloc().
371  static const int64_t GC_RELEASE_SIZE = 128 * 1024L * 1024L;
372 
// NOTE(review): `static AtomicInt<int64_t> released_memory_since_gc_;` (line 375) is
// missing from this copy.
376 
// NOTE(review): `SpinLock gc_lock_;` (line 378; protects GcMemory() so that many GCs do
// not run at once) is missing from this copy.
379 
// Protects request_to_mem_trackers_ and pool_to_mem_trackers_.
381  static boost::mutex static_mem_trackers_lock_;
382 
// Map from query id to that query's tracker. Values are weak_ptrs, so the map does not
// keep query trackers alive.
// NOTE(review): the end of this typedef (`RequestTrackersMap;`, line 388) and
// `static RequestTrackersMap request_to_mem_trackers_;` (line 389) are missing from
// this copy.
387  typedef boost::unordered_map<TUniqueId, boost::weak_ptr<MemTracker> >
390 
// Map from request pool name to that pool's tracker.
// NOTE(review): `static PoolTrackersMap pool_to_mem_trackers_;` (line 394) is missing
// from this copy.
393  typedef boost::unordered_map<std::string, MemTracker*> PoolTrackersMap;
395 
// Only valid for MemTrackers returned from GetQueryMemTracker().
397  TUniqueId query_id_;
398 
// Only valid for MemTrackers returned from GetRequestPoolMemTracker().
400  std::string pool_name_;
401 
// Hard byte limit; negative means no limit (see has_limit()).
404  int64_t limit_;
405 
// NOTE(review): `int64_t rm_reserved_limit_;` (line 409) is missing from this copy.
410 
411  std::string label_;
// NOTE(review): `MemTracker* parent_;` (line 412) is missing from this copy.
413 
// NOTE(review): `RuntimeProfile::HighWaterMarkCounter* consumption_;` (line 415; in
// bytes, not owned) is missing from this copy.
416 
// NOTE(review): `RuntimeProfile::HighWaterMarkCounter local_counter_;` (line 418; holds
// the consumption_ counter when not tied to a profile) is missing from this copy.
419 
// NOTE(review): `UIntGauge* consumption_metric_;` (line 423) is missing from this copy.
424 
425  std::vector<MemTracker*> all_trackers_; // this tracker plus all of its ancestors
426  std::vector<MemTracker*> limit_trackers_; // all_trackers_ with valid limits
427 
// Presumably guards child_trackers_ (mutable so const methods can lock it).
430  mutable boost::mutex child_trackers_lock_;
431  std::list<MemTracker*> child_trackers_;
432 
// Position of this tracker in its parent's child_trackers_; presumably kept so that
// UnregisterFromParent() can erase it directly -- confirm.
435  std::list<MemTracker*>::iterator child_tracker_it_;
436 
// Functions to call after the limit is reached to free memory.
438  std::vector<GcFunction> gc_functions_;
439 
// NOTE(review): these declarations are missing from this copy:
//   `bool enable_logging_;` (448), `bool log_stack_;` (450),
//   `boost::mutex resource_acquisition_lock_;` (458),
//   `QueryResourceMgr* query_resource_mgr_;` (462), `IntCounter* num_gcs_metric_;` (465),
//   `IntGauge* bytes_freed_by_last_gc_metric_;` (469),
//   `IntGauge* bytes_over_limit_metric_;` (474).
446 
451 
455 
459 
463 
466 
470 
475 };
476 
477 }
478 
479 #endif
boost::unordered_map< std::string, MemTracker * > PoolTrackersMap
Definition: mem-tracker.h:393
T UpdateAndFetch(T delta)
Increments by delta (i.e. += delta) and returns the new val.
Definition: atomic.h:105
virtual int64_t value() const
static boost::shared_ptr< MemTracker > GetQueryMemTracker(const TUniqueId &id, int64_t byte_limit, int64_t rm_reserved_limit, MemTracker *parent, QueryResourceMgr *res_mgr)
Definition: mem-tracker.cc:156
MemTracker(int64_t byte_limit=-1, int64_t rm_reserved_limit=-1, const std::string &label=std::string(), MemTracker *parent=NULL, bool log_usage_if_zero=true)
UIntGauge * consumption_metric_
Definition: mem-tracker.h:423
int64_t effective_limit() const
Returns the minimum of limit and rm_reserved_limit.
Definition: mem-tracker.h:110
int64_t consumption() const
Returns the memory consumed in bytes.
Definition: mem-tracker.h:298
std::string pool_name_
Only valid for MemTrackers returned from GetRequestPoolMemTracker()
Definition: mem-tracker.h:400
void UnregisterFromParent()
Removes this tracker from parent_->child_trackers_.
Definition: mem-tracker.cc:127
const std::string & label() const
Definition: mem-tracker.h:283
void EnableLogging(bool enable, bool log_stack)
Definition: mem-tracker.h:322
static boost::mutex static_mem_trackers_lock_
Protects request_to_mem_trackers_ and pool_to_mem_trackers_.
Definition: mem-tracker.h:381
MemTracker tracker
int64_t rm_reserved_limit_
Definition: mem-tracker.h:409
boost::mutex resource_acquisition_lock_
Definition: mem-tracker.h:458
bool TryConsume(int64_t bytes)
Definition: mem-tracker.h:163
bool CheckLimitExceeded() const
Definition: mem-tracker.h:330
class SimpleMetric< int64_t, TMetricKind::COUNTER > IntCounter
Definition: metrics.h:320
SpinLock gc_lock_
Lock to protect GcMemory(). This prevents many GCs from occurring at once.
Definition: mem-tracker.h:378
Lightweight spinlock.
Definition: spinlock.h:24
static RequestTrackersMap request_to_mem_trackers_
Definition: mem-tracker.h:389
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
void ConsumeLocal(int64_t bytes, MemTracker *end_tracker)
Definition: mem-tracker.h:145
TUniqueId query_id_
Only valid for MemTrackers returned from GetQueryMemTracker()
Definition: mem-tracker.h:397
std::list< MemTracker * > child_trackers_
Definition: mem-tracker.h:431
bool log_stack_
If true, log the stack as well.
Definition: mem-tracker.h:450
std::vector< MemTracker * > all_trackers_
Definition: mem-tracker.h:425
int64_t SpareCapacity() const
Definition: mem-tracker.h:270
bool GcMemory(int64_t max_consumption)
Definition: mem-tracker.cc:263
RuntimeProfile::HighWaterMarkCounter * consumption_
in bytes; not owned
Definition: mem-tracker.h:415
bool TryAdd(int64_t delta, int64_t max)
bool has_limit() const
Definition: mem-tracker.h:282
void AddGcFunction(GcFunction f)
Definition: mem-tracker.h:313
bool ExpandRmReservation(int64_t bytes)
Definition: mem-tracker.cc:294
void AddChildTracker(MemTracker *tracker)
Adds tracker to child_trackers_.
Definition: mem-tracker.cc:122
int64_t peak_consumption() const
Definition: mem-tracker.h:303
IntGauge * bytes_freed_by_last_gc_metric_
Definition: mem-tracker.h:469
boost::mutex child_trackers_lock_
Definition: mem-tracker.h:430
static AtomicInt< int64_t > released_memory_since_gc_
Definition: mem-tracker.h:375
bool enable_logging_
If true, logs to INFO every consume/release called. Used for debugging.
Definition: mem-tracker.h:448
std::list< MemTracker * >::iterator child_tracker_it_
Definition: mem-tracker.h:435
int64_t lowest_limit() const
Definition: mem-tracker.h:287
IntGauge * bytes_over_limit_metric_
Definition: mem-tracker.h:474
This class is thread-safe.
Definition: mem-tracker.h:61
static const int64_t GC_RELEASE_SIZE
Definition: mem-tracker.h:371
void Release(int64_t bytes)
Decreases consumption of this tracker and its ancestors by 'bytes'.
Definition: mem-tracker.h:209
boost::unordered_map< TUniqueId, boost::weak_ptr< MemTracker > > RequestTrackersMap
Definition: mem-tracker.h:388
IntCounter * num_gcs_metric_
The number of times the GcFunctions were called.
Definition: mem-tracker.h:465
RuntimeProfile::HighWaterMarkCounter local_counter_
holds consumption_ counter if not tied to a profile
Definition: mem-tracker.h:418
void RegisterMetrics(MetricGroup *metrics, const std::string &prefix)
Definition: mem-tracker.cc:203
static MemTracker * GetRequestPoolMemTracker(const std::string &pool_name, MemTracker *parent)
Definition: mem-tracker.cc:134
std::vector< MemTracker * > limit_trackers_
Definition: mem-tracker.h:426
static const std::string COUNTER_NAME
Definition: mem-tracker.h:327
std::vector< GcFunction > gc_functions_
Functions to call after the limit is reached to free memory.
Definition: mem-tracker.h:438
static PoolTrackersMap pool_to_mem_trackers_
Definition: mem-tracker.h:394
void LogUpdate(bool is_consume, int64_t bytes) const
Logs the stack of the current consume/release. Used for debugging only.
Definition: mem-tracker.cc:255
#define UNLIKELY(expr)
Definition: compiler-util.h:33
int64_t limit() const
Definition: mem-tracker.h:281
boost::function< void()> GcFunction
Signature for function that can be called to free some memory after limit is reached.
Definition: mem-tracker.h:308
void SetQueryResourceMgr(QueryResourceMgr *context)
Set the resource mgr to allow expansion of limits (if NULL, no expansion is possible) ...
Definition: mem-tracker.h:344
std::string LogUsage(const std::string &prefix="") const
Logs the usage of this tracker and all of its children (recursively).
void Consume(int64_t bytes)
Increases consumption of this tracker and its ancestors by 'bytes'.
Definition: mem-tracker.h:118
QueryResourceMgr * query_resource_mgr_
Definition: mem-tracker.h:462
Only CPU-heavy threads need be managed using this class.
MemTracker * parent_
Definition: mem-tracker.h:412
void ReleaseLocal(int64_t bytes, MemTracker *end_tracker)
Definition: mem-tracker.h:155
std::string label_
Definition: mem-tracker.h:411
MemTracker * parent() const
Definition: mem-tracker.h:305