RSB  0.7.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Factory.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is part of the RSB project
4  *
5  * Copyright (C) 2011, 2012 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "Factory.h"
28 
29 #include <boost/lexical_cast.hpp>
30 
31 #include <boost/asio/ip/address.hpp>
32 
33 #include <boost/format.hpp>
34 
35 using namespace std;
36 
37 using namespace boost;
38 
39 using namespace boost::asio;
40 using boost::asio::ip::tcp;
41 
42 using namespace rsc::logging;
43 
44 namespace rsb {
45 namespace transport {
46 namespace socket {
47 
48 // Create and start an io_service. This service will be shared between
49 // all bus providers created by this factory.
50 Factory::Factory() :
51  logger(Logger::getLogger("rsb.transport.socket.Factory")),
52  keepAlive(new io_service::work(service)),
53  thread(boost::bind(&boost::asio::io_service::run, &service)) {
54  RSCINFO(logger, "Started service thread");
55 }
56 
58  if (!this->busClients.empty()) {
59  RSCWARN(logger, "Remaining bus clients: " << this->busClients);
60  }
61 
62  RSCINFO(logger, "Stopping service thread");
63  this->keepAlive.reset();
64  this->thread.join();
65  RSCINFO(logger, "Stopped service thread");
66 }
67 
68 BusPtr Factory::getBusClientFor(const string& host,
69  uint16_t port,
70  bool tcpnodelay,
71  ConnectorBase* connector) {
72  RSCDEBUG(logger, "Was asked for a bus client for " << host << ":" << port);
73 
74  // Try to find an entry for the exact specified endpoint. If this
75  // yields a hit, there is no need to resolve the specified name.
76  Endpoint endpoint(host, port);
77 
78  {
79  BusClientMap::const_iterator it;
80  if ((it = this->busClients.find(endpoint)) != this->busClients.end()) {
81  BusPtr result = it->second;
82  checkOptions(result, tcpnodelay);
83  result->addConnector(connector);
84  RSCDEBUG(logger, "Found existing bus client "
85  << result << " without resolving");
86  return result;
87  }
88 
89  RSCDEBUG(logger, "Did not find bus client without resolving");
90  }
91 
92  // We did not find an entry for the exact specified entry. We try
93  // to resolve it to a working endpoint and use that one in the
94  // lookup.
95  // TODO(jmoringe): avoid this useless socket connection just for
96  // the lookup
97  RSCDEBUG(logger, "Resolving endpoint")
98  tcp::resolver resolver(this->service);
99  tcp::resolver::query query(host, lexical_cast<string>(port),
100  tcp::resolver::query::numeric_service);
101  for (tcp::resolver::iterator endpointIterator = resolver.resolve(query);
102  endpointIterator != tcp::resolver::iterator();
103  ++endpointIterator) {
104  endpoint = Endpoint(endpointIterator->host_name(), port);
105  // When we have a working endpoint, repeat the lookup. Create
106  // a new bus client, if there still is no entry.
107  BusClientMap::const_iterator it;
108  if ((it = this->busClients.find(endpoint)) != this->busClients.end()) {
109  BusPtr result = it->second;
110  checkOptions(result, tcpnodelay);
111  result->addConnector(connector);
112  RSCDEBUG(logger, "Found existing bus client "
113  << it->second << " after resolving");
114  return result;
115  }
116  }
117 
118  // Try to open a socket for the resolved endpoint.
119  SocketPtr socket;
120  for (tcp::resolver::iterator endpointIterator = resolver.resolve(query);
121  endpointIterator != tcp::resolver::iterator();
122  ++endpointIterator) {
123  endpoint = Endpoint(endpointIterator->host_name(), port);
124  RSCDEBUG(logger, "Trying endpoint " << endpointIterator->endpoint());
125  socket.reset(new tcp::socket(this->service));
126  boost::system::error_code error;
127  socket->connect(endpointIterator->endpoint(), error);
128  if (!error) {
129  RSCDEBUG(logger, "Success");
130  break;
131  }
132  RSCDEBUG(logger, "Failed: " << error.message());
133  socket.reset();
134  }
135  if (!socket) {
136  throw runtime_error(str(format("Could not connector to any of the endpoints to which %1%:%2% resolved.")
137  % host % port));
138  }
139 
140  // Name resolution did not yield any endpoints, or none of the
141  // worked. Create a new bus client.
142  RSCDEBUG(logger, "Did not find bus client after resolving; creating a new one");
143 
144  BusPtr result(new Bus(this->service, tcpnodelay));
145  this->busClients[endpoint] = result;
146 
147  BusConnectionPtr connection(new BusConnection(result, socket, true, tcpnodelay));
148  result->addConnection(connection);
149  connection->startReceiving();
150 
151  result->addConnector(connector);
152 
153  RSCDEBUG(logger, "Created new bus client " << result);
154 
155  return result;
156 }
157 
159  RSCDEBUG(logger, "Removing client bus " << bus);
160 
161  boost::mutex::scoped_lock lock(this->busMutex);
162 
163  for (BusClientMap::iterator it = this->busClients.begin();
164  it != this->busClients.end(); ++it) {
165  if (it->second == bus) {
166  this->busClients.erase(it);
167  RSCDEBUG(logger, "Removed");
168  return;
169  }
170  }
171 }
172 
174  uint16_t port,
175  bool tcpnodelay,
176  ConnectorBase* connector) {
177  RSCDEBUG(logger, "Was asked for a bus server for " << host << ":" << port);
178 
179  // Try to find an existing entry for the specified endpoint.
180  Endpoint endpoint(host, port);
181 
182  BusServerMap::const_iterator it;
183  if ((it = this->busServers.find(endpoint)) != this->busServers.end()) {
184  RSCDEBUG(logger, "Found existing bus server " << it->second);
185  checkOptions(it->second, tcpnodelay);
186  it->second->addConnector(connector);
187  return it->second;
188  }
189 
190  // If there is no entry, create a new bus server and put it into
191  // the map.
192  RSCDEBUG(logger, "Did not find bus server; creating a new one");
193 
194  BusServerPtr result(new BusServer(port, tcpnodelay, this->service));
195  result->activate();
196  this->busServers[endpoint] = result;
197 
198  result->addConnector(connector);
199 
200  RSCDEBUG(logger, "Created new bus client " << result);
201 
202  return result;
203 }
204 
206  RSCDEBUG(logger, "Removing server bus " << bus);
207 
208  boost::mutex::scoped_lock lock(this->busMutex);
209 
210  for (BusServerMap::iterator it = this->busServers.begin();
211  it != this->busServers.end(); ++it) {
212  if (it->second == bus) {
213  boost::dynamic_pointer_cast<BusServer>(bus)->deactivate();
214  this->busServers.erase(it);
215  RSCDEBUG(logger, "Removed");
216  return;
217  }
218  }
219 }
220 
221 BusPtr Factory::getBus(const Server& serverMode,
222  const std::string& host,
223  const boost::uint16_t& port,
224  bool tcpnodelay,
225  ConnectorBase* connector) {
226 
227  boost::mutex::scoped_lock lock(this->busMutex);
228 
229  switch (serverMode) {
230  case SERVER_NO:
231  return getBusClientFor(host, port, tcpnodelay, connector);
232  case SERVER_YES:
233  return getBusServerFor(host, port, tcpnodelay, connector);
234  case SERVER_AUTO:
235  try {
236  return getBusServerFor(host, port, tcpnodelay, connector);
237  } catch (const std::exception& e) {
238  RSCINFO(logger,
239  "Could not create server for bus: " << e.what() << "; trying to access bus as client");
240  return getBusClientFor(host, port, tcpnodelay, connector);
241  }
242  default:
243  assert(false);
244  throw invalid_argument("Impossible Server enum value received");
245  }
246 
247 }
248 
249 void Factory::checkOptions(BusPtr bus, bool tcpnodelay) {
250  if (bus->isTcpnodelay() != tcpnodelay) {
251  throw invalid_argument(str(format("Requested tcpnodelay option %1% does not match existing option %2%")
252  % tcpnodelay % bus->isTcpnodelay()));
253  }
254 }
255 
256 }
257 }
258 }