Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
streaming-sampler.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef IMPALA_UTIL_STREAMING_SAMPLER_H
16 #define IMPALA_UTIL_STREAMING_SAMPLER_H
17 
18 #include <string.h>
19 #include <iostream>
20 #include "util/spinlock.h"
21 
22 namespace impala {
23 
31 template<typename T, int MAX_SAMPLES>
33  public:
34  StreamingSampler(int initial_period = 500)
35  : samples_collected_(0) ,
36  period_(initial_period),
40  }
41 
43  StreamingSampler(int period, const std::vector<T>& initial_samples)
44  : samples_collected_(initial_samples.size()),
45  period_(period),
49  DCHECK_LE(samples_collected_, MAX_SAMPLES);
50  memcpy(samples_, &initial_samples[0], sizeof(T) * samples_collected_);
51  }
52 
61  void AddSample(T sample, int ms) {
62  boost::lock_guard<SpinLock> l(lock_);
64  current_sample_sum_ += sample;
66 
72 
73  if (samples_collected_ == MAX_SAMPLES) {
75  period_ *= 2;
76  for (int i = 0; i < MAX_SAMPLES / 2; ++i) {
77  samples_[i] = (samples_[i * 2] + samples_[i * 2 + 1]) / 2;
78  }
79  samples_collected_ /= 2;
80  }
81  }
82  }
83 
88  const T* GetSamples(int* num_samples, int* period, SpinLock** lock = NULL) const {
89  if (lock != NULL) {
90  lock_.lock();
91  *lock = &lock_;
92  }
93  *num_samples = samples_collected_;
94  *period = period_;
95  return samples_;
96  }
97 
99  void SetSamples(int period, const std::vector<T>& samples) {
100  DCHECK_LE(samples.size(), MAX_SAMPLES);
101 
102  boost::lock_guard<SpinLock> l(lock_);
103  period_ = period;
104  samples_collected_ = samples.size();
105  memcpy(samples_, &samples[0], sizeof(T) * samples_collected_);
109  }
110 
111  std::string DebugString(const std::string& prefix="") const {
112  boost::lock_guard<SpinLock> l(lock_);
113  std::stringstream ss;
114  ss << prefix << "Period = " << period_ << std::endl
115  << prefix << "Num = " << samples_collected_ << std::endl
116  << prefix << "Samples = {";
117  for (int i = 0; i < samples_collected_; ++i) {
118  ss << samples_[i] << ", ";
119  }
120  ss << prefix << "}" << std::endl;
121  return ss.str();
122  }
123 
124  private:
125  mutable SpinLock lock_;
126 
129  T samples_[MAX_SAMPLES];
130 
133 
135  int period_;
136 
139 
142 
145 };
146 
147 }
148 
149 #endif
StreamingSampler(int initial_period=500)
Lightweight spinlock.
Definition: spinlock.h:24
int samples_collected_
Number of samples collected <= MAX_SAMPLES.
int period_
Storage period in ms.
void lock()
Acquires the lock, spins until the lock becomes available.
Definition: spinlock.h:29
int current_sample_total_time_
The total time that current_sample_sum_ represents.
int current_sample_count_
The number of input samples that contribute to current_sample_sum_.
void SetSamples(int period, const std::vector< T > &samples)
Set the underlying data to period/samples.
const T * GetSamples(int *num_samples, int *period, SpinLock **lock=NULL) const
T current_sample_sum_
The sum of input samples that makes up the next stored sample.
void AddSample(T sample, int ms)
StreamingSampler(int period, const std::vector< T > &initial_samples)
Initialize the sampler with values.
std::string DebugString(const std::string &prefix="") const