Impala
Impala is the open source, native analytic database for Apache Hadoop.
impala::DiskIoMgr::RequestContext Class Reference

#include <disk-io-mgr-internal.h>

Classes

class  PerDiskState
 

Public Types

enum  State { Active, Cancelled, Inactive }
 

Public Member Functions

 RequestContext (DiskIoMgr *parent, int num_disks)
 
void Reset (MemTracker *tracker)
 Resets this object.
 
void DecrementDiskRefCount ()
 
void ScheduleScanRange (DiskIoMgr::ScanRange *range)
 
void Cancel (const Status &status)
 Cancels the context with status code 'status'.
 
void AddRequestRange (DiskIoMgr::RequestRange *range, bool schedule_immediately)
 
int initial_scan_range_queue_capacity () const
 
bool Validate () const
 Validates invariants of reader. Reader lock must be taken beforehand.
 
std::string DebugString () const
 Dumps out reader information. Lock should be taken by caller.
 

Private Attributes

DiskIoMgr * parent_
 Parent object.
 
MemTracker * mem_tracker_
 Memory used for this reader. This is unowned by this object.
 
RuntimeProfile::Counter * bytes_read_counter_
 Total bytes read for this reader.
 
RuntimeProfile::Counter * read_timer_
 Total time spent in hdfs reading.
 
RuntimeProfile::Counter * active_read_thread_counter_
 Number of active read threads.
 
RuntimeProfile::Counter * disks_accessed_bitmap_
 
AtomicInt< int64_t > bytes_read_local_
 Total number of bytes read locally, updated at end of each range scan.
 
AtomicInt< int64_t > bytes_read_short_circuit_
 Total number of bytes read via short circuit read, updated at end of each range scan.
 
AtomicInt< int64_t > bytes_read_dn_cache_
 Total number of bytes read from data node cache, updated at end of each range scan.
 
AtomicInt< int64_t > unexpected_remote_bytes_
 Total number of bytes from remote reads that were expected to be local.
 
AtomicInt< int > num_buffers_in_reader_
 
AtomicInt< int > num_finished_ranges_
 The number of scan ranges that have been completed for this reader.
 
AtomicInt< int > num_remote_ranges_
 
AtomicInt< int > num_unstarted_scan_ranges_
 
AtomicInt< int > num_used_buffers_
 
AtomicInt< int > num_ready_buffers_
 
AtomicInt< int > total_range_queue_capacity_
 
int initial_queue_capacity_
 
boost::mutex lock_
 
State state_
 Current state of the reader.
 
Status status_
 Status of this reader. Set to non-ok if cancelled.
 
int num_disks_with_ranges_
 
InternalQueue< ScanRange > cached_ranges_
 
InternalQueue< ScanRange > ready_to_start_ranges_
 
boost::condition_variable ready_to_start_ranges_cv_
 
InternalQueue< ScanRange > blocked_ranges_
 Ranges that are blocked due to back pressure on outgoing buffers.
 
boost::condition_variable disks_complete_cond_var_
 Condition variable for UnregisterContext() to wait for all disks to complete.
 
std::vector< PerDiskState > disk_states_
 

Friends

class DiskIoMgr
 

Detailed Description

Internal per request-context state. This object maintains a lot of state that is carefully synchronized. The context maintains state across all disks as well as per-disk state.

The unit for an IO request is a RequestRange, which may be a ScanRange or a WriteRange.

A scan range for the reader is in one of five states:
1) PerDiskState's unstarted_ranges: This range has only been queued and nothing has been read from it.
2) RequestContext's ready_to_start_ranges_: This range is about to be started. As soon as the reader picks it up, it will move to the in_flight_ranges queue.
3) PerDiskState's in_flight_ranges: This range is being processed and will be read from the next time a disk thread picks it up in GetNextRequestRange().
4) ScanRange's outgoing ready buffers queue is full. We can't read for this range anymore. We need the caller to pull a buffer off, which will put this range back in the in_flight_ranges queue. These ranges are in the RequestContext's blocked_ranges_ queue.
5) ScanRange is cached and in the cached_ranges_ queue.

If the scan range is read and does not get blocked on the outgoing queue, the transitions are 1 -> 2 -> 3. If the scan range does get blocked, the transitions are 1 -> 2 -> 3 -> (4 -> 3)*.

In the case of a cached scan range, the range is immediately put in cached_ranges_. When the caller asks for the next range to process, we first pull ranges from the cached_ranges_ queue. If the range was cached, the range is removed and done (ranges are either entirely cached or not at all). If the cached read attempt fails, we put the range in state 1.

A write range for a context may be in one of two lists:
1) unstarted_write_ranges_: Ranges that have been queued but not processed.
2) in_flight_ranges_: The write range is ready to be processed by the next disk thread that picks it up in GetNextRequestRange().

AddWriteRange() adds WriteRanges for a disk. It is the responsibility of the client to pin the data to be written via a WriteRange in memory. After a WriteRange has been written, a callback is invoked to inform the client that the write has completed.

An important assumption is that a write does not exceed the maximum read size and that the entire range is written when the write request is handled. (In other words, writes are not broken up.)

When a RequestContext is processed by a disk thread in GetNextRequestRange(), a write range is always removed from the list of unstarted write ranges and appended to the in_flight_ranges_ queue. This is done to alternate reads and writes: a read that is scheduled (by calling GetNextRange()) is always followed by a write (if one exists). And since at most one WriteRange can be present in in_flight_ranges_ at any time (once a write range is returned from GetNextRequestRange() it is completed and not re-enqueued), a scan range scheduled via a call to GetNextRange() can be queued up behind at most one write range.

Definition at line 122 of file disk-io-mgr-internal.h.
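
The scan range transitions described above can be sketched as a small state machine. This is only an illustration: the enum RangeState and the helpers IsValidTransition() and Advance() are hypothetical names, not part of DiskIoMgr. In the class itself a range's state appears to be implied by the queue it sits in rather than stored in an enum, and completion of a range is not modeled here.

```cpp
#include <cassert>

// Hypothetical enum: one value per state listed in the description.
enum class RangeState {
  kUnstarted,     // 1) in PerDiskState's unstarted ranges
  kReadyToStart,  // 2) in RequestContext's ready_to_start_ranges_
  kInFlight,      // 3) in PerDiskState's in_flight_ranges
  kBlocked,       // 4) in RequestContext's blocked_ranges_
  kCached         // 5) in RequestContext's cached_ranges_
};

// Returns true if moving from `from` to `to` is one of the transitions
// named in the description: 1 -> 2 -> 3 -> (4 -> 3)*, plus 5 -> 1 when
// a cached read attempt fails.
inline bool IsValidTransition(RangeState from, RangeState to) {
  switch (from) {
    case RangeState::kUnstarted:    return to == RangeState::kReadyToStart;
    case RangeState::kReadyToStart: return to == RangeState::kInFlight;
    case RangeState::kInFlight:     return to == RangeState::kBlocked;
    case RangeState::kBlocked:      return to == RangeState::kInFlight;
    case RangeState::kCached:       return to == RangeState::kUnstarted;
  }
  return false;
}

// Moves `*state` to `to`, asserting that the move is allowed.
inline void Advance(RangeState* state, RangeState to) {
  assert(IsValidTransition(*state, to));
  *state = to;
}
```

A range that is blocked once and then resumed would go through Advance() with kReadyToStart, kInFlight, kBlocked, and kInFlight in that order.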

Member Enumeration Documentation

Enumerator
Active 

Reader is initialized and maps to a client.

Cancelled 

Reader is in the process of being cancelled. Cancellation is coordinated between different threads and when they are all complete, the reader context is moved to the inactive state.

Inactive 

Reader context does not map to a client. Accessing memory in this context is invalid (i.e. it is equivalent to a dangling pointer).

Definition at line 124 of file disk-io-mgr-internal.h.

Constructor & Destructor Documentation

DiskIoMgr::RequestContext::RequestContext ( DiskIoMgr * parent,
int  num_disks 
)

Definition at line 129 of file disk-io-mgr-reader-context.cc.

Member Function Documentation

void impala::DiskIoMgr::RequestContext::DecrementDiskRefCount ( )
inline

Decrements the number of active disks for this reader. If the disk count goes to 0, the disk complete condition variable is signaled. Reader lock must be taken before this call.

Definition at line 146 of file disk-io-mgr-internal.h.

References DebugString(), disks_complete_cond_var_, num_disks_with_ranges_, and Validate().

Referenced by impala::DiskIoMgr::RequestContext::PerDiskState::DecrementRequestThreadAndCheckDone().
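
The decrement-and-signal pattern described for DecrementDiskRefCount() can be sketched with standard C++ primitives. This is a minimal sketch under assumptions: the class DiskRefCountSketch and its WaitForDisksToComplete() method are hypothetical stand-ins, std::mutex and std::condition_variable replace the boost types used by the real class, and notify_all() is an assumption about how the condition variable is signaled.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the disk reference counting in RequestContext.
class DiskRefCountSketch {
 public:
  explicit DiskRefCountSketch(int num_disks_with_ranges)
      : num_disks_with_ranges_(num_disks_with_ranges) {}

  std::mutex& lock() { return lock_; }

  // Caller must already hold lock(), mirroring "Reader lock must be taken
  // before this call". Signals the waiter when the last disk finishes.
  void DecrementDiskRefCount() {
    assert(num_disks_with_ranges_ > 0);
    if (--num_disks_with_ranges_ == 0) disks_complete_cond_var_.notify_all();
  }

  // Blocks until every disk has finished with this context.
  void WaitForDisksToComplete() {
    std::unique_lock<std::mutex> l(lock_);
    disks_complete_cond_var_.wait(
        l, [this] { return num_disks_with_ranges_ == 0; });
  }

  // Takes the lock itself, so the caller must not hold it.
  int num_disks_with_ranges() {
    std::lock_guard<std::mutex> l(lock_);
    return num_disks_with_ranges_;
  }

 private:
  std::mutex lock_;
  std::condition_variable disks_complete_cond_var_;
  int num_disks_with_ranges_;
};
```

Waiting with a predicate means the waiter returns immediately if the count is already zero, and is not fooled by spurious wakeups.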

int impala::DiskIoMgr::RequestContext::initial_scan_range_queue_capacity ( ) const
inline

Returns the default queue capacity for scan ranges. This is updated as the reader processes ranges.

Definition at line 182 of file disk-io-mgr-internal.h.

References initial_queue_capacity_.

Referenced by impala::DiskIoMgr::ScanRange::InitInternal().

void DiskIoMgr::RequestContext::Reset ( MemTracker * tracker)

Resets this object.

void impala::DiskIoMgr::RequestContext::ScheduleScanRange ( DiskIoMgr::ScanRange * range)
inline

Reader & Disk Scheduling: Readers that currently can't do work are not on the disk's queue. These readers are ones that don't have any ranges in the in_flight_queue AND have not prepared a range by setting next_range_to_start. The rule to make sure readers are scheduled correctly is to ensure anytime a range is put on the in_flight_queue or anytime next_range_to_start is set to NULL, the reader is scheduled.

Adds range to in_flight_ranges, scheduling this reader on the disk threads if necessary. Reader lock must be taken before this.

Definition at line 165 of file disk-io-mgr-internal.h.

References Active, impala::DiskIoMgr::RequestRange::disk_id(), disk_states_, impala::DiskIoMgr::RequestContext::PerDiskState::in_flight_ranges(), impala::DiskIoMgr::RequestContext::PerDiskState::ScheduleContext(), and state_.

Referenced by impala::DiskIoMgr::GetNextRange(), and impala::DiskIoMgr::HandleReadFinished().

Friends And Related Function Documentation

friend class DiskIoMgr
friend

Definition at line 191 of file disk-io-mgr-internal.h.

Member Data Documentation

RuntimeProfile::Counter* impala::DiskIoMgr::RequestContext::active_read_thread_counter_
private

Number of active read threads.

Definition at line 207 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::ReadRange(), and impala::DiskIoMgr::set_active_read_thread_counter().

InternalQueue<ScanRange> impala::DiskIoMgr::RequestContext::blocked_ranges_
private

Ranges that are blocked due to back pressure on outgoing buffers.

Definition at line 295 of file disk-io-mgr-internal.h.

Referenced by Cancel(), impala::DiskIoMgr::HandleReadFinished(), and impala::DiskIoMgr::ReadRange().

RuntimeProfile::Counter* impala::DiskIoMgr::RequestContext::bytes_read_counter_
private

Total bytes read for this reader.

Definition at line 201 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::ReadRange(), and impala::DiskIoMgr::set_bytes_read_counter().

AtomicInt<int64_t> impala::DiskIoMgr::RequestContext::bytes_read_dn_cache_
private

Total number of bytes read from data node cache, updated at end of each range scan.

Definition at line 221 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::bytes_read_dn_cache().

AtomicInt<int64_t> impala::DiskIoMgr::RequestContext::bytes_read_local_
private

Total number of bytes read locally, updated at end of each range scan.

Definition at line 215 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::bytes_read_local().

AtomicInt<int64_t> impala::DiskIoMgr::RequestContext::bytes_read_short_circuit_
private

Total number of bytes read via short circuit read, updated at end of each range scan.

Definition at line 218 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::bytes_read_short_circuit().

InternalQueue<ScanRange> impala::DiskIoMgr::RequestContext::cached_ranges_
private

This is the list of ranges that are expected to be cached on the DN. When the reader asks for a new range (GetNextScanRange()), we first return ranges from this list.

Definition at line 282 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::AddScanRanges(), Cancel(), and impala::DiskIoMgr::GetNextRange().

std::vector<PerDiskState> impala::DiskIoMgr::RequestContext::disk_states_
private

Per disk states to synchronize multiple disk threads accessing the same request context.

Definition at line 464 of file disk-io-mgr-internal.h.

Referenced by Cancel(), impala::DiskIoMgr::GetNextRange(), impala::DiskIoMgr::HandleReadFinished(), impala::DiskIoMgr::HandleWriteFinished(), impala::DiskIoMgr::ReadRange(), and ScheduleScanRange().

RuntimeProfile::Counter* impala::DiskIoMgr::RequestContext::disks_accessed_bitmap_
private

Disk access bitmap. The counter's bit[i] is set if disk id i has been accessed. TODO: we can only support up to 64 disks with this bitmap but it lets us use a builtin atomic instruction. Probably good enough for now.

Definition at line 212 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::ReadRange(), and impala::DiskIoMgr::set_disks_access_bitmap().
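
The bitmap idea described above (bit i set when disk id i has been accessed, limited to 64 disks so a single atomic instruction suffices) can be sketched as follows. The class DiskAccessBitmapSketch and its method names are hypothetical. The real member is a RuntimeProfile::Counter, whereas this sketch uses std::atomic<uint64_t> directly.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a 64-bit disk access bitmap.
class DiskAccessBitmapSketch {
 public:
  // Atomically sets bit `disk_id`. Only disk ids 0..63 fit in the bitmap.
  void MarkAccessed(int disk_id) {
    assert(disk_id >= 0 && disk_id < 64);
    bits_.fetch_or(uint64_t{1} << disk_id, std::memory_order_relaxed);
  }

  // Returns true if `disk_id` has been marked as accessed.
  bool WasAccessed(int disk_id) const {
    assert(disk_id >= 0 && disk_id < 64);
    return ((bits_.load(std::memory_order_relaxed) >> disk_id) & 1) != 0;
  }

  // Counts the set bits, i.e. the number of distinct disks accessed.
  int NumDisksAccessed() const {
    int n = 0;
    for (uint64_t v = bits_.load(std::memory_order_relaxed); v != 0;
         v &= v - 1) {
      ++n;  // Each iteration clears the lowest set bit.
    }
    return n;
  }

 private:
  std::atomic<uint64_t> bits_{0};
};
```

Marking the same disk twice leaves the bitmap unchanged, so disk threads can record every access without coordinating with each other.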

boost::condition_variable impala::DiskIoMgr::RequestContext::disks_complete_cond_var_
private

Condition variable for UnregisterContext() to wait for all disks to complete.

Definition at line 298 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::CancelContext(), and DecrementDiskRefCount().

int impala::DiskIoMgr::RequestContext::initial_queue_capacity_
private

The initial queue size for new scan ranges. This is always total_range_queue_capacity_ / num_finished_ranges_ but stored as a separate variable to allow reading this value without taking a lock. Doing the division at read time (with no lock) could lead to a race where only total_range_queue_capacity_ or num_finished_ranges_ was updated.

Definition at line 263 of file disk-io-mgr-internal.h.

Referenced by initial_scan_range_queue_capacity().
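
The reasoning above (cache the quotient so readers never see a half-updated numerator or denominator) can be sketched like this. The class QueueCapacitySketch and the method RangeFinished() are hypothetical, and the cached value is a std::atomic<int> here so that the lock-free read is well defined in standard C++. The real class stores a plain int.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

// Hypothetical stand-in for the queue capacity bookkeeping.
class QueueCapacitySketch {
 public:
  explicit QueueCapacitySketch(int default_capacity)
      : initial_queue_capacity_(default_capacity) {}

  // Records the final queue capacity of a finished scan range and
  // refreshes the cached average while holding the lock, so the sum and
  // the count are always updated together.
  void RangeFinished(int final_capacity) {
    std::lock_guard<std::mutex> l(lock_);
    total_range_queue_capacity_ += final_capacity;
    ++num_finished_ranges_;
    initial_queue_capacity_.store(
        total_range_queue_capacity_ / num_finished_ranges_,
        std::memory_order_relaxed);
  }

  // Lock-free read of the cached average used to seed new scan ranges.
  int initial_scan_range_queue_capacity() const {
    return initial_queue_capacity_.load(std::memory_order_relaxed);
  }

 private:
  std::mutex lock_;
  int total_range_queue_capacity_ = 0;
  int num_finished_ranges_ = 0;
  std::atomic<int> initial_queue_capacity_;
};
```

Starting from a default of 2, finishing two ranges with capacities 4 and 8 leaves the cached seed at their average.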

boost::mutex impala::DiskIoMgr::RequestContext::lock_
private

MemTracker* impala::DiskIoMgr::RequestContext::mem_tracker_
private

Memory used for this reader. This is unowned by this object.

Definition at line 198 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::GetBufferDesc(), and impala::DiskIoMgr::ReadRange().

AtomicInt<int> impala::DiskIoMgr::RequestContext::num_buffers_in_reader_
private

The number of buffers that have been returned to the reader (via GetNext) that the reader has not returned. Only included for debugging and diagnostics.

Definition at line 228 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::ScanRange::EnqueueBuffer(), impala::DiskIoMgr::ReturnBuffer(), and impala::DiskIoMgr::UnregisterContext().

int impala::DiskIoMgr::RequestContext::num_disks_with_ranges_
private

The number of disks with scan ranges remaining (always equal to the sum of disks with ranges).

Definition at line 277 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::CancelContext(), and DecrementDiskRefCount().

AtomicInt<int> impala::DiskIoMgr::RequestContext::num_finished_ranges_
private

The number of scan ranges that have been completed for this reader.

Definition at line 231 of file disk-io-mgr-internal.h.

AtomicInt<int> impala::DiskIoMgr::RequestContext::num_ready_buffers_
private

The total number of ready buffers across all ranges. Ready buffers are buffers that have been read from disk but not retrieved by the caller. This is the sum of all queued buffers in all ranges for this reader context.

Definition at line 249 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::ScanRange::EnqueueBuffer(), and impala::DiskIoMgr::queue_size().

AtomicInt<int> impala::DiskIoMgr::RequestContext::num_remote_ranges_
private

The number of scan ranges that required a remote read, updated at the end of each range scan. Only used for diagnostics.

Definition at line 235 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::num_remote_ranges().

AtomicInt<int> impala::DiskIoMgr::RequestContext::num_unstarted_scan_ranges_
private

The total number of scan ranges that have not been started. Only used for diagnostics. This is the sum of all unstarted_scan_ranges across all disks.

Definition at line 239 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::GetNextRange(), and impala::DiskIoMgr::num_unstarted_ranges().

AtomicInt<int> impala::DiskIoMgr::RequestContext::num_used_buffers_
private

The number of buffers that are being used for this reader. This is the sum of all buffers in ScanRange queues and buffers currently being read into (i.e. about to be queued).

Definition at line 244 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::ScanRange::EnqueueBuffer(), impala::DiskIoMgr::ReadRange(), and impala::DiskIoMgr::UnregisterContext().

DiskIoMgr* impala::DiskIoMgr::RequestContext::parent_
private

Parent object.

Definition at line 192 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::RequestContext::PerDiskState::ScheduleContext().

RuntimeProfile::Counter* impala::DiskIoMgr::RequestContext::read_timer_
private

Total time spent in hdfs reading.

Definition at line 204 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::ReadRange(), and impala::DiskIoMgr::set_read_timer().

InternalQueue<ScanRange> impala::DiskIoMgr::RequestContext::ready_to_start_ranges_
private

A list of ranges that should be returned in subsequent calls to GetNextRange. There is a trade-off with when to populate this list. Populating it on demand means consumers need to wait (happens in DiskIoMgr::GetNextRange()). Populating it preemptively means we make worse scheduling decisions. We currently populate one range per disk. TODO: think about this some more.

Definition at line 291 of file disk-io-mgr-internal.h.

Referenced by Cancel(), and impala::DiskIoMgr::GetNextRange().

boost::condition_variable impala::DiskIoMgr::RequestContext::ready_to_start_ranges_cv_
private

Definition at line 292 of file disk-io-mgr-internal.h.

Referenced by Cancel(), and impala::DiskIoMgr::GetNextRange().

AtomicInt<int> impala::DiskIoMgr::RequestContext::total_range_queue_capacity_
private

The total (sum) of queue capacities for finished scan ranges. This value divided by num_finished_ranges_ is the average for finished ranges and used to seed the starting queue capacity for future ranges. The assumption is that if previous ranges were fast, new ones will be fast too. The scan range adjusts the queue capacity dynamically so a rough approximation will do.

Definition at line 256 of file disk-io-mgr-internal.h.

AtomicInt<int64_t> impala::DiskIoMgr::RequestContext::unexpected_remote_bytes_
private

Total number of bytes from remote reads that were expected to be local.

Definition at line 224 of file disk-io-mgr-internal.h.

Referenced by impala::DiskIoMgr::unexpected_remote_bytes().


The documentation for this class was generated from the following files: