30 #include <rsc/debug/DebugTools.h>
31 #include <rsc/runtime/ContainerIO.h>
32 #include <rsc/misc/langutils.h>
34 #include "../MetaData.h"
35 #include "../filter/Filter.h"
39 using namespace boost;
41 using namespace rsc::runtime;
42 using namespace rsc::logging;
43 using namespace rsc::debug;
46 namespace eventprocessing {
52 ParallelEventReceivingStrategy::ParallelEventReceivingStrategy(
unsigned int numThreads) :
53 logger(Logger::getLogger(
"rsb.eventprocessing.ParallelEventReceivingStrategy")),
66 return "ParallelEventReceivingStrategy";
82 RSCDEBUG(
logger,
"Matching event " << e <<
" for handler " << handler);
87 if (!handler->acceptsMethod(e->getMethod())) {
92 for (set<filter::FilterPtr>::const_iterator filterIt =
filters.begin(); filterIt
94 if (!(*filterIt)->match(e)) {
101 }
catch (
const std::exception& ex) {
104 s <<
"Exception matching event " << e <<
" for handler " << handler
106 s << ex.what() << endl;
107 s << DebugTools::newInstance()->exceptionInfo(ex);
114 s <<
"Catch-all handler called matching event " << e <<
" for handler "
116 DebugToolsPtr tool = DebugTools::newInstance();
117 vector<string> trace = tool->createBacktrace();
118 s << tool->formatBacktrace(trace);
137 cerr << message << endl;
140 cerr << message << endl;
154 RSCDEBUG(
logger,
"Delivering event " << e <<
" to handler " << handler);
160 }
catch (
const std::exception& ex) {
163 s <<
"Exception dispatching event " << e <<
" to handler " << handler
165 s << ex.what() << endl;
166 s << DebugTools::newInstance()->exceptionInfo(ex);
173 s <<
"Catch-all handler called dispatching event " << e
174 <<
" to handler " << handler << endl;
175 DebugToolsPtr tool = DebugTools::newInstance();
176 vector<string> trace = tool->createBacktrace();
177 s << tool->formatBacktrace(trace);
186 event->mutableMetaData().setDeliverTime(rsc::misc::currentTimeMicros());
193 pool.registerReceiver(handler);
199 pool.unregisterReceiver(handler);
Uses stderr for printing a message.
bool filter(rsb::HandlerPtr handler, EventPtr event)
Implementations of this interface organize the receiving of events via rsb::transport::InConnector s...
boost::recursive_mutex errorStrategyMutex
rsc::threading::OrderedQueueDispatcherPool< EventPtr, rsb::Handler > pool
virtual ~ParallelEventReceivingStrategy()
boost::shared_ptr< Filter > FilterPtr
virtual void addHandler(rsb::HandlerPtr handler, const bool &wait)
Adds a new handler that will be notified about new events.
virtual void addFilter(filter::FilterPtr filter)
void handleDispatchError(const std::string &message)
void setHandlerErrorStrategy(const ParticipantConfig::ErrorStrategy &strategy)
Defines the strategy to use for handling dispatching errors to the client handler.
void printContents(std::ostream &stream) const
std::set< filter::FilterPtr > filters
This push-style event receiving strategy uses one or more threads to filter rsb::Event s and dispatch...
std::string getClassName() const
boost::shared_ptr< Handler > HandlerPtr
ErrorStrategy
Possible error handling strategies in user-provided code like event handlers.
Logs a message using the logging mechanism.
virtual void removeHandler(rsb::HandlerPtr handler, const bool &wait)
Removes a handler that will will then not be notified anymore.
void deliver(rsb::HandlerPtr handler, EventPtr event)
A class describing the configuration of Participant instances.
virtual void removeFilter(filter::FilterPtr filter)
boost::shared_ptr< Event > EventPtr
ParticipantConfig::ErrorStrategy errorStrategy
void handle(EventPtr e)
Dispatches the event to the listener.
boost::shared_mutex filtersMutex
rsc::logging::LoggerPtr logger