RSB  0.13.2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Factory.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is a part of the RSB project.
4  *
5  * Copyright (C) 2011 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
6  * Copyright (C) 2012, 2013, 2014 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
7  *
8  * This file may be licensed under the terms of the
9  * GNU Lesser General Public License Version 3 (the ``LGPL''),
10  * or (at your option) any later version.
11  *
12  * Software distributed under the License is distributed
13  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
14  * express or implied. See the LGPL for the specific language
15  * governing rights and limitations.
16  *
17  * You should have received a copy of the LGPL along with this
18  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
19  * or write to the Free Software Foundation, Inc.,
20  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21  *
22  * The development of this software was supported by:
23  * CoR-Lab, Research Institute for Cognition and Robotics
24  * Bielefeld University
25  *
26  * ============================================================ */
27 
28 #include "Factory.h"
29 
30 #include <boost/filesystem/fstream.hpp>
31 
32 #include <rsc/config/Configuration.h>
33 #include <rsc/config/Environment.h>
34 
35 #include <rsc/logging/OptionBasedConfigurator.h>
36 
37 #include <rsc/plugins/Manager.h>
38 #include <rsc/plugins/Configurator.h>
39 
40 #include <rsb/Version.h>
41 
43 
44 #include "converter/converters.h"
45 #include "converter/Repository.h"
47 
48 #include "transport/transports.h"
49 
50 using namespace std;
51 
52 using namespace rsc::config;
53 using namespace rsc::logging;
54 using namespace rsc::runtime;
55 
56 using namespace rsb::converter;
57 using namespace rsb::transport;
58 
59 namespace { // anonymous namespace for file-local helper functions.
60 
62  SERIALIZATION,
63  DESERIALIZATION
64 };
65 
66 template<typename C>
67 std::map<typename C::value_type::first_type,
68  typename C::value_type::second_type>
69 converterSelectionToMap(const C& container, ConverterDirection direction) {
70  typedef typename C::value_type::first_type first_type;
71  typedef typename C::value_type::second_type second_type;
72 
73  typedef typename C::const_iterator const_iterator;
74 
75  std::map<first_type, second_type> result;
76  for (const_iterator it = container.begin(); it != container.end(); ++it) {
77  switch (direction) {
78  case DESERIALIZATION:
79  result[it->first] = it->second;
80  break;
81  case SERIALIZATION:
82  if (result.find(it->second) != result.end()) {
83  throw std::invalid_argument(
84  boost::str(
85  boost::format(
86  "Multiple wire-schemas (%1%, %2%) selected for data-type %3%.\n"
87  "Probably you wrote the lines transport.<name>.cpp.%1% = %3% "
88  "and transport.<name>.cpp.%2% = %3% in your rsb config. One of "
89  "these should be deleted.")
90  % it->first
91  % result[it->second]
92  % it->second
93  )
94  );
95  }
96  result[it->second] = it->first;
97  break;
98  default:
99  assert(false);
100  throw std::runtime_error("got unexpected serialization direction");
101  }
102  }
103  return result;
104 }
105 
106 Properties
107 prepareConnectorOptions(const rsb::ParticipantConfig::Transport& config,
108  ConverterDirection direction,
109  rsc::logging::LoggerPtr logger) {
110  Properties options = config.getOptions();
111  RSCDEBUG(logger, "Supplied connector options " << options);
112 
113  // For local transport, we do not mess with the converter
114  // configuration since it is not used (or even touched) anyway.
115  //
116  // For remote transports, we build a converter selection strategy
117  // - suitable for direction - and put it into the "converters"
118  // property of the transport configuration.
119  //
120  // Note that config.getConverters() only returns /converter
121  // disambiguations/. These are used to guide the actual converter
122  // selection in converterSelectionToMap().
123  if (rsb::transport::isRemote(config.getName()) && !options.has("converters")) {
124  RSCDEBUG(logger, "Converter configuration for transport `"
125  << config.getName() << "': " << config.getConverters());
126 
127  ConverterSelectionStrategy<string>::Ptr converters; // TODO we should not have to know the transport's wire-type here
128  switch (direction) {
129  case SERIALIZATION:
130  converters
131  = converterRepository<string>() // TODO wire-type
132  ->getConvertersForSerialization
133  (converterSelectionToMap(config.getConverters(), direction));
134  break;
135  case DESERIALIZATION:
136  converters
137  = converterRepository<string>() // TODO wire-type
138  ->getConvertersForDeserialization
139  (converterSelectionToMap(config.getConverters(), direction));
140  break;
141  default:
142  assert(false);
143  throw std::runtime_error("got unexpected serialization direction");
144  }
145  RSCDEBUG(logger, "Selected converters for transport `"
146  << config.getName() << "': " << converters);
147  options["converters"] = converters;
148  }
149 
150  return options;
151 }
152 
153 }
154 
155 namespace rsb {
156 
158 
162  } else {
163  static Factory factory;
164  return factory;
165  }
166 }
167 
168 Factory::Factory() :
169  logger(Logger::getLogger("rsb.Factory")),
170  pluginManager(new rsc::plugins::Manager()),
171  signalParticipantCreated(new SignalParticipantCreated),
172  signalParticipantDestroyed(new SignalParticipantDestroyed) {
173 
174  // Configure RSC-based logging.
175  {
176  rsc::logging::OptionBasedConfigurator configurator;
177  configure(configurator, "rsb.conf", "RSC_", 0, 0, false, Version::installPrefix());
178  }
179 
180  // Register default implementation for all extension points.
181  RSCINFO(this->logger, "Registering default implementations");
185 
186  // Configure plugin path and load plugins to register additional
187  // implementations for extension points.
188  //
189  // We use the following default plugin path:
190  // 1. $HOME/.$RSB_PLUGIN_PATH_SUFFIX
191  // 2. $libdir/$RSB_PLUGIN_PATH_SUFFIX
192  RSCINFO(this->logger, "Processing plugin configuration");
193  {
195 
196  vector<boost::filesystem::path> defaultPath;
197  // It may be impossible to determine a home directory for the
198  // current user. Warn, but don't throw.
199  try {
200  defaultPath.push_back(userHomeDirectory()
201  / ("." + Version::buildPluginPathSuffix()));
202  } catch (const runtime_error& e) {
203  RSCWARN(this->logger,
204  "Failed to determine user-specific plugin directory: "
205  << e.what());
206  }
207  defaultPath.push_back(Version::libdir() / Version::buildPluginPathSuffix());
208  rsc::plugins::Configurator configurator(pluginManager, defaultPath);
209  provideConfigOptions(configurator);
210  configurator.execute(true);
211  }
212  factoryWhileLoadingPlugins = NULL; // TODO unwind-protect
213 
214  // Setup default participant config
215  //
216  // Collect all available connector implementations:
217  // + In-push
218  // + In-pull
219  // + Out
220  // Disable discovered connectors with the exception of the
221  // socket transport.
222  set<string> availableTransports = getAvailableTransports(DIRECTION_IN_PUSH | DIRECTION_IN_PULL | DIRECTION_OUT);
223 
225  for (set<string>::const_iterator it = availableTransports.begin();
226  it != availableTransports.end(); ++it) {
227  this->defaultConfig.addTransport(ParticipantConfig::Transport(*it, *it == "socket"));
228  }
229 
230  // If there is only one transport, we can blindly enable it since
231  // the user could end up without any enabled transport otherwise.
232  if (this->defaultConfig.getTransports().size() == 1) {
233  string name = this->defaultConfig.getTransports().begin()->getName();
234  this->defaultConfig.mutableTransport(name).setEnabled(true);
235  }
236 
237  // Merge with user configuration (configuration files, environment
238  // variables)
240 
241  // Issue a warning if the combination of available transport
242  // implementations and user configuration leads to no enabled
243  // transports.
244  if (this->defaultConfig.getTransports().empty()) {
245  RSCWARN(logger, "No transports are enabled. This is probably a configuration error or an internal RSB error.");
246  }
247 
248  RSCDEBUG(logger, "Default config " << defaultConfig);
249 }
250 
252 }
253 
254 void Factory::provideConfigOptions(OptionHandler& handler) {
255  configure(handler, "rsb.conf", "RSB_", 0, 0, true,
256  Version::installPrefix());
257 }
258 
260  return this->signalParticipantCreated;
261 }
262 
264  return this->signalParticipantDestroyed;
265 }
266 
268  const string& dataType,
269  const ParticipantConfig& config,
270  ParticipantPtr parent) {
271  InformerBasePtr informer(
272  new InformerBase(createOutConnectors(config), scope, config, dataType));
273  informer->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
274  (*this->signalParticipantCreated)(informer, parent);
275  return informer;
276 }
277 
278 
280  const ParticipantConfig& config,
281  ParticipantPtr parent) {
282  ListenerPtr listener(
283  new Listener(createInPushConnectors(config), scope, config));
284  listener->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
285  (*this->signalParticipantCreated)(listener, parent);
286  return listener;
287 }
288 
290  const ParticipantConfig& config,
291  ParticipantPtr parent) {
292  ReaderPtr reader(
293  new Reader(createInPullConnectors(config), scope, config));
294  reader->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
295  (*this->signalParticipantCreated)(reader, parent);
296  return reader;
297 }
298 
300 (const Scope& scope,
302  const ParticipantConfig& listenerConfig,
303  const ParticipantConfig& informerConfig,
304  ParticipantPtr parent) {
307  (scope, scope.getComponents()[scope.getComponents().size() -1],
308  listenerConfig, informerConfig, callback));
309  method->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
310  (*this->signalParticipantCreated)(method, parent);
311  return method;
312 }
313 
315  const ParticipantConfig &listenerConfig,
316  const ParticipantConfig &informerConfig,
317  ParticipantPtr parent) {
319  new patterns::LocalServer(scope, listenerConfig, informerConfig));
320  server->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
321  (*this->signalParticipantCreated)(server, parent);
322  return server;
323 }
324 
326 (const Scope& scope,
327  const ParticipantConfig& listenerConfig,
328  const ParticipantConfig& informerConfig,
329  ParticipantPtr parent) {
332  (scope, scope.getComponents()[scope.getComponents().size() -1],
333  listenerConfig, informerConfig));
334  method->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
335  (*this->signalParticipantCreated)(method, parent);
336  return method;
337 }
338 
340  const ParticipantConfig &listenerConfig,
341  const ParticipantConfig &informerConfig,
342  ParticipantPtr parent) {
344  new patterns::RemoteServer(scope, listenerConfig, informerConfig));
345  server->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
346  (*this->signalParticipantCreated)(server, parent);
347  return server;
348 }
349 
351  boost::recursive_mutex::scoped_lock lock(configMutex);
352  return defaultConfig;
353 }
354 
356  boost::recursive_mutex::scoped_lock lock(configMutex);
357  this->defaultConfig = config;
358 }
359 
360 vector<InPullConnectorPtr>
362  // Note: getTransports() only returns *enabled* transports.
363  vector<InPullConnectorPtr> connectors;
364  set<ParticipantConfig::Transport> configuredTransports = config.getTransports();
365  for (set<ParticipantConfig::Transport>::const_iterator transportIt =
366  configuredTransports.begin(); transportIt
367  != configuredTransports.end(); ++transportIt) {
368  RSCDEBUG(logger, "Trying to add connector " << *transportIt);
369  try {
370  connectors.push_back(InPullConnectorPtr(getInPullFactory()
371  .createInst(transportIt->getName(),
372  prepareConnectorOptions(*transportIt,
373  DESERIALIZATION,
374  this->logger))));
375  } catch (const exception& e) {
376  throw runtime_error(boost::str(boost::format("Error configuring connector `%1%', in-pull: %2%")
377  % transportIt->getName() % e.what()));
378  }
379  }
380  return connectors;
381 }
382 
383 vector<InPushConnectorPtr>
385  // Note: getTransports() only returns *enabled* transports.
386  vector<InPushConnectorPtr> connectors;
387  set<ParticipantConfig::Transport> configuredTransports = config.getTransports();
388  for (set<ParticipantConfig::Transport>::const_iterator transportIt =
389  configuredTransports.begin(); transportIt
390  != configuredTransports.end(); ++transportIt) {
391  RSCDEBUG(logger, "Trying to add connector " << *transportIt);
392  try {
393  connectors.push_back(InPushConnectorPtr(getInPushFactory()
394  .createInst(transportIt->getName(),
395  prepareConnectorOptions(*transportIt,
396  DESERIALIZATION,
397  this->logger))));
398  } catch (const exception& e) {
399  throw runtime_error(boost::str(boost::format("Error configuring connector `%1%', in-push: %2%")
400  % transportIt->getName() % e.what()));
401  }
402  }
403  return connectors;
404 }
405 
406 vector<OutConnectorPtr>
408  // Note: getTransports() only returns *enabled* transports.
409  vector<OutConnectorPtr> connectors;
410  set<ParticipantConfig::Transport> configuredTransports = config.getTransports();
411  for (set<ParticipantConfig::Transport>::const_iterator transportIt =
412  configuredTransports.begin(); transportIt
413  != configuredTransports.end(); ++transportIt) {
414  RSCDEBUG(logger, "Trying to add connector " << *transportIt);
415  try {
416  connectors.push_back(OutConnectorPtr(getOutFactory()
417  .createInst(transportIt->getName(),
418  prepareConnectorOptions(*transportIt,
419  SERIALIZATION,
420  this->logger))));
421  } catch (const exception& e) {
422  throw runtime_error(boost::str(boost::format("Error configuring connector `%1%', out: %2%")
423  % transportIt->getName() % e.what()));
424  }
425  }
426  return connectors;
427 }
428 
429 rsc::plugins::ManagerPtr Factory::getPluginManager() const {
430  return this->pluginManager;
431 }
432 
433 }
rsc::runtime::Properties getOptions() const
Returns the specified options for the transport.
void registerDefaultEventProcessingStrategies()
Definition: strategies.cpp:44
A derived Method class which can be used to invoke methods on a remote LocalServer object...
Definition: RemoteServer.h:90
bool isRemote(const string &transportName)
Returns true if transportName names a remote transport.
Definition: transports.cpp:229
std::vector< transport::OutConnectorPtr > createOutConnectors(const ParticipantConfig &config)
Definition: Factory.cpp:407
boost::shared_ptr< RemoteMethod > RemoteMethodPtr
Definition: RemoteServer.h:115
boost::shared_ptr< LocalServer > LocalServerPtr
Definition: LocalServer.h:489
boost::shared_ptr< LocalMethod > LocalMethodPtr
Definition: LocalServer.h:387
std::string getName() const
Returns the name of this transport description.
boost::signals2::signal< void(Participant *)> SignalParticipantDestroyed
Definition: Participant.h:45
ParticipantConfig getDefaultParticipantConfig() const
Returns the default configuration for new participants.
Definition: Factory.cpp:350
SignalParticipantDestroyedPtr getSignalParticipantDestroyed()
Definition: Factory.cpp:263
void addTransport(const Transport &transport)
Adds a transport to the list of desired transport mechanisms.
ConverterNames getConverters() const
boost::shared_ptr< Reader > ReaderPtr
Definition: Reader.h:106
An informer to publish data.
Definition: Informer.h:95
ListenerPtr createListener(const Scope &scope, const ParticipantConfig &config=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a new listener for the specified scope.
Definition: Factory.cpp:279
Factory & getFactory()
Returns a factory for client-level RSB objects.
Definition: Factory.cpp:159
ConverterDirection
Definition: Factory.cpp:61
virtual ~Factory()
Definition: Factory.cpp:251
std::vector< transport::InPushConnectorPtr > createInPushConnectors(const ParticipantConfig &config)
Definition: Factory.cpp:384
patterns::LocalServerPtr createLocalServer(const Scope &scope, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a Server object that exposes methods under the scope scope.
Definition: Factory.cpp:314
set< string > getAvailableTransports(unsigned int requiredDirections)
Returns the names of all available transports which support requiredDirections.
Definition: transports.cpp:142
SignalParticipantDestroyedPtr signalParticipantDestroyed
Definition: Factory.h:336
boost::shared_ptr< Listener > ListenerPtr
Definition: Listener.h:155
boost::shared_ptr< OutConnector > OutConnectorPtr
const std::vector< std::string > & getComponents() const
Returns all components of the scope as an ordered list.
Definition: Scope.cpp:139
boost::shared_ptr< Participant > ParticipantPtr
Definition: Participant.h:122
std::vector< transport::InPullConnectorPtr > createInPullConnectors(const ParticipantConfig &config)
Definition: Factory.cpp:361
A derived Method class which can be called from the remote side and implements its behavior by invoki...
Definition: LocalServer.h:366
boost::shared_ptr< ConverterSelectionStrategy< WireType > > Ptr
boost::shared_ptr< IntlCallback > CallbackPtr
Definition: LocalServer.h:71
Description of a desired transport.
The server side of a request-reply-based communication channel.
Definition: LocalServer.h:54
void registerDefaultTransports()
Definition: transports.cpp:53
boost::shared_ptr< SignalParticipantCreated > SignalParticipantCreatedPtr
Definition: Factory.h:60
Factory for RSB user-level domain objects for communication patterns.
Definition: Factory.h:77
A Listener receives events published by rsb::Informer objects by participating in a channel with a su...
Definition: Listener.h:81
A Reader receives events published by informers by participating in a channel with a suitable scope...
Definition: Reader.h:62
Transport & mutableTransport(const std::string &name)
Returns a single configured transport which can be modified in place.
InformerBasePtr createInformerBase(const Scope &scope, const std::string &dataType="", const ParticipantConfig &config=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates and returns a new Informer that publishes Events under the Scope scope.
Definition: Factory.cpp:267
boost::shared_ptr< InPushConnector > InPushConnectorPtr
Definition: Listener.h:59
patterns::RemoteServer::RemoteMethodPtr createRemoteMethod(const Scope &scope, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a patterns::RemoteServer::RemoteMethod.
Definition: Factory.cpp:326
static void provideConfigOptions(rsc::config::OptionHandler &handler)
Provides the default configuration options for RSB to the specified handler.
Definition: Factory.cpp:254
ParticipantConfig defaultConfig
Always acquire configMutex before reading or writing the config.
Definition: Factory.h:332
A class describing the configuration of Participant instances.
InPullFactory & getInPullFactory()
Definition: Factory.cpp:32
SignalParticipantCreatedPtr getSignalParticipantCreated()
Definition: Factory.cpp:259
ReaderPtr createReader(const Scope &scope, const ParticipantConfig &config=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a new Reader object for the specified scope.
Definition: Factory.cpp:289
void setDefaultParticipantConfig(const ParticipantConfig &config)
Sets the default config for participants that will be used for every new participant that is created ...
Definition: Factory.cpp:355
boost::shared_ptr< InformerBase > InformerBasePtr
Definition: Informer.h:217
SignalParticipantCreatedPtr signalParticipantCreated
Definition: Factory.h:335
InPushFactory & getInPushFactory()
Definition: Factory.cpp:37
boost::shared_ptr< RemoteServer > RemoteServerPtr
Definition: RemoteServer.h:311
OutFactory & getOutFactory()
Definition: Factory.cpp:42
void registerDefaultConverters()
Definition: converters.cpp:50
rsc::plugins::ManagerPtr pluginManager
Definition: Factory.h:327
boost::recursive_mutex configMutex
Definition: Factory.h:333
patterns::LocalServer::LocalMethodPtr createLocalMethod(const Scope &scope, patterns::LocalServer::CallbackPtr callback, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a patterns::LocalServer::LocalMethod.
Definition: Factory.cpp:300
boost::shared_ptr< SignalParticipantDestroyed > SignalParticipantDestroyedPtr
Definition: Participant.h:48
boost::shared_ptr< InPullConnector > InPullConnectorPtr
Scope is a descriptor for a hierarchical channel of the unified bus.
Definition: Scope.h:46
std::set< Transport > getTransports(bool includeDisabled=false) const
Returns the set of desired transports for a participant.
rsc::plugins::ManagerPtr getPluginManager() const
Returns the plugin manager instance used by the RSB core.
Definition: Factory.cpp:429
patterns::RemoteServerPtr createRemoteServer(const Scope &scope, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a RemoteServer object for the server at scope scope.
Definition: Factory.cpp:339
Factory * factoryWhileLoadingPlugins
Definition: Factory.cpp:157
boost::signals2::signal< void(ParticipantPtr, ParticipantPtr)> SignalParticipantCreated
Definition: Factory.h:59
The client side of a request-reply-based communication channel.
Definition: RemoteServer.h:60