Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hdfs-scan-node.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_EXEC_HDFS_SCAN_NODE_H_
17 #define IMPALA_EXEC_HDFS_SCAN_NODE_H_
18 
19 #include <vector>
20 #include <memory>
21 #include <stdint.h>
22 
23 #include <boost/unordered_map.hpp>
24 #include <boost/unordered_set.hpp>
25 #include <boost/scoped_ptr.hpp>
26 #include <boost/thread/condition_variable.hpp>
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/thread.hpp>
29 
30 #include "exec/scan-node.h"
31 #include "exec/scanner-context.h"
32 #include "runtime/descriptors.h"
33 #include "runtime/disk-io-mgr.h"
34 #include "runtime/string-buffer.h"
35 #include "util/progress-updater.h"
36 #include "util/spinlock.h"
37 #include "util/thread.h"
38 
39 #include "gen-cpp/PlanNodes_types.h"
40 
41 namespace impala {
42 
43 class DescriptorTbl;
44 class HdfsScanner;
45 class RowBatch;
46 class Status;
47 class Tuple;
48 class TPlanNode;
49 class TScanRange;
50 
/// Describes a single file: its filesystem connection, path, length, modification time,
/// compression type and the splits of the file that are assigned to this scan node.
53 struct HdfsFileDesc {
    /// Connection to the filesystem containing the file.
    /// NOTE(review): the constructor below does not initialize this member; callers
    /// presumably assign it right after construction -- confirm.
55  hdfsFS fs;
56 
    /// File name including the path.
58  std::string filename;
59 
    /// Length of the file (presumably in bytes -- confirm). Set to 0 by the constructor.
62  int64_t file_length;
63 
    /// Last modified time. Set to 0 by the constructor.
65  int64_t mtime;
66 
    /// Compression type of the file. Set to THdfsCompression::NONE by the constructor.
67  THdfsCompression::type file_compression;
68 
    /// Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
70  std::vector<DiskIoMgr::ScanRange*> splits;
    /// Creates a descriptor for 'filename' with zero length, zero mtime and no
    /// compression. 'splits' is default-constructed and 'fs' is not in the initializer list.
71  HdfsFileDesc(const std::string& filename)
72  : filename(filename), file_length(0), mtime(0), file_compression(THdfsCompression::NONE) {
73  }
74 };
75 
80  int64_t partition_id;
81 
83  : partition_id(partition_id) { }
84 };
85 
104 class HdfsScanNode : public ScanNode {
105  public:
106  HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
107 
108  ~HdfsScanNode();
109 
111  virtual Status Prepare(RuntimeState* state);
112  virtual Status Open(RuntimeState* state);
113  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
114  virtual Status Reset(RuntimeState* state);
115  virtual void Close(RuntimeState* state);
116 
/// Returns limit_ as an int.
/// NOTE(review): limit_ is listed as int64_t (defined in exec-node.h), so this accessor
/// implicitly narrows the value to int -- confirm whether the return type should be int64_t.
117  int limit() const { return limit_; }
118 
/// Returns a const reference to materialized_slots_ (no copy is made).
119  const std::vector<SlotDescriptor*>& materialized_slots()
120  const { return materialized_slots_; }
121 
/// Always returns 0.
/// NOTE(review): presumably this is the index of the scan node's tuple within a row -- confirm.
124  int tuple_idx() const { return 0; }
125 
128 
131 
133 
135 
137 
139 
142  }
143 
144  const static int SKIP_COLUMN = -1;
145 
148  Status GetConjunctCtxs(std::vector<ExprContext*>* ctxs);
149 
/// Looks up 'path' in path_to_materialized_slot_idx_, which maps from a slot's path to
/// its index into materialized_slots_, and returns the mapped index.
/// Returns SKIP_COLUMN if 'path' has no entry in the map.
152  int GetMaterializedSlotIdx(const std::vector<int>& path) const {
153  PathToSlotIdxMap::const_iterator result = path_to_materialized_slot_idx_.find(path);
     // No entry for this path: report it with the SKIP_COLUMN sentinel.
154  if (result == path_to_materialized_slot_idx_.end()) return SKIP_COLUMN;
155  return result->second;
156  }
157 
/// Returns a pointer to the first element of is_materialized_col_, reinterpreted as an
/// array of const bool. The flags themselves are stored in a std::vector<char>.
/// NOTE(review): this assumes sizeof(bool) == sizeof(char) and that the vector is
/// non-empty when called (taking &v[0] of an empty vector is undefined) -- confirm.
160  const bool* is_materialized_col() {
161  return reinterpret_cast<const bool*>(&is_materialized_col_[0]);
162  }
163 
166  void* GetCodegenFn(THdfsFileFormat::type);
167 
170  }
171 
174  }
175 
179  void AddMaterializedRowBatch(RowBatch* row_batch);
180 
189  hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id,
190  int disk_id, bool try_cache, bool expected_local, int64_t mtime);
191 
193  Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges);
194 
196  Status AddDiskIoRanges(const HdfsFileDesc* file_desc);
197 
203  void MarkFileDescIssued(const HdfsFileDesc* file_desc);
204 
210  const std::vector<ExprContext*>& value_ctxs);
211 
216 
218  void TransferToScanNodePool(MemPool* pool);
219 
221  HdfsFileDesc* GetFileDesc(const std::string& filename);
222 
227  void* GetFileMetadata(const std::string& filename);
228 
231  void SetFileMetadata(const std::string& filename, void* metadata);
232 
236  void RangeComplete(const THdfsFileFormat::type& file_type,
237  const THdfsCompression::type& compression_type);
240  void RangeComplete(const THdfsFileFormat::type& file_type,
241  const std::vector<THdfsCompression::type>& compression_type);
242 
249  void ComputeSlotMaterializationOrder(std::vector<int>* order) const;
250 
252  typedef boost::unordered_map<int32_t, std::pair<int, int64_t> > PerVolumnStats;
253 
255  static void UpdateHdfsSplitStats(
256  const std::vector<TScanRangeParams>& scan_range_params_list,
257  PerVolumnStats* per_volume_stats);
258 
261  static void PrintHdfsSplitStats(const PerVolumnStats& per_volume_stats,
262  std::stringstream* ss);
263 
265  static const std::string HDFS_SPLIT_STATS_DESC;
266 
267  private:
268  friend class ScannerContext;
269 
272  boost::scoped_ptr<TPlanNode> thrift_plan_node_;
273 
275 
277  const int tuple_id_;
278 
281 
284 
288 
292 
294  boost::unordered_set<int64_t> partition_ids_;
295 
297  typedef std::map<std::string, HdfsFileDesc*> FileDescMap;
299 
301  typedef std::map<THdfsFileFormat::type, std::vector<HdfsFileDesc*> > FileFormatsMap;
303 
308 
313 
316 
319  typedef std::map<THdfsFileFormat::type, HdfsScanner*> ScannerMap;
321 
323  typedef std::map<THdfsFileFormat::type, void*> CodegendFnMap;
325 
328  std::vector<ExprContext*> conjunct_ctxs_;
329 
331  typedef boost::unordered_map<std::vector<int>, int> PathToSlotIdxMap;
333 
336  //
339  std::vector<char> is_materialized_col_;
340 
344  std::vector<SlotDescriptor*> materialized_slots_;
345 
348  std::vector<SlotDescriptor*> partition_key_slots_;
349 
352 
355  boost::mutex metadata_lock_;
356  std::map<std::string, void*> per_file_metadata_;
357 
360 
363  boost::scoped_ptr<RowBatchQueue> materialized_row_batches_;
364 
367 
372 
376 
381 
385 
388 
391 
394 
397 
400 
403 
407  boost::mutex lock_;
408 
413  bool done_;
414 
418 
421  boost::scoped_ptr<MemPool> scan_node_pool_;
422 
427 
432  typedef std::map<
433  std::pair<THdfsFileFormat::type, THdfsCompression::type>, int> FileTypeCountsMap;
435 
439 
443 
449 
453  ScannerContext* context, Status* status);
454 
458  void ScannerThread();
459 
466  bool EnoughMemoryForScannerThread(bool new_thread);
467 
469  Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos);
470 
473  void SetDone();
474 
481 };
482 
483 }
484 
485 #endif
const std::vector< SlotDescriptor * > & materialized_slots() const
ThreadGroup scanner_threads_
Thread group for all scanner worker threads.
int max_materialized_row_batches_
Maximum size of materialized_row_batches_.
HdfsScanner * CreateAndPrepareScanner(HdfsPartitionDescriptor *partition_desc, ScannerContext *context, Status *status)
AtomicInt< int > num_owned_io_buffers_
AtomicInt< int > num_unqueued_files_
Number of files that have not been issued from the scanners.
string path("/usr/lib/sasl2:/usr/lib64/sasl2:/usr/local/lib/sasl2:/usr/lib/x86_64-linux-gnu/sasl2")
int num_partition_keys() const
Returns number of partition keys in the table, including non-materialized slots.
std::string filename
File name including the path.
RuntimeProfile::HighWaterMarkCounter * max_compressed_text_file_length()
void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool *pool)
int GetMaterializedSlotIdx(const std::vector< int > &path) const
AtomicInt< int > num_scanners_codegen_disabled_
RuntimeProfile::Counter * bytes_read_dn_cache_
Total number of bytes read from data node cache.
const HdfsTableDescriptor * hdfs_table()
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
static const std::string HDFS_SPLIT_STATS_DESC
Description string for the per volume stats output.
ProgressUpdater progress_
Keeps track of total splits and the number finished.
virtual Status Open(RuntimeState *state)
boost::scoped_ptr< RowBatchQueue > materialized_row_batches_
Lightweight spinlock.
Definition: spinlock.h:24
boost::unordered_set< int64_t > partition_ids_
Partitions scanned by this scan node.
boost::unordered_map< std::vector< int >, int > PathToSlotIdxMap
Maps from a slot's path to its index into materialized_slots_.
std::vector< SlotDescriptor * > partition_key_slots_
RuntimeProfile::Counter * num_remote_ranges_
Total number of remote scan ranges.
std::map< THdfsFileFormat::type, void * > CodegendFnMap
Per scanner type codegen'd fn.
boost::scoped_ptr< TPlanNode > thrift_plan_node_
void MarkFileDescIssued(const HdfsFileDesc *file_desc)
void SetFileMetadata(const std::string &filename, void *metadata)
void * GetFileMetadata(const std::string &filename)
FileFormatsMap per_type_files_
Tuple * InitEmptyTemplateTuple()
virtual Status Reset(RuntimeState *state)
boost::scoped_ptr< MemPool > scan_node_pool_
HdfsFileDesc(const std::string &filename)
int64_t partition_id
The partition id that this range is part of.
void RangeComplete(const THdfsFileFormat::type &file_type, const THdfsCompression::type &compression_type)
RuntimeProfile::Counter * unexpected_remote_bytes_
Total number of bytes read remotely that were expected to be local.
std::map< std::string, void * > per_file_metadata_
int64_t limit_
Definition: exec-node.h:222
HdfsScanNode(ObjectPool *pool, const TPlanNode &tnode, const DescriptorTbl &descs)
const bool * is_materialized_col()
std::vector< char > is_materialized_col_
std::map< THdfsFileFormat::type, std::vector< HdfsFileDesc * > > FileFormatsMap
File format => file descriptors.
std::map< std::string, HdfsFileDesc * > FileDescMap
File path => file descriptor (which includes the file's splits)
void IncNumScannersCodegenEnabled()
SpinLock file_type_counts_lock_
THdfsCompression::type file_compression
int64_t mtime
Last modified time.
HdfsFileDesc * GetFileDesc(const std::string &filename)
Returns the file desc for 'filename'. Returns NULL if filename is invalid.
FileTypeCountsMap file_type_counts_
Status AddDiskIoRanges(const std::vector< DiskIoMgr::ScanRange * > &ranges)
Adds ranges to the io mgr queue and starts up new scanner threads if possible.
const int tuple_id_
Tuple id resolved in Prepare() to set tuple_desc_.
void ComputeSlotMaterializationOrder(std::vector< int > *order) const
RuntimeProfile::Counter * bytes_read_local_
Total number of bytes read locally.
CodegendFnMap codegend_fn_map_
ObjectPool pool
int num_clustering_cols() const
Definition: descriptors.h:153
RuntimeProfile::Counter disks_accessed_bitmap_
Disk accessed bitmap.
static const int SKIP_COLUMN
RuntimeState * runtime_state()
void AddMaterializedRowBatch(RowBatch *row_batch)
Status GetNextInternal(RuntimeState *state, RowBatch *row_batch, bool *eos)
Checks for eos conditions and returns batches from materialized_row_batches_.
DiskIoMgr::RequestContext * reader_context()
std::vector< SlotDescriptor * > materialized_slots_
virtual Status Prepare(RuntimeState *state)
ExecNode methods.
Abstract base class of all scan nodes; introduces SetScanRange().
Definition: scan-node.h:77
void * GetCodegenFn(THdfsFileFormat::type)
std::vector< ExprContext * > conjunct_ctxs_
hdfsFS fs
Connection to the filesystem containing the file.
static void PrintHdfsSplitStats(const PerVolumnStats &per_volume_stats, std::stringstream *ss)
RuntimeState * runtime_state_
int64_t scanner_thread_bytes_required_
bool EnoughMemoryForScannerThread(bool new_thread)
std::map< THdfsFileFormat::type, HdfsScanner * > ScannerMap
RuntimeProfile::HighWaterMarkCounter * max_compressed_text_file_length_
static void UpdateHdfsSplitStats(const std::vector< TScanRangeParams > &scan_range_params_list, PerVolumnStats *per_volume_stats)
Update the per volume stats with the given scan range params list.
const TupleDescriptor * tuple_desc_
Descriptor for tuples this scan node constructs.
AtomicInt< int > num_scanners_codegen_enabled_
void TransferToScanNodePool(MemPool *pool)
Acquires all allocations from pool into scan_node_pool_. Thread-safe.
boost::mutex metadata_lock_
std::vector< DiskIoMgr::ScanRange * > splits
Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
DiskIoMgr::ScanRange * AllocateScanRange(hdfsFS fs, const char *file, int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool try_cache, bool expected_local, int64_t mtime)
uint8_t offset[7 *64-sizeof(uint64_t)]
std::map< std::pair< THdfsFileFormat::type, THdfsCompression::type >, int > FileTypeCountsMap
Metadata for a single partition inside an Hdfs table.
Definition: descriptors.h:177
RuntimeProfile::Counter * bytes_read_short_circuit_
Total number of bytes read via short circuit read.
DiskIoMgr::RequestContext * reader_context_
RequestContext object to use with the disk-io-mgr for reads.
int tuple_idx() const
PathToSlotIdxMap path_to_materialized_slot_idx_
const HdfsTableDescriptor * hdfs_table_
boost::unordered_map< int32_t, std::pair< int, int64_t > > PerVolumnStats
Map from volume id to <number of splits, per-volume split lengths>.
AtomicInt< int > num_skipped_tokens_
virtual void Close(RuntimeState *state)
int num_materialized_partition_keys() const
Returns number of materialized partition key slots.
ScanRangeMetadata(int64_t partition_id)
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)
Tuple * InitTemplateTuple(RuntimeState *state, const std::vector< ExprContext * > &value_ctxs)
void IncNumScannersCodegenDisabled()
const TupleDescriptor * tuple_desc()
Status GetConjunctCtxs(std::vector< ExprContext * > *ctxs)