20 #include <boost/bind.hpp>
21 #include <boost/foreach.hpp>
22 #include <boost/thread/locks.hpp>
23 #include <boost/thread/thread.hpp>
54 bool is_averaged_profile)
59 is_averaged_profile_(is_averaged_profile),
60 counter_total_time_(TUnit::TIME_NS),
61 total_async_timer_(TUnit::TIME_NS),
62 inactive_timer_(TUnit::TIME_NS),
63 local_time_percent_(0),
68 if (!is_averaged_profile) {
83 map<string, Counter*>::const_iterator iter;
89 set<vector<Counter*>* >::const_iterator buckets_iter;
97 TimeSeriesCounterMap::const_iterator time_series_it;
107 const TRuntimeProfileTree& profiles) {
108 if (profiles.nodes.size() == 0)
return NULL;
114 const vector<TRuntimeProfileNode>& nodes,
int*
idx) {
115 DCHECK_LT(*idx, nodes.size());
117 const TRuntimeProfileNode& node = nodes[*
idx];
120 for (
int i = 0; i < node.counters.size(); ++i) {
121 const TCounter& counter = node.counters[i];
123 pool->
Add(
new Counter(counter.unit, counter.value));
126 if (node.__isset.event_sequences) {
127 BOOST_FOREACH(
const TEventSequence& sequence, node.event_sequences) {
129 pool->
Add(
new EventSequence(sequence.timestamps, sequence.labels));
133 if (node.__isset.time_series_counters) {
134 BOOST_FOREACH(
const TTimeSeriesCounter& val, node.time_series_counters) {
136 pool->
Add(
new TimeSeriesCounter(val.name, val.unit, val.period_ms, val.values));
145 for (
int i = 0; i < node.num_children; ++i) {
152 DCHECK(other != NULL);
157 CounterMap::iterator dst_iter;
158 CounterMap::const_iterator src_iter;
177 DCHECK(dst_iter->second->unit() == src_iter->second->unit());
184 ChildCounterMap::const_iterator child_counter_src_itr;
187 ++child_counter_src_itr) {
189 child_counter_src_itr->first, set<string>());
190 child_counters->insert(child_counter_src_itr->second.begin(),
191 child_counter_src_itr->second.end());
199 for (
int i = 0; i < other->
children_.size(); ++i) {
208 bool indent_other_child = other->
children_[i].second;
210 children_.push_back(make_pair(child, indent_other_child));
221 Update(thrift_profile.nodes, &idx);
222 DCHECK_EQ(idx, thrift_profile.nodes.size());
226 DCHECK_LT(*idx, nodes.size());
227 const TRuntimeProfileNode& node = nodes[*
idx];
231 map<string, Counter*>::iterator dst_iter;
232 for (
int i = 0; i < node.counters.size(); ++i) {
233 const TCounter& tcounter = node.counters[i];
234 CounterMap::iterator j =
counter_map_.find(tcounter.name);
237 pool_->
Add(
new Counter(tcounter.unit, tcounter.value));
239 if (j->second->unit() != tcounter.unit) {
240 LOG(ERROR) <<
"Cannot update counters with the same name ("
241 << j->first <<
") but different units.";
243 j->second->Set(tcounter.value);
248 ChildCounterMap::const_iterator child_counter_src_itr;
249 for (child_counter_src_itr = node.child_counters_map.begin();
250 child_counter_src_itr != node.child_counters_map.end();
251 ++child_counter_src_itr) {
253 child_counter_src_itr->first, set<string>());
254 child_counters->insert(child_counter_src_itr->second.begin(),
255 child_counter_src_itr->second.end());
261 const InfoStrings& info_strings = node.info_strings;
262 BOOST_FOREACH(
const string& key, node.info_strings_display_order) {
267 InfoStrings::const_iterator it = info_strings.find(key);
268 DCHECK(it != info_strings.end());
281 for (
int i = 0; i < node.time_series_counters.size(); ++i) {
282 const TTimeSeriesCounter& c = node.time_series_counters[i];
286 pool_->
Add(
new TimeSeriesCounter(c.name, c.unit, c.period_ms, c.values));
289 it->second->samples_.SetSamples(c.period_ms, c.values);
298 for (
int i = 0; i < node.num_children; ++i) {
299 const TRuntimeProfileNode& tchild = nodes[*
idx];
300 ChildMap::iterator j =
child_map_.find(tchild.name);
306 child->metadata_ = tchild.metadata;
308 children_.push_back(make_pair(child, tchild.indent));
310 child->Update(nodes, idx);
317 map<string, Counter*>::iterator iter;
321 if (iter->second->unit() == TUnit::DOUBLE_VALUE) {
322 iter->second->Set(iter->second->double_value() / n);
324 iter->second->value_ = iter->second->value() / n;
331 i->second->Divide(n);
341 if (total == 0)
return;
344 int64_t total_child_time = 0;
346 for (
int i = 0; i <
children_.size(); ++i) {
347 total_child_time +=
children_[i].first->total_time_counter()->value();
363 for (
int i = 0; i <
children_.size(); ++i) {
364 children_[i].first->ComputeTimeInProfile(total);
369 DCHECK(child != NULL);
378 children_.push_back(make_pair(child, indent));
381 if (it->first == loc) {
382 children_.insert(++it, make_pair(child, indent));
386 DCHECK(
false) <<
"Invalid loc";
394 children->push_back(i->second);
401 children->push_back(i->second);
402 i->second->GetAllChildren(children);
426 #define ADD_COUNTER_IMPL(NAME, T) \
427 RuntimeProfile::T* RuntimeProfile::NAME(\
428 const string& name, TUnit::type unit, const string& parent_counter_name) {\
429 DCHECK_EQ(is_averaged_profile_, false);\
430 lock_guard<mutex> l(counter_map_lock_);\
431 if (counter_map_.find(name) != counter_map_.end()) {\
432 return reinterpret_cast<T*>(counter_map_[name]);\
434 DCHECK(parent_counter_name == ROOT_COUNTER ||\
435 counter_map_.find(parent_counter_name) != counter_map_.end());\
436 T* counter = pool_->Add(new T(unit));\
437 counter_map_[name] = counter;\
438 set<string>* child_counters =\
439 FindOrInsert(&child_counter_map_, parent_counter_name, set<string>());\
440 child_counters->insert(name);\
448 const string&
name, TUnit::type unit,
455 set<string>* child_counters =
457 child_counters->insert(name);
462 const string& prefix) {
486 if (c != NULL) counters->push_back(c);
489 for (
int i = 0; i <
children_.size(); ++i) {
490 children_[i].first->GetCounters(name, counters);
508 ostream& stream = *s;
520 map<string, Counter*>::const_iterator total_time =
522 DCHECK(total_time != counter_map.end());
524 stream.flags(ios::fixed);
525 stream << prefix <<
name_ <<
":";
526 if (total_time->second->value() != 0) {
529 total_time->second->unit())
541 stream << prefix <<
" " << key <<
": " <<
info_strings_.find(key)->second << endl;
552 vector<EventSequence::Event>
events;
560 int64_t last = event_sequence.second->ElapsedTime();
561 event_sequence.second->GetEvents(&events);
562 if (last == 0 && events.size() > 0) last = events.back().second;
563 stream << prefix <<
" " << event_sequence.first <<
": "
568 event_sequence.second->GetEvents(&events);
570 stream << prefix <<
" - " <<
event.first <<
": "
586 const int64_t* samples = v.second->samples_.GetSamples(&num, &period, &lock);
588 stream << prefix <<
" " << v.first <<
"("
591 for (
int i = 0; i < num; ++i) {
593 if (i != num - 1) stream <<
", ";
602 prefix,
ROOT_COUNTER, counter_map, child_counter_map, s);
611 for (
int i = 0; i < children.size(); ++i) {
613 bool indent = children[i].second;
614 profile->
PrettyPrint(s, prefix + (indent ?
" " :
""));
625 TRuntimeProfileTree thrift_object;
628 vector<uint8_t> serialized_buffer;
629 Status status = serializer.Serialize(&thrift_object, &serialized_buffer);
630 if (!status.
ok())
return;
634 scoped_ptr<Codec> compressor;
637 if (!status.
ok())
return;
639 vector<uint8_t> compressed_buffer;
640 compressed_buffer.resize(compressor->MaxOutputLen(serialized_buffer.size()));
641 int64_t result_len = compressed_buffer.size();
642 uint8_t* compressed_buffer_ptr = &compressed_buffer[0];
643 compressor->ProcessBlock(
true, serialized_buffer.size(), &serialized_buffer[0],
644 &result_len, &compressed_buffer_ptr);
645 compressed_buffer.resize(result_len);
657 nodes->reserve(nodes->size() +
children_.size());
659 int index = nodes->size();
660 nodes->push_back(TRuntimeProfileNode());
661 TRuntimeProfileNode& node = (*nodes)[index];
673 for (map<string, Counter*>::const_iterator iter = counter_map.begin();
674 iter != counter_map.end(); ++iter) {
676 counter.name = iter->first;
677 counter.value = iter->second->value();
678 counter.unit = iter->second->unit();
679 node.counters.push_back(counter);
689 vector<EventSequence::Event>
events;
692 node.__set_event_sequences(vector<TEventSequence>());
696 TEventSequence* seq = &node.event_sequences[idx++];
697 seq->name = val.first;
698 val.second->GetEvents(&events);
700 seq->labels.push_back(ev.first);
701 seq->timestamps.push_back(ev.second);
710 node.__set_time_series_counters(vector<TTimeSeriesCounter>());
713 BOOST_FOREACH(
const TimeSeriesCounterMap::value_type& val,
715 val.second->ToThrift(&node.time_series_counters[idx++]);
725 for (
int i = 0; i < children.size(); ++i) {
726 int child_idx = nodes->size();
727 children[i].first->ToThrift(nodes);
729 (*nodes)[child_idx].indent = children[i].second;
736 DCHECK(total_counter->
unit() == TUnit::BYTES ||
737 total_counter->
unit() == TUnit::UNIT);
738 DCHECK(timer->
unit() == TUnit::TIME_NS);
740 if (timer->
value() == 0)
return 0;
741 double secs =
static_cast<double>(timer->
value()) / 1000.0 / 1000.0 / 1000.0;
742 return total_counter->
value() / secs;
747 for (
int i = 0; i < counters->size(); ++i) {
748 value += (*counters)[i]->value();
754 const string&
name, Counter* src_counter) {
755 TUnit::type dst_unit;
756 switch (src_counter->unit()) {
758 dst_unit = TUnit::BYTES_PER_SECOND;
761 dst_unit = TUnit::UNIT_PER_SECOND;
764 DCHECK(
false) <<
"Unsupported src counter unit: " << src_counter->
unit();
767 Counter* dst_counter =
AddCounter(name, dst_unit);
774 const string& name, DerivedCounterFunction fn, TUnit::type dst_unit) {
775 Counter* dst_counter =
AddCounter(name, dst_unit);
782 const string& name, Counter* src_counter) {
783 DCHECK(src_counter->unit() == TUnit::UNIT);
784 Counter* dst_counter =
AddCounter(name, TUnit::DOUBLE_VALUE);
791 const string& name, DerivedCounterFunction sample_fn) {
792 Counter* dst_counter =
AddCounter(name, TUnit::DOUBLE_VALUE);
799 vector<Counter*>* buckets) {
813 EventSequence* timer =
pool_->
Add(
new EventSequence());
819 const TEventSequence& from) {
824 EventSequence* timer =
pool_->
Add(
new EventSequence(from.timestamps, from.labels));
830 const string& counter_name,
const CounterMap& counter_map,
832 ostream& stream = *s;
833 ChildCounterMap::const_iterator itr = child_counter_map.find(counter_name);
834 if (itr != child_counter_map.end()) {
835 const set<string>& child_counters = itr->second;
836 BOOST_FOREACH(
const string& child_counter, child_counters) {
837 CounterMap::const_iterator iter = counter_map.find(child_counter);
838 if (iter == counter_map.end())
continue;
839 stream << prefix <<
" - " << iter->first <<
": "
843 child_counter_map, s);
849 const string& name, TUnit::type unit, DerivedCounterFunction fn) {
855 TimeSeriesCounter* counter =
pool_->
Add(
new TimeSeriesCounter(name, unit, fn));
862 const string& name, Counter* src_counter) {
863 DCHECK(src_counter != NULL);
869 counter->name =
name_;
870 counter->unit =
unit_;
875 counter->values.resize(num);
876 memcpy(&counter->values[0], samples, num *
sizeof(int64_t));
878 counter->period_ms = period;
883 ss <<
"Counter=" <<
name_ << endl
884 << samples_.DebugString();
890 seq->labels.push_back(ev.first);
891 seq->timestamps.push_back(ev.second);
Counter * AddCounter(const std::string &name, TUnit::type unit, const std::string &parent_counter_name="")
virtual int64_t value() const
DerivedCounter * AddDerivedCounter(const std::string &name, TUnit::type unit, const DerivedCounterFunction &counter_fn, const std::string &parent_counter_name="")
EventSequence * AddEventSequence(const std::string &key)
ChildCounterMap child_counter_map_
A set of counters that measure thread info, such as total time, user time, sys time.
const std::string GetDetail() const
std::string RedactCopy(const std::string &original)
Utility function to redacted a string without modifying the original.
Counter * AddSamplingCounter(const std::string &name, Counter *src_counter)
Counter total_async_timer_
static Status CreateCompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *compressor)
void AddInfoString(const std::string &key, const std::string &value)
client RuntimeProfile::EventSequence * events
void ComputeTimeInProfile()
std::string SerializeToArchiveString() const
static void RegisterTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter *counter)
Adds counter to be sampled and updated at regular intervals.
InfoStrings info_strings_
boost::mutex event_sequence_lock_
static const string ROOT_COUNTER
static RuntimeProfile * CreateFromThrift(ObjectPool *pool, const TRuntimeProfileTree &profiles)
Deserialize from thrift. Runtime profiles are allocated from the pool.
std::map< std::string, std::set< std::string > > ChildCounterMap
std::pair< std::string, int64_t > Event
An Event is a <label, timestamp> pair.
RuntimeProfile(ObjectPool *pool, const std::string &name, bool is_averaged_profile=false)
static void RegisterPeriodicCounter(RuntimeProfile::Counter *src_counter, RuntimeProfile::DerivedCounterFunction sample_fn, RuntimeProfile::Counter *dst_counter, PeriodicCounterType type)
void RegisterBucketingCounters(Counter *src_counter, std::vector< Counter * > *buckets)
std::string name_
Name for this runtime profile.
void GetCounters(const std::string &name, std::vector< Counter * > *counters)
EventSequenceMap event_sequence_map_
boost::function< int64_t()> DerivedCounterFunction
std::map< std::string, Counter * > CounterMap
Counter * AddRateCounter(const std::string &name, Counter *src_counter)
static int64_t UnitsPerSecond(const Counter *total_counter, const Counter *timer)
Derived counter function: return measured throughput as input_value/second.
Counter * inactive_timer()
void UpdateCounter(Counter *new_counter)
static void Base64Encode(const char *in, int in_len, stringstream *out)
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
boost::mutex time_series_counter_map_lock_
EventSequence * GetEventSequence(const std::string &name) const
Returns event sequence with the provided name if it exists, otherwise NULL.
static int64_t CounterSum(const std::vector< Counter * > *counters)
Derived counter function: return aggregated value.
void ToThrift(TTimeSeriesCounter *counter)
V * FindOrInsert(std::map< K, V > *m, const K &key, const V &default_val)
double local_time_percent_
void UpdateAverage(RuntimeProfile *src)
static const string THREAD_VOLUNTARY_CONTEXT_SWITCHES
void Divide(int n)
Divides all counters by n.
static const string THREAD_TOTAL_TIME
std::map< PlanNodeId, RuntimeProfile::Counter * > CounterMap
map from id of a scan node to a specific counter in the node's profile
bool own_pool_
True if we have to delete the pool_ on destruction.
ADD_COUNTER_IMPL(AddCounter, Counter)
static void StopBucketingCounters(std::vector< RuntimeProfile::Counter * > *buckets, bool convert)
static const std::string ASYNC_TIME_COUNTER_NAME
ThreadCounters * AddThreadCounters(const std::string &prefix)
Counter * voluntary_context_switches_
static const string THREAD_SYS_TIME
static void StopRateCounter(RuntimeProfile::Counter *counter)
Stops updating the value of 'counter'.
boost::mutex children_lock_
const T * GetSamples(int *num_samples, int *period, SpinLock **lock=NULL) const
void Update(const TRuntimeProfileTree &thrift_profile)
void ToThrift(TEventSequence *seq) const
Counter * total_async_timer()
static void RegisterBucketingCounters(RuntimeProfile::Counter *src_counter, std::vector< RuntimeProfile::Counter * > *buckets)
Adds a bucketing counter to be updated at regular intervals.
std::set< std::vector< Counter * > * > bucketing_counters_
A set of bucket counters registered in this runtime profile.
TimeSeriesCounter * AddTimeSeriesCounter(const std::string &name, TUnit::type unit, DerivedCounterFunction sample_fn)
static void StopSamplingCounter(RuntimeProfile::Counter *counter)
Stops updating the value of 'counter'.
static void PrintChildCounters(const std::string &prefix, const std::string &counter_name, const CounterMap &counter_map, const ChildCounterMap &child_counter_map, std::ostream *s)
Print the child counters of the given counter name.
std::vector< std::pair< RuntimeProfile *, bool > > ChildVector
vector of (profile, indentation flag)
int64_t metadata_
user-supplied, uninterpreted metadata.
static const string THREAD_INVOLUNTARY_CONTEXT_SWITCHES
Counter counter_total_time_
Counter * GetCounter(const std::string &name)
void PrettyPrint(std::ostream *s, const std::string &prefix="") const
boost::mutex counter_map_lock_
protects counter_map_, counter_child_map_ and bucketing_counters_
TimeSeriesCounterMap time_series_counter_map_
static const std::string INACTIVE_TIME_COUNTER_NAME
InfoStringsDisplayOrder info_strings_display_order_
std::string DebugString() const
static const string THREAD_USER_TIME
std::map< std::string, std::string > InfoStrings
static const std::string TOTAL_TIME_COUNTER_NAME
Name of the counter maintaining the total time.
void AddChild(RuntimeProfile *child, bool indent=true, RuntimeProfile *location=NULL)
const std::string & name() const
Returns name of this profile.
const std::string * GetInfoString(const std::string &key) const
void GetChildren(std::vector< RuntimeProfile * > *children)
boost::mutex info_strings_lock_
Protects info_strings_ and info_strings_display_order_.
StreamingCounterSampler samples_
Counter * involuntary_context_switches_
void GetAllChildren(std::vector< RuntimeProfile * > *children)
Gets all profiles in tree, including this one.
static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter *counter)
Stops 'counter' from receiving any more samples.
bool is_averaged_profile_
void ToThrift(TRuntimeProfileTree *tree) const
Counter * total_time_counter()
Returns the counter for the total elapsed time.