# Source code for rsbperfmondbadapter.executable

# ============================================================
#
# Copyright (C) 2014 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
#
# This file may be licensed under the terms of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by:
#   CoR-Lab, Research Institute for Cognition and Robotics
#     Bielefeld University
#
# ============================================================

"""
Provides executable functions.

.. codeauthor:: jwienke
"""

import argparse
import rsb
import rsb.converter
import rst  # pylint: disable=unused-import
import rstsandbox  # pylint: disable=unused-import
from rst.monitoring.ProcessCues_pb2 import ProcessCues  # pylint: disable=no-name-in-module,import-error
from rst.devices.generic.HostInformation_pb2 import HostInformation
import logging
import logging.config
import os
import signal
from rsbperfmondbadapter import GraphiteBackend, InfluxdbBackend, Processor

# Maps the backend names accepted by the ``--backend`` command line option to
# the classes implementing them. The argument parser calls
# ``prepare_arguments(group)`` on every class listed here and ``run`` calls
# ``create(args)`` on the selected one.
# NOTE: the first key yielded by this dict is used as the default backend, so
# the way this mapping is built influences the default choice.
_BACKENDS = {'graphite': GraphiteBackend,
             'influxdb': InfluxdbBackend}


def configure_logging(file_or_flag):
    """
    Configure the python :mod:`logging` system.

    If the provided argument is a file object (anything exposing a ``name``
    attribute, e.g. what :class:`argparse.FileType` returns), the file it
    points to is used as the configuration for the logging system.
    Otherwise, if the given argument evaluates to :class:`True`, the
    ``logging.cfg`` resource located next to this module is used. If that
    default configuration cannot be loaded, :func:`logging.basicConfig` is
    used with the debug level and a warning is logged. If the argument
    evaluates to :class:`False`, only messages starting from the warning
    level are logged.

    Args:
        file_or_flag (file or bool):
            either a configuration file pointed by a
            :ref:`file object <python:bltin-file-objects>` instance or
            something that evaluates to :class:`bool`.
    """
    # Duck-type the file check instead of relying on the Python 2 only
    # ``file`` builtin: booleans have no ``name`` attribute, open files do.
    config_path = getattr(file_or_flag, 'name', None)

    if config_path is not None:
        logging.config.fileConfig(config_path)
    elif file_or_flag:
        try:
            # imported lazily so that a missing setuptools installation only
            # triggers the fallback below instead of breaking the module
            import pkg_resources
            default_config = pkg_resources.resource_filename(
                __name__, 'logging.cfg')
            logging.config.fileConfig(default_config)
        except Exception:  # pylint: disable=broad-except
            # Deliberately broad: any problem with the default configuration
            # must not prevent the program from starting. SystemExit and
            # KeyboardInterrupt are no longer swallowed, though.
            logging.basicConfig(level=logging.DEBUG)
            logger = logging.getLogger('rsbperfmondbadapter.executable')
            logger.warning(
                'Unable to configure logging with the default '
                'configuration file. Falling back to basicConfig.',
                exc_info=True)
    else:
        # at least configure warnings
        logging.basicConfig(level=logging.WARNING)
def generate_argument_parser():
    """
    Generates an :mod:`argparse` parser for the adapter executable.

    Besides the generic options, every backend registered in ``_BACKENDS``
    gets its own argument group which it fills through its
    ``prepare_arguments`` method.

    Returns:
        argparse.ArgumentParser:
            configured parser instance with all arguments
    """

    def rsb_scope(candidate):
        """
        Check an :mod:`argparse` argument for being a :class:`rsb.Scope`.

        Raises:
            argparse.ArgumentTypeError:
                if ``rsb.Scope`` rejects the candidate with a ``ValueError``
        """
        try:
            return rsb.Scope(candidate)
        except ValueError as error:
            raise argparse.ArgumentTypeError(
                '{0} is not a valid scope. '
                'Reason: {1}'.format(candidate, error))

    # Materialize the names once: dict views cannot be indexed on Python 3,
    # and a list keeps the "first key is the default" behavior unchanged.
    backend_names = list(_BACKENDS)

    parser = argparse.ArgumentParser(
        description='Pushes results from the rsb-performance-monitor to '
                    'a graphite installation.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    parser.add_argument('-p', '--procscope',
                        type=rsb_scope,
                        metavar='SCOPE',
                        nargs='+',
                        default=[rsb.Scope('/monitoring')],
                        help='Scopes on which process monitoring results '
                             'are sent.')
    parser.add_argument('-n', '--hostscope',
                        type=rsb_scope,
                        metavar='SCOPE',
                        nargs='+',
                        default=[rsb.Scope('/host')],
                        help='Scope on which host information are sent.')
    parser.add_argument('-b', '--backend',
                        choices=backend_names,
                        default=backend_names[0],
                        help='The database backend to use.')
    parser.add_argument('-a', '--aggregate',
                        action='store_true',
                        help='If set, aggregate subprocess statistics into '
                             'the parent process. Else, export separately.')
    parser.add_argument('-q', '--noqueue',
                        action='store_true',
                        help='Do not queue events internally')

    # backend options
    for name, cls in _BACKENDS.items():
        group = parser.add_argument_group(name)
        cls.prepare_arguments(group)

    # ``nargs='?'`` with ``const=True`` yields: False when the option is
    # absent, True when given without a value, an open file otherwise.
    parser.add_argument('-l', '--logging',
                        type=argparse.FileType('r'),
                        metavar='FILE',
                        nargs='?',
                        default=False,
                        const=True,
                        help='Configures the python logging system. If used '
                             'without an argument, all logging is enabled to '
                             'the console. If used with an argument, the '
                             'configuration is read from the specified file.')

    return parser
def run():  # pylint: disable=too-many-locals,too-many-branches
    """
    Main method for the adapter executable.

    Includes all the necessary work to run as a command line application:
    parses the command line, configures logging, registers the converters
    for the received data types, creates the selected backend and attaches
    RSB listeners for all requested scopes. Afterwards, blocks until a signal
    arrives and finally deactivates all listeners again.
    """

    args = generate_argument_parser().parse_args()

    configure_logging(args.logging)
    logger = logging.getLogger('rsbperfmondbadapter.executable')

    logger.debug('parsed arguments: %s', args)

    process_converter = rsb.converter.ProtocolBufferConverter(
        messageClass=ProcessCues)
    rsb.converter.registerGlobalConverter(process_converter)
    host_converter = rsb.converter.ProtocolBufferConverter(
        messageClass=HostInformation)
    rsb.converter.registerGlobalConverter(host_converter)

    # find out which event receiving strategy we want to use
    listener_kwargs = {}
    if args.noqueue:
        listener_kwargs['receivingStrategy'] = \
            rsb.eventprocessing.NonQueuingParallelEventReceivingStrategy()

    def create_backend(name):
        """
        Create an instance of a backend with the given name.
        """
        logger.info('Selected backend class: %s', _BACKENDS[name])
        return _BACKENDS[name].create(args)

    with create_backend(args.backend) as backend:

        adapter = Processor(backend,
                            do_threads=False,
                            aggregate_subprocesses=args.aggregate)

        proc_listeners = []
        host_listeners = []
        try:

            def handle_proc(event):
                """
                RSB handler method for new process data events.
                """
                if isinstance(event.data, ProcessCues):
                    try:
                        adapter.insert_process(event.data,
                                               event.metaData.createTime)
                    except Exception:  # pylint: disable=broad-except
                        logger.error('Unable to handle process data',
                                     exc_info=True)
                        # send SIGINT to ourselves so that the
                        # signal.pause() call below gets interrupted and the
                        # program terminates
                        os.kill(os.getpid(), signal.SIGINT)

            def handle_host(event):
                """
                RSB handler method for new host data events.
                """
                if isinstance(event.data, HostInformation):
                    try:
                        adapter.insert_host(event.data,
                                            event.metaData.createTime)
                    except Exception:  # pylint: disable=broad-except
                        logger.error('Unable to handle host data',
                                     exc_info=True)
                        # see handle_proc: terminate via SIGINT
                        os.kill(os.getpid(), signal.SIGINT)

            # pylint: disable=star-args
            for proc_scope in args.procscope:
                listener = rsb.createListener(proc_scope, **listener_kwargs)
                listener.addHandler(handle_proc)
                listener.activate()
                proc_listeners.append(listener)

            for host_scope in args.hostscope:
                listener = rsb.createListener(host_scope, **listener_kwargs)
                listener.addHandler(handle_host)
                listener.activate()
                host_listeners.append(listener)

            logger.info('Everything is started and running')

            try:
                signal.pause()
            except KeyboardInterrupt:
                pass

            logger.info('Received signal. Terminating.')

        finally:
            for listener in proc_listeners + host_listeners:
                logger.debug('Deactivating listener %s', listener)
                try:
                    listener.deactivate()
                except Exception:  # pylint: disable=broad-except
                    # Keep going so that the remaining listeners are still
                    # deactivated. The keyword must be ``exc_info``; a
                    # misspelled keyword raises a TypeError right here and
                    # aborts the cleanup loop.
                    logger.warning('Unable to terminate a listener correctly',
                                   exc_info=True)
# Start the adapter only when this module is executed as a script.
if __name__ == '__main__':
    run()