RSB  0.7.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
SpreadConnection.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is a part of the RSB project
4  *
5  * Copyright (C) 2010 by Sebastian Wrede <swrede at techfak dot uni-bielefeld dot de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "SpreadConnection.h"
28 
29 #include <iostream>
30 #include <string.h>
31 
32 #include <boost/lexical_cast.hpp>
33 #include <boost/format.hpp>
34 
35 #include <rsc/misc/IllegalStateException.h>
36 
37 #include <sp.h>
38 
39 #include "../../CommException.h"
40 
41 using namespace std;
42 
43 using namespace boost;
44 
45 using namespace rsc::logging;
46 
47 namespace rsb {
48 namespace spread {
49 
50 #define SPREAD_MAX_GROUPS 100
51 #define SPREAD_MAX_MESSLEN 180000
52 
53 SpreadConnection::SpreadConnection(const string& id, const string& host,
54  unsigned int port) :
55  logger(Logger::getLogger("rsb.transport.spread.SpreadConnection")), connected(false),
56  host(host), port(port),
57 #ifdef WIN32
58  spreadname(str(format("%1%@%2%") % port % host)),
59 #else
60  spreadname((host == defaultHost())
61  ? lexical_cast<string>(port)
62  : str(format("%1%@%2%") % port % host)),
63 #endif
64  conId(id), msgCount(0) {
65  RSCDEBUG(logger, "instantiated spread connection with id " << conId
66  << " to spread daemon at " << spreadname);
67 }
68 
70  RSCDEBUG(logger, "destroying SpreadConnection object");
71  if (connected) {
72  deactivate();
73  }
74 }
75 
77  // XXX spread init and group join - not threadsafe, what to do about this?
78  if (connected) {
79  throw rsc::misc::IllegalStateException("Connection with id " + conId
80  + " is already active.");
81  }
82 
83  RSCDEBUG(logger, "connecting to spread daemon at " << spreadname);
84  char spreadPrivateGroup[MAX_GROUP_NAME];
85  int ret = SP_connect(spreadname.c_str(), 0, 0, 0, &con, spreadPrivateGroup);
86  spreadpg = string(spreadPrivateGroup);
87  stringstream errorString;
88  if (ret != ACCEPT_SESSION) {
89  errorString << "Error connecting to '" << spreadname << "': ";
90  switch (ret) {
91  case ILLEGAL_SPREAD:
92  errorString
93  << "connection to spread daemon at "
94  << spreadname << " failed, check port and hostname";
95  break;
96  case COULD_NOT_CONNECT:
97  errorString
98  << "connection to spread daemon failed due to socket errors, check port and hostname";
99  break;
100  case CONNECTION_CLOSED:
101  errorString
102  << "communication errors occurred during setup of connection";
103  break;
104  case REJECT_VERSION:
105  errorString
106  << "daemon or library version mismatch";
107  break;
108  case REJECT_NO_NAME:
109  errorString << "protocol error during setup";
110  break;
111  case REJECT_ILLEGAL_NAME:
112  errorString
113  << "name provided violated requirement, length or illegal character";
114  break;
115  case REJECT_NOT_UNIQUE:
116  errorString
117  << "name provided is not unique on this daemon";
118  break;
119  default:
120  errorString << "unknown spread connect error, value: " << ret;
121  }
122  SP_error(ret);
123  RSCFATAL(logger, errorString.str());
124  throw CommException(errorString.str());
125  } else {
126  RSCDEBUG(logger, "success, private group id is " << spreadpg);
127  }
128  RSCINFO(logger, "connected to spread daemon");
129 
130  connected = true;
131 
132 }
133 
135 
136  if (!connected) {
137  throw rsc::misc::IllegalStateException("Connection with id " + conId
138  + " is not active.");
139  }
140 
141  connected = false;
142 
143 }
144 
146  return connected;
147 }
148 
150 
151  if (!connected) {
152  throw rsc::misc::IllegalStateException("Connection with id " + conId
153  + " is not active.");
154  }
155 
156  sm->reset();
157 
158  // read from Spread multicast group
159  int serviceType;
160  int numGroups;
161  char sender[MAX_GROUP_NAME];
162  char retGroups[SPREAD_MAX_GROUPS][MAX_GROUP_NAME];
163  int16 messType;
164  int dummyEndianMismatch;
165  char buf[SPREAD_MAX_MESSLEN];
166  int ret = SP_receive(con, &serviceType, sender, SPREAD_MAX_GROUPS,
167  &numGroups, retGroups, &messType, &dummyEndianMismatch,
168  SPREAD_MAX_MESSLEN, buf);
169 
170  // check for errors
171  if (ret < 0) {
172 
173  string err;
174  switch (ret) {
175  case ILLEGAL_SESSION:
176  err = "spread receive error: mbox given to receive on was illegal";
177  break;
178  case ILLEGAL_MESSAGE:
179  err = "spread receive error: message had an illegal structure";
180  break;
181  case CONNECTION_CLOSED:
182  err = "spread receive error: message communication errors occurred";
183  break;
184  case GROUPS_TOO_SHORT:
185  err
186  = "spread receive error: groups array too short to hold list of groups";
187  break;
188  case BUFFER_TOO_SHORT:
189  err
190  = "spread receive error: message body buffer too short to hold the message received";
191  break;
192  default:
193  err = "unknown spread receive error";
194  }
195  throw CommException("Spread communication error. Reason: " + err);
196 
197  }
198 
199  // handle normal messages
200  if (Is_regular_mess(serviceType)) {
201 
202  RSCDEBUG(logger, "regular spread message received");
203 
204  // cancel if requested
205  if (numGroups == 1 && string(retGroups[0]) == string(spreadpg)) {
206  throw boost::thread_interrupted();
207  }
208 
209  sm->setType(SpreadMessage::REGULAR);
210  sm->setData(string(buf, ret));
211  if (numGroups < 0) {
212  // TODO check whether we shall implement a best effort strategy here
213  RSCWARN(logger,
214  "error during message reception, group array too large, requested size "
215  << " configured size " << SPREAD_MAX_GROUPS);
216  }
217  for (int i = 0; i < numGroups; i++) {
218  if (retGroups[i] != NULL) {
219  string group = string(retGroups[i]);
220  RSCDEBUG(logger,
221  "received message, addressed at group with name "
222  << group);
223  sm->addGroup(group);
224  }
225  }
226 
227  } else if (Is_membership_mess(serviceType)) {
228  // this will currently never happen as we do not want to have membership messages
229  // and this message does not contain any contents
230 
231  RSCINFO(logger, "received spread membership message type");
233 
234  } else {
235 
236  RSCFATAL(logger, "received unknown spread message type with code " << serviceType);
237  assert(false);
238  throw CommException(
239  "Received a message that is neither membership nor data message. "
240  "This should never happen according to the spread documentation.");
241 
242  }
243 
244 }
245 
247 
248  // TODO check message size, if larger than ~100KB throw exception
249  if (!connected) {
250  throw rsc::misc::IllegalStateException("Connection with id " + conId
251  + " is not active.");
252  }
253 
254  const unsigned int groupCount = msg.getGroupCount();
255  if (groupCount == 0) {
256  throw CommException("Group information missing in message");
257  }
258 
259  int ret;
260  if (groupCount == 1) {
261  // use SP_multicast
262  string group = *msg.getGroupsBegin();
263  RSCDEBUG(logger, "sending message to group with name " << group);
264  ret = SP_multicast(con, msg.getQOS(), group.c_str(), 0, msg.getSize(),
265  msg.getData());
266  } else {
267  // use SP_multigroup_multicast
268  char* groups = new char[groupCount * MAX_GROUP_NAME];
269  memset(groups, 0, groupCount * MAX_GROUP_NAME);
270  int j = 0;
271  for (list<string>::const_iterator it = msg.getGroupsBegin(); it
272  != msg.getGroupsEnd(); ++it) {
273  string group = *it;
274  strcpy(groups + j * MAX_GROUP_NAME, group.c_str());
275  j++;
276  }
277  ret = SP_multigroup_multicast(con, msg.getQOS(), groupCount,
278  (const char(*)[MAX_GROUP_NAME]) groups, 0, msg.getSize(),
279  msg.getData());
280  delete[] groups;
281  }
282 
283  // TODO shouldn't msgCount be incremented only in case of success?
284  ++msgCount;
285 
286  if (ret < 0) {
287 
288  stringstream err;
289  switch (ret) {
290  case ILLEGAL_SESSION:
291  err << "Illegal Session";
292  break;
293  case ILLEGAL_MESSAGE:
294  err << "Illegal Message";
295  break;
296  case CONNECTION_CLOSED:
297  err << "Connection Closed";
298  break;
299  default:
300  err << "Unknown spread error with code " << ret;
301  break;
302  }
303 
304  throw CommException(err.str());
305 
306  }
307 
308 }
309 
311 
312  if (!connected) {
313  throw rsc::misc::IllegalStateException("Connection with id " + conId
314  + " is not active.");
315  }
316 
317  SP_multicast(con, RELIABLE_MESS, spreadpg.c_str(), 0, 0, 0);
318 
319 }
320 
321 string SpreadConnection::generateId(const string& prefix) {
322  // TODO generate meaningful and unique Id according to MAX_PRIVATE_NAME
323  // uuid_t id;
324  // uuid_generate(id);
325  // char buf[37];
326  // uuid_unparse(id,buf);
327  return prefix;
328 }
329 
331  return msgCount;
332 }
333 
335  if (!connected) {
336  throw rsc::misc::IllegalStateException("Connection with id " + conId
337  + " is not active.");
338  }
339  return& con;
340 }
341 
342 string defaultHost() {
343  return "localhost";
344 }
345 
346 unsigned int defaultPort() {
347  return DEFAULT_SPREAD_PORT;
348 }
349 
350 }
351 }