Impala
Impala is the open source, native analytic database for Apache Hadoop.
disk-io-mgr-internal.h
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef IMPALA_RUNTIME_DISK_IO_MGR_INTERNAL_H
#define IMPALA_RUNTIME_DISK_IO_MGR_INTERNAL_H

#include "disk-io-mgr.h"
#include <queue>
#include <boost/thread/locks.hpp>
#include <unistd.h>
#include <gutil/strings/substitute.h>

#include "common/logging.h"
#include "runtime/mem-tracker.h"
#include "util/cpu-info.h"
#include "util/debug-util.h"
#include "util/disk-info.h"
#include "util/hdfs-util.h"
#include "util/filesystem-util.h"
#include "util/impalad-metrics.h"

// This file contains internal structures to the IoMgr. Users of the IoMgr do
// not need to include this file.
namespace impala {

// Per disk state.
struct DiskIoMgr::DiskQueue {
  // Disk id (0-based).
  int disk_id;

  // Lock that protects access to 'request_contexts' and 'work_available'.
  boost::mutex lock;

  // Condition variable to signal the disk threads that there is work to do or that
  // they should shut down. A thread is woken up when a request context is added to
  // the queue.
  boost::condition_variable work_available;

  // List of all request contexts that have work queued on this disk.
  std::list<RequestContext*> request_contexts;

  // Enqueue the request context to the disk queue. The DiskQueue lock must not be
  // taken.
  inline void EnqueueContext(RequestContext* worker) {
    {
      boost::unique_lock<boost::mutex> disk_lock(lock);
      // Check that the context is not already on this disk's queue.
      DCHECK(find(request_contexts.begin(), request_contexts.end(), worker) ==
          request_contexts.end());
      request_contexts.push_back(worker);
    }
    work_available.notify_all();
  }

  DiskQueue(int id) : disk_id(id) { }
};

// Internal state for a single request context (reader). This object maintains a
// lot of carefully synchronized state: state shared across all disks as well as
// per-disk state. The unit of an IO request is a RequestRange, which is either a
// ScanRange or a WriteRange. A scan range lives in one of several places at any
// time: a per-disk unstarted_scan_ranges queue (queued, nothing read yet), the
// ready_to_start_ranges_ queue (ready to be started by the caller), a per-disk
// in_flight_ranges queue (being processed by the disk threads), the
// blocked_ranges_ queue (blocked on back pressure from its outgoing buffers), or
// the cached_ranges_ queue (expected to be read from the data node cache).
class DiskIoMgr::RequestContext {
 public:
  enum State {
    // Reader is initialized and maps to a client.
    Active,

    // Reader is in the process of being cancelled. Cancellation is coordinated
    // between different threads and when they are all complete, the reader context
    // is moved to the inactive state.
    Cancelled,

    // Reader context does not map to a client. Accessing memory in this context
    // is invalid (i.e. it is equivalent to a dangling pointer).
    Inactive,
  };

  RequestContext(DiskIoMgr* parent, int num_disks);

  // Resets this object.
  void Reset(MemTracker* tracker);

  // Decrements the number of active disks for this reader. If the disk count
  // goes to 0, the disk complete condition variable is signaled.
  // The reader lock must be taken before this call.
  void DecrementDiskRefCount() {
    // boost doesn't let us dcheck that the reader lock is taken
    DCHECK_GT(num_disks_with_ranges_, 0);
    if (--num_disks_with_ranges_ == 0) {
      disks_complete_cond_var_.notify_one();
    }
    DCHECK(Validate()) << std::endl << DebugString();
  }

  // Reader & Disk Scheduling: Readers that currently can't do work are not on the
  // disk's queue. These are readers that have no ranges in the in_flight_ranges
  // queue and have not prepared a range to be started. To keep readers scheduled
  // correctly, the reader is (re)scheduled any time a range is put on its
  // in_flight_ranges queue.

  // Adds range to in_flight_ranges, scheduling this reader on the disk threads
  // if necessary. The reader lock must be taken before this.
  void ScheduleScanRange(DiskIoMgr::ScanRange* range) {
    DCHECK_EQ(state_, Active);
    DCHECK(range != NULL);
    RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
    state.in_flight_ranges()->Enqueue(range);
    state.ScheduleContext(this, range->disk_id());
  }

  // Cancels the context with status code 'status'.
  void Cancel(const Status& status);

  // Adds a request range to the disk queue for this request context. Currently,
  // schedule_immediately must be false if the RequestRange is a write range.
  void AddRequestRange(DiskIoMgr::RequestRange* range, bool schedule_immediately);

  // Validates invariants of the reader. The reader lock must be taken beforehand.
  bool Validate() const;

  // Dumps out reader information. The lock should be taken by the caller.
  std::string DebugString() const;

 private:
  friend class DiskIoMgr;
  class PerDiskState;

  // Parent object.
  DiskIoMgr* parent_;

  // Memory used for this reader. This is unowned by this object.
  MemTracker* mem_tracker_;

  // Total bytes read for this reader.
  RuntimeProfile::Counter* bytes_read_counter_;

  // Total time spent in hdfs reading.
  RuntimeProfile::Counter* read_timer_;

  // Number of active read threads.
  RuntimeProfile::Counter* active_read_thread_counter_;

  // Disk access bitmap: bit[i] of the counter is set if disk id i has been
  // accessed for this reader.
  RuntimeProfile::Counter* disks_accessed_bitmap_;

  // Total number of bytes read locally, updated at end of each range scan.
  AtomicInt<int64_t> bytes_read_local_;

  // Total number of bytes read via short circuit read, updated at end of each
  // range scan.
  AtomicInt<int64_t> bytes_read_short_circuit_;

  // Total number of bytes read from the data node cache, updated at end of each
  // range scan.
  AtomicInt<int64_t> bytes_read_dn_cache_;

  // Total number of bytes from remote reads that were expected to be local.
  AtomicInt<int64_t> unexpected_remote_bytes_;

  // The number of scan ranges that have been completed for this reader.
  AtomicInt<int> num_finished_ranges_;

  // All fields below are accessed by multiple threads and the lock needs to be
  // taken before accessing them.
  boost::mutex lock_;

  // Current state of the reader.
  State state_;

  // Status of this reader. Set to non-ok if cancelled.
  Status status_;

  // The number of disks that still have ranges queued for this reader. When this
  // reaches 0, disks_complete_cond_var_ is signaled.
  int num_disks_with_ranges_;

  // Scan ranges that are expected to be cached on the data node.
  InternalQueue<ScanRange> cached_ranges_;

  // Ranges that are ready to be started, waiting for the caller to pick them up.
  InternalQueue<ScanRange> ready_to_start_ranges_;
  boost::condition_variable ready_to_start_ranges_cv_;  // used with lock_

  // Ranges that are blocked due to back pressure on outgoing buffers.
  InternalQueue<ScanRange> blocked_ranges_;

  // Condition variable for UnregisterContext() to wait for all disks to complete.
  boost::condition_variable disks_complete_cond_var_;

  // State per disk, synchronized by the RequestContext's lock.
  class PerDiskState {
   public:
    bool done() const { return done_; }
    void set_done(bool b) { done_ = b; }

    int num_remaining_ranges() const { return num_remaining_ranges_; }
    int& num_remaining_ranges() { return num_remaining_ranges_; }

    ScanRange* next_scan_range_to_start() { return next_scan_range_to_start_; }
    void set_next_scan_range_to_start(ScanRange* range) {
      next_scan_range_to_start_ = range;
    }

    // The full memory barrier prevents these unlocked loads from being reordered
    // with each other; is_on_queue_ and num_threads_in_op_ are updated without
    // the reader lock held.
    bool is_on_queue() const {
      bool b = is_on_queue_;
      __sync_synchronize();
      return b;
    }

    int num_threads_in_op() const {
      int v = num_threads_in_op_;
      __sync_synchronize();
      return v;
    }

    const InternalQueue<ScanRange>* unstarted_scan_ranges() const {
      return &unstarted_scan_ranges_;
    }
    const InternalQueue<WriteRange>* unstarted_write_ranges() const {
      return &unstarted_write_ranges_;
    }
    const InternalQueue<RequestRange>* in_flight_ranges() const {
      return &in_flight_ranges_;
    }

    InternalQueue<ScanRange>* unstarted_scan_ranges() { return &unstarted_scan_ranges_; }
    InternalQueue<WriteRange>* unstarted_write_ranges() {
      return &unstarted_write_ranges_;
    }
    InternalQueue<RequestRange>* in_flight_ranges() { return &in_flight_ranges_; }

    PerDiskState() {
      Reset();
    }

    // Schedules the request context on this disk if it's not already on the queue.
    // The context lock must be taken before this.
    void ScheduleContext(RequestContext* context, int disk_id) {
      if (!is_on_queue_ && !done_) {
        is_on_queue_ = true;
        context->parent_->disk_queues_[disk_id]->EnqueueContext(context);
      }
    }

    // Increments the number of threads performing an op for this reader on this
    // disk and takes the context off the disk queue. Updated by multiple threads
    // without a lock, hence the atomic increment.
    void IncrementRequestThreadAndDequeue() {
      ++num_threads_in_op_;
      is_on_queue_ = false;
    }

    void DecrementRequestThread() {
      --num_threads_in_op_;
    }

    // Decrements the request thread count and does final cleanup if this was the
    // last thread for this reader on this disk. The RequestContext lock must be
    // taken before this.
    void DecrementRequestThreadAndCheckDone(RequestContext* context) {
      --num_threads_in_op_;
      // We don't need to worry about reordered loads here because updating
      // num_threads_in_op_ uses an atomic, which is a barrier.
      if (!is_on_queue_ && num_threads_in_op_ == 0 && !done_) {
        // This thread is the last one for this reader on this disk, do final cleanup.
        context->DecrementDiskRefCount();
        done_ = true;
      }
    }

    void Reset() {
      DCHECK(in_flight_ranges_.empty());
      DCHECK(unstarted_scan_ranges_.empty());
      DCHECK(unstarted_write_ranges_.empty());

      done_ = true;
      num_remaining_ranges_ = 0;
      is_on_queue_ = false;
      num_threads_in_op_ = 0;
      next_scan_range_to_start_ = NULL;
    }

   private:
    // If true, this disk is all done for this request context, including any
    // cleanup.
    bool done_;

    // The number of request ranges for this reader on this disk that have not
    // been fully processed.
    int num_remaining_ranges_;

    // Queue of scan ranges for this reader on this disk that have not started
    // being read.
    InternalQueue<ScanRange> unstarted_scan_ranges_;

    // Queue of ranges that are ready to be read or written; the disk threads pull
    // the next IO request for this reader on this disk from this queue.
    InternalQueue<RequestRange> in_flight_ranges_;

    // The next scan range to start for this reader on this disk.
    ScanRange* next_scan_range_to_start_;

    // Whether this context is on this disk's queue. Updated without a lock.
    volatile bool is_on_queue_;

    // Number of threads currently performing an IO operation for this reader on
    // this disk. Updated by multiple threads without a lock, so it must be atomic.
    AtomicInt<int> num_threads_in_op_;

    // Queue of write ranges for this reader on this disk that have not been
    // started.
    InternalQueue<WriteRange> unstarted_write_ranges_;
  };

  // Per disk states to synchronize multiple disk threads accessing the same
  // request context.
  std::vector<PerDiskState> disk_states_;
};

}

#endif
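
The producer side of DiskQueue is visible above (EnqueueContext); the consumer side, the disk thread loop, lives in disk-io-mgr.cc rather than in this header. The following is a minimal sketch of how a disk thread might block on work_available and dequeue the next context, assuming a hypothetical DiskThreadLoop function and shut_down flag (neither is part of this header):

void DiskThreadLoop(DiskIoMgr::DiskQueue* queue, volatile bool* shut_down) {
  while (true) {
    DiskIoMgr::RequestContext* context = NULL;
    {
      boost::unique_lock<boost::mutex> disk_lock(queue->lock);
      // Sleep until a context has work queued on this disk or we are shutting down.
      while (!*shut_down && queue->request_contexts.empty()) {
        queue->work_available.wait(disk_lock);
      }
      if (*shut_down) return;
      // Contexts are serviced round-robin: take from the front; a context that
      // still has work is re-enqueued later via PerDiskState::ScheduleContext().
      context = queue->request_contexts.front();
      queue->request_contexts.pop_front();
    }
    // ... pick the next RequestRange from 'context' and perform the IO ...
  }
}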
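A side note on the barrier accessors in PerDiskState: __sync_synchronize() is GCC's full memory barrier builtin, used here because is_on_queue_ and num_threads_in_op_ are read outside the reader lock. In C++11 and later the same pattern is usually written with std::atomic, whose default sequentially consistent loads make the explicit fence unnecessary. A standalone sketch of the equivalent (not Impala code):

#include <atomic>

// Equivalent of PerDiskState's unlocked flags using std::atomic. The default
// memory order (seq_cst) already orders the two loads with respect to each
// other, so no explicit __sync_synchronize() is needed.
struct PerDiskFlags {
  std::atomic<bool> is_on_queue{false};
  std::atomic<int> num_threads_in_op{0};

  // Mirrors the check in DecrementRequestThreadAndCheckDone(): true iff no
  // thread is mid-IO and the context is off the disk queue.
  bool IdleAndOffQueue() const {
    return !is_on_queue.load() && num_threads_in_op.load() == 0;
  }
};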