Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
partitioning-throughput-test.cc
Go to the documentation of this file.
1 #include <emmintrin.h>
2 #include <stdlib.h>
3 
4 #include <glog/logging.h>
5 
6 #include "tuple-types.h"
7 #include "common/compiler-util.h"
8 #include "util/cpu-info.h"
9 #include "util/debug-util.h"
10 #include "util/stopwatch.h"
11 
12 #define STREAMING true
13 
14 using namespace impala;
15 
16 namespace impala {
17 
18 // Tests the throughput of simply partitioning tuples from one stream into many
19 // with no other processing.
21  public:
22  // There will be 2^FANOUT_BITS buffers
23  static const uint64_t FANOUT_BITS = 6;
24  static const uint64_t NUM_BUFFERS = 1<<FANOUT_BITS;
25  // How many bytes of data to partition
26  static const uint64_t DATA_BYTES = 1<<30; // 1GB
27  static const uint64_t DATA_TUPLES = DATA_BYTES / sizeof(ProbeTuple);
28  // How many bytes each buffer will hold
29  // Twice as much as needed if randomness is perfect.
31  static const uint64_t BUFFER_TUPLES = BUFFER_BYTES / sizeof(ProbeTuple);
32 
33  static const int STREAMING_BUFFER_TUPLES = 8 * 4; // 4 cache lines
34 
35  struct Buffer {
36  ProbeTuple tuples[BUFFER_TUPLES];
38  // offset by 7 cache lines
39  uint8_t offset[7 * 64 - sizeof(uint64_t)];
40 
41  Buffer() {
42  count = 0;
43  }
44  } __attribute__((__packed__)) __attribute__((aligned(64)));
45 
46  struct BufferBuffer {
48  int count;
49  uint8_t padding[64 - sizeof(int)];
50 
52  count = 0;
53  }
54  } __attribute__((__packed__)) __attribute__((aligned(64)));
55 
56 #if STREAMING
57  inline void BufferTuple(const ProbeTuple* tuple, Buffer* buffer) {
58  BufferBuffer* buffer_buffer = &buffer_buffers_[tuple->id];
59  DCHECK_LT(buffer_buffer->count, STREAMING_BUFFER_TUPLES);
60  buffer_buffer->tuples[buffer_buffer->count++] = *tuple;
61  if (UNLIKELY(buffer_buffer->count == STREAMING_BUFFER_TUPLES)) {
62  DCHECK_LE(buffer->count + buffer_buffer->count, BUFFER_TUPLES);
63  // Do a streaming write of streaming_tuples
64  __m128i* buffer_write_ptr = (__m128i*)&buffer->tuples[buffer->count];
65  // TODO code very dependent on size of ProbeTuple.
66  DCHECK_EQ(buffer_buffer->count % 2, 0);
67  for (int i = 0; i < buffer_buffer->count; i += 2) {
68  __m128i content = _mm_set_epi64x(*(long long*) (buffer_buffer->tuples + i),
69  *(long long*) (buffer_buffer->tuples + i + 1));
70  _mm_stream_si128(buffer_write_ptr + i/2, content);
71  }
72  buffer->count += buffer_buffer->count;
73  buffer_buffer->count = 0;
74  }
75  }
76 #endif
77 
78  void TestThroughput() {
79  // align allocations.
80  bool fail = posix_memalign((void**)&buffers_, __alignof(*buffers_), sizeof(*buffers_) * NUM_BUFFERS);
81  CHECK(!fail);
82  fail = posix_memalign((void**)&buffer_buffers_, __alignof(*buffer_buffers_), sizeof(*buffers_) * NUM_BUFFERS);
83  CHECK(!fail);
84  CHECK_EQ(((long)buffers_) % 64, 0);
85  for (int i = 0; i < NUM_BUFFERS; ++i) {
86  new (buffers_ + i) Buffer();
87  new (buffer_buffers_ + i) BufferBuffer();
88  }
89  ProbeTuple* tuples = GenTuples(DATA_TUPLES, NUM_BUFFERS);
90  StopWatch watch;
91  watch.Start();
92  for (uint64_t i = 0; i < DATA_TUPLES; ++i) {
93  const ProbeTuple* tuple = &tuples[i];
94  Buffer* buffer = &buffers_[tuple->id];
95 #if STREAMING
96  BufferTuple(tuple, buffer);
97 #else
98  buffer->tuples[buffer->count++] = *tuple;
99  DCHECK_LT(buffer->count, BUFFER_TUPLES);
100 #endif
101  }
102  watch.Stop();
103  LOG(ERROR) << PrettyPrinter::Print(watch.Ticks(), TUnit::CPU_TICKS);;
104  free(tuples);
105  // Note: destructors not called.
106  free(buffers_);
107  free(buffer_buffers_);
108  }
109 
111  const int NUM_RECORDS = 1<<27;
112  int64_t* buffer = (int64_t*) malloc(sizeof(long) * NUM_RECORDS);
113  int64_t constant = 0xFA57;
114  StopWatch watch;
115  watch.Start();
116  for (int64_t i = 0; i < NUM_RECORDS; ++i) {
117  buffer[i] = constant;
118  }
119  watch.Stop();
120  LOG(ERROR) << PrettyPrinter::Print(watch.Ticks(), TUnit::CPU_TICKS);;
121  free(buffer);
122  }
123 
124 
127 };
128 
129 }
130 
131 int main(int argc, char** argv) {
132  google::InitGoogleLogging(argv[0]);
133  CpuInfo::Init();
135  test.TestRawThroughput();
136  //test.TestThroughput();
137  return 0;
138 }
ProbeTuple tuples[BUFFER_TUPLES]
struct impala::PartitioningThroughputTest::Buffer __attribute__((__packed__)) __attribute__((aligned(64)))
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
#define UNLIKELY(expr)
Definition: compiler-util.h:33
uint8_t padding[64-sizeof(int)]
static void Init()
Initialize CpuInfo.
Definition: cpu-info.cc:75
int main(int argc, char **argv)