Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
aggregate-functions.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <math.h>
18 #include <sstream>
19 #include <algorithm>
20 
21 #include <boost/random/ranlux.hpp>
22 #include <boost/random/uniform_int.hpp>
23 
24 #include "common/logging.h"
25 #include "runtime/decimal-value.h"
26 #include "runtime/string-value.h"
28 #include "exprs/anyval-util.h"
29 
30 
31 #include "common/names.h"
32 
33 using boost::uniform_int;
34 using boost::ranlux64_3;
35 using std::push_heap;
36 using std::pop_heap;
37 
38 // TODO: this file should be cross compiled and then all of the builtin
39 // aggregate functions will have a codegen enabled path. Then we can remove
40 // the custom code in aggregation node.
41 namespace impala {
42 
43 // Converts any UDF Val Type to a string representation
44 template <typename T>
46  stringstream ss;
47  ss << val;
48  const string &str = ss.str();
49  StringVal string_val(context, str.size());
50  memcpy(string_val.ptr, str.c_str(), str.size());
51  return string_val;
52 }
53 
54 // Delimiter to use if the separator is NULL: the two bytes ", " (comma, space).
// The explicit length of 2 covers only those two bytes, not the literal's terminating
// NUL character.
55 static const StringVal DEFAULT_STRING_CONCAT_DELIM((uint8_t*)", ", 2);
56 
57 // Hyperloglog precision. Default taken from paper. Doesn't seem to matter very
58 // much when between [6,12]
60 const int AggregateFunctions::HLL_LEN = 1024; // 2^HLL_PRECISION
61 
63  dst->is_null = true;
64 }
65 
66 template<typename T>
68  dst->is_null = false;
69  dst->val = 0;
70 }
71 
72 template<>
74  dst->is_null = false;
75  dst->val16 = 0;
76 }
77 
79  FunctionContext* ctx, const StringVal& src) {
80  if (src.is_null) return src;
81  StringVal result(ctx, src.len);
82  memcpy(result.ptr, src.ptr, src.len);
83  return result;
84 }
85 
87  FunctionContext* ctx, const StringVal& src) {
88  StringVal result = StringValGetValue(ctx, src);
89  if (!src.is_null) ctx->Free(src.ptr);
90  return result;
91 }
92 
94  FunctionContext*, const AnyVal& src, BigIntVal* dst) {
95  DCHECK(!dst->is_null);
96  if (!src.is_null) ++dst->val;
97 }
98 
100  DCHECK(!dst->is_null);
101  ++dst->val;
102 }
103 
105  FunctionContext*, const AnyVal& src, BigIntVal* dst) {
106  DCHECK(!dst->is_null);
107  if (!src.is_null) {
108  --dst->val;
109  DCHECK_GE(dst->val, 0);
110  }
111 }
112 
114  DCHECK(!dst->is_null);
115  --dst->val;
116  DCHECK_GE(dst->val, 0);
117 }
118 
120  BigIntVal* dst) {
121  DCHECK(!dst->is_null);
122  DCHECK(!src.is_null);
123  dst->val += src.val;
124 }
125 
// Running state of an average computation. The code in this file keeps it inside a
// StringVal buffer of exactly sizeof(AvgState) bytes: the buffer is zero-filled with
// memset and then accessed through reinterpret_cast<AvgState*>, so this struct must stay
// a plain aggregate whose all-zero byte pattern is a valid "empty" state.
126 struct AvgState {
// Accumulated total: inputs are added on update and subtracted on remove.
127  double sum;
// Number of inputs currently reflected in 'sum'. When it is 0 the finalize code returns
// NULL instead of dividing.
128  int64_t count;
129 };
130 
132  dst->is_null = false;
133  dst->len = sizeof(AvgState);
134  dst->ptr = ctx->Allocate(dst->len);
135  memset(dst->ptr, 0, sizeof(AvgState));
136 }
137 
138 template <typename T>
140  if (src.is_null) return;
141  DCHECK(dst->ptr != NULL);
142  DCHECK_EQ(sizeof(AvgState), dst->len);
143  AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
144  avg->sum += src.val;
145  ++avg->count;
146 }
147 
148 template <typename T>
150  // Remove doesn't need to explicitly check the number of calls to Update() or Remove()
151  // because Finalize() returns NULL if count is 0.
152  if (src.is_null) return;
153  DCHECK(dst->ptr != NULL);
154  DCHECK_EQ(sizeof(AvgState), dst->len);
155  AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
156  avg->sum -= src.val;
157  --avg->count;
158  DCHECK_GE(avg->count, 0);
159 }
160 
162  StringVal* dst) {
163  const AvgState* src_struct = reinterpret_cast<const AvgState*>(src.ptr);
164  DCHECK(dst->ptr != NULL);
165  DCHECK_EQ(sizeof(AvgState), dst->len);
166  AvgState* dst_struct = reinterpret_cast<AvgState*>(dst->ptr);
167  dst_struct->sum += src_struct->sum;
168  dst_struct->count += src_struct->count;
169 }
170 
172  AvgState* val_struct = reinterpret_cast<AvgState*>(src.ptr);
173  if (val_struct->count == 0) return DoubleVal::null();
174  return DoubleVal(val_struct->sum / val_struct->count);
175 }
176 
178  DoubleVal result = AvgGetValue(ctx, src);
179  ctx->Free(src.ptr);
180  return result;
181 }
182 
184  const TimestampVal& src, StringVal* dst) {
185  if (src.is_null) return;
186  DCHECK(dst->ptr != NULL);
187  DCHECK_EQ(sizeof(AvgState), dst->len);
188  AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
190  avg->sum += val;
191  ++avg->count;
192 }
193 
195  const TimestampVal& src, StringVal* dst) {
196  if (src.is_null) return;
197  DCHECK(dst->ptr != NULL);
198  DCHECK_EQ(sizeof(AvgState), dst->len);
199  AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
201  avg->sum -= val;
202  --avg->count;
203  DCHECK_GE(avg->count, 0);
204 }
205 
207  const StringVal& src) {
208  AvgState* val_struct = reinterpret_cast<AvgState*>(src.ptr);
209  if (val_struct->count == 0) return TimestampVal::null();
210  TimestampValue tv(val_struct->sum / val_struct->count);
211  TimestampVal result;
212  tv.ToTimestampVal(&result);
213  return result;
214 }
215 
217  const StringVal& src) {
218  TimestampVal result = TimestampAvgGetValue(ctx, src);
219  ctx->Free(src.ptr);
220  return result;
221 }
222 
224  DecimalVal sum; // only using val16
225  int64_t count;
226 };
227 
229  dst->is_null = false;
230  dst->len = sizeof(DecimalAvgState);
231  dst->ptr = ctx->Allocate(dst->len);
232  memset(dst->ptr, 0, sizeof(DecimalAvgState));
233 }
234 
236  StringVal* dst) {
237  DecimalAvgAddOrRemove(ctx, src, dst, false);
238 }
239 
241  StringVal* dst) {
242  DecimalAvgAddOrRemove(ctx, src, dst, true);
243 }
244 
246  const DecimalVal& src, StringVal* dst, bool remove) {
247  if (src.is_null) return;
248  DCHECK(dst->ptr != NULL);
249  DCHECK_EQ(sizeof(DecimalAvgState), dst->len);
250  DecimalAvgState* avg = reinterpret_cast<DecimalAvgState*>(dst->ptr);
251  const FunctionContext::TypeDesc* arg_desc = ctx->GetArgType(0);
252  DCHECK(arg_desc != NULL);
253  const ColumnType& arg_type = AnyValUtil::TypeDescToColumnType(*arg_desc);
254 
255  // Since the src and dst are guaranteed to be the same scale, we can just
256  // do a simple add.
257  int m = remove ? -1 : 1;
258  switch (arg_type.GetByteSize()) {
259  case 4:
260  avg->sum.val16 += m * src.val4;
261  break;
262  case 8:
263  avg->sum.val16 += m * src.val8;
264  break;
265  case 16:
266  avg->sum.val16 += m * src.val16;
267  break;
268  default:
269  DCHECK(false) << "Invalid byte size for type " << arg_type.DebugString();
270  }
271  if (remove) {
272  --avg->count;
273  DCHECK_GE(avg->count, 0);
274  } else {
275  ++avg->count;
276  }
277 }
278 
280  const StringVal& src, StringVal* dst) {
281  const DecimalAvgState* src_struct =
282  reinterpret_cast<const DecimalAvgState*>(src.ptr);
283  DCHECK(dst->ptr != NULL);
284  DCHECK_EQ(sizeof(DecimalAvgState), dst->len);
285  DecimalAvgState* dst_struct = reinterpret_cast<DecimalAvgState*>(dst->ptr);
286  dst_struct->sum.val16 += src_struct->sum.val16;
287  dst_struct->count += src_struct->count;
288 }
289 
291  const StringVal& src) {
292  DecimalAvgState* val_struct = reinterpret_cast<DecimalAvgState*>(src.ptr);
293  if (val_struct->count == 0) return DecimalVal::null();
294  const FunctionContext::TypeDesc& output_desc = ctx->GetReturnType();
295  DCHECK_EQ(FunctionContext::TYPE_DECIMAL, output_desc.type);
296  Decimal16Value sum(val_struct->sum.val16);
297  Decimal16Value count(val_struct->count);
298  // The scale of the accumulated sum must be the same as the scale of the return type.
299  // TODO: Investigate whether this is always the right thing to do. Does the current
300  // implementation result in an unacceptable loss of output precision?
301  ColumnType sum_type = ColumnType::CreateDecimalType(38, output_desc.scale);
302  ColumnType count_type = ColumnType::CreateDecimalType(38, 0);
303  bool is_nan = false;
304  bool overflow = false;
305  Decimal16Value result = sum.Divide<int128_t>(sum_type, count, count_type,
306  output_desc.scale, &is_nan, &overflow);
307  if (UNLIKELY(is_nan)) return DecimalVal::null();
308  if (UNLIKELY(overflow)) {
309  ctx->AddWarning("Avg computation overflowed, returning NULL");
310  return DecimalVal::null();
311  }
312  return DecimalVal(result.value());
313 }
314 
316  const StringVal& src) {
317  DecimalVal result = DecimalAvgGetValue(ctx, src);
318  ctx->Free(src.ptr);
319  return result;
320 }
321 
322 template<typename SRC_VAL, typename DST_VAL>
323 void AggregateFunctions::SumUpdate(FunctionContext* ctx, const SRC_VAL& src,
324  DST_VAL* dst) {
325  if (src.is_null) {
326  // Do not count null values towards the number of updates
327  ctx->impl()->IncrementNumUpdates(-1);
328  return;
329  }
330  if (dst->is_null) InitZero<DST_VAL>(ctx, dst);
331  dst->val += src.val;
332 }
333 
334 template<typename SRC_VAL, typename DST_VAL>
335 void AggregateFunctions::SumRemove(FunctionContext* ctx, const SRC_VAL& src,
336  DST_VAL* dst) {
337  // Do not count null values towards the number of removes
338  if (src.is_null) ctx->impl()->IncrementNumRemoves(-1);
339  if (ctx->impl()->num_removes() >= ctx->impl()->num_updates()) {
340  *dst = DST_VAL::null();
341  return;
342  }
343  if (src.is_null) return;
344  if (dst->is_null) InitZero<DST_VAL>(ctx, dst);
345  dst->val -= src.val;
346 }
347 
349  const DecimalVal& src, DecimalVal* dst) {
350  SumDecimalAddOrSubtract(ctx, src, dst);
351 }
352 
354  const DecimalVal& src, DecimalVal* dst) {
355  if (ctx->impl()->num_removes() >= ctx->impl()->num_updates()) {
356  *dst = DecimalVal::null();
357  return;
358  }
359  SumDecimalAddOrSubtract(ctx, src, dst, true);
360 }
361 
363  const DecimalVal& src, DecimalVal* dst, bool subtract) {
364  if (src.is_null) return;
365  if (dst->is_null) InitZero<DecimalVal>(ctx, dst);
366  const FunctionContext::TypeDesc* arg_desc = ctx->GetArgType(0);
367  // Since the src and dst are guaranteed to be the same scale, we can just
368  // do a simple add.
369  int m = subtract ? -1 : 1;
370  if (arg_desc->precision <= 9) {
371  dst->val16 += m * src.val4;
372  } else if (arg_desc->precision <= 19) {
373  dst->val16 += m * src.val8;
374  } else {
375  dst->val16 += m * src.val16;
376  }
377 }
378 
380  const DecimalVal& src, DecimalVal* dst) {
381  if (src.is_null) return;
382  if (dst->is_null) InitZero<DecimalVal>(ctx, dst);
383  dst->val16 += src.val16;
384 }
385 
386 template<typename T>
387 void AggregateFunctions::Min(FunctionContext*, const T& src, T* dst) {
388  if (src.is_null) return;
389  if (dst->is_null || src.val < dst->val) *dst = src;
390 }
391 
392 template<typename T>
393 void AggregateFunctions::Max(FunctionContext*, const T& src, T* dst) {
394  if (src.is_null) return;
395  if (dst->is_null || src.val > dst->val) *dst = src;
396 }
397 
399  dst->is_null = true;
400  dst->ptr = NULL;
401  dst->len = 0;
402 }
403 
404 template<>
406  if (src.is_null) return;
407  if (dst->is_null ||
409  if (!dst->is_null) ctx->Free(dst->ptr);
410  uint8_t* copy = ctx->Allocate(src.len);
411  memcpy(copy, src.ptr, src.len);
412  *dst = StringVal(copy, src.len);
413  }
414 }
415 
416 template<>
418  if (src.is_null) return;
419  if (dst->is_null ||
421  if (!dst->is_null) ctx->Free(dst->ptr);
422  uint8_t* copy = ctx->Allocate(src.len);
423  memcpy(copy, src.ptr, src.len);
424  *dst = StringVal(copy, src.len);
425  }
426 }
427 
428 template<>
430  const DecimalVal& src, DecimalVal* dst) {
431  if (src.is_null) return;
432  const FunctionContext::TypeDesc* arg = ctx->GetArgType(0);
433  DCHECK(arg != NULL);
434  if (arg->precision <= 9) {
435  if (dst->is_null || src.val4 < dst->val4) *dst = src;
436  } else if (arg->precision <= 19) {
437  if (dst->is_null || src.val8 < dst->val8) *dst = src;
438  } else {
439  if (dst->is_null || src.val16 < dst->val16) *dst = src;
440  }
441 }
442 
443 template<>
445  const DecimalVal& src, DecimalVal* dst) {
446  if (src.is_null) return;
447  const FunctionContext::TypeDesc* arg = ctx->GetArgType(0);
448  DCHECK(arg != NULL);
449  if (arg->precision <= 9) {
450  if (dst->is_null || src.val4 > dst->val4) *dst = src;
451  } else if (arg->precision <= 19) {
452  if (dst->is_null || src.val8 > dst->val8) *dst = src;
453  } else {
454  if (dst->is_null || src.val16 > dst->val16) *dst = src;
455  }
456 }
457 
458 template<>
460  const TimestampVal& src, TimestampVal* dst) {
461  if (src.is_null) return;
462  if (dst->is_null) {
463  *dst = src;
464  return;
465  }
468  if (src_tv < dst_tv) *dst = src;
469 }
470 
471 template<>
473  const TimestampVal& src, TimestampVal* dst) {
474  if (src.is_null) return;
475  if (dst->is_null) {
476  *dst = src;
477  return;
478  }
481  if (src_tv > dst_tv) *dst = src;
482 }
483 
484 // StringConcat intermediate state starts with the length of the first
485 // separator, followed by the accumulated string. The accumulated
486 // string starts with the separator of the first value that arrived in
487 // StringConcatUpdate().
// The header is a plain int so that it has the same size as StringVal::len; the update
// code DCHECKs that sizeof(StringConcatHeader) equals sizeof(sep->len).
488 typedef int StringConcatHeader;
489 
491  const StringVal& src, StringVal* result) {
493 }
494 
496  const StringVal& src, const StringVal& separator, StringVal* result) {
497  if (src.is_null) return;
498  const StringVal* sep = separator.is_null ? &DEFAULT_STRING_CONCAT_DELIM : &separator;
499  if (result->is_null) {
500  // Header of the intermediate state holds the length of the first separator.
501  const int header_len = sizeof(StringConcatHeader);
502  DCHECK(header_len == sizeof(sep->len));
503  *result = StringVal(ctx->Allocate(header_len), header_len);
504  *reinterpret_cast<StringConcatHeader*>(result->ptr) = sep->len;
505  }
506  int new_len = result->len + sep->len + src.len;
507  result->ptr = ctx->Reallocate(result->ptr, new_len);
508  memcpy(result->ptr + result->len, sep->ptr, sep->len);
509  result->len += sep->len;
510  memcpy(result->ptr + result->len, src.ptr, src.len);
511  result->len += src.len;
512  DCHECK(result->len == new_len);
513 }
514 
516  const StringVal& src, StringVal* result) {
517  if (src.is_null) return;
518  const int header_len = sizeof(StringConcatHeader);
519  if (result->is_null) {
520  // Copy the header from the first intermediate value.
521  *result = StringVal(ctx->Allocate(header_len), header_len);
522  *reinterpret_cast<StringConcatHeader*>(result->ptr) =
523  *reinterpret_cast<StringConcatHeader*>(src.ptr);
524  }
525  // Append the string portion of the intermediate src to result (omit src's header).
526  int new_len = result->len + src.len - header_len;
527  result->ptr = ctx->Reallocate(result->ptr, new_len);
528  memcpy(result->ptr + result->len, src.ptr + header_len, src.len - header_len);
529  result->len += src.len - header_len;
530  DCHECK(result->len == new_len);
531 }
532 
534  const StringVal& src) {
535  if (src.is_null) return src;
536  const int header_len = sizeof(StringConcatHeader);
537  DCHECK(src.len >= header_len);
538  int sep_len = *reinterpret_cast<StringConcatHeader*>(src.ptr);
539  DCHECK(src.len >= header_len + sep_len);
540  // Remove the header and the first separator.
541  StringVal result(ctx, src.len - header_len - sep_len);
542  memcpy(result.ptr, src.ptr + header_len + sep_len, result.len);
543  ctx->Free(src.ptr);
544  return result;
545 }
546 
547 // Compute distinctpc and distinctpcsa using Flajolet and Martin's algorithm
548 // (Probabilistic Counting Algorithms for Data Base Applications)
549 // We have implemented two variants here: one with stochastic averaging (with PCSA
550 // postfix) and one without.
551 // There are 4 phases to compute the aggregate:
552 // 1. allocate a bitmap, stored in the aggregation tuple's output string slot
553 // 2. update the bitmap per row (UpdateDistinctEstimateSlot)
554 // 3. for distributed plan, merge the bitmaps from all the nodes
555 // (UpdateMergeEstimateSlot)
556 // 4. compute the estimate using the bitmaps when all the rows are processed
557 // (FinalizeEstimateSlot)
558 const static int NUM_PC_BITMAPS = 64; // number of bitmaps (rows); each row is accessed as one 32-bit word
559 const static int PC_BITMAP_LENGTH = 32; // the length of each bit map, in bits
560 const static float PC_THETA = 0.77351f; // the magic number to compute the final result (2^avg is divided by it)
561 
563  // Initialize the distinct estimate bit map - Probabilistic Counting Algorithms for Data
564  // Base Applications (Flajolet and Martin)
565  //
566  // The bitmap is a 64bit(1st index) x 32bit(2nd index) matrix.
567  // So, the string length of 256 byte is enough.
568  // The layout is:
569  // row 1: 8bit 8bit 8bit 8bit
570  // row 2: 8bit 8bit 8bit 8bit
571  // ... ..
572  // ... ..
573  // row 64: 8bit 8bit 8bit 8bit
574  //
575  // Using 32bit length, we can count up to 10^8. This will not be enough for Fact table
576  // primary key, but once we approach the limit, we could interpret the result as
577  // "every row is distinct".
578  //
579  // We use "string" type for DISTINCT_PC function so that we can use the string
580  // slot to hold the bitmaps.
581  dst->is_null = false;
582  int str_len = NUM_PC_BITMAPS * PC_BITMAP_LENGTH / 8;
583  dst->ptr = c->Allocate(str_len);
584  dst->len = str_len;
585  memset(dst->ptr, 0, str_len);
586 }
587 
// Sets bit 'bit_index' of row 'row_index' in a distinct-estimate bitmap.
//
//  bitmap     start of the bitmap. It is accessed as an array of 32-bit words, one
//             word per row, so it must be large enough to contain row 'row_index'.
//  row_index  which 32-bit row to modify.
//  bit_index  bit position inside that row, in [0, 31]; 0 is the least significant bit.
static inline void SetDistinctEstimateBit(uint8_t* bitmap,
    uint32_t row_index, uint32_t bit_index) {
  uint32_t* rows = reinterpret_cast<uint32_t*>(bitmap);
  // Shift an unsigned 1. Callers pass bit_index == 31, and shifting a signed 1 into the
  // sign bit of an int is not portable.
  rows[row_index] |= (static_cast<uint32_t>(1) << bit_index);
}
596 
// Returns true iff bit 'bit_index' of row 'row_index' is set in a distinct-estimate
// bitmap.
//
//  bitmap     start of the bitmap. It is accessed as an array of 32-bit words, one
//             word per row, so it must be large enough to contain row 'row_index'.
//  row_index  which 32-bit row to inspect.
//  bit_index  bit position inside that row; 0 is the least significant bit. A position
//             past the end of the row (>= 32) is reported as "not set" and the bitmap
//             is not read. This keeps a scan that steps one past the last bit
//             well-defined instead of performing an over-wide shift.
static inline bool GetDistinctEstimateBit(uint8_t* bitmap,
    uint32_t row_index, uint32_t bit_index) {
  const uint32_t bits_per_row = 8 * sizeof(uint32_t);
  if (bit_index >= bits_per_row) return false;
  const uint32_t* rows = reinterpret_cast<const uint32_t*>(bitmap);
  // Unsigned mask: avoids shifting a signed 1 into the sign bit for bit_index == 31.
  return (rows[row_index] & (static_cast<uint32_t>(1) << bit_index)) != 0;
}
602 
603 template<typename T>
605  if (input.is_null) return;
606  // Core of the algorithm. This is a direct translation of the code in the paper.
607  // Please see the paper for details. For simple averaging, we need to compute hash
608  // values NUM_PC_BITMAPS times using NUM_PC_BITMAPS different hash functions (by using a
609  // different seed).
610  for (int i = 0; i < NUM_PC_BITMAPS; ++i) {
611  uint32_t hash_value = AnyValUtil::Hash(input, *c->GetArgType(0), i);
612  int bit_index = __builtin_ctz(hash_value);
613  if (UNLIKELY(hash_value == 0)) bit_index = PC_BITMAP_LENGTH - 1;
614  // Set bitmap[i, bit_index] to 1
615  SetDistinctEstimateBit(dst->ptr, i, bit_index);
616  }
617 }
618 
619 template<typename T>
621  if (input.is_null) return;
622 
623  // Core of the algorithm. This is a direct translation of the code in the paper.
624  // Please see the paper for details. Using stochastic averaging, we only need to
625  // compute the hash value once for each row.
626  uint32_t hash_value = AnyValUtil::Hash(input, *c->GetArgType(0), 0);
627  uint32_t row_index = hash_value % NUM_PC_BITMAPS;
628 
629  // We want the zero-based position of the least significant 1-bit in binary
630  // representation of hash_value. __builtin_ctz does exactly this because it returns
631  // the number of trailing 0-bits in x (or undefined if x is zero).
632  int bit_index = __builtin_ctz(hash_value / NUM_PC_BITMAPS);
633  if (UNLIKELY(hash_value == 0)) bit_index = PC_BITMAP_LENGTH - 1;
634 
635  // Set bitmap[row_index, bit_index] to 1
636  SetDistinctEstimateBit(dst->ptr, row_index, bit_index);
637 }
638 
639 string DistinctEstimateBitMapToString(uint8_t* v) {
640  stringstream debugstr;
641  for (int i = 0; i < NUM_PC_BITMAPS; ++i) {
642  for (int j = 0; j < PC_BITMAP_LENGTH; ++j) {
643  // print bitmap[i][j]
644  debugstr << GetDistinctEstimateBit(v, i, j);
645  }
646  debugstr << "\n";
647  }
648  debugstr << "\n";
649  return debugstr.str();
650 }
651 
653  const StringVal& src, StringVal* dst) {
654  DCHECK(!src.is_null);
655  DCHECK(!dst->is_null);
656  DCHECK_EQ(src.len, NUM_PC_BITMAPS * PC_BITMAP_LENGTH / 8);
657 
658  // Merge the bits
659  // I think _mm_or_ps can do it, but perf doesn't really matter here. We call this only
660  // once per group per node.
661  for (int i = 0; i < NUM_PC_BITMAPS * PC_BITMAP_LENGTH / 8; ++i) {
662  *(dst->ptr + i) |= *(src.ptr + i);
663  }
664 
665  VLOG_ROW << "UpdateMergeEstimateSlot Src Bit map:\n"
667  VLOG_ROW << "UpdateMergeEstimateSlot Dst Bit map:\n"
669 }
670 
672  DCHECK(!src.is_null);
673  DCHECK_EQ(src.len, NUM_PC_BITMAPS * PC_BITMAP_LENGTH / 8);
674  VLOG_ROW << "FinalizeEstimateSlot Bit map:\n"
676 
677  // We haven't processed any rows if none of the bits are set. Therefore, we have zero
678  // distinct rows. We're overwriting the result in the same string buffer we've
679  // allocated.
680  bool is_empty = true;
681  for (int i = 0; i < NUM_PC_BITMAPS * PC_BITMAP_LENGTH / 8; ++i) {
682  if (src.ptr[i] != 0) {
683  is_empty = false;
684  break;
685  }
686  }
687  if (is_empty) return 0;
688 
689  // Convert the bitmap to a number, please see the paper for details
690  // In short, we count the average number of leading 1s (per row) in the bit map.
691  // The number is proportional to the log2(1/NUM_PC_BITMAPS of the actual number of
692  // distinct).
693  // To get the actual number of distinct, we'll do 2^avg / PC_THETA.
694  // PC_THETA is a magic number.
695  int sum = 0;
696  for (int i = 0; i < NUM_PC_BITMAPS; ++i) {
697  int row_bit_count = 0;
698  // Count the number of leading ones for each row in the bitmap
699  // We could have used the built-in __builtin_clz to count the number of leading zeros
700  // but we first need to invert the 1 and 0.
701  while (GetDistinctEstimateBit(src.ptr, i, row_bit_count) &&
702  row_bit_count < PC_BITMAP_LENGTH) {
703  ++row_bit_count;
704  }
705  sum += row_bit_count;
706  }
707  double avg = static_cast<double>(sum) / static_cast<double>(NUM_PC_BITMAPS);
708  double result = pow(static_cast<double>(2), avg) / PC_THETA;
709  return result;
710 }
711 
713  DCHECK(!src.is_null);
714  double estimate = DistinceEstimateFinalize(src);
715  c->Free(src.ptr);
716  return static_cast<int64_t>(estimate);
717 }
718 
720  DCHECK(!src.is_null);
721  // When using stochastic averaging, the result has to be multiplied by NUM_PC_BITMAPS.
722  double estimate = DistinceEstimateFinalize(src) * NUM_PC_BITMAPS;
723  c->Free(src.ptr);
724  return static_cast<int64_t>(estimate);
725 }
726 
727 // Histogram constants
728 // TODO: Expose as constant argument parameters to the UDA.
// Upper bound on the number of buckets printed by the histogram output code further
// down, which uses min(num_samples, NUM_BUCKETS).
729 const static int NUM_BUCKETS = 100;
// NOTE(review): no use of this constant is visible in this part of the file; presumably
// it sizes the sample reservoir together with NUM_BUCKETS. Confirm.
730 const static int NUM_SAMPLES_PER_BUCKET = 200;
// String samples are truncated to at most this many bytes when they are stored.
732 const static int MAX_STRING_SAMPLE_LEN = 10;
733 
734 template <typename T>
736  // Sample value
737  T val;
738  // Key on which the samples are sorted.
739  double key;
740 
741  ReservoirSample() : key(-1) { }
742  ReservoirSample(const T& val) : val(val), key(-1) { }
743 
744  // Gets a copy of the sample value that allocates memory from ctx, if necessary.
745  T GetValue(FunctionContext* ctx) { return val; }
746 };
747 
748 // Template specialization for StringVal because we do not store the StringVal itself.
749 // Instead, we keep fixed size arrays and truncate longer strings if necessary.
750 template <>
753  int len; // Size of string (up to MAX_STRING_SAMPLE_LEN)
754  double key;
755 
756  ReservoirSample() : len(0), key(-1) { }
757 
758  ReservoirSample(const StringVal& string_val) : key(-1) {
759  len = min(string_val.len, MAX_STRING_SAMPLE_LEN);
760  memcpy(&val[0], string_val.ptr, len);
761  }
762 
763  // Gets a copy of the sample value that allocates memory from ctx, if necessary.
765  StringVal result = StringVal(ctx, len);
766  memcpy(result.ptr, &val[0], len);
767  return result;
768  }
769 };
770 
771 template <typename T>
774 
775  // Number of collected samples.
777 
778  // Number of values over which the samples were collected.
779  int64_t source_size;
780 
781  // Random number generator for generating 64-bit integers
782  // TODO: Replace with mt19937_64 when upgrading boost
783  ranlux64_3 rng;
784 
785  int64_t GetNext64(int64_t max) {
786  uniform_int<int64_t> dist(0, max);
787  return dist(rng);
788  }
789 };
790 
791 template <typename T>
793  int str_len = sizeof(ReservoirSampleState<T>);
794  dst->is_null = false;
795  dst->ptr = ctx->Allocate(str_len);
796  dst->len = str_len;
797  memset(dst->ptr, 0, str_len);
798  *reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr) = ReservoirSampleState<T>();
799 }
800 
801 template <typename T>
803  StringVal* dst) {
804  if (src.is_null) return;
805  DCHECK(!dst->is_null);
806  DCHECK_EQ(dst->len, sizeof(ReservoirSampleState<T>));
807  ReservoirSampleState<T>* state = reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr);
808 
809  if (state->num_samples < NUM_SAMPLES) {
810  state->samples[state->num_samples++] = ReservoirSample<T>(src);
811  } else {
812  int64_t r = state->GetNext64(state->source_size);
813  if (r < NUM_SAMPLES) state->samples[r] = ReservoirSample<T>(src);
814  }
815  ++state->source_size;
816 }
817 
818 template <typename T>
820  const StringVal& src) {
821  if (src.is_null) return src;
822  StringVal result(ctx, src.len);
823  memcpy(result.ptr, src.ptr, src.len);
824  ctx->Free(src.ptr);
825 
826  ReservoirSampleState<T>* state = reinterpret_cast<ReservoirSampleState<T>*>(result.ptr);
827  // Assign keys to the samples that haven't been set (i.e. if serializing after
828  // Update()). In weighted reservoir sampling the keys are typically assigned as the
829  // sources are being sampled, but this requires maintaining the samples in sorted order
830  // (by key) and it accomplishes the same thing at this point because all data points
831  // coming into Update() get the same weight. When the samples are later merged, they do
832  // have different weights (set here) that are proportional to the source_size, i.e.
833  // samples selected from a larger stream are more likely to end up in the final sample
834  // set. In order to avoid the extra overhead in Update(), we approximate the keys by
835  // picking random numbers in the range [(SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE), 1].
836  // This weights the keys by SOURCE_SIZE and implies that the samples picked had the
837  // highest keys, because values not sampled would have keys between 0 and
838  // (SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE).
839  for (int i = 0; i < state->num_samples; ++i) {
840  if (state->samples[i].key >= 0) continue;
841  int r = rand() % state->num_samples;
842  state->samples[i].key = ((double) state->source_size - r) / state->source_size;
843  }
844  return result;
845 }
846 
847 template <typename T>
849  return i.val.val < j.val.val;
850 }
851 
852 template <>
854  const ReservoirSample<StringVal>& j) {
855  int n = min(i.len, j.len);
856  int result = memcmp(&i.val[0], &j.val[0], n);
857  if (result == 0) return i.len < j.len;
858  return result < 0;
859 }
860 
861 template <>
863  const ReservoirSample<DecimalVal>& j) {
864  return i.val.val16 < j.val.val16;
865 }
866 
867 template <>
870  if (i.val.date == j.val.date) return i.val.time_of_day < j.val.time_of_day;
871  else return i.val.date < j.val.date;
872 }
873 
874 template <typename T>
876  return i.key > j.key;
877 }
878 
879 template <typename T>
881  const StringVal& src_val, StringVal* dst_val) {
882  if (src_val.is_null) return;
883  DCHECK(!dst_val->is_null);
884  DCHECK(!src_val.is_null);
885  DCHECK_EQ(src_val.len, sizeof(ReservoirSampleState<T>));
886  DCHECK_EQ(dst_val->len, sizeof(ReservoirSampleState<T>));
887  ReservoirSampleState<T>* src = reinterpret_cast<ReservoirSampleState<T>*>(src_val.ptr);
888  ReservoirSampleState<T>* dst = reinterpret_cast<ReservoirSampleState<T>*>(dst_val->ptr);
889 
890  int src_idx = 0;
891  int src_max = src->num_samples;
892  // First, fill up the dst samples if they don't already exist. The samples are now
893  // ordered as a min-heap on the key.
894  while (dst->num_samples < NUM_SAMPLES && src_idx < src_max) {
895  DCHECK_GE(src->samples[src_idx].key, 0);
896  dst->samples[dst->num_samples++] = src->samples[src_idx++];
897  push_heap(dst->samples, dst->samples + dst->num_samples, SampleKeyGreater<T>);
898  }
899  // Then for every sample from source, take the sample if the key is greater than
900  // the minimum key in the min-heap.
901  while (src_idx < src_max) {
902  DCHECK_GE(src->samples[src_idx].key, 0);
903  if (src->samples[src_idx].key > dst->samples[0].key) {
904  pop_heap(dst->samples, dst->samples + NUM_SAMPLES, SampleKeyGreater<T>);
905  dst->samples[NUM_SAMPLES - 1] = src->samples[src_idx];
906  push_heap(dst->samples, dst->samples + NUM_SAMPLES, SampleKeyGreater<T>);
907  }
908  ++src_idx;
909  }
910  dst->source_size += src->source_size;
911 }
912 
913 template <typename T>
914 void PrintSample(const ReservoirSample<T>& v, ostream* os) { *os << v.val.val; }
915 
916 template <>
917 void PrintSample(const ReservoirSample<TinyIntVal>& v, ostream* os) {
918  *os << static_cast<int32_t>(v.val.val);
919 }
920 
921 template <>
922 void PrintSample(const ReservoirSample<StringVal>& v, ostream* os) {
923  string s(reinterpret_cast<const char*>(&v.val[0]), v.len);
924  *os << s;
925 }
926 
927 template <>
928 void PrintSample(const ReservoirSample<DecimalVal>& v, ostream* os) {
929  *os << v.val.val16;
930 }
931 
932 template <>
933 void PrintSample(const ReservoirSample<TimestampVal>& v, ostream* os) {
935 }
936 
937 template <typename T>
939  const StringVal& src_val) {
940  DCHECK(!src_val.is_null);
941  DCHECK_EQ(src_val.len, sizeof(ReservoirSampleState<T>));
942  ReservoirSampleState<T>* src = reinterpret_cast<ReservoirSampleState<T>*>(src_val.ptr);
943 
944  stringstream out;
945  for (int i = 0; i < src->num_samples; ++i) {
946  PrintSample<T>(src->samples[i], &out);
947  if (i < (src->num_samples - 1)) out << ", ";
948  }
949  const string& out_str = out.str();
950  StringVal result_str(ctx, out_str.size());
951  memcpy(result_str.ptr, out_str.c_str(), result_str.len);
952  ctx->Free(src_val.ptr);
953  return result_str;
954 }
955 
956 template <typename T>
958  const StringVal& src_val) {
959  DCHECK(!src_val.is_null);
960  DCHECK_EQ(src_val.len, sizeof(ReservoirSampleState<T>));
961 
962  ReservoirSampleState<T>* src = reinterpret_cast<ReservoirSampleState<T>*>(src_val.ptr);
963  sort(src->samples, src->samples + src->num_samples, SampleValLess<T>);
964 
965  stringstream out;
966  int num_buckets = min(src->num_samples, NUM_BUCKETS);
967  int samples_per_bucket = max(src->num_samples / NUM_BUCKETS, 1);
968  for (int bucket_idx = 0; bucket_idx < num_buckets; ++bucket_idx) {
969  int sample_idx = (bucket_idx + 1) * samples_per_bucket - 1;
970  PrintSample<T>(src->samples[sample_idx], &out);
971  if (bucket_idx < (num_buckets - 1)) out << ", ";
972  }
973  const string& out_str = out.str();
974  StringVal result_str(ctx, out_str.size());
975  memcpy(result_str.ptr, out_str.c_str(), result_str.len);
976  ctx->Free(src_val.ptr);
977  return result_str;
978 }
979 
// Returns the middle element of the sorted reservoir samples, or T::null() when no
// samples were collected. The intermediate state in src_val is freed on both paths.
// NOTE(review): the line carrying the function name is absent from this listing;
// presumably AggregateFunctions::AppxMedianFinalize -- confirm against the header.
980 template <typename T>
982  const StringVal& src_val) {
983  DCHECK(!src_val.is_null);
984  DCHECK_EQ(src_val.len, sizeof(ReservoirSampleState<T>));
985 
986  ReservoirSampleState<T>* src = reinterpret_cast<ReservoirSampleState<T>*>(src_val.ptr);
987  if (src->num_samples == 0) {
988  ctx->Free(src_val.ptr);
989  return T::null();
990  }
991  sort(src->samples, src->samples + src->num_samples, SampleValLess<T>);
992 
// Integer division: for an even sample count this picks the upper of the two
// middle elements rather than averaging them.
993  T result = src->samples[src->num_samples / 2].GetValue(ctx);
994  ctx->Free(src_val.ptr);
995  return result;
996 }
997 
// Allocates an HLL_LEN-byte register array through ctx, zero-fills it and marks
// dst as non-NULL.
// NOTE(review): the signature line is absent from this listing; presumably
// AggregateFunctions::HllInit -- confirm.
// NOTE(review): the pointer returned by Allocate() is passed to memset without a
// NULL check -- confirm whether Allocate() can fail here.
999  int str_len = HLL_LEN;
1000  dst->is_null = false;
1001  dst->ptr = ctx->Allocate(str_len);
1002  dst->len = str_len;
1003  memset(dst->ptr, 0, str_len);
1004 }
1005 
// Folds one input value into the register array in dst. NULL inputs are ignored.
// NOTE(review): the signature line and the lines that compute hash_value are absent
// from this listing; presumably AggregateFunctions::HllUpdate -- confirm.
1006 template <typename T>
1008  if (src.is_null) return;
1009  DCHECK(!dst->is_null);
1010  DCHECK_EQ(dst->len, HLL_LEN);
1013  if (hash_value != 0) {
1014  // Use the lower bits to index into the number of streams and then
1015  // find the first 1 bit after the index bits.
// The mask only selects a valid register if HLL_LEN is a power of two --
// TODO confirm against the HLL_LEN definition.
1016  int idx = hash_value & (HLL_LEN - 1);
// NOTE(review): __builtin_ctzl is undefined for a zero argument, and the
// hash_value != 0 check above does not guarantee that
// (hash_value >> HLL_PRECISION) is non-zero -- confirm this case is handled.
1017  uint8_t first_one_bit = __builtin_ctzl(hash_value >> HLL_PRECISION) + 1;
// Each register keeps the largest first-one-bit position seen so far.
1018  dst->ptr[idx] = ::max(dst->ptr[idx], first_one_bit);
1019  }
1020 }
1021 
// Merges the register array in src into dst by taking the element-wise maximum.
// Both arrays must be non-NULL and exactly HLL_LEN bytes long.
// NOTE(review): the first signature line is absent from this listing; presumably
// AggregateFunctions::HllMerge -- confirm.
1023  StringVal* dst) {
1024  DCHECK(!dst->is_null);
1025  DCHECK(!src.is_null);
1026  DCHECK_EQ(dst->len, HLL_LEN);
1027  DCHECK_EQ(src.len, HLL_LEN);
1028  for (int i = 0; i < src.len; ++i) {
1029  dst->ptr[i] = ::max(dst->ptr[i], src.ptr[i]);
1030  }
1031 }
1032 
// Turns an array of HLL_LEN registers into a cardinality estimate.
// buckets: register array (must be non-NULL); num_buckets: must equal HLL_LEN.
// Returns the estimate as an integer. All intermediate arithmetic is done in
// single-precision float.
// NOTE(review): the first signature line is absent from this listing; the name
// HllFinalEstimate is taken from the call site in the finalize function below.
1034  int32_t num_buckets) {
1035  DCHECK_NOTNULL(buckets);
1036  DCHECK_EQ(num_buckets, HLL_LEN);
1037 
1038  // Empirical constants for the algorithm.
// Fixed values for 16/32/64 registers, closed-form approximation otherwise.
1039  float alpha = 0;
1040  if (HLL_LEN == 16) {
1041  alpha = 0.673f;
1042  } else if (HLL_LEN == 32) {
1043  alpha = 0.697f;
1044  } else if (HLL_LEN == 64) {
1045  alpha = 0.709f;
1046  } else {
1047  alpha = 0.7213f / (1 + 1.079f / HLL_LEN);
1048  }
1049 
// Accumulates sum(2^-register) and counts empty registers in a single pass; the
// sum is inverted after the loop.
1050  float harmonic_mean = 0;
1051  int num_zero_registers = 0;
1052  // TODO: Consider improving this loop (e.g. replacing 'if' with arithmetic op).
1053  for (int i = 0; i < num_buckets; ++i) {
1054  harmonic_mean += powf(2.0f, -buckets[i]);
1055  if (buckets[i] == 0) ++num_zero_registers;
1056  }
1057  harmonic_mean = 1.0f / harmonic_mean;
1058  int64_t estimate = alpha * HLL_LEN * HLL_LEN * harmonic_mean;
1059 
// NOTE(review): linear counting replaces the raw estimate whenever at least one
// register is zero, independent of the estimate's magnitude; the HyperLogLog paper
// additionally gates this on the raw estimate being small -- confirm this is intended.
1060  if (num_zero_registers != 0) {
1061  // Estimated cardinality is too low. Hll is too inaccurate here, instead use
1062  // linear counting.
1063  estimate = HLL_LEN * log(static_cast<float>(HLL_LEN) / num_zero_registers);
1064  }
1065 
1066  return estimate;
1067 }
1068 
// Computes the final estimate from the register array in src, frees the array and
// returns the estimate.
// NOTE(review): the signature line is absent from this listing; presumably
// AggregateFunctions::HllFinalize -- confirm.
1070  DCHECK(!src.is_null);
1071  uint64_t estimate = HllFinalEstimate(src.ptr, src.len);
1072  ctx->Free(src.ptr);
1073  return estimate;
1074 }
1075 
1076 // An implementation of a simple single pass variance algorithm. A standard UDA must
1077 // be single pass (i.e. does not scan the table more than once), so the most canonical
1078 // two pass approach is not practical.
// Intermediate state shared by the variance/stddev functions below.
// NOTE(review): the struct header line is absent from this listing; the type is
// referred to as KnuthVarianceState by the functions that use it.
// Running mean of the values seen so far.
1080  double mean;
// Accumulated squared deviation; the finalize helpers divide it by count (or
// count - 1) to obtain the variance.
1081  double m2;
// Number of non-NULL values folded into the state.
1082  int64_t count;
1083 };
1084 
1085 // Set pop=true for population variance, false for sample variance
// Returns m2 / count (population) or m2 / (count - 1) (sample).
// A state with count == 0 is not handled here; the finalize functions in this file
// return NULL for that case before calling this helper.
1086 double ComputeKnuthVariance(const KnuthVarianceState& state, bool pop) {
1087  // Return zero for 1 tuple specified by
1088  // http://docs.oracle.com/cd/B19306_01/server.102/b14200/functions212.htm
// This early return also keeps the sample-variance branch from dividing by zero.
1089  if (state.count == 1) return 0.0;
1090  if (pop) return state.m2 / state.count;
1091  return state.m2 / (state.count - 1);
1092 }
1093 
// Marks dst as non-NULL and zero-fills its buffer. The buffer is expected to already
// be sizeof(KnuthVarianceState) bytes; nothing is allocated in the lines shown.
// NOTE(review): the signature line is absent from this listing; presumably
// AggregateFunctions::KnuthVarInit -- confirm.
1095  dst->is_null = false;
1096  DCHECK_EQ(dst->len, sizeof(KnuthVarianceState));
1097  memset(dst->ptr, 0, dst->len);
1098 }
1099 
// Folds one input value into the running mean / m2 / count state in dst.
// NULL inputs leave the state untouched.
// NOTE(review): the first signature line is absent from this listing; presumably
// AggregateFunctions::KnuthVarUpdate -- confirm.
1100 template <typename T>
1102  StringVal* dst) {
1103  DCHECK(!dst->is_null);
1104  DCHECK_EQ(dst->len, sizeof(KnuthVarianceState));
1105  if (src.is_null) return;
1106  KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(dst->ptr);
// temp is the new count, held as a double so the division below is floating point.
1107  double temp = 1 + state->count;
1108  double delta = src.val - state->mean;
1109  double r = delta / temp;
1110  state->mean += r;
// Uses the old count (state->count has not been updated yet).
1111  state->m2 += state->count * delta * r;
// Implicit double -> int64_t conversion back into the counter.
1112  state->count = temp;
1113 }
1114 
// Combines the partial state in src into dst (mean, m2 and count). An empty src
// (count == 0) leaves dst unchanged.
// NOTE(review): the first signature line is absent from this listing; presumably
// AggregateFunctions::KnuthVarMerge -- confirm.
1116  StringVal* dst) {
1117  DCHECK(!dst->is_null);
1118  DCHECK_EQ(dst->len, sizeof(KnuthVarianceState));
1119  DCHECK(!src.is_null);
1120  DCHECK_EQ(src.len, sizeof(KnuthVarianceState));
1121  // Reference implementation:
1122  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
1123  KnuthVarianceState* src_state = reinterpret_cast<KnuthVarianceState*>(src.ptr);
1124  KnuthVarianceState* dst_state = reinterpret_cast<KnuthVarianceState*>(dst->ptr);
1125  if (src_state->count == 0) return;
1126  double delta = dst_state->mean - src_state->mean;
// sum_count is a double so that the count ratios below use floating-point division.
1127  double sum_count = dst_state->count + src_state->count;
// mean and m2 must be updated before dst_state->count, since both formulas read the
// pre-merge count of dst.
1128  dst_state->mean = src_state->mean + delta * (dst_state->count / sum_count);
1129  dst_state->m2 = (src_state->m2) + dst_state->m2 +
1130  (delta * delta) * (src_state->count * dst_state->count / sum_count);
1131  dst_state->count = sum_count;
1132 }
1133 
// Returns the sample variance (pop == false) of the accumulated state, or NULL when
// no values were accumulated.
// NOTE(review): the first signature line is absent from this listing; presumably
// AggregateFunctions::KnuthVarFinalize -- confirm.
// NOTE(review): unlike the sibling finalize functions below, this one has no DCHECKs
// on state_sv.is_null / state_sv.len -- confirm whether that is intentional.
1135  FunctionContext* ctx, const StringVal& state_sv) {
1136  KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(state_sv.ptr);
1137  if (state->count == 0) return DoubleVal::null();
1138  double variance = ComputeKnuthVariance(*state, false);
1139  return DoubleVal(variance);
1140 }
1141 
// Returns the population variance (pop == true) of the accumulated state, or NULL
// when no values were accumulated.
// NOTE(review): the first signature line is absent from this listing; presumably
// AggregateFunctions::KnuthVarPopFinalize -- confirm.
1143  const StringVal& state_sv) {
1144  DCHECK(!state_sv.is_null);
1145  DCHECK_EQ(state_sv.len, sizeof(KnuthVarianceState));
1146  KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(state_sv.ptr);
1147  if (state->count == 0) return DoubleVal::null();
1148  return ComputeKnuthVariance(*state, true);
1149 }
1150 
// Returns the square root of the sample variance (pop == false), or NULL when no
// values were accumulated.
// NOTE(review): the first signature line is absent from this listing; presumably
// AggregateFunctions::KnuthStddevFinalize -- confirm.
1152  const StringVal& state_sv) {
1153  DCHECK(!state_sv.is_null);
1154  DCHECK_EQ(state_sv.len, sizeof(KnuthVarianceState));
1155  KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(state_sv.ptr);
1156  if (state->count == 0) return DoubleVal::null();
1157  return sqrt(ComputeKnuthVariance(*state, false));
1158 }
1159 
// Returns the square root of the population variance (pop == true), or NULL when no
// values were accumulated.
// NOTE(review): the first signature line is absent from this listing; presumably
// AggregateFunctions::KnuthStddevPopFinalize -- confirm.
1161  const StringVal& state_sv) {
1162  DCHECK(!state_sv.is_null);
1163  DCHECK_EQ(state_sv.len, sizeof(KnuthVarianceState));
1164  KnuthVarianceState* state = reinterpret_cast<KnuthVarianceState*>(state_sv.ptr);
1165  if (state->count == 0) return DoubleVal::null();
1166  return sqrt(ComputeKnuthVariance(*state, true));
1167 }
1168 
// Intermediate state for the rank functions below.
1169 struct RankState {
// Rank that the next GetValue call will report; starts at 1.
1170  int64_t rank;
// Number of Update calls since the counter was last reset to 0.
1171  int64_t count;
1172  RankState() : rank(1), count(0) { }
1173 };
1174 
// Allocates a RankState-sized buffer through ctx and initializes it to the default
// state (rank 1, count 0).
// NOTE(review): the signature line is absent from this listing; presumably
// AggregateFunctions::RankInit -- confirm.
1176  int str_len = sizeof(RankState);
1177  dst->is_null = false;
1178  dst->ptr = ctx->Allocate(str_len);
1179  dst->len = str_len;
// Copy-assigns a default-constructed RankState into the raw buffer.
1180  *reinterpret_cast<RankState*>(dst->ptr) = RankState();
1181 }
1182 
// Counts one more row in the RankState stored in dst; the rank itself is not
// changed here.
// NOTE(review): the signature line is absent from this listing; presumably
// AggregateFunctions::RankUpdate -- confirm.
1184  DCHECK(!dst->is_null);
1185  DCHECK_EQ(dst->len, sizeof(RankState));
1186  RankState* state = reinterpret_cast<RankState*>(dst->ptr);
1187  ++state->count;
1188 }
1189 
1191 
// Returns the current rank and then mutates the state so the next call reports
// rank + count; the counter is reset to 0. The state is not freed here.
// NOTE(review): the first signature line is absent from this listing; presumably
// AggregateFunctions::RankGetValue -- confirm.
1193  StringVal& src_val) {
1194  DCHECK(!src_val.is_null);
1195  DCHECK_EQ(src_val.len, sizeof(RankState));
1196  RankState* state = reinterpret_cast<RankState*>(src_val.ptr);
// At least one update is expected to have happened since the previous call.
1197  DCHECK_GT(state->count, 0);
1198  DCHECK_GT(state->rank, 0);
1199  int64_t result = state->rank;
1200 
1201  // Prepares future calls for the next rank
1202  state->rank += state->count;
1203  state->count = 0;
1204  return BigIntVal(result);
1205 }
1206 
// Returns the current rank and then advances it by exactly one. The counter is
// expected to stay at 0 for this variant. The state is not freed here.
// NOTE(review): the first signature line is absent from this listing; presumably
// AggregateFunctions::DenseRankGetValue -- confirm.
1208  StringVal& src_val) {
1209  DCHECK(!src_val.is_null);
1210  DCHECK_EQ(src_val.len, sizeof(RankState));
1211  RankState* state = reinterpret_cast<RankState*>(src_val.ptr);
1212  DCHECK_EQ(state->count, 0);
1213  DCHECK_GT(state->rank, 0);
1214  int64_t result = state->rank;
1215 
1216  // Prepares future calls for the next rank
1217  ++state->rank;
1218  return BigIntVal(result);
1219 }
1220 
// Returns the current rank and frees the RankState buffer.
// NOTE(review): the first signature line is absent from this listing; presumably
// AggregateFunctions::RankFinalize -- confirm.
1222  StringVal& src_val) {
1223  DCHECK(!src_val.is_null);
1224  DCHECK_EQ(src_val.len, sizeof(RankState));
1225  RankState* state = reinterpret_cast<RankState*>(src_val.ptr);
// Read the value before the buffer is released.
1226  int64_t result = state->rank;
1227  ctx->Free(src_val.ptr);
1228  return BigIntVal(result);
1229 }
1230 
// Generic update: overwrites *dst with the most recent src (plain value copy).
// ctx is unused in this version.
1231 template <typename T>
1232 void AggregateFunctions::LastValUpdate(FunctionContext* ctx, const T& src, T* dst) {
1233  *dst = src;
1234 }
1235 
// Specialization that keeps its own copy of the string bytes in dst instead of
// aliasing src.ptr.
// NOTE(review): the first signature line is absent from this listing; presumably the
// StringVal specialization of AggregateFunctions::LastValUpdate -- confirm.
1236 template <>
1238  StringVal* dst) {
// A NULL input releases any previously held buffer and resets dst to NULL.
1239  if (src.is_null) {
1240  if (!dst->is_null) ctx->Free(dst->ptr);
1241  *dst = StringVal::null();
1242  return;
1243  }
1244 
// Allocate a fresh buffer on first use, otherwise resize the existing one.
1245  if (dst->is_null) {
1246  dst->ptr = ctx->Allocate(src.len);
1247  dst->is_null = false;
1248  } else {
1249  dst->ptr = ctx->Reallocate(dst->ptr, src.len);
1250  }
1251  memcpy(dst->ptr, src.ptr, src.len);
1252  dst->len = src.len;
1253 }
1254 
// Generic remove: resets *dst to NULL once the number of removes has reached the
// number of updates; otherwise leaves *dst unchanged. src is not read.
1255 template <typename T>
1256 void AggregateFunctions::LastValRemove(FunctionContext* ctx, const T& src, T* dst) {
1257  if (ctx->impl()->num_removes() >= ctx->impl()->num_updates()) *dst = T::null();
1258 }
1259 
// Specialization that also frees the buffer held by dst before resetting it to NULL
// once the number of removes has reached the number of updates.
// NOTE(review): the first signature line is absent from this listing; presumably the
// StringVal specialization of AggregateFunctions::LastValRemove -- confirm.
1260 template <>
1262  StringVal* dst) {
1263  if (ctx->impl()->num_removes() >= ctx->impl()->num_updates()) {
1264  if (!dst->is_null) ctx->Free(dst->ptr);
1265  *dst = StringVal::null();
1266  }
1267 }
1268 
// Generic update: stores src in *dst on the first update only; every later call
// returns without touching *dst.
1269 template <typename T>
1270 void AggregateFunctions::FirstValUpdate(FunctionContext* ctx, const T& src, T* dst) {
1271  // The first call to FirstValUpdate sets the value of dst.
1272  if (ctx->impl()->num_updates() > 1) return;
1273  // num_updates is incremented before calling Update(), so it should never be 0.
1274  // Remove() should never be called for FIRST_VALUE.
1275  DCHECK_GT(ctx->impl()->num_updates(), 0);
1276  DCHECK_EQ(ctx->impl()->num_removes(), 0);
1277  *dst = src;
1278 }
1279 
// Specialization that copies the first string into a buffer allocated through ctx;
// a NULL first value is stored as NULL. Later calls return without touching dst.
// NOTE(review): the first signature line is absent from this listing; presumably the
// StringVal specialization of AggregateFunctions::FirstValUpdate -- confirm.
1280 template <>
1282  StringVal* dst) {
1283  if (ctx->impl()->num_updates() > 1) return;
1284  DCHECK_GT(ctx->impl()->num_updates(), 0);
1285  DCHECK_EQ(ctx->impl()->num_removes(), 0);
1286  if (src.is_null) {
1287  *dst = StringVal::null();
1288  return;
1289  }
// Own copy of the bytes, so dst does not alias src.ptr.
1290  *dst = StringVal(ctx->Allocate(src.len), src.len);
1291  memcpy(dst->ptr, src.ptr, src.len);
1292 }
1293 
// Forwards to LastValUpdate<T>, so *dst always holds the most recent src. The
// BigIntVal argument is unnamed and ignored.
// NOTE(review): the first signature line is absent from this listing; the name
// FirstValRewriteUpdate matches the explicit instantiations with this parameter list.
1294 template <typename T>
1296  const BigIntVal&, T* dst) {
1297  LastValUpdate<T>(ctx, src, dst);
1298 }
1299 
// Initializes *dst from the function's third argument (index 2), which must be a
// constant of the same type as the first argument.
// NOTE(review): the signature line is absent from this listing; the name OffsetFnInit
// matches the explicit instantiations taking (FunctionContext*, T*).
1300 template <typename T>
1302  DCHECK_EQ(ctx->GetNumArgs(), 3);
1303  DCHECK(ctx->IsArgConstant(1));
1304  DCHECK(ctx->IsArgConstant(2));
1305  DCHECK_EQ(ctx->GetArgType(0)->type, ctx->GetArgType(2)->type);
// GetConstantArg() hands back an untyped pointer; the type check above is what
// justifies the cast to T*.
1306  *dst = *static_cast<T*>(ctx->GetConstantArg(2));
1307 }
1308 
// Overwrites *dst with src. The BigIntVal argument and default_value are not read
// here.
// NOTE(review): the first signature line is absent from this listing; the name
// OffsetFnUpdate matches the explicit instantiations with this parameter list.
1309 template <typename T>
1311  const BigIntVal&, const T& default_value, T* dst) {
1312  *dst = src;
1313 }
1314 
1315 // Stamp out the templates for the types we need.
1316 template void AggregateFunctions::InitZero<BigIntVal>(FunctionContext*, BigIntVal* dst);
1317 
1318 template void AggregateFunctions::AvgUpdate<BigIntVal>(
1319  FunctionContext* ctx, const BigIntVal& input, StringVal* dst);
1320 template void AggregateFunctions::AvgUpdate<DoubleVal>(
1321  FunctionContext* ctx, const DoubleVal& input, StringVal* dst);
1322 template void AggregateFunctions::AvgRemove<BigIntVal>(
1323  FunctionContext* ctx, const BigIntVal& input, StringVal* dst);
1324 template void AggregateFunctions::AvgRemove<DoubleVal>(
1325  FunctionContext* ctx, const DoubleVal& input, StringVal* dst);
1326 
1327 template void AggregateFunctions::SumUpdate<TinyIntVal, BigIntVal>(
1328  FunctionContext*, const TinyIntVal& src, BigIntVal* dst);
1329 template void AggregateFunctions::SumUpdate<SmallIntVal, BigIntVal>(
1330  FunctionContext*, const SmallIntVal& src, BigIntVal* dst);
1331 template void AggregateFunctions::SumUpdate<IntVal, BigIntVal>(
1332  FunctionContext*, const IntVal& src, BigIntVal* dst);
1333 template void AggregateFunctions::SumUpdate<BigIntVal, BigIntVal>(
1334  FunctionContext*, const BigIntVal& src, BigIntVal* dst);
1335 template void AggregateFunctions::SumUpdate<FloatVal, DoubleVal>(
1336  FunctionContext*, const FloatVal& src, DoubleVal* dst);
1337 template void AggregateFunctions::SumUpdate<DoubleVal, DoubleVal>(
1338  FunctionContext*, const DoubleVal& src, DoubleVal* dst);
1339 
1340 template void AggregateFunctions::SumRemove<TinyIntVal, BigIntVal>(
1341  FunctionContext*, const TinyIntVal& src, BigIntVal* dst);
1342 template void AggregateFunctions::SumRemove<SmallIntVal, BigIntVal>(
1343  FunctionContext*, const SmallIntVal& src, BigIntVal* dst);
1344 template void AggregateFunctions::SumRemove<IntVal, BigIntVal>(
1345  FunctionContext*, const IntVal& src, BigIntVal* dst);
1346 template void AggregateFunctions::SumRemove<BigIntVal, BigIntVal>(
1347  FunctionContext*, const BigIntVal& src, BigIntVal* dst);
1348 template void AggregateFunctions::SumRemove<FloatVal, DoubleVal>(
1349  FunctionContext*, const FloatVal& src, DoubleVal* dst);
1350 template void AggregateFunctions::SumRemove<DoubleVal, DoubleVal>(
1351  FunctionContext*, const DoubleVal& src, DoubleVal* dst);
1352 
1353 template void AggregateFunctions::Min<BooleanVal>(
1354  FunctionContext*, const BooleanVal& src, BooleanVal* dst);
1355 template void AggregateFunctions::Min<TinyIntVal>(
1356  FunctionContext*, const TinyIntVal& src, TinyIntVal* dst);
1357 template void AggregateFunctions::Min<SmallIntVal>(
1358  FunctionContext*, const SmallIntVal& src, SmallIntVal* dst);
1359 template void AggregateFunctions::Min<IntVal>(
1360  FunctionContext*, const IntVal& src, IntVal* dst);
1361 template void AggregateFunctions::Min<BigIntVal>(
1362  FunctionContext*, const BigIntVal& src, BigIntVal* dst);
1363 template void AggregateFunctions::Min<FloatVal>(
1364  FunctionContext*, const FloatVal& src, FloatVal* dst);
1365 template void AggregateFunctions::Min<DoubleVal>(
1366  FunctionContext*, const DoubleVal& src, DoubleVal* dst);
1367 template void AggregateFunctions::Min<StringVal>(
1368  FunctionContext*, const StringVal& src, StringVal* dst);
1369 template void AggregateFunctions::Min<DecimalVal>(
1370  FunctionContext*, const DecimalVal& src, DecimalVal* dst);
1371 
1372 template void AggregateFunctions::Max<BooleanVal>(
1373  FunctionContext*, const BooleanVal& src, BooleanVal* dst);
1374 template void AggregateFunctions::Max<TinyIntVal>(
1375  FunctionContext*, const TinyIntVal& src, TinyIntVal* dst);
1376 template void AggregateFunctions::Max<SmallIntVal>(
1377  FunctionContext*, const SmallIntVal& src, SmallIntVal* dst);
1378 template void AggregateFunctions::Max<IntVal>(
1379  FunctionContext*, const IntVal& src, IntVal* dst);
1380 template void AggregateFunctions::Max<BigIntVal>(
1381  FunctionContext*, const BigIntVal& src, BigIntVal* dst);
1382 template void AggregateFunctions::Max<FloatVal>(
1383  FunctionContext*, const FloatVal& src, FloatVal* dst);
1384 template void AggregateFunctions::Max<DoubleVal>(
1385  FunctionContext*, const DoubleVal& src, DoubleVal* dst);
1386 template void AggregateFunctions::Max<StringVal>(
1387  FunctionContext*, const StringVal& src, StringVal* dst);
1388 template void AggregateFunctions::Max<DecimalVal>(
1389  FunctionContext*, const DecimalVal& src, DecimalVal* dst);
1390 
1391 template void AggregateFunctions::PcUpdate(
1392  FunctionContext*, const BooleanVal&, StringVal*);
1393 template void AggregateFunctions::PcUpdate(
1394  FunctionContext*, const TinyIntVal&, StringVal*);
1395 template void AggregateFunctions::PcUpdate(
1397 template void AggregateFunctions::PcUpdate(
1398  FunctionContext*, const IntVal&, StringVal*);
1399 template void AggregateFunctions::PcUpdate(
1400  FunctionContext*, const BigIntVal&, StringVal*);
1401 template void AggregateFunctions::PcUpdate(
1402  FunctionContext*, const FloatVal&, StringVal*);
1403 template void AggregateFunctions::PcUpdate(
1404  FunctionContext*, const DoubleVal&, StringVal*);
1405 template void AggregateFunctions::PcUpdate(
1406  FunctionContext*, const StringVal&, StringVal*);
1407 template void AggregateFunctions::PcUpdate(
1409 template void AggregateFunctions::PcUpdate(
1410  FunctionContext*, const DecimalVal&, StringVal*);
1411 
1412 template void AggregateFunctions::PcsaUpdate(
1413  FunctionContext*, const BooleanVal&, StringVal*);
1414 template void AggregateFunctions::PcsaUpdate(
1415  FunctionContext*, const TinyIntVal&, StringVal*);
1416 template void AggregateFunctions::PcsaUpdate(
1418 template void AggregateFunctions::PcsaUpdate(
1419  FunctionContext*, const IntVal&, StringVal*);
1420 template void AggregateFunctions::PcsaUpdate(
1421  FunctionContext*, const BigIntVal&, StringVal*);
1422 template void AggregateFunctions::PcsaUpdate(
1423  FunctionContext*, const FloatVal&, StringVal*);
1424 template void AggregateFunctions::PcsaUpdate(
1425  FunctionContext*, const DoubleVal&, StringVal*);
1426 template void AggregateFunctions::PcsaUpdate(
1427  FunctionContext*, const StringVal&, StringVal*);
1428 template void AggregateFunctions::PcsaUpdate(
1430 template void AggregateFunctions::PcsaUpdate(
1431  FunctionContext*, const DecimalVal&, StringVal*);
1432 
1433 template void AggregateFunctions::ReservoirSampleInit<BooleanVal>(
1435 template void AggregateFunctions::ReservoirSampleInit<TinyIntVal>(
1437 template void AggregateFunctions::ReservoirSampleInit<SmallIntVal>(
1439 template void AggregateFunctions::ReservoirSampleInit<IntVal>(
1441 template void AggregateFunctions::ReservoirSampleInit<BigIntVal>(
1443 template void AggregateFunctions::ReservoirSampleInit<FloatVal>(
1445 template void AggregateFunctions::ReservoirSampleInit<DoubleVal>(
1447 template void AggregateFunctions::ReservoirSampleInit<StringVal>(
1449 template void AggregateFunctions::ReservoirSampleInit<TimestampVal>(
1451 template void AggregateFunctions::ReservoirSampleInit<DecimalVal>(
1453 
1455  FunctionContext*, const BooleanVal&, StringVal*);
1457  FunctionContext*, const TinyIntVal&, StringVal*);
1461  FunctionContext*, const IntVal&, StringVal*);
1463  FunctionContext*, const BigIntVal&, StringVal*);
1465  FunctionContext*, const FloatVal&, StringVal*);
1467  FunctionContext*, const DoubleVal&, StringVal*);
1469  FunctionContext*, const StringVal&, StringVal*);
1473  FunctionContext*, const DecimalVal&, StringVal*);
1474 
1475 template const StringVal AggregateFunctions::ReservoirSampleSerialize<BooleanVal>(
1476  FunctionContext*, const StringVal&);
1477 template const StringVal AggregateFunctions::ReservoirSampleSerialize<TinyIntVal>(
1478  FunctionContext*, const StringVal&);
1479 template const StringVal AggregateFunctions::ReservoirSampleSerialize<SmallIntVal>(
1480  FunctionContext*, const StringVal&);
1481 template const StringVal AggregateFunctions::ReservoirSampleSerialize<IntVal>(
1482  FunctionContext*, const StringVal&);
1483 template const StringVal AggregateFunctions::ReservoirSampleSerialize<BigIntVal>(
1484  FunctionContext*, const StringVal&);
1485 template const StringVal AggregateFunctions::ReservoirSampleSerialize<FloatVal>(
1486  FunctionContext*, const StringVal&);
1487 template const StringVal AggregateFunctions::ReservoirSampleSerialize<DoubleVal>(
1488  FunctionContext*, const StringVal&);
1489 template const StringVal AggregateFunctions::ReservoirSampleSerialize<StringVal>(
1490  FunctionContext*, const StringVal&);
1491 template const StringVal AggregateFunctions::ReservoirSampleSerialize<TimestampVal>(
1492  FunctionContext*, const StringVal&);
1493 template const StringVal AggregateFunctions::ReservoirSampleSerialize<DecimalVal>(
1494  FunctionContext*, const StringVal&);
1495 
1496 template void AggregateFunctions::ReservoirSampleMerge<BooleanVal>(
1497  FunctionContext*, const StringVal&, StringVal*);
1498 template void AggregateFunctions::ReservoirSampleMerge<TinyIntVal>(
1499  FunctionContext*, const StringVal&, StringVal*);
1500 template void AggregateFunctions::ReservoirSampleMerge<SmallIntVal>(
1501  FunctionContext*, const StringVal&, StringVal*);
1502 template void AggregateFunctions::ReservoirSampleMerge<IntVal>(
1503  FunctionContext*, const StringVal&, StringVal*);
1504 template void AggregateFunctions::ReservoirSampleMerge<BigIntVal>(
1505  FunctionContext*, const StringVal&, StringVal*);
1506 template void AggregateFunctions::ReservoirSampleMerge<FloatVal>(
1507  FunctionContext*, const StringVal&, StringVal*);
1508 template void AggregateFunctions::ReservoirSampleMerge<DoubleVal>(
1509  FunctionContext*, const StringVal&, StringVal*);
1510 template void AggregateFunctions::ReservoirSampleMerge<StringVal>(
1511  FunctionContext*, const StringVal&, StringVal*);
1512 template void AggregateFunctions::ReservoirSampleMerge<TimestampVal>(
1513  FunctionContext*, const StringVal&, StringVal*);
1514 template void AggregateFunctions::ReservoirSampleMerge<DecimalVal>(
1515  FunctionContext*, const StringVal&, StringVal*);
1516 
1517 template StringVal AggregateFunctions::ReservoirSampleFinalize<BooleanVal>(
1518  FunctionContext*, const StringVal&);
1519 template StringVal AggregateFunctions::ReservoirSampleFinalize<TinyIntVal>(
1520  FunctionContext*, const StringVal&);
1521 template StringVal AggregateFunctions::ReservoirSampleFinalize<SmallIntVal>(
1522  FunctionContext*, const StringVal&);
1523 template StringVal AggregateFunctions::ReservoirSampleFinalize<IntVal>(
1524  FunctionContext*, const StringVal&);
1525 template StringVal AggregateFunctions::ReservoirSampleFinalize<BigIntVal>(
1526  FunctionContext*, const StringVal&);
1527 template StringVal AggregateFunctions::ReservoirSampleFinalize<FloatVal>(
1528  FunctionContext*, const StringVal&);
1529 template StringVal AggregateFunctions::ReservoirSampleFinalize<DoubleVal>(
1530  FunctionContext*, const StringVal&);
1531 template StringVal AggregateFunctions::ReservoirSampleFinalize<StringVal>(
1532  FunctionContext*, const StringVal&);
1533 template StringVal AggregateFunctions::ReservoirSampleFinalize<TimestampVal>(
1534  FunctionContext*, const StringVal&);
1535 template StringVal AggregateFunctions::ReservoirSampleFinalize<DecimalVal>(
1536  FunctionContext*, const StringVal&);
1537 
1538 template StringVal AggregateFunctions::HistogramFinalize<BooleanVal>(
1539  FunctionContext*, const StringVal&);
1540 template StringVal AggregateFunctions::HistogramFinalize<TinyIntVal>(
1541  FunctionContext*, const StringVal&);
1542 template StringVal AggregateFunctions::HistogramFinalize<SmallIntVal>(
1543  FunctionContext*, const StringVal&);
1544 template StringVal AggregateFunctions::HistogramFinalize<IntVal>(
1545  FunctionContext*, const StringVal&);
1546 template StringVal AggregateFunctions::HistogramFinalize<BigIntVal>(
1547  FunctionContext*, const StringVal&);
1548 template StringVal AggregateFunctions::HistogramFinalize<FloatVal>(
1549  FunctionContext*, const StringVal&);
1550 template StringVal AggregateFunctions::HistogramFinalize<DoubleVal>(
1551  FunctionContext*, const StringVal&);
1552 template StringVal AggregateFunctions::HistogramFinalize<StringVal>(
1553  FunctionContext*, const StringVal&);
1554 template StringVal AggregateFunctions::HistogramFinalize<TimestampVal>(
1555  FunctionContext*, const StringVal&);
1556 template StringVal AggregateFunctions::HistogramFinalize<DecimalVal>(
1557  FunctionContext*, const StringVal&);
1558 
1559 template BooleanVal AggregateFunctions::AppxMedianFinalize<BooleanVal>(
1560  FunctionContext*, const StringVal&);
1561 template TinyIntVal AggregateFunctions::AppxMedianFinalize<TinyIntVal>(
1562  FunctionContext*, const StringVal&);
1563 template SmallIntVal AggregateFunctions::AppxMedianFinalize<SmallIntVal>(
1564  FunctionContext*, const StringVal&);
1565 template IntVal AggregateFunctions::AppxMedianFinalize<IntVal>(
1566  FunctionContext*, const StringVal&);
1567 template BigIntVal AggregateFunctions::AppxMedianFinalize<BigIntVal>(
1568  FunctionContext*, const StringVal&);
1569 template FloatVal AggregateFunctions::AppxMedianFinalize<FloatVal>(
1570  FunctionContext*, const StringVal&);
1571 template DoubleVal AggregateFunctions::AppxMedianFinalize<DoubleVal>(
1572  FunctionContext*, const StringVal&);
1573 template StringVal AggregateFunctions::AppxMedianFinalize<StringVal>(
1574  FunctionContext*, const StringVal&);
1575 template TimestampVal AggregateFunctions::AppxMedianFinalize<TimestampVal>(
1576  FunctionContext*, const StringVal&);
1577 template DecimalVal AggregateFunctions::AppxMedianFinalize<DecimalVal>(
1578  FunctionContext*, const StringVal&);
1579 
1580 template void AggregateFunctions::HllUpdate(
1581  FunctionContext*, const BooleanVal&, StringVal*);
1582 template void AggregateFunctions::HllUpdate(
1583  FunctionContext*, const TinyIntVal&, StringVal*);
1584 template void AggregateFunctions::HllUpdate(
1586 template void AggregateFunctions::HllUpdate(
1587  FunctionContext*, const IntVal&, StringVal*);
1588 template void AggregateFunctions::HllUpdate(
1589  FunctionContext*, const BigIntVal&, StringVal*);
1590 template void AggregateFunctions::HllUpdate(
1591  FunctionContext*, const FloatVal&, StringVal*);
1592 template void AggregateFunctions::HllUpdate(
1593  FunctionContext*, const DoubleVal&, StringVal*);
1594 template void AggregateFunctions::HllUpdate(
1595  FunctionContext*, const StringVal&, StringVal*);
1596 template void AggregateFunctions::HllUpdate(
1598 template void AggregateFunctions::HllUpdate(
1599  FunctionContext*, const DecimalVal&, StringVal*);
1600 
1602  FunctionContext*, const TinyIntVal&, StringVal*);
1606  FunctionContext*, const IntVal&, StringVal*);
1608  FunctionContext*, const BigIntVal&, StringVal*);
1610  FunctionContext*, const FloatVal&, StringVal*);
1612  FunctionContext*, const DoubleVal&, StringVal*);
1613 
1614 template void AggregateFunctions::LastValUpdate<BooleanVal>(
1615  FunctionContext*, const BooleanVal& src, BooleanVal* dst);
1616 template void AggregateFunctions::LastValUpdate<TinyIntVal>(
1617  FunctionContext*, const TinyIntVal& src, TinyIntVal* dst);
1618 template void AggregateFunctions::LastValUpdate<SmallIntVal>(
1619  FunctionContext*, const SmallIntVal& src, SmallIntVal* dst);
1620 template void AggregateFunctions::LastValUpdate<IntVal>(
1621  FunctionContext*, const IntVal& src, IntVal* dst);
1622 template void AggregateFunctions::LastValUpdate<BigIntVal>(
1623  FunctionContext*, const BigIntVal& src, BigIntVal* dst);
1624 template void AggregateFunctions::LastValUpdate<FloatVal>(
1625  FunctionContext*, const FloatVal& src, FloatVal* dst);
1626 template void AggregateFunctions::LastValUpdate<DoubleVal>(
1627  FunctionContext*, const DoubleVal& src, DoubleVal* dst);
1628 template void AggregateFunctions::LastValUpdate<StringVal>(
1629  FunctionContext*, const StringVal& src, StringVal* dst);
1630 template void AggregateFunctions::LastValUpdate<TimestampVal>(
1631  FunctionContext*, const TimestampVal& src, TimestampVal* dst);
1632 template void AggregateFunctions::LastValUpdate<DecimalVal>(
1633  FunctionContext*, const DecimalVal& src, DecimalVal* dst);
1634 
1635 template void AggregateFunctions::LastValRemove<BooleanVal>(
1636  FunctionContext*, const BooleanVal& src, BooleanVal* dst);
1637 template void AggregateFunctions::LastValRemove<TinyIntVal>(
1638  FunctionContext*, const TinyIntVal& src, TinyIntVal* dst);
1639 template void AggregateFunctions::LastValRemove<SmallIntVal>(
1640  FunctionContext*, const SmallIntVal& src, SmallIntVal* dst);
1641 template void AggregateFunctions::LastValRemove<IntVal>(
1642  FunctionContext*, const IntVal& src, IntVal* dst);
1643 template void AggregateFunctions::LastValRemove<BigIntVal>(
1644  FunctionContext*, const BigIntVal& src, BigIntVal* dst);
1645 template void AggregateFunctions::LastValRemove<FloatVal>(
1646  FunctionContext*, const FloatVal& src, FloatVal* dst);
1647 template void AggregateFunctions::LastValRemove<DoubleVal>(
1648  FunctionContext*, const DoubleVal& src, DoubleVal* dst);
1649 template void AggregateFunctions::LastValRemove<StringVal>(
1650  FunctionContext*, const StringVal& src, StringVal* dst);
1651 template void AggregateFunctions::LastValRemove<TimestampVal>(
1652  FunctionContext*, const TimestampVal& src, TimestampVal* dst);
1653 template void AggregateFunctions::LastValRemove<DecimalVal>(
1654  FunctionContext*, const DecimalVal& src, DecimalVal* dst);
1655 
1656 template void AggregateFunctions::FirstValUpdate<BooleanVal>(
1657  FunctionContext*, const BooleanVal& src, BooleanVal* dst);
1658 template void AggregateFunctions::FirstValUpdate<TinyIntVal>(
1659  FunctionContext*, const TinyIntVal& src, TinyIntVal* dst);
1660 template void AggregateFunctions::FirstValUpdate<SmallIntVal>(
1661  FunctionContext*, const SmallIntVal& src, SmallIntVal* dst);
1662 template void AggregateFunctions::FirstValUpdate<IntVal>(
1663  FunctionContext*, const IntVal& src, IntVal* dst);
1664 template void AggregateFunctions::FirstValUpdate<BigIntVal>(
1665  FunctionContext*, const BigIntVal& src, BigIntVal* dst);
1666 template void AggregateFunctions::FirstValUpdate<FloatVal>(
1667  FunctionContext*, const FloatVal& src, FloatVal* dst);
1668 template void AggregateFunctions::FirstValUpdate<DoubleVal>(
1669  FunctionContext*, const DoubleVal& src, DoubleVal* dst);
1670 template void AggregateFunctions::FirstValUpdate<StringVal>(
1671  FunctionContext*, const StringVal& src, StringVal* dst);
1672 template void AggregateFunctions::FirstValUpdate<TimestampVal>(
1673  FunctionContext*, const TimestampVal& src, TimestampVal* dst);
1674 template void AggregateFunctions::FirstValUpdate<DecimalVal>(
1675  FunctionContext*, const DecimalVal& src, DecimalVal* dst);
1676 
1677 template void AggregateFunctions::FirstValRewriteUpdate<BooleanVal>(
1678  FunctionContext*, const BooleanVal& src, const BigIntVal&, BooleanVal* dst);
1679 template void AggregateFunctions::FirstValRewriteUpdate<TinyIntVal>(
1680  FunctionContext*, const TinyIntVal& src, const BigIntVal&, TinyIntVal* dst);
1681 template void AggregateFunctions::FirstValRewriteUpdate<SmallIntVal>(
1682  FunctionContext*, const SmallIntVal& src, const BigIntVal&, SmallIntVal* dst);
1683 template void AggregateFunctions::FirstValRewriteUpdate<IntVal>(
1684  FunctionContext*, const IntVal& src, const BigIntVal&, IntVal* dst);
1685 template void AggregateFunctions::FirstValRewriteUpdate<BigIntVal>(
1686  FunctionContext*, const BigIntVal& src, const BigIntVal&, BigIntVal* dst);
1687 template void AggregateFunctions::FirstValRewriteUpdate<FloatVal>(
1688  FunctionContext*, const FloatVal& src, const BigIntVal&, FloatVal* dst);
1689 template void AggregateFunctions::FirstValRewriteUpdate<DoubleVal>(
1690  FunctionContext*, const DoubleVal& src, const BigIntVal&, DoubleVal* dst);
1691 template void AggregateFunctions::FirstValRewriteUpdate<StringVal>(
1692  FunctionContext*, const StringVal& src, const BigIntVal&, StringVal* dst);
1693 template void AggregateFunctions::FirstValRewriteUpdate<TimestampVal>(
1694  FunctionContext*, const TimestampVal& src, const BigIntVal&, TimestampVal* dst);
1695 template void AggregateFunctions::FirstValRewriteUpdate<DecimalVal>(
1696  FunctionContext*, const DecimalVal& src, const BigIntVal&, DecimalVal* dst);
1697 
1698 template void AggregateFunctions::OffsetFnInit<BooleanVal>(
1700 template void AggregateFunctions::OffsetFnInit<TinyIntVal>(
1702 template void AggregateFunctions::OffsetFnInit<SmallIntVal>(
1704 template void AggregateFunctions::OffsetFnInit<IntVal>(
1705  FunctionContext*, IntVal*);
1706 template void AggregateFunctions::OffsetFnInit<BigIntVal>(
1708 template void AggregateFunctions::OffsetFnInit<FloatVal>(
1710 template void AggregateFunctions::OffsetFnInit<DoubleVal>(
1712 template void AggregateFunctions::OffsetFnInit<StringVal>(
1714 template void AggregateFunctions::OffsetFnInit<TimestampVal>(
1716 template void AggregateFunctions::OffsetFnInit<DecimalVal>(
1718 
1719 template void AggregateFunctions::OffsetFnUpdate<BooleanVal>(
1720  FunctionContext*, const BooleanVal& src, const BigIntVal&, const BooleanVal&,
1721  BooleanVal* dst);
1722 template void AggregateFunctions::OffsetFnUpdate<TinyIntVal>(
1723  FunctionContext*, const TinyIntVal& src, const BigIntVal&, const TinyIntVal&,
1724  TinyIntVal* dst);
1725 template void AggregateFunctions::OffsetFnUpdate<SmallIntVal>(
1726  FunctionContext*, const SmallIntVal& src, const BigIntVal&, const SmallIntVal&,
1727  SmallIntVal* dst);
1728 template void AggregateFunctions::OffsetFnUpdate<IntVal>(
1729  FunctionContext*, const IntVal& src, const BigIntVal&, const IntVal&, IntVal* dst);
1730 template void AggregateFunctions::OffsetFnUpdate<BigIntVal>(
1731  FunctionContext*, const BigIntVal& src, const BigIntVal&, const BigIntVal&,
1732  BigIntVal* dst);
1733 template void AggregateFunctions::OffsetFnUpdate<FloatVal>(
1734  FunctionContext*, const FloatVal& src, const BigIntVal&, const FloatVal&,
1735  FloatVal* dst);
1736 template void AggregateFunctions::OffsetFnUpdate<DoubleVal>(
1737  FunctionContext*, const DoubleVal& src, const BigIntVal&, const DoubleVal&,
1738  DoubleVal* dst);
1739 template void AggregateFunctions::OffsetFnUpdate<StringVal>(
1740  FunctionContext*, const StringVal& src, const BigIntVal&, const StringVal&,
1741  StringVal* dst);
1742 template void AggregateFunctions::OffsetFnUpdate<TimestampVal>(
1743  FunctionContext*, const TimestampVal& src, const BigIntVal&, const TimestampVal&,
1744  TimestampVal* dst);
1745 template void AggregateFunctions::OffsetFnUpdate<DecimalVal>(
1746  FunctionContext*, const DecimalVal& src, const BigIntVal&, const DecimalVal&,
1747  DecimalVal* dst);
1748 }
static void SumDecimalAddOrSubtract(FunctionContext *, const DecimalVal &src, DecimalVal *dst, bool subtract=false)
Adds or subtracts src from dst. Implements Update() and Remove().
static void ReservoirSampleUpdate(FunctionContext *, const T &src, StringVal *dst)
const T & value() const
ReservoirSample(const StringVal &string_val)
int precision
Only valid if type == TYPE_DECIMAL.
Definition: udf.h:75
static void OffsetFnUpdate(FunctionContext *, const T &src, const BigIntVal &, const T &, T *dst)
static void SumDecimalRemove(FunctionContext *, const DecimalVal &src, DecimalVal *dst)
static DecimalVal DecimalAvgGetValue(FunctionContext *ctx, const StringVal &val)
int64_t time_of_day
Nanoseconds in current day.
Definition: udf.h:499
static void RankUpdate(FunctionContext *, StringVal *dst)
Update state for RANK.
static const int NUM_SAMPLES_PER_BUCKET
static void PcInit(FunctionContext *, StringVal *slot)
static const int NUM_BUCKETS
string DistinctEstimateBitMapToString(uint8_t *v)
static void CountStarRemove(FunctionContext *, BigIntVal *dst)
static void DecimalAvgUpdate(FunctionContext *ctx, const DecimalVal &src, StringVal *dst)
impala::FunctionContextImpl * impl()
TODO: Add mechanism for UDAs to update stats similar to runtime profile counters. ...
Definition: udf.h:202
static uint32_t Hash(const BooleanVal &v, const FunctionContext::TypeDesc &, int seed)
Definition: anyval-util.h:34
static void InitZero(FunctionContext *, T *dst)
Initializes dst to 0.
const TypeDesc & GetReturnType() const
Definition: udf-ir.cc:34
static void InitNull(FunctionContext *, AnyVal *dst)
Initializes dst to NULL.
static void FirstValUpdate(FunctionContext *, const T &src, T *dst)
Implements FIRST_VALUE.
static void DecimalAvgAddOrRemove(FunctionContext *ctx, const DecimalVal &src, StringVal *dst, bool remove=false)
static ColumnType TypeDescToColumnType(const FunctionContext::TypeDesc &type)
Definition: anyval-util.cc:101
static void Max(FunctionContext *, const T &src, T *dst)
MaxUpdate/MaxMerge.
__int128_t val16
Definition: udf.h:572
static void RankInit(FunctionContext *, StringVal *slot)
Initializes the state for RANK and DENSE_RANK.
static void CountStarUpdate(FunctionContext *, BigIntVal *dst)
static void HllInit(FunctionContext *, StringVal *slot)
static DoubleVal KnuthVarFinalize(FunctionContext *context, const StringVal &val)
static void ReservoirSampleMerge(FunctionContext *, const StringVal &src, StringVal *dst)
ReservoirSample< T > samples[NUM_SAMPLES]
static BigIntVal PcFinalize(FunctionContext *, const StringVal &src)
static void HllUpdate(FunctionContext *, const T &src, StringVal *dst)
static T AppxMedianFinalize(FunctionContext *, const StringVal &src)
Returns an approximate median using reservoir sampling.
StringVal ToStringVal(FunctionContext *context, T val)
int32_t date
Gregorian date. This has the same binary format as boost::gregorian::date.
Definition: udf.h:497
static const int MAX_STRING_SAMPLE_LEN
static const StringVal DEFAULT_STRING_CONCAT_DELIM((uint8_t *)", ", 2)
StringVal GetValue(FunctionContext *ctx)
static const int PC_BITMAP_LENGTH
This object has a compatible storage format with boost::ptime.
Definition: udf.h:495
static void LastValRemove(FunctionContext *, const T &src, T *dst)
int64_t num_updates() const
Definition: udf-internal.h:86
static void OffsetFnInit(FunctionContext *, T *dst)
uint8_t * ptr
Definition: udf.h:523
static void CountUpdate(FunctionContext *, const AnyVal &src, BigIntVal *dst)
Implementation of Count and Count(*)
std::size_t hash_value(const Decimal4Value &v)
This function must be called 'hash_value' to be picked up by boost.
void PrintSample(const ReservoirSample< T > &v, ostream *os)
static void StringConcatMerge(FunctionContext *, const StringVal &src, StringVal *result)
void ToTimestampVal(impala_udf::TimestampVal *tv) const
uint8_t val[MAX_STRING_SAMPLE_LEN]
static void SumDecimalMerge(FunctionContext *, const DecimalVal &src, DecimalVal *dst)
static BigIntVal HllFinalize(FunctionContext *, const StringVal &src)
bool AddWarning(const char *warning_msg)
Definition: udf.cc:345
bool is_null
Definition: udf.h:359
static StringVal ReservoirSampleFinalize(FunctionContext *, const StringVal &src)
Returns 20,000 unsorted samples as a list of comma-separated values.
static const StringVal ReservoirSampleSerialize(FunctionContext *, const StringVal &src)
static void AvgInit(FunctionContext *ctx, StringVal *dst)
std::string DebugString() const
Definition: types.cc:194
static void SumRemove(FunctionContext *, const SRC_VAL &src, DST_VAL *dst)
static void AvgUpdate(FunctionContext *ctx, const T &src, StringVal *dst)
void IncrementNumUpdates(int64_t n=1)
Definition: udf-internal.h:90
const TypeDesc * GetArgType(int arg_idx) const
Definition: udf.cc:425
static void SumDecimalUpdate(FunctionContext *, const DecimalVal &src, DecimalVal *dst)
Sum for decimals.
static void SetDistinctEstimateBit(uint8_t *bitmap, uint32_t row_index, uint32_t bit_index)
static void HllMerge(FunctionContext *, const StringVal &src, StringVal *dst)
static void DenseRankUpdate(FunctionContext *, StringVal *dst)
Update state for DENSE_RANK.
static void AvgRemove(FunctionContext *ctx, const T &src, StringVal *dst)
static void KnuthVarUpdate(FunctionContext *context, const T &input, StringVal *val)
static DoubleVal KnuthVarPopFinalize(FunctionContext *context, const StringVal &val)
Calculates the biased variance, uses KnuthVar Init-Update-Merge functions.
static const uint64_t FNV64_SEED
Definition: hash-util.h:101
static void TimestampAvgUpdate(FunctionContext *ctx, const TimestampVal &src, StringVal *dst)
Avg for timestamp. Uses AvgInit() and AvgMerge().
int GetByteSize() const
Returns the byte size of this type. Returns 0 for variable length types.
Definition: types.h:178
void Free(uint8_t *buffer)
Frees a buffer returned from Allocate() or Reallocate()
Definition: udf.cc:291
static void Min(FunctionContext *, const T &src, T *dst)
MinUpdate/MinMerge.
static void CountMerge(FunctionContext *, const BigIntVal &src, BigIntVal *dst)
static void KnuthVarInit(FunctionContext *context, StringVal *val)
static void PcMerge(FunctionContext *, const StringVal &src, StringVal *dst)
double ComputeKnuthVariance(const KnuthVarianceState &state, bool pop)
static bool GetDistinctEstimateBit(uint8_t *bitmap, uint32_t row_index, uint32_t bit_index)
static StringVal StringConcatFinalize(FunctionContext *, const StringVal &src)
#define VLOG_ROW
Definition: logging.h:59
static void DecimalAvgMerge(FunctionContext *ctx, const StringVal &src, StringVal *dst)
DecimalValue< RESULT_T > Divide(const ColumnType &this_type, const DecimalValue &other, const ColumnType &other_type, int result_scale, bool *is_nan, bool *overflow) const
is_nan is set to true if 'other' is 0. The value returned is undefined.
static void PcsaUpdate(FunctionContext *, const T &src, StringVal *dst)
static void DecimalAvgInit(FunctionContext *ctx, StringVal *dst)
Avg for decimals.
bool IsArgConstant(int arg_idx) const
Definition: udf-ir.cc:20
int64_t num_removes() const
Definition: udf-internal.h:87
static uint64_t Hash64(const BooleanVal &v, const FunctionContext::TypeDesc &, int64_t seed)
Definition: anyval-util.h:85
static StringVal StringValSerializeOrFinalize(FunctionContext *ctx, const StringVal &src)
StringVal Serialize/Finalize function that copies and frees src.
uint64_t count
static DoubleVal KnuthStddevPopFinalize(FunctionContext *context, const StringVal &val)
Calculates the biased STDDEV, uses KnuthVar Init-Update-Merge functions.
static void LastValUpdate(FunctionContext *, const T &src, T *dst)
Implements LAST_VALUE.
T GetValue(FunctionContext *ctx)
int GetNumArgs() const
Definition: udf-ir.cc:30
static TimestampValue FromTimestampVal(const impala_udf::TimestampVal &udf_value)
uint8_t * Reallocate(uint8_t *ptr, int byte_size)
Definition: udf.cc:276
double ToSubsecondUnixTime() const
static void SumUpdate(FunctionContext *, const SRC_VAL &src, DST_VAL *dst)
SumUpdate, SumMerge.
uint8_t * Allocate(int byte_size)
Definition: udf.cc:262
static void FirstValRewriteUpdate(FunctionContext *, const T &src, const BigIntVal &, T *dst)
#define UNLIKELY(expr)
Definition: compiler-util.h:33
static TimestampVal TimestampAvgFinalize(FunctionContext *ctx, const StringVal &val)
static StringValue FromStringVal(const impala_udf::StringVal &sv)
Definition: string-value.h:103
static void ReservoirSampleInit(FunctionContext *, StringVal *slot)
static DoubleVal AvgGetValue(FunctionContext *ctx, const StringVal &val)
static BigIntVal PcsaFinalize(FunctionContext *, const StringVal &src)
static void InitNullString(FunctionContext *c, StringVal *dst)
Initializes dst to NULL and sets dst->ptr to NULL.
static BigIntVal RankGetValue(FunctionContext *, StringVal &src)
Returns the result for RANK and prepares the state for the next Update().
static DoubleVal AvgFinalize(FunctionContext *ctx, const StringVal &val)
bool SampleKeyGreater(const ReservoirSample< T > &i, const ReservoirSample< T > &j)
bool SampleValLess(const ReservoirSample< T > &i, const ReservoirSample< T > &j)
static void TimestampAvgRemove(FunctionContext *ctx, const TimestampVal &src, StringVal *dst)
static BigIntVal RankFinalize(FunctionContext *, StringVal &src)
Returns the result for RANK and DENSE_RANK and cleans up intermediate state in src.
static void PcUpdate(FunctionContext *, const T &src, StringVal *dst)
static TimestampVal TimestampAvgGetValue(FunctionContext *ctx, const StringVal &val)
static ColumnType CreateDecimalType(int precision, int scale)
Definition: types.h:103
static void KnuthVarMerge(FunctionContext *context, const StringVal &src, StringVal *dst)
static const int NUM_PC_BITMAPS
static BigIntVal DenseRankGetValue(FunctionContext *, StringVal &src)
AnyVal * GetConstantArg(int arg_idx) const
Definition: udf-ir.cc:25
static StringVal HistogramFinalize(FunctionContext *, const StringVal &src)
static const int NUM_SAMPLES
static void CountRemove(FunctionContext *, const AnyVal &src, BigIntVal *dst)
static void DecimalAvgRemove(FunctionContext *ctx, const DecimalVal &src, StringVal *dst)
static StringVal StringValGetValue(FunctionContext *ctx, const StringVal &src)
StringVal GetValue() function that returns a copy of src.
static DoubleVal KnuthStddevFinalize(FunctionContext *context, const StringVal &val)
Calculates STDDEV, uses KnuthVar Init-Update-Merge functions.
void IncrementNumRemoves(int64_t n=1)
Definition: udf-internal.h:91
double DistinceEstimateFinalize(const StringVal &src)
static void AvgMerge(FunctionContext *ctx, const StringVal &src, StringVal *dst)
static uint64_t HllFinalEstimate(const uint8_t *buckets, int32_t num_buckets)
static const float PC_THETA
static DecimalVal DecimalAvgFinalize(FunctionContext *ctx, const StringVal &val)
static void StringConcatUpdate(FunctionContext *, const StringVal &src, StringVal *result)
String concat.
std::string DebugString() const
__int128_t int128_t
We use the C++ int128_t type. This is stored using 16 bytes and is very performant.