# ============================================================
#
# Copyright (C) 2015 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
#
# This file may be licensed under the terms of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by:
# CoR-Lab, Research Institute for Cognition and Robotics
# Bielefeld University
#
# ============================================================
"""
Provides the actual implementation.
.. codeauthor:: jwienke
"""
import abc
import os
import platform
import socket
import threading
import time
from loggerbyclass import get_logger_by_class
import numpy as np
import psutil
import rsb
import rst # pylint: disable=unused-import
import rstsandbox # pylint: disable=unused-import
from rst.devices.generic.HostInformation_pb2 import HostInformation
from rst.devices.generic.NetworkState_pb2 import NetworkState
[docs]class DataSource(object):
@abc.abstractmethod
[docs] def provide_data(self, message):
pass
[docs]class CpuDataSource(DataSource):
def __init__(self):
self._jiffy_hz = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
self._jiffy_length = int(1000000.0 / float(self._jiffy_hz))
[docs] def provide_data(self, message):
cpu_state = message.cpu_state
cpu_state.jiffy_length = self._jiffy_length
for index, cpu in enumerate(psutil.cpu_times(percpu=True)):
cpu_message = cpu_state.cpus.add()
cpu_message.index = index
cpu_message.total = int(np.sum(cpu) * self._jiffy_hz)
for key, value in cpu._asdict().iteritems():
if hasattr(cpu_message, key):
setattr(cpu_message, key, int(value * self._jiffy_hz))
[docs]class LoadDataSource(DataSource):
[docs] def provide_data(self, message):
load = os.getloadavg()
message.cpu_state.load_1 = load[0]
message.cpu_state.load_5 = load[1]
message.cpu_state.load_15 = load[2]
[docs]class MemoryDataSource(DataSource):
[docs] def provide_data(self, message):
memory_state = message.memory_state
memory = psutil.virtual_memory()
memory_state.total = memory.total
memory_state.used = memory.used
memory_state.usable = memory.available
swap = psutil.swap_memory()
memory_state.swap_total = swap.total
memory_state.swap_used = swap.used
[docs]class ProcessNumberDataSource(DataSource):
[docs] def provide_data(self, message):
message.process_num = len(psutil.pids())
[docs]class DiskDataSource(DataSource):
[docs] def provide_data(self, message):
disk_state = message.disk_state
# partitions
for partition in psutil.disk_partitions():
partition_message = disk_state.partitions.add()
partition_message.mount_point = partition.mountpoint
partition_message.device = partition.device
usage = psutil.disk_usage(partition.mountpoint)
partition_message.space_total = usage.total
partition_message.space_used = usage.used
# devices
for name, device in psutil.disk_io_counters(perdisk=True).iteritems():
device_message = disk_state.devices.add()
device_message.name = name
for key, value in device._asdict().iteritems():
if hasattr(device_message, key):
setattr(device_message, key, value)
[docs]class NetworkDataSource(DataSource):
_TYPE_MAP = {
'tcp': (NetworkState.NetworkConnections.PROTOCOL_TCP, None),
'udp': (NetworkState.NetworkConnections.PROTOCOL_UDP, None),
'unix': (NetworkState.NetworkConnections.PROTOCOL_OTHER,
(socket.AF_UNIX,
NetworkState.NetworkConnections.FAMILY_UNIX))
}
_REMOTE_FAMILY_MAPPING = {
socket.AF_INET: NetworkState.NetworkConnections.FAMILY_IPV4,
socket.AF_INET6: NetworkState.NetworkConnections.FAMILY_IPV6
}
[docs] def provide_data(self, message):
network_state = message.network_state
self._provide_nic_infos(network_state)
self._provide_connection_infos(network_state)
@staticmethod
def _provide_nic_infos(network_state):
for device, counters in \
psutil.net_io_counters(pernic=True).iteritems():
nic_state = network_state.nics.add()
nic_state.name = device
nic_state.bytes_received = counters.bytes_recv
nic_state.bytes_sent = counters.bytes_sent
nic_state.packets_received = counters.packets_recv
nic_state.packets_sent = counters.packets_sent
nic_state.receive_errors = counters.errin
nic_state.send_errors = counters.errout
nic_state.receive_drops = counters.dropin
nic_state.send_drops = counters.dropout
@staticmethod
def _prepare_connections_output(protocol, family, network_state):
output = network_state.connections.add()
output.protocol = protocol
output.family = family
return output
@classmethod
def _provide_connection_infos(cls, network_state):
for kind, (protocol, family_default) in cls._TYPE_MAP.iteritems():
# prepare the output message structures as a target for counting
# connections
counters_by_family = {}
if family_default is None:
for orig_type, out_type in \
cls._REMOTE_FAMILY_MAPPING.iteritems():
counters_by_family[orig_type] = \
cls._prepare_connections_output(
protocol,
out_type,
network_state)
else:
counters_by_family[family_default[0]] = \
cls._prepare_connections_output(
protocol,
family_default[1],
network_state)
for connection in psutil.net_connections(kind=kind):
field_name = 'num_{}'.format(connection.status.lower())
counters = counters_by_family[connection.family]
setattr(counters, field_name,
getattr(counters, field_name) + 1)
[docs]class UserDataSource(DataSource):
[docs] def provide_data(self, message):
user_state = message.user_state
user_names = []
hosts = []
users = psutil.users()
for user in users:
user_names.append(user.name)
hosts.append(user.host)
user_state.logged_in_users = len(set(user_names))
user_state.login_sessions = len(users)
user_state.login_hosts = len(set(hosts))
[docs]class MonitoringThread(threading.Thread):
# pylint: disable=too-many-instance-attributes
def __init__(self, cycle_time, informer, host_name=platform.node(),
print_results=False):
"""
Constructor.
Args:
cycle_time (float):
Length of one monitoring iteration in seconds.
informer (rsb.Informer):
RSB informer to use for sending result events.
host_name (str):
Name of the host this thread operates for. Will be placed in
the exported RSB data. Defaults to ``platform.node()``.
print_results (bool):
If True, print data to send on stdout. Defaults to False.
"""
threading.Thread.__init__(self)
self.name = 'MonitoringThread[{}]'.format(self.name)
self._logger = get_logger_by_class(self.__class__)
self._cycle_time = cycle_time
self._informer = informer
self._host_name = host_name
self._print_results = print_results
self._interrupted = threading.Event()
self._sources = []
self._sources.append(CpuDataSource())
self._sources.append(LoadDataSource())
self._sources.append(MemoryDataSource())
self._sources.append(ProcessNumberDataSource())
self._sources.append(DiskDataSource())
self._sources.append(NetworkDataSource())
self._sources.append(UserDataSource())
[docs] def interrupt(self):
"""
Interrupts the processing of the thread.
"""
self._interrupted.set()
[docs] def run(self):
self._logger.info('Starting monitoring iterations')
start = time.time()
while not self._interrupted.is_set():
self._logger.debug('Starting monitoring iteration')
event = rsb.Event(scope=self._informer.getScope(),
type=HostInformation)
message = HostInformation()
message.hostname = self._host_name
event.metaData.setCreateTime()
for source in self._sources:
source.provide_data(message)
event.metaData.setUserTime('monitor-finished')
event.data = message
if self._print_results:
print(message)
self._informer.publishEvent(event)
self._logger.debug('Monitoring iteration finished, sleeping')
now = time.time()
self._interrupted.wait(max(0, self._cycle_time - (now - start)))
start += self._cycle_time
self._logger.info('Terminating')