29 #include <boost/bind.hpp>
31 #include <rsc/misc/langutils.h>
38 using namespace boost;
40 using namespace boost::asio;
43 using namespace rsc::logging;
49 BusConnection::BusConnection(
BusPtr bus,
53 logger(Logger::getLogger(
"rsb.transport.socket.BusConnection")),
54 socket(socket), bus(bus), disconnecting(false), activeShutdown(false) {
59 RSCINFO(
logger,
"Setting TCP_NODELAY option");
60 boost::asio::ip::tcp::no_delay option(
true);
61 socket->set_option(option);
83 RSCINFO(
logger,
"Shutting down");
85 boost::recursive_mutex::scoped_lock lock(this->
mutex);
89 this->
socket->shutdown(ip::tcp::socket::shutdown_send);
95 RSCINFO(
logger,
"Disconnecting");
97 boost::recursive_mutex::scoped_lock lock(this->
mutex);
111 const string& wireSchema) {
120 *static_pointer_cast<string>(event->getData()));
131 boost::recursive_mutex::scoped_lock lock(this->
mutex);
133 RSCDEBUG(this->
logger,
"Ignoring to send a notification "
134 "because we are shutting down.");
151 bus->removeConnection(shared_from_this());
152 }
catch (
const boost::bad_weak_ptr&) {
160 }
catch (
const std::exception& e) {
161 RSCDEBUG(
logger,
"Failed to disconnect (in " << context <<
"): "
171 boost::asio::placeholders::error,
172 boost::asio::placeholders::bytes_transferred));
176 size_t bytesTransferred) {
179 boost::recursive_mutex::scoped_lock lock(this->
mutex);
180 if (error == boost::asio::error::eof) {
181 RSCDEBUG(
logger,
"Received eof");
190 if (error || (bytesTransferred != 4)) {
192 RSCDEBUG(
logger,
"Receive failure (error " << error <<
")"
193 <<
" or incomplete message header (received " << bytesTransferred <<
" bytes)"
194 <<
"; closing connection");
206 RSCDEBUG(
logger,
"Received message header with size " << size);
213 boost::asio::placeholders::error,
214 boost::asio::placeholders::bytes_transferred,
219 size_t bytesTransferred,
221 if (error || (bytesTransferred != expected)) {
223 RSCWARN(
logger,
"Receive failure (error " << error <<
")"
224 <<
" or incomplete message body (received " << bytesTransferred <<
" bytes)"
225 <<
"; closing connection");
240 BusPtr bus = this->bus.lock();
242 bus->handleIncoming(event, shared_from_this());
244 RSCWARN(
logger,
"Dangling bus pointer when trying to dispatch incoming event; closing connection");
255 stream <<
"local = " << this->
socket->local_endpoint()
256 <<
", remote = " << this->
socket->remote_endpoint();
258 stream <<
"<error printing socket info>";