Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
blocking-queue.h
Go to the documentation of this file.
1 // Copyright 2013 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_UTIL_BLOCKING_QUEUE_H
17 #define IMPALA_UTIL_BLOCKING_QUEUE_H
18 
19 #include <boost/thread/condition_variable.hpp>
20 #include <boost/thread/mutex.hpp>
21 #include <list>
22 #include <unistd.h>
23 
24 #include "util/stopwatch.h"
25 
26 namespace impala {
27 
30 
33 template <typename T>
35  public:
36  BlockingQueue(size_t max_elements)
37  : shutdown_(false),
38  max_elements_(max_elements),
41  }
42 
46  bool BlockingGet(T* out) {
47  MonotonicStopWatch timer;
48  boost::unique_lock<boost::mutex> unique_lock(lock_);
49 
50  while (true) {
51  if (!list_.empty()) {
52  *out = list_.front();
53  list_.pop_front();
55  unique_lock.unlock();
56  put_cv_.notify_one();
57  return true;
58  }
59  if (shutdown_) return false;
60 
61  timer.Start();
62  get_cv_.wait(unique_lock);
63  timer.Stop();
64  }
65  }
66 
69  bool BlockingPut(const T& val) {
70  MonotonicStopWatch timer;
71  boost::unique_lock<boost::mutex> unique_lock(lock_);
72 
73  while (list_.size() >= max_elements_ && !shutdown_) {
74  timer.Start();
75  put_cv_.wait(unique_lock);
76  timer.Stop();
77  }
79  if (shutdown_) return false;
80 
81  DCHECK_LT(list_.size(), max_elements_);
82  list_.push_back(val);
83  unique_lock.unlock();
84  get_cv_.notify_one();
85  return true;
86  }
87 
89  void Shutdown() {
90  {
91  boost::lock_guard<boost::mutex> guard(lock_);
92  shutdown_ = true;
93  }
94 
95  get_cv_.notify_all();
96  put_cv_.notify_all();
97  }
98 
99  uint32_t GetSize() const {
100  boost::unique_lock<boost::mutex> l(lock_);
101  return list_.size();
102  }
103 
106  boost::lock_guard<boost::mutex> guard(lock_);
107  return total_get_wait_time_;
108  }
109 
112  boost::lock_guard<boost::mutex> guard(lock_);
113  return total_put_wait_time_;
114  }
115 
116  private:
117  bool shutdown_;
118  const int max_elements_;
119  boost::condition_variable get_cv_; // 'get' callers wait on this
120  boost::condition_variable put_cv_; // 'put' callers wait on this
122  mutable boost::mutex lock_;
123  std::list<T> list_;
126 };
127 
128 }
129 
130 #endif
std::list< T > list_
uint64_t total_get_wait_time() const
Returns the total amount of time threads have blocked in BlockingGet.
boost::condition_variable get_cv_
uint64_t total_put_wait_time() const
Returns the total amount of time threads have blocked in BlockingPut.
bool BlockingPut(const T &val)
uint32_t GetSize() const
BlockingQueue(size_t max_elements)
uint64_t ElapsedTime() const
Returns the elapsed time in nanoseconds.
Definition: stopwatch.h:105
boost::condition_variable put_cv_
void Shutdown()
Shut down the queue. Wakes up all threads waiting on BlockingGet or BlockingPut.
bool BlockingGet(T *out)
boost::mutex lock_
lock_ guards access to list_, total_get_wait_time_, and total_put_wait_time_.