31 #include <boost/bind.hpp> 33 #include <boost/thread/thread_time.hpp> 35 #include "../../MetaData.h" 41 using boost::asio::ip::tcp;
52 :
BusImpl(asioService, tcpnodelay),
53 logger(Logger::getLogger(
"rsb.transport.socket.BusServerImpl")),
54 acceptor(*this->getService()->getService(), tcp::endpoint(tcp::v4(), port)),
55 active(false), shutdown(false) {
60 RSCDEBUG(
logger,
"Destructing");
67 RSCDEBUG(
logger,
"Activating");
74 RSCDEBUG(
logger,
"Deactivating");
88 RSCINFO(
logger,
"Listening on " << this->
acceptor.local_endpoint());
91 boost::asio::placeholders::error));
96 const boost::system::error_code& error) {
98 RSCINFO(
logger,
"Got connection from " << socket->remote_endpoint());
102 connection->startReceiving();
104 RSCWARN(
logger,
"Accept failure, trying to continue");
123 RSCDEBUG(
logger,
"Delivering received event to connections " << event);
128 list<BusConnectionPtr> failing;
129 for (ConnectionList::iterator it = connections.begin();
130 it != connections.end(); ++it) {
131 if (*it != connection) {
132 RSCDEBUG(
logger,
"Delivering to connection " << *it);
134 (*it)->sendEvent(event, event->getMetaData().getUserInfo(
"rsb.wire-schema"));
135 }
catch (
const std::exception& e) {
136 RSCWARN(
logger,
"Send failure (" << e.what() <<
"); will close connection later");
140 failing.push_back(*it);
147 for (list<BusConnectionPtr>::const_iterator it = failing.begin();
148 it != failing.end(); ++it) {
155 return boost::str(boost::format(
"socket://%1%:%2%")
156 % this->
acceptor.local_endpoint().address().to_string()
157 % this->
acceptor.local_endpoint().port());
virtual void addConnection(BusConnectionPtr connection)
Adds connection to the list of connections of the bus.
void handleIncoming(EventPtr event, BusConnectionPtr connection)
boost::shared_ptr< boost::asio::ip::tcp::socket > SocketPtr
virtual const std::string getTransportURL() const
virtual void removeConnection(BusConnectionPtr connection)
Removes connection from the list of connections of this bus.
boost::shared_ptr< AsioServiceContext > AsioServiceContextPtr
rsc::logging::LoggerPtr logger
virtual void handleIncoming(EventPtr event, BusConnectionPtr connection)
ConnectionList connections
virtual bool isTcpnodelay() const
void handleAccept(boost::shared_ptr< BusServerImpl > ref, SocketPtr socket, const boost::system::error_code &error)
virtual AsioServiceContextPtr getService() const
boost::shared_ptr< BusConnection > BusConnectionPtr
boost::asio::ip::tcp::acceptor acceptor
void activate()
Activate the object.
friend class BusConnection
void acceptOne(boost::shared_ptr< BusServerImpl > ref)
boost::shared_ptr< Event > EventPtr
std::list< BusConnectionPtr > ConnectionList
ConnectionList getConnections() const
boost::recursive_mutex & getConnectionLock()