Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PartitionStatsUtil.java
Go to the documentation of this file.
1 // Copyright 2014 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.catalog;
16 
import com.cloudera.impala.thrift.TPartitionStats;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.codec.binary.Base64;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
34 
39 public class PartitionStatsUtil {
40  public static final String INTERMEDIATE_STATS_NUM_CHUNKS =
41  "impala_intermediate_stats_num_chunks";
42 
43  public static final String INTERMEDIATE_STATS_CHUNK_PREFIX =
44  "impala_intermediate_stats_chunk";
45 
46  // HMS-imposed maximum length of a string parameter for a partition.
47  private static final int HMS_MAX_CHUNKLEN = 4000;
48 
49  private final static Logger LOG = LoggerFactory.getLogger(PartitionStatsUtil.class);
50 
56  public static TPartitionStats partStatsFromParameters(
57  Map<String, String> hmsParameters) throws ImpalaException {
58  if (hmsParameters == null) return null;
59  String numChunksStr = hmsParameters.get(INTERMEDIATE_STATS_NUM_CHUNKS);
60  if (numChunksStr == null) return null;
61  int numChunks = Integer.parseInt(numChunksStr);
62  if (numChunks == 0) return null;
63 
64  Preconditions.checkState(numChunks >= 0);
65  StringBuilder encodedStats = new StringBuilder();
66  for (int i = 0; i < numChunks; ++i) {
67  String chunk = hmsParameters.get(INTERMEDIATE_STATS_CHUNK_PREFIX + i);
68  if (chunk == null) {
69  throw new ImpalaRuntimeException("Missing stats chunk: " + i);
70  }
71  encodedStats.append(chunk);
72  }
73 
74  byte[] decodedStats = Base64.decodeBase64(encodedStats.toString());
75  TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory();
76  TPartitionStats ret = new TPartitionStats();
77  JniUtil.deserializeThrift(protocolFactory, ret, decodedStats);
78  return ret;
79  }
80 
84  public static void partStatsToParameters(TPartitionStats partStats,
85  HdfsPartition partition) {
86  // null stats means logically delete the stats from this partition
87  if (partStats == null) {
88  deletePartStats(partition);
89  return;
90  }
91 
92  // The HMS has a 4k (as of CDH5.2) limit on the length of any parameter string. The
93  // serialised version of the partition stats is often larger than this. Therefore, we
94  // naively 'chunk' the byte string into 4k pieces, and store the number of pieces in a
95  // separate parameter field.
96  //
97  // The object itself is first serialised by Thrift, and then base-64 encoded to be a
98  // valid string. This inflates its length somewhat; we may want to consider a
99  // different scheme or at least understand why this scheme doesn't seem much more
100  // effective than an ASCII representation.
101  try {
102  TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory();
103  TSerializer serializer = new TSerializer(protocolFactory);
104  byte[] serialized = serializer.serialize(partStats);
105  String base64 = new String(Base64.encodeBase64(serialized));
106  List<String> chunks = chunkStringForHms(base64, HMS_MAX_CHUNKLEN);
107  partition.putToParameters(INTERMEDIATE_STATS_NUM_CHUNKS,
108  Integer.toString(chunks.size()));
109  for (int i = 0; i < chunks.size(); ++i) {
110  partition.putToParameters(INTERMEDIATE_STATS_CHUNK_PREFIX + i, chunks.get(i));
111  }
112  } catch (TException e) {
113  LOG.info("Error saving partition stats: ", e);
114  // TODO: What to throw here?
115  }
116  }
117 
118  public static void deletePartStats(HdfsPartition partition) {
119  partition.putToParameters(INTERMEDIATE_STATS_NUM_CHUNKS, "0");
120  for (Iterator<String> it = partition.getParameters().keySet().iterator();
121  it.hasNext(); ) {
122  if (it.next().startsWith(INTERMEDIATE_STATS_CHUNK_PREFIX)) {
123  it.remove();
124  }
125  }
126  }
127 
128  static private List<String> chunkStringForHms(String data, int chunkLen) {
129  int idx = 0;
130  List<String> ret = Lists.newArrayList();
131  while (idx < data.length()) {
132  int remaining = data.length() - idx;
133  int chunkSize = (chunkLen > remaining) ? remaining : chunkLen;
134  ret.add(data.substring(idx, idx + chunkSize));
135  idx += chunkSize;
136  }
137  return ret;
138  }
139 }
static List< String > chunkStringForHms(String data, int chunkLen)
static TPartitionStats partStatsFromParameters(Map< String, String > hmsParameters)
static void partStatsToParameters(TPartitionStats partStats, HdfsPartition partition)
static void deletePartStats(HdfsPartition partition)