29 #include <rsc/runtime/ContainerIO.h> 44 logger(Logger::getLogger(
"rsb.transport.inprocess.Bus")), singleThreaded(false) {
48 RSCDEBUG(
logger,
"Starting destruction");
49 for (SinkMap::iterator sinkIt = this->
sinks.begin();
50 sinkIt != this->
sinks.end(); ++sinkIt) {
51 if (!sinkIt->second.empty()) {
53 "non-empty scope " << sinkIt->first <<
" when destructing: " << sinkIt->second);
63 boost::recursive_mutex::scoped_lock lock(this->
mutex);
65 RSCDEBUG(
logger,
"Adding sink " << sink);
67 SinkMap::iterator it = this->
sinks.find(sink->getScope());
68 if (it == this->
sinks.end()) {
70 "No entry in sink map for event scope " << sink->getScope());
72 set<boost::weak_ptr<InConnector> > connectors;
73 for (SinkMap::iterator it_ = this->
sinks.begin(); it_
74 != this->
sinks.end(); ++it_) {
77 "Adding " << it_->second.size() <<
" connectors from " 80 if (it_->first == sink->getScope() || it_->first.isSuperScopeOf(
82 copy(it_->second.begin(), it_->second.end(),
83 inserter(connectors, connectors.begin()));
86 copy(connectors.begin(), connectors.end(),
87 back_inserter(this->
sinks[sink->getScope()]));
91 "Created entry in sink map for scope " << sink->getScope()
92 <<
" with " << connectors.size() <<
" connectors");
94 it = this->
sinks.find(sink->getScope());
96 it->second.push_back(sink);
98 for (SinkMap::iterator it = this->
sinks.begin(); it != this->
sinks.end(); ++it) {
99 if (it->first.isSubScopeOf(sink->getScope())) {
101 connectors.push_back(sink);
107 boost::recursive_mutex::scoped_lock lock(this->
mutex);
110 RSCDEBUG(
logger,
"Removing sink " << sink);
112 set<Scope> emptyScopes;
113 for (SinkMap::iterator it = this->
sinks.begin(); it != this->
sinks.end(); ++it) {
117 "Scope " << it->first <<
" has " << connectors.size()
120 for (SinkList::iterator it_ = connectors.begin(); it_
121 != connectors.end(); ++it_) {
125 if (!ptr || (ptr.get() == sink)) {
127 "Found connector " << sink <<
" in scope " << it->first);
128 it_ = connectors.erase(it_);
135 "Scope " << it->first <<
" has " << connectors.size()
137 if (connectors.empty()) {
138 RSCDEBUG(
logger,
"Marking empty scope " << it->first
140 emptyScopes.insert(it->first);
144 for (set<Scope>::iterator scopeIt = emptyScopes.begin();
145 scopeIt != emptyScopes.end(); ++scopeIt) {
146 this->
sinks.erase(*scopeIt);
156 boost::recursive_mutex::scoped_lock lock(this->
mutex);
164 SinkMap::const_iterator it = this->
sinks.find(*event->getScopePtr());
165 if (it == this->
sinks.end()) {
167 "No entry in sink map for event scope " << *event->getScopePtr());
173 set<boost::weak_ptr<InConnector> > connectors;
174 for (SinkMap::iterator it_ = this->
sinks.begin(); it_
175 != this->
sinks.end(); ++it_) {
178 "Adding " << it_->second.size() <<
" connectors from " 181 if (it_->first == *event->getScopePtr() || it_->first.isSuperScopeOf(
182 *event->getScopePtr())) {
183 copy(it_->second.begin(), it_->second.end(),
184 inserter(connectors, connectors.begin()));
187 copy(connectors.begin(), connectors.end(),
188 back_inserter(this->
sinks[*event->getScopePtr()]));
192 "Created entry in sink map for scope " << *event->getScopePtr()
193 <<
" with " << connectors.size() <<
" connectors");
195 it = this->
sinks.find(*event->getScopePtr());
198 const SinkList& connectors = it->second;
199 for (SinkList::const_iterator it__ = connectors.begin(); it__
200 != connectors.end(); ++it__) {
204 connector->handle(event);
210 static boost::mutex
mutex;
212 boost::mutex::scoped_lock lock(mutex);
214 defaultBus.reset(
new Bus);
boost::recursive_mutex mutex
void removeSink(InConnector *sink)
void handle(EventPtr event)
Handle event.
void addSink(InConnectorPtr sink)
std::list< boost::weak_ptr< InConnector > > SinkList
rsc::logging::LoggerPtr logger
virtual Scope getScope() const
boost::shared_ptr< InConnector > InConnectorPtr
boost::shared_ptr< Bus > BusPtr
void handleNoLock(EventPtr event)
std::vector< Scope > superScopes(const bool &includeSelf=false) const
Generates all super scopes of this scope including the root scope "/".
boost::shared_ptr< Event > EventPtr