Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
agg-fn-evaluator.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exprs/agg-fn-evaluator.h"
16 
17 #include <sstream>
18 
19 #include "codegen/llvm-codegen.h"
20 #include "common/logging.h"
21 #include "exec/aggregation-node.h"
23 #include "exprs/expr-context.h"
24 #include "exprs/anyval-util.h"
25 #include "runtime/lib-cache.h"
26 #include "runtime/runtime-state.h"
27 #include "udf/udf-internal.h"
28 #include "util/debug-util.h"
29 
30 #include <thrift/protocol/TDebugProtocol.h>
31 
32 #include "common/names.h"
33 
34 using namespace impala;
35 using namespace impala_udf;
36 using namespace llvm;
37 
38 // typedef for builtin aggregate functions. Unfortunately, these type defs don't
39 // really work since the actual builtin is implemented not in terms of the base
40 // AnyVal* type. Due to this, there are lots of casts when we use these typedefs.
41 // TODO: these typedefs exists as wrappers to go from (TupleRow, Tuple) to the
42 // types the aggregation functions need. This needs to be done with codegen instead.
43 typedef void (*InitFn)(FunctionContext*, AnyVal*);
44 typedef void (*UpdateFn0)(FunctionContext*, AnyVal*);
45 typedef void (*UpdateFn1)(FunctionContext*, const AnyVal&, AnyVal*);
46 typedef void (*UpdateFn2)(FunctionContext*, const AnyVal&, const AnyVal&, AnyVal*);
47 typedef void (*UpdateFn3)(FunctionContext*, const AnyVal&, const AnyVal&,
48  const AnyVal&, AnyVal*);
49 typedef void (*UpdateFn4)(FunctionContext*, const AnyVal&, const AnyVal&,
50  const AnyVal&, const AnyVal&, AnyVal*);
51 typedef void (*UpdateFn5)(FunctionContext*, const AnyVal&, const AnyVal&,
52  const AnyVal&, const AnyVal&, const AnyVal&, AnyVal*);
53 typedef void (*UpdateFn6)(FunctionContext*, const AnyVal&, const AnyVal&,
54  const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, AnyVal*);
55 typedef void (*UpdateFn7)(FunctionContext*, const AnyVal&, const AnyVal&,
56  const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, AnyVal*);
57 typedef void (*UpdateFn8)(FunctionContext*, const AnyVal&, const AnyVal&,
58  const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&,
59  const AnyVal&, AnyVal*);
61 typedef AnyVal (*GetValueFn)(FunctionContext*, const AnyVal&);
62 typedef AnyVal (*FinalizeFn)(FunctionContext*, const AnyVal&);
63 
65  AggFnEvaluator** result) {
66  return Create(pool, desc, false, result);
67 }
68 
70  bool is_analytic_fn, AggFnEvaluator** result) {
71  DCHECK_GT(desc.nodes.size(), 0);
72  *result = pool->Add(new AggFnEvaluator(desc.nodes[0], is_analytic_fn));
73  int node_idx = 0;
74  for (int i = 0; i < desc.nodes[0].num_children; ++i) {
75  ++node_idx;
76  Expr* expr = NULL;
77  ExprContext* ctx = NULL;
79  pool, desc.nodes, NULL, &node_idx, &expr, &ctx));
80  (*result)->input_expr_ctxs_.push_back(ctx);
81  }
82  return Status::OK;
83 }
84 
85 AggFnEvaluator::AggFnEvaluator(const TExprNode& desc, bool is_analytic_fn)
86  : fn_(desc.fn),
87  is_merge_(desc.agg_expr.is_merge_agg),
88  is_analytic_fn_(is_analytic_fn),
89  intermediate_slot_desc_(NULL),
90  output_slot_desc_(NULL),
91  cache_entry_(NULL),
92  init_fn_(NULL),
93  update_fn_(NULL),
94  remove_fn_(NULL),
95  merge_fn_(NULL),
96  serialize_fn_(NULL),
97  get_value_fn_(NULL),
98  finalize_fn_(NULL) {
99  DCHECK(desc.fn.__isset.aggregate_fn);
100  DCHECK(desc.node_type == TExprNodeType::AGGREGATE_EXPR);
101  // TODO: remove. See comment with AggregationOp
102  if (fn_.name.function_name == "count") {
103  agg_op_ = COUNT;
104  } else if (fn_.name.function_name == "min") {
105  agg_op_ = MIN;
106  } else if (fn_.name.function_name == "max") {
107  agg_op_ = MAX;
108  } else if (fn_.name.function_name == "sum") {
109  agg_op_ = SUM;
110  } else if (fn_.name.function_name == "avg") {
111  agg_op_ = AVG;
112  } else if (fn_.name.function_name == "ndv" ||
113  fn_.name.function_name == "ndv_no_finalize") {
114  agg_op_ = NDV;
115  } else {
116  agg_op_ = OTHER;
117  }
118 }
119 
121  DCHECK(cache_entry_ == NULL) << "Need to call Close()";
122 }
123 
125  const SlotDescriptor* intermediate_slot_desc,
126  const SlotDescriptor* output_slot_desc,
127  MemPool* agg_fn_pool, FunctionContext** agg_fn_ctx) {
128  DCHECK(intermediate_slot_desc != NULL);
129  DCHECK_EQ(intermediate_slot_desc->type().type,
130  ColumnType(fn_.aggregate_fn.intermediate_type).type);
131  DCHECK(intermediate_slot_desc_ == NULL);
132  intermediate_slot_desc_ = intermediate_slot_desc;
133 
134  DCHECK(output_slot_desc != NULL);
135  DCHECK_EQ(output_slot_desc->type().type, ColumnType(fn_.ret_type).type);
136  DCHECK(output_slot_desc_ == NULL);
137  output_slot_desc_ = output_slot_desc;
138 
140  Expr::Prepare(input_expr_ctxs_, state, desc, agg_fn_pool->mem_tracker()));
141 
142  ObjectPool* obj_pool = state->obj_pool();
143  for (int i = 0; i < input_expr_ctxs_.size(); ++i) {
144  staging_input_vals_.push_back(
145  CreateAnyVal(obj_pool, input_expr_ctxs_[i]->root()->type()));
146  }
149 
150  if (is_merge_) {
151  DCHECK_EQ(staging_input_vals_.size(), 1) << "Merge should only have 1 input.";
152  }
153 
154  // Load the function pointers. Merge is not required if this is evaluating an
155  // analytic function.
156  if (fn_.aggregate_fn.init_fn_symbol.empty() ||
157  fn_.aggregate_fn.update_fn_symbol.empty() ||
158  (!is_analytic_fn_ && fn_.aggregate_fn.merge_fn_symbol.empty())) {
159  // This path is only for partially implemented builtins.
160  DCHECK_EQ(fn_.binary_type, TFunctionBinaryType::BUILTIN);
161  stringstream ss;
162  ss << "Function " << fn_.name.function_name << " is not implemented.";
163  return Status(ss.str());
164  }
165 
166  RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(
167  fn_.hdfs_location, fn_.aggregate_fn.init_fn_symbol, &init_fn_, &cache_entry_));
168  RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(
169  fn_.hdfs_location, fn_.aggregate_fn.update_fn_symbol, &update_fn_, &cache_entry_));
170 
171  // Merge() is not loaded if evaluating the agg fn as an analytic function.
172  if (!is_analytic_fn_) {
173  RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
174  fn_.aggregate_fn.merge_fn_symbol, &merge_fn_, &cache_entry_));
175  }
176 
177  // Serialize(), GetValue(), Remove() and Finalize() are optional
178  if (!fn_.aggregate_fn.serialize_fn_symbol.empty()) {
179  RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(
180  fn_.hdfs_location, fn_.aggregate_fn.serialize_fn_symbol, &serialize_fn_,
181  &cache_entry_));
182  }
183  if (!fn_.aggregate_fn.get_value_fn_symbol.empty()) {
184  RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(
185  fn_.hdfs_location, fn_.aggregate_fn.get_value_fn_symbol, &get_value_fn_,
186  &cache_entry_));
187  }
188  if (!fn_.aggregate_fn.remove_fn_symbol.empty()) {
189  RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(
190  fn_.hdfs_location, fn_.aggregate_fn.remove_fn_symbol, &remove_fn_,
191  &cache_entry_));
192  }
193  if (!fn_.aggregate_fn.finalize_fn_symbol.empty()) {
194  RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(
195  fn_.hdfs_location, fn_.aggregate_fn.finalize_fn_symbol, &finalize_fn_,
196  &cache_entry_));
197  }
198 
199  vector<FunctionContext::TypeDesc> arg_types;
200  for (int i = 0; i < input_expr_ctxs_.size(); ++i) {
201  arg_types.push_back(
203  }
204 
207  FunctionContext::TypeDesc output_type =
210  state, agg_fn_pool, intermediate_type, output_type, arg_types);
211  return Status::OK;
212 }
213 
216  // Now that we have opened all our input exprs, it is safe to evaluate any constant
217  // values for the UDA's FunctionContext (we cannot evaluate exprs before calling Open()
218  // on them).
219  vector<AnyVal*> constant_args(input_expr_ctxs_.size());
220  for (int i = 0; i < input_expr_ctxs_.size(); ++i) {
221  constant_args[i] = input_expr_ctxs_[i]->root()->GetConstVal(input_expr_ctxs_[i]);
222  }
223  agg_fn_ctx->impl()->SetConstantArgs(constant_args);
224  return Status::OK;
225 }
226 
229 
230  if (cache_entry_ != NULL) {
232  cache_entry_ = NULL;
233  }
234 }
235 
// Writes the AnyVal 'src' into the slot of tuple 'dst' described by 'dst_slot_desc'.
// 'src' is reinterpreted as the AnyVal subclass matching the slot's type. A null
// 'src' only sets the slot's null indicator. Otherwise the null indicator is cleared
// and the value is stored according to the slot type.
// NOTE(review): unsupported types only hit a DCHECK; in release builds the slot is
// left marked not-null but unwritten.
236 inline void AggFnEvaluator::SetDstSlot(FunctionContext* ctx, const AnyVal* src,
237  const SlotDescriptor* dst_slot_desc, Tuple* dst) {
238  if (src->is_null) {
239  dst->SetNull(dst_slot_desc->null_indicator_offset());
240  return;
241  }
242 
243  dst->SetNotNull(dst_slot_desc->null_indicator_offset());
244  void* slot = dst->GetSlot(dst_slot_desc->tuple_offset());
245  switch (dst_slot_desc->type().type) {
246  case TYPE_NULL:
247  return;
248  case TYPE_BOOLEAN:
249  *reinterpret_cast<bool*>(slot) = reinterpret_cast<const BooleanVal*>(src)->val;
250  return;
251  case TYPE_TINYINT:
252  *reinterpret_cast<int8_t*>(slot) = reinterpret_cast<const TinyIntVal*>(src)->val;
253  return;
254  case TYPE_SMALLINT:
255  *reinterpret_cast<int16_t*>(slot) = reinterpret_cast<const SmallIntVal*>(src)->val;
256  return;
257  case TYPE_INT:
258  *reinterpret_cast<int32_t*>(slot) = reinterpret_cast<const IntVal*>(src)->val;
259  return;
260  case TYPE_BIGINT:
261  *reinterpret_cast<int64_t*>(slot) = reinterpret_cast<const BigIntVal*>(src)->val;
262  return;
263  case TYPE_FLOAT:
264  *reinterpret_cast<float*>(slot) = reinterpret_cast<const FloatVal*>(src)->val;
265  return;
266  case TYPE_DOUBLE:
267  *reinterpret_cast<double*>(slot) = reinterpret_cast<const DoubleVal*>(src)->val;
268  return;
269  case TYPE_STRING:
270  case TYPE_VARCHAR:
// Stores the StringValue produced by StringValue::FromStringVal().
// NOTE(review): this looks like it keeps src's pointer rather than copying the
// bytes; confirm the StringVal buffer outlives the tuple.
271  *reinterpret_cast<StringValue*>(slot) =
272  StringValue::FromStringVal(*reinterpret_cast<const StringVal*>(src));
273  return;
274  case TYPE_CHAR:
// CHAR(N) intermediates are written in place by the UDA through the pointer that
// Init() sets up, so nothing is copied here. This only verifies that the UDA did
// not repoint the StringVal, and reports an error through 'ctx' if it did.
275  if (slot != reinterpret_cast<const StringVal*>(src)->ptr) {
276  ctx->SetError("UDA should not set pointer of CHAR(N) intermediate");
277  }
278  return;
279  case TYPE_TIMESTAMP:
280  *reinterpret_cast<TimestampValue*>(slot) = TimestampValue::FromTimestampVal(
281  *reinterpret_cast<const TimestampVal*>(src));
282  return;
283  case TYPE_DECIMAL:
// Decimal slots are 4, 8 or 16 bytes wide depending on the column type.
284  switch (dst_slot_desc->type().GetByteSize()) {
285  case 4:
286  *reinterpret_cast<int32_t*>(slot) =
287  reinterpret_cast<const DecimalVal*>(src)->val4;
288  return;
289  case 8:
290  *reinterpret_cast<int64_t*>(slot) =
291  reinterpret_cast<const DecimalVal*>(src)->val8;
292  return;
293  case 16:
294 #if __BYTE_ORDER == __LITTLE_ENDIAN
295  // On little endian, &val4, &val8, &val16 are the same address.
296  // This code seems to trip up clang causing it to generate code that crashes.
297  // Be careful when modifying this. See IMPALA-959 for more details.
298  // I suspect an issue with xmm registers not reading from aligned memory.
299  memcpy(slot, &reinterpret_cast<const DecimalVal*>(src)->val4,
300  dst_slot_desc->type().GetByteSize());
301 #else
302  DCHECK(false) << "Not implemented.";
303 #endif
304  return;
// Any other decimal byte size breaks out of the inner switch. Because there is no
// 'return' after it, control then falls through into the outer 'default' below,
// which reports the type as not yet implemented.
305  default:
306  break;
307  }
// Reached for unsupported slot types, and via fallthrough for unexpected decimal
// sizes. Only checked in debug builds.
308  default:
309  DCHECK(false) << "NYI: " << dst_slot_desc->type();
310  }
311 }
312 
313 // This function would be replaced in codegen.
314 void AggFnEvaluator::Init(FunctionContext* agg_fn_ctx, Tuple* dst) {
315  DCHECK(init_fn_ != NULL);
316  if (intermediate_type().type == TYPE_CHAR) {
317  // For type char, we want to initialize the staging_intermediate_val_ with
318  // a pointer into the tuple (the UDA should not be allocating it).
319  void* slot = dst->GetSlot(intermediate_slot_desc_->tuple_offset());
320  StringVal* sv = reinterpret_cast<StringVal*>(staging_intermediate_val_);
322  sv->ptr = reinterpret_cast<uint8_t*>(
324  sv->len = intermediate_type().len;
325  }
326  reinterpret_cast<InitFn>(init_fn_)(agg_fn_ctx, staging_intermediate_val_);
328  agg_fn_ctx->impl()->set_num_updates(0);
329  agg_fn_ctx->impl()->set_num_removes(0);
330 }
331 
332 static void SetAnyVal(const SlotDescriptor* desc, Tuple* tuple, AnyVal* dst) {
333  bool is_null = tuple->IsNull(desc->null_indicator_offset());
334  void* slot = NULL;
335  if (!is_null) slot = tuple->GetSlot(desc->tuple_offset());
336  AnyValUtil::SetAnyVal(slot, desc->type(), dst);
337 }
338 
340  FunctionContext* agg_fn_ctx, TupleRow* row, Tuple* dst, void* fn) {
341  if (fn == NULL) return;
342 
344 
345  for (int i = 0; i < input_expr_ctxs_.size(); ++i) {
346  void* src_slot = input_expr_ctxs_[i]->GetValue(row);
348  src_slot, input_expr_ctxs_[i]->root()->type(), staging_input_vals_[i]);
349  }
350 
351  // TODO: this part is not so good and not scalable. It can be replaced with
352  // codegen but we can also consider leaving it for the first few cases for
353  // debugging.
354  switch (input_expr_ctxs_.size()) {
355  case 0:
356  reinterpret_cast<UpdateFn0>(fn)(agg_fn_ctx, staging_intermediate_val_);
357  break;
358  case 1:
359  reinterpret_cast<UpdateFn1>(fn)(agg_fn_ctx,
361  break;
362  case 2:
363  reinterpret_cast<UpdateFn2>(fn)(agg_fn_ctx,
365  break;
366  case 3:
367  reinterpret_cast<UpdateFn3>(fn)(agg_fn_ctx,
370  break;
371  case 4:
372  reinterpret_cast<UpdateFn4>(fn)(agg_fn_ctx,
375  break;
376  case 5:
377  reinterpret_cast<UpdateFn5>(fn)(agg_fn_ctx,
381  break;
382  case 6:
383  reinterpret_cast<UpdateFn6>(fn)(agg_fn_ctx,
387  break;
388  case 7:
389  reinterpret_cast<UpdateFn7>(fn)(agg_fn_ctx,
394  break;
395  case 8:
396  reinterpret_cast<UpdateFn8>(fn)(agg_fn_ctx,
402  break;
403  default:
404  DCHECK(false) << "NYI";
405  }
407 }
408 
409 void AggFnEvaluator::Merge(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst) {
410  DCHECK(merge_fn_ != NULL);
411 
414 
415  // The merge fn always takes one input argument.
416  reinterpret_cast<UpdateFn1>(merge_fn_)(agg_fn_ctx,
419 }
420 
422  const SlotDescriptor* dst_slot_desc, Tuple* dst, void* fn) {
423  // No fn was given and the src and dst are identical. Nothing to be done.
424  if (fn == NULL && src == dst) return;
425  // src != dst means we are performing a Finalize(), so even if fn == null we
426  // still must copy the value of the src slot into dst.
427 
428  bool src_slot_null = src->IsNull(intermediate_slot_desc_->null_indicator_offset());
429  void* src_slot = NULL;
430  if (!src_slot_null) src_slot = src->GetSlot(intermediate_slot_desc_->tuple_offset());
431 
432  // No fn was given but the src and dst tuples are different (doing a Finalize()).
433  // Just copy the src slot into the dst tuple.
434  if (fn == NULL) {
435  DCHECK_EQ(intermediate_type(), dst_slot_desc->type());
436  RawValue::Write(src_slot, dst, dst_slot_desc, NULL);
437  return;
438  }
439 
441  switch (dst_slot_desc->type().type) {
442  case TYPE_BOOLEAN: {
443  typedef BooleanVal(*Fn)(FunctionContext*, AnyVal*);
444  BooleanVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_);
445  SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
446  break;
447  }
448  case TYPE_TINYINT: {
449  typedef TinyIntVal(*Fn)(FunctionContext*, AnyVal*);
450  TinyIntVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_);
451  SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
452  break;
453  }
454  case TYPE_SMALLINT: {
455  typedef SmallIntVal(*Fn)(FunctionContext*, AnyVal*);
456  SmallIntVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_);
457  SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
458  break;
459  }
460  case TYPE_INT: {
461  typedef IntVal(*Fn)(FunctionContext*, AnyVal*);
462  IntVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_);
463  SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
464  break;
465  }
466  case TYPE_BIGINT: {
467  typedef BigIntVal(*Fn)(FunctionContext*, AnyVal*);
468  BigIntVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_);
469  SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
470  break;
471  }
472  case TYPE_FLOAT: {
473  typedef FloatVal(*Fn)(FunctionContext*, AnyVal*);
474  FloatVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_);
475  SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
476  break;
477  }
478  case TYPE_DOUBLE: {
479  typedef DoubleVal(*Fn)(FunctionContext*, AnyVal*);
480  DoubleVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_);
481  SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
482  break;
483  }
484  case TYPE_STRING:
485  case TYPE_VARCHAR: {
486  typedef StringVal(*Fn)(FunctionContext*, AnyVal*);
487  StringVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_);
488  SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
489  break;
490  }
491  case TYPE_DECIMAL: {
492  typedef DecimalVal(*Fn)(FunctionContext*, AnyVal*);
493  DecimalVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_);
494  SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
495  break;
496  }
497  case TYPE_TIMESTAMP: {
498  typedef TimestampVal(*Fn)(FunctionContext*, AnyVal*);
499  TimestampVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, staging_intermediate_val_);
500  SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
501  break;
502  }
503  default:
504  DCHECK(false) << "NYI";
505  }
506 }
507 
508 string AggFnEvaluator::DebugString(const vector<AggFnEvaluator*>& exprs) {
509  stringstream out;
510  out << "[";
511  for (int i = 0; i < exprs.size(); ++i) {
512  out << (i == 0 ? "" : " ") << exprs[i]->DebugString();
513  }
514  out << "]";
515  return out.str();
516 }
517 
519  stringstream out;
520  out << "AggFnEvaluator(op=" << agg_op_;
521  for (int i = 0; i < input_expr_ctxs_.size(); ++i) {
522  out << " " << input_expr_ctxs_[i]->root()->DebugString() << ")";
523  }
524  out << ")";
525  return out.str();
526 }
void(* UpdateFn8)(FunctionContext *, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, AnyVal *)
AnyVal * CreateAnyVal(ObjectPool *pool, const ColumnType &type)
Creates the corresponding AnyVal subclass for type. The object is added to the pool.
Definition: anyval-util.cc:26
void Update(FunctionContext *agg_fn_ctx, TupleRow *row, Tuple *dst, void *fn)
Status Prepare(RuntimeState *state, const RowDescriptor &desc, const SlotDescriptor *intermediate_slot_desc, const SlotDescriptor *output_slot_desc, MemPool *agg_fn_pool, FunctionContext **agg_fn_ctx)
static FunctionContext::TypeDesc ColumnTypeToTypeDesc(const ColumnType &type)
Definition: anyval-util.cc:52
static Status CreateTreeFromThrift(ObjectPool *pool, const std::vector< TExprNode > &nodes, Expr *parent, int *node_idx, Expr **root_expr, ExprContext **ctx)
Definition: expr.cc:160
void SetNull(const NullIndicatorOffset &offset)
Definition: tuple.h:101
const ColumnType & intermediate_type() const
impala::FunctionContextImpl * impl()
TODO: Add mechanism for UDAs to update stats similar to runtime profile counters. ...
Definition: udf.h:202
static Status Open(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for opening multiple expr trees.
void(* UpdateFn4)(FunctionContext *, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, AnyVal *)
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
void SetConstantArgs(const std::vector< impala_udf::AnyVal * > &constant_args)
Sets constant_args_. The AnyVal* values are owned by the caller.
Definition: udf.cc:414
void SerializeOrFinalize(FunctionContext *agg_fn_ctx, Tuple *src, const SlotDescriptor *dst_slot_desc, Tuple *dst, void *fn)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
std::string DebugString() const
impala_udf::AnyVal * staging_intermediate_val_
std::vector< ExprContext * > input_expr_ctxs_
AnyVal(* GetValueFn)(FunctionContext *, const AnyVal &)
void * GetSlot(int offset)
Definition: tuple.h:118
const bool is_merge_
Indicates whether to Update() or Merge()
static Status Create(ObjectPool *pool, const TExpr &desc, AggFnEvaluator **result)
AnyVal(* FinalizeFn)(FunctionContext *, const AnyVal &)
This object has a compatible storage format with boost::ptime.
Definition: udf.h:495
Status Open(RuntimeState *state, FunctionContext *agg_fn_ctx)
uint8_t * ptr
Definition: udf.h:523
const NullIndicatorOffset & null_indicator_offset() const
Definition: descriptors.h:89
impala_udf::AnyVal * staging_merge_input_val_
std::vector< impala_udf::AnyVal * > staging_input_vals_
void Merge(FunctionContext *agg_fn_ctx, Tuple *src, Tuple *dst)
AggFnEvaluator(const TExprNode &desc, bool is_analytic_fn)
Use Create() instead.
void Close(RuntimeState *state)
static void Close(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for closing multiple expr trees.
bool IsNull(const NullIndicatorOffset &offset) const
Definition: tuple.h:112
ObjectPool * obj_pool()
Returns a local object pool.
Definition: coordinator.h:263
bool is_null
Definition: udf.h:359
PrimitiveType type
Definition: types.h:60
MemTracker * mem_tracker()
Definition: mem-pool.h:151
StringVal(* SerializeFn)(FunctionContext *, const StringVal &)
const ColumnType & type() const
Definition: descriptors.h:78
const bool is_analytic_fn_
Indicates which functions must be loaded.
ObjectPool * obj_pool() const
Definition: runtime-state.h:92
ObjectPool pool
void Init(FunctionContext *agg_fn_ctx, Tuple *dst)
Functions for different phases of the aggregation.
void set_num_removes(int64_t n)
Definition: udf-internal.h:89
function< void(int64_t, int64_t, AtomicInt< int > *)> Fn
Definition: atomic-test.cc:104
int GetByteSize() const
Returns the byte size of this type. Returns 0 for variable length types.
Definition: types.h:178
const SlotDescriptor * intermediate_slot_desc_
Slot into which Update()/Merge()/Serialize() write their result. Not owned.
void SetDstSlot(FunctionContext *ctx, const impala_udf::AnyVal *src, const SlotDescriptor *dst_slot_desc, Tuple *dst)
Writes the result in src into dst pointed to by dst_slot_desc.
void set_num_updates(int64_t n)
Definition: udf-internal.h:88
int len
Only set if type == TYPE_CHAR or type == TYPE_VARCHAR.
Definition: types.h:62
static void Write(const void *value, Tuple *tuple, const SlotDescriptor *slot_desc, MemPool *pool)
Definition: raw-value.cc:303
This is the superclass of all expr evaluation nodes.
Definition: expr.h:116
static LibCache * instance()
Definition: lib-cache.h:63
AggregationOp agg_op_
The enum for some of the builtins that still require special cased logic.
void(* UpdateFn6)(FunctionContext *, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, AnyVal *)
void(* UpdateFn7)(FunctionContext *, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, AnyVal *)
void(* UpdateFn2)(FunctionContext *, const AnyVal &, const AnyVal &, AnyVal *)
static TimestampValue FromTimestampVal(const impala_udf::TimestampVal &udf_value)
void(* InitFn)(FunctionContext *, AnyVal *)
void * init_fn_
Function ptrs for the different phases of the aggregate function.
const SlotDescriptor * output_slot_desc_
static impala_udf::FunctionContext * CreateContext(RuntimeState *state, MemPool *pool, const impala_udf::FunctionContext::TypeDesc &return_type, const std::vector< impala_udf::FunctionContext::TypeDesc > &arg_types, int varargs_buffer_size=0, bool debug=false)
Create a FunctionContext for a UDF. Caller is responsible for deleting it.
static StringValue FromStringVal(const impala_udf::StringVal &sv)
Definition: string-value.h:103
static const Status OK
Definition: status.h:87
static char * CharSlotToPtr(void *slot, const ColumnType &type)
int tuple_offset() const
Definition: descriptors.h:88
void(* UpdateFn0)(FunctionContext *, AnyVal *)
void(* UpdateFn3)(FunctionContext *, const AnyVal &, const AnyVal &, const AnyVal &, AnyVal *)
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
void DecrementUseCount(LibCacheEntry *entry)
See comment in GetSoFunctionPtr().
Definition: lib-cache.cc:170
void(* UpdateFn1)(FunctionContext *, const AnyVal &, AnyVal *)
void SetError(const char *error_msg)
Definition: udf.cc:332
LibCache::LibCacheEntry * cache_entry_
Cache entry for the library containing the function ptrs.
static void SetAnyVal(const void *slot, const ColumnType &type, AnyVal *dst)
Utility to put val into an AnyVal struct.
Definition: anyval-util.h:205
void(* UpdateFn5)(FunctionContext *, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, AnyVal *)
static void SetAnyVal(const SlotDescriptor *desc, Tuple *tuple, AnyVal *dst)
void SetNotNull(const NullIndicatorOffset &offset)
Turn null indicator bit off.
Definition: tuple.h:107