Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hdfs-scanner.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_EXEC_HDFS_SCANNER_H_
17 #define IMPALA_EXEC_HDFS_SCANNER_H_
18 
19 #include <vector>
20 #include <memory>
21 #include <stdint.h>
22 #include <boost/regex.hpp>
23 #include <boost/scoped_ptr.hpp>
24 
25 #include "codegen/impala-ir.h"
26 #include "exec/hdfs-scan-node.h"
27 #include "exec/scan-node.h"
28 #include "exec/scanner-context.h"
29 #include "runtime/disk-io-mgr.h"
30 #include "runtime/row-batch.h"
31 
32 namespace impala {
33 
// Forward declarations of types that this header only refers to by pointer or
// reference, kept in alphabetical order.
class Codec;
class Compression;
class DescriptorTbl;
class Expr;
class HdfsPartitionDescriptor;
class MemPool;
class SlotDescriptor;
class Status;
class TextConverter;
class TPlanNode;
class TScanRange;
class Tuple;
class TupleDescriptor;
47 
/// Describes where a single field lives inside a buffer of raw row data.
/// NOTE(review): the original per-field comments were not captured in this listing;
/// the notes below are hedged and should be checked against the text parser.
struct FieldLocation {
  /// Pointer to the first byte of the field.
  char* start;

  /// Length information for the field. NOTE(review): this may encode more than a
  /// plain byte count (for example, a sign used as a flag); confirm.
  int len;

  /// Presumably the name used to look this struct up in the LLVM module during
  /// code generation. Declared here, defined out of line.
  static const char* LLVM_CLASS_NAME;
};
63 
73 //
76 //
85 //
91 class HdfsScanner {
92  public:
95  const static int FILE_BLOCK_SIZE = 4096;
96 
97  HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state);
98 
99  virtual ~HdfsScanner();
100 
102  virtual Status Prepare(ScannerContext* context);
103 
107  virtual Status ProcessSplit() = 0;
108 
111  virtual void Close();
112 
115 
131 
136 
137  static const char* LLVM_CLASS_NAME;
138 
139  protected:
142 
145 
148 
151 
154  std::vector<ExprContext*> conjunct_ctxs_;
155 
165 
168 
171 
178 
180  uint8_t* tuple_mem_;
181 
184 
186  boost::scoped_ptr<TextConverter> text_converter_;
187 
190 
196 
198  boost::scoped_ptr<Codec> decompressor_;
199 
201  THdfsCompression::type decompression_type_;
202 
205  boost::scoped_ptr<MemPool> data_buffer_pool_;
206 
209 
213  int, int, int, int);
216 
222  THdfsFileFormat::type type, const std::string& scanner_name);
223 
225  void StartNewRowBatch();
226 
228  virtual Status InitNewRange() = 0;
229 
238  int GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem);
239 
245  Status CommitRows(int num_rows);
246 
251  void AddFinalRowBatch();
252 
256  void AttachPool(MemPool* pool, bool commit_batch) {
257  DCHECK(batch_ != NULL);
258  DCHECK(pool != NULL);
259  batch_->tuple_data_pool()->AcquireData(pool, false);
260  if (commit_batch) CommitRows(0);
261  }
262 
267  return ExecNode::EvalConjuncts(&conjunct_ctxs_[0], conjunct_ctxs_.size(), row);
268  }
269 
274  int WriteEmptyTuples(RowBatch* row_batch, int num_tuples);
275 
277  int WriteEmptyTuples(ScannerContext* context, TupleRow* tuple_row, int num_tuples);
278 
290  int WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row_mem, int row_size,
291  FieldLocation* fields, int num_tuples,
292  int max_added_tuples, int slots_per_tuple, int row_start_indx);
293 
297  Status UpdateDecompressor(const THdfsCompression::type& compression);
298  Status UpdateDecompressor(const std::string& codec);
299 
306  bool ReportTupleParseError(FieldLocation* fields, uint8_t* errors, int row_idx);
307 
313  virtual void LogRowParseError(int row_idx, std::stringstream*);
314 
322  //
326  //
333  bool WriteCompleteTuple(MemPool* pool, FieldLocation* fields, Tuple* tuple,
334  TupleRow* tuple_row, Tuple* template_tuple, uint8_t* error_fields,
335  uint8_t* error_in_row);
336 
339  static llvm::Function* CodegenWriteCompleteTuple(HdfsScanNode*, LlvmCodeGen*,
340  const std::vector<ExprContext*>& conjunct_ctxs);
341 
345  static llvm::Function* CodegenWriteAlignedTuples(HdfsScanNode*, LlvmCodeGen*,
346  llvm::Function* write_tuple_fn);
347 
350  void ReportColumnParseError(const SlotDescriptor* desc, const char* data, int len);
351 
355  void InitTuple(Tuple* template_tuple, Tuple* tuple) {
356  if (template_tuple != NULL) {
357  memcpy(tuple, template_tuple, tuple_byte_size_);
358  } else {
359  memset(tuple, 0, sizeof(uint8_t) * num_null_bytes_);
360  }
361  }
362 
363  inline Tuple* next_tuple(Tuple* t) const {
364  uint8_t* mem = reinterpret_cast<uint8_t*>(t);
365  return reinterpret_cast<Tuple*>(mem + tuple_byte_size_);
366  }
367 
368  inline TupleRow* next_row(TupleRow* r) const {
369  uint8_t* mem = reinterpret_cast<uint8_t*>(r);
370  return reinterpret_cast<TupleRow*>(mem + batch_->row_byte_size());
371  }
372 
375  ExprContext* GetConjunctCtx(int idx) const;
376 };
377 
378 }
379 
380 #endif
boost::scoped_ptr< Codec > decompressor_
Decompressor class to use, if any.
Definition: hdfs-scanner.h:198
void ReportColumnParseError(const SlotDescriptor *desc, const char *data, int len)
ExprContext * GetConjunctCtx(int idx) const
static const char * LLVM_CLASS_NAME
Definition: hdfs-scanner.h:61
virtual Status InitNewRange()=0
Reset internal state for a new scan range.
HdfsScanNode * scan_node_
The scan node that started this scanner.
Definition: hdfs-scanner.h:141
virtual void LogRowParseError(int row_idx, std::stringstream *)
static const char * LLVM_CLASS_NAME
Definition: hdfs-scanner.h:137
ScannerContext * context_
Context for this scanner.
Definition: hdfs-scanner.h:147
static const int FILE_BLOCK_SIZE
Definition: hdfs-scanner.h:95
int tuple_byte_size_
Fixed size of each tuple, in bytes.
Definition: hdfs-scanner.h:167
boost::scoped_ptr< MemPool > data_buffer_pool_
Definition: hdfs-scanner.h:205
static llvm::Function * CodegenWriteCompleteTuple(HdfsScanNode *, LlvmCodeGen *, const std::vector< ExprContext * > &conjunct_ctxs)
boost::scoped_ptr< TextConverter > text_converter_
Helper class for converting text to other types.
Definition: hdfs-scanner.h:186
int32_t num_null_bytes_
Number of null bytes in the tuple.
Definition: hdfs-scanner.h:189
WriteTuplesFn write_tuples_fn_
Jitted write tuples function pointer. Null if codegen is disabled.
Definition: hdfs-scanner.h:215
uint8_t * tuple_mem_
The tuple memory of batch_.
Definition: hdfs-scanner.h:180
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
std::vector< ExprContext * > conjunct_ctxs_
Definition: hdfs-scanner.h:154
int WriteEmptyTuples(RowBatch *row_batch, int num_tuples)
virtual ~HdfsScanner()
Definition: hdfs-scanner.cc:67
void AcquireData(MemPool *src, bool keep_current)
Definition: mem-pool.cc:161
TupleRow * next_row(TupleRow *r) const
Definition: hdfs-scanner.h:368
int row_byte_size()
Definition: row-batch.h:147
#define IR_ALWAYS_INLINE
Definition: impala-ir.h:31
void StartNewRowBatch()
Set batch_ to a new row batch and update tuple_mem_ accordingly.
THdfsCompression::type decompression_type_
The most recently used decompression type.
Definition: hdfs-scanner.h:201
virtual void Close()
Definition: hdfs-scanner.cc:82
LLVM code generator. This is the top level object to generate jitted code.
Definition: llvm-codegen.h:107
RuntimeState * state_
RuntimeState for error reporting.
Definition: hdfs-scanner.h:144
virtual Status ProcessSplit()=0
Status UpdateDecompressor(const THdfsCompression::type &compression)
void InitTuple(Tuple *template_tuple, Tuple *tuple)
Definition: hdfs-scanner.h:355
int GetMemory(MemPool **pool, Tuple **tuple_mem, TupleRow **tuple_row_mem)
int num_errors_in_file_
Number of errors in the current file.
Definition: hdfs-scanner.h:183
ObjectPool pool
Status CommitRows(int num_rows)
HdfsScanner(HdfsScanNode *scan_node, RuntimeState *state)
Definition: hdfs-scanner.cc:53
bool WriteCompleteTuple(MemPool *pool, FieldLocation *fields, Tuple *tuple, TupleRow *tuple_row, Tuple *template_tuple, uint8_t *error_fields, uint8_t *error_in_row)
bool IR_ALWAYS_INLINE EvalConjuncts(TupleRow *row)
Definition: hdfs-scanner.h:266
MemPool * tuple_data_pool()
Definition: row-batch.h:148
static llvm::Function * CodegenWriteAlignedTuples(HdfsScanNode *, LlvmCodeGen *, llvm::Function *write_tuple_fn)
void AttachPool(MemPool *pool, bool commit_batch)
Definition: hdfs-scanner.h:256
int WriteAlignedTuples(MemPool *pool, TupleRow *tuple_row_mem, int row_size, FieldLocation *fields, int num_tuples, int max_added_tuples, int slots_per_tuple, int row_start_indx)
Metadata for a single partition inside an Hdfs table.
Definition: descriptors.h:177
Tuple * tuple_
Current tuple pointer into tuple_mem_.
Definition: hdfs-scanner.h:170
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
Definition: exec-node.cc:393
RuntimeProfile::Counter * decompress_timer_
Time spent decompressing bytes.
Definition: hdfs-scanner.h:208
Status InitializeWriteTuplesFn(HdfsPartitionDescriptor *partition, THdfsFileFormat::type type, const std::string &scanner_name)
Definition: hdfs-scanner.cc:87
bool ReportTupleParseError(FieldLocation *fields, uint8_t *errors, int row_idx)
ScannerContext::Stream * stream_
The first stream for context_.
Definition: hdfs-scanner.h:150
int(* WriteTuplesFn)(HdfsScanner *, MemPool *, TupleRow *, int, FieldLocation *, int, int, int, int)
Definition: hdfs-scanner.h:212
Tuple * next_tuple(Tuple *t) const
Definition: hdfs-scanner.h:363
virtual Status Prepare(ScannerContext *context)
One-time initialisation of state that is constant across scan ranges.
Definition: hdfs-scanner.cc:71