RSC  0.19.0
OrderedQueueDispatcherPool.h
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is a part of RSC project
4  *
5  * Copyright (C) 2010 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #pragma once
28 
29 #include <boost/bind.hpp>
30 #include <boost/thread/condition.hpp>
31 #include <boost/function.hpp>
32 #include <boost/thread/mutex.hpp>
33 #include <boost/shared_ptr.hpp>
34 #include <boost/thread.hpp>
35 
36 #include "../misc/IllegalStateException.h"
37 #include "SynchronizedQueue.h"
38 
39 namespace rsc {
40 namespace threading {
41 
64 template<class M, class R>
66 public:
67 
// A function that delivers a message to a receiver. It is wrapped by the
// function-based constructors so it can be used as the delivery handler.
71  typedef boost::function<void(boost::shared_ptr<R>& receiver,
72  const M& message)> deliverFunction;
73 
// A function that filters a message for a receiver. The worker only
// delivers a message when the filter returns true for it.
79  typedef boost::function<bool(boost::shared_ptr<R>& receiver,
80  const M& message)> filterFunction;
81 
90  public:
91 
92  virtual ~DeliveryHandler() {
93  }
94 
101  virtual void
102  deliver(boost::shared_ptr<R>& receiver, const M& message) = 0;
103 
104  };
105 
106  typedef boost::shared_ptr<DeliveryHandler> DeliveryHandlerPtr;
107 
115  public:
116 
117  virtual ~FilterHandler() {
118  }
119 
128  virtual bool
129  filter(boost::shared_ptr<R>& receiver, const M& message) = 0;
130 
131  };
132 
133  typedef boost::shared_ptr<FilterHandler> FilterHandlerPtr;
134 
135 private:
136 
142  class TrueFilter: public FilterHandler {
143  public:
144  bool filter(boost::shared_ptr<R>& /*receiver*/, const M& /*message*/) {
145  return true;
146  }
147  };
148 
155  public:
156 
158  function(function) {
159  }
160 
161  bool filter(boost::shared_ptr<R>& receiver, const M& message) {
162  return function(receiver, message);
163  }
164 
165  private:
166  filterFunction function;
167  };
168 
176  public:
177 
179  function(function) {
180  }
181 
182  void deliver(boost::shared_ptr<R>& receiver, const M& message) {
183  function(receiver, message);
184  }
185 
186  private:
187  deliverFunction function;
188  };
189 
195  class Receiver {
196  public:
197 
198  Receiver(boost::shared_ptr<R> receiver) :
199  receiver(receiver), processing(false) {
200  }
201 
202  boost::shared_ptr<R> receiver;
203  // TODO think about if this really requires a synchronized queue if
204  // all message dispatching to worker threads is synchronized
206 
207  boost::condition processingCondition;
208 
217  volatile bool processing;
218 
219  };
220 
221  // TODO make this a set to only allow unique subscriptions?
// Guards the receiver list and the scheduling flags below; it is locked by
// nextJob(), finishedWork(), registerReceiver(), unregisterReceiver(),
// start(), stop() and push().
222  boost::mutex receiversMutex;
// One entry per registration, in registration order. The same receiver may
// appear more than once because registerReceiver() does not check for
// duplicates.
223  std::vector<boost::shared_ptr<Receiver> > receivers;
225 
// NOTE(review): `volatile` does not provide inter-thread synchronization in
// C++. These flags are only safe where they are accessed while holding
// receiversMutex.

// Hint that some receiver queue may hold undelivered messages. It is set by
// push() and finishedWork() and cleared by nextJob() after a scan that found
// nothing to do.
226  volatile bool jobsAvailable;
// Workers wait on this inside nextJob(). It is notified by push(),
// finishedWork(), nextJob() and stop().
227  boost::condition jobsAvailableCondition;
// Set by stop() so that nextJob() throws InterruptedException; reset by
// start().
228  volatile bool interrupted;
229 
// When true, nextJob() may hand out a receiver that another worker is still
// processing.
230  volatile bool parallelCalls;
231 
// Set to true at the end of start() and back to false at the end of stop().
232  volatile bool started;
233 
241  void nextJob(const unsigned int& /*workerNum*/,
242  boost::shared_ptr<Receiver>& receiver) {
243 
244  boost::mutex::scoped_lock lock(receiversMutex);
245 
246  // std::cout << "Worker " << workerNum << " requests a new job"
247  // << std::endl;
248 
249  // wait until a job is available
250  bool gotJob = false;
251  while (!gotJob) {
252 
253  while (!jobsAvailable && !interrupted) {
254  // std::cout << "Worker " << workerNum
255  // << ": no jobs available, waiting" << std::endl;
256  jobsAvailableCondition.wait(lock);
257  }
258 
259  if (interrupted) {
260  throw InterruptedException("Processing was interrupted");
261  }
262 
263  // search for the next job
264  for (size_t pos = 0; pos < receivers.size(); ++pos) {
265 
266  // TODO is this selection fair in any case?
267  ++currentPosition;
268  size_t realPos = currentPosition % receivers.size();
269 
270  // TODO maybe provide an atomic pop and tell if successful
271  // operation in SynchronizedQueue
272  if ((parallelCalls || !receivers[realPos]->processing)
273  && !receivers[realPos]->queue.empty()) {
274 
275  // found a job
276 
277  receiver = receivers[realPos];
278  receiver->processing = true;
279  gotJob = true;
280  break;
281 
282  }
283 
284  }
285 
286  // did not find a job, hence there are no other jobs right now
287  if (!gotJob) {
288  jobsAvailable = false;
289  }
290 
291  }
292 
293  // if I got a job there are certainly others interested in jobs
294  // std::cout << "Worker " << workerNum << ": got job for receiver"
295  // << receiver->receiver << ", notify_one" << std::endl;
296  lock.unlock();
297  jobsAvailableCondition.notify_one();
298 
299  }
300 
301  unsigned int threadPoolSize;
302 
303  void finishedWork(boost::shared_ptr<Receiver> receiver) {
304 
305  boost::mutex::scoped_lock lock(receiversMutex);
306  // changing this flag must already be locked as it is read by the
307  // globally synchronized nextJob method to determine if a job is
308  // available
309  receiver->processing = false;
310  receiver->processingCondition.notify_all();
311  // maybe avoid informing other threads about new jobs of the receiver
312  // will be removed by adding a flag? Right now they will start to search
313  // for a new job and may not find one.
314  if (!receiver->queue.empty()) {
315  jobsAvailable = true;
316  lock.unlock();
317  jobsAvailableCondition.notify_one();
318  }
319 
320  }
321 
325  void worker(const unsigned int& workerNum) {
326 
327  try {
328  while (true) {
329 
330  boost::shared_ptr<Receiver> receiver;
331  nextJob(workerNum, receiver);
332  M message = receiver->queue.pop();
333  // std::cout << "Worker " << workerNum << " got new job: "
334  // << message << " for receiver " << *(receiver->receiver)
335  // << std::endl;
336  if (filterHandler->filter(receiver->receiver, message)) {
337  deliveryHandler->deliver(receiver->receiver, message);
338  }
339  finishedWork(receiver);
340 
341  }
342  } catch (InterruptedException& e) {
343  // std::cout << "Worker " << workerNum << " was interrupted"
344  // << std::endl;
345  }
346 
347  }
348 
349  std::vector<boost::shared_ptr<boost::thread> > threadPool;
350 
351  DeliveryHandlerPtr deliveryHandler;
352  FilterHandlerPtr filterHandler;
353 
354 public:
355 
366  OrderedQueueDispatcherPool(const unsigned int& threadPoolSize,
367  deliverFunction delFunc) :
368  currentPosition(0), jobsAvailable(false), interrupted(false), parallelCalls(
369  false), started(false), threadPoolSize(threadPoolSize), deliveryHandler(
370  new DeliverFunctionAdapter(delFunc)), filterHandler(
371  new TrueFilter()) {
372  }
373 
386  OrderedQueueDispatcherPool(const unsigned int& threadPoolSize,
387  deliverFunction delFunc, filterFunction filterFunc) :
388  currentPosition(0), jobsAvailable(false), interrupted(false), parallelCalls(
389  false), started(false), threadPoolSize(threadPoolSize), deliveryHandler(
390  new DeliverFunctionAdapter(delFunc)), filterHandler(
391  new FilterFunctionAdapter(filterFunc)) {
392  }
393 
401  OrderedQueueDispatcherPool(const unsigned int& threadPoolSize,
402  DeliveryHandlerPtr deliveryHandler) :
403  currentPosition(0), jobsAvailable(false), interrupted(false), parallelCalls(
404  false), started(false), threadPoolSize(threadPoolSize), deliveryHandler(
405  deliveryHandler), filterHandler(new TrueFilter) {
406  }
407 
415  OrderedQueueDispatcherPool(const unsigned int& threadPoolSize,
416  DeliveryHandlerPtr deliveryHandler, FilterHandlerPtr filterHandler) :
417  currentPosition(0), jobsAvailable(false), interrupted(false), parallelCalls(
418  false), started(false), threadPoolSize(threadPoolSize), deliveryHandler(
419  deliveryHandler), filterHandler(filterHandler) {
420  }
421 
423  stop();
424  }
425 
435  void registerReceiver(boost::shared_ptr<R> receiver) {
436  boost::mutex::scoped_lock lock(receiversMutex);
437  boost::shared_ptr<Receiver> rec(new Receiver(receiver));
438  receivers.push_back(rec);
439  }
440 
447  bool unregisterReceiver(boost::shared_ptr<R> receiver) {
448 
449  boost::mutex::scoped_lock lock(receiversMutex);
450 
451  for (typename std::vector<boost::shared_ptr<Receiver> >::iterator it =
452  receivers.begin(); it != receivers.end(); ++it) {
453  boost::shared_ptr<Receiver> rec = *it;
454  if (rec->receiver == receiver) {
455  it = receivers.erase(it);
456  while (rec->processing) {
457  rec ->processingCondition.wait(lock);
458  }
459  return true;
460  }
461  }
462  return false;
463 
464  }
465 
482  void setParallelCalls(const bool& allow) {
483  parallelCalls = allow;
484  }
485 
492  void start() {
493 
494  boost::mutex::scoped_lock lock(receiversMutex);
495  if (started) {
496  throw rsc::misc::IllegalStateException("Pool already running");
497  }
498 
499  interrupted = false;
500 
501  for (unsigned int i = 0; i < threadPoolSize; ++i) {
502  boost::function<void()> workerMethod = boost::bind(
504  boost::shared_ptr<boost::thread> w(new boost::thread(workerMethod));
505  threadPool.push_back(w);
506  }
507 
508  started = true;
509 
510  }
511 
515  void stop() {
516 
517  {
518  boost::mutex::scoped_lock lock(receiversMutex);
519  interrupted = true;
520  }
521  jobsAvailableCondition.notify_all();
522 
523  for (unsigned int i = 0; i < threadPool.size(); ++i) {
524  threadPool[i]->join();
525  }
526  threadPool.clear();
527 
528  started = false;
529 
530  }
531 
537  void push(const M& message) {
538 
539  // std::cout << "new job " << message << std::endl;
540  {
541  boost::mutex::scoped_lock lock(receiversMutex);
542  for (typename std::vector<boost::shared_ptr<Receiver> >::iterator
543  it = receivers.begin(); it != receivers.end(); ++it) {
544  (*it)->queue.push(message);
545  }
546  jobsAvailable = true;
547  }
548  jobsAvailableCondition.notify_one();
549 
550  }
551 
552 };
553 
554 }
555 }
556 
bool filter(boost::shared_ptr< R > &receiver, const M &message)
A function that filters a message for a receiver.
void registerReceiver(boost::shared_ptr< R > receiver)
Registers a new receiver at the pool.
Exception indicating a call on a method where the underlying object is in an illegal state for this c...
A thread pool that dispatches messages to a list of receivers.
void deliver(boost::shared_ptr< R > &receiver, const M &message)
Requests this handler to deliver the message to the receiver.
boost::function< bool(boost::shared_ptr< R > &receiver, const M &message)> filterFunction
A function that filters a message for a receiver.
boost::shared_ptr< DeliveryHandler > DeliveryHandlerPtr
void finishedWork(boost::shared_ptr< Receiver > receiver)
OrderedQueueDispatcherPool(const unsigned int &threadPoolSize, DeliveryHandlerPtr deliveryHandler, FilterHandlerPtr filterHandler)
Constructs a new pool using the object-oriented handler interface.
A handler that is used to filter messages for a certain receiver.
void nextJob(const unsigned int &, boost::shared_ptr< Receiver > &receiver)
Returns the next job to process for worker threads and blocks if there is no job. ...
An exception thrown if a blocking operation was interrupted.
std::vector< boost::shared_ptr< Receiver > > receivers
volatile bool processing
Indicates whether a job for this worker is currently being processed and this receiver hence cannot b...
An adapter for function-based filter to the object-oriented interface.
void worker(const unsigned int &workerNum)
Threaded worker method.
Represents one registered receiver of the pool.
An adapter for function-based delivery handlers to the object-oriented interface. ...
void stop()
Blocking until every thread has stopped working.
A handler that is called whenever a message is received from the pool and should be passed to a recei...
std::vector< boost::shared_ptr< boost::thread > > threadPool
void setParallelCalls(const bool &allow)
Decides whether a single receiver might be called in parallel with overlapping calls or not...
boost::shared_ptr< FilterHandler > FilterHandlerPtr
OrderedQueueDispatcherPool(const unsigned int &threadPoolSize, deliverFunction delFunc, filterFunction filterFunc)
Constructs a new pool.
boost::function< void(boost::shared_ptr< R > &receiver, const M &message)> deliverFunction
A function that delivers a message to a receiver.
virtual void deliver(boost::shared_ptr< R > &receiver, const M &message)=0
Requests this handler to deliver the message to the receiver.
void push(const M &message)
Pushes a new message to be dispatched to all receivers in this pool.
bool unregisterReceiver(boost::shared_ptr< R > receiver)
Unregisters all registrations of one receiver.
OrderedQueueDispatcherPool(const unsigned int &threadPoolSize, deliverFunction delFunc)
Constructs a new pool.
OrderedQueueDispatcherPool(const unsigned int &threadPoolSize, DeliveryHandlerPtr deliveryHandler)
Constructs a new pool using the object-oriented handler interface that accepts every message...
bool filter(boost::shared_ptr< R > &, const M &)
A function that filters a message for a receiver.
A queue with synchronized access and interruption support.