RSB  0.7.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
OutConnector.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is a part of the RSB project
4  *
5  * Copyright (C) 2010 by Sebastian Wrede <swrede at techfak dot uni-bielefeld dot de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "OutConnector.h"
28 
29 #include <rsc/misc/langutils.h>
30 
31 #include "../../MetaData.h"
32 #include "../../EventId.h"
33 #include "../../Scope.h"
34 
35 #include "../../protocol/ProtocolException.h"
36 #include "../../protocol/FragmentedNotification.h"
37 
38 
39 using namespace std;
40 
41 using namespace rsc::runtime;
42 using namespace rsc::logging;
43 
44 using namespace rsb::protocol;
45 using namespace rsb::converter;
46 
47 namespace rsb {
48 namespace spread {
49 
50 transport::OutConnector* OutConnector::create(const Properties& args) {
51  static LoggerPtr logger = Logger::getLogger("rsb.spread.OutConnector");
52  RSCDEBUG(logger, "creating OutConnector with properties " << args);
53 
54  return new OutConnector(
55  args.get<ConverterSelectionStrategyPtr>("converters"),
56  args.get<string>("host", defaultHost()),
57  args.getAs<unsigned int>("port", defaultPort()),
58  args.getAs<unsigned int>("maxfragmentsize", 100000));
59 }
60 
61 OutConnector::OutConnector(ConverterSelectionStrategyPtr converters,
62  const string& host, unsigned int port, unsigned int maxFragmentSize) :
63  transport::ConverterSelectingConnector<string>(converters), logger(
64  Logger::getLogger("rsb.spread.OutConnector")), active(false), connector(
65  new SpreadConnector(host, port)), maxFragmentSize(
66  maxFragmentSize), minDataSpace(5) {
67 }
68 
70  if (this->active) {
71  deactivate();
72  }
73 }
74 
76  return "OutConnector";
77 }
78 
79 void OutConnector::printContents(ostream& stream) const {
80  stream << "connector = " << connector << ", active = " << active;
81 }
82 
84  this->connector->activate();
85  this->active = true;
86 }
87 
89  this->connector->deactivate();
90  this->active = false;
91 }
92 
94  this->connector->setQualityOfServiceSpecs(specs);
95 }
96 
98 
99  // TODO exception handling if converter is not available
100  ConverterPtr c = getConverter(event->getType());
101  string wire;
102  string wireSchema = c->serialize(
103  make_pair(event->getType(), event->getData()), wire);
104 
105  event->mutableMetaData().setSendTime(rsc::misc::currentTimeMicros());
106 
107  // create a list of all fragments required to send this event
108  vector<FragmentedNotificationPtr> fragments;
109 
110  size_t curPos = 0;
111  unsigned int currentDataPart = 0;
112  // "currentDataPart == 0" is required for the case when wire.size() == 0
113  // This can happen, for example, with the "void" wire-schema.
114  while (curPos < wire.size() || currentDataPart == 0) {
115 
116  FragmentedNotificationPtr notification(new FragmentedNotification);
117  fillNotificationId(*(notification->mutable_notification()),
118  event);
119 
120  // when sending the first time, we need to transmit all meta data.
121  if (currentDataPart == 0) {
122  fillNotificationHeader(*(notification->mutable_notification()),
123  event, wireSchema);
124  }
125  // Scale the data for this message with the size of the generated header
126  // and mandatory id fields in the notification
127  unsigned int headerByteSize = notification->ByteSize();
128  assert(headerByteSize <= maxFragmentSize - minDataSpace);
129  if (headerByteSize >= maxFragmentSize - minDataSpace) {
130  throw ProtocolException(
131  "The meta data of this event are too big for spread!");
132  }
133  unsigned int maxDataPartSize = maxFragmentSize - headerByteSize;
134 
135  // finally set the data information
136  string dataPart = wire.substr(curPos, maxDataPartSize);
137  curPos += maxDataPartSize;
138 
139  notification->mutable_notification()->set_data(dataPart);
140  notification->set_data_part(currentDataPart);
141  // optimistic guess for the number of required fragments
142  notification->set_num_data_parts(1);
143 
144  fragments.push_back(notification);
145 
146  ++currentDataPart;
147 
148  }
149 
150  // adapt num_data_parts field of each FragmentedNotification if we need more
151  // than one fragment
152  assert(!fragments.empty());
153  if (fragments.size() > 1) {
154  for (vector<FragmentedNotificationPtr>::iterator fragmentIt =
155  fragments.begin(); fragmentIt != fragments.end();
156  ++fragmentIt) {
157  (*fragmentIt)->set_num_data_parts(fragments.size());
158  }
159  }
160 
161  // finally send all fragments
162 
163  for (vector<FragmentedNotificationPtr>::const_iterator fragmentIt =
164  fragments.begin(); fragmentIt != fragments.end(); ++fragmentIt) {
165 
166  // serialize to spread message
167  string serializedMessageData;
168  if (!(*fragmentIt)->SerializeToString(&serializedMessageData)) {
169  throw ProtocolException("Failed to write notification to stream");
170  }
171  SpreadMessage spreadMessage(serializedMessageData);
172 
173  // send message to appropriate groups
174  const vector<string>& groupNames = connector->makeGroupNames(
175  *event->getScopePtr());
176  for (vector<string>::const_iterator groupIt = groupNames.begin();
177  groupIt != groupNames.end(); ++groupIt) {
178  spreadMessage.addGroup(*groupIt);
179  }
180  spreadMessage.setQOS(this->connector->getMessageQoS());
181 
182  RSCTRACE(
183  logger,
184  "This is the serialized message size before send: " << spreadMessage.getSize());
185 
186  this->connector->send(spreadMessage);
187  // TODO implement queuing or throw messages away?
188  // TODO maybe return exception with msg that was not sent
189  // TODO especially important to fulfill QoS specs
190  RSCDEBUG(logger, "event sent to spread");
191 
192  }
193 
194 }
195 
196 }
197 }