Impala
Impala is the open source, native analytic database for Apache Hadoop.
#include <disk-io-mgr-internal.h>
Classes | |
class | PerDiskState |
Public Types | |
enum | State { Active, Cancelled, Inactive } |
Public Member Functions | |
RequestContext (DiskIoMgr *parent, int num_disks) | |
void | Reset (MemTracker *tracker) |
Resets this object. | |
void | DecrementDiskRefCount () |
void | ScheduleScanRange (DiskIoMgr::ScanRange *range) |
void | Cancel (const Status &status) |
Cancels the context with status code 'status'. | |
void | AddRequestRange (DiskIoMgr::RequestRange *range, bool schedule_immediately) |
int | initial_scan_range_queue_capacity () const |
bool | Validate () const |
Validates invariants of reader. Reader lock must be taken beforehand. | |
std::string | DebugString () const |
Dumps out reader information. Lock should be taken by caller. | |
Private Attributes | |
DiskIoMgr * | parent_ |
Parent object. | |
MemTracker * | mem_tracker_ |
Memory used for this reader. This is unowned by this object. | |
RuntimeProfile::Counter * | bytes_read_counter_ |
Total bytes read for this reader. | |
RuntimeProfile::Counter * | read_timer_ |
Total time spent in hdfs reading. | |
RuntimeProfile::Counter * | active_read_thread_counter_ |
Number of active read threads. | |
RuntimeProfile::Counter * | disks_accessed_bitmap_ |
AtomicInt< int64_t > | bytes_read_local_ |
Total number of bytes read locally, updated at end of each range scan. | |
AtomicInt< int64_t > | bytes_read_short_circuit_ |
Total number of bytes read via short circuit read, updated at end of each range scan. | |
AtomicInt< int64_t > | bytes_read_dn_cache_ |
Total number of bytes read from data node cache, updated at end of each range scan. | |
AtomicInt< int64_t > | unexpected_remote_bytes_ |
Total number of bytes from remote reads that were expected to be local. | |
AtomicInt< int > | num_buffers_in_reader_ |
AtomicInt< int > | num_finished_ranges_ |
The number of scan ranges that have been completed for this reader. | |
AtomicInt< int > | num_remote_ranges_ |
AtomicInt< int > | num_unstarted_scan_ranges_ |
AtomicInt< int > | num_used_buffers_ |
AtomicInt< int > | num_ready_buffers_ |
AtomicInt< int > | total_range_queue_capacity_ |
int | initial_queue_capacity_ |
boost::mutex | lock_ |
State | state_ |
Current state of the reader. | |
Status | status_ |
Status of this reader. Set to non-ok if cancelled. | |
int | num_disks_with_ranges_ |
InternalQueue< ScanRange > | cached_ranges_ |
InternalQueue< ScanRange > | ready_to_start_ranges_ |
boost::condition_variable | ready_to_start_ranges_cv_ |
InternalQueue< ScanRange > | blocked_ranges_ |
Ranges that are blocked due to back pressure on outgoing buffers. | |
boost::condition_variable | disks_complete_cond_var_ |
Condition variable for UnregisterContext() to wait for all disks to complete. | |
std::vector< PerDiskState > | disk_states_ |
Friends | |
class | DiskIoMgr |
Internal per request-context state. This object maintains a lot of state that is carefully synchronized. The context maintains state across all disks as well as per-disk state. The unit for an IO request is a RequestRange, which may be a ScanRange or a WriteRange.
A scan range for the reader is in one of five states:
1) PerDiskState's unstarted_ranges: This range has only been queued and nothing has been read from it.
2) RequestContext's ready_to_start_ranges_: This range is about to be started. As soon as the reader picks it up, it will move to the in_flight_ranges queue.
3) PerDiskState's in_flight_ranges: This range is being processed and will be read from the next time a disk thread picks it up in GetNextRequestRange().
4) RequestContext's blocked_ranges_: The ScanRange's queue of outgoing ready buffers is full, so we can't read for this range anymore. The caller needs to pull a buffer off, which will put the range back in the in_flight_ranges queue.
5) RequestContext's cached_ranges_: The ScanRange is cached.
If the scan range is read and does not get blocked on the outgoing queue, the transitions are 1 -> 2 -> 3. If the scan range does get blocked, the transitions are 1 -> 2 -> 3 -> (4 -> 3)*.
In the case of a cached scan range, the range is immediately put in cached_ranges_. When the caller asks for the next range to process, we first pull ranges from the cached_ranges_ queue. If the range was cached, the range is removed and done (ranges are either entirely cached or not at all). If the cached read attempt fails, we put the range in state 1.
A write range for a context may be in one of two lists:
1) unstarted_write_ranges_: Ranges that have been queued but not processed.
2) in_flight_ranges_: The write range is ready to be processed by the next disk thread that picks it up in GetNextRequestRange().
AddWriteRange() adds WriteRanges for a disk. It is the responsibility of the client to pin the data to be written via a WriteRange in memory. After a WriteRange has been written, a callback is invoked to inform the client that the write has completed. An important assumption is that a write does not exceed the maximum read size and that the entire range is written when the write request is handled. (In other words, writes are not broken up.)
When a RequestContext is processed by a disk thread in GetNextRequestRange(), a write range is always removed from the list of unstarted write ranges and appended to the in_flight_ranges_ queue. This is done to alternate reads and writes: a read that is scheduled (by calling GetNextRange()) is always followed by a write (if one exists). And since at most one WriteRange can be present in in_flight_ranges_ at any time (once a write range is returned from GetNextRequestRange() it is completed and not re-enqueued), a scan range scheduled via a call to GetNextRange() can be queued up behind at most one write range.
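The scan-range states and transitions described above can be modelled as a small state machine. The following is only an illustrative sketch: the names RangeState, IsValidTransition and Advance are hypothetical and do not appear in Impala, and completion of a range is not modelled.

```cpp
#include <cassert>

// Hypothetical labels for the five scan-range states listed above.
enum class RangeState {
  kUnstarted = 1,     // queued in PerDiskState's unstarted ranges
  kReadyToStart = 2,  // in RequestContext's ready_to_start_ranges_
  kInFlight = 3,      // in PerDiskState's in-flight ranges
  kBlocked = 4,       // in RequestContext's blocked_ranges_
  kCached = 5         // in RequestContext's cached_ranges_
};

// Returns true if the documented transitions allow moving from `from` to `to`.
inline bool IsValidTransition(RangeState from, RangeState to) {
  switch (from) {
    case RangeState::kUnstarted:
      return to == RangeState::kReadyToStart;  // 1 -> 2
    case RangeState::kReadyToStart:
      return to == RangeState::kInFlight;      // 2 -> 3
    case RangeState::kInFlight:
      return to == RangeState::kBlocked;       // 3 -> 4 (outgoing queue full)
    case RangeState::kBlocked:
      return to == RangeState::kInFlight;      // 4 -> 3 (caller pulled a buffer)
    case RangeState::kCached:
      return to == RangeState::kUnstarted;     // cached read attempt failed
  }
  return false;
}

// Moves to `to`, asserting that the move is one of the documented transitions.
inline RangeState Advance(RangeState from, RangeState to) {
  assert(IsValidTransition(from, to));
  return to;
}
```

Under this model, a range that blocks twice would follow 1 -> 2 -> 3 -> 4 -> 3 -> 4 -> 3, which matches the (4 -> 3)* pattern.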
Definition at line 122 of file disk-io-mgr-internal.h.
Definition at line 124 of file disk-io-mgr-internal.h.
DiskIoMgr::RequestContext::RequestContext | ( | DiskIoMgr * | parent, |
int | num_disks | ||
) |
Definition at line 129 of file disk-io-mgr-reader-context.cc.
void DiskIoMgr::RequestContext::AddRequestRange | ( | DiskIoMgr::RequestRange * | range, |
bool | schedule_immediately | ||
) |
Adds request range to disk queue for this request context. Currently, schedule_immediately must be false if the RequestRange is a write range.
Definition at line 91 of file disk-io-mgr-reader-context.cc.
References impala::DiskIoMgr::RequestRange::disk_id(), impala::DiskIoMgr::RequestContext::PerDiskState::done(), impala::DiskIoMgr::RequestContext::PerDiskState::next_scan_range_to_start(), impala::DiskIoMgr::RequestContext::PerDiskState::num_remaining_ranges(), impala::DiskIoMgr::RequestType::READ, impala::DiskIoMgr::RequestRange::request_type(), impala::DiskIoMgr::RequestContext::PerDiskState::ScheduleContext(), impala::DiskIoMgr::RequestContext::PerDiskState::set_done(), impala::DiskIoMgr::RequestContext::PerDiskState::unstarted_scan_ranges(), impala::DiskIoMgr::RequestContext::PerDiskState::unstarted_write_ranges(), and impala::DiskIoMgr::RequestType::WRITE.
Referenced by impala::DiskIoMgr::AddScanRanges(), impala::DiskIoMgr::AddWriteRange(), and impala::DiskIoMgr::GetNextRange().
void DiskIoMgr::RequestContext::Cancel | ( | const Status & | status | ) |
Cancels the context with status code 'status'.
Definition at line 21 of file disk-io-mgr-reader-context.cc.
References blocked_ranges_, cached_ranges_, impala::DiskIoMgr::WriteRange::callback_, impala::DiskIoMgr::ScanRange::Cancel(), Cancelled, DebugString(), disk_states_, impala::DiskIoMgr::RequestContext::PerDiskState::in_flight_ranges(), lock_, impala::Status::ok(), impala::DiskIoMgr::RequestType::READ, ready_to_start_ranges_, ready_to_start_ranges_cv_, impala::DiskIoMgr::RequestRange::request_type(), impala::DiskIoMgr::RequestContext::PerDiskState::ScheduleContext(), state_, status_, impala::DiskIoMgr::RequestContext::PerDiskState::unstarted_scan_ranges(), impala::DiskIoMgr::RequestContext::PerDiskState::unstarted_write_ranges(), Validate(), and impala::DiskIoMgr::RequestType::WRITE.
Referenced by impala::DiskIoMgr::CancelContext().
string DiskIoMgr::RequestContext::DebugString | ( | ) | const |
Dumps out reader information. Lock should be taken by caller.
Definition at line 176 of file disk-io-mgr-reader-context.cc.
References Active, Cancelled, and Inactive.
Referenced by impala::DiskIoMgr::AddScanRanges(), Cancel(), impala::DiskIoMgr::CancelContext(), DecrementDiskRefCount(), impala::DiskIoMgr::GetNextRange(), impala::DiskIoMgr::HandleReadFinished(), impala::DiskIoMgr::HandleWriteFinished(), impala::DiskIoMgr::ReadRange(), and impala::DiskIoMgr::UnregisterContext().
void DiskIoMgr::RequestContext::DecrementDiskRefCount | ( | ) |
inline |
Decrements the number of active disks for this reader. If the disk count goes to 0, the disk complete condition variable is signaled. Reader lock must be taken before this call.
Definition at line 146 of file disk-io-mgr-internal.h.
References DebugString(), disks_complete_cond_var_, num_disks_with_ranges_, and Validate().
Referenced by impala::DiskIoMgr::RequestContext::PerDiskState::DecrementRequestThreadAndCheckDone().
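The decrement-and-signal pattern described for this member can be sketched with the standard library. This is a minimal illustration under stated assumptions, not the Impala code: the class DiskRefCountSketch and its methods lock(), WaitForDisksComplete() and num_disks_with_ranges() are hypothetical, and std::mutex and std::condition_variable stand in for the boost types listed on this page.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the part of RequestContext involved here.
class DiskRefCountSketch {
 public:
  explicit DiskRefCountSketch(int num_disks_with_ranges)
      : num_disks_with_ranges_(num_disks_with_ranges) {}

  // The caller must already hold lock(), mirroring the documented requirement
  // that the reader lock is taken before this call.
  void DecrementDiskRefCount() {
    assert(num_disks_with_ranges_ > 0);
    if (--num_disks_with_ranges_ == 0) {
      // Wake any thread waiting for all disks to complete.
      disks_complete_cond_var_.notify_all();
    }
  }

  // Blocks until the disk count reaches zero.
  void WaitForDisksComplete() {
    std::unique_lock<std::mutex> l(lock_);
    disks_complete_cond_var_.wait(l, [this] { return num_disks_with_ranges_ == 0; });
  }

  std::mutex& lock() { return lock_; }
  int num_disks_with_ranges() const { return num_disks_with_ranges_; }

 private:
  std::mutex lock_;
  std::condition_variable disks_complete_cond_var_;
  int num_disks_with_ranges_;
};
```

A disk thread in this sketch would take lock(), call DecrementDiskRefCount(), and release the lock, while the unregistering thread blocks in WaitForDisksComplete().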
int DiskIoMgr::RequestContext::initial_scan_range_queue_capacity | ( | ) | const |
inline |
Returns the default queue capacity for scan ranges. This is updated as the reader processes ranges.
Definition at line 182 of file disk-io-mgr-internal.h.
References initial_queue_capacity_.
Referenced by impala::DiskIoMgr::ScanRange::InitInternal().
void DiskIoMgr::RequestContext::Reset | ( | MemTracker * | tracker | ) |
Resets this object.
Definition at line 140 of file disk-io-mgr-reader-context.cc.
References impala::DiskIoMgr::DEFAULT_QUEUE_CAPACITY, impala::Status::OK, impala::DiskIoMgr::read_timer_, and tracker.
void DiskIoMgr::RequestContext::ScheduleScanRange | ( | DiskIoMgr::ScanRange * | range | ) |
inline |
Reader & Disk Scheduling: Readers that currently can't do work are not on the disk's queue. These are readers that have no ranges in the in_flight_queue AND have not prepared a range by setting next_range_to_start. To make sure readers are scheduled correctly, the reader must be scheduled any time a range is put on the in_flight_queue or any time next_range_to_start is set to NULL.
Adds range to in_flight_ranges, scheduling this reader on the disk threads if necessary. Reader lock must be taken before this.
Definition at line 165 of file disk-io-mgr-internal.h.
References Active, impala::DiskIoMgr::RequestRange::disk_id(), disk_states_, impala::DiskIoMgr::RequestContext::PerDiskState::in_flight_ranges(), impala::DiskIoMgr::RequestContext::PerDiskState::ScheduleContext(), and state_.
Referenced by impala::DiskIoMgr::GetNextRange(), and impala::DiskIoMgr::HandleReadFinished().
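The rule that a reader is scheduled whenever a range lands on an in-flight queue can be illustrated with a simplified model. Everything below is an assumption made for illustration: RangeSketch, SchedulingSketch and the times_scheduled counter are hypothetical, and setting the is_on_queue flag merely stands in for PerDiskState::ScheduleContext().

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical stand-in for DiskIoMgr::ScanRange; only the disk id matters here.
struct RangeSketch {
  int disk_id;
};

class SchedulingSketch {
 public:
  explicit SchedulingSketch(int num_disks) : disks_(num_disks) {}

  // Adds `range` to its disk's in-flight queue and schedules this context on
  // that disk if it is not already scheduled. The caller is assumed to hold
  // the reader lock.
  void ScheduleScanRange(RangeSketch* range) {
    assert(range != nullptr);
    assert(range->disk_id >= 0 &&
           static_cast<std::size_t>(range->disk_id) < disks_.size());
    PerDisk& disk = disks_[range->disk_id];
    disk.in_flight_ranges.push_back(range);
    if (!disk.is_on_queue) {
      disk.is_on_queue = true;  // stands in for PerDiskState::ScheduleContext()
      ++disk.times_scheduled;
    }
  }

  std::size_t num_in_flight(int disk_id) const {
    return disks_[disk_id].in_flight_ranges.size();
  }
  int times_scheduled(int disk_id) const { return disks_[disk_id].times_scheduled; }

 private:
  struct PerDisk {
    std::deque<RangeSketch*> in_flight_ranges;
    bool is_on_queue = false;
    int times_scheduled = 0;
  };
  std::vector<PerDisk> disks_;
};
```

In this model, adding a second range for a disk that is already scheduled only grows the in-flight queue; the context is not placed on the disk queue twice.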
bool DiskIoMgr::RequestContext::Validate | ( | ) | const |
Validates invariants of reader. Reader lock must be taken beforehand.
Definition at line 206 of file disk-io-mgr-reader-context.cc.
References Cancelled, impala::DiskIoMgr::RequestContext::PerDiskState::done(), impala::DiskIoMgr::RequestContext::PerDiskState::in_flight_ranges(), Inactive, impala::DiskIoMgr::RequestContext::PerDiskState::is_on_queue(), impala::DiskIoMgr::RequestContext::PerDiskState::num_remaining_ranges(), impala::DiskIoMgr::RequestContext::PerDiskState::num_threads_in_op(), and impala::DiskIoMgr::RequestContext::PerDiskState::unstarted_scan_ranges().
Referenced by impala::DiskIoMgr::AddScanRanges(), Cancel(), impala::DiskIoMgr::CancelContext(), DecrementDiskRefCount(), impala::DiskIoMgr::GetNextRange(), impala::DiskIoMgr::HandleReadFinished(), impala::DiskIoMgr::HandleWriteFinished(), impala::DiskIoMgr::ReadRange(), and impala::DiskIoMgr::UnregisterContext().
friend class DiskIoMgr |
friend |
Definition at line 191 of file disk-io-mgr-internal.h.
RuntimeProfile::Counter * DiskIoMgr::RequestContext::active_read_thread_counter_ |
private |
Number of active read threads.
Definition at line 207 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::ReadRange(), and impala::DiskIoMgr::set_active_read_thread_counter().
InternalQueue< ScanRange > DiskIoMgr::RequestContext::blocked_ranges_ |
private |
Ranges that are blocked due to back pressure on outgoing buffers.
Definition at line 295 of file disk-io-mgr-internal.h.
Referenced by Cancel(), impala::DiskIoMgr::HandleReadFinished(), and impala::DiskIoMgr::ReadRange().
RuntimeProfile::Counter * DiskIoMgr::RequestContext::bytes_read_counter_ |
private |
Total bytes read for this reader.
Definition at line 201 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::ReadRange(), and impala::DiskIoMgr::set_bytes_read_counter().
AtomicInt< int64_t > DiskIoMgr::RequestContext::bytes_read_dn_cache_ |
private |
Total number of bytes read from data node cache, updated at end of each range scan.
Definition at line 221 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::bytes_read_dn_cache().
AtomicInt< int64_t > DiskIoMgr::RequestContext::bytes_read_local_ |
private |
Total number of bytes read locally, updated at end of each range scan.
Definition at line 215 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::bytes_read_local().
AtomicInt< int64_t > DiskIoMgr::RequestContext::bytes_read_short_circuit_ |
private |
Total number of bytes read via short circuit read, updated at end of each range scan.
Definition at line 218 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::bytes_read_short_circuit().
InternalQueue< ScanRange > DiskIoMgr::RequestContext::cached_ranges_ |
private |
This is the list of ranges that are expected to be cached on the DN. When the reader asks for a new range (GetNextScanRange()), we first return ranges from this list.
Definition at line 282 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::AddScanRanges(), Cancel(), and impala::DiskIoMgr::GetNextRange().
std::vector< PerDiskState > DiskIoMgr::RequestContext::disk_states_ |
private |
Per disk states to synchronize multiple disk threads accessing the same request context.
Definition at line 464 of file disk-io-mgr-internal.h.
Referenced by Cancel(), impala::DiskIoMgr::GetNextRange(), impala::DiskIoMgr::HandleReadFinished(), impala::DiskIoMgr::HandleWriteFinished(), impala::DiskIoMgr::ReadRange(), and ScheduleScanRange().
RuntimeProfile::Counter * DiskIoMgr::RequestContext::disks_accessed_bitmap_ |
private |
Disk access bitmap. The counter's bit[i] is set if disk id i has been accessed. TODO: we can only support up to 64 disks with this bitmap but it lets us use a builtin atomic instruction. Probably good enough for now.
Definition at line 212 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::ReadRange(), and impala::DiskIoMgr::set_disks_access_bitmap().
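The 64-disk limit follows from packing one bit per disk into a single 64-bit word that can be updated with one atomic OR. A minimal sketch of that idea, assuming a plain std::atomic rather than the RuntimeProfile::Counter used here (DiskAccessBitmapSketch and its methods are hypothetical names):

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical helper showing a one-bit-per-disk access bitmap.
class DiskAccessBitmapSketch {
 public:
  // Marks disk `disk_id` as accessed. Only ids 0..63 fit in the 64-bit word.
  void MarkAccessed(int disk_id) {
    assert(disk_id >= 0 && disk_id < 64);
    bitmap_.fetch_or(std::uint64_t{1} << disk_id, std::memory_order_relaxed);
  }

  // Returns true if bit `disk_id` is set.
  bool WasAccessed(int disk_id) const {
    assert(disk_id >= 0 && disk_id < 64);
    return ((bitmap_.load(std::memory_order_relaxed) >> disk_id) & 1u) != 0;
  }

  // Counts the set bits, i.e. the number of distinct disks accessed.
  int NumDisksAccessed() const {
    int count = 0;
    for (std::uint64_t bits = bitmap_.load(std::memory_order_relaxed); bits != 0;
         bits &= bits - 1) {
      ++count;  // each iteration clears the lowest set bit
    }
    return count;
  }

 private:
  std::atomic<std::uint64_t> bitmap_{0};
};
```

Marking the same disk twice is idempotent, so concurrent readers of the same disk need no further coordination in this sketch.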
boost::condition_variable DiskIoMgr::RequestContext::disks_complete_cond_var_ |
private |
Condition variable for UnregisterContext() to wait for all disks to complete.
Definition at line 298 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::CancelContext(), and DecrementDiskRefCount().
int DiskIoMgr::RequestContext::initial_queue_capacity_ |
private |
The initial queue size for new scan ranges. This is always total_range_queue_capacity_ / num_finished_ranges_ but stored as a separate variable to allow reading this value without taking a lock. Doing the division at read time (with no lock) could lead to a race where only total_range_queue_capacity_ or num_finished_ranges_ was updated.
Definition at line 263 of file disk-io-mgr-internal.h.
Referenced by initial_scan_range_queue_capacity().
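The reason for caching the quotient can be shown with a small sketch: the two inputs are updated together under a lock, and the precomputed average is published as a single value that readers can load without locking. This is an illustration under stated assumptions, not the Impala code: QueueCapacitySketch and OnRangeFinished are hypothetical names, and the cached value is a std::atomic<int> here so that the lock-free read is well defined in standard C++, whereas the member documented above is a plain int.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

// Hypothetical model of the queue-capacity bookkeeping described above.
class QueueCapacitySketch {
 public:
  explicit QueueCapacitySketch(int default_capacity)
      : initial_queue_capacity_(default_capacity) {}

  // Records the final queue capacity of a finished range and refreshes the
  // cached average. Both inputs change together under the lock, so the cached
  // value never mixes an updated total with a stale count.
  void OnRangeFinished(int capacity) {
    std::lock_guard<std::mutex> l(lock_);
    total_range_queue_capacity_ += capacity;
    ++num_finished_ranges_;
    initial_queue_capacity_.store(total_range_queue_capacity_ / num_finished_ranges_,
                                  std::memory_order_relaxed);
  }

  // Lock-free read of the seed capacity for new scan ranges.
  int initial_scan_range_queue_capacity() const {
    return initial_queue_capacity_.load(std::memory_order_relaxed);
  }

 private:
  std::mutex lock_;
  int total_range_queue_capacity_ = 0;
  int num_finished_ranges_ = 0;
  std::atomic<int> initial_queue_capacity_;
};
```

For example, starting from a default of 2, finishing ranges with capacities 4 and 7 leaves the cached value at 11 / 2 = 5 under integer division.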
boost::mutex DiskIoMgr::RequestContext::lock_ |
private |
All fields below are accessed by multiple threads and the lock needs to be taken before accessing them.
Definition at line 267 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::AddScanRanges(), impala::DiskIoMgr::AddWriteRange(), Cancel(), impala::DiskIoMgr::CancelContext(), impala::DiskIoMgr::context_status(), impala::DiskIoMgr::GetNextRange(), impala::DiskIoMgr::HandleReadFinished(), impala::DiskIoMgr::HandleWriteFinished(), impala::DiskIoMgr::ReadRange(), and impala::DiskIoMgr::UnregisterContext().
MemTracker * DiskIoMgr::RequestContext::mem_tracker_ |
private |
Memory used for this reader. This is unowned by this object.
Definition at line 198 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::GetBufferDesc(), and impala::DiskIoMgr::ReadRange().
AtomicInt< int > DiskIoMgr::RequestContext::num_buffers_in_reader_ |
private |
The number of buffers that have been returned to the reader (via GetNext) that the reader has not returned. Only included for debugging and diagnostics.
Definition at line 228 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::ScanRange::EnqueueBuffer(), impala::DiskIoMgr::ReturnBuffer(), and impala::DiskIoMgr::UnregisterContext().
int DiskIoMgr::RequestContext::num_disks_with_ranges_ |
private |
The number of disks with scan ranges remaining (always equal to the sum of disks with ranges).
Definition at line 277 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::CancelContext(), and DecrementDiskRefCount().
AtomicInt< int > DiskIoMgr::RequestContext::num_finished_ranges_ |
private |
The number of scan ranges that have been completed for this reader.
Definition at line 231 of file disk-io-mgr-internal.h.
AtomicInt< int > DiskIoMgr::RequestContext::num_ready_buffers_ |
private |
The total number of ready buffers across all ranges. Ready buffers are buffers that have been read from disk but not retrieved by the caller. This is the sum of all queued buffers in all ranges for this reader context.
Definition at line 249 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::ScanRange::EnqueueBuffer(), and impala::DiskIoMgr::queue_size().
AtomicInt< int > DiskIoMgr::RequestContext::num_remote_ranges_ |
private |
The number of scan ranges that required a remote read, updated at the end of each range scan. Only used for diagnostics.
Definition at line 235 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::num_remote_ranges().
AtomicInt< int > DiskIoMgr::RequestContext::num_unstarted_scan_ranges_ |
private |
The total number of scan ranges that have not been started. Only used for diagnostics. This is the sum of all unstarted_scan_ranges across all disks.
Definition at line 239 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::GetNextRange(), and impala::DiskIoMgr::num_unstarted_ranges().
AtomicInt< int > DiskIoMgr::RequestContext::num_used_buffers_ |
private |
The number of buffers that are being used for this reader. This is the sum of all buffers in ScanRange queues and buffers currently being read into (i.e. about to be queued).
Definition at line 244 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::ScanRange::EnqueueBuffer(), impala::DiskIoMgr::ReadRange(), and impala::DiskIoMgr::UnregisterContext().
DiskIoMgr * DiskIoMgr::RequestContext::parent_ |
private |
Parent object.
Definition at line 192 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::RequestContext::PerDiskState::ScheduleContext().
RuntimeProfile::Counter * DiskIoMgr::RequestContext::read_timer_ |
private |
Total time spent in hdfs reading.
Definition at line 204 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::ReadRange(), and impala::DiskIoMgr::set_read_timer().
InternalQueue< ScanRange > DiskIoMgr::RequestContext::ready_to_start_ranges_ |
private |
A list of ranges that should be returned in subsequent calls to GetNextRange. There is a trade-off with when to populate this list. Populating it on demand means consumers need to wait (happens in DiskIoMgr::GetNextRange()). Populating it preemptively means we make worse scheduling decisions. We currently populate one range per disk. TODO: think about this some more.
Definition at line 291 of file disk-io-mgr-internal.h.
Referenced by Cancel(), and impala::DiskIoMgr::GetNextRange().
boost::condition_variable DiskIoMgr::RequestContext::ready_to_start_ranges_cv_ |
private |
Definition at line 292 of file disk-io-mgr-internal.h.
Referenced by Cancel(), and impala::DiskIoMgr::GetNextRange().
State DiskIoMgr::RequestContext::state_ |
private |
Current state of the reader.
Definition at line 270 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::AddScanRanges(), impala::DiskIoMgr::AddWriteRange(), Cancel(), impala::DiskIoMgr::GetNextRange(), impala::DiskIoMgr::HandleReadFinished(), impala::DiskIoMgr::HandleWriteFinished(), impala::DiskIoMgr::ReadRange(), impala::DiskIoMgr::RequestContextCache::ReturnContext(), and ScheduleScanRange().
Status DiskIoMgr::RequestContext::status_ |
private |
Status of this reader. Set to non-ok if cancelled.
Definition at line 273 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::AddScanRanges(), impala::DiskIoMgr::AddWriteRange(), Cancel(), impala::DiskIoMgr::context_status(), impala::DiskIoMgr::GetNextRange(), impala::DiskIoMgr::HandleReadFinished(), and impala::DiskIoMgr::ReadRange().
AtomicInt< int > DiskIoMgr::RequestContext::total_range_queue_capacity_ |
private |
The total (sum) of queue capacities for finished scan ranges. This value divided by num_finished_ranges_ is the average for finished ranges and used to seed the starting queue capacity for future ranges. The assumption is that if previous ranges were fast, new ones will be fast too. The scan range adjusts the queue capacity dynamically so a rough approximation will do.
Definition at line 256 of file disk-io-mgr-internal.h.
AtomicInt< int64_t > DiskIoMgr::RequestContext::unexpected_remote_bytes_ |
private |
Total number of bytes from remote reads that were expected to be local.
Definition at line 224 of file disk-io-mgr-internal.h.
Referenced by impala::DiskIoMgr::unexpected_remote_bytes().