Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
in-process-servers.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include "statestore/statestore.h"
18 #include "rpc/thrift-util.h"
19 #include "rpc/thrift-server.h"
20 #include "util/network-util.h"
21 #include "util/webserver.h"
23 #include "util/metrics.h"
24 #include "runtime/exec-env.h"
25 #include "service/impala-server.h"
26 
27 #include "common/names.h"
28 
29 using namespace apache::thrift;
30 using namespace impala;
31 
32 InProcessImpalaServer::InProcessImpalaServer(const string& hostname, int backend_port,
33  int subscriber_port, int webserver_port, const string& statestore_host,
34  int statestore_port)
35  : hostname_(hostname), backend_port_(backend_port),
36  impala_server_(NULL),
37  exec_env_(new ExecEnv(hostname, backend_port, subscriber_port, webserver_port,
38  statestore_host, statestore_port)) {
39 }
40 
42  DCHECK(impala_server_ != NULL) << "Call Start*() first.";
43  exec_env_->frontend()->SetCatalogInitialized();
44 }
45 
47  bool use_statestore) {
48  RETURN_IF_ERROR(exec_env_->StartServices());
49  ThriftServer* be_server;
50  ThriftServer* hs2_server;
51  ThriftServer* beeswax_server;
52  RETURN_IF_ERROR(CreateImpalaServer(exec_env_.get(), beeswax_port, hs2_port,
53  backend_port_, &beeswax_server, &hs2_server,
54  &be_server, &impala_server_));
55  be_server_.reset(be_server);
56  hs2_server_.reset(hs2_server);
57  beeswax_server_.reset(beeswax_server);
58 
59  RETURN_IF_ERROR(be_server_->Start());
60  RETURN_IF_ERROR(hs2_server_->Start());
62 
63  // Wait for up to 1s for the backend server to start
64  RETURN_IF_ERROR(WaitForServer(hostname_, backend_port_, 100, 100));
65 
66  return Status::OK;
67 }
68 
70  RETURN_IF_ERROR(exec_env_->StartServices());
71  ThriftServer* be_server;
73  &be_server, &impala_server_));
74  be_server_.reset(be_server);
75  RETURN_IF_ERROR(be_server_->Start());
76  return Status::OK;
77 }
78 
80  be_server_->Join();
81  return Status::OK;
82 }
83 
84 InProcessStatestore::InProcessStatestore(int statestore_port, int webserver_port)
85  : webserver_(new Webserver(webserver_port)),
86  metrics_(new MetricGroup("statestore")),
87  statestore_port_(statestore_port),
88  statestore_(new Statestore(metrics_.get())) {
90  statestore_->RegisterWebpages(webserver_.get());
91 }
92 
94  webserver_->Start();
95  shared_ptr<TProcessor> processor(
96  new StatestoreServiceProcessor(statestore_->thrift_iface()));
97 
98  statestore_server_.reset(new ThriftServer("StatestoreService", processor,
99  statestore_port_, NULL, metrics_.get(), 5));
100  statestore_main_loop_.reset(
101  new Thread("statestore", "main-loop", &Statestore::MainLoop, statestore_.get()));
102 
104  return WaitForServer("localhost", statestore_port_, 10, 100);
105 }
TODO: Consider allowing fragment IDs as category parameters.
Definition: thread.h:45
void AddDefaultUrlCallbacks(Webserver *webserver, MemTracker *process_mem_tracker=NULL)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
boost::scoped_ptr< ExecEnv > exec_env_
ExecEnv holds much of the per-service state.
InProcessStatestore(int statestore_port, int webserver_port)
Constructs but does not start the statestore.
boost::scoped_ptr< Thread > statestore_main_loop_
boost::scoped_ptr< Statestore > statestore_
The statestore instance.
const std::string hostname_
Hostname for this server, usually FLAGS_hostname.
boost::scoped_ptr< MetricGroup > metrics_
MetricGroup object.
boost::scoped_ptr< Webserver > webserver_
Websever object to serve debug pages through.
Status WaitForServer(const string &host, int port, int num_retries, int retry_interval_ms)
Definition: thrift-util.cc:121
boost::scoped_ptr< ThriftServer > hs2_server_
Frontend HiveServer2 server.
Status CreateImpalaServer(ExecEnv *exec_env, int beeswax_port, int hs2_port, int be_port, ThriftServer **beeswax_server, ThriftServer **hs2_server, ThriftServer **be_server, ImpalaServer **impala_server)
Status MainLoop()
The main processing loop. Blocks until the exit flag is set.
Definition: statestore.cc:743
uint32_t statestore_port_
Port to start the statestore on.
const uint32_t backend_port_
Port to start the backend server on.
Status Start()
Starts the statestore server, and the processing thread.
boost::scoped_ptr< ThriftServer > statestore_server_
Statestore Thrift server.
Status StartWithClientServers(int beeswax_port, int hs2_port, bool use_statestore)
static const Status OK
Definition: status.h:87
ExecEnv * exec_env_
Definition: coordinator.h:193
boost::scoped_ptr< ThriftServer > be_server_
Backend Thrift server.
boost::scoped_ptr< ThriftServer > beeswax_server_
Frontend Beeswax server.
Status StartAsBackendOnly(bool use_statestore)