Impala
Impala is the open source, native analytic database for Apache Hadoop.
CatalogServiceCatalog.java
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.cloudera.impala.catalog;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;

import com.cloudera.impala.analysis.TableName;
import com.cloudera.impala.authorization.SentryConfig;
import com.cloudera.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import com.cloudera.impala.common.FileSystemUtil;
import com.cloudera.impala.common.ImpalaException;
import com.cloudera.impala.common.Pair;
import com.cloudera.impala.thrift.TCatalog;
import com.cloudera.impala.thrift.TCatalogObject;
import com.cloudera.impala.thrift.TCatalogObjectType;
import com.cloudera.impala.thrift.TFunctionBinaryType;
import com.cloudera.impala.thrift.TGetAllCatalogObjectsResponse;
import com.cloudera.impala.thrift.TPartitionKeyValue;
import com.cloudera.impala.thrift.TPrivilege;
import com.cloudera.impala.thrift.TTable;
import com.cloudera.impala.thrift.TTableName;
import com.cloudera.impala.thrift.TUniqueId;
import com.cloudera.impala.util.PatternMatcher;
import com.cloudera.impala.util.SentryProxy;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

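/**
 * Specialized Catalog that implements the CatalogService-specific Catalog
 * APIs. Every object it adds, modifies, or removes is assigned a new catalog
 * version from a monotonically increasing counter protected by catalogLock_;
 * these versions allow delta updates to be built for the statestore. Also
 * manages background table loading, HDFS cache pool metadata, and, when a
 * SentryConfig is given, the Sentry authorization policy.
 *
 * A minimal construction sketch (the argument values are illustrative only):
 *   CatalogServiceCatalog catalog = new CatalogServiceCatalog(
 *       true, 16, null, new TUniqueId());  // null SentryConfig: Sentry disabled
 */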
public class CatalogServiceCatalog extends Catalog {
  private static final Logger LOG = Logger.getLogger(CatalogServiceCatalog.class);

  private final TUniqueId catalogServiceId_;

  // Fair lock used to synchronize reads/writes of catalogVersion_. Because this lock
  // protects catalogVersion_, it can be used to perform atomic bulk catalog operations
  // since catalogVersion_ cannot change externally while the lock is being held.
  // In addition to protecting catalogVersion_, it is currently used for the
  // following bulk operations:
  // * Building a delta update to send to the statestore in getCatalogObjects(),
  //   so a snapshot of the catalog can be taken without any version changes.
  // * During a catalog invalidation (call to reset()), which re-reads all dbs and
  //   tables from the metastore.
  // * During renameTable(), because a table must be removed and added to the catalog
  //   atomically (potentially in a different database).
  private final ReentrantReadWriteLock catalogLock_ = new ReentrantReadWriteLock(true);

  // Last assigned catalog version. Starts at INITIAL_CATALOG_VERSION and is incremented
  // with each update to the Catalog. Continues across the lifetime of the object.
  // Protected by catalogLock_.
  // TODO: Handle overflow of catalogVersion_ and nextTableId_.
  private long catalogVersion_ = INITIAL_CATALOG_VERSION;

  protected final AtomicInteger nextTableId_ = new AtomicInteger(0);

  // Manages the scheduling of background table loading.
  private final TableLoadingMgr tableLoadingMgr_;

  private final boolean loadInBackground_;

  // Periodically polls HDFS to get the latest set of known cache pools.
  private final ScheduledExecutorService cachePoolReader_ =
      Executors.newScheduledThreadPool(1);

  // Proxy to access the Sentry Service and also periodically refreshes the
  // policy metadata. Null if the Sentry Service is not enabled.
  private final SentryProxy sentryProxy_;

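  /**
   * Initializes the catalog, creating a TableLoadingMgr with
   * 'numLoadingThreads' loading threads and scheduling the CachePoolReader to
   * run once a minute. The Sentry proxy is only created when 'sentryConfig'
   * is non-null.
   */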
  public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads,
      SentryConfig sentryConfig, TUniqueId catalogServiceId) {
    super(true);
    catalogServiceId_ = catalogServiceId;
    tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads);
    loadInBackground_ = loadInBackground;
    cachePoolReader_.scheduleAtFixedRate(new CachePoolReader(), 0, 1, TimeUnit.MINUTES);
    if (sentryConfig != null) {
      sentryProxy_ = new SentryProxy(sentryConfig, this);
    } else {
      sentryProxy_ = null;
    }
  }

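  /**
   * Periodically refreshes the set of known HDFS cache pools: lists the pools
   * from the NameNode, diffs them against hdfsCachePools_, and applies the
   * additions and removals under the catalog write lock.
   */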
  private class CachePoolReader implements Runnable {
    public void run() {
      LOG.trace("Reloading cache pool names from HDFS");
      // Map of cache pool name to CachePoolInfo. Stored in a map to allow Set
      // operations to be performed on the keys.
      Map<String, CachePoolInfo> currentCachePools = Maps.newHashMap();
      try {
        DistributedFileSystem dfs = FileSystemUtil.getDistributedFileSystem();
        RemoteIterator<CachePoolEntry> itr = dfs.listCachePools();
        while (itr.hasNext()) {
          CachePoolInfo cachePoolInfo = itr.next().getInfo();
          currentCachePools.put(cachePoolInfo.getPoolName(), cachePoolInfo);
        }
      } catch (Exception e) {
        LOG.error("Error loading cache pools: ", e);
        return;
      }

      catalogLock_.writeLock().lock();
      try {
        // Determine what has changed relative to what we have cached.
        Set<String> droppedCachePoolNames = Sets.difference(
            hdfsCachePools_.keySet(), currentCachePools.keySet());
        Set<String> createdCachePoolNames = Sets.difference(
            currentCachePools.keySet(), hdfsCachePools_.keySet());
        // Add all new cache pools.
        for (String createdCachePool: createdCachePoolNames) {
          HdfsCachePool cachePool = new HdfsCachePool(
              currentCachePools.get(createdCachePool));
          cachePool.setCatalogVersion(
              CatalogServiceCatalog.this.incrementAndGetCatalogVersion());
          hdfsCachePools_.add(cachePool);
        }
        // Remove dropped cache pools.
        for (String cachePoolName: droppedCachePoolNames) {
          hdfsCachePools_.remove(cachePoolName);
          CatalogServiceCatalog.this.incrementAndGetCatalogVersion();
        }
      } finally {
        catalogLock_.writeLock().unlock();
      }
    }
  }

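  /**
   * Registers the given HDFS cache directive IDs to be watched on behalf of
   * the given table, delegating to the TableLoadingMgr.
   */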
  public void watchCacheDirs(List<Long> dirIds, TTableName tblName) {
    tableLoadingMgr_.watchCacheDirs(dirIds, tblName);
  }

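  /**
   * Prioritizes the loading of the given catalog objects. Each descriptor
   * must reference a table; the named tables are submitted to the
   * TableLoadingMgr for prioritized loading.
   */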
  public void prioritizeLoad(List<TCatalogObject> objectDescs) {
    for (TCatalogObject catalogObject: objectDescs) {
      Preconditions.checkState(catalogObject.isSetTable());
      TTable table = catalogObject.getTable();
      tableLoadingMgr_.prioritizeLoad(new TTableName(table.getDb_name().toLowerCase(),
          table.getTbl_name().toLowerCase()));
    }
  }

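  /**
   * Returns all catalog objects: databases, tables, functions, data sources,
   * HDFS cache pools, roles, and privileges. Tables with a catalog version
   * below 'fromVersion' are returned as minimal (name-only) entries; all
   * others include their full metadata. Holds the catalog read lock so the
   * response is a consistent snapshot with a single max catalog version.
   */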
  public TGetAllCatalogObjectsResponse getCatalogObjects(long fromVersion) {
    TGetAllCatalogObjectsResponse resp = new TGetAllCatalogObjectsResponse();
    resp.setObjects(new ArrayList<TCatalogObject>());
    resp.setMax_catalog_version(Catalog.INITIAL_CATALOG_VERSION);

    // Take a lock on the catalog to ensure this update contains a consistent snapshot
    // of all items in the catalog.
    catalogLock_.readLock().lock();
    try {
      for (String dbName: getDbNames(null)) {
        Db db = getDb(dbName);
        if (db == null) {
          LOG.error("Database: " + dbName + " was expected to be in the catalog " +
              "cache. Skipping database and all child objects for this update.");
          continue;
        }
        TCatalogObject catalogDb = new TCatalogObject(TCatalogObjectType.DATABASE,
            db.getCatalogVersion());
        catalogDb.setDb(db.toThrift());
        resp.addToObjects(catalogDb);

        for (String tblName: db.getAllTableNames()) {
          TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE,
              db.getCatalogVersion());

          Table tbl = db.getTable(tblName);
          if (tbl == null) {
            LOG.error("Table: " + tblName + " was expected to be in the catalog " +
                "cache. Skipping table for this update.");
            continue;
          }

          // Only add the extended metadata if this table's version is >=
          // the fromVersion.
          if (tbl.getCatalogVersion() >= fromVersion) {
            try {
              catalogTbl.setTable(tbl.toThrift());
            } catch (Exception e) {
              LOG.debug(String.format("Error calling toThrift() on table %s.%s: %s",
                  dbName, tblName, e.getMessage()), e);
              continue;
            }
            catalogTbl.setCatalog_version(tbl.getCatalogVersion());
          } else {
            catalogTbl.setTable(new TTable(dbName, tblName));
          }
          resp.addToObjects(catalogTbl);
        }

        for (Function fn: db.getFunctions(null, new PatternMatcher())) {
          TCatalogObject function = new TCatalogObject(TCatalogObjectType.FUNCTION,
              fn.getCatalogVersion());
          function.setFn(fn.toThrift());
          resp.addToObjects(function);
        }
      }

      for (DataSource dataSource: getDataSources()) {
        TCatalogObject catalogObj = new TCatalogObject(TCatalogObjectType.DATA_SOURCE,
            dataSource.getCatalogVersion());
        catalogObj.setData_source(dataSource.toThrift());
        resp.addToObjects(catalogObj);
      }
      for (HdfsCachePool cachePool: hdfsCachePools_) {
        TCatalogObject pool = new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
            cachePool.getCatalogVersion());
        pool.setCache_pool(cachePool.toThrift());
        resp.addToObjects(pool);
      }

      // Get all roles.
      for (Role role: authPolicy_.getAllRoles()) {
        TCatalogObject thriftRole = new TCatalogObject();
        thriftRole.setRole(role.toThrift());
        thriftRole.setCatalog_version(role.getCatalogVersion());
        thriftRole.setType(role.getCatalogObjectType());
        resp.addToObjects(thriftRole);

        for (RolePrivilege p: role.getPrivileges()) {
          TCatalogObject privilege = new TCatalogObject();
          privilege.setPrivilege(p.toThrift());
          privilege.setCatalog_version(p.getCatalogVersion());
          privilege.setType(p.getCatalogObjectType());
          resp.addToObjects(privilege);
        }
      }

      // Each update should contain a single "TCatalog" object which is used to
      // pass overall state on the catalog, such as the current version and the
      // catalog service id.
      TCatalogObject catalog = new TCatalogObject();
      catalog.setType(TCatalogObjectType.CATALOG);
      // By setting the catalog version to the latest catalog version at this point,
      // it ensures impalads will always bump their versions, even in the case where
      // an object has been dropped.
      catalog.setCatalog_version(getCatalogVersion());
      catalog.setCatalog(new TCatalog(catalogServiceId_));
      resp.addToObjects(catalog);

      // The max version is the max catalog version of all items in the update.
      resp.setMax_catalog_version(getCatalogVersion());
      return resp;
    } finally {
      catalogLock_.readLock().unlock();
    }
  }

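  /**
   * Returns all functions in the given database, flattening the per-name
   * overload lists into a single list. Throws a DatabaseNotFoundException if
   * the database does not exist.
   */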
  public List<Function> getFunctions(String dbName) throws DatabaseNotFoundException {
    Db db = getDb(dbName);
    if (db == null) {
      throw new DatabaseNotFoundException("Database does not exist: " + dbName);
    }

    // Contains map of overloaded function names to all functions matching that name.
    HashMap<String, List<Function>> dbFns = db.getAllFunctions();
    List<Function> fns = new ArrayList<Function>(dbFns.size());
    for (List<Function> fnOverloads: dbFns.values()) {
      for (Function fn: fnOverloads) {
        fns.add(fn);
      }
    }
    return fns;
  }

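  /**
   * Resets this catalog instance: re-reads all database and table names from
   * the metastore and replaces the db cache in one step. Because UDFs/UDAs
   * are not persisted in the metastore, they are saved before the reset and
   * restored afterwards. Also refreshes the Sentry policy when enabled.
   */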
  public void reset() throws CatalogException {
    // First update the policy metadata.
    if (sentryProxy_ != null) {
      // Sentry Service is enabled.
      try {
        // Update the authorization policy, waiting for the result to complete.
        sentryProxy_.refresh();
      } catch (Exception e) {
        throw new CatalogException("Error updating authorization policy: ", e);
      }
    }

    catalogLock_.writeLock().lock();
    try {
      nextTableId_.set(0);

      // Since UDFs/UDAs are not persisted in the metastore, we don't clear
      // them across a reset. To do this, we store all the functions before
      // clearing and restore them after.
      // TODO: Everything about this. Persist them.
      List<Pair<String, HashMap<String, List<Function>>>> functions =
          Lists.newArrayList();
      for (Db db: dbCache_.get().values()) {
        if (db.numFunctions() == 0) continue;
        functions.add(Pair.create(db.getName(), db.getAllFunctions()));
      }

      // Build a new DB cache, populate it, and replace the existing cache in one
      // step.
      ConcurrentHashMap<String, Db> newDbCache = new ConcurrentHashMap<String, Db>();
      List<TTableName> tblsToBackgroundLoad = Lists.newArrayList();
      MetaStoreClient msClient = metaStoreClientPool_.getClient();
      try {
        for (String dbName: msClient.getHiveClient().getAllDatabases()) {
          Db db = new Db(dbName, this);
          db.setCatalogVersion(incrementAndGetCatalogVersion());
          newDbCache.put(db.getName().toLowerCase(), db);

          for (String tableName: msClient.getHiveClient().getAllTables(dbName)) {
            Table incompleteTbl = IncompleteTable.createUninitializedTable(
                getNextTableId(), db, tableName);
            incompleteTbl.setCatalogVersion(incrementAndGetCatalogVersion());
            db.addTable(incompleteTbl);
            if (loadInBackground_) {
              tblsToBackgroundLoad.add(
                  new TTableName(dbName.toLowerCase(), tableName.toLowerCase()));
            }
          }
        }
      } finally {
        msClient.release();
      }

      // Restore UDFs/UDAs.
      for (Pair<String, HashMap<String, List<Function>>> dbFns: functions) {
        Db db = null;
        try {
          db = newDbCache.get(dbFns.first);
        } catch (Exception e) {
          continue;
        }
        if (db == null) {
          // DB no longer exists - it was probably dropped externally.
          // TODO: We could restore this DB and then add the functions back?
          continue;
        }

        for (List<Function> fns: dbFns.second.values()) {
          for (Function fn: fns) {
            if (fn.getBinaryType() == TFunctionBinaryType.BUILTIN) continue;
            fn.setCatalogVersion(incrementAndGetCatalogVersion());
            db.addFunction(fn);
          }
        }
      }
      dbCache_.set(newDbCache);
      // Submit tables for background loading.
      for (TTableName tblName: tblsToBackgroundLoad) {
        tableLoadingMgr_.backgroundLoad(tblName);
      }
    } catch (Exception e) {
      LOG.error(e);
      throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e);
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }

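  /**
   * Adds a database with the given name to the catalog and returns the new Db
   * object, assigning it a new catalog version.
   */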
  public Db addDb(String dbName) throws ImpalaException {
    Db newDb = new Db(dbName, this);
    newDb.setCatalogVersion(incrementAndGetCatalogVersion());
    addDb(newDb);
    return newDb;
  }

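  /**
   * Removes a database from the catalog. Returns the removed Db with an
   * incremented catalog version, or null if no database with that name exists.
   */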
  @Override
  public Db removeDb(String dbName) {
    Db removedDb = super.removeDb(dbName);
    if (removedDb != null) {
      removedDb.setCatalogVersion(incrementAndGetCatalogVersion());
    }
    return removedDb;
  }

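  /**
   * Adds an uninitialized (incomplete) table with the given name to the
   * catalog. Returns the new table, or null if the parent database does not
   * exist.
   */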
  public Table addTable(String dbName, String tblName) throws TableNotFoundException {
    Db db = getDb(dbName);
    if (db == null) return null;
    Table incompleteTable =
        IncompleteTable.createUninitializedTable(getNextTableId(), db, tblName);
    incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
    db.addTable(incompleteTable);
    return db.getTable(tblName);
  }

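  /**
   * Returns the table with the given name, triggering an asynchronous load if
   * the cached entry is not yet loaded. The load result is applied only if the
   * table is unchanged since the load was submitted (see
   * replaceTableIfUnchanged()). Returns null if the table does not exist.
   */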
  public Table getOrLoadTable(String dbName, String tblName)
      throws CatalogException {
    TTableName tableName = new TTableName(dbName.toLowerCase(), tblName.toLowerCase());
    TableLoadingMgr.LoadRequest loadReq;

    long previousCatalogVersion;
    // Return the table if it is already loaded or submit a new load request.
    catalogLock_.readLock().lock();
    try {
      Table tbl = getTable(dbName, tblName);
      if (tbl == null || tbl.isLoaded()) return tbl;
      previousCatalogVersion = tbl.getCatalogVersion();
      loadReq = tableLoadingMgr_.loadAsync(tableName, null);
    } finally {
      catalogLock_.readLock().unlock();
    }
    Preconditions.checkNotNull(loadReq);
    try {
      // The table may have been dropped/modified while the load was in progress, so
      // only apply the update if the existing table hasn't changed.
      return replaceTableIfUnchanged(loadReq.get(), previousCatalogVersion);
    } finally {
      loadReq.close();
    }
  }

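  /**
   * Replaces the catalog's entry for a table with 'updatedTbl', but only if
   * the existing entry's catalog version still equals
   * 'expectedCatalogVersion'; otherwise the existing (possibly null) table is
   * returned unchanged.
   */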
  private Table replaceTableIfUnchanged(Table updatedTbl, long expectedCatalogVersion)
      throws DatabaseNotFoundException {
    catalogLock_.writeLock().lock();
    try {
      Db db = getDb(updatedTbl.getDb().getName());
      if (db == null) {
        throw new DatabaseNotFoundException(
            "Database does not exist: " + updatedTbl.getDb().getName());
      }

      Table existingTbl = db.getTable(updatedTbl.getName());
      // If the existing table was dropped or modified while the load was in
      // progress, return it instead of applying the loaded value.
      if (existingTbl == null ||
          existingTbl.getCatalogVersion() != expectedCatalogVersion) return existingTbl;

      updatedTbl.setCatalogVersion(incrementAndGetCatalogVersion());
      db.addTable(updatedTbl);
      return updatedTbl;
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }

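  /**
   * Removes a table from the catalog. Returns the removed Table with an
   * incremented catalog version, or null if the table or its database does
   * not exist.
   */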
  public Table removeTable(String dbName, String tblName) {
    Db parentDb = getDb(dbName);
    if (parentDb == null) return null;

    Table removedTable = parentDb.removeTable(tblName);
    if (removedTable != null) {
      removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
    }
    return removedTable;
  }

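  /**
   * Removes a function from the catalog. Returns the removed Function with an
   * incremented catalog version, or null if no matching function was found.
   */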
  @Override
  public Function removeFunction(Function desc) {
    Function removedFn = super.removeFunction(desc);
    if (removedFn != null) {
      removedFn.setCatalogVersion(incrementAndGetCatalogVersion());
    }
    return removedFn;
  }

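  /**
   * Adds a function to the catalog, assigning it a new catalog version.
   * Returns false if the parent database does not exist or the function could
   * not be added.
   */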
  @Override
  public boolean addFunction(Function fn) {
    Db db = getDb(fn.getFunctionName().getDb());
    if (db == null) return false;
    if (db.addFunction(fn)) {
      fn.setCatalogVersion(incrementAndGetCatalogVersion());
      return true;
    }
    return false;
  }

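  /**
   * Adds a data source to the catalog, assigning it a new catalog version.
   * Returns true if the data source was added.
   */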
  @Override
  public boolean addDataSource(DataSource dataSource) {
    if (dataSources_.add(dataSource)) {
      dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
      return true;
    }
    return false;
  }

  @Override
  public DataSource removeDataSource(String dataSourceName) {
    DataSource dataSource = dataSources_.remove(dataSourceName);
    if (dataSource != null) {
      dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
    }
    return dataSource;
  }

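  /**
   * Returns the table parameter 'transient_lastDdlTime' from the given
   * metastore table, or -1 if it is not set or cannot be parsed.
   */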
  public static long getLastDdlTime(org.apache.hadoop.hive.metastore.api.Table msTbl) {
    Preconditions.checkNotNull(msTbl);
    Map<String, String> params = msTbl.getParameters();
    String lastDdlTimeStr = params.get("transient_lastDdlTime");
    if (lastDdlTimeStr != null) {
      try {
        return Long.parseLong(lastDdlTimeStr);
      } catch (NumberFormatException e) {}
    }
    return -1;
  }

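  /**
   * Sets the last DDL time on the named table. Does nothing if the table or
   * its database are not in the catalog.
   */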
  public void updateLastDdlTime(TTableName tblName, long ddlTime) {
    Db db = getDb(tblName.getDb_name());
    if (db == null) return;
    Table tbl = db.getTable(tblName.getTable_name());
    if (tbl == null) return;
    tbl.updateLastDdlTime(ddlTime);
  }

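  /**
   * Renames a table, performing the removal of the old name and the addition
   * of the new name atomically under the catalog write lock. Returns the new
   * (uninitialized) table, or null if the target database does not exist.
   */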
  public Table renameTable(TTableName oldTableName, TTableName newTableName)
      throws CatalogException {
    // Ensure the removal of the old table and addition of the new table happen
    // atomically.
    catalogLock_.writeLock().lock();
    try {
      // Remove the old table name from the cache and add the new table.
      Db db = getDb(oldTableName.getDb_name());
      if (db != null) db.removeTable(oldTableName.getTable_name());
      return addTable(newTableName.getDb_name(), newTableName.getTable_name());
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }

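  /**
   * Reloads the metadata of the given table in place (unlike an invalidate,
   * the existing entry is passed to the loader). Returns the updated Table,
   * or null if the table is not in the catalog.
   */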
  public Table reloadTable(TTableName tblName) throws CatalogException {
    LOG.debug(String.format("Refreshing table metadata: %s.%s",
        tblName.getDb_name(), tblName.getTable_name()));
    long previousCatalogVersion;
    TableLoadingMgr.LoadRequest loadReq;
    catalogLock_.readLock().lock();
    try {
      Table tbl = getTable(tblName.getDb_name(), tblName.getTable_name());
      if (tbl == null) return null;
      previousCatalogVersion = tbl.getCatalogVersion();
      loadReq = tableLoadingMgr_.loadAsync(tblName, tbl);
    } finally {
      catalogLock_.readLock().unlock();
    }
    Preconditions.checkNotNull(loadReq);

    try {
      return replaceTableIfUnchanged(loadReq.get(), previousCatalogVersion);
    } finally {
      loadReq.close();
    }
  }

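  /**
   * Drops the partition matching the given partition spec from the given HDFS
   * table. Returns the updated table, or null if no matching partition was
   * found. Throws if the table does not exist or is not an HDFS table.
   */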
  public Table dropPartition(TableName tableName, List<TPartitionKeyValue> partitionSpec)
      throws CatalogException {
    Preconditions.checkNotNull(partitionSpec);
    Table tbl = getOrLoadTable(tableName.getDb(), tableName.getTbl());
    if (tbl == null) {
      throw new TableNotFoundException("Table not found: " + tableName.toString());
    }
    if (!(tbl instanceof HdfsTable)) {
      throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table");
    }
    HdfsTable hdfsTable = (HdfsTable) tbl;
    // Locking the catalog here because this accesses hdfsTable's partition list and
    // updates its catalog version.
    // TODO: Fix this locking pattern.
    catalogLock_.writeLock().lock();
    try {
      HdfsPartition hdfsPartition = hdfsTable.dropPartition(partitionSpec);
      if (hdfsPartition == null) return null;
      return replaceTableIfUnchanged(hdfsTable, hdfsTable.getCatalogVersion());
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }
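
  /**
   * Adds the given partition to its HdfsTable, assigns the table a new
   * catalog version, and returns the modified table.
   */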
  public Table addPartition(HdfsPartition partition) throws CatalogException {
    Preconditions.checkNotNull(partition);
    HdfsTable hdfsTable = partition.getTable();
    Db db = getDb(hdfsTable.getDb().getName());
    // Locking the catalog here because this accesses the hdfsTable's partition list
    // and updates its catalog version.
    // TODO: Fix this locking pattern.
    catalogLock_.writeLock().lock();
    try {
      hdfsTable.addPartition(partition);
      hdfsTable.setCatalogVersion(incrementAndGetCatalogVersion());
      db.addTable(hdfsTable);
    } finally {
      catalogLock_.writeLock().unlock();
    }
    return hdfsTable;
  }

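  /**
   * Invalidates a table's cached metadata by replacing its entry with an
   * uninitialized IncompleteTable, consulting the metastore to decide whether
   * the table (and, if needed, its parent database) should be added to or
   * removed from the cache. Returns true if the table was removed because it
   * no longer exists in the metastore; any added or modified Db/Table objects
   * are passed back through 'updatedObjects'.
   */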
  public boolean invalidateTable(TTableName tableName, Pair<Db, Table> updatedObjects) {
    Preconditions.checkNotNull(updatedObjects);
    updatedObjects.first = null;
    updatedObjects.second = null;
    LOG.debug(String.format("Invalidating table metadata: %s.%s",
        tableName.getDb_name(), tableName.getTable_name()));
    String dbName = tableName.getDb_name();
    String tblName = tableName.getTable_name();

    // Stores whether the table exists in the metastore. Can have three states:
    // 1) true - Table exists in the metastore.
    // 2) false - Table does not exist in the metastore.
    // 3) unknown (null) - An exception was thrown by the metastore client.
    Boolean tableExistsInMetaStore;
    MetaStoreClient msClient = getMetaStoreClient();
    try {
      tableExistsInMetaStore = msClient.getHiveClient().tableExists(dbName, tblName);
    } catch (UnknownDBException e) {
      // The parent database does not exist in the metastore. Treat this the same
      // as if the table does not exist.
      tableExistsInMetaStore = false;
    } catch (TException e) {
      LOG.error("Error executing tableExists() metastore call: " + tblName, e);
      tableExistsInMetaStore = null;
    } finally {
      msClient.release();
    }

    if (tableExistsInMetaStore != null && !tableExistsInMetaStore) {
      updatedObjects.second = removeTable(dbName, tblName);
      return true;
    } else {
      Db db = getDb(dbName);
      if ((db == null || !db.containsTable(tblName)) && tableExistsInMetaStore == null) {
        // The table does not exist in our cache AND it is unknown whether the table
        // exists in the metastore. Do nothing.
        return false;
      } else if (db == null && tableExistsInMetaStore) {
        // The table exists in the metastore, but our cache does not contain the parent
        // database. A new db will be added to the cache along with the new table.
        db = new Db(dbName, this);
        db.setCatalogVersion(incrementAndGetCatalogVersion());
        addDb(db);
        updatedObjects.first = db;
      }

      // Add a new uninitialized table to the table cache, effectively invalidating
      // any existing entry. The metadata for the table will be loaded lazily, on
      // the next access to the table.
      Table newTable = IncompleteTable.createUninitializedTable(
          getNextTableId(), db, tblName);
      newTable.setCatalogVersion(incrementAndGetCatalogVersion());
      db.addTable(newTable);
      if (loadInBackground_) {
        tableLoadingMgr_.backgroundLoad(new TTableName(dbName.toLowerCase(),
            tblName.toLowerCase()));
      }
      updatedObjects.second = newTable;
      return false;
    }
  }

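  /**
   * Adds a role with the given name and grant groups to the authorization
   * policy and returns it with a new catalog version.
   */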
  public Role addRole(String roleName, Set<String> grantGroups) {
    catalogLock_.writeLock().lock();
    try {
      Role role = new Role(roleName, grantGroups);
      role.setCatalogVersion(incrementAndGetCatalogVersion());
      authPolicy_.addRole(role);
      return role;
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }

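  /**
   * Removes the role with the given name from the authorization policy.
   * Returns the removed role with an incremented catalog version, or null if
   * the role does not exist.
   */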
  public Role removeRole(String roleName) {
    catalogLock_.writeLock().lock();
    try {
      Role role = authPolicy_.removeRole(roleName);
      if (role == null) return null;
      role.setCatalogVersion(incrementAndGetCatalogVersion());
      return role;
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }

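  /**
   * Adds a grant group to the given role, returning the updated role with a
   * new catalog version.
   */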
  public Role addRoleGrantGroup(String roleName, String groupName)
      throws CatalogException {
    catalogLock_.writeLock().lock();
    try {
      Role role = authPolicy_.addGrantGroup(roleName, groupName);
      Preconditions.checkNotNull(role);
      role.setCatalogVersion(incrementAndGetCatalogVersion());
      return role;
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }

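  /**
   * Removes a grant group from the given role, returning the updated role
   * with a new catalog version.
   */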
  public Role removeRoleGrantGroup(String roleName, String groupName)
      throws CatalogException {
    catalogLock_.writeLock().lock();
    try {
      Role role = authPolicy_.removeGrantGroup(roleName, groupName);
      Preconditions.checkNotNull(role);
      role.setCatalogVersion(incrementAndGetCatalogVersion());
      return role;
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }

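  /**
   * Adds a privilege to the given role. Returns the new RolePrivilege with a
   * new catalog version. Throws a CatalogException if the role does not
   * exist.
   */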
  public RolePrivilege addRolePrivilege(String roleName, TPrivilege thriftPriv)
      throws CatalogException {
    catalogLock_.writeLock().lock();
    try {
      Role role = authPolicy_.getRole(roleName);
      if (role == null) throw new CatalogException("Role does not exist: " + roleName);
      RolePrivilege priv = RolePrivilege.fromThrift(thriftPriv);
      priv.setCatalogVersion(incrementAndGetCatalogVersion());
      authPolicy_.addPrivilege(priv);
      return priv;
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }

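  /**
   * Removes a privilege from the given role. Returns the removed privilege
   * with an incremented catalog version, null if the privilege was not found,
   * or throws a CatalogException if the role does not exist.
   */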
  public RolePrivilege removeRolePrivilege(String roleName, TPrivilege thriftPriv)
      throws CatalogException {
    catalogLock_.writeLock().lock();
    try {
      Role role = authPolicy_.getRole(roleName);
      if (role == null) throw new CatalogException("Role does not exist: " + roleName);
      RolePrivilege rolePrivilege =
          role.removePrivilege(thriftPriv.getPrivilege_name());
      if (rolePrivilege == null) return null;
      rolePrivilege.setCatalogVersion(incrementAndGetCatalogVersion());
      return rolePrivilege;
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }

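  /**
   * Returns the privilege with the given name from the given role, or null if
   * no such privilege exists. Throws a CatalogException if the role does not
   * exist.
   */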
  public RolePrivilege getRolePrivilege(String roleName, TPrivilege privSpec)
      throws CatalogException {
    catalogLock_.readLock().lock();
    try {
      Role role = authPolicy_.getRole(roleName);
      if (role == null) throw new CatalogException("Role does not exist: " + roleName);
      return role.getPrivilege(privSpec.getPrivilege_name());
    } finally {
      catalogLock_.readLock().unlock();
    }
  }
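
  /**
   * Increments the current catalog version and returns the new value.
   */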
  public long incrementAndGetCatalogVersion() {
    catalogLock_.writeLock().lock();
    try {
      return ++catalogVersion_;
    } finally {
      catalogLock_.writeLock().unlock();
    }
  }

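  /**
   * Returns the current catalog version.
   */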
  public long getCatalogVersion() {
    catalogLock_.readLock().lock();
    try {
      return catalogVersion_;
    } finally {
      catalogLock_.readLock().unlock();
    }
  }

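  /**
   * Returns a new unique table ID, incrementing the underlying counter.
   */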
  public TableId getNextTableId() { return new TableId(nextTableId_.getAndIncrement()); }
}