Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
ImpaladCatalog.java
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.catalog;
16 
17 import java.util.concurrent.atomic.AtomicBoolean;
18 
19 import org.apache.hadoop.fs.Path;
20 import org.apache.hadoop.hive.metastore.api.MetaException;
21 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
22 import org.apache.log4j.Logger;
23 import org.apache.thrift.TException;
24 
27 import com.cloudera.impala.thrift.TCatalogObject;
28 import com.cloudera.impala.thrift.TCatalogObjectType;
29 import com.cloudera.impala.thrift.TDataSource;
30 import com.cloudera.impala.thrift.TDatabase;
31 import com.cloudera.impala.thrift.TFunction;
32 import com.cloudera.impala.thrift.TPrivilege;
33 import com.cloudera.impala.thrift.TRole;
34 import com.cloudera.impala.thrift.TTable;
35 import com.cloudera.impala.thrift.TUniqueId;
36 import com.cloudera.impala.thrift.TUpdateCatalogCacheRequest;
37 import com.cloudera.impala.thrift.TUpdateCatalogCacheResponse;
38 
64 public class ImpaladCatalog extends Catalog {
  // Logger for this class.
65  private static final Logger LOG = Logger.getLogger(ImpaladCatalog.class);
  // Placeholder service ID (0, 0). updateCatalog() compares the last known service ID
  // against this value to recognize the very first update it processes.
66  private static final TUniqueId INITIAL_CATALOG_SERVICE_ID = new TUniqueId(0L, 0L);
67 
68  // The last known Catalog Service ID. A change in this ID indicates that the
69  // CatalogServer has restarted.
  // NOTE(review): the field declaration this comment describes is not visible in this
  // listing; presumably it is catalogServiceId_, which updateCatalog() reads and
  // reassigns -- confirm against the full file.
71 
72  // The catalog version received in the last StateStore heartbeat. Every object in
73  // the catalog is guaranteed to have at least this version. Because updates may
74  // be applied out of band of a StateStore heartbeat, the catalog may also
75  // contain some objects with a version greater than this one.
  // NOTE(review): the field declaration this comment describes is not visible in this
  // listing; presumably it is lastSyncedCatalogVersion_, which updateCatalog() and
  // removeCatalogObject() use -- confirm against the full file.
77 
78  // Flag to determine if the Catalog is ready to accept user requests. See isReady().
  // Starts out false; set to true by updateCatalog() and by setIsReady().
79  private final AtomicBoolean isReady_ = new AtomicBoolean(false);
80 
81  // Tracks modifications to this Impalad's catalog from direct updates to the cache.
  // NOTE(review): the field declaration this comment describes is not visible in this
  // listing; presumably it is catalogDeltaLog_, which is garbage collected in
  // updateCatalog() and consulted in addCatalogObject() -- confirm against the full file.
83 
84  // Monitor object: waitForCatalogUpdate() waits on it and updateCatalog() calls
  // notifyAll() on it once an update has been applied.
85  private final Object catalogUpdateEventNotifier_ = new Object();
86 
/**
 * Creates a new ImpaladCatalog. All construction work is delegated to the
 * {@code Catalog} superclass constructor, which is invoked with {@code false}.
 * NOTE(review): the meaning of that flag is defined by {@code Catalog}, which is not
 * visible here -- confirm before changing it.
 */
public ImpaladCatalog() {
  super(false);
}
94 
112  public synchronized TUpdateCatalogCacheResponse updateCatalog(
113  TUpdateCatalogCacheRequest req) throws CatalogException {
114  // Check for changes in the catalog service ID.
115  if (!catalogServiceId_.equals(req.getCatalog_service_id())) {
116  boolean firstRun = catalogServiceId_.equals(INITIAL_CATALOG_SERVICE_ID);
117  catalogServiceId_ = req.getCatalog_service_id();
118  if (!firstRun) {
119  // Throw an exception which will trigger a full topic update request.
120  throw new CatalogException("Detected catalog service ID change. Aborting " +
121  "updateCatalog()");
122  }
123  }
124 
125  // First process all updates
126  long newCatalogVersion = lastSyncedCatalogVersion_;
127  for (TCatalogObject catalogObject: req.getUpdated_objects()) {
128  if (catalogObject.getType() == TCatalogObjectType.CATALOG) {
129  newCatalogVersion = catalogObject.getCatalog_version();
130  } else {
131  try {
132  addCatalogObject(catalogObject);
133  } catch (Exception e) {
134  LOG.error("Error adding catalog object: " + e.getMessage(), e);
135  }
136  }
137  }
138 
139  // Now remove all objects from the catalog. Removing a database before removing
140  // its child tables/functions is fine. If that happens, the removal of the child
141  // object will be a no-op.
142  for (TCatalogObject catalogObject: req.getRemoved_objects()) {
143  removeCatalogObject(catalogObject, newCatalogVersion);
144  }
145  lastSyncedCatalogVersion_ = newCatalogVersion;
146  // Cleanup old entries in the log.
147  catalogDeltaLog_.garbageCollect(lastSyncedCatalogVersion_);
148  isReady_.set(true);
149 
150  // Notify all the threads waiting on a catalog update.
151  synchronized (catalogUpdateEventNotifier_) {
152  catalogUpdateEventNotifier_.notifyAll();
153  }
154 
155  return new TUpdateCatalogCacheResponse(catalogServiceId_);
156  }
157 
164  public void waitForCatalogUpdate(long timeoutMs) {
165  synchronized (catalogUpdateEventNotifier_) {
166  try {
167  catalogUpdateEventNotifier_.wait(timeoutMs);
168  } catch (InterruptedException e) {
169  // Ignore
170  }
171  }
172  }
173 
180  @Override
181  public Table getTable(String dbName, String tableName)
182  throws CatalogException {
183  Table table = super.getTable(dbName, tableName);
184  if (table == null) return null;
185 
186  if (table.isLoaded() && table instanceof IncompleteTable) {
187  // If there were problems loading this table's metadata, throw an exception
188  // when it is accessed.
189  ImpalaException cause = ((IncompleteTable) table).getCause();
190  if (cause instanceof TableLoadingException) throw (TableLoadingException) cause;
191  throw new TableLoadingException("Missing metadata for table: " + tableName, cause);
192  }
193  return table;
194  }
195 
  /**
   * Returns the Path of the given metastore table.
   *
   * If the table's storage descriptor has a non-null, non-empty location, that
   * location is returned as a Path. Otherwise the path is built from the location URI
   * of the parent database (fetched through the Hive client) and the lower-cased
   * table name. The client is released in all cases.
   *
   * NOTE(review): "client" is used below but its declaration is not visible in this
   * listing; the file's member index references getMetaStoreClient(), so it is
   * presumably obtained from that method just before the try block -- confirm
   * against the full file.
   */
205  public Path getTablePath(org.apache.hadoop.hive.metastore.api.Table msTbl)
206  throws NoSuchObjectException, MetaException, TException {
208  try {
209  // If the table did not have its path set, build the path based on the
210  // location property of the parent database.
211  if (msTbl.getSd().getLocation() == null || msTbl.getSd().getLocation().isEmpty()) {
212  String dbLocation =
213  client.getHiveClient().getDatabase(msTbl.getDbName()).getLocationUri();
  // toLowerCase() with no argument uses the JVM's default locale.
214  return new Path(dbLocation, msTbl.getTableName().toLowerCase());
215  } else {
216  return new Path(msTbl.getSd().getLocation());
217  }
218  } finally {
  // Always release the client, on both return paths and on exceptions.
219  client.release();
220  }
221  }
222 
  /**
   * Adds the given catalog object to this catalog's cache, dispatching on the
   * object's type. The object is skipped if the delta log reports that a matching
   * object was removed in a later catalog version. An unrecognized type results in
   * an IllegalStateException.
   *
   * NOTE(review): the end of this method's signature (a throws clause, if any, and
   * the opening brace) is not visible in this listing -- confirm against the full
   * file.
   */
231  private void addCatalogObject(TCatalogObject catalogObject)
233  // This item is out of date and should not be applied to the catalog.
234  if (catalogDeltaLog_.wasObjectRemovedAfter(catalogObject)) {
235  LOG.debug(String.format("Skipping update because a matching object was removed " +
236  "in a later catalog version: %s", catalogObject));
237  return;
238  }
239 
  // Every branch below tags the new object with the catalog version carried by the
  // thrift object before (or while) adding it to the cache.
240  switch(catalogObject.getType()) {
241  case DATABASE:
242  addDb(catalogObject.getDb(), catalogObject.getCatalog_version());
243  break;
  // Tables and views share the same code path.
244  case TABLE:
245  case VIEW:
246  addTable(catalogObject.getTable(), catalogObject.getCatalog_version());
247  break;
248  case FUNCTION:
249  addFunction(catalogObject.getFn(), catalogObject.getCatalog_version());
250  break;
251  case DATA_SOURCE:
252  addDataSource(catalogObject.getData_source(), catalogObject.getCatalog_version());
253  break;
254  case ROLE:
255  Role role = Role.fromThrift(catalogObject.getRole());
256  role.setCatalogVersion(catalogObject.getCatalog_version());
257  authPolicy_.addRole(role);
258  break;
259  case PRIVILEGE:
260  RolePrivilege privilege =
261  RolePrivilege.fromThrift(catalogObject.getPrivilege());
262  privilege.setCatalogVersion(catalogObject.getCatalog_version());
263  try {
264  authPolicy_.addPrivilege(privilege);
265  } catch (CatalogException e) {
  // A failure to add a privilege is logged here and not rethrown.
266  LOG.error("Error adding privilege: ", e);
267  }
268  break;
269  case HDFS_CACHE_POOL:
270  HdfsCachePool cachePool = new HdfsCachePool(catalogObject.getCache_pool());
271  cachePool.setCatalogVersion(catalogObject.getCatalog_version());
272  hdfsCachePools_.add(cachePool);
273  break;
274  default:
275  throw new IllegalStateException(
276  "Unexpected TCatalogObjectType: " + catalogObject.getType());
277  }
278  }
279 
290  private void removeCatalogObject(TCatalogObject catalogObject,
291  long currentCatalogUpdateVersion) {
292  // The TCatalogObject associated with a drop operation from a state store
293  // heartbeat will always have a version of zero. Because no update from
294  // the state store can contain both a drop and an addition of the same object,
295  // we can assume the drop version is the current catalog version of this update.
296  // If the TCatalogObject contains a version that != 0, it indicates the drop
297  // came from a direct update.
298  long dropCatalogVersion = catalogObject.getCatalog_version() == 0 ?
299  currentCatalogUpdateVersion : catalogObject.getCatalog_version();
300 
301  switch(catalogObject.getType()) {
302  case DATABASE:
303  removeDb(catalogObject.getDb(), dropCatalogVersion);
304  break;
305  case TABLE:
306  case VIEW:
307  removeTable(catalogObject.getTable(), dropCatalogVersion);
308  break;
309  case FUNCTION:
310  removeFunction(catalogObject.getFn(), dropCatalogVersion);
311  break;
312  case DATA_SOURCE:
313  removeDataSource(catalogObject.getData_source(), dropCatalogVersion);
314  break;
315  case ROLE:
316  removeRole(catalogObject.getRole(), dropCatalogVersion);
317  break;
318  case PRIVILEGE:
319  removePrivilege(catalogObject.getPrivilege(), dropCatalogVersion);
320  break;
321  case HDFS_CACHE_POOL:
322  HdfsCachePool existingItem =
323  hdfsCachePools_.get(catalogObject.getCache_pool().getPool_name());
324  if (existingItem.getCatalogVersion() > catalogObject.getCatalog_version()) {
325  hdfsCachePools_.remove(catalogObject.getCache_pool().getPool_name());
326  }
327  break;
328  default:
329  throw new IllegalStateException(
330  "Unexpected TCatalogObjectType: " + catalogObject.getType());
331  }
332 
333  if (catalogObject.getCatalog_version() > lastSyncedCatalogVersion_) {
334  catalogDeltaLog_.addRemovedObject(catalogObject);
335  }
336  }
337 
338  private void addDb(TDatabase thriftDb, long catalogVersion) {
339  Db existingDb = getDb(thriftDb.getDb_name());
340  if (existingDb == null ||
341  existingDb.getCatalogVersion() < catalogVersion) {
342  Db newDb = Db.fromTDatabase(thriftDb, this);
343  newDb.setCatalogVersion(catalogVersion);
344  addDb(newDb);
345  }
346  }
347 
348  private void addTable(TTable thriftTable, long catalogVersion)
349  throws TableLoadingException {
350  Db db = getDb(thriftTable.db_name);
351  if (db == null) {
352  LOG.debug("Parent database of table does not exist: " +
353  thriftTable.db_name + "." + thriftTable.tbl_name);
354  return;
355  }
356 
357  Table newTable = Table.fromThrift(db, thriftTable);
358  newTable.setCatalogVersion(catalogVersion);
359  db.addTable(newTable);
360  }
361 
362  private void addFunction(TFunction fn, long catalogVersion) {
363  Function function = Function.fromThrift(fn);
364  function.setCatalogVersion(catalogVersion);
365  Db db = getDb(function.getFunctionName().getDb());
366  if (db == null) {
367  LOG.debug("Parent database of function does not exist: " + function.getName());
368  return;
369  }
370  Function existingFn = db.getFunction(fn.getSignature());
371  if (existingFn == null ||
372  existingFn.getCatalogVersion() < catalogVersion) {
373  db.addFunction(function);
374  }
375  }
376 
377  private void addDataSource(TDataSource thrift, long catalogVersion) {
378  DataSource dataSource = DataSource.fromThrift(thrift);
379  dataSource.setCatalogVersion(catalogVersion);
380  addDataSource(dataSource);
381  }
382 
383  private void removeDataSource(TDataSource thrift, long dropCatalogVersion) {
384  removeDataSource(thrift.getName());
385  }
386 
387  private void removeDb(TDatabase thriftDb, long dropCatalogVersion) {
388  Db db = getDb(thriftDb.getDb_name());
389  if (db != null && db.getCatalogVersion() < dropCatalogVersion) {
390  removeDb(db.getName());
391  }
392  }
393 
394  private void removeTable(TTable thriftTable, long dropCatalogVersion) {
395  Db db = getDb(thriftTable.db_name);
396  // The parent database doesn't exist, nothing to do.
397  if (db == null) return;
398 
399  Table table = db.getTable(thriftTable.getTbl_name());
400  if (table != null && table.getCatalogVersion() < dropCatalogVersion) {
401  db.removeTable(thriftTable.tbl_name);
402  }
403  }
404 
405  private void removeFunction(TFunction thriftFn, long dropCatalogVersion) {
406  Db db = getDb(thriftFn.name.getDb_name());
407  // The parent database doesn't exist, nothing to do.
408  if (db == null) return;
409 
410  // If the function exists and it has a catalog version less than the
411  // version of the drop, remove the function.
412  Function fn = db.getFunction(thriftFn.getSignature());
413  if (fn != null && fn.getCatalogVersion() < dropCatalogVersion) {
414  db.removeFunction(thriftFn.getSignature());
415  }
416  }
417 
418  private void removeRole(TRole thriftRole, long dropCatalogVersion) {
419  Role existingRole = authPolicy_.getRole(thriftRole.getRole_name());
420  // version of the drop, remove the function.
421  if (existingRole != null && existingRole.getCatalogVersion() < dropCatalogVersion) {
422  authPolicy_.removeRole(thriftRole.getRole_name());
423  }
424  }
425 
426  private void removePrivilege(TPrivilege thriftPrivilege, long dropCatalogVersion) {
427  Role role = authPolicy_.getRole(thriftPrivilege.getRole_id());
428  if (role == null) return;
429  RolePrivilege existingPrivilege =
430  role.getPrivilege(thriftPrivilege.getPrivilege_name());
431  // version of the drop, remove the function.
432  if (existingPrivilege != null &&
433  existingPrivilege.getCatalogVersion() < dropCatalogVersion) {
434  role.removePrivilege(thriftPrivilege.getPrivilege_name());
435  }
436  }
437 
443  public boolean isReady() { return isReady_.get(); }
444 
445  // Only used for testing.
446  public void setIsReady() { isReady_.set(true); }
448 }
synchronized TUpdateCatalogCacheResponse updateCatalog(TUpdateCatalogCacheRequest req)
void removeRole(TRole thriftRole, long dropCatalogVersion)
void removeDataSource(TDataSource thrift, long dropCatalogVersion)
void addFunction(TFunction fn, long catalogVersion)
Path getTablePath(org.apache.hadoop.hive.metastore.api.Table msTbl)
void removeCatalogObject(TCatalogObject catalogObject, long currentCatalogUpdateVersion)
Table getTable(String dbName, String tableName)
synchronized boolean wasObjectRemovedAfter(TCatalogObject catalogObject)
static final long INITIAL_CATALOG_VERSION
Definition: Catalog.java:57
synchronized long getCatalogVersion()
Definition: Role.java:134
void addCatalogObject(TCatalogObject catalogObject)
void addTable(TTable thriftTable, long catalogVersion)
AuthorizationPolicy authPolicy_
Definition: Catalog.java:67
void removeTable(TTable thriftTable, long dropCatalogVersion)
static final TUniqueId INITIAL_CATALOG_SERVICE_ID
void addDataSource(TDataSource thrift, long catalogVersion)
void removeFunction(TFunction thriftFn, long dropCatalogVersion)
void removeDb(TDatabase thriftDb, long dropCatalogVersion)
MetaStoreClient getMetaStoreClient()
Definition: Catalog.java:326
void removePrivilege(TPrivilege thriftPrivilege, long dropCatalogVersion)
void addDb(TDatabase thriftDb, long catalogVersion)