20 #include <boost/bind.hpp> 
   21 #include <boost/foreach.hpp> 
   22 #include <boost/thread/locks.hpp> 
   23 #include <boost/thread/thread.hpp> 
   54     bool is_averaged_profile)
 
   59     is_averaged_profile_(is_averaged_profile),
 
   60     counter_total_time_(TUnit::TIME_NS),
 
   61     total_async_timer_(TUnit::TIME_NS),
 
   62     inactive_timer_(TUnit::TIME_NS),
 
   63     local_time_percent_(0),
 
   68   if (!is_averaged_profile) {
 
   83   map<string, Counter*>::const_iterator iter;
 
   89   set<vector<Counter*>* >::const_iterator buckets_iter;
 
   97   TimeSeriesCounterMap::const_iterator time_series_it;
 
  107     const TRuntimeProfileTree& profiles) {
 
  108   if (profiles.nodes.size() == 0) 
return NULL;
 
  114     const vector<TRuntimeProfileNode>& nodes, 
int* 
idx) {
 
  115   DCHECK_LT(*idx, nodes.size());
 
  117   const TRuntimeProfileNode& node = nodes[*
idx];
 
  120   for (
int i = 0; i < node.counters.size(); ++i) {
 
  121     const TCounter& counter = node.counters[i];
 
  123       pool->
Add(
new Counter(counter.unit, counter.value));
 
  126   if (node.__isset.event_sequences) {
 
  127     BOOST_FOREACH(
const TEventSequence& sequence, node.event_sequences) {
 
  129           pool->
Add(
new EventSequence(sequence.timestamps, sequence.labels));
 
  133   if (node.__isset.time_series_counters) {
 
  134     BOOST_FOREACH(
const TTimeSeriesCounter& val, node.time_series_counters) {
 
  136           pool->
Add(
new TimeSeriesCounter(val.name, val.unit, val.period_ms, val.values));
 
  145   for (
int i = 0; i < node.num_children; ++i) {
 
  152   DCHECK(other != NULL);
 
  157     CounterMap::iterator dst_iter;
 
  158     CounterMap::const_iterator src_iter;
 
  177         DCHECK(dst_iter->second->unit() == src_iter->second->unit());
 
  184     ChildCounterMap::const_iterator child_counter_src_itr;
 
  187          ++child_counter_src_itr) {
 
  189           child_counter_src_itr->first, set<string>());
 
  190       child_counters->insert(child_counter_src_itr->second.begin(),
 
  191           child_counter_src_itr->second.end());
 
  199     for (
int i = 0; i < other->
children_.size(); ++i) {
 
  208         bool indent_other_child = other->
children_[i].second;
 
  210         children_.push_back(make_pair(child, indent_other_child));
 
  221   Update(thrift_profile.nodes, &idx);
 
  222   DCHECK_EQ(idx, thrift_profile.nodes.size());
 
  226   DCHECK_LT(*idx, nodes.size());
 
  227   const TRuntimeProfileNode& node = nodes[*
idx];
 
  231     map<string, Counter*>::iterator dst_iter;
 
  232     for (
int i = 0; i < node.counters.size(); ++i) {
 
  233       const TCounter& tcounter = node.counters[i];
 
  234       CounterMap::iterator j = 
counter_map_.find(tcounter.name);
 
  237           pool_->
Add(
new Counter(tcounter.unit, tcounter.value));
 
  239         if (j->second->unit() != tcounter.unit) {
 
  240           LOG(ERROR) << 
"Cannot update counters with the same name (" 
  241                      << j->first << 
") but different units.";
 
  243           j->second->Set(tcounter.value);
 
  248     ChildCounterMap::const_iterator child_counter_src_itr;
 
  249     for (child_counter_src_itr = node.child_counters_map.begin();
 
  250          child_counter_src_itr != node.child_counters_map.end();
 
  251          ++child_counter_src_itr) {
 
  253           child_counter_src_itr->first, set<string>());
 
  254       child_counters->insert(child_counter_src_itr->second.begin(),
 
  255           child_counter_src_itr->second.end());
 
  261     const InfoStrings& info_strings = node.info_strings;
 
  262     BOOST_FOREACH(
const string& key, node.info_strings_display_order) {
 
  267       InfoStrings::const_iterator it = info_strings.find(key);
 
  268       DCHECK(it != info_strings.end());
 
  281     for (
int i = 0; i < node.time_series_counters.size(); ++i) {
 
  282       const TTimeSeriesCounter& c = node.time_series_counters[i];
 
  286             pool_->
Add(
new TimeSeriesCounter(c.name, c.unit, c.period_ms, c.values));
 
  289         it->second->samples_.SetSamples(c.period_ms, c.values);
 
  298     for (
int i = 0; i < node.num_children; ++i) {
 
  299       const TRuntimeProfileNode& tchild = nodes[*
idx];
 
  300       ChildMap::iterator j = 
child_map_.find(tchild.name);
 
  306         child->metadata_ = tchild.metadata;
 
  308         children_.push_back(make_pair(child, tchild.indent));
 
  310       child->Update(nodes, idx);
 
  317   map<string, Counter*>::iterator iter;
 
  321       if (iter->second->unit() == TUnit::DOUBLE_VALUE) {
 
  322         iter->second->Set(iter->second->double_value() / n);
 
  324         iter->second->value_ = iter->second->value() / n;
 
  331       i->second->Divide(n);
 
  341   if (total == 0) 
return;
 
  344   int64_t total_child_time = 0;
 
  346   for (
int i = 0; i < 
children_.size(); ++i) {
 
  347     total_child_time += 
children_[i].first->total_time_counter()->value();
 
  363   for (
int i = 0; i < 
children_.size(); ++i) {
 
  364     children_[i].first->ComputeTimeInProfile(total);
 
  369   DCHECK(child != NULL);
 
  378     children_.push_back(make_pair(child, indent));
 
  381       if (it->first == loc) {
 
  382         children_.insert(++it, make_pair(child, indent));
 
  386     DCHECK(
false) << 
"Invalid loc";
 
  394     children->push_back(i->second);
 
  401     children->push_back(i->second);
 
  402     i->second->GetAllChildren(children);
 
  426 #define ADD_COUNTER_IMPL(NAME, T) \ 
  427   RuntimeProfile::T* RuntimeProfile::NAME(\ 
  428       const string& name, TUnit::type unit, const string& parent_counter_name) {\ 
  429     DCHECK_EQ(is_averaged_profile_, false);\ 
  430     lock_guard<mutex> l(counter_map_lock_);\ 
  431     if (counter_map_.find(name) != counter_map_.end()) {\ 
  432       return reinterpret_cast<T*>(counter_map_[name]);\ 
  434     DCHECK(parent_counter_name == ROOT_COUNTER ||\ 
  435         counter_map_.find(parent_counter_name) != counter_map_.end());\ 
  436     T* counter = pool_->Add(new T(unit));\ 
  437     counter_map_[name] = counter;\ 
  438     set<string>* child_counters =\ 
  439         FindOrInsert(&child_counter_map_, parent_counter_name, set<string>());\ 
  440     child_counters->insert(name);\ 
  448     const string& 
name, TUnit::type unit,
 
  455   set<string>* child_counters =
 
  457   child_counters->insert(name);
 
  462     const string& prefix) {
 
  486   if (c != NULL) counters->push_back(c);
 
  489   for (
int i = 0; i < 
children_.size(); ++i) {
 
  490     children_[i].first->GetCounters(name, counters);
 
  508   ostream& stream = *s;
 
  520   map<string, Counter*>::const_iterator total_time =
 
  522   DCHECK(total_time != counter_map.end());
 
  524   stream.flags(ios::fixed);
 
  525   stream << prefix << 
name_ << 
":";
 
  526   if (total_time->second->value() != 0) {
 
  529                total_time->second->unit())
 
  541       stream << prefix << 
"  " << key << 
": " << 
info_strings_.find(key)->second << endl;
 
  552     vector<EventSequence::Event> 
events;
 
  560       int64_t last = event_sequence.second->ElapsedTime();
 
  561       event_sequence.second->GetEvents(&events);
 
  562       if (last == 0 && events.size() > 0) last = events.back().second;
 
  563       stream << prefix << 
"  " << event_sequence.first << 
": " 
  568       event_sequence.second->GetEvents(&events);
 
  570         stream << prefix << 
"     - " << 
event.first << 
": " 
  586       const int64_t* samples = v.second->samples_.GetSamples(&num, &period, &lock);
 
  588         stream << prefix << 
"  " << v.first << 
"(" 
  591         for (
int i = 0; i < num; ++i) {
 
  593           if (i != num - 1) stream << 
", ";
 
  602       prefix, 
ROOT_COUNTER, counter_map, child_counter_map, s);
 
  611   for (
int i = 0; i < children.size(); ++i) {
 
  613     bool indent = children[i].second;
 
  614     profile->
PrettyPrint(s, prefix + (indent ? 
"  " : 
""));
 
  625   TRuntimeProfileTree thrift_object;
 
  628   vector<uint8_t> serialized_buffer;
 
  629   Status status = serializer.Serialize(&thrift_object, &serialized_buffer);
 
  630   if (!status.
ok()) 
return;
 
  634   scoped_ptr<Codec> compressor;
 
  637   if (!status.
ok()) 
return;
 
  639   vector<uint8_t> compressed_buffer;
 
  640   compressed_buffer.resize(compressor->MaxOutputLen(serialized_buffer.size()));
 
  641   int64_t result_len = compressed_buffer.size();
 
  642   uint8_t* compressed_buffer_ptr = &compressed_buffer[0];
 
  643   compressor->ProcessBlock(
true, serialized_buffer.size(), &serialized_buffer[0],
 
  644       &result_len, &compressed_buffer_ptr);
 
  645   compressed_buffer.resize(result_len);
 
  657   nodes->reserve(nodes->size() + 
children_.size());
 
  659   int index = nodes->size();
 
  660   nodes->push_back(TRuntimeProfileNode());
 
  661   TRuntimeProfileNode& node = (*nodes)[index];
 
  673   for (map<string, Counter*>::const_iterator iter = counter_map.begin();
 
  674        iter != counter_map.end(); ++iter) {
 
  676     counter.name = iter->first;
 
  677     counter.value = iter->second->value();
 
  678     counter.unit = iter->second->unit();
 
  679     node.counters.push_back(counter);
 
  689     vector<EventSequence::Event> 
events;
 
  692       node.__set_event_sequences(vector<TEventSequence>());
 
  696         TEventSequence* seq = &node.event_sequences[idx++];
 
  697         seq->name = val.first;
 
  698         val.second->GetEvents(&events);
 
  700           seq->labels.push_back(ev.first);
 
  701           seq->timestamps.push_back(ev.second);
 
  710       node.__set_time_series_counters(vector<TTimeSeriesCounter>());
 
  713       BOOST_FOREACH(
const TimeSeriesCounterMap::value_type& val,
 
  715         val.second->ToThrift(&node.time_series_counters[idx++]);
 
  725   for (
int i = 0; i < children.size(); ++i) {
 
  726     int child_idx = nodes->size();
 
  727     children[i].first->ToThrift(nodes);
 
  729     (*nodes)[child_idx].indent = children[i].second;
 
  736   DCHECK(total_counter->
unit() == TUnit::BYTES ||
 
  737          total_counter->
unit() == TUnit::UNIT);
 
  738   DCHECK(timer->
unit() == TUnit::TIME_NS);
 
  740   if (timer->
value() == 0) 
return 0;
 
  741   double secs = 
static_cast<double>(timer->
value()) / 1000.0 / 1000.0 / 1000.0;
 
  742   return total_counter->
value() / secs;
 
  747   for (
int i = 0; i < counters->size(); ++i) {
 
  748     value += (*counters)[i]->value();
 
  754     const string& 
name, Counter* src_counter) {
 
  755   TUnit::type dst_unit;
 
  756   switch (src_counter->unit()) {
 
  758       dst_unit = TUnit::BYTES_PER_SECOND;
 
  761       dst_unit = TUnit::UNIT_PER_SECOND;
 
  764       DCHECK(
false) << 
"Unsupported src counter unit: " << src_counter->
unit();
 
  767   Counter* dst_counter = 
AddCounter(name, dst_unit);
 
  774     const string& name, DerivedCounterFunction fn, TUnit::type dst_unit) {
 
  775   Counter* dst_counter = 
AddCounter(name, dst_unit);
 
  782     const string& name, Counter* src_counter) {
 
  783   DCHECK(src_counter->unit() == TUnit::UNIT);
 
  784   Counter* dst_counter = 
AddCounter(name, TUnit::DOUBLE_VALUE);
 
  791     const string& name, DerivedCounterFunction sample_fn) {
 
  792   Counter* dst_counter = 
AddCounter(name, TUnit::DOUBLE_VALUE);
 
  799     vector<Counter*>* buckets) {
 
  813   EventSequence* timer = 
pool_->
Add(
new EventSequence());
 
  819     const TEventSequence& from) {
 
  824   EventSequence* timer = 
pool_->
Add(
new EventSequence(from.timestamps, from.labels));
 
  830     const string& counter_name, 
const CounterMap& counter_map,
 
  832   ostream& stream = *s;
 
  833   ChildCounterMap::const_iterator itr = child_counter_map.find(counter_name);
 
  834   if (itr != child_counter_map.end()) {
 
  835     const set<string>& child_counters = itr->second;
 
  836     BOOST_FOREACH(
const string& child_counter, child_counters) {
 
  837       CounterMap::const_iterator iter = counter_map.find(child_counter);
 
  838       if (iter == counter_map.end()) 
continue;
 
  839       stream << prefix << 
"   - " << iter->first << 
": " 
  843           child_counter_map, s);
 
  849     const string& name, TUnit::type unit, DerivedCounterFunction fn) {
 
  855   TimeSeriesCounter* counter = 
pool_->
Add(
new TimeSeriesCounter(name, unit, fn));
 
  862     const string& name, Counter* src_counter) {
 
  863   DCHECK(src_counter != NULL);
 
  869   counter->name = 
name_;
 
  870   counter->unit = 
unit_;
 
  875   counter->values.resize(num);
 
  876   memcpy(&counter->values[0], samples, num * 
sizeof(int64_t));
 
  878   counter->period_ms = period;
 
  883   ss << 
"Counter=" << 
name_ << endl
 
  884      << samples_.DebugString();
 
  890     seq->labels.push_back(ev.first);
 
  891     seq->timestamps.push_back(ev.second);
 
Counter * AddCounter(const std::string &name, TUnit::type unit, const std::string &parent_counter_name="")
virtual int64_t value() const 
DerivedCounter * AddDerivedCounter(const std::string &name, TUnit::type unit, const DerivedCounterFunction &counter_fn, const std::string &parent_counter_name="")
EventSequence * AddEventSequence(const std::string &key)
ChildCounterMap child_counter_map_
A set of counters that measure thread info, such as total time, user time, sys time. 
const std::string GetDetail() const 
std::string RedactCopy(const std::string &original)
Utility function to redact a string without modifying the original. 
Counter * AddSamplingCounter(const std::string &name, Counter *src_counter)
Counter total_async_timer_
static Status CreateCompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *compressor)
void AddInfoString(const std::string &key, const std::string &value)
client RuntimeProfile::EventSequence * events
void ComputeTimeInProfile()
std::string SerializeToArchiveString() const 
static void RegisterTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter *counter)
Adds counter to be sampled and updated at regular intervals. 
InfoStrings info_strings_
boost::mutex event_sequence_lock_
static const string ROOT_COUNTER
static RuntimeProfile * CreateFromThrift(ObjectPool *pool, const TRuntimeProfileTree &profiles)
Deserialize from thrift. Runtime profiles are allocated from the pool. 
std::map< std::string, std::set< std::string > > ChildCounterMap
std::pair< std::string, int64_t > Event
An Event is a <label, timestamp> pair. 
RuntimeProfile(ObjectPool *pool, const std::string &name, bool is_averaged_profile=false)
static void RegisterPeriodicCounter(RuntimeProfile::Counter *src_counter, RuntimeProfile::DerivedCounterFunction sample_fn, RuntimeProfile::Counter *dst_counter, PeriodicCounterType type)
void RegisterBucketingCounters(Counter *src_counter, std::vector< Counter * > *buckets)
std::string name_
Name for this runtime profile. 
void GetCounters(const std::string &name, std::vector< Counter * > *counters)
EventSequenceMap event_sequence_map_
boost::function< int64_t()> DerivedCounterFunction
std::map< std::string, Counter * > CounterMap
Counter * AddRateCounter(const std::string &name, Counter *src_counter)
static int64_t UnitsPerSecond(const Counter *total_counter, const Counter *timer)
Derived counter function: return measured throughput as input_value/second. 
Counter * inactive_timer()
void UpdateCounter(Counter *new_counter)
static void Base64Encode(const char *in, int in_len, stringstream *out)
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
boost::mutex time_series_counter_map_lock_
EventSequence * GetEventSequence(const std::string &name) const 
Returns event sequence with the provided name if it exists, otherwise NULL. 
static int64_t CounterSum(const std::vector< Counter * > *counters)
Derived counter function: return aggregated value. 
void ToThrift(TTimeSeriesCounter *counter)
V * FindOrInsert(std::map< K, V > *m, const K &key, const V &default_val)
double local_time_percent_
void UpdateAverage(RuntimeProfile *src)
static const string THREAD_VOLUNTARY_CONTEXT_SWITCHES
void Divide(int n)
Divides all counters by n. 
static const string THREAD_TOTAL_TIME
std::map< PlanNodeId, RuntimeProfile::Counter * > CounterMap
map from id of a scan node to a specific counter in the node's profile 
bool own_pool_
True if we have to delete the pool_ on destruction. 
ADD_COUNTER_IMPL(AddCounter, Counter)
static void StopBucketingCounters(std::vector< RuntimeProfile::Counter * > *buckets, bool convert)
static const std::string ASYNC_TIME_COUNTER_NAME
ThreadCounters * AddThreadCounters(const std::string &prefix)
Counter * voluntary_context_switches_
static const string THREAD_SYS_TIME
static void StopRateCounter(RuntimeProfile::Counter *counter)
Stops updating the value of 'counter'. 
boost::mutex children_lock_
const T * GetSamples(int *num_samples, int *period, SpinLock **lock=NULL) const 
void Update(const TRuntimeProfileTree &thrift_profile)
void ToThrift(TEventSequence *seq) const 
Counter * total_async_timer()
static void RegisterBucketingCounters(RuntimeProfile::Counter *src_counter, std::vector< RuntimeProfile::Counter * > *buckets)
Adds a bucketing counter to be updated at regular intervals. 
std::set< std::vector< Counter * > * > bucketing_counters_
A set of bucket counters registered in this runtime profile. 
TimeSeriesCounter * AddTimeSeriesCounter(const std::string &name, TUnit::type unit, DerivedCounterFunction sample_fn)
static void StopSamplingCounter(RuntimeProfile::Counter *counter)
Stops updating the value of 'counter'. 
static void PrintChildCounters(const std::string &prefix, const std::string &counter_name, const CounterMap &counter_map, const ChildCounterMap &child_counter_map, std::ostream *s)
Print the child counters of the given counter name. 
std::vector< std::pair< RuntimeProfile *, bool > > ChildVector
vector of (profile, indentation flag) 
int64_t metadata_
user-supplied, uninterpreted metadata. 
static const string THREAD_INVOLUNTARY_CONTEXT_SWITCHES
Counter counter_total_time_
Counter * GetCounter(const std::string &name)
void PrettyPrint(std::ostream *s, const std::string &prefix="") const 
boost::mutex counter_map_lock_
Protects counter_map_, child_counter_map_ and bucketing_counters_. 
TimeSeriesCounterMap time_series_counter_map_
static const std::string INACTIVE_TIME_COUNTER_NAME
InfoStringsDisplayOrder info_strings_display_order_
std::string DebugString() const 
static const string THREAD_USER_TIME
std::map< std::string, std::string > InfoStrings
static const std::string TOTAL_TIME_COUNTER_NAME
Name of the counter maintaining the total time. 
void AddChild(RuntimeProfile *child, bool indent=true, RuntimeProfile *location=NULL)
const std::string & name() const 
Returns name of this profile. 
const std::string * GetInfoString(const std::string &key) const 
void GetChildren(std::vector< RuntimeProfile * > *children)
boost::mutex info_strings_lock_
Protects info_strings_ and info_strings_display_order_. 
StreamingCounterSampler samples_
Counter * involuntary_context_switches_
void GetAllChildren(std::vector< RuntimeProfile * > *children)
Gets all profiles in tree, including this one. 
static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter *counter)
Stops 'counter' from receiving any more samples. 
bool is_averaged_profile_
void ToThrift(TRuntimeProfileTree *tree) const 
Counter * total_time_counter()
Returns the counter for the total elapsed time.