RSB  0.9.6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Bus.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is part of the RSB project
4  *
5  * Copyright (C) 2011, 2012 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "Bus.h"
28 
29 using namespace std;
30 
31 using namespace rsc::logging;
32 
33 using namespace rsb::transport;
34 
35 namespace rsb {
36 namespace transport{
37 namespace inprocess {
38 
39 Bus::Bus() :
40  logger(Logger::getLogger("rsb.transport.inprocess.Bus")), singleThreaded(false) {
41 }
42 
44  if (!this->sinks.empty()) {
45  RSCWARN(
46  logger,
47  "" << this->sinks.size()
48  << " non-empty scopes when destructing");
49  }
50 }
51 
52 /*void Bus::printContents(ostream& stream) const {
53  stream << "sinks = " << this->sinks;
54  }*/
55 
57  boost::recursive_mutex::scoped_lock lock(this->mutex);
58 
59  RSCDEBUG(logger, "Adding sink " << sink);
60 
61  SinkMap::iterator it = this->sinks.find(sink->getScope());
62  if (it == this->sinks.end()) {
63  RSCDEBUG(logger,
64  "No entry in sink map for event scope " << sink->getScope());
65 
66  set<boost::weak_ptr<InConnector> > connectors;
67  for (SinkMap::iterator it_ = this->sinks.begin(); it_
68  != this->sinks.end(); ++it_) {
69  RSCDEBUG(
70  logger,
71  "Adding " << it_->second.size() << " connectors from "
72  << it_->first);
73 
74  if (it_->first == sink->getScope() || it_->first.isSuperScopeOf(
75  sink->getScope())) {
76  copy(it_->second.begin(), it_->second.end(),
77  inserter(connectors, connectors.begin()));
78  }
79  }
80  copy(connectors.begin(), connectors.end(),
81  back_inserter(this->sinks[sink->getScope()]));
82 
83  RSCDEBUG(
84  logger,
85  "Created entry in sink map for scope " << sink->getScope()
86  << " with " << connectors.size() << " connectors");
87 
88  it = this->sinks.find(sink->getScope());
89  }
90  it->second.push_back(sink);
91 
92  for (SinkMap::iterator it = this->sinks.begin(); it != this->sinks.end(); ++it) {
93  if (it->first.isSubScopeOf(sink->getScope())) {
94  SinkList& connectors = it->second;
95  connectors.push_back(sink);
96  }
97  }
98 }
99 
101  boost::recursive_mutex::scoped_lock lock(this->mutex);
102 
103  vector<Scope> scopes = sink->getScope().superScopes(true);
104  RSCDEBUG(logger, "Removing sink " << sink);
105 
106  for (SinkMap::iterator it = this->sinks.begin(); it != this->sinks.end(); ++it) {
107  SinkList& connectors = it->second;
108  RSCDEBUG(
109  logger,
110  "Scope " << it->first << " has " << connectors.size()
111  << " connectors");
112 
113  for (SinkList::iterator it_ = connectors.begin(); it_
114  != connectors.end(); ++it_) {
115  // If the weak pointer is dangling, we found our
116  // sink. Otherwise, we can just check the pointer.
117  InConnectorPtr ptr = it_->lock();
118  if (!ptr || (ptr.get() == sink)) {
119  RSCDEBUG(logger,
120  "Found connector " << sink << " in scope " << it->first);
121  it_ = connectors.erase(it_);
122  break;
123  }
124  }
125 
126  RSCDEBUG(
127  logger,
128  "Scope " << it->first << " has " << connectors.size()
129  << " connectors");
130  if (connectors.empty()) {
131  RSCDEBUG(logger, "Removing empty scope " << it->first);
132  //this->sinks.erase(it);
133  }
134  }
135 }
136 
137 void Bus::handle(EventPtr event) {
138  // RSCDEBUG(logger, "Delivering event " << event);
139 
140  if (singleThreaded) {
141  this->handleNoLock(event);
142  } else {
143  boost::recursive_mutex::scoped_lock lock(this->mutex);
144  this->handleNoLock(event);
145  }
146 
147 }
148 
150 
151  SinkMap::const_iterator it = this->sinks.find(*event->getScopePtr());
152  if (it == this->sinks.end()) {
153  RSCDEBUG(logger,
154  "No entry in sink map for event scope " << *event->getScopePtr());
155 
156  set<boost::weak_ptr<InConnector> > connectors;
157  for (SinkMap::iterator it_ = this->sinks.begin(); it_
158  != this->sinks.end(); ++it_) {
159  RSCDEBUG(
160  logger,
161  "Adding " << it_->second.size() << " connectors from "
162  << it_->first);
163 
164  if (it_->first == *event->getScopePtr() || it_->first.isSuperScopeOf(
165  *event->getScopePtr())) {
166  copy(it_->second.begin(), it_->second.end(),
167  inserter(connectors, connectors.begin()));
168  }
169  }
170  copy(connectors.begin(), connectors.end(),
171  back_inserter(this->sinks[*event->getScopePtr()]));
172 
173  RSCDEBUG(
174  logger,
175  "Created entry in sink map for scope " << *event->getScopePtr()
176  << " with " << connectors.size() << " connectors");
177 
178  it = this->sinks.find(*event->getScopePtr());
179  }
180 
181  const SinkList& connectors = it->second;
182  for (SinkList::const_iterator it__ = connectors.begin(); it__
183  != connectors.end(); ++it__) {
184  InConnectorPtr connector = it__->lock();
185  if (connector) {
186  // RSCDEBUG(logger, "Delivering to " << connector << " in " << *it);
187  connector->handle(event);
188  }
189  }
190 }
191 
192 }
193 }
194 }
boost::recursive_mutex mutex
Definition: Bus.h:75
boost::shared_ptr< InConnector > InConnectorPtr
Definition: InConnector.h:72
void removeSink(InConnector *sink)
Definition: Bus.cpp:100
void handle(EventPtr event)
Handle event.
Definition: Bus.cpp:137
void addSink(InConnectorPtr sink)
Definition: Bus.cpp:56
std::list< boost::weak_ptr< InConnector > > SinkList
Definition: Bus.h:69
rsc::logging::LoggerPtr logger
Definition: Bus.h:72
void handleNoLock(EventPtr event)
Definition: Bus.cpp:149
std::vector< Scope > superScopes(const bool &includeSelf=false) const
Generates all super scopes of this scope including the root scope "/".
Definition: Scope.cpp:208
boost::shared_ptr< Event > EventPtr
Definition: Event.h:251