31 #include <boost/bind.hpp> 33 #include <boost/thread/thread_time.hpp> 35 #include "../../MetaData.h" 41 using boost::asio::ip::tcp;
52 :
Bus(asioService, tcpnodelay),
BusServer(asioService, tcpnodelay),
53 logger(Logger::getLogger(
"rsb.transport.socket.BusServerImpl")),
54 acceptor(*this->getService()->getService(), tcp::endpoint(tcp::v4(), port)),
55 active(false), shutdown(false) {
60 RSCDEBUG(
logger,
"Destructing");
67 RSCDEBUG(
logger,
"Activating");
74 RSCDEBUG(
logger,
"Deactivating");
88 RSCINFO(
logger,
"Listening on " << this->
acceptor.local_endpoint());
91 boost::asio::placeholders::error));
96 const boost::system::error_code& error) {
98 RSCINFO(
logger,
"Got connection from " << socket->remote_endpoint());
102 connection->startReceiving();
104 RSCWARN(
logger,
"Accept failure, trying to continue");
123 RSCDEBUG(
logger,
"Delivering received event to connections " << event);
128 list<BusConnectionPtr> failing;
129 for (ConnectionList::iterator it = connections.begin();
130 it != connections.end(); ++it) {
131 if (*it != connection) {
132 RSCDEBUG(
logger,
"Delivering to connection " << *it);
134 (*it)->sendEvent(event, event->getMetaData().getUserInfo(
"rsb.wire-schema"));
135 }
catch (
const std::exception& e) {
136 RSCWARN(
logger,
"Send failure (" << e.what() <<
"); will close connection later");
140 failing.push_back(*it);
147 for (list<BusConnectionPtr>::const_iterator it = failing.begin();
148 it != failing.end(); ++it) {
155 return boost::str(boost::format(
"socket://%1%:%2%")
156 % this->
acceptor.local_endpoint().address().to_string()
157 % this->
acceptor.local_endpoint().port());
ConnectionList connections
void handleIncoming(EventPtr event, BusConnectionPtr connection)
boost::shared_ptr< boost::asio::ip::tcp::socket > SocketPtr
ConnectionList getConnections() const
Instances of this class provide access to a socket-based bus.
virtual void removeConnection(BusConnectionPtr connection)
Removes connection from the list of connections of this bus.
virtual void handleIncoming(EventPtr event, BusConnectionPtr connection)
virtual const std::string getTransportURL() const
boost::shared_ptr< AsioServiceContext > AsioServiceContextPtr
boost::shared_ptr< BusConnection > BusConnectionPtr
rsc::logging::LoggerPtr logger
Instances of this class provide access to a socket-based bus for local and remote bus clients.
friend class BusConnection
virtual bool isTcpnodelay() const
virtual void addConnection(BusConnectionPtr connection)
Adds connection to the list of connections of the bus.
void handleAccept(boost::shared_ptr< BusServerImpl > ref, SocketPtr socket, const boost::system::error_code &error)
boost::recursive_mutex & getConnectionLock()
boost::asio::ip::tcp::acceptor acceptor
void activate()
Activates the object.
void acceptOne(boost::shared_ptr< BusServerImpl > ref)
virtual AsioServiceContextPtr getService() const
boost::shared_ptr< Event > EventPtr
std::list< BusConnectionPtr > ConnectionList