# ============================================================
#
# Copyright (C) 2014 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
#
# This file may be licensed under the terms of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by:
# CoR-Lab, Research Institute for Cognition and Robotics
# Bielefeld University
#
# ============================================================
"""
Module containing the real worker logic.
.. codeauthor:: jwienke
"""
import abc
import argparse
import copy
import logging
import os.path
import pickle
import re
import socket
import string
import struct
import threading
from loggerbyclass import get_logger_by_class
import rst
from rst.devices.generic.NetworkState_pb2 import NetworkState
def positive_int(candidate):
"""
Checks a command line argument value for being a positive integer.
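
    Intended for use as an ``argparse`` ``type=`` callback, for example
    (hypothetical option, shown for illustration)::

        parser = argparse.ArgumentParser()
        parser.add_argument('--interval', type=positive_int, default=1)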
"""
try:
num = int(candidate)
if num <= 0:
            raise argparse.ArgumentTypeError('Number must be > 0.')
return num
except ValueError, error:
raise argparse.ArgumentTypeError(
'{0} is not a valid int. Reason: {1}'.format(candidate, error))
# fields that should be ignored
IGNORE_CUE_FIELDS = ['source', 'start_time']
# the process fields that require a derivative
PROCESS_CUES_TO_DERIVE = ['utime', 'stime', 'wall_time', 'virtual_time',
'etime', 'utime_scaled', 'stime_scaled',
'wall_time_scaled',
'rchar', 'wchar',
'read_bytes', 'write_bytes',
'received_bytes', 'sent_bytes']
# the cpu fields that require a derivative
CPU_CUES_TO_DERIVE = ['total', 'idle', 'user', 'user_low', 'system', 'iowait',
'irq', 'softirq', 'steal', 'guest']
# the storage device fields that require a derivative
STORAGE_DEVICE_CUES_TO_DERIVE = ['read_count', 'write_count',
'read_bytes', 'write_bytes',
'read_time', 'write_time']
# the nic fields that require a derivative
NIC_CUES_TO_DERIVE = ['bytes_received', 'bytes_sent',
'packets_received', 'packets_sent',
'receive_errors', 'send_error',
'receive_drops', 'send_drops']
class DatabaseBackend(object):
"""
    Interface class for database backends that store the processed data.
Apart from implementing the abstract methods, also the two static methods
:meth:`prepare_arguments` and :meth:`create` need to be implemented.
The former will be first called to request command line options to present
to the user and the latter one will be called afterwards as a factory
method to create an instance of the backend from the parsed options.
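
    A sketch of the intended lifecycle, assuming a parser and measurement
    data exist in the calling code::

        group = parser.add_argument_group('backend options')
        InfluxdbBackend.prepare_arguments(group)
        args = parser.parse_args()
        with InfluxdbBackend.create(args) as backend:
            backend.submit_process_data(process_cues, timestamp, True)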
"""
__metaclass__ = abc.ABCMeta
@staticmethod
    def prepare_arguments(group):
"""
Provide command line parsing arguments to the provided argparse group.
        Always use consistent naming with a prefix identifying the backend
        to prevent name clashes.
"""
raise NotImplementedError()
@staticmethod
    def create(args):
"""
Factory method called to create a new instance of the backend.
Args:
args (argparse.Namespace):
parsed command line arguments
Returns:
DatabaseBackend:
newly created backend according to the user options.
"""
raise NotImplementedError()
@abc.abstractmethod
    def submit_process_data(self, process_cues, timestamp, do_subprocesses):
"""
Store the provided data for a process in the database.
"""
pass
@abc.abstractmethod
    def shutdown(self):
"""
Shutdown the backend so that this instance can be deleted safely.
"""
pass
def __enter__(self):
return self
    def __exit__(self, exc_type, exc_value, traceback):
self.shutdown()
class TimeseriesDbBackend(DatabaseBackend):
# pylint: disable=abstract-method
"""
Abstract base class for adapters which put data into timeseries databases
that are structured by series and associated measurement names.
Subclasses need to ensure that the :meth:`__init__` is called correctly
and need to override :meth:`_submit`.
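
    With the default prefixes, the generated series names are dotted
    strings, for example (illustrative)::

        process.my_component          # PROCESS_SERIES_PATTERN
        host.myhost.cpus.cpu0         # HOST_CPU_SERIES_PATTERN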
"""
    #: A regex matching all characters in keys that cause trouble in
    #: timeseries names, e.g. by being interpreted as an operator.
    INVALID_KEY_CHARS = re.compile('[^a-zA-Z0-9_]')
    #: Character to use as a substitute for invalid ones in series keys.
    REPLACE_KEY_CHAR = '_'
#: Character to use for separating the provider from the cue.
COMPONENT_SEPARATOR = '.'
#: Format string to generate the series name for process information
PROCESS_SERIES_PATTERN = '{prefix}{separator}{name}'
#: Format string to generate the series name for general host information
HOST_SERIES_PATTERN = '{prefix}{separator}{name}'
#: Format string to generate the series name for a cpu
HOST_CPU_SERIES_PATTERN = '{prefix}{separator}{name}' \
'{separator}cpus{separator}cpu{index}'
#: Format string to generate the series name for a disk partition
HOST_PARTITION_SERIES_PATTERN = '{prefix}{separator}{name}' \
'{separator}disk{separator}partitions{separator}{partition}'
#: Format string to generate the series name for a storage device
HOST_DISK_SERIES_PATTERN = '{prefix}{separator}{name}' \
'{separator}disk{separator}devices{separator}{device}'
#: Format string to generate the series name for a network controller
HOST_NIC_SERIES_PATTERN = '{prefix}{separator}{name}' \
'{separator}network{separator}nics{separator}{nic}'
    #: Format string to generate the series name for network connections
HOST_CONNECTION_SERIES_PATTERN = '{prefix}{separator}{name}' \
'{separator}network{separator}connections' \
'{separator}{protocol}{separator}{family}'
    #: Format string to generate the series name for the utility tcp series
    #: with all connections summarized.
HOST_TCP_SERIES_PATTERN = '{prefix}{separator}{name}' \
'{separator}network{separator}connections{separator}tcp'
    #: Format string to generate the series name for the utility udp series
    #: with all connections summarized.
HOST_UDP_SERIES_PATTERN = '{prefix}{separator}{name}' \
'{separator}network{separator}connections{separator}udp'
def __init__(self, process_prefix, host_prefix):
self._logger = get_logger_by_class(self.__class__)
self._process_prefix = process_prefix
self._host_prefix = host_prefix
@abc.abstractmethod
def _submit(self, measurement, fields, timestamp):
"""
Submits a new set of data to a database.
Args:
measurement (str):
Name of the measurement as a dotted string
fields (dict of str -> number):
The actual values to enter into the timeseries database.
timestamp (float):
The measurement time of the data in seconds + fraction.
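
        For example, a subclass might receive (illustrative values)::

            self._submit('host.myhost.cpus.cpu0',
                         {'user': 0.42, 'system': 0.13},
                         1408531200.25)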
"""
pass
@classmethod
def _sanitize_series_fragment(cls, raw):
"""
Sanitizes a string so that it forms a valid fragment of a dotted series
name.
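
        For example (illustrative)::

            >>> TimeseriesDbBackend._sanitize_series_fragment('eth0:1')
            'eth0_1'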
"""
return cls.INVALID_KEY_CHARS.sub(cls.REPLACE_KEY_CHAR, raw)
    def submit_process_data(self, process_cues, timestamp, do_subprocesses):
self._logger.debug('Submitting %s with subprocesses: %s',
process_cues, do_subprocesses)
fields = {'pid': process_cues.pid,
'numsubprocesses': len(process_cues.children_by_pid)}
for source, cue in process_cues.cues_by_source.iteritems():
for name, value in cue.iteritems():
# exclude some values where we know that they are useless for
# visualization purposes in graphite
if name in IGNORE_CUE_FIELDS:
continue
fields['{provider}{separator}{cue}'.format(
provider=self._sanitize_series_fragment(source),
separator=self.COMPONENT_SEPARATOR,
cue=name)] = value
        self._submit(self.PROCESS_SERIES_PATTERN.format(
prefix=self._process_prefix,
name=self._sanitize_series_fragment(
process_cues.name),
separator=self.COMPONENT_SEPARATOR),
fields,
timestamp)
def _submit_connection_information(self, host_information, timestamp):
"""
Submits information about network connections.
"""
total_tcp = 0
total_udp = 0
connections_seen = False
for (protocol, family), connection in \
host_information.network_connections.iteritems():
connections_seen = True
fields = {}
for key, value in connection.iteritems():
fields['{cue}'.format(cue=key)] = value
                # accumulate total connection counts; for tcp, exclude
                # listening server sockets
                if key != 'num_listen' and protocol == 'tcp':
                    total_tcp += value
                if protocol == 'udp':
                    total_udp += value
self._submit(self.HOST_CONNECTION_SERIES_PATTERN.format(
prefix=self._host_prefix,
name=self._sanitize_series_fragment(
host_information.hostname),
protocol=protocol,
family=family,
separator=self.COMPONENT_SEPARATOR),
fields,
timestamp)
if connections_seen:
self._submit(self.HOST_TCP_SERIES_PATTERN.format(
prefix=self._host_prefix,
name=self._sanitize_series_fragment(
host_information.hostname),
separator=self.COMPONENT_SEPARATOR),
{'total': total_tcp},
timestamp)
self._submit(self.HOST_UDP_SERIES_PATTERN.format(
prefix=self._host_prefix,
name=self._sanitize_series_fragment(
host_information.hostname),
separator=self.COMPONENT_SEPARATOR),
{'total': total_udp},
timestamp)
def _submit_cpu_information(self, host_information, timestamp):
"""
Submits information about installed CPUs in a host.
"""
for cpu_index, cpu in \
host_information.cpus_by_index.iteritems():
fields = {}
for cue_name, value in cpu.iteritems():
fields['{cue}'.format(cue=cue_name)] = value
self._submit(self.HOST_CPU_SERIES_PATTERN.format(
prefix=self._host_prefix,
name=self._sanitize_series_fragment(
host_information.hostname),
index=cpu_index,
separator=self.COMPONENT_SEPARATOR),
fields,
timestamp)
def _submit_disk_information(self, host_information, timestamp):
"""
        Submits information about storage devices to the database.
"""
for name, device in \
host_information.storage_devices.iteritems():
fields = {}
for cue_name, value in device.iteritems():
fields['{cue}'.format(cue=cue_name)] = value
self._submit(self.HOST_DISK_SERIES_PATTERN.format(
prefix=self._host_prefix,
name=self._sanitize_series_fragment(
host_information.hostname),
device=name,
separator=self.COMPONENT_SEPARATOR),
fields,
timestamp)
def _submit_partition_information(self, host_information, timestamp):
"""
Submits information about disk partitions to the database.
"""
def partition_name(mount_point):
"""Generates a sanitized partition name."""
            name = mount_point
            if mount_point == '/':
                name = 'ROOT'
            return self._sanitize_series_fragment(name)
for mount_point, partition in \
host_information.disk_partitions.iteritems():
fields = {}
for cue_name, value in partition.iteritems():
fields['{cue}'.format(cue=cue_name)] = value
self._submit(self.HOST_PARTITION_SERIES_PATTERN.format(
prefix=self._host_prefix,
name=self._sanitize_series_fragment(
host_information.hostname),
partition=partition_name(mount_point),
separator=self.COMPONENT_SEPARATOR),
fields,
timestamp)
def _submit_nic_information(self, host_information, timestamp):
"""
Submits information about network controllers.
"""
for name, nic in \
host_information.network_controllers.iteritems():
fields = {}
for cue_name, value in nic.iteritems():
fields['{cue}'.format(cue=cue_name)] = value
self._submit(self.HOST_NIC_SERIES_PATTERN.format(
prefix=self._host_prefix,
                name=self._sanitize_series_fragment(
                    host_information.hostname),
nic=self._sanitize_series_fragment(name),
separator=self.COMPONENT_SEPARATOR),
fields,
timestamp)
@staticmethod
def _add_if_exists(fields, source, field_name,
column_name=None):
"""
Adds data to a series in case it exists in the source.
"""
if hasattr(source, field_name) and getattr(source, field_name):
fields[column_name or field_name] = getattr(source, field_name)
class InfluxdbBackend(TimeseriesDbBackend):
"""
A backend for the timeseries database InfluxDB.
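
    The flags registered by :meth:`prepare_arguments` select the server and
    schema, for example (illustrative invocation)::

        --influxdb-host localhost --influxdb-port 8086 \
        --influxdb-database data --influxdb-version 0.8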
"""
@staticmethod
    def prepare_arguments(group):
group.add_argument('--influxdb-host',
type=str,
metavar='HOST',
default='localhost',
dest='influxdb_host',
help='The host to connect to for sending.')
group.add_argument('--influxdb-port',
type=positive_int,
metavar='PORT',
default=8086,
dest='influxdb_port',
help='The port to use for sending.')
group.add_argument('--influxdb-user',
type=str,
metavar='USERNAME',
default='root',
dest='influxdb_user',
help='Username for connecting to InfluxDB.')
group.add_argument('--influxdb-password',
type=str,
metavar='PASSWORD',
default='root',
dest='influxdb_password',
help='Password for connecting to InfluxDB.')
group.add_argument('--influxdb-database',
type=str,
metavar='DATABASE',
default='data',
dest='influxdb_database',
help='Database to connect to on the InfluxDB '
'server.')
group.add_argument('--influxdb-process-prefix',
type=str,
metavar='STRING',
default='process',
dest='influxdb_process_prefix',
help='Prefix for process time series names.')
group.add_argument('--influxdb-host-prefix',
type=str,
metavar='STRING',
default='host',
dest='influxdb_host_prefix',
help='Prefix for host time series names.')
group.add_argument('--influxdb-version',
metavar='SPEC',
choices=['0.8', '0.9'],
default='0.8',
dest='influxdb_version',
help='Influxdb version to connect to.')
@staticmethod
    def create(args):
if args.influxdb_version == '0.8':
import influxdb.influxdb08 as influxdb
elif args.influxdb_version == '0.9':
import influxdb
else:
            raise RuntimeError(
                'Unsupported InfluxDB version '
                + args.influxdb_version + ' specified.')
client = influxdb.InfluxDBClient(args.influxdb_host,
args.influxdb_port,
args.influxdb_user,
args.influxdb_password,
args.influxdb_database,
timeout=2)
return InfluxdbBackend(client,
args.influxdb_process_prefix,
args.influxdb_host_prefix,
args.influxdb_version)
def __init__(self, client, process_prefix, host_prefix, version):
"""
Construct a new instance of this backend.
"""
TimeseriesDbBackend.__init__(self, process_prefix, host_prefix)
self._logger = get_logger_by_class(self.__class__)
self._version = version
if self._version == '0.8':
self._submit = self._submit08
elif self._version == '0.9':
self._submit = self._submit09
else:
            raise RuntimeError(
                'Unsupported InfluxDB version '
                + self._version + ' specified.')
self._client = client
self._logger.debug('Established connection with client %s',
self._client)
def _submit(self, measurement, fields, timestamp):
# pylint: disable=method-hidden
"""
Dummy method to prevent abc instantiation errors.
"""
pass
def _submit09(self, measurement, fields, timestamp):
"""
Submit function for influxdb 0.9.
"""
data = [
{
"measurement": measurement,
"timestamp": int(timestamp * 1000.0),
"fields": fields
}
]
self._logger.debug('Submitting data: %s', data)
self._client.write_points(data, time_precision='ms')
def _submit08(self, measurement, fields, timestamp):
"""
Submit function for influxdb 0.8.
"""
columns, point = zip(*sorted(fields.items()))
columns = list(columns)
point = list(point)
columns.append('time')
point.append(timestamp * 1000.0)
body = [{'points': [point],
'name': measurement,
'columns': columns}]
self._logger.debug('Submitting data: %s', body)
self._client.write_points(body, time_precision='ms')
class GraphiteBackend(TimeseriesDbBackend):
"""
A backend to submit data to a graphite installation.
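
    Data is sent to carbon's pickle receiver, which listens on port 2004 by
    default.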
"""
@staticmethod
    def prepare_arguments(group):
group.add_argument('--graphite-host',
type=str,
metavar='HOST',
default='localhost',
dest='graphite_host',
help='The host to connect to for sending.')
group.add_argument('--graphite-port',
type=positive_int,
metavar='PORT',
default=2004,
dest='graphite_port',
help='The port to use for sending.')
group.add_argument('--graphite-process-prefix',
type=str,
metavar='STRING',
default='process',
dest='graphite_process_prefix',
help='Prefix for process time series names.')
group.add_argument('--graphite-host-prefix',
type=str,
metavar='STRING',
default='host',
dest='graphite_host_prefix',
help='Prefix for host time series names.')
@staticmethod
    def create(args):
return GraphiteBackend(args.graphite_host,
args.graphite_port,
args.graphite_process_prefix,
args.graphite_host_prefix)
def __init__(self, host, port, process_prefix, host_prefix):
TimeseriesDbBackend.__init__(self, process_prefix, host_prefix)
self._logger = get_logger_by_class(self.__class__)
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._logger.info('Connecting to %s:%s', host, port)
self._socket.connect((host, port))
self._socket_lock = threading.RLock()
    def shutdown(self):
self._logger.info('Starting shutdown')
with self._socket_lock:
self._socket.close()
self._logger.info('Shutdown finished')
def _submit(self, measurement, fields, timestamp):
self._logger.debug('Submitting new data to carbon')
data = []
for name, value in fields.iteritems():
data.append(('{measurement}.{name}'.format(measurement=measurement,
name=name),
(timestamp, float(value))))
self._logger.debug('Submitting cleaned data:\n%s', data)
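        # carbon's pickle protocol: a 4-byte big-endian length header
        # followed by a pickled list of (path, (timestamp, value)) tuples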
payload = pickle.dumps(data)
header = struct.pack("!L", len(payload))
message = header + payload
with self._socket_lock:
            self._socket.sendall(message)
def _assign_if_available(source, target, field_name):
"""
Assigns a value from a protobuf to a target object in case the field has a
value in the protobuf message.
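
    For example, ``_assign_if_available(host_data.user_state, data,
    'logged_in_users')`` copies ``logged_in_users`` to ``data`` only if the
    message actually carries that field.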
"""
if source.HasField(field_name):
setattr(target, field_name, getattr(source, field_name))
class CopyableObject(object):
# pylint: disable=too-few-public-methods
"""
Generic data holder class with deep copy and dynamic assignment support.
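
    For example::

        data = CopyableObject('ProcessData')
        data.pid = 42
        duplicate = data.clone()
        assert duplicate.pid == 42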
"""
def __init__(self, name=None):
self._data = {}
self._name = name or self.__class__.__name__
def __getattr__(self, name):
try:
return self._data[name]
except KeyError:
raise AttributeError(name)
def __setattr__(self, name, value):
if name in ['_data', '_name']:
object.__setattr__(self, name, value)
else:
self._data[name] = value
    def clone(self):
duplicate = self.__class__(self._name)
duplicate._data = copy.deepcopy(self._data)
return duplicate
def __str__(self):
return '{}{}'.format(self._name, self._data)
class Processor(object):
"""
    Controller that accepts raw data, pre-processes it, and passes it to
    backends.
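
    A minimal usage sketch, assuming a backend instance and RST data from
    the calling code::

        processor = Processor(backend, aggregate_subprocesses=True)
        processor.insert_process(process_cues, time.time())
        processor.insert_host(host_information, time.time())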
"""
def __init__(self, backend, do_threads=False,
aggregate_subprocesses=False):
self._logger = get_logger_by_class(self.__class__)
self._derive_logger = logging.getLogger(self._logger.name + '.derive')
self._backend = backend
self._last_process_data = {}
self._last_process_data_lock = threading.RLock()
self._last_host_data = {}
self._last_host_data_lock = threading.RLock()
self._do_threads = do_threads
self._aggregate_subprocesses = aggregate_subprocesses
def _derive_dict(self, previous, now, cues_to_derive,
previous_timestamp, now_timestamp):
# pylint: disable=too-many-arguments
"""
Calculates the derivative for a dictionary of cues.
Args:
previous (dict):
State from the previous iteration
now (dict):
Current state of the cues
            cues_to_derive (list of str):
                List of cue names for which to calculate a derivative
                inside the dictionary
            previous_timestamp (float):
                Measurement time of ``previous`` in seconds
            now_timestamp (float):
                Measurement time of ``now`` in seconds
Returns:
dict:
``now`` dict with desired cues being derived
Raises:
ZeroDivisionError:
Normalization is impossible because data is from the
same point in time.
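
        Example (illustrative): with ``previous = {0: {'total': 100}}`` at
        ``previous_timestamp = 1.0``, ``now = {0: {'total': 150}}`` at
        ``now_timestamp = 2.0``, and ``cues_to_derive = ['total']``, the
        result is ``{0: {'total': 50.0}}``, i.e. 50 units per second.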
"""
result = copy.deepcopy(now)
for index, data in now.iteritems():
new_data = {}
for cue_name in data.keys():
# skip cues that should not be derived
if cue_name not in cues_to_derive:
continue
new_value = data[cue_name]
old_value = previous[index][cue_name]
# calculate the derivative and normalize to something
# per second
derivative = float(new_value - old_value) / \
float(now_timestamp - previous_timestamp)
self._derive_logger.debug('Derived value for %s: %s',
cue_name,
derivative)
# things should never become negative. This can only be
# artifacts from relaunching components or recorded
# data etc.
derivative = max(0, derivative)
new_data[cue_name] = derivative
result[index].update(new_data)
return result
def _aggregate(self, process_cues):
"""
Aggregates the counters from subprocesses into cumulative counters.
"""
self._logger.debug('Aggregating subprocess cues')
for _, subprocess in process_cues.children_by_pid.iteritems():
for source, cues in subprocess.cues_by_source.iteritems():
parent_cues = process_cues.cues_by_source[source]
for cue_name, value in cues.iteritems():
# exclude some values where we know that they are useless
# for visualization purposes in graphite
if cue_name in IGNORE_CUE_FIELDS:
continue
parent_cues[cue_name] = parent_cues[cue_name] + value
class _LastIterationData(object):
# pylint: disable=too-few-public-methods
"""
Data holder for timestamped previous data.
"""
def __init__(self, data, timestamp):
self.data = data
self.timestamp = timestamp
def _derive_processes(self, process_cues, timestamp, do_subprocesses):
"""
Calculates derivatives for applicable cues.
        Derivative calculation is performed for the process itself and all its
        subprocesses individually. Cues to derive are taken from
:data:`PROCESS_CUES_TO_DERIVE`.
Args:
process_cues (_ProcessData):
the process data including potential subprocesses
timestamp (float):
timestamp of measurement for the given data in seconds
do_subprocesses (bool):
if `True`, derive individual subprocesses as well
"""
self._logger.debug('Calculating derivatives for required cues')
with self._last_process_data_lock:
# look up previous data
previous = None
if process_cues.pid in self._last_process_data:
previous = self._last_process_data[process_cues.pid]
# store current data
self._last_process_data[process_cues.pid] = \
self._LastIterationData(process_cues, timestamp)
# TODO somehow clean this map. Currently this is ever-growing.
# further processing needs to be skipped in case we cannot build
# derivatives
if not previous:
return None
        # Now we can do the real logic of building derivatives since all
        # required data is available
        try:
            self._derive_logger.debug(
                'Calculating derivative between\n%s\nand\n%s', previous.data,
                process_cues)
            result = process_cues.clone()
            result.cues_by_source.update(
                self._derive_dict(previous.data.cues_by_source,
                                  process_cues.cues_by_source,
                                  PROCESS_CUES_TO_DERIVE,
                                  previous.timestamp,
                                  timestamp))
            # calculate the derivative for every subprocess if requested
            if do_subprocesses:
                for pid, child in process_cues.children_by_pid.iteritems():
                    self._derive_logger.debug(
                        'Processing child with pid %s', pid)
                    # we only need to derive something in case a process
                    # existed in the iteration before. Otherwise, it is a new
                    # process and all counters can be used directly since
                    # they are immediately the difference of 0 = start and
                    # their current value.
                    if pid in previous.data.children_by_pid:
                        self._derive_logger.debug(
                            'Child %s exists in previous '
                            'iteration, deriving...', pid)
                        previous_child = previous.data.children_by_pid[pid]
                        # derive the child's cues with the shared helper
                        derived_child = child.clone()
                        derived_child.cues_by_source.update(
                            self._derive_dict(
                                previous_child.cues_by_source,
                                child.cues_by_source,
                                PROCESS_CUES_TO_DERIVE,
                                previous.timestamp,
                                timestamp))
                        result.children_by_pid[pid] = derived_child
            else:
                # if subprocesses should not be touched, copy their data so
                # that later processing stages can still access them
                for pid, child in process_cues.children_by_pid.iteritems():
                    result.children_by_pid[pid] = child
            # TODO threads
            return result
        except ZeroDivisionError:
            self._derive_logger.warning('Zero division error when deriving',
                                        exc_info=True)
            # it is not clear why we might receive two events with the same
            # timestamp. In any case, if this happens, we cannot do anything
            # since a good normalization is not possible then. The best
            # thing we can do is to skip these cases
            return None
@staticmethod
def _extract_process_data(process_cues):
"""
        Recursively converts a process and its children to ``ProcessData``
        holder objects (instances of :class:`CopyableObject`).
"""
def extract_process(process_cues):
# pylint: disable=attribute-defined-outside-init
"""
Extracts the data from the protocol buffers definition of a single
process without children.
"""
data = CopyableObject('ProcessData')
data.pid = process_cues.pid
data.host_name = process_cues.host_name
data.name = process_cues.name
data.command_line = process_cues.command_line
data.children_by_pid = {}
data.cues_by_source = {}
for cues in process_cues.cues:
values_by_name = {}
for descriptor, value in cues.ListFields():
if descriptor.name in IGNORE_CUE_FIELDS:
continue
values_by_name[descriptor.name] = value
data.cues_by_source[cues.source] = values_by_name
return data
top = extract_process(process_cues)
for child in process_cues.children:
top.children_by_pid[child.pid] = extract_process(child)
return top
    def insert_process(self, process_cues, timestamp):
"""
Processes new measurements for a monitored process.
Args:
process_cues (rst.monitoring.ProcessCues):
the new measurement data as an RST type
timestamp (float):
the timestamp of the measurements as a unix timestamp in
seconds
"""
self._logger.info('Processing new process results for timestamp %s',
timestamp)
        # start by wrapping the data in an object that accepts any type for
        # any field so that we can do math on it in here
data = self._extract_process_data(process_cues)
# aggregate if requested
if self._aggregate_subprocesses:
self._aggregate(data)
# derive cues and check results
data = self._derive_processes(data, timestamp,
not self._aggregate_subprocesses)
if not data:
self._logger.info('Skipping cues for process %s since this '
'seems to be the first message',
process_cues.pid)
return
# submit data to the backend
self._backend.submit_process_data(data, timestamp,
not self._aggregate_subprocesses)
def _derive_host(self, host_information, timestamp):
"""
Calculates derivatives for host-related counters.
"""
with self._last_host_data_lock:
# look up previous data
previous = None
if host_information.hostname in self._last_host_data:
previous = self._last_host_data[host_information.hostname]
# store current data
self._last_host_data[host_information.hostname] = \
self._LastIterationData(host_information, timestamp)
# TODO somehow clean this map. Currently this is ever-growing.
# further processing needs to be skipped in case we cannot build
# derivatives
if not previous:
return None
# Now we can do the real logic of building derivatives since all
# required data is available
try:
result = host_information.clone()
result.cpus_by_index.update(
self._derive_dict(previous.data.cpus_by_index,
host_information.cpus_by_index,
CPU_CUES_TO_DERIVE,
previous.timestamp,
timestamp))
result.storage_devices.update(
self._derive_dict(previous.data.storage_devices,
host_information.storage_devices,
STORAGE_DEVICE_CUES_TO_DERIVE,
previous.timestamp,
timestamp))
result.network_controllers.update(
self._derive_dict(previous.data.network_controllers,
host_information.network_controllers,
NIC_CUES_TO_DERIVE,
previous.timestamp,
timestamp))
return result
except ZeroDivisionError:
self._derive_logger.debug('Zero division error will be ignored',
exc_info=True)
            # it is not clear why we might receive two events with the same
# timestamp. In any case, if this happens, we cannot do anything
# since a good normalization is not possible then. The best thing
# we can do is to skip these cases
return None
@classmethod
def _calculate_cpu_percent(cls, host_information):
"""
Convert detailed CPU counters into a percentage value.
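
        The counters are assumed to be per-second derivatives at this
        point. Example (illustrative): ``total=100, idle=60, iowait=10``
        yields ``percent = (100 - 60 - 10) / 100 = 0.3``.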
"""
for _, cpu in host_information.cpus_by_index.iteritems():
cpu['percent'] = \
float(cpu['total'] - cpu['idle'] - cpu['iowait']) \
/ float(cpu['total'])
@classmethod
def _summarize_cpu_usage(cls, host_information):
"""
Summarize cpu percentage values for all CPUs.
Generates two data items which contain the accumulated percentages and
the number of CPUs.
"""
host_information.cpu_number = len(host_information.cpus_by_index)
sum_percent = 0
for _, cpu in host_information.cpus_by_index.iteritems():
sum_percent = sum_percent + cpu['percent']
host_information.cpu_percent_cumulated = sum_percent
host_information.cpu_percent_normalized = \
sum_percent / len(host_information.cpus_by_index)
@staticmethod
def _extract_host_data(host_data):
# pylint: disable=attribute-defined-outside-init,
# pylint: disable=too-many-locals
# pylint: disable=too-many-branches
# pylint: disable=too-many-statements
"""
Extracts the data from the protocol buffer representation.
"""
data = CopyableObject('HostData')
data.hostname = host_data.hostname
# memory
data.memory_total = host_data.memory_state.total
data.memory_used = host_data.memory_state.used
data.memory_usable = host_data.memory_state.usable
        if data.memory_usable:
            # force float division; the protobuf values are integers
            data.memory_percent = 1.0 - (float(data.memory_usable)
                                         / float(data.memory_total))
# load
data.load_1 = host_data.cpu_state.load_1
data.load_5 = host_data.cpu_state.load_5
data.load_15 = host_data.cpu_state.load_15
# cpu
data.jiffy_length = host_data.cpu_state.jiffy_length
data.cpus_by_index = {}
for cpu in host_data.cpu_state.cpus:
index = cpu.index
cpu_data = {}
for descriptor, value in cpu.ListFields():
if descriptor.name == 'index':
continue
cpu_data[descriptor.name] = value
data.cpus_by_index[index] = cpu_data
# users
_assign_if_available(
host_data.user_state, data, 'logged_in_users')
_assign_if_available(
host_data.user_state, data, 'login_sessions')
_assign_if_available(
host_data.user_state, data, 'login_hosts')
# disks
data.disk_partitions = {}
for partition in host_data.disk_state.partitions:
mount_point = partition.mount_point
partition_data = {}
for descriptor, value in partition.ListFields():
if descriptor.name in ['mount_point', 'device']:
continue
partition_data[descriptor.name] = value
if 'space_used' in partition_data and \
'space_total' in partition_data:
partition_data['usage_percent'] = \
float(partition_data['space_used']) \
/ float(partition_data['space_total'])
data.disk_partitions[mount_point] = partition_data
# storage devices
data.storage_devices = {}
for device in host_data.disk_state.devices:
name = device.name
device_data = {}
for descriptor, value in device.ListFields():
if descriptor.name in ['name']:
continue
device_data[descriptor.name] = value
data.storage_devices[name] = device_data
# network interfaces
data.network_controllers = {}
for nic in host_data.network_state.nics:
name = nic.name
nic_data = {}
for descriptor, value in nic.ListFields():
if descriptor.name in ['name']:
continue
nic_data[descriptor.name] = value
data.network_controllers[name] = nic_data
        # network connections
data.network_connections = {}
for connection in host_data.network_state.connections:
# pylint: disable=no-member
key = (
NetworkState.NetworkConnections.Protocol.Name(
connection.protocol).replace('PROTOCOL_', '').lower(),
NetworkState.NetworkConnections.AddressFamily.Name(
connection.family).replace('FAMILY_', '').lower())
connection_data = {}
for field in NetworkState.NetworkConnections.DESCRIPTOR.fields:
if field.name in ['protocol', 'family']:
continue
connection_data[field.name] = getattr(connection, field.name)
data.network_connections[key] = connection_data
return data
    def insert_host(self, host_information, timestamp):
"""
Process new host-related measurements.
Args:
host_information (rst.devices.generic.HostInformation):
new measurements
timestamp (float):
timestamp of the measurements as unix timestamp in seconds
"""
self._logger.info('Processing new host results')
        # start by wrapping the data in an object that accepts any type for
        # any field so that we can do math on it in here
host_information = self._extract_host_data(host_information)
# calculate derivative and check for success
host_information = self._derive_host(host_information, timestamp)
if not host_information:
self._logger.info('Skipping host information since no '
'derived result was calculated.')
return
# calculate utility counters for individual CPUs
try:
self._calculate_cpu_percent(host_information)
except ZeroDivisionError:
self._logger.debug('Not processing host results due to cpu '
'percent calculation error', exc_info=True)
return
self._summarize_cpu_usage(host_information)
self._backend.submit_host_information(host_information, timestamp)