Impala
Impala is the open source, native analytic database for Apache Hadoop.
HdfsScanNode.java
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.planner;
16 
17 import java.util.ArrayList;
18 import java.util.HashMap;
19 import java.util.HashSet;
20 import java.util.Iterator;
21 import java.util.List;
22 import java.util.NavigableMap;
23 import java.util.TreeMap;
24 
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 
// Analysis, catalog, and common imports (collapsed in the original listing) that
// this class relies on; the list below is reconstructed from usage in the code:
import com.cloudera.impala.analysis.Analyzer;
import com.cloudera.impala.analysis.BinaryPredicate;
import com.cloudera.impala.analysis.BinaryPredicate.Operator;
import com.cloudera.impala.analysis.CompoundPredicate;
import com.cloudera.impala.analysis.DescriptorTable;
import com.cloudera.impala.analysis.Expr;
import com.cloudera.impala.analysis.InPredicate;
import com.cloudera.impala.analysis.IsNullPredicate;
import com.cloudera.impala.analysis.LiteralExpr;
import com.cloudera.impala.analysis.NullLiteral;
import com.cloudera.impala.analysis.SlotDescriptor;
import com.cloudera.impala.analysis.SlotId;
import com.cloudera.impala.analysis.SlotRef;
import com.cloudera.impala.analysis.TupleDescriptor;
import com.cloudera.impala.catalog.HdfsFileFormat;
import com.cloudera.impala.catalog.HdfsPartition;
import com.cloudera.impala.catalog.HdfsPartition.FileBlock;
import com.cloudera.impala.catalog.HdfsTable;
import com.cloudera.impala.common.AnalysisException;
import com.cloudera.impala.common.InternalException;
import com.cloudera.impala.common.PrintUtils;
import com.cloudera.impala.common.RuntimeEnv;
50 import com.cloudera.impala.thrift.TExplainLevel;
51 import com.cloudera.impala.thrift.THdfsFileBlock;
52 import com.cloudera.impala.thrift.THdfsFileSplit;
53 import com.cloudera.impala.thrift.THdfsScanNode;
54 import com.cloudera.impala.thrift.TNetworkAddress;
55 import com.cloudera.impala.thrift.TPlanNode;
56 import com.cloudera.impala.thrift.TPlanNodeType;
57 import com.cloudera.impala.thrift.TQueryOptions;
58 import com.cloudera.impala.thrift.TScanRange;
59 import com.cloudera.impala.thrift.TScanRangeLocation;
60 import com.cloudera.impala.thrift.TScanRangeLocations;
61 import com.google.common.base.Objects;
62 import com.google.common.base.Objects.ToStringHelper;
63 import com.google.common.base.Preconditions;
64 import com.google.common.base.Predicates;
65 import com.google.common.collect.Lists;
66 import com.google.common.collect.Sets;
67 
72 public class HdfsScanNode extends ScanNode {
73  private final static Logger LOG = LoggerFactory.getLogger(HdfsScanNode.class);
74 
75  // Read size of the backend I/O manager. Used in computeCosts().
76  private final static long IO_MGR_BUFFER_SIZE = 8L * 1024L * 1024L;
77 
78  // Maximum number of I/O buffers per thread executing this scan.
79  private final static long MAX_IO_BUFFERS_PER_THREAD = 10;
80 
81  // Number of scanner threads per core executing this scan.
82  private final static int THREADS_PER_CORE = 3;
83 
84  // Factor capturing the worst-case deviation from a uniform distribution of scan ranges
85  // among nodes. The factor of 1.2 means that a particular node may have 20% more
86  // scan ranges than would have been estimated assuming a uniform distribution.
87  private final static double SCAN_RANGE_SKEW_FACTOR = 1.2;
88 
89  // Partition batch size used during partition pruning
90  private final static int PARTITION_PRUNING_BATCH_SIZE = 1024;
91 
92  private final HdfsTable tbl_;
93 
94  // Partitions that remain after pruning on the partition key values and will be scanned
95  private final ArrayList<HdfsPartition> partitions_ = Lists.newArrayList();
96 
97  // Total number of files from partitions_
98  private long totalFiles_ = 0;
99 
100  // Total number of bytes from partitions_
101  private long totalBytes_ = 0;
102 
106  public HdfsScanNode(PlanNodeId id, TupleDescriptor desc, HdfsTable tbl) {
107  super(id, desc, "SCAN HDFS");
108  tbl_ = tbl;
109  }
110 
111  @Override
112  protected String debugString() {
113  ToStringHelper helper = Objects.toStringHelper(this);
114  for (HdfsPartition partition: partitions_) {
115  helper.add("Partition " + partition.getId() + ":", partition.toString());
116  }
117  return helper.addValue(super.debugString()).toString();
118  }
119 
123  @Override
124  public void init(Analyzer analyzer) throws InternalException {
125  ArrayList<Expr> bindingPredicates = analyzer.getBoundPredicates(tupleIds_.get(0));
126  conjuncts_.addAll(bindingPredicates);
127 
128  // also add remaining unassigned conjuncts
129  assignConjuncts(analyzer);
130 
131  analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_);
132 
133  // do partition pruning before deciding which slots to materialize;
134  // pruning may remove some of the predicates
135  prunePartitions(analyzer);
136 
137  // mark all slots referenced by the remaining conjuncts as materialized
138  markSlotsMaterialized(analyzer, conjuncts_);
139  computeMemLayout(analyzer);
140 
141  // do this at the end so it can take all conjuncts into account
142  computeStats(analyzer);
143 
144  // compute scan range locations
145  computeScanRangeLocations(analyzer);
146 
147  // TODO: do we need this?
148  assignedConjuncts_ = analyzer.getAssignedConjuncts();
149  }
150 
155  private void computeScanRangeLocations(Analyzer analyzer) {
156  long maxScanRangeLength = analyzer.getQueryCtx().getRequest().getQuery_options()
157  .getMax_scan_range_length();
158  scanRanges_ = Lists.newArrayList();
159  for (HdfsPartition partition: partitions_) {
160  Preconditions.checkState(partition.getId() >= 0);
161  for (HdfsPartition.FileDescriptor fileDesc: partition.getFileDescriptors()) {
162  for (THdfsFileBlock thriftBlock: fileDesc.getFileBlocks()) {
163  HdfsPartition.FileBlock block = FileBlock.fromThrift(thriftBlock);
164  List<Integer> replicaHostIdxs = block.getReplicaHostIdxs();
165  if (replicaHostIdxs.size() == 0) {
166  // we didn't get locations for this block; for now, just ignore the block
167  // TODO: do something meaningful with that
168  continue;
169  }
170  // Collect the network address and volume ID of all replicas of this block.
171  List<TScanRangeLocation> locations = Lists.newArrayList();
172  for (int i = 0; i < replicaHostIdxs.size(); ++i) {
173  TScanRangeLocation location = new TScanRangeLocation();
174  // Translate from the host index (local to the HdfsTable) to network address.
175  Integer tableHostIdx = replicaHostIdxs.get(i);
176  TNetworkAddress networkAddress =
177  partition.getTable().getHostIndex().getEntry(tableHostIdx);
178  Preconditions.checkNotNull(networkAddress);
179  // Translate from network address to the global (to this request) host index.
180  Integer globalHostIdx = analyzer.getHostIndex().getIndex(networkAddress);
181  location.setHost_idx(globalHostIdx);
182  location.setVolume_id(block.getDiskId(i));
183  location.setIs_cached(block.isCached(i));
184  locations.add(location);
185  }
186  // create scan ranges, taking into account maxScanRangeLength
187  long currentOffset = block.getOffset();
188  long remainingLength = block.getLength();
189  while (remainingLength > 0) {
190  long currentLength = remainingLength;
191  if (maxScanRangeLength > 0 && remainingLength > maxScanRangeLength) {
192  currentLength = maxScanRangeLength;
193  }
194  TScanRange scanRange = new TScanRange();
195  scanRange.setHdfs_file_split(new THdfsFileSplit(
196  fileDesc.getFileName(), currentOffset, currentLength, partition.getId(),
197  fileDesc.getFileLength(), fileDesc.getFileCompression(),
198  fileDesc.getModificationTime()));
199  TScanRangeLocations scanRangeLocations = new TScanRangeLocations();
200  scanRangeLocations.scan_range = scanRange;
201  scanRangeLocations.locations = locations;
202  scanRanges_.add(scanRangeLocations);
203  remainingLength -= currentLength;
204  currentOffset += currentLength;
205  }
206  }
207  }
208  }
209  }
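  // Illustration (not part of the original source, hypothetical sizes): given a
  // 512 MB block and max_scan_range_length set to 128 MB, the loop above emits four
  // scan ranges of 128 MB each at increasing offsets within the block, all sharing
  // the same replica host/volume locations; if max_scan_range_length is unset
  // (<= 0), the whole block becomes a single scan range.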
210 
217  private boolean canEvalUsingPartitionMd(Expr expr, Analyzer analyzer) {
218  Preconditions.checkNotNull(expr);
219  if (expr instanceof BinaryPredicate) {
220  // Evaluate any constant expression in the BE
221  try {
222  expr.foldConstantChildren(analyzer);
223  } catch (AnalysisException e) {
224  LOG.error("Error evaluating constant expressions in the BE: " + e.getMessage());
225  return false;
226  }
227  BinaryPredicate bp = (BinaryPredicate)expr;
228  SlotRef slot = bp.getBoundSlot();
229  if (slot == null) return false;
230  Expr bindingExpr = bp.getSlotBinding(slot.getSlotId());
231  if (bindingExpr == null || !bindingExpr.isLiteral()) return false;
232  return true;
233  } else if (expr instanceof CompoundPredicate) {
234  boolean res = canEvalUsingPartitionMd(expr.getChild(0), analyzer);
235  if (expr.getChild(1) != null) {
236  res &= canEvalUsingPartitionMd(expr.getChild(1), analyzer);
237  }
238  return res;
239  } else if (expr instanceof IsNullPredicate) {
240  // Check for SlotRef IS [NOT] NULL case
241  IsNullPredicate nullPredicate = (IsNullPredicate)expr;
242  return nullPredicate.getBoundSlot() != null;
243  } else if (expr instanceof InPredicate) {
244  // Evaluate any constant expressions in the BE
245  try {
246  expr.foldConstantChildren(analyzer);
247  } catch (AnalysisException e) {
248  LOG.error("Error evaluating constant expressions in the BE: " + e.getMessage());
249  return false;
250  }
251  // Check for SlotRef [NOT] IN (Literal, ... Literal) case
252  SlotRef slot = ((InPredicate)expr).getBoundSlot();
253  if (slot == null) return false;
254  for (int i = 1; i < expr.getChildren().size(); ++i) {
255  if (!(expr.getChild(i).isLiteral())) return false;
256  }
257  return true;
258  }
259  return false;
260  }
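  // Illustration (hypothetical partition columns year/month/country): predicates
  // such as "year = 2012", "month IN (1, 2, 3)", "year IS NULL" or
  // "year = 2011 OR month = 6" pass this check and can be evaluated directly from
  // the partition key metadata, whereas "year + 1 = 2012" or "lower(country) = 'de'"
  // have no bound SlotRef and fall through to BE evaluation via HdfsPartitionFilter.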
261 
267  private HashSet<Long> evalBinaryPredicate(Expr expr) {
268  Preconditions.checkNotNull(expr);
269  Preconditions.checkState(expr instanceof BinaryPredicate);
270  boolean isSlotOnLeft = true;
271  if (expr.getChild(0).isLiteral()) isSlotOnLeft = false;
272 
273  // Get the operands
274  BinaryPredicate bp = (BinaryPredicate)expr;
275  SlotRef slot = bp.getBoundSlot();
276  Preconditions.checkNotNull(slot);
277  Expr bindingExpr = bp.getSlotBinding(slot.getSlotId());
278  Preconditions.checkNotNull(bindingExpr);
279  Preconditions.checkState(bindingExpr.isLiteral());
280  LiteralExpr literal = (LiteralExpr)bindingExpr;
281  if (literal instanceof NullLiteral) return Sets.newHashSet();
282 
283  // Get the partition column position and retrieve the associated partition
284  // value metadata.
285  int partitionPos = slot.getDesc().getColumn().getPosition();
286  TreeMap<LiteralExpr, HashSet<Long>> partitionValueMap =
287  tbl_.getPartitionValueMap(partitionPos);
288  if (partitionValueMap.isEmpty()) return Sets.newHashSet();
289 
290  HashSet<Long> matchingIds = Sets.newHashSet();
291  // Compute the matching partition ids
292  Operator op = bp.getOp();
293  if (op == Operator.EQ) {
294  // Case: SlotRef = Literal
295  HashSet<Long> ids = partitionValueMap.get(literal);
296  if (ids != null) matchingIds.addAll(ids);
297  return matchingIds;
298  }
299  if (op == Operator.NE) {
300  // Case: SlotRef != Literal
301  matchingIds.addAll(tbl_.getPartitionIds());
302  HashSet<Long> nullIds = tbl_.getNullPartitionIds(partitionPos);
303  matchingIds.removeAll(nullIds);
304  HashSet<Long> ids = partitionValueMap.get(literal);
305  if (ids != null) matchingIds.removeAll(ids);
306  return matchingIds;
307  }
308 
309  // Determine the partition key value range of this predicate.
310  NavigableMap<LiteralExpr, HashSet<Long>> rangeValueMap = null;
311  LiteralExpr firstKey = partitionValueMap.firstKey();
312  LiteralExpr lastKey = partitionValueMap.lastKey();
313  boolean upperInclusive = false;
314  boolean lowerInclusive = false;
315  LiteralExpr upperBoundKey = null;
316  LiteralExpr lowerBoundKey = null;
317 
318  if (((op == Operator.LE || op == Operator.LT) && isSlotOnLeft) ||
319  ((op == Operator.GE || op == Operator.GT) && !isSlotOnLeft)) {
320  // Case: SlotRef <[=] Literal
321  if (literal.compareTo(firstKey) < 0) return Sets.newHashSet();
322  if (op == Operator.LE || op == Operator.GE) upperInclusive = true;
323 
324  if (literal.compareTo(lastKey) <= 0) {
325  upperBoundKey = literal;
326  } else {
327  upperBoundKey = lastKey;
328  upperInclusive = true;
329  }
330  lowerBoundKey = firstKey;
331  lowerInclusive = true;
332  } else {
333  // Cases: SlotRef >[=] Literal
334  if (literal.compareTo(lastKey) > 0) return Sets.newHashSet();
335  if (op == Operator.GE || op == Operator.LE) lowerInclusive = true;
336 
337  if (literal.compareTo(firstKey) >= 0) {
338  lowerBoundKey = literal;
339  } else {
340  lowerBoundKey = firstKey;
341  lowerInclusive = true;
342  }
343  upperBoundKey = lastKey;
344  upperInclusive = true;
345  }
346 
347  // Retrieve the submap that corresponds to the computed partition key
348  // value range.
349  rangeValueMap = partitionValueMap.subMap(lowerBoundKey, lowerInclusive,
350  upperBoundKey, upperInclusive);
351  // Compute the matching partition ids
352  for (HashSet<Long> idSet: rangeValueMap.values()) {
353  if (idSet != null) matchingIds.addAll(idSet);
354  }
355  return matchingIds;
356  }
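  // Illustration (hypothetical values): if the partition column year has the values
  // {2009, 2010, 2011, 2012} and the predicate is "year <= 2010", the code above
  // picks lowerBoundKey = 2009 (inclusive, the first key) and upperBoundKey = 2010
  // (inclusive, because the operator is LE), i.e. it effectively evaluates
  //   partitionValueMap.subMap(expr2009, true, expr2010, true)
  // and unions the partition id sets of the matching entries. A predicate such as
  // "year < 2005" returns the empty set because the literal precedes the first key.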
357 
362  private HashSet<Long> evalInPredicate(Expr expr) {
363  Preconditions.checkNotNull(expr);
364  Preconditions.checkState(expr instanceof InPredicate);
365  InPredicate inPredicate = (InPredicate)expr;
366  HashSet<Long> matchingIds = Sets.newHashSet();
367  SlotRef slot = inPredicate.getBoundSlot();
368  Preconditions.checkNotNull(slot);
369  int partitionPos = slot.getDesc().getColumn().getPosition();
370  TreeMap<LiteralExpr, HashSet<Long>> partitionValueMap =
371  tbl_.getPartitionValueMap(partitionPos);
372 
373  if (inPredicate.isNotIn()) {
374  // Case: SlotRef NOT IN (Literal, ..., Literal)
375  // If there is a NullLiteral, return an empty set.
376  List<Expr> nullLiterals = Lists.newArrayList();
377  inPredicate.collectAll(Predicates.instanceOf(NullLiteral.class), nullLiterals);
378  if (!nullLiterals.isEmpty()) return matchingIds;
379  matchingIds.addAll(tbl_.getPartitionIds());
380  // Exclude partitions with null partition column values
381  HashSet<Long> nullIds = tbl_.getNullPartitionIds(partitionPos);
382  matchingIds.removeAll(nullIds);
383  }
384  // Compute the matching partition ids
385  for (int i = 1; i < inPredicate.getChildren().size(); ++i) {
386  LiteralExpr literal = (LiteralExpr)inPredicate.getChild(i);
387  HashSet<Long> idSet = partitionValueMap.get(literal);
388  if (idSet != null) {
389  if (inPredicate.isNotIn()) {
390  matchingIds.removeAll(idSet);
391  } else {
392  matchingIds.addAll(idSet);
393  }
394  }
395  }
396  return matchingIds;
397  }
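  // Illustration (hypothetical column year): for "year IN (2010, 2012)" the result
  // is the union of the id sets stored under the literals 2010 and 2012; for
  // "year NOT IN (2010, NULL)" the NullLiteral means the predicate can never be
  // true, so the empty set is returned and no partition matches this conjunct.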
398 
403  private HashSet<Long> evalIsNullPredicate(Expr expr) {
404  Preconditions.checkNotNull(expr);
405  Preconditions.checkState(expr instanceof IsNullPredicate);
406  HashSet<Long> matchingIds = Sets.newHashSet();
407  IsNullPredicate nullPredicate = (IsNullPredicate)expr;
408  SlotRef slot = nullPredicate.getBoundSlot();
409  Preconditions.checkNotNull(slot);
410  int partitionPos = slot.getDesc().getColumn().getPosition();
411  HashSet<Long> nullPartitionIds = tbl_.getNullPartitionIds(partitionPos);
412 
413  if (nullPredicate.isNotNull()) {
414  matchingIds.addAll(tbl_.getPartitionIds());
415  matchingIds.removeAll(nullPartitionIds);
416  } else {
417  matchingIds.addAll(nullPartitionIds);
418  }
419  return matchingIds;
420  }
421 
429  private HashSet<Long> evalSlotBindingFilter(Expr expr) {
430  Preconditions.checkNotNull(expr);
431  if (expr instanceof BinaryPredicate) {
432  return evalBinaryPredicate(expr);
433  } else if (expr instanceof CompoundPredicate) {
434  HashSet<Long> leftChildIds = evalSlotBindingFilter(expr.getChild(0));
435  CompoundPredicate cp = (CompoundPredicate)expr;
436  // NOT operators have been eliminated
437  Preconditions.checkState(cp.getOp() != CompoundPredicate.Operator.NOT);
438  if (cp.getOp() == CompoundPredicate.Operator.AND) {
439  HashSet<Long> rightChildIds = evalSlotBindingFilter(expr.getChild(1));
440  leftChildIds.retainAll(rightChildIds);
441  } else if (cp.getOp() == CompoundPredicate.Operator.OR) {
442  HashSet<Long> rightChildIds = evalSlotBindingFilter(expr.getChild(1));
443  leftChildIds.addAll(rightChildIds);
444  }
445  return leftChildIds;
446  } else if (expr instanceof InPredicate) {
447  return evalInPredicate(expr);
448  } else if (expr instanceof IsNullPredicate) {
449  return evalIsNullPredicate(expr);
450  }
451  return null;
452  }
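  // Illustration: AND combines the child id sets by intersection (retainAll) and OR
  // by union (addAll), so a filter like "year = 2012 AND month IN (1, 2)" matches
  // only partitions present in both child sets, while "year = 2011 OR year IS NULL"
  // matches partitions present in either (column names hypothetical).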
453 
458  private void prunePartitions(Analyzer analyzer) throws InternalException {
459  DescriptorTable descTbl = analyzer.getDescTbl();
460  // loop through all partitions and prune based on applicable conjuncts;
461  // start by creating a collection of partition filters from the applicable conjuncts
462  List<SlotId> partitionSlots = Lists.newArrayList();
463  for (SlotDescriptor slotDesc: descTbl.getTupleDesc(tupleIds_.get(0)).getSlots()) {
464  if (slotDesc.getColumn() == null) continue;
465  if (slotDesc.getColumn().getPosition() < tbl_.getNumClusteringCols()) {
466  partitionSlots.add(slotDesc.getId());
467  }
468  }
469  List<HdfsPartitionFilter> partitionFilters = Lists.newArrayList();
470  // Conjuncts that can be evaluated from the partition key values.
471  List<Expr> simpleFilterConjuncts = Lists.newArrayList();
472 
473  // Simple predicates (e.g. binary predicates of the form
474  // <SlotRef> <op> <LiteralExpr>) can be used to derive lists
475  // of matching partition ids directly from the partition key values.
476  // Split conjuncts among those that can be evaluated from partition
477  // key values and those that need to be evaluated in the BE.
478  Iterator<Expr> it = conjuncts_.iterator();
479  while (it.hasNext()) {
480  Expr conjunct = it.next();
481  if (conjunct.isBoundBySlotIds(partitionSlots)) {
482  // Check if the conjunct can be evaluated from the partition metadata.
483  // canEvalUsingPartitionMd() operates on a cloned conjunct which may get
484  // modified if it contains constant expressions. If the cloned conjunct
485  // cannot be evaluated from the partition metadata, the original unmodified
486  // conjunct is evaluated in the BE.
487  Expr clonedConjunct = conjunct.clone();
488  if (canEvalUsingPartitionMd(clonedConjunct, analyzer)) {
489  simpleFilterConjuncts.add(Expr.pushNegationToOperands(clonedConjunct));
490  } else {
491  partitionFilters.add(new HdfsPartitionFilter(conjunct, tbl_, analyzer));
492  }
493  it.remove();
494  }
495  }
496 
497  // Set of matching partition ids, i.e. partitions that pass all filters
498  HashSet<Long> matchingPartitionIds = null;
499 
500  // Evaluate the partition filters from the partition key values.
501  // The result is the intersection of the associated partition id sets.
502  for (Expr filter: simpleFilterConjuncts) {
503  // Evaluate the filter
504  HashSet<Long> matchingIds = evalSlotBindingFilter(filter);
505  if (matchingPartitionIds == null) {
506  matchingPartitionIds = matchingIds;
507  } else {
508  matchingPartitionIds.retainAll(matchingIds);
509  }
510  }
511 
512  // Check if we need to initialize the set of valid partition ids.
513  if (simpleFilterConjuncts.size() == 0) {
514  Preconditions.checkState(matchingPartitionIds == null);
515  matchingPartitionIds = Sets.newHashSet(tbl_.getPartitionIds());
516  }
517 
518  // Evaluate the 'complex' partition filters in the BE.
519  evalPartitionFiltersInBe(partitionFilters, matchingPartitionIds, analyzer);
520 
521  // Populate the list of valid, non-empty partitions to process
522  HashMap<Long, HdfsPartition> partitionMap = tbl_.getPartitionMap();
523  for (Long id: matchingPartitionIds) {
524  HdfsPartition partition = partitionMap.get(id);
525  Preconditions.checkNotNull(partition);
526  if (partition.hasFileDescriptors()) {
527  partitions_.add(partition);
528  descTbl.addReferencedPartition(tbl_, partition.getId());
529  }
530  }
531  }
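  // Illustration (hypothetical query): for
  //   WHERE year = 2012 AND month IN (1, 2) AND lower(country) = 'de' AND v > 10
  // the first two conjuncts bind to partition slots and are evaluable from the
  // partition metadata (simpleFilterConjuncts); lower(country) = 'de' also binds to
  // a partition slot but needs BE evaluation (partitionFilters); and v > 10 is not
  // bound by partition slots, so it stays in conjuncts_ and is applied during the
  // scan itself.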
532 
537  private void evalPartitionFiltersInBe(List<HdfsPartitionFilter> filters,
538  HashSet<Long> matchingPartitionIds, Analyzer analyzer) throws InternalException {
539  HashMap<Long, HdfsPartition> partitionMap = tbl_.getPartitionMap();
540  // Set of partition ids that pass a filter
541  HashSet<Long> matchingIds = Sets.newHashSet();
542  // Batch of partitions
543  ArrayList<HdfsPartition> partitionBatch = Lists.newArrayList();
544  // Identify the partitions that pass all filters.
545  for (HdfsPartitionFilter filter: filters) {
546  // Iterate through the currently valid partitions
547  for (Long id: matchingPartitionIds) {
548  HdfsPartition p = partitionMap.get(id);
549  Preconditions.checkState(
550  p.getPartitionValues().size() == tbl_.getNumClusteringCols());
551  // Add the partition to the current batch
552  partitionBatch.add(partitionMap.get(id));
553  if (partitionBatch.size() == PARTITION_PRUNING_BATCH_SIZE) {
554  // Batch is full. Evaluate the predicates of this batch in the BE.
555  matchingIds.addAll(filter.getMatchingPartitionIds(partitionBatch, analyzer));
556  partitionBatch.clear();
557  }
558  }
559  // Check if there are any unprocessed partitions.
560  if (!partitionBatch.isEmpty()) {
561  matchingIds.addAll(filter.getMatchingPartitionIds(partitionBatch, analyzer));
562  partitionBatch.clear();
563  }
564  // Prune the partition ids that didn't pass the filter
565  matchingPartitionIds.retainAll(matchingIds);
566  matchingIds.clear();
567  }
568  }
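  // Illustration: with a hypothetical 3,000 surviving partitions and
  // PARTITION_PRUNING_BATCH_SIZE = 1024, each filter is sent to the BE in three
  // batches (1024 + 1024 + 952 partitions); after each filter the surviving ids are
  // intersected into matchingPartitionIds before the next filter runs.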
569 
573  @Override
574  public void computeStats(Analyzer analyzer) {
575  super.computeStats(analyzer);
576  LOG.debug("collecting partitions for table " + tbl_.getName());
578  if (tbl_.getPartitions().isEmpty()) {
579  cardinality_ = tbl_.getNumRows();
580  } else {
581  cardinality_ = 0;
582  totalFiles_ = 0;
583  totalBytes_ = 0;
584  boolean hasValidPartitionCardinality = false;
585  for (HdfsPartition p: partitions_) {
586  // ignore partitions with missing stats in the hope they don't matter
587  // enough to change the planning outcome
588  if (p.getNumRows() > -1) {
589  cardinality_ = addCardinalities(cardinality_, p.getNumRows());
590  hasValidPartitionCardinality = true;
591  } else {
593  }
594  totalFiles_ += p.getFileDescriptors().size();
595  totalBytes_ += p.getSize();
596  }
597 
598  if (!partitions_.isEmpty() && !hasValidPartitionCardinality) {
599  // if none of the partitions knew its number of rows, we fall back on
600  // the table stats
601  cardinality_ = tbl_.getNumRows();
602  }
603  }
605  Preconditions.checkState(cardinality_ >= 0 || cardinality_ == -1,
606  "Internal error: invalid scan node cardinality: " + cardinality_);
607  if (cardinality_ > 0) {
608  LOG.debug("cardinality_=" + Long.toString(cardinality_) +
609  " sel=" + Double.toString(computeSelectivity()));
610  cardinality_ = Math.round((double) cardinality_ * computeSelectivity());
611  }
612  cardinality_ = capAtLimit(cardinality_);
613  LOG.debug("computeStats HdfsScan: cardinality_=" + Long.toString(cardinality_));
614 
615  // TODO: take actual partitions into account
616  // Tables can reside on 0 nodes (empty table), but a plan node must always be
617  // executed on at least one node.
618  numNodes_ = (cardinality_ == 0 || tbl_.getNumNodes() == 0) ? 1 : tbl_.getNumNodes();
619  LOG.debug("computeStats HdfsScan: #nodes=" + Integer.toString(numNodes_));
620  }
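  // Illustration (hypothetical numbers): if the selected partitions report 2,000,000
  // and 3,000,000 rows and computeSelectivity() estimates 0.1 for the remaining
  // conjuncts, cardinality_ becomes round(5,000,000 * 0.1) = 500,000 before
  // capAtLimit() is applied. Partitions without row-count stats add nothing to the
  // estimate unless none of them has stats, in which case the table-level row count
  // is used instead.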
621 
622  @Override
623  protected void toThrift(TPlanNode msg) {
624  // TODO: retire this once the migration to the new plan is complete
625  msg.hdfs_scan_node = new THdfsScanNode(desc_.getId().asInt());
626  msg.node_type = TPlanNodeType.HDFS_SCAN_NODE;
627  }
628 
629  @Override
630  protected String getDisplayLabelDetail() {
631  HdfsTable table = (HdfsTable) desc_.getTable();
632  List<String> path = Lists.newArrayList();
633  path.add(table.getDb().getName());
634  path.add(table.getName());
635  Preconditions.checkNotNull(desc_.getPath());
636  if (desc_.hasExplicitAlias()) {
637  return desc_.getPath().toString() + " " + desc_.getAlias();
638  } else {
639  return desc_.getPath().toString();
640  }
641  }
642 
643  @Override
644  protected String getNodeExplainString(String prefix, String detailPrefix,
645  TExplainLevel detailLevel) {
646  StringBuilder output = new StringBuilder();
647  HdfsTable table = (HdfsTable) desc_.getTable();
648  output.append(String.format("%s%s [%s", prefix, getDisplayLabel(),
650  if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal() &&
652  output.append(", " + fragment_.getDataPartition().getExplainString());
653  }
654  output.append("]\n");
655  if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
656  int numPartitions = partitions_.size();
657  if (tbl_.getNumClusteringCols() == 0) numPartitions = 1;
658  output.append(String.format("%spartitions=%s/%s files=%s size=%s", detailPrefix,
659  numPartitions, table.getPartitions().size() - 1, totalFiles_,
660  PrintUtils.printBytes(totalBytes_)));
661  output.append("\n");
662  if (!conjuncts_.isEmpty()) {
663  output.append(
664  detailPrefix + "predicates: " + getExplainString(conjuncts_) + "\n");
665  }
666  }
667  if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
668  output.append(getStatsExplainString(detailPrefix, detailLevel));
669  output.append("\n");
670  }
671  return output.toString();
672  }
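  // Illustration (table name, counts and sizes hypothetical): at STANDARD detail
  // level this produces output along the lines of
  //   00:SCAN HDFS [functional.alltypes t1]
  //      partitions=3/25 files=3 size=60.32KB
  //      predicates: t1.id > 10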
673 
674  @Override
675  public void computeCosts(TQueryOptions queryOptions) {
676  Preconditions.checkNotNull(scanRanges_, "Cost estimation requires scan ranges.");
677  if (scanRanges_.isEmpty()) {
678  perHostMemCost_ = 0;
679  return;
680  }
681 
682  // Number of nodes for the purpose of resource estimation adjusted
683  // for the special cases listed below.
684  long adjNumNodes = numNodes_;
685  if (numNodes_ <= 0) {
686  adjNumNodes = 1;
687  } else if (scanRanges_.size() < numNodes_) {
688  // TODO: Empirically evaluate whether there is more Hdfs block skew for relatively
689  // small files, i.e., whether this estimate is too optimistic.
690  adjNumNodes = scanRanges_.size();
691  }
692 
693  Preconditions.checkNotNull(desc_);
694  Preconditions.checkState(desc_.getTable() instanceof HdfsTable);
695  HdfsTable table = (HdfsTable) desc_.getTable();
696  int perHostScanRanges;
697  if (table.getMajorityFormat() == HdfsFileFormat.PARQUET) {
698  // For the purpose of this estimation, the number of per-host scan ranges for
699  // Parquet files is equal to the number of non-partition columns scanned.
700  perHostScanRanges = 0;
701  for (SlotDescriptor slot: desc_.getSlots()) {
702  if (slot.getColumn() == null ||
703  slot.getColumn().getPosition() >= table.getNumClusteringCols()) {
704  ++perHostScanRanges;
705  }
706  }
707  } else {
708  perHostScanRanges = (int) Math.ceil((
709  (double) scanRanges_.size() / (double) adjNumNodes) * SCAN_RANGE_SKEW_FACTOR);
710  }
711 
712  // TODO: The total memory consumption for a particular query depends on the number
713  // of *available* cores, i.e., it depends on the resource consumption of other
714  // concurrent queries. Figure out how to account for that.
715  int maxScannerThreads = Math.min(perHostScanRanges,
716  RuntimeEnv.INSTANCE.getNumCores() * THREADS_PER_CORE);
717  // Account for the max scanner threads query option.
718  if (queryOptions.isSetNum_scanner_threads() &&
719  queryOptions.getNum_scanner_threads() > 0) {
720  maxScannerThreads =
721  Math.min(maxScannerThreads, queryOptions.getNum_scanner_threads());
722  }
723 
724  long avgScanRangeBytes = (long) Math.ceil(totalBytes_ / (double) scanRanges_.size());
725  // The +1 accounts for an extra I/O buffer to read past the scan range due to a
726  // trailing record spanning Hdfs blocks.
727  long perThreadIoBuffers =
728  Math.min((long) Math.ceil(avgScanRangeBytes / (double) IO_MGR_BUFFER_SIZE),
729  MAX_IO_BUFFERS_PER_THREAD) + 1;
730  perHostMemCost_ = maxScannerThreads * perThreadIoBuffers * IO_MGR_BUFFER_SIZE;
731 
732  // Sanity check: the tighter estimation should not exceed the per-host maximum.
733  long perHostUpperBound = getPerHostMemUpperBound();
734  if (perHostMemCost_ > perHostUpperBound) {
735  LOG.warn(String.format("Per-host mem cost %s exceeded per-host upper bound %s.",
736  PrintUtils.printBytes(perHostMemCost_),
737  PrintUtils.printBytes(perHostUpperBound)));
738  perHostMemCost_ = perHostUpperBound;
739  }
740  }
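  // Illustration (hypothetical values): a scan of 100 ranges averaging 24 MB on 10
  // nodes with 8 cores each gives perHostScanRanges = ceil(100 / 10 * 1.2) = 12 for
  // a non-Parquet table, maxScannerThreads = min(12, 8 * 3) = 12,
  // perThreadIoBuffers = min(ceil(24 MB / 8 MB), 10) + 1 = 4, and therefore
  // perHostMemCost_ = 12 * 4 * 8 MB = 384 MB.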
741 
749  public static long getPerHostMemUpperBound() {
750  // Assumes THREADS_PER_CORE scanner threads per core, each using a default of
751  // MAX_IO_BUFFERS_PER_THREAD * IO_MGR_BUFFER_SIZE bytes.
752  return (long) RuntimeEnv.INSTANCE.getNumCores() * (long) THREADS_PER_CORE *
753  MAX_IO_BUFFERS_PER_THREAD * IO_MGR_BUFFER_SIZE;
754  }
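  // Illustration: on a hypothetical 16-core host the upper bound is
  // 16 * THREADS_PER_CORE (3) * MAX_IO_BUFFERS_PER_THREAD (10) * IO_MGR_BUFFER_SIZE
  // (8 MB) = 3,840 MB; computeCosts() falls back to this value whenever its tighter
  // per-host estimate exceeds it.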
755 }