15 package com.cloudera.impala.planner;
17 import java.util.ArrayList;
18 import java.util.HashMap;
19 import java.util.HashSet;
20 import java.util.Iterator;
21 import java.util.List;
22 import java.util.NavigableMap;
23 import java.util.TreeMap;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
50 import com.cloudera.impala.thrift.TExplainLevel;
51 import com.cloudera.impala.thrift.THdfsFileBlock;
52 import com.cloudera.impala.thrift.THdfsFileSplit;
53 import com.cloudera.impala.thrift.THdfsScanNode;
54 import com.cloudera.impala.thrift.TNetworkAddress;
55 import com.cloudera.impala.thrift.TPlanNode;
56 import com.cloudera.impala.thrift.TPlanNodeType;
57 import com.cloudera.impala.thrift.TQueryOptions;
58 import com.cloudera.impala.thrift.TScanRange;
59 import com.cloudera.impala.thrift.TScanRangeLocation;
60 import com.cloudera.impala.thrift.TScanRangeLocations;
61 import com.google.common.base.Objects;
62 import com.google.common.base.Objects.ToStringHelper;
63 import com.google.common.base.Preconditions;
64 import com.google.common.base.Predicates;
65 import com.google.common.collect.Lists;
66 import com.google.common.collect.Sets;
73 private final static Logger
LOG = LoggerFactory.getLogger(HdfsScanNode.class);
95 private final ArrayList<HdfsPartition>
partitions_ = Lists.newArrayList();
107 super(
id, desc,
"SCAN HDFS");
113 ToStringHelper helper = Objects.toStringHelper(
this);
115 helper.add(
"Partition " + partition.getId() +
":", partition.toString());
117 return helper.addValue(super.debugString()).toString();
125 ArrayList<Expr> bindingPredicates = analyzer.getBoundPredicates(tupleIds_.get(0));
126 conjuncts_.addAll(bindingPredicates);
131 analyzer.createEquivConjuncts(tupleIds_.get(0),
conjuncts_);
156 long maxScanRangeLength = analyzer.getQueryCtx().getRequest().getQuery_options()
157 .getMax_scan_range_length();
160 Preconditions.checkState(partition.getId() >= 0);
162 for (THdfsFileBlock thriftBlock: fileDesc.getFileBlocks()) {
163 HdfsPartition.FileBlock block = FileBlock.fromThrift(thriftBlock);
164 List<Integer> replicaHostIdxs = block.getReplicaHostIdxs();
165 if (replicaHostIdxs.size() == 0) {
171 List<TScanRangeLocation> locations = Lists.newArrayList();
172 for (
int i = 0; i < replicaHostIdxs.size(); ++i) {
173 TScanRangeLocation location =
new TScanRangeLocation();
175 Integer tableHostIdx = replicaHostIdxs.get(i);
176 TNetworkAddress networkAddress =
177 partition.getTable().getHostIndex().getEntry(tableHostIdx);
178 Preconditions.checkNotNull(networkAddress);
180 Integer globalHostIdx = analyzer.getHostIndex().getIndex(networkAddress);
181 location.setHost_idx(globalHostIdx);
182 location.setVolume_id(block.getDiskId(i));
183 location.setIs_cached(block.isCached(i));
184 locations.add(location);
187 long currentOffset = block.getOffset();
188 long remainingLength = block.getLength();
189 while (remainingLength > 0) {
190 long currentLength = remainingLength;
191 if (maxScanRangeLength > 0 && remainingLength > maxScanRangeLength) {
192 currentLength = maxScanRangeLength;
194 TScanRange scanRange =
new TScanRange();
195 scanRange.setHdfs_file_split(
new THdfsFileSplit(
196 fileDesc.getFileName(), currentOffset, currentLength, partition.getId(),
197 fileDesc.getFileLength(), fileDesc.getFileCompression(),
198 fileDesc.getModificationTime()));
199 TScanRangeLocations scanRangeLocations =
new TScanRangeLocations();
200 scanRangeLocations.scan_range = scanRange;
201 scanRangeLocations.locations = locations;
202 scanRanges_.add(scanRangeLocations);
203 remainingLength -= currentLength;
204 currentOffset += currentLength;
218 Preconditions.checkNotNull(expr);
222 expr.foldConstantChildren(analyzer);
224 LOG.error(
"Error evaluating constant expressions in the BE: " + e.getMessage());
227 BinaryPredicate bp = (BinaryPredicate)expr;
228 SlotRef slot = bp.getBoundSlot();
229 if (slot == null)
return false;
230 Expr bindingExpr = bp.getSlotBinding(slot.getSlotId());
231 if (bindingExpr == null || !bindingExpr.
isLiteral())
return false;
235 if (expr.getChild(1) != null) {
241 IsNullPredicate nullPredicate = (IsNullPredicate)expr;
242 return nullPredicate.getBoundSlot() != null;
246 expr.foldConstantChildren(analyzer);
248 LOG.error(
"Error evaluating constant expressions in the BE: " + e.getMessage());
252 SlotRef slot = ((InPredicate)expr).getBoundSlot();
253 if (slot == null)
return false;
254 for (
int i = 1; i < expr.getChildren().size(); ++i) {
255 if (!(expr.getChild(i).isLiteral()))
return false;
268 Preconditions.checkNotNull(expr);
270 boolean isSlotOnLeft =
true;
271 if (expr.getChild(0).isLiteral()) isSlotOnLeft =
false;
275 SlotRef slot = bp.getBoundSlot();
276 Preconditions.checkNotNull(slot);
277 Expr bindingExpr = bp.getSlotBinding(slot.getSlotId());
278 Preconditions.checkNotNull(bindingExpr);
279 Preconditions.checkState(bindingExpr.isLiteral());
281 if (literal instanceof
NullLiteral)
return Sets.newHashSet();
285 int partitionPos = slot.getDesc().getColumn().getPosition();
286 TreeMap<LiteralExpr, HashSet<Long>> partitionValueMap =
287 tbl_.getPartitionValueMap(partitionPos);
288 if (partitionValueMap.isEmpty())
return Sets.newHashSet();
290 HashSet<Long> matchingIds = Sets.newHashSet();
295 HashSet<Long> ids = partitionValueMap.get(literal);
296 if (ids != null) matchingIds.addAll(ids);
301 matchingIds.addAll(tbl_.getPartitionIds());
302 HashSet<Long> nullIds = tbl_.getNullPartitionIds(partitionPos);
303 matchingIds.removeAll(nullIds);
304 HashSet<Long> ids = partitionValueMap.get(literal);
305 if (ids != null) matchingIds.removeAll(ids);
310 NavigableMap<LiteralExpr, HashSet<Long>> rangeValueMap = null;
311 LiteralExpr firstKey = partitionValueMap.firstKey();
313 boolean upperInclusive =
false;
314 boolean lowerInclusive =
false;
321 if (literal.
compareTo(firstKey) < 0)
return Sets.newHashSet();
325 upperBoundKey = literal;
327 upperBoundKey = lastKey;
328 upperInclusive =
true;
330 lowerBoundKey = firstKey;
331 lowerInclusive =
true;
334 if (literal.
compareTo(lastKey) > 0)
return Sets.newHashSet();
338 lowerBoundKey = literal;
340 lowerBoundKey = firstKey;
341 lowerInclusive =
true;
343 upperBoundKey = lastKey;
344 upperInclusive =
true;
349 rangeValueMap = partitionValueMap.subMap(lowerBoundKey, lowerInclusive,
350 upperBoundKey, upperInclusive);
352 for (HashSet<Long> idSet: rangeValueMap.values()) {
353 if (idSet != null) matchingIds.addAll(idSet);
363 Preconditions.checkNotNull(expr);
364 Preconditions.checkState(expr instanceof
InPredicate);
366 HashSet<Long> matchingIds = Sets.newHashSet();
367 SlotRef slot = inPredicate.getBoundSlot();
368 Preconditions.checkNotNull(slot);
369 int partitionPos = slot.getDesc().getColumn().getPosition();
370 TreeMap<LiteralExpr, HashSet<Long>> partitionValueMap =
371 tbl_.getPartitionValueMap(partitionPos);
373 if (inPredicate.isNotIn()) {
376 List<Expr> nullLiterals = Lists.newArrayList();
377 inPredicate.collectAll(Predicates.instanceOf(NullLiteral.class), nullLiterals);
378 if (!nullLiterals.isEmpty())
return matchingIds;
379 matchingIds.addAll(tbl_.getPartitionIds());
381 HashSet<Long> nullIds = tbl_.getNullPartitionIds(partitionPos);
382 matchingIds.removeAll(nullIds);
385 for (
int i = 1; i < inPredicate.getChildren().size(); ++i) {
387 HashSet<Long> idSet = partitionValueMap.get(literal);
389 if (inPredicate.isNotIn()) {
390 matchingIds.removeAll(idSet);
392 matchingIds.addAll(idSet);
404 Preconditions.checkNotNull(expr);
406 HashSet<Long> matchingIds = Sets.newHashSet();
408 SlotRef slot = nullPredicate.getBoundSlot();
409 Preconditions.checkNotNull(slot);
410 int partitionPos = slot.getDesc().getColumn().getPosition();
411 HashSet<Long> nullPartitionIds = tbl_.getNullPartitionIds(partitionPos);
413 if (nullPredicate.isNotNull()) {
414 matchingIds.addAll(tbl_.getPartitionIds());
415 matchingIds.removeAll(nullPartitionIds);
417 matchingIds.addAll(nullPartitionIds);
430 Preconditions.checkNotNull(expr);
435 CompoundPredicate cp = (CompoundPredicate)expr;
437 Preconditions.checkState(cp.getOp() != CompoundPredicate.Operator.NOT);
438 if (cp.getOp() == CompoundPredicate.Operator.AND) {
440 leftChildIds.retainAll(rightChildIds);
441 }
else if (cp.getOp() == CompoundPredicate.Operator.OR) {
443 leftChildIds.addAll(rightChildIds);
462 List<SlotId> partitionSlots = Lists.newArrayList();
464 if (slotDesc.getColumn() == null)
continue;
466 partitionSlots.add(slotDesc.getId());
469 List<HdfsPartitionFilter> partitionFilters = Lists.newArrayList();
471 List<Expr> simpleFilterConjuncts = Lists.newArrayList();
478 Iterator<Expr> it = conjuncts_.iterator();
479 while (it.hasNext()) {
480 Expr conjunct = it.next();
487 Expr clonedConjunct = conjunct.clone();
489 simpleFilterConjuncts.add(Expr.pushNegationToOperands(clonedConjunct));
498 HashSet<Long> matchingPartitionIds = null;
502 for (
Expr filter: simpleFilterConjuncts) {
505 if (matchingPartitionIds == null) {
506 matchingPartitionIds = matchingIds;
508 matchingPartitionIds.retainAll(matchingIds);
513 if (simpleFilterConjuncts.size() == 0) {
514 Preconditions.checkState(matchingPartitionIds == null);
515 matchingPartitionIds = Sets.newHashSet(tbl_.getPartitionIds());
522 HashMap<Long, HdfsPartition> partitionMap = tbl_.getPartitionMap();
523 for (Long
id: matchingPartitionIds) {
525 Preconditions.checkNotNull(partition);
527 partitions_.add(partition);
528 descTbl.addReferencedPartition(
tbl_, partition.getId());
539 HashMap<Long, HdfsPartition> partitionMap = tbl_.getPartitionMap();
541 HashSet<Long> matchingIds = Sets.newHashSet();
543 ArrayList<HdfsPartition> partitionBatch = Lists.newArrayList();
547 for (Long
id: matchingPartitionIds) {
549 Preconditions.checkState(
550 p.getPartitionValues().size() == tbl_.getNumClusteringCols());
552 partitionBatch.add(partitionMap.get(id));
555 matchingIds.addAll(filter.getMatchingPartitionIds(partitionBatch, analyzer));
556 partitionBatch.clear();
560 if (!partitionBatch.isEmpty()) {
561 matchingIds.addAll(filter.getMatchingPartitionIds(partitionBatch, analyzer));
562 partitionBatch.clear();
565 matchingPartitionIds.retainAll(matchingIds);
575 super.computeStats(analyzer);
576 LOG.debug(
"collecting partitions for table " + tbl_.getName());
584 boolean hasValidPartitionCardinality =
false;
588 if (p.getNumRows() > -1) {
590 hasValidPartitionCardinality =
true;
598 if (!partitions_.isEmpty() && !hasValidPartitionCardinality) {
605 Preconditions.checkState(
cardinality_ >= 0 || cardinality_ == -1,
606 "Internal error: invalid scan node cardinality: " +
cardinality_);
607 if (cardinality_ > 0) {
608 LOG.debug(
"cardinality_=" + Long.toString(
cardinality_) +
613 LOG.debug(
"computeStats HdfsScan: cardinality_=" + Long.toString(
cardinality_));
618 numNodes_ = (cardinality_ == 0 || tbl_.getNumNodes() == 0) ? 1 : tbl_.getNumNodes();
619 LOG.debug(
"computeStats HdfsScan: #nodes=" + Integer.toString(
numNodes_));
625 msg.hdfs_scan_node =
new THdfsScanNode(
desc_.
getId().asInt());
626 msg.node_type = TPlanNodeType.HDFS_SCAN_NODE;
632 List<String>
path = Lists.newArrayList();
633 path.add(table.getDb().getName());
634 path.add(table.getName());
635 Preconditions.checkNotNull(desc_.getPath());
637 return desc_.getPath().toString() +
" " + desc_.getAlias();
639 return desc_.getPath().toString();
645 TExplainLevel detailLevel) {
646 StringBuilder output =
new StringBuilder();
650 if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal() &&
654 output.append(
"]\n");
655 if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
656 int numPartitions = partitions_.size();
658 output.append(String.format(
"%spartitions=%s/%s files=%s size=%s", detailPrefix,
659 numPartitions, table.getPartitions().size() - 1,
totalFiles_,
667 if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
671 return output.toString();
676 Preconditions.checkNotNull(
scanRanges_,
"Cost estimation requires scan ranges.");
677 if (scanRanges_.isEmpty()) {
687 }
else if (scanRanges_.size() <
numNodes_) {
690 adjNumNodes = scanRanges_.size();
693 Preconditions.checkNotNull(
desc_);
694 Preconditions.checkNotNull(desc_.getTable() instanceof
HdfsTable);
696 int perHostScanRanges;
700 perHostScanRanges = 0;
702 if (slot.getColumn() == null ||
703 slot.getColumn().getPosition() >= table.getNumClusteringCols()) {
708 perHostScanRanges = (int) Math.ceil((
715 int maxScannerThreads = Math.min(perHostScanRanges,
718 if (queryOptions.isSetNum_scanner_threads() &&
719 queryOptions.getNum_scanner_threads() > 0) {
721 Math.min(maxScannerThreads, queryOptions.getNum_scanner_threads());
724 long avgScanRangeBytes = (long) Math.ceil(
totalBytes_ / (
double) scanRanges_.size());
727 long perThreadIoBuffers =
734 if (perHostMemCost_ > perHostUpperBound) {
735 LOG.warn(String.format(
"Per-host mem cost %s exceeded per-host upper bound %s.",
738 perHostMemCost_ = perHostUpperBound;
static final long IO_MGR_BUFFER_SIZE
string path("/usr/lib/sasl2:/usr/lib64/sasl2:/usr/local/lib/sasl2:/usr/lib/x86_64-linux-gnu/sasl2")
List< HdfsPartition.FileDescriptor > getFileDescriptors()
int compareTo(LiteralExpr other)
HashSet< Long > evalInPredicate(Expr expr)
void assignConjuncts(Analyzer analyzer)
HdfsFileFormat getMajorityFormat()
ArrayList< TupleId > tupleIds_
String getStatsExplainString(String prefix, TExplainLevel detailLevel)
void computeStats(Analyzer analyzer)
String getExplainString()
void computeScanRangeLocations(Analyzer analyzer)
static final int PARTITION_PRUNING_BATCH_SIZE
static final double SCAN_RANGE_SKEW_FACTOR
List< HdfsPartition > getPartitions()
int numPartitionsMissingStats_
static String printBytes(long bytes)
boolean hasFileDescriptors()
static final long MAX_IO_BUFFERS_PER_THREAD
void markSlotsMaterialized(Analyzer analyzer, List< Expr > exprs)
final TupleDescriptor desc_
void prunePartitions(Analyzer analyzer)
HashSet< Long > evalIsNullPredicate(Expr expr)
ArrayList< SlotDescriptor > getSlots()
String getNodeExplainString(String prefix, String detailPrefix, TExplainLevel detailLevel)
TupleDescriptor getTupleDesc(TupleId id)
boolean hasExplicitAlias()
void computeCosts(TQueryOptions queryOptions)
static final int THREADS_PER_CORE
HashSet< Long > evalBinaryPredicate(Expr expr)
boolean canEvalUsingPartitionMd(Expr expr, Analyzer analyzer)
void computeMemLayout(Analyzer analyzer)
String getDisplayLabelDetail()
HdfsScanNode(PlanNodeId id, TupleDescriptor desc, HdfsTable tbl)
void evalPartitionFiltersInBe(List< HdfsPartitionFilter > filters, HashSet< Long > matchingPartitionIds, Analyzer analyzer)
long capAtLimit(long cardinality)
static long getPerHostMemUpperBound()
HashSet< Long > evalSlotBindingFilter(Expr expr)
void init(Analyzer analyzer)
void toThrift(TPlanNode msg)
double computeSelectivity()
int getNumClusteringCols()
List< TScanRangeLocations > scanRanges_
boolean isBoundBySlotIds(List< SlotId > slotIds)
final ArrayList< HdfsPartition > partitions_
final String getDisplayLabel()
Set< ExprId > assignedConjuncts_
static long addCardinalities(long a, long b)