16 #ifndef IMPALA_EXEC_HDFS_SCAN_NODE_H_
17 #define IMPALA_EXEC_HDFS_SCAN_NODE_H_
23 #include <boost/unordered_map.hpp>
24 #include <boost/unordered_set.hpp>
25 #include <boost/scoped_ptr.hpp>
26 #include <boost/thread/condition_variable.hpp>
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/thread.hpp>
39 #include "gen-cpp/PlanNodes_types.h"
70 std::vector<DiskIoMgr::ScanRange*>
splits;
83 : partition_id(partition_id) { }
155 return result->second;
189 hdfsFS fs,
const char* file, int64_t len, int64_t
offset, int64_t partition_id,
190 int disk_id,
bool try_cache,
bool expected_local, int64_t mtime);
210 const std::vector<ExprContext*>& value_ctxs);
237 const THdfsCompression::type& compression_type);
241 const std::vector<THdfsCompression::type>& compression_type);
256 const std::vector<TScanRangeParams>& scan_range_params_list,
262 std::stringstream* ss);
301 typedef std::map<THdfsFileFormat::type, std::vector<HdfsFileDesc*> >
FileFormatsMap;
319 typedef std::map<THdfsFileFormat::type, HdfsScanner*>
ScannerMap;
const std::vector< SlotDescriptor * > & materialized_slots() const
ThreadGroup scanner_threads_
Thread group for all scanner worker threads.
int max_materialized_row_batches_
Maximum size of materialized_row_batches_.
HdfsScanner * CreateAndPrepareScanner(HdfsPartitionDescriptor *partition_desc, ScannerContext *context, Status *status)
AtomicInt< int > num_owned_io_buffers_
AtomicInt< int > num_unqueued_files_
Number of files that have not been issued from the scanners.
string path("/usr/lib/sasl2:/usr/lib64/sasl2:/usr/local/lib/sasl2:/usr/lib/x86_64-linux-gnu/sasl2")
int num_partition_keys() const
Returns number of partition keys in the table, including non-materialized slots.
std::string filename
File name including the path.
RuntimeProfile::HighWaterMarkCounter * max_compressed_text_file_length()
void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool *pool)
int GetMaterializedSlotIdx(const std::vector< int > &path) const
AtomicInt< int > num_scanners_codegen_disabled_
RuntimeProfile::Counter * bytes_read_dn_cache_
Total number of bytes read from data node cache.
const HdfsTableDescriptor * hdfs_table()
A tuple with 0 materialized slots is represented as NULL.
static const std::string HDFS_SPLIT_STATS_DESC
Description string for the per volume stats output.
ProgressUpdater progress_
Keeps track of total splits and the number finished.
virtual Status Open(RuntimeState *state)
boost::scoped_ptr< RowBatchQueue > materialized_row_batches_
bool unknown_disk_id_warned_
boost::unordered_set< int64_t > partition_ids_
Partitions scanned by this scan node.
boost::unordered_map< std::vector< int >, int > PathToSlotIdxMap
Maps from a slot's path to its index into materialized_slots_.
std::vector< SlotDescriptor * > partition_key_slots_
RuntimeProfile::Counter * num_remote_ranges_
Total number of remote scan ranges.
std::map< THdfsFileFormat::type, void * > CodegendFnMap
Per scanner type codegen'd fn.
boost::scoped_ptr< TPlanNode > thrift_plan_node_
void MarkFileDescIssued(const HdfsFileDesc *file_desc)
void SetFileMetadata(const std::string &filename, void *metadata)
void * GetFileMetadata(const std::string &filename)
FileFormatsMap per_type_files_
Tuple * InitEmptyTemplateTuple()
virtual Status Reset(RuntimeState *state)
boost::scoped_ptr< MemPool > scan_node_pool_
HdfsFileDesc(const std::string &filename)
void RangeComplete(const THdfsFileFormat::type &file_type, const THdfsCompression::type &compression_type)
RuntimeProfile::Counter * unexpected_remote_bytes_
Total number of bytes read remotely that were expected to be local.
std::map< std::string, void * > per_file_metadata_
HdfsScanNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
const bool * is_materialized_col()
std::vector< char > is_materialized_col_
std::map< THdfsFileFormat::type, std::vector< HdfsFileDesc * > > FileFormatsMap
File format => file descriptors.
std::map< std::string, HdfsFileDesc * > FileDescMap
File path => file descriptor (which includes the file's splits)
void IncNumScannersCodegenEnabled()
void StopAndFinalizeCounters()
SpinLock file_type_counts_lock_
THdfsCompression::type file_compression
int64_t mtime
Last modified time.
HdfsFileDesc * GetFileDesc(const std::string &filename)
Returns the file desc for 'filename'. Returns NULL if filename is invalid.
FileTypeCountsMap file_type_counts_
Status AddDiskIoRanges(const std::vector< DiskIoMgr::ScanRange * > &ranges)
Adds ranges to the io mgr queue and starts up new scanner threads if possible.
const int tuple_id_
Tuple id resolved in Prepare() to set tuple_desc_.
void ComputeSlotMaterializationOrder(std::vector< int > *order) const
RuntimeProfile::Counter * bytes_read_local_
Total number of bytes read locally.
CodegendFnMap codegend_fn_map_
int num_clustering_cols() const
RuntimeProfile::Counter disks_accessed_bitmap_
Disk accessed bitmap.
static const int SKIP_COLUMN
RuntimeState * runtime_state()
void AddMaterializedRowBatch(RowBatch *row_batch)
Status GetNextInternal(RuntimeState *state, RowBatch *row_batch, bool *eos)
Checks for eos conditions and returns batches from materialized_row_batches_.
DiskIoMgr::RequestContext * reader_context()
std::vector< SlotDescriptor * > materialized_slots_
virtual Status Prepare(RuntimeState *state)
ExecNode methods.
Abstract base class of all scan nodes; introduces SetScanRange().
void * GetCodegenFn(THdfsFileFormat::type)
std::vector< ExprContext * > conjunct_ctxs_
hdfsFS fs
Connection to the filesystem containing the file.
static void PrintHdfsSplitStats(const PerVolumnStats &per_volume_stats, std::stringstream *ss)
RuntimeState * runtime_state_
int64_t scanner_thread_bytes_required_
bool EnoughMemoryForScannerThread(bool new_thread)
std::map< THdfsFileFormat::type, HdfsScanner * > ScannerMap
RuntimeProfile::HighWaterMarkCounter * max_compressed_text_file_length_
static void UpdateHdfsSplitStats(const std::vector< TScanRangeParams > &scan_range_params_list, PerVolumnStats *per_volume_stats)
Update the per volume stats with the given scan range params list.
const TupleDescriptor * tuple_desc_
Descriptor for tuples this scan node constructs.
AtomicInt< int > num_scanners_codegen_enabled_
bool initial_ranges_issued_
void TransferToScanNodePool(MemPool *pool)
Acquires all allocations from pool into scan_node_pool_. Thread-safe.
boost::mutex metadata_lock_
std::vector< DiskIoMgr::ScanRange * > splits
Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
DiskIoMgr::ScanRange * AllocateScanRange(hdfsFS fs, const char *file, int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool try_cache, bool expected_local, int64_t mtime)
uint8_t offset[7 *64-sizeof(uint64_t)]
std::map< std::pair< THdfsFileFormat::type, THdfsCompression::type >, int > FileTypeCountsMap
Metadata for a single partition inside an Hdfs table.
RuntimeProfile::Counter * bytes_read_short_circuit_
Total number of bytes read via short circuit read.
DiskIoMgr::RequestContext * reader_context_
RequestContext object to use with the disk-io-mgr for reads.
PathToSlotIdxMap path_to_materialized_slot_idx_
const HdfsTableDescriptor * hdfs_table_
boost::unordered_map< int32_t, std::pair< int, int64_t > > PerVolumnStats
Map from volume id to <number of splits, per-volume split lengths>.
AtomicInt< int > num_skipped_tokens_
virtual void Close(RuntimeState *state)
int num_materialized_partition_keys() const
Returns number of materialized partition key slots.
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
Tuple * InitTemplateTuple(RuntimeState *state, const std::vector< ExprContext * > &value_ctxs)
void IncNumScannersCodegenDisabled()
const TupleDescriptor * tuple_desc()
Status GetConjunctCtxs(std::vector< ExprContext * > *ctxs)