16 #ifndef IMPALA_RUNTIME_DISK_IO_MGR_H
17 #define IMPALA_RUNTIME_DISK_IO_MGR_H
21 #include <boost/scoped_ptr.hpp>
22 #include <boost/unordered_set.hpp>
23 #include <boost/thread/mutex.hpp>
24 #include <boost/thread/condition_variable.hpp>
25 #include <boost/thread/thread.hpp>
364 Status Read(
char* buffer, int64_t* bytes_read,
bool* eosr);
470 void SetData(
const uint8_t* buffer, int64_t
len);
490 DiskIoMgr(
int num_disks,
int threads_per_disk,
int min_buffer_size,
491 int max_buffer_size);
539 bool schedule_immediately =
false);
563 int AssignQueue(
const char* file,
int disk_id,
bool expected_local);
765 const Status& write_status);
void ReturnBufferDesc(BufferDescriptor *desc)
Returns a buffer desc object which can now be used for another reader.
void CancelContext(RequestContext *context, bool wait_for_disks_completion=false)
ScanRange(int initial_capacity=-1)
The initial queue capacity for this scan range. Specify -1 to use the IoMgr default.
RequestType::type request_type() const
int num_remote_ranges(RequestContext *reader) const
AtomicInt< int > num_buffers_in_readers_
Total number of buffers in readers.
Status status_
Status of the read to this buffer. If status is not ok, 'buffer' is NULL.
char * buffer_
buffer with the read contents
void Write(RequestContext *writer_context, WriteRange *write_range)
MemTracker * mem_tracker_
The current tracker this buffer is associated with.
int64_t offset_
Offset within file_ being read or written.
boost::function< void(const Status &)> WriteDoneCallback
int AssignQueue(const char *file, int disk_id, bool expected_local)
void set_active_read_thread_counter(RequestContext *, RuntimeProfile::Counter *)
AtomicInt< int > num_allocated_buffers_
Total number of allocated buffers, used for debugging.
const int num_threads_per_disk_
int64_t queue_size(RequestContext *reader) const
int64_t len_
Length of data read or written.
The request type, read or write associated with a request range.
boost::scoped_ptr< RequestContextCache > request_context_cache_
const int min_buffer_size_
The minimum size of each read buffer.
int64_t GetReadThroughput()
int64_t bytes_read_dn_cache(RequestContext *reader) const
struct hadoopRzOptions * cached_read_options_
Options object for cached hdfs reads. Set on startup and never modified.
boost::condition_variable buffer_ready_cv_
WriteRange(const std::string &file, int64_t file_offset, int disk_id, WriteDoneCallback callback)
std::list< BufferDescriptor * > free_buffer_descs_
List of free buffer desc objects that can be handed out to clients.
Status AddWriteRange(RequestContext *writer, WriteRange *write_range)
int num_remote_disks() const
Returns the total number of remote "disk" queues.
void InitInternal(DiskIoMgr *io_mgr, RequestContext *reader)
Initialize internal fields.
void UnregisterContext(RequestContext *context)
const char * file() const
int RemoteS3DiskId() const
The disk ID (and therefore disk_queues_ index) used for S3 accesses.
Status WriteRangeHelper(FILE *file_handle, WriteRange *write_range)
Status AddScanRanges(RequestContext *reader, const std::vector< ScanRange * > &ranges, bool schedule_immediately=false)
RequestType::type request_type_
The type of IO request, READ or WRITE.
Status GetNextRange(RequestContext *reader, ScanRange **range)
int64_t scan_range_offset() const
Returns the offset within the scan range that this buffer starts at.
void CleanupQueuedBuffers()
Status Read(RequestContext *reader, ScanRange *range, BufferDescriptor **buffer)
void Reset(RequestContext *reader, ScanRange *range, char *buffer, int64_t buffer_len)
Resets the buffer descriptor state for a new reader, range and data buffer.
int free_buffers_idx(int64_t buffer_size)
Returns the index into free_buffers_ for a given buffer size.
hdfsFS fs_
Hadoop filesystem that contains file_, or set to NULL for local filesystem.
Status Read(char *buffer, int64_t *bytes_read, bool *eosr)
int num_allocated_buffers() const
Returns the number of allocated buffers.
void ReturnBuffer(BufferDescriptor *buffer)
bool EnqueueBuffer(BufferDescriptor *buffer)
void SetData(const uint8_t *buffer, int64_t len)
Status Open()
Opens the file for this range. This function only modifies state in this range.
int64_t buffer_len_
length of buffer_. For buffers from cached reads, the length is 0.
WriteDoneCallback callback_
Callback to invoke after the write is complete.
std::string file_
Path to file being read or written.
int64_t bytes_read_short_circuit(RequestContext *reader) const
bool eosr_returned_
If true, the last buffer for this scan range has been returned.
int64_t scan_range_offset_
int num_local_disks() const
Returns the number of local disks attached to the system.
int num_total_disks() const
Returns the total number of disk queues (both local and remote).
DiskIoMgr()
Create DiskIoMgr with default configs.
bool eosr_
true if the current scan range is complete
static const int DEFAULT_QUEUE_CAPACITY
void SetMemTracker(MemTracker *tracker)
void ReturnFreeBuffer(char *buffer, int64_t buffer_size)
void set_disks_access_bitmap(RequestContext *, RuntimeProfile::Counter *)
void HandleReadFinished(DiskQueue *, RequestContext *, BufferDescriptor *)
bool eosr_queued_
If true, the last buffer for this scan range has been queued.
int ready_buffers_capacity() const
MemTracker * process_mem_tracker_
Process memory tracker; needed to account for io buffers.
void HandleWriteFinished(RequestContext *writer, WriteRange *write_range, const Status &write_status)
void set_bytes_read_counter(RequestContext *, RuntimeProfile::Counter *)
ThreadGroup disk_thread_group_
Thread group containing all the worker threads.
void Reset(hdfsFS fs, const char *file, int64_t len, int64_t offset, int disk_id, bool try_cache, bool expected_local, int64_t mtime, void *metadata=NULL)
boost::mutex free_buffers_lock_
Protects free_buffers_ and free_buffer_descs_.
This class is thread-safe.
RuntimeProfile::Counter total_bytes_read_counter_
Total bytes read by the IoMgr.
BufferDescriptor(DiskIoMgr *io_mgr)
RequestContext * reader_
Reader that this buffer is for.
int64_t len_
length of read contents
RequestContext * reader_
Reader/owner of the scan range.
std::vector< DiskQueue * > disk_queues_
const int max_buffer_size_
Maximum read size. This is also the maximum size of each allocated buffer.
static const int64_t NEVER_CACHE
If the mtime is set to NEVER_CACHE, the file handle should never be cached.
Status Init(MemTracker *process_mem_tracker)
Initialize the IoMgr. Must be called once before any of the other APIs.
int64_t bytes_read_local(RequestContext *reader) const
bool is_cancelled_
If true, this scan range has been cancelled.
friend class DiskIoMgrTest_Buffers_Test
Status RegisterContext(RequestContext **request_context, MemTracker *reader_mem_tracker=NULL)
void ReadRange(DiskQueue *disk_queue, RequestContext *reader, ScanRange *range)
Reads the specified scan range and calls HandleReadFinished when done.
T must be a subclass of InternalQueue::Node.
char * GetFreeBuffer(int64_t *buffer_size)
std::string DebugString()
Dumps the disk IoMgr queues (for readers and disks)
Status GetNext(BufferDescriptor **buffer)
bool expected_local() const
ScanRange * scan_range_
Scan range that this buffer is for.
int64_t unexpected_remote_bytes(RequestContext *reader) const
void set_read_timer(RequestContext *, RuntimeProfile::Counter *)
std::list< BufferDescriptor * > ready_buffers_
int ready_buffers_capacity_
int disk_id_
Id of disk containing file_.
BufferDescriptor * GetBufferDesc(RequestContext *reader, ScanRange *range, char *buffer, int64_t buffer_size)
ObjectPool pool_
Pool to allocate BufferDescriptors.
Status ValidateScanRange(ScanRange *range)
Validates that range is correctly initialized.
int max_read_buffer_size() const
Returns the maximum read buffer size.
bool GetNextRequestRange(DiskQueue *disk_queue, RequestRange **range, RequestContext **request_context)
std::vector< std::list< char * > > free_buffers_
int bytes_read_
Number of bytes read so far for this scan range.
struct hadoopRzBuffer * cached_buffer_
int64_t MaxReadChunkSize() const
Maximum length in bytes for hdfsRead() calls.
std::string DebugString() const
Returns a descriptive string for debugging.
int num_unstarted_ranges(RequestContext *reader) const
Returns the number of unstarted scan ranges for this reader.
void WorkLoop(DiskQueue *queue)
Status ReadFromCache(bool *read_succeeded)
void Cancel(const Status &status)
RuntimeProfile::Counter read_timer_
Total time spent in hdfs reading.
Status context_status(RequestContext *context) const
int num_buffers_in_readers() const
Returns the number of buffers currently owned by all readers.
void Close()
Closes the file for this range. This function only modifies state in this range.
int64_t mtime_
Last modified time of the file associated with the scan range.