30 using namespace impala;
34 "org.apache.hadoop.io.Text";
38 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
42 unparsed_data_buffer_(NULL),
43 num_buffered_records_in_compressed_block_(0) {
51 const vector<ExprContext*>& conjunct_ctxs) {
55 Function* write_complete_tuple_fn =
57 if (write_complete_tuple_fn == NULL)
return NULL;
84 THdfsFileFormat::SEQUENCE_FILE,
"HdfsSequenceScanner"));
104 int64_t* record_len) {
117 ss <<
"Invalid record size: " << in_size;
120 uint8_t* compressed_data;
129 VLOG_FILE <<
"Decompressed " << in_size <<
" to " << len;
134 if (size == -1)
return Status(
"Invalid record sizse");
139 if (*record_len < 0) {
141 ss <<
"Invalid record length: " << *record_len;
182 if (sync_indicator != -1) {
185 ss <<
"Expecting sync indicator (-1) at file offset "
187 <<
"Sync indicator found " << sync_indicator <<
".";
190 return Status(
"Bad sync hash");
214 int field_location_offset = 0;
215 for (
int i = 0; i < num_to_process; ++i) {
220 return Status(
"Invalid record sizes in compressed block.");
228 for (
int i = 0; i < num_to_process; ++i) {
242 field_location_offset += num_fields;
293 DCHECK_GT(max_tuples, 0);
296 bool add_row =
false;
305 uint8_t error_in_row =
false;
310 DCHECK(num_tuples == 1);
312 uint8_t errors[num_fields];
313 memset(errors, 0,
sizeof(errors));
353 ss <<
"Invalid SEQFILE_VERSION_HEADER: '"
366 ss <<
"Invalid SEQFILE_VALUE_CLASS_NAME: '"
367 << string(reinterpret_cast<char*>(class_name), len) <<
"'";
372 bool is_blk_compressed;
382 header_->
codec = string(reinterpret_cast<char*>(codec_ptr), len);
399 for (
int i = 0; i < map_size; ++i) {
424 position -=
sizeof(int32_t);
433 position -=
sizeof(int32_t);
453 ss <<
"Bad compressed block record count: "
457 return Status(
"bad record count");
469 int64_t block_size = 0;
474 ss <<
"Compressed block size is: " << block_size;
478 uint8_t* compressed_data = NULL;
486 VLOG_FILE <<
"Decompressed " << block_size <<
" to " << len;
const std::vector< SlotDescriptor * > & materialized_slots() const
const std::string & null_column_value() const
static const CodecMap CODEC_MAP
boost::scoped_ptr< Codec > decompressor_
Decompressor class to use, if any.
bool SkipText(Status *)
Skip this text object.
HdfsScanNode * scan_node_
The scan node that started this scanner.
boost::scoped_ptr< DelimitedTextParser > delimited_text_parser_
Helper class for picking fields and rows from delimited text.
Status GetRecord(uint8_t **record_ptr, int64_t *record_len)
int num_partition_keys() const
Returns number of partition keys in the table, including non-materialized slots.
uint8_t * next_record_in_compressed_block_
Next record from block compressed data.
ScannerContext * context_
Context for this scanner.
bool only_parsing_header_
If true, this scanner object is only for processing the header.
int64_t total_bytes_returned()
Returns the total number of bytes returned.
FileHeader * header_
File header for this scan range. This is not owned by the parent scan node.
boost::scoped_ptr< MemPool > data_buffer_pool_
static llvm::Function * CodegenWriteCompleteTuple(HdfsScanNode *, LlvmCodeGen *, const std::vector< ExprContext * > &conjunct_ctxs)
bool ReadInt(int32_t *val, Status *, bool peek=false)
std::vector< FieldLocation > field_locations_
static const int SYNC_HASH_SIZE
Size of the sync hash field.
boost::scoped_ptr< TextConverter > text_converter_
Helper class for converting text to other types.
virtual Status ReadFileHeader()
WriteTuplesFn write_tuples_fn_
Jitted write tuples function pointer. Null if codegen is disabled.
virtual void LogRowParseError(int row_idx, std::stringstream *)
const HdfsTableDescriptor * hdfs_table()
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Status ReadCompressedBlock()
int64_t num_buffered_records_in_compressed_block_
Number of buffered records in unparsed_data_buffer_ from block compressed data.
int WriteEmptyTuples(RowBatch *row_batch, int num_tuples)
bool LogHasSpace()
Returns true if the error log has not reached max_errors_.
int current_key_length_
Length of the current key. This is specified as 4 bytes in the format description.
virtual ~HdfsSequenceScanner()
RuntimeProfile::Counter * rows_read_counter() const
virtual FileHeader * AllocateFileHeader()
Implementation of sequence container super class methods.
bool ReadBoolean(bool *boolean, Status *)
bool ReadVLong(int64_t *val, Status *)
bool ReadText(uint8_t **buf, int64_t *length, Status *)
static const uint8_t SEQFILE_VERSION_HEADER[4]
virtual Status Prepare(ScannerContext *context)
One-time initialisation of state that is constant across scan ranges.
#define COUNTER_ADD(c, v)
int64_t file_offset() const
Returns the buffer's current offset in the file.
const bool * is_materialized_col()
uint8_t * unparsed_data_buffer_
Buffer for data read from HDFS or from decompressing the HDFS data.
virtual Status Prepare(ScannerContext *context)
Implementation of HdfsScanner interface.
LLVM code generator. This is the top level object to generate jitted code.
RuntimeState * state_
RuntimeState for error reporting.
Status UpdateDecompressor(const THdfsCompression::type &compression)
bool LogError(const ErrorMsg &msg)
int GetMemory(MemPool **pool, Tuple **tuple_mem, TupleRow **tuple_row_mem)
static const char *const SEQFILE_VALUE_CLASS_NAME
bool eof() const
If true, the stream has reached the end of the file.
std::vector< RecordLocation > record_locations_
Status ProcessBlockCompressedScanRange()
bool SkipBytes(int64_t length, Status *)
Skip over the next length bytes in the specified HDFS file.
Status CommitRows(int num_rows)
#define RETURN_IF_FALSE(x)
static const int MAX_BLOCK_SIZE
RuntimeState * runtime_state()
bool WriteCompleteTuple(MemPool *pool, FieldLocation *fields, Tuple *tuple, TupleRow *tuple_row, Tuple *template_tuple, uint8_t *error_fields, uint8_t *error_in_row)
static const int SYNC_MARKER
Sync indicator.
int64_t rows_returned() const
static llvm::Function * CodegenWriteAlignedTuples(HdfsScanNode *, LlvmCodeGen *, llvm::Function *write_tuple_fn)
void AttachPool(MemPool *pool, bool commit_batch)
virtual Status InitNewRange()
Reset internal state for a new scan range.
virtual Status ProcessRange()
bool codegen_enabled() const
Returns true if codegen is enabled for this query.
static int GetVLong(uint8_t *buf, int64_t *vlong)
static llvm::Function * Codegen(HdfsScanNode *, const std::vector< ExprContext * > &conjunct_ctxs)
Codegen writing tuples and evaluating predicates.
int WriteAlignedTuples(MemPool *pool, TupleRow *tuple_row_mem, int row_size, FieldLocation *fields, int num_tuples, int max_added_tuples, int slots_per_tuple, int row_start_indx)
Status GetCodegen(LlvmCodeGen **codegen, bool initialize=true)
Metadata for a single partition inside an Hdfs table.
Tuple * tuple_
Current tuple pointer into tuple_mem_.
static std::string HexDump(const uint8_t *buf, int64_t length)
Dump the first length bytes of buf to a Hex string.
int current_block_length_
Length of the current sequence file block (or record).
char collection_delim() const
RuntimeProfile::Counter * decompress_timer_
Time spent decompressing bytes.
bool ReadBytes(int64_t length, uint8_t **buf, Status *, bool peek=false)
Status InitializeWriteTuplesFn(HdfsPartitionDescriptor *partition, THdfsFileFormat::type type, const std::string &scanner_name)
HdfsPartitionDescriptor * partition_descriptor()
bool ReportTupleParseError(FieldLocation *fields, uint8_t *errors, int row_idx)
ScannerContext::Stream * stream_
The first stream for context_.
HdfsSequenceScanner(HdfsScanNode *scan_node, RuntimeState *state)
Status ProcessDecompressedBlock()
RuntimeProfile::Counter * materialize_tuple_timer() const