30 #include <thrift/protocol/TDebugProtocol.h>
34 using namespace impala;
35 using namespace impala_udf;
66 return Create(pool, desc,
false, result);
71 DCHECK_GT(desc.nodes.size(), 0);
74 for (
int i = 0; i < desc.nodes[0].num_children; ++i) {
79 pool, desc.nodes, NULL, &node_idx, &expr, &ctx));
80 (*result)->input_expr_ctxs_.push_back(ctx);
87 is_merge_(desc.agg_expr.is_merge_agg),
88 is_analytic_fn_(is_analytic_fn),
89 intermediate_slot_desc_(NULL),
90 output_slot_desc_(NULL),
99 DCHECK(desc.fn.__isset.aggregate_fn);
100 DCHECK(desc.node_type == TExprNodeType::AGGREGATE_EXPR);
102 if (
fn_.name.function_name ==
"count") {
104 }
else if (
fn_.name.function_name ==
"min") {
106 }
else if (
fn_.name.function_name ==
"max") {
108 }
else if (
fn_.name.function_name ==
"sum") {
110 }
else if (
fn_.name.function_name ==
"avg") {
112 }
else if (
fn_.name.function_name ==
"ndv" ||
113 fn_.name.function_name ==
"ndv_no_finalize") {
128 DCHECK(intermediate_slot_desc != NULL);
129 DCHECK_EQ(intermediate_slot_desc->
type().
type,
134 DCHECK(output_slot_desc != NULL);
156 if (
fn_.aggregate_fn.init_fn_symbol.empty() ||
157 fn_.aggregate_fn.update_fn_symbol.empty() ||
160 DCHECK_EQ(
fn_.binary_type, TFunctionBinaryType::BUILTIN);
162 ss <<
"Function " <<
fn_.name.function_name <<
" is not implemented.";
178 if (!
fn_.aggregate_fn.serialize_fn_symbol.empty()) {
183 if (!
fn_.aggregate_fn.get_value_fn_symbol.empty()) {
188 if (!
fn_.aggregate_fn.remove_fn_symbol.empty()) {
193 if (!
fn_.aggregate_fn.finalize_fn_symbol.empty()) {
199 vector<FunctionContext::TypeDesc> arg_types;
210 state, agg_fn_pool, intermediate_type, output_type, arg_types);
245 switch (dst_slot_desc->
type().
type) {
249 *
reinterpret_cast<bool*
>(slot) = reinterpret_cast<const BooleanVal*>(src)->val;
252 *
reinterpret_cast<int8_t*
>(slot) = reinterpret_cast<const TinyIntVal*>(src)->val;
255 *
reinterpret_cast<int16_t*
>(slot) = reinterpret_cast<const SmallIntVal*>(src)->val;
258 *
reinterpret_cast<int32_t*
>(slot) = reinterpret_cast<const IntVal*>(src)->val;
261 *
reinterpret_cast<int64_t*
>(slot) = reinterpret_cast<const BigIntVal*>(src)->val;
264 *
reinterpret_cast<float*
>(slot) = reinterpret_cast<const FloatVal*>(src)->val;
267 *
reinterpret_cast<double*
>(slot) = reinterpret_cast<const DoubleVal*>(src)->val;
275 if (slot != reinterpret_cast<const StringVal*>(src)->ptr) {
276 ctx->
SetError(
"UDA should not set pointer of CHAR(N) intermediate");
281 *reinterpret_cast<const TimestampVal*>(src));
286 *
reinterpret_cast<int32_t*
>(slot) =
287 reinterpret_cast<const DecimalVal*>(src)->val4;
290 *
reinterpret_cast<int64_t*
>(slot) =
291 reinterpret_cast<const DecimalVal*>(src)->val8;
294 #if __BYTE_ORDER == __LITTLE_ENDIAN
299 memcpy(slot, &reinterpret_cast<const DecimalVal*>(src)->val4,
302 DCHECK(
false) <<
"Not implemented.";
309 DCHECK(
false) <<
"NYI: " << dst_slot_desc->
type();
322 sv->
ptr =
reinterpret_cast<uint8_t*
>(
341 if (fn == NULL)
return;
359 reinterpret_cast<UpdateFn1>(fn)(agg_fn_ctx,
363 reinterpret_cast<UpdateFn2>(fn)(agg_fn_ctx,
367 reinterpret_cast<UpdateFn3>(fn)(agg_fn_ctx,
372 reinterpret_cast<UpdateFn4>(fn)(agg_fn_ctx,
377 reinterpret_cast<UpdateFn5>(fn)(agg_fn_ctx,
383 reinterpret_cast<UpdateFn6>(fn)(agg_fn_ctx,
389 reinterpret_cast<UpdateFn7>(fn)(agg_fn_ctx,
396 reinterpret_cast<UpdateFn8>(fn)(agg_fn_ctx,
404 DCHECK(
false) <<
"NYI";
424 if (fn == NULL && src == dst)
return;
429 void* src_slot = NULL;
441 switch (dst_slot_desc->
type().
type) {
445 SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
451 SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
457 SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
463 SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
469 SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
475 SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
481 SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
488 SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
494 SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
500 SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
504 DCHECK(
false) <<
"NYI";
511 for (
int i = 0; i < exprs.size(); ++i) {
512 out << (i == 0 ?
"" :
" ") << exprs[i]->
DebugString();
520 out <<
"AggFnEvaluator(op=" <<
agg_op_;
void(* UpdateFn8)(FunctionContext *, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, AnyVal *)
AnyVal * CreateAnyVal(ObjectPool *pool, const ColumnType &type)
Creates the corresponding AnyVal subclass for type. The object is added to the pool.
void Update(FunctionContext *agg_fn_ctx, TupleRow *row, Tuple *dst, void *fn)
Status Prepare(RuntimeState *state, const RowDescriptor &desc, const SlotDescriptor *intermediate_slot_desc, const SlotDescriptor *output_slot_desc, MemPool *agg_fn_pool, FunctionContext **agg_fn_ctx)
static FunctionContext::TypeDesc ColumnTypeToTypeDesc(const ColumnType &type)
static Status CreateTreeFromThrift(ObjectPool *pool, const std::vector< TExprNode > &nodes, Expr *parent, int *node_idx, Expr **root_expr, ExprContext **ctx)
void SetNull(const NullIndicatorOffset &offset)
const ColumnType & intermediate_type() const
impala::FunctionContextImpl * impl()
TODO: Add mechanism for UDAs to update stats similar to runtime profile counters. ...
static Status Open(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for opening multiple expr trees.
void(* UpdateFn4)(FunctionContext *, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, AnyVal *)
A tuple with 0 materialised slots is represented as NULL.
void SetConstantArgs(const std::vector< impala_udf::AnyVal * > &constant_args)
Sets constant_args_. The AnyVal* values are owned by the caller.
void SerializeOrFinalize(FunctionContext *agg_fn_ctx, Tuple *src, const SlotDescriptor *dst_slot_desc, Tuple *dst, void *fn)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
std::string DebugString() const
impala_udf::AnyVal * staging_intermediate_val_
std::vector< ExprContext * > input_expr_ctxs_
AnyVal(* GetValueFn)(FunctionContext *, const AnyVal &)
void * GetSlot(int offset)
const bool is_merge_
Indicates whether to Update() or Merge()
static Status Create(ObjectPool *pool, const TExpr &desc, AggFnEvaluator **result)
AnyVal(* FinalizeFn)(FunctionContext *, const AnyVal &)
This object has a compatible storage format with boost::ptime.
Status Open(RuntimeState *state, FunctionContext *agg_fn_ctx)
const NullIndicatorOffset & null_indicator_offset() const
impala_udf::AnyVal * staging_merge_input_val_
std::vector< impala_udf::AnyVal * > staging_input_vals_
void Merge(FunctionContext *agg_fn_ctx, Tuple *src, Tuple *dst)
AggFnEvaluator(const TExprNode &desc, bool is_analytic_fn)
Use Create() instead.
void Close(RuntimeState *state)
static void Close(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for closing multiple expr trees.
bool IsNull(const NullIndicatorOffset &offset) const
ObjectPool * obj_pool()
Returns a local object pool.
MemTracker * mem_tracker()
StringVal(* SerializeFn)(FunctionContext *, const StringVal &)
const ColumnType & type() const
const bool is_analytic_fn_
Indicates which functions must be loaded.
ObjectPool * obj_pool() const
void Init(FunctionContext *agg_fn_ctx, Tuple *dst)
Functions for different phases of the aggregation.
void set_num_removes(int64_t n)
function< void(int64_t, int64_t, AtomicInt< int > *)> Fn
int GetByteSize() const
Returns the byte size of this type. Returns 0 for variable length types.
const SlotDescriptor * intermediate_slot_desc_
Slot into which Update()/Merge()/Serialize() write their result. Not owned.
void SetDstSlot(FunctionContext *ctx, const impala_udf::AnyVal *src, const SlotDescriptor *dst_slot_desc, Tuple *dst)
Writes the result in src into dst pointed to by dst_slot_desc.
void set_num_updates(int64_t n)
int len
Only set if type == TYPE_CHAR or type == TYPE_VARCHAR.
static void Write(const void *value, Tuple *tuple, const SlotDescriptor *slot_desc, MemPool *pool)
This is the superclass of all expr evaluation nodes.
static LibCache * instance()
AggregationOp agg_op_
The enum for some of the builtins that still require special cased logic.
void(* UpdateFn6)(FunctionContext *, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, AnyVal *)
void(* UpdateFn7)(FunctionContext *, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, AnyVal *)
void(* UpdateFn2)(FunctionContext *, const AnyVal &, const AnyVal &, AnyVal *)
static TimestampValue FromTimestampVal(const impala_udf::TimestampVal &udf_value)
void(* InitFn)(FunctionContext *, AnyVal *)
void * init_fn_
Function ptrs for the different phases of the aggregate function.
const SlotDescriptor * output_slot_desc_
static impala_udf::FunctionContext * CreateContext(RuntimeState *state, MemPool *pool, const impala_udf::FunctionContext::TypeDesc &return_type, const std::vector< impala_udf::FunctionContext::TypeDesc > &arg_types, int varargs_buffer_size=0, bool debug=false)
Create a FunctionContext for a UDF. Caller is responsible for deleting it.
static StringValue FromStringVal(const impala_udf::StringVal &sv)
static char * CharSlotToPtr(void *slot, const ColumnType &type)
void(* UpdateFn0)(FunctionContext *, AnyVal *)
void(* UpdateFn3)(FunctionContext *, const AnyVal &, const AnyVal &, const AnyVal &, AnyVal *)
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
void DecrementUseCount(LibCacheEntry *entry)
See comment in GetSoFunctionPtr().
void(* UpdateFn1)(FunctionContext *, const AnyVal &, AnyVal *)
void SetError(const char *error_msg)
LibCache::LibCacheEntry * cache_entry_
Cache entry for the library containing the function ptrs.
static void SetAnyVal(const void *slot, const ColumnType &type, AnyVal *dst)
Utility to put val into an AnyVal struct.
void(* UpdateFn5)(FunctionContext *, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, const AnyVal &, AnyVal *)
static void SetAnyVal(const SlotDescriptor *desc, Tuple *tuple, AnyVal *dst)
void SetNotNull(const NullIndicatorOffset &offset)
Turn null indicator bit off.