Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
cross-join-node.cc
Go to the documentation of this file.
1 // Copyright 2013 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exec/cross-join-node.h"
16 
17 #include <sstream>
18 
19 #include "codegen/llvm-codegen.h"
20 #include "exprs/expr.h"
21 #include "runtime/row-batch.h"
22 #include "runtime/runtime-state.h"
23 #include "util/debug-util.h"
24 #include "util/runtime-profile.h"
25 
26 #include "gen-cpp/PlanNodes_types.h"
27 
28 #include "common/names.h"
29 
30 using namespace impala;
31 using namespace llvm;
32 
34  ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
35  : BlockingJoinNode("CrossJoinNode", TJoinOp::CROSS_JOIN, pool, tnode, descs) {
36 }
37 
39  DCHECK(join_op_ == TJoinOp::CROSS_JOIN);
41  build_batch_pool_.reset(new ObjectPool());
42  return Status::OK;
43 }
44 
47  // TODO: consider resetting object pool only when it reaches a certain size
48  build_batch_pool_.reset(new ObjectPool());
49  return BlockingJoinNode::Reset(state);
50 }
51 
53  if (is_closed()) return;
55  build_batch_pool_.reset();
57 }
58 
60  // Do a full scan of child(1) and store all build row batches.
61  RETURN_IF_ERROR(child(1)->Open(state));
62  while (true) {
63  RowBatch* batch = build_batch_pool_->Add(
64  new RowBatch(child(1)->row_desc(), state->batch_size(), mem_tracker()));
65  RETURN_IF_CANCELLED(state);
67  bool eos;
68  RETURN_IF_ERROR(child(1)->GetNext(state, batch, &eos));
69  DCHECK_EQ(batch->num_io_buffers(), 0) << "Build batch should be compact.";
74  static_cast<int64_t>(build_batches_.total_num_rows()));
75  if (eos) break;
76  }
77  return Status::OK;
78 }
79 
82  return Status::OK;
83 }
84 
// CrossJoinNode::GetNext: appends cross-join output rows (produced by
// ProcessLeftChildBatch from the current probe batch and the stored build rows)
// to output_batch. Sets *eos = true once the row limit is reached, or once the
// left (probe) child has hit eos and its last batch has been fully consumed.
// May return early with a non-OK Status via RETURN_IF_ERROR / RETURN_IF_CANCELLED.
// NOTE(review): this listing has gaps at source lines 94, 97, 104, 106, 123 and
// 125. `timer` (used below) is declared on one of those missing lines, and the
// fetch of the next left-child batch presumably sits between timer.Stop() and
// timer.Start() -- confirm against the real source before relying on this page.
85 Status CrossJoinNode::GetNext(RuntimeState* state, RowBatch* output_batch, bool* eos) {
86  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
87  SCOPED_TIMER(runtime_profile_->total_time_counter());
// Nothing more to produce: the limit was already reached or eos_ was set earlier.
88  if (ReachedLimit() || eos_) {
89  *eos = true;
90  return Status::OK;
91  }
92  *eos = false;
93 
95  while (!eos_) {
96  RETURN_IF_CANCELLED(state);
98 
99  // Compute max rows that should be added to output_batch
100  int64_t max_added_rows = output_batch->capacity() - output_batch->num_rows();
// When limit() != -1, also cap by the number of rows still allowed under the limit.
101  if (limit() != -1) max_added_rows = min(max_added_rows, limit() - rows_returned());
102 
103  // Continue processing this row batch
// NOTE(review): ProcessLeftChildBatch returns the number of rows it committed; that
// value is discarded on this visible line. Source line 104, missing here, presumably
// accumulates it -- verify.
105  ProcessLeftChildBatch(output_batch, probe_batch_.get(), max_added_rows);
107 
// Stop when the limit is reached or the output batch is full; only the limit case
// reports eos to the caller.
108  if (ReachedLimit() || output_batch->AtCapacity()) {
109  *eos = ReachedLimit();
110  break;
111  }
112 
113  // Check to see if we're done processing the current left child batch
114  if (current_build_row_.AtEnd() && probe_batch_pos_ == probe_batch_->num_rows()) {
// Hand the consumed probe batch's resources to output_batch and rewind the probe position.
115  probe_batch_->TransferResourceOwnership(output_batch);
116  probe_batch_pos_ = 0;
// Stop here if the transferred resources left output_batch at capacity.
117  if (output_batch->AtCapacity()) break;
// The left child already reported eos, so the whole join is finished.
118  if (probe_side_eos_) {
119  *eos = eos_ = true;
120  break;
121  } else {
// Pause the timer around the next left-child fetch (that line is not shown in this listing).
122  timer.Stop();
124  timer.Start();
126  }
127  }
128  }
129 
130  return Status::OK;
131 }
132 
134  stringstream out;
135  out << "BuildList(";
137  out << ")";
138  return out.str();
139 }
140 
141 // TODO: this can be replaced with a codegen'd function
143  int max_added_rows) {
144  int row_idx = output_batch->AddRows(max_added_rows);
145  DCHECK(row_idx != RowBatch::INVALID_ROW_INDEX);
146  uint8_t* output_row_mem = reinterpret_cast<uint8_t*>(output_batch->GetRow(row_idx));
147  TupleRow* output_row = reinterpret_cast<TupleRow*>(output_row_mem);
148 
149  int rows_returned = 0;
150  ExprContext* const* ctxs = &conjunct_ctxs_[0];
151 
152  while (true) {
153  while (!current_build_row_.AtEnd()) {
156 
157  if (!EvalConjuncts(ctxs, conjunct_ctxs_.size(), output_row)) continue;
158  ++rows_returned;
159  // Filled up out batch or hit limit
160  if (UNLIKELY(rows_returned == max_added_rows)) goto end;
161  // Advance to next out row
162  output_row_mem += output_batch->row_byte_size();
163  output_row = reinterpret_cast<TupleRow*>(output_row_mem);
164  }
165 
166  DCHECK(current_build_row_.AtEnd());
167  // Advance to the next row in the left child batch
168  if (UNLIKELY(probe_batch_pos_ == batch->num_rows())) goto end;
171  }
172 
173 end:
174  output_batch->CommitRows(rows_returned);
175  return rows_returned;
176 }
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
boost::scoped_ptr< ObjectPool > build_batch_pool_
Object pool for build RowBatches; owns all build batches referenced by build_batches_.
TupleRowIterator Iterator()
Returns a new iterator over all the tuple rows.
int num_rows() const
Definition: row-batch.h:215
int AddRows(int n)
Definition: row-batch.h:94
int64_t num_rows_returned_
Definition: exec-node.h:223
MemTracker * mem_tracker()
Definition: exec-node.h:162
void CreateOutputRow(TupleRow *out_row, TupleRow *probe_row, TupleRow *build_row)
boost::scoped_ptr< RuntimeProfile > runtime_profile_
Definition: exec-node.h:225
int num_io_buffers() const
Definition: row-batch.h:149
virtual Status Prepare(RuntimeState *state)
void AddRowBatch(RowBatch *row_batch)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
CrossJoinNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
virtual void Close(RuntimeState *state)
bool AtCapacity()
Definition: row-batch.h:120
virtual Status Prepare(RuntimeState *state)
RuntimeProfile::Counter * build_timer_
const RowDescriptor & row_desc() const
Definition: exec-node.h:156
#define COUNTER_ADD(c, v)
int row_byte_size()
Definition: row-batch.h:147
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
Definition: exec-node.cc:378
bool ReachedLimit()
Definition: exec-node.h:159
virtual Status Reset(RuntimeState *state)
void Reset()
Resets the list.
#define SCOPED_TIMER(c)
RuntimeProfile::Counter * probe_timer_
RuntimeProfile::Counter * probe_row_counter_
bool AtEnd()
Returns true if this iterator is at the end, i.e. GetRow() cannot be called.
std::string BuildListDebugString()
#define RETURN_IF_CANCELLED(state)
RowBatchList build_batches_
List of build batches, constructed in Prepare()
ObjectPool pool
int ProcessLeftChildBatch(RowBatch *output_batch, RowBatch *batch, int max_added_rows)
virtual Status Reset(RuntimeState *state)
virtual Status QueryMaintenance(RuntimeState *state)
Definition: exec-node.cc:401
#define VLOG_ROW
Definition: logging.h:59
bool is_closed()
Definition: exec-node.h:242
virtual Status Open(RuntimeState *state)
int64_t rows_returned() const
Definition: exec-node.h:157
#define COUNTER_SET(c, v)
int batch_size() const
Definition: runtime-state.h:98
RuntimeProfile::Counter * rows_returned_counter_
Definition: exec-node.h:226
ExecNode * child(int i)
Definition: exec-node.h:241
virtual Status InitGetNext(TupleRow *first_left_row)
int capacity() const
Definition: row-batch.h:216
std::string DebugString(const RowDescriptor &desc)
Outputs a debug string containing the contents of the list.
#define UNLIKELY(expr)
Definition: compiler-util.h:33
RowBatchList::TupleRowIterator current_build_row_
static const Status OK
Definition: status.h:87
virtual void Close(RuntimeState *state)
virtual Status ConstructBuildSide(RuntimeState *state)
RuntimeProfile::Counter * build_row_counter_
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
Definition: exec-node.cc:393
boost::scoped_ptr< RowBatch > probe_batch_
std::vector< ExprContext * > conjunct_ctxs_
Definition: exec-node.h:212
void Next()
Increments the iterator. No-op if the iterator is at the end.
void CommitRows(int n)
Definition: row-batch.h:102
int64_t limit() const
Definition: exec-node.h:158
static const int INVALID_ROW_INDEX
Definition: row-batch.h:87
int64_t total_num_rows()
Returns the total number of rows in all row batches.