15 package com.cloudera.impala.extdatasource;
18 import java.lang.reflect.Constructor;
20 import java.net.URLClassLoader;
22 import org.apache.commons.lang.ArrayUtils;
23 import org.apache.thrift.TException;
24 import org.apache.thrift.TSerializer;
25 import org.apache.thrift.protocol.TBinaryProtocol;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
33 import com.cloudera.impala.extdatasource.thrift.TCloseParams;
34 import com.cloudera.impala.extdatasource.thrift.TCloseResult;
35 import com.cloudera.impala.extdatasource.thrift.TGetNextParams;
36 import com.cloudera.impala.extdatasource.thrift.TGetNextResult;
37 import com.cloudera.impala.extdatasource.thrift.TOpenParams;
38 import com.cloudera.impala.extdatasource.thrift.TOpenResult;
39 import com.cloudera.impala.extdatasource.thrift.TPrepareParams;
40 import com.cloudera.impala.extdatasource.thrift.TPrepareResult;
41 import com.cloudera.impala.extdatasource.v1.ExternalDataSource;
42 import com.cloudera.impala.thrift.TErrorCode;
43 import com.cloudera.impala.thrift.TStatus;
44 import com.google.common.base.Preconditions;
45 import com.google.common.collect.Lists;
59 private final static Logger
LOG = LoggerFactory.getLogger(
60 ExternalDataSourceExecutor.class);
62 new TBinaryProtocol.Factory();
77 Preconditions.checkNotNull(jarPath);
80 if (apiVersion_ == null) {
87 URL url =
new File(jarPath).toURI().toURL();
88 URLClassLoader loader = URLClassLoader.newInstance(
89 new URL[] { url }, getClass().getClassLoader());
90 Class<?> c = Class.forName(className,
true, loader);
91 if (!ArrayUtils.contains(c.getInterfaces(), apiVersion_.getApiInterface())) {
93 "Class '%s' does not implement interface '%s' required for API version %s",
94 className, apiVersion_.getApiInterface().getName(), apiVersionStr));
96 Constructor<?> ctor = c.getConstructor();
97 dataSource_ = (ExternalDataSource) ctor.newInstance();
98 }
catch (Exception ex) {
100 "source library from path=%s className=%s apiVersion=%s", jarPath,
101 className, apiVersionStr), ex);
106 TPrepareParams params =
new TPrepareParams();
108 TPrepareResult result =
prepare(params);
110 return new TSerializer(protocolFactory_).serialize(result);
111 }
catch (TException e) {
117 TOpenParams params =
new TOpenParams();
119 TOpenResult result =
open(params);
121 return new TSerializer(protocolFactory_).serialize(result);
122 }
catch (TException e) {
128 TGetNextParams params =
new TGetNextParams();
130 TGetNextResult result =
getNext(params);
132 return new TSerializer(protocolFactory_).serialize(result);
133 }
catch (TException e) {
139 TCloseParams params =
new TCloseParams();
141 TCloseResult result =
close(params);
143 return new TSerializer(protocolFactory_).serialize(result);
144 }
catch (TException e) {
151 String exceptionMessage = e.getMessage();
152 if (exceptionMessage == null) {
153 exceptionMessage =
"No error message returned by data source. Check the " +
154 "impalad log for more information.";
156 String errorMessage = String.format(
157 "Error in data source (path=%s, class=%s, version=%s) %s: %s",
160 LOG.error(errorMessage, e);
161 return new TStatus(TErrorCode.RUNTIME_ERROR, Lists.newArrayList(errorMessage));
164 public TPrepareResult
prepare(TPrepareParams params) {
166 TPrepareResult result = dataSource_.prepare(params);
169 }
catch (Exception e) {
174 public TOpenResult
open(TOpenParams params) {
176 TOpenResult result = dataSource_.open(params);
179 }
catch (Exception e) {
184 public TGetNextResult
getNext(TGetNextParams params) {
186 TGetNextResult result = dataSource_.getNext(params);
189 }
catch (Exception e) {
194 public TCloseResult
close(TCloseParams params) {
196 TCloseResult result = dataSource_.close(params);
199 }
catch (Exception e) {
final ExternalDataSource dataSource_
TStatus logAndMakeErrorStatus(String opName, Exception e)
TOpenResult open(TOpenParams params)
TGetNextResult getNext(TGetNextParams params)
byte[] getNext(byte[] thriftParams)
ExternalDataSourceExecutor(String jarPath, String className, String apiVersionStr)
static final TBinaryProtocol.Factory protocolFactory_
byte[] prepare(byte[] thriftParams)
TCloseResult close(TCloseParams params)
final ApiVersion apiVersion_
TPrepareResult prepare(TPrepareParams params)
byte[] close(byte[] thriftParams)
byte[] open(byte[] thriftParams)