RSB  0.7.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
SpreadConnector.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is a part of the RSB project
4  *
5  * Copyright (C) 2010 by Sebastian Wrede <swrede at techfak dot uni-bielefeld dot de>
6  * Copyright (C) 2012 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
7  *
8  * This file may be licensed under the terms of the
9  * GNU Lesser General Public License Version 3 (the ``LGPL''),
10  * or (at your option) any later version.
11  *
12  * Software distributed under the License is distributed
13  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
14  * express or implied. See the LGPL for the specific language
15  * governing rights and limitations.
16  *
17  * You should have received a copy of the LGPL along with this
18  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
19  * or write to the Free Software Foundation, Inc.,
20  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21  *
22  * The development of this software was supported by:
23  * CoR-Lab, Research Institute for Cognition and Robotics
24  * Bielefeld University
25  *
26  * ============================================================ */
27 
28 #include <string.h>
29 #include <math.h>
30 
31 #include <rsc/misc/Registry.h>
32 
33 #include "../../CommException.h"
34 #include "../../UnsupportedQualityOfServiceException.h"
35 #include "../../util/MD5.h"
36 #include "../../Scope.h"
37 
38 #include "../../converter/Converter.h"
39 
40 #include "SpreadConnection.h"
41 #include "SpreadConnector.h"
42 
43 #include <sp.h>
44 
45 using namespace std;
46 
47 using namespace rsc::logging;
48 using namespace rsc::runtime;
49 
50 using namespace rsb;
51 using namespace rsb::util;
52 using namespace rsb::transport;
53 
54 namespace rsb {
55 namespace spread {
56 
57 const SpreadConnector::QoSMap SpreadConnector::qosMapping =
58  SpreadConnector::buildQoSMapping();
59 
60 SpreadConnector::SpreadConnector(const string& host, unsigned int port) {
61  init(host, port);
62 }
63 
64 void SpreadConnector::init(const string& host, unsigned int port) {
65  this->logger = Logger::getLogger("rsb.spread.SpreadConnector");
66  RSCDEBUG(logger, "SpreadConnector::init() entered");
67  this->activated = false;
68  // TODO ConnectionPool for SpreadConnections?!?
69  // TODO Send Message over Managing / Introspection Channel
70  this->con = SpreadConnectionPtr(
71  new SpreadConnection(id.getIdAsString(), host, port));
72  this->memberships = MembershipManagerPtr(new MembershipManager());
73  setQualityOfServiceSpecs(QualityOfServiceSpec());
74 }
75 
76 void SpreadConnector::activate() {
77  // connect to spread
78  this->con->activate();
79  this->activated = true;
80 }
81 
82 void SpreadConnector::deactivate() {
83  RSCDEBUG(logger, "deactivate() entered");
84  if (this->con->isActive()) {
85  this->con->deactivate();
86  }
87  // memberships->leaveAll();
88  RSCTRACE(logger, "deactivate() finished"); // << *id);
89  this->activated = false;
90 }
91 
92 void SpreadConnector::join(const string& name) {
93  this->memberships->join(name, this->con);
94 }
95 
96 void SpreadConnector::leave(const string& name) {
97  this->memberships->leave(name, this->con);
98 }
99 
100 void SpreadConnector::send(const SpreadMessage& msg) {
101  this->con->send(msg);
102 }
103 
104 void SpreadConnector::receive(SpreadMessagePtr msg) {
105  this->con->receive(msg);
106 }
107 
108 SpreadConnector::~SpreadConnector() {
109  if (this->activated) {
110  deactivate();
111  }
112 }
113 
114 SpreadConnectionPtr SpreadConnector::getConnection() {
115  return this->con;
116 }
117 
118 SpreadMessage::QOS SpreadConnector::getMessageQoS() const {
119  return this->messageQoS;
120 }
121 
122 SpreadConnector::QoSMap SpreadConnector::buildQoSMapping() {
123  map<QualityOfServiceSpec::Reliability, SpreadMessage::QOS> unorderedMap;
124  unorderedMap.insert(
125  make_pair(QualityOfServiceSpec::UNRELIABLE,
126  SpreadMessage::UNRELIABLE));
127  unorderedMap.insert(
128  make_pair(QualityOfServiceSpec::RELIABLE, SpreadMessage::RELIABLE));
129 
130  map<QualityOfServiceSpec::Reliability, SpreadMessage::QOS> orderedMap;
131  orderedMap.insert(
132  make_pair(QualityOfServiceSpec::UNRELIABLE, SpreadMessage::FIFO));
133  orderedMap.insert(
134  make_pair(QualityOfServiceSpec::RELIABLE, SpreadMessage::FIFO));
135 
137  SpreadMessage::QOS> > table;
138  table.insert(make_pair(QualityOfServiceSpec::UNORDERED, unorderedMap));
139  table.insert(make_pair(QualityOfServiceSpec::ORDERED, orderedMap));
140 
141  return table;
142 }
143 
144 void SpreadConnector::setQualityOfServiceSpecs(
145  const QualityOfServiceSpec& specs) {
146 
147  QoSMap::const_iterator orderMapIt = qosMapping.find(specs.getOrdering());
148  if (orderMapIt == qosMapping.end()) {
149  throw UnsupportedQualityOfServiceException("Unknown ordering", specs);
150  }
151  map<QualityOfServiceSpec::Reliability, SpreadMessage::QOS>::const_iterator
152  mapIt = orderMapIt->second.find(specs.getReliability());
153  if (mapIt == orderMapIt->second.end()) {
154  throw UnsupportedQualityOfServiceException("Unknown reliability", specs);
155  }
156 
157  messageQoS = mapIt->second;
158 
159  RSCDEBUG(logger, "Selected new message type " << messageQoS);
160 }
161 
162 const vector<string>& SpreadConnector::makeGroupNames(
163  const Scope& scope) const {
164 
165  boost::upgrade_lock<boost::shared_mutex> lock(groupNameCacheMutex);
166 
167  GroupNameCache::const_iterator it = this->groupNameCache.find(scope);
168  if (it != this->groupNameCache.end()) {
169  return it->second;
170  }
171 
172  boost::upgrade_to_unique_lock<boost::shared_mutex> uniqueLock(lock);
173 
174  // avoid flooding the cache
175  // rationale: normally there is only a limited amount of group names used in
176  // a system. In other cases we assume that the group names are created
177  // dynamically and in this case the cache won't help at all
178  if (groupNameCache.size() > 300) {
179  RSCDEBUG(logger, "Flushing group name cache");
180  groupNameCache.clear();
181  }
182 
183  // Warm-up cache
184  vector<string>& cacheItem = this->groupNameCache[scope];
185  vector<Scope> scopes = scope.superScopes(true);
186  for (vector<Scope>::const_iterator scopeIt = scopes.begin(); scopeIt
187  != scopes.end(); ++scopeIt) {
188  cacheItem.push_back(this->makeGroupName(*scopeIt));
189  }
190 
191  return cacheItem;
192 
193 }
194 
195 std::string SpreadConnector::makeGroupName(const Scope& scope) const {
196  return MD5(scope.toString()).toHexString().substr(0, MAX_GROUP_NAME - 1);
197 }
198 
199 }
200 }