Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
UdfExecutor.java
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.hive.executor;
16 
17 import java.io.File;
18 import java.lang.reflect.Constructor;
19 import java.lang.reflect.InvocationTargetException;
20 import java.lang.reflect.Method;
21 import java.net.MalformedURLException;
22 import java.net.URL;
23 import java.net.URLClassLoader;
24 import java.util.ArrayList;
25 
26 import org.apache.hadoop.hive.ql.exec.UDF;
27 import org.apache.hadoop.hive.serde2.io.ByteWritable;
28 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
29 import org.apache.hadoop.hive.serde2.io.ShortWritable;
30 import org.apache.hadoop.io.BooleanWritable;
31 import org.apache.hadoop.io.BytesWritable;
32 import org.apache.hadoop.io.FloatWritable;
33 import org.apache.hadoop.io.IntWritable;
34 import org.apache.hadoop.io.LongWritable;
35 import org.apache.hadoop.io.Text;
36 import org.apache.hadoop.io.Writable;
37 import org.apache.log4j.Logger;
38 import org.apache.thrift.protocol.TBinaryProtocol;
39 
45 import com.cloudera.impala.thrift.THiveUdfExecutorCtorParams;
47 import com.google.common.base.Joiner;
48 import com.google.common.base.Preconditions;
49 import com.google.common.collect.Lists;
50 
51 // Wrapper object to run hive UDFs. This class works with UdfCallExpr in the
52 // backend to marshall data back and forth between the execution engine and
53 // the java UDF class.
54 // See the comments in be/src/exprs/hive-udf-call.h for more details.
55 // TODO: should we cache loaded jars and classes?
56 @SuppressWarnings("restriction")
57 public class UdfExecutor {
58  private static final Logger LOG = Logger.getLogger(UdfExecutor.class);
59  // By convention, the function in the class must be called evaluate()
60  private static final String UDF_FUNCTION_NAME = "evaluate";
61 
62  // Object to deserialize ctor params from BE.
63  private final static TBinaryProtocol.Factory protocolFactory =
64  new TBinaryProtocol.Factory();
65 
66  private UDF udf_;
67  private Method method_;
68  private Type[] argTypes_;
69  private Type retType_;
70 
71  // Input buffer from the backend. This is valid for the duration of an evaluate() call.
72  // These buffers are allocated in the BE.
73  private final long inputBufferPtr_;
74  private final long inputNullsPtr_;
75 
76  // This is the byte offset in inputBufferPtr to the start of the input argument.
77  // e.g. *inputBufferPtr_[inputBufferOffsets[i]] is the ith input argument.
78  private final int[] inputBufferOffsets_;
79 
80  // Output buffer to return non-string values. This buffers are allocated in the BE.
81  private final long outputBufferPtr_;
82  private final long outputNullPtr_;
83 
84  // For StringValue return types, outputBufferPtr_ is the location of the 16-byte
85  // StringValue object. StringValue.ptr is set to outBufferStringPtr_. This buffer
86  // grows as necessary to fit the return string.
87  // This is allocated from the FE.
88  private long outBufferStringPtr_;
89 
90  // Size of outBufferStringPtr_.
91  private int outBufferCapacity_;
92 
93  // Preconstructed input objects for the UDF. This minimizes object creation overhead
94  // as these objects are reused across calls to evaluate().
95  private Object[] inputObjects_;
96  private Object[] inputArgs_; // inputArgs_[i] is either inputObjects_[i] or null
97  // True if inputArgs_[i] is the java String type (and not a writable). We need
98  // to make a String object before calling the UDF.
99  // TODO: is there a unsafe way to make string objects?
100  private boolean[] isArgString_;
101 
102  // Allocations made from the native heap that need to be cleaned when this object
103  // is GC'ed.
104  ArrayList<Long> allocations_ = Lists.newArrayList();
105 
110  public UdfExecutor(byte[] thriftParams) throws ImpalaException {
111  THiveUdfExecutorCtorParams request = new THiveUdfExecutorCtorParams();
112  JniUtil.deserializeThrift(protocolFactory, request, thriftParams);
113 
114  String className = request.fn.scalar_fn.symbol;
115  String jarFile = request.local_location;
116  Type retType = Type.fromThrift(request.fn.ret_type);
117  Type[] parameterTypes = new Type[request.fn.arg_types.size()];
118  for (int i = 0; i < request.fn.arg_types.size(); ++i) {
119  parameterTypes[i] = Type.fromThrift(request.fn.arg_types.get(i));
120  }
121  inputBufferPtr_ = request.input_buffer_ptr;
122  inputNullsPtr_ = request.input_nulls_ptr;
123  outputBufferPtr_ = request.output_buffer_ptr;
124  outputNullPtr_ = request.output_null_ptr;
125  outBufferStringPtr_ = 0;
126  outBufferCapacity_ = 0;
127  inputBufferOffsets_ = new int[request.input_byte_offsets.size()];
128  for (int i = 0; i < request.input_byte_offsets.size(); ++i) {
129  inputBufferOffsets_[i] = request.input_byte_offsets.get(i).intValue();
130  }
131 
132  init(jarFile, className, retType, parameterTypes);
133  }
134 
143  public UdfExecutor(String jarFile, String udfPath,
144  Type retType, Type... parameterTypes)
145  throws ImpalaRuntimeException {
146 
147  inputBufferOffsets_ = new int[parameterTypes.length];
148 
149  int inputBufferSize = 0;
150  for (int i = 0; i < parameterTypes.length; ++i) {
151  inputBufferOffsets_[i] = inputBufferSize;
152  inputBufferSize += parameterTypes[i].getSlotSize();
153  }
154 
155  inputBufferPtr_ = UnsafeUtil.UNSAFE.allocateMemory(inputBufferSize);
156  inputNullsPtr_ = UnsafeUtil.UNSAFE.allocateMemory(parameterTypes.length);
157  outputBufferPtr_ = UnsafeUtil.UNSAFE.allocateMemory(retType.getSlotSize());
158  outputNullPtr_ = UnsafeUtil.UNSAFE.allocateMemory(1);
159  allocations_.add(inputBufferPtr_);
160  allocations_.add(inputNullsPtr_);
161  allocations_.add(outputBufferPtr_);
162  allocations_.add(outputNullPtr_);
163  outBufferStringPtr_ = 0;
164  outBufferCapacity_ = 0;
165 
166  init(jarFile, udfPath, retType, parameterTypes);
167  }
168 
169 
170  @Override
171  protected void finalize() throws Throwable {
172  close();
173  super.finalize();
174  }
175 
179  public void close() {
180  UnsafeUtil.UNSAFE.freeMemory(outBufferStringPtr_);
181  outBufferStringPtr_ = 0;
182  outBufferCapacity_ = 0;
183 
184  for (long ptr: allocations_) {
185  UnsafeUtil.UNSAFE.freeMemory(ptr);
186  }
187  allocations_.clear();
188  }
189 
194  public void evaluate() throws ImpalaRuntimeException {
195  try {
196  for (int i = 0; i < argTypes_.length; ++i) {
197  if (UnsafeUtil.UNSAFE.getByte(inputNullsPtr_ + i) == 0) {
198  if (isArgString_[i]) {
199  Preconditions.checkState(inputObjects_[i] instanceof ImpalaBytesWritable);
200  inputArgs_[i] =
201  new String(((ImpalaBytesWritable)inputObjects_[i]).getBytes());
202  } else {
203  inputArgs_[i] = inputObjects_[i];
204  }
205  } else {
206  inputArgs_[i] = null;
207  }
208  }
209  evaluate(inputArgs_);
210  } catch (Exception e) {
211  e.printStackTrace(System.err);
212  throw new ImpalaRuntimeException("UDF::evaluate() ran into a problem.", e);
213  }
214  }
215 
221  public long evaluate(Object... args) throws ImpalaRuntimeException {
222  try {
223  storeUdfResult(method_.invoke(udf_, args));
224  if (UnsafeUtil.UNSAFE.getByte(outputNullPtr_) == 1) return 0;
225  return outputBufferPtr_;
226  } catch (IllegalArgumentException e) {
227  throw new ImpalaRuntimeException("UDF failed to evaluate", e);
228  } catch (IllegalAccessException e) {
229  throw new ImpalaRuntimeException("UDF failed to evaluate", e);
230  } catch (InvocationTargetException e) {
231  throw new ImpalaRuntimeException("UDF failed to evaluate", e);
232  }
233  }
234 
235  public Method getMethod() { return method_; }
236 
237  // Returns the primitive type that c is for. 'c' is expected to be
238  // a subclass of Writable. This is a many to one mapping: e.g. many
239  // writables map to the same type.
240  private PrimitiveType getPrimitiveType(Class<?> c) {
241  // TODO: do we need to check c is a subclass of *Writable.class?
242  if (c == BooleanWritable.class) {
243  return PrimitiveType.BOOLEAN;
244  } else if (c == ByteWritable.class) {
245  return PrimitiveType.TINYINT;
246  } else if (c == ShortWritable.class) {
247  return PrimitiveType.SMALLINT;
248  } else if (c == IntWritable.class) {
249  return PrimitiveType.INT;
250  } else if (c == LongWritable.class) {
251  return PrimitiveType.BIGINT;
252  } else if (c == FloatWritable.class) {
253  return PrimitiveType.FLOAT;
254  } else if (c == DoubleWritable.class) {
255  return PrimitiveType.DOUBLE;
256  } else if (c == BytesWritable.class || c == Text.class || c == String.class) {
257  // TODO: we don't distinguish between these types and will pick between them
258  // arbitrarily. This can be problematic, if for example, the UDF has different
259  // behavior for Bytes vs. Text.
260  return PrimitiveType.STRING;
261  }
263  }
264 
265  // Sets the result object 'obj' into the outputBufferPtr_
266  private void storeUdfResult(Object obj) throws ImpalaRuntimeException {
267  if (obj == null) {
268  UnsafeUtil.UNSAFE.putByte(outputNullPtr_, (byte)1);
269  return;
270  }
271 
272  UnsafeUtil.UNSAFE.putByte(outputNullPtr_, (byte)0);
273 
274  switch (retType_.getPrimitiveType()) {
275  case BOOLEAN: {
276  BooleanWritable val = (BooleanWritable)obj;
277  UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, val.get() ? (byte)1 : 0);
278  return;
279  }
280  case TINYINT: {
281  ByteWritable val = (ByteWritable)obj;
282  UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, val.get());
283  return;
284  }
285  case SMALLINT: {
286  ShortWritable val = (ShortWritable)obj;
287  UnsafeUtil.UNSAFE.putShort(outputBufferPtr_, val.get());
288  return;
289  }
290  case INT: {
291  IntWritable val = (IntWritable)obj;
292  UnsafeUtil.UNSAFE.putInt(outputBufferPtr_, val.get());
293  return;
294  }
295  case BIGINT: {
296  LongWritable val = (LongWritable)obj;
297  UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, val.get());
298  return;
299  }
300  case FLOAT: {
301  FloatWritable val = (FloatWritable)obj;
302  UnsafeUtil.UNSAFE.putFloat(outputBufferPtr_, val.get());
303  return;
304  }
305  case DOUBLE: {
306  DoubleWritable val = (DoubleWritable)obj;
307  UnsafeUtil.UNSAFE.putDouble(outputBufferPtr_, val.get());
308  return;
309  }
310  case STRING: {
311  byte[] bytes = null;
312  if (obj instanceof byte[]) {
313  bytes = (byte[]) obj;
314  } else if (obj instanceof BytesWritable) {
315  bytes = ((BytesWritable)obj).copyBytes();
316  } else if (obj instanceof Text) {
317  bytes = ((Text)obj).copyBytes();
318  } else if (obj instanceof String) {
319  bytes = ((String)obj).getBytes();
320  } else {
321  throw new ImpalaRuntimeException("Unexpected return type: " + obj.getClass());
322  }
323 
324  if (bytes.length > outBufferCapacity_) {
325  outBufferStringPtr_ =
326  UnsafeUtil.UNSAFE.reallocateMemory(outBufferStringPtr_, bytes.length);
327  outBufferCapacity_ = bytes.length;
328  UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, outBufferStringPtr_);
329  }
330  UnsafeUtil.Copy(outBufferStringPtr_, bytes, 0, bytes.length);
331  UnsafeUtil.UNSAFE.putInt(
333  bytes.length);
334  return;
335  }
336  case TIMESTAMP:
337  default:
338  throw new ImpalaRuntimeException("Unsupported argument type: " + retType_);
339  }
340  }
341 
342  // Preallocate the input objects that will be passed to the underlying UDF.
343  // These objects are allocated once and reused across calls to evaluate()
345  inputObjects_ = new Writable[argTypes_.length];
346  inputArgs_ = new Object[argTypes_.length];
347  isArgString_ = new boolean[argTypes_.length];
348 
349  for (int i = 0; i < argTypes_.length; ++i) {
350  int offset = inputBufferOffsets_[i];
351  switch (argTypes_[i].getPrimitiveType()) {
352  case BOOLEAN:
353  inputObjects_[i] = new ImpalaBooleanWritable(inputBufferPtr_ + offset);
354  break;
355  case TINYINT:
356  inputObjects_[i] = new ImpalaTinyIntWritable(inputBufferPtr_ + offset);
357  break;
358  case SMALLINT:
359  inputObjects_[i] = new ImpalaSmallIntWritable(inputBufferPtr_ + offset);
360  break;
361  case INT:
362  inputObjects_[i] = new ImpalaIntWritable(inputBufferPtr_ + offset);
363  break;
364  case BIGINT:
365  inputObjects_[i] = new ImpalaBigIntWritable(inputBufferPtr_ + offset);
366  break;
367  case FLOAT:
368  inputObjects_[i] = new ImpalaFloatWritable(inputBufferPtr_ + offset);
369  break;
370  case DOUBLE:
371  inputObjects_[i] = new ImpalaDoubleWritable(inputBufferPtr_ + offset);
372  break;
373  case STRING:
374  if (method_.getParameterTypes()[i] == Text.class) {
375  ImpalaTextWritable w = new ImpalaTextWritable(inputBufferPtr_ + offset);
376  inputObjects_[i] = w;
377  } else if (method_.getParameterTypes()[i] == BytesWritable.class) {
378  ImpalaBytesWritable w = new ImpalaBytesWritable(inputBufferPtr_ + offset);
379  inputObjects_[i] = w;
380  } else if (method_.getParameterTypes()[i] == String.class) {
381  isArgString_[i] = true;
382  // String can be mapped to any String-like Writable class.
383  ImpalaBytesWritable w = new ImpalaBytesWritable(inputBufferPtr_ + offset);
384  inputObjects_[i] = w;
385  } else {
386  throw new ImpalaRuntimeException(
387  "Unsupported argument type: " + method_.getParameterTypes()[i]);
388  }
389  break;
390  case TIMESTAMP:
391  default:
392  throw new ImpalaRuntimeException("Unsupported argument type: " + argTypes_[i]);
393  }
394  }
395  }
396 
397  private ClassLoader getClassLoader(String jarPath) throws MalformedURLException {
398  if (jarPath == null) {
399  return ClassLoader.getSystemClassLoader();
400  } else {
401  URL url = new File(jarPath).toURI().toURL();
402  return URLClassLoader.newInstance(new URL[] { url }, getClass().getClassLoader());
403  }
404  }
405 
410  private void init(String jarPath, String udfPath,
411  Type retType, Type... parameterTypes) throws
413  ArrayList<String> signatures = Lists.newArrayList();
414  try {
415  LOG.debug("Loading UDF '" + udfPath + "' from " + jarPath);
416  ClassLoader loader = getClassLoader(jarPath);
417  Class<?> c = Class.forName(udfPath, true, loader);
418  Class<? extends UDF> udfClass = c.asSubclass(UDF.class);
419  Constructor<? extends UDF> ctor = udfClass.getConstructor();
420  udf_ = ctor.newInstance();
421  retType_ = retType;
422  argTypes_ = parameterTypes;
423  Method[] methods = udfClass.getMethods();
424  for (Method m: methods) {
425  // By convention, the udf must contain the function "evaluate"
426  if (!m.getName().equals(UDF_FUNCTION_NAME)) continue;
427  signatures.add(m.toGenericString());
428  Class<?>[] methodTypes = m.getParameterTypes();
429 
430  // Try to match the arguments
431  if (methodTypes.length != parameterTypes.length) continue;
432  if (methodTypes.length == 0 && parameterTypes.length == 0) {
433  // Special case where the UDF doesn't take any input args
434  method_ = m;
435  LOG.debug("Loaded UDF '" + udfPath + "' from " + jarPath);
436  return;
437  }
438 
439  boolean incompatible = false;
440  for (int i = 0; i < methodTypes.length; ++i) {
441  if (getPrimitiveType(methodTypes[i]) != parameterTypes[i].getPrimitiveType()) {
442  incompatible = true;
443  break;
444  }
445  }
446 
447  if (incompatible) continue;
448  method_ = m;
449  allocateInputObjects();
450  LOG.debug("Loaded UDF '" + udfPath + "' from " + jarPath);
451  return;
452  }
453 
454  StringBuilder sb = new StringBuilder();
455  sb.append("Unable to find evaluate function with the correct signature: ")
456  .append(udfPath + ".evaluate(")
457  .append(Joiner.on(", ").join(argTypes_))
458  .append(")\n")
459  .append("UDF contains: \n ")
460  .append(Joiner.on("\n ").join(signatures));
461  throw new ImpalaRuntimeException(sb.toString());
462  } catch (MalformedURLException e) {
463  throw new ImpalaRuntimeException("Unable load jar.", e);
464  } catch (SecurityException e) {
465  throw new ImpalaRuntimeException("Unable to load function.", e);
466  } catch (ClassNotFoundException e) {
467  throw new ImpalaRuntimeException("Unable to find class.", e);
468  } catch (NoSuchMethodException e) {
469  throw new ImpalaRuntimeException(
470  "Unable to find constructor with no arguments.", e);
471  } catch (IllegalArgumentException e) {
472  throw new ImpalaRuntimeException(
473  "Unable to call UDF constructor with no arguments.", e);
474  } catch (InstantiationException e) {
475  throw new ImpalaRuntimeException("Unable to call create UDF instance.", e);
476  } catch (IllegalAccessException e) {
477  throw new ImpalaRuntimeException("Unable to call create UDF instance.", e);
478  } catch (InvocationTargetException e) {
479  throw new ImpalaRuntimeException("Unable to call create UDF instance.", e);
480  }
481  }
482 }
PrimitiveType getPrimitiveType()
Definition: Type.java:188
void init(String jarPath, String udfPath, Type retType, Type...parameterTypes)
PrimitiveType
Definition: types.h:27
ClassLoader getClassLoader(String jarPath)
UdfExecutor(String jarFile, String udfPath, Type retType, Type...parameterTypes)
uint8_t offset[7 *64-sizeof(uint64_t)]