Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
client-cache.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef IMPALA_RUNTIME_CLIENT_CACHE_H
16 #define IMPALA_RUNTIME_CLIENT_CACHE_H
17 
18 #include <vector>
19 #include <list>
20 #include <string>
21 #include <boost/unordered_map.hpp>
22 #include <boost/thread/mutex.hpp>
23 #include <boost/bind.hpp>
24 
25 #include "util/metrics.h"
26 #include "rpc/thrift-client.h"
27 #include "rpc/thrift-util.h"
28 
29 #include "common/status.h"
30 
31 namespace impala {
32 
35 typedef void* ClientKey;
36 
41 //
47 //
54 //
56 //
64  public:
67  typedef boost::function<ThriftClientImpl* (const TNetworkAddress& address,
68  ClientKey* client_key)> ClientFactory;
69 
74  //
76  Status GetClient(const TNetworkAddress& address, ClientFactory factory_method,
77  ClientKey* client_key);
78 
81  //
84  Status ReopenClient(ClientFactory factory_method, ClientKey* client_key);
85 
88  void ReleaseClient(ClientKey* client_key);
89 
92  void CloseConnections(const TNetworkAddress& address);
93 
95  std::string DebugString();
96 
98  void TestShutdown();
99 
102  void InitMetrics(MetricGroup* metrics, const std::string& key_prefix);
103 
104  private:
105  template <class T> friend class ClientCache;
107  ClientCacheHelper(uint32_t num_tries, uint64_t wait_ms, int32_t send_timeout_ms,
108  int32_t recv_timeout_ms)
109  : num_tries_(num_tries),
110  wait_ms_(wait_ms),
111  send_timeout_ms_(send_timeout_ms),
112  recv_timeout_ms_(recv_timeout_ms),
113  metrics_enabled_(false) { }
114 
120 
125  //
131  struct PerHostCache {
133  boost::mutex lock;
134 
136  std::list<ClientKey> clients;
137  };
138 
140  boost::mutex cache_lock_;
141 
145  typedef boost::unordered_map<
146  TNetworkAddress, boost::shared_ptr<PerHostCache> > PerHostCacheMap;
148 
150  boost::mutex client_map_lock_;
151 
156  typedef std::map<ClientKey, boost::shared_ptr<ThriftClientImpl> > ClientMap;
158 
160  const uint32_t num_tries_;
161 
164 
166  const int32_t send_timeout_ms_;
167 
169  const int32_t recv_timeout_ms_;
170 
174 
177 
180 
182  Status CreateClient(const TNetworkAddress& address, ClientFactory factory_method,
183  ClientKey* client_key);
184 };
185 
186 template<class T>
188 
191 template<class T>
193  public:
194  ClientConnection(ClientCache<T>* client_cache, TNetworkAddress address, Status* status)
195  : client_cache_(client_cache), client_(NULL) {
196  *status = client_cache_->GetClient(address, &client_);
197  if (status->ok()) DCHECK(client_ != NULL);
198  }
199 
201  if (client_ != NULL) {
202  client_cache_->ReleaseClient(&client_);
203  }
204  }
205 
207  return client_cache_->ReopenClient(&client_);
208  }
209 
210  T* operator->() const { return client_; }
211 
217  //
221  //
224  template <class F, class Request, class Response>
225  Status DoRpc(const F& f, const Request& request, Response* response) {
226  DCHECK(response != NULL);
227  try {
228  (client_->*f)(*response, request);
229  } catch (const apache::thrift::TException& e) {
230  if (IsTimeoutTException(e)) return Status(TErrorCode::RPC_TIMEOUT);
231 
232  // Client may have unexpectedly been closed, so re-open and retry.
233  // TODO: ThriftClient should return proper error codes.
235  try {
236  (client_->*f)(*response, request);
237  } catch (apache::thrift::TException& e) {
238  // By this point the RPC really has failed.
239  return Status(TErrorCode::RPC_GENERAL_ERROR, e.what());
240  }
241  }
242  return Status::OK;
243  }
244 
245  private:
248 };
249 
252 template<class T>
253 class ClientCache {
254  public:
256 
257  ClientCache(const std::string& service_name = "") : client_cache_helper_(1, 0, 0, 0) {
258  client_factory_ = boost::bind<ThriftClientImpl*>(
259  boost::mem_fn(&ClientCache::MakeClient), this, _1, _2, service_name);
260  }
261 
266  ClientCache(uint32_t num_tries, uint64_t wait_ms, int32_t send_timeout_ms = 0,
267  int32_t recv_timeout_ms = 0, const std::string& service_name = "")
268  : client_cache_helper_(num_tries, wait_ms, send_timeout_ms, recv_timeout_ms) {
270  boost::bind<ThriftClientImpl*>(
271  boost::mem_fn(&ClientCache::MakeClient), this, _1, _2, service_name);
272  }
273 
277  void CloseConnections(const TNetworkAddress& address) {
278  return client_cache_helper_.CloseConnections(address);
279  }
280 
282  std::string DebugString() {
284  }
285 
287  void TestShutdown() {
289  }
290 
295  void InitMetrics(MetricGroup* metrics, const std::string& key_prefix) {
296  client_cache_helper_.InitMetrics(metrics, key_prefix);
297  }
298 
299  private:
300  friend class ClientConnection<T>;
301 
306 
309 
313  Status GetClient(const TNetworkAddress& address, T** iface) {
315  reinterpret_cast<ClientKey*>(iface));
316  }
317 
322  Status ReopenClient(T** client) {
324  reinterpret_cast<ClientKey*>(client));
325  }
326 
328  void ReleaseClient(T** client) {
329  return client_cache_helper_.ReleaseClient(reinterpret_cast<ClientKey*>(client));
330  }
331 
333  ThriftClientImpl* MakeClient(const TNetworkAddress& address, ClientKey* client_key,
334  const std::string service_name) {
335  Client* client = new Client(address.hostname, address.port, service_name);
336  *client_key = reinterpret_cast<ClientKey>(client->iface());
337  return client;
338  }
339 
340 };
341 
343 
344 class ImpalaInternalServiceClient;
347 
348 class CatalogServiceClient;
351 }
352 
353 #endif
const uint64_t wait_ms_
Time to wait between failed connection attempts.
Definition: client-cache.h:163
Status ReopenClient(ClientFactory factory_method, ClientKey *client_key)
Definition: client-cache.cc:68
void * ClientKey
Definition: client-cache.h:35
boost::function< ThriftClientImpl *(const TNetworkAddress &address, ClientKey *client_key)> ClientFactory
Definition: client-cache.h:68
Status GetClient(const TNetworkAddress &address, T **iface)
Definition: client-cache.h:313
boost::mutex lock
Protects clients.
Definition: client-cache.h:133
void TestShutdown()
For testing only: shutdown all clients.
Definition: client-cache.h:287
std::map< ClientKey, boost::shared_ptr< ThriftClientImpl > > ClientMap
Definition: client-cache.h:156
void InitMetrics(MetricGroup *metrics, const std::string &key_prefix)
Definition: client-cache.h:295
ClientConnection< CatalogServiceClient > CatalogServiceConnection
Definition: client-cache.h:350
void ReleaseClient(ClientKey *client_key)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
InterfaceType * iface()
Returns the object used to actually make RPCs against the remote server.
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
const uint32_t num_tries_
Number of attempts to make to open a connection. 0 means retry indefinitely.
Definition: client-cache.h:160
ThriftClientImpl * MakeClient(const TNetworkAddress &address, ClientKey *client_key, const std::string service_name)
Factory method to produce a new ThriftClient<T> for the wrapped cache.
Definition: client-cache.h:333
boost::mutex client_map_lock_
Protects client_map_.
Definition: client-cache.h:150
IntGauge * total_clients_metric_
Total clients in the cache, including those in use.
Definition: client-cache.h:179
ClientCacheHelper client_cache_helper_
Definition: client-cache.h:305
Status GetClient(const TNetworkAddress &address, ClientFactory factory_method, ClientKey *client_key)
If there is an error creating the new client, *client_key will be NULL.
Definition: client-cache.cc:40
ClientCache< CatalogServiceClient > CatalogServiceClientCache
Definition: client-cache.h:348
std::string DebugString()
Helper method which returns a debug string.
Definition: client-cache.h:282
IntGauge * clients_in_use_metric_
Number of clients 'checked-out' from the cache.
Definition: client-cache.h:176
Status DoRpc(const F &f, const Request &request, Response *response)
Definition: client-cache.h:225
ClientCache(const std::string &service_name="")
Definition: client-cache.h:257
ClientCacheHelper::ClientFactory client_factory_
Function pointer, bound to MakeClient, which produces clients when the cache is empty.
Definition: client-cache.h:308
bool IsTimeoutTException(const TException &e)
Definition: thrift-util.cc:175
ThriftClient< T > Client
Definition: client-cache.h:255
Super class for templatized thrift clients.
Definition: thrift-client.h:42
PerHostCacheMap per_host_caches_
Definition: client-cache.h:147
ClientCache(uint32_t num_tries, uint64_t wait_ms, int32_t send_timeout_ms=0, int32_t recv_timeout_ms=0, const std::string &service_name="")
Definition: client-cache.h:266
void ReleaseClient(T **client)
Return the client to the cache and set *client to NULL.
Definition: client-cache.h:328
const int32_t send_timeout_ms_
Time to wait for the underlying socket to send data, e.g., for an RPC.
Definition: client-cache.h:166
std::string DebugString()
Return a debug representation of the contents of this cache.
static const Status OK
Definition: status.h:87
void TestShutdown()
Closes every connection in the cache. Used only for testing.
std::list< ClientKey > clients
List of client keys for this entry's host.
Definition: client-cache.h:136
void CloseConnections(const TNetworkAddress &address)
Status CreateClient(const TNetworkAddress &address, ClientFactory factory_method, ClientKey *client_key)
Create a new client for specific address in 'client' and put it in client_map_.
void CloseConnections(const TNetworkAddress &address)
Definition: client-cache.h:277
ClientConnection< ImpalaInternalServiceClient > ImpalaInternalServiceConnection
Definition: client-cache.h:346
bool ok() const
Definition: status.h:172
boost::mutex cache_lock_
Protects per_host_caches_.
Definition: client-cache.h:140
Status ReopenClient(T **client)
Definition: client-cache.h:322
This class is thread-safe.
Definition: client-cache.h:63
ClientCacheHelper(uint32_t num_tries, uint64_t wait_ms, int32_t send_timeout_ms, int32_t recv_timeout_ms)
Private constructor so that only ClientCache can instantiate this class.
Definition: client-cache.h:107
void InitMetrics(MetricGroup *metrics, const std::string &key_prefix)
boost::unordered_map< TNetworkAddress, boost::shared_ptr< PerHostCache > > PerHostCacheMap
Definition: client-cache.h:146
ClientCache< T > * client_cache_
Definition: client-cache.h:246
ClientConnection(ClientCache< T > *client_cache, TNetworkAddress address, Status *status)
Definition: client-cache.h:194
ClientCache< ImpalaInternalServiceClient > ImpalaInternalServiceClientCache
Definition: client-cache.h:344
const int32_t recv_timeout_ms_
Time to wait for the underlying socket to receive data, e.g., for an RPC response.
Definition: client-cache.h:169