15 package com.cloudera.impala.planner;
17 import java.util.List;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
36 import com.cloudera.impala.extdatasource.thrift.TBinaryPredicate;
37 import com.cloudera.impala.extdatasource.thrift.TColumnDesc;
38 import com.cloudera.impala.extdatasource.thrift.TComparisonOp;
39 import com.cloudera.impala.extdatasource.thrift.TPrepareParams;
40 import com.cloudera.impala.extdatasource.thrift.TPrepareResult;
42 import com.cloudera.impala.thrift.TCacheJarResult;
43 import com.cloudera.impala.thrift.TColumnValue;
44 import com.cloudera.impala.thrift.TDataSourceScanNode;
45 import com.cloudera.impala.thrift.TErrorCode;
46 import com.cloudera.impala.thrift.TExplainLevel;
47 import com.cloudera.impala.thrift.TNetworkAddress;
48 import com.cloudera.impala.thrift.TPlanNode;
49 import com.cloudera.impala.thrift.TPlanNodeType;
50 import com.cloudera.impala.thrift.TQueryOptions;
51 import com.cloudera.impala.thrift.TScanRange;
52 import com.cloudera.impala.thrift.TScanRangeLocation;
53 import com.cloudera.impala.thrift.TScanRangeLocations;
54 import com.cloudera.impala.thrift.TStatus;
55 import com.google.common.base.Joiner;
56 import com.google.common.base.Objects;
57 import com.google.common.base.Preconditions;
58 import com.google.common.collect.ImmutableList;
59 import com.google.common.collect.Lists;
65 private final static Logger
LOG = LoggerFactory.getLogger(DataSourceScanNode.class);
83 super(
id, desc,
"SCAN DATA SOURCE");
93 analyzer.createEquivConjuncts(tupleIds_.get(0),
conjuncts_);
107 switch (expr.
getType().getPrimitiveType()) {
109 return new TColumnValue().setBool_val(((
BoolLiteral) expr).getValue());
111 return new TColumnValue().setByte_val(
114 return new TColumnValue().setShort_val(
117 return new TColumnValue().setInt_val(
120 return new TColumnValue().setLong_val(((
NumericLiteral) expr).getLongValue());
123 return new TColumnValue().setDouble_val(
126 return new TColumnValue().setString_val(((
StringLiteral) expr).getValue());
134 Preconditions.checkState(
false);
146 List<List<TBinaryPredicate>> offeredPredicates = Lists.newArrayList();
148 List<Integer> conjunctsIdx = Lists.newArrayList();
149 for (
int i = 0; i < conjuncts_.size(); ++i) {
150 Expr conjunct = conjuncts_.get(i);
151 List<TBinaryPredicate> disjuncts =
getDisjuncts(conjunct);
152 if (disjuncts != null) {
153 offeredPredicates.add(disjuncts);
158 String hdfsLocation = table_.getDataSource().getHdfs_location();
159 TCacheJarResult cacheResult = FeSupport.CacheJar(hdfsLocation);
160 TStatus cacheJarStatus = cacheResult.getStatus();
162 throw new InternalException(String.format(
163 "Unable to cache data source library at location '%s'. Check that the file " +
164 "exists and is readable. Message: %s",
165 hdfsLocation, Joiner.on(
"\n").join(cacheJarStatus.getError_msgs())));
167 String localPath = cacheResult.getLocal_path();
168 String className = table_.getDataSource().getClass_name();
169 String apiVersion = table_.getDataSource().getApi_version();
170 TPrepareResult prepareResult;
171 TStatus prepareStatus;
174 localPath, className, apiVersion);
175 TPrepareParams prepareParams =
new TPrepareParams();
176 prepareParams.setInit_string(table_.getInitString());
177 prepareParams.setPredicates(offeredPredicates);
179 prepareParams.setTable_name(table_.getName());
180 prepareResult = executor.prepare(prepareParams);
181 prepareStatus = prepareResult.getStatus();
182 }
catch (Exception e) {
183 throw new InternalException(String.format(
184 "Error calling prepare() on data source %s",
188 throw new InternalException(String.format(
189 "Data source %s returned an error from prepare(): %s",
191 Joiner.on(
"\n").join(prepareStatus.getError_msgs())));
196 List<Integer> acceptedPredicatesIdx = prepareResult.isSetAccepted_conjuncts() ?
197 prepareResult.getAccepted_conjuncts() : ImmutableList.<Integer>of();
198 for (Integer acceptedIdx: acceptedPredicatesIdx) {
199 acceptedPredicates_.add(offeredPredicates.get(acceptedIdx));
211 List<TBinaryPredicate> disjuncts = Lists.newArrayList();
218 List<TBinaryPredicate> predicates) {
220 if (conjunct.getChildren().size() != 2)
return false;
223 TComparisonOp op = null;
224 if ((conjunct.getChild(0).unwrapSlotRef(
true) instanceof
SlotRef) &&
226 slotRef = (SlotRef) conjunct.getChild(0).unwrapSlotRef(
true);
227 literalExpr = (LiteralExpr) conjunct.getChild(1);
228 op = ((BinaryPredicate) conjunct).getOp().getThriftOp();
229 }
else if ((conjunct.getChild(1).unwrapSlotRef(
true) instanceof SlotRef) &&
230 (conjunct.getChild(0) instanceof LiteralExpr)) {
231 slotRef = (SlotRef) conjunct.getChild(1).unwrapSlotRef(
true);
232 literalExpr = (LiteralExpr) conjunct.getChild(0);
233 op = ((BinaryPredicate) conjunct).getOp().converse().getThriftOp();
239 if (val == null)
return false;
241 String colName = Joiner.on(
".").join(slotRef.
getResolvedPath().getRawPath());
242 TColumnDesc col =
new TColumnDesc().setName(colName).setType(
244 predicates.add(
new TBinaryPredicate().setCol(col).setOp(op).setValue(val));
247 CompoundPredicate compoundPredicate = ((CompoundPredicate) conjunct);
248 if (compoundPredicate.
getOp() != CompoundPredicate.Operator.OR)
return false;
259 super.computeStats(analyzer);
266 LOG.debug(
"computeStats DataSourceScan: cardinality=" + Long.toString(
cardinality_));
269 LOG.debug(
"computeStats DataSourceScan: #nodes=" + Integer.toString(
numNodes_));
274 return Objects.toStringHelper(
this)
276 .add(
"tblName", table_.getFullName())
277 .add(
"dataSource", DataSource.debugString(table_.getDataSource()))
279 .addValue(super.debugString())
288 List<Integer> conjunctsIdx) {
292 for (
int i = acceptedPredicatesIdx.size() - 1; i >= 0; --i) {
293 int acceptedPredIdx = acceptedPredicatesIdx.get(i);
294 int conjunctIdx = conjunctsIdx.get(acceptedPredIdx);
295 acceptedConjuncts_.add(conjuncts_.remove(conjunctIdx));
306 msg.node_type = TPlanNodeType.DATA_SOURCE_NODE;
307 msg.data_source_node =
new TDataSourceScanNode(
desc_.
getId().asInt(),
317 Integer hostIndex = analyzer.getHostIndex().getIndex(networkAddress);
319 new TScanRangeLocations(
320 new TScanRange(), Lists.newArrayList(
new TScanRangeLocation(hostIndex))));
335 return 1024L * 1024L * 1024L;
340 TExplainLevel detailLevel) {
341 StringBuilder output =
new StringBuilder();
342 String aliasStr =
"";
345 aliasStr =
" " + desc_.getAlias();
347 output.append(String.format(
"%s%s:%s [%s%s]\n", prefix, id_.toString(),
351 output.append(prefix +
"data source predicates: " +
359 if (detailLevel == TExplainLevel.VERBOSE) {
363 return output.toString();
static TNetworkAddress addressToTNetworkAddress(String address)
List< TBinaryPredicate > getDisjuncts(Expr conjunct)
void assignConjuncts(Analyzer analyzer)
TDataSource getDataSource()
void init(Analyzer analyzer)
String getStatsExplainString(String prefix, TExplainLevel detailLevel)
final DataSourceTable table_
List< Expr > acceptedConjuncts_
String getExplainString()
void computeCosts(TQueryOptions queryOptions)
void computeStats(Analyzer analyzer)
void removeAcceptedConjuncts(List< Integer > acceptedPredicatesIdx, List< Integer > conjunctsIdx)
String getNodeExplainString(String prefix, String detailPrefix, TExplainLevel detailLevel)
final TupleDescriptor desc_
void computeScanRangeLocations(Analyzer analyzer)
static TColumnValue literalToColumnValue(LiteralExpr expr)
boolean getDisjunctsHelper(Expr conjunct, List< TBinaryPredicate > predicates)
List< List< TBinaryPredicate > > acceptedPredicates_
void computeMemLayout(Analyzer analyzer)
long capAtLimit(long cardinality)
static long getPerHostMemUpperBound()
double computeSelectivity()
List< TScanRangeLocations > scanRanges_
DataSourceScanNode(PlanNodeId id, TupleDescriptor desc)
void toThrift(TPlanNode msg)