RSC
0.7.17
|
A thread pool that dispatches messages to a list of receivers. More...
#include <OrderedQueueDispatcherPool.h>
Classes | |
class | DeliverFunctionAdapter |
An adapter for function-based delivery handlers to the object-oriented interface. More... | |
class | DeliveryHandler |
A handler that is called whenever a message is received from the pool and should be passed to a receiver of the pool. More... | |
class | FilterFunctionAdapter |
An adapter for function-based filter to the object-oriented interface. More... | |
class | FilterHandler |
A handler that is used to filter messages for a certain receiver. More... | |
class | Receiver |
Represents on registered receiver of the pool. More... | |
class | TrueFilter |
A filter that accepts every message. More... | |
Public Types | |
typedef boost::function< void(boost::shared_ptr < R > &receiver, const M &message)> | deliverFunction |
A function that delivers a message to a receiver. More... | |
typedef boost::function< bool(boost::shared_ptr < R > &receiver, const M &message)> | filterFunction |
A function that filters a message for a receiver. More... | |
typedef boost::shared_ptr < DeliveryHandler > | DeliveryHandlerPtr |
typedef boost::shared_ptr < FilterHandler > | FilterHandlerPtr |
Public Member Functions | |
OrderedQueueDispatcherPool (const unsigned int &threadPoolSize, deliverFunction delFunc) | |
Constructs a new pool. More... | |
OrderedQueueDispatcherPool (const unsigned int &threadPoolSize, deliverFunction delFunc, filterFunction filterFunc) | |
Constructs a new pool. More... | |
OrderedQueueDispatcherPool (const unsigned int &threadPoolSize, DeliveryHandlerPtr deliveryHandler) | |
Constructs a new pool using the object-oriented handler interface that accepts every message. More... | |
OrderedQueueDispatcherPool (const unsigned int &threadPoolSize, DeliveryHandlerPtr deliveryHandler, FilterHandlerPtr filterHandler) | |
Constructs a new pool using the object-oriented handler interface. More... | |
virtual | ~OrderedQueueDispatcherPool () |
void | registerReceiver (boost::shared_ptr< R > receiver) |
Registers a new receiver at the pool. More... | |
bool | unregisterReceiver (boost::shared_ptr< R > receiver) |
Unregisters all registration of one receiver. More... | |
void | start () |
Non-blocking start. More... | |
void | stop () |
Blocking until every thread has stopped working. More... | |
void | push (const M &message) |
Pushes a new message to be dispatched to all receivers in this pool. More... | |
Private Member Functions | |
void | nextJob (const unsigned int &, boost::shared_ptr< Receiver > &receiver) |
Returns the next job to process for worker threads and blocks if there is no job. More... | |
void | finishedWork (boost::shared_ptr< Receiver > receiver) |
void | worker (const unsigned int &workerNum) |
Threaded worker method. More... | |
Private Attributes | |
boost::mutex | receiversMutex |
std::vector< boost::shared_ptr < Receiver > > | receivers |
size_t | currentPosition |
volatile bool | jobsAvailable |
boost::condition | jobsAvailableCondition |
volatile bool | interrupted |
volatile bool | started |
unsigned int | threadPoolSize |
std::vector< boost::shared_ptr < boost::thread > > | threadPool |
DeliveryHandlerPtr | deliveryHandler |
FilterHandlerPtr | filterHandler |
A thread pool that dispatches messages to a list of receivers.
The number of threads is usually smaller than the number of receivers and for each receiver it is guaranteed that messages arrive in the order they were published. No guarantees are given between different receivers. All methods except start and stop are reentrant.
The pool can be stopped and restarted at any time during the processing but these calls must be single-threaded.
Assumptions:
Filtering and delivery of message to receivers are performed by handlers. These handlers should be stateless.
M | type of the messages dispatched by the pool |
R | type of the message receiver |
Definition at line 65 of file OrderedQueueDispatcherPool.h.
typedef boost::function<void(boost::shared_ptr<R>& receiver, const M& message)> rsc::threading::OrderedQueueDispatcherPool< M, R >::deliverFunction |
A function that delivers a message to a receiver.
Must be reentrant.
Definition at line 72 of file OrderedQueueDispatcherPool.h.
typedef boost::shared_ptr<DeliveryHandler> rsc::threading::OrderedQueueDispatcherPool< M, R >::DeliveryHandlerPtr |
Definition at line 106 of file OrderedQueueDispatcherPool.h.
typedef boost::function<bool(boost::shared_ptr<R>& receiver, const M& message)> rsc::threading::OrderedQueueDispatcherPool< M, R >::filterFunction |
A function that filters a message for a receiver.
If true
is returned, the message is acceptable for the receiver, else it will not be delivered. Must be reentrant.
Definition at line 80 of file OrderedQueueDispatcherPool.h.
typedef boost::shared_ptr<FilterHandler> rsc::threading::OrderedQueueDispatcherPool< M, R >::FilterHandlerPtr |
Definition at line 133 of file OrderedQueueDispatcherPool.h.
|
inline |
Constructs a new pool.
threadPoolSize | number of threads for this pool |
delFunc | the strategy used to deliver messages of type M to receivers of type R. This will most likely be a simple delegate function mapping to a concrete method call. Must be reentrant. |
Definition at line 364 of file OrderedQueueDispatcherPool.h.
|
inline |
Constructs a new pool.
threadPoolSize | number of threads for this pool |
delFunc | the strategy used to deliver messages of type M to receivers of type R. This will most likely be a simple delegate function mapping to a concrete method call. Must be reentrant. |
filterFunc | Reentrant function used to filter messages per receiver. Default accepts every message. |
Definition at line 384 of file OrderedQueueDispatcherPool.h.
|
inline |
Constructs a new pool using the object-oriented handler interface that accepts every message.
threadPoolSize | number of threads for this pool |
deliveryHandler | handler to deliver messages to receivers |
Definition at line 399 of file OrderedQueueDispatcherPool.h.
|
inline |
Constructs a new pool using the object-oriented handler interface.
threadPoolSize | number of threads for this pool |
deliveryHandler | handler to deliver messages to receivers |
filterHandler | filter handler for messages |
Definition at line 413 of file OrderedQueueDispatcherPool.h.
|
inlinevirtual |
Definition at line 420 of file OrderedQueueDispatcherPool.h.
References rsc::threading::OrderedQueueDispatcherPool< M, R >::stop().
|
inlineprivate |
Definition at line 301 of file OrderedQueueDispatcherPool.h.
References rsc::threading::OrderedQueueDispatcherPool< M, R >::jobsAvailable, rsc::threading::OrderedQueueDispatcherPool< M, R >::jobsAvailableCondition, and rsc::threading::OrderedQueueDispatcherPool< M, R >::receiversMutex.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::worker().
|
inlineprivate |
Returns the next job to process for worker threads and blocks if there is no job.
workerNum | number of the worker requesting a new job |
receiver | out param with the receiver to work on |
Definition at line 239 of file OrderedQueueDispatcherPool.h.
References rsc::threading::OrderedQueueDispatcherPool< M, R >::currentPosition, rsc::threading::OrderedQueueDispatcherPool< M, R >::interrupted, rsc::threading::OrderedQueueDispatcherPool< M, R >::jobsAvailable, rsc::threading::OrderedQueueDispatcherPool< M, R >::jobsAvailableCondition, rsc::threading::OrderedQueueDispatcherPool< M, R >::receivers, and rsc::threading::OrderedQueueDispatcherPool< M, R >::receiversMutex.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::worker().
|
inline |
Pushes a new message to be dispatched to all receivers in this pool.
message | message to dispatch |
Definition at line 515 of file OrderedQueueDispatcherPool.h.
References rsc::threading::OrderedQueueDispatcherPool< M, R >::jobsAvailable, rsc::threading::OrderedQueueDispatcherPool< M, R >::jobsAvailableCondition, rsc::threading::OrderedQueueDispatcherPool< M, R >::receivers, and rsc::threading::OrderedQueueDispatcherPool< M, R >::receiversMutex.
|
inline |
Registers a new receiver at the pool.
Multiple registrations of the same receiver are possible resulting in being called multiple times for the same message (but effectively this destroys the guarantee about ordering given above because multiple message queues are used for every subscription).
receiver | new receiver |
Definition at line 433 of file OrderedQueueDispatcherPool.h.
References rsc::threading::OrderedQueueDispatcherPool< M, R >::receivers, and rsc::threading::OrderedQueueDispatcherPool< M, R >::receiversMutex.
|
inline |
Non-blocking start.
IllegalStateException | if the pool was already started and is running |
Definition at line 470 of file OrderedQueueDispatcherPool.h.
References rsc::threading::OrderedQueueDispatcherPool< M, R >::interrupted, rsc::threading::OrderedQueueDispatcherPool< M, R >::receiversMutex, rsc::threading::OrderedQueueDispatcherPool< M, R >::started, rsc::threading::OrderedQueueDispatcherPool< M, R >::threadPool, rsc::threading::OrderedQueueDispatcherPool< M, R >::threadPoolSize, and rsc::threading::OrderedQueueDispatcherPool< M, R >::worker().
|
inline |
Blocking until every thread has stopped working.
Definition at line 493 of file OrderedQueueDispatcherPool.h.
References rsc::threading::OrderedQueueDispatcherPool< M, R >::interrupted, rsc::threading::OrderedQueueDispatcherPool< M, R >::jobsAvailableCondition, rsc::threading::OrderedQueueDispatcherPool< M, R >::receiversMutex, rsc::threading::OrderedQueueDispatcherPool< M, R >::started, and rsc::threading::OrderedQueueDispatcherPool< M, R >::threadPool.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::~OrderedQueueDispatcherPool().
|
inline |
Unregisters all registration of one receiver.
receiver | receiver to unregister |
true
if one or more receivers were unregistered, else false
Definition at line 445 of file OrderedQueueDispatcherPool.h.
References rsc::threading::OrderedQueueDispatcherPool< M, R >::receivers, and rsc::threading::OrderedQueueDispatcherPool< M, R >::receiversMutex.
|
inlineprivate |
Threaded worker method.
Definition at line 323 of file OrderedQueueDispatcherPool.h.
References rsc::threading::OrderedQueueDispatcherPool< M, R >::deliveryHandler, rsc::threading::OrderedQueueDispatcherPool< M, R >::filterHandler, rsc::threading::OrderedQueueDispatcherPool< M, R >::finishedWork(), and rsc::threading::OrderedQueueDispatcherPool< M, R >::nextJob().
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::start().
|
private |
Definition at line 224 of file OrderedQueueDispatcherPool.h.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::nextJob().
|
private |
Definition at line 349 of file OrderedQueueDispatcherPool.h.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::worker().
|
private |
Definition at line 350 of file OrderedQueueDispatcherPool.h.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::worker().
|
private |
Definition at line 228 of file OrderedQueueDispatcherPool.h.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::nextJob(), rsc::threading::OrderedQueueDispatcherPool< M, R >::start(), and rsc::threading::OrderedQueueDispatcherPool< M, R >::stop().
|
private |
|
private |
Definition at line 227 of file OrderedQueueDispatcherPool.h.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::finishedWork(), rsc::threading::OrderedQueueDispatcherPool< M, R >::nextJob(), rsc::threading::OrderedQueueDispatcherPool< M, R >::push(), and rsc::threading::OrderedQueueDispatcherPool< M, R >::stop().
|
private |
Definition at line 223 of file OrderedQueueDispatcherPool.h.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::nextJob(), rsc::threading::OrderedQueueDispatcherPool< M, R >::push(), rsc::threading::OrderedQueueDispatcherPool< M, R >::registerReceiver(), and rsc::threading::OrderedQueueDispatcherPool< M, R >::unregisterReceiver().
|
private |
Definition at line 222 of file OrderedQueueDispatcherPool.h.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::finishedWork(), rsc::threading::OrderedQueueDispatcherPool< M, R >::nextJob(), rsc::threading::OrderedQueueDispatcherPool< M, R >::push(), rsc::threading::OrderedQueueDispatcherPool< M, R >::registerReceiver(), rsc::threading::OrderedQueueDispatcherPool< M, R >::start(), rsc::threading::OrderedQueueDispatcherPool< M, R >::stop(), and rsc::threading::OrderedQueueDispatcherPool< M, R >::unregisterReceiver().
|
private |
Definition at line 230 of file OrderedQueueDispatcherPool.h.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::start(), and rsc::threading::OrderedQueueDispatcherPool< M, R >::stop().
|
private |
Definition at line 347 of file OrderedQueueDispatcherPool.h.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::start(), and rsc::threading::OrderedQueueDispatcherPool< M, R >::stop().
|
private |
Definition at line 299 of file OrderedQueueDispatcherPool.h.
Referenced by rsc::threading::OrderedQueueDispatcherPool< M, R >::start().