Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hbase-scan-node.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "hbase-scan-node.h"
16 
17 #include <algorithm>
18 #include <boost/foreach.hpp>
19 
20 #include "runtime/runtime-state.h"
21 #include "runtime/row-batch.h"
22 #include "runtime/string-value.h"
23 #include "runtime/tuple-row.h"
24 #include "runtime/tuple.h"
25 #include "util/jni-util.h"
27 #include "util/runtime-profile.h"
28 #include "gen-cpp/PlanNodes_types.h"
30 
31 #include "common/names.h"
32 using namespace impala;
33 
34 HBaseScanNode::HBaseScanNode(ObjectPool* pool, const TPlanNode& tnode,
35  const DescriptorTbl& descs)
36  : ScanNode(pool, tnode, descs),
37  table_name_(tnode.hbase_scan_node.table_name),
38  tuple_id_(tnode.hbase_scan_node.tuple_id),
39  tuple_desc_(NULL),
40  tuple_idx_(0),
41  filters_(tnode.hbase_scan_node.filters),
42  num_errors_(0),
43  hbase_scanner_(NULL),
44  row_key_slot_(NULL),
45  row_key_binary_encoded_(false),
46  text_converter_(new TextConverter('\\', "", false)),
47  suggested_max_caching_(0) {
48  if (tnode.hbase_scan_node.__isset.suggested_max_caching) {
49  suggested_max_caching_ = tnode.hbase_scan_node.suggested_max_caching;
50  }
51 }
52 
// Closing brace of what is presumably HBaseScanNode's destructor. Its opening
// line (53) is missing from this listing, so its body cannot be described here.
54 }
55 
// HBaseScanNode::Prepare(RuntimeState* state).
// The signature and first lines (56-58) are missing from this listing; the name
// comes from the DCHECK message below.
// What the visible lines do:
// - allocate tuple_pool_ (charged to mem_tracker()) and create hbase_scanner_;
// - split the materialized slots of tuple_desc_ into row_key_slot_ and
//   sorted_non_key_slots_, then sort the latter;
// - record each non-key slot's column descriptor in sorted_cols_, in the same
//   order;
// - read whether the row key is binary encoded;
// - apply each TScanRangeParams' optional start/stop keys to a scan range.
// Returns an error Status if tuple_desc_ is NULL, Status::OK otherwise.
59
60  tuple_pool_.reset(new MemPool(mem_tracker()));
61  hbase_scanner_.reset(new HBaseTableScanner(this, state->htable_factory(), state));
62 
// Line 63 is missing. It presumably assigns tuple_desc_ from tuple_id_
// (e.g. via desc_tbl().GetTupleDescriptor()) — confirm against the full source.
64  if (tuple_desc_ == NULL) {
65  // TODO: make sure we print all available diagnostic output to our error log
66  return Status("Failed to get tuple descriptor.");
67  }
68  // The data retrieved from HBase via result_.raw() is sorted by family/qualifier.
69  // The corresponding HBase columns in the Impala metadata are also sorted by
70  // family/qualifier.
71  // Here, we re-order the slots from the query by family/qualifier, exploiting the
72  // known sort order of the columns retrieved from HBase, to avoid family/qualifier
73  // comparisons.
74  const vector<SlotDescriptor*>& slots = tuple_desc_->slots();
75  sorted_non_key_slots_.reserve(slots.size());
76  for (int i = 0; i < slots.size(); ++i) {
// Non-materialized slots are skipped entirely. The slot at column position
// ROW_KEY becomes row_key_slot_; every other slot is queued for sorting.
77  if (!slots[i]->is_materialized()) continue;
78  if (slots[i]->col_pos() == ROW_KEY) {
79  row_key_slot_ = slots[i];
80  } else {
81  sorted_non_key_slots_.push_back(slots[i]);
82  }
83  }
// The comparator argument of this sort call (line 85) is missing from this listing.
84  sort(sorted_non_key_slots_.begin(), sorted_non_key_slots_.end(),
86 
87  // Create list of family/qualifier pointers in same sort order as sorted_non_key_slots_.
88  const HBaseTableDescriptor* hbase_table =
89  static_cast<const HBaseTableDescriptor*>(tuple_desc_->table_desc());
// Column ROW_KEY (position 0) of the table metadata describes the HBase row key.
90  row_key_binary_encoded_ = hbase_table->cols()[ROW_KEY].binary_encoded;
91  sorted_cols_.reserve(sorted_non_key_slots_.size());
92  for (int i = 0; i < sorted_non_key_slots_.size(); ++i) {
93  sorted_cols_.push_back(&hbase_table->cols()[sorted_non_key_slots_[i]->col_pos()]);
94  }
95 
96  // TODO(marcel): add int tuple_idx_[] indexed by TupleId somewhere in runtime-state.h
97  tuple_idx_ = 0;
98 
99  // Convert TScanRangeParams to ScanRanges
100  DCHECK(scan_range_params_ != NULL)
101  << "Must call SetScanRanges() before calling Prepare()";
102  BOOST_FOREACH(const TScanRangeParams& params, *scan_range_params_) {
103  DCHECK(params.scan_range.__isset.hbase_key_range);
104  const THBaseKeyRange& key_range = params.scan_range.hbase_key_range;
// Lines 105-106, which declare 'sr', are missing from this listing. Presumably
// 'sr' refers to a new entry in scan_range_vector_ — verify in the full source.
// Unset start/stop keys leave that end of the range unbounded.
107  if (key_range.__isset.startKey) {
108  sr.set_start_key(key_range.startKey);
109  }
110  if (key_range.__isset.stopKey) {
111  sr.set_stop_key(key_range.stopKey);
112  }
113  }
114  return Status::OK;
115 }
116 
// HBaseScanNode::Open(RuntimeState* state): starts the HBase scan through
// hbase_scanner_. Lines 117-118 (signature) and 120 are missing from this listing.
// Returns:
// - the cancellation status if the query was cancelled;
// - Status::OK without touching the scanner when there are no scan ranges;
// - otherwise, the Status from HBaseTableScanner::StartScan.
119  RETURN_IF_CANCELLED(state);
121  SCOPED_TIMER(runtime_profile_->total_time_counter());
122  JNIEnv* env = getJNIEnv();
123 
124  // No need to initialize hbase_scanner_ if there are no scan ranges.
125  if (scan_range_vector_.size() == 0) return Status::OK;
126  return hbase_scanner_->StartScan(env, tuple_desc_, scan_range_vector_, filters_);
127 }
128 
// HBaseScanNode::WriteTextSlot. The first signature line (129) is missing from
// this listing.
// Converts the text bytes [value, value + value_length) into 'slot' of the
// current tuple_ using text_converter_, allocating from tuple_pool_.
// If the conversion fails:
// - *error_in_row is set to true;
// - if the error log still has space, a message naming family:qualifier, the
//   raw value and the target slot type is logged.
// This function only ever sets *error_in_row; it never clears it.
130  const string& family, const string& qualifier,
131  void* value, int value_length, SlotDescriptor* slot,
132  RuntimeState* state, bool* error_in_row) {
133  if (!text_converter_->WriteSlot(slot, tuple_,
134  reinterpret_cast<char*>(value), value_length, true, false, tuple_pool_.get())) {
135  *error_in_row = true;
136  if (state->LogHasSpace()) {
137  stringstream ss;
138  ss << "Error converting column " << family
139  << ":" << qualifier << ": "
140  << "'" << string(reinterpret_cast<char*>(value), value_length) << "' TO "
141  << slot->type();
142  state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
143  }
144  }
145 }
146 
// GetNext: appends rows read from hbase_scanner_ to row_batch.
// - Returns at once with *eos = true if there are no scan ranges or the limit
//   has already been reached.
// - Otherwise processes one HBase row per loop iteration until one of:
//   * the limit is reached or the batch is at capacity (*eos = ReachedLimit());
//   * the scanner has no more rows (*eos = true; accumulated conversion errors
//     are reported via ReportFileErrors).
// - Key and non-key values are written into tuple_. Rows with conversion errors
//   are counted in num_errors_. With abort_on_error(), such a row stops the scan
//   and the error log is returned as the Status.
// - Rows that pass the conjuncts are committed, and tuple_ advances by
//   tuple_desc_->byte_size() so the next row is written to fresh memory.
// NOTE: lines 150, 155, 165, 175, 201, 205-206, 256-257, 264 and 266 are missing
// from this listing. tuple_ must point at writable tuple memory before the loop;
// presumably line 165 allocates it — confirm against the full source.
147 Status HBaseScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
148  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
149  RETURN_IF_CANCELLED(state);
151  // For GetNext, most of the time is spent in HBaseTableScanner::ResultScanner_next,
152  // but there's still some considerable time inside here.
153  // TODO: need to understand how the time is spent inside this function.
154  SCOPED_TIMER(runtime_profile_->total_time_counter());
156 
157  if (scan_range_vector_.empty() || ReachedLimit()) {
158  *eos = true;
159  return Status::OK;
160  }
161  *eos = false;
162 
163  // Create new tuple buffer for row_batch.
164  tuple_buffer_size_ = row_batch->MaxTupleBufferSize();
166 
167  // Indicates whether the current row has conversion errors. Used for error reporting.
168  bool error_in_row = false;
169 
170  // has_next (below) indicates whether there are more rows to process; set by hbase_scanner_->Next().
171  JNIEnv* env = getJNIEnv();
172  bool has_next = false;
173  while (true) {
174  RETURN_IF_CANCELLED(state);
176  if (ReachedLimit() || row_batch->AtCapacity(tuple_pool_.get())) {
177  // hang on to last allocated chunk in pool, we'll keep writing into it in the
178  // next GetNext() call
179  row_batch->tuple_data_pool()->AcquireData(tuple_pool_.get(), !ReachedLimit());
180  *eos = ReachedLimit();
181  return Status::OK;
182  }
183  RETURN_IF_ERROR(hbase_scanner_->Next(env, &has_next));
184  if (!has_next) {
// Scan exhausted: report accumulated conversion errors, then hand all
// remaining pool memory to the batch.
185  if (num_errors_ > 0) {
186  const HBaseTableDescriptor* hbase_table =
187  static_cast<const HBaseTableDescriptor*> (tuple_desc_->table_desc());
188  state->ReportFileErrors(hbase_table->table_name(), num_errors_);
189  }
190  row_batch->tuple_data_pool()->AcquireData(tuple_pool_.get(), false);
191  *eos = true;
192  return Status::OK;
193  }
194 
195  int row_idx = row_batch->AddRow();
196  TupleRow* row = row_batch->GetRow(row_idx);
197  row->SetTuple(tuple_idx_, tuple_);
198 
199  {
200  // Measure row key and column value materialization time
// Line 201 is missing. It is presumably the timer the comment above refers to.
202 
203  // Write row key slot.
204  if (row_key_slot_ != NULL) {
// Lines 205-206 are missing. The 'else' below text-decodes the key, so they
// presumably handle a binary-encoded row key (row_key_binary_encoded_) — confirm.
207  } else {
208  void* key;
209  int key_length;
210  RETURN_IF_ERROR(hbase_scanner_->GetRowKey(env, &key, &key_length));
211  WriteTextSlot("key", "", key, key_length, row_key_slot_, state, &error_in_row);
212  }
213  }
214 
215  // Write non-key slots.
// sorted_cols_[i] is the column descriptor for sorted_non_key_slots_[i] (see Prepare).
// Binary-encoded columns are written by the scanner directly; text columns are
// fetched as raw bytes. A NULL value marks the slot as NULL; any other value is
// converted via WriteTextSlot.
216  for (int i = 0; i < sorted_non_key_slots_.size(); ++i) {
217  if (sorted_cols_[i]->binary_encoded) {
218  RETURN_IF_ERROR(hbase_scanner_->GetValue(env, sorted_cols_[i]->family,
219  sorted_cols_[i]->qualifier, sorted_non_key_slots_[i], tuple_));
220  } else {
221  void* value;
222  int value_length;
223  RETURN_IF_ERROR(hbase_scanner_->GetValue(env, sorted_cols_[i]->family,
224  sorted_cols_[i]->qualifier, &value, &value_length));
225  if (value == NULL) {
226  tuple_->SetNull(sorted_non_key_slots_[i]->null_indicator_offset());
227  } else {
228  WriteTextSlot(sorted_cols_[i]->family, sorted_cols_[i]->qualifier,
229  value, value_length, sorted_non_key_slots_[i], state, &error_in_row);
230  }
231  }
232  }
233  }
234 
235  // Error logging: count the bad row and log the HBase table name and current row key.
236  if (error_in_row) {
237  error_in_row = false;
238  ++num_errors_;
239  if (state->LogHasSpace()) {
240  stringstream ss;
241  ss << "hbase table: " << table_name_ << endl;
242  void* key;
243  int key_length;
// NOTE(review): unlike line 210, the Status returned by GetRowKey is ignored
// here. If it fails, key/key_length may still be uninitialized when read on the
// next line. Consider checking it. Behavior is intentionally not changed in this
// edit.
244  hbase_scanner_->GetRowKey(env, &key, &key_length);
245  ss << "row key: " << string(reinterpret_cast<const char*>(key), key_length);
246  state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
247  }
248  if (state->abort_on_error()) {
249  state->ReportFileErrors(table_name_, 1);
250  return Status(state->ErrorLog());
251  }
252  }
253 
// NOTE(review): if conjunct_ctxs_ is empty, &conjunct_ctxs_[0] indexes an empty
// std::vector (undefined behavior). conjunct_ctxs_.data() is the safe spelling.
// Not changed here.
254  if (EvalConjuncts(&conjunct_ctxs_[0], conjunct_ctxs_.size(), row)) {
255  row_batch->CommitLastRow();
// Lines 256-257 are missing. The row is committed, so tuple_ advances past it;
// the next row gets fresh tuple memory.
258  char* new_tuple = reinterpret_cast<char*>(tuple_);
259  new_tuple += tuple_desc_->byte_size();
260  tuple_ = reinterpret_cast<Tuple*>(new_tuple);
261  } else {
262  // make sure to reset null indicators since we're overwriting
263  // the tuple assembled for the previous row
// Line 264 (the reset itself) is missing from this listing.
265  }
267  }
268 
// Appears unreachable: every visible exit from the while (true) loop is a return.
269  return Status::OK;
270 }
271 
// HBaseScanNode::Reset (the signature, line 272, is missing from this listing).
// Not implemented: trips DCHECK(false) and returns an error Status("NYI").
273  DCHECK(false) << "NYI";
274  return Status("NYI");
275 }
276 
// HBaseScanNode::Close(RuntimeState* state). Line 277 (signature) and lines
// 280-281 and 290 are missing from this listing.
// Steps:
// - return immediately if the node is already closed;
// - otherwise close the JNI scanner if one was created;
// - free all memory held by tuple_pool_ if it exists;
// - finish with ExecNode::Close(state).
278  if (is_closed()) return;
279  SCOPED_TIMER(runtime_profile_->total_time_counter());
282 
283  if (hbase_scanner_.get() != NULL) {
284  JNIEnv* env = getJNIEnv();
285  hbase_scanner_->Close(env);
286  }
287  if (tuple_pool_.get() != NULL) tuple_pool_->FreeAll();
288 
289  // Report total number of errors.
// The reporting statement itself (line 290) is missing from this listing.
291  ExecNode::Close(state);
292 }
293 
// Writes a summary of this node to *out, indented by 2 * indentation_level spaces.
// The summary gives the tuple id, the table name, and each scan range as
// " region(i):" followed by its DebugString. Each child's DebugString follows,
// at indentation_level + 1.
// Line 299, which declares scan_range (presumably scan_range_vector_[i]), is
// missing from this listing.
294 void HBaseScanNode::DebugString(int indentation_level, stringstream* out) const {
295  *out << string(indentation_level * 2, ' ');
296  *out << "HBaseScanNode(tupleid=" << tuple_id_ << " table=" << table_name_;
297  for(int i = 0; i < scan_range_vector_.size(); i++) {
298  *out << " region(" << i << "):";
300  scan_range.DebugString(0, out);
301  }
302  *out << ")" << endl;
303  for (int i = 0; i < children_.size(); ++i) {
304  children_[i]->DebugString(indentation_level + 1, out);
305  }
306 }
RuntimeProfile::ThreadCounters * scanner_thread_counters() const
Definition: scan-node.h:110
virtual Status Prepare(RuntimeState *state)
const TableDescriptor * table_desc() const
Definition: descriptors.h:304
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
virtual Status Prepare(RuntimeState *state)
Definition: scan-node.cc:44
void SetNull(const NullIndicatorOffset &offset)
Definition: tuple.h:101
static bool ColPathLessThan(const SlotDescriptor *a, const SlotDescriptor *b)
Definition: descriptors.cc:66
int64_t num_rows_returned_
Definition: exec-node.h:223
MemTracker * mem_tracker()
Definition: exec-node.h:162
std::string ErrorLog()
Returns the error log lines as a string joined with ' '.
boost::scoped_ptr< RuntimeProfile > runtime_profile_
Definition: exec-node.h:225
const std::string table_name() const
Definition: descriptors.h:267
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
RuntimeProfile::Counter * read_timer_
Definition: scan-node.h:145
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
void Init(int size)
Definition: tuple.h:58
bool LogHasSpace()
Returns true if the error log has not reached max_errors_.
void set_stop_key(const std::string &key)
#define ADD_TIMER(profile, name)
bool AtCapacity()
Definition: row-batch.h:120
void AcquireData(MemPool *src, bool keep_current)
Definition: mem-pool.cc:161
const std::vector< SlotDescriptor * > & slots() const
Definition: descriptors.h:302
int byte_size() const
Definition: descriptors.h:300
static const std::string TOTAL_HBASE_READ_TIMER
Definition: scan-node.h:124
static Tuple * Create(int size, MemPool *pool)
initialize individual tuple with data residing in mem pool
Definition: tuple.h:51
HBaseTableFactory * htable_factory()
int tuple_idx_
Tuple index in tuple row.
#define COUNTER_ADD(c, v)
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
Definition: exec-node.cc:378
bool ReachedLimit()
Definition: exec-node.h:159
TupleDescriptor * GetTupleDescriptor(TupleId id) const
Definition: descriptors.cc:437
std::vector< THBaseFilter > filters_
HBase Filters to be set in HBaseTableScanner.
#define SCOPED_TIMER(c)
int num_errors_
Counts the total number of conversion errors for this table.
virtual Status Open(RuntimeState *state)
Start HBase scan using hbase_scanner_.
std::vector< const HBaseTableDescriptor::HBaseColumnDescriptor * > sorted_cols_
TupleId tuple_id_
Tuple id resolved in Prepare() to set tuple_desc_.
HBaseScanNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
Definition: exec-node.cc:345
void WriteTextSlot(const std::string &family, const std::string &qualifier, void *value, int value_length, SlotDescriptor *slot, RuntimeState *state, bool *error_in_row)
bool LogError(const ErrorMsg &msg)
Tuple * tuple_
Current tuple.
RuntimeProfile::Counter * rows_read_counter_
rows/tuples read from the scanner (including those discarded by EvalConjuncts())
Definition: scan-node.h:144
const ColumnType & type() const
Definition: descriptors.h:78
void ReportFileErrors(const std::string &file_name, int num_errors)
Report that num_errors occurred while parsing file_name.
SlotDescriptor * row_key_slot_
int tuple_buffer_size_
Size of tuple buffer determined by size of tuples and capacity of row batches.
#define RETURN_IF_CANCELLED(state)
boost::scoped_ptr< MemPool > tuple_pool_
Pool for allocating tuple data, including all varying-length slots.
ObjectPool pool
virtual Status Reset(RuntimeState *state)
NYI.
void set_start_key(const std::string &key)
static void StopRateCounter(RuntimeProfile::Counter *counter)
Stops updating the value of 'counter'.
HBaseTableScanner::ScanRangeVector scan_range_vector_
scan ranges of a region server
virtual Status QueryMaintenance(RuntimeState *state)
Definition: exec-node.cc:401
const DescriptorTbl & desc_tbl() const
Definition: runtime-state.h:93
bool is_closed()
Definition: exec-node.h:242
Abstract base class of all scan nodes; introduces SetScanRange().
Definition: scan-node.h:77
void CommitLastRow()
Definition: row-batch.h:109
std::vector< ExecNode * > children_
Definition: exec-node.h:214
#define COUNTER_SET(c, v)
MemPool * tuple_data_pool()
Definition: row-batch.h:148
RuntimeProfile::Counter * rows_returned_counter_
Definition: exec-node.h:226
void SetTuple(int tuple_idx, Tuple *tuple)
Definition: tuple-row.h:34
boost::scoped_ptr< HBaseTableScanner > hbase_scanner_
Jni helper for scanning an HBase table.
RuntimeProfile::Counter * total_throughput_counter() const
Definition: scan-node.h:98
static const Status OK
Definition: status.h:87
bool row_key_binary_encoded_
True, if row key is binary encoded.
static const int ROW_KEY
Column 0 in the Impala metadata refers to the HBase row key.
#define SCOPED_THREAD_COUNTER_MEASUREMENT(c)
int MaxTupleBufferSize()
Computes the maximum size needed to store tuple data for this row batch.
Definition: row-batch.cc:325
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
Definition: exec-node.cc:393
std::vector< SlotDescriptor * > sorted_non_key_slots_
List of non-row-key slots sorted by col_pos(). Populated in Prepare().
const std::string table_name_
Name of HBase table (not necessarily the table name mapped to Hive).
RuntimeProfile::TimeSeriesCounter * bytes_read_timeseries_counter_
Time series of the bytes_read_counter_.
Definition: scan-node.h:142
bool abort_on_error() const
Definition: runtime-state.h:99
virtual Status Open(RuntimeState *state)
Definition: exec-node.cc:154
const std::vector< TScanRangeParams > * scan_range_params_
The scan ranges this scan node is responsible for. Not owned.
Definition: scan-node.h:138
std::vector< ExprContext * > conjunct_ctxs_
Definition: exec-node.h:212
virtual void Close(RuntimeState *state)
Definition: exec-node.cc:166
boost::scoped_ptr< TextConverter > text_converter_
Helper class for converting text to other types.
HBase scan range; "" means unbounded.
const TupleDescriptor * tuple_desc_
Descriptor of tuples read from HBase table.
void DebugString(int indentation_level, std::stringstream *out)
Write debug string of this ScanRange into out.
virtual void Close(RuntimeState *state)
Close the hbase_scanner_, and report errors.
JNIEnv * getJNIEnv(void)
C linkage for helper functions in hdfsJniHelper.h.
static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter *counter)
Stops 'counter' from receiving any more samples.
RuntimeProfile * runtime_profile()
Definition: exec-node.h:161
RuntimeProfile::Counter * materialize_tuple_timer() const
Definition: scan-node.h:104