31 #include <boost/bind.hpp>
33 #include <boost/thread/thread_time.hpp>
35 #include "../../MetaData.h"
40 using namespace boost::asio;
41 using boost::asio::ip::tcp;
43 using namespace rsc::logging;
49 BusServer::BusServer(boost::uint16_t port,
52 :
Bus(service, tcpnodelay),
53 logger(Logger::getLogger(
"rsb.transport.socket.BusServer")),
54 acceptor(service, tcp::endpoint(tcp::v4(), port)),
56 active(false), shutdown(false) {
67 acceptOne(boost::dynamic_pointer_cast<BusServer>(shared_from_this()));
86 RSCINFO(
logger,
"Listening on " << this->
acceptor.local_endpoint());
89 boost::asio::placeholders::error));
94 const boost::system::error_code& error) {
96 RSCINFO(
logger,
"Got connection from " << socket->remote_endpoint());
100 connection->startReceiving();
102 RSCWARN(
logger,
"Accept failure, trying to continue");
121 RSCDEBUG(
logger,
"Delivering received event to connections " << event);
126 list<BusConnectionPtr> failing;
127 for (ConnectionList::iterator it = connections.begin();
128 it != connections.end(); ++it) {
129 if (*it != connection) {
130 RSCDEBUG(
logger,
"Delivering to connection " << *it);
132 (*it)->sendEvent(event, event->getMetaData().getUserInfo(
"rsb.wire-schema"));
133 }
catch (
const std::exception& e) {
134 RSCWARN(
logger,
"Send failure (" << e.what() <<
"); will close connection later");
138 failing.push_back(*it);
145 for (list<BusConnectionPtr>::const_iterator it = failing.begin();
146 it != failing.end(); ++it) {
153 Factory::getInstance().removeBusServer(shared_from_this());
ConnectionList connections
ConnectionList getConnections() const
Instances of this class provide access to a socket-based bus.
void removeConnection(BusConnectionPtr connection)
Removes connection from the list of connections of this bus.
virtual void handleIncoming(EventPtr event, BusConnectionPtr connection)
rsc::logging::LoggerPtr logger
void handleAccept(boost::shared_ptr< BusServer > ref, SocketPtr socket, const boost::system::error_code &error)
boost::shared_ptr< BusConnection > BusConnectionPtr
boost::asio::ip::tcp::acceptor acceptor
boost::shared_ptr< BusServer > BusServerPtr
friend class BusConnection
bool isTcpnodelay() const
void addConnection(BusConnectionPtr connection)
Adds connection to the list of connections of the bus.
boost::shared_ptr< boost::asio::ip::tcp::socket > SocketPtr
void handleIncoming(EventPtr event, BusConnectionPtr connection)
boost::recursive_mutex & getConnectionLock()
void acceptOne(boost::shared_ptr< BusServer > ref)
boost::shared_ptr< Event > EventPtr
boost::asio::io_service & service
std::list< BusConnectionPtr > ConnectionList
void activate()
Activate the object.