15 package com.cloudera.impala.catalog;
17 import java.util.concurrent.atomic.AtomicBoolean;
19 import org.apache.hadoop.fs.Path;
20 import org.apache.hadoop.hive.metastore.api.MetaException;
21 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
22 import org.apache.log4j.Logger;
23 import org.apache.thrift.TException;
27 import com.cloudera.impala.thrift.TCatalogObject;
28 import com.cloudera.impala.thrift.TCatalogObjectType;
29 import com.cloudera.impala.thrift.TDataSource;
30 import com.cloudera.impala.thrift.TDatabase;
31 import com.cloudera.impala.thrift.TFunction;
32 import com.cloudera.impala.thrift.TPrivilege;
33 import com.cloudera.impala.thrift.TRole;
34 import com.cloudera.impala.thrift.TTable;
35 import com.cloudera.impala.thrift.TUniqueId;
36 import com.cloudera.impala.thrift.TUpdateCatalogCacheRequest;
37 import com.cloudera.impala.thrift.TUpdateCatalogCacheResponse;
// Shared log4j logger for this class, created once for ImpaladCatalog.class.
// Used in this file for error reports (e.g. "Error adding catalog object: ")
// and debug messages (e.g. skipped updates, missing parent databases).
65 private static final Logger
LOG = Logger.getLogger(ImpaladCatalog.class);
// Readiness flag, held in an AtomicBoolean and initialised to false.
// Its current value is what isReady() returns.
// NOTE(review): the code that flips this to true is not visible in this
// excerpt - presumably set once a catalog update has been applied; confirm.
79 private final AtomicBoolean
isReady_ =
new AtomicBoolean(
false);
120 throw new CatalogException(
"Detected catalog service ID change. Aborting " +
127 for (TCatalogObject catalogObject: req.getUpdated_objects()) {
128 if (catalogObject.getType() == TCatalogObjectType.CATALOG) {
129 newCatalogVersion = catalogObject.getCatalog_version();
133 }
catch (Exception e) {
134 LOG.error(
"Error adding catalog object: " + e.getMessage(), e);
142 for (TCatalogObject catalogObject: req.getRemoved_objects()) {
152 catalogUpdateEventNotifier_.notifyAll();
167 catalogUpdateEventNotifier_.wait(timeoutMs);
168 }
catch (InterruptedException e) {
183 Table table = super.getTable(dbName, tableName);
184 if (table == null)
return null;
191 throw new TableLoadingException(
"Missing metadata for table: " + tableName, cause);
205 public Path
getTablePath(org.apache.hadoop.hive.metastore.api.Table msTbl)
206 throws NoSuchObjectException, MetaException, TException {
211 if (msTbl.getSd().getLocation() == null || msTbl.getSd().getLocation().isEmpty()) {
213 client.getHiveClient().getDatabase(msTbl.getDbName()).getLocationUri();
214 return new Path(dbLocation, msTbl.getTableName().toLowerCase());
216 return new Path(msTbl.getSd().getLocation());
235 LOG.debug(String.format(
"Skipping update because a matching object was removed " +
236 "in a later catalog version: %s", catalogObject));
240 switch(catalogObject.getType()) {
242 addDb(catalogObject.getDb(), catalogObject.getCatalog_version());
246 addTable(catalogObject.getTable(), catalogObject.getCatalog_version());
249 addFunction(catalogObject.getFn(), catalogObject.getCatalog_version());
252 addDataSource(catalogObject.getData_source(), catalogObject.getCatalog_version());
255 Role role = Role.fromThrift(catalogObject.getRole());
256 role.setCatalogVersion(catalogObject.getCatalog_version());
257 authPolicy_.addRole(role);
261 RolePrivilege.fromThrift(catalogObject.getPrivilege());
262 privilege.setCatalogVersion(catalogObject.getCatalog_version());
264 authPolicy_.addPrivilege(privilege);
266 LOG.error(
"Error adding privilege: ", e);
269 case HDFS_CACHE_POOL:
271 cachePool.setCatalogVersion(catalogObject.getCatalog_version());
272 hdfsCachePools_.add(cachePool);
275 throw new IllegalStateException(
276 "Unexpected TCatalogObjectType: " + catalogObject.getType());
291 long currentCatalogUpdateVersion) {
298 long dropCatalogVersion = catalogObject.getCatalog_version() == 0 ?
299 currentCatalogUpdateVersion : catalogObject.getCatalog_version();
301 switch(catalogObject.getType()) {
303 removeDb(catalogObject.getDb(), dropCatalogVersion);
307 removeTable(catalogObject.getTable(), dropCatalogVersion);
316 removeRole(catalogObject.getRole(), dropCatalogVersion);
321 case HDFS_CACHE_POOL:
323 hdfsCachePools_.get(catalogObject.getCache_pool().getPool_name());
325 hdfsCachePools_.remove(catalogObject.getCache_pool().getPool_name());
329 throw new IllegalStateException(
330 "Unexpected TCatalogObjectType: " + catalogObject.getType());
334 catalogDeltaLog_.addRemovedObject(catalogObject);
338 private void addDb(TDatabase thriftDb,
long catalogVersion) {
339 Db existingDb =
getDb(thriftDb.getDb_name());
340 if (existingDb == null ||
342 Db newDb = Db.fromTDatabase(thriftDb,
this);
343 newDb.setCatalogVersion(catalogVersion);
348 private void addTable(TTable thriftTable,
long catalogVersion)
350 Db db =
getDb(thriftTable.db_name);
352 LOG.debug(
"Parent database of table does not exist: " +
353 thriftTable.db_name +
"." + thriftTable.tbl_name);
357 Table newTable = Table.fromThrift(db, thriftTable);
358 newTable.setCatalogVersion(catalogVersion);
359 db.addTable(newTable);
363 Function function = Function.fromThrift(fn);
364 function.setCatalogVersion(catalogVersion);
367 LOG.debug(
"Parent database of function does not exist: " + function.getName());
370 Function existingFn = db.getFunction(fn.getSignature());
371 if (existingFn == null ||
373 db.addFunction(
function);
378 DataSource dataSource = DataSource.fromThrift(thrift);
379 dataSource.setCatalogVersion(catalogVersion);
387 private void removeDb(TDatabase thriftDb,
long dropCatalogVersion) {
388 Db db =
getDb(thriftDb.getDb_name());
394 private void removeTable(TTable thriftTable,
long dropCatalogVersion) {
395 Db db =
getDb(thriftTable.db_name);
397 if (db == null)
return;
399 Table table = db.getTable(thriftTable.getTbl_name());
401 db.removeTable(thriftTable.tbl_name);
406 Db db =
getDb(thriftFn.name.getDb_name());
408 if (db == null)
return;
412 Function fn = db.getFunction(thriftFn.getSignature());
414 db.removeFunction(thriftFn.getSignature());
418 private void removeRole(TRole thriftRole,
long dropCatalogVersion) {
419 Role existingRole = authPolicy_.getRole(thriftRole.getRole_name());
421 if (existingRole != null && existingRole.
getCatalogVersion() < dropCatalogVersion) {
422 authPolicy_.removeRole(thriftRole.getRole_name());
427 Role role = authPolicy_.getRole(thriftPrivilege.getRole_id());
428 if (role == null)
return;
430 role.getPrivilege(thriftPrivilege.getPrivilege_name());
432 if (existingPrivilege != null &&
434 role.removePrivilege(thriftPrivilege.getPrivilege_name());
443 public boolean isReady() {
return isReady_.get(); }
synchronized TUpdateCatalogCacheResponse updateCatalog(TUpdateCatalogCacheRequest req)
void removeRole(TRole thriftRole, long dropCatalogVersion)
void removeDataSource(TDataSource thrift, long dropCatalogVersion)
void addFunction(TFunction fn, long catalogVersion)
Path getTablePath(org.apache.hadoop.hive.metastore.api.Table msTbl)
void removeCatalogObject(TCatalogObject catalogObject, long currentCatalogUpdateVersion)
synchronized long getCatalogVersion()
final Object catalogUpdateEventNotifier_
AuthorizationPolicy getAuthPolicy()
Table getTable(String dbName, String tableName)
synchronized boolean wasObjectRemovedAfter(TCatalogObject catalogObject)
static final long INITIAL_CATALOG_VERSION
synchronized long getCatalogVersion()
TUniqueId catalogServiceId_
void addCatalogObject(TCatalogObject catalogObject)
void addTable(TTable thriftTable, long catalogVersion)
final CatalogDeltaLog catalogDeltaLog_
final AtomicBoolean isReady_
AuthorizationPolicy authPolicy_
void removeTable(TTable thriftTable, long dropCatalogVersion)
static final TUniqueId INITIAL_CATALOG_SERVICE_ID
void addDataSource(TDataSource thrift, long catalogVersion)
long lastSyncedCatalogVersion_
void removeFunction(TFunction thriftFn, long dropCatalogVersion)
void removeDb(TDatabase thriftDb, long dropCatalogVersion)
MetaStoreClient getMetaStoreClient()
void removePrivilege(TPrivilege thriftPrivilege, long dropCatalogVersion)
void waitForCatalogUpdate(long timeoutMs)
void addDb(TDatabase thriftDb, long catalogVersion)