Impala
Impala is the open source, native analytic database for Apache Hadoop.
hdfs-rcfile-scanner.cc
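Source listing for Impala's RCFile scanner. The scanner parses the RCFile file header, then reads row groups one at a time, buffering (and if necessary decompressing) each materialized column before materializing tuples from the buffered column data.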
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/hdfs-rcfile-scanner.h"

#include <boost/algorithm/string.hpp>

#include "exec/hdfs-scan-node.h"
#include "exprs/expr.h"
#include "runtime/descriptors.h"
#include "runtime/runtime-state.h"
#include "runtime/mem-pool.h"
#include "runtime/raw-value.h"
#include "runtime/tuple-row.h"
#include "runtime/tuple.h"
#include "runtime/string-value.h"
#include "util/codec.h"
#include "util/string-parser.h"

#include "gen-cpp/PlanNodes_types.h"

#include "common/names.h"
using namespace impala;

const char* const HdfsRCFileScanner::RCFILE_KEY_CLASS_NAME =
    "org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer";

const char* const HdfsRCFileScanner::RCFILE_VALUE_CLASS_NAME =
    "org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer";

const char* const HdfsRCFileScanner::RCFILE_METADATA_KEY_NUM_COLS =
    "hive.io.rcfile.column.number";

const uint8_t HdfsRCFileScanner::RCFILE_VERSION_HEADER[4] = {'R', 'C', 'F', 1};

// Macro to convert SerdeUtil errors to Status returns.
#define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_

HdfsRCFileScanner::HdfsRCFileScanner(HdfsScanNode* scan_node, RuntimeState* state)
    : BaseSequenceScanner(scan_node, state) {
}

HdfsRCFileScanner::~HdfsRCFileScanner() {
}

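// One-time scanner setup: creates the TextConverter used to parse the text-encoded
// column values into slots. Codegen is not used for RCFile scans.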
Status HdfsRCFileScanner::Prepare(ScannerContext* context) {
  RETURN_IF_ERROR(BaseSequenceScanner::Prepare(context));
  text_converter_.reset(
      new TextConverter(0, scan_node_->hdfs_table()->null_column_value()));
  scan_node_->IncNumScannersCodegenDisabled();
  return Status::OK;
}

Status HdfsRCFileScanner::InitNewRange() {
  DCHECK(header_ != NULL);

  only_parsing_header_ = false;

  // Can reuse buffer if there are no string columns (since the tuple won't contain
  // ptrs into the decompressed data).
  reuse_row_group_buffer_ = scan_node_->tuple_desc()->string_slots().empty();

  // The scanner currently copies all the column data out of the io buffer so the
  // stream never contains any tuple data.
  stream_->set_contains_tuple_data(false);

  if (header_->is_compressed) {
    RETURN_IF_ERROR(Codec::CreateDecompressor(NULL,
        reuse_row_group_buffer_, header_->compression_type, &decompressor_));
  }

  // Allocate the buffers for the key information that is used to read and decode
  // the column data.
  columns_.resize(reinterpret_cast<RcFileHeader*>(header_)->num_cols);
  int num_table_cols =
      scan_node_->hdfs_table()->num_cols() - scan_node_->num_partition_keys();
  for (int i = 0; i < columns_.size(); ++i) {
    if (i < num_table_cols) {
      int col_idx = i + scan_node_->num_partition_keys();
      columns_[i].materialize_column = scan_node_->GetMaterializedSlotIdx(
          vector<int>(1, col_idx)) != HdfsScanNode::SKIP_COLUMN;
    } else {
      // Treat columns not found in table metadata as extra unmaterialized columns
      columns_[i].materialize_column = false;
    }
  }

  // TODO: Initialize codegen fn here
  return Status::OK;
}

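// Parses the RCFile file header: the version magic (either the original SEQ6
// sequence-file layout or the newer RCF1 layout), the key/value class names
// (SEQ6 only), the compression flag and codec, the file metadata, and the
// trailing sync marker that delimits row groups.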
Status HdfsRCFileScanner::ReadFileHeader() {
  uint8_t* header;

  RcFileHeader* rc_header = reinterpret_cast<RcFileHeader*>(header_);
  // Validate file version
  RETURN_IF_FALSE(stream_->ReadBytes(
      sizeof(RCFILE_VERSION_HEADER), &header, &parse_status_));
  if (!memcmp(header, HdfsSequenceScanner::SEQFILE_VERSION_HEADER,
      sizeof(HdfsSequenceScanner::SEQFILE_VERSION_HEADER))) {
    rc_header->version = SEQ6;
  } else if (!memcmp(header, RCFILE_VERSION_HEADER, sizeof(RCFILE_VERSION_HEADER))) {
    rc_header->version = RCF1;
  } else {
    stringstream ss;
    ss << "Invalid RCFILE_VERSION_HEADER: '"
       << ReadWriteUtil::HexDump(header, sizeof(RCFILE_VERSION_HEADER)) << "'";
    return Status(ss.str());
  }

  if (rc_header->version == SEQ6) {
    // Validate class name key/value
    uint8_t* class_name_key;
    int64_t len;
    RETURN_IF_FALSE(
        stream_->ReadText(&class_name_key, &len, &parse_status_));
    if (len != strlen(HdfsRCFileScanner::RCFILE_KEY_CLASS_NAME) ||
        memcmp(class_name_key, HdfsRCFileScanner::RCFILE_KEY_CLASS_NAME, len)) {
      stringstream ss;
      ss << "Invalid RCFILE_KEY_CLASS_NAME: '"
         << string(reinterpret_cast<char*>(class_name_key), len)
         << "' len=" << len;
      return Status(ss.str());
    }

    uint8_t* class_name_val;
    RETURN_IF_FALSE(
        stream_->ReadText(&class_name_val, &len, &parse_status_));
    if (len != strlen(HdfsRCFileScanner::RCFILE_VALUE_CLASS_NAME) ||
        memcmp(class_name_val, HdfsRCFileScanner::RCFILE_VALUE_CLASS_NAME, len)) {
      stringstream ss;
      ss << "Invalid RCFILE_VALUE_CLASS_NAME: '"
         << string(reinterpret_cast<char*>(class_name_val), len)
         << "' len=" << len;
      return Status(ss.str());
    }
  }

  // Check for compression
  RETURN_IF_FALSE(
      stream_->ReadBoolean(&header_->is_compressed, &parse_status_));
  if (rc_header->version == SEQ6) {
    // Read the is_blk_compressed header field. This field should *always*
    // be FALSE, and is the result of using the sequence file header format in the
    // original RCFile format.
    bool is_blk_compressed;
    RETURN_IF_FALSE(
        stream_->ReadBoolean(&is_blk_compressed, &parse_status_));
    if (is_blk_compressed) {
      stringstream ss;
      ss << "RC files do not support block compression.";
      return Status(ss.str());
    }
  }
  if (header_->is_compressed) {
    uint8_t* codec_ptr;
    int64_t len;
    // Read the codec and get the right decompressor class.
    RETURN_IF_FALSE(stream_->ReadText(&codec_ptr, &len, &parse_status_));
    header_->codec = string(reinterpret_cast<char*>(codec_ptr), len);
    Codec::CodecMap::const_iterator it = Codec::CODEC_MAP.find(header_->codec);
    DCHECK(it != Codec::CODEC_MAP.end());
    header_->compression_type = it->second;
  } else {
    header_->compression_type = THdfsCompression::NONE;
  }
  VLOG_FILE << stream_->filename() << ": "
            << (header_->is_compressed ? "compressed" : "not compressed");

  RETURN_IF_ERROR(ReadNumColumnsMetadata());

  // Read file sync marker
  uint8_t* sync;
  RETURN_IF_FALSE(stream_->ReadBytes(SYNC_HASH_SIZE, &sync, &parse_status_));
  memcpy(header_->sync, sync, SYNC_HASH_SIZE);

  header_->header_size = stream_->total_bytes_returned() - SYNC_HASH_SIZE;
  return Status::OK;
}

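// Scans the file's metadata key/value pairs for the "hive.io.rcfile.column.number"
// entry and records the column count in the RcFileHeader so that per-column state
// can be sized in InitNewRange().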
Status HdfsRCFileScanner::ReadNumColumnsMetadata() {
  int map_size = 0;
  RETURN_IF_FALSE(stream_->ReadInt(&map_size, &parse_status_));

  for (int i = 0; i < map_size; ++i) {
    uint8_t* key, *value;
    int64_t key_len, value_len;
    RETURN_IF_FALSE(stream_->ReadText(&key, &key_len, &parse_status_));
    RETURN_IF_FALSE(stream_->ReadText(&value, &value_len, &parse_status_));

    if (key_len == strlen(RCFILE_METADATA_KEY_NUM_COLS) &&
        !memcmp(key, HdfsRCFileScanner::RCFILE_METADATA_KEY_NUM_COLS, key_len)) {
      string value_str(reinterpret_cast<char*>(value), value_len);
      StringParser::ParseResult result;
      int num_cols =
          StringParser::StringToInt<int>(value_str.c_str(), value_str.size(), &result);
      if (result != StringParser::PARSE_SUCCESS) {
        stringstream ss;
        ss << "Could not parse number of columns in file " << stream_->filename()
           << ": " << value_str;
        if (result == StringParser::PARSE_OVERFLOW) ss << " (result overflowed)";
        return Status(ss.str());
      }
      RcFileHeader* rc_header = reinterpret_cast<RcFileHeader*>(header_);
      rc_header->num_cols = num_cols;
    }
  }
  return Status::OK;
}

BaseSequenceScanner::FileHeader* HdfsRCFileScanner::AllocateFileHeader() {
  return new RcFileHeader;
}

void HdfsRCFileScanner::ResetRowGroup() {
  num_rows_ = 0;
  row_pos_ = 0;
  key_length_ = 0;
  compressed_key_length_ = 0;

  for (int i = 0; i < columns_.size(); ++i) {
    columns_[i].buffer_len = 0;
    columns_[i].buffer_pos = 0;
    columns_[i].uncompressed_buffer_len = 0;
    columns_[i].key_buffer_len = 0;
    columns_[i].key_buffer_pos = 0;
    columns_[i].current_field_len = 0;
    columns_[i].current_field_len_rep = 0;
  }

  // We are done with this row group, pass along external buffers if necessary.
  if (!reuse_row_group_buffer_) {
    AttachPool(data_buffer_pool_.get(), true);
  }
}

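// Reads the next row group into memory: the row-group header, the (possibly
// compressed) key buffers describing per-column lengths, and then the column data
// itself. Loops until a row group containing at least one row has been read.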
Status HdfsRCFileScanner::ReadRowGroup() {
  ResetRowGroup();

  while (num_rows_ == 0) {
    RETURN_IF_ERROR(ReadRowGroupHeader());
    RETURN_IF_ERROR(ReadKeyBuffers());
    if (!reuse_row_group_buffer_ || row_group_length_ > row_group_buffer_size_) {
      // Allocate a new buffer for reading the row group. Row groups have a
      // fixed number of rows so take a guess at how big it will be based on
      // the previous row group size.
      // The row group length depends on the user data and can be very big. This
      // can cause us to go way over the mem limit so use TryAllocate instead.
      row_group_buffer_ = data_buffer_pool_->TryAllocate(row_group_length_);
      if (row_group_length_ > 0 && row_group_buffer_ == NULL) {
        return state_->SetMemLimitExceeded(
            scan_node_->mem_tracker(), row_group_length_);
      }
      row_group_buffer_size_ = row_group_length_;
    }
    RETURN_IF_ERROR(ReadColumnBuffers());
  }
  return Status::OK;
}

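// Reads the three length fields that precede each row group: the total record
// length, the uncompressed key length, and the compressed key length. Negative
// values indicate a corrupt file and are reported together with the file offset.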
Status HdfsRCFileScanner::ReadRowGroupHeader() {
  int32_t record_length;
  RETURN_IF_FALSE(stream_->ReadInt(&record_length, &parse_status_));
  if (record_length < 0) {
    stringstream ss;
    int64_t position = stream_->file_offset();
    position -= sizeof(int32_t);
    ss << "Bad record length: " << record_length << " at offset: " << position;
    return Status(ss.str());
  }
  RETURN_IF_FALSE(stream_->ReadInt(&key_length_, &parse_status_));
  if (key_length_ < 0) {
    stringstream ss;
    int64_t position = stream_->file_offset();
    position -= sizeof(int32_t);
    ss << "Bad key length: " << key_length_ << " at offset: " << position;
    return Status(ss.str());
  }
  RETURN_IF_FALSE(stream_->ReadInt(&compressed_key_length_, &parse_status_));
  if (compressed_key_length_ < 0) {
    stringstream ss;
    int64_t position = stream_->file_offset();
    position -= sizeof(int32_t);
    ss << "Bad compressed key length: " << compressed_key_length_
       << " at offset: " << position;
    return Status(ss.str());
  }
  return Status::OK;
}

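// Reads (and, if the file is compressed, decompresses) the row group's key buffer,
// which holds the number of rows in the group followed by one section per column
// with that column's compressed/uncompressed byte lengths and run-length-encoded
// field lengths.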
Status HdfsRCFileScanner::ReadKeyBuffers() {
  if (key_buffer_.size() < key_length_) key_buffer_.resize(key_length_);
  uint8_t* key_buffer = &key_buffer_[0];

  if (header_->is_compressed) {
    uint8_t* compressed_buffer;
    RETURN_IF_FALSE(stream_->ReadBytes(
        compressed_key_length_, &compressed_buffer, &parse_status_));
    {
      SCOPED_TIMER(decompress_timer_);
      RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, compressed_key_length_,
          compressed_buffer, &key_length_, &key_buffer));
      VLOG_FILE << "Decompressed " << compressed_key_length_ << " to " << key_length_;
    }
  } else {
    uint8_t* buffer;
    RETURN_IF_FALSE(
        stream_->ReadBytes(key_length_, &buffer, &parse_status_));
    // Make a copy of this buffer. The underlying IO buffer will get recycled
    memcpy(key_buffer, buffer, key_length_);
  }

  row_group_length_ = 0;
  uint8_t* key_buf_ptr = key_buffer;
  int bytes_read = ReadWriteUtil::GetVInt(key_buf_ptr, &num_rows_);
  key_buf_ptr += bytes_read;

  for (int col_idx = 0; col_idx < columns_.size(); ++col_idx) {
    GetCurrentKeyBuffer(col_idx, !columns_[col_idx].materialize_column, &key_buf_ptr);
    DCHECK_LE(key_buf_ptr, key_buffer + key_length_);
  }
  DCHECK_EQ(key_buf_ptr, key_buffer + key_length_);

  return Status::OK;
}

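// Advances through one column's section of the key buffer, recording the column's
// compressed and uncompressed data lengths and, for materialized columns, the start
// of its key data and its offset into the row group buffer.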
void HdfsRCFileScanner::GetCurrentKeyBuffer(int col_idx, bool skip_col_data,
    uint8_t** key_buf_ptr) {
  ColumnInfo& col_info = columns_[col_idx];

  int bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr, &col_info.buffer_len);
  *key_buf_ptr += bytes_read;

  bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr, &col_info.uncompressed_buffer_len);
  *key_buf_ptr += bytes_read;

  int col_key_buf_len;
  bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr, &col_key_buf_len);
  *key_buf_ptr += bytes_read;

  if (!skip_col_data) {
    col_info.key_buffer = *key_buf_ptr;

    // Set the offset for the start of the data for this column in the allocated buffer.
    col_info.start_offset = row_group_length_;
    row_group_length_ += col_info.uncompressed_buffer_len;
  }
  *key_buf_ptr += col_key_buf_len;
}

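// Advances to the next field in a column. Field lengths are run-length encoded:
// a negative value in the key buffer encodes a repeat count for the previous
// length (see the column-key-buffer comment in hdfs-rcfile-scanner.h), otherwise
// it is the length of the next field.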
Status HdfsRCFileScanner::NextField(int col_idx) {
  ColumnInfo& col_info = columns_[col_idx];
  col_info.buffer_pos += col_info.current_field_len;

  if (col_info.current_field_len_rep > 0) {
    // Repeat the previous length.
    --col_info.current_field_len_rep;
  } else {
    // Get the next column length or repeat count.
    int64_t length = 0;
    uint8_t* col_key_buf = col_info.key_buffer;
    int bytes_read = ReadWriteUtil::GetVLong(
        col_key_buf, col_info.key_buffer_pos, &length);
    if (bytes_read == -1) {
      int64_t position = stream_->file_offset();
      stringstream ss;
      ss << "Invalid column length at offset: " << position;
      return Status(ss.str());
    }
    col_info.key_buffer_pos += bytes_read;

    if (length < 0) {
      // The repeat count is stored as the logical negation of the number of repetitions.
      // See the column-key-buffer comment in hdfs-rcfile-scanner.h.
      col_info.current_field_len_rep = ~length - 1;
    } else {
      col_info.current_field_len = length;
    }
  }
  return Status::OK;
}

Status HdfsRCFileScanner::NextRow() {
  // TODO: Wrap this in an iterator and prevent people from alternating
  // calls to NextField()/NextRow()
  DCHECK_LT(row_pos_, num_rows_);
  for (int col_idx = 0; col_idx < columns_.size(); ++col_idx) {
    if (columns_[col_idx].materialize_column) {
      RETURN_IF_ERROR(NextField(col_idx));
    }
  }
  ++row_pos_;
  return Status::OK;
}

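// Copies each materialized column's data for the current row group into
// row_group_buffer_, decompressing it if the file is compressed; columns that are
// not materialized are skipped in the stream.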
Status HdfsRCFileScanner::ReadColumnBuffers() {
  for (int col_idx = 0; col_idx < columns_.size(); ++col_idx) {
    ColumnInfo& column = columns_[col_idx];
    if (!columns_[col_idx].materialize_column) {
      // Not materializing this column, just skip it.
      RETURN_IF_FALSE(
          stream_->SkipBytes(column.buffer_len, &parse_status_));
      continue;
    }

    // TODO: Stream through these column buffers instead of reading everything
    // in at once.
    DCHECK_LE(column.uncompressed_buffer_len + column.start_offset, row_group_length_);
    if (header_->is_compressed) {
      uint8_t* compressed_input;
      RETURN_IF_FALSE(stream_->ReadBytes(
          column.buffer_len, &compressed_input, &parse_status_));
      uint8_t* compressed_output = row_group_buffer_ + column.start_offset;
      {
        SCOPED_TIMER(decompress_timer_);
        RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, column.buffer_len,
            compressed_input, &column.uncompressed_buffer_len,
            &compressed_output));
        VLOG_FILE << "Decompressed " << column.buffer_len << " to "
                  << column.uncompressed_buffer_len;
      }
    } else {
      uint8_t* uncompressed_data;
      RETURN_IF_FALSE(stream_->ReadBytes(
          column.buffer_len, &uncompressed_data, &parse_status_));
      // TODO: this is bad. Remove this copy.
      memcpy(row_group_buffer_ + column.start_offset,
          uncompressed_data, column.buffer_len);
    }
  }
  return Status::OK;
}

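// Main scan loop: reads row groups one at a time, materializes tuples from the
// buffered column data in row-batch-sized chunks, evaluates the conjuncts, and
// commits the surviving rows. Columns missing from the file are set to NULL and
// parse errors are reported per row.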
Status HdfsRCFileScanner::ProcessRange() {
  ResetRowGroup();

  // HdfsRCFileScanner effectively does buffered IO, in that it reads all the
  // materialized columns into a row group buffer.
  // It will then materialize tuples from the row group buffer. When the row
  // group is complete, it will move onto the next row group.
  while (!finished()) {
    DCHECK_EQ(num_rows_, row_pos_);
    // Finished materializing this row group, read the next one.
    RETURN_IF_ERROR(ReadRowGroup());
    if (num_rows_ == 0) break;

    while (num_rows_ != row_pos_) {
      SCOPED_TIMER(scan_node_->materialize_tuple_timer());

      // Indicates whether the current row has errors.
      bool error_in_row = false;
      const vector<SlotDescriptor*>& materialized_slots =
          scan_node_->materialized_slots();
      vector<SlotDescriptor*>::const_iterator it;

      // Materialize rows from this row group in row batch sizes
      MemPool* pool;
      Tuple* tuple;
      TupleRow* current_row;
      int max_tuples = GetMemory(&pool, &tuple, &current_row);
      max_tuples = min(max_tuples, num_rows_ - row_pos_);

      if (materialized_slots.empty()) {
        // If there are no materialized slots (e.g. count(*) or just partition cols)
        // we can shortcircuit the parse loop
        row_pos_ += max_tuples;
        int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples);
        COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
        RETURN_IF_ERROR(CommitRows(num_to_commit));
        continue;
      }

      int num_to_commit = 0;
      for (int i = 0; i < max_tuples; ++i) {
        RETURN_IF_ERROR(NextRow());

        // Initialize tuple from the partition key template tuple before writing the
        // slots
        InitTuple(template_tuple_, tuple);

        for (it = materialized_slots.begin(); it != materialized_slots.end(); ++it) {
          const SlotDescriptor* slot_desc = *it;
          int file_column_idx = slot_desc->col_pos() - scan_node_->num_partition_keys();

          // Set columns missing in this file to NULL
          if (file_column_idx >= columns_.size()) {
            tuple->SetNull(slot_desc->null_indicator_offset());
            continue;
          }

          ColumnInfo& column = columns_[file_column_idx];
          DCHECK(column.materialize_column);

          const char* col_start = reinterpret_cast<const char*>(
              row_group_buffer_ + column.start_offset + column.buffer_pos);
          int field_len = column.current_field_len;
          DCHECK_LE(col_start + field_len,
              reinterpret_cast<const char*>(row_group_buffer_ + row_group_length_));

          if (!text_converter_->WriteSlot(slot_desc, tuple, col_start, field_len,
              false, false, pool)) {
            ReportColumnParseError(slot_desc, col_start, field_len);
            error_in_row = true;
          }
        }

        if (error_in_row) {
          error_in_row = false;
          if (state_->LogHasSpace()) {
            stringstream ss;
            ss << "file: " << stream_->filename();
            state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
          }
          if (state_->abort_on_error()) {
            state_->ReportFileErrors(stream_->filename(), 1);
            return Status(state_->ErrorLog());
          }
        }

        current_row->SetTuple(scan_node_->tuple_idx(), tuple);
        // Evaluate the conjuncts and add the row to the batch
        if (EvalConjuncts(current_row)) {
          ++num_to_commit;
          current_row = next_row(current_row);
          tuple = next_tuple(tuple);
        }
      }
      COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
      RETURN_IF_ERROR(CommitRows(num_to_commit));
      if (scan_node_->ReachedLimit()) return Status::OK;
    }

    // RCFiles don't end with syncs
    if (stream_->eof()) return Status::OK;

    // Check for sync by looking for the marker that precedes syncs.
    int marker;
    RETURN_IF_FALSE(stream_->ReadInt(&marker, &parse_status_, /* peek */ true));
    if (marker == HdfsRCFileScanner::SYNC_MARKER) {
      RETURN_IF_FALSE(stream_->ReadInt(&marker, &parse_status_, /* peek */ false));
      RETURN_IF_ERROR(ReadSync());
    }
  }
  return Status::OK;
}

void HdfsRCFileScanner::DebugString(int indentation_level, stringstream* out) const {
  // TODO: Add more details of internal state.
  *out << string(indentation_level * 2, ' ')
       << "HdfsRCFileScanner(tupleid=" << scan_node_->tuple_idx()
       << " file=" << stream_->filename();
  // TODO: Scanner::DebugString
  // ExecNode::DebugString(indentation_level, out);
  *out << "])" << endl;
}