Impala
Impala is the open source, native analytic database for Apache Hadoop.
HdfsCachingUtil.java
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.cloudera.impala.util;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.log4j.Logger;

import com.cloudera.impala.analysis.TableName;
import com.cloudera.impala.catalog.HdfsPartition;
import com.cloudera.impala.common.FileSystemUtil;
import com.cloudera.impala.common.ImpalaException;
import com.cloudera.impala.common.ImpalaRuntimeException;
import com.cloudera.impala.thrift.JniCatalogConstants;
import com.cloudera.impala.thrift.THdfsCachingOp;
import com.google.common.base.Preconditions;

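/**
 * Utility class for submitting, modifying, and removing HDFS cache directives,
 * and for maintaining the cache-related properties that Impala stores on Hive
 * metastore tables and partitions.
 */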
public class HdfsCachingUtil {
  private static final Logger LOG = Logger.getLogger(HdfsCachingUtil.class);

  // The key name used to save cache directive IDs in table/partition properties.
  private final static String CACHE_DIR_ID_PROP_NAME = "cache_directive_id";

  // The key name used to store the replication factor for cached files.
  private final static String CACHE_DIR_REPLICATION_PROP_NAME = "cache_replication";

  // The number of caching refresh intervals that can go by when waiting for data to
  // become cached before assuming no more progress is being made.
  private final static int MAX_UNCHANGED_CACHING_REFRESH_INTERVALS = 5;

  private final static DistributedFileSystem dfs;
  static {
    try {
      dfs = FileSystemUtil.getDistributedFileSystem();
    } catch (IOException e) {
      throw new RuntimeException("HdfsCachingUtil failed to initialize the " +
          "DistributedFileSystem: ", e);
    }
  }

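  /**
   * Caches the location of the given table in HDFS and updates the table's
   * properties with the resulting cache directive ID and replication factor.
   * Returns the ID of the new directive.
   */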
  public static long submitCacheTblDirective(
      org.apache.hadoop.hive.metastore.api.Table table,
      String poolName, short replication) throws ImpalaRuntimeException {
    long id = HdfsCachingUtil.submitDirective(new Path(table.getSd().getLocation()),
        poolName, replication);
    table.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
    table.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
    return id;
  }

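  /**
   * Caches the location of the given HdfsPartition and updates the partition's
   * properties with the resulting cache directive ID and replication factor.
   * Returns the ID of the new directive.
   */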
  public static long submitCachePartitionDirective(HdfsPartition part,
      String poolName, short replication) throws ImpalaRuntimeException {
    long id = HdfsCachingUtil.submitDirective(new Path(part.getLocation()),
        poolName, replication);
    part.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
    part.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
    return id;
  }

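  /**
   * Caches the location of the given Hive metastore Partition and updates its
   * properties with the resulting cache directive ID and replication factor.
   * Returns the ID of the new directive.
   */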
  public static long submitCachePartitionDirective(
      org.apache.hadoop.hive.metastore.api.Partition part,
      String poolName, short replication) throws ImpalaRuntimeException {
    long id = HdfsCachingUtil.submitDirective(new Path(part.getSd().getLocation()),
        poolName, replication);
    part.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
    part.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
    return id;
  }

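  /**
   * Removes the cache directive associated with the table from HDFS and
   * deletes the cache-related properties from the table. No-op if the table
   * has no cache directive ID property.
   */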
  public static void uncacheTbl(org.apache.hadoop.hive.metastore.api.Table table)
      throws ImpalaRuntimeException {
    Preconditions.checkNotNull(table);
    LOG.debug("Uncaching table: " + table.getDbName() + "." + table.getTableName());
    Long id = getCacheDirectiveId(table.getParameters());
    if (id == null) return;
    HdfsCachingUtil.removeDirective(id);
    table.getParameters().remove(CACHE_DIR_ID_PROP_NAME);
    table.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME);
  }

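  /**
   * Removes the cache directive associated with the HdfsPartition from HDFS
   * and deletes the cache-related properties from the partition. No-op if the
   * partition has no cache directive ID property.
   */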
  public static void uncachePartition(HdfsPartition part) throws ImpalaException {
    Preconditions.checkNotNull(part);
    Long id = getCacheDirectiveId(part.getParameters());
    if (id == null) return;
    HdfsCachingUtil.removeDirective(id);
    part.getParameters().remove(CACHE_DIR_ID_PROP_NAME);
    part.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME);
  }

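  /**
   * Removes the cache directive associated with the Hive metastore Partition
   * from HDFS and deletes the cache-related properties from it. No-op if the
   * partition has no cache directive ID property.
   */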
  public static void uncachePartition(
      org.apache.hadoop.hive.metastore.api.Partition part) throws ImpalaException {
    Preconditions.checkNotNull(part);
    Long id = getCacheDirectiveId(part.getParameters());
    if (id == null) return;
    HdfsCachingUtil.removeDirective(id);
    part.getParameters().remove(CACHE_DIR_ID_PROP_NAME);
    part.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME);
  }

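  /**
   * Returns the cache directive ID from the given table/partition parameter
   * map, or null if the map is null or contains no valid directive ID.
   */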
  public static Long getCacheDirectiveId(Map<String, String> params) {
    if (params == null) return null;
    String idStr = params.get(CACHE_DIR_ID_PROP_NAME);
    if (idStr == null) return null;
    try {
      return Long.parseLong(idStr);
    } catch (NumberFormatException e) {
      return null;
    }
  }

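  /**
   * Looks up the cache directive with the given ID and returns the name of
   * the pool it was submitted to, or null if the directive does not exist.
   */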
  public static String getCachePool(long directiveId)
      throws ImpalaRuntimeException {
    CacheDirectiveEntry entry = getDirective(directiveId);
    return entry == null ? null : entry.getInfo().getPool();
  }

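  /**
   * Looks up the cache directive with the given ID and returns its
   * replication factor, or null if the directive does not exist.
   */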
  public static Short getCacheReplication(long directiveId)
      throws ImpalaRuntimeException {
    CacheDirectiveEntry entry = getDirective(directiveId);
    return entry != null ? entry.getInfo().getReplication() : null;
  }

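  /**
   * Returns the replication factor stored in the given parameter map, falling
   * back to the default cache replication factor if the property is missing
   * or cannot be parsed.
   */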
  public static Short getCachedCacheReplication(Map<String, String> params) {
    Preconditions.checkNotNull(params);
    String replication = params.get(CACHE_DIR_REPLICATION_PROP_NAME);
    if (replication == null) {
      return JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
    }
    try {
      return Short.parseShort(replication);
    } catch (NumberFormatException e) {
      return JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
    }
  }

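  /**
   * Polls the stats of the given cache directive until all bytes are cached
   * or until no progress is observed for
   * MAX_UNCHANGED_CACHING_REFRESH_INTERVALS consecutive refresh intervals, in
   * which case a warning is logged and the method returns.
   */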
  public static void waitForDirective(long directiveId)
      throws ImpalaRuntimeException {
    long bytesNeeded = 0L;
    long currentBytesCached = 0L;
    CacheDirectiveEntry cacheDir = getDirective(directiveId);
    if (cacheDir == null) return;

    bytesNeeded = cacheDir.getStats().getBytesNeeded();
    currentBytesCached = cacheDir.getStats().getBytesCached();
    LOG.debug(String.format("Waiting on cache directive id: %d. Bytes " +
        "cached (%d) / needed (%d)", directiveId, currentBytesCached, bytesNeeded));
    // All the bytes are cached, just return.
    if (bytesNeeded == currentBytesCached) return;

    // The refresh interval is how often HDFS will update cache directive stats. We use
    // this value to determine how frequently we should poll for changes.
    long hdfsRefreshIntervalMs = dfs.getConf().getLong(
        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
    Preconditions.checkState(hdfsRefreshIntervalMs > 0);

    // Loop until either MAX_UNCHANGED_CACHING_REFRESH_INTERVALS have passed with no
    // changes or all required data is cached.
    int unchangedCounter = 0;
    while (unchangedCounter < MAX_UNCHANGED_CACHING_REFRESH_INTERVALS) {
      long previousBytesCached = currentBytesCached;
      cacheDir = getDirective(directiveId);
      if (cacheDir == null) return;
      currentBytesCached = cacheDir.getStats().getBytesCached();
      bytesNeeded = cacheDir.getStats().getBytesNeeded();
      if (currentBytesCached == bytesNeeded) {
        LOG.debug(String.format("Cache directive id: %d has completed. " +
            "Bytes cached (%d) / needed (%d)", directiveId, currentBytesCached,
            bytesNeeded));
        return;
      }

      if (currentBytesCached == previousBytesCached) {
        ++unchangedCounter;
      } else {
        unchangedCounter = 0;
      }
      try {
        // Sleep for the refresh interval + a little bit more to ensure a full interval
        // has completed. A value of 25% of the refresh interval was arbitrarily chosen.
        Thread.sleep((long) (hdfsRefreshIntervalMs * 1.25));
      } catch (InterruptedException e) { /* ignore */ }
    }
    LOG.warn(String.format("No changes in cached bytes in %d ms. All data may not " +
        "be cached. Final stats for cache directive id: %d. Bytes cached (%d)/needed " +
        "(%d)", hdfsRefreshIntervalMs * MAX_UNCHANGED_CACHING_REFRESH_INTERVALS,
        directiveId, currentBytesCached, bytesNeeded));
  }

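  /**
   * Submits a new cache directive for the given path in the given pool with
   * the given replication factor and returns the directive ID assigned by
   * HDFS. Throws ImpalaRuntimeException if the directive could not be added.
   */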
  private static long submitDirective(Path path, String poolName, short replication)
      throws ImpalaRuntimeException {
    Preconditions.checkNotNull(path);
    Preconditions.checkState(poolName != null && !poolName.isEmpty());
    CacheDirectiveInfo info = new CacheDirectiveInfo.Builder()
        .setExpiration(Expiration.NEVER)
        .setPool(poolName)
        .setReplication(replication)
        .setPath(path).build();
    LOG.debug("Submitting cache directive: " + info.toString());
    try {
      return dfs.addCacheDirective(info);
    } catch (IOException e) {
      throw new ImpalaRuntimeException(e.getMessage(), e);
    }
  }

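  /**
   * Updates the cache directive with the given ID to point at the table's
   * location with the given pool and replication factor, and refreshes the
   * table's cache-related properties. Returns the directive ID.
   */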
  public static long modifyCacheDirective(Long id,
      org.apache.hadoop.hive.metastore.api.Table table,
      String poolName, short replication) throws ImpalaRuntimeException {
    Preconditions.checkNotNull(id);
    HdfsCachingUtil.modifyCacheDirective(id, new Path(table.getSd().getLocation()),
        poolName, replication);
    table.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
    table.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
    return id;
  }

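  /**
   * Updates the cache directive with the given ID to point at the partition's
   * location with the given pool and replication factor, and refreshes the
   * partition's cache-related properties. Returns the directive ID.
   */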
  public static long modifyCacheDirective(Long id, HdfsPartition part, String poolName,
      short replication) throws ImpalaRuntimeException {
    Preconditions.checkNotNull(id);
    HdfsCachingUtil.modifyCacheDirective(id, new Path(part.getLocation()),
        poolName, replication);
    part.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
    part.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
    return id;
  }

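  /**
   * Modifies an existing cache directive to use the given path, pool, and
   * replication factor. Throws ImpalaRuntimeException on failure.
   */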
  private static void modifyCacheDirective(Long id, Path path, String poolName,
      short replication) throws ImpalaRuntimeException {
    Preconditions.checkNotNull(path);
    Preconditions.checkNotNull(id);
    Preconditions.checkState(poolName != null && !poolName.isEmpty());
    CacheDirectiveInfo info = new CacheDirectiveInfo.Builder()
        .setId(id)
        .setExpiration(Expiration.NEVER)
        .setPool(poolName)
        .setReplication(replication)
        .setPath(path).build();
    LOG.debug("Modifying cache directive: " + info.toString());
    try {
      dfs.modifyCacheDirective(info);
    } catch (IOException e) {
      throw new ImpalaRuntimeException(e.getMessage(), e);
    }
  }

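  /**
   * Removes the cache directive with the given ID from HDFS. Silently returns
   * if the directive no longer exists; any other failure is rethrown as an
   * ImpalaRuntimeException.
   */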
  private static void removeDirective(long directiveId) throws ImpalaRuntimeException {
    LOG.debug("Removing cache directive id: " + directiveId);
    try {
      dfs.removeCacheDirective(directiveId);
    } catch (IOException e) {
      // There is no special exception type for the case where a directive ID does not
      // exist, so we must inspect the error message.
      if (e.getMessage().contains("No directive with ID")) return;
      throw new ImpalaRuntimeException(e.getMessage(), e);
    }
  }

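  /**
   * Returns the cache directive entry matching the given ID. Throws an
   * ImpalaRuntimeException if HDFS cannot be reached or if the listing
   * unexpectedly returns no result.
   */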
  private static CacheDirectiveEntry getDirective(long directiveId)
      throws ImpalaRuntimeException {
    LOG.trace("Getting cache directive id: " + directiveId);
    CacheDirectiveInfo filter = new CacheDirectiveInfo.Builder()
        .setId(directiveId)
        .build();
    try {
      RemoteIterator<CacheDirectiveEntry> itr = dfs.listCacheDirectives(filter);
      if (itr.hasNext()) return itr.next();
    } catch (IOException e) {
      // Handle connection issues with HDFS as well as possible "not found" errors.
      throw new ImpalaRuntimeException(e.getMessage(), e);
    }
    throw new ImpalaRuntimeException(
        "HDFS cache directive filter returned an empty result. This should never happen.");
  }

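  /**
   * Returns true if the given pool name matches the pool of the cache
   * directive with the given ID.
   */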
  public static boolean isSamePool(String poolName, Long directiveId)
      throws ImpalaRuntimeException {
    return poolName.equals(getCachePool(directiveId));
  }

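  /**
   * Returns the replication factor set on the caching operation, or the
   * default cache replication factor if none was set.
   */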
  public static short getReplicationOrDefault(THdfsCachingOp op) {
    return op.isSetReplication() ? op.getReplication() :
        JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
  }

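  /**
   * Returns true if the given caching op modifies the existing cache
   * directive referenced by the parameter map, i.e. it targets the same pool
   * but requests a different replication factor. Returns false if the op
   * targets a different pool.
   */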
  public static boolean isUpdateOp(THdfsCachingOp op, Map<String, String> params)
      throws ImpalaRuntimeException {

    Long directiveId = Long.parseLong(params.get(CACHE_DIR_ID_PROP_NAME));
    CacheDirectiveEntry entry = getDirective(directiveId);
    Preconditions.checkNotNull(entry);

    // Verify cache pool
    if (!op.getCache_pool_name().equals(entry.getInfo().getPool())) {
      return false;
    }

    // Check cache replication factor
    if ((op.isSetReplication() && op.getReplication() !=
        entry.getInfo().getReplication()) || (!op.isSetReplication() &&
        entry.getInfo().getReplication() !=
        JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR)) {
      return true;
    }
    return false;
  }

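  /**
   * Validates that the pool requested by the caching op matches the pool of
   * the existing cache directive. Throws an ImpalaRuntimeException describing
   * how to uncache the table or partition if the pools differ.
   */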
  public static void validateCachePool(THdfsCachingOp op, Long directiveId,
      TableName table, HdfsPartition partition)
      throws ImpalaRuntimeException {

    CacheDirectiveEntry entry = getDirective(directiveId);
    Preconditions.checkNotNull(entry);

    if (!op.getCache_pool_name().equals(entry.getInfo().getPool())) {
      throw new ImpalaRuntimeException(String.format("Cannot cache partition in " +
          "pool '%s' because it is already cached in '%s'. To change the cache " +
          "pool for this partition, first uncache using: ALTER TABLE %s.%s " +
          "%sSET UNCACHED", op.getCache_pool_name(),
          entry.getInfo().getPool(), table.getDb(), table,
          // Insert the partition string if the partition is non-null.
          partition != null ? String.format(" PARTITION(%s) ",
              partition.getPartitionName().replaceAll("/", ", ")) : ""));
    }
  }

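  /**
   * Convenience overload of validateCachePool() for tables without a specific
   * partition.
   */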
  public static void validateCachePool(THdfsCachingOp op, Long directiveId,
      TableName table) throws ImpalaRuntimeException {
    validateCachePool(op, directiveId, table, null);
  }

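  /**
   * Validates the cache-related properties in the given parameter map against
   * the directive stored in HDFS. Removes the properties if the directive no
   * longer exists, reconciles the stored replication factor with the value in
   * HDFS, and returns true only if the directive is valid.
   */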
  public static boolean validateCacheParams(Map<String, String> params) {
    Long directiveId = getCacheDirectiveId(params);
    if (directiveId == null) return false;

    CacheDirectiveEntry entry = null;
    try {
      entry = getDirective(directiveId);
    } catch (ImpalaRuntimeException e) {
      if (e.getCause() != null && e.getCause() instanceof RemoteException) {
        // This exception signals that the cache directive no longer exists.
        LOG.error("Cache directive does not exist", e);
        params.remove(CACHE_DIR_ID_PROP_NAME);
        params.remove(CACHE_DIR_REPLICATION_PROP_NAME);
      } else {
        // This exception signals that there was a connection problem with HDFS.
        LOG.error("IO Exception, possible connectivity issues with HDFS", e);
      }
      return false;
    }
    Preconditions.checkNotNull(entry);

    // On the upgrade path the property might not exist. If it exists and differs
    // from the value in the metastore, issue a warning.
    String replicationFactor = params.get(CACHE_DIR_REPLICATION_PROP_NAME);
    if (replicationFactor != null &&
        Short.parseShort(replicationFactor) != entry.getInfo().getReplication()) {
      LOG.info("Replication factor for entry in HDFS differs from value in Hive MS: " +
          entry.getInfo().getPath().toString() + " " +
          entry.getInfo().getReplication().toString() + " != " +
          params.get(CACHE_DIR_REPLICATION_PROP_NAME));
    }
    params.put(CACHE_DIR_REPLICATION_PROP_NAME,
        String.valueOf(entry.getInfo().getReplication()));
    return true;
  }
}