Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hbase-table-scanner.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 #include "exec/hbase-scan-node.h"
17 
18 #include <cstring>
19 #include <algorithm>
20 
21 #include "util/bit-util.h"
22 #include "util/jni-util.h"
23 #include "runtime/descriptors.h"
24 #include "runtime/runtime-state.h"
25 #include "runtime/mem-pool.h"
26 #include "runtime/tuple.h"
27 
28 #include "common/names.h"
29 
30 using namespace impala;
31 
32 jclass HBaseTableScanner::scan_cl_ = NULL;
34 jclass HBaseTableScanner::result_cl_ = NULL;
35 jclass HBaseTableScanner::cell_cl_ = NULL;
42 jmethodID HBaseTableScanner::scan_ctor_ = NULL;
69 jobject HBaseTableScanner::empty_row_ = NULL;
71 jobjectArray HBaseTableScanner::compare_ops_ = NULL;
72 
73 void HBaseTableScanner::ScanRange::DebugString(int indentation_level,
74  stringstream* out) {
75  *out << string(indentation_level * 2, ' ');
76  if (!start_key_.empty()) {
77  *out << " start_key=" << start_key_;
78  }
79  if (!stop_key_.empty()) {
80  *out << " stop_key=" << stop_key_;
81  }
82 }
83 
85  HBaseScanNode* scan_node, HBaseTableFactory* htable_factory, RuntimeState* state)
86  : scan_node_(scan_node),
87  state_(state),
88  htable_factory_(htable_factory),
89  htable_(NULL),
90  scan_(NULL),
91  resultscanner_(NULL),
92  cells_(NULL),
93  cell_index_(0),
96  num_cells_(0),
97  all_cells_present_(false),
98  value_pool_(new MemPool(scan_node_->mem_tracker())),
99  scan_setup_timer_(ADD_TIMER(scan_node_->runtime_profile(),
100  "HBaseTableScanner.ScanSetup")) {
101  const TQueryOptions& query_option = state->query_options();
102  if (query_option.__isset.hbase_caching && query_option.hbase_caching > 0) {
103  rows_cached_ = query_option.hbase_caching;
104  } else {
105  int max_caching = scan_node_->suggested_max_caching();
106  rows_cached_ = (max_caching > 0 && max_caching < DEFAULT_ROWS_CACHED) ?
107  max_caching : DEFAULT_ROWS_CACHED;
108  }
109  cache_blocks_ = query_option.__isset.hbase_cache_blocks &&
110  query_option.hbase_cache_blocks;
111 }
112 
114  // Get the JNIEnv* corresponding to current thread.
115  JNIEnv* env = getJNIEnv();
116  if (env == NULL) {
117  return Status("Failed to get/create JVM");
118  }
119 
120  // Global class references:
121  // HTable, Scan, ResultScanner, Result, ImmutableBytesWritable, HConstants.
123  JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/client/Scan", &scan_cl_));
125  JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/client/ResultScanner",
128  JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/client/Result",
129  &result_cl_));
131  JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/HConstants",
132  &hconstants_cl_));
134  JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/filter/FilterList",
135  &filter_list_cl_));
138  "org/apache/hadoop/hbase/filter/FilterList$Operator", &filter_list_op_cl_));
141  "org/apache/hadoop/hbase/filter/SingleColumnValueFilter",
145  "org/apache/hadoop/hbase/filter/CompareFilter$CompareOp",
146  &compare_op_cl_));
149  "org/apache/hadoop/hbase/client/ScannerTimeoutException",
151 
152  // Distinguish HBase versions by checking for the existence of the Cell class.
153  // HBase 0.95.2: Use Cell class and corresponding methods.
154  // HBase prior to 0.95.2: Use the KeyValue class and Cell-equivalent methods.
155  bool has_cell_class = JniUtil::ClassExists(env, "org/apache/hadoop/hbase/Cell");
156  if (has_cell_class) {
157  LOG(INFO) << "Detected HBase version >= 0.95.2";
159  JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/Cell", &cell_cl_));
160  } else {
161  // Assume a non-CDH5 HBase version because the Cell class wasn't found.
162  LOG(INFO) << "Detected HBase version < 0.95.2";
164  JniUtil::GetGlobalClassRef(env, "org/apache/hadoop/hbase/KeyValue",
165  &cell_cl_));
166  }
167 
168  // Scan method ids.
169  scan_ctor_ = env->GetMethodID(scan_cl_, "<init>", "()V");
170  RETURN_ERROR_IF_EXC(env);
171  scan_set_max_versions_id_ = env->GetMethodID(scan_cl_, "setMaxVersions",
172  "(I)Lorg/apache/hadoop/hbase/client/Scan;");
173  RETURN_ERROR_IF_EXC(env);
174  scan_set_caching_id_ = env->GetMethodID(scan_cl_, "setCaching", "(I)V");
175  RETURN_ERROR_IF_EXC(env);
176  scan_set_cache_blocks_id_ = env->GetMethodID(scan_cl_, "setCacheBlocks", "(Z)V");
177  RETURN_ERROR_IF_EXC(env);
178  scan_add_column_id_ = env->GetMethodID(scan_cl_, "addColumn",
179  "([B[B)Lorg/apache/hadoop/hbase/client/Scan;");
180  RETURN_ERROR_IF_EXC(env);
181  scan_set_filter_id_ = env->GetMethodID(scan_cl_, "setFilter",
182  "(Lorg/apache/hadoop/hbase/filter/Filter;)Lorg/apache/hadoop/hbase/client/Scan;");
183  RETURN_ERROR_IF_EXC(env);
184  scan_set_start_row_id_ = env->GetMethodID(scan_cl_, "setStartRow",
185  "([B)Lorg/apache/hadoop/hbase/client/Scan;");
186  RETURN_ERROR_IF_EXC(env);
187  scan_set_stop_row_id_ = env->GetMethodID(scan_cl_, "setStopRow",
188  "([B)Lorg/apache/hadoop/hbase/client/Scan;");
189  RETURN_ERROR_IF_EXC(env);
190 
191  // ResultScanner method ids.
192  resultscanner_next_id_ = env->GetMethodID(resultscanner_cl_, "next",
193  "()Lorg/apache/hadoop/hbase/client/Result;");
194  RETURN_ERROR_IF_EXC(env);
195  resultscanner_close_id_ = env->GetMethodID(resultscanner_cl_, "close", "()V");
196  RETURN_ERROR_IF_EXC(env);
197 
198  // Result method ids.
199  if (has_cell_class) {
200  result_raw_cells_id_ = env->GetMethodID(result_cl_, "rawCells",
201  "()[Lorg/apache/hadoop/hbase/Cell;");
202  } else {
203  result_raw_cells_id_ = env->GetMethodID(result_cl_, "raw",
204  "()[Lorg/apache/hadoop/hbase/KeyValue;");
205  }
206  RETURN_ERROR_IF_EXC(env);
207  result_isempty_id_ = env->GetMethodID(result_cl_, "isEmpty", "()Z");
208  RETURN_ERROR_IF_EXC(env);
209 
210 
211  // Cell or equivalent KeyValue method ids.
212  // Method ids to retrieve buffers backing different portions of row data.
213  if (has_cell_class) {
214  cell_get_row_array_ = env->GetMethodID(cell_cl_, "getRowArray", "()[B");
215  RETURN_ERROR_IF_EXC(env);
216  cell_get_family_array_ = env->GetMethodID(cell_cl_, "getFamilyArray", "()[B");
217  RETURN_ERROR_IF_EXC(env);
218  cell_get_qualifier_array_ = env->GetMethodID(cell_cl_, "getQualifierArray", "()[B");
219  RETURN_ERROR_IF_EXC(env);
220  cell_get_value_array_ = env->GetMethodID(cell_cl_, "getValueArray", "()[B");
221  RETURN_ERROR_IF_EXC(env);
222  } else {
223  // In HBase versions prior to 0.95.2 all data from a row is backed by the same buffer
226  env->GetMethodID(cell_cl_, "getBuffer", "()[B");
227  RETURN_ERROR_IF_EXC(env);
228  }
229  // Method ids for retrieving lengths and offsets into buffers backing different
230  // portions of row data. Both the Cell and KeyValue classes support these methods.
231  cell_get_family_offset_id_ = env->GetMethodID(cell_cl_, "getFamilyOffset", "()I");
232  RETURN_ERROR_IF_EXC(env);
233  cell_get_family_length_id_ = env->GetMethodID(cell_cl_, "getFamilyLength", "()B");
234  RETURN_ERROR_IF_EXC(env);
236  env->GetMethodID(cell_cl_, "getQualifierOffset", "()I");
237  RETURN_ERROR_IF_EXC(env);
239  env->GetMethodID(cell_cl_, "getQualifierLength", "()I");
240  RETURN_ERROR_IF_EXC(env);
241  cell_get_row_offset_id_ = env->GetMethodID(cell_cl_, "getRowOffset", "()I");
242  RETURN_ERROR_IF_EXC(env);
243  cell_get_row_length_id_ = env->GetMethodID(cell_cl_, "getRowLength", "()S");
244  RETURN_ERROR_IF_EXC(env);
245  cell_get_value_offset_id_ = env->GetMethodID(cell_cl_, "getValueOffset", "()I");
246  RETURN_ERROR_IF_EXC(env);
247  cell_get_value_length_id_ = env->GetMethodID(cell_cl_, "getValueLength", "()I");
248  RETURN_ERROR_IF_EXC(env);
249 
250  // HConstants fields.
251  jfieldID empty_start_row_id =
252  env->GetStaticFieldID(hconstants_cl_, "EMPTY_START_ROW", "[B");
253  RETURN_ERROR_IF_EXC(env);
254  empty_row_ = env->GetStaticObjectField(hconstants_cl_, empty_start_row_id);
256 
257  // FilterList method ids.
258  filter_list_ctor_ = env->GetMethodID(filter_list_cl_, "<init>",
259  "(Lorg/apache/hadoop/hbase/filter/FilterList$Operator;)V");
260  RETURN_ERROR_IF_EXC(env);
261  filter_list_add_filter_id_ = env->GetMethodID(filter_list_cl_, "addFilter",
262  "(Lorg/apache/hadoop/hbase/filter/Filter;)V");
263  RETURN_ERROR_IF_EXC(env);
264 
265  // FilterList.Operator fields.
266  jfieldID must_pass_all_id = env->GetStaticFieldID(filter_list_op_cl_, "MUST_PASS_ALL",
267  "Lorg/apache/hadoop/hbase/filter/FilterList$Operator;");
268  RETURN_ERROR_IF_EXC(env);
269  must_pass_all_op_ = env->GetStaticObjectField(filter_list_op_cl_, must_pass_all_id);
271 
272  // SingleColumnValueFilter method ids.
274  env->GetMethodID(single_column_value_filter_cl_, "<init>",
275  "([B[BLorg/apache/hadoop/hbase/filter/CompareFilter$CompareOp;[B)V");
276  RETURN_ERROR_IF_EXC(env);
277 
278  // Get op array from CompareFilter.CompareOp.
279  jmethodID compare_op_values = env->GetStaticMethodID(compare_op_cl_, "values",
280  "()[Lorg/apache/hadoop/hbase/filter/CompareFilter$CompareOp;");
281  RETURN_ERROR_IF_EXC(env);
282  compare_ops_ =
283  (jobjectArray) env->CallStaticObjectMethod(compare_op_cl_, compare_op_values);
284  RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, reinterpret_cast<jobject>(compare_ops_),
285  reinterpret_cast<jobject*>(&compare_ops_)));
286 
287  return Status::OK;
288 }
289 
290 Status HBaseTableScanner::ScanSetup(JNIEnv* env, const TupleDescriptor* tuple_desc,
291  const vector<THBaseFilter>& filters) {
293  JniLocalFrame jni_frame;
294  RETURN_IF_ERROR(jni_frame.push(env));
295 
296  const HBaseTableDescriptor* hbase_table =
297  static_cast<const HBaseTableDescriptor*>(tuple_desc->table_desc());
298  // Use global cache of HTables.
299  RETURN_IF_ERROR(htable_factory_->GetTable(hbase_table->table_name(),
300  &htable_));
301 
302  // Setup an Scan object without the range
303  // scan_ = new Scan();
304  DCHECK(scan_ == NULL);
305  scan_ = env->NewObject(scan_cl_, scan_ctor_);
306  RETURN_ERROR_IF_EXC(env);
307  scan_ = env->NewGlobalRef(scan_);
308 
309  // scan_.setMaxVersions(1);
310  env->CallObjectMethod(scan_, scan_set_max_versions_id_, 1);
311  RETURN_ERROR_IF_EXC(env);
312 
313  // scan_.setCaching(rows_cached_);
314  env->CallObjectMethod(scan_, scan_set_caching_id_, rows_cached_);
315  RETURN_ERROR_IF_EXC(env);
316 
317  // scan_.setCacheBlocks(cache_blocks_);
318  env->CallObjectMethod(scan_, scan_set_cache_blocks_id_, cache_blocks_);
319  RETURN_ERROR_IF_EXC(env);
320 
321  const vector<SlotDescriptor*>& slots = tuple_desc->slots();
322  // Restrict scan to materialized families/qualifiers.
323  for (int i = 0; i < slots.size(); ++i) {
324  if (!slots[i]->is_materialized()) continue;
325  const string& family = hbase_table->cols()[slots[i]->col_pos()].family;
326  const string& qualifier = hbase_table->cols()[slots[i]->col_pos()].qualifier;
327  // The row key has an empty qualifier.
328  if (qualifier.empty()) continue;
329  JniLocalFrame jni_frame;
330  RETURN_IF_ERROR(jni_frame.push(env));
331  jbyteArray family_bytes;
332  RETURN_IF_ERROR(CreateByteArray(env, family, &family_bytes));
333  jbyteArray qualifier_bytes;
334  RETURN_IF_ERROR(CreateByteArray(env, qualifier, &qualifier_bytes));
335  // scan_.addColumn(family_bytes, qualifier_bytes);
336  env->CallObjectMethod(scan_, scan_add_column_id_, family_bytes, qualifier_bytes);
337  RETURN_ERROR_IF_EXC(env);
338  }
339 
340  // circumvent hbase bug: make sure to select all cols that have filters,
341  // otherwise the filter may not get applied;
342  // see HBASE-4364 (https://issues.apache.org/jira/browse/HBASE-4364)
344  for (vector<THBaseFilter>::const_iterator it = filters.begin(); it != filters.end();
345  ++it) {
346  bool requested = false;
347  for (int i = 0; i < slots.size(); ++i) {
348  if (!slots[i]->is_materialized()) continue;
349  const string& family = hbase_table->cols()[slots[i]->col_pos()].family;
350  const string& qualifier = hbase_table->cols()[slots[i]->col_pos()].qualifier;
351  if (family == it->family && qualifier == it->qualifier) {
352  requested = true;
353  break;
354  }
355  }
356  if (requested) continue;
357  JniLocalFrame jni_frame;
358  RETURN_IF_ERROR(jni_frame.push(env));
359  jbyteArray family_bytes;
360  RETURN_IF_ERROR(CreateByteArray(env, it->family, &family_bytes));
361  jbyteArray qualifier_bytes;
362  RETURN_IF_ERROR(CreateByteArray(env, it->qualifier, &qualifier_bytes));
363  // scan_.addColumn(family_bytes, qualifier_bytes);
364  env->CallObjectMethod(scan_, scan_add_column_id_, family_bytes, qualifier_bytes);
365  RETURN_ERROR_IF_EXC(env);
367  }
368 
369  // Add HBase Filters.
370  if (!filters.empty()) {
371  // filter_list = new FilterList(Operator.MUST_PASS_ALL);
372  jobject filter_list =
374  RETURN_ERROR_IF_EXC(env);
375  vector<THBaseFilter>::const_iterator it;
376  for (it = filters.begin(); it != filters.end(); ++it) {
377  JniLocalFrame jni_frame;
378  RETURN_IF_ERROR(jni_frame.push(env));
379  // hbase_op = CompareFilter.CompareOp.values()[it->op_ordinal];
380  jobject hbase_op = env->GetObjectArrayElement(compare_ops_, it->op_ordinal);
381  RETURN_ERROR_IF_EXC(env);
382  jbyteArray family_bytes;
383  RETURN_IF_ERROR(CreateByteArray(env, it->family, &family_bytes));
384  jbyteArray qualifier_bytes;
385  RETURN_IF_ERROR(CreateByteArray(env, it->qualifier, &qualifier_bytes));
386  jbyteArray value_bytes;
387  RETURN_IF_ERROR(CreateByteArray(env, it->filter_constant, &value_bytes));
388  // filter = new SingleColumnValueFilter(family_bytes, qualifier_bytes, hbase_op,
389  // value_bytes);
390  jobject filter = env->NewObject(single_column_value_filter_cl_,
391  single_column_value_filter_ctor_, family_bytes, qualifier_bytes, hbase_op,
392  value_bytes);
393  RETURN_ERROR_IF_EXC(env);
394  // filter_list.add(filter);
395  env->CallBooleanMethod(filter_list, filter_list_add_filter_id_, filter);
396  RETURN_ERROR_IF_EXC(env);
397  }
398  // scan.setFilter(filter_list);
399  env->CallObjectMethod(scan_, scan_set_filter_id_, filter_list);
400  RETURN_ERROR_IF_EXC(env);
401  }
402 
403  return Status::OK;
404 }
405 
407  *timeout = false;
408  jthrowable exc = env->ExceptionOccurred();
409  if (exc == NULL) return Status::OK;
410 
411  // GetJniExceptionMsg gets the error message and clears the exception status (which is
412  // necessary). We return the error if the exception was not a ScannerTimeoutException.
413  Status status = JniUtil::GetJniExceptionMsg(env, false);
414  if (env->IsInstanceOf(exc, scanner_timeout_ex_cl_) != JNI_TRUE) return status;
415 
416  *timeout = true;
417  const ScanRange& scan_range = (*scan_range_vector_)[current_scan_range_idx_];
418  // If cells_ is NULL, then the ResultScanner timed out before it was ever used
419  // so we can just re-create the ResultScanner with the same scan_range
420  if (cells_ == NULL) return InitScanRange(env, scan_range);
421 
422  JniLocalFrame jni_frame;
423  RETURN_IF_ERROR(jni_frame.push(env));
424  DCHECK_LT(cell_index_, num_cells_);
425  jobject cell = env->GetObjectArrayElement(cells_, cell_index_);
426  // Specifically set the start_bytes to the next row since some of them were already
427  // read
428  jbyteArray start_bytes =
429  (jbyteArray) env->CallObjectMethod(cell, cell_get_row_array_);
430  jbyteArray end_bytes;
431  CreateByteArray(env, scan_range.stop_key(), &end_bytes);
432  return InitScanRange(env, start_bytes, end_bytes);
433 }
434 
435 Status HBaseTableScanner::InitScanRange(JNIEnv* env, const ScanRange& scan_range) {
436  JniLocalFrame jni_frame;
437  RETURN_IF_ERROR(jni_frame.push(env));
438  jbyteArray start_bytes;
439  CreateByteArray(env, scan_range.start_key(), &start_bytes);
440  jbyteArray end_bytes;
441  CreateByteArray(env, scan_range.stop_key(), &end_bytes);
442  return InitScanRange(env, start_bytes, end_bytes);
443 }
444 
445 Status HBaseTableScanner::InitScanRange(JNIEnv* env, jbyteArray start_bytes,
446  jbyteArray end_bytes) {
447  // scan_.setStartRow(start_bytes);
448  env->CallObjectMethod(scan_, scan_set_start_row_id_, start_bytes);
449  RETURN_ERROR_IF_EXC(env);
450 
451  // scan_.setStopRow(end_bytes);
452  env->CallObjectMethod(scan_, scan_set_stop_row_id_, end_bytes);
453  RETURN_ERROR_IF_EXC(env);
454 
455  if (resultscanner_ != NULL) {
456  // resultscanner_.close();
457  env->CallObjectMethod(resultscanner_, resultscanner_close_id_);
458  env->DeleteGlobalRef(resultscanner_);
459  RETURN_ERROR_IF_EXC(env);
460  resultscanner_ = NULL;
461  }
462  // resultscanner_ = htable_.getScanner(scan_);
463  RETURN_IF_ERROR(htable_->GetResultScanner(scan_, &resultscanner_));
464  resultscanner_ = env->NewGlobalRef(resultscanner_);
465  RETURN_ERROR_IF_EXC(env);
466  return Status::OK;
467 }
468 
469 Status HBaseTableScanner::StartScan(JNIEnv* env, const TupleDescriptor* tuple_desc,
470  const ScanRangeVector& scan_range_vector, const vector<THBaseFilter>& filters) {
471  DCHECK(scan_range_vector.size() > 0);
472  // Setup the scan without ranges first
473  RETURN_IF_ERROR(ScanSetup(env, tuple_desc, filters));
474 
475  // Record the ranges
476  scan_range_vector_ = &scan_range_vector;
478 
479  // Now, scan the first range (we should have at least one range). The
480  // resultscanner_ is NULL and gets created in InitScanRange, so we don't
481  // need to check if it timed out.
482  DCHECK(resultscanner_ == NULL);
484 }
485 
486 Status HBaseTableScanner::CreateByteArray(JNIEnv* env, const string& s,
487  jbyteArray* bytes) {
488  if (!s.empty()) {
489  *bytes = env->NewByteArray(s.size());
490  if (*bytes == NULL) {
491  return Status("Couldn't construct java byte array for key " + s);
492  }
493  env->SetByteArrayRegion(*bytes, 0, s.size(),
494  reinterpret_cast<const jbyte*>(s.data()));
495  } else {
496  *bytes = reinterpret_cast<jbyteArray>(empty_row_);
497  }
498  return Status::OK;
499 }
500 
501 Status HBaseTableScanner::Next(JNIEnv* env, bool* has_next) {
502  JniLocalFrame jni_frame;
503  RETURN_IF_ERROR(jni_frame.push(env));
504  jobject result = NULL;
505  {
507  while (true) {
508  DCHECK(resultscanner_ != NULL);
509  // result_ = resultscanner_.next();
510  result = env->CallObjectMethod(resultscanner_, resultscanner_next_id_);
511  // Normally we would check for a JNI exception via RETURN_ERROR_IF_EXC, but we
512  // need to also check for scanner timeouts and handle them specially, which is
513  // done by HandleResultScannerTimeout(). If a timeout occurred, then it will
514  // re-create the ResultScanner so we can try again.
515  bool timeout;
517  if (timeout) {
518  result = env->CallObjectMethod(resultscanner_, resultscanner_next_id_);
519  // There shouldn't be a timeout now, so we will just return any errors.
520  RETURN_ERROR_IF_EXC(env);
521  }
522  // jump to the next region when finished with the current region.
523  if (result == NULL && current_scan_range_idx_ + 1 < scan_range_vector_->size()) {
527  continue;
528  }
529 
530  // Ignore empty rows
531  if (result != NULL &&
532  JNI_TRUE == env->CallBooleanMethod(result, result_isempty_id_)) {
533  continue;
534  }
535  break;
536  }
537  }
538 
539  if (result == NULL) {
540  *has_next = false;
541  return Status::OK;
542  }
543 
544  if (cells_ != NULL) env->DeleteGlobalRef(cells_);
545  // cells_ = result.raw();
546  cells_ = reinterpret_cast<jobjectArray>(
547  env->CallObjectMethod(result, result_raw_cells_id_));
548  cells_ = reinterpret_cast<jobjectArray>(env->NewGlobalRef(cells_));
549  num_cells_ = env->GetArrayLength(cells_);
550  // Check that raw() didn't return more cells than expected.
551  // If num_requested_cells_ is 0 then only row key is asked for and this check
552  // should pass.
555  *has_next = false;
556  return Status("Encountered more cells than expected.");
557  }
558  // If all requested columns are present, and we didn't ask for any extra ones to work
559  // around an hbase bug, we avoid family-/qualifier comparisons in NextValue().
561  all_cells_present_ = true;
562  } else {
563  all_cells_present_ = false;
564  }
565  cell_index_ = 0;
566 
567  value_pool_->Clear();
568  *has_next = true;
569  return Status::OK;
570 }
571 
573  Tuple* tuple, void* data) {
574  void* slot = tuple->GetSlot(slot_desc->tuple_offset());
575  BitUtil::ByteSwap(slot, data, slot_desc->type().GetByteSize());
576 }
577 
578 inline void HBaseTableScanner::GetRowKey(JNIEnv* env, jobject cell,
579  void** data, int* length) {
580  int offset = env->CallIntMethod(cell, cell_get_row_offset_id_);
581  *length = env->CallShortMethod(cell, cell_get_row_length_id_);
582  jbyteArray jdata =
583  (jbyteArray) env->CallObjectMethod(cell, cell_get_row_array_);
584  *data = value_pool_->Allocate(*length);
585  env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
587 }
588 
589 inline void HBaseTableScanner::GetFamily(JNIEnv* env, jobject cell,
590  void** data, int* length) {
591  int offset = env->CallIntMethod(cell, cell_get_family_offset_id_);
592  *length = env->CallShortMethod(cell, cell_get_family_length_id_);
593  jbyteArray jdata =
594  (jbyteArray) env->CallObjectMethod(cell, cell_get_family_array_);
595  *data = value_pool_->Allocate(*length);
596  env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
598 }
599 
600 inline void HBaseTableScanner::GetQualifier(JNIEnv* env, jobject cell,
601  void** data, int* length) {
602  int offset = env->CallIntMethod(cell, cell_get_qualifier_offset_id_);
603  *length = env->CallIntMethod(cell, cell_get_qualifier_length_id_);
604  jbyteArray jdata =
605  (jbyteArray) env->CallObjectMethod(cell, cell_get_qualifier_array_);
606  *data = value_pool_->Allocate(*length);
607  env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
609 }
610 
611 inline void HBaseTableScanner::GetValue(JNIEnv* env, jobject cell,
612  void** data, int* length) {
613  int offset = env->CallIntMethod(cell, cell_get_value_offset_id_);
614  *length = env->CallIntMethod(cell, cell_get_value_length_id_);
615  jbyteArray jdata =
616  (jbyteArray) env->CallObjectMethod(cell, cell_get_value_array_);
617  *data = value_pool_->Allocate(*length);
618  env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
620 }
621 
622 Status HBaseTableScanner::GetRowKey(JNIEnv* env, void** key, int* key_length) {
623  jobject cell = env->GetObjectArrayElement(cells_, 0);
624  GetRowKey(env, cell, key, key_length);
625  RETURN_ERROR_IF_EXC(env);
626  return Status::OK;
627 }
628 
629 Status HBaseTableScanner::GetRowKey(JNIEnv* env, const SlotDescriptor* slot_desc,
630  Tuple* tuple) {
631  void* key;
632  int key_length;
633  jobject cell = env->GetObjectArrayElement(cells_, 0);
634  GetRowKey(env, cell, &key, &key_length);
635  DCHECK_EQ(key_length, slot_desc->type().GetByteSize());
636  WriteTupleSlot(slot_desc, tuple, reinterpret_cast<char*>(key));
637  RETURN_ERROR_IF_EXC(env);
638  return Status::OK;
639 }
640 
641 Status HBaseTableScanner::GetCurrentValue(JNIEnv* env, const string& family,
642  const string& qualifier, void** data, int* length, bool* is_null) {
643  // Current row doesn't have any more cells. All remaining values are NULL.
644  if (cell_index_ >= num_cells_) {
645  *is_null = true;
646  return Status::OK;
647  }
648  JniLocalFrame jni_frame;
649  RETURN_IF_ERROR(jni_frame.push(env));
650  jobject cell = env->GetObjectArrayElement(cells_, cell_index_);
651  if (!all_cells_present_) {
652  // Check family. If it doesn't match, we have a NULL value.
653  void* family_data;
654  int family_length;
655  GetFamily(env, cell, &family_data, &family_length);
656  if (CompareStrings(family, family_data, family_length) != 0) {
657  *is_null = true;
658  return Status::OK;
659  }
660 
661  // Check qualifier. If it doesn't match, we have a NULL value.
662  void* qualifier_data;
663  int qualifier_length;
664  GetQualifier(env, cell, &qualifier_data, &qualifier_length);
665  if (CompareStrings(qualifier, qualifier_data, qualifier_length) != 0) {
666  *is_null = true;
667  return Status::OK;
668  }
669  }
670  GetValue(env, cell, data, length);
671  *is_null = false;
672  return Status::OK;
673 }
674 
675 Status HBaseTableScanner::GetValue(JNIEnv* env, const string& family,
676  const string& qualifier, void** value, int* value_length) {
677  bool is_null;
678  GetCurrentValue(env, family, qualifier, value, value_length, &is_null);
679  RETURN_ERROR_IF_EXC(env);
680  if (is_null) {
681  *value = NULL;
682  *value_length = 0;
683  return Status::OK;
684  }
685  ++cell_index_;
686  return Status::OK;
687 }
688 
689 Status HBaseTableScanner::GetValue(JNIEnv* env, const string& family,
690  const string& qualifier, const SlotDescriptor* slot_desc, Tuple* tuple) {
691  void* value;
692  int value_length;
693  bool is_null;
694  GetCurrentValue(env, family, qualifier, &value, &value_length, &is_null);
695  RETURN_ERROR_IF_EXC(env);
696  if (is_null) {
697  tuple->SetNull(slot_desc->null_indicator_offset());
698  return Status::OK;
699  }
700  DCHECK_EQ(value_length, slot_desc->type().GetByteSize());
701  WriteTupleSlot(slot_desc, tuple, reinterpret_cast<char*>(value));
702  ++cell_index_;
703  return Status::OK;
704 }
705 
706 int HBaseTableScanner::CompareStrings(const string& s, void* data, int length) {
707  int slength = static_cast<int>(s.length());
708  if (slength == 0 && length == 0) return 0;
709  if (length == 0) return 1;
710  if (slength == 0) return -1;
711  int result = memcmp(s.data(), reinterpret_cast<char*>(data), min(slength, length));
712  if (result == 0 && slength != length) {
713  return (slength < length ? -1 : 1);
714  } else {
715  return result;
716  }
717 }
718 
719 void HBaseTableScanner::Close(JNIEnv* env) {
720  if (resultscanner_ != NULL) {
721  // resultscanner_.close();
722  env->CallObjectMethod(resultscanner_, resultscanner_close_id_);
723  // Manually check if the ResultScanner timed out so that we can log a less scary
724  // and more specific message.
725  jthrowable exc = env->ExceptionOccurred();
726  if (exc != NULL) {
727  if (env->IsInstanceOf(exc, scanner_timeout_ex_cl_) == JNI_TRUE) {
728  env->ExceptionClear();
729  LOG(INFO) << "ResultScanner timed out before it was closed "
730  << "(this does not necessarily indicate a problem)";
731  } else {
732  // GetJniExceptionMsg will clear the exception status and log
733  JniUtil::GetJniExceptionMsg(env, true,
734  "Unknown error occurred while closing ResultScanner: ");
735  }
736  }
737  env->DeleteGlobalRef(resultscanner_);
738  resultscanner_ = NULL;
739  }
740  if (scan_ != NULL) env->DeleteGlobalRef(scan_);
741  if (cells_ != NULL) env->DeleteGlobalRef(cells_);
742 
743  // Close the HTable so that the connections are not kept around.
744  if (htable_.get() != NULL) htable_->Close(state_);
745 
746  value_pool_->FreeAll();
747 }
static jmethodID cell_get_row_length_id_
const TableDescriptor * table_desc() const
Definition: descriptors.h:304
const std::string & start_key() const
static jmethodID resultscanner_next_id_
void SetNull(const NullIndicatorOffset &offset)
Definition: tuple.h:101
static jmethodID cell_get_qualifier_array_
Status StartScan(JNIEnv *env, const TupleDescriptor *tuple_desc, const ScanRangeVector &scan_range_vector, const std::vector< THBaseFilter > &filters)
HBaseTableFactory * htable_factory_
HBase Table factory from runtime state.
static jmethodID scan_set_cache_blocks_id_
Status ScanSetup(JNIEnv *env, const TupleDescriptor *tuple_desc, const std::vector< THBaseFilter > &filters)
First time scanning the table, do some setup.
static jmethodID scan_set_filter_id_
void WriteTupleSlot(const SlotDescriptor *slot_desc, Tuple *tuple, void *data)
static jclass single_column_value_filter_cl_
static jmethodID cell_get_row_offset_id_
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
Status Next(JNIEnv *env, bool *has_next)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
static jmethodID cell_get_value_array_
Status GetCurrentValue(JNIEnv *env, const std::string &family, const std::string &qualifier, void **data, int *length, bool *is_null)
#define ADD_TIMER(profile, name)
static jmethodID cell_get_row_array_
int CompareStrings(const std::string &s, void *data, int length)
static jmethodID scan_set_start_row_id_
void * GetSlot(int offset)
Definition: tuple.h:118
static jmethodID single_column_value_filter_ctor_
static jmethodID scan_add_column_id_
const std::vector< SlotDescriptor * > & slots() const
Definition: descriptors.h:302
Status GetTable(const std::string &table_name, boost::scoped_ptr< HBaseTable > *hbase_table)
#define COUNTER_ADD(c, v)
const NullIndicatorOffset & null_indicator_offset() const
Definition: descriptors.h:89
Status GetRowKey(JNIEnv *env, void **key, int *key_length)
Get the current HBase row key.
#define SCOPED_TIMER(c)
static jmethodID cell_get_value_length_id_
static int64_t ByteSwap(int64_t value)
Swaps the byte order (i.e. endianess)
Definition: bit-util.h:149
void Close(JNIEnv *env)
Close HTable and ResultScanner.
static jclass scan_cl_
Global class references created with JniUtil. Cleanup is done in JniUtil::Cleanup().
const ScanRangeVector * scan_range_vector_
Vector of ScanRange.
boost::scoped_ptr< HBaseTable > htable_
C++ wrapper for HTable.
static Status LocalToGlobalRef(JNIEnv *env, jobject local_ref, jobject *global_ref)
Definition: jni-util.cc:67
static jmethodID result_raw_cells_id_
HBaseTableScanner(HBaseScanNode *scan_node, HBaseTableFactory *htable_factory, RuntimeState *state)
static jmethodID scan_set_caching_id_
const TQueryOptions & query_options() const
Definition: runtime-state.h:95
static jmethodID scan_set_stop_row_id_
const ColumnType & type() const
Definition: descriptors.h:78
static Status GetJniExceptionMsg(JNIEnv *env, bool log_stack=true, const std::string &prefix="")
Definition: jni-util.cc:161
Status HandleResultScannerTimeout(JNIEnv *env, bool *timeout)
Status InitScanRange(JNIEnv *env, const ScanRange &scan_range)
Initialize the scan to the given range.
static jmethodID cell_get_family_offset_id_
static jmethodID filter_list_add_filter_id_
Status GetValue(JNIEnv *env, const std::string &family, const std::string &qualifier, void **value, int *value_length)
HBaseScanNode * scan_node_
The enclosing HBaseScanNode.
static jmethodID result_isempty_id_
static jmethodID cell_get_value_offset_id_
static jmethodID cell_get_qualifier_offset_id_
int GetByteSize() const
Returns the byte size of this type. Returns 0 for variable length types.
Definition: types.h:178
static jmethodID filter_list_ctor_
const std::string & stop_key() const
static jmethodID cell_get_qualifier_length_id_
static jmethodID cell_get_family_length_id_
RuntimeProfile::Counter * bytes_read_counter() const
Definition: scan-node.h:95
Status push(JNIEnv *env, int max_local_ref=10)
Definition: jni-util.cc:34
#define RETURN_ERROR_IF_EXC(env)
Definition: jni-util.h:99
int cell_index_
Current position in cells_. Incremented in GetValue(). Reset in Next().
int num_cells_
Number of cells returned from last result_.raw().
std::vector< ScanRange > ScanRangeVector
static jmethodID scan_set_max_versions_id_
static jmethodID resultscanner_close_id_
static const Status OK
Definition: status.h:87
static bool ClassExists(JNIEnv *env, const char *class_str)
Definition: jni-util.cc:45
Status CreateByteArray(JNIEnv *env, const std::string &s, jbyteArray *bytes)
Turn strings into Java byte array.
static Status GetGlobalClassRef(JNIEnv *env, const char *class_str, jclass *class_ref)
Definition: jni-util.cc:56
uint8_t offset[7 *64-sizeof(uint64_t)]
int tuple_offset() const
Definition: descriptors.h:88
static const int DEFAULT_ROWS_CACHED
bool cache_blocks_
True if the scanner should set Scan.setCacheBlocks to true.
void GetFamily(JNIEnv *env, jobject cell, void **data, int *length)
static jobjectArray compare_ops_
static jmethodID cell_get_family_array_
const int suggested_max_caching() const
HBase scan range; "" means unbounded.
RuntimeProfile::Counter * read_timer() const
Definition: scan-node.h:97
void DebugString(int indentation_level, std::stringstream *out)
Write debug string of this ScanRange into out.
boost::scoped_ptr< MemPool > value_pool_
void GetQualifier(JNIEnv *env, jobject cell, void **data, int *length)
JNIEnv * getJNIEnv(void)
C linkage for helper functions in hdfsJniHelper.h.
RuntimeProfile::Counter * scan_setup_timer_
HBase specific counters.
static jclass cell_cl_
Cell or KeyValue class depending on HBase version (see class comment).
static jclass scanner_timeout_ex_cl_
Exception thrown when a ResultScanner times out.