Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
plan-fragment-executor.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <thrift/protocol/TDebugProtocol.h>
18 #include <boost/date_time/posix_time/posix_time_types.hpp>
19 #include <boost/unordered_map.hpp>
20 #include <boost/foreach.hpp>
21 #include <gutil/strings/substitute.h>
22 
23 #include "codegen/llvm-codegen.h"
24 #include "common/logging.h"
25 #include "common/object-pool.h"
26 #include "exec/data-sink.h"
27 #include "exec/exec-node.h"
28 #include "exec/exchange-node.h"
29 #include "exec/scan-node.h"
30 #include "exec/hdfs-scan-node.h"
32 #include "exprs/expr.h"
33 #include "runtime/descriptors.h"
35 #include "runtime/row-batch.h"
36 #include "runtime/mem-tracker.h"
37 #include "util/cgroups-mgr.h"
38 #include "util/cpu-info.h"
39 #include "util/debug-util.h"
40 #include "util/container-util.h"
41 #include "util/parse-util.h"
42 #include "util/mem-info.h"
44 #include "util/llama-util.h"
45 #include "util/pretty-printer.h"
46 
47 DEFINE_bool(serialize_batch, false, "serialize and deserialize each returned row batch");
48 DEFINE_int32(status_report_interval, 5, "interval between profile reports; in seconds");
49 DECLARE_bool(enable_rm);
50 
51 #include "common/names.h"
52 
53 namespace posix_time = boost::posix_time;
54 using boost::get_system_time;
55 using boost::system_time;
56 using namespace apache::thrift;
57 using namespace strings;
58 
59 namespace impala {
60 
61 const string PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER = "PerHostPeakMemUsage";
62 
// Constructs an executor bound to 'exec_env' that reports through 'report_status_cb'.
// No plan is attached yet (plan_ is NULL), every lifecycle flag initialised here
// (report_thread_active_, done_, prepared_, closed_, has_thread_token_) starts false,
// and the counter pointers initialised here start NULL.
63 PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
64  const ReportStatusCallback& report_status_cb) :
65  exec_env_(exec_env), plan_(NULL), report_status_cb_(report_status_cb),
66  report_thread_active_(false), done_(false), prepared_(false), closed_(false),
67  has_thread_token_(false), average_thread_tokens_(NULL),
68  mem_usage_sampled_counter_(NULL), thread_usage_sampled_counter_(NULL) {
69 }
70 
// NOTE(review): the line that opens this definition (listing line 71) is absent from
// this extract -- presumably the destructor; confirm against the original file.
// Calls Close(), then DCHECKs that the report thread is no longer active.
72  Close();
// NOTE(review): runtime_state_ is dereferenced here without a null check, although the
// teardown code later in this file guards the same pointer ("Prepare may not have been
// called"). Confirm this cannot run on an executor that was never prepared. The body
// of this branch (listing line 74) is also absent from this extract.
73  if (runtime_state_->query_resource_mgr() != NULL) {
75  }
76  // at this point, the report thread should have been stopped
77  DCHECK(!report_thread_active_);
78 }
79 
// Sets this executor up to run the fragment instance described by 'request'.
// Visible steps, in order: record the query id; create runtime_state_; when
// FLAGS_enable_rm is set and a reserved resource is present, set the cgroup's CPU
// shares (first fragment using the cgroup only) and attach a QueryResourceMgr; derive
// the memory limits and initialise the mem trackers and block manager; acquire one
// thread token; register sampling counters; build the descriptor table and the plan
// tree; apply debug options; set exchange sender counts and scan ranges; create and
// prepare the output sink if the fragment has one; allocate row_batch_.
// Returns Status::OK after setting prepared_. The RETURN_IF_ERROR sites presumably
// return early, before prepared_ is set.
// NOTE(review): this listing has gaps in its line numbers (e.g. 96, 108, 127, 178,
// 182, 190, 199, 236, 243, 257-258, 260-261), so the opening lines of several
// multi-line statements below are not visible in this extract.
80 Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
82  const TPlanFragmentExecParams& params = request.params;
83  query_id_ = request.fragment_instance_ctx.query_ctx.query_id;
84 
85  VLOG_QUERY << "Prepare(): query_id=" << PrintId(query_id_) << " instance_id="
86  << PrintId(request.fragment_instance_ctx.fragment_instance_id);
87  VLOG(2) << "params:\n" << ThriftDebugString(params);
88 
89  if (request.__isset.reserved_resource) {
90  VLOG_QUERY << "Executing fragment in reserved resource:\n"
91  << request.reserved_resource;
92  }
93 
94  string cgroup = "";
95  if (FLAGS_enable_rm && request.__isset.reserved_resource) {
97  }
98 
99  runtime_state_.reset(
100  new RuntimeState(request.fragment_instance_ctx, cgroup, exec_env_));
101 
102  // total_time_counter() is in the runtime_state_ so start it up now.
103  SCOPED_TIMER(profile()->total_time_counter());
104 
105  // Register after setting runtime_state_ to ensure proper cleanup.
106  if (FLAGS_enable_rm && !cgroup.empty() && request.__isset.reserved_resource) {
107  bool is_first;
109  request.fragment_instance_ctx.fragment_instance_id, cgroup, &is_first));
110  // The first fragment using cgroup sets the cgroup's CPU shares based on the reserved
111  // resource.
112  if (is_first) {
113  DCHECK(request.__isset.reserved_resource);
114  int32_t cpu_shares = exec_env_->cgroups_mgr()->VirtualCoresToCpuShares(
115  request.reserved_resource.v_cpu_cores);
116  RETURN_IF_ERROR(exec_env_->cgroups_mgr()->SetCpuShares(cgroup, cpu_shares));
117  }
118  }
119 
120  // TODO: Find the reservation id when the resource request is not set
121  if (FLAGS_enable_rm && request.__isset.reserved_resource) {
122  TUniqueId reservation_id;
123  reservation_id << request.reserved_resource.reservation_id;
124 
125  // TODO: Combine this with RegisterFragment() etc.
126  QueryResourceMgr* res_mgr;
// NOTE(review): listing line 127 is absent from this extract. The 'is_first' tested
// below is presumably declared there, because the earlier 'is_first' is scoped to the
// previous if-block -- confirm against the original file.
128  reservation_id, request.local_resource_address, &res_mgr);
129  DCHECK(res_mgr != NULL);
130  runtime_state_->SetQueryResourceMgr(res_mgr);
131  if (is_first) {
132  runtime_state_->query_resource_mgr()->InitVcoreAcquisition(
133  request.reserved_resource.v_cpu_cores);
134  }
135  }
136 
// NOTE(review): the comment below reads like the tail of a longer sentence whose
// beginning is not present in this extract.
// bytes_limit stays -1 unless the mem_limit query option is set and positive.
137  // reservation or a query option.
138  int64_t bytes_limit = -1;
139  if (runtime_state_->query_options().__isset.mem_limit &&
140  runtime_state_->query_options().mem_limit > 0) {
141  bytes_limit = runtime_state_->query_options().mem_limit;
142  VLOG_QUERY << "Using query memory limit from query options: "
143  << PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
144  }
145 
// rm_reservation_size_bytes stays -1 unless the request reserves a positive memory_mb.
146  int64_t rm_reservation_size_bytes = -1;
147  if (request.__isset.reserved_resource && request.reserved_resource.memory_mb > 0) {
148  rm_reservation_size_bytes =
149  static_cast<int64_t>(request.reserved_resource.memory_mb) * 1024L * 1024L;
150  // Queries that use more than the hard limit will be killed, so it's not useful to
151  // have a reservation larger than the hard limit. Clamp reservation bytes limit to the
152  // hard limit (if it exists).
153  if (rm_reservation_size_bytes > bytes_limit && bytes_limit != -1) {
154  runtime_state_->LogError(ErrorMsg(TErrorCode::FRAGMENT_EXECUTOR,
155  PrettyPrinter::PrintBytes(rm_reservation_size_bytes),
156  PrettyPrinter::PrintBytes(bytes_limit)));
157  rm_reservation_size_bytes = bytes_limit;
158  }
159  VLOG_QUERY << "Using RM reservation memory limit from resource reservation: "
160  << PrettyPrinter::Print(rm_reservation_size_bytes, TUnit::BYTES);
161  }
162 
163  DCHECK(!params.request_pool.empty());
164  runtime_state_->InitMemTrackers(query_id_, &params.request_pool,
165  bytes_limit, rm_reservation_size_bytes);
166  RETURN_IF_ERROR(runtime_state_->CreateBlockMgr());
167 
168  // Reserve one main thread from the pool
169  runtime_state_->resource_pool()->AcquireThreadToken();
170  if (runtime_state_->query_resource_mgr() != NULL) {
171  runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(1);
172  }
173  has_thread_token_ = true;
174 
175  average_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens",
176  bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
177  runtime_state_->resource_pool()));
179  TUnit::BYTES,
180  bind<int64_t>(mem_fn(&MemTracker::consumption),
181  runtime_state_->instance_mem_tracker()));
183  TUnit::UNIT,
184  bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
185  runtime_state_->resource_pool()));
186 
187  // set up desc tbl
188  DescriptorTbl* desc_tbl = NULL;
189  DCHECK(request.__isset.desc_tbl);
191  DescriptorTbl::Create(obj_pool(), request.desc_tbl, &desc_tbl));
192  runtime_state_->set_desc_tbl(desc_tbl);
193  VLOG_QUERY << "descriptor table for fragment="
194  << request.fragment_instance_ctx.fragment_instance_id
195  << "\n" << desc_tbl->DebugString();
196 
197  // set up plan
198  DCHECK(request.__isset.fragment);
200  ExecNode::CreateTree(obj_pool(), request.fragment.plan, *desc_tbl, &plan_));
201  runtime_state_->set_fragment_root_id(plan_->id());
202 
203  if (request.params.__isset.debug_node_id) {
204  DCHECK(request.params.__isset.debug_action);
205  DCHECK(request.params.__isset.debug_phase);
206  ExecNode::SetDebugOptions(request.params.debug_node_id,
207  request.params.debug_phase, request.params.debug_action, plan_);
208  }
209 
210  // set #senders of exchange nodes before calling Prepare()
211  vector<ExecNode*> exch_nodes;
212  plan_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
213  BOOST_FOREACH(ExecNode* exch_node, exch_nodes)
214  {
215  DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE);
216  int num_senders = FindWithDefault(params.per_exch_num_senders,
217  exch_node->id(), 0);
218  DCHECK_GT(num_senders, 0);
219  static_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders);
220  }
221 
222  // set scan ranges
// Scan nodes without an entry in per_node_scan_ranges receive the empty
// 'no_scan_ranges' vector.
223  vector<ExecNode*> scan_nodes;
224  vector<TScanRangeParams> no_scan_ranges;
225  plan_->CollectScanNodes(&scan_nodes);
226  for (int i = 0; i < scan_nodes.size(); ++i) {
227  ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
228  const vector<TScanRangeParams>& scan_ranges = FindWithDefault(
229  params.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
230  scan_node->SetScanRanges(scan_ranges);
231  }
232 
233  RuntimeProfile::Counter* prepare_timer = ADD_TIMER(profile(), "PrepareTime");
234  {
235  SCOPED_TIMER(prepare_timer);
237  }
238 
239  PrintVolumeIds(params.per_node_scan_ranges);
240 
241  // set up sink, if required
242  if (request.fragment.__isset.output_sink) {
244  obj_pool(), request.fragment.output_sink, request.fragment.output_exprs,
245  params, row_desc(), &sink_));
246  RETURN_IF_ERROR(sink_->Prepare(runtime_state()));
247 
248  RuntimeProfile* sink_profile = sink_->profile();
249  if (sink_profile != NULL) {
250  profile()->AddChild(sink_profile);
251  }
252  } else {
253  sink_.reset(NULL);
254  }
255 
256  // set up profile counters
259  ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT);
262 
263  row_batch_.reset(new RowBatch(plan_->row_desc(), runtime_state_->batch_size(),
264  runtime_state_->instance_mem_tracker()));
265  VLOG(2) << "plan_root=\n" << plan_->DebugString();
266  prepared_ = true;
267  return Status::OK;
268 }
269 
// NOTE(review): the signature line (listing line 270) is absent from this extract.
// Does nothing unless runtime_state_ reports that codegen was created. Otherwise
// fetches the existing LlvmCodeGen and finalizes its module; a FinalizeModule()
// failure is recorded through runtime_state_->LogError() and is not propagated.
271  if (!runtime_state_->codegen_created()) return;
272  LlvmCodeGen* codegen;
273  Status status = runtime_state_->GetCodegen(&codegen, /* initialize */ false);
274  DCHECK(status.ok());
275  DCHECK_NOTNULL(codegen);
276  status = codegen->FinalizeModule();
277  if (!status.ok()) {
278  stringstream ss;
279  ss << "Error with codegen for this query: " << status.GetDetail();
280  runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
281  }
282 }
283 
285  const PerNodeScanRanges& per_node_scan_ranges) {
// NOTE(review): the first line of this signature (listing line 284) is absent from
// this extract; the call site earlier in this file passes params.per_node_scan_ranges
// to PrintVolumeIds(), which matches this parameter.
// Accumulates HDFS split statistics over every entry of 'per_node_scan_ranges' and
// writes them to the VLOG_FILE log together with query_id_. Returns immediately when
// the map is empty.
286  if (per_node_scan_ranges.empty()) return;
287 
288  HdfsScanNode::PerVolumnStats per_volume_stats;
289  BOOST_FOREACH(const PerNodeScanRanges::value_type& entry, per_node_scan_ranges) {
290  HdfsScanNode::UpdateHdfsSplitStats(entry.second, &per_volume_stats);
291  }
292 
293  stringstream str;
294 
295  HdfsScanNode::PrintHdfsSplitStats(per_volume_stats, &str);
// NOTE(review): listing line 296 is absent from this extract.
297  VLOG_FILE
298  << "Hdfs split stats (<volume id>:<# splits>/<split lengths>) for query="
299  << query_id_ << ":\n" << str.str();
300 }
301 
// NOTE(review): the signature line (listing line 302) is absent from this extract; the
// log text below identifies this as Open().
// When a report callback is installed and FLAGS_status_report_interval is positive,
// creates the "report-profile" thread while holding report_thread_lock_ and marks it
// active. Then runs OpenInternal(); a failure that is neither a cancellation nor a
// mem-limit error is additionally written to the runtime state's error log. The status
// is passed to UpdateStatus() and returned.
303  VLOG_QUERY << "Open(): instance_id="
304  << runtime_state_->fragment_instance_id();
305  // we need to start the profile-reporting thread before calling Open(), since it
306  // may block
307  if (!report_status_cb_.empty() && FLAGS_status_report_interval > 0) {
308  unique_lock<mutex> l(report_thread_lock_);
309  report_thread_.reset(
310  new Thread("plan-fragment-executor", "report-profile",
// NOTE(review): listing lines 311 and 314 are absent from this extract -- presumably
// the rest of the Thread constructor call and the wait for the thread start-up signal
// that the comment below describes.
312  // make sure the thread started up, otherwise ReportProfile() might get into a race
313  // with StopReportThread()
315  report_thread_active_ = true;
316  }
317 
// NOTE(review): listing line 318 is absent from this extract.
319 
320  Status status = OpenInternal();
321  if (!status.ok() && !status.IsCancelled() && !status.IsMemLimitExceeded()) {
322  // Log error message in addition to returning in Status. Queries that do not
323  // fetch results (e.g. insert) may not receive the message directly and can
324  // only retrieve the log.
325  runtime_state_->LogError(status.msg());
326  }
327  UpdateStatus(status);
328  return status;
329 }
330 
// NOTE(review): the signature line (listing line 331) is absent from this extract; the
// row-logging text below identifies this as OpenInternal().
// Without a sink, returns Status::OK right after the timed scope at the top. With a
// sink: opens it, then repeatedly obtains a batch and Send()s it until done_ is set or
// a NULL batch is seen, closes the sink, and sets done_.
332  {
333  SCOPED_TIMER(profile()->total_time_counter());
// NOTE(review): listing line 334, the statement timed by this scope, is absent.
335  }
336 
337  if (sink_.get() == NULL) return Status::OK;
338 
339  RETURN_IF_ERROR(sink_->Open(runtime_state_.get()));
340 
341  // If there is a sink, do all the work of driving it here, so that
342  // when this returns the query has actually finished
343  while (!done_) {
344  RowBatch* batch;
// NOTE(review): listing line 345, which assigns 'batch', is absent from this extract.
346  if (batch == NULL) break;
347  if (VLOG_ROW_IS_ON) {
348  VLOG_ROW << "OpenInternal: #rows=" << batch->num_rows();
349  for (int i = 0; i < batch->num_rows(); ++i) {
350  VLOG_ROW << PrintRow(batch->GetRow(i), row_desc());
351  }
352  }
353 
354  SCOPED_TIMER(profile()->total_time_counter());
355  RETURN_IF_ERROR(sink_->Send(runtime_state(), batch, done_));
356  }
357 
358  // Close the sink *before* stopping the report thread. Close may
359  // need to add some important information to the last report that
360  // gets sent. (e.g. table sinks record the files they have written
361  // to in this method)
362  // The coordinator report channel waits until all backends are
363  // either in error or have returned a status report with done =
364  // true, so tearing down any data stream state (a separate
365  // channel) in Close is safe.
366  SCOPED_TIMER(profile()->total_time_counter());
367  sink_->Close(runtime_state());
368  done_ = true;
369 
// NOTE(review): listing line 370 is absent from this extract.
371  return Status::OK;
372 }
373 
// NOTE(review): the signature line (listing line 374) is absent from this extract; the
// log text below identifies this as ReportProfile().
// Body of the periodic reporting loop. Takes report_thread_lock_, signals
// report_thread_started_cv_, waits a random initial offset, then wakes up every
// FLAGS_status_report_interval seconds and calls SendReport(false) until
// report_thread_active_ is cleared. While completed_report_sent_ is non-zero the send
// is skipped but the loop keeps running.
375  VLOG_FILE << "ReportProfile(): instance_id="
376  << runtime_state_->fragment_instance_id();
377  DCHECK(!report_status_cb_.empty());
378  unique_lock<mutex> l(report_thread_lock_);
379  // tell Open() that we started
380  report_thread_started_cv_.notify_one();
381 
382  // Jitter the reporting time of remote fragments by a random amount between
383  // 0 and the report_interval. This way, the coordinator doesn't get all the
384  // updates at once so it's better for contention as well as smoother progress
385  // reporting.
// The modulo needs FLAGS_status_report_interval > 0; the code earlier in this file
// that creates the report thread checks that before starting it.
386  int report_fragment_offset = rand() % FLAGS_status_report_interval;
387  system_time timeout = get_system_time()
388  + posix_time::seconds(report_fragment_offset);
389  // We don't want to wait longer than it takes to run the entire fragment.
390  stop_report_thread_cv_.timed_wait(l, timeout);
391 
392  while (report_thread_active_) {
393  system_time timeout = get_system_time()
394  + posix_time::seconds(FLAGS_status_report_interval);
395 
396  // timed_wait can return because the timeout occurred or the condition variable
397  // was signaled. We can't rely on its return value to distinguish between the
398  // two cases (e.g. there is a race here where the wait timed out but before grabbing
399  // the lock, the condition variable was signaled). Instead, we will use an external
400  // flag, report_thread_active_, to coordinate this.
401  stop_report_thread_cv_.timed_wait(l, timeout);
402 
403  if (VLOG_FILE_IS_ON) {
404  VLOG_FILE << "Reporting " << (!report_thread_active_ ? "final " : " ")
405  << "profile for instance " << runtime_state_->fragment_instance_id();
406  stringstream ss;
407  profile()->PrettyPrint(&ss);
408  VLOG_FILE << ss.str();
409  }
410 
411  if (!report_thread_active_) break;
412 
413  if (completed_report_sent_.Read() == 0) {
414  // No complete fragment report has been sent.
415  SendReport(false);
416  }
417  }
418 
419  VLOG_FILE << "exiting reporting thread: instance_id="
420  << runtime_state_->fragment_instance_id();
421 }
422 
// NOTE(review): the signature line (listing line 423) is absent from this extract;
// callers in this file invoke SendReport(bool), and 'done' below is presumably that
// parameter.
// No-op when no report callback is installed. Otherwise copies status_ under
// status_lock_, refreshes per_host_mem_usage_ from the query mem tracker's peak
// consumption, and invokes the callback. The callback's last argument is true when
// 'done' is set or the copied status is an error.
424  if (report_status_cb_.empty()) return;
425 
426  Status status;
427  {
428  lock_guard<mutex> l(status_lock_);
429  status = status_;
430  }
431 
432  // Update the counter for the peak per host mem usage.
433  per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption());
434 
435  // This will send a report even if we are cancelled. If the query completed correctly
436  // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
437  // be waiting for a final report and profile.
438  report_status_cb_(status, profile(), done || !status.ok());
439 }
440 
// NOTE(review): the signature line (listing line 441) is absent from this extract --
// presumably StopReportThread(), which a comment earlier in this file mentions.
// If the report thread is active: clears report_thread_active_ under
// report_thread_lock_, notifies stop_report_thread_cv_, and joins the thread.
// NOTE(review): the first read of report_thread_active_ happens before the lock is
// taken.
442  if (!report_thread_active_) return;
443  {
444  lock_guard<mutex> l(report_thread_lock_);
445  report_thread_active_ = false;
446  }
447  stop_report_thread_cv_.notify_one();
448  report_thread_->Join();
449 }
450 
451 // TODO: why can't we just put the total_time_counter() at the
452 // beginning of Open() and GetNext()? This seems to really mess up
453 // the timer here, presumably because the data stream sender is
454 // multithreaded and the timer we use gets confused.
// NOTE(review): the signature line (listing line 455) is absent from this extract; the
// log text below identifies this as GetNext(), and 'batch' is used as a pointer to a
// RowBatch pointer.
// Delegates to GetNextInternal() and records the resulting status via UpdateStatus().
// Once done_ is set, logs completion and turns an empty final batch into
// *batch = NULL. Returns the status from GetNextInternal().
456  VLOG_FILE << "GetNext(): instance_id="
457  << runtime_state_->fragment_instance_id();
458  Status status = GetNextInternal(batch);
459  UpdateStatus(status);
460  if (done_) {
461  VLOG_QUERY << "Finished executing fragment query_id=" << PrintId(query_id_)
462  << " instance_id=" << PrintId(runtime_state_->fragment_instance_id());
// NOTE(review): listing line 463 is absent from this extract.
464  // GetNext() uses *batch = NULL to signal the end.
465  if (*batch != NULL && (*batch)->num_rows() == 0) *batch = NULL;
466  }
467 
468  return status;
469 }
470 
// NOTE(review): the signature line (listing line 471) is absent from this extract --
// presumably GetNextInternal(RowBatch** batch).
// If done_ is already set, stores NULL in *batch and returns Status::OK. Otherwise
// loops: resets row_batch_, pulls from plan_ (which may set done_), points *batch at
// row_batch_, and stops at the first batch that contains rows. Empty batches are
// skipped until the plan reports done.
472  if (done_) {
473  *batch = NULL;
474  return Status::OK;
475  }
476 
477  while (!done_) {
478  row_batch_->Reset();
479  SCOPED_TIMER(profile()->total_time_counter());
// NOTE(review): listing line 480, which opens the statement below, is absent.
481  plan_->GetNext(runtime_state_.get(), row_batch_.get(), &done_));
482  *batch = row_batch_.get();
483  if (row_batch_->num_rows() > 0) {
// NOTE(review): listing line 484 is absent from this extract.
485  break;
486  }
487  }
488 
489  return Status::OK;
490 }
491 
// NOTE(review): the signature line (listing line 492) is absent from this extract --
// presumably FragmentComplete().
// Claims the one-time "completed report" flag with CompareAndSwap(0,1), stops
// fragment_sw_, derives CPU time as the elapsed time minus the storage-wait,
// network-send and network-receive timers (clamped at 0), adds it to
// total_cpu_timer(), and sends the final report only if this call claimed the flag.
493  // Check the atomic flag. If it is set, then a fragment complete report has already
494  // been sent.
495  bool send_report = completed_report_sent_.CompareAndSwap(0,1);
496 
497  fragment_sw_.Stop();
498  int64_t cpu_and_wait_time = fragment_sw_.ElapsedTime();
// NOTE(review): listing line 499 is absent from this extract.
500  int64_t cpu_time = cpu_and_wait_time
501  - runtime_state_->total_storage_wait_timer()->value()
502  - runtime_state_->total_network_send_timer()->value()
503  - runtime_state_->total_network_receive_timer()->value();
504  // Timing is not perfect.
505  if (cpu_time < 0)
506  cpu_time = 0;
507  runtime_state_->total_cpu_timer()->Add(cpu_time);
508 
// NOTE(review): listing lines 509-510 are absent from this extract.
511  if (send_report) SendReport(true);
512 }
513 
// NOTE(review): the signature line (listing line 514) is absent from this extract --
// presumably UpdateStatus(const Status& status).
// Ignores OK statuses. For an error: claims the completed-report flag, stores the
// error in status_ only when no earlier error is recorded (the first error wins),
// flags mem-limit-exceeded on the runtime state when applicable, then sends the final
// report only if this call claimed the flag.
515  if (status.ok()) return;
516 
517  bool send_report = completed_report_sent_.CompareAndSwap(0,1);
518 
519  {
520  lock_guard<mutex> l(status_lock_);
521  if (status_.ok()) {
522  if (status.IsMemLimitExceeded()) runtime_state_->SetMemLimitExceeded();
523  status_ = status;
524  }
525  }
526 
// NOTE(review): listing line 527 is absent from this extract.
528  if (send_report) SendReport(true);
529 }
530 
// NOTE(review): the signature line (listing line 531) is absent from this extract; the
// log text below identifies this as Cancel().
// DCHECKs that prepared_ is set, marks the runtime state as cancelled, and cancels
// this fragment instance in the stream manager.
532  VLOG_QUERY << "Cancel(): instance_id="
533  << runtime_state_->fragment_instance_id();
534  DCHECK(prepared_);
535  runtime_state_->set_is_cancelled(true);
536  runtime_state_->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
537 }
538 
// NOTE(review): the signature line (listing line 539) is absent from this extract --
// presumably the row_desc() accessor. Forwards to plan_->row_desc(); plan_ is not
// checked for NULL here.
540  return plan_->row_desc();
541 }
542 
// NOTE(review): the signature line (listing line 543) is absent from this extract --
// presumably the profile() accessor. Forwards to runtime_state_->runtime_profile();
// runtime_state_ is not checked for NULL here.
544  return runtime_state_->runtime_profile();
545 }
546 
// NOTE(review): the signature line (listing line 547) is absent from this extract --
// presumably ReachedLimit(). Forwards to plan_->ReachedLimit(); plan_ is not checked
// for NULL here.
548  return plan_->ReachedLimit();
549 }
550 
// NOTE(review): the signature line (listing line 551) is absent from this extract --
// presumably ReleaseThreadToken().
// If this executor still holds its thread token: clears has_thread_token_, returns
// the token to the resource pool, and reports a thread-usage change of -1 to the query
// resource manager when one is attached. Does nothing otherwise.
552  if (has_thread_token_) {
553  has_thread_token_ = false;
554  runtime_state_->resource_pool()->ReleaseThreadToken(true);
555  if (runtime_state_->query_resource_mgr() != NULL) {
556  runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(-1);
557  }
// NOTE(review): listing lines 558-560 are absent from this extract.
561  }
562 }
563 
// NOTE(review): the signature line (listing line 564) is absent from this extract --
// presumably Close(), given the closed_ flag it maintains.
// Returns at once if closed_ is already set. Otherwise drops row_batch_ and, when
// runtime_state_ exists, closes the plan and the sink if present, unregisters every
// reader context from the IO manager, and unregisters the resource pool from the
// thread manager. Sets closed_ at the end.
565  if (closed_) return;
566  row_batch_.reset();
567  // Prepare may not have been called, which sets runtime_state_
568  if (runtime_state_.get() != NULL) {
569  if (runtime_state_->query_resource_mgr() != NULL) {
// NOTE(review): listing line 570, which opens the call below, is absent.
571  runtime_state_->fragment_instance_id(), runtime_state_->cgroup());
572  }
573  if (plan_ != NULL) plan_->Close(runtime_state_.get());
574  if (sink_.get() != NULL) sink_->Close(runtime_state());
575  BOOST_FOREACH(DiskIoMgr::RequestContext* context,
576  *runtime_state_->reader_contexts()) {
577  runtime_state_->io_mgr()->UnregisterContext(context);
578  }
579  exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());
580  }
581  if (mem_usage_sampled_counter_ != NULL) {
// NOTE(review): listing lines 582-583, the body of this branch, are absent.
584  }
585  closed_ = true;
586 }
587 
588 }
int id() const
Definition: exec-node.h:154
void UpdateStatus(const Status &status)
boost::scoped_ptr< RowBatch > row_batch_
void CollectNodes(TPlanNodeType::type node_type, std::vector< ExecNode * > *nodes)
Definition: exec-node.cc:359
int64_t consumption() const
Returns the memory consumed in bytes.
Definition: mem-tracker.h:298
void CollectScanNodes(std::vector< ExecNode * > *nodes)
Collect all scan node types.
Definition: exec-node.cc:366
int num_rows() const
Definition: row-batch.h:215
const std::string GetDetail() const
Definition: status.cc:184
int32_t VirtualCoresToCpuShares(int16_t v_cpu_cores)
Definition: cgroups-mgr.cc:59
Counter * AddSamplingCounter(const std::string &name, Counter *src_counter)
void AddInfoString(const std::string &key, const std::string &value)
Status GetNextInternal(RowBatch **batch)
RuntimeProfile::TimeSeriesCounter * thread_usage_sampled_counter_
Sampled thread usage (tokens) at even time intervals.
Status RegisterFragment(const TUniqueId &fragment_instance_id, const std::string &cgroup, bool *is_first)
Definition: cgroups-mgr.cc:198
TODO: Consider allowing fragment IDs as category parameters.
Definition: thread.h:45
void FragmentComplete()
Called when the fragment execution is complete to finalize counters.
RuntimeProfile::Counter * per_host_mem_usage_
static const std::string HDFS_SPLIT_STATS_DESC
Description string for the per volume stats output.
MonotonicStopWatch fragment_sw_
Stopwatch for this entire fragment. Started in Prepare(), stopped in Close().
std::string UniqueIdToCgroup(const std::string &unique_id) const
Definition: cgroups-mgr.cc:54
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
RuntimeProfile::Counter * average_thread_tokens_
#define ADD_TIMER(profile, name)
static Status CreateTree(ObjectPool *pool, const TPlan &plan, const DescriptorTbl &descs, ExecNode **root)
Definition: exec-node.cc:199
const DescriptorTbl & desc_tbl()
DEFINE_int32(status_report_interval, 5,"interval between profile reports; in seconds")
Status SetCpuShares(const std::string &cgroup, int32_t num_shares)
Definition: cgroups-mgr.cc:101
bool GetQueryResourceMgr(const TUniqueId &query_id, const TUniqueId &reservation_id, const TNetworkAddress &local_resource_address, QueryResourceMgr **res_mgr)
boost::condition_variable stop_report_thread_cv_
const RowDescriptor & row_desc() const
Definition: exec-node.h:156
string PrintId(const TUniqueId &id, const string &separator)
Definition: debug-util.cc:97
#define COUNTER_ADD(c, v)
bool ReachedLimit()
Definition: exec-node.h:159
void Cancel()
Initiate cancellation. Must not be called until after Prepare() returned.
virtual void Set(int64_t value)
#define SCOPED_TIMER(c)
boost::condition_variable report_thread_started_cv_
ResourceBroker * resource_broker()
Definition: exec-env.h:95
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
T Read()
Safe read of the value.
Definition: atomic.h:100
void UnregisterPool(ResourcePool *pool)
ReportStatusCallback report_status_cb_
profile reporting-related
Status Prepare(const TExecPlanFragmentParams &request)
RuntimeProfile * profile()
Profile information for plan and output sink.
void ReleaseThreadToken()
Releases the thread token for this fragment executor.
TPlanNodeType::type type() const
Definition: exec-node.h:155
#define VLOG_FILE_IS_ON
Definition: logging.h:65
LLVM code generator. This is the top level object to generate jitted code.
Definition: llvm-codegen.h:107
bool closed_
true if Close() has been called
CgroupsMgr * cgroups_mgr()
Definition: exec-env.h:88
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
Definition: exec-node.cc:345
bool CompareAndSwap(T old_val, T new_val)
Returns true if the atomic compare-and-swap was successful.
Definition: atomic.h:131
#define VLOG_QUERY
Definition: logging.h:57
boost::scoped_ptr< DataSink > sink_
void PrintVolumeIds(const TPlanExecParams &params)
Print stats about scan ranges for each volumeId in params to info log.
bool IsCancelled() const
Definition: status.h:174
void UnregisterQueryResourceMgr(const TUniqueId &query_id)
virtual Status Prepare(RuntimeState *state)
Definition: exec-node.cc:130
#define ADD_COUNTER(profile, name, unit)
std::string DebugString() const
Definition: descriptors.cc:608
#define VLOG_ROW
Definition: logging.h:59
void SetScanRanges(const std::vector< TScanRangeParams > &scan_range_params)
Definition: scan-node.h:89
Abstract base class of all scan nodes; introduces SetScanRange().
Definition: scan-node.h:77
uint64_t ElapsedTime() const
Returns time in nanosecond.
Definition: stopwatch.h:105
boost::scoped_ptr< RuntimeState > runtime_state_
static void PrintHdfsSplitStats(const PerVolumnStats &per_volume_stats, std::stringstream *ss)
TimeSeriesCounter * AddTimeSeriesCounter(const std::string &name, TUnit::type unit, DerivedCounterFunction sample_fn)
RuntimeProfile::Counter * rows_produced_counter_
Number of rows returned by this fragment.
static void StopSamplingCounter(RuntimeProfile::Counter *counter)
Stops updating the value of 'counter'.
static void UpdateHdfsSplitStats(const std::vector< TScanRangeParams > &scan_range_params_list, PerVolumnStats *per_volume_stats)
Update the per volume stats with the given scan range params list.
RuntimeState * runtime_state()
call these only after Prepare()
static const Status OK
Definition: status.h:87
void PrettyPrint(std::ostream *s, const std::string &prefix="") const
std::map< TPlanNodeId, std::vector< TScanRangeParams > > PerNodeScanRanges
typedef for TPlanFragmentExecParams.per_node_scan_ranges
boost::function< void(const Status &status, RuntimeProfile *profile, bool done)> ReportStatusCallback
static Status Create(ObjectPool *pool, const TDescriptorTable &thrift_tbl, DescriptorTbl **tbl)
Definition: descriptors.cc:378
ExecEnv * exec_env_
Definition: coordinator.h:193
bool ReachedLimit()
Returns true if this query has a limit and it has been reached.
const V & FindWithDefault(const std::map< K, V > &m, const K &key, const V &default_val)
DEFINE_bool(serialize_batch, false,"serialize and deserialize each returned row batch")
const ErrorMsg & msg() const
Returns the error message associated with a non-successful status.
Definition: status.h:189
RuntimeProfile::TimeSeriesCounter * mem_usage_sampled_counter_
Sampled memory usage at even time intervals.
#define VLOG_FILE
Definition: logging.h:58
bool done_
true if plan_->GetNext() indicated that it's done
#define VLOG_ROW_IS_ON
Definition: logging.h:66
void AddChild(RuntimeProfile *child, bool indent=true, RuntimeProfile *location=NULL)
virtual Status Open(RuntimeState *state)
Definition: exec-node.cc:154
Only CPU-heavy threads need be managed using this class.
string PrintRow(TupleRow *row, const RowDescriptor &d)
Definition: debug-util.cc:192
bool ok() const
Definition: status.h:172
static const std::string PER_HOST_PEAK_MEM_COUNTER
Name of the counter that is tracking per query, per host peak mem usage.
bool prepared_
true if Prepare() returned OK
MemTracker * query_mem_tracker()
ThreadResourceMgr * thread_mgr()
Definition: exec-env.h:87
virtual void Close(RuntimeState *state)
Definition: exec-node.cc:166
static void SetDebugOptions(int node_id, TExecNodePhase::type phase, TDebugAction::type action, ExecNode *tree)
Set debug action for node with given id in 'tree'.
Definition: exec-node.cc:332
boost::unordered_map< int32_t, std::pair< int, int64_t > > PerVolumnStats
map from volume id to <number of split, per volume split lengths>
Status UnregisterFragment(const TUniqueId &fragment_instance_id, const std::string &cgroup)
Definition: cgroups-mgr.cc:215
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)=0
Status GetNext(RowBatch **batch)
DECLARE_bool(enable_rm)
boost::scoped_ptr< Thread > report_thread_
bool IsMemLimitExceeded() const
Definition: status.h:178
static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter *counter)
Stops 'counter' from receiving any more samples.
RuntimeProfile * runtime_profile()
Definition: exec-node.h:161
bool has_thread_token_
true if this fragment has not returned the thread token to the thread resource mgr ...
static Status CreateDataSink(ObjectPool *pool, const TDataSink &thrift_sink, const std::vector< TExpr > &output_exprs, const TPlanFragmentExecParams &params, const RowDescriptor &row_desc, boost::scoped_ptr< DataSink > *sink)
Definition: data-sink.cc:34