package com.cloudera.impala.catalog;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;

import com.cloudera.impala.analysis.TableName;
import com.cloudera.impala.authorization.SentryConfig;
import com.cloudera.impala.common.FileSystemUtil;
import com.cloudera.impala.common.Pair;
import com.cloudera.impala.thrift.TCatalog;
import com.cloudera.impala.thrift.TCatalogObject;
import com.cloudera.impala.thrift.TCatalogObjectType;
import com.cloudera.impala.thrift.TFunctionBinaryType;
import com.cloudera.impala.thrift.TGetAllCatalogObjectsResponse;
import com.cloudera.impala.thrift.TPartitionKeyValue;
import com.cloudera.impala.thrift.TPrivilege;
import com.cloudera.impala.thrift.TTable;
import com.cloudera.impala.thrift.TTableName;
import com.cloudera.impala.thrift.TUniqueId;
import com.cloudera.impala.util.SentryProxy;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
  // ...

  private static final Logger LOG = Logger.getLogger(CatalogServiceCatalog.class);

  // ...

  // Fair lock protecting concurrent access to the catalog; all mutations take
  // the write lock and bump the catalog version.
  private final ReentrantReadWriteLock catalogLock_ = new ReentrantReadWriteLock(true);

  // ...

  // Single-threaded executor that periodically refreshes the known HDFS cache
  // pools (see CachePoolReader below).
  private final ScheduledExecutorService cachePoolReader_ =
      Executors.newScheduledThreadPool(1);
  public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads,
      SentryConfig sentryConfig, TUniqueId catalogServiceId) {
    // ...
    // Poll HDFS for cache pool changes once per minute, starting immediately.
    cachePoolReader_.scheduleAtFixedRate(new CachePoolReader(), 0, 1, TimeUnit.MINUTES);
    if (sentryConfig != null) {
      sentryProxy_ = new SentryProxy(sentryConfig, this);
    } else {
      sentryProxy_ = null;
    }
  }
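  // Usage sketch (values are hypothetical, not from this file): the catalogd
  // daemon creates a single instance of this class at startup, e.g.
  //
  //   CatalogServiceCatalog catalog = new CatalogServiceCatalog(
  //       /*loadInBackground=*/ true, /*numLoadingThreads=*/ 16,
  //       sentryConfig, catalogServiceId);
  //
  // From then on the scheduled CachePoolReader refreshes the HDFS cache pool
  // state once per minute for the lifetime of the process.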
  /**
   * Periodically refreshes the set of HDFS cache pools: newly created pools are
   * added to the catalog, dropped pools are removed, and each change bumps the
   * catalog version.
   */
  protected class CachePoolReader implements Runnable {
    public void run() {
      LOG.trace("Reloading cache pool names from HDFS");
      // Map of cache pool name to CachePoolInfo, so set operations can be
      // performed on the keys.
      Map<String, CachePoolInfo> currentCachePools = Maps.newHashMap();
      try {
        DistributedFileSystem dfs = FileSystemUtil.getDistributedFileSystem();
        RemoteIterator<CachePoolEntry> itr = dfs.listCachePools();
        while (itr.hasNext()) {
          CachePoolInfo cachePoolInfo = itr.next().getInfo();
          currentCachePools.put(cachePoolInfo.getPoolName(), cachePoolInfo);
        }
      } catch (Exception e) {
        // Leave the cached pools untouched if the current state can't be read.
        LOG.error("Error loading cache pools: ", e);
        return;
      }

      catalogLock_.writeLock().lock();
      try {
        // Determine what changed relative to the cached set of pools.
        Set<String> droppedCachePoolNames = Sets.difference(
            hdfsCachePools_.keySet(), currentCachePools.keySet());
        Set<String> createdCachePoolNames = Sets.difference(
            currentCachePools.keySet(), hdfsCachePools_.keySet());
        // Add all new cache pools.
        for (String createdCachePool: createdCachePoolNames) {
          HdfsCachePool cachePool = new HdfsCachePool(
              currentCachePools.get(createdCachePool));
          cachePool.setCatalogVersion(
              CatalogServiceCatalog.this.incrementAndGetCatalogVersion());
          hdfsCachePools_.add(cachePool);
        }
        // Remove dropped cache pools.
        for (String cachePoolName: droppedCachePoolNames) {
          hdfsCachePools_.remove(cachePoolName);
          CatalogServiceCatalog.this.incrementAndGetCatalogVersion();
        }
      } finally {
        catalogLock_.writeLock().unlock();
      }
    }
  }
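  // Note on the diff logic above: Sets.difference(a, b) returns a view of the
  // elements in a that are not in b. With cached pools {p1, p2} and current
  // pools {p2, p3}, droppedCachePoolNames is {p1} and createdCachePoolNames is
  // {p3}. Pools present in both sets are left untouched, so metadata changes to
  // an existing pool are not picked up by this reader.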
  // ...

  public void watchCacheDirs(List<Long> dirIds, TTableName tblName) {
    tableLoadingMgr_.watchCacheDirs(dirIds, tblName);
  }
  public void prioritizeLoad(List<TCatalogObject> objectDescs) {
    for (TCatalogObject catalogObject: objectDescs) {
      Preconditions.checkState(catalogObject.isSetTable());
      TTable table = catalogObject.getTable();
      tableLoadingMgr_.prioritizeLoad(new TTableName(table.getDb_name().toLowerCase(),
          table.getTbl_name().toLowerCase()));
    }
  }
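  // Db and table names are lower-cased before building the TTableName because
  // the catalog stores all names under lower-cased keys (see reset(), which
  // populates the db cache that way); mixed-case requests would otherwise miss.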
  public TGetAllCatalogObjectsResponse getCatalogObjects(long fromVersion) {
    TGetAllCatalogObjectsResponse resp = new TGetAllCatalogObjectsResponse();
    resp.setObjects(new ArrayList<TCatalogObject>());
    catalogLock_.readLock().lock();
    try {
      for (String dbName: getDbNames(null)) {
        Db db = getDb(dbName);
        if (db == null) {
          LOG.error("Database: " + dbName + " was expected to be in the catalog " +
              "cache. Skipping database and all child objects for this update.");
          continue;
        }
        TCatalogObject catalogDb = new TCatalogObject(TCatalogObjectType.DATABASE,
            db.getCatalogVersion());
        catalogDb.setDb(db.toThrift());
        resp.addToObjects(catalogDb);

        for (String tblName: db.getAllTableNames()) {
          TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE,
              INITIAL_CATALOG_VERSION);
          Table tbl = db.getTable(tblName);
          if (tbl == null) {
            LOG.error("Table: " + tblName + " was expected to be in the catalog " +
                "cache. Skipping table for this update.");
            continue;
          }
          // Send the full table metadata only if this table's version is newer
          // than the requested fromVersion; otherwise send a minimal stub.
          if (tbl.getCatalogVersion() > fromVersion) {
            try {
              catalogTbl.setTable(tbl.toThrift());
            } catch (Exception e) {
              LOG.debug(String.format("Error calling toThrift() on table %s.%s: %s",
                  dbName, tblName, e.getMessage()), e);
              continue;
            }
            catalogTbl.setCatalog_version(tbl.getCatalogVersion());
          } else {
            catalogTbl.setTable(new TTable(dbName, tblName));
          }
          resp.addToObjects(catalogTbl);
        }

        // Serialize all function overloads registered in this db.
        for (List<Function> fnOverloads: db.getAllFunctions().values()) {
          for (Function fn: fnOverloads) {
            TCatalogObject function = new TCatalogObject(TCatalogObjectType.FUNCTION,
                fn.getCatalogVersion());
            function.setFn(fn.toThrift());
            resp.addToObjects(function);
          }
        }
      }

      for (DataSource dataSource: getDataSources()) {
        TCatalogObject catalogObj = new TCatalogObject(TCatalogObjectType.DATA_SOURCE,
            dataSource.getCatalogVersion());
        catalogObj.setData_source(dataSource.toThrift());
        resp.addToObjects(catalogObj);
      }

      for (HdfsCachePool cachePool: hdfsCachePools_) {
        TCatalogObject pool = new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
            cachePool.getCatalogVersion());
        pool.setCache_pool(cachePool.toThrift());
        resp.addToObjects(pool);
      }

      for (Role role: authPolicy_.getAllRoles()) {
        TCatalogObject thriftRole = new TCatalogObject();
        thriftRole.setRole(role.toThrift());
        thriftRole.setCatalog_version(role.getCatalogVersion());
        thriftRole.setType(role.getCatalogObjectType());
        resp.addToObjects(thriftRole);

        for (RolePrivilege p: role.getPrivileges()) {
          TCatalogObject privilege = new TCatalogObject();
          privilege.setPrivilege(p.toThrift());
          privilege.setCatalog_version(p.getCatalogVersion());
          privilege.setType(p.getCatalogObjectType());
          resp.addToObjects(privilege);
        }
      }

      // Each update contains a single TCatalog object carrying overall state,
      // such as the catalog service id and the current catalog version.
      TCatalogObject catalog = new TCatalogObject();
      catalog.setType(TCatalogObjectType.CATALOG);
      catalog.setCatalog_version(getCatalogVersion());
      catalog.setCatalog(new TCatalog(catalogServiceId_));
      resp.addToObjects(catalog);

      // The max version is the highest catalog version of any item in the update.
      resp.setMax_catalog_version(getCatalogVersion());
      return resp;
    } finally {
      catalogLock_.readLock().unlock();
    }
  }
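  // Worked example of the fromVersion contract (versions are hypothetical): if
  // an impalad that last saw catalog version 40 calls getCatalogObjects(40), a
  // table at version 55 is serialized in full, while a table still at version
  // 12 is sent only as a minimal TTable(dbName, tblName) stub, since the
  // caller's copy of it is already current.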
  public List<Function> getFunctions(String dbName) throws DatabaseNotFoundException {
    Db db = getDb(dbName);
    if (db == null) {
      throw new DatabaseNotFoundException("Database does not exist: " + dbName);
    }
    // Map from function name to all overloads registered under that name.
    HashMap<String, List<Function>> dbFns = db.getAllFunctions();
    List<Function> fns = new ArrayList<Function>(dbFns.size());
    for (List<Function> fnOverloads: dbFns.values()) {
      fns.addAll(fnOverloads);
    }
    return fns;
  }
  public void reset() throws CatalogException {
    // First refresh the authorization policy, waiting for the result.
    if (sentryProxy_ != null) {
      try {
        sentryProxy_.refresh();
      } catch (Exception e) {
        throw new CatalogException("Error updating authorization policy: ", e);
      }
    }

    catalogLock_.writeLock().lock();
    try {
      nextTableId_.set(0);

      // UDFs/UDAs are not persisted in the metastore, so save all non-builtin
      // functions before clearing the db cache and restore them afterwards.
      List<Pair<String, HashMap<String, List<Function>>>> functions =
          Lists.newArrayList();
      for (String dbName: getDbNames(null)) {
        Db db = getDb(dbName);
        if (db == null || db.numFunctions() == 0) continue;
        functions.add(Pair.create(db.getName(), db.getAllFunctions()));
      }

      // Build a completely new db cache from the metastore and swap it in.
      ConcurrentHashMap<String, Db> newDbCache = new ConcurrentHashMap<String, Db>();
      List<TTableName> tblsToBackgroundLoad = Lists.newArrayList();
      MetaStoreClient msClient = getMetaStoreClient();
      try {
        for (String dbName: msClient.getHiveClient().getAllDatabases()) {
          Db db = new Db(dbName, this);
          db.setCatalogVersion(incrementAndGetCatalogVersion());
          newDbCache.put(db.getName().toLowerCase(), db);

          for (String tableName: msClient.getHiveClient().getAllTables(dbName)) {
            Table incompleteTbl = IncompleteTable.createUninitializedTable(
                getNextTableId(), db, tableName);
            incompleteTbl.setCatalogVersion(incrementAndGetCatalogVersion());
            db.addTable(incompleteTbl);
            if (loadInBackground_) {
              tblsToBackgroundLoad.add(
                  new TTableName(dbName.toLowerCase(), tableName.toLowerCase()));
            }
          }
        }
      } finally {
        msClient.release();
      }

      // Restore the saved functions into the new cache.
      for (Pair<String, HashMap<String, List<Function>>> dbFns: functions) {
        Db db = null;
        try {
          db = newDbCache.get(dbFns.first);
        } catch (Exception e) {
          continue;
        }
        // The db may have been dropped externally while we were rebuilding.
        if (db == null) continue;
        for (List<Function> fns: dbFns.second.values()) {
          for (Function fn: fns) {
            // Builtins are recreated on startup and are not restored here.
            if (fn.getBinaryType() == TFunctionBinaryType.BUILTIN) continue;
            fn.setCatalogVersion(incrementAndGetCatalogVersion());
            db.addFunction(fn);
          }
        }
      }
      dbCache_.set(newDbCache);
      // Submit the tables for background loading.
      for (TTableName tblName: tblsToBackgroundLoad) {
        tableLoadingMgr_.backgroundLoad(tblName);
      }
    } catch (Exception e) {
      throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e);
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }
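  // Design note: reset() builds the replacement cache off to the side and then
  // installs it with a single dbCache_.set(newDbCache), so concurrent readers
  // observe either the complete old snapshot or the complete new one, never a
  // half-populated cache.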
  public Db addDb(String dbName) {
    Db newDb = new Db(dbName, this);
    newDb.setCatalogVersion(incrementAndGetCatalogVersion());
    // ...
    return newDb;
  }
  @Override
  public Db removeDb(String dbName) {
    Db removedDb = super.removeDb(dbName);
    if (removedDb != null) {
      removedDb.setCatalogVersion(incrementAndGetCatalogVersion());
    }
    return removedDb;
  }
  public Table addTable(String dbName, String tblName) {
    Db db = getDb(dbName);
    if (db == null) return null;
    Table incompleteTable =
        IncompleteTable.createUninitializedTable(getNextTableId(), db, tblName);
    incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
    db.addTable(incompleteTable);
    return db.getTable(tblName);
  }
  public Table getOrLoadTable(String dbName, String tblName) throws CatalogException {
    TTableName tableName = new TTableName(dbName.toLowerCase(), tblName.toLowerCase());
    TableLoadingMgr.LoadRequest loadReq;
    long previousCatalogVersion;
    // Return the table if it is already loaded; otherwise submit an async load
    // request while holding the read lock.
    catalogLock_.readLock().lock();
    try {
      Table tbl = getTable(dbName, tblName);
      if (tbl == null || tbl.isLoaded()) return tbl;
      previousCatalogVersion = tbl.getCatalogVersion();
      loadReq = tableLoadingMgr_.loadAsync(tableName, null);
    } finally {
      catalogLock_.readLock().unlock();
    }
    Preconditions.checkNotNull(loadReq);
    try {
      // Apply the loaded table only if the cached entry hasn't changed since.
      return replaceTableIfUnchanged(loadReq.get(), previousCatalogVersion);
    } finally {
      loadReq.close();
    }
  }
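  // Concurrency sketch: previousCatalogVersion is captured under the read lock
  // before the async load starts. If another thread drops or replaces the table
  // while the load is running, replaceTableIfUnchanged() sees a version
  // mismatch and discards this load's result rather than clobbering the newer
  // table.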
  private Table replaceTableIfUnchanged(Table updatedTbl, long expectedCatalogVersion)
      throws DatabaseNotFoundException {
    catalogLock_.writeLock().lock();
    try {
      Db db = getDb(updatedTbl.getDb().getName());
      if (db == null) {
        throw new DatabaseNotFoundException(
            "Database does not exist: " + updatedTbl.getDb().getName());
      }
      Table existingTbl = db.getTable(updatedTbl.getName());
      // If the table was dropped or modified while the load was in progress,
      // discard the update and return the existing entry instead.
      if (existingTbl == null ||
          existingTbl.getCatalogVersion() != expectedCatalogVersion) {
        return existingTbl;
      }
      updatedTbl.setCatalogVersion(incrementAndGetCatalogVersion());
      db.addTable(updatedTbl);
      return updatedTbl;
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }
  @Override
  public Table removeTable(String dbName, String tblName) {
    Db parentDb = getDb(dbName);
    if (parentDb == null) return null;
    Table removedTable = parentDb.removeTable(tblName);
    if (removedTable != null) {
      removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
    }
    return removedTable;
  }
  @Override
  public Function removeFunction(Function desc) {
    Function removedFn = super.removeFunction(desc);
    if (removedFn != null) {
      removedFn.setCatalogVersion(incrementAndGetCatalogVersion());
    }
    return removedFn;
  }
  @Override
  public boolean addFunction(Function fn) {
    Db db = getDb(fn.getFunctionName().getDb());
    if (db == null) return false;
    if (db.addFunction(fn)) {
      fn.setCatalogVersion(incrementAndGetCatalogVersion());
      return true;
    }
    return false;
  }
  @Override
  public DataSource removeDataSource(String dataSourceName) {
    DataSource dataSource = dataSources_.remove(dataSourceName);
    if (dataSource != null) {
      dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
    }
    return dataSource;
  }
  public static long getLastDdlTime(org.apache.hadoop.hive.metastore.api.Table msTbl) {
    Preconditions.checkNotNull(msTbl);
    Map<String, String> params = msTbl.getParameters();
    String lastDdlTimeStr = params.get("transient_lastDdlTime");
    if (lastDdlTimeStr != null) {
      try {
        return Long.parseLong(lastDdlTimeStr);
      } catch (NumberFormatException e) {
        // Fall through and return -1 below.
      }
    }
    return -1;
  }
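  // Example: for a metastore table whose parameters map contains
  // "transient_lastDdlTime" -> "1400000000", this returns 1400000000L (seconds
  // since the epoch, as written by Hive); a missing or unparseable value falls
  // through to -1.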
  // Updates the cached lastDdlTime for the given table, if it exists.
  public void updateLastDdlTime(TTableName tblName, long ddlTime) {
    Db db = getDb(tblName.getDb_name());
    if (db == null) return;
    Table tbl = db.getTable(tblName.getTable_name());
    if (tbl == null) return;
    tbl.updateLastDdlTime(ddlTime);
  }
  public Table renameTable(TTableName oldTableName, TTableName newTableName) {
    // Remove the old table name from the cache and add the new table.
    catalogLock_.writeLock().lock();
    try {
      Db db = getDb(oldTableName.getDb_name());
      if (db != null) db.removeTable(oldTableName.getTable_name());
      return addTable(newTableName.getDb_name(), newTableName.getTable_name());
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }
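  // Note: a rename is implemented as remove-then-add, and addTable() inserts an
  // IncompleteTable stub, so the renamed table's metadata is reloaded lazily on
  // its next access rather than carried over from the old entry.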
  public Table reloadTable(TTableName tblName) throws CatalogException {
    LOG.debug(String.format("Refreshing table metadata: %s.%s",
        tblName.getDb_name(), tblName.getTable_name()));
    long previousCatalogVersion;
    TableLoadingMgr.LoadRequest loadReq;
    catalogLock_.readLock().lock();
    try {
      Table tbl = getTable(tblName.getDb_name(), tblName.getTable_name());
      if (tbl == null) return null;
      previousCatalogVersion = tbl.getCatalogVersion();
      loadReq = tableLoadingMgr_.loadAsync(tblName, tbl);
    } finally {
      catalogLock_.readLock().unlock();
    }
    Preconditions.checkNotNull(loadReq);
    try {
      return replaceTableIfUnchanged(loadReq.get(), previousCatalogVersion);
    } finally {
      loadReq.close();
    }
  }
  public Table dropPartition(TableName tableName, List<TPartitionKeyValue> partitionSpec)
      throws CatalogException {
    Preconditions.checkNotNull(partitionSpec);
    Table tbl = getOrLoadTable(tableName.getDb(), tableName.getTbl());
    if (tbl == null) {
      throw new TableNotFoundException("Table not found: " + tableName.getTbl());
    }
    if (!(tbl instanceof HdfsTable)) {
      throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table");
    }
    HdfsTable hdfsTable = (HdfsTable) tbl;
    catalogLock_.writeLock().lock();
    try {
      HdfsPartition hdfsPartition = hdfsTable.dropPartition(partitionSpec);
      if (hdfsPartition == null) return null;
      return replaceTableIfUnchanged(hdfsTable, hdfsTable.getCatalogVersion());
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }
  public Table addPartition(HdfsPartition partition) throws CatalogException {
    Preconditions.checkNotNull(partition);
    HdfsTable hdfsTable = partition.getTable();
    Db db = getDb(hdfsTable.getDb().getName());
    catalogLock_.writeLock().lock();
    try {
      hdfsTable.addPartition(partition);
      hdfsTable.setCatalogVersion(incrementAndGetCatalogVersion());
      db.addTable(hdfsTable);
    } finally {
      catalogLock_.writeLock().unlock();
    }
    return hdfsTable;
  }
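  // Both partition operations above take the global catalogLock_ write lock
  // because they mutate the HdfsTable's partition list and catalog version in
  // place; the table is then re-added to its Db so the updated version becomes
  // visible to readers.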
  public boolean invalidateTable(TTableName tableName, Pair<Db, Table> updatedObjects) {
    Preconditions.checkNotNull(updatedObjects);
    updatedObjects.first = null;
    updatedObjects.second = null;
    LOG.debug(String.format("Invalidating table metadata: %s.%s",
        tableName.getDb_name(), tableName.getTable_name()));
    String dbName = tableName.getDb_name();
    String tblName = tableName.getTable_name();

    // Tri-state: TRUE - the table exists in the metastore; FALSE - it does not;
    // null - the metastore call failed and its existence is unknown.
    Boolean tableExistsInMetaStore;
    MetaStoreClient msClient = getMetaStoreClient();
    try {
      tableExistsInMetaStore = msClient.getHiveClient().tableExists(dbName, tblName);
    } catch (UnknownDBException e) {
      // The parent database does not exist in the metastore. Treat this the
      // same as the table not existing.
      tableExistsInMetaStore = false;
    } catch (TException e) {
      LOG.error("Error executing tableExists() metastore call: " + tblName, e);
      tableExistsInMetaStore = null;
    } finally {
      msClient.release();
    }

    if (tableExistsInMetaStore != null && !tableExistsInMetaStore) {
      updatedObjects.second = removeTable(dbName, tblName);
      return true;
    }

    Db db = getDb(dbName);
    if ((db == null || !db.containsTable(tblName)) && tableExistsInMetaStore == null) {
      // The table is not in the local cache and its metastore state is unknown:
      // do nothing.
      return false;
    } else if (db == null && tableExistsInMetaStore) {
      // The table exists in the metastore but the parent database is missing
      // from the cache, so add it.
      db = new Db(dbName, this);
      db.setCatalogVersion(incrementAndGetCatalogVersion());
      // ...
      updatedObjects.first = db;
    }

    // Replace any existing entry with a new uninitialized table; its metadata
    // will be loaded lazily on the next access.
    Table newTable = IncompleteTable.createUninitializedTable(
        getNextTableId(), db, tblName);
    newTable.setCatalogVersion(incrementAndGetCatalogVersion());
    db.addTable(newTable);
    if (loadInBackground_) {
      tableLoadingMgr_.backgroundLoad(new TTableName(dbName.toLowerCase(),
          tblName.toLowerCase()));
    }
    updatedObjects.second = newTable;
    return false;
  }
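  // Summary of invalidateTable() outcomes: it returns true only when the table
  // is known to be gone from the metastore (and has been removed here); it
  // returns false both when the entry was replaced with an uninitialized stub
  // and when the metastore state was unknown, in which case the cache is left
  // untouched.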
  public Role addRole(String roleName, Set<String> grantGroups) {
    catalogLock_.writeLock().lock();
    try {
      Role role = new Role(roleName, grantGroups);
      role.setCatalogVersion(incrementAndGetCatalogVersion());
      authPolicy_.addRole(role);
      return role;
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }
  public Role removeRole(String roleName) {
    catalogLock_.writeLock().lock();
    try {
      Role role = authPolicy_.removeRole(roleName);
      if (role == null) return null;
      role.setCatalogVersion(incrementAndGetCatalogVersion());
      return role;
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }
  public Role addRoleGrantGroup(String roleName, String groupName)
      throws CatalogException {
    catalogLock_.writeLock().lock();
    try {
      Role role = authPolicy_.addGrantGroup(roleName, groupName);
      Preconditions.checkNotNull(role);
      role.setCatalogVersion(incrementAndGetCatalogVersion());
      return role;
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }
  public Role removeRoleGrantGroup(String roleName, String groupName)
      throws CatalogException {
    catalogLock_.writeLock().lock();
    try {
      Role role = authPolicy_.removeGrantGroup(roleName, groupName);
      Preconditions.checkNotNull(role);
      role.setCatalogVersion(incrementAndGetCatalogVersion());
      return role;
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }
  public RolePrivilege addRolePrivilege(String roleName, TPrivilege thriftPriv)
      throws CatalogException {
    catalogLock_.writeLock().lock();
    try {
      Role role = authPolicy_.getRole(roleName);
      if (role == null) throw new CatalogException("Role does not exist: " + roleName);
      RolePrivilege priv = RolePrivilege.fromThrift(thriftPriv);
      priv.setCatalogVersion(incrementAndGetCatalogVersion());
      authPolicy_.addPrivilege(priv);
      return priv;
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }
  public RolePrivilege removeRolePrivilege(String roleName, TPrivilege thriftPriv)
      throws CatalogException {
    catalogLock_.writeLock().lock();
    try {
      Role role = authPolicy_.getRole(roleName);
      if (role == null) throw new CatalogException("Role does not exist: " + roleName);
      RolePrivilege rolePrivilege =
          role.removePrivilege(thriftPriv.getPrivilege_name());
      if (rolePrivilege == null) return null;
      rolePrivilege.setCatalogVersion(incrementAndGetCatalogVersion());
      return rolePrivilege;
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }
  public RolePrivilege getRolePrivilege(String roleName, TPrivilege privSpec)
      throws CatalogException {
    catalogLock_.readLock().lock();
    try {
      Role role = authPolicy_.getRole(roleName);
      if (role == null) throw new CatalogException("Role does not exist: " + roleName);
      return role.getPrivilege(privSpec.getPrivilege_name());
    } finally {
      catalogLock_.readLock().unlock();
    }
  }
  // Increments and returns the current catalog version, under the write lock.
  public long incrementAndGetCatalogVersion() {
    catalogLock_.writeLock().lock();
    try {
      return ++catalogVersion_;
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }

  // Returns the current catalog version, under the read lock.
  public long getCatalogVersion() {
    catalogLock_.readLock().lock();
    try {
      return catalogVersion_;
    } finally {
      catalogLock_.readLock().unlock();
    }
  }
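  // Locking convention used throughout this class: every mutation takes
  // catalogLock_.writeLock() and bumps the version via
  // incrementAndGetCatalogVersion(); read-only paths take the read lock. The
  // lock is constructed in fair mode, so a queued writer is not starved by a
  // steady stream of new readers.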