RSB  0.9.6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
BusConnection.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is part of the RSB project
4  *
5  * Copyright (C) 2011, 2012 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "BusConnection.h"
28 
29 #include <boost/bind.hpp>
30 
31 #include <rsc/misc/langutils.h>
32 
33 #include "Bus.h"
34 #include "Serialization.h"
35 
36 using namespace std;
37 
38 using namespace boost;
39 
40 using namespace boost::asio;
41 //using tcp = boost::asio::ip::tcp;
42 
43 using namespace rsc::logging;
44 
45 namespace rsb {
46 namespace transport {
47 namespace socket {
48 
49 BusConnection::BusConnection(BusPtr bus,
50  SocketPtr socket,
51  bool client,
52  bool tcpNoDelay) :
53  logger(Logger::getLogger("rsb.transport.socket.BusConnection")),
54  socket(socket), bus(bus), disconnecting(false), activeShutdown(false) {
55 
56  // Enable TCPNODELAY socket option to trade decreased throughput
57  // for reduced latency.
58  if (tcpNoDelay){
59  RSCINFO(logger, "Setting TCP_NODELAY option");
60  boost::asio::ip::tcp::no_delay option(true);
61  socket->set_option(option);
62  }
63 
64  // Allocate static buffers.
65  this->lengthReceiveBuffer.resize(4);
66  this->lengthSendBuffer.resize(4);
67 
68  // Perform request role of the handshake.
69  if (client) {
70  read(*this->socket, buffer(&this->lengthReceiveBuffer[0], 4));
71  } else {
72  write(*this->socket, buffer(this->lengthSendBuffer));
73  }
74 }
75 
77  // only a safety measure. Usually we will not be destructed as long as
78  // receiving is running.
79  performSafeCleanup("destruction");
80 }
81 
83  RSCINFO(logger, "Shutting down");
84 
85  boost::recursive_mutex::scoped_lock lock(this->mutex);
86  this->activeShutdown = true;
87 
88  if (this->socket && this->socket->is_open()) {
89  this->socket->shutdown(ip::tcp::socket::shutdown_send);
90  }
91 
92 }
93 
95  RSCINFO(logger, "Disconnecting");
96 
97  boost::recursive_mutex::scoped_lock lock(this->mutex);
98  this->disconnecting = true;
99 
100  if (this->socket && this->socket->is_open()) {
101  this->socket->close();
102  }
103 
104 }
105 
107  receiveEvent();
108 }
109 
111  const string& wireSchema) {
112 
113 
114  // Convert the event into a notification object and serialize the
115  // notification object.
116  // The payload already is a byte-array, since it has been
117  // serialized by the connector which submitted the event.
118  protocol::Notification notification;
119  eventToNotification(notification, event, wireSchema,
120  *static_pointer_cast<string>(event->getData()));
121  notification.SerializeToString(&this->messageSendBuffer);
122 
123  // Encode the size of the serialized notification object.
124  uint32_t length = this->messageSendBuffer.size();
125  this->lengthSendBuffer[0] = (length & 0x000000fful) >> 0;
126  this->lengthSendBuffer[1] = (length & 0x0000ff00ul) >> 8;
127  this->lengthSendBuffer[2] = (length & 0x00ff0000ul) >> 16;
128  this->lengthSendBuffer[3] = (length & 0xff000000ul) >> 24;
129 
130  {
131  boost::recursive_mutex::scoped_lock lock(this->mutex);
132  if (this->activeShutdown) {
133  RSCDEBUG(this->logger, "Ignoring to send a notification "
134  "because we are shutting down.");
135  return;
136  }
137 
138  // Send the size header, followed by the actual notification data.
139  write(*this->socket, buffer(this->lengthSendBuffer));
140  write(*this->socket, buffer(this->messageSendBuffer));
141  }
142 
143 }
144 
145 void BusConnection::performSafeCleanup(const string& context) {
146  // Remove ourselves from the bus to which we are connected.
147  BusPtr bus = this->bus.lock();
148  if (bus) {
149  // The bus may already have removed its pointer.
150  try {
151  bus->removeConnection(shared_from_this());
152  } catch (const boost::bad_weak_ptr&) {
153  }
154  }
155 
156  // We can only ignore the error here.
157  if (!this->disconnecting) {
158  try {
159  disconnect();
160  } catch (const std::exception& e) {
161  RSCDEBUG(logger, "Failed to disconnect (in " << context << "): "
162  << e.what())
163  }
164  }
165 }
166 
168  async_read(*this->socket,
169  buffer(&this->lengthReceiveBuffer[0], 4),
170  boost::bind(&BusConnection::handleReadLength, shared_from_this(),
171  boost::asio::placeholders::error,
172  boost::asio::placeholders::bytes_transferred));
173 }
174 
175 void BusConnection::handleReadLength(const boost::system::error_code& error,
176  size_t bytesTransferred) {
177 
178  {
179  boost::recursive_mutex::scoped_lock lock(this->mutex);
180  if (error == boost::asio::error::eof) {
181  RSCDEBUG(logger, "Received eof");
182  if (!this->activeShutdown) {
183  shutdown();
184  }
185  performSafeCleanup("handleReadLength[eof]");
186  return;
187  }
188  }
189 
190  if (error || (bytesTransferred != 4)) {
191  if (!disconnecting) {
192  RSCDEBUG(logger, "Receive failure (error " << error << ")"
193  << " or incomplete message header (received " << bytesTransferred << " bytes)"
194  << "; closing connection");
195  }
196  performSafeCleanup("handleReadLength[unknown error]");
197  return;
198  }
199 
200  uint32_t size
201  = (((uint32_t) *reinterpret_cast<unsigned char*>(&this->lengthReceiveBuffer[0])) << 0)
202  | (((uint32_t) *reinterpret_cast<unsigned char*>(&this->lengthReceiveBuffer[1])) << 8)
203  | (((uint32_t) *reinterpret_cast<unsigned char*>(&this->lengthReceiveBuffer[2])) << 16)
204  | (((uint32_t) *reinterpret_cast<unsigned char*>(&this->lengthReceiveBuffer[3])) << 24);
205 
206  RSCDEBUG(logger, "Received message header with size " << size);
207 
208  this->messageReceiveBuffer.resize(size);
209 
210  async_read(*this->socket,
211  buffer(&this->messageReceiveBuffer[0], size),
212  boost::bind(&BusConnection::handleReadBody, shared_from_this(),
213  boost::asio::placeholders::error,
214  boost::asio::placeholders::bytes_transferred,
215  size));
216 }
217 
218 void BusConnection::handleReadBody(const boost::system::error_code& error,
219  size_t bytesTransferred,
220  size_t expected) {
221  if (error || (bytesTransferred != expected)) {
222  if (!this->disconnecting) {
223  RSCWARN(logger, "Receive failure (error " << error << ")"
224  << " or incomplete message body (received " << bytesTransferred << " bytes)"
225  << "; closing connection");
226  }
227  performSafeCleanup("handleReadBody");
228  return;
229  }
230 
231  // Deserialize the notification.
232  this->notification.ParseFromString(this->messageReceiveBuffer);
233 
234  // Construct an Event instance *without* deserializing the
235  // payload. This has to be done in connectors since different
236  // converters can be used.
237  EventPtr event = notificationToEvent(this->notification, true);
238 
239  // Dispatch the received event to connectors.
240  BusPtr bus = this->bus.lock();
241  if (bus) {
242  bus->handleIncoming(event, shared_from_this());
243  } else {
244  RSCWARN(logger, "Dangling bus pointer when trying to dispatch incoming event; closing connection");
245  performSafeCleanup("handleReadBody");
246  return;
247  }
248 
249  // Submit task to start receiving the next event.
250  receiveEvent();
251 }
252 
253 void BusConnection::printContents(ostream& stream) const {
254  try {
255  stream << "local = " << this->socket->local_endpoint()
256  << ", remote = " << this->socket->remote_endpoint();
257  } catch (...) {
258  stream << "<error printing socket info>";
259  }
260 }
261 
262 }
263 }
264 }
void handleReadBody(const boost::system::error_code &error, size_t bytesTransferred, size_t expected)
EventPtr notificationToEvent(protocol::Notification &notification, bool exposeWireSchema)
Converts notification into an Event.
void sendEvent(EventPtr event, const std::string &wireSchema)
void printContents(std::ostream &stream) const
void eventToNotification(protocol::Notification &notification, const EventPtr &event, const string &wireSchema, const string &data)
Converts the Event event into a protocol::Notification, storing the result in notification.
void performSafeCleanup(const std::string &context)
void handleReadLength(const boost::system::error_code &error, size_t bytesTransferred)
rsc::logging::LoggerPtr logger
Definition: BusConnection.h:95
boost::shared_ptr< boost::asio::ip::tcp::socket > SocketPtr
Definition: BusConnection.h:77
protocol::Notification notification
boost::shared_ptr< Event > EventPtr
Definition: Event.h:251
boost::shared_ptr< Bus > BusPtr
Definition: BusConnection.h:51