Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
disk-io-mgr.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_RUNTIME_DISK_IO_MGR_H
17 #define IMPALA_RUNTIME_DISK_IO_MGR_H
18 
19 #include <list>
20 #include <vector>
21 #include <boost/scoped_ptr.hpp>
22 #include <boost/unordered_set.hpp>
23 #include <boost/thread/mutex.hpp>
24 #include <boost/thread/condition_variable.hpp>
25 #include <boost/thread/thread.hpp>
26 
27 #include "common/atomic.h"
28 #include "common/hdfs.h"
29 #include "common/object-pool.h"
30 #include "common/status.h"
32 #include "util/bit-util.h"
33 #include "util/error-util.h"
34 #include "util/internal-queue.h"
35 #include "util/runtime-profile.h"
36 #include "util/thread.h"
37 
38 namespace impala {
39 
40 class MemTracker;
41 
45 //
50 //
53 //
67 //
74 //
82 //
88 //
99 //
111 //
125 //
131 //
136 //
143 //
164 //
173 //
179 //
188 class DiskIoMgr {
189  public:
190  class RequestContext;
191  class ScanRange;
192 
197  public:
199  char* buffer() { return buffer_; }
200  int64_t buffer_len() { return buffer_len_; }
201  int64_t len() { return len_; }
202  bool eosr() { return eosr_; }
203 
205  int64_t scan_range_offset() const { return scan_range_offset_; }
206 
210 
213  void Return();
214 
215  private:
216  friend class DiskIoMgr;
217  BufferDescriptor(DiskIoMgr* io_mgr);
218 
220  void Reset(RequestContext* reader, ScanRange* range, char* buffer,
221  int64_t buffer_len);
222 
224 
227 
230 
233 
235  char* buffer_;
236 
238  int64_t buffer_len_;
239 
241  int64_t len_;
242 
244  bool eosr_;
245 
248 
250  };
251 
253  struct RequestType {
254  enum type {
257  };
258  };
259 
263  class RequestRange : public InternalQueue<RequestRange>::Node {
264  public:
265  hdfsFS fs() const { return fs_; }
266  const char* file() const { return file_.c_str(); }
267  int64_t offset() const { return offset_; }
268  int64_t len() const { return len_; }
269  int disk_id() const { return disk_id_; }
271 
272  protected:
274  hdfsFS fs_;
275 
277  std::string file_;
278 
280  int64_t offset_;
281 
283  int64_t len_;
284 
286  int disk_id_;
287 
290  };
291 
295  class ScanRange : public RequestRange {
296  public:
297 
299  const static int64_t NEVER_CACHE = -1;
300 
302  ScanRange(int initial_capacity = -1);
303 
304  virtual ~ScanRange();
305 
309  void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
310  bool try_cache, bool expected_local, int64_t mtime, void* metadata = NULL);
311 
312  void* meta_data() const { return meta_data_; }
313  bool try_cache() const { return try_cache_; }
314  bool expected_local() const { return expected_local_; }
316 
322  Status GetNext(BufferDescriptor** buffer);
323 
328  void Cancel(const Status& status);
329 
331  std::string DebugString() const;
332 
333  int64_t mtime() const { return mtime_; }
334 
335  private:
336  friend class DiskIoMgr;
337 
339  void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader);
340 
343  bool EnqueueBuffer(BufferDescriptor* buffer);
344 
347  void CleanupQueuedBuffers();
348 
351  bool Validate();
352 
354  int64_t MaxReadChunkSize() const;
355 
357  Status Open();
358 
360  void Close();
361 
364  Status Read(char* buffer, int64_t* bytes_read, bool* eosr);
365 
370  Status ReadFromCache(bool* read_succeeded);
371 
374  void* meta_data_;
375 
380 
387 
389 
392 
394  union {
395  FILE* local_file_;
396  hdfsFile hdfs_file_;
397  };
398 
401  struct hadoopRzBuffer* cached_buffer_;
402 
405  boost::mutex lock_;
406 
409 
414 
417 
420 
424 
427  boost::condition_variable buffer_ready_cv_;
428  std::list<BufferDescriptor*> ready_buffers_;
429 
435 
442  boost::mutex hdfs_lock_;
443 
446 
448  int64_t mtime_;
449  };
450 
455  class WriteRange : public RequestRange {
456  public:
464  typedef boost::function<void (const Status&)> WriteDoneCallback;
465  WriteRange(const std::string& file, int64_t file_offset, int disk_id,
466  WriteDoneCallback callback);
467 
470  void SetData(const uint8_t* buffer, int64_t len);
471 
472  private:
473  friend class DiskIoMgr;
474 
477  const uint8_t* data_;
478 
481  };
482 
490  DiskIoMgr(int num_disks, int threads_per_disk, int min_buffer_size,
491  int max_buffer_size);
492 
494  DiskIoMgr();
495 
498  ~DiskIoMgr();
499 
501  Status Init(MemTracker* process_mem_tracker);
502 
511  Status RegisterContext(RequestContext** request_context,
512  MemTracker* reader_mem_tracker = NULL);
513 
520  void UnregisterContext(RequestContext* context);
521 
530  void CancelContext(RequestContext* context, bool wait_for_disks_completion = false);
531 
538  Status AddScanRanges(RequestContext* reader, const std::vector<ScanRange*>& ranges,
539  bool schedule_immediately = false);
540 
543  Status AddWriteRange(RequestContext* writer, WriteRange* write_range);
544 
550  Status GetNextRange(RequestContext* reader, ScanRange** range);
551 
557  Status Read(RequestContext* reader, ScanRange* range, BufferDescriptor** buffer);
558 
563  int AssignQueue(const char* file, int disk_id, bool expected_local);
564 
567  Status context_status(RequestContext* context) const;
568 
570  int num_unstarted_ranges(RequestContext* reader) const;
571 
576 
577  int64_t queue_size(RequestContext* reader) const;
578  int64_t bytes_read_local(RequestContext* reader) const;
579  int64_t bytes_read_short_circuit(RequestContext* reader) const;
580  int64_t bytes_read_dn_cache(RequestContext* reader) const;
581  int num_remote_ranges(RequestContext* reader) const;
582  int64_t unexpected_remote_bytes(RequestContext* reader) const;
583 
587  int64_t GetReadThroughput();
588 
590  int max_read_buffer_size() const { return max_buffer_size_; }
591 
593  int num_total_disks() const { return disk_queues_.size(); }
594 
596  int num_remote_disks() const { return REMOTE_NUM_DISKS; }
597 
599  int num_local_disks() const { return num_total_disks() - num_remote_disks(); }
600 
603 
606 
609 
611  std::string DebugString();
612 
615  bool Validate() const;
616 
619  static const int DEFAULT_QUEUE_CAPACITY;
620 
623  enum {
626  };
627 
628  private:
629  friend class BufferDescriptor;
630  struct DiskQueue;
632 
633  friend class DiskIoMgrTest_Buffers_Test;
634 
637 
640 
644 
646  const int max_buffer_size_;
647 
649  const int min_buffer_size_;
650 
653 
655  struct hadoopRzOptions* cached_read_options_;
656 
659  volatile bool shut_down_;
660 
663 
666 
671  boost::scoped_ptr<RequestContextCache> request_context_cache_;
672 
674  boost::mutex free_buffers_lock_;
675 
680  //
687  std::vector<std::list<char*> > free_buffers_;
688 
690  std::list<BufferDescriptor*> free_buffer_descs_;
691 
694 
697 
701  std::vector<DiskQueue*> disk_queues_;
702 
704  int free_buffers_idx(int64_t buffer_size);
705 
711  RequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size);
712 
715 
718  void ReturnBuffer(BufferDescriptor* buffer);
719 
724  char* GetFreeBuffer(int64_t* buffer_size);
725 
730  void GcIoBuffers();
731 
735  void ReturnFreeBuffer(char* buffer, int64_t buffer_size);
736 
740 
744  void WorkLoop(DiskQueue* queue);
745 
751  bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
752  RequestContext** request_context);
753 
757 
764  void HandleWriteFinished(RequestContext* writer, WriteRange* write_range,
765  const Status& write_status);
766 
769 
772  void Write(RequestContext* writer_context, WriteRange* write_range);
773 
777  Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range);
778 
780  void ReadRange(DiskQueue* disk_queue, RequestContext* reader,
781  ScanRange* range);
782 };
783 
784 }
785 
786 #endif
void ReturnBufferDesc(BufferDescriptor *desc)
Returns a buffer desc object which can now be used for another reader.
Definition: disk-io-mgr.cc:602
void CancelContext(RequestContext *context, bool wait_for_disks_completion=false)
Definition: disk-io-mgr.cc:377
ScanRange(int initial_capacity=-1)
The initial queue capacity for this scan range. Specify -1 to use the IoMgr default.
RequestType::type request_type() const
Definition: disk-io-mgr.h:270
int num_remote_ranges(RequestContext *reader) const
Definition: disk-io-mgr.cc:432
AtomicInt< int > num_buffers_in_readers_
Total number of buffers in readers.
Definition: disk-io-mgr.h:696
Status status_
Status of the read into this buffer. If the status is not OK, 'buffer' is NULL.
Definition: disk-io-mgr.h:247
char * buffer_
Buffer holding the contents that were read.
Definition: disk-io-mgr.h:235
void Write(RequestContext *writer_context, WriteRange *write_range)
MemTracker * mem_tracker_
The current tracker this buffer is associated with.
Definition: disk-io-mgr.h:229
MemTracker tracker
int64_t offset_
Offset within file_ being read or written.
Definition: disk-io-mgr.h:280
boost::function< void(const Status &)> WriteDoneCallback
Definition: disk-io-mgr.h:464
int AssignQueue(const char *file, int disk_id, bool expected_local)
void set_active_read_thread_counter(RequestContext *, RuntimeProfile::Counter *)
Definition: disk-io-mgr.cc:397
AtomicInt< int > num_allocated_buffers_
Total number of allocated buffers, used for debugging.
Definition: disk-io-mgr.h:693
const int num_threads_per_disk_
Definition: disk-io-mgr.h:643
int64_t queue_size(RequestContext *reader) const
Definition: disk-io-mgr.cc:407
int64_t len_
Length of data read or written.
Definition: disk-io-mgr.h:283
The request type (read or write) associated with a request range.
Definition: disk-io-mgr.h:253
boost::scoped_ptr< RequestContextCache > request_context_cache_
Definition: disk-io-mgr.h:671
const int min_buffer_size_
The minimum size of each read buffer.
Definition: disk-io-mgr.h:649
int64_t GetReadThroughput()
Definition: disk-io-mgr.cc:440
int64_t bytes_read_dn_cache(RequestContext *reader) const
Definition: disk-io-mgr.cc:428
struct hadoopRzOptions * cached_read_options_
Options object for cached hdfs reads. Set on startup and never modified.
Definition: disk-io-mgr.h:655
boost::condition_variable buffer_ready_cv_
Definition: disk-io-mgr.h:427
WriteRange(const std::string &file, int64_t file_offset, int disk_id, WriteDoneCallback callback)
Definition: disk-io-mgr.cc:196
std::list< BufferDescriptor * > free_buffer_descs_
List of free buffer desc objects that can be handed out to clients.
Definition: disk-io-mgr.h:690
Status AddWriteRange(RequestContext *writer, WriteRange *write_range)
int num_remote_disks() const
Returns the total number of remote "disk" queues.
Definition: disk-io-mgr.h:596
void InitInternal(DiskIoMgr *io_mgr, RequestContext *reader)
Initializes internal fields.
void UnregisterContext(RequestContext *context)
Definition: disk-io-mgr.cc:344
const char * file() const
Definition: disk-io-mgr.h:266
int RemoteS3DiskId() const
The disk ID (and therefore disk_queues_ index) used for S3 accesses.
Definition: disk-io-mgr.h:602
Status WriteRangeHelper(FILE *file_handle, WriteRange *write_range)
Status AddScanRanges(RequestContext *reader, const std::vector< ScanRange * > &ranges, bool schedule_immediately=false)
Definition: disk-io-mgr.cc:455
RequestType::type request_type_
The type of IO request, READ or WRITE.
Definition: disk-io-mgr.h:289
Status GetNextRange(RequestContext *reader, ScanRange **range)
Definition: disk-io-mgr.cc:501
volatile bool shut_down_
Definition: disk-io-mgr.h:659
int64_t scan_range_offset() const
Returns the offset within the scan range that this buffer starts at.
Definition: disk-io-mgr.h:205
Status Read(RequestContext *reader, ScanRange *range, BufferDescriptor **buffer)
Definition: disk-io-mgr.cc:555
void Reset(RequestContext *reader, ScanRange *range, char *buffer, int64_t buffer_len)
Resets the buffer descriptor state for a new reader, range and data buffer.
Definition: disk-io-mgr.cc:165
int free_buffers_idx(int64_t buffer_size)
Returns the index into free_buffers_ for a given buffer size.
hdfsFS fs_
Hadoop filesystem that contains file_, or NULL if file_ is on the local filesystem.
Definition: disk-io-mgr.h:274
Status Read(char *buffer, int64_t *bytes_read, bool *eosr)
int num_allocated_buffers() const
Returns the number of allocated buffers.
Definition: disk-io-mgr.h:605
void ReturnBuffer(BufferDescriptor *buffer)
Definition: disk-io-mgr.cc:575
bool EnqueueBuffer(BufferDescriptor *buffer)
void SetData(const uint8_t *buffer, int64_t len)
Definition: disk-io-mgr.cc:205
Status Open()
Opens the file for this range. This function only modifies state in this range.
int64_t buffer_len_
Length of buffer_. For buffers from cached reads, the length is 0.
Definition: disk-io-mgr.h:238
WriteDoneCallback callback_
Callback to invoke after the write is complete.
Definition: disk-io-mgr.h:480
std::string file_
Path to file being read or written.
Definition: disk-io-mgr.h:277
int64_t bytes_read_short_circuit(RequestContext *reader) const
Definition: disk-io-mgr.cc:424
bool eosr_returned_
If true, the last buffer for this scan range has been returned.
Definition: disk-io-mgr.h:419
int num_local_disks() const
Returns the number of local disks attached to the system.
Definition: disk-io-mgr.h:599
int num_total_disks() const
Returns the total number of disk queues (both local and remote).
Definition: disk-io-mgr.h:593
DiskIoMgr()
Create DiskIoMgr with default configs.
Definition: disk-io-mgr.cc:219
bool eosr_
True if the current scan range is complete.
Definition: disk-io-mgr.h:244
static const int DEFAULT_QUEUE_CAPACITY
Definition: disk-io-mgr.h:619
void SetMemTracker(MemTracker *tracker)
Definition: disk-io-mgr.cc:187
void ReturnFreeBuffer(char *buffer, int64_t buffer_size)
Definition: disk-io-mgr.cc:696
void set_disks_access_bitmap(RequestContext *, RuntimeProfile::Counter *)
Definition: disk-io-mgr.cc:402
void HandleReadFinished(DiskQueue *, RequestContext *, BufferDescriptor *)
Definition: disk-io-mgr.cc:866
bool eosr_queued_
If true, the last buffer for this scan range has been queued.
Definition: disk-io-mgr.h:416
int ready_buffers_capacity() const
Definition: disk-io-mgr.h:315
MemTracker * process_mem_tracker_
Process memory tracker; needed to account for io buffers.
Definition: disk-io-mgr.h:639
void HandleWriteFinished(RequestContext *writer, WriteRange *write_range, const Status &write_status)
Definition: disk-io-mgr.cc:845
void set_bytes_read_counter(RequestContext *, RuntimeProfile::Counter *)
Definition: disk-io-mgr.cc:393
ThreadGroup disk_thread_group_
Thread group containing all the worker threads.
Definition: disk-io-mgr.h:652
void Reset(hdfsFS fs, const char *file, int64_t len, int64_t offset, int disk_id, bool try_cache, bool expected_local, int64_t mtime, void *metadata=NULL)
boost::mutex free_buffers_lock_
Protects free_buffers_ and free_buffer_descs_.
Definition: disk-io-mgr.h:674
This class is thread-safe.
Definition: mem-tracker.h:61
RuntimeProfile::Counter total_bytes_read_counter_
Total bytes read by the IoMgr.
Definition: disk-io-mgr.h:662
RequestContext * reader_
Reader that this buffer is for.
Definition: disk-io-mgr.h:226
int64_t len_
Length of the contents that were read.
Definition: disk-io-mgr.h:241
RequestContext * reader_
Reader/owner of the scan range.
Definition: disk-io-mgr.h:391
std::vector< DiskQueue * > disk_queues_
Definition: disk-io-mgr.h:701
const int max_buffer_size_
Maximum read size. This is also the maximum size of each allocated buffer.
Definition: disk-io-mgr.h:646
static const int64_t NEVER_CACHE
If the mtime is set to NEVER_CACHE, the file handle should never be cached.
Definition: disk-io-mgr.h:299
Status Init(MemTracker *process_mem_tracker)
Initializes the IoMgr. Must be called once, before any of the other APIs.
Definition: disk-io-mgr.cc:296
int64_t bytes_read_local(RequestContext *reader) const
Definition: disk-io-mgr.cc:420
bool is_cancelled_
If true, this scan range has been cancelled.
Definition: disk-io-mgr.h:445
bool Validate() const
friend class DiskIoMgrTest_Buffers_Test
Definition: disk-io-mgr.h:631
Status RegisterContext(RequestContext **request_context, MemTracker *reader_mem_tracker=NULL)
Definition: disk-io-mgr.cc:336
void ReadRange(DiskQueue *disk_queue, RequestContext *reader, ScanRange *range)
Reads the specified scan range and calls HandleReadFinished when done.
Definition: disk-io-mgr.cc:954
T must be a subclass of InternalQueue::Node.
char * GetFreeBuffer(int64_t *buffer_size)
Definition: disk-io-mgr.cc:627
std::string DebugString()
Dumps the disk IoMgr queues (for readers and disks)
Definition: disk-io-mgr.cc:142
Status GetNext(BufferDescriptor **buffer)
ScanRange * scan_range_
Scan range that this buffer is for.
Definition: disk-io-mgr.h:232
int64_t unexpected_remote_bytes(RequestContext *reader) const
Definition: disk-io-mgr.cc:436
void set_read_timer(RequestContext *, RuntimeProfile::Counter *)
Definition: disk-io-mgr.cc:389
std::list< BufferDescriptor * > ready_buffers_
Definition: disk-io-mgr.h:428
int disk_id_
ID of the disk containing file_.
Definition: disk-io-mgr.h:286
BufferDescriptor * GetBufferDesc(RequestContext *reader, ScanRange *range, char *buffer, int64_t buffer_size)
Definition: disk-io-mgr.cc:610
ObjectPool pool_
Pool to allocate BufferDescriptors.
Definition: disk-io-mgr.h:636
Status ValidateScanRange(ScanRange *range)
Validates that range is correctly initialized.
Definition: disk-io-mgr.cc:444
int max_read_buffer_size() const
Returns the maximum read buffer size.
Definition: disk-io-mgr.h:590
bool GetNextRequestRange(DiskQueue *disk_queue, RequestRange **range, RequestContext **request_context)
Definition: disk-io-mgr.cc:730
std::vector< std::list< char * > > free_buffers_
Definition: disk-io-mgr.h:687
int bytes_read_
Number of bytes read so far for this scan range.
Definition: disk-io-mgr.h:408
struct hadoopRzBuffer * cached_buffer_
Definition: disk-io-mgr.h:401
int64_t MaxReadChunkSize() const
Maximum length in bytes for hdfsRead() calls.
std::string DebugString() const
Returns a descriptive string for debugging.
int num_unstarted_ranges(RequestContext *reader) const
Returns the number of unstarted scan ranges for this reader.
Definition: disk-io-mgr.cc:416
void WorkLoop(DiskQueue *queue)
Definition: disk-io-mgr.cc:920
Status ReadFromCache(bool *read_succeeded)
void Cancel(const Status &status)
RuntimeProfile::Counter read_timer_
Total time spent in hdfs reading.
Definition: disk-io-mgr.h:665
Status context_status(RequestContext *context) const
Definition: disk-io-mgr.cc:411
int num_buffers_in_readers() const
Returns the number of buffers currently owned by all readers.
Definition: disk-io-mgr.h:608
void Close()
Closes the file for this range. This function only modifies state in this range.
int64_t mtime_
Last modified time of the file associated with the scan range.
Definition: disk-io-mgr.h:448