17 #include <boost/algorithm/string/join.hpp>
18 #include <google/malloc_extension.h>
19 #include <gutil/strings/substitute.h>
31 using boost::algorithm::join;
32 using namespace strings;
36 const string MemTracker::COUNTER_NAME =
"PeakMemoryUsage";
37 MemTracker::RequestTrackersMap MemTracker::request_to_mem_trackers_;
38 MemTracker::PoolTrackersMap MemTracker::pool_to_mem_trackers_;
39 mutex MemTracker::static_mem_trackers_lock_;
41 AtomicInt<int64_t> MemTracker::released_memory_since_gc_;
46 MemTracker::MemTracker(int64_t byte_limit, int64_t rm_reserved_limit,
const string& label,
49 rm_reserved_limit_(rm_reserved_limit),
52 consumption_(&local_counter_),
53 local_counter_(TUnit::BYTES),
54 consumption_metric_(NULL),
55 auto_unregister_(false),
56 enable_logging_(false),
58 log_usage_if_zero_(log_usage_if_zero),
59 query_resource_mgr_(NULL),
60 num_gcs_metric_(NULL),
61 bytes_freed_by_last_gc_metric_(NULL),
62 bytes_over_limit_metric_(NULL) {
63 if (parent != NULL) parent_->AddChildTracker(
this);
67 MemTracker::MemTracker(
68 RuntimeProfile* profile, int64_t byte_limit, int64_t rm_reserved_limit,
71 rm_reserved_limit_(rm_reserved_limit),
74 consumption_(profile->AddHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES)),
75 local_counter_(TUnit::BYTES),
76 consumption_metric_(NULL),
77 auto_unregister_(false),
78 enable_logging_(false),
80 log_usage_if_zero_(true),
81 query_resource_mgr_(NULL),
82 num_gcs_metric_(NULL),
83 bytes_freed_by_last_gc_metric_(NULL),
84 bytes_over_limit_metric_(NULL) {
90 int64_t byte_limit, int64_t rm_reserved_limit,
const string& label)
92 rm_reserved_limit_(rm_reserved_limit),
95 consumption_(&local_counter_),
96 local_counter_(TUnit::BYTES),
97 consumption_metric_(consumption_metric),
98 auto_unregister_(false),
99 enable_logging_(false),
101 log_usage_if_zero_(true),
102 query_resource_mgr_(NULL),
103 num_gcs_metric_(NULL),
104 bytes_freed_by_last_gc_metric_(NULL),
105 bytes_over_limit_metric_(NULL) {
113 while (tracker != NULL) {
136 DCHECK(!pool_name.empty());
144 if (parent == NULL)
return NULL;
157 const TUniqueId&
id, int64_t byte_limit, int64_t rm_reserved_limit,
MemTracker* parent,
159 if (byte_limit != -1) {
161 LOG(WARNING) <<
"Memory limit "
163 <<
" exceeds physical memory of "
175 shared_ptr<MemTracker>
tracker = it->second.lock();
176 DCHECK_EQ(tracker->limit_, byte_limit);
177 DCHECK(
id == tracker->query_id_);
178 DCHECK(parent == tracker->
parent_);
184 Substitute(
"Query($0) Limit", lexical_cast<string>(
id)), parent));
185 tracker->auto_unregister_ =
true;
186 tracker->query_id_ = id;
188 if (res_mgr != NULL) tracker->SetQueryResourceMgr(res_mgr);
205 Substitute(
"$0.num-gcs", prefix), 0L, TUnit::UNIT);
209 Substitute(
"$0.bytes-freed-by-last-gc", prefix), -1, TUnit::BYTES);
212 Substitute(
"$0.bytes-over-limit", prefix), -1, TUnit::BYTES);
231 ss << prefix <<
label_ <<
":";
236 stringstream prefix_ss;
237 prefix_ss << prefix <<
" ";
238 string new_prefix = prefix_ss.str();
241 if (!child_trackers_usage.empty()) ss <<
"\n" << child_trackers_usage;
246 vector<string> usage_strings;
247 for (list<MemTracker*>::const_iterator it = trackers.begin();
248 it != trackers.end(); ++it) {
249 string usage_string = (*it)->LogUsage(prefix);
250 if (!usage_string.empty()) usage_strings.push_back(usage_string);
252 return join(usage_strings,
"\n");
257 ss <<
this <<
" " << (is_consume ?
"Consume: " :
"Release: ") << bytes
260 LOG(ERROR) << ss.str();
264 if (max_consumption < 0)
return true;
269 if (pre_gc_consumption < max_consumption)
return false;
286 #ifndef ADDRESS_SANITIZER
288 MallocExtension::instance()->ReleaseFreeMemory();
305 TResourceBrokerExpansionRequest exp;
308 TResourceBrokerExpansionResponse response;
311 LOG(INFO) <<
"Failed to expand memory limit by "
317 DCHECK(response.allocated_resources.size() == 1) <<
"Got more resources than expected";
318 const llama::TAllocatedResource& resource =
319 response.allocated_resources.begin()->second;
320 DCHECK(resource.v_cpu_cores == 0L) <<
"Unexpected VCPUs returned by Llama";
324 int64_t bytes_allocated = resource.memory_mb * 1024L * 1024L;
326 if (tracker ==
this)
continue;
SimpleMetric< T, TMetricKind::COUNTER > * AddCounter(const std::string &key, const T &value, const TUnit::type unit=TUnit::UNIT, const std::string &description="")
static boost::shared_ptr< MemTracker > GetQueryMemTracker(const TUniqueId &id, int64_t byte_limit, int64_t rm_reserved_limit, MemTracker *parent, QueryResourceMgr *res_mgr)
MemTracker(int64_t byte_limit=-1, int64_t rm_reserved_limit=-1, const std::string &label=std::string(), MemTracker *parent=NULL, bool log_usage_if_zero=true)
UIntGauge * consumption_metric_
int64_t current_value() const
int64_t consumption() const
Returns the memory consumed in bytes.
const std::string GetDetail() const
std::string pool_name_
Only valid for MemTrackers returned from GetRequestPoolMemTracker()
void UnregisterFromParent()
Removes this tracker from parent_->child_trackers_.
static boost::mutex static_mem_trackers_lock_
Protects request_to_mem_trackers_ and pool_to_mem_trackers_.
int64_t rm_reserved_limit_
boost::mutex resource_acquisition_lock_
bool CheckLimitExceeded() const
static int64_t physical_mem()
Get total physical memory in bytes (ignores cgroups memory limits).
SpinLock gc_lock_
Lock to protect GcMemory(). This prevents many GCs from occurring at once.
static RequestTrackersMap request_to_mem_trackers_
MetricGroups may be organised hierarchically as a tree.
TUniqueId query_id_
Only valid for MemTrackers returned from GetQueryMemTracker()
std::list< MemTracker * > child_trackers_
bool log_stack_
If true, log the stack as well.
std::vector< MemTracker * > all_trackers_
bool GcMemory(int64_t max_consumption)
RuntimeProfile::HighWaterMarkCounter * consumption_
in bytes; not owned
bool ExpandRmReservation(int64_t bytes)
void AddChildTracker(MemTracker *tracker)
Adds tracker to child_trackers_.
IntGauge * bytes_freed_by_last_gc_metric_
ResourceBroker * resource_broker()
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
boost::mutex child_trackers_lock_
static AtomicInt< int64_t > released_memory_since_gc_
const string REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT
std::list< MemTracker * >::iterator child_tracker_it_
IntGauge * bytes_over_limit_metric_
This class is thread-safe.
IntCounter * num_gcs_metric_
The number of times the GcFunctions were called.
static ExecEnv * GetInstance()
void RegisterMetrics(MetricGroup *metrics, const std::string &prefix)
static MemTracker * GetRequestPoolMemTracker(const std::string &pool_name, MemTracker *parent)
std::vector< MemTracker * > limit_trackers_
std::vector< GcFunction > gc_functions_
Functions to call after the limit is reached to free memory.
static PoolTrackersMap pool_to_mem_trackers_
void LogUpdate(bool is_consume, int64_t bytes) const
Logs the stack of the current consume/release. Used for debugging only.
SimpleMetric< T > * AddGauge(const std::string &key, const T &value, const TUnit::type unit=TUnit::NONE, const std::string &description="")
Create a gauge metric object with given key and initial value (owned by this object) ...
virtual void Set(int64_t v)
std::string LogUsage(const std::string &prefix="") const
Logs the usage of this tracker and all of its children (recursively).
QueryResourceMgr * query_resource_mgr_
Status Expand(const TResourceBrokerExpansionRequest &request, TResourceBrokerExpansionResponse *response)
Only CPU-heavy threads need be managed using this class.
Status CreateExpansionRequest(int64_t memory_mb, int64_t vcores, TResourceBrokerExpansionRequest *request)