RSB  0.17.0
Bus.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is part of the RSB project
4  *
5  * Copyright (C) 2011, 2012, 2015 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "Bus.h"
28 
29 #include <rsc/runtime/ContainerIO.h>
30 
31 #include <boost/bind.hpp>
32 
33 #include <boost/thread.hpp>
34 
35 #include <boost/asio/ip/tcp.hpp>
36 
37 #include "Factory.h"
38 #include "InConnector.h"
39 #include "../../MetaData.h"
40 
41 using namespace std;
42 
43 using namespace rsc::logging;
44 
45 using namespace boost::asio;
46 using boost::asio::ip::tcp;
47 
48 namespace rsb {
49 namespace transport {
50 namespace socket {
51 
52 Bus::Bus(AsioServiceContextPtr asioService, bool tcpnodelay) :
53  logger(Logger::getLogger("rsb.transport.socket.Bus")),
54  asioService(asioService), tcpnodelay(tcpnodelay) {
55 }
56 
58  RSCDEBUG(logger, "Destructing bus instance");
59 
60  // Sinks should be empty.
61  if (!this->sinks.empty()) {
62  RSCWARN(logger, "" << this->sinks.size() << " non-empty scopes when destructing");
63  }
64 
65  // Active connections hold a shared_ptr to themselves and would
66  // thus not be destructed. Disconnecting resolves this problem.
67  for (ConnectionList::iterator it = this->connections.begin();
68  it != this->connections.end(); ++it) {
69  try {
70  (*it)->shutdown();
71  } catch (const std::exception& e) {
72  RSCDEBUG(logger, "Failed to disconnect connection " << *it
73  << ": " << e.what());
74  }
75  }
76 
77  RSCDEBUG(logger, "Bus destruction finished");
78 }
79 
81  return this->asioService;
82 }
83 
84 bool Bus::isTcpnodelay() const {
85  return this->tcpnodelay;
86 }
87 
89  return this->connections;
90 }
91 
92 boost::recursive_mutex& Bus::getConnectionLock() {
93  return this->connectionLock;
94 }
95 
97  boost::recursive_mutex::scoped_lock lock(this->connectorLock);
98 
99  Scope scope = sink->getScope();
100  RSCDEBUG(logger, "Adding sink " << sink << " to scope " << scope);
101  SinkList& connectors = this->sinks[scope];
102  connectors.push_back(sink);
103 }
104 
106  boost::recursive_mutex::scoped_lock lock(this->connectorLock);
107 
108  Scope scope = sink->getScope();
109  RSCDEBUG(logger, "Removing sink " << sink << " from scope " << scope);
110  SinkList& connectors = this->sinks[scope];
111  RSCDEBUG(logger, "Scope " << scope << " has "
112  << connectors.size() << " connectors (before removing)");
113  for (SinkList::iterator it = connectors.begin(); it != connectors.end(); ++it) {
114  // If the weak pointer is dangling, we found our
115  // sink. Otherwise, we can just check the pointer.
116  InConnectorPtr ptr = it->lock();
117  if (!ptr || (ptr.get() == sink)) {
118  RSCDEBUG(logger, "Found connector " << sink << " in scope " << scope);
119  connectors.erase(it);
120  break;
121  }
122  }
123  RSCDEBUG(logger, "Scope " << scope << " has "
124  << connectors.size() << " connectors (after removing)");
125 
126  // If no connectors remain for the scope, the whole entry can be
127  // removed.
128  if (connectors.empty()) {
129  RSCDEBUG(logger, "Removing empty scope " << scope);
130  this->sinks.erase(scope);
131  }
132 }
133 
135  RSCDEBUG(logger, "Adding connection " << connection);
136 
137  boost::recursive_mutex::scoped_lock lock(this->connectionLock);
138 
139  this->connections.push_back(connection);
140 }
141 
143  RSCDEBUG(logger, "Removing connection " << connection);
144 
145  boost::recursive_mutex::scoped_lock lock(this->connectionLock);
146 
147  this->connections.remove(connection);
148 }
149 
151  BusConnectionPtr /*connection*/) {
152  RSCDEBUG(logger, "Delivering received event to connectors " << event);
153 
154  vector<Scope> scopes = event->getScopePtr()->superScopes(true);
155  RSCDEBUG(logger, "Relevant scopes " << scopes);
156 
157  {
158  boost::recursive_mutex::scoped_lock lock(this->connectorLock);
159 
160  for (vector<Scope>::const_iterator it = scopes.begin(); it != scopes.end(); ++it) {
161  SinkMap::const_iterator it_ = this->sinks.find(*it);
162  if (it_ != this->sinks.end()) {
163  const SinkList& connectors = it_->second;
164 
165  for (SinkList::const_iterator it__ = connectors.begin(); it__
166  != connectors.end(); ++it__) {
167  InConnectorPtr connector = it__->lock();
168  if (connector) {
169  RSCDEBUG(logger, "Delivering to connector " << connector << " in " << *it);
170  connector->handle(event);
171  }
172  }
173  }
174  }
175  }
176 }
177 
178 void Bus::handle(EventPtr event) {
179  // Dispatch to our own connectors.
180  RSCDEBUG(logger, "Delivering outgoing event to connectors " << event);
181 
182  vector<Scope> scopes = event->getScopePtr()->superScopes(true);
183  RSCDEBUG(logger, "Relevant scopes " << scopes);
184 
185  {
186  boost::recursive_mutex::scoped_lock lock(this->connectorLock);
187 
188  for (vector<Scope>::const_iterator it = scopes.begin(); it != scopes.end(); ++it) {
189  SinkMap::const_iterator it_ = this->sinks.find(*it);
190  if (it_ != this->sinks.end()) {
191  const SinkList& connectors = it_->second;
192 
193  for (SinkList::const_iterator it__ = connectors.begin(); it__
194  != connectors.end(); ++it__) {
195  InConnectorPtr connector = it__->lock();
196  if (connector) {
197  RSCDEBUG(logger, "Delivering to connector " << connector << " in " << *it);
198  connector->handle(event);
199  }
200  }
201  }
202  }
203  }
204 
205  // Dispatch to outgoing connections.
206  {
207  boost::recursive_mutex::scoped_lock lock(this->connectionLock);
208 
209  RSCDEBUG(logger, "Dispatching outgoing event " << event << " to connections");
210 
211  string wireSchema = event->getMetaData().getUserInfo("rsb.wire-schema");
212  list<BusConnectionPtr> failing;
213  for (list<BusConnectionPtr>::iterator it = this->connections.begin();
214  it != this->connections.end(); ++it) {
215  RSCDEBUG(logger, "Dispatching to connection " << *it);
216  try {
217  (*it)->sendEvent(event, wireSchema);
218  } catch (const std::exception& e) {
219  RSCWARN(logger, "Send failure (" << e.what() << "); will close connection later");
220  // We record failing connections instead of closing them
221  // immediately to avoid invalidating the iterator.
222  failing.push_back(*it);
223  }
224  }
225 
226  // This should remove all references to the connection objects.
227  for (list<BusConnectionPtr>::const_iterator it = failing.begin();
228  it != failing.end(); ++it) {
229  removeConnection(*it);
230  }
231  }
232 }
233 
234 void Bus::printContents(ostream& stream) const {
235  stream << "connections = " << this->connections
236  << ", sinks = " << this->sinks;
237 }
238 
239 const std::string Bus::getTransportURL() const {
240  assert(this->connections.size() == 1);
241  return (*this->connections.begin())->getTransportURL();
242 }
243 
244 }
245 }
246 }
rsc::logging::LoggerPtr logger
Definition: Bus.h:125
ConnectionList connections
Definition: Bus.h:131
boost::shared_ptr< InConnector > InConnectorPtr
Definition: Bus.h:55
virtual void handle(EventPtr event)
Handle event.
Definition: Bus.cpp:178
ConnectionList getConnections() const
Definition: Bus.cpp:88
virtual void removeConnection(BusConnectionPtr connection)
Removes connection from the list of connections of this bus.
Definition: Bus.cpp:142
virtual void handleIncoming(EventPtr event, BusConnectionPtr connection)
Definition: Bus.cpp:150
STL namespace.
boost::shared_ptr< AsioServiceContext > AsioServiceContextPtr
boost::shared_ptr< BusConnection > BusConnectionPtr
std::list< boost::weak_ptr< InConnector > > SinkList
Definition: Bus.h:122
Instances of this class receive events from a bus that is accessed via a socket connection.
Definition: InConnector.h:61
virtual bool isTcpnodelay() const
Definition: Bus.cpp:84
virtual void addConnection(BusConnectionPtr connection)
Adds connection to the list of connections of the bus.
Definition: Bus.cpp:134
AsioServiceContextPtr asioService
Definition: Bus.h:129
virtual void printContents(std::ostream &stream) const
Definition: Bus.cpp:234
boost::recursive_mutex & getConnectionLock()
Definition: Bus.cpp:92
virtual void removeSink(InConnector *sink)
Definition: Bus.cpp:105
boost::recursive_mutex connectorLock
Definition: Bus.h:135
virtual void addSink(InConnectorPtr sink)
Definition: Bus.cpp:96
boost::recursive_mutex connectionLock
Definition: Bus.h:132
virtual const std::string getTransportURL() const
Definition: Bus.cpp:239
virtual AsioServiceContextPtr getService() const
Definition: Bus.cpp:80
boost::shared_ptr< Event > EventPtr
Definition: Event.h:264
Scope is a descriptor for a hierarchical channel of the unified bus.
Definition: Scope.h:46
std::list< BusConnectionPtr > ConnectionList
Definition: Bus.h:113