Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
blocking-join-node.cc
Go to the documentation of this file.
1 // Copyright 2013 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <sstream>
18 
19 #include "exprs/expr.h"
20 #include "runtime/row-batch.h"
21 #include "runtime/runtime-state.h"
22 #include "util/debug-util.h"
23 #include "util/runtime-profile.h"
24 
25 #include "gen-cpp/PlanNodes_types.h"
26 
27 #include "common/names.h"
28 
29 using namespace impala;
30 using namespace llvm;
31 
32 const char* BlockingJoinNode::LLVM_CLASS_NAME = "class.impala::BlockingJoinNode";
33 
// Constructs a BlockingJoinNode. 'pool', 'tnode' and 'descs' are forwarded to the
// ExecNode base constructor; 'node_name' and 'join_op' are stored in node_name_ and
// join_op_. The node starts with eos_ and probe_side_eos_ false, no semi-join staging
// row (NULL) and can_add_probe_filters_ false.
34 BlockingJoinNode::BlockingJoinNode(const string& node_name, const TJoinOp::type join_op,
35  ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
36  : ExecNode(pool, tnode, descs),
37  node_name_(node_name),
38  join_op_(join_op),
39  eos_(false),
40  probe_side_eos_(false),
41  semi_join_staging_row_(NULL),
42  can_add_probe_filters_(false) {
43 }
44 
// Init hook taking the plan node 'tnode'. The visible body returns Status::OK.
// NOTE(review): source line 46 is absent from this listing (numbering jumps 45 -> 47);
// presumably it delegates to the base-class Init and propagates its error -- confirm
// against the original file.
45 Status BlockingJoinNode::Init(const TPlanNode& tnode) {
47  return Status::OK;
48 }
49 
// NOTE(review): the header of this body (source line 50) is missing from the listing;
// presumably this is the destructor -- confirm against the original file.
// The body only checks, via DCHECK, that probe_batch_ has already been released.
51  // probe_batch_ must be cleaned up in Close() to ensure proper resource freeing.
52  DCHECK(probe_batch_ == NULL);
53 }
54 
// NOTE(review): the header of this body (source line 55) is missing from the listing;
// the cross-reference index lists 'virtual Status Prepare(RuntimeState *state)', which
// presumably is this function -- confirm. Source lines 57, 93, 97 and 112 are also
// absent, so the statements that define 'desc' and the staging-row allocation are not
// visible here.
//
// Visible steps:
//  - creates build_pool_ and registers the BuildTime/ProbeTime timers and the
//    BuildRows/ProbeRows counters on the runtime profile;
//  - when NDEBUG is not defined, DCHECKs that the output row descriptor matches the
//    layout expected for join_op_;
//  - records the byte size of the probe and build tuple-pointer arrays;
//  - for semi/anti joins, assigns semi_join_staging_row_;
//  - creates probe_batch_ from child(0)'s row descriptor and returns Status::OK.
56  SCOPED_TIMER(runtime_profile_->total_time_counter());
58 
59  build_pool_.reset(new MemPool(mem_tracker()));
60  build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime");
61  probe_timer_ = ADD_TIMER(runtime_profile(), "ProbeTime");
62  build_row_counter_ = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT);
63  probe_row_counter_ = ADD_COUNTER(runtime_profile(), "ProbeRows", TUnit::UNIT);
64 
65  // Validate the row desc layout is what we expect because the current join
66  // implementation relies on it to enable some optimizations.
67  int num_left_tuples = child(0)->row_desc().tuple_descriptors().size();
68  int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
69 
70 #ifndef NDEBUG
71  switch (join_op_) {
72  case TJoinOp::LEFT_ANTI_JOIN:
73  case TJoinOp::LEFT_SEMI_JOIN:
74  case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN: {
75  // Only return the surviving probe-side tuples.
76  DCHECK(row_desc().Equals(child(0)->row_desc()));
77  break;
78  }
79  case TJoinOp::RIGHT_ANTI_JOIN:
80  case TJoinOp::RIGHT_SEMI_JOIN: {
81  // Only return the surviving build-side tuples.
82  DCHECK(row_desc().Equals(child(1)->row_desc()));
83  break;
84  }
85  default: {
86  // The join node returns a row that is a concatenation of the left side and build
87  // side row desc's. For example if the probe row had 1 tuple and the build row had
88  // 2, the resulting row desc of the join node would have 3 tuples with:
89  // result[0] = left[0]
90  // result[1] = build[0]
91  // result[2] = build[1]
92  for (int i = 0; i < num_left_tuples; ++i) {
  // NOTE(review): 'desc' is defined on source line 93, which this listing drops.
94  DCHECK_EQ(i, row_desc().GetTupleIdx(desc->id()));
95  }
96  for (int i = 0; i < num_build_tuples; ++i) {
  // NOTE(review): 'desc' is defined on source line 97, which this listing drops.
98  DCHECK_EQ(num_left_tuples + i, row_desc().GetTupleIdx(desc->id()));
99  }
100  break;
101  }
102  }
103 #endif
104 
  // Sizes are in bytes: one Tuple* per tuple on each side.
105  probe_tuple_row_size_ = num_left_tuples * sizeof(Tuple*);
106  build_tuple_row_size_ = num_build_tuples * sizeof(Tuple*);
107 
108  if (join_op_ == TJoinOp::LEFT_ANTI_JOIN || join_op_ == TJoinOp::LEFT_SEMI_JOIN ||
109  join_op_ == TJoinOp::RIGHT_ANTI_JOIN || join_op_ == TJoinOp::RIGHT_SEMI_JOIN ||
110  join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
  // NOTE(review): the operand of this cast (source line 112) is missing from the listing.
111  semi_join_staging_row_ = reinterpret_cast<TupleRow*>(
113  }
114 
115  probe_batch_.reset(
116  new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
117  return Status::OK;
118 }
119 
// NOTE(review): the header of this body (source line 120) is missing from the listing;
// the cross-reference index lists 'virtual Status Reset(RuntimeState *state)', which
// presumably is this function -- confirm.
// Clears both end-of-stream flags, resets the probe batch, frees all memory held by
// build_pool_, then returns the result of ExecNode::Reset(state).
121  eos_ = false;
122  probe_side_eos_ = false;
123  probe_batch_->Reset();
124  build_pool_->FreeAll();
125  return ExecNode::Reset(state);
126 }
127 
// NOTE(review): the header of this body (source line 128) is missing from the listing;
// the cross-reference index lists 'virtual void Close(RuntimeState *state)', which
// presumably is this function -- confirm.
// Returns immediately if the node is already closed. Otherwise frees the build pool
// (if one was created), destroys the probe batch, releases the semi-join staging row
// and finally calls ExecNode::Close(state).
129  if (is_closed()) return;
130  if (build_pool_.get() != NULL) build_pool_->FreeAll();
131  probe_batch_.reset();
  // NOTE(review): delete[] is applied through a TupleRow*, while the pointer was produced
  // by a reinterpret_cast (allocation on source line 112, not shown in this listing) --
  // confirm the new[]/delete[] element types match.
132  if (semi_join_staging_row_ != NULL) delete[] semi_join_staging_row_;
133  ExecNode::Close(state);
134 }
135 
// NOTE(review): the header of this body (source line 136) is missing from the listing;
// the cross-reference index lists
// 'void BuildSideThread(RuntimeState *state, Promise< Status > *status)', which
// presumably is this function -- confirm.
// Runs ConstructBuildSide(state) under the total-CPU and total-async timers, closes
// child(1) if that failed, releases the thread token and then publishes the resulting
// Status through 'status'.
137  Status s;
138  {
139  SCOPED_TIMER(state->total_cpu_timer());
140  SCOPED_TIMER(runtime_profile()->total_async_timer());
141  s = ConstructBuildSide(state);
142  }
143  // IMPALA-1863: If the build-side thread failed, then we need to close the right
144  // (build-side) child to avoid a potential deadlock between fragment instances. This
145  // is safe to do because while the build may have partially completed, it will not be
146  // probed. BlockingJoinNode::Open() will return failure as soon as child(0)->Open()
147  // completes.
148  if (!s.ok()) child(1)->Close(state);
149  // Release the thread token as soon as possible (before the main thread joins
150  // on it). This way, if we had a chain of 10 joins using 1 additional thread,
151  // we'd keep the additional thread busy the whole time.
152  state->resource_pool()->ReleaseThreadToken(false);
  // Setting the promise last means the token is already released when the waiter wakes.
153  status->Set(s);
154 }
155 
// NOTE(review): the header of this body (source line 156) is missing from the listing;
// the cross-reference index lists 'virtual Status Open(RuntimeState *state)', which
// presumably is this function -- confirm. Source lines 158, 160, 186, 192, 193 and 205
// are also absent, so the synchronous build call, the fetch that fills probe_batch_ and
// the non-empty-batch InitGetNext call are not visible here.
//
// Visible steps:
//  - bails out if the query was cancelled;
//  - if a thread token is available, starts the build side on a separate thread while
//    opening child(0) on this thread, then waits for the build status;
//  - otherwise opens child(0) on this thread (the else branch);
//  - loops until a non-empty probe batch is available or the probe side hits eos, then
//    sets current_probe_row_ (or eos_) and returns Status::OK.
157  SCOPED_TIMER(runtime_profile_->total_time_counter());
159  RETURN_IF_CANCELLED(state);
161 
162  // If we can get a thread token, initiate the construction of the build-side table in
163  // a separate thread, so that the left child can do any initialisation in parallel.
164  // Otherwise, do this in the main thread.
165  if (state->resource_pool()->TryAcquireThreadToken()) {
166  Promise<Status> build_side_status;
167  AddRuntimeExecOption("Join Build-Side Prepared Asynchronously");
168  Thread build_thread(node_name_, "build thread",
169  bind(&BlockingJoinNode::BuildSideThread, this, state, &build_side_status));
170  if (!state->cgroup().empty()) {
171  Status status = state->exec_env()->cgroups_mgr()->AssignThreadToCgroup(
172  build_thread, state->cgroup());
173  // If AssignThreadToCgroup() failed, we still need to wait for the build-side
174  // thread to complete before returning, so just log that error.
175  if (!status.ok()) state->LogError(status.msg());
176  }
177  // Open the left child so that it may perform any initialisation in parallel.
178  // Don't exit even if we see an error, we still need to wait for the build thread
179  // to finish.
180  Status open_status = child(0)->Open(state);
181  // Blocks until ConstructBuildSide has returned, after which the build side structures
182  // are fully constructed.
  // A build-side error is reported in preference to an error from child(0)->Open().
183  RETURN_IF_ERROR(build_side_status.Get());
184  RETURN_IF_ERROR(open_status);
185  } else {
  // NOTE(review): source line 186 is missing from this listing.
187  RETURN_IF_ERROR(child(0)->Open(state));
188  }
189 
190  // Seed left child in preparation for GetNext().
191  while (true) {
  // NOTE(review): source lines 192-193 are missing from this listing.
194  probe_batch_pos_ = 0;
195  if (probe_batch_->num_rows() == 0) {
196  if (probe_side_eos_) {
  // Probe side is exhausted without producing any row: initialise with no first row.
197  RETURN_IF_ERROR(InitGetNext(NULL /* eos */));
198  eos_ = true;
199  break;
200  }
  // Empty batch but not eos: reset the batch and try again.
201  probe_batch_->Reset();
202  continue;
203  } else {
204  current_probe_row_ = probe_batch_->GetRow(probe_batch_pos_++);
  // NOTE(review): source line 205 is missing from this listing.
206  break;
207  }
208  }
209  return Status::OK;
210 }
211 
// Appends a textual description of this node to 'out'.
// Output order: two spaces per 'indentation_level', node_name_, "(eos=<true|false>",
// " probe_batch_pos=<n>", whatever AddToDebugString() writes, whatever
// ExecNode::DebugString() writes, and a closing ")".
212 void BlockingJoinNode::DebugString(int indentation_level, stringstream* out) const {
213  *out << string(indentation_level * 2, ' ');
214  *out << node_name_;
215  *out << "(eos=" << (eos_ ? "true" : "false")
216  << " probe_batch_pos=" << probe_batch_pos_;
217  AddToDebugString(indentation_level, out);
218  ExecNode::DebugString(indentation_level, out);
219  *out << ")";
220 }
221 
// NOTE(review): the header of this body (source line 222) is missing from the listing;
// the cross-reference index lists 'std::string GetLeftChildRowString(TupleRow *row)',
// which presumably is this function -- confirm.
// Builds a "[...]" string with one space-separated entry per tuple descriptor of this
// node's row descriptor. Entries with an index below child(0)'s tuple count are printed
// from 'row'; the remaining entries are printed as NULL tuples.
223  stringstream out;
224  out << "[";
225  int num_probe_tuple_rows = child(0)->row_desc().tuple_descriptors().size();
226  for (int i = 0; i < row_desc().tuple_descriptors().size(); ++i) {
227  if (i != 0) out << " ";
228  if (i >= num_probe_tuple_rows) {
229  // Build row is not yet populated, print NULL
230  out << PrintTuple(NULL, *row_desc().tuple_descriptors()[i]);
231  } else {
232  out << PrintTuple(row->GetTuple(i), *row_desc().tuple_descriptors()[i]);
233  }
234  }
235  out << "]";
236  return out.str();
237 }
238 
// NOTE(review): the header of this body (source line 240) is missing from the listing;
// the cross-reference index lists
// 'void CreateOutputRow(TupleRow *out_row, TupleRow *probe_row, TupleRow *build_row)',
// which presumably is this function (the body names the parameters out/probe/build) --
// confirm.
// Fills 'out' as the probe part followed by the build part: the first
// probe_tuple_row_size_ bytes are copied from 'probe' (or zeroed when 'probe' is NULL),
// and the next build_tuple_row_size_ bytes are copied from 'build' (or zeroed when
// 'build' is NULL).
239 // This function is replaced by codegen
241  uint8_t* out_ptr = reinterpret_cast<uint8_t*>(out);
242  if (probe == NULL) {
243  memset(out_ptr, 0, probe_tuple_row_size_);
244  } else {
245  memcpy(out_ptr, probe, probe_tuple_row_size_);
246  }
247  if (build == NULL) {
248  memset(out_ptr + probe_tuple_row_size_, 0, build_tuple_row_size_);
249  } else {
250  memcpy(out_ptr + probe_tuple_row_size_, build, build_tuple_row_size_);
251  }
252 }
void AddRuntimeExecOption(const std::string &option)
Appends option to 'runtime_exec_options_'.
Definition: exec-node.cc:188
Tuple * GetTuple(int tuple_idx)
Definition: tuple-row.h:30
virtual void AddToDebugString(int indentation_level, std::stringstream *out) const
MemTracker * mem_tracker()
Definition: exec-node.h:162
TODO: Consider allowing fragment IDs as category parameters.
Definition: thread.h:45
bool TryAcquireThreadToken(bool *is_reserved=NULL)
void Set(const T &val)
Definition: promise.h:38
void CreateOutputRow(TupleRow *out_row, TupleRow *probe_row, TupleRow *build_row)
boost::scoped_ptr< RuntimeProfile > runtime_profile_
Definition: exec-node.h:225
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
virtual Status Prepare(RuntimeState *state)
BlockingJoinNode(const std::string &node_name, const TJoinOp::type join_op, ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
virtual Status Init(const TPlanNode &tnode)
Definition: exec-node.cc:124
#define ADD_TIMER(profile, name)
virtual void Close(RuntimeState *state)
virtual Status Init(const TPlanNode &tnode)
RuntimeProfile::Counter * build_timer_
const std::string & cgroup() const
const RowDescriptor & row_desc() const
Definition: exec-node.h:156
#define COUNTER_ADD(c, v)
void BuildSideThread(RuntimeState *state, Promise< Status > *status)
#define SCOPED_TIMER(c)
RuntimeProfile::Counter * probe_timer_
boost::scoped_ptr< MemPool > build_pool_
CgroupsMgr * cgroups_mgr()
Definition: exec-env.h:88
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
Definition: exec-node.cc:345
RuntimeProfile::Counter * probe_row_counter_
bool LogError(const ErrorMsg &msg)
#define RETURN_IF_CANCELLED(state)
Status AssignThreadToCgroup(const Thread &thread, const std::string &cgroup) const
Definition: cgroups-mgr.cc:142
ObjectPool pool
virtual Status Prepare(RuntimeState *state)
Definition: exec-node.cc:130
#define ADD_COUNTER(profile, name, unit)
virtual Status ConstructBuildSide(RuntimeState *state)=0
virtual Status Reset(RuntimeState *state)
virtual Status QueryMaintenance(RuntimeState *state)
Definition: exec-node.cc:401
bool is_closed()
Definition: exec-node.h:242
virtual Status Open(RuntimeState *state)
int batch_size() const
Definition: runtime-state.h:98
const T & Get()
Definition: promise.h:59
const std::string node_name_
TupleId id() const
Definition: descriptors.h:306
ExecNode * child(int i)
Definition: exec-node.h:241
const std::vector< TupleDescriptor * > & tuple_descriptors() const
Return descriptors for all tuples in this row, in order of appearance.
Definition: descriptors.h:412
std::string GetLeftChildRowString(TupleRow *row)
static const char * LLVM_CLASS_NAME
static const Status OK
Definition: status.h:87
RuntimeProfile::Counter * build_row_counter_
const ErrorMsg & msg() const
Returns the error message associated with a non-successful status.
Definition: status.h:189
virtual Status Open(RuntimeState *state)
Definition: exec-node.cc:154
boost::scoped_ptr< RowBatch > probe_batch_
bool ok() const
Definition: status.h:172
string PrintTuple(const Tuple *t, const TupleDescriptor &d)
Definition: debug-util.cc:166
virtual Status InitGetNext(TupleRow *first_left_child_row)=0
ThreadResourceMgr::ResourcePool * resource_pool()
virtual void Close(RuntimeState *state)
Definition: exec-node.cc:166
RuntimeProfile::Counter * total_cpu_timer()
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)=0
virtual Status Reset(RuntimeState *state)
Definition: exec-node.cc:159
RuntimeProfile * runtime_profile()
Definition: exec-node.h:161