30 #include <rsc/debug/DebugTools.h>
31 #include <rsc/runtime/ContainerIO.h>
32 #include <rsc/misc/langutils.h>
34 #include "../MetaData.h"
35 #include "../filter/Filter.h"
39 using namespace rsc::runtime;
40 using namespace rsc::logging;
41 using namespace rsc::debug;
44 namespace eventprocessing {
48 props.getAs<
bool>(
"parallelhandlercalls",
false));
51 ParallelEventReceivingStrategy::ParallelEventReceivingStrategy(
unsigned int numThreads,
52 bool parallelHandlerCalls) :
53 logger(Logger::getLogger(
"rsb.eventprocessing.ParallelEventReceivingStrategy")),
58 pool.setParallelCalls(parallelHandlerCalls);
67 return "ParallelEventReceivingStrategy";
71 boost::shared_lock<boost::shared_mutex> filtersLock(
filtersMutex);
83 RSCDEBUG(
logger,
"Matching event " << e <<
" for handler " << handler);
88 if (!handler->acceptsMethod(e->getMethod())) {
92 boost::shared_lock<boost::shared_mutex> lock(
filtersMutex);
93 for (set<filter::FilterPtr>::const_iterator filterIt =
filters.begin(); filterIt
95 if (!(*filterIt)->match(e)) {
102 }
catch (
const std::exception& ex) {
105 s <<
"Exception matching event " << e <<
" for handler " << handler
107 s << ex.what() << endl;
108 s << DebugTools::newInstance()->exceptionInfo(ex);
115 s <<
"Catch-all handler called matching event " << e <<
" for handler "
117 DebugToolsPtr tool = DebugTools::newInstance();
118 vector<string> trace = tool->createBacktrace();
119 s << tool->formatBacktrace(trace);
138 cerr << message << endl;
141 cerr << message << endl;
155 RSCDEBUG(
logger,
"Delivering event " << e <<
" to handler " << handler);
161 }
catch (
const std::exception& ex) {
164 s <<
"Exception dispatching event " << e <<
" to handler " << handler
166 s << ex.what() << endl;
167 s << DebugTools::newInstance()->exceptionInfo(ex);
174 s <<
"Catch-all handler called dispatching event " << e
175 <<
" to handler " << handler << endl;
176 DebugToolsPtr tool = DebugTools::newInstance();
177 vector<string> trace = tool->createBacktrace();
178 s << tool->formatBacktrace(trace);
187 event->mutableMetaData().setDeliverTime(rsc::misc::currentTimeMicros());
194 pool.registerReceiver(handler);
200 pool.unregisterReceiver(handler);
204 boost::unique_lock<boost::shared_mutex> lock(
filtersMutex);
209 boost::unique_lock<boost::shared_mutex> lock(
filtersMutex);
Uses stderr for printing a message.
bool filter(rsb::HandlerPtr handler, EventPtr event)
Implementations of this interface organize the receiving of events via rsb::transport::InConnector s...
boost::recursive_mutex errorStrategyMutex
rsc::threading::OrderedQueueDispatcherPool< EventPtr, rsb::Handler > pool
virtual ~ParallelEventReceivingStrategy()
boost::shared_ptr< Filter > FilterPtr
virtual void addHandler(rsb::HandlerPtr handler, const bool &wait)
Adds a new handler that will be notified about new events.
virtual void addFilter(filter::FilterPtr filter)
void handleDispatchError(const std::string &message)
void setHandlerErrorStrategy(const ParticipantConfig::ErrorStrategy &strategy)
Defines the strategy to use for handling dispatching errors to the client handler.
void printContents(std::ostream &stream) const
std::set< filter::FilterPtr > filters
This push-style event receiving strategy uses one or more threads to filter rsb::Event s and dispatch...
std::string getClassName() const
boost::shared_ptr< Handler > HandlerPtr
ErrorStrategy
Possible error handling strategies in user-provided code like event handlers.
Logs a message using the logging mechanism.
virtual void removeHandler(rsb::HandlerPtr handler, const bool &wait)
Removes a handler that will will then not be notified anymore.
void deliver(rsb::HandlerPtr handler, EventPtr event)
A class describing the configuration of Participant instances.
virtual void removeFilter(filter::FilterPtr filter)
boost::shared_ptr< Event > EventPtr
ParticipantConfig::ErrorStrategy errorStrategy
void handle(EventPtr e)
Dispatches the event to the listener.
boost::shared_mutex filtersMutex
rsc::logging::LoggerPtr logger