RSB  0.12.2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
BusConnection.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is part of the RSB project
4  *
5  * Copyright (C) 2011, 2012, 2014 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "BusConnection.h"
28 
29 #include <boost/bind.hpp>
30 
31 #include <rsc/misc/langutils.h>
32 
33 #include "Bus.h"
34 #include "Serialization.h"
35 
36 using namespace std;
37 
38 using namespace boost;
39 
40 using namespace boost::asio;
41 //using tcp = boost::asio::ip::tcp;
42 
43 using namespace rsc::logging;
44 
45 namespace rsb {
46 namespace transport {
47 namespace socket {
48 
49 // Return the exception's what() string falling back to a replacement
50 // string in case what() throws an exception.
51 std::string safeSocketExceptionString(const std::exception& exception) {
52  try {
53  return exception.what();
54  } catch (...) {
55  return "<failed to determine exception message>";
56  }
57 }
58 
59 BusConnection::BusConnection(BusPtr bus,
60  SocketPtr socket,
61  bool client,
62  bool tcpNoDelay) :
63  logger(Logger::getLogger("rsb.transport.socket.BusConnection")),
64  socket(socket), bus(bus), disconnecting(false), activeShutdown(false) {
65 
66  // Enable TCPNODELAY socket option to trade decreased throughput
67  // for reduced latency.
68  if (tcpNoDelay){
69  RSCINFO(logger, "Setting TCP_NODELAY option");
70  boost::asio::ip::tcp::no_delay option(true);
71  socket->set_option(option);
72  }
73 
74  // Allocate static buffers.
75  this->lengthReceiveBuffer.resize(4);
76  this->lengthSendBuffer.resize(4);
77 
78  // Perform request role of the handshake.
79  if (client) {
80  read(*this->socket, buffer(&this->lengthReceiveBuffer[0], 4));
81  } else {
82  write(*this->socket, buffer(this->lengthSendBuffer));
83  }
84 }
85 
87  // only a safety measure. Usually we will not be destructed as long as
88  // receiving is running.
89  performSafeCleanup("destruction");
90 }
91 
93  RSCINFO(logger, "Shutting down");
94 
95  boost::recursive_mutex::scoped_lock lock(this->mutex);
96  this->activeShutdown = true;
97 
98  if (this->socket && this->socket->is_open()) {
99  this->socket->shutdown(ip::tcp::socket::shutdown_send);
100  }
101 
102 }
103 
105  RSCINFO(logger, "Disconnecting");
106 
107  boost::recursive_mutex::scoped_lock lock(this->mutex);
108  this->disconnecting = true;
109 
110  if (this->socket && this->socket->is_open()) {
111  this->socket->close();
112  }
113 
114 }
115 
117  receiveEvent();
118 }
119 
121  const string& wireSchema) {
122 
123 
124  // Convert the event into a notification object and serialize the
125  // notification object.
126  // The payload already is a byte-array, since it has been
127  // serialized by the connector which submitted the event.
128  protocol::Notification notification;
129  eventToNotification(notification, event, wireSchema,
130  *static_pointer_cast<string>(event->getData()));
131  notification.SerializeToString(&this->messageSendBuffer);
132 
133  // Encode the size of the serialized notification object.
134  uint32_t length = this->messageSendBuffer.size();
135  this->lengthSendBuffer[0] = (length & 0x000000fful) >> 0;
136  this->lengthSendBuffer[1] = (length & 0x0000ff00ul) >> 8;
137  this->lengthSendBuffer[2] = (length & 0x00ff0000ul) >> 16;
138  this->lengthSendBuffer[3] = (length & 0xff000000ul) >> 24;
139 
140  {
141  boost::recursive_mutex::scoped_lock lock(this->mutex);
142  if (this->activeShutdown) {
143  RSCDEBUG(this->logger, "Ignoring to send a notification "
144  "because we are shutting down.");
145  return;
146  }
147 
148  // Send the size header, followed by the actual notification data.
149  write(*this->socket, buffer(this->lengthSendBuffer));
150  write(*this->socket, buffer(this->messageSendBuffer));
151  }
152 
153 }
154 
155 void BusConnection::performSafeCleanup(const string& context) {
156  // Remove ourselves from the bus to which we are connected.
157  BusPtr bus = this->bus.lock();
158  if (bus) {
159  // The bus may already have removed its pointer.
160  try {
161  bus->removeConnection(shared_from_this());
162  } catch (const boost::bad_weak_ptr&) {
163  }
164  }
165 
166  // We can only ignore the error here.
167  if (!this->disconnecting) {
168  try {
169  disconnect();
170  } catch (const std::exception& e) {
171  // Use safeSocketExceptionString to avoid exceptions
172  // potentially thrown when printing Boost.Asio exceptions.
173  RSCDEBUG(this->logger, "Failed to disconnect (in " << context << "): "
175  }
176  }
177 }
178 
180  async_read(*this->socket,
181  buffer(&this->lengthReceiveBuffer[0], 4),
182  boost::bind(&BusConnection::handleReadLength, shared_from_this(),
183  boost::asio::placeholders::error,
184  boost::asio::placeholders::bytes_transferred));
185 }
186 
187 void BusConnection::handleReadLength(const boost::system::error_code& error,
188  size_t bytesTransferred) {
189 
190  {
191  boost::recursive_mutex::scoped_lock lock(this->mutex);
192  if (error == boost::asio::error::eof) {
193  RSCDEBUG(logger, "Received eof");
194  if (!this->activeShutdown) {
195  try {
196  shutdown();
197  } catch (const std::exception& e) {
198  // Use safeSocketExceptionString to avoid
199  // exceptions potentially thrown when printing
200  // Boost.Asio exceptions.
201  RSCWARN(this->logger, "Failed to shut down socket: "
203  }
204  }
205  performSafeCleanup("handleReadLength[eof]");
206  return;
207  }
208  }
209 
210  if (error || (bytesTransferred != 4)) {
211  if (!disconnecting) {
212  RSCDEBUG(logger, "Receive failure (error " << error << ")"
213  << " or incomplete message header (received " << bytesTransferred << " bytes)"
214  << "; closing connection");
215  }
216  performSafeCleanup("handleReadLength[unknown error]");
217  return;
218  }
219 
220  uint32_t size
221  = (((uint32_t) *reinterpret_cast<unsigned char*>(&this->lengthReceiveBuffer[0])) << 0)
222  | (((uint32_t) *reinterpret_cast<unsigned char*>(&this->lengthReceiveBuffer[1])) << 8)
223  | (((uint32_t) *reinterpret_cast<unsigned char*>(&this->lengthReceiveBuffer[2])) << 16)
224  | (((uint32_t) *reinterpret_cast<unsigned char*>(&this->lengthReceiveBuffer[3])) << 24);
225 
226  RSCDEBUG(logger, "Received message header with size " << size);
227 
228  this->messageReceiveBuffer.resize(size);
229 
230  async_read(*this->socket,
231  buffer(&this->messageReceiveBuffer[0], size),
232  boost::bind(&BusConnection::handleReadBody, shared_from_this(),
233  boost::asio::placeholders::error,
234  boost::asio::placeholders::bytes_transferred,
235  size));
236 }
237 
238 void BusConnection::handleReadBody(const boost::system::error_code& error,
239  size_t bytesTransferred,
240  size_t expected) {
241  if (error || (bytesTransferred != expected)) {
242  if (!this->disconnecting) {
243  RSCWARN(logger, "Receive failure (error " << error << ")"
244  << " or incomplete message body (received " << bytesTransferred << " bytes)"
245  << "; closing connection");
246  }
247  performSafeCleanup("handleReadBody");
248  return;
249  }
250 
251  // Deserialize the notification.
252  this->notification.ParseFromString(this->messageReceiveBuffer);
253 
254  // Construct an Event instance *without* deserializing the
255  // payload. This has to be done in connectors since different
256  // converters can be used.
257  EventPtr event = notificationToEvent(this->notification, true);
258 
259  // Dispatch the received event to connectors.
260  BusPtr bus = this->bus.lock();
261  if (bus) {
262  bus->handleIncoming(event, shared_from_this());
263  } else {
264  RSCWARN(logger, "Dangling bus pointer when trying to dispatch incoming event; closing connection");
265  performSafeCleanup("handleReadBody");
266  return;
267  }
268 
269  // Submit task to start receiving the next event.
270  receiveEvent();
271 }
272 
273 void BusConnection::printContents(ostream& stream) const {
274  try {
275  stream << "local = " << this->socket->local_endpoint()
276  << ", remote = " << this->socket->remote_endpoint();
277  } catch (...) {
278  stream << "<error printing socket info>";
279  }
280 }
281 
282 }
283 }
284 }
void handleReadBody(const boost::system::error_code &error, size_t bytesTransferred, size_t expected)
EventPtr notificationToEvent(protocol::Notification &notification, bool exposeWireSchema)
Converts notification into an Event.
void sendEvent(EventPtr event, const std::string &wireSchema)
void printContents(std::ostream &stream) const
void eventToNotification(protocol::Notification &notification, const EventPtr &event, const string &wireSchema, const string &data)
Converts the Event event into a protocol::Notification, storing the result in notification.
void performSafeCleanup(const std::string &context)
void handleReadLength(const boost::system::error_code &error, size_t bytesTransferred)
rsc::logging::LoggerPtr logger
Definition: BusConnection.h:95
std::string safeSocketExceptionString(const std::exception &exception)
boost::shared_ptr< boost::asio::ip::tcp::socket > SocketPtr
Definition: BusConnection.h:77
protocol::Notification notification
boost::shared_ptr< Event > EventPtr
Definition: Event.h:254
boost::shared_ptr< Bus > BusPtr
Definition: BusConnection.h:51