Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
FeSupport.java
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.service;
16 
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Set;
20 
21 import org.apache.thrift.TDeserializer;
22 import org.apache.thrift.TException;
23 import org.apache.thrift.TSerializer;
24 import org.apache.thrift.protocol.TBinaryProtocol;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 
33 import com.cloudera.impala.thrift.TCacheJarParams;
34 import com.cloudera.impala.thrift.TCacheJarResult;
35 import com.cloudera.impala.thrift.TCatalogObject;
36 import com.cloudera.impala.thrift.TCatalogObjectType;
37 import com.cloudera.impala.thrift.TCatalogServiceRequestHeader;
38 import com.cloudera.impala.thrift.TColumnValue;
39 import com.cloudera.impala.thrift.TExprBatch;
40 import com.cloudera.impala.thrift.TPrioritizeLoadRequest;
41 import com.cloudera.impala.thrift.TPrioritizeLoadResponse;
42 import com.cloudera.impala.thrift.TQueryCtx;
43 import com.cloudera.impala.thrift.TResultRow;
44 import com.cloudera.impala.thrift.TStatus;
45 import com.cloudera.impala.thrift.TStartupOptions;
46 import com.cloudera.impala.thrift.TSymbolLookupParams;
47 import com.cloudera.impala.thrift.TSymbolLookupResult;
48 import com.cloudera.impala.thrift.TTable;
50 import com.google.common.base.Preconditions;
51 
60 public class FeSupport {
61  private final static Logger LOG = LoggerFactory.getLogger(FeSupport.class);
62  private static boolean loaded_ = false;
63 
64  // Only called if this library is explicitly loaded. This only happens
65  // when running FE tests.
66  public native static void NativeFeTestInit();
67 
68  // Returns a serialized TResultRow
69  public native static byte[] NativeEvalConstExprs(byte[] thriftExprBatch,
70  byte[] thriftQueryGlobals);
71 
72  // Returns a serialized TSymbolLookupResult
73  public native static byte[] NativeLookupSymbol(byte[] thriftSymbolLookup);
74 
75  // Returns a serialized TCacheJarResult
76  public native static byte[] NativeCacheJar(byte[] thriftCacheJar);
77 
78  // Does an RPCs to the Catalog Server to prioritize the metadata loading of a
79  // one or more catalog objects. To keep our kerberos configuration consolidated,
80  // we make make all RPCs in the BE layer instead of calling the Catalog Server
81  // using Java Thrift bindings.
82  public native static byte[] NativePrioritizeLoad(byte[] thriftReq);
83 
84  // Return select BE startup options as a serialized TStartupOptions
85  public native static byte[] NativeGetStartupOptions();
86 
94  public static TCacheJarResult CacheJar(String hdfsLocation) throws InternalException {
95  Preconditions.checkNotNull(hdfsLocation);
96  TCacheJarParams params = new TCacheJarParams(hdfsLocation);
97  TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
98  byte[] result;
99  try {
100  result = CacheJar(serializer.serialize(params));
101  Preconditions.checkNotNull(result);
102  TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
103  TCacheJarResult thriftResult = new TCacheJarResult();
104  deserializer.deserialize(thriftResult, result);
105  return thriftResult;
106  } catch (TException e) {
107  // this should never happen
108  throw new InternalException(
109  "Couldn't cache jar at HDFS location " + hdfsLocation, e);
110  }
111  }
112 
113  private static byte[] CacheJar(byte[] thriftParams) {
114  try {
115  return NativeCacheJar(thriftParams);
116  } catch (UnsatisfiedLinkError e) {
117  loadLibrary();
118  }
119  return NativeCacheJar(thriftParams);
120  }
121 
122  public static TColumnValue EvalConstExpr(Expr expr, TQueryCtx queryCtx)
123  throws InternalException {
124  Preconditions.checkState(expr.isConstant());
125  TExprBatch exprBatch = new TExprBatch();
126  exprBatch.addToExprs(expr.treeToThrift());
127  TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
128  byte[] result;
129  try {
130  result = EvalConstExprs(serializer.serialize(exprBatch),
131  serializer.serialize(queryCtx));
132  Preconditions.checkNotNull(result);
133  TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
134  TResultRow val = new TResultRow();
135  deserializer.deserialize(val, result);
136  Preconditions.checkState(val.getColValsSize() == 1);
137  return val.getColVals().get(0);
138  } catch (TException e) {
139  // this should never happen
140  throw new InternalException("couldn't execute expr " + expr.toSql(), e);
141  }
142  }
143 
144  private static byte[] LookupSymbol(byte[] thriftParams) {
145  try {
146  return NativeLookupSymbol(thriftParams);
147  } catch (UnsatisfiedLinkError e) {
148  loadLibrary();
149  }
150  return NativeLookupSymbol(thriftParams);
151  }
152 
153  public static TSymbolLookupResult LookupSymbol(TSymbolLookupParams params)
154  throws InternalException {
155  TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
156  try {
157  byte[] resultBytes = LookupSymbol(serializer.serialize(params));
158  Preconditions.checkNotNull(resultBytes);
159  TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
160  TSymbolLookupResult result = new TSymbolLookupResult();
161  deserializer.deserialize(result, resultBytes);
162  return result;
163  } catch (TException e) {
164  // this should never happen
165  throw new InternalException("couldn't perform symbol lookup.", e);
166  }
167  }
168 
169  private static byte[] EvalConstExprs(byte[] thriftExprBatch,
170  byte[] thriftQueryContext) {
171  try {
172  return NativeEvalConstExprs(thriftExprBatch, thriftQueryContext);
173  } catch (UnsatisfiedLinkError e) {
174  loadLibrary();
175  }
176  return NativeEvalConstExprs(thriftExprBatch, thriftQueryContext);
177  }
178 
179  public static boolean EvalPredicate(Expr pred, TQueryCtx queryCtx)
180  throws InternalException {
181  // Shortcuts to avoid expensive BE evaluation.
182  if (pred instanceof BoolLiteral) return ((BoolLiteral) pred).getValue();
183  if (pred instanceof NullLiteral) return false;
184  Preconditions.checkState(pred.getType().isBoolean());
185  TColumnValue val = EvalConstExpr(pred, queryCtx);
186  // Return false if pred evaluated to false or NULL. True otherwise.
187  return val.isBool_val() && val.bool_val;
188  }
189 
199  public static TResultRow EvalPredicateBatch(ArrayList<Expr> exprs,
200  TQueryCtx queryCtx) throws InternalException {
201  TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
202  TExprBatch exprBatch = new TExprBatch();
203  for (Expr expr: exprs) {
204  // Make sure we only process boolean exprs.
205  Preconditions.checkState(expr.getType().isBoolean());
206  Preconditions.checkState(expr.isConstant());
207  exprBatch.addToExprs(expr.treeToThrift());
208  }
209  byte[] result;
210  try {
211  result = EvalConstExprs(serializer.serialize(exprBatch),
212  serializer.serialize(queryCtx));
213  Preconditions.checkNotNull(result);
214  TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
215  TResultRow val = new TResultRow();
216  deserializer.deserialize(val, result);
217  return val;
218  } catch (TException e) {
219  // this should never happen
220  throw new InternalException("couldn't execute a batch of exprs.", e);
221  }
222  }
223 
224  private static byte[] PrioritizeLoad(byte[] thriftReq) {
225  try {
226  return NativePrioritizeLoad(thriftReq);
227  } catch (UnsatisfiedLinkError e) {
228  loadLibrary();
229  }
230  return NativePrioritizeLoad(thriftReq);
231  }
232 
233  public static TStatus PrioritizeLoad(Set<TableName> tableNames)
234  throws InternalException {
235  Preconditions.checkNotNull(tableNames);
236 
237  List<TCatalogObject> objectDescs = new ArrayList<TCatalogObject>(tableNames.size());
238  for (TableName tableName: tableNames) {
239  TCatalogObject catalogObject = new TCatalogObject();
240  catalogObject.setType(TCatalogObjectType.TABLE);
241  catalogObject.setTable(new TTable(tableName.getDb(), tableName.getTbl()));
242  objectDescs.add(catalogObject);
243  }
244 
245  TPrioritizeLoadRequest request = new TPrioritizeLoadRequest ();
246  request.setHeader(new TCatalogServiceRequestHeader());
247  request.setObject_descs(objectDescs);
248 
249  TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
250  try {
251  byte[] result = PrioritizeLoad(serializer.serialize(request));
252  Preconditions.checkNotNull(result);
253  TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
254  TPrioritizeLoadResponse response = new TPrioritizeLoadResponse();
255  deserializer.deserialize(response, result);
256  return response.getStatus();
257  } catch (TException e) {
258  // this should never happen
259  throw new InternalException("Error processing request: " + e.getMessage(), e);
260  }
261  }
262 
263  public static TStartupOptions GetStartupOptions() throws InternalException {
264  try {
265  byte[] result = NativeGetStartupOptions();
266  Preconditions.checkNotNull(result);
267  TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
268  TStartupOptions options = new TStartupOptions();
269  deserializer.deserialize(options, result);
270  return options;
271  } catch (TException e) {
272  throw new InternalException("Error retrieving startup options: " + e.getMessage(),
273  e);
274  }
275  }
276 
281  private static synchronized void loadLibrary() {
282  if (loaded_) return;
283  LOG.info("Loading libfesupport.so");
284  NativeLibUtil.loadLibrary("libfesupport.so");
285  LOG.info("Loaded libfesupport.so");
286  loaded_ = true;
288  }
289 }
static TSymbolLookupResult LookupSymbol(TSymbolLookupParams params)
Definition: FeSupport.java:153
static TCacheJarResult CacheJar(String hdfsLocation)
Definition: FeSupport.java:94
static native byte[] NativeCacheJar(byte[] thriftCacheJar)
static native byte[] NativeLookupSymbol(byte[] thriftSymbolLookup)
static byte[] CacheJar(byte[] thriftParams)
Definition: FeSupport.java:113
static native void NativeFeTestInit()
static TColumnValue EvalConstExpr(Expr expr, TQueryCtx queryCtx)
Definition: FeSupport.java:122
static native byte[] NativePrioritizeLoad(byte[] thriftReq)
static boolean EvalPredicate(Expr pred, TQueryCtx queryCtx)
Definition: FeSupport.java:179
static TStatus PrioritizeLoad(Set< TableName > tableNames)
Definition: FeSupport.java:233
static byte[] LookupSymbol(byte[] thriftParams)
Definition: FeSupport.java:144
static TResultRow EvalPredicateBatch(ArrayList< Expr > exprs, TQueryCtx queryCtx)
Definition: FeSupport.java:199
static byte[] EvalConstExprs(byte[] thriftExprBatch, byte[] thriftQueryContext)
Definition: FeSupport.java:169
static TStartupOptions GetStartupOptions()
Definition: FeSupport.java:263
static synchronized void loadLibrary()
Definition: FeSupport.java:281
static byte[] PrioritizeLoad(byte[] thriftReq)
Definition: FeSupport.java:224
static native byte[] NativeGetStartupOptions()
static native byte[] NativeEvalConstExprs(byte[] thriftExprBatch, byte[] thriftQueryGlobals)