Impala
Impala is the open source, native analytic database for Apache Hadoop.
disk-io-mgr-reader-context.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "runtime/disk-io-mgr-internal.h"

#include "common/names.h"

using namespace impala;

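// Cancels the context with status code 'status'. Each queue that can hold a
// request range is drained below; write-completion callbacks are collected
// first and invoked only after the lock is released.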
void RequestContext::Cancel(const Status& status) {
  DCHECK(!status.ok());

  // Callbacks are collected in this vector and invoked while no lock is held.
  vector<WriteRange::WriteDoneCallback> write_callbacks;
  {
    lock_guard<mutex> lock(lock_);
    DCHECK(Validate()) << endl << DebugString();

    // Already being cancelled
    if (state_ == RequestContext::Cancelled) return;

    DCHECK(status_.ok());
    status_ = status;

    // The reader will be put into a cancelled state until all cleanup is complete.
    state_ = RequestContext::Cancelled;

    // Cancel all scan ranges for this reader. Each range could be on one of
    // four queues.
    for (int i = 0; i < disk_states_.size(); ++i) {
      RequestContext::PerDiskState& state = disk_states_[i];
      RequestRange* range = NULL;
      while ((range = state.in_flight_ranges()->Dequeue()) != NULL) {
        if (range->request_type() == RequestType::READ) {
          static_cast<ScanRange*>(range)->Cancel(status);
        } else {
          DCHECK(range->request_type() == RequestType::WRITE);
          write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_);
        }
      }

      ScanRange* scan_range;
      while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != NULL) {
        scan_range->Cancel(status);
      }
      WriteRange* write_range;
      while ((write_range = state.unstarted_write_ranges()->Dequeue()) != NULL) {
        write_callbacks.push_back(write_range->callback_);
      }
    }

    ScanRange* range = NULL;
    while ((range = ready_to_start_ranges_.Dequeue()) != NULL) {
      range->Cancel(status);
    }
    while ((range = blocked_ranges_.Dequeue()) != NULL) {
      range->Cancel(status);
    }
    while ((range = cached_ranges_.Dequeue()) != NULL) {
      range->Cancel(status);
    }

    // Schedule reader on all disks. The disks will notice it is cancelled and do any
    // required cleanup.
    for (int i = 0; i < disk_states_.size(); ++i) {
      RequestContext::PerDiskState& state = disk_states_[i];
      state.ScheduleContext(this, i);
    }
  }

  BOOST_FOREACH(const WriteRange::WriteDoneCallback& write_callback, write_callbacks) {
    write_callback(status_);
  }

  // Signal reader and unblock the GetNext/Read thread. That read will fail with
  // a cancelled status.
  ready_to_start_ranges_cv_.notify_all();
}
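
// For reference, the queues drained by Cancel() above are, per disk:
// in_flight_ranges (ranges currently scheduled for I/O) and the
// unstarted_scan_ranges/unstarted_write_ranges queues, followed by the
// context-wide ready_to_start_ranges_, blocked_ranges_ and cached_ranges_
// queues. Write ranges are not cancelled directly; their completion callbacks
// are invoked with the cancellation status instead.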

void RequestContext::AddRequestRange(
    DiskIoMgr::RequestRange* range, bool schedule_immediately) {
  // DCHECK(lock_.is_locked()); // TODO: boost should have this API
  RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
  if (state.done()) {
    DCHECK_EQ(state.num_remaining_ranges(), 0);
    state.set_done(false);
    ++num_disks_with_ranges_;
  }

  bool schedule_context;
  if (range->request_type() == RequestType::READ) {
    DiskIoMgr::ScanRange* scan_range = static_cast<DiskIoMgr::ScanRange*>(range);
    if (schedule_immediately) {
      ScheduleScanRange(scan_range);
    } else {
      state.unstarted_scan_ranges()->Enqueue(scan_range);
      ++num_unstarted_scan_ranges_;
    }
    // If next_scan_range_to_start is NULL, schedule this RequestContext so that it will
    // be set. If it's not NULL, this context will be scheduled when GetNextRange() is
    // invoked.
    schedule_context = state.next_scan_range_to_start() == NULL;
  } else {
    DCHECK(range->request_type() == RequestType::WRITE);
    DCHECK(!schedule_immediately);
    DiskIoMgr::WriteRange* write_range = static_cast<DiskIoMgr::WriteRange*>(range);
    state.unstarted_write_ranges()->Enqueue(write_range);

    // ScheduleContext() has no effect if the context is already scheduled,
    // so this is safe.
    schedule_context = true;
  }

  if (schedule_context) state.ScheduleContext(this, range->disk_id());
  ++state.num_remaining_ranges();
}
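
// A hedged sketch of the enqueue path (illustrative caller only; the real
// call sites live in DiskIoMgr, e.g. when a client adds scan ranges). Per the
// TODO'd DCHECK above, the context lock must be held:
//
//   {
//     lock_guard<mutex> lock(context->lock_);
//     // 'scan_range' is a hypothetical DiskIoMgr::ScanRange* owned by the caller.
//     context->AddRequestRange(scan_range, false /* schedule_immediately */);
//   }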

RequestContext::RequestContext(DiskIoMgr* parent, int num_disks)
  : parent_(parent),
    bytes_read_counter_(NULL),
    read_timer_(NULL),
    active_read_thread_counter_(NULL),
    disks_accessed_bitmap_(NULL),
    state_(Inactive),
    disk_states_(num_disks) {
}
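
// The constructor leaves the context Inactive with its counters unset;
// Reset() below performs the real (re)initialization. This two-phase setup
// lets RequestContext objects be reused across clients rather than
// reallocated per query.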

// Resets this object.
void RequestContext::Reset(MemTracker* tracker) {
  DCHECK_EQ(state_, Inactive);
  status_ = Status::OK;

  bytes_read_counter_ = NULL;
  read_timer_ = NULL;
  active_read_thread_counter_ = NULL;
  disks_accessed_bitmap_ = NULL;

  state_ = Active;
  mem_tracker_ = tracker;

  num_unstarted_scan_ranges_ = 0;
  num_disks_with_ranges_ = 0;
  num_used_buffers_ = 0;
  num_buffers_in_reader_ = 0;
  num_ready_buffers_ = 0;
  total_range_queue_capacity_ = 0;
  num_finished_ranges_ = 0;
  num_remote_ranges_ = 0;
  bytes_read_local_ = 0;
  bytes_read_short_circuit_ = 0;
  bytes_read_dn_cache_ = 0;
  unexpected_remote_bytes_ = 0;
  initial_queue_capacity_ = DiskIoMgr::DEFAULT_QUEUE_CAPACITY;

  DCHECK(ready_to_start_ranges_.empty());
  DCHECK(blocked_ranges_.empty());
  DCHECK(cached_ranges_.empty());

  for (int i = 0; i < disk_states_.size(); ++i) {
    disk_states_[i].Reset();
  }
}
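
// Note the DCHECK_EQ(state_, Inactive) precondition above: a context must be
// fully unregistered (context-level queues empty, per-disk states reset)
// before Reset() may hand it to a new client.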

// Dumps out request context information. Lock should be taken by caller.
string RequestContext::DebugString() const {
  stringstream ss;
  ss << endl << " RequestContext: " << (void*)this << " (state=";
  if (state_ == RequestContext::Inactive) ss << "Inactive";
  if (state_ == RequestContext::Cancelled) ss << "Cancelled";
  if (state_ == RequestContext::Active) ss << "Active";
  if (state_ != RequestContext::Inactive) {
    ss << " status_=" << (status_.ok() ? "OK" : status_.GetDetail())
       << " #ready_buffers=" << num_ready_buffers_
       << " #used_buffers=" << num_used_buffers_
       << " #num_buffers_in_reader=" << num_buffers_in_reader_
       << " #finished_scan_ranges=" << num_finished_ranges_
       << " #disk_with_ranges=" << num_disks_with_ranges_
       << " #disks=" << num_disks_with_ranges_;
    for (int i = 0; i < disk_states_.size(); ++i) {
      ss << endl << "   " << i << ": "
         << "is_on_queue=" << disk_states_[i].is_on_queue()
         << " done=" << disk_states_[i].done()
         << " #num_remaining_scan_ranges=" << disk_states_[i].num_remaining_ranges()
         << " #in_flight_ranges=" << disk_states_[i].in_flight_ranges()->size()
         << " #unstarted_scan_ranges=" << disk_states_[i].unstarted_scan_ranges()->size()
         << " #unstarted_write_ranges="
         << disk_states_[i].unstarted_write_ranges()->size()
         << " #reading_threads=" << disk_states_[i].num_threads_in_op();
    }
  }
  ss << ")";
  return ss.str();
}
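
// The output is one header line followed by one line per disk. Shape only,
// values illustrative:
//
//   RequestContext: 0x7f... (state=Active status_=OK #ready_buffers=0
//   #used_buffers=0 #num_buffers_in_reader=0 #finished_scan_ranges=0 ...
//    0: is_on_queue=1 done=0 #num_remaining_scan_ranges=2 ...)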

// Validates invariants of reader. Reader lock must be taken beforehand.
bool RequestContext::Validate() const {
  if (state_ == RequestContext::Inactive) {
    LOG(WARNING) << "state_ == RequestContext::Inactive";
    return false;
  }

  if (num_used_buffers_ < 0) {
    LOG(WARNING) << "num_used_buffers_ < 0: #used=" << num_used_buffers_;
    return false;
  }

  if (num_ready_buffers_ < 0) {
    LOG(WARNING) << "num_ready_buffers_ < 0: #ready=" << num_ready_buffers_;
    return false;
  }

  int total_unstarted_ranges = 0;
  for (int i = 0; i < disk_states_.size(); ++i) {
    const PerDiskState& state = disk_states_[i];
    bool on_queue = state.is_on_queue();
    int num_reading_threads = state.num_threads_in_op();

    total_unstarted_ranges += state.unstarted_scan_ranges()->size();

    if (num_reading_threads < 0) {
      LOG(WARNING) << "disk_id=" << i
                   << " state.num_threads_in_op < 0: #threads="
                   << num_reading_threads;
      return false;
    }

    if (state_ != RequestContext::Cancelled) {
      if (state.unstarted_scan_ranges()->size() + state.in_flight_ranges()->size() >
          state.num_remaining_ranges()) {
        LOG(WARNING) << "disk_id=" << i
                     << " state.unstarted_ranges.size() + state.in_flight_ranges.size()"
                     << " > state.num_remaining_ranges:"
                     << " #unscheduled=" << state.unstarted_scan_ranges()->size()
                     << " #in_flight=" << state.in_flight_ranges()->size()
                     << " #remaining=" << state.num_remaining_ranges();
        return false;
      }

      // If we have an in_flight range, the reader must be on the queue or have a
      // thread actively reading for it.
      if (!state.in_flight_ranges()->empty() && !on_queue && num_reading_threads == 0) {
        LOG(WARNING) << "disk_id=" << i
                     << " reader has in-flight ranges but is not on the disk queue."
                     << " #in_flight_ranges=" << state.in_flight_ranges()->size()
                     << " #reading_threads=" << num_reading_threads
                     << " on_queue=" << on_queue;
        return false;
      }

      if (state.done() && num_reading_threads > 0) {
        LOG(WARNING) << "disk_id=" << i
                     << " state set to done but there are still threads working."
                     << " #reading_threads=" << num_reading_threads;
        return false;
      }
    } else {
      // Is Cancelled
      if (!state.in_flight_ranges()->empty()) {
        LOG(WARNING) << "disk_id=" << i
                     << " Reader cancelled but has in flight ranges.";
        return false;
      }
      if (!state.unstarted_scan_ranges()->empty()) {
        LOG(WARNING) << "disk_id=" << i
                     << " Reader cancelled but has unstarted ranges.";
        return false;
      }
    }

    if (state.done() && on_queue) {
      LOG(WARNING) << "disk_id=" << i
                   << " state set to done but the reader is still on the disk queue."
                   << " state.done=true and state.is_on_queue=true";
      return false;
    }
  }

  if (state_ != RequestContext::Cancelled) {
    if (total_unstarted_ranges != num_unstarted_scan_ranges_) {
      LOG(WARNING) << "total_unstarted_ranges=" << total_unstarted_ranges
                   << " sum_in_states=" << num_unstarted_scan_ranges_;
      return false;
    }
  } else {
    if (!ready_to_start_ranges_.empty()) {
      LOG(WARNING) << "Reader cancelled but has ready to start ranges.";
      return false;
    }
    if (!blocked_ranges_.empty()) {
      LOG(WARNING) << "Reader cancelled but has blocked ranges.";
      return false;
    }
  }

  return true;
}
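
// Summary of the invariants enforced by Validate():
//  - buffer and thread counters are never negative;
//  - while not cancelled: per disk, unstarted + in-flight ranges never exceed
//    num_remaining_ranges; a disk with in-flight ranges is either on the disk
//    queue or has a thread in an op; a "done" disk has no working threads; and
//    num_unstarted_scan_ranges_ equals the sum over all per-disk states;
//  - once cancelled: the in-flight, unstarted, ready-to-start and blocked
//    queues must all be empty;
//  - in every state: a "done" disk must be off the disk queue.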