29 #include <boost/bind.hpp>
30 #include <boost/thread/condition.hpp>
31 #include <boost/function.hpp>
32 #include <boost/thread/mutex.hpp>
33 #include <boost/shared_ptr.hpp>
34 #include <boost/thread.hpp>
36 #include "../misc/IllegalStateException.h"
64 template<
class M,
class R>
71 typedef boost::function<void(boost::shared_ptr<R>& receiver,
79 typedef boost::function<bool(boost::shared_ptr<R>& receiver,
102 deliver(boost::shared_ptr<R>& receiver,
const M& message) = 0;
129 filter(boost::shared_ptr<R>& receiver,
const M& message) = 0;
144 bool filter(boost::shared_ptr<R>& ,
const M& ) {
161 bool filter(boost::shared_ptr<R>& receiver,
const M& message) {
162 return function(receiver, message);
182 void deliver(boost::shared_ptr<R>& receiver,
const M& message) {
183 function(receiver, message);
242 boost::shared_ptr<Receiver>& receiver) {
264 for (
size_t pos = 0; pos <
receivers.size(); ++pos) {
278 receiver->processing =
true;
309 receiver->processing =
false;
310 receiver->processingCondition.notify_all();
314 if (!receiver->queue.empty()) {
325 void worker(
const unsigned int& workerNum) {
330 boost::shared_ptr<Receiver> receiver;
332 M message = receiver->queue.pop();
404 false),
started(false), threadPoolSize(threadPoolSize), deliveryHandler(
418 false),
started(false), threadPoolSize(threadPoolSize), deliveryHandler(
419 deliveryHandler), filterHandler(filterHandler) {
437 boost::shared_ptr<Receiver> rec(
new Receiver(receiver));
451 for (
typename std::vector<boost::shared_ptr<Receiver> >::iterator it =
453 boost::shared_ptr<Receiver> rec = *it;
454 if (rec->receiver == receiver) {
456 while (rec->processing) {
457 rec ->processingCondition.wait(lock);
502 boost::function<void()> workerMethod = boost::bind(
504 boost::shared_ptr<boost::thread> w(
new boost::thread(workerMethod));
523 for (
unsigned int i = 0; i <
threadPool.size(); ++i) {
542 for (
typename std::vector<boost::shared_ptr<Receiver> >::iterator
544 (*it)->queue.push(message);
bool filter(boost::shared_ptr< R > &receiver, const M &message)
A function that filters a message for a receiver.
void registerReceiver(boost::shared_ptr< R > receiver)
Registers a new receiver at the pool.
Exception indicating a call on a method where the underlying object is in an illegal state for this c...
A thread pool that dispatches messages to a list of receivers.
void deliver(boost::shared_ptr< R > &receiver, const M &message)
Requests this handler to deliver the message to the receiver.
volatile bool parallelCalls
unsigned int threadPoolSize
FilterHandlerPtr filterHandler
boost::function< bool(boost::shared_ptr< R > &receiver, const M &message)> filterFunction
A function that filters a message for a receiver.
boost::shared_ptr< DeliveryHandler > DeliveryHandlerPtr
void finishedWork(boost::shared_ptr< Receiver > receiver)
OrderedQueueDispatcherPool(const unsigned int &threadPoolSize, DeliveryHandlerPtr deliveryHandler, FilterHandlerPtr filterHandler)
Constructs a new pool using the object-oriented handler interface.
DeliverFunctionAdapter(deliverFunction function)
boost::shared_ptr< R > receiver
A handler that is used to filter messages for a certain receiver.
virtual ~DeliveryHandler()
void nextJob(const unsigned int &, boost::shared_ptr< Receiver > &receiver)
Returns the next job to process for worker threads and blocks if there is no job. ...
An exception thrown if a blocking operation was interrupted.
std::vector< boost::shared_ptr< Receiver > > receivers
void start()
Non-blocking start.
volatile bool processing
Indicates whether a job for this worker is currently being processed and this receiver hence cannot b...
An adapter for function-based filter to the object-oriented interface.
void worker(const unsigned int &workerNum)
Threaded worker method.
Represents one registered receiver of the pool.
An adapter for function-based delivery handlers to the object-oriented interface. ...
void stop()
Blocks until every thread has stopped working.
volatile bool interrupted
A handler that is called whenever a message is received from the pool and should be passed to a recei...
FilterFunctionAdapter(filterFunction function)
std::vector< boost::shared_ptr< boost::thread > > threadPool
DeliveryHandlerPtr deliveryHandler
SynchronizedQueue< M > queue
boost::condition jobsAvailableCondition
boost::recursive_mutex mutex
boost::mutex receiversMutex
virtual ~OrderedQueueDispatcherPool()
volatile bool jobsAvailable
boost::condition processingCondition
void setParallelCalls(const bool &allow)
Decides whether a single receiver might be called in parallel with overlapping calls or not...
boost::shared_ptr< FilterHandler > FilterHandlerPtr
virtual bool filter(boost::shared_ptr< R > &receiver, const M &message)=0
A function that filters a message for a receiver.
OrderedQueueDispatcherPool(const unsigned int &threadPoolSize, deliverFunction delFunc, filterFunction filterFunc)
Constructs a new pool.
boost::function< void(boost::shared_ptr< R > &receiver, const M &message)> deliverFunction
A function that delivers a message to a receiver.
virtual void deliver(boost::shared_ptr< R > &receiver, const M &message)=0
Requests this handler to deliver the message to the receiver.
void push(const M &message)
Pushes a new message to be dispatched to all receivers in this pool.
bool unregisterReceiver(boost::shared_ptr< R > receiver)
Unregisters all registrations of one receiver.
OrderedQueueDispatcherPool(const unsigned int &threadPoolSize, deliverFunction delFunc)
Constructs a new pool.
OrderedQueueDispatcherPool(const unsigned int &threadPoolSize, DeliveryHandlerPtr deliveryHandler)
Constructs a new pool using the object-oriented handler interface that accepts every message...
bool filter(boost::shared_ptr< R > &, const M &)
A function that filters a message for a receiver.
A queue with synchronized access and interruption support.
Receiver(boost::shared_ptr< R > receiver)
A filter that accepts every message.