RSB  0.9.6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Factory.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is part of the RSB project
4  *
5  * Copyright (C) 2011, 2012 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "Factory.h"
28 
29 #include <boost/lexical_cast.hpp>
30 
31 #include <boost/asio/ip/address.hpp>
32 
33 #include <boost/format.hpp>
34 
35 using namespace std;
36 
37 using namespace boost;
38 
39 using namespace boost::asio;
40 using boost::asio::ip::tcp;
41 
42 using namespace rsc::logging;
43 
44 namespace rsb {
45 namespace transport {
46 namespace socket {
47 
48 // Create and start an io_service. This service will be shared between
49 // all bus providers created by this factory.
50 Factory::Factory() :
51  logger(Logger::getLogger("rsb.transport.socket.Factory")),
52  keepAlive(new io_service::work(service)),
53  thread(boost::bind(&boost::asio::io_service::run, &service)) {
54  RSCINFO(logger, "Started service thread");
55 }
56 
58  if (!this->busClients.empty()) {
59  RSCWARN(logger, "Remaining bus clients: " << this->busClients);
60  }
61  if (!this->busServers.empty()) {
62  RSCWARN(logger, "Remaining bus servers: " << this->busServers);
63  }
64 
65  RSCINFO(logger, "Stopping service thread");
66  this->keepAlive.reset();
67  this->thread.join();
68  RSCINFO(logger, "Stopped service thread");
69 }
70 
71 BusPtr Factory::getBusClientFor(const string& host,
72  uint16_t port,
73  bool tcpnodelay,
74  ConnectorBase* connector) {
75  RSCDEBUG(logger, "Was asked for a bus client for " << host << ":" << port);
76 
77  // Try to find an entry for the exact specified endpoint. If this
78  // yields a hit, there is no need to resolve the specified name.
79  Endpoint endpoint(host, port);
80 
81  {
82  BusClientMap::const_iterator it;
83  if ((it = this->busClients.find(endpoint)) != this->busClients.end()) {
84  BusPtr result = it->second;
85  checkOptions(result, tcpnodelay);
86  result->addConnector(connector);
87  RSCDEBUG(logger, "Found existing bus client "
88  << result << " without resolving");
89  return result;
90  }
91 
92  RSCDEBUG(logger, "Did not find bus client without resolving");
93  }
94 
95  // We did not find an entry for the exact specified entry. We try
96  // to resolve it to a working endpoint and use that one in the
97  // lookup.
98  // TODO(jmoringe): avoid this useless socket connection just for
99  // the lookup
100  RSCDEBUG(logger, "Resolving endpoint")
101  tcp::resolver resolver(this->service);
102  tcp::resolver::query query(host, lexical_cast<string>(port),
103  tcp::resolver::query::numeric_service);
104  for (tcp::resolver::iterator endpointIterator = resolver.resolve(query);
105  endpointIterator != tcp::resolver::iterator();
106  ++endpointIterator) {
107  endpoint = Endpoint(endpointIterator->host_name(), port);
108  // When we have a working endpoint, repeat the lookup. Create
109  // a new bus client, if there still is no entry.
110  BusClientMap::const_iterator it;
111  if ((it = this->busClients.find(endpoint)) != this->busClients.end()) {
112  BusPtr result = it->second;
113  checkOptions(result, tcpnodelay);
114  result->addConnector(connector);
115  RSCDEBUG(logger, "Found existing bus client "
116  << it->second << " after resolving");
117  return result;
118  }
119  }
120 
121  // Try to open a socket for the resolved endpoint.
122  SocketPtr socket;
123  for (tcp::resolver::iterator endpointIterator = resolver.resolve(query);
124  endpointIterator != tcp::resolver::iterator();
125  ++endpointIterator) {
126  endpoint = Endpoint(endpointIterator->host_name(), port);
127  RSCDEBUG(logger, "Trying endpoint " << endpointIterator->endpoint());
128  socket.reset(new tcp::socket(this->service));
129  boost::system::error_code error;
130  socket->connect(endpointIterator->endpoint(), error);
131  if (!error) {
132  RSCDEBUG(logger, "Success");
133  break;
134  }
135  RSCDEBUG(logger, "Failed: " << error.message());
136  socket.reset();
137  }
138  if (!socket) {
139  throw runtime_error(str(format("Could not connect to any of the endpoints to which %1%:%2% resolved.")
140  % host % port));
141  }
142 
143  // Name resolution did not yield any endpoints, or none of the
144  // worked. Create a new bus client.
145  RSCDEBUG(logger, "Did not find bus client after resolving; creating a new one");
146 
147  BusPtr result(new Bus(this->service, tcpnodelay));
148  this->busClients[endpoint] = result;
149 
150  BusConnectionPtr connection(new BusConnection(result, socket, true, tcpnodelay));
151  result->addConnection(connection);
152  connection->startReceiving();
153 
154  result->addConnector(connector);
155 
156  RSCDEBUG(logger, "Created new bus client " << result);
157 
158  return result;
159 }
160 
162  RSCDEBUG(logger, "Removing client bus " << bus);
163 
164  boost::mutex::scoped_lock lock(this->busMutex);
165 
166  for (BusClientMap::iterator it = this->busClients.begin();
167  it != this->busClients.end(); ++it) {
168  if (it->second == bus) {
169  this->busClients.erase(it);
170  RSCDEBUG(logger, "Removed");
171  return;
172  }
173  }
174 }
175 
177  uint16_t port,
178  bool tcpnodelay,
179  ConnectorBase* connector) {
180  RSCDEBUG(logger, "Was asked for a bus server for " << host << ":" << port);
181 
182  // Try to find an existing entry for the specified endpoint.
183  Endpoint endpoint(host, port);
184 
185  BusServerMap::const_iterator it;
186  if ((it = this->busServers.find(endpoint)) != this->busServers.end()) {
187  RSCDEBUG(logger, "Found existing bus server " << it->second);
188  checkOptions(it->second, tcpnodelay);
189  it->second->addConnector(connector);
190  return it->second;
191  }
192 
193  // If there is no entry, create a new bus server and put it into
194  // the map.
195  RSCDEBUG(logger, "Did not find bus server; creating a new one");
196 
197  BusServerPtr result(new BusServer(port, tcpnodelay, this->service));
198  result->activate();
199  this->busServers[endpoint] = result;
200 
201  result->addConnector(connector);
202 
203  RSCDEBUG(logger, "Created new bus client " << result);
204 
205  return result;
206 }
207 
209  RSCDEBUG(logger, "Removing server bus " << bus);
210 
211  boost::mutex::scoped_lock lock(this->busMutex);
212 
213  for (BusServerMap::iterator it = this->busServers.begin();
214  it != this->busServers.end(); ++it) {
215  if (it->second == bus) {
216  boost::dynamic_pointer_cast<BusServer>(bus)->deactivate();
217  this->busServers.erase(it);
218  RSCDEBUG(logger, "Removed");
219  return;
220  }
221  }
222 }
223 
224 BusPtr Factory::getBus(const Server& serverMode,
225  const std::string& host,
226  const boost::uint16_t& port,
227  bool tcpnodelay,
228  ConnectorBase* connector) {
229 
230  boost::mutex::scoped_lock lock(this->busMutex);
231 
232  switch (serverMode) {
233  case SERVER_NO:
234  return getBusClientFor(host, port, tcpnodelay, connector);
235  case SERVER_YES:
236  return getBusServerFor(host, port, tcpnodelay, connector);
237  case SERVER_AUTO:
238  try {
239  return getBusServerFor(host, port, tcpnodelay, connector);
240  } catch (const std::exception& e) {
241  RSCINFO(logger,
242  "Could not create server for bus: " << e.what() << "; trying to access bus as client");
243  return getBusClientFor(host, port, tcpnodelay, connector);
244  }
245  default:
246  assert(false);
247  throw invalid_argument("Impossible Server enum value received");
248  }
249 
250 }
251 
252 void Factory::checkOptions(BusPtr bus, bool tcpnodelay) {
253  if (bus->isTcpnodelay() != tcpnodelay) {
254  throw invalid_argument(str(format("Requested tcpnodelay option %1% does not match existing option %2%")
255  % tcpnodelay % bus->isTcpnodelay()));
256  }
257 }
258 
259 }
260 }
261 }
rsc::logging::LoggerPtr logger
Definition: Factory.h:85
BusServerPtr getBusServerFor(const std::string &host, boost::uint16_t port, bool tcpnodelay, ConnectorBase *connector)
Definition: Factory.cpp:176
void removeBusClient(BusPtr bus)
Definition: Factory.cpp:161
boost::shared_ptr< BusConnection > BusConnectionPtr
boost::asio::io_service service
Definition: Factory.h:91
boost::shared_ptr< boost::asio::ip::tcp::socket > SocketPtr
Definition: Factory.h:78
boost::shared_ptr< BusServer > BusServerPtr
Definition: BusServer.h:105
Instances of this class provide access to a socket-based bus for local and remote bus clients...
Definition: BusServer.h:59
BusPtr getBusClientFor(const std::string &host, boost::uint16_t port, bool tcpnodelay, ConnectorBase *connector)
Definition: Factory.cpp:71
void removeBusServer(BusPtr bus)
Definition: Factory.cpp:208
This class is intended to be used as a base class for connector classes of the socket-based transport...
Definition: ConnectorBase.h:58
static void checkOptions(BusPtr bus, bool tcpnodelay)
Definition: Factory.cpp:252
std::pair< std::string, boost::uint16_t > Endpoint
Definition: Factory.h:77
Instances of this class implement connections to a socket-based bus.
Definition: BusConnection.h:74
BusPtr getBus(const Server &serverMode, const std::string &host, const boost::uint16_t &port, bool tcpnodelay, ConnectorBase *connector)
Returns either a BusClient or Server depending on the chosen serverMode and the existence of a server...
Definition: Factory.cpp:224
boost::shared_ptr< Bus > BusPtr
Definition: BusConnection.h:51