RSB  0.7.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
BusConnection.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is part of the RSB project
4  *
5  * Copyright (C) 2011, 2012 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "BusConnection.h"
28 
29 #include <boost/bind.hpp>
30 
31 #include <rsc/misc/langutils.h>
32 
33 #include "Bus.h"
34 #include "Serialization.h"
35 
36 using namespace std;
37 
38 using namespace boost;
39 
40 using namespace boost::asio;
41 //using tcp = boost::asio::ip::tcp;
42 
43 using namespace rsc::logging;
44 
45 namespace rsb {
46 namespace transport {
47 namespace socket {
48 
49 BusConnection::BusConnection(BusPtr bus,
50  SocketPtr socket,
51  bool client,
52  bool tcpNoDelay) :
53  logger(Logger::getLogger("rsb.transport.socket.BusConnection")),
54  socket(socket), bus(bus), disconnecting(false) {
55 
56  // Enable TCPNODELAY socket option to trade decreased throughput
57  // for reduced latency.
58  if (tcpNoDelay){
59  RSCINFO(logger, "Setting TCP_NODELAY option");
60  boost::asio::ip::tcp::no_delay option(true);
61  socket->set_option(option);
62  }
63 
64  // Allocate static buffers.
65  this->lengthReceiveBuffer.resize(4);
66  this->lengthSendBuffer.resize(4);
67 
68  // Perform request role of the handshake.
69  if (client) {
70  read(*this->socket, buffer(&this->lengthReceiveBuffer[0], 4));
71  } else {
72  write(*this->socket, buffer(this->lengthSendBuffer));
73  }
74 }
75 
77  performSafeCleanup("destructor");
78 }
79 
81  RSCINFO(logger, "Disconnecting");
82  this->disconnecting = true;
83 
84  if (this->socket && this->socket->is_open()) {
85  this->socket->shutdown(ip::tcp::socket::shutdown_send);
86  this->socket->shutdown(ip::tcp::socket::shutdown_receive);
87  RSCINFO(logger, "Closing");
88  this->socket->close();
89  }
90 }
91 
93  receiveEvent();
94 }
95 
97  const string& wireSchema) {
98  // Convert the event into a notification object and serialize the
99  // notification object.
100  // The payload already is a byte-array, since it has been
101  // serialized by the connector which submitted the event.
102  protocol::Notification notification;
103  eventToNotification(notification, event, wireSchema,
104  *static_pointer_cast<string>(event->getData()));
105  notification.SerializeToString(&this->messageSendBuffer);
106 
107  // Encode the size of the serialized notification object.
108  uint32_t length = this->messageSendBuffer.size();
109  this->lengthSendBuffer[0] = (length & 0x000000fful) >> 0;
110  this->lengthSendBuffer[1] = (length & 0x0000ff00ul) >> 8;
111  this->lengthSendBuffer[2] = (length & 0x00ff0000ul) >> 16;
112  this->lengthSendBuffer[3] = (length & 0xff000000ul) >> 24;
113 
114  // Send the size header, followed by the actual notification data.
115  write(*this->socket, buffer(this->lengthSendBuffer));
116  write(*this->socket, buffer(this->messageSendBuffer));
117 }
118 
119 void BusConnection::performSafeCleanup(const string& context) {
120  // Remove ourselves from the bus to which we are connected.
121  BusPtr bus = this->bus.lock();
122  if (bus) {
123  // The bus may already have removed its pointer.
124  try {
125  bus->removeConnection(shared_from_this());
126  } catch (const boost::bad_weak_ptr&) {
127  }
128  }
129 
130  // We can only ignore the error here.
131  if (!this->disconnecting) {
132  try {
133  disconnect();
134  } catch (const std::exception& e) {
135  RSCDEBUG(logger, "Failed to disconnect (in " << context << "): "
136  << e.what())
137  }
138  }
139 }
140 
142  async_read(*this->socket,
143  buffer(&this->lengthReceiveBuffer[0], 4),
144  boost::bind(&BusConnection::handleReadLength, shared_from_this(),
145  boost::asio::placeholders::error,
146  boost::asio::placeholders::bytes_transferred));
147 }
148 
149 void BusConnection::handleReadLength(const boost::system::error_code& error,
150  size_t bytesTransferred) {
151  if (error || (bytesTransferred != 4)) {
152  if (!disconnecting) {
153  RSCDEBUG(logger, "Receive failure (error " << error << ")"
154  << " or incomplete message header (received " << bytesTransferred << " bytes)"
155  << "; closing connection");
156  }
157  performSafeCleanup("handleReadLength");
158  return;
159  }
160 
161  uint32_t size
162  = (((uint32_t) *reinterpret_cast<unsigned char*>(&this->lengthReceiveBuffer[0])) << 0)
163  | (((uint32_t) *reinterpret_cast<unsigned char*>(&this->lengthReceiveBuffer[1])) << 8)
164  | (((uint32_t) *reinterpret_cast<unsigned char*>(&this->lengthReceiveBuffer[2])) << 16)
165  | (((uint32_t) *reinterpret_cast<unsigned char*>(&this->lengthReceiveBuffer[3])) << 24);
166 
167  RSCDEBUG(logger, "Received message header with size " << size);
168 
169  this->messageReceiveBuffer.resize(size);
170 
171  async_read(*this->socket,
172  buffer(&this->messageReceiveBuffer[0], size),
173  boost::bind(&BusConnection::handleReadBody, shared_from_this(),
174  boost::asio::placeholders::error,
175  boost::asio::placeholders::bytes_transferred,
176  size));
177 }
178 
179 void BusConnection::handleReadBody(const boost::system::error_code& error,
180  size_t bytesTransferred,
181  size_t expected) {
182  if (error || (bytesTransferred != expected)) {
183  if (!this->disconnecting) {
184  RSCWARN(logger, "Receive failure (error " << error << ")"
185  << " or incomplete message body (received " << bytesTransferred << " bytes)"
186  << "; closing connection");
187  }
188  performSafeCleanup("handleReadBody");
189  return;
190  }
191 
192  // Deserialize the notification.
193  this->notification.ParseFromString(this->messageReceiveBuffer);
194 
195  // Construct an Event instance *without* deserializing the
196  // payload. This has to be done in connectors since different
197  // converters can be used.
198  EventPtr event = notificationToEvent(this->notification, true);
199 
200  // Dispatch the received event to connectors.
201  BusPtr bus = this->bus.lock();
202  if (bus) {
203  bus->handleIncoming(event, shared_from_this());
204  } else {
205  RSCWARN(logger, "Dangling bus pointer when trying to dispatch incoming event; closing connection");
206  performSafeCleanup("handleReadBody");
207  return;
208  }
209 
210  // Submit task to start receiving the next event.
211  receiveEvent();
212 }
213 
214 void BusConnection::printContents(ostream& stream) const {
215  try {
216  stream << "local = " << this->socket->local_endpoint()
217  << ", remote = " << this->socket->remote_endpoint();
218  } catch (...) {
219  stream << "<error printing socket info>";
220  }
221 }
222 
223 }
224 }
225 }