31 #include <boost/format.hpp> 32 #include <boost/function.hpp> 33 #include <boost/noncopyable.hpp> 34 #include <boost/thread/condition.hpp> 35 #include <boost/thread/recursive_mutex.hpp> 38 #include "rsc/rscexports.h" 71 mutable boost::recursive_mutex
mutex;
87 dropHandlerType dropHandler = 0) :
88 interrupted(false), sizeLimit(sizeLimit), dropHandler(dropHandler) {
106 boost::recursive_mutex::scoped_lock lock(this->mutex);
107 if (this->sizeLimit > 0) {
108 while (this->queue.size() > this->sizeLimit - 1) {
109 if (this->dropHandler) {
110 this->dropHandler(this->queue.front());
115 this->queue.push(message);
117 this->condition.notify_one();
134 M
peek(
const boost::uint32_t& timeoutMs = 0) {
136 boost::recursive_mutex::scoped_lock lock(this->mutex);
138 while (!this->interrupted && this->queue.empty()) {
139 if (timeoutMs == 0) {
140 this->condition.wait(lock);
142 #if BOOST_VERSION >= 105000 143 if (!this->condition.timed_wait(lock, boost::posix_time::milliseconds(timeoutMs))) {
145 const boost::system_time timeout = boost::get_system_time()
146 + boost::posix_time::milliseconds(timeoutMs);
147 if (!this->condition.timed_wait(lock, timeout)) {
149 throw QueueEmptyException(boost::str(boost::format(
"No element available on queue within %d ms.") % timeoutMs));
154 if (this->interrupted) {
158 return this->queue.front();
175 M
pop(
const boost::uint32_t& timeoutMs = 0) {
177 boost::recursive_mutex::scoped_lock lock(this->mutex);
179 while (!this->interrupted && this->queue.empty()) {
180 if (timeoutMs == 0) {
181 this->condition.wait(lock);
183 #if BOOST_VERSION >= 105000 184 if (!this->condition.timed_wait(lock, boost::posix_time::milliseconds(timeoutMs))) {
186 const boost::system_time timeout = boost::get_system_time()
187 + boost::posix_time::milliseconds(timeoutMs);
188 if (!this->condition.timed_wait(lock, timeout)) {
190 throw QueueEmptyException(boost::str(boost::format(
"No element available on queue within %d ms.") % timeoutMs));
195 if (this->interrupted) {
199 M message = this->queue.front();
214 boost::recursive_mutex::scoped_lock lock(this->mutex);
216 if (this->queue.empty()) {
220 M message = this->queue.front();
233 boost::recursive_mutex::scoped_lock lock(this->mutex);
234 return this->queue.empty();
243 boost::recursive_mutex::scoped_lock lock(this->mutex);
244 return this->queue.size();
251 boost::recursive_mutex::scoped_lock lock(this->mutex);
252 while (!this->queue.empty()) {
264 boost::recursive_mutex::scoped_lock lock(this->mutex);
265 this->interrupted =
true;
267 this->condition.notify_all();
SynchronizedQueue(const unsigned int &sizeLimit=0, dropHandlerType dropHandler=0)
Creates a new queue.
boost::condition condition
M pop(const boost::uint32_t &timeoutMs=0)
Returns the next element from the queue, waiting until there is such an element.
virtual ~SynchronizedQueue()
boost::function< void(const M &drop)> dropHandlerType
bool empty() const
Checks whether this queue is empty.
An exception thrown if a blocking operation was interrupted.
M tryPop()
Tries to pop an element from the queue but does not wait if there is no element on the queue...
Indicates that a queue was empty while trying to pop an element from it.
M peek(const boost::uint32_t &timeoutMs=0)
Returns the element at the front of the queue without removing it, waiting until there is such an ele...
boost::recursive_mutex mutex
void interrupt()
Interrupts the processing on this queue for every current call to pop and any later call...
void clear()
Removes all elements from the queue.
volatile bool interrupted
dropHandlerType dropHandler
std::size_t size() const
Returns the element count of the queue.
A queue with synchronized access and interruption support.
void push(const M &message)
Pushes a new element on the queue and wakes up one waiting client if there is one.