18 #include <thrift/server/TServer.h>
19 #include <thrift/protocol/TBinaryProtocol.h>
20 #include <thrift/transport/TSocket.h>
21 #include <thrift/transport/TTransportUtils.h>
24 #include <boost/foreach.hpp>
30 #include "gen-cpp/ImpalaInternalService.h"
33 using namespace apache::thrift;
34 using namespace apache::thrift::server;
35 using namespace apache::thrift::transport;
// Hands the caller a client key for 'address' via '*client_key', reusing a client from
// the per-host cache when one is available.
// NOTE(review): this listing is an excerpt - the rest of the parameter list, the scope
// braces and the return statements are not visible here.
40 Status ClientCacheHelper::GetClient(
const TNetworkAddress& address,
42 shared_ptr<PerHostCache> host_cache;
// cache_lock_ is taken before per_host_caches_ is indexed by 'address'.
44 lock_guard<mutex> lock(cache_lock_);
45 VLOG(2) <<
"GetClient(" << address <<
")";
// NOTE(review): operator[] presumably creates an empty slot for a host that has not
// been seen before; the code that fills 'host_cache' from 'ptr' is not shown.
46 shared_ptr<PerHostCache>* ptr = &per_host_caches_[address];
// The per-host lock is taken before that host's client list is inspected.
52 lock_guard<mutex> lock(host_cache->lock);
53 if (!host_cache->clients.empty()) {
// Cached path: take the key at the front of the list and remove it from the list.
54 *client_key = host_cache->clients.front();
55 VLOG(2) <<
"GetClient(): returning cached client for " << address;
56 host_cache->clients.pop_front();
57 if (metrics_enabled_) clients_in_use_metric_->Increment(1);
// NOTE(review): this second increment presumably belongs to the path taken when no
// cached client was available; the statements leading to it are not visible.
64 if (metrics_enabled_) clients_in_use_metric_->Increment(1);
// Fragment that replaces the client behind '*client_key' with a newly created one for
// the same address (the log message below identifies it as ReopenClient()).
// NOTE(review): the function header, scope braces, the branch structure around the
// CreateClient() result and the return are not visible in this listing.
72 shared_ptr<ThriftClientImpl> client_impl;
73 ClientMap::iterator client;
// Look up the existing client under client_map_lock_; it is expected to be present.
75 lock_guard<mutex> lock(client_map_lock_);
76 client = client_map_.find(*client_key);
77 DCHECK(client != client_map_.end());
78 client_impl = client->second;
80 VLOG(1) <<
"ReopenClient(): re-creating client for " << client_impl->address();
// The total-clients gauge is decremented here; CreateClient() contains an increment of
// the same gauge.
88 if (metrics_enabled_) total_clients_metric_->Increment(-1);
89 Status status = CreateClient(client_impl->address(), factory_method, client_key);
// NOTE(review): 'client' was obtained under an earlier lock_guard and is erased here
// under a separate one; confirm no other thread can invalidate the iterator in between.
94 lock_guard<mutex> lock(client_map_lock_);
95 client_map_.erase(client);
// NOTE(review): the declaration of 'old_client_key' is not visible; if it is a pointer
// aliasing 'client_key', this assignment has no effect - verify.
99 *client_key = *old_client_key;
// Builds a new client for 'address' with 'factory_method', opens it, applies the
// configured timeouts and registers it in client_map_ under '*client_key'.
// NOTE(review): this listing is an excerpt - the rest of the parameter list, the
// handling of 'status', the scope braces and the return are not visible here.
104 Status ClientCacheHelper::CreateClient(
const TNetworkAddress& address,
// factory_method receives 'client_key' as well as the address.
106 shared_ptr<ThriftClientImpl> client_impl(factory_method(address, client_key));
107 VLOG(2) <<
"CreateClient(): creating new client for " << client_impl->address();
// Opening uses the helper's num_tries_ and wait_ms_ settings.
108 Status status = client_impl->OpenWithRetry(num_tries_, wait_ms_);
114 client_impl->setRecvTimeout(recv_timeout_ms_);
115 client_impl->setSendTimeout(send_timeout_ms_);
// client_map_ is written under client_map_lock_.
119 lock_guard<mutex> lock(client_map_lock_);
120 client_map_[*client_key] = client_impl;
123 if (metrics_enabled_) total_clients_metric_->Increment(1);
// Returns the client identified by '*client_key' to the per-host cache of the address
// it is connected to, and decrements the clients-in-use gauge when metrics are enabled.
// NOTE(review): scope braces are not visible in this listing, so the exact extent of
// each lock cannot be read from it.
127 void ClientCacheHelper::ReleaseClient(
ClientKey* client_key) {
128 DCHECK(*client_key != NULL) <<
"Trying to release NULL client";
129 shared_ptr<ThriftClientImpl> client_impl;
// Resolve the key to its client under client_map_lock_; the key is expected to exist.
131 lock_guard<mutex> lock(client_map_lock_);
132 ClientMap::iterator client = client_map_.find(*client_key);
133 DCHECK(client != client_map_.end());
134 client_impl = client->second;
136 VLOG(2) <<
"Releasing client for " << client_impl->address() <<
" back to cache";
// Find the per-host cache under cache_lock_, then take that entry's own lock before
// appending the key to the back of its client list.
138 lock_guard<mutex> lock(cache_lock_);
139 PerHostCacheMap::iterator cache = per_host_caches_.find(client_impl->address());
140 DCHECK(cache != per_host_caches_.end());
141 lock_guard<mutex> entry_lock(cache->second->lock);
142 cache->second->clients.push_back(*client_key);
144 if (metrics_enabled_) clients_in_use_metric_->Increment(-1);
// Calls Close() on clients that are looked up by key in client_map_ for the per-host
// cache of 'address'; does nothing when no cache entry exists for that address.
// NOTE(review): this listing is an excerpt - the declaration of 'cache', the loop that
// supplies 'client_key' and the scope braces are not visible here.
148 void ClientCacheHelper::CloseConnections(
const TNetworkAddress& address) {
// The per-host entry is located under cache_lock_.
151 lock_guard<mutex> lock(cache_lock_);
152 PerHostCacheMap::iterator cache_it = per_host_caches_.find(address);
153 if (cache_it == per_host_caches_.end())
return;
// Keeps a raw pointer to the entry held by the map's shared_ptr.
154 cache = cache_it->second.get();
// NOTE(review): the remainder of this log statement is cut off in the listing.
158 VLOG(2) <<
"Invalidating all " << cache->
clients.size() <<
" clients for: "
// Lock order here: the per-host entry lock first, then client_map_lock_.
160 lock_guard<mutex> entry_lock(cache->
lock);
161 lock_guard<mutex> map_lock(client_map_lock_);
// Each key is expected to be present in client_map_.
163 ClientMap::iterator client_map_entry = client_map_.find(client_key);
164 DCHECK(client_map_entry != client_map_.end());
165 client_map_entry->second->Close();
// Fragment that writes a summary of the cache to 'out': the number of hosts, then one
// "<host>:<number of cached clients>" item per host, separated by single spaces.
// NOTE(review): the function header, the declarations of 'out' and 'first', and the
// closing of the output are not visible in this listing; presumably this is the
// helper's debug-string routine - confirm against the full file.
171 lock_guard<mutex> lock(cache_lock_);
173 out <<
"ClientCacheHelper(#hosts=" << per_host_caches_.size()
176 BOOST_FOREACH(
const PerHostCacheMap::value_type& cache, per_host_caches_) {
// Each entry's own lock is held while its client list size is read.
177 lock_guard<mutex> host_cache_lock(cache.second->lock);
178 if (!first) out <<
" ";
179 out << cache.first <<
":" << cache.second->clients.size();
// Calls CloseConnections() for every address that currently has a per-host cache entry.
// The addresses are first copied into a local vector under cache_lock_.
// NOTE(review): scope braces are not visible in this listing; since CloseConnections()
// takes cache_lock_ itself, the lock is presumably released before the second loop.
186 void ClientCacheHelper::TestShutdown() {
187 vector<TNetworkAddress> addresses;
189 lock_guard<mutex> lock(cache_lock_);
190 BOOST_FOREACH(
const PerHostCacheMap::value_type& cache_entry, per_host_caches_) {
191 addresses.push_back(cache_entry.first);
194 BOOST_FOREACH(
const TNetworkAddress& address, addresses) {
195 CloseConnections(address);
// Registers two gauges in 'metrics', both starting at 0, and then enables metric
// updates for this helper:
//   <key_prefix>.client-cache.clients-in-use
//   <key_prefix>.client-cache.total-clients
// 'metrics' must not be NULL.
// NOTE(review): the declaration of 'max_ss' and the closing braces are not visible in
// this listing.
199 void ClientCacheHelper::InitMetrics(
MetricGroup* metrics,
const string& key_prefix) {
200 DCHECK(metrics != NULL);
// cache_lock_ is held while the gauges are registered.
203 lock_guard<mutex> lock(cache_lock_);
204 stringstream count_ss;
205 count_ss << key_prefix <<
".client-cache.clients-in-use";
206 clients_in_use_metric_ = metrics->
AddGauge(count_ss.str(), 0L);
209 max_ss << key_prefix <<
".client-cache.total-clients";
210 total_clients_metric_ = metrics->
AddGauge(max_ss.str(), 0L);
// Set only after both gauge pointers have been assigned.
211 metrics_enabled_ =
true;
boost::function< ThriftClientImpl *(const TNetworkAddress &address, ClientKey *client_key)> ClientFactory
boost::mutex lock
Protects clients.
const StringSearch UrlParser::protocol_search & protocol
#define RETURN_IF_ERROR(stmt)
some generally useful macros
MetricGroups may be organised hierarchically as a tree.
std::string DebugString(const T &val)
SimpleMetric< T > * AddGauge(const std::string &key, const T &value, const TUnit::type unit=TUnit::NONE, const std::string &description="")
Create a gauge metric object with given key and initial value (owned by this object) ...
std::list< ClientKey > clients
List of client keys for this entry's host.