Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ExternalDataSourceExecutor.java
Go to the documentation of this file.
1 // Copyright 2014 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.extdatasource;
16 
17 import java.io.File;
18 import java.lang.reflect.Constructor;
19 import java.net.URL;
20 import java.net.URLClassLoader;
21 
22 import org.apache.commons.lang.ArrayUtils;
23 import org.apache.thrift.TException;
24 import org.apache.thrift.TSerializer;
25 import org.apache.thrift.protocol.TBinaryProtocol;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 
33 import com.cloudera.impala.extdatasource.thrift.TCloseParams;
34 import com.cloudera.impala.extdatasource.thrift.TCloseResult;
35 import com.cloudera.impala.extdatasource.thrift.TGetNextParams;
36 import com.cloudera.impala.extdatasource.thrift.TGetNextResult;
37 import com.cloudera.impala.extdatasource.thrift.TOpenParams;
38 import com.cloudera.impala.extdatasource.thrift.TOpenResult;
39 import com.cloudera.impala.extdatasource.thrift.TPrepareParams;
40 import com.cloudera.impala.extdatasource.thrift.TPrepareResult;
41 import com.cloudera.impala.extdatasource.v1.ExternalDataSource;
42 import com.cloudera.impala.thrift.TErrorCode;
43 import com.cloudera.impala.thrift.TStatus;
44 import com.google.common.base.Preconditions;
45 import com.google.common.collect.Lists;
46 
59  private final static Logger LOG = LoggerFactory.getLogger(
60  ExternalDataSourceExecutor.class);
61  private final static TBinaryProtocol.Factory protocolFactory_ =
62  new TBinaryProtocol.Factory();
63 
64  private final ApiVersion apiVersion_;
65  private final ExternalDataSource dataSource_;
66  private final String jarPath_;
67  private final String className_;
68 
75  public ExternalDataSourceExecutor(String jarPath, String className,
76  String apiVersionStr) throws ImpalaException {
77  Preconditions.checkNotNull(jarPath);
78 
79  apiVersion_ = ApiVersion.valueOf(apiVersionStr);
80  if (apiVersion_ == null) {
81  throw new ImpalaRuntimeException("Invalid API version: " + apiVersionStr);
82  }
83  jarPath_ = jarPath;
84  className_ = className;
85 
86  try {
87  URL url = new File(jarPath).toURI().toURL();
88  URLClassLoader loader = URLClassLoader.newInstance(
89  new URL[] { url }, getClass().getClassLoader());
90  Class<?> c = Class.forName(className, true, loader);
91  if (!ArrayUtils.contains(c.getInterfaces(), apiVersion_.getApiInterface())) {
92  throw new ImpalaRuntimeException(String.format(
93  "Class '%s' does not implement interface '%s' required for API version %s",
94  className, apiVersion_.getApiInterface().getName(), apiVersionStr));
95  }
96  Constructor<?> ctor = c.getConstructor();
97  dataSource_ = (ExternalDataSource) ctor.newInstance();
98  } catch (Exception ex) {
99  throw new ImpalaRuntimeException(String.format("Unable to load external data " +
100  "source library from path=%s className=%s apiVersion=%s", jarPath,
101  className, apiVersionStr), ex);
102  }
103  }
104 
105  public byte[] prepare(byte[] thriftParams) throws ImpalaException {
106  TPrepareParams params = new TPrepareParams();
107  JniUtil.deserializeThrift(protocolFactory_, params, thriftParams);
108  TPrepareResult result = prepare(params);
109  try {
110  return new TSerializer(protocolFactory_).serialize(result);
111  } catch (TException e) {
112  throw new InternalException(e.getMessage(), e);
113  }
114  }
115 
116  public byte[] open(byte[] thriftParams) throws ImpalaException {
117  TOpenParams params = new TOpenParams();
118  JniUtil.deserializeThrift(protocolFactory_, params, thriftParams);
119  TOpenResult result = open(params);
120  try {
121  return new TSerializer(protocolFactory_).serialize(result);
122  } catch (TException e) {
123  throw new InternalException(e.getMessage(), e);
124  }
125  }
126 
127  public byte[] getNext(byte[] thriftParams) throws ImpalaException {
128  TGetNextParams params = new TGetNextParams();
129  JniUtil.deserializeThrift(protocolFactory_, params, thriftParams);
130  TGetNextResult result = getNext(params);
131  try {
132  return new TSerializer(protocolFactory_).serialize(result);
133  } catch (TException e) {
134  throw new InternalException(e.getMessage(), e);
135  }
136  }
137 
138  public byte[] close(byte[] thriftParams) throws ImpalaException {
139  TCloseParams params = new TCloseParams();
140  JniUtil.deserializeThrift(protocolFactory_, params, thriftParams);
141  TCloseResult result = close(params);
142  try {
143  return new TSerializer(protocolFactory_).serialize(result);
144  } catch (TException e) {
145  throw new InternalException(e.getMessage(), e);
146  }
147  }
148 
149  // Helper method to log the exception to capture the stack and return an error TStatus
150  private TStatus logAndMakeErrorStatus(String opName, Exception e) {
151  String exceptionMessage = e.getMessage();
152  if (exceptionMessage == null) {
153  exceptionMessage = "No error message returned by data source. Check the " +
154  "impalad log for more information.";
155  }
156  String errorMessage = String.format(
157  "Error in data source (path=%s, class=%s, version=%s) %s: %s",
158  jarPath_, className_, apiVersion_.name(), opName,
159  exceptionMessage);
160  LOG.error(errorMessage, e); // Logs the stack
161  return new TStatus(TErrorCode.RUNTIME_ERROR, Lists.newArrayList(errorMessage));
162  }
163 
164  public TPrepareResult prepare(TPrepareParams params) {
165  try {
166  TPrepareResult result = dataSource_.prepare(params);
167  result.validate();
168  return result;
169  } catch (Exception e) {
170  return new TPrepareResult(logAndMakeErrorStatus("prepare()", e));
171  }
172  }
173 
174  public TOpenResult open(TOpenParams params) {
175  try {
176  TOpenResult result = dataSource_.open(params);
177  result.validate();
178  return result;
179  } catch (Exception e) {
180  return new TOpenResult(logAndMakeErrorStatus("open()", e));
181  }
182  }
183 
184  public TGetNextResult getNext(TGetNextParams params) {
185  try {
186  TGetNextResult result = dataSource_.getNext(params);
187  result.validate();
188  return result;
189  } catch (Exception e) {
190  return new TGetNextResult(logAndMakeErrorStatus("getNext()", e));
191  }
192  }
193 
194  public TCloseResult close(TCloseParams params) {
195  try {
196  TCloseResult result = dataSource_.close(params);
197  result.validate();
198  return result;
199  } catch (Exception e) {
200  return new TCloseResult(logAndMakeErrorStatus("close()", e));
201  }
202  }
203 }
ExternalDataSourceExecutor(String jarPath, String className, String apiVersionStr)