Impala
Impala is the open source, native analytic database for Apache Hadoop.
impala::DiskIoMgr Class Reference

#include <disk-io-mgr.h>


Classes

class  BufferDescriptor
 
struct  DiskQueue
 Per disk state.
 
class  RequestContext
 
class  RequestContextCache
 
class  RequestRange
 
struct  RequestType
 The request type, read or write associated with a request range.
 
class  ScanRange
 
class  WriteRange
 

Public Types

enum  { REMOTE_S3_DISK_OFFSET = 0, REMOTE_NUM_DISKS }
 

Public Member Functions

 DiskIoMgr (int num_disks, int threads_per_disk, int min_buffer_size, int max_buffer_size)
 
 DiskIoMgr ()
 Create DiskIoMgr with default configs.
 
 ~DiskIoMgr ()
 
Status Init (MemTracker *process_mem_tracker)
 Initialize the IoMgr. Must be called once before any of the other APIs.
 
Status RegisterContext (RequestContext **request_context, MemTracker *reader_mem_tracker=NULL)
 
void UnregisterContext (RequestContext *context)
 
void CancelContext (RequestContext *context, bool wait_for_disks_completion=false)
 
Status AddScanRanges (RequestContext *reader, const std::vector< ScanRange * > &ranges, bool schedule_immediately=false)
 
Status AddWriteRange (RequestContext *writer, WriteRange *write_range)
 
Status GetNextRange (RequestContext *reader, ScanRange **range)
 
Status Read (RequestContext *reader, ScanRange *range, BufferDescriptor **buffer)
 
int AssignQueue (const char *file, int disk_id, bool expected_local)
 
Status context_status (RequestContext *context) const
 
int num_unstarted_ranges (RequestContext *reader) const
 Returns the number of unstarted scan ranges for this reader.
 
void set_bytes_read_counter (RequestContext *, RuntimeProfile::Counter *)
 
void set_read_timer (RequestContext *, RuntimeProfile::Counter *)
 
void set_active_read_thread_counter (RequestContext *, RuntimeProfile::Counter *)
 
void set_disks_access_bitmap (RequestContext *, RuntimeProfile::Counter *)
 
int64_t queue_size (RequestContext *reader) const
 
int64_t bytes_read_local (RequestContext *reader) const
 
int64_t bytes_read_short_circuit (RequestContext *reader) const
 
int64_t bytes_read_dn_cache (RequestContext *reader) const
 
int num_remote_ranges (RequestContext *reader) const
 
int64_t unexpected_remote_bytes (RequestContext *reader) const
 
int64_t GetReadThroughput ()
 
int max_read_buffer_size () const
 Returns the maximum read buffer size.
 
int num_total_disks () const
 Returns the total number of disk queues (both local and remote).
 
int num_remote_disks () const
 Returns the total number of remote "disk" queues.
 
int num_local_disks () const
 Returns the number of local disks attached to the system.
 
int RemoteS3DiskId () const
 The disk ID (and therefore disk_queues_ index) used for S3 accesses.
 
int num_allocated_buffers () const
 Returns the number of allocated buffers.
 
int num_buffers_in_readers () const
 Returns the number of buffers currently owned by all readers.
 
std::string DebugString ()
 Dumps the disk IoMgr queues (for readers and disks).
 
bool Validate () const
 

Static Public Attributes

static const int DEFAULT_QUEUE_CAPACITY = 2
 

Private Member Functions

int free_buffers_idx (int64_t buffer_size)
 Returns the index into free_buffers_ for a given buffer size.
 
BufferDescriptor * GetBufferDesc (RequestContext *reader, ScanRange *range, char *buffer, int64_t buffer_size)
 
void ReturnBufferDesc (BufferDescriptor *desc)
 Returns a buffer desc object which can now be used for another reader.
 
void ReturnBuffer (BufferDescriptor *buffer)
 
char * GetFreeBuffer (int64_t *buffer_size)
 
void GcIoBuffers ()
 
void ReturnFreeBuffer (char *buffer, int64_t buffer_size)
 
void ReturnFreeBuffer (BufferDescriptor *desc)
 
void WorkLoop (DiskQueue *queue)
 
bool GetNextRequestRange (DiskQueue *disk_queue, RequestRange **range, RequestContext **request_context)
 
void HandleReadFinished (DiskQueue *, RequestContext *, BufferDescriptor *)
 
void HandleWriteFinished (RequestContext *writer, WriteRange *write_range, const Status &write_status)
 
Status ValidateScanRange (ScanRange *range)
 Validates that range is correctly initialized.
 
void Write (RequestContext *writer_context, WriteRange *write_range)
 
Status WriteRangeHelper (FILE *file_handle, WriteRange *write_range)
 
void ReadRange (DiskQueue *disk_queue, RequestContext *reader, ScanRange *range)
 Reads the specified scan range and calls HandleReadFinished when done.
 

Private Attributes

ObjectPool pool_
 Pool to allocate BufferDescriptors.
 
MemTracker * process_mem_tracker_
 Process memory tracker; needed to account for io buffers.
 
const int num_threads_per_disk_
 
const int max_buffer_size_
 Maximum read size. This is also the maximum size of each allocated buffer.
 
const int min_buffer_size_
 The minimum size of each read buffer.
 
ThreadGroup disk_thread_group_
 Thread group containing all the worker threads.
 
struct hadoopRzOptions * cached_read_options_
 Options object for cached hdfs reads. Set on startup and never modified.
 
volatile bool shut_down_
 
RuntimeProfile::Counter total_bytes_read_counter_
 Total bytes read by the IoMgr.
 
RuntimeProfile::Counter read_timer_
 Total time spent in hdfs reading.
 
boost::scoped_ptr< RequestContextCache > request_context_cache_
 
boost::mutex free_buffers_lock_
 Protects free_buffers_ and free_buffer_descs_.
 
std::vector< std::list< char * > > free_buffers_
 
std::list< BufferDescriptor * > free_buffer_descs_
 List of free buffer desc objects that can be handed out to clients.
 
AtomicInt< int > num_allocated_buffers_
 Total number of allocated buffers, used for debugging.
 
AtomicInt< int > num_buffers_in_readers_
 Total number of buffers in readers.
 
std::vector< DiskQueue * > disk_queues_
 

Friends

class BufferDescriptor
 
class DiskIoMgrTest_Buffers_Test
 

Detailed Description

Manager object that schedules IO for all queries on all disks and remote filesystems (such as S3). Each query maps to one or more RequestContext objects, each of which has its own queue of scan ranges and/or write ranges. The API splits up requesting scan/write ranges (non-blocking) and reading the data (blocking). The DiskIoMgr has worker threads that will read from and write to disk/hdfs/remote-filesystems, allowing interleaving of IO and CPU. This allows us to keep all disks and all cores as busy as possible.

All public APIs are thread-safe. It is not valid to call any of the APIs after UnregisterContext() returns.

For Readers: We can model this problem as a multiple producer (threads for each disk), multiple consumer (scan ranges) problem. There are multiple queues that need to be synchronized. Conceptually, there are two queues:

  1. The per disk queue: this contains a queue of readers that need reads.
  2. The per scan range ready-buffer queue: this contains buffers that have been read and are ready for the caller.

The disk queue contains a queue of readers and is scheduled in a round-robin fashion. Readers map to scan nodes. The reader then contains a queue of scan ranges. The caller asks the IoMgr for the next range to process. The IoMgr then selects the best range to read based on disk activity and begins reading and queuing buffers for that range.

TODO: We should map readers to queries. A reader is the unit of scheduling and queries that have multiple scan nodes shouldn't have more 'turns'.

For Writers: Data is written via AddWriteRange(). This is non-blocking and adds a WriteRange to a per-disk queue. After the write is complete, a callback in WriteRange is invoked. No memory is allocated within IoMgr for writes and no copies are made. It is the responsibility of the client to ensure that the data to be written is valid and that the file to be written to exists until the callback is invoked.

The IoMgr provides three key APIs.

  1. AddScanRanges: this is non-blocking and tells the IoMgr all the ranges that will eventually need to be read.
  2. GetNextRange: returns to the caller the next scan range it should process. This is based on disk load. This also begins reading the data in this scan range. This is blocking.
  3. ScanRange::GetNext: returns the next buffer for this range. This is blocking.

The disk threads do not synchronize with each other. The readers and writers don't synchronize with each other. There is a lock and condition variable for each request context queue and each disk queue. IMPORTANT: whenever both locks are needed, the lock order is to grab the context lock before the disk lock.

Scheduling: If there are multiple request contexts with work for a single disk, the request contexts are scheduled in round-robin order. Multiple disk threads can operate on the same request context. Exactly one request range is processed by a disk thread at a time. If there are multiple scan ranges scheduled via GetNextRange() for a single context, these are processed in round-robin order. If there are multiple scan and write ranges for a disk, a read is always followed by a write, and a write is followed by a read, i.e. reads and writes alternate. If multiple write ranges are enqueued for a single disk, they will be processed by the disk threads in order, but may complete in any order. No guarantees are made on ordering of writes across disks.

Resource Management: effective resource management in the IoMgr is key to good performance. The IoMgr helps coordinate two resources: CPU and disk. For CPU, spinning up too many threads causes thrashing. Memory usage in the IoMgr comes from queued read buffers. If we queue the minimum (i.e. 1), then the disks are idle while we are processing the buffer. If we don't limit the queue, then it is possible that we end up queueing the entire data set (i.e. CPU is slower than disks) and run out of memory. For both CPU and memory, we want to model the machine as having a fixed amount of resources. If a single query is running, it should saturate either CPU or disk while using as little memory as possible. With multiple queries, each query should get less CPU, and therefore need fewer queued buffers and less memory.
The IoMgr defers CPU management to the caller. The IoMgr provides a GetNextRange API which will return the next scan range the caller should process. The caller can call this from the desired number of reading threads. Once a scan range has been returned via GetNextRange, the IoMgr will start to buffer reads for that range and it is expected the caller will pull those buffers promptly. For example, if the caller would like to have 1 scanner thread, the read loop would look like:

    while (more_ranges)
      range = GetNextRange()
      while (!range.eosr)
        buffer = range.GetNext()

To have multiple reading threads, the caller would simply spin up the threads and each would process the loops above.

To control the number of IO buffers, each scan range has a soft max capacity for the number of queued buffers. If the number of buffers is at capacity, the IoMgr will no longer read for that scan range until the caller has processed a buffer. This capacity does not need to be fixed, and the caller can dynamically adjust it if necessary. As an example: if we allowed 5 buffers per range on a 24 core, 72 thread (we default to allowing 3x threads) machine, we should see at most 72 * 5 * 8MB = 2.8GB in io buffers memory usage. This should remain roughly constant regardless of how many concurrent readers are running.

Buffer Management: Buffers are allocated by the IoMgr as necessary to service reads. These buffers are directly returned to the caller. The caller must call Return() on the buffer when it is done, at which point the buffer will be recycled for another read. In error cases, the IoMgr will recycle the buffers more promptly, but regardless, the caller must always call Return().

Caching support: Scan ranges contain metadata on whether or not they are cached on the DN. In that case, we use the HDFS APIs to read the cached data without doing any copies. For these ranges, the reads happen on the caller thread (as opposed to the disk threads).
It is possible for the cached read APIs to fail, in which case the ranges are then queued on the disk threads and behave identically to the case where the range is not cached. Resources for these ranges are also not accounted against the reader because none are consumed. While a cached block is being processed, the block is mlocked. We want to minimize the time the mlock is held:
    • HDFS will time us out if we hold onto the mlock for too long.
    • Holding the lock prevents uncaching this file due to a caching policy change.

Therefore, we only issue the cached read when the caller is ready to process the range (GetNextRange()) instead of when the ranges are issued. This guarantees that there will be a CPU available to process the buffer and that any throttling we do with the number of scanner threads properly controls the number of files we mlock. With cached scan ranges, we cannot close the scan range until the cached buffer is returned (HDFS does not allow this). We therefore need to defer the close until the cached buffer is returned (BufferDescriptor::Return()).

Remote filesystem support (e.g. S3): Remote filesystems are modeled as "remote disks". That is, there is a separate disk queue for each supported remote filesystem type. In order to maximize throughput, multiple connections are opened in parallel by having multiple threads running per queue. Also note that reading from a remote filesystem service can be more CPU-intensive than local disk/hdfs because of non-direct I/O and SSL processing, and can be CPU bottlenecked, especially if not enough I/O threads for these queues are started.

TODO: IoMgr should be able to request additional scan ranges from the coordinator to help deal with stragglers.
TODO: look into using a lock-free queue.
TODO: simplify the common path (less locking, memory allocations).
TODO: Break up the .h/.cc into multiple files under an /io subdirectory.

Structure of the Implementation:

Definition at line 188 of file disk-io-mgr.h.
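The single-scanner-thread read loop and the per-range soft buffer capacity described above can be sketched with a bounded queue. This is a minimal sketch under stated assumptions, not Impala's implementation: ReadyBufferQueue and ScanRangeLoop are hypothetical names, std::string stands in for an IO buffer, and one producer thread stands in for the disk threads.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical model of one scan range's ready-buffer queue (not Impala code).
// The producer (standing in for a disk thread) blocks while the queue is at its
// soft capacity; the consumer (a scanner thread) drains it with GetNext().
class ReadyBufferQueue {
 public:
  explicit ReadyBufferQueue(size_t capacity) : capacity_(capacity) {
    assert(capacity > 0);
  }

  // Producer side: waits until there is room, then queues one buffer.
  void Enqueue(std::string buffer, bool eosr) {
    std::unique_lock<std::mutex> l(lock_);
    not_full_.wait(l, [this] { return ready_.size() < capacity_; });
    ready_.push_back(Entry{std::move(buffer), eosr});
    not_empty_.notify_one();
  }

  // Consumer side: waits for a buffer. *eosr is set on the range's last buffer.
  std::string GetNext(bool* eosr) {
    std::unique_lock<std::mutex> l(lock_);
    not_empty_.wait(l, [this] { return !ready_.empty(); });
    Entry e = std::move(ready_.front());
    ready_.pop_front();
    not_full_.notify_one();  // A slot was freed, so the producer may resume.
    *eosr = e.eosr;
    return std::move(e.data);
  }

  // The capacity is soft: the caller may adjust it while the range is active.
  void set_capacity(size_t capacity) {
    assert(capacity > 0);
    std::lock_guard<std::mutex> l(lock_);
    capacity_ = capacity;
    not_full_.notify_all();
  }

 private:
  struct Entry {
    std::string data;
    bool eosr;
  };
  std::mutex lock_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
  std::deque<Entry> ready_;
  size_t capacity_;
};

// The single-scanner-thread inner loop: pull buffers until end of scan range.
std::vector<std::string> ScanRangeLoop(ReadyBufferQueue& range) {
  std::vector<std::string> buffers;
  bool eosr = false;
  while (!eosr) buffers.push_back(range.GetNext(&eosr));
  return buffers;
}
```

With a capacity of 2, the producer can run at most two buffers ahead of the scanner. Raising the capacity trades memory for fewer disk stalls, which is the trade-off the Resource Management paragraph describes.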

Member Enumeration Documentation

anonymous enum

"Disk" queue offsets for remote accesses. Offset 0 corresponds to the disk ID (i.e. disk_queues_ index) equal to num_local_disks().

Enumerator
REMOTE_S3_DISK_OFFSET 
REMOTE_NUM_DISKS 

Definition at line 623 of file disk-io-mgr.h.

Constructor & Destructor Documentation

DiskIoMgr::DiskIoMgr ( int  num_disks,
int  threads_per_disk,
int  min_buffer_size,
int  max_buffer_size 
)

Create a DiskIoMgr object.

  • num_disks: The number of disks the IoMgr should use. This is used for testing. Specify 0 to have the disk IoMgr query the OS for the number of disks.
  • threads_per_disk: number of read threads to create per disk. This is also the max queue depth.
  • min_buffer_size: minimum io buffer size (in bytes)
  • max_buffer_size: maximum io buffer size (in bytes). Also the max read size.

Definition at line 234 of file disk-io-mgr.cc.

References impala::BitUtil::Ceil(), CheckSseSupport(), disk_queues_, free_buffers_, impala::BitUtil::Log2(), max_buffer_size_, min_buffer_size_, impala::DiskInfo::num_disks(), and REMOTE_NUM_DISKS.

DiskIoMgr::~DiskIoMgr ( )

Clean up all threads and resources. This is mostly useful for testing since for impalad, this object is never destroyed.

Definition at line 250 of file disk-io-mgr.cc.

References cached_read_options_, DebugString(), disk_queues_, disk_thread_group_, free_buffers_, GcIoBuffers(), gen_ir_descriptions::idx, impala::ThreadGroup::JoinAll(), num_allocated_buffers_, num_buffers_in_readers_, request_context_cache_, and shut_down_.

Member Function Documentation

Status DiskIoMgr::AddScanRanges ( RequestContext *  reader,
const std::vector< ScanRange * > &  ranges,
bool  schedule_immediately = false 
)

Adds the scan ranges to the queues. This call is non-blocking. The caller must not deallocate the scan range pointers before UnregisterContext(). If schedule_immediately, the ranges are immediately put on the read queue (i.e. the caller should not/cannot call GetNextRange for these ranges). This can be used to do synchronous reads as well as schedule dependent ranges, as in the case for columnar formats.

Definition at line 455 of file disk-io-mgr.cc.

References impala::DiskIoMgr::RequestContext::AddRequestRange(), impala::DiskIoMgr::RequestContext::cached_ranges_, impala::DiskIoMgr::RequestContext::Cancelled, impala::DiskIoMgr::RequestContext::DebugString(), impala::DiskIoMgr::RequestContext::lock_, impala::Status::OK, impala::Status::ok(), impala::DiskIoMgr::ScanRange::ReadFromCache(), RETURN_IF_ERROR, impala::DiskIoMgr::RequestContext::state_, impala::DiskIoMgr::RequestContext::status_, impala::DiskIoMgr::ScanRange::try_cache_, impala::DiskIoMgr::RequestContext::Validate(), and ValidateScanRange().

Referenced by impala::HdfsScanNode::AddDiskIoRanges(), impala::HdfsParquetScanner::InitColumns(), impala::BufferedBlockMgr::PinBlock(), Read(), and impala::TEST_F().

int DiskIoMgr::AssignQueue ( const char *  file,
int  disk_id,
bool  expected_local 
)

Determine which disk queue this file should be assigned to. Returns an index into disk_queues_. The disk_id is the volume ID for the local disk that holds the files, or -1 if unknown. Flag expected_local is true iff this impalad is co-located with the datanode for this file.

Definition at line 1116 of file disk-io-mgr.cc.

References impala::IsS3APath(), num_local_disks(), and RemoteS3DiskId().

Referenced by impala::HdfsScanNode::AllocateScanRange().
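The queue-assignment rule and the remote-disk layout from the anonymous enum can be sketched as follows. This is a hypothetical illustration, not Impala's code: LooksLikeS3APath assumes S3 paths use the s3a:// scheme (standing in for IsS3APath()), and both the rotating counter for an unknown volume ID and the treatment of remote non-S3 files as local-queue work are assumptions of this sketch.

```cpp
#include <atomic>
#include <cassert>
#include <string>

// Mirrors the anonymous enum: remote "disk" queues follow the local ones.
enum { REMOTE_S3_DISK_OFFSET = 0, REMOTE_NUM_DISKS };

// Hypothetical stand-in for IsS3APath(): assumes S3 paths use the s3a:// scheme.
bool LooksLikeS3APath(const std::string& file) {
  return file.compare(0, 6, "s3a://") == 0;
}

// Queue IDs [0, num_local_disks) are local disks; the S3 queue comes right after.
int RemoteS3DiskId(int num_local_disks) {
  return num_local_disks + REMOTE_S3_DISK_OFFSET;
}

// Sketch of the assignment rule (assumptions, not Impala's code): remote S3 files
// go to the S3 queue; everything else goes to a local queue picked by volume ID,
// or by a rotating counter when the volume ID is unknown (-1).
int AssignQueue(const std::string& file, int disk_id, bool expected_local,
                int num_local_disks) {
  assert(num_local_disks > 0);
  if (!expected_local && LooksLikeS3APath(file)) {
    return RemoteS3DiskId(num_local_disks);
  }
  if (disk_id < 0) {
    static std::atomic<unsigned> next_disk{0};
    return static_cast<int>(next_disk.fetch_add(1) %
                            static_cast<unsigned>(num_local_disks));
  }
  return disk_id % num_local_disks;
}
```

With 4 local disks, queue IDs 0 to 3 are local, the S3 queue is 4, and the total number of queues is 4 + REMOTE_NUM_DISKS = 5.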

int64_t DiskIoMgr::bytes_read_dn_cache ( RequestContext *  reader) const

int64_t DiskIoMgr::bytes_read_local ( RequestContext *  reader) const

int64_t DiskIoMgr::bytes_read_short_circuit ( RequestContext *  reader) const

void DiskIoMgr::CancelContext ( RequestContext *  context,
bool  wait_for_disks_completion = false 
)

This function cancels the context asynchronously. All outstanding requests are aborted and tracking structures cleaned up. This does not need to be called if the context finishes normally. This will also fail any outstanding GetNext()/Read requests. If wait_for_disks_completion is true, wait for the number of active disks for this context to reach 0. After calling with wait_for_disks_completion = true, the only valid API call is returning IO buffers that have already been returned to the caller. Takes context->lock_ if wait_for_disks_completion is true.

Definition at line 377 of file disk-io-mgr.cc.

References impala::DiskIoMgr::RequestContext::Cancel(), impala::Status::CANCELLED, impala::DiskIoMgr::RequestContext::DebugString(), impala::DiskIoMgr::RequestContext::disks_complete_cond_var_, impala::DiskIoMgr::RequestContext::lock_, impala::DiskIoMgr::RequestContext::num_disks_with_ranges_, and impala::DiskIoMgr::RequestContext::Validate().

Referenced by impala::BufferedBlockMgr::Cancel(), impala::HdfsScanNode::Close(), impala::HdfsScanNode::SetDone(), impala::TEST_F(), and UnregisterContext().
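CancelContext(context, true) blocks until no disk is still working on the context; the reference list suggests this is done with disks_complete_cond_var_ and num_disks_with_ranges_. A minimal condition-variable sketch of that wait follows. ContextSketch and DiskDone() are hypothetical, and the real method's other duties (aborting queued ranges, failing pending GetNext()/Read() calls) are omitted.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical slice of a request context: a cancelled flag plus a count of
// disks that still have threads working for this context.
class ContextSketch {
 public:
  explicit ContextSketch(int num_disks_with_ranges)
      : num_disks_with_ranges_(num_disks_with_ranges) {
    assert(num_disks_with_ranges >= 0);
  }

  // Marks the context cancelled. If wait_for_disks_completion, blocks until
  // every disk has reported that it is done with this context.
  void Cancel(bool wait_for_disks_completion) {
    std::unique_lock<std::mutex> l(lock_);
    cancelled_ = true;
    if (wait_for_disks_completion) {
      disks_complete_cond_var_.wait(l, [this] { return num_disks_with_ranges_ == 0; });
    }
  }

  // Called by a disk thread after it observes the cancellation and cleans up.
  void DiskDone() {
    std::lock_guard<std::mutex> l(lock_);
    if (--num_disks_with_ranges_ == 0) disks_complete_cond_var_.notify_all();
  }

  bool cancelled() {
    std::lock_guard<std::mutex> l(lock_);
    return cancelled_;
  }

  int num_disks_with_ranges() {
    std::lock_guard<std::mutex> l(lock_);
    return num_disks_with_ranges_;
  }

 private:
  std::mutex lock_;
  std::condition_variable disks_complete_cond_var_;
  int num_disks_with_ranges_;
  bool cancelled_ = false;
};
```

The predicate form of wait() re-checks the count after every wakeup, so spurious wakeups cannot end the wait early.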

Status DiskIoMgr::context_status ( RequestContext *  context) const

TODO: The functions below can be moved to RequestContext. Returns the current status of the context.

Definition at line 411 of file disk-io-mgr.cc.

References impala::DiskIoMgr::RequestContext::lock_, and impala::DiskIoMgr::RequestContext::status_.

Referenced by impala::TEST_F().

string DiskIoMgr::DebugString ( )

Dumps the disk IoMgr queues (for readers and disks)

Definition at line 142 of file disk-io-mgr.cc.

Referenced by impala::DiskIoMgr::ScanRange::Cancel(), impala::DiskIoMgr::ScanRange::GetNext(), impala::DiskIoMgr::ScanRange::InitInternal(), and ~DiskIoMgr().

int DiskIoMgr::free_buffers_idx ( int64_t  buffer_size)
private

Returns the index into free_buffers_ for a given buffer size.

Definition at line 1095 of file disk-io-mgr.cc.

References impala::BitUtil::Ceil(), free_buffers_, gen_ir_descriptions::idx, impala::BitUtil::Log2(), and min_buffer_size_.

Referenced by GetFreeBuffer(), and ReturnFreeBuffer().
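The free-list index can be computed by scaling the requested size by min_buffer_size_ and taking a ceiling log2, which is what the references to BitUtil::Ceil() and BitUtil::Log2() suggest. The sketch below assumes Ceil() is rounding-up division and Log2() is a ceiling log2; FreeBuffersIdx and RoundedBufferSize are hypothetical names, and the rule that list i holds buffers of min_buffer_size << i bytes is an assumption of this sketch.

```cpp
#include <cassert>
#include <cstdint>

// Integer division rounding up (assumed behavior of BitUtil::Ceil).
int64_t CeilDiv(int64_t value, int64_t divisor) {
  return (value + divisor - 1) / divisor;
}

// Smallest k with 2^k >= x; returns 0 for x <= 1 (a ceiling log2).
int CeilLog2(int64_t x) {
  int k = 0;
  while ((int64_t{1} << k) < x) ++k;
  return k;
}

// Free list index for a request of buffer_size bytes: scale by min_buffer_size,
// then take the ceiling log2. List i holds buffers of (min_buffer_size << i) bytes.
int FreeBuffersIdx(int64_t buffer_size, int64_t min_buffer_size) {
  assert(buffer_size >= 0 && min_buffer_size > 0);
  return CeilLog2(CeilDiv(buffer_size, min_buffer_size));
}

// Size of the buffer actually handed out for that request (rounded up).
int64_t RoundedBufferSize(int64_t buffer_size, int64_t min_buffer_size) {
  return min_buffer_size << FreeBuffersIdx(buffer_size, min_buffer_size);
}
```

For example, with a 1KB minimum, a 3000-byte request maps to index 2 and a 4096-byte buffer, and an 8MB maximum needs 14 free lists (indices 0 through 13).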

void DiskIoMgr::GcIoBuffers ( )
private

Garbage collect all unused io buffers. This is currently only triggered when the process-wide memory limit is hit. This is not good enough. While it is sufficient for the IoMgr, other components do not trigger this GC. TODO: make this run periodically?

Definition at line 661 of file disk-io-mgr.cc.

References free_buffers_, free_buffers_lock_, gen_ir_descriptions::idx, impala::ImpaladMetrics::IO_MGR_NUM_BUFFERS, impala::ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS, impala::ImpaladMetrics::IO_MGR_TOTAL_BYTES, min_buffer_size_, num_allocated_buffers_, process_mem_tracker_, and impala::MemTracker::Release().

Referenced by Init(), ReadRange(), impala::TEST_F(), and ~DiskIoMgr().
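GcIoBuffers() frees the buffers sitting on the free lists and releases their memory from the process tracker. The sketch below models per-size-class free lists with reuse and a GC step. FreeBufferPool and its methods are hypothetical, and a plain byte counter stands in for process_mem_tracker_ and the IO_MGR_* metrics.

```cpp
#include <cassert>
#include <cstdint>
#include <list>
#include <mutex>
#include <vector>

// Hypothetical free-buffer pool loosely modeled on free_buffers_ (not Impala code).
// Free list i holds buffers of (min_size << i) bytes; allocated_bytes_ stands in
// for what would be consumed from the process MemTracker.
class FreeBufferPool {
 public:
  FreeBufferPool(int64_t min_size, int num_size_classes)
      : min_size_(min_size), free_(num_size_classes) {}

  // Destroys only buffers on the free lists; buffers still held by callers leak.
  ~FreeBufferPool() { Gc(); }

  // Reuses a free buffer of size class idx if one exists, else allocates.
  char* Get(int idx) {
    std::lock_guard<std::mutex> l(lock_);
    assert(idx >= 0 && idx < static_cast<int>(free_.size()));
    if (!free_[idx].empty()) {
      char* buffer = free_[idx].front();
      free_[idx].pop_front();
      return buffer;
    }
    allocated_bytes_ += min_size_ << idx;
    return new char[min_size_ << idx];
  }

  // Puts a buffer back on its free list; its memory stays allocated and counted.
  void Return(char* buffer, int idx) {
    std::lock_guard<std::mutex> l(lock_);
    free_[idx].push_back(buffer);
  }

  // GC step: frees every unused buffer and stops counting its bytes.
  void Gc() {
    std::lock_guard<std::mutex> l(lock_);
    for (size_t idx = 0; idx < free_.size(); ++idx) {
      for (char* buffer : free_[idx]) {
        delete[] buffer;
        allocated_bytes_ -= min_size_ << idx;
      }
      free_[idx].clear();
    }
  }

  int64_t allocated_bytes() {
    std::lock_guard<std::mutex> l(lock_);
    return allocated_bytes_;
  }

 private:
  std::mutex lock_;
  int64_t min_size_;
  std::vector<std::list<char*>> free_;
  int64_t allocated_bytes_ = 0;
};
```

Returning a buffer keeps its memory charged, so without a GC step an idle pool holds on to its peak allocation; that is why the description notes GC only running at the memory limit is "not good enough".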

DiskIoMgr::BufferDescriptor * DiskIoMgr::GetBufferDesc ( RequestContext *  reader,
ScanRange *  range,
char *  buffer,
int64_t  buffer_size 
)
private

Gets a buffer descriptor object, initialized for this reader, allocating one as necessary. buffer_size / min_buffer_size_ should be a power of 2, and buffer_size should be <= max_buffer_size_. These constraints will be met if buffer was acquired via GetFreeBuffer() (which it should have been).

Definition at line 610 of file disk-io-mgr.cc.

References impala::ObjectPool::Add(), BufferDescriptor, free_buffer_descs_, free_buffers_lock_, impala::DiskIoMgr::RequestContext::mem_tracker_, pool_, impala::DiskIoMgr::BufferDescriptor::Reset(), and impala::DiskIoMgr::BufferDescriptor::SetMemTracker().

Referenced by ReadRange().

char * DiskIoMgr::GetFreeBuffer ( int64_t *  buffer_size)
private

Returns a buffer to read into with size between *buffer_size and max_buffer_size_, and *buffer_size is set to the size of the buffer. If there is an appropriately-sized free buffer in the 'free_buffers_', that is returned, otherwise a new one is allocated. *buffer_size must be between 0 and max_buffer_size_.

Definition at line 627 of file disk-io-mgr.cc.

References impala::MemTracker::Consume(), free_buffers_, free_buffers_idx(), free_buffers_lock_, gen_ir_descriptions::idx, impala::ImpaladMetrics::IO_MGR_NUM_BUFFERS, impala::ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS, impala::ImpaladMetrics::IO_MGR_TOTAL_BYTES, max_buffer_size_, min_buffer_size_, num_allocated_buffers_, and process_mem_tracker_.

Referenced by ReadRange(), and impala::TEST_F().

bool DiskIoMgr::GetNextRequestRange ( DiskQueue *  disk_queue,
RequestRange **  range,
RequestContext **  request_context 
)
private

This is called from the disk thread to get the next range to process. It will wait until a scan range and buffer are available, or a write range is available. This function returns the range to process. It only returns false if the disk thread should be shut down. No locks should be taken before this function call and none are left taken after.

Definition at line 730 of file disk-io-mgr.cc.

References impala::DiskIoMgr::RequestContext::Active, impala::DiskIoMgr::RequestContext::Cancelled, impala::DiskIoMgr::RequestContext::PerDiskState::DecrementRequestThread(), impala::DiskIoMgr::RequestContext::PerDiskState::DecrementRequestThreadAndCheckDone(), impala::DiskIoMgr::DiskQueue::disk_id, impala::DiskIoMgr::RequestContext::PerDiskState::in_flight_ranges(), impala::DiskIoMgr::RequestContext::PerDiskState::IncrementRequestThreadAndDequeue(), impala::MemTracker::LimitExceeded(), impala::DiskIoMgr::DiskQueue::lock, impala::Status::MEM_LIMIT_EXCEEDED, impala::DiskIoMgr::RequestContext::PerDiskState::next_scan_range_to_start(), impala::DiskIoMgr::RequestContext::PerDiskState::num_remaining_ranges(), process_mem_tracker_, impala::DiskIoMgr::DiskQueue::request_contexts, impala::DiskIoMgr::RequestContext::PerDiskState::ScheduleContext(), impala::DiskIoMgr::RequestContext::PerDiskState::set_next_scan_range_to_start(), shut_down_, impala::DiskIoMgr::RequestContext::PerDiskState::unstarted_scan_ranges(), impala::DiskIoMgr::RequestContext::PerDiskState::unstarted_write_ranges(), VLOG_FILE, and impala::DiskIoMgr::DiskQueue::work_available.

Referenced by WorkLoop().
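GetNextRequestRange() waits on the disk queue until work is available or shutdown is requested, and the class description says that request contexts with work on one disk are served round-robin. A minimal sketch of that wait-and-rotate step, assuming each context is reduced to a name and a count of pending ranges; Context, DiskQueueSketch, and DrainOrder are hypothetical, and per-context locks, buffer availability checks, and read/write alternation are left out.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical request context: a name and the number of ranges it still has.
struct Context {
  std::string name;
  int pending_ranges;
};

class DiskQueueSketch {
 public:
  void Schedule(Context* ctx) {
    assert(ctx->pending_ranges > 0);
    std::lock_guard<std::mutex> l(lock_);
    request_contexts_.push_back(ctx);
    work_available_.notify_one();
  }

  void Shutdown() {
    std::lock_guard<std::mutex> l(lock_);
    shut_down_ = true;
    work_available_.notify_all();
  }

  // Waits for work and returns false only on shutdown. Each call hands out one
  // range; a context with ranges left goes to the back, so contexts take turns.
  bool GetNext(Context** ctx) {
    std::unique_lock<std::mutex> l(lock_);
    work_available_.wait(l, [this] { return shut_down_ || !request_contexts_.empty(); });
    if (shut_down_) return false;
    *ctx = request_contexts_.front();
    request_contexts_.pop_front();
    if (--(*ctx)->pending_ranges > 0) request_contexts_.push_back(*ctx);
    return true;
  }

 private:
  std::mutex lock_;
  std::condition_variable work_available_;
  std::deque<Context*> request_contexts_;
  bool shut_down_ = false;
};

// Records which context is served on each of the first n calls.
std::vector<std::string> DrainOrder(DiskQueueSketch& q, int n) {
  std::vector<std::string> order;
  Context* c = nullptr;
  for (int i = 0; i < n && q.GetNext(&c); ++i) order.push_back(c->name);
  return order;
}
```

With context "a" holding two ranges and "b" holding one, the service order is a, b, a: "a" does not get a second turn until "b" has had one.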

int64_t DiskIoMgr::GetReadThroughput ( )

Returns the read throughput across all readers. TODO: should this be a sliding window? This should report metrics for the last minute, hour and since the beginning.

Definition at line 440 of file disk-io-mgr.cc.

References read_timer_, total_bytes_read_counter_, and impala::RuntimeProfile::UnitsPerSecond().
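GetReadThroughput() derives a rate from total_bytes_read_counter_ and read_timer_ via RuntimeProfile::UnitsPerSecond(). The arithmetic can be sketched as bytes divided by seconds, assuming the timer accumulates nanoseconds; ReadThroughput is a hypothetical name, and returning 0 for an empty timer is a choice of this sketch.

```cpp
#include <cassert>
#include <cstdint>

// Bytes per second from a cumulative byte count and a cumulative read time in
// nanoseconds (assumed unit). Returns 0 before any time has been recorded.
int64_t ReadThroughput(int64_t total_bytes_read, int64_t read_time_ns) {
  assert(total_bytes_read >= 0);
  if (read_time_ns <= 0) return 0;
  double seconds = static_cast<double>(read_time_ns) / 1e9;
  return static_cast<int64_t>(static_cast<double>(total_bytes_read) / seconds);
}
```

If several disk threads add to the same timer concurrently, the timer measures summed thread time rather than wall-clock time, so the result reads more like per-thread throughput; whether read_timer_ behaves that way is an inference, not something this reference states.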

void DiskIoMgr::HandleReadFinished ( DiskQueue *  disk_queue,
RequestContext *  reader,
BufferDescriptor *  buffer 
)
private

void DiskIoMgr::HandleWriteFinished ( RequestContext *  writer,
WriteRange *  write_range,
const Status &  write_status 
)
private

Invokes write_range->callback_ after the range has been written and updates per-disk state and handle state. The status of the write (OK, RUNTIME_ERROR, etc.) is passed to the callback via write_status. The write_status does not affect writer->status_. That is, a write error does not cancel the writer context; that decision is left to the callback handler. TODO: On the read path, consider not canceling the reader context on error.

Definition at line 845 of file disk-io-mgr.cc.

References impala::DiskIoMgr::WriteRange::callback_, impala::DiskIoMgr::RequestContext::Cancelled, impala::DiskIoMgr::RequestContext::DebugString(), impala::DiskIoMgr::RequestContext::PerDiskState::DecrementRequestThread(), impala::DiskIoMgr::RequestContext::PerDiskState::DecrementRequestThreadAndCheckDone(), impala::DiskIoMgr::RequestRange::disk_id_, impala::DiskIoMgr::RequestContext::disk_states_, impala::DiskIoMgr::RequestContext::lock_, impala::DiskIoMgr::RequestContext::PerDiskState::num_remaining_ranges(), impala::DiskIoMgr::RequestContext::state_, and impala::DiskIoMgr::RequestContext::Validate().

Referenced by Write().
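The write-completion contract (the write status goes to the range's callback and does not change the writer context's status) can be sketched as below. Status, WriteRange, WriterContext, and HandleWriteFinished here are simplified hypothetical stand-ins rather than Impala's types, and invoking the callback before the bookkeeping update is a choice of this sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>

// Hypothetical minimal Status: an empty message means OK.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

// Hypothetical WriteRange: the client owns `data` until the callback runs.
struct WriteRange {
  const char* data;
  int64_t len;
  std::function<void(const Status&)> callback;
};

// Hypothetical writer context: its status is deliberately untouched by writes.
struct WriterContext {
  Status status;
  int num_remaining_ranges = 0;
};

// Completion path: report the write's own status to the callback, then update
// bookkeeping. A failed write does not change writer->status, so it does not
// cancel the context; the callback decides what to do about the error.
void HandleWriteFinished(WriterContext* writer, WriteRange* range,
                         const Status& write_status) {
  assert(writer->num_remaining_ranges > 0);
  range->callback(write_status);
  --writer->num_remaining_ranges;
}
```

Keeping the error out of the context status lets a client such as a block manager retry or redirect a failed write without losing its other in-flight writes.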

int impala::DiskIoMgr::max_read_buffer_size ( ) const
inline

Returns the maximum read buffer size.

Definition at line 590 of file disk-io-mgr.h.

References max_buffer_size_.

Referenced by impala::HdfsParquetScanner::ProcessFooter(), and impala::BaseSequenceScanner::ReadPastSize().

int impala::DiskIoMgr::num_allocated_buffers ( ) const
inline

Returns the number of allocated buffers.

Definition at line 605 of file disk-io-mgr.h.

References num_allocated_buffers_.

int impala::DiskIoMgr::num_buffers_in_readers ( ) const
inline

Returns the number of buffers currently owned by all readers.

Definition at line 608 of file disk-io-mgr.h.

References num_buffers_in_readers_.

int impala::DiskIoMgr::num_local_disks ( ) const
inline

Returns the number of local disks attached to the system.

Definition at line 599 of file disk-io-mgr.h.

References num_remote_disks(), and num_total_disks().

Referenced by AssignQueue(), DiskIoMgr(), RemoteS3DiskId(), and impala::BufferedBlockMgr::WriteUnpinnedBlock().

int impala::DiskIoMgr::num_remote_disks ( ) const
inline

Returns the total number of remote "disk" queues.

Definition at line 596 of file disk-io-mgr.h.

References REMOTE_NUM_DISKS.

Referenced by num_local_disks().

int DiskIoMgr::num_remote_ranges ( RequestContext *  reader) const

int impala::DiskIoMgr::num_total_disks ( ) const
inline

Returns the total number of disk queues (both local and remote).

Definition at line 593 of file disk-io-mgr.h.

References disk_queues_.

Referenced by num_local_disks(), and impala::HdfsScanNode::Open().

int DiskIoMgr::num_unstarted_ranges ( RequestContext *  reader) const

Returns the number of unstarted scan ranges for this reader.

Definition at line 416 of file disk-io-mgr.cc.

References impala::DiskIoMgr::RequestContext::num_unstarted_scan_ranges_.

int64_t DiskIoMgr::queue_size ( RequestContext *  reader) const

Status DiskIoMgr::Read ( RequestContext *  reader,
ScanRange *  range,
BufferDescriptor **  buffer 
)

Reads the range and returns the result in buffer. This behaves like the typical synchronous read() api, blocking until the data is read. This can be called while there are outstanding ScanRanges and is thread safe. Multiple threads can be calling Read() per reader at a time. range cannot have already been added via AddScanRanges.

Definition at line 555 of file disk-io-mgr.cc.

References AddScanRanges(), impala::DiskIoMgr::ScanRange::GetNext(), impala::DiskIoMgr::RequestRange::len(), max_buffer_size_, impala::Status::OK, and RETURN_IF_ERROR.

Referenced by impala::HdfsParquetScanner::ProcessFooter(), and impala::DiskIoMgrTest::ValidateSyncRead().
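Because Read() returns the entire range in one buffer, a range longer than the maximum buffer size cannot be served; the references to RequestRange::len() and max_buffer_size_ suggest Read() checks this. The sketch below models only that single-buffer constraint: SyncRead and RangeSpec are hypothetical, an std::istream stands in for the file, and the path through AddScanRanges() and the disk threads is not modeled.

```cpp
#include <cassert>
#include <cstdint>
#include <istream>
#include <sstream>
#include <string>

// Hypothetical scan range: a byte offset and length within an input stream.
struct RangeSpec {
  int64_t offset;
  int64_t len;
};

// Synchronous read that returns the whole range in one buffer. Ranges longer
// than max_buffer_size cannot fit in a single buffer and are rejected up front.
// Returns false on rejection or on a short read.
bool SyncRead(std::istream& in, const RangeSpec& range, int64_t max_buffer_size,
              std::string* buffer) {
  assert(range.offset >= 0 && range.len >= 0);
  if (range.len > max_buffer_size) return false;
  in.clear();  // Reset EOF/fail bits left by an earlier read.
  in.seekg(range.offset);
  buffer->resize(static_cast<size_t>(range.len));
  in.read(&(*buffer)[0], range.len);
  return in.gcount() == range.len;
}
```

A caller that needs more than one buffer's worth of data would use AddScanRanges() and ScanRange::GetNext() instead.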

void DiskIoMgr::ReadRange ( DiskQueue *  disk_queue,
RequestContext *  reader,
ScanRange *  range 
)
private

Reads the specified scan range and calls HandleReadFinished when done.

Definition at line 954 of file disk-io-mgr.cc.

References impala::DiskIoMgr::RequestContext::active_read_thread_counter_, impala::RuntimeProfile::Counter::Add(), impala::RuntimeProfile::Counter::BitOr(), impala::DiskIoMgr::ScanRange::blocked_on_queue_, impala::DiskIoMgr::RequestContext::blocked_ranges_, impala::DiskIoMgr::ScanRange::bytes_read_, impala::DiskIoMgr::RequestContext::bytes_read_counter_, impala::DiskIoMgr::ScanRange::Cancel(), impala::DiskIoMgr::RequestContext::Cancelled, COUNTER_ADD, impala::DiskIoMgr::RequestContext::DebugString(), impala::DiskIoMgr::RequestContext::PerDiskState::DecrementRequestThread(), impala::DiskIoMgr::RequestContext::PerDiskState::DecrementRequestThreadAndCheckDone(), impala::DiskIoMgr::DiskQueue::disk_id, impala::DiskIoMgr::RequestContext::disk_states_, impala::DiskIoMgr::RequestContext::disks_accessed_bitmap_, impala::DiskIoMgr::BufferDescriptor::eosr_, GcIoBuffers(), GetBufferDesc(), GetFreeBuffer(), HandleReadFinished(), impala::DiskIoMgr::BufferDescriptor::len_, impala::DiskIoMgr::RequestRange::len_, impala::DiskIoMgr::RequestContext::lock_, LOW_MEMORY, max_buffer_size_, impala::DiskIoMgr::RequestContext::mem_tracker_, impala::DiskIoMgr::RequestContext::num_used_buffers_, impala::Status::ok(), impala::DiskIoMgr::ScanRange::Open(), impala::DiskIoMgr::ScanRange::Read(), impala::DiskIoMgr::RequestContext::read_timer_, read_timer_, impala::DiskIoMgr::ScanRange::ready_buffers_, impala::DiskIoMgr::BufferDescriptor::scan_range_offset_, SCOPED_TIMER, impala::MemTracker::SpareCapacity(), impala::DiskIoMgr::RequestContext::state_, impala::DiskIoMgr::BufferDescriptor::status_, impala::DiskIoMgr::RequestContext::status_, total_bytes_read_counter_, and impala::DiskIoMgr::RequestContext::Validate().

Referenced by WorkLoop().

Status DiskIoMgr::RegisterContext ( RequestContext **  request_context,
MemTracker *  reader_mem_tracker = NULL 
)

Allocates tracking structures for a request context: registers a new request context, which is returned in *request_context. The IoMgr owns the allocated RequestContext object. The caller must call UnregisterContext() for each context. reader_mem_tracker: non-null only for readers. IO buffers used for this reader will be tracked by this. If the limit is exceeded, the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned via GetNext().

Definition at line 336 of file disk-io-mgr.cc.

References impala::Status::OK, and request_context_cache_.

Referenced by impala::BufferedBlockMgr::Init(), impala::HdfsScanNode::Open(), and impala::TEST_F().

int impala::DiskIoMgr::RemoteS3DiskId ( ) const
inline

The disk ID (and therefore disk_queues_ index) used for S3 accesses.

Definition at line 602 of file disk-io-mgr.h.

References num_local_disks(), and REMOTE_S3_DISK_OFFSET.

Referenced by AssignQueue(), and Init().

void DiskIoMgr::ReturnBufferDesc ( BufferDescriptor *  desc)
private

Returns a buffer desc object which can now be used for another reader.

Definition at line 602 of file disk-io-mgr.cc.

References free_buffer_descs_, and free_buffers_lock_.

Referenced by ReturnBuffer().

void DiskIoMgr::ReturnFreeBuffer ( char *  buffer,
int64_t  buffer_size 
)
private

Returns a buffer to the free list. buffer_size / min_buffer_size_ should be a power of 2, and buffer_size should be <= max_buffer_size_. These constraints will be met if buffer was acquired via GetFreeBuffer() (which it should have been).

Definition at line 696 of file disk-io-mgr.cc.

References impala::BitUtil::Ceil(), free_buffers_, free_buffers_idx(), free_buffers_lock_, gen_ir_descriptions::idx, impala::ImpaladMetrics::IO_MGR_NUM_BUFFERS, impala::ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS, impala::ImpaladMetrics::IO_MGR_TOTAL_BYTES, min_buffer_size_, num_allocated_buffers_, process_mem_tracker_, and impala::MemTracker::Release().

Referenced by HandleReadFinished(), ReturnBuffer(), ReturnFreeBuffer(), and impala::TEST_F().
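The size constraints on ReturnFreeBuffer() can be expressed as a small predicate. IsReturnableBufferSize() is hypothetical and not part of DiskIoMgr; it additionally requires buffer_size to be an exact multiple of min_buffer_size so that the division is exact.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: true if buffer_size satisfies the documented
// ReturnFreeBuffer() constraints, i.e. buffer_size / min_buffer_size is a
// power of 2 and buffer_size <= max_buffer_size.
bool IsReturnableBufferSize(int64_t buffer_size, int64_t min_buffer_size,
                            int64_t max_buffer_size) {
  assert(min_buffer_size > 0);
  if (buffer_size <= 0 || buffer_size > max_buffer_size) return false;
  // The size must be a whole number of min-size units...
  if (buffer_size % min_buffer_size != 0) return false;
  int64_t units = buffer_size / min_buffer_size;
  // ...and that number must be a power of 2 (exactly one bit set).
  return (units & (units - 1)) == 0;
}
```

With min_buffer_size = 1024 and max_buffer_size = 8 MB, sizes such as 1024, 2048 and 8 MB pass, while 3072 (3 units) and 16 MB (too large) do not.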

void DiskIoMgr::ReturnFreeBuffer ( BufferDescriptor *  desc)
private

Returns the buffer held by desc (which cannot be NULL) to the free list, sets desc's buffer to NULL, and clears its mem tracker.

Definition at line 690 of file disk-io-mgr.cc.

References impala::DiskIoMgr::BufferDescriptor::buffer_, impala::DiskIoMgr::BufferDescriptor::buffer_len_, ReturnFreeBuffer(), and impala::DiskIoMgr::BufferDescriptor::SetMemTracker().

void DiskIoMgr::set_active_read_thread_counter ( RequestContext *  r,
RuntimeProfile::Counter *  c 
)

void DiskIoMgr::set_bytes_read_counter ( RequestContext *  r,
RuntimeProfile::Counter *  c 
)

void DiskIoMgr::set_disks_access_bitmap ( RequestContext *  r,
RuntimeProfile::Counter *  c 
)

void DiskIoMgr::set_read_timer ( RequestContext *  r,
RuntimeProfile::Counter *  c 
)

int64_t DiskIoMgr::unexpected_remote_bytes ( RequestContext *  reader) const

void DiskIoMgr::UnregisterContext ( RequestContext *  context)

Unregisters context from the disk IoMgr. This must be called for every RegisterContext(), regardless of cancellation, and must be called in the same thread as GetNext(). The 'context' cannot be used after this call. This call blocks until all the disk threads have finished cleaning up. UnregisterContext() also cancels the reader or writer associated with the context.

Definition at line 344 of file disk-io-mgr.cc.

References CancelContext(), impala::DiskIoMgr::RequestContext::DebugString(), impala::DiskIoMgr::RequestContext::lock_, impala::DiskIoMgr::RequestContext::num_buffers_in_reader_, impala::DiskIoMgr::RequestContext::num_used_buffers_, request_context_cache_, and impala::DiskIoMgr::RequestContext::Validate().

Referenced by impala::TEST_F(), and impala::BufferedBlockMgr::~BufferedBlockMgr().
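Because every RegisterContext() must be matched by exactly one UnregisterContext() in the thread that calls GetNext(), a scope guard is one way to make the pairing hard to get wrong. This is a hypothetical sketch: ScopedRequestContext, FakeIoMgr and FakeContext are illustrative names, not Impala APIs, and the Status returned by the real RegisterContext() is ignored for brevity, which production code should not do. The guard must also be destroyed on the thread that calls GetNext().

```cpp
#include <cassert>

// Hypothetical RAII helper: registers a context on construction and always
// unregisters it on destruction, so each RegisterContext() call is paired
// with exactly one UnregisterContext(), even on early returns.
// IoMgr is any type exposing RegisterContext(Context**) and
// UnregisterContext(Context*); Status handling is omitted.
template <typename IoMgr, typename Context>
class ScopedRequestContext {
 public:
  explicit ScopedRequestContext(IoMgr* io_mgr) : io_mgr_(io_mgr) {
    io_mgr_->RegisterContext(&context_);
    assert(context_ != nullptr);
  }
  ~ScopedRequestContext() { io_mgr_->UnregisterContext(context_); }

  ScopedRequestContext(const ScopedRequestContext&) = delete;
  ScopedRequestContext& operator=(const ScopedRequestContext&) = delete;

  Context* get() const { return context_; }

 private:
  IoMgr* io_mgr_;
  Context* context_ = nullptr;
};

// Minimal fake IoMgr used only to demonstrate the pairing.
struct FakeContext {};
struct FakeIoMgr {
  FakeContext ctx;
  int registered = 0;
  int unregistered = 0;
  void RegisterContext(FakeContext** out) {
    *out = &ctx;
    ++registered;
  }
  void UnregisterContext(FakeContext* c) {
    assert(c == &ctx);
    ++unregistered;
  }
};
```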

bool impala::DiskIoMgr::Validate ( ) const

Validates that the internal state is consistent. Intended for debugging only.

Referenced by impala::DiskIoMgr::ScanRange::Cancel(), impala::DiskIoMgr::ScanRange::GetNext(), and impala::DiskIoMgr::ScanRange::InitInternal().

Status DiskIoMgr::ValidateScanRange ( ScanRange *  range)
private

Validates that range is correctly initialized.

Definition at line 444 of file disk-io-mgr.cc.

References impala::DiskIoMgr::RequestRange::disk_id_, disk_queues_, and impala::Status::OK.

Referenced by AddScanRanges().

void DiskIoMgr::WorkLoop ( DiskQueue *  queue)
private

Disk worker thread loop. Retrieves the next range to process from the disk queue and invokes ReadRange() or Write() depending on the range's request type. Multiple threads per disk can run this loop.

Definition at line 920 of file disk-io-mgr.cc.

References GetNextRequestRange(), impala::DiskIoMgr::RequestType::READ, ReadRange(), impala::DiskIoMgr::RequestRange::request_type(), shut_down_, impala::DiskIoMgr::RequestType::WRITE, and Write().

Referenced by Init().
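The structure of WorkLoop() can be sketched with standard C++ threading primitives. This is a simplified model, not Impala's implementation: DiskQueue, RequestRange, Enqueue(), Shutdown() and StartWorkers() here are hypothetical; a plain FIFO replaces the per-context scheduling in GetNextRequestRange(); the dispatch only counts ranges instead of calling ReadRange() or Write(); and the shutdown flag is a mutex-protected per-queue field rather than the IoMgr-wide shut_down_. The sketch also hands out ranges queued before shutdown, which keeps its behavior deterministic; the real loop may stop sooner.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical, simplified model of one disk queue and its worker threads.
enum class RequestType { READ, WRITE };

struct RequestRange {
  RequestType type;
};

struct DiskQueue {
  std::mutex lock;
  std::condition_variable work_available;
  std::deque<RequestRange*> ranges;  // Guarded by lock.
  bool shut_down = false;            // Guarded by lock.
  std::atomic<int> reads_done{0};
  std::atomic<int> writes_done{0};
};

void Enqueue(DiskQueue* queue, RequestRange* range) {
  assert(range != nullptr);
  {
    std::lock_guard<std::mutex> l(queue->lock);
    queue->ranges.push_back(range);
  }
  queue->work_available.notify_one();
}

void Shutdown(DiskQueue* queue) {
  {
    std::lock_guard<std::mutex> l(queue->lock);
    queue->shut_down = true;
  }
  queue->work_available.notify_all();
}

// Blocks until a range is available or shutdown is requested. Returns nullptr
// once the queue is empty and shut_down is set.
RequestRange* GetNextRequestRange(DiskQueue* queue) {
  std::unique_lock<std::mutex> l(queue->lock);
  queue->work_available.wait(
      l, [queue] { return queue->shut_down || !queue->ranges.empty(); });
  if (queue->ranges.empty()) return nullptr;
  RequestRange* range = queue->ranges.front();
  queue->ranges.pop_front();
  return range;
}

// Worker loop: fetch the next range and dispatch on its request type.
void WorkLoop(DiskQueue* queue) {
  while (RequestRange* range = GetNextRequestRange(queue)) {
    switch (range->type) {
      case RequestType::READ:
        ++queue->reads_done;  // Stand-in for ReadRange().
        break;
      case RequestType::WRITE:
        ++queue->writes_done;  // Stand-in for Write().
        break;
    }
  }
}

// Starts threads_per_disk workers that all run WorkLoop() on the same queue.
std::vector<std::thread> StartWorkers(DiskQueue* queue, int threads_per_disk) {
  std::vector<std::thread> workers;
  for (int i = 0; i < threads_per_disk; ++i) {
    workers.emplace_back(WorkLoop, queue);
  }
  return workers;
}
```

Setting the flag while holding the same mutex the condition variable waits on avoids lost wake-ups: a worker cannot check the flag and then miss the notification sent by Shutdown().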

void DiskIoMgr::Write ( RequestContext *  writer_context,
WriteRange *  write_range 
)
private

Writes the specified range to disk and calls HandleWriteFinished() when done. Responsible for opening and closing the file that is written.

Definition at line 1041 of file disk-io-mgr.cc.

References impala::DiskIoMgr::RequestRange::file(), impala::DiskIoMgr::RequestRange::file_, impala::GetStrErrMsg(), HandleWriteFinished(), and WriteRangeHelper().

Referenced by WorkLoop().

Status DiskIoMgr::WriteRangeHelper ( FILE *  file_handle,
WriteRange *  write_range 
)
private

Helper method to write a range using the specified FILE handle. Returns Status::OK if the write succeeded, or a RUNTIME_ERROR with an appropriate message otherwise. Does not open or close the file that is written.

Definition at line 1061 of file disk-io-mgr.cc.

References impala::DiskIoMgr::WriteRange::data_, impala::DiskIoMgr::RequestRange::file_, impala::GetStrErrMsg(), impala::ImpaladMetrics::IO_MGR_BYTES_WRITTEN, impala::DiskIoMgr::RequestRange::len_, impala::DiskIoMgr::RequestRange::offset(), and impala::Status::OK.

Referenced by Write().
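A minimal sketch of the helper's contract using the C standard I/O calls fseek(), fwrite() and strerror(). The WriteRange struct and WriteRangeToFile() are hypothetical simplifications: the sketch returns an error string instead of an impala::Status and, like WriteRangeHelper(), leaves opening and closing the file to the caller.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <string>

// Hypothetical stand-in for a write range: file offset plus bytes to write.
struct WriteRange {
  int64_t offset;
  const char* data;
  int64_t len;
};

// Seeks to range->offset and writes range->len bytes. Returns an empty string
// on success, or an error message (in place of a RUNTIME_ERROR Status).
// Does not open or close the file.
std::string WriteRangeToFile(FILE* file_handle, const WriteRange* range) {
  assert(file_handle != nullptr && range != nullptr);
  // fseek() takes a long offset.
  if (std::fseek(file_handle, static_cast<long>(range->offset), SEEK_SET) != 0) {
    return std::string("fseek() failed: ") + std::strerror(errno);
  }
  size_t written =
      std::fwrite(range->data, 1, static_cast<size_t>(range->len), file_handle);
  if (written != static_cast<size_t>(range->len)) {
    return std::string("fwrite() failed: ") + std::strerror(errno);
  }
  return "";
}
```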

Friends And Related Function Documentation

friend class BufferDescriptor
friend

Definition at line 629 of file disk-io-mgr.h.

Referenced by GetBufferDesc().

friend class DiskIoMgrTest_Buffers_Test
friend

Definition at line 631 of file disk-io-mgr.h.

Member Data Documentation

struct hadoopRzOptions* impala::DiskIoMgr::cached_read_options_
private

Options object for cached hdfs reads. Set on startup and never modified.

Definition at line 655 of file disk-io-mgr.h.

Referenced by Init(), and ~DiskIoMgr().

const int DiskIoMgr::DEFAULT_QUEUE_CAPACITY = 2
static

Default capacity of the ready buffer queue. The exact value matters little because the system adjusts the capacity dynamically.

Definition at line 619 of file disk-io-mgr.h.

Referenced by impala::DiskIoMgr::RequestContext::Reset(), and impala::ExecEnv::StartServices().

std::vector<DiskQueue*> impala::DiskIoMgr::disk_queues_
private

Per-disk queues, indexed by disk id. The set of queues is created once at Init() time and does not change afterward. One queue is allocated for each local disk on the system and one for each remote filesystem type.

Definition at line 701 of file disk-io-mgr.h.

Referenced by DiskIoMgr(), Init(), num_total_disks(), impala::DiskIoMgr::RequestContext::PerDiskState::ScheduleContext(), ValidateScanRange(), and ~DiskIoMgr().

ThreadGroup impala::DiskIoMgr::disk_thread_group_
private

Thread group containing all the worker threads.

Definition at line 652 of file disk-io-mgr.h.

Referenced by Init(), and ~DiskIoMgr().

std::list<BufferDescriptor*> impala::DiskIoMgr::free_buffer_descs_
private

List of free buffer desc objects that can be handed out to clients.

Definition at line 690 of file disk-io-mgr.h.

Referenced by GetBufferDesc(), and ReturnBufferDesc().

std::vector<std::list<char*> > impala::DiskIoMgr::free_buffers_
private

Free buffers that can be handed out to clients. There is one list for each buffer size, indexed by the Log2 of the buffer size in units of min_buffer_size_. The maximum buffer size is max_buffer_size_, so the maximum index is Log2(max_buffer_size_ / min_buffer_size_). E.g. if min_buffer_size_ = 1024 bytes:
free_buffers_[0] => list of free buffers with size 1024 B
free_buffers_[1] => list of free buffers with size 2048 B
free_buffers_[10] => list of free buffers with size 1 MB
free_buffers_[13] => list of free buffers with size 8 MB
free_buffers_[n] => list of free buffers with size 2^n * 1024 B

Definition at line 687 of file disk-io-mgr.h.

Referenced by DiskIoMgr(), free_buffers_idx(), GcIoBuffers(), GetFreeBuffer(), ReturnFreeBuffer(), and ~DiskIoMgr().

boost::mutex impala::DiskIoMgr::free_buffers_lock_
private

Protects free_buffers_ and free_buffer_descs_.

Definition at line 674 of file disk-io-mgr.h.

Referenced by GcIoBuffers(), GetBufferDesc(), GetFreeBuffer(), ReturnBufferDesc(), and ReturnFreeBuffer().

const int impala::DiskIoMgr::max_buffer_size_
private

Maximum read size. This is also the maximum size of each allocated buffer.

Definition at line 646 of file disk-io-mgr.h.

Referenced by AddWriteRange(), DiskIoMgr(), GetFreeBuffer(), max_read_buffer_size(), Read(), and ReadRange().

const int impala::DiskIoMgr::min_buffer_size_
private

The minimum size of each read buffer.

Definition at line 649 of file disk-io-mgr.h.

Referenced by DiskIoMgr(), free_buffers_idx(), GcIoBuffers(), GetFreeBuffer(), and ReturnFreeBuffer().

AtomicInt<int> impala::DiskIoMgr::num_allocated_buffers_
private

Total number of allocated buffers, used for debugging.

Definition at line 693 of file disk-io-mgr.h.

Referenced by GcIoBuffers(), GetFreeBuffer(), num_allocated_buffers(), ReturnFreeBuffer(), impala::TEST_F(), and ~DiskIoMgr().

AtomicInt<int> impala::DiskIoMgr::num_buffers_in_readers_
private

Total number of buffers in readers.

Definition at line 696 of file disk-io-mgr.h.

Referenced by impala::DiskIoMgr::ScanRange::EnqueueBuffer(), num_buffers_in_readers(), ReturnBuffer(), and ~DiskIoMgr().

const int impala::DiskIoMgr::num_threads_per_disk_
private

Number of worker (read) threads per disk. This is also the maximum depth of work queued to each disk.

Definition at line 643 of file disk-io-mgr.h.

Referenced by Init().

ObjectPool impala::DiskIoMgr::pool_
private

Pool to allocate BufferDescriptors.

Definition at line 636 of file disk-io-mgr.h.

Referenced by GetBufferDesc().

MemTracker* impala::DiskIoMgr::process_mem_tracker_
private

Process memory tracker; needed to account for IO buffers.

Definition at line 639 of file disk-io-mgr.h.

Referenced by GcIoBuffers(), GetFreeBuffer(), GetNextRequestRange(), Init(), and ReturnFreeBuffer().

RuntimeProfile::Counter impala::DiskIoMgr::read_timer_
private

Total time spent reading from HDFS.

Definition at line 665 of file disk-io-mgr.h.

Referenced by GetReadThroughput(), ReadRange(), and impala::DiskIoMgr::RequestContext::Reset().

boost::scoped_ptr<RequestContextCache> impala::DiskIoMgr::request_context_cache_
private

Contains all contexts that the IoMgr is tracking. This includes contexts that are active as well as those in the process of being cancelled. This is a cache of context objects that get recycled to minimize object allocations and lock contention.

Definition at line 671 of file disk-io-mgr.h.

Referenced by Init(), RegisterContext(), UnregisterContext(), and ~DiskIoMgr().
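A cache that recycles objects instead of freeing them can be sketched as below. ContextCache and Context are hypothetical; the real RequestContextCache interface is not documented in this section, so the sketch only illustrates the recycling idea: each object is allocated once, handed out on registration, and reused after it is returned.

```cpp
#include <cassert>
#include <list>
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical context type; num_uses counts how often it was handed out.
struct Context {
  int num_uses = 0;
};

class ContextCache {
 public:
  // Reuses a returned context if one is available, else allocates a new one.
  Context* Get() {
    std::lock_guard<std::mutex> l(lock_);
    Context* ctx;
    if (free_.empty()) {
      all_.push_back(std::make_unique<Context>());
      ctx = all_.back().get();
    } else {
      ctx = free_.front();
      free_.pop_front();
    }
    ++ctx->num_uses;
    return ctx;
  }

  // Makes ctx available for reuse; the cache keeps ownership.
  void Return(Context* ctx) {
    assert(ctx != nullptr);
    std::lock_guard<std::mutex> l(lock_);
    free_.push_back(ctx);
  }

  size_t num_allocated() const {
    std::lock_guard<std::mutex> l(lock_);
    return all_.size();
  }

 private:
  mutable std::mutex lock_;
  std::vector<std::unique_ptr<Context>> all_;  // Owns every context ever created.
  std::list<Context*> free_;                   // Contexts available for reuse.
};
```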

volatile bool impala::DiskIoMgr::shut_down_
private

True if the IoMgr should be torn down. Worker threads watch this flag to know when to terminate. It is read and written by different threads.

Definition at line 659 of file disk-io-mgr.h.

Referenced by GetNextRequestRange(), WorkLoop(), and ~DiskIoMgr().

RuntimeProfile::Counter impala::DiskIoMgr::total_bytes_read_counter_
private

Total bytes read by the IoMgr.

Definition at line 662 of file disk-io-mgr.h.

Referenced by GetReadThroughput(), and ReadRange().
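GetReadThroughput() references both read_timer_ and total_bytes_read_counter_, which suggests it divides bytes read by time spent reading. A hedged sketch, assuming the timer value is in nanoseconds; BytesPerSecond() is a hypothetical helper, not the function's actual body.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: bytes per second from a byte counter and a timer,
// assuming the timer value is in nanoseconds. Returns 0 if no time elapsed.
int64_t BytesPerSecond(int64_t total_bytes_read, int64_t read_time_ns) {
  assert(total_bytes_read >= 0 && read_time_ns >= 0);
  if (read_time_ns == 0) return 0;
  // Scale by 1e9 ns/s in floating point to avoid overflowing int64_t.
  return static_cast<int64_t>(total_bytes_read * 1e9 / read_time_ns);
}
```

For example, 512 MB read in 0.5 s of read time corresponds to 1 GB/s.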


The documentation for this class was generated from the following files: