Impala
Impala is the open source, native analytic database for Apache Hadoop.
periodic-counter-updater.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "util/periodic-counter-updater.h"

#include "util/time.h"

#include "common/names.h"
namespace posix_time = boost::posix_time;
using boost::get_system_time;
using boost::system_time;

namespace impala {

// Period to update rate counters and sampling counters in ms.
DEFINE_int32(periodic_counter_update_period_ms, 500, "Period to update rate counters and"
    " sampling counters in ms");

PeriodicCounterUpdater PeriodicCounterUpdater::state_;

PeriodicCounterUpdater::PeriodicCounterUpdater() {
  DCHECK_EQ(this, &state_);
  state_.update_thread_.reset(
      new thread(&PeriodicCounterUpdater::UpdateLoop, this));
}

PeriodicCounterUpdater::~PeriodicCounterUpdater() {
  done_.Swap(1);
  update_thread_->join();
}

void PeriodicCounterUpdater::RegisterPeriodicCounter(
    RuntimeProfile::Counter* src_counter,
    RuntimeProfile::DerivedCounterFunction sample_fn,
    RuntimeProfile::Counter* dst_counter, PeriodicCounterType type) {
  DCHECK(src_counter == NULL || sample_fn == NULL);

  switch (type) {
    case RATE_COUNTER: {
      RateCounterInfo counter;
      counter.src_counter = src_counter;
      counter.sample_fn = sample_fn;
      counter.elapsed_ms = 0;
      lock_guard<SpinLock> ratelock(state_.rate_lock_);
      state_.rate_counters_[dst_counter] = counter;
      break;
    }
    case SAMPLING_COUNTER: {
      SamplingCounterInfo counter;
      counter.src_counter = src_counter;
      counter.sample_fn = sample_fn;
      counter.num_sampled = 0;
      counter.total_sampled_value = 0;
      lock_guard<SpinLock> samplinglock(state_.sampling_lock_);
      state_.sampling_counters_[dst_counter] = counter;
      break;
    }
    default:
      DCHECK(false) << "Unsupported PeriodicCounterType:" << type;
  }
}

void PeriodicCounterUpdater::StopRateCounter(RuntimeProfile::Counter* counter) {
  lock_guard<SpinLock> ratelock(state_.rate_lock_);
  state_.rate_counters_.erase(counter);
}

void PeriodicCounterUpdater::StopSamplingCounter(RuntimeProfile::Counter* counter) {
  lock_guard<SpinLock> samplinglock(state_.sampling_lock_);
  state_.sampling_counters_.erase(counter);
}

void PeriodicCounterUpdater::RegisterBucketingCounters(
    RuntimeProfile::Counter* src_counter, vector<RuntimeProfile::Counter*>* buckets) {
  BucketCountersInfo info;
  info.src_counter = src_counter;
  info.num_sampled = 0;
  lock_guard<SpinLock> bucketinglock(state_.bucketing_lock_);
  state_.bucketing_counters_[buckets] = info;
}

void PeriodicCounterUpdater::StopBucketingCounters(
    vector<RuntimeProfile::Counter*>* buckets, bool convert) {
  int64_t num_sampled = 0;
  {
    lock_guard<SpinLock> bucketinglock(state_.bucketing_lock_);
    BucketCountersMap::iterator itr =
        state_.bucketing_counters_.find(buckets);
    if (itr != state_.bucketing_counters_.end()) {
      num_sampled = itr->second.num_sampled;
      state_.bucketing_counters_.erase(itr);
    }
  }

  if (convert && num_sampled > 0) {
    for (int i = 0; i < buckets->size(); ++i) {
      RuntimeProfile::Counter* counter = (*buckets)[i];
      double perc = 100 * counter->value() / (double)num_sampled;
      counter->Set(perc);
    }
  }
}

void PeriodicCounterUpdater::RegisterTimeSeriesCounter(
    RuntimeProfile::TimeSeriesCounter* counter) {
  lock_guard<SpinLock> timeserieslock(state_.time_series_lock_);
  state_.time_series_counters_.insert(counter);
}

void PeriodicCounterUpdater::StopTimeSeriesCounter(
    RuntimeProfile::TimeSeriesCounter* counter) {
  lock_guard<SpinLock> timeserieslock(state_.time_series_lock_);
  state_.time_series_counters_.erase(counter);
}

void PeriodicCounterUpdater::UpdateLoop() {
  while (done_.Read() == 0) {
    system_time before_time = get_system_time();
    SleepForMs(FLAGS_periodic_counter_update_period_ms);
    posix_time::time_duration elapsed = get_system_time() - before_time;
    int elapsed_ms = elapsed.total_milliseconds();

    // Rate counters: divide the source value by the total elapsed time since
    // registration to get a per-second rate.
    {
      lock_guard<SpinLock> ratelock(state_.rate_lock_);
      for (RateCounterMap::iterator it = rate_counters_.begin();
           it != rate_counters_.end(); ++it) {
        it->second.elapsed_ms += elapsed_ms;
        int64_t value;
        if (it->second.src_counter != NULL) {
          value = it->second.src_counter->value();
        } else {
          DCHECK(it->second.sample_fn != NULL);
          value = it->second.sample_fn();
        }
        int64_t rate = value * 1000 / (it->second.elapsed_ms);
        it->first->Set(rate);
      }
    }

    // Sampling counters: publish the running average of all samples taken so far.
    {
      lock_guard<SpinLock> samplinglock(state_.sampling_lock_);
      for (SamplingCounterMap::iterator it = sampling_counters_.begin();
           it != sampling_counters_.end(); ++it) {
        ++it->second.num_sampled;
        int64_t value;
        if (it->second.src_counter != NULL) {
          value = it->second.src_counter->value();
        } else {
          DCHECK(it->second.sample_fn != NULL);
          value = it->second.sample_fn();
        }
        it->second.total_sampled_value += value;
        double average = static_cast<double>(it->second.total_sampled_value) /
            it->second.num_sampled;
        it->first->Set(average);
      }
    }

    // Bucketing counters: treat the source value as a bucket index (clamped to the
    // last bucket) and increment that bucket.
    {
      lock_guard<SpinLock> bucketinglock(state_.bucketing_lock_);
      for (BucketCountersMap::iterator it = bucketing_counters_.begin();
           it != bucketing_counters_.end(); ++it) {
        int64_t val = it->second.src_counter->value();
        if (val >= it->first->size()) val = it->first->size() - 1;
        it->first->at(val)->Add(1);
        ++it->second.num_sampled;
      }
    }

    // Time series counters: append one sample covering this update interval.
    {
      lock_guard<SpinLock> timeserieslock(state_.time_series_lock_);
      for (TimeSeriesCounters::iterator it = time_series_counters_.begin();
           it != time_series_counters_.end(); ++it) {
        (*it)->AddSample(elapsed_ms);
      }
    }
  }
}

}
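The core idea of UpdateLoop is small enough to show in isolation: a single background thread sleeps for the configured period, accumulates the elapsed wall-clock time, and republishes a derived value such as rate = cumulative_value * 1000 / cumulative_elapsed_ms. The following standalone sketch illustrates that pattern with plain C++11 threads and atomics; it is an illustration only, not Impala code, and the names bytes_read and bytes_per_sec are made up for the example.

// Standalone sketch (not Impala code): periodically derive a rate from a cumulative counter.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>

int main() {
  std::atomic<int64_t> bytes_read{0};     // cumulative source counter
  std::atomic<int64_t> bytes_per_sec{0};  // derived rate counter
  std::atomic<bool> done{false};

  std::thread updater([&] {
    int64_t elapsed_ms = 0;
    while (!done.load()) {
      auto before = std::chrono::steady_clock::now();
      std::this_thread::sleep_for(std::chrono::milliseconds(500));
      elapsed_ms += std::chrono::duration_cast<std::chrono::milliseconds>(
          std::chrono::steady_clock::now() - before).count();
      // rate = total value accumulated so far over total elapsed time, scaled to per second.
      if (elapsed_ms > 0) bytes_per_sec.store(bytes_read.load() * 1000 / elapsed_ms);
    }
  });

  // Simulate work that advances the source counter.
  for (int i = 0; i < 4; ++i) {
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    bytes_read += 10 * 1024 * 1024;
    std::cout << "rate: " << bytes_per_sec.load() << " bytes/sec\n";
  }

  done.store(true);  // tell the updater thread to exit, mirroring done_.Swap(1) above
  updater.join();
  return 0;
}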
Declarations referenced in this file:

virtual int64_t value() const
boost::scoped_ptr<boost::thread> update_thread_
  Thread performing asynchronous updates.
static void RegisterTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter* counter)
  Adds counter to be sampled and updated at regular intervals.
~PeriodicCounterUpdater()
  Tears down the update thread.
AtomicInt<uint32_t> done_
  If 1, tear down the update thread.
static void RegisterPeriodicCounter(RuntimeProfile::Counter* src_counter, RuntimeProfile::DerivedCounterFunction sample_fn, RuntimeProfile::Counter* dst_counter, PeriodicCounterType type)
boost::function<int64_t()> DerivedCounterFunction
T Swap(const T& new_val)
  Atomically updates value_ with new_val. Returns the old value_. (Definition: atomic.h:142)
RuntimeProfile::DerivedCounterFunction sample_fn
SpinLock rate_lock_
  Spinlock that protects the map of rate counters.
void SleepForMs(const int64_t duration_ms)
  Sleeps the current thread for at least duration_ms milliseconds. (Definition: time.cc:21)
virtual void Set(int64_t value)
T Read()
  Safe read of the value. (Definition: atomic.h:100)
static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets, bool convert)
static void StopRateCounter(RuntimeProfile::Counter* counter)
  Stops updating the value of 'counter'.
SpinLock sampling_lock_
  Spinlock that protects the map of averages over samples of counters.
static void RegisterBucketingCounters(RuntimeProfile::Counter* src_counter, std::vector<RuntimeProfile::Counter*>* buckets)
  Adds a bucketing counter to be updated at regular intervals.
SpinLock bucketing_lock_
  Spinlock that protects the map of buckets of counters.
static PeriodicCounterUpdater state_
static void StopSamplingCounter(RuntimeProfile::Counter* counter)
  Stops updating the value of 'counter'.
SpinLock time_series_lock_
  Spinlock that protects the map of time series counters.
DEFINE_int32(periodic_counter_update_period_ms, 500, "Period to update rate counters and" " sampling counters in ms")
static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter* counter)
  Stops 'counter' from receiving any more samples.
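For the bucketing counters, the work is split in two: UpdateLoop increments the bucket whose index equals the source counter's current value (clamped to the last bucket), and StopBucketingCounters optionally converts the raw counts into percentages of all samples. Below is a minimal standalone sketch of that count-to-percentage conversion; the sample values are invented for illustration and nothing here is Impala code.

// Standalone sketch (not Impala code): bucket samples by value, then convert to percentages.
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  std::vector<int64_t> buckets(4, 0);  // bucket i counts samples whose value was i
  int64_t num_sampled = 0;

  // Pretend the periodic update loop observed these source-counter values.
  const int64_t samples[] = {0, 1, 1, 2, 7, 3, 1};
  for (int64_t val : samples) {
    if (val >= static_cast<int64_t>(buckets.size())) val = buckets.size() - 1;  // clamp
    ++buckets[val];
    ++num_sampled;
  }

  // Convert counts to percentages of all samples, as the 'convert' flag does.
  for (size_t i = 0; i < buckets.size(); ++i) {
    double perc = 100 * buckets[i] / static_cast<double>(num_sampled);
    std::cout << "bucket " << i << ": " << perc << "%\n";
  }
  return 0;
}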