34 using boost::algorithm::ends_with;
35 using boost::algorithm::to_lower;
36 using namespace impala;
39 DEFINE_bool(debug_disable_streaming_gzip, false, "Debug flag, will be removed. Disables "
40 "streaming gzip decompression.");
53 byte_buffer_ptr_(NULL),
54 byte_buffer_end_(NULL),
55 byte_buffer_read_size_(0),
56 only_parsing_header_(false),
57 boundary_pool_(new MemPool(scan_node->mem_tracker())),
58 boundary_row_(boundary_pool_.get()),
59 boundary_column_(boundary_pool_.get()),
61 error_in_row_(false) {
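// IssueInitialRanges(): issues scan ranges per compression type. Gzip-, snappy-, and
// bzip2-compressed text cannot be split, so a single range covering the whole file is
// issued and the remaining splits are marked complete; LZO text files are collected and
// issued through the LZO text scanner.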
68 const vector<HdfsFileDesc*>& files) {
69 vector<DiskIoMgr::ScanRange*> compressed_text_scan_ranges;
70 vector<HdfsFileDesc*> lzo_text_files;
71 bool warning_written = false;
72 for (int i = 0; i < files.size(); ++i) {
73 warning_written = false;
74 THdfsCompression::type compression = files[i]->file_compression;
75 switch (compression) {
76 case THdfsCompression::NONE:
82 case THdfsCompression::GZIP:
83 case THdfsCompression::SNAPPY:
84 case THdfsCompression::SNAPPY_BLOCKED:
85 case THdfsCompression::BZIP2:
86 for (int j = 0; j < files[i]->splits.size(); ++j) {
93 if (split->offset() != 0) {
94 if (!warning_written) {
100 ss << "For better performance, snappy, gzip and bzip-compressed files "
101 << "should not be split into multiple hdfs-blocks. file="
102 << files[i]->filename << " offset " << split->offset();
104 warning_written = true;
108 scan_node->RangeComplete(THdfsFileFormat::TEXT, compression);
113 DCHECK_GT(files[i]->file_length, 0);
117 files[i]->fs, files[i]->filename.c_str(), files[i]->file_length, 0,
120 compressed_text_scan_ranges.push_back(file_range);
125 case THdfsCompression::LZO:
133 string lower_filename = files[i]->filename;
134 to_lower(lower_filename);
137 lzo_text_files.push_back(files[i]);
146 if (compressed_text_scan_ranges.size() > 0) {
149 if (lzo_text_files.size() > 0) {
170 << "FE should have generated SNAPPY_BLOCKED instead.";
174 int dummy_num_tuples;
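// InitNewRange(): (re)creates the delimited text parser from the partition descriptor's
// line, field, collection, and escape delimiters; the collection delimiter is disabled
// ('\0') when it will not be used.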
214 collection_delim = '\0';
220 field_delim, collection_delim, hdfs_partition->escape_char()));
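// FinishScanRange(): reads past the end of the scan range to pick up the tail of the
// last tuple, which may straddle the range boundary.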
274 ss << "Read failed while trying to finish scan range: " << stream_->filename()
294 DCHECK_GE(max_tuples, 1);
298 int num_tuples = WriteFields(pool, tuple_row_mem, num_fields, 1);
299 DCHECK_LE(num_tuples, 1);
300 DCHECK_GE(num_tuples, 0);
318 if (num_tuples == 1) break;
319 DCHECK_EQ(num_tuples, 0);
337 if (past_scan_range) {
346 DCHECK_GT(max_tuples, 0);
360 int num_tuples_materialized = 0;
362 (num_fields > 0 || *num_tuples > 0)) {
364 DCHECK_LE(*num_tuples, num_fields + 1);
369 num_tuples_materialized = WriteFields(pool, tuple_row_mem, num_fields, *num_tuples);
370 DCHECK_GE(num_tuples_materialized, 0);
372 if (*num_tuples > 0) {
376 } else if (*num_tuples != 0) {
387 char* last_row = NULL;
388 if (*num_tuples == 0) {
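// FillByteBuffer(): reads the next buffer from the stream; for compressed files it
// dispatches to streaming gzip decompression (unless disabled via
// --debug_disable_streaming_gzip) or to whole-file decompression.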
419 DCHECK_EQ(num_bytes, 0);
424 } else if (!FLAGS_debug_disable_streaming_gzip &&
426 DCHECK_EQ(num_bytes, 0);
429 DCHECK_EQ(num_bytes, 0);
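// FillByteBufferGzip(): streaming gzip decompression loop. Compressed bytes are fed to
// the decompressor as they are read; if no output is produced and the stream is not at
// its end, the read is retried once with a fixed-size read before giving up.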
450 bool try_read_fixed_size = false;
451 uint8_t* decompressed_buffer = NULL;
452 int64_t decompressed_len = 0;
454 uint8_t* gzip_buffer_ptr = NULL;
455 int64_t gzip_buffer_size = 0;
458 if (!try_read_fixed_size) {
465 try_read_fixed_size = false;
467 if (gzip_buffer_size == 0) {
471 ss << "Unexpected end of file decompressing gzip. File may be malformed. ";
476 int64_t gzip_buffer_bytes_read = 0;
480 gzip_buffer_ptr, &gzip_buffer_bytes_read, &decompressed_len,
481 &decompressed_buffer, eosr));
482 DCHECK_GE(gzip_buffer_size, gzip_buffer_bytes_read);
483 DCHECK_GE(decompressed_len, 0);
491 if (!*eosr && decompressed_len == 0) {
498 if (try_read_fixed_size) {
500 ss << "Unable to make progress decoding gzip text. ";
504 VLOG_FILE << "Unable to make progress decompressing gzip, trying again";
505 try_read_fixed_size = true;
507 } while (try_read_fixed_size);
516 ss << "Unexpected end of gzip stream before end of file: ";
536 DCHECK_GT(file_size, 0);
551 ss << "Expected to read a compressed text file of size " << file_size << " bytes. "
558 int64_t decompressed_len = 0;
559 uint8_t* decompressed_buffer = NULL;
565 &decompressed_buffer));
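// FindFirstTuple(): skips ahead to the first tuple delimiter in the scan range;
// *tuple_found stays false if no delimiter is found.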
580 *tuple_found = false;
592 if (first_tuple_offset == -1) {
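// Codegen(): codegens WriteCompleteTuple and wraps it with WriteAlignedTuples; returns
// NULL if codegen of the tuple-writing function fails.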
610 const vector<ExprContext*>& conjunct_ctxs) {
614 Function* write_complete_tuple_fn =
616 if (write_complete_tuple_fn == NULL) return NULL;
651 *ss << string(row_start, row_end - row_start);
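// WriteFields(): materializes slots from the parsed field locations. Fields belonging to
// a tuple that began in the previous buffer are written first (copying string data when
// the tuple still spans buffers), then whole tuples are written in bulk.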
664 int num_fields, int num_tuples) {
669 int num_tuples_processed = 0;
670 int num_tuples_materialized = 0;
677 bool copy_strings = num_partial_fields > num_fields;
678 num_partial_fields = min(num_partial_fields, num_fields);
703 ++num_tuples_processed;
707 ++num_tuples_materialized;
713 num_fields -= num_partial_fields;
714 fields += num_partial_fields;
718 if (num_tuples > 0) {
721 int tuples_returned = 0;
732 if (tuples_returned == -1) return 0;
735 num_tuples_materialized += tuples_returned;
740 DCHECK_GE(num_fields, 0);
744 if (num_fields != 0) {
754 return num_tuples_materialized;
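// CopyBoundaryField(): copies a field that straddles a buffer boundary into memory
// allocated from 'pool'; a negative len marks a field that still needs escape
// processing, and the sign is preserved on the copied length.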
758 bool needs_escape = data->len < 0;
759 int copy_len = needs_escape ? -data->len : data->len;
761 char* str_data = reinterpret_cast<char*>(pool->Allocate(total_len));
764 data->start = str_data;
765 data->len = needs_escape ? -total_len : total_len;
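// WritePartialTuple(): writes the fields of a tuple that spans buffers, optionally
// copying their string data; returns the number of bytes consumed, including delimiters.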
769 int num_fields, bool copy_strings) {
770 int next_line_offset = 0;
771 for (int i = 0; i < num_fields; ++i) {
772 int need_escape = false;
773 int len = fields[i].len;
778 next_line_offset += (len + 1);
788 return next_line_offset;