15 package com.cloudera.impala.catalog;
17 import java.util.Collections;
18 import java.util.HashSet;
19 import java.util.List;
22 import java.util.concurrent.Callable;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.FutureTask;
28 import java.util.concurrent.LinkedBlockingDeque;
29 import java.util.concurrent.LinkedBlockingQueue;
31 import org.apache.log4j.Logger;
33 import com.cloudera.impala.thrift.TTableName;
35 import com.google.common.base.Preconditions;
36 import com.google.common.collect.Maps;
57 private LoadRequest(TTableName tblName, Future<Table> tblTask) {
70 }
catch (Exception e) {
71 tbl = IncompleteTable.createFailedMetadataLoadTable(
75 Preconditions.checkState(tbl.isLoaded());
91 private static final Logger
LOG = Logger.getLogger(TableLoadingMgr.class);
100 new LinkedBlockingDeque<TTableName>();
109 Collections.synchronizedSet(
new HashSet<TTableName>());
114 new ConcurrentHashMap<TTableName, FutureTask<Table>>();
146 new LinkedBlockingQueue<TTableName>();
162 asyncRefreshThread_.submit(
new Callable<Void>() {
164 public Void call()
throws Exception {
175 tableLoadingSet_.add(tblName);
176 tableLoadingDeque_.offerFirst(tblName);
186 tableLoadingDeque_.offerLast(tblName);
201 List<Long> existingCacheReqIds = pendingTableCacheDirs_.get(tblName);
202 if (existingCacheReqIds == null) {
203 existingCacheReqIds = cacheDirIds;
204 pendingTableCacheDirs_.put(tblName, cacheDirIds);
205 refreshThreadWork_.add(tblName);
207 existingCacheReqIds.addAll(cacheDirIds);
223 final Db parentDb = catalog_.getDb(tblName.getDb_name());
224 if (parentDb == null) {
226 "Database '" + tblName.getDb_name() +
"' was not found.");
229 FutureTask<Table> tableLoadTask =
new FutureTask<Table>(
new Callable<Table>() {
231 public Table call()
throws Exception {
232 return tblLoader_.load(parentDb, tblName.table_name,
236 FutureTask<Table> existingValue = loadingTables_.putIfAbsent(tblName, tableLoadTask);
237 if (existingValue == null) {
239 tblLoadingPool_.execute(tableLoadTask);
241 tableLoadTask = existingValue;
255 loadingPool.execute(
new Runnable() {
261 }
catch (Exception e) {
262 LOG.error(
"Error loading table: ", e);
270 loadingPool.shutdown();
280 final TTableName tblName = tableLoadingDeque_.takeFirst();
281 tableLoadingSet_.remove(tblName);
282 LOG.debug(
"Loading next table. Remaining items in queue: "
283 + tableLoadingDeque_.size());
287 catalog_.getOrLoadTable(tblName.getDb_name(), tblName.getTable_name());
300 catalog_.reloadTable(tblName);
302 LOG.error(
"Error reloading cached table: ", e);
311 boolean isRefreshNeeded =
false;
315 List<Long> cacheDirIds = null;
317 cacheDirIds = pendingTableCacheDirs_.remove(tblName);
319 if (cacheDirIds == null || cacheDirIds.size() == 0)
return isRefreshNeeded;
320 isRefreshNeeded =
true;
323 for (Long dirId: cacheDirIds) {
324 if (dirId == null)
continue;
326 HdfsCachingUtil.waitForDirective(dirId);
327 }
catch (Exception e) {
328 LOG.error(String.format(
329 "Error waiting for cache request %d to complete: ", dirId), e);
final CatalogServiceCatalog catalog_
void execAsyncRefreshWork(TTableName tblName)
final TTableName tblName_
final LinkedBlockingDeque< TTableName > tableLoadingDeque_
ExecutorService asyncRefreshThread_
final TableLoader tblLoader_
final Future< Table > tblTask_
boolean waitForCacheDirs(TTableName tblName)
void prioritizeLoad(TTableName tblName)
final ExecutorService tblLoadingPool_
void startTableLoadingThreads()
final int numLoadingThreads_
TableLoadingMgr(CatalogServiceCatalog catalog, int numLoadingThreads)
final Map< TTableName, List< Long > > pendingTableCacheDirs_
void backgroundLoad(TTableName tblName)
final LinkedBlockingQueue< TTableName > refreshThreadWork_
final Set< TTableName > tableLoadingSet_
LoadRequest(TTableName tblName, Future< Table > tblTask)
void watchCacheDirs(List< Long > cacheDirIds, final TTableName tblName)
final ConcurrentHashMap< TTableName, FutureTask< Table > > loadingTables_
LoadRequest loadAsync(final TTableName tblName, final Table previousTbl)