Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
runtime-profile-test.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <stdlib.h>
16 #include <stdio.h>
17 #include <iostream>
18 #include <gtest/gtest.h>
19 #include <boost/bind.hpp>
20 #include <boost/foreach.hpp>
21 #include "common/object-pool.h"
22 #include "util/cpu-info.h"
24 #include "util/runtime-profile.h"
25 #include "util/streaming-sampler.h"
26 #include "util/thread.h"
27 
28 #include "common/names.h"
29 
30 namespace impala {
31 
32 TEST(CountersTest, Basic) {
34  RuntimeProfile profile_a(&pool, "ProfileA");
35  RuntimeProfile profile_a1(&pool, "ProfileA1");
36  RuntimeProfile profile_a2(&pool, "ProfileAb");
37 
38  TRuntimeProfileTree thrift_profile;
39 
40  profile_a.AddChild(&profile_a1);
41  profile_a.AddChild(&profile_a2);
42 
43  // Test Empty
44  profile_a.ToThrift(&thrift_profile.nodes);
45  EXPECT_EQ(thrift_profile.nodes.size(), 3);
46  thrift_profile.nodes.clear();
47 
48  RuntimeProfile::Counter* counter_a;
49  RuntimeProfile::Counter* counter_b;
50  RuntimeProfile::Counter* counter_merged;
51 
52  // Updating/setting counter
53  counter_a = profile_a.AddCounter("A", TUnit::UNIT);
54  EXPECT_TRUE(counter_a != NULL);
55  counter_a->Add(10);
56  counter_a->Add(-5);
57  EXPECT_EQ(counter_a->value(), 5);
58  counter_a->Set(1L);
59  EXPECT_EQ(counter_a->value(), 1);
60 
61  counter_b = profile_a2.AddCounter("B", TUnit::BYTES);
62  EXPECT_TRUE(counter_b != NULL);
63 
64  // Serialize/deserialize
65  profile_a.ToThrift(&thrift_profile.nodes);
66  RuntimeProfile* from_thrift = RuntimeProfile::CreateFromThrift(&pool, thrift_profile);
67  counter_merged = from_thrift->GetCounter("A");
68  EXPECT_EQ(counter_merged->value(), 1);
69  EXPECT_TRUE(from_thrift->GetCounter("Not there") == NULL);
70 
71  // Averaged
72  RuntimeProfile averaged_profile(&pool, "Merged", true);
73  averaged_profile.UpdateAverage(from_thrift);
74  counter_merged = averaged_profile.GetCounter("A");
75  EXPECT_EQ(counter_merged->value(), 1);
76 
77  // UpdateAverage again, there should be no change.
78  averaged_profile.UpdateAverage(from_thrift);
79  EXPECT_EQ(counter_merged->value(), 1);
80 
81  counter_a = profile_a2.AddCounter("A", TUnit::UNIT);
82  counter_a->Set(3L);
83  averaged_profile.UpdateAverage(&profile_a2);
84  EXPECT_EQ(counter_merged->value(), 2);
85 
86  // Update
87  RuntimeProfile updated_profile(&pool, "Updated");
88  updated_profile.Update(thrift_profile);
89  RuntimeProfile::Counter* counter_updated = updated_profile.GetCounter("A");
90  EXPECT_EQ(counter_updated->value(), 1);
91 
92  // Update 2 more times, counters should stay the same
93  updated_profile.Update(thrift_profile);
94  updated_profile.Update(thrift_profile);
95  EXPECT_EQ(counter_updated->value(), 1);
96 }
97 
98 void ValidateCounter(RuntimeProfile* profile, const string& name, int64_t value) {
99  RuntimeProfile::Counter* counter = profile->GetCounter(name);
100  EXPECT_TRUE(counter != NULL);
101  EXPECT_EQ(counter->value(), value);
102 }
103 
104 TEST(CountersTest, MergeAndUpdate) {
105  // Create two trees. Each tree has two children, one of which has the
106  // same name in both trees. Merging the two trees should result in 3
107  // children, with the counters from the shared child aggregated.
108 
110  RuntimeProfile profile1(&pool, "Parent1");
111  RuntimeProfile p1_child1(&pool, "Child1");
112  RuntimeProfile p1_child2(&pool, "Child2");
113  profile1.AddChild(&p1_child1);
114  profile1.AddChild(&p1_child2);
115 
116  RuntimeProfile profile2(&pool, "Parent2");
117  RuntimeProfile p2_child1(&pool, "Child1");
118  RuntimeProfile p2_child3(&pool, "Child3");
119  profile2.AddChild(&p2_child1);
120  profile2.AddChild(&p2_child3);
121 
122  // Create parent level counters
123  RuntimeProfile::Counter* parent1_shared =
124  profile1.AddCounter("Parent Shared", TUnit::UNIT);
125  RuntimeProfile::Counter* parent2_shared =
126  profile2.AddCounter("Parent Shared", TUnit::UNIT);
127  RuntimeProfile::Counter* parent1_only =
128  profile1.AddCounter("Parent 1 Only", TUnit::UNIT);
129  RuntimeProfile::Counter* parent2_only =
130  profile2.AddCounter("Parent 2 Only", TUnit::UNIT);
131  parent1_shared->Add(1);
132  parent2_shared->Add(3);
133  parent1_only->Add(2);
134  parent2_only->Add(5);
135 
136  // Create child level counters
137  RuntimeProfile::Counter* p1_c1_shared =
138  p1_child1.AddCounter("Child1 Shared", TUnit::UNIT);
139  RuntimeProfile::Counter* p1_c1_only =
140  p1_child1.AddCounter("Child1 Parent 1 Only", TUnit::UNIT);
141  RuntimeProfile::Counter* p1_c2 =
142  p1_child2.AddCounter("Child2", TUnit::UNIT);
143  RuntimeProfile::Counter* p2_c1_shared =
144  p2_child1.AddCounter("Child1 Shared", TUnit::UNIT);
145  RuntimeProfile::Counter* p2_c1_only =
146  p1_child1.AddCounter("Child1 Parent 2 Only", TUnit::UNIT);
147  RuntimeProfile::Counter* p2_c3 =
148  p2_child3.AddCounter("Child3", TUnit::UNIT);
149  p1_c1_shared->Add(10);
150  p1_c1_only->Add(50);
151  p2_c1_shared->Add(20);
152  p2_c1_only->Add(100);
153  p2_c3->Add(30);
154  p1_c2->Add(40);
155 
156  // Merge the two and validate
157  TRuntimeProfileTree tprofile1;
158  profile1.ToThrift(&tprofile1);
159  RuntimeProfile averaged_profile(&pool, "merged", true);
160  averaged_profile.UpdateAverage(&profile1);
161  averaged_profile.UpdateAverage(&profile2);
162  EXPECT_EQ(6, averaged_profile.num_counters());
163  ValidateCounter(&averaged_profile, "Parent Shared", 2);
164  ValidateCounter(&averaged_profile, "Parent 1 Only", 2);
165  ValidateCounter(&averaged_profile, "Parent 2 Only", 5);
166 
167  vector<RuntimeProfile*> children;
168  averaged_profile.GetChildren(&children);
169  EXPECT_EQ(children.size(), 3);
170 
171  for (int i = 0; i < 3; ++i) {
172  RuntimeProfile* profile = children[i];
173  if (profile->name().compare("Child1") == 0) {
174  EXPECT_EQ(6, profile->num_counters());
175  ValidateCounter(profile, "Child1 Shared", 15);
176  ValidateCounter(profile, "Child1 Parent 1 Only", 50);
177  ValidateCounter(profile, "Child1 Parent 2 Only", 100);
178  } else if (profile->name().compare("Child2") == 0) {
179  EXPECT_EQ(4, profile->num_counters());
180  ValidateCounter(profile, "Child2", 40);
181  } else if (profile->name().compare("Child3") == 0) {
182  EXPECT_EQ(4, profile->num_counters());
183  ValidateCounter(profile, "Child3", 30);
184  } else {
185  EXPECT_TRUE(false);
186  }
187  }
188 
189  // make sure we can print
190  stringstream dummy;
191  averaged_profile.PrettyPrint(&dummy);
192 
193  // Update profile2 w/ profile1 and validate
194  profile2.Update(tprofile1);
195  EXPECT_EQ(6, profile2.num_counters());
196  ValidateCounter(&profile2, "Parent Shared", 1);
197  ValidateCounter(&profile2, "Parent 1 Only", 2);
198  ValidateCounter(&profile2, "Parent 2 Only", 5);
199 
200  profile2.GetChildren(&children);
201  EXPECT_EQ(children.size(), 3);
202 
203  for (int i = 0; i < 3; ++i) {
204  RuntimeProfile* profile = children[i];
205  if (profile->name().compare("Child1") == 0) {
206  EXPECT_EQ(6, profile->num_counters());
207  ValidateCounter(profile, "Child1 Shared", 10);
208  ValidateCounter(profile, "Child1 Parent 1 Only", 50);
209  ValidateCounter(profile, "Child1 Parent 2 Only", 100);
210  } else if (profile->name().compare("Child2") == 0) {
211  EXPECT_EQ(4, profile->num_counters());
212  ValidateCounter(profile, "Child2", 40);
213  } else if (profile->name().compare("Child3") == 0) {
214  EXPECT_EQ(4, profile->num_counters());
215  ValidateCounter(profile, "Child3", 30);
216  } else {
217  EXPECT_TRUE(false);
218  }
219  }
220 
221  // make sure we can print
222  profile2.PrettyPrint(&dummy);
223 }
224 
225 TEST(CountersTest, DerivedCounters) {
227  RuntimeProfile profile(&pool, "Profile");
228  RuntimeProfile::Counter* bytes_counter =
229  profile.AddCounter("bytes", TUnit::BYTES);
230  RuntimeProfile::Counter* ticks_counter =
231  profile.AddCounter("ticks", TUnit::TIME_NS);
232  // set to 1 sec
233  ticks_counter->Set(1000L * 1000L * 1000L);
234 
235  RuntimeProfile::DerivedCounter* throughput_counter =
236  profile.AddDerivedCounter("throughput", TUnit::BYTES,
237  bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_counter, ticks_counter));
238 
239  bytes_counter->Set(10L);
240  EXPECT_EQ(throughput_counter->value(), 10);
241  bytes_counter->Set(20L);
242  EXPECT_EQ(throughput_counter->value(), 20);
243  ticks_counter->Set(ticks_counter->value() / 2);
244  EXPECT_EQ(throughput_counter->value(), 40);
245 }
246 
247 TEST(CountersTest, AverageSetCounters) {
249  RuntimeProfile profile(&pool, "Profile");
250  RuntimeProfile::Counter* bytes_1_counter =
251  profile.AddCounter("bytes 1", TUnit::BYTES);
252  RuntimeProfile::Counter* bytes_2_counter =
253  profile.AddCounter("bytes 2", TUnit::BYTES);
254 
255  bytes_1_counter->Set(10L);
256  RuntimeProfile::AveragedCounter bytes_avg(TUnit::BYTES);
257  bytes_avg.UpdateCounter(bytes_1_counter);
258  // Avg of 10L
259  EXPECT_EQ(bytes_avg.value(), 10L);
260  bytes_1_counter->Set(20L);
261  bytes_avg.UpdateCounter(bytes_1_counter);
262  // Avg of 20L
263  EXPECT_EQ(bytes_avg.value(), 20L);
264  bytes_2_counter->Set(40L);
265  bytes_avg.UpdateCounter(bytes_2_counter);
266  // Avg of 20L and 40L
267  EXPECT_EQ(bytes_avg.value(), 30L);
268  bytes_2_counter->Set(30L);
269  bytes_avg.UpdateCounter(bytes_2_counter);
270  // Avg of 20L and 30L
271  EXPECT_EQ(bytes_avg.value(), 25L);
272 
273  RuntimeProfile::Counter* double_1_counter =
274  profile.AddCounter("double 1", TUnit::DOUBLE_VALUE);
275  RuntimeProfile::Counter* double_2_counter =
276  profile.AddCounter("double 2", TUnit::DOUBLE_VALUE);
277  double_1_counter->Set(1.0f);
278  RuntimeProfile::AveragedCounter double_avg(TUnit::DOUBLE_VALUE);
279  double_avg.UpdateCounter(double_1_counter);
280  // Avg of 1.0f
281  EXPECT_EQ(double_avg.double_value(), 1.0f);
282  double_1_counter->Set(2.0f);
283  double_avg.UpdateCounter(double_1_counter);
284  // Avg of 2.0f
285  EXPECT_EQ(double_avg.double_value(), 2.0f);
286  double_2_counter->Set(4.0f);
287  double_avg.UpdateCounter(double_2_counter);
288  // Avg of 2.0f and 4.0f
289  EXPECT_EQ(double_avg.double_value(), 3.0f);
290  double_2_counter->Set(3.0f);
291  double_avg.UpdateCounter(double_2_counter);
292  // Avg of 2.0f and 3.0f
293  EXPECT_EQ(double_avg.double_value(), 2.5f);
294 }
295 
296 TEST(CountersTest, InfoStringTest) {
298  RuntimeProfile profile(&pool, "Profile");
299  EXPECT_TRUE(profile.GetInfoString("Key") == NULL);
300 
301  profile.AddInfoString("Key", "Value");
302  const string* value = profile.GetInfoString("Key");
303  EXPECT_TRUE(value != NULL);
304  EXPECT_EQ(*value, "Value");
305 
306  // Convert it to thrift
307  TRuntimeProfileTree tprofile;
308  profile.ToThrift(&tprofile);
309 
310  // Convert it back
312  &pool, tprofile);
313  value = from_thrift->GetInfoString("Key");
314  EXPECT_TRUE(value != NULL);
315  EXPECT_EQ(*value, "Value");
316 
317  // Test update.
318  RuntimeProfile update_dst_profile(&pool, "Profile2");
319  update_dst_profile.Update(tprofile);
320  value = update_dst_profile.GetInfoString("Key");
321  EXPECT_TRUE(value != NULL);
322  EXPECT_EQ(*value, "Value");
323 
324  // Update the original profile, convert it to thrift and update from the dst
325  // profile
326  profile.AddInfoString("Key", "NewValue");
327  profile.AddInfoString("Foo", "Bar");
328  EXPECT_EQ(*profile.GetInfoString("Key"), "NewValue");
329  EXPECT_EQ(*profile.GetInfoString("Foo"), "Bar");
330  profile.ToThrift(&tprofile);
331 
332  update_dst_profile.Update(tprofile);
333  EXPECT_EQ(*update_dst_profile.GetInfoString("Key"), "NewValue");
334  EXPECT_EQ(*update_dst_profile.GetInfoString("Foo"), "Bar");
335 }
336 
337 TEST(CountersTest, RateCounters) {
339  RuntimeProfile profile(&pool, "Profile");
340 
341  RuntimeProfile::Counter* bytes_counter =
342  profile.AddCounter("bytes", TUnit::BYTES);
343 
344  RuntimeProfile::Counter* rate_counter =
345  profile.AddRateCounter("RateCounter", bytes_counter);
346  EXPECT_TRUE(rate_counter->unit() == TUnit::BYTES_PER_SECOND);
347 
348  EXPECT_EQ(rate_counter->value(), 0);
349  // set to 100MB. Use bigger units to avoid truncating to 0 after divides.
350  bytes_counter->Set(100L * 1024L * 1024L);
351 
352  // Wait one second.
353  sleep(1);
354 
355  int64_t rate = rate_counter->value();
356 
357  // Remove the counter so it no longer gets updates
359 
360  // The rate counter is not perfectly accurate. Currently updated at 500ms intervals,
361  // we should have seen somewhere between 1 and 3 updates (33 - 200 MB/s)
362  EXPECT_GT(rate, 66 * 1024 * 1024);
363  EXPECT_LE(rate, 200 * 1024 * 1024);
364 
365  // Wait another second. The counter has been removed. So the value should not be
366  // changed (much).
367  sleep(2);
368 
369  rate = rate_counter->value();
370  EXPECT_GT(rate, 66 * 1024 * 1024);
371  EXPECT_LE(rate, 200 * 1024 * 1024);
372 }
373 
374 TEST(CountersTest, BucketCounters) {
376  RuntimeProfile profile(&pool, "Profile");
377 
378  RuntimeProfile::Counter* unit_counter =
379  profile.AddCounter("unit", TUnit::UNIT);
380 
381  // Set the unit to 1 before sampling
382  unit_counter->Set(1L);
383 
384  // Create the bucket counters and start sampling
385  vector<RuntimeProfile::Counter*> buckets;
386  buckets.push_back(pool.Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
387  buckets.push_back(pool.Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
388  profile.RegisterBucketingCounters(unit_counter, &buckets);
389 
390  // Wait two seconds.
391  sleep(2);
392 
393  // Stop sampling
395 
396  // TODO: change the value to double
397  // The value of buckets[0] should be zero and buckets[1] should be 1.
398  double val0 = buckets[0]->double_value();
399  double val1 = buckets[1]->double_value();
400  EXPECT_EQ(0, val0);
401  EXPECT_EQ(100, val1);
402 
403  // Wait another second. The counter has been removed. So the value should not be
404  // changed (much).
405  sleep(2);
406  EXPECT_EQ(val0, buckets[0]->double_value());
407  EXPECT_EQ(val1, buckets[1]->double_value());
408 }
409 
410 TEST(CountersTest, EventSequences) {
412  RuntimeProfile profile(&pool, "Profile");
413  RuntimeProfile::EventSequence* seq = profile.AddEventSequence("event sequence");
414  seq->MarkEvent("aaaa");
415  seq->MarkEvent("bbbb");
416  seq->MarkEvent("cccc");
417 
418  vector<RuntimeProfile::EventSequence::Event> events;
419  seq->GetEvents(&events);
420  EXPECT_EQ(3, events.size());
421 
422  uint64_t last_timestamp = 0;
423  string last_string = "";
424  BOOST_FOREACH(const RuntimeProfile::EventSequence::Event& ev, events) {
425  EXPECT_TRUE(ev.second >= last_timestamp);
426  last_timestamp = ev.second;
427  EXPECT_TRUE(ev.first > last_string);
428  last_string = ev.first;
429  }
430 
431  TRuntimeProfileTree thrift_profile;
432  profile.ToThrift(&thrift_profile);
433  EXPECT_TRUE(thrift_profile.nodes[0].__isset.event_sequences);
434  EXPECT_EQ(1, thrift_profile.nodes[0].event_sequences.size());
435 
436  RuntimeProfile* reconstructed_profile =
437  RuntimeProfile::CreateFromThrift(&pool, thrift_profile);
438 
439  last_timestamp = 0;
440  last_string = "";
441  EXPECT_EQ(NULL, reconstructed_profile->GetEventSequence("doesn't exist"));
442  seq = reconstructed_profile->GetEventSequence("event sequence");
443  EXPECT_TRUE(seq != NULL);
444  seq->GetEvents(&events);
445  EXPECT_EQ(3, events.size());
446  BOOST_FOREACH(const RuntimeProfile::EventSequence::Event& ev, events) {
447  EXPECT_TRUE(ev.second >= last_timestamp);
448  last_timestamp = ev.second;
449  EXPECT_TRUE(ev.first > last_string);
450  last_string = ev.first;
451  }
452 }
453 
454 void ValidateSampler(const StreamingSampler<int, 10>& sampler, int expected_num,
455  int expected_period, int expected_delta) {
456  const int* samples = NULL;
457  int num_samples;
458  int period;
459 
460  samples = sampler.GetSamples(&num_samples, &period);
461  EXPECT_TRUE(samples != NULL);
462  EXPECT_EQ(num_samples, expected_num);
463  EXPECT_EQ(period, expected_period);
464 
465  for (int i = 0; i < expected_num - 1; ++i) {
466  EXPECT_EQ(samples[i] + expected_delta, samples[i + 1]) << i;
467  }
468 }
469 
470 TEST(CountersTest, StreamingSampler) {
472 
473  int idx = 0;
474  for (int i = 0; i < 3; ++i) {
475  sampler.AddSample(idx++, 500);
476  }
477  ValidateSampler(sampler, 3, 500, 1);
478 
479  for (int i = 0; i < 3; ++i) {
480  sampler.AddSample(idx++, 500);
481  }
482  ValidateSampler(sampler, 6, 500, 1);
483 
484  for (int i = 0; i < 3; ++i) {
485  sampler.AddSample(idx++, 500);
486  }
487  ValidateSampler(sampler, 9, 500, 1);
488 
489  // Added enough to cause a collapse
490  for (int i = 0; i < 3; ++i) {
491  sampler.AddSample(idx++, 500);
492  }
493  // Added enough to cause a collapse
494  ValidateSampler(sampler, 6, 1000, 2);
495 
496  for (int i = 0; i < 3; ++i) {
497  sampler.AddSample(idx++, 500);
498  }
499  ValidateSampler(sampler, 7, 1000, 2);
500 }
501 
502 }
503 
504 int main(int argc, char **argv) {
505  ::testing::InitGoogleTest(&argc, argv);
508  return RUN_ALL_TESTS();
509 }
Counter * AddCounter(const std::string &name, TUnit::type unit, const std::string &parent_counter_name="")
virtual int64_t value() const
void GetEvents(std::vector< Event > *events)
DerivedCounter * AddDerivedCounter(const std::string &name, TUnit::type unit, const DerivedCounterFunction &counter_fn, const std::string &parent_counter_name="")
EventSequence * AddEventSequence(const std::string &key)
virtual double double_value() const
void AddInfoString(const std::string &key, const std::string &value)
client RuntimeProfile::EventSequence * events
Definition: coordinator.h:64
static RuntimeProfile * CreateFromThrift(ObjectPool *pool, const TRuntimeProfileTree &profiles)
Deserialize from thrift. Runtime profiles are allocated from the pool.
std::pair< std::string, int64_t > Event
An Event is a <label, timestamp> pair.
void RegisterBucketingCounters(Counter *src_counter, std::vector< Counter * > *buckets)
TEST(AtomicTest, Basic)
Definition: atomic-test.cc:28
Counter * AddRateCounter(const std::string &name, Counter *src_counter)
static int64_t UnitsPerSecond(const Counter *total_counter, const Counter *timer)
Derived counter function: return measured throughput as input_value/second.
int num_counters() const
Returns the number of counters in this profile.
void ValidateCounter(RuntimeProfile *profile, const string &name, int64_t value)
void UpdateCounter(Counter *new_counter)
virtual void Set(int64_t value)
void UpdateAverage(RuntimeProfile *src)
void MarkEvent(const std::string &label)
static void StopBucketingCounters(std::vector< RuntimeProfile::Counter * > *buckets, bool convert)
int main(int argc, char **argv)
void InitThreading()
Initialises the threading subsystem. Must be called before a Thread is created.
Definition: thread.cc:261
ObjectPool pool
static void StopRateCounter(RuntimeProfile::Counter *counter)
Stops updating the value of 'counter'.
const T * GetSamples(int *num_samples, int *period, SpinLock **lock=NULL) const
void Update(const TRuntimeProfileTree &thrift_profile)
Counter * GetCounter(const std::string &name)
void PrettyPrint(std::ostream *s, const std::string &prefix="") const
static void Init()
Initialize CpuInfo.
Definition: cpu-info.cc:75
void AddSample(T sample, int ms)
virtual void Add(int64_t delta)
void AddChild(RuntimeProfile *child, bool indent=true, RuntimeProfile *location=NULL)
string name
Definition: cpu-info.cc:50
const std::string & name() const
Returns name of this profile.
const std::string * GetInfoString(const std::string &key) const
void GetChildren(std::vector< RuntimeProfile * > *children)
void ToThrift(TRuntimeProfileTree *tree) const
void ValidateSampler(const StreamingSampler< int, 10 > &sampler, int expected_num, int expected_period, int expected_delta)