RSC  0.16.0
rsc::threading::OrderedQueueDispatcherPool< M, R > Class Template Reference

A thread pool that dispatches messages to a list of receivers. More...

#include <OrderedQueueDispatcherPool.h>


Classes

class  DeliverFunctionAdapter
 An adapter for function-based delivery handlers to the object-oriented interface. More...
 
class  DeliveryHandler
 A handler that is called whenever a message is received from the pool and should be passed to a receiver of the pool. More...
 
class  FilterFunctionAdapter
 An adapter for function-based filters to the object-oriented interface. More...
 
class  FilterHandler
 A handler that is used to filter messages for a certain receiver. More...
 
class  Receiver
 Represents one registered receiver of the pool. More...
 
class  TrueFilter
 A filter that accepts every message. More...
 

Public Types

typedef boost::function< void(boost::shared_ptr< R > &receiver, const M &message)> deliverFunction
 A function that delivers a message to a receiver. More...
 
typedef boost::function< bool(boost::shared_ptr< R > &receiver, const M &message)> filterFunction
 A function that filters a message for a receiver. More...
 
typedef boost::shared_ptr< DeliveryHandler > DeliveryHandlerPtr
 
typedef boost::shared_ptr< FilterHandler > FilterHandlerPtr
 

Public Member Functions

 OrderedQueueDispatcherPool (const unsigned int &threadPoolSize, deliverFunction delFunc)
 Constructs a new pool. More...
 
 OrderedQueueDispatcherPool (const unsigned int &threadPoolSize, deliverFunction delFunc, filterFunction filterFunc)
 Constructs a new pool. More...
 
 OrderedQueueDispatcherPool (const unsigned int &threadPoolSize, DeliveryHandlerPtr deliveryHandler)
 Constructs a new pool using the object-oriented handler interface that accepts every message. More...
 
 OrderedQueueDispatcherPool (const unsigned int &threadPoolSize, DeliveryHandlerPtr deliveryHandler, FilterHandlerPtr filterHandler)
 Constructs a new pool using the object-oriented handler interface. More...
 
virtual ~OrderedQueueDispatcherPool ()
 
void registerReceiver (boost::shared_ptr< R > receiver)
 Registers a new receiver at the pool. More...
 
bool unregisterReceiver (boost::shared_ptr< R > receiver)
 Unregisters all registrations of one receiver. More...
 
void setParallelCalls (const bool &allow)
 Decides whether a single receiver might be called in parallel with overlapping calls or not. More...
 
void start ()
 Non-blocking start. More...
 
void stop ()
 Blocking until every thread has stopped working. More...
 
void push (const M &message)
 Pushes a new message to be dispatched to all receivers in this pool. More...
 

Private Member Functions

void nextJob (const unsigned int &, boost::shared_ptr< Receiver > &receiver)
 Returns the next job to process for worker threads and blocks if there is no job. More...
 
void finishedWork (boost::shared_ptr< Receiver > receiver)
 
void worker (const unsigned int &workerNum)
 Threaded worker method. More...
 

Private Attributes

boost::mutex receiversMutex
 
std::vector< boost::shared_ptr< Receiver > > receivers
 
size_t currentPosition
 
volatile bool jobsAvailable
 
boost::condition jobsAvailableCondition
 
volatile bool interrupted
 
volatile bool parallelCalls
 
volatile bool started
 
unsigned int threadPoolSize
 
std::vector< boost::shared_ptr< boost::thread > > threadPool
 
DeliveryHandlerPtr deliveryHandler
 
FilterHandlerPtr filterHandler
 

Detailed Description

template<class M, class R>
class rsc::threading::OrderedQueueDispatcherPool< M, R >

A thread pool that dispatches messages to a list of receivers.

The number of threads is usually smaller than the number of receivers and for each receiver it is guaranteed that messages arrive in the order they were published. No guarantees are given between different receivers. All methods except start and stop are reentrant.

The pool can be stopped and restarted at any time during the processing but these calls must be single-threaded.

Assumptions:

  • identical subscriptions for multiple receivers are unlikely, hence filtering is done per receiver thread

Filtering and delivery of messages to receivers are performed by handlers. These handlers should be stateless.
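The ordering guarantee described above can be pictured with a small single-threaded model. This is only a sketch under stated assumptions, not the RSC implementation: `ModelReceiver`, `ModelPool`, `step` and `deliveredTo` are hypothetical names, messages are plain `int` values, and one call to `step()` stands in for one worker thread picking up a job. Each receiver owns its own FIFO queue, so its messages arrive in publication order, while nothing is promised about the interleaving between receivers.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical stand-in for one registered receiver: a FIFO queue of pending
// messages plus a record of what has been delivered so far.
struct ModelReceiver {
    std::deque<int> pending;
    std::vector<int> delivered;
};

// Hypothetical single-threaded model of the pool (not the RSC class).
class ModelPool {
public:
    explicit ModelPool(std::size_t receiverCount)
        : receivers(receiverCount), position(0) {}

    // Appends the message to the queue of every receiver.
    void push(int message) {
        for (std::size_t i = 0; i < receivers.size(); ++i) {
            receivers[i].pending.push_back(message);
        }
    }

    // Plays the role of one worker: finds the next receiver with pending
    // work (round robin) and delivers exactly one message to it.
    // Returns false if no receiver has work left.
    bool step() {
        const std::size_t count = receivers.size();
        for (std::size_t i = 0; i < count; ++i) {
            const std::size_t index = (position + i) % count;
            ModelReceiver& r = receivers[index];
            if (!r.pending.empty()) {
                r.delivered.push_back(r.pending.front());
                r.pending.pop_front();
                position = (index + 1) % count;
                return true;
            }
        }
        return false;
    }

    const std::vector<int>& deliveredTo(std::size_t index) const {
        assert(index < receivers.size());
        return receivers[index].delivered;
    }

private:
    std::vector<ModelReceiver> receivers;
    std::size_t position;
};
```

In this model the work is interleaved across receivers, yet every receiver still sees its messages in the order they were pushed.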

Author
jwienke
Template Parameters
M  type of the messages dispatched by the pool
R  type of the message receiver

Definition at line 65 of file OrderedQueueDispatcherPool.h.

Member Typedef Documentation

template<class M , class R >
typedef boost::function<void(boost::shared_ptr<R>& receiver, const M& message)> rsc::threading::OrderedQueueDispatcherPool< M, R >::deliverFunction

A function that delivers a message to a receiver.

Must be reentrant.

Definition at line 72 of file OrderedQueueDispatcherPool.h.

template<class M , class R >
typedef boost::shared_ptr<DeliveryHandler> rsc::threading::OrderedQueueDispatcherPool< M, R >::DeliveryHandlerPtr

Definition at line 106 of file OrderedQueueDispatcherPool.h.

template<class M , class R >
typedef boost::function<bool(boost::shared_ptr<R>& receiver, const M& message)> rsc::threading::OrderedQueueDispatcherPool< M, R >::filterFunction

A function that filters a message for a receiver.

If true is returned, the message is acceptable for the receiver, else it will not be delivered. Must be reentrant.

Definition at line 80 of file OrderedQueueDispatcherPool.h.
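As an illustration of the filter contract (true means "deliver", false means "skip"), the following sketch defines a filter and a delivery function with the same shapes as `filterFunction` and `deliverFunction`. It assumes `std::function` and `std::shared_ptr` as stand-ins for the Boost types, and `TopicReceiver`, `topicFilter`, `storeInInbox` and `filterAndDeliver` are hypothetical names that do not exist in RSC.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <vector>

// Hypothetical receiver type R; not part of RSC.
struct TopicReceiver {
    std::string topic;
    std::vector<std::string> inbox;
};

// Same shapes as filterFunction and deliverFunction, with std:: types
// standing in for boost::function and boost::shared_ptr.
typedef std::function<bool(std::shared_ptr<TopicReceiver>&, const std::string&)> FilterFn;
typedef std::function<void(std::shared_ptr<TopicReceiver>&, const std::string&)> DeliverFn;

// Accepts a message only if it starts with the receiver's topic.
// The function keeps no state of its own between calls.
bool topicFilter(std::shared_ptr<TopicReceiver>& receiver, const std::string& message) {
    assert(receiver != nullptr);
    return message.compare(0, receiver->topic.size(), receiver->topic) == 0;
}

// Delivery strategy: store the message in the receiver's inbox.
void storeInInbox(std::shared_ptr<TopicReceiver>& receiver, const std::string& message) {
    assert(receiver != nullptr);
    receiver->inbox.push_back(message);
}

// What a worker conceptually does for one message and one receiver:
// ask the filter first and deliver only if the message was accepted.
bool filterAndDeliver(const FilterFn& filter, const DeliverFn& deliver,
                      std::shared_ptr<TopicReceiver>& receiver,
                      const std::string& message) {
    if (!filter(receiver, message)) {
        return false;
    }
    deliver(receiver, message);
    return true;
}
```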

template<class M , class R >
typedef boost::shared_ptr<FilterHandler> rsc::threading::OrderedQueueDispatcherPool< M, R >::FilterHandlerPtr

Definition at line 133 of file OrderedQueueDispatcherPool.h.

Constructor & Destructor Documentation

template<class M , class R >
rsc::threading::OrderedQueueDispatcherPool< M, R >::OrderedQueueDispatcherPool ( const unsigned int &  threadPoolSize,
deliverFunction  delFunc 
)
inline

Constructs a new pool.

Parameters
threadPoolSize  number of threads for this pool
delFunc  the strategy used to deliver messages of type M to receivers of type R. This will most likely be a simple delegate function mapping to a concrete method call. Must be reentrant.
Note
the object-oriented interface should be preferred

Definition at line 366 of file OrderedQueueDispatcherPool.h.
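A "simple delegate function mapping to a concrete method call" could look like the sketch below. The receiver class `Counter`, its `handle` method and the helper `makeCounterDelivery` are hypothetical, and `std::function` and `std::shared_ptr` stand in for the Boost types used by `deliverFunction`.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical receiver with a concrete handler method; not part of RSC.
class Counter {
public:
    Counter() : sum(0) {}
    void handle(const int& value) { sum += value; }
    int total() const { return sum; }

private:
    int sum;
};

// Same shape as deliverFunction, with std:: types standing in for boost.
typedef std::function<void(std::shared_ptr<Counter>&, const int&)> DeliverFn;

// The delegate does nothing except forward the message to the method.
void deliverToCounter(std::shared_ptr<Counter>& receiver, const int& message) {
    assert(receiver != nullptr);
    receiver->handle(message);
}

// Wraps the delegate in the function-object type a constructor would take.
DeliverFn makeCounterDelivery() {
    return DeliverFn(&deliverToCounter);
}
```

Because the delegate holds no state, it is reentrant as long as the receiver method it forwards to is safe to call from different threads.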

template<class M , class R >
rsc::threading::OrderedQueueDispatcherPool< M, R >::OrderedQueueDispatcherPool ( const unsigned int &  threadPoolSize,
deliverFunction  delFunc,
filterFunction  filterFunc 
)
inline

Constructs a new pool.

Parameters
threadPoolSize  number of threads for this pool
delFunc  the strategy used to deliver messages of type M to receivers of type R. This will most likely be a simple delegate function mapping to a concrete method call. Must be reentrant.
filterFunc  Reentrant function used to filter messages per receiver. Default accepts every message.
Note
the object-oriented interface should be preferred

Definition at line 386 of file OrderedQueueDispatcherPool.h.

template<class M , class R >
rsc::threading::OrderedQueueDispatcherPool< M, R >::OrderedQueueDispatcherPool ( const unsigned int &  threadPoolSize,
DeliveryHandlerPtr  deliveryHandler 
)
inline

Constructs a new pool using the object-oriented handler interface that accepts every message.

Parameters
threadPoolSize  number of threads for this pool
deliveryHandler  handler to deliver messages to receivers

Definition at line 401 of file OrderedQueueDispatcherPool.h.

template<class M , class R >
rsc::threading::OrderedQueueDispatcherPool< M, R >::OrderedQueueDispatcherPool ( const unsigned int &  threadPoolSize,
DeliveryHandlerPtr  deliveryHandler,
FilterHandlerPtr  filterHandler 
)
inline

Constructs a new pool using the object-oriented handler interface.

Parameters
threadPoolSize  number of threads for this pool
deliveryHandler  handler to deliver messages to receivers
filterHandler  filter handler for messages

Definition at line 415 of file OrderedQueueDispatcherPool.h.
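The relationship between the object-oriented handlers and the function adapters can be sketched as follows. The class names mirror the nested classes listed on this page, but everything else is an assumption: the virtual method names `deliver` and `filter`, the fixed message type `std::string`, the hypothetical receiver type `Logger`, and the use of `std::` types in place of Boost.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>

// Hypothetical receiver type; not part of RSC.
struct Logger {
    std::string lines;
};

// Hypothetical handler interfaces. The method names are assumptions.
class DeliveryHandler {
public:
    virtual ~DeliveryHandler() {}
    virtual void deliver(std::shared_ptr<Logger>& receiver, const std::string& message) = 0;
};

class FilterHandler {
public:
    virtual ~FilterHandler() {}
    virtual bool filter(std::shared_ptr<Logger>& receiver, const std::string& message) = 0;
};

// A filter that accepts every message.
class TrueFilter : public FilterHandler {
public:
    bool filter(std::shared_ptr<Logger>&, const std::string&) { return true; }
};

// Wraps a plain function so it can be used where a DeliveryHandler is expected.
class DeliverFunctionAdapter : public DeliveryHandler {
public:
    typedef std::function<void(std::shared_ptr<Logger>&, const std::string&)> Function;

    explicit DeliverFunctionAdapter(Function function) : function(function) {}

    void deliver(std::shared_ptr<Logger>& receiver, const std::string& message) {
        function(receiver, message);
    }

private:
    Function function;
};

// Plain function that the adapter forwards to.
void appendLine(std::shared_ptr<Logger>& receiver, const std::string& message) {
    assert(receiver != nullptr);
    receiver->lines += message + "\n";
}
```

With this layout a function-based constructor can be expressed in terms of the handler-based one by wrapping its arguments in adapters, which is one plausible reason the object-oriented interface is the preferred entry point.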

template<class M , class R >
virtual rsc::threading::OrderedQueueDispatcherPool< M, R >::~OrderedQueueDispatcherPool ( )
inline virtual

Definition at line 422 of file OrderedQueueDispatcherPool.h.

References rsc::threading::OrderedQueueDispatcherPool< M, R >::stop().

Member Function Documentation

template<class M , class R >
void rsc::threading::OrderedQueueDispatcherPool< M, R >::finishedWork ( boost::shared_ptr< Receiver >  receiver )
inline private

Definition at line 303 of file OrderedQueueDispatcherPool.h.

Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::worker().

template<class M , class R >
void rsc::threading::OrderedQueueDispatcherPool< M, R >::nextJob ( const unsigned int &  ,
boost::shared_ptr< Receiver > &  receiver 
)
inline private

Returns the next job to process for worker threads and blocks if there is no job.

Parameters
workerNum  number of the worker requesting a new job
receiver  out param with the receiver to work on

Definition at line 241 of file OrderedQueueDispatcherPool.h.

References rsc::threading::OrderedQueueDispatcherPool< M, R >::currentPosition.

Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::worker().
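The "block until a job is available" behaviour can be sketched with a mutex, a condition variable and an interruption flag, loosely modelled on the `jobsAvailable`, `jobsAvailableCondition` and `interrupted` attributes of this class. This is an assumption-laden illustration rather than the actual `nextJob` code: `JobSource`, `next`, `interrupt` and `drainWithWorker` are hypothetical names, jobs are plain `int` values, `std::` threading types stand in for Boost.Thread, and signalling an interrupt through a `false` return value is a choice made for the sketch.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical job source; not the RSC implementation.
class JobSource {
public:
    JobSource() : interrupted(false) {}

    void push(int job) {
        {
            std::lock_guard<std::mutex> lock(jobsMutex);
            jobs.push_back(job);
        }
        jobsAvailable.notify_one();
    }

    // Wakes up all waiting workers so that they can terminate.
    void interrupt() {
        {
            std::lock_guard<std::mutex> lock(jobsMutex);
            interrupted = true;
        }
        jobsAvailable.notify_all();
    }

    // Blocks until a job is available or interrupt() has been called.
    // Returns true and fills `job`, or false once interrupted with no jobs left.
    bool next(int& job) {
        std::unique_lock<std::mutex> lock(jobsMutex);
        jobsAvailable.wait(lock, [this] { return interrupted || !jobs.empty(); });
        if (jobs.empty()) {
            return false;
        }
        job = jobs.front();
        jobs.pop_front();
        return true;
    }

private:
    std::mutex jobsMutex;
    std::condition_variable jobsAvailable;
    std::deque<int> jobs;
    bool interrupted;
};

// Starts one worker thread, feeds it the jobs 1..jobCount, interrupts the
// source and returns the sum of all jobs the worker processed.
int drainWithWorker(JobSource& source, int jobCount) {
    int sum = 0;
    std::thread worker([&source, &sum] {
        int job = 0;
        while (source.next(job)) {
            sum += job;
        }
    });
    for (int i = 1; i <= jobCount; ++i) {
        source.push(i);
    }
    source.interrupt();
    worker.join();  // join() makes the worker's writes to sum visible here
    return sum;
}
```

Waiting with a predicate guards against spurious wake-ups, and the interrupt flag is checked under the same mutex as the queue so that a worker cannot miss the wake-up.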

template<class M , class R >
void rsc::threading::OrderedQueueDispatcherPool< M, R >::push ( const M &  message)
inline

Pushes a new message to be dispatched to all receivers in this pool.

Parameters
message  message to dispatch

Definition at line 537 of file OrderedQueueDispatcherPool.h.

template<class M , class R >
void rsc::threading::OrderedQueueDispatcherPool< M, R >::registerReceiver ( boost::shared_ptr< R >  receiver)
inline

Registers a new receiver at the pool.

Multiple registrations of the same receiver are possible; the receiver is then called once per registration for each message. This effectively removes the ordering guarantee given above, because a separate message queue is used for every registration.

Parameters
receiver  new receiver

Definition at line 435 of file OrderedQueueDispatcherPool.h.
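The effect of registering the same receiver more than once can be illustrated with a minimal model in which every registration is an independent entry. `CountingReceiver`, `RegistrationList` and `dispatchOne` are hypothetical names, and `std::shared_ptr` stands in for `boost::shared_ptr`; the real pool additionally keeps a message queue per entry.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical receiver that only counts how often it was called.
struct CountingReceiver {
    CountingReceiver() : calls(0) {}
    int calls;
};

// Hypothetical registration list: one entry per registration,
// not one entry per distinct receiver.
class RegistrationList {
public:
    void registerReceiver(std::shared_ptr<CountingReceiver> receiver) {
        assert(receiver != nullptr);
        registrations.push_back(receiver);
    }

    // Models the dispatch of a single message to every registration.
    void dispatchOne() {
        for (std::size_t i = 0; i < registrations.size(); ++i) {
            ++registrations[i]->calls;
        }
    }

private:
    std::vector<std::shared_ptr<CountingReceiver> > registrations;
};
```

A receiver registered twice is reached through two entries and is therefore called twice for the same message.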

template<class M , class R >
void rsc::threading::OrderedQueueDispatcherPool< M, R >::setParallelCalls ( const bool &  allow)
inline

Decides whether a single receiver might be called in parallel with overlapping calls or not.

By default, each receiver receives only one call at a time: the next call is made only after the deliver method for that receiver has returned. Calls might originate from different threads, but they will never overlap. Setting this flag to true allows calls to overlap and thereby enables true parallelism. This, however, weakens the ordering guarantees given by this class. The thread pool is still guaranteed to pick up messages in the order they arrive. However, because multiple threads work in parallel, the order might change between picking up a message and actually dispatching it to the user code as a result of thread scheduling.

Parameters
allow  if set to true, receivers might be called in parallel by multiple threads with successive messages to dispatch

Definition at line 482 of file OrderedQueueDispatcherPool.h.
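One way to picture the flag is a per-receiver "currently processing" marker that workers consult before taking the next message. The sketch below is a single-threaded model under stated assumptions, not the RSC code: `ReceiverSlot` and `tryTake` are hypothetical, messages are plain `int` values, and `finishedWork` only borrows the name of the private member function listed on this page.

```cpp
#include <cassert>
#include <deque>

// Hypothetical per-receiver state: pending messages plus a marker that is
// set while a delivery call for this receiver is in progress.
struct ReceiverSlot {
    ReceiverSlot() : processing(false) {}
    std::deque<int> queue;
    bool processing;
};

// Lets a worker take the next message of this receiver if that is allowed.
// Without parallel calls a receiver that is still processing is skipped;
// with parallel calls the next message may be handed out right away.
bool tryTake(ReceiverSlot& slot, bool parallelCalls, int& message) {
    if (slot.queue.empty()) {
        return false;
    }
    if (slot.processing && !parallelCalls) {
        return false;
    }
    message = slot.queue.front();
    slot.queue.pop_front();
    slot.processing = true;
    return true;
}

// Called by a worker after the delivery call has returned.
void finishedWork(ReceiverSlot& slot) {
    slot.processing = false;
}
```

Messages leave the queue in order in both modes. With parallel calls enabled, however, two workers can hold successive messages at the same time, so the order in which the user code actually runs depends on thread scheduling.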

template<class M , class R >
void rsc::threading::OrderedQueueDispatcherPool< M, R >::start ( )
inline

Non-blocking start.

Exceptions
IllegalStateException  if the pool was already started and is running

Definition at line 492 of file OrderedQueueDispatcherPool.h.

References rsc::threading::OrderedQueueDispatcherPool< M, R >::threadPoolSize, and rsc::threading::OrderedQueueDispatcherPool< M, R >::worker().
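The start and restart rule can be sketched as a simple state guard. The `IllegalStateException` class below is a local stand-in for the exception named above, and `Lifecycle` and `startIfStopped` are hypothetical; real worker threads are omitted so that only the state handling is shown.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Local stand-in for the exception type named in the documentation.
class IllegalStateException : public std::runtime_error {
public:
    explicit IllegalStateException(const std::string& message)
        : std::runtime_error(message) {}
};

// Hypothetical model of the start/stop state handling of the pool.
class Lifecycle {
public:
    Lifecycle() : started(false) {}

    // Refuses to start a pool that is already running.
    void start() {
        if (started) {
            throw IllegalStateException("pool already started");
        }
        started = true;
    }

    // After stop() the pool may be started again.
    void stop() { started = false; }

private:
    bool started;
};

// Caller-side helper: starts the pool if it is stopped and reports
// whether this call actually started it.
bool startIfStopped(Lifecycle& pool) {
    try {
        pool.start();
        return true;
    } catch (const IllegalStateException&) {
        return false;
    }
}
```

The guard itself is not synchronized, which matches the requirement that start and stop be called from a single thread.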

template<class M , class R >
void rsc::threading::OrderedQueueDispatcherPool< M, R >::stop ( )
inline

Blocking until every thread has stopped working.

Definition at line 515 of file OrderedQueueDispatcherPool.h.

Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::~OrderedQueueDispatcherPool().

template<class M , class R >
bool rsc::threading::OrderedQueueDispatcherPool< M, R >::unregisterReceiver ( boost::shared_ptr< R >  receiver)
inline

Unregisters all registrations of one receiver.

Parameters
receiver  receiver to unregister
Returns
true if one or more receivers were unregistered, else false

Definition at line 447 of file OrderedQueueDispatcherPool.h.
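The return value contract (true if at least one registration was removed) maps naturally onto the erase-remove idiom. The sketch below assumes a plain vector of shared pointers as the registration list; `Listener`, `Registrations` and `unregisterAll` are hypothetical names, `std::shared_ptr` stands in for `boost::shared_ptr`, and the locking a real pool needs is left out.

```cpp
#include <algorithm>
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical receiver type; not part of RSC.
struct Listener {};

typedef std::vector<std::shared_ptr<Listener> > Registrations;

// Removes every registration of `receiver` from the list.
// Returns true if one or more registrations were removed, else false.
bool unregisterAll(Registrations& registrations,
                   const std::shared_ptr<Listener>& receiver) {
    assert(receiver != nullptr);
    Registrations::iterator newEnd =
        std::remove(registrations.begin(), registrations.end(), receiver);
    const bool removedAny = (newEnd != registrations.end());
    registrations.erase(newEnd, registrations.end());
    return removedAny;
}
```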

template<class M , class R >
void rsc::threading::OrderedQueueDispatcherPool< M, R >::worker ( const unsigned int &  workerNum)
inline private

Member Data Documentation

template<class M , class R >
size_t rsc::threading::OrderedQueueDispatcherPool< M, R >::currentPosition
private

template<class M , class R >
DeliveryHandlerPtr rsc::threading::OrderedQueueDispatcherPool< M, R >::deliveryHandler
private

template<class M , class R >
FilterHandlerPtr rsc::threading::OrderedQueueDispatcherPool< M, R >::filterHandler
private

template<class M , class R >
volatile bool rsc::threading::OrderedQueueDispatcherPool< M, R >::interrupted
private

Definition at line 228 of file OrderedQueueDispatcherPool.h.

template<class M , class R >
volatile bool rsc::threading::OrderedQueueDispatcherPool< M, R >::jobsAvailable
private

Definition at line 226 of file OrderedQueueDispatcherPool.h.

template<class M , class R >
boost::condition rsc::threading::OrderedQueueDispatcherPool< M, R >::jobsAvailableCondition
private

Definition at line 227 of file OrderedQueueDispatcherPool.h.

template<class M , class R >
volatile bool rsc::threading::OrderedQueueDispatcherPool< M, R >::parallelCalls
private

Definition at line 230 of file OrderedQueueDispatcherPool.h.

template<class M , class R >
std::vector<boost::shared_ptr<Receiver> > rsc::threading::OrderedQueueDispatcherPool< M, R >::receivers
private

Definition at line 223 of file OrderedQueueDispatcherPool.h.

template<class M , class R >
boost::mutex rsc::threading::OrderedQueueDispatcherPool< M, R >::receiversMutex
private

Definition at line 222 of file OrderedQueueDispatcherPool.h.

template<class M , class R >
volatile bool rsc::threading::OrderedQueueDispatcherPool< M, R >::started
private

Definition at line 232 of file OrderedQueueDispatcherPool.h.

template<class M , class R >
std::vector<boost::shared_ptr<boost::thread> > rsc::threading::OrderedQueueDispatcherPool< M, R >::threadPool
private

Definition at line 349 of file OrderedQueueDispatcherPool.h.

template<class M , class R >
unsigned int rsc::threading::OrderedQueueDispatcherPool< M, R >::threadPoolSize
private

The documentation for this class was generated from the following file: