#define PRINT_RESULT 0

#define UNUSED_BITS 16
#define USED_BITS (64 - UNUSED_BITS)  // 48: the low bits that hold the pointer.

#define UNUSED_UPPER_BITS_MASK 0xFFFF000000000000
#define POINTER_LOWER_BITS_MASK 0x0000FFFFFFFFFFFF
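// The tagging scheme, in brief: on x86-64, user-space virtual addresses fit in
// the low 48 bits of a pointer, so a 64-bit word can carry a pointer in its
// low 48 bits and a 16-bit count in the top 16, e.g.
//   uint64_t packed = (uint64_t(count) << USED_BITS)
//       | (reinterpret_cast<uint64_t>(ptr) & POINTER_LOWER_BITS_MASK);
// The PointerValue accessors below read the two halves back out with these
// masks and shifts.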
using namespace std;
using namespace impala;
inline void Memcpy16(uint8_t* dst, uint8_t* src) {
  reinterpret_cast<uint64_t*>(dst)[0] = reinterpret_cast<uint64_t*>(src)[0];
  reinterpret_cast<uint64_t*>(dst)[1] = reinterpret_cast<uint64_t*>(src)[1];
}
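// Memcpy16 copies a fixed 16 bytes as two 8-byte word moves, avoiding the
// length dispatch of a general memcpy; a compiler lowers it to two load/store
// pairs. It assumes 16-byte tuples and 8-byte-aligned src/dst; any other
// tuple size would silently copy the wrong amount.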
  uint16_t GetValue() const {
    return static_cast<uint16_t>(val >> USED_BITS);
  }
  template <typename T>
  void Get(T& t, uint16_t& v) {
    t = reinterpret_cast<T>(val & POINTER_LOWER_BITS_MASK);
    v = static_cast<uint16_t>(val >> USED_BITS);
  }
  void UpdatePointer(void* p) {
    val = val & UNUSED_UPPER_BITS_MASK;
    val = val | reinterpret_cast<uint64_t>(p);
  }
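  // Worked example (values hypothetical): suppose val carries count 3 and
  // UpdatePointer(p) is called with p = 0x00007f0012340000:
  //   val & UNUSED_UPPER_BITS_MASK -> 0x0003000000000000  (count kept, old pointer cleared)
  //   ... | (uint64_t)p            -> 0x00037f0012340000  (new pointer installed)
  // GetValue() shifts down by USED_BITS and returns 3; masking with
  // POINTER_LOWER_BITS_MASK recovers p.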
  for (int i = 1; i < sizeof(T) * CHAR_BIT; i <<= 1) k = k | k >> i;
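  // The shift-and-or cascade smears the highest set bit of k into every lower
  // position (shifts of 1, 2, 4, ... cover all sizeof(T) * CHAR_BIT bits). In
  // the usual round-up idiom this sits between a leading k-- and a trailing
  // k + 1: e.g. k = 384 (0b110000000) smears to 511, and 511 + 1 = 512.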
  size_ = size;
  hash_offset_ = hash_offset;

  bytes_allocated_ = ADD_COUNTER(profile, "BytesAllocated", TUnit::BYTES);
  bytes_copied_ = ADD_COUNTER(profile, "BytesCopied", TUnit::BYTES);
  add_time_ = ADD_COUNTER(profile, "AddTime", TUnit::CPU_TICKS);
  split_time_ = ADD_COUNTER(profile, "SplitTime", TUnit::CPU_TICKS);
  splits_ = ADD_COUNTER(profile, "Splits", TUnit::UNIT);
  int min_size_per_partition = max(size, 64);
  partitions_per_level_ = NextPowerOfTwo(L1_size / min_size_per_partition) >> 1;
  tuples_per_partition_ = L1_size / size;
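  // Sizing arithmetic, assuming the 16-byte tuples used in main() below:
  // min_size_per_partition = max(16, 64) = 64, so partitions_per_level_ =
  // NextPowerOfTwo(24576 / 64) >> 1 = 512 >> 1 = 256 write streams per level,
  // and tuples_per_partition_ = 24576 / 16 = 1536 tuples before a block is
  // full. The fanout is halved so the active write cursors stay roughly
  // L1-resident.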
  printf("Tuple Size: %d\n", size_);
  printf("Partitions: %d Per Partition: %d\n", partitions_per_level_,
      tuples_per_partition_);
  partition_idx_mask_ = partitions_per_level_ - 1;
  build_partitions_.resize(partitions_per_level_);
  child_partitions_.resize(partitions_per_level_);
  for (int i = 0; i < partitions_per_level_; ++i) {
    build_partitions_[i].level = 0;
  }
  partitions_at_level_.resize(6);
  for (int i = 0; i < partitions_at_level_.size(); ++i) {
    partitions_at_level_[i] = partitions_per_level_;
  }
  split_counts_.resize(partitions_per_level_);

  outputs_.resize(partitions_per_level_);
  for (int i = 0; i < outputs_.size(); ++i) {
    outputs_[i].Update(NULL, tuples_per_partition_);
  }
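  // Seeding every cursor with a NULL block and a count of
  // tuples_per_partition_ makes the first AddData() into each partition hit
  // the "block full" branch and take the Allocate() path, so no memory is
  // reserved for partitions that never receive a tuple.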
  void AddData(int n, uint8_t* data);
  // ...
  return tuples_per_partition_;
    Partition(const vector<uint8_t*>& blocks, int num) {
      this->blocks = blocks;
      this->num_last_block = num;
    }
  // ...
  return tuples_per_partition_;
  for (int i = 0; i < build_partitions_.size(); ++i) {
    if (build_partitions_[i].blocks.size() > 0) {
      // ...
    }
  }
  for (int i = 0; i < build_partitions_.size(); ++i) {
    if (build_partitions_[i].blocks.size() > 0) {
      const BuildPartition& build_partition = build_partitions_[i];
      if (TotalTuples(build_partition) > tuples_per_partition_) {
        // Oversized partition: split it further. If the split took, the
        // children were queued on build_partitions_ and the parent is not
        // emitted itself.
        if (Split(build_partition)) continue;
        // ...
      }
      results->push_back(ToOutputPartition(build_partition));
    }
  }
  static const int L1_size = 24 * 1024;
  static const int MIN_SPLITS = 4;
  static const int HASH_BIT_SHIFT = 4;
  uint8_t* Allocate(BuildPartition* partition, int p, int size = 0) {
    if (size == 0) size = tuples_per_partition_ * size_;  // Default: a full block.
    else size = size * size_;  // Otherwise size is a tuple count.
    uint8_t* out_mem = pool_->Allocate(size);
    partition->blocks.push_back(out_mem);
    outputs_[p].Update(out_mem, 0);
    return out_mem;
  }
  int NumLastBlock(int p) const {
    return outputs_[p].GetValue();
  }
  // ...
  return tuples_per_partition_;
  bool Split(const BuildPartition& build_partition);
  for (int i = 0; i < n; ++i) {
    int32_t hash = *reinterpret_cast<int32_t*>(data + hash_offset_);
    hash &= partition_idx_mask_;
    uint8_t* out_mem = outputs_[hash].GetPointer<uint8_t*>();
    uint16_t size = outputs_[hash].GetValue();
    if (size == tuples_per_partition_) {
      // The current block is full (or this is the first tuple for this
      // partition): start a fresh block.
      out_mem = Allocate(&build_partitions_[hash], hash);
      size = 0;
    }
    Memcpy16(out_mem, data);
    data += size_;
    outputs_[hash].Update(out_mem + size_, size + 1);
  }
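  // Per tuple, AddData does one masked load of the precomputed hash to pick a
  // partition, one 16-byte copy, and one packed-cursor update; because
  // PointerValue keeps the write pointer and fill count in a single word, the
  // hot loop touches only one 8-byte bookkeeping slot per tuple.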
  const int num_blocks = build_partition.blocks.size();
  const int next_level = build_partition.level + 1;
  if (next_level >= static_cast<int>(partitions_at_level_.size())) {
    CHECK(false) << "TOO MANY LEVELS: " << next_level;
  }
  int partitions = partitions_at_level_[next_level];
  int partition_mask = partitions - 1;

  memset(&split_counts_[0], 0, sizeof(split_counts_[0]) * split_counts_.size());
  int new_splits = 0;
  for (int i = 0; i < num_blocks; ++i) {
    int tuples = NumTuples(build_partition, i);
    uint8_t* hash_slot = build_partition.blocks[i] + hash_offset_;
    for (int j = 0; j < tuples; ++j) {
      int32_t hash = *reinterpret_cast<int32_t*>(hash_slot);
      hash >>= HASH_BIT_SHIFT;
      *reinterpret_cast<int32_t*>(hash_slot) = hash;
      hash &= partition_mask;
      new_splits += !split_counts_[hash];
      split_counts_[hash]++;
      hash_slot += size_;
    }
  }
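  // First pass of Split(): histogram the tuples per child so each child
  // partition can be allocated below as one exact-sized block. The hash is
  // rewritten in place with HASH_BIT_SHIFT low bits consumed, so the next
  // level of splitting keys off fresh bits.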
  if (new_splits < MIN_SPLITS) return false;
  for (int i = 0; i < partitions; ++i) {
    if (split_counts_[i] > 0) {
      COUNTER_ADD(bytes_allocated_, split_counts_[i] * size_);
      uint8_t* mem = pool_->Allocate(split_counts_[i] * size_);
      child_partitions_[i].blocks.clear();
      child_partitions_[i].level = next_level;
      child_partitions_[i].blocks.push_back(mem);
      outputs_[i].ptr = mem;
    }
  }
  COUNTER_ADD(bytes_copied_, TotalTuples(build_partition) * size_);
  for (int i = 0; i < num_blocks; ++i) {
    uint8_t* data = build_partition.blocks[i];
    int tuples = NumTuples(build_partition, i);

    const int offset1 = 0;
    const int offset2 = offset1 + size_;
    const int offset3 = offset2 + size_;
    const int offset4 = offset3 + size_;
    while (tuples >= 4) {
      int32_t hash1 = *reinterpret_cast<int32_t*>(data + hash_offset_ + offset1) & partition_mask;
      int32_t hash2 = *reinterpret_cast<int32_t*>(data + hash_offset_ + offset2) & partition_mask;
      int32_t hash3 = *reinterpret_cast<int32_t*>(data + hash_offset_ + offset3) & partition_mask;
      int32_t hash4 = *reinterpret_cast<int32_t*>(data + hash_offset_ + offset4) & partition_mask;

      uint8_t** out_mem1 = &outputs_[hash1].ptr;
      uint8_t** out_mem2 = &outputs_[hash2].ptr;
      uint8_t** out_mem3 = &outputs_[hash3].ptr;
      uint8_t** out_mem4 = &outputs_[hash4].ptr;

      Memcpy16(*out_mem1, data + offset1);
      *out_mem1 += size_;
      Memcpy16(*out_mem2, data + offset2);
      *out_mem2 += size_;
      Memcpy16(*out_mem3, data + offset3);
      *out_mem3 += size_;
      Memcpy16(*out_mem4, data + offset4);
      *out_mem4 += size_;

      data += 4 * size_;
      tuples -= 4;
    }
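    // The 4-way unrolling gives the out-of-order core four independent hash
    // loads and four independent 16-byte copies per iteration, so progress
    // continues while individual partition cursors miss cache. Duplicate
    // hashes stay correct because each cursor is advanced through its
    // out_memN alias before the next copy dereferences it.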
    for (int j = 0; j < tuples; ++j) {
      int32_t hash = *reinterpret_cast<int32_t*>(data + hash_offset_) & partition_mask;
      uint8_t* out_mem = outputs_[hash].ptr;
      Memcpy16(out_mem, data);
      data += size_;
      outputs_[hash].ptr = out_mem + size_;
    }
  }
  for (int i = 0; i < partitions; ++i) {
    if (split_counts_[i] > 0) {
      const BuildPartition& child = child_partitions_[i];
      build_partitions_.push_back(child);
    }
  }
  return true;
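// Populated children are appended to build_partitions_, and Finalize()'s
// index-based loop re-checks the vector's size every iteration, so recursive
// splitting runs as a simple work queue rather than actual recursion.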
int main(int argc, char** argv) {
  google::InitGoogleLogging(argv[0]);

  LOG(ERROR) << "Starting Test";
  vector<DataProvider::ColDesc> cols;
  cols.push_back(DataProvider::ColDesc::Create<int32_t>(0, 0));
  cols.push_back(DataProvider::ColDesc::Create<int64_t>(0, 500000));
  cols.push_back(DataProvider::ColDesc::Create<float>(-1, 1));
  // ... (DataProvider construction elided)
  provider.Reset(50 * 1024 * 1024, 1024, cols);
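  // Test setup: ~52M rows (50 * 1024 * 1024) generated in 1024-row batches.
  // The three columns form a 16-byte row: a 4-byte int32 slot (constant 0
  // here, presumably overwritten with a hash of the int64 column in the loop
  // below), an 8-byte int64 in [0, 500000], and a 4-byte float in [-1, 1].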
  // ... (DataPartitioner construction elided)
  cout << "Begin processing: " << provider.total_rows() << endl;
  void* data;
  int rows;
  while ((data = provider.NextBatch(&rows)) != NULL) {
    uint8_t* next_row = reinterpret_cast<uint8_t*>(data);
    for (int i = 0; i < rows; ++i) {
      int32_t* hash = reinterpret_cast<int32_t*>(next_row);
      int64_t* col = reinterpret_cast<int64_t*>(next_row + 4);
      // ... (fill *hash from *col and advance next_row; elided)
    }
    partitioner.AddData(rows, reinterpret_cast<uint8_t*>(data));
  }
  LOG(ERROR) << "Partitioning";

  vector<DataPartitioner::Partition> partitions;
  bool fully_split = partitioner.Finalize(&partitions);
  cout << endl << "After Partitioning" << endl;
  int result_tuples = 0;
  int blocks = 0;
  int largest_partition = 0;
  for (int i = 0; i < partitions.size(); ++i) {
    const DataPartitioner::Partition& partition = partitions[i];
    int tuples = partitioner.NumTuples(partition);
    if (largest_partition < tuples) largest_partition = tuples;
    result_tuples += tuples;
    blocks += partition.blocks.size();
    for (int j = 0; j < partition.blocks.size(); ++j) {
      int rows = partitioner.NumTuples(partition, j);
      // ... (per-block validation/printing elided)
    }
  }
  cout << "------------------------------" << endl;
  cout << "Fully Split: " << (fully_split ? "yes" : "no") << endl;
  cout << "Partitioned tuples: " << result_tuples << endl;
  cout << "Num Blocks: " << blocks << endl;
  cout << "Num Partitions: " << partitions.size() << endl;
  cout << "Largest Partition: " << largest_partition << endl;
  LOG(ERROR) << "Done.";
  return 0;
}