16 #ifndef IMPALA_EXPRS_AGG_FN_EVALUATOR_H
17 #define IMPALA_EXPRS_AGG_FN_EVALUATOR_H
21 #include <boost/scoped_ptr.hpp>
30 #include "gen-cpp/Exprs_types.h"
31 #include "gen-cpp/PlanNodes_types.h"
32 #include "gen-cpp/Types_types.h"
34 using namespace impala_udf;
38 class AggregationNode;
114 bool is_count_star()
const {
return agg_op_ == COUNT && input_expr_ctxs_.empty(); }
115 bool is_builtin()
const {
return fn_.binary_type == TFunctionBinaryType::BUILTIN; }
118 const std::string&
fn_name()
const {
return fn_.name.function_name; }
119 const std::string&
update_symbol()
const {
return fn_.aggregate_fn.update_fn_symbol; }
120 const std::string&
merge_symbol()
const {
return fn_.aggregate_fn.merge_fn_symbol; }
122 static std::string
DebugString(
const std::vector<AggFnEvaluator*>& exprs);
152 static void Init(
const std::vector<AggFnEvaluator*>& evaluators,
153 const std::vector<FunctionContext*>& fn_ctxs,
Tuple* dst);
154 static void Add(
const std::vector<AggFnEvaluator*>& evaluators,
155 const std::vector<FunctionContext*>& fn_ctxs,
TupleRow* src,
Tuple* dst);
156 static void Remove(
const std::vector<AggFnEvaluator*>& evaluators,
157 const std::vector<FunctionContext*>& fn_ctxs,
TupleRow* src,
Tuple* dst);
158 static void Serialize(
const std::vector<AggFnEvaluator*>& evaluators,
159 const std::vector<FunctionContext*>& fn_ctxs,
Tuple* dst);
160 static void GetValue(
const std::vector<AggFnEvaluator*>& evaluators,
161 const std::vector<FunctionContext*>& fn_ctxs,
Tuple* src,
Tuple* dst);
162 static void Finalize(
const std::vector<AggFnEvaluator*>& evaluators,
163 const std::vector<FunctionContext*>& fn_ctxs,
Tuple* src,
Tuple* dst);
238 inline void AggFnEvaluator::Add(
241 Update(agg_fn_ctx, row, dst, is_merge() ? merge_fn_ : update_fn_);
243 inline void AggFnEvaluator::Remove(
246 Update(agg_fn_ctx, row, dst, remove_fn_);
248 inline void AggFnEvaluator::Serialize(
250 SerializeOrFinalize(agg_fn_ctx, tuple, intermediate_slot_desc_, tuple, serialize_fn_);
252 inline void AggFnEvaluator::Finalize(
254 SerializeOrFinalize(agg_fn_ctx, src, output_slot_desc_, dst, finalize_fn_);
256 inline void AggFnEvaluator::GetValue(
258 SerializeOrFinalize(agg_fn_ctx, src, output_slot_desc_, dst, get_value_fn_);
261 inline void AggFnEvaluator::Init(
const std::vector<AggFnEvaluator*>& evaluators,
262 const std::vector<FunctionContext*>& fn_ctxs,
Tuple* dst) {
263 DCHECK_EQ(evaluators.size(), fn_ctxs.size());
264 for (
int i = 0; i < evaluators.size(); ++i) {
265 evaluators[i]->Init(fn_ctxs[i], dst);
268 inline void AggFnEvaluator::Add(
const std::vector<AggFnEvaluator*>& evaluators,
269 const std::vector<FunctionContext*>& fn_ctxs,
TupleRow* src,
Tuple* dst) {
270 DCHECK_EQ(evaluators.size(), fn_ctxs.size());
271 for (
int i = 0; i < evaluators.size(); ++i) {
272 evaluators[i]->Add(fn_ctxs[i], src, dst);
275 inline void AggFnEvaluator::Remove(
const std::vector<AggFnEvaluator*>& evaluators,
276 const std::vector<FunctionContext*>& fn_ctxs,
TupleRow* src,
Tuple* dst) {
277 DCHECK_EQ(evaluators.size(), fn_ctxs.size());
278 for (
int i = 0; i < evaluators.size(); ++i) {
279 evaluators[i]->Remove(fn_ctxs[i], src, dst);
282 inline void AggFnEvaluator::Serialize(
const std::vector<AggFnEvaluator*>& evaluators,
283 const std::vector<FunctionContext*>& fn_ctxs,
Tuple* dst) {
284 DCHECK_EQ(evaluators.size(), fn_ctxs.size());
285 for (
int i = 0; i < evaluators.size(); ++i) {
286 evaluators[i]->Serialize(fn_ctxs[i], dst);
289 inline void AggFnEvaluator::GetValue(
const std::vector<AggFnEvaluator*>& evaluators,
290 const std::vector<FunctionContext*>& fn_ctxs,
Tuple* src,
Tuple* dst) {
291 DCHECK_EQ(evaluators.size(), fn_ctxs.size());
292 for (
int i = 0; i < evaluators.size(); ++i) {
293 evaluators[i]->GetValue(fn_ctxs[i], src, dst);
296 inline void AggFnEvaluator::Finalize(
const std::vector<AggFnEvaluator*>& evaluators,
297 const std::vector<FunctionContext*>& fn_ctxs,
Tuple* src,
Tuple* dst) {
298 DCHECK_EQ(evaluators.size(), fn_ctxs.size());
299 for (
int i = 0; i < evaluators.size(); ++i) {
300 evaluators[i]->Finalize(fn_ctxs[i], src, dst);
const ColumnType & intermediate_type() const
impala::FunctionContextImpl * impl()
TODO: Add mechanism for UDAs to update stats similar to runtime profile counters. ...
A tuple with 0 materialised slots is represented as NULL.
impala_udf::AnyVal * staging_intermediate_val_
std::vector< ExprContext * > input_expr_ctxs_
const std::string & merge_symbol() const
bool is_count_star() const
const bool is_merge_
Indicates whether to Update() or Merge()
impala_udf::AnyVal * staging_merge_input_val_
AggregationOp agg_op() const
std::vector< impala_udf::AnyVal * > staging_input_vals_
void IncrementNumUpdates(int64_t n=1)
const bool is_analytic_fn_
Indicates which functions must be loaded.
std::string DebugString(const T &val)
const SlotDescriptor * intermediate_slot_desc_
Slot into which Update()/Merge()/Serialize() write their result. Not owned.
const std::string & fn_name() const
AggregationOp agg_op_
The enum for some of the builtins that still require special cased logic.
void * init_fn_
Function ptrs for the different phases of the aggregate function.
const std::string & update_symbol() const
const SlotDescriptor * output_slot_desc_
bool SupportsSerialize() const
bool SupportsRemove() const
LibCache::LibCacheEntry * cache_entry_
Cache entry for the library containing the function ptrs.
void IncrementNumRemoves(int64_t n=1)
const std::vector< ExprContext * > & input_expr_ctxs() const