Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hyperloglog-uda.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <assert.h>
16 #include <math.h>
17 #include <algorithm>
18 #include <sstream>
19 #include <iostream>
20 #include "udf/udf.h"
21 
22 #include "common/names.h"
23 
24 using std::max;
25 using namespace impala_udf;
26 
27 // This sample UDA implements the hyperloglog distinct estimate aggregate
28 // function.
29 // See these papers for more details.
30 // 1) Hyperloglog: The analysis of a near-optimal cardinality estimation algorithm (2007)
31 // 2) HyperLogLog in Practice
32 
33 // Precision taken from the paper. Doesn't seem to matter very much when between [6,12]
34 const int HLL_PRECISION = 10;
35 
36 void HllInit(FunctionContext* ctx, StringVal* dst) {
37  int str_len = pow(2, HLL_PRECISION);
38  dst->is_null = false;
39  dst->ptr = ctx->Allocate(str_len);
40  dst->len = str_len;
41  memset(dst->ptr, 0, str_len);
42 }
43 
// Multiplier and initial value used by the 64-bit FNV hash below.
static const uint64_t FNV64_PRIME = 1099511628211ULL;
static const uint64_t FNV64_SEED = 14695981039346656037ULL;

// Folds 'bytes' bytes starting at 'data' into 'hash' and returns the result.
// Each byte is XORed into the running hash, which is then multiplied by
// FNV64_PRIME.
//   data:  start of the buffer to hash; not dereferenced when bytes <= 0.
//   bytes: number of bytes to consume. Zero or negative values consume nothing
//          and return 'hash' unchanged.
//   hash:  starting hash value (FNV64_SEED for a fresh hash).
static uint64_t FnvHash(const void* data, int32_t bytes, uint64_t hash) {
  const uint8_t* ptr = reinterpret_cast<const uint8_t*>(data);
  // Count upwards against 'bytes' so a negative length cannot run off the buffer.
  for (int32_t i = 0; i < bytes; ++i) {
    hash = (ptr[i] ^ hash) * FNV64_PRIME;
  }
  return hash;
}
55 
56 static uint64_t Hash(const IntVal& v) {
57  return FnvHash(&v.val, sizeof(int32_t), FNV64_SEED);
58 }
59 
60 void HllUpdate(FunctionContext* ctx, const IntVal& src, StringVal* dst) {
61  if (src.is_null) return;
62  assert(dst != NULL);
63  assert(!dst->is_null);
64  assert(dst->len == pow(2, HLL_PRECISION));
65  uint64_t hash_value = Hash(src);
66  if (hash_value != 0) {
67  // Use the lower bits to index into the number of streams and then
68  // find the first 1 bit after the index bits.
69  int idx = hash_value % dst->len;
70  uint8_t first_one_bit = __builtin_ctzl(hash_value >> HLL_PRECISION) + 1;
71  dst->ptr[idx] = ::max(dst->ptr[idx], first_one_bit);
72  }
73 }
74 
75 void HllMerge(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
76  assert(dst != NULL);
77  assert(!dst->is_null);
78  assert(!src.is_null);
79  assert(dst->len == pow(2, HLL_PRECISION));
80  assert(src.len == pow(2, HLL_PRECISION));
81  for (int i = 0; i < src.len; ++i) {
82  dst->ptr[i] = ::max(dst->ptr[i], src.ptr[i]);
83  }
84 }
85 
87  if (src.is_null) return src;
88  StringVal result(ctx, src.len);
89  memcpy(result.ptr, src.ptr, src.len);
90  ctx->Free(src.ptr);
91  return result;
92 }
93 
95  assert(!src.is_null);
96  assert(src.len == pow(2, HLL_PRECISION));
97 
98  const int num_streams = pow(2, HLL_PRECISION);
99  // Empirical constants for the algorithm.
100  float alpha = 0;
101  if (num_streams == 16) {
102  alpha = 0.673f;
103  } else if (num_streams == 32) {
104  alpha = 0.697f;
105  } else if (num_streams == 64) {
106  alpha = 0.709f;
107  } else {
108  alpha = 0.7213f / (1 + 1.079f / num_streams);
109  }
110 
111  float harmonic_mean = 0;
112  int num_zero_registers = 0;
113  for (int i = 0; i < src.len; ++i) {
114  harmonic_mean += powf(2.0f, -src.ptr[i]);
115  if (src.ptr[i] == 0) ++num_zero_registers;
116  }
117  harmonic_mean = 1.0f / harmonic_mean;
118  int64_t estimate = alpha * num_streams * num_streams * harmonic_mean;
119 
120  if (num_zero_registers != 0) {
121  // Estimated cardinality is too low. Hll is too inaccurate here, instead use
122  // linear counting.
123  estimate = num_streams * log(static_cast<float>(num_streams) / num_zero_registers);
124  }
125 
126  // Output the estimate as ascii string
127  stringstream out;
128  out << estimate;
129  string out_str = out.str();
130  StringVal result_str(ctx, out_str.size());
131  memcpy(result_str.ptr, out_str.c_str(), result_str.len);
132  ctx->Free(src.ptr);
133  return result_str;
134 }
void HllMerge(FunctionContext *ctx, const StringVal &src, StringVal *dst)
StringVal HllFinalize(FunctionContext *ctx, const StringVal &src)
const StringSearch UrlParser::hash_search & hash
Definition: url-parser.cc:41
int32_t val
Definition: udf.h:421
const int HLL_PRECISION
uint8_t * ptr
Definition: udf.h:523
std::size_t hash_value(const Decimal4Value &v)
This function must be called 'hash_value' to be picked up by boost.
bool is_null
Definition: udf.h:359
static uint64_t FnvHash(const void *data, int32_t bytes, uint64_t hash)
void HllInit(FunctionContext *ctx, StringVal *dst)
void HllUpdate(FunctionContext *ctx, const IntVal &src, StringVal *dst)
void Free(uint8_t *buffer)
Frees a buffer returned from Allocate() or Reallocate()
Definition: udf.cc:291
static uint64_t Hash(const IntVal &v)
static const uint64_t FNV64_PRIME
uint8_t * Allocate(int byte_size)
Definition: udf.cc:262
StringVal HllSerialize(FunctionContext *ctx, const StringVal &src)
static const uint64_t FNV64_SEED