Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
TableLoadingMgr.java
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.catalog;
16 
17 import java.util.Collections;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Set;
22 import java.util.concurrent.Callable;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.FutureTask;
28 import java.util.concurrent.LinkedBlockingDeque;
29 import java.util.concurrent.LinkedBlockingQueue;
30 
31 import org.apache.log4j.Logger;
32 
33 import com.cloudera.impala.thrift.TTableName;
35 import com.google.common.base.Preconditions;
36 import com.google.common.collect.Maps;
37 
47 public class TableLoadingMgr {
53  public class LoadRequest {
54  private final Future<Table> tblTask_;
55  private final TTableName tblName_;
56 
57  private LoadRequest(TTableName tblName, Future<Table> tblTask) {
58  tblTask_ = tblTask;
59  tblName_ = tblName;
60  }
61 
66  public Table get() {
67  Table tbl;
68  try {
69  tbl = tblTask_.get();
70  } catch (Exception e) {
71  tbl = IncompleteTable.createFailedMetadataLoadTable(
72  TableId.createInvalidId(), catalog_.getDb(tblName_.getDb_name()),
73  tblName_.getTable_name(), new TableLoadingException(e.getMessage(), e));
74  }
75  Preconditions.checkState(tbl.isLoaded());
76  return tbl;
77  }
78 
84  public void close() {
85  synchronized (loadingTables_) {
87  }
88  }
89  }
90 
91  private static final Logger LOG = Logger.getLogger(TableLoadingMgr.class);
92 
93  // A thread safe blocking deque that is used to prioritize the loading of table
94  // metadata. The CatalogServer has a background thread that will always add unloaded
95  // tables to the tail of the deque. However, a call to prioritizeLoad() will add
96  // tables to the head of the deque. The next table to load is always taken from the
97  // head of the deque. May contain the same table multiple times, but a second
98  // attempt to load the table metadata will be a no-op.
99  private final LinkedBlockingDeque<TTableName> tableLoadingDeque_ =
100  new LinkedBlockingDeque<TTableName>();
101 
102  // A thread safe HashSet of table names that are in the tableLoadingDeque_. Used to
103  // efficiently check for existence of items in the deque.
104  // Updates may lead/lag updates to the tableLoadingDeque_ - they are added to this set
105  // immediately before being added to the deque and removed immediately after removing
106  // from the deque. The fact the updates are not synchronized shouldn't impact
107  // functionality since this set is only used for efficient lookups.
108  private final Set<TTableName> tableLoadingSet_ =
109  Collections.synchronizedSet(new HashSet<TTableName>());
110 
111  // Map of table name to a FutureTask associated with the table load. Used to
112  // prevent duplicate loads of the same table.
113  private final ConcurrentHashMap<TTableName, FutureTask<Table>> loadingTables_ =
114  new ConcurrentHashMap<TTableName, FutureTask<Table>>();
115 
116  // Map of table name to the cache directives that are being waited on for that table.
117  // Once all directives have completed, the table's metadata will be refreshed and
118  // the table will be removed from this map.
119  // A caching operation may take a long time to complete, so to maximize query
120  // throughput it is preferable to allow the user to continue to run queries against
121  // the table while a cache request completes in the background.
 // This is a plain HashMap; the accesses in watchCacheDirs() and waitForCacheDirs()
 // are made while synchronized on the map object itself.
122  private final Map<TTableName, List<Long>> pendingTableCacheDirs_ = Maps.newHashMap();
123 
124  // The number of parallel threads to use to load table metadata. Should be set to a
125  // value that provides good throughput while not putting too much stress on the
126  // metastore.
127  private final int numLoadingThreads_;
128 
129  // Pool of numLoadingThreads_ threads that loads table metadata. If additional tasks
130  // are submitted to the pool after it is full, they will be queued and executed when
131  // the next thread becomes available. There is no hard upper limit on the number of
132  // pending tasks (no work will be rejected, but memory consumption is unbounded).
133  private final ExecutorService tblLoadingPool_;
134 
135  // Thread that incrementally refreshes tables in the background. Used to update a
136  // table's metadata after a long running operation completes, such as marking a
137  // table as cached. There is no hard upper limit on the number of pending tasks
138  // (no work will be rejected, but memory consumption is unbounded). If this thread
139  // dies it will be automatically restarted.
140  // The tables to process are read from the refreshThreadWork_ queue.
141  ExecutorService asyncRefreshThread_ = Executors.newSingleThreadExecutor();
142 
143  // Tables for the async refresh thread to process. Synchronization must be handled
144  // externally.
 // watchCacheDirs() adds to this queue while holding the pendingTableCacheDirs_ lock.
145  private final LinkedBlockingQueue<TTableName> refreshThreadWork_ =
146  new LinkedBlockingQueue<TTableName>();
147 
 // NOTE(review): catalog_ is assigned in the constructor and read throughout this
 // class, but its declaration is not visible in this listing - confirm it is
 // declared here.
 // Loader that loadAsync() delegates to for the actual table metadata load.
149  private final TableLoader tblLoader_;
150 
/**
 * Creates the manager and starts its background workers: the table loading threads
 * that drain tableLoadingDeque_, and the single async refresh task that drains
 * refreshThreadWork_.
 *
 * @param catalog the catalog that tables are loaded into and reloaded through.
 * @param numLoadingThreads size of tblLoadingPool_ and the number of background
 *     loader threads started by startTableLoadingThreads().
 */
public TableLoadingMgr(CatalogServiceCatalog catalog, int numLoadingThreads) {
  catalog_ = catalog;
  // tblLoader_ is final and must be assigned here.
  // NOTE(review): assumes TableLoader is constructed from the catalog - confirm
  // against TableLoader's actual constructor.
  tblLoader_ = new TableLoader(catalog_);
  numLoadingThreads_ = numLoadingThreads;
  tblLoadingPool_ = Executors.newFixedThreadPool(numLoadingThreads_);

  // Start the background table loading threads.
  startTableLoadingThreads();

  // Start the asyncRefreshThread_. Currently used to wait for cache directives to
  // complete in the background.
  asyncRefreshThread_.submit(new Callable<Void>() {
    @Override
    public Void call() throws Exception {
      while (true) {
        // take() blocks until watchCacheDirs() queues a table, so this loop sleeps
        // while idle instead of spinning.
        execAsyncRefreshWork(refreshThreadWork_.take());
      }
    }});
}
170 
174  public void prioritizeLoad(TTableName tblName) {
175  tableLoadingSet_.add(tblName);
176  tableLoadingDeque_.offerFirst(tblName);
177  }
178 
182  public void backgroundLoad(TTableName tblName) {
183  // Only queue for background loading if the table doesn't already exist
184  // in the table loading set.
185  if (tableLoadingSet_.add(tblName)) {
186  tableLoadingDeque_.offerLast(tblName);
187  }
188  }
189 
197  public void watchCacheDirs(List<Long> cacheDirIds, final TTableName tblName) {
198  synchronized (pendingTableCacheDirs_) {
199  // A single table may have multiple pending cache requests since one request
200  // gets submitted per-partition.
201  List<Long> existingCacheReqIds = pendingTableCacheDirs_.get(tblName);
202  if (existingCacheReqIds == null) {
203  existingCacheReqIds = cacheDirIds;
204  pendingTableCacheDirs_.put(tblName, cacheDirIds);
205  refreshThreadWork_.add(tblName);
206  } else {
207  existingCacheReqIds.addAll(cacheDirIds);
208  }
209  }
210  }
211 
221  public LoadRequest loadAsync(final TTableName tblName, final Table previousTbl)
223  final Db parentDb = catalog_.getDb(tblName.getDb_name());
224  if (parentDb == null) {
225  throw new DatabaseNotFoundException(
226  "Database '" + tblName.getDb_name() + "' was not found.");
227  }
228 
229  FutureTask<Table> tableLoadTask = new FutureTask<Table>(new Callable<Table>() {
230  @Override
231  public Table call() throws Exception {
232  return tblLoader_.load(parentDb, tblName.table_name,
233  previousTbl);
234  }});
235 
236  FutureTask<Table> existingValue = loadingTables_.putIfAbsent(tblName, tableLoadTask);
237  if (existingValue == null) {
238  // There was no existing value, submit a new load request.
239  tblLoadingPool_.execute(tableLoadTask);
240  } else {
241  tableLoadTask = existingValue;
242  }
243  return new LoadRequest(tblName, tableLoadTask);
244  }
245 
251  private void startTableLoadingThreads() {
252  ExecutorService loadingPool = Executors.newFixedThreadPool(numLoadingThreads_);
253  try {
254  for (int i = 0; i < numLoadingThreads_; ++i) {
255  loadingPool.execute(new Runnable() {
256  @Override
257  public void run() {
258  while (true) {
259  try {
260  loadNextTable();
261  } catch (Exception e) {
262  LOG.error("Error loading table: ", e);
263  // Ignore exception.
264  }
265  }
266  }
267  });
268  }
269  } finally {
270  loadingPool.shutdown();
271  }
272  }
273 
278  private void loadNextTable() throws InterruptedException {
279  // Always get the next table from the head of the deque.
280  final TTableName tblName = tableLoadingDeque_.takeFirst();
281  tableLoadingSet_.remove(tblName);
282  LOG.debug("Loading next table. Remaining items in queue: "
283  + tableLoadingDeque_.size());
284  try {
285  // TODO: Instead of calling "getOrLoad" here we could call "loadAsync". We would
286  // just need to add a mechanism for moving loaded tables into the Catalog.
287  catalog_.getOrLoadTable(tblName.getDb_name(), tblName.getTable_name());
288  } catch (CatalogException e) {
289  // Ignore.
290  }
291  }
292 
296  private void execAsyncRefreshWork(TTableName tblName) {
297  if (!waitForCacheDirs(tblName)) return;
298  try {
299  // Reload the table metadata to pickup the new cached block location information.
300  catalog_.reloadTable(tblName);
301  } catch (CatalogException e) {
302  LOG.error("Error reloading cached table: ", e);
303  }
304  }
305 
310  private boolean waitForCacheDirs(TTableName tblName) {
311  boolean isRefreshNeeded = false;
312  // Keep processing cache directives for this table until there are none left.
313  while (true) {
314  // Get all pending requests for this table.
315  List<Long> cacheDirIds = null;
316  synchronized (pendingTableCacheDirs_) {
317  cacheDirIds = pendingTableCacheDirs_.remove(tblName);
318  }
319  if (cacheDirIds == null || cacheDirIds.size() == 0) return isRefreshNeeded;
320  isRefreshNeeded = true;
321 
322  // Wait for each cache request to complete.
323  for (Long dirId: cacheDirIds) {
324  if (dirId == null) continue;
325  try {
326  HdfsCachingUtil.waitForDirective(dirId);
327  } catch (Exception e) {
328  LOG.error(String.format(
329  "Error waiting for cache request %d to complete: ", dirId), e);
330  }
331  }
332  }
333  }
334 }
final LinkedBlockingDeque< TTableName > tableLoadingDeque_
boolean waitForCacheDirs(TTableName tblName)
TableLoadingMgr(CatalogServiceCatalog catalog, int numLoadingThreads)
final Map< TTableName, List< Long > > pendingTableCacheDirs_
final LinkedBlockingQueue< TTableName > refreshThreadWork_
LoadRequest(TTableName tblName, Future< Table > tblTask)
void watchCacheDirs(List< Long > cacheDirIds, final TTableName tblName)
final ConcurrentHashMap< TTableName, FutureTask< Table > > loadingTables_
LoadRequest loadAsync(final TTableName tblName, final Table previousTbl)