Impala
Impala is the open source, native analytic database for Apache Hadoop.
tuple-splitter-test.cc
// Copyright (c) 2012 Cloudera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.


#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <vector>

#include "common/compiler-util.h"
#include "experiments/data-provider.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/string-value.h"
#include "util/cpu-info.h"
#include "util/runtime-profile.h"

#define PRINT_RESULT 0

#define UNUSED_BITS 16
#define USED_BITS 48
#define UNUSED_UPPER_BITS_MASK 0xFFFF000000000000
#define POINTER_LOWER_BITS_MASK 0x0000FFFFFFFFFFFF

#include "common/names.h"

using namespace impala;

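// Copies exactly 16 bytes (one test tuple) as two 8-byte word stores,
// sidestepping the size dispatch of a generic memcpy on this hot path.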
inline void Memcpy16(uint8_t* dst, uint8_t* src) {
  reinterpret_cast<uint64_t*>(dst)[0] = reinterpret_cast<uint64_t*>(src)[0];
  reinterpret_cast<uint64_t*>(dst)[1] = reinterpret_cast<uint64_t*>(src)[1];
}

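// A tagged pointer: on x86-64 only the low 48 bits of a pointer are
// significant, so the upper 16 bits of the same 8-byte word can hold a tuple
// count. Each partition's write cursor and fill count thus live in a single
// word (see the USED_BITS/UNUSED_BITS masks above).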
struct PointerValue {
  union {
    uint64_t val;
    uint8_t* ptr;
  };

  template<typename T>
  T GetPointer() {
    return reinterpret_cast<T>(val & POINTER_LOWER_BITS_MASK);
  }

  uint16_t GetValue() const {
    return static_cast<uint16_t>(val >> USED_BITS);
  }

  template<typename T>
  void Get(T& t, uint16_t& v) {
    t = reinterpret_cast<T>(val & POINTER_LOWER_BITS_MASK);
    v = static_cast<uint16_t>(val >> USED_BITS);
  }

  // Replace the pointer bits, keeping the value bits.
  void UpdatePointer(void* p) {
    val = val >> USED_BITS;
    val = val << USED_BITS;
    val = val | reinterpret_cast<uint64_t>(p);
  }

  // Replace the value bits, keeping the pointer bits.
  void UpdateValue(uint16_t v) {
    val = val << UNUSED_BITS;
    val = val >> UNUSED_BITS;
    val = val | (static_cast<uint64_t>(v) << USED_BITS);
  }

  // Set both the pointer and the value at once.
  void Update(void* p, int16_t v) {
    val = reinterpret_cast<uint64_t>(p);
    val = val | (static_cast<uint64_t>(v) << USED_BITS);
  }
};

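// Rounds k up to the next power of two by smearing the highest set bit of
// k - 1 into all lower positions and adding one, e.g. 384 -> 512; a power of
// two maps to itself.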
template <class T>
T NextPowerOfTwo(T k) {
  k--;
  for (int i = 1; i < sizeof(T) * CHAR_BIT; i <<= 1) k = k | k >> i;
  return k + 1;
}

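// Cache-aware radix partitioner. AddData() scatters tuples by their stored
// hash into partitions whose blocks are sized to fit in L1. Finalize() then
// recursively splits any partition larger than one block, shifting the hash
// right by HASH_BIT_SHIFT bits at each additional level.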
class DataPartitioner {
 public:
  DataPartitioner(MemPool* pool, RuntimeProfile* profile, int size, int hash_offset) {
    pool_ = pool;
    profile_ = profile;
    size_ = size;
    hash_offset_ = hash_offset;

    bytes_allocated_ = ADD_COUNTER(profile, "BytesAllocated", TUnit::BYTES);
    bytes_copied_ = ADD_COUNTER(profile, "BytesCopied", TUnit::BYTES);
    add_time_ = ADD_COUNTER(profile, "AddTime", TUnit::CPU_TICKS);
    split_time_ = ADD_COUNTER(profile, "SplitTime", TUnit::CPU_TICKS);
    splits_ = ADD_COUNTER(profile, "Splits", TUnit::UNIT);

    // Each partition must hold at least the max of the tuple size and a
    // cache line. The partition count per level is a power of two (so a
    // mask can select one) no larger than L1_size / min_size_per_partition.
    int min_size_per_partition = max(size, 64);
    partitions_per_level_ = NextPowerOfTwo(L1_size / min_size_per_partition) >> 1;
    tuples_per_partition_ = L1_size / size;

    printf("Tuple Size: %d\n", size_);
    printf("Partitions: %d Per Partition: %d\n", partitions_per_level_, tuples_per_partition_);

    partition_idx_mask_ = partitions_per_level_ - 1;
    build_partitions_.resize(partitions_per_level_);
    child_partitions_.resize(partitions_per_level_);

    for (int i = 0; i < partitions_per_level_; ++i) {
      build_partitions_[i].level = 0;
    }

    partitions_at_level_.resize(6);
    for (int i = 0; i < partitions_at_level_.size(); ++i) {
      partitions_at_level_[i] = partitions_per_level_;
    }

    split_counts_.resize(partitions_per_level_);

    // Mark every output "full" so the first AddData() call allocates a block.
    outputs_.resize(partitions_per_level_);
    for (int i = 0; i < outputs_.size(); ++i) {
      outputs_[i].Update(NULL, tuples_per_partition_);
    }
  }

  void AddData(int n, uint8_t* data);

  int size_per_block() const {
    return tuples_per_partition_;
  }

  struct Partition {
    vector<uint8_t*> blocks;
    int num_last_block;

    Partition() {
      num_last_block = 0;
    }

    Partition(const vector<uint8_t*>& blocks, int num) {
      this->blocks = blocks;
      this->num_last_block = num;
    }
  };

  // All blocks are full except possibly the last.
  int NumTuples(const Partition& partition) const {
    return partition.num_last_block + (partition.blocks.size() - 1) * tuples_per_partition_;
  }

  int NumTuples(const Partition& partition, int block_idx) const {
    if (block_idx == partition.blocks.size() - 1) return partition.num_last_block;
    return tuples_per_partition_;
  }

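  // Flushes the per-partition tuple counts out of the tagged output pointers,
  // then recursively splits oversized partitions and collects the results.
  // Returns false if some partition could not be split down to a single block.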
  bool Finalize(vector<Partition>* results) {
    SCOPED_TIMER(split_time_);
    bool result = true;
    for (int i = 0; i < build_partitions_.size(); ++i) {
      if (build_partitions_[i].blocks.size() > 0) {
        BuildPartition& build_partition = build_partitions_[i];
        build_partition.num_last_block = NumLastBlock(i);
      }
    }

    for (int i = 0; i < build_partitions_.size(); ++i) {
      if (build_partitions_[i].blocks.size() > 0) {
        BuildPartition& build_partition = build_partitions_[i];
        if (TotalTuples(build_partition) > tuples_per_partition_) {
          if (Split(build_partition)) {
            continue;
          } else {
            result = false;
          }
        }
        results->push_back(ToOutputPartition(build_partition));
      }
    }
    return result;
  }

 private:
  // Target L1 data cache footprint per partition block.
  static const int L1_size = 24 * 1024;
  static const int MIN_SPLITS = 4;
  static const int HASH_BIT_SHIFT = 4;

  MemPool* pool_;
  RuntimeProfile* profile_;
  RuntimeProfile::Counter* bytes_allocated_;
  RuntimeProfile::Counter* bytes_copied_;
  RuntimeProfile::Counter* add_time_;
  RuntimeProfile::Counter* split_time_;
  RuntimeProfile::Counter* splits_;

  int size_;
  int hash_offset_;
  int partitions_per_level_;
  int tuples_per_partition_;
  int partition_idx_mask_;
  // One tagged pointer per partition: write position plus tuple count.
  vector<PointerValue> outputs_;
  vector<int32_t> split_counts_;
  vector<int> partitions_at_level_;

  struct BuildPartition {
    vector<uint8_t*> blocks;
    int num_last_block;
    int level;
  };
  vector<BuildPartition> build_partitions_;
  vector<BuildPartition> child_partitions_;

  // Appends a fresh block to the partition and resets outputs_[p] to write
  // from its start. size is a tuple count; 0 means one full block.
  uint8_t* Allocate(BuildPartition* partition, int p, int size = 0) {
    if (size == 0) size = tuples_per_partition_ * size_;
    else size = size * size_;
    COUNTER_ADD(bytes_allocated_, size);
    uint8_t* out_mem = pool_->Allocate(size);
    partition->blocks.push_back(out_mem);
    outputs_[p].Update(out_mem, 0);
    return out_mem;
  }

  Partition ToOutputPartition(const BuildPartition& build) const {
    return Partition(build.blocks, build.num_last_block);
  }

  int TotalTuples(const BuildPartition& partition) const {
    return partition.num_last_block + (partition.blocks.size() - 1) * tuples_per_partition_;
  }

  // Tuple count of partition p's last block, read from the tagged pointer.
  int NumLastBlock(int p) const {
    return outputs_[p].GetValue();
  }

  int NumTuples(const BuildPartition& partition, int block_idx) const {
    if (block_idx == partition.blocks.size() - 1) return partition.num_last_block;
    return tuples_per_partition_;
  }

  bool Split(const BuildPartition& build_partition);
};

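// Scatters n contiguous tuples into their partitions. The partition index is
// the stored hash masked to the partition count; the tagged output pointer
// supplies both the write position and the block's current tuple count.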
void DataPartitioner::AddData(int n, uint8_t* data) {
  SCOPED_TIMER(add_time_);
  COUNTER_ADD(bytes_copied_, n * size_);

  for (int i = 0; i < n; ++i) {
    int32_t hash = *reinterpret_cast<int32_t*>(data + hash_offset_);
    hash &= partition_idx_mask_;
    uint8_t* out_mem = outputs_[hash].GetPointer<uint8_t*>();
    uint16_t size = outputs_[hash].GetValue();
    if (size == tuples_per_partition_) {
      out_mem = Allocate(&build_partitions_[hash], hash);
      size = 0;
    }
    Memcpy16(out_mem, data);
    data += size_;
    outputs_[hash].Update(out_mem + size_, size + 1);
  }
}

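// Splits one oversized partition into children at the next level. Two passes
// over the data: count tuples per child, then allocate exact-size blocks and
// scatter. Returns false if the split cannot proceed (too many levels, or
// fewer than MIN_SPLITS distinct children, so splitting makes no progress).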
bool DataPartitioner::Split(const BuildPartition& build_partition) {
  COUNTER_ADD(splits_, 1);
  const int num_blocks = build_partition.blocks.size();
  const int next_level = build_partition.level + 1;
  if (next_level >= 8) {
    CHECK(false) << "TOO MANY LEVELS: " << next_level;
    return false;
  }
  int partitions = partitions_at_level_[next_level];
  int partition_mask = partitions - 1;

  int new_splits = 0;
  memset(&split_counts_[0], 0, sizeof(split_counts_[0]) * split_counts_.size());

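  // Counting pass: shift off the hash bits used at previous levels (storing
  // the shifted hash back) and histogram tuples per child partition.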
  for (int i = 0; i < num_blocks; ++i) {
    int tuples = NumTuples(build_partition, i);
    uint8_t* hash_slot = build_partition.blocks[i] + hash_offset_;
    for (int j = 0; j < tuples; ++j) {
      int32_t hash = *reinterpret_cast<int32_t*>(hash_slot);
      hash >>= HASH_BIT_SHIFT;
      *reinterpret_cast<int32_t*>(hash_slot) = hash;
      hash &= partition_mask;
      new_splits += !split_counts_[hash];
      split_counts_[hash]++;
      hash_slot += size_;
    }
  }
  if (new_splits < MIN_SPLITS) return false;

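  // Allocation pass: give each non-empty child exactly one block sized to its
  // tuple count, and point outputs_[i] at the start of that block.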
  for (int i = 0; i < partitions; ++i) {
    if (split_counts_[i] > 0) {
      COUNTER_ADD(bytes_allocated_, split_counts_[i] * size_);
      uint8_t* mem = pool_->Allocate(split_counts_[i] * size_);
      child_partitions_[i].blocks.clear();
      child_partitions_[i].level = next_level;
      child_partitions_[i].blocks.push_back(mem);
      outputs_[i].ptr = mem;
    }
  }
  COUNTER_ADD(bytes_copied_, TotalTuples(build_partition) * size_);

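  // Scatter pass: copy every tuple into its child partition's block.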
  for (int i = 0; i < num_blocks; ++i) {
    uint8_t* data = build_partition.blocks[i];
    int tuples = NumTuples(build_partition, i);

    const int offset1 = 0;
    const int offset2 = offset1 + size_;
    const int offset3 = offset2 + size_;
    const int offset4 = offset3 + size_;

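    // Handle four tuples per iteration; the four independent load/copy chains
    // presumably let the out-of-order core overlap the scattered stores.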
    while (tuples >= 4) {
      int32_t hash1 = *reinterpret_cast<int32_t*>(data + hash_offset_ + offset1) & partition_mask;
      int32_t hash2 = *reinterpret_cast<int32_t*>(data + hash_offset_ + offset2) & partition_mask;
      int32_t hash3 = *reinterpret_cast<int32_t*>(data + hash_offset_ + offset3) & partition_mask;
      int32_t hash4 = *reinterpret_cast<int32_t*>(data + hash_offset_ + offset4) & partition_mask;

      uint8_t** out_mem1 = &outputs_[hash1].ptr;
      uint8_t** out_mem2 = &outputs_[hash2].ptr;
      uint8_t** out_mem3 = &outputs_[hash3].ptr;
      uint8_t** out_mem4 = &outputs_[hash4].ptr;

      Memcpy16(*out_mem1, data + offset1);
      *out_mem1 += size_;
      Memcpy16(*out_mem2, data + offset2);
      *out_mem2 += size_;
      Memcpy16(*out_mem3, data + offset3);
      *out_mem3 += size_;
      Memcpy16(*out_mem4, data + offset4);
      *out_mem4 += size_;

      data += size_ * 4;
      tuples -= 4;
    }

    // Handle the remaining (< 4) tuples one at a time.
    for (int j = 0; j < tuples; ++j) {
      int32_t hash = *reinterpret_cast<int32_t*>(data + hash_offset_) & partition_mask;
      uint8_t* out_mem = outputs_[hash].ptr;
      Memcpy16(out_mem, data);
      data += size_;
      outputs_[hash].ptr = out_mem + size_;
    }
  }

  // Register the populated children as new build partitions; Finalize() will
  // revisit them and split again if they are still too large.
  for (int i = 0; i < partitions; ++i) {
    if (split_counts_[i] > 0) {
      BuildPartition& child = child_partitions_[i];
      child.num_last_block = split_counts_[i];
      build_partitions_.push_back(child);
    }
  }

  return true;
}


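// Test driver: generates ~50 million synthetic 16-byte rows (hash slot,
// grouping column, aggregate column), fills each hash slot from the grouping
// column, scatters the rows through the partitioner, then reports validation
// counts and the runtime profile.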
int main(int argc, char **argv) {
  google::InitGoogleLogging(argv[0]);

  LOG(ERROR) << "Starting Test";

  vector<DataProvider::ColDesc> cols;
  // Hash Slot
  cols.push_back(DataProvider::ColDesc::Create<int32_t>(0, 0));
  // Grouping Column
  cols.push_back(DataProvider::ColDesc::Create<int64_t>(0, 500000));
  // Aggregate Column
  cols.push_back(DataProvider::ColDesc::Create<float>(-1, 1));

  MemTracker tracker;
  MemPool pool(&tracker);
  ObjectPool obj_pool;
  RuntimeProfile profile(&obj_pool, "PartitioningTest");

  DataProvider provider(&pool, &profile);
  provider.Reset(50 * 1024 * 1024, 1024, cols);
  // provider.Reset(100 * 1024, 1024, cols);
  // provider.Reset(100, 1024, cols);

  DataPartitioner partitioner(&pool, &profile, provider.row_size(), 0);

  cout << "Begin processing: " << provider.total_rows() << endl;
  int rows;
  void* data;
  while ((data = provider.NextBatch(&rows)) != NULL) {
    uint8_t* next_row = reinterpret_cast<uint8_t*>(data);
    // Compute hash values and store them in the leading hash slot. This test
    // simply uses the grouping column's value as its own hash.
    for (int i = 0; i < rows; ++i) {
      int32_t* hash = reinterpret_cast<int32_t*>(next_row);
      int64_t* col = reinterpret_cast<int64_t*>(next_row + 4);
      *hash = *col;
      next_row += provider.row_size();
    }
    partitioner.AddData(rows, reinterpret_cast<uint8_t*>(data));
  }
  cout << endl;

  LOG(ERROR) << "Partitioning";

  vector<DataPartitioner::Partition> partitions;
  bool fully_split = partitioner.Finalize(&partitions);

  cout << endl << "After Partitioning" << endl;
  int result_tuples = 0;
  int blocks = 0;
  int largest_partition = 0;
  // Validate results
  for (int i = 0; i < partitions.size(); ++i) {
    DataPartitioner::Partition& partition = partitions[i];
    int tuples = partitioner.NumTuples(partition);
    if (largest_partition < tuples) largest_partition = tuples;
    result_tuples += tuples;
    blocks += partition.blocks.size();
#if PRINT_RESULT
    for (int j = 0; j < partition.blocks.size(); ++j) {
      int rows = partitioner.NumTuples(partition, j);
      provider.Print(&cout, partition.blocks[j], rows);
      cout << "------------------------------" << endl;
    }
#endif
  }
  cout << endl;
  cout << "Fully Split: " << (fully_split ? "yes" : "no") << endl;
  cout << "Partitioned tuples: " << result_tuples << endl;
  cout << "Num Blocks: " << blocks << endl;
  cout << "Num Partitions: " << partitions.size() << endl;
  cout << "Largest Partition: " << largest_partition << endl;

  cout << endl;
  profile.PrettyPrint(&cout);

  LOG(ERROR) << "Done.";
  return 0;
}