17 #include <boost/algorithm/string.hpp>
34 #include "gen-cpp/PlanNodes_types.h"
37 using namespace impala;
40 "org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer";
43 "org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer";
46 "hive.io.rcfile.column.number";
51 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
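RETURN_IF_FALSE wraps the boolean stream helpers listed further down (ReadInt, ReadBytes, ReadText) so that a failed read aborts the current parse routine with the scanner's parse_status_. A minimal sketch of the usual read-and-validate pattern, assuming the file's convention of building error Status objects from a stringstream; the message text is illustrative:

// Illustrative sketch: if the ReadInt fails, RETURN_IF_FALSE returns parse_status_.
int32_t record_length;
RETURN_IF_FALSE(stream_->ReadInt(&record_length, &parse_status_));
if (record_length < 0) {
  stringstream ss;
  ss << "Bad record length: " << record_length;  // illustrative message
  return Status(ss.str());
}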
92 for (int i = 0; i < columns_.size(); ++i) {
93 if (i < num_table_cols) {
99 columns_[i].materialize_column = false;
121 ss << "Invalid RCFILE_VERSION_HEADER: '"
128 uint8_t* class_name_key;
135 ss << "Invalid RCFILE_KEY_CLASS_NAME: '"
136 << string(reinterpret_cast<char*>(class_name_key), len)
141 uint8_t* class_name_val;
147 ss << "Invalid RCFILE_VALUE_CLASS_NAME: '"
148 << string(reinterpret_cast<char*>(class_name_val), len)
161 bool is_blk_compressed;
164 if (is_blk_compressed) {
166 ss << "RC files do not support block compression.";
175 header_->codec = string(reinterpret_cast<char*>(codec_ptr), len);
201 for (int i = 0; i < map_size; ++i) {
202 uint8_t* key, *value;
203 int64_t key_len, value_len;
209 string value_str(reinterpret_cast<char*>(value), value_len);
212 StringParser::StringToInt<int>(value_str.c_str(), value_str.size(), &result);
215 ss << "Could not parse number of columns in file " << stream_->filename()
216 << ": " << value_str;
237 for (int i = 0; i < columns_.size(); ++i) {
240 columns_[i].uncompressed_buffer_len = 0;
244 columns_[i].current_field_len_rep = 0;
279 int32_t record_length;
281 if (record_length < 0) {
284 position -= sizeof(int32_t);
285 ss << "Bad record length: " << record_length << " at offset: " << position;
292 position -= sizeof(int32_t);
293 ss << "Bad key length: " << key_length_ << " at offset: " << position;
300 position -= sizeof(int32_t);
302 << " at offset: " << position;
313 uint8_t* compressed_buffer;
331 uint8_t* key_buf_ptr = key_buffer;
333 key_buf_ptr += bytes_read;
335 for (int col_idx = 0; col_idx < columns_.size(); ++col_idx) {
345 uint8_t** key_buf_ptr) {
349 *key_buf_ptr += bytes_read;
352 *key_buf_ptr += bytes_read;
356 *key_buf_ptr += bytes_read;
358 if (!skip_col_data) {
365 *key_buf_ptr += col_key_buf_len;
381 if (bytes_read == -1) {
384 ss << "Invalid column length at offset: " << position;
404 for (int col_idx = 0; col_idx < columns_.size(); ++col_idx) {
405 if (columns_[col_idx].materialize_column) {
414 for (int col_idx = 0; col_idx < columns_.size(); ++col_idx) {
416 if (!columns_[col_idx].materialize_column) {
427 uint8_t* compressed_input;
435 &compressed_output));
440 uint8_t* uncompressed_data;
468 bool error_in_row = false;
469 const vector<SlotDescriptor*>& materialized_slots =
471 vector<SlotDescriptor*>::const_iterator it;
477 int max_tuples = GetMemory(&pool, &tuple, &current_row);
480 if (materialized_slots.empty()) {
490 int num_to_commit = 0;
491 for (int i = 0; i < max_tuples; ++i) {
498 for (it = materialized_slots.begin(); it != materialized_slots.end(); ++it) {
503 if (file_column_idx >= columns_.size()) {
511 const char* col_start = reinterpret_cast<const char*>(
514 DCHECK_LE(col_start + field_len,
517 if (!text_converter_->WriteSlot(slot_desc, tuple, col_start, field_len,
518 false, false, pool)) {
525 error_in_row = false;
541 current_row = next_row(current_row);
566 *out << string(indentation_level * 2, ' ')
571 *out << "])" << endl;
const std::vector< SlotDescriptor * > & materialized_slots() const
const std::string & null_column_value() const
static const CodecMap CODEC_MAP
boost::scoped_ptr< Codec > decompressor_
Decompressor class to use, if any.
void ReportColumnParseError(const SlotDescriptor *desc, const char *data, int len)
virtual FileHeader * AllocateFileHeader()
Implementation of superclass functions.
HdfsScanNode * scan_node_
The scan node that started this scanner.
void SetNull(const NullIndicatorOffset &offset)
int num_partition_keys() const
Returns number of partition keys in the table, including non-materialized slots.
int32_t current_field_len_rep
RLE: Repetition count of the current field.
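Together with current_field_len (below), this implements the run-length encoding RCFile applies to field lengths in a column's key buffer. A sketch of advancing to the next field's length; treating a negative vlong as a repetition count is an assumption about the encoding, and col stands for one entry of columns_:

// Illustrative: obtain the length of the next field in one column.
if (col.current_field_len_rep > 0) {
  --col.current_field_len_rep;             // inside a run: length is unchanged
} else {
  int64_t val;
  int bytes = GetVLong(col.key_buffer + col.key_buffer_pos, &val);
  col.key_buffer_pos += bytes;
  if (val < 0) {
    col.current_field_len_rep = ~val;      // assumed: negative values encode a repeat run
  } else {
    col.current_field_len = val;           // a new literal field length
  }
}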
ScannerContext * context_
Context for this scanner.
bool only_parsing_header_
If true, this scanner object is only for processing the header.
uint8_t * row_group_buffer_
int64_t total_bytes_returned()
Returns the total number of bytes returned.
FileHeader * header_
File header for this scan range. This is not owned by the parent scan node.
boost::scoped_ptr< MemPool > data_buffer_pool_
std::vector< uint8_t > key_buffer_
Buffer for copying key buffers. This buffer is reused between row groups.
void DebugString(int indentation_level, std::stringstream *out) const
bool ReadInt(int32_t *val, Status *, bool peek=false)
MemTracker * mem_tracker()
std::string ErrorLog()
Returns the error log lines as a string joined with ' '.
Status ReadNumColumnsMetadata()
static Status CreateDecompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *decompressor)
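The decompressor is created once per scan range from the codec class name recorded in the header (line 175 above), using the CreateDecompressor signature listed here. A sketch of that wiring; the is_compressed flag, the CODEC_MAP lookup from codec name to THdfsCompression::type, and CreateDecompressor being a static member of Codec are assumptions:

// Illustrative: map the Java codec class name from the file header to a
// THdfsCompression::type and build a reusable decompressor for this range.
if (header_->is_compressed) {                                    // assumed flag
  CodecMap::const_iterator it = CODEC_MAP.find(header_->codec);  // assumed lookup
  if (it == CODEC_MAP.end()) return Status("Unknown codec: " + header_->codec);
  RETURN_IF_ERROR(Codec::CreateDecompressor(
      data_buffer_pool_.get(), reuse_row_group_buffer_, it->second, &decompressor_));
}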
int GetMaterializedSlotIdx(const std::vector< int > &path) const
static const int SYNC_HASH_SIZE
Size of the sync hash field.
boost::scoped_ptr< TextConverter > text_converter_
Helper class for converting text to other types.
virtual Status ReadFileHeader()
Status ReadColumnBuffers()
const HdfsTableDescriptor * hdfs_table()
A tuple with 0 materialised slots is represented as NULL.
int32_t buffer_pos
Offset from the start of the column for the next field in the column.
#define RETURN_IF_ERROR(stmt)
Some generally useful macros.
int num_rows_
Number of rows in this row group object.
int32_t buffer_len
Uncompressed and compressed byte lengths for this column.
HdfsRCFileScanner(HdfsScanNode *scan_node, RuntimeState *state)
int32_t current_field_len
RLE: Length of the current field.
int WriteEmptyTuples(RowBatch *row_batch, int num_tuples)
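This is the fast path behind the materialized_slots.empty() check at line 480: when no slots are materialised there is nothing to decode per column, so the scanner only has to emit the right number of rows. A rough sketch; row_batch is a hypothetical handle to the output batch and the exact call site in the real code differs:

// Illustrative: count-only scans never touch the column buffers.
if (materialized_slots.empty()) {
  int num_to_commit = WriteEmptyTuples(row_batch, max_tuples);
  if (num_to_commit > 0) RETURN_IF_ERROR(CommitRows(num_to_commit));
}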
bool LogHasSpace()
Returns true if the error log has not reached max_errors_.
int compressed_key_length_
virtual ~HdfsRCFileScanner()
bool materialize_column
If true, this column should be materialized, otherwise, it can be skipped.
RuntimeProfile::Counter * rows_read_counter() const
bool ReadBoolean(bool *boolean, Status *)
bool ReadText(uint8_t **buf, int64_t *length, Status *)
static const uint8_t SEQFILE_VERSION_HEADER[4]
TupleRow * next_row(TupleRow *r) const
virtual Status Prepare(ScannerContext *context)
One-time initialisation of state that is constant across scan ranges.
#define COUNTER_ADD(c, v)
const NullIndicatorOffset & null_indicator_offset() const
int64_t file_offset() const
Returns the buffer's current offset in the file.
bool reuse_row_group_buffer_
const std::vector< SlotDescriptor * > & string_slots() const
#define RETURN_IF_FALSE(x)
RuntimeState * state_
RuntimeState for error reporting.
void ResetRowGroup()
Reset state for a new row group.
int row_group_buffer_size_
bool LogError(const ErrorMsg &msg)
void InitTuple(Tuple *template_tuple, Tuple *tuple)
static const char *const RCFILE_VALUE_CLASS_NAME
int GetMemory(MemPool **pool, Tuple **tuple_mem, TupleRow **tuple_row_mem)
void ReportFileErrors(const std::string &file_name, int num_errors)
Report that num_errors occurred while parsing file_name.
bool eof() const
Returns true if the stream has reached the end of the file.
int32_t uncompressed_buffer_len
uint8_t * key_buffer
This is a ptr into the scanner's key_buffer_ for this column.
virtual Status ProcessRange()
Status ReadRowGroupHeader()
bool SkipBytes(int64_t length, Status *)
Skip over the next length bytes in the specified HDFS file.
Status CommitRows(int num_rows)
virtual Status InitNewRange()
Reset internal state for a new scan range.
static const int SKIP_COLUMN
void GetCurrentKeyBuffer(int col_idx, bool skip_col_data, uint8_t **key_buf_ptr)
int32_t start_offset
Offset into row_group_buffer_ for the start of this column.
std::vector< ColumnInfo > columns_
static int GetVInt(uint8_t *buf, int32_t *vint)
static const uint8_t RCFILE_VERSION_HEADER[4]
static const int SYNC_MARKER
Sync indicator.
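SYNC_MARKER and the 16-byte sync hash (SYNC_HASH_SIZE above) let the scanner resynchronise between row groups in SequenceFile fashion: a sentinel record length announces that the file's sync hash follows. A sketch of the check, assuming header_->sync holds the hash captured in ReadFileHeader and that memcmp is available:

// Illustrative: a record length equal to SYNC_MARKER means a sync block follows.
if (record_length == SYNC_MARKER) {
  uint8_t* sync;
  RETURN_IF_FALSE(stream_->ReadBytes(SYNC_HASH_SIZE, &sync, &parse_status_));
  if (memcmp(sync, header_->sync, SYNC_HASH_SIZE) != 0) {
    return Status("Bad sync hash");  // illustrative error message
  }
}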
bool IR_ALWAYS_INLINE EvalConjuncts(TupleRow *row)
Status SetMemLimitExceeded(MemTracker *tracker=NULL, int64_t failed_allocation_size=0)
void AttachPool(MemPool *pool, bool commit_batch)
void SetTuple(int tuple_idx, Tuple *tuple)
Status NextField(int col_idx)
static int GetVLong(uint8_t *buf, int64_t *vlong)
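GetVInt/GetVLong decode Hadoop WritableUtils-style variable-length integers from the key buffer. A self-contained sketch of that decoding, following the standard Hadoop layout (values in [-112, 127] fit in one byte, otherwise a prefix byte gives the sign and the number of big-endian payload bytes); the real helpers may differ in naming and bounds checking:

#include <cstdint>

// Illustrative decoder for a Hadoop WritableUtils-style vlong.
// Returns the number of bytes consumed from buf and writes the value to *out.
static int DecodeVLong(const uint8_t* buf, int64_t* out) {
  int8_t first = static_cast<int8_t>(buf[0]);
  if (first >= -112) {                       // small values are stored inline
    *out = first;
    return 1;
  }
  bool negative = first < -120;              // the prefix range encodes the sign
  int len = negative ? (-119 - first) : (-111 - first);  // total bytes, prefix included
  int64_t value = 0;
  for (int i = 1; i < len; ++i) {
    value = (value << 8) | buf[i];           // payload bytes are big-endian
  }
  *out = negative ? ~value : value;          // negative values are stored complemented
  return len;
}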
static const char *const RCFILE_METADATA_KEY_NUM_COLS
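This key ("hive.io.rcfile.column.number", line 46) carries the column count that lines 209-216 above parse out of the file's metadata map. A sketch of that parse-and-validate step; StringParser::ParseResult and PARSE_SUCCESS are assumed names for the parse-result convention:

// Illustrative: the column count is stored as a string in the file metadata.
StringParser::ParseResult result;
int num_cols =
    StringParser::StringToInt<int>(value_str.c_str(), value_str.size(), &result);
if (result != StringParser::PARSE_SUCCESS) {
  stringstream ss;
  ss << "Could not parse number of columns in file " << stream_->filename()
     << ": " << value_str;
  return Status(ss.str());
}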
static std::string HexDump(const uint8_t *buf, int64_t length)
Dump the first length bytes of buf to a Hex string.
bool abort_on_error() const
RuntimeProfile::Counter * decompress_timer_
Time spent decompressing bytes.
bool ReadBytes(int64_t length, uint8_t **buf, Status *, bool peek=false)
int32_t key_buffer_pos
Current position in the key buffer.
void IncNumScannersCodegenDisabled()
ScannerContext::Stream * stream_
The first stream for context_.
void set_contains_tuple_data(bool v)
const TupleDescriptor * tuple_desc()
Tuple * next_tuple(Tuple *t) const
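next_tuple/next_row (used at line 541 above) step through the batch's tuple and row memory by a fixed byte stride. A minimal sketch of the arithmetic, with tuple_byte_size_ as an assumed stride member:

// Illustrative: tuples in a batch are laid out back to back, so advancing to
// the next one is plain pointer arithmetic.
Tuple* next_tuple(Tuple* t) const {
  uint8_t* mem = reinterpret_cast<uint8_t*>(t);
  return reinterpret_cast<Tuple*>(mem + tuple_byte_size_);  // assumed per-tuple byte size
}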
static const char *const RCFILE_KEY_CLASS_NAME
RuntimeProfile::Counter * materialize_tuple_timer() const