Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
MetaStoreClientPool.java
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.cloudera.impala.catalog;
16 
17 import java.util.concurrent.ConcurrentLinkedQueue;
18 
19 import org.apache.hadoop.hive.conf.HiveConf;
20 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
21 import org.apache.log4j.Logger;
22 
23 import com.google.common.base.Preconditions;
24 
29 public class MetaStoreClientPool {
30  // Key for config option read from hive-site.xml
31  private static final String HIVE_METASTORE_CNXN_DELAY_MS_CONF =
32  "impala.catalog.metastore.cnxn.creation.delay.ms";
33  private static final int DEFAULT_HIVE_METASTORE_CNXN_DELAY_MS_CONF = 0;
34  // Number of milliseconds to sleep between creation of HMS connections. Used to debug
35  // IMPALA-825.
36  private final int clientCreationDelayMs_;
37 
38  private static final Logger LOG = Logger.getLogger(MetaStoreClientPool.class);
39 
40  private final ConcurrentLinkedQueue<MetaStoreClient> clientPool_ =
41  new ConcurrentLinkedQueue<MetaStoreClient>();
42  private Boolean poolClosed_ = false;
43  private final HiveConf hiveConf_;
44 
49  public class MetaStoreClient {
50  private final HiveMetaStoreClient hiveClient_;
51  private boolean isInUse_;
52 
53  private MetaStoreClient(HiveConf hiveConf) {
54  try {
55  LOG.debug("Creating MetaStoreClient. Pool Size = " + clientPool_.size());
56  this.hiveClient_ = new HiveMetaStoreClient(hiveConf);
57  } catch (Exception e) {
58  // Turn in to an unchecked exception
59  throw new IllegalStateException(e);
60  }
61  this.isInUse_ = false;
62  }
63 
67  public HiveMetaStoreClient getHiveClient() {
68  return hiveClient_;
69  }
70 
75  public void release() {
76  Preconditions.checkState(isInUse_);
77  isInUse_ = false;
78  // Ensure the connection isn't returned to the pool if the pool has been closed.
79  // This lock is needed to ensure proper behavior when a thread reads poolClosed
80  // is false, but a call to pool.close() comes in immediately afterward.
81  synchronized (poolClosed_) {
82  if (poolClosed_) {
83  hiveClient_.close();
84  } else {
85  // TODO: Currently the pool does not work properly because we cannot
86  // reuse MetastoreClient connections. No reason to add this client back
87  // to the pool. See HIVE-5181.
88  // clientPool.add(this);
89  hiveClient_.close();
90  }
91  }
92  }
93 
94  // Marks this client as in use
95  private void markInUse() {
96  isInUse_ = true;
97  }
98  }
99 
100  public MetaStoreClientPool(int initialSize) {
101  this(initialSize, new HiveConf(MetaStoreClientPool.class));
102  }
103 
104  public MetaStoreClientPool(int initialSize, HiveConf hiveConf) {
105  this.hiveConf_ = hiveConf;
108  addClients(initialSize);
109  }
110 
114  public void addClients(int numClients) {
115  for (int i = 0; i < numClients; ++i) {
116  clientPool_.add(new MetaStoreClient(hiveConf_));
117  }
118  }
119 
124  // The MetaStoreClient c'tor relies on knowing the Hadoop version by asking
125  // org.apache.hadoop.util.VersionInfo. The VersionInfo class relies on opening
126  // the 'common-version-info.properties' file as a resource from hadoop-common*.jar
127  // using the Thread's context classloader. If necessary, set the Thread's context
128  // classloader, otherwise VersionInfo will fail in it's c'tor.
129  if (Thread.currentThread().getContextClassLoader() == null) {
130  Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
131  }
132 
133  MetaStoreClient client = clientPool_.poll();
134  // The pool was empty so create a new client and return that.
135  // Serialize client creation to defend against possible race conditions accessing
136  // local Kerberos state (see IMPALA-825).
137  synchronized (this) {
138  try {
139  Thread.sleep(clientCreationDelayMs_);
140  } catch (InterruptedException e) {
141  /* ignore */
142  }
143  if (client == null) {
144  client = new MetaStoreClient(hiveConf_);
145  } else {
146  // TODO: Due to Hive Metastore bugs, there is leftover state from previous client
147  // connections so we are unable to reuse the same connection. For now simply
148  // reconnect each time. One possible culprit is HIVE-5181.
149  client = new MetaStoreClient(hiveConf_);
150  }
151  }
152  client.markInUse();
153  return client;
154  }
155 
160  public void close() {
161  // Ensure no more items get added to the pool once close is called.
162  synchronized (poolClosed_) {
163  if (poolClosed_) {
164  return;
165  }
166  poolClosed_ = true;
167  }
168 
169  MetaStoreClient client = null;
170  while ((client = clientPool_.poll()) != null) {
171  client.getHiveClient().close();
172  }
173  }
174 }
MetaStoreClientPool(int initialSize, HiveConf hiveConf)
final ConcurrentLinkedQueue< MetaStoreClient > clientPool_