15 package com.cloudera.impala.hive.executor;
18 import java.lang.reflect.Constructor;
19 import java.lang.reflect.InvocationTargetException;
20 import java.lang.reflect.Method;
21 import java.net.MalformedURLException;
23 import java.net.URLClassLoader;
24 import java.util.ArrayList;
26 import org.apache.hadoop.hive.ql.exec.UDF;
27 import org.apache.hadoop.hive.serde2.io.ByteWritable;
28 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
29 import org.apache.hadoop.hive.serde2.io.ShortWritable;
30 import org.apache.hadoop.io.BooleanWritable;
31 import org.apache.hadoop.io.BytesWritable;
32 import org.apache.hadoop.io.FloatWritable;
33 import org.apache.hadoop.io.IntWritable;
34 import org.apache.hadoop.io.LongWritable;
35 import org.apache.hadoop.io.Text;
36 import org.apache.hadoop.io.Writable;
37 import org.apache.log4j.Logger;
38 import org.apache.thrift.protocol.TBinaryProtocol;
45 import com.cloudera.impala.thrift.THiveUdfExecutorCtorParams;
47 import com.google.common.base.Joiner;
48 import com.google.common.base.Preconditions;
49 import com.google.common.collect.Lists;
56 @SuppressWarnings(
"restriction")
// Class logger; init() uses it for debug messages about UDF loading.
58 private static final Logger LOG = Logger.getLogger(UdfExecutor.class);
// Method name that init() looks for on the UDF class.
60 private static final String UDF_FUNCTION_NAME =
"evaluate";
// Thrift protocol factory passed to JniUtil.deserializeThrift() when decoding
// the serialized constructor parameters.
63 private final static TBinaryProtocol.Factory protocolFactory =
64 new TBinaryProtocol.Factory();
// Addresses obtained from UNSAFE.allocateMemory() by this object. The cleanup
// code frees each entry with UNSAFE.freeMemory() and then clears the list.
104 ArrayList<Long> allocations_ = Lists.newArrayList();
// Constructor body that configures the executor from serialized Thrift
// parameters. NOTE(review): the header line is not visible in this excerpt;
// this appears to be UdfExecutor(byte[] thriftParams) -- confirm.
// Decode thriftParams into a THiveUdfExecutorCtorParams request.
111 THiveUdfExecutorCtorParams request =
new THiveUdfExecutorCtorParams();
112 JniUtil.deserializeThrift(protocolFactory, request, thriftParams);
// The class to load is the scalar function's symbol; the jar is the
// request's local_location.
114 String className = request.fn.scalar_fn.symbol;
115 String jarFile = request.local_location;
// Convert the Thrift return type and each Thrift argument type to Type.
116 Type retType = Type.fromThrift(request.fn.ret_type);
117 Type[] parameterTypes =
new Type[request.fn.arg_types.size()];
118 for (
int i = 0; i < request.fn.arg_types.size(); ++i) {
119 parameterTypes[i] = Type.fromThrift(request.fn.arg_types.get(i));
// The input/output buffer addresses are taken from the request; nothing is
// allocated (or added to allocations_) in the lines shown here.
121 inputBufferPtr_ = request.input_buffer_ptr;
122 inputNullsPtr_ = request.input_nulls_ptr;
123 outputBufferPtr_ = request.output_buffer_ptr;
124 outputNullPtr_ = request.output_null_ptr;
// The string output buffer starts at address 0 with capacity 0; the result
// storing code reallocates it when a result does not fit.
125 outBufferStringPtr_ = 0;
126 outBufferCapacity_ = 0;
// Copy the per-argument byte offsets from the request into an int[].
127 inputBufferOffsets_ =
new int[request.input_byte_offsets.size()];
128 for (
int i = 0; i < request.input_byte_offsets.size(); ++i) {
129 inputBufferOffsets_[i] = request.input_byte_offsets.get(i).intValue();
// Load the UDF class and resolve a matching evaluate() method.
132 init(jarFile, className, retType, parameterTypes);
// Tail of a constructor header followed by its body. NOTE(review): the first
// header line is not visible in this excerpt; the names used below (jarFile,
// udfPath) suggest UdfExecutor(String jarFile, String udfPath, Type retType,
// Type... parameterTypes) -- confirm.
144 Type retType,
Type... parameterTypes)
// Lay the arguments out back to back: each argument's offset is the sum of
// the slot sizes of the arguments before it.
147 inputBufferOffsets_ =
new int[parameterTypes.length];
149 int inputBufferSize = 0;
150 for (
int i = 0; i < parameterTypes.length; ++i) {
151 inputBufferOffsets_[i] = inputBufferSize;
152 inputBufferSize += parameterTypes[i].getSlotSize();
// Allocate the buffers through UNSAFE.allocateMemory(): the packed input
// buffer, parameterTypes.length units for the input nulls, one return-type
// slot for the output, and 1 unit for the output null flag.
// NOTE(review): sizes are presumably in bytes (sun.misc.Unsafe) -- confirm.
155 inputBufferPtr_ = UnsafeUtil.UNSAFE.allocateMemory(inputBufferSize);
156 inputNullsPtr_ = UnsafeUtil.UNSAFE.allocateMemory(parameterTypes.length);
157 outputBufferPtr_ = UnsafeUtil.UNSAFE.allocateMemory(retType.getSlotSize());
158 outputNullPtr_ = UnsafeUtil.UNSAFE.allocateMemory(1);
// Record every address so the cleanup code can free it later.
159 allocations_.add(inputBufferPtr_);
160 allocations_.add(inputNullsPtr_);
161 allocations_.add(outputBufferPtr_);
162 allocations_.add(outputNullPtr_);
// The string output buffer starts at address 0 with capacity 0.
163 outBufferStringPtr_ = 0;
164 outBufferCapacity_ = 0;
// Load the UDF class and resolve a matching evaluate() method.
166 init(jarFile, udfPath, retType, parameterTypes);
// Memory-release fragment. NOTE(review): the enclosing method's header is not
// visible in this excerpt; presumably the executor's close/cleanup routine --
// confirm.
// Free the string output buffer and reset its address and capacity.
180 UnsafeUtil.UNSAFE.freeMemory(outBufferStringPtr_);
181 outBufferStringPtr_ = 0;
182 outBufferCapacity_ = 0;
// Free every address recorded in allocations_, then empty the list.
184 for (
long ptr: allocations_) {
185 UnsafeUtil.UNSAFE.freeMemory(ptr);
187 allocations_.clear();
// Argument-preparation fragment that ends by calling evaluate(inputArgs_).
// NOTE(review): the method header and several interior lines (including the
// conditions that choose between the assignments below) are not visible in
// this excerpt.
// For every argument, fill inputArgs_[i]; the visible assignments use either
// inputObjects_[i] or null. isArgString_[i] is consulted first, but the body
// of that branch is not shown.
196 for (
int i = 0; i < argTypes_.length; ++i) {
198 if (isArgString_[i]) {
203 inputArgs_[i] = inputObjects_[i];
206 inputArgs_[i] = null;
// Run the UDF with the prepared arguments.
209 evaluate(inputArgs_);
210 }
catch (Exception e) {
// Any exception is printed to stderr and rethrown wrapped in
// ImpalaRuntimeException, keeping the original as the cause.
211 e.printStackTrace(System.err);
212 throw new ImpalaRuntimeException(
"UDF::evaluate() ran into a problem.", e);
// Reflective invocation fragment. NOTE(review): the header is not visible in
// this excerpt; the use of `args` and the returned field suggest
// long evaluate(Object... args) -- confirm.
// Invoke the resolved UDF method on udf_ with args and hand the returned
// object to storeUdfResult().
223 storeUdfResult(method_.invoke(udf_, args));
// The value returned is the output buffer address field.
225 return outputBufferPtr_;
226 }
// Each reflection failure below is rethrown as ImpalaRuntimeException with the
// same message and the original exception as the cause.
catch (IllegalArgumentException e) {
227 throw new ImpalaRuntimeException(
"UDF failed to evaluate", e);
228 }
catch (IllegalAccessException e) {
229 throw new ImpalaRuntimeException(
"UDF failed to evaluate", e);
230 }
catch (InvocationTargetException e) {
231 throw new ImpalaRuntimeException(
"UDF failed to evaluate", e);
// Class-to-type dispatch fragment. NOTE(review): the header and the statement
// inside each branch are not visible in this excerpt; the use of `c` suggests
// getPrimitiveType(Class<?> c) -- confirm.
// Classes are compared by identity (==), so only these exact classes match;
// subclasses do not.
242 if (c == BooleanWritable.class) {
244 }
else if (c == ByteWritable.class) {
246 }
else if (c == ShortWritable.class) {
248 }
else if (c == IntWritable.class) {
250 }
else if (c == LongWritable.class) {
252 }
else if (c == FloatWritable.class) {
254 }
else if (c == DoubleWritable.class) {
256 }
// BytesWritable, Text and String all share a single branch.
else if (c == BytesWritable.class || c == Text.class || c == String.class) {
// Result-storing fragment. NOTE(review): the header, the conditions around the
// null-flag writes and the switch/case lines are not visible in this excerpt;
// the use of `obj` suggests storeUdfResult(Object obj) -- confirm.
// Two writes to the output null byte: value 1, then (on a separate path) 0.
// NOTE(review): presumably 1 marks a null result and 0 a non-null one -- confirm.
268 UnsafeUtil.UNSAFE.putByte(outputNullPtr_, (byte)1);
272 UnsafeUtil.UNSAFE.putByte(outputNullPtr_, (byte)0);
// Fixed-width results: cast obj to its Writable wrapper and write the wrapped
// value at outputBufferPtr_ with the matching put* call.
// A boolean is stored as a single byte, 1 for true and 0 for false.
276 BooleanWritable val = (BooleanWritable)obj;
277 UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, val.get() ? (byte)1 : 0);
281 ByteWritable val = (ByteWritable)obj;
282 UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, val.get());
286 ShortWritable val = (ShortWritable)obj;
287 UnsafeUtil.UNSAFE.putShort(outputBufferPtr_, val.get());
291 IntWritable val = (IntWritable)obj;
292 UnsafeUtil.UNSAFE.putInt(outputBufferPtr_, val.get());
296 LongWritable val = (LongWritable)obj;
297 UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, val.get());
301 FloatWritable val = (FloatWritable)obj;
302 UnsafeUtil.UNSAFE.putFloat(outputBufferPtr_, val.get());
306 DoubleWritable val = (DoubleWritable)obj;
307 UnsafeUtil.UNSAFE.putDouble(outputBufferPtr_, val.get());
// Variable-length results: reduce byte[], BytesWritable, Text or String to a
// byte array first.
312 if (obj instanceof byte[]) {
313 bytes = (byte[]) obj;
314 }
else if (obj instanceof BytesWritable) {
315 bytes = ((BytesWritable)obj).copyBytes();
316 }
else if (obj instanceof Text) {
317 bytes = ((Text)obj).copyBytes();
318 }
else if (obj instanceof String) {
// NOTE(review): getBytes() without a charset uses the platform default
// encoding, so the stored bytes can differ between hosts.
319 bytes = ((String)obj).getBytes();
// When the result is longer than the current capacity, the string buffer is
// reallocated to exactly bytes.length and the capacity is updated. The buffer
// address is then written at outputBufferPtr_ and the bytes are copied in.
// NOTE(review): where this `if` closes is not visible in this excerpt.
324 if (bytes.length > outBufferCapacity_) {
325 outBufferStringPtr_ =
326 UnsafeUtil.UNSAFE.reallocateMemory(outBufferStringPtr_, bytes.length);
327 outBufferCapacity_ = bytes.length;
328 UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, outBufferStringPtr_);
330 UnsafeUtil.Copy(outBufferStringPtr_, bytes, 0, bytes.length);
// NOTE(review): the arguments of this final putInt call are not visible.
331 UnsafeUtil.UNSAFE.putInt(
// Input-object setup fragment. NOTE(review): the header, the case labels and
// the lines that create `w` are not visible in this excerpt; the call made
// from init() suggests this is allocateInputObjects() -- confirm.
// Create one slot per argument in each array: the Writable wrappers, the
// objects passed to evaluate(), and the flags for String-typed parameters.
345 inputObjects_ =
new Writable[argTypes_.length];
346 inputArgs_ =
new Object[argTypes_.length];
347 isArgString_ =
new boolean[argTypes_.length];
// For each argument, read its byte offset and branch on its primitive type.
349 for (
int i = 0; i < argTypes_.length; ++i) {
350 int offset = inputBufferOffsets_[i];
351 switch (argTypes_[i].getPrimitiveType()) {
// In the visible branch the handling depends on the UDF method's declared
// parameter type: Text, BytesWritable or String all store `w`, and String
// additionally sets isArgString_[i].
// NOTE(review): presumably this is the string-typed case -- confirm.
374 if (method_.getParameterTypes()[i] == Text.class) {
376 inputObjects_[i] = w;
377 }
else if (method_.getParameterTypes()[i] == BytesWritable.class) {
379 inputObjects_[i] = w;
380 }
else if (method_.getParameterTypes()[i] == String.class) {
381 isArgString_[i] =
true;
384 inputObjects_[i] = w;
// A declared parameter type other than the three above is rejected.
386 throw new ImpalaRuntimeException(
387 "Unsupported argument type: " + method_.getParameterTypes()[i]);
// An Impala argument type with no handling in the switch is rejected.
392 throw new ImpalaRuntimeException(
"Unsupported argument type: " + argTypes_[i]);
/**
 * Returns the class loader from which the UDF class is loaded.
 *
 * @param jarPath path of the jar that holds the UDF; may be null
 * @return the system class loader when jarPath is null; otherwise a
 *         URLClassLoader over the jar's file URL, with the loader of this
 *         object's class as its parent
 * @throws MalformedURLException if the jar path cannot be converted to a URL
 */
397 private ClassLoader
getClassLoader(String jarPath)
throws MalformedURLException {
// No jar given: fall back to the system class loader.
398 if (jarPath == null) {
399 return ClassLoader.getSystemClassLoader();
// Otherwise turn the file path into a URL and build a loader over it.
// NOTE(review): the `else` and closing braces are not visible in this excerpt.
401 URL url =
new File(jarPath).toURI().toURL();
402 return URLClassLoader.newInstance(
new URL[] { url }, getClass().getClassLoader());
/**
 * Loads the UDF class, instantiates it, and searches its public methods for
 * an "evaluate" overload whose parameters match the requested argument types.
 *
 * The class udfPath is loaded through getClassLoader(jarPath), narrowed to a
 * subclass of UDF and created with its no-argument constructor. Candidate
 * methods must be named evaluate, have the same number of parameters as
 * parameterTypes, and agree with them by primitive type. Every failure shown
 * here is reported as an ImpalaRuntimeException.
 *
 * NOTE(review): the exception type of the throws clause, the opening of the
 * try block, and several interior lines (such as the code that sets
 * `incompatible` and the statements that end a successful search) are not
 * visible in this excerpt.
 *
 * @param jarPath        jar location passed to getClassLoader(); also logged
 * @param udfPath        name of the UDF class passed to Class.forName()
 * @param retType        return type; not referenced in the visible lines
 * @param parameterTypes argument types an evaluate() overload must match;
 *                       stored in argTypes_
 */
410 private void init(String jarPath, String udfPath,
411 Type retType,
Type... parameterTypes) throws
// Generic signatures of every evaluate() overload seen, used only for the
// error message built further down.
413 ArrayList<String> signatures = Lists.newArrayList();
415 LOG.debug(
"Loading UDF '" + udfPath +
"' from " + jarPath);
// Load and initialize the class, check that it extends UDF, and create an
// instance through its no-argument constructor.
416 ClassLoader loader = getClassLoader(jarPath);
417 Class<?> c = Class.forName(udfPath,
true, loader);
418 Class<? extends UDF> udfClass = c.asSubclass(UDF.class);
419 Constructor<? extends UDF> ctor = udfClass.getConstructor();
420 udf_ = ctor.newInstance();
422 argTypes_ = parameterTypes;
// getMethods() returns the public methods, including inherited ones.
423 Method[] methods = udfClass.getMethods();
424 for (Method m: methods) {
// Only methods named evaluate are candidates.
426 if (!m.getName().equals(UDF_FUNCTION_NAME))
continue;
427 signatures.add(m.toGenericString());
428 Class<?>[] methodTypes = m.getParameterTypes();
// The parameter count must match before the types are compared.
431 if (methodTypes.length != parameterTypes.length)
continue;
// Zero-argument overload requested and found: there are no types to compare.
432 if (methodTypes.length == 0 && parameterTypes.length == 0) {
435 LOG.debug(
"Loaded UDF '" + udfPath +
"' from " + jarPath);
// Compare each declared parameter with the requested type by primitive type.
439 boolean incompatible =
false;
440 for (
int i = 0; i < methodTypes.length; ++i) {
441 if (getPrimitiveType(methodTypes[i]) != parameterTypes[i].getPrimitiveType()) {
447 if (incompatible)
continue;
// A compatible overload: set up the per-argument input objects.
449 allocateInputObjects();
450 LOG.debug(
"Loaded UDF '" + udfPath +
"' from " + jarPath);
// Build an error that names the requested signature and lists every
// evaluate() overload found on the class.
// NOTE(review): presumably reached only when no overload matched -- confirm.
454 StringBuilder sb =
new StringBuilder();
455 sb.append(
"Unable to find evaluate function with the correct signature: ")
456 .append(udfPath +
".evaluate(")
457 .append(Joiner.on(
", ").join(argTypes_))
459 .append(
"UDF contains: \n ")
460 .append(Joiner.on(
"\n ").join(signatures));
461 throw new ImpalaRuntimeException(sb.toString());
// Each loading or instantiation failure below is rethrown as
// ImpalaRuntimeException with the original exception kept as the cause.
// NOTE(review): two messages read oddly ("Unable load jar.", "Unable to call
// create UDF instance."); they are runtime text and are left unchanged here.
462 }
catch (MalformedURLException e) {
463 throw new ImpalaRuntimeException(
"Unable load jar.", e);
464 }
catch (SecurityException e) {
465 throw new ImpalaRuntimeException(
"Unable to load function.", e);
466 }
catch (ClassNotFoundException e) {
467 throw new ImpalaRuntimeException(
"Unable to find class.", e);
468 }
catch (NoSuchMethodException e) {
469 throw new ImpalaRuntimeException(
470 "Unable to find constructor with no arguments.", e);
471 }
catch (IllegalArgumentException e) {
472 throw new ImpalaRuntimeException(
473 "Unable to call UDF constructor with no arguments.", e);
474 }
catch (InstantiationException e) {
475 throw new ImpalaRuntimeException(
"Unable to call create UDF instance.", e);
476 }
catch (IllegalAccessException e) {
477 throw new ImpalaRuntimeException(
"Unable to call create UDF instance.", e);
478 }
catch (InvocationTargetException e) {
479 throw new ImpalaRuntimeException(
"Unable to call create UDF instance.", e);
final int[] inputBufferOffsets_
final long inputNullsPtr_
long evaluate(Object...args)
static final Unsafe UNSAFE
PrimitiveType getPrimitiveType()
void storeUdfResult(Object obj)
void allocateInputObjects()
static final int STRING_VALUE_LEN_OFFSET
void init(String jarPath, String udfPath, Type retType, Type...parameterTypes)
ClassLoader getClassLoader(String jarPath)
UdfExecutor(String jarFile, String udfPath, Type retType, Type...parameterTypes)
final long outputNullPtr_
uint8_t offset[7 *64-sizeof(uint64_t)]
final long inputBufferPtr_
UdfExecutor(byte[] thriftParams)
final long outputBufferPtr_
PrimitiveType getPrimitiveType(Class<?> c)