RSB  0.7.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Bus.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is part of the RSB project
4  *
5  * Copyright (C) 2011, 2012 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "Bus.h"
28 
29 using namespace std;
30 
31 using namespace rsc::logging;
32 
33 using namespace rsb::transport;
34 
35 namespace rsb {
36 namespace inprocess {
37 
38 Bus::Bus() :
39  logger(Logger::getLogger("rsb.inprocess.Bus")), singleThreaded(false) {
40 }
41 
43  if (!this->sinks.empty()) {
44  RSCWARN(
45  logger,
46  "" << this->sinks.size()
47  << " non-empty scopes when destructing");
48  }
49 }
50 
51 string Bus::getClassName() const {
52  return "Bus";
53 }
54 
55 /*void Bus::printContents(ostream& stream) const {
56  stream << "sinks = " << this->sinks;
57  }*/
58 
60  boost::recursive_mutex::scoped_lock lock(this->mutex);
61 
62  RSCDEBUG(logger, "Adding sink " << sink);
63 
64  SinkMap::iterator it = this->sinks.find(sink->getScope());
65  if (it == this->sinks.end()) {
66  RSCDEBUG(logger,
67  "No entry in sink map for event scope " << sink->getScope());
68 
69  set<boost::weak_ptr<InConnector> > connectors;
70  for (SinkMap::iterator it_ = this->sinks.begin(); it_
71  != this->sinks.end(); ++it_) {
72  RSCDEBUG(
73  logger,
74  "Adding " << it_->second.size() << " connectors from "
75  << it_->first);
76 
77  if (it_->first == sink->getScope() || it_->first.isSuperScopeOf(
78  sink->getScope())) {
79  copy(it_->second.begin(), it_->second.end(),
80  inserter(connectors, connectors.begin()));
81  }
82  }
83  copy(connectors.begin(), connectors.end(),
84  back_inserter(this->sinks[sink->getScope()]));
85 
86  RSCDEBUG(
87  logger,
88  "Created entry in sink map for scope " << sink->getScope()
89  << " with " << connectors.size() << " connectors");
90 
91  it = this->sinks.find(sink->getScope());
92  }
93  it->second.push_back(sink);
94 
95  for (SinkMap::iterator it = this->sinks.begin(); it != this->sinks.end(); ++it) {
96  if (it->first.isSubScopeOf(sink->getScope())) {
97  SinkList& connectors = it->second;
98  connectors.push_back(sink);
99  }
100  }
101 }
102 
104  boost::recursive_mutex::scoped_lock lock(this->mutex);
105 
106  vector<Scope> scopes = sink->getScope().superScopes(true);
107  RSCDEBUG(logger, "Removing sink " << sink);
108 
109  for (SinkMap::iterator it = this->sinks.begin(); it != this->sinks.end(); ++it) {
110  SinkList& connectors = it->second;
111  RSCDEBUG(
112  logger,
113  "Scope " << it->first << " has " << connectors.size()
114  << " connectors");
115 
116  for (SinkList::iterator it_ = connectors.begin(); it_
117  != connectors.end(); ++it_) {
118  // If the weak pointer is dangling, we found our
119  // sink. Otherwise, we can just check the pointer.
120  InConnectorPtr ptr = it_->lock();
121  if (!ptr || (ptr.get() == sink)) {
122  RSCDEBUG(logger,
123  "Found connector " << sink << " in scope " << it->first);
124  it_ = connectors.erase(it_);
125  break;
126  }
127  }
128 
129  RSCDEBUG(
130  logger,
131  "Scope " << it->first << " has " << connectors.size()
132  << " connectors");
133  if (connectors.empty()) {
134  RSCDEBUG(logger, "Removing empty scope " << it->first);
135  //this->sinks.erase(it);
136  }
137  }
138 }
139 
140 void Bus::handle(EventPtr event) {
141  // RSCDEBUG(logger, "Delivering event " << event);
142 
143  if (singleThreaded) {
144  this->handleNoLock(event);
145  } else {
146  boost::recursive_mutex::scoped_lock lock(this->mutex);
147  this->handleNoLock(event);
148  }
149 
150 }
151 
153 
154  SinkMap::const_iterator it = this->sinks.find(*event->getScopePtr());
155  if (it == this->sinks.end()) {
156  RSCDEBUG(logger,
157  "No entry in sink map for event scope " << *event->getScopePtr());
158 
159  set<boost::weak_ptr<InConnector> > connectors;
160  for (SinkMap::iterator it_ = this->sinks.begin(); it_
161  != this->sinks.end(); ++it_) {
162  RSCDEBUG(
163  logger,
164  "Adding " << it_->second.size() << " connectors from "
165  << it_->first);
166 
167  if (it_->first == *event->getScopePtr() || it_->first.isSuperScopeOf(
168  *event->getScopePtr())) {
169  copy(it_->second.begin(), it_->second.end(),
170  inserter(connectors, connectors.begin()));
171  }
172  }
173  copy(connectors.begin(), connectors.end(),
174  back_inserter(this->sinks[*event->getScopePtr()]));
175 
176  RSCDEBUG(
177  logger,
178  "Created entry in sink map for scope " << *event->getScopePtr()
179  << " with " << connectors.size() << " connectors");
180 
181  it = this->sinks.find(*event->getScopePtr());
182  }
183 
184  const SinkList& connectors = it->second;
185  for (SinkList::const_iterator it__ = connectors.begin(); it__
186  != connectors.end(); ++it__) {
187  InConnectorPtr connector = it__->lock();
188  if (connector) {
189  // RSCDEBUG(logger, "Delivering to " << connector << " in " << *it);
190  connector->handle(event);
191  }
192  }
193 }
194 
195 }
196 }