19 using namespace impala;
25 vector<WriteRange::WriteDoneCallback> write_callbacks;
27 lock_guard<mutex> lock(
lock_);
49 write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_);
55 scan_range->
Cancel(status);
59 write_callbacks.push_back(write_range->
callback_);
98 ++num_disks_with_ranges_;
101 bool schedule_context;
104 if (schedule_immediately) {
105 ScheduleScanRange(scan_range);
108 ++num_unstarted_scan_ranges_;
116 DCHECK(!schedule_immediately);
122 schedule_context =
true;
131 bytes_read_counter_(NULL),
133 active_read_thread_counter_(NULL),
134 disks_accessed_bitmap_(NULL),
136 disk_states_(num_disks) {
141 DCHECK_EQ(state_, Inactive);
144 bytes_read_counter_ = NULL;
146 active_read_thread_counter_ = NULL;
147 disks_accessed_bitmap_ = NULL;
152 num_unstarted_scan_ranges_ = 0;
153 num_disks_with_ranges_ = 0;
154 num_used_buffers_ = 0;
155 num_buffers_in_reader_ = 0;
156 num_ready_buffers_ = 0;
157 total_range_queue_capacity_ = 0;
158 num_finished_ranges_ = 0;
159 num_remote_ranges_ = 0;
160 bytes_read_local_ = 0;
161 bytes_read_short_circuit_ = 0;
162 bytes_read_dn_cache_ = 0;
163 unexpected_remote_bytes_ = 0;
166 DCHECK(ready_to_start_ranges_.empty());
167 DCHECK(blocked_ranges_.empty());
168 DCHECK(cached_ranges_.empty());
170 for (
int i = 0; i < disk_states_.size(); ++i) {
171 disk_states_[i].Reset();
178 ss << endl <<
" RequestContext: " << (
void*)
this <<
" (state=";
183 ss <<
" status_=" << (status_.ok() ?
"OK" : status_.GetDetail())
184 <<
" #ready_buffers=" << num_ready_buffers_
185 <<
" #used_buffers=" << num_used_buffers_
186 <<
" #num_buffers_in_reader=" << num_buffers_in_reader_
187 <<
" #finished_scan_ranges=" << num_finished_ranges_
188 <<
" #disk_with_ranges=" << num_disks_with_ranges_
189 <<
" #disks=" << num_disks_with_ranges_;
190 for (
int i = 0; i < disk_states_.size(); ++i) {
191 ss << endl <<
" " << i <<
": "
192 <<
"is_on_queue=" << disk_states_[i].is_on_queue()
193 <<
" done=" << disk_states_[i].done()
194 <<
" #num_remaining_scan_ranges=" << disk_states_[i].num_remaining_ranges()
195 <<
" #in_flight_ranges=" << disk_states_[i].in_flight_ranges()->size()
196 <<
" #unstarted_scan_ranges=" << disk_states_[i].unstarted_scan_ranges()->size()
197 <<
" #unstarted_write_ranges="
198 << disk_states_[i].unstarted_write_ranges()->size()
199 <<
" #reading_threads=" << disk_states_[i].num_threads_in_op();
208 LOG(WARNING) <<
"state_ == RequestContext::Inactive";
212 if (num_used_buffers_ < 0) {
213 LOG(WARNING) <<
"num_used_buffers_ < 0: #used=" << num_used_buffers_;
217 if (num_ready_buffers_ < 0) {
218 LOG(WARNING) <<
"num_ready_buffers_ < 0: #used=" << num_ready_buffers_;
222 int total_unstarted_ranges = 0;
223 for (
int i = 0; i < disk_states_.size(); ++i) {
230 if (num_reading_threads < 0) {
231 LOG(WARNING) <<
"disk_id=" << i
232 <<
"state.num_threads_in_read < 0: #threads="
233 << num_reading_threads;
240 LOG(WARNING) <<
"disk_id=" << i
241 <<
" state.unstarted_ranges.size() + state.in_flight_ranges.size()"
242 <<
" > state.num_remaining_ranges:"
251 if (!state.
in_flight_ranges()->empty() && !on_queue && num_reading_threads == 0) {
252 LOG(WARNING) <<
"disk_id=" << i
253 <<
" reader has inflight ranges but is not on the disk queue."
255 <<
" #reading_threads=" << num_reading_threads
256 <<
" on_queue=" << on_queue;
260 if (state.
done() && num_reading_threads > 0) {
261 LOG(WARNING) <<
"disk_id=" << i
262 <<
" state set to done but there are still threads working."
263 <<
" #reading_threads=" << num_reading_threads;
269 LOG(WARNING) <<
"disk_id=" << i
270 <<
"Reader cancelled but has in flight ranges.";
274 LOG(WARNING) <<
"disk_id=" << i
275 <<
"Reader cancelled but has unstarted ranges.";
280 if (state.
done() && on_queue) {
281 LOG(WARNING) <<
"disk_id=" << i
282 <<
" state set to done but the reader is still on the disk queue."
283 <<
" state.done=true and state.is_on_queue=true";
289 if (total_unstarted_ranges != num_unstarted_scan_ranges_) {
290 LOG(WARNING) <<
"total_unstarted_ranges=" << total_unstarted_ranges
291 <<
" sum_in_states=" << num_unstarted_scan_ranges_;
295 if (!ready_to_start_ranges_.empty()) {
296 LOG(WARNING) <<
"Reader cancelled but has ready to start ranges.";
299 if (!blocked_ranges_.empty()) {
300 LOG(WARNING) <<
"Reader cancelled but has blocked ranges.";
std::string DebugString() const
Dumps out reader information. Lock should be taken by caller.
RequestType::type request_type() const
InternalQueue< ScanRange > blocked_ranges_
Ranges that are blocked due to back pressure on outgoing buffers.
boost::function< void(const Status &)> WriteDoneCallback
Status status_
Status of this reader. Set to non-ok if cancelled.
const InternalQueue< WriteRange > * unstarted_write_ranges() const
ScanRange * next_scan_range_to_start()
InternalQueue< ScanRange > cached_ranges_
WriteDoneCallback callback_
Callback to invoke after the write is complete.
void AddRequestRange(DiskIoMgr::RequestRange *range, bool schedule_immediately)
static const int DEFAULT_QUEUE_CAPACITY
int num_threads_in_op() const
const InternalQueue< RequestRange > * in_flight_ranges() const
Reader is initialized and maps to a client.
This class is thread-safe.
void ScheduleContext(RequestContext *context, int disk_id)
int num_remaining_ranges() const
bool Validate() const
Validates invariants of reader. Reader lock must be taken beforehand.
void Reset(MemTracker *tracker)
Resets this object to its initial state so it can be reused.
boost::condition_variable ready_to_start_ranges_cv_
RequestContext(DiskIoMgr *parent, int num_disks)
State state_
Current state of the reader.
std::vector< PerDiskState > disk_states_
void Cancel(const Status &status)
Cancels the context, recording 'status' as the reason for cancellation.
const InternalQueue< ScanRange > * unstarted_scan_ranges() const
void Cancel(const Status &status)
RuntimeProfile::Counter read_timer_
Total time spent reading from HDFS.
InternalQueue< ScanRange > ready_to_start_ranges_