RSB  0.17.0
Factory.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is part of the RSB project
4  *
5  * Copyright (C) 2011, 2012 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "Factory.h"
28 
29 #include <boost/lexical_cast.hpp>
30 
31 #include <boost/asio/ip/address.hpp>
32 
33 #include <boost/format.hpp>
34 
35 #include <rsc/runtime/ContainerIO.h>
36 
37 #include "BusServerImpl.h"
38 #include "LifecycledBusServer.h"
39 
40 using namespace std;
41 
42 using namespace boost;
43 
44 using namespace boost::asio;
45 using boost::asio::ip::tcp;
46 
47 using namespace rsc::logging;
48 
49 namespace rsb {
50 namespace transport {
51 namespace socket {
52 
53 // Create and start an io_service. This service will be shared between
54 // all bus providers created by this factory.
55 Factory::Factory() :
56  logger(Logger::getLogger("rsb.transport.socket.Factory")), asioService(
57  new AsioServiceContext) {
58  RSCDEBUG(logger, "Constructed and asio service created");
59 }
60 
62  RSCDEBUG(logger, "Destructing");
63 }
64 
65 template<class BusType>
66 boost::shared_ptr<BusType> Factory::searchInMap(const Endpoint& endpoint,
67  bool tcpnodelay, map<Endpoint, boost::weak_ptr<BusType> >& map) {
68  typename std::map<Endpoint, boost::weak_ptr<BusType> >::const_iterator it;
69  if ((it = map.find(endpoint)) != map.end()) {
70  boost::shared_ptr<BusType> result = it->second.lock();
71  if (result) {
72  checkOptions(result, tcpnodelay);
73  RSCDEBUG(logger,
74  "Found existing bus " << result
75  << " without resolving");
76  return result;
77  } else {
78  map.erase(endpoint);
79  }
80  }
81  return boost::shared_ptr<BusType>();
82 }
83 
84 BusPtr Factory::getBusClientFor(const string& host,
85  uint16_t port,
86  bool tcpnodelay) {
87  RSCDEBUG(logger, "Was asked for a bus client for " << host << ":" << port);
88 
89  // Try to find an entry for the exact specified endpoint. If this
90  // yields a hit, there is no need to resolve the specified name.
91  Endpoint endpoint(host, port);
92 
93  {
94  BusPtr result = searchInMap(endpoint, tcpnodelay, busClients);
95  if (result) {
96  return result;
97  }
98 
99  RSCDEBUG(logger, "Did not find bus client without resolving");
100  }
101 
102  // We did not find an entry for the exact specified entry. We try
103  // to resolve it to a working endpoint and use that one in the
104  // lookup.
105  // TODO(jmoringe): avoid this useless socket connection just for
106  // the lookup
107  RSCDEBUG(logger, "Resolving endpoint")
108  tcp::resolver resolver(*this->asioService->getService());
109  tcp::resolver::query query(host, lexical_cast<string>(port),
110  tcp::resolver::query::numeric_service);
111  for (tcp::resolver::iterator endpointIterator = resolver.resolve(query);
112  endpointIterator != tcp::resolver::iterator();
113  ++endpointIterator) {
114  endpoint = Endpoint(endpointIterator->host_name(), port);
115  // When we have a working endpoint, repeat the lookup.
116  BusPtr result = searchInMap(endpoint, tcpnodelay, busClients);
117  if (result) {
118  return result;
119  }
120  }
121 
122  // Try to open a socket for the resolved endpoint.
123  SocketPtr socket;
124  for (tcp::resolver::iterator endpointIterator = resolver.resolve(query);
125  endpointIterator != tcp::resolver::iterator();
126  ++endpointIterator) {
127  endpoint = Endpoint(endpointIterator->host_name(), port);
128  RSCDEBUG(logger, "Trying endpoint " << endpointIterator->endpoint());
129  socket.reset(new tcp::socket(*this->asioService->getService()));
130  boost::system::error_code error;
131  socket->connect(endpointIterator->endpoint(), error);
132  if (!error) {
133  RSCDEBUG(logger, "Success");
134  break;
135  }
136  RSCDEBUG(logger, "Failed: " << error.message());
137  socket.reset();
138  }
139  if (!socket) {
140  throw runtime_error(str(format("Could not connect to any of the endpoints to which %1%:%2% resolved.")
141  % host % port));
142  }
143 
144  // Name resolution did not yield any endpoints, or none of the
145  // worked. Create a new bus client.
146  RSCDEBUG(logger, "Did not find bus client after resolving; creating a new one");
147 
148  BusPtr result(new Bus(this->asioService, tcpnodelay));
149  this->busClients[endpoint] = result;
150 
151  BusConnectionPtr connection(new BusConnection(result, socket, true, tcpnodelay));
152  result->addConnection(connection);
153  connection->startReceiving();
154 
155  RSCDEBUG(logger, "Created new bus client " << result);
156 
157  return result;
158 }
159 
161  uint16_t port,
162  bool tcpnodelay) {
163  RSCDEBUG(logger, "Was asked for a bus server for " << host << ":" << port);
164 
165  // Try to find an existing entry for the specified endpoint.
166  Endpoint endpoint(host, port);
167 
168  BusServerPtr result = searchInMap(endpoint, tcpnodelay, busServers);
169  if (result) {
170  return result;
171  }
172 
173  // If there is no entry, create a new bus server and put it into
174  // the map.
175  RSCDEBUG(logger, "Did not find bus server; creating a new one");
176 
177  result = BusServerPtr(
179  BusServerPtr(
180  new BusServerImpl(this->asioService, port,
181  tcpnodelay))));
182  result->activate();
183  this->busServers[endpoint] = result;
184 
185  RSCDEBUG(logger, "Created new bus server " << result);
186 
187  return result;
188 }
189 
190 BusPtr Factory::getBus(const Server& serverMode,
191  const std::string& host,
192  const boost::uint16_t& port,
193  bool tcpnodelay) {
194 
195  boost::mutex::scoped_lock lock(this->busMutex);
196 
197  switch (serverMode) {
198  case SERVER_NO:
199  return getBusClientFor(host, port, tcpnodelay);
200  case SERVER_YES:
201  return getBusServerFor(host, port, tcpnodelay);
202  case SERVER_AUTO:
203  try {
204  return getBusServerFor(host, port, tcpnodelay);
205  } catch (const std::exception& e) {
206  RSCINFO(logger,
207  "Could not create server for bus: " << e.what() << "; trying to access bus as client");
208  return getBusClientFor(host, port, tcpnodelay);
209  }
210  default:
211  assert(false);
212  throw invalid_argument("Impossible Server enum value received");
213  }
214 
215 }
216 
217 void Factory::checkOptions(BusPtr bus, bool tcpnodelay) {
218  if (bus->isTcpnodelay() != tcpnodelay) {
219  throw invalid_argument(str(format("Requested tcpnodelay option %1% does not match existing option %2%")
220  % tcpnodelay % bus->isTcpnodelay()));
221  }
222 }
223 
225  static boost::mutex mutex;
226  static FactoryPtr defaultFactory;
227  boost::mutex::scoped_lock lock(mutex);
228  if (!defaultFactory) {
229  defaultFactory.reset(new Factory);
230  }
231  return defaultFactory;
232 }
233 
234 }
235 }
236 }
rsc::logging::LoggerPtr logger
Definition: Factory.h:85
Instances of this class provide access to a socket-based bus.
Definition: Bus.h:75
STL namespace.
boost::shared_ptr< BusType > searchInMap(const Endpoint &endpoint, bool tcpnodelay, std::map< Endpoint, boost::weak_ptr< BusType > > &map)
Searches inside a given map for an active pointer to a Bus instance matching the given query...
Definition: Factory.cpp:66
boost::shared_ptr< Factory > FactoryPtr
Definition: Factory.h:113
boost::shared_ptr< BusConnection > BusConnectionPtr
A class that keeps a boost asio service alive as long as this class lives.
boost::shared_ptr< boost::asio::ip::tcp::socket > SocketPtr
Definition: Factory.h:78
boost::shared_ptr< BusServer > BusServerPtr
Definition: BusServer.h:67
FactoryPtr getDefaultFactory()
Definition: Factory.cpp:224
The singleton instance of this class is responsible for managing bus provider objects.
Definition: Factory.h:62
static void checkOptions(BusPtr bus, bool tcpnodelay)
Definition: Factory.cpp:217
AsioServiceContextPtr asioService
Definition: Factory.h:91
std::pair< std::string, boost::uint16_t > Endpoint
Definition: Factory.h:77
A facade around BusServer instances to allow breaking dependency cycles.
Instances of this class implement connections to a socket-based bus.
Definition: BusConnection.h:74
Instances of this class provide access to a socket-based bus for local and remote bus clients...
Definition: BusServerImpl.h:59
BusPtr getBus(const Server &serverMode, const std::string &host, const boost::uint16_t &port, bool tcpnodelay)
Returns either a BusClient or Server depending on the chosen serverMode and the existence of a server...
Definition: Factory.cpp:190
BusServerPtr getBusServerFor(const std::string &host, boost::uint16_t port, bool tcpnodelay)
Definition: Factory.cpp:160
boost::shared_ptr< Bus > BusPtr
Definition: BusConnection.h:51
BusPtr getBusClientFor(const std::string &host, boost::uint16_t port, bool tcpnodelay)
Definition: Factory.cpp:84