rsb.eventprocessing

A module with classes maintaining the processing of events between the transport layer and the client interface.

Code author: jwienke

Code author: jmoringe

Classes

BroadcastProcessor([handlers]) This event processor implements synchronous broadcast dispatch to a list of handlers.
Configurator([connectors]) Superclass for in- and out-direction Configurator classes.
DirectEventSendingStrategy()
EventReceivingStrategy Superclass for event receiving strategies.
EventSendingStrategy
FirstConnectorPullEventReceivingStrategy Directly receives events only from the first provided connector.
FullyParallelEventReceivingStrategy() An PushEventReceivingStrategy that dispatches events to multiple handlers in individual threads in parallel.
InPullRouteConfigurator([connectors, …]) Instances of this class manage the pull-based receiving of events via one or more rsb.transport.Connector s and an PullEventReceivingStrategy.
InPushRouteConfigurator([connectors, …]) Instances of this class manage the receiving, filtering and dispatching of events via one or more rsb.transport.Connector s and an PushEventReceivingStrategy.
NonQueuingParallelEventReceivingStrategy() An PushEventReceivingStrategy that dispatches events to multiple handlers using a single thread and without queuing.
OutRouteConfigurator([connectors, …]) Instances of this class manage the sending of events via one or more rsb.transport.Connector s and an EventSendingStrategy.
ParallelEventReceivingStrategy([numThreads]) An PushEventReceivingStrategy that dispatches events to multiple handlers in individual threads in parallel.
PullEventReceivingStrategy Superclass for pull-based event receiving.
PushEventReceivingStrategy Superclass for push-based event receiving strategies.
class rsb.eventprocessing.BroadcastProcessor(handlers=None)

Bases: object

This event processor implements synchronous broadcast dispatch to a list of handlers.

Code author: jmoringe

addHandler(handler)
dispatch(event)
getHandlers()
handle(event)
removeHandler(handler)
handlers
class rsb.eventprocessing.Configurator(connectors=None)

Bases: object

Superclass for in- and out-direction Configurator classes. Manages the basic aspects like the connector list and (de)activation that are not direction-specific.

Code author: jwienke

Code author: jmoringe

activate()
deactivate()
getConnectors()
getScope()
getTransportURLs()

Return list of transport URLs describing the connectors managed by the configurator.

Returns:List of transport URLs.
Return type:list
isActive()
setQualityOfServiceSpec(qos)
setScope(scope)

Defines the scope the in route has to be set up. This will be called before calling #activate.

Parameters:scope (rsb.Scope) – the scope of the in route
active
connectors
scope
transportURLs

Return list of transport URLs describing the connectors managed by the configurator.

Returns:List of transport URLs.
Return type:list
class rsb.eventprocessing.DirectEventSendingStrategy

Bases: rsb.eventprocessing.EventSendingStrategy

addConnector(connector)
getConnectors()
handle(event)
removeConnector(connector)
connectors
class rsb.eventprocessing.EventReceivingStrategy

Bases: object

Superclass for event receiving strategies.

Code author: jwienke

class rsb.eventprocessing.EventSendingStrategy

Bases: object

addConnector(connector)
getConnectors()
handle(event)
removeConnector(connector)
connectors
class rsb.eventprocessing.FirstConnectorPullEventReceivingStrategy

Bases: rsb.eventprocessing.PullEventReceivingStrategy

Directly receives events only from the first provided connector.

Code author: jwienke

raiseEvent(block)

Receives the next event.

Parameters:block (bool) – if True, wait for the next event. Else, immediately return, potentially a None.
setConnectors(connectors)
class rsb.eventprocessing.FullyParallelEventReceivingStrategy

Bases: rsb.eventprocessing.PushEventReceivingStrategy

An PushEventReceivingStrategy that dispatches events to multiple handlers in individual threads in parallel. Each handler can be called in parallel for different requests.

Code author: jwienke

class Worker(handler, event, filters)

Bases: threading.Thread

getName()
isAlive()

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. The module function enumerate() returns a list of all alive threads.

isDaemon()
is_alive()

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. The module function enumerate() returns a list of all alive threads.

join(timeout=None)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call isAlive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

setDaemon(daemonic)
setName(name)
start()

Start the thread’s activity.

It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.

daemon

A boolean value indicating whether this thread is a daemon thread (True) or not (False).

This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when no alive non-daemon threads are left.

ident

Thread identifier of this thread or None if it has not been started.

This is a nonzero integer. See the thread.get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.

name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

addFilter(f)
addHandler(handler, wait)
deactivate()
handle(event)

Dispatches the event to all registered listeners.

Parameters:event – event to dispatch
removeFilter(theFilter)
removeHandler(handler, wait)
class rsb.eventprocessing.InPullRouteConfigurator(connectors=None, receivingStrategy=None)

Bases: rsb.eventprocessing.Configurator

Instances of this class manage the pull-based receiving of events via one or more rsb.transport.Connector s and an PullEventReceivingStrategy.

Code author: jwienke

Creates a new configurator which manages connectors and receivingStrategy.

Parameters:
  • connectors – Connectors through which events are received.
  • receivingStrategy – The event receiving strategy according to which the dispatching of incoming events should be performed.
activate()
deactivate()
getConnectors()
getReceivingStrategy()
getScope()
getTransportURLs()

Return list of transport URLs describing the connectors managed by the configurator.

Returns:List of transport URLs.
Return type:list
isActive()
setQualityOfServiceSpec(qos)
setScope(scope)

Defines the scope the in route has to be set up. This will be called before calling #activate.

Parameters:scope (rsb.Scope) – the scope of the in route
active
connectors
scope
transportURLs

Return list of transport URLs describing the connectors managed by the configurator.

Returns:List of transport URLs.
Return type:list
class rsb.eventprocessing.InPushRouteConfigurator(connectors=None, receivingStrategy=None)

Bases: rsb.eventprocessing.Configurator

Instances of this class manage the receiving, filtering and dispatching of events via one or more rsb.transport.Connector s and an PushEventReceivingStrategy.

Code author: jwienke

Code author: jmoringe

Creates a new configurator which manages connectors and receivingStrategy.

Parameters:
  • connectors – Connectors through which events are received.
  • receivingStrategy – The event receiving strategy according to which the filtering and dispatching of incoming events should be performed.
activate()
deactivate()
filterAdded(theFilter)
filterRemoved(theFilter)
getConnectors()
getScope()
getTransportURLs()

Return list of transport URLs describing the connectors managed by the configurator.

Returns:List of transport URLs.
Return type:list
handlerAdded(handler, wait)
handlerRemoved(handler, wait)
isActive()
setQualityOfServiceSpec(qos)
setScope(scope)

Defines the scope the in route has to be set up. This will be called before calling #activate.

Parameters:scope (rsb.Scope) – the scope of the in route
active
connectors
scope
transportURLs

Return list of transport URLs describing the connectors managed by the configurator.

Returns:List of transport URLs.
Return type:list
class rsb.eventprocessing.NonQueuingParallelEventReceivingStrategy

Bases: rsb.eventprocessing.PushEventReceivingStrategy

An PushEventReceivingStrategy that dispatches events to multiple handlers using a single thread and without queuing. Only a single buffer is used to decouple the transport from the registered handlers. In case the handler processing is slower than the transport, the transport will block on inserting events into this strategy. Callers must ensure that they are in no active call for #handle when deactivating this instance.

Code author: jwienke

addFilter(f)
addHandler(handler, wait)
deactivate()
handle(event)
removeFilter(theFilter)
removeHandler(handler, wait)
class rsb.eventprocessing.OutRouteConfigurator(connectors=None, sendingStrategy=None)

Bases: rsb.eventprocessing.Configurator

Instances of this class manage the sending of events via one or more rsb.transport.Connector s and an EventSendingStrategy.

Code author: jmoringe

activate()
deactivate()
getConnectors()
getScope()
getTransportURLs()

Return list of transport URLs describing the connectors managed by the configurator.

Returns:List of transport URLs.
Return type:list
handle(event)
isActive()
setQualityOfServiceSpec(qos)
setScope(scope)

Defines the scope the in route has to be set up. This will be called before calling #activate.

Parameters:scope (rsb.Scope) – the scope of the in route
active
connectors
scope
transportURLs

Return list of transport URLs describing the connectors managed by the configurator.

Returns:List of transport URLs.
Return type:list
class rsb.eventprocessing.ParallelEventReceivingStrategy(numThreads=5)

Bases: rsb.eventprocessing.PushEventReceivingStrategy

An PushEventReceivingStrategy that dispatches events to multiple handlers in individual threads in parallel. Each handler is called only sequentially but potentially from different threads.

Code author: jwienke

addFilter(theFilter)
addHandler(handler, wait)
deactivate()
handle(event)

Dispatches the event to all registered listeners.

Parameters:event – event to dispatch
removeFilter(theFilter)
removeHandler(handler, wait)
class rsb.eventprocessing.PullEventReceivingStrategy

Bases: rsb.eventprocessing.EventReceivingStrategy

Superclass for pull-based event receiving.

Code author: jwienke

raiseEvent(block)

Receives the next event.

Parameters:block (bool) – if True, wait for the next event. Else, immediately return, potentially a None.
setConnectors(connectors)
class rsb.eventprocessing.PushEventReceivingStrategy

Bases: rsb.eventprocessing.EventReceivingStrategy

Superclass for push-based event receiving strategies.

Code author: jmoringe

Code author: jwienke

addFilter(theFilter)
addHandler(handler, wait)
handle(event)
removeFilter(theFilter)
removeHandler(handler, wait)