29 #include <boost/bind.hpp> 30 #include <boost/thread/condition.hpp> 31 #include <boost/function.hpp> 32 #include <boost/thread/mutex.hpp> 33 #include <boost/shared_ptr.hpp> 34 #include <boost/thread.hpp> 36 #include "../misc/IllegalStateException.h" 64 template<
class M,
class R>
71 typedef boost::function<void(boost::shared_ptr<R>& receiver,
79 typedef boost::function<bool(boost::shared_ptr<R>& receiver,
102 deliver(boost::shared_ptr<R>& receiver,
const M& message) = 0;
129 filter(boost::shared_ptr<R>& receiver,
const M& message) = 0;
144 bool filter(boost::shared_ptr<R>& ,
const M& ) {
161 bool filter(boost::shared_ptr<R>& receiver,
const M& message) {
162 return function(receiver, message);
182 void deliver(boost::shared_ptr<R>& receiver,
const M& message) {
183 function(receiver, message);
199 receiver(receiver), processing(false) {
242 boost::shared_ptr<Receiver>& receiver) {
244 boost::mutex::scoped_lock lock(receiversMutex);
253 while (!jobsAvailable && !interrupted) {
256 jobsAvailableCondition.wait(lock);
264 for (
size_t pos = 0; pos < receivers.size(); ++pos) {
268 size_t realPos = currentPosition % receivers.size();
272 if ((parallelCalls || !receivers[realPos]->processing)
273 && !receivers[realPos]->queue.empty()) {
277 receiver = receivers[realPos];
278 receiver->processing =
true;
288 jobsAvailable =
false;
297 jobsAvailableCondition.notify_one();
305 boost::mutex::scoped_lock lock(receiversMutex);
309 receiver->processing =
false;
310 receiver->processingCondition.notify_all();
314 if (!receiver->queue.empty()) {
315 jobsAvailable =
true;
317 jobsAvailableCondition.notify_one();
325 void worker(
const unsigned int& workerNum) {
330 boost::shared_ptr<Receiver> receiver;
332 M message = receiver->queue.pop();
368 currentPosition(0), jobsAvailable(false), interrupted(false), parallelCalls(
369 false), started(false), threadPoolSize(threadPoolSize), deliveryHandler(
388 currentPosition(0), jobsAvailable(false), interrupted(false), parallelCalls(
389 false), started(false), threadPoolSize(threadPoolSize), deliveryHandler(
402 DeliveryHandlerPtr deliveryHandler) :
403 currentPosition(0), jobsAvailable(false), interrupted(false), parallelCalls(
404 false), started(false), threadPoolSize(threadPoolSize), deliveryHandler(
405 deliveryHandler), filterHandler(new
TrueFilter) {
416 DeliveryHandlerPtr deliveryHandler, FilterHandlerPtr filterHandler) :
417 currentPosition(0), jobsAvailable(false), interrupted(false), parallelCalls(
418 false), started(false), threadPoolSize(threadPoolSize), deliveryHandler(
419 deliveryHandler), filterHandler(filterHandler) {
436 boost::mutex::scoped_lock lock(receiversMutex);
437 boost::shared_ptr<Receiver> rec(
new Receiver(receiver));
438 receivers.push_back(rec);
449 boost::mutex::scoped_lock lock(receiversMutex);
451 for (
typename std::vector<boost::shared_ptr<Receiver> >::iterator it =
452 receivers.begin(); it != receivers.end(); ++it) {
453 boost::shared_ptr<Receiver> rec = *it;
454 if (rec->receiver == receiver) {
455 it = receivers.erase(it);
456 while (rec->processing) {
457 rec ->processingCondition.wait(lock);
483 parallelCalls = allow;
494 boost::mutex::scoped_lock lock(receiversMutex);
502 boost::function<void()> workerMethod = boost::bind(
504 boost::shared_ptr<boost::thread> w(
new boost::thread(workerMethod));
505 threadPool.push_back(w);
518 boost::mutex::scoped_lock lock(receiversMutex);
521 jobsAvailableCondition.notify_all();
523 for (
unsigned int i = 0; i < threadPool.size(); ++i) {
524 threadPool[i]->join();
541 boost::mutex::scoped_lock lock(receiversMutex);
542 for (
typename std::vector<boost::shared_ptr<Receiver> >::iterator
543 it = receivers.begin(); it != receivers.end(); ++it) {
544 (*it)->queue.push(message);
546 jobsAvailable =
true;
548 jobsAvailableCondition.notify_one();
bool filter(boost::shared_ptr< R > &receiver, const M &message)
A function that filters a message for a receiver.
void registerReceiver(boost::shared_ptr< R > receiver)
Registers a new receiver at the pool.
Exception indicating a call on a method where the underlying object is in an illegal state for this c...
A thread pool that dispatches messages to a list of receivers.
void deliver(boost::shared_ptr< R > &receiver, const M &message)
Requests this handler to deliver the message to the receiver.
volatile bool parallelCalls
unsigned int threadPoolSize
FilterHandlerPtr filterHandler
boost::function< bool(boost::shared_ptr< R > &receiver, const M &message)> filterFunction
A function that filters a message for a receiver.
boost::shared_ptr< DeliveryHandler > DeliveryHandlerPtr
void finishedWork(boost::shared_ptr< Receiver > receiver)
OrderedQueueDispatcherPool(const unsigned int &threadPoolSize, DeliveryHandlerPtr deliveryHandler, FilterHandlerPtr filterHandler)
Constructs a new pool using the object-oriented handler interface.
DeliverFunctionAdapter(deliverFunction function)
boost::shared_ptr< R > receiver
A handler that is used to filter messages for a certain receiver.
virtual ~DeliveryHandler()
void nextJob(const unsigned int &, boost::shared_ptr< Receiver > &receiver)
Returns the next job to process for worker threads and blocks if there is no job. ...
An exception thrown if a blocking operation was interrupted.
std::vector< boost::shared_ptr< Receiver > > receivers
void start()
Non-blocking start.
volatile bool processing
Indicates whether a job for this worker is currently being processed and this receiver hence cannot b...
An adapter for function-based filter to the object-oriented interface.
void worker(const unsigned int &workerNum)
Threaded worker method.
Represents on registered receiver of the pool.
An adapter for function-based delivery handlers to the object-oriented interface. ...
void stop()
Blocking until every thread has stopped working.
volatile bool interrupted
A handler that is called whenever a message is received from the pool and should be passed to a recei...
FilterFunctionAdapter(filterFunction function)
std::vector< boost::shared_ptr< boost::thread > > threadPool
DeliveryHandlerPtr deliveryHandler
SynchronizedQueue< M > queue
boost::condition jobsAvailableCondition
boost::mutex receiversMutex
virtual ~OrderedQueueDispatcherPool()
volatile bool jobsAvailable
boost::condition processingCondition
void setParallelCalls(const bool &allow)
Decides whether a single receiver might be called in parallel with overlapping calls or not...
boost::shared_ptr< FilterHandler > FilterHandlerPtr
OrderedQueueDispatcherPool(const unsigned int &threadPoolSize, deliverFunction delFunc, filterFunction filterFunc)
Constructs a new pool.
boost::function< void(boost::shared_ptr< R > &receiver, const M &message)> deliverFunction
A function that delivers a message to a receiver.
virtual void deliver(boost::shared_ptr< R > &receiver, const M &message)=0
Requests this handler to deliver the message to the receiver.
void push(const M &message)
Pushes a new message to be dispatched to all receivers in this pool.
bool unregisterReceiver(boost::shared_ptr< R > receiver)
Unregisters all registration of one receiver.
OrderedQueueDispatcherPool(const unsigned int &threadPoolSize, deliverFunction delFunc)
Constructs a new pool.
OrderedQueueDispatcherPool(const unsigned int &threadPoolSize, DeliveryHandlerPtr deliveryHandler)
Constructs a new pool using the object-oriented handler interface that accepts every message...
bool filter(boost::shared_ptr< R > &, const M &)
A function that filters a message for a receiver.
A queue with synchronized access and interruption support.
Receiver(boost::shared_ptr< R > receiver)
A filter that accepts every message.