package com.cloudera.impala.util;
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.log4j.Logger;

import com.cloudera.impala.analysis.TableName;
import com.cloudera.impala.catalog.HdfsPartition;
import com.cloudera.impala.common.ImpalaException;
import com.cloudera.impala.common.ImpalaRuntimeException;
import com.cloudera.impala.thrift.JniCatalogConstants;
import com.cloudera.impala.thrift.THdfsCachingOp;
import com.google.common.base.Preconditions;
/**
 * Utility class for submitting and removing HDFS cache directives.
 */
public class HdfsCachingUtil {
  private static final Logger LOG = Logger.getLogger(HdfsCachingUtil.class);

  // Table/partition property keys for the cache directive id and replication factor.
  public static final String CACHE_DIR_ID_PROP_NAME = "cache_directive_id";
  public static final String CACHE_DIR_REPLICATION_PROP_NAME = "cache_replication";

  // Refresh intervals with no caching progress tolerated by waitForDirective().
  private static final int MAX_UNCHANGED_CACHING_REFRESH_INTERVALS = 5;

  private final static DistributedFileSystem dfs;
  static {
    try {
      dfs = FileSystemUtil.getDistributedFileSystem();
    } catch (IOException e) {
      throw new RuntimeException("HdfsCachingUtil failed to initialize the " +
          "DistributedFileSystem: ", e);
    }
  }
  public static long submitCacheTblDirective(
      org.apache.hadoop.hive.metastore.api.Table table,
      String poolName, short replication) throws ImpalaRuntimeException {
    long id = HdfsCachingUtil.submitDirective(new Path(table.getSd().getLocation()),
        poolName, replication);
    // Elided: the id and replication factor are saved in the table's parameters.
    return id;
  }
  public static long submitCachePartitionDirective(HdfsPartition part,
      String poolName, short replication) throws ImpalaRuntimeException {
    long id = HdfsCachingUtil.submitDirective(new Path(part.getLocation()),
        poolName, replication);
    // ...
    return id;
  }
  public static long submitCachePartitionDirective(
      org.apache.hadoop.hive.metastore.api.Partition part,
      String poolName, short replication) throws ImpalaRuntimeException {
    long id = HdfsCachingUtil.submitDirective(new Path(part.getSd().getLocation()),
        poolName, replication);
    // ...
    return id;
  }
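  // Example (not in the original file): a minimal sketch of the intended call
  // sequence, assuming a metastore Table object "msTbl" and an existing cache
  // pool "testPool" (both hypothetical):
  //
  //   long id = HdfsCachingUtil.submitCacheTblDirective(msTbl, "testPool", (short) 1);
  //   HdfsCachingUtil.waitForDirective(id);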
  public static void uncacheTbl(org.apache.hadoop.hive.metastore.api.Table table)
      throws ImpalaRuntimeException {
    Preconditions.checkNotNull(table);
    LOG.debug("Uncaching table: " + table.getDbName() + "." + table.getTableName());
    Long id = getCacheDirectiveId(table.getParameters());
    if (id == null) return;
    HdfsCachingUtil.removeDirective(id);
    // Elided: the caching properties are removed from the table's parameters.
  }
  public static void uncachePartition(HdfsPartition part) throws ImpalaException {
    Preconditions.checkNotNull(part);
    Long id = getCacheDirectiveId(part.getParameters());
    if (id == null) return;
    HdfsCachingUtil.removeDirective(id);
    // ...
  }
  public static void uncachePartition(
      org.apache.hadoop.hive.metastore.api.Partition part) throws ImpalaException {
    Preconditions.checkNotNull(part);
    Long id = getCacheDirectiveId(part.getParameters());
    if (id == null) return;
    HdfsCachingUtil.removeDirective(id);
    // ...
  }
  public static Long getCacheDirectiveId(Map<String, String> params) {
    if (params == null) return null;
    String idStr = params.get(CACHE_DIR_ID_PROP_NAME);
    if (idStr == null) return null;
    try {
      return Long.parseLong(idStr);
    } catch (NumberFormatException e) {
      return null;
    }
  }
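  // Example (not in the original file): the directive id round-trips through the
  // table/partition parameter map as a string; the map below is hypothetical.
  //
  //   Map<String, String> params = new HashMap<>();   // java.util.HashMap assumed
  //   params.put(CACHE_DIR_ID_PROP_NAME, "42");
  //   Long id = getCacheDirectiveId(params);          // returns 42L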
  public static String getCachePool(long directiveId) throws ImpalaRuntimeException {
    CacheDirectiveEntry entry = getDirective(directiveId);
    return entry == null ? null : entry.getInfo().getPool();
  }
  public static Short getCacheReplication(long directiveId)
      throws ImpalaRuntimeException {
    CacheDirectiveEntry entry = getDirective(directiveId);
    return entry != null ? entry.getInfo().getReplication() : null;
  }
  public static Short getCachedCacheReplication(Map<String, String> params) {
    Preconditions.checkNotNull(params);
    String replication = params.get(CACHE_DIR_REPLICATION_PROP_NAME);
    if (replication == null) {
      return JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
    }
    try {
      return Short.parseShort(replication);
    } catch (NumberFormatException e) {
      return JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
    }
  }
  public static void waitForDirective(long directiveId) throws ImpalaRuntimeException {
    long bytesNeeded = 0L;
    long currentBytesCached = 0L;
    CacheDirectiveEntry cacheDir = getDirective(directiveId);
    if (cacheDir == null) return;

    bytesNeeded = cacheDir.getStats().getBytesNeeded();
    currentBytesCached = cacheDir.getStats().getBytesCached();
    LOG.debug(String.format("Waiting on cache directive id: %d. Bytes " +
        "cached (%d) / needed (%d)", directiveId, currentBytesCached, bytesNeeded));
    // All the bytes are already cached; nothing to wait for.
    if (bytesNeeded == currentBytesCached) return;

    // The refresh interval is how often HDFS updates cache directive stats; poll at
    // that granularity.
    long hdfsRefreshIntervalMs = dfs.getConf().getLong(
        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
    Preconditions.checkState(hdfsRefreshIntervalMs > 0);

    // Poll until the directive is fully cached, or until the cached byte count has
    // not changed for MAX_UNCHANGED_CACHING_REFRESH_INTERVALS consecutive polls.
    int unchangedCounter = 0;
    while (unchangedCounter < MAX_UNCHANGED_CACHING_REFRESH_INTERVALS) {
      long previousBytesCached = currentBytesCached;
      cacheDir = getDirective(directiveId);
      if (cacheDir == null) return;
      currentBytesCached = cacheDir.getStats().getBytesCached();
      bytesNeeded = cacheDir.getStats().getBytesNeeded();
      if (currentBytesCached == bytesNeeded) {
        LOG.debug(String.format("Cache directive id: %d has completed. " +
            "Bytes cached (%d) / needed (%d)", directiveId, currentBytesCached,
            bytesNeeded));
        return;
      }

      if (currentBytesCached == previousBytesCached) {
        ++unchangedCounter;
      } else {
        unchangedCounter = 0;
      }
      try {
        // Sleep a bit longer than the refresh interval to ensure a full interval
        // has elapsed before polling again.
        Thread.sleep((long) (hdfsRefreshIntervalMs * 1.25));
      } catch (InterruptedException e) { /* ignore */ }
    }
    LOG.warn(String.format("No changes in cached bytes in: %d(ms). All data may not " +
        "be cached. Final stats for cache directive id: %d. Bytes cached (%d)/needed " +
        "(%d)", hdfsRefreshIntervalMs * MAX_UNCHANGED_CACHING_REFRESH_INTERVALS,
        directiveId, currentBytesCached, bytesNeeded));
  }
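  // Note (not in the original file): the wait loop above polls at the NameNode's
  // cache-stats refresh cadence. A sketch of reading the same setting from a plain
  // Configuration (assumed to be loaded from the cluster's hdfs-site.xml):
  //
  //   org.apache.hadoop.conf.Configuration conf =
  //       new org.apache.hadoop.conf.Configuration();
  //   long refreshMs = conf.getLong(
  //       DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
  //       DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);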
  public static long submitDirective(Path path, String poolName, short replication)
      throws ImpalaRuntimeException {
    Preconditions.checkNotNull(path);
    Preconditions.checkState(poolName != null && !poolName.isEmpty());
    CacheDirectiveInfo info = new CacheDirectiveInfo.Builder()
        .setExpiration(Expiration.NEVER)
        .setPool(poolName)
        .setReplication(replication)
        .setPath(path).build();
    LOG.debug("Submitting cache directive: " + info.toString());
    try {
      return dfs.addCacheDirective(info);
    } catch (IOException e) {
      throw new ImpalaRuntimeException(e.getMessage(), e);
    }
  }
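  // Example (not in the original file): submitting a directive for a path directly.
  // The path and pool are hypothetical; the pool must already exist, e.g. created
  // with "hdfs cacheadmin -addPool testPool".
  //
  //   long id = submitDirective(new Path("/test-warehouse/t"), "testPool", (short) 3);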
  public static long modifyCacheDirective(Long id,
      org.apache.hadoop.hive.metastore.api.Table table,
      String poolName, short replication) throws ImpalaRuntimeException {
    Preconditions.checkNotNull(id);
    HdfsCachingUtil.modifyCacheDirective(id, new Path(table.getSd().getLocation()),
        poolName, replication);
    // ...
    return id;
  }
  public static long modifyCacheDirective(Long id, HdfsPartition part,
      String poolName, short replication) throws ImpalaRuntimeException {
    Preconditions.checkNotNull(id);
    HdfsCachingUtil.modifyCacheDirective(id, new Path(part.getLocation()),
        poolName, replication);
    // ...
    return id;
  }
  private static void modifyCacheDirective(Long id, Path path, String poolName,
      short replication) throws ImpalaRuntimeException {
    Preconditions.checkNotNull(path);
    Preconditions.checkNotNull(id);
    Preconditions.checkState(poolName != null && !poolName.isEmpty());
    CacheDirectiveInfo info = new CacheDirectiveInfo.Builder()
        .setId(id)
        .setExpiration(Expiration.NEVER)
        .setPool(poolName)
        .setReplication(replication)
        .setPath(path).build();
    LOG.debug("Modifying cache directive: " + info.toString());
    try {
      dfs.modifyCacheDirective(info);
    } catch (IOException e) {
      throw new ImpalaRuntimeException(e.getMessage(), e);
    }
  }
  public static void removeDirective(long directiveId) throws ImpalaRuntimeException {
    LOG.debug("Removing cache directive id: " + directiveId);
    try {
      dfs.removeCacheDirective(directiveId);
    } catch (IOException e) {
      // The directive may already be gone; that is not an error here.
      if (e.getMessage().contains("No directive with ID")) return;
      throw new ImpalaRuntimeException(e.getMessage(), e);
    }
  }
  private static CacheDirectiveEntry getDirective(long directiveId)
      throws ImpalaRuntimeException {
    LOG.trace("Getting cache directive id: " + directiveId);
    CacheDirectiveInfo filter = new CacheDirectiveInfo.Builder()
        .setId(directiveId)
        .build();
    try {
      RemoteIterator<CacheDirectiveEntry> itr = dfs.listCacheDirectives(filter);
      if (itr.hasNext()) return itr.next();
    } catch (IOException e) {
      throw new ImpalaRuntimeException(e.getMessage(), e);
    }
    throw new ImpalaRuntimeException(
        "HDFS cache directive filter returned empty result. This must not happen");
  }
  public static boolean isSamePool(String poolName, Long directiveId)
      throws ImpalaRuntimeException {
    return poolName.equals(getCachePool(directiveId));
  }
  public static short getReplicationOrDefault(THdfsCachingOp op) {
    return op.isSetReplication() ? op.getReplication() :
        JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
  }
  public static boolean isUpdateOp(THdfsCachingOp op, Map<String, String> params)
      throws ImpalaRuntimeException {
    Long directiveId = Long.parseLong(params.get(CACHE_DIR_ID_PROP_NAME));
    CacheDirectiveEntry entry = getDirective(directiveId);
    Preconditions.checkNotNull(entry);
    // Requesting a different pool is not an update of the existing directive.
    if (!op.getCache_pool_name().equals(entry.getInfo().getPool())) return false;
    // It is an update if the effective replication factor changes.
    if ((op.isSetReplication() && op.getReplication() !=
        entry.getInfo().getReplication()) || (!op.isSetReplication() &&
        entry.getInfo().getReplication() !=
        JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR)) {
      return true;
    }
    return false;
  }
  public static void validateCachePool(THdfsCachingOp op, Long directiveId,
      TableName table, HdfsPartition partition) throws ImpalaRuntimeException {
    CacheDirectiveEntry entry = getDirective(directiveId);
    Preconditions.checkNotNull(entry);
    if (!op.getCache_pool_name().equals(entry.getInfo().getPool())) {
      throw new ImpalaRuntimeException(String.format("Cannot cache partition in " +
          "pool '%s' because it is already cached in '%s'. To change the cache " +
          "pool for this partition, first uncache using: ALTER TABLE %s.%s " +
          "%sSET UNCACHED", op.getCache_pool_name(),
          entry.getInfo().getPool(), table.getDb(), table,
          // Insert the partition spec only when a partition was given.
          partition != null ? String.format(" PARTITION(%s) ",
          partition.getPartitionName().replaceAll("/", ", ")) : ""));
    }
  }
  public static boolean validateCacheParams(Map<String, String> params) {
    Long directiveId = getCacheDirectiveId(params);
    if (directiveId == null) return false;

    CacheDirectiveEntry entry = null;
    try {
      entry = getDirective(directiveId);
    } catch (ImpalaRuntimeException e) {
      if (e.getCause() != null && e.getCause() instanceof RemoteException) {
        // The directive no longer exists in HDFS; drop the stale parameters.
        LOG.error("Cache directive does not exist", e);
        params.remove(CACHE_DIR_ID_PROP_NAME);
        params.remove(CACHE_DIR_REPLICATION_PROP_NAME);
      } else {
        LOG.error("IO Exception, possible connectivity issues with HDFS", e);
      }
      return false;
    }
    Preconditions.checkNotNull(entry);

    // Warn if the replication factor recorded in the metastore differs from HDFS.
    String replicationFactor = params.get(CACHE_DIR_REPLICATION_PROP_NAME);
    if (replicationFactor != null &&
        Short.parseShort(replicationFactor) != entry.getInfo().getReplication()) {
      LOG.info("Replication factor for entry in HDFS differs from value in Hive MS: " +
          entry.getInfo().getPath().toString() + " " +
          entry.getInfo().getReplication().toString() + " != " +
          params.get(CACHE_DIR_REPLICATION_PROP_NAME));
    }
    params.put(CACHE_DIR_REPLICATION_PROP_NAME,
        String.valueOf(entry.getInfo().getReplication()));
    return true;
  }
}
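// Example (not in the original file): a caller might use validateCacheParams() to
// decide whether stored caching metadata is still live; "msTbl" is hypothetical.
//
//   if (!HdfsCachingUtil.validateCacheParams(msTbl.getParameters())) {
//     // treat the table as uncached
//   }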
static boolean isSamePool(String poolName, Long directiveId)
static long modifyCacheDirective(Long id, org.apache.hadoop.hive.metastore.api.Table table, String poolName, short replication)
static void validateCachePool(THdfsCachingOp op, Long directiveId, TableName table, HdfsPartition partition)
static void uncacheTbl(org.apache.hadoop.hive.metastore.api.Table table)
static void modifyCacheDirective(Long id, Path path, String poolName, short replication)
static long submitCachePartitionDirective(org.apache.hadoop.hive.metastore.api.Partition part, String poolName, short replication)
static Short getCachedCacheReplication(Map<String, String> params)
static void uncachePartition(org.apache.hadoop.hive.metastore.api.Partition part)
static final String CACHE_DIR_ID_PROP_NAME
static void removeDirective(long directiveId)
static long submitCacheTblDirective(org.apache.hadoop.hive.metastore.api.Table table, String poolName, short replication)
static boolean isUpdateOp(THdfsCachingOp op, Map<String, String> params)
static final int MAX_UNCHANGED_CACHING_REFRESH_INTERVALS
static short getReplicationOrDefault(THdfsCachingOp op)
static void waitForDirective(long directiveId)
static void uncachePartition(HdfsPartition part)
static void validateCachePool(THdfsCachingOp op, Long directiveId, TableName table)
static long submitDirective(Path path, String poolName, short replication)
static CacheDirectiveEntry getDirective(long directiveId)
static final String CACHE_DIR_REPLICATION_PROP_NAME
static Long getCacheDirectiveId(Map<String, String> params)
static String getCachePool(long directiveId)
static long submitCachePartitionDirective(HdfsPartition part, String poolName, short replication)
static boolean validateCacheParams(Map<String, String> params)
static final DistributedFileSystem dfs
static long modifyCacheDirective(Long id, HdfsPartition part, String poolName, short replication)
static Short getCacheReplication(long directiveId)