RSC  0.7.17
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
SynchronizedQueue.h
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is a part of RSC project
4  *
5  * Copyright (C) 2010 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #pragma once
28 
29 #include <queue>
30 
31 #include <boost/format.hpp>
32 #include <boost/function.hpp>
33 #include <boost/noncopyable.hpp>
34 #include <boost/thread/condition.hpp>
35 #include <boost/thread/recursive_mutex.hpp>
36 
37 #include "InterruptedException.h"
38 #include "rsc/rscexports.h"
39 
40 namespace rsc {
41 namespace threading {
42 
/**
 * Indicates that a queue was empty while trying to pop an element from it.
 */
48 class RSC_EXPORT QueueEmptyException: public std::runtime_error {
49 public:
/**
 * Constructs the exception from an explicit message.
 *
 * @param message text describing the failure
 */
51  explicit QueueEmptyException(const std::string& message);
52 };
53 
61 template<class M>
62 class SynchronizedQueue: public boost::noncopyable {
63 public:
64 
65  typedef boost::function<void(const M& drop)> dropHandlerType;
66 
67 private:
68 
69  volatile bool interrupted;
70  std::queue<M> queue;
71  mutable boost::recursive_mutex mutex;
72  boost::condition condition;
73  unsigned int sizeLimit;
75 
76 public:
77 
/**
 * Creates a new queue.
 *
 * NOTE(review): the rest of the parameter list and the member initializer
 * list of this constructor are not visible in this listing -- confirm
 * against the original header before relying on this description.
 *
 * @param sizeLimit maximum number of elements kept in the queue; push()
 *                  only enforces a limit when this value is greater
 *                  than 0
 */
86  explicit SynchronizedQueue(const unsigned int& sizeLimit = 0,
89  }
90 
/**
 * Destroys the queue. The body is intentionally empty: no thread is woken
 * up or waited for here, see the open question below.
 */
91  virtual ~SynchronizedQueue() {
92  // TODO: decide what to do on destruction while a thread is still waiting.
93  // Simply calling interrupt() does not help, because the woken threads
94  // still access the queue variables after the interruption while this
95  // object, including its condition variable, is being destructed.
96  }
97 
104  void push(const M& message) {
105  {
106  boost::recursive_mutex::scoped_lock lock(mutex);
107  if (sizeLimit > 0) {
108  while (queue.size() > sizeLimit - 1) {
109  if (dropHandler) {
110  dropHandler(queue.front());
111  }
112  queue.pop();
113  }
114  }
115  queue.push(message);
116  }
117  condition.notify_one();
118  }
119 
134  M pop(const boost::uint32_t& timeoutMs = 0) {
135 
136  boost::recursive_mutex::scoped_lock lock(mutex);
137 
138  while (!interrupted && queue.empty()) {
139  if (timeoutMs == 0) {
140  condition.wait(lock);
141  } else {
142 #if BOOST_VERSION >= 105000
143  if (!condition.timed_wait(lock, boost::posix_time::milliseconds(timeoutMs))) {
144 #else
145  const boost::system_time timeout = boost::get_system_time()
146  + boost::posix_time::milliseconds(timeoutMs);
147  if (!condition.timed_wait(lock, timeout)) {
148 #endif
149  throw QueueEmptyException(boost::str(boost::format("No element available on queue within %d ms.") % timeoutMs));
150  }
151  }
152  }
153 
154  if (interrupted) {
155  throw InterruptedException("Queue was interrupted");
156  }
157 
158  M message = queue.front();
159  queue.pop();
160  return message;
161 
162  }
163 
171  M tryPop() {
172 
173  boost::recursive_mutex::scoped_lock lock(mutex);
174 
175  if (queue.empty()) {
176  throw QueueEmptyException();
177  }
178 
179  M message = queue.front();
180  queue.pop();
181  return message;
182 
183  }
184 
191  bool empty() const {
192  boost::recursive_mutex::scoped_lock lock(mutex);
193  return queue.empty();
194  }
195 
201  std::size_t size() const {
202  boost::recursive_mutex::scoped_lock lock(mutex);
203  return this->queue.size();
204  }
205 
209  void clear() {
210  boost::recursive_mutex::scoped_lock lock(mutex);
211  while (!this->queue.empty()) {
212  this->queue.pop();
213  }
214  }
215 
221  void interrupt() {
222  {
223  boost::recursive_mutex::scoped_lock lock(mutex);
224  interrupted = true;
225  }
226  condition.notify_all();
227  }
228 
229 };
230 
231 }
232 }
SynchronizedQueue(const unsigned int &sizeLimit=0, dropHandlerType dropHandler=0)
Creates a new queue.
M pop(const boost::uint32_t &timeoutMs=0)
Returns the next element from the queue, waiting until such an element is available.
boost::function< void(const M &drop)> dropHandlerType
bool empty() const
Checks whether this queue is empty.
An exception thrown if a blocking operation was interrupted.
M tryPop()
Tries to pop an element from the queue but does not wait if there is no element on the queue...
Indicates that a queue was empty while trying to pop an element from it.
void interrupt()
Interrupts the processing on this queue for every current call to pop and any later call...
void clear()
Remove all elements from the queue.
std::size_t size() const
Return element count of the queue.
A queue with synchronized access and interruption support.
void push(const M &message)
Pushes a new element on the queue and wakes up one waiting client if there is one.