Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DataSourceScanNode.java
Go to the documentation of this file.
1 // Copyright 2014 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.planner;
16 
17 import java.util.List;
18 
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21 
36 import com.cloudera.impala.extdatasource.thrift.TBinaryPredicate;
37 import com.cloudera.impala.extdatasource.thrift.TColumnDesc;
38 import com.cloudera.impala.extdatasource.thrift.TComparisonOp;
39 import com.cloudera.impala.extdatasource.thrift.TPrepareParams;
40 import com.cloudera.impala.extdatasource.thrift.TPrepareResult;
42 import com.cloudera.impala.thrift.TCacheJarResult;
43 import com.cloudera.impala.thrift.TColumnValue;
44 import com.cloudera.impala.thrift.TDataSourceScanNode;
45 import com.cloudera.impala.thrift.TErrorCode;
46 import com.cloudera.impala.thrift.TExplainLevel;
47 import com.cloudera.impala.thrift.TNetworkAddress;
48 import com.cloudera.impala.thrift.TPlanNode;
49 import com.cloudera.impala.thrift.TPlanNodeType;
50 import com.cloudera.impala.thrift.TQueryOptions;
51 import com.cloudera.impala.thrift.TScanRange;
52 import com.cloudera.impala.thrift.TScanRangeLocation;
53 import com.cloudera.impala.thrift.TScanRangeLocations;
54 import com.cloudera.impala.thrift.TStatus;
55 import com.google.common.base.Joiner;
56 import com.google.common.base.Objects;
57 import com.google.common.base.Preconditions;
58 import com.google.common.collect.ImmutableList;
59 import com.google.common.collect.Lists;
60 
64 public class DataSourceScanNode extends ScanNode {
65  private final static Logger LOG = LoggerFactory.getLogger(DataSourceScanNode.class);
66  private final TupleDescriptor desc_;
67  private final DataSourceTable table_;
68 
69  // The converted conjuncts_ that were accepted by the data source. A conjunct can
70  // be converted if it contains only disjunctive predicates of the form
71  // <slotref> <op> <constant>.
72  private List<List<TBinaryPredicate>> acceptedPredicates_;
73 
74  // The conjuncts that were accepted by the data source and removed from conjuncts_ in
75  // removeAcceptedConjuncts(). Only used in getNodeExplainString() to print the
76  // conjuncts applied by the data source.
77  private List<Expr> acceptedConjuncts_;
78 
79  // The number of rows estimate as returned by prepare().
80  private long numRowsEstimate_;
81 
83  super(id, desc, "SCAN DATA SOURCE");
84  desc_ = desc;
86  acceptedPredicates_ = null;
87  acceptedConjuncts_ = null;
88  }
89 
90  @Override
91  public void init(Analyzer analyzer) throws InternalException {
92  assignConjuncts(analyzer);
93  analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_);
95  computeStats(analyzer);
96  // materialize slots in remaining conjuncts_
97  analyzer.materializeSlots(conjuncts_);
98  computeMemLayout(analyzer);
99  computeScanRangeLocations(analyzer);
100  }
101 
106  public static TColumnValue literalToColumnValue(LiteralExpr expr) {
107  switch (expr.getType().getPrimitiveType()) {
108  case BOOLEAN:
109  return new TColumnValue().setBool_val(((BoolLiteral) expr).getValue());
110  case TINYINT:
111  return new TColumnValue().setByte_val(
112  (byte) ((NumericLiteral) expr).getLongValue());
113  case SMALLINT:
114  return new TColumnValue().setShort_val(
115  (short) ((NumericLiteral) expr).getLongValue());
116  case INT:
117  return new TColumnValue().setInt_val(
118  (int) ((NumericLiteral) expr).getLongValue());
119  case BIGINT:
120  return new TColumnValue().setLong_val(((NumericLiteral) expr).getLongValue());
121  case FLOAT:
122  case DOUBLE:
123  return new TColumnValue().setDouble_val(
124  ((NumericLiteral) expr).getDoubleValue());
125  case STRING:
126  return new TColumnValue().setString_val(((StringLiteral) expr).getValue());
127  case DECIMAL:
128  case DATE:
129  case DATETIME:
130  case TIMESTAMP:
131  // TODO: we support DECIMAL and TIMESTAMP but no way to specify it in SQL.
132  return null;
133  default:
134  Preconditions.checkState(false);
135  return null;
136  }
137  }
138 
144  private void prepareDataSource() throws InternalException {
145  // Binary predicates that will be offered to the data source.
146  List<List<TBinaryPredicate>> offeredPredicates = Lists.newArrayList();
147  // The index into conjuncts_ for each element in offeredPredicates.
148  List<Integer> conjunctsIdx = Lists.newArrayList();
149  for (int i = 0; i < conjuncts_.size(); ++i) {
150  Expr conjunct = conjuncts_.get(i);
151  List<TBinaryPredicate> disjuncts = getDisjuncts(conjunct);
152  if (disjuncts != null) {
153  offeredPredicates.add(disjuncts);
154  conjunctsIdx.add(i);
155  }
156  }
157 
158  String hdfsLocation = table_.getDataSource().getHdfs_location();
159  TCacheJarResult cacheResult = FeSupport.CacheJar(hdfsLocation);
160  TStatus cacheJarStatus = cacheResult.getStatus();
161  if (cacheJarStatus.getStatus_code() != TErrorCode.OK) {
162  throw new InternalException(String.format(
163  "Unable to cache data source library at location '%s'. Check that the file " +
164  "exists and is readable. Message: %s",
165  hdfsLocation, Joiner.on("\n").join(cacheJarStatus.getError_msgs())));
166  }
167  String localPath = cacheResult.getLocal_path();
168  String className = table_.getDataSource().getClass_name();
169  String apiVersion = table_.getDataSource().getApi_version();
170  TPrepareResult prepareResult;
171  TStatus prepareStatus;
172  try {
174  localPath, className, apiVersion);
175  TPrepareParams prepareParams = new TPrepareParams();
176  prepareParams.setInit_string(table_.getInitString());
177  prepareParams.setPredicates(offeredPredicates);
178  // TODO: Include DB (i.e. getFullName())?
179  prepareParams.setTable_name(table_.getName());
180  prepareResult = executor.prepare(prepareParams);
181  prepareStatus = prepareResult.getStatus();
182  } catch (Exception e) {
183  throw new InternalException(String.format(
184  "Error calling prepare() on data source %s",
186  }
187  if (prepareStatus.getStatus_code() != TErrorCode.OK) {
188  throw new InternalException(String.format(
189  "Data source %s returned an error from prepare(): %s",
191  Joiner.on("\n").join(prepareStatus.getError_msgs())));
192  }
193 
194  numRowsEstimate_ = prepareResult.getNum_rows_estimate();
195  acceptedPredicates_ = Lists.newArrayList();
196  List<Integer> acceptedPredicatesIdx = prepareResult.isSetAccepted_conjuncts() ?
197  prepareResult.getAccepted_conjuncts() : ImmutableList.<Integer>of();
198  for (Integer acceptedIdx: acceptedPredicatesIdx) {
199  acceptedPredicates_.add(offeredPredicates.get(acceptedIdx));
200  }
201  removeAcceptedConjuncts(acceptedPredicatesIdx, conjunctsIdx);
202  }
203 
210  private List<TBinaryPredicate> getDisjuncts(Expr conjunct) {
211  List<TBinaryPredicate> disjuncts = Lists.newArrayList();
212  if (getDisjunctsHelper(conjunct, disjuncts)) return disjuncts;
213  return null;
214  }
215 
216  // Recursive helper method for getDisjuncts().
217  private boolean getDisjunctsHelper(Expr conjunct,
218  List<TBinaryPredicate> predicates) {
219  if (conjunct instanceof BinaryPredicate) {
220  if (conjunct.getChildren().size() != 2) return false;
221  SlotRef slotRef = null;
222  LiteralExpr literalExpr = null;
223  TComparisonOp op = null;
224  if ((conjunct.getChild(0).unwrapSlotRef(true) instanceof SlotRef) &&
225  (conjunct.getChild(1) instanceof LiteralExpr)) {
226  slotRef = (SlotRef) conjunct.getChild(0).unwrapSlotRef(true);
227  literalExpr = (LiteralExpr) conjunct.getChild(1);
228  op = ((BinaryPredicate) conjunct).getOp().getThriftOp();
229  } else if ((conjunct.getChild(1).unwrapSlotRef(true) instanceof SlotRef) &&
230  (conjunct.getChild(0) instanceof LiteralExpr)) {
231  slotRef = (SlotRef) conjunct.getChild(1).unwrapSlotRef(true);
232  literalExpr = (LiteralExpr) conjunct.getChild(0);
233  op = ((BinaryPredicate) conjunct).getOp().converse().getThriftOp();
234  } else {
235  return false;
236  }
237 
238  TColumnValue val = literalToColumnValue(literalExpr);
239  if (val == null) return false; // false if unsupported type, e.g.
240 
241  String colName = Joiner.on(".").join(slotRef.getResolvedPath().getRawPath());
242  TColumnDesc col = new TColumnDesc().setName(colName).setType(
243  slotRef.getType().toThrift());
244  predicates.add(new TBinaryPredicate().setCol(col).setOp(op).setValue(val));
245  return true;
246  } else if (conjunct instanceof CompoundPredicate) {
247  CompoundPredicate compoundPredicate = ((CompoundPredicate) conjunct);
248  if (compoundPredicate.getOp() != CompoundPredicate.Operator.OR) return false;
249  if (!getDisjunctsHelper(conjunct.getChild(0), predicates)) return false;
250  if (!getDisjunctsHelper(conjunct.getChild(1), predicates)) return false;
251  return true;
252  } else {
253  return false;
254  }
255  }
256 
257  @Override
258  public void computeStats(Analyzer analyzer) {
259  super.computeStats(analyzer);
263  cardinality_ = Math.max(0, cardinality_);
264  cardinality_ = capAtLimit(cardinality_);
265 
266  LOG.debug("computeStats DataSourceScan: cardinality=" + Long.toString(cardinality_));
267 
268  numNodes_ = desc_.getTable().getNumNodes();
269  LOG.debug("computeStats DataSourceScan: #nodes=" + Integer.toString(numNodes_));
270  }
271 
272  @Override
273  protected String debugString() {
274  return Objects.toStringHelper(this)
275  .add("tid", desc_.getId().asInt())
276  .add("tblName", table_.getFullName())
277  .add("dataSource", DataSource.debugString(table_.getDataSource()))
278  .add("initString", table_.getInitString())
279  .addValue(super.debugString())
280  .toString();
281  }
282 
287  private void removeAcceptedConjuncts(List<Integer> acceptedPredicatesIdx,
288  List<Integer> conjunctsIdx) {
289  acceptedConjuncts_ = Lists.newArrayList();
290  // Because conjuncts_ is modified in place using positional indexes from
291  // conjunctsIdx, we remove the accepted predicates in reverse order.
292  for (int i = acceptedPredicatesIdx.size() - 1; i >= 0; --i) {
293  int acceptedPredIdx = acceptedPredicatesIdx.get(i);
294  int conjunctIdx = conjunctsIdx.get(acceptedPredIdx);
295  acceptedConjuncts_.add(conjuncts_.remove(conjunctIdx));
296  }
297  // Returns a view of the list in the original order as we will print these
298  // in the explain string and it's convenient to have predicates printed
299  // in the same order that they're specified.
300  acceptedConjuncts_ = Lists.reverse(acceptedConjuncts_);
301  }
302 
303  @Override
304  protected void toThrift(TPlanNode msg) {
305  Preconditions.checkNotNull(acceptedPredicates_);
306  msg.node_type = TPlanNodeType.DATA_SOURCE_NODE;
307  msg.data_source_node = new TDataSourceScanNode(desc_.getId().asInt(),
308  table_.getDataSource(), table_.getInitString(), acceptedPredicates_);
309  }
310 
314  private void computeScanRangeLocations(Analyzer analyzer) {
315  // TODO: Does the port matter?
316  TNetworkAddress networkAddress = addressToTNetworkAddress("localhost:12345");
317  Integer hostIndex = analyzer.getHostIndex().getIndex(networkAddress);
318  scanRanges_ = Lists.newArrayList(
319  new TScanRangeLocations(
320  new TScanRange(), Lists.newArrayList(new TScanRangeLocation(hostIndex))));
321  }
322 
323  @Override
324  public void computeCosts(TQueryOptions queryOptions) {
325  // TODO: What's a good estimate of memory consumption?
326  perHostMemCost_ = 1024L * 1024L * 1024L;
327  }
328 
333  public static long getPerHostMemUpperBound() {
334  // TODO: What's a good estimate of memory consumption?
335  return 1024L * 1024L * 1024L;
336  }
337 
338  @Override
339  protected String getNodeExplainString(String prefix, String detailPrefix,
340  TExplainLevel detailLevel) {
341  StringBuilder output = new StringBuilder();
342  String aliasStr = "";
343  if (!table_.getFullName().equalsIgnoreCase(desc_.getAlias()) &&
344  !table_.getName().equalsIgnoreCase(desc_.getAlias())) {
345  aliasStr = " " + desc_.getAlias();
346  }
347  output.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(),
348  displayName_, table_.getFullName(), aliasStr));
349 
350  if (!acceptedConjuncts_.isEmpty()) {
351  output.append(prefix + "data source predicates: " +
353  }
354  if (!conjuncts_.isEmpty()) {
355  output.append(prefix + "predicates: " + getExplainString(conjuncts_) + "\n");
356  }
357 
358  // Add table and column stats in verbose mode.
359  if (detailLevel == TExplainLevel.VERBOSE) {
360  output.append(getStatsExplainString(prefix, detailLevel));
361  output.append("\n");
362  }
363  return output.toString();
364  }
365 }
static TNetworkAddress addressToTNetworkAddress(String address)
Definition: ScanNode.java:132
List< TBinaryPredicate > getDisjuncts(Expr conjunct)
void assignConjuncts(Analyzer analyzer)
Definition: PlanNode.java:401
String getStatsExplainString(String prefix, TExplainLevel detailLevel)
Definition: ScanNode.java:75
void removeAcceptedConjuncts(List< Integer > acceptedPredicatesIdx, List< Integer > conjunctsIdx)
String getNodeExplainString(String prefix, String detailPrefix, TExplainLevel detailLevel)
static TColumnValue literalToColumnValue(LiteralExpr expr)
boolean getDisjunctsHelper(Expr conjunct, List< TBinaryPredicate > predicates)
List< List< TBinaryPredicate > > acceptedPredicates_
void computeMemLayout(Analyzer analyzer)
Definition: PlanNode.java:475
long capAtLimit(long cardinality)
Definition: PlanNode.java:450
List< TScanRangeLocations > scanRanges_
Definition: ScanNode.java:42
DataSourceScanNode(PlanNodeId id, TupleDescriptor desc)