Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ExchangeNode.java
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.planner;
16 
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19 
24 import com.cloudera.impala.thrift.TExchangeNode;
25 import com.cloudera.impala.thrift.TExplainLevel;
26 import com.cloudera.impala.thrift.TPlanNode;
27 import com.cloudera.impala.thrift.TPlanNodeType;
28 import com.cloudera.impala.thrift.TSortInfo;
29 import com.google.common.base.Preconditions;
30 import com.google.common.collect.Lists;
31 import com.google.common.collect.Sets;
32 
45 public class ExchangeNode extends PlanNode {
46  private final static Logger LOG = LoggerFactory.getLogger(ExchangeNode.class);
47 
48  // The parameters based on which sorted input streams are merged by this
49  // exchange node. Null if this exchange does not merge sorted streams
51 
52  // Offset after which the exchange begins returning rows. Currently valid
53  // only if mergeInfo_ is non-null, i.e. this is a merging exchange node.
54  private long offset_;
55 
56  public ExchangeNode(PlanNodeId id) {
57  super(id, "EXCHANGE");
58  offset_ = 0;
59  }
60 
61  public void addChild(PlanNode node, boolean copyConjuncts) {
62  // This ExchangeNode 'inherits' several parameters from its children.
63  // Ensure that all children agree on them.
64  if (!children_.isEmpty()) {
65  Preconditions.checkState(limit_ == node.limit_);
66  Preconditions.checkState(tupleIds_.equals(node.tupleIds_));
67  Preconditions.checkState(nullableTupleIds_.equals(node.nullableTupleIds_));
68  } else {
69  // Only apply the limit at the receiver if there are multiple senders.
70  if (node.getFragment().isPartitioned()) limit_ = node.limit_;
71  tupleIds_ = Lists.newArrayList(node.tupleIds_);
72  nullableTupleIds_ = Sets.newHashSet(node.nullableTupleIds_);
73  }
74  if (copyConjuncts) conjuncts_.addAll(Expr.cloneList(node.conjuncts_));
75  children_.add(node);
76  }
77 
78  @Override
79  public void computeStats(Analyzer analyzer) {
80  Preconditions.checkState(!children_.isEmpty(),
81  "ExchangeNode must have at least one child");
82  cardinality_ = 0;
83  for (PlanNode child: children_) {
84  if (child.getCardinality() == -1) {
85  cardinality_ = -1;
86  break;
87  }
88  cardinality_ = addCardinalities(cardinality_, child.getCardinality());
89  }
90 
91  if (hasLimit()) {
92  if (cardinality_ == -1) {
94  } else {
95  cardinality_ = Math.min(limit_, cardinality_);
96  }
97  }
98 
99  // Apply the offset correction if there's a valid cardinality
100  if (cardinality_ > -1) {
101  cardinality_ = Math.max(0, cardinality_ - offset_);
102  }
103 
104  // Pick the max numNodes_ and avgRowSize_ of all children.
105  numNodes_ = Integer.MIN_VALUE;
106  avgRowSize_ = Integer.MIN_VALUE;
107  for (PlanNode child: children_) {
108  numNodes_ = Math.max(child.numNodes_, numNodes_);
109  avgRowSize_ = Math.max(child.avgRowSize_, avgRowSize_);
110  }
111  }
112 
117  public void setMergeInfo(SortInfo info, long offset) {
118  mergeInfo_ = info;
119  offset_ = offset;
120  displayName_ = "MERGING-EXCHANGE";
121  }
122 
123  @Override
124  protected String getNodeExplainString(String prefix, String detailPrefix,
125  TExplainLevel detailLevel) {
126  StringBuilder output = new StringBuilder();
127  output.append(String.format("%s%s [%s]\n", prefix,
129 
130  if (offset_ > 0) {
131  output.append(detailPrefix + "offset: ").append(offset_).append("\n");
132  }
133 
134  if (mergeInfo_ != null && detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
135  output.append(detailPrefix + "order by: ");
136  for (int i = 0; i < mergeInfo_.getOrderingExprs().size(); ++i) {
137  if (i > 0) output.append(", ");
138  output.append(mergeInfo_.getOrderingExprs().get(i).toSql() + " ");
139  output.append(mergeInfo_.getIsAscOrder().get(i) ? "ASC" : "DESC");
140 
141  Boolean nullsFirstParam = mergeInfo_.getNullsFirstParams().get(i);
142  if (nullsFirstParam != null) {
143  output.append(nullsFirstParam ? " NULLS FIRST" : " NULLS LAST");
144  }
145  }
146  output.append("\n");
147  }
148  return output.toString();
149  }
150 
151  @Override
152  protected String getDisplayLabelDetail() {
153  // For the non-fragmented explain levels, print the data partition
154  // of the data stream sink that sends to this exchange node.
155  Preconditions.checkState(!children_.isEmpty());
156  DataSink sink = getChild(0).getFragment().getSink();
157  if (sink == null) return "";
158  Preconditions.checkState(sink instanceof DataStreamSink);
159  DataStreamSink streamSink = (DataStreamSink) sink;
160  if (!streamSink.getOutputPartition().isPartitioned() &&
162  // If the output of the sink is not partitioned but the target fragment is
163  // partitioned, then the data exchange is broadcast.
164  return "BROADCAST";
165  } else {
166  return streamSink.getOutputPartition().getExplainString();
167  }
168  }
169 
170  @Override
171  protected void toThrift(TPlanNode msg) {
172  Preconditions.checkState(!children_.isEmpty(),
173  "ExchangeNode must have at least one child");
174  msg.node_type = TPlanNodeType.EXCHANGE_NODE;
175  msg.exchange_node = new TExchangeNode();
176  for (TupleId tid: tupleIds_) {
177  msg.exchange_node.addToInput_row_tuples(tid.asInt());
178  }
179 
180  if (mergeInfo_ != null) {
181  TSortInfo sortInfo = new TSortInfo(
183  mergeInfo_.getNullsFirst());
184  msg.exchange_node.setSort_info(sortInfo);
185  msg.exchange_node.setOffset(offset_);
186  }
187  }
188 }
String getNodeExplainString(String prefix, String detailPrefix, TExplainLevel detailLevel)
static List< TExpr > treesToThrift(List<?extends Expr > exprs)
Definition: Expr.java:515
ArrayList< TupleId > tupleIds_
Definition: PlanNode.java:74
void addChild(PlanNode node, boolean copyConjuncts)
int TupleId
Definition: global-types.h:23
void setMergeInfo(SortInfo info, long offset)
uint8_t offset[7 *64-sizeof(uint64_t)]
static long addCardinalities(long a, long b)
Definition: PlanNode.java:528