RSB  0.7.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ReceiverTask.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is a part of the RSB project
4  *
5  * Copyright (C) 2010 by Sebastian Wrede <swrede at techfak dot uni-bielefeld dot de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "ReceiverTask.h"
28 
29 #include <rsc/misc/langutils.h>
30 #include <rsc/debug/DebugTools.h>
31 
32 #include "../../CommException.h"
33 #include "../../MetaData.h"
34 #include "../../EventId.h"
35 
36 #include "../../converter/Converter.h"
37 
38 #include "SpreadConnection.h"
39 #include "InPushConnector.h"
40 
41 using namespace std;
42 
43 using namespace rsc::logging;
44 
45 using namespace rsb;
46 using namespace rsb::eventprocessing;
47 using namespace rsb::transport;
48 using namespace rsb::protocol;
49 
50 namespace rsb {
51 namespace spread {
52 
53 ReceiverTask::ReceiverTask(SpreadConnectionPtr s, HandlerPtr handler,
54  InPushConnector* connector) :
55  logger(rsc::logging::Logger::getLogger("rsb.spread.ReceiverTask")), con(
56  s), connector(connector), assemblyPool(new AssemblyPool()), handler(
57  handler) {
58 
59  RSCTRACE(logger, "ReceiverTask::ReceiverTask, SpreadConnection: " << con);
60 
61 }
62 
64 }
65 
67  // TODO Do performance optimization for data joining
68  try {
69 
71  con->receive(message);
72  if (!message) {
73  throw CommException(
74  "Receiving a SpreadMessage returned a zero pointer, why?");
75  }
76 
77  RSCDEBUG(logger,
78  "ReceiverTask::execute new SpreadMessage received " << message);
79 
80  if (message->getType() != SpreadMessage::REGULAR) {
81  return;
82  }
83 
84  FragmentedNotificationPtr notification(new FragmentedNotification());
85  if (!notification->ParseFromString(message->getDataAsString())) {
86  throw CommException("Failed to parse notification in pbuf format");
87  }
88 
89  RSCTRACE(logger,
90  "Parsed event seqnum: " << notification->notification().event_id().sequence_number());
91  RSCTRACE(logger,
92  "Binary length: " << notification->notification().data().length());
93  RSCTRACE(logger,
94  "Number of split message parts: " << notification->num_data_parts());
95  RSCTRACE(logger,
96  "... received message part : " << notification->data_part());
97 
98  // Build data from parts
99  NotificationPtr completeNotification =
101  if (completeNotification) {
102  RSCTRACE(logger,
103  "ReceiverTask::execute fragmented notification joined, last message " << message);
104  notifyHandler(completeNotification);
105  }
106 
107  } catch (rsb::CommException& e) {
108  // TODO QoS would not like swallowing the exception
109  // TODO maybe at least use the ErrorHandlingStrategy here?
110  rsc::debug::DebugToolsPtr tools = rsc::debug::DebugTools::newInstance();
111  RSCERROR(logger,
112  "Error receiving spread message: " << e.what() << endl << tools->exceptionInfo(e));
113  } catch (boost::thread_interrupted& e) {
114  return;
115  }
116 
117 }
118 
120  FragmentedNotificationPtr notification) {
121 
122  NotificationPtr completeNotification;
123 
124  bool multiPartNotification = notification->num_data_parts() > 1;
125  if (multiPartNotification) {
126  completeNotification = this->assemblyPool->add(notification);
127  } else {
128  completeNotification.reset(
129  notification->mutable_notification(),
130  rsc::misc::ParentSharedPtrDeleter
131  < rsb::protocol::FragmentedNotification
132  > (notification));
133  }
134  return completeNotification;
135 
136 }
137 
139 
140  EventPtr e(new Event());
141 
142  // TODO fix error handling, see #796
143  try {
145  notification->wire_schema());
146  AnnotatedData deserialized = c->deserialize(
147  notification->wire_schema(), notification->data());
148 
149  fillEvent(e, *notification, deserialized.second, deserialized.first);
150 
151  e->mutableMetaData().setReceiveTime();
152 
153  boost::recursive_mutex::scoped_lock lock(handlerMutex);
154  if (this->handler) {
155  this->handler->handle(e);
156  } else {
157  RSCINFO(logger, "No handler");
158  }
159  } catch (const std::exception& ex) {
160  RSCWARN(logger, "ReceiverTask::notifyHandler catched std exception: " << ex.what() );
161  } catch (...) {
162  RSCWARN(logger, "ReceiverTask::notifyHandler catched unknown exception" );
163  }
164 
165 }
166 
168  boost::recursive_mutex::scoped_lock lock(handlerMutex);
169  this->handler = handler;
170 }
171 
172 void ReceiverTask::setPruning(const bool& pruning) {
173  assemblyPool->setPruning(pruning);
174 }
175 
176 }
177 }