17 #include <boost/foreach.hpp>
18 #include <boost/unordered_set.hpp>
19 #include <gutil/strings/substitute.h>
26 #include "gen-cpp/CatalogService_types.h"
27 #include "gen-cpp/CatalogObjects_types.h"
32 using namespace apache::hive::service::cli::thrift;
33 using namespace impala;
34 using namespace impala_udf;
35 using namespace strings;
// Run-length encodes an HLL intermediate NDV buffer as (repeat-count, value)
// byte pairs; *is_encoded reports whether encoding was actually applied.
// NOTE(review): interior lines of this function are missing from this chunk
// (the declarations of 'encoded_ndv', 'idx', 'count', 'last' and the loop
// header are not visible); comments below cover only the visible statements.
58 string EncodeNdv(
const string& ndv,
bool* is_encoded) {
// Emit a pair when the current run ends, or when 'count' would overflow the
// uint8_t repeat counter.
69 if (ndv[i] != last || count == numeric_limits<uint8_t>::max()) {
// Give up on encoding once the output would reach the unencoded HLL buffer
// length — presumably encoding must save space to be worthwhile (TODO confirm
// against the caller's handling of *is_encoded).
70 if (idx + 2 > AggregateFunctions::HLL_LEN)
break;
72 encoded_ndv[idx++] =
count;
74 encoded_ndv[idx++] = last;
// Flush the final run, unless it too would exceed the buffer length.
82 if (idx + 2 > AggregateFunctions::HLL_LEN) {
87 encoded_ndv[idx++] =
count;
88 encoded_ndv[idx++] = last;
// Trim to the bytes actually written; a produced encoding is never empty.
91 encoded_ndv.resize(idx);
92 DCHECK_GT(encoded_ndv.size(), 0);
// Expands a run-length encoded NDV string (as produced by EncodeNdv()) back
// into the raw HLL intermediate buffer.
97 string DecodeNdv(
const string& ndv,
bool is_encoded) {
// Unencoded input passes through untouched.
98 if (!is_encoded)
return ndv;
// Encoded form is strictly a sequence of (count, value) byte pairs.
99 DCHECK_EQ(ndv.size() % 2, 0);
// NOTE(review): the declarations of 'decoded_ndv' and 'idx' are not visible
// in this chunk.
102 for (
int i = 0; i < ndv.size(); i += 2) {
// Byte i holds (repeat-count - 1); byte i+1 holds the value to repeat.
103 for (
int j = 0; j < (static_cast<uint8_t>(ndv[i])) + 1; ++j) {
104 decoded_ndv[idx++] = ndv[i+1];
// Constructor initializer-list tail: zero-initializes the aggregate counters.
// NOTE(review): the start of the initializer list is outside this view.
140 max_width(0), num_rows(0), avg_width(0) { }
// Merges one batch of per-column stats into this accumulator.
// ndv: an HLL intermediate buffer; must be the same length as
//   'intermediate_ndv' (merged below by taking the per-byte max).
// num_new_rows / new_avg_width / max_new_width: the batch's row count,
//   average width and maximum width.
// num_new_nulls: the batch's null count; -1 means "unavailable" and is
//   skipped (see the >= 0 guard below).
143 void Update(
const string& ndv, int64_t num_new_rows,
double new_avg_width,
144 int32_t max_new_width, int64_t num_new_nulls) {
145 DCHECK_EQ(intermediate_ndv.size(), ndv.size()) <<
"Incompatible intermediate NDVs";
146 DCHECK_GE(num_new_rows, 0);
147 DCHECK_GE(max_new_width, 0);
148 DCHECK_GE(new_avg_width, 0);
149 DCHECK_GE(num_new_nulls, -1);
// HLL buffers merge by taking the element-wise maximum.
150 for (
int j = 0; j < ndv.size(); ++j) {
151 intermediate_ndv[j] = ::max(intermediate_ndv[j], ndv[j]);
153 if (num_new_nulls >= 0) num_nulls += num_new_nulls;
154 max_width = ::max(max_width, max_new_width);
// avg_width temporarily accumulates the row-weighted sum; Finalize()
// divides by the total row count to recover the true average.
155 avg_width += (new_avg_width * num_new_rows);
156 num_rows += num_new_rows;
// Finalize fragment: derives the final NDV estimate from the merged HLL
// buffer and converts the row-weighted width sum into a true average.
163 ndv_estimate = AggregateFunctions::HllFinalEstimate(
164 reinterpret_cast<const uint8_t*>(intermediate_ndv.data()),
165 intermediate_ndv.size());
// Guard against division by zero when no rows were accumulated.
166 avg_width = num_rows == 0 ? 0 : avg_width / num_rows;
// Packs the finalized counters into a Thrift TColumnStats for the catalog.
// Call only after Finalize() — relies on 'ndv_estimate' and the averaged
// 'avg_width' it computes.
170 TColumnStats col_stats;
171 col_stats.__set_num_distinct_values(ndv_estimate);
172 col_stats.__set_num_nulls(num_nulls);
173 col_stats.__set_max_size(max_width);
174 col_stats.__set_avg_size(avg_width);
// DebugString fragment: Substitute() format for a human-readable dump of the
// accumulated counters (the surrounding call is outside this view).
181 "ndv: $0, num_nulls: $1, max_width: $2, avg_width: $3, num_rows: $4",
182 ndv_estimate, num_nulls, max_width, avg_width, num_rows);
// FinalizePartitionedColumnStats fragment: merges per-partition incremental
// stats rows from 'rowset' — plus previously persisted stats in
// 'existing_part_stats' — into table-wide column stats and per-partition
// intermediate stats on 'params'. Partitions in 'expected_partitions' that
// produced no rows receive zeroed stats.
// NOTE(review): the signature's opening line and several interior lines are
// missing from this chunk; code below is otherwise preserved verbatim.
189 const vector<TPartitionStats>& existing_part_stats,
190 const vector<vector<string> >& expected_partitions,
const TRowSet& rowset,
191 int32_t num_partition_cols, TAlterTableUpdateStatsParams* params) {
// Each column contributes 5 row values, indexed below as:
// [i]=ndv, [i+1]=num_nulls, [i+2]=max_width, [i+3]=avg_width, [i+4]=num_rows.
195 static const int COLUMNS_PER_STAT = 5;
198 (col_stats_schema.columns.size() - num_partition_cols) / COLUMNS_PER_STAT;
// Partitions that appear in 'rowset'; the rest get empty stats further down.
199 unordered_set<vector<string> > seen_partitions;
200 vector<PerColumnStats> stats(num_cols);
202 if (rowset.rows.size() > 0) {
203 DCHECK_GE(rowset.rows[0].colVals.size(), COLUMNS_PER_STAT);
204 params->__isset.partition_stats =
true;
205 BOOST_FOREACH(
const TRow& col_stats_row, rowset.rows) {
// The trailing columns of each row carry the partition key values.
208 vector<string> partition_key_vals;
209 partition_key_vals.reserve(col_stats_row.colVals.size());
210 for (
int j = num_cols * COLUMNS_PER_STAT; j < col_stats_row.colVals.size(); ++j) {
213 partition_key_vals.push_back(ss.str());
215 seen_partitions.insert(partition_key_vals);
// BUGFIX(encoding): '&params' had been corrupted to the mojibake '¶ms'
// (an '&para' HTML-entity decode); restored the address-of expression.
217 TPartitionStats* part_stat = &params->partition_stats[partition_key_vals];
218 part_stat->__isset.intermediate_col_stats =
true;
219 for (
int i = 0; i < num_cols * COLUMNS_PER_STAT; i += COLUMNS_PER_STAT) {
221 const string& ndv = col_stats_row.colVals[i].stringVal.value;
222 int64_t num_rows = col_stats_row.colVals[i + 4].i64Val.value;
223 double avg_width = col_stats_row.colVals[i + 3].doubleVal.value;
224 int32_t max_width = col_stats_row.colVals[i + 2].i32Val.value;
225 int64_t num_nulls = col_stats_row.colVals[i + 1].i64Val.value;
// Fold this partition's values into the table-wide accumulator.
227 stat->
Update(ndv, num_rows, avg_width, max_width, num_nulls);
// Persist this partition's intermediate state (NDV run-length encoded
// when that saves space — see EncodeNdv()).
230 TIntermediateColumnStats int_stats;
232 int_stats.__set_intermediate_ndv(
EncodeNdv(ndv, &is_encoded));
233 int_stats.__set_is_ndv_encoded(is_encoded);
234 int_stats.__set_num_nulls(num_nulls);
235 int_stats.__set_max_width(max_width);
236 int_stats.__set_avg_width(avg_width);
237 int_stats.__set_num_rows(num_rows);
239 part_stat->intermediate_col_stats[col_stats_schema.columns[i].columnName] =
// Build one shared empty-stats template for partitions that produced no rows,
// so the catalog's per-partition state stays complete.
248 TIntermediateColumnStats empty_column_stats;
250 empty_column_stats.__set_intermediate_ndv(
252 empty_column_stats.__set_is_ndv_encoded(is_encoded);
253 empty_column_stats.__set_num_nulls(0);
254 empty_column_stats.__set_max_width(0);
255 empty_column_stats.__set_avg_width(0);
256 empty_column_stats.__set_num_rows(0);
257 TPartitionStats empty_part_stats;
258 for (
int i = 0; i < num_cols * COLUMNS_PER_STAT; i += COLUMNS_PER_STAT) {
259 empty_part_stats.intermediate_col_stats[col_stats_schema.columns[i].columnName] =
262 empty_part_stats.__isset.intermediate_col_stats =
true;
263 TTableStats empty_table_stats;
264 empty_table_stats.__set_num_rows(0);
265 empty_part_stats.stats = empty_table_stats;
266 BOOST_FOREACH(
const vector<string>& part_key_vals, expected_partitions) {
267 DCHECK_EQ(part_key_vals.size(), num_partition_cols);
// Skip partitions that already received fresh stats above.
268 if (seen_partitions.find(part_key_vals) != seen_partitions.end())
continue;
269 params->partition_stats[part_key_vals] = empty_part_stats;
// Fold previously persisted stats (from partitions not rescanned this time)
// into the table-wide accumulators.
275 BOOST_FOREACH(
const TPartitionStats& existing_stats, existing_part_stats) {
276 DCHECK_LE(existing_stats.intermediate_col_stats.size(),
277 col_stats_schema.columns.size());
278 for (
int i = 0; i < num_cols; ++i) {
279 const string& col_name = col_stats_schema.columns[i * COLUMNS_PER_STAT].columnName;
280 map<string, TIntermediateColumnStats>::const_iterator it =
281 existing_stats.intermediate_col_stats.find(col_name);
282 if (it == existing_stats.intermediate_col_stats.end()) {
283 VLOG(2) <<
"Could not find column in existing column stat state: " << col_name;
287 const TIntermediateColumnStats& int_stats = it->second;
288 stats[i].Update(
DecodeNdv(int_stats.intermediate_ndv, int_stats.is_ndv_encoded),
289 int_stats.num_rows, int_stats.avg_width, int_stats.max_width,
290 int_stats.num_nulls);
// Emit the final table-wide per-column stats.
296 for (
int i = 0; i < stats.size(); ++i) {
298 const string& col_name = col_stats_schema.columns[i * COLUMNS_PER_STAT].columnName;
299 params->column_stats[col_name] = stats[i].ToTColumnStats();
301 VLOG(3) <<
"Incremental stats result for column: " << col_name <<
": "
302 << stats[i].DebugString();
305 params->__isset.column_stats =
true;
string EncodeNdv(const string &ndv, bool *is_encoded)
void PrintTColumnValue(const apache::hive::service::cli::thrift::TColumnValue &colval, std::stringstream *out)
string DecodeNdv(const string &ndv, bool is_encoded)
void Update(const string &ndv, int64_t num_new_rows, double new_avg_width, int32_t max_new_width, int64_t num_new_nulls)
void Free(uint8_t *buffer)
Frees a buffer returned from Allocate() or Reallocate()
StringVal IncrementNdvFinalize(FunctionContext *ctx, const StringVal &src)
string DebugString() const
TColumnStats ToTColumnStats() const
void FinalizePartitionedColumnStats(const TTableSchema &col_stats_schema, const vector< TPartitionStats > &existing_part_stats, const vector< vector< string > > &expected_partitions, const TRowSet &rowset, int32_t num_partition_cols, TAlterTableUpdateStatsParams *params)