Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
external-data-source-executor.cc
Go to the documentation of this file.
1 // Copyright 2014 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <boost/thread.hpp>
18 #include <list>
19 #include <string>
20 
21 #include "common/logging.h"
22 #include "rpc/jni-thrift-util.h"
23 #include "runtime/lib-cache.h"
24 #include "util/parse-util.h"
25 
26 #include "common/names.h"
27 
28 using namespace impala;
29 using namespace impala::extdatasource;
30 
32  DCHECK(!is_initialized_);
33 }
34 
36  const string& class_name, const string& api_version) {
37  DCHECK(!is_initialized_);
38  string local_jar_path;
39  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
40  jar_path, LibCache::TYPE_JAR, &local_jar_path));
41 
42  // TODO: Make finding the class and methods static, i.e. only loaded once
43  JniMethodDescriptor methods[] = {
44  {"<init>", "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)V", &ctor_},
45  {"open", "([B)[B", &open_id_},
46  {"getNext", "([B)[B", &get_next_id_},
47  {"close", "([B)[B", &close_id_}};
48 
49  JNIEnv* jni_env = getJNIEnv();
50 
51  // Add a scoped cleanup jni reference object. This cleans up local refs made below.
52  JniLocalFrame jni_frame;
53  RETURN_IF_ERROR(jni_frame.push(jni_env));
54 
55  jclass cl = jni_env->FindClass(
56  "com/cloudera/impala/extdatasource/ExternalDataSourceExecutor");
57  RETURN_ERROR_IF_EXC(jni_env);
58  executor_class_ = reinterpret_cast<jclass>(jni_env->NewGlobalRef(cl));
59  RETURN_ERROR_IF_EXC(jni_env);
60  uint32_t num_methods = sizeof(methods) / sizeof(methods[0]);
61  for (int i = 0; i < num_methods; ++i) {
62  RETURN_IF_ERROR(JniUtil::LoadJniMethod(jni_env, executor_class_, &(methods[i])));
63  }
64 
65  jstring jar_path_jstr = jni_env->NewStringUTF(local_jar_path.c_str());
66  RETURN_ERROR_IF_EXC(jni_env);
67  jstring class_name_jstr = jni_env->NewStringUTF(class_name.c_str());
68  RETURN_ERROR_IF_EXC(jni_env);
69  jstring api_version_jstr = jni_env->NewStringUTF(api_version.c_str());
70  RETURN_ERROR_IF_EXC(jni_env);
71 
72  jobject local_exec = jni_env->NewObject(executor_class_, ctor_, jar_path_jstr,
73  class_name_jstr, api_version_jstr);
74  RETURN_ERROR_IF_EXC(jni_env);
75  executor_ = jni_env->NewGlobalRef(local_exec);
76  RETURN_ERROR_IF_EXC(jni_env);
77  is_initialized_ = true;
78  return Status::OK;
79 }
80 
81 // JniUtil::CallJniMethod() does not compile when the template parameters are in
82 // another namespace. The issue seems to be that SerializeThriftMsg/DeserializeThriftMsg
83 // are not being generated for these types.
84 // TODO: Understand what's happening, remove, and use JniUtil::CallJniMethod
85 template <typename T, typename R>
86 Status CallJniMethod(const jobject& obj, const jmethodID& method, const T& arg,
87  R* response) {
88  JNIEnv* jni_env = getJNIEnv();
89  jbyteArray request_bytes;
90  JniLocalFrame jni_frame;
91  RETURN_IF_ERROR(jni_frame.push(jni_env));
92  RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &arg, &request_bytes));
93  jbyteArray result_bytes = static_cast<jbyteArray>(
94  jni_env->CallObjectMethod(obj, method, request_bytes));
95  RETURN_ERROR_IF_EXC(jni_env);
96  RETURN_IF_ERROR(DeserializeThriftMsg(jni_env, result_bytes, response));
97  return Status::OK;
98 }
99 
100 Status ExternalDataSourceExecutor::Open(const TOpenParams& params, TOpenResult* result) {
101  DCHECK(is_initialized_);
102  return CallJniMethod(executor_, open_id_, params, result);
103 }
104 
105 Status ExternalDataSourceExecutor::GetNext(const TGetNextParams& params,
106  TGetNextResult* result) {
107  DCHECK(is_initialized_);
108  return CallJniMethod(executor_, get_next_id_, params, result);
109 }
110 
111 Status ExternalDataSourceExecutor::Close(const TCloseParams& params,
112  TCloseResult* result) {
113  DCHECK(is_initialized_);
114  Status status = CallJniMethod(executor_, close_id_, params,
115  result);
116  JNIEnv* env = getJNIEnv();
117  if (executor_ != NULL) {
118  env->DeleteGlobalRef(executor_);
119  status.MergeStatus(JniUtil::GetJniExceptionMsg(env)); // no-op if Status == OK
120  }
121  if (executor_class_ != NULL) {
122  env->DeleteGlobalRef(executor_class_);
124  }
125  is_initialized_ = false;
126  return status;
127 }
Status Init(const std::string &jar_path, const std::string &class_name, const std::string &api_version)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
static Status LoadJniMethod(JNIEnv *jni_env, const jclass &jni_class, JniMethodDescriptor *descriptor)
Definition: jni-util.cc:193
void MergeStatus(const Status &status)
Definition: status.cc:172
Status CallJniMethod(const jobject &obj, const jmethodID &method, const T &arg, R *response)
static Status GetJniExceptionMsg(JNIEnv *env, bool log_stack=true, const std::string &prefix="")
Definition: jni-util.cc:161
static LibCache * instance()
Definition: lib-cache.h:63
Status push(JNIEnv *env, int max_local_ref=10)
Definition: jni-util.cc:34
Describes one method to look up in a Java object.
Definition: jni-util.h:149
#define RETURN_ERROR_IF_EXC(env)
Definition: jni-util.h:99
Status Open(const impala::extdatasource::TOpenParams &params, impala::extdatasource::TOpenResult *result)
Calls ExternalDataSource.open()
static const Status OK
Definition: status.h:87
Status DeserializeThriftMsg(JNIEnv *env, jbyteArray serialized_msg, T *deserialized_msg)
ImpaladQueryExecutor * executor_
execution state of coordinator fragment
Definition: expr-test.cc:71
Status SerializeThriftMsg(JNIEnv *env, T *msg, jbyteArray *serialized_msg)
Status Close(const impala::extdatasource::TCloseParams &params, impala::extdatasource::TCloseResult *result)
JNIEnv * getJNIEnv(void)
C linkage for helper functions in hdfsJniHelper.h.
Status GetNext(const impala::extdatasource::TGetNextParams &params, impala::extdatasource::TGetNextResult *result)
Calls ExternalDataSource.getNext()