21 namespace posix_time = boost::posix_time;
22 using boost::get_system_time;
23 using boost::system_time;
// Flag definition (FLAGS_-style): periodic_counter_update_period_ms, an int32
// with a default of 500. Per its help text, this is the period, in
// milliseconds, at which rate counters and sampling counters are updated.
// The value is read as FLAGS_periodic_counter_update_period_ms and handed to
// SleepForMs() ahead of the counter updates; the time actually elapsed across
// that sleep is measured separately and is what the counters accumulate.
// NOTE(review): the rate computation divides by the accumulated elapsed
// milliseconds -- confirm that a very small (or zero) period cannot leave that
// divisor at zero.
28 DEFINE_int32(periodic_counter_update_period_ms, 500,
"Period to update rate counters and"
29 " sampling counters in ms");
48 DCHECK(src_counter == NULL || sample_fn == NULL);
71 DCHECK(
false) <<
"Unsupported PeriodicCounterType:" << type;
95 vector<RuntimeProfile::Counter*>* buckets,
bool convert) {
96 int64_t num_sampled = 0;
99 BucketCountersMap::iterator itr =
102 num_sampled = itr->second.num_sampled;
107 if (convert && num_sampled > 0) {
108 for (
int i = 0; i < buckets->size(); ++i) {
110 double perc = 100 * counter->
value() / (double)num_sampled;
130 system_time before_time = get_system_time();
131 SleepForMs(FLAGS_periodic_counter_update_period_ms);
132 posix_time::time_duration elapsed = get_system_time() - before_time;
133 int elapsed_ms = elapsed.total_milliseconds();
139 it->second.elapsed_ms += elapsed_ms;
141 if (it->second.src_counter != NULL) {
142 value = it->second.src_counter->value();
144 DCHECK(it->second.sample_fn != NULL);
145 value = it->second.sample_fn();
147 int64_t rate = value * 1000 / (it->second.elapsed_ms);
148 it->first->Set(rate);
156 ++it->second.num_sampled;
158 if (it->second.src_counter != NULL) {
159 value = it->second.src_counter->value();
161 DCHECK(it->second.sample_fn != NULL);
162 value = it->second.sample_fn();
164 it->second.total_sampled_value += value;
165 double average =
static_cast<double>(it->second.total_sampled_value) /
166 it->second.num_sampled;
167 it->first->Set(average);
175 int64_t val = it->second.src_counter->value();
176 if (val >= it->first->size()) val = it->first->size() - 1;
177 it->first->at(val)->Add(1);
178 ++it->second.num_sampled;
186 (*it)->AddSample(elapsed_ms);
virtual int64_t value() const
boost::scoped_ptr< boost::thread > update_thread_
Thread performing asynchronous updates.
RuntimeProfile::Counter * src_counter
RateCounterMap rate_counters_
static void RegisterTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter *counter)
Adds counter to be sampled and updated at regular intervals.
~PeriodicCounterUpdater()
Tears down the update thread.
AtomicInt< uint32_t > done_
If 1, tear down the update thread.
static void RegisterPeriodicCounter(RuntimeProfile::Counter *src_counter, RuntimeProfile::DerivedCounterFunction sample_fn, RuntimeProfile::Counter *dst_counter, PeriodicCounterType type)
boost::function< int64_t()> DerivedCounterFunction
T Swap(const T &new_val)
Atomically updates value_ with new_val. Returns the old value_.
RuntimeProfile::DerivedCounterFunction sample_fn
SpinLock rate_lock_
Spinlock that protects the map of rate counters.
void SleepForMs(const int64_t duration_ms)
Sleeps the current thread for at least duration_ms milliseconds.
virtual void Set(int64_t value)
T Read()
Safe read of the value.
RuntimeProfile::Counter * src_counter
static void StopBucketingCounters(std::vector< RuntimeProfile::Counter * > *buckets, bool convert)
TimeSeriesCounters time_series_counters_
RuntimeProfile::Counter * src_counter
static void StopRateCounter(RuntimeProfile::Counter *counter)
Stops updating the value of 'counter'.
SamplingCounterMap sampling_counters_
SpinLock sampling_lock_
Spinlock that protects the map of averages over samples of counters.
static void RegisterBucketingCounters(RuntimeProfile::Counter *src_counter, std::vector< RuntimeProfile::Counter * > *buckets)
Adds a bucketing counter to be updated at regular intervals.
SpinLock bucketing_lock_
Spinlock that protects the map of buckets of counters.
static PeriodicCounterUpdater state_
static void StopSamplingCounter(RuntimeProfile::Counter *counter)
Stops updating the value of 'counter'.
SpinLock time_series_lock_
Spinlock that protects the map of time series counters.
BucketCountersMap bucketing_counters_
int64_t total_sampled_value
DEFINE_int32(periodic_counter_update_period_ms, 500,"Period to update rate counters and"" sampling counters in ms")
RuntimeProfile::DerivedCounterFunction sample_fn
static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter *counter)
Stops 'counter' from receiving any more samples.