Impala
Impala is the open source, native analytic database for Apache Hadoop.
incr-stats-util.cc
// Copyright 2014 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "incr-stats-util.h"

#include <boost/foreach.hpp>
#include <boost/unordered_set.hpp>
#include <gutil/strings/substitute.h>
#include <cmath>
#include <sstream>

#include "common/logging.h"
#include "service/hs2-util.h"
#include "udf/udf.h"
#include "gen-cpp/CatalogService_types.h"
#include "gen-cpp/CatalogObjects_types.h"
#include "exprs/aggregate-functions.h"

#include "common/names.h"

using namespace apache::hive::service::cli::thrift;
using namespace impala;
using namespace impala_udf;
using namespace strings;

// Finalize method for the NDV_NO_FINALIZE() UDA, which only copies the intermediate
// state of the NDV computation into its output StringVal.
StringVal IncrementNdvFinalize(FunctionContext* ctx, const StringVal& src) {
  DCHECK(!src.is_null);
  DCHECK_EQ(src.len, AggregateFunctions::HLL_LEN);
  StringVal result_str(ctx, src.len);
  memcpy(result_str.ptr, src.ptr, src.len);
  ctx->Free(src.ptr);
  return result_str;
}

// To save space when sending NDV estimates around the cluster, we compress them using
// RLE, since they are often sparse. The resulting string has the form CVCVCVCV, where C
// is the count, i.e. the number of times the subsequent V (value) should be repeated in
// the output string. C is between 0 and 255 inclusive; the count it represents is one
// more than the value of C (since we never have a 0 count, and want to use the full
// range available to us).
//
// The output parameter is_encoded is set to true only if the RLE-compressed string is
// shorter than the input. Otherwise it is set to false, and the input is returned
// unencoded.
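//
// For example (ignoring the HLL_LEN length check), the 6-byte input
// "\x05\x05\x05\x02\x02\x02" would encode to the 4 bytes "\x02\x05\x02\x02":
// each count byte C means "repeat the next value C + 1 times".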
string EncodeNdv(const string& ndv, bool* is_encoded) {
  DCHECK_EQ(ndv.size(), AggregateFunctions::HLL_LEN);
  string encoded_ndv(AggregateFunctions::HLL_LEN, 0);
  int idx = 0;
  char last = ndv[0];

  // Keep a count of how many times a value appears in succession. We encode this count
  // as a byte 0-255, but the actual count is always one more than the encoded value
  // (i.e. in the range 1-256 inclusive).
  uint8_t count = 0;
  for (int i = 1; i < AggregateFunctions::HLL_LEN; ++i) {
    if (ndv[i] != last || count == numeric_limits<uint8_t>::max()) {
      if (idx + 2 > AggregateFunctions::HLL_LEN) break;
      // Write a (count, value) pair to two successive bytes
      encoded_ndv[idx++] = count;
      count = 0;
      encoded_ndv[idx++] = last;
      last = ndv[i];
    } else {
      ++count;
    }
  }

  // +2 for the remaining two bytes written below
  if (idx + 2 > AggregateFunctions::HLL_LEN) {
    *is_encoded = false;
    return ndv;
  }

  encoded_ndv[idx++] = count;
  encoded_ndv[idx++] = last;

  *is_encoded = true;
  encoded_ndv.resize(idx);
  DCHECK_GT(encoded_ndv.size(), 0);
  DCHECK_LE(encoded_ndv.size(), AggregateFunctions::HLL_LEN);
  return encoded_ndv;
}

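// Inverse of EncodeNdv(): expands (count, value) pairs back into an HLL_LEN-byte
// intermediate string, so that DecodeNdv(EncodeNdv(s, &enc), enc) == s.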
string DecodeNdv(const string& ndv, bool is_encoded) {
  if (!is_encoded) return ndv;
  DCHECK_EQ(ndv.size() % 2, 0);
  string decoded_ndv(AggregateFunctions::HLL_LEN, 0);
  int idx = 0;
  for (int i = 0; i < ndv.size(); i += 2) {
    for (int j = 0; j < (static_cast<uint8_t>(ndv[i])) + 1; ++j) {
      decoded_ndv[idx++] = ndv[i + 1];
    }
  }
  DCHECK_EQ(idx, AggregateFunctions::HLL_LEN);
  return decoded_ndv;
}

// A container for statistics for a single column that are aggregated partition by
// partition during the incremental computation of column stats. The aggregations are
// updated during Update(), and the final statistics are computed by Finalize().
struct PerColumnStats {
  // Intermediate buckets for the HLL calculation. Should have length
  // AggregateFunctions::HLL_LEN.
  string intermediate_ndv;

  // The total number of nulls counted, or -1 for no sample.
  int64_t num_nulls;

  // The maximum width of the column, in bytes.
  int32_t max_width;

  // The total number of rows.
  int64_t num_rows;

  // The sum of avg_width * num_rows for each partition, so that avg_width can be
  // correctly computed during Finalize().
  double total_width;

  // Populated after Finalize(), the result of the HLL computation.
  int64_t ndv_estimate;

  // The average column width, in bytes (may have a non-integer value). Populated
  // after Finalize().
  double avg_width;

  PerColumnStats()
      : intermediate_ndv(AggregateFunctions::HLL_LEN, 0), num_nulls(-1),
        max_width(0), num_rows(0), total_width(0), avg_width(0) { }

  // Updates all aggregate statistics with a new set of measurements.
  void Update(const string& ndv, int64_t num_new_rows, double new_avg_width,
      int32_t max_new_width, int64_t num_new_nulls) {
    DCHECK_EQ(intermediate_ndv.size(), ndv.size()) << "Incompatible intermediate NDVs";
    DCHECK_GE(num_new_rows, 0);
    DCHECK_GE(max_new_width, 0);
    DCHECK_GE(new_avg_width, 0);
    DCHECK_GE(num_new_nulls, -1);
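    // HLL intermediate states merge by taking the per-bucket maximum.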
    for (int j = 0; j < ndv.size(); ++j) {
      intermediate_ndv[j] = ::max(intermediate_ndv[j], ndv[j]);
    }
    if (num_new_nulls >= 0) num_nulls += num_new_nulls;
    max_width = ::max(max_width, max_new_width);
    total_width += (new_avg_width * num_new_rows);
    num_rows += num_new_rows;
  }

  // Performs any stats computations that are not distributive, that is, they may not
  // be computed in part during Update(). After this method returns, ndv_estimate and
  // avg_width contain valid values.
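  //
  // For example, two partitions with (num_rows, avg_width) of (10, 4.0) and (30, 8.0)
  // accumulate total_width = 10 * 4.0 + 30 * 8.0 = 280, so the weighted average width
  // is 280 / 40 = 7.0.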
  void Finalize() {
    ndv_estimate = AggregateFunctions::HllFinalEstimate(
        reinterpret_cast<const uint8_t*>(intermediate_ndv.data()),
        intermediate_ndv.size());
    avg_width = num_rows == 0 ? 0 : total_width / num_rows;
  }

  TColumnStats ToTColumnStats() const {
    TColumnStats col_stats;
    col_stats.__set_num_distinct_values(ndv_estimate);
    col_stats.__set_num_nulls(num_nulls);
    col_stats.__set_max_size(max_width);
    col_stats.__set_avg_size(avg_width);
    return col_stats;
  }

  // Returns a string with debug information for this object.
  string DebugString() const {
    return Substitute(
        "ndv: $0, num_nulls: $1, max_width: $2, avg_width: $3, num_rows: $4",
        ndv_estimate, num_nulls, max_width, avg_width, num_rows);
  }
};

namespace impala {

void FinalizePartitionedColumnStats(const TTableSchema& col_stats_schema,
    const vector<TPartitionStats>& existing_part_stats,
    const vector<vector<string> >& expected_partitions, const TRowSet& rowset,
    int32_t num_partition_cols, TAlterTableUpdateStatsParams* params) {
  // The rowset should have the following schema: for every column in the source table,
  // five columns are produced, one row per partition.
  // <ndv buckets>, <num nulls>, <max width>, <avg width>, <count rows>
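  //
  // For example, a table with two source columns and one partition column produces
  // rows of 11 values: five stats for each source column, followed by the partition
  // key value, so num_cols below evaluates to (11 - 1) / 5 = 2.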
  static const int COLUMNS_PER_STAT = 5;

  const int num_cols =
      (col_stats_schema.columns.size() - num_partition_cols) / COLUMNS_PER_STAT;
  unordered_set<vector<string> > seen_partitions;
  vector<PerColumnStats> stats(num_cols);

  if (rowset.rows.size() > 0) {
    DCHECK_GE(rowset.rows[0].colVals.size(), COLUMNS_PER_STAT);
    params->__isset.partition_stats = true;
    BOOST_FOREACH(const TRow& col_stats_row, rowset.rows) {
      // The last few columns are partition columns that the results are grouped by,
      // and so uniquely identify the partition that these stats belong to.
      vector<string> partition_key_vals;
      partition_key_vals.reserve(col_stats_row.colVals.size());
      for (int j = num_cols * COLUMNS_PER_STAT; j < col_stats_row.colVals.size(); ++j) {
        stringstream ss;
        PrintTColumnValue(col_stats_row.colVals[j], &ss);
        partition_key_vals.push_back(ss.str());
      }
      seen_partitions.insert(partition_key_vals);

      TPartitionStats* part_stat = &params->partition_stats[partition_key_vals];
      part_stat->__isset.intermediate_col_stats = true;
      for (int i = 0; i < num_cols * COLUMNS_PER_STAT; i += COLUMNS_PER_STAT) {
        PerColumnStats* stat = &stats[i / COLUMNS_PER_STAT];
        const string& ndv = col_stats_row.colVals[i].stringVal.value;
        int64_t num_rows = col_stats_row.colVals[i + 4].i64Val.value;
        double avg_width = col_stats_row.colVals[i + 3].doubleVal.value;
        int32_t max_width = col_stats_row.colVals[i + 2].i32Val.value;
        int64_t num_nulls = col_stats_row.colVals[i + 1].i64Val.value;

        stat->Update(ndv, num_rows, avg_width, max_width, num_nulls);

        // Save the intermediate state per-column, per-partition
        TIntermediateColumnStats int_stats;
        bool is_encoded;
        int_stats.__set_intermediate_ndv(EncodeNdv(ndv, &is_encoded));
        int_stats.__set_is_ndv_encoded(is_encoded);
        int_stats.__set_num_nulls(num_nulls);
        int_stats.__set_max_width(max_width);
        int_stats.__set_avg_width(avg_width);
        int_stats.__set_num_rows(num_rows);

        part_stat->intermediate_col_stats[col_stats_schema.columns[i].columnName] =
            int_stats;
      }
    }
  }

  // Make sure there's a zeroed entry for all partitions that were included in the
  // query: empty partitions will not have a row in the GROUP BY, but should still
  // emit a TPartitionStats.
  TIntermediateColumnStats empty_column_stats;
  bool is_encoded;
  empty_column_stats.__set_intermediate_ndv(
      EncodeNdv(string(AggregateFunctions::HLL_LEN, 0), &is_encoded));
  empty_column_stats.__set_is_ndv_encoded(is_encoded);
  empty_column_stats.__set_num_nulls(0);
  empty_column_stats.__set_max_width(0);
  empty_column_stats.__set_avg_width(0);
  empty_column_stats.__set_num_rows(0);
  TPartitionStats empty_part_stats;
  for (int i = 0; i < num_cols * COLUMNS_PER_STAT; i += COLUMNS_PER_STAT) {
    empty_part_stats.intermediate_col_stats[col_stats_schema.columns[i].columnName] =
        empty_column_stats;
  }
  empty_part_stats.__isset.intermediate_col_stats = true;
  TTableStats empty_table_stats;
  empty_table_stats.__set_num_rows(0);
  empty_part_stats.stats = empty_table_stats;
  BOOST_FOREACH(const vector<string>& part_key_vals, expected_partitions) {
    DCHECK_EQ(part_key_vals.size(), num_partition_cols);
    if (seen_partitions.find(part_key_vals) != seen_partitions.end()) continue;
    params->partition_stats[part_key_vals] = empty_part_stats;
  }

  // Now aggregate the existing statistics. The FE will ensure that the set of
  // partitions accessed by the query and this list are disjoint and cover the entire
  // set of partitions.
  BOOST_FOREACH(const TPartitionStats& existing_stats, existing_part_stats) {
    DCHECK_LE(existing_stats.intermediate_col_stats.size(),
        col_stats_schema.columns.size());
    for (int i = 0; i < num_cols; ++i) {
      const string& col_name =
          col_stats_schema.columns[i * COLUMNS_PER_STAT].columnName;
      map<string, TIntermediateColumnStats>::const_iterator it =
          existing_stats.intermediate_col_stats.find(col_name);
      if (it == existing_stats.intermediate_col_stats.end()) {
        VLOG(2) << "Could not find column in existing column stat state: " << col_name;
        continue;
      }

      const TIntermediateColumnStats& int_stats = it->second;
      stats[i].Update(DecodeNdv(int_stats.intermediate_ndv, int_stats.is_ndv_encoded),
          int_stats.num_rows, int_stats.avg_width, int_stats.max_width,
          int_stats.num_nulls);
    }
  }

  // Compute the final results now that all aggregations are done, and save those as
  // column stats for each column in turn.
  for (int i = 0; i < stats.size(); ++i) {
    stats[i].Finalize();
    const string& col_name = col_stats_schema.columns[i * COLUMNS_PER_STAT].columnName;
    params->column_stats[col_name] = stats[i].ToTColumnStats();

    VLOG(3) << "Incremental stats result for column: " << col_name << ": "
            << stats[i].DebugString();
  }

  params->__isset.column_stats = true;
}

} // namespace impala