Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
network-perf-benchmark.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <iostream>
16 
17 #include <thrift/protocol/TBinaryProtocol.h>
18 #include <thrift/protocol/TDebugProtocol.h>
19 #include <thrift/transport/TSocket.h>
20 #include <thrift/transport/TTransportUtils.h>
21 
22 #include <boost/algorithm/string.hpp>
23 #include <boost/thread/thread.hpp>
24 #include "gen-cpp/NetworkTest_types.h"
25 #include "gen-cpp/NetworkTestService.h"
26 
27 #include "common/logging.h"
28 #include "util/cpu-info.h"
29 #include "util/stopwatch.h"
30 #include "rpc/thrift-client.h"
31 #include "rpc/thrift-server.h"
32 #include "rpc/thrift-thread.h"
33 
34 #include "common/names.h"
35 
// Command-line flags (gflags).
// FLAGS_port is used both as the port the test server is created with in main() and as
// the port the client connects to in HandleSend()/HandleBroadcast().
36 DEFINE_int32(port, 22222, "Port for NetworkTestService");
// FLAGS_send_batch_size caps the payload of a single Send RPC. The default of 0 makes
// Send() transmit the whole payload in one batch.
37 DEFINE_int64(send_batch_size, 0, "Batch size (in bytes). Data is split up into batches");
38 
39 // Simple client server network speed benchmark utility. This compiles to
40 // a binary that runs as both the client and server. The server can be started
41 // up by just running the binary. After the server starts up, it will drop into
42 // the client 'shell' where benchmarks can be run.
43 // The supported benchmarks are:
44 // 'send <size in mb> <target ip>'
45 // 'broadcast <size in mb> <list of space separated target ips>'
46 // The command can also be passed in via command line in the same format. If
47 // run in this mode, the server does not start up.
48 // For broadcast, the data is sent in parallel to all nodes.
49 //
50 // The expected usage for measuring 'send' is to start up the server on one machine
51 // and issue the send from another.
52 // For 'broadcast', the server should be started on all the machines and then the
53 // broadcast is issued from one of them.
54 
55 using boost::algorithm::is_any_of;
56 using boost::algorithm::token_compress_on;
57 using boost::algorithm::split;
58 
59 using namespace apache::thrift;
60 using namespace apache::thrift::protocol;
61 using namespace apache::thrift::transport;
62 using namespace apache::thrift::server;
63 using namespace apache::thrift::concurrency;
64 using namespace impala;
65 using namespace impalatest;
66 
67 
69  public:
71  }
72 
73  virtual ~TestServer() {
74  }
75 
76  virtual void Send(ThriftDataResult& result, const ThriftDataParams& params) {
77  result.__set_bytes_received(params.data.size());
78  }
79 
80  void Server(ThriftServer* server) {
81  server->Start();
82  server->Join();
83  }
84 };
85 
86 // Send bytes to client respecting the batch size
87 // Returns the rate in mb/s to send the data.
88 double Send(ThriftClient<NetworkTestServiceClient>* client, int64_t bytes) {
89  int64_t batch_size = FLAGS_send_batch_size;
90  if (batch_size == 0) batch_size = bytes;
91  int64_t total_sent = 0;
92 
93  MonotonicStopWatch timer;
94  timer.Start();
95  while (total_sent < bytes) {
96  int64_t send_size = min(bytes - total_sent, batch_size);
97  total_sent += send_size;
98 
99  ThriftDataParams data;
100  ThriftDataResult result;
101  data.data.resize(send_size);
102  client->iface()->Send(result, data);
103 
104  if (result.bytes_received != send_size) {
105  return -1;
106  }
107  }
108  timer.Stop();
109 
110  double mb = bytes / (1024. * 1024.);
111  double sec = timer.ElapsedTime() / (1000.) / (1000.) / (1000.);
112  return mb/sec;
113 }
114 
115 // Send tokens[1] megabytes to tokens[2]
116 void HandleSend(const vector<string>& tokens) {
117  if (tokens.size() != 3) {
118  return;
119  }
120 
121  int64_t mbs = atoi(tokens[1].c_str());
122  int64_t bytes = mbs * (1024L * 1024L);
123  cout << "Sending " << mbs << " megabytes..." << endl;
124  const string& ip = tokens[2];
125 
126  ThriftClient<NetworkTestServiceClient> client(ip, FLAGS_port);
127  Status status = client.Open();
128  if (!status.ok()) {
129  cerr << "Could not connect to server" << endl;
130  return;
131  }
132 
133  double rate = Send(&client, bytes);
134  if (rate < 0) {
135  cerr << "Send failed";
136  return;
137  }
138  cout << "Send rate: (MB/s): " << rate << endl;
139 }
140 
141 // Broadcast tokens[1] megabytes to tokens[2...n] nodes in parallel.
142 void HandleBroadcast(const vector<string>& tokens) {
143  if (tokens.size() <= 2) {
144  return;
145  }
146  int64_t mbs = atoi(tokens[1].c_str());
147  int64_t bytes = mbs * (1024L * 1024L);
148  cout << "Broadcasting " << mbs << " megabytes..." << endl;
149 
150  vector<ThriftClient<NetworkTestServiceClient>* > clients;
151  for (int i = 2; i < tokens.size(); ++i) {
153  new ThriftClient<NetworkTestServiceClient>(tokens[i], FLAGS_port);
154  Status status = client->Open();
155  if (!status.ok()) {
156  cerr << "Could not connect to server: " << tokens[i] << endl;
157  return;
158  }
159  clients.push_back(client);
160  }
161 
162  MonotonicStopWatch timer;
163  timer.Start();
164  thread_group threads;
165  for (int i = 0; i < clients.size(); ++i) {
166  threads.add_thread(new thread(Send, clients[i], bytes));
167  }
168  threads.join_all();
169  timer.Stop();
170 
171  double mb = bytes / (1024 * 1024.);
172  double sec = timer.ElapsedTime() / (1000.) / (1000.) / (1000.);
173 
174  cout << "Send rate per node: (MB/s) " << (mb/sec) << endl;
175  cout << "Send rate cluster: (MB/s) " << (mb * clients.size() / sec) << endl;
176 }
177 
// Lower-cases every string in '*tokens' in place, using ::tolower (and therefore the
// current C locale).
void ConvertToLowerCase(vector<string>* tokens) {
  for (size_t i = 0; i < tokens->size(); ++i) {
    string& token = (*tokens)[i];
    for (size_t j = 0; j < token.size(); ++j) {
      // ::tolower requires a value representable as unsigned char (or EOF). Passing a
      // plain char that is negative (any byte >= 0x80 where char is signed) is
      // undefined behaviour, so convert through unsigned char first.
      const unsigned char ch = static_cast<unsigned char>(token[j]);
      token[j] = static_cast<char>(::tolower(ch));
    }
  }
}
184 
185 bool ProcessCommand(const vector<string>& tokens) {
186  if (tokens.empty()) return false;
187 
188  if (tokens[0] == "quit") return true;;
189 
190  if (tokens[0] == "send") {
191  HandleSend(tokens);
192  } else if (tokens[0] == "broadcast") {
193  HandleBroadcast(tokens);
194  } else {
195  cerr << "Invalid command" << endl;
196  return false;
197  }
198  return false;
199 }
200 
201 int main(int argc, char** argv) {
202  google::ParseCommandLineFlags(&argc, &argv, true);
203  CpuInfo::Init();
204 
205  if (argc != 1) {
206  // Just run client from command line args
207  vector<string> tokens;
208  for (int i = 1; i < argc; ++i) {
209  tokens.push_back(argv[i]);
210  }
211  ConvertToLowerCase(&tokens);
212  ProcessCommand(tokens);
213  return 0;
214  }
215 
216  // Start up server and client shell
217  shared_ptr<TestServer> handler(new TestServer);
218  shared_ptr<ThreadFactory> thread_factory(new ThriftThreadFactory("test", "test"));
219  shared_ptr<TProcessor> processor(new NetworkTestServiceProcessor(handler));
220  ThriftServer* server = new ThriftServer("Network Test Server", processor,
221  FLAGS_port, NULL, NULL, 100, ThriftServer::ThreadPool);
222  thread* server_thread = new thread(&TestServer::Server, handler.get(), server);
223 
224  string input;
225  while (1) {
226  vector<string> tokens;
227  cout << "> ";
228  cout.flush();
229 
230  getline(cin, input);
231  if (cin.eof()) break;
232 
233  split(tokens, input, is_any_of(" "), token_compress_on);
234 
235  ConvertToLowerCase(&tokens);
236  if (ProcessCommand(tokens)) break;
237  }
238 
239  server->StopForTesting();
240  server_thread->join();
241 
242  return 0;
243 }
const StringSearch UrlParser::protocol_search & protocol
Definition: url-parser.cc:36
InterfaceType * iface()
Returns the object used to actually make RPCs against the remote server.
void HandleBroadcast(const vector< string > &tokens)
void Join()
Blocks until the server stops and exits its main thread.
DEFINE_int64(send_batch_size, 0,"Batch size (in bytes). Data is split up into batches")
void HandleSend(const vector< string > &tokens)
virtual void Send(ThriftDataResult &result, const ThriftDataParams &params)
uint64_t ElapsedTime() const
Returns time in nanosecond.
Definition: stopwatch.h:105
DEFINE_int32(port, 22222,"Port for NetworkTestService")
bool ProcessCommand(const vector< string > &tokens)
void Server(ThriftServer *server)
void ConvertToLowerCase(vector< string > *tokens)
double Send(ThriftClient< NetworkTestServiceClient > *client, int64_t bytes)
bool ok() const
Definition: status.h:172
int main(int argc, char **argv)