Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
runtime-profile.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "util/runtime-profile.h"
16 
17 #include <iomanip>
18 #include <iostream>
19 
20 #include <boost/bind.hpp>
21 #include <boost/foreach.hpp>
22 #include <boost/thread/locks.hpp>
23 #include <boost/thread/thread.hpp>
24 
25 #include "common/object-pool.h"
26 #include "rpc/thrift-util.h"
27 #include "util/compress.h"
28 #include "util/container-util.h"
29 #include "util/cpu-info.h"
30 #include "util/debug-util.h"
32 #include "util/redactor.h"
33 #include "util/url-coding.h"
34 
35 #include "common/names.h"
36 
37 namespace impala {
38 
39 // Thread counters name
40 static const string THREAD_TOTAL_TIME = "TotalWallClockTime";
41 static const string THREAD_USER_TIME = "UserTime";
42 static const string THREAD_SYS_TIME = "SysTime";
43 static const string THREAD_VOLUNTARY_CONTEXT_SWITCHES = "VoluntaryContextSwitches";
44 static const string THREAD_INVOLUNTARY_CONTEXT_SWITCHES = "InvoluntaryContextSwitches";
45 
46 // The root counter name for all top level counters.
47 static const string ROOT_COUNTER = "";
48 
49 const string RuntimeProfile::TOTAL_TIME_COUNTER_NAME = "TotalTime";
50 const string RuntimeProfile::INACTIVE_TIME_COUNTER_NAME = "InactiveTotalTime";
51 const string RuntimeProfile::ASYNC_TIME_COUNTER_NAME = "AsyncTotalTime";
52 
54  bool is_averaged_profile)
55  : pool_(pool),
56  own_pool_(false),
57  name_(name),
58  metadata_(-1),
59  is_averaged_profile_(is_averaged_profile),
60  counter_total_time_(TUnit::TIME_NS),
61  total_async_timer_(TUnit::TIME_NS),
62  inactive_timer_(TUnit::TIME_NS),
63  local_time_percent_(0),
64  local_time_ns_(0) {
68  if (!is_averaged_profile) {
69  total_time_counter = &counter_total_time_;
70  total_async_timer = &total_async_timer_;
71  inactive_timer = &inactive_timer_;
72  } else {
73  total_time_counter = pool->Add(new AveragedCounter(TUnit::TIME_NS));
74  total_async_timer = pool->Add(new AveragedCounter(TUnit::TIME_NS));
75  inactive_timer = pool->Add(new AveragedCounter(TUnit::TIME_NS));
76  }
80 }
81 
83  map<string, Counter*>::const_iterator iter;
84  for (iter = counter_map_.begin(); iter != counter_map_.end(); ++iter) {
87  }
88 
89  set<vector<Counter*>* >::const_iterator buckets_iter;
90  for (buckets_iter = bucketing_counters_.begin();
91  buckets_iter != bucketing_counters_.end(); ++buckets_iter) {
92  // This is just a clean up. No need to perform conversion. Also, the underlying
93  // counters might be gone already.
95  }
96 
97  TimeSeriesCounterMap::const_iterator time_series_it;
98  for (time_series_it = time_series_counter_map_.begin();
99  time_series_it != time_series_counter_map_.end(); ++time_series_it) {
100  PeriodicCounterUpdater::StopTimeSeriesCounter(time_series_it->second);
101  }
102 
103  if (own_pool_) delete pool_;
104 }
105 
107  const TRuntimeProfileTree& profiles) {
108  if (profiles.nodes.size() == 0) return NULL;
109  int idx = 0;
110  return RuntimeProfile::CreateFromThrift(pool, profiles.nodes, &idx);
111 }
112 
114  const vector<TRuntimeProfileNode>& nodes, int* idx) {
115  DCHECK_LT(*idx, nodes.size());
116 
117  const TRuntimeProfileNode& node = nodes[*idx];
118  RuntimeProfile* profile = pool->Add(new RuntimeProfile(pool, node.name));
119  profile->metadata_ = node.metadata;
120  for (int i = 0; i < node.counters.size(); ++i) {
121  const TCounter& counter = node.counters[i];
122  profile->counter_map_[counter.name] =
123  pool->Add(new Counter(counter.unit, counter.value));
124  }
125 
126  if (node.__isset.event_sequences) {
127  BOOST_FOREACH(const TEventSequence& sequence, node.event_sequences) {
128  profile->event_sequence_map_[sequence.name] =
129  pool->Add(new EventSequence(sequence.timestamps, sequence.labels));
130  }
131  }
132 
133  if (node.__isset.time_series_counters) {
134  BOOST_FOREACH(const TTimeSeriesCounter& val, node.time_series_counters) {
135  profile->time_series_counter_map_[val.name] =
136  pool->Add(new TimeSeriesCounter(val.name, val.unit, val.period_ms, val.values));
137  }
138  }
139 
140  profile->child_counter_map_ = node.child_counters_map;
141  profile->info_strings_ = node.info_strings;
142  profile->info_strings_display_order_ = node.info_strings_display_order;
143 
144  ++*idx;
145  for (int i = 0; i < node.num_children; ++i) {
146  profile->AddChild(RuntimeProfile::CreateFromThrift(pool, nodes, idx));
147  }
148  return profile;
149 }
150 
152  DCHECK(other != NULL);
153  DCHECK(is_averaged_profile_);
154 
155  // Merge this level
156  {
157  CounterMap::iterator dst_iter;
158  CounterMap::const_iterator src_iter;
159  lock_guard<mutex> l(counter_map_lock_);
160  lock_guard<mutex> m(other->counter_map_lock_);
161  for (src_iter = other->counter_map_.begin();
162  src_iter != other->counter_map_.end(); ++src_iter) {
163 
164  // Ignore this counter for averages.
165  if (src_iter->first == INACTIVE_TIME_COUNTER_NAME) continue;
166  if (src_iter->first == ASYNC_TIME_COUNTER_NAME) continue;
167 
168  dst_iter = counter_map_.find(src_iter->first);
169  AveragedCounter* avg_counter;
170 
171  // Get the counter with the same name in dst_iter (this->counter_map_)
172  // Create one if it doesn't exist.
173  if (dst_iter == counter_map_.end()) {
174  avg_counter = pool_->Add(new AveragedCounter(src_iter->second->unit()));
175  counter_map_[src_iter->first] = avg_counter;
176  } else {
177  DCHECK(dst_iter->second->unit() == src_iter->second->unit());
178  avg_counter = static_cast<AveragedCounter*>(dst_iter->second);
179  }
180 
181  avg_counter->UpdateCounter(src_iter->second);
182  }
183 
184  ChildCounterMap::const_iterator child_counter_src_itr;
185  for (child_counter_src_itr = other->child_counter_map_.begin();
186  child_counter_src_itr != other->child_counter_map_.end();
187  ++child_counter_src_itr) {
188  set<string>* child_counters = FindOrInsert(&child_counter_map_,
189  child_counter_src_itr->first, set<string>());
190  child_counters->insert(child_counter_src_itr->second.begin(),
191  child_counter_src_itr->second.end());
192  }
193  }
194 
195  {
196  lock_guard<mutex> l(children_lock_);
197  lock_guard<mutex> m(other->children_lock_);
198  // Recursively merge children with matching names
199  for (int i = 0; i < other->children_.size(); ++i) {
200  RuntimeProfile* other_child = other->children_[i].first;
201  ChildMap::iterator j = child_map_.find(other_child->name_);
202  RuntimeProfile* child = NULL;
203  if (j != child_map_.end()) {
204  child = j->second;
205  } else {
206  child = pool_->Add(new RuntimeProfile(pool_, other_child->name_, true));
207  child->metadata_ = other_child->metadata_;
208  bool indent_other_child = other->children_[i].second;
209  child_map_[child->name_] = child;
210  children_.push_back(make_pair(child, indent_other_child));
211  }
212  child->UpdateAverage(other_child);
213  }
214  }
215 
217 }
218 
219 void RuntimeProfile::Update(const TRuntimeProfileTree& thrift_profile) {
220  int idx = 0;
221  Update(thrift_profile.nodes, &idx);
222  DCHECK_EQ(idx, thrift_profile.nodes.size());
223 }
224 
225 void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx) {
226  DCHECK_LT(*idx, nodes.size());
227  const TRuntimeProfileNode& node = nodes[*idx];
228  {
229  lock_guard<mutex> l(counter_map_lock_);
230  // update this level
231  map<string, Counter*>::iterator dst_iter;
232  for (int i = 0; i < node.counters.size(); ++i) {
233  const TCounter& tcounter = node.counters[i];
234  CounterMap::iterator j = counter_map_.find(tcounter.name);
235  if (j == counter_map_.end()) {
236  counter_map_[tcounter.name] =
237  pool_->Add(new Counter(tcounter.unit, tcounter.value));
238  } else {
239  if (j->second->unit() != tcounter.unit) {
240  LOG(ERROR) << "Cannot update counters with the same name ("
241  << j->first << ") but different units.";
242  } else {
243  j->second->Set(tcounter.value);
244  }
245  }
246  }
247 
248  ChildCounterMap::const_iterator child_counter_src_itr;
249  for (child_counter_src_itr = node.child_counters_map.begin();
250  child_counter_src_itr != node.child_counters_map.end();
251  ++child_counter_src_itr) {
252  set<string>* child_counters = FindOrInsert(&child_counter_map_,
253  child_counter_src_itr->first, set<string>());
254  child_counters->insert(child_counter_src_itr->second.begin(),
255  child_counter_src_itr->second.end());
256  }
257  }
258 
259  {
260  lock_guard<mutex> l(info_strings_lock_);
261  const InfoStrings& info_strings = node.info_strings;
262  BOOST_FOREACH(const string& key, node.info_strings_display_order) {
263  // Look for existing info strings and update in place. If there
264  // are new strings, add them to the end of the display order.
265  // TODO: Is nodes.info_strings always a superset of
266  // info_strings_? If so, can just copy the display order.
267  InfoStrings::const_iterator it = info_strings.find(key);
268  DCHECK(it != info_strings.end());
269  InfoStrings::iterator existing = info_strings_.find(key);
270  if (existing == info_strings_.end()) {
271  info_strings_.insert(make_pair(key, it->second));
272  info_strings_display_order_.push_back(key);
273  } else {
274  info_strings_[key] = it->second;
275  }
276  }
277  }
278 
279  {
280  lock_guard<mutex> l(time_series_counter_map_lock_);
281  for (int i = 0; i < node.time_series_counters.size(); ++i) {
282  const TTimeSeriesCounter& c = node.time_series_counters[i];
283  TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(c.name);
284  if (it == time_series_counter_map_.end()) {
285  time_series_counter_map_[c.name] =
286  pool_->Add(new TimeSeriesCounter(c.name, c.unit, c.period_ms, c.values));
287  it = time_series_counter_map_.find(c.name);
288  } else {
289  it->second->samples_.SetSamples(c.period_ms, c.values);
290  }
291  }
292  }
293 
294  ++*idx;
295  {
296  lock_guard<mutex> l(children_lock_);
297  // update children with matching names; create new ones if they don't match
298  for (int i = 0; i < node.num_children; ++i) {
299  const TRuntimeProfileNode& tchild = nodes[*idx];
300  ChildMap::iterator j = child_map_.find(tchild.name);
301  RuntimeProfile* child = NULL;
302  if (j != child_map_.end()) {
303  child = j->second;
304  } else {
305  child = pool_->Add(new RuntimeProfile(pool_, tchild.name));
306  child->metadata_ = tchild.metadata;
307  child_map_[tchild.name] = child;
308  children_.push_back(make_pair(child, tchild.indent));
309  }
310  child->Update(nodes, idx);
311  }
312  }
313 }
314 
316  DCHECK_GT(n, 0);
317  map<string, Counter*>::iterator iter;
318  {
319  lock_guard<mutex> l(counter_map_lock_);
320  for (iter = counter_map_.begin(); iter != counter_map_.end(); ++iter) {
321  if (iter->second->unit() == TUnit::DOUBLE_VALUE) {
322  iter->second->Set(iter->second->double_value() / n);
323  } else {
324  iter->second->value_ = iter->second->value() / n;
325  }
326  }
327  }
328  {
329  lock_guard<mutex> l(children_lock_);
330  for (ChildMap::iterator i = child_map_.begin(); i != child_map_.end(); ++i) {
331  i->second->Divide(n);
332  }
333  }
334 }
335 
338 }
339 
341  if (total == 0) return;
342 
343  // Add all the total times in all the children
344  int64_t total_child_time = 0;
345  lock_guard<mutex> l(children_lock_);
346  for (int i = 0; i < children_.size(); ++i) {
347  total_child_time += children_[i].first->total_time_counter()->value();
348  }
349 
350  local_time_ns_ = total_time_counter()->value() - total_child_time;
351  if (!is_averaged_profile_) {
354  }
355 
356  // Counters have some margin, set to 0 if it was negative.
357  local_time_ns_ = ::max(0L, local_time_ns_);
359  static_cast<double>(local_time_ns_) / total_time_counter()->value();
360  local_time_percent_ = ::min(1.0, local_time_percent_) * 100;
361 
362  // Recurse on children
363  for (int i = 0; i < children_.size(); ++i) {
364  children_[i].first->ComputeTimeInProfile(total);
365  }
366 }
367 
368 void RuntimeProfile::AddChild(RuntimeProfile* child, bool indent, RuntimeProfile* loc) {
369  DCHECK(child != NULL);
370  lock_guard<mutex> l(children_lock_);
371  if (child_map_.count(child->name_) > 0) {
372  // This child has already been added, so do nothing.
373  // Otherwise, the map and vector will be out of sync.
374  return;
375  }
376  child_map_[child->name_] = child;
377  if (loc == NULL) {
378  children_.push_back(make_pair(child, indent));
379  } else {
380  for (ChildVector::iterator it = children_.begin(); it != children_.end(); ++it) {
381  if (it->first == loc) {
382  children_.insert(++it, make_pair(child, indent));
383  return;
384  }
385  }
386  DCHECK(false) << "Invalid loc";
387  }
388 }
389 
390 void RuntimeProfile::GetChildren(vector<RuntimeProfile*>* children) {
391  children->clear();
392  lock_guard<mutex> l(children_lock_);
393  for (ChildMap::iterator i = child_map_.begin(); i != child_map_.end(); ++i) {
394  children->push_back(i->second);
395  }
396 }
397 
398 void RuntimeProfile::GetAllChildren(vector<RuntimeProfile*>* children) {
399  lock_guard<mutex> l(children_lock_);
400  for (ChildMap::iterator i = child_map_.begin(); i != child_map_.end(); ++i) {
401  children->push_back(i->second);
402  i->second->GetAllChildren(children);
403  }
404 }
405 
406 void RuntimeProfile::AddInfoString(const string& key, const string& value) {
407  // Values may contain sensitive data, such as a query.
408  const string& info = RedactCopy(value);
409  lock_guard<mutex> l(info_strings_lock_);
410  InfoStrings::iterator it = info_strings_.find(key);
411  if (it == info_strings_.end()) {
412  info_strings_.insert(make_pair(key, info));
413  info_strings_display_order_.push_back(key);
414  } else {
415  it->second = info;
416  }
417 }
418 
419 const string* RuntimeProfile::GetInfoString(const string& key) const {
420  lock_guard<mutex> l(info_strings_lock_);
421  InfoStrings::const_iterator it = info_strings_.find(key);
422  if (it == info_strings_.end()) return NULL;
423  return &it->second;
424 }
425 
426 #define ADD_COUNTER_IMPL(NAME, T) \
427  RuntimeProfile::T* RuntimeProfile::NAME(\
428  const string& name, TUnit::type unit, const string& parent_counter_name) {\
429  DCHECK_EQ(is_averaged_profile_, false);\
430  lock_guard<mutex> l(counter_map_lock_);\
431  if (counter_map_.find(name) != counter_map_.end()) {\
432  return reinterpret_cast<T*>(counter_map_[name]);\
433  }\
434  DCHECK(parent_counter_name == ROOT_COUNTER ||\
435  counter_map_.find(parent_counter_name) != counter_map_.end());\
436  T* counter = pool_->Add(new T(unit));\
437  counter_map_[name] = counter;\
438  set<string>* child_counters =\
439  FindOrInsert(&child_counter_map_, parent_counter_name, set<string>());\
440  child_counters->insert(name);\
441  return counter;\
442  }
443 
444 ADD_COUNTER_IMPL(AddCounter, Counter);
445 ADD_COUNTER_IMPL(AddHighWaterMarkCounter, HighWaterMarkCounter);
446 
448  const string& name, TUnit::type unit,
449  const DerivedCounterFunction& counter_fn, const string& parent_counter_name) {
450  DCHECK_EQ(is_averaged_profile_, false);
451  lock_guard<mutex> l(counter_map_lock_);
452  if (counter_map_.find(name) != counter_map_.end()) return NULL;
453  DerivedCounter* counter = pool_->Add(new DerivedCounter(unit, counter_fn));
454  counter_map_[name] = counter;
455  set<string>* child_counters =
456  FindOrInsert(&child_counter_map_, parent_counter_name, set<string>());
457  child_counters->insert(name);
458  return counter;
459 }
460 
462  const string& prefix) {
463  ThreadCounters* counter = pool_->Add(new ThreadCounters());
464  counter->total_time_ = AddCounter(prefix + THREAD_TOTAL_TIME, TUnit::TIME_NS);
465  counter->user_time_ = AddCounter(prefix + THREAD_USER_TIME, TUnit::TIME_NS,
466  prefix + THREAD_TOTAL_TIME);
467  counter->sys_time_ = AddCounter(prefix + THREAD_SYS_TIME, TUnit::TIME_NS,
468  prefix + THREAD_TOTAL_TIME);
469  counter->voluntary_context_switches_ =
470  AddCounter(prefix + THREAD_VOLUNTARY_CONTEXT_SWITCHES, TUnit::UNIT);
472  AddCounter(prefix + THREAD_INVOLUNTARY_CONTEXT_SWITCHES, TUnit::UNIT);
473  return counter;
474 }
475 
477  lock_guard<mutex> l(counter_map_lock_);
478  if (counter_map_.find(name) != counter_map_.end()) {
479  return counter_map_[name];
480  }
481  return NULL;
482 }
483 
484 void RuntimeProfile::GetCounters(const string& name, vector<Counter*>* counters) {
485  Counter* c = GetCounter(name);
486  if (c != NULL) counters->push_back(c);
487 
488  lock_guard<mutex> l(children_lock_);
489  for (int i = 0; i < children_.size(); ++i) {
490  children_[i].first->GetCounters(name, counters);
491  }
492 }
493 
495 {
496  lock_guard<mutex> l(event_sequence_lock_);
497  EventSequenceMap::const_iterator it = event_sequence_map_.find(name);
498  if (it == event_sequence_map_.end()) return NULL;
499  return it->second;
500 }
501 
502 // Print the profile:
503 // 1. Profile Name
504 // 2. Info Strings
505 // 3. Counters
506 // 4. Children
507 void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const {
508  ostream& stream = *s;
509 
510  // create copy of counter_map_ and child_counter_map_ so we don't need to hold lock
511  // while we call value() on the counters (some of those might be DerivedCounters)
512  CounterMap counter_map;
513  ChildCounterMap child_counter_map;
514  {
515  lock_guard<mutex> l(counter_map_lock_);
516  counter_map = counter_map_;
517  child_counter_map = child_counter_map_;
518  }
519 
520  map<string, Counter*>::const_iterator total_time =
521  counter_map.find(TOTAL_TIME_COUNTER_NAME);
522  DCHECK(total_time != counter_map.end());
523 
524  stream.flags(ios::fixed);
525  stream << prefix << name_ << ":";
526  if (total_time->second->value() != 0) {
527  stream << "(Total: "
528  << PrettyPrinter::Print(total_time->second->value(),
529  total_time->second->unit())
530  << ", non-child: "
531  << PrettyPrinter::Print(local_time_ns_, TUnit::TIME_NS)
532  << ", % non-child: "
533  << setprecision(2) << local_time_percent_
534  << "%)";
535  }
536  stream << endl;
537 
538  {
539  lock_guard<mutex> l(info_strings_lock_);
540  BOOST_FOREACH(const string& key, info_strings_display_order_) {
541  stream << prefix << " " << key << ": " << info_strings_.find(key)->second << endl;
542  }
543  }
544 
545  {
546  // Print all the event timers as the following:
547  // <EventKey> Timeline: 2s719ms
548  // - Event 1: 6.522us (6.522us)
549  // - Event 2: 2s288ms (2s288ms)
550  // - Event 3: 2s410ms (121.138ms)
551  // The times in parentheses are the time elapsed since the last event.
552  vector<EventSequence::Event> events;
553  lock_guard<mutex> l(event_sequence_lock_);
554  BOOST_FOREACH(
555  const EventSequenceMap::value_type& event_sequence, event_sequence_map_) {
556  // If the stopwatch has never been started (e.g. because this sequence came from
557  // Thrift), look for the last element to tell us the total runtime. For
558  // currently-updating sequences, it's better to use the stopwatch value because that
559  // updates continuously.
560  int64_t last = event_sequence.second->ElapsedTime();
561  event_sequence.second->GetEvents(&events);
562  if (last == 0 && events.size() > 0) last = events.back().second;
563  stream << prefix << " " << event_sequence.first << ": "
564  << PrettyPrinter::Print(last, TUnit::TIME_NS)
565  << endl;
566 
567  int64_t prev = 0L;
568  event_sequence.second->GetEvents(&events);
569  BOOST_FOREACH(const EventSequence::Event& event, events) {
570  stream << prefix << " - " << event.first << ": "
571  << PrettyPrinter::Print(event.second, TUnit::TIME_NS) << " ("
572  << PrettyPrinter::Print(event.second - prev, TUnit::TIME_NS) << ")"
573  << endl;
574  prev = event.second;
575  }
576  }
577  }
578 
579  {
580  // Print all time series counters as following:
581  // <Name> (<period>): <val1>, <val2>, <etc>
582  lock_guard<mutex> l(time_series_counter_map_lock_);
583  BOOST_FOREACH(const TimeSeriesCounterMap::value_type& v, time_series_counter_map_) {
584  SpinLock* lock;
585  int num, period;
586  const int64_t* samples = v.second->samples_.GetSamples(&num, &period, &lock);
587  if (num > 0) {
588  stream << prefix << " " << v.first << "("
589  << PrettyPrinter::Print(period * 1000000L, TUnit::TIME_NS)
590  << "): ";
591  for (int i = 0; i < num; ++i) {
592  stream << PrettyPrinter::Print(samples[i], v.second->unit_);
593  if (i != num - 1) stream << ", ";
594  }
595  stream << endl;
596  }
597  lock->unlock();
598  }
599  }
600 
602  prefix, ROOT_COUNTER, counter_map, child_counter_map, s);
603 
604  // create copy of children_ so we don't need to hold lock while we call
605  // PrettyPrint() on the children
606  ChildVector children;
607  {
608  lock_guard<mutex> l(children_lock_);
609  children = children_;
610  }
611  for (int i = 0; i < children.size(); ++i) {
612  RuntimeProfile* profile = children[i].first;
613  bool indent = children[i].second;
614  profile->PrettyPrint(s, prefix + (indent ? " " : ""));
615  }
616 }
617 
619  stringstream ss;
621  return ss.str();
622 }
623 
624 void RuntimeProfile::SerializeToArchiveString(stringstream* out) const {
625  TRuntimeProfileTree thrift_object;
626  const_cast<RuntimeProfile*>(this)->ToThrift(&thrift_object);
627  ThriftSerializer serializer(true);
628  vector<uint8_t> serialized_buffer;
629  Status status = serializer.Serialize(&thrift_object, &serialized_buffer);
630  if (!status.ok()) return;
631 
632  // Compress the serialized thrift string. This uses string keys and is very
633  // easy to compress.
634  scoped_ptr<Codec> compressor;
635  status = Codec::CreateCompressor(NULL, false, THdfsCompression::DEFAULT, &compressor);
636  DCHECK(status.ok()) << status.GetDetail();
637  if (!status.ok()) return;
638 
639  vector<uint8_t> compressed_buffer;
640  compressed_buffer.resize(compressor->MaxOutputLen(serialized_buffer.size()));
641  int64_t result_len = compressed_buffer.size();
642  uint8_t* compressed_buffer_ptr = &compressed_buffer[0];
643  compressor->ProcessBlock(true, serialized_buffer.size(), &serialized_buffer[0],
644  &result_len, &compressed_buffer_ptr);
645  compressed_buffer.resize(result_len);
646 
647  Base64Encode(compressed_buffer, out);
648  compressor->Close();
649 }
650 
651 void RuntimeProfile::ToThrift(TRuntimeProfileTree* tree) const {
652  tree->nodes.clear();
653  ToThrift(&tree->nodes);
654 }
655 
656 void RuntimeProfile::ToThrift(vector<TRuntimeProfileNode>* nodes) const {
657  nodes->reserve(nodes->size() + children_.size());
658 
659  int index = nodes->size();
660  nodes->push_back(TRuntimeProfileNode());
661  TRuntimeProfileNode& node = (*nodes)[index];
662  node.name = name_;
663  node.num_children = children_.size();
664  node.metadata = metadata_;
665  node.indent = true;
666 
667  CounterMap counter_map;
668  {
669  lock_guard<mutex> l(counter_map_lock_);
670  counter_map = counter_map_;
671  node.child_counters_map = child_counter_map_;
672  }
673  for (map<string, Counter*>::const_iterator iter = counter_map.begin();
674  iter != counter_map.end(); ++iter) {
675  TCounter counter;
676  counter.name = iter->first;
677  counter.value = iter->second->value();
678  counter.unit = iter->second->unit();
679  node.counters.push_back(counter);
680  }
681 
682  {
683  lock_guard<mutex> l(info_strings_lock_);
684  node.info_strings = info_strings_;
685  node.info_strings_display_order = info_strings_display_order_;
686  }
687 
688  {
689  vector<EventSequence::Event> events;
690  lock_guard<mutex> l(event_sequence_lock_);
691  if (event_sequence_map_.size() != 0) {
692  node.__set_event_sequences(vector<TEventSequence>());
693  node.event_sequences.resize(event_sequence_map_.size());
694  int idx = 0;
695  BOOST_FOREACH(const EventSequenceMap::value_type& val, event_sequence_map_) {
696  TEventSequence* seq = &node.event_sequences[idx++];
697  seq->name = val.first;
698  val.second->GetEvents(&events);
699  BOOST_FOREACH(const EventSequence::Event& ev, events) {
700  seq->labels.push_back(ev.first);
701  seq->timestamps.push_back(ev.second);
702  }
703  }
704  }
705  }
706 
707  {
708  lock_guard<mutex> l(time_series_counter_map_lock_);
709  if (time_series_counter_map_.size() != 0) {
710  node.__set_time_series_counters(vector<TTimeSeriesCounter>());
711  node.time_series_counters.resize(time_series_counter_map_.size());
712  int idx = 0;
713  BOOST_FOREACH(const TimeSeriesCounterMap::value_type& val,
715  val.second->ToThrift(&node.time_series_counters[idx++]);
716  }
717  }
718  }
719 
720  ChildVector children;
721  {
722  lock_guard<mutex> l(children_lock_);
723  children = children_;
724  }
725  for (int i = 0; i < children.size(); ++i) {
726  int child_idx = nodes->size();
727  children[i].first->ToThrift(nodes);
728  // fix up indentation flag
729  (*nodes)[child_idx].indent = children[i].second;
730  }
731 }
732 
734  const RuntimeProfile::Counter* total_counter,
735  const RuntimeProfile::Counter* timer) {
736  DCHECK(total_counter->unit() == TUnit::BYTES ||
737  total_counter->unit() == TUnit::UNIT);
738  DCHECK(timer->unit() == TUnit::TIME_NS);
739 
740  if (timer->value() == 0) return 0;
741  double secs = static_cast<double>(timer->value()) / 1000.0 / 1000.0 / 1000.0;
742  return total_counter->value() / secs;
743 }
744 
745 int64_t RuntimeProfile::CounterSum(const vector<Counter*>* counters) {
746  int64_t value = 0;
747  for (int i = 0; i < counters->size(); ++i) {
748  value += (*counters)[i]->value();
749  }
750  return value;
751 }
752 
754  const string& name, Counter* src_counter) {
755  TUnit::type dst_unit;
756  switch (src_counter->unit()) {
757  case TUnit::BYTES:
758  dst_unit = TUnit::BYTES_PER_SECOND;
759  break;
760  case TUnit::UNIT:
761  dst_unit = TUnit::UNIT_PER_SECOND;
762  break;
763  default:
764  DCHECK(false) << "Unsupported src counter unit: " << src_counter->unit();
765  return NULL;
766  }
767  Counter* dst_counter = AddCounter(name, dst_unit);
768  PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
770  return dst_counter;
771 }
772 
773 RuntimeProfile::Counter* RuntimeProfile::AddRateCounter(
774  const string& name, DerivedCounterFunction fn, TUnit::type dst_unit) {
775  Counter* dst_counter = AddCounter(name, dst_unit);
778  return dst_counter;
779 }
780 
781 RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
782  const string& name, Counter* src_counter) {
783  DCHECK(src_counter->unit() == TUnit::UNIT);
784  Counter* dst_counter = AddCounter(name, TUnit::DOUBLE_VALUE);
785  PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
787  return dst_counter;
788 }
789 
790 RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
791  const string& name, DerivedCounterFunction sample_fn) {
792  Counter* dst_counter = AddCounter(name, TUnit::DOUBLE_VALUE);
793  PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, sample_fn, dst_counter,
795  return dst_counter;
796 }
797 
799  vector<Counter*>* buckets) {
800  {
801  lock_guard<mutex> l(counter_map_lock_);
802  bucketing_counters_.insert(buckets);
803  }
804 
806 }
807 
809  lock_guard<mutex> l(event_sequence_lock_);
810  EventSequenceMap::iterator timer_it = event_sequence_map_.find(name);
811  if (timer_it != event_sequence_map_.end()) return timer_it->second;
812 
813  EventSequence* timer = pool_->Add(new EventSequence());
814  event_sequence_map_[name] = timer;
815  return timer;
816 }
817 
818 RuntimeProfile::EventSequence* RuntimeProfile::AddEventSequence(const string& name,
819  const TEventSequence& from) {
820  lock_guard<mutex> l(event_sequence_lock_);
821  EventSequenceMap::iterator timer_it = event_sequence_map_.find(name);
822  if (timer_it != event_sequence_map_.end()) return timer_it->second;
823 
824  EventSequence* timer = pool_->Add(new EventSequence(from.timestamps, from.labels));
825  event_sequence_map_[name] = timer;
826  return timer;
827 }
828 
829 void RuntimeProfile::PrintChildCounters(const string& prefix,
830  const string& counter_name, const CounterMap& counter_map,
831  const ChildCounterMap& child_counter_map, ostream* s) {
832  ostream& stream = *s;
833  ChildCounterMap::const_iterator itr = child_counter_map.find(counter_name);
834  if (itr != child_counter_map.end()) {
835  const set<string>& child_counters = itr->second;
836  BOOST_FOREACH(const string& child_counter, child_counters) {
837  CounterMap::const_iterator iter = counter_map.find(child_counter);
838  if (iter == counter_map.end()) continue;
839  stream << prefix << " - " << iter->first << ": "
840  << PrettyPrinter::Print(iter->second->value(), iter->second->unit(), true)
841  << endl;
842  RuntimeProfile::PrintChildCounters(prefix + " ", child_counter, counter_map,
843  child_counter_map, s);
844  }
845  }
846 }
847 
849  const string& name, TUnit::type unit, DerivedCounterFunction fn) {
850  DCHECK(fn != NULL);
851 
852  lock_guard<mutex> l(time_series_counter_map_lock_);
853  TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
854  if (it != time_series_counter_map_.end()) return it->second;
855  TimeSeriesCounter* counter = pool_->Add(new TimeSeriesCounter(name, unit, fn));
856  time_series_counter_map_[name] = counter;
858  return counter;
859 }
860 
861 RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddTimeSeriesCounter(
862  const string& name, Counter* src_counter) {
863  DCHECK(src_counter != NULL);
864  return AddTimeSeriesCounter(name, src_counter->unit(),
865  bind(&Counter::value, src_counter));
866 }
867 
868 void RuntimeProfile::TimeSeriesCounter::ToThrift(TTimeSeriesCounter* counter) {
869  counter->name = name_;
870  counter->unit = unit_;
871 
872  int num, period;
873  SpinLock* lock;
874  const int64_t* samples = samples_.GetSamples(&num, &period, &lock);
875  counter->values.resize(num);
876  memcpy(&counter->values[0], samples, num * sizeof(int64_t));
877  lock->unlock();
878  counter->period_ms = period;
879 }
880 
882  stringstream ss;
883  ss << "Counter=" << name_ << endl
884  << samples_.DebugString();
885  return ss.str();
886 }
887 
888 void RuntimeProfile::EventSequence::ToThrift(TEventSequence* seq) const {
889  BOOST_FOREACH(const EventSequence::Event& ev, events_) {
890  seq->labels.push_back(ev.first);
891  seq->timestamps.push_back(ev.second);
892  }
893 }
894 
895 }
Counter * AddCounter(const std::string &name, TUnit::type unit, const std::string &parent_counter_name="")
virtual int64_t value() const
DerivedCounter * AddDerivedCounter(const std::string &name, TUnit::type unit, const DerivedCounterFunction &counter_fn, const std::string &parent_counter_name="")
EventSequence * AddEventSequence(const std::string &key)
ChildCounterMap child_counter_map_
A set of counters that measure thread info, such as total time, user time, sys time.
const std::string GetDetail() const
Definition: status.cc:184
std::string RedactCopy(const std::string &original)
Utility function to redact a string without modifying the original.
Definition: redactor.h:63
Counter * AddSamplingCounter(const std::string &name, Counter *src_counter)
static Status CreateCompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *compressor)
void AddInfoString(const std::string &key, const std::string &value)
client RuntimeProfile::EventSequence * events
Definition: coordinator.h:64
std::string SerializeToArchiveString() const
static void RegisterTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter *counter)
Adds counter to be sampled and updated at regular intervals.
boost::mutex event_sequence_lock_
static const string ROOT_COUNTER
static RuntimeProfile * CreateFromThrift(ObjectPool *pool, const TRuntimeProfileTree &profiles)
Deserialize from thrift. Runtime profiles are allocated from the pool.
std::map< std::string, std::set< std::string > > ChildCounterMap
std::pair< std::string, int64_t > Event
An Event is a <label, timestamp> pair.
RuntimeProfile(ObjectPool *pool, const std::string &name, bool is_averaged_profile=false)
static void RegisterPeriodicCounter(RuntimeProfile::Counter *src_counter, RuntimeProfile::DerivedCounterFunction sample_fn, RuntimeProfile::Counter *dst_counter, PeriodicCounterType type)
void RegisterBucketingCounters(Counter *src_counter, std::vector< Counter * > *buckets)
std::string name_
Name for this runtime profile.
void GetCounters(const std::string &name, std::vector< Counter * > *counters)
EventSequenceMap event_sequence_map_
boost::function< int64_t()> DerivedCounterFunction
Lightweight spinlock.
Definition: spinlock.h:24
std::map< std::string, Counter * > CounterMap
Counter * AddRateCounter(const std::string &name, Counter *src_counter)
static int64_t UnitsPerSecond(const Counter *total_counter, const Counter *timer)
Derived counter function: return measured throughput as input_value/second.
void UpdateCounter(Counter *new_counter)
static void Base64Encode(const char *in, int in_len, stringstream *out)
Definition: url-coding.cc:110
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
boost::mutex time_series_counter_map_lock_
EventSequence * GetEventSequence(const std::string &name) const
Returns event sequence with the provided name if it exists, otherwise NULL.
void unlock()
Definition: spinlock.h:33
static int64_t CounterSum(const std::vector< Counter * > *counters)
Derived counter function: return aggregated value.
void ToThrift(TTimeSeriesCounter *counter)
V * FindOrInsert(std::map< K, V > *m, const K &key, const V &default_val)
void UpdateAverage(RuntimeProfile *src)
static const string THREAD_VOLUNTARY_CONTEXT_SWITCHES
void Divide(int n)
Divides all counters by n.
static const string THREAD_TOTAL_TIME
std::map< PlanNodeId, RuntimeProfile::Counter * > CounterMap
map from id of a scan node to a specific counter in the node's profile
Definition: coordinator.h:204
bool own_pool_
True if we have to delete the pool_ on destruction.
ADD_COUNTER_IMPL(AddCounter, Counter)
static void StopBucketingCounters(std::vector< RuntimeProfile::Counter * > *buckets, bool convert)
static const std::string ASYNC_TIME_COUNTER_NAME
ThreadCounters * AddThreadCounters(const std::string &prefix)
ObjectPool pool
static const string THREAD_SYS_TIME
static void StopRateCounter(RuntimeProfile::Counter *counter)
Stops updating the value of 'counter'.
const T * GetSamples(int *num_samples, int *period, SpinLock **lock=NULL) const
void Update(const TRuntimeProfileTree &thrift_profile)
void ToThrift(TEventSequence *seq) const
static void RegisterBucketingCounters(RuntimeProfile::Counter *src_counter, std::vector< RuntimeProfile::Counter * > *buckets)
Adds a bucketing counter to be updated at regular intervals.
std::set< std::vector< Counter * > * > bucketing_counters_
A set of bucket counters registered in this runtime profile.
TimeSeriesCounter * AddTimeSeriesCounter(const std::string &name, TUnit::type unit, DerivedCounterFunction sample_fn)
static void StopSamplingCounter(RuntimeProfile::Counter *counter)
Stops updating the value of 'counter'.
static void PrintChildCounters(const std::string &prefix, const std::string &counter_name, const CounterMap &counter_map, const ChildCounterMap &child_counter_map, std::ostream *s)
Print the child counters of the given counter name.
std::vector< std::pair< RuntimeProfile *, bool > > ChildVector
vector of (profile, indentation flag)
int64_t metadata_
user-supplied, uninterpreted metadata.
static const string THREAD_INVOLUNTARY_CONTEXT_SWITCHES
Counter * GetCounter(const std::string &name)
void PrettyPrint(std::ostream *s, const std::string &prefix="") const
boost::mutex counter_map_lock_
Protects counter_map_, child_counter_map_ and bucketing_counters_.
TimeSeriesCounterMap time_series_counter_map_
static const std::string INACTIVE_TIME_COUNTER_NAME
InfoStringsDisplayOrder info_strings_display_order_
static const string THREAD_USER_TIME
std::map< std::string, std::string > InfoStrings
static const std::string TOTAL_TIME_COUNTER_NAME
Name of the counter maintaining the total time.
void AddChild(RuntimeProfile *child, bool indent=true, RuntimeProfile *location=NULL)
bool ok() const
Definition: status.h:172
string name
Definition: cpu-info.cc:50
const std::string & name() const
Returns name of this profile.
const std::string * GetInfoString(const std::string &key) const
void GetChildren(std::vector< RuntimeProfile * > *children)
boost::mutex info_strings_lock_
Protects info_strings_ and info_strings_display_order_.
void GetAllChildren(std::vector< RuntimeProfile * > *children)
Gets all profiles in tree, including this one.
static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter *counter)
Stops 'counter' from receiving any more samples.
void ToThrift(TRuntimeProfileTree *tree) const
Counter * total_time_counter()
Returns the counter for the total elapsed time.