 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
15 package com.cloudera.impala.catalog;
17 import java.util.concurrent.atomic.AtomicBoolean;
19 import org.apache.hadoop.fs.Path;
20 import org.apache.hadoop.hive.metastore.api.MetaException;
21 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
22 import org.apache.log4j.Logger;
23 import org.apache.thrift.TException;
27 import com.cloudera.impala.thrift.TCatalogObject;
28 import com.cloudera.impala.thrift.TCatalogObjectType;
29 import com.cloudera.impala.thrift.TDataSource;
30 import com.cloudera.impala.thrift.TDatabase;
31 import com.cloudera.impala.thrift.TFunction;
32 import com.cloudera.impala.thrift.TPrivilege;
33 import com.cloudera.impala.thrift.TRole;
34 import com.cloudera.impala.thrift.TTable;
35 import com.cloudera.impala.thrift.TUniqueId;
36 import com.cloudera.impala.thrift.TUpdateCatalogCacheRequest;
37 import com.cloudera.impala.thrift.TUpdateCatalogCacheResponse;
64 public class ImpaladCatalog extends Catalog {
65  private static final Logger LOG = Logger.getLogger(ImpaladCatalog.class);
66  private static final TUniqueId INITIAL_CATALOG_SERVICE_ID = new TUniqueId(0L, 0L);
68  // The last known Catalog Service ID. If the ID changes, it indicates the CatalogServer
69  // has restarted.
72  // The catalog version received in the last StateStore heartbeat. It is guaranteed
73  // all objects in the catalog have at a minimum, this version. Because updates may
74  // be applied out of band of a StateStore heartbeat, it is possible the catalog
75  // contains some objects with a version greater than this one.
78  // Flag to determine if the Catalog is ready to accept user requests. See isReady().
79  private final AtomicBoolean isReady_ = new AtomicBoolean(false);
81  // Tracks modifications to this Impalad's catalog from direct updates to the cache.
84  // Object that is used to synchronize on and signal when a catalog update is received.
85  private final Object catalogUpdateEventNotifier_ = new Object();
// Creates an ImpaladCatalog by passing false to the Catalog superclass constructor.
// NOTE(review): the meaning of the boolean flag is defined by Catalog, which is not
// visible here - confirm before changing it.
91  public ImpaladCatalog() {
92  super(false);
93  }
112  public synchronized TUpdateCatalogCacheResponse updateCatalog(
113  TUpdateCatalogCacheRequest req) throws CatalogException {
114  // Check for changes in the catalog service ID.
115  if (!catalogServiceId_.equals(req.getCatalog_service_id())) {
116  boolean firstRun = catalogServiceId_.equals(INITIAL_CATALOG_SERVICE_ID);
117  catalogServiceId_ = req.getCatalog_service_id();
118  if (!firstRun) {
119  // Throw an exception which will trigger a full topic update request.
120  throw new CatalogException("Detected catalog service ID change. Aborting " +
121  "updateCatalog()");
122  }
123  }
125  // First process all updates
126  long newCatalogVersion = lastSyncedCatalogVersion_;
127  for (TCatalogObject catalogObject: req.getUpdated_objects()) {
128  if (catalogObject.getType() == TCatalogObjectType.CATALOG) {
129  newCatalogVersion = catalogObject.getCatalog_version();
130  } else {
131  try {
132  addCatalogObject(catalogObject);
133  } catch (Exception e) {
134  LOG.error("Error adding catalog object: " + e.getMessage(), e);
135  }
136  }
137  }
139  // Now remove all objects from the catalog. Removing a database before removing
140  // its child tables/functions is fine. If that happens, the removal of the child
141  // object will be a no-op.
142  for (TCatalogObject catalogObject: req.getRemoved_objects()) {
143  removeCatalogObject(catalogObject, newCatalogVersion);
144  }
145  lastSyncedCatalogVersion_ = newCatalogVersion;
146  // Cleanup old entries in the log.
147  catalogDeltaLog_.garbageCollect(lastSyncedCatalogVersion_);
148  isReady_.set(true);
150  // Notify all the threads waiting on a catalog update.
151  synchronized (catalogUpdateEventNotifier_) {
152  catalogUpdateEventNotifier_.notifyAll();
153  }
155  return new TUpdateCatalogCacheResponse(catalogServiceId_);
156  }
164  public void waitForCatalogUpdate(long timeoutMs) {
165  synchronized (catalogUpdateEventNotifier_) {
166  try {
167  catalogUpdateEventNotifier_.wait(timeoutMs);
168  } catch (InterruptedException e) {
169  // Ignore
170  }
171  }
172  }
180  @Override
181  public Table getTable(String dbName, String tableName)
182  throws CatalogException {
183  Table table = super.getTable(dbName, tableName);
184  if (table == null) return null;
186  if (table.isLoaded() && table instanceof IncompleteTable) {
187  // If there were problems loading this table's metadata, throw an exception
188  // when it is accessed.
189  ImpalaException cause = ((IncompleteTable) table).getCause();
190  if (cause instanceof TableLoadingException) throw (TableLoadingException) cause;
191  throw new TableLoadingException("Missing metadata for table: " + tableName, cause);
192  }
193  return table;
194  }
205  public Path getTablePath(org.apache.hadoop.hive.metastore.api.Table msTbl)
206  throws NoSuchObjectException, MetaException, TException {
208  try {
209  // If the table did not have its path set, build the path based on the the
210  // location property of the parent database.
211  if (msTbl.getSd().getLocation() == null || msTbl.getSd().getLocation().isEmpty()) {
212  String dbLocation =
213  client.getHiveClient().getDatabase(msTbl.getDbName()).getLocationUri();
214  return new Path(dbLocation, msTbl.getTableName().toLowerCase());
215  } else {
216  return new Path(msTbl.getSd().getLocation());
217  }
218  } finally {
219  client.release();
220  }
221  }
231  private void addCatalogObject(TCatalogObject catalogObject)
233  // This item is out of date and should not be applied to the catalog.
234  if (catalogDeltaLog_.wasObjectRemovedAfter(catalogObject)) {
235  LOG.debug(String.format("Skipping update because a matching object was removed " +
236  "in a later catalog version: %s", catalogObject));
237  return;
238  }
240  switch(catalogObject.getType()) {
241  case DATABASE:
242  addDb(catalogObject.getDb(), catalogObject.getCatalog_version());
243  break;
244  case TABLE:
245  case VIEW:
246  addTable(catalogObject.getTable(), catalogObject.getCatalog_version());
247  break;
248  case FUNCTION:
249  addFunction(catalogObject.getFn(), catalogObject.getCatalog_version());
250  break;
251  case DATA_SOURCE:
252  addDataSource(catalogObject.getData_source(), catalogObject.getCatalog_version());
253  break;
254  case ROLE:
255  Role role = Role.fromThrift(catalogObject.getRole());
256  role.setCatalogVersion(catalogObject.getCatalog_version());
257  authPolicy_.addRole(role);
258  break;
259  case PRIVILEGE:
260  RolePrivilege privilege =
261  RolePrivilege.fromThrift(catalogObject.getPrivilege());
262  privilege.setCatalogVersion(catalogObject.getCatalog_version());
263  try {
264  authPolicy_.addPrivilege(privilege);
265  } catch (CatalogException e) {
266  LOG.error("Error adding privilege: ", e);
267  }
268  break;
269  case HDFS_CACHE_POOL:
270  HdfsCachePool cachePool = new HdfsCachePool(catalogObject.getCache_pool());
271  cachePool.setCatalogVersion(catalogObject.getCatalog_version());
272  hdfsCachePools_.add(cachePool);
273  break;
274  default:
275  throw new IllegalStateException(
276  "Unexpected TCatalogObjectType: " + catalogObject.getType());
277  }
278  }
290  private void removeCatalogObject(TCatalogObject catalogObject,
291  long currentCatalogUpdateVersion) {
292  // The TCatalogObject associated with a drop operation from a state store
293  // heartbeat will always have a version of zero. Because no update from
294  // the state store can contain both a drop and an addition of the same object,
295  // we can assume the drop version is the current catalog version of this update.
296  // If the TCatalogObject contains a version that != 0, it indicates the drop
297  // came from a direct update.
298  long dropCatalogVersion = catalogObject.getCatalog_version() == 0 ?
299  currentCatalogUpdateVersion : catalogObject.getCatalog_version();
301  switch(catalogObject.getType()) {
302  case DATABASE:
303  removeDb(catalogObject.getDb(), dropCatalogVersion);
304  break;
305  case TABLE:
306  case VIEW:
307  removeTable(catalogObject.getTable(), dropCatalogVersion);
308  break;
309  case FUNCTION:
310  removeFunction(catalogObject.getFn(), dropCatalogVersion);
311  break;
312  case DATA_SOURCE:
313  removeDataSource(catalogObject.getData_source(), dropCatalogVersion);
314  break;
315  case ROLE:
316  removeRole(catalogObject.getRole(), dropCatalogVersion);
317  break;
318  case PRIVILEGE:
319  removePrivilege(catalogObject.getPrivilege(), dropCatalogVersion);
320  break;
321  case HDFS_CACHE_POOL:
322  HdfsCachePool existingItem =
323  hdfsCachePools_.get(catalogObject.getCache_pool().getPool_name());
324  if (existingItem.getCatalogVersion() > catalogObject.getCatalog_version()) {
325  hdfsCachePools_.remove(catalogObject.getCache_pool().getPool_name());
326  }
327  break;
328  default:
329  throw new IllegalStateException(
330  "Unexpected TCatalogObjectType: " + catalogObject.getType());
331  }
333  if (catalogObject.getCatalog_version() > lastSyncedCatalogVersion_) {
334  catalogDeltaLog_.addRemovedObject(catalogObject);
335  }
336  }
338  private void addDb(TDatabase thriftDb, long catalogVersion) {
339  Db existingDb = getDb(thriftDb.getDb_name());
340  if (existingDb == null ||
341  existingDb.getCatalogVersion() < catalogVersion) {
342  Db newDb = Db.fromTDatabase(thriftDb, this);
343  newDb.setCatalogVersion(catalogVersion);
344  addDb(newDb);
345  }
346  }
348  private void addTable(TTable thriftTable, long catalogVersion)
349  throws TableLoadingException {
350  Db db = getDb(thriftTable.db_name);
351  if (db == null) {
352  LOG.debug("Parent database of table does not exist: " +
353  thriftTable.db_name + "." + thriftTable.tbl_name);
354  return;
355  }
357  Table newTable = Table.fromThrift(db, thriftTable);
358  newTable.setCatalogVersion(catalogVersion);
359  db.addTable(newTable);
360  }
362  private void addFunction(TFunction fn, long catalogVersion) {
363  Function function = Function.fromThrift(fn);
364  function.setCatalogVersion(catalogVersion);
365  Db db = getDb(function.getFunctionName().getDb());
366  if (db == null) {
367  LOG.debug("Parent database of function does not exist: " + function.getName());
368  return;
369  }
370  Function existingFn = db.getFunction(fn.getSignature());
371  if (existingFn == null ||
372  existingFn.getCatalogVersion() < catalogVersion) {
373  db.addFunction(function);
374  }
375  }
377  private void addDataSource(TDataSource thrift, long catalogVersion) {
378  DataSource dataSource = DataSource.fromThrift(thrift);
379  dataSource.setCatalogVersion(catalogVersion);
380  addDataSource(dataSource);
381  }
383  private void removeDataSource(TDataSource thrift, long dropCatalogVersion) {
384  removeDataSource(thrift.getName());
385  }
387  private void removeDb(TDatabase thriftDb, long dropCatalogVersion) {
388  Db db = getDb(thriftDb.getDb_name());
389  if (db != null && db.getCatalogVersion() < dropCatalogVersion) {
390  removeDb(db.getName());
391  }
392  }
394  private void removeTable(TTable thriftTable, long dropCatalogVersion) {
395  Db db = getDb(thriftTable.db_name);
396  // The parent database doesn't exist, nothing to do.
397  if (db == null) return;
399  Table table = db.getTable(thriftTable.getTbl_name());
400  if (table != null && table.getCatalogVersion() < dropCatalogVersion) {
401  db.removeTable(thriftTable.tbl_name);
402  }
403  }
405  private void removeFunction(TFunction thriftFn, long dropCatalogVersion) {
406  Db db = getDb(;
407  // The parent database doesn't exist, nothing to do.
408  if (db == null) return;
410  // If the function exists and it has a catalog version less than the
411  // version of the drop, remove the function.
412  Function fn = db.getFunction(thriftFn.getSignature());
413  if (fn != null && fn.getCatalogVersion() < dropCatalogVersion) {
414  db.removeFunction(thriftFn.getSignature());
415  }
416  }
418  private void removeRole(TRole thriftRole, long dropCatalogVersion) {
419  Role existingRole = authPolicy_.getRole(thriftRole.getRole_name());
420  // version of the drop, remove the function.
421  if (existingRole != null && existingRole.getCatalogVersion() < dropCatalogVersion) {
422  authPolicy_.removeRole(thriftRole.getRole_name());
423  }
424  }
426  private void removePrivilege(TPrivilege thriftPrivilege, long dropCatalogVersion) {
427  Role role = authPolicy_.getRole(thriftPrivilege.getRole_id());
428  if (role == null) return;
429  RolePrivilege existingPrivilege =
430  role.getPrivilege(thriftPrivilege.getPrivilege_name());
431  // version of the drop, remove the function.
432  if (existingPrivilege != null &&
433  existingPrivilege.getCatalogVersion() < dropCatalogVersion) {
434  role.removePrivilege(thriftPrivilege.getPrivilege_name());
435  }
436  }
443  public boolean isReady() { return isReady_.get(); }
445  // Only used for testing.
446  public void setIsReady() { isReady_.set(true); }
448 }
synchronized TUpdateCatalogCacheResponse updateCatalog(TUpdateCatalogCacheRequest req)
void removeRole(TRole thriftRole, long dropCatalogVersion)
void removeDataSource(TDataSource thrift, long dropCatalogVersion)
void addFunction(TFunction fn, long catalogVersion)
Path getTablePath(org.apache.hadoop.hive.metastore.api.Table msTbl)
void removeCatalogObject(TCatalogObject catalogObject, long currentCatalogUpdateVersion)
Table getTable(String dbName, String tableName)
synchronized boolean wasObjectRemovedAfter(TCatalogObject catalogObject)
synchronized long getCatalogVersion()
void addCatalogObject(TCatalogObject catalogObject)
void addTable(TTable thriftTable, long catalogVersion)
AuthorizationPolicy authPolicy_
void removeTable(TTable thriftTable, long dropCatalogVersion)
void addDataSource(TDataSource thrift, long catalogVersion)
void removeFunction(TFunction thriftFn, long dropCatalogVersion)
void removeDb(TDatabase thriftDb, long dropCatalogVersion)
MetaStoreClient getMetaStoreClient()
void removePrivilege(TPrivilege thriftPrivilege, long dropCatalogVersion)
void addDb(TDatabase thriftDb, long catalogVersion)