Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hdfs-text-scanner.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exec/hdfs-text-scanner.h"
16 
17 #include "codegen/llvm-codegen.h"
21 #include "exec/hdfs-scan-node.h"
23 #include "exec/text-converter.h"
25 #include "runtime/row-batch.h"
26 #include "runtime/runtime-state.h"
27 #include "util/codec.h"
28 #include "util/decompress.h"
29 #include "util/cpu-info.h"
30 #include "util/debug-util.h"
31 
32 #include "common/names.h"
33 
34 using boost::algorithm::ends_with;
35 using boost::algorithm::to_lower;
36 using namespace impala;
37 using namespace llvm;
38 
// Debug-only escape hatch: disables the streaming gzip decompression path
// (used in FillByteBuffer's GZIP branch); the flag text says it will be removed.
39 DEFINE_bool(debug_disable_streaming_gzip, false, "Debug flag, will be removed. Disables "
40  "streaming gzip decompression.");
41 
// LLVM struct-type name used by codegen to reference this class from generated IR.
42 const char* HdfsTextScanner::LLVM_CLASS_NAME = "class.impala::HdfsTextScanner";
43 
44 // Suffix for lzo index file: hdfs-filename.index
45 const string HdfsTextScanner::LZO_INDEX_SUFFIX = ".index";
46 
47 // Number of bytes to read when the previous attempt to streaming decompress did not make
48 // progress.
49 const int64_t GZIP_FIXED_READ_SIZE = 1 * 1024 * 1024;
50 
// Constructor initializer list. NOTE(review): this Doxygen listing drops source
// line 51, the constructor signature — presumably
// HdfsTextScanner::HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state);
// confirm against the header. Byte buffers start NULL/empty; boundary_pool_ is
// charged to the scan node's MemTracker and backs both boundary string buffers.
52  : HdfsScanner(scan_node, state),
53  byte_buffer_ptr_(NULL),
54  byte_buffer_end_(NULL),
55  byte_buffer_read_size_(0),
56  only_parsing_header_(false),
57  boundary_pool_(new MemPool(scan_node->mem_tracker())),
58  boundary_row_(boundary_pool_.get()),
59  boundary_column_(boundary_pool_.get()),
60  slot_idx_(0),
61  error_in_row_(false) {
62 }
63 
// NOTE(review): closing brace of the (empty) destructor; its signature
// (source line 64) is missing from this listing.
65 }
66 
// Issues the initial disk IO ranges for a set of text files, dispatching on the
// file's compression: uncompressed files are issued split-by-split; gzip/snappy/
// bzip2 files are issued as one whole-file range (owned by whoever holds the
// offset-0 split); LZO files are handed off to HdfsLzoTextScanner.
// NOTE(review): source line 67 — the function signature, per the member index
// "static Status IssueInitialRanges(HdfsScanNode* scan_node,
//  const std::vector<HdfsFileDesc*>& files)" — is missing from this listing.
68  const vector<HdfsFileDesc*>& files) {
69  vector<DiskIoMgr::ScanRange*> compressed_text_scan_ranges;
70  vector<HdfsFileDesc*> lzo_text_files;
71  bool warning_written = false;
72  for (int i = 0; i < files.size(); ++i) {
// Reset per file so each multi-block file gets exactly one warning.
73  warning_written = false;
74  THdfsCompression::type compression = files[i]->file_compression;
75  switch (compression) {
76  case THdfsCompression::NONE:
77  // For uncompressed text we just issue all ranges at once.
78  // TODO: Lz4 is splittable, should be treated similarly.
79  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(files[i]));
80  break;
81 
82  case THdfsCompression::GZIP:
83  case THdfsCompression::SNAPPY:
84  case THdfsCompression::SNAPPY_BLOCKED:
85  case THdfsCompression::BZIP2:
86  for (int j = 0; j < files[i]->splits.size(); ++j) {
87  // In order to decompress gzip-, snappy- and bzip2-compressed text files, we
88  // need to read entire files. Only read a file if we're assigned the first split
89  // to avoid reading multi-block files with multiple scanners.
90  DiskIoMgr::ScanRange* split = files[i]->splits[j];
91 
92  // We only process the split that starts at offset 0.
93  if (split->offset() != 0) {
94  if (!warning_written) {
95  // We are expecting each file to be one hdfs block (so all the scan range
96  // offsets should be 0). This is not incorrect but we will issue a warning.
97  // We write a single warning per file per impalad to reduce the number of
98  // log warnings.
99  stringstream ss;
100  ss << "For better performance, snappy, gzip and bzip-compressed files "
101  << "should not be split into multiple hdfs-blocks. file="
102  << files[i]->filename << " offset " << split->offset();
103  scan_node->runtime_state()->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
104  warning_written = true;
105  }
106  // We assign the entire file to one scan range, so mark all but one split
107  // (i.e. the first split) as complete.
108  scan_node->RangeComplete(THdfsFileFormat::TEXT, compression);
109  continue;
110  }
111 
112  // Populate the list of compressed text scan ranges.
113  DCHECK_GT(files[i]->file_length, 0);
114  ScanRangeMetadata* metadata =
115  reinterpret_cast<ScanRangeMetadata*>(split->meta_data());
116  DiskIoMgr::ScanRange* file_range = scan_node->AllocateScanRange(
117  files[i]->fs, files[i]->filename.c_str(), files[i]->file_length, 0,
118  metadata->partition_id, split->disk_id(), split->try_cache(),
119  split->expected_local(), files[i]->mtime);
120  compressed_text_scan_ranges.push_back(file_range);
// High-water-mark counter: tracks the largest compressed text file seen.
121  scan_node->max_compressed_text_file_length()->Set(files[i]->file_length);
122  }
123  break;
124 
125  case THdfsCompression::LZO:
126  // lzo-compressed text need to be processed by the specialized HdfsLzoTextScanner.
127  // Note that any LZO_INDEX files (no matter what the case of their suffix) will be
128  // filtered by the planner.
129  {
130  #ifndef NDEBUG
131  // No straightforward way to do this in one line inside a DCHECK, so for once
132  // we'll explicitly use NDEBUG to avoid executing debug-only code.
133  string lower_filename = files[i]->filename;
134  to_lower(lower_filename);
135  DCHECK(!ends_with(lower_filename, LZO_INDEX_SUFFIX));
136  #endif
137  lzo_text_files.push_back(files[i]);
138  }
139  break;
140 
141  default:
142  DCHECK(false);
143  }
144  }
145 
146  if (compressed_text_scan_ranges.size() > 0) {
147  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges));
148  }
149  if (lzo_text_files.size() > 0) {
150  // This will dlopen the lzo binary and can fail if the lzo binary is not present.
151  RETURN_IF_ERROR(HdfsLzoTextScanner::IssueInitialRanges(scan_node, lzo_text_files));
152  }
153  return Status::OK;
154 }
155 
// Per-scan-range driver: resets state, locates the first complete tuple in the
// range, processes the range, then (per the comment below) finishes by reading
// past the range end. NOTE(review): this listing drops source lines 156 (the
// signature — presumably ProcessSplit; confirm), 158, 171 and 178, so the body
// below is incomplete.
157  // Reset state for new scan range
159 
160  // Find the first tuple. If tuple_found is false, it means we went through the entire
161  // scan range without finding a single tuple. The bytes will be picked up by the scan
162  // range before.
163  bool tuple_found;
164  RETURN_IF_ERROR(FindFirstTuple(&tuple_found));
165 
166  if (tuple_found) {
167  // Update the decompressor depending on the compression type of the file in the
168  // context.
169  DCHECK(stream_->file_desc()->file_compression != THdfsCompression::SNAPPY)
170  << "FE should have generated SNAPPY_BLOCKED instead.";
172 
173  // Process the scan range.
174  int dummy_num_tuples;
175  RETURN_IF_ERROR(ProcessRange(&dummy_num_tuples, false));
176 
177  // Finish up reading past the scan range.
179  }
180 
181  // All done with this scan range.
182  return Status::OK;
183 }
184 
// Scanner teardown: closes/releases the decompressor first (its temp pool may
// hold memory), then attaches the data and boundary pools to the final batch,
// and marks the range complete unless we were only parsing a header.
// NOTE(review): this listing drops source lines 185 (the signature — the member
// index shows "virtual void Close()"), 194, 196 and 199; body is incomplete.
186  // Need to close the decompressor before releasing the resources at AddFinalRowBatch(),
187  // because in some cases there is memory allocated in decompressor_'s temp_memory_pool_.
188  if (decompressor_.get() != NULL) {
189  decompressor_->Close();
190  decompressor_.reset(NULL);
191  }
192  AttachPool(data_buffer_pool_.get(), false);
193  AttachPool(boundary_pool_.get(), false);
195  if (!only_parsing_header_) {
197  THdfsFileFormat::TEXT, stream_->file_desc()->file_compression);
198  }
200 }
201 
// Sets up per-range parsing state: picks the partition's delimiters (degenerating
// field/collection delimiters to '\0' when no slots are materialized, since field
// splitting is then unnecessary) and constructs the text converter.
// NOTE(review): this listing drops source lines 202 (the signature — member index
// shows "virtual Status InitNewRange()"), 206, 209, 217-218, 222 and 224; in
// particular the hdfs_partition declaration and the parser construction prefix
// are missing.
203  // Compressed text does not reference data in the io buffers directly. In such case, we
204  // can recycle the buffers in the stream_ more promptly.
205  if (stream_->file_desc()->file_compression != THdfsCompression::NONE) {
207  }
208 
210  char field_delim = hdfs_partition->field_delim();
211  char collection_delim = hdfs_partition->collection_delim();
212  if (scan_node_->materialized_slots().size() == 0) {
213  field_delim = '\0';
214  collection_delim = '\0';
215  }
216 
219  scan_node_->is_materialized_col(), hdfs_partition->line_delim(),
220  field_delim, collection_delim, hdfs_partition->escape_char()));
221  text_converter_.reset(new TextConverter(hdfs_partition->escape_char(),
223 
225  return Status::OK;
226 }
227 
// Resets per-range scanner state: clears the error flag, rewinds the slot index,
// resets the delimited-text parser and marks the partial tuple empty, then
// initializes the codegen'd write function. NOTE(review): this listing drops
// source lines 228 (the signature — presumably ResetScanner; confirm), 237-238,
// 242-245 and 248; body is incomplete.
229  error_in_row_ = false;
230 
231  // Note - this initialisation relies on the assumption that N partition keys will occupy
232  // entries 0 through N-1 in column_idx_to_slot_idx. If this changes, we will need
233  // another layer of indirection to map text-file column indexes onto the
234  // column_idx_to_slot_idx table used below.
235  slot_idx_ = 0;
236 
239  delimited_text_parser_->ParserReset();
240 
241  partial_tuple_empty_ = true;
243 
246 
247  // Initialize codegen fn
249  context_->partition_descriptor(), THdfsFileFormat::TEXT, "HdfsTextScanner"));
250  return Status::OK;
251 }
252 
// Reads past the end of the scan range to complete the final (possibly
// delimiter-less) tuple, flushing any boundary column/row state accumulated at
// the buffer edge. NOTE(review): this listing drops source lines 253 (the
// signature — member index shows "Status FinishScanRange()"), 260, 284,
// 296-297, 305 and 309; the else-if condition at 283 is visibly truncated.
254  if (scan_node_->ReachedLimit()) return Status::OK;
255 
256  // For text we always need to scan past the scan range to find the next delimiter
257  while (true) {
258  bool eosr = true;
259  Status status = Status::OK;
261 
262  // If compressed text, then there is nothing more to be read.
263  // TODO: calling FillByteBuffer() at eof() can cause
264  // ScannerContext::Stream::GetNextBuffer to DCHECK. Fix this.
265  if (decompressor_.get() == NULL && !stream_->eof()) {
266  status = FillByteBuffer(&eosr, NEXT_BLOCK_READ_SIZE);
267  }
268 
269  if (!status.ok() || byte_buffer_read_size_ == 0) {
270  if (status.IsCancelled()) return status;
271 
272  if (!status.ok()) {
273  stringstream ss;
274  ss << "Read failed while trying to finish scan range: " << stream_->filename()
275  << ":" << stream_->file_offset() << endl << status.GetDetail();
276  if (state_->LogHasSpace()) {
277  state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
278  }
279  if (state_->abort_on_error()) return Status(ss.str());
280  } else if (!partial_tuple_empty_ || !boundary_column_.Empty() ||
281  !boundary_row_.Empty() ||
282  (delimited_text_parser_->HasUnfinishedTuple() &&
283  (!scan_node_->materialized_slots().empty() ||
285  // Missing columns or row delimiter at end of the file is ok, fill the row in.
286  char* col = boundary_column_.str().ptr;
287  int num_fields = 0;
288  delimited_text_parser_->FillColumns<true>(boundary_column_.Size(),
289  &col, &num_fields, &field_locations_[0]);
290 
291  MemPool* pool;
292  TupleRow* tuple_row_mem;
293  int max_tuples = GetMemory(&pool, &tuple_, &tuple_row_mem);
294  DCHECK_GE(max_tuples, 1);
295  // Set variables for proper error outputting on boundary tuple
298  int num_tuples = WriteFields(pool, tuple_row_mem, num_fields, 1);
299  DCHECK_LE(num_tuples, 1);
300  DCHECK_GE(num_tuples, 0);
301  COUNTER_ADD(scan_node_->rows_read_counter(), num_tuples);
302  RETURN_IF_ERROR(CommitRows(num_tuples));
303  } else if (delimited_text_parser_->HasUnfinishedTuple()) {
304  DCHECK(scan_node_->materialized_slots().empty());
306  // If no fields are materialized we do not update partial_tuple_empty_,
307  // boundary_column_, or boundary_row_. However, we still need to handle the case
308  // of partial tuple due to missing tuple delimiter at the end of file.
310  }
311  break;
312  }
313 
314  DCHECK(eosr);
315 
316  int num_tuples;
317  RETURN_IF_ERROR(ProcessRange(&num_tuples, true));
318  if (num_tuples == 1) break;
319  DCHECK_EQ(num_tuples, 0);
320  }
321 
322  return Status::OK;
323 }
324 
// Core parse/materialize loop for a scan range. Repeatedly: refill the byte
// buffer if needed, parse delimiter locations into field_locations_, materialize
// tuples (or emit empty partition-key-only tuples for count(*)), stash any
// column/row that straddles a buffer boundary, and commit rows. When
// past_scan_range is true (called from FinishScanRange) at most one tuple is
// produced. NOTE(review): this listing drops source lines 330, 348, 352, 354,
// 366-367, 371, 374, 377, 379 and 385 — including the FillByteBuffer call at
// 330 and the boundary-column copy at 366-367 — so the body is incomplete.
325 Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) {
326  bool eosr = past_scan_range || stream_->eosr();
327 
328  while (true) {
329  if (!eosr && byte_buffer_ptr_ == byte_buffer_end_) {
331  }
332 
333  MemPool* pool;
334  TupleRow* tuple_row_mem;
335  int max_tuples = GetMemory(&pool, &tuple_, &tuple_row_mem);
336 
337  if (past_scan_range) {
338  // byte_buffer_ptr_ is already set from FinishScanRange()
339  max_tuples = 1;
340  eosr = true;
341  }
342 
343  *num_tuples = 0;
344  int num_fields = 0;
345 
346  DCHECK_GT(max_tuples, 0);
347 
349  char* col_start = byte_buffer_ptr_;
350  {
351  // Parse the bytes for delimiters and store their offsets in field_locations_
353  RETURN_IF_ERROR(delimited_text_parser_->ParseFieldLocations(max_tuples,
355  &row_end_locations_[0],
356  &field_locations_[0], num_tuples, &num_fields, &col_start));
357  }
358 
359  // Materialize the tuples into the in memory format for this query
360  int num_tuples_materialized = 0;
361  if (scan_node_->materialized_slots().size() != 0 &&
362  (num_fields > 0 || *num_tuples > 0)) {
363  // There can be one partial tuple which returned no more fields from this buffer.
364  DCHECK_LE(*num_tuples, num_fields + 1);
365  if (!boundary_column_.Empty()) {
368  }
369  num_tuples_materialized = WriteFields(pool, tuple_row_mem, num_fields, *num_tuples);
370  DCHECK_GE(num_tuples_materialized, 0);
372  if (*num_tuples > 0) {
373  // If we saw any tuple delimiters, clear the boundary_row_.
375  }
376  } else if (*num_tuples != 0) {
378  // If we are doing count(*) then we return tuples only containing partition keys
380  num_tuples_materialized = WriteEmptyTuples(context_, tuple_row_mem, *num_tuples);
381  }
382 
383  // Save contents that are split across buffers if we are going to return this column
384  if (col_start != byte_buffer_ptr_ && delimited_text_parser_->ReturnCurrentColumn()) {
386  boundary_column_.Append(col_start, byte_buffer_ptr_ - col_start);
387  char* last_row = NULL;
388  if (*num_tuples == 0) {
389  last_row = batch_start_ptr_;
390  } else {
391  last_row = row_end_locations_[*num_tuples - 1] + 1;
392  }
393  boundary_row_.Append(last_row, byte_buffer_ptr_ - last_row);
394  }
395  COUNTER_ADD(scan_node_->rows_read_counter(), *num_tuples);
396 
397  // Commit the rows to the row batch and scan node
398  RETURN_IF_ERROR(CommitRows(num_tuples_materialized));
399 
400  // Done with this buffer and the scan range
401  if ((byte_buffer_ptr_ == byte_buffer_end_ && eosr) || past_scan_range) {
402  break;
403  }
404 
405  if (scan_node_->ReachedLimit()) return Status::OK;
406  }
407  return Status::OK;
408 }
409 
// Refills byte_buffer_ptr_/byte_buffer_read_size_ from the stream. Three paths:
// uncompressed (read num_bytes, or a whole buffer when num_bytes == 0),
// streaming gzip (unless disabled by the debug flag), and whole-file
// decompression for the remaining codecs. NOTE(review): this listing drops
// source lines 421, 427, 430 and 433 — the gzip/compressed-file helper calls in
// the two else-branches are among the missing lines.
410 Status HdfsTextScanner::FillByteBuffer(bool* eosr, int num_bytes) {
411  *eosr = false;
412  Status status;
413 
414  if (decompressor_.get() == NULL) {
415  if (num_bytes > 0) {
416  stream_->GetBytes(num_bytes, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
417  &byte_buffer_read_size_, &status);
418  } else {
419  DCHECK_EQ(num_bytes, 0);
420  status = stream_->GetBuffer(false, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
422  }
423  *eosr = stream_->eosr();
424  } else if (!FLAGS_debug_disable_streaming_gzip &&
425  decompression_type_ == THdfsCompression::GZIP) {
426  DCHECK_EQ(num_bytes, 0);
428  } else {
429  DCHECK_EQ(num_bytes, 0);
431  }
432 
434  return status;
435 }
436 
// Streaming gzip fill: peeks a compressed buffer from the stream, feeds it to
// ProcessBlockStreaming, advances the stream by the bytes actually consumed,
// and retries once with a 1MB fixed-size read (GZIP_FIXED_READ_SIZE) if no
// output was produced. NOTE(review): this listing drops source lines 437 (the
// signature — presumably the streaming-gzip FillByteBuffer helper; confirm
// against the header), 478 and 522.
438  // Attach any previously decompressed buffers to the row batch before decompressing
439  // any more data.
440  if (!decompressor_->reuse_output_buffer()) {
441  AttachPool(data_buffer_pool_.get(), false);
442  }
443 
444  // Gzip compressed text is decompressed as buffers are read from stream_ (unlike
445  // other codecs which decompress the entire file in a single call). A compressed
446  // buffer is passed to ProcessBlockStreaming but it may not consume all of the input.
447  // In the unlikely case that decompressed output is not produced, we attempt to try
448  // again with a reasonably large fixed size input buffer (explicitly calling
449  // GetBytes()) before failing.
450  bool try_read_fixed_size = false;
451  uint8_t* decompressed_buffer = NULL;
452  int64_t decompressed_len = 0;
453  do {
454  uint8_t* gzip_buffer_ptr = NULL;
455  int64_t gzip_buffer_size = 0;
456  // We don't know how many bytes ProcessBlockStreaming() will consume so we set
457  // peak=true and then later advance the stream using SkipBytes().
458  if (!try_read_fixed_size) {
459  RETURN_IF_ERROR(stream_->GetBuffer(true, &gzip_buffer_ptr, &gzip_buffer_size));
460  } else {
461  Status status;
462  stream_->GetBytes(GZIP_FIXED_READ_SIZE, &gzip_buffer_ptr, &gzip_buffer_size,
463  &status, true);
464  RETURN_IF_ERROR(status);
465  try_read_fixed_size = false;
466  }
467  if (gzip_buffer_size == 0) {
468  // If the compressed file was not properly ended, the decoder will not know that
469  // the last buffer should have been eos.
470  stringstream ss;
471  ss << "Unexpected end of file decompressing gzip. File may be malformed. ";
472  ss << "file: " << stream_->filename();
473  return Status(ss.str());
474  }
475 
476  int64_t gzip_buffer_bytes_read = 0;
477  {
479  RETURN_IF_ERROR(decompressor_->ProcessBlockStreaming(gzip_buffer_size,
480  gzip_buffer_ptr, &gzip_buffer_bytes_read, &decompressed_len,
481  &decompressed_buffer, eosr));
482  DCHECK_GE(gzip_buffer_size, gzip_buffer_bytes_read);
483  DCHECK_GE(decompressed_len, 0);
484  }
485 
486  // Skip the bytes in stream_ that were decompressed.
487  Status status;
488  stream_->SkipBytes(gzip_buffer_bytes_read, &status);
489  RETURN_IF_ERROR(status);
490 
491  if (!*eosr && decompressed_len == 0) {
492  // It's possible (but very unlikely) that ProcessBlockStreaming() wasn't able to
493  // make progress if the compressed buffer returned by GetBytes() is too small.
494  // (Note that this did not even occur in simple experiments where the input buffer
495  // is always 1 byte, but we need to handle this case to be defensive.) In this
496  // case, try again with a reasonably large fixed size buffer. If we still did not
497  // make progress, then return an error.
498  if (try_read_fixed_size) {
499  stringstream ss;
500  ss << "Unable to make progress decoding gzip text. ";
501  ss << "file: " << stream_->filename();
502  return Status(ss.str());
503  }
504  VLOG_FILE << "Unable to make progress decompressing gzip, trying again";
505  try_read_fixed_size = true;
506  }
507  } while (try_read_fixed_size);
508 
509  byte_buffer_ptr_ = reinterpret_cast<char*>(decompressed_buffer);
510  byte_buffer_read_size_ = decompressed_len;
511 
512  if (*eosr) {
513  if (!stream_->eosr()) {
514  // TODO: Add a test case that exercises this path.
515  stringstream ss;
516  ss << "Unexpected end of gzip stream before end of file: ";
517  ss << stream_->filename();
518  if (state_->LogHasSpace()) {
519  state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
520  }
521  if (state_->abort_on_error()) parse_status_ = Status(ss.str());
523  }
524 
525  context_->ReleaseCompletedResources(NULL, true);
526  }
527  return Status::OK;
528 }
529 
// Whole-file decompression path for non-gzip, non-LZO codecs: reads the entire
// file into the byte buffer, validates the read size against file_length,
// decompresses in one call, and repoints byte_buffer_ptr_ at the decompressed
// output. NOTE(review): this listing drops source lines 530 (the signature —
// the member index shows "Status FillByteBufferCompressedFile(bool* eosr)"),
// 534 (the file-desc lookup that defines 'desc'), 560 and 563 (the
// ProcessBlock call prefix).
531  // For other compressed text: attempt to read and decompress the entire file, point
532  // to the decompressed buffer, and then continue normal processing.
533  DCHECK(decompression_type_ != THdfsCompression::SNAPPY);
535  int64_t file_size = desc->file_length;
536  DCHECK_GT(file_size, 0);
537  Status status;
538  stream_->GetBytes(file_size, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
539  &byte_buffer_read_size_, &status);
540  RETURN_IF_ERROR(status);
541 
542  // If didn't read anything, return.
543  if (byte_buffer_read_size_ == 0) {
544  *eosr = true;
545  return Status::OK;
546  }
547 
548  // Need to read the entire file.
549  if (file_size < byte_buffer_read_size_) {
550  stringstream ss;
551  ss << "Expected to read a compressed text file of size " << file_size << " bytes. "
552  << "But only read " << byte_buffer_read_size_ << " bytes. This may indicate "
553  << "data file corruption. (file: " << stream_->filename() << ").";
554  return Status(ss.str());
555  }
556 
557  // Decompress and adjust the byte_buffer_ptr_ and byte_buffer_read_size_ accordingly.
558  int64_t decompressed_len = 0;
559  uint8_t* decompressed_buffer = NULL;
561  // TODO: Once the writers are in, add tests with very large compressed files (4GB)
562  // that could overflow.
564  reinterpret_cast<uint8_t*>(byte_buffer_ptr_), &decompressed_len,
565  &decompressed_buffer));
566 
567  // Inform stream_ that the buffer with the compressed text can be released.
568  context_->ReleaseCompletedResources(NULL, true);
569 
570  VLOG_FILE << "Decompressed " << byte_buffer_read_size_ << " to " << decompressed_len;
571  byte_buffer_ptr_ = reinterpret_cast<char*>(decompressed_buffer);
572  byte_buffer_read_size_ = decompressed_len;
573  *eosr = stream_->eosr();
574  return Status::OK;
575 }
576 
// Locates the first complete tuple in the range. A range starting at offset 0
// is trusted to begin on a tuple boundary; otherwise we scan forward for the
// first tuple delimiter, leaving byte_buffer_ptr_ at the tuple start.
// NOTE(review): this listing drops source lines 577 (the signature — member
// index shows "Status FindFirstTuple(bool* tuple_found)"), 585 (the buffer
// refill), 588 and 590 (the FindFirstInstance argument list).
578  *tuple_found = true;
579  if (stream_->scan_range()->offset() != 0) {
580  *tuple_found = false;
581  // Offset may not point to tuple boundary, skip ahead to the first full tuple
582  // start.
583  while (true) {
584  bool eosr = false;
586 
587  delimited_text_parser_->ParserReset();
589  int first_tuple_offset = delimited_text_parser_->FindFirstInstance(
591 
592  if (first_tuple_offset == -1) {
593  // Didn't find tuple in this buffer, keep going with this scan range
594  if (!eosr) continue;
595  } else {
596  byte_buffer_ptr_ += first_tuple_offset;
597  *tuple_found = true;
598  }
599  break;
600  }
601  }
602  DCHECK(delimited_text_parser_->AtTupleStart());
603  return Status::OK;
604 }
605 
606 // Codegen for materializing parsed data into tuples. The function WriteCompleteTuple is
607 // codegen'd using the IRBuilder for the specific tuple description. This function
608 // is then injected into the cross-compiled driving function, WriteAlignedTuples().
// Returns NULL when codegen is disabled or unavailable, or when either codegen
// step fails. NOTE(review): source line 609 — the first line of the signature,
// per the member index "static llvm::Function* Codegen(HdfsScanNode*,
//  const std::vector<ExprContext*>& conjunct_ctxs)" — is missing from this
// listing.
610  const vector<ExprContext*>& conjunct_ctxs) {
611  if (!node->runtime_state()->codegen_enabled()) return NULL;
612  LlvmCodeGen* codegen;
613  if (!node->runtime_state()->GetCodegen(&codegen).ok()) return NULL;
614  Function* write_complete_tuple_fn =
615  CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs);
616  if (write_complete_tuple_fn == NULL) return NULL;
617  return CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn);
618 }
619 
// Scanner preparation: registers the DelimiterParseTime child timer and
// (per the comment) allocates the two-pass parsing scratch space.
// NOTE(review): this listing drops source lines 620-621 (the signature — member
// index shows "virtual Status Prepare(ScannerContext* context)"), 623 and
// 629-631 (the actual scratch-space allocation); body is incomplete.
622 
624  "DelimiterParseTime", ScanNode::SCANNER_THREAD_TOTAL_WALLCLOCK_TIME);
625 
626  // Allocate the scratch space for two pass parsing. The most fields we can go
627  // through in one parse pass is the batch size (tuples) * the number of fields per tuple
628  // TODO: This should probably be based on L2/L3 cache sizes (as should the batch size)
631 
632  return Status::OK;
633 }
634 
635 void HdfsTextScanner::LogRowParseError(int row_idx, stringstream* ss) {
636  DCHECK_LT(row_idx, row_end_locations_.size());
637  char* row_end = row_end_locations_[row_idx];
638  char* row_start;
639  if (row_idx == 0) {
640  row_start = batch_start_ptr_;
641  } else {
642  // Row start at 1 past the row end (i.e. the row delimiter) for the previous row
643  row_start = row_end_locations_[row_idx - 1] + 1;
644  }
645 
646  if (!boundary_row_.Empty()) {
647  // Log the beginning of the line from the previous file buffer(s)
648  *ss << boundary_row_.str();
649  }
650  // Log the erroneous line (or the suffix of a line if !boundary_line.empty()).
651  *ss << string(row_start, row_end - row_start);
652 }
653 
654 // This function writes fields in 'field_locations_' to the row_batch. This function
655 // deals with tuples that straddle batches. There are two cases:
656 // 1. There is already a partial tuple in flight from the previous time around.
657 // This tuple can either be fully materialized (all the materialized columns have
658 // been processed but we haven't seen the tuple delimiter yet) or only partially
659 // materialized. In this case num_tuples can be greater than num_fields
660 // 2. There is a non-fully materialized tuple at the end. The cols that have been
661 // parsed so far are written to 'tuple_' and the remained will be picked up (case 1)
662 // the next time around.
// Returns the number of tuples materialized (0 on parse error when aborting).
// NOTE(review): this listing drops source lines 663 (the first line of the
// signature — presumably "int HdfsTextScanner::WriteFields(MemPool* pool,
// TupleRow* tuple_row, ..."; confirm), 665, 692, 696-698, 708 and 746; the
// partial-tuple copy and row-commit lines are among those missing.
664  int num_fields, int num_tuples) {
666 
667  FieldLocation* fields = &field_locations_[0];
668 
669  int num_tuples_processed = 0;
670  int num_tuples_materialized = 0;
671  // Write remaining fields, if any, from the previous partial tuple.
672  if (slot_idx_ != 0) {
673  DCHECK(tuple_ != NULL);
674  int num_partial_fields = scan_node_->materialized_slots().size() - slot_idx_;
675  // Corner case where there will be no materialized tuples but at least one col
676  // worth of string data. In this case, make a deep copy and reuse the byte buffer.
677  bool copy_strings = num_partial_fields > num_fields;
678  num_partial_fields = min(num_partial_fields, num_fields);
679  WritePartialTuple(fields, num_partial_fields, copy_strings);
680 
681  // This handles case 1. If the tuple is complete and we've found a tuple delimiter
682  // this time around (i.e. num_tuples > 0), add it to the row batch. Otherwise,
683  // it will get picked up the next time around
684  if (slot_idx_ == scan_node_->materialized_slots().size() && num_tuples > 0) {
685  if (UNLIKELY(error_in_row_)) {
686  if (state_->LogHasSpace()) {
687  stringstream ss;
688  ss << "file: " << stream_->filename() << endl << "record: ";
689  LogRowParseError(0, &ss);
690  state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
691  }
693  if (!parse_status_.ok()) return 0;
694  error_in_row_ = false;
695  }
697 
699  partial_tuple_empty_ = true;
700  tuple_row->SetTuple(scan_node_->tuple_idx(), tuple_);
701 
702  slot_idx_ = 0;
703  ++num_tuples_processed;
704  --num_tuples;
705 
706  if (EvalConjuncts(tuple_row)) {
707  ++num_tuples_materialized;
709  tuple_row = next_row(tuple_row);
710  }
711  }
712 
713  num_fields -= num_partial_fields;
714  fields += num_partial_fields;
715  }
716 
717  // Write complete tuples. The current field, if any, is at the start of a tuple.
718  if (num_tuples > 0) {
719  int max_added_tuples = (scan_node_->limit() == -1) ?
720  num_tuples : scan_node_->limit() - scan_node_->rows_returned();
721  int tuples_returned = 0;
722  // Call jitted function if possible
723  if (write_tuples_fn_ != NULL) {
724  tuples_returned = write_tuples_fn_(this, pool, tuple_row,
725  batch_->row_byte_size(), fields, num_tuples, max_added_tuples,
726  scan_node_->materialized_slots().size(), num_tuples_processed);
727  } else {
728  tuples_returned = WriteAlignedTuples(pool, tuple_row,
729  batch_->row_byte_size(), fields, num_tuples, max_added_tuples,
730  scan_node_->materialized_slots().size(), num_tuples_processed);
731  }
// -1 signals a parse failure from the tuple writer; caller sees 0 rows.
732  if (tuples_returned == -1) return 0;
733  DCHECK_EQ(slot_idx_, 0);
734 
735  num_tuples_materialized += tuples_returned;
736  num_fields -= num_tuples * scan_node_->materialized_slots().size();
737  fields += num_tuples * scan_node_->materialized_slots().size();
738  }
739 
740  DCHECK_GE(num_fields, 0);
741  DCHECK_LE(num_fields, scan_node_->materialized_slots().size());
742 
743  // Write out the remaining slots (resulting in a partially materialized tuple)
744  if (num_fields != 0) {
745  DCHECK(tuple_ != NULL);
747  // If there have been no materialized tuples at this point, copy string data
748  // out of byte_buffer and reuse the byte_buffer. The copied data can be at
749  // most one tuple's worth.
750  WritePartialTuple(fields, num_fields, num_tuples_materialized == 0);
751  partial_tuple_empty_ = false;
752  }
753  DCHECK_LE(slot_idx_, scan_node_->materialized_slots().size());
754  return num_tuples_materialized;
755 }
756 
// Prepends the buffered boundary_column_ bytes to a field that straddled a
// buffer boundary, deep-copying the combined bytes into 'pool'. A negative
// StringValue length encodes "needs escape"; the sign is preserved on the
// combined length. NOTE(review): source line 757 — the signature, presumably
// CopyBoundaryField(FieldLocation* data, MemPool* pool); confirm against the
// header — is missing from this listing.
758  bool needs_escape = data->len < 0;
759  int copy_len = needs_escape ? -data->len : data->len;
760  int total_len = copy_len + boundary_column_.Size();
761  char* str_data = reinterpret_cast<char*>(pool->Allocate(total_len));
762  memcpy(str_data, boundary_column_.str().ptr, boundary_column_.Size());
763  memcpy(str_data + boundary_column_.Size(), data->start, copy_len);
764  data->start = str_data;
765  data->len = needs_escape ? -total_len : total_len;
766 }
767 
// Writes 'num_fields' parsed fields into the in-flight partial tuple, advancing
// slot_idx_ per field and flagging (not aborting on) per-column parse errors.
// Returns the byte offset just past the last field written (fields plus their
// delimiters). NOTE(review): this listing drops source lines 768 (the first
// signature line — member index shows "int WritePartialTuple(FieldLocation*,
// int num_fields, bool copy_strings)") and 780 (which presumably fetches the
// SlotDescriptor 'desc' and/or applies copy_strings; confirm).
769  int num_fields, bool copy_strings) {
770  int next_line_offset = 0;
771  for (int i = 0; i < num_fields; ++i) {
772  int need_escape = false;
773  int len = fields[i].len;
// Negative length is the parser's encoding for "field contains escapes".
774  if (len < 0) {
775  len = -len;
776  need_escape = true;
777  }
778  next_line_offset += (len + 1);
779 
781  if (!text_converter_->WriteSlot(desc, partial_tuple_,
782  fields[i].start, len, true, need_escape, data_buffer_pool_.get())) {
783  ReportColumnParseError(desc, fields[i].start, len);
784  error_in_row_ = true;
785  }
786  ++slot_idx_;
787  }
788  return next_line_offset;
789 }
const std::vector< SlotDescriptor * > & materialized_slots() const
const std::string & null_column_value() const
Definition: descriptors.h:233
Status ProcessRange(int *num_tuples, bool past_scan_range)
virtual Status InitNewRange()
boost::scoped_ptr< Codec > decompressor_
Decompressor class to use, if any.
Definition: hdfs-scanner.h:198
void ReportColumnParseError(const SlotDescriptor *desc, const char *data, int len)
std::vector< char * > row_end_locations_
const StringValue & str() const
Returns the underlying StringValue.
Definition: string-buffer.h:86
HdfsScanNode * scan_node_
The scan node that started this scanner.
Definition: hdfs-scanner.h:141
const std::string GetDetail() const
Definition: status.cc:184
virtual void LogRowParseError(int row_idx, std::stringstream *)
int Size() const
Returns the length of the current string.
Definition: string-buffer.h:81
int num_partition_keys() const
Returns number of partition keys in the table, including non-materialized slots.
Status GetBuffer(bool peek, uint8_t **buffer, int64_t *out_len)
ScannerContext * context_
Context for this scanner.
Definition: hdfs-scanner.h:147
void Append(const char *str, int len)
Append 'str' to the current string, allocating a new buffer as necessary.
Definition: string-buffer.h:44
StringBuffer boundary_column_
Helper string for dealing with columns that span file blocks.
RuntimeProfile::HighWaterMarkCounter * max_compressed_text_file_length()
boost::scoped_ptr< MemPool > data_buffer_pool_
Definition: hdfs-scanner.h:205
static llvm::Function * CodegenWriteCompleteTuple(HdfsScanNode *, LlvmCodeGen *, const std::vector< ExprContext * > &conjunct_ctxs)
std::string ErrorLog()
Returns the error log lines as a string joined with ' '.
boost::scoped_ptr< TextConverter > text_converter_
Helper class for converting text to other types;.
Definition: hdfs-scanner.h:186
WriteTuplesFn write_tuples_fn_
Jitted write tuples function pointer. Null if codegen is disabled.
Definition: hdfs-scanner.h:215
char * byte_buffer_end_
Ending position of HDFS buffer.
Status FillByteBufferCompressedFile(bool *eosr)
const HdfsTableDescriptor * hdfs_table()
void Clear()
Clear the underlying StringValue. The allocated buffer can be reused.
Definition: string-buffer.h:65
void ReleaseCompletedResources(RowBatch *batch, bool done)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
bool Empty() const
Returns whether the current string is empty.
Definition: string-buffer.h:76
int WriteEmptyTuples(RowBatch *row_batch, int num_tuples)
bool LogHasSpace()
Returns true if the error log has not reached max_errors_.
static const int NEXT_BLOCK_READ_SIZE
RuntimeProfile::Counter * rows_read_counter() const
Definition: scan-node.h:96
int byte_size() const
Definition: descriptors.h:300
const int64_t GZIP_FIXED_READ_SIZE
Status FinishScanRange()
Reads past the end of the scan range for the next tuple end.
static Tuple * Create(int size, MemPool *pool)
initialize individual tuple with data residing in mem pool
Definition: tuple.h:51
TupleRow * next_row(TupleRow *r) const
Definition: hdfs-scanner.h:368
#define COUNTER_ADD(c, v)
int64_t partition_id
The partition id that this range is part of.
int row_byte_size()
Definition: row-batch.h:147
int64_t file_offset() const
Returns the buffer's current offset in the file.
bool ReachedLimit()
Definition: exec-node.h:159
std::vector< FieldLocation > field_locations_
Return field locations from the Delimited Text Parser.
void RangeComplete(const THdfsFileFormat::type &file_type, const THdfsCompression::type &compression_type)
#define ADD_CHILD_TIMER(profile, name, parent)
#define SCOPED_TIMER(c)
int WritePartialTuple(FieldLocation *, int num_fields, bool copy_strings)
virtual Status Prepare(ScannerContext *context)
Implementation of HdfsScanner interface.
bool only_parsing_header_
True if we are parsing the header for this scanner.
const bool * is_materialized_col()
THdfsCompression::type decompression_type_
The most recently used decompression type.
Definition: hdfs-scanner.h:201
static Status IssueInitialRanges(HdfsScanNode *scan_node, const std::vector< HdfsFileDesc * > &files)
Issue io manager byte ranges for 'files'.
virtual void Close()
Definition: hdfs-scanner.cc:82
bool GetBytes(int64_t requested_len, uint8_t **buffer, int64_t *out_len, Status *status, bool peek=false)
LLVM code generator. This is the top level object to generate jitted code.
Definition: llvm-codegen.h:107
RuntimeState * state_
RuntimeState for error reporting.
Definition: hdfs-scanner.h:144
THdfsCompression::type file_compression
const HdfsFileDesc * file_desc()
HdfsFileDesc * GetFileDesc(const std::string &filename)
Returns the file desc for 'filename'. Returns NULL if filename is invalid.
Status UpdateDecompressor(const THdfsCompression::type &compression)
bool LogError(const ErrorMsg &msg)
void InitTuple(Tuple *template_tuple, Tuple *tuple)
Definition: hdfs-scanner.h:355
int GetMemory(MemPool **pool, Tuple **tuple_mem, TupleRow **tuple_row_mem)
Status AddDiskIoRanges(const std::vector< DiskIoMgr::ScanRange * > &ranges)
Adds ranges to the io mgr queue and starts up new scanner threads if possible.
static Status IssueInitialRanges(HdfsScanNode *scan_node, const std::vector< HdfsFileDesc * > &files)
boost::scoped_ptr< DelimitedTextParser > delimited_text_parser_
Helper class for picking fields and rows from delimited text.
bool IsCancelled() const
Definition: status.h:174
bool eof() const
If true, the stream has reached the end of the file.
ObjectPool pool
static const char * LLVM_CLASS_NAME
Status FindFirstTuple(bool *tuple_found)
HdfsTextScanner(HdfsScanNode *scan_node, RuntimeState *state)
static llvm::Function * Codegen(HdfsScanNode *, const std::vector< ExprContext * > &conjunct_ctxs)
Codegen writing tuples and evaluating predicates.
bool SkipBytes(int64_t length, Status *)
Skip over the next length bytes in the specified HDFS file.
Status CommitRows(int num_rows)
RuntimeState * runtime_state()
int slot_idx_
Index into materialized_slots_ for the next slot to output for the current tuple. ...
int64_t rows_returned() const
Definition: exec-node.h:157
int batch_size() const
Definition: runtime-state.h:98
bool IR_ALWAYS_INLINE EvalConjuncts(TupleRow *row)
Definition: hdfs-scanner.h:266
RuntimeProfile::Counter * parse_delimiter_timer_
Time parsing text files.
static llvm::Function * CodegenWriteAlignedTuples(HdfsScanNode *, LlvmCodeGen *, llvm::Function *write_tuple_fn)
void AttachPool(MemPool *pool, bool commit_batch)
Definition: hdfs-scanner.h:256
int WriteFields(MemPool *, TupleRow *tuple_row_mem, int num_fields, int num_tuples)
Status FillByteBufferGzip(bool *eosr)
void SetTuple(int tuple_idx, Tuple *tuple)
Definition: tuple-row.h:34
boost::scoped_ptr< MemPool > boundary_pool_
Mem pool for boundary_row_ and boundary_column_.
#define UNLIKELY(expr)
Definition: compiler-util.h:33
bool codegen_enabled() const
Returns true if codegen is enabled for this query.
DEFINE_bool(debug_disable_streaming_gzip, false,"Debug flag, will be removed. Disables ""streaming gzip decompression.")
virtual Status ProcessSplit()
char * byte_buffer_ptr_
Current position in byte buffer.
DiskIoMgr::ScanRange * AllocateScanRange(hdfsFS fs, const char *file, int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool try_cache, bool expected_local, int64_t mtime)
static const Status OK
Definition: status.h:87
int WriteAlignedTuples(MemPool *pool, TupleRow *tuple_row_mem, int row_size, FieldLocation *fields, int num_tuples, int max_added_tuples, int slots_per_tuple, int row_start_indx)
Status GetCodegen(LlvmCodeGen **codegen, bool initialize=true)
void CopyBoundaryField(FieldLocation *data, MemPool *pool)
Metadata for a single partition inside an Hdfs table.
Definition: descriptors.h:177
static const std::string LZO_INDEX_SUFFIX
Suffix for lzo index files.
Tuple * tuple_
Current tuple pointer into tuple_mem_.
Definition: hdfs-scanner.h:170
int64_t byte_buffer_read_size_
Actual bytes received from last file read.
const DiskIoMgr::ScanRange * scan_range()
#define VLOG_FILE
Definition: logging.h:58
int tuple_idx() const
bool abort_on_error() const
Definition: runtime-state.h:99
bool ok() const
Definition: status.h:172
RuntimeProfile::Counter * decompress_timer_
Time spent decompressing bytes.
Definition: hdfs-scanner.h:208
static const std::string SCANNER_THREAD_TOTAL_WALLCLOCK_TIME
Definition: scan-node.h:131
Status InitializeWriteTuplesFn(HdfsPartitionDescriptor *partition, THdfsFileFormat::type type, const std::string &scanner_name)
Definition: hdfs-scanner.cc:87
HdfsPartitionDescriptor * partition_descriptor()
int num_materialized_partition_keys() const
Returns number of materialized partition key slots.
virtual Status FillByteBuffer(bool *eosr, int num_bytes=0)
ScannerContext::Stream * stream_
The first stream for context_.
Definition: hdfs-scanner.h:150
uint8_t * Allocate(int size)
Definition: mem-pool.h:92
const TupleDescriptor * tuple_desc()
Tuple * next_tuple(Tuple *t) const
Definition: hdfs-scanner.h:363
virtual Status Prepare(ScannerContext *context)
One-time initialisation of state that is constant across scan ranges.
Definition: hdfs-scanner.cc:71
RuntimeProfile * runtime_profile()
Definition: exec-node.h:161
RuntimeProfile::Counter * materialize_tuple_timer() const
Definition: scan-node.h:104