29 #include <boost/format.hpp>
31 #include <rsc/misc/UUID.h>
33 #include <rsb/protocol/introspection/Hello.pb.h>
34 #include <rsb/protocol/introspection/Bye.pb.h>
36 #include "../EventId.h"
37 #include "../MetaData.h"
38 #include "../Handler.h"
39 #include "../Factory.h"
45 namespace introspection {
55 RSCDEBUG(this->
sender.
logger,
"Processing introspection query " << query);
59 if (query->getType() == rsc::runtime::typeName<void>()) {
60 if (query->getMethod() ==
"SURVEY") {
62 }
else if (query->getMethod() ==
"REQUEST") {
65 RSCWARN(this->
sender.
logger,
"Introspection query not understood: " << query);
67 }
else if ((query->getType() == rsc::runtime::typeName<std::string>())
68 && (*boost::static_pointer_cast<std::string>(query->getData())
75 for (IntrospectionSender::ParticipantList::const_iterator it
85 = (query->getScope().getComponents()
86 [query->getScope().getComponents().size() - 1]);
87 rsc::misc::UUID id(idString);
88 for (IntrospectionSender::ParticipantList::const_iterator it
91 if (it->getId() == id) {
99 for (IntrospectionSender::ParticipantList::const_iterator it
109 pongEvent->setScope(this->
sender.
informer->getScope()->concat(participant.
getId().getIdAsString()));
110 pongEvent->setType(rsc::runtime::typeName<std::string>());
111 pongEvent->setData(boost::shared_ptr<std::string>(
new std::string(
"pong")));
124 request->getType()));
125 reply->mutableMetaData().setUserTime(
"request.send",
126 request->getMetaData().getSendTime());
127 reply->mutableMetaData().setUserTime(
"request.receive",
128 request->getMetaData().getReceiveTime());
136 : logger(rsc::logging::Logger::getLogger(
"rsb.introspection.IntrospectionSender")),
137 processDisplayName(processDisplayName),
141 "", informerConfig)),
143 .concat(boost::str(boost::format(
"/%1%/%2%")
144 % host.getId() % process.getPid())),
145 listenerConfig, informerConfig)) {
152 RSCDEBUG(this->
logger, boost::format(
"Adding participant %1%") % participant);
154 boost::mutex::scoped_lock lock(this->
mutex);
157 participant->getId(),
158 (parent ? parent->getId() : rsc::misc::UUID(
false)),
159 *participant->getScope(),
167 RSCDEBUG(this->
logger, boost::format(
"Removing participant %1%") % participant);
169 boost::mutex::scoped_lock lock(this->
mutex);
171 ParticipantList::iterator it;
173 if (it->getId() == participant.
getId()) {
178 RSCWARN(this->
logger, boost::format(
"Trying to remove unknown participant %1%")
186 RSCDEBUG(this->
logger, boost::format(
"%1% participant(s) remain(s)")
193 = boost::posix_time::ptime(boost::gregorian::date(1970, boost::date_time::Jan, 1));
197 boost::shared_ptr<rsb::protocol::introspection::Hello> hello(
198 new rsb::protocol::introspection::Hello());
201 hello->set_id(participant.
getId().getId().data,
202 participant.
getId().getId().size());
203 if (participant.
getParentId() != rsc::misc::UUID(
false)) {
204 hello->set_parent(participant.
getParentId().getId().data,
207 hello->set_kind(participant.
getKind());
211 rsb::protocol::operatingsystem::Process*
process
212 = hello->mutable_process();
213 process->set_id(boost::lexical_cast<std::string>(this->process.getPid()));
214 process->set_program_name(this->process.getProgramName());
215 std::vector<std::string> arguments = this->process.
getArguments();
216 for (std::vector<std::string>::const_iterator it = arguments.begin();
217 it != arguments.end(); ++it) {
218 process->add_commandline_arguments(*it);
220 process->set_start_time((this->process.getStartTime() -
UNIX_EPOCH)
221 .total_microseconds());
222 process->set_rsb_version(this->process.getRSBVersion());
223 if (!this->process.getExecutingUser().empty()) {
224 process->set_executing_user(this->process.getExecutingUser());
231 rsb::protocol::operatingsystem::Host*
host = hello->mutable_host();
232 if (!this->host.getId().empty()) {
233 host->set_id(this->host.getId());
235 host->set_hostname(this->host.getHostname());
236 if (!this->host.getMachineType().empty()) {
237 host->set_machine_type(this->host.getMachineType());
239 if (!this->host.getMachineVersion().empty()) {
240 host->set_machine_version(this->host.getMachineVersion());
242 if (!this->host.getSoftwareType().empty()) {
243 host->set_software_type(this->host.getSoftwareType());
245 if (!this->host.getSoftwareVersion().empty()) {
246 host->set_software_version(this->host.getSoftwareVersion());
251 helloEvent->setScope(this->
informer->getScope()->concat(boost::str(boost::format(
"/%1%")
252 % participant.
getId().getIdAsString())));
253 helloEvent->setData(hello);
254 helloEvent->setType(rsc::runtime::typeName(*hello.get()));
256 helloEvent->addCause(query->getEventId());
259 this->
informer->publish(helloEvent);
263 boost::shared_ptr<rsb::protocol::introspection::Bye> bye(
264 new rsb::protocol::introspection::Bye());
265 bye->set_id(participant.
getId().getId().data,
266 participant.
getId().getId().size());
268 byeEvent->setScope(this->
informer->getScope()
269 ->concat(boost::str(boost::format(
"/%1%")
270 % participant.
getId().getIdAsString())));
271 byeEvent->setData(bye);
272 byeEvent->setType(rsc::runtime::typeName(*bye.get()));
Instances of this class store information about participants in the current process.
patterns::LocalServerPtr createLocalServer(const Scope &scope)
Creates and returns a Server object that exposes methods under the Scope scope.
const Scope INTROSPECTION_PARTICIPANTS_SCOPE
const std::string & getKind() const
EventPtr call(const std::string &, EventPtr request)
friend struct QueryHandler
Basic message that is exchanged between informers and listeners.
Objects of this class participate in the exchange of notifications on one channel of the bus...
boost::shared_ptr< std::string > processDisplayName
Instances of this class publish information about the local host, the current process and its partici...
Asynchronously called handler interface on the client level.
void sendHello(const ParticipantInfo &participant, EventPtr query=EventPtr())
const std::string & toString() const
Reconstructs a fully formal string representation of the scope with leading an trailing slashes...
IntrospectionSender & sender
void handle(EventPtr query)
Handle event.
Callback class with receives and returns events.
const rsc::misc::UUID & getParentId() const
Factory & getFactory()
Returns a factory for client-level RSB objects.
void addParticipant(ParticipantPtr participant, ParticipantPtr parent)
const boost::posix_time::ptime UNIX_EPOCH
const std::vector< std::string > & getArguments() const
void sendPong(const ParticipantInfo &participant, EventPtr)
boost::shared_ptr< Participant > ParticipantPtr
const Scope & getScope() const
boost::shared_ptr< IntlCallback > CallbackPtr
ListenerPtr createListener(const Scope &scope, const ParticipantConfig &config)
Creates and returns a new Listener for the Scope scope.
boost::shared_ptr< Handler > HandlerPtr
rsb::patterns::LocalServerPtr server
bool removeParticipant(const Participant &participant)
void handleSurvey(EventPtr query)
ParticipantList participants
A class describing the configuration of Participant instances.
QueryHandler(IntrospectionSender &sender)
IntrospectionSender(boost::shared_ptr< std::string > processDisplayName, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig())
rsc::misc::UUID getId() const
Returns the unique id of the participant.
boost::shared_ptr< Event > EventPtr
const Scope INTROSPECTION_HOSTS_SCOPE
rsc::logging::LoggerPtr logger
void handleRequest(EventPtr query)
void sendBye(const ParticipantInfo &participant)
void handlePing(EventPtr query)
const rsc::misc::UUID & getId() const