20 #include <thrift/protocol/TDebugProtocol.h>
55 DEFINE_bool(enable_partitioned_hash_join,
true,
"Enable partitioned hash join");
56 DEFINE_bool(enable_partitioned_aggregation,
true,
"Enable partitioned hash agg");
60 const string ExecNode::ROW_THROUGHPUT_COUNTER =
"RowsReturnedRate";
66 ExecNode::RowBatchQueue::RowBatchQueue(
int max_batches) :
71 DCHECK(cleanup_queue_.empty());
75 if (!BlockingPut(batch)) {
76 lock_guard<SpinLock> l(
lock_);
77 cleanup_queue_.push_back(batch);
83 if (BlockingGet(&result))
return result;
88 int num_io_buffers = 0;
91 while ((batch = GetBatch()) != NULL) {
96 lock_guard<SpinLock> l(
lock_);
97 for (list<RowBatch*>::iterator it = cleanup_queue_.begin();
98 it != cleanup_queue_.end(); ++it) {
99 num_io_buffers += (*it)->num_io_buffers();
102 cleanup_queue_.clear();
103 return num_io_buffers;
107 :
id_(tnode.node_id),
108 type_(tnode.node_type),
148 for (
int i = 0; i <
children_.size(); ++i) {
160 for (
int i = 0; i <
children_.size(); ++i) {
173 for (
int i = 0; i <
children_.size(); ++i) {
180 LOG(WARNING) <<
"Query " << state->
query_id() <<
" leaked memory." << endl
201 if (plan.nodes.size() == 0) {
207 if (status.
ok() && node_idx + 1 != plan.nodes.size()) {
209 "Plan tree only partially reconstructed. Not all thrift nodes were used.");
212 LOG(ERROR) <<
"Could not construct plan tree:\n"
213 << apache::thrift::ThriftDebugString(plan);
220 const vector<TPlanNode>& tnodes,
226 if (*node_idx >= tnodes.size()) {
227 return Status(
"Failed to reconstruct plan tree from thrift.");
229 int num_children = tnodes[*node_idx].num_children;
233 if (parent != NULL) {
238 for (
int i = 0; i < num_children; ++i) {
243 if (*node_idx >= tnodes.size()) {
244 return Status(
"Failed to reconstruct plan tree from thrift.");
250 for (
int i = 1; i < node->
children_.size(); ++i) {
262 stringstream error_msg;
263 switch (tnode.node_type) {
264 case TPlanNodeType::HDFS_SCAN_NODE:
267 case TPlanNodeType::HBASE_SCAN_NODE:
270 case TPlanNodeType::DATA_SOURCE_NODE:
273 case TPlanNodeType::AGGREGATION_NODE:
274 if (FLAGS_enable_partitioned_aggregation) {
280 case TPlanNodeType::HASH_JOIN_NODE:
283 if (tnode.hash_join_node.join_op == TJoinOp::LEFT_ANTI_JOIN ||
284 tnode.hash_join_node.join_op == TJoinOp::RIGHT_SEMI_JOIN ||
285 tnode.hash_join_node.join_op == TJoinOp::RIGHT_ANTI_JOIN ||
286 tnode.hash_join_node.join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
287 FLAGS_enable_partitioned_hash_join) {
293 case TPlanNodeType::CROSS_JOIN_NODE:
296 case TPlanNodeType::EMPTY_SET_NODE:
299 case TPlanNodeType::EXCHANGE_NODE:
302 case TPlanNodeType::SELECT_NODE:
305 case TPlanNodeType::SORT_NODE:
306 if (tnode.sort_node.use_top_n) {
307 *node = pool->
Add(
new TopNNode(pool, tnode, descs));
309 *node = pool->
Add(
new SortNode(pool, tnode, descs));
312 case TPlanNodeType::UNION_NODE:
315 case TPlanNodeType::ANALYTIC_EVAL_NODE:
319 map<int, const char*>::const_iterator i =
320 _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
321 const char* str =
"unknown node type";
322 if (i != _TPlanNodeType_VALUES_TO_NAMES.end()) {
325 error_msg << str <<
" not implemented";
326 return Status(error_msg.str());
333 int node_id, TExecNodePhase::type phase, TDebugAction::type action,
335 if (root->
id_ == node_id) {
340 for (
int i = 0; i < root->
children_.size(); ++i) {
353 for (
int i = 0; i <
children_.size(); ++i) {
355 children_[i]->DebugString(indentation_level + 1, out);
360 if (
type_ == node_type) nodes->push_back(
this);
361 for (
int i = 0; i <
children_.size(); ++i) {
362 children_[i]->CollectNodes(node_type, nodes);
373 ss << name <<
" (id=" <<
id_ <<
")";
379 DCHECK(phase != TExecNodePhase::INVALID);
382 return Status(TErrorCode::INTERNAL_ERROR,
"Debug Action: FAIL");
394 for (
int i = 0; i < num_ctxs; ++i) {
453 RuntimeState* state,
const vector<ExprContext*>& conjunct_ctxs,
const char*
name) {
454 Function* conjunct_fns[conjunct_ctxs.size()];
455 for (
int i = 0; i < conjunct_ctxs.size(); ++i) {
457 conjunct_ctxs[i]->root()->GetCodegendComputeFn(state, &conjunct_fns[i]);
471 DCHECK(tuple_row_type != NULL);
472 DCHECK(expr_ctx_type != NULL);
474 PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
475 PointerType* expr_ctx_ptr_type = PointerType::get(expr_ctx_type, 0);
480 prototype.AddArgument(
486 Function* fn = prototype.GeneratePrototype(&builder, args);
487 Value* ctxs_arg = args[0];
488 Value* tuple_row_arg = args[2];
490 if (conjunct_ctxs.size() > 0) {
491 LLVMContext& context = codegen->
context();
492 BasicBlock* false_block = BasicBlock::Create(context,
"false", fn);
494 for (
int i = 0; i < conjunct_ctxs.size(); ++i) {
495 BasicBlock* true_block = BasicBlock::Create(context,
"continue", fn, false_block);
497 Value* ctx_arg_ptr = builder.CreateConstGEP1_32(ctxs_arg, i,
"ctx_ptr");
498 Value* ctx_arg = builder.CreateLoad(ctx_arg_ptr,
"ctx");
499 Value* expr_args[] = { ctx_arg, tuple_row_arg };
503 codegen, &builder, conjunct_ctxs[i]->root()->
type(), conjunct_fns[i], expr_args,
508 Value* is_false = builder.CreateNot(result.
GetVal(),
"is_false");
509 Value* return_false = builder.CreateOr(is_null, is_false,
"return_false");
510 builder.CreateCondBr(return_false, false_block, true_block);
513 builder.SetInsertPoint(true_block);
517 builder.SetInsertPoint(false_block);
DerivedCounter * AddDerivedCounter(const std::string &name, TUnit::type unit, const DerivedCounterFunction &counter_fn, const std::string &parent_counter_name="")
void AddRuntimeExecOption(const std::string &option)
Appends option to 'runtime_exec_options_'.
void CollectNodes(TPlanNodeType::type node_type, std::vector< ExecNode * > *nodes)
void CollectScanNodes(std::vector< ExecNode * > *nodes)
Collect all scan node types.
const std::string GetDetail() const
static CodegenAnyVal CreateCallWrapped(LlvmCodeGen *cg, LlvmCodeGen::LlvmBuilder *builder, const ColumnType &type, llvm::Function *fn, llvm::ArrayRef< llvm::Value * > args, const char *name="", llvm::Value *result_ptr=NULL)
Same as above but wraps the result in a CodegenAnyVal.
BooleanVal GetBooleanVal(TupleRow *row)
Calls Get*Val on root_.
const TUniqueId & query_id() const
void AddInfoString(const std::string &key, const std::string &value)
int64_t num_rows_returned_
std::vector< ExprContext * > expr_ctxs_to_free_
Expr contexts whose local allocations are safe to free in the main execution thread.
MemTracker * mem_tracker()
Utility struct that wraps a variable name and llvm type.
boost::scoped_ptr< RuntimeProfile > runtime_profile_
static Status Open(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for opening multiple expr trees.
int num_io_buffers() const
boost::scoped_ptr< MemTracker > expr_mem_tracker_
MemTracker that should be used for ExprContexts.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
void InitRuntimeProfile(const std::string &name)
boost::mutex lock_
protects all fields below
virtual Status Init(const TPlanNode &tnode)
RowDescriptor row_descriptor_
static Status CreateTree(ObjectPool *pool, const TPlan &plan, const DescriptorTbl &descs, ExecNode **root)
static int64_t UnitsPerSecond(const Counter *total_counter, const Counter *timer)
Derived counter function: return measured throughput as input_value/second.
const RowDescriptor & row_desc() const
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
void AddExprCtxToFree(ExprContext *ctx)
static void Close(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for closing multiple expr trees.
boost::mutex exec_options_lock_
TPlanNodeType::type type() const
LLVM code generator. This is the top level object to generate jitted code.
ExecNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
Init conjuncts.
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
MemTracker * expr_mem_tracker()
static const char * LLVM_CLASS_NAME
static const char * LLVM_CLASS_NAME
void AddArgument(const NamedVariable &var)
Add argument.
static llvm::Function * CodegenEvalConjuncts(RuntimeState *state, const std::vector< ExprContext * > &conjunct_ctxs, const char *name="EvalConjuncts")
TExecNodePhase::type debug_phase_
virtual Status Prepare(RuntimeState *state)
static Status CreateNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs, ExecNode **node)
Create a single exec node derived from thrift node; place exec node in 'pool'.
#define ADD_COUNTER(profile, name, unit)
DEFINE_bool(enable_partitioned_hash_join, true,"Enable partitioned hash join")
const std::vector< ExprContext * > & lhs_ordering_expr_ctxs() const
Can only be used after calling Prepare()
void AddExprCtxsToFree(const std::vector< ExprContext * > &ctxs)
virtual Status QueryMaintenance(RuntimeState *state)
TDebugAction::type debug_action_
static const std::string ROW_THROUGHPUT_COUNTER
Names of counters shared by all exec nodes.
static Status CreateTreeHelper(ObjectPool *pool, const std::vector< TPlanNode > &tnodes, const DescriptorTbl &descs, ExecNode *parent, int *node_idx, ExecNode **root)
This class is thread-safe.
std::vector< ExecNode * > children_
llvm::Value * true_value()
Returns true/false constants (bool type)
#define COUNTER_SET(c, v)
static const Status CANCELLED
RuntimeProfile::Counter * rows_returned_counter_
bool is_cancelled() const
MemTracker * instance_mem_tracker()
boost::scoped_ptr< MemTracker > mem_tracker_
Account for peak memory used by this node.
void FreeLocalAllocations()
TPlanNodeType::type type_
llvm::Value * false_value()
string PrintPlanNodeType(const TPlanNodeType::type &type)
std::string LogUsage(const std::string &prefix="") const
Logs the usage of this tracker and all of its children (recursively).
llvm::Value * GetVal(const char *name="val")
llvm::Type * GetType(const ColumnType &type)
Returns llvm type for the column type.
Status GetCodegen(LlvmCodeGen **codegen, bool initialize=true)
llvm::Value * GetIsNull(const char *name="is_null")
Gets the 'is_null' field of the *Val.
static Status CreateExprTrees(ObjectPool *pool, const std::vector< TExpr > &texprs, std::vector< ExprContext * > *ctxs)
llvm::Function * FinalizeFunction(llvm::Function *function)
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
void AddBatch(RowBatch *batch)
Adds a batch to the queue. This is blocking if the queue is full.
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
void AddChild(RuntimeProfile *child, bool indent=true, RuntimeProfile *location=NULL)
virtual Status Open(RuntimeState *state)
std::string runtime_exec_options_
std::vector< ExprContext * > conjunct_ctxs_
virtual void Close(RuntimeState *state)
static void SetDebugOptions(int node_id, TExecNodePhase::type phase, TDebugAction::type action, ExecNode *tree)
Set debug action for node with given id in 'tree'.
llvm::LLVMContext & context()
virtual std::string DebugString() const
const std::vector< ExprContext * > & sort_tuple_slot_expr_ctxs() const
const std::vector< ExprContext * > & rhs_ordering_expr_ctxs() const
Can only be used after calling Open()
RuntimeProfile::Counter * rows_returned_rate_
virtual Status Reset(RuntimeState *state)
RuntimeProfile * runtime_profile()