Source code for rsbhostmonitor

# ============================================================
#
# Copyright (C) 2015 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
#
# This file may be licensed under the terms of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by:
#   CoR-Lab, Research Institute for Cognition and Robotics
#     Bielefeld University
#
# ============================================================

"""
Provides the actual implementation.

.. codeauthor:: jwienke
"""

import abc
import os
import platform
import socket
import threading
import time

from loggerbyclass import get_logger_by_class

import numpy as np

import psutil

import rsb

import rst  # pylint: disable=unused-import
import rstsandbox  # pylint: disable=unused-import
from rst.devices.generic.HostInformation_pb2 import HostInformation
from rst.devices.generic.NetworkState_pb2 import NetworkState


[docs]class DataSource(object): @abc.abstractmethod
[docs] def provide_data(self, message): pass
[docs]class CpuDataSource(DataSource): def __init__(self): self._jiffy_hz = os.sysconf(os.sysconf_names['SC_CLK_TCK']) self._jiffy_length = int(1000000.0 / float(self._jiffy_hz))
[docs] def provide_data(self, message): cpu_state = message.cpu_state cpu_state.jiffy_length = self._jiffy_length for index, cpu in enumerate(psutil.cpu_times(percpu=True)): cpu_message = cpu_state.cpus.add() cpu_message.index = index cpu_message.total = int(np.sum(cpu) * self._jiffy_hz) for key, value in cpu._asdict().iteritems(): if hasattr(cpu_message, key): setattr(cpu_message, key, int(value * self._jiffy_hz))
[docs]class LoadDataSource(DataSource):
[docs] def provide_data(self, message): load = os.getloadavg() message.cpu_state.load_1 = load[0] message.cpu_state.load_5 = load[1] message.cpu_state.load_15 = load[2]
[docs]class MemoryDataSource(DataSource):
[docs] def provide_data(self, message): memory_state = message.memory_state memory = psutil.virtual_memory() memory_state.total = memory.total memory_state.used = memory.used memory_state.usable = memory.available swap = psutil.swap_memory() memory_state.swap_total = swap.total memory_state.swap_used = swap.used
[docs]class ProcessNumberDataSource(DataSource):
[docs] def provide_data(self, message): message.process_num = len(psutil.pids())
[docs]class DiskDataSource(DataSource):
[docs] def provide_data(self, message): disk_state = message.disk_state # partitions for partition in psutil.disk_partitions(): partition_message = disk_state.partitions.add() partition_message.mount_point = partition.mountpoint partition_message.device = partition.device usage = psutil.disk_usage(partition.mountpoint) partition_message.space_total = usage.total partition_message.space_used = usage.used # devices for name, device in psutil.disk_io_counters(perdisk=True).iteritems(): device_message = disk_state.devices.add() device_message.name = name for key, value in device._asdict().iteritems(): if hasattr(device_message, key): setattr(device_message, key, value)
[docs]class NetworkDataSource(DataSource): _TYPE_MAP = { 'tcp': (NetworkState.NetworkConnections.PROTOCOL_TCP, None), 'udp': (NetworkState.NetworkConnections.PROTOCOL_UDP, None), 'unix': (NetworkState.NetworkConnections.PROTOCOL_OTHER, (socket.AF_UNIX, NetworkState.NetworkConnections.FAMILY_UNIX)) } _REMOTE_FAMILY_MAPPING = { socket.AF_INET: NetworkState.NetworkConnections.FAMILY_IPV4, socket.AF_INET6: NetworkState.NetworkConnections.FAMILY_IPV6 }
[docs] def provide_data(self, message): network_state = message.network_state self._provide_nic_infos(network_state) self._provide_connection_infos(network_state)
@staticmethod def _provide_nic_infos(network_state): for device, counters in \ psutil.net_io_counters(pernic=True).iteritems(): nic_state = network_state.nics.add() nic_state.name = device nic_state.bytes_received = counters.bytes_recv nic_state.bytes_sent = counters.bytes_sent nic_state.packets_received = counters.packets_recv nic_state.packets_sent = counters.packets_sent nic_state.receive_errors = counters.errin nic_state.send_errors = counters.errout nic_state.receive_drops = counters.dropin nic_state.send_drops = counters.dropout @staticmethod def _prepare_connections_output(protocol, family, network_state): output = network_state.connections.add() output.protocol = protocol output.family = family return output @classmethod def _provide_connection_infos(cls, network_state): for kind, (protocol, family_default) in cls._TYPE_MAP.iteritems(): # prepare the output message structures as a target for counting # connections counters_by_family = {} if family_default is None: for orig_type, out_type in \ cls._REMOTE_FAMILY_MAPPING.iteritems(): counters_by_family[orig_type] = \ cls._prepare_connections_output( protocol, out_type, network_state) else: counters_by_family[family_default[0]] = \ cls._prepare_connections_output( protocol, family_default[1], network_state) for connection in psutil.net_connections(kind=kind): field_name = 'num_{}'.format(connection.status.lower()) counters = counters_by_family[connection.family] setattr(counters, field_name, getattr(counters, field_name) + 1)
[docs]class UserDataSource(DataSource):
[docs] def provide_data(self, message): user_state = message.user_state user_names = [] hosts = [] users = psutil.users() for user in users: user_names.append(user.name) hosts.append(user.host) user_state.logged_in_users = len(set(user_names)) user_state.login_sessions = len(users) user_state.login_hosts = len(set(hosts))
[docs]class MonitoringThread(threading.Thread): # pylint: disable=too-many-instance-attributes def __init__(self, cycle_time, informer, host_name=platform.node(), print_results=False): """ Constructor. Args: cycle_time (float): Length of one monitoring iteration in seconds. informer (rsb.Informer): RSB informer to use for sending result events. host_name (str): Name of the host this thread operates for. Will be placed in the exported RSB data. Defaults to ``platform.node()``. print_results (bool): If True, print data to send on stdout. Defaults to False. """ threading.Thread.__init__(self) self.name = 'MonitoringThread[{}]'.format(self.name) self._logger = get_logger_by_class(self.__class__) self._cycle_time = cycle_time self._informer = informer self._host_name = host_name self._print_results = print_results self._interrupted = threading.Event() self._sources = [] self._sources.append(CpuDataSource()) self._sources.append(LoadDataSource()) self._sources.append(MemoryDataSource()) self._sources.append(ProcessNumberDataSource()) self._sources.append(DiskDataSource()) self._sources.append(NetworkDataSource()) self._sources.append(UserDataSource())
[docs] def interrupt(self): """ Interrupts the processing of the thread. """ self._interrupted.set()
[docs] def run(self): self._logger.info('Starting monitoring iterations') start = time.time() while not self._interrupted.is_set(): self._logger.debug('Starting monitoring iteration') event = rsb.Event(scope=self._informer.getScope(), type=HostInformation) message = HostInformation() message.hostname = self._host_name event.metaData.setCreateTime() for source in self._sources: source.provide_data(message) event.metaData.setUserTime('monitor-finished') event.data = message if self._print_results: print(message) self._informer.publishEvent(event) self._logger.debug('Monitoring iteration finished, sleeping') now = time.time() self._interrupted.wait(max(0, self._cycle_time - (now - start))) start += self._cycle_time self._logger.info('Terminating')