coordinator.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "runtime/coordinator.h"

#include <limits>
#include <map>
#include <thrift/protocol/TDebugProtocol.h>
#include <boost/algorithm/string/join.hpp>
#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics/stats.hpp>
#include <boost/accumulators/statistics/min.hpp>
#include <boost/accumulators/statistics/mean.hpp>
#include <boost/accumulators/statistics/median.hpp>
#include <boost/accumulators/statistics/max.hpp>
#include <boost/accumulators/statistics/variance.hpp>
#include <boost/bind.hpp>
#include <boost/filesystem.hpp>
#include <boost/foreach.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/unordered_set.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string.hpp>
#include <gutil/strings/substitute.h>

#include "common/logging.h"
#include "exprs/expr.h"
#include "exec/data-sink.h"
#include "runtime/client-cache.h"
#include "runtime/data-stream-mgr.h"
#include "runtime/mem-tracker.h"
#include "runtime/exec-env.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/plan-fragment-executor.h"
#include "runtime/row-batch.h"
#include "runtime/parallel-executor.h"
#include "statestore/scheduler.h"
#include "exec/scan-node.h"
#include "util/container-util.h"
#include "util/debug-util.h"
#include "util/error-util.h"
#include "util/hdfs-bulk-ops.h"
#include "util/hdfs-util.h"
#include "util/llama-util.h"
#include "util/network-util.h"
#include "util/pretty-printer.h"
#include "util/summary-util.h"
#include "gen-cpp/ImpalaInternalService.h"
#include "gen-cpp/ImpalaInternalService_types.h"
#include "gen-cpp/Frontend_types.h"
#include "gen-cpp/PlanNodes_types.h"
#include "gen-cpp/Partitions_types.h"
#include "gen-cpp/ImpalaInternalService_constants.h"

#include "common/names.h"

using namespace apache::thrift;
using namespace strings;
namespace accumulators = boost::accumulators;
using boost::algorithm::iequals;
using boost::algorithm::is_any_of;
using boost::algorithm::join;
using boost::algorithm::token_compress_on;
using boost::algorithm::split;

DECLARE_int32(be_port);
DECLARE_string(hostname);

DEFINE_bool(insert_inherit_permissions, false, "If true, new directories created by "
    "INSERTs will inherit the permissions of their parent directories");

namespace impala {

// Container for debug options in TPlanFragmentExecParams (debug_node, debug_action,
// debug_phase).
struct DebugOptions {
  int backend_num;
  int node_id;
  TDebugAction::type action;
  TExecNodePhase::type phase;  // INVALID: debug options invalid

  DebugOptions()
    : backend_num(-1), node_id(-1), action(TDebugAction::WAIT),
      phase(TExecNodePhase::INVALID) {}
};
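
// Illustrative example (values hypothetical): phase == TExecNodePhase::INVALID is
// the "not set" sentinel used throughout this file. A populated instance such as
//   DebugOptions opts;
//   opts.backend_num = 1;                  // apply only on backend #1 (-1 = all)
//   opts.node_id = 0;                      // target plan node 0
//   opts.phase = TExecNodePhase::GETNEXT;  // trigger during GetNext()
//   opts.action = TDebugAction::WAIT;      // block at that point
// is what ProcessQueryOptions() below produces from the debug_action query option.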

// Execution state of a particular fragment.
// Concurrent accesses:
// - GetNodeThroughput() called when coordinator's profile is printed
// - updates through UpdateFragmentExecStatus()
class Coordinator::BackendExecState {
 public:
  TUniqueId fragment_instance_id;
  MonotonicStopWatch stopwatch;  // wall clock timer for this fragment
  const TNetworkAddress backend_address;  // of ImpalaInternalService
  int64_t total_split_size;  // summed up across all splits; in bytes

  // assembled in c'tor
  TExecPlanFragmentParams rpc_params;

  // Fragment idx for this ExecState
  int fragment_idx;

  // The 0-based instance idx.
  int instance_idx;

  // protects fields below
  // lock ordering: Coordinator::lock_ can only get obtained *prior*
  // to lock
  boost::mutex lock;

  // if the status indicates an error status, execution of this fragment
  // has either been aborted by the remote backend (which then reported the error)
  // or cancellation has been initiated; either way, execution must not be cancelled
  Status status;

  bool initiated;  // if true, TPlanExecRequest rpc has been sent
  bool done;  // if true, execution terminated; do not cancel in that case
  bool profile_created;  // true after the first call to profile->Update()
  RuntimeProfile* profile;  // owned by obj_pool()
  ErrorLogMap error_log;  // errors reported by this backend

  // Total scan ranges complete across all scan nodes
  int64_t total_ranges_complete;

  FragmentInstanceCounters aggregate_counters;

  BackendExecState(QuerySchedule& schedule, Coordinator* coord,
      const TNetworkAddress& coord_address,
      int backend_num, const TPlanFragment& fragment, int fragment_idx,
      const FragmentExecParams& params, int instance_idx,
      DebugOptions* debug_options, ObjectPool* obj_pool)
    : fragment_instance_id(params.instance_ids[instance_idx]),
      backend_address(params.hosts[instance_idx]),
      total_split_size(0),
      fragment_idx(fragment_idx),
      instance_idx(instance_idx),
      initiated(false),
      done(false),
      profile_created(false),
      total_ranges_complete(0) {
    stringstream ss;
    ss << "Instance " << PrintId(fragment_instance_id)
       << " (host=" << backend_address << ")";
    profile = obj_pool->Add(new RuntimeProfile(obj_pool, ss.str()));
    coord->SetExecPlanFragmentParams(schedule, backend_num, fragment, fragment_idx,
        params, instance_idx, coord_address, &rpc_params);
    if (debug_options != NULL) {
      rpc_params.params.__set_debug_node_id(debug_options->node_id);
      rpc_params.params.__set_debug_action(debug_options->action);
      rpc_params.params.__set_debug_phase(debug_options->phase);
    }
    ComputeTotalSplitSize();
  }

  // Computes sum of split sizes of leftmost scan. Call only after setting
  // exec_params.
  void ComputeTotalSplitSize();

  // Return value of throughput counter for given plan_node_id, or 0 if that node
  // doesn't exist.
  // Thread-safe.
  int64_t GetNodeThroughput(int plan_node_id);

  // Return number of completed scan ranges for plan_node_id, or 0 if that node
  // doesn't exist.
  // Thread-safe.
  int64_t GetNumScanRangesCompleted(int plan_node_id);

  // Updates the total number of scan ranges complete for this fragment. Returns
  // the delta since the last time this was called.
  // lock must be taken before calling this.
  int64_t UpdateNumScanRangesCompleted();
};
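
// Lock-ordering sketch (illustrative, not code from this file): when both the
// query-wide lock and a per-fragment lock are needed, the coordinator lock must be
// acquired first, per the comment on BackendExecState::lock above:
//   lock_guard<mutex> l(lock_);              // 1. Coordinator::lock_
//   lock_guard<mutex> l2(exec_state->lock);  // 2. BackendExecState::lock
// Acquiring them in the opposite order on another thread risks deadlock.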

void Coordinator::BackendExecState::ComputeTotalSplitSize() {
  const PerNodeScanRanges& per_node_scan_ranges = rpc_params.params.per_node_scan_ranges;
  total_split_size = 0;
  BOOST_FOREACH(const PerNodeScanRanges::value_type& entry, per_node_scan_ranges) {
    BOOST_FOREACH(const TScanRangeParams& scan_range_params, entry.second) {
      if (!scan_range_params.scan_range.__isset.hdfs_file_split) continue;
      total_split_size += scan_range_params.scan_range.hdfs_file_split.length;
    }
  }
}

int64_t Coordinator::BackendExecState::GetNodeThroughput(int plan_node_id) {
  RuntimeProfile::Counter* counter = NULL;
  {
    lock_guard<mutex> l(lock);
    CounterMap& throughput_counters = aggregate_counters.throughput_counters;
    CounterMap::iterator i = throughput_counters.find(plan_node_id);
    if (i == throughput_counters.end()) return 0;
    counter = i->second;
  }
  DCHECK(counter != NULL);
  // make sure not to hold lock when calling value() to avoid potential deadlocks
  return counter->value();
}

int64_t Coordinator::BackendExecState::GetNumScanRangesCompleted(int plan_node_id) {
  RuntimeProfile::Counter* counter = NULL;
  {
    lock_guard<mutex> l(lock);
    CounterMap& ranges_complete = aggregate_counters.scan_ranges_complete_counters;
    CounterMap::iterator i = ranges_complete.find(plan_node_id);
    if (i == ranges_complete.end()) return 0;
    counter = i->second;
  }
  DCHECK(counter != NULL);
  // make sure not to hold lock when calling value() to avoid potential deadlocks
  return counter->value();
}

int64_t Coordinator::BackendExecState::UpdateNumScanRangesCompleted() {
  int64_t total = 0;
  CounterMap& complete = aggregate_counters.scan_ranges_complete_counters;
  for (CounterMap::iterator i = complete.begin(); i != complete.end(); ++i) {
    total += i->second->value();
  }
  int64_t delta = total - total_ranges_complete;
  total_ranges_complete = total;
  DCHECK_GE(delta, 0);
  return delta;
}
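
// For example, if the completion counters currently sum to 10 and the previous call
// left total_ranges_complete at 7, this returns a delta of 3 and advances the total
// to 10; UpdateFragmentExecStatus() feeds that delta into progress_.Update().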

Coordinator::Coordinator(ExecEnv* exec_env, RuntimeProfile::EventSequence* events)
  : exec_env_(exec_env),
    has_called_wait_(false),
    returned_all_results_(false),
    executor_(NULL),  // Set in Prepare()
    query_mem_tracker_(),  // Set in Exec()
    num_remaining_backends_(0),
    obj_pool_(new ObjectPool()),
    query_events_(events) {
}

Coordinator::~Coordinator() {
  query_mem_tracker_.reset();
}

TExecNodePhase::type GetExecNodePhase(const string& key) {
  map<int, const char*>::const_iterator entry =
      _TExecNodePhase_VALUES_TO_NAMES.begin();
  for (; entry != _TExecNodePhase_VALUES_TO_NAMES.end(); ++entry) {
    if (iequals(key, (*entry).second)) {
      return static_cast<TExecNodePhase::type>(entry->first);
    }
  }
  return TExecNodePhase::INVALID;
}
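
// The lookup above is case-insensitive (iequals), so e.g. GetExecNodePhase("getnext")
// and GetExecNodePhase("GETNEXT") both map to TExecNodePhase::GETNEXT; unrecognized
// keys fall through to TExecNodePhase::INVALID.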

// TODO: templatize this
TDebugAction::type GetDebugAction(const string& key) {
  map<int, const char*>::const_iterator entry =
      _TDebugAction_VALUES_TO_NAMES.begin();
  for (; entry != _TDebugAction_VALUES_TO_NAMES.end(); ++entry) {
    if (iequals(key, (*entry).second)) {
      return static_cast<TDebugAction::type>(entry->first);
    }
  }
  return TDebugAction::WAIT;
}

static void ProcessQueryOptions(
    const TQueryOptions& query_options, DebugOptions* debug_options) {
  DCHECK(debug_options != NULL);
  if (!query_options.__isset.debug_action || query_options.debug_action.empty()) {
    debug_options->phase = TExecNodePhase::INVALID;  // signal not set
    return;
  }
  vector<string> components;
  split(components, query_options.debug_action, is_any_of(":"), token_compress_on);
  if (components.size() < 3 || components.size() > 4) return;
  if (components.size() == 3) {
    debug_options->backend_num = -1;
    debug_options->node_id = atoi(components[0].c_str());
    debug_options->phase = GetExecNodePhase(components[1]);
    debug_options->action = GetDebugAction(components[2]);
  } else {
    debug_options->backend_num = atoi(components[0].c_str());
    debug_options->node_id = atoi(components[1].c_str());
    debug_options->phase = GetExecNodePhase(components[2]);
    debug_options->action = GetDebugAction(components[3]);
  }
  DCHECK(!(debug_options->phase == TExecNodePhase::CLOSE &&
      debug_options->action == TDebugAction::WAIT))
      << "Do not use CLOSE:WAIT debug actions "
      << "because nodes cannot be cancelled in Close()";
}
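
// Worked example (hypothetical option values): debug_action="1:0:GETNEXT:WAIT"
// parses to backend_num=1, node_id=0, phase=GETNEXT, action=WAIT, i.e. "make plan
// node 0 block in GetNext(), but only on backend #1". The three-component form
// debug_action="0:PREPARE:FAIL" omits the backend number, so backend_num=-1 and the
// action applies to node 0 on every backend.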

Status Coordinator::Exec(QuerySchedule& schedule,
    vector<ExprContext*>* output_expr_ctxs) {
  const TQueryExecRequest& request = schedule.request();
  DCHECK_GT(request.fragments.size(), 0);
  needs_finalization_ = request.__isset.finalize_params;
  if (needs_finalization_) {
    finalize_params_ = request.finalize_params;
  }

  VLOG_QUERY << "Exec() query_id=" << schedule.query_id();
  stmt_type_ = request.stmt_type;
  query_id_ = schedule.query_id();
  desc_tbl_ = request.desc_tbl;
  query_ctx_ = request.query_ctx;

  query_profile_.reset(
      new RuntimeProfile(obj_pool(), "Execution Profile " + PrintId(query_id_)));
  finalization_timer_ = ADD_TIMER(query_profile_, "FinalizationTimer");

  SCOPED_TIMER(query_profile_->total_time_counter());

  vector<FragmentExecParams>* fragment_exec_params = schedule.exec_params();
  TNetworkAddress coord = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);

  // to keep things simple, make async Cancel() calls wait until plan fragment
  // execution has been initiated, otherwise we might try to cancel fragment
  // execution at backends where it hasn't even started
  lock_guard<mutex> l(lock_);

  // we run the root fragment ourselves if it is unpartitioned
  bool has_coordinator_fragment =
      request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;

  if (has_coordinator_fragment) {
    executor_.reset(new PlanFragmentExecutor(
        exec_env_, PlanFragmentExecutor::ReportStatusCallback()));
    // If a coordinator fragment is requested (for most queries this
    // will be the case; the exception is parallel INSERT queries), start
    // this before starting any more plan fragments in backend threads,
    // otherwise they start sending data before the local exchange node
    // had a chance to register with the stream mgr.
    TExecPlanFragmentParams rpc_params;
    SetExecPlanFragmentParams(schedule, 0, request.fragments[0], 0,
        (*fragment_exec_params)[0], 0, coord, &rpc_params);
    RETURN_IF_ERROR(executor_->Prepare(rpc_params));

    // Prepare output_expr_ctxs before optimizing the LLVM module. The other exprs of
    // this coordinator fragment have been prepared in executor_->Prepare().
    DCHECK(output_expr_ctxs != NULL);
    RETURN_IF_ERROR(Expr::CreateExprTrees(
        runtime_state()->obj_pool(), request.fragments[0].output_exprs,
        output_expr_ctxs));
    MemTracker* output_expr_tracker = runtime_state()->obj_pool()->Add(new MemTracker(
        -1, -1, "Output exprs", runtime_state()->instance_mem_tracker(), false));
    RETURN_IF_ERROR(Expr::Prepare(
        *output_expr_ctxs, runtime_state(), row_desc(), output_expr_tracker));
  } else {
    // The coordinator instance may require a query mem tracker even if there is no
    // coordinator fragment. For example, result-caching tracks memory via the query mem
    // tracker.
    // If there is a fragment, the fragment executor created above initializes the query
    // mem tracker. If not, the query mem tracker is created here.
    int64_t query_limit = -1;
    if (query_ctx_.request.query_options.__isset.mem_limit &&
        query_ctx_.request.query_options.mem_limit > 0) {
      query_limit = query_ctx_.request.query_options.mem_limit;
    }
    MemTracker* pool_tracker = MemTracker::GetRequestPoolMemTracker(
        schedule.request_pool(), exec_env_->process_mem_tracker());
    query_mem_tracker_ =
        MemTracker::GetQueryMemTracker(query_id_, query_limit, -1, pool_tracker, NULL);

    executor_.reset(NULL);
  }

  // Initialize the execution profile structures.
  InitExecProfile(request);

  DebugOptions debug_options;
  ProcessQueryOptions(schedule.query_options(), &debug_options);

  // start fragment instances from left to right, so that receivers have
  // Prepare()'d before senders start sending
  backend_exec_states_.resize(schedule.num_backends());
  num_remaining_backends_ = schedule.num_backends();
  VLOG_QUERY << "starting " << schedule.num_backends()
             << " backends for query " << query_id_;

  query_events_->MarkEvent("Ready to start remote fragments");
  int backend_num = 0;
  StatsMetric<double> latencies("fragment-latencies", TUnit::TIME_NS);
  for (int fragment_idx = (has_coordinator_fragment ? 1 : 0);
       fragment_idx < request.fragments.size(); ++fragment_idx) {
    const FragmentExecParams& params = (*fragment_exec_params)[fragment_idx];

    // set up exec states
    int num_hosts = params.hosts.size();
    DCHECK_GT(num_hosts, 0);
    for (int instance_idx = 0; instance_idx < num_hosts; ++instance_idx) {
      DebugOptions* backend_debug_options =
          (debug_options.phase != TExecNodePhase::INVALID
            && (debug_options.backend_num == -1
                || debug_options.backend_num == backend_num)
            ? &debug_options
            : NULL);
      // TODO: pool of pre-formatted BackendExecStates?
      BackendExecState* exec_state =
          obj_pool()->Add(new BackendExecState(schedule, this, coord, backend_num,
              request.fragments[fragment_idx], fragment_idx,
              params, instance_idx, backend_debug_options, obj_pool()));
      backend_exec_states_[backend_num] = exec_state;
      ++backend_num;
      VLOG(2) << "Exec(): starting instance: fragment_idx=" << fragment_idx
              << " instance_id=" << params.instance_ids[instance_idx];
    }
    fragment_profiles_[fragment_idx].num_instances = num_hosts;

    // Issue all rpcs in parallel
    Status fragments_exec_status = ParallelExecutor::Exec(
        bind<Status>(mem_fn(&Coordinator::ExecRemoteFragment), this, _1),
        reinterpret_cast<void**>(&backend_exec_states_[backend_num - num_hosts]),
        num_hosts, &latencies);

    if (!fragments_exec_status.ok()) {
      DCHECK(query_status_.ok());  // nobody should have been able to cancel
      query_status_ = fragments_exec_status;
      // tear down running fragments and return
      CancelInternal();
      return fragments_exec_status;
    }
  }

  query_events_->MarkEvent("Remote fragments started");
  query_profile_->AddInfoString("Fragment start latencies",
      latencies.ToHumanReadable());

  // If we have a coordinator fragment and remote fragments (the common case),
  // release the thread token on the coordinator fragment. This fragment
  // spends most of the time waiting and doing very little work. Holding on to
  // the token causes underutilization of the machine. If there are 12 queries
  // on this node, that's 12 tokens reserved for no reason.
  if (has_coordinator_fragment && request.fragments.size() > 1) {
    executor_->ReleaseThreadToken();
  }

  PrintBackendInfo();
  CreateAggregateCounters(request.fragments);

  stringstream ss;
  ss << "Query " << query_id_;
  progress_ = ProgressUpdater(ss.str(), schedule.num_scan_ranges());

  return Status::OK;
}

Status Coordinator::GetStatus() {
  lock_guard<mutex> l(lock_);
  return query_status_;
}

Status Coordinator::UpdateStatus(const Status& status, const TUniqueId* instance_id) {
  {
    lock_guard<mutex> l(lock_);

    // The query is done and we are just waiting for remote fragments to clean up.
    // Ignore their cancelled updates.
    if (returned_all_results_ && status.IsCancelled()) return query_status_;

    // nothing to update
    if (status.ok()) return query_status_;

    // don't override an error status; also, cancellation has already started
    if (!query_status_.ok()) return query_status_;

    query_status_ = status;
    CancelInternal();
  }

  // Log the id of the fragment that first failed so we can track it down easier.
  if (instance_id != NULL) {
    VLOG_QUERY << "Query id=" << query_id_ << " failed because fragment id="
               << *instance_id << " failed.";
  }

  return query_status_;
}
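
// Precedence example (hypothetical failure): if backend A reports an HDFS read error
// and the ensuing cancellation later causes backend B to report CANCELLED, the first
// call stores the read error in query_status_ and runs CancelInternal(); the second
// call returns early, so the root cause is what GetStatus() keeps reporting.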

void Coordinator::PopulatePathPermissionCache(hdfsFS fs, const string& path_str,
    PermissionCache* permissions_cache) {
  // Find out if the path begins with a hdfs:// -style prefix, and remove it and the
  // location (e.g. host:port) if so.
  size_t scheme_end = path_str.find("://");
  string stripped_str;
  if (scheme_end != string::npos) {
    // Skip past the subsequent location:port/ prefix.
    stripped_str = path_str.substr(path_str.find("/", scheme_end + 3));
  } else {
    stripped_str = path_str;
  }

  // Get the list of path components, used to build all path prefixes.
  vector<string> components;
  split(components, stripped_str, is_any_of("/"));

  // Build a set of all prefixes (including the complete string) of stripped_path. So
  // /a/b/c/d leads to a vector of: /a, /a/b, /a/b/c, /a/b/c/d
  vector<string> prefixes;
  // Stores the current prefix
  stringstream accumulator;
  BOOST_FOREACH(const string& component, components) {
    if (component.empty()) continue;
    accumulator << "/" << component;
    prefixes.push_back(accumulator.str());
  }

  // Now for each prefix, stat() it to see if a) it exists and b) if so what its
  // permissions are. When we encounter a directory that doesn't exist, we record the
  // fact that we need to create it, and the permissions of its parent dir for it to
  // inherit.
  //
  // Every prefix is recorded in the PermissionCache so we don't do more than one stat()
  // for each path. If we need to create the directory, we record it as the pair (true,
  // perms) so that the caller can identify which directories need their permissions
  // explicitly set.

  // Set to the permission of the immediate parent (i.e. the permissions to inherit if
  // the current dir doesn't exist).
  short permissions = 0;
  BOOST_FOREACH(const string& path, prefixes) {
    PermissionCache::const_iterator it = permissions_cache->find(path);
    if (it == permissions_cache->end()) {
      hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str());
      if (info != NULL) {
        // File exists, so fill the cache with its current permissions.
        permissions_cache->insert(
            make_pair(path, make_pair(false, info->mPermissions)));
        permissions = info->mPermissions;
        hdfsFreeFileInfo(info, 1);
      } else {
        // File doesn't exist, so we need to set its permissions to its immediate
        // parent's once it's been created.
        permissions_cache->insert(make_pair(path, make_pair(true, permissions)));
      }
    } else {
      permissions = it->second.second;
    }
  }
}
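
// Worked example (hypothetical paths): for path_str "hdfs://nn:8020/warehouse/t/p=1"
// where /warehouse exists with mode 0775 but /warehouse/t and /warehouse/t/p=1 do
// not, the cache ends up as:
//   "/warehouse"       -> (false, 0775)  // exists; nothing to create
//   "/warehouse/t"     -> (true, 0775)   // create, inheriting from /warehouse
//   "/warehouse/t/p=1" -> (true, 0775)   // create, inheriting transitively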

Status Coordinator::FinalizeSuccessfulInsert() {
  PermissionCache permissions_cache;
  hdfsFS hdfs_connection;
  // InsertStmt ensures that all partitions are on the same filesystem as the table's
  // base directory, so opening a single connection is okay.
  // TODO: modify this code so that restriction can be lifted.
  RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
      finalize_params_.hdfs_base_dir, &hdfs_connection));

  // INSERT finalization happens in the following five steps
  // 1. If OVERWRITE, remove all the files in the target directory
  // 2. Create all the necessary partition directories.
  HdfsOperationSet partition_create_ops(&hdfs_connection);
  DescriptorTbl* descriptor_table;
  DescriptorTbl::Create(obj_pool(), desc_tbl_, &descriptor_table);
  HdfsTableDescriptor* hdfs_table = static_cast<HdfsTableDescriptor*>(
      descriptor_table->GetTableDescriptor(finalize_params_.table_id));
  DCHECK(hdfs_table != NULL) << "INSERT target table not known in descriptor table: "
                             << finalize_params_.table_id;

  // Loop over all partitions that were updated by this insert, and create the set of
  // filesystem operations required to create the correct partition structure on disk.
  BOOST_FOREACH(const PartitionStatusMap::value_type& partition, per_partition_status_) {
    SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "Overwrite/PartitionCreationTimer",
        "FinalizationTimer"));

    // Look up the partition in the descriptor table.
    stringstream part_path_ss;
    if (partition.second.id == -1) {
      // If this is a non-existent partition, use the default partition location of
      // <base_dir>/part_key_1=val/part_key_2=val/...
      part_path_ss << finalize_params_.hdfs_base_dir << "/" << partition.first;
    } else {
      HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id);
      DCHECK(part != NULL) << "Partition " << partition.second.id
                           << " not known in descriptor table";
      part_path_ss << part->location();
    }
    const string& part_path = part_path_ss.str();

    // If this is an overwrite insert, we will need to delete any updated partitions
    if (finalize_params_.is_overwrite) {
      if (partition.first.empty()) {
        // If the root directory is written to, then the table must not be partitioned
        DCHECK(per_partition_status_.size() == 1);
        // We need to be a little more careful, and only delete data files in the root
        // because the tmp directories the sink(s) wrote are there also.
        // So only delete files in the table directory - all files are treated as data
        // files by Hive and Impala, but directories are ignored (and may legitimately
        // be used to store permanent non-table data by other applications).
        int num_files = 0;
        hdfsFileInfo* existing_files =
            hdfsListDirectory(hdfs_connection, part_path.c_str(), &num_files);
        if (existing_files == NULL) {
          return GetHdfsErrorMsg("Could not list directory: ", part_path);
        }
        for (int i = 0; i < num_files; ++i) {
          const string filename = path(existing_files[i].mName).filename().string();
          if (existing_files[i].mKind == kObjectKindFile && !IsHiddenFile(filename)) {
            partition_create_ops.Add(DELETE, existing_files[i].mName);
          }
        }
        hdfsFreeFileInfo(existing_files, num_files);
      } else {
        // This is a partition directory, not the root directory; we can delete it
        // recursively with abandon, after first checking that it exists.
        // TODO: There's a potential race here between checking for the directory
        // and a third party deleting it.
        if (FLAGS_insert_inherit_permissions) {
          PopulatePathPermissionCache(hdfs_connection, part_path, &permissions_cache);
        }
        if (hdfsExists(hdfs_connection, part_path.c_str()) != -1) {
          partition_create_ops.Add(DELETE_THEN_CREATE, part_path);
        } else {
          // Otherwise just create the directory.
          partition_create_ops.Add(CREATE_DIR, part_path);
        }
      }
    } else {
      if (FLAGS_insert_inherit_permissions) {
        PopulatePathPermissionCache(hdfs_connection, part_path, &permissions_cache);
      }
      if (hdfsExists(hdfs_connection, part_path.c_str()) == -1) {
        partition_create_ops.Add(CREATE_DIR, part_path);
      }
    }
  }

  {
    SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "Overwrite/PartitionCreationTimer",
        "FinalizationTimer"));
    if (!partition_create_ops.Execute(exec_env_->hdfs_op_thread_pool(), false)) {
      BOOST_FOREACH(const HdfsOperationSet::Error& err, partition_create_ops.errors()) {
        // It's ok to ignore errors creating the directories, since they may already
        // exist. If there are permission errors, we'll run into them later.
        if (err.first->op() != CREATE_DIR) {
          stringstream ss;
          ss << "Error(s) deleting partition directories. First error (of "
             << partition_create_ops.errors().size() << ") was: " << err.second;
          return Status(ss.str());
        }
      }
    }
  }

  // 3. Move all tmp files
  HdfsOperationSet move_ops(&hdfs_connection);
  HdfsOperationSet dir_deletion_ops(&hdfs_connection);

  BOOST_FOREACH(FileMoveMap::value_type& move, files_to_move_) {
    // An empty destination means delete, so this is a directory. These get deleted in
    // a separate pass to ensure that we have moved all the contents of the directory
    // first.
    if (move.second.empty()) {
      VLOG_ROW << "Deleting file: " << move.first;
      dir_deletion_ops.Add(DELETE, move.first);
    } else {
      VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second;
      move_ops.Add(RENAME, move.first, move.second);
    }
  }

  {
    SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileMoveTimer", "FinalizationTimer"));
    if (!move_ops.Execute(exec_env_->hdfs_op_thread_pool(), false)) {
      stringstream ss;
      ss << "Error(s) moving partition files. First error (of "
         << move_ops.errors().size() << ") was: " << move_ops.errors()[0].second;
      return Status(ss.str());
    }
  }

  // 4. Delete temp directories
  {
    SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileDeletionTimer",
        "FinalizationTimer"));
    if (!dir_deletion_ops.Execute(exec_env_->hdfs_op_thread_pool(), false)) {
      stringstream ss;
      ss << "Error(s) deleting staging directories. First error (of "
         << dir_deletion_ops.errors().size() << ") was: "
         << dir_deletion_ops.errors()[0].second;
      return Status(ss.str());
    }
  }

  // 5. Optionally update the permissions of the created partition directories
  // Do this last in case we make the dirs unwritable.
  if (FLAGS_insert_inherit_permissions) {
    HdfsOperationSet chmod_ops(&hdfs_connection);
    BOOST_FOREACH(const PermissionCache::value_type& perm, permissions_cache) {
      bool new_dir = perm.second.first;
      if (new_dir) {
        short permissions = perm.second.second;
        VLOG_QUERY << "INSERT created new directory: " << perm.first
                   << ", inherited permissions are: " << oct << permissions;
        chmod_ops.Add(CHMOD, perm.first, permissions);
      }
    }
    if (!chmod_ops.Execute(exec_env_->hdfs_op_thread_pool(), false)) {
      stringstream ss;
      ss << "Error(s) setting permissions on newly created partition directories. First"
         << " error (of " << chmod_ops.errors().size() << ") was: "
         << chmod_ops.errors()[0].second;
      return Status(ss.str());
    }
  }

  return Status::OK;
}

Status Coordinator::FinalizeQuery() {
  // All backends must have reported their final statuses before finalization, which is
  // a post-condition of Wait. If the query was not successful, still try to clean up
  // the staging directory.
  DCHECK(has_called_wait_);
  DCHECK(needs_finalization_);

  VLOG_QUERY << "Finalizing query: " << query_id_;
  SCOPED_TIMER(finalization_timer_);
  Status return_status = GetStatus();
  if (return_status.ok()) {
    return_status = FinalizeSuccessfulInsert();
  }

  stringstream staging_dir;
  DCHECK(finalize_params_.__isset.staging_dir);
  staging_dir << finalize_params_.staging_dir << "/" << PrintId(query_id_, "_") << "/";

  hdfsFS hdfs_conn;
  RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(staging_dir.str(), &hdfs_conn));
  VLOG_QUERY << "Removing staging directory: " << staging_dir.str();
  hdfsDelete(hdfs_conn, staging_dir.str().c_str(), 1);

  return return_status;
}
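
// The staging directory removed above is per-query: PrintId(query_id_, "_") renders
// the query id with '_' as the separator, so the path has the (illustrative) shape
// <staging_dir>/<query_id_hi>_<query_id_lo>/ and one query's cleanup cannot touch
// another query's staging data.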

Status Coordinator::WaitForAllBackends() {
  unique_lock<mutex> l(lock_);
  while (num_remaining_backends_ > 0 && query_status_.ok()) {
    VLOG_QUERY << "Coordinator waiting for backends to finish, "
               << num_remaining_backends_ << " remaining";
    backend_completion_cv_.wait(l);
  }
  if (query_status_.ok()) {
    VLOG_QUERY << "All backends finished successfully.";
  } else {
    VLOG_QUERY << "All backends finished due to one or more errors.";
  }

  return query_status_;
}

Status Coordinator::Wait() {
  lock_guard<mutex> l(wait_lock_);
  SCOPED_TIMER(query_profile_->total_time_counter());
  if (has_called_wait_) return Status::OK;
  has_called_wait_ = true;
  Status return_status = Status::OK;
  if (executor_.get() != NULL) {
    // Open() may block
    return_status = UpdateStatus(executor_->Open(), NULL);

    if (return_status.ok()) {
      // If the coordinator fragment has a sink, it will have finished executing at this
      // point. It's safe therefore to copy the set of files to move and updated
      // partitions into the query-wide set.
      RuntimeState* state = runtime_state();
      DCHECK(state != NULL);

      // No other backends should have updated these structures if the coordinator has a
      // fragment. (Backends have a sink only if the coordinator does not)
      DCHECK_EQ(files_to_move_.size(), 0);
      DCHECK_EQ(per_partition_status_.size(), 0);

      // Because there are no other updates, it is safe to copy the maps rather than
      // merge them.
      files_to_move_ = *state->hdfs_files_to_move();
      per_partition_status_ = *state->per_partition_status();
    }
  } else {
    // Query finalization can only happen when all backends have reported
    // relevant state. They only have relevant state to report in the parallel
    // INSERT case, otherwise all the relevant state is from the coordinator
    // fragment which will be available after Open() returns.
    // Ignore the returned status if finalization is required, since FinalizeQuery()
    // will pick it up and needs to execute regardless.
    Status status = WaitForAllBackends();
    if (!needs_finalization_ && !status.ok()) return status;
  }

  // Query finalization is required only for HDFS table sinks
  if (needs_finalization_) {
    RETURN_IF_ERROR(FinalizeQuery());
  }

  if (stmt_type_ == TStmtType::DML) {
    query_profile_->AddInfoString("Insert Stats",
        DataSink::OutputInsertStats(per_partition_status_, "\n"));
    // For DML queries, when Wait is done, the query is complete. Report aggregate
    // query profiles at this point.
    // TODO: make sure ReportQuerySummary gets called on error
    ReportQuerySummary();
  }

  return return_status;
}

Status Coordinator::GetNext(RowBatch** batch, RuntimeState* state) {
  VLOG_ROW << "GetNext() query_id=" << query_id_;
  DCHECK(has_called_wait_);
  SCOPED_TIMER(query_profile_->total_time_counter());

  if (executor_.get() == NULL) {
    // If there is no local fragment, we produce no output, and execution will
    // have finished after Wait.
    *batch = NULL;
    return GetStatus();
  }

  // do not acquire lock_ here, otherwise we could block and prevent an async
  // Cancel() from proceeding
  Status status = executor_->GetNext(batch);

  // if there was an error, we need to return the query's error status rather than
  // the status we just got back from the local executor (which may well be CANCELLED
  // in that case). The coordinator fragment failed in this case, so we log the
  // query_id.
  RETURN_IF_ERROR(UpdateStatus(status, &runtime_state()->fragment_instance_id()));

  if (*batch == NULL) {
    returned_all_results_ = true;
    if (executor_->ReachedLimit()) {
      // We've reached the query limit, cancel the remote fragments. The
      // Exchange node on our fragment is no longer receiving rows so the
      // remote fragments must be explicitly cancelled.
      CancelRemoteFragments();
      RuntimeState* state = runtime_state();
      if (state != NULL) {
        // Cancel the streams receiving batches. The exchange nodes that would
        // normally read from the streams are done.
        state->stream_mgr()->Cancel(state->fragment_instance_id());
      }
    }

    // Don't return the final NULL until all backends have completed.
    // GetNext must wait for all backends to complete before
    // ultimately signalling the end of execution via a NULL
    // batch. After NULL is returned, the coordinator may tear down
    // query state, and perform post-query finalization which might
    // depend on the reports from all backends.
    RETURN_IF_ERROR(WaitForAllBackends());
    if (query_status_.ok()) {
      // If the query completed successfully, report aggregate query profiles.
      ReportQuerySummary();
    }
  }
  return Status::OK;
}
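
// Typical driver loop (sketch only; the caller-side names are hypothetical, but the
// Exec()/Wait()/GetNext() sequence matches the contract described above):
//   RETURN_IF_ERROR(coord->Exec(schedule, &output_expr_ctxs));
//   RETURN_IF_ERROR(coord->Wait());  // blocks until rows (or all backends) are ready
//   RowBatch* batch;
//   do {
//     RETURN_IF_ERROR(coord->GetNext(&batch, coord->runtime_state()));
//     // ... consume batch ...
//   } while (batch != NULL);         // NULL batch signals end of execution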

void Coordinator::PrintBackendInfo() {
  for (int i = 0; i < backend_exec_states_.size(); ++i) {
    SummaryStats& acc =
        fragment_profiles_[backend_exec_states_[i]->fragment_idx].bytes_assigned;
    acc(backend_exec_states_[i]->total_split_size);
  }

  for (int i = (executor_.get() == NULL ? 0 : 1); i < fragment_profiles_.size(); ++i) {
    SummaryStats& acc = fragment_profiles_[i].bytes_assigned;
    double min = accumulators::min(acc);
    double max = accumulators::max(acc);
    double mean = accumulators::mean(acc);
    double stddev = sqrt(accumulators::variance(acc));
    stringstream ss;
    ss << " min: " << PrettyPrinter::Print(min, TUnit::BYTES)
       << ", max: " << PrettyPrinter::Print(max, TUnit::BYTES)
       << ", avg: " << PrettyPrinter::Print(mean, TUnit::BYTES)
       << ", stddev: " << PrettyPrinter::Print(stddev, TUnit::BYTES);
    fragment_profiles_[i].averaged_profile->AddInfoString("split sizes", ss.str());

    if (VLOG_FILE_IS_ON) {
      VLOG_FILE << "Byte split for fragment " << i << " " << ss.str();
      for (int j = 0; j < backend_exec_states_.size(); ++j) {
        BackendExecState* exec_state = backend_exec_states_[j];
        if (exec_state->fragment_idx != i) continue;
        VLOG_FILE << "data volume for ipaddress " << exec_state->backend_address << ": "
                  << PrettyPrinter::Print(
                      exec_state->total_split_size, TUnit::BYTES);
      }
    }
  }
}

void Coordinator::InitExecProfile(const TQueryExecRequest& request) {
  // Initialize the structure to collect execution summary of every plan node.
  exec_summary_.__isset.nodes = true;
  for (int i = 0; i < request.fragments.size(); ++i) {
    if (!request.fragments[i].__isset.plan) continue;
    const TPlan& plan = request.fragments[i].plan;
    int fragment_first_node_idx = exec_summary_.nodes.size();

    for (int j = 0; j < plan.nodes.size(); ++j) {
      TPlanNodeExecSummary node;
      node.node_id = plan.nodes[j].node_id;
      node.fragment_id = i;
      node.label = plan.nodes[j].label;
      node.__set_label_detail(plan.nodes[j].label_detail);
      node.num_children = plan.nodes[j].num_children;

      if (plan.nodes[j].__isset.estimated_stats) {
        node.__set_estimated_stats(plan.nodes[j].estimated_stats);
      }

      plan_node_id_to_summary_map_[plan.nodes[j].node_id] = exec_summary_.nodes.size();
      exec_summary_.nodes.push_back(node);
    }

    if (request.fragments[i].__isset.output_sink &&
        request.fragments[i].output_sink.type == TDataSinkType::DATA_STREAM_SINK) {
      const TDataStreamSink& sink = request.fragments[i].output_sink.stream_sink;
      int exch_idx = plan_node_id_to_summary_map_[sink.dest_node_id];
      if (sink.output_partition.type == TPartitionType::UNPARTITIONED) {
        exec_summary_.nodes[exch_idx].__set_is_broadcast(true);
      }
      exec_summary_.__isset.exch_to_sender_map = true;
      exec_summary_.exch_to_sender_map[exch_idx] = fragment_first_node_idx;
    }
  }

  if (executor_.get() != NULL) {
    // register coordinator's fragment profile now, before those of the backends,
    // so it shows up at the top
    query_profile_->AddChild(executor_->profile());
    executor_->profile()->set_name(Substitute("Coordinator Fragment $0",
        request.fragments[0].display_name));
    CollectScanNodeCounters(executor_->profile(), &coordinator_counters_);
  }

  // Initialize the runtime profile structure. This adds the per fragment average
  // profiles followed by the per fragment instance profiles.
  bool has_coordinator_fragment =
      request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
  fragment_profiles_.resize(request.fragments.size());
  for (int i = 0; i < request.fragments.size(); ++i) {
    fragment_profiles_[i].num_instances = 0;

    // Special case fragment idx 0 if there is a coordinator. There is only one
    // instance of this profile so the average is just the coordinator profile.
    if (i == 0 && has_coordinator_fragment) {
      fragment_profiles_[i].averaged_profile = executor_->profile();
      fragment_profiles_[i].num_instances = 1;
      continue;
    }
    fragment_profiles_[i].averaged_profile =
        obj_pool()->Add(new RuntimeProfile(obj_pool(),
            Substitute("Averaged Fragment $0", request.fragments[i].display_name),
            true));
    // Insert the avg profiles in ascending fragment number order. If
    // there is a coordinator fragment, it's been placed in
    // fragment_profiles_[0].averaged_profile, ensuring that this code
    // will put the first averaged profile immediately after it. If
    // there is no coordinator fragment, the first averaged profile
    // will be inserted as the first child of query_profile_, and then
    // all other averaged fragments will follow.
    query_profile_->AddChild(fragment_profiles_[i].averaged_profile, true,
        (i > 0) ? fragment_profiles_[i - 1].averaged_profile : NULL);

    fragment_profiles_[i].root_profile =
        obj_pool()->Add(new RuntimeProfile(obj_pool(),
            Substitute("Fragment $0", request.fragments[i].display_name)));
    // Note: we don't start the wall timer here for the fragment
    // profile; it's uninteresting and misleading.
    query_profile_->AddChild(fragment_profiles_[i].root_profile);
  }
}

void Coordinator::CollectScanNodeCounters(RuntimeProfile* profile,
    FragmentInstanceCounters* counters) {
  vector<RuntimeProfile*> children;
  profile->GetAllChildren(&children);
  for (int i = 0; i < children.size(); ++i) {
    RuntimeProfile* p = children[i];
    int id = ExecNode::GetNodeIdFromProfile(p);

    // This profile is not for an exec node.
    if (id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) continue;

    RuntimeProfile::Counter* throughput_counter =
        p->GetCounter(ScanNode::TOTAL_THROUGHPUT_COUNTER);
    if (throughput_counter != NULL) {
      counters->throughput_counters[id] = throughput_counter;
    }
    RuntimeProfile::Counter* scan_ranges_counter =
        p->GetCounter(ScanNode::SCAN_RANGES_COMPLETE_COUNTER);
    if (scan_ranges_counter != NULL) {
      counters->scan_ranges_complete_counters[id] = scan_ranges_counter;
    }
  }
}

void Coordinator::CreateAggregateCounters(
    const vector<TPlanFragment>& fragments) {
  BOOST_FOREACH(const TPlanFragment& fragment, fragments) {
    if (!fragment.__isset.plan) continue;
    const vector<TPlanNode>& nodes = fragment.plan.nodes;
    BOOST_FOREACH(const TPlanNode& node, nodes) {
      if (node.node_type != TPlanNodeType::HDFS_SCAN_NODE
          && node.node_type != TPlanNodeType::HBASE_SCAN_NODE) {
        continue;
      }

      stringstream s;
      s << PrintPlanNodeType(node.node_type) << " (id="
        << node.node_id << ") Throughput";
      query_profile_->AddDerivedCounter(s.str(), TUnit::BYTES_PER_SECOND,
          bind<int64_t>(mem_fn(&Coordinator::ComputeTotalThroughput),
              this, node.node_id));
      s.str("");
      s << PrintPlanNodeType(node.node_type) << " (id="
        << node.node_id << ") Completed scan ranges";
      query_profile_->AddDerivedCounter(s.str(), TUnit::UNIT,
          bind<int64_t>(mem_fn(&Coordinator::ComputeTotalScanRangesComplete),
              this, node.node_id));
    }
  }
}

int64_t Coordinator::ComputeTotalThroughput(int node_id) {
  int64_t value = 0;
  for (int i = 0; i < backend_exec_states_.size(); ++i) {
    BackendExecState* exec_state = backend_exec_states_[i];
    value += exec_state->GetNodeThroughput(node_id);
  }
  // Add up the local fragment throughput counter
  CounterMap& throughput_counters = coordinator_counters_.throughput_counters;
  CounterMap::iterator it = throughput_counters.find(node_id);
  if (it != throughput_counters.end()) {
    value += it->second->value();
  }
  return value;
}

int64_t Coordinator::ComputeTotalScanRangesComplete(int node_id) {
  int64_t value = 0;
  for (int i = 0; i < backend_exec_states_.size(); ++i) {
    BackendExecState* exec_state = backend_exec_states_[i];
    value += exec_state->GetNumScanRangesCompleted(node_id);
  }
  // Add up the local fragment's completed scan ranges counter
  CounterMap& scan_ranges_complete = coordinator_counters_.scan_ranges_complete_counters;
  CounterMap::iterator it = scan_ranges_complete.find(node_id);
  if (it != scan_ranges_complete.end()) {
    value += it->second->value();
  }
  return value;
}

Status Coordinator::ExecRemoteFragment(void* exec_state_arg) {
  BackendExecState* exec_state = reinterpret_cast<BackendExecState*>(exec_state_arg);
  VLOG_FILE << "making rpc: ExecPlanFragment query_id=" << query_id_
            << " instance_id=" << exec_state->fragment_instance_id
            << " host=" << exec_state->backend_address;
  lock_guard<mutex> l(exec_state->lock);

  Status status;
  ImpalaInternalServiceConnection backend_client(
      exec_env_->impalad_client_cache(), exec_state->backend_address, &status);
  RETURN_IF_ERROR(status);

  TExecPlanFragmentResult thrift_result;
  Status rpc_status = backend_client.DoRpc(&ImpalaInternalServiceClient::ExecPlanFragment,
      exec_state->rpc_params, &thrift_result);
  if (!rpc_status.ok()) {
    stringstream msg;
    msg << "ExecPlanRequest rpc query_id=" << query_id_
        << " instance_id=" << exec_state->fragment_instance_id
        << " failed: " << rpc_status.msg().msg();
    VLOG_QUERY << msg.str();
    exec_state->status = Status(msg.str());
    return exec_state->status;
  }

  exec_state->status = thrift_result.status;
  if (exec_state->status.ok()) {
    exec_state->initiated = true;
    exec_state->stopwatch.Start();
  }
  return exec_state->status;
}

void Coordinator::Cancel(const Status* cause) {
  lock_guard<mutex> l(lock_);
  // if the query status indicates an error, cancellation has already been initiated
  if (!query_status_.ok()) return;
  // prevent others from cancelling a second time
  query_status_ = (cause != NULL && !cause->ok()) ? *cause : Status::CANCELLED;
  CancelInternal();
}

void Coordinator::CancelInternal() {
  VLOG_QUERY << "Cancel() query_id=" << query_id_;
  DCHECK(!query_status_.ok());

  // cancel local fragment
  if (executor_.get() != NULL) executor_->Cancel();

  CancelRemoteFragments();

  // Report the summary with whatever progress the query made before being cancelled.
  ReportQuerySummary();
}

void Coordinator::CancelRemoteFragments() {
  for (int i = 0; i < backend_exec_states_.size(); ++i) {
    BackendExecState* exec_state = backend_exec_states_[i];

    // If a fragment failed before we finished issuing all remote fragments,
    // this function will have been called before we finished populating
    // backend_exec_states_. Skip any such uninitialized exec states.
    if (exec_state == NULL) continue;

    // lock each exec_state individually to synchronize correctly with
    // UpdateFragmentExecStatus() (which doesn't get the global lock_
    // to set its status)
    lock_guard<mutex> l(exec_state->lock);

    // no need to cancel if we already know it terminated w/ an error status
    if (!exec_state->status.ok()) continue;

    // Nothing to cancel if the exec rpc was not sent
    if (!exec_state->initiated) continue;

    // don't cancel if it already finished
    if (exec_state->done) continue;

    // set an error status to make sure we only cancel this once
    exec_state->status = Status::CANCELLED;

    // if we get an error while trying to get a connection to the backend,
    // keep going
    Status status;
    ImpalaInternalServiceConnection backend_client(
        exec_env_->impalad_client_cache(), exec_state->backend_address, &status);
    if (!status.ok()) {
      continue;
    }

    TCancelPlanFragmentParams params;
    params.protocol_version = ImpalaInternalServiceVersion::V1;
    params.__set_fragment_instance_id(exec_state->fragment_instance_id);
    TCancelPlanFragmentResult res;
    VLOG_QUERY << "sending CancelPlanFragment rpc for instance_id="
               << exec_state->fragment_instance_id << " backend="
               << exec_state->backend_address;
    Status rpc_status = backend_client.DoRpc(
        &ImpalaInternalServiceClient::CancelPlanFragment, params, &res);
    if (!rpc_status.ok()) {
      exec_state->status.MergeStatus(rpc_status);
      stringstream msg;
      msg << "CancelPlanFragment rpc query_id=" << query_id_
          << " instance_id=" << exec_state->fragment_instance_id
          << " failed: " << rpc_status.msg().msg();
      // make a note of the error status, but keep on cancelling the other fragments
      exec_state->status.AddDetail(msg.str());
      continue;
    }
    if (res.status.status_code != TErrorCode::OK) {
      exec_state->status.AddDetail(join(res.status.error_msgs, "; "));
    }
  }

  // notify that we completed with an error
  backend_completion_cv_.notify_all();
}

Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& params) {
  VLOG_FILE << "UpdateFragmentExecStatus() query_id=" << query_id_
            << " status=" << params.status.status_code
            << " done=" << (params.done ? "true" : "false");
  if (params.backend_num >= backend_exec_states_.size()) {
    return Status(TErrorCode::INTERNAL_ERROR, "unknown backend number");
  }
  BackendExecState* exec_state = backend_exec_states_[params.backend_num];

  const TRuntimeProfileTree& cumulative_profile = params.profile;
  Status status(params.status);
  {
    lock_guard<mutex> l(exec_state->lock);
    if (!status.ok()) {
      // During query cancellation, exec_state is set to CANCELLED. However, we might
      // process a non-error message from a fragment executor that is sent
      // before query cancellation is invoked. Make sure we don't go from error status
      // to OK.
      exec_state->status = status;
    }
    exec_state->done = params.done;
    if (exec_state->status.ok()) {
      // We can't update this backend's profile if ReportQuerySummary() is running,
      // because it depends on all profiles not changing during its execution (when it
      // calls SortChildren()). ReportQuerySummary() only gets called after
      // WaitForAllBackends() returns or at the end of CancelRemoteFragments().
      // WaitForAllBackends() only returns after all backends have completed (in which
      // case we wouldn't be in this function), or when there's an error, in which case
      // CancelRemoteFragments() is called. CancelRemoteFragments sets all exec_state's
      // statuses to cancelled.
      // TODO: We're losing this profile information. Call ReportQuerySummary only after
      // all backends have completed.
      exec_state->profile->Update(cumulative_profile);

      // Update the average profile for the fragment corresponding to this instance.
      exec_state->profile->ComputeTimeInProfile();
      UpdateAverageProfile(exec_state);
      UpdateExecSummary(exec_state->fragment_idx, exec_state->instance_idx,
          exec_state->profile);
    }
    if (!exec_state->profile_created) {
      CollectScanNodeCounters(exec_state->profile, &exec_state->aggregate_counters);
    }
    exec_state->profile_created = true;

    // Log messages aggregated by type
    if (params.__isset.error_log && params.error_log.size() > 0) {
      // Append the log messages from each update with the global state of the query
      // execution
      MergeErrorMaps(&exec_state->error_log, params.error_log);
      VLOG_FILE << "instance_id=" << exec_state->fragment_instance_id
                << " error log: " << PrintErrorMapToString(exec_state->error_log);
    }
    progress_.Update(exec_state->UpdateNumScanRangesCompleted());
  }

  if (params.done && params.__isset.insert_exec_status) {
    lock_guard<mutex> l(lock_);
    // Merge in table update data (partitions written to, files to be moved as part of
    // finalization)
    BOOST_FOREACH(const PartitionStatusMap::value_type& partition,
        params.insert_exec_status.per_partition_status) {
      TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
      status->num_appended_rows += partition.second.num_appended_rows;
      status->id = partition.second.id;
      if (!status->__isset.stats) status->__set_stats(TInsertStats());
      DataSink::MergeInsertStats(partition.second.stats, &status->stats);
    }
    files_to_move_.insert(
        params.insert_exec_status.files_to_move.begin(),
        params.insert_exec_status.files_to_move.end());
  }

  if (VLOG_FILE_IS_ON) {
    stringstream s;
    exec_state->profile->PrettyPrint(&s);
    VLOG_FILE << "profile for query_id=" << query_id_
              << " instance_id=" << exec_state->fragment_instance_id
              << "\n" << s.str();
  }
  // also print the cumulative profile
  // TODO: fix the coordinator/PlanFragmentExecutor, so this isn't needed
  if (VLOG_FILE_IS_ON) {
    stringstream s;
    query_profile_->PrettyPrint(&s);
    VLOG_FILE << "cumulative profile for query_id=" << query_id_
              << "\n" << s.str();
  }

  // for now, abort the query if we see any error except if the error is cancelled
  // and returned_all_results_ is true.
  // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
  if (!(returned_all_results_ && status.IsCancelled()) && !status.ok()) {
    UpdateStatus(status, &exec_state->fragment_instance_id);
    return Status::OK;
  }

  if (params.done) {
    lock_guard<mutex> l(lock_);
    exec_state->stopwatch.Stop();
    DCHECK_GT(num_remaining_backends_, 0);
    VLOG_QUERY << "Backend " << params.backend_num << " completed, "
               << num_remaining_backends_ - 1 << " remaining: query_id=" << query_id_;
    if (VLOG_QUERY_IS_ON && num_remaining_backends_ > 1) {
      // print host/port info for the first backend that's still in progress as a
      // debugging aid for backend deadlocks
      for (int i = 0; i < backend_exec_states_.size(); ++i) {
        BackendExecState* exec_state = backend_exec_states_[i];
        lock_guard<mutex> l2(exec_state->lock);
        if (!exec_state->done) {
          VLOG_QUERY << "query_id=" << query_id_ << ": first in-progress backend: "
                     << exec_state->backend_address;
          break;
        }
      }
    }
    if (--num_remaining_backends_ == 0) {
      backend_completion_cv_.notify_all();
    }
  }

  return Status::OK;
}

const RowDescriptor& Coordinator::row_desc() const {
  DCHECK(executor_.get() != NULL);
  return executor_->row_desc();
}

RuntimeState* Coordinator::runtime_state() {
  return executor_.get() == NULL ? NULL : executor_->runtime_state();
}

MemTracker* Coordinator::query_mem_tracker() {
  return executor_.get() == NULL ? query_mem_tracker_.get() :
      executor_->runtime_state()->query_mem_tracker();
}

bool Coordinator::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
  // Assume we are called only after all fragments have completed
  DCHECK(has_called_wait_);

  BOOST_FOREACH(const PartitionStatusMap::value_type& partition, per_partition_status_) {
    catalog_update->created_partitions.insert(partition.first);
  }

  return catalog_update->created_partitions.size() != 0;
}

// Comparator to order fragments by descending total time
typedef struct {
  typedef pair<RuntimeProfile*, bool> Profile;
  bool operator()(const Profile& a, const Profile& b) const {
    // Reverse ordering: we want the longest first
    return
        a.first->total_time_counter()->value() > b.first->total_time_counter()->value();
  }
} InstanceComparator;
1340 
1341 // Update fragment average profile information from a backend execution state.
1342 void Coordinator::UpdateAverageProfile(BackendExecState* backend_exec_state) {
1343  int fragment_idx = backend_exec_state->fragment_idx;
1344  DCHECK_GE(fragment_idx, 0);
1345  DCHECK_LT(fragment_idx, fragment_profiles_.size());
1346  PerFragmentProfileData& data = fragment_profiles_[fragment_idx];
1347 
1348  // No locks are taken since UpdateAverage() and AddChild() take their own locks
1349  data.averaged_profile->UpdateAverage(backend_exec_state->profile);
1350  data.root_profile->AddChild(backend_exec_state->profile);
1351 }
1352 
1353 // Compute fragment summary information from a backend execution state.
1354 void Coordinator::ComputeFragmentSummaryStats(BackendExecState* backend_exec_state) {
1355  int fragment_idx = backend_exec_state->fragment_idx;
1356  DCHECK_GE(fragment_idx, 0);
1357  DCHECK_LT(fragment_idx, fragment_profiles_.size());
1358  PerFragmentProfileData& data = fragment_profiles_[fragment_idx];
1359 
1360  int64_t completion_time = backend_exec_state->stopwatch.ElapsedTime();
1361  data.completion_times(completion_time);
1362  data.rates(backend_exec_state->total_split_size / (completion_time / 1000.0
1363  / 1000.0 / 1000.0));
1364 
1365  // Add the child in case it has not been added previously
1366  // via UpdateAverageProfile(). AddChild() will do nothing if the child
1367  // already exists.
1368  data.root_profile->AddChild(backend_exec_state->profile);
1369 }
1370 
1371 void Coordinator::UpdateExecSummary(int fragment_idx, int instance_idx,
1372  RuntimeProfile* profile) {
1373  vector<RuntimeProfile*> children;
1374  profile->GetAllChildren(&children);
1375 
1376  lock_guard<SpinLock> l(exec_summary_lock_);
1377  for (int i = 0; i < children.size(); ++i) {
1378  int id = ExecNode::GetNodeIdFromProfile(children[i]);
1379  if (id == -1) continue;
1380 
1381  TPlanNodeExecSummary& exec_summary =
1383  if (exec_summary.exec_stats.empty()) {
1384  // First time, make an exec_stats for each instance this plan node is running on.
1385  DCHECK_LT(fragment_idx, fragment_profiles_.size());
1386  exec_summary.exec_stats.resize(fragment_profiles_[fragment_idx].num_instances);
1387  }
1388  DCHECK_LT(instance_idx, exec_summary.exec_stats.size());
1389  TExecStats& stats = exec_summary.exec_stats[instance_idx];
1390 
1391  RuntimeProfile::Counter* rows_counter = children[i]->GetCounter("RowsReturned");
1392  RuntimeProfile::Counter* mem_counter = children[i]->GetCounter("PeakMemoryUsage");
1393  if (rows_counter != NULL) stats.__set_cardinality(rows_counter->value());
1394  if (mem_counter != NULL) stats.__set_memory_used(mem_counter->value());
1395  stats.__set_latency_ns(children[i]->local_time());
1396  // TODO: we don't track CPU time per node yet. Add that.
1397  exec_summary.__isset.exec_stats = true;
1398  }
1399  VLOG(2) << PrintExecSummary(exec_summary_);
1400 }
1401 
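// [Editor's sketch, standalone -- not part of coordinator.cc] The indirection
// used above, in miniature: plan node ids are sparse, so a map translates each
// id into a slot of the summary vector, and the per-instance stats vector is
// sized lazily on the first update. All types here are simplified stand-ins.
#include <map>
#include <vector>

struct ExecStatsStub {
  long cardinality;
  long memory_used;
  ExecStatsStub() : cardinality(0), memory_used(0) {}
};
struct NodeSummaryStub { std::vector<ExecStatsStub> exec_stats; };

void UpdateNodeStats(std::map<int, int>& id_to_index,
    std::vector<NodeSummaryStub>& nodes, int node_id, int instance_idx,
    int num_instances, long rows) {
  NodeSummaryStub& summary = nodes[id_to_index[node_id]];
  if (summary.exec_stats.empty()) {
    // First update for this node: one stats slot per fragment instance.
    summary.exec_stats.resize(num_instances);
  }
  summary.exec_stats[instance_idx].cardinality = rows;
}
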
1402 // This function appends summary information to the query_profile_ before
1403 // outputting it to VLOG. It adds:
1404 // 1. Averaged remote fragment profiles (TODO: add outliers)
1405 // 2. Summary of remote fragment durations (min, max, mean, stddev)
1406 // 3. Summary of remote fragment rates (min, max, mean, stddev)
1407 // TODO: add histogram/percentile
1408 void Coordinator::ReportQuerySummary() {
1409  // If Wait() was never called, the query did not even get to start on all the
1410  // remote nodes and some of the state used below might be uninitialized; the
1411  // query has made so little progress that reporting a summary is not useful.
1412  if (!has_called_wait_) return;
1413 
1414  // The fragment has finished executing. Update the profile to compute the
1415  // fraction of time spent in each node.
1416  if (executor_.get() != NULL) {
1417  executor_->profile()->ComputeTimeInProfile();
1418  UpdateExecSummary(0, 0, executor_->profile());
1419  }
1420 
1421  if (!backend_exec_states_.empty()) {
1422  // Average all remote fragments for each fragment.
1423  for (int i = 0; i < backend_exec_states_.size(); ++i) {
1424  backend_exec_states_[i]->profile->ComputeTimeInProfile();
1425  UpdateAverageProfile(backend_exec_states_[i]);
1426  ComputeFragmentSummaryStats(backend_exec_states_[i]);
1427  UpdateExecSummary(backend_exec_states_[i]->fragment_idx,
1428  backend_exec_states_[i]->instance_idx, backend_exec_states_[i]->profile);
1429  }
1430 
1431  InstanceComparator comparator;
1432  // Per fragment instances have been collected, output summaries
1433  for (int i = (executor_.get() != NULL ? 1 : 0); i < fragment_profiles_.size(); ++i) {
1434  fragment_profiles_[i].root_profile->SortChildren(comparator);
1435  SummaryStats& completion_times = fragment_profiles_[i].completion_times;
1436  SummaryStats& rates = fragment_profiles_[i].rates;
1437 
1438  stringstream times_label;
1439  times_label
1440  << "min:" << PrettyPrinter::Print(
1441  accumulators::min(completion_times), TUnit::TIME_NS)
1442  << " max:" << PrettyPrinter::Print(
1443  accumulators::max(completion_times), TUnit::TIME_NS)
1444  << " mean: " << PrettyPrinter::Print(
1445  accumulators::mean(completion_times), TUnit::TIME_NS)
1446  << " stddev:" << PrettyPrinter::Print(
1447  sqrt(accumulators::variance(completion_times)), TUnit::TIME_NS);
1448 
1449  stringstream rates_label;
1450  rates_label
1451  << "min:" << PrettyPrinter::Print(
1452  accumulators::min(rates), TUnit::BYTES_PER_SECOND)
1453  << " max:" << PrettyPrinter::Print(
1454  accumulators::max(rates), TUnit::BYTES_PER_SECOND)
1455  << " mean:" << PrettyPrinter::Print(
1456  accumulators::mean(rates), TUnit::BYTES_PER_SECOND)
1457  << " stddev:" << PrettyPrinter::Print(
1458  sqrt(accumulators::variance(rates)), TUnit::BYTES_PER_SECOND);
1459 
1460  fragment_profiles_[i].averaged_profile->AddInfoString(
1461  "completion times", times_label.str());
1462  fragment_profiles_[i].averaged_profile->AddInfoString(
1463  "execution rates", rates_label.str());
1464  fragment_profiles_[i].averaged_profile->AddInfoString(
1465  "num instances", lexical_cast<string>(fragment_profiles_[i].num_instances));
1466  }
1467 
1468  // Add per node peak memory usage as InfoString
1469  // Map from Impalad address to peak memory usage of this query
1470  typedef boost::unordered_map<TNetworkAddress, int64_t> PerNodePeakMemoryUsage;
1471  PerNodePeakMemoryUsage per_node_peak_mem_usage;
1472  if (executor_.get() != NULL) {
1473  // Coordinator fragment is not included in backend_exec_states_.
1474  RuntimeProfile::Counter* mem_usage_counter =
1475  executor_->profile()->GetCounter(
1476  PER_HOST_PEAK_MEM_COUNTER);
1477  if (mem_usage_counter != NULL) {
1478  TNetworkAddress coord = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
1479  per_node_peak_mem_usage[coord] = mem_usage_counter->value();
1480  }
1481  }
1482  for (int i = 0; i < backend_exec_states_.size(); ++i) {
1483  int64_t initial_usage = 0;
1484  int64_t* mem_usage = FindOrInsert(&per_node_peak_mem_usage,
1485  backend_exec_states_[i]->backend_address, initial_usage);
1486  RuntimeProfile::Counter* mem_usage_counter =
1487  backend_exec_states_[i]->profile->GetCounter(
1488  PER_HOST_PEAK_MEM_COUNTER);
1489  if (mem_usage_counter != NULL && mem_usage_counter->value() > *mem_usage) {
1490  per_node_peak_mem_usage[backend_exec_states_[i]->backend_address] =
1491  mem_usage_counter->value();
1492  }
1493  }
1494  stringstream info;
1495  BOOST_FOREACH(PerNodePeakMemoryUsage::value_type entry, per_node_peak_mem_usage) {
1496  info << entry.first << "("
1497  << PrettyPrinter::Print(entry.second, TUnit::BYTES) << ") ";
1498  }
1499  query_profile_->AddInfoString("Per Node Peak Memory Usage", info.str());
1500  }
1501 }
1502 
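// [Editor's sketch, standalone -- not part of coordinator.cc] SummaryStats
// (coordinator.h) is a boost::accumulators accumulator_set with min, max, mean
// and variance features; this is the pattern the summary labels above rely on.
// operator() adds a sample, extractors read the statistics, and stddev is
// obtained as sqrt(variance).
#include <stdint.h>
#include <cmath>
#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics/stats.hpp>
#include <boost/accumulators/statistics/min.hpp>
#include <boost/accumulators/statistics/max.hpp>
#include <boost/accumulators/statistics/mean.hpp>
#include <boost/accumulators/statistics/variance.hpp>

namespace acc = boost::accumulators;

typedef acc::accumulator_set<int64_t, acc::features<acc::tag::min, acc::tag::max,
    acc::tag::mean, acc::tag::variance> > SummaryStatsStub;

void SummaryStatsDemo() {
  SummaryStatsStub completion_times;
  completion_times(1000000000LL);  // one instance finished in 1s (sample in ns)
  completion_times(3000000000LL);  // another finished in 3s
  double min_ns = acc::min(completion_times);                     // 1e9
  double mean_ns = acc::mean(completion_times);                   // 2e9
  double stddev_ns = std::sqrt(acc::variance(completion_times));  // 1e9
  (void)min_ns; (void)mean_ns; (void)stddev_ns;
}
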
1503 string Coordinator::GetErrorLog() {
1504  ErrorLogMap merged;
1505  {
1506  lock_guard<mutex> l(lock_);
1507  if (executor_.get() != NULL && executor_->runtime_state() != NULL &&
1508  !executor_->runtime_state()->ErrorLogIsEmpty()) {
1509  MergeErrorMaps(&merged, executor_->runtime_state()->error_log());
1510  }
1511  }
1512  for (int i = 0; i < backend_exec_states_.size(); ++i) {
1513  lock_guard<mutex> l(backend_exec_states_[i]->lock);
1514  if (backend_exec_states_[i]->error_log.size() > 0) {
1515  MergeErrorMaps(&merged, backend_exec_states_[i]->error_log);
1516  }
1517  }
1518  return PrintErrorMapToString(merged);
1519 }
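
// [Editor's sketch, standalone -- not part of coordinator.cc] GetErrorLog()
// folds the coordinator's and every backend's ErrorLogMap into one map before
// printing. The merge semantics are illustrated here with simplified types,
// assuming each error code maps to an occurrence count:
#include <map>

typedef std::map<int, int> ErrorCountMap;  // error code -> occurrence count

void MergeErrorCounts(ErrorCountMap* left, const ErrorCountMap& right) {
  for (ErrorCountMap::const_iterator it = right.begin(); it != right.end(); ++it) {
    (*left)[it->first] += it->second;  // aggregate counts across backends
  }
}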
1520 
1521 void Coordinator::SetExecPlanFragmentParams(
1522  QuerySchedule& schedule, int backend_num, const TPlanFragment& fragment,
1523  int fragment_idx, const FragmentExecParams& params, int instance_idx,
1524  const TNetworkAddress& coord, TExecPlanFragmentParams* rpc_params) {
1525  rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
1526  rpc_params->__set_fragment(fragment);
1527  rpc_params->__set_desc_tbl(desc_tbl_);
1528  TNetworkAddress exec_host = params.hosts[instance_idx];
1529  if (schedule.HasReservation()) {
1530  // The reservation has already been validated at this point.
1531  TNetworkAddress resource_hostport;
1532  schedule.GetResourceHostport(exec_host, &resource_hostport);
1533  map<TNetworkAddress, llama::TAllocatedResource>::const_iterator it =
1534  schedule.reservation()->allocated_resources.find(resource_hostport);
1535  // Only set the reserved resource if we actually have one for this plan
1536  // fragment. Otherwise, don't set it (usually this is the coordinator
1537  // fragment), and it won't participate in dynamic RM controls.
1538  if (it != schedule.reservation()->allocated_resources.end()) {
1539  rpc_params->__set_reserved_resource(it->second);
1540  rpc_params->__set_local_resource_address(resource_hostport);
1541  }
1542  }
1543  rpc_params->params.__set_request_pool(schedule.request_pool());
1544  FragmentScanRangeAssignment::const_iterator it =
1545  params.scan_range_assignment.find(exec_host);
1546  // Scan ranges may not always be set, so use an empty structure if none are assigned.
1547  const PerNodeScanRanges& scan_ranges =
1548  (it != params.scan_range_assignment.end()) ? it->second : PerNodeScanRanges();
1549 
1550  rpc_params->params.__set_per_node_scan_ranges(scan_ranges);
1551  rpc_params->params.__set_per_exch_num_senders(params.per_exch_num_senders);
1552  rpc_params->params.__set_destinations(params.destinations);
1553  rpc_params->params.__set_sender_id(params.sender_id_base + instance_idx);
1554  rpc_params->__isset.params = true;
1555  rpc_params->fragment_instance_ctx.__set_query_ctx(query_ctx_);
1556  rpc_params->fragment_instance_ctx.fragment_instance_id =
1557  params.instance_ids[instance_idx];
1558  rpc_params->fragment_instance_ctx.fragment_instance_idx = instance_idx;
1559  rpc_params->fragment_instance_ctx.num_fragment_instances = params.instance_ids.size();
1560  rpc_params->fragment_instance_ctx.backend_num = backend_num;
1561  rpc_params->__isset.fragment_instance_ctx = true;
1562 }
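
// [Editor's sketch, standalone -- not part of coordinator.cc] The scan-range
// lookup above uses a find-or-default pattern: a host with no entry in the
// assignment map is sent an empty PerNodeScanRanges rather than treated as an
// error. In miniature, with simplified stand-in types:
#include <map>
#include <string>
#include <vector>

typedef std::vector<int> ScanRangesStub;  // stands in for PerNodeScanRanges
typedef std::map<std::string, ScanRangesStub> AssignmentStub;

ScanRangesStub RangesForHost(const AssignmentStub& assignment, const std::string& host) {
  AssignmentStub::const_iterator it = assignment.find(host);
  // Hosts without assigned scan ranges get an empty structure, not an error.
  return (it != assignment.end()) ? it->second : ScanRangesStub();
}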
1563 
1564 }