RSC  0.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
SynchronizedQueue.h
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is a part of RSC project
4  *
5  * Copyright (C) 2010 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #pragma once
28 
29 #include <queue>
30 
31 #include <boost/format.hpp>
32 #include <boost/function.hpp>
33 #include <boost/noncopyable.hpp>
34 #include <boost/thread/condition.hpp>
35 #include <boost/thread/recursive_mutex.hpp>
36 
37 #include "InterruptedException.h"
38 #include "rsc/rscexports.h"
39 
40 namespace rsc {
41 namespace threading {
42 
/**
 * Exception signalling that no element could be taken from a queue.
 *
 * Within this header it is thrown by SynchronizedQueue::pop when its timeout
 * expires and by SynchronizedQueue::tryPop when the queue holds no element.
 */
48 class RSC_EXPORT QueueEmptyException: public std::runtime_error {
49 public:
// NOTE(review): SynchronizedQueue::tryPop constructs this exception without
// arguments, so a default constructor is presumably declared as well (it is
// not visible in this listing) - confirm against the original header.
// The constructor below takes a textual description of the failure; its
// definition is not part of this header.
51  explicit QueueEmptyException(const std::string& message);
52 };
53 
61 template<class M>
62 class SynchronizedQueue: public boost::noncopyable {
63 public:
64 
65  typedef boost::function<void(const M& drop)> dropHandlerType;
66 
67 private:
68 
69  volatile bool interrupted;
70  std::queue<M> queue;
71  mutable boost::recursive_mutex mutex;
72  boost::condition condition;
73  unsigned int sizeLimit;
75 
76 public:
77 
86  explicit SynchronizedQueue(const unsigned int& sizeLimit = 0,
89  }
90 
91  virtual ~SynchronizedQueue() {
92  // TODO: decide what to do on destruction while threads are still waiting
93  // in pop(). Calling interrupt() here is not sufficient: the woken threads
94  // still access the queue members while this object is being destroyed
95  // and the condition variable is destructed underneath them.
96  }
97 
104  void push(const M& message) {
105  {
106  boost::recursive_mutex::scoped_lock lock(mutex);
107  if (sizeLimit > 0) {
108  while (queue.size() > sizeLimit - 1) {
109  if (dropHandler) {
110  dropHandler(queue.front());
111  }
112  queue.pop();
113  }
114  }
115  queue.push(message);
116  }
117  condition.notify_one();
118  }
119 
134  M pop(const boost::uint32_t& timeoutMs = 0) {
135 
136  boost::recursive_mutex::scoped_lock lock(mutex);
137 
138  while (!interrupted && queue.empty()) {
139  if (timeoutMs == 0) {
140  condition.wait(lock);
141  } else {
142 #if BOOST_VERSION >= 105000
143  if (!condition.timed_wait(lock, boost::posix_time::milliseconds(timeoutMs))) {
144 #else
145  const boost::system_time timeout = boost::get_system_time()
146  + boost::posix_time::milliseconds(timeoutMs);
147  if (!condition.timed_wait(lock, timeout)) {
148 #endif
149  throw QueueEmptyException(boost::str(boost::format("No element available on queue within %d ms.") % timeoutMs));
150  }
151  }
152  }
153 
154  if (interrupted) {
155  throw InterruptedException("Queue was interrupted");
156  }
157 
158  M message = queue.front();
159  queue.pop();
160  return message;
161 
162  }
163 
171  M tryPop() {
172 
173  boost::recursive_mutex::scoped_lock lock(mutex);
174 
175  if (queue.empty()) {
176  throw QueueEmptyException();
177  }
178 
179  M message = queue.front();
180  queue.pop();
181  return message;
182 
183  }
184 
191  bool empty() const {
192  boost::recursive_mutex::scoped_lock lock(mutex);
193  return queue.empty();
194  }
195 
201  std::size_t size() const {
202  boost::recursive_mutex::scoped_lock lock(mutex);
203  return this->queue.size();
204  }
205 
209  void clear() {
210  boost::recursive_mutex::scoped_lock lock(mutex);
211  while (!this->queue.empty()) {
212  this->queue.pop();
213  }
214  }
215 
221  void interrupt() {
222  {
223  boost::recursive_mutex::scoped_lock lock(mutex);
224  interrupted = true;
225  }
226  condition.notify_all();
227  }
228 
229 };
230 
231 }
232 }