RSC
0.17.1
|
A thread pool that dispatches messages to a list of receivers. More...
#include <OrderedQueueDispatcherPool.h>
Classes | |
class | DeliverFunctionAdapter |
An adapter for function-based delivery handlers to the object-oriented interface. More... | |
class | DeliveryHandler |
A handler that is called whenever a message is received from the pool and should be passed to a receiver of the pool. More... | |
class | FilterFunctionAdapter |
An adapter for function-based filter to the object-oriented interface. More... | |
class | FilterHandler |
A handler that is used to filter messages for a certain receiver. More... | |
class | Receiver |
Represents on registered receiver of the pool. More... | |
class | TrueFilter |
A filter that accepts every message. More... | |
Public Types | |
typedef boost::function< void(boost::shared_ptr< R > &receiver, const M &message)> | deliverFunction |
A function that delivers a message to a receiver. More... | |
typedef boost::function< bool(boost::shared_ptr< R > &receiver, const M &message)> | filterFunction |
A function that filters a message for a receiver. More... | |
typedef boost::shared_ptr< DeliveryHandler > | DeliveryHandlerPtr |
typedef boost::shared_ptr< FilterHandler > | FilterHandlerPtr |
Public Member Functions | |
OrderedQueueDispatcherPool (const unsigned int &threadPoolSize, deliverFunction delFunc) | |
Constructs a new pool. More... | |
OrderedQueueDispatcherPool (const unsigned int &threadPoolSize, deliverFunction delFunc, filterFunction filterFunc) | |
Constructs a new pool. More... | |
OrderedQueueDispatcherPool (const unsigned int &threadPoolSize, DeliveryHandlerPtr deliveryHandler) | |
Constructs a new pool using the object-oriented handler interface that accepts every message. More... | |
OrderedQueueDispatcherPool (const unsigned int &threadPoolSize, DeliveryHandlerPtr deliveryHandler, FilterHandlerPtr filterHandler) | |
Constructs a new pool using the object-oriented handler interface. More... | |
virtual | ~OrderedQueueDispatcherPool () |
void | registerReceiver (boost::shared_ptr< R > receiver) |
Registers a new receiver at the pool. More... | |
bool | unregisterReceiver (boost::shared_ptr< R > receiver) |
Unregisters all registration of one receiver. More... | |
void | setParallelCalls (const bool &allow) |
Decides whether a single receiver might be called in parallel with overlapping calls or not. More... | |
void | start () |
Non-blocking start. More... | |
void | stop () |
Blocking until every thread has stopped working. More... | |
void | push (const M &message) |
Pushes a new message to be dispatched to all receivers in this pool. More... | |
Private Member Functions | |
void | nextJob (const unsigned int &, boost::shared_ptr< Receiver > &receiver) |
Returns the next job to process for worker threads and blocks if there is no job. More... | |
void | finishedWork (boost::shared_ptr< Receiver > receiver) |
void | worker (const unsigned int &workerNum) |
Threaded worker method. More... | |
Private Attributes | |
boost::mutex | receiversMutex |
std::vector< boost::shared_ptr< Receiver > > | receivers |
size_t | currentPosition |
volatile bool | jobsAvailable |
boost::condition | jobsAvailableCondition |
volatile bool | interrupted |
volatile bool | parallelCalls |
volatile bool | started |
unsigned int | threadPoolSize |
std::vector< boost::shared_ptr< boost::thread > > | threadPool |
DeliveryHandlerPtr | deliveryHandler |
FilterHandlerPtr | filterHandler |
A thread pool that dispatches messages to a list of receivers.
The number of threads is usually smaller than the number of receivers and for each receiver it is guaranteed that messages arrive in the order they were published. No guarantees are given between different receivers. All methods except start and stop are reentrant.
The pool can be stopped and restarted at any time during the processing but these calls must be single-threaded.
Assumptions:
Filtering and delivery of message to receivers are performed by handlers. These handlers should be stateless.
M | type of the messages dispatched by the pool |
R | type of the message receiver |
Definition at line 65 of file OrderedQueueDispatcherPool.h.
typedef boost::function<void(boost::shared_ptr<R>& receiver, const M& message)> rsc::threading::OrderedQueueDispatcherPool< M, R >::deliverFunction |
A function that delivers a message to a receiver.
Must be reentrant.
Definition at line 72 of file OrderedQueueDispatcherPool.h.
typedef boost::shared_ptr<DeliveryHandler> rsc::threading::OrderedQueueDispatcherPool< M, R >::DeliveryHandlerPtr |
Definition at line 106 of file OrderedQueueDispatcherPool.h.
typedef boost::function<bool(boost::shared_ptr<R>& receiver, const M& message)> rsc::threading::OrderedQueueDispatcherPool< M, R >::filterFunction |
A function that filters a message for a receiver.
If true
is returned, the message is acceptable for the receiver, else it will not be delivered. Must be reentrant.
Definition at line 80 of file OrderedQueueDispatcherPool.h.
typedef boost::shared_ptr<FilterHandler> rsc::threading::OrderedQueueDispatcherPool< M, R >::FilterHandlerPtr |
Definition at line 133 of file OrderedQueueDispatcherPool.h.
|
inline |
Constructs a new pool.
threadPoolSize | number of threads for this pool |
delFunc | the strategy used to deliver messages of type M to receivers of type R. This will most likely be a simple delegate function mapping to a concrete method call. Must be reentrant. |
Definition at line 366 of file OrderedQueueDispatcherPool.h.
|
inline |
Constructs a new pool.
threadPoolSize | number of threads for this pool |
delFunc | the strategy used to deliver messages of type M to receivers of type R. This will most likely be a simple delegate function mapping to a concrete method call. Must be reentrant. |
filterFunc | Reentrant function used to filter messages per receiver. Default accepts every message. |
Definition at line 386 of file OrderedQueueDispatcherPool.h.
|
inline |
Constructs a new pool using the object-oriented handler interface that accepts every message.
threadPoolSize | number of threads for this pool |
deliveryHandler | handler to deliver messages to receivers |
Definition at line 401 of file OrderedQueueDispatcherPool.h.
|
inline |
Constructs a new pool using the object-oriented handler interface.
threadPoolSize | number of threads for this pool |
deliveryHandler | handler to deliver messages to receivers |
filterHandler | filter handler for messages |
Definition at line 415 of file OrderedQueueDispatcherPool.h.
|
inlinevirtual |
Definition at line 422 of file OrderedQueueDispatcherPool.h.
References rsc::threading::OrderedQueueDispatcherPool< M, R >::stop().
|
inlineprivate |
Definition at line 303 of file OrderedQueueDispatcherPool.h.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::worker().
|
inlineprivate |
Returns the next job to process for worker threads and blocks if there is no job.
workerNum | number of the worker requesting a new job |
receiver | out param with the receiver to work on |
Definition at line 241 of file OrderedQueueDispatcherPool.h.
References rsc::threading::OrderedQueueDispatcherPool< M, R >::currentPosition.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::worker().
|
inline |
Pushes a new message to be dispatched to all receivers in this pool.
message | message to dispatch |
Definition at line 537 of file OrderedQueueDispatcherPool.h.
|
inline |
Registers a new receiver at the pool.
Multiple registrations of the same receiver are possible resulting in being called multiple times for the same message (but effectively this destroys the guarantee about ordering given above because multiple message queues are used for every subscription).
receiver | new receiver |
Definition at line 435 of file OrderedQueueDispatcherPool.h.
|
inline |
Decides whether a single receiver might be called in parallel with overlapping calls or not.
As the default, each receiver receives only one call at a time until the deliver method for that receiver has returned. Each call might originate from a different thread, but they will never overlap. By setting this flag to true
, calls might overlap to allow a true parallelism. This, however, weakens the ordering guarantees given by this class. The thread pool is still guaranteed to process the orders in the way they arrive. However, due to the processing of multiple parallel threads, during the time of picking up the new message to dispatch and the actual dispatch task to the user code, ordering might change as a result of thread scheduling.
allow | if set to true , receivers might be called in parallel by multiple thread with successive messages to dispatch |
Definition at line 482 of file OrderedQueueDispatcherPool.h.
|
inline |
Non-blocking start.
IllegalStateException | if the pool was already started and is running |
Definition at line 492 of file OrderedQueueDispatcherPool.h.
References rsc::threading::OrderedQueueDispatcherPool< M, R >::threadPoolSize, and rsc::threading::OrderedQueueDispatcherPool< M, R >::worker().
|
inline |
Blocking until every thread has stopped working.
Definition at line 515 of file OrderedQueueDispatcherPool.h.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::~OrderedQueueDispatcherPool().
|
inline |
Unregisters all registration of one receiver.
receiver | receiver to unregister |
true
if one or more receivers were unregistered, else false
Definition at line 447 of file OrderedQueueDispatcherPool.h.
|
inlineprivate |
Threaded worker method.
Definition at line 325 of file OrderedQueueDispatcherPool.h.
References rsc::threading::OrderedQueueDispatcherPool< M, R >::deliveryHandler, rsc::threading::OrderedQueueDispatcherPool< M, R >::filterHandler, rsc::threading::OrderedQueueDispatcherPool< M, R >::finishedWork(), and rsc::threading::OrderedQueueDispatcherPool< M, R >::nextJob().
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::start().
|
private |
Definition at line 224 of file OrderedQueueDispatcherPool.h.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::nextJob().
|
private |
Definition at line 351 of file OrderedQueueDispatcherPool.h.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::worker().
|
private |
Definition at line 352 of file OrderedQueueDispatcherPool.h.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::worker().
|
private |
Definition at line 228 of file OrderedQueueDispatcherPool.h.
|
private |
Definition at line 226 of file OrderedQueueDispatcherPool.h.
|
private |
Definition at line 227 of file OrderedQueueDispatcherPool.h.
|
private |
Definition at line 230 of file OrderedQueueDispatcherPool.h.
|
private |
Definition at line 223 of file OrderedQueueDispatcherPool.h.
|
private |
Definition at line 222 of file OrderedQueueDispatcherPool.h.
|
private |
Definition at line 232 of file OrderedQueueDispatcherPool.h.
|
private |
Definition at line 349 of file OrderedQueueDispatcherPool.h.
|
private |
Definition at line 301 of file OrderedQueueDispatcherPool.h.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::start().