Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
base-sequence-scanner.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_EXEC_BASE_SEQUENCE_SCANNER_H
17 #define IMPALA_EXEC_BASE_SEQUENCE_SCANNER_H
18 
19 #include <vector>
20 #include <memory>
21 #include <stdint.h>
22 
23 #include "exec/hdfs-scanner.h"
24 
25 namespace impala {
26 
27 struct HdfsFileDesc;
28 class ScannerContext;
29 
36  public:
38  static Status IssueInitialRanges(HdfsScanNode* scan_node,
39  const std::vector<HdfsFileDesc*>& files);  ///< Issue the initial ranges for all sequence container files.
40 
41  virtual Status Prepare(ScannerContext* context);  ///< One-time initialisation of state that is constant across scan ranges.
42  virtual void Close();
43  virtual Status ProcessSplit();
44 
45  virtual ~BaseSequenceScanner();
46 
47  protected:
49  const static int SYNC_HASH_SIZE = 16;  ///< Size of the sync hash field; also the length of the uint8_t FileHeader::sync array.
50 
53  struct FileHeader {  ///< Per-file header data; instances are handed out as FileHeader* by AllocateFileHeader().
54  virtual ~FileHeader() {}  ///< Virtual so that derived header types are destroyed correctly through a FileHeader*.
55 
57  uint8_t sync[SYNC_HASH_SIZE];  ///< The sync hash for this file.
58 
61 
63  std::string codec;  ///< Codec name if it is compressed.
64 
66  THdfsCompression::type compression_type;  ///< Enum for compression type.
67 
70  int64_t header_size;  ///< NOTE(review): presumably the size of the file header in bytes - confirm against the original header.
71  };
72 
81 
85  virtual FileHeader* AllocateFileHeader() = 0;
86 
90  virtual Status ReadFileHeader() = 0;
91 
97  virtual Status ProcessRange() = 0;
98 
100  virtual THdfsFileFormat::type file_format() const = 0;  ///< Returns type of scanner: e.g. rcfile, seqfile. Pure virtual, so each subclass supplies it.
101 
103 
108  Status ReadSync();
109 
115  Status SkipToSync(const uint8_t* sync, int sync_size);
116 
117  bool finished() { return finished_; }  ///< Returns the current value of finished_, which is set by ReadSync() and SkipToSync().
118 
121  const static int HEADER_SIZE;
122 
124  const static int SYNC_MARKER;  ///< Sync indicator. Declared here without an initializer, so the value is defined outside this class body.
125 
128 
131 
132  private:
138  //
144  //
146  bool finished_;  ///< finished_ is set by ReadSync() and SkipToSync(); exposed read-only through finished().
147 
150  int64_t block_start_;
151 
155 
158 
162  int ReadPastSize(int64_t file_offset);
163 
166  int FindSyncBlock(const uint8_t* buffer, int buffer_len, const uint8_t* sync,
167  int sync_len);
168 
171  void CloseFileRanges(const char* file);
172 
175 };
176 
177 }
178 
179 #endif
static Status IssueInitialRanges(HdfsScanNode *scan_node, const std::vector< HdfsFileDesc * > &files)
Issue the initial ranges for all sequence container files.
bool only_parsing_header_
If true, this scanner object is only for processing the header.
FileHeader * header_
File header for this scan range. This is not owned by the parent scan node.
static const int SYNC_HASH_SIZE
Size of the sync hash field.
virtual Status ProcessRange()=0
Status SkipToSync(const uint8_t *sync, int sync_size)
virtual Status Prepare(ScannerContext *context)
One-time initialisation of state that is constant across scan ranges.
THdfsCompression::type compression_type
Enum for compression type.
virtual Status ReadFileHeader()=0
int ReadPastSize(int64_t file_offset)
int num_syncs_
The number of syncs seen by this scanner so far.
RuntimeProfile::Counter * bytes_skipped_counter_
Number of bytes skipped when advancing to next sync on error.
static const int SYNC_MARKER
Sync indicator.
virtual THdfsFileFormat::type file_format() const =0
Returns type of scanner: e.g. rcfile, seqfile.
uint8_t sync[SYNC_HASH_SIZE]
The sync hash for this file.
std::string codec
Codec name if it is compressed.
int FindSyncBlock(const uint8_t *buffer, int buffer_len, const uint8_t *sync, int sync_len)
bool is_compressed
true if the file is compressed
bool finished_
finished_ is set by ReadSync() and SkipToSync().
void CloseFileRanges(const char *file)
BaseSequenceScanner(HdfsScanNode *, RuntimeState *)
virtual FileHeader * AllocateFileHeader()=0