17 #include <boost/scoped_ptr.hpp>
25 #include "gen-cpp/PlanNodes_types.h"
29 using namespace impala;
31 DEFINE_int32(exchg_node_buffer_size_bytes, 1024 * 1024 * 10,
32 "(Advanced) Maximum size of per-query receive-side buffer");
39 input_row_desc_(descs, tnode.exchange_node.input_row_tuples,
41 tnode.nullable_tuples.begin(),
42 tnode.nullable_tuples.begin() + tnode.exchange_node.input_row_tuples.size())),
44 is_merging_(tnode.exchange_node.__isset.sort_info),
45 offset_(tnode.exchange_node.__isset.
offset ? tnode.exchange_node.
offset : 0),
46 num_rows_skipped_(0) {
57 nulls_first_ = tnode.exchange_node.sort_info.nulls_first;
94 DCHECK(
false) <<
"NYI";
114 <<
" #rows=" << (input_batch_ != NULL ? input_batch_->num_rows() : 0)
115 <<
" is_cancelled=" << (ret_status.
IsCancelled() ?
"true" :
"false")
143 int j = output_batch->
AddRow();
176 DCHECK_EQ(output_batch->
num_rows(), 0);
183 if (rows_to_keep > 0) {
184 output_batch->
CopyRows(0, output_batch->
num_rows() - rows_to_keep, rows_to_keep);
189 if (rows_to_keep > 0 || *eos || output_batch->
AtCapacity())
break;
208 *out << string(indentation_level * 2,
' ');
boost::shared_ptr< DataStreamRecvr > CreateRecvr(RuntimeState *state, const RowDescriptor &row_desc, const TUniqueId &fragment_instance_id, PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile *profile, bool is_merging)
boost::shared_ptr< DataStreamRecvr > stream_recvr_
int64_t num_rows_returned_
void ClearRow(TupleRow *row)
virtual Status Open(RuntimeState *state)
Blocks until the first batch is available for consumption via GetNext().
int64_t num_rows_skipped_
Number of rows skipped so far.
boost::scoped_ptr< RuntimeProfile > runtime_profile_
const RowDescriptor & row_desc() const
virtual Status Init(const TPlanNode &tnode)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
virtual Status Init(const TPlanNode &tnode)
TupleRow * GetRow(int row_idx)
virtual Status Prepare(RuntimeState *state)
RowDescriptor row_descriptor_
std::vector< bool > is_asc_order_
#define ADD_TIMER(profile, name)
RowDescriptor input_row_desc_
our input rows are a prefix of the rows we produce
void CopyRows(int dest, int src, int num_rows)
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
Status Open(RuntimeState *state)
Open all expressions used for sorting and tuple materialization.
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
MemTracker * expr_mem_tracker()
RuntimeProfile::Counter * total_network_receive_timer()
SortExecExprs sort_exec_exprs_
Sort expressions and parameters passed to the merging receiver..
Status FillInputRowBatch(RuntimeState *state)
ExchangeNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
bool IsPrefixOf(const RowDescriptor &other_desc) const
void set_num_rows(int num_rows)
#define RETURN_IF_CANCELLED(state)
virtual Status Reset(RuntimeState *state)
Status Init(const TSortInfo &sort_info, ObjectPool *pool)
Initialize the expressions from a TSortInfo using the specified pool.
virtual Status Prepare(RuntimeState *state)
const std::vector< ExprContext * > & lhs_ordering_expr_ctxs() const
Can only be used after calling Prepare()
const TUniqueId & fragment_instance_id() const
RuntimeProfile::Counter * convert_row_batch_timer_
time spent reconstructing received rows
void AddExprCtxsToFree(const std::vector< ExprContext * > &ctxs)
virtual Status QueryMaintenance(RuntimeState *state)
#define COUNTER_SET(c, v)
RuntimeProfile::Counter * rows_returned_counter_
static ExecEnv * GetInstance()
void CopyRow(TupleRow *src, TupleRow *dest)
uint8_t offset[7 *64-sizeof(uint64_t)]
DataStreamMgr * stream_mgr()
Status Prepare(RuntimeState *state, const RowDescriptor &child_row_desc, const RowDescriptor &output_row_desc, MemTracker *expr_mem_tracker)
Prepare all expressions used for sorting and tuple materialization.
Status GetNextMerging(RuntimeState *state, RowBatch *output_batch, bool *eos)
DEFINE_int32(periodic_counter_update_period_ms, 500,"Period to update rate counters and"" sampling counters in ms")
virtual Status Open(RuntimeState *state)
int64_t offset_
Offset specifying number of rows to skip.
virtual void Close(RuntimeState *state)
void Close(RuntimeState *state)
Close all expressions used for sorting and tuple materialization.
const std::vector< ExprContext * > & rhs_ordering_expr_ctxs() const
Can only be used after calling Open()
std::vector< bool > nulls_first_
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
virtual void Close(RuntimeState *state)
RuntimeProfile * runtime_profile()