# ============================================================
#
# Copyright (C) 2014 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
#
# This file may be licensed under the terms of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by:
# CoR-Lab, Research Institute for Cognition and Robotics
# Bielefeld University
#
# ============================================================
"""
Provides executable functions.
.. codeauthor:: jwienke
"""
import argparse
import rsb
import rsb.converter
import rst # pylint: disable=unused-import
import rstsandbox # pylint: disable=unused-import
from rst.monitoring.ProcessCues_pb2 import ProcessCues # pylint: disable=no-name-in-module,import-error
from rst.devices.generic.HostInformation_pb2 import HostInformation
import logging
import logging.config
import os
import signal
from rsbperfmondbadapter import GraphiteBackend, InfluxdbBackend, Processor
_BACKENDS = {'graphite': GraphiteBackend,
'influxdb': InfluxdbBackend}
[docs]def generate_argument_parser():
"""
Generates an :mod:`argparse` parser for the adapter executable.
Returns:
argparse.ArgumentParser:
configured parser instance with all arguments
"""
def rsb_scope(candidate):
"""
Check an :mod:`argparse` argument for being a :class:`rsb.Scope`.
"""
try:
return rsb.Scope(candidate)
except ValueError, error:
raise argparse.ArgumentTypeError(
'{0} is not a valid scope. '
'Reason: {1}'.format(candidate, error))
parser = argparse.ArgumentParser(
description='Pushes results from the rsb-performance-monitor to '
'a graphite installation.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-p', '--procscope',
type=rsb_scope,
metavar='SCOPE',
nargs='+',
default=[rsb.Scope('/monitoring')],
help='Scopes on which process monitoring results '
'are sent.')
parser.add_argument('-n', '--hostscope',
type=rsb_scope,
metavar='SCOPE',
nargs='+',
default=[rsb.Scope('/host')],
help='Scope on which host information are sent.')
parser.add_argument('-b', '--backend',
choices=_BACKENDS.keys(),
default=_BACKENDS.keys()[0],
help='The database backend to use.')
parser.add_argument('-a', '--aggregate',
action='store_true',
help='If set, aggregate subprocess statistics into '
'the parent process. Else, export separately.')
parser.add_argument('-q', '--noqueue',
action='store_true',
help='Do not queue events internally')
# backend options
for name, cls in _BACKENDS.items():
group = parser.add_argument_group(name)
cls.prepare_arguments(group)
parser.add_argument('-l', '--logging',
type=argparse.FileType('r'),
metavar='FILE',
nargs='?',
default=False,
const=True,
help='Configures the python logging system. If used '
'without an argument, all logging is enabled to '
'the console. If used with an argument, the '
'configuration is read from the specified file.')
return parser
[docs]def run():
# pylint: disable=too-many-locals,too-many-branches
"""
Main method for the adapter executable.
Includes all the necessary work to run as a command line application.
"""
args = generate_argument_parser().parse_args()
configure_logging(args.logging)
logger = logging.getLogger('rsbperfmondbadapter.executable')
logger.debug('parsed arguments: %s', args)
process_converter = rsb.converter.ProtocolBufferConverter(
messageClass=ProcessCues)
rsb.converter.registerGlobalConverter(process_converter)
host_converter = rsb.converter.ProtocolBufferConverter(
messageClass=HostInformation)
rsb.converter.registerGlobalConverter(host_converter)
# find out which event receiving strategy we want to use
listener_kwargs = {}
if args.noqueue:
listener_kwargs['receivingStrategy'] = \
rsb.eventprocessing.NonQueuingParallelEventReceivingStrategy()
def create_backend(name):
"""
Create an instance of a backend with the given name.
"""
logger.info('Selected backend class: %s', _BACKENDS[name])
return _BACKENDS[name].create(args)
with create_backend(args.backend) as backend:
adapter = Processor(backend,
do_threads=False,
aggregate_subprocesses=args.aggregate)
proc_listeners = []
host_listeners = []
try:
def handle_proc(event):
"""
RSB handler method for new process data events.
"""
if isinstance(event.data, ProcessCues):
try:
adapter.insert_process(event.data,
event.metaData.createTime)
except Exception: # pylint: disable=broad-except
logger.error('Unable to handle process data',
exc_info=True)
os.kill(os.getpid(), signal.SIGINT)
def handle_host(event):
"""
RSB handler method for new host data events.
"""
if isinstance(event.data, HostInformation):
try:
adapter.insert_host(event.data,
event.metaData.createTime)
except Exception: # pylint: disable=broad-except
logger.error('Unable to handle process data',
exc_info=True)
os.kill(os.getpid(), signal.SIGINT)
# pylint: disable=star-args
for proc_scope in args.procscope:
listener = rsb.createListener(proc_scope, **listener_kwargs)
listener.addHandler(handle_proc)
listener.activate()
proc_listeners.append(listener)
for host_scope in args.hostscope:
listener = rsb.createListener(host_scope, **listener_kwargs)
listener.addHandler(handle_host)
listener.activate()
host_listeners.append(listener)
logger.info('Everything is started and running')
try:
signal.pause()
except KeyboardInterrupt:
pass
logger.info('Received signal. Terminating.')
finally:
for listener in proc_listeners + host_listeners:
logger.debug('Deactivating listener %s', listener)
try:
listener.deactivate()
except Exception: # pylint: disable=broad-except
logger.warning('Unable to terminate a listener correctly',
exec_info=True)
if __name__ == '__main__':
run()