Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
thrift-server.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <boost/filesystem.hpp>
16 #include <boost/thread.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <boost/thread/condition_variable.hpp>
19 #include <boost/uuid/uuid_io.hpp>
20 
21 #include <thrift/concurrency/Thread.h>
22 #include <thrift/concurrency/ThreadManager.h>
23 #include <thrift/protocol/TBinaryProtocol.h>
24 #include <thrift/server/TThreadPoolServer.h>
25 #include <thrift/server/TThreadedServer.h>
26 #include <thrift/transport/TSocket.h>
27 #include <thrift/transport/TSSLServerSocket.h>
28 #include <thrift/transport/TSSLSocket.h>
29 #include <thrift/server/TThreadPoolServer.h>
30 #include <thrift/transport/TServerSocket.h>
31 #include <gflags/gflags.h>
32 
33 #include "gen-cpp/Types_types.h"
34 #include "rpc/authentication.h"
35 #include "rpc/thrift-server.h"
36 #include "rpc/thrift-thread.h"
37 #include "util/debug-util.h"
38 #include "util/network-util.h"
39 #include "util/uid-util.h"
40 #include <sstream>
41 
42 #include "common/names.h"
43 
44 namespace posix_time = boost::posix_time;
45 using boost::filesystem::exists;
46 using boost::get_system_time;
47 using boost::system_time;
48 using boost::uuids::uuid;
49 using namespace apache::thrift::concurrency;
50 using namespace apache::thrift::protocol;
51 using namespace apache::thrift::server;
52 using namespace apache::thrift::transport;
53 using namespace apache::thrift;
54 
55 DEFINE_int32(rpc_cnxn_attempts, 10, "Deprecated");
56 DEFINE_int32(rpc_cnxn_retry_interval_ms, 2000, "Deprecated");
57 DECLARE_string(principal);
58 DECLARE_string(keytab_file);
59 
60 namespace impala {
61 
62 // Helper class that starts a server in a separate thread, and handles
63 // the inter-thread communication to monitor whether it started
64 // correctly.
66  public:
68  : thrift_server_(thrift_server),
69  signal_fired_(false) { }
70 
71  // Called by the Thrift server implementation when it has acquired its resources and is
72  // ready to serve, and signals to StartAndWaitForServer that start-up is finished. From
73  // TServerEventHandler.
74  virtual void preServe();
75 
76  // Called when a client connects; we create per-client state and call any
77  // ConnectionHandlerIf handler.
78  virtual void* createContext(shared_ptr<TProtocol> input, shared_ptr<TProtocol> output);
79 
80  // Called when a client starts an RPC; we set the thread-local connection context.
81  virtual void processContext(void* context, shared_ptr<TTransport> output);
82 
83  // Called when a client disconnects; we call any ConnectionHandlerIf handler.
84  virtual void deleteContext(void* serverContext, shared_ptr<TProtocol> input,
85  shared_ptr<TProtocol> output);
86 
87  // Waits for a timeout of TIMEOUT_MS for a server to signal that it has started
88  // correctly.
89  Status StartAndWaitForServer();
90 
91  private:
92  // Lock used to ensure that there are no missed notifications between starting the
93  // supervision thread and calling signal_cond_.timed_wait. Also used to ensure
94  // thread-safe access to members of thrift_server_
95  boost::mutex signal_lock_;
96 
97  // Condition variable that is notified by the supervision thread once either
98  // a) all is well or b) an error occurred.
99  boost::condition_variable signal_cond_;
100 
101  // The ThriftServer under management. This class is a friend of ThriftServer, and
102  // reaches in to change member variables at will.
104 
105  // Guards against spurious condition variable wakeups
107 
108  // The time, in milliseconds, to wait for a server to come up
109  static const int TIMEOUT_MS = 2500;
110 
111  // Called in a separate thread
112  void Supervise();
113 };
114 
115 Status ThriftServer::ThriftServerEventProcessor::StartAndWaitForServer() {
116  // Locking here protects against missed notifications if Supervise executes quickly
117  unique_lock<mutex> lock(signal_lock_);
118  thrift_server_->started_ = false;
119 
120  stringstream name;
121  name << "supervise-" << thrift_server_->name_;
122  thrift_server_->server_thread_.reset(
123  new Thread("thrift-server", name.str(),
124  &ThriftServer::ThriftServerEventProcessor::Supervise, this));
125 
126  system_time deadline = get_system_time() +
127  posix_time::milliseconds(ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS);
128 
129  // Loop protects against spurious wakeup. Locks provide necessary fences to ensure
130  // visibility.
131  while (!signal_fired_) {
132  // Yields lock and allows supervision thread to continue and signal
133  if (!signal_cond_.timed_wait(lock, deadline)) {
134  stringstream ss;
135  ss << "ThriftServer '" << thrift_server_->name_ << "' (on port: "
136  << thrift_server_->port_ << ") did not start within "
137  << ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS << "ms";
138  LOG(ERROR) << ss.str();
139  return Status(ss.str());
140  }
141  }
142 
143  // started_ == true only if preServe was called. May be false if there was an exception
144  // after preServe that was caught by Supervise, causing it to reset the error condition.
145  if (thrift_server_->started_ == false) {
146  stringstream ss;
147  ss << "ThriftServer '" << thrift_server_->name_ << "' (on port: "
148  << thrift_server_->port_ << ") did not start correctly ";
149  LOG(ERROR) << ss.str();
150  return Status(ss.str());
151  }
152  return Status::OK;
153 }
154 
155 void ThriftServer::ThriftServerEventProcessor::Supervise() {
156  DCHECK(thrift_server_->server_.get() != NULL);
157  try {
158  thrift_server_->server_->serve();
159  } catch (TException& e) {
160  LOG(ERROR) << "ThriftServer '" << thrift_server_->name_ << "' (on port: "
161  << thrift_server_->port_ << ") exited due to TException: " << e.what();
162  }
163  {
164  // signal_lock_ ensures mutual exclusion of access to thrift_server_
165  lock_guard<mutex> lock(signal_lock_);
166  thrift_server_->started_ = false;
167 
168  // There may not be anyone waiting on this signal (if the
169  // exception occurs after startup). That's not a problem, this is
170  // just to avoid waiting for the timeout in case of a bind
171  // failure, for example.
172  signal_fired_ = true;
173  }
174  signal_cond_.notify_all();
175 }
176 
177 void ThriftServer::ThriftServerEventProcessor::preServe() {
178  // Acquire the signal lock to ensure that StartAndWaitForServer is
179  // waiting on signal_cond_ when we notify.
180  lock_guard<mutex> lock(signal_lock_);
181  signal_fired_ = true;
182 
183  // This is the (only) success path - if this is not reached within TIMEOUT_MS,
184  // StartAndWaitForServer will indicate failure.
185  thrift_server_->started_ = true;
186 
187  // Should only be one thread waiting on signal_cond_, but wake all just in case.
188  signal_cond_.notify_all();
189 }
190 
191 // This thread-local variable contains the current connection context for whichever
192 // thrift server is currently serving a request on the current thread. This includes
193 // connection state such as the connection identifier and the username.
195 
196 
197 const TUniqueId& ThriftServer::GetThreadConnectionId() {
199 }
200 
201 const ThriftServer::ConnectionContext* ThriftServer::GetThreadConnectionContext() {
202  return __connection_context__;
203 }
204 
205 void* ThriftServer::ThriftServerEventProcessor::createContext(shared_ptr<TProtocol> input,
206  shared_ptr<TProtocol> output) {
207  TSocket* socket = NULL;
208  TTransport* transport = input->getTransport().get();
209  shared_ptr<ConnectionContext> connection_ptr =
210  shared_ptr<ConnectionContext>(new ConnectionContext);
211  TTransport* underlying_transport =
212  (static_cast<TBufferedTransport*>(transport))->getUnderlyingTransport().get();
213  if (!thrift_server_->auth_provider_->is_sasl()) {
214  socket = static_cast<TSocket*>(underlying_transport);
215  } else {
216  TSaslServerTransport* sasl_transport = static_cast<TSaslServerTransport*>(
217  underlying_transport);
218 
219  // Get the username from the transport.
220  connection_ptr->username = sasl_transport->getUsername();
221  socket = static_cast<TSocket*>(sasl_transport->getUnderlyingTransport().get());
222  }
223 
224  {
225  connection_ptr->server_name = thrift_server_->name_;
226  connection_ptr->network_address =
227  MakeNetworkAddress(socket->getPeerAddress(), socket->getPeerPort());
228 
229  lock_guard<mutex> l(thrift_server_->connection_contexts_lock_);
230  uuid connection_uuid = thrift_server_->uuid_generator_();
231  UUIDToTUniqueId(connection_uuid, &connection_ptr->connection_id);
232 
233  // Add the connection to the connection map.
234  __connection_context__ = connection_ptr.get();
235  thrift_server_->connection_contexts_[connection_ptr.get()] = connection_ptr;
236  }
237 
238  if (thrift_server_->connection_handler_ != NULL) {
239  thrift_server_->connection_handler_->ConnectionStart(*__connection_context__);
240  }
241 
242  if (thrift_server_->metrics_enabled_) {
243  thrift_server_->num_current_connections_metric_->Increment(1L);
244  thrift_server_->total_connections_metric_->Increment(1L);
245  }
246 
247  // Store the __connection_context__ in the per-client context. If only this were
248  // accessible from RPC method calls, we wouldn't have to
249  // mess around with thread locals.
250  return (void*)__connection_context__;
251 }
252 
253 void ThriftServer::ThriftServerEventProcessor::processContext(void* context,
254  shared_ptr<TTransport> transport) {
255  __connection_context__ = reinterpret_cast<ConnectionContext*>(context);
256 }
257 
258 void ThriftServer::ThriftServerEventProcessor::deleteContext(void* serverContext,
259  shared_ptr<TProtocol> input, shared_ptr<TProtocol> output) {
260  __connection_context__ = (ConnectionContext*) serverContext;
261 
262  if (thrift_server_->connection_handler_ != NULL) {
263  thrift_server_->connection_handler_->ConnectionEnd(*__connection_context__);
264  }
265 
266  {
267  lock_guard<mutex> l(thrift_server_->connection_contexts_lock_);
268  thrift_server_->connection_contexts_.erase(__connection_context__);
269  }
270 
271  if (thrift_server_->metrics_enabled_) {
272  thrift_server_->num_current_connections_metric_->Increment(-1L);
273  }
274 }
275 
276 ThriftServer::ThriftServer(const string& name, const shared_ptr<TProcessor>& processor,
277  int port, AuthProvider* auth_provider, MetricGroup* metrics, int num_worker_threads,
278  ServerType server_type)
279  : started_(false),
280  port_(port),
281  ssl_enabled_(false),
282  num_worker_threads_(num_worker_threads),
283  server_type_(server_type),
284  name_(name),
285  server_thread_(NULL),
286  server_(NULL),
287  processor_(processor),
288  connection_handler_(NULL),
289  auth_provider_(auth_provider) {
290  if (auth_provider_ == NULL) {
292  }
293  if (metrics != NULL) {
294  metrics_enabled_ = true;
295  stringstream count_ss;
296  count_ss << "impala.thrift-server." << name << ".connections-in-use";
297  num_current_connections_metric_ = metrics->AddGauge(count_ss.str(), 0L);
298  stringstream max_ss;
299  max_ss << "impala.thrift-server." << name << ".total-connections";
300  total_connections_metric_ = metrics->AddCounter(max_ss.str(), 0L);
301  } else {
302  metrics_enabled_ = false;
303  }
304 }
305 
306 Status ThriftServer::CreateSocket(shared_ptr<TServerTransport>* socket) {
307  if (ssl_enabled()) {
308  // This 'factory' is only called once, since CreateSocket() is only called from
309  // Start()
310  shared_ptr<TSSLSocketFactory> socket_factory(new TSSLSocketFactory());
311  try {
312  socket_factory->loadCertificate(certificate_path_.c_str());
313  socket_factory->loadPrivateKey(private_key_path_.c_str());
314  socket->reset(new TSSLServerSocket(port_, socket_factory));
315  } catch (const TException& e) {
316  stringstream err_msg;
317  err_msg << "Could not create SSL socket: " << e.what();
318  return Status(err_msg.str());
319  }
320  return Status::OK;
321  } else {
322  socket->reset(new TServerSocket(port_));
323  return Status::OK;
324  }
325 }
326 
327 Status ThriftServer::EnableSsl(const string& certificate, const string& private_key) {
328  DCHECK(!started_);
329  if (certificate.empty()) return Status("SSL certificate path may not be blank");
330  if (private_key.empty()) return Status("SSL private key path may not be blank");
331 
332  if (!exists(certificate)) {
333  stringstream err_msg;
334  err_msg << "Certificate file " << certificate << " does not exist";
335  return Status(err_msg.str());
336  }
337 
338  // TODO: Consider warning if private key file is world-readable
339  if (!exists(private_key)) {
340  stringstream err_msg;
341  err_msg << "Private key file " << private_key << " does not exist";
342  return Status(err_msg.str());
343  }
344 
345  ssl_enabled_ = true;
346  certificate_path_ = certificate;
347  private_key_path_ = private_key;
348  return Status::OK;
349 }
350 
352  DCHECK(!started_);
353  shared_ptr<TProtocolFactory> protocol_factory(new TBinaryProtocolFactory());
354  shared_ptr<ThreadFactory> thread_factory(
355  new ThriftThreadFactory("thrift-server", name_));
356 
357  // Note - if you change the transport types here, you must check that the
358  // logic in createContext is still accurate.
359  shared_ptr<TServerTransport> server_socket;
360  shared_ptr<TTransportFactory> transport_factory;
361  RETURN_IF_ERROR(CreateSocket(&server_socket));
363  switch (server_type_) {
364  case ThreadPool:
365  {
366  shared_ptr<ThreadManager> thread_mgr(
367  ThreadManager::newSimpleThreadManager(num_worker_threads_));
368  thread_mgr->threadFactory(thread_factory);
369  thread_mgr->start();
370  server_.reset(new TThreadPoolServer(processor_, server_socket,
371  transport_factory, protocol_factory, thread_mgr));
372  }
373  break;
374  case Threaded:
375  server_.reset(new TThreadedServer(processor_, server_socket,
376  transport_factory, protocol_factory, thread_factory));
377  break;
378  default:
379  stringstream error_msg;
380  error_msg << "Unsupported server type: " << server_type_;
381  LOG(ERROR) << error_msg.str();
382  return Status(error_msg.str());
383  }
384  shared_ptr<ThriftServer::ThriftServerEventProcessor> event_processor(
386  server_->setServerEventHandler(event_processor);
387 
388  RETURN_IF_ERROR(event_processor->StartAndWaitForServer());
389 
390  LOG(INFO) << "ThriftServer '" << name_ << "' started on port: " << port_
391  << (ssl_enabled() ? "s" : "");
392  DCHECK(started_);
393  return Status::OK;
394 }
395 
397  DCHECK(server_thread_ != NULL);
398  DCHECK(started_);
399  server_thread_->Join();
400 }
401 
403  DCHECK(server_thread_ != NULL);
404  DCHECK(server_);
405  DCHECK_EQ(server_type_, Threaded);
406  server_->stop();
407  if (started_) Join();
408 }
409 }
SimpleMetric< T, TMetricKind::COUNTER > * AddCounter(const std::string &key, const T &value, const TUnit::type unit=TUnit::UNIT, const std::string &description="")
Definition: metrics.h:239
bool ssl_enabled_
True if the server socket only accepts SSL connections.
boost::scoped_ptr< apache::thrift::server::TServer > server_
Thrift housekeeping.
const StringSearch UrlParser::protocol_search & protocol
Definition: url-parser.cc:36
TODO: Consider allowing fragment IDs as category parameters.
Definition: thread.h:45
AuthProvider * GetInternalAuthProvider()
boost::scoped_ptr< Thread > server_thread_
Thread that runs ThriftServerEventProcessor::Supervise() in a separate loop.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
virtual Status GetServerTransportFactory(boost::shared_ptr< apache::thrift::transport::TTransportFactory > *factory)=0
Status EnableSsl(const std::string &certificate, const std::string &private_key)
void Join()
Blocks until the server stops and exits its main thread.
TNetworkAddress MakeNetworkAddress(const string &hostname, int port)
Definition: network-util.cc:96
int port_
The port on which the server interface is exposed.
bool started_
True if the server has been successfully started, for internal use only.
__thread ThriftServer::ConnectionContext * __connection_context__
ThriftServerEventProcessor(ThriftServer *thrift_server)
std::string private_key_path_
Path to private key file in .PEM format.
AuthProvider * auth_provider_
Not owned by us, owned by the AuthManager.
IntGauge * num_current_connections_metric_
Number of currently active connections.
Per-connection information.
Definition: thrift-server.h:45
bool ssl_enabled() const
Definition: thrift-server.h:94
DECLARE_string(principal)
DEFINE_int32(rpc_cnxn_attempts, 10,"Deprecated")
static AuthManager * GetInstance()
SimpleMetric< T > * AddGauge(const std::string &key, const T &value, const TUnit::type unit=TUnit::NONE, const std::string &description="")
Create a gauge metric object with given key and initial value (owned by this object) ...
Definition: metrics.h:223
Status CreateSocket(boost::shared_ptr< apache::thrift::transport::TServerTransport > *socket)
void UUIDToTUniqueId(const boost::uuids::uuid &uuid, T *unique_id)
Definition: uid-util.h:38
bool metrics_enabled_
True if metrics are enabled.
static const Status OK
Definition: status.h:87
boost::shared_ptr< TTransport > getUnderlyingTransport()
ServerType server_type_
ThreadPool or Threaded server.
string name
Definition: cpu-info.cc:50
IntCounter * total_connections_metric_
Total connections made over the lifetime of this server.
std::string certificate_path_
Path to certificate file in .PEM format.
const std::string name_
User-specified identifier that shows up in logs.
boost::shared_ptr< apache::thrift::TProcessor > processor_