rsb.transport.socket

This package contains a transport implementation that uses multiple point-to-point socket connections to simulate a bus.

Code author: jmoringe

Functions

getBusClientFor(host, port, tcpnodelay, …) Return (creating it if necessary), a BusClient for the endpoint designated by host and port and attach connector to it.
getBusServerFor(host, port, tcpnodelay, …) Return (creating it if necessary), a BusServer for the endpoint designated by host and port and attach connector to it.
initialize()
removeConnector(bus, connector)

Classes

Bus() Instances of this class provide access to a socket-based bus.
BusClient(host, port, tcpnodelay) Instances of this class provide access to a bus by means of a client socket.
BusConnection([host, port, socket_, …]) Instances of this class implement connections to a socket-based bus.
BusServer(host, port, tcpnodelay[, backlog]) Instances of this class provide access to a socket-based bus for local and remote bus clients.
Connector(converters[, options]) Instances of subclasses of this class receive events from a bus (represented by a Bus object) that is accessed via a socket connection.
InPushConnector(**kwargs) Instances of this class receive events from a bus (represented by a Bus object) that is accessed via a socket connection.
OutConnector(**kwargs) Instance of this class send events to a bus (represented by a Bus object) that is accessed via a socket connection.
TransportFactory TransportFactory implementation for the socket transport.
class rsb.transport.socket.Bus

Bases: object

Instances of this class provide access to a socket-based bus.

It is transparent for clients (connectors) of this class whether is accessed by running the bus server or by connecting to the bus server as a client.

In-direction connectors add themselves as event sinks using the addConnector method.

Out-direction connectors submit events to the bus using the handleOutgoing method.

Code author: jmoringe

activate()
addConnection(connection)

Add connection to the list of connections of this bus. This cause notifications send over this bus to be send through connection and notifications received via connection to be dispatched to connectors of this bus.

Parameters:connection – The connection that should be added to this bus.
addConnector(connector)

Add connector to the list of connectors of this bus. Depending on the direction of connector, this causes connector to either receive or broadcast notifications via this bus.

Parameters:connector – The connector that should be added to this bus.
deactivate()
getConnections()
Returns:A list of all connections to the bus.
Return type:list
getConnectors()
getLock()
handleIncoming(connectionAndNotification)
handleOutgoing(notification)
removeConnection(connection)

Remove connection from the list of connections of this bus.

Parameters:connection – The connection that should be removed from this bus.
removeConnector(connector)

Remove connector from the list of connectors of this bus.

Parameters:connector – The connector that should be removed from this bus.
connections
list:
A list of all connections to the bus.
Type:Returns
connectors
lock
class rsb.transport.socket.BusClient(host, port, tcpnodelay)

Bases: rsb.transport.socket.Bus

Instances of this class provide access to a bus by means of a client socket.

Code author: jmoringe

Parameters:
  • host (str) – A hostname or address of the node on which the bus server listens.
  • port (int) – The port on which the new bus server listens.
  • tcpnodelay (bool) – If True, the socket will be set to TCP_NODELAY.
activate()
addConnection(connection)

Add connection to the list of connections of this bus. This cause notifications send over this bus to be send through connection and notifications received via connection to be dispatched to connectors of this bus.

Parameters:connection – The connection that should be added to this bus.
addConnector(connector)

Add connector to the list of connectors of this bus. Depending on the direction of connector, this causes connector to either receive or broadcast notifications via this bus.

Parameters:connector – The connector that should be added to this bus.
deactivate()
getConnections()
Returns:A list of all connections to the bus.
Return type:list
getConnectors()
getLock()
handleIncoming(connectionAndNotification)
handleOutgoing(notification)
removeConnection(connection)

Remove connection from the list of connections of this bus.

Parameters:connection – The connection that should be removed from this bus.
removeConnector(connector)

Remove connector from the list of connectors of this bus.

Parameters:connector – The connector that should be removed from this bus.
connections
list:
A list of all connections to the bus.
Type:Returns
connectors
lock
class rsb.transport.socket.BusConnection(host=None, port=None, socket_=None, isServer=False, tcpnodelay=True)

Bases: rsb.eventprocessing.BroadcastProcessor

Instances of this class implement connections to a socket-based bus.

The basic operations provided by this class are receiving an event by calling receiveNotification and submitting an event to the bus by calling sendNotification.

In a process which act as a client for a particular bus, a single instance of this class is connected to the bus server and provides access to the bus for the process.

A process which acts as the server for a particular bus, manages (via the BusServer class) one BusConnection object for each client (remote process) connected to the bus.

Code author: jmoringe

Args:

Returns:

Parameters:
  • host (str or None) – Hostname or address of the bus server.
  • port (int or None) – Port of the bus server.
  • socket – A socket object through which the new connection should access the bus.
  • isServer (bool) – if True, the created object will perform the server part of the handshake protocol.
  • tcpnodelay (bool) – If True, the socket will be set to TCP_NODELAY.
activate()
addHandler(handler)
static bufferToNotification(serialized)
deactivate()
dispatch(event)
doOneNotification()
getErrorHook()
getHandlers()
handle(notification)
static notificationToBuffer(notification)
receiveNotification()
receiveNotifications()
removeHandler(handler)
sendNotification(notification)
setErrorHook(newValue)
shutdown()
waitForDeactivation()
errorHook
handlers
class rsb.transport.socket.BusServer(host, port, tcpnodelay, backlog=5)

Bases: rsb.transport.socket.Bus

Instances of this class provide access to a socket-based bus for local and remote bus clients.

Remote clients can connect to a server socket in order to send and receive events through the resulting socket connection.

Local clients (connectors) use the usual Bus interface to receive events submitted by remote clients and submit events which will be distributed to remote clients by the BusServer.

Code author: jmoringe

Parameters:
  • host (str) – A hostname or address identifying the interface to which the listen socket of the new bus server should be bound.
  • port (int) – The port to which the listen socket of the new bus server should be bound.
  • tcpnodelay (bool) – If True, the socket will be set to TCP_NODELAY.
  • backlog (int) – The maximum number of queued connection attempts.
acceptClients()
activate()
addConnection(connection)

Add connection to the list of connections of this bus. This cause notifications send over this bus to be send through connection and notifications received via connection to be dispatched to connectors of this bus.

Parameters:connection – The connection that should be added to this bus.
addConnector(connector)

Add connector to the list of connectors of this bus. Depending on the direction of connector, this causes connector to either receive or broadcast notifications via this bus.

Parameters:connector – The connector that should be added to this bus.
deactivate()
getConnections()
Returns:A list of all connections to the bus.
Return type:list
getConnectors()
getLock()
handleIncoming(connectionAndNotification)
handleOutgoing(notification)
removeConnection(connection)

Remove connection from the list of connections of this bus.

Parameters:connection – The connection that should be removed from this bus.
removeConnector(connector)

Remove connector from the list of connectors of this bus.

Parameters:connector – The connector that should be removed from this bus.
connections
list:
A list of all connections to the bus.
Type:Returns
connectors
lock
class rsb.transport.socket.Connector(converters, options=None, **kwargs)

Bases: rsb.transport.Connector, rsb.transport.ConverterSelectingConnector

Instances of subclasses of this class receive events from a bus (represented by a Bus object) that is accessed via a socket connection.

Code author: jmoringe

activate()
deactivate()
getBus()
getConverterForDataType(dataType)

Returns a converter that can convert the supplied data to the wire-type.

Parameters:dataType – the type of the object for which a suitable converter should returned.
Returns:converter
Raises:KeyError – no converter is available for the supplied data.
getConverterForWireSchema(wireSchema)

Returns a suitable converter for the wireSchema.

Parameters:wireSchema (str) – the wire-schema to or from which the returned converter should convert
Returns:converter
Raises:KeyError – no converter is available for the specified wire-schema.
getConverterMap()
getScope()
getTransportURL()
getWireType()

Returns the serialization type used for this connector.

Returns:python serialization type
setQualityOfServiceSpec(qos)
setScope(newValue)

Sets the scope this connector will receive events from to newValue. Called before #activate.

Parameters:newValue (rsb.Scope) – scope of the connector
bus
converterMap
scope
wireType

Returns the serialization type used for this connector.

Returns:python serialization type
class rsb.transport.socket.InPushConnector(**kwargs)

Bases: rsb.transport.socket.Connector, rsb.transport.InPushConnector

Instances of this class receive events from a bus (represented by a Bus object) that is accessed via a socket connection.

The receiving and dispatching of events is done in push mode: each instance has a Bus which pushes appropriate events into the instance. The connector deserializes event payloads and pushes the events into handlers (usually objects which implement some event processing strategy).

Code author: jmoringe

activate()
deactivate()
filterNotify(theFilter, action)
getBus()
getConverterForDataType(dataType)

Returns a converter that can convert the supplied data to the wire-type.

Parameters:dataType – the type of the object for which a suitable converter should returned.
Returns:converter
Raises:KeyError – no converter is available for the supplied data.
getConverterForWireSchema(wireSchema)

Returns a suitable converter for the wireSchema.

Parameters:wireSchema (str) – the wire-schema to or from which the returned converter should convert
Returns:converter
Raises:KeyError – no converter is available for the specified wire-schema.
getConverterMap()
getScope()
getTransportURL()
getWireType()

Returns the serialization type used for this connector.

Returns:python serialization type
handle(notification)
setObserverAction(action)

Sets the action used by the connector to notify about incoming events. The call to this method must be thread-safe.

Parameters:action – action called if a new message is received from the connector. Must accept an Event as parameter.
setQualityOfServiceSpec(qos)
setScope(newValue)

Sets the scope this connector will receive events from to newValue. Called before #activate.

Parameters:newValue (rsb.Scope) – scope of the connector
bus
converterMap
scope
wireType

Returns the serialization type used for this connector.

Returns:python serialization type
class rsb.transport.socket.OutConnector(**kwargs)

Bases: rsb.transport.socket.Connector, rsb.transport.OutConnector

Instance of this class send events to a bus (represented by a Bus object) that is accessed via a socket connection.

Code author: jmoringe

activate()
deactivate()
getBus()
getConverterForDataType(dataType)

Returns a converter that can convert the supplied data to the wire-type.

Parameters:dataType – the type of the object for which a suitable converter should returned.
Returns:converter
Raises:KeyError – no converter is available for the supplied data.
getConverterForWireSchema(wireSchema)

Returns a suitable converter for the wireSchema.

Parameters:wireSchema (str) – the wire-schema to or from which the returned converter should convert
Returns:converter
Raises:KeyError – no converter is available for the specified wire-schema.
getConverterMap()
getScope()
getTransportURL()
getWireType()

Returns the serialization type used for this connector.

Returns:python serialization type
handle(event)

Sends event and adapts its meta data instance with the actual send time.

Parameters:event – event to send
setQualityOfServiceSpec(qos)
setScope(newValue)

Sets the scope this connector will receive events from to newValue. Called before #activate.

Parameters:newValue (rsb.Scope) – scope of the connector
bus
converterMap
scope
wireType

Returns the serialization type used for this connector.

Returns:python serialization type
class rsb.transport.socket.TransportFactory

Bases: rsb.transport.TransportFactory

TransportFactory implementation for the socket transport.

Code author: jwienke

createInPullConnector(converters, options)

Creates a new instance of an InPullConnector for the represented transport.

Parameters:
  • converters (ConverterSelectionStrategy) – the converters to use for this type
  • options (dict of str) – options for the new connector
Returns:

the new connector instance

Return type:

rsb.transport.InPullConnector

createInPushConnector(converters, options)

Creates a new instance of an InPushConnector for the represented transport.

Parameters:converters (ConverterSelectionStrategy) – the converters to use for this type options(dict of str): options for the new connector
Returns:the new connector instance
Return type:rsb.transport.InPushConnector
createOutConnector(converters, options)

Creates a new instance of an OutConnector for the represented transport.

Parameters:converters (ConverterSelectionStrategy) – the converters to use for this type options(dict of str): options for the new connector
Returns:the new connector instance
Return type:rsb.transport.OutConnector
getName()

Returns the name representing this transport.

Returns:name of the transport, non-empty
Return type:str
isRemote()

Returns true is the transport performs remote communication.

Returns:does the transport perform remote communication?
Return type:bool
rsb.transport.socket.getBusClientFor(host, port, tcpnodelay, connector)

Return (creating it if necessary), a BusClient for the endpoint designated by host and port and attach connector to it. Attaching connector marks the bus client as being in use and protects it from being destroyed in a race condition situation.

Parameters:
  • host (str) – A hostname or address of the node on which the bus server listens.
  • port (int) – The port on which the bus server listens.
  • tcpnodelay (bool) – If True, the socket will be set to TCP_NODELAY.
  • connector – A connector that should be attached to the bus client.
rsb.transport.socket.getBusServerFor(host, port, tcpnodelay, connector)

Return (creating it if necessary), a BusServer for the endpoint designated by host and port and attach connector to it. Attaching connector marks the bus server as being in use and protects it from being destroyed in a race condition situation.

Parameters:
  • host (str) – A hostname or address identifying the interface to which the listen socket of the new bus server should be bound.
  • port (int) – The port to which the listen socket of the new bus server should be bound.
  • tcpnodelay (bool) – If True, the socket will be set to TCP_NODELAY.
  • connector – A connector that should be attached to the bus server.
rsb.transport.socket.initialize()
rsb.transport.socket.removeConnector(bus, connector)