15 #include <boost/bind.hpp>
27 using namespace impala;
// Returns parse_status_ from the enclosing function when 'x' evaluates to false.
// The body is wrapped in do { } while (false) so the expansion is one statement:
// in 'if (c) RETURN_IF_FALSE(x); else ...' the 'else' now pairs with the caller's
// 'if' rather than with the 'if' inside the macro. The invocation must still be
// terminated with ';' by the caller, exactly as before.
#define RETURN_IF_FALSE(x) \
  do { \
    if (UNLIKELY(!(x))) return parse_status_; \
  } while (false)
41 const vector<HdfsFileDesc*>& files) {
45 vector<DiskIoMgr::ScanRange*> header_ranges;
46 for (
int i = 0; i < files.size(); ++i) {
49 int64_t header_size = min(static_cast<int64_t>(
HEADER_SIZE), files[i]->file_length);
55 files[i]->fs, files[i]->filename.c_str(), header_size, 0, metadata->
partition_id,
56 -1,
false,
false, files[i]->mtime);
57 header_ranges.push_back(header_range);
143 if (status.
ok())
break;
180 ss <<
"Bad synchronization marker" << endl
195 const uint8_t* sync,
int sync_len) {
196 char* sync_str =
reinterpret_cast<char*
>(
const_cast<uint8_t*
>(sync));
200 const_cast<char*>(reinterpret_cast<const char*>(buffer)), buffer_len);
227 DCHECK_LE(offset, buffer_len);
228 if (offset != -1)
break;
235 int64_t to_skip = max(static_cast<int64_t>(0), buffer_len - (sync_size - 1));
241 (sync_size - 1) * 2, &buffer, &buffer_len, &
parse_status_,
true));
243 DCHECK_LE(offset, buffer_len);
244 if (offset != -1)
break;
257 DCHECK_GE(offset, sync_size);
277 const vector<DiskIoMgr::ScanRange*>& splits = desc->
splits;
278 for (
int i = 0; i < splits.size(); ++i) {
296 DCHECK_GE(block_bytes_read, 0);
297 int bytes_left = max(average_block_size - block_bytes_read, 0);
static Status IssueInitialRanges(HdfsScanNode *scan_node, const std::vector< HdfsFileDesc * > &files)
Issue the initial ranges for all sequence container files.
void set_read_past_size_cb(ReadPastSizeCallback cb)
boost::scoped_ptr< Codec > decompressor_
Decompressor class to use, if any.
virtual Status InitNewRange()=0
Reset internal state for a new scan range.
HdfsScanNode * scan_node_
The scan node that started this scanner.
Status GetBuffer(bool peek, uint8_t **buffer, int64_t *out_len)
bool only_parsing_header_
If true, this scanner object is only for processing the header.
FileHeader * header_
File header for this scan range. This is not owned by the parent scan node.
int64_t bytes_left()
Return the number of bytes left in the range for this stream.
boost::scoped_ptr< MemPool > data_buffer_pool_
static const int SYNC_HASH_SIZE
Size of the sync hash field.
virtual Status ProcessRange()=0
Status SkipToSync(const uint8_t *sync, int sync_size)
int Search(const StringValue *str) const
const StringSearch UrlParser::hash_search & hash
#define RETURN_IF_ERROR(stmt)
Some generally useful macros.
void SetFileMetadata(const std::string &filename, void *metadata)
void * GetFileMetadata(const std::string &filename)
virtual Status Prepare(ScannerContext *context)
One-time initialisation of state that is constant across scan ranges.
#define COUNTER_ADD(c, v)
int64_t file_offset() const
Returns the buffer's current offset in the file.
void RangeComplete(const THdfsFileFormat::type &file_type, const THdfsCompression::type &compression_type)
bool GetBytes(int64_t requested_len, uint8_t **buffer, int64_t *out_len, Status *status, bool peek=false)
RuntimeState * state_
RuntimeState for error reporting.
static const int REMAINING_BLOCK_SIZE_GUESS
#define RETURN_IF_FALSE(x)
static const int MIN_SYNC_READ_SIZE
virtual Status ReadFileHeader()=0
virtual Status ProcessSplit()
int ReadPastSize(int64_t file_offset)
HdfsFileDesc * GetFileDesc(const std::string &filename)
Returns the file desc for 'filename'. Returns NULL if filename is invalid.
bool LogError(const ErrorMsg &msg)
Status AddDiskIoRanges(const std::vector< DiskIoMgr::ScanRange * > &ranges)
Adds ranges to the io mgr queue and starts up new scanner threads if possible.
bool eof() const
If true, the stream has reached the end of the file.
ObjectPool * obj_pool() const
int num_syncs_
The number of syncs seen by this scanner so far.
virtual ~BaseSequenceScanner()
bool SkipBytes(int64_t length, Status *)
Skip over the next length bytes in the specified HDFS file.
#define ADD_COUNTER(profile, name, unit)
RuntimeProfile::Counter * bytes_skipped_counter_
Number of bytes skipped when advancing to next sync on error.
static const int SYNC_MARKER
Sync indicator.
virtual THdfsFileFormat::type file_format() const =0
Returns type of scanner: e.g. rcfile, seqfile.
void AttachPool(MemPool *pool, bool commit_batch)
static const double BLOCK_SIZE_PADDING_PERCENT
std::vector< DiskIoMgr::ScanRange * > splits
Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
DiskIoMgr::ScanRange * AllocateScanRange(hdfsFS fs, const char *file, int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool try_cache, bool expected_local, int64_t mtime)
uint8_t offset[7 *64-sizeof(uint64_t)]
static std::string HexDump(const uint8_t *buf, int64_t length)
Dump the first length bytes of buf to a Hex string.
const ErrorMsg & msg() const
Returns the error message associated with a non-successful status.
int max_read_buffer_size() const
Returns the maximum read buffer size.
int FindSyncBlock(const uint8_t *buffer, int buffer_len, const uint8_t *sync, int sync_len)
bool abort_on_error() const
static const int HEADER_SIZE
ScannerContext::Stream * stream_
The first stream for context_.
void set_contains_tuple_data(bool v)
bool finished_
finished_ is set by ReadSync() and SkipToSync().
bool IsMemLimitExceeded() const
virtual Status Prepare(ScannerContext *context)
One-time initialisation of state that is constant across scan ranges.
void CloseFileRanges(const char *file)
RuntimeProfile * runtime_profile()
BaseSequenceScanner(HdfsScanNode *, RuntimeState *)
virtual FileHeader * AllocateFileHeader()=0