Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hbase-table-writer.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <boost/foreach.hpp>
18 #include <boost/scoped_array.hpp>
19 #include <sstream>
20 
21 #include "common/logging.h"
22 #include "exprs/expr-context.h"
24 #include "util/bit-util.h"
25 #include "util/jni-util.h"
26 #include "exprs/expr.h"
27 #include "runtime/raw-value.h"
28 
29 #include "common/names.h"
30 
31 namespace impala {
32 
33 jclass HBaseTableWriter::put_cl_ = NULL;  // class org.apache.hadoop.hbase.client.Put
34 jclass HBaseTableWriter::list_cl_ = NULL;  // class java.util.ArrayList
35 
36 jmethodID HBaseTableWriter::put_ctor_ = NULL;  // Put constructor, JNI signature "([B)V"
37 jmethodID HBaseTableWriter::list_ctor_ = NULL;  // ArrayList constructor, JNI signature "(I)V"
38 jmethodID HBaseTableWriter::list_add_id_ = NULL;  // ArrayList.add, "(Ljava/lang/Object;)Z"
39 
40 jmethodID HBaseTableWriter::put_add_id_ = NULL;  // Put.add(byte[], byte[], byte[]), returns Put
41 
43  const vector<ExprContext*>& output_expr_ctxs,
44  RuntimeProfile* profile)
45  : table_desc_(table_desc),
46  table_(NULL),
47  output_expr_ctxs_(output_expr_ctxs),
48  put_list_(NULL),
49  runtime_profile_(profile) {
50 };
51 
54  &table_));
55  encoding_timer_ = ADD_TIMER(runtime_profile_, "EncodingTimer");
56  htable_put_timer_ = ADD_TIMER(runtime_profile_, "HTablePutTimer");
57 
58  int num_col = table_desc_->num_cols();
59  if (num_col < 2) {
60  return Status("HBase tables must contain at least"
61  " one column in addition to the row key.");
62  }
63 
64  JNIEnv* env = getJNIEnv();
65  if (env == NULL) return Status("Error getting JNIEnv.");
66  output_exprs_byte_sizes_.resize(num_col);
67  cf_arrays_.reserve(num_col - 1);
68  qual_arrays_.reserve(num_col - 1);
69  for (int i = 0; i < num_col; ++i) {
70  output_exprs_byte_sizes_[i] = output_expr_ctxs_[i]->root()->type().GetByteSize();
71 
72  if (i == 0) continue;
73 
74  // Setup column family and qualifier byte array for non-rowkey column
76  jbyteArray byte_array;
77  jobject global_ref;
78  RETURN_IF_ERROR(CreateByteArray(env, col.family, &byte_array));
79  RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, byte_array, &global_ref));
80  cf_arrays_.push_back(reinterpret_cast<jbyteArray>(global_ref));
81  RETURN_IF_ERROR(CreateByteArray(env, col.qualifier, &byte_array));
82  RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, byte_array, &global_ref));
83  qual_arrays_.push_back(reinterpret_cast<jbyteArray>(global_ref));
84  }
85 
86  return Status::OK;
87 }
88 
90  JNIEnv* env = getJNIEnv();
91  if (env == NULL) return Status("Error getting JNIEnv.");
92 
95  env, "org/apache/hadoop/hbase/client/Put", &put_cl_));
97  put_ctor_ = env->GetMethodID(put_cl_, "<init>", "([B)V");
99  put_add_id_ = env->GetMethodID(put_cl_, "add",
100  "([B[B[B)Lorg/apache/hadoop/hbase/client/Put;");
101  RETURN_ERROR_IF_EXC(env);
103  JniUtil::GetGlobalClassRef(env, "java/util/ArrayList", &list_cl_));
104  list_ctor_ = env->GetMethodID(list_cl_, "<init>", "(I)V");
105  RETURN_ERROR_IF_EXC(env);
106  list_add_id_ = env->GetMethodID(list_cl_, "add", "(Ljava/lang/Object;)Z");
107  RETURN_ERROR_IF_EXC(env);
108 
109  return Status::OK;
110 }
111 
113  JNIEnv* env = getJNIEnv();
114  if (env == NULL) return Status("Error getting JNIEnv.");
115 
116  int limit = batch->num_rows();
117  if (limit == 0) return Status::OK;
118  int num_cols = table_desc_->num_cols();
119  DCHECK_GE(num_cols, 2);
120 
121  // Create the array list.
122  RETURN_IF_ERROR(CreatePutList(env, limit));
123 
124  // For every TupleRow in the row batch create a put, assign the row key,
125  // and add all of the values generated from the expressions.
126  string string_value; // text encoded value
127  char binary_value[8]; // binary encoded value; at most 8 bytes
128  const void* data; // pointer to the column value in bytes
129  int data_len; // length of the column value in bytes
130  {
132  for (int idx_batch = 0; idx_batch < limit; idx_batch++) {
133  TupleRow* current_row = batch->GetRow(idx_batch);
134  jobject put = NULL;
135 
136  if (output_expr_ctxs_[0]->GetValue(current_row) == NULL) {
137  // HBase row key must not be null.
138  return Status("Cannot insert into HBase with a null row key.");
139  }
140 
141  for (int j = 0; j < num_cols; j++) {
143  void* value = output_expr_ctxs_[j]->GetValue(current_row);
144 
145  if (value != NULL) {
146  if (!col.binary_encoded) {
147  // Text encoded
148  string_value.clear();
149  output_expr_ctxs_[j]->PrintValue(value, &string_value);
150  data = string_value.data();
151  data_len = string_value.length();
152  } else {
153  // Binary encoded
154  // Only bool, tinyint, smallint, int, bigint, float and double can be binary
155  // encoded. Convert the value to big-endian.
156  data = binary_value;
157  data_len = output_exprs_byte_sizes_[j];
158  DCHECK(data_len == 1 || data_len == 2 || data_len == 4 || data_len == 8)
159  << data_len;
160  BitUtil::ByteSwap(binary_value, value, data_len);
161  }
162 
163  if (j == 0) {
164  DCHECK(put == NULL);
165  RETURN_IF_ERROR(CreatePut(env, data, data_len, &put));
166  } else {
167  DCHECK(put != NULL) << "Put shouldn't be NULL for non-key cols.";
168  jbyteArray val_array;
169  RETURN_IF_ERROR(CreateByteArray(env, data, data_len, &val_array));
170  env->CallObjectMethod(put, put_add_id_, cf_arrays_[j-1], qual_arrays_[j-1],
171  val_array);
172  RETURN_ERROR_IF_EXC(env);
173 
174  // Clean up the local references.
175  env->DeleteLocalRef(val_array);
176  RETURN_ERROR_IF_EXC(env);
177  }
178  }
179  }
180 
181  DCHECK(put != NULL);
182  env->DeleteLocalRef(put);
183  RETURN_ERROR_IF_EXC(env);
184  }
185  }
186 
187  // Send the array list to HTable.
188  {
191  }
192  // Now clean put_list_.
193  env->DeleteGlobalRef(put_list_);
194  RETURN_ERROR_IF_EXC(env);
195  put_list_ = NULL;
196  return Status::OK;
197 }
198 
200  JNIEnv* env = getJNIEnv();
201  if (env == NULL) return Status("Error getting JNIEnv.");
202 
203  if (put_list_ != NULL) {
204  env->DeleteGlobalRef(put_list_);
205  RETURN_ERROR_IF_EXC(env);
206  put_list_ = NULL;
207  }
208 
209  BOOST_FOREACH(jbyteArray ref, cf_arrays_) {
210  env->DeleteGlobalRef(reinterpret_cast<jobject>(ref));
211  }
212  BOOST_FOREACH(jbyteArray ref, qual_arrays_) {
213  env->DeleteGlobalRef(reinterpret_cast<jobject>(ref));
214  }
215 
216  return Status::OK;
217 }
218 
219 Status HBaseTableWriter::CreatePutList(JNIEnv* env, int num_puts) {
220  DCHECK(put_list_ == NULL);
221  jobject local_put_list = env->NewObject(list_cl_, list_ctor_, num_puts);
222  RETURN_ERROR_IF_EXC(env);
223 
224  RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, local_put_list, &put_list_));
225 
226  return Status::OK;
227 }
228 
229 Status HBaseTableWriter::CreatePut(JNIEnv* env, const void* rk, int rk_len,
230  jobject* put) {
231  // Create the row key byte array.
232  jbyteArray rk_array;
233  RETURN_IF_ERROR(CreateByteArray(env, rk, rk_len, &rk_array));
234 
235  (*put) = env->NewObject(put_cl_, put_ctor_, rk_array);
236  RETURN_ERROR_IF_EXC(env);
237 
238  // Add the put to the list.
239  env->CallObjectMethod(put_list_, list_add_id_, *put);
240  RETURN_ERROR_IF_EXC(env);
241 
242  env->DeleteLocalRef(rk_array);
243  RETURN_ERROR_IF_EXC(env);
244 
245  return Status::OK;
246 }
247 
248 Status HBaseTableWriter::CreateByteArray(JNIEnv* env, const string& s,
249  jbyteArray* j_array) {
250  int s_len = s.size();
251  return CreateByteArray(env, s.data(), s_len, j_array);
252 }
253 
254 Status HBaseTableWriter::CreateByteArray(JNIEnv* env, const void* data, int data_len,
255  jbyteArray* j_array) {
256  (*j_array) = env->NewByteArray(data_len);
257  RETURN_ERROR_IF_EXC(env);
258  env->SetByteArrayRegion((*j_array), 0, data_len, reinterpret_cast<const jbyte*>(data));
259  RETURN_ERROR_IF_EXC(env);
260  return Status::OK;
261 }
262 
264  // Guard against double closing.
265  if (table_.get() != NULL) {
266  table_->Close(state);
267  table_.reset();
268  }
269 
270  // The jni should already have everything cleaned at this point
271  // but try again just in case there was an error that caused
272  // AppendRowBatch to exit out before calling CleanUpJni.
273  Status status = CleanUpJni();
274  if (!status.ok()) {
275  stringstream ss;
276  ss << "HBaseTableWriter::Close ran into an issue: " << status.GetDetail();
277  state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
278  }
279 }
280 
281 } // namespace impala
static jmethodID put_add_id_
Put::add(byte[], byte[], byte[])
jobject put_list_
jni ArrayList<Put>
Status CreatePutList(JNIEnv *env, int num_puts)
int num_rows() const
Definition: row-batch.h:215
const std::string GetDetail() const
Definition: status.cc:184
std::vector< int > output_exprs_byte_sizes_
output_exprs_byte_sizes_[i] is the byte size of output_expr_ctxs_[i]->root()'s type.
std::vector< jbyteArray > qual_arrays_
qual_arrays_[i-1] is the column family qualifier jbyteArray for column i.
static jmethodID put_ctor_
new Put(byte[])
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
#define ADD_TIMER(profile, name)
Status CreatePut(JNIEnv *env, const void *rk, int rk_len, jobject *put)
HBaseTableFactory * htable_factory()
Status GetTable(const std::string &table_name, boost::scoped_ptr< HBaseTable > *hbase_table)
#define SCOPED_TIMER(c)
static int64_t ByteSwap(int64_t value)
Swaps the byte order (i.e. endianness)
Definition: bit-util.h:149
static jmethodID list_add_id_
ArrayList::add(V).
static Status LocalToGlobalRef(JNIEnv *env, jobject local_ref, jobject *global_ref)
Definition: jni-util.cc:67
std::vector< jbyteArray > cf_arrays_
cf_arrays_[i-1] is the column family jbyteArray for column i.
const std::vector< HBaseColumnDescriptor > & cols() const
Definition: descriptors.h:281
bool LogError(const ErrorMsg &msg)
const std::vector< ExprContext * > output_expr_ctxs_
The expressions that are run to create tuples to be written to hbase.
RuntimeProfile * runtime_profile_
Parent table sink's profile.
static jclass put_cl_
org.apache.hadoop.hbase.client.Put
static Status InitJNI()
Grab all of the Java classes needed to get data into and out of HBase.
HBaseTableWriter(HBaseTableDescriptor *table_desc, const std::vector< ExprContext * > &output_expr_ctxs, RuntimeProfile *profile)
RuntimeProfile::Counter * htable_put_timer_
Status Init(RuntimeState *state)
const std::string & name() const
Definition: descriptors.h:163
#define RETURN_ERROR_IF_EXC(env)
Definition: jni-util.h:99
Status CreateByteArray(JNIEnv *env, const std::string &s, jbyteArray *j_array)
Create a byte array containing the string's chars.
static const Status OK
Definition: status.h:87
Status AppendRowBatch(RowBatch *batch)
static Status GetGlobalClassRef(JNIEnv *env, const char *class_str, jclass *class_ref)
Definition: jni-util.cc:56
RuntimeProfile::Counter * encoding_timer_
bool ok() const
Definition: status.h:172
static jclass list_cl_
java.util.ArrayList
void Close(RuntimeState *state)
Calls to Close release the HBaseTable.
HBaseTableDescriptor * table_desc_
Owned by RuntimeState not by this object.
static jmethodID list_ctor_
new ArrayList<V>(starting_capacity)
boost::scoped_ptr< HBaseTable > table_
JNIEnv * getJNIEnv(void)
C linkage for helper functions in hdfsJniHelper.h.