Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
base-sequence-scanner.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <boost/bind.hpp>
16 
18 
19 #include "exec/hdfs-scan-node.h"
21 #include "runtime/runtime-state.h"
22 #include "runtime/string-search.h"
23 #include "util/codec.h"
24 
25 #include "common/names.h"
26 
27 using namespace impala;
28 
29 const int BaseSequenceScanner::HEADER_SIZE = 1024;
31 
32 // Constants used in ReadPastSize()
33 static const double BLOCK_SIZE_PADDING_PERCENT = 0.1;
34 static const int REMAINING_BLOCK_SIZE_GUESS = 100 * 1024; // bytes
35 static const int MIN_SYNC_READ_SIZE = 10 * 1024; // bytes
36 
// Macro to convert SerdeUtil-style boolean failures into Status returns: if 'x'
// evaluates to false, the enclosing function returns parse_status_.
// The body is wrapped in do { ... } while (false) so the macro behaves as a single
// statement; a bare 'if' would silently capture an 'else' that follows the macro call.
#define RETURN_IF_FALSE(x) \
  do { \
    if (UNLIKELY(!(x))) return parse_status_; \
  } while (false)
39 
41  const vector<HdfsFileDesc*>& files) {
// NOTE(review): the first line of this function's signature is not present in this
// listing; the comments below describe only the visible body.
//
// Builds one scan range per entry in 'files', covering the start of the file, and hands
// all of them to 'scan_node' in a single AddDiskIoRanges() call. Returns the error from
// AddDiskIoRanges() if it fails, otherwise Status::OK.
42  // Issue just the header range for each file. When the header is complete,
43  // we'll issue the splits for that file. Splits cannot be processed until the
44  // header is parsed (the header object is then shared across splits for that file).
45  vector<DiskIoMgr::ScanRange*> header_ranges;
46  for (int i = 0; i < files.size(); ++i) {
// The partition id is taken from the metadata attached to the file's first split.
// NOTE(review): assumes every file has at least one split (splits[0]) -- confirm.
47  ScanRangeMetadata* metadata =
48  reinterpret_cast<ScanRangeMetadata*>(files[i]->splits[0]->meta_data());
// Never request more bytes than the file contains.
49  int64_t header_size = min(static_cast<int64_t>(HEADER_SIZE), files[i]->file_length);
50  // The header is almost always a remote read. Set the disk id to -1 and indicate
51  // it is not cached.
52  // TODO: add remote disk id and plumb that through to the io mgr. It should have
53  // 1 queue for each NIC as well?
// The range starts at file offset 0 and is header_size bytes long.
54  DiskIoMgr::ScanRange* header_range = scan_node->AllocateScanRange(
55  files[i]->fs, files[i]->filename.c_str(), header_size, 0, metadata->partition_id,
56  -1, false, false, files[i]->mtime);
57  header_ranges.push_back(header_range);
58  }
59  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(header_ranges));
60  return Status::OK;
61 }
62 
62 
64  : HdfsScanner(node, state),
65  header_(NULL),
66  block_start_(0),
67  total_block_size_(0),
68  num_syncs_(0) {
69 }
70 
72 }
73 
78  scan_node_->runtime_profile(), "BytesSkipped", TUnit::BYTES);
79  return Status::OK;
80 }
81 
83  VLOG_FILE << "Bytes read past scan range: " << -stream_->bytes_left();
84  VLOG_FILE << "Average block size: "
85  << (num_syncs_ > 1 ? total_block_size_ / (num_syncs_ - 1) : 0);
86  // Need to close the decompressor before releasing the resources at AddFinalRowBatch(),
87  // because in some cases there is memory allocated in decompressor_'s temp_memory_pool_.
88  if (decompressor_.get() != NULL) {
89  decompressor_->Close();
90  decompressor_.reset(NULL);
91  }
92  AttachPool(data_buffer_pool_.get(), false);
94  if (!only_parsing_header_) {
96  }
98 }
99 
101  header_ = reinterpret_cast<FileHeader*>(
103  if (header_ == NULL) {
104  // This is the initial scan range just to parse the header
105  only_parsing_header_ = true;
107  Status status = ReadFileHeader();
108  if (!status.ok()) {
109  if (state_->abort_on_error()) return status;
110  state_->LogError(status.msg());
111  // We need to complete the ranges for this file.
113  return Status::OK;
114  }
115 
116  // Header is parsed, set the metadata in the scan node and issue more ranges
120  return Status::OK;
121  }
122 
123  // Initialize state for new scan range
124  finished_ = false;
125  // If the file is compressed, the buffers in the stream_ are not used directly.
128 
129  Status status = Status::OK;
130 
131  // Skip to the first record
133  // If the scan range starts within the header, skip to the end of the header so we
134  // don't accidentally skip to an extra sync within the header
137  }
139 
140  // Process Range.
141  while (!finished_) {
142  status = ProcessRange();
143  if (status.ok()) break;
144  if (status.IsCancelled() || status.IsMemLimitExceeded()) return status;
145 
146  // Log error from file format parsing.
147  state_->LogError(ErrorMsg(TErrorCode::SEQUENCE_SCANNER_PARSE_ERROR,
149  (stream_->eof() ? "(EOF)" : "")));
150 
151  // Make sure errors specified in the status are logged as well
152  state_->LogError(status.msg());
153 
154  // If abort on error then return, otherwise try to recover.
155  if (state_->abort_on_error()) return status;
156 
157  // Recover by skipping to the next sync.
159  int64_t error_offset = stream_->file_offset();
162  RETURN_IF_ERROR(status);
163  DCHECK(parse_status_.ok());
164  }
165 
166  // All done with this scan range.
167  return Status::OK;
168 }
169 
171  // We are finished when we read a sync marker occurring completely in the next
172  // scan range
173  finished_ = stream_->eosr();
174 
175  uint8_t* hash;
176  int64_t out_len;
178  if (out_len != SYNC_HASH_SIZE || memcmp(hash, header_->sync, SYNC_HASH_SIZE)) {
179  stringstream ss;
180  ss << "Bad synchronization marker" << endl
181  << " Expected: '"
183  << " Actual: '"
184  << ReadWriteUtil::HexDump(hash, SYNC_HASH_SIZE) << "'";
185  return Status(ss.str());
186  }
187  finished_ |= stream_->eof();
190  ++num_syncs_;
191  return Status::OK;
192 }
193 
194 int BaseSequenceScanner::FindSyncBlock(const uint8_t* buffer, int buffer_len,
195  const uint8_t* sync, int sync_len) {
196  char* sync_str = reinterpret_cast<char*>(const_cast<uint8_t*>(sync));
197  StringValue needle = StringValue(sync_str, sync_len);
198 
199  StringValue haystack(
200  const_cast<char*>(reinterpret_cast<const char*>(buffer)), buffer_len);
201 
202  StringSearch search(&needle);
203  int offset = search.Search(&haystack);
204 
205  if (offset != -1) {
206  // Advance offset past sync
207  offset += sync_len;
208  }
209  return offset;
210 }
211 
// Scans forward in stream_ looking for the 'sync_size'-byte marker 'sync'.
//
// sync:      bytes of the marker to search for (passed through to FindSyncBlock()).
// sync_size: length of 'sync' in bytes.
//
// Outcomes visible in this listing:
//  - No marker is found before stream_->eosr(): finished_ is set and Status::OK returned.
//  - A marker is found but begins at or beyond stream_->bytes_left(): finished_ is set
//    and Status::OK returned.
//  - Otherwise the position is logged, finished_ is set if the stream is at eof,
//    num_syncs_ is incremented and Status::OK returned.
// A failing GetBuffer() call is propagated through RETURN_IF_ERROR.
//
// NOTE(review): the embedded listing numbers skip 236, 240, 265 and 269, so some
// statements of this function are not shown here; see the notes at each gap.
212 Status BaseSequenceScanner::SkipToSync(const uint8_t* sync, int sync_size) {
213  // offset into current buffer of end of sync (once found, -1 until then)
214  int offset = -1;
215  uint8_t* buffer;
216  int64_t buffer_len;
217  Status status;
218 
219  // Read buffers until we find a sync or reach the end of the scan range. If we read all
220  // the buffers remaining in the scan range and none of them contain a sync (including a
221  // sync that starts at the end of this scan range and continues into the next one), then
222  // there are no more syncs in this scan range and we're finished.
223  while (!stream_->eosr()) {
224  // Check if there's a sync fully contained in the current buffer
// The first GetBuffer() argument is the 'peek' flag.
225  RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_len));
226  offset = FindSyncBlock(buffer, buffer_len, sync, sync_size);
227  DCHECK_LE(offset, buffer_len);
228  if (offset != -1) break;
229 
230  // No sync found in the current buffer, so check if there's a sync spanning the
231  // current buffer and the next. First we skip so there are sync_size - 1 bytes left,
232  // then we read these bytes plus the first sync_size - 1 bytes of the next buffer.
233  // This guarantees that we find any syncs that start in the current buffer and end in
234  // the next buffer.
// Clamped at zero so a buffer shorter than sync_size - 1 bytes never yields a negative skip.
235  int64_t to_skip = max(static_cast<int64_t>(0), buffer_len - (sync_size - 1));
// NOTE(review): listing line 236 is missing; presumably it skips 'to_skip' bytes -- confirm.
237  // Peek so we don't advance stream_ into the next buffer. If we don't find a sync here
238  // then we'll need to check all of the next buffer, including the first sync_size -1
239  // bytes.
// NOTE(review): listing line 240 (the start of this call) is missing; the arguments below
// match the GetBytes(requested_len, buffer, out_len, status, peek) signature -- confirm.
241  (sync_size - 1) * 2, &buffer, &buffer_len, &parse_status_, true));
242  offset = FindSyncBlock(buffer, buffer_len, sync, sync_size);
243  DCHECK_LE(offset, buffer_len);
244  if (offset != -1) break;
245 
246  // No sync starting in this buffer, so advance stream_ to the beginning of the next
247  // buffer.
248  RETURN_IF_ERROR(stream_->GetBuffer(false, &buffer, &buffer_len));
249  }
250 
251  if (offset == -1) {
252  // No more syncs in this scan range
253  DCHECK(stream_->eosr());
254  finished_ = true;
255  return Status::OK;
256  }
// FindSyncBlock() returns the offset just past the sync, so it is at least sync_size.
257  DCHECK_GE(offset, sync_size);
258 
259  // Make sure sync starts in our scan range
// offset - sync_size is the position in the buffer where the sync begins.
260  if (offset - sync_size >= stream_->bytes_left()) {
261  finished_ = true;
262  return Status::OK;
263  }
264 
// NOTE(review): listing line 265 is missing; the log below subtracts sync_size from
// file_offset(), which suggests the stream was advanced past the sync here -- confirm.
266  VLOG_FILE << "Found sync for: " << stream_->filename()
267  << " at " << stream_->file_offset() - sync_size;
268  if (stream_->eof()) finished_ = true;
// NOTE(review): listing line 269 is missing -- confirm against the original file.
270  ++num_syncs_;
271  return Status::OK;
272 }
273 
274 void BaseSequenceScanner::CloseFileRanges(const char* filename) {
275  DCHECK(only_parsing_header_);
276  HdfsFileDesc* desc = scan_node_->GetFileDesc(filename);
277  const vector<DiskIoMgr::ScanRange*>& splits = desc->splits;
278  for (int i = 0; i < splits.size(); ++i) {
279  COUNTER_ADD(bytes_skipped_counter_, splits[i]->len());
280  scan_node_->RangeComplete(file_format(), THdfsCompression::NONE);
281  }
282 }
283 
284 int BaseSequenceScanner::ReadPastSize(int64_t file_offset) {
285  DCHECK_GE(total_block_size_, 0);
286  if (total_block_size_ == 0) {
287  // This scan range didn't include a complete block, so we have no idea how many bytes
288  // remain in the block. Guess.
290  }
291  DCHECK_GE(num_syncs_, 2);
292  int average_block_size = total_block_size_ / (num_syncs_ - 1);
293 
294  // Number of bytes read in the current block
295  int block_bytes_read = file_offset - block_start_;
296  DCHECK_GE(block_bytes_read, 0);
297  int bytes_left = max(average_block_size - block_bytes_read, 0);
298  // Include some padding
299  bytes_left += average_block_size * BLOCK_SIZE_PADDING_PERCENT;
300 
301  int max_read_size = state_->io_mgr()->max_read_buffer_size();
302  return min(max(bytes_left, MIN_SYNC_READ_SIZE), max_read_size);
303 }
static Status IssueInitialRanges(HdfsScanNode *scan_node, const std::vector< HdfsFileDesc * > &files)
Issue the initial ranges for all sequence container files.
void set_read_past_size_cb(ReadPastSizeCallback cb)
boost::scoped_ptr< Codec > decompressor_
Decompressor class to use, if any.
Definition: hdfs-scanner.h:198
virtual Status InitNewRange()=0
Reset internal state for a new scan range.
HdfsScanNode * scan_node_
The scan node that started this scanner.
Definition: hdfs-scanner.h:141
Status GetBuffer(bool peek, uint8_t **buffer, int64_t *out_len)
bool only_parsing_header_
If true, this scanner object is only for processing the header.
FileHeader * header_
File header for this scan range. This is not owned by the parent scan node.
int64_t bytes_left()
Return the number of bytes left in the range for this stream.
boost::scoped_ptr< MemPool > data_buffer_pool_
Definition: hdfs-scanner.h:205
static const int SYNC_HASH_SIZE
Size of the sync hash field.
virtual Status ProcessRange()=0
Status SkipToSync(const uint8_t *sync, int sync_size)
int Search(const StringValue *str) const
const StringSearch UrlParser::hash_search & hash
Definition: url-parser.cc:41
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
void SetFileMetadata(const std::string &filename, void *metadata)
void * GetFileMetadata(const std::string &filename)
virtual Status Prepare(ScannerContext *context)
One-time initialisation of state that is constant across scan ranges.
#define COUNTER_ADD(c, v)
int64_t partition_id
The partition id that this range is part of.
int64_t file_offset() const
Returns the buffer's current offset in the file.
THdfsCompression::type compression_type
Enum for compression type.
void RangeComplete(const THdfsFileFormat::type &file_type, const THdfsCompression::type &compression_type)
virtual void Close()
Definition: hdfs-scanner.cc:82
bool GetBytes(int64_t requested_len, uint8_t **buffer, int64_t *out_len, Status *status, bool peek=false)
RuntimeState * state_
RuntimeState for error reporting.
Definition: hdfs-scanner.h:144
static const int REMAINING_BLOCK_SIZE_GUESS
#define RETURN_IF_FALSE(x)
static const int MIN_SYNC_READ_SIZE
virtual Status ReadFileHeader()=0
int ReadPastSize(int64_t file_offset)
HdfsFileDesc * GetFileDesc(const std::string &filename)
Returns the file desc for 'filename'. Returns NULL if filename is invalid.
bool LogError(const ErrorMsg &msg)
Status AddDiskIoRanges(const std::vector< DiskIoMgr::ScanRange * > &ranges)
Adds ranges to the io mgr queue and starts up new scanner threads if possible.
bool IsCancelled() const
Definition: status.h:174
bool eof() const
If true, the stream has reached the end of the file.
ObjectPool * obj_pool() const
Definition: runtime-state.h:92
int num_syncs_
The number of syncs seen by this scanner so far.
bool SkipBytes(int64_t length, Status *)
Skip over the next length bytes in the specified HDFS file.
#define ADD_COUNTER(profile, name, unit)
RuntimeProfile::Counter * bytes_skipped_counter_
Number of bytes skipped when advancing to next sync on error.
static const int SYNC_MARKER
Sync indicator.
virtual THdfsFileFormat::type file_format() const =0
Returns type of scanner: e.g. rcfile, seqfile.
void AttachPool(MemPool *pool, bool commit_batch)
Definition: hdfs-scanner.h:256
static const double BLOCK_SIZE_PADDING_PERCENT
uint8_t sync[SYNC_HASH_SIZE]
The sync hash for this file.
std::vector< DiskIoMgr::ScanRange * > splits
Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
DiskIoMgr::ScanRange * AllocateScanRange(hdfsFS fs, const char *file, int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool try_cache, bool expected_local, int64_t mtime)
static const Status OK
Definition: status.h:87
uint8_t offset[7 *64-sizeof(uint64_t)]
static std::string HexDump(const uint8_t *buf, int64_t length)
Dump the first length bytes of buf to a Hex string.
const ErrorMsg & msg() const
Returns the error message associated with a non-successful status.
Definition: status.h:189
#define VLOG_FILE
Definition: logging.h:58
int max_read_buffer_size() const
Returns the maximum read buffer size.
Definition: disk-io-mgr.h:590
int FindSyncBlock(const uint8_t *buffer, int buffer_len, const uint8_t *sync, int sync_len)
bool abort_on_error() const
Definition: runtime-state.h:99
bool ok() const
Definition: status.h:172
DiskIoMgr * io_mgr()
bool is_compressed
true if the file is compressed
ScannerContext::Stream * stream_
The first stream for context_.
Definition: hdfs-scanner.h:150
bool finished_
finished_ is set by ReadSync() and SkipToSync().
bool IsMemLimitExceeded() const
Definition: status.h:178
virtual Status Prepare(ScannerContext *context)
One-time initialisation of state that is constant across scan ranges.
Definition: hdfs-scanner.cc:71
void CloseFileRanges(const char *file)
RuntimeProfile * runtime_profile()
Definition: exec-node.h:161
BaseSequenceScanner(HdfsScanNode *, RuntimeState *)
virtual FileHeader * AllocateFileHeader()=0