RSB  0.17.0
Bus.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is part of the RSB project
4  *
5  * Copyright (C) 2011, 2012 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "Bus.h"
28 
29 #include <rsc/runtime/ContainerIO.h>
30 
31 #include "InConnector.h"
32 
33 using namespace std;
34 
35 using namespace rsc::logging;
36 
37 using namespace rsb::transport;
38 
39 namespace rsb {
40 namespace transport{
41 namespace inprocess {
42 
43 Bus::Bus() :
44  logger(Logger::getLogger("rsb.transport.inprocess.Bus")), singleThreaded(false) {
45 }
46 
48  RSCDEBUG(logger, "Starting destruction");
49  for (SinkMap::iterator sinkIt = this->sinks.begin();
50  sinkIt != this->sinks.end(); ++sinkIt) {
51  if (!sinkIt->second.empty()) {
52  RSCWARN(logger,
53  "non-empty scope " << sinkIt->first << " when destructing: " << sinkIt->second);
54  }
55  }
56 }
57 
58 /*void Bus::printContents(ostream& stream) const {
59  stream << "sinks = " << this->sinks;
60  }*/
61 
63  boost::recursive_mutex::scoped_lock lock(this->mutex);
64 
65  RSCDEBUG(logger, "Adding sink " << sink);
66 
67  SinkMap::iterator it = this->sinks.find(sink->getScope());
68  if (it == this->sinks.end()) {
69  RSCDEBUG(logger,
70  "No entry in sink map for event scope " << sink->getScope());
71 
72  set<boost::weak_ptr<InConnector> > connectors;
73  for (SinkMap::iterator it_ = this->sinks.begin(); it_
74  != this->sinks.end(); ++it_) {
75  RSCDEBUG(
76  logger,
77  "Adding " << it_->second.size() << " connectors from "
78  << it_->first);
79 
80  if (it_->first == sink->getScope() || it_->first.isSuperScopeOf(
81  sink->getScope())) {
82  copy(it_->second.begin(), it_->second.end(),
83  inserter(connectors, connectors.begin()));
84  }
85  }
86  copy(connectors.begin(), connectors.end(),
87  back_inserter(this->sinks[sink->getScope()]));
88 
89  RSCDEBUG(
90  logger,
91  "Created entry in sink map for scope " << sink->getScope()
92  << " with " << connectors.size() << " connectors");
93 
94  it = this->sinks.find(sink->getScope());
95  }
96  it->second.push_back(sink);
97 
98  for (SinkMap::iterator it = this->sinks.begin(); it != this->sinks.end(); ++it) {
99  if (it->first.isSubScopeOf(sink->getScope())) {
100  SinkList& connectors = it->second;
101  connectors.push_back(sink);
102  }
103  }
104 }
105 
107  boost::recursive_mutex::scoped_lock lock(this->mutex);
108 
109  vector<Scope> scopes = sink->getScope().superScopes(true);
110  RSCDEBUG(logger, "Removing sink " << sink);
111 
112  set<Scope> emptyScopes;
113  for (SinkMap::iterator it = this->sinks.begin(); it != this->sinks.end(); ++it) {
114  SinkList& connectors = it->second;
115  RSCDEBUG(
116  logger,
117  "Scope " << it->first << " has " << connectors.size()
118  << " connectors");
119 
120  for (SinkList::iterator it_ = connectors.begin(); it_
121  != connectors.end(); ++it_) {
122  // If the weak pointer is dangling, we found our
123  // sink. Otherwise, we can just check the pointer.
124  InConnectorPtr ptr = it_->lock();
125  if (!ptr || (ptr.get() == sink)) {
126  RSCDEBUG(logger,
127  "Found connector " << sink << " in scope " << it->first);
128  it_ = connectors.erase(it_);
129  break;
130  }
131  }
132 
133  RSCDEBUG(
134  logger,
135  "Scope " << it->first << " has " << connectors.size()
136  << " connectors");
137  if (connectors.empty()) {
138  RSCDEBUG(logger, "Marking empty scope " << it->first
139  << " for removal");
140  emptyScopes.insert(it->first);
141  }
142  }
143  // delete scopes that became empty
144  for (set<Scope>::iterator scopeIt = emptyScopes.begin();
145  scopeIt != emptyScopes.end(); ++scopeIt) {
146  this->sinks.erase(*scopeIt);
147  }
148 }
149 
150 void Bus::handle(EventPtr event) {
151  // RSCDEBUG(logger, "Delivering event " << event);
152 
153  if (singleThreaded) {
154  this->handleNoLock(event);
155  } else {
156  boost::recursive_mutex::scoped_lock lock(this->mutex);
157  this->handleNoLock(event);
158  }
159 
160 }
161 
163 
164  SinkMap::const_iterator it = this->sinks.find(*event->getScopePtr());
165  if (it == this->sinks.end()) {
166  RSCDEBUG(logger,
167  "No entry in sink map for event scope " << *event->getScopePtr());
168 
169  // we have not received an event on this scope before. Therefore the
170  // internal map of sinks needs to be populated from all existing sinks
171  // that will receive events from this scope
172 
173  set<boost::weak_ptr<InConnector> > connectors;
174  for (SinkMap::iterator it_ = this->sinks.begin(); it_
175  != this->sinks.end(); ++it_) {
176  RSCDEBUG(
177  logger,
178  "Adding " << it_->second.size() << " connectors from "
179  << it_->first);
180 
181  if (it_->first == *event->getScopePtr() || it_->first.isSuperScopeOf(
182  *event->getScopePtr())) {
183  copy(it_->second.begin(), it_->second.end(),
184  inserter(connectors, connectors.begin()));
185  }
186  }
187  copy(connectors.begin(), connectors.end(),
188  back_inserter(this->sinks[*event->getScopePtr()]));
189 
190  RSCDEBUG(
191  logger,
192  "Created entry in sink map for scope " << *event->getScopePtr()
193  << " with " << connectors.size() << " connectors");
194 
195  it = this->sinks.find(*event->getScopePtr());
196  }
197 
198  const SinkList& connectors = it->second;
199  for (SinkList::const_iterator it__ = connectors.begin(); it__
200  != connectors.end(); ++it__) {
201  InConnectorPtr connector = it__->lock();
202  if (connector) {
203  // RSCDEBUG(logger, "Delivering to " << connector << " in " << *it);
204  connector->handle(event);
205  }
206  }
207 }
208 
210  static boost::mutex mutex;
211  static BusPtr defaultBus;
212  boost::mutex::scoped_lock lock(mutex);
213  if (!defaultBus) {
214  defaultBus.reset(new Bus);
215  }
216  return defaultBus;
217 }
218 
219 }
220 }
221 }
boost::recursive_mutex mutex
Definition: Bus.h:76
STL namespace.
void removeSink(InConnector *sink)
Definition: Bus.cpp:106
void handle(EventPtr event)
Handle event.
Definition: Bus.cpp:150
void addSink(InConnectorPtr sink)
Definition: Bus.cpp:62
std::list< boost::weak_ptr< InConnector > > SinkList
Definition: Bus.h:70
rsc::logging::LoggerPtr logger
Definition: Bus.h:73
boost::shared_ptr< InConnector > InConnectorPtr
Definition: Bus.h:49
boost::shared_ptr< Bus > BusPtr
Definition: Bus.h:81
void handleNoLock(EventPtr event)
Definition: Bus.cpp:162
BusPtr getDefaultBus()
Definition: Bus.cpp:209
std::vector< Scope > superScopes(const bool &includeSelf=false) const
Generates all super scopes of this scope including the root scope "/".
Definition: Scope.cpp:208
boost::shared_ptr< Event > EventPtr
Definition: Event.h:264