22 using namespace impala;
38 unique_lock<mutex> scan_range_lock(
lock_);
74 unique_lock<mutex> scan_range_lock(
lock_);
78 if (ready_buffers_.empty()) {
81 ++ready_buffers_capacity_ ;
85 while (ready_buffers_.empty() && !is_cancelled_) {
86 buffer_ready_cv_.wait(scan_range_lock);
90 DCHECK(!status_.ok());
95 DCHECK(!ready_buffers_.empty());
96 *buffer = ready_buffers_.front();
97 ready_buffers_.pop_front();
98 eosr_returned_ = (*buffer)->
eosr();
103 ++io_mgr_->num_buffers_in_readers_;
104 ++reader_->num_buffers_in_reader_;
105 --reader_->num_ready_buffers_;
106 --reader_->num_used_buffers_;
108 Status status = (*buffer)->status_;
115 unique_lock<mutex> reader_lock(reader_->lock_);
116 if (eosr_returned_) {
117 reader_->total_range_queue_capacity_ += ready_buffers_capacity_;
118 ++reader_->num_finished_ranges_;
119 reader_->initial_queue_capacity_ =
120 reader_->total_range_queue_capacity_ / reader_->num_finished_ranges_;
123 DCHECK(reader_->Validate()) << endl << reader_->DebugString();
125 reader_->blocked_ranges_.Remove(
this);
132 bool was_blocked = blocked_on_queue_;
133 blocked_on_queue_ = ready_buffers_.size() >= ready_buffers_capacity_;
134 if (was_blocked && !blocked_on_queue_ && !eosr_queued_) {
137 reader_->blocked_ranges_.Remove(
this);
138 reader_->ScheduleScanRange(
this);
145 if (io_mgr_ == NULL)
return;
147 DCHECK(!status.
ok());
150 unique_lock<mutex> scan_range_lock(
lock_);
151 unique_lock<mutex> hdfs_lock(hdfs_lock_);
153 if (is_cancelled_)
return;
154 is_cancelled_ =
true;
157 buffer_ready_cv_.notify_all();
158 CleanupQueuedBuffers();
162 if (cached_buffer_ == NULL) Close();
166 DCHECK(is_cancelled_);
167 io_mgr_->num_buffers_in_readers_ += ready_buffers_.size();
168 reader_->num_buffers_in_reader_ += ready_buffers_.size();
169 reader_->num_used_buffers_ -= ready_buffers_.size();
170 reader_->num_ready_buffers_ -= ready_buffers_.size();
172 while (!ready_buffers_.empty()) {
175 ready_buffers_.pop_front();
181 ss <<
"file=" << file_ <<
" disk_id=" << disk_id_ <<
" offset=" << offset_
182 <<
" len=" << len_ <<
" bytes_read=" << bytes_read_
183 <<
" buffer_queue=" << ready_buffers_.size()
184 <<
" capacity=" << ready_buffers_capacity_
185 <<
" hdfs_file=" << hdfs_file_;
190 if (bytes_read_ > len_) {
191 LOG(WARNING) <<
"Bytes read tracking is wrong. Shouldn't read past the scan range."
192 <<
" bytes_read_=" << bytes_read_ <<
" len_=" << len_;
195 if (eosr_returned_ && !eosr_queued_) {
196 LOG(WARNING) <<
"Returned eosr to reader before finishing reading the scan range"
197 <<
" eosr_returned_=" << eosr_returned_
198 <<
" eosr_queued_=" << eosr_queued_;
205 : ready_buffers_capacity_(capacity) {
211 DCHECK(hdfs_file_ == NULL) <<
"File was not closed.";
212 DCHECK(cached_buffer_ == NULL) <<
"Cached buffer was not released.";
216 int disk_id,
bool try_cache,
bool expected_local, int64_t mtime,
void* meta_data) {
217 DCHECK(ready_buffers_.empty());
223 try_cache_ = try_cache;
224 expected_local_ = expected_local;
225 meta_data_ = meta_data;
226 cached_buffer_ = NULL;
234 DCHECK(hdfs_file_ == NULL);
240 is_cancelled_ =
false;
242 eosr_returned_=
false;
243 blocked_on_queue_ =
false;
244 if (ready_buffers_capacity_ <= 0) {
252 unique_lock<mutex> hdfs_lock(hdfs_lock_);
260 hdfs_file_ = hdfsOpenFile(fs_, file(), O_RDONLY, 0, 0, 0);
261 VLOG_FILE <<
"hdfsOpenFile() file=" << file();
262 if (hdfs_file_ == NULL) {
266 if (hdfsSeek(fs_, hdfs_file_, offset_) != 0) {
267 hdfsCloseFile(fs_, hdfs_file_);
268 VLOG_FILE <<
"hdfsCloseFile() (error) file=" << file();
272 ss <<
"Error seeking to " << offset_ <<
" in file: " << file_ <<
" " << error_msg;
278 local_file_ = fopen(file(),
"r");
279 if (local_file_ == NULL) {
282 ss <<
"Could not open file: " << file_ <<
": " << error_msg;
285 if (fseek(local_file_, offset_, SEEK_SET) == -1) {
290 ss <<
"Could not seek to " << offset_ <<
" for file: " << file_
291 <<
": " << error_msg;
302 unique_lock<mutex> hdfs_lock(hdfs_lock_);
304 if (hdfs_file_ == NULL)
return;
306 struct hdfsReadStatistics* stats;
308 int success = hdfsFileGetReadStatistics(hdfs_file_, &stats);
310 reader_->bytes_read_local_ += stats->totalLocalBytesRead;
311 reader_->bytes_read_short_circuit_ += stats->totalShortCircuitBytesRead;
312 reader_->bytes_read_dn_cache_ += stats->totalZeroCopyBytesRead;
313 if (stats->totalLocalBytesRead != stats->totalBytesRead) {
314 ++reader_->num_remote_ranges_;
315 if (expected_local_) {
316 int remote_bytes = stats->totalBytesRead - stats->totalLocalBytesRead;
317 reader_->unexpected_remote_bytes_ += remote_bytes;
318 VLOG_FILE <<
"Unexpected remote HDFS read of "
320 <<
" for file '" << file_ <<
"'";
323 hdfsFileFreeReadStatistics(stats);
326 if (cached_buffer_ != NULL) {
327 hadoopRzBufferFree(hdfs_file_, cached_buffer_);
328 cached_buffer_ = NULL;
330 hdfsCloseFile(fs_, hdfs_file_);
331 VLOG_FILE <<
"hdfsCloseFile() file=" << file();
334 if (local_file_ == NULL)
return;
350 if (disk_id_ == io_mgr_->RemoteS3DiskId()) {
354 return numeric_limits<int64_t>::max();
361 unique_lock<mutex> hdfs_lock(hdfs_lock_);
368 DCHECK_LE(
sizeof(io_mgr_->max_buffer_size_),
sizeof(
int));
370 min(static_cast<int64_t>(io_mgr_->max_buffer_size_), len_ - bytes_read_);
373 DCHECK_NOTNULL(hdfs_file_);
374 int64_t max_chunk_size = MaxReadChunkSize();
375 while (*bytes_read < bytes_to_read) {
376 int chunk_size = min(bytes_to_read - *bytes_read, max_chunk_size);
377 int last_read = hdfsRead(fs_, hdfs_file_, buffer + *bytes_read, chunk_size);
378 if (last_read == -1) {
380 }
else if (last_read == 0) {
385 *bytes_read += last_read;
388 DCHECK(local_file_ != NULL);
389 *bytes_read = fread(buffer, 1, bytes_to_read, local_file_);
390 if (*bytes_read < 0) {
393 ss <<
"Could not read from " << file_ <<
" at byte offset: "
394 << bytes_read_ <<
": " << error_msg;
398 bytes_read_ += *bytes_read;
399 DCHECK_LE(bytes_read_, len_);
400 if (bytes_read_ == len_) *eosr =
true;
406 DCHECK_EQ(bytes_read_, 0);
407 *read_succeeded =
false;
409 if (!status.
ok())
return status;
415 unique_lock<mutex> hdfs_lock(hdfs_lock_);
418 DCHECK(hdfs_file_ != NULL);
419 DCHECK(cached_buffer_ == NULL);
420 cached_buffer_ = hadoopReadZero(hdfs_file_, io_mgr_->cached_read_options_, len());
423 if (cached_buffer_ == NULL)
return Status::OK;
427 void* buffer =
const_cast<void*
>(hadoopRzBufferGet(cached_buffer_));
428 int32_t bytes_read = hadoopRzBufferLength(cached_buffer_);
432 DCHECK_EQ(bytes_read, len());
436 reader_,
this, reinterpret_cast<char*>(buffer), 0);
437 desc->
len_ = bytes_read;
440 bytes_read_ = bytes_read;
442 if (reader_->bytes_read_counter_ != NULL) {
443 COUNTER_ADD(reader_->bytes_read_counter_, bytes_read);
445 *read_succeeded =
true;
446 ++reader_->num_used_buffers_;
bool IsS3APath(const char *path)
Returns true iff the path refers to a location on an S3A filesystem.
ScanRange(int initial_capacity=-1)
The initial queue capacity for this. Specify -1 to use IoMgr default.
AtomicInt< int > num_buffers_in_readers_
Total number of buffers in readers.
char * buffer_
Buffer holding the read contents.
boost::condition_variable buffer_ready_cv_
boost::mutex lock_
Protects all fields below.
static IntGauge * IO_MGR_NUM_OPEN_FILES
void InitInternal(DiskIoMgr *io_mgr, RequestContext *reader)
Initialize internal fields.
RequestType::type request_type_
The type of IO request, READ or WRITE.
#define COUNTER_ADD(c, v)
void Cancel(const Status *cause=NULL)
void CleanupQueuedBuffers()
Status Read(char *buffer, int64_t *bytes_read, bool *eosr)
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
bool EnqueueBuffer(BufferDescriptor *buffer)
Status Open()
Opens the file for this range. This function only modifies state in this range.
const int MAX_QUEUE_CAPACITY
bool eosr_returned_
If true, the last buffer for this scan range has been returned.
int64_t scan_range_offset_
bool eosr_
True if the current scan range is complete.
bool eosr_queued_
If true, the last buffer for this scan range has been queued.
void Reset(hdfsFS fs, const char *file, int64_t len, int64_t offset, int disk_id, bool try_cache, bool expected_local, int64_t mtime, void *metadata=NULL)
int64_t len_
length of read contents
static const Status CANCELLED
RequestContext * reader_
Reader/owner of the scan range.
static const int64_t NEVER_CACHE
If the mtime is set to NEVER_CACHE, the file handle should never be cached.
AtomicInt< int > num_used_buffers_
bool is_cancelled_
If true, this scan range has been cancelled.
int initial_scan_range_queue_capacity() const
std::string DebugString()
Dumps the disk IoMgr queues (for readers and disks).
Status GetNext(BufferDescriptor **buffer)
uint8_t offset[7 *64-sizeof(uint64_t)]
std::list< BufferDescriptor * > ready_buffers_
int ready_buffers_capacity_
AtomicInt< int > num_buffers_in_reader_
string GetHdfsErrorMsg(const string &prefix, const string &file)
bool IsDfsPath(const char *path)
Returns true iff the path refers to a location on an HDFS filesystem.
int64_t MaxReadChunkSize() const
Maximum length in bytes for hdfsRead() calls.
std::string DebugString() const
Returns a descriptive string for debugging.
const int MIN_QUEUE_CAPACITY
Status ReadFromCache(bool *read_succeeded)
AtomicInt< int > num_ready_buffers_
void Cancel(const Status &status)
void Close()
Closes the file for this range. This function only modifies state in this range.