18 #include <boost/thread/mutex.hpp>
19 #include <gutil/strings/substitute.h>
25 #include "gen-cpp/beeswax_types.h"
26 #include "thrift/protocol/TDebugProtocol.h"
33 using boost::adopt_lock_t;
34 using namespace apache::thrift;
35 using namespace beeswax;
36 using namespace impala;
37 using namespace rapidjson;
38 using namespace strings;
// Registers the debug-webserver URL handlers implemented by ImpalaServer. Each
// visible handler is an ImpalaServer member function bound to `this` with
// bind/mem_fn; the webserver's (args, document) pair is forwarded through the
// _1/_2 placeholders.
// NOTE(review): this listing omits the `Webserver::UrlCallback` declarations and the
// heads of the RegisterUrlCallback(...) calls (URL paths and template names); only
// their trailing lines are visible here.
42 void ImpalaServer::RegisterWebserverCallbacks(
Webserver* webserver) {
43 DCHECK(webserver != NULL);
46 bind<void>(mem_fn(&ImpalaServer::HadoopVarzUrlCallback),
this, _1, _2);
48 hadoop_varz_callback);
51 bind<void>(mem_fn(&ImpalaServer::QueryStateUrlCallback),
this, _1, _2);
56 bind<void>(mem_fn(&ImpalaServer::SessionsUrlCallback),
this, _1, _2);
58 sessions_json_callback);
61 bind<void>(mem_fn(&ImpalaServer::CatalogUrlCallback),
this, _1, _2);
// Several registrations below end with `, false)`. That trailing argument appears
// to be RegisterUrlCallback's `is_on_nav_bar` parameter (declared with default
// true), i.e. those pages are presumably not linked from the nav bar -- confirm
// against the call heads, which are not shown.
66 bind<void>(mem_fn(&ImpalaServer::CatalogObjectsUrlCallback),
this, _1, _2);
68 catalog_objects_callback,
false);
71 bind<void>(mem_fn(&ImpalaServer::QueryProfileUrlCallback),
this, _1, _2);
73 profile_callback,
false);
76 bind<void>(mem_fn(&ImpalaServer::CancelQueryUrlCallback),
this, _1, _2);
81 bind<void>(mem_fn(&ImpalaServer::QueryProfileEncodedUrlCallback),
this, _1, _2);
83 profile_encoded_callback,
false);
86 bind<void>(mem_fn(&ImpalaServer::InflightQueryIdsUrlCallback),
this, _1, _2);
88 inflight_query_ids_callback,
false);
// The three QuerySummaryCallback bindings below fix its two leading parameters
// (include_json_plan, include_summary):
//   (false, true)  -> exec summary only
//   (true,  true)  -> JSON plan and exec summary
//   (false, false) -> neither
// query_plan_text_callback (the last one) is passed to two registrations whose
// URL paths are not visible in this listing.
91 bind<void>(mem_fn(&ImpalaServer::QuerySummaryCallback),
this,
false,
true, _1, _2);
93 query_summary_callback,
false);
96 bind<void>(mem_fn(&ImpalaServer::QuerySummaryCallback),
this,
true,
true, _1, _2);
98 query_plan_callback,
false);
101 bind<void>(mem_fn(&ImpalaServer::QuerySummaryCallback),
this,
false,
false, _1, _2);
103 query_plan_text_callback,
false);
105 query_plan_text_callback,
false);
109 Document* document) {
// Adds a "configs" array to `document`, one {"key", "value"} object per entry of the
// TGetAllHadoopConfigsResponse's string -> string config map.
// NOTE(review): `status` is assigned on a line not shown in this listing. When it is
// not OK this handler returns without adding any member to `document`. Unlike the
// sibling handlers, it reports no "error"; consider surfacing status.GetDetail().
110 TGetAllHadoopConfigsResponse response;
112 if (!status.
ok())
return;
114 Value configs(kArrayType);
115 typedef map<string, string> ConfigMap;
116 BOOST_FOREACH(
const ConfigMap::value_type& config, response.configs) {
// The allocator-taking Value constructor copies each string into the document.
117 Value key(config.first.c_str(), document->GetAllocator());
118 Value value(config.second.c_str(), document->GetAllocator());
119 Value config_json(kObjectType);
120 config_json.AddMember(
"key", key, document->GetAllocator());
121 config_json.AddMember(
"value", value, document->GetAllocator());
122 configs.PushBack(config_json, document->GetAllocator());
124 document->AddMember(
"configs", configs, document->GetAllocator());
// Looks up the "query_id" request argument. Returns an error Status when the
// argument is absent. The final return reports the value as unparseable; the check
// guarding it and the success path sit on lines not shown in this listing
// (presumably ParseId(it->second, id), which returns bool).
130 Webserver::ArgumentMap::const_iterator it = args.find(
"query_id");
131 if (it == args.end()) {
132 return Status(
"No 'query_id' argument found");
135 return Status(Substitute(
"Could not parse 'query_id' argument: $0", it->second));
140 Document* document) {
// Cancels the query identified by the request (presumably parsed via ParseQueryId)
// by calling UnregisterQuery(unique_id, true, &cause) with a fixed cause message.
// Failures are reported as an "error" member; success as a "contents" message.
// NOTE(review): `unique_id` and the initial `status` are set, and the failure
// branches are guarded and terminated, on lines not shown in this listing.
144 Value error(status.
GetDetail().c_str(), document->GetAllocator());
145 document->AddMember(
"error", error, document->GetAllocator());
148 Status cause(
"Cancelled from Impala's debug web interface");
149 status = UnregisterQuery(unique_id,
true, &cause);
151 Value error(status.
GetDetail().c_str(), document->GetAllocator());
152 document->AddMember(
"error", error, document->GetAllocator());
155 Value message(
"Query cancellation successful", document->GetAllocator());
156 document->AddMember(
"contents", message, document->GetAllocator());
160 Document* document) {
// Adds the requested query's runtime profile as a "profile" string (from
// GetRuntimeProfileStr(unique_id, false, &ss)) together with the raw "query_id"
// argument. Parse or lookup failures are reported as an "error" member.
163 if (!parse_status.
ok()) {
164 Value error(parse_status.
GetDetail().c_str(), document->GetAllocator());
165 document->AddMember(
"error", error, document->GetAllocator());
170 Status status = GetRuntimeProfileStr(unique_id,
false, &ss);
172 Value error(status.
GetDetail().c_str(), document->GetAllocator());
173 document->AddMember(
"error", error, document->GetAllocator());
177 Value profile(ss.str().c_str(), document->GetAllocator());
178 document->AddMember(
"profile", profile, document->GetAllocator());
// NOTE(review): unlike "profile" above, this value is passed as a raw c_str() rather
// than a Value copied with the allocator. Some rapidjson versions have an
// AddMember(const Ch*, const Ch*, Allocator&) overload that stores a non-copied
// string reference. There, the document points into the std::string owned by `args`,
// which must then outlive serialization of the document -- confirm, or copy it for
// consistency. The unchecked find() presumably relies on the earlier successful
// parse having found the key.
179 document->AddMember(
"query_id", args.find(
"query_id")->second.c_str(),
180 document->GetAllocator());
184 Document* document) {
// Returns the requested query's runtime profile as "contents". The profile comes from
// GetRuntimeProfileStr(unique_id, true, &ss), presumably the encoded form given this
// handler's name. ENABLE_RAW_JSON_KEY is set to true, presumably so the webserver
// emits the text as-is rather than rendering a template -- confirm in Webserver.
// When GetRuntimeProfileStr fails, the error text replaces the stream contents and
// is returned in "contents" instead of an "error" member.
191 Status status = GetRuntimeProfileStr(unique_id,
true, &ss);
193 ss.str(Substitute(
"Could not obtain runtime profile: $0", status.
GetDetail()));
197 document->AddMember(Webserver::ENABLE_RAW_JSON_KEY,
true, document->GetAllocator());
198 Value profile(ss.str().c_str(), document->GetAllocator());
199 document->AddMember(
"contents", profile, document->GetAllocator());
203 Document* document) {
// Emits the id of every query in query_exec_state_map_, one per line, as
// "contents" with ENABLE_RAW_JSON_KEY set.
// query_exec_state_map_lock_ is held for the rest of the function, including while
// the document is built; only the loop needs it.
204 lock_guard<mutex> l(query_exec_state_map_lock_);
206 BOOST_FOREACH(
const QueryExecStateMap::value_type& exec_state, query_exec_state_map_) {
207 ss << exec_state.second->query_id() <<
"\n";
209 document->AddMember(Webserver::ENABLE_RAW_JSON_KEY,
true, document->GetAllocator());
210 Value query_ids(ss.str().c_str(), document->GetAllocator());
211 document->AddMember(
"contents", query_ids, document->GetAllocator());
215 Value* value, Document* document) {
// Serializes one QueryStateRecord into the JSON object `value`. The string Values
// visible here are built with document's allocator, so their text is copied into the
// document.
216 Value user(record.
effective_user.c_str(), document->GetAllocator());
217 value->AddMember(
"effective_user", user, document->GetAllocator());
219 Value default_db(record.
default_db.c_str(), document->GetAllocator());
220 value->AddMember(
"default_db", default_db, document->GetAllocator());
// The statement is emitted via RedactCopy; record.stmt itself is left unmodified.
223 Value stmt(
RedactCopy(record.
stmt).c_str(), document->GetAllocator());
224 value->AddMember(
"stmt", stmt, document->GetAllocator());
// NOTE(review): if record.stmt_type has no entry in _TStmtType_VALUES_TO_NAMES,
// find() returns end() and ->second is undefined behavior. The same holds for
// query_state / _QueryState_VALUES_TO_NAMES below. This assumes every enum value
// is mapped.
226 Value stmt_type(_TStmtType_VALUES_TO_NAMES.find(record.
stmt_type)->second,
227 document->GetAllocator());
228 value->AddMember(
"stmt_type", stmt_type, document->GetAllocator());
// start_time / end_time Values are built on lines not shown in this listing.
231 value->AddMember(
"start_time", start_time, document->GetAllocator());
234 value->AddMember(
"end_time", end_time, document->GetAllocator());
// `duration` is computed on lines not shown here; it is printed in TIME_S units.
240 const string& printed_duration = PrettyPrinter::Print(duration, TUnit::TIME_S);
241 Value val_duration(printed_duration.c_str(), document->GetAllocator());
242 value->AddMember(
"duration", val_duration, document->GetAllocator());
// The `progress` string is built on lines not shown in this listing.
257 Value progress_json(progress.c_str(), document->GetAllocator());
258 value->AddMember(
"progress", progress_json, document->GetAllocator());
260 Value state(_QueryState_VALUES_TO_NAMES.find(record.
query_state)->second,
261 document->GetAllocator());
262 value->AddMember(
"state", state, document->GetAllocator());
264 value->AddMember(
"rows_fetched", record.
num_rows_fetched, document->GetAllocator());
267 value->AddMember(
"query_id",
query_id, document->GetAllocator());
271 document->GetAllocator());
272 value->AddMember(
"last_event", last_event, document->GetAllocator());
277 Document* document) {
// Builds the query-state page with three groups of members:
//   - "in_flight_queries" and "num_in_flight_queries";
//   - "completed_queries" from the query log, plus "completed_log_size"
//     (FLAGS_query_log_size);
//   - "query_locations", with one {"location", "count"} object per query_locations_
//     element ("count" is the size of that element's value).
// Each source is read under its own lock: query_exec_state_map_lock_,
// query_log_lock_ and query_locations_lock_. All three guards are named `l`, so each
// sits in its own block scope, whose braces are not shown in this listing.
// The set's second template argument makes QueryStateRecord its own comparator.
278 set<QueryStateRecord, QueryStateRecord> sorted_query_records;
280 lock_guard<mutex> l(query_exec_state_map_lock_);
282 const QueryExecStateMap::value_type& exec_state, query_exec_state_map_) {
// sorted_query_records is filled from the exec-state map on lines not shown here.
// The in-flight list is presumably emitted in the set's order.
288 Value in_flight_queries(kArrayType);
290 Value record_json(kObjectType);
291 QueryStateToJson(record, &record_json, document);
292 in_flight_queries.PushBack(record_json, document->GetAllocator());
294 document->AddMember(
"in_flight_queries", in_flight_queries, document->GetAllocator());
295 document->AddMember(
"num_in_flight_queries", sorted_query_records.size(),
296 document->GetAllocator());
// Completed queries come from the query log (the loop header is not shown).
298 Value completed_queries(kArrayType);
300 lock_guard<mutex> l(query_log_lock_);
302 Value record_json(kObjectType);
303 QueryStateToJson(log_entry, &record_json, document);
304 completed_queries.PushBack(record_json, document->GetAllocator());
307 document->AddMember(
"completed_queries", completed_queries, document->GetAllocator());
308 document->AddMember(
"completed_log_size", FLAGS_query_log_size,
309 document->GetAllocator());
311 Value query_locations(kArrayType);
313 lock_guard<mutex> l(query_locations_lock_);
314 BOOST_FOREACH(
const QueryLocations::value_type& location, query_locations_) {
315 Value location_json(kObjectType);
316 Value location_name(lexical_cast<string>(location.first).c_str(),
317 document->GetAllocator());
318 location_json.AddMember(
"location", location_name, document->GetAllocator());
319 location_json.AddMember(
"count", location.second.size(),
320 document->GetAllocator());
321 query_locations.PushBack(location_json, document->GetAllocator());
324 document->AddMember(
"query_locations", query_locations, document->GetAllocator());
329 Document* document) {
// Lists every session in session_state_map_ as a "sessions" array of per-session
// objects, plus "num_sessions". session_state_map_lock_ is held for the rest of the
// function.
330 lock_guard<mutex> l(session_state_map_lock_);
331 Value sessions(kArrayType);
332 BOOST_FOREACH(
const SessionStateMap::value_type& session, session_state_map_) {
// NOTE(review): copying the shared_ptr costs an atomic ref-count increment and
// decrement per session. The visible uses of `state` only read it, so a const
// reference would suffice.
333 shared_ptr<SessionState> state = session.second;
334 Value session_json(kObjectType);
336 document->GetAllocator());
337 session_json.AddMember(
"type", type, document->GetAllocator());
339 session_json.AddMember(
"num_queries", state->inflight_queries.size(),
340 document->GetAllocator());
342 Value user(state->connected_user.c_str(), document->GetAllocator());
343 session_json.AddMember(
"user", user, document->GetAllocator());
345 Value delegated_user(state->do_as_user.c_str(), document->GetAllocator());
346 session_json.AddMember(
"delegated_user", delegated_user, document->GetAllocator());
348 Value session_id(
PrintId(session.first).c_str(), document->GetAllocator());
349 session_json.AddMember(
"session_id", session_id, document->GetAllocator());
351 Value network_address(lexical_cast<string>(state->network_address).c_str(),
352 document->GetAllocator());
353 session_json.AddMember(
"network_address", network_address, document->GetAllocator());
355 Value default_db(state->database.c_str(), document->GetAllocator());
356 session_json.AddMember(
"default_database", default_db, document->GetAllocator());
358 Value start_time(state->start_time.DebugString().c_str(), document->GetAllocator());
359 session_json.AddMember(
"start_time", start_time, document->GetAllocator());
363 document->GetAllocator());
364 session_json.AddMember(
"last_accessed", last_accessed, document->GetAllocator());
366 session_json.AddMember(
"expired", state->expired, document->GetAllocator());
367 session_json.AddMember(
"closed", state->closed, document->GetAllocator());
368 session_json.AddMember(
"ref_count", state->ref_count, document->GetAllocator());
369 sessions.PushBack(session_json, document->GetAllocator());
372 document->AddMember(
"sessions", sessions, document->GetAllocator());
373 document->AddMember(
"num_sessions", session_state_map_.size(), document->GetAllocator());
377 Document* document) {
// Builds the catalog page: a "databases" array. Each database entry has its "name",
// "num_tables", and a "tables" array of {"fqtn" ("<db>.<table>"), "name"} objects.
// A failure listing databases is reported as a top-level "error". A failure listing
// one database's tables is attached to that database as its own "error".
// `status` and the early-exit control flow are on lines not shown in this listing.
378 TGetDbsResult get_dbs_result;
381 Value error(status.
GetDetail().c_str(), document->GetAllocator());
382 document->AddMember(
"error", error, document->GetAllocator());
386 Value databases(kArrayType);
387 BOOST_FOREACH(
const string& db, get_dbs_result.dbs) {
388 Value database(kObjectType);
389 Value str(db.c_str(), document->GetAllocator());
390 database.AddMember(
"name", str, document->GetAllocator());
// Per-database table listing. On failure the error is attached to this database
// entry; the control flow after that is not shown.
392 TGetTablesResult get_table_results;
396 Value error(status.
GetDetail().c_str(), document->GetAllocator());
397 database.AddMember(
"error", error, document->GetAllocator());
401 Value table_array(kArrayType);
402 BOOST_FOREACH(
const string& table, get_table_results.tables) {
403 Value table_obj(kObjectType);
404 Value fq_name(Substitute(
"$0.$1", db, table).c_str(), document->GetAllocator());
405 table_obj.AddMember(
"fqtn", fq_name, document->GetAllocator());
406 Value table_name(table.c_str(), document->GetAllocator());
407 table_obj.AddMember(
"name", table_name, document->GetAllocator());
408 table_array.PushBack(table_obj, document->GetAllocator());
410 database.AddMember(
"num_tables", table_array.Size(), document->GetAllocator());
411 database.AddMember(
"tables", table_array, document->GetAllocator());
412 databases.PushBack(database, document->GetAllocator());
414 document->AddMember(
"databases", databases, document->GetAllocator());
418 Document* document) {
// Fetches one catalog object named by the "object_type" and "object_name" request
// arguments. On success it returns the object's ThriftDebugString as "thrift_string".
// It adds an "error" member on failure, or when either argument is missing.
// The request is populated, and `status` obtained, on lines not shown here
// (presumably via TCatalogObjectTypeFromName / TCatalogObjectFromObjectName /
// GetCatalogObject).
419 Webserver::ArgumentMap::const_iterator object_type_arg = args.find(
"object_type");
420 Webserver::ArgumentMap::const_iterator object_name_arg = args.find(
"object_name");
421 if (object_type_arg != args.end() && object_name_arg != args.end()) {
422 TCatalogObjectType::type object_type =
426 TCatalogObject request;
430 TCatalogObject result;
433 Value debug_string(ThriftDebugString(result).c_str(), document->GetAllocator());
434 document->AddMember(
"thrift_string", debug_string, document->GetAllocator());
436 Value error(status.
GetDetail().c_str(), document->GetAllocator());
437 document->AddMember(
"error", error, document->GetAllocator());
440 Value error(
"Please specify values for the object_type and object_name parameters.",
441 document->GetAllocator());
442 document->AddMember(
"error", error, document->GetAllocator());
451 const vector<TPlanNode>& nodes,
452 vector<TPlanNode>::const_iterator* it, rapidjson::Document* document, Value* value) {
// Converts the plan node at *it into the JSON object `value`. It adds "label",
// "label_detail" (redacted), per-node execution statistics when `summaries` has an
// entry for the node id, and a "children" array with one object per child. The
// recursive call for each child is on lines not shown in this listing.
453 Value children(kArrayType);
// NOTE(review): "label" is passed as a raw c_str() into the TPlanNode. Some rapidjson
// versions have an AddMember(const Ch*, const Ch*, Allocator&) overload that stores a
// non-copied string reference. There, the document points into the caller's plan
// nodes. When reached from QuerySummaryCallback (presumably via PlanToJson), those
// nodes belong to its local `fragments` vector, so the reference would dangle once
// that callback returns. Copying via Value(..., document->GetAllocator()), as done for
// "label_detail", avoids this -- confirm against the rapidjson version in use.
454 value->AddMember(
"label", (*it)->label.c_str(), document->GetAllocator());
456 Value label_detail(
RedactCopy((*it)->label_detail).c_str(), document->GetAllocator());
457 value->AddMember(
"label_detail", label_detail, document->GetAllocator());
459 TPlanNodeId
id = (*it)->node_id;
460 map<TPlanNodeId, TPlanNodeExecSummary>::const_iterator summary = summaries.find(
id);
461 if (summary != summaries.end()) {
// Aggregate this node's per-instance exec stats.
462 int64_t cardinality = 0;
463 int64_t max_time = 0L;
464 int64_t total_time = 0;
465 BOOST_FOREACH(
const TExecStats& stat, summary->second.exec_stats) {
// Broadcast nodes keep the largest per-instance cardinality; otherwise (else branch
// on a line not shown) cardinalities are summed. Latency feeds both a total and a
// max.
466 if (summary->second.is_broadcast) {
468 cardinality = ::max(cardinality, stat.cardinality);
470 cardinality += stat.cardinality;
472 total_time += stat.latency_ns;
473 max_time = ::max(max_time, stat.latency_ns);
475 value->AddMember(
"output_card", cardinality, document->GetAllocator());
476 value->AddMember(
"num_instances", summary->second.exec_stats.size(),
477 document->GetAllocator());
// "is_broadcast" is only added when true.
478 if (summary->second.is_broadcast) {
479 value->AddMember(
"is_broadcast",
true, document->GetAllocator());
// The maximum latency is emitted both formatted ("max_time") and as the raw
// nanosecond value ("max_time_val").
482 const string& max_time_str = PrettyPrinter::Print(max_time, TUnit::TIME_NS);
483 Value max_time_str_json(max_time_str.c_str(), document->GetAllocator());
484 value->AddMember(
"max_time", max_time_str_json, document->GetAllocator());
485 value->AddMember(
"max_time_val", max_time, document->GetAllocator());
// Integer average latency across instances. The max(..., 1) guard avoids dividing
// by zero when exec_stats is empty.
489 const string& avg_time_str = PrettyPrinter::Print(
491 total_time / ::max(static_cast<int>(summary->second.exec_stats.size()), 1),
493 Value avg_time_str_json(avg_time_str.c_str(), document->GetAllocator());
494 value->AddMember(
"avg_time", avg_time_str_json, document->GetAllocator());
// One child object per child node. The call that fills `container` (presumably
// advancing *it) is on lines not shown.
497 int num_children = (*it)->num_children;
498 for (
int i = 0; i < num_children; ++i) {
500 Value container(kObjectType);
502 children.PushBack(container, document->GetAllocator());
504 value->AddMember(
"children", children, document->GetAllocator());
// Converts a query's plan fragments into value["plan_nodes"]: one JSON object per
// fragment, built by PlanToJsonHelper from the fragment's node list and the per-node
// exec summaries in `summary`. A fragment whose output sink is a stream sink also
// gets a "data_stream_target" holding the label of the destination node.
532 void PlanToJson(
const vector<TPlanFragment>& fragments,
const TExecSummary& summary,
533 rapidjson::Document* document, Value* value) {
// Map every node id, across all fragments, to its label so that a sink's destination
// node can be named.
536 map<TPlanNodeId, string> label_map;
537 BOOST_FOREACH(
const TPlanFragment& fragment, fragments) {
538 BOOST_FOREACH(
const TPlanNode& node, fragment.plan.nodes) {
539 label_map[node.node_id] = node.label;
// Index the exec summaries by node id for lookup in PlanToJsonHelper.
543 map<TPlanNodeId, TPlanNodeExecSummary> exec_summaries;
544 BOOST_FOREACH(
const TPlanNodeExecSummary& s, summary.nodes) {
545 exec_summaries[s.node_id] = s;
548 Value nodes(kArrayType);
549 BOOST_FOREACH(
const TPlanFragment& fragment, fragments) {
550 Value plan_fragment(kObjectType);
// The helper starts at the fragment's first node and receives a pointer to `it`,
// presumably so its recursion can advance through the node list.
551 vector<TPlanNode>::const_iterator it = fragment.plan.nodes.begin();
552 PlanToJsonHelper(exec_summaries, fragment.plan.nodes, &it, document, &plan_fragment);
553 if (fragment.__isset.output_sink) {
554 const TDataSink& sink = fragment.output_sink;
555 if (sink.__isset.stream_sink) {
// NOTE(review): label_map is local to this function, yet its string is passed as a
// raw c_str(). Some rapidjson versions have an AddMember(const Ch*, const Ch*,
// Allocator&) overload that stores a non-copied string reference. There, this
// dangles once PlanToJson returns; build a Value with document->GetAllocator()
// instead. operator[] also inserts an empty label for an unknown dest_node_id.
556 plan_fragment.AddMember(
"data_stream_target",
557 label_map[sink.stream_sink.dest_node_id].c_str(), document->GetAllocator());
560 nodes.PushBack(plan_fragment, document->GetAllocator());
562 value->AddMember(
"plan_nodes", nodes, document->GetAllocator());
// Debug-page handler for a single query.
// - include_json_plan: copies the plan fragments and adds "plan_json".
// - include_summary: adds the printed exec "summary".
// The exec summary is fetched when either flag is set. On the success path the
// redacted statement ("stmt") and plan text ("plan"), "status" and "query_id" are
// added. Data comes from the live QueryExecState when GetQueryExecState finds one,
// otherwise from the query log. An id with no coordinator, or one missing from the
// log, yields an "error" member.
565 void ImpalaServer::QuerySummaryCallback(
bool include_json_plan,
bool include_summary,
572 document->AddMember(
"error", json_error, document->GetAllocator());
// Filled from either source below. query_status, stmt and plan are declared on lines
// not shown in this listing.
576 TExecSummary summary;
581 vector<TPlanFragment> fragments;
585 shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id,
true);
586 if (exec_state != NULL) {
// adopt_lock_t: the guard takes over a mutex the calling thread already holds
// (presumably acquired by GetQueryExecState(query_id, true)) and unlocks it at the
// end of this scope.
588 lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
589 if (exec_state->coord() == NULL) {
590 const string& err = Substitute(
"Invalid query id: $0",
PrintId(query_id));
591 Value json_error(err.c_str(), document->GetAllocator());
592 document->AddMember(
"error", json_error, document->GetAllocator());
// Snapshot the query's state while `l` is held.
595 query_status = exec_state->query_status();
596 stmt = exec_state->sql_stmt();
597 plan = exec_state->exec_request().query_exec_request.query_plan;
598 if (include_json_plan || include_summary) {
// The coordinator's exec summary is copied under its own spin lock.
599 lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
600 summary = exec_state->coord()->exec_summary();
602 if (include_json_plan) {
603 fragments = exec_state->exec_request().query_exec_request.fragments;
// No live exec state (the branch header is not shown): fall back to the query log.
609 lock_guard<mutex> l(query_log_lock_);
610 QueryLogIndex::const_iterator query_record = query_log_index_.find(query_id);
611 if (query_record == query_log_index_.end()) {
612 const string& err = Substitute(
"Unknown query id: $0",
PrintId(query_id));
613 Value json_error(err.c_str(), document->GetAllocator());
614 document->AddMember(
"error", json_error, document->GetAllocator());
617 if (include_json_plan || include_summary) {
618 summary = query_record->second->exec_summary;
620 stmt = query_record->second->stmt;
621 plan = query_record->second->plan;
622 query_status = query_record->second->query_status;
623 if (include_json_plan) {
624 fragments = query_record->second->fragments;
// `v` is filled on a line not shown (presumably PlanToJson(fragments, summary,
// document, &v)). Any string placed in `v` by reference rather than copied would
// point into the local `fragments` and dangle after this function returns.
628 if (include_json_plan) {
629 Value v(kObjectType);
631 document->AddMember(
"plan_json", v, document->GetAllocator());
633 if (include_summary) {
635 Value json_summary(printed_summary.c_str(), document->GetAllocator());
636 document->AddMember(
"summary", json_summary, document->GetAllocator());
638 Value json_stmt(
RedactCopy(stmt).c_str(), document->GetAllocator());
639 document->AddMember(
"stmt", json_stmt, document->GetAllocator());
640 Value json_plan_text(
RedactCopy(plan).c_str(), document->GetAllocator());
641 document->AddMember(
"plan", json_plan_text, document->GetAllocator());
// "OK" for a successful status; the non-OK operand is on a line not shown
// (presumably the status detail).
644 Value json_status(query_status.
ok() ?
"OK" :
646 document->AddMember(
"status", json_status, document->GetAllocator());
647 Value json_id(
PrintId(query_id).c_str(), document->GetAllocator());
648 document->AddMember(
"query_id", json_id, document->GetAllocator());
Status TCatalogObjectFromObjectName(const TCatalogObjectType::type &object_type, const string &object_name, TCatalogObject *catalog_object)
DECLARE_int32(query_log_size)
std::string stmt
SQL statement text.
const std::string GetDetail() const
std::string RedactCopy(const std::string &original)
Utility function to redact a string without modifying the original.
bool ParseId(const string &s, TUniqueId *id)
void PlanToJson(const vector< TPlanFragment > &fragments, const TExecSummary &summary, rapidjson::Document *document, Value *value)
boost::function< void(const ArgumentMap &args, rapidjson::Document *json)> UrlCallback
int64_t num_complete_fragments
The number of fragments that have completed.
int64_t num_rows_fetched
The number of rows fetched by the client.
const TUniqueId & query_id() const
void RegisterUrlCallback(const std::string &path, const std::string &template_filename, const UrlCallback &callback, bool is_on_nav_bar=true)
Only one callback may be registered per URL.
beeswax::QueryState::type query_state
The state of the query as of this snapshot.
string PrintId(const TUniqueId &id, const string &separator)
std::string PrintTSessionType(const TSessionType::type &type)
Status GetCatalogObject(const TCatalogObject &request, TCatalogObject *response)
TCatalogObjectType::type TCatalogObjectTypeFromName(const string &name)
std::map< std::string, std::string > ArgumentMap
int64_t total_fragments
The total number of fragments.
TStmtType::type stmt_type
DDL, DML etc.
static Status ParseQueryId(const Webserver::ArgumentMap &args, TUniqueId *id)
void PrintExecSummary(const TExecSummary &exec_summary, int indent_level, int new_indent_level, int *node_idx, vector< vector< string > > *result)
TEventSequence event_sequence
Timeline of important query events.
std::string effective_user
Snapshot of a query's state, archived in the query log.
Status GetDbNames(const std::string *pattern, const TSessionState *session, TGetDbsResult *table_names)
std::string default_db
default db for this query
double ToSubsecondUnixTime() const
const ProgressUpdater & progress()
bool has_coord
True if the query required a coordinator fragment.
void PlanToJsonHelper(const map< TPlanNodeId, TPlanNodeExecSummary > &summaries, const vector< TPlanNode > &nodes, vector< TPlanNode >::const_iterator *it, rapidjson::Document *document, Value *value)
Status GetTableNames(const std::string &db, const std::string *pattern, const TSessionState *session, TGetTablesResult *table_names)
TimestampValue start_time
Start and end time of the query.
Status GetAllHadoopConfigs(TGetAllHadoopConfigsResponse *result)
Returns all Hadoop configurations in key, value form in result.
std::string DebugString() const