Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hdfs-sequence-scanner.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include "codegen/llvm-codegen.h"
19 #include "exec/hdfs-scan-node.h"
22 #include "runtime/descriptors.h"
23 #include "runtime/runtime-state.h"
24 #include "runtime/tuple.h"
25 #include "runtime/tuple-row.h"
26 #include "util/codec.h"
27 
28 #include "common/names.h"
29 
30 using namespace impala;
31 using namespace llvm;
32 
34  "org.apache.hadoop.io.Text";
35 
36 const uint8_t HdfsSequenceScanner::SEQFILE_VERSION_HEADER[4] = {'S', 'E', 'Q', 6};
37 
// Evaluates 'x'. At every call site in this file, 'x' is a bool-returning stream read
// that records any failure in parse_status_. If 'x' is false, the macro returns
// parse_status_ from the enclosing function.
// The body is wrapped in do { } while (false) so the expansion is a single statement.
// A bare 'if' would let a following 'else' (e.g. "if (c) RETURN_IF_FALSE(x); else ...")
// silently bind to the macro's inner 'if' instead of the caller's.
#define RETURN_IF_FALSE(x) \
  do { \
    if (UNLIKELY(!(x))) return parse_status_; \
  } while (false)
39 
41  : BaseSequenceScanner(scan_node, state),
42  unparsed_data_buffer_(NULL),
43  num_buffered_records_in_compressed_block_(0) {
44 }
45 
47 }
48 
49 // Codegen for materialized parsed data into tuples.
51  const vector<ExprContext*>& conjunct_ctxs) {
52  if (!node->runtime_state()->codegen_enabled()) return NULL;
53  LlvmCodeGen* codegen;
54  if (!node->runtime_state()->GetCodegen(&codegen).ok()) return NULL;
55  Function* write_complete_tuple_fn =
56  CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs);
57  if (write_complete_tuple_fn == NULL) return NULL;
58  return CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn);
59 }
60 
62  DCHECK(header_ != NULL);
63  only_parsing_header_ = false;
64 
66 
67  text_converter_.reset(new TextConverter(hdfs_partition->escape_char(),
69 
72  scan_node_->is_materialized_col(), '\0', hdfs_partition->field_delim(),
73  hdfs_partition->collection_delim(), hdfs_partition->escape_char()));
74 
76 
77  SeqFileHeader* seq_header = reinterpret_cast<SeqFileHeader*>(header_);
78  if (seq_header->is_compressed) {
80  }
81 
82  // Initialize codegen fn
84  THdfsFileFormat::SEQUENCE_FILE, "HdfsSequenceScanner"));
85  return Status::OK;
86 }
87 
90 
91  // Allocate the scratch space for two pass parsing. The most fields we can go
92  // through in one parse pass is the batch size (tuples) * the number of fields per tuple
93  // TODO: This should probably be based on L2/L3 cache sizes (as should the batch size)
96  return Status::OK;
97 }
98 
100  return new SeqFileHeader;
101 }
102 
103 inline Status HdfsSequenceScanner::GetRecord(uint8_t** record_ptr,
104  int64_t* record_len) {
105  // There are 2 cases:
106  // - Record-compressed -- like a regular record, but the data is compressed.
107  // - Uncompressed.
109 
110  // We don't look at the keys, only the values.
112 
113  if (header_->is_compressed) {
114  int64_t in_size = current_block_length_ - current_key_length_;
115  if (in_size < 0) {
116  stringstream ss;
117  ss << "Invalid record size: " << in_size;
118  return Status(ss.str());
119  }
120  uint8_t* compressed_data;
122  stream_->ReadBytes(in_size, &compressed_data, &parse_status_));
123 
124  int64_t len;
125  {
127  RETURN_IF_ERROR(decompressor_->ProcessBlock(false, in_size, compressed_data,
128  &len, &unparsed_data_buffer_));
129  VLOG_FILE << "Decompressed " << in_size << " to " << len;
130  }
131  *record_ptr = unparsed_data_buffer_;
132  // Read the length of the record.
133  int size = ReadWriteUtil::GetVLong(*record_ptr, record_len);
134  if (size == -1) return Status("Invalid record sizse");
135  *record_ptr += size;
136  } else {
137  // Uncompressed records
139  if (*record_len < 0) {
140  stringstream ss;
141  ss << "Invalid record length: " << *record_len;
142  return Status(ss.str());
143  }
145  stream_->ReadBytes(*record_len, record_ptr, &parse_status_));
146  }
147  return Status::OK;
148 }
149 
150 // Process block compressed sequence files. This is the most used sequence file
151 // format. The general strategy is to process the data in large chunks to minimize
152 // function calls. The process is:
153 // 1. Decompress an entire block
154 // 2. In row batch sizes:
155 // a. Collect the start of records and their lengths
156 // b. Parse cols locations to field_locations_
157 // c. Materialize those field locations to row batches
158 // 3. Read the sync indicator and check the sync block
159 // This mimics the technique for text.
160 // This function only returns on error or when the entire scan range is complete.
162  DCHECK(header_->is_compressed);
163 
164  while (!finished()) {
165  if (scan_node_->ReachedLimit()) return Status::OK;
166 
167  // Step 1
170 
171  // Step 2
174  }
175 
176  // SequenceFiles don't end with syncs
177  if (stream_->eof()) return Status::OK;
178 
179  // Step 3
180  int sync_indicator;
181  RETURN_IF_FALSE(stream_->ReadInt(&sync_indicator, &parse_status_));
182  if (sync_indicator != -1) {
183  if (state_->LogHasSpace()) {
184  stringstream ss;
185  ss << "Expecting sync indicator (-1) at file offset "
186  << (stream_->file_offset() - sizeof(int)) << ". "
187  << "Sync indicator found " << sync_indicator << ".";
188  state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
189  }
190  return Status("Bad sync hash");
191  }
193  }
194 
195  return Status::OK;
196 }
197 
199  MemPool* pool;
200  TupleRow* tuple_row;
201  int64_t max_tuples = GetMemory(&pool, &tuple_, &tuple_row);
202  int num_to_process = min(max_tuples, num_buffered_records_in_compressed_block_);
204 
205  if (scan_node_->materialized_slots().empty()) {
206  // Handle case where there are no slots to materialize (e.g. count(*))
207  num_to_process = WriteEmptyTuples(context_, tuple_row, num_to_process);
208  COUNTER_ADD(scan_node_->rows_read_counter(), num_to_process);
209  RETURN_IF_ERROR(CommitRows(num_to_process));
210  return Status::OK;
211  }
212 
213  // Parse record starts and lengths
214  int field_location_offset = 0;
215  for (int i = 0; i < num_to_process; ++i) {
216  DCHECK_LT(i, record_locations_.size());
217  int bytes_read = ReadWriteUtil::GetVLong(
219  if (UNLIKELY(bytes_read == -1)) {
220  return Status("Invalid record sizes in compressed block.");
221  }
222  next_record_in_compressed_block_ += bytes_read;
225  }
226 
227  // Parse records to find field locations.
228  for (int i = 0; i < num_to_process; ++i) {
229  int num_fields = 0;
230  if (delimited_text_parser_->escape_char() == '\0') {
231  delimited_text_parser_->ParseSingleTuple<false>(
232  record_locations_[i].len,
233  reinterpret_cast<char*>(record_locations_[i].record),
234  &field_locations_[field_location_offset], &num_fields);
235  } else {
236  delimited_text_parser_->ParseSingleTuple<true>(
237  record_locations_[i].len,
238  reinterpret_cast<char*>(record_locations_[i].record),
239  &field_locations_[field_location_offset], &num_fields);
240  }
241  DCHECK_EQ(num_fields, scan_node_->materialized_slots().size());
242  field_location_offset += num_fields;
243  DCHECK_LE(field_location_offset, field_locations_.size());
244  }
245 
246  int max_added_tuples = (scan_node_->limit() == -1) ?
247  num_to_process :
249 
250  // Materialize parsed cols to tuples
252  // Call jitted function if possible
253  int tuples_returned;
254  if (write_tuples_fn_ != NULL) {
255  // last argument: seq always starts at record_location[0]
256  tuples_returned = write_tuples_fn_(this, pool, tuple_row,
257  batch_->row_byte_size(), &field_locations_[0], num_to_process,
258  max_added_tuples, scan_node_->materialized_slots().size(), 0);
259  } else {
260  tuples_returned = WriteAlignedTuples(pool, tuple_row,
261  batch_->row_byte_size(), &field_locations_[0], num_to_process,
262  max_added_tuples, scan_node_->materialized_slots().size(), 0);
263  }
264 
265  if (tuples_returned == -1) return parse_status_;
266  COUNTER_ADD(scan_node_->rows_read_counter(), num_to_process);
267  RETURN_IF_ERROR(CommitRows(tuples_returned));
268  return Status::OK;
269 }
270 
273 
274  SeqFileHeader* seq_header = reinterpret_cast<SeqFileHeader*>(header_);
275  // Block compressed is handled separately to minimize function calls.
276  if (seq_header->is_compressed && !seq_header->is_row_compressed) {
278  }
279 
280  // We count the time here since there is too much overhead to do
281  // this on each record.
283 
284  while (!finished()) {
285  DCHECK_GT(record_locations_.size(), 0);
286  // Get the next compressed or uncompressed record.
288  GetRecord(&record_locations_[0].record, &record_locations_[0].len));
289 
290  MemPool* pool;
291  TupleRow* tuple_row_mem;
292  int max_tuples = GetMemory(&pool, &tuple_, &tuple_row_mem);
293  DCHECK_GT(max_tuples, 0);
294 
295  // Parse the current record.
296  bool add_row = false;
297 
298  // Parse the current record.
299  if (scan_node_->materialized_slots().size() != 0) {
300  char* col_start;
301  uint8_t* record_start = record_locations_[0].record;
302  int num_tuples = 0;
303  int num_fields = 0;
304  char* row_end_loc;
305  uint8_t error_in_row = false;
306 
307  RETURN_IF_ERROR(delimited_text_parser_->ParseFieldLocations(
308  1, record_locations_[0].len, reinterpret_cast<char**>(&record_start),
309  &row_end_loc, &field_locations_[0], &num_tuples, &num_fields, &col_start));
310  DCHECK(num_tuples == 1);
311 
312  uint8_t errors[num_fields];
313  memset(errors, 0, sizeof(errors));
314 
315  add_row = WriteCompleteTuple(pool, &field_locations_[0], tuple_, tuple_row_mem,
316  template_tuple_, &errors[0], &error_in_row);
317 
318  if (UNLIKELY(error_in_row)) {
319  ReportTupleParseError(&field_locations_[0], errors, 0);
321  }
322  } else {
323  add_row = WriteEmptyTuples(context_, tuple_row_mem, 1);
324  }
325 
327  if (add_row) RETURN_IF_ERROR(CommitRows(1));
328  if (scan_node_->ReachedLimit()) break;
329 
330  // Sequence files don't end with syncs
331  if (stream_->eof()) return Status::OK;
332 
333  // Check for sync by looking for the marker that precedes syncs.
334  int marker;
335  RETURN_IF_FALSE(stream_->ReadInt(&marker, &parse_status_, /* peek */ true));
336  if (marker == SYNC_MARKER) {
337  RETURN_IF_FALSE(stream_->ReadInt(&marker, &parse_status_, /* peek */ false));
339  }
340  }
341 
342  return Status::OK;
343 }
344 
346  uint8_t* header;
347 
349  sizeof(SEQFILE_VERSION_HEADER), &header, &parse_status_));
350 
351  if (memcmp(header, SEQFILE_VERSION_HEADER, sizeof(SEQFILE_VERSION_HEADER))) {
352  stringstream ss;
353  ss << "Invalid SEQFILE_VERSION_HEADER: '"
354  << ReadWriteUtil::HexDump(header, sizeof(SEQFILE_VERSION_HEADER)) << "'";
355  return Status(ss.str());
356  }
357 
358  // We don't care what this is since we don't use the keys.
360 
361  uint8_t* class_name;
362  int64_t len;
363  RETURN_IF_FALSE(stream_->ReadText(&class_name, &len, &parse_status_));
364  if (memcmp(class_name, HdfsSequenceScanner::SEQFILE_VALUE_CLASS_NAME, len)) {
365  stringstream ss;
366  ss << "Invalid SEQFILE_VALUE_CLASS_NAME: '"
367  << string(reinterpret_cast<char*>(class_name), len) << "'";
368  return Status(ss.str());
369  }
370 
371  SeqFileHeader* seq_header = reinterpret_cast<SeqFileHeader*>(header_);
372  bool is_blk_compressed;
376  stream_->ReadBoolean(&is_blk_compressed, &parse_status_));
377  seq_header->is_row_compressed = !is_blk_compressed;
378 
379  if (header_->is_compressed) {
380  uint8_t* codec_ptr;
381  RETURN_IF_FALSE(stream_->ReadText(&codec_ptr, &len, &parse_status_));
382  header_->codec = string(reinterpret_cast<char*>(codec_ptr), len);
383  Codec::CodecMap::const_iterator it = Codec::CODEC_MAP.find(header_->codec);
384  DCHECK(it != Codec::CODEC_MAP.end());
385  header_->compression_type = it->second;
386  } else {
387  header_->compression_type = THdfsCompression::NONE;
388  }
389  VLOG_FILE << stream_->filename() << ": "
390  << (header_->is_compressed ?
391  (seq_header->is_row_compressed ? "row compressed" : "block compressed") :
392  "not compressed");
394 
395  // Skip file metadata
396  int map_size = 0;
398 
399  for (int i = 0; i < map_size; ++i) {
402  }
403 
404  // Read file sync marker
405  uint8_t* sync;
407  memcpy(header_->sync, sync, SYNC_HASH_SIZE);
408 
410 
411  if (!header_->is_compressed || seq_header->is_row_compressed) {
412  // Block-compressed scan ranges have an extra sync following the sync in the header,
413  // all other formats do not
415  }
416  return Status::OK;
417 }
418 
421  if (current_block_length_ < 0) {
422  stringstream ss;
423  int64_t position = stream_->file_offset();
424  position -= sizeof(int32_t);
425  ss << "Bad block length: " << current_block_length_ << " at offset " << position;
426  return Status(ss.str());
427  }
428 
430  if (current_key_length_ < 0) {
431  stringstream ss;
432  int64_t position = stream_->file_offset();
433  position -= sizeof(int32_t);
434  ss << "Bad key length: " << current_key_length_ << " at offset " << position;
435  return Status(ss.str());
436  }
437 
438  return Status::OK;
439 }
440 
442  // We are reading a new compressed block. Pass the previous buffer pool
443  // bytes to the batch. We don't need them anymore.
444  if (!decompressor_->reuse_output_buffer()) {
445  AttachPool(data_buffer_pool_.get(), true);
446  }
447 
451  if (state_->LogHasSpace()) {
452  stringstream ss;
453  ss << "Bad compressed block record count: "
455  state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
456  }
457  return Status("bad record count");
458  }
459 
460  // Skip the compressed key length and key buffers, we don't need them.
463 
464  // Skip the compressed value length buffer. We don't need these either since the
465  // records are in Text format with length included.
467 
468  // Read the compressed value buffer from the unbuffered stream.
469  int64_t block_size = 0;
471  // Check for a reasonable size
472  if (block_size > MAX_BLOCK_SIZE || block_size < 0) {
473  stringstream ss;
474  ss << "Compressed block size is: " << block_size;
475  return Status(ss.str());
476  }
477 
478  uint8_t* compressed_data = NULL;
479  RETURN_IF_FALSE(stream_->ReadBytes(block_size, &compressed_data, &parse_status_));
480 
481  {
482  int64_t len;
484  RETURN_IF_ERROR(decompressor_->ProcessBlock(false, block_size, compressed_data,
485  &len, &unparsed_data_buffer_));
486  VLOG_FILE << "Decompressed " << block_size << " to " << len;
488  }
489 
490  return Status::OK;
491 }
492 
493 void HdfsSequenceScanner::LogRowParseError(int row_idx, stringstream* ss) {
494  DCHECK_LT(row_idx, record_locations_.size());
495  *ss << string(reinterpret_cast<const char*>(record_locations_[row_idx].record),
496  record_locations_[row_idx].len);
497 }
const std::vector< SlotDescriptor * > & materialized_slots() const
const std::string & null_column_value() const
Definition: descriptors.h:233
static const CodecMap CODEC_MAP
Definition: codec.h:52
boost::scoped_ptr< Codec > decompressor_
Decompressor class to use, if any.
Definition: hdfs-scanner.h:198
bool SkipText(Status *)
Skip this text object.
HdfsScanNode * scan_node_
The scan node that started this scanner.
Definition: hdfs-scanner.h:141
boost::scoped_ptr< DelimitedTextParser > delimited_text_parser_
Helper class for picking fields and rows from delimited text.
Status GetRecord(uint8_t **record_ptr, int64_t *record_len)
int num_partition_keys() const
Returns number of partition keys in the table, including non-materialized slots.
uint8_t * next_record_in_compressed_block_
Next record from block compressed data.
ScannerContext * context_
Context for this scanner.
Definition: hdfs-scanner.h:147
bool only_parsing_header_
If true, this scanner object is only for processing the header.
int64_t total_bytes_returned()
Returns the total number of bytes returned.
FileHeader * header_
File header for this scan range. This is not owned by the parent scan node.
boost::scoped_ptr< MemPool > data_buffer_pool_
Definition: hdfs-scanner.h:205
static llvm::Function * CodegenWriteCompleteTuple(HdfsScanNode *, LlvmCodeGen *, const std::vector< ExprContext * > &conjunct_ctxs)
bool ReadInt(int32_t *val, Status *, bool peek=false)
std::vector< FieldLocation > field_locations_
static const int SYNC_HASH_SIZE
Size of the sync hash field.
boost::scoped_ptr< TextConverter > text_converter_
Helper class for converting text to other types.
Definition: hdfs-scanner.h:186
WriteTuplesFn write_tuples_fn_
Jitted write tuples function pointer. Null if codegen is disabled.
Definition: hdfs-scanner.h:215
virtual void LogRowParseError(int row_idx, std::stringstream *)
const HdfsTableDescriptor * hdfs_table()
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
int64_t num_buffered_records_in_compressed_block_
Number of records buffered in unparsed_data_buffer_ from block-compressed data.
int WriteEmptyTuples(RowBatch *row_batch, int num_tuples)
bool LogHasSpace()
Returns true if the error log has not reached max_errors_.
int current_key_length_
Length of the current key. This is specified as 4 bytes in the format description.
RuntimeProfile::Counter * rows_read_counter() const
Definition: scan-node.h:96
virtual FileHeader * AllocateFileHeader()
Implementation of sequence container super class methods.
bool ReadBoolean(bool *boolean, Status *)
bool ReadVLong(int64_t *val, Status *)
bool ReadText(uint8_t **buf, int64_t *length, Status *)
static const uint8_t SEQFILE_VERSION_HEADER[4]
virtual Status Prepare(ScannerContext *context)
One-time initialisation of state that is constant across scan ranges.
#define COUNTER_ADD(c, v)
int row_byte_size()
Definition: row-batch.h:147
int64_t file_offset() const
Returns the buffer's current offset in the file.
bool ReachedLimit()
Definition: exec-node.h:159
THdfsCompression::type compression_type
Enum for compression type.
#define SCOPED_TIMER(c)
const bool * is_materialized_col()
uint8_t * unparsed_data_buffer_
Buffer for data read from HDFS or from decompressing the HDFS data.
virtual Status Prepare(ScannerContext *context)
Implementation of HdfsScanner interface.
LLVM code generator. This is the top level object to generate jitted code.
Definition: llvm-codegen.h:107
RuntimeState * state_
RuntimeState for error reporting.
Definition: hdfs-scanner.h:144
Status UpdateDecompressor(const THdfsCompression::type &compression)
bool LogError(const ErrorMsg &msg)
int GetMemory(MemPool **pool, Tuple **tuple_mem, TupleRow **tuple_row_mem)
static const char *const SEQFILE_VALUE_CLASS_NAME
bool eof() const
If true, the stream has reached the end of the file.
std::vector< RecordLocation > record_locations_
ObjectPool pool
bool SkipBytes(int64_t length, Status *)
Skip over the next length bytes in the specified HDFS file.
Status CommitRows(int num_rows)
Data that is fixed across headers. This struct is shared between scan ranges.
#define RETURN_IF_FALSE(x)
RuntimeState * runtime_state()
bool WriteCompleteTuple(MemPool *pool, FieldLocation *fields, Tuple *tuple, TupleRow *tuple_row, Tuple *template_tuple, uint8_t *error_fields, uint8_t *error_in_row)
static const int SYNC_MARKER
Sync indicator.
int64_t rows_returned() const
Definition: exec-node.h:157
int batch_size() const
Definition: runtime-state.h:98
static llvm::Function * CodegenWriteAlignedTuples(HdfsScanNode *, LlvmCodeGen *, llvm::Function *write_tuple_fn)
void AttachPool(MemPool *pool, bool commit_batch)
Definition: hdfs-scanner.h:256
virtual Status InitNewRange()
Reset internal state for a new scan range.
#define UNLIKELY(expr)
Definition: compiler-util.h:33
bool codegen_enabled() const
Returns true if codegen is enabled for this query.
uint8_t sync[SYNC_HASH_SIZE]
The sync hash for this file.
static int GetVLong(uint8_t *buf, int64_t *vlong)
static llvm::Function * Codegen(HdfsScanNode *, const std::vector< ExprContext * > &conjunct_ctxs)
Codegen writing tuples and evaluating predicates.
static const Status OK
Definition: status.h:87
int WriteAlignedTuples(MemPool *pool, TupleRow *tuple_row_mem, int row_size, FieldLocation *fields, int num_tuples, int max_added_tuples, int slots_per_tuple, int row_start_indx)
Status GetCodegen(LlvmCodeGen **codegen, bool initialize=true)
std::string codec
Codec name if it is compressed.
Metadata for a single partition inside an Hdfs table.
Definition: descriptors.h:177
Tuple * tuple_
Current tuple pointer into tuple_mem_.
Definition: hdfs-scanner.h:170
static std::string HexDump(const uint8_t *buf, int64_t length)
Dump the first length bytes of buf to a Hex string.
int current_block_length_
Length of the current sequence file block (or record).
#define VLOG_FILE
Definition: logging.h:58
bool ok() const
Definition: status.h:172
RuntimeProfile::Counter * decompress_timer_
Time spent decompressing bytes.
Definition: hdfs-scanner.h:208
bool ReadBytes(int64_t length, uint8_t **buf, Status *, bool peek=false)
bool is_compressed
true if the file is compressed
bool is_row_compressed
If true, the file uses row compression.
Status InitializeWriteTuplesFn(HdfsPartitionDescriptor *partition, THdfsFileFormat::type type, const std::string &scanner_name)
Definition: hdfs-scanner.cc:87
HdfsPartitionDescriptor * partition_descriptor()
bool ReportTupleParseError(FieldLocation *fields, uint8_t *errors, int row_idx)
ScannerContext::Stream * stream_
The first stream for context_.
Definition: hdfs-scanner.h:150
HdfsSequenceScanner(HdfsScanNode *scan_node, RuntimeState *state)
RuntimeProfile::Counter * materialize_tuple_timer() const
Definition: scan-node.h:104