31 using namespace rsc::logging;
33 using namespace rsb::transport;
40 logger(Logger::getLogger(
"rsb.transport.inprocess.Bus")), singleThreaded(false) {
44 if (!this->
sinks.empty()) {
47 "" << this->
sinks.size()
48 <<
" non-empty scopes when destructing");
57 boost::recursive_mutex::scoped_lock lock(this->
mutex);
59 RSCDEBUG(
logger,
"Adding sink " << sink);
61 SinkMap::iterator it = this->
sinks.find(sink->getScope());
62 if (it == this->
sinks.end()) {
64 "No entry in sink map for event scope " << sink->getScope());
66 set<boost::weak_ptr<InConnector> > connectors;
67 for (SinkMap::iterator it_ = this->
sinks.begin(); it_
68 != this->
sinks.end(); ++it_) {
71 "Adding " << it_->second.size() <<
" connectors from "
74 if (it_->first == sink->getScope() || it_->first.isSuperScopeOf(
76 copy(it_->second.begin(), it_->second.end(),
77 inserter(connectors, connectors.begin()));
80 copy(connectors.begin(), connectors.end(),
81 back_inserter(this->
sinks[sink->getScope()]));
85 "Created entry in sink map for scope " << sink->getScope()
86 <<
" with " << connectors.size() <<
" connectors");
88 it = this->
sinks.find(sink->getScope());
90 it->second.push_back(sink);
92 for (SinkMap::iterator it = this->
sinks.begin(); it != this->
sinks.end(); ++it) {
93 if (it->first.isSubScopeOf(sink->getScope())) {
95 connectors.push_back(sink);
101 boost::recursive_mutex::scoped_lock lock(this->
mutex);
104 RSCDEBUG(
logger,
"Removing sink " << sink);
106 for (SinkMap::iterator it = this->
sinks.begin(); it != this->
sinks.end(); ++it) {
110 "Scope " << it->first <<
" has " << connectors.size()
113 for (SinkList::iterator it_ = connectors.begin(); it_
114 != connectors.end(); ++it_) {
118 if (!ptr || (ptr.get() == sink)) {
120 "Found connector " << sink <<
" in scope " << it->first);
121 it_ = connectors.erase(it_);
128 "Scope " << it->first <<
" has " << connectors.size()
130 if (connectors.empty()) {
131 RSCDEBUG(
logger,
"Removing empty scope " << it->first);
143 boost::recursive_mutex::scoped_lock lock(this->
mutex);
151 SinkMap::const_iterator it = this->
sinks.find(*event->getScopePtr());
152 if (it == this->
sinks.end()) {
154 "No entry in sink map for event scope " << *event->getScopePtr());
156 set<boost::weak_ptr<InConnector> > connectors;
157 for (SinkMap::iterator it_ = this->
sinks.begin(); it_
158 != this->
sinks.end(); ++it_) {
161 "Adding " << it_->second.size() <<
" connectors from "
164 if (it_->first == *event->getScopePtr() || it_->first.isSuperScopeOf(
165 *event->getScopePtr())) {
166 copy(it_->second.begin(), it_->second.end(),
167 inserter(connectors, connectors.begin()));
170 copy(connectors.begin(), connectors.end(),
171 back_inserter(this->
sinks[*event->getScopePtr()]));
175 "Created entry in sink map for scope " << *event->getScopePtr()
176 <<
" with " << connectors.size() <<
" connectors");
178 it = this->
sinks.find(*event->getScopePtr());
181 const SinkList& connectors = it->second;
182 for (SinkList::const_iterator it__ = connectors.begin(); it__
183 != connectors.end(); ++it__) {
187 connector->handle(event);
boost::recursive_mutex mutex
boost::shared_ptr< InConnector > InConnectorPtr
void removeSink(InConnector *sink)
void handle(EventPtr event)
Handle event.
void addSink(InConnectorPtr sink)
std::list< boost::weak_ptr< InConnector > > SinkList
rsc::logging::LoggerPtr logger
virtual Scope getScope() const
void handleNoLock(EventPtr event)
std::vector< Scope > superScopes(const bool &includeSelf=false) const
Generates all super scopes of this scope including the root scope "/".
boost::shared_ptr< Event > EventPtr