29 #include <boost/thread/shared_mutex.hpp>
30 #include <boost/thread/locks.hpp>
32 #include <rsc/debug/DebugTools.h>
33 #include <rsc/runtime/ContainerIO.h>
34 #include <rsc/misc/langutils.h>
36 #include "../MetaData.h"
37 #include "../filter/Filter.h"
41 using namespace boost;
43 using namespace rsc::runtime;
44 using namespace rsc::logging;
47 namespace eventprocessing {
53 DirectEventReceivingStrategy::DirectEventReceivingStrategy(
bool singleThreaded) :
54 logger(Logger::getLogger(
"rsb.eventprocessing.DirectEventReceivingStrategy")),
56 singleThreaded(singleThreaded) {
63 boost::shared_lock<boost::shared_mutex> filtersLock(this->
filtersMutex);
65 stream <<
"filters = " << this->
filters
82 boost::shared_lock<boost::shared_mutex> lock(
filtersMutex);
86 }
catch (
const std::exception& ex) {
89 s <<
"Exception matching event " << e <<
":" << endl;
90 s << ex.what() << endl;
91 s << rsc::debug::DebugTools::newInstance()->exceptionInfo(ex);
98 s <<
"Catch-all handler called matching event " << endl;
99 rsc::debug::DebugToolsPtr tool = rsc::debug::DebugTools::newInstance();
100 vector<string> trace = tool->createBacktrace();
101 s << tool->formatBacktrace(trace);
112 for (set<filter::FilterPtr>::const_iterator filterIt =
114 if (!(*filterIt)->match(e)) {
126 RSCERROR(
logger, message);
129 cerr << message << endl;
132 cerr << message << endl;
137 RSCERROR(
logger, message);
144 RSCDEBUG(
logger,
"Delivering event " << e <<
" to handler " << handler);
150 }
catch (
const std::exception& ex) {
153 s <<
"Exception dispatching event " << e <<
" to handler " << handler
155 s << ex.what() << endl;
156 s << rsc::debug::DebugTools::newInstance()->exceptionInfo(ex);
163 s <<
"Catch-all handler called dispatching event " << e
164 <<
" to handler " << handler << endl;
165 rsc::debug::DebugToolsPtr tool = rsc::debug::DebugTools::newInstance();
166 vector<string> trace = tool->createBacktrace();
167 s << tool->formatBacktrace(trace);
179 boost::shared_lock<boost::shared_mutex> lock(this->
handlerMutex);
186 event->mutableMetaData().setDeliverTime(rsc::misc::currentTimeMicros());
189 for (HandlerList::const_iterator it = this->
handlers.begin(); it
197 boost::shared_lock<boost::shared_mutex> lock(this->
handlerMutex);
204 boost::shared_lock<boost::shared_mutex> lock(this->
handlerMutex);
210 boost::unique_lock<boost::shared_mutex> lock(
filtersMutex);
216 boost::unique_lock<boost::shared_mutex> lock(
filtersMutex);
Uses stderr for printing a message.
virtual ~DirectEventReceivingStrategy()
Implementations of this interface organize the receiving of events via rsb::transport::InConnector s...
bool filter(EventPtr event)
void handle(EventPtr e)
Dispatches the event to the listener.
boost::shared_ptr< Filter > FilterPtr
This push-style event receiving strategy filters and dispatches rsb::Event s in the context of the th...
rsc::logging::LoggerPtr logger
void deliver(rsb::HandlerPtr handler, EventPtr event)
boost::shared_mutex filtersMutex
virtual void addHandler(rsb::HandlerPtr handler, const bool &wait)
Adds a new handler that will be notified about new events.
virtual void removeHandler(rsb::HandlerPtr handler, const bool &wait)
Removes a handler that will will then not be notified anymore.
ParticipantConfig::ErrorStrategy errorStrategy
void setHandlerErrorStrategy(const ParticipantConfig::ErrorStrategy &strategy)
Defines the strategy to use for handling dispatching errors to the client handler.
boost::shared_ptr< Handler > HandlerPtr
std::set< filter::FilterPtr > filters
ErrorStrategy
Possible error handling strategies in user-provided code like event handlers.
Logs a message using the logging mechanism.
void handleDispatchError(const std::string &message)
virtual void removeFilter(filter::FilterPtr filter)
A class describing the configuration of Participant instances.
virtual void addFilter(filter::FilterPtr filter)
void printContents(std::ostream &stream) const
bool filterNoLock(EventPtr e)
boost::shared_ptr< Event > EventPtr
boost::shared_mutex handlerMutex
void handleNoLock(EventPtr e)
boost::shared_mutex errorStrategyMutex