RSC  0.17.1
SynchronizedQueue.h
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is a part of RSC project
4  *
5  * Copyright (C) 2010 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #pragma once
28 
29 #include <queue>
30 
31 #include <boost/format.hpp>
32 #include <boost/function.hpp>
33 #include <boost/noncopyable.hpp>
34 #include <boost/thread/condition.hpp>
35 #include <boost/thread/recursive_mutex.hpp>
36 
37 #include "InterruptedException.h"
38 #include "rsc/rscexports.h"
39 
40 namespace rsc {
41 namespace threading {
42 
48 class RSC_EXPORT QueueEmptyException: public std::runtime_error {
49 public:
51  explicit QueueEmptyException(const std::string& message);
52 };
53 
61 template<class M>
62 class SynchronizedQueue: public boost::noncopyable {
63 public:
64 
65  typedef boost::function<void(const M& drop)> dropHandlerType;
66 
67 private:
68 
69  volatile bool interrupted;
70  std::queue<M> queue;
71  mutable boost::recursive_mutex mutex;
72  boost::condition condition;
73  unsigned int sizeLimit;
74  dropHandlerType dropHandler;
75 
76 public:
77 
86  explicit SynchronizedQueue(const unsigned int& sizeLimit = 0,
87  dropHandlerType dropHandler = 0) :
88  interrupted(false), sizeLimit(sizeLimit), dropHandler(dropHandler) {
89  }
90 
91  virtual ~SynchronizedQueue() {
92  // TODO what to do on destruction if still someone in waiting?
93  // Simply calling interrupt does not help because threads still access
94  // the queue variables at waking up from the interruption while this
95  // class is being destructed and the condition variable gets destructed
96  }
97 
104  void push(const M& message) {
105  {
106  boost::recursive_mutex::scoped_lock lock(this->mutex);
107  if (this->sizeLimit > 0) {
108  while (this->queue.size() > this->sizeLimit - 1) {
109  if (this->dropHandler) {
110  this->dropHandler(this->queue.front());
111  }
112  this->queue.pop();
113  }
114  }
115  this->queue.push(message);
116  }
117  this->condition.notify_one();
118  }
119 
134  M peek(const boost::uint32_t& timeoutMs = 0) {
135 
136  boost::recursive_mutex::scoped_lock lock(this->mutex);
137 
138  while (!this->interrupted && this->queue.empty()) {
139  if (timeoutMs == 0) {
140  this->condition.wait(lock);
141  } else {
142 #if BOOST_VERSION >= 105000
143  if (!this->condition.timed_wait(lock, boost::posix_time::milliseconds(timeoutMs))) {
144 #else
145  const boost::system_time timeout = boost::get_system_time()
146  + boost::posix_time::milliseconds(timeoutMs);
147  if (!this->condition.timed_wait(lock, timeout)) {
148 #endif
149  throw QueueEmptyException(boost::str(boost::format("No element available on queue within %d ms.") % timeoutMs));
150  }
151  }
152  }
153 
154  if (this->interrupted) {
155  throw InterruptedException("Queue was interrupted");
156  }
157 
158  return this->queue.front();
159  }
160 
175  M pop(const boost::uint32_t& timeoutMs = 0) {
176 
177  boost::recursive_mutex::scoped_lock lock(this->mutex);
178 
179  while (!this->interrupted && this->queue.empty()) {
180  if (timeoutMs == 0) {
181  this->condition.wait(lock);
182  } else {
183 #if BOOST_VERSION >= 105000
184  if (!this->condition.timed_wait(lock, boost::posix_time::milliseconds(timeoutMs))) {
185 #else
186  const boost::system_time timeout = boost::get_system_time()
187  + boost::posix_time::milliseconds(timeoutMs);
188  if (!this->condition.timed_wait(lock, timeout)) {
189 #endif
190  throw QueueEmptyException(boost::str(boost::format("No element available on queue within %d ms.") % timeoutMs));
191  }
192  }
193  }
194 
195  if (this->interrupted) {
196  throw InterruptedException("Queue was interrupted");
197  }
198 
199  M message = this->queue.front();
200  this->queue.pop();
201  return message;
202 
203  }
204 
212  M tryPop() {
213 
214  boost::recursive_mutex::scoped_lock lock(this->mutex);
215 
216  if (this->queue.empty()) {
217  throw QueueEmptyException();
218  }
219 
220  M message = this->queue.front();
221  this->queue.pop();
222  return message;
223 
224  }
225 
232  bool empty() const {
233  boost::recursive_mutex::scoped_lock lock(this->mutex);
234  return this->queue.empty();
235  }
236 
242  std::size_t size() const {
243  boost::recursive_mutex::scoped_lock lock(this->mutex);
244  return this->queue.size();
245  }
246 
250  void clear() {
251  boost::recursive_mutex::scoped_lock lock(this->mutex);
252  while (!this->queue.empty()) {
253  this->queue.pop();
254  }
255  }
256 
262  void interrupt() {
263  {
264  boost::recursive_mutex::scoped_lock lock(this->mutex);
265  this->interrupted = true;
266  }
267  this->condition.notify_all();
268  }
269 
270 };
271 
272 }
273 }
SynchronizedQueue(const unsigned int &sizeLimit=0, dropHandlerType dropHandler=0)
Creates a new queue.
M pop(const boost::uint32_t &timeoutMs=0)
Returns the next element form the queue and wait until there is such an element.
boost::function< void(const M &drop)> dropHandlerType
bool empty() const
Checks whether this queue is empty.
An exception thrown if a blocking operation was interrupted.
M tryPop()
Tries to pop an element from the queue but does not wait if there is no element on the queue...
Indicates that a queue was empty while trying to pop an element from it.
M peek(const boost::uint32_t &timeoutMs=0)
Returns the element at the front of the queue without removing it, waiting until there is such an ele...
void interrupt()
Interrupts the processing on this queue for every current call to pop and any later call...
void clear()
Remove all elements from the queue.
std::size_t size() const
Return element count of the queue.
A queue with synchronized access and interruption support.
void push(const M &message)
Pushes a new element on the queue and wakes up one waiting client if there is one.