RSB  0.12.2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Factory.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is a part of the RSB project.
4  *
5  * Copyright (C) 2011 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
6  * Copyright (C) 2012, 2013, 2014 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
7  *
8  * This file may be licensed under the terms of the
9  * GNU Lesser General Public License Version 3 (the ``LGPL''),
10  * or (at your option) any later version.
11  *
12  * Software distributed under the License is distributed
13  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
14  * express or implied. See the LGPL for the specific language
15  * governing rights and limitations.
16  *
17  * You should have received a copy of the LGPL along with this
18  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
19  * or write to the Free Software Foundation, Inc.,
20  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21  *
22  * The development of this software was supported by:
23  * CoR-Lab, Research Institute for Cognition and Robotics
24  * Bielefeld University
25  *
26  * ============================================================ */
27 
28 #include "Factory.h"
29 
30 #include <boost/filesystem/fstream.hpp>
31 
32 #include <rsc/config/Configuration.h>
33 #include <rsc/config/Environment.h>
34 
35 #include <rsc/logging/OptionBasedConfigurator.h>
36 
37 #include <rsc/plugins/Manager.h>
38 #include <rsc/plugins/Configurator.h>
39 
40 #include <rsb/Version.h>
41 
43 
44 #include "converter/converters.h"
45 #include "converter/Repository.h"
47 
48 #include "transport/transports.h"
49 
50 using namespace std;
51 
52 using namespace rsc::config;
53 using namespace rsc::logging;
54 using namespace rsc::runtime;
55 
56 using namespace rsb::converter;
57 using namespace rsb::transport;
58 
59 namespace { // anonymous namespace for file-local helper functions.
60 
62  SERIALIZATION,
63  DESERIALIZATION
64 };
65 
66 template<typename C>
67 std::map<typename C::value_type::first_type,
68  typename C::value_type::second_type>
69 converterSelectionToMap(const C& container, ConverterDirection direction) {
70  typedef typename C::value_type::first_type first_type;
71  typedef typename C::value_type::second_type second_type;
72 
73  typedef typename C::const_iterator const_iterator;
74 
75  std::map<first_type, second_type> result;
76  for (const_iterator it = container.begin(); it != container.end(); ++it) {
77  switch (direction) {
78  case DESERIALIZATION:
79  result[it->first] = it->second;
80  break;
81  case SERIALIZATION:
82  if (result.find(it->second) != result.end()) {
83  throw std::invalid_argument(
84  boost::str(
85  boost::format(
86  "Multiple wire-schemas (%1%, %2%) selected for data-type %3%.\n"
87  "Probably you wrote the lines transport.<name>.cpp.%1% = %3% "
88  "and transport.<name>.cpp.%2% = %3% in your rsb config. One of "
89  "these should be deleted.")
90  % it->first
91  % result[it->second]
92  % it->second
93  )
94  );
95  }
96  result[it->second] = it->first;
97  break;
98  default:
99  assert(false);
100  throw std::runtime_error("got unexpected serialization direction");
101  }
102  }
103  return result;
104 }
105 
106 Properties
107 prepareConnectorOptions(const rsb::ParticipantConfig::Transport& config,
108  ConverterDirection direction,
109  rsc::logging::LoggerPtr logger) {
110  Properties options = config.getOptions();
111  RSCDEBUG(logger, "Supplied connector options " << options);
112 
113  // For local transport, we do not mess with the converter
114  // configuration since it is not used (or even touched) anyway.
115  //
116  // For remote transports, we build a converter selection strategy
117  // - suitable for direction - and put it into the "converters"
118  // property of the transport configuration.
119  //
120  // Note that config.getConverters() only returns /converter
121  // disambiguations/. These are used to guide the actual converter
122  // selection in converterSelectionToMap().
123  if (rsb::transport::isRemote(config.getName()) && !options.has("converters")) {
124  RSCDEBUG(logger, "Converter configuration for transport `"
125  << config.getName() << "': " << config.getConverters());
126 
127  ConverterSelectionStrategy<string>::Ptr converters; // TODO we should not have to know the transport's wire-type here
128  switch (direction) {
129  case SERIALIZATION:
130  converters
131  = converterRepository<string>() // TODO wire-type
132  ->getConvertersForSerialization
133  (converterSelectionToMap(config.getConverters(), direction));
134  break;
135  case DESERIALIZATION:
136  converters
137  = converterRepository<string>() // TODO wire-type
138  ->getConvertersForDeserialization
139  (converterSelectionToMap(config.getConverters(), direction));
140  break;
141  default:
142  assert(false);
143  throw std::runtime_error("got unexpected serialization direction");
144  }
145  RSCDEBUG(logger, "Selected converters for transport `"
146  << config.getName() << "': " << converters);
147  options["converters"] = converters;
148  }
149 
150  return options;
151 }
152 
153 }
154 
155 namespace rsb {
156 
158 
162  } else {
163  static Factory factory;
164  return factory;
165  }
166 }
167 
168 Factory::Factory() :
169  logger(Logger::getLogger("rsb.Factory")),
170  pluginManager(new rsc::plugins::Manager()),
171  signalParticipantCreated(new SignalParticipantCreated),
172  signalParticipantDestroyed(new SignalParticipantDestroyed) {
173 
174  // Configure RSC-based logging.
175  {
176  rsc::logging::OptionBasedConfigurator configurator;
177  configure(configurator, "rsb.conf", "RSC_", 0, 0, false, Version::installPrefix());
178  }
179 
180  // Register default implementation for all extension points.
181  RSCINFO(this->logger, "Registering default implementations");
185 
186  // Configure plugin path and load plugins to register additional
187  // implementations for extension points.
188  //
189  // We use the following default plugin path:
190  // 1. $HOME/.$RSB_PLUGIN_PATH_SUFFIX
191  // 2. $libdir/$RSB_PLUGIN_PATH_SUFFIX
192  RSCINFO(this->logger, "Processing plugin configuration");
193  {
195 
196  vector<boost::filesystem::path> defaultPath;
197  // It may be impossible to determine a home directory for the
198  // current user. Warn, but don't throw.
199  try {
200  defaultPath.push_back(userHomeDirectory()
201  / ("." + Version::buildPluginPathSuffix()));
202  } catch (const runtime_error& e) {
203  RSCWARN(this->logger,
204  "Failed to determine user-specific plugin directory: "
205  << e.what());
206  }
207  defaultPath.push_back(Version::libdir() / Version::buildPluginPathSuffix());
208  rsc::plugins::Configurator configurator(pluginManager, defaultPath);
209  provideConfigOptions(configurator);
210  }
211  factoryWhileLoadingPlugins = NULL; // TODO unwind-protect
212 
213  // Setup default participant config
214  //
215  // Collect all available connector implementations:
216  // + In-push
217  // + In-pull
218  // + Out
219  // Disable discovered connectors with the exception of the
220  // socket transport.
221  set<string> availableTransports = getAvailableTransports(DIRECTION_IN_PUSH | DIRECTION_IN_PULL | DIRECTION_OUT);
222 
224  for (set<string>::const_iterator it = availableTransports.begin();
225  it != availableTransports.end(); ++it) {
226  this->defaultConfig.addTransport(ParticipantConfig::Transport(*it, *it == "socket"));
227  }
228 
229  // If there is only one transport, we can blindly enable it since
230  // the user could end up without any enabled transport otherwise.
231  if (this->defaultConfig.getTransports().size() == 1) {
232  string name = this->defaultConfig.getTransports().begin()->getName();
233  this->defaultConfig.mutableTransport(name).setEnabled(true);
234  }
235 
236  // Merge with user configuration (configuration files, environment
237  // variables)
239 
240  // Issue a warning if the combination of available transport
241  // implementations and user configuration leads to no enabled
242  // transports.
243  if (this->defaultConfig.getTransports().empty()) {
244  RSCWARN(logger, "No transports are enabled. This is probably a configuration error or an internal RSB error.");
245  }
246 
247  RSCDEBUG(logger, "Default config " << defaultConfig);
248 }
249 
251 }
252 
253 void Factory::provideConfigOptions(OptionHandler& handler) {
254  configure(handler, "rsb.conf", "RSB_", 0, 0, true,
255  Version::installPrefix());
256 }
257 
259  return this->signalParticipantCreated;
260 }
261 
263  return this->signalParticipantDestroyed;
264 }
265 
267  const string& dataType,
268  const ParticipantConfig& config,
269  ParticipantPtr parent) {
270  InformerBasePtr informer(
271  new InformerBase(createOutConnectors(config), scope, config, dataType));
272  informer->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
273  (*this->signalParticipantCreated)(informer, parent);
274  return informer;
275 }
276 
277 
279  const ParticipantConfig& config,
280  ParticipantPtr parent) {
281  ListenerPtr listener(
282  new Listener(createInPushConnectors(config), scope, config));
283  listener->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
284  (*this->signalParticipantCreated)(listener, parent);
285  return listener;
286 }
287 
289  const ParticipantConfig& config,
290  ParticipantPtr parent) {
291  ReaderPtr reader(
292  new Reader(createInPullConnectors(config), scope, config));
293  reader->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
294  (*this->signalParticipantCreated)(reader, parent);
295  return reader;
296 }
297 
299 (const Scope& scope,
301  const ParticipantConfig& listenerConfig,
302  const ParticipantConfig& informerConfig,
303  ParticipantPtr parent) {
306  (scope, scope.getComponents()[scope.getComponents().size() -1],
307  listenerConfig, informerConfig, callback));
308  method->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
309  (*this->signalParticipantCreated)(method, parent);
310  return method;
311 }
312 
314  const ParticipantConfig &listenerConfig,
315  const ParticipantConfig &informerConfig,
316  ParticipantPtr parent) {
318  new patterns::LocalServer(scope, listenerConfig, informerConfig));
319  server->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
320  (*this->signalParticipantCreated)(server, parent);
321  return server;
322 }
323 
325 (const Scope& scope,
326  const ParticipantConfig& listenerConfig,
327  const ParticipantConfig& informerConfig,
328  ParticipantPtr parent) {
331  (scope, scope.getComponents()[scope.getComponents().size() -1],
332  listenerConfig, informerConfig));
333  method->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
334  (*this->signalParticipantCreated)(method, parent);
335  return method;
336 }
337 
339  const ParticipantConfig &listenerConfig,
340  const ParticipantConfig &informerConfig,
341  ParticipantPtr parent) {
343  new patterns::RemoteServer(scope, listenerConfig, informerConfig));
344  server->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
345  (*this->signalParticipantCreated)(server, parent);
346  return server;
347 }
348 
350  boost::recursive_mutex::scoped_lock lock(configMutex);
351  return defaultConfig;
352 }
353 
355  boost::recursive_mutex::scoped_lock lock(configMutex);
356  this->defaultConfig = config;
357 }
358 
359 vector<InPullConnectorPtr>
361  // Note: getTransports() only returns *enabled* transports.
362  vector<InPullConnectorPtr> connectors;
363  set<ParticipantConfig::Transport> configuredTransports = config.getTransports();
364  for (set<ParticipantConfig::Transport>::const_iterator transportIt =
365  configuredTransports.begin(); transportIt
366  != configuredTransports.end(); ++transportIt) {
367  RSCDEBUG(logger, "Trying to add connector " << *transportIt);
368  try {
369  connectors.push_back(InPullConnectorPtr(getInPullFactory()
370  .createInst(transportIt->getName(),
371  prepareConnectorOptions(*transportIt,
372  DESERIALIZATION,
373  this->logger))));
374  } catch (const exception& e) {
375  throw runtime_error(boost::str(boost::format("Error configuring connector `%1%', in-pull: %2%")
376  % transportIt->getName() % e.what()));
377  }
378  }
379  return connectors;
380 }
381 
382 vector<InPushConnectorPtr>
384  // Note: getTransports() only returns *enabled* transports.
385  vector<InPushConnectorPtr> connectors;
386  set<ParticipantConfig::Transport> configuredTransports = config.getTransports();
387  for (set<ParticipantConfig::Transport>::const_iterator transportIt =
388  configuredTransports.begin(); transportIt
389  != configuredTransports.end(); ++transportIt) {
390  RSCDEBUG(logger, "Trying to add connector " << *transportIt);
391  try {
392  connectors.push_back(InPushConnectorPtr(getInPushFactory()
393  .createInst(transportIt->getName(),
394  prepareConnectorOptions(*transportIt,
395  DESERIALIZATION,
396  this->logger))));
397  } catch (const exception& e) {
398  throw runtime_error(boost::str(boost::format("Error configuring connector `%1%', in-push: %2%")
399  % transportIt->getName() % e.what()));
400  }
401  }
402  return connectors;
403 }
404 
405 vector<OutConnectorPtr>
407  // Note: getTransports() only returns *enabled* transports.
408  vector<OutConnectorPtr> connectors;
409  set<ParticipantConfig::Transport> configuredTransports = config.getTransports();
410  for (set<ParticipantConfig::Transport>::const_iterator transportIt =
411  configuredTransports.begin(); transportIt
412  != configuredTransports.end(); ++transportIt) {
413  RSCDEBUG(logger, "Trying to add connector " << *transportIt);
414  try {
415  connectors.push_back(OutConnectorPtr(getOutFactory()
416  .createInst(transportIt->getName(),
417  prepareConnectorOptions(*transportIt,
418  SERIALIZATION,
419  this->logger))));
420  } catch (const exception& e) {
421  throw runtime_error(boost::str(boost::format("Error configuring connector `%1%', out: %2%")
422  % transportIt->getName() % e.what()));
423  }
424  }
425  return connectors;
426 }
427 
428 rsc::plugins::ManagerPtr Factory::getPluginManager() const {
429  return this->pluginManager;
430 }
431 
432 }
rsc::runtime::Properties getOptions() const
Returns the specified options for the transport.
void registerDefaultEventProcessingStrategies()
Definition: strategies.cpp:44
A derived Method class which can be used to invoke methods on a remote LocalServer object...
Definition: RemoteServer.h:89
bool isRemote(const string &transportName)
Returns true if transportName names a remote transport.
Definition: transports.cpp:229
std::vector< transport::OutConnectorPtr > createOutConnectors(const ParticipantConfig &config)
Definition: Factory.cpp:406
boost::shared_ptr< RemoteMethod > RemoteMethodPtr
Definition: RemoteServer.h:114
boost::shared_ptr< LocalServer > LocalServerPtr
Definition: LocalServer.h:486
boost::shared_ptr< LocalMethod > LocalMethodPtr
Definition: LocalServer.h:387
std::string getName() const
Returns the name of this transport description.
boost::signals2::signal< void(Participant *)> SignalParticipantDestroyed
Definition: Participant.h:44
ParticipantConfig getDefaultParticipantConfig() const
Returns the default configuration for new participants.
Definition: Factory.cpp:349
SignalParticipantDestroyedPtr getSignalParticipantDestroyed()
Definition: Factory.cpp:262
void addTransport(const Transport &transport)
Adds a transport to the list of desired transport mechanisms.
ConverterNames getConverters() const
boost::shared_ptr< Reader > ReaderPtr
Definition: Reader.h:102
A informer to publish data.
Definition: Informer.h:95
ListenerPtr createListener(const Scope &scope, const ParticipantConfig &config=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a new listener for the specified scope.
Definition: Factory.cpp:278
Factory & getFactory()
Returns a factory for client-level RSB objects.
Definition: Factory.cpp:159
ConverterDirection
Definition: Factory.cpp:61
virtual ~Factory()
Definition: Factory.cpp:250
std::vector< transport::InPushConnectorPtr > createInPushConnectors(const ParticipantConfig &config)
Definition: Factory.cpp:383
patterns::LocalServerPtr createLocalServer(const Scope &scope, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a Server object that exposes methods under the scope scope.
Definition: Factory.cpp:313
set< string > getAvailableTransports(unsigned int requiredDirections)
Returns the names of all available transports which support requiredDirections.
Definition: transports.cpp:142
SignalParticipantDestroyedPtr signalParticipantDestroyed
Definition: Factory.h:336
boost::shared_ptr< Listener > ListenerPtr
Definition: Listener.h:151
boost::shared_ptr< OutConnector > OutConnectorPtr
const std::vector< std::string > & getComponents() const
Returns all components of the scope as an ordered list.
Definition: Scope.cpp:139
boost::shared_ptr< Participant > ParticipantPtr
Definition: Participant.h:116
std::vector< transport::InPullConnectorPtr > createInPullConnectors(const ParticipantConfig &config)
Definition: Factory.cpp:360
A derived Method class which can be called from the remote side and implements its behavior by invoki...
Definition: LocalServer.h:366
boost::shared_ptr< ConverterSelectionStrategy< WireType > > Ptr
boost::shared_ptr< IntlCallback > CallbackPtr
Definition: LocalServer.h:71
Description of a desired transport.
The server side of a request-reply-based communication channel.
Definition: LocalServer.h:54
void registerDefaultTransports()
Definition: transports.cpp:53
boost::shared_ptr< SignalParticipantCreated > SignalParticipantCreatedPtr
Definition: Factory.h:60
Factory for RSB user-level domain objects for communication patterns.
Definition: Factory.h:77
A Listener receives events published by rsb::Informer objects by participating in a channel with a su...
Definition: Listener.h:80
A Reader receives events published by a informers by participating in a channel with a suitable scope...
Definition: Reader.h:61
Transport & mutableTransport(const std::string &name)
Returns a single configured transport which can be modified in place.
InformerBasePtr createInformerBase(const Scope &scope, const std::string &dataType="", const ParticipantConfig &config=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates and returns a new Informer that publishes Event s under the Scope scope.
Definition: Factory.cpp:266
boost::shared_ptr< InPushConnector > InPushConnectorPtr
Definition: Listener.h:58
patterns::RemoteServer::RemoteMethodPtr createRemoteMethod(const Scope &scope, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a patterns::RemoteServer::RemoteMethod.
Definition: Factory.cpp:325
static void provideConfigOptions(rsc::config::OptionHandler &handler)
Provides the default configuration options for RSB to the specified handler.
Definition: Factory.cpp:253
ParticipantConfig defaultConfig
Always acquire configMutex before reading or writing the config.
Definition: Factory.h:332
A class describing the configuration of Participant instances.
InPullFactory & getInPullFactory()
Definition: Factory.cpp:32
SignalParticipantCreatedPtr getSignalParticipantCreated()
Definition: Factory.cpp:258
ReaderPtr createReader(const Scope &scope, const ParticipantConfig &config=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a new Reader object for the specified scope.
Definition: Factory.cpp:288
void setDefaultParticipantConfig(const ParticipantConfig &config)
Sets the default config for participants that will be used for every new participant that is created ...
Definition: Factory.cpp:354
boost::shared_ptr< InformerBase > InformerBasePtr
Definition: Informer.h:214
SignalParticipantCreatedPtr signalParticipantCreated
Definition: Factory.h:335
InPushFactory & getInPushFactory()
Definition: Factory.cpp:37
boost::shared_ptr< RemoteServer > RemoteServerPtr
Definition: RemoteServer.h:307
OutFactory & getOutFactory()
Definition: Factory.cpp:42
void registerDefaultConverters()
Definition: converters.cpp:49
rsc::plugins::ManagerPtr pluginManager
Definition: Factory.h:327
boost::recursive_mutex configMutex
Definition: Factory.h:333
patterns::LocalServer::LocalMethodPtr createLocalMethod(const Scope &scope, patterns::LocalServer::CallbackPtr callback, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a patterns::LocalServer::LocalMethod.
Definition: Factory.cpp:299
boost::shared_ptr< SignalParticipantDestroyed > SignalParticipantDestroyedPtr
Definition: Participant.h:47
boost::shared_ptr< InPullConnector > InPullConnectorPtr
Scope is a descriptor for a hierarchical channel of the unified bus.
Definition: Scope.h:46
std::set< Transport > getTransports(bool includeDisabled=false) const
Returns the set of desired transports for a participant.
rsc::plugins::ManagerPtr getPluginManager() const
Returns the plugin manager instance used by the RSB core.
Definition: Factory.cpp:428
patterns::RemoteServerPtr createRemoteServer(const Scope &scope, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a RemoteServer object for the server at scope scope.
Definition: Factory.cpp:338
Factory * factoryWhileLoadingPlugins
Definition: Factory.cpp:157
boost::signals2::signal< void(ParticipantPtr, ParticipantPtr)> SignalParticipantCreated
Definition: Factory.h:59
The client side of a request-reply-based communication channel.
Definition: RemoteServer.h:59