31 #include <boost/format.hpp>
32 #include <boost/function.hpp>
33 #include <boost/noncopyable.hpp>
34 #include <boost/thread/condition.hpp>
35 #include <boost/thread/recursive_mutex.hpp>
38 #include "rsc/rscexports.h"
71 mutable boost::recursive_mutex
mutex;
106 boost::recursive_mutex::scoped_lock lock(this->
mutex);
115 this->
queue.push(message);
134 M
peek(
const boost::uint32_t& timeoutMs = 0) {
136 boost::recursive_mutex::scoped_lock lock(this->
mutex);
139 if (timeoutMs == 0) {
142 #if BOOST_VERSION >= 105000
143 if (!this->
condition.timed_wait(lock, boost::posix_time::milliseconds(timeoutMs))) {
145 const boost::system_time timeout = boost::get_system_time()
146 + boost::posix_time::milliseconds(timeoutMs);
147 if (!this->
condition.timed_wait(lock, timeout)) {
149 throw QueueEmptyException(boost::str(boost::format(
"No element available on queue within %d ms.") % timeoutMs));
158 return this->
queue.front();
175 M
pop(
const boost::uint32_t& timeoutMs = 0) {
177 boost::recursive_mutex::scoped_lock lock(this->
mutex);
180 if (timeoutMs == 0) {
183 #if BOOST_VERSION >= 105000
184 if (!this->
condition.timed_wait(lock, boost::posix_time::milliseconds(timeoutMs))) {
186 const boost::system_time timeout = boost::get_system_time()
187 + boost::posix_time::milliseconds(timeoutMs);
188 if (!this->
condition.timed_wait(lock, timeout)) {
190 throw QueueEmptyException(boost::str(boost::format(
"No element available on queue within %d ms.") % timeoutMs));
199 M message = this->
queue.front();
214 boost::recursive_mutex::scoped_lock lock(this->
mutex);
216 if (this->
queue.empty()) {
220 M message = this->
queue.front();
233 boost::recursive_mutex::scoped_lock lock(this->
mutex);
234 return this->
queue.empty();
243 boost::recursive_mutex::scoped_lock lock(this->
mutex);
244 return this->
queue.size();
251 boost::recursive_mutex::scoped_lock lock(this->
mutex);
252 while (!this->
queue.empty()) {
264 boost::recursive_mutex::scoped_lock lock(this->
mutex);
SynchronizedQueue(const unsigned int &sizeLimit=0, dropHandlerType dropHandler=0)
Creates a new queue.
boost::condition condition
M pop(const boost::uint32_t &timeoutMs=0)
Returns the next element from the queue, waiting until there is such an element.
virtual ~SynchronizedQueue()
boost::function< void(const M &drop)> dropHandlerType
bool empty() const
Checks whether this queue is empty.
An exception thrown if a blocking operation was interrupted.
M tryPop()
Tries to pop an element from the queue but does not wait if there is no element on the queue...
Indicates that a queue was empty while trying to pop an element from it.
M peek(const boost::uint32_t &timeoutMs=0)
Returns the element at the front of the queue without removing it, waiting until there is such an ele...
boost::recursive_mutex mutex
void interrupt()
Interrupts the processing on this queue for every current call to pop and any later call...
void clear()
Remove all elements from the queue.
volatile bool interrupted
dropHandlerType dropHandler
std::size_t size() const
Return element count of the queue.
A queue with synchronized access and interruption support.
void push(const M &message)
Pushes a new element on the queue and wakes up one waiting client if there is one.