29 #include <rsc/runtime/ContainerIO.h> 31 #include <boost/bind.hpp> 33 #include <boost/thread.hpp> 35 #include <boost/asio/ip/tcp.hpp> 39 #include "../../MetaData.h" 46 using boost::asio::ip::tcp;
53 logger(Logger::getLogger(
"rsb.transport.socket.Bus")),
54 asioService(asioService), tcpnodelay(tcpnodelay) {
58 RSCDEBUG(
logger,
"Destructing bus instance");
61 if (!this->
sinks.empty()) {
62 RSCWARN(
logger,
"" << this->
sinks.size() <<
" non-empty scopes when destructing");
67 for (ConnectionList::iterator it = this->
connections.begin();
71 }
catch (
const std::exception& e) {
72 RSCDEBUG(
logger,
"Failed to disconnect connection " << *it
77 RSCDEBUG(
logger,
"Bus destruction finished");
97 boost::recursive_mutex::scoped_lock lock(this->
connectorLock);
99 Scope scope = sink->getScope();
100 RSCDEBUG(
logger,
"Adding sink " << sink <<
" to scope " << scope);
102 connectors.push_back(sink);
106 boost::recursive_mutex::scoped_lock lock(this->
connectorLock);
109 RSCDEBUG(
logger,
"Removing sink " << sink <<
" from scope " << scope);
111 RSCDEBUG(
logger,
"Scope " << scope <<
" has " 112 << connectors.size() <<
" connectors (before removing)");
113 for (SinkList::iterator it = connectors.begin(); it != connectors.end(); ++it) {
117 if (!ptr || (ptr.get() == sink)) {
118 RSCDEBUG(
logger,
"Found connector " << sink <<
" in scope " << scope);
119 connectors.erase(it);
123 RSCDEBUG(
logger,
"Scope " << scope <<
" has " 124 << connectors.size() <<
" connectors (after removing)");
128 if (connectors.empty()) {
129 RSCDEBUG(
logger,
"Removing empty scope " << scope);
130 this->
sinks.erase(scope);
135 RSCDEBUG(
logger,
"Adding connection " << connection);
143 RSCDEBUG(
logger,
"Removing connection " << connection);
152 RSCDEBUG(
logger,
"Delivering received event to connectors " << event);
154 vector<Scope> scopes =
event->getScopePtr()->superScopes(
true);
155 RSCDEBUG(
logger,
"Relevant scopes " << scopes);
158 boost::recursive_mutex::scoped_lock lock(this->
connectorLock);
160 for (vector<Scope>::const_iterator it = scopes.begin(); it != scopes.end(); ++it) {
161 SinkMap::const_iterator it_ = this->
sinks.find(*it);
162 if (it_ != this->
sinks.end()) {
163 const SinkList& connectors = it_->second;
165 for (SinkList::const_iterator it__ = connectors.begin(); it__
166 != connectors.end(); ++it__) {
169 RSCDEBUG(
logger,
"Delivering to connector " << connector <<
" in " << *it);
170 connector->handle(event);
180 RSCDEBUG(
logger,
"Delivering outgoing event to connectors " << event);
182 vector<Scope> scopes =
event->getScopePtr()->superScopes(
true);
183 RSCDEBUG(
logger,
"Relevant scopes " << scopes);
186 boost::recursive_mutex::scoped_lock lock(this->
connectorLock);
188 for (vector<Scope>::const_iterator it = scopes.begin(); it != scopes.end(); ++it) {
189 SinkMap::const_iterator it_ = this->
sinks.find(*it);
190 if (it_ != this->
sinks.end()) {
191 const SinkList& connectors = it_->second;
193 for (SinkList::const_iterator it__ = connectors.begin(); it__
194 != connectors.end(); ++it__) {
197 RSCDEBUG(
logger,
"Delivering to connector " << connector <<
" in " << *it);
198 connector->handle(event);
209 RSCDEBUG(
logger,
"Dispatching outgoing event " << event <<
" to connections");
211 string wireSchema =
event->getMetaData().getUserInfo(
"rsb.wire-schema");
212 list<BusConnectionPtr> failing;
213 for (list<BusConnectionPtr>::iterator it = this->
connections.begin();
215 RSCDEBUG(
logger,
"Dispatching to connection " << *it);
217 (*it)->sendEvent(event, wireSchema);
218 }
catch (
const std::exception& e) {
219 RSCWARN(
logger,
"Send failure (" << e.what() <<
"); will close connection later");
222 failing.push_back(*it);
227 for (list<BusConnectionPtr>::const_iterator it = failing.begin();
228 it != failing.end(); ++it) {
236 <<
", sinks = " << this->
sinks;
rsc::logging::LoggerPtr logger
ConnectionList connections
boost::shared_ptr< InConnector > InConnectorPtr
virtual void handle(EventPtr event)
Handle event.
ConnectionList getConnections() const
virtual void removeConnection(BusConnectionPtr connection)
Removes connection from the list of connections of this bus.
virtual void handleIncoming(EventPtr event, BusConnectionPtr connection)
boost::shared_ptr< AsioServiceContext > AsioServiceContextPtr
boost::shared_ptr< BusConnection > BusConnectionPtr
std::list< boost::weak_ptr< InConnector > > SinkList
virtual Scope getScope() const
Instances of this class receive events from a bus that is accessed via a socket connection.
virtual bool isTcpnodelay() const
virtual void addConnection(BusConnectionPtr connection)
Adds connection to the list of connections of the bus.
AsioServiceContextPtr asioService
virtual void printContents(std::ostream &stream) const
boost::recursive_mutex & getConnectionLock()
virtual void removeSink(InConnector *sink)
boost::recursive_mutex connectorLock
virtual void addSink(InConnectorPtr sink)
boost::recursive_mutex connectionLock
virtual const std::string getTransportURL() const
virtual AsioServiceContextPtr getService() const
boost::shared_ptr< Event > EventPtr
Scope is a descriptor for a hierarchical channel of the unified bus.
std::list< BusConnectionPtr > ConnectionList