Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
thrift-client.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rpc/thrift-client.h"
16 
17 #include <boost/assign.hpp>
18 #include <boost/lexical_cast.hpp>
19 #include <ostream>
20 #include <thrift/Thrift.h>
21 #include <gutil/strings/substitute.h>
22 
23 #include "util/time.h"
24 
25 #include "common/names.h"
26 
27 using namespace apache::thrift::transport;
28 using namespace apache::thrift;
29 using namespace strings;
30 
31 DECLARE_string(ssl_client_ca_certificate);
32 
33 namespace impala {
34 
35 Status ThriftClientImpl::Open() {
36  if (!socket_create_status_.ok()) return socket_create_status_;
37  try {
38  if (!transport_->isOpen()) {
39  transport_->open();
40  }
41  } catch (const TException& e) {
42  return Status(Substitute("Couldn't open transport for $0 ($1)",
43  lexical_cast<string>(address_), e.what()));
44  }
45  return Status::OK;
46 }
47 
48 Status ThriftClientImpl::OpenWithRetry(uint32_t num_tries, uint64_t wait_ms) {
49  uint32_t try_count = 0L;
50  while (true) {
51  ++try_count;
52  Status status = Open();
53  if (status.ok()) return status;
54 
55  LOG(INFO) << "Unable to connect to " << address_;
56  if (num_tries == 0) {
57  LOG(INFO) << "(Attempt " << try_count << ", will retry indefinitely)";
58  } else {
59  if (num_tries != 1) {
60  // No point logging 'attempt 1 of 1'
61  LOG(INFO) << "(Attempt " << try_count << " of " << num_tries << ")";
62  }
63  if (try_count == num_tries) return status;
64  }
65  SleepForMs(wait_ms);
66  }
67 }
68 
69 void ThriftClientImpl::Close() {
70  try {
71  if (transport_.get() != NULL && transport_->isOpen()) transport_->close();
72  } catch (const TException& e) {
73  LOG(INFO) << "Error closing connection to: " << address_ << ", ignoring (" << e.what()
74  << ")";
75  // Forcibly close the socket (since the transport may have failed to get that far
76  // during close())
77  try {
78  if (socket_.get() != NULL) socket_->close();
79  } catch (const TException& e) {
80  LOG(INFO) << "Error closing socket to: " << address_ << ", ignoring (" << e.what()
81  << ")";
82  }
83  }
84 }
85 
86 Status ThriftClientImpl::CreateSocket() {
87  if (!ssl_) {
88  socket_.reset(new TSocket(address_.hostname, address_.port));
89  } else {
90  try {
91  TSSLSocketFactory factory;
92  // TODO: No need to do this every time we create a socket, the factory can be
93  // shared. But since there may be many certificates, this needs some slightly more
94  // complex infrastructure to do right.
95  factory.loadTrustedCertificates(FLAGS_ssl_client_ca_certificate.c_str());
96  socket_ = factory.createSocket(address_.hostname, address_.port);
97  } catch (const TException& e) {
98  return Status(Substitute("Failed to create socket: $0", e.what()));
99  }
100  }
101 
102  return Status::OK;
103 }
104 
105 }
void SleepForMs(const int64_t duration_ms)
Sleeps the current thread for at least duration_ms milliseconds.
Definition: time.cc:21
DECLARE_string(ssl_client_ca_certificate)
bool ok() const
Definition: status.h:172