Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
agg-fn-evaluator.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_EXPRS_AGG_FN_EVALUATOR_H
17 #define IMPALA_EXPRS_AGG_FN_EVALUATOR_H
18 
19 #include <string>
20 
21 #include <boost/scoped_ptr.hpp>
22 #include "common/status.h"
23 #include "runtime/descriptors.h"
24 #include "runtime/lib-cache.h"
25 #include "runtime/tuple-row.h"
26 #include "runtime/types.h"
27 #include "udf/udf.h"
28 #include "udf/udf-internal.h"
29 
30 #include "gen-cpp/Exprs_types.h"
31 #include "gen-cpp/PlanNodes_types.h"
32 #include "gen-cpp/Types_types.h"
33 
34 using namespace impala_udf;
35 
36 namespace impala {
37 
38 class AggregationNode;
39 class Expr;
40 class ExprContext;
41 class MemPool;
42 class MemTracker;
43 class ObjectPool;
44 class RowDescriptor;
45 class RuntimeState;
46 class SlotDescriptor;
47 class Tuple;
48 class TupleRow;
49 class TExprNode;
50 
54 //
59 //
63  public:
68  MIN,
69  MAX,
70  SUM,
71  AVG,
72  NDV,
74  };
75 
80  static Status Create(ObjectPool* pool, const TExpr& desc, AggFnEvaluator** result);
81 
85  static Status Create(ObjectPool* pool, const TExpr& desc, bool is_analytic_fn,
86  AggFnEvaluator** result);
87 
96  Status Prepare(RuntimeState* state, const RowDescriptor& desc,
97  const SlotDescriptor* intermediate_slot_desc,
98  const SlotDescriptor* output_slot_desc,
99  MemPool* agg_fn_pool, FunctionContext** agg_fn_ctx);
100 
101  ~AggFnEvaluator();
102 
106  Status Open(RuntimeState* state, FunctionContext* agg_fn_ctx);
107 
108  void Close(RuntimeState* state);
109 
// Read-only accessors. Each one returns state held by this evaluator without
// modifying it.

// Column type reported by the intermediate slot descriptor.
110  const ColumnType& intermediate_type() const { return intermediate_slot_desc_->type(); }
// Value of the is_merge_ flag. Add() uses it to choose between merge_fn_ and update_fn_.
111  bool is_merge() const { return is_merge_; }
// The AggregationOp stored in agg_op_.
112  AggregationOp agg_op() const { return agg_op_; }
// The input expression contexts, returned by const reference (no copy).
113  const std::vector<ExprContext*>& input_expr_ctxs() const { return input_expr_ctxs_; }
// True when the op is COUNT and there are no input expression contexts.
114  bool is_count_star() const { return agg_op_ == COUNT && input_expr_ctxs_.empty(); }
// True when the function's binary type is TFunctionBinaryType::BUILTIN.
115  bool is_builtin() const { return fn_.binary_type == TFunctionBinaryType::BUILTIN; }
// True when a remove function pointer is set (remove_fn_ is non-NULL).
116  bool SupportsRemove() const { return remove_fn_ != NULL; }
// True when a serialize function pointer is set (serialize_fn_ is non-NULL).
117  bool SupportsSerialize() const { return serialize_fn_ != NULL; }
// Function name taken from fn_.name.function_name.
118  const std::string& fn_name() const { return fn_.name.function_name; }
// Symbol of the update function, taken from fn_.aggregate_fn.update_fn_symbol.
119  const std::string& update_symbol() const { return fn_.aggregate_fn.update_fn_symbol; }
// Symbol of the merge function, taken from fn_.aggregate_fn.merge_fn_symbol.
120  const std::string& merge_symbol() const { return fn_.aggregate_fn.merge_fn_symbol; }
121 
122  static std::string DebugString(const std::vector<AggFnEvaluator*>& exprs);
123  std::string DebugString() const;
124 
126  void Init(FunctionContext* agg_fn_ctx, Tuple* dst);
127 
131  void Add(FunctionContext* agg_fn_ctx, TupleRow* src, Tuple* dst);
132 
135  void Remove(FunctionContext* agg_fn_ctx, TupleRow* src, Tuple* dst);
136 
140  void Merge(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst);
141 
142  void Serialize(FunctionContext* agg_fn_ctx, Tuple* dst);
143  void Finalize(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst);
144 
149  void GetValue(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst);
150 
152  static void Init(const std::vector<AggFnEvaluator*>& evaluators,
153  const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst);
154  static void Add(const std::vector<AggFnEvaluator*>& evaluators,
155  const std::vector<FunctionContext*>& fn_ctxs, TupleRow* src, Tuple* dst);
156  static void Remove(const std::vector<AggFnEvaluator*>& evaluators,
157  const std::vector<FunctionContext*>& fn_ctxs, TupleRow* src, Tuple* dst);
158  static void Serialize(const std::vector<AggFnEvaluator*>& evaluators,
159  const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst);
160  static void GetValue(const std::vector<AggFnEvaluator*>& evaluators,
161  const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst);
162  static void Finalize(const std::vector<AggFnEvaluator*>& evaluators,
163  const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst);
164 
173 
174  private:
175  const TFunction fn_;
177  const bool is_merge_;
179  const bool is_analytic_fn_;
180 
183 
187 
188  std::vector<ExprContext*> input_expr_ctxs_;
189 
192 
197  std::vector<impala_udf::AnyVal*> staging_input_vals_;
200 
203 
205  void* init_fn_;
206  void* update_fn_;
207  void* remove_fn_;
208  void* merge_fn_;
212 
214  AggFnEvaluator(const TExprNode& desc, bool is_analytic_fn);
215 
219 
224  void Update(FunctionContext* agg_fn_ctx, TupleRow* row, Tuple* dst, void* fn);
225 
230  void SerializeOrFinalize(FunctionContext* agg_fn_ctx, Tuple* src,
231  const SlotDescriptor* dst_slot_desc, Tuple* dst, void* fn);
232 
234  void SetDstSlot(FunctionContext* ctx, const impala_udf::AnyVal* src,
235  const SlotDescriptor* dst_slot_desc, Tuple* dst);
236 };
237 
238 inline void AggFnEvaluator::Add(
239  FunctionContext* agg_fn_ctx, TupleRow* row, Tuple* dst) {
240  agg_fn_ctx->impl()->IncrementNumUpdates();
241  Update(agg_fn_ctx, row, dst, is_merge() ? merge_fn_ : update_fn_);
242 }
243 inline void AggFnEvaluator::Remove(
244  FunctionContext* agg_fn_ctx, TupleRow* row, Tuple* dst) {
245  agg_fn_ctx->impl()->IncrementNumRemoves();
246  Update(agg_fn_ctx, row, dst, remove_fn_);
247 }
248 inline void AggFnEvaluator::Serialize(
249  FunctionContext* agg_fn_ctx, Tuple* tuple) {
250  SerializeOrFinalize(agg_fn_ctx, tuple, intermediate_slot_desc_, tuple, serialize_fn_);
251 }
252 inline void AggFnEvaluator::Finalize(
253  FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst) {
254  SerializeOrFinalize(agg_fn_ctx, src, output_slot_desc_, dst, finalize_fn_);
255 }
256 inline void AggFnEvaluator::GetValue(
257  FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst) {
258  SerializeOrFinalize(agg_fn_ctx, src, output_slot_desc_, dst, get_value_fn_);
259 }
260 
261 inline void AggFnEvaluator::Init(const std::vector<AggFnEvaluator*>& evaluators,
262  const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) {
263  DCHECK_EQ(evaluators.size(), fn_ctxs.size());
264  for (int i = 0; i < evaluators.size(); ++i) {
265  evaluators[i]->Init(fn_ctxs[i], dst);
266  }
267 }
268 inline void AggFnEvaluator::Add(const std::vector<AggFnEvaluator*>& evaluators,
269  const std::vector<FunctionContext*>& fn_ctxs, TupleRow* src, Tuple* dst) {
270  DCHECK_EQ(evaluators.size(), fn_ctxs.size());
271  for (int i = 0; i < evaluators.size(); ++i) {
272  evaluators[i]->Add(fn_ctxs[i], src, dst);
273  }
274 }
275 inline void AggFnEvaluator::Remove(const std::vector<AggFnEvaluator*>& evaluators,
276  const std::vector<FunctionContext*>& fn_ctxs, TupleRow* src, Tuple* dst) {
277  DCHECK_EQ(evaluators.size(), fn_ctxs.size());
278  for (int i = 0; i < evaluators.size(); ++i) {
279  evaluators[i]->Remove(fn_ctxs[i], src, dst);
280  }
281 }
282 inline void AggFnEvaluator::Serialize(const std::vector<AggFnEvaluator*>& evaluators,
283  const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) {
284  DCHECK_EQ(evaluators.size(), fn_ctxs.size());
285  for (int i = 0; i < evaluators.size(); ++i) {
286  evaluators[i]->Serialize(fn_ctxs[i], dst);
287  }
288 }
289 inline void AggFnEvaluator::GetValue(const std::vector<AggFnEvaluator*>& evaluators,
290  const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) {
291  DCHECK_EQ(evaluators.size(), fn_ctxs.size());
292  for (int i = 0; i < evaluators.size(); ++i) {
293  evaluators[i]->GetValue(fn_ctxs[i], src, dst);
294  }
295 }
296 inline void AggFnEvaluator::Finalize(const std::vector<AggFnEvaluator*>& evaluators,
297  const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) {
298  DCHECK_EQ(evaluators.size(), fn_ctxs.size());
299  for (int i = 0; i < evaluators.size(); ++i) {
300  evaluators[i]->Finalize(fn_ctxs[i], src, dst);
301  }
302 }
303 
304 }
305 
306 #endif
const ColumnType & intermediate_type() const
impala::FunctionContextImpl * impl()
TODO: Add mechanism for UDAs to update stats similar to runtime profile counters. ...
Definition: udf.h:202
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
impala_udf::AnyVal * staging_intermediate_val_
std::vector< ExprContext * > input_expr_ctxs_
const std::string & merge_symbol() const
const bool is_merge_
Indicates whether to Update() or Merge()
impala_udf::AnyVal * staging_merge_input_val_
AggregationOp agg_op() const
std::vector< impala_udf::AnyVal * > staging_input_vals_
PrimitiveType type
Definition: types.h:60
void IncrementNumUpdates(int64_t n=1)
Definition: udf-internal.h:90
const bool is_analytic_fn_
Indicates which functions must be loaded.
ObjectPool pool
std::string DebugString(const T &val)
Definition: udf-debug.h:27
const SlotDescriptor * intermediate_slot_desc_
Slot into which Update()/Merge()/Serialize() write their result. Not owned.
const std::string & fn_name() const
AggregationOp agg_op_
The enum for some of the builtins that still require special cased logic.
void * init_fn_
Function ptrs for the different phases of the aggregate function.
const std::string & update_symbol() const
const SlotDescriptor * output_slot_desc_
bool SupportsSerialize() const
bool SupportsRemove() const
LibCache::LibCacheEntry * cache_entry_
Cache entry for the library containing the function ptrs.
void IncrementNumRemoves(int64_t n=1)
Definition: udf-internal.h:91
const std::vector< ExprContext * > & input_expr_ctxs() const