Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
exec-node.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_EXEC_EXEC_NODE_H
17 #define IMPALA_EXEC_EXEC_NODE_H
18 
19 #include <vector>
20 #include <sstream>
21 
22 #include "common/status.h"
23 #include "runtime/descriptors.h" // for RowDescriptor
24 #include "util/runtime-profile.h"
25 #include "util/blocking-queue.h"
26 #include "gen-cpp/PlanNodes_types.h"
27 
28 namespace impala {
29 
// Forward declarations (incomplete types) used by the declarations below.
// NOTE(review): Expr and Counters are not referenced in the visible part of
// this listing — confirm whether they are still needed.
30 class Expr;
31 class ExprContext;
32 class ObjectPool;
33 class Counters;
34 class SortExecExprs;
35 class RowBatch;
36 class RuntimeState;
37 class TPlan;
38 class TupleRow;
39 class DataSink;
40 class MemTracker;
41 
// Abstract class for the nodes of a plan tree (GetNext() is pure virtual, so
// only subclasses can be instantiated). Each node keeps a vector of child
// ExecNode pointers, a list of conjunct ExprContexts, an optional row limit
// and per-node profile / memory-tracking objects.
//
// NOTE(review): this listing omits several declarations that the inline code
// below relies on (e.g. row_descriptor_, num_rows_returned_, is_closed_);
// consult the real header before editing.
46 class ExecNode {
47  public:
    // Constructs a node from its thrift description 'tnode'.
    // NOTE(review): how 'pool' and 'descs' are used is not visible here — see
    // exec-node.cc.
49  ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
50 
    // Virtual so that subclasses can be destroyed through an ExecNode*.
51  virtual ~ExecNode();
52 
    // Overridable initialization hook that receives the thrift plan node.
    // Returns a Status.
56  virtual Status Init(const TPlanNode& tnode);
57 
    // Overridable lifecycle hooks taking the RuntimeState; each returns a Status.
    // NOTE(review): presumably invoked in the order Prepare -> Open -> GetNext*
    // -> Close — confirm against exec-node.cc and callers.
65  virtual Status Prepare(RuntimeState* state);
66 
72  virtual Status Open(RuntimeState* state);
73 
    // Pure virtual: every concrete node must implement it. Results are passed
    // back through 'row_batch' and 'eos'.
    // NOTE(review): presumably '*eos' is set once no more rows remain — TODO
    // confirm.
87  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) = 0;
88 
97  virtual Status Reset(RuntimeState* state);
98 
    // Unlike the other lifecycle hooks, Close() returns void rather than Status.
112  virtual void Close(RuntimeState* state);
113 
    // Static entry point that takes a thrift TPlan and hands the resulting root
    // node back through 'root'.
    // NOTE(review): nodes are presumably placed in 'pool' (as CreateNode() is
    // described as doing) — confirm.
117  static Status CreateTree(ObjectPool* pool, const TPlan& plan,
118  const DescriptorTbl& descs, ExecNode** root);
119 
    // Set debug action for node with given id in 'tree'.
121  static void SetDebugOptions(int node_id, TExecNodePhase::type phase,
122  TDebugAction::type action, ExecNode* tree);
123 
    // Output goes into the caller-supplied vector 'nodes'.
    // NOTE(review): presumably appends every node of type 'node_type' in this
    // subtree — confirm.
126  void CollectNodes(TPlanNodeType::type node_type, std::vector<ExecNode*>* nodes);
127 
    // Collect all scan node types.
129  void CollectScanNodes(std::vector<ExecNode*>* nodes);
130 
    // Static helper operating on an array of 'num_ctxs' ExprContext pointers and
    // a single row.
    // NOTE(review): presumably true iff all conjuncts accept 'row' — confirm.
134  static bool EvalConjuncts(ExprContext* const* ctxs, int num_ctxs, TupleRow* row);
135 
    // Returns an llvm::Function* for the given conjunct contexts; 'name'
    // defaults to "EvalConjuncts".
138  static llvm::Function* CodegenEvalConjuncts(
139  RuntimeState* state, const std::vector<ExprContext*>& conjunct_ctxs,
140  const char* name = "EvalConjuncts");
141 
    // Returns a string representation in DFS order of the plan rooted at this.
143  std::string DebugString() const;
144 
    // Overridable variant that writes into the caller-supplied stream 'out'.
151  virtual void DebugString(int indentation_level, std::stringstream* out) const;
    // Read-only accessors for the corresponding members.
152  const std::vector<ExprContext*>& conjunct_ctxs() const { return conjunct_ctxs_; }
153 
154  int id() const { return id_; }
155  TPlanNodeType::type type() const { return type_; }
156  const RowDescriptor& row_desc() const { return row_descriptor_; }
157  int64_t rows_returned() const { return num_rows_returned_; }
158  int64_t limit() const { return limit_; }
    // True only when a limit is set (limit_ != -1) and the number of rows
    // returned so far has reached or exceeded it.
159  bool ReachedLimit() { return limit_ != -1 && num_rows_returned_ >= limit_; }
160 
    // Returns the raw pointer held by mem_tracker_ (a boost::scoped_ptr member
    // of this node).
162  MemTracker* mem_tracker() { return mem_tracker_.get(); }
164 
    // Extract node id from p->name().
166  static int GetNodeIdFromProfile(RuntimeProfile* p);
167 
    // Names of counters shared by all exec nodes.
169  static const std::string ROW_THROUGHPUT_COUNTER;
170 
171  protected:
    // Grants DataSink access to the protected and private members of ExecNode.
172  friend class DataSink;
173 
    // A BlockingQueue of RowBatch pointers with an additional cleanup list.
    // NOTE(review): the page index lists a 'SpinLock lock_' member ("Lock
    // protecting cleanup_queue_") at line 203 that is not shown in this listing.
181  class RowBatchQueue : public BlockingQueue<RowBatch*> {
182  public:
185  RowBatchQueue(int max_batches);
186  ~RowBatchQueue();
187 
    // Adds a batch to the queue. This is blocking if the queue is full.
189  void AddBatch(RowBatch* batch);
190 
    // NOTE(review): null-return / blocking behaviour is not visible here — see
    // exec-node.cc.
194  RowBatch* GetBatch();
195 
    // NOTE(review): meaning of the int return value is not visible here —
    // confirm in exec-node.cc.
199  int Cleanup();
200 
201  private:
204 
    // Queue of orphaned row batches.
206  std::list<RowBatch*> cleanup_queue_;
207  };
208 
209  int id_; // unique w/in single plan tree
210  TPlanNodeType::type type_;
212  std::vector<ExprContext*> conjunct_ctxs_;
213 
    // Child nodes, accessed by index via child().
214  std::vector<ExecNode*> children_;
216 
    // Debug phase/action for this node; see SetDebugOptions() and
    // ExecDebugAction().
219  TExecNodePhase::type debug_phase_;
220  TDebugAction::type debug_action_;
221 
222  int64_t limit_; // -1: no limit
224 
225  boost::scoped_ptr<RuntimeProfile> runtime_profile_;
228 
    // Account for peak memory used by this node.
230  boost::scoped_ptr<MemTracker> mem_tracker_;
231 
    // MemTracker that should be used for ExprContexts.
233  boost::scoped_ptr<MemTracker> expr_mem_tracker_;
234 
    // NOTE(review): presumably guards runtime_exec_options_ (declared at line
    // 239 according to the page index, but omitted from this listing) — confirm.
238  boost::mutex exec_options_lock_;
240 
    // Returns the i-th child; uses operator[], so 'i' is not bounds-checked.
241  ExecNode* child(int i) { return children_[i]; }
242  bool is_closed() { return is_closed_; }
243 
    // Create a single exec node derived from thrift node; place exec node in
    // 'pool'.
245  static Status CreateNode(ObjectPool* pool, const TPlanNode& tnode,
246  const DescriptorTbl& descs, ExecNode** node);
247 
    // Helper for CreateTree() operating on a flat vector of thrift nodes;
    // 'node_idx' is passed by pointer.
    // NOTE(review): presumably advanced as nodes are consumed — confirm.
248  static Status CreateTreeHelper(ObjectPool* pool, const std::vector<TPlanNode>& tnodes,
249  const DescriptorTbl& descs, ExecNode* parent, int* node_idx, ExecNode** root);
250 
    // Returns false in this class; virtual so subclasses can override.
251  virtual bool IsScanNode() const { return false; }
252 
253  void InitRuntimeProfile(const std::string& name);
254 
    // Takes the phase being executed and the RuntimeState; returns a Status.
    // NOTE(review): presumably compares 'phase' with debug_phase_ and applies
    // debug_action_ — confirm.
257  Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState* state);
258 
    // Appends option to 'runtime_exec_options_'.
260  void AddRuntimeExecOption(const std::string& option);
261 
265  //
    // Overridable maintenance hook taking the RuntimeState; returns a Status.
269  virtual Status QueryMaintenance(RuntimeState* state);
270 
    // Register expr contexts in expr_ctxs_to_free_. The single-context overload
    // appends 'ctx' directly; the other two are defined out of line.
276  void AddExprCtxToFree(ExprContext* ctx) { expr_ctxs_to_free_.push_back(ctx); }
277  void AddExprCtxsToFree(const std::vector<ExprContext*>& ctxs);
278  void AddExprCtxsToFree(const SortExecExprs& sort_exec_exprs);
279 
280  private:
284 
    // Expr contexts whose local allocations are safe to free in the main
    // execution thread.
286  std::vector<ExprContext*> expr_ctxs_to_free_;
287 };
288 
289 }
290 #endif
291 
int id() const
Definition: exec-node.h:154
void AddRuntimeExecOption(const std::string &option)
Appends option to 'runtime_exec_options_'.
Definition: exec-node.cc:188
void CollectNodes(TPlanNodeType::type node_type, std::vector< ExecNode * > *nodes)
Definition: exec-node.cc:359
void CollectScanNodes(std::vector< ExecNode * > *nodes)
Collect all scan node types.
Definition: exec-node.cc:366
int64_t num_rows_returned_
Definition: exec-node.h:223
std::vector< ExprContext * > expr_ctxs_to_free_
Expr contexts whose local allocations are safe to free in the main execution thread.
Definition: exec-node.h:286
MemTracker * mem_tracker()
Definition: exec-node.h:162
static int GetNodeIdFromProfile(RuntimeProfile *p)
Extract node id from p->name().
Definition: exec-node.cc:62
boost::scoped_ptr< RuntimeProfile > runtime_profile_
Definition: exec-node.h:225
boost::scoped_ptr< MemTracker > expr_mem_tracker_
MemTracker that should be used for ExprContexts.
Definition: exec-node.h:233
void InitRuntimeProfile(const std::string &name)
Definition: exec-node.cc:371
virtual Status Init(const TPlanNode &tnode)
Definition: exec-node.cc:124
Lightweight spinlock.
Definition: spinlock.h:24
RowDescriptor row_descriptor_
Definition: exec-node.h:215
static Status CreateTree(ObjectPool *pool, const TPlan &plan, const DescriptorTbl &descs, ExecNode **root)
Definition: exec-node.cc:199
const RowDescriptor & row_desc() const
Definition: exec-node.h:156
RowBatchQueue(int max_batches)
Definition: exec-node.cc:66
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
Definition: exec-node.cc:378
bool ReachedLimit()
Definition: exec-node.h:159
const std::vector< ExprContext * > & conjunct_ctxs() const
Definition: exec-node.h:152
void AddExprCtxToFree(ExprContext *ctx)
Definition: exec-node.h:276
int64_t limit_
Definition: exec-node.h:222
boost::mutex exec_options_lock_
Definition: exec-node.h:238
TPlanNodeType::type type() const
Definition: exec-node.h:155
ExecNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
Init conjuncts.
Definition: exec-node.cc:106
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
Definition: exec-node.cc:345
Superclass of all data sinks.
Definition: data-sink.h:39
MemTracker * expr_mem_tracker()
Definition: exec-node.h:163
virtual ~ExecNode()
Definition: exec-node.cc:121
static llvm::Function * CodegenEvalConjuncts(RuntimeState *state, const std::vector< ExprContext * > &conjunct_ctxs, const char *name="EvalConjuncts")
Definition: exec-node.cc:452
virtual bool IsScanNode() const
Definition: exec-node.h:251
TExecNodePhase::type debug_phase_
Definition: exec-node.h:219
ObjectPool pool
virtual Status Prepare(RuntimeState *state)
Definition: exec-node.cc:130
static Status CreateNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs, ExecNode **node)
Create a single exec node derived from thrift node; place exec node in 'pool'.
Definition: exec-node.cc:260
void AddExprCtxsToFree(const std::vector< ExprContext * > &ctxs)
virtual Status QueryMaintenance(RuntimeState *state)
Definition: exec-node.cc:401
TDebugAction::type debug_action_
Definition: exec-node.h:220
static const std::string ROW_THROUGHPUT_COUNTER
Names of counters shared by all exec nodes.
Definition: exec-node.h:169
static Status CreateTreeHelper(ObjectPool *pool, const std::vector< TPlanNode > &tnodes, const DescriptorTbl &descs, ExecNode *parent, int *node_idx, ExecNode **root)
Definition: exec-node.cc:218
bool is_closed()
Definition: exec-node.h:242
This class is thread-safe.
Definition: mem-tracker.h:61
std::vector< ExecNode * > children_
Definition: exec-node.h:214
int64_t rows_returned() const
Definition: exec-node.h:157
RuntimeProfile::Counter * rows_returned_counter_
Definition: exec-node.h:226
ExecNode * child(int i)
Definition: exec-node.h:241
SpinLock lock_
Lock protecting cleanup_queue_.
Definition: exec-node.h:203
boost::scoped_ptr< MemTracker > mem_tracker_
Account for peak memory used by this node.
Definition: exec-node.h:230
TPlanNodeType::type type_
Definition: exec-node.h:210
ObjectPool * pool_
Definition: exec-node.h:211
std::list< RowBatch * > cleanup_queue_
Queue of orphaned row batches.
Definition: exec-node.h:206
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
Definition: exec-node.cc:393
void AddBatch(RowBatch *batch)
Adds a batch to the queue. This is blocking if the queue is full.
Definition: exec-node.cc:74
virtual Status Open(RuntimeState *state)
Definition: exec-node.cc:154
std::string runtime_exec_options_
Definition: exec-node.h:239
std::vector< ExprContext * > conjunct_ctxs_
Definition: exec-node.h:212
virtual void Close(RuntimeState *state)
Definition: exec-node.cc:166
static void SetDebugOptions(int node_id, TExecNodePhase::type phase, TDebugAction::type action, ExecNode *tree)
Set debug action for node with given id in 'tree'.
Definition: exec-node.cc:332
string name
Definition: cpu-info.cc:50
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)=0
RuntimeProfile::Counter * rows_returned_rate_
Definition: exec-node.h:227
int64_t limit() const
Definition: exec-node.h:158
virtual Status Reset(RuntimeState *state)
Definition: exec-node.cc:159
RuntimeProfile * runtime_profile()
Definition: exec-node.h:161