29 #include <boost/bind.hpp>
30 #include <boost/format.hpp>
32 #include <rsc/misc/langutils.h>
39 using namespace boost;
41 using namespace boost::asio;
44 using namespace rsc::logging;
54 return exception.what();
56 return "<failed to determine exception message>";
60 BusConnection::BusConnection(
BusPtr bus,
64 logger(Logger::getLogger(
"rsb.transport.socket.BusConnection")),
65 socket(socket), bus(bus), disconnecting(false), activeShutdown(false) {
70 RSCINFO(
logger,
"Setting TCP_NODELAY option");
71 boost::asio::ip::tcp::no_delay option(
true);
72 socket->set_option(option);
94 RSCINFO(
logger,
"Shutting down");
96 boost::recursive_mutex::scoped_lock lock(this->
mutex);
100 this->
socket->shutdown(ip::tcp::socket::shutdown_send);
106 RSCINFO(
logger,
"Disconnecting");
108 boost::recursive_mutex::scoped_lock lock(this->
mutex);
122 const string& wireSchema) {
131 *static_pointer_cast<string>(event->getData()));
142 boost::recursive_mutex::scoped_lock lock(this->
mutex);
144 RSCDEBUG(this->
logger,
"Ignoring to send a notification "
145 "because we are shutting down.");
162 bus->removeConnection(shared_from_this());
163 }
catch (
const boost::bad_weak_ptr&) {
171 }
catch (
const std::exception& e) {
174 RSCDEBUG(this->
logger,
"Failed to disconnect (in " << context <<
"): "
184 boost::asio::placeholders::error,
185 boost::asio::placeholders::bytes_transferred));
189 size_t bytesTransferred) {
192 boost::recursive_mutex::scoped_lock lock(this->
mutex);
193 if (error == boost::asio::error::eof) {
194 RSCDEBUG(
logger,
"Received eof");
198 }
catch (
const std::exception& e) {
202 RSCWARN(this->
logger,
"Failed to shut down socket: "
211 if (error || (bytesTransferred != 4)) {
213 RSCDEBUG(
logger,
"Receive failure (error " << error <<
")"
214 <<
" or incomplete message header (received " << bytesTransferred <<
" bytes)"
215 <<
"; closing connection");
227 RSCDEBUG(
logger,
"Received message header with size " << size);
234 boost::asio::placeholders::error,
235 boost::asio::placeholders::bytes_transferred,
240 size_t bytesTransferred,
242 if (error || (bytesTransferred != expected)) {
244 RSCWARN(
logger,
"Receive failure (error " << error <<
")"
245 <<
" or incomplete message body (received " << bytesTransferred <<
" bytes)"
246 <<
"; closing connection");
263 bus->handleIncoming(event, shared_from_this());
265 RSCWARN(
logger,
"Dangling bus pointer when trying to dispatch incoming event; closing connection");
276 stream <<
"local = " << this->
socket->local_endpoint()
277 <<
", remote = " << this->
socket->remote_endpoint();
279 stream <<
"<error printing socket info>";
284 return boost::str(boost::format(
"socket://%1%:%2%")
285 % this->
socket->remote_endpoint().address().to_string()
286 % this->
socket->remote_endpoint().port());
void handleReadBody(const boost::system::error_code &error, size_t bytesTransferred, size_t expected)
EventPtr notificationToEvent(protocol::Notification ¬ification, bool exposeWireSchema)
Converts notification into an Event.
void sendEvent(EventPtr event, const std::string &wireSchema)
volatile bool activeShutdown
void printContents(std::ostream &stream) const
void eventToNotification(protocol::Notification ¬ification, const EventPtr &event, const string &wireSchema, const string &data)
Converts the Event event into a protocol::Notification, storing the result in notification.
void performSafeCleanup(const std::string &context)
void handleReadLength(const boost::system::error_code &error, size_t bytesTransferred)
std::string lengthSendBuffer
rsc::logging::LoggerPtr logger
std::string messageReceiveBuffer
std::string safeSocketExceptionString(const std::exception &exception)
boost::recursive_mutex mutex
std::string messageSendBuffer
volatile bool disconnecting
boost::shared_ptr< boost::asio::ip::tcp::socket > SocketPtr
std::string lengthReceiveBuffer
protocol::Notification notification
boost::shared_ptr< Event > EventPtr
virtual const std::string getTransportURL() const
boost::shared_ptr< Bus > BusPtr