15 package com.cloudera.impala.planner;
 
   17 import java.util.ArrayList;
 
   18 import java.util.HashMap;
 
   19 import java.util.HashSet;
 
   20 import java.util.Iterator;
 
   21 import java.util.List;
 
   22 import java.util.NavigableMap;
 
   23 import java.util.TreeMap;
 
   25 import org.slf4j.Logger;
 
   26 import org.slf4j.LoggerFactory;
 
   50 import com.cloudera.impala.thrift.TExplainLevel;
 
   51 import com.cloudera.impala.thrift.THdfsFileBlock;
 
   52 import com.cloudera.impala.thrift.THdfsFileSplit;
 
   53 import com.cloudera.impala.thrift.THdfsScanNode;
 
   54 import com.cloudera.impala.thrift.TNetworkAddress;
 
   55 import com.cloudera.impala.thrift.TPlanNode;
 
   56 import com.cloudera.impala.thrift.TPlanNodeType;
 
   57 import com.cloudera.impala.thrift.TQueryOptions;
 
   58 import com.cloudera.impala.thrift.TScanRange;
 
   59 import com.cloudera.impala.thrift.TScanRangeLocation;
 
   60 import com.cloudera.impala.thrift.TScanRangeLocations;
 
   61 import com.google.common.base.Objects;
 
   62 import com.google.common.base.Objects.ToStringHelper;
 
   63 import com.google.common.base.Preconditions;
 
   64 import com.google.common.base.Predicates;
 
   65 import com.google.common.collect.Lists;
 
   66 import com.google.common.collect.Sets;
 
   // Class-wide SLF4J logger, obtained from LoggerFactory for HdfsScanNode.
   73   private static final Logger LOG = LoggerFactory.getLogger(HdfsScanNode.class);
 
   // Partitions held by this scan node. Entries are appended with partitions_.add(...)
   // and the list's size is reported as the partition count in the explain output.
   95   private final ArrayList<HdfsPartition> partitions_ = new ArrayList<HdfsPartition>();
 
  107     super(
id, desc, 
"SCAN HDFS");
 
  113     ToStringHelper helper = Objects.toStringHelper(
this);
 
  115       helper.add(
"Partition " + partition.getId() + 
":", partition.toString());
 
  117     return helper.addValue(super.debugString()).toString();
 
  125     ArrayList<Expr> bindingPredicates = analyzer.getBoundPredicates(tupleIds_.get(0));
 
  126     conjuncts_.addAll(bindingPredicates);
 
  131     analyzer.createEquivConjuncts(tupleIds_.get(0), 
conjuncts_);
 
  156     long maxScanRangeLength = analyzer.getQueryCtx().getRequest().getQuery_options()
 
  157         .getMax_scan_range_length();
 
  160       Preconditions.checkState(partition.getId() >= 0);
 
  162         for (THdfsFileBlock thriftBlock: fileDesc.getFileBlocks()) {
 
  163           HdfsPartition.FileBlock block = FileBlock.fromThrift(thriftBlock);
 
  164           List<Integer> replicaHostIdxs = block.getReplicaHostIdxs();
 
  165           if (replicaHostIdxs.size() == 0) {
 
  171           List<TScanRangeLocation> locations = Lists.newArrayList();
 
  172           for (
int i = 0; i < replicaHostIdxs.size(); ++i) {
 
  173             TScanRangeLocation location = 
new TScanRangeLocation();
 
  175             Integer tableHostIdx = replicaHostIdxs.get(i);
 
  176             TNetworkAddress networkAddress =
 
  177                 partition.getTable().getHostIndex().getEntry(tableHostIdx);
 
  178             Preconditions.checkNotNull(networkAddress);
 
  180             Integer globalHostIdx = analyzer.getHostIndex().getIndex(networkAddress);
 
  181             location.setHost_idx(globalHostIdx);
 
  182             location.setVolume_id(block.getDiskId(i));
 
  183             location.setIs_cached(block.isCached(i));
 
  184             locations.add(location);
 
  187           long currentOffset = block.getOffset();
 
  188           long remainingLength = block.getLength();
 
  189           while (remainingLength > 0) {
 
  190             long currentLength = remainingLength;
 
  191             if (maxScanRangeLength > 0 && remainingLength > maxScanRangeLength) {
 
  192               currentLength = maxScanRangeLength;
 
  194             TScanRange scanRange = 
new TScanRange();
 
  195             scanRange.setHdfs_file_split(
new THdfsFileSplit(
 
  196                 fileDesc.getFileName(), currentOffset, currentLength, partition.getId(),
 
  197                 fileDesc.getFileLength(), fileDesc.getFileCompression(),
 
  198                 fileDesc.getModificationTime()));
 
  199             TScanRangeLocations scanRangeLocations = 
new TScanRangeLocations();
 
  200             scanRangeLocations.scan_range = scanRange;
 
  201             scanRangeLocations.locations = locations;
 
  202             scanRanges_.add(scanRangeLocations);
 
  203             remainingLength -= currentLength;
 
  204             currentOffset += currentLength;
 
  218     Preconditions.checkNotNull(expr);
 
  222         expr.foldConstantChildren(analyzer);
 
  224         LOG.error(
"Error evaluating constant expressions in the BE: " + e.getMessage());
 
  227       BinaryPredicate bp = (BinaryPredicate)expr;
 
  228       SlotRef slot = bp.getBoundSlot();
 
  229       if (slot == null) 
return false;
 
  230       Expr bindingExpr = bp.getSlotBinding(slot.getSlotId());
 
  231       if (bindingExpr == null || !bindingExpr.
isLiteral()) 
return false;
 
  235       if (expr.getChild(1) != null) {
 
  241       IsNullPredicate nullPredicate = (IsNullPredicate)expr;
 
  242       return nullPredicate.getBoundSlot() != null;
 
  246         expr.foldConstantChildren(analyzer);
 
  248         LOG.error(
"Error evaluating constant expressions in the BE: " + e.getMessage());
 
  252       SlotRef slot = ((InPredicate)expr).getBoundSlot();
 
  253       if (slot == null) 
return false;
 
  254       for (
int i = 1; i < expr.getChildren().size(); ++i) {
 
  255         if (!(expr.getChild(i).isLiteral())) 
return false;
 
  268     Preconditions.checkNotNull(expr);
 
  270     boolean isSlotOnLeft = 
true;
 
  271     if (expr.getChild(0).isLiteral()) isSlotOnLeft = 
false;
 
  275     SlotRef slot = bp.getBoundSlot();
 
  276     Preconditions.checkNotNull(slot);
 
  277     Expr bindingExpr = bp.getSlotBinding(slot.getSlotId());
 
  278     Preconditions.checkNotNull(bindingExpr);
 
  279     Preconditions.checkState(bindingExpr.isLiteral());
 
  281     if (literal instanceof 
NullLiteral) 
return Sets.newHashSet();
 
  285     int partitionPos = slot.getDesc().getColumn().getPosition();
 
  286     TreeMap<LiteralExpr, HashSet<Long>> partitionValueMap =
 
  287         tbl_.getPartitionValueMap(partitionPos);
 
  288     if (partitionValueMap.isEmpty()) 
return Sets.newHashSet();
 
  290     HashSet<Long> matchingIds = Sets.newHashSet();
 
  295       HashSet<Long> ids = partitionValueMap.get(literal);
 
  296       if (ids != null) matchingIds.addAll(ids);
 
  301       matchingIds.addAll(tbl_.getPartitionIds());
 
  302       HashSet<Long> nullIds = tbl_.getNullPartitionIds(partitionPos);
 
  303       matchingIds.removeAll(nullIds);
 
  304       HashSet<Long> ids = partitionValueMap.get(literal);
 
  305       if (ids != null) matchingIds.removeAll(ids);
 
  310     NavigableMap<LiteralExpr, HashSet<Long>> rangeValueMap = null;
 
  311     LiteralExpr firstKey = partitionValueMap.firstKey();
 
  313     boolean upperInclusive = 
false;
 
  314     boolean lowerInclusive = 
false;
 
  321       if (literal.
compareTo(firstKey) < 0) 
return Sets.newHashSet();
 
  325         upperBoundKey = literal;
 
  327         upperBoundKey = lastKey;
 
  328         upperInclusive = 
true;
 
  330       lowerBoundKey = firstKey;
 
  331       lowerInclusive = 
true;
 
  334       if (literal.
compareTo(lastKey) > 0) 
return Sets.newHashSet();
 
  338         lowerBoundKey = literal;
 
  340         lowerBoundKey = firstKey;
 
  341         lowerInclusive = 
true;
 
  343       upperBoundKey = lastKey;
 
  344       upperInclusive = 
true;
 
  349     rangeValueMap = partitionValueMap.subMap(lowerBoundKey, lowerInclusive,
 
  350         upperBoundKey, upperInclusive);
 
  352     for (HashSet<Long> idSet: rangeValueMap.values()) {
 
  353       if (idSet != null) matchingIds.addAll(idSet);
 
  363     Preconditions.checkNotNull(expr);
 
  364     Preconditions.checkState(expr instanceof 
InPredicate);
 
  366     HashSet<Long> matchingIds = Sets.newHashSet();
 
  367     SlotRef slot = inPredicate.getBoundSlot();
 
  368     Preconditions.checkNotNull(slot);
 
  369     int partitionPos = slot.getDesc().getColumn().getPosition();
 
  370     TreeMap<LiteralExpr, HashSet<Long>> partitionValueMap =
 
  371         tbl_.getPartitionValueMap(partitionPos);
 
  373     if (inPredicate.isNotIn()) {
 
  376       List<Expr> nullLiterals = Lists.newArrayList();
 
  377       inPredicate.collectAll(Predicates.instanceOf(NullLiteral.class), nullLiterals);
 
  378       if (!nullLiterals.isEmpty()) 
return matchingIds;
 
  379       matchingIds.addAll(tbl_.getPartitionIds());
 
  381       HashSet<Long> nullIds = tbl_.getNullPartitionIds(partitionPos);
 
  382       matchingIds.removeAll(nullIds);
 
  385     for (
int i = 1; i < inPredicate.getChildren().size(); ++i) {
 
  387       HashSet<Long> idSet = partitionValueMap.get(literal);
 
  389         if (inPredicate.isNotIn()) {
 
  390           matchingIds.removeAll(idSet);
 
  392           matchingIds.addAll(idSet);
 
  404     Preconditions.checkNotNull(expr);
 
  406     HashSet<Long> matchingIds = Sets.newHashSet();
 
  408     SlotRef slot = nullPredicate.getBoundSlot();
 
  409     Preconditions.checkNotNull(slot);
 
  410     int partitionPos = slot.getDesc().getColumn().getPosition();
 
  411     HashSet<Long> nullPartitionIds = tbl_.getNullPartitionIds(partitionPos);
 
  413     if (nullPredicate.isNotNull()) {
 
  414       matchingIds.addAll(tbl_.getPartitionIds());
 
  415       matchingIds.removeAll(nullPartitionIds);
 
  417       matchingIds.addAll(nullPartitionIds);
 
  430     Preconditions.checkNotNull(expr);
 
  435       CompoundPredicate cp = (CompoundPredicate)expr;
 
  437       Preconditions.checkState(cp.getOp() != CompoundPredicate.Operator.NOT);
 
  438       if (cp.getOp() == CompoundPredicate.Operator.AND) {
 
  440         leftChildIds.retainAll(rightChildIds);
 
  441       } 
else if (cp.getOp() == CompoundPredicate.Operator.OR) {
 
  443         leftChildIds.addAll(rightChildIds);
 
  462     List<SlotId> partitionSlots = Lists.newArrayList();
 
  464       if (slotDesc.getColumn() == null) 
continue;
 
  466         partitionSlots.add(slotDesc.getId());
 
  469     List<HdfsPartitionFilter> partitionFilters = Lists.newArrayList();
 
  471     List<Expr> simpleFilterConjuncts = Lists.newArrayList();
 
  478     Iterator<Expr> it = conjuncts_.iterator();
 
  479     while (it.hasNext()) {
 
  480       Expr conjunct = it.next();
 
  487         Expr clonedConjunct = conjunct.clone();
 
  489           simpleFilterConjuncts.add(Expr.pushNegationToOperands(clonedConjunct));
 
  498     HashSet<Long> matchingPartitionIds = null;
 
  502     for (
Expr filter: simpleFilterConjuncts) {
 
  505       if (matchingPartitionIds == null) {
 
  506         matchingPartitionIds = matchingIds;
 
  508         matchingPartitionIds.retainAll(matchingIds);
 
  513     if (simpleFilterConjuncts.size() == 0) {
 
  514       Preconditions.checkState(matchingPartitionIds == null);
 
  515       matchingPartitionIds = Sets.newHashSet(tbl_.getPartitionIds());
 
  522     HashMap<Long, HdfsPartition> partitionMap = tbl_.getPartitionMap();
 
  523     for (Long 
id: matchingPartitionIds) {
 
  525       Preconditions.checkNotNull(partition);
 
  527         partitions_.add(partition);
 
  528         descTbl.addReferencedPartition(
tbl_, partition.getId());
 
  539     HashMap<Long, HdfsPartition> partitionMap = tbl_.getPartitionMap();
 
  541     HashSet<Long> matchingIds = Sets.newHashSet();
 
  543     ArrayList<HdfsPartition> partitionBatch = Lists.newArrayList();
 
  547       for (Long 
id: matchingPartitionIds) {
 
  549         Preconditions.checkState(
 
  550             p.getPartitionValues().size() == tbl_.getNumClusteringCols());
 
  552         partitionBatch.add(partitionMap.get(id));
 
  555           matchingIds.addAll(filter.getMatchingPartitionIds(partitionBatch, analyzer));
 
  556           partitionBatch.clear();
 
  560       if (!partitionBatch.isEmpty()) {
 
  561         matchingIds.addAll(filter.getMatchingPartitionIds(partitionBatch, analyzer));
 
  562         partitionBatch.clear();
 
  565       matchingPartitionIds.retainAll(matchingIds);
 
  575     super.computeStats(analyzer);
 
  576     LOG.debug(
"collecting partitions for table " + tbl_.getName());
 
  584       boolean hasValidPartitionCardinality = 
false;
 
  588         if (p.getNumRows() > -1) {
 
  590           hasValidPartitionCardinality = 
true;
 
  598       if (!partitions_.isEmpty() && !hasValidPartitionCardinality) {
 
  605     Preconditions.checkState(
cardinality_ >= 0 || cardinality_ == -1,
 
  606         "Internal error: invalid scan node cardinality: " + 
cardinality_);
 
  607     if (cardinality_ > 0) {
 
  608       LOG.debug(
"cardinality_=" + Long.toString(
cardinality_) +
 
  613     LOG.debug(
"computeStats HdfsScan: cardinality_=" + Long.toString(
cardinality_));
 
  618     numNodes_ = (cardinality_ == 0 || tbl_.getNumNodes() == 0) ? 1 : tbl_.getNumNodes();
 
  619     LOG.debug(
"computeStats HdfsScan: #nodes=" + Integer.toString(
numNodes_));
 
  625     msg.hdfs_scan_node = 
new THdfsScanNode(
desc_.
getId().asInt());
 
  626     msg.node_type = TPlanNodeType.HDFS_SCAN_NODE;
 
  632     List<String> 
path = Lists.newArrayList();
 
  633     path.add(table.getDb().getName());
 
  634     path.add(table.getName());
 
  635     Preconditions.checkNotNull(desc_.getPath());
 
  637       return desc_.getPath().toString() + 
" " + desc_.getAlias();
 
  639       return desc_.getPath().toString();
 
  645       TExplainLevel detailLevel) {
 
  646     StringBuilder output = 
new StringBuilder();
 
  650     if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal() &&
 
  654     output.append(
"]\n");
 
  655     if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
 
  656       int numPartitions = partitions_.size();
 
  658       output.append(String.format(
"%spartitions=%s/%s files=%s size=%s", detailPrefix,
 
  659           numPartitions, table.getPartitions().size() - 1, 
totalFiles_,
 
  667     if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
 
  671     return output.toString();
 
  676     Preconditions.checkNotNull(
scanRanges_, 
"Cost estimation requires scan ranges.");
 
  677     if (scanRanges_.isEmpty()) {
 
  687     } 
else if (scanRanges_.size() < 
numNodes_) {
 
  690       adjNumNodes = scanRanges_.size();
 
  693     Preconditions.checkNotNull(
desc_);
 
  694     Preconditions.checkNotNull(desc_.getTable() instanceof 
HdfsTable);
 
  696     int perHostScanRanges;
 
  700       perHostScanRanges = 0;
 
  702         if (slot.getColumn() == null ||
 
  703             slot.getColumn().getPosition() >= table.getNumClusteringCols()) {
 
  708       perHostScanRanges = (int) Math.ceil((
 
  715     int maxScannerThreads = Math.min(perHostScanRanges,
 
  718     if (queryOptions.isSetNum_scanner_threads() &&
 
  719         queryOptions.getNum_scanner_threads() > 0) {
 
  721           Math.min(maxScannerThreads, queryOptions.getNum_scanner_threads());
 
  724     long avgScanRangeBytes = (long) Math.ceil(
totalBytes_ / (
double) scanRanges_.size());
 
  727     long perThreadIoBuffers =
 
  734     if (perHostMemCost_ > perHostUpperBound) {
 
  735       LOG.warn(String.format(
"Per-host mem cost %s exceeded per-host upper bound %s.",
 
  738       perHostMemCost_ = perHostUpperBound;
 
static final long IO_MGR_BUFFER_SIZE
 
string path("/usr/lib/sasl2:/usr/lib64/sasl2:/usr/local/lib/sasl2:/usr/lib/x86_64-linux-gnu/sasl2")
 
List< HdfsPartition.FileDescriptor > getFileDescriptors()
 
int compareTo(LiteralExpr other)
 
HashSet< Long > evalInPredicate(Expr expr)
 
void assignConjuncts(Analyzer analyzer)
 
HdfsFileFormat getMajorityFormat()
 
ArrayList< TupleId > tupleIds_
 
String getStatsExplainString(String prefix, TExplainLevel detailLevel)
 
void computeStats(Analyzer analyzer)
 
String getExplainString()
 
void computeScanRangeLocations(Analyzer analyzer)
 
static final int PARTITION_PRUNING_BATCH_SIZE
 
static final double SCAN_RANGE_SKEW_FACTOR
 
List< HdfsPartition > getPartitions()
 
int numPartitionsMissingStats_
 
static String printBytes(long bytes)
 
boolean hasFileDescriptors()
 
static final long MAX_IO_BUFFERS_PER_THREAD
 
void markSlotsMaterialized(Analyzer analyzer, List< Expr > exprs)
 
final TupleDescriptor desc_
 
void prunePartitions(Analyzer analyzer)
 
HashSet< Long > evalIsNullPredicate(Expr expr)
 
ArrayList< SlotDescriptor > getSlots()
 
String getNodeExplainString(String prefix, String detailPrefix, TExplainLevel detailLevel)
 
TupleDescriptor getTupleDesc(TupleId id)
 
boolean hasExplicitAlias()
 
void computeCosts(TQueryOptions queryOptions)
 
static final int THREADS_PER_CORE
 
HashSet< Long > evalBinaryPredicate(Expr expr)
 
boolean canEvalUsingPartitionMd(Expr expr, Analyzer analyzer)
 
void computeMemLayout(Analyzer analyzer)
 
String getDisplayLabelDetail()
 
HdfsScanNode(PlanNodeId id, TupleDescriptor desc, HdfsTable tbl)
 
void evalPartitionFiltersInBe(List< HdfsPartitionFilter > filters, HashSet< Long > matchingPartitionIds, Analyzer analyzer)
 
long capAtLimit(long cardinality)
 
static long getPerHostMemUpperBound()
 
HashSet< Long > evalSlotBindingFilter(Expr expr)
 
void init(Analyzer analyzer)
 
void toThrift(TPlanNode msg)
 
double computeSelectivity()
 
int getNumClusteringCols()
 
List< TScanRangeLocations > scanRanges_
 
boolean isBoundBySlotIds(List< SlotId > slotIds)
 
final ArrayList< HdfsPartition > partitions_
 
final String getDisplayLabel()
 
Set< ExprId > assignedConjuncts_
 
static long addCardinalities(long a, long b)