29 #include <boost/format.hpp> 31 #include <rsc/misc/UUID.h> 33 #include <rsb/protocol/introspection/Hello.pb.h> 34 #include <rsb/protocol/introspection/Bye.pb.h> 36 #include "../EventId.h" 37 #include "../MetaData.h" 38 #include "../Handler.h" 39 #include "../Factory.h" 45 namespace introspection {
55 RSCDEBUG(this->
sender.
logger,
"Processing introspection query " << query);
59 if (query->getType() == rsc::runtime::typeName<void>()) {
60 if (query->getMethod() ==
"SURVEY") {
62 }
else if (query->getMethod() ==
"REQUEST") {
65 RSCWARN(this->
sender.
logger,
"Introspection query not understood: " << query);
67 }
else if ((query->getType() == rsc::runtime::typeName<std::string>())
68 && (*boost::static_pointer_cast<std::string>(query->getData())
75 for (IntrospectionSender::ParticipantList::const_iterator it
85 = (query->getScope().getComponents()
86 [query->getScope().getComponents().size() - 1]);
87 rsc::misc::UUID id(idString);
88 for (IntrospectionSender::ParticipantList::const_iterator it
91 if (it->getId() == id) {
99 for (IntrospectionSender::ParticipantList::const_iterator it
109 pongEvent->setScope(this->
sender.
informer->getScope()->concat(participant.
getId().getIdAsString()));
110 pongEvent->setType(rsc::runtime::typeName<std::string>());
111 pongEvent->setData(boost::shared_ptr<std::string>(
new std::string(
"pong")));
124 request->getType()));
125 reply->mutableMetaData().setUserTime(
"request.send",
126 request->getMetaData().getSendTime());
127 reply->mutableMetaData().setUserTime(
"request.receive",
128 request->getMetaData().getReceiveTime());
136 : logger(rsc::logging::Logger::getLogger(
"rsb.introspection.IntrospectionSender")),
137 processDisplayName(processDisplayName),
141 "", informerConfig)),
144 % host.getId() % process.getPid())),
145 listenerConfig, informerConfig)) {
152 RSCDEBUG(this->
logger, boost::format(
"Adding participant %1%") % participant);
154 boost::mutex::scoped_lock lock(this->
mutex);
157 participant->getId(),
158 (parent ? parent->getId() : rsc::misc::UUID(
false)),
159 *participant->getScope(),
161 participant->getTransportURLs());
168 RSCDEBUG(this->
logger, boost::format(
"Removing participant %1%") % participant);
170 boost::mutex::scoped_lock lock(this->
mutex);
172 ParticipantList::iterator it;
174 if (it->getId() == participant.
getId()) {
179 RSCWARN(this->
logger, boost::format(
"Trying to remove unknown participant %1%")
187 RSCDEBUG(this->
logger, boost::format(
"%1% participant(s) remain(s)")
194 = boost::posix_time::ptime(boost::gregorian::date(1970, boost::date_time::Jan, 1));
198 boost::shared_ptr<rsb::protocol::introspection::Hello> hello(
199 new rsb::protocol::introspection::Hello());
202 hello->set_id(participant.
getId().getId().data,
203 participant.
getId().getId().size());
204 if (participant.
getParentId() != rsc::misc::UUID(
false)) {
205 hello->set_parent(participant.
getParentId().getId().data,
208 hello->set_kind(participant.
getKind());
210 for (std::set<std::string>::const_iterator it
213 hello->add_transport(*it);
217 rsb::protocol::operatingsystem::Process*
process 218 = hello->mutable_process();
219 process->set_id(boost::lexical_cast<std::string>(this->process.getPid()));
220 process->set_program_name(this->process.getProgramName());
221 std::vector<std::string> arguments = this->process.
getArguments();
222 for (std::vector<std::string>::const_iterator it = arguments.begin();
223 it != arguments.end(); ++it) {
224 process->add_commandline_arguments(*it);
226 process->set_start_time((this->process.getStartTime() -
UNIX_EPOCH)
227 .total_microseconds());
228 process->set_rsb_version(this->process.getRSBVersion());
229 if (!this->process.getExecutingUser().empty()) {
230 process->set_executing_user(this->process.getExecutingUser());
237 rsb::protocol::operatingsystem::Host*
host = hello->mutable_host();
238 if (!this->host.getId().empty()) {
239 host->set_id(this->host.getId());
241 host->set_hostname(this->host.getHostname());
242 if (!this->host.getMachineType().empty()) {
243 host->set_machine_type(this->host.getMachineType());
245 if (!this->host.getMachineVersion().empty()) {
246 host->set_machine_version(this->host.getMachineVersion());
248 if (!this->host.getSoftwareType().empty()) {
249 host->set_software_type(this->host.getSoftwareType());
251 if (!this->host.getSoftwareVersion().empty()) {
252 host->set_software_version(this->host.getSoftwareVersion());
257 helloEvent->setScope(this->
informer->getScope()->concat(boost::str(boost::format(
"/%1%")
258 % participant.
getId().getIdAsString())));
259 helloEvent->setData(hello);
260 helloEvent->setType(rsc::runtime::typeName(*hello.get()));
262 helloEvent->addCause(query->getId());
265 this->
informer->publish(helloEvent);
269 boost::shared_ptr<rsb::protocol::introspection::Bye> bye(
270 new rsb::protocol::introspection::Bye());
271 bye->set_id(participant.
getId().getId().data,
272 participant.
getId().getId().size());
274 byeEvent->setScope(this->
informer->getScope()
275 ->concat(boost::str(boost::format(
"/%1%")
276 % participant.
getId().getIdAsString())));
277 byeEvent->setData(bye);
278 byeEvent->setType(rsc::runtime::typeName(*bye.get()));
Instances of this class store information about participants in the current process.
patterns::LocalServerPtr createLocalServer(const Scope &scope)
Creates and returns a Server object that exposes methods under the Scope scope.
const Scope INTROSPECTION_PARTICIPANTS_SCOPE
const std::string & getKind() const
EventPtr call(const std::string &, EventPtr request)
friend struct QueryHandler
Basic message that is exchanged between informers and listeners.
Objects of this class participate in the exchange of notifications on one channel of the bus...
boost::shared_ptr< std::string > processDisplayName
Instances of this class publish information about the local host, the current process and its participants...
Asynchronously called handler interface on the client level.
void sendHello(const ParticipantInfo &participant, EventPtr query=EventPtr())
const std::string & toString() const
Reconstructs a fully formal string representation of the scope with leading and trailing slashes...
IntrospectionSender & sender
void handle(EventPtr query)
Handle event.
const std::set< std::string > & getTransportURLs() const
Callback class which receives and returns events.
const rsc::misc::UUID & getParentId() const
Factory & getFactory()
Returns a factory for client-level RSB objects.
void addParticipant(ParticipantPtr participant, ParticipantPtr parent)
const boost::posix_time::ptime UNIX_EPOCH
const std::vector< std::string > & getArguments() const
void sendPong(const ParticipantInfo &participant, EventPtr)
boost::shared_ptr< Participant > ParticipantPtr
const Scope & getScope() const
boost::shared_ptr< IntlCallback > CallbackPtr
ListenerPtr createListener(const Scope &scope, const ParticipantConfig &config)
Creates and returns a new Listener for the Scope scope.
boost::shared_ptr< Handler > HandlerPtr
rsb::patterns::LocalServerPtr server
bool removeParticipant(const Participant &participant)
void handleSurvey(EventPtr query)
ParticipantList participants
A class describing the configuration of Participant instances.
QueryHandler(IntrospectionSender &sender)
IntrospectionSender(boost::shared_ptr< std::string > processDisplayName, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig())
rsc::misc::UUID getId() const
Returns the unique id of the participant.
boost::shared_ptr< Event > EventPtr
const Scope INTROSPECTION_HOSTS_SCOPE
rsc::logging::LoggerPtr logger
void handleRequest(EventPtr query)
void sendBye(const ParticipantInfo &participant)
void handlePing(EventPtr query)
const rsc::misc::UUID & getId() const