29 #include <boost/lexical_cast.hpp>
31 #include <boost/asio/ip/address.hpp>
33 #include <boost/format.hpp>
37 using namespace boost;
39 using namespace boost::asio;
40 using boost::asio::ip::tcp;
42 using namespace rsc::logging;
51 logger(Logger::getLogger(
"rsb.transport.socket.Factory")),
52 keepAlive(new io_service::work(service)),
53 thread(boost::bind(&boost::asio::io_service::run, &service)) {
54 RSCINFO(
logger,
"Started service thread");
65 RSCINFO(
logger,
"Stopping service thread");
68 RSCINFO(
logger,
"Stopped service thread");
75 RSCDEBUG(
logger,
"Was asked for a bus client for " << host <<
":" << port);
82 BusClientMap::const_iterator it;
84 BusPtr result = it->second;
86 result->addConnector(connector);
87 RSCDEBUG(
logger,
"Found existing bus client "
88 << result <<
" without resolving");
92 RSCDEBUG(
logger,
"Did not find bus client without resolving");
100 RSCDEBUG(
logger,
"Resolving endpoint")
101 tcp::resolver resolver(this->
service);
102 tcp::resolver::query query(host, lexical_cast<string>(port),
103 tcp::resolver::query::numeric_service);
104 for (tcp::resolver::iterator endpointIterator = resolver.resolve(query);
105 endpointIterator != tcp::resolver::iterator();
106 ++endpointIterator) {
107 endpoint =
Endpoint(endpointIterator->host_name(), port);
110 BusClientMap::const_iterator it;
112 BusPtr result = it->second;
114 result->addConnector(connector);
115 RSCDEBUG(
logger,
"Found existing bus client "
116 << it->second <<
" after resolving");
123 for (tcp::resolver::iterator endpointIterator = resolver.resolve(query);
124 endpointIterator != tcp::resolver::iterator();
125 ++endpointIterator) {
126 endpoint =
Endpoint(endpointIterator->host_name(), port);
127 RSCDEBUG(
logger,
"Trying endpoint " << endpointIterator->endpoint());
128 socket.reset(
new tcp::socket(this->
service));
129 boost::system::error_code error;
130 socket->connect(endpointIterator->endpoint(), error);
132 RSCDEBUG(
logger,
"Success");
135 RSCDEBUG(
logger,
"Failed: " << error.message());
139 throw runtime_error(str(format(
"Could not connect to any of the endpoints to which %1%:%2% resolved.")
145 RSCDEBUG(
logger,
"Did not find bus client after resolving; creating a new one");
151 result->addConnection(connection);
152 connection->startReceiving();
154 result->addConnector(connector);
156 RSCDEBUG(
logger,
"Created new bus client " << result);
162 RSCDEBUG(
logger,
"Removing client bus " << bus);
164 boost::mutex::scoped_lock lock(this->
busMutex);
166 for (BusClientMap::iterator it = this->
busClients.begin();
168 if (it->second == bus) {
170 RSCDEBUG(
logger,
"Removed");
180 RSCDEBUG(
logger,
"Was asked for a bus server for " << host <<
":" << port);
185 BusServerMap::const_iterator it;
187 RSCDEBUG(
logger,
"Found existing bus server " << it->second);
189 it->second->addConnector(connector);
195 RSCDEBUG(
logger,
"Did not find bus server; creating a new one");
201 result->addConnector(connector);
203 RSCDEBUG(
logger,
"Created new bus client " << result);
209 RSCDEBUG(
logger,
"Removing server bus " << bus);
211 boost::mutex::scoped_lock lock(this->
busMutex);
213 for (BusServerMap::iterator it = this->
busServers.begin();
215 if (it->second == bus) {
216 boost::dynamic_pointer_cast<
BusServer>(bus)->deactivate();
218 RSCDEBUG(
logger,
"Removed");
225 const std::string& host,
226 const boost::uint16_t& port,
230 boost::mutex::scoped_lock lock(this->
busMutex);
232 switch (serverMode) {
240 }
catch (
const std::exception& e) {
242 "Could not create server for bus: " << e.what() <<
"; trying to access bus as client");
247 throw invalid_argument(
"Impossible Server enum value received");
253 if (bus->isTcpnodelay() != tcpnodelay) {
254 throw invalid_argument(str(format(
"Requested tcpnodelay option %1% does not match existing option %2%")
255 % tcpnodelay % bus->isTcpnodelay()));
rsc::logging::LoggerPtr logger
BusServerPtr getBusServerFor(const std::string &host, boost::uint16_t port, bool tcpnodelay, ConnectorBase *connector)
void removeBusClient(BusPtr bus)
boost::shared_ptr< BusConnection > BusConnectionPtr
boost::asio::io_service service
boost::shared_ptr< boost::asio::ip::tcp::socket > SocketPtr
boost::shared_ptr< BusServer > BusServerPtr
Instances of this class provide access to a socket-based bus for local and remote bus clients...
BusPtr getBusClientFor(const std::string &host, boost::uint16_t port, bool tcpnodelay, ConnectorBase *connector)
void removeBusServer(BusPtr bus)
This class is intended to be used as a base class for connector classes of the socket-based transport...
static void checkOptions(BusPtr bus, bool tcpnodelay)
std::pair< std::string, boost::uint16_t > Endpoint
Instances of this class implement connections to a socket-based bus.
BusPtr getBus(const Server &serverMode, const std::string &host, const boost::uint16_t &port, bool tcpnodelay, ConnectorBase *connector)
Returns either a BusClient or Server depending on the chosen serverMode and the existence of a server...
boost::shared_ptr< Bus > BusPtr