25 #include <boost/algorithm/string.hpp>
26 #include <boost/foreach.hpp>
27 #include <boost/filesystem.hpp>
28 #include <gutil/strings/substitute.h>
52 #include "gen-cpp/PlanNodes_types.h"
56 DEFINE_int32(max_row_batches, 0,
"the maximum size of materialized_row_batches_");
60 namespace filesystem = boost::filesystem;
61 using namespace impala;
63 using namespace strings;
66 "Hdfs split stats (<volume id>:<# splits>/<split lengths>)";
84 thrift_plan_node_(new TPlanNode(tnode)),
86 tuple_id_(tnode.hdfs_scan_node.tuple_id),
87 reader_context_(NULL),
89 unknown_disk_id_warned_(false),
90 initial_ranges_issued_(false),
91 scanner_thread_bytes_required_(0),
92 disks_accessed_bitmap_(TUnit::UNIT, 0),
94 all_ranges_started_(false),
95 counters_running_(false),
153 if (materialized_batch != NULL) {
173 delete materialized_batch;
179 unique_lock<mutex> l(
lock_);
184 hdfsFS fs,
const char* file, int64_t len, int64_t
offset, int64_t partition_id,
185 int disk_id,
bool try_cache,
bool expected_local, int64_t mtime) {
186 DCHECK_GE(disk_id, -1);
191 DCHECK_GE(offset, 0);
193 DCHECK_LE(offset + len,
GetFileDesc(file)->file_length)
194 <<
"Scan range beyond end of file (offset=" << offset <<
", len=" << len <<
")";
201 range->
Reset(fs, file, len, offset, disk_id, try_cache, expected_local,
232 DCHECK(context != NULL);
234 THdfsCompression::type compression =
239 case THdfsFileFormat::TEXT:
242 if (compression == THdfsCompression::LZO) {
248 case THdfsFileFormat::SEQUENCE_FILE:
251 case THdfsFileFormat::RC_FILE:
254 case THdfsFileFormat::AVRO:
257 case THdfsFileFormat::PARQUET:
261 DCHECK(
false) <<
"Unknown Hdfs file format type:" << partition->
file_format();
264 DCHECK(scanner != NULL);
266 *status = scanner->
Prepare(context);
271 const vector<ExprContext*>& value_ctxs) {
279 unique_lock<mutex> l(
lock_);
283 void* value = value_ctxs[slot_desc->
col_pos()]->GetValue(NULL);
286 return template_tuple;
290 Tuple* template_tuple = NULL;
292 unique_lock<mutex> l(
lock_);
296 return template_tuple;
300 unique_lock<mutex> l(
lock_);
312 if (!state->
cgroup().empty()) {
324 for (
size_t i = 0; i < slots.size(); ++i) {
325 if (!slots[i]->is_materialized())
continue;
326 if (hdfs_table_->IsClusteringCol(slots[i])) {
346 for (
int i = 0; i < hdfs_table_->num_cols(); ++i) {
354 <<
"Must call SetScanRanges() before calling Prepare()";
355 int num_ranges_missing_volume_id = 0;
358 const THdfsFileSplit& split = (*scan_range_params_)[i].scan_range.hdfs_file_split;
361 hdfs_table_->GetPartition(split.partition_id);
363 file_path.append(split.file_name, filesystem::path::codecvt());
364 const string& native_file_path = file_path.native();
367 FileDescMap::iterator file_desc_it =
file_descs_.find(native_file_path);
370 file_desc = runtime_state_->obj_pool()->Add(
new HdfsFileDesc(native_file_path));
373 file_desc->
mtime = split.mtime;
376 native_file_path, &file_desc->
fs, &fs_cache));
378 if (partition_desc == NULL) {
380 ss <<
"Could not find partition with id: " << split.partition_id;
387 file_desc = file_desc_it->second;
390 bool expected_local = (*scan_range_params_)[i].__isset.is_remote &&
391 !(*scan_range_params_)[i].is_remote;
398 ++num_ranges_missing_volume_id;
401 bool try_cache = (*scan_range_params_)[i].is_cached;
402 if (runtime_state_->query_options().disable_cached_reads) {
403 DCHECK(!try_cache) <<
"Params should not have had this set.";
405 file_desc->
splits.push_back(
407 split.offset, split.partition_id, (*scan_range_params_)[i].volume_id,
408 try_cache, expected_local, file_desc->
mtime));
424 3 * runtime_state_->io_mgr()->max_read_buffer_size();
436 scanner_thread_mem_usage = ::max(bytes_required, scanner_thread_mem_usage);
444 DCHECK(partition_desc != NULL);
467 for (
int format = THdfsFileFormat::TEXT;
468 format <= THdfsFileFormat::PARQUET; ++format) {
469 vector<HdfsFileDesc*>& file_descs =
472 if (file_descs.empty())
continue;
479 random_shuffle(file_descs.begin(), file_descs.end());
484 case THdfsFileFormat::TEXT:
487 case THdfsFileFormat::SEQUENCE_FILE:
490 case THdfsFileFormat::AVRO:
539 DCHECK(partition_desc != NULL);
589 "MaxCompressedTextFileLength", TUnit::BYTES);
600 int total_splits = 0;
602 total_splits += it->second->splits.size();
605 if (total_splits == 0) {
611 ss <<
"Splits complete (node=" <<
id() <<
"):";
618 DCHECK(
false) <<
"NYI";
660 DCHECK(partition_desc != NULL);
675 const vector<DiskIoMgr::ScanRange*>& ranges = desc->
splits;
707 int64_t committed_scanner_mem =
710 int64_t est_additional_scanner_mem = committed_scanner_mem - tracker_consumption;
711 if (est_additional_scanner_mem < 0) {
714 int64_t avg_consumption =
721 est_additional_scanner_mem = 0;
750 bool started_scanner =
false;
756 unique_lock<mutex> lock(
lock_);
771 bool is_reserved =
false;
788 started_scanner =
true;
805 unique_lock<mutex> l(
lock_);
835 if (status.
ok() && scan_range != NULL) {
842 DCHECK_NOTNULL(partition);
850 ss <<
"Error preparing text scanner for scan range " << scan_range->
file() <<
851 "(" << scan_range->
offset() <<
":" << scan_range->
len() <<
").";
861 ss <<
"Scan node (id=" <<
id() <<
") ran into a parse error for scan range "
862 << scan_range->
file() <<
"(" << scan_range->
offset() <<
":"
863 << scan_range->
len() <<
").";
864 if (partition->
file_format() != THdfsFileFormat::PARQUET) {
879 unique_lock<mutex> l(
lock_);
906 if (scan_range == NULL && num_unqueued_files == 0) {
909 unique_lock<mutex> l(
lock_);
925 const THdfsCompression::type& compression_type) {
926 vector<THdfsCompression::type> types;
927 types.push_back(compression_type);
932 const vector<THdfsCompression::type>& compression_types) {
938 for (
int i = 0; i < compression_types.size(); ++i) {
946 unique_lock<mutex> l(
lock_);
963 vector<SlotId> slot_ids;
964 for (
int conjunct_idx = 0; conjunct_idx < conjuncts.size(); ++conjunct_idx) {
966 int num_slots = conjuncts[conjunct_idx]->root()->GetSlotIds(&slot_ids);
967 for (
int j = 0; j < num_slots; ++j) {
972 if (slot_idx == -1)
continue;
975 if ((*order)[slot_idx] == conjuncts.size()) {
976 (*order)[slot_idx] = conjunct_idx;
983 unique_lock<mutex> l(
lock_);
997 ss << i <<
":" << setprecision(4)
1000 runtime_profile_->AddInfoString(
"Hdfs Read Thread Concurrency Bucket", ss.str());
1016 ss << it->first.first <<
"/" << it->first.second <<
":" << it->second <<
" ";
1023 ss.str(std::string());
1041 "Read $0 of data across network that was expected to be local. "
1042 "Block locality metadata for table '$1.$2' may be stale. Consider running "
1043 "\"INVALIDATE METADATA `$1`.`$2`\".",
1059 const vector<TScanRangeParams>& scan_range_params_list,
1061 pair<int, int64_t> init_value(0, 0);
1062 BOOST_FOREACH(
const TScanRangeParams& scan_range_params, scan_range_params_list) {
1063 const TScanRange& scan_range = scan_range_params.scan_range;
1064 if (!scan_range.__isset.hdfs_file_split)
continue;
1065 const THdfsFileSplit& split = scan_range.hdfs_file_split;
1066 pair<int, int64_t>* stats =
1067 FindOrInsert(per_volume_stats, scan_range_params.volume_id, init_value);
1069 stats->second += split.length;
1075 for (PerVolumnStats::const_iterator i = per_volume_stats.begin();
1076 i != per_volume_stats.end(); ++i) {
1077 (*ss) << i->first <<
":" << i->second.first <<
"/"
static Status IssueInitialRanges(HdfsScanNode *scan_node, const std::vector< HdfsFileDesc * > &files)
Issue the initial ranges for all sequence container files.
const std::vector< SlotDescriptor * > & materialized_slots() const
RuntimeProfile::ThreadCounters * scanner_thread_counters() const
virtual int64_t value() const
static const std::string NUM_SCANNER_THREADS_STARTED
ThreadGroup scanner_threads_
Thread group for all scanner worker threads.
DerivedCounter * AddDerivedCounter(const std::string &name, TUnit::type unit, const DerivedCounterFunction &counter_fn, const std::string &parent_counter_name="")
const TableDescriptor * table_desc() const
int max_materialized_row_batches_
Maximum size of materialized_row_batches_.
void AddRuntimeExecOption(const std::string &option)
Appends option to 'runtime_exec_options_'.
void CancelContext(RequestContext *context, bool wait_for_disks_completion=false)
RuntimeProfile::Counter * per_read_thread_throughput_counter_
Per thread read throughput [bytes/sec].
HdfsScanner * CreateAndPrepareScanner(HdfsPartitionDescriptor *partition_desc, ScannerContext *context, Status *status)
int num_remote_ranges(RequestContext *reader) const
AtomicInt< int > num_owned_io_buffers_
AtomicInt< int > num_unqueued_files_
Number of files that have not been issued from the scanners.
int64_t consumption() const
Returns the memory consumed in bytes.
Status SetCgroup(const std::string &cgroup)
virtual Status Prepare(RuntimeState *state)
int32_t AddVcoreAvailableCb(const VcoreAvailableCb &callback)
THdfsFileFormat::type file_format() const
Counter * AddSamplingCounter(const std::string &name, Counter *src_counter)
string path("/usr/lib/sasl2:/usr/lib64/sasl2:/usr/local/lib/sasl2:/usr/lib/x86_64-linux-gnu/sasl2")
Status OpenExprs(RuntimeState *state)
int64_t total_bytes_returned()
Returns the total number of bytes returned.
void AddInfoString(const std::string &key, const std::string &value)
RuntimeProfile::Counter * scan_ranges_complete_counter_
static bool ColPathLessThan(const SlotDescriptor *a, const SlotDescriptor *b)
std::string filename
File name including the path.
int64_t num_rows_returned_
void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool *pool)
const std::string & database() const
Status PrepareExprs(RuntimeState *state)
boost::unordered_map< std::string, hdfsFS > HdfsFsMap
MemTracker * mem_tracker()
TODO: Consider allowing fragment IDs as category parameters.
int AssignQueue(const char *file, int disk_id, bool expected_local)
bool TryAcquireThreadToken(bool *is_reserved=NULL)
void set_active_read_thread_counter(RequestContext *, RuntimeProfile::Counter *)
std::string ErrorLog()
Returns the error log lines as a string joined with ' '.
static const std::string AVERAGE_SCANNER_THREAD_CONCURRENCY
int GetMaterializedSlotIdx(const std::vector< int > &path) const
static Status Clone(const std::vector< ExprContext * > &ctxs, RuntimeState *state, std::vector< ExprContext * > *new_ctxs)
AtomicInt< int > num_scanners_codegen_disabled_
RuntimeProfile::Counter * bytes_read_dn_cache_
Total number of bytes read from data node cache.
A scanner for reading RCFiles into tuples.
boost::scoped_ptr< RuntimeProfile > runtime_profile_
static Status Open(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for opening multiple expr trees.
RuntimeProfile::Counter * scan_ranges_complete_counter() const
int num_io_buffers() const
int64_t bytes_read_dn_cache(RequestContext *reader) const
void SetThreadAvailableCb(ThreadAvailableCb fn)
A tuple with 0 materialised slots is represented as NULL.
static const std::string HDFS_SPLIT_STATS_DESC
Description string for the per volume stats output.
RuntimeProfile::Counter * num_scanner_threads_started_counter_
void RegisterBucketingCounters(Counter *src_counter, std::vector< Counter * > *buckets)
ProgressUpdater progress_
Keeps track of total splits and the number finished.
virtual Status Open(RuntimeState *state)
boost::scoped_ptr< RowBatchQueue > materialized_row_batches_
#define RETURN_IF_ERROR(stmt)
some generally useful macros
RuntimeProfile::Counter * read_timer_
static int num_disks()
Returns the number of (logical) disks on the system.
bool unknown_disk_id_warned_
boost::unordered_set< int64_t > partition_ids_
Partitions scanned by this scan node.
static const std::string NUM_DISKS_ACCESSED_COUNTER
int64_t SpareCapacity() const
#define ADD_TIMER(profile, name)
void AcquireState(RowBatch *src)
std::vector< SlotDescriptor * > partition_key_slots_
const std::vector< int > & col_path() const
RuntimeProfile::Counter * num_remote_ranges_
Total number of remote scan ranges.
boost::scoped_ptr< TPlanNode > thrift_plan_node_
const std::vector< SlotDescriptor * > & slots() const
const char * file() const
void MarkFileDescIssued(const HdfsFileDesc *file_desc)
friend class ScannerContext
static IntGauge * IO_MGR_BYTES_READ
void SetFileMetadata(const std::string &filename, void *metadata)
void * GetFileMetadata(const std::string &filename)
Status AddScanRanges(RequestContext *reader, const std::vector< ScanRange * > &ranges, bool schedule_immediately=false)
static int64_t UnitsPerSecond(const Counter *total_counter, const Counter *timer)
Derived counter function: return measured throughput as input_value/second.
FileFormatsMap per_type_files_
RuntimeProfile::Counter * bytes_read_counter_
void ReserveOptionalTokens(int num)
const std::string & cgroup() const
Tuple * InitEmptyTemplateTuple()
virtual Status Reset(RuntimeState *state)
const RowDescriptor & row_desc() const
static Tuple * Create(int size, MemPool *pool)
initialize individual tuple with data residing in mem pool
Status GetNextRange(RequestContext *reader, ScanRange **range)
boost::scoped_ptr< MemPool > scan_node_pool_
void ReleaseThreadToken(bool required)
#define COUNTER_ADD(c, v)
Status ExecDebugAction(TExecNodePhase::type phase, RuntimeState *state)
void RangeComplete(const THdfsFileFormat::type &file_type, const THdfsCompression::type &compression_type)
virtual void Set(int64_t value)
TupleDescriptor * GetTupleDescriptor(TupleId id) const
const std::vector< ExprContext * > & conjunct_ctxs() const
RuntimeProfile::Counter * unexpected_remote_bytes_
Total number of bytes read remotely that were expected to be local.
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
static HdfsScanner * GetHdfsLzoTextScanner(HdfsScanNode *scan_node, RuntimeState *state)
std::map< std::string, void * > per_file_metadata_
static void Close(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for closing multiple expr trees.
HdfsScanNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
const ErrorLogMap & error_log() const
std::vector< char > is_materialized_col_
static Status IssueInitialRanges(HdfsScanNode *scan_node, const std::vector< HdfsFileDesc * > &files)
Issue io manager byte ranges for 'files'.
static const std::string PER_READ_THREAD_THROUGHPUT_COUNTER
static HdfsFsCache * instance()
V * FindOrInsert(std::map< K, V > *m, const K &key, const V &default_val)
void StopAndFinalizeCounters()
LLVM code generator. This is the top level object to generate jitted code.
CgroupsMgr * cgroups_mgr()
MemTracker * expr_mem_tracker()
static const std::string SCAN_RANGES_COMPLETE_COUNTER
virtual Status ProcessSplit()=0
QueryResourceMgr * query_resource_mgr() const
static IntCounter * NUM_RANGES_MISSING_VOLUME_ID
RuntimeProfile::Counter * average_hdfs_read_thread_concurrency_
bool optional_exceeded()
Returns true if the number of optional threads has now exceeded the quota.
std::vector< RuntimeProfile::Counter * > hdfs_read_thread_concurrency_bucket_
SpinLock file_type_counts_lock_
void set_max_quota(int quota)
THdfsCompression::type file_compression
const HdfsFileDesc * file_desc()
int64_t bytes_read_short_circuit(RequestContext *reader) const
int64_t mtime
Last modified time.
HdfsFileDesc * GetFileDesc(const std::string &filename)
Returns the file desc for 'filename'. Returns NULL if filename is invalid.
RuntimeProfile::Counter * num_disks_accessed_counter_
static void StopBucketingCounters(std::vector< RuntimeProfile::Counter * > *buckets, bool convert)
bool LogError(const ErrorMsg &msg)
const TQueryOptions & query_options() const
FileTypeCountsMap file_type_counts_
void CloseExprs(RuntimeState *state)
static IntCounter * NUM_RANGES_PROCESSED
Status AddDiskIoRanges(const std::vector< DiskIoMgr::ScanRange * > &ranges)
Adds ranges to the io mgr queue and starts up new scanner threads if possible.
static const std::string AVERAGE_HDFS_READ_THREAD_CONCURRENCY
const int COMPRESSED_TEXT_COMPRESSION_RATIO
int num_total_disks() const
Returns the total number of disk queues (both local and remote).
const int tuple_id_
Tuple id resolved in Prepare() to set tuple_desc_;.
void ComputeSlotMaterializationOrder(std::vector< int > *order) const
const std::string & location() const
RuntimeProfile::Counter * bytes_read_local_
Total number of bytes read locally.
void set_num_rows(int num_rows)
CodegendFnMap codegend_fn_map_
ObjectPool * obj_pool() const
#define RETURN_IF_CANCELLED(state)
RuntimeProfile::Counter active_scanner_thread_counter_
The number of active scanner threads that are not blocked by IO.
static IntGauge * IO_MGR_SHORT_CIRCUIT_BYTES_READ
DECLARE_string(cgroup_hierarchy_path)
static llvm::Function * Codegen(HdfsScanNode *, const std::vector< ExprContext * > &conjunct_ctxs)
Codegen writing tuples and evaluating predicates.
RuntimeProfile::Counter disks_accessed_bitmap_
Disk accessed bitmap.
bool done() const
Returns if all tasks are done.
static void StopRateCounter(RuntimeProfile::Counter *counter)
Stops updating the value of 'counter'.
#define ADD_COUNTER(profile, name, unit)
static const int SKIP_COLUMN
void set_disks_access_bitmap(RequestContext *, RuntimeProfile::Counter *)
void AddFunctionToJit(llvm::Function *fn, void **fn_ptr)
static void Write(const void *value, Tuple *tuple, const SlotDescriptor *slot_desc, MemPool *pool)
void AddExprCtxsToFree(const std::vector< ExprContext * > &ctxs)
void set_bytes_read_counter(RequestContext *, RuntimeProfile::Counter *)
RuntimeState * runtime_state()
void AddMaterializedRowBatch(RowBatch *row_batch)
Status GetNextInternal(RuntimeState *state, RowBatch *row_batch, bool *eos)
Checks for eos conditions and returns batches from materialized_row_batches_.
virtual Status QueryMaintenance(RuntimeState *state)
RuntimeProfile::Counter * bytes_read_counter() const
void Reset(hdfsFS fs, const char *file, int64_t len, int64_t offset, int disk_id, bool try_cache, bool expected_local, int64_t mtime, void *metadata=NULL)
std::vector< SlotDescriptor * > materialized_slots_
const DescriptorTbl & desc_tbl() const
virtual Status Prepare(RuntimeState *state)
ExecNode methods.
const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD
Abstract base class of all scan nodes; introduces SetScanRange().
void * GetCodegenFn(THdfsFileFormat::type)
std::vector< ExprContext * > conjunct_ctxs_
hdfsFS fs
Connection to the filesystem containing the file.
static void PrintHdfsSplitStats(const PerVolumnStats &per_volume_stats, std::stringstream *ss)
void RemoveVcoreAvailableCb(int32_t callback_id)
Removes the callback with the given ID.
#define COUNTER_SET(c, v)
RuntimeState * runtime_state_
int64_t scanner_thread_bytes_required_
bool EnoughMemoryForScannerThread(bool new_thread)
static IntGauge * IO_MGR_CACHED_BYTES_READ
RuntimeProfile::Counter * rows_returned_counter_
int64_t bytes_read_local(RequestContext *reader) const
RuntimeProfile::HighWaterMarkCounter * max_compressed_text_file_length_
const std::string & name() const
static void StopSamplingCounter(RuntimeProfile::Counter *counter)
Stops updating the value of 'counter'.
static void MemoryBarrier()
Status SetMemLimitExceeded(MemTracker *tracker=NULL, int64_t failed_allocation_size=0)
Status RegisterContext(RequestContext **request_context, MemTracker *reader_mem_tracker=NULL)
static void UpdateHdfsSplitStats(const std::vector< TScanRangeParams > &scan_range_params_list, PerVolumnStats *per_volume_stats)
Update the per volume stats with the given scan range params list.
const TupleDescriptor * tuple_desc_
Descriptor for tuples this scan node constructs.
AtomicInt< int > num_scanners_codegen_enabled_
bool initial_ranges_issued_
void TransferToScanNodePool(MemPool *pool)
Acquires all allocations from pool into scan_node_pool_. Thread-safe.
boost::mutex metadata_lock_
DEFINE_int32(max_row_batches, 0,"the maximum size of materialized_row_batches_")
HighWaterMarkCounter * AddHighWaterMarkCounter(const std::string &name, TUnit::type unit, const std::string &parent_counter_name="")
SlotDescriptor * GetSlotDescriptor(SlotId id) const
const int SCANNER_THREAD_MEM_USAGE
RuntimeProfile::Counter * total_throughput_counter() const
static llvm::Function * Codegen(HdfsScanNode *, const std::vector< ExprContext * > &conjunct_ctxs)
Codegen writing tuples and evaluating predicates.
std::vector< DiskIoMgr::ScanRange * > splits
Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
int64_t unexpected_remote_bytes(RequestContext *reader) const
DiskIoMgr::ScanRange * AllocateScanRange(hdfsFS fs, const char *file, int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool try_cache, bool expected_local, int64_t mtime)
void NotifyThreadUsageChange(int delta)
std::vector< DiskIoMgr::RequestContext * > * reader_contexts()
void SetCgroupsMgr(CgroupsMgr *cgroups_mgr)
Status AddThread(Thread *thread)
bool IsVcoreOverSubscribed()
static const std::string TOTAL_HDFS_READ_TIMER
void set_read_timer(RequestContext *, RuntimeProfile::Counter *)
uint8_t offset[7 *64-sizeof(uint64_t)]
Metadata for a single partition inside an Hdfs table.
static llvm::Function * Codegen(HdfsScanNode *, const std::vector< ExprContext * > &conjunct_ctxs)
Codegen parsing records, writing tuples and evaluating predicates.
static Status CreateExprTrees(ObjectPool *pool, const std::vector< TExpr > &texprs, std::vector< ExprContext * > *ctxs)
RuntimeProfile::Counter * bytes_read_short_circuit_
Total number of bytes read via short circuit read.
#define SCOPED_THREAD_COUNTER_MEASUREMENT(c)
HdfsPartitionDescriptor * GetPartition(int64_t partition_id) const
DiskIoMgr::RequestContext * reader_context_
RequestContext object to use with the disk-io-mgr for reads.
static Status Prepare(const std::vector< ExprContext * > &ctxs, RuntimeState *state, const RowDescriptor &row_desc, MemTracker *tracker)
virtual void Add(int64_t delta)
Stream * GetStream(int idx=0)
RuntimeProfile::Counter * average_scanner_thread_concurrency_
RuntimeProfile::TimeSeriesCounter * bytes_read_timeseries_counter_
Time series of the bytes_read_counter_.
virtual Status Open(RuntimeState *state)
const std::vector< TScanRangeParams > * scan_range_params_
The scan ranges this scan node is responsible for. Not owned.
static IntGauge * IO_MGR_LOCAL_BYTES_READ
ThreadResourceMgr::ResourcePool * resource_pool()
static Status IssueInitialRanges(HdfsScanNode *scan_node, const std::vector< HdfsFileDesc * > &files)
virtual void Close(RuntimeState *state)
PathToSlotIdxMap path_to_materialized_slot_idx_
const HdfsTableDescriptor * hdfs_table_
RuntimeProfile::Counter * total_cpu_timer()
RuntimeProfile::Counter active_hdfs_read_thread_counter_
The number of active hdfs reading threads reading for this node.
int64_t remaining() const
boost::unordered_map< int32_t, std::pair< int, int64_t > > PerVolumnStats
map from volume id to <number of split, per volume split lengths>
RuntimeProfile::Counter * read_timer() const
AtomicInt< int > num_skipped_tokens_
virtual void Close(RuntimeState *state)
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
Tuple * InitTemplateTuple(RuntimeState *state, const std::vector< ExprContext * > &value_ctxs)
bool IsMemLimitExceeded() const
static int Popcount(uint64_t x)
Returns the number of set bits in x.
virtual Status Prepare(ScannerContext *context)
One-time initialisation of state that is constant across scan ranges.
static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter *counter)
Stops 'counter' from receiving any more samples.
RuntimeProfile * runtime_profile()
void Update(int64_t delta)
Status GetConjunctCtxs(std::vector< ExprContext * > *ctxs)