Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
client-cache.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "runtime/client-cache.h"
16 
17 #include <sstream>
18 #include <thrift/server/TServer.h>
19 #include <thrift/protocol/TBinaryProtocol.h>
20 #include <thrift/transport/TSocket.h>
21 #include <thrift/transport/TTransportUtils.h>
22 #include <memory>
23 
24 #include <boost/foreach.hpp>
25 
26 #include "common/logging.h"
27 #include "util/container-util.h"
28 #include "util/network-util.h"
29 #include "rpc/thrift-util.h"
30 #include "gen-cpp/ImpalaInternalService.h"
31 
32 #include "common/names.h"
33 using namespace apache::thrift;
34 using namespace apache::thrift::server;
35 using namespace apache::thrift::transport;
36 using namespace apache::thrift::protocol;
37 
38 namespace impala {
39 
40 Status ClientCacheHelper::GetClient(const TNetworkAddress& address,
41  ClientFactory factory_method, ClientKey* client_key) {
42  shared_ptr<PerHostCache> host_cache;
43  {
44  lock_guard<mutex> lock(cache_lock_);
45  VLOG(2) << "GetClient(" << address << ")";
46  shared_ptr<PerHostCache>* ptr = &per_host_caches_[address];
47  if (ptr->get() == NULL) ptr->reset(new PerHostCache());
48  host_cache = *ptr;
49  }
50 
51  {
52  lock_guard<mutex> lock(host_cache->lock);
53  if (!host_cache->clients.empty()) {
54  *client_key = host_cache->clients.front();
55  VLOG(2) << "GetClient(): returning cached client for " << address;
56  host_cache->clients.pop_front();
57  if (metrics_enabled_) clients_in_use_metric_->Increment(1);
58  return Status::OK;
59  }
60  }
61 
62  // Only get here if host_cache->clients.empty(). No need for the lock.
63  RETURN_IF_ERROR(CreateClient(address, factory_method, client_key));
64  if (metrics_enabled_) clients_in_use_metric_->Increment(1);
65  return Status::OK;
66 }
67 
68 Status ClientCacheHelper::ReopenClient(ClientFactory factory_method,
69  ClientKey* client_key) {
70  // Clients are not ordinarily removed from the cache completely (in the future, they may
71  // be); this is the only method where a client may be deleted and replaced with another.
72  shared_ptr<ThriftClientImpl> client_impl;
73  ClientMap::iterator client;
74  {
75  lock_guard<mutex> lock(client_map_lock_);
76  client = client_map_.find(*client_key);
77  DCHECK(client != client_map_.end());
78  client_impl = client->second;
79  }
80  VLOG(1) << "ReopenClient(): re-creating client for " << client_impl->address();
81 
82  client_impl->Close();
83 
84  // TODO: Thrift TBufferedTransport cannot be re-opened after Close() because it does not
85  // clean up internal buffers it reopens. To work around this issue, create a new client
86  // instead.
87  ClientKey* old_client_key = client_key;
88  if (metrics_enabled_) total_clients_metric_->Increment(-1);
89  Status status = CreateClient(client_impl->address(), factory_method, client_key);
90  // Only erase the existing client from the map if creation of the new one succeeded.
91  // This helps to ensure the proper accounting of metrics in the presence of
92  // re-connection failures (the original client should be released as usual).
93  if (status.ok()) {
94  lock_guard<mutex> lock(client_map_lock_);
95  client_map_.erase(client);
96  } else {
97  // Restore the client used before the failed re-opening attempt, so the caller can
98  // properly release it.
99  *client_key = *old_client_key;
100  }
101  return status;
102 }
103 
104 Status ClientCacheHelper::CreateClient(const TNetworkAddress& address,
105  ClientFactory factory_method, ClientKey* client_key) {
106  shared_ptr<ThriftClientImpl> client_impl(factory_method(address, client_key));
107  VLOG(2) << "CreateClient(): creating new client for " << client_impl->address();
108  Status status = client_impl->OpenWithRetry(num_tries_, wait_ms_);
109  if (!status.ok()) {
110  *client_key = NULL;
111  return status;
112  }
113  // Set the TSocket's send and receive timeouts.
114  client_impl->setRecvTimeout(recv_timeout_ms_);
115  client_impl->setSendTimeout(send_timeout_ms_);
116 
117  // Because the client starts life 'checked out', we don't add it to its host cache.
118  {
119  lock_guard<mutex> lock(client_map_lock_);
120  client_map_[*client_key] = client_impl;
121  }
122 
123  if (metrics_enabled_) total_clients_metric_->Increment(1);
124  return Status::OK;
125 }
126 
127 void ClientCacheHelper::ReleaseClient(ClientKey* client_key) {
128  DCHECK(*client_key != NULL) << "Trying to release NULL client";
129  shared_ptr<ThriftClientImpl> client_impl;
130  {
131  lock_guard<mutex> lock(client_map_lock_);
132  ClientMap::iterator client = client_map_.find(*client_key);
133  DCHECK(client != client_map_.end());
134  client_impl = client->second;
135  }
136  VLOG(2) << "Releasing client for " << client_impl->address() << " back to cache";
137  {
138  lock_guard<mutex> lock(cache_lock_);
139  PerHostCacheMap::iterator cache = per_host_caches_.find(client_impl->address());
140  DCHECK(cache != per_host_caches_.end());
141  lock_guard<mutex> entry_lock(cache->second->lock);
142  cache->second->clients.push_back(*client_key);
143  }
144  if (metrics_enabled_) clients_in_use_metric_->Increment(-1);
145  *client_key = NULL;
146 }
147 
148 void ClientCacheHelper::CloseConnections(const TNetworkAddress& address) {
149  PerHostCache* cache;
150  {
151  lock_guard<mutex> lock(cache_lock_);
152  PerHostCacheMap::iterator cache_it = per_host_caches_.find(address);
153  if (cache_it == per_host_caches_.end()) return;
154  cache = cache_it->second.get();
155  }
156 
157  {
158  VLOG(2) << "Invalidating all " << cache->clients.size() << " clients for: "
159  << address;
160  lock_guard<mutex> entry_lock(cache->lock);
161  lock_guard<mutex> map_lock(client_map_lock_);
162  BOOST_FOREACH(ClientKey client_key, cache->clients) {
163  ClientMap::iterator client_map_entry = client_map_.find(client_key);
164  DCHECK(client_map_entry != client_map_.end());
165  client_map_entry->second->Close();
166  }
167  }
168 }
169 
171  lock_guard<mutex> lock(cache_lock_);
172  stringstream out;
173  out << "ClientCacheHelper(#hosts=" << per_host_caches_.size()
174  << " [";
175  bool first = true;
176  BOOST_FOREACH(const PerHostCacheMap::value_type& cache, per_host_caches_) {
177  lock_guard<mutex> host_cache_lock(cache.second->lock);
178  if (!first) out << " ";
179  out << cache.first << ":" << cache.second->clients.size();
180  first = false;
181  }
182  out << "])";
183  return out.str();
184 }
185 
186 void ClientCacheHelper::TestShutdown() {
187  vector<TNetworkAddress> addresses;
188  {
189  lock_guard<mutex> lock(cache_lock_);
190  BOOST_FOREACH(const PerHostCacheMap::value_type& cache_entry, per_host_caches_) {
191  addresses.push_back(cache_entry.first);
192  }
193  }
194  BOOST_FOREACH(const TNetworkAddress& address, addresses) {
195  CloseConnections(address);
196  }
197 }
198 
199 void ClientCacheHelper::InitMetrics(MetricGroup* metrics, const string& key_prefix) {
200  DCHECK(metrics != NULL);
201  // Not strictly needed if InitMetrics is called before any cache usage, but ensures that
202  // metrics_enabled_ is published.
203  lock_guard<mutex> lock(cache_lock_);
204  stringstream count_ss;
205  count_ss << key_prefix << ".client-cache.clients-in-use";
206  clients_in_use_metric_ = metrics->AddGauge(count_ss.str(), 0L);
207 
208  stringstream max_ss;
209  max_ss << key_prefix << ".client-cache.total-clients";
210  total_clients_metric_ = metrics->AddGauge(max_ss.str(), 0L);
211  metrics_enabled_ = true;
212 }
213 
214 }
void * ClientKey
Definition: client-cache.h:35
boost::function< ThriftClientImpl *(const TNetworkAddress &address, ClientKey *client_key)> ClientFactory
Definition: client-cache.h:68
boost::mutex lock
Protects clients.
Definition: client-cache.h:133
const StringSearch UrlParser::protocol_search & protocol
Definition: url-parser.cc:36
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
std::string DebugString(const T &val)
Definition: udf-debug.h:27
SimpleMetric< T > * AddGauge(const std::string &key, const T &value, const TUnit::type unit=TUnit::NONE, const std::string &description="")
Create a gauge metric object with given key and initial value (owned by this object) ...
Definition: metrics.h:223
std::list< ClientKey > clients
List of client keys for this entry's host.
Definition: client-cache.h:136
bool ok() const
Definition: status.h:172