#include <gutil/strings/substitute.h>
#include <boost/algorithm/string.hpp>

using namespace impala;
using namespace strings;
DEFINE_int32(num_disks, 0, "Number of disks on data node.");
DEFINE_int32(num_threads_per_disk, 0, "Number of I/O threads per disk.");
DEFINE_int32(num_s3_io_threads, 16, "Number of S3 I/O threads.");
DEFINE_int32(read_size, 8 * 1024 * 1024, "Read size (in bytes).");
DEFINE_int32(min_buffer_size, 1024, "The minimum read buffer size (in bytes).");
DEFINE_int32(max_free_io_buffers, 128,
    "For each io buffer size, the maximum number of buffers the IoMgr will hold onto.");
// RequestContextCache::ReturnContext(): marks a context inactive and recycles it.
DCHECK(reader->state_ != RequestContext::Inactive);
reader->state_ = RequestContext::Inactive;
lock_guard<mutex> l(lock_);
inactive_contexts_.push_back(reader);
// RequestContextCache::GetNewContext(): reuse an inactive context if one is
// available, otherwise allocate a new one and remember it in all_contexts_.
lock_guard<mutex> l(lock_);
if (!inactive_contexts_.empty()) {
  RequestContext* reader = inactive_contexts_.front();
  inactive_contexts_.pop_front();
  return reader;
}
RequestContext* reader = new RequestContext(io_mgr_, io_mgr_->num_total_disks());
all_contexts_.push_back(reader);
return reader;
// RequestContextCache destructor: delete every context ever created.
for (list<RequestContext*>::iterator it = all_contexts_.begin();
    it != all_contexts_.end(); ++it) {
  delete *it;
}

// RequestContextCache::ValidateAllInactive(): every context must have been
// returned before the cache is torn down.
for (list<RequestContext*>::iterator it = all_contexts_.begin();
    it != all_contexts_.end(); ++it) {
  if ((*it)->state_ != RequestContext::Inactive) {
    return false;
  }
}
DCHECK_EQ(all_contexts_.size(), inactive_contexts_.size());
return all_contexts_.size() == inactive_contexts_.size();
// RequestContextCache::DebugString(): dump every known context.
lock_guard<mutex> l(lock_);
stringstream ss;
for (list<RequestContext*>::iterator it = all_contexts_.begin();
    it != all_contexts_.end(); ++it) {
  unique_lock<mutex> lock((*it)->lock_);
  ss << (*it)->DebugString() << endl;
}
return ss.str();
// DiskIoMgr::DebugString(): dump the request contexts, then the per-disk queues.
ss << "RequestContexts: " << endl << request_context_cache_->DebugString() << endl;

ss << "Disks: " << endl;
for (int i = 0; i < disk_queues_.size(); ++i) {
  unique_lock<mutex> lock(disk_queues_[i]->lock);
  ss << "  " << (void*) disk_queues_[i] << ":";
  if (!disk_queues_[i]->request_contexts.empty()) {
    BOOST_FOREACH(RequestContext* req_context, disk_queues_[i]->request_contexts) {
      ss << (void*)req_context;
    }
  }
  ss << endl;
}
// BufferDescriptor is recycled across reads: the constructor only wires up the
// IoMgr, and Reset() re-initializes the descriptor for a new reader and range.
DiskIoMgr::BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr) :
    io_mgr_(io_mgr), reader_(NULL), buffer_(NULL) {
}

void DiskIoMgr::BufferDescriptor::Reset(RequestContext* reader,
    ScanRange* range, char* buffer, int64_t buffer_len) {
  DCHECK(io_mgr_ != NULL);
  DCHECK(buffer_ == NULL);
  DCHECK(range != NULL);
  DCHECK(buffer != NULL);
  DCHECK_GE(buffer_len, 0);
  reader_ = reader;
  scan_range_ = range;
  buffer_ = buffer;
  buffer_len_ = buffer_len;
}
// BufferDescriptor::Return(): hand this descriptor (and its buffer) back.
DCHECK(io_mgr_ != NULL);
io_mgr_->ReturnBuffer(this);
// BufferDescriptor::SetMemTracker(): transfer the buffer's memory accounting.
// Cached buffers are not counted against any tracker, so skip them.
if (scan_range_->cached_buffer_ != NULL) return;
if (mem_tracker_ == tracker) return;
if (mem_tracker_ != NULL) mem_tracker_->Release(buffer_len_);
mem_tracker_ = tracker;
if (mem_tracker_ != NULL) mem_tracker_->Consume(buffer_len_);
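The Release-then-Consume pair above is the usual tracker hand-off idiom. A minimal standalone sketch of the same idea (TransferConsumption is a hypothetical helper, not part of the IoMgr):

  // Move 'bytes' of accounted memory from one tracker to another; total
  // tracked consumption is unchanged because the release precedes the consume.
  void TransferConsumption(MemTracker* from, MemTracker* to, int64_t bytes) {
    if (from == to) return;
    if (from != NULL) from->Release(bytes);
    if (to != NULL) to->Consume(bytes);
  }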
// WriteRange constructor: record the target file, offset, disk and callback.
file_ = file;
offset_ = file_offset;
disk_id_ = disk_id;
callback_ = callback;
// CheckSseSupport(): the default thread counts assume SSE4.2-era hardware.
LOG(WARNING) << "This machine does not support sse4_2. The default IO system "
    "configurations are suboptimal for this hardware. Consider "
    "increasing the number of threads per disk by restarting impalad "
    "using the --num_threads_per_disk flag with a higher value";
DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_disk, int min_buffer_size,
    int max_buffer_size) :
    num_threads_per_disk_(threads_per_disk),
    max_buffer_size_(max_buffer_size),
    min_buffer_size_(min_buffer_size),
    cached_read_options_(NULL),
    shut_down_(false),
    total_bytes_read_counter_(TUnit::BYTES),
    read_timer_(TUnit::TIME_NS) {
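A hedged usage sketch: the default constructor picks its configuration from the flags above, and Init() must run once before any other call. The tracker argument is assumed to be the process-wide MemTracker:

  DiskIoMgr io_mgr;                                   // default configs from flags
  RETURN_IF_ERROR(io_mgr.Init(process_mem_tracker));  // account I/O buffers globally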
// ~DiskIoMgr: any context still attached to a disk queue must already be done.
for (list<RequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
    it != disk_queues_[i]->request_contexts.end(); ++it) {
  DCHECK_EQ((*it)->disk_states_[disk_id].num_threads_in_op(), 0);
  DCHECK((*it)->disk_states_[disk_id].done());
  (*it)->DecrementDiskRefCount();
}
// ~DiskIoMgr (continued): every allocated buffer must be back in the pool.
int num_free_buffers = 0;

// DiskIoMgr::Init(): the process-wide tracker accounts for all I/O buffers.
DCHECK(process_mem_tracker != NULL);

// The S3 queue gets its own configured thread count; local disks use
// num_threads_per_disk_ or a per-disk-type default.
int num_threads_per_disk;
if (i == RemoteS3DiskId()) {
  num_threads_per_disk = FLAGS_num_s3_io_threads;
}
for (int j = 0; j < num_threads_per_disk; ++j) {
  stringstream ss;
  ss << "work-loop(Disk: " << i << ", Thread: " << j << ")";
  disk_thread_group_.AddThread(new Thread("disk-io-mgr", ss.str(),
      &DiskIoMgr::WorkLoop, this, disk_queues_[i]));
}
(*request_context)->Reset(mem_tracker);

unique_lock<mutex> reader_lock(reader->lock_);
// CancelContext(): optionally block until every disk queue drops the context.
if (wait_for_disks_completion) {
  unique_lock<mutex> lock(context->lock_);
  while (context->num_disks_with_ranges_ > 0) {
    context->disks_complete_cond_var_.wait(lock);
  }
}
unique_lock<mutex> lock(context->lock_);
// Scan ranges must carry a disk id that maps to a real disk queue.
stringstream ss;
ss << "Invalid scan range. Bad disk id: " << disk_id;
DCHECK(false) << ss.str();
return Status(ss.str());
Status DiskIoMgr::AddScanRanges(RequestContext* reader,
    const vector<ScanRange*>& ranges, bool schedule_immediately) {
  // Validate and initialize all ranges before taking the reader's lock.
  for (int i = 0; i < ranges.size(); ++i) {
    RETURN_IF_ERROR(ValidateScanRange(ranges[i]));
    ranges[i]->InitInternal(this, reader);
  }

  unique_lock<mutex> reader_lock(reader->lock_);

  // Add each range to the queue of the disk the range is on.
  for (int i = 0; i < ranges.size(); ++i) {
    // Don't add empty ranges.
    DCHECK_NE(ranges[i]->len(), 0);

    if (schedule_immediately) {
      bool cached_read_succeeded;
      RETURN_IF_ERROR(ranges[i]->ReadFromCache(&cached_read_succeeded));
      if (cached_read_succeeded) continue;
    }
  }
Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
  DCHECK_NOTNULL(reader);
  DCHECK_NOTNULL(range);

  unique_lock<mutex> reader_lock(reader->lock_);
// A cached range can be served directly from the HDFS cache.
DCHECK((*range)->try_cache_);
bool cached_read_succeeded;
RETURN_IF_ERROR((*range)->ReadFromCache(&cached_read_succeeded));

DCHECK_NOTNULL(*range);
int disk_id = (*range)->disk_id();
DCHECK_EQ(*range, reader->disk_states_[disk_id].next_scan_range_to_start());
// Clear it so the disk's next pass for this reader prepares another range.
reader->disk_states_[disk_id].set_next_scan_range_to_start(NULL);
// DiskIoMgr::Read(): synchronous wrapper around AddScanRanges() and GetNext().
Status DiskIoMgr::Read(RequestContext* reader, ScanRange* range,
    BufferDescriptor** buffer) {
  DCHECK_NOTNULL(range);
  DCHECK_NOTNULL(buffer);
  if (range->len() > max_buffer_size_) {
    return Status(Substitute("Cannot perform sync read larger than $0. Request was $1",
        max_buffer_size_, range->len()));
  }

  vector<DiskIoMgr::ScanRange*> ranges;
  ranges.push_back(range);
  RETURN_IF_ERROR(AddScanRanges(reader, ranges, true));
  RETURN_IF_ERROR(range->GetNext(buffer));
  DCHECK((*buffer) != NULL);
  DCHECK((*buffer)->eosr());
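A hedged usage sketch for this synchronous path, assuming io_mgr is an initialized DiskIoMgr and range is a ScanRange already set up for the target file:

  DiskIoMgr::RequestContext* ctx;
  RETURN_IF_ERROR(io_mgr->RegisterContext(&ctx));
  DiskIoMgr::BufferDescriptor* io_buffer = NULL;
  RETURN_IF_ERROR(io_mgr->Read(ctx, range, &io_buffer));  // blocks until the whole range is read
  // ... consume the buffer contents ...
  io_buffer->Return();             // recycle the buffer and descriptor
  io_mgr->UnregisterContext(ctx);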
// DiskIoMgr::ReturnBuffer(): recycle the buffer memory, then the descriptor.
DCHECK_NOTNULL(buffer_desc);
if (buffer_desc->buffer_ != NULL) {
// Buffer descriptors are pooled: ReturnBufferDesc() parks them and
// GetBufferDesc() re-initializes one for the next read.
DCHECK(desc != NULL);

buffer_desc->Reset(reader, range, buffer, buffer_size);
// GetFreeBuffer(): round the requested size up to a power-of-two bucket and
// reuse a pooled buffer when possible; otherwise allocate a fresh one.
DCHECK_GT(*buffer_size, 0);

buffer = new char[*buffer_size];

DCHECK(buffer != NULL);
// GcIoBuffers(): free all pooled buffers and release their accounting.
int buffers_freed = 0;

bytes_freed += buffer_size;
// ReturnFreeBuffer(): pooled buffer sizes are min_buffer_size_ * 2^idx.
DCHECK(buffer != NULL);
int idx = free_buffers_idx(buffer_size);
DCHECK_EQ(BitUtil::Ceil(buffer_size, min_buffer_size_) & ~(1 << idx), 0)
    << "buffer_size_ / min_buffer_size_ should be power of 2, got buffer_size = "
    << buffer_size << ", min_buffer_size_ = " << min_buffer_size_;
if (!FLAGS_disable_mem_pools &&
    free_buffers_[idx].size() < FLAGS_max_free_io_buffers) {
  free_buffers_[idx].push_back(buffer);
}
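The free lists are bucketed by power-of-two multiples of the minimum buffer size. A self-contained sketch of the index computation (the real free_buffers_idx() uses BitUtil; FreeBuffersIdxSketch is a hypothetical stand-in):

  // Bucket i holds buffers of min_buffer_size * 2^i bytes, so the index
  // is ceil(log2(buffer_size / min_buffer_size)).
  int FreeBuffersIdxSketch(int64_t buffer_size, int64_t min_buffer_size) {
    int64_t ratio = buffer_size / min_buffer_size;
    int idx = 0;
    while ((1LL << idx) < ratio) ++idx;
    return idx;
  }

For example, with min_buffer_size = 1024, an 8192-byte buffer lands in bucket 3.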
// GetNextRequestRange(): a disk thread picks the next context with queued work.
int disk_id = disk_queue->disk_id;

*request_context = NULL;

unique_lock<mutex> disk_lock(disk_queue->lock);

DCHECK(*request_context != NULL);
request_disk_state = &((*request_context)->disk_states_[disk_id]);

// Before reading, check both the process-wide and the reader's memory limits.
bool process_limit_exceeded = process_mem_tracker_->LimitExceeded();
bool reader_limit_exceeded = (*request_context)->mem_tracker_ != NULL
    ? (*request_context)->mem_tracker_->AnyLimitExceeded() : false;
if (process_limit_exceeded || reader_limit_exceeded) {
  CancelContext(*request_context);
}

unique_lock<mutex> request_lock((*request_context)->lock_);
VLOG_FILE << "Disk (id=" << disk_id << ") reading for "
    << (*request_context)->DebugString();

DCHECK((*request_context)->Validate()) << endl << (*request_context)->DebugString();

--(*request_context)->num_unstarted_scan_ranges_;
(*request_context)->ready_to_start_ranges_.Enqueue(new_range);
if ((*request_context)->num_unstarted_scan_ranges_ == 0) {
  // Every range has now been started: wake all waiters in GetNextRange(),
  // since none of them will ever see another unstarted range.
  (*request_context)->ready_to_start_ranges_cv_.notify_all();
} else {
  (*request_context)->ready_to_start_ranges_cv_.notify_one();
}
DCHECK(*range != NULL);

DCHECK((*request_context)->Validate()) << endl << (*request_context)->DebugString();
// HandleWriteFinished(): update writer state under its lock; the write's
// status is then delivered through the range's callback.
void DiskIoMgr::HandleWriteFinished(RequestContext* writer, WriteRange* write_range,
    const Status& write_status) {
  unique_lock<mutex> writer_lock(writer->lock_);
// HandleReadFinished(): runs on a disk thread after each read completes.
unique_lock<mutex> reader_lock(reader->lock_);

DCHECK(buffer->buffer_ != NULL);

DCHECK(buffer->buffer_ != NULL);

if (!buffer->status_.ok()) {
  // Read error: treat the buffer as the end of the stream for its range.
  buffer->eosr_ = true;
} else if (buffer->eosr_) {
// WorkLoop() dispatches each dequeued request range by type.
if (range->request_type() == RequestType::READ) {
  ReadRange(disk_queue, worker_context, static_cast<ScanRange*>(range));
} else {
  DCHECK(range->request_type() == RequestType::WRITE);
  Write(worker_context, static_cast<WriteRange*>(range));
}
// ReadRange(): size the buffer for what remains and check memory headroom.
int64_t bytes_remaining = range->len_ - range->bytes_read_;
DCHECK_GT(bytes_remaining, 0);
int64_t buffer_size = ::min(bytes_remaining, static_cast<int64_t>(max_buffer_size_));
bool enough_memory = true;

if (!enough_memory) {
  // Low memory: garbage-collect the pooled buffers and check again.
  GcIoBuffers();
  enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
}

if (!enough_memory) {
  // Still no headroom; block this range under the reader's lock.
  unique_lock<mutex> reader_lock(reader->lock_);
DCHECK(range != NULL);
DCHECK(reader != NULL);
DCHECK(buffer != NULL);

BufferDescriptor* buffer_desc = GetBufferDesc(reader, range, buffer, buffer_size);
DCHECK(buffer_desc != NULL);

// Record which disk served this read in the reader's bitmap counter.
int64_t disk_bit = 1 << disk_queue->disk_id;
// DiskIoMgr::Write(): open the target file, delegate the IO to
// WriteRangeHelper(), then close and report the combined status.
FILE* file_handle = fopen(write_range->file(), "rb+");
Status ret_status;
if (file_handle == NULL) {
  ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
      Substitute("fopen($0, \"rb+\") failed with errno=$1 description=$2",
          write_range->file_, errno, GetStrErrMsg())));
} else {
  ret_status = WriteRangeHelper(file_handle, write_range);

  int success = fclose(file_handle);
  if (ret_status.ok() && success != 0) {
    ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR, Substitute(
        "fclose($0) failed", write_range->file_)));
  }
}
// WriteRangeHelper(): preallocate the bytes, seek, and write the data.
int file_desc = fileno(file_handle);
int success = 0;
if (write_range->len_ > 0) {
  success = posix_fallocate(file_desc, write_range->offset(), write_range->len_);
  if (success != 0) {
    return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
        Substitute("posix_fallocate($0, $1, $2) failed for file $3"
            " with returnval=$4 description=$5", file_desc, write_range->offset(),
            write_range->len_, write_range->file_, success, GetStrErrMsg())));
  }
}

success = fseek(file_handle, write_range->offset(), SEEK_SET);
if (success != 0) {
  return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
      Substitute("fseek($0, $1, SEEK_SET) failed with errno=$2 description=$3",
          write_range->file_, write_range->offset(), errno, GetStrErrMsg())));
}

int64_t bytes_written = fwrite(write_range->data_, 1, write_range->len_, file_handle);
if (bytes_written < write_range->len_) {
  return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
      Substitute("fwrite(buffer, 1, $0, $1) failed with errno=$2 description=$3",
          write_range->len_, write_range->file_, errno, GetStrErrMsg())));
}
// AddWriteRange(): enqueue the write on its disk under the writer's lock.
unique_lock<mutex> writer_lock(writer->lock_);
// AssignQueue(): S3 ranges are always remote, so expected_local must be false.
DCHECK(!expected_local);

if (disk_id == -1) {
  // Disk id is unknown; spread such ranges across queues with a static counter.
  static int next_disk_id = 0;
  disk_id = next_disk_id++;
}
return disk_id % num_local_disks();
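A compressed, standalone sketch of the assignment policy (AssignQueueSketch and its parameters are hypothetical; the real method also special-cases other remote filesystems):

  int AssignQueueSketch(bool is_s3, int disk_id, int s3_queue_id, int num_local_disks) {
    if (is_s3) return s3_queue_id;                  // S3 always uses the remote queue
    static int next_disk_id = 0;
    if (disk_id == -1) disk_id = next_disk_id++;    // unknown disk: round-robin
    return disk_id % num_local_disks;               // fold onto a local disk queue
  }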