RSB  0.7.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Bus.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is part of the RSB project
4  *
5  * Copyright (C) 2011, 2012 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "Bus.h"
28 
29 #include <rsc/runtime/ContainerIO.h>
30 
31 #include <boost/bind.hpp>
32 
33 #include <boost/thread.hpp>
34 
35 #include <boost/asio/ip/tcp.hpp>
36 
37 #include "Factory.h"
38 #include "../../MetaData.h"
39 
40 using namespace std;
41 
42 using namespace rsc::logging;
43 
44 using namespace boost::asio;
45 using boost::asio::ip::tcp;
46 
47 namespace rsb {
48 namespace transport {
49 namespace socket {
50 
51 Bus::Bus(io_service& service, bool tcpnodelay) :
52  logger(Logger::getLogger("rsb.transport.socket.Bus")),
53  service(service), tcpnodelay(tcpnodelay) {
54 }
55 
56 Bus::~Bus() {
57  // Sinks should be empty.
58  if (!this->sinks.empty()) {
59  RSCWARN(logger, "" << this->sinks.size() << " non-empty scopes when destructing");
60  }
61 
62  // Active connections hold a shared_ptr to themselves and would
63  // thus not be destructed. Disconnecting resolves this problem.
64  for (ConnectionList::iterator it = this->connections.begin();
65  it != this->connections.end(); ++it) {
66  try {
67  (*it)->disconnect();
68  } catch (const std::exception& e) {
69  RSCDEBUG(logger, "Failed to disconnect connection " << *it
70  << ": " << e.what());
71  }
72  }
73 }
74 
75 bool Bus::isTcpnodelay() const {
76  return this->tcpnodelay;
77 }
78 
79 Bus::ConnectionList Bus::getConnections() const {
80  return this->connections;
81 }
82 
83 boost::recursive_mutex& Bus::getConnectionLock() {
84  return this->connectionLock;
85 }
86 
87 void Bus::addSink(InConnectorPtr sink) {
88  boost::recursive_mutex::scoped_lock lock(this->connectorLock);
89 
90  Scope scope = sink->getScope();
91  RSCDEBUG(logger, "Adding sink " << sink << " to scope " << scope);
92  SinkList& connectors = this->sinks[scope];
93  connectors.push_back(sink);
94 }
95 
96 void Bus::removeSink(InConnector* sink) {
97  boost::recursive_mutex::scoped_lock lock(this->connectorLock);
98 
99  Scope scope = sink->getScope();
100  RSCDEBUG(logger, "Removing sink " << sink << " from scope " << scope);
101  SinkList& connectors = this->sinks[scope];
102  RSCDEBUG(logger, "Scope " << scope << " has "
103  << connectors.size() << " connectors (before removing)");
104  for (SinkList::iterator it = connectors.begin(); it != connectors.end(); ++it) {
105  // If the weak pointer is dangling, we found our
106  // sink. Otherwise, we can just check the pointer.
107  InConnectorPtr ptr = it->lock();
108  if (!ptr || (ptr.get() == sink)) {
109  RSCDEBUG(logger, "Found connector " << sink << " in scope " << scope);
110  connectors.erase(it);
111  break;
112  }
113  }
114  RSCDEBUG(logger, "Scope " << scope << " has "
115  << connectors.size() << " connectors (after removing)");
116 
117  // If no connectors remain for the scope, the whole entry can be
118  // removed.
119  if (connectors.empty()) {
120  RSCDEBUG(logger, "Removing empty scope " << scope);
121  this->sinks.erase(scope);
122  }
123 }
124 
125 void Bus::addConnector(ConnectorBase* connector) {
126  boost::recursive_mutex::scoped_lock lock(this->connectorLock);
127 
128  RSCDEBUG(logger, "Adding connector " << connector);
129  this->connectors.push_back(connector);
130 }
131 
132 void Bus::removeConnector(ConnectorBase* connector) {
133  boost::recursive_mutex::scoped_lock lock(this->connectorLock);
134 
135  RSCDEBUG(logger, "Removing connector " << connector);
136  this->connectors.remove(connector);
137 
138  // If no connectors remain, destroy the bus.
139  if (this->connectors.empty()) {
140  RSCINFO(logger, "No more connectors; suiciding");
141  suicide();
142  }
143 }
144 
145 void Bus::addConnection(BusConnectionPtr connection) {
146  RSCDEBUG(logger, "Adding connection " << connection);
147 
148  boost::recursive_mutex::scoped_lock lock(this->connectionLock);
149 
150  this->connections.push_back(connection);
151 }
152 
153 void Bus::removeConnection(BusConnectionPtr connection) {
154  RSCDEBUG(logger, "Removing connection " << connection);
155 
156  boost::recursive_mutex::scoped_lock lock(this->connectionLock);
157 
158  this->connections.remove(connection);
159 }
160 
161 void Bus::handleIncoming(EventPtr event,
162  BusConnectionPtr /*connection*/) {
163  RSCDEBUG(logger, "Delivering received event to connectors " << event);
164 
165  vector<Scope> scopes = event->getScopePtr()->superScopes(true);
166  RSCDEBUG(logger, "Relevant scopes " << scopes);
167 
168  {
169  boost::recursive_mutex::scoped_lock lock(this->connectorLock);
170 
171  for (vector<Scope>::const_iterator it = scopes.begin(); it != scopes.end(); ++it) {
172  SinkMap::const_iterator it_ = this->sinks.find(*it);
173  if (it_ != this->sinks.end()) {
174  const SinkList& connectors = it_->second;
175 
176  for (SinkList::const_iterator it__ = connectors.begin(); it__
177  != connectors.end(); ++it__) {
178  InConnectorPtr connector = it__->lock();
179  if (connector) {
180  RSCDEBUG(logger, "Delivering to connector " << connector << " in " << *it);
181  connector->handle(event);
182  }
183  }
184  }
185  }
186  }
187 }
188 
189 void Bus::handle(EventPtr event) {
190  // Dispatch to our own connectors.
191  RSCDEBUG(logger, "Delivering outgoing event to connectors " << event);
192 
193  vector<Scope> scopes = event->getScopePtr()->superScopes(true);
194  RSCDEBUG(logger, "Relevant scopes " << scopes);
195 
196  {
197  boost::recursive_mutex::scoped_lock lock(this->connectorLock);
198 
199  for (vector<Scope>::const_iterator it = scopes.begin(); it != scopes.end(); ++it) {
200  SinkMap::const_iterator it_ = this->sinks.find(*it);
201  if (it_ != this->sinks.end()) {
202  const SinkList& connectors = it_->second;
203 
204  for (SinkList::const_iterator it__ = connectors.begin(); it__
205  != connectors.end(); ++it__) {
206  InConnectorPtr connector = it__->lock();
207  if (connector) {
208  RSCDEBUG(logger, "Delivering to connector " << connector << " in " << *it);
209  connector->handle(event);
210  }
211  }
212  }
213  }
214  }
215 
216  // Dispatch to outgoing connections.
217  {
218  boost::recursive_mutex::scoped_lock lock(this->connectionLock);
219 
220  RSCDEBUG(logger, "Dispatching outgoing event " << event << " to connections");
221 
222  string wireSchema = event->getMetaData().getUserInfo("rsb.wire-schema");
223  list<BusConnectionPtr> failing;
224  for (list<BusConnectionPtr>::iterator it = this->connections.begin();
225  it != this->connections.end(); ++it) {
226  RSCDEBUG(logger, "Dispatching to connection " << *it);
227  try {
228  (*it)->sendEvent(event, wireSchema);
229  } catch (const std::exception& e) {
230  RSCWARN(logger, "Send failure (" << e.what() << "); will close connection later");
231  // We record failing connections instead of closing them
232  // immediately to avoid invalidating the iterator.
233  failing.push_back(*it);
234  }
235  }
236 
237  // This should remove all references to the connection objects.
238  for (list<BusConnectionPtr>::const_iterator it = failing.begin();
239  it != failing.end(); ++it) {
240  removeConnection(*it);
241  }
242  }
243 }
244 
245 void Bus::suicide() {
246  Factory::getInstance().removeBusClient(shared_from_this());
247 }
248 
249 void Bus::printContents(ostream& stream) const {
250  stream << "connections = " << this->connections
251  << ", sinks = " << this->sinks;
252 }
253 
254 }
255 }
256 }