RSC  0.7.17
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
OrderedQueueDispatcherPool.h
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is a part of RSC project
4  *
5  * Copyright (C) 2010 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #pragma once
28 
29 #include <boost/bind.hpp>
30 #include <boost/thread/condition.hpp>
31 #include <boost/function.hpp>
32 #include <boost/thread/mutex.hpp>
33 #include <boost/shared_ptr.hpp>
34 #include <boost/thread.hpp>
35 
36 #include "../misc/IllegalStateException.h"
37 #include "SynchronizedQueue.h"
38 
39 namespace rsc {
40 namespace threading {
41 
64 template<class M, class R>
66 public:
67 
71  typedef boost::function<void(boost::shared_ptr<R>& receiver,
72  const M& message)> deliverFunction;
73 
79  typedef boost::function<bool(boost::shared_ptr<R>& receiver,
80  const M& message)> filterFunction;
81 
90  public:
91 
92  virtual ~DeliveryHandler() {
93  }
94 
101  virtual void
102  deliver(boost::shared_ptr<R>& receiver, const M& message) = 0;
103 
104  };
105 
106  typedef boost::shared_ptr<DeliveryHandler> DeliveryHandlerPtr;
107 
115  public:
116 
117  virtual ~FilterHandler() {
118  }
119 
128  virtual bool
129  filter(boost::shared_ptr<R>& receiver, const M& message) = 0;
130 
131  };
132 
133  typedef boost::shared_ptr<FilterHandler> FilterHandlerPtr;
134 
135 private:
136 
142  class TrueFilter: public FilterHandler {
143  public:
144  bool filter(boost::shared_ptr<R>& /*receiver*/, const M& /*message*/) {
145  return true;
146  }
147  };
148 
155  public:
156 
158  function(function) {
159  }
160 
161  bool filter(boost::shared_ptr<R>& receiver, const M& message) {
162  return function(receiver, message);
163  }
164 
165  private:
166  filterFunction function;
167  };
168 
176  public:
177 
179  function(function) {
180  }
181 
182  void deliver(boost::shared_ptr<R>& receiver, const M& message) {
183  function(receiver, message);
184  }
185 
186  private:
187  deliverFunction function;
188  };
189 
195  class Receiver {
196  public:
197 
198  Receiver(boost::shared_ptr<R> receiver) :
199  receiver(receiver), processing(false) {
200  }
201 
202  boost::shared_ptr<R> receiver;
203  // TODO think about if this really requires a synchronized queue if
204  // all message dispatching to worker threads is synchronized
206 
207  boost::condition processingCondition;
208 
217  volatile bool processing;
218 
219  };
220 
221  // TODO make this a set to only allow unique subscriptions?
222  boost::mutex receiversMutex;
223  std::vector<boost::shared_ptr<Receiver> > receivers;
225 
226  volatile bool jobsAvailable;
227  boost::condition jobsAvailableCondition;
228  volatile bool interrupted;
229 
230  volatile bool started;
231 
239  void nextJob(const unsigned int& /*workerNum*/,
240  boost::shared_ptr<Receiver>& receiver) {
241 
242  boost::mutex::scoped_lock lock(receiversMutex);
243 
244  // std::cout << "Worker " << workerNum << " requests a new job"
245  // << std::endl;
246 
247  // wait until a job is available
248  bool gotJob = false;
249  while (!gotJob) {
250 
251  while (!jobsAvailable && !interrupted) {
252  // std::cout << "Worker " << workerNum
253  // << ": no jobs available, waiting" << std::endl;
254  jobsAvailableCondition.wait(lock);
255  }
256 
257  if (interrupted) {
258  throw InterruptedException("Processing was interrupted");
259  }
260 
261  // search for the next job
262  for (size_t pos = 0; pos < receivers.size(); ++pos) {
263 
264  // TODO is this selection fair in any case?
265  ++currentPosition;
266  size_t realPos = currentPosition % receivers.size();
267 
268  // TODO maybe provide an atomic pop and tell if successful
269  // operation in SynchronizedQueue
270  if (!receivers[realPos]->processing
271  && !receivers[realPos]->queue.empty()) {
272 
273  // found a job
274 
275  receiver = receivers[realPos];
276  receiver->processing = true;
277  gotJob = true;
278  break;
279 
280  }
281 
282  }
283 
284  // did not find a job, hence there are no other jobs right now
285  if (!gotJob) {
286  jobsAvailable = false;
287  }
288 
289  }
290 
291  // if I got a job there are certainly others interested in jobs
292  // std::cout << "Worker " << workerNum << ": got job for receiver"
293  // << receiver->receiver << ", notify_one" << std::endl;
294  lock.unlock();
295  jobsAvailableCondition.notify_one();
296 
297  }
298 
299  unsigned int threadPoolSize;
300 
301  void finishedWork(boost::shared_ptr<Receiver> receiver) {
302 
303  boost::mutex::scoped_lock lock(receiversMutex);
304  // changing this flag must already be locked as it is read by the
305  // globally synchronized nextJob method to determine if a job is
306  // available
307  receiver->processing = false;
308  receiver->processingCondition.notify_all();
309  // maybe avoid informing other threads about new jobs of the receiver
310  // will be removed by adding a flag? Right now they will start to search
311  // for a new job and may not find one.
312  if (!receiver->queue.empty()) {
313  jobsAvailable = true;
314  lock.unlock();
315  jobsAvailableCondition.notify_one();
316  }
317 
318  }
319 
323  void worker(const unsigned int& workerNum) {
324 
325  try {
326  while (true) {
327 
328  boost::shared_ptr<Receiver> receiver;
329  nextJob(workerNum, receiver);
330  M message = receiver->queue.pop();
331  // std::cout << "Worker " << workerNum << " got new job: "
332  // << message << " for receiver " << *(receiver->receiver)
333  // << std::endl;
334  if (filterHandler->filter(receiver->receiver, message)) {
335  deliveryHandler->deliver(receiver->receiver, message);
336  }
337  finishedWork(receiver);
338 
339  }
340  } catch (InterruptedException& e) {
341  // std::cout << "Worker " << workerNum << " was interrupted"
342  // << std::endl;
343  }
344 
345  }
346 
347  std::vector<boost::shared_ptr<boost::thread> > threadPool;
348 
351 
352 public:
353 
365  deliverFunction delFunc) :
366  currentPosition(0), jobsAvailable(false), interrupted(false), started(
367  false), threadPoolSize(threadPoolSize), deliveryHandler(
368  new DeliverFunctionAdapter(delFunc)), filterHandler(
369  new TrueFilter()) {
370  }
371 
385  deliverFunction delFunc, filterFunction filterFunc) :
386  currentPosition(0), jobsAvailable(false), interrupted(false), started(
387  false), threadPoolSize(threadPoolSize), deliveryHandler(
388  new DeliverFunctionAdapter(delFunc)), filterHandler(
389  new FilterFunctionAdapter(filterFunc)) {
390  }
391 
401  currentPosition(0), jobsAvailable(false), interrupted(false), started(
402  false), threadPoolSize(threadPoolSize), deliveryHandler(
403  deliveryHandler), filterHandler(new TrueFilter) {
404  }
405 
415  currentPosition(0), jobsAvailable(false), interrupted(false), started(
416  false), threadPoolSize(threadPoolSize), deliveryHandler(
417  deliveryHandler), filterHandler(filterHandler) {
418  }
419 
421  stop();
422  }
423 
433  void registerReceiver(boost::shared_ptr<R> receiver) {
434  boost::mutex::scoped_lock lock(receiversMutex);
435  boost::shared_ptr<Receiver> rec(new Receiver(receiver));
436  receivers.push_back(rec);
437  }
438 
445  bool unregisterReceiver(boost::shared_ptr<R> receiver) {
446 
447  boost::mutex::scoped_lock lock(receiversMutex);
448 
449  for (typename std::vector<boost::shared_ptr<Receiver> >::iterator it =
450  receivers.begin(); it != receivers.end(); ++it) {
451  boost::shared_ptr<Receiver> rec = *it;
452  if (rec->receiver == receiver) {
453  it = receivers.erase(it);
454  while (rec->processing) {
455  rec ->processingCondition.wait(lock);
456  }
457  return true;
458  }
459  }
460  return false;
461 
462  }
463 
470  void start() {
471 
472  boost::mutex::scoped_lock lock(receiversMutex);
473  if (started) {
474  throw rsc::misc::IllegalStateException("Pool already running");
475  }
476 
477  interrupted = false;
478 
479  for (unsigned int i = 0; i < threadPoolSize; ++i) {
480  boost::function<void()> workerMethod = boost::bind(
482  boost::shared_ptr<boost::thread> w(new boost::thread(workerMethod));
483  threadPool.push_back(w);
484  }
485 
486  started = true;
487 
488  }
489 
493  void stop() {
494 
495  {
496  boost::mutex::scoped_lock lock(receiversMutex);
497  interrupted = true;
498  }
499  jobsAvailableCondition.notify_all();
500 
501  for (unsigned int i = 0; i < threadPool.size(); ++i) {
502  threadPool[i]->join();
503  }
504  threadPool.clear();
505 
506  started = false;
507 
508  }
509 
515  void push(const M& message) {
516 
517  // std::cout << "new job " << message << std::endl;
518  {
519  boost::mutex::scoped_lock lock(receiversMutex);
520  for (typename std::vector<boost::shared_ptr<Receiver> >::iterator
521  it = receivers.begin(); it != receivers.end(); ++it) {
522  (*it)->queue.push(message);
523  }
524  jobsAvailable = true;
525  }
526  jobsAvailableCondition.notify_one();
527 
528  }
529 
530 };
531 
532 }
533 }
534 
bool filter(boost::shared_ptr< R > &receiver, const M &message)
A function that filters a message for a receiver.
void registerReceiver(boost::shared_ptr< R > receiver)
Registers a new receiver at the pool.
Exception indicating a call on a method where the underlying object is in an illegal state for this c...
A thread pool that dispatches messages to a list of receivers.
void deliver(boost::shared_ptr< R > &receiver, const M &message)
Requests this handler to deliver the message to the receiver.
boost::function< bool(boost::shared_ptr< R > &receiver, const M &message)> filterFunction
A function that filters a message for a receiver.
boost::shared_ptr< DeliveryHandler > DeliveryHandlerPtr
void finishedWork(boost::shared_ptr< Receiver > receiver)
OrderedQueueDispatcherPool(const unsigned int &threadPoolSize, DeliveryHandlerPtr deliveryHandler, FilterHandlerPtr filterHandler)
Constructs a new pool using the object-oriented handler interface.
A handler that is used to filter messages for a certain receiver.
void nextJob(const unsigned int &, boost::shared_ptr< Receiver > &receiver)
Returns the next job to process for worker threads and blocks if there is no job. ...
An exception thrown if a blocking operation was interrupted.
std::vector< boost::shared_ptr< Receiver > > receivers
volatile bool processing
Indicates whether a job for this worker is currently being processed and this receiver hence cannot b...
An adapter for function-based filter to the object-oriented interface.
void worker(const unsigned int &workerNum)
Threaded worker method.
Represents one registered receiver of the pool.
An adapter for function-based delivery handlers to the object-oriented interface. ...
void stop()
Blocking until every thread has stopped working.
A handler that is called whenever a message is received from the pool and should be passed to a recei...
std::vector< boost::shared_ptr< boost::thread > > threadPool
boost::shared_ptr< FilterHandler > FilterHandlerPtr
virtual bool filter(boost::shared_ptr< R > &receiver, const M &message)=0
A function that filters a message for a receiver.
OrderedQueueDispatcherPool(const unsigned int &threadPoolSize, deliverFunction delFunc, filterFunction filterFunc)
Constructs a new pool.
boost::function< void(boost::shared_ptr< R > &receiver, const M &message)> deliverFunction
A function that delivers a message to a receiver.
virtual void deliver(boost::shared_ptr< R > &receiver, const M &message)=0
Requests this handler to deliver the message to the receiver.
void push(const M &message)
Pushes a new message to be dispatched to all receivers in this pool.
bool unregisterReceiver(boost::shared_ptr< R > receiver)
Unregisters all registrations of one receiver.
OrderedQueueDispatcherPool(const unsigned int &threadPoolSize, deliverFunction delFunc)
Constructs a new pool.
OrderedQueueDispatcherPool(const unsigned int &threadPoolSize, DeliveryHandlerPtr deliveryHandler)
Constructs a new pool using the object-oriented handler interface that accepts every message...
bool filter(boost::shared_ptr< R > &, const M &)
A function that filters a message for a receiver.
A queue with synchronized access and interruption support.