29 #include <boost/bind.hpp>
31 #include <rsc/misc/langutils.h>
38 using namespace boost;
40 using namespace boost::asio;
43 using namespace rsc::logging;
53 return exception.what();
55 return "<failed to determine exception message>";
59 BusConnection::BusConnection(
BusPtr bus,
63 logger(Logger::getLogger(
"rsb.transport.socket.BusConnection")),
64 socket(socket), bus(bus), disconnecting(false), activeShutdown(false) {
69 RSCINFO(
logger,
"Setting TCP_NODELAY option");
70 boost::asio::ip::tcp::no_delay option(
true);
71 socket->set_option(option);
93 RSCINFO(
logger,
"Shutting down");
95 boost::recursive_mutex::scoped_lock lock(this->
mutex);
99 this->
socket->shutdown(ip::tcp::socket::shutdown_send);
105 RSCINFO(
logger,
"Disconnecting");
107 boost::recursive_mutex::scoped_lock lock(this->
mutex);
121 const string& wireSchema) {
130 *static_pointer_cast<string>(event->getData()));
141 boost::recursive_mutex::scoped_lock lock(this->
mutex);
143 RSCDEBUG(this->
logger,
"Ignoring to send a notification "
144 "because we are shutting down.");
161 bus->removeConnection(shared_from_this());
162 }
catch (
const boost::bad_weak_ptr&) {
170 }
catch (
const std::exception& e) {
173 RSCDEBUG(this->
logger,
"Failed to disconnect (in " << context <<
"): "
183 boost::asio::placeholders::error,
184 boost::asio::placeholders::bytes_transferred));
188 size_t bytesTransferred) {
191 boost::recursive_mutex::scoped_lock lock(this->
mutex);
192 if (error == boost::asio::error::eof) {
193 RSCDEBUG(
logger,
"Received eof");
197 }
catch (
const std::exception& e) {
201 RSCWARN(this->
logger,
"Failed to shut down socket: "
210 if (error || (bytesTransferred != 4)) {
212 RSCDEBUG(
logger,
"Receive failure (error " << error <<
")"
213 <<
" or incomplete message header (received " << bytesTransferred <<
" bytes)"
214 <<
"; closing connection");
226 RSCDEBUG(
logger,
"Received message header with size " << size);
233 boost::asio::placeholders::error,
234 boost::asio::placeholders::bytes_transferred,
239 size_t bytesTransferred,
241 if (error || (bytesTransferred != expected)) {
243 RSCWARN(
logger,
"Receive failure (error " << error <<
")"
244 <<
" or incomplete message body (received " << bytesTransferred <<
" bytes)"
245 <<
"; closing connection");
262 bus->handleIncoming(event, shared_from_this());
264 RSCWARN(
logger,
"Dangling bus pointer when trying to dispatch incoming event; closing connection");
275 stream <<
"local = " << this->
socket->local_endpoint()
276 <<
", remote = " << this->
socket->remote_endpoint();
278 stream <<
"<error printing socket info>";
void handleReadBody(const boost::system::error_code &error, size_t bytesTransferred, size_t expected)
EventPtr notificationToEvent(protocol::Notification ¬ification, bool exposeWireSchema)
Converts notification into an Event.
void sendEvent(EventPtr event, const std::string &wireSchema)
volatile bool activeShutdown
void printContents(std::ostream &stream) const
void eventToNotification(protocol::Notification ¬ification, const EventPtr &event, const string &wireSchema, const string &data)
Converts the Event event into a protocol::Notification, storing the result in notification.
void performSafeCleanup(const std::string &context)
void handleReadLength(const boost::system::error_code &error, size_t bytesTransferred)
std::string lengthSendBuffer
rsc::logging::LoggerPtr logger
std::string messageReceiveBuffer
std::string safeSocketExceptionString(const std::exception &exception)
boost::recursive_mutex mutex
std::string messageSendBuffer
volatile bool disconnecting
boost::shared_ptr< boost::asio::ip::tcp::socket > SocketPtr
std::string lengthReceiveBuffer
protocol::Notification notification
boost::shared_ptr< Event > EventPtr
boost::shared_ptr< Bus > BusPtr