Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
fe-support.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 // This file contains implementations for the JNI FeSupport interface.
16 
17 #include "service/fe-support.h"
18 
19 #include <boost/scoped_ptr.hpp>
20 
21 #include "codegen/llvm-codegen.h"
22 #include "common/init.h"
23 #include "common/logging.h"
24 #include "common/status.h"
26 #include "exprs/expr.h"
27 #include "exprs/expr-context.h"
28 #include "runtime/exec-env.h"
29 #include "runtime/runtime-state.h"
30 #include "runtime/hdfs-fs-cache.h"
31 #include "runtime/lib-cache.h"
32 #include "runtime/client-cache.h"
33 #include "service/impala-server.h"
34 #include "util/cpu-info.h"
35 #include "util/disk-info.h"
36 #include "util/dynamic-util.h"
37 #include "util/jni-util.h"
38 #include "util/mem-info.h"
39 #include "util/symbols-util.h"
40 #include "rpc/jni-thrift-util.h"
41 #include "rpc/thrift-server.h"
42 #include "util/debug-util.h"
43 #include "gen-cpp/Data_types.h"
44 #include "gen-cpp/Frontend_types.h"
45 
46 #include "common/names.h"
47 
48 using namespace impala;
49 using namespace apache::thrift::server;
50 
51 // Called from the FE when it explicitly loads libfesupport.so for tests.
52 // This creates the minimal state necessary to service the other JNI calls.
53 // This is not called when we first start up the BE.
54 extern "C"
55 JNIEXPORT void JNICALL
57  JNIEnv* env, jclass caller_class) {
58  DCHECK(ExecEnv::GetInstance() == NULL) << "This should only be called once from the FE";
59  char* name = const_cast<char*>("FeSupport");
60  InitCommonRuntime(1, &name, false, TestInfo::FE_TEST);
62  ExecEnv* exec_env = new ExecEnv(); // This also caches it from the process.
63  exec_env->InitForFeTests();
64 }
65 
66 // Evaluates a batch of const exprs and returns the results in a serialized
67 // TResultRow, where each TColumnValue in the TResultRow stores the result of
68 // a predicate evaluation. It requires JniUtil::Init() to have been
69 // called.
70 extern "C"
71 JNIEXPORT jbyteArray JNICALL
73  JNIEnv* env, jclass caller_class, jbyteArray thrift_expr_batch,
74  jbyteArray thrift_query_ctx_bytes) {
75  jbyteArray result_bytes = NULL;
76  TQueryCtx query_ctx;
77  TExprBatch expr_batch;
78  JniLocalFrame jni_frame;
79  TResultRow expr_results;
81 
82  DeserializeThriftMsg(env, thrift_expr_batch, &expr_batch);
83  DeserializeThriftMsg(env, thrift_query_ctx_bytes, &query_ctx);
84  query_ctx.request.query_options.disable_codegen = true;
85  RuntimeState state(query_ctx);
86 
88  result_bytes);
89  // Exprs can allocate memory so we need to set up the mem trackers before
90  // preparing/running the exprs.
91  state.InitMemTrackers(TUniqueId(), NULL, -1);
92 
93  vector<TExpr>& texprs = expr_batch.exprs;
94  // Prepare the exprs
95  vector<ExprContext*> expr_ctxs;
96  for (vector<TExpr>::iterator it = texprs.begin(); it != texprs.end(); it++) {
97  ExprContext* ctx;
98  THROW_IF_ERROR_RET(Expr::CreateExprTree(&obj_pool, *it, &ctx), env,
99  JniUtil::internal_exc_class(), result_bytes);
101  env, JniUtil::internal_exc_class(), result_bytes);
102  expr_ctxs.push_back(ctx);
103  }
104 
105  if (state.codegen_created()) {
106  // Finalize the module so any UDF functions are jit'd
107  LlvmCodeGen* codegen = NULL;
108  state.GetCodegen(&codegen, /* initialize */ false);
109  DCHECK_NOTNULL(codegen);
110  codegen->EnableOptimizations(false);
111  codegen->FinalizeModule();
112  }
113 
114  vector<TColumnValue> results;
115  // Open and evaluate the exprs
116  for (int i = 0; i < expr_ctxs.size(); ++i) {
117  TColumnValue val;
118  THROW_IF_ERROR_RET(expr_ctxs[i]->Open(&state), env,
119  JniUtil::internal_exc_class(), result_bytes);
120  expr_ctxs[i]->GetValue(NULL, false, &val);
121  expr_ctxs[i]->Close(&state);
122  results.push_back(val);
123  }
124  expr_results.__set_colVals(results);
125  THROW_IF_ERROR_RET(SerializeThriftMsg(env, &expr_results, &result_bytes), env,
126  JniUtil::internal_exc_class(), result_bytes);
127  return result_bytes;
128 }
129 
130 // Does the symbol resolution, filling in the result in *result.
131 static void ResolveSymbolLookup(const TSymbolLookupParams params,
132  const vector<ColumnType>& arg_types, TSymbolLookupResult* result) {
133  LibCache::LibType type;
134  if (params.fn_binary_type == TFunctionBinaryType::NATIVE ||
135  params.fn_binary_type == TFunctionBinaryType::BUILTIN) {
136  // We use TYPE_SO for builtins, since LibCache does not resolve symbols for IR
137  // builtins. This is ok since builtins have the same symbol whether we run the IR or
138  // native versions.
139  type = LibCache::TYPE_SO;
140  } else if (params.fn_binary_type == TFunctionBinaryType::IR) {
141  type = LibCache::TYPE_IR;
142  } else if (params.fn_binary_type == TFunctionBinaryType::HIVE) {
143  type = LibCache::TYPE_JAR;
144  } else {
145  DCHECK(false) << params.fn_binary_type;
146  }
147 
148  // Builtin functions are loaded directly from the running process
149  if (params.fn_binary_type != TFunctionBinaryType::BUILTIN) {
150  // Refresh the library if necessary since we're creating a new function
151  LibCache::instance()->SetNeedsRefresh(params.location);
152  string dummy_local_path;
154  params.location, type, &dummy_local_path);
155  if (!status.ok()) {
156  result->__set_result_code(TSymbolLookupResultCode::BINARY_NOT_FOUND);
157  result->__set_error_msg(status.GetDetail());
158  return;
159  }
160  }
161 
162  // Check if the FE-specified symbol exists as-is.
163  // Set 'quiet' to true so we don't flood the log with unfound builtin symbols on
164  // startup.
165  Status status =
166  LibCache::instance()->CheckSymbolExists(params.location, type, params.symbol, true);
167  if (status.ok()) {
168  result->__set_result_code(TSymbolLookupResultCode::SYMBOL_FOUND);
169  result->__set_symbol(params.symbol);
170  return;
171  }
172 
173  if (params.fn_binary_type == TFunctionBinaryType::HIVE ||
174  SymbolsUtil::IsMangled(params.symbol)) {
175  // No use trying to mangle Hive or already mangled symbols, return the error.
176  // TODO: we can demangle the user symbol here and validate it against
177  // params.arg_types. This would prevent someone from typing the wrong symbol
178  // by accident. This requires more string parsing of the symbol.
179  result->__set_result_code(TSymbolLookupResultCode::SYMBOL_NOT_FOUND);
180  stringstream ss;
181  ss << "Could not find symbol '" << params.symbol << "' in: " << params.location;
182  result->__set_error_msg(ss.str());
183  VLOG(1) << ss.str() << endl << status.GetDetail();
184  return;
185  }
186 
187  string symbol = params.symbol;
188  ColumnType ret_type(INVALID_TYPE);
189  if (params.__isset.ret_arg_type) ret_type = ColumnType(params.ret_arg_type);
190 
191  // Mangle the user input
192  DCHECK_NE(params.fn_binary_type, TFunctionBinaryType::HIVE);
193  if (params.symbol_type == TSymbolType::UDF_EVALUATE) {
194  symbol = SymbolsUtil::MangleUserFunction(params.symbol,
195  arg_types, params.has_var_args, params.__isset.ret_arg_type ? &ret_type : NULL);
196  } else {
197  DCHECK(params.symbol_type == TSymbolType::UDF_PREPARE ||
198  params.symbol_type == TSymbolType::UDF_CLOSE);
199  symbol = SymbolsUtil::ManglePrepareOrCloseFunction(params.symbol);
200  }
201 
202  // Look up the mangled symbol
203  status = LibCache::instance()->CheckSymbolExists(params.location, type, symbol);
204  if (!status.ok()) {
205  result->__set_result_code(TSymbolLookupResultCode::SYMBOL_NOT_FOUND);
206  stringstream ss;
207  ss << "Could not find function " << params.symbol << "(";
208 
209  if (params.symbol_type == TSymbolType::UDF_EVALUATE) {
210  for (int i = 0; i < arg_types.size(); ++i) {
211  ss << arg_types[i].DebugString();
212  if (i != arg_types.size() - 1) ss << ", ";
213  }
214  } else {
215  ss << "impala_udf::FunctionContext*, "
216  << "impala_udf::FunctionContext::FunctionStateScope";
217  }
218 
219  ss << ")";
220  if (params.__isset.ret_arg_type) ss << " returns " << ret_type.DebugString();
221  ss << " in: " << params.location;
222  if (params.__isset.ret_arg_type) {
223  ss << "\nCheck that function name, arguments, and return type are correct.";
224  } else {
225  ss << "\nCheck that symbol and argument types are correct.";
226  }
227  result->__set_error_msg(ss.str());
228  return;
229  }
230 
231  // We were able to resolve the symbol.
232  result->__set_result_code(TSymbolLookupResultCode::SYMBOL_FOUND);
233  result->__set_symbol(symbol);
234 }
235 
236 extern "C"
237 JNIEXPORT jbyteArray JNICALL
239  JNIEnv* env, jclass caller_class, jbyteArray thrift_struct) {
240  TCacheJarParams params;
241  DeserializeThriftMsg(env, thrift_struct, &params);
242 
243  TCacheJarResult result;
244  string local_path;
245  Status status = LibCache::instance()->GetLocalLibPath(params.hdfs_location,
246  LibCache::TYPE_JAR, &local_path);
247  status.ToThrift(&result.status);
248  if (status.ok()) result.__set_local_path(local_path);
249 
250  jbyteArray result_bytes = NULL;
251  THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env,
252  JniUtil::internal_exc_class(), result_bytes);
253  return result_bytes;
254 }
255 
256 extern "C"
257 JNIEXPORT jbyteArray JNICALL
259  JNIEnv* env, jclass caller_class, jbyteArray thrift_struct) {
260  TSymbolLookupParams lookup;
261  DeserializeThriftMsg(env, thrift_struct, &lookup);
262 
263  vector<ColumnType> arg_types;
264  for (int i = 0; i < lookup.arg_types.size(); ++i) {
265  arg_types.push_back(ColumnType(lookup.arg_types[i]));
266  }
267 
268  TSymbolLookupResult result;
269  ResolveSymbolLookup(lookup, arg_types, &result);
270 
271  jbyteArray result_bytes = NULL;
272  THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env,
273  JniUtil::internal_exc_class(), result_bytes);
274  return result_bytes;
275 }
276 
277 // Calls in to the catalog server to request prioritizing the loading of metadata for
278 // specific catalog objects.
279 extern "C"
280 JNIEXPORT jbyteArray JNICALL
282  JNIEnv* env, jclass caller_class, jbyteArray thrift_struct) {
283  TPrioritizeLoadRequest request;
284  DeserializeThriftMsg(env, thrift_struct, &request);
285 
286  CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), NULL, NULL);
287  TPrioritizeLoadResponse result;
288  Status status = catalog_op_executor.PrioritizeLoad(request, &result);
289  if (!status.ok()) {
290  LOG(ERROR) << status.GetDetail();
291  // Create a new Status, copy in this error, then update the result.
292  Status catalog_service_status(result.status);
293  catalog_service_status.MergeStatus(status);
294  status.ToThrift(&result.status);
295  }
296 
297  jbyteArray result_bytes = NULL;
298  THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env,
299  JniUtil::internal_exc_class(), result_bytes);
300  return result_bytes;
301 }
302 
303 extern "C"
304 JNIEXPORT jbyteArray JNICALL
306  jclass caller_class) {
307  TStartupOptions options;
308  ExecEnv* exec_env = ExecEnv::GetInstance();
309  ImpalaServer* impala_server = exec_env->impala_server();
310  options.__set_compute_lineage(impala_server->IsLineageLoggingEnabled());
311  jbyteArray result_bytes = NULL;
312  THROW_IF_ERROR_RET(SerializeThriftMsg(env, &options, &result_bytes), env,
313  JniUtil::internal_exc_class(), result_bytes);
314  return result_bytes;
315 }
316 
317 namespace impala {
318 
319 static JNINativeMethod native_methods[] = {
320  {
321  (char*)"NativeFeTestInit", (char*)"()V",
323  },
324  {
325  (char*)"NativeEvalConstExprs", (char*)"([B[B)[B",
327  },
328  {
329  (char*)"NativeCacheJar", (char*)"([B)[B",
331  },
332  {
333  (char*)"NativeLookupSymbol", (char*)"([B)[B",
335  },
336  {
337  (char*)"NativePrioritizeLoad", (char*)"([B)[B",
339  },
340  {
341  (char*)"NativeGetStartupOptions", (char*)"()[B",
343  },
344 };
345 
347  JNIEnv* env = getJNIEnv();
348  jclass native_backend_cl = env->FindClass("com/cloudera/impala/service/FeSupport");
349  env->RegisterNatives(native_backend_cl, native_methods,
350  sizeof(native_methods) / sizeof(native_methods[0]));
351  EXIT_IF_EXC(env);
352 }
353 
354 }
Status CheckSymbolExists(const std::string &hdfs_lib_file, LibType type, const std::string &symbol, bool quiet=false)
Definition: lib-cache.cc:192
const std::string GetDetail() const
Definition: status.cc:184
void InitFeSupport()
Definition: fe-support.cc:346
JNIEXPORT jbyteArray JNICALL Java_com_cloudera_impala_service_FeSupport_NativeEvalConstExprs(JNIEnv *env, jclass caller_class, jbyteArray thrift_expr_batch, jbyteArray thrift_query_ctx_bytes)
Definition: fe-support.cc:72
bool codegen_created() const
static JNINativeMethod native_methods[]
Definition: fe-support.cc:319
Status InitForFeTests()
Initializes the exec env for running FE tests.
Definition: exec-env.cc:276
void InitMemTrackers(const TUniqueId &query_id, const std::string *request_pool, int64_t query_bytes_limit, int64_t query_rm_reservation_limit_bytes=-1)
void InitCommonRuntime(int argc, char **argv, bool init_jvm, TestInfo::Mode m=TestInfo::NON_TEST)
Definition: init.cc:122
#define EXIT_IF_EXC(env)
Definition: jni-util.h:85
MemTracker * query_mem_tracker()
static jclass internal_exc_class()
Global reference to InternalException class.
Definition: jni-util.h:206
void MergeStatus(const Status &status)
Definition: status.cc:172
JNIEXPORT jbyteArray JNICALL Java_com_cloudera_impala_service_FeSupport_NativeLookupSymbol(JNIEnv *env, jclass caller_class, jbyteArray thrift_struct)
Definition: fe-support.cc:258
bool IsLineageLoggingEnabled()
Returns true if lineage logging is enabled, false otherwise.
ImpalaServer * impala_server()
Definition: exec-env.h:90
ObjectPool * obj_pool()
Returns a local object pool.
Definition: coordinator.h:263
Status Prepare(RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
Definition: expr-context.cc:47
LLVM code generator. This is the top level object to generate jitted code.
Definition: llvm-codegen.h:107
static std::string ManglePrepareOrCloseFunction(const std::string &fn_name)
std::string DebugString() const
Definition: types.cc:194
static bool IsMangled(const std::string &symbol)
Returns true if this symbol is mangled.
Definition: symbols-util.cc:53
#define THROW_IF_ERROR_RET(stmt, env, impala_exc_cl, ret)
Definition: jni-util.h:46
void ToThrift(TStatus *status) const
Convert into TStatus.
Definition: status.cc:188
Status GetLocalLibPath(const std::string &hdfs_lib_file, LibType type, std::string *local_path)
Definition: lib-cache.cc:181
static LibCache * instance()
Definition: lib-cache.h:63
Status push(JNIEnv *env, int max_local_ref=10)
Definition: jni-util.cc:34
JNIEXPORT jbyteArray JNICALL Java_com_cloudera_impala_service_FeSupport_NativePrioritizeLoad(JNIEnv *env, jclass caller_class, jbyteArray thrift_struct)
Definition: fe-support.cc:281
static Status CreateExprTree(ObjectPool *pool, const TExpr &texpr, ExprContext **ctx)
Definition: expr.cc:129
static void ResolveSymbolLookup(const TSymbolLookupParams params, const vector< ColumnType > &arg_types, TSymbolLookupResult *result)
Definition: fe-support.cc:131
static ExecEnv * GetInstance()
Definition: exec-env.h:63
static void InitializeLlvm(bool load_backend=false)
Definition: llvm-codegen.cc:78
void SetNeedsRefresh(const std::string &hdfs_lib_file)
Definition: lib-cache.cc:221
Status GetCodegen(LlvmCodeGen **codegen, bool initialize=true)
Status PrioritizeLoad(const TPrioritizeLoadRequest &req, TPrioritizeLoadResponse *result)
Status DeserializeThriftMsg(JNIEnv *env, jbyteArray serialized_msg, T *deserialized_msg)
static std::string MangleUserFunction(const std::string &fn_name, const std::vector< ColumnType > &arg_types, bool has_var_args=false, ColumnType *ret_argument=NULL)
Status SerializeThriftMsg(JNIEnv *env, T *msg, jbyteArray *serialized_msg)
bool ok() const
Definition: status.h:172
void EnableOptimizations(bool enable)
Turns on/off optimization passes.
JNIEXPORT void JNICALL Java_com_cloudera_impala_service_FeSupport_NativeFeTestInit(JNIEnv *env, jclass caller_class)
Definition: fe-support.cc:56
string name
Definition: cpu-info.cc:50
JNIEXPORT jbyteArray JNICALL Java_com_cloudera_impala_service_FeSupport_NativeCacheJar(JNIEnv *env, jclass caller_class, jbyteArray thrift_struct)
Definition: fe-support.cc:238
JNIEnv * getJNIEnv(void)
C linkage for helper functions in hdfsJniHelper.h.
JNIEXPORT jbyteArray JNICALL Java_com_cloudera_impala_service_FeSupport_NativeGetStartupOptions(JNIEnv *env, jclass caller_class)
Definition: fe-support.cc:305