Impala
Impala is the open source, native analytic database for Apache Hadoop.
hdfs-scan-node.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/hdfs-scan-node.h"
#include "exec/base-sequence-scanner.h"
#include "exec/hdfs-text-scanner.h"
#include "exec/hdfs-lzo-text-scanner.h"
#include "exec/hdfs-sequence-scanner.h"
#include "exec/hdfs-rcfile-scanner.h"
#include "exec/hdfs-avro-scanner.h"
#include "exec/hdfs-parquet-scanner.h"

#include <sstream>
#include <boost/algorithm/string.hpp>
#include <boost/foreach.hpp>
#include <boost/filesystem.hpp>
#include <gutil/strings/substitute.h>

#include <hdfs.h>

#include "codegen/llvm-codegen.h"
#include "common/logging.h"
#include "common/object-pool.h"
#include "exprs/expr-context.h"
#include "runtime/descriptors.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/runtime-state.h"
#include "runtime/mem-pool.h"
#include "runtime/raw-value.h"
#include "runtime/row-batch.h"
#include "util/bit-util.h"
#include "util/container-util.h"
#include "util/debug-util.h"
#include "util/disk-info.h"
#include "util/error-util.h"
#include "util/hdfs-util.h"
#include "util/impalad-metrics.h"
#include "util/periodic-counter-updater.h"
#include "util/runtime-profile.h"

#include "gen-cpp/PlanNodes_types.h"

#include "common/names.h"

DEFINE_int32(max_row_batches, 0, "the maximum size of materialized_row_batches_");
DECLARE_string(cgroup_hierarchy_path);
DECLARE_bool(enable_rm);

namespace filesystem = boost::filesystem;
using namespace impala;
using namespace llvm;
using namespace strings;

const string HdfsScanNode::HDFS_SPLIT_STATS_DESC =
    "Hdfs split stats (<volume id>:<# splits>/<split lengths>)";

// Amount of memory that we approximate a scanner thread will use, not including
// IoBuffers. The memory used does not vary considerably between file formats (just a
// couple of MBs). This value is conservative and was taken from running against the
// tpch lineitem table.
// TODO: revisit how we do this.
const int SCANNER_THREAD_MEM_USAGE = 32 * 1024 * 1024;

// Estimated upper bound on the compression ratio of compressed text files. Used to
// estimate scanner thread memory usage.
const int COMPRESSED_TEXT_COMPRESSION_RATIO = 11;
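
// E.g. a 64 MB gzipped text file is budgeted 64 MB * COMPRESSED_TEXT_COMPRESSION_RATIO
// of scanner thread memory in Prepare() below, since the whole file may need to be
// decompressed at once.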

// Determines how many unexpected remote bytes trigger an error in the runtime state
const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;

HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
                           const DescriptorTbl& descs)
    : ScanNode(pool, tnode, descs),
      thrift_plan_node_(new TPlanNode(tnode)),
      runtime_state_(NULL),
      tuple_id_(tnode.hdfs_scan_node.tuple_id),
      reader_context_(NULL),
      tuple_desc_(NULL),
      unknown_disk_id_warned_(false),
      initial_ranges_issued_(false),
      scanner_thread_bytes_required_(0),
      disks_accessed_bitmap_(TUnit::UNIT, 0),
      done_(false),
      all_ranges_started_(false),
      counters_running_(false),
      rm_callback_id_(-1) {
  max_materialized_row_batches_ = FLAGS_max_row_batches;
  if (max_materialized_row_batches_ <= 0) {
    // TODO: This parameter has a U-shaped effect on performance: increasing the value
    // first improves performance, but increasing it further degrades performance.
    // Investigate and tune this.
    max_materialized_row_batches_ =
        10 * (DiskInfo::num_disks() + DiskIoMgr::REMOTE_NUM_DISKS);
  }
  materialized_row_batches_.reset(new RowBatchQueue(max_materialized_row_batches_));
}

HdfsScanNode::~HdfsScanNode() {
}

Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());

  if (!initial_ranges_issued_) {
    // We do this in GetNext() to ensure that all execution time predicates have
    // been generated (e.g. probe side bitmap filters).
    // TODO: we could do dynamic partition pruning here as well.
    initial_ranges_issued_ = true;
    // Issue initial ranges for all file types.
    RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this,
        per_type_files_[THdfsFileFormat::TEXT]));
    RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
        per_type_files_[THdfsFileFormat::SEQUENCE_FILE]));
    RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
        per_type_files_[THdfsFileFormat::RC_FILE]));
    RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
        per_type_files_[THdfsFileFormat::AVRO]));
    RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
        per_type_files_[THdfsFileFormat::PARQUET]));
    if (progress_.done()) SetDone();
  }

  Status status = GetNextInternal(state, row_batch, eos);
  if (status.IsMemLimitExceeded()) state->SetMemLimitExceeded();
  if (!status.ok() || *eos) StopAndFinalizeCounters();
  return status;
}

Status HdfsScanNode::GetNextInternal(
    RuntimeState* state, RowBatch* row_batch, bool* eos) {
  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
  RETURN_IF_CANCELLED(state);
  RETURN_IF_ERROR(QueryMaintenance(state));

  if (ReachedLimit()) {
    // LIMIT 0 case. Other limit values handled below.
    DCHECK_EQ(limit_, 0);
    *eos = true;
    return Status::OK;
  }
  *eos = false;
  RowBatch* materialized_batch = materialized_row_batches_->GetBatch();
  if (materialized_batch != NULL) {
    num_owned_io_buffers_ -= materialized_batch->num_io_buffers();
    row_batch->AcquireState(materialized_batch);
    // Update the number of materialized rows now instead of when they are materialized.
    // This means that scanners might process and queue up more rows than are necessary
    // for the limit case, but we want to avoid the synchronized writes to
    // num_rows_returned_.
    num_rows_returned_ += row_batch->num_rows();
    COUNTER_SET(rows_returned_counter_, num_rows_returned_);

    if (ReachedLimit()) {
      int num_rows_over = num_rows_returned_ - limit_;
      row_batch->set_num_rows(row_batch->num_rows() - num_rows_over);
      num_rows_returned_ -= num_rows_over;
      COUNTER_SET(rows_returned_counter_, num_rows_returned_);

      *eos = true;
      SetDone();
    }
    DCHECK_EQ(materialized_batch->num_io_buffers(), 0);
    delete materialized_batch;
    return Status::OK;
  }
  // The RowBatchQueue was shut down either because all scan ranges are complete or a
  // scanner thread encountered an error. Check status_ to distinguish those cases.
  *eos = true;
  unique_lock<mutex> l(lock_);
  return status_;
}

DiskIoMgr::ScanRange* HdfsScanNode::AllocateScanRange(
    hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id,
    int disk_id, bool try_cache, bool expected_local, int64_t mtime) {
  DCHECK_GE(disk_id, -1);
  // Require that the scan range is within [0, file_length). While this cannot be used
  // to guarantee safety (file_length metadata may be stale), it avoids different
  // behavior between Hadoop FileSystems (e.g. s3n hdfsSeek() returns an error when
  // seeking beyond the end of the file).
  DCHECK_GE(offset, 0);
  DCHECK_GE(len, 0);
  DCHECK_LE(offset + len, GetFileDesc(file)->file_length)
      << "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")";
  disk_id = runtime_state_->io_mgr()->AssignQueue(file, disk_id, expected_local);

  ScanRangeMetadata* metadata =
      runtime_state_->obj_pool()->Add(new ScanRangeMetadata(partition_id));
  DiskIoMgr::ScanRange* range =
      runtime_state_->obj_pool()->Add(new DiskIoMgr::ScanRange());
  range->Reset(fs, file, len, offset, disk_id, try_cache, expected_local,
      mtime, metadata);
  return range;
}
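
// A disk_id of -1 tells the io mgr that the block's volume is unknown; AssignQueue()
// above then chooses an I/O queue for the range itself instead of trusting the
// caller's hint, and ranges that are not expected to be local go to the remote disk
// queues.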

HdfsFileDesc* HdfsScanNode::GetFileDesc(const string& filename) {
  DCHECK(file_descs_.find(filename) != file_descs_.end());
  return file_descs_[filename];
}

void HdfsScanNode::SetFileMetadata(const string& filename, void* metadata) {
  unique_lock<mutex> l(metadata_lock_);
  DCHECK(per_file_metadata_.find(filename) == per_file_metadata_.end());
  per_file_metadata_[filename] = metadata;
}

void* HdfsScanNode::GetFileMetadata(const string& filename) {
  unique_lock<mutex> l(metadata_lock_);
  map<string, void*>::iterator it = per_file_metadata_.find(filename);
  if (it == per_file_metadata_.end()) return NULL;
  return it->second;
}

void* HdfsScanNode::GetCodegenFn(THdfsFileFormat::type type) {
  CodegendFnMap::iterator it = codegend_fn_map_.find(type);
  if (it == codegend_fn_map_.end()) return NULL;
  return it->second;
}
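
// The per-format entries are filled in by Prepare() below via AddFunctionToJit();
// a NULL result here simply means the scanner falls back to the interpreted path.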

HdfsScanner* HdfsScanNode::CreateAndPrepareScanner(HdfsPartitionDescriptor* partition,
    ScannerContext* context, Status* status) {
  DCHECK(context != NULL);
  HdfsScanner* scanner = NULL;
  THdfsCompression::type compression =
      context->GetStream()->file_desc()->file_compression;

  // Create a new scanner for this file format and compression.
  switch (partition->file_format()) {
    case THdfsFileFormat::TEXT:
      // Lzo-compressed text files are scanned by a scanner that is implemented as a
      // dynamic library, so that Impala does not include GPL code.
      if (compression == THdfsCompression::LZO) {
        scanner = HdfsLzoTextScanner::GetHdfsLzoTextScanner(this, runtime_state_);
      } else {
        scanner = new HdfsTextScanner(this, runtime_state_);
      }
      break;
    case THdfsFileFormat::SEQUENCE_FILE:
      scanner = new HdfsSequenceScanner(this, runtime_state_);
      break;
    case THdfsFileFormat::RC_FILE:
      scanner = new HdfsRCFileScanner(this, runtime_state_);
      break;
    case THdfsFileFormat::AVRO:
      scanner = new HdfsAvroScanner(this, runtime_state_);
      break;
    case THdfsFileFormat::PARQUET:
      scanner = new HdfsParquetScanner(this, runtime_state_);
      break;
    default:
      DCHECK(false) << "Unknown Hdfs file format type:" << partition->file_format();
      return NULL;
  }
  DCHECK(scanner != NULL);
  runtime_state_->obj_pool()->Add(scanner);
  *status = scanner->Prepare(context);
  return scanner;
}
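
// Call pattern (see ScannerThread() below): one scanner is created per scan range,
// prepared against its ScannerContext, processes the range end to end in the calling
// thread via ProcessSplit(), and is then Close()'d.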

Tuple* HdfsScanNode::InitTemplateTuple(RuntimeState* state,
    const vector<ExprContext*>& value_ctxs) {
  if (partition_key_slots_.empty()) return NULL;

  // Lock to protect access to partition_key_pool_ and value_ctxs
  // TODO: we can push the lock to the mempool and exprs_values should not
  // use internal memory.
  Tuple* template_tuple = InitEmptyTemplateTuple();

  unique_lock<mutex> l(lock_);
  for (int i = 0; i < partition_key_slots_.size(); ++i) {
    const SlotDescriptor* slot_desc = partition_key_slots_[i];
    // Exprs are guaranteed to be literals, so they can safely be evaluated without a
    // row context.
    void* value = value_ctxs[slot_desc->col_pos()]->GetValue(NULL);
    RawValue::Write(value, template_tuple, slot_desc, NULL);
  }
  return template_tuple;
}
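
// Example: for a table partitioned by (year, month), the template tuple for partition
// year=2015/month=3 has those two slots written exactly once here; scanners then copy
// the template into each output row instead of re-evaluating partition exprs per row.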

Tuple* HdfsScanNode::InitEmptyTemplateTuple() {
  Tuple* template_tuple = NULL;
  {
    unique_lock<mutex> l(lock_);
    template_tuple = Tuple::Create(tuple_desc_->byte_size(), scan_node_pool_.get());
  }
  memset(template_tuple, 0, tuple_desc_->byte_size());
  return template_tuple;
}

void HdfsScanNode::TransferToScanNodePool(MemPool* pool) {
  unique_lock<mutex> l(lock_);
  scan_node_pool_->AcquireData(pool, false);
}

Status HdfsScanNode::Prepare(RuntimeState* state) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  runtime_state_ = state;
  RETURN_IF_ERROR(ScanNode::Prepare(state));

  tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
  DCHECK(tuple_desc_ != NULL);

  if (!state->cgroup().empty()) {
    scanner_threads_.SetCgroupsMgr(state->exec_env()->cgroups_mgr());
    scanner_threads_.SetCgroup(state->cgroup());
  }

  // One-time initialisation of state that is constant across scan ranges
  DCHECK(tuple_desc_->table_desc() != NULL);
  hdfs_table_ = static_cast<const HdfsTableDescriptor*>(tuple_desc_->table_desc());
  scan_node_pool_.reset(new MemPool(mem_tracker()));

  // Gather materialized partition-key slots and non-partition slots.
  const vector<SlotDescriptor*>& slots = tuple_desc_->slots();
  for (size_t i = 0; i < slots.size(); ++i) {
    if (!slots[i]->is_materialized()) continue;
    if (hdfs_table_->IsClusteringCol(slots[i])) {
      partition_key_slots_.push_back(slots[i]);
    } else {
      materialized_slots_.push_back(slots[i]);
    }
  }

  // Order the materialized slots such that for schemaless file formats (e.g. text) the
  // order corresponds to the physical order in files. For formats where the file schema
  // is independent of the table schema (e.g. Avro, Parquet), this step is not necessary.
  sort(materialized_slots_.begin(), materialized_slots_.end(),
      SlotDescriptor::ColPathLessThan);

  // Populate the mapping from slot path to index into materialized_slots_.
  for (int i = 0; i < materialized_slots_.size(); ++i) {
    path_to_materialized_slot_idx_[materialized_slots_[i]->col_path()] = i;
  }

  // Initialize is_materialized_col_
  is_materialized_col_.resize(hdfs_table_->num_cols());
  for (int i = 0; i < hdfs_table_->num_cols(); ++i) {
    is_materialized_col_[i] = GetMaterializedSlotIdx(vector<int>(1, i)) != SKIP_COLUMN;
  }

  HdfsFsCache::HdfsFsMap fs_cache;
  // Convert the TScanRangeParams into per-file DiskIO::ScanRange objects and populate
  // partition_ids_, file_descs_, and per_type_files_.
  DCHECK(scan_range_params_ != NULL)
      << "Must call SetScanRanges() before calling Prepare()";
  int num_ranges_missing_volume_id = 0;
  for (int i = 0; i < scan_range_params_->size(); ++i) {
    DCHECK((*scan_range_params_)[i].scan_range.__isset.hdfs_file_split);
    const THdfsFileSplit& split = (*scan_range_params_)[i].scan_range.hdfs_file_split;
    partition_ids_.insert(split.partition_id);
    HdfsPartitionDescriptor* partition_desc =
        hdfs_table_->GetPartition(split.partition_id);
    filesystem::path file_path(partition_desc->location());
    file_path.append(split.file_name, filesystem::path::codecvt());
    const string& native_file_path = file_path.native();

    HdfsFileDesc* file_desc = NULL;
    FileDescMap::iterator file_desc_it = file_descs_.find(native_file_path);
    if (file_desc_it == file_descs_.end()) {
      // Add a new file_desc to file_descs_ and per_type_files_
      file_desc = runtime_state_->obj_pool()->Add(new HdfsFileDesc(native_file_path));
      file_descs_[native_file_path] = file_desc;
      file_desc->file_length = split.file_length;
      file_desc->mtime = split.mtime;
      file_desc->file_compression = split.file_compression;
      RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
          native_file_path, &file_desc->fs, &fs_cache));

      if (partition_desc == NULL) {
        stringstream ss;
        ss << "Could not find partition with id: " << split.partition_id;
        return Status(ss.str());
      }
      ++num_unqueued_files_;
      per_type_files_[partition_desc->file_format()].push_back(file_desc);
    } else {
      // File already processed
      file_desc = file_desc_it->second;
    }

    bool expected_local = (*scan_range_params_)[i].__isset.is_remote &&
        !(*scan_range_params_)[i].is_remote;
    if (expected_local && (*scan_range_params_)[i].volume_id == -1) {
      if (!unknown_disk_id_warned_) {
        AddRuntimeExecOption("Missing Volume Id");
        runtime_state()->LogError(ErrorMsg(TErrorCode::HDFS_SCAN_NODE_UNKNOWN_DISK));
        unknown_disk_id_warned_ = true;
      }
      ++num_ranges_missing_volume_id;
    }

    bool try_cache = (*scan_range_params_)[i].is_cached;
    if (runtime_state_->query_options().disable_cached_reads) {
      DCHECK(!try_cache) << "Params should not have had this set.";
    }
    file_desc->splits.push_back(
        AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length,
            split.offset, split.partition_id, (*scan_range_params_)[i].volume_id,
            try_cache, expected_local, file_desc->mtime));
  }

  // Compute the minimum bytes required to start a new thread. This is based on the
  // file format.
  // The higher the estimate, the less likely it is the query will fail, but the more
  // likely the query will be throttled when it does not need to be.
  // TODO: how many buffers should we estimate per range? The IoMgr will throttle down
  // to one, but if there are already buffers queued before memory pressure was hit,
  // we can't reclaim that memory.
  if (per_type_files_[THdfsFileFormat::PARQUET].size() > 0) {
    // Parquet files require buffers per column
    scanner_thread_bytes_required_ =
        materialized_slots_.size() * 3 * runtime_state_->io_mgr()->max_read_buffer_size();
  } else {
    scanner_thread_bytes_required_ =
        3 * runtime_state_->io_mgr()->max_read_buffer_size();
  }
  // scanner_thread_bytes_required_ now contains the IoBuffer requirement.
  // Next we add in the other memory the scanner thread will use,
  // e.g. decompression buffers, tuple buffers, etc.
  // For compressed text, we estimate this based on the file size (since the whole file
  // will need to be decompressed at once). For all other formats, we use a constant.
  // TODO: can we do something better?
  int64_t scanner_thread_mem_usage = SCANNER_THREAD_MEM_USAGE;
  BOOST_FOREACH(HdfsFileDesc* file, per_type_files_[THdfsFileFormat::TEXT]) {
    if (file->file_compression != THdfsCompression::NONE) {
      int64_t bytes_required = file->file_length * COMPRESSED_TEXT_COMPRESSION_RATIO;
      scanner_thread_mem_usage = ::max(bytes_required, scanner_thread_mem_usage);
    }
  }
  scanner_thread_bytes_required_ += scanner_thread_mem_usage;
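
  // Example, assuming the default 8 MB max read buffer: a Parquet scan materializing
  // 20 slots starts from a 20 * 3 * 8 MB = 480 MB IoBuffer estimate, other formats
  // from 3 * 8 MB = 24 MB; a 100 MB gzipped text file then raises the non-IoBuffer
  // part from the 32 MB constant to 100 MB * COMPRESSED_TEXT_COMPRESSION_RATIO.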

  // Prepare all the partitions scanned by the scan node
  BOOST_FOREACH(const int64_t& partition_id, partition_ids_) {
    HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
    DCHECK(partition_desc != NULL);
    RETURN_IF_ERROR(partition_desc->PrepareExprs(state));
  }

  // Update server-wide metrics for the number of scan ranges and the ranges that have
  // incomplete metadata.
  ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size());
  ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id);

  // Add per volume stats to the runtime profile
  PerVolumnStats per_volume_stats;
  stringstream str;
  UpdateHdfsSplitStats(*scan_range_params_, &per_volume_stats);
  PrintHdfsSplitStats(per_volume_stats, &str);
  runtime_profile()->AddInfoString(HDFS_SPLIT_STATS_DESC, str.str());

  // Initialize conjunct exprs
  RETURN_IF_ERROR(Expr::CreateExprTrees(
      runtime_state_->obj_pool(), thrift_plan_node_->conjuncts, &conjunct_ctxs_));
  RETURN_IF_ERROR(
      Expr::Prepare(conjunct_ctxs_, runtime_state_, row_desc(), expr_mem_tracker()));
  AddExprCtxsToFree(conjunct_ctxs_);

  for (int format = THdfsFileFormat::TEXT;
       format <= THdfsFileFormat::PARQUET; ++format) {
    vector<HdfsFileDesc*>& file_descs =
        per_type_files_[static_cast<THdfsFileFormat::type>(format)];

    if (file_descs.empty()) continue;

    // Randomize the order in which this node processes its files. We do this to avoid
    // issuing remote reads to the same DN from different impalads. In file formats such
    // as avro/seq/rc (i.e. splittable with a header), every node first reads the header.
    // If every node went through the files in the same order, all the remote reads
    // would be for the same file, meaning a few DNs would serve a lot of remote reads
    // at the same time.
    random_shuffle(file_descs.begin(), file_descs.end());

    // Create reusable codegen'd functions for each file type needed
    Function* fn;
    switch (format) {
      case THdfsFileFormat::TEXT:
        fn = HdfsTextScanner::Codegen(this, conjunct_ctxs_);
        break;
      case THdfsFileFormat::SEQUENCE_FILE:
        fn = HdfsSequenceScanner::Codegen(this, conjunct_ctxs_);
        break;
      case THdfsFileFormat::AVRO:
        fn = HdfsAvroScanner::Codegen(this, conjunct_ctxs_);
        break;
      default:
        // No codegen for this format
        fn = NULL;
    }
    if (fn != NULL) {
      LlvmCodeGen* codegen;
      RETURN_IF_ERROR(runtime_state_->GetCodegen(&codegen));
      codegen->AddFunctionToJit(
          fn, &codegend_fn_map_[static_cast<THdfsFileFormat::type>(format)]);
    }
  }

  return Status::OK;
}

// This function initiates the connection to hdfs and starts up the initial scanner
// threads. The scanner subclasses are passed the initial splits. Scanners are expected
// to queue up a non-zero number of those splits to the io mgr (via the ScanNode).
Status HdfsScanNode::Open(RuntimeState* state) {
  RETURN_IF_ERROR(ExecNode::Open(state));

  // We need at least one scanner thread to make progress. We need to make this
  // reservation before any ranges are issued.
  state->resource_pool()->ReserveOptionalTokens(1);
  if (runtime_state_->query_options().num_scanner_threads > 0) {
    state->resource_pool()->set_max_quota(
        runtime_state_->query_options().num_scanner_threads);
  }

  state->resource_pool()->SetThreadAvailableCb(
      bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1));

  if (runtime_state_->query_resource_mgr() != NULL) {
    rm_callback_id_ = runtime_state_->query_resource_mgr()->AddVcoreAvailableCb(
        bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this,
            state->resource_pool()));
  }

  if (file_descs_.empty()) {
    SetDone();
    return Status::OK;
  }

  // Open all the partition exprs used by the scan node
  BOOST_FOREACH(const int64_t& partition_id, partition_ids_) {
    HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
    DCHECK(partition_desc != NULL);
    RETURN_IF_ERROR(partition_desc->OpenExprs(state));
  }

  // Open all conjuncts
  RETURN_IF_ERROR(Expr::Open(conjunct_ctxs_, state));

  RETURN_IF_ERROR(runtime_state_->io_mgr()->RegisterContext(
      &reader_context_, mem_tracker()));

  // Initialize HdfsScanNode specific counters
  read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_READ_TIMER);
  per_read_thread_throughput_counter_ = runtime_profile()->AddDerivedCounter(
      PER_READ_THREAD_THROUGHPUT_COUNTER, TUnit::BYTES_PER_SECOND,
      bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_read_counter_, read_timer_));
  scan_ranges_complete_counter_ =
      ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT);
  if (DiskInfo::num_disks() < 64) {
    num_disks_accessed_counter_ =
        ADD_COUNTER(runtime_profile(), NUM_DISKS_ACCESSED_COUNTER, TUnit::UNIT);
  } else {
    num_disks_accessed_counter_ = NULL;
  }
  num_scanner_threads_started_counter_ =
      ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);

  runtime_state_->io_mgr()->set_bytes_read_counter(reader_context_, bytes_read_counter());
  runtime_state_->io_mgr()->set_read_timer(reader_context_, read_timer());
  runtime_state_->io_mgr()->set_active_read_thread_counter(reader_context_,
      &active_hdfs_read_thread_counter_);
  runtime_state_->io_mgr()->set_disks_access_bitmap(reader_context_,
      &disks_accessed_bitmap_);

  average_scanner_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
      AVERAGE_SCANNER_THREAD_CONCURRENCY, &active_scanner_thread_counter_);
  average_hdfs_read_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
      AVERAGE_HDFS_READ_THREAD_CONCURRENCY, &active_hdfs_read_thread_counter_);

  bytes_read_local_ = ADD_COUNTER(runtime_profile(), "BytesReadLocal",
      TUnit::BYTES);
  bytes_read_short_circuit_ = ADD_COUNTER(runtime_profile(), "BytesReadShortCircuit",
      TUnit::BYTES);
  bytes_read_dn_cache_ = ADD_COUNTER(runtime_profile(), "BytesReadDataNodeCache",
      TUnit::BYTES);
  num_remote_ranges_ = ADD_COUNTER(runtime_profile(), "RemoteScanRanges",
      TUnit::UNIT);
  unexpected_remote_bytes_ = ADD_COUNTER(runtime_profile(), "BytesReadRemoteUnexpected",
      TUnit::BYTES);

  max_compressed_text_file_length_ = runtime_profile()->AddHighWaterMarkCounter(
      "MaxCompressedTextFileLength", TUnit::BYTES);

  for (int i = 0; i < state->io_mgr()->num_total_disks() + 1; ++i) {
    hdfs_read_thread_concurrency_bucket_.push_back(
        pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
  }
  runtime_profile()->RegisterBucketingCounters(&active_hdfs_read_thread_counter_,
      &hdfs_read_thread_concurrency_bucket_);

  counters_running_ = true;

  int total_splits = 0;
  for (FileDescMap::iterator it = file_descs_.begin(); it != file_descs_.end(); ++it) {
    total_splits += it->second->splits.size();
  }

  if (total_splits == 0) {
    SetDone();
    return Status::OK;
  }

  stringstream ss;
  ss << "Splits complete (node=" << id() << "):";
  progress_ = ProgressUpdater(ss.str(), total_splits);

  return Status::OK;
}

Status HdfsScanNode::Reset(RuntimeState* state) {
  DCHECK(false) << "NYI";
  return Status("NYI");
}

void HdfsScanNode::Close(RuntimeState* state) {
  if (is_closed()) return;
  SetDone();

  state->resource_pool()->SetThreadAvailableCb(NULL);
  if (state->query_resource_mgr() != NULL && rm_callback_id_ != -1) {
    state->query_resource_mgr()->RemoveVcoreAvailableCb(rm_callback_id_);
  }

  scanner_threads_.JoinAll();

  num_owned_io_buffers_ -= materialized_row_batches_->Cleanup();
  DCHECK_EQ(num_owned_io_buffers_, 0) << "ScanNode has leaked io buffers";

  if (reader_context_ != NULL) {
    // There may still be io buffers used by parent nodes so we can't unregister the
    // reader context yet. The runtime state keeps a list of all the reader contexts and
    // they are unregistered when the fragment is closed.
    state->reader_contexts()->push_back(reader_context_);
    // Need to wait for all the active scanner threads to finish to ensure there is no
    // more memory tracked by this scan node's mem tracker.
    state->io_mgr()->CancelContext(reader_context_, true);
  }

  StopAndFinalizeCounters();

  // There should be no active scanner threads and hdfs read threads.
  DCHECK_EQ(active_scanner_thread_counter_.value(), 0);
  DCHECK_EQ(active_hdfs_read_thread_counter_.value(), 0);

  if (scan_node_pool_.get() != NULL) scan_node_pool_->FreeAll();

  // Close all conjuncts
  Expr::Close(conjunct_ctxs_, state);

  // Close all the partitions scanned by the scan node
  BOOST_FOREACH(const int64_t& partition_id, partition_ids_) {
    HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
    DCHECK(partition_desc != NULL);
    partition_desc->CloseExprs(state);
  }

  ScanNode::Close(state);
}

Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges) {
  RETURN_IF_ERROR(
      runtime_state_->io_mgr()->AddScanRanges(reader_context_, ranges));
  ThreadTokenAvailableCb(runtime_state_->resource_pool());
  return Status::OK;
}

Status HdfsScanNode::AddDiskIoRanges(const HdfsFileDesc* desc) {
  const vector<DiskIoMgr::ScanRange*>& ranges = desc->splits;
  RETURN_IF_ERROR(
      runtime_state_->io_mgr()->AddScanRanges(reader_context_, ranges));
  MarkFileDescIssued(desc);
  ThreadTokenAvailableCb(runtime_state_->resource_pool());
  return Status::OK;
}

void HdfsScanNode::MarkFileDescIssued(const HdfsFileDesc* file_desc) {
  DCHECK_GT(num_unqueued_files_, 0);
  --num_unqueued_files_;
}

void HdfsScanNode::AddMaterializedRowBatch(RowBatch* row_batch) {
  materialized_row_batches_->AddBatch(row_batch);
}

Status HdfsScanNode::GetConjunctCtxs(vector<ExprContext*>* ctxs) {
  return Expr::Clone(conjunct_ctxs_, runtime_state_, ctxs);
}

// For controlling the amount of memory used for scanners, we approximate the
// scanner mem usage based on scanner_thread_bytes_required_, rather than the
// consumption in the scan node's mem tracker. The problem with the scan node
// trackers is that they do not account for the memory the scanner will use.
// For example, if there is 110 MB of memory left (based on the mem tracker)
// and we estimate that a scanner will use 100 MB, we want to make sure to only
// start up one additional thread. However, after starting the first thread, the
// mem tracker value will not change immediately (it takes some time before the
// scanner is running and starts using memory). Therefore we just use the estimate
// based on the number of running scanner threads.
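// Concretely: with 2 running scanner threads and scanner_thread_bytes_required_ =
// 100 MB, committed memory is 200 MB. If the mem tracker only shows 150 MB consumed
// so far, the 50 MB difference is memory the running scanners are still expected to
// grow into, so a third thread is started only if 50 MB + 100 MB fits within the
// tracker's SpareCapacity().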
bool HdfsScanNode::EnoughMemoryForScannerThread(bool new_thread) {
  int64_t committed_scanner_mem =
      active_scanner_thread_counter_.value() * scanner_thread_bytes_required_;
  int64_t tracker_consumption = mem_tracker()->consumption();
  int64_t est_additional_scanner_mem = committed_scanner_mem - tracker_consumption;
  if (est_additional_scanner_mem < 0) {
    // This is the case where our estimate was too low. Expand the estimate based
    // on the usage.
    int64_t avg_consumption =
        tracker_consumption / active_scanner_thread_counter_.value();
    // Take the average and expand it by 50%. Some scanners will not have hit their
    // steady state mem usage yet.
    // TODO: how can we scale down if we've overestimated?
    // TODO: better heuristic?
    scanner_thread_bytes_required_ = static_cast<int64_t>(avg_consumption * 1.5);
    est_additional_scanner_mem = 0;
  }

  // If we are starting a new thread, take that into account now.
  if (new_thread) est_additional_scanner_mem += scanner_thread_bytes_required_;
  return est_additional_scanner_mem < mem_tracker()->SpareCapacity();
}

void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
  // This is called to start up new scanner threads. It's not a big deal if we
  // spin up more than strictly necessary since they will go through and terminate
  // promptly. However, we want to minimize that by checking a number of conditions:
  // 1. Don't start up if the ScanNode is done.
  // 2. Don't start up if all the ranges have been taken by another thread.
  // 3. Don't start up if the number of ranges left is less than the number of
  //    active scanner threads.
  // 4. Don't start up if no initial ranges have been issued (see IMPALA-1722).
  // 5. Don't start up a ScannerThread if materialized_row_batches_ is full since
  //    we are not scanner bound.
  // 6. Don't start up a thread if there isn't enough memory left to run it.
  // 7. Don't start up if there are no thread tokens.
  // 8. Don't start up if we are running too many threads for our vcore allocation
  //    (unless the thread is reserved, in which case it has to run).

  // Case 4. We have not issued the initial ranges so don't start a scanner thread.
  // Issuing ranges will call this function and we'll start the scanner threads then.
  // TODO: It would be good to have a test case for that.
  if (!initial_ranges_issued_) return;

  bool started_scanner = false;
  while (true) {
    // The lock must be given up between loops in order to give writers to done_,
    // all_ranges_started_ etc. a chance to grab the lock.
    // TODO: This still leans heavily on starvation-free locks; come up with a more
    // correct way to communicate between this method and ScannerThreadHelper.
    unique_lock<mutex> lock(lock_);
    // Cases 1, 2, 3.
    if (done_ || all_ranges_started_ ||
        active_scanner_thread_counter_.value() >= progress_.remaining()) {
      break;
    }

    // Cases 5 and 6.
    if (materialized_row_batches_->GetSize() >= max_materialized_row_batches_ ||
        !EnoughMemoryForScannerThread(true)) {
      break;
    }

    // Case 7.
    bool is_reserved = false;
    if (!pool->TryAcquireThreadToken(&is_reserved)) break;

    // Case 8.
    if (!is_reserved) {
      if (runtime_state_->query_resource_mgr() != NULL &&
          runtime_state_->query_resource_mgr()->IsVcoreOverSubscribed()) {
        break;
      }
    }

    COUNTER_ADD(&active_scanner_thread_counter_, 1);
    COUNTER_ADD(num_scanner_threads_started_counter_, 1);
    stringstream ss;
    ss << "scanner-thread(" << num_scanner_threads_started_counter_->value() << ")";
    scanner_threads_.AddThread(
        new Thread("hdfs-scan-node", ss.str(), &HdfsScanNode::ScannerThread, this));
    started_scanner = true;

    if (runtime_state_->query_resource_mgr() != NULL) {
      runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(1);
    }
  }
  if (!started_scanner) ++num_skipped_tokens_;
}

void HdfsScanNode::ScannerThread() {
  SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
  SCOPED_TIMER(runtime_state_->total_cpu_timer());

  while (!done_) {
    {
      // Check if we have enough resources (thread token and memory) to keep using
      // this thread.
      unique_lock<mutex> l(lock_);
      if (active_scanner_thread_counter_.value() > 1) {
        if (runtime_state_->resource_pool()->optional_exceeded() ||
            !EnoughMemoryForScannerThread(false)) {
          // We can't break here. We need to update the counter with the lock held or
          // else all threads might see active_scanner_thread_counter_.value > 1.
          COUNTER_ADD(&active_scanner_thread_counter_, -1);
          // Unlock before releasing the thread token to avoid deadlock in
          // ThreadTokenAvailableCb().
          l.unlock();
          if (runtime_state_->query_resource_mgr() != NULL) {
            runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(-1);
          }
          runtime_state_->resource_pool()->ReleaseThreadToken(false);
          return;
        }
      } else {
        // If this is the only scanner thread, it should keep running regardless
        // of resource constraints.
      }
    }

    DiskIoMgr::ScanRange* scan_range;
    // Take a snapshot of num_unqueued_files_ before calling GetNextRange().
    // We don't want num_unqueued_files_ to go to zero between the return from
    // GetNextRange() and the check for when all ranges are complete.
    int num_unqueued_files = num_unqueued_files_;
    AtomicUtil::MemoryBarrier();
    Status status = runtime_state_->io_mgr()->GetNextRange(reader_context_, &scan_range);

    if (status.ok() && scan_range != NULL) {
      // Got a scan range. Create a new scanner object and process the range
      // end to end (in this thread).
      ScanRangeMetadata* metadata =
          reinterpret_cast<ScanRangeMetadata*>(scan_range->meta_data());
      int64_t partition_id = metadata->partition_id;
      HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
      DCHECK_NOTNULL(partition);

      ScannerContext* context = runtime_state_->obj_pool()->Add(
          new ScannerContext(runtime_state_, this, partition, scan_range));
      Status scanner_status;
      HdfsScanner* scanner = CreateAndPrepareScanner(partition, context, &scanner_status);
      if (VLOG_QUERY_IS_ON && (!scanner_status.ok() || scanner == NULL)) {
        stringstream ss;
        ss << "Error preparing scanner for scan range " << scan_range->file() <<
            "(" << scan_range->offset() << ":" << scan_range->len() << ").";
        ss << endl << runtime_state_->ErrorLog();
        VLOG_QUERY << ss.str();
      }

      status = scanner->ProcessSplit();
      if (VLOG_QUERY_IS_ON && !status.ok() && !runtime_state_->error_log().empty()) {
        // This thread hit an error, record it and bail.
        // TODO: better way to report errors? Maybe via the thrift interface?
        stringstream ss;
        ss << "Scan node (id=" << id() << ") ran into a parse error for scan range "
           << scan_range->file() << "(" << scan_range->offset() << ":"
           << scan_range->len() << ").";
        if (partition->file_format() != THdfsFileFormat::PARQUET) {
          // Parquet doesn't read the range end to end so the current offset isn't
          // useful.
          // TODO: make sure the parquet reader is outputting as much diagnostic
          // information as possible.
          ScannerContext::Stream* stream = context->GetStream();
          ss << " Processed " << stream->total_bytes_returned() << " bytes.";
        }
        ss << endl << runtime_state_->ErrorLog();
        VLOG_QUERY << ss.str();
      }
      scanner->Close();
    }

    if (!status.ok()) {
      {
        unique_lock<mutex> l(lock_);
        // If there was already an error, the main thread will do the cleanup
        if (!status_.ok()) break;

        if (status.IsCancelled()) {
          // The scan node should be the only thing that initiated scanner threads to
          // see cancelled (i.e. limit reached). No need to do anything here.
          DCHECK(done_);
          break;
        }
        // Set status_ before calling SetDone() (which shuts down the RowBatchQueue),
        // to ensure that GetNextInternal() notices the error status.
        status_ = status;
      }

      if (status.IsMemLimitExceeded()) runtime_state_->SetMemLimitExceeded();
      SetDone();
      break;
    }

    // Done with range and it completed successfully
    if (progress_.done()) {
      // All ranges are finished. Indicate we are done.
      SetDone();
      break;
    }

    if (scan_range == NULL && num_unqueued_files == 0) {
      // TODO: Based on the usage pattern of all_ranges_started_, it looks like it is
      // not needed to acquire the lock in x86.
      unique_lock<mutex> l(lock_);
      // All ranges have been queued and GetNextRange() returned NULL. This means that
      // every range is either done or being processed by another thread.
      all_ranges_started_ = true;
      break;
    }
  }

  COUNTER_ADD(&active_scanner_thread_counter_, -1);
  if (runtime_state_->query_resource_mgr() != NULL) {
    runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(-1);
  }
  runtime_state_->resource_pool()->ReleaseThreadToken(false);
}

void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
    const THdfsCompression::type& compression_type) {
  vector<THdfsCompression::type> types;
  types.push_back(compression_type);
  RangeComplete(file_type, types);
}

void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
    const vector<THdfsCompression::type>& compression_types) {
  scan_ranges_complete_counter()->Add(1);
  progress_.Update(1);

  {
    lock_guard<SpinLock> l(file_type_counts_lock_);
    for (int i = 0; i < compression_types.size(); ++i) {
      ++file_type_counts_[make_pair(file_type, compression_types[i])];
    }
  }
}

void HdfsScanNode::SetDone() {
  {
    unique_lock<mutex> l(lock_);
    if (done_) return;
    done_ = true;
  }
  if (reader_context_ != NULL) {
    runtime_state_->io_mgr()->CancelContext(reader_context_);
  }
  materialized_row_batches_->Shutdown();
}

void HdfsScanNode::ComputeSlotMaterializationOrder(vector<int>* order) const {
  const vector<ExprContext*>& conjuncts = ExecNode::conjunct_ctxs();
  // Initialize all orders to conjuncts.size() (i.e. after the last conjunct).
  order->insert(order->begin(), materialized_slots().size(), conjuncts.size());

  const DescriptorTbl& desc_tbl = runtime_state_->desc_tbl();

  vector<SlotId> slot_ids;
  for (int conjunct_idx = 0; conjunct_idx < conjuncts.size(); ++conjunct_idx) {
    slot_ids.clear();
    int num_slots = conjuncts[conjunct_idx]->root()->GetSlotIds(&slot_ids);
    for (int j = 0; j < num_slots; ++j) {
      SlotDescriptor* slot_desc = desc_tbl.GetSlotDescriptor(slot_ids[j]);
      int slot_idx = GetMaterializedSlotIdx(slot_desc->col_path());
      // slot_idx == -1 means this was a partition key slot which is always
      // materialized before any slots.
      if (slot_idx == -1) continue;
      // If this slot hasn't been assigned an order, assign it to be materialized
      // before evaluating conjuncts[conjunct_idx].
      if ((*order)[slot_idx] == conjuncts.size()) {
        (*order)[slot_idx] = conjunct_idx;
      }
    }
  }
}
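
// Example: with materialized slots [s0, s1, s2] and conjuncts c0 referencing s2 and
// c1 referencing s0 and s1, the resulting order is [1, 1, 0]: s2 must be materialized
// before evaluating c0, s0 and s1 before c1. Slots referenced by no conjunct keep the
// sentinel value conjuncts.size().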

void HdfsScanNode::StopAndFinalizeCounters() {
  unique_lock<mutex> l(lock_);
  if (!counters_running_) return;
  counters_running_ = false;

  PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
  PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
  PeriodicCounterUpdater::StopSamplingCounter(average_scanner_thread_concurrency_);
  PeriodicCounterUpdater::StopSamplingCounter(average_hdfs_read_thread_concurrency_);
  PeriodicCounterUpdater::StopBucketingCounters(&hdfs_read_thread_concurrency_bucket_,
      true);

  // Output hdfs read thread concurrency into info string
  stringstream ss;
  for (int i = 0; i < hdfs_read_thread_concurrency_bucket_.size(); ++i) {
    ss << i << ":" << setprecision(4)
       << hdfs_read_thread_concurrency_bucket_[i]->double_value() << "% ";
  }
  runtime_profile_->AddInfoString("Hdfs Read Thread Concurrency Bucket", ss.str());

  // Convert the disk access bitmap to the number of disks accessed
  uint64_t num_disk_bitmap = disks_accessed_bitmap_.value();
  int64_t num_disk_accessed = BitUtil::Popcount(num_disk_bitmap);
  if (num_disks_accessed_counter_ != NULL) {
    num_disks_accessed_counter_->Set(num_disk_accessed);
  }

  // Output completed file types and counts to info string
  if (!file_type_counts_.empty()) {
    stringstream ss;
    {
      lock_guard<SpinLock> l2(file_type_counts_lock_);
      for (FileTypeCountsMap::const_iterator it = file_type_counts_.begin();
          it != file_type_counts_.end(); ++it) {
        ss << it->first.first << "/" << it->first.second << ":" << it->second << " ";
      }
    }
    runtime_profile_->AddInfoString("File Formats", ss.str());
  }

  // Output the fraction of scanners with codegen enabled
  ss.str(std::string());
  ss << "Codegen enabled: " << num_scanners_codegen_enabled_ << " out of "
     << (num_scanners_codegen_enabled_ + num_scanners_codegen_disabled_);
  AddRuntimeExecOption(ss.str());

  if (reader_context_ != NULL) {
    bytes_read_local_->Set(runtime_state_->io_mgr()->bytes_read_local(reader_context_));
    bytes_read_short_circuit_->Set(
        runtime_state_->io_mgr()->bytes_read_short_circuit(reader_context_));
    bytes_read_dn_cache_->Set(
        runtime_state_->io_mgr()->bytes_read_dn_cache(reader_context_));
    num_remote_ranges_->Set(static_cast<int64_t>(
        runtime_state_->io_mgr()->num_remote_ranges(reader_context_)));
    unexpected_remote_bytes_->Set(
        runtime_state_->io_mgr()->unexpected_remote_bytes(reader_context_));

    if (unexpected_remote_bytes_->value() >= UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD) {
      runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, Substitute(
          "Read $0 of data across network that was expected to be local. "
          "Block locality metadata for table '$1.$2' may be stale. Consider running "
          "\"INVALIDATE METADATA `$1`.`$2`\".",
          PrettyPrinter::Print(unexpected_remote_bytes_->value(), TUnit::BYTES),
          hdfs_table_->database(), hdfs_table_->name())));
    }

    ImpaladMetrics::IO_MGR_BYTES_READ->Increment(bytes_read_counter()->value());
    ImpaladMetrics::IO_MGR_LOCAL_BYTES_READ->Increment(
        bytes_read_local_->value());
    ImpaladMetrics::IO_MGR_SHORT_CIRCUIT_BYTES_READ->Increment(
        bytes_read_short_circuit_->value());
    ImpaladMetrics::IO_MGR_CACHED_BYTES_READ->Increment(
        bytes_read_dn_cache_->value());
  }
}

void HdfsScanNode::UpdateHdfsSplitStats(
    const vector<TScanRangeParams>& scan_range_params_list,
    PerVolumnStats* per_volume_stats) {
  pair<int, int64_t> init_value(0, 0);
  BOOST_FOREACH(const TScanRangeParams& scan_range_params, scan_range_params_list) {
    const TScanRange& scan_range = scan_range_params.scan_range;
    if (!scan_range.__isset.hdfs_file_split) continue;
    const THdfsFileSplit& split = scan_range.hdfs_file_split;
    pair<int, int64_t>* stats =
        FindOrInsert(per_volume_stats, scan_range_params.volume_id, init_value);
    ++(stats->first);
    stats->second += split.length;
  }
}

void HdfsScanNode::PrintHdfsSplitStats(const PerVolumnStats& per_volume_stats,
    stringstream* ss) {
  for (PerVolumnStats::const_iterator i = per_volume_stats.begin();
      i != per_volume_stats.end(); ++i) {
    (*ss) << i->first << ":" << i->second.first << "/"
          << PrettyPrinter::Print(i->second.second, TUnit::BYTES) << " ";
  }
}
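
// Example output for two volumes: "0:5/2.50 GB 1:3/1.20 GB ", i.e. volume 0 holds 5
// splits totalling 2.50 GB and volume 1 holds 3 splits totalling 1.20 GB (cf.
// HDFS_SPLIT_STATS_DESC above).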