AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode,
    const DescriptorTbl& descs)
  : // ... (base-class and other member initializers elided)
    window_(tnode.analytic_node.window),
    intermediate_tuple_desc_(
        descs.GetTupleDescriptor(tnode.analytic_node.intermediate_tuple_id)),
    result_tuple_desc_(
        descs.GetTupleDescriptor(tnode.analytic_node.output_tuple_id)),
    buffered_tuple_desc_(NULL),
    partition_by_eq_expr_ctx_(NULL),
    order_by_eq_expr_ctx_(NULL),
    rows_start_offset_(0),
    has_first_val_null_offset_(false),
    first_val_null_offset_(0),
    prev_pool_last_result_idx_(-1),
    prev_pool_last_window_idx_(-1),
    dummy_result_tuple_(NULL),
    curr_partition_idx_(-1),
    prev_input_row_(NULL),
    evaluation_timer_(NULL) {
  if (tnode.analytic_node.__isset.buffered_tuple_id) {
    buffered_tuple_desc_ = descs.GetTupleDescriptor(
        tnode.analytic_node.buffered_tuple_id);
  }
  if (!tnode.analytic_node.__isset.window) {
    // ...
  } else if (tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) {
    DCHECK(!window_.__isset.window_start)
        << "RANGE windows must have UNBOUNDED PRECEDING";
    DCHECK(!window_.__isset.window_end ||
        window_.window_end.type == TAnalyticWindowBoundaryType::CURRENT_ROW)
        << "RANGE window end bound must be CURRENT ROW or UNBOUNDED FOLLOWING";
  } else {
    DCHECK_EQ(tnode.analytic_node.window.type, TAnalyticWindowType::ROWS);
    // ...
    if (window_.__isset.window_start) {
      TAnalyticWindowBoundary b = window_.window_start;
      if (b.__isset.rows_offset_value) {
        rows_start_offset_ = b.rows_offset_value;
        if (b.type == TAnalyticWindowBoundaryType::PRECEDING) rows_start_offset_ *= -1;
      } else {
        DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::CURRENT_ROW);
      }
    }
    if (window_.__isset.window_end) {
      TAnalyticWindowBoundary b = window_.window_end;
      if (b.__isset.rows_offset_value) {
        rows_end_offset_ = b.rows_offset_value;
        if (b.type == TAnalyticWindowBoundaryType::PRECEDING) rows_end_offset_ *= -1;
      } else {
        DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::CURRENT_ROW);
      }
    }
  }
}
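
// Note on the offsets above (a reading of this code, not a comment from the original
// source): offsets for PRECEDING bounds are stored negated, so a window such as
// "ROWS BETWEEN 2 PRECEDING AND CURRENT ROW" would presumably end up with
// rows_start_offset_ = -2 and rows_end_offset_ = 0, letting a row's window bounds be
// computed by adding the offsets to its stream index.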

// In AnalyticEvalNode::Init():
  const TAnalyticNode& analytic_node = tnode.analytic_node;
  bool has_lead_fn = false;
  for (int i = 0; i < analytic_node.analytic_functions.size(); ++i) {
    AggFnEvaluator* evaluator;
    RETURN_IF_ERROR(AggFnEvaluator::Create(
        pool_, analytic_node.analytic_functions[i], true, &evaluator));
    evaluators_.push_back(evaluator);
    const TFunction& fn = analytic_node.analytic_functions[i].nodes[0].fn;
    is_lead_fn_.push_back("lead" == fn.name.function_name);
    has_lead_fn = has_lead_fn || is_lead_fn_.back();
  }
  DCHECK(!has_lead_fn || !window_.__isset.window_start);
  DCHECK(window_.__isset.window_end || !window_.__isset.window_start)
      << "UNBOUNDED FOLLOWING is only supported with UNBOUNDED PRECEDING.";
  if (analytic_node.__isset.partition_by_eq) {
    DCHECK(analytic_node.__isset.buffered_tuple_id);
    // ... (presumably sets up partition_by_eq_expr_ctx_)
  }
  if (analytic_node.__isset.order_by_eq) {
    DCHECK(analytic_node.__isset.buffered_tuple_id);
    // ... (presumably sets up order_by_eq_expr_ctx_)
  }

// In AnalyticEvalNode::Prepare():
  vector<TTupleId> tuple_ids;
  tuple_ids.push_back(child(0)->row_desc().tuple_descriptors()[0]->id());

// In DebugWindowBoundString(const TAnalyticWindowBoundary& b):
  if (b.type == TAnalyticWindowBoundaryType::CURRENT_ROW) {
    return "CURRENT_ROW";
  }
  if (b.__isset.rows_offset_value) {
    ss << b.rows_offset_value;
  } else {
    DCHECK(false) << "Range offsets not yet implemented";
  }
  if (b.type == TAnalyticWindowBoundaryType::PRECEDING) {
    ss << " PRECEDING";
  } else {
    DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::FOLLOWING);
    ss << " FOLLOWING";
  }
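
// For illustration (the " PRECEDING"/" FOLLOWING" suffixes above are reconstructed,
// so this is an assumption): a bound of 2 rows PRECEDING would print as
// "2 PRECEDING", 3 rows FOLLOWING as "3 FOLLOWING", and a CURRENT ROW bound returns
// "CURRENT_ROW".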

// In AnalyticEvalNode::DebugWindowString():
  if (window_.__isset.window_start) {
    ss << DebugWindowBoundString(window_.window_start);
  } else {
    ss << "UNBOUNDED_PRECEDING";
  }
  if (window_.__isset.window_end) {
    ss << DebugWindowBoundString(window_.window_end);
  } else {
    ss << "UNBOUNDED_FOLLOWING";
  }

// In AnalyticEvalNode::DebugStateString():
  // Per-row state, included when 'detailed' is set:
  ss << " result_tuples idx: [";
  for (list<pair<int64_t, Tuple*> >::const_iterator it = result_tuples_.begin();
      it != result_tuples_.end(); ++it) {
    // ...
  }
  ss << " window_tuples idx: [";
  for (list<pair<int64_t, Tuple*> >::const_iterator it = window_tuples_.begin();
      it != window_tuples_.end(); ++it) {
    // ...
  }
  // Summary ranges otherwise:
  if (window_tuples_.empty()) {
    ss << " window_tuples empty";
  } else {
    ss << " window_tuples idx range: (" << window_tuples_.front().first << ","
       << window_tuples_.back().first << ")";
  }
  if (result_tuples_.empty()) {
    ss << " result_tuples empty";
  } else {
    ss << " result_tuples idx range: (" << result_tuples_.front().first << ","
       << result_tuples_.back().first << ")";
  }

// In AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row):
  VLOG_ROW << id() << " Update idx=" << stream_idx;
  // ...
  if (window_.__isset.window_start) {
    VLOG_ROW << id() << " Adding tuple to window at idx=" << stream_idx;
    // ...
    window_tuples_.push_back(pair<int64_t, Tuple*>(stream_idx, tuple));
  }
  // ...
  VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx;

// In AnalyticEvalNode::AddResultTuple(int64_t stream_idx):
  VLOG_ROW << id() << " AddResultTuple idx=" << stream_idx;
  // ...
  result_tuples_.push_back(pair<int64_t, Tuple*>(stream_idx, result_tuple));
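
// Assumption based on the surrounding code: result_tuples_ and window_tuples_ are
// lists of (stream index, Tuple*) pairs appended in ascending index order, so the
// front of each list holds the oldest buffered entry and the back the most recent
// (consistent with the "idx range: (front, back)" output in DebugStateString()).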

void AnalyticEvalNode::TryAddResultTupleForPrevRow(bool next_partition,
    int64_t stream_idx, TupleRow* row) {
  // ...
  VLOG_ROW << id() << " TryAddResultTupleForPrevRow partition=" << next_partition
           << " idx=" << stream_idx;

// In AnalyticEvalNode::TryAddResultTupleForCurrRow(int64_t stream_idx, TupleRow* row):
  VLOG_ROW << id() << " TryAddResultTupleForCurrRow idx=" << stream_idx;

// In AnalyticEvalNode::TryRemoveRowsBeforeWindow(int64_t stream_idx):
  VLOG_ROW << id() << " Remove idx=" << remove_idx << " stream_idx=" << stream_idx;

void AnalyticEvalNode::TryAddRemainingResults(int64_t partition_idx,
    int64_t prev_partition_idx) {
  DCHECK_LT(prev_partition_idx, partition_idx);
  // ...
  VLOG_ROW << id() << " TryAddRemainingResults prev_partition_idx=" << prev_partition_idx
      /* ... */;
  // ...
  for (int64_t next_result_idx = last_result_idx_ + 1; next_result_idx < partition_idx;
      ++next_result_idx) {
    // ...
        << " for result row at idx=" << next_result_idx;
    // ...
  }

// In AnalyticEvalNode::InitNextPartition(int64_t stream_idx):
  VLOG_FILE << id() << " InitNextPartition idx=" << stream_idx;
  // ...
  Tuple* prev_partition_last_result_tuple = NULL;
  // ...
      window_.window_end.type == TAnalyticWindowBoundaryType::PRECEDING);
  VLOG_ROW << id() << " Removing result past partition idx: " /* ... */;
  // ...
  if (prev_partition_last_result_tuple != NULL) {
    // ...
        prev_partition_last_result_tuple));
    // ...
    VLOG_ROW << id() << " After removing results past partition: " /* ... */;
  }
  // ...
      window_.window_end.type == TAnalyticWindowBoundaryType::FOLLOWING)) {
  // ...
      window_.window_end.type == TAnalyticWindowBoundaryType::PRECEDING) {

// In AnalyticEvalNode::PrevRowCompare(ExprContext* pred_ctx):
  DCHECK(pred_ctx != NULL);

// In AnalyticEvalNode::ProcessChildBatch():
  bool next_partition = false;
  // ...
  VLOG_FILE << id() << " Transfer resources from curr to prev pool at idx: "
            << stream_idx << ", stores tuples with last result idx: "
            << prev_pool_last_result_idx_ << " last window idx: "
            << prev_pool_last_window_idx_;

// In AnalyticEvalNode::GetNextOutputBatch():
  for (int i = 0; i < input_batch.num_rows(); ++i) {
    // ...
    input_batch.CopyRow(input_batch.GetRow(i), dest);
    // ...
  }
  // ...
  input_batch.TransferResourceOwnership(output_batch);

// In AnalyticEvalNode::NumOutputRowsReady():
  DCHECK_GE(rows_to_return, 0);
  return rows_to_return;

// In AnalyticEvalNode::GetNext():
  bool output_eos = false;
  // ...
      prev_pool_last_result_idx_ < input_stream_->rows_returned() &&
      // ...
  VLOG_FILE << id() << " Transfer prev pool to output batch, " /* ... */;
  // ...
  prev_pool_last_window_idx_ = -1;

// In AnalyticEvalNode::Reset() (not yet implemented):
  DCHECK(false) << "NYI";

// In AnalyticEvalNode::DebugString():
  *out << string(indentation_level * 2, ' ');
  *out << "AnalyticEvalNode(" /* ... */;