Impala
Impala is the open source, native analytic database for Apache Hadoop.
HdfsTableSink.java
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.cloudera.impala.planner;

import java.util.List;

import com.cloudera.impala.analysis.Expr;
import com.cloudera.impala.catalog.HdfsFileFormat;
import com.cloudera.impala.catalog.HdfsTable;
import com.cloudera.impala.catalog.Table;
import com.cloudera.impala.common.PrintUtils;
import com.cloudera.impala.thrift.TDataSink;
import com.cloudera.impala.thrift.TDataSinkType;
import com.cloudera.impala.thrift.TExplainLevel;
import com.cloudera.impala.thrift.THdfsTableSink;
import com.cloudera.impala.thrift.TTableSink;
import com.cloudera.impala.thrift.TTableSinkType;
import com.google.common.base.Preconditions;

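/**
 * Sink for writing query results into an HDFS table, optionally into specific
 * partitions computed from the partition-key exprs, and optionally overwriting
 * any existing data.
 */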
public class HdfsTableSink extends TableSink {
  // Default number of partitions used for computeCosts() in the absence of column stats.
  protected final long DEFAULT_NUM_PARTITIONS = 10;

  // Exprs for computing the output partition(s).
  protected final List<Expr> partitionKeyExprs_;
  // Whether to overwrite the existing partition(s).
  protected final boolean overwrite_;

  public HdfsTableSink(Table targetTable, List<Expr> partitionKeyExprs,
      boolean overwrite) {
    super(targetTable);
    Preconditions.checkState(targetTable instanceof HdfsTable);
    partitionKeyExprs_ = partitionKeyExprs;
    overwrite_ = overwrite;
  }

  @Override
  public void computeCosts() {
    // The constructor guarantees that targetTable_ is an HdfsTable.
    HdfsTable table = (HdfsTable) targetTable_;
    // TODO: Estimate the memory requirements more accurately by partition type.
    HdfsFileFormat format = table.getMajorityFormat();
    PlanNode inputNode = fragment_.getPlanRoot();
    int numNodes = fragment_.getNumNodes();
    // Compute the per-host number of partitions, taking the number of nodes
    // and the data partition of the fragment executing this sink into account.
    long numPartitions = fragment_.getNumDistinctValues(partitionKeyExprs_);
    if (numPartitions == -1) numPartitions = DEFAULT_NUM_PARTITIONS;
    long perPartitionMemReq = getPerPartitionMemReq(format);

    // The estimate is based purely on the per-partition mem req if the input
    // cardinality or the avg row size is unknown.
    if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) {
      perHostMemCost_ = numPartitions * perPartitionMemReq;
      return;
    }

    // The per-partition estimate may be higher than the memory required to buffer
    // the entire input data.
    long perHostInputCardinality = Math.max(1L, inputNode.getCardinality() / numNodes);
    long perHostInputBytes =
        (long) Math.ceil(perHostInputCardinality * inputNode.getAvgRowSize());
    perHostMemCost_ = Math.min(perHostInputBytes, numPartitions * perPartitionMemReq);
  }
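
  // Worked example with hypothetical numbers: for a Parquet sink with no column
  // stats, numPartitions falls back to DEFAULT_NUM_PARTITIONS = 10 and
  // perPartitionMemReq is 1GB, giving a 10GB per-host estimate. If instead the
  // input is known to be 1M rows of 100 bytes each across 10 nodes, then
  // perHostInputBytes = (1000000 / 10) * 100 = 10^7 bytes (~10MB), and the
  // final estimate is min(~10MB, 10GB) = ~10MB.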

  /**
   * Returns the estimated per-partition memory requirement, in bytes, for writing
   * to the given file format.
   */
  private long getPerPartitionMemReq(HdfsFileFormat format) {
    switch (format) {
      // Writing to a Parquet table requires up to 1GB of buffer per partition.
      // TODO: The per-partition memory requirement is configurable in the QueryOptions.
      case PARQUET: return 1024L * 1024L * 1024L;
      case TEXT: return 100L * 1024L;
      default:
        Preconditions.checkState(false, "Unsupported TableSink format " +
            format.toString());
    }
    return 0;
  }

  @Override
  public String getExplainString(String prefix, String detailPrefix,
      TExplainLevel explainLevel) {
    StringBuilder output = new StringBuilder();
    String overwriteStr = ", OVERWRITE=" + (overwrite_ ? "true" : "false");
    String partitionKeyStr = "";
    if (!partitionKeyExprs_.isEmpty()) {
      StringBuilder tmpBuilder = new StringBuilder(", PARTITION-KEYS=(");
      for (Expr expr: partitionKeyExprs_) {
        tmpBuilder.append(expr.toSql() + ",");
      }
      tmpBuilder.deleteCharAt(tmpBuilder.length() - 1);
      tmpBuilder.append(")");
      partitionKeyStr = tmpBuilder.toString();
    }
    output.append(String.format("%sWRITE TO HDFS [%s%s%s]\n", prefix,
        targetTable_.getFullName(), overwriteStr, partitionKeyStr));
    // Report the total number of partitions, independent of the number of nodes
    // and the data partition of the fragment executing this sink.
    if (explainLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
      long totalNumPartitions = Expr.getNumDistinctValues(partitionKeyExprs_);
      if (totalNumPartitions == -1) {
        output.append(detailPrefix + "partitions=unavailable");
      } else {
        output.append(detailPrefix + "partitions="
            + (totalNumPartitions == 0 ? 1 : totalNumPartitions));
      }
      output.append("\n");
      if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
        output.append(PrintUtils.printHosts(detailPrefix, fragment_.getNumNodes()));
        output.append(PrintUtils.printMemCost(" ", perHostMemCost_));
        output.append("\n");
      }
    }
    return output.toString();
  }
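
  // Example of the output produced above, assuming a hypothetical table
  // functional.alltypes partitioned by (year, month), column stats reporting
  // 24 distinct partition-key combinations, empty prefixes, and an explain
  // level above MINIMAL:
  //
  //   WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
  //   partitions=24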

  @Override
  protected TDataSink toThrift() {
    TDataSink result = new TDataSink(TDataSinkType.TABLE_SINK);
    THdfsTableSink hdfsTableSink = new THdfsTableSink(
        Expr.treesToThrift(partitionKeyExprs_), overwrite_);
    TTableSink tTableSink = new TTableSink(targetTable_.getId().asInt(),
        TTableSinkType.HDFS);
    tTableSink.hdfs_table_sink = hdfsTableSink;
    result.table_sink = tTableSink;
    return result;
  }
}
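
A minimal usage sketch, not part of the file above: targetTable, yearExpr, monthExpr,
and fragment are hypothetical stand-ins for objects the planner resolves elsewhere,
and setFragment() stands in for however the sink is attached to its executing fragment.

  List<Expr> partitionKeyExprs = Lists.newArrayList(yearExpr, monthExpr);
  HdfsTableSink sink = new HdfsTableSink(targetTable, partitionKeyExprs,
      /* overwrite */ true);
  sink.setFragment(fragment);   // computeCosts() reads the plan root and node count
  sink.computeCosts();          // fills in perHostMemCost_
  TDataSink thriftSink = sink.toThrift();  // serialized form sent to the backend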