17 #include <boost/foreach.hpp>
18 #include <boost/scoped_array.hpp>
43 const vector<ExprContext*>& output_expr_ctxs,
45 : table_desc_(table_desc),
47 output_expr_ctxs_(output_expr_ctxs),
49 runtime_profile_(profile) {
60 return Status(
"HBase tables must contain at least"
61 " one column in addition to the row key.");
65 if (env == NULL)
return Status(
"Error getting JNIEnv.");
69 for (
int i = 0; i < num_col; ++i) {
76 jbyteArray byte_array;
80 cf_arrays_.push_back(reinterpret_cast<jbyteArray>(global_ref));
83 qual_arrays_.push_back(reinterpret_cast<jbyteArray>(global_ref));
91 if (env == NULL)
return Status(
"Error getting JNIEnv.");
95 env,
"org/apache/hadoop/hbase/client/Put", &
put_cl_));
100 "([B[B[B)Lorg/apache/hadoop/hbase/client/Put;");
114 if (env == NULL)
return Status(
"Error getting JNIEnv.");
119 DCHECK_GE(num_cols, 2);
127 char binary_value[8];
132 for (
int idx_batch = 0; idx_batch < limit; idx_batch++) {
138 return Status(
"Cannot insert into HBase with a null row key.");
141 for (
int j = 0; j < num_cols; j++) {
148 string_value.clear();
150 data = string_value.data();
151 data_len = string_value.length();
158 DCHECK(data_len == 1 || data_len == 2 || data_len == 4 || data_len == 8)
167 DCHECK(put != NULL) <<
"Put shouldn't be NULL for non-key cols.";
168 jbyteArray val_array;
175 env->DeleteLocalRef(val_array);
182 env->DeleteLocalRef(put);
201 if (env == NULL)
return Status(
"Error getting JNIEnv.");
210 env->DeleteGlobalRef(reinterpret_cast<jobject>(ref));
213 env->DeleteGlobalRef(reinterpret_cast<jobject>(ref));
242 env->DeleteLocalRef(rk_array);
249 jbyteArray* j_array) {
250 int s_len = s.size();
255 jbyteArray* j_array) {
256 (*j_array) = env->NewByteArray(data_len);
258 env->SetByteArrayRegion((*j_array), 0, data_len, reinterpret_cast<const jbyte*>(data));
265 if (
table_.get() != NULL) {
276 ss <<
"HBaseTableWriter::Close ran into an issue: " << status.
GetDetail();
static jmethodID put_add_id_
Put::add(byte[], byte[], byte[])
jobject put_list_
JNI ArrayList<Put>
Status CreatePutList(JNIEnv *env, int num_puts)
const std::string GetDetail() const
std::vector< int > output_exprs_byte_sizes_
output_exprs_byte_sizes_[i] is the byte size of output_expr_ctxs_[i]->root()'s type.
std::vector< jbyteArray > qual_arrays_
qual_arrays_[i-1] is the column family qualifier jbyteArray for column i.
static jmethodID put_ctor_
new Put(byte[])
#define RETURN_IF_ERROR(stmt)
Some generally useful macros.
TupleRow * GetRow(int row_idx)
#define ADD_TIMER(profile, name)
Status CreatePut(JNIEnv *env, const void *rk, int rk_len, jobject *put)
HBaseTableFactory * htable_factory()
Status GetTable(const std::string &table_name, boost::scoped_ptr< HBaseTable > *hbase_table)
static int64_t ByteSwap(int64_t value)
Swaps the byte order (i.e. endianness)
static jmethodID list_add_id_
ArrayList::add(V)
static Status LocalToGlobalRef(JNIEnv *env, jobject local_ref, jobject *global_ref)
std::vector< jbyteArray > cf_arrays_
cf_arrays_[i-1] is the column family jbyteArray for column i.
const std::vector< HBaseColumnDescriptor > & cols() const
bool LogError(const ErrorMsg &msg)
const std::vector< ExprContext * > output_expr_ctxs_
The expressions that are run to create tuples to be written to HBase.
RuntimeProfile * runtime_profile_
Parent table sink's profile.
static jclass put_cl_
org.apache.hadoop.hbase.client.Put
static Status InitJNI()
Grab all of the Java classes needed to get data into and out of HBase.
HBaseTableWriter(HBaseTableDescriptor *table_desc, const std::vector< ExprContext * > &output_expr_ctxs, RuntimeProfile *profile)
RuntimeProfile::Counter * htable_put_timer_
Status Init(RuntimeState *state)
const std::string & name() const
#define RETURN_ERROR_IF_EXC(env)
Status CreateByteArray(JNIEnv *env, const std::string &s, jbyteArray *j_array)
Create a byte array containing the string's chars.
Status AppendRowBatch(RowBatch *batch)
static Status GetGlobalClassRef(JNIEnv *env, const char *class_str, jclass *class_ref)
RuntimeProfile::Counter * encoding_timer_
static jclass list_cl_
java.util.ArrayList
void Close(RuntimeState *state)
Calls to Close release the HBaseTable.
HBaseTableDescriptor * table_desc_
Owned by RuntimeState, not by this object.
static jmethodID list_ctor_
new ArrayList<V>(starting_capacity)
boost::scoped_ptr< HBaseTable > table_
JNIEnv * getJNIEnv(void)
C linkage for helper functions in hdfsJniHelper.h.