29 #include <boost/bind.hpp>
31 #include <rsc/misc/langutils.h>
38 using namespace boost;
40 using namespace boost::asio;
43 using namespace rsc::logging;
49 BusConnection::BusConnection(
BusPtr bus,
53 logger(Logger::getLogger(
"rsb.transport.socket.BusConnection")),
54 socket(socket), bus(bus), disconnecting(false), activeShutdown(false) {
59 RSCINFO(
logger,
"Setting TCP_NODELAY option");
60 boost::asio::ip::tcp::no_delay option(
true);
61 socket->set_option(option);
83 RSCINFO(
logger,
"Shutting down");
85 boost::recursive_mutex::scoped_lock lock(this->
mutex);
89 this->
socket->shutdown(ip::tcp::socket::shutdown_send);
95 RSCINFO(
logger,
"Disconnecting");
97 boost::recursive_mutex::scoped_lock lock(this->
mutex);
111 const string& wireSchema) {
120 *static_pointer_cast<string>(event->getData()));
131 boost::recursive_mutex::scoped_lock lock(this->
mutex);
133 RSCDEBUG(this->
logger,
"Ignoring to send a notification "
134 "because we are shutting down.");
151 bus->removeConnection(shared_from_this());
152 }
catch (
const boost::bad_weak_ptr&) {
160 }
catch (
const std::exception& e) {
161 RSCDEBUG(
logger,
"Failed to disconnect (in " << context <<
"): "
171 boost::asio::placeholders::error,
172 boost::asio::placeholders::bytes_transferred));
176 size_t bytesTransferred) {
179 boost::recursive_mutex::scoped_lock lock(this->
mutex);
180 if (error == boost::asio::error::eof) {
181 RSCDEBUG(
logger,
"Received eof");
190 if (error || (bytesTransferred != 4)) {
192 RSCDEBUG(
logger,
"Receive failure (error " << error <<
")"
193 <<
" or incomplete message header (received " << bytesTransferred <<
" bytes)"
194 <<
"; closing connection");
206 RSCDEBUG(
logger,
"Received message header with size " << size);
213 boost::asio::placeholders::error,
214 boost::asio::placeholders::bytes_transferred,
219 size_t bytesTransferred,
221 if (error || (bytesTransferred != expected)) {
223 RSCWARN(
logger,
"Receive failure (error " << error <<
")"
224 <<
" or incomplete message body (received " << bytesTransferred <<
" bytes)"
225 <<
"; closing connection");
240 BusPtr bus = this->bus.lock();
242 bus->handleIncoming(event, shared_from_this());
244 RSCWARN(
logger,
"Dangling bus pointer when trying to dispatch incoming event; closing connection");
255 stream <<
"local = " << this->
socket->local_endpoint()
256 <<
", remote = " << this->
socket->remote_endpoint();
258 stream <<
"<error printing socket info>";
void handleReadBody(const boost::system::error_code &error, size_t bytesTransferred, size_t expected)
EventPtr notificationToEvent(protocol::Notification ¬ification, bool exposeWireSchema)
Converts notification into an Event.
void sendEvent(EventPtr event, const std::string &wireSchema)
volatile bool activeShutdown
void printContents(std::ostream &stream) const
void eventToNotification(protocol::Notification ¬ification, const EventPtr &event, const string &wireSchema, const string &data)
Converts the Event event into a protocol::Notification, storing the result in notification.
void performSafeCleanup(const std::string &context)
void handleReadLength(const boost::system::error_code &error, size_t bytesTransferred)
std::string lengthSendBuffer
rsc::logging::LoggerPtr logger
std::string messageReceiveBuffer
boost::recursive_mutex mutex
std::string messageSendBuffer
volatile bool disconnecting
boost::shared_ptr< boost::asio::ip::tcp::socket > SocketPtr
std::string lengthReceiveBuffer
protocol::Notification notification
boost::shared_ptr< Event > EventPtr
boost::shared_ptr< Bus > BusPtr