Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
impala-server-callbacks.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "service/impala-server.h"
16 
17 #include <sstream>
18 #include <boost/thread/mutex.hpp>
19 #include <gutil/strings/substitute.h>
20 
21 #include "catalog/catalog-util.h"
23 #include "util/webserver.h"
24 
25 #include "gen-cpp/beeswax_types.h"
26 #include "thrift/protocol/TDebugProtocol.h"
27 #include "util/redactor.h"
28 #include "util/summary-util.h"
29 #include "util/url-coding.h"
30 
31 #include "common/names.h"
32 
33 using boost::adopt_lock_t;
34 using namespace apache::thrift;
35 using namespace beeswax;
36 using namespace impala;
37 using namespace rapidjson;
38 using namespace strings;
39 
40 DECLARE_int32(query_log_size);
41 
42 void ImpalaServer::RegisterWebserverCallbacks(Webserver* webserver) {
43  DCHECK(webserver != NULL);
44 
45  Webserver::UrlCallback hadoop_varz_callback =
46  bind<void>(mem_fn(&ImpalaServer::HadoopVarzUrlCallback), this, _1, _2);
47  webserver->RegisterUrlCallback("/hadoop-varz", "hadoop-varz.tmpl",
48  hadoop_varz_callback);
49 
50  Webserver::UrlCallback query_json_callback =
51  bind<void>(mem_fn(&ImpalaServer::QueryStateUrlCallback), this, _1, _2);
52  webserver->RegisterUrlCallback("/queries", "queries.tmpl",
53  query_json_callback);
54 
55  Webserver::UrlCallback sessions_json_callback =
56  bind<void>(mem_fn(&ImpalaServer::SessionsUrlCallback), this, _1, _2);
57  webserver->RegisterUrlCallback("/sessions", "sessions.tmpl",
58  sessions_json_callback);
59 
60  Webserver::UrlCallback catalog_callback =
61  bind<void>(mem_fn(&ImpalaServer::CatalogUrlCallback), this, _1, _2);
62  webserver->RegisterUrlCallback("/catalog", "catalog.tmpl",
63  catalog_callback);
64 
65  Webserver::UrlCallback catalog_objects_callback =
66  bind<void>(mem_fn(&ImpalaServer::CatalogObjectsUrlCallback), this, _1, _2);
67  webserver->RegisterUrlCallback("/catalog_object", "catalog_object.tmpl",
68  catalog_objects_callback, false);
69 
70  Webserver::UrlCallback profile_callback =
71  bind<void>(mem_fn(&ImpalaServer::QueryProfileUrlCallback), this, _1, _2);
72  webserver->RegisterUrlCallback("/query_profile", "query_profile.tmpl",
73  profile_callback, false);
74 
75  Webserver::UrlCallback cancel_callback =
76  bind<void>(mem_fn(&ImpalaServer::CancelQueryUrlCallback), this, _1, _2);
77  webserver->RegisterUrlCallback("/cancel_query", "common-pre.tmpl", cancel_callback,
78  false);
79 
80  Webserver::UrlCallback profile_encoded_callback =
81  bind<void>(mem_fn(&ImpalaServer::QueryProfileEncodedUrlCallback), this, _1, _2);
82  webserver->RegisterUrlCallback("/query_profile_encoded", "raw_text.tmpl",
83  profile_encoded_callback, false);
84 
85  Webserver::UrlCallback inflight_query_ids_callback =
86  bind<void>(mem_fn(&ImpalaServer::InflightQueryIdsUrlCallback), this, _1, _2);
87  webserver->RegisterUrlCallback("/inflight_query_ids", "raw_text.tmpl",
88  inflight_query_ids_callback, false);
89 
90  Webserver::UrlCallback query_summary_callback =
91  bind<void>(mem_fn(&ImpalaServer::QuerySummaryCallback), this, false, true, _1, _2);
92  webserver->RegisterUrlCallback("/query_summary", "query_summary.tmpl",
93  query_summary_callback, false);
94 
95  Webserver::UrlCallback query_plan_callback =
96  bind<void>(mem_fn(&ImpalaServer::QuerySummaryCallback), this, true, true, _1, _2);
97  webserver->RegisterUrlCallback("/query_plan", "query_plan.tmpl",
98  query_plan_callback, false);
99 
100  Webserver::UrlCallback query_plan_text_callback =
101  bind<void>(mem_fn(&ImpalaServer::QuerySummaryCallback), this, false, false, _1, _2);
102  webserver->RegisterUrlCallback("/query_plan_text", "query_plan_text.tmpl",
103  query_plan_text_callback, false);
104  webserver->RegisterUrlCallback("/query_stmt", "query_stmt.tmpl",
105  query_plan_text_callback, false);
106 }
107 
108 void ImpalaServer::HadoopVarzUrlCallback(const Webserver::ArgumentMap& args,
109  Document* document) {
110  TGetAllHadoopConfigsResponse response;
111  Status status = exec_env_->frontend()->GetAllHadoopConfigs(&response);
112  if (!status.ok()) return;
113 
114  Value configs(kArrayType);
115  typedef map<string, string> ConfigMap;
116  BOOST_FOREACH(const ConfigMap::value_type& config, response.configs) {
117  Value key(config.first.c_str(), document->GetAllocator());
118  Value value(config.second.c_str(), document->GetAllocator());
119  Value config_json(kObjectType);
120  config_json.AddMember("key", key, document->GetAllocator());
121  config_json.AddMember("value", value, document->GetAllocator());
122  configs.PushBack(config_json, document->GetAllocator());
123  }
124  document->AddMember("configs", configs, document->GetAllocator());
125 }
126 
127 // We expect the query id to be passed as one parameter, 'query_id'.
128 // Returns true if the query id was present and valid; false otherwise.
129 static Status ParseQueryId(const Webserver::ArgumentMap& args, TUniqueId* id) {
130  Webserver::ArgumentMap::const_iterator it = args.find("query_id");
131  if (it == args.end()) {
132  return Status("No 'query_id' argument found");
133  } else {
134  if (ParseId(it->second, id)) return Status::OK;
135  return Status(Substitute("Could not parse 'query_id' argument: $0", it->second));
136  }
137 }
138 
139 void ImpalaServer::CancelQueryUrlCallback(const Webserver::ArgumentMap& args,
140  Document* document) {
141  TUniqueId unique_id;
142  Status status = ParseQueryId(args, &unique_id);
143  if (!status.ok()) {
144  Value error(status.GetDetail().c_str(), document->GetAllocator());
145  document->AddMember("error", error, document->GetAllocator());
146  return;
147  }
148  Status cause("Cancelled from Impala's debug web interface");
149  status = UnregisterQuery(unique_id, true, &cause);
150  if (!status.ok()) {
151  Value error(status.GetDetail().c_str(), document->GetAllocator());
152  document->AddMember("error", error, document->GetAllocator());
153  return;
154  }
155  Value message("Query cancellation successful", document->GetAllocator());
156  document->AddMember("contents", message, document->GetAllocator());
157 }
158 
159 void ImpalaServer::QueryProfileUrlCallback(const Webserver::ArgumentMap& args,
160  Document* document) {
161  TUniqueId unique_id;
162  Status parse_status = ParseQueryId(args, &unique_id);
163  if (!parse_status.ok()) {
164  Value error(parse_status.GetDetail().c_str(), document->GetAllocator());
165  document->AddMember("error", error, document->GetAllocator());
166  return;
167  }
168 
169  stringstream ss;
170  Status status = GetRuntimeProfileStr(unique_id, false, &ss);
171  if (!status.ok()) {
172  Value error(status.GetDetail().c_str(), document->GetAllocator());
173  document->AddMember("error", error, document->GetAllocator());
174  return;
175  }
176 
177  Value profile(ss.str().c_str(), document->GetAllocator());
178  document->AddMember("profile", profile, document->GetAllocator());
179  document->AddMember("query_id", args.find("query_id")->second.c_str(),
180  document->GetAllocator());
181 }
182 
183 void ImpalaServer::QueryProfileEncodedUrlCallback(const Webserver::ArgumentMap& args,
184  Document* document) {
185  TUniqueId unique_id;
186  stringstream ss;
187  Status status = ParseQueryId(args, &unique_id);
188  if (!status.ok()) {
189  ss << status.GetDetail();
190  } else {
191  Status status = GetRuntimeProfileStr(unique_id, true, &ss);
192  if (!status.ok()) {
193  ss.str(Substitute("Could not obtain runtime profile: $0", status.GetDetail()));
194  }
195  }
196 
197  document->AddMember(Webserver::ENABLE_RAW_JSON_KEY, true, document->GetAllocator());
198  Value profile(ss.str().c_str(), document->GetAllocator());
199  document->AddMember("contents", profile, document->GetAllocator());
200 }
201 
202 void ImpalaServer::InflightQueryIdsUrlCallback(const Webserver::ArgumentMap& args,
203  Document* document) {
204  lock_guard<mutex> l(query_exec_state_map_lock_);
205  stringstream ss;
206  BOOST_FOREACH(const QueryExecStateMap::value_type& exec_state, query_exec_state_map_) {
207  ss << exec_state.second->query_id() << "\n";
208  }
209  document->AddMember(Webserver::ENABLE_RAW_JSON_KEY, true, document->GetAllocator());
210  Value query_ids(ss.str().c_str(), document->GetAllocator());
211  document->AddMember("contents", query_ids, document->GetAllocator());
212 }
213 
214 void ImpalaServer::QueryStateToJson(const ImpalaServer::QueryStateRecord& record,
215  Value* value, Document* document) {
216  Value user(record.effective_user.c_str(), document->GetAllocator());
217  value->AddMember("effective_user", user, document->GetAllocator());
218 
219  Value default_db(record.default_db.c_str(), document->GetAllocator());
220  value->AddMember("default_db", default_db, document->GetAllocator());
221 
222  // Redact the query string
223  Value stmt(RedactCopy(record.stmt).c_str(), document->GetAllocator());
224  value->AddMember("stmt", stmt, document->GetAllocator());
225 
226  Value stmt_type(_TStmtType_VALUES_TO_NAMES.find(record.stmt_type)->second,
227  document->GetAllocator());
228  value->AddMember("stmt_type", stmt_type, document->GetAllocator());
229 
230  Value start_time(record.start_time.DebugString().c_str(), document->GetAllocator());
231  value->AddMember("start_time", start_time, document->GetAllocator());
232 
233  Value end_time(record.end_time.DebugString().c_str(), document->GetAllocator());
234  value->AddMember("end_time", end_time, document->GetAllocator());
235 
236  const TimestampValue& end_timestamp =
237  record.end_time.HasDate() ? record.end_time : TimestampValue::LocalTime();
238  double duration =
239  end_timestamp.ToSubsecondUnixTime() - record.start_time.ToSubsecondUnixTime();
240  const string& printed_duration = PrettyPrinter::Print(duration, TUnit::TIME_S);
241  Value val_duration(printed_duration.c_str(), document->GetAllocator());
242  value->AddMember("duration", val_duration, document->GetAllocator());
243 
244  string progress = "N/A";
245  if (record.has_coord) {
246  stringstream ss;
247  ss << record.num_complete_fragments << " / " << record.total_fragments
248  << " (" << setw(4);
249  if (record.total_fragments == 0) {
250  ss << "0%)";
251  } else {
252  ss << (100.0 * record.num_complete_fragments / (1.f * record.total_fragments))
253  << "%)";
254  }
255  progress = ss.str();
256  }
257  Value progress_json(progress.c_str(), document->GetAllocator());
258  value->AddMember("progress", progress_json, document->GetAllocator());
259 
260  Value state(_QueryState_VALUES_TO_NAMES.find(record.query_state)->second,
261  document->GetAllocator());
262  value->AddMember("state", state, document->GetAllocator());
263 
264  value->AddMember("rows_fetched", record.num_rows_fetched, document->GetAllocator());
265 
266  Value query_id(PrintId(record.id).c_str(), document->GetAllocator());
267  value->AddMember("query_id", query_id, document->GetAllocator());
268 
269  if (record.event_sequence.labels.size() > 0) {
270  Value last_event(record.event_sequence.labels.back().c_str(),
271  document->GetAllocator());
272  value->AddMember("last_event", last_event, document->GetAllocator());
273  }
274 }
275 
276 void ImpalaServer::QueryStateUrlCallback(const Webserver::ArgumentMap& args,
277  Document* document) {
278  set<QueryStateRecord, QueryStateRecord> sorted_query_records;
279  {
280  lock_guard<mutex> l(query_exec_state_map_lock_);
281  BOOST_FOREACH(
282  const QueryExecStateMap::value_type& exec_state, query_exec_state_map_) {
283  // TODO: Do this in the browser so that sorts on other keys are possible.
284  sorted_query_records.insert(QueryStateRecord(*exec_state.second));
285  }
286  }
287 
288  Value in_flight_queries(kArrayType);
289  BOOST_FOREACH(const QueryStateRecord& record, sorted_query_records) {
290  Value record_json(kObjectType);
291  QueryStateToJson(record, &record_json, document);
292  in_flight_queries.PushBack(record_json, document->GetAllocator());
293  }
294  document->AddMember("in_flight_queries", in_flight_queries, document->GetAllocator());
295  document->AddMember("num_in_flight_queries", sorted_query_records.size(),
296  document->GetAllocator());
297 
298  Value completed_queries(kArrayType);
299  {
300  lock_guard<mutex> l(query_log_lock_);
301  BOOST_FOREACH(const QueryStateRecord& log_entry, query_log_) {
302  Value record_json(kObjectType);
303  QueryStateToJson(log_entry, &record_json, document);
304  completed_queries.PushBack(record_json, document->GetAllocator());
305  }
306  }
307  document->AddMember("completed_queries", completed_queries, document->GetAllocator());
308  document->AddMember("completed_log_size", FLAGS_query_log_size,
309  document->GetAllocator());
310 
311  Value query_locations(kArrayType);
312  {
313  lock_guard<mutex> l(query_locations_lock_);
314  BOOST_FOREACH(const QueryLocations::value_type& location, query_locations_) {
315  Value location_json(kObjectType);
316  Value location_name(lexical_cast<string>(location.first).c_str(),
317  document->GetAllocator());
318  location_json.AddMember("location", location_name, document->GetAllocator());
319  location_json.AddMember("count", location.second.size(),
320  document->GetAllocator());
321  query_locations.PushBack(location_json, document->GetAllocator());
322  }
323  }
324  document->AddMember("query_locations", query_locations, document->GetAllocator());
325 }
326 
327 
328 void ImpalaServer::SessionsUrlCallback(const Webserver::ArgumentMap& args,
329  Document* document) {
330  lock_guard<mutex> l(session_state_map_lock_);
331  Value sessions(kArrayType);
332  BOOST_FOREACH(const SessionStateMap::value_type& session, session_state_map_) {
333  shared_ptr<SessionState> state = session.second;
334  Value session_json(kObjectType);
335  Value type(PrintTSessionType(state->session_type).c_str(),
336  document->GetAllocator());
337  session_json.AddMember("type", type, document->GetAllocator());
338 
339  session_json.AddMember("num_queries", state->inflight_queries.size(),
340  document->GetAllocator());
341 
342  Value user(state->connected_user.c_str(), document->GetAllocator());
343  session_json.AddMember("user", user, document->GetAllocator());
344 
345  Value delegated_user(state->do_as_user.c_str(), document->GetAllocator());
346  session_json.AddMember("delegated_user", delegated_user, document->GetAllocator());
347 
348  Value session_id(PrintId(session.first).c_str(), document->GetAllocator());
349  session_json.AddMember("session_id", session_id, document->GetAllocator());
350 
351  Value network_address(lexical_cast<string>(state->network_address).c_str(),
352  document->GetAllocator());
353  session_json.AddMember("network_address", network_address, document->GetAllocator());
354 
355  Value default_db(state->database.c_str(), document->GetAllocator());
356  session_json.AddMember("default_database", default_db, document->GetAllocator());
357 
358  Value start_time(state->start_time.DebugString().c_str(), document->GetAllocator());
359  session_json.AddMember("start_time", start_time, document->GetAllocator());
360 
361  Value last_accessed(
362  TimestampValue(session.second->last_accessed_ms / 1000).DebugString().c_str(),
363  document->GetAllocator());
364  session_json.AddMember("last_accessed", last_accessed, document->GetAllocator());
365 
366  session_json.AddMember("expired", state->expired, document->GetAllocator());
367  session_json.AddMember("closed", state->closed, document->GetAllocator());
368  session_json.AddMember("ref_count", state->ref_count, document->GetAllocator());
369  sessions.PushBack(session_json, document->GetAllocator());
370  }
371 
372  document->AddMember("sessions", sessions, document->GetAllocator());
373  document->AddMember("num_sessions", session_state_map_.size(), document->GetAllocator());
374 }
375 
376 void ImpalaServer::CatalogUrlCallback(const Webserver::ArgumentMap& args,
377  Document* document) {
378  TGetDbsResult get_dbs_result;
379  Status status = exec_env_->frontend()->GetDbNames(NULL, NULL, &get_dbs_result);
380  if (!status.ok()) {
381  Value error(status.GetDetail().c_str(), document->GetAllocator());
382  document->AddMember("error", error, document->GetAllocator());
383  return;
384  }
385 
386  Value databases(kArrayType);
387  BOOST_FOREACH(const string& db, get_dbs_result.dbs) {
388  Value database(kObjectType);
389  Value str(db.c_str(), document->GetAllocator());
390  database.AddMember("name", str, document->GetAllocator());
391 
392  TGetTablesResult get_table_results;
393  Status status =
394  exec_env_->frontend()->GetTableNames(db, NULL, NULL, &get_table_results);
395  if (!status.ok()) {
396  Value error(status.GetDetail().c_str(), document->GetAllocator());
397  database.AddMember("error", error, document->GetAllocator());
398  continue;
399  }
400 
401  Value table_array(kArrayType);
402  BOOST_FOREACH(const string& table, get_table_results.tables) {
403  Value table_obj(kObjectType);
404  Value fq_name(Substitute("$0.$1", db, table).c_str(), document->GetAllocator());
405  table_obj.AddMember("fqtn", fq_name, document->GetAllocator());
406  Value table_name(table.c_str(), document->GetAllocator());
407  table_obj.AddMember("name", table_name, document->GetAllocator());
408  table_array.PushBack(table_obj, document->GetAllocator());
409  }
410  database.AddMember("num_tables", table_array.Size(), document->GetAllocator());
411  database.AddMember("tables", table_array, document->GetAllocator());
412  databases.PushBack(database, document->GetAllocator());
413  }
414  document->AddMember("databases", databases, document->GetAllocator());
415 }
416 
417 void ImpalaServer::CatalogObjectsUrlCallback(const Webserver::ArgumentMap& args,
418  Document* document) {
419  Webserver::ArgumentMap::const_iterator object_type_arg = args.find("object_type");
420  Webserver::ArgumentMap::const_iterator object_name_arg = args.find("object_name");
421  if (object_type_arg != args.end() && object_name_arg != args.end()) {
422  TCatalogObjectType::type object_type =
423  TCatalogObjectTypeFromName(object_type_arg->second);
424 
425  // Get the object type and name from the topic entry key
426  TCatalogObject request;
427  TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request);
428 
429  // Get the object and dump its contents.
430  TCatalogObject result;
431  Status status = exec_env_->frontend()->GetCatalogObject(request, &result);
432  if (status.ok()) {
433  Value debug_string(ThriftDebugString(result).c_str(), document->GetAllocator());
434  document->AddMember("thrift_string", debug_string, document->GetAllocator());
435  } else {
436  Value error(status.GetDetail().c_str(), document->GetAllocator());
437  document->AddMember("error", error, document->GetAllocator());
438  }
439  } else {
440  Value error("Please specify values for the object_type and object_name parameters.",
441  document->GetAllocator());
442  document->AddMember("error", error, document->GetAllocator());
443  }
444 }
445 
446 // Helper for PlanToJson(), processes a single list of plan nodes which are the
447 // DFS-flattened representation of a single plan fragment. Called recursively, the
448 // iterator parameter is updated in place so that when a recursive call returns, the
449 // caller is pointing at the next of its children.
450 void PlanToJsonHelper(const map<TPlanNodeId, TPlanNodeExecSummary>& summaries,
451  const vector<TPlanNode>& nodes,
452  vector<TPlanNode>::const_iterator* it, rapidjson::Document* document, Value* value) {
453  Value children(kArrayType);
454  value->AddMember("label", (*it)->label.c_str(), document->GetAllocator());
455  // Node "details" may contain exprs which should be redacted.
456  Value label_detail(RedactCopy((*it)->label_detail).c_str(), document->GetAllocator());
457  value->AddMember("label_detail", label_detail, document->GetAllocator());
458 
459  TPlanNodeId id = (*it)->node_id;
460  map<TPlanNodeId, TPlanNodeExecSummary>::const_iterator summary = summaries.find(id);
461  if (summary != summaries.end()) {
462  int64_t cardinality = 0;
463  int64_t max_time = 0L;
464  int64_t total_time = 0;
465  BOOST_FOREACH(const TExecStats& stat, summary->second.exec_stats) {
466  if (summary->second.is_broadcast) {
467  // Avoid multiple-counting for recipients of broadcasts.
468  cardinality = ::max(cardinality, stat.cardinality);
469  } else {
470  cardinality += stat.cardinality;
471  }
472  total_time += stat.latency_ns;
473  max_time = ::max(max_time, stat.latency_ns);
474  }
475  value->AddMember("output_card", cardinality, document->GetAllocator());
476  value->AddMember("num_instances", summary->second.exec_stats.size(),
477  document->GetAllocator());
478  if (summary->second.is_broadcast) {
479  value->AddMember("is_broadcast", true, document->GetAllocator());
480  }
481 
482  const string& max_time_str = PrettyPrinter::Print(max_time, TUnit::TIME_NS);
483  Value max_time_str_json(max_time_str.c_str(), document->GetAllocator());
484  value->AddMember("max_time", max_time_str_json, document->GetAllocator());
485  value->AddMember("max_time_val", max_time, document->GetAllocator());
486 
487  // Round to the nearest ns, to workaround a bug in pretty-printing a fraction of a
488  // ns. See IMPALA-1800.
489  const string& avg_time_str = PrettyPrinter::Print(
490  // A bug may occasionally cause 1-instance nodes to appear to have 0 instances.
491  total_time / ::max(static_cast<int>(summary->second.exec_stats.size()), 1),
492  TUnit::TIME_NS);
493  Value avg_time_str_json(avg_time_str.c_str(), document->GetAllocator());
494  value->AddMember("avg_time", avg_time_str_json, document->GetAllocator());
495  }
496 
497  int num_children = (*it)->num_children;
498  for (int i = 0; i < num_children; ++i) {
499  ++(*it);
500  Value container(kObjectType);
501  PlanToJsonHelper(summaries, nodes, it, document, &container);
502  children.PushBack(container, document->GetAllocator());
503  }
504  value->AddMember("children", children, document->GetAllocator());
505 }
506 
507 // Helper method which converts a list of plan fragments into a single JSON document, with
508 // the following schema:
509 // "plan_nodes": [
510 // {
511 // "label": "12:AGGREGATE",
512 // "label_detail": "FINALIZE",
513 // "output_card": 23456,
514 // "num_instances": 34,
515 // "max_time": "1m23s",
516 // "avg_time": "1.3ms",
517 // "children": [
518 // {
519 // "label": "11:EXCHANGE",
520 // "label_detail": "UNPARTITIONED",
521 // "children": []
522 // }
523 // ]
524 // },
525 // {
526 // "label": "07:AGGREGATE",
527 // "label_detail": "",
528 // "children": [],
529 // "data_stream_target": "11:EXCHANGE"
530 // }
531 // ]
532 void PlanToJson(const vector<TPlanFragment>& fragments, const TExecSummary& summary,
533  rapidjson::Document* document, Value* value) {
534  // Build a map from id to label so that we can resolve the targets of data-stream sinks
535  // and connect plan fragments.
536  map<TPlanNodeId, string> label_map;
537  BOOST_FOREACH(const TPlanFragment& fragment, fragments) {
538  BOOST_FOREACH(const TPlanNode& node, fragment.plan.nodes) {
539  label_map[node.node_id] = node.label;
540  }
541  }
542 
543  map<TPlanNodeId, TPlanNodeExecSummary> exec_summaries;
544  BOOST_FOREACH(const TPlanNodeExecSummary& s, summary.nodes) {
545  exec_summaries[s.node_id] = s;
546  }
547 
548  Value nodes(kArrayType);
549  BOOST_FOREACH(const TPlanFragment& fragment, fragments) {
550  Value plan_fragment(kObjectType);
551  vector<TPlanNode>::const_iterator it = fragment.plan.nodes.begin();
552  PlanToJsonHelper(exec_summaries, fragment.plan.nodes, &it, document, &plan_fragment);
553  if (fragment.__isset.output_sink) {
554  const TDataSink& sink = fragment.output_sink;
555  if (sink.__isset.stream_sink) {
556  plan_fragment.AddMember("data_stream_target",
557  label_map[sink.stream_sink.dest_node_id].c_str(), document->GetAllocator());
558  }
559  }
560  nodes.PushBack(plan_fragment, document->GetAllocator());
561  }
562  value->AddMember("plan_nodes", nodes, document->GetAllocator());
563 }
564 
565 void ImpalaServer::QuerySummaryCallback(bool include_json_plan, bool include_summary,
566  const Webserver::ArgumentMap& args, Document* document) {
567  TUniqueId query_id;
568  Status status = ParseQueryId(args, &query_id);
569  if (!status.ok()) {
570  // Redact the error message, it may contain part or all of the query.
571  Value json_error(RedactCopy(status.GetDetail()).c_str(), document->GetAllocator());
572  document->AddMember("error", json_error, document->GetAllocator());
573  return;
574  }
575 
576  TExecSummary summary;
577  string stmt;
578  string plan;
579  Status query_status;
580  bool found = false;
581  vector<TPlanFragment> fragments;
582 
583  // Search the in-flight queries first, followed by the archived ones.
584  {
585  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
586  if (exec_state != NULL) {
587  found = true;
588  lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
589  if (exec_state->coord() == NULL) {
590  const string& err = Substitute("Invalid query id: $0", PrintId(query_id));
591  Value json_error(err.c_str(), document->GetAllocator());
592  document->AddMember("error", json_error, document->GetAllocator());
593  return;
594  }
595  query_status = exec_state->query_status();
596  stmt = exec_state->sql_stmt();
597  plan = exec_state->exec_request().query_exec_request.query_plan;
598  if (include_json_plan || include_summary) {
599  lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
600  summary = exec_state->coord()->exec_summary();
601  }
602  if (include_json_plan) {
603  fragments = exec_state->exec_request().query_exec_request.fragments;
604  }
605  }
606  }
607 
608  if (!found) {
609  lock_guard<mutex> l(query_log_lock_);
610  QueryLogIndex::const_iterator query_record = query_log_index_.find(query_id);
611  if (query_record == query_log_index_.end()) {
612  const string& err = Substitute("Unknown query id: $0", PrintId(query_id));
613  Value json_error(err.c_str(), document->GetAllocator());
614  document->AddMember("error", json_error, document->GetAllocator());
615  return;
616  }
617  if (include_json_plan || include_summary) {
618  summary = query_record->second->exec_summary;
619  }
620  stmt = query_record->second->stmt;
621  plan = query_record->second->plan;
622  query_status = query_record->second->query_status;
623  if (include_json_plan) {
624  fragments = query_record->second->fragments;
625  }
626  }
627 
628  if (include_json_plan) {
629  Value v(kObjectType);
630  PlanToJson(fragments, summary, document, &v);
631  document->AddMember("plan_json", v, document->GetAllocator());
632  }
633  if (include_summary) {
634  const string& printed_summary = PrintExecSummary(summary);
635  Value json_summary(printed_summary.c_str(), document->GetAllocator());
636  document->AddMember("summary", json_summary, document->GetAllocator());
637  }
638  Value json_stmt(RedactCopy(stmt).c_str(), document->GetAllocator());
639  document->AddMember("stmt", json_stmt, document->GetAllocator());
640  Value json_plan_text(RedactCopy(plan).c_str(), document->GetAllocator());
641  document->AddMember("plan", json_plan_text, document->GetAllocator());
642 
643  // Redact the error in case the query is contained in the error message.
644  Value json_status(query_status.ok() ? "OK" :
645  RedactCopy(query_status.GetDetail()).c_str(), document->GetAllocator());
646  document->AddMember("status", json_status, document->GetAllocator());
647  Value json_id(PrintId(query_id).c_str(), document->GetAllocator());
648  document->AddMember("query_id", json_id, document->GetAllocator());
649 }
Status TCatalogObjectFromObjectName(const TCatalogObjectType::type &object_type, const string &object_name, TCatalogObject *catalog_object)
Definition: catalog-util.cc:68
DECLARE_int32(query_log_size)
std::string stmt
SQL statement text.
const std::string GetDetail() const
Definition: status.cc:184
std::string RedactCopy(const std::string &original)
Utility function to redact a string without modifying the original.
Definition: redactor.h:63
bool ParseId(const string &s, TUniqueId *id)
Definition: debug-util.cc:112
void PlanToJson(const vector< TPlanFragment > &fragments, const TExecSummary &summary, rapidjson::Document *document, Value *value)
boost::function< void(const ArgumentMap &args, rapidjson::Document *json)> UrlCallback
Definition: webserver.h:38
int64_t num_complete_fragments
The number of fragments that have completed.
int64_t num_rows_fetched
The number of rows fetched by the client.
const TUniqueId & query_id() const
Definition: coordinator.h:152
void RegisterUrlCallback(const std::string &path, const std::string &template_filename, const UrlCallback &callback, bool is_on_nav_bar=true)
Only one callback may be registered per URL.
Definition: webserver.cc:412
beeswax::QueryState::type query_state
The state of the query as of this snapshot.
string PrintId(const TUniqueId &id, const string &separator)
Definition: debug-util.cc:97
Frontend * frontend()
Definition: exec-env.h:91
std::string PrintTSessionType(const TSessionType::type &type)
Status GetCatalogObject(const TCatalogObject &request, TCatalogObject *response)
Definition: frontend.cc:168
TCatalogObjectType::type TCatalogObjectTypeFromName(const string &name)
Definition: catalog-util.cc:29
std::map< std::string, std::string > ArgumentMap
Definition: webserver.h:36
int64_t total_fragments
The total number of fragments.
TStmtType::type stmt_type
DDL, DML etc.
static Status ParseQueryId(const Webserver::ArgumentMap &args, TUniqueId *id)
void PrintExecSummary(const TExecSummary &exec_summary, int indent_level, int new_indent_level, int *node_idx, vector< vector< string > > *result)
Definition: summary-util.cc:34
TEventSequence event_sequence
Timeline of important query events.
Snapshot of a query's state, archived in the query log.
Status GetDbNames(const std::string *pattern, const TSessionState *session, TGetDbsResult *table_names)
Definition: frontend.cc:128
std::string default_db
default db for this query
double ToSubsecondUnixTime() const
const ProgressUpdater & progress()
Definition: coordinator.h:166
bool has_coord
True if the query required a coordinator fragment.
ExecEnv * exec_env_
Definition: coordinator.h:193
void PlanToJsonHelper(const map< TPlanNodeId, TPlanNodeExecSummary > &summaries, const vector< TPlanNode > &nodes, vector< TPlanNode >::const_iterator *it, rapidjson::Document *document, Value *value)
Status GetTableNames(const std::string &db, const std::string *pattern, const TSessionState *session, TGetTablesResult *table_names)
Definition: frontend.cc:119
bool ok() const
Definition: status.h:172
TimestampValue start_time
Start and end time of the query.
Status GetAllHadoopConfigs(TGetAllHadoopConfigsResponse *result)
Returns all Hadoop configurations in key, value form in result.
Definition: frontend.cc:211
std::string DebugString() const