Impala
Impala is the open source, native analytic database for Apache Hadoop.
ComputeStatsStmt.java
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.cloudera.impala.analysis;

import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.log4j.Logger;

import com.cloudera.impala.authorization.Privilege;
import com.cloudera.impala.catalog.Column;
import com.cloudera.impala.catalog.HBaseTable;
import com.cloudera.impala.catalog.HdfsPartition;
import com.cloudera.impala.catalog.HdfsTable;
import com.cloudera.impala.catalog.Table;
import com.cloudera.impala.catalog.Type;
import com.cloudera.impala.catalog.View;
import com.cloudera.impala.common.AnalysisException;
import com.cloudera.impala.thrift.TComputeStatsParams;
import com.cloudera.impala.thrift.TPartitionStats;
import com.cloudera.impala.thrift.TTableName;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

public class ComputeStatsStmt extends StatementBase {
  private static final Logger LOG = Logger.getLogger(ComputeStatsStmt.class);

  private static String AVRO_SCHEMA_MSG_PREFIX = "Cannot COMPUTE STATS on Avro table " +
      "'%s' because its column definitions do not match those in the Avro schema.";
  private static String AVRO_SCHEMA_MSG_SUFFIX = "Please re-create the table with " +
      "column definitions, e.g., using the result of 'SHOW CREATE TABLE'";

  protected final TableName tableName_;

  // Set during analysis.
  protected Table table_;

  // The NULL count is not currently used in optimization or at run-time, and
  // compute stats runs 2x faster in many cases when not counting NULLs.
  private static final boolean COUNT_NULLS = false;

  // Query for getting the per-partition row count and the total row count.
  // Set during analysis.
  protected String tableStatsQueryStr_;

  // Query for getting the per-column NDVs and number of NULLs.
  // Set during analysis.
  protected String columnStatsQueryStr_;

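  // Illustrative example (hypothetical table, not from the original source): for a
  // table t(id INT, s STRING) partitioned by (p INT), a plain COMPUTE STATS would
  // generate child queries along the lines of:
  //   tableStatsQueryStr_:  SELECT COUNT(*), p FROM t GROUP BY p
  //   columnStatsQueryStr_: SELECT NDV(id) AS id, CAST(-1 as BIGINT), 4,
  //       CAST(4 as DOUBLE), NDV(s) AS s, CAST(-1 as BIGINT),
  //       MAX(length(s)), AVG(length(s)) FROM t
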
  // If true, stats will be gathered incrementally per-partition.
  private boolean isIncremental_ = false;

  // If true, expect the compute stats process to produce output for all partitions in the
  // target table (only meaningful, therefore, if partitioned). This is always true for
  // non-incremental computations. If set, expectedPartitions_ will be empty - the point
  // of this flag is to optimise the case where all partitions are targeted.
  private boolean expectAllPartitions_ = false;

  // The list of valid partition statistics that can be used in an incremental computation
  // without themselves being recomputed. Populated in analyze().
  private final List<TPartitionStats> validPartStats_ = Lists.newArrayList();

  // For incremental computations, the list of partitions (identified by list of partition
  // column values) that we expect to receive results for. Used to ensure that even empty
  // partitions emit results.
  // TODO: Consider using partition IDs (and adding them to the child queries with a
  // PARTITION_ID() builtin)
  private final List<List<String>> expectedPartitions_ = Lists.newArrayList();

  // If non-null, the partition that an incremental computation might apply to. Must be
  // null if this is a non-incremental computation.
  private PartitionSpec partitionSpec_ = null;

  // The maximum number of partitions that may be explicitly selected by filter
  // predicates. Any query that selects more than this automatically drops back to a full
  // incremental stats recomputation.
  // TODO: We can probably do better than this, e.g. running several queries, each of
  // which selects up to MAX_INCREMENTAL_PARTITIONS partitions.
  private static final int MAX_INCREMENTAL_PARTITIONS = 1000;

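  // Constructor for a plain (non-incremental) COMPUTE STATS over the whole table.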
  protected ComputeStatsStmt(TableName tableName) {
    this(tableName, false, null);
  }

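  // Constructor for COMPUTE [INCREMENTAL] STATS; 'partSpec' may only be non-null for
  // incremental computations, in which case stats are computed for a single partition.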
  protected ComputeStatsStmt(TableName tableName, boolean isIncremental,
      PartitionSpec partSpec) {
    Preconditions.checkState(tableName != null && !tableName.isEmpty());
    Preconditions.checkState(isIncremental || partSpec == null);
    this.tableName_ = tableName;
    this.table_ = null;
    this.isIncremental_ = isIncremental;
    this.partitionSpec_ = partSpec;
    if (partitionSpec_ != null) {
      partitionSpec_.setTableName(tableName);
      partitionSpec_.setPrivilegeRequirement(Privilege.ALTER);
    }
  }

  private void addPartitionCols(HdfsTable table, List<String> selectList,
      List<String> groupByCols) {
    for (int i = 0; i < table.getNumClusteringCols(); ++i) {
      String colRefSql = ToSqlUtils.getIdentSql(table.getColumns().get(i).getName());
      groupByCols.add(colRefSql);
      // For the select list, wrap the group by columns in a cast to string because
      // the Metastore stores them as strings.
      selectList.add(colRefSql);
    }
  }

  private List<String> getBaseColumnStatsQuerySelectList(Analyzer analyzer) {
    List<String> columnStatsSelectList = Lists.newArrayList();
    // For Hdfs tables, exclude partition columns from stats gathering because Hive
    // cannot store them as part of the non-partition column stats. For HBase tables,
    // include the single clustering column (the row key).
    int startColIdx = (table_ instanceof HBaseTable) ? 0 : table_.getNumClusteringCols();
    final String ndvUda = isIncremental_ ? "NDV_NO_FINALIZE" : "NDV";

    for (int i = startColIdx; i < table_.getColumns().size(); ++i) {
      Column c = table_.getColumns().get(i);
      Type type = c.getType();

      // Ignore columns with an invalid/unsupported type. For example, complex types in
      // an HBase-backed table will appear as invalid types.
      if (!type.isValid() || !type.isSupported() || c.getType().isComplexType()) {
        continue;
      }
      // NDV approximation function. Add explicit alias for later identification when
      // updating the Metastore.
      String colRefSql = ToSqlUtils.getIdentSql(c.getName());
      columnStatsSelectList.add(ndvUda + "(" + colRefSql + ") AS " + colRefSql);

      if (COUNT_NULLS) {
        // Count the number of NULL values.
        columnStatsSelectList.add("COUNT(IF(" + colRefSql + " IS NULL, 1, NULL))");
      } else {
        // Use -1 to indicate "unknown". We need the cast to BIGINT because the backend
        // expects an i64Val as the number of NULLs returned by the COMPUTE STATS column
        // stats child query. See CatalogOpExecutor::SetColumnStats(). If we do not cast,
        // then the -1 will be treated as TINYINT, resulting in a 0 being placed in the
        // #NULLs column (see IMPALA-1068).
        columnStatsSelectList.add("CAST(-1 as BIGINT)");
      }

      // For STRING columns also compute the max and avg string length.
      if (type.isStringType()) {
        columnStatsSelectList.add("MAX(length(" + colRefSql + "))");
        columnStatsSelectList.add("AVG(length(" + colRefSql + "))");
      } else {
        // For non-STRING columns we use the fixed size of the type.
        // We store the same information for all types to avoid having to
        // treat STRING columns specially in the BE CatalogOpExecutor.
        Integer typeSize = type.getPrimitiveType().getSlotSize();
        columnStatsSelectList.add(typeSize.toString());
        columnStatsSelectList.add("CAST(" + typeSize.toString() + " as DOUBLE)");
      }

      if (isIncremental_) {
        // Need the count in order to properly combine per-partition column stats.
        columnStatsSelectList.add("COUNT(" + colRefSql + ")");
      }
    }
    return columnStatsSelectList;
  }
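  // Illustrative note: in incremental mode the entries for a non-STRING column c are
  // roughly NDV_NO_FINALIZE(c) AS c, CAST(-1 as BIGINT), <slot size>,
  // CAST(<slot size> as DOUBLE), COUNT(c), so that intermediate NDV state and
  // per-partition row counts can be combined across partitions later.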

  @Override
  public void analyze(Analyzer analyzer) throws AnalysisException {
    table_ = analyzer.getTable(tableName_, Privilege.ALTER);
    String sqlTableName = table_.getTableName().toSql();
    if (table_ instanceof View) {
      throw new AnalysisException(String.format(
          "COMPUTE STATS not supported for view %s", sqlTableName));
    }

    if (!(table_ instanceof HdfsTable)) {
      if (partitionSpec_ != null) {
        throw new AnalysisException("COMPUTE INCREMENTAL ... PARTITION not supported " +
            "for non-HDFS table " + table_.getTableName());
      }
      isIncremental_ = false;
    }

    // Ensure that we write an entry for every partition if this isn't incremental.
    expectAllPartitions_ = !isIncremental_;

    HdfsTable hdfsTable = null;
    if (table_ instanceof HdfsTable) {
      hdfsTable = (HdfsTable)table_;
      if (isIncremental_ && hdfsTable.getNumClusteringCols() == 0 &&
          partitionSpec_ != null) {
        throw new AnalysisException(String.format(
            "Can't compute PARTITION stats on an unpartitioned table: %s",
            sqlTableName));
      } else if (partitionSpec_ != null) {
        partitionSpec_.setPartitionShouldExist();
        partitionSpec_.analyze(analyzer);
        for (PartitionKeyValue kv: partitionSpec_.getPartitionSpecKeyValues()) {
          // TODO: We could match the dynamic keys (i.e. as wildcards) as well, but that
          // would involve looping over all partitions and seeing which match the
          // partition spec.
          if (!kv.isStatic()) {
            throw new AnalysisException("All partition keys must have values: " +
                kv.toString());
          }
        }
      }
    }

    // Build partition filters that only select partitions without valid statistics for
    // incremental computation.
    List<String> filterPreds = Lists.newArrayList();
    if (isIncremental_) {
      if (partitionSpec_ == null) {
        // If any column does not have stats, we recompute statistics for all partitions
        // TODO: need a better way to invalidate stats for all partitions, so that we can
        // use this logic to only recompute new / changed columns.
        boolean tableIsMissingColStats = false;

        // We'll warn the user if a column is missing stats (and therefore we rescan the
        // whole table), but if all columns are missing stats, the table just doesn't have
        // any stats and there's no need to warn.
        boolean allColumnsMissingStats = true;
        String exampleColumnMissingStats = null;
        // Partition columns always have stats, so exclude them from this search.
        for (Column col: table_.getNonClusteringColumns()) {
          if (!col.getStats().hasStats()) {
            if (!tableIsMissingColStats) {
              tableIsMissingColStats = true;
              exampleColumnMissingStats = col.getName();
            }
          } else {
            allColumnsMissingStats = false;
          }
        }

        if (tableIsMissingColStats && !allColumnsMissingStats) {
          analyzer.addWarning("Column " + exampleColumnMissingStats +
              " does not have statistics, recomputing stats for the whole table");
        }

        for (HdfsPartition p: hdfsTable.getPartitions()) {
          if (p.isDefaultPartition()) continue;
          TPartitionStats partStats = p.getPartitionStats();
          if (!p.hasIncrementalStats() || tableIsMissingColStats) {
            if (partStats == null) LOG.trace(p.toString() + " does not have stats");
            if (!tableIsMissingColStats) filterPreds.add(p.getConjunctSql());
            List<String> partValues = Lists.newArrayList();
            for (LiteralExpr partValue: p.getPartitionValues()) {
              partValues.add(PartitionKeyValue.getPartitionKeyValueString(partValue,
                  "NULL"));
            }
            expectedPartitions_.add(partValues);
          } else {
            LOG.trace(p.toString() + " does have statistics");
            validPartStats_.add(partStats);
          }
        }
        if (expectedPartitions_.size() == hdfsTable.getPartitions().size() - 1) {
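          // The "- 1" accounts for the default partition, which getPartitions()
          // includes but which the loop above skips.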
          expectedPartitions_.clear();
          expectAllPartitions_ = true;
        }
      } else {
        // Always compute stats on a particular partition when told to.
        List<String> partitionConjuncts = Lists.newArrayList();
        for (PartitionKeyValue kv: partitionSpec_.getPartitionSpecKeyValues()) {
          partitionConjuncts.add(kv.toPredicateSql());
        }
        filterPreds.add("(" + Joiner.on(" AND ").join(partitionConjuncts) + ")");
        HdfsPartition targetPartition =
            hdfsTable.getPartition(partitionSpec_.getPartitionSpecKeyValues());
        for (HdfsPartition p: hdfsTable.getPartitions()) {
          if (p.isDefaultPartition()) continue;
          if (p == targetPartition) continue;
          TPartitionStats partStats = p.getPartitionStats();
          if (partStats != null) validPartStats_.add(partStats);
        }
      }

      if (filterPreds.size() == 0 && validPartStats_.size() != 0) {
        LOG.info("No partitions selected for incremental stats update");
        analyzer.addWarning("No partitions selected for incremental stats update");
        return;
      }
    }

    if (filterPreds.size() > MAX_INCREMENTAL_PARTITIONS) {
      // TODO: Consider simply running for MAX_INCREMENTAL_PARTITIONS partitions, and then
      // advising the user to iterate.
      analyzer.addWarning(
          "Too many partitions selected, doing full recomputation of incremental stats");
      filterPreds.clear();
      validPartStats_.clear();
    }

    List<String> groupByCols = Lists.newArrayList();
    List<String> partitionColsSelectList = Lists.newArrayList();
    // Only add group by clause for HdfsTables.
    if (hdfsTable != null) {
      if (hdfsTable.isAvroTable()) checkIncompleteAvroSchema(hdfsTable);
      addPartitionCols(hdfsTable, partitionColsSelectList, groupByCols);
    }

    // Query for getting the per-partition row count and the total row count.
    StringBuilder tableStatsQueryBuilder = new StringBuilder("SELECT ");
    List<String> tableStatsSelectList = Lists.newArrayList();
    tableStatsSelectList.add("COUNT(*)");

    tableStatsSelectList.addAll(partitionColsSelectList);
    tableStatsQueryBuilder.append(Joiner.on(", ").join(tableStatsSelectList));
    tableStatsQueryBuilder.append(" FROM " + sqlTableName);

    // Query for getting the per-column NDVs and number of NULLs.
    List<String> columnStatsSelectList = getBaseColumnStatsQuerySelectList(analyzer);

    if (isIncremental_) columnStatsSelectList.addAll(partitionColsSelectList);

    StringBuilder columnStatsQueryBuilder = new StringBuilder("SELECT ");
    columnStatsQueryBuilder.append(Joiner.on(", ").join(columnStatsSelectList));
    columnStatsQueryBuilder.append(" FROM " + sqlTableName);

    // Add the WHERE clause to filter out partitions that we don't want to compute
    // incremental stats for. While this is a win in most situations, we would like to
    // avoid this where it does no useful work (i.e. it selects all rows). This happens
    // when there are no existing valid partitions (so all partitions will have been
    // selected in) and there is no partition spec (so no single partition was explicitly
    // selected in).
    if (filterPreds.size() > 0 &&
        (validPartStats_.size() > 0 || partitionSpec_ != null)) {
      String filterClause = " WHERE " + Joiner.on(" OR ").join(filterPreds);
      columnStatsQueryBuilder.append(filterClause);
      tableStatsQueryBuilder.append(filterClause);
    }

    if (groupByCols.size() > 0) {
      String groupBy = " GROUP BY " + Joiner.on(", ").join(groupByCols);
      if (isIncremental_) columnStatsQueryBuilder.append(groupBy);
      tableStatsQueryBuilder.append(groupBy);
    }
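    // Illustrative example (hypothetical table, not from the original source): for
    // t(id INT, s STRING) partitioned by (p INT), an incremental run that must
    // recompute only partitions p=1 and p=2 would build roughly:
    //   SELECT COUNT(*), p FROM t WHERE (p=1) OR (p=2) GROUP BY p
    //   SELECT NDV_NO_FINALIZE(id) AS id, CAST(-1 as BIGINT), 4, CAST(4 as DOUBLE),
    //       COUNT(id), ..., p FROM t WHERE (p=1) OR (p=2) GROUP BY p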

    tableStatsQueryStr_ = tableStatsQueryBuilder.toString();
    LOG.debug("Table stats query: " + tableStatsQueryStr_);

    if (columnStatsSelectList.isEmpty()) {
      // Table doesn't have any columns that we can compute stats for.
      LOG.info("No supported column types in table " + table_.getTableName() +
          ", no column statistics will be gathered.");
      columnStatsQueryStr_ = null;
      return;
    }

    columnStatsQueryStr_ = columnStatsQueryBuilder.toString();
    LOG.debug("Column stats query: " + columnStatsQueryStr_);
  }

  // Checks that the column definitions of an Avro table match the columns derived from
  // its Avro schema, and throws an AnalysisException otherwise.
  private void checkIncompleteAvroSchema(HdfsTable table) throws AnalysisException {
    Preconditions.checkState(table.isAvroTable());
    org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable();
    // The column definitions from 'CREATE TABLE (column definitions) ...'
    Iterator<FieldSchema> colDefs = msTable.getSd().getCols().iterator();
    // The columns derived from the Avro schema file or literal schema.
    // Inconsistencies between the Avro-schema columns and the column definitions
    // are sometimes resolved in the CREATE TABLE, and sometimes not (see below).
    Iterator<Column> avroSchemaCols = table.getColumns().iterator();
    // Skip partition columns from 'table' since those are not present in
    // the msTable field schemas.
    for (int i = 0; i < table.getNumClusteringCols(); ++i) {
      if (avroSchemaCols.hasNext()) avroSchemaCols.next();
    }
    int pos = 0;
    while (colDefs.hasNext() || avroSchemaCols.hasNext()) {
      if (colDefs.hasNext() && avroSchemaCols.hasNext()) {
        FieldSchema colDef = colDefs.next();
        Column avroSchemaCol = avroSchemaCols.next();
        // Check that the column names are identical. Ignore mismatched types
        // as those will either fail in the scan or succeed.
        if (!colDef.getName().equalsIgnoreCase(avroSchemaCol.getName())) {
          throw new AnalysisException(
              String.format(AVRO_SCHEMA_MSG_PREFIX +
                  "\nDefinition of column '%s' of type '%s' does not match " +
                  "the Avro-schema column '%s' of type '%s' at position '%s'.\n" +
                  AVRO_SCHEMA_MSG_SUFFIX,
                  table.getName(), colDef.getName(), colDef.getType(),
                  avroSchemaCol.getName(), avroSchemaCol.getType(), pos));
        }
      }
      // The following two cases are typically not possible because Hive resolves
      // inconsistencies between the column-definition list and the Avro schema if a
      // column-definition list was given in the CREATE TABLE (having no column
      // definitions at all results in HIVE-6308). Even so, we check these cases for
      // extra safety. COMPUTE STATS could be made to succeed in special instances of
      // the cases below, but we chose to throw an AnalysisException to avoid confusion
      // because this scenario "should" never arise as mentioned above.
      if (colDefs.hasNext() && !avroSchemaCols.hasNext()) {
        FieldSchema colDef = colDefs.next();
        throw new AnalysisException(
            String.format(AVRO_SCHEMA_MSG_PREFIX +
                "\nMissing Avro-schema column corresponding to column " +
                "definition '%s' of type '%s' at position '%s'.\n" +
                AVRO_SCHEMA_MSG_SUFFIX,
                table.getName(), colDef.getName(), colDef.getType(), pos));
      }
      if (!colDefs.hasNext() && avroSchemaCols.hasNext()) {
        Column avroSchemaCol = avroSchemaCols.next();
        throw new AnalysisException(
            String.format(AVRO_SCHEMA_MSG_PREFIX +
                "\nMissing column definition corresponding to Avro-schema " +
                "column '%s' of type '%s' at position '%s'.\n" +
                AVRO_SCHEMA_MSG_SUFFIX,
                table.getName(), avroSchemaCol.getName(), avroSchemaCol.getType(), pos));
      }
      ++pos;
    }
  }

  public String getTblStatsQuery() { return tableStatsQueryStr_; }
  public String getColStatsQuery() { return columnStatsQueryStr_; }

  @Override
  public String toSql() {
    if (!isIncremental_) {
      return "COMPUTE STATS " + tableName_.toSql();
    } else {
      return "COMPUTE INCREMENTAL STATS " + tableName_.toSql() +
          (partitionSpec_ == null ? "" : partitionSpec_.toSql());
    }
  }
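  // For example, this produces statements like "COMPUTE STATS db.t" or
  // "COMPUTE INCREMENTAL STATS db.t PARTITION (p=1)" (the exact partition clause
  // depends on PartitionSpec.toSql()).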

  public TComputeStatsParams toThrift() {
    TComputeStatsParams params = new TComputeStatsParams();
    params.setTable_name(new TTableName(table_.getDb().getName(), table_.getName()));
    params.setTbl_stats_query(tableStatsQueryStr_);
    if (columnStatsQueryStr_ != null) {
      params.setCol_stats_query(columnStatsQueryStr_);
    } else {
      params.setCol_stats_queryIsSet(false);
    }

    params.setIs_incremental(isIncremental_);
    params.setExisting_part_stats(validPartStats_);
    params.setExpect_all_partitions(expectAllPartitions_);
    if (!expectAllPartitions_) params.setExpected_partitions(expectedPartitions_);
    if (isIncremental_) {
      params.setNum_partition_cols(((HdfsTable)table_).getNumClusteringCols());
    }
    return params;
  }
}