Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
SortNode.java
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.planner;
16 
17 import java.util.List;
18 
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21 
30 import com.cloudera.impala.thrift.TExplainLevel;
31 import com.cloudera.impala.thrift.TPlanNode;
32 import com.cloudera.impala.thrift.TPlanNodeType;
33 import com.cloudera.impala.thrift.TQueryOptions;
34 import com.cloudera.impala.thrift.TSortInfo;
35 import com.cloudera.impala.thrift.TSortNode;
36 import com.google.common.base.Joiner;
37 import com.google.common.base.Objects;
38 import com.google.common.base.Preconditions;
39 import com.google.common.collect.Lists;
40 
47 public class SortNode extends PlanNode {
48  private final static Logger LOG = LoggerFactory.getLogger(SortNode.class);
49 
50  private final SortInfo info_;
51 
52  // if set, this SortNode requires its input to have this data partition
54 
55  // if true, the output of this node feeds an AnalyticNode
56  private boolean isAnalyticSort_;
57 
58  // info_.sortTupleSlotExprs_ substituted with the outputSmap_ for materialized slots
59  // in init().
60  private List<Expr> resolvedTupleExprs_;
61  private final boolean useTopN_;
62  // The offset of the first row to return.
63  protected long offset_;
64 
65  public SortNode(PlanNodeId id, PlanNode input, SortInfo info, boolean useTopN,
66  long offset) {
67  super(id, Lists.newArrayList(info.getSortTupleDescriptor().getId()),
68  getDisplayName(useTopN, false));
69  info_ = info;
70  useTopN_ = useTopN;
71  children_.add(input);
72  offset_ = offset;
73  }
74 
75  public long getOffset() { return offset_; }
76  public void setOffset(long offset) { offset_ = offset; }
77  public boolean hasOffset() { return offset_ > 0; }
78  public boolean useTopN() { return useTopN_; }
79  public SortInfo getSortInfo() { return info_; }
80  public void setInputPartition(DataPartition inputPartition) {
81  inputPartition_ = inputPartition;
82  }
84  public boolean isAnalyticSort() { return isAnalyticSort_; }
85  public void setIsAnalyticSort(boolean v) { isAnalyticSort_ = v; }
86 
87  @Override
88  public boolean isBlockingNode() { return true; }
89 
90  @Override
91  public void init(Analyzer analyzer) throws InternalException {
92  assignConjuncts(analyzer);
93  // Compute the memory layout for the generated tuple.
94  computeMemLayout(analyzer);
95  computeStats(analyzer);
96 
97  // populate resolvedTupleExprs_ and outputSmap_
98  List<SlotDescriptor> sortTupleSlots = info_.getSortTupleDescriptor().getSlots();
99  List<Expr> slotExprs = info_.getSortTupleSlotExprs();
100  Preconditions.checkState(sortTupleSlots.size() == slotExprs.size());
101  resolvedTupleExprs_ = Lists.newArrayList();
103  for (int i = 0; i < slotExprs.size(); ++i) {
104  if (!sortTupleSlots.get(i).isMaterialized()) continue;
105  resolvedTupleExprs_.add(slotExprs.get(i));
106  outputSmap_.put(slotExprs.get(i), new SlotRef(sortTupleSlots.get(i)));
107  }
109  resolvedTupleExprs_ =
110  Expr.substituteList(resolvedTupleExprs_, childSmap, analyzer, false);
111 
112  // Remap the ordering exprs to the tuple materialized by this sort node. The mapping
113  // is a composition of the childSmap and the outputSmap_ because the child node may
114  // have also remapped its input (e.g., as in a a series of (sort->analytic)* nodes).
115  // Parent nodes have have to do the same so set the composition as the outputSmap_.
116  outputSmap_ = ExprSubstitutionMap.compose(childSmap, outputSmap_, analyzer);
117 
118  info_.substituteOrderingExprs(outputSmap_, analyzer);
119  info_.checkConsistency();
120 
121  LOG.trace("sort id " + tupleIds_.get(0).toString() + " smap: "
122  + outputSmap_.debugString());
123  LOG.trace("sort input exprs: " + Expr.debugString(resolvedTupleExprs_));
124  }
125 
126  @Override
127  protected void computeStats(Analyzer analyzer) {
128  super.computeStats(analyzer);
129  cardinality_ = capAtLimit(getChild(0).cardinality_);
130  LOG.debug("stats Sort: cardinality=" + Long.toString(cardinality_));
131  }
132 
133  @Override
134  protected String debugString() {
135  List<String> strings = Lists.newArrayList();
136  for (Boolean isAsc : info_.getIsAscOrder()) {
137  strings.add(isAsc ? "a" : "d");
138  }
139  return Objects.toStringHelper(this)
140  .add("ordering_exprs", Expr.debugString(info_.getOrderingExprs()))
141  .add("is_asc", "[" + Joiner.on(" ").join(strings) + "]")
142  .add("nulls_first", "[" + Joiner.on(" ").join(info_.getNullsFirst()) + "]")
143  .add("offset_", offset_)
144  .addValue(super.debugString())
145  .toString();
146  }
147 
148  @Override
149  protected void toThrift(TPlanNode msg) {
150  msg.node_type = TPlanNodeType.SORT_NODE;
151  TSortInfo sort_info = new TSortInfo(Expr.treesToThrift(info_.getOrderingExprs()),
152  info_.getIsAscOrder(), info_.getNullsFirst());
153  Preconditions.checkState(tupleIds_.size() == 1,
154  "Incorrect size for tupleIds_ in SortNode");
155  sort_info.sort_tuple_slot_exprs = Expr.treesToThrift(resolvedTupleExprs_);
156  TSortNode sort_node = new TSortNode(sort_info, useTopN_);
157  sort_node.setOffset(offset_);
158  msg.sort_node = sort_node;
159  }
160 
161  @Override
162  protected String getNodeExplainString(String prefix, String detailPrefix,
163  TExplainLevel detailLevel) {
164  StringBuilder output = new StringBuilder();
165  output.append(String.format("%s%s:%s%s\n", prefix, id_.toString(),
166  displayName_, getNodeExplainDetail(detailLevel)));
167  if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
168  output.append(detailPrefix + "order by: ");
169  for (int i = 0; i < info_.getOrderingExprs().size(); ++i) {
170  if (i > 0) output.append(", ");
171  output.append(info_.getOrderingExprs().get(i).toSql() + " ");
172  output.append(info_.getIsAscOrder().get(i) ? "ASC" : "DESC");
173 
174  Boolean nullsFirstParam = info_.getNullsFirstParams().get(i);
175  if (nullsFirstParam != null) {
176  output.append(nullsFirstParam ? " NULLS FIRST" : " NULLS LAST");
177  }
178  }
179  output.append("\n");
180  }
181  return output.toString();
182  }
183 
184  private String getNodeExplainDetail(TExplainLevel detailLevel) {
185  if (!hasLimit()) return "";
186  if (hasOffset()) {
187  return String.format(" [LIMIT=%s OFFSET=%s]", limit_, offset_);
188  } else {
189  return String.format(" [LIMIT=%s]", limit_);
190  }
191  }
192 
193  @Override
194  protected String getOffsetExplainString(String prefix) {
195  return offset_ != 0 ? prefix + "offset: " + Long.toString(offset_) + "\n" : "";
196  }
197 
198  @Override
199  public void computeCosts(TQueryOptions queryOptions) {
200  Preconditions.checkState(hasValidStats());
201  if (useTopN_) {
202  perHostMemCost_ = (long) Math.ceil((cardinality_ + offset_) * avgRowSize_);
203  return;
204  }
205 
206  // For an external sort, set the memory cost to be what is required for a 2-phase
207  // sort. If the input to be sorted would take up N blocks in memory, then the
208  // memory required for a 2-phase sort is sqrt(N) blocks. A single run would be of
209  // size sqrt(N) blocks, and we could merge sqrt(N) such runs with sqrt(N) blocks
210  // of memory.
211  double fullInputSize = getChild(0).cardinality_ * avgRowSize_;
212  boolean hasVarLenSlots = false;
213  for (SlotDescriptor slotDesc: info_.getSortTupleDescriptor().getSlots()) {
214  if (slotDesc.isMaterialized() && !slotDesc.getType().isFixedLengthType()) {
215  hasVarLenSlots = true;
216  break;
217  }
218  }
219 
220  // The block size used by the sorter is the same as the configured I/O read size.
221  long blockSize = BackendConfig.INSTANCE.getReadSize();
222  // The external sorter writes fixed-len and var-len data in separate sequences of
223  // blocks on disk and reads from both sequences when merging. This effectively
224  // doubles the block size when there are var-len columns present.
225  if (hasVarLenSlots) blockSize *= 2;
226  double numInputBlocks = Math.ceil(fullInputSize / blockSize);
227  perHostMemCost_ = blockSize * (long) Math.ceil(Math.sqrt(numInputBlocks));
228  }
229 
230  private static String getDisplayName(boolean isTopN, boolean isMergeOnly) {
231  if (isTopN) {
232  return "TOP-N";
233  } else {
234  return "SORT";
235  }
236  }
237 }
void computeStats(Analyzer analyzer)
Definition: SortNode.java:127
static String getDisplayName(boolean isTopN, boolean isMergeOnly)
Definition: SortNode.java:230
static List< TExpr > treesToThrift(List<?extends Expr > exprs)
Definition: Expr.java:515
String getNodeExplainDetail(TExplainLevel detailLevel)
Definition: SortNode.java:184
void setInputPartition(DataPartition inputPartition)
Definition: SortNode.java:80
void assignConjuncts(Analyzer analyzer)
Definition: PlanNode.java:401
void computeCosts(TQueryOptions queryOptions)
Definition: SortNode.java:199
ExprSubstitutionMap getCombinedChildSmap()
Definition: PlanNode.java:410
SortNode(PlanNodeId id, PlanNode input, SortInfo info, boolean useTopN, long offset)
Definition: SortNode.java:65
void init(Analyzer analyzer)
Definition: SortNode.java:91
String getNodeExplainString(String prefix, String detailPrefix, TExplainLevel detailLevel)
Definition: SortNode.java:162
TupleDescriptor getSortTupleDescriptor()
Definition: SortInfo.java:63
void computeMemLayout(Analyzer analyzer)
Definition: PlanNode.java:475
String getOffsetExplainString(String prefix)
Definition: SortNode.java:194
long capAtLimit(long cardinality)
Definition: PlanNode.java:450
uint8_t offset[7 *64-sizeof(uint64_t)]
ExprSubstitutionMap outputSmap_
Definition: PlanNode.java:93