Impala
Impala is the open source, native analytic database for Apache Hadoop.
disk-io-mgr.cc
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "runtime/disk-io-mgr.h"
16 #include "runtime/disk-io-mgr-internal.h"
17 #include "util/hdfs-util.h"
18 
19 #include <gutil/strings/substitute.h>
20 #include <boost/algorithm/string.hpp>
21 
22 DECLARE_bool(disable_mem_pools);
23 
24 #include "common/names.h"
25 
26 using namespace impala;
27 using namespace strings;
28 
29 // Control the number of disks on the machine. If 0, this comes from the system
30 // settings.
31 DEFINE_int32(num_disks, 0, "Number of disks on data node.");
32 // Default IoMgr configs:
33 // The maximum number of threads per disk is also the max queue depth per disk.
34 DEFINE_int32(num_threads_per_disk, 0, "number of threads per disk");
35 // The maximum number of S3 I/O threads. The default value of 16 was chosen empirically
36 // to maximize S3 throughput. Maximum throughput is achieved with multiple connections
37 // open to S3 and use of multiple CPU cores since S3 reads are relatively compute
38 // expensive (SSL and JNI buffer overheads).
39 DEFINE_int32(num_s3_io_threads, 16, "number of S3 I/O threads");
40 // The read size is the size of the reads sent to hdfs/os.
41 // There is a trade-off between latency and throughput, trying to keep disks busy but
42 // not introduce seeks. The literature seems to agree that with 8 MB reads, random
43 // io and sequential io perform similarly.
44 DEFINE_int32(read_size, 8 * 1024 * 1024, "Read Size (in bytes)");
45 DEFINE_int32(min_buffer_size, 1024, "The minimum read buffer size (in bytes)");
46 
47 // With 1024B through 8MB buffers, this is up to ~2GB of buffers.
48 DEFINE_int32(max_free_io_buffers, 128,
49  "For each io buffer size, the maximum number of buffers the IoMgr will hold onto");
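// Illustrative only: each of the flags above is a gflag and can be overridden on the
// impalad command line, e.g. --num_threads_per_disk=2 --read_size=16777216
// --max_free_io_buffers=256 (example values, not recommendations).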
50 
51 // Rotational disks should have 1 thread per disk to minimize seeks. Non-rotational
52 // disks don't have this penalty and benefit from multiple concurrent IO requests.
53 static const int THREADS_PER_ROTATIONAL_DISK = 1;
54 static const int THREADS_PER_FLASH_DISK = 8;
55 
56 // The IoMgr is able to run with a wide range of memory usage. If a query has memory
57 // remaining less than this value, the IoMgr will stop all buffering regardless of the
58 // current queue size.
59 static const int LOW_MEMORY = 64 * 1024 * 1024;
60 
62 
63 // This class provides a cache of RequestContext objects. RequestContexts are recycled.
64 // This is good for locality as well as lock contention. The cache has the property that
65 // regardless of how many clients get added/removed, the memory locations for
66 // existing clients do not change (not the case with std::vector) minimizing the locks we
67 // have to take across all readers.
68 // All functions on this object are thread safe.
69 class DiskIoMgr::RequestContextCache {
70  public:
71  RequestContextCache(DiskIoMgr* io_mgr) : io_mgr_(io_mgr) {}
72 
73  // Returns a context to the cache. This object can now be reused.
74  void ReturnContext(RequestContext* reader) {
75  DCHECK(reader->state_ != RequestContext::Inactive);
76  reader->state_ = RequestContext::Inactive;
77  lock_guard<mutex> l(lock_);
78  inactive_contexts_.push_back(reader);
79  }
80 
81  // Returns a new RequestContext object. Allocates a new object if necessary.
82  RequestContext* GetNewContext() {
83  lock_guard<mutex> l(lock_);
84  if (!inactive_contexts_.empty()) {
85  RequestContext* reader = inactive_contexts_.front();
86  inactive_contexts_.pop_front();
87  return reader;
88  } else {
89  RequestContext* reader = new RequestContext(io_mgr_, io_mgr_->num_total_disks());
90  all_contexts_.push_back(reader);
91  return reader;
92  }
93  }
94 
95  // This object has the same lifetime as the disk IoMgr.
96  ~RequestContextCache() {
97  for (list<RequestContext*>::iterator it = all_contexts_.begin();
98  it != all_contexts_.end(); ++it) {
99  delete *it;
100  }
101  }
102 
103  // Validates that all readers are cleaned up and in the inactive state. No locks
104  // are taken since this is only called from the disk IoMgr destructor.
105  bool ValidateAllInactive() {
106  for (list<RequestContext*>::iterator it = all_contexts_.begin();
107  it != all_contexts_.end(); ++it) {
108  if ((*it)->state_ != RequestContext::Inactive) {
109  return false;
110  }
111  }
112  DCHECK_EQ(all_contexts_.size(), inactive_contexts_.size());
113  return all_contexts_.size() == inactive_contexts_.size();
114  }
115 
116  string DebugString();
117 
118  private:
119  DiskIoMgr* io_mgr_;
120 
121  // lock to protect all members below
122  mutex lock_;
123 
124  // List of all request contexts created. Used for debugging
125  list<RequestContext*> all_contexts_;
126 
127  // List of inactive readers. These objects can be used for a new reader.
128  list<RequestContext*> inactive_contexts_;
129 };
130 
131 string DiskIoMgr::RequestContextCache::DebugString() {
132  lock_guard<mutex> l(lock_);
133  stringstream ss;
134  for (list<RequestContext*>::iterator it = all_contexts_.begin();
135  it != all_contexts_.end(); ++it) {
136  unique_lock<mutex> lock((*it)->lock_);
137  ss << (*it)->DebugString() << endl;
138  }
139  return ss.str();
140 }
141 
142 string DiskIoMgr::DebugString() {
143  stringstream ss;
144  ss << "RequestContexts: " << endl << request_context_cache_->DebugString() << endl;
145 
146  ss << "Disks: " << endl;
147  for (int i = 0; i < disk_queues_.size(); ++i) {
148  unique_lock<mutex> lock(disk_queues_[i]->lock);
149  ss << " " << (void*) disk_queues_[i] << ":" ;
150  if (!disk_queues_[i]->request_contexts.empty()) {
151  ss << " Readers: ";
152  BOOST_FOREACH(RequestContext* req_context, disk_queues_[i]->request_contexts) {
153  ss << (void*)req_context;
154  }
155  }
156  ss << endl;
157  }
158  return ss.str();
159 }
160 
161 DiskIoMgr::BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr) :
162  io_mgr_(io_mgr), reader_(NULL), buffer_(NULL) {
163 }
164 
165 void DiskIoMgr::BufferDescriptor::Reset(RequestContext* reader,
166  ScanRange* range, char* buffer, int64_t buffer_len) {
167  DCHECK(io_mgr_ != NULL);
168  DCHECK(buffer_ == NULL);
169  DCHECK(range != NULL);
170  DCHECK(buffer != NULL);
171  DCHECK_GE(buffer_len, 0);
172  reader_ = reader;
173  scan_range_ = range;
174  buffer_ = buffer;
175  buffer_len_ = buffer_len;
176  len_ = 0;
177  eosr_ = false;
178  status_ = Status::OK;
179  mem_tracker_ = NULL;
180 }
181 
182 void DiskIoMgr::BufferDescriptor::Return() {
183  DCHECK(io_mgr_ != NULL);
184  io_mgr_->ReturnBuffer(this);
185 }
186 
187 void DiskIoMgr::BufferDescriptor::SetMemTracker(MemTracker* tracker) {
188  // Cached buffers don't count towards mem usage.
189  if (scan_range_->cached_buffer_ != NULL) return;
190  if (mem_tracker_ == tracker) return;
191  if (mem_tracker_ != NULL) mem_tracker_->Release(buffer_len_);
192  mem_tracker_ = tracker;
193  if (mem_tracker_ != NULL) mem_tracker_->Consume(buffer_len_);
194 }
195 
196 DiskIoMgr::WriteRange::WriteRange(const string& file, int64_t file_offset, int disk_id,
197  WriteDoneCallback callback) {
198  file_ = file;
199  offset_ = file_offset;
200  disk_id_ = disk_id;
201  callback_ = callback;
202  request_type_ = RequestType::WRITE;
203 }
204 
205 void DiskIoMgr::WriteRange::SetData(const uint8_t* buffer, int64_t len) {
206  data_ = buffer;
207  len_ = len;
208 }
209 
210 static void CheckSseSupport() {
211  if (!CpuInfo::IsSupported(CpuInfo::SSE4_2)) {
212  LOG(WARNING) << "This machine does not support sse4_2. The default IO system "
213  "configurations are suboptimal for this hardware. Consider "
214  "increasing the number of threads per disk by restarting impalad "
215  "using the --num_threads_per_disk flag with a higher value";
216  }
217 }
218 
219 DiskIoMgr::DiskIoMgr() :
220  num_threads_per_disk_(FLAGS_num_threads_per_disk),
221  max_buffer_size_(FLAGS_read_size),
222  min_buffer_size_(FLAGS_min_buffer_size),
223  cached_read_options_(NULL),
224  shut_down_(false),
225  total_bytes_read_counter_(TUnit::BYTES),
226  read_timer_(TUnit::TIME_NS) {
227  int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
228  free_buffers_.resize(BitUtil::Log2(max_buffer_size_scaled) + 1);
229  int num_local_disks = FLAGS_num_disks == 0 ? DiskInfo::num_disks() : FLAGS_num_disks;
230  disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS);
231  CheckSseSupport();
232 }
233 
234 DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_disk, int min_buffer_size,
235  int max_buffer_size) :
236  num_threads_per_disk_(threads_per_disk),
237  max_buffer_size_(max_buffer_size),
238  min_buffer_size_(min_buffer_size),
239  cached_read_options_(NULL),
240  shut_down_(false),
241  total_bytes_read_counter_(TUnit::BYTES),
242  read_timer_(TUnit::TIME_NS) {
243  int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
244  free_buffers_.resize(BitUtil::Log2(max_buffer_size_scaled) + 1);
245  if (num_local_disks == 0) num_local_disks = DiskInfo::num_disks();
246  disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS);
247  CheckSseSupport();
248 }
249 
250 DiskIoMgr::~DiskIoMgr() {
251  shut_down_ = true;
252  // Notify all worker threads and shut them down.
253  for (int i = 0; i < disk_queues_.size(); ++i) {
254  if (disk_queues_[i] == NULL) continue;
255  {
256  // This lock is necessary to properly use the condition var to notify
257  // the disk worker threads. The readers also grab this lock so updates
258  // to shut_down_ are protected.
259  unique_lock<mutex> disk_lock(disk_queues_[i]->lock);
260  }
261  disk_queues_[i]->work_available.notify_all();
262  }
263  disk_thread_group_.JoinAll();
264 
265  for (int i = 0; i < disk_queues_.size(); ++i) {
266  if (disk_queues_[i] == NULL) continue;
267  int disk_id = disk_queues_[i]->disk_id;
268  for (list<RequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
269  it != disk_queues_[i]->request_contexts.end(); ++it) {
270  DCHECK_EQ((*it)->disk_states_[disk_id].num_threads_in_op(), 0);
271  DCHECK((*it)->disk_states_[disk_id].done());
272  (*it)->DecrementDiskRefCount();
273  }
274  }
275 
276  DCHECK(request_context_cache_.get() == NULL ||
277  request_context_cache_->ValidateAllInactive())
278  << endl << DebugString();
279  DCHECK_EQ(num_buffers_in_readers_, 0);
280 
281  // Delete all allocated buffers
282  int num_free_buffers = 0;
283  for (int idx = 0; idx < free_buffers_.size(); ++idx) {
284  num_free_buffers += free_buffers_[idx].size();
285  }
286  DCHECK_EQ(num_allocated_buffers_, num_free_buffers);
287  GcIoBuffers();
288 
289  for (int i = 0; i < disk_queues_.size(); ++i) {
290  delete disk_queues_[i];
291  }
292 
293  if (cached_read_options_ != NULL) hadoopRzOptionsFree(cached_read_options_);
294 }
295 
296 Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
297  DCHECK(process_mem_tracker != NULL);
298  process_mem_tracker_ = process_mem_tracker;
299  // If we hit the process limit, see if we can reclaim some memory by removing
300  // previously allocated (but unused) io buffers.
301  process_mem_tracker->AddGcFunction(bind(&DiskIoMgr::GcIoBuffers, this));
302 
303  for (int i = 0; i < disk_queues_.size(); ++i) {
304  disk_queues_[i] = new DiskQueue(i);
305  int num_threads_per_disk;
306  if (i == RemoteS3DiskId()) {
307  num_threads_per_disk = FLAGS_num_s3_io_threads;
308  } else if (num_threads_per_disk_ != 0) {
309  num_threads_per_disk = num_threads_per_disk_;
310  } else if (DiskInfo::is_rotational(i)) {
311  num_threads_per_disk = THREADS_PER_ROTATIONAL_DISK;
312  } else {
313  num_threads_per_disk = THREADS_PER_FLASH_DISK;
314  }
315  for (int j = 0; j < num_threads_per_disk; ++j) {
316  stringstream ss;
317  ss << "work-loop(Disk: " << i << ", Thread: " << j << ")";
318  disk_thread_group_.AddThread(new Thread("disk-io-mgr", ss.str(),
319  &DiskIoMgr::WorkLoop, this, disk_queues_[i]));
320  }
321  }
322  request_context_cache_.reset(new RequestContextCache(this));
323 
324  cached_read_options_ = hadoopRzOptionsAlloc();
325  DCHECK(cached_read_options_ != NULL);
326  // Disable checksumming for cached reads.
327  int ret = hadoopRzOptionsSetSkipChecksum(cached_read_options_, true);
328  DCHECK_EQ(ret, 0);
329  // Disable automatic fallback for cached reads.
330  ret = hadoopRzOptionsSetByteBufferPool(cached_read_options_, NULL);
331  DCHECK_EQ(ret, 0);
332 
333  return Status::OK;
334 }
335 
336 Status DiskIoMgr::RegisterContext(RequestContext** request_context,
337  MemTracker* mem_tracker) {
338  DCHECK(request_context_cache_.get() != NULL) << "Must call Init() first.";
339  *request_context = request_context_cache_->GetNewContext();
340  (*request_context)->Reset(mem_tracker);
341  return Status::OK;
342 }
343 
344 void DiskIoMgr::UnregisterContext(RequestContext* reader) {
345  // Blocking cancel (waiting for disks completion).
346  CancelContext(reader, true);
347 
348  // All the disks are done with cleanup; validate that nothing is leaking.
349  unique_lock<mutex> reader_lock(reader->lock_);
350  DCHECK_EQ(reader->num_buffers_in_reader_, 0) << endl << reader->DebugString();
351  DCHECK_EQ(reader->num_used_buffers_, 0) << endl << reader->DebugString();
352 
353  DCHECK(reader->Validate()) << endl << reader->DebugString();
354  request_context_cache_->ReturnContext(reader);
355 }
356 
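// A typical client lifecycle, as suggested by the APIs in this file (illustrative, not
// exhaustive): RegisterContext() -> AddScanRanges() / AddWriteRange() -> GetNextRange()
// and ScanRange::GetNext() -> BufferDescriptor::Return() -> UnregisterContext(), which
// internally performs a blocking CancelContext().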
357 // Cancellation requires coordination from multiple threads. Each thread that currently
358 // has a reference to the request context must notice the cancel and remove it from its
359 // tracking structures. The last thread to touch the context should deallocate (aka
360 // recycle) the request context object. Potential threads are:
361 // 1. Disk threads that are currently reading for this reader.
362 // 2. Caller threads that are waiting in GetNext.
363 //
364 // The steps are:
365 // 1. Cancel will immediately set the context in the Cancelled state. This prevents any
366 // other thread from adding more ready buffers to the context (they all take a lock and
367 // check the state before doing so), or any write ranges to the context.
368 // 2. Cancel will call cancel on each ScanRange that is not yet complete, unblocking
369 // any threads in GetNext(). The reader will see the cancelled Status returned. Cancel
370 // also invokes the callback for the WriteRanges with the cancelled state.
371 // 3. Disk threads notice the context is cancelled either when picking the next context
372 // to process or when they try to enqueue a ready buffer. Upon noticing the cancelled
373 // state, removes the context from the disk queue. The last thread per disk with an
374 // outstanding reference to the context decrements the number of disk queues the context
375 // is on.
376 // If wait_for_disks_completion is true, wait for the number of active disks to become 0.
377 void DiskIoMgr::CancelContext(RequestContext* context, bool wait_for_disks_completion) {
378  context->Cancel(Status::CANCELLED);
379 
380  if (wait_for_disks_completion) {
381  unique_lock<mutex> lock(context->lock_);
382  DCHECK(context->Validate()) << endl << context->DebugString();
383  while (context->num_disks_with_ranges_ > 0) {
384  context->disks_complete_cond_var_.wait(lock);
385  }
386  }
387 }
388 
389 void DiskIoMgr::set_read_timer(RequestContext* r, RuntimeProfile::Counter* c) {
390  r->read_timer_ = c;
391 }
392 
393 void DiskIoMgr::set_bytes_read_counter(RequestContext* r, RuntimeProfile::Counter* c) {
394  r->bytes_read_counter_ = c;
395 }
396 
397 void DiskIoMgr::set_active_read_thread_counter(RequestContext* r,
398  RuntimeProfile::Counter* c) {
399  r->active_read_thread_counter_ = c;
400 }
401 
402 void DiskIoMgr::set_disks_access_bitmap(RequestContext* r,
403  RuntimeProfile::Counter* c) {
404  r->disks_accessed_bitmap_ = c;
405 }
406 
407 int64_t DiskIoMgr::queue_size(RequestContext* reader) const {
408  return reader->num_ready_buffers_;
409 }
410 
411 Status DiskIoMgr::context_status(RequestContext* context) const {
412  unique_lock<mutex> lock(context->lock_);
413  return context->status_;
414 }
415 
416 int DiskIoMgr::num_unstarted_ranges(RequestContext* reader) const {
417  return reader->num_unstarted_scan_ranges_;
418 }
419 
420 int64_t DiskIoMgr::bytes_read_local(RequestContext* reader) const {
421  return reader->bytes_read_local_;
422 }
423 
424 int64_t DiskIoMgr::bytes_read_short_circuit(RequestContext* reader) const {
425  return reader->bytes_read_short_circuit_;
426 }
427 
428 int64_t DiskIoMgr::bytes_read_dn_cache(RequestContext* reader) const {
429  return reader->bytes_read_dn_cache_;
430 }
431 
432 int DiskIoMgr::num_remote_ranges(RequestContext* reader) const {
433  return reader->num_remote_ranges_;
434 }
435 
436 int64_t DiskIoMgr::unexpected_remote_bytes(RequestContext* reader) const {
437  return reader->unexpected_remote_bytes_;
438 }
439 
440 int64_t DiskIoMgr::GetReadThroughput() {
441  return RuntimeProfile::UnitsPerSecond(&total_bytes_read_counter_, &read_timer_);
442 }
443 
444 Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
445  int disk_id = range->disk_id_;
446  if (disk_id < 0 || disk_id >= disk_queues_.size()) {
447  stringstream ss;
448  ss << "Invalid scan range. Bad disk id: " << disk_id;
449  DCHECK(false) << ss.str();
450  return Status(ss.str());
451  }
452  return Status::OK;
453 }
454 
455 Status DiskIoMgr::AddScanRanges(RequestContext* reader,
456  const vector<ScanRange*>& ranges, bool schedule_immediately) {
457  if (ranges.empty()) return Status::OK;
458 
459  // Validate and initialize all ranges
460  for (int i = 0; i < ranges.size(); ++i) {
461  RETURN_IF_ERROR(ValidateScanRange(ranges[i]));
462  ranges[i]->InitInternal(this, reader);
463  }
464 
465  // disks that this reader needs to be scheduled on.
466  unique_lock<mutex> reader_lock(reader->lock_);
467  DCHECK(reader->Validate()) << endl << reader->DebugString();
468 
469  if (reader->state_ == RequestContext::Cancelled) {
470  DCHECK(!reader->status_.ok());
471  return reader->status_;
472  }
473 
474  // Add each range to the queue of the disk the range is on
475  for (int i = 0; i < ranges.size(); ++i) {
476  // Don't add empty ranges.
477  DCHECK_NE(ranges[i]->len(), 0);
478  ScanRange* range = ranges[i];
479 
480  if (range->try_cache_) {
481  if (schedule_immediately) {
482  bool cached_read_succeeded;
483  RETURN_IF_ERROR(range->ReadFromCache(&cached_read_succeeded));
484  if (cached_read_succeeded) continue;
485  // Cached read failed, fall back to AddRequestRange() below.
486  } else {
487  reader->cached_ranges_.Enqueue(range);
488  continue;
489  }
490  }
491  reader->AddRequestRange(range, schedule_immediately);
492  }
493  DCHECK(reader->Validate()) << endl << reader->DebugString();
494 
495  return Status::OK;
496 }
497 
498 // This function returns the next scan range the reader should work on, checking
499 // for eos and error cases. If there isn't already a cached scan range or a scan
500 // range prepared by the disk threads, the caller waits on the disk threads.
501 Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
502  DCHECK_NOTNULL(reader);
503  DCHECK_NOTNULL(range);
504  *range = NULL;
505  Status status = Status::OK;
506 
507  unique_lock<mutex> reader_lock(reader->lock_);
508  DCHECK(reader->Validate()) << endl << reader->DebugString();
509 
510  while (true) {
511  if (reader->state_ == RequestContext::Cancelled) {
512  DCHECK(!reader->status_.ok());
513  status = reader->status_;
514  break;
515  }
516 
517  if (reader->num_unstarted_scan_ranges_ == 0 &&
518  reader->ready_to_start_ranges_.empty() && reader->cached_ranges_.empty()) {
519  // All ranges are done, just return.
520  break;
521  }
522 
523  if (!reader->cached_ranges_.empty()) {
524  // We have a cached range.
525  *range = reader->cached_ranges_.Dequeue();
526  DCHECK((*range)->try_cache_);
527  bool cached_read_succeeded;
528  RETURN_IF_ERROR((*range)->ReadFromCache(&cached_read_succeeded));
529  if (cached_read_succeeded) return Status::OK;
530 
531  // This range ended up not being cached. Loop again and pick up a new range.
532  reader->AddRequestRange(*range, false);
533  DCHECK(reader->Validate()) << endl << reader->DebugString();
534  *range = NULL;
535  continue;
536  }
537 
538  if (reader->ready_to_start_ranges_.empty()) {
539  reader->ready_to_start_ranges_cv_.wait(reader_lock);
540  } else {
541  *range = reader->ready_to_start_ranges_.Dequeue();
542  DCHECK_NOTNULL(*range);
543  int disk_id = (*range)->disk_id();
544  DCHECK_EQ(*range, reader->disk_states_[disk_id].next_scan_range_to_start());
545  // Set this to NULL; the next time this disk runs for this reader, it will
546  // get another range ready.
547  reader->disk_states_[disk_id].set_next_scan_range_to_start(NULL);
548  reader->ScheduleScanRange(*range);
549  break;
550  }
551  }
552  return status;
553 }
554 
555 Status DiskIoMgr::Read(RequestContext* reader,
556  ScanRange* range, BufferDescriptor** buffer) {
557  DCHECK_NOTNULL(range);
558  DCHECK_NOTNULL(buffer);
559  *buffer = NULL;
560 
561  if (range->len() > max_buffer_size_) {
562  return Status(Substitute("Cannot perform sync read larger than $0. Request was $1",
563  max_buffer_size_, range->len()));
564  }
565 
566  vector<DiskIoMgr::ScanRange*> ranges;
567  ranges.push_back(range);
568  RETURN_IF_ERROR(AddScanRanges(reader, ranges, true));
569  RETURN_IF_ERROR(range->GetNext(buffer));
570  DCHECK((*buffer) != NULL);
571  DCHECK((*buffer)->eosr());
572  return Status::OK;
573 }
574 
575 void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) {
576  DCHECK_NOTNULL(buffer_desc);
577  if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == NULL);
578 
579  RequestContext* reader = buffer_desc->reader_;
580  if (buffer_desc->buffer_ != NULL) {
581  if (buffer_desc->scan_range_->cached_buffer_ == NULL) {
582  // Not a cached buffer. Return the io buffer and update mem tracking.
583  ReturnFreeBuffer(buffer_desc);
584  }
585  buffer_desc->buffer_ = NULL;
586  --num_buffers_in_readers_;
587  --reader->num_buffers_in_reader_;
588  } else {
589  // A NULL buffer means there was an error in which case there is no buffer
590  // to return.
591  }
592 
593  if (buffer_desc->eosr_ || buffer_desc->scan_range_->is_cancelled_) {
594  // Need to close the scan range if returning the last buffer or the scan range
595  // has been cancelled (and the caller might never get the last buffer).
596  // Close() is idempotent so multiple cancelled buffers is okay.
597  buffer_desc->scan_range_->Close();
598  }
599  ReturnBufferDesc(buffer_desc);
600 }
601 
602 void DiskIoMgr::ReturnBufferDesc(BufferDescriptor* desc) {
603  DCHECK(desc != NULL);
604  unique_lock<mutex> lock(free_buffers_lock_);
605  DCHECK(find(free_buffer_descs_.begin(), free_buffer_descs_.end(), desc)
606  == free_buffer_descs_.end());
607  free_buffer_descs_.push_back(desc);
608 }
609 
610 DiskIoMgr::BufferDescriptor* DiskIoMgr::GetBufferDesc(
611  RequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size) {
612  BufferDescriptor* buffer_desc;
613  {
614  unique_lock<mutex> lock(free_buffers_lock_);
615  if (free_buffer_descs_.empty()) {
616  buffer_desc = pool_.Add(new BufferDescriptor(this));
617  } else {
618  buffer_desc = free_buffer_descs_.front();
619  free_buffer_descs_.pop_front();
620  }
621  }
622  buffer_desc->Reset(reader, range, buffer, buffer_size);
623  buffer_desc->SetMemTracker(reader->mem_tracker_);
624  return buffer_desc;
625 }
626 
627 char* DiskIoMgr::GetFreeBuffer(int64_t* buffer_size) {
628  DCHECK_LE(*buffer_size, max_buffer_size_);
629  DCHECK_GT(*buffer_size, 0);
630  *buffer_size = min(static_cast<int64_t>(max_buffer_size_), *buffer_size);
631  int idx = free_buffers_idx(*buffer_size);
632  // Quantize buffer size to nearest power of 2 greater than the specified buffer size and
633  // convert to bytes
634  *buffer_size = (1 << idx) * min_buffer_size_;
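 // Worked example (assuming the default min_buffer_size_ of 1024 bytes): a request for
 // 6000 bytes gives Ceil(6000, 1024) = 6, idx = Log2(6) = 3, so the buffer is quantized
 // up to (1 << 3) * 1024 = 8192 bytes.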
635 
636  unique_lock<mutex> lock(free_buffers_lock_);
637  char* buffer = NULL;
638  if (free_buffers_[idx].empty()) {
639  ++num_allocated_buffers_;
640  if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
641  ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(1L);
642  }
643  if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
644  ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(*buffer_size);
645  }
646  // Update the process mem usage. This is checked the next time we start
647  // a read for the next reader (DiskIoMgr::GetNextRequestRange()).
648  process_mem_tracker_->Consume(*buffer_size);
649  buffer = new char[*buffer_size];
650  } else {
651  if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
652  ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-1L);
653  }
654  buffer = free_buffers_[idx].front();
655  free_buffers_[idx].pop_front();
656  }
657  DCHECK(buffer != NULL);
658  return buffer;
659 }
660 
661 void DiskIoMgr::GcIoBuffers() {
662  unique_lock<mutex> lock(free_buffers_lock_);
663  int buffers_freed = 0;
664  int bytes_freed = 0;
665  for (int idx = 0; idx < free_buffers_.size(); ++idx) {
666  for (list<char*>::iterator iter = free_buffers_[idx].begin();
667  iter != free_buffers_[idx].end(); ++iter) {
668  int64_t buffer_size = (1 << idx) * min_buffer_size_;
669  process_mem_tracker_->Release(buffer_size);
670  --num_allocated_buffers_;
671  delete[] *iter;
672 
673  ++buffers_freed;
674  bytes_freed += buffer_size;
675  }
676  free_buffers_[idx].clear();
677  }
678 
679  if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
680  ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-buffers_freed);
681  }
682  if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
683  ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-bytes_freed);
684  }
685  if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
686  ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-buffers_freed);
687  }
688 }
689 
690 void DiskIoMgr::ReturnFreeBuffer(BufferDescriptor* desc) {
691  ReturnFreeBuffer(desc->buffer_, desc->buffer_len_);
692  desc->SetMemTracker(NULL);
693  desc->buffer_ = NULL;
694 }
695 
696 void DiskIoMgr::ReturnFreeBuffer(char* buffer, int64_t buffer_size) {
697  DCHECK(buffer != NULL);
698  int idx = free_buffers_idx(buffer_size);
699  DCHECK_EQ(BitUtil::Ceil(buffer_size, min_buffer_size_) & ~(1 << idx), 0)
700  << "buffer_size_ / min_buffer_size_ should be power of 2, got buffer_size = "
701  << buffer_size << ", min_buffer_size_ = " << min_buffer_size_;
702  unique_lock<mutex> lock(free_buffers_lock_);
703  if (!FLAGS_disable_mem_pools && free_buffers_[idx].size() < FLAGS_max_free_io_buffers) {
704  free_buffers_[idx].push_back(buffer);
705  if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
706  ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(1L);
707  }
708  } else {
709  process_mem_tracker_->Release(buffer_size);
710  --num_allocated_buffers_;
711  delete[] buffer;
712  if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
713  ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-1L);
714  }
715  if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
716  ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-buffer_size);
717  }
718  }
719 }
720 
721 // This function gets the next RequestRange to work on for this disk. It checks for
722 // cancellation and
723 // a) Updates ready_to_start_ranges if there are no scan ranges queued for this disk.
724 // b) Adds an unstarted write range to in_flight_ranges_. The write range is processed
725 // immediately if there are no preceding scan ranges in in_flight_ranges_
726 // It blocks until work is available or the thread is shut down.
727 // Work is available if there is a RequestContext with
728 // - A ScanRange with a buffer available, or
729 // - A WriteRange in unstarted_write_ranges_.
730 bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
731  RequestContext** request_context) {
732  int disk_id = disk_queue->disk_id;
733  *range = NULL;
734 
735  // This loop returns either with work to do or when the disk IoMgr shuts down.
736  while (true) {
737  *request_context = NULL;
738  RequestContext::PerDiskState* request_disk_state = NULL;
739  {
740  unique_lock<mutex> disk_lock(disk_queue->lock);
741 
742  while (!shut_down_ && disk_queue->request_contexts.empty()) {
743  // wait if there are no readers on the queue
744  disk_queue->work_available.wait(disk_lock);
745  }
746  if (shut_down_) break;
747  DCHECK(!disk_queue->request_contexts.empty());
748 
749  // Get the next reader and remove the reader so that another disk thread
750  // can't pick it up. It will be enqueued before issuing the read to HDFS
751  // so this is not a big deal (i.e. multiple disk threads can read for the
752  // same reader).
753  // TODO: revisit.
754  *request_context = disk_queue->request_contexts.front();
755  disk_queue->request_contexts.pop_front();
756  DCHECK(*request_context != NULL);
757  request_disk_state = &((*request_context)->disk_states_[disk_id]);
758  request_disk_state->IncrementRequestThreadAndDequeue();
759  }
760 
761  // NOTE: no locks were taken in between. We need to be careful about what state of
762  // the reader and disk could have changed in between.
763  // There are some invariants here. Only one disk thread can have the
764  // same reader here (the reader is removed from the queue). There can be
765  // other disk threads operating on this reader in other functions though.
766 
767  // We just picked a reader, check the mem limits.
768  // TODO: we can do a lot better here. The reader can likely make progress
769  // with fewer io buffers.
770  bool process_limit_exceeded = process_mem_tracker_->LimitExceeded();
771  bool reader_limit_exceeded = (*request_context)->mem_tracker_ != NULL
772  ? (*request_context)->mem_tracker_->AnyLimitExceeded() : false;
773 
774  if (process_limit_exceeded || reader_limit_exceeded) {
775  (*request_context)->Cancel(Status::MEM_LIMIT_EXCEEDED);
776  }
777 
778  unique_lock<mutex> request_lock((*request_context)->lock_);
779  VLOG_FILE << "Disk (id=" << disk_id << ") reading for "
780  << (*request_context)->DebugString();
781 
782  // Check if reader has been cancelled
783  if ((*request_context)->state_ == RequestContext::Cancelled) {
784  request_disk_state->DecrementRequestThreadAndCheckDone(*request_context);
785  continue;
786  }
787 
788  DCHECK_EQ((*request_context)->state_, RequestContext::Active)
789  << (*request_context)->DebugString();
790 
791  if (request_disk_state->next_scan_range_to_start() == NULL &&
792  !request_disk_state->unstarted_scan_ranges()->empty()) {
793  // We don't have a range queued for this disk for what the caller should
794  // read next. Populate that. We want to have one range waiting to minimize
795  // wait time in GetNextRange.
796  ScanRange* new_range = request_disk_state->unstarted_scan_ranges()->Dequeue();
797  --(*request_context)->num_unstarted_scan_ranges_;
798  (*request_context)->ready_to_start_ranges_.Enqueue(new_range);
799  request_disk_state->set_next_scan_range_to_start(new_range);
800 
801  if ((*request_context)->num_unstarted_scan_ranges_ == 0) {
802  // All the ranges have been started, notify everyone blocked on GetNextRange.
803  // Only one of them will get work so make sure to return NULL to the other
804  // caller threads.
805  (*request_context)->ready_to_start_ranges_cv_.notify_all();
806  } else {
807  (*request_context)->ready_to_start_ranges_cv_.notify_one();
808  }
809  }
810 
811  // Always enqueue a WriteRange to be processed into in_flight_ranges_.
812  // This is done so in_flight_ranges_ does not exclusively contain ScanRanges.
813  // For now, enqueuing a WriteRange on each invocation of GetNextRequestRange()
814  // does not flood in_flight_ranges() with WriteRanges because the entire
815  // WriteRange is processed and removed from the queue after GetNextRequestRange()
816  // returns. (A DCHECK is used to ensure that writes do not exceed 8MB).
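 // (The DCHECK referred to above is in AddWriteRange(), which checks
 // write_range->len() <= max_buffer_size_.)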
817  if (!request_disk_state->unstarted_write_ranges()->empty()) {
818  WriteRange* write_range = request_disk_state->unstarted_write_ranges()->Dequeue();
819  request_disk_state->in_flight_ranges()->Enqueue(write_range);
820  }
821 
822  // Get the next scan range to work on from the reader. Only in_flight_ranges
823  // are eligible since the disk threads do not start new ranges on their own.
824 
825  // There are no inflight ranges, nothing to do.
826  if (request_disk_state->in_flight_ranges()->empty()) {
827  request_disk_state->DecrementRequestThread();
828  continue;
829  }
830  DCHECK_GT(request_disk_state->num_remaining_ranges(), 0);
831  *range = request_disk_state->in_flight_ranges()->Dequeue();
832  DCHECK(*range != NULL);
833 
834  // Now that we've picked a request range, put the context back on the queue so
835  // another thread can pick up another request range for this context.
836  request_disk_state->ScheduleContext(*request_context, disk_id);
837  DCHECK((*request_context)->Validate()) << endl << (*request_context)->DebugString();
838  return true;
839  }
840 
841  DCHECK(shut_down_);
842  return false;
843 }
844 
845 void DiskIoMgr::HandleWriteFinished(RequestContext* writer, WriteRange* write_range,
846  const Status& write_status) {
847  // Execute the callback before decrementing the thread count. Otherwise CancelContext()
848  // that waits for the disk ref count to be 0 will return, creating a race, e.g.
849  // between BufferedBlockMgr::WriteComplete() and BufferedBlockMgr::~BufferedBlockMgr().
850  // See IMPALA-1890.
851  // The status of the write does not affect the status of the writer context.
852  write_range->callback_(write_status);
853  {
854  unique_lock<mutex> writer_lock(writer->lock_);
855  DCHECK(writer->Validate()) << endl << writer->DebugString();
856  RequestContext::PerDiskState& state = writer->disk_states_[write_range->disk_id_];
857  if (writer->state_ == RequestContext::Cancelled) {
858  state.DecrementRequestThreadAndCheckDone(writer);
859  } else {
860  state.DecrementRequestThread();
861  }
862  --state.num_remaining_ranges();
863  }
864 }
865 
866 void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
867  BufferDescriptor* buffer) {
868  unique_lock<mutex> reader_lock(reader->lock_);
869 
870  RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
871  DCHECK(reader->Validate()) << endl << reader->DebugString();
872  DCHECK_GT(state.num_threads_in_op(), 0);
873  DCHECK(buffer->buffer_ != NULL);
874 
875  if (reader->state_ == RequestContext::Cancelled) {
876  state.DecrementRequestThreadAndCheckDone(reader);
877  DCHECK(reader->Validate()) << endl << reader->DebugString();
878  ReturnFreeBuffer(buffer);
879  buffer->buffer_ = NULL;
880  buffer->scan_range_->Cancel(reader->status_);
881  // Enqueue the buffer to use the scan range's buffer cleanup path.
882  buffer->scan_range_->EnqueueBuffer(buffer);
883  return;
884  }
885 
886  DCHECK_EQ(reader->state_, RequestContext::Active);
887  DCHECK(buffer->buffer_ != NULL);
888 
889  // Update the reader's scan ranges. There are three cases here:
890  // 1. Read error
891  // 2. End of scan range
892  // 3. Middle of scan range
893  if (!buffer->status_.ok()) {
894  // Error case
895  ReturnFreeBuffer(buffer);
896  buffer->eosr_ = true;
897  --state.num_remaining_ranges();
898  buffer->scan_range_->Cancel(buffer->status_);
899  } else if (buffer->eosr_) {
900  --state.num_remaining_ranges();
901  }
902 
903  bool queue_full = buffer->scan_range_->EnqueueBuffer(buffer);
904  if (buffer->eosr_) {
905  // For cached buffers, we can't close the range until the cached buffer is returned.
906  // Close() is called from DiskIoMgr::ReturnBuffer().
907  if (buffer->scan_range_->cached_buffer_ == NULL) {
908  buffer->scan_range_->Close();
909  }
910  } else {
911  if (queue_full) {
912  reader->blocked_ranges_.Enqueue(buffer->scan_range_);
913  } else {
914  reader->ScheduleScanRange(buffer->scan_range_);
915  }
916  }
917  state.DecrementRequestThread();
918 }
919 
920 void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
921  // The thread waits until there is work or the entire system is being shut down.
922  // If there is work, performs the read or write requested and re-enqueues the
923  // requesting context.
924  // Locks are not taken when reading from or writing to disk.
925  // The main loop has three parts:
926  // 1. GetNextRequestRange(): get the next request context (read or write) to
927  // process and dequeue it.
928  // 2. For the dequeued request, gets the next scan- or write-range to process and
929  // re-enqueues the request.
930  // 3. Perform the read or write as specified.
931  // Cancellation checking needs to happen in both steps 1 and 3.
932  while (true) {
933  RequestContext* worker_context = NULL;
934  RequestRange* range = NULL;
935 
936  if (!GetNextRequestRange(disk_queue, &range, &worker_context)) {
937  DCHECK(shut_down_);
938  break;
939  }
940 
941  if (range->request_type() == RequestType::READ) {
942  ReadRange(disk_queue, worker_context, static_cast<ScanRange*>(range));
943  } else {
944  DCHECK(range->request_type() == RequestType::WRITE);
945  Write(worker_context, static_cast<WriteRange*>(range));
946  }
947  }
948 
949  DCHECK(shut_down_);
950 }
951 
952 // This function reads the specified scan range associated with the
953 // specified reader context and disk queue.
954 void DiskIoMgr::ReadRange(DiskQueue* disk_queue, RequestContext* reader,
955  ScanRange* range) {
956  char* buffer = NULL;
957  int64_t bytes_remaining = range->len_ - range->bytes_read_;
958  DCHECK_GT(bytes_remaining, 0);
959  int64_t buffer_size = ::min(bytes_remaining, static_cast<int64_t>(max_buffer_size_));
960  bool enough_memory = true;
961  if (reader->mem_tracker_ != NULL) {
962  enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
963  if (!enough_memory) {
964  // Low memory, GC and try again.
965  GcIoBuffers();
966  enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
967  }
968  }
969 
970  if (!enough_memory) {
971  RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
972  unique_lock<mutex> reader_lock(reader->lock_);
973 
974  // Just grabbed the reader lock, check for cancellation.
975  if (reader->state_ == RequestContext::Cancelled) {
976  DCHECK(reader->Validate()) << endl << reader->DebugString();
977  state.DecrementRequestThreadAndCheckDone(reader);
978  range->Cancel(reader->status_);
979  DCHECK(reader->Validate()) << endl << reader->DebugString();
980  return;
981  }
982 
983  if (!range->ready_buffers_.empty()) {
984  // We have memory pressure and this range doesn't need another buffer
985  // (it already has one queued). Skip this range and pick it up later.
986  range->blocked_on_queue_ = true;
987  reader->blocked_ranges_.Enqueue(range);
988  state.DecrementRequestThread();
989  return;
990  } else {
991  // We need to get a buffer anyway since there are none queued. The query
992  // is likely to fail due to mem limits but there's nothing we can do about that
993  // now.
994  }
995  }
996 
997  buffer = GetFreeBuffer(&buffer_size);
998  ++reader->num_used_buffers_;
999 
1000  // Validate more invariants.
1001  DCHECK_GT(reader->num_used_buffers_, 0);
1002  DCHECK(range != NULL);
1003  DCHECK(reader != NULL);
1004  DCHECK(buffer != NULL);
1005 
1006  BufferDescriptor* buffer_desc = GetBufferDesc(reader, range, buffer, buffer_size);
1007  DCHECK(buffer_desc != NULL);
1008 
1009  // No locks in this section. Only working on local vars. We don't want to hold a
1010  // lock across the read call.
1011  buffer_desc->status_ = range->Open();
1012  if (buffer_desc->status_.ok()) {
1013  // Update counters.
1014  if (reader->active_read_thread_counter_) {
1015  reader->active_read_thread_counter_->Add(1L);
1016  }
1017  if (reader->disks_accessed_bitmap_) {
1018  int64_t disk_bit = 1 << disk_queue->disk_id;
1019  reader->disks_accessed_bitmap_->BitOr(disk_bit);
1020  }
1021  SCOPED_TIMER(&read_timer_);
1022  SCOPED_TIMER(reader->read_timer_);
1023 
1024  buffer_desc->status_ = range->Read(buffer, &buffer_desc->len_, &buffer_desc->eosr_);
1025  buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_;
1026 
1027  if (reader->bytes_read_counter_ != NULL) {
1028  COUNTER_ADD(reader->bytes_read_counter_, buffer_desc->len_);
1029  }
1030 
1031  COUNTER_ADD(&total_bytes_read_counter_, buffer_desc->len_);
1032  if (reader->active_read_thread_counter_) {
1033  reader->active_read_thread_counter_->Add(-1L);
1034  }
1035  }
1036 
1037  // Finished read, update reader/disk based on the results
1038  HandleReadFinished(disk_queue, reader, buffer_desc);
1039 }
1040 
1041 void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) {
1042  FILE* file_handle = fopen(write_range->file(), "rb+");
1043  Status ret_status;
1044  if (file_handle == NULL) {
1045  ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
1046  Substitute("fopen($0, \"rb+\") failed with errno=$1 description=$2",
1047  write_range->file_, errno, GetStrErrMsg())));
1048  } else {
1049  ret_status = WriteRangeHelper(file_handle, write_range);
1050 
1051  int success = fclose(file_handle);
1052  if (ret_status.ok() && success != 0) {
1053  ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR, Substitute("fclose($0) failed",
1054  write_range->file_)));
1055  }
1056  }
1057 
1058  HandleWriteFinished(writer_context, write_range, ret_status);
1059 }
1060 
1061 Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) {
1062  // First ensure that disk space is allocated via fallocate().
1063  int file_desc = fileno(file_handle);
1064  int success = 0;
1065  if (write_range->len_ > 0) {
1066  success = posix_fallocate(file_desc, write_range->offset(), write_range->len_);
1067  }
1068  if (success != 0) {
1069  return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
1070  Substitute("posix_fallocate($0, $1, $2) failed for file $3"
1071  " with returnval=$4 description=$5", file_desc, write_range->offset(),
1072  write_range->len_, write_range->file_, success, GetStrErrMsg())));
1073  }
1074  // Seek to the correct offset and perform the write.
1075  success = fseek(file_handle, write_range->offset(), SEEK_SET);
1076  if (success != 0) {
1077  return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
1078  Substitute("fseek($0, $1, SEEK_SET) failed with errno=$2 description=$3",
1079  write_range->file_, write_range->offset(), errno, GetStrErrMsg())));
1080  }
1081 
1082  int64_t bytes_written = fwrite(write_range->data_, 1, write_range->len_, file_handle);
1083  if (bytes_written < write_range->len_) {
1084  return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
1085  Substitute("fwrite(buffer, 1, $0, $1) failed with errno=$2 description=$3",
1086  write_range->len_, write_range->file_, errno, GetStrErrMsg())));
1087  }
1088  if (ImpaladMetrics::IO_MGR_BYTES_WRITTEN != NULL) {
1089  ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(write_range->len_);
1090  }
1091 
1092  return Status::OK;
1093 }
1094 
1095 int DiskIoMgr::free_buffers_idx(int64_t buffer_size) {
1096  int64_t buffer_size_scaled = BitUtil::Ceil(buffer_size, min_buffer_size_);
1097  int idx = BitUtil::Log2(buffer_size_scaled);
1098  DCHECK_GE(idx, 0);
1099  DCHECK_LT(idx, free_buffers_.size());
1100  return idx;
1101 }
1102 
1103 Status DiskIoMgr::AddWriteRange(RequestContext* writer, WriteRange* write_range) {
1104  DCHECK_LE(write_range->len(), max_buffer_size_);
1105  unique_lock<mutex> writer_lock(writer->lock_);
1106 
1107  if (writer->state_ == RequestContext::Cancelled) {
1108  DCHECK(!writer->status_.ok());
1109  return writer->status_;
1110  }
1111 
1112  writer->AddRequestRange(write_range, false);
1113  return Status::OK;
1114 }
1115 
1116 int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) {
1117  // TODO: add a queue for remote HDFS accesses.
1118  if (IsS3APath(file)) {
1119  DCHECK(!expected_local);
1120  return RemoteS3DiskId();
1121  }
1122  if (disk_id == -1) {
1123  // disk id is unknown, assign one in round-robin fashion using a static counter.
1124  static int next_disk_id = 0;
1125  disk_id = next_disk_id++;
1126  }
1127  // TODO: we need to parse the config for the number of dirs configured for this
1128  // data node.
1129  return disk_id % num_local_disks();
1130 }