17 #include <thrift/protocol/TBinaryProtocol.h>
18 #include <thrift/protocol/TDebugProtocol.h>
19 #include <thrift/transport/TSocket.h>
20 #include <thrift/transport/TTransportUtils.h>
22 #include <boost/algorithm/string.hpp>
23 #include <boost/thread/thread.hpp>
24 #include "gen-cpp/NetworkTest_types.h"
25 #include "gen-cpp/NetworkTestService.h"
36 DEFINE_int32(port, 22222,
"Port for NetworkTestService");
37 DEFINE_int64(send_batch_size, 0,
"Batch size (in bytes). Data is split up into batches");
55 using boost::algorithm::is_any_of;
56 using boost::algorithm::token_compress_on;
57 using boost::algorithm::split;
59 using namespace apache::thrift;
61 using namespace apache::thrift::transport;
62 using namespace apache::thrift::server;
63 using namespace apache::thrift::concurrency;
64 using namespace impala;
65 using namespace impalatest;
76 virtual void Send(ThriftDataResult& result,
const ThriftDataParams& params) {
77 result.__set_bytes_received(params.data.size());
89 int64_t batch_size = FLAGS_send_batch_size;
90 if (batch_size == 0) batch_size = bytes;
91 int64_t total_sent = 0;
95 while (total_sent < bytes) {
96 int64_t send_size = min(bytes - total_sent, batch_size);
97 total_sent += send_size;
99 ThriftDataParams data;
100 ThriftDataResult result;
101 data.data.resize(send_size);
102 client->
iface()->Send(result, data);
104 if (result.bytes_received != send_size) {
110 double mb = bytes / (1024. * 1024.);
111 double sec = timer.
ElapsedTime() / (1000.) / (1000.) / (1000.);
117 if (tokens.size() != 3) {
121 int64_t mbs = atoi(tokens[1].c_str());
122 int64_t bytes = mbs * (1024L * 1024L);
123 cout <<
"Sending " << mbs <<
" megabytes..." << endl;
124 const string& ip = tokens[2];
129 cerr <<
"Could not connect to server" << endl;
133 double rate =
Send(&client, bytes);
135 cerr <<
"Send failed";
138 cout <<
"Send rate: (MB/s): " << rate << endl;
143 if (tokens.size() <= 2) {
146 int64_t mbs = atoi(tokens[1].c_str());
147 int64_t bytes = mbs * (1024L * 1024L);
148 cout <<
"Broadcasting " << mbs <<
" megabytes..." << endl;
150 vector<ThriftClient<NetworkTestServiceClient>* > clients;
151 for (
int i = 2; i < tokens.size(); ++i) {
156 cerr <<
"Could not connect to server: " << tokens[i] << endl;
159 clients.push_back(client);
164 thread_group threads;
165 for (
int i = 0; i < clients.size(); ++i) {
166 threads.add_thread(
new thread(
Send, clients[i], bytes));
171 double mb = bytes / (1024 * 1024.);
172 double sec = timer.
ElapsedTime() / (1000.) / (1000.) / (1000.);
174 cout <<
"Send rate per node: (MB/s) " << (mb/sec) << endl;
175 cout <<
"Send rate cluster: (MB/s) " << (mb * clients.size() / sec) << endl;
179 for (
int i = 0; i < tokens->size(); ++i) {
181 (*tokens)[i].begin(), (*tokens)[i].end(), (*tokens)[i].begin(), ::tolower);
186 if (tokens.empty())
return false;
188 if (tokens[0] ==
"quit")
return true;;
190 if (tokens[0] ==
"send") {
192 }
else if (tokens[0] ==
"broadcast") {
195 cerr <<
"Invalid command" << endl;
201 int main(
int argc,
char** argv) {
202 google::ParseCommandLineFlags(&argc, &argv,
true);
207 vector<string> tokens;
208 for (
int i = 1; i < argc; ++i) {
209 tokens.push_back(argv[i]);
217 shared_ptr<TestServer> handler(
new TestServer);
219 shared_ptr<TProcessor> processor(
new NetworkTestServiceProcessor(handler));
221 FLAGS_port, NULL, NULL, 100, ThriftServer::ThreadPool);
226 vector<string> tokens;
231 if (cin.eof())
break;
233 split(tokens, input, is_any_of(
" "), token_compress_on);
240 server_thread->join();
const StringSearch UrlParser::protocol_search & protocol
InterfaceType * iface()
Returns the object used to actually make RPCs against the remote server.
void HandleBroadcast(const vector< string > &tokens)
void Join()
Blocks until the server stops and exits its main thread.
DEFINE_int64(send_batch_size, 0,"Batch size (in bytes). Data is split up into batches")
void HandleSend(const vector< string > &tokens)
virtual void Send(ThriftDataResult &result, const ThriftDataParams &params)
uint64_t ElapsedTime() const
Returns time in nanoseconds.
DEFINE_int32(port, 22222,"Port for NetworkTestService")
bool ProcessCommand(const vector< string > &tokens)
void Server(ThriftServer *server)
void ConvertToLowerCase(vector< string > *tokens)
double Send(ThriftClient< NetworkTestServiceClient > *client, int64_t bytes)
int main(int argc, char **argv)