15 #ifndef IMPALA_RUNTIME_DISK_IO_MGR_INTERNAL_H
16 #define IMPALA_RUNTIME_DISK_IO_MGR_INTERNAL_H
20 #include <boost/thread/locks.hpp>
22 #include <gutil/strings/substitute.h>
58 boost::unique_lock<boost::mutex> disk_lock(
lock);
167 DCHECK(range != NULL);
320 __sync_synchronize();
326 __sync_synchronize();
std::string DebugString() const
Dumps out reader information. Lock should be taken by caller.
InternalQueue< RequestRange > * in_flight_ranges()
int initial_queue_capacity_
InternalQueue< ScanRange > blocked_ranges_
Ranges that are blocked due to back pressure on outgoing buffers.
InternalQueue< ScanRange > * unstarted_scan_ranges()
InternalQueue< WriteRange > * unstarted_write_ranges()
Status status_
Status of this reader. Set to non-ok if cancelled.
void DecrementRequestThread()
void DecrementRequestThreadAndCheckDone(RequestContext *context)
InternalQueue< WriteRange > unstarted_write_ranges_
boost::condition_variable disks_complete_cond_var_
Condition variable for UnregisterContext() to wait for all disks to complete.
void IncrementRequestThreadAndDequeue()
const InternalQueue< WriteRange > * unstarted_write_ranges() const
AtomicInt< int64_t > bytes_read_dn_cache_
Total number of bytes read from date node cache, updated at end of each range scan.
int num_remaining_ranges_
AtomicInt< int > total_range_queue_capacity_
ScanRange * next_scan_range_to_start()
RuntimeProfile::Counter * disks_accessed_bitmap_
ScanRange * next_scan_range_to_start_
InternalQueue< ScanRange > cached_ranges_
RuntimeProfile::Counter * active_read_thread_counter_
Number of active read threads.
void DecrementDiskRefCount()
AtomicInt< int > num_finished_ranges_
The number of scan ranges that have been completed for this reader.
int num_disks_with_ranges_
void AddRequestRange(DiskIoMgr::RequestRange *range, bool schedule_immediately)
void EnqueueContext(RequestContext *worker)
Enqueue the request context to the disk queue. The DiskQueue lock must not be taken.
InternalQueue< ScanRange > unstarted_scan_ranges_
AtomicInt< int > num_remote_ranges_
int num_threads_in_op() const
AtomicInt< int64_t > bytes_read_short_circuit_
Total number of bytes read via short circuit read, updated at end of each range scan.
AtomicInt< int > num_threads_in_op_
AtomicInt< int > num_unstarted_scan_ranges_
std::list< RequestContext * > request_contexts
list of all request contexts that have work queued on this disk
const InternalQueue< RequestRange > * in_flight_ranges() const
Reader is initialized and maps to a client.
This class is thread-safe.
void ScheduleContext(RequestContext *context, int disk_id)
int num_remaining_ranges() const
void set_next_scan_range_to_start(ScanRange *range)
boost::mutex lock
Lock that protects access to 'request_contexts' and 'work_available'.
bool Validate() const
Validates invariants of reader. Reader lock must be taken beforehand.
std::vector< DiskQueue * > disk_queues_
void Reset(MemTracker *tracker)
Resets this object.
AtomicInt< int > num_used_buffers_
InternalQueue< RequestRange > in_flight_ranges_
RuntimeProfile::Counter * bytes_read_counter_
Total bytes read for this reader.
int initial_scan_range_queue_capacity() const
boost::condition_variable ready_to_start_ranges_cv_
RequestContext(DiskIoMgr *parent, int num_disks)
T must be a subclass of InternalQueue::Node.
State state_
Current state of the reader.
RuntimeProfile::Counter * read_timer_
Total time spent in hdfs reading.
AtomicInt< int64_t > bytes_read_local_
Total number of bytes read locally, updated at end of each range scan.
std::vector< PerDiskState > disk_states_
int & num_remaining_ranges()
boost::condition_variable work_available
void ScheduleScanRange(DiskIoMgr::ScanRange *range)
AtomicInt< int > num_buffers_in_reader_
AtomicInt< int64_t > unexpected_remote_bytes_
Total number of bytes from remote reads that were expected to be local.
void Cancel(const Status &status)
Cancels the context with status code 'status'.
MemTracker * mem_tracker_
Memory used for this reader. This is unowned by this object.
int disk_id
Disk id (0-based)
const InternalQueue< ScanRange > * unstarted_scan_ranges() const
AtomicInt< int > num_ready_buffers_
InternalQueue< ScanRange > ready_to_start_ranges_
DiskIoMgr * parent_
Parent object.