#include <thrift/protocol/TDebugProtocol.h>
#include <boost/algorithm/string/join.hpp>
#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics/stats.hpp>
#include <boost/accumulators/statistics/min.hpp>
#include <boost/accumulators/statistics/mean.hpp>
#include <boost/accumulators/statistics/median.hpp>
#include <boost/accumulators/statistics/max.hpp>
#include <boost/accumulators/statistics/variance.hpp>
#include <boost/bind.hpp>
#include <boost/filesystem.hpp>
#include <boost/foreach.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/unordered_set.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string.hpp>
#include <gutil/strings/substitute.h>

#include "gen-cpp/ImpalaInternalService.h"
#include "gen-cpp/ImpalaInternalService_types.h"
#include "gen-cpp/Frontend_types.h"
#include "gen-cpp/PlanNodes_types.h"
#include "gen-cpp/Partitions_types.h"
#include "gen-cpp/ImpalaInternalService_constants.h"
using namespace apache::thrift;
using namespace strings;
namespace accumulators = boost::accumulators;
using boost::algorithm::iequals;
using boost::algorithm::is_any_of;
using boost::algorithm::join;
using boost::algorithm::token_compress_on;
using boost::algorithm::split;
DEFINE_bool(insert_inherit_permissions, false,
    "If true, new directories created by "
    "INSERTs will inherit the permissions of their parent directories");
    : backend_num(-1), node_id(-1), action(TDebugAction::WAIT),
      phase(TExecNodePhase::INVALID) {}
      const TNetworkAddress& coord_address, int backend_num,
      const TPlanFragment& fragment, int fragment_idx,
      const FragmentExecParams& params, int instance_idx,
      DebugOptions* debug_options, ObjectPool* obj_pool)
    : fragment_instance_id(params.instance_ids[instance_idx]),
      backend_address(params.hosts[instance_idx]),
      fragment_idx(fragment_idx),
      instance_idx(instance_idx),
      profile_created(false),
      total_ranges_complete(0) {
    stringstream ss;
    ss << "Instance " << PrintId(fragment_instance_id)
       << " (host=" << backend_address << ")";
    coord->SetExecPlanFragmentParams(schedule, backend_num, fragment, fragment_idx,
        params, instance_idx, coord_address, &rpc_params);
    if (debug_options != NULL) {
      rpc_params.params.__set_debug_node_id(debug_options->node_id);
      rpc_params.params.__set_debug_action(debug_options->action);
      rpc_params.params.__set_debug_phase(debug_options->phase);
    }
    ComputeTotalSplitSize();
  }
  void ComputeTotalSplitSize();

  int64_t GetNodeThroughput(int plan_node_id);

  int64_t GetNumScanRangesCompleted(int plan_node_id);

  int64_t UpdateNumScanRangesCompleted();
void Coordinator::BackendExecState::ComputeTotalSplitSize() {
  const PerNodeScanRanges& per_node_scan_ranges = rpc_params.params.per_node_scan_ranges;
  total_split_size = 0;
  BOOST_FOREACH(const PerNodeScanRanges::value_type& entry, per_node_scan_ranges) {
    BOOST_FOREACH(const TScanRangeParams& scan_range_params, entry.second) {
      if (!scan_range_params.scan_range.__isset.hdfs_file_split) continue;
      total_split_size += scan_range_params.scan_range.hdfs_file_split.length;
    }
  }
}
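// Illustration (not from the original file): if per_node_scan_ranges maps scan node 0 to
// HDFS splits of 128MB and 64MB and scan node 5 to a 32MB split, total_split_size ends up
// as 224MB. Scan ranges without an hdfs_file_split (e.g. HBase key ranges) are skipped,
// so total_split_size only reflects the HDFS bytes assigned to this fragment instance.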
int64_t Coordinator::BackendExecState::GetNodeThroughput(int plan_node_id) {
  lock_guard<mutex> l(lock);
  CounterMap& throughput_counters = aggregate_counters.throughput_counters;
  CounterMap::iterator i = throughput_counters.find(plan_node_id);
  if (i == throughput_counters.end()) return 0;
  RuntimeProfile::Counter* counter = i->second;
  DCHECK(counter != NULL);
  return counter->value();
}
int64_t Coordinator::BackendExecState::GetNumScanRangesCompleted(int plan_node_id) {
  lock_guard<mutex> l(lock);
  CounterMap& ranges_complete = aggregate_counters.scan_ranges_complete_counters;
  CounterMap::iterator i = ranges_complete.find(plan_node_id);
  if (i == ranges_complete.end()) return 0;
  RuntimeProfile::Counter* counter = i->second;
  DCHECK(counter != NULL);
  return counter->value();
}
int64_t Coordinator::BackendExecState::UpdateNumScanRangesCompleted() {
  int64_t total = 0;
  CounterMap& complete = aggregate_counters.scan_ranges_complete_counters;
  for (CounterMap::iterator i = complete.begin(); i != complete.end(); ++i) {
    total += i->second->value();
  }
  int64_t delta = total - total_ranges_complete;
  total_ranges_complete = total;
  return delta;
}
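// Note: the value returned above is the number of scan ranges finished since the previous
// call (the new total minus the cached total_ranges_complete), so repeated calls report
// only newly completed ranges, presumably feeding the ProgressUpdater (progress_) used
// elsewhere in this file without recounting from zero each time.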
TExecNodePhase::type GetExecNodePhase(const string& key) {
  map<int, const char*>::const_iterator entry =
      _TExecNodePhase_VALUES_TO_NAMES.begin();
  for (; entry != _TExecNodePhase_VALUES_TO_NAMES.end(); ++entry) {
    if (iequals(key, (*entry).second)) {
      return static_cast<TExecNodePhase::type>(entry->first);
    }
  }
  return TExecNodePhase::INVALID;
}
TDebugAction::type GetDebugAction(const string& key) {
  map<int, const char*>::const_iterator entry =
      _TDebugAction_VALUES_TO_NAMES.begin();
  for (; entry != _TDebugAction_VALUES_TO_NAMES.end(); ++entry) {
    if (iequals(key, (*entry).second)) {
      return static_cast<TDebugAction::type>(entry->first);
    }
  }
  return TDebugAction::WAIT;
}
void Coordinator::ProcessQueryOptions(
    const TQueryOptions& query_options, DebugOptions* debug_options) {
  DCHECK(debug_options != NULL);
  if (!query_options.__isset.debug_action || query_options.debug_action.empty()) {
    debug_options->phase = TExecNodePhase::INVALID;
    return;
  }

  vector<string> components;
  split(components, query_options.debug_action, is_any_of(":"), token_compress_on);
  if (components.size() < 3 || components.size() > 4) return;
  if (components.size() == 3) {
    debug_options->node_id = atoi(components[0].c_str());

  } else {
    debug_options->backend_num = atoi(components[0].c_str());
    debug_options->node_id = atoi(components[1].c_str());
  }

  DCHECK(!(debug_options->phase == TExecNodePhase::CLOSE &&
      debug_options->action == TDebugAction::WAIT))
      << "Do not use CLOSE:WAIT debug actions "
      << "because nodes cannot be cancelled in Close()";
Status Coordinator::Exec(QuerySchedule& schedule,
    vector<ExprContext*>* output_expr_ctxs) {
  const TQueryExecRequest& request = schedule.request();
  DCHECK_GT(request.fragments.size(), 0);

  if (needs_finalization_) {
    finalize_params_ = request.finalize_params;
  }

  VLOG_QUERY << "Exec() query_id=" << schedule.query_id();

  vector<FragmentExecParams>* fragment_exec_params = schedule.exec_params();

  lock_guard<mutex> l(lock_);

  bool has_coordinator_fragment =
      request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
  if (has_coordinator_fragment) {
    executor_.reset(new PlanFragmentExecutor(

    TExecPlanFragmentParams rpc_params;
    SetExecPlanFragmentParams(schedule, 0, request.fragments[0], 0,
        (*fragment_exec_params)[0], 0, coord, &rpc_params);

    DCHECK(output_expr_ctxs != NULL);

        -1, -1, "Output exprs", runtime_state()->instance_mem_tracker(), false));

  int64_t query_limit = -1;
  if (query_ctx_.request.query_options.__isset.mem_limit &&
      query_ctx_.request.query_options.mem_limit > 0) {
    query_limit = query_ctx_.request.query_options.mem_limit;
  }
  DebugOptions debug_options;

  VLOG_QUERY << "starting " << schedule.num_backends()

  StatsMetric<double> latencies("fragment-latencies", TUnit::TIME_NS);
  for (int fragment_idx = (has_coordinator_fragment ? 1 : 0);
       fragment_idx < request.fragments.size(); ++fragment_idx) {
    const FragmentExecParams& params = (*fragment_exec_params)[fragment_idx];

    int num_hosts = params.hosts.size();
    DCHECK_GT(num_hosts, 0);
    for (int instance_idx = 0; instance_idx < num_hosts; ++instance_idx) {
      DebugOptions* backend_debug_options =
          (debug_options.phase != TExecNodePhase::INVALID
              && (debug_options.backend_num == -1
                  || debug_options.backend_num == backend_num)
              ? &debug_options : NULL);

      BackendExecState* exec_state =
          obj_pool()->Add(new BackendExecState(schedule, this, coord, backend_num,
              request.fragments[fragment_idx], fragment_idx,
              params, instance_idx, backend_debug_options, obj_pool()));

      VLOG(2) << "Exec(): starting instance: fragment_idx=" << fragment_idx
              << " instance_id=" << params.instance_ids[instance_idx];

          num_hosts, &latencies);

  if (!fragments_exec_status.ok()) {
    return fragments_exec_status;
  }

      latencies.ToHumanReadable());
  if (has_coordinator_fragment && request.fragments.size() > 1) {

  progress_ = ProgressUpdater(ss.str(), schedule.num_scan_ranges());

  lock_guard<mutex> l(lock_);

  lock_guard<mutex> l(lock_);

  if (instance_id != NULL) {
    VLOG_QUERY << "Query id=" << query_id_ << " failed because fragment id="
               << *instance_id << " failed.";
  }
void Coordinator::PopulatePathPermissionCache(hdfsFS fs, const string& path_str,
    PermissionCache* permissions_cache) {
  int scheme_end = path_str.find("://");
  string stripped_str;
  if (scheme_end != string::npos) {
    stripped_str = path_str.substr(path_str.find("/", scheme_end + 3));
  } else {
    stripped_str = path_str;
  }

  vector<string> components;
  split(components, stripped_str, is_any_of("/"));

  vector<string> prefixes;
  stringstream accumulator;
  BOOST_FOREACH(const string& component, components) {
    if (component.empty()) continue;
    accumulator << "/" << component;
    prefixes.push_back(accumulator.str());
  }

  short permissions = 0;
  BOOST_FOREACH(const string& path, prefixes) {
    PermissionCache::const_iterator it = permissions_cache->find(path);
    if (it == permissions_cache->end()) {
      hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str());
      if (info != NULL) {
        permissions_cache->insert(
            make_pair(path, make_pair(false, info->mPermissions)));
        permissions = info->mPermissions;
        hdfsFreeFileInfo(info, 1);
      } else {
        permissions_cache->insert(make_pair(path, make_pair(true, permissions)));
      }
    } else {
      permissions = it->second.second;
    }
  }
}
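// Worked example (hypothetical path, matching the logic above): for
// "hdfs://nn:8020/warehouse/t/p=1" the scheme and authority are stripped, leaving
// "/warehouse/t/p=1", which yields the prefixes "/warehouse", "/warehouse/t" and
// "/warehouse/t/p=1", probed shortest-first. Each prefix that already exists is cached as
// (false, its permissions); the first missing prefix, and everything below it, is cached
// as (true, permissions-of-deepest-existing-ancestor), i.e. the bits a newly created
// directory should inherit when --insert_inherit_permissions is set.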
Status Coordinator::FinalizeSuccessfulInsert() {
  hdfsFS hdfs_connection;

  HdfsOperationSet partition_create_ops(&hdfs_connection);
  DescriptorTbl* descriptor_table;

  HdfsTableDescriptor* hdfs_table = static_cast<HdfsTableDescriptor*>(

  DCHECK(hdfs_table != NULL) << "INSERT target table not known in descriptor table: "

        "FinalizationTimer"));

    stringstream part_path_ss;
    if (partition.second.id == -1) {

      HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id);
      DCHECK(part != NULL) << "Partition " << partition.second.id
                           << " not known in descriptor table";
      part_path_ss << part->location();

    const string& part_path = part_path_ss.str();

    if (partition.first.empty()) {

      hdfsFileInfo* existing_files =
          hdfsListDirectory(hdfs_connection, part_path.c_str(), &num_files);
      if (existing_files == NULL) {

      for (int i = 0; i < num_files; ++i) {
        const string filename = path(existing_files[i].mName).filename().string();
        if (existing_files[i].mKind == kObjectKindFile && !IsHiddenFile(filename)) {
          partition_create_ops.Add(DELETE, existing_files[i].mName);
        }
      }
      hdfsFreeFileInfo(existing_files, num_files);

      if (FLAGS_insert_inherit_permissions) {

      if (hdfsExists(hdfs_connection, part_path.c_str()) != -1) {

        partition_create_ops.Add(CREATE_DIR, part_path);

      if (FLAGS_insert_inherit_permissions) {

      if (hdfsExists(hdfs_connection, part_path.c_str()) == -1) {
        partition_create_ops.Add(CREATE_DIR, part_path);
      }

        "FinalizationTimer"));

    ss << "Error(s) deleting partition directories. First error (of "
       << partition_create_ops.errors().size() << ") was: " << err.second;
    return Status(ss.str());
  HdfsOperationSet move_ops(&hdfs_connection);
  HdfsOperationSet dir_deletion_ops(&hdfs_connection);

    if (move.second.empty()) {
      VLOG_ROW << "Deleting file: " << move.first;
      dir_deletion_ops.Add(DELETE, move.first);
    } else {
      VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second;
      move_ops.Add(RENAME, move.first, move.second);
    }

    ss << "Error(s) moving partition files. First error (of "
       << move_ops.errors().size() << ") was: " << move_ops.errors()[0].second;
    return Status(ss.str());

        "FinalizationTimer"));

    ss << "Error(s) deleting staging directories. First error (of "
       << dir_deletion_ops.errors().size() << ") was: "
       << dir_deletion_ops.errors()[0].second;
    return Status(ss.str());
  if (FLAGS_insert_inherit_permissions) {
    HdfsOperationSet chmod_ops(&hdfs_connection);
    BOOST_FOREACH(const PermissionCache::value_type& perm, permissions_cache) {
      bool new_dir = perm.second.first;

      short permissions = perm.second.second;
      VLOG_QUERY << "INSERT created new directory: " << perm.first
                 << ", inherited permissions are: " << oct << permissions;
      chmod_ops.Add(CHMOD, perm.first, permissions);

    ss << "Error(s) setting permissions on newly created partition directories. First"
       << " error (of " << chmod_ops.errors().size() << ") was: "
       << chmod_ops.errors()[0].second;
    return Status(ss.str());
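// Taken together, the INSERT finalization visible above proceeds in phases, each expressed
// as an HdfsOperationSet executed in bulk: partition_create_ops deletes stale data files
// (for overwrites) and creates missing partition directories, move_ops renames the
// temporary files written by the sinks to their final locations, dir_deletion_ops removes
// the staging files and directories, and, when --insert_inherit_permissions is set,
// chmod_ops applies the permissions recorded in the PermissionCache to the directories the
// INSERT created. The first error of each phase is surfaced in the returned Status.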
  DCHECK(needs_finalization_);

  if (return_status.ok()) {

  stringstream staging_dir;

  VLOG_QUERY << "Removing staging directory: " << staging_dir.str();
  hdfsDelete(hdfs_conn, staging_dir.str().c_str(), 1);

  return return_status;
  unique_lock<mutex> l(lock_);

  VLOG_QUERY << "Coordinator waiting for backends to finish, "

  VLOG_QUERY << "All backends finished successfully.";

  VLOG_QUERY << "All backends finished due to one or more errors.";

  if (return_status.ok()) {

  DCHECK(state != NULL);

  if (!needs_finalization_ && !status.ok()) return status;

  if (needs_finalization_) {

  return return_status;
  Status status = executor_->GetNext(batch);

  if (*batch == NULL) {

    state->stream_mgr()->Cancel(state->fragment_instance_id());
    double min = accumulators::min(acc);
    double max = accumulators::max(acc);
    double mean = accumulators::mean(acc);
    double stddev = sqrt(accumulators::variance(acc));
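// The statistics above come from Boost.Accumulators; a minimal sketch of the pattern
// (illustrative only, variable names are hypothetical):
//
//   accumulators::accumulator_set<double, accumulators::features<
//       accumulators::tag::min, accumulators::tag::max, accumulators::tag::mean,
//       accumulators::tag::variance> > acc;
//   BOOST_FOREACH(BackendExecState* es, backend_exec_states_) acc(es->total_split_size);
//   // then read accumulators::min(acc), accumulators::max(acc), accumulators::mean(acc)
//   // and sqrt(accumulators::variance(acc)) exactly as done here.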
    VLOG_FILE << "Byte split for fragment " << i << " " << ss.str();

      if (exec_state->fragment_idx != i) continue;
      VLOG_FILE << "data volume for ipaddress " << exec_state << ": "

          exec_state->total_split_size, TUnit::BYTES);
  for (int i = 0; i < request.fragments.size(); ++i) {
    if (!request.fragments[i].__isset.plan) continue;
    const TPlan& plan = request.fragments[i].plan;

    for (int j = 0; j < plan.nodes.size(); ++j) {
      TPlanNodeExecSummary node;
      node.node_id = plan.nodes[j].node_id;
      node.fragment_id = i;
      node.label = plan.nodes[j].label;
      node.__set_label_detail(plan.nodes[j].label_detail);
      node.num_children = plan.nodes[j].num_children;

      if (plan.nodes[j].__isset.estimated_stats) {
        node.__set_estimated_stats(plan.nodes[j].estimated_stats);
      }

    if (request.fragments[i].__isset.output_sink &&
        request.fragments[i].output_sink.type == TDataSinkType::DATA_STREAM_SINK) {
      const TDataStreamSink& sink = request.fragments[i].output_sink.stream_sink;

      if (sink.output_partition.type == TPartitionType::UNPARTITIONED) {

        exec_summary_.exch_to_sender_map[exch_idx] = fragment_first_node_idx;

  executor_->profile()->set_name(Substitute("Coordinator Fragment $0",
      request.fragments[0].display_name));
  bool has_coordinator_fragment =
      request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;

  for (int i = 0; i < request.fragments.size(); ++i) {

    if (i == 0 && has_coordinator_fragment) {

        Substitute("Averaged Fragment $0", request.fragments[i].display_name), true));

        Substitute("Fragment $0", request.fragments[i].display_name)));
void Coordinator::CollectScanNodeCounters(RuntimeProfile* profile,
    FragmentInstanceCounters* counters) {
  vector<RuntimeProfile*> children;
  profile->GetAllChildren(&children);
  for (int i = 0; i < children.size(); ++i) {
    RuntimeProfile* p = children[i];

    if (id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) continue;

    RuntimeProfile::Counter* throughput_counter =

    if (throughput_counter != NULL) {
      counters->throughput_counters[id] = throughput_counter;
    }
    RuntimeProfile::Counter* scan_ranges_counter =

    if (scan_ranges_counter != NULL) {
      counters->scan_ranges_complete_counters[id] = scan_ranges_counter;
    }
  }
}
void Coordinator::CreateAggregateCounters(const vector<TPlanFragment>& fragments) {
  BOOST_FOREACH(const TPlanFragment& fragment, fragments) {
    if (!fragment.__isset.plan) continue;
    const vector<TPlanNode>& nodes = fragment.plan.nodes;
    BOOST_FOREACH(const TPlanNode& node, nodes) {
      if (node.node_type != TPlanNodeType::HDFS_SCAN_NODE
          && node.node_type != TPlanNodeType::HBASE_SCAN_NODE) {
        continue;
      }

         << node.node_id << ") Throughput";
      query_profile_->AddDerivedCounter(s.str(), TUnit::BYTES_PER_SECOND,

          this, node.node_id));

         << node.node_id << ") Completed scan ranges";

          this, node.node_id));
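// Note: the two derived counters registered above are callback-backed; judging from the
// aggregation code that follows, they are presumably wired (via boost::bind) to
// Coordinator::ComputeTotalThroughput() and Coordinator::ComputeTotalScanRangesComplete(),
// which sum the per-backend GetNodeThroughput() / GetNumScanRangesCompleted() values plus
// the coordinator fragment's own counters for the given plan node id.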
    value += exec_state->GetNodeThroughput(node_id);

  CounterMap::iterator it = throughput_counters.find(node_id);
  if (it != throughput_counters.end()) {
    value += it->second->value();
  }

    value += exec_state->GetNumScanRangesCompleted(node_id);

  CounterMap::iterator it = scan_ranges_complete.find(node_id);
  if (it != scan_ranges_complete.end()) {
    value += it->second->value();
  }
Status Coordinator::ExecRemoteFragment(void* exec_state_arg) {
  BackendExecState* exec_state = reinterpret_cast<BackendExecState*>(exec_state_arg);
  VLOG_FILE << "making rpc: ExecPlanFragment query_id=" << query_id_
            << " instance_id=" << exec_state->fragment_instance_id
            << " host=" << exec_state->backend_address;
  lock_guard<mutex> l(exec_state->lock);

  TExecPlanFragmentResult thrift_result;
  Status rpc_status = backend_client.DoRpc(&ImpalaInternalServiceClient::ExecPlanFragment,
      exec_state->rpc_params, &thrift_result);
  if (!rpc_status.ok()) {
    stringstream msg;
    msg << "ExecPlanRequest rpc query_id=" << query_id_
        << " instance_id=" << exec_state->fragment_instance_id
        << " failed: " << rpc_status.msg().msg();

    exec_state->status = Status(msg.str());
  }

  exec_state->status = thrift_result.status;
  if (exec_state->status.ok()) {
    exec_state->initiated = true;
    exec_state->stopwatch.Start();
  }
  return exec_state->status;
}
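// In short: each fragment instance is started with a synchronous ExecPlanFragment RPC made
// from this helper, one call per BackendExecState, apparently fanned out in parallel (cf.
// the call in Exec() above that receives num_hosts and &latencies). A failed RPC or a bad
// returned status is recorded in exec_state->status; on success, initiated is set and the
// stopwatch starts, so the completion time read when the backend's final status report
// arrives (stopwatch.Stop() in UpdateFragmentExecStatus()) covers the whole fragment run.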
  lock_guard<mutex> l(lock_);

    if (exec_state == NULL) continue;

    lock_guard<mutex> l(exec_state->lock);

    if (!exec_state->status.ok()) continue;

    if (!exec_state->initiated) continue;

    if (exec_state->done) continue;

    TCancelPlanFragmentParams params;
    params.protocol_version = ImpalaInternalServiceVersion::V1;
    params.__set_fragment_instance_id(exec_state->fragment_instance_id);
    TCancelPlanFragmentResult res;
    VLOG_QUERY << "sending CancelPlanFragment rpc for instance_id="
               << exec_state->fragment_instance_id << " backend="
               << exec_state->backend_address;
    Status rpc_status = backend_client.DoRpc(
        &ImpalaInternalServiceClient::CancelPlanFragment, params, &res);
    if (!rpc_status.ok()) {
      exec_state->status.MergeStatus(rpc_status);
      stringstream msg;
      msg << "CancelPlanFragment rpc query_id=" << query_id_
          << " instance_id=" << exec_state->fragment_instance_id
          << " failed: " << rpc_status.msg().msg();

      exec_state->status.AddDetail(msg.str());
    }

    exec_state->status.AddDetail(join(res.status.error_msgs, "; "));
Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& params) {
  VLOG_FILE << "UpdateFragmentExecStatus() query_id=" << query_id_
            << " status=" << params.status.status_code
            << " done=" << (params.done ? "true" : "false");

    return Status(TErrorCode::INTERNAL_ERROR, "unknown backend number");

  const TRuntimeProfileTree& cumulative_profile = params.profile;
  Status status(params.status);

    lock_guard<mutex> l(exec_state->lock);

      exec_state->status = status;

    exec_state->done = params.done;
    if (exec_state->status.ok()) {

      exec_state->profile->Update(cumulative_profile);

      exec_state->profile->ComputeTimeInProfile();

          exec_state->profile);

      if (!exec_state->profile_created) {

      exec_state->profile_created = true;

    if (params.__isset.error_log && params.error_log.size() > 0) {

      VLOG_FILE << "instance_id=" << exec_state->fragment_instance_id

  if (params.done && params.__isset.insert_exec_status) {
    lock_guard<mutex> l(lock_);

    BOOST_FOREACH(const PartitionStatusMap::value_type& partition,
        params.insert_exec_status.per_partition_status) {

      status->num_appended_rows += partition.second.num_appended_rows;
      status->id = partition.second.id;
      if (!status->__isset.stats) status->__set_stats(TInsertStats());

    files_to_move_.insert(
        params.insert_exec_status.files_to_move.begin(),
        params.insert_exec_status.files_to_move.end());
    exec_state->profile->PrettyPrint(&s);
    VLOG_FILE << "profile for query_id=" << query_id_
              << " instance_id=" << exec_state->fragment_instance_id

    VLOG_FILE << "cumulative profile for query_id=" << query_id_

    UpdateStatus(status, &exec_state->fragment_instance_id);

    lock_guard<mutex> l(lock_);
    exec_state->stopwatch.Stop();
    DCHECK_GT(num_remaining_backends_, 0);
    VLOG_QUERY << "Backend " << params.backend_num << " completed, "
               << num_remaining_backends_ - 1 << " remaining: query_id="
               << query_id_;

      lock_guard<mutex> l2(exec_state->lock);
      if (!exec_state->done) {
        VLOG_QUERY << "query_id=" << query_id_ << ": first in-progress backend: "
                   << exec_state->backend_address;
      }

    if (--num_remaining_backends_ == 0) {
  return executor_.get() == NULL ? query_mem_tracker_.get() :
      executor_->runtime_state()->query_mem_tracker();

      catalog_update->created_partitions.insert(partition.first);

  return catalog_update->created_partitions.size() != 0;

    a.first->total_time_counter()->value() > b.first->total_time_counter()->value();

} InstanceComparator;
  int fragment_idx = backend_exec_state->fragment_idx;
  DCHECK_GE(fragment_idx, 0);

  data.averaged_profile->UpdateAverage(backend_exec_state->profile);
  data.root_profile->AddChild(backend_exec_state->profile);

  int fragment_idx = backend_exec_state->fragment_idx;
  DCHECK_GE(fragment_idx, 0);

  int64_t completion_time = backend_exec_state->stopwatch.ElapsedTime();
  data.completion_times(completion_time);
  data.rates(backend_exec_state->total_split_size / (completion_time / 1000.0
      / 1000.0 / 1000.0));
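// Units: stopwatch.ElapsedTime() is in nanoseconds (the completion-time stats below are
// printed with TUnit::TIME_NS), so dividing by 1000.0 three times converts to seconds and
// the expression above yields bytes per second, matching the TUnit::BYTES_PER_SECOND used
// when the "execution rates" summary is printed.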
  data.root_profile->AddChild(backend_exec_state->profile);
void Coordinator::UpdateExecSummary(int fragment_idx, int instance_idx,
    RuntimeProfile* profile) {
  vector<RuntimeProfile*> children;
  profile->GetAllChildren(&children);

  for (int i = 0; i < children.size(); ++i) {

    if (id == -1) continue;

    if (exec_summary.exec_stats.empty()) {

    DCHECK_LT(instance_idx, exec_summary.exec_stats.size());
    TExecStats& stats = exec_summary.exec_stats[instance_idx];

    RuntimeProfile::Counter* rows_counter = children[i]->GetCounter("RowsReturned");
    RuntimeProfile::Counter* mem_counter = children[i]->GetCounter("PeakMemoryUsage");
    if (rows_counter != NULL) stats.__set_cardinality(rows_counter->value());
    if (mem_counter != NULL) stats.__set_memory_used(mem_counter->value());
    stats.__set_latency_ns(children[i]->local_time());

    exec_summary.__isset.exec_stats = true;
    executor_->profile()->ComputeTimeInProfile();

    InstanceComparator comparator;

    stringstream times_label;

        accumulators::min(completion_times), TUnit::TIME_NS)
        accumulators::max(completion_times), TUnit::TIME_NS)
        accumulators::mean(completion_times), TUnit::TIME_NS)
        sqrt(accumulators::variance(completion_times)), TUnit::TIME_NS);

    stringstream rates_label;

        accumulators::min(rates), TUnit::BYTES_PER_SECOND)
        accumulators::max(rates), TUnit::BYTES_PER_SECOND)
        accumulators::mean(rates), TUnit::BYTES_PER_SECOND)
        sqrt(accumulators::variance(rates)), TUnit::BYTES_PER_SECOND);

        "completion times", times_label.str());

        "execution rates", rates_label.str());
    typedef boost::unordered_map<TNetworkAddress, int64_t> PerNodePeakMemoryUsage;
    PerNodePeakMemoryUsage per_node_peak_mem_usage;

    RuntimeProfile::Counter* mem_usage_counter =

    if (mem_usage_counter != NULL) {

      per_node_peak_mem_usage[coord] = mem_usage_counter->value();

      int64_t initial_usage = 0;
      int64_t* mem_usage = FindOrInsert(&per_node_peak_mem_usage,

      RuntimeProfile::Counter* mem_usage_counter =

      if (mem_usage_counter != NULL && mem_usage_counter->value() > *mem_usage) {

            mem_usage_counter->value();

    BOOST_FOREACH(PerNodePeakMemoryUsage::value_type entry, per_node_peak_mem_usage) {
      info << entry.first << "("

    query_profile_->AddInfoString("Per Node Peak Memory Usage", info.str());
  lock_guard<mutex> l(lock_);

      !executor_->runtime_state()->ErrorLogIsEmpty()) {
void Coordinator::SetExecPlanFragmentParams(
    QuerySchedule& schedule, int backend_num, const TPlanFragment& fragment,
    int fragment_idx, const FragmentExecParams& params, int instance_idx,
    const TNetworkAddress& coord, TExecPlanFragmentParams* rpc_params) {
  rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
  rpc_params->__set_fragment(fragment);

  TNetworkAddress exec_host = params.hosts[instance_idx];
  if (schedule.HasReservation()) {

    TNetworkAddress resource_hostport;
    schedule.GetResourceHostport(exec_host, &resource_hostport);
    map<TNetworkAddress, llama::TAllocatedResource>::const_iterator it =
        schedule.reservation()->allocated_resources.find(resource_hostport);

    if (it != schedule.reservation()->allocated_resources.end()) {
      rpc_params->__set_reserved_resource(it->second);
      rpc_params->__set_local_resource_address(resource_hostport);
    }
  }
  rpc_params->params.__set_request_pool(schedule.request_pool());
  FragmentScanRangeAssignment::const_iterator it =
      params.scan_range_assignment.find(exec_host);

  rpc_params->params.__set_per_node_scan_ranges(scan_ranges);
  rpc_params->params.__set_per_exch_num_senders(params.per_exch_num_senders);
  rpc_params->params.__set_destinations(params.destinations);
  rpc_params->params.__set_sender_id(params.sender_id_base + instance_idx);
  rpc_params->__isset.params = true;
  rpc_params->fragment_instance_ctx.__set_query_ctx(query_ctx_);
  rpc_params->fragment_instance_ctx.fragment_instance_id =
      params.instance_ids[instance_idx];
  rpc_params->fragment_instance_ctx.fragment_instance_idx = instance_idx;
  rpc_params->fragment_instance_ctx.num_fragment_instances = params.instance_ids.size();
  rpc_params->fragment_instance_ctx.backend_num = backend_num;
  rpc_params->__isset.fragment_instance_ctx = true;
}
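// Sketch of how this is consumed (mirrors the call in the BackendExecState constructor
// above): the coordinator fills one TExecPlanFragmentParams per fragment instance, carrying
// the protocol version, the fragment itself, the per-node scan ranges assigned to this
// host, exchange fan-in counts, destinations, a sender id, and a fragment-instance context
// (query context, instance id/index, backend_num), and ships it in the ExecPlanFragment
// RPC issued by ExecRemoteFragment().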