RSB  0.19.0
Factory.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is part of the RSB project
4  *
5  * Copyright (C) 2011-2018 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "Factory.h"
28 
29 #include <boost/lexical_cast.hpp>
30 
31 #include <boost/asio/ip/address.hpp>
32 
33 #include <boost/format.hpp>
34 
35 #include <rsc/runtime/ContainerIO.h>
36 
37 #include "BusImpl.h"
38 #include "BusServerImpl.h"
39 #include "LifecycledBusServer.h"
40 
41 using namespace std;
42 
43 using namespace boost;
44 
45 using namespace boost::asio;
46 using boost::asio::ip::tcp;
47 
48 using namespace rsc::logging;
49 
50 namespace rsb {
51 namespace transport {
52 namespace socket {
53 
54 // Create and start an io_service. This service will be shared between
55 // all bus providers created by this factory.
56 Factory::Factory() :
57  logger(Logger::getLogger("rsb.transport.socket.Factory")), asioService(
58  new AsioServiceContext) {
59  RSCDEBUG(logger, "Constructed and asio service created");
60 }
61 
63  RSCDEBUG(logger, "Destructing");
64 }
65 
66 template<class BusType>
67 boost::shared_ptr<BusType> Factory::searchInMap(const Endpoint& endpoint,
68  bool tcpnodelay, map<Endpoint, boost::weak_ptr<BusType> >& map) {
69  typename std::map<Endpoint, boost::weak_ptr<BusType> >::const_iterator it;
70  if ((it = map.find(endpoint)) != map.end()) {
71  boost::shared_ptr<BusType> result = it->second.lock();
72  if (result) {
73  checkOptions(result, tcpnodelay);
74  RSCDEBUG(logger,
75  "Found existing bus " << result
76  << " without resolving");
77  return result;
78  } else {
79  map.erase(endpoint);
80  }
81  }
82  return boost::shared_ptr<BusType>();
83 }
84 
85 BusPtr Factory::getBusClientFor(const string& host,
86  uint16_t port,
87  bool tcpnodelay) {
88  RSCDEBUG(logger, "Was asked for a bus client for " << host << ":" << port);
89 
90  // Try to find an entry for the exact specified endpoint. If this
91  // yields a hit, there is no need to resolve the specified name.
92  Endpoint endpoint(host, port);
93 
94  {
95  BusPtr result = searchInMap(endpoint, tcpnodelay, busClients);
96  if (result) {
97  return result;
98  }
99 
100  RSCDEBUG(logger, "Did not find bus client without resolving");
101  }
102 
103  // We did not find an entry for the exact specified entry. We try
104  // to resolve it to a working endpoint and use that one in the
105  // lookup.
106  // TODO(jmoringe): avoid this useless socket connection just for
107  // the lookup
108  RSCDEBUG(logger, "Resolving endpoint")
109  tcp::resolver resolver(*this->asioService->getService());
110  tcp::resolver::query query(host, lexical_cast<string>(port),
111  tcp::resolver::query::numeric_service);
112  for (tcp::resolver::iterator endpointIterator = resolver.resolve(query);
113  endpointIterator != tcp::resolver::iterator();
114  ++endpointIterator) {
115  endpoint = Endpoint(endpointIterator->host_name(), port);
116  // When we have a working endpoint, repeat the lookup.
117  BusPtr result = searchInMap(endpoint, tcpnodelay, busClients);
118  if (result) {
119  return result;
120  }
121  }
122 
123  // Try to open a socket for the resolved endpoint.
124  SocketPtr socket;
125  for (tcp::resolver::iterator endpointIterator = resolver.resolve(query);
126  endpointIterator != tcp::resolver::iterator();
127  ++endpointIterator) {
128  endpoint = Endpoint(endpointIterator->host_name(), port);
129  RSCDEBUG(logger, "Trying endpoint " << endpointIterator->endpoint());
130  socket.reset(new tcp::socket(*this->asioService->getService()));
131  boost::system::error_code error;
132  socket->connect(endpointIterator->endpoint(), error);
133  if (!error) {
134  RSCDEBUG(logger, "Success");
135  break;
136  }
137  RSCDEBUG(logger, "Failed: " << error.message());
138  socket.reset();
139  }
140  if (!socket) {
141  throw runtime_error(str(format("Could not connect to any of the endpoints to which %1%:%2% resolved.")
142  % host % port));
143  }
144 
145  // Name resolution did not yield any endpoints, or none of the
146  // worked. Create a new bus client.
147  RSCDEBUG(logger, "Did not find bus client after resolving; creating a new one");
148 
149  BusPtr result(new BusImpl(this->asioService, tcpnodelay));
150  this->busClients[endpoint] = result;
151 
152  BusConnectionPtr connection(new BusConnection(result, socket, true, tcpnodelay));
153  result->addConnection(connection);
154  connection->startReceiving();
155 
156  RSCDEBUG(logger, "Created new bus client " << result);
157 
158  return result;
159 }
160 
162  uint16_t port,
163  bool tcpnodelay) {
164  RSCDEBUG(logger, "Was asked for a bus server for " << host << ":" << port);
165 
166  // Try to find an existing entry for the specified endpoint.
167  Endpoint endpoint(host, port);
168 
169  BusServerPtr result = searchInMap(endpoint, tcpnodelay, busServers);
170  if (result) {
171  return result;
172  }
173 
174  // If there is no entry, create a new bus server and put it into
175  // the map.
176  RSCDEBUG(logger, "Did not find bus server; creating a new one");
177 
178  result = BusServerPtr(
180  BusServerPtr(
181  new BusServerImpl(this->asioService, port,
182  tcpnodelay))));
183  result->activate();
184  this->busServers[endpoint] = result;
185 
186  RSCDEBUG(logger, "Created new bus server " << result);
187 
188  return result;
189 }
190 
191 BusPtr Factory::getBus(const Server& serverMode,
192  const std::string& host,
193  const boost::uint16_t& port,
194  bool tcpnodelay) {
195 
196  boost::mutex::scoped_lock lock(this->busMutex);
197 
198  switch (serverMode) {
199  case SERVER_NO:
200  return getBusClientFor(host, port, tcpnodelay);
201  case SERVER_YES:
202  return getBusServerFor(host, port, tcpnodelay);
203  case SERVER_AUTO:
204  try {
205  return getBusServerFor(host, port, tcpnodelay);
206  } catch (const std::exception& e) {
207  RSCINFO(logger,
208  "Could not create server for bus: " << e.what() << "; trying to access bus as client");
209  return getBusClientFor(host, port, tcpnodelay);
210  }
211  default:
212  assert(false);
213  throw invalid_argument("Impossible Server enum value received");
214  }
215 
216 }
217 
218 void Factory::checkOptions(BusPtr bus, bool tcpnodelay) {
219  if (bus->isTcpnodelay() != tcpnodelay) {
220  throw invalid_argument(str(format("Requested tcpnodelay option %1% does not match existing option %2%")
221  % tcpnodelay % bus->isTcpnodelay()));
222  }
223 }
224 
226  static boost::mutex mutex;
227  static FactoryPtr defaultFactory;
228  boost::mutex::scoped_lock lock(mutex);
229  if (!defaultFactory) {
230  defaultFactory.reset(new Factory);
231  }
232  return defaultFactory;
233 }
234 
235 }
236 }
237 }
rsc::logging::LoggerPtr logger
Definition: Factory.h:85
STL namespace.
boost::shared_ptr< BusType > searchInMap(const Endpoint &endpoint, bool tcpnodelay, std::map< Endpoint, boost::weak_ptr< BusType > > &map)
Searches inside a given map for an active pointer to a Bus instance matching the given query...
Definition: Factory.cpp:67
boost::shared_ptr< Factory > FactoryPtr
Definition: Factory.h:113
A class that keeps a boost asio service alive as long as this class lives.
boost::shared_ptr< boost::asio::ip::tcp::socket > SocketPtr
Definition: Factory.h:78
boost::shared_ptr< BusServer > BusServerPtr
Definition: BusServer.h:64
FactoryPtr getDefaultFactory()
Definition: Factory.cpp:225
The singleton instance of this class is responsible for managing bus provider objects.
Definition: Factory.h:62
static void checkOptions(BusPtr bus, bool tcpnodelay)
Definition: Factory.cpp:218
AsioServiceContextPtr asioService
Definition: Factory.h:91
boost::shared_ptr< BusConnection > BusConnectionPtr
Definition: Bus.h:46
std::pair< std::string, boost::uint16_t > Endpoint
Definition: Factory.h:77
A facade around BusServer instances to allow breaking dependency cycles.
Instances of this class implement connections to a socket-based bus.
Definition: BusConnection.h:74
Instances of this class provide access to a socket-based bus for local and remote bus clients...
Definition: BusServerImpl.h:60
BusPtr getBus(const Server &serverMode, const std::string &host, const boost::uint16_t &port, bool tcpnodelay)
Returns either a BusClient or Server depending on the chosen serverMode and the existence of a server...
Definition: Factory.cpp:191
BusServerPtr getBusServerFor(const std::string &host, boost::uint16_t port, bool tcpnodelay)
Definition: Factory.cpp:161
boost::shared_ptr< Bus > BusPtr
Definition: Bus.h:99
BusPtr getBusClientFor(const std::string &host, boost::uint16_t port, bool tcpnodelay)
Definition: Factory.cpp:85