Source code for rsbperfmondbadapter

# ============================================================
#
# Copyright (C) 2014 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
#
# This file may be licensed under the terms of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by:
#   CoR-Lab, Research Institute for Cognition and Robotics
#     Bielefeld University
#
# ============================================================

"""
Module containing the real worker logic.

.. codeauthor:: jwienke
"""

import abc
import argparse
import copy
import logging
import os.path
import pickle
import re
import socket
import string
import struct
import threading
from loggerbyclass import get_logger_by_class

import rst
from rst.devices.generic.NetworkState_pb2 import NetworkState


def positive_int(candidate):
    """
    Checks a command line argument value for being a positive integer.
    """
    try:
        num = int(candidate)
        if num <= 0:
            raise argparse.ArgumentTypeError('Number must be > 0.')
        return num
    except ValueError as error:
        raise argparse.ArgumentTypeError(
            '{0} is not a valid int. Reason: {1}'.format(candidate, error))

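
# Example sketch of how ``positive_int`` plugs into an argparse parser; the
# parser and the ``--example-port`` option below are made up purely for
# illustration.
def _example_positive_int_usage():
    parser = argparse.ArgumentParser()
    parser.add_argument('--example-port', type=positive_int, default=8086)
    # valid input is converted to an int
    assert parser.parse_args(['--example-port', '1234']).example_port == 1234
    # '0' or '-5' would instead make argparse exit with an error message
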
# fields that should be ignored
IGNORE_CUE_FIELDS = ['source', 'start_time']

# the process fields that require a derivative
PROCESS_CUES_TO_DERIVE = ['utime', 'stime', 'wall_time', 'virtual_time',
                          'etime', 'utime_scaled', 'stime_scaled',
                          'wall_time_scaled', 'rchar', 'wchar', 'read_bytes',
                          'write_bytes', 'received_bytes', 'sent_bytes']

# the cpu fields that require a derivative
CPU_CUES_TO_DERIVE = ['total', 'idle', 'user', 'user_low', 'system', 'iowait',
                      'irq', 'softirq', 'steal', 'guest']

# the storage device fields that require a derivative
STORAGE_DEVICE_CUES_TO_DERIVE = ['read_count', 'write_count', 'read_bytes',
                                 'write_bytes', 'read_time', 'write_time']

# the nic fields that require a derivative
NIC_CUES_TO_DERIVE = ['bytes_received', 'bytes_sent', 'packets_received',
                      'packets_sent', 'receive_errors', 'send_error',
                      'receive_drops', 'send_drops']

class DatabaseBackend(object):
    """
    Interface class for database backends which store processed data somehow.

    Apart from implementing the abstract methods, also the two static methods
    :meth:`prepare_arguments` and :meth:`create` need to be implemented. The
    former will be called first to request command line options to present to
    the user and the latter one will be called afterwards as a factory method
    to create an instance of the backend from the parsed options.
    """

    __metaclass__ = abc.ABCMeta

    @staticmethod
    def prepare_arguments(group):
        """
        Provide command line parsing arguments to the provided argparse group.

        Always use a consistent naming with a prefix identifying the backend
        to prevent name clashes.
        """
        raise NotImplementedError()

    @staticmethod
    def create(args):
        """
        Factory method called to create a new instance of the backend.

        Args:
            args (argparse.Namespace):
                parsed command line arguments

        Returns:
            DatabaseBackend:
                newly created backend according to the user options.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def submit_process_data(self, process_cues, timestamp, do_subprocesses):
        """
        Store the provided data for a process in the database.
        """
        pass

    @abc.abstractmethod
    def submit_host_information(self, host_information, timestamp):
        """
        Store the provided host information in the database.
        """
        pass

    def shutdown(self):
        """
        Shut down the backend so that this instance can be deleted safely.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exec_type, exec_value, traceback):
        self.shutdown()

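
# Minimal sketch of the backend contract described above: a hypothetical
# backend that only logs the submitted data. The class, its option name and
# its behavior are made-up illustrations of how ``prepare_arguments`` and
# ``create`` cooperate; the real backends follow below.
class _LoggingBackendSketch(DatabaseBackend):

    @staticmethod
    def prepare_arguments(group):
        group.add_argument('--logbackend-level', type=str, default='INFO',
                           dest='logbackend_level',
                           help='Log level used for dumping the data.')

    @staticmethod
    def create(args):
        return _LoggingBackendSketch(args.logbackend_level)

    def __init__(self, level):
        self._logger = logging.getLogger(__name__ + '.sketch')
        self._level = getattr(logging, level.upper(), logging.INFO)

    def submit_process_data(self, process_cues, timestamp, do_subprocesses):
        self._logger.log(self._level, 'process %s @ %s',
                         process_cues, timestamp)

    def submit_host_information(self, host_information, timestamp):
        self._logger.log(self._level, 'host %s @ %s',
                         host_information, timestamp)
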
class TimeseriesDbBackend(DatabaseBackend):  # pylint: disable=abstract-method
    """
    Abstract base class for adapters which put data into timeseries databases
    that are structured by series and associated measurement names.

    Subclasses need to ensure that :meth:`__init__` is called correctly and
    need to override :meth:`_submit`.
    """

    #: A regex to find all characters in keys that somehow create troubles
    #: with timeseries names, e.g. by being confused with an operator.
    INVALID_KEY_CHARS = re.compile('[^a-zA-Z0-9_]')
    #: Character to use as a substitute for invalid ones in influxdb keys.
    REPLACE_KEY_CHAR = '_'

    #: Character to use for separating the provider from the cue.
    COMPONENT_SEPARATOR = '.'

    #: Format string to generate the series name for process information
    PROCESS_SERIES_PATTERN = '{prefix}{separator}{name}'
    #: Format string to generate the series name for general host information
    HOST_SERIES_PATTERN = '{prefix}{separator}{name}'
    #: Format string to generate the series name for a cpu
    HOST_CPU_SERIES_PATTERN = '{prefix}{separator}{name}' \
        '{separator}cpus{separator}cpu{index}'
    #: Format string to generate the series name for a disk partition
    HOST_PARTITION_SERIES_PATTERN = '{prefix}{separator}{name}' \
        '{separator}disk{separator}partitions{separator}{partition}'
    #: Format string to generate the series name for a storage device
    HOST_DISK_SERIES_PATTERN = '{prefix}{separator}{name}' \
        '{separator}disk{separator}devices{separator}{device}'
    #: Format string to generate the series name for a network controller
    HOST_NIC_SERIES_PATTERN = '{prefix}{separator}{name}' \
        '{separator}network{separator}nics{separator}{nic}'
    #: Format string to generate the series name for network connections
    HOST_CONNECTION_SERIES_PATTERN = '{prefix}{separator}{name}' \
        '{separator}network{separator}connections' \
        '{separator}{protocol}{separator}{family}'
    #: Format string to generate the series name for the utility tcp series
    #: with all connections summarized.
    HOST_TCP_SERIES_PATTERN = '{prefix}{separator}{name}' \
        '{separator}network{separator}connections{separator}tcp'
    #: Format string to generate the series name for the utility udp series
    #: with all connections summarized.
    HOST_UDP_SERIES_PATTERN = '{prefix}{separator}{name}' \
        '{separator}network{separator}connections{separator}udp'

    def __init__(self, process_prefix, host_prefix):
        self._logger = get_logger_by_class(self.__class__)
        self._process_prefix = process_prefix
        self._host_prefix = host_prefix

    @abc.abstractmethod
    def _submit(self, measurement, fields, timestamp):
        """
        Submits a new set of data to a database.

        Args:
            measurement (str):
                Name of the measurement as a dotted string
            fields (dict of str -> number):
                The actual values to enter into the timeseries database.
            timestamp (float):
                The measurement time of the data in seconds + fraction.
        """
        pass

    @classmethod
    def _sanitize_series_fragment(cls, raw):
        """
        Sanitizes a string so that it forms a valid fragment of a dotted
        series name.
        """
        return cls.INVALID_KEY_CHARS.sub(cls.REPLACE_KEY_CHAR, raw)

    def submit_process_data(self, process_cues, timestamp, do_subprocesses):
        self._logger.debug('Submitting %s with subprocesses: %s',
                           process_cues, do_subprocesses)

        fields = {'pid': process_cues.pid,
                  'numsubprocesses': len(process_cues.children_by_pid)}
        for source, cue in process_cues.cues_by_source.iteritems():
            for name, value in cue.iteritems():
                # exclude some values where we know that they are useless for
                # visualization purposes in graphite
                if name in IGNORE_CUE_FIELDS:
                    continue
                fields['{provider}{separator}{cue}'.format(
                    provider=self._sanitize_series_fragment(source),
                    separator=self.COMPONENT_SEPARATOR,
                    cue=name)] = value

        self._submit(self.HOST_SERIES_PATTERN.format(
            prefix=self._process_prefix,
            name=self._sanitize_series_fragment(process_cues.name),
            separator=self.COMPONENT_SEPARATOR),
            fields, timestamp)

    def submit_host_information(self, host_information, timestamp):
        self._logger.debug('Processing host information %s', host_information)

        fields = {
            'load_1': host_information.load_1,
            'load_5': host_information.load_5,
            'load_15': host_information.load_15,
            'memory_used': host_information.memory_used,
            'memory_total': host_information.memory_total,
            'cpu_percent': host_information.cpu_percent_cumulated,
            'cpu_percent_normalized': host_information.cpu_percent_normalized,
            'cpu_number': host_information.cpu_number
        }
        self._add_if_exists(fields, host_information, 'memory_usable')
        self._add_if_exists(fields, host_information, 'memory_percent')
        self._add_if_exists(fields, host_information, 'logged_in_users')
        self._add_if_exists(fields, host_information, 'login_sessions')
        self._add_if_exists(fields, host_information, 'login_hosts')

        self._submit(self.HOST_SERIES_PATTERN.format(
            prefix=self._host_prefix,
            name=self._sanitize_series_fragment(host_information.hostname),
            separator=self.COMPONENT_SEPARATOR),
            fields, timestamp)

        self._submit_cpu_information(host_information, timestamp)
        self._submit_partition_information(host_information, timestamp)
        self._submit_disk_information(host_information, timestamp)
        self._submit_nic_information(host_information, timestamp)
        self._submit_connection_information(host_information, timestamp)

    def _submit_connection_information(self, host_information, timestamp):
        """
        Submits information about network connections.
        """
        total_tcp = 0
        total_udp = 0
        connections_seen = False

        for (protocol, family), connection in \
                host_information.network_connections.iteritems():
            connections_seen = True

            fields = {}
            for key, value in connection.iteritems():
                fields['{cue}'.format(cue=key)] = value
                # generate total count
                if key != 'num_listen' and protocol == 'tcp':
                    # exclude server sockets
                    total_tcp += value
                if protocol == 'udp':
                    total_udp += value

            self._submit(self.HOST_CONNECTION_SERIES_PATTERN.format(
                prefix=self._host_prefix,
                name=self._sanitize_series_fragment(
                    host_information.hostname),
                protocol=protocol,
                family=family,
                separator=self.COMPONENT_SEPARATOR),
                fields, timestamp)

        if connections_seen:
            self._submit(self.HOST_TCP_SERIES_PATTERN.format(
                prefix=self._host_prefix,
                name=self._sanitize_series_fragment(
                    host_information.hostname),
                separator=self.COMPONENT_SEPARATOR),
                {'total': total_tcp}, timestamp)
            self._submit(self.HOST_UDP_SERIES_PATTERN.format(
                prefix=self._host_prefix,
                name=self._sanitize_series_fragment(
                    host_information.hostname),
                separator=self.COMPONENT_SEPARATOR),
                {'total': total_udp}, timestamp)

    def _submit_cpu_information(self, host_information, timestamp):
        """
        Submits information about installed CPUs in a host.
        """
        for cpu_index, cpu in host_information.cpus_by_index.iteritems():
            fields = {}
            for cue_name, value in cpu.iteritems():
                fields['{cue}'.format(cue=cue_name)] = value
            self._submit(self.HOST_CPU_SERIES_PATTERN.format(
                prefix=self._host_prefix,
                name=self._sanitize_series_fragment(
                    host_information.hostname),
                index=cpu_index,
                separator=self.COMPONENT_SEPARATOR),
                fields, timestamp)

    def _submit_disk_information(self, host_information, timestamp):
        """
        Submits information about storage devices to the database.
        """
        for name, device in host_information.storage_devices.iteritems():
            fields = {}
            for cue_name, value in device.iteritems():
                fields['{cue}'.format(cue=cue_name)] = value
            self._submit(self.HOST_DISK_SERIES_PATTERN.format(
                prefix=self._host_prefix,
                name=self._sanitize_series_fragment(
                    host_information.hostname),
                device=name,
                separator=self.COMPONENT_SEPARATOR),
                fields, timestamp)

    def _submit_partition_information(self, host_information, timestamp):
        """
        Submits information about disk partitions to the database.
        """

        def partition_name(mount_point):
            """Generates a sanitized partition name."""
            name = mount_point
            if mount_point == '/':
                name = 'ROOT'
            return self.INVALID_KEY_CHARS.sub(self.REPLACE_KEY_CHAR, name)

        for mount_point, partition in \
                host_information.disk_partitions.iteritems():
            fields = {}
            for cue_name, value in partition.iteritems():
                fields['{cue}'.format(cue=cue_name)] = value
            self._submit(self.HOST_PARTITION_SERIES_PATTERN.format(
                prefix=self._host_prefix,
                name=self._sanitize_series_fragment(
                    host_information.hostname),
                partition=partition_name(mount_point),
                separator=self.COMPONENT_SEPARATOR),
                fields, timestamp)

    def _submit_nic_information(self, host_information, timestamp):
        """
        Submits information about network controllers.
        """
        for name, nic in host_information.network_controllers.iteritems():
            fields = {}
            for cue_name, value in nic.iteritems():
                fields['{cue}'.format(cue=cue_name)] = value
            self._submit(self.HOST_NIC_SERIES_PATTERN.format(
                prefix=self._host_prefix,
                name=self.INVALID_KEY_CHARS.sub(
                    self.REPLACE_KEY_CHAR, host_information.hostname),
                nic=self._sanitize_series_fragment(name),
                separator=self.COMPONENT_SEPARATOR),
                fields, timestamp)

    @staticmethod
    def _add_if_exists(fields, source, field_name, column_name=None):
        """
        Adds data to a series in case it exists in the source.
        """
        if hasattr(source, field_name) and getattr(source, field_name):
            fields[column_name or field_name] = getattr(source, field_name)

class InfluxdbBackend(TimeseriesDbBackend):
    """
    A backend for the timeseries database InfluxDB.
    """

    @staticmethod
    def prepare_arguments(group):
        group.add_argument('--influxdb-host',
                           type=str,
                           metavar='HOST',
                           default='localhost',
                           dest='influxdb_host',
                           help='The host to connect to for sending.')
        group.add_argument('--influxdb-port',
                           type=positive_int,
                           metavar='PORT',
                           default=8086,
                           dest='influxdb_port',
                           help='The port to use for sending.')
        group.add_argument('--influxdb-user',
                           type=str,
                           metavar='USERNAME',
                           default='root',
                           dest='influxdb_user',
                           help='Username for connecting to InfluxDB.')
        group.add_argument('--influxdb-password',
                           type=str,
                           metavar='PASSWORD',
                           default='root',
                           dest='influxdb_password',
                           help='Password for connecting to InfluxDB.')
        group.add_argument('--influxdb-database',
                           type=str,
                           metavar='DATABASE',
                           default='data',
                           dest='influxdb_database',
                           help='Database to connect to on the InfluxDB '
                                'server.')
        group.add_argument('--influxdb-process-prefix',
                           type=str,
                           metavar='STRING',
                           default='process',
                           dest='influxdb_process_prefix',
                           help='Prefix for process time series names.')
        group.add_argument('--influxdb-host-prefix',
                           type=str,
                           metavar='STRING',
                           default='host',
                           dest='influxdb_host_prefix',
                           help='Prefix for host time series names.')
        group.add_argument('--influxdb-version',
                           metavar='SPEC',
                           choices=['0.8', '0.9'],
                           default='0.8',
                           dest='influxdb_version',
                           help='Influxdb version to connect to.')

    @staticmethod
    def create(args):
        if args.influxdb_version == '0.8':
            import influxdb.influxdb08 as influxdb
        elif args.influxdb_version == '0.9':
            import influxdb
        else:
            raise RuntimeError(
                'Impossible influxdb version ' +
                args.influxdb_version + ' specified.')
        client = influxdb.InfluxDBClient(args.influxdb_host,
                                         args.influxdb_port,
                                         args.influxdb_user,
                                         args.influxdb_password,
                                         args.influxdb_database,
                                         timeout=2)
        return InfluxdbBackend(client,
                               args.influxdb_process_prefix,
                               args.influxdb_host_prefix,
                               args.influxdb_version)

    def __init__(self, client, process_prefix, host_prefix, version):
        """
        Construct a new instance of this backend.
        """
        TimeseriesDbBackend.__init__(self, process_prefix, host_prefix)
        self._logger = get_logger_by_class(self.__class__)

        self._version = version
        if self._version == '0.8':
            self._submit = self._submit08
        elif self._version == '0.9':
            self._submit = self._submit09
        else:
            raise RuntimeError(
                'Impossible influxdb version ' + self._version +
                ' specified.')

        self._client = client
        self._logger.debug('Established connection with client %s',
                           self._client)

    def _submit(self, measurement, fields, timestamp):
        # pylint: disable=method-hidden
        """
        Dummy method to prevent abc instantiation errors.
        """
        pass

    def _submit09(self, measurement, fields, timestamp):
        """
        Submit function for influxdb 0.9.
        """
        data = [
            {
                "measurement": measurement,
                "timestamp": int(timestamp * 1000.0),
                "fields": fields
            }
        ]
        self._logger.debug('Submitting data: %s', data)
        self._client.write_points(data, time_precision='ms')

    def _submit08(self, measurement, fields, timestamp):
        """
        Submit function for influxdb 0.8.
        """
        columns, point = zip(*sorted(fields.items()))
        columns = list(columns)
        point = list(point)
        columns.append('time')
        point.append(timestamp * 1000.0)
        body = [{'points': [point],
                 'name': measurement,
                 'columns': columns}]
        self._logger.debug('Submitting data: %s', body)
        self._client.write_points(body, time_precision='ms')

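
# Sketch of the request bodies the two submit functions above build for the
# same data point; measurement name, field values and timestamp are made up
# and no client connection is involved.
def _example_influxdb_payloads():
    fields = {'cpu_percent': 0.25, 'pid': 4711}
    timestamp = 1420000000.5
    # 0.9 style: a list of measurement dicts with a millisecond timestamp
    body_09 = [{'measurement': 'host.my_host',
                'timestamp': int(timestamp * 1000.0),
                'fields': fields}]
    # 0.8 style: parallel 'columns'/'points' lists with an extra 'time' column
    columns, point = zip(*sorted(fields.items()))
    body_08 = [{'name': 'host.my_host',
                'columns': list(columns) + ['time'],
                'points': [list(point) + [timestamp * 1000.0]]}]
    return body_09, body_08
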
class GraphiteBackend(TimeseriesDbBackend):
    """
    A backend to submit data to a Graphite installation.
    """

    @staticmethod
    def prepare_arguments(group):
        group.add_argument('--graphite-host',
                           type=str,
                           metavar='HOST',
                           default='localhost',
                           dest='graphite_host',
                           help='The host to connect to for sending.')
        group.add_argument('--graphite-port',
                           type=positive_int,
                           metavar='PORT',
                           default=2004,
                           dest='graphite_port',
                           help='The port to use for sending.')
        group.add_argument('--graphite-process-prefix',
                           type=str,
                           metavar='STRING',
                           default='process',
                           dest='graphite_process_prefix',
                           help='Prefix for process time series names.')
        group.add_argument('--graphite-host-prefix',
                           type=str,
                           metavar='STRING',
                           default='host',
                           dest='graphite_host_prefix',
                           help='Prefix for host time series names.')

    @staticmethod
    def create(args):
        return GraphiteBackend(args.graphite_host,
                               args.graphite_port,
                               args.graphite_process_prefix,
                               args.graphite_host_prefix)

    def __init__(self, host, port, process_prefix, host_prefix):
        TimeseriesDbBackend.__init__(self, process_prefix, host_prefix)
        self._logger = get_logger_by_class(self.__class__)

        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._logger.info('Connecting to %s:%s', host, port)
        self._socket.connect((host, port))
        self._socket_lock = threading.RLock()

    def shutdown(self):
        self._logger.info('Starting shutdown')
        with self._socket_lock:
            self._socket.close()
        self._logger.info('Shutdown finished')

    def _submit(self, measurement, fields, timestamp):
        self._logger.debug('Submitting new data to carbon')

        data = []
        for name, value in fields.iteritems():
            data.append(('{measurement}.{name}'.format(
                measurement=measurement, name=name),
                (timestamp, float(value))))
        self._logger.debug('Submitting cleaned data:\n%s', data)

        payload = pickle.dumps(data)
        header = struct.pack("!L", len(payload))
        message = header + payload
        with self._socket_lock:
            # sendall ensures that the complete message is transmitted even
            # if the kernel buffer cannot accept it in one call
            self._socket.sendall(message)

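
# Sketch of the message framing used by ``GraphiteBackend._submit`` for the
# carbon pickle receiver: a 4-byte big-endian length header followed by a
# pickled list of (metric path, (timestamp, value)) tuples. The metric path
# and values are made up.
def _example_carbon_message():
    data = [('host.my_host.load_1', (1420000000.0, 0.42))]
    payload = pickle.dumps(data)
    header = struct.pack('!L', len(payload))
    return header + payload
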
def _assign_if_available(source, target, field_name):
    """
    Assigns a value from a protobuf message to a target object in case the
    field has a value in the protobuf message.
    """
    if source.HasField(field_name):
        setattr(target, field_name, getattr(source, field_name))

class CopyableObject(object):  # pylint: disable=too-few-public-methods
    """
    Generic data holder class with deep copy and dynamic assignment support.
    """

    def __init__(self, name=None):
        self._data = {}
        self._name = name or self.__class__.__name__

    def __getattr__(self, name):
        try:
            return self._data[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        if name in ['_data', '_name']:
            object.__setattr__(self, name, value)
        else:
            self._data[name] = value

    def clone(self):
        duplicate = self.__class__(self._name)
        duplicate._data = copy.deepcopy(self._data)
        return duplicate

    def __str__(self):
        return '{}{}'.format(self._name, self._data)

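
# Usage sketch for ``CopyableObject``: attributes can be assigned freely and
# ``clone`` produces an independent deep copy. The field names and values
# below are made up.
def _example_copyable_object():
    original = CopyableObject('ExampleData')
    original.cues_by_source = {'proc': {'utime': 10}}
    duplicate = original.clone()
    duplicate.cues_by_source['proc']['utime'] = 20
    # the clone does not share nested data with the original
    assert original.cues_by_source['proc']['utime'] == 10
    return duplicate
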
class Processor(object):
    """
    Controller that accepts raw data, pre-processes it, and passes it to
    backends.
    """

    def __init__(self, backend,
                 do_threads=False, aggregate_subprocesses=False):
        self._logger = get_logger_by_class(self.__class__)
        self._derive_logger = logging.getLogger(self._logger.name + '.derive')
        self._backend = backend
        self._last_process_data = {}
        self._last_process_data_lock = threading.RLock()
        self._last_host_data = {}
        self._last_host_data_lock = threading.RLock()
        self._do_threads = do_threads
        self._aggregate_subprocesses = aggregate_subprocesses

    def _derive_dict(self, previous, now, cues_to_derive,
                     previous_timestamp, now_timestamp):
        # pylint: disable=too-many-arguments
        """
        Calculates the derivative for a dictionary of cues.

        Args:
            previous (dict):
                State from the previous iteration
            now (dict):
                Current state of the cues
            cues_to_derive (list of str):
                List of cue names for which to calculate a derivative inside
                the dictionary
            previous_timestamp (float):
                timestamp of the previous iteration in seconds
            now_timestamp (float):
                timestamp of the current data in seconds

        Returns:
            dict:
                ``now`` dict with desired cues being derived

        Raises:
            ZeroDivisionError:
                Normalization is impossible because data is from the same
                point in time.
        """
        result = copy.deepcopy(now)
        for index, data in now.iteritems():
            new_data = {}
            for cue_name in data.keys():
                # skip cues that should not be derived
                if cue_name not in cues_to_derive:
                    continue
                new_value = data[cue_name]
                old_value = previous[index][cue_name]
                # calculate the derivative and normalize to something
                # per second
                derivative = float(new_value - old_value) / \
                    float(now_timestamp - previous_timestamp)
                self._derive_logger.debug('Derived value for %s: %s',
                                          cue_name, derivative)
                # things should never become negative. This can only be
                # artifacts from relaunching components or recorded
                # data etc.
                derivative = max(0, derivative)
                new_data[cue_name] = derivative
            result[index].update(new_data)
        return result

    def _aggregate(self, process_cues):
        """
        Aggregates the counters from subprocesses into cumulative counters.
        """
        self._logger.debug('Aggregating subprocess cues')
        for _, subprocess in process_cues.children_by_pid.iteritems():
            for source, cues in subprocess.cues_by_source.iteritems():
                parent_cues = process_cues.cues_by_source[source]
                for cue_name, value in cues.iteritems():
                    # exclude some values where we know that they are useless
                    # for visualization purposes in graphite
                    if cue_name in IGNORE_CUE_FIELDS:
                        continue
                    parent_cues[cue_name] = parent_cues[cue_name] + value

    class _LastIterationData(object):
        # pylint: disable=too-few-public-methods
        """
        Data holder for timestamped previous data.
        """

        def __init__(self, data, timestamp):
            self.data = data
            self.timestamp = timestamp

    def _derive_processes(self, process_cues, timestamp, do_subprocesses):
        """
        Calculates derivatives for applicable cues.

        Derivative calculation is performed for the process itself and all its
        subprocesses individually. Cues to derive are taken from
        :data:`PROCESS_CUES_TO_DERIVE`.

        Args:
            process_cues (_ProcessData):
                the process data including potential subprocesses
            timestamp (float):
                timestamp of measurement for the given data in seconds
            do_subprocesses (bool):
                if `True`, derive individual subprocesses as well
        """
        self._logger.debug('Calculating derivatives for required cues')

        with self._last_process_data_lock:

            # look up previous data
            previous = None
            if process_cues.pid in self._last_process_data:
                previous = self._last_process_data[process_cues.pid]
            # store current data
            self._last_process_data[process_cues.pid] = \
                self._LastIterationData(process_cues, timestamp)
            # TODO somehow clean this map. Currently this is ever-growing.

        # further processing needs to be skipped in case we cannot build
        # derivatives
        if not previous:
            return None

        # Now we can do the real logic of building derivatives since all
        # required data is available
        try:
            self._derive_logger.debug(
                'Calculating derivative between\n%s\nand\n%s',
                previous.data, process_cues)

            result = process_cues.clone()
            result.cues_by_source.update(
                self._derive_dict(previous.data.cues_by_source,
                                  process_cues.cues_by_source,
                                  PROCESS_CUES_TO_DERIVE,
                                  previous.timestamp,
                                  timestamp))

            # calculate the derivative for every subprocess if requested
            if do_subprocesses:
                for pid, child in process_cues.children_by_pid.iteritems():
                    self._derive_logger.debug('Processing child with pid %s',
                                              pid)
                    # we only need to derive something in case a process
                    # existed in the iteration before. Otherwise, it is a new
                    # process and all counters can be used directly since
                    # they are immediately the difference of 0 = start and
                    # their current value.
                    if pid in previous.data.children_by_pid:
                        self._derive_logger.debug(
                            'Child %s exists in previous '
                            'iteration, deriving...', pid)
                        previous_child = previous.data.children_by_pid[pid]
                        result.children_by_pid[pid].cues_by_source.update(
                            self._derive_dict(previous_child.cues_by_source,
                                              child.cues_by_source,
                                              PROCESS_CUES_TO_DERIVE,
                                              previous.timestamp,
                                              timestamp))
            else:
                # if subprocesses should not be touched, copy their data so
                # that later processing stages can still access them
                for pid, child in process_cues.children_by_pid.iteritems():
                    result.children_by_pid[pid] = child

            # TODO threads

            return result
        except ZeroDivisionError:
            self._derive_logger.warning('Zero division error when deriving',
                                        exc_info=True)
            # it is not clear why we might receive two events with the same
            # timestamp. In any case, if this happens, we cannot do anything
            # since a good normalization is not possible then. The best thing
            # we can do is to skip these cases
            return None

    @staticmethod
    def _extract_process_data(process_cues):
        """
        Recursively converts a process and children to :class:`ProcessData`.
        """

        def extract_process(process_cues):
            # pylint: disable=attribute-defined-outside-init
            """
            Extracts the data from the protocol buffers definition of a
            single process without children.
            """
            data = CopyableObject('ProcessData')
            data.pid = process_cues.pid
            data.host_name = process_cues.host_name
            data.name = process_cues.name
            data.command_line = process_cues.command_line
            data.children_by_pid = {}
            data.cues_by_source = {}
            for cues in process_cues.cues:
                values_by_name = {}
                for descriptor, value in cues.ListFields():
                    if descriptor.name in IGNORE_CUE_FIELDS:
                        continue
                    values_by_name[descriptor.name] = value
                data.cues_by_source[cues.source] = values_by_name
            return data

        top = extract_process(process_cues)
        for child in process_cues.children:
            top.children_by_pid[child.pid] = extract_process(child)
        return top

    def insert_process(self, process_cues, timestamp):
        """
        Processes new measurements for a monitored process.

        Args:
            process_cues (rst.monitoring.ProcessCues):
                the new measurement data as an RST type
            timestamp (float):
                the timestamp of the measurements as a unix timestamp in
                seconds
        """
        self._logger.info('Processing new process results for timestamp %s',
                          timestamp)

        # start by wrapping the data in an object that accepts any type for
        # any field so that we can do math in here
        data = self._extract_process_data(process_cues)

        # aggregate if requested
        if self._aggregate_subprocesses:
            self._aggregate(data)

        # derive cues and check results
        data = self._derive_processes(data, timestamp,
                                      not self._aggregate_subprocesses)
        if not data:
            self._logger.info('Skipping cues for process %s since this '
                              'seems to be the first message',
                              process_cues.pid)
            return

        # submit data to the backend
        self._backend.submit_process_data(data, timestamp,
                                          not self._aggregate_subprocesses)

    def _derive_host(self, host_information, timestamp):
        """
        Calculates derivatives for host-related counters.
        """
        with self._last_host_data_lock:

            # look up previous data
            previous = None
            if host_information.hostname in self._last_host_data:
                previous = self._last_host_data[host_information.hostname]
            # store current data
            self._last_host_data[host_information.hostname] = \
                self._LastIterationData(host_information, timestamp)
            # TODO somehow clean this map. Currently this is ever-growing.

        # further processing needs to be skipped in case we cannot build
        # derivatives
        if not previous:
            return None

        # Now we can do the real logic of building derivatives since all
        # required data is available
        try:
            result = host_information.clone()
            result.cpus_by_index.update(
                self._derive_dict(previous.data.cpus_by_index,
                                  host_information.cpus_by_index,
                                  CPU_CUES_TO_DERIVE,
                                  previous.timestamp,
                                  timestamp))
            result.storage_devices.update(
                self._derive_dict(previous.data.storage_devices,
                                  host_information.storage_devices,
                                  STORAGE_DEVICE_CUES_TO_DERIVE,
                                  previous.timestamp,
                                  timestamp))
            result.network_controllers.update(
                self._derive_dict(previous.data.network_controllers,
                                  host_information.network_controllers,
                                  NIC_CUES_TO_DERIVE,
                                  previous.timestamp,
                                  timestamp))
            return result
        except ZeroDivisionError:
            self._derive_logger.debug('Zero division error will be ignored',
                                      exc_info=True)
            # it is not clear why we might receive two events with the same
            # timestamp. In any case, if this happens, we cannot do anything
            # since a good normalization is not possible then. The best thing
            # we can do is to skip these cases
            return None

    @classmethod
    def _calculate_cpu_percent(cls, host_information):
        """
        Converts detailed CPU counters into a percentage value.
        """
        for _, cpu in host_information.cpus_by_index.iteritems():
            cpu['percent'] = \
                float(cpu['total'] - cpu['idle'] - cpu['iowait']) \
                / float(cpu['total'])

    @classmethod
    def _summarize_cpu_usage(cls, host_information):
        """
        Summarizes cpu percentage values for all CPUs.

        Generates two data items which contain the accumulated percentages
        and the number of CPUs.
        """
        host_information.cpu_number = len(host_information.cpus_by_index)
        sum_percent = 0
        for _, cpu in host_information.cpus_by_index.iteritems():
            sum_percent = sum_percent + cpu['percent']
        host_information.cpu_percent_cumulated = sum_percent
        host_information.cpu_percent_normalized = \
            sum_percent / len(host_information.cpus_by_index)

    @staticmethod
    def _extract_host_data(host_data):
        # pylint: disable=attribute-defined-outside-init
        # pylint: disable=too-many-locals
        # pylint: disable=too-many-branches
        # pylint: disable=too-many-statements
        """
        Extracts the data from the protocol buffer representation.
        """
        data = CopyableObject('HostData')

        data.hostname = host_data.hostname

        # memory
        data.memory_total = host_data.memory_state.total
        data.memory_used = host_data.memory_state.used
        data.memory_usable = host_data.memory_state.usable
        if data.memory_usable:
            data.memory_percent = 1.0 - (float(data.memory_usable) /
                                         float(data.memory_total))

        # load
        data.load_1 = host_data.cpu_state.load_1
        data.load_5 = host_data.cpu_state.load_5
        data.load_15 = host_data.cpu_state.load_15

        # cpu
        data.jiffy_length = host_data.cpu_state.jiffy_length
        data.cpus_by_index = {}
        for cpu in host_data.cpu_state.cpus:
            index = cpu.index
            cpu_data = {}
            for descriptor, value in cpu.ListFields():
                if descriptor.name == 'index':
                    continue
                cpu_data[descriptor.name] = value
            data.cpus_by_index[index] = cpu_data

        # users
        _assign_if_available(host_data.user_state, data, 'logged_in_users')
        _assign_if_available(host_data.user_state, data, 'login_sessions')
        _assign_if_available(host_data.user_state, data, 'login_hosts')

        # disks
        data.disk_partitions = {}
        for partition in host_data.disk_state.partitions:
            mount_point = partition.mount_point
            partition_data = {}
            for descriptor, value in partition.ListFields():
                if descriptor.name in ['mount_point', 'device']:
                    continue
                partition_data[descriptor.name] = value
            if 'space_used' in partition_data and \
                    'space_total' in partition_data:
                partition_data['usage_percent'] = \
                    float(partition_data['space_used']) \
                    / float(partition_data['space_total'])
            data.disk_partitions[mount_point] = partition_data

        # storage devices
        data.storage_devices = {}
        for device in host_data.disk_state.devices:
            name = device.name
            device_data = {}
            for descriptor, value in device.ListFields():
                if descriptor.name in ['name']:
                    continue
                device_data[descriptor.name] = value
            data.storage_devices[name] = device_data

        # network interfaces
        data.network_controllers = {}
        for nic in host_data.network_state.nics:
            name = nic.name
            nic_data = {}
            for descriptor, value in nic.ListFields():
                if descriptor.name in ['name']:
                    continue
                nic_data[descriptor.name] = value
            data.network_controllers[name] = nic_data

        # network connections
        data.network_connections = {}
        for connection in host_data.network_state.connections:
            # pylint: disable=no-member
            key = (
                NetworkState.NetworkConnections.Protocol.Name(
                    connection.protocol).replace('PROTOCOL_', '').lower(),
                NetworkState.NetworkConnections.AddressFamily.Name(
                    connection.family).replace('FAMILY_', '').lower())
            connection_data = {}
            for field in NetworkState.NetworkConnections.DESCRIPTOR.fields:
                if field.name in ['protocol', 'family']:
                    continue
                connection_data[field.name] = getattr(connection, field.name)
            data.network_connections[key] = connection_data

        return data

    def insert_host(self, host_information, timestamp):
        """
        Process new host-related measurements.

        Args:
            host_information (rst.devices.generic.HostInformation):
                new measurements
            timestamp (float):
                timestamp of the measurements as unix timestamp in seconds
        """
        self._logger.info('Processing new host results')

        # start by wrapping the data in an object that accepts any type for
        # any field so that we can do math in here
        host_information = self._extract_host_data(host_information)

        # calculate derivative and check for success
        host_information = self._derive_host(host_information, timestamp)
        if not host_information:
            self._logger.info('Skipping host information since no '
                              'derived result was calculated.')
            return

        # calculate utility counters for individual CPUs
        try:
            self._calculate_cpu_percent(host_information)
        except ZeroDivisionError:
            self._logger.debug('Not processing host results due to cpu '
                               'percent calculation error', exc_info=True)
            return
        self._summarize_cpu_usage(host_information)

        self._backend.submit_host_information(host_information, timestamp)
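

# Worked example of the per-second normalization performed by
# ``Processor._derive_dict`` for the counters listed in
# ``PROCESS_CUES_TO_DERIVE``; all numbers are made up: a counter that grew
# from 110 to 150 over a 2 second interval becomes a rate of 20 per second,
# and negative differences are clamped to 0.
def _example_counter_derivative():
    old_value, new_value = 110, 150
    previous_timestamp, now_timestamp = 8.0, 10.0
    derivative = float(new_value - old_value) / \
        float(now_timestamp - previous_timestamp)
    derivative = max(0, derivative)
    assert derivative == 20.0
    return derivative
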
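
# Worked example of the CPU usage calculation in
# ``Processor._calculate_cpu_percent`` and ``Processor._summarize_cpu_usage``;
# the jiffy counters are made up: with total=1000, idle=600 and iowait=100
# per CPU, each CPU is at 30% and two such CPUs accumulate to 0.6 while the
# normalized value stays at 0.3.
def _example_cpu_percent():
    cpus = {0: {'total': 1000, 'idle': 600, 'iowait': 100},
            1: {'total': 1000, 'idle': 600, 'iowait': 100}}
    for cpu in cpus.values():
        cpu['percent'] = \
            float(cpu['total'] - cpu['idle'] - cpu['iowait']) \
            / float(cpu['total'])
    cumulated = sum(cpu['percent'] for cpu in cpus.values())
    normalized = cumulated / len(cpus)
    assert (cumulated, normalized) == (0.6, 0.3)
    return cumulated, normalized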