RSB  0.9.6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Factory.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is a part of the RSB project.
4  *
5  * Copyright (C) 2011 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
6  * Copyright (C) 2012, 2013 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
7  *
8  * This file may be licensed under the terms of the
9  * GNU Lesser General Public License Version 3 (the ``LGPL''),
10  * or (at your option) any later version.
11  *
12  * Software distributed under the License is distributed
13  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
14  * express or implied. See the LGPL for the specific language
15  * governing rights and limitations.
16  *
17  * You should have received a copy of the LGPL along with this
18  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
19  * or write to the Free Software Foundation, Inc.,
20  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21  *
22  * The development of this software was supported by:
23  * CoR-Lab, Research Institute for Cognition and Robotics
24  * Bielefeld University
25  *
26  * ============================================================ */
27 
28 #include "Factory.h"
29 
30 #include <boost/filesystem/fstream.hpp>
31 
32 #include <rsc/config/Configuration.h>
33 #include <rsc/config/Environment.h>
34 
35 #include <rsc/logging/OptionBasedConfigurator.h>
36 
37 #include <rsc/plugins/Manager.h>
38 #include <rsc/plugins/Configurator.h>
39 
40 #include <rsb/Version.h>
41 
43 
44 #include "converter/converters.h"
45 #include "converter/Repository.h"
47 
48 #include "transport/transports.h"
49 
50 #include "LocalService.h"
51 
52 using namespace std;
53 
54 using namespace rsc::config;
55 using namespace rsc::logging;
56 using namespace rsc::runtime;
57 
58 using namespace rsb::converter;
59 using namespace rsb::transport;
60 
61 namespace {
62 
/**
 * Compile-time selector for the direction in which pairsToMap() stores a
 * pair. Only the specializations for 1 and 2 are defined, so any other
 * value of @a which is rejected when pairsToMap() is instantiated.
 */
template<unsigned int which>
struct PairToMapInserter;

/** Stores pair.first as the key and pair.second as the mapped value. */
template<>
struct PairToMapInserter<1> {
    template<typename Map, typename Pair>
    static void insert(Map& target, const Pair& entry) {
        target[entry.first] = entry.second;
    }
};

/** Stores pair.second as the key and pair.first as the mapped value. */
template<>
struct PairToMapInserter<2> {
    template<typename Map, typename Pair>
    static void insert(Map& target, const Pair& entry) {
        target[entry.second] = entry.first;
    }
};

/**
 * Converts a container of pairs into a std::map.
 *
 * @tparam which 1 to use each pair's first member as key and its second
 *               member as value; 2 to use the second member as key and
 *               the first member as value. Other values do not compile.
 * @tparam C     container type providing const_iterator, begin(), end()
 *               and a value_type with first_type / second_type.
 * @param container the pairs to convert.
 * @return a map of type std::map<first_type, second_type>. If several
 *         pairs produce the same key, the pair visited last in the
 *         container's iteration order determines the stored value.
 *
 * The result type is fixed to std::map<first_type, second_type> for both
 * directions, so direction 2 requires the two member types to be mutually
 * convertible. Direction 1 works for arbitrary member types because only
 * the selected direction is instantiated.
 */
template<unsigned int which, typename C>
std::map<typename C::value_type::first_type,
         typename C::value_type::second_type> pairsToMap(const C& container) {
    typedef typename C::value_type::first_type  first_type;
    typedef typename C::value_type::second_type second_type;

    typedef typename C::const_iterator const_iterator;

    std::map<first_type, second_type> result;
    for (const_iterator it = container.begin(); it != container.end(); ++it) {
        // Dispatch at compile time so the unused direction is never
        // instantiated (and never has to type-check).
        PairToMapInserter<which>::insert(result, *it);
    }
    return result;
}
80 
81 }
82 
83 namespace rsb {
84 
85 Factory& Factory::getInstance() {
86  return getFactory();
87 }
88 
// NOTE(review): the line that opens this function (listing line 89) is
// missing from this extract; the page's cross-reference index attributes
// that line to `Factory& getFactory()` -- confirm against upstream.
// The visible body forwards to Factory::getInstanceBase().
90  return Factory::getInstanceBase();
91 }
92 
93 Factory& Factory::getInstanceBase() {
94  return rsc::patterns::Singleton<Factory>::getInstance();
95 }
96 
97 Factory::Factory() :
98  logger(Logger::getLogger("rsb.Factory")),
99  pluginManager(new rsc::plugins::Manager()) {
100 
101  // Configure RSC-based logging.
102  {
103  rsc::logging::OptionBasedConfigurator configurator;
104  configure(configurator, "rsb.conf", "RSC_", 0, 0, false, Version::installPrefix());
105  }
106 
107  // Register default implementation for all extension points.
108  RSCINFO(this->logger, "Registering default implementations");
112 
113  // Configure plugin path and load plugins to register additional
114  // implementations for extension points.
115  //
116  // We use the following default plugin path:
117  // 1. $HOME/.rsb$MAJOR.$MINOR/plugins
118  // 2. $libdir/rsb$MAJOR.$MINOR/plugins
119  RSCINFO(this->logger, "Processing plugin configuration");
120  {
121  string versioned = str(boost::format("rsb%1%.%2%")
122  % RSB_VERSION_MAJOR
123  % RSB_VERSION_MINOR);
124  vector<boost::filesystem::path> defaultPath;
125  // It may be impossible to determine a home directory for the
126  // current user. Warn, but don't throw.
127  try {
128  defaultPath.push_back(userHomeDirectory() / ("." + versioned) / "plugins");
129  } catch (const runtime_error& e) {
130  RSCWARN(this->logger,
131  "Failed to determine user-specific plugin directory: "
132  << e.what());
133  }
134  defaultPath.push_back(Version::libdir() / versioned / "plugins");
135  rsc::plugins::Configurator configurator(pluginManager, defaultPath);
136  configure(configurator, "rsb.conf", "RSB_", 0, 0, true, Version::installPrefix());
137  }
138 
139  // Setup default participant config
140  //
141  // Collect all available connector implementations from the
142  // connector factories:
143  // + In-push
144  // + In-pull
145  // + Out
146  // Disable discovered connectors with the exception of the
147  // socket transport.
148  set<string> availableTransports;
149  {
150  set<InPullFactory::ConnectorInfo> infos = getInPullFactory().getConnectorInfos();
151  for (set<InPullFactory::ConnectorInfo>::const_iterator it
152  = infos.begin(); it != infos.end(); ++it) {
153  availableTransports.insert(it->getName());
154  }
155  }{
156  set<InPushFactory::ConnectorInfo> infos = getInPushFactory().getConnectorInfos();
157  for (set<InPushFactory::ConnectorInfo>::const_iterator it
158  = infos.begin(); it != infos.end(); ++it) {
159  availableTransports.insert(it->getName());
160  }
161  }{
162  set<OutFactory::ConnectorInfo> infos = getOutFactory().getConnectorInfos();
163  for (set<OutFactory::ConnectorInfo>::const_iterator it
164  = infos.begin(); it != infos.end(); ++it) {
165  availableTransports.insert(it->getName());
166  }
167  }
168 
170  for (set<string>::const_iterator it = availableTransports.begin();
171  it != availableTransports.end(); ++it) {
172  this->defaultConfig.addTransport(ParticipantConfig::Transport(*it, *it == "socket"));
173  }
174 
175  // If there is only one transport, we can blindly enable it since
176  // the user could end up without any enabled transport otherwise.
177  if (this->defaultConfig.getTransports().size() == 1) {
178  string name = this->defaultConfig.getTransports().begin()->getName();
179  this->defaultConfig.mutableTransport(name).setEnabled(true);
180  }
181 
182  // Merge with user configuration (configuration files, environment
183  // variables)
184  configure(this->defaultConfig, "rsb.conf", "RSB_", 0, 0, true, Version::installPrefix());
185 
186  // Issue a warning if the combination of available transport
187  // implementations and user configuration leads to no enabled
188  // transports.
189  if (this->defaultConfig.getTransports().empty()) {
190  RSCWARN(logger, "No transports are enabled. This is probably a configuration error or an internal RSB error.");
191  }
192 
193  RSCDEBUG(logger, "Default config " << defaultConfig);
194 }
195 
// NOTE(review): closing brace only. The line that opens this definition
// (listing line 196, indexed on this page as `virtual ~Factory()`) is
// missing from the extract.
197 }
198 
// Builds out-connectors from `config` via createOutConnectors() and returns a
// new InformerBase for `scope`, `config` and `dataType` wrapped in an
// InformerBasePtr.
// NOTE(review): the first signature line (listing line 199) is missing from
// this extract; the cross-reference index lists createInformerBase(...) at
// that line -- confirm against upstream.
200  const string& dataType,
201  const ParticipantConfig& config) {
202  return InformerBasePtr(new InformerBase(createOutConnectors(config), scope, config, dataType));
203 }
204 
205 
// Builds in-push connectors from `config` via createInPushConnectors() and
// returns a new Listener for `scope` wrapped in a ListenerPtr.
// NOTE(review): the first signature line (listing line 206) is missing from
// this extract; the cross-reference index lists createListener(...) at that
// line -- confirm against upstream.
207  const ParticipantConfig& config) {
208  return ListenerPtr(new Listener(createInPushConnectors(config), scope, config));
209 }
210 
// Builds in-pull connectors from `config` via createInPullConnectors() and
// returns a new Reader for `scope` wrapped in a ReaderPtr.
// NOTE(review): the first signature line (listing line 211) is missing from
// this extract; the cross-reference index lists createReader(...) at that
// line -- confirm against upstream.
212  const ParticipantConfig& config) {
213  return ReaderPtr(new Reader(createInPullConnectors(config), scope, config));
214 }
215 
// Returns a new patterns::Server constructed from `scope`, `listenerConfig`
// and `informerConfig`, wrapped in a patterns::ServerPtr.
// NOTE(review): the first signature line (listing line 216) is missing from
// this extract; the cross-reference index lists createServer(...) at that
// line -- confirm against upstream.
217  const ParticipantConfig &listenerConfig,
218  const ParticipantConfig &informerConfig) {
219  return patterns::ServerPtr(
220  new patterns::Server(scope, listenerConfig, informerConfig));
221 }
222 
// Constructs a new patterns::RemoteServer from `scope`, `listenerConfig` and
// `informerConfig`.
// NOTE(review): two listing lines are missing from this extract: 223 (the
// first signature line; the cross-reference index lists
// createRemoteServer(...) there) and 226 (the start of the return statement
// that the `new` expression below belongs to, presumably wrapping it in a
// patterns::RemoteServerPtr) -- confirm against upstream.
224  const ParticipantConfig &listenerConfig,
225  const ParticipantConfig &informerConfig) {
227  new patterns::RemoteServer(scope, listenerConfig, informerConfig));
228 }
229 
// Returns defaultConfig while holding configMutex through a scoped lock.
// NOTE(review): the signature line (listing line 230) is missing from this
// extract; the cross-reference index lists
// `ParticipantConfig getDefaultParticipantConfig() const` at that line --
// confirm against upstream.
231  boost::recursive_mutex::scoped_lock lock(configMutex);
232  return defaultConfig;
233 }
234 
// Replaces defaultConfig with `config` while holding configMutex through a
// scoped lock.
// NOTE(review): the signature line (listing line 235) is missing from this
// extract; the cross-reference index lists
// setDefaultParticipantConfig(const ParticipantConfig &config) at that line
// -- confirm against upstream.
236  boost::recursive_mutex::scoped_lock lock(configMutex);
237  this->defaultConfig = config;
238 }
239 
// Returns a new LocalService for `scope` wrapped in a ServicePtr.
// NOTE(review): the signature line (listing line 240) is missing from this
// extract; the cross-reference index lists createService(const Scope &scope)
// at that line -- confirm against upstream.
241  return ServicePtr(new LocalService(scope));
242 }
243 
// Creates one in-pull connector for every transport returned by
// config.getTransports() and returns them in iteration order.
//
// For each transport the options are copied; when they contain no
// "converters" entry, the converters chosen by the string-keyed converter
// repository's getConvertersForDeserialization() for the transport's
// converter configuration are stored under that key. The connector is then
// instantiated through getInPullFactory() using the transport's name.
//
// Any std::exception raised while processing a transport is rethrown as a
// runtime_error whose message names the transport and includes the original
// what() text.
//
// NOTE(review): two listing lines are missing from this extract: 245 (the
// function-name line; the cross-reference index lists
// createInPullConnectors(const ParticipantConfig &config) there) and 262
// (the left-hand side of the assignment continued on line 263, presumably
// the declaration of `converters`) -- confirm against upstream.
244 vector<InPullConnectorPtr>
246  // Note: getTransports() only returns *enabled* transports.
247  vector<InPullConnectorPtr> connectors;
248  set<ParticipantConfig::Transport> configuredTransports = config.getTransports();
249  for (set<ParticipantConfig::Transport>::const_iterator transportIt =
250  configuredTransports.begin(); transportIt
251  != configuredTransports.end(); ++transportIt) {
252  RSCDEBUG(logger, "Trying to add connector " << *transportIt);
253  try {
     // Work on a copy so the "converters" entry can be added below.
254  Properties options = transportIt->getOptions();
255  RSCDEBUG(logger, "Supplied connector options " << transportIt->getOptions());
256 
257  // Take care of converters
     // Explicitly supplied "converters" options are left untouched.
258  if (!options.has("converters")) {
259  RSCDEBUG(logger, "Converter configuration for transport `"
260  << transportIt->getName() << "': " << transportIt->getConverters());
261  // TODO we should not have to know the transport's wire-type here
263  = converterRepository<string>()
264  ->getConvertersForDeserialization(pairsToMap<1> (transportIt->getConverters()));
265  RSCDEBUG(logger, "Selected converters for transport `"
266  << transportIt->getName() << "': " << converters);
267  options["converters"] = converters;
268  }
269  connectors.push_back(InPullConnectorPtr(getInPullFactory().createInst(transportIt->getName(), options)));
270  } catch (const exception& e) {
     // Re-throw with the transport name so the failing connector is identifiable.
271  throw runtime_error(boost::str(boost::format("Error configuring connector `%1%', in-pull: %2%")
272  % transportIt->getName() % e.what()));
273  }
274  }
275  return connectors;
276 }
277 
// Creates one in-push connector for every transport returned by
// config.getTransports() and returns them in iteration order.
//
// For each transport the options are copied; when they contain no
// "converters" entry, the converters chosen by the string-keyed converter
// repository's getConvertersForDeserialization() for the transport's
// converter configuration are stored under that key. The connector is then
// instantiated through getInPushFactory() using the transport's name.
//
// Any std::exception raised while processing a transport is rethrown as a
// runtime_error whose message names the transport and includes the original
// what() text.
//
// NOTE(review): two listing lines are missing from this extract: 279 (the
// function-name line; the cross-reference index lists
// createInPushConnectors(const ParticipantConfig &config) there) and 296
// (the left-hand side of the assignment continued on line 297, presumably
// the declaration of `converters`) -- confirm against upstream.
278 vector<InPushConnectorPtr>
280  // Note: getTransports() only returns *enabled* transports.
281  vector<InPushConnectorPtr> connectors;
282  set<ParticipantConfig::Transport> configuredTransports = config.getTransports();
283  for (set<ParticipantConfig::Transport>::const_iterator transportIt =
284  configuredTransports.begin(); transportIt
285  != configuredTransports.end(); ++transportIt) {
286  RSCDEBUG(logger, "Trying to add connector " << *transportIt);
287  try {
     // Work on a copy so the "converters" entry can be added below.
288  Properties options = transportIt->getOptions();
289  RSCDEBUG(logger, "Supplied connector options " << transportIt->getOptions());
290 
291  // Take care of converters
     // Explicitly supplied "converters" options are left untouched.
292  if (!options.has("converters")) {
293  RSCDEBUG(logger, "Converter configuration for transport `"
294  << transportIt->getName() << "': " << transportIt->getConverters());
295  // TODO we should not have to know the transport's wire-type here
297  = converterRepository<string>()
298  ->getConvertersForDeserialization(pairsToMap<1> (transportIt->getConverters()));
299  RSCDEBUG(logger, "Selected converters for transport `"
300  << transportIt->getName() << "': " << converters);
301  options["converters"] = converters;
302  }
303  connectors.push_back(InPushConnectorPtr(getInPushFactory().createInst(transportIt->getName(), options)));
304  } catch (const exception& e) {
     // Re-throw with the transport name so the failing connector is identifiable.
305  throw runtime_error(boost::str(boost::format("Error configuring connector `%1%', in-push: %2%")
306  % transportIt->getName() % e.what()));
307  }
308  }
309  return connectors;
310 }
311 
// Creates one out connector for every transport returned by
// config.getTransports() and returns them in iteration order.
//
// For each transport the options are copied; when they contain no
// "converters" entry, the converters chosen by the string-keyed converter
// repository's getConvertersForSerialization() for the transport's converter
// configuration are stored under that key. Unlike the in-direction variants,
// the converter pairs are passed through pairsToMap<2>, i.e. keyed by each
// pair's second member. The connector is then instantiated through
// getOutFactory() using the transport's name.
//
// Any std::exception raised while processing a transport is rethrown as a
// runtime_error whose message names the transport and includes the original
// what() text.
//
// NOTE(review): two listing lines are missing from this extract: 313 (the
// function-name line; the cross-reference index lists
// createOutConnectors(const ParticipantConfig &config) there) and 330 (the
// left-hand side of the assignment continued on line 331, presumably the
// declaration of `converters`) -- confirm against upstream.
312 vector<OutConnectorPtr>
314  // Note: getTransports() only returns *enabled* transports.
315  vector<OutConnectorPtr> connectors;
316  set<ParticipantConfig::Transport> configuredTransports = config.getTransports();
317  for (set<ParticipantConfig::Transport>::const_iterator transportIt =
318  configuredTransports.begin(); transportIt
319  != configuredTransports.end(); ++transportIt) {
320  RSCDEBUG(logger, "Trying to add connector " << *transportIt);
321  try {
     // Work on a copy so the "converters" entry can be added below.
322  Properties options = transportIt->getOptions();
323  RSCDEBUG(logger, "Supplied connector options " << transportIt->getOptions());
324 
325  // Take care of converters
     // Explicitly supplied "converters" options are left untouched.
326  if (!options.has("converters")) {
327  RSCDEBUG(logger, "Converter configuration for transport `"
328  << transportIt->getName() << "': " << transportIt->getConverters());
329  // TODO we should not have to know the transport's wire-type here
331  = converterRepository<string>()
332  ->getConvertersForSerialization(pairsToMap<2> (transportIt->getConverters()));
333  RSCDEBUG(logger, "Selected converters for transport `"
334  << transportIt->getName() << "': " << converters);
335  options["converters"] = converters;
336  }
337  connectors.push_back(OutConnectorPtr(getOutFactory().createInst(transportIt->getName(), options)));
338  } catch (const exception& e) {
     // Re-throw with the transport name so the failing connector is identifiable.
339  throw runtime_error(boost::str(boost::format("Error configuring connector `%1%', out: %2%")
340  % transportIt->getName() % e.what()));
341  }
342  }
343  return connectors;
344 }
345 
346 rsc::plugins::ManagerPtr Factory::getPluginManager() const {
347  return this->pluginManager;
348 }
349 
350 }
patterns::RemoteServerPtr createRemoteServer(const Scope &scope, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig())
Creates a RemoteServer object for the server at scope scope.
Definition: Factory.cpp:223
void registerDefaultEventProcessingStrategies()
Definition: strategies.cpp:44
std::vector< transport::OutConnectorPtr > createOutConnectors(const ParticipantConfig &config)
Definition: Factory.cpp:313
ParticipantConfig getDefaultParticipantConfig() const
Returns the default configuration for new participants.
Definition: Factory.cpp:230
std::set< ConnectorInfo > getConnectorInfos() const
Definition: Factory.h:209
A Service implementation that structures services locally.
Definition: LocalService.h:41
void addTransport(const Transport &transport)
Adds a transport to the list of desired transport mechanisms.
boost::shared_ptr< Reader > ReaderPtr
Definition: Reader.h:101
An informer to publish data.
Definition: Informer.h:95
Factory & getFactory()
Returns a factory for client-level RSB objects.
Definition: Factory.cpp:89
ReaderPtr createReader(const Scope &scope, const ParticipantConfig &config=getFactory().getDefaultParticipantConfig())
Creates a new Reader object for the specified scope.
Definition: Factory.cpp:211
virtual ~Factory()
Definition: Factory.cpp:196
boost::shared_ptr< Server > ServerPtr
Definition: Server.h:293
std::vector< transport::InPushConnectorPtr > createInPushConnectors(const ParticipantConfig &config)
Definition: Factory.cpp:279
boost::shared_ptr< Listener > ListenerPtr
Definition: Listener.h:150
patterns::ServerPtr createServer(const Scope &scope, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig())
Creates a Server object that exposes methods under the scope scope.
Definition: Factory.cpp:216
boost::shared_ptr< OutConnector > OutConnectorPtr
std::vector< transport::InPullConnectorPtr > createInPullConnectors(const ParticipantConfig &config)
Definition: Factory.cpp:245
boost::shared_ptr< ConverterSelectionStrategy< WireType > > Ptr
ServicePtr createService(const Scope &scope)
Creates a Service instance operating on the given scope.
Definition: Factory.cpp:240
Description of a desired transport.
void registerDefaultTransports()
Definition: transports.cpp:53
ListenerPtr createListener(const Scope &scope, const ParticipantConfig &config=getFactory().getDefaultParticipantConfig())
Creates a new listener for the specified scope.
Definition: Factory.cpp:206
Factory for RSB user-level domain objects for communication patterns.
Definition: Factory.h:71
A Listener receives events published by rsb::Informer objects by participating in a channel with a su...
Definition: Listener.h:80
boost::shared_ptr< Service > ServicePtr
Definition: Service.h:42
A Reader receives events published by informers by participating in a channel with a suitable scope...
Definition: Reader.h:61
Transport & mutableTransport(const std::string &name)
Returns a single configured transport which can be modified in place.
boost::shared_ptr< InPushConnector > InPushConnectorPtr
Definition: Listener.h:58
ParticipantConfig defaultConfig
Always acquire configMutex before reading or writing the config.
Definition: Factory.h:236
A class describing the configuration of Participant instances.
InPullFactory & getInPullFactory()
Definition: Factory.cpp:32
void setDefaultParticipantConfig(const ParticipantConfig &config)
Sets the default config for participants that will be used for every new participant that is created ...
Definition: Factory.cpp:235
boost::shared_ptr< InformerBase > InformerBasePtr
Definition: Informer.h:211
boost::shared_ptr< RemoteServer > RemoteServerPtr
Definition: RemoteServer.h:282
InPushFactory & getInPushFactory()
Definition: Factory.cpp:36
rsc::logging::LoggerPtr logger
Definition: Factory.h:229
OutFactory & getOutFactory()
Definition: Factory.cpp:40
void registerDefaultConverters()
Definition: converters.cpp:49
rsc::plugins::ManagerPtr pluginManager
Definition: Factory.h:231
The server side of a request-reply-based communication channel.
Definition: Server.h:54
boost::recursive_mutex configMutex
Definition: Factory.h:237
boost::shared_ptr< InPullConnector > InPullConnectorPtr
Scope is a descriptor for a hierarchical channel of the unified bus.
Definition: Scope.h:46
std::set< Transport > getTransports(bool includeDisabled=false) const
Returns the set of desired transports for a participant.
rsc::plugins::ManagerPtr getPluginManager() const
Returns the plugin manager instance used by the RSB core.
Definition: Factory.cpp:346
InformerBasePtr createInformerBase(const Scope &scope, const std::string &dataType="", const ParticipantConfig &config=getFactory().getDefaultParticipantConfig())
Creates and returns a new Informer that publishes Events under the Scope scope.
Definition: Factory.cpp:199
The client side of a request-reply-based communication channel.
Definition: RemoteServer.h:62