30 using namespace impala;
75 *out << string(indentation_level * 2,
' ');
100 "HBaseTableScanner.ScanSetup")) {
102 if (query_option.__isset.hbase_caching && query_option.hbase_caching > 0) {
110 query_option.hbase_cache_blocks;
117 return Status(
"Failed to get/create JVM");
141 "org/apache/hadoop/hbase/filter/SingleColumnValueFilter",
145 "org/apache/hadoop/hbase/filter/CompareFilter$CompareOp",
149 "org/apache/hadoop/hbase/client/ScannerTimeoutException",
156 if (has_cell_class) {
157 LOG(INFO) <<
"Detected HBase version >= 0.95.2";
162 LOG(INFO) <<
"Detected HBase version < 0.95.2";
172 "(I)Lorg/apache/hadoop/hbase/client/Scan;");
179 "([B[B)Lorg/apache/hadoop/hbase/client/Scan;");
182 "(Lorg/apache/hadoop/hbase/filter/Filter;)Lorg/apache/hadoop/hbase/client/Scan;");
185 "([B)Lorg/apache/hadoop/hbase/client/Scan;");
188 "([B)Lorg/apache/hadoop/hbase/client/Scan;");
193 "()Lorg/apache/hadoop/hbase/client/Result;");
199 if (has_cell_class) {
201 "()[Lorg/apache/hadoop/hbase/Cell;");
204 "()[Lorg/apache/hadoop/hbase/KeyValue;");
213 if (has_cell_class) {
226 env->GetMethodID(
cell_cl_,
"getBuffer",
"()[B");
236 env->GetMethodID(
cell_cl_,
"getQualifierOffset",
"()I");
239 env->GetMethodID(
cell_cl_,
"getQualifierLength",
"()I");
251 jfieldID empty_start_row_id =
259 "(Lorg/apache/hadoop/hbase/filter/FilterList$Operator;)V");
262 "(Lorg/apache/hadoop/hbase/filter/Filter;)V");
266 jfieldID must_pass_all_id = env->GetStaticFieldID(
filter_list_op_cl_,
"MUST_PASS_ALL",
267 "Lorg/apache/hadoop/hbase/filter/FilterList$Operator;");
275 "([B[BLorg/apache/hadoop/hbase/filter/CompareFilter$CompareOp;[B)V");
279 jmethodID compare_op_values = env->GetStaticMethodID(
compare_op_cl_,
"values",
280 "()[Lorg/apache/hadoop/hbase/filter/CompareFilter$CompareOp;");
283 (jobjectArray) env->CallStaticObjectMethod(
compare_op_cl_, compare_op_values);
291 const vector<THBaseFilter>& filters) {
304 DCHECK(
scan_ == NULL);
321 const vector<SlotDescriptor*>& slots = tuple_desc->
slots();
323 for (
int i = 0; i < slots.size(); ++i) {
324 if (!slots[i]->is_materialized())
continue;
325 const string& family = hbase_table->cols()[slots[i]->col_pos()].family;
326 const string& qualifier = hbase_table->cols()[slots[i]->col_pos()].qualifier;
328 if (qualifier.empty())
continue;
331 jbyteArray family_bytes;
333 jbyteArray qualifier_bytes;
344 for (vector<THBaseFilter>::const_iterator it = filters.begin(); it != filters.end();
346 bool requested =
false;
347 for (
int i = 0; i < slots.size(); ++i) {
348 if (!slots[i]->is_materialized())
continue;
349 const string& family = hbase_table->cols()[slots[i]->col_pos()].family;
350 const string& qualifier = hbase_table->cols()[slots[i]->col_pos()].qualifier;
351 if (family == it->family && qualifier == it->qualifier) {
356 if (requested)
continue;
359 jbyteArray family_bytes;
361 jbyteArray qualifier_bytes;
370 if (!filters.empty()) {
372 jobject filter_list =
375 vector<THBaseFilter>::const_iterator it;
376 for (it = filters.begin(); it != filters.end(); ++it) {
380 jobject hbase_op = env->GetObjectArrayElement(
compare_ops_, it->op_ordinal);
382 jbyteArray family_bytes;
384 jbyteArray qualifier_bytes;
386 jbyteArray value_bytes;
408 jthrowable exc = env->ExceptionOccurred();
428 jbyteArray start_bytes =
430 jbyteArray end_bytes;
438 jbyteArray start_bytes;
440 jbyteArray end_bytes;
446 jbyteArray end_bytes) {
470 const ScanRangeVector& scan_range_vector,
const vector<THBaseFilter>& filters) {
471 DCHECK(scan_range_vector.size() > 0);
489 *bytes = env->NewByteArray(s.size());
490 if (*bytes == NULL) {
491 return Status(
"Couldn't construct java byte array for key " + s);
493 env->SetByteArrayRegion(*bytes, 0, s.size(),
494 reinterpret_cast<const jbyte*
>(s.data()));
496 *bytes =
reinterpret_cast<jbyteArray
>(
empty_row_);
504 jobject result = NULL;
531 if (result != NULL &&
539 if (result == NULL) {
546 cells_ =
reinterpret_cast<jobjectArray
>(
548 cells_ =
reinterpret_cast<jobjectArray
>(env->NewGlobalRef(
cells_));
556 return Status(
"Encountered more cells than expected.");
573 Tuple* tuple,
void* data) {
579 void** data,
int* length) {
585 env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
590 void** data,
int* length) {
596 env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
601 void** data,
int* length) {
607 env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
612 void** data,
int* length) {
618 env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data));
623 jobject cell = env->GetObjectArrayElement(
cells_, 0);
633 jobject cell = env->GetObjectArrayElement(
cells_, 0);
642 const string& qualifier,
void** data,
int* length,
bool* is_null) {
655 GetFamily(env, cell, &family_data, &family_length);
662 void* qualifier_data;
663 int qualifier_length;
664 GetQualifier(env, cell, &qualifier_data, &qualifier_length);
665 if (
CompareStrings(qualifier, qualifier_data, qualifier_length) != 0) {
676 const string& qualifier,
void** value,
int* value_length) {
678 GetCurrentValue(env, family, qualifier, value, value_length, &is_null);
694 GetCurrentValue(env, family, qualifier, &value, &value_length, &is_null);
707 int slength =
static_cast<int>(s.length());
708 if (slength == 0 && length == 0)
return 0;
709 if (length == 0)
return 1;
710 if (slength == 0)
return -1;
711 int result = memcmp(s.data(),
reinterpret_cast<char*
>(data), min(slength, length));
712 if (result == 0 && slength != length) {
713 return (slength < length ? -1 : 1);
725 jthrowable exc = env->ExceptionOccurred();
728 env->ExceptionClear();
729 LOG(INFO) <<
"ResultScanner timed out before it was closed "
730 <<
"(this does not necessarily indicate a problem)";
734 "Unknown error occurred while closing ResultScanner: ");
740 if (
scan_ != NULL) env->DeleteGlobalRef(
scan_);
static jmethodID cell_get_row_length_id_
const TableDescriptor * table_desc() const
const std::string & start_key() const
static jmethodID resultscanner_next_id_
void SetNull(const NullIndicatorOffset &offset)
static jmethodID cell_get_qualifier_array_
Status StartScan(JNIEnv *env, const TupleDescriptor *tuple_desc, const ScanRangeVector &scan_range_vector, const std::vector< THBaseFilter > &filters)
HBaseTableFactory * htable_factory_
HBase Table factory from runtime state.
static jmethodID scan_set_cache_blocks_id_
Status ScanSetup(JNIEnv *env, const TupleDescriptor *tuple_desc, const std::vector< THBaseFilter > &filters)
First time scanning the table, do some setup.
static jmethodID scan_set_filter_id_
void WriteTupleSlot(const SlotDescriptor *slot_desc, Tuple *tuple, void *data)
static jclass single_column_value_filter_cl_
static jmethodID cell_get_row_offset_id_
A tuple with 0 materialised slots is represented as NULL.
Status Next(JNIEnv *env, bool *has_next)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
static jmethodID cell_get_value_array_
Status GetCurrentValue(JNIEnv *env, const std::string &family, const std::string &qualifier, void **data, int *length, bool *is_null)
#define ADD_TIMER(profile, name)
static jmethodID cell_get_row_array_
int CompareStrings(const std::string &s, void *data, int length)
static jmethodID scan_set_start_row_id_
void * GetSlot(int offset)
static jmethodID single_column_value_filter_ctor_
static jmethodID scan_add_column_id_
const std::vector< SlotDescriptor * > & slots() const
static jobject empty_row_
static jobject must_pass_all_op_
Status GetTable(const std::string &table_name, boost::scoped_ptr< HBaseTable > *hbase_table)
#define COUNTER_ADD(c, v)
const NullIndicatorOffset & null_indicator_offset() const
Status GetRowKey(JNIEnv *env, void **key, int *key_length)
Get the current HBase row key.
static jmethodID cell_get_value_length_id_
static int64_t ByteSwap(int64_t value)
Swaps the byte order (i.e. endianness)
void Close(JNIEnv *env)
Close HTable and ResultScanner.
static jclass scan_cl_
Global class references created with JniUtil. Cleanup is done in JniUtil::Cleanup().
int current_scan_range_idx_
const ScanRangeVector * scan_range_vector_
Vector of ScanRange.
boost::scoped_ptr< HBaseTable > htable_
C++ wrapper for HTable.
static Status LocalToGlobalRef(JNIEnv *env, jobject local_ref, jobject *global_ref)
static jmethodID result_raw_cells_id_
HBaseTableScanner(HBaseScanNode *scan_node, HBaseTableFactory *htable_factory, RuntimeState *state)
static jclass resultscanner_cl_
static jmethodID scan_set_caching_id_
const TQueryOptions & query_options() const
static jmethodID scan_set_stop_row_id_
const ColumnType & type() const
static Status GetJniExceptionMsg(JNIEnv *env, bool log_stack=true, const std::string &prefix="")
Status HandleResultScannerTimeout(JNIEnv *env, bool *timeout)
Status InitScanRange(JNIEnv *env, const ScanRange &scan_range)
Initialize the scan to the given range.
int num_addl_requested_cols_
static jmethodID cell_get_family_offset_id_
static jmethodID filter_list_add_filter_id_
Status GetValue(JNIEnv *env, const std::string &family, const std::string &qualifier, void **value, int *value_length)
HBaseScanNode * scan_node_
The enclosing HBaseScanNode.
static jmethodID result_isempty_id_
static jmethodID cell_get_value_offset_id_
static jclass hconstants_cl_
static jmethodID cell_get_qualifier_offset_id_
int GetByteSize() const
Returns the byte size of this type. Returns 0 for variable length types.
static jmethodID filter_list_ctor_
const std::string & stop_key() const
static jclass filter_list_op_cl_
static jmethodID cell_get_qualifier_length_id_
static jmethodID cell_get_family_length_id_
RuntimeProfile::Counter * bytes_read_counter() const
Status push(JNIEnv *env, int max_local_ref=10)
static jclass compare_op_cl_
#define RETURN_ERROR_IF_EXC(env)
static jmethodID scan_ctor_
int cell_index_
Current position in cells_. Incremented in NextValue(). Reset in Next().
int num_cells_
Number of cells returned from last result_.raw().
std::vector< ScanRange > ScanRangeVector
static jmethodID scan_set_max_versions_id_
static jmethodID resultscanner_close_id_
static bool ClassExists(JNIEnv *env, const char *class_str)
Status CreateByteArray(JNIEnv *env, const std::string &s, jbyteArray *bytes)
Turn strings into Java byte array.
static Status GetGlobalClassRef(JNIEnv *env, const char *class_str, jclass *class_ref)
uint8_t offset[7 *64-sizeof(uint64_t)]
static const int DEFAULT_ROWS_CACHED
bool cache_blocks_
True if the scanner should set Scan.setCacheBlocks to true.
void GetFamily(JNIEnv *env, jobject cell, void **data, int *length)
static jobjectArray compare_ops_
static jmethodID cell_get_family_array_
const int suggested_max_caching() const
HBase scan range; "" means unbounded.
RuntimeProfile::Counter * read_timer() const
void DebugString(int indentation_level, std::stringstream *out)
Write debug string of this ScanRange into out.
boost::scoped_ptr< MemPool > value_pool_
void GetQualifier(JNIEnv *env, jobject cell, void **data, int *length)
JNIEnv * getJNIEnv(void)
C linkage for helper functions in hdfsJniHelper.h.
RuntimeProfile::Counter * scan_setup_timer_
HBase specific counters.
static jclass cell_cl_
Cell or KeyValue class depending on HBase version (see class comment).
static jclass scanner_timeout_ex_cl_
Exception thrown when a ResultScanner times out.
static jclass filter_list_cl_