Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
disk-io-mgr-scan-range.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "runtime/disk-io-mgr.h"
17 #include "util/error-util.h"
18 #include "util/hdfs-util.h"
19 
20 #include "common/names.h"
21 
22 using namespace impala;
23 
// Bounds on a scan range's ready-buffer queue capacity (ready_buffers_capacity_).
//  - MIN_QUEUE_CAPACITY: lower bound that the reader-supplied initial capacity is
//    DCHECKed against when a range is initialized.
//  - MAX_QUEUE_CAPACITY: deliberately large ceiling that stops GetNext()'s adaptive
//    capacity growth from going out of control. It is not expected to ever be hit
//    (the original note equates it to ~1GB of buffered data per range, which
//    presumably assumes the default max buffer size -- TODO confirm).
const int MIN_QUEUE_CAPACITY = 2;
const int MAX_QUEUE_CAPACITY = 128;
28 
29 // Implementation of the ScanRange functionality. Each ScanRange contains a queue
30 // of ready buffers. For each ScanRange, there is only a single producer and
31 // consumer thread, i.e. only one disk thread will push to a scan range at
32 // any time and only one thread will remove from the queue. This is to guarantee
33 // that buffers are queued and read in file order.
34 
35 // This must be called with the reader lock taken.
37  {
38  unique_lock<mutex> scan_range_lock(lock_);
39  DCHECK(Validate()) << DebugString();
40  DCHECK(!eosr_returned_);
41  DCHECK(!eosr_queued_);
42  if (is_cancelled_) {
43  // Return the buffer, this range has been cancelled
44  if (buffer->buffer_ != NULL) {
47  }
49  buffer->Return();
50  return false;
51  }
53  ready_buffers_.push_back(buffer);
54  eosr_queued_ = buffer->eosr();
55 
58  // We have filled the queue, indicating we need back pressure on
59  // the producer side (i.e. we are pushing buffers faster than they
60  // are pulled off, throttle this range more).
62  }
63  }
64 
65  buffer_ready_cv_.notify_one();
66 
67  return blocked_on_queue_;
68 }
69 
71  *buffer = NULL;
72 
73  {
74  unique_lock<mutex> scan_range_lock(lock_);
75  if (eosr_returned_) return Status::OK;
76  DCHECK(Validate()) << DebugString();
77 
78  if (ready_buffers_.empty()) {
79  // The queue is empty indicating this thread could use more
80  // IO. Increase the capacity to allow for more queueing.
81  ++ready_buffers_capacity_ ;
82  ready_buffers_capacity_ = ::min(ready_buffers_capacity_, MAX_QUEUE_CAPACITY);
83  }
84 
85  while (ready_buffers_.empty() && !is_cancelled_) {
86  buffer_ready_cv_.wait(scan_range_lock);
87  }
88 
89  if (is_cancelled_) {
90  DCHECK(!status_.ok());
91  return status_;
92  }
93 
94  // Remove the first ready buffer from the queue and return it
95  DCHECK(!ready_buffers_.empty());
96  *buffer = ready_buffers_.front();
97  ready_buffers_.pop_front();
98  eosr_returned_ = (*buffer)->eosr();
99  }
100 
101  // Update tracking counters. The buffer has now moved from the IoMgr to the
102  // caller.
103  ++io_mgr_->num_buffers_in_readers_;
104  ++reader_->num_buffers_in_reader_;
105  --reader_->num_ready_buffers_;
106  --reader_->num_used_buffers_;
107 
108  Status status = (*buffer)->status_;
109  if (!status.ok()) {
110  (*buffer)->Return();
111  *buffer = NULL;
112  return status;
113  }
114 
115  unique_lock<mutex> reader_lock(reader_->lock_);
116  if (eosr_returned_) {
117  reader_->total_range_queue_capacity_ += ready_buffers_capacity_;
118  ++reader_->num_finished_ranges_;
119  reader_->initial_queue_capacity_ =
120  reader_->total_range_queue_capacity_ / reader_->num_finished_ranges_;
121  }
122 
123  DCHECK(reader_->Validate()) << endl << reader_->DebugString();
124  if (reader_->state_ == RequestContext::Cancelled) {
125  reader_->blocked_ranges_.Remove(this);
126  Cancel(reader_->status_);
127  (*buffer)->Return();
128  *buffer = NULL;
129  return status_;
130  }
131 
132  bool was_blocked = blocked_on_queue_;
133  blocked_on_queue_ = ready_buffers_.size() >= ready_buffers_capacity_;
134  if (was_blocked && !blocked_on_queue_ && !eosr_queued_) {
135  // This scan range was blocked and is no longer, add it to the reader
136  // queue again.
137  reader_->blocked_ranges_.Remove(this);
138  reader_->ScheduleScanRange(this);
139  }
140  return Status::OK;
141 }
142 
144  // Cancelling a range that was never started, ignore.
145  if (io_mgr_ == NULL) return;
146 
147  DCHECK(!status.ok());
148  {
149  // Grab both locks to make sure that all working threads see is_cancelled_.
150  unique_lock<mutex> scan_range_lock(lock_);
151  unique_lock<mutex> hdfs_lock(hdfs_lock_);
152  DCHECK(Validate()) << DebugString();
153  if (is_cancelled_) return;
154  is_cancelled_ = true;
155  status_ = status;
156  }
157  buffer_ready_cv_.notify_all();
158  CleanupQueuedBuffers();
159 
160  // For cached buffers, we can't close the range until the cached buffer is returned.
161  // Close() is called from DiskIoMgr::ReturnBuffer().
162  if (cached_buffer_ == NULL) Close();
163 }
164 
166  DCHECK(is_cancelled_);
167  io_mgr_->num_buffers_in_readers_ += ready_buffers_.size();
168  reader_->num_buffers_in_reader_ += ready_buffers_.size();
169  reader_->num_used_buffers_ -= ready_buffers_.size();
170  reader_->num_ready_buffers_ -= ready_buffers_.size();
171 
172  while (!ready_buffers_.empty()) {
173  BufferDescriptor* buffer = ready_buffers_.front();
174  buffer->Return();
175  ready_buffers_.pop_front();
176  }
177 }
178 
180  stringstream ss;
181  ss << "file=" << file_ << " disk_id=" << disk_id_ << " offset=" << offset_
182  << " len=" << len_ << " bytes_read=" << bytes_read_
183  << " buffer_queue=" << ready_buffers_.size()
184  << " capacity=" << ready_buffers_capacity_
185  << " hdfs_file=" << hdfs_file_;
186  return ss.str();
187 }
188 
190  if (bytes_read_ > len_) {
191  LOG(WARNING) << "Bytes read tracking is wrong. Shouldn't read past the scan range."
192  << " bytes_read_=" << bytes_read_ << " len_=" << len_;
193  return false;
194  }
195  if (eosr_returned_ && !eosr_queued_) {
196  LOG(WARNING) << "Returned eosr to reader before finishing reading the scan range"
197  << " eosr_returned_=" << eosr_returned_
198  << " eosr_queued_=" << eosr_queued_;
199  return false;
200  }
201  return true;
202 }
203 
205  : ready_buffers_capacity_(capacity) {
207  Reset(NULL, "", -1, -1, -1, false, false, NEVER_CACHE);
208 }
209 
211  DCHECK(hdfs_file_ == NULL) << "File was not closed.";
212  DCHECK(cached_buffer_ == NULL) << "Cached buffer was not released.";
213 }
214 
215 void DiskIoMgr::ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
216  int disk_id, bool try_cache, bool expected_local, int64_t mtime, void* meta_data) {
217  DCHECK(ready_buffers_.empty());
218  fs_ = fs;
219  file_ = file;
220  len_ = len;
221  offset_ = offset;
222  disk_id_ = disk_id;
223  try_cache_ = try_cache;
224  expected_local_ = expected_local;
225  meta_data_ = meta_data;
226  cached_buffer_ = NULL;
227  io_mgr_ = NULL;
228  reader_ = NULL;
229  hdfs_file_ = NULL;
230  mtime_ = mtime;
231 }
232 
234  DCHECK(hdfs_file_ == NULL);
235  io_mgr_ = io_mgr;
236  reader_ = reader;
237  local_file_ = NULL;
238  hdfs_file_ = NULL;
239  bytes_read_ = 0;
240  is_cancelled_ = false;
241  eosr_queued_= false;
242  eosr_returned_= false;
243  blocked_on_queue_ = false;
244  if (ready_buffers_capacity_ <= 0) {
245  ready_buffers_capacity_ = reader->initial_scan_range_queue_capacity();
246  DCHECK_GE(ready_buffers_capacity_, MIN_QUEUE_CAPACITY);
247  }
248  DCHECK(Validate()) << DebugString();
249 }
250 
252  unique_lock<mutex> hdfs_lock(hdfs_lock_);
253  if (is_cancelled_) return Status::CANCELLED;
254 
255  if (fs_ != NULL) {
256  if (hdfs_file_ != NULL) return Status::OK;
257 
258  // TODO: is there much overhead opening hdfs files? Should we try to preserve
259  // the handle across multiple scan ranges of a file?
260  hdfs_file_ = hdfsOpenFile(fs_, file(), O_RDONLY, 0, 0, 0);
261  VLOG_FILE << "hdfsOpenFile() file=" << file();
262  if (hdfs_file_ == NULL) {
263  return Status(GetHdfsErrorMsg("Failed to open HDFS file ", file_));
264  }
265 
266  if (hdfsSeek(fs_, hdfs_file_, offset_) != 0) {
267  hdfsCloseFile(fs_, hdfs_file_);
268  VLOG_FILE << "hdfsCloseFile() (error) file=" << file();
269  hdfs_file_ = NULL;
270  string error_msg = GetHdfsErrorMsg("");
271  stringstream ss;
272  ss << "Error seeking to " << offset_ << " in file: " << file_ << " " << error_msg;
273  return Status(ss.str());
274  }
275  } else {
276  if (local_file_ != NULL) return Status::OK;
277 
278  local_file_ = fopen(file(), "r");
279  if (local_file_ == NULL) {
280  string error_msg = GetStrErrMsg();
281  stringstream ss;
282  ss << "Could not open file: " << file_ << ": " << error_msg;
283  return Status(ss.str());
284  }
285  if (fseek(local_file_, offset_, SEEK_SET) == -1) {
286  fclose(local_file_);
287  local_file_ = NULL;
288  string error_msg = GetStrErrMsg();
289  stringstream ss;
290  ss << "Could not seek to " << offset_ << " for file: " << file_
291  << ": " << error_msg;
292  return Status(ss.str());
293  }
294  }
297  }
298  return Status::OK;
299 }
300 
302  unique_lock<mutex> hdfs_lock(hdfs_lock_);
303  if (fs_ != NULL) {
304  if (hdfs_file_ == NULL) return;
305 
306  struct hdfsReadStatistics* stats;
307  if (IsDfsPath(file())) {
308  int success = hdfsFileGetReadStatistics(hdfs_file_, &stats);
309  if (success == 0) {
310  reader_->bytes_read_local_ += stats->totalLocalBytesRead;
311  reader_->bytes_read_short_circuit_ += stats->totalShortCircuitBytesRead;
312  reader_->bytes_read_dn_cache_ += stats->totalZeroCopyBytesRead;
313  if (stats->totalLocalBytesRead != stats->totalBytesRead) {
314  ++reader_->num_remote_ranges_;
315  if (expected_local_) {
316  int remote_bytes = stats->totalBytesRead - stats->totalLocalBytesRead;
317  reader_->unexpected_remote_bytes_ += remote_bytes;
318  VLOG_FILE << "Unexpected remote HDFS read of "
319  << PrettyPrinter::Print(remote_bytes, TUnit::BYTES)
320  << " for file '" << file_ << "'";
321  }
322  }
323  hdfsFileFreeReadStatistics(stats);
324  }
325  }
326  if (cached_buffer_ != NULL) {
327  hadoopRzBufferFree(hdfs_file_, cached_buffer_);
328  cached_buffer_ = NULL;
329  }
330  hdfsCloseFile(fs_, hdfs_file_);
331  VLOG_FILE << "hdfsCloseFile() file=" << file();
332  hdfs_file_ = NULL;
333  } else {
334  if (local_file_ == NULL) return;
335  fclose(local_file_);
336  local_file_ = NULL;
337  }
340  }
341 }
342 
344  // S3 InputStreams don't support DIRECT_READ (i.e. java.nio.ByteBuffer read()
345  // interface). So, hdfsRead() needs to allocate a Java byte[] and copy the data out.
346  // Profiles show that both the JNI array allocation and the memcpy adds much more
347  // overhead for larger buffers, so limit the size of each read request. 128K was
348  // chosen empirically by trying values between 4K and 8M and optimizing for lower CPU
349 // utilization and higher S3 throughput.
350  if (disk_id_ == io_mgr_->RemoteS3DiskId()) {
351  DCHECK(IsS3APath(file()));
352  return 128 * 1024;
353  }
354  return numeric_limits<int64_t>::max();
355 }
356 
357 // TODO: how do we best use the disk here. e.g. is it good to break up a
358 // 1MB read into 8 128K reads?
359 // TODO: look at linux disk scheduling
360 Status DiskIoMgr::ScanRange::Read(char* buffer, int64_t* bytes_read, bool* eosr) {
361  unique_lock<mutex> hdfs_lock(hdfs_lock_);
362  if (is_cancelled_) return Status::CANCELLED;
363 
364  *eosr = false;
365  *bytes_read = 0;
366  // hdfsRead() length argument is an int. Since max_buffer_size_ type is no bigger
367  // than an int, this min() will ensure that we don't overflow the length argument.
368  DCHECK_LE(sizeof(io_mgr_->max_buffer_size_), sizeof(int));
369  int bytes_to_read =
370  min(static_cast<int64_t>(io_mgr_->max_buffer_size_), len_ - bytes_read_);
371 
372  if (fs_ != NULL) {
373  DCHECK_NOTNULL(hdfs_file_);
374  int64_t max_chunk_size = MaxReadChunkSize();
375  while (*bytes_read < bytes_to_read) {
376  int chunk_size = min(bytes_to_read - *bytes_read, max_chunk_size);
377  int last_read = hdfsRead(fs_, hdfs_file_, buffer + *bytes_read, chunk_size);
378  if (last_read == -1) {
379  return Status(GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
380  } else if (last_read == 0) {
381  // No more bytes in the file. The scan range went past the end.
382  *eosr = true;
383  break;
384  }
385  *bytes_read += last_read;
386  }
387  } else {
388  DCHECK(local_file_ != NULL);
389  *bytes_read = fread(buffer, 1, bytes_to_read, local_file_);
390  if (*bytes_read < 0) {
391  string error_msg = GetStrErrMsg();
392  stringstream ss;
393  ss << "Could not read from " << file_ << " at byte offset: "
394  << bytes_read_ << ": " << error_msg;
395  return Status(ss.str());
396  }
397  }
398  bytes_read_ += *bytes_read;
399  DCHECK_LE(bytes_read_, len_);
400  if (bytes_read_ == len_) *eosr = true;
401  return Status::OK;
402 }
403 
405  DCHECK(try_cache_);
406  DCHECK_EQ(bytes_read_, 0);
407  *read_succeeded = false;
408  Status status = Open();
409  if (!status.ok()) return status;
410 
411  // Cached reads not supported on local filesystem.
412  if (fs_ == NULL) return Status::OK;
413 
414  {
415  unique_lock<mutex> hdfs_lock(hdfs_lock_);
416  if (is_cancelled_) return Status::CANCELLED;
417 
418  DCHECK(hdfs_file_ != NULL);
419  DCHECK(cached_buffer_ == NULL);
420  cached_buffer_ = hadoopReadZero(hdfs_file_, io_mgr_->cached_read_options_, len());
421 
422  // Data was not cached, caller will fall back to normal read path.
423  if (cached_buffer_ == NULL) return Status::OK;
424  }
425 
426  // Cached read succeeded.
427  void* buffer = const_cast<void*>(hadoopRzBufferGet(cached_buffer_));
428  int32_t bytes_read = hadoopRzBufferLength(cached_buffer_);
429 // For now, either the entire block is cached or none of it.
430  // TODO: if HDFS ever changes this, we'll have to handle the case where half
431  // the block is cached.
432  DCHECK_EQ(bytes_read, len());
433 
434  // Create a single buffer desc for the entire scan range and enqueue that.
435  BufferDescriptor* desc = io_mgr_->GetBufferDesc(
436  reader_, this, reinterpret_cast<char*>(buffer), 0);
437  desc->len_ = bytes_read;
438  desc->scan_range_offset_ = 0;
439  desc->eosr_ = true;
440  bytes_read_ = bytes_read;
441  EnqueueBuffer(desc);
442  if (reader_->bytes_read_counter_ != NULL) {
443  COUNTER_ADD(reader_->bytes_read_counter_, bytes_read);
444  }
445  *read_succeeded = true;
446  ++reader_->num_used_buffers_;
447  return Status::OK;
448 }
bool IsS3APath(const char *path)
Returns true iff the path refers to a location on an S3A filesystem.
Definition: hdfs-util.cc:72
ScanRange(int initial_capacity=-1)
The initial queue capacity for this scan range. Specify -1 to use the IoMgr default.
AtomicInt< int > num_buffers_in_readers_
Total number of buffers in readers.
Definition: disk-io-mgr.h:696
char * buffer_
buffer with the read contents
Definition: disk-io-mgr.h:235
boost::condition_variable buffer_ready_cv_
Definition: disk-io-mgr.h:427
boost::mutex lock_
protects all fields below
Definition: coordinator.h:233
static IntGauge * IO_MGR_NUM_OPEN_FILES
void InitInternal(DiskIoMgr *io_mgr, RequestContext *reader)
Initialize internal fields.
RequestType::type request_type_
The type of IO request, READ or WRITE.
Definition: disk-io-mgr.h:289
#define COUNTER_ADD(c, v)
void Cancel(const Status *cause=NULL)
Status Read(char *buffer, int64_t *bytes_read, bool *eosr)
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
bool EnqueueBuffer(BufferDescriptor *buffer)
Status Open()
Opens the file for this range. This function only modifies state in this range.
const int MAX_QUEUE_CAPACITY
bool eosr_returned_
If true, the last buffer for this scan range has been returned.
Definition: disk-io-mgr.h:419
bool eosr_
true if the current scan range is complete
Definition: disk-io-mgr.h:244
string GetStrErrMsg()
Definition: error-util.cc:30
bool eosr_queued_
If true, the last buffer for this scan range has been queued.
Definition: disk-io-mgr.h:416
void Reset(hdfsFS fs, const char *file, int64_t len, int64_t offset, int disk_id, bool try_cache, bool expected_local, int64_t mtime, void *metadata=NULL)
int64_t len_
length of read contents
Definition: disk-io-mgr.h:241
static const Status CANCELLED
Definition: status.h:88
RequestContext * reader_
Reader/owner of the scan range.
Definition: disk-io-mgr.h:391
static const int64_t NEVER_CACHE
If the mtime is set to NEVER_CACHE, the file handle should never be cached.
Definition: disk-io-mgr.h:299
bool is_cancelled_
If true, this scan range has been cancelled.
Definition: disk-io-mgr.h:445
bool Validate() const
std::string DebugString()
Dumps the disk IoMgr queues (for readers and disks)
Definition: disk-io-mgr.cc:142
Status GetNext(BufferDescriptor **buffer)
static const Status OK
Definition: status.h:87
uint8_t offset[7 *64-sizeof(uint64_t)]
std::list< BufferDescriptor * > ready_buffers_
Definition: disk-io-mgr.h:428
#define VLOG_FILE
Definition: logging.h:58
bool ok() const
Definition: status.h:172
string GetHdfsErrorMsg(const string &prefix, const string &file)
Definition: hdfs-util.cc:26
bool IsDfsPath(const char *path)
Returns true iff the path refers to a location on an HDFS filesystem.
Definition: hdfs-util.cc:66
int64_t MaxReadChunkSize() const
Maximum length in bytes for hdfsRead() calls.
std::string DebugString() const
return a descriptive string for debug.
const int MIN_QUEUE_CAPACITY
Status ReadFromCache(bool *read_succeeded)
void Cancel(const Status &status)
void Close()
Closes the file for this range. This function only modifies state in this range.