RSB  0.17.0
BusConnection.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is part of the RSB project
4  *
5  * Copyright (C) 2011, 2012, 2014, 2015 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "BusConnection.h"
28 
29 #include <boost/bind.hpp>
30 #include <boost/format.hpp>
31 
32 #include <rsc/misc/langutils.h>
33 
34 #include "Bus.h"
35 #include "Serialization.h"
36 
37 using namespace std;
38 
39 using namespace boost;
40 
41 using namespace boost::asio;
42 //using tcp = boost::asio::ip::tcp;
43 
44 using namespace rsc::logging;
45 
46 namespace rsb {
47 namespace transport {
48 namespace socket {
49 
// Returns the message of the given exception as a std::string.
//
// A fixed replacement string is returned when the message cannot be
// obtained, i.e. when retrieving or copying it throws, or when what()
// yields a null pointer (constructing a std::string from a null
// pointer would be undefined behaviour).
//
// @param exception The exception whose message should be extracted.
// @return The exception message or the replacement string.
std::string safeSocketExceptionString(const std::exception& exception) {
    try {
        const char* message = exception.what();
        if (message) {
            return message;
        }
    } catch (...) {
        // Fall through to the replacement string below.
    }
    return "<failed to determine exception message>";
}
59 
60 BusConnection::BusConnection(BusPtr bus,
61  SocketPtr socket,
62  bool client,
63  bool tcpNoDelay) :
64  logger(Logger::getLogger("rsb.transport.socket.BusConnection")),
65  socket(socket), bus(bus), disconnecting(false), activeShutdown(false) {
66 
67  // Enable TCPNODELAY socket option to trade decreased throughput
68  // for reduced latency.
69  if (tcpNoDelay){
70  RSCINFO(logger, "Setting TCP_NODELAY option");
71  boost::asio::ip::tcp::no_delay option(true);
72  socket->set_option(option);
73  }
74 
75  // Allocate static buffers.
76  this->lengthReceiveBuffer.resize(4);
77  this->lengthSendBuffer.resize(4);
78 
79  // Perform request role of the handshake.
80  if (client) {
81  read(*this->socket, buffer(&this->lengthReceiveBuffer[0], 4));
82  } else {
83  write(*this->socket, buffer(this->lengthSendBuffer));
84  }
85 }
86 
88  // only a safety measure. Usually we will not be destructed as long as
89  // receiving is running.
90  performSafeCleanup("destruction");
91 }
92 
94  RSCINFO(logger, "Shutting down");
95 
96  boost::recursive_mutex::scoped_lock lock(this->mutex);
97  this->activeShutdown = true;
98 
99  if (this->socket && this->socket->is_open()) {
100  this->socket->shutdown(ip::tcp::socket::shutdown_send);
101  }
102 
103 }
104 
106  RSCINFO(logger, "Disconnecting");
107 
108  boost::recursive_mutex::scoped_lock lock(this->mutex);
109  this->disconnecting = true;
110 
111  if (this->socket && this->socket->is_open()) {
112  this->socket->close();
113  }
114 
115 }
116 
118  receiveEvent();
119 }
120 
122  const string& wireSchema) {
123 
124 
125  // Convert the event into a notification object and serialize the
126  // notification object.
127  // The payload already is a byte-array, since it has been
128  // serialized by the connector which submitted the event.
129  protocol::Notification notification;
130  eventToNotification(notification, event, wireSchema,
131  *static_pointer_cast<string>(event->getData()));
132  notification.SerializeToString(&this->messageSendBuffer);
133 
134  // Encode the size of the serialized notification object.
135  uint32_t length = this->messageSendBuffer.size();
136  this->lengthSendBuffer[0] = (length & 0x000000fful) >> 0;
137  this->lengthSendBuffer[1] = (length & 0x0000ff00ul) >> 8;
138  this->lengthSendBuffer[2] = (length & 0x00ff0000ul) >> 16;
139  this->lengthSendBuffer[3] = (length & 0xff000000ul) >> 24;
140 
141  {
142  boost::recursive_mutex::scoped_lock lock(this->mutex);
143  if (this->activeShutdown) {
144  RSCDEBUG(this->logger, "Ignoring to send a notification "
145  "because we are shutting down.");
146  return;
147  }
148 
149  // Send the size header, followed by the actual notification data.
150  write(*this->socket, buffer(this->lengthSendBuffer));
151  write(*this->socket, buffer(this->messageSendBuffer));
152  }
153 
154 }
155 
156 void BusConnection::performSafeCleanup(const string& context) {
157  // Remove ourselves from the bus to which we are connected.
158  BusPtr bus = this->bus.lock();
159  if (bus) {
160  // The bus may already have removed its pointer.
161  try {
162  bus->removeConnection(shared_from_this());
163  } catch (const boost::bad_weak_ptr&) {
164  }
165  }
166 
167  // We can only ignore the error here.
168  if (!this->disconnecting) {
169  try {
170  disconnect();
171  } catch (const std::exception& e) {
172  // Use safeSocketExceptionString to avoid exceptions
173  // potentially thrown when printing Boost.Asio exceptions.
174  RSCDEBUG(this->logger, "Failed to disconnect (in " << context << "): "
176  }
177  }
178 }
179 
181  async_read(*this->socket,
182  buffer(&this->lengthReceiveBuffer[0], 4),
183  boost::bind(&BusConnection::handleReadLength, shared_from_this(),
184  boost::asio::placeholders::error,
185  boost::asio::placeholders::bytes_transferred));
186 }
187 
188 void BusConnection::handleReadLength(const boost::system::error_code& error,
189  size_t bytesTransferred) {
190 
191  {
192  boost::recursive_mutex::scoped_lock lock(this->mutex);
193  if (error == boost::asio::error::eof) {
194  RSCDEBUG(logger, "Received eof");
195  if (!this->activeShutdown) {
196  try {
197  shutdown();
198  } catch (const std::exception& e) {
199  // Use safeSocketExceptionString to avoid
200  // exceptions potentially thrown when printing
201  // Boost.Asio exceptions.
202  RSCWARN(this->logger, "Failed to shut down socket: "
204  }
205  }
206  performSafeCleanup("handleReadLength[eof]");
207  return;
208  }
209  }
210 
211  if (error || (bytesTransferred != 4)) {
212  if (!disconnecting) {
213  RSCDEBUG(logger, "Receive failure (error " << error << ")"
214  << " or incomplete message header (received " << bytesTransferred << " bytes)"
215  << "; closing connection");
216  }
217  performSafeCleanup("handleReadLength[unknown error]");
218  return;
219  }
220 
221  uint32_t size
222  = (((uint32_t) *reinterpret_cast<unsigned char*>(&this->lengthReceiveBuffer[0])) << 0)
223  | (((uint32_t) *reinterpret_cast<unsigned char*>(&this->lengthReceiveBuffer[1])) << 8)
224  | (((uint32_t) *reinterpret_cast<unsigned char*>(&this->lengthReceiveBuffer[2])) << 16)
225  | (((uint32_t) *reinterpret_cast<unsigned char*>(&this->lengthReceiveBuffer[3])) << 24);
226 
227  RSCDEBUG(logger, "Received message header with size " << size);
228 
229  this->messageReceiveBuffer.resize(size);
230 
231  async_read(*this->socket,
232  buffer(&this->messageReceiveBuffer[0], size),
233  boost::bind(&BusConnection::handleReadBody, shared_from_this(),
234  boost::asio::placeholders::error,
235  boost::asio::placeholders::bytes_transferred,
236  size));
237 }
238 
239 void BusConnection::handleReadBody(const boost::system::error_code& error,
240  size_t bytesTransferred,
241  size_t expected) {
242  if (error || (bytesTransferred != expected)) {
243  if (!this->disconnecting) {
244  RSCWARN(logger, "Receive failure (error " << error << ")"
245  << " or incomplete message body (received " << bytesTransferred << " bytes)"
246  << "; closing connection");
247  }
248  performSafeCleanup("handleReadBody");
249  return;
250  }
251 
252  // Deserialize the notification.
253  if (!this->notification.ParseFromString(this->messageReceiveBuffer)) {
254  RSCWARN(logger, "Received unparseable protobuf message, closing connection");
255  performSafeCleanup("handleReadBody[parsing]");
256  return;
257  }
258 
259  // Construct an Event instance *without* deserializing the
260  // payload. This has to be done in connectors since different
261  // converters can be used.
262  EventPtr event = notificationToEvent(this->notification, true);
263 
264  // Dispatch the received event to connectors.
265  BusPtr bus = this->bus.lock();
266  if (bus) {
267  bus->handleIncoming(event, shared_from_this());
268  } else {
269  RSCWARN(logger, "Dangling bus pointer when trying to dispatch incoming event; closing connection");
270  performSafeCleanup("handleReadBody");
271  return;
272  }
273 
274  // Submit task to start receiving the next event.
275  receiveEvent();
276 }
277 
278 void BusConnection::printContents(ostream& stream) const {
279  try {
280  stream << "local = " << this->socket->local_endpoint()
281  << ", remote = " << this->socket->remote_endpoint();
282  } catch (...) {
283  stream << "<error printing socket info>";
284  }
285 }
286 
287 const std::string BusConnection::getTransportURL() const {
288  return boost::str(boost::format("socket://%1%:%2%")
289  % this->socket->remote_endpoint().address().to_string()
290  % this->socket->remote_endpoint().port());
291 }
292 
293 }
294 }
295 }
void handleReadBody(const boost::system::error_code &error, size_t bytesTransferred, size_t expected)
EventPtr notificationToEvent(protocol::Notification &notification, bool exposeWireSchema)
Converts notification into an Event.
void sendEvent(EventPtr event, const std::string &wireSchema)
void printContents(std::ostream &stream) const
STL namespace.
void eventToNotification(protocol::Notification &notification, const EventPtr &event, const string &wireSchema, const string &data)
Converts the Event event into a protocol::Notification, storing the result in notification.
void performSafeCleanup(const std::string &context)
void handleReadLength(const boost::system::error_code &error, size_t bytesTransferred)
rsc::logging::LoggerPtr logger
Definition: BusConnection.h:97
std::string safeSocketExceptionString(const std::exception &exception)
boost::shared_ptr< boost::asio::ip::tcp::socket > SocketPtr
Definition: BusConnection.h:77
protocol::Notification notification
boost::shared_ptr< Event > EventPtr
Definition: Event.h:264
virtual const std::string getTransportURL() const
boost::shared_ptr< Bus > BusPtr
Definition: BusConnection.h:51