RSB  0.14.2
Factory.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is a part of the RSB project.
4  *
5  * Copyright (C) 2011 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
6  * Copyright (C) 2012, 2013, 2014, 2016 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
7  *
8  * This file may be licensed under the terms of the
9  * GNU Lesser General Public License Version 3 (the ``LGPL''),
10  * or (at your option) any later version.
11  *
12  * Software distributed under the License is distributed
13  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
14  * express or implied. See the LGPL for the specific language
15  * governing rights and limitations.
16  *
17  * You should have received a copy of the LGPL along with this
18  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
19  * or write to the Free Software Foundation, Inc.,
20  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21  *
22  * The development of this software was supported by:
23  * CoR-Lab, Research Institute for Cognition and Robotics
24  * Bielefeld University
25  *
26  * ============================================================ */
27 
28 #include "Factory.h"
29 
30 #include <boost/filesystem/fstream.hpp>
31 
32 #include <rsc/config/Configuration.h>
33 #include <rsc/config/Environment.h>
34 
35 #include <rsc/logging/OptionBasedConfigurator.h>
36 
37 #include <rsc/plugins/Manager.h>
38 #include <rsc/plugins/Configurator.h>
39 
40 #include <rsb/Version.h>
41 
43 
44 #include "converter/converters.h"
45 #include "converter/Repository.h"
47 
48 #include "transport/transports.h"
49 
50 using namespace std;
51 
52 using namespace rsc::config;
53 using namespace rsc::logging;
54 using namespace rsc::runtime;
55 
56 using namespace rsb::converter;
57 using namespace rsb::transport;
58 
59 namespace { // anonymous namespace for file-local helper functions.
60 
62  SERIALIZATION,
63  DESERIALIZATION
64 };
65 
66 template<typename C>
67 std::map<typename C::value_type::first_type,
68  typename C::value_type::second_type>
69 converterSelectionToMap(const C& container, ConverterDirection direction) {
70  typedef typename C::value_type::first_type first_type;
71  typedef typename C::value_type::second_type second_type;
72 
73  typedef typename C::const_iterator const_iterator;
74 
75  std::map<first_type, second_type> result;
76  for (const_iterator it = container.begin(); it != container.end(); ++it) {
77  switch (direction) {
78  case DESERIALIZATION:
79  result[it->first] = it->second;
80  break;
81  case SERIALIZATION:
82  if (result.find(it->second) != result.end()) {
83  throw std::invalid_argument(
84  boost::str(
85  boost::format(
86  "Multiple wire-schemas (%1%, %2%) selected for data-type %3%.\n"
87  "Probably you wrote the lines transport.<name>.cpp.%1% = %3% "
88  "and transport.<name>.cpp.%2% = %3% in your rsb config. One of "
89  "these should be deleted.")
90  % it->first
91  % result[it->second]
92  % it->second
93  )
94  );
95  }
96  result[it->second] = it->first;
97  break;
98  default:
99  assert(false);
100  throw std::runtime_error("got unexpected serialization direction");
101  }
102  }
103  return result;
104 }
105 
106 Properties
107 prepareConnectorOptions(const rsb::ParticipantConfig::Transport& config,
108  ConverterDirection direction,
109  rsc::logging::LoggerPtr logger) {
110  Properties options = config.getOptions();
111  RSCDEBUG(logger, "Supplied connector options " << options);
112 
113  // For local transport, we do not mess with the converter
114  // configuration since it is not used (or even touched) anyway.
115  //
116  // For remote transports, we build a converter selection strategy
117  // - suitable for direction - and put it into the "converters"
118  // property of the transport configuration.
119  //
120  // Note that config.getConverters() only returns /converter
121  // disambiguations/. These are used to guide the actual converter
122  // selection in converterSelectionToMap().
123  if (rsb::transport::isRemote(config.getName()) && !options.has("converters")) {
124  RSCDEBUG(logger, "Converter configuration for transport `"
125  << config.getName() << "': " << config.getConverters());
126 
127  ConverterSelectionStrategy<string>::Ptr converters; // TODO we should not have to know the transport's wire-type here
128  switch (direction) {
129  case SERIALIZATION:
130  converters
131  = converterRepository<string>() // TODO wire-type
132  ->getConvertersForSerialization
133  (converterSelectionToMap(config.getConverters(), direction));
134  break;
135  case DESERIALIZATION:
136  converters
137  = converterRepository<string>() // TODO wire-type
138  ->getConvertersForDeserialization
139  (converterSelectionToMap(config.getConverters(), direction));
140  break;
141  default:
142  assert(false);
143  throw std::runtime_error("got unexpected serialization direction");
144  }
145  RSCDEBUG(logger, "Selected converters for transport `"
146  << config.getName() << "': " << converters);
147  options["converters"] = converters;
148  }
149 
150  return options;
151 }
152 
153 }
154 
155 namespace rsb {
156 
158 
160  if (factoryWhileLoadingPlugins) {
162  } else {
163  static Factory factory;
164  return factory;
165  }
166 }
167 
169  ConfigDebugPrinter(const std::string& phase, bool enabled)
170  : phase(phase), enabled(enabled) {
171  if (this->enabled){
172  std::cerr << "Starting processing " << this->phase << " configuration"
173  << std::endl << std::endl;
174  }
175  }
176 
178  if (this->enabled) {
179  std::cerr << std::endl
180  << "Finished processing " << this->phase << " configuration"
181  << std::endl << std::endl;
182  }
183  }
184 
185  std::string phase;
186  bool enabled;
187 };
188 
/**
 * Constructs the factory: configures RSC-based logging, processes the
 * plugin configuration and builds the default participant
 * configuration from the available transports.
 *
 * NOTE(review): this listing appears to have lost some statements
 * (the embedded numbering skips 207-209, 253, 256 and 274), so not
 * every step announced by the comments and log messages below is
 * visible here -- compare against the original file before editing.
 */
189 Factory::Factory() :
190  logger(Logger::getLogger("rsb.Factory")),
191  pluginManager(new rsc::plugins::Manager()),
192  signalParticipantCreated(new SignalParticipantCreated),
193  signalParticipantDestroyed(new SignalParticipantDestroyed) {
194 
 // Enables the ConfigDebugPrinter output and the std::cerr dump of
 // the default configuration further down.
 // NOTE(review): relies on the result of .get() converting to bool;
 // presumably "variable is set" -- confirm against
 // getEnvironmentVariable().
195  bool debugConfig = getEnvironmentVariable("__CONFIG_DEBUG").get();
196 
197  // Configure RSC-based logging.
198  {
199  ConfigDebugPrinter printer("RSC-based logging", debugConfig);
200 
201  rsc::logging::OptionBasedConfigurator configurator;
202  configure(configurator, "rsb.conf", "RSC_", 0, 0, false, Version::installPrefix());
203  }
204 
205  // Register default implementation for all extension points.
206  RSCINFO(this->logger, "Registering default implementations");
210 
211  // Configure plugin path and load plugins to register additional
212  // implementations for extension points.
213  //
214  // We use the following default plugin path:
215  // 1. $HOME/.$RSB_PLUGIN_PATH_SUFFIX
216  // 2. $libdir/$RSB_PLUGIN_PATH_SUFFIX
217  RSCINFO(this->logger, "Processing plugin configuration");
218  try {
219  ConfigDebugPrinter printer("plugin", debugConfig);
220 
 // Publish this instance while plugins are processed; it is
 // reset to NULL on both the exceptional and the normal path
 // below. Presumably this lets getFactory() hand out the
 // factory under construction -- confirm.
221  factoryWhileLoadingPlugins = this;
222 
223  vector<boost::filesystem::path> defaultPath;
224  // It may be impossible to determine a home directory for the
225  // current user. Warn, but don't throw.
226  try {
227  defaultPath.push_back(userHomeDirectory()
228  / ("." + Version::buildPluginPathSuffix()));
229  } catch (const runtime_error& e) {
230  RSCWARN(this->logger,
231  "Failed to determine user-specific plugin directory: "
232  << e.what());
233  }
234  defaultPath.push_back(Version::libdir() / Version::buildPluginPathSuffix());
235  rsc::plugins::Configurator configurator(pluginManager, defaultPath);
236  provideConfigOptions(configurator);
237  configurator.execute(true);
238  } catch (...) {
239  factoryWhileLoadingPlugins = NULL;
240  throw;
241  }
242  factoryWhileLoadingPlugins = NULL;
243 
244  // Setup default participant config
245  //
246  // Collect all available connector implementations:
247  // + In-push
248  // + In-pull
249  // + Out
250  // Disable discovered connectors with the exception of the
251  // socket transport.
252  set<string> availableTransports = getAvailableTransports(DIRECTION_IN_PUSH
254  | DIRECTION_OUT);
255 
 // Each transport is added with an "enabled" flag that is true only
 // for the one named "socket".
257  for (set<string>::const_iterator it = availableTransports.begin();
258  it != availableTransports.end(); ++it) {
259  this->defaultConfig.addTransport(ParticipantConfig::Transport(*it, *it == "socket"));
260  }
261 
262  // If there is only one transport, we can blindly enable it since
263  // the user could end up without any enabled transport otherwise.
 // NOTE(review): getTransports() is called without arguments here;
 // elsewhere in this file it is noted to return only *enabled*
 // transports, so this test may not see a single disabled
 // transport -- confirm the intended behavior.
264  if (this->defaultConfig.getTransports().size() == 1) {
265  string name = this->defaultConfig.getTransports().begin()->getName();
266  this->defaultConfig.mutableTransport(name).setEnabled(true);
267  }
268 
269  // Merge with user configuration (configuration files, environment
270  // variables)
271  {
272  ConfigDebugPrinter printer("default participant", debugConfig);
273 
275  }
276  if (debugConfig) {
277  std::cerr << "Default participant configuration" << std::endl
278  << defaultConfig << std::endl << std::endl;
279  }
280 
281  // Issue a warning if the combination of available transport
282  // implementations and user configuration leads to no enabled
283  // transports.
284  if (this->defaultConfig.getTransports().empty()) {
285  RSCWARN(logger, "No transports are enabled. This is probably a"
286  " configuration error or an internal RSB error.");
287  }
288 
289  RSCDEBUG(logger, "Default config " << defaultConfig);
290 }
291 
293 }
294 
/**
 * Provides the default configuration options for RSB to @a handler.
 *
 * Forwards to configure() using the configuration file name
 * "rsb.conf", the prefix "RSB_" and Version::installPrefix().
 *
 * NOTE(review): the meaning of the literal arguments 0, 0 and true is
 * defined by configure(), which is not visible here -- confirm
 * before changing them.
 *
 * @param handler receiver of the configuration options.
 */
295 void Factory::provideConfigOptions(OptionHandler& handler) {
296  configure(handler, "rsb.conf", "RSB_", 0, 0, true,
297  Version::installPrefix());
298 }
299 
301  return this->signalParticipantCreated;
302 }
303 
305  return this->signalParticipantDestroyed;
306 }
307 
309  const string& dataType,
310  const ParticipantConfig& config,
311  ParticipantPtr parent) {
312  InformerBasePtr informer(
313  new InformerBase(createOutConnectors(config), scope, config, dataType));
314  informer->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
315  (*this->signalParticipantCreated)(informer, parent);
316  return informer;
317 }
318 
319 
321  const ParticipantConfig& config,
322  ParticipantPtr parent) {
323  ListenerPtr listener(
324  new Listener(createInPushConnectors(config), scope, config));
325  listener->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
326  (*this->signalParticipantCreated)(listener, parent);
327  return listener;
328 }
329 
331  const ParticipantConfig& config,
332  ParticipantPtr parent) {
333  ReaderPtr reader(
334  new Reader(createInPullConnectors(config), scope, config));
335  reader->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
336  (*this->signalParticipantCreated)(reader, parent);
337  return reader;
338 }
339 
341 (const Scope& scope,
343  const ParticipantConfig& listenerConfig,
344  const ParticipantConfig& informerConfig,
345  ParticipantPtr parent) {
348  (scope, scope.getComponents()[scope.getComponents().size() -1],
349  listenerConfig, informerConfig, callback));
350  method->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
351  (*this->signalParticipantCreated)(method, parent);
352  return method;
353 }
354 
356  const ParticipantConfig &listenerConfig,
357  const ParticipantConfig &informerConfig,
358  ParticipantPtr parent) {
360  new patterns::LocalServer(scope, listenerConfig, informerConfig));
361  server->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
362  (*this->signalParticipantCreated)(server, parent);
363  return server;
364 }
365 
367 (const Scope& scope,
368  const ParticipantConfig& listenerConfig,
369  const ParticipantConfig& informerConfig,
370  ParticipantPtr parent) {
373  (scope, scope.getComponents()[scope.getComponents().size() -1],
374  listenerConfig, informerConfig));
375  method->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
376  (*this->signalParticipantCreated)(method, parent);
377  return method;
378 }
379 
381  const ParticipantConfig &listenerConfig,
382  const ParticipantConfig &informerConfig,
383  ParticipantPtr parent) {
385  new patterns::RemoteServer(scope, listenerConfig, informerConfig));
386  server->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
387  (*this->signalParticipantCreated)(server, parent);
388  return server;
389 }
390 
392  boost::recursive_mutex::scoped_lock lock(configMutex);
393  return defaultConfig;
394 }
395 
397  boost::recursive_mutex::scoped_lock lock(configMutex);
398  this->defaultConfig = config;
399 }
400 
401 vector<InPullConnectorPtr>
403  // Note: getTransports() only returns *enabled* transports.
404  vector<InPullConnectorPtr> connectors;
405  set<ParticipantConfig::Transport> configuredTransports = config.getTransports();
406  for (set<ParticipantConfig::Transport>::const_iterator transportIt =
407  configuredTransports.begin(); transportIt
408  != configuredTransports.end(); ++transportIt) {
409  RSCDEBUG(logger, "Trying to add connector " << *transportIt);
410  try {
411  connectors.push_back(InPullConnectorPtr(getInPullFactory()
412  .createInst(transportIt->getName(),
413  prepareConnectorOptions(*transportIt,
414  DESERIALIZATION,
415  this->logger))));
416  } catch (const exception& e) {
417  throw runtime_error(boost::str(boost::format("Error configuring connector `%1%', in-pull: %2%")
418  % transportIt->getName() % e.what()));
419  }
420  }
421  return connectors;
422 }
423 
424 vector<InPushConnectorPtr>
426  // Note: getTransports() only returns *enabled* transports.
427  vector<InPushConnectorPtr> connectors;
428  set<ParticipantConfig::Transport> configuredTransports = config.getTransports();
429  for (set<ParticipantConfig::Transport>::const_iterator transportIt =
430  configuredTransports.begin(); transportIt
431  != configuredTransports.end(); ++transportIt) {
432  RSCDEBUG(logger, "Trying to add connector " << *transportIt);
433  try {
434  connectors.push_back(InPushConnectorPtr(getInPushFactory()
435  .createInst(transportIt->getName(),
436  prepareConnectorOptions(*transportIt,
437  DESERIALIZATION,
438  this->logger))));
439  } catch (const exception& e) {
440  throw runtime_error(boost::str(boost::format("Error configuring connector `%1%', in-push: %2%")
441  % transportIt->getName() % e.what()));
442  }
443  }
444  return connectors;
445 }
446 
447 vector<OutConnectorPtr>
449  // Note: getTransports() only returns *enabled* transports.
450  vector<OutConnectorPtr> connectors;
451  set<ParticipantConfig::Transport> configuredTransports = config.getTransports();
452  for (set<ParticipantConfig::Transport>::const_iterator transportIt =
453  configuredTransports.begin(); transportIt
454  != configuredTransports.end(); ++transportIt) {
455  RSCDEBUG(logger, "Trying to add connector " << *transportIt);
456  try {
457  connectors.push_back(OutConnectorPtr(getOutFactory()
458  .createInst(transportIt->getName(),
459  prepareConnectorOptions(*transportIt,
460  SERIALIZATION,
461  this->logger))));
462  } catch (const exception& e) {
463  throw runtime_error(boost::str(boost::format("Error configuring connector `%1%', out: %2%")
464  % transportIt->getName() % e.what()));
465  }
466  }
467  return connectors;
468 }
469 
470 rsc::plugins::ManagerPtr Factory::getPluginManager() const {
471  return this->pluginManager;
472 }
473 
474 }
rsc::runtime::Properties getOptions() const
Returns the specified options for the transport.
void registerDefaultEventProcessingStrategies()
Definition: strategies.cpp:44
A derived Method class which can be used to invoke methods on a remote LocalServer object...
Definition: RemoteServer.h:90
bool isRemote(const string &transportName)
Returns true if transportName names a remote transport.
Definition: transports.cpp:229
std::vector< transport::OutConnectorPtr > createOutConnectors(const ParticipantConfig &config)
Definition: Factory.cpp:448
boost::shared_ptr< RemoteMethod > RemoteMethodPtr
Definition: RemoteServer.h:115
boost::shared_ptr< LocalServer > LocalServerPtr
Definition: LocalServer.h:489
boost::shared_ptr< LocalMethod > LocalMethodPtr
Definition: LocalServer.h:387
std::string getName() const
Returns the name of this transport description.
STL namespace.
boost::signals2::signal< void(Participant *)> SignalParticipantDestroyed
Definition: Participant.h:45
ParticipantConfig getDefaultParticipantConfig() const
Returns the default configuration for new participants.
Definition: Factory.cpp:391
ConfigDebugPrinter(const std::string &phase, bool enabled)
Definition: Factory.cpp:169
SignalParticipantDestroyedPtr getSignalParticipantDestroyed()
Definition: Factory.cpp:304
void addTransport(const Transport &transport)
Adds a transport to the list of desired transport mechanisms.
ConverterNames getConverters() const
boost::shared_ptr< Reader > ReaderPtr
Definition: Reader.h:106
An informer to publish data.
Definition: Informer.h:95
ListenerPtr createListener(const Scope &scope, const ParticipantConfig &config=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a new listener for the specified scope.
Definition: Factory.cpp:320
Factory & getFactory()
Returns a factory for client-level RSB objects.
Definition: Factory.cpp:159
ConverterDirection
Definition: Factory.cpp:61
virtual ~Factory()
Definition: Factory.cpp:292
std::vector< transport::InPushConnectorPtr > createInPushConnectors(const ParticipantConfig &config)
Definition: Factory.cpp:425
patterns::LocalServerPtr createLocalServer(const Scope &scope, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a Server object that exposes methods under the scope scope.
Definition: Factory.cpp:355
set< string > getAvailableTransports(unsigned int requiredDirections)
Returns the names of all available transports which support requiredDirections.
Definition: transports.cpp:142
SignalParticipantDestroyedPtr signalParticipantDestroyed
Definition: Factory.h:336
boost::shared_ptr< Listener > ListenerPtr
Definition: Listener.h:155
boost::shared_ptr< OutConnector > OutConnectorPtr
const std::vector< std::string > & getComponents() const
Returns all components of the scope as an ordered list.
Definition: Scope.cpp:139
boost::shared_ptr< Participant > ParticipantPtr
Definition: Participant.h:122
std::vector< transport::InPullConnectorPtr > createInPullConnectors(const ParticipantConfig &config)
Definition: Factory.cpp:402
A derived Method class which can be called from the remote side and implements its behavior by invoki...
Definition: LocalServer.h:366
boost::shared_ptr< ConverterSelectionStrategy< WireType > > Ptr
boost::shared_ptr< IntlCallback > CallbackPtr
Definition: LocalServer.h:71
Description of a desired transport.
The server side of a request-reply-based communication channel.
Definition: LocalServer.h:54
void registerDefaultTransports()
Definition: transports.cpp:53
boost::shared_ptr< SignalParticipantCreated > SignalParticipantCreatedPtr
Definition: Factory.h:60
Factory for RSB user-level domain objects for communication patterns.
Definition: Factory.h:77
A Listener receives events published by rsb::Informer objects by participating in a channel with a su...
Definition: Listener.h:81
A Reader receives events published by informers by participating in a channel with a suitable scope...
Definition: Reader.h:62
Transport & mutableTransport(const std::string &name)
Returns a single configured transport which can be modified in place.
InformerBasePtr createInformerBase(const Scope &scope, const std::string &dataType="", const ParticipantConfig &config=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates and returns a new Informer that publishes Events under the Scope scope.
Definition: Factory.cpp:308
boost::shared_ptr< InPushConnector > InPushConnectorPtr
Definition: Listener.h:59
patterns::RemoteServer::RemoteMethodPtr createRemoteMethod(const Scope &scope, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a patterns::RemoteServer::RemoteMethod.
Definition: Factory.cpp:367
static void provideConfigOptions(rsc::config::OptionHandler &handler)
Provides the default configuration options for RSB to the specified handler.
Definition: Factory.cpp:295
ParticipantConfig defaultConfig
Always acquire configMutex before reading or writing the config.
Definition: Factory.h:332
A class describing the configuration of Participant instances.
InPullFactory & getInPullFactory()
Definition: Factory.cpp:32
SignalParticipantCreatedPtr getSignalParticipantCreated()
Definition: Factory.cpp:300
ReaderPtr createReader(const Scope &scope, const ParticipantConfig &config=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a new Reader object for the specified scope.
Definition: Factory.cpp:330
void setDefaultParticipantConfig(const ParticipantConfig &config)
Sets the default config for participants that will be used for every new participant that is created ...
Definition: Factory.cpp:396
boost::shared_ptr< InformerBase > InformerBasePtr
Definition: Informer.h:217
SignalParticipantCreatedPtr signalParticipantCreated
Definition: Factory.h:335
InPushFactory & getInPushFactory()
Definition: Factory.cpp:37
boost::shared_ptr< RemoteServer > RemoteServerPtr
Definition: RemoteServer.h:311
rsc::logging::LoggerPtr logger
Definition: Factory.h:325
OutFactory & getOutFactory()
Definition: Factory.cpp:42
void registerDefaultConverters()
Definition: converters.cpp:50
rsc::plugins::ManagerPtr pluginManager
Definition: Factory.h:327
boost::recursive_mutex configMutex
Definition: Factory.h:333
patterns::LocalServer::LocalMethodPtr createLocalMethod(const Scope &scope, patterns::LocalServer::CallbackPtr callback, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a patterns::LocalServer::LocalMethod.
Definition: Factory.cpp:341
boost::shared_ptr< SignalParticipantDestroyed > SignalParticipantDestroyedPtr
Definition: Participant.h:48
boost::shared_ptr< InPullConnector > InPullConnectorPtr
Scope is a descriptor for a hierarchical channel of the unified bus.
Definition: Scope.h:46
std::set< Transport > getTransports(bool includeDisabled=false) const
Returns the set of desired transports for a participant.
rsc::plugins::ManagerPtr getPluginManager() const
Returns the plugin manager instance used by the RSB core.
Definition: Factory.cpp:470
patterns::RemoteServerPtr createRemoteServer(const Scope &scope, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a RemoteServer object for the server at scope scope.
Definition: Factory.cpp:380
Factory * factoryWhileLoadingPlugins
Definition: Factory.cpp:157
boost::signals2::signal< void(ParticipantPtr, ParticipantPtr)> SignalParticipantCreated
Definition: Factory.h:59
The client side of a request-reply-based communication channel.
Definition: RemoteServer.h:60