Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hive-udf-call.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exprs/hive-udf-call.h"
16 
17 #include <jni.h>
18 #include <sstream>
19 #include <string>
20 
21 #include "codegen/llvm-codegen.h"
22 #include "exprs/anyval-util.h"
23 #include "exprs/expr-context.h"
24 #include "rpc/jni-thrift-util.h"
25 #include "runtime/lib-cache.h"
26 #include "runtime/runtime-state.h"
27 #include "util/bit-util.h"
28 
29 #include "gen-cpp/Frontend_types.h"
30 
31 #include "common/names.h"
32 
// JNI (slash-separated) name of the Java class that Open() instantiates and
// whose evaluate()/close() methods are invoked for each Hive UDF call.
const char *EXECUTOR_CLASS = "com/cloudera/impala/hive/executor/UdfExecutor";

// JNI method descriptors used to look up the executor's methods:
//   constructor: takes a byte[] (the serialized Thrift ctor params), returns void
//   evaluate():  no arguments, returns void
//   close():     no arguments, returns void
const char *EXECUTOR_CTOR_SIGNATURE = "([B)V";
const char *EXECUTOR_EVALUATE_SIGNATURE = "()V";
const char *EXECUTOR_CLOSE_SIGNATURE = "()V";
37 
38 namespace impala {
39 
40 struct JniContext {
41  jclass cl;
42  jobject executor;
43  jmethodID evalute_id;
44  jmethodID close_id;
45 
51 
53 
55  : cl(NULL),
56  executor(NULL),
57  evalute_id(NULL),
58  close_id(NULL),
59  input_values_buffer(NULL),
60  input_nulls_buffer(NULL),
61  output_value_buffer(NULL),
62  warning_logged(false),
63  output_anyval(NULL) {
64  }
65 };
66 
67 HiveUdfCall::HiveUdfCall(const TExprNode& node)
68  : Expr(node),
69  input_buffer_size_(0) {
70  DCHECK_EQ(node.node_type, TExprNodeType::FUNCTION_CALL);
71  DCHECK_EQ(node.fn.binary_type, TFunctionBinaryType::HIVE);
72 }
73 
76  JniContext* jni_ctx = reinterpret_cast<JniContext*>(
77  fn_ctx->GetFunctionState(FunctionContext::THREAD_LOCAL));
78  DCHECK(jni_ctx != NULL);
79 
80  JNIEnv* env = getJNIEnv();
81  if (env == NULL) {
82  stringstream ss;
83  ss << "Hive UDF path=" << fn_.hdfs_location << " class=" << fn_.scalar_fn.symbol
84  << " failed due to JNI issue getting the JNIEnv object";
85  fn_ctx->SetError(ss.str().c_str());
86  jni_ctx->output_anyval->is_null = true;
87  return jni_ctx->output_anyval;
88  }
89 
90  // Evaluate all the children values and put the results in input_values_buffer
91  for (int i = 0; i < GetNumChildren(); ++i) {
92  void* v = ctx->GetValue(GetChild(i), row);
93 
94  if (v == NULL) {
95  jni_ctx->input_nulls_buffer[i] = 1;
96  } else {
97  uint8_t* input_ptr = jni_ctx->input_values_buffer + input_byte_offsets_[i];
98  jni_ctx->input_nulls_buffer[i] = 0;
99  switch (GetChild(i)->type().type) {
100  case TYPE_BOOLEAN:
101  case TYPE_TINYINT:
102  // Using explicit sizes helps the compiler unroll memcpy
103  memcpy(input_ptr, v, 1);
104  break;
105  case TYPE_SMALLINT:
106  memcpy(input_ptr, v, 2);
107  break;
108  case TYPE_INT:
109  case TYPE_FLOAT:
110  memcpy(input_ptr, v, 4);
111  break;
112  case TYPE_BIGINT:
113  case TYPE_DOUBLE:
114  memcpy(input_ptr, v, 8);
115  break;
116  case TYPE_TIMESTAMP:
117  case TYPE_STRING:
118  case TYPE_VARCHAR:
119  memcpy(input_ptr, v, 16);
120  break;
121  default:
122  DCHECK(false) << "NYI";
123  }
124  }
125  }
126 
127  // Using this version of Call has the lowest overhead. This eliminates the
128  // vtable lookup and setting up return stacks.
129  env->CallNonvirtualVoidMethodA(
130  jni_ctx->executor, jni_ctx->cl, jni_ctx->evalute_id, NULL);
131  Status status = JniUtil::GetJniExceptionMsg(env);
132  if (!status.ok()) {
133  if (!jni_ctx->warning_logged) {
134  stringstream ss;
135  ss << "Hive UDF path=" << fn_.hdfs_location << " class=" << fn_.scalar_fn.symbol
136  << " failed due to: " << status.GetDetail();
137  fn_ctx->AddWarning(ss.str().c_str());
138  jni_ctx->warning_logged = true;
139  }
140  jni_ctx->output_anyval->is_null = true;
141  return jni_ctx->output_anyval;
142  }
143 
144  // Write output_value_buffer to output_anyval
145  if (jni_ctx->output_null_value) {
146  jni_ctx->output_anyval->is_null = true;
147  } else {
149  }
150  return jni_ctx->output_anyval;
151 }
152 
154  ExprContext* ctx) {
155  RETURN_IF_ERROR(Expr::Prepare(state, row_desc, ctx));
156 
157  // Copy the Hive Jar from hdfs to local file system.
158  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
159  fn_.hdfs_location, LibCache::TYPE_JAR, &local_location_));
160 
161  // Initialize input_byte_offsets_ and input_buffer_size_
162  for (int i = 0; i < GetNumChildren(); ++i) {
165  // Align all values up to 8 bytes. We don't care about footprint since we allocate
166  // one buffer for all rows and we never copy the entire buffer.
168  }
169 
170  // Register FunctionContext in ExprContext
171  RegisterFunctionContext(ctx, state);
172 
173  return Status::OK;
174 }
175 
178  RETURN_IF_ERROR(Expr::Open(state, ctx, scope));
179 
180  // Create a JniContext in this thread's FunctionContext
181  FunctionContext* fn_ctx = ctx->fn_context(context_index_);
182  JniContext* jni_ctx = new JniContext;
183  fn_ctx->SetFunctionState(FunctionContext::THREAD_LOCAL, jni_ctx);
184 
185  JNIEnv* env = getJNIEnv();
186  if (env == NULL) return Status("Failed to get/create JVM");
187 
189  jmethodID executor_ctor = env->GetMethodID(
190  jni_ctx->cl, "<init>", EXECUTOR_CTOR_SIGNATURE);
191  RETURN_ERROR_IF_EXC(env);
192  jni_ctx->evalute_id = env->GetMethodID(
193  jni_ctx->cl, "evaluate", EXECUTOR_EVALUATE_SIGNATURE);
194  RETURN_ERROR_IF_EXC(env);
195  jni_ctx->close_id = env->GetMethodID(
196  jni_ctx->cl, "close", EXECUTOR_CLOSE_SIGNATURE);
197  RETURN_ERROR_IF_EXC(env);
198 
199  THiveUdfExecutorCtorParams ctor_params;
200  ctor_params.fn = fn_;
201  ctor_params.local_location = local_location_;
202  ctor_params.input_byte_offsets = input_byte_offsets_;
203 
204  jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_];
205  jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()];
206  jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()];
207 
208  ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer;
209  ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer;
210  ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer;
211  ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value;
212 
213  jbyteArray ctor_params_bytes;
214 
215  // Add a scoped cleanup jni reference object. This cleans up local refs made
216  // below.
217  JniLocalFrame jni_frame;
218  RETURN_IF_ERROR(jni_frame.push(env));
219 
220  RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
221  // Create the java executor object
222  jni_ctx->executor = env->NewObject(jni_ctx->cl,
223  executor_ctor, ctor_params_bytes);
224  RETURN_ERROR_IF_EXC(env);
225  jni_ctx->executor = env->NewGlobalRef(jni_ctx->executor);
226 
227  jni_ctx->output_anyval = CreateAnyVal(type_);
228 
229  return Status::OK;
230 }
231 
234  FunctionContext* fn_ctx = ctx->fn_context(context_index_);
235  JniContext* jni_ctx = reinterpret_cast<JniContext*>(
236  fn_ctx->GetFunctionState(FunctionContext::THREAD_LOCAL));
237 
238  if (jni_ctx != NULL) {
239  JNIEnv* env = getJNIEnv();
240  if (jni_ctx->executor != NULL) {
241  env->CallNonvirtualVoidMethodA(
242  jni_ctx->executor, jni_ctx->cl, jni_ctx->close_id, NULL);
243  env->DeleteGlobalRef(jni_ctx->executor);
244  // Clear any exceptions. Not much we can do about them here.
245  Status status = JniUtil::GetJniExceptionMsg(env);
246  if (!status.ok()) VLOG_QUERY << status.GetDetail();
247  }
248  if (jni_ctx->input_values_buffer != NULL) {
249  delete[] jni_ctx->input_values_buffer;
250  jni_ctx->input_values_buffer = NULL;
251  }
252  if (jni_ctx->input_nulls_buffer != NULL) {
253  delete[] jni_ctx->input_nulls_buffer;
254  jni_ctx->input_nulls_buffer = NULL;
255  }
256  if (jni_ctx->output_value_buffer != NULL) {
257  delete[] jni_ctx->output_value_buffer;
258  jni_ctx->output_value_buffer = NULL;
259  }
260  if (jni_ctx->output_anyval != NULL) {
261  delete jni_ctx->output_anyval;
262  jni_ctx->output_anyval = NULL;
263  }
264  } else {
265  DCHECK(!ctx->opened_);
266  }
267 
268  Expr::Close(state, ctx, scope);
269 }
270 
272  return GetCodegendComputeFnWrapper(state, fn);
273 }
274 
275 string HiveUdfCall::DebugString() const {
276  stringstream out;
277  out << "HiveUdfCall(hdfs_location=" << fn_.hdfs_location
278  << " classname=" << fn_.scalar_fn.symbol << " "
279  << Expr::DebugString() << ")";
280  return out.str();
281 }
282 
284  DCHECK_EQ(type_.type, TYPE_BOOLEAN);
285  return *reinterpret_cast<BooleanVal*>(Evaluate(ctx, row));
286 }
287 
289  DCHECK_EQ(type_.type, TYPE_TINYINT);
290  return *reinterpret_cast<TinyIntVal*>(Evaluate(ctx, row));
291 }
292 
294  DCHECK_EQ(type_.type, TYPE_SMALLINT);
295  return * reinterpret_cast<SmallIntVal*>(Evaluate(ctx, row));
296 }
297 
299  DCHECK_EQ(type_.type, TYPE_INT);
300  return *reinterpret_cast<IntVal*>(Evaluate(ctx, row));
301 }
302 
304  DCHECK_EQ(type_.type, TYPE_BIGINT);
305  return *reinterpret_cast<BigIntVal*>(Evaluate(ctx, row));
306 }
307 
309  DCHECK_EQ(type_.type, TYPE_FLOAT);
310  return *reinterpret_cast<FloatVal*>(Evaluate(ctx, row));
311 }
312 
314  DCHECK_EQ(type_.type, TYPE_DOUBLE);
315  return *reinterpret_cast<DoubleVal*>(Evaluate(ctx, row));
316 }
317 
319  DCHECK_EQ(type_.type, TYPE_STRING);
320  return *reinterpret_cast<StringVal*>(Evaluate(ctx, row));
321 }
322 
324  DCHECK_EQ(type_.type, TYPE_TIMESTAMP);
325  return *reinterpret_cast<TimestampVal*>(Evaluate(ctx, row));
326 }
327 
329  DCHECK_EQ(type_.type, TYPE_DECIMAL);
330  return *reinterpret_cast<DecimalVal*>(Evaluate(ctx, row));
331 }
332 
333 }
AnyVal * CreateAnyVal(ObjectPool *pool, const ColumnType &type)
Creates the corresponding AnyVal subclass for type. The object is added to the pool.
Definition: anyval-util.cc:26
uint8_t * output_value_buffer
const std::string GetDetail() const
Definition: status.cc:184
virtual TimestampVal GetTimestampVal(ExprContext *ctx, TupleRow *)
virtual BooleanVal GetBooleanVal(ExprContext *ctx, TupleRow *)
virtual Status Open(RuntimeState *state, ExprContext *context, FunctionContext::FunctionStateScope scope=FunctionContext::FRAGMENT_LOCAL)
static Status Open(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for opening multiple expr trees.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
void * GetValue(TupleRow *row)
AnyVal * Evaluate(ExprContext *ctx, TupleRow *row)
virtual IntVal GetIntVal(ExprContext *ctx, TupleRow *)
int context_index_
Definition: expr.h:296
uint8_t * input_nulls_buffer
int GetSlotSize() const
Returns the size of a slot for this type.
Definition: types.h:212
This object has a compatible storage format with boost::ptime.
Definition: udf.h:495
virtual DecimalVal GetDecimalVal(ExprContext *ctx, TupleRow *)
uint8_t * input_values_buffer
static void Close(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for closing multiple expr trees.
virtual TinyIntVal GetTinyIntVal(ExprContext *ctx, TupleRow *)
bool AddWarning(const char *warning_msg)
Definition: udf.cc:345
bool is_null
Definition: udf.h:359
#define VLOG_QUERY
Definition: logging.h:57
PrimitiveType type
Definition: types.h:60
static Status GetJniExceptionMsg(JNIEnv *env, bool log_stack=true, const std::string &prefix="")
Definition: jni-util.cc:161
const char * EXECUTOR_CLOSE_SIGNATURE
Status GetCodegendComputeFnWrapper(RuntimeState *state, llvm::Function **fn)
Definition: expr.cc:546
const char * EXECUTOR_CLASS
void * GetFunctionState(FunctionStateScope scope) const
Definition: udf-ir.cc:38
static uint32_t RoundUpNumBytes(uint32_t bits)
Definition: bit-util.h:77
std::vector< int > input_byte_offsets_
Definition: hive-udf-call.h:97
virtual FloatVal GetFloatVal(ExprContext *ctx, TupleRow *)
HiveUdfCall(const TExprNode &node)
This is the superclass of all expr evaluation nodes.
Definition: expr.h:116
static LibCache * instance()
Definition: lib-cache.h:63
const char * EXECUTOR_EVALUATE_SIGNATURE
Status push(JNIEnv *env, int max_local_ref=10)
Definition: jni-util.cc:34
void SetFunctionState(FunctionStateScope scope, void *ptr)
Definition: udf.cc:370
virtual void Close(RuntimeState *state, ExprContext *context, FunctionContext::FunctionStateScope scope=FunctionContext::FRAGMENT_LOCAL)
Subclasses overriding this function should call Expr::Close().
virtual DoubleVal GetDoubleVal(ExprContext *ctx, TupleRow *)
Expr * GetChild(int i) const
Definition: expr.h:142
const char * EXECUTOR_CTOR_SIGNATURE
const RowDescriptor & row_desc() const
virtual BigIntVal GetBigIntVal(ExprContext *ctx, TupleRow *)
TFunction fn_
Function description.
Definition: expr.h:284
#define RETURN_ERROR_IF_EXC(env)
Definition: jni-util.h:99
std::string local_location_
The path on the local FS to the UDF's jar.
Definition: hive-udf-call.h:93
virtual SmallIntVal GetSmallIntVal(ExprContext *ctx, TupleRow *)
const ColumnType & type() const
Definition: expr.h:145
virtual Status Prepare(RuntimeState *state, const RowDescriptor &row_desc, ExprContext *ctx)
static const Status OK
Definition: status.h:87
virtual std::string DebugString() const
static Status GetGlobalClassRef(JNIEnv *env, const char *class_str, jclass *class_ref)
Definition: jni-util.cc:56
virtual Status GetCodegendComputeFn(RuntimeState *state, llvm::Function **fn)
const ColumnType type_
analysis is done, types are fixed at this point
Definition: expr.h:289
FunctionContext * fn_context(int i)
Definition: expr-context.h:100
virtual StringVal GetStringVal(ExprContext *ctx, TupleRow *)
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
Status SerializeThriftMsg(JNIEnv *env, T *msg, jbyteArray *serialized_msg)
bool ok() const
Definition: status.h:172
void SetError(const char *error_msg)
Definition: udf.cc:332
int input_buffer_size_
The size of the buffer for passing in input arguments.
virtual std::string DebugString() const
Definition: expr.cc:385
static void SetAnyVal(const void *slot, const ColumnType &type, AnyVal *dst)
Utility to put val into an AnyVal struct.
Definition: anyval-util.h:205
int GetNumChildren() const
Definition: expr.h:143
FunctionContext * RegisterFunctionContext(ExprContext *ctx, RuntimeState *state, int varargs_buffer_size=0)
Definition: expr.cc:80
JNIEnv * getJNIEnv(void)
C linkage for helper functions in hdfsJniHelper.h.