15 #include <boost/filesystem.hpp>
16 #include <boost/thread.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <boost/thread/condition_variable.hpp>
19 #include <boost/uuid/uuid_io.hpp>
21 #include <thrift/concurrency/Thread.h>
22 #include <thrift/concurrency/ThreadManager.h>
23 #include <thrift/protocol/TBinaryProtocol.h>
24 #include <thrift/server/TThreadPoolServer.h>
25 #include <thrift/server/TThreadedServer.h>
26 #include <thrift/transport/TSocket.h>
27 #include <thrift/transport/TSSLServerSocket.h>
28 #include <thrift/transport/TSSLSocket.h>
29 #include <thrift/server/TThreadPoolServer.h>
30 #include <thrift/transport/TServerSocket.h>
31 #include <gflags/gflags.h>
33 #include "gen-cpp/Types_types.h"
44 namespace posix_time = boost::posix_time;
45 using boost::filesystem::exists;
46 using boost::get_system_time;
47 using boost::system_time;
48 using boost::uuids::uuid;
49 using namespace apache::thrift::concurrency;
51 using namespace apache::thrift::server;
52 using namespace apache::thrift::transport;
53 using namespace apache::thrift;
// gflags definition of the int32 flag 'rpc_cnxn_retry_interval_ms' (default 2000).
// The help text marks the flag as deprecated, and no visible code in this file reads it;
// presumably it is kept only so existing command lines still parse (TODO confirm).
56 DEFINE_int32(rpc_cnxn_retry_interval_ms, 2000,
"Deprecated");
68 : thrift_server_(thrift_server),
69 signal_fired_(false) { }
// Server-start hook. The definition sets signal_fired_ and thrift_server_->started_
// while holding signal_lock_, then notifies signal_cond_.
74 virtual void preServe();
// Per-connection setup hook. The definition fills in a ConnectionContext and registers
// it in thrift_server_->connection_contexts_.
78 virtual void* createContext(shared_ptr<TProtocol> input, shared_ptr<TProtocol> output);
// Per-request hook.
// NOTE(review): the definition's body is not visible here, so its behavior is unverified.
81 virtual void processContext(
void* context, shared_ptr<TTransport> output);
// Per-connection teardown hook; counterpart of createContext(). The definition
// decrements num_current_connections_metric_ when metrics are enabled.
84 virtual void deleteContext(
void* serverContext, shared_ptr<TProtocol> input,
85 shared_ptr<TProtocol> output);
// Starts the supervision thread and waits (at most TIMEOUT_MS) for it to signal.
// Returns a Status describing the outcome.
89 Status StartAndWaitForServer();
// How long, in milliseconds, StartAndWaitForServer() waits for the server thread to
// signal before it reports a timeout.
109 static const int TIMEOUT_MS = 2500;
// Starts the thread that runs Supervise() and blocks until that thread (or preServe())
// sets signal_fired_, or until TIMEOUT_MS milliseconds have elapsed. An error is logged
// both when the wait times out and when the wait ends with thrift_server_->started_
// still false.
// NOTE(review): the statements that produce the returned Status are not visible in this
// excerpt; confirm them against the full source.
115 Status ThriftServer::ThriftServerEventProcessor::StartAndWaitForServer() {
// signal_lock_ is held before the thread is created. Supervise() and preServe() take
// the same mutex before touching signal_fired_, so they cannot signal until timed_wait()
// below releases the lock.
117 unique_lock<mutex> lock(signal_lock_);
118 thrift_server_->started_ =
false;
121 name <<
"supervise-" << thrift_server_->name_;
122 thrift_server_->server_thread_.reset(
123 new Thread(
"thrift-server", name.str(),
124 &ThriftServer::ThriftServerEventProcessor::Supervise,
this));
// Absolute deadline: current system time plus TIMEOUT_MS milliseconds.
126 system_time deadline = get_system_time() +
127 posix_time::milliseconds(ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS);
// The flag is re-tested after every wakeup, so a wakeup without the flag being set
// simply waits again (against the same absolute deadline).
131 while (!signal_fired_) {
// timed_wait() releases 'lock' while blocked; a false result means the deadline passed.
133 if (!signal_cond_.timed_wait(lock, deadline)) {
135 ss <<
"ThriftServer '" << thrift_server_->name_ <<
"' (on port: "
136 << thrift_server_->port_ <<
") did not start within "
137 << ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS <<
"ms";
138 LOG(ERROR) << ss.str();
// signal_fired_ alone does not imply success: Supervise() also sets it, together with
// started_ = false, so started_ is checked separately.
145 if (thrift_server_->started_ ==
false) {
147 ss <<
"ThriftServer '" << thrift_server_->name_ <<
"' (on port: "
148 << thrift_server_->port_ <<
") did not start correctly ";
149 LOG(ERROR) << ss.str();
// Body of the thread created by StartAndWaitForServer(). Calls serve() on the Thrift
// server and logs any TException it throws. Afterwards, under signal_lock_, it marks
// the server as not started, sets signal_fired_ and wakes all waiters on signal_cond_.
155 void ThriftServer::ThriftServerEventProcessor::Supervise() {
// The server object must already exist when this thread starts.
156 DCHECK(thrift_server_->server_.get() != NULL);
// Presumably blocks for the lifetime of the server (TODO confirm against Thrift's
// TServer::serve()).
158 thrift_server_->server_->serve();
159 }
// NOTE(review): this catches by non-const reference, while the SSL socket creation code
// in this file catches 'const TException&'; consider making the two consistent.
catch (TException& e) {
160 LOG(ERROR) <<
"ThriftServer '" << thrift_server_->name_ <<
"' (on port: "
161 << thrift_server_->port_ <<
") exited due to TException: " << e.what();
// Same mutex that StartAndWaitForServer() waits with, so the updates below are seen
// consistently by the waiter.
165 lock_guard<mutex> lock(signal_lock_);
166 thrift_server_->started_ =
false;
// Releases a thread blocked in StartAndWaitForServer(), which then sees started_ == false.
172 signal_fired_ =
true;
174 signal_cond_.notify_all();
// Server event hook (declared virtual in the class). Presumably invoked by Thrift once
// the server is about to start accepting connections (TODO confirm against
// TServerEventHandler). Under signal_lock_ it records that the server started and wakes
// the thread blocked in StartAndWaitForServer().
177 void ThriftServer::ThriftServerEventProcessor::preServe() {
180 lock_guard<mutex> lock(signal_lock_);
// Ends the wait loop in StartAndWaitForServer().
181 signal_fired_ =
true;
// Distinguishes a successful start from the failure path in Supervise(), which sets
// signal_fired_ as well but leaves started_ false.
185 thrift_server_->started_ =
true;
188 signal_cond_.notify_all();
197 const TUniqueId& ThriftServer::GetThreadConnectionId() {
// Per-connection setup hook. Fills in a ConnectionContext (username when SASL is used,
// server name, network address), registers it in thrift_server_->connection_contexts_
// under connection_contexts_lock_, and bumps the connection metrics when enabled.
// NOTE(review): several statements, including the one that produces the returned void*,
// are not visible in this excerpt.
205 void* ThriftServer::ThriftServerEventProcessor::createContext(shared_ptr<TProtocol> input,
206 shared_ptr<TProtocol> output) {
207 TSocket* socket = NULL;
// Raw, non-owning pointer to the transport behind the input protocol.
208 TTransport* transport = input->getTransport().get();
209 shared_ptr<ConnectionContext> connection_ptr =
211 TTransport* underlying_transport =
212 (
static_cast<TBufferedTransport*
>(transport))->getUnderlyingTransport().get();
// Without SASL, the transport wrapped by the buffered transport is taken to be the
// TSocket itself.
// NOTE(review): the static_casts here are unchecked; they rely on the server always
// being configured with exactly these transport types.
213 if (!thrift_server_->auth_provider_->is_sasl()) {
214 socket =
static_cast<TSocket*
>(underlying_transport);
217 underlying_transport);
// Presumably the SASL branch: the authenticated user name comes from the SASL transport.
220 connection_ptr->username = sasl_transport->
getUsername();
225 connection_ptr->server_name = thrift_server_->name_;
226 connection_ptr->network_address =
// connection_contexts_ is only touched with connection_contexts_lock_ held.
229 lock_guard<mutex> l(thrift_server_->connection_contexts_lock_);
230 uuid connection_uuid = thrift_server_->uuid_generator_();
// Keyed by the raw pointer; the stored shared_ptr keeps the context alive while it is
// registered.
235 thrift_server_->connection_contexts_[connection_ptr.get()] = connection_ptr;
238 if (thrift_server_->connection_handler_ != NULL) {
// One more live connection, and one more connection in the lifetime total.
242 if (thrift_server_->metrics_enabled_) {
243 thrift_server_->num_current_connections_metric_->Increment(1L);
244 thrift_server_->total_connections_metric_->Increment(1L);
253 void ThriftServer::ThriftServerEventProcessor::processContext(
void* context,
254 shared_ptr<TTransport> transport) {
// Per-connection teardown hook; counterpart of createContext().
// NOTE(review): the bodies of the branches below are not visible in this excerpt.
258 void ThriftServer::ThriftServerEventProcessor::deleteContext(
void* serverContext,
259 shared_ptr<TProtocol> input, shared_ptr<TProtocol> output) {
// A connection handler is optional; it is only consulted when one has been set.
262 if (thrift_server_->connection_handler_ != NULL) {
// Same lock createContext() holds while registering a context in connection_contexts_.
267 lock_guard<mutex> l(thrift_server_->connection_contexts_lock_);
// Undoes the +1 applied to the live-connection gauge in createContext().
271 if (thrift_server_->metrics_enabled_) {
272 thrift_server_->num_current_connections_metric_->Increment(-1L);
// Constructor. Stores the supplied configuration (worker-thread count, server type,
// processor, auth provider) and starts with no server thread and no connection handler.
// When a metrics object is supplied, it builds metric keys of the form
// "impala.thrift-server.<name>.<suffix>".
// NOTE(review): part of the parameter list and of the initializer list is not visible
// in this excerpt.
276 ThriftServer::ThriftServer(
const string&
name,
const shared_ptr<TProcessor>& processor,
282 num_worker_threads_(num_worker_threads),
283 server_type_(server_type),
285 server_thread_(NULL),
287 processor_(processor),
288 connection_handler_(NULL),
289 auth_provider_(auth_provider) {
// Metric keys are only built when a metrics object was passed in.
293 if (metrics != NULL) {
// Key "<prefix>.connections-in-use"; presumably backs num_current_connections_metric_
// (TODO confirm).
295 stringstream count_ss;
296 count_ss <<
"impala.thrift-server." << name <<
".connections-in-use";
// Key "<prefix>.total-connections"; presumably backs total_connections_metric_
// (TODO confirm).
299 max_ss <<
"impala.thrift-server." << name <<
".total-connections";
310 shared_ptr<TSSLSocketFactory> socket_factory(
new TSSLSocketFactory());
314 socket->reset(
new TSSLServerSocket(
port_, socket_factory));
315 }
catch (
const TException& e) {
316 stringstream err_msg;
317 err_msg <<
"Could not create SSL socket: " << e.what();
318 return Status(err_msg.str());
322 socket->reset(
new TServerSocket(
port_));
329 if (certificate.empty())
return Status(
"SSL certificate path may not be blank");
330 if (private_key.empty())
return Status(
"SSL private key path may not be blank");
332 if (!exists(certificate)) {
333 stringstream err_msg;
334 err_msg <<
"Certificate file " << certificate <<
" does not exist";
335 return Status(err_msg.str());
339 if (!exists(private_key)) {
340 stringstream err_msg;
341 err_msg <<
"Private key file " << private_key <<
" does not exist";
342 return Status(err_msg.str());
353 shared_ptr<TProtocolFactory> protocol_factory(
new TBinaryProtocolFactory());
354 shared_ptr<ThreadFactory> thread_factory(
359 shared_ptr<TServerTransport> server_socket;
360 shared_ptr<TTransportFactory> transport_factory;
366 shared_ptr<ThreadManager> thread_mgr(
368 thread_mgr->threadFactory(thread_factory);
371 transport_factory, protocol_factory, thread_mgr));
376 transport_factory, protocol_factory, thread_factory));
379 stringstream error_msg;
380 error_msg <<
"Unsupported server type: " <<
server_type_;
381 LOG(ERROR) << error_msg.str();
382 return Status(error_msg.str());
384 shared_ptr<ThriftServer::ThriftServerEventProcessor> event_processor(
386 server_->setServerEventHandler(event_processor);
390 LOG(INFO) <<
"ThriftServer '" <<
name_ <<
"' started on port: " <<
port_
SimpleMetric< T, TMetricKind::COUNTER > * AddCounter(const std::string &key, const T &value, const TUnit::type unit=TUnit::UNIT, const std::string &description="")
bool ssl_enabled_
True if the server socket only accepts SSL connections.
boost::scoped_ptr< apache::thrift::server::TServer > server_
Thrift housekeeping.
const StringSearch UrlParser::protocol_search & protocol
TODO: Consider allowing fragment IDs as category parameters.
AuthProvider * GetInternalAuthProvider()
boost::scoped_ptr< Thread > server_thread_
Thread that runs ThriftServerEventProcessor::Supervise() in a separate loop.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
MetricGroups may be organised hierarchically as a tree.
virtual Status GetServerTransportFactory(boost::shared_ptr< apache::thrift::transport::TTransportFactory > *factory)=0
Status EnableSsl(const std::string &certificate, const std::string &private_key)
void Join()
Blocks until the server stops and exits its main thread.
TNetworkAddress MakeNetworkAddress(const string &hostname, int port)
int port_
The port on which the server interface is exposed.
ThriftServer * thrift_server_
bool started_
True if the server has been successfully started, for internal use only.
__thread ThriftServer::ConnectionContext * __connection_context__
ThriftServerEventProcessor(ThriftServer *thrift_server)
std::string private_key_path_
Path to private key file in .PEM format.
AuthProvider * auth_provider_
Not owned by us, owned by the AuthManager.
boost::mutex signal_lock_
IntGauge * num_current_connections_metric_
Number of currently active connections.
Per-connection information.
DECLARE_string(principal)
DEFINE_int32(rpc_cnxn_attempts, 10,"Deprecated")
static AuthManager * GetInstance()
SimpleMetric< T > * AddGauge(const std::string &key, const T &value, const TUnit::type unit=TUnit::NONE, const std::string &description="")
Create a gauge metric object with given key and initial value (owned by this object) ...
boost::condition_variable signal_cond_
Status CreateSocket(boost::shared_ptr< apache::thrift::transport::TServerTransport > *socket)
void UUIDToTUniqueId(const boost::uuids::uuid &uuid, T *unique_id)
bool metrics_enabled_
True if metrics are enabled.
std::string getUsername()
boost::shared_ptr< TTransport > getUnderlyingTransport()
ServerType server_type_
ThreadPool or Threaded server.
IntCounter * total_connections_metric_
Total connections made over the lifetime of this server.
std::string certificate_path_
Path to certificate file in .PEM format.
const std::string name_
User-specified identifier that shows up in logs.
boost::shared_ptr< apache::thrift::TProcessor > processor_