Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
rpc-trace.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rpc/rpc-trace.h"
16 
17 #include <boost/bind.hpp>
18 #include <boost/scoped_ptr.hpp>
19 #include <boost/foreach.hpp>
20 #include <gutil/strings/substitute.h>
21 
22 #include "common/logging.h"
23 #include "util/debug-util.h"
24 #include "util/time.h"
25 #include "util/webserver.h"
26 
27 #include "common/names.h"
28 
29 using namespace impala;
30 using namespace rapidjson;
31 using namespace strings;
32 
33 // Singleton class to keep track of all RpcEventHandlers, and to render them to a
34 // web-based summary page.
36  public:
37  // Adds an event handler to the list of those tracked
38  void RegisterEventHandler(RpcEventHandler* event_handler);
39 
40  // Produces Json for /rpcz with summary information for all Rpc methods.
41  // { "servers": [
42  // .. list of output from RpcEventHandler::ToJson()
43  // ] }
44  void JsonCallback(const Webserver::ArgumentMap& args, Document* document);
45 
46  // Resets method statistics. Takes two optional arguments: 'server' and 'method'. If
47  // neither are specified, all server statistics are reset. If only the former is
48  // specified, all statistics for that server are reset. If both arguments are present,
49  // resets the statistics for a single method only. Produces no JSON output.
50  void ResetCallback(const Webserver::ArgumentMap& args, Document* document);
51 
52  private:
53  // Protects event_handlers_
54  mutex lock_;
55 
56  // List of all event handlers in the system - once an event handler is registered, it
57  // should never be deleted. Event handlers are owned by the TProcessor which calls them,
58  // which are themselves owned by a ThriftServer. Since we do not terminate ThriftServers
59  // after they are started, event handlers have a lifetime equivalent to the length of
60  // the process.
61  vector<RpcEventHandler*> event_handlers_;
62 };
63 
64 // Only instance of RpcEventHandlerManager
65 scoped_ptr<RpcEventHandlerManager> handler_manager;
66 
69  if (webserver != NULL) {
70  Webserver::UrlCallback json = bind<void>(
72  webserver->RegisterUrlCallback("/rpcz", "rpcz.tmpl", json);
73 
74  Webserver::UrlCallback reset = bind<void>(
76  webserver->RegisterUrlCallback("/rpcz_reset", "", reset, false);
77  }
78 }
79 
81  DCHECK(event_handler != NULL);
82  lock_guard<mutex> l(lock_);
83  event_handlers_.push_back(event_handler);
84 }
85 
87  Document* document) {
88  lock_guard<mutex> l(lock_);
89  Value servers(kArrayType);
90  BOOST_FOREACH(RpcEventHandler* handler, event_handlers_) {
91  Value server(kObjectType);
92  handler->ToJson(&server, document);
93  servers.PushBack(server, document->GetAllocator());
94  }
95  document->AddMember("servers", servers, document->GetAllocator());
96 }
97 
99  Document* document) {
100  Webserver::ArgumentMap::const_iterator server_it = args.find("server");
101  bool reset_all_servers = (server_it == args.end());
102  Webserver::ArgumentMap::const_iterator method_it = args.find("method");
103  bool reset_all_in_server = (method_it == args.end());
104  lock_guard<mutex> l(lock_);
105  BOOST_FOREACH(RpcEventHandler* handler, event_handlers_) {
106  if (reset_all_servers || handler->server_name() == server_it->second) {
107  if (reset_all_in_server) {
108  handler->ResetAll();
109  } else {
110  handler->Reset(method_it->second);
111  }
112  if (!reset_all_servers) return;
113  }
114  }
115 }
116 
117 void RpcEventHandler::Reset(const string& method_name) {
118  lock_guard<mutex> l(method_map_lock_);
119  MethodMap::iterator it = method_map_.find(method_name);
120  if (it == method_map_.end()) return;
121  it->second->time_stats->Reset();
122  it->second->num_in_flight = 0L;
123 }
124 
126  lock_guard<mutex> l(method_map_lock_);
127  BOOST_FOREACH(const MethodMap::value_type& method, method_map_) {
128  method.second->time_stats->Reset();
129  method.second->num_in_flight = 0L;
130  }
131 }
132 
133 RpcEventHandler::RpcEventHandler(const string& server_name, MetricGroup* metrics) :
134  server_name_(server_name), metrics_(metrics) {
135  if (handler_manager.get() != NULL) handler_manager->RegisterEventHandler(this);
136 }
137 
138 void RpcEventHandler::ToJson(Value* server, Document* document) {
139  lock_guard<mutex> l(method_map_lock_);
140  Value name(server_name_.c_str(), document->GetAllocator());
141  server->AddMember("name", name, document->GetAllocator());
142  Value methods(kArrayType);
143  BOOST_FOREACH(const MethodMap::value_type& rpc, method_map_) {
144  Value method(kObjectType);
145  Value method_name(rpc.first.c_str(), document->GetAllocator());
146  method.AddMember("name", method_name, document->GetAllocator());
147  const string& human_readable = rpc.second->time_stats->ToHumanReadable();
148  Value summary(human_readable.c_str(), document->GetAllocator());
149  method.AddMember("summary", summary, document->GetAllocator());
150  method.AddMember("in_flight", rpc.second->num_in_flight, document->GetAllocator());
151  Value server_name(server_name_.c_str(), document->GetAllocator());
152  method.AddMember("server_name", server_name, document->GetAllocator());
153  methods.PushBack(method, document->GetAllocator());
154  }
155  server->AddMember("methods", methods, document->GetAllocator());
156 }
157 
158 void* RpcEventHandler::getContext(const char* fn_name, void* server_context) {
160  reinterpret_cast<ThriftServer::ConnectionContext*>(server_context);
161  MethodMap::iterator it;
162  {
163  lock_guard<mutex> l(method_map_lock_);
164  it = method_map_.find(fn_name);
165  if (it == method_map_.end()) {
166  MethodDescriptor* descriptor = new MethodDescriptor();
167  descriptor->name = fn_name;
168  const string& time_metric_name =
169  Substitute("rpc-method.$0.$1.call_duration", server_name_, descriptor->name);
170  descriptor->time_stats = metrics_->RegisterMetric(
171  new StatsMetric<double>(time_metric_name, TUnit::TIME_MS));
172  it = method_map_.insert(make_pair(descriptor->name, descriptor)).first;
173  }
174  }
175  ++(it->second->num_in_flight);
176  // TODO: Consider pooling these
177  InvocationContext* ctxt_ptr =
178  new InvocationContext(MonotonicMillis(), cnxn_ctx, it->second);
179  VLOG_RPC << "RPC call: " << string(fn_name) << "(from "
180  << ctxt_ptr->cnxn_ctx->network_address << ")";
181  return reinterpret_cast<void*>(ctxt_ptr);
182 }
183 
184 void RpcEventHandler::postWrite(void* ctx, const char* fn_name, uint32_t bytes) {
185  InvocationContext* rpc_ctx = reinterpret_cast<InvocationContext*>(ctx);
186  int64_t elapsed_time = MonotonicMillis() - rpc_ctx->start_time_ms;
187  const string& call_name = string(fn_name);
188  // TODO: bytes is always 0, how come?
189  VLOG_RPC << "RPC call: " << server_name_ << ":" << call_name << " from "
190  << rpc_ctx->cnxn_ctx->network_address << " took "
191  << PrettyPrinter::Print(elapsed_time * 1000L * 1000L, TUnit::TIME_NS);
192  MethodDescriptor* descriptor = rpc_ctx->method_descriptor;
193  delete rpc_ctx;
194  --descriptor->num_in_flight;
195  descriptor->time_stats->Update(elapsed_time);
196 }
boost::mutex method_map_lock_
Protects method_map_ and rpc_counter_.
Definition: rpc-trace.h:112
boost::function< void(const ArgumentMap &args, rapidjson::Document *json)> UrlCallback
Definition: webserver.h:38
MetricGroup * metrics_
Metrics subsystem access.
Definition: rpc-trace.h:121
M * RegisterMetric(M *metric)
Definition: metrics.h:211
boost::mutex lock_
protects all fields below
Definition: coordinator.h:233
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
void RegisterUrlCallback(const std::string &path, const std::string &template_filename, const UrlCallback &callback, bool is_on_nav_bar=true)
Only one callback may be registered per URL.
Definition: webserver.cc:412
const ThriftServer::ConnectionContext * cnxn_ctx
Definition: rpc-trace.h:101
void ResetCallback(const Webserver::ArgumentMap &args, Document *document)
Definition: rpc-trace.cc:98
void RegisterEventHandler(RpcEventHandler *event_handler)
Definition: rpc-trace.cc:80
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
std::map< std::string, std::string > ArgumentMap
Definition: webserver.h:36
void ResetAll()
Resets the statistics for all methods.
Definition: rpc-trace.cc:125
vector< RpcEventHandler * > event_handlers_
Definition: rpc-trace.cc:61
void Reset(const std::string &method_name)
Resets the statistics for a single method.
Definition: rpc-trace.cc:117
scoped_ptr< RpcEventHandlerManager > handler_manager
Definition: rpc-trace.cc:65
std::string server_name_
Name of the server that we listen for events from.
Definition: rpc-trace.h:118
Created per-Rpc invocation.
Definition: rpc-trace.h:94
void JsonCallback(const Webserver::ArgumentMap &args, Document *document)
Definition: rpc-trace.cc:86
Per-connection information.
Definition: thrift-server.h:45
void InitRpcEventTracing(Webserver *webserver)
Initialises RPC event tracing; must be called before any RpcEventHandlers are created.
Definition: rpc-trace.cc:67
std::string name
Name of the method.
Definition: rpc-trace.h:81
virtual void * getContext(const char *fn_name, void *server_context)
Definition: rpc-trace.cc:158
void Update(const T &value)
int64_t MonotonicMillis()
Definition: time.h:35
void JsonCallback(bool always_text, const Webserver::ArgumentMap &args, Document *document)
StatsMetric< double > * time_stats
Summary statistics for the time taken to respond to this method.
Definition: rpc-trace.h:84
void ToJson(rapidjson::Value *server, rapidjson::Document *document)
Definition: rpc-trace.cc:138
const int64_t start_time_ms
Monotonic milliseconds (typically boot time) when the call started.
Definition: rpc-trace.h:96
#define VLOG_RPC
Definition: logging.h:56
virtual void postWrite(void *ctx, const char *fn_name, uint32_t bytes)
Definition: rpc-trace.cc:184
MethodMap method_map_
Map of all methods, populated lazily as they are invoked for the first time.
Definition: rpc-trace.h:115
std::string server_name() const
Definition: rpc-trace.h:75
MethodDescriptor * method_descriptor
Pointer to parent MethodDescriptor, to save a lookup on deletion.
Definition: rpc-trace.h:104
AtomicInt< uint32_t > num_in_flight
Number of invocations in flight.
Definition: rpc-trace.h:87
string name
Definition: cpu-info.cc:50
RpcEventHandler(const std::string &server_name, MetricGroup *metrics)
Definition: rpc-trace.cc:133